From d4c205f8e1faf2c7b32588ba178c0a8f501543de Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 13 Jan 2026 04:32:28 +0000 Subject: [PATCH] fix: start typing on message start --- src/agents/pi-embedded-runner.ts | 2 ++ src/agents/pi-embedded-subscribe.ts | 11 +++++++++-- .../reply/agent-runner.heartbeat-typing.test.ts | 15 +++++++++------ src/auto-reply/reply/agent-runner.ts | 3 +++ src/auto-reply/reply/typing-mode.test.ts | 14 ++++++++++++++ src/auto-reply/reply/typing-mode.ts | 10 ++++++++++ 6 files changed, 47 insertions(+), 8 deletions(-) diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index 521d22212..181be081c 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -1383,6 +1383,7 @@ export async function runEmbeddedPiAgent(params: { text?: string; mediaUrls?: string[]; }) => void | Promise; + onAssistantMessageStart?: () => void | Promise; onBlockReply?: (payload: { text?: string; mediaUrls?: string[]; @@ -1774,6 +1775,7 @@ export async function runEmbeddedPiAgent(params: { blockReplyBreak: params.blockReplyBreak, blockReplyChunking: params.blockReplyChunking, onPartialReply: params.onPartialReply, + onAssistantMessageStart: params.onAssistantMessageStart, onAgentEvent: params.onAgentEvent, enforceFinalTag: params.enforceFinalTag, }); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index a9b6a9284..755893585 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -146,7 +146,7 @@ function extractMessagingToolSend( : undefined; } -export function subscribeEmbeddedPiSession(params: { +export type SubscribeEmbeddedPiSessionParams = { session: AgentSession; runId: string; verboseLevel?: "off" | "on"; @@ -173,12 +173,17 @@ export function subscribeEmbeddedPiSession(params: { text?: string; mediaUrls?: string[]; }) => void | Promise; + onAssistantMessageStart?: () => void | Promise; onAgentEvent?: (evt: { stream: string; data: Record; }) => void; enforceFinalTag?: boolean; -}) { +}; + +export function subscribeEmbeddedPiSession( + params: SubscribeEmbeddedPiSessionParams, +) { const assistantTexts: string[] = []; const toolMetas: Array<{ toolName?: string; meta?: string }> = []; const toolMetaById = new Map(); @@ -492,6 +497,8 @@ export function subscribeEmbeddedPiSession(params: { // may deliver late text_end updates after message_end, which would // otherwise re-trigger block replies. resetAssistantMessageState(assistantTexts.length); + // Use assistant message_start as the earliest "writing" signal for typing. + void params.onAssistantMessageStart?.(); } } diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts index 5af29f6b4..89544d618 100644 --- a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts +++ b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts @@ -51,6 +51,7 @@ type EmbeddedPiAgentParams = { text?: string; mediaUrls?: string[]; }) => Promise | void; + onAssistantMessageStart?: () => Promise | void; onBlockReply?: (payload: { text?: string; mediaUrls?: string[]; @@ -212,19 +213,21 @@ describe("runReplyAgent typing (heartbeat)", () => { expect(typing.startTypingLoop).not.toHaveBeenCalled(); }); - it("starts typing only on deltas in message mode", async () => { - runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({ - payloads: [{ text: "final" }], - meta: {}, - })); + it("starts typing on assistant message start in message mode", async () => { + runEmbeddedPiAgentMock.mockImplementationOnce( + async (params: EmbeddedPiAgentParams) => { + await params.onAssistantMessageStart?.(); + return { payloads: [{ text: "final" }], meta: {} }; + }, + ); const { run, typing } = createMinimalRun({ typingMode: "message", }); await run(); + expect(typing.startTypingLoop).toHaveBeenCalled(); expect(typing.startTypingOnText).not.toHaveBeenCalled(); - expect(typing.startTypingLoop).not.toHaveBeenCalled(); }); it("starts typing from reasoning stream in thinking mode", async () => { diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index ac6f9790e..018adeeb0 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -689,6 +689,9 @@ export async function runReplyAgent(params: { }); } : undefined, + onAssistantMessageStart: async () => { + await typingSignals.signalMessageStart(); + }, onReasoningStream: typingSignals.shouldStartOnReasoning || opts?.onReasoningStream ? async (payload) => { diff --git a/src/auto-reply/reply/typing-mode.test.ts b/src/auto-reply/reply/typing-mode.test.ts index 3b18c47b5..013e781da 100644 --- a/src/auto-reply/reply/typing-mode.test.ts +++ b/src/auto-reply/reply/typing-mode.test.ts @@ -96,6 +96,20 @@ describe("createTypingSignaler", () => { expect(typing.startTypingLoop).not.toHaveBeenCalled(); }); + it("signals on message start for message mode", async () => { + const typing = createMockTypingController(); + const signaler = createTypingSignaler({ + typing, + mode: "message", + isHeartbeat: false, + }); + + await signaler.signalMessageStart(); + + expect(typing.startTypingLoop).toHaveBeenCalled(); + expect(typing.startTypingOnText).not.toHaveBeenCalled(); + }); + it("signals on reasoning for thinking mode", async () => { const typing = createMockTypingController(); const signaler = createTypingSignaler({ diff --git a/src/auto-reply/reply/typing-mode.ts b/src/auto-reply/reply/typing-mode.ts index 9e2a6b36b..9ba435044 100644 --- a/src/auto-reply/reply/typing-mode.ts +++ b/src/auto-reply/reply/typing-mode.ts @@ -25,9 +25,11 @@ export function resolveTypingMode({ export type TypingSignaler = { mode: TypingMode; shouldStartImmediately: boolean; + shouldStartOnMessageStart: boolean; shouldStartOnText: boolean; shouldStartOnReasoning: boolean; signalRunStart: () => Promise; + signalMessageStart: () => Promise; signalTextDelta: (text?: string) => Promise; signalReasoningDelta: () => Promise; signalToolStart: () => Promise; @@ -40,6 +42,7 @@ export function createTypingSignaler(params: { }): TypingSignaler { const { typing, mode, isHeartbeat } = params; const shouldStartImmediately = mode === "instant"; + const shouldStartOnMessageStart = mode === "message"; const shouldStartOnText = mode === "message" || mode === "instant"; const shouldStartOnReasoning = mode === "thinking"; const disabled = isHeartbeat || mode === "never"; @@ -49,6 +52,11 @@ export function createTypingSignaler(params: { await typing.startTypingLoop(); }; + const signalMessageStart = async () => { + if (disabled || !shouldStartOnMessageStart) return; + await typing.startTypingLoop(); + }; + const signalTextDelta = async (text?: string) => { if (disabled) return; if (shouldStartOnText) { @@ -80,9 +88,11 @@ export function createTypingSignaler(params: { return { mode, shouldStartImmediately, + shouldStartOnMessageStart, shouldStartOnText, shouldStartOnReasoning, signalRunStart, + signalMessageStart, signalTextDelta, signalReasoningDelta, signalToolStart,