fix: retry telegram poll conflicts
This commit is contained in:
@@ -6,6 +6,7 @@
|
|||||||
- Hooks: allow per-hook model overrides for webhook/Gmail runs (e.g. GPT 5 Mini).
|
- Hooks: allow per-hook model overrides for webhook/Gmail runs (e.g. GPT 5 Mini).
|
||||||
- Control UI: logs tab opens at the newest entries (bottom).
|
- Control UI: logs tab opens at the newest entries (bottom).
|
||||||
- Control UI: add Docs link, remove chat composer divider, and add New session button.
|
- Control UI: add Docs link, remove chat composer divider, and add New session button.
|
||||||
|
- Telegram: retry long-polling conflicts with backoff to avoid fatal exits.
|
||||||
|
|
||||||
## 2026.1.8
|
## 2026.1.8
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import { type RunOptions, run } from "@grammyjs/runner";
|
import { type RunOptions, run } from "@grammyjs/runner";
|
||||||
import type { ClawdbotConfig } from "../config/config.js";
|
import type { ClawdbotConfig } from "../config/config.js";
|
||||||
import { loadConfig } from "../config/config.js";
|
import { loadConfig } from "../config/config.js";
|
||||||
|
import { computeBackoff, sleepWithAbort } from "../infra/backoff.js";
|
||||||
|
import { formatDurationMs } from "../infra/format-duration.js";
|
||||||
import type { RuntimeEnv } from "../runtime.js";
|
import type { RuntimeEnv } from "../runtime.js";
|
||||||
import { resolveTelegramAccount } from "./accounts.js";
|
import { resolveTelegramAccount } from "./accounts.js";
|
||||||
import { createTelegramBot } from "./bot.js";
|
import { createTelegramBot } from "./bot.js";
|
||||||
@@ -37,6 +39,35 @@ export function createTelegramRunnerOptions(
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const TELEGRAM_POLL_RESTART_POLICY = {
|
||||||
|
initialMs: 2000,
|
||||||
|
maxMs: 30_000,
|
||||||
|
factor: 1.8,
|
||||||
|
jitter: 0.25,
|
||||||
|
};
|
||||||
|
|
||||||
|
const isGetUpdatesConflict = (err: unknown) => {
|
||||||
|
if (!err || typeof err !== "object") return false;
|
||||||
|
const typed = err as {
|
||||||
|
error_code?: number;
|
||||||
|
errorCode?: number;
|
||||||
|
description?: string;
|
||||||
|
method?: string;
|
||||||
|
message?: string;
|
||||||
|
};
|
||||||
|
const errorCode = typed.error_code ?? typed.errorCode;
|
||||||
|
if (errorCode !== 409) return false;
|
||||||
|
const haystack = [
|
||||||
|
typed.method,
|
||||||
|
typed.description,
|
||||||
|
typed.message,
|
||||||
|
]
|
||||||
|
.filter((value): value is string => typeof value === "string")
|
||||||
|
.join(" ")
|
||||||
|
.toLowerCase();
|
||||||
|
return haystack.includes("getupdates");
|
||||||
|
};
|
||||||
|
|
||||||
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||||
const cfg = opts.config ?? loadConfig();
|
const cfg = opts.config ?? loadConfig();
|
||||||
const account = resolveTelegramAccount({
|
const account = resolveTelegramAccount({
|
||||||
@@ -79,19 +110,44 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Use grammyjs/runner for concurrent update processing
|
// Use grammyjs/runner for concurrent update processing
|
||||||
const runner = run(bot, createTelegramRunnerOptions(cfg));
|
const log = opts.runtime?.log ?? console.log;
|
||||||
|
let restartAttempts = 0;
|
||||||
|
|
||||||
|
while (!opts.abortSignal?.aborted) {
|
||||||
|
const runner = run(bot, createTelegramRunnerOptions(cfg));
|
||||||
const stopOnAbort = () => {
|
const stopOnAbort = () => {
|
||||||
if (opts.abortSignal?.aborted) {
|
if (opts.abortSignal?.aborted) {
|
||||||
void runner.stop();
|
void runner.stop();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
|
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// runner.task() returns a promise that resolves when the runner stops
|
// runner.task() returns a promise that resolves when the runner stops
|
||||||
await runner.task();
|
await runner.task();
|
||||||
|
return;
|
||||||
|
} catch (err) {
|
||||||
|
if (opts.abortSignal?.aborted) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
if (!isGetUpdatesConflict(err)) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
restartAttempts += 1;
|
||||||
|
const delayMs = computeBackoff(
|
||||||
|
TELEGRAM_POLL_RESTART_POLICY,
|
||||||
|
restartAttempts,
|
||||||
|
);
|
||||||
|
log(
|
||||||
|
`Telegram getUpdates conflict; retrying in ${formatDurationMs(delayMs)}.`,
|
||||||
|
);
|
||||||
|
try {
|
||||||
|
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||||
|
} catch (sleepErr) {
|
||||||
|
if (opts.abortSignal?.aborted) return;
|
||||||
|
throw sleepErr;
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
|
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user