From 3b91148a0a49fcb3e9b7c3710c0b7ee741f0479b Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 9 Jan 2026 23:19:19 +0100 Subject: [PATCH] fix: handle fence-close paragraph breaks --- src/agents/pi-embedded-block-chunker.ts | 21 ++++++--- src/auto-reply/reply.block-streaming.test.ts | 47 +++++++++++++++----- 2 files changed, 51 insertions(+), 17 deletions(-) diff --git a/src/agents/pi-embedded-block-chunker.ts b/src/agents/pi-embedded-block-chunker.ts index 4236eb53d..7254bbd9f 100644 --- a/src/agents/pi-embedded-block-chunker.ts +++ b/src/agents/pi-embedded-block-chunker.ts @@ -129,11 +129,13 @@ export class EmbeddedBlockChunker { if (preference === "paragraph") { let paragraphIdx = buffer.indexOf("\n\n"); while (paragraphIdx !== -1) { - if ( - paragraphIdx >= minChars && - isSafeFenceBreak(fenceSpans, paragraphIdx) - ) { - return { index: paragraphIdx }; + const candidates = [paragraphIdx, paragraphIdx + 1]; + for (const candidate of candidates) { + if (candidate < minChars) continue; + if (candidate < 0 || candidate >= buffer.length) continue; + if (isSafeFenceBreak(fenceSpans, candidate)) { + return { index: candidate }; + } } paragraphIdx = buffer.indexOf("\n\n", paragraphIdx + 2); } @@ -183,8 +185,13 @@ export class EmbeddedBlockChunker { if (preference === "paragraph") { let paragraphIdx = window.lastIndexOf("\n\n"); while (paragraphIdx >= minChars) { - if (isSafeFenceBreak(fenceSpans, paragraphIdx)) { - return { index: paragraphIdx }; + const candidates = [paragraphIdx, paragraphIdx + 1]; + for (const candidate of candidates) { + if (candidate < minChars) continue; + if (candidate < 0 || candidate >= buffer.length) continue; + if (isSafeFenceBreak(fenceSpans, candidate)) { + return { index: candidate }; + } } paragraphIdx = window.lastIndexOf("\n\n", paragraphIdx - 1); } diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index a2d592f2c..c11bd492c 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -2,11 +2,21 @@ import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import { runEmbeddedPiAgent as runEmbeddedPiAgentRunner } from "/src/agents/pi-embedded.js"; import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js"; import { loadModelCatalog } from "../agents/model-catalog.js"; -import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; +import { runEmbeddedPiAgent as runEmbeddedPiAgentAutoReply } from "../agents/pi-embedded.js"; import { getReplyFromConfig } from "./reply.js"; +vi.mock("/src/agents/pi-embedded.js", () => ({ + abortEmbeddedPiRun: vi.fn().mockReturnValue(false), + runEmbeddedPiAgent: vi.fn(), + queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), + resolveEmbeddedSessionLane: (key: string) => + `session:${key.trim() || "main"}`, + isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), + isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false), +})); vi.mock("../agents/pi-embedded.js", () => ({ abortEmbeddedPiRun: vi.fn().mockReturnValue(false), runEmbeddedPiAgent: vi.fn(), @@ -26,7 +36,8 @@ async function withTempHome(fn: (home: string) => Promise): Promise { describe("block streaming", () => { beforeEach(() => { - vi.mocked(runEmbeddedPiAgent).mockReset(); + vi.mocked(runEmbeddedPiAgentAutoReply).mockReset(); + vi.mocked(runEmbeddedPiAgentRunner).mockReset(); vi.mocked(loadModelCatalog).mockResolvedValue([ { id: "claude-opus-4-5", name: "Opus 4.5", provider: "anthropic" }, { id: "gpt-4.1-mini", name: "GPT-4.1 Mini", provider: "openai" }, @@ -52,7 +63,9 @@ describe("block streaming", () => { const onReplyStart = vi.fn(() => typingGate); const onBlockReply = vi.fn().mockResolvedValue(undefined); - vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => { + const impl = async ( + params: Parameters[0], + ) => { void params.onBlockReply?.({ text: "hello" }); return { payloads: [{ text: "hello" }], @@ -61,7 +74,9 @@ describe("block streaming", () => { agentMeta: { sessionId: "s", provider: "p", model: "m" }, }, }; - }); + }; + vi.mocked(runEmbeddedPiAgentAutoReply).mockImplementation(impl); + vi.mocked(runEmbeddedPiAgentRunner).mockImplementation(impl); const replyPromise = getReplyFromConfig( { @@ -109,7 +124,9 @@ describe("block streaming", () => { seen.push(payload.text ?? ""); }); - vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => { + const impl = async ( + params: Parameters[0], + ) => { void params.onBlockReply?.({ text: "first" }); void params.onBlockReply?.({ text: "second" }); return { @@ -119,7 +136,9 @@ describe("block streaming", () => { agentMeta: { sessionId: "s", provider: "p", model: "m" }, }, }; - }); + }; + vi.mocked(runEmbeddedPiAgentAutoReply).mockImplementation(impl); + vi.mocked(runEmbeddedPiAgentRunner).mockImplementation(impl); const replyPromise = getReplyFromConfig( { @@ -159,7 +178,9 @@ describe("block streaming", () => { await withTempHome(async (home) => { const onBlockReply = vi.fn().mockResolvedValue(undefined); - vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => { + const impl = async ( + params: Parameters[0], + ) => { void params.onBlockReply?.({ text: "chunk-1" }); return { payloads: [{ text: "chunk-1\nchunk-2" }], @@ -168,7 +189,9 @@ describe("block streaming", () => { agentMeta: { sessionId: "s", provider: "p", model: "m" }, }, }; - }); + }; + vi.mocked(runEmbeddedPiAgentAutoReply).mockImplementation(impl); + vi.mocked(runEmbeddedPiAgentRunner).mockImplementation(impl); const res = await getReplyFromConfig( { @@ -215,7 +238,9 @@ describe("block streaming", () => { }); }); - vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => { + const impl = async ( + params: Parameters[0], + ) => { void params.onBlockReply?.({ text: "streamed" }); return { payloads: [{ text: "final" }], @@ -224,7 +249,9 @@ describe("block streaming", () => { agentMeta: { sessionId: "s", provider: "p", model: "m" }, }, }; - }); + }; + vi.mocked(runEmbeddedPiAgentAutoReply).mockImplementation(impl); + vi.mocked(runEmbeddedPiAgentRunner).mockImplementation(impl); const replyPromise = getReplyFromConfig( {