From 4e1806947d2a181d6a43828611ac69e3a6a01d18 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 21 Jan 2026 17:29:39 +0000 Subject: [PATCH] fix: normalize abort signals for fetch --- docs/channels/telegram.md | 4 ++++ src/discord/api.ts | 8 ++++++- src/discord/probe.ts | 7 +++++- src/infra/fetch.ts | 6 ++++++ src/infra/provider-usage.load.ts | 6 +++++- src/signal/client.ts | 14 ++++++++++-- src/telegram/fetch.test.ts | 37 ++++++++++++++++++++++++++++++++ src/telegram/fetch.ts | 8 +++---- 8 files changed, 81 insertions(+), 9 deletions(-) create mode 100644 src/telegram/fetch.test.ts diff --git a/docs/channels/telegram.md b/docs/channels/telegram.md index 3c894303e..da29b3c90 100644 --- a/docs/channels/telegram.md +++ b/docs/channels/telegram.md @@ -484,6 +484,10 @@ The agent sees reactions as **system notifications** in the conversation history - Make sure your Telegram user ID is authorized (via pairing or `channels.telegram.allowFrom`) - Commands require authorization even in groups with `groupPolicy: "open"` +**Long-polling aborts immediately on Node 22+ (often with proxies/custom fetch):** +- Node 22+ is stricter about `AbortSignal` instances; foreign signals can abort `fetch` calls right away. +- Upgrade to a Clawdbot build that normalizes abort signals, or run the gateway on Node 20 until you can upgrade. + **Bot starts, then silently stops responding (or logs `HttpError: Network request ... failed`):** - Some hosts resolve `api.telegram.org` to IPv6 first. If your server does not have working IPv6 egress, grammY can get stuck on IPv6-only requests. - Fix by enabling IPv6 egress **or** forcing IPv4 resolution for `api.telegram.org` (for example, add an `/etc/hosts` entry using the IPv4 A record, or prefer IPv4 in your OS DNS stack), then restart the gateway. diff --git a/src/discord/api.ts b/src/discord/api.ts index 6be8b4a0b..de72c8ba1 100644 --- a/src/discord/api.ts +++ b/src/discord/api.ts @@ -1,3 +1,5 @@ +import { resolveFetch } from "../infra/fetch.js"; + const DISCORD_API_BASE = "https://discord.com/api/v10"; type DiscordApiErrorPayload = { @@ -48,7 +50,11 @@ export async function fetchDiscord( token: string, fetcher: typeof fetch = fetch, ): Promise { - const res = await fetcher(`${DISCORD_API_BASE}${path}`, { + const fetchImpl = resolveFetch(fetcher); + if (!fetchImpl) { + throw new Error("fetch is not available"); + } + const res = await fetchImpl(`${DISCORD_API_BASE}${path}`, { headers: { Authorization: `Bot ${token}` }, }); if (!res.ok) { diff --git a/src/discord/probe.ts b/src/discord/probe.ts index 21c6d1922..78175c3b9 100644 --- a/src/discord/probe.ts +++ b/src/discord/probe.ts @@ -1,3 +1,4 @@ +import { resolveFetch } from "../infra/fetch.js"; import { normalizeDiscordToken } from "./token.js"; const DISCORD_API_BASE = "https://discord.com/api/v10"; @@ -90,10 +91,14 @@ async function fetchWithTimeout( fetcher: typeof fetch, headers?: HeadersInit, ): Promise { + const fetchImpl = resolveFetch(fetcher); + if (!fetchImpl) { + throw new Error("fetch is not available"); + } const controller = new AbortController(); const timer = setTimeout(() => controller.abort(), timeoutMs); try { - return await fetcher(url, { signal: controller.signal, headers }); + return await fetchImpl(url, { signal: controller.signal, headers }); } finally { clearTimeout(timer); } diff --git a/src/infra/fetch.ts b/src/infra/fetch.ts index 6a472253b..70ab6f614 100644 --- a/src/infra/fetch.ts +++ b/src/infra/fetch.ts @@ -28,3 +28,9 @@ export function wrapFetchWithAbortSignal(fetchImpl: typeof fetch): typeof fetch }) as typeof fetch; return Object.assign(wrapped, fetchImpl); } + +export function resolveFetch(fetchImpl?: typeof fetch): typeof fetch | undefined { + const resolved = fetchImpl ?? globalThis.fetch; + if (!resolved) return undefined; + return wrapFetchWithAbortSignal(resolved); +} diff --git a/src/infra/provider-usage.load.ts b/src/infra/provider-usage.load.ts index 7fdf8de3e..676ac9920 100644 --- a/src/infra/provider-usage.load.ts +++ b/src/infra/provider-usage.load.ts @@ -19,6 +19,7 @@ import type { UsageProviderId, UsageSummary, } from "./provider-usage.types.js"; +import { resolveFetch } from "./fetch.js"; type UsageSummaryOptions = { now?: number; @@ -34,7 +35,10 @@ export async function loadProviderUsageSummary( ): Promise { const now = opts.now ?? Date.now(); const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS; - const fetchFn = opts.fetch ?? fetch; + const fetchFn = resolveFetch(opts.fetch); + if (!fetchFn) { + throw new Error("fetch is not available"); + } const auths = await resolveProviderAuths({ providers: opts.providers ?? usageProviders, diff --git a/src/signal/client.ts b/src/signal/client.ts index 925cc0c63..5595edb5b 100644 --- a/src/signal/client.ts +++ b/src/signal/client.ts @@ -1,5 +1,7 @@ import { randomUUID } from "node:crypto"; +import { resolveFetch } from "../infra/fetch.js"; + export type SignalRpcOptions = { baseUrl: string; timeoutMs?: number; @@ -36,10 +38,14 @@ function normalizeBaseUrl(url: string): string { } async function fetchWithTimeout(url: string, init: RequestInit, timeoutMs: number) { + const fetchImpl = resolveFetch(); + if (!fetchImpl) { + throw new Error("fetch is not available"); + } const controller = new AbortController(); const timer = setTimeout(() => controller.abort(), timeoutMs); try { - return await fetch(url, { ...init, signal: controller.signal }); + return await fetchImpl(url, { ...init, signal: controller.signal }); } finally { clearTimeout(timer); } @@ -113,7 +119,11 @@ export async function streamSignalEvents(params: { const url = new URL(`${baseUrl}/api/v1/events`); if (params.account) url.searchParams.set("account", params.account); - const res = await fetch(url, { + const fetchImpl = resolveFetch(); + if (!fetchImpl) { + throw new Error("fetch is not available"); + } + const res = await fetchImpl(url, { method: "GET", headers: { Accept: "text/event-stream" }, signal: params.abortSignal, diff --git a/src/telegram/fetch.test.ts b/src/telegram/fetch.test.ts new file mode 100644 index 000000000..f1a2353c2 --- /dev/null +++ b/src/telegram/fetch.test.ts @@ -0,0 +1,37 @@ +import { describe, expect, it, vi } from "vitest"; + +import { resolveTelegramFetch } from "./fetch.js"; + +describe("resolveTelegramFetch", () => { + it("wraps proxy fetch to normalize foreign abort signals", async () => { + let seenSignal: AbortSignal | undefined; + const proxyFetch = vi.fn(async (_input: RequestInfo | URL, init?: RequestInit) => { + seenSignal = init?.signal as AbortSignal | undefined; + return {} as Response; + }); + + const fetcher = resolveTelegramFetch(proxyFetch); + expect(fetcher).toBeTypeOf("function"); + + let abortHandler: (() => void) | null = null; + const fakeSignal = { + aborted: false, + addEventListener: (event: string, handler: () => void) => { + if (event === "abort") abortHandler = handler; + }, + removeEventListener: (event: string, handler: () => void) => { + if (event === "abort" && abortHandler === handler) abortHandler = null; + }, + } as AbortSignal; + + const promise = fetcher!("https://example.com", { signal: fakeSignal }); + expect(proxyFetch).toHaveBeenCalledOnce(); + expect(seenSignal).toBeInstanceOf(AbortSignal); + expect(seenSignal).not.toBe(fakeSignal); + + abortHandler?.(); + expect(seenSignal?.aborted).toBe(true); + + await promise; + }); +}); diff --git a/src/telegram/fetch.ts b/src/telegram/fetch.ts index 1c4a288d0..ee1c6780c 100644 --- a/src/telegram/fetch.ts +++ b/src/telegram/fetch.ts @@ -1,13 +1,13 @@ -import { wrapFetchWithAbortSignal } from "../infra/fetch.js"; +import { resolveFetch } from "../infra/fetch.js"; // Bun-only: force native fetch to avoid grammY's Node shim under Bun. export function resolveTelegramFetch(proxyFetch?: typeof fetch): typeof fetch | undefined { - if (proxyFetch) return wrapFetchWithAbortSignal(proxyFetch); - const fetchImpl = globalThis.fetch; + if (proxyFetch) return resolveFetch(proxyFetch); const isBun = "Bun" in globalThis || Boolean(process?.versions?.bun); if (!isBun) return undefined; + const fetchImpl = resolveFetch(); if (!fetchImpl) { throw new Error("fetch is not available; set channels.telegram.proxy in config"); } - return wrapFetchWithAbortSignal(fetchImpl); + return fetchImpl; }