From 388d30247233205933d452ac4bf1ffe3f0ba7b56 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 22 Jan 2026 08:01:21 +0000 Subject: [PATCH] fix: carry reply tags across streamed chunks --- ...pi-embedded-subscribe.handlers.messages.ts | 53 ++++++-- .../pi-embedded-subscribe.handlers.types.ts | 5 + .../pi-embedded-subscribe.reply-tags.test.ts | 106 +++++++++++++++ src/agents/pi-embedded-subscribe.ts | 10 +- .../reply/streaming-directives.test.ts | 37 ++++++ src/auto-reply/reply/streaming-directives.ts | 124 ++++++++++++++++++ src/utils/directive-tags.ts | 2 + 7 files changed, 323 insertions(+), 14 deletions(-) create mode 100644 src/agents/pi-embedded-subscribe.reply-tags.test.ts create mode 100644 src/auto-reply/reply/streaming-directives.test.ts create mode 100644 src/auto-reply/reply/streaming-directives.ts diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index b2074d866..1f515e113 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -226,24 +226,27 @@ export function handleMessageEnd( ); } else { ctx.state.lastBlockReplyText = text; - const { - text: cleanedText, - mediaUrls, - audioAsVoice, - replyToId, - replyToTag, - replyToCurrent, - } = parseReplyDirectives(text); - // Emit if there's content OR audioAsVoice flag (to propagate the flag). - if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) { - void onBlockReply({ + const splitResult = ctx.consumeReplyDirectives(text, { final: true }); + if (splitResult) { + const { text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + mediaUrls, audioAsVoice, replyToId, replyToTag, replyToCurrent, - }); + } = splitResult; + // Emit if there's content OR audioAsVoice flag (to propagate the flag). + if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) { + void onBlockReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + }); + } } } } @@ -254,6 +257,30 @@ export function handleMessageEnd( ctx.emitReasoningStream(rawThinking); } + if (ctx.state.blockReplyBreak === "text_end" && onBlockReply) { + const tailResult = ctx.consumeReplyDirectives("", { final: true }); + if (tailResult) { + const { + text: cleanedText, + mediaUrls, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + } = tailResult; + if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) { + void onBlockReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + }); + } + } + } + ctx.state.deltaBuffer = ""; ctx.state.blockBuffer = ""; ctx.blockChunker?.reset(); diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index 94a107961..4a464c5e2 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -1,6 +1,7 @@ import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core"; import type { ReasoningLevel } from "../auto-reply/thinking.js"; +import type { ReplyDirectiveParseResult } from "../auto-reply/reply/reply-directives.js"; import type { InlineCodeState } from "../markdown/code-spans.js"; import type { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js"; import type { MessagingToolSend } from "./pi-embedded-messaging.js"; @@ -77,6 +78,10 @@ export type EmbeddedPiSubscribeContext = { emitBlockChunk: (text: string) => void; flushBlockReplyBuffer: () => void; emitReasoningStream: (text: string) => void; + consumeReplyDirectives: ( + text: string, + options?: { final?: boolean }, + ) => ReplyDirectiveParseResult | null; resetAssistantMessageState: (nextAssistantTextBaseline: number) => void; resetForCompactionRetry: () => void; finalizeAssistantTexts: (args: { diff --git a/src/agents/pi-embedded-subscribe.reply-tags.test.ts b/src/agents/pi-embedded-subscribe.reply-tags.test.ts new file mode 100644 index 000000000..5243c8488 --- /dev/null +++ b/src/agents/pi-embedded-subscribe.reply-tags.test.ts @@ -0,0 +1,106 @@ +import type { AssistantMessage } from "@mariozechner/pi-ai"; +import { describe, expect, it, vi } from "vitest"; +import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; + +type StubSession = { + subscribe: (fn: (evt: unknown) => void) => () => void; +}; + +describe("subscribeEmbeddedPiSession reply tags", () => { + it("carries reply_to_current across tag-only block chunks", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + onBlockReply, + blockReplyBreak: "text_end", + blockReplyChunking: { + minChars: 1, + maxChars: 50, + breakPreference: "newline", + }, + }); + + handler?.({ type: "message_start", message: { role: "assistant" } }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: "[[reply_to_current]]\nHello", + }, + }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_end" }, + }); + + const assistantMessage = { + role: "assistant", + content: [{ type: "text", text: "[[reply_to_current]]\nHello" }], + } as AssistantMessage; + handler?.({ type: "message_end", message: assistantMessage }); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + const payload = onBlockReply.mock.calls[0]?.[0]; + expect(payload?.text).toBe("Hello"); + expect(payload?.replyToCurrent).toBe(true); + expect(payload?.replyToTag).toBe(true); + }); + + it("flushes trailing directive tails on stream end", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + onBlockReply, + blockReplyBreak: "text_end", + blockReplyChunking: { + minChars: 1, + maxChars: 50, + breakPreference: "newline", + }, + }); + + handler?.({ type: "message_start", message: { role: "assistant" } }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_delta", delta: "Hello [[" }, + }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_end" }, + }); + + const assistantMessage = { + role: "assistant", + content: [{ type: "text", text: "Hello [[" }], + } as AssistantMessage; + handler?.({ type: "message_end", message: assistantMessage }); + + expect(onBlockReply).toHaveBeenCalledTimes(2); + expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Hello"); + expect(onBlockReply.mock.calls[1]?.[0]?.text).toBe("[["); + }); +}); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index cb6d81be6..a4a4b906a 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -1,4 +1,5 @@ import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js"; +import { createStreamingDirectiveAccumulator } from "../auto-reply/reply/streaming-directives.js"; import { formatToolAggregate } from "../auto-reply/tool-meta.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import type { InlineCodeState } from "../markdown/code-spans.js"; @@ -75,11 +76,13 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar const messagingToolSentTargets = state.messagingToolSentTargets; const pendingMessagingTexts = state.pendingMessagingTexts; const pendingMessagingTargets = state.pendingMessagingTargets; + const replyDirectiveAccumulator = createStreamingDirectiveAccumulator(); const resetAssistantMessageState = (nextAssistantTextBaseline: number) => { state.deltaBuffer = ""; state.blockBuffer = ""; blockChunker?.reset(); + replyDirectiveAccumulator.reset(); state.blockState.thinking = false; state.blockState.final = false; state.blockState.inlineCode = createInlineCodeState(); @@ -374,7 +377,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar assistantTexts.push(chunk); rememberAssistantText(chunk); if (!params.onBlockReply) return; - const splitResult = parseReplyDirectives(chunk); + const splitResult = replyDirectiveAccumulator.consume(chunk); + if (!splitResult) return; const { text: cleanedText, mediaUrls, @@ -395,6 +399,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar }); }; + const consumeReplyDirectives = (text: string, options?: { final?: boolean }) => + replyDirectiveAccumulator.consume(text, options); + const flushBlockReplyBuffer = () => { if (!params.onBlockReply) return; if (blockChunker?.hasBuffered()) { @@ -447,6 +454,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar emitBlockChunk, flushBlockReplyBuffer, emitReasoningStream, + consumeReplyDirectives, resetAssistantMessageState, resetForCompactionRetry, finalizeAssistantTexts, diff --git a/src/auto-reply/reply/streaming-directives.test.ts b/src/auto-reply/reply/streaming-directives.test.ts new file mode 100644 index 000000000..02d32ded8 --- /dev/null +++ b/src/auto-reply/reply/streaming-directives.test.ts @@ -0,0 +1,37 @@ +import { describe, expect, it } from "vitest"; +import { createStreamingDirectiveAccumulator } from "./streaming-directives.js"; + +describe("createStreamingDirectiveAccumulator", () => { + it("stashes reply_to_current until a renderable chunk arrives", () => { + const accumulator = createStreamingDirectiveAccumulator(); + + expect(accumulator.consume("[[reply_to_current]]")).toBeNull(); + + const result = accumulator.consume("Hello"); + expect(result?.text).toBe("Hello"); + expect(result?.replyToCurrent).toBe(true); + expect(result?.replyToTag).toBe(true); + }); + + it("handles reply tags split across chunks", () => { + const accumulator = createStreamingDirectiveAccumulator(); + + expect(accumulator.consume("[[reply_to_")).toBeNull(); + + const result = accumulator.consume("current]] Yo"); + expect(result?.text).toBe("Yo"); + expect(result?.replyToCurrent).toBe(true); + expect(result?.replyToTag).toBe(true); + }); + + it("propagates explicit reply ids across chunks", () => { + const accumulator = createStreamingDirectiveAccumulator(); + + expect(accumulator.consume("[[reply_to: abc-123]]")).toBeNull(); + + const result = accumulator.consume("Hi"); + expect(result?.text).toBe("Hi"); + expect(result?.replyToId).toBe("abc-123"); + expect(result?.replyToTag).toBe(true); + }); +}); diff --git a/src/auto-reply/reply/streaming-directives.ts b/src/auto-reply/reply/streaming-directives.ts new file mode 100644 index 000000000..a79e640a1 --- /dev/null +++ b/src/auto-reply/reply/streaming-directives.ts @@ -0,0 +1,124 @@ +import { splitMediaFromOutput } from "../../media/parse.js"; +import { parseInlineDirectives } from "../../utils/directive-tags.js"; +import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; +import type { ReplyDirectiveParseResult } from "./reply-directives.js"; + +type PendingReplyState = { + explicitId?: string; + sawCurrent: boolean; + hasTag: boolean; +}; + +type ParsedChunk = ReplyDirectiveParseResult & { + replyToExplicitId?: string; +}; + +type ConsumeOptions = { + final?: boolean; + silentToken?: string; +}; + +const splitTrailingDirective = (text: string): { text: string; tail: string } => { + const openIndex = text.lastIndexOf("[["); + if (openIndex < 0) return { text, tail: "" }; + const closeIndex = text.indexOf("]]", openIndex + 2); + if (closeIndex >= 0) return { text, tail: "" }; + return { + text: text.slice(0, openIndex), + tail: text.slice(openIndex), + }; +}; + +const parseChunk = (raw: string, options?: { silentToken?: string }): ParsedChunk => { + const split = splitMediaFromOutput(raw); + let text = split.text ?? ""; + + const replyParsed = parseInlineDirectives(text, { + stripAudioTag: false, + stripReplyTags: true, + }); + + if (replyParsed.hasReplyTag) { + text = replyParsed.text; + } + + const silentToken = options?.silentToken ?? SILENT_REPLY_TOKEN; + const isSilent = isSilentReplyText(text, silentToken); + if (isSilent) { + text = ""; + } + + return { + text, + mediaUrls: split.mediaUrls, + mediaUrl: split.mediaUrl, + replyToId: replyParsed.replyToId, + replyToExplicitId: replyParsed.replyToExplicitId, + replyToCurrent: replyParsed.replyToCurrent, + replyToTag: replyParsed.hasReplyTag, + audioAsVoice: split.audioAsVoice, + isSilent, + }; +}; + +const hasRenderableContent = (parsed: ReplyDirectiveParseResult): boolean => + Boolean(parsed.text) || + Boolean(parsed.mediaUrl) || + (parsed.mediaUrls?.length ?? 0) > 0 || + Boolean(parsed.audioAsVoice); + +export function createStreamingDirectiveAccumulator() { + let pendingTail = ""; + let pendingReply: PendingReplyState = { sawCurrent: false, hasTag: false }; + + const reset = () => { + pendingTail = ""; + pendingReply = { sawCurrent: false, hasTag: false }; + }; + + const consume = (raw: string, options: ConsumeOptions = {}): ReplyDirectiveParseResult | null => { + let combined = `${pendingTail}${raw ?? ""}`; + pendingTail = ""; + + if (!options.final) { + const split = splitTrailingDirective(combined); + combined = split.text; + pendingTail = split.tail; + } + + if (!combined) { + return null; + } + + const parsed = parseChunk(combined, { silentToken: options.silentToken }); + const hasTag = pendingReply.hasTag || parsed.replyToTag; + const sawCurrent = pendingReply.sawCurrent || parsed.replyToCurrent; + const explicitId = parsed.replyToExplicitId ?? pendingReply.explicitId; + + const combinedResult: ReplyDirectiveParseResult = { + ...parsed, + replyToId: explicitId, + replyToCurrent: sawCurrent, + replyToTag: hasTag, + }; + + if (!hasRenderableContent(combinedResult)) { + if (hasTag) { + pendingReply = { + explicitId, + sawCurrent, + hasTag, + }; + } + return null; + } + + pendingReply = { sawCurrent: false, hasTag: false }; + return combinedResult; + }; + + return { + consume, + reset, + }; +} diff --git a/src/utils/directive-tags.ts b/src/utils/directive-tags.ts index 040e356bc..a58b143dc 100644 --- a/src/utils/directive-tags.ts +++ b/src/utils/directive-tags.ts @@ -2,6 +2,7 @@ export type InlineDirectiveParseResult = { text: string; audioAsVoice: boolean; replyToId?: string; + replyToExplicitId?: string; replyToCurrent: boolean; hasAudioTag: boolean; hasReplyTag: boolean; @@ -71,6 +72,7 @@ export function parseInlineDirectives( text: cleaned, audioAsVoice, replyToId, + replyToExplicitId: lastExplicitId, replyToCurrent: sawCurrent, hasAudioTag, hasReplyTag,