diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts index bd6a3b00d..63888d4d5 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts @@ -14,7 +14,7 @@ export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) { startedAt: Date.now(), }, }); - ctx.params.onAgentEvent?.({ + void ctx.params.onAgentEvent?.({ stream: "lifecycle", data: { phase: "start" }, }); @@ -24,7 +24,7 @@ export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) { ctx.state.compactionInFlight = true; ctx.ensureCompactionPromise(); ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`); - ctx.params.onAgentEvent?.({ + void ctx.params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" }, }); @@ -43,7 +43,7 @@ export function handleAutoCompactionEnd( } else { ctx.maybeResolveCompactionWait(); } - ctx.params.onAgentEvent?.({ + void ctx.params.onAgentEvent?.({ stream: "compaction", data: { phase: "end", willRetry }, }); @@ -59,7 +59,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) { endedAt: Date.now(), }, }); - ctx.params.onAgentEvent?.({ + void ctx.params.onAgentEvent?.({ stream: "lifecycle", data: { phase: "end" }, }); diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 9e17a1700..b9318e4ea 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -118,7 +118,7 @@ export function handleMessageUpdate( mediaUrls: mediaUrls?.length ? mediaUrls : undefined, }, }); - ctx.params.onAgentEvent?.({ + void ctx.params.onAgentEvent?.({ stream: "assistant", data: { text: cleanedText, diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.ts b/src/agents/pi-embedded-subscribe.handlers.tools.ts index 5476ba1ff..6abb4c05e 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.ts @@ -11,7 +11,7 @@ import { } from "./pi-embedded-subscribe.tools.js"; import { inferToolMetaFromArgs } from "./pi-embedded-utils.js"; -export function handleToolExecutionStart( +export async function handleToolExecutionStart( ctx: EmbeddedPiSubscribeContext, evt: AgentEvent & { toolName: string; toolCallId: string; args: unknown }, ) { @@ -53,7 +53,8 @@ export function handleToolExecutionStart( args: args as Record, }, }); - ctx.params.onAgentEvent?.({ + // Await onAgentEvent to ensure typing indicator starts before tool summaries are emitted. + await ctx.params.onAgentEvent?.({ stream: "tool", data: { phase: "start", name: toolName, toolCallId }, }); @@ -108,7 +109,7 @@ export function handleToolExecutionUpdate( partialResult: sanitized, }, }); - ctx.params.onAgentEvent?.({ + void ctx.params.onAgentEvent?.({ stream: "tool", data: { phase: "update", @@ -170,7 +171,7 @@ export function handleToolExecutionEnd( result: sanitizedResult, }, }); - ctx.params.onAgentEvent?.({ + void ctx.params.onAgentEvent?.({ stream: "tool", data: { phase: "result", diff --git a/src/agents/pi-embedded-subscribe.handlers.ts b/src/agents/pi-embedded-subscribe.handlers.ts index b3f755d70..92ed49111 100644 --- a/src/agents/pi-embedded-subscribe.handlers.ts +++ b/src/agents/pi-embedded-subscribe.handlers.ts @@ -32,7 +32,11 @@ export function createEmbeddedPiSessionEventHandler(ctx: EmbeddedPiSubscribeCont handleMessageEnd(ctx, evt as never); return; case "tool_execution_start": - handleToolExecutionStart(ctx, evt as never); + // Async handler - awaits typing indicator before emitting tool summaries. + // Catch rejections to avoid unhandled promise rejection crashes. + handleToolExecutionStart(ctx, evt as never).catch((err) => { + ctx.log.debug(`tool_execution_start handler failed: ${String(err)}`); + }); return; case "tool_execution_update": handleToolExecutionUpdate(ctx, evt as never); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.includes-canvas-action-metadata-tool-summaries.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.includes-canvas-action-metadata-tool-summaries.test.ts index 398c2829d..02fb7b940 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.includes-canvas-action-metadata-tool-summaries.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.includes-canvas-action-metadata-tool-summaries.test.ts @@ -13,7 +13,7 @@ describe("subscribeEmbeddedPiSession", () => { { tag: "antthinking", open: "", close: "" }, ] as const; - it("includes canvas action metadata in tool summaries", () => { + it("includes canvas action metadata in tool summaries", async () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { subscribe: (fn) => { @@ -38,6 +38,9 @@ describe("subscribeEmbeddedPiSession", () => { args: { action: "a2ui_push", jsonlPath: "/tmp/a2ui.jsonl" }, }); + // Wait for async handler to complete + await Promise.resolve(); + expect(onToolResult).toHaveBeenCalledTimes(1); const payload = onToolResult.mock.calls[0][0]; expect(payload.text).toContain("🖼️"); @@ -72,7 +75,7 @@ describe("subscribeEmbeddedPiSession", () => { expect(onToolResult).not.toHaveBeenCalled(); }); - it("emits tool summaries when shouldEmitToolResult overrides verbose", () => { + it("emits tool summaries when shouldEmitToolResult overrides verbose", async () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { subscribe: (fn) => { @@ -98,6 +101,9 @@ describe("subscribeEmbeddedPiSession", () => { args: { path: "/tmp/c.txt" }, }); + // Wait for async handler to complete + await Promise.resolve(); + expect(onToolResult).toHaveBeenCalledTimes(1); }); }); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.suppresses-message-end-block-replies-message-tool.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.suppresses-message-end-block-replies-message-tool.test.ts index 6e017899e..a28d55358 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.suppresses-message-end-block-replies-message-tool.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.suppresses-message-end-block-replies-message-tool.test.ts @@ -14,7 +14,7 @@ describe("subscribeEmbeddedPiSession", () => { { tag: "antthinking", open: "", close: "" }, ] as const; - it("suppresses message_end block replies when the message tool already sent", () => { + it("suppresses message_end block replies when the message tool already sent", async () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { subscribe: (fn) => { @@ -41,6 +41,9 @@ describe("subscribeEmbeddedPiSession", () => { args: { action: "send", to: "+1555", message: messageText }, }); + // Wait for async handler to complete + await Promise.resolve(); + handler?.({ type: "tool_execution_end", toolName: "message", @@ -58,7 +61,7 @@ describe("subscribeEmbeddedPiSession", () => { expect(onBlockReply).not.toHaveBeenCalled(); }); - it("does not suppress message_end replies when message tool reports error", () => { + it("does not suppress message_end replies when message tool reports error", async () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { subscribe: (fn) => { @@ -85,6 +88,9 @@ describe("subscribeEmbeddedPiSession", () => { args: { action: "send", to: "+1555", message: messageText }, }); + // Wait for async handler to complete + await Promise.resolve(); + handler?.({ type: "tool_execution_end", toolName: "message", diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.waits-multiple-compaction-retries-before-resolving.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.waits-multiple-compaction-retries-before-resolving.test.ts index 05622834c..571781eee 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.waits-multiple-compaction-retries-before-resolving.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.waits-multiple-compaction-retries-before-resolving.test.ts @@ -54,7 +54,7 @@ describe("subscribeEmbeddedPiSession", () => { await waitPromise; expect(resolved).toBe(true); }); - it("emits tool summaries at tool start when verbose is on", () => { + it("emits tool summaries at tool start when verbose is on", async () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { subscribe: (fn) => { @@ -79,6 +79,9 @@ describe("subscribeEmbeddedPiSession", () => { args: { path: "/tmp/a.txt" }, }); + // Wait for async handler to complete + await Promise.resolve(); + expect(onToolResult).toHaveBeenCalledTimes(1); const payload = onToolResult.mock.calls[0][0]; expect(payload.text).toContain("/tmp/a.txt"); @@ -93,7 +96,7 @@ describe("subscribeEmbeddedPiSession", () => { expect(onToolResult).toHaveBeenCalledTimes(1); }); - it("includes browser action metadata in tool summaries", () => { + it("includes browser action metadata in tool summaries", async () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { subscribe: (fn) => { @@ -118,6 +121,9 @@ describe("subscribeEmbeddedPiSession", () => { args: { action: "snapshot", targetUrl: "https://example.com" }, }); + // Wait for async handler to complete + await Promise.resolve(); + expect(onToolResult).toHaveBeenCalledTimes(1); const payload = onToolResult.mock.calls[0][0]; expect(payload.text).toContain("🌐"); diff --git a/src/agents/pi-embedded-subscribe.types.ts b/src/agents/pi-embedded-subscribe.types.ts index 7237d6ac4..5cb8b5144 100644 --- a/src/agents/pi-embedded-subscribe.types.ts +++ b/src/agents/pi-embedded-subscribe.types.ts @@ -22,7 +22,10 @@ export type SubscribeEmbeddedPiSessionParams = { blockReplyChunking?: BlockReplyChunking; onPartialReply?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise; onAssistantMessageStart?: () => void | Promise; - onAgentEvent?: (evt: { stream: string; data: Record }) => void; + onAgentEvent?: (evt: { + stream: string; + data: Record; + }) => void | Promise; enforceFinalTag?: boolean; }; diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 9c9f9c0a0..abecf58be 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -26,7 +26,7 @@ import type { VerboseLevel } from "../thinking.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { buildThreadingToolContext, resolveEnforceFinalTag } from "./agent-runner-utils.js"; -import type { BlockReplyPipeline } from "./block-reply-pipeline.js"; +import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js"; import type { FollowupRun } from "./queue.js"; import { parseReplyDirectives } from "./reply-directives.js"; import { applyReplyTagsToPayload, isRenderablePayload } from "./reply-payloads.js"; @@ -40,6 +40,8 @@ export type AgentRunLoopResult = fallbackModel?: string; didLogHeartbeatStrip: boolean; autoCompactionCompleted: boolean; + /** Payload keys sent directly (not via pipeline) during tool flush. */ + directlySentBlockKeys?: Set; } | { kind: "final"; payload: ReplyPayload }; @@ -70,6 +72,8 @@ export async function runAgentTurnWithFallback(params: { }): Promise { let didLogHeartbeatStrip = false; let autoCompactionCompleted = false; + // Track payloads sent directly (not via pipeline) during tool flush to avoid duplicates. + const directlySentBlockKeys = new Set(); const runId = crypto.randomUUID(); if (params.sessionKey) { @@ -244,12 +248,13 @@ export async function runAgentTurnWithFallback(params: { }); } : undefined, - onAgentEvent: (evt) => { - // Trigger typing when tools start executing + onAgentEvent: async (evt) => { + // Trigger typing when tools start executing. + // Must await to ensure typing indicator starts before tool summaries are emitted. if (evt.stream === "tool") { const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; if (phase === "start" || phase === "update") { - void params.typingSignals.signalToolStart(); + await params.typingSignals.signalToolStart(); } } // Track auto-compaction completion @@ -261,57 +266,67 @@ export async function runAgentTurnWithFallback(params: { } } }, - onBlockReply: - params.blockStreamingEnabled && params.opts?.onBlockReply - ? async (payload) => { - const { text, skip } = normalizeStreamingText(payload); - const hasPayloadMedia = (payload.mediaUrls?.length ?? 0) > 0; - if (skip && !hasPayloadMedia) return; - const taggedPayload = applyReplyTagsToPayload( - { - text, - mediaUrls: payload.mediaUrls, - mediaUrl: payload.mediaUrls?.[0], - }, - params.sessionCtx.MessageSid, - ); - // Let through payloads with audioAsVoice flag even if empty (need to track it) - if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) return; - const parsed = parseReplyDirectives(taggedPayload.text ?? "", { - currentMessageId: params.sessionCtx.MessageSid, - silentToken: SILENT_REPLY_TOKEN, - }); - const cleaned = parsed.text || undefined; - const hasRenderableMedia = - Boolean(taggedPayload.mediaUrl) || (taggedPayload.mediaUrls?.length ?? 0) > 0; - // Skip empty payloads unless they have audioAsVoice flag (need to track it) - if ( - !cleaned && - !hasRenderableMedia && - !payload.audioAsVoice && - !parsed.audioAsVoice - ) - return; - if (parsed.isSilent && !hasRenderableMedia) return; + // Always pass onBlockReply so flushBlockReplyBuffer works before tool execution, + // even when regular block streaming is disabled. The handler sends directly + // via opts.onBlockReply when the pipeline isn't available. + onBlockReply: params.opts?.onBlockReply + ? async (payload) => { + const { text, skip } = normalizeStreamingText(payload); + const hasPayloadMedia = (payload.mediaUrls?.length ?? 0) > 0; + if (skip && !hasPayloadMedia) return; + const taggedPayload = applyReplyTagsToPayload( + { + text, + mediaUrls: payload.mediaUrls, + mediaUrl: payload.mediaUrls?.[0], + }, + params.sessionCtx.MessageSid, + ); + // Let through payloads with audioAsVoice flag even if empty (need to track it) + if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) return; + const parsed = parseReplyDirectives(taggedPayload.text ?? "", { + currentMessageId: params.sessionCtx.MessageSid, + silentToken: SILENT_REPLY_TOKEN, + }); + const cleaned = parsed.text || undefined; + const hasRenderableMedia = + Boolean(taggedPayload.mediaUrl) || (taggedPayload.mediaUrls?.length ?? 0) > 0; + // Skip empty payloads unless they have audioAsVoice flag (need to track it) + if ( + !cleaned && + !hasRenderableMedia && + !payload.audioAsVoice && + !parsed.audioAsVoice + ) + return; + if (parsed.isSilent && !hasRenderableMedia) return; - const blockPayload: ReplyPayload = params.applyReplyToMode({ - ...taggedPayload, - text: cleaned, - audioAsVoice: Boolean(parsed.audioAsVoice || payload.audioAsVoice), - replyToId: taggedPayload.replyToId ?? parsed.replyToId, - replyToTag: taggedPayload.replyToTag || parsed.replyToTag, - replyToCurrent: taggedPayload.replyToCurrent || parsed.replyToCurrent, + const blockPayload: ReplyPayload = params.applyReplyToMode({ + ...taggedPayload, + text: cleaned, + audioAsVoice: Boolean(parsed.audioAsVoice || payload.audioAsVoice), + replyToId: taggedPayload.replyToId ?? parsed.replyToId, + replyToTag: taggedPayload.replyToTag || parsed.replyToTag, + replyToCurrent: taggedPayload.replyToCurrent || parsed.replyToCurrent, + }); + + void params.typingSignals + .signalTextDelta(cleaned ?? taggedPayload.text) + .catch((err) => { + logVerbose(`block reply typing signal failed: ${String(err)}`); }); - void params.typingSignals - .signalTextDelta(cleaned ?? taggedPayload.text) - .catch((err) => { - logVerbose(`block reply typing signal failed: ${String(err)}`); - }); - - params.blockReplyPipeline?.enqueue(blockPayload); + // Use pipeline if available (block streaming enabled), otherwise send directly + if (params.blockStreamingEnabled && params.blockReplyPipeline) { + params.blockReplyPipeline.enqueue(blockPayload); + } else { + // Send directly when flushing before tool execution (no streaming). + // Track sent key to avoid duplicate in final payloads. + directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload)); + await params.opts?.onBlockReply?.(blockPayload); } - : undefined, + } + : undefined, onBlockReplyFlush: params.blockStreamingEnabled && blockReplyPipeline ? async () => { @@ -447,5 +462,6 @@ export async function runAgentTurnWithFallback(params: { fallbackModel, didLogHeartbeatStrip, autoCompactionCompleted, + directlySentBlockKeys: directlySentBlockKeys.size > 0 ? directlySentBlockKeys : undefined, }; } diff --git a/src/auto-reply/reply/agent-runner-payloads.ts b/src/auto-reply/reply/agent-runner-payloads.ts index 582ed53f3..60d6fa763 100644 --- a/src/auto-reply/reply/agent-runner-payloads.ts +++ b/src/auto-reply/reply/agent-runner-payloads.ts @@ -5,7 +5,7 @@ import type { OriginatingChannelType } from "../templating.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { ReplyPayload } from "../types.js"; import { formatBunFetchSocketError, isBunFetchSocketError } from "./agent-runner-utils.js"; -import type { BlockReplyPipeline } from "./block-reply-pipeline.js"; +import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js"; import { parseReplyDirectives } from "./reply-directives.js"; import { applyReplyThreading, @@ -20,6 +20,8 @@ export function buildReplyPayloads(params: { didLogHeartbeatStrip: boolean; blockStreamingEnabled: boolean; blockReplyPipeline: BlockReplyPipeline | null; + /** Payload keys sent directly (not via pipeline) during tool flush. */ + directlySentBlockKeys?: Set; replyToMode: ReplyToMode; replyToChannel?: OriginatingChannelType; currentMessageId?: string; @@ -98,11 +100,16 @@ export function buildReplyPayloads(params: { payloads: replyTaggedPayloads, sentTexts: messagingToolSentTexts, }); + // Filter out payloads already sent via pipeline or directly during tool flush. const filteredPayloads = shouldDropFinalPayloads ? [] : params.blockStreamingEnabled ? dedupedPayloads.filter((payload) => !params.blockReplyPipeline?.hasSentPayload(payload)) - : dedupedPayloads; + : params.directlySentBlockKeys?.size + ? dedupedPayloads.filter( + (payload) => !params.directlySentBlockKeys!.has(createBlockReplyPayloadKey(payload)), + ) + : dedupedPayloads; const replyPayloads = suppressMessagingToolReplies ? [] : filteredPayloads; return { diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index e747a9b51..c23328f58 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -272,7 +272,7 @@ export async function runReplyAgent(params: { return finalizeWithFollowup(runOutcome.payload, queueKey, runFollowupTurn); } - const { runResult, fallbackProvider, fallbackModel } = runOutcome; + const { runResult, fallbackProvider, fallbackModel, directlySentBlockKeys } = runOutcome; let { didLogHeartbeatStrip, autoCompactionCompleted } = runOutcome; if ( @@ -314,6 +314,7 @@ export async function runReplyAgent(params: { didLogHeartbeatStrip, blockStreamingEnabled, blockReplyPipeline, + directlySentBlockKeys, replyToMode, replyToChannel, currentMessageId: sessionCtx.MessageSid,