fix: normalize abort signals for fetch
This commit is contained in:
@@ -484,6 +484,10 @@ The agent sees reactions as **system notifications** in the conversation history
|
|||||||
- Make sure your Telegram user ID is authorized (via pairing or `channels.telegram.allowFrom`)
|
- Make sure your Telegram user ID is authorized (via pairing or `channels.telegram.allowFrom`)
|
||||||
- Commands require authorization even in groups with `groupPolicy: "open"`
|
- Commands require authorization even in groups with `groupPolicy: "open"`
|
||||||
|
|
||||||
|
**Long-polling aborts immediately on Node 22+ (often with proxies/custom fetch):**
|
||||||
|
- Node 22+ is stricter about `AbortSignal` instances; foreign signals can abort `fetch` calls right away.
|
||||||
|
- Upgrade to a Clawdbot build that normalizes abort signals, or run the gateway on Node 20 until you can upgrade.
|
||||||
|
|
||||||
**Bot starts, then silently stops responding (or logs `HttpError: Network request ... failed`):**
|
**Bot starts, then silently stops responding (or logs `HttpError: Network request ... failed`):**
|
||||||
- Some hosts resolve `api.telegram.org` to IPv6 first. If your server does not have working IPv6 egress, grammY can get stuck on IPv6-only requests.
|
- Some hosts resolve `api.telegram.org` to IPv6 first. If your server does not have working IPv6 egress, grammY can get stuck on IPv6-only requests.
|
||||||
- Fix by enabling IPv6 egress **or** forcing IPv4 resolution for `api.telegram.org` (for example, add an `/etc/hosts` entry using the IPv4 A record, or prefer IPv4 in your OS DNS stack), then restart the gateway.
|
- Fix by enabling IPv6 egress **or** forcing IPv4 resolution for `api.telegram.org` (for example, add an `/etc/hosts` entry using the IPv4 A record, or prefer IPv4 in your OS DNS stack), then restart the gateway.
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import { resolveFetch } from "../infra/fetch.js";
|
||||||
|
|
||||||
const DISCORD_API_BASE = "https://discord.com/api/v10";
|
const DISCORD_API_BASE = "https://discord.com/api/v10";
|
||||||
|
|
||||||
type DiscordApiErrorPayload = {
|
type DiscordApiErrorPayload = {
|
||||||
@@ -48,7 +50,11 @@ export async function fetchDiscord<T>(
|
|||||||
token: string,
|
token: string,
|
||||||
fetcher: typeof fetch = fetch,
|
fetcher: typeof fetch = fetch,
|
||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
const res = await fetcher(`${DISCORD_API_BASE}${path}`, {
|
const fetchImpl = resolveFetch(fetcher);
|
||||||
|
if (!fetchImpl) {
|
||||||
|
throw new Error("fetch is not available");
|
||||||
|
}
|
||||||
|
const res = await fetchImpl(`${DISCORD_API_BASE}${path}`, {
|
||||||
headers: { Authorization: `Bot ${token}` },
|
headers: { Authorization: `Bot ${token}` },
|
||||||
});
|
});
|
||||||
if (!res.ok) {
|
if (!res.ok) {
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import { resolveFetch } from "../infra/fetch.js";
|
||||||
import { normalizeDiscordToken } from "./token.js";
|
import { normalizeDiscordToken } from "./token.js";
|
||||||
|
|
||||||
const DISCORD_API_BASE = "https://discord.com/api/v10";
|
const DISCORD_API_BASE = "https://discord.com/api/v10";
|
||||||
@@ -90,10 +91,14 @@ async function fetchWithTimeout(
|
|||||||
fetcher: typeof fetch,
|
fetcher: typeof fetch,
|
||||||
headers?: HeadersInit,
|
headers?: HeadersInit,
|
||||||
): Promise<Response> {
|
): Promise<Response> {
|
||||||
|
const fetchImpl = resolveFetch(fetcher);
|
||||||
|
if (!fetchImpl) {
|
||||||
|
throw new Error("fetch is not available");
|
||||||
|
}
|
||||||
const controller = new AbortController();
|
const controller = new AbortController();
|
||||||
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
||||||
try {
|
try {
|
||||||
return await fetcher(url, { signal: controller.signal, headers });
|
return await fetchImpl(url, { signal: controller.signal, headers });
|
||||||
} finally {
|
} finally {
|
||||||
clearTimeout(timer);
|
clearTimeout(timer);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,3 +28,9 @@ export function wrapFetchWithAbortSignal(fetchImpl: typeof fetch): typeof fetch
|
|||||||
}) as typeof fetch;
|
}) as typeof fetch;
|
||||||
return Object.assign(wrapped, fetchImpl);
|
return Object.assign(wrapped, fetchImpl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function resolveFetch(fetchImpl?: typeof fetch): typeof fetch | undefined {
|
||||||
|
const resolved = fetchImpl ?? globalThis.fetch;
|
||||||
|
if (!resolved) return undefined;
|
||||||
|
return wrapFetchWithAbortSignal(resolved);
|
||||||
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import type {
|
|||||||
UsageProviderId,
|
UsageProviderId,
|
||||||
UsageSummary,
|
UsageSummary,
|
||||||
} from "./provider-usage.types.js";
|
} from "./provider-usage.types.js";
|
||||||
|
import { resolveFetch } from "./fetch.js";
|
||||||
|
|
||||||
type UsageSummaryOptions = {
|
type UsageSummaryOptions = {
|
||||||
now?: number;
|
now?: number;
|
||||||
@@ -34,7 +35,10 @@ export async function loadProviderUsageSummary(
|
|||||||
): Promise<UsageSummary> {
|
): Promise<UsageSummary> {
|
||||||
const now = opts.now ?? Date.now();
|
const now = opts.now ?? Date.now();
|
||||||
const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS;
|
const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS;
|
||||||
const fetchFn = opts.fetch ?? fetch;
|
const fetchFn = resolveFetch(opts.fetch);
|
||||||
|
if (!fetchFn) {
|
||||||
|
throw new Error("fetch is not available");
|
||||||
|
}
|
||||||
|
|
||||||
const auths = await resolveProviderAuths({
|
const auths = await resolveProviderAuths({
|
||||||
providers: opts.providers ?? usageProviders,
|
providers: opts.providers ?? usageProviders,
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
import { randomUUID } from "node:crypto";
|
import { randomUUID } from "node:crypto";
|
||||||
|
|
||||||
|
import { resolveFetch } from "../infra/fetch.js";
|
||||||
|
|
||||||
export type SignalRpcOptions = {
|
export type SignalRpcOptions = {
|
||||||
baseUrl: string;
|
baseUrl: string;
|
||||||
timeoutMs?: number;
|
timeoutMs?: number;
|
||||||
@@ -36,10 +38,14 @@ function normalizeBaseUrl(url: string): string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function fetchWithTimeout(url: string, init: RequestInit, timeoutMs: number) {
|
async function fetchWithTimeout(url: string, init: RequestInit, timeoutMs: number) {
|
||||||
|
const fetchImpl = resolveFetch();
|
||||||
|
if (!fetchImpl) {
|
||||||
|
throw new Error("fetch is not available");
|
||||||
|
}
|
||||||
const controller = new AbortController();
|
const controller = new AbortController();
|
||||||
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
const timer = setTimeout(() => controller.abort(), timeoutMs);
|
||||||
try {
|
try {
|
||||||
return await fetch(url, { ...init, signal: controller.signal });
|
return await fetchImpl(url, { ...init, signal: controller.signal });
|
||||||
} finally {
|
} finally {
|
||||||
clearTimeout(timer);
|
clearTimeout(timer);
|
||||||
}
|
}
|
||||||
@@ -113,7 +119,11 @@ export async function streamSignalEvents(params: {
|
|||||||
const url = new URL(`${baseUrl}/api/v1/events`);
|
const url = new URL(`${baseUrl}/api/v1/events`);
|
||||||
if (params.account) url.searchParams.set("account", params.account);
|
if (params.account) url.searchParams.set("account", params.account);
|
||||||
|
|
||||||
const res = await fetch(url, {
|
const fetchImpl = resolveFetch();
|
||||||
|
if (!fetchImpl) {
|
||||||
|
throw new Error("fetch is not available");
|
||||||
|
}
|
||||||
|
const res = await fetchImpl(url, {
|
||||||
method: "GET",
|
method: "GET",
|
||||||
headers: { Accept: "text/event-stream" },
|
headers: { Accept: "text/event-stream" },
|
||||||
signal: params.abortSignal,
|
signal: params.abortSignal,
|
||||||
|
|||||||
37
src/telegram/fetch.test.ts
Normal file
37
src/telegram/fetch.test.ts
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
import { describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
|
import { resolveTelegramFetch } from "./fetch.js";
|
||||||
|
|
||||||
|
describe("resolveTelegramFetch", () => {
|
||||||
|
it("wraps proxy fetch to normalize foreign abort signals", async () => {
|
||||||
|
let seenSignal: AbortSignal | undefined;
|
||||||
|
const proxyFetch = vi.fn(async (_input: RequestInfo | URL, init?: RequestInit) => {
|
||||||
|
seenSignal = init?.signal as AbortSignal | undefined;
|
||||||
|
return {} as Response;
|
||||||
|
});
|
||||||
|
|
||||||
|
const fetcher = resolveTelegramFetch(proxyFetch);
|
||||||
|
expect(fetcher).toBeTypeOf("function");
|
||||||
|
|
||||||
|
let abortHandler: (() => void) | null = null;
|
||||||
|
const fakeSignal = {
|
||||||
|
aborted: false,
|
||||||
|
addEventListener: (event: string, handler: () => void) => {
|
||||||
|
if (event === "abort") abortHandler = handler;
|
||||||
|
},
|
||||||
|
removeEventListener: (event: string, handler: () => void) => {
|
||||||
|
if (event === "abort" && abortHandler === handler) abortHandler = null;
|
||||||
|
},
|
||||||
|
} as AbortSignal;
|
||||||
|
|
||||||
|
const promise = fetcher!("https://example.com", { signal: fakeSignal });
|
||||||
|
expect(proxyFetch).toHaveBeenCalledOnce();
|
||||||
|
expect(seenSignal).toBeInstanceOf(AbortSignal);
|
||||||
|
expect(seenSignal).not.toBe(fakeSignal);
|
||||||
|
|
||||||
|
abortHandler?.();
|
||||||
|
expect(seenSignal?.aborted).toBe(true);
|
||||||
|
|
||||||
|
await promise;
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -1,13 +1,13 @@
|
|||||||
import { wrapFetchWithAbortSignal } from "../infra/fetch.js";
|
import { resolveFetch } from "../infra/fetch.js";
|
||||||
|
|
||||||
// Bun-only: force native fetch to avoid grammY's Node shim under Bun.
|
// Bun-only: force native fetch to avoid grammY's Node shim under Bun.
|
||||||
export function resolveTelegramFetch(proxyFetch?: typeof fetch): typeof fetch | undefined {
|
export function resolveTelegramFetch(proxyFetch?: typeof fetch): typeof fetch | undefined {
|
||||||
if (proxyFetch) return wrapFetchWithAbortSignal(proxyFetch);
|
if (proxyFetch) return resolveFetch(proxyFetch);
|
||||||
const fetchImpl = globalThis.fetch;
|
|
||||||
const isBun = "Bun" in globalThis || Boolean(process?.versions?.bun);
|
const isBun = "Bun" in globalThis || Boolean(process?.versions?.bun);
|
||||||
if (!isBun) return undefined;
|
if (!isBun) return undefined;
|
||||||
|
const fetchImpl = resolveFetch();
|
||||||
if (!fetchImpl) {
|
if (!fetchImpl) {
|
||||||
throw new Error("fetch is not available; set channels.telegram.proxy in config");
|
throw new Error("fetch is not available; set channels.telegram.proxy in config");
|
||||||
}
|
}
|
||||||
return wrapFetchWithAbortSignal(fetchImpl);
|
return fetchImpl;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user