From ce59e2dd76701a2584c3e1b437a26ce4e89bb849 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 14 Jan 2026 09:11:32 +0000 Subject: [PATCH] refactor(telegram): split bot handlers --- src/telegram/bot-access.ts | 48 + src/telegram/bot-handlers.ts | 269 ++++++ src/telegram/bot-message-context.ts | 465 ++++++++++ src/telegram/bot-message-dispatch.ts | 190 ++++ src/telegram/bot-message.ts | 63 ++ src/telegram/bot-native-commands.ts | 300 +++++++ src/telegram/bot-updates.ts | 54 ++ src/telegram/bot.ts | 1241 ++------------------------ 8 files changed, 1451 insertions(+), 1179 deletions(-) create mode 100644 src/telegram/bot-access.ts create mode 100644 src/telegram/bot-handlers.ts create mode 100644 src/telegram/bot-message-context.ts create mode 100644 src/telegram/bot-message-dispatch.ts create mode 100644 src/telegram/bot-message.ts create mode 100644 src/telegram/bot-native-commands.ts create mode 100644 src/telegram/bot-updates.ts diff --git a/src/telegram/bot-access.ts b/src/telegram/bot-access.ts new file mode 100644 index 000000000..8d2b78c7c --- /dev/null +++ b/src/telegram/bot-access.ts @@ -0,0 +1,48 @@ +export type NormalizedAllowFrom = { + entries: string[]; + entriesLower: string[]; + hasWildcard: boolean; + hasEntries: boolean; +}; + +export const normalizeAllowFrom = ( + list?: Array, +): NormalizedAllowFrom => { + const entries = (list ?? []) + .map((value) => String(value).trim()) + .filter(Boolean); + const hasWildcard = entries.includes("*"); + const normalized = entries + .filter((value) => value !== "*") + .map((value) => value.replace(/^(telegram|tg):/i, "")); + const normalizedLower = normalized.map((value) => value.toLowerCase()); + return { + entries: normalized, + entriesLower: normalizedLower, + hasWildcard, + hasEntries: entries.length > 0, + }; +}; + +export const firstDefined = (...values: Array) => { + for (const value of values) { + if (typeof value !== "undefined") return value; + } + return undefined; +}; + +export const isSenderAllowed = (params: { + allow: NormalizedAllowFrom; + senderId?: string; + senderUsername?: string; +}) => { + const { allow, senderId, senderUsername } = params; + if (!allow.hasEntries) return true; + if (allow.hasWildcard) return true; + if (senderId && allow.entries.includes(senderId)) return true; + const username = senderUsername?.toLowerCase(); + if (!username) return false; + return allow.entriesLower.some( + (entry) => entry === username || entry === `@${username}`, + ); +}; diff --git a/src/telegram/bot-handlers.ts b/src/telegram/bot-handlers.ts new file mode 100644 index 000000000..6ce0aa900 --- /dev/null +++ b/src/telegram/bot-handlers.ts @@ -0,0 +1,269 @@ +// @ts-nocheck +import { danger, logVerbose } from "../globals.js"; +import { resolveMedia } from "./bot/delivery.js"; +import { resolveTelegramForumThreadId } from "./bot/helpers.js"; +import type { TelegramMessage } from "./bot/types.js"; +import { + firstDefined, + isSenderAllowed, + normalizeAllowFrom, +} from "./bot-access.js"; +import { MEDIA_GROUP_TIMEOUT_MS, type MediaGroupEntry } from "./bot-updates.js"; +import { readTelegramAllowFromStore } from "./pairing-store.js"; + +export const registerTelegramHandlers = ({ + bot, + opts, + runtime, + mediaMaxBytes, + telegramCfg, + groupAllowFrom, + resolveGroupPolicy, + resolveTelegramGroupConfig, + shouldSkipUpdate, + processMessage, + logger, +}) => { + const mediaGroupBuffer = new Map(); + let mediaGroupProcessing: Promise = Promise.resolve(); + + const processMediaGroup = async (entry: MediaGroupEntry) => { + try { + entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); + + const captionMsg = entry.messages.find( + (m) => m.msg.caption || m.msg.text, + ); + const primaryEntry = captionMsg ?? entry.messages[0]; + + const allMedia: Array<{ path: string; contentType?: string }> = []; + for (const { ctx } of entry.messages) { + const media = await resolveMedia( + ctx, + mediaMaxBytes, + opts.token, + opts.proxyFetch, + ); + if (media) { + allMedia.push({ path: media.path, contentType: media.contentType }); + } + } + + const storeAllowFrom = await readTelegramAllowFromStore().catch(() => []); + await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom); + } catch (err) { + runtime.error?.(danger(`media group handler failed: ${String(err)}`)); + } + }; + + bot.on("callback_query", async (ctx) => { + const callback = ctx.callbackQuery; + if (!callback) return; + if (shouldSkipUpdate(ctx)) return; + try { + const data = (callback.data ?? "").trim(); + const callbackMessage = callback.message; + if (!data || !callbackMessage) return; + + const syntheticMessage: TelegramMessage = { + ...callbackMessage, + from: callback.from, + text: data, + caption: undefined, + caption_entities: undefined, + entities: undefined, + }; + const storeAllowFrom = await readTelegramAllowFromStore().catch(() => []); + const getFile = + typeof ctx.getFile === "function" + ? ctx.getFile.bind(ctx) + : async () => ({}); + await processMessage( + { message: syntheticMessage, me: ctx.me, getFile }, + [], + storeAllowFrom, + { forceWasMentioned: true, messageIdOverride: callback.id }, + ); + } catch (err) { + runtime.error?.(danger(`callback handler failed: ${String(err)}`)); + } finally { + await bot.api.answerCallbackQuery(callback.id).catch(() => {}); + } + }); + + bot.on("message", async (ctx) => { + try { + const msg = ctx.message; + if (!msg) return; + if (shouldSkipUpdate(ctx)) return; + + const chatId = msg.chat.id; + const isGroup = + msg.chat.type === "group" || msg.chat.type === "supergroup"; + const messageThreadId = (msg as { message_thread_id?: number }) + .message_thread_id; + const isForum = (msg.chat as { is_forum?: boolean }).is_forum === true; + const resolvedThreadId = resolveTelegramForumThreadId({ + isForum, + messageThreadId, + }); + const storeAllowFrom = await readTelegramAllowFromStore().catch(() => []); + const { groupConfig, topicConfig } = resolveTelegramGroupConfig( + chatId, + resolvedThreadId, + ); + const groupAllowOverride = firstDefined( + topicConfig?.allowFrom, + groupConfig?.allowFrom, + ); + const effectiveGroupAllow = normalizeAllowFrom([ + ...(groupAllowOverride ?? groupAllowFrom ?? []), + ...storeAllowFrom, + ]); + const hasGroupAllowOverride = typeof groupAllowOverride !== "undefined"; + + if (isGroup) { + if (groupConfig?.enabled === false) { + logVerbose(`Blocked telegram group ${chatId} (group disabled)`); + return; + } + if (topicConfig?.enabled === false) { + logVerbose( + `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, + ); + return; + } + if (hasGroupAllowOverride) { + const senderId = msg.from?.id; + const senderUsername = msg.from?.username ?? ""; + const allowed = + senderId != null && + isSenderAllowed({ + allow: effectiveGroupAllow, + senderId: String(senderId), + senderUsername, + }); + if (!allowed) { + logVerbose( + `Blocked telegram group sender ${senderId ?? "unknown"} (group allowFrom override)`, + ); + return; + } + } + // Group policy filtering: controls how group messages are handled + // - "open": groups bypass allowFrom, only mention-gating applies + // - "disabled": block all group messages entirely + // - "allowlist": only allow group messages from senders in groupAllowFrom/allowFrom + const groupPolicy = telegramCfg.groupPolicy ?? "open"; + if (groupPolicy === "disabled") { + logVerbose(`Blocked telegram group message (groupPolicy: disabled)`); + return; + } + if (groupPolicy === "allowlist") { + // For allowlist mode, the sender (msg.from.id) must be in allowFrom + const senderId = msg.from?.id; + if (senderId == null) { + logVerbose( + `Blocked telegram group message (no sender ID, groupPolicy: allowlist)`, + ); + return; + } + if (!effectiveGroupAllow.hasEntries) { + logVerbose( + "Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)", + ); + return; + } + const senderUsername = msg.from?.username ?? ""; + if ( + !isSenderAllowed({ + allow: effectiveGroupAllow, + senderId: String(senderId), + senderUsername, + }) + ) { + logVerbose( + `Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`, + ); + return; + } + } + + // Group allowlist based on configured group IDs. + const groupAllowlist = resolveGroupPolicy(chatId); + if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { + logger.info( + { chatId, title: msg.chat.title, reason: "not-allowed" }, + "skipping group message", + ); + return; + } + } + + // Media group handling - buffer multi-image messages + const mediaGroupId = (msg as { media_group_id?: string }).media_group_id; + if (mediaGroupId) { + const existing = mediaGroupBuffer.get(mediaGroupId); + if (existing) { + clearTimeout(existing.timer); + existing.messages.push({ msg, ctx }); + existing.timer = setTimeout(async () => { + mediaGroupBuffer.delete(mediaGroupId); + mediaGroupProcessing = mediaGroupProcessing + .then(async () => { + await processMediaGroup(existing); + }) + .catch(() => undefined); + await mediaGroupProcessing; + }, MEDIA_GROUP_TIMEOUT_MS); + } else { + const entry: MediaGroupEntry = { + messages: [{ msg, ctx }], + timer: setTimeout(async () => { + mediaGroupBuffer.delete(mediaGroupId); + mediaGroupProcessing = mediaGroupProcessing + .then(async () => { + await processMediaGroup(entry); + }) + .catch(() => undefined); + await mediaGroupProcessing; + }, MEDIA_GROUP_TIMEOUT_MS), + }; + mediaGroupBuffer.set(mediaGroupId, entry); + } + return; + } + + let media: Awaited> = null; + try { + media = await resolveMedia( + ctx, + mediaMaxBytes, + opts.token, + opts.proxyFetch, + ); + } catch (mediaErr) { + const errMsg = String(mediaErr); + if (errMsg.includes("exceeds") && errMsg.includes("MB limit")) { + const limitMb = Math.round(mediaMaxBytes / (1024 * 1024)); + await bot.api + .sendMessage( + chatId, + `⚠️ File too large. Maximum size is ${limitMb}MB.`, + { reply_to_message_id: msg.message_id }, + ) + .catch(() => {}); + logger.warn({ chatId, error: errMsg }, "media exceeds size limit"); + return; + } + throw mediaErr; + } + const allMedia = media + ? [{ path: media.path, contentType: media.contentType }] + : []; + await processMessage(ctx, allMedia, storeAllowFrom); + } catch (err) { + runtime.error?.(danger(`handler failed: ${String(err)}`)); + } + }); +}; diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts new file mode 100644 index 000000000..618f57119 --- /dev/null +++ b/src/telegram/bot-message-context.ts @@ -0,0 +1,465 @@ +// @ts-nocheck +import { resolveAckReaction } from "../agents/identity.js"; +import { hasControlCommand } from "../auto-reply/command-detection.js"; +import { normalizeCommandBody } from "../auto-reply/commands-registry.js"; +import { formatAgentEnvelope } from "../auto-reply/envelope.js"; +import { buildHistoryContextFromMap } from "../auto-reply/reply/history.js"; +import { + buildMentionRegexes, + matchesMentionPatterns, +} from "../auto-reply/reply/mentions.js"; +import { formatLocationText, toLocationContext } from "../channels/location.js"; +import { resolveStorePath, updateLastRoute } from "../config/sessions.js"; +import { logVerbose, shouldLogVerbose } from "../globals.js"; +import { recordChannelActivity } from "../infra/channel-activity.js"; +import { resolveAgentRoute } from "../routing/resolve-route.js"; +import { + buildGroupFromLabel, + buildGroupLabel, + buildSenderLabel, + buildSenderName, + buildTelegramGroupFrom, + buildTelegramGroupPeerId, + buildTelegramThreadParams, + describeReplyTarget, + extractTelegramLocation, + hasBotMention, + resolveTelegramForumThreadId, +} from "./bot/helpers.js"; +import { + firstDefined, + isSenderAllowed, + normalizeAllowFrom, +} from "./bot-access.js"; +import { upsertTelegramPairingRequest } from "./pairing-store.js"; + +export const buildTelegramMessageContext = async ({ + primaryCtx, + allMedia, + storeAllowFrom, + options, + bot, + cfg, + account, + historyLimit, + groupHistories, + dmPolicy, + allowFrom, + groupAllowFrom, + ackReactionScope, + logger, + resolveGroupActivation, + resolveGroupRequireMention, + resolveTelegramGroupConfig, +}) => { + const msg = primaryCtx.message; + recordChannelActivity({ + channel: "telegram", + accountId: account.accountId, + direction: "inbound", + }); + const chatId = msg.chat.id; + const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup"; + const messageThreadId = (msg as { message_thread_id?: number }) + .message_thread_id; + const isForum = (msg.chat as { is_forum?: boolean }).is_forum === true; + const resolvedThreadId = resolveTelegramForumThreadId({ + isForum, + messageThreadId, + }); + const { groupConfig, topicConfig } = resolveTelegramGroupConfig( + chatId, + resolvedThreadId, + ); + const peerId = isGroup + ? buildTelegramGroupPeerId(chatId, resolvedThreadId) + : String(chatId); + const route = resolveAgentRoute({ + cfg, + channel: "telegram", + accountId: account.accountId, + peer: { + kind: isGroup ? "group" : "dm", + id: peerId, + }, + }); + const mentionRegexes = buildMentionRegexes(cfg, route.agentId); + const effectiveDmAllow = normalizeAllowFrom([ + ...(allowFrom ?? []), + ...storeAllowFrom, + ]); + const groupAllowOverride = firstDefined( + topicConfig?.allowFrom, + groupConfig?.allowFrom, + ); + const effectiveGroupAllow = normalizeAllowFrom([ + ...(groupAllowOverride ?? groupAllowFrom ?? []), + ...storeAllowFrom, + ]); + const hasGroupAllowOverride = typeof groupAllowOverride !== "undefined"; + + if (isGroup && groupConfig?.enabled === false) { + logVerbose(`Blocked telegram group ${chatId} (group disabled)`); + return null; + } + if (isGroup && topicConfig?.enabled === false) { + logVerbose( + `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, + ); + return null; + } + + const sendTyping = async () => { + try { + await bot.api.sendChatAction( + chatId, + "typing", + buildTelegramThreadParams(resolvedThreadId), + ); + } catch (err) { + logVerbose( + `telegram typing cue failed for chat ${chatId}: ${String(err)}`, + ); + } + }; + + // DM access control (secure defaults): "pairing" (default) / "allowlist" / "open" / "disabled" + if (!isGroup) { + if (dmPolicy === "disabled") return null; + + if (dmPolicy !== "open") { + const candidate = String(chatId); + const senderUsername = msg.from?.username ?? ""; + const allowed = + effectiveDmAllow.hasWildcard || + (effectiveDmAllow.hasEntries && + isSenderAllowed({ + allow: effectiveDmAllow, + senderId: candidate, + senderUsername, + })); + if (!allowed) { + if (dmPolicy === "pairing") { + try { + const from = msg.from as + | { + first_name?: string; + last_name?: string; + username?: string; + id?: number; + } + | undefined; + const telegramUserId = from?.id ? String(from.id) : candidate; + const { code, created } = await upsertTelegramPairingRequest({ + chatId: candidate, + username: from?.username, + firstName: from?.first_name, + lastName: from?.last_name, + }); + if (created) { + logger.info( + { + chatId: candidate, + username: from?.username, + firstName: from?.first_name, + lastName: from?.last_name, + }, + "telegram pairing request", + ); + await bot.api.sendMessage( + chatId, + [ + "Clawdbot: access not configured.", + "", + `Your Telegram user id: ${telegramUserId}`, + "", + `Pairing code: ${code}`, + "", + "Ask the bot owner to approve with:", + "clawdbot pairing approve telegram ", + ].join("\n"), + ); + } + } catch (err) { + logVerbose( + `telegram pairing reply failed for chat ${chatId}: ${String(err)}`, + ); + } + } else { + logVerbose( + `Blocked unauthorized telegram sender ${candidate} (dmPolicy=${dmPolicy})`, + ); + } + return null; + } + } + } + + const botUsername = primaryCtx.me?.username?.toLowerCase(); + const senderId = msg.from?.id ? String(msg.from.id) : ""; + const senderUsername = msg.from?.username ?? ""; + if (isGroup && hasGroupAllowOverride) { + const allowed = isSenderAllowed({ + allow: effectiveGroupAllow, + senderId, + senderUsername, + }); + if (!allowed) { + logVerbose( + `Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`, + ); + return null; + } + } + const commandAuthorized = isSenderAllowed({ + allow: isGroup ? effectiveGroupAllow : effectiveDmAllow, + senderId, + senderUsername, + }); + const computedWasMentioned = + (Boolean(botUsername) && hasBotMention(msg, botUsername)) || + matchesMentionPatterns(msg.text ?? msg.caption ?? "", mentionRegexes); + const wasMentioned = + options?.forceWasMentioned === true ? true : computedWasMentioned; + const hasAnyMention = (msg.entities ?? msg.caption_entities ?? []).some( + (ent) => ent.type === "mention", + ); + const activationOverride = resolveGroupActivation({ + chatId, + messageThreadId: resolvedThreadId, + sessionKey: route.sessionKey, + agentId: route.agentId, + }); + const baseRequireMention = resolveGroupRequireMention(chatId); + const requireMention = firstDefined( + activationOverride, + topicConfig?.requireMention, + groupConfig?.requireMention, + baseRequireMention, + ); + const shouldBypassMention = + isGroup && + requireMention && + !wasMentioned && + !hasAnyMention && + commandAuthorized && + hasControlCommand(msg.text ?? msg.caption ?? "", cfg, { botUsername }); + const effectiveWasMentioned = wasMentioned || shouldBypassMention; + const canDetectMention = Boolean(botUsername) || mentionRegexes.length > 0; + if (isGroup && requireMention && canDetectMention) { + if (!wasMentioned && !shouldBypassMention) { + logger.info({ chatId, reason: "no-mention" }, "skipping group message"); + return null; + } + } + + // ACK reactions + const ackReaction = resolveAckReaction(cfg, route.agentId); + const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false; + const shouldAckReaction = () => { + if (!ackReaction) return false; + if (ackReactionScope === "all") return true; + if (ackReactionScope === "direct") return !isGroup; + if (ackReactionScope === "group-all") return isGroup; + if (ackReactionScope === "group-mentions") { + if (!isGroup) return false; + if (!requireMention) return false; + if (!canDetectMention) return false; + return wasMentioned || shouldBypassMention; + } + return false; + }; + const api = bot.api as unknown as { + setMessageReaction?: ( + chatId: number | string, + messageId: number, + reactions: Array<{ type: "emoji"; emoji: string }>, + ) => Promise; + }; + const reactionApi = + typeof api.setMessageReaction === "function" + ? api.setMessageReaction.bind(api) + : null; + const ackReactionPromise = + shouldAckReaction() && msg.message_id && reactionApi + ? reactionApi(chatId, msg.message_id, [ + { type: "emoji", emoji: ackReaction }, + ]).then( + () => true, + (err) => { + logVerbose( + `telegram react failed for chat ${chatId}: ${String(err)}`, + ); + return false; + }, + ) + : null; + + let placeholder = ""; + if (msg.photo) placeholder = ""; + else if (msg.video) placeholder = ""; + else if (msg.audio || msg.voice) placeholder = ""; + else if (msg.document) placeholder = ""; + + const replyTarget = describeReplyTarget(msg); + const locationData = extractTelegramLocation(msg); + const locationText = locationData + ? formatLocationText(locationData) + : undefined; + const rawText = (msg.text ?? msg.caption ?? "").trim(); + let rawBody = [rawText, locationText].filter(Boolean).join("\n").trim(); + if (!rawBody) rawBody = placeholder; + if (!rawBody && allMedia.length === 0) return null; + + let bodyText = rawBody; + if (!bodyText && allMedia.length > 0) { + bodyText = `${allMedia.length > 1 ? ` (${allMedia.length} images)` : ""}`; + } + + const replySuffix = replyTarget + ? `\n\n[Replying to ${replyTarget.sender}${ + replyTarget.id ? ` id:${replyTarget.id}` : "" + }]\n${replyTarget.body}\n[/Replying]` + : ""; + const groupLabel = isGroup + ? buildGroupLabel(msg, chatId, resolvedThreadId) + : undefined; + const body = formatAgentEnvelope({ + channel: "Telegram", + from: isGroup + ? buildGroupFromLabel(msg, chatId, senderId, resolvedThreadId) + : buildSenderLabel(msg, senderId || chatId), + timestamp: msg.date ? msg.date * 1000 : undefined, + body: `${bodyText}${replySuffix}`, + }); + let combinedBody = body; + const historyKey = isGroup + ? buildTelegramGroupPeerId(chatId, resolvedThreadId) + : undefined; + if (isGroup && historyKey && historyLimit > 0) { + combinedBody = buildHistoryContextFromMap({ + historyMap: groupHistories, + historyKey, + limit: historyLimit, + entry: { + sender: buildSenderLabel(msg, senderId || chatId), + body: rawBody, + timestamp: msg.date ? msg.date * 1000 : undefined, + messageId: + typeof msg.message_id === "number" + ? String(msg.message_id) + : undefined, + }, + currentMessage: combinedBody, + formatEntry: (entry) => + formatAgentEnvelope({ + channel: "Telegram", + from: groupLabel ?? `group:${chatId}`, + timestamp: entry.timestamp, + body: `${entry.sender}: ${entry.body} [id:${entry.messageId ?? "unknown"} chat:${chatId}]`, + }), + }); + } + + const skillFilter = firstDefined(topicConfig?.skills, groupConfig?.skills); + const systemPromptParts = [ + groupConfig?.systemPrompt?.trim() || null, + topicConfig?.systemPrompt?.trim() || null, + ].filter((entry): entry is string => Boolean(entry)); + const groupSystemPrompt = + systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined; + const commandBody = normalizeCommandBody(rawBody, { botUsername }); + const ctxPayload = { + Body: combinedBody, + RawBody: rawBody, + CommandBody: commandBody, + From: isGroup + ? buildTelegramGroupFrom(chatId, resolvedThreadId) + : `telegram:${chatId}`, + To: `telegram:${chatId}`, + SessionKey: route.sessionKey, + AccountId: route.accountId, + ChatType: isGroup ? "group" : "direct", + GroupSubject: isGroup ? (msg.chat.title ?? undefined) : undefined, + GroupSystemPrompt: isGroup ? groupSystemPrompt : undefined, + SenderName: buildSenderName(msg), + SenderId: senderId || undefined, + SenderUsername: senderUsername || undefined, + Provider: "telegram", + Surface: "telegram", + MessageSid: options?.messageIdOverride ?? String(msg.message_id), + ReplyToId: replyTarget?.id, + ReplyToBody: replyTarget?.body, + ReplyToSender: replyTarget?.sender, + Timestamp: msg.date ? msg.date * 1000 : undefined, + WasMentioned: isGroup ? effectiveWasMentioned : undefined, + MediaPath: allMedia[0]?.path, + MediaType: allMedia[0]?.contentType, + MediaUrl: allMedia[0]?.path, + MediaPaths: allMedia.length > 0 ? allMedia.map((m) => m.path) : undefined, + MediaUrls: allMedia.length > 0 ? allMedia.map((m) => m.path) : undefined, + MediaTypes: + allMedia.length > 0 + ? (allMedia.map((m) => m.contentType).filter(Boolean) as string[]) + : undefined, + ...(locationData ? toLocationContext(locationData) : undefined), + CommandAuthorized: commandAuthorized, + MessageThreadId: resolvedThreadId, + IsForum: isForum, + // Originating channel for reply routing. + OriginatingChannel: "telegram" as const, + OriginatingTo: `telegram:${chatId}`, + }; + + if (replyTarget && shouldLogVerbose()) { + const preview = replyTarget.body.replace(/\s+/g, " ").slice(0, 120); + logVerbose( + `telegram reply-context: replyToId=${replyTarget.id} replyToSender=${replyTarget.sender} replyToBody="${preview}"`, + ); + } + + if (!isGroup) { + const sessionCfg = cfg.session; + const storePath = resolveStorePath(sessionCfg?.store, { + agentId: route.agentId, + }); + await updateLastRoute({ + storePath, + sessionKey: route.mainSessionKey, + channel: "telegram", + to: String(chatId), + accountId: route.accountId, + }); + } + + if (shouldLogVerbose()) { + const preview = body.slice(0, 200).replace(/\n/g, "\\n"); + const mediaInfo = + allMedia.length > 1 ? ` mediaCount=${allMedia.length}` : ""; + const topicInfo = + resolvedThreadId != null ? ` topic=${resolvedThreadId}` : ""; + logVerbose( + `telegram inbound: chatId=${chatId} from=${ctxPayload.From} len=${body.length}${mediaInfo}${topicInfo} preview="${preview}"`, + ); + } + + return { + ctxPayload, + primaryCtx, + msg, + chatId, + isGroup, + resolvedThreadId, + isForum, + historyKey, + historyLimit, + groupHistories, + route, + skillFilter, + sendTyping, + ackReactionPromise, + reactionApi, + removeAckAfterReply, + accountId: account.accountId, + }; +}; diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts new file mode 100644 index 000000000..7c8c69bfb --- /dev/null +++ b/src/telegram/bot-message-dispatch.ts @@ -0,0 +1,190 @@ +// @ts-nocheck +import { resolveEffectiveMessagesConfig } from "../agents/identity.js"; +import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js"; +import { clearHistoryEntries } from "../auto-reply/reply/history.js"; +import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; +import { danger, logVerbose } from "../globals.js"; +import { deliverReplies } from "./bot/delivery.js"; +import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js"; +import { createTelegramDraftStream } from "./draft-stream.js"; + +export const dispatchTelegramMessage = async ({ + context, + bot, + cfg, + runtime, + replyToMode, + streamMode, + textLimit, + telegramCfg, + opts, + resolveBotTopicsEnabled, +}) => { + const { + ctxPayload, + primaryCtx, + msg, + chatId, + isGroup, + resolvedThreadId, + historyKey, + historyLimit, + groupHistories, + route, + skillFilter, + sendTyping, + ackReactionPromise, + reactionApi, + removeAckAfterReply, + } = context; + + const isPrivateChat = msg.chat.type === "private"; + const draftMaxChars = Math.min(textLimit, 4096); + const canStreamDraft = + streamMode !== "off" && + isPrivateChat && + typeof resolvedThreadId === "number" && + (await resolveBotTopicsEnabled(primaryCtx)); + const draftStream = canStreamDraft + ? createTelegramDraftStream({ + api: bot.api, + chatId, + draftId: msg.message_id || Date.now(), + maxChars: draftMaxChars, + messageThreadId: resolvedThreadId, + log: logVerbose, + warn: logVerbose, + }) + : undefined; + const draftChunking = + draftStream && streamMode === "block" + ? resolveTelegramDraftStreamingChunking(cfg, route.accountId) + : undefined; + const draftChunker = draftChunking + ? new EmbeddedBlockChunker(draftChunking) + : undefined; + let lastPartialText = ""; + let draftText = ""; + const updateDraftFromPartial = (text?: string) => { + if (!draftStream || !text) return; + if (text === lastPartialText) return; + if (streamMode === "partial") { + lastPartialText = text; + draftStream.update(text); + return; + } + let delta = text; + if (text.startsWith(lastPartialText)) { + delta = text.slice(lastPartialText.length); + } else { + // Streaming buffer reset (or non-monotonic stream). Start fresh. + draftChunker?.reset(); + draftText = ""; + } + lastPartialText = text; + if (!delta) return; + if (!draftChunker) { + draftText = text; + draftStream.update(draftText); + return; + } + draftChunker.append(delta); + draftChunker.drain({ + force: false, + emit: (chunk) => { + draftText += chunk; + draftStream.update(draftText); + }, + }); + }; + const flushDraft = async () => { + if (!draftStream) return; + if (draftChunker?.hasBuffered()) { + draftChunker.drain({ + force: true, + emit: (chunk) => { + draftText += chunk; + }, + }); + draftChunker.reset(); + if (draftText) draftStream.update(draftText); + } + await draftStream.flush(); + }; + + const disableBlockStreaming = + Boolean(draftStream) || + (typeof telegramCfg.blockStreaming === "boolean" + ? !telegramCfg.blockStreaming + : undefined); + + let didSendReply = false; + const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg, + dispatcherOptions: { + responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId) + .responsePrefix, + deliver: async (payload, info) => { + if (info.kind === "final") { + await flushDraft(); + draftStream?.stop(); + } + await deliverReplies({ + replies: [payload], + chatId: String(chatId), + token: opts.token, + runtime, + bot, + replyToMode, + textLimit, + messageThreadId: resolvedThreadId, + }); + didSendReply = true; + }, + onError: (err, info) => { + runtime.error?.( + danger(`telegram ${info.kind} reply failed: ${String(err)}`), + ); + }, + onReplyStart: sendTyping, + }, + replyOptions: { + skillFilter, + onPartialReply: draftStream + ? (payload) => updateDraftFromPartial(payload.text) + : undefined, + onReasoningStream: draftStream + ? (payload) => { + if (payload.text) draftStream.update(payload.text); + } + : undefined, + disableBlockStreaming, + }, + }); + draftStream?.stop(); + if (!queuedFinal) { + if (isGroup && historyKey && historyLimit > 0 && didSendReply) { + clearHistoryEntries({ historyMap: groupHistories, historyKey }); + } + return; + } + if ( + removeAckAfterReply && + ackReactionPromise && + msg.message_id && + reactionApi + ) { + void ackReactionPromise.then((didAck) => { + if (!didAck) return; + reactionApi(chatId, msg.message_id, []).catch((err) => { + logVerbose( + `telegram: failed to remove ack reaction from ${chatId}/${msg.message_id}: ${String(err)}`, + ); + }); + }); + } + if (isGroup && historyKey && historyLimit > 0 && didSendReply) { + clearHistoryEntries({ historyMap: groupHistories, historyKey }); + } +}; diff --git a/src/telegram/bot-message.ts b/src/telegram/bot-message.ts new file mode 100644 index 000000000..313296b1d --- /dev/null +++ b/src/telegram/bot-message.ts @@ -0,0 +1,63 @@ +// @ts-nocheck +import { buildTelegramMessageContext } from "./bot-message-context.js"; +import { dispatchTelegramMessage } from "./bot-message-dispatch.js"; + +export const createTelegramMessageProcessor = (deps) => { + const { + bot, + cfg, + account, + telegramCfg, + historyLimit, + groupHistories, + dmPolicy, + allowFrom, + groupAllowFrom, + ackReactionScope, + logger, + resolveGroupActivation, + resolveGroupRequireMention, + resolveTelegramGroupConfig, + runtime, + replyToMode, + streamMode, + textLimit, + opts, + resolveBotTopicsEnabled, + } = deps; + + return async (primaryCtx, allMedia, storeAllowFrom, options) => { + const context = await buildTelegramMessageContext({ + primaryCtx, + allMedia, + storeAllowFrom, + options, + bot, + cfg, + account, + historyLimit, + groupHistories, + dmPolicy, + allowFrom, + groupAllowFrom, + ackReactionScope, + logger, + resolveGroupActivation, + resolveGroupRequireMention, + resolveTelegramGroupConfig, + }); + if (!context) return; + await dispatchTelegramMessage({ + context, + bot, + cfg, + runtime, + replyToMode, + streamMode, + textLimit, + telegramCfg, + opts, + resolveBotTopicsEnabled, + }); + }; +}; diff --git a/src/telegram/bot-native-commands.ts b/src/telegram/bot-native-commands.ts new file mode 100644 index 000000000..91ffe2c8f --- /dev/null +++ b/src/telegram/bot-native-commands.ts @@ -0,0 +1,300 @@ +// @ts-nocheck + +import { resolveEffectiveMessagesConfig } from "../agents/identity.js"; +import { + buildCommandText, + listNativeCommandSpecsForConfig, +} from "../auto-reply/commands-registry.js"; +import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; +import { danger, logVerbose } from "../globals.js"; +import { resolveAgentRoute } from "../routing/resolve-route.js"; +import { deliverReplies } from "./bot/delivery.js"; +import { + buildSenderName, + buildTelegramGroupFrom, + buildTelegramGroupPeerId, + resolveTelegramForumThreadId, +} from "./bot/helpers.js"; +import { + firstDefined, + isSenderAllowed, + normalizeAllowFrom, +} from "./bot-access.js"; +import { readTelegramAllowFromStore } from "./pairing-store.js"; + +export const registerTelegramNativeCommands = ({ + bot, + cfg, + runtime, + accountId, + telegramCfg, + allowFrom, + groupAllowFrom, + replyToMode, + textLimit, + useAccessGroups, + nativeEnabled, + nativeDisabledExplicit, + resolveGroupPolicy, + resolveTelegramGroupConfig, + shouldSkipUpdate, + opts, +}) => { + const nativeCommands = nativeEnabled + ? listNativeCommandSpecsForConfig(cfg) + : []; + if (nativeCommands.length > 0) { + const api = bot.api as unknown as { + setMyCommands?: ( + commands: Array<{ command: string; description: string }>, + ) => Promise; + }; + if (typeof api.setMyCommands === "function") { + api + .setMyCommands( + nativeCommands.map((command) => ({ + command: command.name, + description: command.description, + })), + ) + .catch((err) => { + runtime.error?.( + danger(`telegram setMyCommands failed: ${String(err)}`), + ); + }); + } else { + logVerbose("telegram: setMyCommands unavailable; skipping registration"); + } + + if ( + typeof (bot as unknown as { command?: unknown }).command !== "function" + ) { + logVerbose("telegram: bot.command unavailable; skipping native handlers"); + } else { + for (const command of nativeCommands) { + bot.command(command.name, async (ctx) => { + const msg = ctx.message; + if (!msg) return; + if (shouldSkipUpdate(ctx)) return; + const chatId = msg.chat.id; + const isGroup = + msg.chat.type === "group" || msg.chat.type === "supergroup"; + const messageThreadId = (msg as { message_thread_id?: number }) + .message_thread_id; + const isForum = + (msg.chat as { is_forum?: boolean }).is_forum === true; + const resolvedThreadId = resolveTelegramForumThreadId({ + isForum, + messageThreadId, + }); + const storeAllowFrom = await readTelegramAllowFromStore().catch( + () => [], + ); + const { groupConfig, topicConfig } = resolveTelegramGroupConfig( + chatId, + resolvedThreadId, + ); + const groupAllowOverride = firstDefined( + topicConfig?.allowFrom, + groupConfig?.allowFrom, + ); + const effectiveGroupAllow = normalizeAllowFrom([ + ...(groupAllowOverride ?? groupAllowFrom ?? []), + ...storeAllowFrom, + ]); + const hasGroupAllowOverride = + typeof groupAllowOverride !== "undefined"; + + if (isGroup && groupConfig?.enabled === false) { + await bot.api.sendMessage(chatId, "This group is disabled."); + return; + } + if (isGroup && topicConfig?.enabled === false) { + await bot.api.sendMessage(chatId, "This topic is disabled."); + return; + } + if (isGroup && hasGroupAllowOverride) { + const senderId = msg.from?.id; + const senderUsername = msg.from?.username ?? ""; + if ( + senderId == null || + !isSenderAllowed({ + allow: effectiveGroupAllow, + senderId: String(senderId), + senderUsername, + }) + ) { + await bot.api.sendMessage( + chatId, + "You are not authorized to use this command.", + ); + return; + } + } + + if (isGroup && useAccessGroups) { + const groupPolicy = telegramCfg.groupPolicy ?? "open"; + if (groupPolicy === "disabled") { + await bot.api.sendMessage( + chatId, + "Telegram group commands are disabled.", + ); + return; + } + if (groupPolicy === "allowlist") { + const senderId = msg.from?.id; + if (senderId == null) { + await bot.api.sendMessage( + chatId, + "You are not authorized to use this command.", + ); + return; + } + const senderUsername = msg.from?.username ?? ""; + if ( + !isSenderAllowed({ + allow: effectiveGroupAllow, + senderId: String(senderId), + senderUsername, + }) + ) { + await bot.api.sendMessage( + chatId, + "You are not authorized to use this command.", + ); + return; + } + } + const groupAllowlist = resolveGroupPolicy(chatId); + if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { + await bot.api.sendMessage(chatId, "This group is not allowed."); + return; + } + } + + const allowFromList = Array.isArray(allowFrom) + ? allowFrom.map((entry) => String(entry).trim()).filter(Boolean) + : []; + const senderId = msg.from?.id ? String(msg.from.id) : ""; + const senderUsername = msg.from?.username ?? ""; + const commandAuthorized = + allowFromList.length === 0 || + allowFromList.includes("*") || + (senderId && allowFromList.includes(senderId)) || + (senderId && allowFromList.includes(`telegram:${senderId}`)) || + (senderUsername && + allowFromList.some( + (entry) => + entry.toLowerCase() === senderUsername.toLowerCase() || + entry.toLowerCase() === `@${senderUsername.toLowerCase()}`, + )); + if (!commandAuthorized) { + await bot.api.sendMessage( + chatId, + "You are not authorized to use this command.", + ); + return; + } + + const prompt = buildCommandText(command.name, ctx.match ?? ""); + const route = resolveAgentRoute({ + cfg, + channel: "telegram", + accountId, + peer: { + kind: isGroup ? "group" : "dm", + id: isGroup + ? buildTelegramGroupPeerId(chatId, resolvedThreadId) + : String(chatId), + }, + }); + const skillFilter = firstDefined( + topicConfig?.skills, + groupConfig?.skills, + ); + const systemPromptParts = [ + groupConfig?.systemPrompt?.trim() || null, + topicConfig?.systemPrompt?.trim() || null, + ].filter((entry): entry is string => Boolean(entry)); + const groupSystemPrompt = + systemPromptParts.length > 0 + ? systemPromptParts.join("\n\n") + : undefined; + const ctxPayload = { + Body: prompt, + From: isGroup + ? buildTelegramGroupFrom(chatId, resolvedThreadId) + : `telegram:${chatId}`, + To: `slash:${senderId || chatId}`, + ChatType: isGroup ? "group" : "direct", + GroupSubject: isGroup ? (msg.chat.title ?? undefined) : undefined, + GroupSystemPrompt: isGroup ? groupSystemPrompt : undefined, + SenderName: buildSenderName(msg), + SenderId: senderId || undefined, + SenderUsername: senderUsername || undefined, + Surface: "telegram", + MessageSid: String(msg.message_id), + Timestamp: msg.date ? msg.date * 1000 : undefined, + WasMentioned: true, + CommandAuthorized: commandAuthorized, + CommandSource: "native" as const, + SessionKey: `telegram:slash:${senderId || chatId}`, + CommandTargetSessionKey: route.sessionKey, + MessageThreadId: resolvedThreadId, + IsForum: isForum, + }; + + const disableBlockStreaming = + typeof telegramCfg.blockStreaming === "boolean" + ? !telegramCfg.blockStreaming + : undefined; + + await dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg, + dispatcherOptions: { + responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId) + .responsePrefix, + deliver: async (payload) => { + await deliverReplies({ + replies: [payload], + chatId: String(chatId), + token: opts.token, + runtime, + bot, + replyToMode, + textLimit, + messageThreadId: resolvedThreadId, + }); + }, + onError: (err, info) => { + runtime.error?.( + danger( + `telegram slash ${info.kind} reply failed: ${String(err)}`, + ), + ); + }, + }, + replyOptions: { + skillFilter, + disableBlockStreaming, + }, + }); + }); + } + } + } else if (nativeDisabledExplicit) { + const api = bot.api as unknown as { + setMyCommands?: (commands: []) => Promise; + }; + if (typeof api.setMyCommands === "function") { + api.setMyCommands([]).catch((err) => { + runtime.error?.( + danger(`telegram clear commands failed: ${String(err)}`), + ); + }); + } else { + logVerbose("telegram: setMyCommands unavailable; skipping clear"); + } + } +}; diff --git a/src/telegram/bot-updates.ts b/src/telegram/bot-updates.ts new file mode 100644 index 000000000..165c2b3c1 --- /dev/null +++ b/src/telegram/bot-updates.ts @@ -0,0 +1,54 @@ +import { createDedupeCache } from "../infra/dedupe.js"; +import type { TelegramContext, TelegramMessage } from "./bot/types.js"; + +const MEDIA_GROUP_TIMEOUT_MS = 500; +const RECENT_TELEGRAM_UPDATE_TTL_MS = 5 * 60_000; +const RECENT_TELEGRAM_UPDATE_MAX = 2000; + +export type MediaGroupEntry = { + messages: Array<{ + msg: TelegramMessage; + ctx: TelegramContext; + }>; + timer: ReturnType; +}; + +export type TelegramUpdateKeyContext = { + update?: { + update_id?: number; + message?: TelegramMessage; + edited_message?: TelegramMessage; + }; + update_id?: number; + message?: TelegramMessage; + callbackQuery?: { id?: string; message?: TelegramMessage }; +}; + +export const resolveTelegramUpdateId = (ctx: TelegramUpdateKeyContext) => + ctx.update?.update_id ?? ctx.update_id; + +export const buildTelegramUpdateKey = (ctx: TelegramUpdateKeyContext) => { + const updateId = resolveTelegramUpdateId(ctx); + if (typeof updateId === "number") return `update:${updateId}`; + const callbackId = ctx.callbackQuery?.id; + if (callbackId) return `callback:${callbackId}`; + const msg = + ctx.message ?? + ctx.update?.message ?? + ctx.update?.edited_message ?? + ctx.callbackQuery?.message; + const chatId = msg?.chat?.id; + const messageId = msg?.message_id; + if (typeof chatId !== "undefined" && typeof messageId === "number") { + return `message:${chatId}:${messageId}`; + } + return undefined; +}; + +export const createTelegramUpdateDedupe = () => + createDedupeCache({ + ttlMs: RECENT_TELEGRAM_UPDATE_TTL_MS, + maxSize: RECENT_TELEGRAM_UPDATE_MAX, + }); + +export { MEDIA_GROUP_TIMEOUT_MS }; diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 442c8b93e..d8504abe2 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -4,31 +4,11 @@ import { apiThrottler } from "@grammyjs/transformer-throttler"; import type { ApiClientOptions } from "grammy"; import { Bot, webhookCallback } from "grammy"; import { resolveDefaultAgentId } from "../agents/agent-scope.js"; -import { - resolveAckReaction, - resolveEffectiveMessagesConfig, -} from "../agents/identity.js"; -import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js"; import { resolveTextChunkLimit } from "../auto-reply/chunk.js"; -import { hasControlCommand } from "../auto-reply/command-detection.js"; import { - buildCommandText, - listNativeCommandSpecsForConfig, - normalizeCommandBody, -} from "../auto-reply/commands-registry.js"; -import { formatAgentEnvelope } from "../auto-reply/envelope.js"; -import { - buildHistoryContextFromMap, - clearHistoryEntries, DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry, } from "../auto-reply/reply/history.js"; -import { - buildMentionRegexes, - matchesMentionPatterns, -} from "../auto-reply/reply/mentions.js"; -import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; -import { formatLocationText, toLocationContext } from "../channels/location.js"; import { isNativeCommandsExplicitlyDisabled, resolveNativeCommandsEnabled, @@ -39,93 +19,26 @@ import { resolveChannelGroupPolicy, resolveChannelGroupRequireMention, } from "../config/group-policy.js"; -import { - loadSessionStore, - resolveStorePath, - updateLastRoute, -} from "../config/sessions.js"; -import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; -import { recordChannelActivity } from "../infra/channel-activity.js"; -import { createDedupeCache } from "../infra/dedupe.js"; +import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; +import { logVerbose, shouldLogVerbose } from "../globals.js"; import { getChildLogger } from "../logging.js"; -import { resolveAgentRoute } from "../routing/resolve-route.js"; import type { RuntimeEnv } from "../runtime.js"; import { resolveTelegramAccount } from "./accounts.js"; -import { deliverReplies, resolveMedia } from "./bot/delivery.js"; import { - buildGroupFromLabel, - buildGroupLabel, - buildSenderLabel, - buildSenderName, - buildTelegramGroupFrom, - buildTelegramGroupPeerId, - buildTelegramThreadParams, - describeReplyTarget, - extractTelegramLocation, - hasBotMention, resolveTelegramForumThreadId, resolveTelegramStreamMode, } from "./bot/helpers.js"; import type { TelegramContext, TelegramMessage } from "./bot/types.js"; -import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js"; -import { createTelegramDraftStream } from "./draft-stream.js"; -import { resolveTelegramFetch } from "./fetch.js"; +import { registerTelegramHandlers } from "./bot-handlers.js"; +import { createTelegramMessageProcessor } from "./bot-message.js"; +import { registerTelegramNativeCommands } from "./bot-native-commands.js"; import { - readTelegramAllowFromStore, - upsertTelegramPairingRequest, -} from "./pairing-store.js"; - -// Media group aggregation - Telegram sends multi-image messages as separate updates -// with a shared media_group_id. We buffer them and process as a single message after a short delay. -const MEDIA_GROUP_TIMEOUT_MS = 500; -const RECENT_TELEGRAM_UPDATE_TTL_MS = 5 * 60_000; -const RECENT_TELEGRAM_UPDATE_MAX = 2000; - -type MediaGroupEntry = { - messages: Array<{ - msg: TelegramMessage; - ctx: TelegramContext; - }>; - timer: ReturnType; -}; - -type TelegramUpdateKeyContext = { - update?: { - update_id?: number; - message?: TelegramMessage; - edited_message?: TelegramMessage; - }; - update_id?: number; - message?: TelegramMessage; - callbackQuery?: { id?: string; message?: TelegramMessage }; -}; - -const resolveTelegramUpdateId = (ctx: TelegramUpdateKeyContext) => - ctx.update?.update_id ?? ctx.update_id; - -const buildTelegramUpdateKey = (ctx: TelegramUpdateKeyContext) => { - const updateId = resolveTelegramUpdateId(ctx); - if (typeof updateId === "number") return `update:${updateId}`; - const callbackId = ctx.callbackQuery?.id; - if (callbackId) return `callback:${callbackId}`; - const msg = - ctx.message ?? - ctx.update?.message ?? - ctx.update?.edited_message ?? - ctx.callbackQuery?.message; - const chatId = msg?.chat?.id; - const messageId = msg?.message_id; - if (typeof chatId !== "undefined" && typeof messageId === "number") { - return `message:${chatId}:${messageId}`; - } - return undefined; -}; - -const createTelegramUpdateDedupe = () => - createDedupeCache({ - ttlMs: RECENT_TELEGRAM_UPDATE_TTL_MS, - maxSize: RECENT_TELEGRAM_UPDATE_MAX, - }); + buildTelegramUpdateKey, + createTelegramUpdateDedupe, + resolveTelegramUpdateId, + type TelegramUpdateKeyContext, +} from "./bot-updates.js"; +import { resolveTelegramFetch } from "./fetch.js"; export type TelegramBotOptions = { token: string; @@ -225,9 +138,6 @@ export function createTelegramBot(opts: TelegramBotOptions) { recordUpdateId(ctx); }); - const mediaGroupBuffer = new Map(); - let mediaGroupProcessing: Promise = Promise.resolve(); - const cfg = opts.config ?? loadConfig(); const account = resolveTelegramAccount({ cfg, @@ -251,43 +161,6 @@ export function createTelegramBot(opts: TelegramBotOptions) { ? telegramCfg.allowFrom : undefined) ?? (opts.allowFrom && opts.allowFrom.length > 0 ? opts.allowFrom : undefined); - const normalizeAllowFrom = (list?: Array) => { - const entries = (list ?? []) - .map((value) => String(value).trim()) - .filter(Boolean); - const hasWildcard = entries.includes("*"); - const normalized = entries - .filter((value) => value !== "*") - .map((value) => value.replace(/^(telegram|tg):/i, "")); - const normalizedLower = normalized.map((value) => value.toLowerCase()); - return { - entries: normalized, - entriesLower: normalizedLower, - hasWildcard, - hasEntries: entries.length > 0, - }; - }; - const firstDefined = (...values: Array) => { - for (const value of values) { - if (typeof value !== "undefined") return value; - } - return undefined; - }; - const isSenderAllowed = (params: { - allow: ReturnType; - senderId?: string; - senderUsername?: string; - }) => { - const { allow, senderId, senderUsername } = params; - if (!allow.hasEntries) return true; - if (allow.hasWildcard) return true; - if (senderId && allow.entries.includes(senderId)) return true; - const username = senderUsername?.toLowerCase(); - if (!username) return false; - return allow.entriesLower.some( - (entry) => entry === username || entry === `@${username}`, - ); - }; const replyToMode = opts.replyToMode ?? telegramCfg.replyToMode ?? "first"; const streamMode = resolveTelegramStreamMode(telegramCfg); const nativeEnabled = resolveNativeCommandsEnabled({ @@ -373,1051 +246,61 @@ export function createTelegramBot(opts: TelegramBotOptions) { return { groupConfig, topicConfig }; }; - const processMessage = async ( - primaryCtx: TelegramContext, - allMedia: Array<{ path: string; contentType?: string }>, - storeAllowFrom: string[], - options?: { forceWasMentioned?: boolean; messageIdOverride?: string }, - ) => { - const msg = primaryCtx.message; - recordChannelActivity({ - channel: "telegram", - accountId: account.accountId, - direction: "inbound", - }); - const chatId = msg.chat.id; - const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup"; - const messageThreadId = (msg as { message_thread_id?: number }) - .message_thread_id; - const isForum = (msg.chat as { is_forum?: boolean }).is_forum === true; - const resolvedThreadId = resolveTelegramForumThreadId({ - isForum, - messageThreadId, - }); - const { groupConfig, topicConfig } = resolveTelegramGroupConfig( - chatId, - resolvedThreadId, - ); - const peerId = isGroup - ? buildTelegramGroupPeerId(chatId, resolvedThreadId) - : String(chatId); - const route = resolveAgentRoute({ - cfg, - channel: "telegram", - accountId: account.accountId, - peer: { - kind: isGroup ? "group" : "dm", - id: peerId, - }, - }); - const mentionRegexes = buildMentionRegexes(cfg, route.agentId); - const effectiveDmAllow = normalizeAllowFrom([ - ...(allowFrom ?? []), - ...storeAllowFrom, - ]); - const groupAllowOverride = firstDefined( - topicConfig?.allowFrom, - groupConfig?.allowFrom, - ); - const effectiveGroupAllow = normalizeAllowFrom([ - ...(groupAllowOverride ?? groupAllowFrom ?? []), - ...storeAllowFrom, - ]); - const hasGroupAllowOverride = typeof groupAllowOverride !== "undefined"; - - if (isGroup && groupConfig?.enabled === false) { - logVerbose(`Blocked telegram group ${chatId} (group disabled)`); - return; - } - if (isGroup && topicConfig?.enabled === false) { - logVerbose( - `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, - ); - return; - } - - const sendTyping = async () => { - try { - await bot.api.sendChatAction( - chatId, - "typing", - buildTelegramThreadParams(resolvedThreadId), - ); - } catch (err) { - logVerbose( - `telegram typing cue failed for chat ${chatId}: ${String(err)}`, - ); - } - }; - - // DM access control (secure defaults): "pairing" (default) / "allowlist" / "open" / "disabled" - if (!isGroup) { - if (dmPolicy === "disabled") return; - - if (dmPolicy !== "open") { - const candidate = String(chatId); - const senderUsername = msg.from?.username ?? ""; - const allowed = - effectiveDmAllow.hasWildcard || - (effectiveDmAllow.hasEntries && - isSenderAllowed({ - allow: effectiveDmAllow, - senderId: candidate, - senderUsername, - })); - if (!allowed) { - if (dmPolicy === "pairing") { - try { - const from = msg.from as - | { - first_name?: string; - last_name?: string; - username?: string; - id?: number; - } - | undefined; - const telegramUserId = from?.id ? String(from.id) : candidate; - const { code, created } = await upsertTelegramPairingRequest({ - chatId: candidate, - username: from?.username, - firstName: from?.first_name, - lastName: from?.last_name, - }); - if (created) { - logger.info( - { - chatId: candidate, - username: from?.username, - firstName: from?.first_name, - lastName: from?.last_name, - }, - "telegram pairing request", - ); - await bot.api.sendMessage( - chatId, - [ - "Clawdbot: access not configured.", - "", - `Your Telegram user id: ${telegramUserId}`, - "", - `Pairing code: ${code}`, - "", - "Ask the bot owner to approve with:", - "clawdbot pairing approve telegram ", - ].join("\n"), - ); - } - } catch (err) { - logVerbose( - `telegram pairing reply failed for chat ${chatId}: ${String(err)}`, - ); - } - } else { - logVerbose( - `Blocked unauthorized telegram sender ${candidate} (dmPolicy=${dmPolicy})`, - ); - } - return; - } - } - } - - const botUsername = primaryCtx.me?.username?.toLowerCase(); - const senderId = msg.from?.id ? String(msg.from.id) : ""; - const senderUsername = msg.from?.username ?? ""; - if (isGroup && hasGroupAllowOverride) { - const allowed = isSenderAllowed({ - allow: effectiveGroupAllow, - senderId, - senderUsername, - }); - if (!allowed) { - logVerbose( - `Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`, - ); - return; - } - } - const commandAuthorized = isSenderAllowed({ - allow: isGroup ? effectiveGroupAllow : effectiveDmAllow, - senderId, - senderUsername, - }); - const computedWasMentioned = - (Boolean(botUsername) && hasBotMention(msg, botUsername)) || - matchesMentionPatterns(msg.text ?? msg.caption ?? "", mentionRegexes); - const wasMentioned = - options?.forceWasMentioned === true ? true : computedWasMentioned; - const hasAnyMention = (msg.entities ?? msg.caption_entities ?? []).some( - (ent) => ent.type === "mention", - ); - const activationOverride = resolveGroupActivation({ - chatId, - messageThreadId: resolvedThreadId, - sessionKey: route.sessionKey, - agentId: route.agentId, - }); - const baseRequireMention = resolveGroupRequireMention(chatId); - const requireMention = firstDefined( - activationOverride, - topicConfig?.requireMention, - groupConfig?.requireMention, - baseRequireMention, - ); - const shouldBypassMention = - isGroup && - requireMention && - !wasMentioned && - !hasAnyMention && - commandAuthorized && - hasControlCommand(msg.text ?? msg.caption ?? "", cfg, { botUsername }); - const effectiveWasMentioned = wasMentioned || shouldBypassMention; - const canDetectMention = Boolean(botUsername) || mentionRegexes.length > 0; - if (isGroup && requireMention && canDetectMention) { - if (!wasMentioned && !shouldBypassMention) { - logger.info({ chatId, reason: "no-mention" }, "skipping group message"); - return; - } - } - - // ACK reactions - const ackReaction = resolveAckReaction(cfg, route.agentId); - const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false; - const shouldAckReaction = () => { - if (!ackReaction) return false; - if (ackReactionScope === "all") return true; - if (ackReactionScope === "direct") return !isGroup; - if (ackReactionScope === "group-all") return isGroup; - if (ackReactionScope === "group-mentions") { - if (!isGroup) return false; - if (!requireMention) return false; - if (!canDetectMention) return false; - return wasMentioned || shouldBypassMention; - } - return false; - }; - const api = bot.api as unknown as { - setMessageReaction?: ( - chatId: number | string, - messageId: number, - reactions: Array<{ type: "emoji"; emoji: string }>, - ) => Promise; - }; - const reactionApi = - typeof api.setMessageReaction === "function" - ? api.setMessageReaction.bind(api) - : null; - const ackReactionPromise = - shouldAckReaction() && msg.message_id && reactionApi - ? reactionApi(chatId, msg.message_id, [ - { type: "emoji", emoji: ackReaction }, - ]).then( - () => true, - (err) => { - logVerbose( - `telegram react failed for chat ${chatId}: ${String(err)}`, - ); - return false; - }, - ) - : null; - - let placeholder = ""; - if (msg.photo) placeholder = ""; - else if (msg.video) placeholder = ""; - else if (msg.audio || msg.voice) placeholder = ""; - else if (msg.document) placeholder = ""; - - const replyTarget = describeReplyTarget(msg); - const locationData = extractTelegramLocation(msg); - const locationText = locationData - ? formatLocationText(locationData) - : undefined; - const rawText = (msg.text ?? msg.caption ?? "").trim(); - let rawBody = [rawText, locationText].filter(Boolean).join("\n").trim(); - if (!rawBody) rawBody = placeholder; - if (!rawBody && allMedia.length === 0) return; - - let bodyText = rawBody; - if (!bodyText && allMedia.length > 0) { - bodyText = `${allMedia.length > 1 ? ` (${allMedia.length} images)` : ""}`; - } - - const replySuffix = replyTarget - ? `\n\n[Replying to ${replyTarget.sender}${ - replyTarget.id ? ` id:${replyTarget.id}` : "" - }]\n${replyTarget.body}\n[/Replying]` - : ""; - const groupLabel = isGroup - ? buildGroupLabel(msg, chatId, resolvedThreadId) - : undefined; - const body = formatAgentEnvelope({ - channel: "Telegram", - from: isGroup - ? buildGroupFromLabel(msg, chatId, senderId, resolvedThreadId) - : buildSenderLabel(msg, senderId || chatId), - timestamp: msg.date ? msg.date * 1000 : undefined, - body: `${bodyText}${replySuffix}`, - }); - let combinedBody = body; - const historyKey = isGroup - ? buildTelegramGroupPeerId(chatId, resolvedThreadId) - : undefined; - if (isGroup && historyKey && historyLimit > 0) { - combinedBody = buildHistoryContextFromMap({ - historyMap: groupHistories, - historyKey, - limit: historyLimit, - entry: { - sender: buildSenderLabel(msg, senderId || chatId), - body: rawBody, - timestamp: msg.date ? msg.date * 1000 : undefined, - messageId: - typeof msg.message_id === "number" - ? String(msg.message_id) - : undefined, - }, - currentMessage: combinedBody, - formatEntry: (entry) => - formatAgentEnvelope({ - channel: "Telegram", - from: groupLabel ?? `group:${chatId}`, - timestamp: entry.timestamp, - body: `${entry.sender}: ${entry.body} [id:${entry.messageId ?? "unknown"} chat:${chatId}]`, - }), - }); - } - - const skillFilter = firstDefined(topicConfig?.skills, groupConfig?.skills); - const systemPromptParts = [ - groupConfig?.systemPrompt?.trim() || null, - topicConfig?.systemPrompt?.trim() || null, - ].filter((entry): entry is string => Boolean(entry)); - const groupSystemPrompt = - systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined; - const commandBody = normalizeCommandBody(rawBody, { botUsername }); - const ctxPayload = { - Body: combinedBody, - RawBody: rawBody, - CommandBody: commandBody, - From: isGroup - ? buildTelegramGroupFrom(chatId, resolvedThreadId) - : `telegram:${chatId}`, - To: `telegram:${chatId}`, - SessionKey: route.sessionKey, - AccountId: route.accountId, - ChatType: isGroup ? "group" : "direct", - GroupSubject: isGroup ? (msg.chat.title ?? undefined) : undefined, - GroupSystemPrompt: isGroup ? groupSystemPrompt : undefined, - SenderName: buildSenderName(msg), - SenderId: senderId || undefined, - SenderUsername: senderUsername || undefined, - Provider: "telegram", - Surface: "telegram", - MessageSid: options?.messageIdOverride ?? String(msg.message_id), - ReplyToId: replyTarget?.id, - ReplyToBody: replyTarget?.body, - ReplyToSender: replyTarget?.sender, - Timestamp: msg.date ? msg.date * 1000 : undefined, - WasMentioned: isGroup ? effectiveWasMentioned : undefined, - MediaPath: allMedia[0]?.path, - MediaType: allMedia[0]?.contentType, - MediaUrl: allMedia[0]?.path, - MediaPaths: allMedia.length > 0 ? allMedia.map((m) => m.path) : undefined, - MediaUrls: allMedia.length > 0 ? allMedia.map((m) => m.path) : undefined, - MediaTypes: - allMedia.length > 0 - ? (allMedia.map((m) => m.contentType).filter(Boolean) as string[]) - : undefined, - ...(locationData ? toLocationContext(locationData) : undefined), - CommandAuthorized: commandAuthorized, - MessageThreadId: resolvedThreadId, - IsForum: isForum, - // Originating channel for reply routing. - OriginatingChannel: "telegram" as const, - OriginatingTo: `telegram:${chatId}`, - }; - - if (replyTarget && shouldLogVerbose()) { - const preview = replyTarget.body.replace(/\s+/g, " ").slice(0, 120); - logVerbose( - `telegram reply-context: replyToId=${replyTarget.id} replyToSender=${replyTarget.sender} replyToBody="${preview}"`, - ); - } - - if (!isGroup) { - const sessionCfg = cfg.session; - const storePath = resolveStorePath(sessionCfg?.store, { - agentId: route.agentId, - }); - await updateLastRoute({ - storePath, - sessionKey: route.mainSessionKey, - channel: "telegram", - to: String(chatId), - accountId: route.accountId, - }); - } - - if (shouldLogVerbose()) { - const preview = body.slice(0, 200).replace(/\n/g, "\\n"); - const mediaInfo = - allMedia.length > 1 ? ` mediaCount=${allMedia.length}` : ""; - const topicInfo = - resolvedThreadId != null ? ` topic=${resolvedThreadId}` : ""; - logVerbose( - `telegram inbound: chatId=${chatId} from=${ctxPayload.From} len=${body.length}${mediaInfo}${topicInfo} preview="${preview}"`, - ); - } - - const isPrivateChat = msg.chat.type === "private"; - const draftMaxChars = Math.min(textLimit, 4096); - const canStreamDraft = - streamMode !== "off" && - isPrivateChat && - typeof resolvedThreadId === "number" && - (await resolveBotTopicsEnabled(primaryCtx)); - const draftStream = canStreamDraft - ? createTelegramDraftStream({ - api: bot.api, - chatId, - draftId: msg.message_id || Date.now(), - maxChars: draftMaxChars, - messageThreadId: resolvedThreadId, - log: logVerbose, - warn: logVerbose, - }) - : undefined; - const draftChunking = - draftStream && streamMode === "block" - ? resolveTelegramDraftStreamingChunking(cfg, route.accountId) - : undefined; - const draftChunker = draftChunking - ? new EmbeddedBlockChunker(draftChunking) - : undefined; - let lastPartialText = ""; - let draftText = ""; - const updateDraftFromPartial = (text?: string) => { - if (!draftStream || !text) return; - if (text === lastPartialText) return; - if (streamMode === "partial") { - lastPartialText = text; - draftStream.update(text); - return; - } - let delta = text; - if (text.startsWith(lastPartialText)) { - delta = text.slice(lastPartialText.length); - } else { - // Streaming buffer reset (or non-monotonic stream). Start fresh. - draftChunker?.reset(); - draftText = ""; - } - lastPartialText = text; - if (!delta) return; - if (!draftChunker) { - draftText = text; - draftStream.update(draftText); - return; - } - draftChunker.append(delta); - draftChunker.drain({ - force: false, - emit: (chunk) => { - draftText += chunk; - draftStream.update(draftText); - }, - }); - }; - const flushDraft = async () => { - if (!draftStream) return; - if (draftChunker?.hasBuffered()) { - draftChunker.drain({ - force: true, - emit: (chunk) => { - draftText += chunk; - }, - }); - draftChunker.reset(); - if (draftText) draftStream.update(draftText); - } - await draftStream.flush(); - }; - - const disableBlockStreaming = - Boolean(draftStream) || - (typeof telegramCfg.blockStreaming === "boolean" - ? !telegramCfg.blockStreaming - : undefined); - - let didSendReply = false; - const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({ - ctx: ctxPayload, - cfg, - dispatcherOptions: { - responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId) - .responsePrefix, - deliver: async (payload, info) => { - if (info.kind === "final") { - await flushDraft(); - draftStream?.stop(); - } - await deliverReplies({ - replies: [payload], - chatId: String(chatId), - token: opts.token, - runtime, - bot, - replyToMode, - textLimit, - messageThreadId: resolvedThreadId, - }); - didSendReply = true; - }, - onError: (err, info) => { - runtime.error?.( - danger(`telegram ${info.kind} reply failed: ${String(err)}`), - ); - }, - onReplyStart: sendTyping, - }, - replyOptions: { - skillFilter, - onPartialReply: draftStream - ? (payload) => updateDraftFromPartial(payload.text) - : undefined, - onReasoningStream: draftStream - ? (payload) => { - if (payload.text) draftStream.update(payload.text); - } - : undefined, - disableBlockStreaming, - }, - }); - draftStream?.stop(); - if (!queuedFinal) { - if (isGroup && historyKey && historyLimit > 0 && didSendReply) { - clearHistoryEntries({ historyMap: groupHistories, historyKey }); - } - return; - } - if ( - removeAckAfterReply && - ackReactionPromise && - msg.message_id && - reactionApi - ) { - void ackReactionPromise.then((didAck) => { - if (!didAck) return; - reactionApi(chatId, msg.message_id, []).catch((err) => { - logVerbose( - `telegram: failed to remove ack reaction from ${chatId}/${msg.message_id}: ${String(err)}`, - ); - }); - }); - } - if (isGroup && historyKey && historyLimit > 0 && didSendReply) { - clearHistoryEntries({ historyMap: groupHistories, historyKey }); - } - }; - - const nativeCommands = nativeEnabled - ? listNativeCommandSpecsForConfig(cfg) - : []; - if (nativeCommands.length > 0) { - const api = bot.api as unknown as { - setMyCommands?: ( - commands: Array<{ command: string; description: string }>, - ) => Promise; - }; - if (typeof api.setMyCommands === "function") { - api - .setMyCommands( - nativeCommands.map((command) => ({ - command: command.name, - description: command.description, - })), - ) - .catch((err) => { - runtime.error?.( - danger(`telegram setMyCommands failed: ${String(err)}`), - ); - }); - } else { - logVerbose("telegram: setMyCommands unavailable; skipping registration"); - } - - if ( - typeof (bot as unknown as { command?: unknown }).command !== "function" - ) { - logVerbose("telegram: bot.command unavailable; skipping native handlers"); - } else { - for (const command of nativeCommands) { - bot.command(command.name, async (ctx) => { - const msg = ctx.message; - if (!msg) return; - if (shouldSkipUpdate(ctx)) return; - const chatId = msg.chat.id; - const isGroup = - msg.chat.type === "group" || msg.chat.type === "supergroup"; - const messageThreadId = (msg as { message_thread_id?: number }) - .message_thread_id; - const isForum = - (msg.chat as { is_forum?: boolean }).is_forum === true; - const resolvedThreadId = resolveTelegramForumThreadId({ - isForum, - messageThreadId, - }); - const storeAllowFrom = await readTelegramAllowFromStore().catch( - () => [], - ); - const { groupConfig, topicConfig } = resolveTelegramGroupConfig( - chatId, - resolvedThreadId, - ); - const groupAllowOverride = firstDefined( - topicConfig?.allowFrom, - groupConfig?.allowFrom, - ); - const effectiveGroupAllow = normalizeAllowFrom([ - ...(groupAllowOverride ?? groupAllowFrom ?? []), - ...storeAllowFrom, - ]); - const hasGroupAllowOverride = - typeof groupAllowOverride !== "undefined"; - - if (isGroup && groupConfig?.enabled === false) { - await bot.api.sendMessage(chatId, "This group is disabled."); - return; - } - if (isGroup && topicConfig?.enabled === false) { - await bot.api.sendMessage(chatId, "This topic is disabled."); - return; - } - if (isGroup && hasGroupAllowOverride) { - const senderId = msg.from?.id; - const senderUsername = msg.from?.username ?? ""; - if ( - senderId == null || - !isSenderAllowed({ - allow: effectiveGroupAllow, - senderId: String(senderId), - senderUsername, - }) - ) { - await bot.api.sendMessage( - chatId, - "You are not authorized to use this command.", - ); - return; - } - } - - if (isGroup && useAccessGroups) { - const groupPolicy = telegramCfg.groupPolicy ?? "open"; - if (groupPolicy === "disabled") { - await bot.api.sendMessage( - chatId, - "Telegram group commands are disabled.", - ); - return; - } - if (groupPolicy === "allowlist") { - const senderId = msg.from?.id; - if (senderId == null) { - await bot.api.sendMessage( - chatId, - "You are not authorized to use this command.", - ); - return; - } - const senderUsername = msg.from?.username ?? ""; - if ( - !isSenderAllowed({ - allow: effectiveGroupAllow, - senderId: String(senderId), - senderUsername, - }) - ) { - await bot.api.sendMessage( - chatId, - "You are not authorized to use this command.", - ); - return; - } - } - const groupAllowlist = resolveGroupPolicy(chatId); - if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { - await bot.api.sendMessage(chatId, "This group is not allowed."); - return; - } - } - - const allowFromList = Array.isArray(allowFrom) - ? allowFrom.map((entry) => String(entry).trim()).filter(Boolean) - : []; - const senderId = msg.from?.id ? String(msg.from.id) : ""; - const senderUsername = msg.from?.username ?? ""; - const commandAuthorized = - allowFromList.length === 0 || - allowFromList.includes("*") || - (senderId && allowFromList.includes(senderId)) || - (senderId && allowFromList.includes(`telegram:${senderId}`)) || - (senderUsername && - allowFromList.some( - (entry) => - entry.toLowerCase() === senderUsername.toLowerCase() || - entry.toLowerCase() === `@${senderUsername.toLowerCase()}`, - )); - if (!commandAuthorized) { - await bot.api.sendMessage( - chatId, - "You are not authorized to use this command.", - ); - return; - } - - const prompt = buildCommandText(command.name, ctx.match ?? ""); - const route = resolveAgentRoute({ - cfg, - channel: "telegram", - accountId: account.accountId, - peer: { - kind: isGroup ? "group" : "dm", - id: isGroup - ? buildTelegramGroupPeerId(chatId, resolvedThreadId) - : String(chatId), - }, - }); - const skillFilter = firstDefined( - topicConfig?.skills, - groupConfig?.skills, - ); - const systemPromptParts = [ - groupConfig?.systemPrompt?.trim() || null, - topicConfig?.systemPrompt?.trim() || null, - ].filter((entry): entry is string => Boolean(entry)); - const groupSystemPrompt = - systemPromptParts.length > 0 - ? systemPromptParts.join("\n\n") - : undefined; - const ctxPayload = { - Body: prompt, - From: isGroup - ? buildTelegramGroupFrom(chatId, resolvedThreadId) - : `telegram:${chatId}`, - To: `slash:${senderId || chatId}`, - ChatType: isGroup ? "group" : "direct", - GroupSubject: isGroup ? (msg.chat.title ?? undefined) : undefined, - GroupSystemPrompt: isGroup ? groupSystemPrompt : undefined, - SenderName: buildSenderName(msg), - SenderId: senderId || undefined, - SenderUsername: senderUsername || undefined, - Surface: "telegram", - MessageSid: String(msg.message_id), - Timestamp: msg.date ? msg.date * 1000 : undefined, - WasMentioned: true, - CommandAuthorized: commandAuthorized, - CommandSource: "native" as const, - SessionKey: `telegram:slash:${senderId || chatId}`, - CommandTargetSessionKey: route.sessionKey, - MessageThreadId: resolvedThreadId, - IsForum: isForum, - }; - - const disableBlockStreaming = - typeof telegramCfg.blockStreaming === "boolean" - ? !telegramCfg.blockStreaming - : undefined; - - await dispatchReplyWithBufferedBlockDispatcher({ - ctx: ctxPayload, - cfg, - dispatcherOptions: { - responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId) - .responsePrefix, - deliver: async (payload) => { - await deliverReplies({ - replies: [payload], - chatId: String(chatId), - token: opts.token, - runtime, - bot, - replyToMode, - textLimit, - messageThreadId: resolvedThreadId, - }); - }, - onError: (err, info) => { - runtime.error?.( - danger( - `telegram slash ${info.kind} reply failed: ${String(err)}`, - ), - ); - }, - }, - replyOptions: { - skillFilter, - disableBlockStreaming, - }, - }); - }); - } - } - } else if (nativeDisabledExplicit) { - const api = bot.api as unknown as { - setMyCommands?: (commands: []) => Promise; - }; - if (typeof api.setMyCommands === "function") { - api.setMyCommands([]).catch((err) => { - runtime.error?.( - danger(`telegram clear commands failed: ${String(err)}`), - ); - }); - } else { - logVerbose("telegram: setMyCommands unavailable; skipping clear"); - } - } - - bot.on("callback_query", async (ctx) => { - const callback = ctx.callbackQuery; - if (!callback) return; - if (shouldSkipUpdate(ctx)) return; - try { - const data = (callback.data ?? "").trim(); - const callbackMessage = callback.message; - if (!data || !callbackMessage) return; - - const syntheticMessage: TelegramMessage = { - ...callbackMessage, - from: callback.from, - text: data, - caption: undefined, - caption_entities: undefined, - entities: undefined, - }; - const storeAllowFrom = await readTelegramAllowFromStore().catch(() => []); - const getFile = - typeof ctx.getFile === "function" - ? ctx.getFile.bind(ctx) - : async () => ({}); - await processMessage( - { message: syntheticMessage, me: ctx.me, getFile }, - [], - storeAllowFrom, - { forceWasMentioned: true, messageIdOverride: callback.id }, - ); - } catch (err) { - runtime.error?.(danger(`callback handler failed: ${String(err)}`)); - } finally { - await bot.api.answerCallbackQuery(callback.id).catch(() => {}); - } + const processMessage = createTelegramMessageProcessor({ + bot, + cfg, + account, + telegramCfg, + historyLimit, + groupHistories, + dmPolicy, + allowFrom, + groupAllowFrom, + ackReactionScope, + logger, + resolveGroupActivation, + resolveGroupRequireMention, + resolveTelegramGroupConfig, + runtime, + replyToMode, + streamMode, + textLimit, + opts, + resolveBotTopicsEnabled, }); - bot.on("message", async (ctx) => { - try { - const msg = ctx.message; - if (!msg) return; - if (shouldSkipUpdate(ctx)) return; - - const chatId = msg.chat.id; - const isGroup = - msg.chat.type === "group" || msg.chat.type === "supergroup"; - const messageThreadId = (msg as { message_thread_id?: number }) - .message_thread_id; - const isForum = (msg.chat as { is_forum?: boolean }).is_forum === true; - const resolvedThreadId = resolveTelegramForumThreadId({ - isForum, - messageThreadId, - }); - const storeAllowFrom = await readTelegramAllowFromStore().catch(() => []); - const { groupConfig, topicConfig } = resolveTelegramGroupConfig( - chatId, - resolvedThreadId, - ); - const groupAllowOverride = firstDefined( - topicConfig?.allowFrom, - groupConfig?.allowFrom, - ); - const effectiveGroupAllow = normalizeAllowFrom([ - ...(groupAllowOverride ?? groupAllowFrom ?? []), - ...storeAllowFrom, - ]); - const hasGroupAllowOverride = typeof groupAllowOverride !== "undefined"; - - if (isGroup) { - if (groupConfig?.enabled === false) { - logVerbose(`Blocked telegram group ${chatId} (group disabled)`); - return; - } - if (topicConfig?.enabled === false) { - logVerbose( - `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, - ); - return; - } - if (hasGroupAllowOverride) { - const senderId = msg.from?.id; - const senderUsername = msg.from?.username ?? ""; - const allowed = - senderId != null && - isSenderAllowed({ - allow: effectiveGroupAllow, - senderId: String(senderId), - senderUsername, - }); - if (!allowed) { - logVerbose( - `Blocked telegram group sender ${senderId ?? "unknown"} (group allowFrom override)`, - ); - return; - } - } - // Group policy filtering: controls how group messages are handled - // - "open": groups bypass allowFrom, only mention-gating applies - // - "disabled": block all group messages entirely - // - "allowlist": only allow group messages from senders in groupAllowFrom/allowFrom - const groupPolicy = telegramCfg.groupPolicy ?? "open"; - if (groupPolicy === "disabled") { - logVerbose(`Blocked telegram group message (groupPolicy: disabled)`); - return; - } - if (groupPolicy === "allowlist") { - // For allowlist mode, the sender (msg.from.id) must be in allowFrom - const senderId = msg.from?.id; - if (senderId == null) { - logVerbose( - `Blocked telegram group message (no sender ID, groupPolicy: allowlist)`, - ); - return; - } - if (!effectiveGroupAllow.hasEntries) { - logVerbose( - "Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)", - ); - return; - } - const senderUsername = msg.from?.username ?? ""; - if ( - !isSenderAllowed({ - allow: effectiveGroupAllow, - senderId: String(senderId), - senderUsername, - }) - ) { - logVerbose( - `Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`, - ); - return; - } - } - - // Group allowlist based on configured group IDs. - const groupAllowlist = resolveGroupPolicy(chatId); - if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { - logger.info( - { chatId, title: msg.chat.title, reason: "not-allowed" }, - "skipping group message", - ); - return; - } - } - - // Media group handling - buffer multi-image messages - const mediaGroupId = (msg as { media_group_id?: string }).media_group_id; - if (mediaGroupId) { - const existing = mediaGroupBuffer.get(mediaGroupId); - if (existing) { - clearTimeout(existing.timer); - existing.messages.push({ msg, ctx }); - existing.timer = setTimeout(async () => { - mediaGroupBuffer.delete(mediaGroupId); - mediaGroupProcessing = mediaGroupProcessing - .then(async () => { - await processMediaGroup(existing); - }) - .catch(() => undefined); - await mediaGroupProcessing; - }, MEDIA_GROUP_TIMEOUT_MS); - } else { - const entry: MediaGroupEntry = { - messages: [{ msg, ctx }], - timer: setTimeout(async () => { - mediaGroupBuffer.delete(mediaGroupId); - mediaGroupProcessing = mediaGroupProcessing - .then(async () => { - await processMediaGroup(entry); - }) - .catch(() => undefined); - await mediaGroupProcessing; - }, MEDIA_GROUP_TIMEOUT_MS), - }; - mediaGroupBuffer.set(mediaGroupId, entry); - } - return; - } - - let media: Awaited> = null; - try { - media = await resolveMedia( - ctx, - mediaMaxBytes, - opts.token, - opts.proxyFetch, - ); - } catch (mediaErr) { - const errMsg = String(mediaErr); - if (errMsg.includes("exceeds") && errMsg.includes("MB limit")) { - const limitMb = Math.round(mediaMaxBytes / (1024 * 1024)); - await bot.api - .sendMessage( - chatId, - `⚠️ File too large. Maximum size is ${limitMb}MB.`, - { reply_to_message_id: msg.message_id }, - ) - .catch(() => {}); - logger.warn({ chatId, error: errMsg }, "media exceeds size limit"); - return; - } - throw mediaErr; - } - const allMedia = media - ? [{ path: media.path, contentType: media.contentType }] - : []; - await processMessage(ctx, allMedia, storeAllowFrom); - } catch (err) { - runtime.error?.(danger(`handler failed: ${String(err)}`)); - } + registerTelegramNativeCommands({ + bot, + cfg, + runtime, + accountId: account.accountId, + telegramCfg, + allowFrom, + groupAllowFrom, + replyToMode, + textLimit, + useAccessGroups, + nativeEnabled, + nativeDisabledExplicit, + resolveGroupPolicy, + resolveTelegramGroupConfig, + shouldSkipUpdate, + opts, }); - const processMediaGroup = async (entry: MediaGroupEntry) => { - try { - entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); - - const captionMsg = entry.messages.find( - (m) => m.msg.caption || m.msg.text, - ); - const primaryEntry = captionMsg ?? entry.messages[0]; - - const allMedia: Array<{ path: string; contentType?: string }> = []; - for (const { ctx } of entry.messages) { - const media = await resolveMedia( - ctx, - mediaMaxBytes, - opts.token, - opts.proxyFetch, - ); - if (media) { - allMedia.push({ path: media.path, contentType: media.contentType }); - } - } - - const storeAllowFrom = await readTelegramAllowFromStore().catch(() => []); - await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom); - } catch (err) { - runtime.error?.(danger(`media group handler failed: ${String(err)}`)); - } - }; + registerTelegramHandlers({ + bot, + opts, + runtime, + mediaMaxBytes, + telegramCfg, + groupAllowFrom, + resolveGroupPolicy, + resolveTelegramGroupConfig, + shouldSkipUpdate, + processMessage, + logger, + }); return bot; }