From 2d28fa34f5958012d55014c4a0563b66df05a113 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 3 Jan 2026 00:52:02 +0100 Subject: [PATCH] feat: make block streaming break configurable --- CHANGELOG.md | 2 +- docs/agent.md | 1 + docs/configuration.md | 11 ++- src/agents/pi-embedded-runner.ts | 2 + src/agents/pi-embedded-subscribe.test.ts | 54 +++++++++++++++ src/agents/pi-embedded-subscribe.ts | 85 ++++++++++++++++-------- src/auto-reply/reply.ts | 5 ++ src/config/config.ts | 9 +++ 8 files changed, 138 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d4adc782..e88e5f1f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,7 +54,7 @@ ### Fixes - Chat UI: keep the chat scrolled to the latest message after switching sessions. -- Auto-reply: stream completed reply blocks as soon as they finish (configurable default); skip empty tool-only blocks unless verbose. +- Auto-reply: stream completed reply blocks as soon as they finish (configurable default + break); skip empty tool-only blocks unless verbose. - CLI onboarding: persist gateway token in config so local CLI auth works; recommend auth Off unless you need multi-machine access. - Control UI: accept a `?token=` URL param to auto-fill Gateway auth; onboarding now opens the dashboard with token auth when configured. - Agent prompt: remove hardcoded user name in system prompt example. diff --git a/docs/agent.md b/docs/agent.md index be6d1d7d6..1c4525fb9 100644 --- a/docs/agent.md +++ b/docs/agent.md @@ -73,6 +73,7 @@ Legacy Pi/Tau session folders are **not** read. Incoming user messages are queued while the agent is streaming. The queue is checked **after each tool call**. If a queued message is present, remaining tool calls from the current assistant message are skipped (error tool results with "Skipped due to queued user message."), then the queued user message is injected before the next assistant response. Block streaming sends completed assistant blocks as soon as they finish; disable via `agent.blockStreamingDefault: "off"` if you only want the final response. +Tune the boundary via `agent.blockStreamingBreak` (`text_end` vs `message_end`). ## Configuration (minimal) diff --git a/docs/configuration.md b/docs/configuration.md index 04bcc1e3e..2c1506838 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -331,6 +331,7 @@ Controls the embedded agent runtime (model/thinking/verbose/timeouts). thinkingDefault: "low", verboseDefault: "off", blockStreamingDefault: "on", + blockStreamingBreak: "text_end", timeoutSeconds: 600, mediaMaxMb: 5, heartbeat: { @@ -355,9 +356,13 @@ deprecation fallback. Z.AI models are available as `zai/` (e.g. `zai/glm-4.7`) and require `ZAI_API_KEY` (or legacy `Z_AI_API_KEY`) in the environment. -`agent.blockStreamingDefault` controls whether completed assistant blocks -(`message_end` chunks) are sent immediately (default: `on`). Set to `off` to -only deliver the final consolidated reply. +`agent.blockStreamingDefault` controls whether completed assistant blocks are +sent immediately (default: `on`). Set to `off` to only deliver the final +consolidated reply. + +`agent.blockStreamingBreak` controls what “block” means: +- `text_end` (default): end of each assistant text content block (before tool calls) +- `message_end`: end of the whole assistant message (may wait across tools) `agent.heartbeat` configures periodic heartbeat runs: - `every`: duration string (`ms`, `s`, `m`, `h`); default unit minutes. Omit or set diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index bef307dbf..9915720ff 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -331,6 +331,7 @@ export async function runEmbeddedPiAgent(params: { text?: string; mediaUrls?: string[]; }) => void | Promise; + blockReplyBreak?: "text_end" | "message_end"; onToolResult?: (payload: { text?: string; mediaUrls?: string[]; @@ -494,6 +495,7 @@ export async function runEmbeddedPiAgent(params: { shouldEmitToolResult: params.shouldEmitToolResult, onToolResult: params.onToolResult, onBlockReply: params.onBlockReply, + blockReplyBreak: params.blockReplyBreak, onPartialReply: params.onPartialReply, onAgentEvent: params.onAgentEvent, enforceFinalTag: params.enforceFinalTag, diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index 46783f39a..4b16f5fcb 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -114,6 +114,7 @@ describe("subscribeEmbeddedPiSession", () => { >[0]["session"], runId: "run", onBlockReply, + blockReplyBreak: "message_end", }); const assistantMessage = { @@ -128,6 +129,59 @@ describe("subscribeEmbeddedPiSession", () => { expect(payload.text).toBe("Hello block"); }); + it("emits block replies on text_end and does not duplicate on message_end", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReply = vi.fn(); + + const subscription = subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run", + onBlockReply, + blockReplyBreak: "text_end", + }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: "Hello block", + }, + }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_end", + }, + }); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + const payload = onBlockReply.mock.calls[0][0]; + expect(payload.text).toBe("Hello block"); + expect(subscription.assistantTexts).toEqual(["Hello block"]); + + const assistantMessage = { + role: "assistant", + content: [{ type: "text", text: "Hello block" }], + } as AssistantMessage; + + handler?.({ type: "message_end", message: assistantMessage }); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(subscription.assistantTexts).toEqual(["Hello block"]); + }); + it("waits for auto-compaction retry and clears buffered text", async () => { const listeners: SessionEventHandler[] = []; const session = { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index aa9302131..646dc96c2 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -61,6 +61,7 @@ export function subscribeEmbeddedPiSession(params: { text?: string; mediaUrls?: string[]; }) => void | Promise; + blockReplyBreak?: "text_end" | "message_end"; onPartialReply?: (payload: { text?: string; mediaUrls?: string[]; @@ -74,8 +75,10 @@ export function subscribeEmbeddedPiSession(params: { const assistantTexts: string[] = []; const toolMetas: Array<{ toolName?: string; meta?: string }> = []; const toolMetaById = new Map(); + const blockReplyBreak = params.blockReplyBreak ?? "text_end"; let deltaBuffer = ""; let lastStreamedAssistant: string | undefined; + let assistantTextBaseline = 0; let compactionInFlight = false; let pendingCompactionRetry = 0; let compactionRetryResolve: (() => void) | undefined; @@ -149,6 +152,7 @@ export function subscribeEmbeddedPiSession(params: { toolMetaById.clear(); deltaBuffer = ""; lastStreamedAssistant = undefined; + assistantTextBaseline = 0; toolDebouncer.flush(); }; @@ -264,38 +268,55 @@ export function subscribeEmbeddedPiSession(params: { : ""; if (chunk) { deltaBuffer += chunk; - const cleaned = params.enforceFinalTag - ? stripThinkingSegments(stripUnpairedThinkingTags(deltaBuffer)) - : stripThinkingSegments(deltaBuffer); - const next = params.enforceFinalTag - ? (extractFinalText(cleaned)?.trim() ?? cleaned.trim()) - : cleaned.trim(); - if (next && next !== lastStreamedAssistant) { - lastStreamedAssistant = next; + } + + const cleaned = params.enforceFinalTag + ? stripThinkingSegments(stripUnpairedThinkingTags(deltaBuffer)) + : stripThinkingSegments(deltaBuffer); + const next = params.enforceFinalTag + ? (extractFinalText(cleaned)?.trim() ?? cleaned.trim()) + : cleaned.trim(); + if (next && next !== lastStreamedAssistant) { + lastStreamedAssistant = next; + const { text: cleanedText, mediaUrls } = + splitMediaFromOutput(next); + emitAgentEvent({ + runId: params.runId, + stream: "assistant", + data: { + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }, + }); + params.onAgentEvent?.({ + stream: "assistant", + data: { + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }, + }); + if (params.onPartialReply) { + void params.onPartialReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }); + } + } + + if (evtType === "text_end" && blockReplyBreak === "text_end") { + if (next) assistantTexts.push(next); + if (next && params.onBlockReply) { const { text: cleanedText, mediaUrls } = splitMediaFromOutput(next); - emitAgentEvent({ - runId: params.runId, - stream: "assistant", - data: { - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }, - }); - params.onAgentEvent?.({ - stream: "assistant", - data: { - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }, - }); - if (params.onPartialReply) { - void params.onPartialReply({ + if (cleanedText || (mediaUrls && mediaUrls.length > 0)) { + void params.onBlockReply({ text: cleanedText, mediaUrls: mediaUrls?.length ? mediaUrls : undefined, }); } } + deltaBuffer = ""; + lastStreamedAssistant = undefined; } } } @@ -317,8 +338,17 @@ export function subscribeEmbeddedPiSession(params: { params.enforceFinalTag && cleaned ? (extractFinalText(cleaned)?.trim() ?? cleaned) : cleaned; - if (text) assistantTexts.push(text); - if (text && params.onBlockReply) { + + const addedDuringMessage = + assistantTexts.length > assistantTextBaseline; + if (!addedDuringMessage && text) assistantTexts.push(text); + assistantTextBaseline = assistantTexts.length; + + if ( + blockReplyBreak === "message_end" && + text && + params.onBlockReply + ) { const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); if (cleanedText || (mediaUrls && mediaUrls.length > 0)) { void params.onBlockReply({ @@ -328,6 +358,7 @@ export function subscribeEmbeddedPiSession(params: { } } deltaBuffer = ""; + lastStreamedAssistant = undefined; } } diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 20dd24d9a..6fa60d455 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -604,6 +604,10 @@ export async function getReplyFromConfig( (agentCfg?.verboseDefault as VerboseLevel | undefined); const resolvedBlockStreaming = agentCfg?.blockStreamingDefault === "off" ? "off" : "on"; + const resolvedBlockStreamingBreak = + agentCfg?.blockStreamingBreak === "message_end" + ? "message_end" + : "text_end"; const blockStreamingEnabled = resolvedBlockStreaming === "on"; const streamedPayloadKeys = new Set(); const pendingBlockTasks = new Set>(); @@ -1368,6 +1372,7 @@ export async function getReplyFromConfig( verboseLevel: resolvedVerboseLevel, timeoutMs, runId, + blockReplyBreak: resolvedBlockStreamingBreak, onPartialReply: opts?.onPartialReply ? async (payload) => { let text = payload.text; diff --git a/src/config/config.ts b/src/config/config.ts index 6bf2c4526..1a1dc31b1 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -526,6 +526,12 @@ export type ClawdisConfig = { verboseDefault?: "off" | "on"; /** Default block streaming level when no override is present. */ blockStreamingDefault?: "off" | "on"; + /** + * Block streaming boundary: + * - "text_end": end of each assistant text content block (before tool calls) + * - "message_end": end of the whole assistant message (may include tool blocks) + */ + blockStreamingBreak?: "text_end" | "message_end"; timeoutSeconds?: number; /** Max inbound media size in MB for agent-visible attachments (text note or future image attach). */ mediaMaxMb?: number; @@ -906,6 +912,9 @@ const ClawdisSchema = z.object({ blockStreamingDefault: z .union([z.literal("off"), z.literal("on")]) .optional(), + blockStreamingBreak: z + .union([z.literal("text_end"), z.literal("message_end")]) + .optional(), timeoutSeconds: z.number().int().positive().optional(), mediaMaxMb: z.number().positive().optional(), typingIntervalSeconds: z.number().int().positive().optional(),