Telegram: harden network retries and config
Co-authored-by: techboss <techboss@users.noreply.github.com>
This commit is contained in:
committed by
Gustavo Madeira Santana
parent
e43f4c0628
commit
b861a0bd73
@@ -310,6 +310,7 @@ const FIELD_LABELS: Record<string, string> = {
|
||||
"channels.telegram.retry.minDelayMs": "Telegram Retry Min Delay (ms)",
|
||||
"channels.telegram.retry.maxDelayMs": "Telegram Retry Max Delay (ms)",
|
||||
"channels.telegram.retry.jitter": "Telegram Retry Jitter",
|
||||
"channels.telegram.network.autoSelectFamily": "Telegram autoSelectFamily",
|
||||
"channels.telegram.timeoutSeconds": "Telegram API Timeout (seconds)",
|
||||
"channels.telegram.capabilities.inlineButtons": "Telegram Inline Buttons",
|
||||
"channels.whatsapp.dmPolicy": "WhatsApp DM Policy",
|
||||
@@ -643,6 +644,8 @@ const FIELD_HELP: Record<string, string> = {
|
||||
"channels.telegram.retry.maxDelayMs":
|
||||
"Maximum retry delay cap in ms for Telegram outbound calls.",
|
||||
"channels.telegram.retry.jitter": "Jitter factor (0-1) applied to Telegram retry delays.",
|
||||
"channels.telegram.network.autoSelectFamily":
|
||||
"Override Node autoSelectFamily for Telegram (true=enable, false=disable).",
|
||||
"channels.telegram.timeoutSeconds":
|
||||
"Max seconds before Telegram API requests are aborted (default: 500 per grammY).",
|
||||
"channels.whatsapp.dmPolicy":
|
||||
|
||||
@@ -18,6 +18,11 @@ export type TelegramActionConfig = {
|
||||
editMessage?: boolean;
|
||||
};
|
||||
|
||||
/** Network transport overrides for Telegram. */
export type TelegramNetworkConfig = {
  /**
   * Override Node's autoSelectFamily behavior.
   * `true` enables it, `false` disables it; leave unset for no explicit override.
   */
  autoSelectFamily?: boolean;
};
|
||||
|
||||
export type TelegramInlineButtonsScope = "off" | "dm" | "group" | "all" | "allowlist";
|
||||
|
||||
export type TelegramCapabilitiesConfig =
|
||||
@@ -96,6 +101,8 @@ export type TelegramAccountConfig = {
|
||||
timeoutSeconds?: number;
|
||||
/** Retry policy for outbound Telegram API calls. */
|
||||
retry?: OutboundRetryConfig;
|
||||
/** Network transport overrides for Telegram. */
|
||||
network?: TelegramNetworkConfig;
|
||||
proxy?: string;
|
||||
webhookUrl?: string;
|
||||
webhookSecret?: string;
|
||||
|
||||
@@ -110,6 +110,12 @@ export const TelegramAccountSchemaBase = z
|
||||
mediaMaxMb: z.number().positive().optional(),
|
||||
timeoutSeconds: z.number().int().positive().optional(),
|
||||
retry: RetryConfigSchema,
|
||||
network: z
|
||||
.object({
|
||||
autoSelectFamily: z.boolean().optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
proxy: z.string().optional(),
|
||||
webhookUrl: z.string().optional(),
|
||||
webhookSecret: z.string().optional(),
|
||||
|
||||
27
src/infra/retry-policy.test.ts
Normal file
27
src/infra/retry-policy.test.ts
Normal file
@@ -0,0 +1,27 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import { createTelegramRetryRunner } from "./retry-policy.js";
|
||||
|
||||
describe("createTelegramRetryRunner", () => {
  // Each test opts into fake timers; always hand real timers back afterwards.
  afterEach(() => {
    vi.useRealTimers();
  });

  it("retries when custom shouldRetry matches non-telegram error", async () => {
    vi.useFakeTimers();

    // Custom predicate: only a plain Error whose message is exactly "boom" qualifies.
    const matchesBoom = (err: unknown) => err instanceof Error && err.message === "boom";
    const runner = createTelegramRetryRunner({
      retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 },
      shouldRetry: matchesBoom,
    });

    // First invocation rejects with "boom", every later invocation resolves "ok".
    const operation = vi.fn<[], Promise<string>>();
    operation.mockRejectedValueOnce(new Error("boom")).mockResolvedValue("ok");

    const pending = runner(operation, "request");
    await vi.runAllTimersAsync();

    await expect(pending).resolves.toBe("ok");
    expect(operation).toHaveBeenCalledTimes(2);
  });
});
|
||||
@@ -72,16 +72,21 @@ export function createTelegramRetryRunner(params: {
|
||||
retry?: RetryConfig;
|
||||
configRetry?: RetryConfig;
|
||||
verbose?: boolean;
|
||||
shouldRetry?: (err: unknown) => boolean;
|
||||
}): RetryRunner {
|
||||
const retryConfig = resolveRetryConfig(TELEGRAM_RETRY_DEFAULTS, {
|
||||
...params.configRetry,
|
||||
...params.retry,
|
||||
});
|
||||
const shouldRetry = params.shouldRetry
|
||||
? (err: unknown) => params.shouldRetry?.(err) || TELEGRAM_RETRY_RE.test(formatErrorMessage(err))
|
||||
: (err: unknown) => TELEGRAM_RETRY_RE.test(formatErrorMessage(err));
|
||||
|
||||
return <T>(fn: () => Promise<T>, label?: string) =>
|
||||
retryAsync(fn, {
|
||||
...retryConfig,
|
||||
label,
|
||||
shouldRetry: (err) => TELEGRAM_RETRY_RE.test(formatErrorMessage(err)),
|
||||
shouldRetry,
|
||||
retryAfterMs: getTelegramRetryAfterMs,
|
||||
onRetry: params.verbose
|
||||
? (info) => {
|
||||
|
||||
@@ -89,6 +89,7 @@ vi.mock("grammy", () => ({
|
||||
on = onSpy;
|
||||
stop = stopSpy;
|
||||
command = commandSpy;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: { client?: { fetch?: typeof fetch } },
|
||||
|
||||
@@ -88,6 +88,7 @@ vi.mock("grammy", () => ({
|
||||
on = onSpy;
|
||||
stop = stopSpy;
|
||||
command = commandSpy;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: { client?: { fetch?: typeof fetch } },
|
||||
|
||||
@@ -88,6 +88,7 @@ vi.mock("grammy", () => ({
|
||||
on = onSpy;
|
||||
stop = stopSpy;
|
||||
command = commandSpy;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: { client?: { fetch?: typeof fetch } },
|
||||
|
||||
@@ -88,6 +88,7 @@ vi.mock("grammy", () => ({
|
||||
on = onSpy;
|
||||
stop = stopSpy;
|
||||
command = commandSpy;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: { client?: { fetch?: typeof fetch } },
|
||||
|
||||
@@ -90,6 +90,7 @@ vi.mock("grammy", () => ({
|
||||
on = onSpy;
|
||||
stop = stopSpy;
|
||||
command = commandSpy;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: {
|
||||
|
||||
@@ -88,6 +88,7 @@ vi.mock("grammy", () => ({
|
||||
on = onSpy;
|
||||
stop = stopSpy;
|
||||
command = commandSpy;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: { client?: { fetch?: typeof fetch } },
|
||||
|
||||
@@ -88,6 +88,7 @@ vi.mock("grammy", () => ({
|
||||
on = onSpy;
|
||||
stop = stopSpy;
|
||||
command = commandSpy;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: { client?: { fetch?: typeof fetch } },
|
||||
|
||||
@@ -88,6 +88,7 @@ vi.mock("grammy", () => ({
|
||||
on = onSpy;
|
||||
stop = stopSpy;
|
||||
command = commandSpy;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: { client?: { fetch?: typeof fetch } },
|
||||
|
||||
@@ -93,6 +93,7 @@ vi.mock("grammy", () => ({
|
||||
on = onSpy;
|
||||
stop = stopSpy;
|
||||
command = commandSpy;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: { client?: { fetch?: typeof fetch } },
|
||||
|
||||
@@ -32,6 +32,7 @@ vi.mock("grammy", () => ({
|
||||
on = onSpy;
|
||||
command = vi.fn();
|
||||
stop = stopSpy;
|
||||
catch = vi.fn();
|
||||
constructor(public token: string) {}
|
||||
},
|
||||
InputFile: class {},
|
||||
|
||||
@@ -30,6 +30,7 @@ vi.mock("grammy", () => ({
|
||||
on = onSpy;
|
||||
command = vi.fn();
|
||||
stop = stopSpy;
|
||||
catch = vi.fn();
|
||||
constructor(public token: string) {}
|
||||
},
|
||||
InputFile: class {},
|
||||
|
||||
@@ -126,6 +126,7 @@ vi.mock("grammy", () => ({
|
||||
on = onSpy;
|
||||
stop = stopSpy;
|
||||
command = commandSpy;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: { client?: { fetch?: typeof fetch } },
|
||||
|
||||
@@ -21,6 +21,7 @@ import {
|
||||
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
|
||||
import { danger, logVerbose, shouldLogVerbose } from "../globals.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { formatUncaughtError } from "../infra/errors.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import { getChildLogger } from "../logging.js";
|
||||
import { resolveAgentRoute } from "../routing/resolve-route.js";
|
||||
@@ -118,7 +119,9 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
});
|
||||
const telegramCfg = account.config;
|
||||
|
||||
const fetchImpl = resolveTelegramFetch(opts.proxyFetch);
|
||||
const fetchImpl = resolveTelegramFetch(opts.proxyFetch, {
|
||||
network: telegramCfg.network,
|
||||
});
|
||||
const shouldProvideFetch = Boolean(fetchImpl);
|
||||
const timeoutSeconds =
|
||||
typeof telegramCfg?.timeoutSeconds === "number" && Number.isFinite(telegramCfg.timeoutSeconds)
|
||||
@@ -137,6 +140,9 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
const bot = new Bot(opts.token, client ? { client } : undefined);
|
||||
bot.api.config.use(apiThrottler());
|
||||
bot.use(sequentialize(getTelegramSequentialKey));
|
||||
bot.catch((err) => {
|
||||
runtime.error?.(danger(`telegram bot error: ${formatUncaughtError(err)}`));
|
||||
});
|
||||
|
||||
// Catch all errors from bot middleware to prevent unhandled rejections
|
||||
bot.catch((err) => {
|
||||
|
||||
@@ -1,11 +1,21 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import { resolveTelegramFetch } from "./fetch.js";
|
||||
|
||||
describe("resolveTelegramFetch", () => {
|
||||
const originalFetch = globalThis.fetch;
|
||||
|
||||
const loadModule = async () => {
|
||||
const setDefaultAutoSelectFamily = vi.fn();
|
||||
vi.resetModules();
|
||||
vi.doMock("node:net", () => ({
|
||||
setDefaultAutoSelectFamily,
|
||||
}));
|
||||
const mod = await import("./fetch.js");
|
||||
return { resolveTelegramFetch: mod.resolveTelegramFetch, setDefaultAutoSelectFamily };
|
||||
};
|
||||
|
||||
afterEach(() => {
|
||||
vi.unstubAllEnvs();
|
||||
vi.clearAllMocks();
|
||||
if (originalFetch) {
|
||||
globalThis.fetch = originalFetch;
|
||||
} else {
|
||||
@@ -13,16 +23,41 @@ describe("resolveTelegramFetch", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("returns wrapped global fetch when available", () => {
|
||||
it("returns wrapped global fetch when available", async () => {
|
||||
const fetchMock = vi.fn(async () => ({}));
|
||||
globalThis.fetch = fetchMock as unknown as typeof fetch;
|
||||
const { resolveTelegramFetch } = await loadModule();
|
||||
const resolved = resolveTelegramFetch();
|
||||
expect(resolved).toBeTypeOf("function");
|
||||
});
|
||||
|
||||
it("prefers proxy fetch when provided", () => {
|
||||
it("prefers proxy fetch when provided", async () => {
|
||||
const fetchMock = vi.fn(async () => ({}));
|
||||
const { resolveTelegramFetch } = await loadModule();
|
||||
const resolved = resolveTelegramFetch(fetchMock as unknown as typeof fetch);
|
||||
expect(resolved).toBeTypeOf("function");
|
||||
});
|
||||
|
||||
it("honors env enable override", async () => {
|
||||
vi.stubEnv("CLAWDBOT_TELEGRAM_ENABLE_AUTO_SELECT_FAMILY", "1");
|
||||
globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch;
|
||||
const { resolveTelegramFetch, setDefaultAutoSelectFamily } = await loadModule();
|
||||
resolveTelegramFetch();
|
||||
expect(setDefaultAutoSelectFamily).toHaveBeenCalledWith(true);
|
||||
});
|
||||
|
||||
it("uses config override when provided", async () => {
|
||||
globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch;
|
||||
const { resolveTelegramFetch, setDefaultAutoSelectFamily } = await loadModule();
|
||||
resolveTelegramFetch(undefined, { network: { autoSelectFamily: true } });
|
||||
expect(setDefaultAutoSelectFamily).toHaveBeenCalledWith(true);
|
||||
});
|
||||
|
||||
it("env disable override wins over config", async () => {
|
||||
vi.stubEnv("CLAWDBOT_TELEGRAM_DISABLE_AUTO_SELECT_FAMILY", "1");
|
||||
globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch;
|
||||
const { resolveTelegramFetch, setDefaultAutoSelectFamily } = await loadModule();
|
||||
resolveTelegramFetch(undefined, { network: { autoSelectFamily: true } });
|
||||
expect(setDefaultAutoSelectFamily).toHaveBeenCalledWith(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,19 +1,36 @@
|
||||
import { setDefaultAutoSelectFamily } from "net";
|
||||
import * as net from "node:net";
|
||||
import { resolveFetch } from "../infra/fetch.js";
|
||||
import type { TelegramNetworkConfig } from "../config/types.telegram.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { resolveTelegramAutoSelectFamilyDecision } from "./network-config.js";
|
||||
|
||||
// Workaround for Node.js 22 "Happy Eyeballs" (autoSelectFamily) bug
|
||||
// that causes intermittent ETIMEDOUT errors when connecting to Telegram's
|
||||
// dual-stack servers. Disabling autoSelectFamily forces sequential IPv4/IPv6
|
||||
// attempts which works reliably.
|
||||
let appliedAutoSelectFamily: boolean | null = null;
|
||||
const log = createSubsystemLogger("telegram/network");
|
||||
|
||||
// Node 22 workaround: disable autoSelectFamily to avoid Happy Eyeballs timeouts.
|
||||
// See: https://github.com/nodejs/node/issues/54359
|
||||
try {
|
||||
setDefaultAutoSelectFamily(false);
|
||||
} catch {
|
||||
// Ignore if not available (older Node versions)
|
||||
/**
 * Applies the resolved `autoSelectFamily` decision through
 * `net.setDefaultAutoSelectFamily`, at most once per distinct value.
 *
 * Nothing happens when the resolver yields no decision (`null`) or when the
 * value equals the one recorded by a previous call.
 *
 * @param network Optional Telegram network overrides forwarded to the resolver.
 */
function applyTelegramNetworkWorkarounds(network?: TelegramNetworkConfig): void {
  const { value, source } = resolveTelegramAutoSelectFamilyDecision({ network });
  if (value === null || value === appliedAutoSelectFamily) return;

  // Recorded before the attempt: an unsupported or failing runtime is not
  // retried for the same value on later calls.
  appliedAutoSelectFamily = value;

  if (typeof net.setDefaultAutoSelectFamily !== "function") return;

  try {
    net.setDefaultAutoSelectFamily(value);
    const suffix = source ? ` (${source})` : "";
    log.info(`telegram: autoSelectFamily=${value}${suffix}`);
  } catch {
    // ignore if unsupported by the runtime
  }
}
|
||||
|
||||
// Prefer wrapped fetch when available to normalize AbortSignal across runtimes.
|
||||
export function resolveTelegramFetch(proxyFetch?: typeof fetch): typeof fetch | undefined {
|
||||
export function resolveTelegramFetch(
|
||||
proxyFetch?: typeof fetch,
|
||||
options?: { network?: TelegramNetworkConfig },
|
||||
): typeof fetch | undefined {
|
||||
applyTelegramNetworkWorkarounds(options?.network);
|
||||
if (proxyFetch) return resolveFetch(proxyFetch);
|
||||
const fetchImpl = resolveFetch();
|
||||
if (!fetchImpl) {
|
||||
|
||||
@@ -35,6 +35,11 @@ const { initSpy, runSpy, loadConfig } = vi.hoisted(() => ({
|
||||
})),
|
||||
}));
|
||||
|
||||
const { computeBackoff, sleepWithAbort } = vi.hoisted(() => ({
|
||||
computeBackoff: vi.fn(() => 0),
|
||||
sleepWithAbort: vi.fn(async () => undefined),
|
||||
}));
|
||||
|
||||
vi.mock("../config/config.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("../config/config.js")>();
|
||||
return {
|
||||
@@ -70,6 +75,11 @@ vi.mock("@grammyjs/runner", () => ({
|
||||
run: runSpy,
|
||||
}));
|
||||
|
||||
vi.mock("../infra/backoff.js", () => ({
|
||||
computeBackoff,
|
||||
sleepWithAbort,
|
||||
}));
|
||||
|
||||
vi.mock("../auto-reply/reply.js", () => ({
|
||||
getReplyFromConfig: async (ctx: { Body?: string }) => ({
|
||||
text: `echo:${ctx.Body}`,
|
||||
@@ -84,6 +94,8 @@ describe("monitorTelegramProvider (grammY)", () => {
|
||||
});
|
||||
initSpy.mockClear();
|
||||
runSpy.mockClear();
|
||||
computeBackoff.mockClear();
|
||||
sleepWithAbort.mockClear();
|
||||
});
|
||||
|
||||
it("processes a DM and sends reply", async () => {
|
||||
@@ -119,7 +131,11 @@ describe("monitorTelegramProvider (grammY)", () => {
|
||||
expect.anything(),
|
||||
expect.objectContaining({
|
||||
sink: { concurrency: 3 },
|
||||
runner: expect.objectContaining({ silent: true }),
|
||||
runner: expect.objectContaining({
|
||||
silent: true,
|
||||
maxRetryTime: 5 * 60 * 1000,
|
||||
retryInterval: "exponential",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
@@ -140,4 +156,32 @@ describe("monitorTelegramProvider (grammY)", () => {
|
||||
});
|
||||
expect(api.sendMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("retries on recoverable network errors", async () => {
|
||||
const networkError = Object.assign(new Error("timeout"), { code: "ETIMEDOUT" });
|
||||
runSpy
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.reject(networkError),
|
||||
stop: vi.fn(),
|
||||
}))
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.resolve(),
|
||||
stop: vi.fn(),
|
||||
}));
|
||||
|
||||
await monitorTelegramProvider({ token: "tok" });
|
||||
|
||||
expect(computeBackoff).toHaveBeenCalled();
|
||||
expect(sleepWithAbort).toHaveBeenCalled();
|
||||
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("surfaces non-recoverable errors", async () => {
|
||||
runSpy.mockImplementationOnce(() => ({
|
||||
task: () => Promise.reject(new Error("bad token")),
|
||||
stop: vi.fn(),
|
||||
}));
|
||||
|
||||
await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow("bad token");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3,11 +3,13 @@ import type { ClawdbotConfig } from "../config/config.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { resolveAgentMaxConcurrent } from "../config/agent-limits.js";
|
||||
import { computeBackoff, sleepWithAbort } from "../infra/backoff.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { formatDurationMs } from "../infra/format-duration.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import { resolveTelegramAccount } from "./accounts.js";
|
||||
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
|
||||
import { createTelegramBot } from "./bot.js";
|
||||
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
|
||||
import { makeProxyFetch } from "./proxy.js";
|
||||
import { readTelegramUpdateOffset, writeTelegramUpdateOffset } from "./update-offset-store.js";
|
||||
import { startTelegramWebhook } from "./webhook.js";
|
||||
@@ -40,9 +42,8 @@ export function createTelegramRunnerOptions(cfg: ClawdbotConfig): RunOptions<unk
|
||||
},
|
||||
// Suppress grammY getUpdates stack traces; we log concise errors ourselves.
|
||||
silent: true,
|
||||
// Retry failed getUpdates calls for up to 5 minutes before giving up
|
||||
// Retry transient failures for a limited window before surfacing errors.
|
||||
maxRetryTime: 5 * 60 * 1000,
|
||||
// Use exponential backoff for retries
|
||||
retryInterval: "exponential",
|
||||
},
|
||||
};
|
||||
@@ -73,23 +74,6 @@ const isGetUpdatesConflict = (err: unknown) => {
|
||||
return haystack.includes("getupdates");
|
||||
};
|
||||
|
||||
const isRecoverableNetworkError = (err: unknown): boolean => {
|
||||
if (!err) return false;
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
const lowerMessage = message.toLowerCase();
|
||||
// Recoverable network errors that should trigger retry, not crash
|
||||
return (
|
||||
lowerMessage.includes("fetch failed") ||
|
||||
lowerMessage.includes("network request") ||
|
||||
lowerMessage.includes("econnrefused") ||
|
||||
lowerMessage.includes("econnreset") ||
|
||||
lowerMessage.includes("etimedout") ||
|
||||
lowerMessage.includes("socket hang up") ||
|
||||
lowerMessage.includes("enotfound") ||
|
||||
lowerMessage.includes("abort")
|
||||
);
|
||||
};
|
||||
|
||||
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
const cfg = opts.config ?? loadConfig();
|
||||
const account = resolveTelegramAccount({
|
||||
@@ -154,7 +138,6 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
}
|
||||
|
||||
// Use grammyjs/runner for concurrent update processing
|
||||
const log = opts.runtime?.log ?? console.log;
|
||||
let restartAttempts = 0;
|
||||
|
||||
while (!opts.abortSignal?.aborted) {
|
||||
@@ -174,14 +157,14 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
throw err;
|
||||
}
|
||||
const isConflict = isGetUpdatesConflict(err);
|
||||
const isNetworkError = isRecoverableNetworkError(err);
|
||||
if (!isConflict && !isNetworkError) {
|
||||
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
|
||||
if (!isConflict && !isRecoverable) {
|
||||
throw err;
|
||||
}
|
||||
restartAttempts += 1;
|
||||
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||
const reason = isConflict ? "getUpdates conflict" : "network error";
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
const errMsg = formatErrorMessage(err);
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`,
|
||||
);
|
||||
|
||||
48
src/telegram/network-config.test.ts
Normal file
48
src/telegram/network-config.test.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { resolveTelegramAutoSelectFamilyDecision } from "./network-config.js";
|
||||
|
||||
describe("resolveTelegramAutoSelectFamilyDecision", () => {
  // Both env switches set: the enable switch is expected to win.
  it("prefers env enable over env disable", () => {
    const env = {
      CLAWDBOT_TELEGRAM_ENABLE_AUTO_SELECT_FAMILY: "1",
      CLAWDBOT_TELEGRAM_DISABLE_AUTO_SELECT_FAMILY: "1",
    };
    expect(resolveTelegramAutoSelectFamilyDecision({ env, nodeMajor: 22 })).toEqual({
      value: true,
      source: "env:CLAWDBOT_TELEGRAM_ENABLE_AUTO_SELECT_FAMILY",
    });
  });

  it("uses env disable when set", () => {
    const env = { CLAWDBOT_TELEGRAM_DISABLE_AUTO_SELECT_FAMILY: "1" };
    expect(resolveTelegramAutoSelectFamilyDecision({ env, nodeMajor: 22 })).toEqual({
      value: false,
      source: "env:CLAWDBOT_TELEGRAM_DISABLE_AUTO_SELECT_FAMILY",
    });
  });

  it("uses config override when provided", () => {
    const result = resolveTelegramAutoSelectFamilyDecision({
      network: { autoSelectFamily: true },
      nodeMajor: 22,
    });
    expect(result).toEqual({ value: true, source: "config" });
  });

  it("defaults to disable on Node 22", () => {
    expect(resolveTelegramAutoSelectFamilyDecision({ nodeMajor: 22 })).toEqual({
      value: false,
      source: "default-node22",
    });
  });

  it("returns null when no decision applies", () => {
    expect(resolveTelegramAutoSelectFamilyDecision({ nodeMajor: 20 })).toEqual({ value: null });
  });
});
|
||||
39
src/telegram/network-config.ts
Normal file
39
src/telegram/network-config.ts
Normal file
@@ -0,0 +1,39 @@
|
||||
import process from "node:process";
|
||||
|
||||
import { isTruthyEnvValue } from "../infra/env.js";
|
||||
import type { TelegramNetworkConfig } from "../config/types.telegram.js";
|
||||
|
||||
/** Name of the environment variable that forces autoSelectFamily off for Telegram. */
export const TELEGRAM_DISABLE_AUTO_SELECT_FAMILY_ENV = "CLAWDBOT_TELEGRAM_DISABLE_AUTO_SELECT_FAMILY";

/** Name of the environment variable that forces autoSelectFamily on for Telegram. */
export const TELEGRAM_ENABLE_AUTO_SELECT_FAMILY_ENV = "CLAWDBOT_TELEGRAM_ENABLE_AUTO_SELECT_FAMILY";
|
||||
|
||||
/** Outcome of resolving the Telegram autoSelectFamily setting. */
export type TelegramAutoSelectFamilyDecision = {
  /** The value to apply, or `null` when no decision applies. */
  value: null | boolean;
  /** Label describing where the decision came from; absent when `value` is `null`. */
  source?: string;
};
|
||||
|
||||
/**
 * Decides which `autoSelectFamily` value, if any, should be applied for Telegram.
 *
 * Precedence, highest first:
 * 1. the enable env var is truthy  -> `true`
 * 2. the disable env var is truthy -> `false`
 * 3. a boolean `network.autoSelectFamily` from config
 * 4. Node major version >= 22      -> `false`
 *
 * @param params.network   Telegram network overrides from config.
 * @param params.env       Environment to inspect; defaults to `process.env`.
 * @param params.nodeMajor Node major version; defaults to the running runtime's.
 * @returns The decision with a `source` label, or `{ value: null }` when none of the rules match.
 */
export function resolveTelegramAutoSelectFamilyDecision(params?: {
  network?: TelegramNetworkConfig;
  env?: NodeJS.ProcessEnv;
  nodeMajor?: number;
}): TelegramAutoSelectFamilyDecision {
  const environment = params?.env ?? process.env;
  const major =
    typeof params?.nodeMajor === "number"
      ? params.nodeMajor
      : Number(process.versions.node.split(".")[0]);

  const enableRequested = isTruthyEnvValue(environment[TELEGRAM_ENABLE_AUTO_SELECT_FAMILY_ENV]);
  if (enableRequested) {
    return { value: true, source: `env:${TELEGRAM_ENABLE_AUTO_SELECT_FAMILY_ENV}` };
  }

  const disableRequested = isTruthyEnvValue(environment[TELEGRAM_DISABLE_AUTO_SELECT_FAMILY_ENV]);
  if (disableRequested) {
    return { value: false, source: `env:${TELEGRAM_DISABLE_AUTO_SELECT_FAMILY_ENV}` };
  }

  const configured = params?.network?.autoSelectFamily;
  if (typeof configured === "boolean") {
    return { value: configured, source: "config" };
  }

  const isNode22OrNewer = Number.isFinite(major) && major >= 22;
  return isNode22OrNewer ? { value: false, source: "default-node22" } : { value: null };
}
|
||||
31
src/telegram/network-errors.test.ts
Normal file
31
src/telegram/network-errors.test.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
|
||||
|
||||
describe("isRecoverableTelegramNetworkError", () => {
  it("detects recoverable error codes", () => {
    const timedOut = Object.assign(new Error("timeout"), { code: "ETIMEDOUT" });
    expect(isRecoverableTelegramNetworkError(timedOut)).toBe(true);
  });

  it("detects AbortError names", () => {
    const aborted = Object.assign(new Error("The operation was aborted"), { name: "AbortError" });
    expect(isRecoverableTelegramNetworkError(aborted)).toBe(true);
  });

  it("detects nested causes", () => {
    // The recoverable code lives on the cause, not on the outer error.
    const root = Object.assign(new Error("socket hang up"), { code: "ECONNRESET" });
    const wrapped = Object.assign(new TypeError("fetch failed"), { cause: root });
    expect(isRecoverableTelegramNetworkError(wrapped)).toBe(true);
  });

  it("skips message matches for send context", () => {
    // No code and no recoverable name: only the message text could match.
    const messageOnly = new TypeError("fetch failed");
    const inSend = isRecoverableTelegramNetworkError(messageOnly, { context: "send" });
    const inPolling = isRecoverableTelegramNetworkError(messageOnly, { context: "polling" });
    expect(inSend).toBe(false);
    expect(inPolling).toBe(true);
  });

  it("returns false for unrelated errors", () => {
    const unrelated = new Error("invalid token");
    expect(isRecoverableTelegramNetworkError(unrelated)).toBe(false);
  });
});
|
||||
112
src/telegram/network-errors.ts
Normal file
112
src/telegram/network-errors.ts
Normal file
@@ -0,0 +1,112 @@
|
||||
import { extractErrorCode, formatErrorMessage } from "../infra/errors.js";
|
||||
|
||||
/** Upper-case error codes treated as recoverable network failures. */
const RECOVERABLE_ERROR_CODES = new Set([
  // connection-level failures
  "ECONNRESET", "ECONNREFUSED", "EPIPE",
  // timeouts
  "ETIMEDOUT", "ESOCKETTIMEDOUT",
  // routing / name resolution
  "ENETUNREACH", "EHOSTUNREACH", "ENOTFOUND", "EAI_AGAIN",
  // UND_ERR_* codes
  "UND_ERR_CONNECT_TIMEOUT", "UND_ERR_HEADERS_TIMEOUT", "UND_ERR_BODY_TIMEOUT",
  "UND_ERR_SOCKET", "UND_ERR_ABORTED",
]);

/** Error `name` values treated as recoverable (matched exactly, case-sensitive). */
const RECOVERABLE_ERROR_NAMES = new Set([
  "AbortError", "TimeoutError",
  "ConnectTimeoutError", "HeadersTimeoutError", "BodyTimeoutError",
]);

/** Lower-case substrings searched for in the lower-cased error message. */
const RECOVERABLE_MESSAGE_SNIPPETS = [
  "fetch failed", "network error", "network request",
  "client network socket disconnected", "socket hang up", "getaddrinfo",
];
|
||||
|
||||
/**
 * Canonicalizes an error code for lookup: trims whitespace and upper-cases it.
 *
 * @param code Raw code, possibly missing.
 * @returns The normalized code, or an empty string when no code was given.
 */
function normalizeCode(code?: string): string {
  if (code == null) return "";
  const trimmed = code.trim();
  return trimmed.toUpperCase();
}
|
||||
|
||||
/**
 * Reads the `name` property of an error-like value as a string.
 *
 * @param err Any thrown value.
 * @returns The stringified `name`, or an empty string when `err` is not a
 *          non-null object or has no `name` property.
 */
function getErrorName(err: unknown): string {
  if (typeof err !== "object" || err === null) return "";
  if (!("name" in err)) return "";
  return String((err as { name?: unknown }).name);
}
|
||||
|
||||
/**
 * Extracts an error code from a thrown value.
 *
 * Prefers whatever `extractErrorCode` reports; otherwise falls back to the
 * value's `errno` property (strings as-is, numbers stringified).
 *
 * @param err Any thrown value.
 * @returns The code, or `undefined` when none can be found.
 */
function getErrorCode(err: unknown): string | undefined {
  const primary = extractErrorCode(err);
  if (primary) return primary;

  if (typeof err !== "object" || err === null) return undefined;

  const errno = (err as { errno?: unknown }).errno;
  switch (typeof errno) {
    case "string":
      return errno;
    case "number":
      return String(errno);
    default:
      return undefined;
  }
}
|
||||
|
||||
/**
 * Flattens an error and everything nested inside it into a list, breadth-first.
 *
 * Nested values are discovered through `cause`, `reason`, and the entries of
 * an `errors` array. Each distinct value appears once, so cyclic chains
 * terminate. `null`/`undefined` and other falsy nested values are skipped.
 *
 * @param err The root thrown value.
 * @returns The root followed by its nested values, in discovery order.
 */
function collectErrorCandidates(err: unknown): unknown[] {
  const visited = new Set<unknown>();
  const pending: unknown[] = [err];
  const found: unknown[] = [];

  // Schedule a nested value unless it is falsy or has already been visited.
  const enqueue = (value: unknown): void => {
    if (value && !visited.has(value)) pending.push(value);
  };

  // Walk `pending` with a cursor; entries appended during the walk are reached later.
  for (let cursor = 0; cursor < pending.length; cursor += 1) {
    const node = pending[cursor];
    if (node == null || visited.has(node)) continue;

    visited.add(node);
    found.push(node);

    if (typeof node !== "object") continue;

    enqueue((node as { cause?: unknown }).cause);
    enqueue((node as { reason?: unknown }).reason);

    const nestedErrors = (node as { errors?: unknown }).errors;
    if (Array.isArray(nestedErrors)) {
      for (const entry of nestedErrors) enqueue(entry);
    }
  }

  return found;
}
|
||||
|
||||
/** Where the failing Telegram call originated. */
export type TelegramNetworkErrorContext = "polling" | "send" | "webhook" | "unknown";

/**
 * Reports whether a thrown value looks like a recoverable network failure.
 *
 * The error and everything nested in it (see `collectErrorCandidates`) is
 * checked, per candidate, in this order: normalized error code, error name,
 * then — only when message matching is allowed — known message substrings.
 *
 * @param err Any thrown value; falsy values are never recoverable.
 * @param options.context Origin of the call. With `"send"`, message matching
 *        is off unless `allowMessageMatch` says otherwise.
 * @param options.allowMessageMatch Explicit switch for message matching; when
 *        it is a boolean it takes precedence over `context`.
 * @returns `true` as soon as any candidate matches, otherwise `false`.
 */
export function isRecoverableTelegramNetworkError(
  err: unknown,
  options: { context?: TelegramNetworkErrorContext; allowMessageMatch?: boolean } = {},
): boolean {
  if (!err) return false;

  const matchMessages =
    typeof options.allowMessageMatch === "boolean"
      ? options.allowMessageMatch
      : options.context !== "send";

  const hasRecoverableCode = (candidate: unknown): boolean => {
    const code = normalizeCode(getErrorCode(candidate));
    return code !== "" && RECOVERABLE_ERROR_CODES.has(code);
  };

  const hasRecoverableName = (candidate: unknown): boolean => {
    const name = getErrorName(candidate);
    return name !== "" && RECOVERABLE_ERROR_NAMES.has(name);
  };

  const hasRecoverableMessage = (candidate: unknown): boolean => {
    const message = formatErrorMessage(candidate).toLowerCase();
    return message !== "" && RECOVERABLE_MESSAGE_SNIPPETS.some((snippet) => message.includes(snippet));
  };

  // `some` stops at the first matching candidate; `||` / `&&` keep the
  // code -> name -> message order and skip message formatting when it is off.
  return collectErrorCandidates(err).some(
    (candidate) =>
      hasRecoverableCode(candidate) ||
      hasRecoverableName(candidate) ||
      (matchMessages && hasRecoverableMessage(candidate)),
  );
}
|
||||
@@ -19,6 +19,7 @@ vi.mock("../web/media.js", () => ({
|
||||
vi.mock("grammy", () => ({
|
||||
Bot: class {
|
||||
api = botApi;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: {
|
||||
|
||||
@@ -19,6 +19,7 @@ vi.mock("../web/media.js", () => ({
|
||||
vi.mock("grammy", () => ({
|
||||
Bot: class {
|
||||
api = botApi;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: { client?: { fetch?: typeof fetch } },
|
||||
|
||||
@@ -40,6 +40,7 @@ vi.mock("./fetch.js", () => ({
|
||||
vi.mock("grammy", () => ({
|
||||
Bot: class {
|
||||
api = botApi;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: { client?: { fetch?: typeof fetch; timeoutSeconds?: number } },
|
||||
@@ -76,7 +77,7 @@ describe("telegram proxy client", () => {
|
||||
await sendMessageTelegram("123", "hi", { token: "tok", accountId: "foo" });
|
||||
|
||||
expect(makeProxyFetch).toHaveBeenCalledWith(proxyUrl);
|
||||
expect(resolveTelegramFetch).toHaveBeenCalledWith(proxyFetch);
|
||||
expect(resolveTelegramFetch).toHaveBeenCalledWith(proxyFetch, { network: undefined });
|
||||
expect(botCtorSpy).toHaveBeenCalledWith(
|
||||
"tok",
|
||||
expect.objectContaining({
|
||||
@@ -94,7 +95,7 @@ describe("telegram proxy client", () => {
|
||||
await reactMessageTelegram("123", "456", "✅", { token: "tok", accountId: "foo" });
|
||||
|
||||
expect(makeProxyFetch).toHaveBeenCalledWith(proxyUrl);
|
||||
expect(resolveTelegramFetch).toHaveBeenCalledWith(proxyFetch);
|
||||
expect(resolveTelegramFetch).toHaveBeenCalledWith(proxyFetch, { network: undefined });
|
||||
expect(botCtorSpy).toHaveBeenCalledWith(
|
||||
"tok",
|
||||
expect.objectContaining({
|
||||
@@ -112,7 +113,7 @@ describe("telegram proxy client", () => {
|
||||
await deleteMessageTelegram("123", "456", { token: "tok", accountId: "foo" });
|
||||
|
||||
expect(makeProxyFetch).toHaveBeenCalledWith(proxyUrl);
|
||||
expect(resolveTelegramFetch).toHaveBeenCalledWith(proxyFetch);
|
||||
expect(resolveTelegramFetch).toHaveBeenCalledWith(proxyFetch, { network: undefined });
|
||||
expect(botCtorSpy).toHaveBeenCalledWith(
|
||||
"tok",
|
||||
expect.objectContaining({
|
||||
|
||||
@@ -19,6 +19,7 @@ vi.mock("../web/media.js", () => ({
|
||||
vi.mock("grammy", () => ({
|
||||
Bot: class {
|
||||
api = botApi;
|
||||
catch = vi.fn();
|
||||
constructor(
|
||||
public token: string,
|
||||
public options?: {
|
||||
|
||||
@@ -22,6 +22,7 @@ import { resolveTelegramFetch } from "./fetch.js";
|
||||
import { makeProxyFetch } from "./proxy.js";
|
||||
import { renderTelegramHtmlText } from "./format.js";
|
||||
import { resolveMarkdownTableMode } from "../config/markdown-tables.js";
|
||||
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
|
||||
import { splitTelegramCaption } from "./caption.js";
|
||||
import { recordSentMessage } from "./sent-message-cache.js";
|
||||
import { parseTelegramTarget, stripTelegramInternalPrefixes } from "./targets.js";
|
||||
@@ -84,7 +85,9 @@ function resolveTelegramClientOptions(
|
||||
): ApiClientOptions | undefined {
|
||||
const proxyUrl = account.config.proxy?.trim();
|
||||
const proxyFetch = proxyUrl ? makeProxyFetch(proxyUrl) : undefined;
|
||||
const fetchImpl = resolveTelegramFetch(proxyFetch);
|
||||
const fetchImpl = resolveTelegramFetch(proxyFetch, {
|
||||
network: account.config.network,
|
||||
});
|
||||
const timeoutSeconds =
|
||||
typeof account.config.timeoutSeconds === "number" &&
|
||||
Number.isFinite(account.config.timeoutSeconds)
|
||||
@@ -203,6 +206,7 @@ export async function sendMessageTelegram(
|
||||
retry: opts.retry,
|
||||
configRetry: account.config.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
|
||||
});
|
||||
const logHttpError = createTelegramHttpLogger(cfg);
|
||||
const requestWithDiag = <T>(fn: () => Promise<T>, label?: string) =>
|
||||
@@ -434,6 +438,7 @@ export async function reactMessageTelegram(
|
||||
retry: opts.retry,
|
||||
configRetry: account.config.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
|
||||
});
|
||||
const logHttpError = createTelegramHttpLogger(cfg);
|
||||
const requestWithDiag = <T>(fn: () => Promise<T>, label?: string) =>
|
||||
@@ -483,6 +488,7 @@ export async function deleteMessageTelegram(
|
||||
retry: opts.retry,
|
||||
configRetry: account.config.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
|
||||
});
|
||||
const logHttpError = createTelegramHttpLogger(cfg);
|
||||
const requestWithDiag = <T>(fn: () => Promise<T>, label?: string) =>
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { type ApiClientOptions, Bot } from "grammy";
|
||||
import type { TelegramNetworkConfig } from "../config/types.telegram.js";
|
||||
import { resolveTelegramFetch } from "./fetch.js";
|
||||
|
||||
export async function setTelegramWebhook(opts: {
|
||||
@@ -6,8 +7,9 @@ export async function setTelegramWebhook(opts: {
|
||||
url: string;
|
||||
secret?: string;
|
||||
dropPendingUpdates?: boolean;
|
||||
network?: TelegramNetworkConfig;
|
||||
}) {
|
||||
const fetchImpl = resolveTelegramFetch();
|
||||
const fetchImpl = resolveTelegramFetch(undefined, { network: opts.network });
|
||||
const client: ApiClientOptions | undefined = fetchImpl
|
||||
? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] }
|
||||
: undefined;
|
||||
@@ -18,8 +20,11 @@ export async function setTelegramWebhook(opts: {
|
||||
});
|
||||
}
|
||||
|
||||
export async function deleteTelegramWebhook(opts: { token: string }) {
|
||||
const fetchImpl = resolveTelegramFetch();
|
||||
export async function deleteTelegramWebhook(opts: {
|
||||
token: string;
|
||||
network?: TelegramNetworkConfig;
|
||||
}) {
|
||||
const fetchImpl = resolveTelegramFetch(undefined, { network: opts.network });
|
||||
const client: ApiClientOptions | undefined = fetchImpl
|
||||
? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] }
|
||||
: undefined;
|
||||
|
||||
Reference in New Issue
Block a user