From e5dbe1db9dbc0b8228e2b2cb0f23fb5a9053c5f9 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 7 Jan 2026 07:47:18 +0000 Subject: [PATCH] fix: ensure output for non-streaming models (#369) Co-authored-by: mneves75 --- CHANGELOG.md | 1 + src/agents/pi-embedded-subscribe.test.ts | 34 ++++++++++++++++++++++++ src/agents/pi-embedded-subscribe.ts | 6 +++-- 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 937a477ae..b9dc34b88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ - Gateway/CLI: add daemon runtime selection (Node recommended; Bun optional) and document WhatsApp/Baileys Bun WebSocket instability on reconnect. - CLI: add `clawdbot docs` live docs search with pretty output. - Agent: treat compaction retry AbortError as a fallback trigger without swallowing non-abort errors. Thanks @erikpr1994 for PR #341. +- Agent: deliver final replies for non-streaming models when block chunking is enabled. Thanks @mneves75 for PR #369. - Sub-agents: allow `sessions_spawn` model overrides and error on invalid models. Thanks @azade-c for PR #298. - Sub-agents: skip invalid model overrides with a warning and keep the run alive; tool exceptions now return tool errors instead of crashing the agent. - Heartbeat: default interval 30m; clarified default prompt usage and HEARTBEAT.md template behavior. diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index b50e02d96..9c862d759 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -400,6 +400,40 @@ describe("subscribeEmbeddedPiSession", () => { expect(subscription.assistantTexts).toEqual(["Hello world"]); }); + it("populates assistantTexts for non-streaming models with chunking enabled", () => { + // Non-streaming models (e.g. zai/glm-4.7): no text_delta events; message_end + // must still populate assistantTexts so providers can deliver a final reply. + let handler: SessionEventHandler | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const subscription = subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run", + blockReplyChunking: { minChars: 50, maxChars: 200 }, // Chunking enabled + }); + + // Simulate non-streaming model: only message_start and message_end, no text_delta + handler?.({ type: "message_start", message: { role: "assistant" } }); + + const assistantMessage = { + role: "assistant", + content: [{ type: "text", text: "Response from non-streaming model" }], + } as AssistantMessage; + + handler?.({ type: "message_end", message: assistantMessage }); + + expect(subscription.assistantTexts).toEqual([ + "Response from non-streaming model", + ]); + }); + it("does not append when text_end content is a prefix of deltas", () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 3aa9a6d5a..d5535d2d5 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -516,8 +516,10 @@ export function subscribeEmbeddedPiSession(params: { const addedDuringMessage = assistantTexts.length > assistantTextBaseline; - const chunkingEnabled = Boolean(blockChunking); - if (!chunkingEnabled && !addedDuringMessage && text) { + const chunkerHasBuffered = blockChunker?.hasBuffered() ?? false; + // Non-streaming models (no text_delta): ensure assistantTexts gets the + // final text when the chunker has nothing buffered to drain. + if (!addedDuringMessage && !chunkerHasBuffered && text) { const last = assistantTexts.at(-1); if (!last || last !== text) assistantTexts.push(text); }