feat: make block streaming break configurable

This commit is contained in:
Peter Steinberger
2026-01-03 00:52:02 +01:00
parent ea7d967625
commit 2d28fa34f5
8 changed files with 138 additions and 31 deletions

View File

@@ -331,6 +331,7 @@ export async function runEmbeddedPiAgent(params: {
text?: string;
mediaUrls?: string[];
}) => void | Promise<void>;
blockReplyBreak?: "text_end" | "message_end";
onToolResult?: (payload: {
text?: string;
mediaUrls?: string[];
@@ -494,6 +495,7 @@ export async function runEmbeddedPiAgent(params: {
shouldEmitToolResult: params.shouldEmitToolResult,
onToolResult: params.onToolResult,
onBlockReply: params.onBlockReply,
blockReplyBreak: params.blockReplyBreak,
onPartialReply: params.onPartialReply,
onAgentEvent: params.onAgentEvent,
enforceFinalTag: params.enforceFinalTag,

View File

@@ -114,6 +114,7 @@ describe("subscribeEmbeddedPiSession", () => {
>[0]["session"],
runId: "run",
onBlockReply,
blockReplyBreak: "message_end",
});
const assistantMessage = {
@@ -128,6 +129,59 @@ describe("subscribeEmbeddedPiSession", () => {
expect(payload.text).toBe("Hello block");
});
// Verifies that with blockReplyBreak set to "text_end" the block reply is
// delivered once when the text block ends, and that the later message_end
// event carrying the same text neither fires onBlockReply again nor appends
// a second entry to assistantTexts.
it("emits block replies on text_end and does not duplicate on message_end", () => {
// Captured from the stub's subscribe() so the test can push events by hand.
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
// No-op unsubscribe function.
return () => {};
},
};
const onBlockReply = vi.fn();
const subscription = subscribeEmbeddedPiSession({
// The stub only implements subscribe(); cast it to the session parameter type.
session: session as unknown as Parameters<
typeof subscribeEmbeddedPiSession
>[0]["session"],
runId: "run",
onBlockReply,
blockReplyBreak: "text_end",
});
// Step 1: stream a single text delta for the assistant message.
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Hello block",
},
});
// Step 2: signal the end of the text block; no extra text is carried here.
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
},
});
// After text_end: exactly one block reply with the streamed text, and the
// text is recorded once in assistantTexts.
expect(onBlockReply).toHaveBeenCalledTimes(1);
const payload = onBlockReply.mock.calls[0][0];
expect(payload.text).toBe("Hello block");
expect(subscription.assistantTexts).toEqual(["Hello block"]);
// Step 3: finish the message with the same text as its full content.
const assistantMessage = {
role: "assistant",
content: [{ type: "text", text: "Hello block" }],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
// After message_end: call count and assistantTexts are unchanged (no duplicate).
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(subscription.assistantTexts).toEqual(["Hello block"]);
});
it("waits for auto-compaction retry and clears buffered text", async () => {
const listeners: SessionEventHandler[] = [];
const session = {

View File

@@ -61,6 +61,7 @@ export function subscribeEmbeddedPiSession(params: {
text?: string;
mediaUrls?: string[];
}) => void | Promise<void>;
blockReplyBreak?: "text_end" | "message_end";
onPartialReply?: (payload: {
text?: string;
mediaUrls?: string[];
@@ -74,8 +75,10 @@ export function subscribeEmbeddedPiSession(params: {
const assistantTexts: string[] = [];
const toolMetas: Array<{ toolName?: string; meta?: string }> = [];
const toolMetaById = new Map<string, string | undefined>();
const blockReplyBreak = params.blockReplyBreak ?? "text_end";
let deltaBuffer = "";
let lastStreamedAssistant: string | undefined;
let assistantTextBaseline = 0;
let compactionInFlight = false;
let pendingCompactionRetry = 0;
let compactionRetryResolve: (() => void) | undefined;
@@ -149,6 +152,7 @@ export function subscribeEmbeddedPiSession(params: {
toolMetaById.clear();
deltaBuffer = "";
lastStreamedAssistant = undefined;
assistantTextBaseline = 0;
toolDebouncer.flush();
};
@@ -264,38 +268,55 @@ export function subscribeEmbeddedPiSession(params: {
: "";
if (chunk) {
deltaBuffer += chunk;
const cleaned = params.enforceFinalTag
? stripThinkingSegments(stripUnpairedThinkingTags(deltaBuffer))
: stripThinkingSegments(deltaBuffer);
const next = params.enforceFinalTag
? (extractFinalText(cleaned)?.trim() ?? cleaned.trim())
: cleaned.trim();
if (next && next !== lastStreamedAssistant) {
lastStreamedAssistant = next;
}
const cleaned = params.enforceFinalTag
? stripThinkingSegments(stripUnpairedThinkingTags(deltaBuffer))
: stripThinkingSegments(deltaBuffer);
const next = params.enforceFinalTag
? (extractFinalText(cleaned)?.trim() ?? cleaned.trim())
: cleaned.trim();
if (next && next !== lastStreamedAssistant) {
lastStreamedAssistant = next;
const { text: cleanedText, mediaUrls } =
splitMediaFromOutput(next);
emitAgentEvent({
runId: params.runId,
stream: "assistant",
data: {
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
params.onAgentEvent?.({
stream: "assistant",
data: {
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
if (params.onPartialReply) {
void params.onPartialReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
}
}
if (evtType === "text_end" && blockReplyBreak === "text_end") {
if (next) assistantTexts.push(next);
if (next && params.onBlockReply) {
const { text: cleanedText, mediaUrls } =
splitMediaFromOutput(next);
emitAgentEvent({
runId: params.runId,
stream: "assistant",
data: {
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
params.onAgentEvent?.({
stream: "assistant",
data: {
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
if (params.onPartialReply) {
void params.onPartialReply({
if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
void params.onBlockReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
}
}
deltaBuffer = "";
lastStreamedAssistant = undefined;
}
}
}
@@ -317,8 +338,17 @@ export function subscribeEmbeddedPiSession(params: {
params.enforceFinalTag && cleaned
? (extractFinalText(cleaned)?.trim() ?? cleaned)
: cleaned;
if (text) assistantTexts.push(text);
if (text && params.onBlockReply) {
const addedDuringMessage =
assistantTexts.length > assistantTextBaseline;
if (!addedDuringMessage && text) assistantTexts.push(text);
assistantTextBaseline = assistantTexts.length;
if (
blockReplyBreak === "message_end" &&
text &&
params.onBlockReply
) {
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
void params.onBlockReply({
@@ -328,6 +358,7 @@ export function subscribeEmbeddedPiSession(params: {
}
}
deltaBuffer = "";
lastStreamedAssistant = undefined;
}
}

View File

@@ -604,6 +604,10 @@ export async function getReplyFromConfig(
(agentCfg?.verboseDefault as VerboseLevel | undefined);
const resolvedBlockStreaming =
agentCfg?.blockStreamingDefault === "off" ? "off" : "on";
const resolvedBlockStreamingBreak =
agentCfg?.blockStreamingBreak === "message_end"
? "message_end"
: "text_end";
const blockStreamingEnabled = resolvedBlockStreaming === "on";
const streamedPayloadKeys = new Set<string>();
const pendingBlockTasks = new Set<Promise<void>>();
@@ -1368,6 +1372,7 @@ export async function getReplyFromConfig(
verboseLevel: resolvedVerboseLevel,
timeoutMs,
runId,
blockReplyBreak: resolvedBlockStreamingBreak,
onPartialReply: opts?.onPartialReply
? async (payload) => {
let text = payload.text;

View File

@@ -526,6 +526,12 @@ export type ClawdisConfig = {
verboseDefault?: "off" | "on";
/** Default block streaming level when no override is present. */
blockStreamingDefault?: "off" | "on";
/**
* Block streaming boundary:
* - "text_end": end of each assistant text content block (before tool calls)
* - "message_end": end of the whole assistant message (may include tool blocks)
*/
blockStreamingBreak?: "text_end" | "message_end";
timeoutSeconds?: number;
/** Max inbound media size in MB for agent-visible attachments (text note or future image attach). */
mediaMaxMb?: number;
@@ -906,6 +912,9 @@ const ClawdisSchema = z.object({
blockStreamingDefault: z
.union([z.literal("off"), z.literal("on")])
.optional(),
blockStreamingBreak: z
.union([z.literal("text_end"), z.literal("message_end")])
.optional(),
timeoutSeconds: z.number().int().positive().optional(),
mediaMaxMb: z.number().positive().optional(),
typingIntervalSeconds: z.number().int().positive().optional(),