fix: prevent duplicate agent event emission

This commit is contained in:
Peter Steinberger
2026-01-20 09:24:07 +00:00
parent 9dbc1435a6
commit 94af5a72fc
7 changed files with 182 additions and 30 deletions

View File

@@ -24,6 +24,11 @@ export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) {
ctx.state.compactionInFlight = true;
ctx.ensureCompactionPromise();
ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`);
emitAgentEvent({
runId: ctx.params.runId,
stream: "compaction",
data: { phase: "start" },
});
void ctx.params.onAgentEvent?.({
stream: "compaction",
data: { phase: "start" },
@@ -43,6 +48,11 @@ export function handleAutoCompactionEnd(
} else {
ctx.maybeResolveCompactionWait();
}
emitAgentEvent({
runId: ctx.params.runId,
stream: "compaction",
data: { phase: "end", willRetry },
});
void ctx.params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry },

View File

@@ -109,34 +109,34 @@ export function handleMessageUpdate(
.trim();
if (next && next !== ctx.state.lastStreamedAssistant) {
const previousText = ctx.state.lastStreamedAssistant ?? "";
ctx.state.lastStreamedAssistant = next;
const { text: cleanedText, mediaUrls } = parseReplyDirectives(next);
const { text: previousCleanedText } = parseReplyDirectives(previousText);
const deltaText = cleanedText.startsWith(previousCleanedText)
? cleanedText.slice(previousCleanedText.length)
: cleanedText;
emitAgentEvent({
runId: ctx.params.runId,
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
void ctx.params.onAgentEvent?.({
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) {
void ctx.params.onPartialReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
if (cleanedText.startsWith(previousCleanedText)) {
const deltaText = cleanedText.slice(previousCleanedText.length);
ctx.state.lastStreamedAssistant = next;
emitAgentEvent({
runId: ctx.params.runId,
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
void ctx.params.onAgentEvent?.({
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) {
void ctx.params.onPartialReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
}
}
}

View File

@@ -184,4 +184,40 @@ describe("subscribeEmbeddedPiSession", () => {
expect(payloads[1]?.text).toBe("Hello world");
expect(payloads[1]?.delta).toBe(" world");
});
it("skips agent events when cleaned text rewinds mid-stream", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onAgentEvent = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run",
onAgentEvent,
});
handler?.({ type: "message_start", message: { role: "assistant" } });
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: "MEDIA:" },
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: " https://example.com/a.png\nCaption" },
});
const payloads = onAgentEvent.mock.calls
.map((call) => call[0]?.data as Record<string, unknown> | undefined)
.filter((value): value is Record<string, unknown> => Boolean(value));
expect(payloads).toHaveLength(1);
expect(payloads[0]?.text).toBe("MEDIA:");
});
});

View File

@@ -1,4 +1,5 @@
import { describe, expect, it, vi } from "vitest";
import { onAgentEvent } from "../infra/agent-events.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
type StubSession = {
@@ -54,6 +55,44 @@ describe("subscribeEmbeddedPiSession", () => {
await waitPromise;
expect(resolved).toBe(true);
});
it("emits compaction events on the agent event bus", async () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const events: Array<{ phase: string; willRetry?: boolean }> = [];
const stop = onAgentEvent((evt) => {
if (evt.runId !== "run-compaction") return;
if (evt.stream !== "compaction") return;
const phase = typeof evt.data?.phase === "string" ? evt.data.phase : "";
events.push({
phase,
willRetry: typeof evt.data?.willRetry === "boolean" ? evt.data.willRetry : undefined,
});
});
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run-compaction",
});
handler?.({ type: "auto_compaction_start" });
handler?.({ type: "auto_compaction_end", willRetry: true });
handler?.({ type: "auto_compaction_end", willRetry: false });
stop();
expect(events).toEqual([
{ phase: "start" },
{ phase: "end", willRetry: true },
{ phase: "end", willRetry: false },
]);
});
it("emits tool summaries at tool start when verbose is on", async () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -18,6 +18,7 @@ import { loadModelCatalog } from "../agents/model-catalog.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import type { ClawdbotConfig } from "../config/config.js";
import * as configModule from "../config/config.js";
import { emitAgentEvent, onAgentEvent } from "../infra/agent-events.js";
import type { RuntimeEnv } from "../runtime.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createPluginRuntime } from "../plugins/runtime/index.js";
@@ -139,6 +140,43 @@ describe("agentCommand", () => {
});
});
it("does not duplicate agent events from embedded runs", async () => {
await withTempHome(async (home) => {
const store = path.join(home, "sessions.json");
mockConfig(home, store);
const assistantEvents: Array<{ runId: string; text?: string }> = [];
const stop = onAgentEvent((evt) => {
if (evt.stream !== "assistant") return;
assistantEvents.push({
runId: evt.runId,
text: typeof evt.data?.text === "string" ? (evt.data.text as string) : undefined,
});
});
vi.mocked(runEmbeddedPiAgent).mockImplementationOnce(async (params) => {
const runId = (params as { runId?: string } | undefined)?.runId ?? "run";
const data = { text: "hello", delta: "hello" };
(
params as {
onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void;
}
).onAgentEvent?.({ stream: "assistant", data });
emitAgentEvent({ runId, stream: "assistant", data });
return {
payloads: [{ text: "hello" }],
meta: { agentMeta: { provider: "p", model: "m" } },
} as never;
});
await agentCommand({ message: "hi", to: "+1555" }, runtime);
stop();
const matching = assistantEvents.filter((evt) => evt.text === "hello");
expect(matching).toHaveLength(1);
});
});
it("uses provider/model from agents.defaults.model.primary", async () => {
await withTempHome(async (home) => {
const store = path.join(home, "sessions.json");

View File

@@ -434,6 +434,7 @@ export async function agentCommand(
streamParams: opts.streamParams,
agentDir,
onAgentEvent: (evt) => {
// Track lifecycle end for fallback emission below.
if (
evt.stream === "lifecycle" &&
typeof evt.data?.phase === "string" &&
@@ -441,11 +442,6 @@ export async function agentCommand(
) {
lifecycleEnded = true;
}
emitAgentEvent({
runId,
stream: evt.stream,
data: evt.data,
});
},
});
},

View File

@@ -462,6 +462,39 @@ describe("OpenAI-compatible HTTP API (e2e)", () => {
}
});
it("preserves repeated identical deltas when streaming SSE", async () => {
agentCommand.mockImplementationOnce(async (opts: unknown) => {
const runId = (opts as { runId?: string } | undefined)?.runId ?? "";
emitAgentEvent({ runId, stream: "assistant", data: { delta: "hi" } });
emitAgentEvent({ runId, stream: "assistant", data: { delta: "hi" } });
return { payloads: [{ text: "hihi" }] } as never;
});
const port = await getFreePort();
const server = await startServer(port);
try {
const res = await postChatCompletions(port, {
stream: true,
model: "clawdbot",
messages: [{ role: "user", content: "hi" }],
});
expect(res.status).toBe(200);
const text = await res.text();
const data = parseSseDataLines(text);
const jsonChunks = data
.filter((d) => d !== "[DONE]")
.map((d) => JSON.parse(d) as Record<string, unknown>);
const allContent = jsonChunks
.flatMap((c) => (c.choices as Array<Record<string, unknown>> | undefined) ?? [])
.map((choice) => (choice.delta as Record<string, unknown> | undefined)?.content)
.filter((v): v is string => typeof v === "string")
.join("");
expect(allContent).toBe("hihi");
} finally {
await server.close({ reason: "test done" });
}
});
it("streams SSE chunks when stream=true (fallback when no deltas)", async () => {
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "hello" }],