@@ -7,6 +7,7 @@
|
|||||||
- Memory: allow custom OpenAI-compatible embedding endpoints for memory search (remote baseUrl/apiKey/headers). (#819 — thanks @mukhtharcm)
|
- Memory: allow custom OpenAI-compatible embedding endpoints for memory search (remote baseUrl/apiKey/headers). (#819 — thanks @mukhtharcm)
|
||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
|
- Telegram: persist polling update offsets across restarts to avoid duplicate updates. (#739 — thanks @thewilloftheshadow)
|
||||||
- Discord: avoid duplicate message/reaction listeners on monitor reloads. (#744 — thanks @thewilloftheshadow)
|
- Discord: avoid duplicate message/reaction listeners on monitor reloads. (#744 — thanks @thewilloftheshadow)
|
||||||
- System events: include local timestamps when events are injected into prompts. (#245 — thanks @thewilloftheshadow)
|
- System events: include local timestamps when events are injected into prompts. (#245 — thanks @thewilloftheshadow)
|
||||||
- Cron: accept `jobId` aliases for cron update/run/remove params in gateway validation. (#252 — thanks @thewilloftheshadow)
|
- Cron: accept `jobId` aliases for cron update/run/remove params in gateway validation. (#252 — thanks @thewilloftheshadow)
|
||||||
|
|||||||
@@ -109,8 +109,11 @@ type TelegramUpdateKeyContext = {
|
|||||||
callbackQuery?: { id?: string; message?: TelegramMessage };
|
callbackQuery?: { id?: string; message?: TelegramMessage };
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const resolveTelegramUpdateId = (ctx: TelegramUpdateKeyContext) =>
|
||||||
|
ctx.update?.update_id ?? ctx.update_id;
|
||||||
|
|
||||||
const buildTelegramUpdateKey = (ctx: TelegramUpdateKeyContext) => {
|
const buildTelegramUpdateKey = (ctx: TelegramUpdateKeyContext) => {
|
||||||
const updateId = ctx.update?.update_id ?? ctx.update_id;
|
const updateId = resolveTelegramUpdateId(ctx);
|
||||||
if (typeof updateId === "number") return `update:${updateId}`;
|
if (typeof updateId === "number") return `update:${updateId}`;
|
||||||
const callbackId = ctx.callbackQuery?.id;
|
const callbackId = ctx.callbackQuery?.id;
|
||||||
if (callbackId) return `callback:${callbackId}`;
|
if (callbackId) return `callback:${callbackId}`;
|
||||||
@@ -172,6 +175,10 @@ export type TelegramBotOptions = {
|
|||||||
replyToMode?: ReplyToMode;
|
replyToMode?: ReplyToMode;
|
||||||
proxyFetch?: typeof fetch;
|
proxyFetch?: typeof fetch;
|
||||||
config?: ClawdbotConfig;
|
config?: ClawdbotConfig;
|
||||||
|
updateOffset?: {
|
||||||
|
lastUpdateId?: number | null;
|
||||||
|
onUpdateId?: (updateId: number) => void | Promise<void>;
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
export function getTelegramSequentialKey(ctx: {
|
export function getTelegramSequentialKey(ctx: {
|
||||||
@@ -220,7 +227,24 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
|||||||
bot.use(sequentialize(getTelegramSequentialKey));
|
bot.use(sequentialize(getTelegramSequentialKey));
|
||||||
|
|
||||||
const recentUpdates = createTelegramUpdateDedupe();
|
const recentUpdates = createTelegramUpdateDedupe();
|
||||||
|
let lastUpdateId =
|
||||||
|
typeof opts.updateOffset?.lastUpdateId === "number"
|
||||||
|
? opts.updateOffset.lastUpdateId
|
||||||
|
: null;
|
||||||
|
|
||||||
|
const recordUpdateId = (ctx: TelegramUpdateKeyContext) => {
|
||||||
|
const updateId = resolveTelegramUpdateId(ctx);
|
||||||
|
if (typeof updateId !== "number") return;
|
||||||
|
if (lastUpdateId !== null && updateId <= lastUpdateId) return;
|
||||||
|
lastUpdateId = updateId;
|
||||||
|
void opts.updateOffset?.onUpdateId?.(updateId);
|
||||||
|
};
|
||||||
|
|
||||||
const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => {
|
const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => {
|
||||||
|
const updateId = resolveTelegramUpdateId(ctx);
|
||||||
|
if (typeof updateId === "number" && lastUpdateId !== null) {
|
||||||
|
if (updateId <= lastUpdateId) return true;
|
||||||
|
}
|
||||||
const key = buildTelegramUpdateKey(ctx);
|
const key = buildTelegramUpdateKey(ctx);
|
||||||
const skipped = recentUpdates.check(key);
|
const skipped = recentUpdates.check(key);
|
||||||
if (skipped && key && shouldLogVerbose()) {
|
if (skipped && key && shouldLogVerbose()) {
|
||||||
@@ -229,6 +253,11 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
|||||||
return skipped;
|
return skipped;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
bot.use(async (ctx, next) => {
|
||||||
|
await next();
|
||||||
|
recordUpdateId(ctx);
|
||||||
|
});
|
||||||
|
|
||||||
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
|
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
|
||||||
let mediaGroupProcessing: Promise<void> = Promise.resolve();
|
let mediaGroupProcessing: Promise<void> = Promise.resolve();
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,10 @@ import { formatDurationMs } from "../infra/format-duration.js";
|
|||||||
import type { RuntimeEnv } from "../runtime.js";
|
import type { RuntimeEnv } from "../runtime.js";
|
||||||
import { resolveTelegramAccount } from "./accounts.js";
|
import { resolveTelegramAccount } from "./accounts.js";
|
||||||
import { createTelegramBot } from "./bot.js";
|
import { createTelegramBot } from "./bot.js";
|
||||||
|
import {
|
||||||
|
readTelegramUpdateOffset,
|
||||||
|
writeTelegramUpdateOffset,
|
||||||
|
} from "./update-offset-store.js";
|
||||||
import { makeProxyFetch } from "./proxy.js";
|
import { makeProxyFetch } from "./proxy.js";
|
||||||
import { startTelegramWebhook } from "./webhook.js";
|
import { startTelegramWebhook } from "./webhook.js";
|
||||||
|
|
||||||
@@ -85,12 +89,34 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
|||||||
? makeProxyFetch(account.config.proxy as string)
|
? makeProxyFetch(account.config.proxy as string)
|
||||||
: undefined);
|
: undefined);
|
||||||
|
|
||||||
|
let lastUpdateId = await readTelegramUpdateOffset({
|
||||||
|
accountId: account.accountId,
|
||||||
|
});
|
||||||
|
const persistUpdateId = async (updateId: number) => {
|
||||||
|
if (lastUpdateId !== null && updateId <= lastUpdateId) return;
|
||||||
|
lastUpdateId = updateId;
|
||||||
|
try {
|
||||||
|
await writeTelegramUpdateOffset({
|
||||||
|
accountId: account.accountId,
|
||||||
|
updateId,
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
(opts.runtime?.error ?? console.error)(
|
||||||
|
`telegram: failed to persist update offset: ${String(err)}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
const bot = createTelegramBot({
|
const bot = createTelegramBot({
|
||||||
token,
|
token,
|
||||||
runtime: opts.runtime,
|
runtime: opts.runtime,
|
||||||
proxyFetch,
|
proxyFetch,
|
||||||
config: cfg,
|
config: cfg,
|
||||||
accountId: account.accountId,
|
accountId: account.accountId,
|
||||||
|
updateOffset: {
|
||||||
|
lastUpdateId,
|
||||||
|
onUpdateId: persistUpdateId,
|
||||||
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
if (opts.useWebhook) {
|
if (opts.useWebhook) {
|
||||||
|
|||||||
42
src/telegram/update-offset-store.test.ts
Normal file
42
src/telegram/update-offset-store.test.ts
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
import fs from "node:fs/promises";
|
||||||
|
import os from "node:os";
|
||||||
|
import path from "node:path";
|
||||||
|
|
||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
|
||||||
|
import {
|
||||||
|
readTelegramUpdateOffset,
|
||||||
|
writeTelegramUpdateOffset,
|
||||||
|
} from "./update-offset-store.js";
|
||||||
|
|
||||||
|
async function withTempStateDir<T>(fn: (dir: string) => Promise<T>) {
|
||||||
|
const previous = process.env.CLAWDBOT_STATE_DIR;
|
||||||
|
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-telegram-"));
|
||||||
|
process.env.CLAWDBOT_STATE_DIR = dir;
|
||||||
|
try {
|
||||||
|
return await fn(dir);
|
||||||
|
} finally {
|
||||||
|
if (previous === undefined) delete process.env.CLAWDBOT_STATE_DIR;
|
||||||
|
else process.env.CLAWDBOT_STATE_DIR = previous;
|
||||||
|
await fs.rm(dir, { recursive: true, force: true });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("telegram update offset store", () => {
|
||||||
|
it("persists and reloads the last update id", async () => {
|
||||||
|
await withTempStateDir(async () => {
|
||||||
|
expect(
|
||||||
|
await readTelegramUpdateOffset({ accountId: "primary" }),
|
||||||
|
).toBeNull();
|
||||||
|
|
||||||
|
await writeTelegramUpdateOffset({
|
||||||
|
accountId: "primary",
|
||||||
|
updateId: 421,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(
|
||||||
|
await readTelegramUpdateOffset({ accountId: "primary" }),
|
||||||
|
).toBe(421);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
89
src/telegram/update-offset-store.ts
Normal file
89
src/telegram/update-offset-store.ts
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
import crypto from "node:crypto";
|
||||||
|
import fs from "node:fs/promises";
|
||||||
|
import os from "node:os";
|
||||||
|
import path from "node:path";
|
||||||
|
|
||||||
|
import { resolveStateDir } from "../config/paths.js";
|
||||||
|
|
||||||
|
const STORE_VERSION = 1;
|
||||||
|
|
||||||
|
type TelegramUpdateOffsetState = {
|
||||||
|
version: number;
|
||||||
|
lastUpdateId: number | null;
|
||||||
|
};
|
||||||
|
|
||||||
|
function normalizeAccountId(accountId?: string) {
|
||||||
|
const trimmed = accountId?.trim();
|
||||||
|
if (!trimmed) return "default";
|
||||||
|
return trimmed.replace(/[^a-z0-9._-]+/gi, "_");
|
||||||
|
}
|
||||||
|
|
||||||
|
function resolveTelegramUpdateOffsetPath(
|
||||||
|
accountId?: string,
|
||||||
|
env: NodeJS.ProcessEnv = process.env,
|
||||||
|
): string {
|
||||||
|
const stateDir = resolveStateDir(env, os.homedir);
|
||||||
|
const normalized = normalizeAccountId(accountId);
|
||||||
|
return path.join(stateDir, "telegram", `update-offset-${normalized}.json`);
|
||||||
|
}
|
||||||
|
|
||||||
|
function safeParseState(raw: string): TelegramUpdateOffsetState | null {
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(raw) as TelegramUpdateOffsetState;
|
||||||
|
if (parsed?.version !== STORE_VERSION) return null;
|
||||||
|
if (
|
||||||
|
parsed.lastUpdateId !== null &&
|
||||||
|
typeof parsed.lastUpdateId !== "number"
|
||||||
|
) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return parsed;
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function readTelegramUpdateOffset(params: {
|
||||||
|
accountId?: string;
|
||||||
|
env?: NodeJS.ProcessEnv;
|
||||||
|
}): Promise<number | null> {
|
||||||
|
const filePath = resolveTelegramUpdateOffsetPath(
|
||||||
|
params.accountId,
|
||||||
|
params.env,
|
||||||
|
);
|
||||||
|
try {
|
||||||
|
const raw = await fs.readFile(filePath, "utf-8");
|
||||||
|
const parsed = safeParseState(raw);
|
||||||
|
return parsed?.lastUpdateId ?? null;
|
||||||
|
} catch (err) {
|
||||||
|
const code = (err as { code?: string }).code;
|
||||||
|
if (code === "ENOENT") return null;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function writeTelegramUpdateOffset(params: {
|
||||||
|
accountId?: string;
|
||||||
|
updateId: number;
|
||||||
|
env?: NodeJS.ProcessEnv;
|
||||||
|
}): Promise<void> {
|
||||||
|
const filePath = resolveTelegramUpdateOffsetPath(
|
||||||
|
params.accountId,
|
||||||
|
params.env,
|
||||||
|
);
|
||||||
|
const dir = path.dirname(filePath);
|
||||||
|
await fs.mkdir(dir, { recursive: true, mode: 0o700 });
|
||||||
|
const tmp = path.join(
|
||||||
|
dir,
|
||||||
|
`${path.basename(filePath)}.${crypto.randomUUID()}.tmp`,
|
||||||
|
);
|
||||||
|
const payload: TelegramUpdateOffsetState = {
|
||||||
|
version: STORE_VERSION,
|
||||||
|
lastUpdateId: params.updateId,
|
||||||
|
};
|
||||||
|
await fs.writeFile(tmp, `${JSON.stringify(payload, null, 2)}\n`, {
|
||||||
|
encoding: "utf-8",
|
||||||
|
});
|
||||||
|
await fs.chmod(tmp, 0o600);
|
||||||
|
await fs.rename(tmp, filePath);
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user