fix: carry reply tags across streamed chunks

This commit is contained in:
Peter Steinberger
2026-01-22 08:01:21 +00:00
parent e0c19607b7
commit 388d302472
7 changed files with 323 additions and 14 deletions

View File

@@ -226,24 +226,27 @@ export function handleMessageEnd(
);
} else {
ctx.state.lastBlockReplyText = text;
const {
text: cleanedText,
mediaUrls,
audioAsVoice,
replyToId,
replyToTag,
replyToCurrent,
} = parseReplyDirectives(text);
// Emit if there's content OR audioAsVoice flag (to propagate the flag).
if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) {
void onBlockReply({
const splitResult = ctx.consumeReplyDirectives(text, { final: true });
if (splitResult) {
const {
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
mediaUrls,
audioAsVoice,
replyToId,
replyToTag,
replyToCurrent,
});
} = splitResult;
// Emit if there's content OR audioAsVoice flag (to propagate the flag).
if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) {
void onBlockReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
audioAsVoice,
replyToId,
replyToTag,
replyToCurrent,
});
}
}
}
}
@@ -254,6 +257,30 @@ export function handleMessageEnd(
ctx.emitReasoningStream(rawThinking);
}
if (ctx.state.blockReplyBreak === "text_end" && onBlockReply) {
const tailResult = ctx.consumeReplyDirectives("", { final: true });
if (tailResult) {
const {
text: cleanedText,
mediaUrls,
audioAsVoice,
replyToId,
replyToTag,
replyToCurrent,
} = tailResult;
if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) {
void onBlockReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
audioAsVoice,
replyToId,
replyToTag,
replyToCurrent,
});
}
}
}
ctx.state.deltaBuffer = "";
ctx.state.blockBuffer = "";
ctx.blockChunker?.reset();

View File

@@ -1,6 +1,7 @@
import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
import type { ReasoningLevel } from "../auto-reply/thinking.js";
import type { ReplyDirectiveParseResult } from "../auto-reply/reply/reply-directives.js";
import type { InlineCodeState } from "../markdown/code-spans.js";
import type { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
import type { MessagingToolSend } from "./pi-embedded-messaging.js";
@@ -77,6 +78,10 @@ export type EmbeddedPiSubscribeContext = {
emitBlockChunk: (text: string) => void;
flushBlockReplyBuffer: () => void;
emitReasoningStream: (text: string) => void;
consumeReplyDirectives: (
text: string,
options?: { final?: boolean },
) => ReplyDirectiveParseResult | null;
resetAssistantMessageState: (nextAssistantTextBaseline: number) => void;
resetForCompactionRetry: () => void;
finalizeAssistantTexts: (args: {

View File

@@ -0,0 +1,106 @@
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
type StubSession = {
subscribe: (fn: (evt: unknown) => void) => () => void;
};
describe("subscribeEmbeddedPiSession reply tags", () => {
it("carries reply_to_current across tag-only block chunks", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onBlockReply = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run",
onBlockReply,
blockReplyBreak: "text_end",
blockReplyChunking: {
minChars: 1,
maxChars: 50,
breakPreference: "newline",
},
});
handler?.({ type: "message_start", message: { role: "assistant" } });
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "[[reply_to_current]]\nHello",
},
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_end" },
});
const assistantMessage = {
role: "assistant",
content: [{ type: "text", text: "[[reply_to_current]]\nHello" }],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
expect(onBlockReply).toHaveBeenCalledTimes(1);
const payload = onBlockReply.mock.calls[0]?.[0];
expect(payload?.text).toBe("Hello");
expect(payload?.replyToCurrent).toBe(true);
expect(payload?.replyToTag).toBe(true);
});
it("flushes trailing directive tails on stream end", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onBlockReply = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run",
onBlockReply,
blockReplyBreak: "text_end",
blockReplyChunking: {
minChars: 1,
maxChars: 50,
breakPreference: "newline",
},
});
handler?.({ type: "message_start", message: { role: "assistant" } });
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: "Hello [[" },
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_end" },
});
const assistantMessage = {
role: "assistant",
content: [{ type: "text", text: "Hello [[" }],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
expect(onBlockReply).toHaveBeenCalledTimes(2);
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Hello");
expect(onBlockReply.mock.calls[1]?.[0]?.text).toBe("[[");
});
});

View File

