From c08441c42cc36894e8e1b297815557b9714c766f Mon Sep 17 00:00:00 2001
From: Shadow
Date: Mon, 12 Jan 2026 21:52:13 -0600
Subject: [PATCH] Telegram: persist polling update offsets

Closes #739
---
Review notes (this section is not part of the commit):
* bot.ts: shouldSkipUpdate now compares against the offset loaded at
  startup instead of the advancing in-memory id, so an update that is still
  queued is not dropped after a higher-id update has been recorded.
* update-offset-store.ts: validate persisted and incoming ids as safe
  integers, create the temp file with mode 0600, remove the temp file when
  the write or rename fails, drop the dead ENOENT branch.
* Context lines are reproduced as received. `new Map()` and
  `Promise = Promise.resolve()` look like they lost their type arguments in
  transit - TODO confirm against the tree before applying.
* Blob ids on the index lines of bot.ts and update-offset-store.ts predate
  these review changes.

 CHANGELOG.md                             |   1 +
 src/telegram/bot.ts                      |  38 ++++++-
 src/telegram/monitor.ts                  |  26 +++++
 src/telegram/update-offset-store.test.ts |  42 ++++++++
 src/telegram/update-offset-store.ts      | 127 +++++++++++++++++++++++
 5 files changed, 233 insertions(+), 1 deletion(-)
 create mode 100644 src/telegram/update-offset-store.test.ts
 create mode 100644 src/telegram/update-offset-store.ts

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bc73ced89..336792653 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@
 - Memory: allow custom OpenAI-compatible embedding endpoints for memory search (remote baseUrl/apiKey/headers). (#819 — thanks @mukhtharcm)
 
 ### Fixes
+- Telegram: persist polling update offsets across restarts to avoid duplicate updates. (#739 — thanks @thewilloftheshadow)
 - Discord: avoid duplicate message/reaction listeners on monitor reloads. (#744 — thanks @thewilloftheshadow)
 - System events: include local timestamps when events are injected into prompts. (#245 — thanks @thewilloftheshadow)
 - Cron: accept `jobId` aliases for cron update/run/remove params in gateway validation. (#252 — thanks @thewilloftheshadow)
diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts
index ea2f09ddf..c5135c8f6 100644
--- a/src/telegram/bot.ts
+++ b/src/telegram/bot.ts
@@ -109,8 +109,11 @@ type TelegramUpdateKeyContext = {
   callbackQuery?: { id?: string; message?: TelegramMessage };
 };
 
+const resolveTelegramUpdateId = (ctx: TelegramUpdateKeyContext) =>
+  ctx.update?.update_id ?? ctx.update_id;
+
 const buildTelegramUpdateKey = (ctx: TelegramUpdateKeyContext) => {
-  const updateId = ctx.update?.update_id ?? ctx.update_id;
+  const updateId = resolveTelegramUpdateId(ctx);
   if (typeof updateId === "number") return `update:${updateId}`;
   const callbackId = ctx.callbackQuery?.id;
   if (callbackId) return `callback:${callbackId}`;
@@ -172,6 +175,10 @@ export type TelegramBotOptions = {
   replyToMode?: ReplyToMode;
   proxyFetch?: typeof fetch;
   config?: ClawdbotConfig;
+  updateOffset?: {
+    lastUpdateId?: number | null;
+    onUpdateId?: (updateId: number) => void | Promise<void>;
+  };
 };
 
 export function getTelegramSequentialKey(ctx: {
@@ -220,7 +227,31 @@ export function createTelegramBot(opts: TelegramBotOptions) {
   bot.use(sequentialize(getTelegramSequentialKey));
 
   const recentUpdates = createTelegramUpdateDedupe();
+  // Offset persisted by a previous run; never advanced during this session.
+  const initialUpdateId =
+    typeof opts.updateOffset?.lastUpdateId === "number"
+      ? opts.updateOffset.lastUpdateId
+      : null;
+  // Highest update id recorded so far; drives persistence only.
+  let lastUpdateId = initialUpdateId;
+
+  const recordUpdateId = (ctx: TelegramUpdateKeyContext) => {
+    const updateId = resolveTelegramUpdateId(ctx);
+    if (typeof updateId !== "number") return;
+    if (lastUpdateId !== null && updateId <= lastUpdateId) return;
+    lastUpdateId = updateId;
+    void opts.updateOffset?.onUpdateId?.(updateId);
+  };
+
   const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => {
+    // Only updates at or below the startup offset were handled by a previous
+    // run. Updates with different sequential keys may finish out of order, so
+    // comparing against the advancing in-memory id could drop a lower-id
+    // update that is still in flight; in-session repeats hit the dedupe below.
+    const updateId = resolveTelegramUpdateId(ctx);
+    if (typeof updateId === "number" && initialUpdateId !== null) {
+      if (updateId <= initialUpdateId) return true;
+    }
     const key = buildTelegramUpdateKey(ctx);
     const skipped = recentUpdates.check(key);
     if (skipped && key && shouldLogVerbose()) {
@@ -229,6 +260,11 @@ export function createTelegramBot(opts: TelegramBotOptions) {
     return skipped;
   };
 
+  bot.use(async (ctx, next) => {
+    await next();
+    recordUpdateId(ctx);
+  });
+
   const mediaGroupBuffer = new Map();
   let mediaGroupProcessing: Promise = Promise.resolve();
 
diff --git a/src/telegram/monitor.ts b/src/telegram/monitor.ts
index 7ed3f2faf..908c8de39 100644
--- a/src/telegram/monitor.ts
+++ b/src/telegram/monitor.ts
@@ -6,6 +6,10 @@ import { formatDurationMs } from "../infra/format-duration.js";
 import type { RuntimeEnv } from "../runtime.js";
 import { resolveTelegramAccount } from "./accounts.js";
 import { createTelegramBot } from "./bot.js";
+import {
+  readTelegramUpdateOffset,
+  writeTelegramUpdateOffset,
+} from "./update-offset-store.js";
 import { makeProxyFetch } from "./proxy.js";
 import { startTelegramWebhook } from "./webhook.js";
 
@@ -85,12 +89,34 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
       ? makeProxyFetch(account.config.proxy as string)
       : undefined);
 
+  let lastUpdateId = await readTelegramUpdateOffset({
+    accountId: account.accountId,
+  });
+  const persistUpdateId = async (updateId: number) => {
+    if (lastUpdateId !== null && updateId <= lastUpdateId) return;
+    lastUpdateId = updateId;
+    try {
+      await writeTelegramUpdateOffset({
+        accountId: account.accountId,
+        updateId,
+      });
+    } catch (err) {
+      (opts.runtime?.error ?? console.error)(
+        `telegram: failed to persist update offset: ${String(err)}`,
+      );
+    }
+  };
+
   const bot = createTelegramBot({
     token,
     runtime: opts.runtime,
     proxyFetch,
     config: cfg,
     accountId: account.accountId,
+    updateOffset: {
+      lastUpdateId,
+      onUpdateId: persistUpdateId,
+    },
   });
 
   if (opts.useWebhook) {
diff --git a/src/telegram/update-offset-store.test.ts b/src/telegram/update-offset-store.test.ts
new file mode 100644
index 000000000..2483c4101
--- /dev/null
+++ b/src/telegram/update-offset-store.test.ts
@@ -0,0 +1,42 @@
+import fs from "node:fs/promises";
+import os from "node:os";
+import path from "node:path";
+
+import { describe, expect, it } from "vitest";
+
+import {
+  readTelegramUpdateOffset,
+  writeTelegramUpdateOffset,
+} from "./update-offset-store.js";
+
+async function withTempStateDir<T>(fn: (dir: string) => Promise<T>) {
+  const previous = process.env.CLAWDBOT_STATE_DIR;
+  const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-telegram-"));
+  process.env.CLAWDBOT_STATE_DIR = dir;
+  try {
+    return await fn(dir);
+  } finally {
+    if (previous === undefined) delete process.env.CLAWDBOT_STATE_DIR;
+    else process.env.CLAWDBOT_STATE_DIR = previous;
+    await fs.rm(dir, { recursive: true, force: true });
+  }
+}
+
+describe("telegram update offset store", () => {
+  it("persists and reloads the last update id", async () => {
+    await withTempStateDir(async () => {
+      expect(
+        await readTelegramUpdateOffset({ accountId: "primary" }),
+      ).toBeNull();
+
+      await writeTelegramUpdateOffset({
+        accountId: "primary",
+        updateId: 421,
+      });
+
+      expect(
+        await readTelegramUpdateOffset({ accountId: "primary" }),
+      ).toBe(421);
+    });
+  });
+});
diff --git a/src/telegram/update-offset-store.ts b/src/telegram/update-offset-store.ts
new file mode 100644
index 000000000..50ce8d8d0
--- /dev/null
+++ b/src/telegram/update-offset-store.ts
@@ -0,0 +1,127 @@
+import crypto from "node:crypto";
+import fs from "node:fs/promises";
+import os from "node:os";
+import path from "node:path";
+
+import { resolveStateDir } from "../config/paths.js";
+
+/** Schema version written to disk; files with any other version are ignored. */
+const STORE_VERSION = 1;
+
+type TelegramUpdateOffsetState = {
+  version: number;
+  lastUpdateId: number | null;
+};
+
+/** True when `value` is a number that is also a safe integer. */
+function isValidUpdateId(value: unknown): value is number {
+  return typeof value === "number" && Number.isSafeInteger(value);
+}
+
+/** Maps an account id to a filename-safe token ("default" when blank). */
+function normalizeAccountId(accountId?: string) {
+  const trimmed = accountId?.trim();
+  if (!trimmed) return "default";
+  return trimmed.replace(/[^a-z0-9._-]+/gi, "_");
+}
+
+/** Resolves `<stateDir>/telegram/update-offset-<account>.json`. */
+function resolveTelegramUpdateOffsetPath(
+  accountId?: string,
+  env: NodeJS.ProcessEnv = process.env,
+): string {
+  const stateDir = resolveStateDir(env, os.homedir);
+  const normalized = normalizeAccountId(accountId);
+  return path.join(stateDir, "telegram", `update-offset-${normalized}.json`);
+}
+
+/**
+ * Parses a persisted state file. Returns null for malformed JSON, a
+ * non-object payload, an unknown version, or a `lastUpdateId` that is neither
+ * null nor a safe integer.
+ */
+function safeParseState(raw: string): TelegramUpdateOffsetState | null {
+  let parsed: unknown;
+  try {
+    parsed = JSON.parse(raw);
+  } catch {
+    return null;
+  }
+  if (typeof parsed !== "object" || parsed === null) return null;
+  const { version, lastUpdateId } = parsed as Record<string, unknown>;
+  if (version !== STORE_VERSION) return null;
+  if (lastUpdateId === null) {
+    return { version: STORE_VERSION, lastUpdateId: null };
+  }
+  if (!isValidUpdateId(lastUpdateId)) return null;
+  return { version: STORE_VERSION, lastUpdateId };
+}
+
+/**
+ * Reads the last persisted update id for an account.
+ *
+ * @returns the stored id, or null when the file is missing, unreadable, or
+ *   fails validation (read and parse errors are swallowed on purpose).
+ */
+export async function readTelegramUpdateOffset(params: {
+  accountId?: string;
+  env?: NodeJS.ProcessEnv;
+}): Promise<number | null> {
+  const filePath = resolveTelegramUpdateOffsetPath(
+    params.accountId,
+    params.env,
+  );
+  try {
+    const raw = await fs.readFile(filePath, "utf-8");
+    return safeParseState(raw)?.lastUpdateId ?? null;
+  } catch {
+    // Best-effort: ENOENT and every other read error mean "no known offset".
+    return null;
+  }
+}
+
+/**
+ * Persists `updateId` for an account. The state is written to a uniquely
+ * named temp file next to the target and then renamed over it.
+ *
+ * @throws TypeError when `updateId` is not a safe integer (NaN/Infinity would
+ *   serialize as JSON `null` and silently reset the stored offset).
+ * @throws any filesystem error raised by mkdir, writeFile, or rename.
+ */
+export async function writeTelegramUpdateOffset(params: {
+  accountId?: string;
+  updateId: number;
+  env?: NodeJS.ProcessEnv;
+}): Promise<void> {
+  if (!isValidUpdateId(params.updateId)) {
+    throw new TypeError(
+      `invalid telegram update id: ${String(params.updateId)}`,
+    );
+  }
+  const filePath = resolveTelegramUpdateOffsetPath(
+    params.accountId,
+    params.env,
+  );
+  const dir = path.dirname(filePath);
+  await fs.mkdir(dir, { recursive: true, mode: 0o700 });
+  const tmp = path.join(
+    dir,
+    `${path.basename(filePath)}.${crypto.randomUUID()}.tmp`,
+  );
+  const payload: TelegramUpdateOffsetState = {
+    version: STORE_VERSION,
+    lastUpdateId: params.updateId,
+  };
+  try {
+    // Create the temp file with its final mode so it is never more permissive.
+    await fs.writeFile(tmp, `${JSON.stringify(payload, null, 2)}\n`, {
+      encoding: "utf-8",
+      mode: 0o600,
+    });
+    await fs.rename(tmp, filePath);
+  } catch (err) {
+    // Do not leave an orphaned temp file behind when write or rename fails.
+    await fs.rm(tmp, { force: true }).catch(() => undefined);
+    throw err;
+  }
+}