From 9f8eeceae7c1c410c7e5a0f495fcc8fc6268bdbf Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Sat, 3 Jan 2026 16:45:53 +0100
Subject: [PATCH] feat: soften block streaming chunking

---
 CHANGELOG.md                             |   1 +
 docs/agent.md                            |   2 +
 docs/configuration.md                    |  14 ++
 src/agents/pi-embedded-runner.ts         |   7 +-
 src/agents/pi-embedded-subscribe.test.ts |  52 +++++++
 src/agents/pi-embedded-subscribe.ts      | 170 +++++++++++++++++++----
 src/auto-reply/reply.ts                  |  53 +++++++
 7 files changed, 270 insertions(+), 29 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9efe26ac9..184c9512f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@
 ### Fixes
 - Telegram: chunk block-stream replies to avoid “message is too long” errors (#124) — thanks @mukhtharcm.
 - Gmail hooks: resolve gcloud Python to a real executable when PATH uses mise shims — thanks @joargp.
+- Agent: add soft block-stream chunking (800–1200 chars default) with paragraph/newline preference.
 - Agent tools: scope the Discord tool to Discord surface runs.
 - Agent tools: format verbose tool summaries without brackets, with unique emojis and `tool: detail` style.
 - macOS Connections: move to sidebar + detail layout with structured sections and header actions.
diff --git a/docs/agent.md b/docs/agent.md
index 2dac0b3f3..f464fc32e 100644
--- a/docs/agent.md
+++ b/docs/agent.md
@@ -83,6 +83,8 @@ current turn ends, then a new agent turn starts with the queued payloads. See
 
 Block streaming sends completed assistant blocks as soon as they finish; disable
 via `agent.blockStreamingDefault: "off"` if you only want the final response.
 Tune the boundary via `agent.blockStreamingBreak` (`text_end` vs `message_end`).
+Control soft block chunking with `agent.blockStreamingChunk` (defaults to
+800–1200 chars; prefers paragraph breaks, then newlines; sentences last).
 
 ## Configuration (minimal)
diff --git a/docs/configuration.md b/docs/configuration.md
index 0fae80f2d..427fe6dc0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -391,6 +391,20 @@ Controls the embedded agent runtime (model/thinking/verbose/timeouts).
 }
 ```
 
+Block streaming:
+- `agent.blockStreamingDefault`: `"on"`/`"off"` (default on).
+- `agent.blockStreamingBreak`: `"text_end"` or `"message_end"`.
+- `agent.blockStreamingChunk`: soft chunking for streamed blocks. Defaults to
+  800–1200 chars, prefers paragraph breaks (`\n\n`), then newlines, then sentences.
+  Example:
+  ```json5
+  {
+    agent: {
+      blockStreamingChunk: { minChars: 800, maxChars: 1200 }
+    }
+  }
+  ```
+
 `agent.model` should be set as `provider/model` (e.g. `anthropic/claude-opus-4-5`).
 If `modelAliases` is configured, you may also use the alias key (e.g. `Opus`).
 If you omit the provider, CLAWDIS currently assumes `anthropic` as a temporary
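Review note (not part of the patch): for readers wiring the three settings together, a combined config could look like the sketch below. Key names follow the docs hunks above; the values, and the `breakPreference` field (see the `BlockReplyChunking` type later in this patch), are illustrative.

```json5
{
  agent: {
    blockStreamingDefault: "on",     // block streaming enabled (the default)
    blockStreamingBreak: "text_end", // emit at text_end rather than message_end
    blockStreamingChunk: {
      minChars: 800,
      maxChars: 1200,
      breakPreference: "paragraph",  // or "newline" / "sentence"
    },
  },
}
```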
diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts
index 9d1c2879c..2deec7a14 100644
--- a/src/agents/pi-embedded-runner.ts
+++ b/src/agents/pi-embedded-runner.ts
@@ -42,7 +42,10 @@ import {
   formatAssistantErrorText,
   sanitizeSessionMessagesImages,
 } from "./pi-embedded-helpers.js";
-import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
+import {
+  subscribeEmbeddedPiSession,
+  type BlockReplyChunking,
+} from "./pi-embedded-subscribe.js";
 import { extractAssistantText } from "./pi-embedded-utils.js";
 import { createClawdisCodingTools } from "./pi-tools.js";
 import {
@@ -334,6 +337,7 @@ export async function runEmbeddedPiAgent(params: {
     mediaUrls?: string[];
   }) => void | Promise<void>;
   blockReplyBreak?: "text_end" | "message_end";
+  blockReplyChunking?: BlockReplyChunking;
   onToolResult?: (payload: {
     text?: string;
     mediaUrls?: string[];
@@ -503,6 +507,7 @@ export async function runEmbeddedPiAgent(params: {
       onToolResult: params.onToolResult,
       onBlockReply: params.onBlockReply,
       blockReplyBreak: params.blockReplyBreak,
+      blockReplyChunking: params.blockReplyChunking,
       onPartialReply: params.onPartialReply,
       onAgentEvent: params.onAgentEvent,
       enforceFinalTag: params.enforceFinalTag,
diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts
index 97ab7a62c..6606543eb 100644
--- a/src/agents/pi-embedded-subscribe.test.ts
+++ b/src/agents/pi-embedded-subscribe.test.ts
@@ -231,6 +231,58 @@ describe("subscribeEmbeddedPiSession", () => {
     expect(subscription.assistantTexts).toEqual(["Hello block"]);
   });
 
+  it("streams soft chunks with paragraph preference", () => {
+    let handler: ((evt: unknown) => void) | undefined;
+    const session: StubSession = {
+      subscribe: (fn) => {
+        handler = fn;
+        return () => {};
+      },
+    };
+
+    const onBlockReply = vi.fn();
+
+    const subscription = subscribeEmbeddedPiSession({
+      session: session as unknown as Parameters<
+        typeof subscribeEmbeddedPiSession
+      >[0]["session"],
+      runId: "run",
+      onBlockReply,
+      blockReplyBreak: "message_end",
+      blockReplyChunking: {
+        minChars: 5,
+        maxChars: 40,
+        breakPreference: "paragraph",
+      },
+    });
+
+    const text = "First block line\n\nSecond block line";
+
+    handler?.({
+      type: "message_update",
+      message: { role: "assistant" },
+      assistantMessageEvent: {
+        type: "text_delta",
+        delta: text,
+      },
+    });
+
+    const assistantMessage = {
+      role: "assistant",
+      content: [{ type: "text", text }],
+    } as AssistantMessage;
+
+    handler?.({ type: "message_end", message: assistantMessage });
+
+    expect(onBlockReply).toHaveBeenCalledTimes(2);
+    expect(onBlockReply.mock.calls[0][0].text).toBe("First block line");
+    expect(onBlockReply.mock.calls[1][0].text).toBe("Second block line");
+    expect(subscription.assistantTexts).toEqual([
+      "First block line",
+      "Second block line",
+    ]);
+  });
+
   it("waits for auto-compaction retry and clears buffered text", async () => {
     const listeners: SessionEventHandler[] = [];
     const session = {
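Review note (not part of the patch): the new test pins down the paragraph path; a companion case for the whitespace fallback could look like the sketch below. It assumes the same `StubSession`/vitest harness as the test above, and the expected chunks follow from the drain loop added in `pi-embedded-subscribe.ts` (break at the last whitespace inside the `maxChars` window, then a forced flush on `message_end`).

```ts
it("falls back to whitespace breaks inside the window", () => {
  // Sketch only; mirrors the harness of the paragraph test above.
  let handler: ((evt: unknown) => void) | undefined;
  const session: StubSession = {
    subscribe: (fn) => {
      handler = fn;
      return () => {};
    },
  };

  const onBlockReply = vi.fn();

  subscribeEmbeddedPiSession({
    session: session as unknown as Parameters<
      typeof subscribeEmbeddedPiSession
    >[0]["session"],
    runId: "run",
    onBlockReply,
    blockReplyBreak: "message_end",
    blockReplyChunking: { minChars: 5, maxChars: 12 },
  });

  const text = "one two three four five"; // no newlines, no sentence punctuation

  handler?.({
    type: "message_update",
    message: { role: "assistant" },
    assistantMessageEvent: { type: "text_delta", delta: text },
  });

  handler?.({
    type: "message_end",
    message: {
      role: "assistant",
      content: [{ type: "text", text }],
    } as AssistantMessage,
  });

  // The 12-char window breaks at its last whitespace at or past minChars;
  // the short tail ("five") is flushed by the forced drain on message_end.
  expect(onBlockReply).toHaveBeenCalledTimes(3);
  expect(onBlockReply.mock.calls.map((c) => c[0].text)).toEqual([
    "one two",
    "three four",
    "five",
  ]);
});
```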
"newline" | "sentence"; +}; + function truncateToolText(text: string): string { if (text.length <= TOOL_RESULT_MAX_CHARS) return text; return `${text.slice(0, TOOL_RESULT_MAX_CHARS)}\n…(truncated)…`; @@ -93,6 +99,7 @@ export function subscribeEmbeddedPiSession(params: { mediaUrls?: string[]; }) => void | Promise; blockReplyBreak?: "text_end" | "message_end"; + blockReplyChunking?: BlockReplyChunking; onPartialReply?: (payload: { text?: string; mediaUrls?: string[]; @@ -108,6 +115,7 @@ export function subscribeEmbeddedPiSession(params: { const toolMetaById = new Map(); const blockReplyBreak = params.blockReplyBreak ?? "text_end"; let deltaBuffer = ""; + let blockBuffer = ""; let lastStreamedAssistant: string | undefined; let lastBlockReplyText: string | undefined; let assistantTextBaseline = 0; @@ -178,11 +186,111 @@ export function subscribeEmbeddedPiSession(params: { }); }); + const blockChunking = params.blockReplyChunking; + + const findSentenceBreak = (window: string, minChars: number): number => { + if (!window) return -1; + const matches = window.matchAll(/[.!?](?=\s|$)/g); + let idx = -1; + for (const match of matches) { + const at = match.index ?? -1; + if (at < minChars) continue; + idx = at + 1; + } + return idx; + }; + + const findWhitespaceBreak = (window: string, minChars: number): number => { + for (let i = window.length - 1; i >= minChars; i--) { + if (/\s/.test(window[i])) return i; + } + return -1; + }; + + const pickBreakIndex = (buffer: string): number => { + if (!blockChunking) return -1; + const minChars = Math.max(1, Math.floor(blockChunking.minChars)); + const maxChars = Math.max(minChars, Math.floor(blockChunking.maxChars)); + if (buffer.length < minChars) return -1; + const window = buffer.slice(0, Math.min(maxChars, buffer.length)); + + const preference = blockChunking.breakPreference ?? "paragraph"; + const paragraphIdx = window.lastIndexOf("\n\n"); + if (preference === "paragraph" && paragraphIdx >= minChars) { + return paragraphIdx; + } + + const newlineIdx = window.lastIndexOf("\n"); + if ( + (preference === "paragraph" || preference === "newline") && + newlineIdx >= minChars + ) { + return newlineIdx; + } + + if (preference !== "newline") { + const sentenceIdx = findSentenceBreak(window, minChars); + if (sentenceIdx >= minChars) return sentenceIdx; + } + + const whitespaceIdx = findWhitespaceBreak(window, minChars); + if (whitespaceIdx >= minChars) return whitespaceIdx; + + if (buffer.length >= maxChars) return maxChars; + return -1; + }; + + const emitBlockChunk = (text: string) => { + const chunk = text.trimEnd(); + if (!chunk) return; + if (chunk === lastBlockReplyText) return; + lastBlockReplyText = chunk; + assistantTexts.push(chunk); + if (!params.onBlockReply) return; + const { text: cleanedText, mediaUrls } = splitMediaFromOutput(chunk); + if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) return; + void params.onBlockReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? 
+    });
+  };
+
+  const drainBlockBuffer = (force: boolean) => {
+    if (!blockChunking) return;
+    const minChars = Math.max(1, Math.floor(blockChunking.minChars));
+    const maxChars = Math.max(minChars, Math.floor(blockChunking.maxChars));
+    if (blockBuffer.length < minChars && !force) return;
+    while (blockBuffer.length >= minChars || (force && blockBuffer.length > 0)) {
+      const breakIdx = pickBreakIndex(blockBuffer);
+      if (breakIdx <= 0) {
+        if (force) {
+          emitBlockChunk(blockBuffer);
+          blockBuffer = "";
+        }
+        return;
+      }
+      const rawChunk = blockBuffer.slice(0, breakIdx);
+      if (rawChunk.trim().length === 0) {
+        blockBuffer = blockBuffer.slice(breakIdx).trimStart();
+        continue;
+      }
+      emitBlockChunk(rawChunk);
+      const nextStart =
+        breakIdx < blockBuffer.length && /\s/.test(blockBuffer[breakIdx])
+          ? breakIdx + 1
+          : breakIdx;
+      blockBuffer = blockBuffer.slice(nextStart).trimStart();
+      if (blockBuffer.length < minChars && !force) return;
+      if (blockBuffer.length < maxChars && !force) return;
+    }
+  };
+
   const resetForCompactionRetry = () => {
     assistantTexts.length = 0;
     toolMetas.length = 0;
     toolMetaById.clear();
     deltaBuffer = "";
+    blockBuffer = "";
     lastStreamedAssistant = undefined;
     lastBlockReplyText = undefined;
     assistantTextBaseline = 0;
@@ -337,6 +445,7 @@ export function subscribeEmbeddedPiSession(params: {
             : "";
         if (chunk) {
           deltaBuffer += chunk;
+          blockBuffer += chunk;
         }
 
         const cleaned = params.enforceFinalTag
@@ -372,25 +481,29 @@ export function subscribeEmbeddedPiSession(params: {
           }
         }
 
+        if (params.onBlockReply && blockChunking) {
+          drainBlockBuffer(false);
+        }
+
         if (evtType === "text_end" && blockReplyBreak === "text_end") {
-          if (next && next === lastBlockReplyText) {
-            deltaBuffer = "";
-            lastStreamedAssistant = undefined;
-            return;
-          }
-          lastBlockReplyText = next || undefined;
-          if (next) assistantTexts.push(next);
-          if (next && params.onBlockReply) {
-            const { text: cleanedText, mediaUrls } =
-              splitMediaFromOutput(next);
-            if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
-              void params.onBlockReply({
-                text: cleanedText,
-                mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
-              });
+          if (blockChunking && blockBuffer.length > 0) {
+            drainBlockBuffer(true);
+          } else if (next && next !== lastBlockReplyText) {
+            lastBlockReplyText = next || undefined;
+            if (next) assistantTexts.push(next);
+            if (next && params.onBlockReply) {
+              const { text: cleanedText, mediaUrls } =
+                splitMediaFromOutput(next);
+              if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
+                void params.onBlockReply({
+                  text: cleanedText,
+                  mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
+                });
+              }
             }
           }
           deltaBuffer = "";
+          blockBuffer = "";
           lastStreamedAssistant = undefined;
         }
       }
@@ -420,25 +533,26 @@ export function subscribeEmbeddedPiSession(params: {
         assistantTextBaseline = assistantTexts.length;
 
         if (
-          blockReplyBreak === "message_end" &&
+          (blockReplyBreak === "message_end" || blockBuffer.length > 0) &&
           text &&
          params.onBlockReply
         ) {
-          if (text === lastBlockReplyText) {
-            deltaBuffer = "";
-            lastStreamedAssistant = undefined;
-            return;
-          }
-          lastBlockReplyText = text;
-          const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
-          if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
-            void params.onBlockReply({
-              text: cleanedText,
-              mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
-            });
+          if (blockChunking && blockBuffer.length > 0) {
+            drainBlockBuffer(true);
+          } else if (text !== lastBlockReplyText) {
+            lastBlockReplyText = text;
+            const { text: cleanedText, mediaUrls } =
+              splitMediaFromOutput(text);
+            if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
+              void params.onBlockReply({
+                text: cleanedText,
+                mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
+              });
+            }
           }
         }
         deltaBuffer = "";
+        blockBuffer = "";
         lastStreamedAssistant = undefined;
         lastBlockReplyText = undefined;
       }
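Review note (not part of the patch): the fallback chain in `pickBreakIndex` is the heart of this change, so here is one worked pass of it. The buffer, constants, and indices are illustrative, not taken from the tests.

```ts
// Worked example (sketch): one pass of pickBreakIndex with
// minChars = 10, maxChars = 30, breakPreference = "paragraph".
const buffer = "Intro line.\nMore detail.\n\nNext topic starts here.";

// window = buffer.slice(0, 30) = "Intro line.\nMore detail.\n\nNext"
//
// 1. Paragraph: window.lastIndexOf("\n\n") = 24, which is >= minChars, so 24
//    wins. The emitted chunk is buffer.slice(0, 24) = "Intro line.\nMore detail."
//    and the drain loop resumes at "Next topic starts here.".
// 2. Without a "\n\n", the last single "\n" in the window would be taken.
// 3. Without newlines, findSentenceBreak keeps the LAST /[.!?](?=\s|$)/ match
//    at or past minChars (here the "." at index 23, returning 24).
// 4. Without punctuation, the last whitespace in the window is used; only an
//    unbroken run of maxChars or more forces the hard cut at maxChars.
```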
diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts
index f72266670..36e4b0d5c 100644
--- a/src/auto-reply/reply.ts
+++ b/src/auto-reply/reply.ts
@@ -74,6 +74,7 @@ import {
 } from "./thinking.js";
 import { SILENT_REPLY_TOKEN } from "./tokens.js";
 import { isAudio, transcribeInboundAudio } from "./transcription.js";
+import { resolveTextChunkLimit, type TextChunkSurface } from "./chunk.js";
 import type { GetReplyOptions, ReplyPayload } from "./types.js";
 
 export type { GetReplyOptions, ReplyPayload } from "./types.js";
@@ -81,6 +82,54 @@ export type { GetReplyOptions, ReplyPayload } from "./types.js";
 const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit"]);
 const ABORT_MEMORY = new Map();
 const SYSTEM_MARK = "⚙️";
+const DEFAULT_BLOCK_STREAM_MIN = 800;
+const DEFAULT_BLOCK_STREAM_MAX = 1200;
+
+const BLOCK_CHUNK_SURFACES = new Set([
+  "whatsapp",
+  "telegram",
+  "discord",
+  "signal",
+  "imessage",
+  "webchat",
+]);
+
+function normalizeChunkSurface(surface?: string): TextChunkSurface | undefined {
+  if (!surface) return undefined;
+  const cleaned = surface.trim().toLowerCase();
+  return BLOCK_CHUNK_SURFACES.has(cleaned as TextChunkSurface)
+    ? (cleaned as TextChunkSurface)
+    : undefined;
+}
+
+function resolveBlockStreamingChunking(
+  cfg: ClawdisConfig | undefined,
+  surface?: string,
+): {
+  minChars: number;
+  maxChars: number;
+  breakPreference: "paragraph" | "newline" | "sentence";
+} {
+  const surfaceKey = normalizeChunkSurface(surface);
+  const textLimit = resolveTextChunkLimit(cfg, surfaceKey);
+  const chunkCfg = cfg?.agent?.blockStreamingChunk;
+  const maxRequested = Math.max(
+    1,
+    Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX),
+  );
+  const maxChars = Math.max(1, Math.min(maxRequested, textLimit));
+  const minRequested = Math.max(
+    1,
+    Math.floor(chunkCfg?.minChars ?? DEFAULT_BLOCK_STREAM_MIN),
+  );
+  const minChars = Math.min(minRequested, maxChars);
+  const breakPreference =
+    chunkCfg?.breakPreference === "newline" ||
+    chunkCfg?.breakPreference === "sentence"
+      ? chunkCfg.breakPreference
+      : "paragraph";
+  return { minChars, maxChars, breakPreference };
+}
 
 type QueueMode =
   | "steer"
@@ -1079,6 +1128,9 @@ export async function getReplyFromConfig(
   const resolvedBlockStreamingBreak =
     agentCfg?.blockStreamingBreak === "text_end" ? "text_end" : "message_end";
   const blockStreamingEnabled = resolvedBlockStreaming === "on";
+  const blockReplyChunking = blockStreamingEnabled
+    ? resolveBlockStreamingChunking(cfg, sessionCtx.Surface)
+    : undefined;
   const streamedPayloadKeys = new Set();
   const pendingBlockTasks = new Set<Promise<void>>();
   const buildPayloadKey = (payload: ReplyPayload) => {
@@ -2124,6 +2176,7 @@ export async function getReplyFromConfig(
       timeoutMs,
       runId,
       blockReplyBreak: resolvedBlockStreamingBreak,
+      blockReplyChunking,
       onPartialReply: opts?.onPartialReply
         ? async (payload) => {
             let text = payload.text;
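Review note (not part of the patch): `resolveBlockStreamingChunking` quietly clamps the configured window to the surface's outbound text limit, so `maxChars` can never exceed what the channel can send and `minChars` can never exceed `maxChars`. A small worked sketch of that arithmetic (the 1000-char limit is assumed for illustration, not a value from `resolveTextChunkLimit`):

```ts
// Sketch of the clamping in resolveBlockStreamingChunking (illustrative values).
const textLimit = 1000; // assume resolveTextChunkLimit(cfg, surfaceKey) === 1000

// Defaults: 800-1200 requested, but maxChars is capped by the surface limit.
const maxChars = Math.max(1, Math.min(1200, textLimit)); // -> 1000
const minChars = Math.min(800, maxChars);                // -> 800

// A misconfigured pair (minChars > maxChars) degrades instead of throwing:
const cappedMax = Math.max(1, Math.min(1200, textLimit)); // -> 1000
const cappedMin = Math.min(5000, cappedMax);              // -> 1000 (min collapses to max)
```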