diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts index 63888d4d5..1c8402465 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts @@ -24,6 +24,11 @@ export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) { ctx.state.compactionInFlight = true; ctx.ensureCompactionPromise(); ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`); + emitAgentEvent({ + runId: ctx.params.runId, + stream: "compaction", + data: { phase: "start" }, + }); void ctx.params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" }, @@ -43,6 +48,11 @@ export function handleAutoCompactionEnd( } else { ctx.maybeResolveCompactionWait(); } + emitAgentEvent({ + runId: ctx.params.runId, + stream: "compaction", + data: { phase: "end", willRetry }, + }); void ctx.params.onAgentEvent?.({ stream: "compaction", data: { phase: "end", willRetry }, diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 1ebee8eef..7e75f56f4 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -109,34 +109,34 @@ export function handleMessageUpdate( .trim(); if (next && next !== ctx.state.lastStreamedAssistant) { const previousText = ctx.state.lastStreamedAssistant ?? ""; - ctx.state.lastStreamedAssistant = next; const { text: cleanedText, mediaUrls } = parseReplyDirectives(next); const { text: previousCleanedText } = parseReplyDirectives(previousText); - const deltaText = cleanedText.startsWith(previousCleanedText) - ? cleanedText.slice(previousCleanedText.length) - : cleanedText; - emitAgentEvent({ - runId: ctx.params.runId, - stream: "assistant", - data: { - text: cleanedText, - delta: deltaText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }, - }); - void ctx.params.onAgentEvent?.({ - stream: "assistant", - data: { - text: cleanedText, - delta: deltaText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }, - }); - if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) { - void ctx.params.onPartialReply({ - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + if (cleanedText.startsWith(previousCleanedText)) { + const deltaText = cleanedText.slice(previousCleanedText.length); + ctx.state.lastStreamedAssistant = next; + emitAgentEvent({ + runId: ctx.params.runId, + stream: "assistant", + data: { + text: cleanedText, + delta: deltaText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }, }); + void ctx.params.onAgentEvent?.({ + stream: "assistant", + data: { + text: cleanedText, + delta: deltaText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }, + }); + if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) { + void ctx.params.onPartialReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }); + } } } diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts index 486cbfa65..c142d1544 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts @@ -184,4 +184,40 @@ describe("subscribeEmbeddedPiSession", () => { expect(payloads[1]?.text).toBe("Hello world"); expect(payloads[1]?.delta).toBe(" world"); }); + + it("skips agent events when cleaned text rewinds mid-stream", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onAgentEvent = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + onAgentEvent, + }); + + handler?.({ type: "message_start", message: { role: "assistant" } }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_delta", delta: "MEDIA:" }, + }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_delta", delta: " https://example.com/a.png\nCaption" }, + }); + + const payloads = onAgentEvent.mock.calls + .map((call) => call[0]?.data as Record | undefined) + .filter((value): value is Record => Boolean(value)); + expect(payloads).toHaveLength(1); + expect(payloads[0]?.text).toBe("MEDIA:"); + }); }); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.waits-multiple-compaction-retries-before-resolving.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.waits-multiple-compaction-retries-before-resolving.test.ts index 06b45160d..7483206b7 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.waits-multiple-compaction-retries-before-resolving.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.waits-multiple-compaction-retries-before-resolving.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it, vi } from "vitest"; +import { onAgentEvent } from "../infra/agent-events.js"; import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; type StubSession = { @@ -54,6 +55,44 @@ describe("subscribeEmbeddedPiSession", () => { await waitPromise; expect(resolved).toBe(true); }); + + it("emits compaction events on the agent event bus", async () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const events: Array<{ phase: string; willRetry?: boolean }> = []; + const stop = onAgentEvent((evt) => { + if (evt.runId !== "run-compaction") return; + if (evt.stream !== "compaction") return; + const phase = typeof evt.data?.phase === "string" ? evt.data.phase : ""; + events.push({ + phase, + willRetry: typeof evt.data?.willRetry === "boolean" ? evt.data.willRetry : undefined, + }); + }); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run-compaction", + }); + + handler?.({ type: "auto_compaction_start" }); + handler?.({ type: "auto_compaction_end", willRetry: true }); + handler?.({ type: "auto_compaction_end", willRetry: false }); + + stop(); + + expect(events).toEqual([ + { phase: "start" }, + { phase: "end", willRetry: true }, + { phase: "end", willRetry: false }, + ]); + }); it("emits tool summaries at tool start when verbose is on", async () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { diff --git a/src/commands/agent.test.ts b/src/commands/agent.test.ts index bb8c84e27..bddafbeca 100644 --- a/src/commands/agent.test.ts +++ b/src/commands/agent.test.ts @@ -18,6 +18,7 @@ import { loadModelCatalog } from "../agents/model-catalog.js"; import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; import type { ClawdbotConfig } from "../config/config.js"; import * as configModule from "../config/config.js"; +import { emitAgentEvent, onAgentEvent } from "../infra/agent-events.js"; import type { RuntimeEnv } from "../runtime.js"; import { setActivePluginRegistry } from "../plugins/runtime.js"; import { createPluginRuntime } from "../plugins/runtime/index.js"; @@ -139,6 +140,43 @@ describe("agentCommand", () => { }); }); + it("does not duplicate agent events from embedded runs", async () => { + await withTempHome(async (home) => { + const store = path.join(home, "sessions.json"); + mockConfig(home, store); + + const assistantEvents: Array<{ runId: string; text?: string }> = []; + const stop = onAgentEvent((evt) => { + if (evt.stream !== "assistant") return; + assistantEvents.push({ + runId: evt.runId, + text: typeof evt.data?.text === "string" ? (evt.data.text as string) : undefined, + }); + }); + + vi.mocked(runEmbeddedPiAgent).mockImplementationOnce(async (params) => { + const runId = (params as { runId?: string } | undefined)?.runId ?? "run"; + const data = { text: "hello", delta: "hello" }; + ( + params as { + onAgentEvent?: (evt: { stream: string; data: Record }) => void; + } + ).onAgentEvent?.({ stream: "assistant", data }); + emitAgentEvent({ runId, stream: "assistant", data }); + return { + payloads: [{ text: "hello" }], + meta: { agentMeta: { provider: "p", model: "m" } }, + } as never; + }); + + await agentCommand({ message: "hi", to: "+1555" }, runtime); + stop(); + + const matching = assistantEvents.filter((evt) => evt.text === "hello"); + expect(matching).toHaveLength(1); + }); + }); + it("uses provider/model from agents.defaults.model.primary", async () => { await withTempHome(async (home) => { const store = path.join(home, "sessions.json"); diff --git a/src/commands/agent.ts b/src/commands/agent.ts index 23a2060d0..cd2086eae 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -434,6 +434,7 @@ export async function agentCommand( streamParams: opts.streamParams, agentDir, onAgentEvent: (evt) => { + // Track lifecycle end for fallback emission below. if ( evt.stream === "lifecycle" && typeof evt.data?.phase === "string" && @@ -441,11 +442,6 @@ export async function agentCommand( ) { lifecycleEnded = true; } - emitAgentEvent({ - runId, - stream: evt.stream, - data: evt.data, - }); }, }); }, diff --git a/src/gateway/openai-http.e2e.test.ts b/src/gateway/openai-http.e2e.test.ts index c05ae5a87..be5a39b99 100644 --- a/src/gateway/openai-http.e2e.test.ts +++ b/src/gateway/openai-http.e2e.test.ts @@ -462,6 +462,39 @@ describe("OpenAI-compatible HTTP API (e2e)", () => { } }); + it("preserves repeated identical deltas when streaming SSE", async () => { + agentCommand.mockImplementationOnce(async (opts: unknown) => { + const runId = (opts as { runId?: string } | undefined)?.runId ?? ""; + emitAgentEvent({ runId, stream: "assistant", data: { delta: "hi" } }); + emitAgentEvent({ runId, stream: "assistant", data: { delta: "hi" } }); + return { payloads: [{ text: "hihi" }] } as never; + }); + + const port = await getFreePort(); + const server = await startServer(port); + try { + const res = await postChatCompletions(port, { + stream: true, + model: "clawdbot", + messages: [{ role: "user", content: "hi" }], + }); + expect(res.status).toBe(200); + const text = await res.text(); + const data = parseSseDataLines(text); + const jsonChunks = data + .filter((d) => d !== "[DONE]") + .map((d) => JSON.parse(d) as Record); + const allContent = jsonChunks + .flatMap((c) => (c.choices as Array> | undefined) ?? []) + .map((choice) => (choice.delta as Record | undefined)?.content) + .filter((v): v is string => typeof v === "string") + .join(""); + expect(allContent).toBe("hihi"); + } finally { + await server.close({ reason: "test done" }); + } + }); + it("streams SSE chunks when stream=true (fallback when no deltas)", async () => { agentCommand.mockResolvedValueOnce({ payloads: [{ text: "hello" }],