From 434c25331eaa0fb22763d84b0f5c11238d816d26 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 7 Jan 2026 22:18:11 +0000 Subject: [PATCH] refactor: centralize typing mode signals --- docs/concepts/group-messages.md | 1 + docs/concepts/typing-indicators.md | 7 +- docs/gateway/configuration.md | 1 + src/auto-reply/reply.ts | 14 ++-- .../agent-runner.heartbeat-typing.test.ts | 16 +--- src/auto-reply/reply/agent-runner.ts | 40 ++++------ .../reply/followup-runner.compaction.test.ts | 16 +--- src/auto-reply/reply/followup-runner.ts | 15 ++-- src/auto-reply/reply/test-helpers.ts | 15 ++++ src/auto-reply/reply/typing-mode.test.ts | 75 ++++++++++++++++++- src/auto-reply/reply/typing-mode.ts | 56 +++++++++++++- src/config/types.ts | 1 + src/config/zod-schema.ts | 8 ++ 13 files changed, 192 insertions(+), 73 deletions(-) create mode 100644 src/auto-reply/reply/test-helpers.ts diff --git a/docs/concepts/group-messages.md b/docs/concepts/group-messages.md index e403634d2..358a64c95 100644 --- a/docs/concepts/group-messages.md +++ b/docs/concepts/group-messages.md @@ -71,3 +71,4 @@ Only the owner number (from `whatsapp.allowFrom`, or the bot’s own E.164 when - Heartbeats are intentionally skipped for groups to avoid noisy broadcasts. - Echo suppression uses the combined batch string; if you send identical text twice without mentions, only the first will get a response. - Session store entries will appear as `agent::whatsapp:group:` in the session store (`~/.clawdbot/agents//sessions/sessions.json` by default); a missing entry just means the group hasn’t triggered a run yet. +- Typing indicators in groups follow `agent.typingMode` (default: `message` when unmentioned). diff --git a/docs/concepts/typing-indicators.md b/docs/concepts/typing-indicators.md index a389ba240..e3d92a46f 100644 --- a/docs/concepts/typing-indicators.md +++ b/docs/concepts/typing-indicators.md @@ -39,10 +39,11 @@ Order of “how early it fires”: } ``` -You can override the refresh cadence per session: +You can override mode or cadence per session: ```json5 { session: { + typingMode: "message", typingIntervalSeconds: 4 } } @@ -51,8 +52,8 @@ You can override the refresh cadence per session: ## Notes - `message` mode won’t show typing for silent-only replies (e.g. the `NO_REPLY` token used to suppress output). -- `thinking` only fires if the run streams reasoning; if the model doesn’t emit - reasoning deltas, typing won’t start. +- `thinking` only fires if the run streams reasoning (`reasoningLevel: "stream"`). + If the model doesn’t emit reasoning deltas, typing won’t start. - Heartbeats never show typing, regardless of mode. - `typingIntervalSeconds` controls the **refresh cadence**, not the start time. The default is 6 seconds. diff --git a/docs/gateway/configuration.md b/docs/gateway/configuration.md index b0a36b1e9..94305b55f 100644 --- a/docs/gateway/configuration.md +++ b/docs/gateway/configuration.md @@ -996,6 +996,7 @@ See [/concepts/streaming](/concepts/streaming) for behavior + chunking details. Typing indicators: - `agent.typingMode`: `"never" | "instant" | "thinking" | "message"`. Defaults to `instant` for direct chats / mentions and `message` for unmentioned group chats. +- `session.typingMode`: per-session override for the mode. - `agent.typingIntervalSeconds`: how often the typing signal is refreshed (default: 6s). - `session.typingIntervalSeconds`: per-session override for the refresh interval. See [/concepts/typing-indicators](/concepts/typing-indicators) for behavior details. diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 2b95c798d..e7f95bb0d 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -66,8 +66,8 @@ import { } from "./reply/session-updates.js"; import { createTypingController } from "./reply/typing.js"; import { + createTypingSignaler, resolveTypingMode, - shouldStartTypingImmediately, } from "./reply/typing-mode.js"; import type { MsgContext, TemplateContext } from "./templating.js"; import { @@ -599,12 +599,16 @@ export async function getReplyFromConfig( const wasMentioned = ctx.WasMentioned === true; const isHeartbeat = opts?.isHeartbeat === true; const typingMode = resolveTypingMode({ - configured: agentCfg?.typingMode, + configured: sessionCfg?.typingMode ?? agentCfg?.typingMode, isGroupChat, wasMentioned, isHeartbeat, }); - const shouldEagerType = shouldStartTypingImmediately(typingMode); + const typingSignals = createTypingSignaler({ + typing, + mode: typingMode, + isHeartbeat, + }); const shouldInjectGroupIntro = Boolean( isGroupChat && (isFirstTurnInSession || sessionEntry?.groupActivationNeedsSystemIntro), @@ -798,8 +802,8 @@ export async function getReplyFromConfig( }, }; - if (shouldEagerType) { - await typing.startTypingLoop(); + if (typingSignals.shouldStartImmediately) { + await typingSignals.signalRunStart(); } return runReplyAgent({ diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts index 16473ce8f..19e0f362f 100644 --- a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts +++ b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts @@ -9,7 +9,7 @@ import type { TypingMode } from "../../config/types.js"; import type { TemplateContext } from "../templating.js"; import type { GetReplyOptions } from "../types.js"; import type { FollowupRun, QueueSettings } from "./queue.js"; -import type { TypingController } from "./typing.js"; +import { createMockTypingController } from "./test-helpers.js"; const runEmbeddedPiAgentMock = vi.fn(); @@ -46,18 +46,6 @@ vi.mock("./queue.js", async () => { import { runReplyAgent } from "./agent-runner.js"; -function createTyping(): TypingController { - return { - onReplyStart: vi.fn(async () => {}), - startTypingLoop: vi.fn(async () => {}), - startTypingOnText: vi.fn(async () => {}), - refreshTypingTtl: vi.fn(), - markRunComplete: vi.fn(), - markDispatchIdle: vi.fn(), - cleanup: vi.fn(), - }; -} - type EmbeddedPiAgentParams = { onPartialReply?: (payload: { text?: string }) => Promise | void; }; @@ -71,7 +59,7 @@ function createMinimalRun(params?: { storePath?: string; typingMode?: TypingMode; }) { - const typing = createTyping(); + const typing = createMockTypingController(); const opts = params?.opts; const sessionCtx = { Provider: "whatsapp", diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 478be71a0..358c16b28 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -33,6 +33,7 @@ import { import { extractReplyToTag } from "./reply-tags.js"; import { incrementCompactionCount } from "./session-updates.js"; import type { TypingController } from "./typing.js"; +import { createTypingSignaler } from "./typing-mode.js"; const BUN_FETCH_SOCKET_ERROR_RE = /socket connection was closed unexpectedly/i; @@ -107,26 +108,11 @@ export async function runReplyAgent(params: { } = params; const isHeartbeat = opts?.isHeartbeat === true; - const shouldStartTypingOnText = - typingMode === "message" || typingMode === "instant"; - const shouldStartTypingOnReasoning = typingMode === "thinking"; - - const signalTypingFromText = async (text?: string) => { - if (isHeartbeat || typingMode === "never") return; - if (shouldStartTypingOnText) { - await typing.startTypingOnText(text); - return; - } - if (shouldStartTypingOnReasoning) { - typing.refreshTypingTtl(); - } - }; - - const signalTypingFromReasoning = async () => { - if (isHeartbeat || !shouldStartTypingOnReasoning) return; - await typing.startTypingLoop(); - typing.refreshTypingTtl(); - }; + const typingSignals = createTypingSignaler({ + typing, + mode: typingMode, + isHeartbeat, + }); const shouldEmitToolResult = () => { if (!sessionKey || !storePath) { @@ -276,7 +262,7 @@ export async function runReplyAgent(params: { } text = stripped.text; } - await signalTypingFromText(text); + await typingSignals.signalTextDelta(text); await opts.onPartialReply?.({ text, mediaUrls: payload.mediaUrls, @@ -284,9 +270,9 @@ export async function runReplyAgent(params: { } : undefined, onReasoningStream: - shouldStartTypingOnReasoning || opts?.onReasoningStream + typingSignals.shouldStartOnReasoning || opts?.onReasoningStream ? async (payload) => { - await signalTypingFromReasoning(); + await typingSignals.signalReasoningDelta(); await opts?.onReasoningStream?.({ text: payload.text, mediaUrls: payload.mediaUrls, @@ -344,7 +330,7 @@ export async function runReplyAgent(params: { } pendingStreamedPayloadKeys.add(payloadKey); const task = (async () => { - await signalTypingFromText(cleaned); + await typingSignals.signalTextDelta(cleaned); await opts.onBlockReply?.(blockPayload); })() .then(() => { @@ -389,7 +375,7 @@ export async function runReplyAgent(params: { } text = stripped.text; } - await signalTypingFromText(text); + await typingSignals.signalTextDelta(text); await opts.onToolResult?.({ text, mediaUrls: payload.mediaUrls, @@ -544,8 +530,8 @@ export async function runReplyAgent(params: { if (payload.mediaUrls && payload.mediaUrls.length > 0) return true; return false; }); - if (shouldSignalTyping && typingMode === "instant" && !isHeartbeat) { - await typing.startTypingLoop(); + if (shouldSignalTyping) { + await typingSignals.signalRunStart(); } if (sessionStore && sessionKey) { diff --git a/src/auto-reply/reply/followup-runner.compaction.test.ts b/src/auto-reply/reply/followup-runner.compaction.test.ts index 17e3ccd7a..1e4e6336b 100644 --- a/src/auto-reply/reply/followup-runner.compaction.test.ts +++ b/src/auto-reply/reply/followup-runner.compaction.test.ts @@ -5,7 +5,7 @@ import { describe, expect, it, vi } from "vitest"; import type { SessionEntry } from "../../config/sessions.js"; import type { FollowupRun } from "./queue.js"; -import type { TypingController } from "./typing.js"; +import { createMockTypingController } from "./test-helpers.js"; const runEmbeddedPiAgentMock = vi.fn(); @@ -31,18 +31,6 @@ vi.mock("../../agents/pi-embedded.js", () => ({ import { createFollowupRunner } from "./followup-runner.js"; -function createTyping(): TypingController { - return { - onReplyStart: vi.fn(async () => {}), - startTypingLoop: vi.fn(async () => {}), - startTypingOnText: vi.fn(async () => {}), - refreshTypingTtl: vi.fn(), - markRunComplete: vi.fn(), - markDispatchIdle: vi.fn(), - cleanup: vi.fn(), - }; -} - describe("createFollowupRunner compaction", () => { it("adds verbose auto-compaction notice and tracks count", async () => { const storePath = path.join( @@ -75,7 +63,7 @@ describe("createFollowupRunner compaction", () => { const runner = createFollowupRunner({ opts: { onBlockReply }, - typing: createTyping(), + typing: createMockTypingController(), typingMode: "instant", sessionEntry, sessionStore, diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index e628a806c..46fd90884 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -17,6 +17,7 @@ import { extractReplyToTag } from "./reply-tags.js"; import { isRoutableChannel, routeReply } from "./route-reply.js"; import { incrementCompactionCount } from "./session-updates.js"; import type { TypingController } from "./typing.js"; +import { createTypingSignaler } from "./typing-mode.js"; export function createFollowupRunner(params: { opts?: GetReplyOptions; @@ -40,6 +41,11 @@ export function createFollowupRunner(params: { defaultModel, agentCfgContextTokens, } = params; + const typingSignals = createTypingSignaler({ + typing, + mode: typingMode, + isHeartbeat: opts?.isHeartbeat === true, + }); /** * Sends followup payloads, routing to the originating channel if set. @@ -74,13 +80,7 @@ export function createFollowupRunner(params: { ) { continue; } - if (typingMode !== "never") { - if (typingMode === "message" || typingMode === "instant") { - await typing.startTypingOnText(payload.text); - } else if (typingMode === "thinking") { - typing.refreshTypingTtl(); - } - } + await typingSignals.signalTextDelta(payload.text); // Route to originating channel if set, otherwise fall back to dispatcher. if (shouldRouteToOriginating) { @@ -108,6 +108,7 @@ export function createFollowupRunner(params: { }; return async (queued: FollowupRun) => { + await typingSignals.signalRunStart(); try { const runId = crypto.randomUUID(); if (queued.run.sessionKey) { diff --git a/src/auto-reply/reply/test-helpers.ts b/src/auto-reply/reply/test-helpers.ts new file mode 100644 index 000000000..c9f49ac58 --- /dev/null +++ b/src/auto-reply/reply/test-helpers.ts @@ -0,0 +1,15 @@ +import { vi } from "vitest"; + +import type { TypingController } from "./typing.js"; + +export function createMockTypingController(): TypingController { + return { + onReplyStart: vi.fn(async () => {}), + startTypingLoop: vi.fn(async () => {}), + startTypingOnText: vi.fn(async () => {}), + refreshTypingTtl: vi.fn(), + markRunComplete: vi.fn(), + markDispatchIdle: vi.fn(), + cleanup: vi.fn(), + }; +} diff --git a/src/auto-reply/reply/typing-mode.test.ts b/src/auto-reply/reply/typing-mode.test.ts index 278f84cc9..9d3f06e1e 100644 --- a/src/auto-reply/reply/typing-mode.test.ts +++ b/src/auto-reply/reply/typing-mode.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it } from "vitest"; -import { resolveTypingMode } from "./typing-mode.js"; +import { createMockTypingController } from "./test-helpers.js"; +import { createTypingSignaler, resolveTypingMode } from "./typing-mode.js"; describe("resolveTypingMode", () => { it("defaults to instant for direct chats", () => { @@ -66,3 +67,75 @@ describe("resolveTypingMode", () => { ).toBe("never"); }); }); + +describe("createTypingSignaler", () => { + it("signals immediately for instant mode", async () => { + const typing = createMockTypingController(); + const signaler = createTypingSignaler({ + typing, + mode: "instant", + isHeartbeat: false, + }); + + await signaler.signalRunStart(); + + expect(typing.startTypingLoop).toHaveBeenCalled(); + }); + + it("signals on text for message mode", async () => { + const typing = createMockTypingController(); + const signaler = createTypingSignaler({ + typing, + mode: "message", + isHeartbeat: false, + }); + + await signaler.signalTextDelta("hello"); + + expect(typing.startTypingOnText).toHaveBeenCalledWith("hello"); + expect(typing.startTypingLoop).not.toHaveBeenCalled(); + }); + + it("signals on reasoning for thinking mode", async () => { + const typing = createMockTypingController(); + const signaler = createTypingSignaler({ + typing, + mode: "thinking", + isHeartbeat: false, + }); + + await signaler.signalReasoningDelta(); + + expect(typing.startTypingLoop).toHaveBeenCalled(); + }); + + it("refreshes ttl on text for thinking mode", async () => { + const typing = createMockTypingController(); + const signaler = createTypingSignaler({ + typing, + mode: "thinking", + isHeartbeat: false, + }); + + await signaler.signalTextDelta("hi"); + + expect(typing.refreshTypingTtl).toHaveBeenCalled(); + expect(typing.startTypingOnText).not.toHaveBeenCalled(); + }); + + it("suppresses typing when disabled", async () => { + const typing = createMockTypingController(); + const signaler = createTypingSignaler({ + typing, + mode: "instant", + isHeartbeat: true, + }); + + await signaler.signalRunStart(); + await signaler.signalTextDelta("hi"); + await signaler.signalReasoningDelta(); + + expect(typing.startTypingLoop).not.toHaveBeenCalled(); + expect(typing.startTypingOnText).not.toHaveBeenCalled(); + }); +}); diff --git a/src/auto-reply/reply/typing-mode.ts b/src/auto-reply/reply/typing-mode.ts index d9e150b51..e5ac1671e 100644 --- a/src/auto-reply/reply/typing-mode.ts +++ b/src/auto-reply/reply/typing-mode.ts @@ -1,4 +1,5 @@ import type { TypingMode } from "../../config/types.js"; +import type { TypingController } from "./typing.js"; export type TypingModeContext = { configured?: TypingMode; @@ -21,5 +22,56 @@ export function resolveTypingMode({ return DEFAULT_GROUP_TYPING_MODE; } -export const shouldStartTypingImmediately = (mode: TypingMode) => - mode === "instant"; +export type TypingSignaler = { + mode: TypingMode; + shouldStartImmediately: boolean; + shouldStartOnText: boolean; + shouldStartOnReasoning: boolean; + signalRunStart: () => Promise; + signalTextDelta: (text?: string) => Promise; + signalReasoningDelta: () => Promise; +}; + +export function createTypingSignaler(params: { + typing: TypingController; + mode: TypingMode; + isHeartbeat: boolean; +}): TypingSignaler { + const { typing, mode, isHeartbeat } = params; + const shouldStartImmediately = mode === "instant"; + const shouldStartOnText = mode === "message" || mode === "instant"; + const shouldStartOnReasoning = mode === "thinking"; + const disabled = isHeartbeat || mode === "never"; + + const signalRunStart = async () => { + if (disabled || !shouldStartImmediately) return; + await typing.startTypingLoop(); + }; + + const signalTextDelta = async (text?: string) => { + if (disabled) return; + if (shouldStartOnText) { + await typing.startTypingOnText(text); + return; + } + if (shouldStartOnReasoning) { + typing.refreshTypingTtl(); + } + }; + + const signalReasoningDelta = async () => { + if (disabled || !shouldStartOnReasoning) return; + await typing.startTypingLoop(); + typing.refreshTypingTtl(); + }; + + return { + mode, + shouldStartImmediately, + shouldStartOnText, + shouldStartOnReasoning, + signalRunStart, + signalTextDelta, + signalReasoningDelta, + }; +} diff --git a/src/config/types.ts b/src/config/types.ts index 5883cf9c7..e625a5914 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -38,6 +38,7 @@ export type SessionConfig = { heartbeatIdleMinutes?: number; store?: string; typingIntervalSeconds?: number; + typingMode?: TypingMode; mainKey?: string; sendPolicy?: SessionSendPolicyConfig; agentToAgent?: { diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index b38d9c23e..a9458dc9d 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -129,6 +129,14 @@ const SessionSchema = z heartbeatIdleMinutes: z.number().int().positive().optional(), store: z.string().optional(), typingIntervalSeconds: z.number().int().positive().optional(), + typingMode: z + .union([ + z.literal("never"), + z.literal("instant"), + z.literal("thinking"), + z.literal("message"), + ]) + .optional(), mainKey: z.string().optional(), sendPolicy: z .object({