diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index e7ee357e6..15ff46ab8 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -106,6 +106,7 @@ export async function runReplyAgent(params: { const streamedPayloadKeys = new Set(); const pendingStreamedPayloadKeys = new Set(); const pendingBlockTasks = new Set>(); + const pendingToolTasks = new Set>(); let didStreamBlockReply = false; const buildPayloadKey = (payload: ReplyPayload) => { const text = payload.text?.trim() ?? ""; @@ -312,33 +313,42 @@ export async function runReplyAgent(params: { : undefined, shouldEmitToolResult, onToolResult: opts?.onToolResult - ? async (payload) => { - let text = payload.text; - if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { - const stripped = stripHeartbeatToken(text, { - mode: "message", + ? (payload) => { + const task = (async () => { + let text = payload.text; + if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { + const stripped = stripHeartbeatToken(text, { + mode: "message", + }); + if (stripped.didStrip && !didLogHeartbeatStrip) { + didLogHeartbeatStrip = true; + logVerbose( + "Stripped stray HEARTBEAT_OK token from reply", + ); + } + if ( + stripped.shouldSkip && + (payload.mediaUrls?.length ?? 0) === 0 + ) { + return; + } + text = stripped.text; + } + if (!isHeartbeat) { + await typing.startTypingOnText(text); + } + await opts.onToolResult?.({ + text, + mediaUrls: payload.mediaUrls, }); - if (stripped.didStrip && !didLogHeartbeatStrip) { - didLogHeartbeatStrip = true; - logVerbose( - "Stripped stray HEARTBEAT_OK token from reply", - ); - } - if ( - stripped.shouldSkip && - (payload.mediaUrls?.length ?? 0) === 0 - ) { - return; - } - text = stripped.text; - } - if (!isHeartbeat) { - await typing.startTypingOnText(text); - } - await opts.onToolResult?.({ - text, - mediaUrls: payload.mediaUrls, - }); + })() + .catch((err) => { + logVerbose(`tool result delivery failed: ${String(err)}`); + }) + .finally(() => { + pendingToolTasks.delete(task); + }); + pendingToolTasks.add(task); } : undefined, }), @@ -378,6 +388,9 @@ export async function runReplyAgent(params: { if (pendingBlockTasks.size > 0) { await Promise.allSettled(pendingBlockTasks); } + if (pendingToolTasks.size > 0) { + await Promise.allSettled(pendingToolTasks); + } const sanitizedPayloads = isHeartbeat ? payloadArray