// @ts-nocheck import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js"; import { resolveChunkMode } from "../auto-reply/chunk.js"; import { clearHistoryEntriesIfEnabled } from "../auto-reply/reply/history.js"; import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; import { removeAckReactionAfterReply } from "../channels/ack-reactions.js"; import { logAckFailure, logTypingFailure } from "../channels/logging.js"; import { createReplyPrefixContext } from "../channels/reply-prefix.js"; import { createTypingCallbacks } from "../channels/typing.js"; import { danger, logVerbose } from "../globals.js"; import { resolveMarkdownTableMode } from "../config/markdown-tables.js"; import { deliverReplies } from "./bot/delivery.js"; import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js"; import { createTelegramDraftStream } from "./draft-stream.js"; export const dispatchTelegramMessage = async ({ context, bot, cfg, runtime, replyToMode, streamMode, textLimit, telegramCfg, opts, resolveBotTopicsEnabled, }) => { const { ctxPayload, primaryCtx, msg, chatId, isGroup, resolvedThreadId, historyKey, historyLimit, groupHistories, route, skillFilter, sendTyping, sendRecordVoice, ackReactionPromise, reactionApi, removeAckAfterReply, } = context; const isPrivateChat = msg.chat.type === "private"; const draftMaxChars = Math.min(textLimit, 4096); const canStreamDraft = streamMode !== "off" && isPrivateChat && typeof resolvedThreadId === "number" && (await resolveBotTopicsEnabled(primaryCtx)); const draftStream = canStreamDraft ? createTelegramDraftStream({ api: bot.api, chatId, draftId: msg.message_id || Date.now(), maxChars: draftMaxChars, messageThreadId: resolvedThreadId, log: logVerbose, warn: logVerbose, }) : undefined; const draftChunking = draftStream && streamMode === "block" ? resolveTelegramDraftStreamingChunking(cfg, route.accountId) : undefined; const draftChunker = draftChunking ? new EmbeddedBlockChunker(draftChunking) : undefined; let lastPartialText = ""; let draftText = ""; const updateDraftFromPartial = (text?: string) => { if (!draftStream || !text) return; if (text === lastPartialText) return; if (streamMode === "partial") { lastPartialText = text; draftStream.update(text); return; } let delta = text; if (text.startsWith(lastPartialText)) { delta = text.slice(lastPartialText.length); } else { // Streaming buffer reset (or non-monotonic stream). Start fresh. draftChunker?.reset(); draftText = ""; } lastPartialText = text; if (!delta) return; if (!draftChunker) { draftText = text; draftStream.update(draftText); return; } draftChunker.append(delta); draftChunker.drain({ force: false, emit: (chunk) => { draftText += chunk; draftStream.update(draftText); }, }); }; const flushDraft = async () => { if (!draftStream) return; if (draftChunker?.hasBuffered()) { draftChunker.drain({ force: true, emit: (chunk) => { draftText += chunk; }, }); draftChunker.reset(); if (draftText) draftStream.update(draftText); } await draftStream.flush(); }; const disableBlockStreaming = Boolean(draftStream) || (typeof telegramCfg.blockStreaming === "boolean" ? !telegramCfg.blockStreaming : undefined); const prefixContext = createReplyPrefixContext({ cfg, agentId: route.agentId }); const tableMode = resolveMarkdownTableMode({ cfg, channel: "telegram", accountId: route.accountId, }); const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId); const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg, dispatcherOptions: { responsePrefix: prefixContext.responsePrefix, responsePrefixContextProvider: prefixContext.responsePrefixContextProvider, deliver: async (payload, info) => { if (info.kind === "final") { await flushDraft(); draftStream?.stop(); } await deliverReplies({ replies: [payload], chatId: String(chatId), token: opts.token, runtime, bot, replyToMode, textLimit, messageThreadId: resolvedThreadId, tableMode, chunkMode, onVoiceRecording: sendRecordVoice, linkPreview: telegramCfg.linkPreview, }); }, onError: (err, info) => { runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); }, onReplyStart: createTypingCallbacks({ start: sendTyping, onStartError: (err) => { logTypingFailure({ log: logVerbose, channel: "telegram", target: String(chatId), error: err, }); }, }).onReplyStart, }, replyOptions: { skillFilter, onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined, onReasoningStream: draftStream ? (payload) => { if (payload.text) draftStream.update(payload.text); } : undefined, disableBlockStreaming, onModelSelected: (ctx) => { prefixContext.onModelSelected(ctx); }, }, }); draftStream?.stop(); if (!queuedFinal) { if (isGroup && historyKey) { clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit }); } return; } removeAckReactionAfterReply({ removeAfterReply: removeAckAfterReply, ackReactionPromise, ackReactionValue: ackReactionPromise ? "ack" : null, remove: () => reactionApi?.(chatId, msg.message_id ?? 0, []) ?? Promise.resolve(), onError: (err) => { if (!msg.message_id) return; logAckFailure({ log: logVerbose, channel: "telegram", target: `${chatId}/${msg.message_id}`, error: err, }); }, }); if (isGroup && historyKey) { clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit }); } };