feat: soften block streaming chunking

This commit is contained in:
Peter Steinberger
2026-01-03 16:45:53 +01:00
parent 53baba71fa
commit 9f8eeceae7
7 changed files with 270 additions and 29 deletions

View File

@@ -15,6 +15,7 @@
### Fixes
- Telegram: chunk block-stream replies to avoid “message is too long” errors (#124) — thanks @mukhtharcm.
- Gmail hooks: resolve gcloud Python to a real executable when PATH uses mise shims — thanks @joargp.
- Agent: add soft block-stream chunking (800–1200 chars default) with paragraph/newline preference.
- Agent tools: scope the Discord tool to Discord surface runs.
- Agent tools: format verbose tool summaries without brackets, with unique emojis and `tool: detail` style.
- macOS Connections: move to sidebar + detail layout with structured sections and header actions.

View File

@@ -83,6 +83,8 @@ current turn ends, then a new agent turn starts with the queued payloads. See
Block streaming sends completed assistant blocks as soon as they finish; disable
via `agent.blockStreamingDefault: "off"` if you only want the final response.
Tune the boundary via `agent.blockStreamingBreak` (`text_end` vs `message_end`).
Control soft block chunking with `agent.blockStreamingChunk` (defaults to
800–1200 chars; prefers paragraph breaks, then newlines; sentences last).
## Configuration (minimal)

View File

@@ -391,6 +391,20 @@ Controls the embedded agent runtime (model/thinking/verbose/timeouts).
}
```
Block streaming:
- `agent.blockStreamingDefault`: `"on"`/`"off"` (default on).
- `agent.blockStreamingBreak`: `"text_end"` or `"message_end"`.
- `agent.blockStreamingChunk`: soft chunking for streamed blocks. Defaults to
800–1200 chars, prefers paragraph breaks (`\n\n`), then newlines, then sentences.
Example:
```json5
{
agent: {
blockStreamingChunk: { minChars: 800, maxChars: 1200 }
}
}
```
`agent.model` should be set as `provider/model` (e.g. `anthropic/claude-opus-4-5`).
If `modelAliases` is configured, you may also use the alias key (e.g. `Opus`).
If you omit the provider, CLAWDIS currently assumes `anthropic` as a temporary

View File

@@ -42,7 +42,10 @@ import {
formatAssistantErrorText,
sanitizeSessionMessagesImages,
} from "./pi-embedded-helpers.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
import {
subscribeEmbeddedPiSession,
type BlockReplyChunking,
} from "./pi-embedded-subscribe.js";
import { extractAssistantText } from "./pi-embedded-utils.js";
import { createClawdisCodingTools } from "./pi-tools.js";
import {
@@ -334,6 +337,7 @@ export async function runEmbeddedPiAgent(params: {
mediaUrls?: string[];
}) => void | Promise<void>;
blockReplyBreak?: "text_end" | "message_end";
blockReplyChunking?: BlockReplyChunking;
onToolResult?: (payload: {
text?: string;
mediaUrls?: string[];
@@ -503,6 +507,7 @@ export async function runEmbeddedPiAgent(params: {
onToolResult: params.onToolResult,
onBlockReply: params.onBlockReply,
blockReplyBreak: params.blockReplyBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: params.onPartialReply,
onAgentEvent: params.onAgentEvent,
enforceFinalTag: params.enforceFinalTag,

View File

@@ -231,6 +231,58 @@ describe("subscribeEmbeddedPiSession", () => {
expect(subscription.assistantTexts).toEqual(["Hello block"]);
});
it("streams soft chunks with paragraph preference", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onBlockReply = vi.fn();
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<
typeof subscribeEmbeddedPiSession
>[0]["session"],
runId: "run",
onBlockReply,
blockReplyBreak: "message_end",
blockReplyChunking: {
minChars: 5,
maxChars: 40,
breakPreference: "paragraph",
},
});
const text = "First block line\n\nSecond block line";
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: text,
},
});
const assistantMessage = {
role: "assistant",
content: [{ type: "text", text }],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
expect(onBlockReply).toHaveBeenCalledTimes(2);
expect(onBlockReply.mock.calls[0][0].text).toBe("First block line");
expect(onBlockReply.mock.calls[1][0].text).toBe("Second block line");
expect(subscription.assistantTexts).toEqual([
"First block line",
"Second block line",
]);
});
it("waits for auto-compaction retry and clears buffered text", async () => {
const listeners: SessionEventHandler[] = [];
const session = {

View File

@@ -19,6 +19,12 @@ const THINKING_OPEN_RE = /<\s*think(?:ing)?\s*>/i;
const THINKING_CLOSE_RE = /<\s*\/\s*think(?:ing)?\s*>/i;
const TOOL_RESULT_MAX_CHARS = 8000;
export type BlockReplyChunking = {
minChars: number;
maxChars: number;
breakPreference?: "paragraph" | "newline" | "sentence";
};
function truncateToolText(text: string): string {
if (text.length <= TOOL_RESULT_MAX_CHARS) return text;
return `${text.slice(0, TOOL_RESULT_MAX_CHARS)}\n…(truncated)…`;
@@ -93,6 +99,7 @@ export function subscribeEmbeddedPiSession(params: {
mediaUrls?: string[];
}) => void | Promise<void>;
blockReplyBreak?: "text_end" | "message_end";
blockReplyChunking?: BlockReplyChunking;
onPartialReply?: (payload: {
text?: string;
mediaUrls?: string[];
@@ -108,6 +115,7 @@ export function subscribeEmbeddedPiSession(params: {
const toolMetaById = new Map<string, string | undefined>();
const blockReplyBreak = params.blockReplyBreak ?? "text_end";
let deltaBuffer = "";
let blockBuffer = "";
let lastStreamedAssistant: string | undefined;
let lastBlockReplyText: string | undefined;
let assistantTextBaseline = 0;
@@ -178,11 +186,111 @@ export function subscribeEmbeddedPiSession(params: {
});
});
const blockChunking = params.blockReplyChunking;
const findSentenceBreak = (window: string, minChars: number): number => {
if (!window) return -1;
const matches = window.matchAll(/[.!?](?=\s|$)/g);
let idx = -1;
for (const match of matches) {
const at = match.index ?? -1;
if (at < minChars) continue;
idx = at + 1;
}
return idx;
};
const findWhitespaceBreak = (window: string, minChars: number): number => {
for (let i = window.length - 1; i >= minChars; i--) {
if (/\s/.test(window[i])) return i;
}
return -1;
};
const pickBreakIndex = (buffer: string): number => {
if (!blockChunking) return -1;
const minChars = Math.max(1, Math.floor(blockChunking.minChars));
const maxChars = Math.max(minChars, Math.floor(blockChunking.maxChars));
if (buffer.length < minChars) return -1;
const window = buffer.slice(0, Math.min(maxChars, buffer.length));
const preference = blockChunking.breakPreference ?? "paragraph";
const paragraphIdx = window.lastIndexOf("\n\n");
if (preference === "paragraph" && paragraphIdx >= minChars) {
return paragraphIdx;
}
const newlineIdx = window.lastIndexOf("\n");
if (
(preference === "paragraph" || preference === "newline") &&
newlineIdx >= minChars
) {
return newlineIdx;
}
if (preference !== "newline") {
const sentenceIdx = findSentenceBreak(window, minChars);
if (sentenceIdx >= minChars) return sentenceIdx;
}
const whitespaceIdx = findWhitespaceBreak(window, minChars);
if (whitespaceIdx >= minChars) return whitespaceIdx;
if (buffer.length >= maxChars) return maxChars;
return -1;
};
const emitBlockChunk = (text: string) => {
const chunk = text.trimEnd();
if (!chunk) return;
if (chunk === lastBlockReplyText) return;
lastBlockReplyText = chunk;
assistantTexts.push(chunk);
if (!params.onBlockReply) return;
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(chunk);
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) return;
void params.onBlockReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
};
const drainBlockBuffer = (force: boolean) => {
if (!blockChunking) return;
const minChars = Math.max(1, Math.floor(blockChunking.minChars));
const maxChars = Math.max(minChars, Math.floor(blockChunking.maxChars));
if (blockBuffer.length < minChars && !force) return;
while (blockBuffer.length >= minChars || (force && blockBuffer.length > 0)) {
const breakIdx = pickBreakIndex(blockBuffer);
if (breakIdx <= 0) {
if (force) {
emitBlockChunk(blockBuffer);
blockBuffer = "";
}
return;
}
const rawChunk = blockBuffer.slice(0, breakIdx);
if (rawChunk.trim().length === 0) {
blockBuffer = blockBuffer.slice(breakIdx).trimStart();
continue;
}
emitBlockChunk(rawChunk);
const nextStart =
breakIdx < blockBuffer.length && /\s/.test(blockBuffer[breakIdx])
? breakIdx + 1
: breakIdx;
blockBuffer = blockBuffer.slice(nextStart).trimStart();
if (blockBuffer.length < minChars && !force) return;
if (blockBuffer.length < maxChars && !force) return;
}
};
const resetForCompactionRetry = () => {
assistantTexts.length = 0;
toolMetas.length = 0;
toolMetaById.clear();
deltaBuffer = "";
blockBuffer = "";
lastStreamedAssistant = undefined;
lastBlockReplyText = undefined;
assistantTextBaseline = 0;
@@ -337,6 +445,7 @@ export function subscribeEmbeddedPiSession(params: {
: "";
if (chunk) {
deltaBuffer += chunk;
blockBuffer += chunk;
}
const cleaned = params.enforceFinalTag
@@ -372,25 +481,29 @@ export function subscribeEmbeddedPiSession(params: {
}
}
if (params.onBlockReply && blockChunking) {
drainBlockBuffer(false);
}
if (evtType === "text_end" && blockReplyBreak === "text_end") {
if (next && next === lastBlockReplyText) {
deltaBuffer = "";
lastStreamedAssistant = undefined;
return;
}
lastBlockReplyText = next || undefined;
if (next) assistantTexts.push(next);
if (next && params.onBlockReply) {
const { text: cleanedText, mediaUrls } =
splitMediaFromOutput(next);
if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
void params.onBlockReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
if (blockChunking && blockBuffer.length > 0) {
drainBlockBuffer(true);
} else if (next && next !== lastBlockReplyText) {
lastBlockReplyText = next || undefined;
if (next) assistantTexts.push(next);
if (next && params.onBlockReply) {
const { text: cleanedText, mediaUrls } =
splitMediaFromOutput(next);
if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
void params.onBlockReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
}
}
}
deltaBuffer = "";
blockBuffer = "";
lastStreamedAssistant = undefined;
}
}
@@ -420,25 +533,26 @@ export function subscribeEmbeddedPiSession(params: {
assistantTextBaseline = assistantTexts.length;
if (
blockReplyBreak === "message_end" &&
(blockReplyBreak === "message_end" || blockBuffer.length > 0) &&
text &&
params.onBlockReply
) {
if (text === lastBlockReplyText) {
deltaBuffer = "";
lastStreamedAssistant = undefined;
return;
}
lastBlockReplyText = text;
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
void params.onBlockReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
if (blockChunking && blockBuffer.length > 0) {
drainBlockBuffer(true);
} else if (text !== lastBlockReplyText) {
lastBlockReplyText = text;
const { text: cleanedText, mediaUrls } =
splitMediaFromOutput(text);
if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
void params.onBlockReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
}
}
}
deltaBuffer = "";
blockBuffer = "";
lastStreamedAssistant = undefined;
lastBlockReplyText = undefined;
}

View File

@@ -74,6 +74,7 @@ import {
} from "./thinking.js";
import { SILENT_REPLY_TOKEN } from "./tokens.js";
import { isAudio, transcribeInboundAudio } from "./transcription.js";
import { resolveTextChunkLimit, type TextChunkSurface } from "./chunk.js";
import type { GetReplyOptions, ReplyPayload } from "./types.js";
export type { GetReplyOptions, ReplyPayload } from "./types.js";
@@ -81,6 +82,54 @@ export type { GetReplyOptions, ReplyPayload } from "./types.js";
const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit"]);
const ABORT_MEMORY = new Map<string, boolean>();
const SYSTEM_MARK = "⚙️";
const DEFAULT_BLOCK_STREAM_MIN = 800;
const DEFAULT_BLOCK_STREAM_MAX = 1200;
const BLOCK_CHUNK_SURFACES = new Set<TextChunkSurface>([
"whatsapp",
"telegram",
"discord",
"signal",
"imessage",
"webchat",
]);
function normalizeChunkSurface(surface?: string): TextChunkSurface | undefined {
if (!surface) return undefined;
const cleaned = surface.trim().toLowerCase();
return BLOCK_CHUNK_SURFACES.has(cleaned as TextChunkSurface)
? (cleaned as TextChunkSurface)
: undefined;
}
function resolveBlockStreamingChunking(
cfg: ClawdisConfig | undefined,
surface?: string,
): {
minChars: number;
maxChars: number;
breakPreference: "paragraph" | "newline" | "sentence";
} {
const surfaceKey = normalizeChunkSurface(surface);
const textLimit = resolveTextChunkLimit(cfg, surfaceKey);
const chunkCfg = cfg?.agent?.blockStreamingChunk;
const maxRequested = Math.max(
1,
Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX),
);
const maxChars = Math.max(1, Math.min(maxRequested, textLimit));
const minRequested = Math.max(
1,
Math.floor(chunkCfg?.minChars ?? DEFAULT_BLOCK_STREAM_MIN),
);
const minChars = Math.min(minRequested, maxChars);
const breakPreference =
chunkCfg?.breakPreference === "newline" ||
chunkCfg?.breakPreference === "sentence"
? chunkCfg.breakPreference
: "paragraph";
return { minChars, maxChars, breakPreference };
}
type QueueMode =
| "steer"
@@ -1079,6 +1128,9 @@ export async function getReplyFromConfig(
const resolvedBlockStreamingBreak =
agentCfg?.blockStreamingBreak === "text_end" ? "text_end" : "message_end";
const blockStreamingEnabled = resolvedBlockStreaming === "on";
const blockReplyChunking = blockStreamingEnabled
? resolveBlockStreamingChunking(cfg, sessionCtx.Surface)
: undefined;
const streamedPayloadKeys = new Set<string>();
const pendingBlockTasks = new Set<Promise<void>>();
const buildPayloadKey = (payload: ReplyPayload) => {
@@ -2124,6 +2176,7 @@ export async function getReplyFromConfig(
timeoutMs,
runId,
blockReplyBreak: resolvedBlockStreamingBreak,
blockReplyChunking,
onPartialReply: opts?.onPartialReply
? async (payload) => {
let text = payload.text;