diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 4ad3a0649..5f9839d4d 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -284,6 +284,50 @@ export function subscribeEmbeddedPiSession(params: { const isSafeBreak = (spans: FenceSpan[], index: number): boolean => !findFenceSpanAt(spans, index); + /* Picks a soft break index in buffer: a paragraph, newline or sentence boundary selected by blockChunking.breakPreference (default "paragraph"). Returns { index: -1 } when chunking is off, when buffer is shorter than minChars, or when no candidate at or after minChars passes isSafeBreak. The drain loop later in this patch calls it instead of pickBreakIndex when force is set and the buffer length is at most maxChars. */ const pickSoftBreakIndex = (buffer: string): BreakResult => { + if (!blockChunking) return { index: -1 }; + /* Floored and clamped to at least 1, so index 0 is never accepted as a break. */ const minChars = Math.max(1, Math.floor(blockChunking.minChars)); + if (buffer.length < minChars) return { index: -1 }; + const fenceSpans = parseFenceSpans(buffer); + const preference = blockChunking.breakPreference ?? "paragraph"; + + /* Paragraph pass: walk the "\n\n" occurrences from the end of the buffer backwards and return the first one that is safe and still at or after minChars. */ if (preference === "paragraph") { + let paragraphIdx = buffer.lastIndexOf("\n\n"); + while (paragraphIdx >= minChars) { + if (isSafeBreak(fenceSpans, paragraphIdx)) { + return { index: paragraphIdx }; + } + paragraphIdx = buffer.lastIndexOf("\n\n", paragraphIdx - 1); + } + } + + /* Newline pass: same backward scan over single "\n"; also the fallback when the paragraph pass found nothing. */ if (preference === "paragraph" || preference === "newline") { + let newlineIdx = buffer.lastIndexOf("\n"); + while (newlineIdx >= minChars) { + if (isSafeBreak(fenceSpans, newlineIdx)) { + return { index: newlineIdx }; + } + newlineIdx = buffer.lastIndexOf("\n", newlineIdx - 1); + } + } + + /* Sentence pass: runs for every preference except "newline". Scans forward over each . ! or ? that is followed by whitespace or the end of the buffer and remembers the last safe candidate. */ if (preference !== "newline") { + const matches = buffer.matchAll(/[.!?](?=\s|$)/g); + let sentenceIdx = -1; + for (const match of matches) { + const at = match.index ?? 
-1; + if (at < minChars) continue; + /* Break just after the punctuation mark, not on it. */ const candidate = at + 1; + if (isSafeBreak(fenceSpans, candidate)) { + sentenceIdx = candidate; + } + } + if (sentenceIdx >= minChars) return { index: sentenceIdx }; + } + + return { index: -1 }; + }; + const pickBreakIndex = (buffer: string): BreakResult => { if (!blockChunking) return { index: -1 }; const minChars = Math.max(1, Math.floor(blockChunking.minChars)); @@ -371,18 +415,15 @@ export function subscribeEmbeddedPiSession(params: { if (!blockChunking) return; const minChars = Math.max(1, Math.floor(blockChunking.minChars)); const maxChars = Math.max(minChars, Math.floor(blockChunking.maxChars)); - // Force flush small remainders as a single chunk to avoid re-splitting. - if (force && blockBuffer.length > 0 && blockBuffer.length <= maxChars) { - emitBlockChunk(blockBuffer); - blockBuffer = ""; - return; - } if (blockBuffer.length < minChars && !force) return; while ( blockBuffer.length >= minChars || (force && blockBuffer.length > 0) ) { - const breakResult = pickBreakIndex(blockBuffer); + const breakResult = + force && blockBuffer.length <= maxChars + ? pickSoftBreakIndex(blockBuffer) + : pickBreakIndex(blockBuffer); if (breakResult.index <= 0) { if (force) { emitBlockChunk(blockBuffer); @@ -643,7 +684,11 @@ export function subscribeEmbeddedPiSession(params: { } } - if (params.onBlockReply && blockChunking) { + if ( + params.onBlockReply && + blockChunking && + blockReplyBreak === "text_end" + ) { drainBlockBuffer(false); } @@ -678,7 +723,8 @@ export function subscribeEmbeddedPiSession(params: { const addedDuringMessage = assistantTexts.length > assistantTextBaseline; - if (!addedDuringMessage && text) { + const chunkingEnabled = Boolean(blockChunking); + if (!chunkingEnabled && !addedDuringMessage && text) { const last = assistantTexts.at(-1); if (!last || last !== text) assistantTexts.push(text); }