feat(telegram): support media groups (multi-image messages) (#220)

This commit is contained in:
Ayaan Zaidi
2026-01-06 09:34:33 +05:30
committed by GitHub
parent fb2513e265
commit bd735182b6
3 changed files with 437 additions and 205 deletions

View File

@@ -10,6 +10,9 @@ export type MsgContext = {
MediaPath?: string;
MediaUrl?: string;
MediaType?: string;
MediaPaths?: string[];
MediaUrls?: string[];
MediaTypes?: string[];
Transcript?: string;
ChatType?: string;
GroupSubject?: string;

View File

@@ -209,3 +209,135 @@ describe("telegram inbound media", () => {
fetchSpy.mockRestore();
});
});
describe("telegram media groups", () => {
const waitForMediaGroupProcessing = () =>
new Promise((resolve) => setTimeout(resolve, 600));
it("buffers messages with same media_group_id and processes them together", async () => {
const { createTelegramBot } = await import("./bot.js");
const replyModule = await import("../auto-reply/reply.js");
const replySpy = replyModule.__replySpy as unknown as ReturnType<
typeof vi.fn
>;
onSpy.mockReset();
replySpy.mockReset();
const runtimeError = vi.fn();
const fetchSpy = vi.spyOn(globalThis, "fetch" as never).mockResolvedValue({
ok: true,
status: 200,
statusText: "OK",
headers: { get: () => "image/png" },
arrayBuffer: async () => new Uint8Array([0x89, 0x50, 0x4e, 0x47]).buffer,
} as Response);
createTelegramBot({
token: "tok",
runtime: {
log: vi.fn(),
error: runtimeError,
exit: () => {
throw new Error("exit");
},
},
});
const handler = onSpy.mock.calls[0][1] as (
ctx: Record<string, unknown>,
) => Promise<void>;
await handler({
message: {
chat: { id: 42, type: "private" },
message_id: 1,
caption: "Here are my photos",
date: 1736380800,
media_group_id: "album123",
photo: [{ file_id: "photo1" }],
},
me: { username: "clawdbot_bot" },
getFile: async () => ({ file_path: "photos/photo1.jpg" }),
});
await handler({
message: {
chat: { id: 42, type: "private" },
message_id: 2,
date: 1736380801,
media_group_id: "album123",
photo: [{ file_id: "photo2" }],
},
me: { username: "clawdbot_bot" },
getFile: async () => ({ file_path: "photos/photo2.jpg" }),
});
expect(replySpy).not.toHaveBeenCalled();
await waitForMediaGroupProcessing();
expect(runtimeError).not.toHaveBeenCalled();
expect(replySpy).toHaveBeenCalledTimes(1);
const payload = replySpy.mock.calls[0][0];
expect(payload.Body).toContain("Here are my photos");
expect(payload.MediaPaths).toHaveLength(2);
fetchSpy.mockRestore();
}, 2000);
it("processes separate media groups independently", async () => {
const { createTelegramBot } = await import("./bot.js");
const replyModule = await import("../auto-reply/reply.js");
const replySpy = replyModule.__replySpy as unknown as ReturnType<
typeof vi.fn
>;
onSpy.mockReset();
replySpy.mockReset();
const fetchSpy = vi.spyOn(globalThis, "fetch" as never).mockResolvedValue({
ok: true,
status: 200,
statusText: "OK",
headers: { get: () => "image/png" },
arrayBuffer: async () => new Uint8Array([0x89, 0x50, 0x4e, 0x47]).buffer,
} as Response);
createTelegramBot({ token: "tok" });
const handler = onSpy.mock.calls[0][1] as (
ctx: Record<string, unknown>,
) => Promise<void>;
await handler({
message: {
chat: { id: 42, type: "private" },
message_id: 1,
caption: "Album A",
date: 1736380800,
media_group_id: "albumA",
photo: [{ file_id: "photoA1" }],
},
me: { username: "clawdbot_bot" },
getFile: async () => ({ file_path: "photos/photoA1.jpg" }),
});
await handler({
message: {
chat: { id: 42, type: "private" },
message_id: 2,
caption: "Album B",
date: 1736380801,
media_group_id: "albumB",
photo: [{ file_id: "photoB1" }],
},
me: { username: "clawdbot_bot" },
getFile: async () => ({ file_path: "photos/photoB1.jpg" }),
});
expect(replySpy).not.toHaveBeenCalled();
await waitForMediaGroupProcessing();
expect(replySpy).toHaveBeenCalledTimes(2);
fetchSpy.mockRestore();
}, 2000);
});

