diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a8763e17..9cbed0e10 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ - Control UI: show a reading indicator bubble while the assistant is responding. - Status: show runtime (docker/direct) and move shortcuts to `/help`. - Status: show model auth source (api-key/oauth). +- Block streaming: avoid splitting Markdown fenced blocks and reopen fences when forced to split. ### Maintenance - Deps: bump pi-* stack, Slack SDK, discord-api-types, file-type, zod, and Biome. diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index ca0782909..40bf76444 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -633,6 +633,109 @@ describe("subscribeEmbeddedPiSession", () => { ]); }); + it("avoids splitting inside fenced code blocks", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run", + onBlockReply, + blockReplyBreak: "message_end", + blockReplyChunking: { + minChars: 5, + maxChars: 50, + breakPreference: "paragraph", + }, + }); + + const text = "Intro\n\n```bash\nline1\nline2\n```\n\nOutro"; + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: text, + }, + }); + + const assistantMessage = { + role: "assistant", + content: [{ type: "text", text }], + } as AssistantMessage; + + handler?.({ type: "message_end", message: assistantMessage }); + + expect(onBlockReply).toHaveBeenCalledTimes(3); + expect(onBlockReply.mock.calls[0][0].text).toBe("Intro"); + expect(onBlockReply.mock.calls[1][0].text).toBe( + "```bash\nline1\nline2\n```", + ); + expect(onBlockReply.mock.calls[2][0].text).toBe("Outro"); + }); + + it("reopens fenced blocks when splitting inside them", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run", + onBlockReply, + blockReplyBreak: "message_end", + blockReplyChunking: { + minChars: 10, + maxChars: 30, + breakPreference: "paragraph", + }, + }); + + const text = `\`\`\`txt\n${"a".repeat(80)}\n\`\`\``; + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: text, + }, + }); + + const assistantMessage = { + role: "assistant", + content: [{ type: "text", text }], + } as AssistantMessage; + + handler?.({ type: "message_end", message: assistantMessage }); + + expect(onBlockReply.mock.calls.length).toBeGreaterThan(1); + for (const call of onBlockReply.mock.calls) { + const chunk = call[0].text as string; + expect(chunk.startsWith("```txt")).toBe(true); + const fenceCount = chunk.match(/```/g)?.length ?? 0; + expect(fenceCount).toBeGreaterThanOrEqual(2); + } + }); + it("waits for auto-compaction retry and clears buffered text", async () => { const listeners: SessionEventHandler[] = []; const session = { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index e5c39c0fd..845f62dce 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -195,56 +195,159 @@ export function subscribeEmbeddedPiSession(params: { } }; - const findSentenceBreak = (window: string, minChars: number): number => { - if (!window) return -1; - const matches = window.matchAll(/[.!?](?=\s|$)/g); - let idx = -1; - for (const match of matches) { - const at = match.index ?? -1; - if (at < minChars) continue; - idx = at + 1; - } - return idx; + type FenceSpan = { + start: number; + end: number; + openLine: string; + marker: string; + indent: string; }; - const findWhitespaceBreak = (window: string, minChars: number): number => { - for (let i = window.length - 1; i >= minChars; i--) { - if (/\s/.test(window[i])) return i; - } - return -1; + type FenceSplit = { + closeFenceLine: string; + reopenFenceLine: string; }; - const pickBreakIndex = (buffer: string): number => { - if (!blockChunking) return -1; + type BreakResult = { + index: number; + fenceSplit?: FenceSplit; + }; + + const parseFenceSpans = (buffer: string): FenceSpan[] => { + const spans: FenceSpan[] = []; + let open: + | { + start: number; + markerChar: string; + markerLen: number; + openLine: string; + marker: string; + indent: string; + } + | undefined; + let offset = 0; + while (offset <= buffer.length) { + const nextNewline = buffer.indexOf("\n", offset); + const lineEnd = nextNewline === -1 ? buffer.length : nextNewline; + const line = buffer.slice(offset, lineEnd); + const match = line.match(/^( {0,3})(`{3,}|~{3,})(.*)$/); + if (match) { + const indent = match[1]; + const marker = match[2]; + const markerChar = marker[0]; + const markerLen = marker.length; + if (!open) { + open = { + start: offset, + markerChar, + markerLen, + openLine: line, + marker, + indent, + }; + } else if ( + open.markerChar === markerChar && + markerLen >= open.markerLen + ) { + const end = nextNewline === -1 ? buffer.length : nextNewline + 1; + spans.push({ + start: open.start, + end, + openLine: open.openLine, + marker: open.marker, + indent: open.indent, + }); + open = undefined; + } + } + if (nextNewline === -1) break; + offset = nextNewline + 1; + } + if (open) { + spans.push({ + start: open.start, + end: buffer.length, + openLine: open.openLine, + marker: open.marker, + indent: open.indent, + }); + } + return spans; + }; + + const findFenceSpanAt = ( + spans: FenceSpan[], + index: number, + ): FenceSpan | undefined => + spans.find((span) => index > span.start && index < span.end); + + const isSafeBreak = (spans: FenceSpan[], index: number): boolean => + !findFenceSpanAt(spans, index); + + const pickBreakIndex = (buffer: string): BreakResult => { + if (!blockChunking) return { index: -1 }; const minChars = Math.max(1, Math.floor(blockChunking.minChars)); const maxChars = Math.max(minChars, Math.floor(blockChunking.maxChars)); - if (buffer.length < minChars) return -1; + if (buffer.length < minChars) return { index: -1 }; const window = buffer.slice(0, Math.min(maxChars, buffer.length)); + const fenceSpans = parseFenceSpans(buffer); const preference = blockChunking.breakPreference ?? "paragraph"; - const paragraphIdx = window.lastIndexOf("\n\n"); - if (preference === "paragraph" && paragraphIdx >= minChars) { - return paragraphIdx; + if (preference === "paragraph") { + let paragraphIdx = window.lastIndexOf("\n\n"); + while (paragraphIdx >= minChars) { + if (isSafeBreak(fenceSpans, paragraphIdx)) { + return { index: paragraphIdx }; + } + paragraphIdx = window.lastIndexOf("\n\n", paragraphIdx - 1); + } } - const newlineIdx = window.lastIndexOf("\n"); - if ( - (preference === "paragraph" || preference === "newline") && - newlineIdx >= minChars - ) { - return newlineIdx; + if (preference === "paragraph" || preference === "newline") { + let newlineIdx = window.lastIndexOf("\n"); + while (newlineIdx >= minChars) { + if (isSafeBreak(fenceSpans, newlineIdx)) { + return { index: newlineIdx }; + } + newlineIdx = window.lastIndexOf("\n", newlineIdx - 1); + } } if (preference !== "newline") { - const sentenceIdx = findSentenceBreak(window, minChars); - if (sentenceIdx >= minChars) return sentenceIdx; + const matches = window.matchAll(/[.!?](?=\s|$)/g); + let sentenceIdx = -1; + for (const match of matches) { + const at = match.index ?? -1; + if (at < minChars) continue; + const candidate = at + 1; + if (isSafeBreak(fenceSpans, candidate)) { + sentenceIdx = candidate; + } + } + if (sentenceIdx >= minChars) return { index: sentenceIdx }; } - const whitespaceIdx = findWhitespaceBreak(window, minChars); - if (whitespaceIdx >= minChars) return whitespaceIdx; + for (let i = window.length - 1; i >= minChars; i--) { + if (/\s/.test(window[i]) && isSafeBreak(fenceSpans, i)) { + return { index: i }; + } + } - if (buffer.length >= maxChars) return maxChars; - return -1; + if (buffer.length >= maxChars) { + if (isSafeBreak(fenceSpans, maxChars)) return { index: maxChars }; + const fence = findFenceSpanAt(fenceSpans, maxChars); + if (fence) { + return { + index: maxChars, + fenceSplit: { + closeFenceLine: `${fence.indent}${fence.marker}`, + reopenFenceLine: fence.openLine, + }, + }; + } + return { index: maxChars }; + } + return { index: -1 }; }; const emitBlockChunk = (text: string) => { @@ -279,25 +382,42 @@ export function subscribeEmbeddedPiSession(params: { blockBuffer.length >= minChars || (force && blockBuffer.length > 0) ) { - const breakIdx = pickBreakIndex(blockBuffer); - if (breakIdx <= 0) { + const breakResult = pickBreakIndex(blockBuffer); + if (breakResult.index <= 0) { if (force) { emitBlockChunk(blockBuffer); blockBuffer = ""; } return; } - const rawChunk = blockBuffer.slice(0, breakIdx); + const breakIdx = breakResult.index; + let rawChunk = blockBuffer.slice(0, breakIdx); if (rawChunk.trim().length === 0) { blockBuffer = blockBuffer.slice(breakIdx).trimStart(); continue; } + let nextBuffer = blockBuffer.slice(breakIdx); + const fenceSplit = breakResult.fenceSplit; + if (fenceSplit) { + const closeFence = rawChunk.endsWith("\n") + ? `${fenceSplit.closeFenceLine}\n` + : `\n${fenceSplit.closeFenceLine}\n`; + rawChunk = `${rawChunk}${closeFence}`; + const reopenFence = fenceSplit.reopenFenceLine.endsWith("\n") + ? fenceSplit.reopenFenceLine + : `${fenceSplit.reopenFenceLine}\n`; + nextBuffer = `${reopenFence}${nextBuffer}`; + } emitBlockChunk(rawChunk); - const nextStart = - breakIdx < blockBuffer.length && /\s/.test(blockBuffer[breakIdx]) - ? breakIdx + 1 - : breakIdx; - blockBuffer = blockBuffer.slice(nextStart).trimStart(); + if (fenceSplit) { + blockBuffer = nextBuffer; + } else { + const nextStart = + breakIdx < blockBuffer.length && /\s/.test(blockBuffer[breakIdx]) + ? breakIdx + 1 + : breakIdx; + blockBuffer = blockBuffer.slice(nextStart).trimStart(); + } if (blockBuffer.length < minChars && !force) return; if (blockBuffer.length < maxChars && !force) return; }