fix: dedupe telegram updates
This commit is contained in:
@@ -1794,4 +1794,126 @@ describe("createTelegramBot", () => {
|
||||
expect.objectContaining({ message_thread_id: 99 }),
|
||||
);
|
||||
});
|
||||
|
||||
it("dedupes duplicate message updates by update_id", async () => {
|
||||
onSpy.mockReset();
|
||||
const replySpy = replyModule.__replySpy as unknown as ReturnType<
|
||||
typeof vi.fn
|
||||
>;
|
||||
replySpy.mockReset();
|
||||
|
||||
loadConfig.mockReturnValue({
|
||||
telegram: { dmPolicy: "open", allowFrom: ["*"] },
|
||||
});
|
||||
|
||||
createTelegramBot({ token: "tok" });
|
||||
const handler = getOnHandler("message") as (
|
||||
ctx: Record<string, unknown>,
|
||||
) => Promise<void>;
|
||||
|
||||
const ctx = {
|
||||
update: { update_id: 111 },
|
||||
message: {
|
||||
chat: { id: 123, type: "private" },
|
||||
from: { id: 456, username: "testuser" },
|
||||
text: "hello",
|
||||
date: 1736380800,
|
||||
message_id: 42,
|
||||
},
|
||||
me: { username: "clawdbot_bot" },
|
||||
getFile: async () => ({ download: async () => new Uint8Array() }),
|
||||
};
|
||||
|
||||
await handler(ctx);
|
||||
await handler(ctx);
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("dedupes duplicate callback_query updates by update_id", async () => {
|
||||
onSpy.mockReset();
|
||||
const replySpy = replyModule.__replySpy as unknown as ReturnType<
|
||||
typeof vi.fn
|
||||
>;
|
||||
replySpy.mockReset();
|
||||
|
||||
loadConfig.mockReturnValue({
|
||||
telegram: { dmPolicy: "open", allowFrom: ["*"] },
|
||||
});
|
||||
|
||||
createTelegramBot({ token: "tok" });
|
||||
const handler = getOnHandler("callback_query") as (
|
||||
ctx: Record<string, unknown>,
|
||||
) => Promise<void>;
|
||||
|
||||
const ctx = {
|
||||
update: { update_id: 222 },
|
||||
callbackQuery: {
|
||||
id: "cb-1",
|
||||
data: "ping",
|
||||
from: { id: 789, username: "testuser" },
|
||||
message: {
|
||||
chat: { id: 123, type: "private" },
|
||||
date: 1736380800,
|
||||
message_id: 9001,
|
||||
},
|
||||
},
|
||||
me: { username: "clawdbot_bot" },
|
||||
getFile: async () => ({}),
|
||||
};
|
||||
|
||||
await handler(ctx);
|
||||
await handler(ctx);
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("allows distinct callback_query ids without update_id", async () => {
|
||||
onSpy.mockReset();
|
||||
const replySpy = replyModule.__replySpy as unknown as ReturnType<
|
||||
typeof vi.fn
|
||||
>;
|
||||
replySpy.mockReset();
|
||||
|
||||
loadConfig.mockReturnValue({
|
||||
telegram: { dmPolicy: "open", allowFrom: ["*"] },
|
||||
});
|
||||
|
||||
createTelegramBot({ token: "tok" });
|
||||
const handler = getOnHandler("callback_query") as (
|
||||
ctx: Record<string, unknown>,
|
||||
) => Promise<void>;
|
||||
|
||||
await handler({
|
||||
callbackQuery: {
|
||||
id: "cb-1",
|
||||
data: "ping",
|
||||
from: { id: 789, username: "testuser" },
|
||||
message: {
|
||||
chat: { id: 123, type: "private" },
|
||||
date: 1736380800,
|
||||
message_id: 9001,
|
||||
},
|
||||
},
|
||||
me: { username: "clawdbot_bot" },
|
||||
getFile: async () => ({}),
|
||||
});
|
||||
|
||||
await handler({
|
||||
callbackQuery: {
|
||||
id: "cb-2",
|
||||
data: "ping",
|
||||
from: { id: 789, username: "testuser" },
|
||||
message: {
|
||||
chat: { id: 123, type: "private" },
|
||||
date: 1736380800,
|
||||
message_id: 9001,
|
||||
},
|
||||
},
|
||||
me: { username: "clawdbot_bot" },
|
||||
getFile: async () => ({}),
|
||||
});
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -71,6 +71,8 @@ const PARSE_ERR_RE =
|
||||
// Media group aggregation - Telegram sends multi-image messages as separate updates
|
||||
// with a shared media_group_id. We buffer them and process as a single message after a short delay.
|
||||
const MEDIA_GROUP_TIMEOUT_MS = 500;
|
||||
const RECENT_TELEGRAM_UPDATE_TTL_MS = 5 * 60_000;
|
||||
const RECENT_TELEGRAM_UPDATE_MAX = 2000;
|
||||
|
||||
type TelegramMessage = Message.CommonMessage;
|
||||
|
||||
@@ -84,6 +86,62 @@ type MediaGroupEntry = {
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
};
|
||||
|
||||
type TelegramUpdateKeyContext = {
|
||||
update?: {
|
||||
update_id?: number;
|
||||
message?: TelegramMessage;
|
||||
edited_message?: TelegramMessage;
|
||||
};
|
||||
update_id?: number;
|
||||
message?: TelegramMessage;
|
||||
callbackQuery?: { id?: string; message?: TelegramMessage };
|
||||
};
|
||||
|
||||
const buildTelegramUpdateKey = (ctx: TelegramUpdateKeyContext) => {
|
||||
const updateId = ctx.update?.update_id ?? ctx.update_id;
|
||||
if (typeof updateId === "number") return `update:${updateId}`;
|
||||
const callbackId = ctx.callbackQuery?.id;
|
||||
if (callbackId) return `callback:${callbackId}`;
|
||||
const msg =
|
||||
ctx.message ??
|
||||
ctx.update?.message ??
|
||||
ctx.update?.edited_message ??
|
||||
ctx.callbackQuery?.message;
|
||||
const chatId = msg?.chat?.id;
|
||||
const messageId = msg?.message_id;
|
||||
if (typeof chatId !== "undefined" && typeof messageId === "number") {
|
||||
return `message:${chatId}:${messageId}`;
|
||||
}
|
||||
return undefined;
|
||||
};
|
||||
|
||||
const shouldSkipTelegramUpdate = (
|
||||
cache: Map<string, { ts: number }>,
|
||||
key?: string,
|
||||
) => {
|
||||
if (!key) return false;
|
||||
const now = Date.now();
|
||||
const existing = cache.get(key);
|
||||
if (existing && now - existing.ts < RECENT_TELEGRAM_UPDATE_TTL_MS) {
|
||||
return true;
|
||||
}
|
||||
if (existing) cache.delete(key);
|
||||
cache.set(key, { ts: now });
|
||||
if (cache.size > RECENT_TELEGRAM_UPDATE_MAX) {
|
||||
for (const [cachedKey, entry] of cache) {
|
||||
if (now - entry.ts > RECENT_TELEGRAM_UPDATE_TTL_MS) {
|
||||
cache.delete(cachedKey);
|
||||
}
|
||||
}
|
||||
while (cache.size > RECENT_TELEGRAM_UPDATE_MAX) {
|
||||
const oldestKey = cache.keys().next().value as string | undefined;
|
||||
if (!oldestKey) break;
|
||||
cache.delete(oldestKey);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
/** Telegram Location object */
|
||||
interface TelegramLocation {
|
||||
latitude: number;
|
||||
@@ -170,6 +228,16 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
bot.api.config.use(apiThrottler());
|
||||
bot.use(sequentialize(getTelegramSequentialKey));
|
||||
|
||||
const recentUpdates = new Map<string, { ts: number }>();
|
||||
const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => {
|
||||
const key = buildTelegramUpdateKey(ctx);
|
||||
const skipped = shouldSkipTelegramUpdate(recentUpdates, key);
|
||||
if (skipped && key && shouldLogVerbose()) {
|
||||
logVerbose(`telegram dedupe: skipped ${key}`);
|
||||
}
|
||||
return skipped;
|
||||
};
|
||||
|
||||
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
|
||||
|
||||
const cfg = opts.config ?? loadConfig();
|
||||
@@ -804,6 +872,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
bot.command(command.name, async (ctx) => {
|
||||
const msg = ctx.message;
|
||||
if (!msg) return;
|
||||
if (shouldSkipUpdate(ctx)) return;
|
||||
const chatId = msg.chat.id;
|
||||
const isGroup =
|
||||
msg.chat.type === "group" || msg.chat.type === "supergroup";
|
||||
@@ -997,6 +1066,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
bot.on("callback_query", async (ctx) => {
|
||||
const callback = ctx.callbackQuery;
|
||||
if (!callback) return;
|
||||
if (shouldSkipUpdate(ctx)) return;
|
||||
try {
|
||||
const data = (callback.data ?? "").trim();
|
||||
const callbackMessage = callback.message;
|
||||
@@ -1032,6 +1102,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
try {
|
||||
const msg = ctx.message;
|
||||
if (!msg) return;
|
||||
if (shouldSkipUpdate(ctx)) return;
|
||||
|
||||
const chatId = msg.chat.id;
|
||||
const isGroup =
|
||||
|
||||
Reference in New Issue
Block a user