diff --git a/src/telegram/bot-handlers.ts b/src/telegram/bot-handlers.ts index 79927220e..c49c0944e 100644 --- a/src/telegram/bot-handlers.ts +++ b/src/telegram/bot-handlers.ts @@ -26,9 +26,23 @@ export const registerTelegramHandlers = ({ processMessage, logger, }) => { + const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000; + const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS = 1500; + const TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP = 1; + const TELEGRAM_TEXT_FRAGMENT_MAX_PARTS = 12; + const TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS = 50_000; + const mediaGroupBuffer = new Map(); let mediaGroupProcessing: Promise = Promise.resolve(); + type TextFragmentEntry = { + key: string; + messages: Array<{ msg: TelegramMessage; ctx: unknown; receivedAtMs: number }>; + timer: ReturnType; + }; + const textFragmentBuffer = new Map(); + let textFragmentProcessing: Promise = Promise.resolve(); + const processMediaGroup = async (entry: MediaGroupEntry) => { try { entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); @@ -51,6 +65,55 @@ export const registerTelegramHandlers = ({ } }; + const flushTextFragments = async (entry: TextFragmentEntry) => { + try { + entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); + + const first = entry.messages[0]; + const last = entry.messages.at(-1); + if (!first || !last) return; + + const combinedText = entry.messages.map((m) => m.msg.text ?? "").join(""); + if (!combinedText.trim()) return; + + const syntheticMessage: TelegramMessage = { + ...first.msg, + text: combinedText, + caption: undefined, + caption_entities: undefined, + entities: undefined, + date: last.msg.date ?? first.msg.date, + }; + + const storeAllowFrom = await readTelegramAllowFromStore().catch(() => []); + const baseCtx = first.ctx as { me?: unknown; getFile?: unknown } & Record; + const getFile = + typeof baseCtx.getFile === "function" ? baseCtx.getFile.bind(baseCtx) : async () => ({}); + + await processMessage( + { message: syntheticMessage, me: baseCtx.me, getFile }, + [], + storeAllowFrom, + { messageIdOverride: String(last.msg.message_id) }, + ); + } catch (err) { + runtime.error?.(danger(`text fragment handler failed: ${String(err)}`)); + } + }; + + const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => { + clearTimeout(entry.timer); + entry.timer = setTimeout(async () => { + textFragmentBuffer.delete(entry.key); + textFragmentProcessing = textFragmentProcessing + .then(async () => { + await flushTextFragments(entry); + }) + .catch(() => undefined); + await textFragmentProcessing; + }, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS); + }; + bot.on("callback_query", async (ctx) => { const callback = ctx.callbackQuery; if (!callback) return; @@ -226,6 +289,68 @@ export const registerTelegramHandlers = ({ } } + // Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars). + // We buffer “near-limit” messages and append immediately-following parts. + const text = typeof msg.text === "string" ? msg.text : undefined; + const isCommandLike = (text ?? "").trim().startsWith("/"); + if (text && !isCommandLike) { + const nowMs = Date.now(); + const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown"; + const key = `text:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`; + const existing = textFragmentBuffer.get(key); + + if (existing) { + const last = existing.messages.at(-1); + const lastMsgId = last?.msg.message_id; + const lastReceivedAtMs = last?.receivedAtMs ?? nowMs; + const idGap = typeof lastMsgId === "number" ? msg.message_id - lastMsgId : Infinity; + const timeGapMs = nowMs - lastReceivedAtMs; + const canAppend = + idGap > 0 && + idGap <= TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP && + timeGapMs >= 0 && + timeGapMs <= TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS; + + if (canAppend) { + const currentTotalChars = existing.messages.reduce( + (sum, m) => sum + (m.msg.text?.length ?? 0), + 0, + ); + const nextTotalChars = currentTotalChars + text.length; + if ( + existing.messages.length + 1 <= TELEGRAM_TEXT_FRAGMENT_MAX_PARTS && + nextTotalChars <= TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS + ) { + existing.messages.push({ msg, ctx, receivedAtMs: nowMs }); + scheduleTextFragmentFlush(existing); + return; + } + } + + // Not appendable (or limits exceeded): flush buffered entry first, then continue normally. + clearTimeout(existing.timer); + textFragmentBuffer.delete(key); + textFragmentProcessing = textFragmentProcessing + .then(async () => { + await flushTextFragments(existing); + }) + .catch(() => undefined); + await textFragmentProcessing; + } + + const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS; + if (shouldStart) { + const entry: TextFragmentEntry = { + key, + messages: [{ msg, ctx, receivedAtMs: nowMs }], + timer: setTimeout(() => {}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS), + }; + textFragmentBuffer.set(key, entry); + scheduleTextFragmentFlush(entry); + return; + } + } + // Media group handling - buffer multi-image messages const mediaGroupId = (msg as { media_group_id?: string }).media_group_id; if (mediaGroupId) { diff --git a/src/telegram/bot.media.downloads-media-file-path-no-file-download.test.ts b/src/telegram/bot.media.downloads-media-file-path-no-file-download.test.ts index cd94f6b56..aa2653cb3 100644 --- a/src/telegram/bot.media.downloads-media-file-path-no-file-download.test.ts +++ b/src/telegram/bot.media.downloads-media-file-path-no-file-download.test.ts @@ -409,3 +409,74 @@ describe("telegram media groups", () => { MEDIA_GROUP_TEST_TIMEOUT_MS, ); }); + +describe("telegram text fragments", () => { + beforeEach(() => { + // These tests rely on real setTimeout aggregation; guard against leaked fake timers. + vi.useRealTimers(); + }); + + const TEXT_FRAGMENT_POLL_TIMEOUT_MS = process.platform === "win32" ? 30_000 : 15_000; + const TEXT_FRAGMENT_TEST_TIMEOUT_MS = process.platform === "win32" ? 45_000 : 20_000; + + const waitForFragmentProcessing = async ( + replySpy: ReturnType, + expectedCalls: number, + ) => { + await expect + .poll(() => replySpy.mock.calls.length, { timeout: TEXT_FRAGMENT_POLL_TIMEOUT_MS }) + .toBe(expectedCalls); + }; + + it( + "buffers near-limit text and processes sequential parts as one message", + async () => { + const { createTelegramBot } = await import("./bot.js"); + const replyModule = await import("../auto-reply/reply.js"); + const replySpy = replyModule.__replySpy as unknown as ReturnType; + + onSpy.mockReset(); + replySpy.mockReset(); + + createTelegramBot({ token: "tok" }); + const handler = onSpy.mock.calls.find((call) => call[0] === "message")?.[1] as ( + ctx: Record, + ) => Promise; + expect(handler).toBeDefined(); + + const part1 = "A".repeat(4050); + const part2 = "B".repeat(50); + + await handler({ + message: { + chat: { id: 42, type: "private" }, + message_id: 10, + date: 1736380800, + text: part1, + }, + me: { username: "clawdbot_bot" }, + getFile: async () => ({}), + }); + + await handler({ + message: { + chat: { id: 42, type: "private" }, + message_id: 11, + date: 1736380801, + text: part2, + }, + me: { username: "clawdbot_bot" }, + getFile: async () => ({}), + }); + + expect(replySpy).not.toHaveBeenCalled(); + await waitForFragmentProcessing(replySpy, 1); + + expect(replySpy).toHaveBeenCalledTimes(1); + const payload = replySpy.mock.calls[0][0] as { RawBody?: string; Body?: string }; + expect(payload.RawBody).toContain(part1.slice(0, 32)); + expect(payload.RawBody).toContain(part2.slice(0, 32)); + }, + TEXT_FRAGMENT_TEST_TIMEOUT_MS, + ); +});