diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts index 26d0f2dce..89b330f29 100644 --- a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts +++ b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts @@ -145,6 +145,23 @@ describe("runReplyAgent typing (heartbeat)", () => { expect(typing.startTypingLoop).toHaveBeenCalled(); }); + it("signals typing even without consumer partial handler", async () => { + runEmbeddedPiAgentMock.mockImplementationOnce( + async (params: EmbeddedPiAgentParams) => { + await params.onPartialReply?.({ text: "hi" }); + return { payloads: [{ text: "final" }], meta: {} }; + }, + ); + + const { run, typing } = createMinimalRun({ + typingMode: "message", + }); + await run(); + + expect(typing.startTypingOnText).toHaveBeenCalledWith("hi"); + expect(typing.startTypingLoop).not.toHaveBeenCalled(); + }); + it("never signals typing for heartbeat runs", async () => { const onPartialReply = vi.fn(); runEmbeddedPiAgentMock.mockImplementationOnce( diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 660d80af5..070a1e7bf 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -547,6 +547,28 @@ export async function runReplyAgent(params: { const allowPartialStream = !( followupRun.run.reasoningLevel === "stream" && opts?.onReasoningStream ); + const handlePartialForTyping = async ( + payload: ReplyPayload, + ): Promise => { + if (!allowPartialStream) return undefined; + let text = payload.text; + if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { + const stripped = stripHeartbeatToken(text, { + mode: "message", + }); + if (stripped.didStrip && !didLogHeartbeatStrip) { + didLogHeartbeatStrip = true; + logVerbose("Stripped stray HEARTBEAT_OK token from reply"); + } + if (stripped.shouldSkip && (payload.mediaUrls?.length ?? 0) === 0) { + return undefined; + } + text = stripped.text; + } + if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) return undefined; + await typingSignals.signalTextDelta(text); + return text; + }; const fallbackResult = await runWithModelFallback({ cfg: followupRun.run.config, provider: followupRun.run.provider, @@ -641,31 +663,15 @@ export async function runReplyAgent(params: { blockReplyBreak: resolvedBlockStreamingBreak, blockReplyChunking, onPartialReply: - opts?.onPartialReply && allowPartialStream + allowPartialStream ? async (payload) => { - let text = payload.text; - if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { - const stripped = stripHeartbeatToken(text, { - mode: "message", - }); - if (stripped.didStrip && !didLogHeartbeatStrip) { - didLogHeartbeatStrip = true; - logVerbose( - "Stripped stray HEARTBEAT_OK token from reply", - ); - } - if ( - stripped.shouldSkip && - (payload.mediaUrls?.length ?? 0) === 0 - ) { - return; - } - text = stripped.text; - } - if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) return; - await typingSignals.signalTextDelta(text); - await opts.onPartialReply?.({ - text, + const textForTyping = await handlePartialForTyping( + payload, + ); + if (!opts?.onPartialReply || textForTyping === undefined) + return; + await opts.onPartialReply({ + text: textForTyping, mediaUrls: payload.mediaUrls, }); }