@@ -1,4 +1,5 @@
import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js";
import { createStreamingDirectiveAccumulator } from "../auto-reply/reply/streaming-directives.js";
import { formatToolAggregate } from "../auto-reply/tool-meta.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import type { InlineCodeState } from "../markdown/code-spans.js";
@@ -75,11 +76,13 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
const messagingToolSentTargets = state.messagingToolSentTargets;
const pendingMessagingTexts = state.pendingMessagingTexts;
const pendingMessagingTargets = state.pendingMessagingTargets;
const replyDirectiveAccumulator = createStreamingDirectiveAccumulator();
const resetAssistantMessageState = (nextAssistantTextBaseline: number) => {
state.deltaBuffer = "";
state.blockBuffer = "";
blockChunker?.reset();
replyDirectiveAccumulator.reset();
state.blockState.thinking = false;
state.blockState.final = false;
state.blockState.inlineCode = createInlineCodeState();
@@ -374,7 +377,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
assistantTexts.push(chunk);
rememberAssistantText(chunk);
if (!params.onBlockReply) return;
const splitResult = parseReplyDirectives(chunk);
const splitResult = replyDirectiveAccumulator.consume(chunk);
if (!splitResult) return;
const {
text: cleanedText,
mediaUrls,
@@ -395,6 +399,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
});
};
const consumeReplyDirectives = (text: string, options?: { final?: boolean }) =>
replyDirectiveAccumulator.consume(text, options);
const flushBlockReplyBuffer = () => {
if (!params.onBlockReply) return;
if (blockChunker?.hasBuffered()) {
@@ -447,6 +454,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
emitBlockChunk,
flushBlockReplyBuffer,
emitReasoningStream,
consumeReplyDirectives,
resetAssistantMessageState,
resetForCompactionRetry,
finalizeAssistantTexts,

View File

@@ -0,0 +1,37 @@
import { describe, expect, it } from "vitest";
import { createStreamingDirectiveAccumulator } from "./streaming-directives.js";
describe("createStreamingDirectiveAccumulator", () => {
it("stashes reply_to_current until a renderable chunk arrives", () => {
const accumulator = createStreamingDirectiveAccumulator();
expect(accumulator.consume("[[reply_to_current]]")).toBeNull();
const result = accumulator.consume("Hello");
expect(result?.text).toBe("Hello");
expect(result?.replyToCurrent).toBe(true);
expect(result?.replyToTag).toBe(true);
});
it("handles reply tags split across chunks", () => {
const accumulator = createStreamingDirectiveAccumulator();
expect(accumulator.consume("[[reply_to_")).toBeNull();
const result = accumulator.consume("current]] Yo");
expect(result?.text).toBe("Yo");
expect(result?.replyToCurrent).toBe(true);
expect(result?.replyToTag).toBe(true);
});
it("propagates explicit reply ids across chunks", () => {
const accumulator = createStreamingDirectiveAccumulator();
expect(accumulator.consume("[[reply_to: abc-123]]")).toBeNull();
const result = accumulator.consume("Hi");
expect(result?.text).toBe("Hi");
expect(result?.replyToId).toBe("abc-123");
expect(result?.replyToTag).toBe(true);
});
});

View File

@@ -0,0 +1,124 @@
import { splitMediaFromOutput } from "../../media/parse.js";
import { parseInlineDirectives } from "../../utils/directive-tags.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
import type { ReplyDirectiveParseResult } from "./reply-directives.js";
type PendingReplyState = {
explicitId?: string;
sawCurrent: boolean;
hasTag: boolean;
};
type ParsedChunk = ReplyDirectiveParseResult & {
replyToExplicitId?: string;
};
type ConsumeOptions = {
final?: boolean;
silentToken?: string;
};
const splitTrailingDirective = (text: string): { text: string; tail: string } => {
const openIndex = text.lastIndexOf("[[");
if (openIndex < 0) return { text, tail: "" };
const closeIndex = text.indexOf("]]", openIndex + 2);
if (closeIndex >= 0) return { text, tail: "" };
return {
text: text.slice(0, openIndex),
tail: text.slice(openIndex),
};
};
const parseChunk = (raw: string, options?: { silentToken?: string }): ParsedChunk => {
const split = splitMediaFromOutput(raw);
let text = split.text ?? "";
const replyParsed = parseInlineDirectives(text, {
stripAudioTag: false,
stripReplyTags: true,
});
if (replyParsed.hasReplyTag) {
text = replyParsed.text;
}
const silentToken = options?.silentToken ?? SILENT_REPLY_TOKEN;
const isSilent = isSilentReplyText(text, silentToken);
if (isSilent) {
text = "";
}
return {
text,
mediaUrls: split.mediaUrls,
mediaUrl: split.mediaUrl,
replyToId: replyParsed.replyToId,
replyToExplicitId: replyParsed.replyToExplicitId,
replyToCurrent: replyParsed.replyToCurrent,
replyToTag: replyParsed.hasReplyTag,
audioAsVoice: split.audioAsVoice,
isSilent,
};
};
const hasRenderableContent = (parsed: ReplyDirectiveParseResult): boolean =>
Boolean(parsed.text) ||
Boolean(parsed.mediaUrl) ||
(parsed.mediaUrls?.length ?? 0) > 0 ||
Boolean(parsed.audioAsVoice);
export function createStreamingDirectiveAccumulator() {
let pendingTail = "";
let pendingReply: PendingReplyState = { sawCurrent: false, hasTag: false };
const reset = () => {
pendingTail = "";
pendingReply = { sawCurrent: false, hasTag: false };
};
const consume = (raw: string, options: ConsumeOptions = {}): ReplyDirectiveParseResult | null => {
let combined = `${pendingTail}${raw ?? ""}`;
pendingTail = "";
if (!options.final) {
const split = splitTrailingDirective(combined);
combined = split.text;
pendingTail = split.tail;
}
if (!combined) {
return null;
}
const parsed = parseChunk(combined, { silentToken: options.silentToken });
const hasTag = pendingReply.hasTag || parsed.replyToTag;
const sawCurrent = pendingReply.sawCurrent || parsed.replyToCurrent;
const explicitId = parsed.replyToExplicitId ?? pendingReply.explicitId;
const combinedResult: ReplyDirectiveParseResult = {
...parsed,
replyToId: explicitId,
replyToCurrent: sawCurrent,
replyToTag: hasTag,
};
if (!hasRenderableContent(combinedResult)) {
if (hasTag) {
pendingReply = {
explicitId,
sawCurrent,
hasTag,
};
}
return null;
}
pendingReply = { sawCurrent: false, hasTag: false };
return combinedResult;
};
return {
consume,
reset,
};
}

View File

@@ -2,6 +2,7 @@ export type InlineDirectiveParseResult = {
text: string;
audioAsVoice: boolean;
replyToId?: string;
replyToExplicitId?: string;
replyToCurrent: boolean;
hasAudioTag: boolean;
hasReplyTag: boolean;
@@ -71,6 +72,7 @@ export function parseInlineDirectives(
text: cleaned,
audioAsVoice,
replyToId,
replyToExplicitId: lastExplicitId,
replyToCurrent: sawCurrent,
hasAudioTag,
hasReplyTag,