From 58c7c61e6222c68b1d052eb0640b75911f1d2b0a Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 25 Jan 2026 04:04:42 +0000 Subject: [PATCH] fix: add duplex for fetch uploads --- CHANGELOG.md | 1 + src/infra/fetch.test.ts | 14 ++++++++++++++ src/infra/fetch.ts | 32 ++++++++++++++++++++++++++------ 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 604acc8c1..8d82acd4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Docs: https://docs.clawd.bot - Heartbeat: normalize target identifiers for consistent routing. - TUI: reload history after gateway reconnect to restore session state. (#1663) - Telegram: use wrapped fetch for long-polling on Node to normalize AbortSignal handling. (#1639) +- Telegram: set fetch duplex="half" for uploads on Node 22 to avoid sendPhoto failures. (#1684) Thanks @commdata2338. - Signal: repair reaction sends (group/UUID targets + CLI author flags). (#1651) Thanks @vilkasdev. - Exec: keep approvals for elevated ask unless full mode. (#1616) Thanks @ivancasco. - Agents: auto-compact on context overflow prompt errors before failing. (#1627) Thanks @rodrigouroz. diff --git a/src/infra/fetch.test.ts b/src/infra/fetch.test.ts index 9c286d4be..6a41f71f5 100644 --- a/src/infra/fetch.test.ts +++ b/src/infra/fetch.test.ts @@ -3,6 +3,20 @@ import { describe, expect, it, vi } from "vitest"; import { wrapFetchWithAbortSignal } from "./fetch.js"; describe("wrapFetchWithAbortSignal", () => { + it("adds duplex for requests with a body", async () => { + let seenInit: RequestInit | undefined; + const fetchImpl = vi.fn(async (_input: RequestInfo | URL, init?: RequestInit) => { + seenInit = init; + return {} as Response; + }); + + const wrapped = wrapFetchWithAbortSignal(fetchImpl); + + await wrapped("https://example.com", { method: "POST", body: "hi" }); + + expect(seenInit?.duplex).toBe("half"); + }); + it("converts foreign abort signals to native controllers", async () => { let seenSignal: AbortSignal | undefined; const fetchImpl = vi.fn(async (_input: RequestInfo | URL, init?: RequestInit) => { diff --git a/src/infra/fetch.ts b/src/infra/fetch.ts index 2f9993c7a..61012e485 100644 --- a/src/infra/fetch.ts +++ b/src/infra/fetch.ts @@ -2,18 +2,38 @@ type FetchWithPreconnect = typeof fetch & { preconnect: (url: string, init?: { credentials?: RequestCredentials }) => void; }; +type RequestInitWithDuplex = RequestInit & { duplex?: "half" }; + +function withDuplex( + init: RequestInit | undefined, + input: RequestInfo | URL, +): RequestInit | undefined { + const hasInitBody = init?.body != null; + const hasRequestBody = + !hasInitBody && + typeof Request !== "undefined" && + input instanceof Request && + input.body != null; + if (!hasInitBody && !hasRequestBody) return init; + if (init && "duplex" in (init as Record)) return init; + return init + ? ({ ...init, duplex: "half" as const } as RequestInitWithDuplex) + : ({ duplex: "half" as const } as RequestInitWithDuplex); +} + export function wrapFetchWithAbortSignal(fetchImpl: typeof fetch): typeof fetch { const wrapped = ((input: RequestInfo | URL, init?: RequestInit) => { - const signal = init?.signal; - if (!signal) return fetchImpl(input, init); + const patchedInit = withDuplex(init, input); + const signal = patchedInit?.signal; + if (!signal) return fetchImpl(input, patchedInit); if (typeof AbortSignal !== "undefined" && signal instanceof AbortSignal) { - return fetchImpl(input, init); + return fetchImpl(input, patchedInit); } if (typeof AbortController === "undefined") { - return fetchImpl(input, init); + return fetchImpl(input, patchedInit); } if (typeof signal.addEventListener !== "function") { - return fetchImpl(input, init); + return fetchImpl(input, patchedInit); } const controller = new AbortController(); const onAbort = () => controller.abort(); @@ -22,7 +42,7 @@ export function wrapFetchWithAbortSignal(fetchImpl: typeof fetch): typeof fetch } else { signal.addEventListener("abort", onAbort, { once: true }); } - const response = fetchImpl(input, { ...init, signal: controller.signal }); + const response = fetchImpl(input, { ...patchedInit, signal: controller.signal }); if (typeof signal.removeEventListener === "function") { void response.finally(() => { signal.removeEventListener("abort", onAbort);