import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it, vi } from "vitest"; import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; type StubSession = { subscribe: (fn: (evt: unknown) => void) => () => void; }; type SessionEventHandler = (evt: unknown) => void; describe("subscribeEmbeddedPiSession", () => { const _THINKING_TAG_CASES = [ { tag: "think", open: "", close: "" }, { tag: "thinking", open: "", close: "" }, { tag: "thought", open: "", close: "" }, { tag: "antthinking", open: "", close: "" }, ] as const; it("does not emit duplicate block replies when text_end repeats", () => { let handler: SessionEventHandler | undefined; const session: StubSession = { subscribe: (fn) => { handler = fn; return () => {}; }, }; const onBlockReply = vi.fn(); const subscription = subscribeEmbeddedPiSession({ session: session as unknown as Parameters[0]["session"], runId: "run", onBlockReply, blockReplyBreak: "text_end", }); handler?.({ type: "message_update", message: { role: "assistant" }, assistantMessageEvent: { type: "text_delta", delta: "Hello block", }, }); handler?.({ type: "message_update", message: { role: "assistant" }, assistantMessageEvent: { type: "text_end", }, }); handler?.({ type: "message_update", message: { role: "assistant" }, assistantMessageEvent: { type: "text_end", }, }); expect(onBlockReply).toHaveBeenCalledTimes(1); expect(subscription.assistantTexts).toEqual(["Hello block"]); }); it("does not duplicate assistantTexts when message_end repeats", () => { let handler: SessionEventHandler | undefined; const session: StubSession = { subscribe: (fn) => { handler = fn; return () => {}; }, }; const subscription = subscribeEmbeddedPiSession({ session: session as unknown as Parameters[0]["session"], runId: "run", }); const assistantMessage = { role: "assistant", content: [{ type: "text", text: "Hello world" }], } as AssistantMessage; handler?.({ type: "message_end", message: assistantMessage }); handler?.({ type: "message_end", message: assistantMessage }); expect(subscription.assistantTexts).toEqual(["Hello world"]); }); it("does not duplicate assistantTexts when message_end repeats with reasoning blocks", () => { let handler: SessionEventHandler | undefined; const session: StubSession = { subscribe: (fn) => { handler = fn; return () => {}; }, }; const subscription = subscribeEmbeddedPiSession({ session: session as unknown as Parameters[0]["session"], runId: "run", reasoningMode: "on", }); const assistantMessage = { role: "assistant", content: [ { type: "thinking", thinking: "Because" }, { type: "text", text: "Hello world" }, ], } as AssistantMessage; handler?.({ type: "message_end", message: assistantMessage }); handler?.({ type: "message_end", message: assistantMessage }); expect(subscription.assistantTexts).toEqual(["Hello world"]); }); it("populates assistantTexts for non-streaming models with chunking enabled", () => { // Non-streaming models (e.g. zai/glm-4.7): no text_delta events; message_end // must still populate assistantTexts so providers can deliver a final reply. let handler: SessionEventHandler | undefined; const session: StubSession = { subscribe: (fn) => { handler = fn; return () => {}; }, }; const subscription = subscribeEmbeddedPiSession({ session: session as unknown as Parameters[0]["session"], runId: "run", blockReplyChunking: { minChars: 50, maxChars: 200 }, // Chunking enabled }); // Simulate non-streaming model: only message_start and message_end, no text_delta handler?.({ type: "message_start", message: { role: "assistant" } }); const assistantMessage = { role: "assistant", content: [{ type: "text", text: "Response from non-streaming model" }], } as AssistantMessage; handler?.({ type: "message_end", message: assistantMessage }); expect(subscription.assistantTexts).toEqual(["Response from non-streaming model"]); }); });