fix(streaming): emit assistant deltas

Co-authored-by: Andrew Lauppe <andy@t5tele.com>
This commit is contained in:
Peter Steinberger
2026-01-18 06:24:48 +00:00
parent e39fd7dbb3
commit 0b00e591e1
2 changed files with 45 additions and 0 deletions

View File

@@ -108,13 +108,19 @@ export function handleMessageUpdate(
})
.trim();
if (next && next !== ctx.state.lastStreamedAssistant) {
const previousText = ctx.state.lastStreamedAssistant ?? "";
ctx.state.lastStreamedAssistant = next;
const { text: cleanedText, mediaUrls } = parseReplyDirectives(next);
const { text: previousCleanedText } = parseReplyDirectives(previousText);
const deltaText = cleanedText.startsWith(previousCleanedText)
? cleanedText.slice(previousCleanedText.length)
: cleanedText;
emitAgentEvent({
runId: ctx.params.runId,
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
@@ -122,6 +128,7 @@ export function handleMessageUpdate(
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});

View File

@@ -146,4 +146,42 @@ describe("subscribeEmbeddedPiSession", () => {
expect(combined).toBe("Final answer");
},
);
it("emits delta chunks in agent events for streaming assistant text", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onAgentEvent = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run",
onAgentEvent,
});
handler?.({ type: "message_start", message: { role: "assistant" } });
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: "Hello" },
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: " world" },
});
const payloads = onAgentEvent.mock.calls
.map((call) => call[0]?.data as Record<string, unknown> | undefined)
.filter((value): value is Record<string, unknown> => Boolean(value));
expect(payloads[0]?.text).toBe("Hello");
expect(payloads[0]?.delta).toBe("Hello");
expect(payloads[1]?.text).toBe("Hello world");
expect(payloads[1]?.delta).toBe(" world");
});
});