From bd735182b6239459b52c1ff813ef6e166a531843 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Tue, 6 Jan 2026 09:34:33 +0530 Subject: [PATCH] feat(telegram): support media groups (multi-image messages) (#220) --- src/auto-reply/templating.ts | 3 + src/telegram/bot.media.test.ts | 132 +++++++++ src/telegram/bot.ts | 507 ++++++++++++++++++++------------- 3 files changed, 437 insertions(+), 205 deletions(-) diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index be5df6196..d2987ec22 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -10,6 +10,9 @@ export type MsgContext = { MediaPath?: string; MediaUrl?: string; MediaType?: string; + MediaPaths?: string[]; + MediaUrls?: string[]; + MediaTypes?: string[]; Transcript?: string; ChatType?: string; GroupSubject?: string; diff --git a/src/telegram/bot.media.test.ts b/src/telegram/bot.media.test.ts index 0157d0831..6f94e7cf8 100644 --- a/src/telegram/bot.media.test.ts +++ b/src/telegram/bot.media.test.ts @@ -209,3 +209,135 @@ describe("telegram inbound media", () => { fetchSpy.mockRestore(); }); }); + +describe("telegram media groups", () => { + const waitForMediaGroupProcessing = () => + new Promise((resolve) => setTimeout(resolve, 600)); + + it("buffers messages with same media_group_id and processes them together", async () => { + const { createTelegramBot } = await import("./bot.js"); + const replyModule = await import("../auto-reply/reply.js"); + const replySpy = replyModule.__replySpy as unknown as ReturnType< + typeof vi.fn + >; + + onSpy.mockReset(); + replySpy.mockReset(); + + const runtimeError = vi.fn(); + const fetchSpy = vi.spyOn(globalThis, "fetch" as never).mockResolvedValue({ + ok: true, + status: 200, + statusText: "OK", + headers: { get: () => "image/png" }, + arrayBuffer: async () => new Uint8Array([0x89, 0x50, 0x4e, 0x47]).buffer, + } as Response); + + createTelegramBot({ + token: "tok", + runtime: { + log: vi.fn(), + error: runtimeError, + exit: () => { + throw new Error("exit"); + }, + }, + }); + const handler = onSpy.mock.calls[0][1] as ( + ctx: Record, + ) => Promise; + + await handler({ + message: { + chat: { id: 42, type: "private" }, + message_id: 1, + caption: "Here are my photos", + date: 1736380800, + media_group_id: "album123", + photo: [{ file_id: "photo1" }], + }, + me: { username: "clawdbot_bot" }, + getFile: async () => ({ file_path: "photos/photo1.jpg" }), + }); + + await handler({ + message: { + chat: { id: 42, type: "private" }, + message_id: 2, + date: 1736380801, + media_group_id: "album123", + photo: [{ file_id: "photo2" }], + }, + me: { username: "clawdbot_bot" }, + getFile: async () => ({ file_path: "photos/photo2.jpg" }), + }); + + expect(replySpy).not.toHaveBeenCalled(); + await waitForMediaGroupProcessing(); + + expect(runtimeError).not.toHaveBeenCalled(); + expect(replySpy).toHaveBeenCalledTimes(1); + const payload = replySpy.mock.calls[0][0]; + expect(payload.Body).toContain("Here are my photos"); + expect(payload.MediaPaths).toHaveLength(2); + + fetchSpy.mockRestore(); + }, 2000); + + it("processes separate media groups independently", async () => { + const { createTelegramBot } = await import("./bot.js"); + const replyModule = await import("../auto-reply/reply.js"); + const replySpy = replyModule.__replySpy as unknown as ReturnType< + typeof vi.fn + >; + + onSpy.mockReset(); + replySpy.mockReset(); + + const fetchSpy = vi.spyOn(globalThis, "fetch" as never).mockResolvedValue({ + ok: true, + status: 200, + statusText: "OK", + headers: { get: () => "image/png" }, + arrayBuffer: async () => new Uint8Array([0x89, 0x50, 0x4e, 0x47]).buffer, + } as Response); + + createTelegramBot({ token: "tok" }); + const handler = onSpy.mock.calls[0][1] as ( + ctx: Record, + ) => Promise; + + await handler({ + message: { + chat: { id: 42, type: "private" }, + message_id: 1, + caption: "Album A", + date: 1736380800, + media_group_id: "albumA", + photo: [{ file_id: "photoA1" }], + }, + me: { username: "clawdbot_bot" }, + getFile: async () => ({ file_path: "photos/photoA1.jpg" }), + }); + + await handler({ + message: { + chat: { id: 42, type: "private" }, + message_id: 2, + caption: "Album B", + date: 1736380801, + media_group_id: "albumB", + photo: [{ file_id: "photoB1" }], + }, + me: { username: "clawdbot_bot" }, + getFile: async () => ({ file_path: "photos/photoB1.jpg" }), + }); + + expect(replySpy).not.toHaveBeenCalled(); + await waitForMediaGroupProcessing(); + + expect(replySpy).toHaveBeenCalledTimes(2); + + fetchSpy.mockRestore(); + }, 2000); +}); diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 0353a08b1..c9ead4cd4 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -34,8 +34,20 @@ import { loadWebMedia } from "../web/media.js"; const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity/i; +// Media group aggregation - Telegram sends multi-image messages as separate updates +// with a shared media_group_id. We buffer them and process as a single message after a short delay. +const MEDIA_GROUP_TIMEOUT_MS = 500; + type TelegramMessage = Message.CommonMessage; +type MediaGroupEntry = { + messages: Array<{ + msg: TelegramMessage; + ctx: TelegramContext; + }>; + timer: ReturnType; +}; + type TelegramContext = { message: TelegramMessage; me?: { username?: string }; @@ -69,6 +81,8 @@ export function createTelegramBot(opts: TelegramBotOptions) { const bot = new Bot(opts.token, { client }); bot.api.config.use(apiThrottler()); + const mediaGroupBuffer = new Map(); + const cfg = loadConfig(); const textLimit = resolveTextChunkLimit(cfg, "telegram"); const allowFrom = opts.allowFrom ?? cfg.telegram?.allowFrom; @@ -94,14 +108,249 @@ export function createTelegramBot(opts: TelegramBotOptions) { overrideOrder: "after-config", }); + const processMessage = async ( + primaryCtx: TelegramContext, + allMedia: Array<{ path: string; contentType?: string }>, + ) => { + const msg = primaryCtx.message; + const chatId = msg.chat.id; + const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup"; + + const sendTyping = async () => { + try { + await bot.api.sendChatAction(chatId, "typing"); + } catch (err) { + logVerbose( + `telegram typing cue failed for chat ${chatId}: ${String(err)}`, + ); + } + }; + + // allowFrom for direct chats + if (!isGroup && Array.isArray(allowFrom) && allowFrom.length > 0) { + const candidate = String(chatId); + const allowed = allowFrom.map(String); + const allowedWithPrefix = allowFrom.map((v) => `telegram:${String(v)}`); + const permitted = + allowed.includes(candidate) || + allowedWithPrefix.includes(`telegram:${candidate}`) || + allowed.includes("*"); + if (!permitted) { + logVerbose( + `Blocked unauthorized telegram sender ${candidate} (not in allowFrom)`, + ); + return; + } + } + + const botUsername = primaryCtx.me?.username?.toLowerCase(); + const allowFromList = Array.isArray(allowFrom) + ? allowFrom.map((entry) => String(entry).trim()).filter(Boolean) + : []; + const senderId = msg.from?.id ? String(msg.from.id) : ""; + const senderUsername = msg.from?.username ?? ""; + const commandAuthorized = + allowFromList.length === 0 || + allowFromList.includes("*") || + (senderId && allowFromList.includes(senderId)) || + (senderId && allowFromList.includes(`telegram:${senderId}`)) || + (senderUsername && + allowFromList.some( + (entry) => + entry.toLowerCase() === senderUsername.toLowerCase() || + entry.toLowerCase() === `@${senderUsername.toLowerCase()}`, + )); + const wasMentioned = + (Boolean(botUsername) && hasBotMention(msg, botUsername)) || + matchesMentionPatterns(msg.text ?? msg.caption ?? "", mentionRegexes); + const hasAnyMention = (msg.entities ?? msg.caption_entities ?? []).some( + (ent) => ent.type === "mention", + ); + const requireMention = resolveGroupRequireMention(chatId); + const shouldBypassMention = + isGroup && + requireMention && + !wasMentioned && + !hasAnyMention && + commandAuthorized && + hasControlCommand(msg.text ?? msg.caption ?? ""); + const canDetectMention = Boolean(botUsername) || mentionRegexes.length > 0; + if (isGroup && requireMention && canDetectMention) { + if (!wasMentioned && !shouldBypassMention) { + logger.info({ chatId, reason: "no-mention" }, "skipping group message"); + return; + } + } + + // ACK reactions + const shouldAckReaction = () => { + if (!ackReaction) return false; + if (ackReactionScope === "all") return true; + if (ackReactionScope === "direct") return !isGroup; + if (ackReactionScope === "group-all") return isGroup; + if (ackReactionScope === "group-mentions") { + if (!isGroup) return false; + if (!requireMention) return false; + if (!canDetectMention) return false; + return wasMentioned || shouldBypassMention; + } + return false; + }; + if (shouldAckReaction() && msg.message_id) { + const api = bot.api as unknown as { + setMessageReaction?: ( + chatId: number | string, + messageId: number, + reactions: Array<{ type: "emoji"; emoji: string }>, + ) => Promise; + }; + if (typeof api.setMessageReaction === "function") { + api + .setMessageReaction(chatId, msg.message_id, [ + { type: "emoji", emoji: ackReaction }, + ]) + .catch((err) => { + logVerbose( + `telegram react failed for chat ${chatId}: ${String(err)}`, + ); + }); + } + } + + let placeholder = ""; + if (msg.photo) placeholder = ""; + else if (msg.video) placeholder = ""; + else if (msg.audio || msg.voice) placeholder = ""; + else if (msg.document) placeholder = ""; + + const replyTarget = describeReplyTarget(msg); + const rawBody = (msg.text ?? msg.caption ?? placeholder).trim(); + if (!rawBody && allMedia.length === 0) return; + + let bodyText = rawBody; + if (!bodyText && allMedia.length > 0) { + bodyText = `${allMedia.length > 1 ? ` (${allMedia.length} images)` : ""}`; + } + + const replySuffix = replyTarget + ? `\n\n[Replying to ${replyTarget.sender}${ + replyTarget.id ? ` id:${replyTarget.id}` : "" + }]\n${replyTarget.body}\n[/Replying]` + : ""; + const body = formatAgentEnvelope({ + surface: "Telegram", + from: isGroup + ? buildGroupLabel(msg, chatId) + : buildSenderLabel(msg, chatId), + timestamp: msg.date ? msg.date * 1000 : undefined, + body: `${bodyText}${replySuffix}`, + }); + + const ctxPayload = { + Body: body, + From: isGroup ? `group:${chatId}` : `telegram:${chatId}`, + To: `telegram:${chatId}`, + ChatType: isGroup ? "group" : "direct", + GroupSubject: isGroup ? (msg.chat.title ?? undefined) : undefined, + SenderName: buildSenderName(msg), + SenderId: senderId || undefined, + SenderUsername: senderUsername || undefined, + Surface: "telegram", + MessageSid: String(msg.message_id), + ReplyToId: replyTarget?.id, + ReplyToBody: replyTarget?.body, + ReplyToSender: replyTarget?.sender, + Timestamp: msg.date ? msg.date * 1000 : undefined, + WasMentioned: isGroup ? wasMentioned : undefined, + MediaPath: allMedia[0]?.path, + MediaType: allMedia[0]?.contentType, + MediaUrl: allMedia[0]?.path, + MediaPaths: allMedia.length > 0 ? allMedia.map((m) => m.path) : undefined, + MediaUrls: allMedia.length > 0 ? allMedia.map((m) => m.path) : undefined, + MediaTypes: + allMedia.length > 0 + ? (allMedia.map((m) => m.contentType).filter(Boolean) as string[]) + : undefined, + CommandAuthorized: commandAuthorized, + }; + + if (replyTarget && shouldLogVerbose()) { + const preview = replyTarget.body.replace(/\s+/g, " ").slice(0, 120); + logVerbose( + `telegram reply-context: replyToId=${replyTarget.id} replyToSender=${replyTarget.sender} replyToBody="${preview}"`, + ); + } + + if (!isGroup) { + const sessionCfg = cfg.session; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + const storePath = resolveStorePath(sessionCfg?.store); + await updateLastRoute({ + storePath, + sessionKey: mainKey, + channel: "telegram", + to: String(chatId), + }); + } + + if (shouldLogVerbose()) { + const preview = body.slice(0, 200).replace(/\n/g, "\\n"); + const mediaInfo = + allMedia.length > 1 ? ` mediaCount=${allMedia.length}` : ""; + logVerbose( + `telegram inbound: chatId=${chatId} from=${ctxPayload.From} len=${body.length}${mediaInfo} preview="${preview}"`, + ); + } + + let typingController: TypingController | undefined; + const dispatcher = createReplyDispatcher({ + responsePrefix: cfg.messages?.responsePrefix, + deliver: async (payload) => { + await deliverReplies({ + replies: [payload], + chatId: String(chatId), + token: opts.token, + runtime, + bot, + replyToMode, + textLimit, + }); + }, + onIdle: () => { + typingController?.markDispatchIdle(); + }, + onError: (err, info) => { + runtime.error?.( + danger(`telegram ${info.kind} reply failed: ${String(err)}`), + ); + }, + }); + + const { queuedFinal } = await dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + onReplyStart: sendTyping, + onTypingController: (typing) => { + typingController = typing; + }, + }, + }); + typingController?.markDispatchIdle(); + if (!queuedFinal) return; + }; + bot.on("message", async (ctx) => { try { const msg = ctx.message; if (!msg) return; + const chatId = msg.chat.id; const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup"; + // Group policy check - skip disallowed groups early if (isGroup) { const groupPolicy = resolveGroupPolicy(chatId); if (groupPolicy.allowlistEnabled && !groupPolicy.allowed) { @@ -113,74 +362,28 @@ export function createTelegramBot(opts: TelegramBotOptions) { } } - const sendTyping = async () => { - try { - await bot.api.sendChatAction(chatId, "typing"); - } catch (err) { - logVerbose( - `telegram typing cue failed for chat ${chatId}: ${String(err)}`, - ); - } - }; - - // allowFrom for direct chats - if (!isGroup && Array.isArray(allowFrom) && allowFrom.length > 0) { - const candidate = String(chatId); - const allowed = allowFrom.map(String); - const allowedWithPrefix = allowFrom.map((v) => `telegram:${String(v)}`); - const permitted = - allowed.includes(candidate) || - allowedWithPrefix.includes(`telegram:${candidate}`) || - allowed.includes("*"); - if (!permitted) { - logVerbose( - `Blocked unauthorized telegram sender ${candidate} (not in allowFrom)`, - ); - return; - } - } - - const botUsername = ctx.me?.username?.toLowerCase(); - const allowFromList = Array.isArray(allowFrom) - ? allowFrom.map((entry) => String(entry).trim()).filter(Boolean) - : []; - const senderId = msg.from?.id ? String(msg.from.id) : ""; - const senderUsername = msg.from?.username ?? ""; - const commandAuthorized = - allowFromList.length === 0 || - allowFromList.includes("*") || - (senderId && allowFromList.includes(senderId)) || - (senderId && allowFromList.includes(`telegram:${senderId}`)) || - (senderUsername && - allowFromList.some( - (entry) => - entry.toLowerCase() === senderUsername.toLowerCase() || - entry.toLowerCase() === `@${senderUsername.toLowerCase()}`, - )); - const wasMentioned = - (Boolean(botUsername) && hasBotMention(msg, botUsername)) || - matchesMentionPatterns(msg.text ?? msg.caption ?? "", mentionRegexes); - const hasAnyMention = (msg.entities ?? msg.caption_entities ?? []).some( - (ent) => ent.type === "mention", - ); - const requireMention = resolveGroupRequireMention(chatId); - const shouldBypassMention = - isGroup && - requireMention && - !wasMentioned && - !hasAnyMention && - commandAuthorized && - hasControlCommand(msg.text ?? msg.caption ?? ""); - const canDetectMention = - Boolean(botUsername) || mentionRegexes.length > 0; - if (isGroup && requireMention && canDetectMention) { - if (!wasMentioned && !shouldBypassMention) { - logger.info( - { chatId, reason: "no-mention" }, - "skipping group message", - ); - return; + // Media group handling - buffer multi-image messages + const mediaGroupId = (msg as { media_group_id?: string }).media_group_id; + if (mediaGroupId) { + const existing = mediaGroupBuffer.get(mediaGroupId); + if (existing) { + clearTimeout(existing.timer); + existing.messages.push({ msg, ctx }); + existing.timer = setTimeout(async () => { + mediaGroupBuffer.delete(mediaGroupId); + await processMediaGroup(existing); + }, MEDIA_GROUP_TIMEOUT_MS); + } else { + const entry: MediaGroupEntry = { + messages: [{ msg, ctx }], + timer: setTimeout(async () => { + mediaGroupBuffer.delete(mediaGroupId); + await processMediaGroup(entry); + }, MEDIA_GROUP_TIMEOUT_MS), + }; + mediaGroupBuffer.set(mediaGroupId, entry); } + return; } const media = await resolveMedia( @@ -189,149 +392,43 @@ export function createTelegramBot(opts: TelegramBotOptions) { opts.token, opts.proxyFetch, ); - const replyTarget = describeReplyTarget(msg); - const rawBody = ( - msg.text ?? - msg.caption ?? - media?.placeholder ?? - "" - ).trim(); - if (!rawBody) return; - const shouldAckReaction = () => { - if (!ackReaction) return false; - if (ackReactionScope === "all") return true; - if (ackReactionScope === "direct") return !isGroup; - if (ackReactionScope === "group-all") return isGroup; - if (ackReactionScope === "group-mentions") { - if (!isGroup) return false; - if (!resolveGroupRequireMention(chatId)) return false; - if (!canDetectMention) return false; - return wasMentioned || shouldBypassMention; - } - return false; - }; - if (shouldAckReaction() && msg.message_id) { - const api = bot.api as unknown as { - setMessageReaction?: ( - chatId: number | string, - messageId: number, - reactions: Array<{ type: "emoji"; emoji: string }>, - ) => Promise; - }; - if (typeof api.setMessageReaction === "function") { - api - .setMessageReaction(chatId, msg.message_id, [ - { type: "emoji", emoji: ackReaction }, - ]) - .catch((err) => { - logVerbose( - `telegram react failed for chat ${chatId}: ${String(err)}`, - ); - }); - } - } - const replySuffix = replyTarget - ? `\n\n[Replying to ${replyTarget.sender}${replyTarget.id ? ` id:${replyTarget.id}` : ""}]\n${replyTarget.body}\n[/Replying]` - : ""; - const body = formatAgentEnvelope({ - surface: "Telegram", - from: isGroup - ? buildGroupLabel(msg, chatId) - : buildSenderLabel(msg, chatId), - timestamp: msg.date ? msg.date * 1000 : undefined, - body: `${rawBody}${replySuffix}`, - }); - - const ctxPayload = { - Body: body, - From: isGroup ? `group:${chatId}` : `telegram:${chatId}`, - To: `telegram:${chatId}`, - ChatType: isGroup ? "group" : "direct", - GroupSubject: isGroup ? (msg.chat.title ?? undefined) : undefined, - SenderName: buildSenderName(msg), - SenderId: senderId || undefined, - SenderUsername: senderUsername || undefined, - Surface: "telegram", - MessageSid: String(msg.message_id), - ReplyToId: replyTarget?.id, - ReplyToBody: replyTarget?.body, - ReplyToSender: replyTarget?.sender, - Timestamp: msg.date ? msg.date * 1000 : undefined, - WasMentioned: isGroup ? wasMentioned : undefined, - MediaPath: media?.path, - MediaType: media?.contentType, - MediaUrl: media?.path, - CommandAuthorized: commandAuthorized, - }; - - if (replyTarget && shouldLogVerbose()) { - const preview = replyTarget.body.replace(/\s+/g, " ").slice(0, 120); - logVerbose( - `telegram reply-context: replyToId=${replyTarget.id} replyToSender=${replyTarget.sender} replyToBody="${preview}"`, - ); - } - - if (!isGroup) { - const sessionCfg = cfg.session; - const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; - const storePath = resolveStorePath(sessionCfg?.store); - await updateLastRoute({ - storePath, - sessionKey: mainKey, - channel: "telegram", - to: String(chatId), - }); - } - - if (shouldLogVerbose()) { - const preview = body.slice(0, 200).replace(/\n/g, "\\n"); - logVerbose( - `telegram inbound: chatId=${chatId} from=${ctxPayload.From} len=${body.length} preview="${preview}"`, - ); - } - - let typingController: TypingController | undefined; - const dispatcher = createReplyDispatcher({ - responsePrefix: cfg.messages?.responsePrefix, - deliver: async (payload) => { - await deliverReplies({ - replies: [payload], - chatId: String(chatId), - token: opts.token, - runtime, - bot, - replyToMode, - textLimit, - }); - }, - onIdle: () => { - typingController?.markDispatchIdle(); - }, - onError: (err, info) => { - runtime.error?.( - danger(`telegram ${info.kind} reply failed: ${String(err)}`), - ); - }, - }); - - const { queuedFinal } = await dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - onReplyStart: sendTyping, - onTypingController: (typing) => { - typingController = typing; - }, - }, - }); - typingController?.markDispatchIdle(); - if (!queuedFinal) return; + const allMedia = media + ? [{ path: media.path, contentType: media.contentType }] + : []; + await processMessage(ctx, allMedia); } catch (err) { runtime.error?.(danger(`handler failed: ${String(err)}`)); } }); + const processMediaGroup = async (entry: MediaGroupEntry) => { + try { + entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); + + const captionMsg = entry.messages.find( + (m) => m.msg.caption || m.msg.text, + ); + const primaryEntry = captionMsg ?? entry.messages[0]; + + const allMedia: Array<{ path: string; contentType?: string }> = []; + for (const { ctx } of entry.messages) { + const media = await resolveMedia( + ctx, + mediaMaxBytes, + opts.token, + opts.proxyFetch, + ); + if (media) { + allMedia.push({ path: media.path, contentType: media.contentType }); + } + } + + await processMessage(primaryEntry.ctx, allMedia); + } catch (err) { + runtime.error?.(danger(`media group handler failed: ${String(err)}`)); + } + }; + return bot; }