diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e4248a4e..8631fdb2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ - Telegram: stop typing after tool results. Thanks @AbhisekBasu1 for PR #322. - Telegram: include sender identity in group envelope headers. (#336) - Telegram: support forum topics with topic-isolated sessions and message_thread_id routing. Thanks @HazAT, @nachoiacovino, @RandyVentures for PR #321/#333/#334. +- Telegram: add draft streaming via `sendMessageDraft` with `telegram.streamMode`, plus `/reasoning stream` for draft-only reasoning. - iMessage: ignore disconnect errors during shutdown (avoid unhandled promise rejections). Thanks @antons for PR #359. - Messages: stop defaulting ack reactions to 👀 when identity emoji is missing. - Auto-reply: require slash for control commands to avoid false triggers in normal text. diff --git a/docs/gateway/configuration.md b/docs/gateway/configuration.md index bfdbe2478..8fb78d7cc 100644 --- a/docs/gateway/configuration.md +++ b/docs/gateway/configuration.md @@ -468,6 +468,7 @@ Set `telegram.enabled: false` to disable automatic startup. dmPolicy: "pairing", // pairing | allowlist | open | disabled allowFrom: ["tg:123456789"], // optional; "open" requires ["*"] groups: { "*": { requireMention: true } }, + streamMode: "partial", // off | partial | block (draft streaming) actions: { reactions: true }, // tool action gates (false disables) mediaMaxMb: 5, proxy: "socks5://localhost:9050", @@ -478,6 +479,11 @@ Set `telegram.enabled: false` to disable automatic startup. } ``` +Draft streaming notes: +- Uses Telegram `sendMessageDraft` (draft bubble, not a real message). +- Requires **private chat topics** (message_thread_id in DMs; bot has topics enabled). +- `/reasoning stream` streams reasoning into the draft, then sends the final answer. + ### `discord` (bot transport) Configure the Discord bot by setting the bot token and optional gating: diff --git a/docs/providers/grammy.md b/docs/providers/grammy.md index e5fc77b48..1f48c31eb 100644 --- a/docs/providers/grammy.md +++ b/docs/providers/grammy.md @@ -5,7 +5,7 @@ read_when: --- # grammY Integration (Telegram Bot API) -Updated: 2025-12-07 +Updated: 2026-01-07 # Why grammY - TS-first Bot API client with built-in long-poll + webhook helpers, middleware, error handling, rate limiter. @@ -19,6 +19,7 @@ Updated: 2025-12-07 - **Webhook support:** `webhook-set.ts` wraps `setWebhook/deleteWebhook`; `webhook.ts` hosts the callback with health + graceful shutdown. Gateway enables webhook mode when `telegram.webhookUrl` is set (otherwise it long-polls). - **Sessions:** direct chats collapse into the agent main session (`agent::`); groups use `agent::telegram:group:`; replies route back to the same provider. - **Config knobs:** `telegram.botToken`, `telegram.dmPolicy`, `telegram.groups` (allowlist + mention defaults), `telegram.allowFrom`, `telegram.groupAllowFrom`, `telegram.groupPolicy`, `telegram.mediaMaxMb`, `telegram.proxy`, `telegram.webhookSecret`, `telegram.webhookUrl`. +- **Draft streaming:** optional `telegram.streamMode` uses `sendMessageDraft` in private topic chats (Bot API 9.3+). - **Tests:** grammy mocks cover DM + group mention gating and outbound send; more media/webhook fixtures still welcome. Open questions diff --git a/docs/providers/telegram.md b/docs/providers/telegram.md index b68cc54fc..8c8514d3c 100644 --- a/docs/providers/telegram.md +++ b/docs/providers/telegram.md @@ -5,7 +5,7 @@ read_when: --- # Telegram (Bot API) -Updated: 2026-01-06 +Updated: 2026-01-07 Status: production-ready for bot DMs + groups via grammY. Long-polling by default; webhook optional. @@ -44,6 +44,10 @@ Telegram forum topics include a `message_thread_id` per message. Clawdbot: - Sends typing indicators and replies with `message_thread_id` so responses stay in the topic. - Exposes `MessageThreadId` + `IsForum` in template context for routing/templating. +Private topics (DM forum mode) also include `message_thread_id`. Clawdbot: +- Appends `:topic:` to **DM** session keys for isolation. +- Uses the thread id for draft streaming + replies. + ## Access control (DMs + groups) - Default: `telegram.dmPolicy = "pairing"`. Unknown senders receive a pairing code; messages are ignored until approved (codes expire after 1 hour). - Approve via: @@ -69,6 +73,27 @@ Telegram supports optional threaded replies via tags: Controlled by `telegram.replyToMode`: - `off` (default), `first`, `all`. +## Streaming (drafts) +Telegram can stream **draft bubbles** while the agent is generating a response. +Clawdbot uses Bot API `sendMessageDraft` (not real messages) and then sends the +final reply as a normal message. + +Requirements (Telegram Bot API 9.3+): +- **Private chats with topics enabled** (forum topic mode for the bot). +- Incoming messages must include `message_thread_id` (private topic thread). +- Streaming is ignored for groups/supergroups/channels. + +Config: +- `telegram.streamMode: "off" | "partial" | "block"` (default: `partial`) + - `partial`: update the draft bubble with the latest streaming text. + - `block`: update the draft bubble in larger blocks (chunked). + - `off`: disable draft streaming. + +Reasoning stream (Telegram only): +- `/reasoning stream` streams reasoning into the draft bubble while the reply is + generating, then sends the final answer without reasoning. +- If `telegram.streamMode` is `off`, reasoning stream is disabled. + ## Agent tool (reactions) - Tool: `telegram` with `react` action (`chatId`, `messageId`, `emoji`). - Reaction removal semantics: see [/tools/reactions](/tools/reactions). @@ -92,6 +117,7 @@ Provider options: - `telegram.groups`: per-group defaults + allowlist (use `"*"` for global defaults). - `telegram.replyToMode`: `off | first | all`. - `telegram.textChunkLimit`: outbound chunk size (chars). +- `telegram.streamMode`: `off | partial | block` (draft streaming). - `telegram.mediaMaxMb`: inbound/outbound media cap (MB). - `telegram.proxy`: proxy URL for Bot API calls (SOCKS/HTTP). - `telegram.webhookUrl`: enable webhook mode. diff --git a/docs/start/faq.md b/docs/start/faq.md index f9b096be3..ad16dd0e7 100644 --- a/docs/start/faq.md +++ b/docs/start/faq.md @@ -603,7 +603,7 @@ Quick reference (send these in chat): | `/activation mention\|always` | Group activation (owner-only) | | `/think ` | Set thinking level (off\|minimal\|low\|medium\|high) | | `/verbose on\|off` | Toggle verbose mode | -| `/reasoning on\|off` | Toggle reasoning visibility | +| `/reasoning on\|off\|stream` | Toggle reasoning visibility (stream = Telegram draft only) | | `/elevated on\|off` | Toggle elevated bash mode (approved senders only) | | `/model ` | Switch AI model (see below) | | `/queue ` | Queue mode (see below) | diff --git a/docs/tools/slash-commands.md b/docs/tools/slash-commands.md index 7a9eddd9b..67633b9c5 100644 --- a/docs/tools/slash-commands.md +++ b/docs/tools/slash-commands.md @@ -40,7 +40,7 @@ Text + native (when enabled): - `/reset` or `/new` - `/think ` (aliases: `/thinking`, `/t`) - `/verbose on|off` (alias: `/v`) -- `/reasoning on|off` (alias: `/reason`) +- `/reasoning on|off|stream` (alias: `/reason`; `stream` = Telegram draft only) - `/elevated on|off` (alias: `/elev`) - `/model ` - `/queue ` (plus options like `debounce:2s cap:25 drop:summarize`) diff --git a/docs/tools/thinking.md b/docs/tools/thinking.md index 86ebb6f5a..9ac0980ca 100644 --- a/docs/tools/thinking.md +++ b/docs/tools/thinking.md @@ -35,9 +35,10 @@ read_when: - When verbose is on, agents that emit structured tool results (Pi, other JSON agents) send each tool result back as its own metadata-only message, prefixed with ` : ` when available (path/command); the tool output itself is not forwarded. These tool summaries are sent as soon as each tool finishes (separate bubbles), not as streaming deltas. If you toggle `/verbose on|off` while a run is in-flight, subsequent tool bubbles honor the new setting. ## Reasoning visibility (/reasoning) -- Levels: `on|off`. +- Levels: `on|off|stream`. - Directive-only message toggles whether thinking blocks are shown as italic text in replies. - When enabled, any model-provided reasoning content is appended as a separate italic block. +- `stream` (Telegram only): streams reasoning into the Telegram draft bubble while the reply is generating, then sends the final answer without reasoning. - Alias: `/reason`. ## Related diff --git a/docs/web/tui.md b/docs/web/tui.md index 3c83c0a43..df87d081d 100644 --- a/docs/web/tui.md +++ b/docs/web/tui.md @@ -6,7 +6,7 @@ read_when: --- # TUI (Gateway chat client) -Updated: 2026-01-03 +Updated: 2026-01-07 ## What it is - A terminal UI that connects to the Gateway WebSocket and speaks the same chat APIs as WebChat. @@ -51,7 +51,7 @@ Use SSH tunneling or Tailscale to reach the Gateway WS. - `/model ` (or `/model list`, `/models`) - `/think ` - `/verbose ` -- `/reasoning ` +- `/reasoning ` (stream = Telegram draft only) - `/elevated ` - `/elev ` - `/activation ` diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index c0026884c..2dce46231 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -659,6 +659,10 @@ export async function runEmbeddedPiAgent(params: { }) => void | Promise; blockReplyBreak?: "text_end" | "message_end"; blockReplyChunking?: BlockReplyChunking; + onReasoningStream?: (payload: { + text?: string; + mediaUrls?: string[]; + }) => void | Promise; onToolResult?: (payload: { text?: string; mediaUrls?: string[]; @@ -917,9 +921,10 @@ export async function runEmbeddedPiAgent(params: { session, runId: params.runId, verboseLevel: params.verboseLevel, - includeReasoning: params.reasoningLevel === "on", + reasoningMode: params.reasoningLevel ?? "off", shouldEmitToolResult: params.shouldEmitToolResult, onToolResult: params.onToolResult, + onReasoningStream: params.onReasoningStream, onBlockReply: params.onBlockReply, blockReplyBreak: params.blockReplyBreak, blockReplyChunking: params.blockReplyChunking, diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index 9c862d759..18c785f6c 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -147,7 +147,7 @@ describe("subscribeEmbeddedPiSession", () => { runId: "run", onBlockReply, blockReplyBreak: "message_end", - includeReasoning: true, + reasoningMode: "on", }); const assistantMessage = { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index d5535d2d5..940ca784f 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -1,7 +1,7 @@ import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core"; import type { AssistantMessage } from "@mariozechner/pi-ai"; import type { AgentSession } from "@mariozechner/pi-coding-agent"; - +import type { ReasoningLevel } from "../auto-reply/thinking.js"; import { formatToolAggregate } from "../auto-reply/tool-meta.js"; import { emitAgentEvent } from "../infra/agent-events.js"; import { createSubsystemLogger } from "../logging.js"; @@ -18,6 +18,8 @@ import { const THINKING_TAG_RE = /<\s*\/?\s*think(?:ing)?\s*>/gi; const THINKING_OPEN_RE = /<\s*think(?:ing)?\s*>/i; const THINKING_CLOSE_RE = /<\s*\/\s*think(?:ing)?\s*>/i; +const THINKING_OPEN_GLOBAL_RE = /<\s*think(?:ing)?\s*>/gi; +const THINKING_CLOSE_GLOBAL_RE = /<\s*\/\s*think(?:ing)?\s*>/gi; const TOOL_RESULT_MAX_CHARS = 8000; const log = createSubsystemLogger("agent/embedded"); @@ -87,12 +89,16 @@ export function subscribeEmbeddedPiSession(params: { session: AgentSession; runId: string; verboseLevel?: "off" | "on"; - includeReasoning?: boolean; + reasoningMode?: ReasoningLevel; shouldEmitToolResult?: () => boolean; onToolResult?: (payload: { text?: string; mediaUrls?: string[]; }) => void | Promise; + onReasoningStream?: (payload: { + text?: string; + mediaUrls?: string[]; + }) => void | Promise; onBlockReply?: (payload: { text?: string; mediaUrls?: string[]; @@ -114,9 +120,15 @@ export function subscribeEmbeddedPiSession(params: { const toolMetaById = new Map(); const toolSummaryById = new Set(); const blockReplyBreak = params.blockReplyBreak ?? "text_end"; + const reasoningMode = params.reasoningMode ?? "off"; + const includeReasoning = reasoningMode === "on"; + const streamReasoning = + reasoningMode === "stream" && + typeof params.onReasoningStream === "function"; let deltaBuffer = ""; let blockBuffer = ""; let lastStreamedAssistant: string | undefined; + let lastStreamedReasoning: string | undefined; let lastBlockReplyText: string | undefined; let assistantTextBaseline = 0; let compactionInFlight = false; @@ -238,6 +250,39 @@ export function subscribeEmbeddedPiSession(params: { return result.trim(); }; + const extractThinkingFromStream = (text: string): string => { + if (!text) return ""; + const closed = extractThinkingFromText(text); + if (closed) return closed; + const openMatches = [...text.matchAll(THINKING_OPEN_GLOBAL_RE)]; + if (openMatches.length === 0) return ""; + const closeMatches = [...text.matchAll(THINKING_CLOSE_GLOBAL_RE)]; + const lastOpen = openMatches[openMatches.length - 1]; + const lastClose = closeMatches[closeMatches.length - 1]; + if (lastClose && (lastClose.index ?? -1) > (lastOpen.index ?? -1)) { + return closed; + } + const start = (lastOpen.index ?? 0) + lastOpen[0].length; + return text.slice(start).trim(); + }; + + const formatReasoningDraft = (text: string): string => { + const trimmed = text.trim(); + if (!trimmed) return ""; + return `Reasoning:\n${trimmed}`; + }; + + const emitReasoningStream = (text: string) => { + if (!streamReasoning || !params.onReasoningStream) return; + const formatted = formatReasoningDraft(text); + if (!formatted) return; + if (formatted === lastStreamedReasoning) return; + lastStreamedReasoning = formatted; + void params.onReasoningStream({ + text: formatted, + }); + }; + const resetForCompactionRetry = () => { assistantTexts.length = 0; toolMetas.length = 0; @@ -247,6 +292,7 @@ export function subscribeEmbeddedPiSession(params: { blockBuffer = ""; blockChunker?.reset(); lastStreamedAssistant = undefined; + lastStreamedReasoning = undefined; lastBlockReplyText = undefined; assistantTextBaseline = 0; }; @@ -266,6 +312,7 @@ export function subscribeEmbeddedPiSession(params: { blockChunker?.reset(); lastStreamedAssistant = undefined; lastBlockReplyText = undefined; + lastStreamedReasoning = undefined; lastReasoningSent = undefined; assistantTextBaseline = assistantTexts.length; } @@ -436,6 +483,11 @@ export function subscribeEmbeddedPiSession(params: { } } + if (streamReasoning) { + // Handle partial tags: stream whatever reasoning is visible so far. + emitReasoningStream(extractThinkingFromStream(deltaBuffer)); + } + const cleaned = params.enforceFinalTag ? stripThinkingSegments(stripUnpairedThinkingTags(deltaBuffer)) : stripThinkingSegments(deltaBuffer); @@ -502,17 +554,19 @@ export function subscribeEmbeddedPiSession(params: { params.enforceFinalTag && cleaned ? (extractFinalText(cleaned)?.trim() ?? cleaned) : cleaned; - const rawThinking = params.includeReasoning - ? extractAssistantThinking(assistantMessage) || - extractThinkingFromText(rawText) - : ""; + const rawThinking = + includeReasoning || streamReasoning + ? extractAssistantThinking(assistantMessage) || + extractThinkingFromText(rawText) + : ""; const formattedReasoning = rawThinking ? formatReasoningMarkdown(rawThinking) : ""; - const text = - baseText && formattedReasoning + const text = includeReasoning + ? baseText && formattedReasoning ? `${formattedReasoning}\n\n${baseText}` - : formattedReasoning || baseText; + : formattedReasoning || baseText + : baseText; const addedDuringMessage = assistantTexts.length > assistantTextBaseline; @@ -550,6 +604,7 @@ export function subscribeEmbeddedPiSession(params: { } const onBlockReply = params.onBlockReply; const shouldEmitReasoningBlock = + includeReasoning && Boolean(formattedReasoning) && Boolean(onBlockReply) && formattedReasoning !== lastReasoningSent && @@ -558,6 +613,9 @@ export function subscribeEmbeddedPiSession(params: { lastReasoningSent = formattedReasoning; void onBlockReply({ text: formattedReasoning }); } + if (streamReasoning && rawThinking) { + emitReasoningStream(rawThinking); + } deltaBuffer = ""; blockBuffer = ""; blockChunker?.reset(); diff --git a/src/auto-reply/reply.directive.test.ts b/src/auto-reply/reply.directive.test.ts index 1a55ba317..9f9105d44 100644 --- a/src/auto-reply/reply.directive.test.ts +++ b/src/auto-reply/reply.directive.test.ts @@ -106,6 +106,12 @@ describe("directive parsing", () => { expect(res.reasoningLevel).toBe("on"); }); + it("matches reasoning stream directive", () => { + const res = extractReasoningDirective("/reasoning stream please"); + expect(res.hasDirective).toBe(true); + expect(res.reasoningLevel).toBe("stream"); + }); + it("matches elevated with leading space", () => { const res = extractElevatedDirective(" please /elevated on now"); expect(res.hasDirective).toBe(true); diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 3dbaf85ed..21f9988c1 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -401,7 +401,8 @@ export async function getReplyFromConfig( agentCfg?.blockStreamingBreak === "message_end" ? "message_end" : "text_end"; - const blockStreamingEnabled = resolvedBlockStreaming === "on"; + const blockStreamingEnabled = + resolvedBlockStreaming === "on" && opts?.disableBlockStreaming !== true; const blockReplyChunking = blockStreamingEnabled ? resolveBlockStreamingChunking(cfg, sessionCtx.Provider) : undefined; diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 602c58f28..cf64530da 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -197,6 +197,9 @@ export async function runReplyAgent(params: { let fallbackProvider = followupRun.run.provider; let fallbackModel = followupRun.run.model; try { + const allowPartialStream = !( + followupRun.run.reasoningLevel === "stream" && opts?.onReasoningStream + ); const fallbackResult = await runWithModelFallback({ cfg: followupRun.run.config, provider: followupRun.run.provider, @@ -227,32 +230,41 @@ export async function runReplyAgent(params: { runId, blockReplyBreak: resolvedBlockStreamingBreak, blockReplyChunking, - onPartialReply: opts?.onPartialReply - ? async (payload) => { - let text = payload.text; - if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { - const stripped = stripHeartbeatToken(text, { - mode: "message", + onPartialReply: + opts?.onPartialReply && allowPartialStream + ? async (payload) => { + let text = payload.text; + if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { + const stripped = stripHeartbeatToken(text, { + mode: "message", + }); + if (stripped.didStrip && !didLogHeartbeatStrip) { + didLogHeartbeatStrip = true; + logVerbose( + "Stripped stray HEARTBEAT_OK token from reply", + ); + } + if ( + stripped.shouldSkip && + (payload.mediaUrls?.length ?? 0) === 0 + ) { + return; + } + text = stripped.text; + } + if (!isHeartbeat) { + await typing.startTypingOnText(text); + } + await opts.onPartialReply?.({ + text, + mediaUrls: payload.mediaUrls, }); - if (stripped.didStrip && !didLogHeartbeatStrip) { - didLogHeartbeatStrip = true; - logVerbose( - "Stripped stray HEARTBEAT_OK token from reply", - ); - } - if ( - stripped.shouldSkip && - (payload.mediaUrls?.length ?? 0) === 0 - ) { - return; - } - text = stripped.text; } - if (!isHeartbeat) { - await typing.startTypingOnText(text); - } - await opts.onPartialReply?.({ - text, + : undefined, + onReasoningStream: opts?.onReasoningStream + ? async (payload) => { + await opts.onReasoningStream?.({ + text: payload.text, mediaUrls: payload.mediaUrls, }); } diff --git a/src/auto-reply/reply/directive-handling.ts b/src/auto-reply/reply/directive-handling.ts index 618dc5741..b811248c3 100644 --- a/src/auto-reply/reply/directive-handling.ts +++ b/src/auto-reply/reply/directive-handling.ts @@ -385,7 +385,7 @@ export async function handleDirectiveOnly(params: { } if (directives.hasReasoningDirective && !directives.reasoningLevel) { return { - text: `Unrecognized reasoning level "${directives.rawReasoningLevel ?? ""}". Valid levels: on, off.`, + text: `Unrecognized reasoning level "${directives.rawReasoningLevel ?? ""}". Valid levels: on, off, stream.`, }; } if (directives.hasElevatedDirective && !directives.elevatedLevel) { @@ -563,7 +563,9 @@ export async function handleDirectiveOnly(params: { parts.push( directives.reasoningLevel === "off" ? `${SYSTEM_MARK} Reasoning visibility disabled.` - : `${SYSTEM_MARK} Reasoning visibility enabled.`, + : directives.reasoningLevel === "stream" + ? `${SYSTEM_MARK} Reasoning stream enabled (Telegram only).` + : `${SYSTEM_MARK} Reasoning visibility enabled.`, ); } if (directives.hasElevatedDirective && directives.elevatedLevel) { diff --git a/src/auto-reply/thinking.test.ts b/src/auto-reply/thinking.test.ts index 5764e1775..72c0007e8 100644 --- a/src/auto-reply/thinking.test.ts +++ b/src/auto-reply/thinking.test.ts @@ -17,4 +17,9 @@ describe("normalizeReasoningLevel", () => { expect(normalizeReasoningLevel("show")).toBe("on"); expect(normalizeReasoningLevel("hide")).toBe("off"); }); + + it("accepts stream", () => { + expect(normalizeReasoningLevel("stream")).toBe("stream"); + expect(normalizeReasoningLevel("streaming")).toBe("stream"); + }); }); diff --git a/src/auto-reply/thinking.ts b/src/auto-reply/thinking.ts index c1e3fb580..1550fde76 100644 --- a/src/auto-reply/thinking.ts +++ b/src/auto-reply/thinking.ts @@ -1,7 +1,7 @@ export type ThinkLevel = "off" | "minimal" | "low" | "medium" | "high"; export type VerboseLevel = "off" | "on"; export type ElevatedLevel = "off" | "on"; -export type ReasoningLevel = "off" | "on"; +export type ReasoningLevel = "off" | "on" | "stream"; // Normalize user-provided thinking level strings to the canonical enum. export function normalizeThinkLevel( @@ -82,5 +82,6 @@ export function normalizeReasoningLevel( ) ) return "on"; + if (["stream", "streaming", "draft", "live"].includes(key)) return "stream"; return undefined; } diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index b76a0a5a1..ba1562740 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -5,8 +5,10 @@ export type GetReplyOptions = { onTypingController?: (typing: TypingController) => void; isHeartbeat?: boolean; onPartialReply?: (payload: ReplyPayload) => Promise | void; + onReasoningStream?: (payload: ReplyPayload) => Promise | void; onBlockReply?: (payload: ReplyPayload) => Promise | void; onToolResult?: (payload: ReplyPayload) => Promise | void; + disableBlockStreaming?: boolean; }; export type ReplyPayload = { diff --git a/src/config/config.test.ts b/src/config/config.test.ts index f69ec69ae..7e375579e 100644 --- a/src/config/config.test.ts +++ b/src/config/config.test.ts @@ -713,6 +713,16 @@ describe("legacy config detection", () => { } }); + it("defaults telegram.streamMode to partial when telegram section exists", async () => { + vi.resetModules(); + const { validateConfigObject } = await import("./config.js"); + const res = validateConfigObject({ telegram: {} }); + expect(res.ok).toBe(true); + if (res.ok) { + expect(res.config.telegram?.streamMode).toBe("partial"); + } + }); + it('rejects whatsapp.dmPolicy="open" without allowFrom "*"', async () => { vi.resetModules(); const { validateConfigObject } = await import("./config.js"); diff --git a/src/config/schema.ts b/src/config/schema.ts index 1e718cd75..5cf88f528 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -107,6 +107,7 @@ const FIELD_LABELS: Record = { "talk.apiKey": "Talk API Key", "telegram.botToken": "Telegram Bot Token", "telegram.dmPolicy": "Telegram DM Policy", + "telegram.streamMode": "Telegram Stream Mode", "whatsapp.dmPolicy": "WhatsApp DM Policy", "signal.dmPolicy": "Signal DM Policy", "imessage.dmPolicy": "iMessage DM Policy", @@ -155,6 +156,8 @@ const FIELD_HELP: Record = { 'When to send ack reactions ("group-mentions", "group-all", "direct", "all").', "telegram.dmPolicy": 'Direct message access control ("pairing" recommended). "open" requires telegram.allowFrom=["*"].', + "telegram.streamMode": + "Draft streaming mode for Telegram replies (off | partial | block). Requires private topics + sendMessageDraft.", "whatsapp.dmPolicy": 'Direct message access control ("pairing" recommended). "open" requires whatsapp.allowFrom=["*"].', "signal.dmPolicy": diff --git a/src/config/types.ts b/src/config/types.ts index 9f8526420..474cb9b1f 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -270,6 +270,8 @@ export type TelegramConfig = { groupPolicy?: GroupPolicy; /** Outbound text chunk size (chars). Default: 4000. */ textChunkLimit?: number; + /** Draft streaming mode for Telegram (off|partial|block). Default: partial. */ + streamMode?: "off" | "partial" | "block"; mediaMaxMb?: number; proxy?: string; webhookUrl?: string; diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index be3a70fe3..761a43e3d 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -793,6 +793,10 @@ export const ClawdbotSchema = z.object({ groupAllowFrom: z.array(z.union([z.string(), z.number()])).optional(), groupPolicy: GroupPolicySchema.optional().default("open"), textChunkLimit: z.number().int().positive().optional(), + streamMode: z + .enum(["off", "partial", "block"]) + .optional() + .default("partial"), mediaMaxMb: z.number().positive().optional(), proxy: z.string().optional(), webhookUrl: z.string().optional(), diff --git a/src/gateway/server-bridge.ts b/src/gateway/server-bridge.ts index d772a88a5..cc18f8e3e 100644 --- a/src/gateway/server-bridge.ts +++ b/src/gateway/server-bridge.ts @@ -446,7 +446,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { ok: false, error: { code: ErrorCodes.INVALID_REQUEST, - message: `invalid reasoningLevel: ${String(raw)}`, + message: `invalid reasoningLevel: ${String(raw)} (use on|off|stream)`, }, }; } diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index a209fb396..7da991553 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -224,7 +224,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { undefined, errorShape( ErrorCodes.INVALID_REQUEST, - 'invalid reasoningLevel (use "on"|"off")', + 'invalid reasoningLevel (use "on"|"off"|"stream")', ), ); return; diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index f181021a0..63ae862d9 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -4,6 +4,7 @@ import { Buffer } from "node:buffer"; import { apiThrottler } from "@grammyjs/transformer-throttler"; import type { ApiClientOptions, Message } from "grammy"; import { Bot, InputFile, webhookCallback } from "grammy"; +import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js"; import { chunkMarkdownText, resolveTextChunkLimit, @@ -14,6 +15,7 @@ import { listNativeCommandSpecs, } from "../auto-reply/commands-registry.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; +import { resolveBlockStreamingChunking } from "../auto-reply/reply/block-streaming.js"; import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js"; import { buildMentionRegexes, @@ -43,6 +45,7 @@ import { import { resolveAgentRoute } from "../routing/resolve-route.js"; import type { RuntimeEnv } from "../runtime.js"; import { loadWebMedia } from "../web/media.js"; +import { createTelegramDraftStream } from "./draft-stream.js"; import { readTelegramAllowFromStore, upsertTelegramPairingRequest, @@ -57,6 +60,8 @@ const MEDIA_GROUP_TIMEOUT_MS = 500; type TelegramMessage = Message.CommonMessage; +type TelegramStreamMode = "off" | "partial" | "block"; + type MediaGroupEntry = { messages: Array<{ msg: TelegramMessage; @@ -164,6 +169,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { ); }; const replyToMode = opts.replyToMode ?? cfg.telegram?.replyToMode ?? "off"; + const streamMode = resolveTelegramStreamMode(cfg); const nativeEnabled = cfg.commands?.native === true; const nativeDisabledExplicit = cfg.commands?.native === false; const useAccessGroups = cfg.commands?.useAccessGroups !== false; @@ -173,6 +179,23 @@ export function createTelegramBot(opts: TelegramBotOptions) { (opts.mediaMaxMb ?? cfg.telegram?.mediaMaxMb ?? 5) * 1024 * 1024; const logger = getChildLogger({ module: "telegram-auto-reply" }); const mentionRegexes = buildMentionRegexes(cfg); + let botHasTopicsEnabled: boolean | undefined; + const resolveBotTopicsEnabled = async (ctx?: TelegramContext) => { + const fromCtx = ctx?.me as { has_topics_enabled?: boolean } | undefined; + if (typeof fromCtx?.has_topics_enabled === "boolean") { + botHasTopicsEnabled = fromCtx.has_topics_enabled; + return botHasTopicsEnabled; + } + if (typeof botHasTopicsEnabled === "boolean") return botHasTopicsEnabled; + try { + const me = (await bot.api.getMe()) as { has_topics_enabled?: boolean }; + botHasTopicsEnabled = Boolean(me?.has_topics_enabled); + } catch (err) { + logVerbose(`telegram getMe failed: ${String(err)}`); + botHasTopicsEnabled = false; + } + return botHasTopicsEnabled; + }; const resolveGroupPolicy = (chatId: string | number) => resolveProviderGroupPolicy({ cfg, @@ -397,7 +420,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { kind: isGroup ? "group" : "dm", id: isGroup ? buildTelegramGroupPeerId(chatId, messageThreadId) - : String(chatId), + : buildTelegramDmPeerId(chatId, messageThreadId), }, }); const ctxPayload = { @@ -471,10 +494,88 @@ export function createTelegramBot(opts: TelegramBotOptions) { ); } + const isPrivateChat = msg.chat.type === "private"; + const draftMaxChars = Math.min(textLimit, 4096); + const canStreamDraft = + streamMode !== "off" && + isPrivateChat && + typeof messageThreadId === "number" && + (await resolveBotTopicsEnabled(primaryCtx)); + const draftStream = canStreamDraft + ? createTelegramDraftStream({ + api: bot.api, + chatId, + draftId: msg.message_id || Date.now(), + maxChars: draftMaxChars, + messageThreadId, + log: logVerbose, + warn: logVerbose, + }) + : undefined; + const draftChunking = + draftStream && streamMode === "block" + ? resolveBlockStreamingChunking(cfg, "telegram") + : undefined; + const draftChunker = draftChunking + ? new EmbeddedBlockChunker(draftChunking) + : undefined; + let lastPartialText = ""; + let draftText = ""; + const updateDraftFromPartial = (text?: string) => { + if (!draftStream || !text) return; + if (text === lastPartialText) return; + if (streamMode === "partial") { + lastPartialText = text; + draftStream.update(text); + return; + } + let delta = text; + if (text.startsWith(lastPartialText)) { + delta = text.slice(lastPartialText.length); + } else { + // Streaming buffer reset (or non-monotonic stream). Start fresh. + draftChunker?.reset(); + draftText = ""; + } + lastPartialText = text; + if (!delta) return; + if (!draftChunker) { + draftText = text; + draftStream.update(draftText); + return; + } + draftChunker.append(delta); + draftChunker.drain({ + force: false, + emit: (chunk) => { + draftText += chunk; + draftStream.update(draftText); + }, + }); + }; + const flushDraft = async () => { + if (!draftStream) return; + if (draftChunker?.hasBuffered()) { + draftChunker.drain({ + force: true, + emit: (chunk) => { + draftText += chunk; + }, + }); + draftChunker.reset(); + if (draftText) draftStream.update(draftText); + } + await draftStream.flush(); + }; + const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ responsePrefix: cfg.messages?.responsePrefix, - deliver: async (payload) => { + deliver: async (payload, info) => { + if (info.kind === "final") { + await flushDraft(); + draftStream?.stop(); + } await deliverReplies({ replies: [payload], chatId: String(chatId), @@ -498,9 +599,21 @@ export function createTelegramBot(opts: TelegramBotOptions) { ctx: ctxPayload, cfg, dispatcher, - replyOptions, + replyOptions: { + ...replyOptions, + onPartialReply: draftStream + ? (payload) => updateDraftFromPartial(payload.text) + : undefined, + onReasoningStream: draftStream + ? (payload) => { + if (payload.text) draftStream.update(payload.text); + } + : undefined, + disableBlockStreaming: Boolean(draftStream), + }, }); markDispatchIdle(); + draftStream?.stop(); if (!queuedFinal) return; }; @@ -602,7 +715,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { kind: isGroup ? "group" : "dm", id: isGroup ? buildTelegramGroupPeerId(chatId, messageThreadId) - : String(chatId), + : buildTelegramDmPeerId(chatId, messageThreadId), }, }); const ctxPayload = { @@ -925,6 +1038,14 @@ function buildTelegramThreadParams(messageThreadId?: number) { : undefined; } +function resolveTelegramStreamMode( + cfg: ReturnType, +): TelegramStreamMode { + const raw = cfg.telegram?.streamMode?.trim().toLowerCase(); + if (raw === "off" || raw === "partial" || raw === "block") return raw; + return "partial"; +} + function buildTelegramGroupPeerId( chatId: number | string, messageThreadId?: number, @@ -934,6 +1055,15 @@ function buildTelegramGroupPeerId( : String(chatId); } +function buildTelegramDmPeerId( + chatId: number | string, + messageThreadId?: number, +) { + return messageThreadId != null + ? `${chatId}:topic:${messageThreadId}` + : String(chatId); +} + function buildTelegramGroupFrom( chatId: number | string, messageThreadId?: number, diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts new file mode 100644 index 000000000..aba3b3e25 --- /dev/null +++ b/src/telegram/draft-stream.ts @@ -0,0 +1,130 @@ +import type { Bot } from "grammy"; + +const TELEGRAM_DRAFT_MAX_CHARS = 4096; +const DEFAULT_THROTTLE_MS = 300; + +export type TelegramDraftStream = { + update: (text: string) => void; + flush: () => Promise; + stop: () => void; +}; + +export function createTelegramDraftStream(params: { + api: Bot["api"]; + chatId: number; + draftId: number; + maxChars?: number; + messageThreadId?: number; + throttleMs?: number; + log?: (message: string) => void; + warn?: (message: string) => void; +}): TelegramDraftStream { + const maxChars = Math.min( + params.maxChars ?? TELEGRAM_DRAFT_MAX_CHARS, + TELEGRAM_DRAFT_MAX_CHARS, + ); + const throttleMs = Math.max(50, params.throttleMs ?? DEFAULT_THROTTLE_MS); + const rawDraftId = Number.isFinite(params.draftId) + ? Math.trunc(params.draftId) + : 1; + const draftId = rawDraftId === 0 ? 1 : Math.abs(rawDraftId); + const chatId = params.chatId; + const threadParams = + typeof params.messageThreadId === "number" + ? { message_thread_id: Math.trunc(params.messageThreadId) } + : undefined; + + let lastSentText = ""; + let lastSentAt = 0; + let pendingText = ""; + let inFlight = false; + let timer: ReturnType | undefined; + let stopped = false; + + const sendDraft = async (text: string) => { + if (stopped) return; + const trimmed = text.trimEnd(); + if (!trimmed) return; + if (trimmed.length > maxChars) { + // Drafts are capped at 4096 chars. Stop streaming once we exceed the cap + // so we don't keep sending failing updates or a truncated preview. + stopped = true; + params.warn?.( + `telegram draft stream stopped (draft length ${trimmed.length} > ${maxChars})`, + ); + return; + } + if (trimmed === lastSentText) return; + lastSentText = trimmed; + lastSentAt = Date.now(); + try { + await params.api.sendMessageDraft(chatId, draftId, trimmed, threadParams); + } catch (err) { + stopped = true; + params.warn?.( + `telegram draft stream failed: ${err instanceof Error ? err.message : String(err)}`, + ); + } + }; + + const flush = async () => { + if (timer) { + clearTimeout(timer); + timer = undefined; + } + if (inFlight) { + schedule(); + return; + } + const text = pendingText; + pendingText = ""; + if (!text.trim()) { + if (pendingText) schedule(); + return; + } + inFlight = true; + try { + await sendDraft(text); + } finally { + inFlight = false; + } + if (pendingText) schedule(); + }; + + const schedule = () => { + if (timer) return; + const delay = Math.max(0, throttleMs - (Date.now() - lastSentAt)); + timer = setTimeout(() => { + void flush(); + }, delay); + }; + + const update = (text: string) => { + if (stopped) return; + pendingText = text; + if (inFlight) { + schedule(); + return; + } + if (!timer && Date.now() - lastSentAt >= throttleMs) { + void flush(); + return; + } + schedule(); + }; + + const stop = () => { + stopped = true; + pendingText = ""; + if (timer) { + clearTimeout(timer); + timer = undefined; + } + }; + + params.log?.( + `telegram draft stream ready (draftId=${draftId}, maxChars=${maxChars}, throttleMs=${throttleMs})`, + ); + + return { update, flush, stop }; +}