From 9616f4b2b10bebf081f5984ee74837c4db25bcf4 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 3 Jan 2026 00:28:33 +0100 Subject: [PATCH] feat: stream reply blocks immediately --- CHANGELOG.md | 2 + docs/agent.md | 2 + docs/configuration.md | 5 ++ src/agents/pi-embedded-runner.ts | 5 ++ src/agents/pi-embedded-subscribe.test.ts | 31 ++++++++++ src/agents/pi-embedded-subscribe.ts | 13 ++++ src/auto-reply/reply.ts | 75 +++++++++++++++++++++++- src/auto-reply/types.ts | 1 + src/config/config.ts | 5 ++ src/discord/monitor.ts | 50 +++++++++++++++- src/imessage/monitor.ts | 33 ++++++++++- src/signal/monitor.ts | 34 ++++++++++- src/telegram/bot.ts | 30 +++++++++- src/web/auto-reply.ts | 45 ++++++++++++++ 14 files changed, 323 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3870dd581..ed9e17b43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,9 +50,11 @@ - Tests: add a Docker-based onboarding E2E harness. - Tests: harden wizard E2E flows for reset, providers, skills, and remote non-interactive runs. - Browser tools: add remote CDP URL support, Linux launcher options (`executablePath`, `noSandbox`), and surface `cdpUrl` in status. +- Skills: add tmux-first coding-agent skill + `requires.anyBins` gate for multi-CLI setup (thanks @sreekaransrinath). ### Fixes - Chat UI: keep the chat scrolled to the latest message after switching sessions. +- Auto-reply: stream completed reply blocks as soon as they finish (configurable default); skip empty tool-only blocks unless verbose. - CLI onboarding: persist gateway token in config so local CLI auth works; recommend auth Off unless you need multi-machine access. - Control UI: accept a `?token=` URL param to auto-fill Gateway auth; onboarding now opens the dashboard with token auth when configured. - Agent prompt: remove hardcoded user name in system prompt example. diff --git a/docs/agent.md b/docs/agent.md index 01cb830c2..be6d1d7d6 100644 --- a/docs/agent.md +++ b/docs/agent.md @@ -71,6 +71,8 @@ Legacy Pi/Tau session folders are **not** read. ## Steering while streaming Incoming user messages are queued while the agent is streaming. The queue is checked **after each tool call**. If a queued message is present, remaining tool calls from the current assistant message are skipped (error tool results with "Skipped due to queued user message."), then the queued user message is injected before the next assistant response. +Block streaming sends completed assistant blocks as soon as they finish; disable +via `agent.blockStreamingDefault: "off"` if you only want the final response. ## Configuration (minimal) diff --git a/docs/configuration.md b/docs/configuration.md index 9e08617eb..04bcc1e3e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -330,6 +330,7 @@ Controls the embedded agent runtime (model/thinking/verbose/timeouts). }, thinkingDefault: "low", verboseDefault: "off", + blockStreamingDefault: "on", timeoutSeconds: 600, mediaMaxMb: 5, heartbeat: { @@ -354,6 +355,10 @@ deprecation fallback. Z.AI models are available as `zai/` (e.g. `zai/glm-4.7`) and require `ZAI_API_KEY` (or legacy `Z_AI_API_KEY`) in the environment. +`agent.blockStreamingDefault` controls whether completed assistant blocks +(`message_end` chunks) are sent immediately (default: `on`). Set to `off` to +only deliver the final consolidated reply. + `agent.heartbeat` configures periodic heartbeat runs: - `every`: duration string (`ms`, `s`, `m`, `h`); default unit minutes. Omit or set `0m` to disable. diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index d7774cfbe..bef307dbf 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -327,6 +327,10 @@ export async function runEmbeddedPiAgent(params: { text?: string; mediaUrls?: string[]; }) => void | Promise; + onBlockReply?: (payload: { + text?: string; + mediaUrls?: string[]; + }) => void | Promise; onToolResult?: (payload: { text?: string; mediaUrls?: string[]; @@ -489,6 +493,7 @@ export async function runEmbeddedPiAgent(params: { verboseLevel: params.verboseLevel, shouldEmitToolResult: params.shouldEmitToolResult, onToolResult: params.onToolResult, + onBlockReply: params.onBlockReply, onPartialReply: params.onPartialReply, onAgentEvent: params.onAgentEvent, enforceFinalTag: params.enforceFinalTag, diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index 30ccd5adb..46783f39a 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -97,6 +97,37 @@ describe("subscribeEmbeddedPiSession", () => { expect(payload.text).toBe("Hello world"); }); + it("emits block replies on message_end", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run", + onBlockReply, + }); + + const assistantMessage = { + role: "assistant", + content: [{ type: "text", text: "Hello block" }], + } as AssistantMessage; + + handler?.({ type: "message_end", message: assistantMessage }); + + expect(onBlockReply).toHaveBeenCalled(); + const payload = onBlockReply.mock.calls[0][0]; + expect(payload.text).toBe("Hello block"); + }); + it("waits for auto-compaction retry and clears buffered text", async () => { const listeners: SessionEventHandler[] = []; const session = { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 421e49454..aa9302131 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -57,6 +57,10 @@ export function subscribeEmbeddedPiSession(params: { text?: string; mediaUrls?: string[]; }) => void | Promise; + onBlockReply?: (payload: { + text?: string; + mediaUrls?: string[]; + }) => void | Promise; onPartialReply?: (payload: { text?: string; mediaUrls?: string[]; @@ -314,6 +318,15 @@ export function subscribeEmbeddedPiSession(params: { ? (extractFinalText(cleaned)?.trim() ?? cleaned) : cleaned; if (text) assistantTexts.push(text); + if (text && params.onBlockReply) { + const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); + if (cleanedText || (mediaUrls && mediaUrls.length > 0)) { + void params.onBlockReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }); + } + } deltaBuffer = ""; } } diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index f6d8c5703..87f5d6c69 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -599,6 +599,24 @@ export async function getReplyFromConfig( inlineVerbose ?? (sessionEntry?.verboseLevel as VerboseLevel | undefined) ?? (agentCfg?.verboseDefault as VerboseLevel | undefined); + const resolvedBlockStreaming = + agentCfg?.blockStreamingDefault === "off" ? "off" : "on"; + const blockStreamingEnabled = resolvedBlockStreaming === "on"; + const streamedPayloadKeys = new Set(); + const pendingBlockTasks = new Set>(); + const buildPayloadKey = (payload: ReplyPayload) => { + const text = payload.text?.trim() ?? ""; + const mediaList = payload.mediaUrls?.length + ? payload.mediaUrls + : payload.mediaUrl + ? [payload.mediaUrl] + : []; + return JSON.stringify({ + text, + mediaList, + replyToId: payload.replyToId ?? null, + }); + }; const shouldEmitToolResult = () => { if (!sessionKey || !storePath) { return resolvedVerboseLevel === "on"; @@ -1371,6 +1389,48 @@ export async function getReplyFromConfig( }); } : undefined, + onBlockReply: + blockStreamingEnabled && opts?.onBlockReply + ? async (payload) => { + let text = payload.text; + if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) { + const stripped = stripHeartbeatToken(text, { + mode: "message", + }); + if (stripped.didStrip && !didLogHeartbeatStrip) { + didLogHeartbeatStrip = true; + logVerbose("Stripped stray HEARTBEAT_OK token from reply"); + } + const hasMedia = (payload.mediaUrls?.length ?? 0) > 0; + if (stripped.shouldSkip && !hasMedia) return; + text = stripped.text; + } + const tagResult = extractReplyToTag( + text, + sessionCtx.MessageSid, + ); + const cleaned = tagResult.cleaned || undefined; + const hasMedia = (payload.mediaUrls?.length ?? 0) > 0; + if (!cleaned && !hasMedia) return; + if (cleaned?.trim() === SILENT_REPLY_TOKEN && !hasMedia) return; + await startTypingOnText(cleaned); + const blockPayload: ReplyPayload = { + text: cleaned, + mediaUrls: payload.mediaUrls, + mediaUrl: payload.mediaUrls?.[0], + replyToId: tagResult.replyToId, + }; + const task = Promise.resolve(opts.onBlockReply?.(blockPayload)) + .then(() => { + streamedPayloadKeys.add(buildPayloadKey(blockPayload)); + }) + .catch((err) => { + logVerbose(`block reply delivery failed: ${String(err)}`); + }); + pendingBlockTasks.add(task); + void task.finally(() => pendingBlockTasks.delete(task)); + } + : undefined, shouldEmitToolResult, onToolResult: opts?.onToolResult ? async (payload) => { @@ -1421,6 +1481,9 @@ export async function getReplyFromConfig( const payloadArray = runResult.payloads ?? []; if (payloadArray.length === 0) return undefined; + if (pendingBlockTasks.size > 0) { + await Promise.allSettled(pendingBlockTasks); + } const sanitizedPayloads = opts?.isHeartbeat ? payloadArray @@ -1457,9 +1520,15 @@ export async function getReplyFromConfig( (payload.mediaUrls && payload.mediaUrls.length > 0), ); - if (replyTaggedPayloads.length === 0) return undefined; + const filteredPayloads = blockStreamingEnabled + ? replyTaggedPayloads.filter( + (payload) => !streamedPayloadKeys.has(buildPayloadKey(payload)), + ) + : replyTaggedPayloads; - const shouldSignalTyping = replyTaggedPayloads.some((payload) => { + if (filteredPayloads.length === 0) return undefined; + + const shouldSignalTyping = filteredPayloads.some((payload) => { const trimmed = payload.text?.trim(); if (trimmed && trimmed !== SILENT_REPLY_TOKEN) return true; if (payload.mediaUrl) return true; @@ -1514,7 +1583,7 @@ export async function getReplyFromConfig( } // If verbose is enabled and this is a new session, prepend a session hint. - let finalPayloads = replyTaggedPayloads; + let finalPayloads = filteredPayloads; if (resolvedVerboseLevel === "on" && isNewSession) { finalPayloads = [ { text: `🧭 New session: ${sessionIdFinal}` }, diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 0bfe335fd..3ab927358 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -2,6 +2,7 @@ export type GetReplyOptions = { onReplyStart?: () => Promise | void; isHeartbeat?: boolean; onPartialReply?: (payload: ReplyPayload) => Promise | void; + onBlockReply?: (payload: ReplyPayload) => Promise | void; onToolResult?: (payload: ReplyPayload) => Promise | void; }; diff --git a/src/config/config.ts b/src/config/config.ts index ab5189748..6bf2c4526 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -524,6 +524,8 @@ export type ClawdisConfig = { thinkingDefault?: "off" | "minimal" | "low" | "medium" | "high"; /** Default verbose level when no /verbose directive is present. */ verboseDefault?: "off" | "on"; + /** Default block streaming level when no override is present. */ + blockStreamingDefault?: "off" | "on"; timeoutSeconds?: number; /** Max inbound media size in MB for agent-visible attachments (text note or future image attach). */ mediaMaxMb?: number; @@ -901,6 +903,9 @@ const ClawdisSchema = z.object({ ]) .optional(), verboseDefault: z.union([z.literal("off"), z.literal("on")]).optional(), + blockStreamingDefault: z + .union([z.literal("off"), z.literal("on")]) + .optional(), timeoutSeconds: z.number().int().positive().optional(), mediaMaxMb: z.number().positive().optional(), typingIntervalSeconds: z.number().int().positive().optional(), diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 3e37e9889..1fb042d22 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -415,10 +415,39 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { ); } + let didSendReply = false; + let blockSendChain: Promise = Promise.resolve(); + const sendBlockReply = (payload: ReplyPayload) => { + if ( + !payload?.text && + !payload?.mediaUrl && + !(payload?.mediaUrls?.length ?? 0) + ) { + return; + } + blockSendChain = blockSendChain + .then(async () => { + await deliverReplies({ + replies: [payload], + target: ctxPayload.To, + token, + runtime, + replyToMode, + }); + didSendReply = true; + }) + .catch((err) => { + runtime.error?.( + danger(`discord block reply failed: ${String(err)}`), + ); + }); + }; + const replyResult = await getReplyFromConfig( ctxPayload, { onReplyStart: () => sendTyping(message), + onBlockReply: sendBlockReply, }, cfg, ); @@ -427,7 +456,18 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { ? replyResult : [replyResult] : []; - if (replies.length === 0) return; + await blockSendChain; + if (replies.length === 0) { + if ( + isGuildMessage && + shouldClearHistory && + historyLimit > 0 && + didSendReply + ) { + guildHistories.set(message.channelId, []); + } + return; + } await deliverReplies({ replies, @@ -436,12 +476,18 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { runtime, replyToMode, }); + didSendReply = true; if (isVerbose()) { logVerbose( `discord: delivered ${replies.length} reply${replies.length === 1 ? "" : "ies"} to ${ctxPayload.To}`, ); } - if (isGuildMessage && shouldClearHistory && historyLimit > 0) { + if ( + isGuildMessage && + shouldClearHistory && + historyLimit > 0 && + didSendReply + ) { guildHistories.set(message.channelId, []); } } catch (err) { diff --git a/src/imessage/monitor.ts b/src/imessage/monitor.ts index cc02800b4..96e4c8315 100644 --- a/src/imessage/monitor.ts +++ b/src/imessage/monitor.ts @@ -257,12 +257,43 @@ export async function monitorIMessageProvider( ); } - const replyResult = await getReplyFromConfig(ctxPayload, undefined, cfg); + let blockSendChain: Promise = Promise.resolve(); + const sendBlockReply = (payload: ReplyPayload) => { + if ( + !payload?.text && + !payload?.mediaUrl && + !(payload?.mediaUrls?.length ?? 0) + ) { + return; + } + blockSendChain = blockSendChain + .then(async () => { + await deliverReplies({ + replies: [payload], + target: ctxPayload.To, + client, + runtime, + maxBytes: mediaMaxBytes, + }); + }) + .catch((err) => { + runtime.error?.( + danger(`imessage block reply failed: ${String(err)}`), + ); + }); + }; + + const replyResult = await getReplyFromConfig( + ctxPayload, + { onBlockReply: sendBlockReply }, + cfg, + ); const replies = replyResult ? Array.isArray(replyResult) ? replyResult : [replyResult] : []; + await blockSendChain; if (replies.length === 0) return; await deliverReplies({ diff --git a/src/signal/monitor.ts b/src/signal/monitor.ts index 2c1e52902..1f6d443ae 100644 --- a/src/signal/monitor.ts +++ b/src/signal/monitor.ts @@ -373,12 +373,44 @@ export async function monitorSignalProvider( ); } - const replyResult = await getReplyFromConfig(ctxPayload, undefined, cfg); + let blockSendChain: Promise = Promise.resolve(); + const sendBlockReply = (payload: ReplyPayload) => { + if ( + !payload?.text && + !payload?.mediaUrl && + !(payload?.mediaUrls?.length ?? 0) + ) { + return; + } + blockSendChain = blockSendChain + .then(async () => { + await deliverReplies({ + replies: [payload], + target: ctxPayload.To, + baseUrl, + account, + runtime, + maxBytes: mediaMaxBytes, + }); + }) + .catch((err) => { + runtime.error?.( + danger(`signal block reply failed: ${String(err)}`), + ); + }); + }; + + const replyResult = await getReplyFromConfig( + ctxPayload, + { onBlockReply: sendBlockReply }, + cfg, + ); const replies = replyResult ? Array.isArray(replyResult) ? replyResult : [replyResult] : []; + await blockSendChain; if (replies.length === 0) return; await deliverReplies({ diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index e842ef172..7d88334f9 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -198,9 +198,36 @@ export function createTelegramBot(opts: TelegramBotOptions) { ); } + let blockSendChain: Promise = Promise.resolve(); + const sendBlockReply = (payload: ReplyPayload) => { + if ( + !payload?.text && + !payload?.mediaUrl && + !(payload?.mediaUrls?.length ?? 0) + ) { + return; + } + blockSendChain = blockSendChain + .then(async () => { + await deliverReplies({ + replies: [payload], + chatId: String(chatId), + token: opts.token, + runtime, + bot, + replyToMode, + }); + }) + .catch((err) => { + runtime.error?.( + danger(`telegram block reply failed: ${String(err)}`), + ); + }); + }; + const replyResult = await getReplyFromConfig( ctxPayload, - { onReplyStart: sendTyping }, + { onReplyStart: sendTyping, onBlockReply: sendBlockReply }, cfg, ); const replies = replyResult @@ -208,6 +235,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { ? replyResult : [replyResult] : []; + await blockSendChain; if (replies.length === 0) return; await deliverReplies({ diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 236154621..67be507a6 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -1110,6 +1110,50 @@ export async function monitorWebProvider( ); }); }; + const sendBlockReply = (payload: ReplyPayload) => { + if ( + !payload?.text && + !payload?.mediaUrl && + !(payload?.mediaUrls?.length ?? 0) + ) { + return; + } + if (isSilentReply(payload)) return; + const blockPayload: ReplyPayload = { ...payload }; + if ( + responsePrefix && + blockPayload.text && + blockPayload.text.trim() !== HEARTBEAT_TOKEN && + !blockPayload.text.startsWith(responsePrefix) + ) { + blockPayload.text = `${responsePrefix} ${blockPayload.text}`; + } + toolSendChain = toolSendChain + .then(async () => { + await deliverWebReply({ + replyResult: blockPayload, + msg, + maxMediaBytes, + replyLogger, + connectionId, + skipLog: true, + }); + didSendReply = true; + if (blockPayload.text) { + recentlySent.add(blockPayload.text); + recentlySent.add(combinedBody); + if (recentlySent.size > MAX_RECENT_MESSAGES) { + const firstKey = recentlySent.values().next().value; + if (firstKey) recentlySent.delete(firstKey); + } + } + }) + .catch((err) => { + whatsappOutboundLog.error( + `Failed sending web block update to ${msg.from ?? conversationId}: ${formatError(err)}`, + ); + }); + }; const replyResult = await (replyResolver ?? getReplyFromConfig)( { @@ -1138,6 +1182,7 @@ export async function monitorWebProvider( { onReplyStart: msg.sendComposing, onToolResult: sendToolResult, + onBlockReply: sendBlockReply, }, );