View File

@@ -34,8 +34,20 @@ import { loadWebMedia } from "../web/media.js";
const PARSE_ERR_RE =
/can't parse entities|parse entities|find end of the entity/i;
// Media group aggregation - Telegram sends multi-image messages as separate updates
// with a shared media_group_id. We buffer them and process as a single message after a short delay.
const MEDIA_GROUP_TIMEOUT_MS = 500;
type TelegramMessage = Message.CommonMessage;
type MediaGroupEntry = {
messages: Array<{
msg: TelegramMessage;
ctx: TelegramContext;
}>;
timer: ReturnType<typeof setTimeout>;
};
type TelegramContext = {
message: TelegramMessage;
me?: { username?: string };
@@ -69,6 +81,8 @@ export function createTelegramBot(opts: TelegramBotOptions) {
const bot = new Bot(opts.token, { client });
bot.api.config.use(apiThrottler());
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
const cfg = loadConfig();
const textLimit = resolveTextChunkLimit(cfg, "telegram");
const allowFrom = opts.allowFrom ?? cfg.telegram?.allowFrom;
@@ -94,14 +108,249 @@ export function createTelegramBot(opts: TelegramBotOptions) {
overrideOrder: "after-config",
});
const processMessage = async (
primaryCtx: TelegramContext,
allMedia: Array<{ path: string; contentType?: string }>,
) => {
const msg = primaryCtx.message;
const chatId = msg.chat.id;
const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup";
const sendTyping = async () => {
try {
await bot.api.sendChatAction(chatId, "typing");
} catch (err) {
logVerbose(
`telegram typing cue failed for chat ${chatId}: ${String(err)}`,
);
}
};
// allowFrom for direct chats
if (!isGroup && Array.isArray(allowFrom) && allowFrom.length > 0) {
const candidate = String(chatId);
const allowed = allowFrom.map(String);
const allowedWithPrefix = allowFrom.map((v) => `telegram:${String(v)}`);
const permitted =
allowed.includes(candidate) ||
allowedWithPrefix.includes(`telegram:${candidate}`) ||
allowed.includes("*");
if (!permitted) {
logVerbose(
`Blocked unauthorized telegram sender ${candidate} (not in allowFrom)`,
);
return;
}
}
const botUsername = primaryCtx.me?.username?.toLowerCase();
const allowFromList = Array.isArray(allowFrom)
? allowFrom.map((entry) => String(entry).trim()).filter(Boolean)
: [];
const senderId = msg.from?.id ? String(msg.from.id) : "";
const senderUsername = msg.from?.username ?? "";
const commandAuthorized =
allowFromList.length === 0 ||
allowFromList.includes("*") ||
(senderId && allowFromList.includes(senderId)) ||
(senderId && allowFromList.includes(`telegram:${senderId}`)) ||
(senderUsername &&
allowFromList.some(
(entry) =>
entry.toLowerCase() === senderUsername.toLowerCase() ||
entry.toLowerCase() === `@${senderUsername.toLowerCase()}`,
));
const wasMentioned =
(Boolean(botUsername) && hasBotMention(msg, botUsername)) ||
matchesMentionPatterns(msg.text ?? msg.caption ?? "", mentionRegexes);
const hasAnyMention = (msg.entities ?? msg.caption_entities ?? []).some(
(ent) => ent.type === "mention",
);
const requireMention = resolveGroupRequireMention(chatId);
const shouldBypassMention =
isGroup &&
requireMention &&
!wasMentioned &&
!hasAnyMention &&
commandAuthorized &&
hasControlCommand(msg.text ?? msg.caption ?? "");
const canDetectMention = Boolean(botUsername) || mentionRegexes.length > 0;
if (isGroup && requireMention && canDetectMention) {
if (!wasMentioned && !shouldBypassMention) {
logger.info({ chatId, reason: "no-mention" }, "skipping group message");
return;
}
}
// ACK reactions
const shouldAckReaction = () => {
if (!ackReaction) return false;
if (ackReactionScope === "all") return true;
if (ackReactionScope === "direct") return !isGroup;
if (ackReactionScope === "group-all") return isGroup;
if (ackReactionScope === "group-mentions") {
if (!isGroup) return false;
if (!requireMention) return false;
if (!canDetectMention) return false;
return wasMentioned || shouldBypassMention;
}
return false;
};
if (shouldAckReaction() && msg.message_id) {
const api = bot.api as unknown as {
setMessageReaction?: (
chatId: number | string,
messageId: number,
reactions: Array<{ type: "emoji"; emoji: string }>,
) => Promise<void>;
};
if (typeof api.setMessageReaction === "function") {
api
.setMessageReaction(chatId, msg.message_id, [
{ type: "emoji", emoji: ackReaction },
])
.catch((err) => {
logVerbose(
`telegram react failed for chat ${chatId}: ${String(err)}`,
);
});
}
}
let placeholder = "";
if (msg.photo) placeholder = "<media:image>";
else if (msg.video) placeholder = "<media:video>";
else if (msg.audio || msg.voice) placeholder = "<media:audio>";
else if (msg.document) placeholder = "<media:document>";
const replyTarget = describeReplyTarget(msg);
const rawBody = (msg.text ?? msg.caption ?? placeholder).trim();
if (!rawBody && allMedia.length === 0) return;
let bodyText = rawBody;
if (!bodyText && allMedia.length > 0) {
bodyText = `<media:image>${allMedia.length > 1 ? ` (${allMedia.length} images)` : ""}`;
}
const replySuffix = replyTarget
? `\n\n[Replying to ${replyTarget.sender}${
replyTarget.id ? ` id:${replyTarget.id}` : ""
}]\n${replyTarget.body}\n[/Replying]`
: "";
const body = formatAgentEnvelope({
surface: "Telegram",
from: isGroup
? buildGroupLabel(msg, chatId)
: buildSenderLabel(msg, chatId),
timestamp: msg.date ? msg.date * 1000 : undefined,
body: `${bodyText}${replySuffix}`,
});
const ctxPayload = {
Body: body,
From: isGroup ? `group:${chatId}` : `telegram:${chatId}`,
To: `telegram:${chatId}`,
ChatType: isGroup ? "group" : "direct",
GroupSubject: isGroup ? (msg.chat.title ?? undefined) : undefined,
SenderName: buildSenderName(msg),
SenderId: senderId || undefined,
SenderUsername: senderUsername || undefined,
Surface: "telegram",
MessageSid: String(msg.message_id),
ReplyToId: replyTarget?.id,
ReplyToBody: replyTarget?.body,
ReplyToSender: replyTarget?.sender,
Timestamp: msg.date ? msg.date * 1000 : undefined,
WasMentioned: isGroup ? wasMentioned : undefined,
MediaPath: allMedia[0]?.path,
MediaType: allMedia[0]?.contentType,
MediaUrl: allMedia[0]?.path,
MediaPaths: allMedia.length > 0 ? allMedia.map((m) => m.path) : undefined,
MediaUrls: allMedia.length > 0 ? allMedia.map((m) => m.path) : undefined,
MediaTypes:
allMedia.length > 0
? (allMedia.map((m) => m.contentType).filter(Boolean) as string[])
: undefined,
CommandAuthorized: commandAuthorized,
};
if (replyTarget && shouldLogVerbose()) {
const preview = replyTarget.body.replace(/\s+/g, " ").slice(0, 120);
logVerbose(
`telegram reply-context: replyToId=${replyTarget.id} replyToSender=${replyTarget.sender} replyToBody="${preview}"`,
);
}
if (!isGroup) {
const sessionCfg = cfg.session;
const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
const storePath = resolveStorePath(sessionCfg?.store);
await updateLastRoute({
storePath,
sessionKey: mainKey,
channel: "telegram",
to: String(chatId),
});
}
if (shouldLogVerbose()) {
const preview = body.slice(0, 200).replace(/\n/g, "\\n");
const mediaInfo =
allMedia.length > 1 ? ` mediaCount=${allMedia.length}` : "";
logVerbose(
`telegram inbound: chatId=${chatId} from=${ctxPayload.From} len=${body.length}${mediaInfo} preview="${preview}"`,
);
}
let typingController: TypingController | undefined;
const dispatcher = createReplyDispatcher({
responsePrefix: cfg.messages?.responsePrefix,
deliver: async (payload) => {
await deliverReplies({
replies: [payload],
chatId: String(chatId),
token: opts.token,
runtime,
bot,
replyToMode,
textLimit,
});
},
onIdle: () => {
typingController?.markDispatchIdle();
},
onError: (err, info) => {
runtime.error?.(
danger(`telegram ${info.kind} reply failed: ${String(err)}`),
);
},
});
const { queuedFinal } = await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
onReplyStart: sendTyping,
onTypingController: (typing) => {
typingController = typing;
},
},
});
typingController?.markDispatchIdle();
if (!queuedFinal) return;
};
bot.on("message", async (ctx) => {
try {
const msg = ctx.message;
if (!msg) return;
const chatId = msg.chat.id;
const isGroup =
msg.chat.type === "group" || msg.chat.type === "supergroup";
// Group policy check - skip disallowed groups early
if (isGroup) {
const groupPolicy = resolveGroupPolicy(chatId);
if (groupPolicy.allowlistEnabled && !groupPolicy.allowed) {
@@ -113,74 +362,28 @@ export function createTelegramBot(opts: TelegramBotOptions) {
}
}
const sendTyping = async () => {
try {
await bot.api.sendChatAction(chatId, "typing");
} catch (err) {
logVerbose(
`telegram typing cue failed for chat ${chatId}: ${String(err)}`,
);
}
};
// allowFrom for direct chats
if (!isGroup && Array.isArray(allowFrom) && allowFrom.length > 0) {
const candidate = String(chatId);
const allowed = allowFrom.map(String);
const allowedWithPrefix = allowFrom.map((v) => `telegram:${String(v)}`);
const permitted =
allowed.includes(candidate) ||
allowedWithPrefix.includes(`telegram:${candidate}`) ||
allowed.includes("*");
if (!permitted) {
logVerbose(
`Blocked unauthorized telegram sender ${candidate} (not in allowFrom)`,
);
return;
}
}
const botUsername = ctx.me?.username?.toLowerCase();
const allowFromList = Array.isArray(allowFrom)
? allowFrom.map((entry) => String(entry).trim()).filter(Boolean)
: [];
const senderId = msg.from?.id ? String(msg.from.id) : "";
const senderUsername = msg.from?.username ?? "";
const commandAuthorized =
allowFromList.length === 0 ||
allowFromList.includes("*") ||
(senderId && allowFromList.includes(senderId)) ||
(senderId && allowFromList.includes(`telegram:${senderId}`)) ||
(senderUsername &&
allowFromList.some(
(entry) =>
entry.toLowerCase() === senderUsername.toLowerCase() ||
entry.toLowerCase() === `@${senderUsername.toLowerCase()}`,
));
const wasMentioned =
(Boolean(botUsername) && hasBotMention(msg, botUsername)) ||
matchesMentionPatterns(msg.text ?? msg.caption ?? "", mentionRegexes);
const hasAnyMention = (msg.entities ?? msg.caption_entities ?? []).some(
(ent) => ent.type === "mention",
);
const requireMention = resolveGroupRequireMention(chatId);
const shouldBypassMention =
isGroup &&
requireMention &&
!wasMentioned &&
!hasAnyMention &&
commandAuthorized &&
hasControlCommand(msg.text ?? msg.caption ?? "");
const canDetectMention =
Boolean(botUsername) || mentionRegexes.length > 0;
if (isGroup && requireMention && canDetectMention) {
if (!wasMentioned && !shouldBypassMention) {
logger.info(
{ chatId, reason: "no-mention" },
"skipping group message",
);
return;
// Media group handling - buffer multi-image messages
const mediaGroupId = (msg as { media_group_id?: string }).media_group_id;
if (mediaGroupId) {
const existing = mediaGroupBuffer.get(mediaGroupId);
if (existing) {
clearTimeout(existing.timer);
existing.messages.push({ msg, ctx });
existing.timer = setTimeout(async () => {
mediaGroupBuffer.delete(mediaGroupId);
await processMediaGroup(existing);
}, MEDIA_GROUP_TIMEOUT_MS);
} else {
const entry: MediaGroupEntry = {
messages: [{ msg, ctx }],
timer: setTimeout(async () => {
mediaGroupBuffer.delete(mediaGroupId);
await processMediaGroup(entry);
}, MEDIA_GROUP_TIMEOUT_MS),
};
mediaGroupBuffer.set(mediaGroupId, entry);
}
return;
}
const media = await resolveMedia(
@@ -189,149 +392,43 @@ export function createTelegramBot(opts: TelegramBotOptions) {
opts.token,
opts.proxyFetch,
);
const replyTarget = describeReplyTarget(msg);
const rawBody = (
msg.text ??
msg.caption ??
media?.placeholder ??
""
).trim();
if (!rawBody) return;
const shouldAckReaction = () => {
if (!ackReaction) return false;
if (ackReactionScope === "all") return true;
if (ackReactionScope === "direct") return !isGroup;
if (ackReactionScope === "group-all") return isGroup;
if (ackReactionScope === "group-mentions") {
if (!isGroup) return false;
if (!resolveGroupRequireMention(chatId)) return false;
if (!canDetectMention) return false;
return wasMentioned || shouldBypassMention;
}
return false;
};
if (shouldAckReaction() && msg.message_id) {
const api = bot.api as unknown as {
setMessageReaction?: (
chatId: number | string,
messageId: number,
reactions: Array<{ type: "emoji"; emoji: string }>,
) => Promise<void>;
};
if (typeof api.setMessageReaction === "function") {
api
.setMessageReaction(chatId, msg.message_id, [
{ type: "emoji", emoji: ackReaction },
])
.catch((err) => {
logVerbose(
`telegram react failed for chat ${chatId}: ${String(err)}`,
);
});
}
}
const replySuffix = replyTarget
? `\n\n[Replying to ${replyTarget.sender}${replyTarget.id ? ` id:${replyTarget.id}` : ""}]\n${replyTarget.body}\n[/Replying]`
: "";
const body = formatAgentEnvelope({
surface: "Telegram",
from: isGroup
? buildGroupLabel(msg, chatId)
: buildSenderLabel(msg, chatId),
timestamp: msg.date ? msg.date * 1000 : undefined,
body: `${rawBody}${replySuffix}`,
});
const ctxPayload = {
Body: body,
From: isGroup ? `group:${chatId}` : `telegram:${chatId}`,
To: `telegram:${chatId}`,
ChatType: isGroup ? "group" : "direct",
GroupSubject: isGroup ? (msg.chat.title ?? undefined) : undefined,
SenderName: buildSenderName(msg),
SenderId: senderId || undefined,
SenderUsername: senderUsername || undefined,
Surface: "telegram",
MessageSid: String(msg.message_id),
ReplyToId: replyTarget?.id,
ReplyToBody: replyTarget?.body,
ReplyToSender: replyTarget?.sender,
Timestamp: msg.date ? msg.date * 1000 : undefined,
WasMentioned: isGroup ? wasMentioned : undefined,
MediaPath: media?.path,
MediaType: media?.contentType,
MediaUrl: media?.path,
CommandAuthorized: commandAuthorized,
};
if (replyTarget && shouldLogVerbose()) {
const preview = replyTarget.body.replace(/\s+/g, " ").slice(0, 120);
logVerbose(
`telegram reply-context: replyToId=${replyTarget.id} replyToSender=${replyTarget.sender} replyToBody="${preview}"`,
);
}
if (!isGroup) {
const sessionCfg = cfg.session;
const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
const storePath = resolveStorePath(sessionCfg?.store);
await updateLastRoute({
storePath,
sessionKey: mainKey,
channel: "telegram",
to: String(chatId),
});
}
if (shouldLogVerbose()) {
const preview = body.slice(0, 200).replace(/\n/g, "\\n");
logVerbose(
`telegram inbound: chatId=${chatId} from=${ctxPayload.From} len=${body.length} preview="${preview}"`,
);
}
let typingController: TypingController | undefined;
const dispatcher = createReplyDispatcher({
responsePrefix: cfg.messages?.responsePrefix,
deliver: async (payload) => {
await deliverReplies({
replies: [payload],
chatId: String(chatId),
token: opts.token,
runtime,
bot,
replyToMode,
textLimit,
});
},
onIdle: () => {
typingController?.markDispatchIdle();
},
onError: (err, info) => {
runtime.error?.(
danger(`telegram ${info.kind} reply failed: ${String(err)}`),
);
},
});
const { queuedFinal } = await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
onReplyStart: sendTyping,
onTypingController: (typing) => {
typingController = typing;
},
},
});
typingController?.markDispatchIdle();
if (!queuedFinal) return;
const allMedia = media
? [{ path: media.path, contentType: media.contentType }]
: [];
await processMessage(ctx, allMedia);
} catch (err) {
runtime.error?.(danger(`handler failed: ${String(err)}`));
}
});
const processMediaGroup = async (entry: MediaGroupEntry) => {
try {
entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id);
const captionMsg = entry.messages.find(
(m) => m.msg.caption || m.msg.text,
);
const primaryEntry = captionMsg ?? entry.messages[0];
const allMedia: Array<{ path: string; contentType?: string }> = [];
for (const { ctx } of entry.messages) {
const media = await resolveMedia(
ctx,
mediaMaxBytes,
opts.token,
opts.proxyFetch,
);
if (media) {
allMedia.push({ path: media.path, contentType: media.contentType });
}
}
await processMessage(primaryEntry.ctx, allMedia);
} catch (err) {
runtime.error?.(danger(`media group handler failed: ${String(err)}`));
}
};
return bot;
}