Previously, when block streaming was disabled (the default), text generated between tool calls would only appear after all tools completed. This was because onBlockReply wasn't passed to the subscription when block streaming was off, so flushBlockReplyBuffer() before tool execution did nothing. Now onBlockReply is always passed, and when block streaming is disabled, block replies are sent directly during tool flush. Directly sent payloads are tracked to avoid duplicates in final payloads. Also fixes a race condition where tool summaries could be emitted before the typing indicator started by awaiting onAgentEvent in tool handlers.
157 lines
4.5 KiB
TypeScript
157 lines
4.5 KiB
TypeScript
import type { AssistantMessage } from "@mariozechner/pi-ai";
|
|
import { describe, expect, it, vi } from "vitest";
|
|
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
|
|
|
|
type StubSession = {
|
|
subscribe: (fn: (evt: unknown) => void) => () => void;
|
|
};
|
|
|
|
describe("subscribeEmbeddedPiSession", () => {
|
|
const _THINKING_TAG_CASES = [
|
|
{ tag: "think", open: "<think>", close: "</think>" },
|
|
{ tag: "thinking", open: "<thinking>", close: "</thinking>" },
|
|
{ tag: "thought", open: "<thought>", close: "</thought>" },
|
|
{ tag: "antthinking", open: "<antthinking>", close: "</antthinking>" },
|
|
] as const;
|
|
|
|
it("suppresses message_end block replies when the message tool already sent", async () => {
|
|
let handler: ((evt: unknown) => void) | undefined;
|
|
const session: StubSession = {
|
|
subscribe: (fn) => {
|
|
handler = fn;
|
|
return () => {};
|
|
},
|
|
};
|
|
|
|
const onBlockReply = vi.fn();
|
|
|
|
subscribeEmbeddedPiSession({
|
|
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
|
|
runId: "run",
|
|
onBlockReply,
|
|
blockReplyBreak: "message_end",
|
|
});
|
|
|
|
const messageText = "This is the answer.";
|
|
|
|
handler?.({
|
|
type: "tool_execution_start",
|
|
toolName: "message",
|
|
toolCallId: "tool-message-1",
|
|
args: { action: "send", to: "+1555", message: messageText },
|
|
});
|
|
|
|
// Wait for async handler to complete
|
|
await Promise.resolve();
|
|
|
|
handler?.({
|
|
type: "tool_execution_end",
|
|
toolName: "message",
|
|
toolCallId: "tool-message-1",
|
|
isError: false,
|
|
result: "ok",
|
|
});
|
|
|
|
const assistantMessage = {
|
|
role: "assistant",
|
|
content: [{ type: "text", text: messageText }],
|
|
} as AssistantMessage;
|
|
|
|
handler?.({ type: "message_end", message: assistantMessage });
|
|
|
|
expect(onBlockReply).not.toHaveBeenCalled();
|
|
});
|
|
it("does not suppress message_end replies when message tool reports error", async () => {
|
|
let handler: ((evt: unknown) => void) | undefined;
|
|
const session: StubSession = {
|
|
subscribe: (fn) => {
|
|
handler = fn;
|
|
return () => {};
|
|
},
|
|
};
|
|
|
|
const onBlockReply = vi.fn();
|
|
|
|
subscribeEmbeddedPiSession({
|
|
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
|
|
runId: "run",
|
|
onBlockReply,
|
|
blockReplyBreak: "message_end",
|
|
});
|
|
|
|
const messageText = "Please retry the send.";
|
|
|
|
handler?.({
|
|
type: "tool_execution_start",
|
|
toolName: "message",
|
|
toolCallId: "tool-message-err",
|
|
args: { action: "send", to: "+1555", message: messageText },
|
|
});
|
|
|
|
// Wait for async handler to complete
|
|
await Promise.resolve();
|
|
|
|
handler?.({
|
|
type: "tool_execution_end",
|
|
toolName: "message",
|
|
toolCallId: "tool-message-err",
|
|
isError: false,
|
|
result: { details: { status: "error" } },
|
|
});
|
|
|
|
const assistantMessage = {
|
|
role: "assistant",
|
|
content: [{ type: "text", text: messageText }],
|
|
} as AssistantMessage;
|
|
|
|
handler?.({ type: "message_end", message: assistantMessage });
|
|
|
|
expect(onBlockReply).toHaveBeenCalledTimes(1);
|
|
});
|
|
it("clears block reply state on message_start", () => {
|
|
let handler: ((evt: unknown) => void) | undefined;
|
|
const session: StubSession = {
|
|
subscribe: (fn) => {
|
|
handler = fn;
|
|
return () => {};
|
|
},
|
|
};
|
|
|
|
const onBlockReply = vi.fn();
|
|
|
|
subscribeEmbeddedPiSession({
|
|
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
|
|
runId: "run",
|
|
onBlockReply,
|
|
blockReplyBreak: "text_end",
|
|
});
|
|
|
|
handler?.({ type: "message_start", message: { role: "assistant" } });
|
|
handler?.({
|
|
type: "message_update",
|
|
message: { role: "assistant" },
|
|
assistantMessageEvent: { type: "text_delta", delta: "OK" },
|
|
});
|
|
handler?.({
|
|
type: "message_update",
|
|
message: { role: "assistant" },
|
|
assistantMessageEvent: { type: "text_end" },
|
|
});
|
|
expect(onBlockReply).toHaveBeenCalledTimes(1);
|
|
|
|
// New assistant message with identical output should still emit.
|
|
handler?.({ type: "message_start", message: { role: "assistant" } });
|
|
handler?.({
|
|
type: "message_update",
|
|
message: { role: "assistant" },
|
|
assistantMessageEvent: { type: "text_delta", delta: "OK" },
|
|
});
|
|
handler?.({
|
|
type: "message_update",
|
|
message: { role: "assistant" },
|
|
assistantMessageEvent: { type: "text_end" },
|
|
});
|
|
expect(onBlockReply).toHaveBeenCalledTimes(2);
|
|
});
|
|
});
|