From 0d8e0ddc4fff27ac8a9042dc3002b14ea7eeb51f Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 26 Dec 2025 02:35:21 +0100 Subject: [PATCH] feat: unify gateway heartbeat --- CHANGELOG.md | 1 + docs/configuration.md | 6 +- docs/cron.md | 14 +- docs/heartbeat.md | 80 +-- docs/whatsapp.md | 9 +- src/agents/model-selection.test.ts | 7 +- src/agents/model-selection.ts | 2 +- src/auto-reply/heartbeat.test.ts | 55 ++ src/auto-reply/heartbeat.ts | 21 + src/config/config.ts | 16 + src/cron/service.test.ts | 42 +- src/cron/service.ts | 8 +- src/gateway/server.ts | 22 +- src/infra/heartbeat-runner.test.ts | 116 ++++ src/infra/heartbeat-runner.ts | 421 ++++++++++++++ .../heartbeat-wake.ts} | 21 +- src/provider-web.ts | 1 - src/web/auto-reply.test.ts | 538 ------------------ src/web/auto-reply.ts | 317 +---------- 19 files changed, 744 insertions(+), 953 deletions(-) create mode 100644 src/auto-reply/heartbeat.test.ts create mode 100644 src/auto-reply/heartbeat.ts create mode 100644 src/infra/heartbeat-runner.test.ts create mode 100644 src/infra/heartbeat-runner.ts rename src/{web/reply-heartbeat-wake.ts => infra/heartbeat-wake.ts} (76%) diff --git a/CHANGELOG.md b/CHANGELOG.md index fbb7d7674..90a60ddd8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ ### Breaking - Config refactor: `inbound.*` removed; use top-level `routing` (allowlists + group rules + transcription), `messages` (prefixes/timestamps), and `session` (scoping/store/mainKey). No legacy keys read. - Heartbeat config moved to `agent.heartbeat`: set `every: "30m"` (duration string) and optional `model`. `agent.heartbeatMinutes` is removed, and heartbeats are disabled unless `agent.heartbeat.every` is set. +- Heartbeats now run via the gateway runner (main session) and deliver to the last used channel by default. WhatsApp reply-heartbeat behavior is removed; use `agent.heartbeat.target`/`to` (or `target: "none"`) to control delivery. ### Fixes - Heartbeat replies now strip repeated `HEARTBEAT_OK` tails to avoid accidental “OK OK” spam. diff --git a/docs/configuration.md b/docs/configuration.md index 10f9482dc..e25184ff2 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -130,7 +130,8 @@ Controls the embedded agent runtime (model/thinking/verbose/timeouts). timeoutSeconds: 600, mediaMaxMb: 5, heartbeat: { - every: "30m" + every: "30m", + target: "last" }, maxConcurrent: 3, bash: { @@ -151,6 +152,9 @@ deprecation fallback. - `every`: duration string (`ms`, `s`, `m`, `h`); default unit minutes. Omit or set `0m` to disable. - `model`: optional override model for heartbeat runs (`provider/model`). +- `target`: delivery channel (`last`, `whatsapp`, `telegram`, `none`). Default: `last`. +- `to`: optional recipient override (E.164 for WhatsApp, chat id for Telegram). +- `prompt`: override the default heartbeat body (`HEARTBEAT`). `agent.bash` configures background bash defaults: - `backgroundMs`: time before auto-background (ms, default 20000) diff --git a/docs/cron.md b/docs/cron.md index 29309ab31..f17b0a023 100644 --- a/docs/cron.md +++ b/docs/cron.md @@ -14,7 +14,7 @@ Last updated: 2025-12-13 ## Context Clawdis already has: -- A **periodic reply heartbeat** that runs the agent with `HEARTBEAT` and suppresses `HEARTBEAT_OK` (`src/web/auto-reply.ts`). +- A **gateway heartbeat runner** that runs the agent with `HEARTBEAT` and suppresses `HEARTBEAT_OK` (`src/infra/heartbeat-runner.ts`). - A lightweight, in-memory **system event queue** (`enqueueSystemEvent`) that is injected into the next **main session** turn (`drainSystemEvents` in `src/auto-reply/reply.ts`). - A WebSocket **Gateway** daemon that is intended to be always-on (`docs/gateway.md`). @@ -197,12 +197,12 @@ This yields: We need a way for the Gateway (or the scheduler) to request an immediate heartbeat without duplicating heartbeat logic. Design: -- `monitorWebProvider` owns the real `runReplyHeartbeat()` function (it already has all the local state needed). -- Add a small global hook module: - - `setReplyHeartbeatWakeHandler(fn | null)` installed by `monitorWebProvider` - - `requestReplyHeartbeatNow({ reason, coalesceMs? })` -- If the handler is absent (provider not connected), the request is stored as “pending”; the next time the handler is installed, it runs once. -- Coalesce rapid calls and respect the existing “skip when queue busy” behavior (prefer retrying soon vs dropping). +- `startHeartbeatRunner` owns the real heartbeat execution and installs a wake handler. +- Wake hook lives in `src/infra/heartbeat-wake.ts`: + - `setHeartbeatWakeHandler(fn | null)` installed by the heartbeat runner + - `requestHeartbeatNow({ reason, coalesceMs? })` +- If the handler is absent, the request is stored as “pending”; the next time the handler is installed, it runs once. +- Coalesce rapid calls and respect the “skip when queue busy” behavior (retry soon vs dropping). ## Run history log (JSONL) diff --git a/docs/heartbeat.md b/docs/heartbeat.md index 779f95bf6..31090ebde 100644 --- a/docs/heartbeat.md +++ b/docs/heartbeat.md @@ -3,49 +3,53 @@ summary: "Plan for heartbeat polling messages and notification rules" read_when: - Adjusting heartbeat cadence or messaging --- -# Heartbeat polling plan (2025-11-26) +# Heartbeat (Gateway) -Goal: add a simple heartbeat poll for the embedded agent that only notifies users when something matters, using the `HEARTBEAT_OK` sentinel. The heartbeat body we send is `HEARTBEAT` so the model can easily spot it. +Heartbeat runs periodic agent turns in the **main session** so the model can +surface anything that needs attention without spamming the user. ## Prompt contract -- Extend the agent system prompt to explain: “If this is a heartbeat poll and nothing needs attention, reply exactly `HEARTBEAT_OK` and nothing else. For any alert, do **not** include `HEARTBEAT_OK`; just return the alert text.” Heartbeat prompt body is `HEARTBEAT`. -- Keep existing WhatsApp length guidance; forbid burying the sentinel inside alerts. +- Heartbeat body defaults to `HEARTBEAT` (configurable via `agent.heartbeat.prompt`). +- If nothing needs attention, the model must reply **exactly** `HEARTBEAT_OK`. +- For alerts, do **not** include `HEARTBEAT_OK`; return only the alert text. -## Config & defaults -- New config key: `agent.heartbeat` with: - - `every`: duration string (`ms`, `s`, `m`, `h`; default unit minutes). `0m` disables. - - `model`: optional override model (`provider/model`) for heartbeat runs. -- Default: disabled unless `agent.heartbeat.every` is set. -- New optional idle override for heartbeats: `session.heartbeatIdleMinutes` (defaults to `idleMinutes`). Heartbeat skips do **not** update the session `updatedAt` so idle expiry still works. +## Config -## Poller behavior -- When gateway runs with command-mode auto-reply, start a timer with the resolved heartbeat interval. -- Each tick invokes the configured command with a short heartbeat body (e.g., “(heartbeat) summarize any important changes since last turn”) while reusing the active session args so Pi context stays warm. -- Heartbeats never create a new session implicitly: if there’s no stored session for the target (fallback path), the heartbeat is skipped instead of starting a fresh Pi session. -- Abort timer on SIGINT/abort of the gateway. +```json5 +{ + agent: { + heartbeat: { + every: "30m", // duration string: ms|s|m|h (0m disables) + model: "anthropic/claude-opus-4-5", + target: "last", // last | whatsapp | telegram | none + to: "+15551234567", // optional override for whatsapp/telegram + prompt: "HEARTBEAT" // optional override + } + } +} +``` -## Sentinel handling -- Trim output. If the trimmed text equals `HEARTBEAT_OK` (case-sensitive) -> skip outbound message. -- Otherwise, send the text/media as normal, stripping the sentinel if it somehow appears. -- Treat empty output as `HEARTBEAT_OK` to avoid spurious pings. +### Fields +- `every`: heartbeat interval (duration string; default unit minutes). Omit or set + to `0m` to disable. +- `model`: optional model override for heartbeat runs (`provider/model`). +- `target`: where heartbeat output is delivered. + - `last` (default): send to the last used external channel. + - `whatsapp` / `telegram`: force the channel (optionally set `to`). + - `none`: do not deliver externally; output stays in the session (WebChat-visible). +- `to`: optional recipient override (E.164 for WhatsApp, chat id for Telegram). +- `prompt`: override the default heartbeat body. -## Logging requirements -- Normal mode: single info line per tick, e.g., `heartbeat: ok (skipped)` or `heartbeat: alert sent (32ms)`. -- `--verbose`: log start/end, command argv, duration, and whether it was skipped/sent/error; include session ID and connection/run IDs via `getChildLogger` for correlation. -- On command failure: warn-level one-liner in normal mode; verbose log includes stdout/stderr snippets. +## Behavior +- Runs in the main session (`session.mainKey`, or `global` when scope is global). +- Uses the main lane queue; if requests are in flight, the wake is retried. +- Empty output or `HEARTBEAT_OK` is treated as “ok” and does **not** keep the + session alive (`updatedAt` is restored). +- If `target` resolves to no external destination (no last route or `none`), the + heartbeat still runs but no outbound message is sent. -## Failure/backoff -- If a heartbeat command errors, log it and retry on the next scheduled tick (no exponential backoff unless command repeatedly fails; keep it simple for now). - -## Tests to add -- Unit: sentinel detection (`HEARTBEAT_OK`, empty output, mixed text), skip vs send decision, default interval resolver (30m, override, disable). -- Unit/integration: verbose logger emits start/end lines; normal logger emits a single line. - -## Documentation -- Add a short README snippet under configuration showing `agent.heartbeat` and the sentinel rule. -- Expose CLI triggers: - - `clawdis heartbeat` (web provider, defaults to first `routing.allowFrom`; optional `--to` override) - - `--session-id ` forces resuming a specific session for that heartbeat - - `clawdis gateway --heartbeat-now` to run the gateway loop with an immediate heartbeat - - Gateway supports `--heartbeat-now` to fire once at startup. - - When multiple sessions are active or `routing.allowFrom` is only `"*"`, require `--to ` or `--all` for manual heartbeats to avoid ambiguous targets. +## Wake hook +- The gateway exposes a heartbeat wake hook so cron/jobs/webhooks can request an + immediate run (`requestHeartbeatNow`). +- `wake` endpoints should enqueue system events and optionally trigger a wake; the + heartbeat runner picks those up on the next tick or immediately. diff --git a/docs/whatsapp.md b/docs/whatsapp.md index b34846e7d..e7a3c7cb8 100644 --- a/docs/whatsapp.md +++ b/docs/whatsapp.md @@ -86,10 +86,9 @@ Status: WhatsApp Web via Baileys only. Gateway owns the single session. ## Heartbeats - **Gateway heartbeat** logs connection health (`web.heartbeatSeconds`, default 60s). -- **Reply heartbeat** asks agent on a timer (`agent.heartbeat.every`). - - Uses `HEARTBEAT` prompt + `HEARTBEAT_TOKEN` skip behavior. - - Skips if queue busy or last inbound was a group. - - Falls back to last direct recipient if needed. +- **Agent heartbeat** is global (`agent.heartbeat.*`) and runs in the main session. + - Uses `HEARTBEAT` prompt + `HEARTBEAT_OK` skip behavior. + - Delivery defaults to the last used channel (or configured target). ## Reconnect behavior - Backoff policy: `web.reconnect`: @@ -106,6 +105,8 @@ Status: WhatsApp Web via Baileys only. Gateway owns the single session. - `agent.mediaMaxMb` - `agent.heartbeat.every` - `agent.heartbeat.model` (optional override) +- `agent.heartbeat.target` +- `agent.heartbeat.to` - `session.*` (scope, idle, store, mainKey) - `web.heartbeatSeconds` - `web.reconnect.*` diff --git a/src/agents/model-selection.test.ts b/src/agents/model-selection.test.ts index 385a7de65..021aa6ff1 100644 --- a/src/agents/model-selection.test.ts +++ b/src/agents/model-selection.test.ts @@ -19,7 +19,7 @@ describe("resolveConfiguredModelRef", () => { expect(resolved).toEqual({ provider: "openai", model: "gpt-4.1-mini" }); }); - it("falls back to default provider when agent.model omits it", () => { + it("falls back to anthropic when agent.model omits provider", () => { const cfg = { agent: { model: "claude-opus-4-5" }, } satisfies ClawdisConfig; @@ -30,10 +30,7 @@ describe("resolveConfiguredModelRef", () => { defaultModel: DEFAULT_MODEL, }); - expect(resolved).toEqual({ - provider: DEFAULT_PROVIDER, - model: "claude-opus-4-5", - }); + expect(resolved).toEqual({ provider: "anthropic", model: "claude-opus-4-5" }); }); it("falls back to defaults when agent.model is missing", () => { diff --git a/src/agents/model-selection.ts b/src/agents/model-selection.ts index 92f2392b9..bd66c6656 100644 --- a/src/agents/model-selection.ts +++ b/src/agents/model-selection.ts @@ -39,7 +39,7 @@ export function resolveConfiguredModelRef(params: { if (parsed) return parsed; } // TODO(steipete): drop this fallback once provider-less agent.model is fully deprecated. - return { provider: params.defaultProvider, model: trimmed }; + return { provider: "anthropic", model: trimmed }; } return { provider: params.defaultProvider, model: params.defaultModel }; } diff --git a/src/auto-reply/heartbeat.test.ts b/src/auto-reply/heartbeat.test.ts new file mode 100644 index 000000000..b95a98fff --- /dev/null +++ b/src/auto-reply/heartbeat.test.ts @@ -0,0 +1,55 @@ +import { describe, expect, it } from "vitest"; + +import { stripHeartbeatToken } from "./heartbeat.js"; +import { HEARTBEAT_TOKEN } from "./tokens.js"; + +describe("stripHeartbeatToken", () => { + it("skips empty or token-only replies", () => { + expect(stripHeartbeatToken(undefined)).toEqual({ + shouldSkip: true, + text: "", + }); + expect(stripHeartbeatToken(" ")).toEqual({ + shouldSkip: true, + text: "", + }); + expect(stripHeartbeatToken(HEARTBEAT_TOKEN)).toEqual({ + shouldSkip: true, + text: "", + }); + }); + + it("keeps content and removes token when mixed", () => { + expect(stripHeartbeatToken(`ALERT ${HEARTBEAT_TOKEN}`)).toEqual({ + shouldSkip: false, + text: "ALERT", + }); + expect(stripHeartbeatToken("hello")).toEqual({ + shouldSkip: false, + text: "hello", + }); + }); + + it("strips repeated OK tails after heartbeat token", () => { + expect(stripHeartbeatToken("HEARTBEAT_OK_OK_OK")).toEqual({ + shouldSkip: true, + text: "", + }); + expect(stripHeartbeatToken("HEARTBEAT_OK_OK")).toEqual({ + shouldSkip: true, + text: "", + }); + expect(stripHeartbeatToken("HEARTBEAT_OK _OK")).toEqual({ + shouldSkip: true, + text: "", + }); + expect(stripHeartbeatToken("HEARTBEAT_OK OK")).toEqual({ + shouldSkip: true, + text: "", + }); + expect(stripHeartbeatToken("ALERT HEARTBEAT_OK_OK")).toEqual({ + shouldSkip: false, + text: "ALERT", + }); + }); +}); diff --git a/src/auto-reply/heartbeat.ts b/src/auto-reply/heartbeat.ts new file mode 100644 index 000000000..262275be8 --- /dev/null +++ b/src/auto-reply/heartbeat.ts @@ -0,0 +1,21 @@ +import { HEARTBEAT_TOKEN } from "./tokens.js"; + +export const HEARTBEAT_PROMPT = "HEARTBEAT"; + +export function stripHeartbeatToken(raw?: string) { + if (!raw) return { shouldSkip: true, text: "" }; + const trimmed = raw.trim(); + if (!trimmed) return { shouldSkip: true, text: "" }; + if (trimmed === HEARTBEAT_TOKEN) return { shouldSkip: true, text: "" }; + const hadToken = trimmed.includes(HEARTBEAT_TOKEN); + let withoutToken = trimmed.replaceAll(HEARTBEAT_TOKEN, "").trim(); + if (hadToken && withoutToken) { + // LLMs sometimes echo malformed HEARTBEAT_OK_OK... tails; strip trailing OK runs to avoid spam. + withoutToken = withoutToken.replace(/[\s_]*OK(?:[\s_]*OK)*$/gi, "").trim(); + } + const shouldSkip = withoutToken.length === 0; + return { + shouldSkip, + text: shouldSkip ? "" : withoutToken || trimmed, + }; +} diff --git a/src/config/config.ts b/src/config/config.ts index 0436190a8..b305d00ac 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -329,6 +329,12 @@ export type ClawdisConfig = { every?: string; /** Heartbeat model override (provider/model). */ model?: string; + /** Delivery target (last|whatsapp|telegram|none). */ + target?: "last" | "whatsapp" | "telegram" | "none"; + /** Optional delivery override (E.164 for WhatsApp, chat id for Telegram). */ + to?: string; + /** Override the heartbeat prompt body (default: "HEARTBEAT"). */ + prompt?: string; }; /** Max concurrent agent runs across all conversations. Default: 1 (sequential). */ maxConcurrent?: number; @@ -454,6 +460,16 @@ const HeartbeatSchema = z .object({ every: z.string().optional(), model: z.string().optional(), + target: z + .union([ + z.literal("last"), + z.literal("whatsapp"), + z.literal("telegram"), + z.literal("none"), + ]) + .optional(), + to: z.string().optional(), + prompt: z.string().optional(), }) .superRefine((val, ctx) => { if (!val.every) return; diff --git a/src/cron/service.test.ts b/src/cron/service.test.ts index 357498d71..b38fa1fb2 100644 --- a/src/cron/service.test.ts +++ b/src/cron/service.test.ts @@ -40,14 +40,14 @@ describe("CronService", () => { it("runs a one-shot main job and disables it after success", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); - const requestReplyHeartbeatNow = vi.fn(); + const requestHeartbeatNow = vi.fn(); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, enqueueSystemEvent, - requestReplyHeartbeatNow, + requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); @@ -71,7 +71,7 @@ describe("CronService", () => { const updated = jobs.find((j) => j.id === job.id); expect(updated?.enabled).toBe(false); expect(enqueueSystemEvent).toHaveBeenCalledWith("hello"); - expect(requestReplyHeartbeatNow).toHaveBeenCalled(); + expect(requestHeartbeatNow).toHaveBeenCalled(); await cron.list({ includeDisabled: true }); cron.stop(); @@ -81,7 +81,7 @@ describe("CronService", () => { it("runs an isolated job and posts summary to main", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); - const requestReplyHeartbeatNow = vi.fn(); + const requestHeartbeatNow = vi.fn(); const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done", @@ -92,7 +92,7 @@ describe("CronService", () => { cronEnabled: true, log: noopLogger, enqueueSystemEvent, - requestReplyHeartbeatNow, + requestHeartbeatNow, runIsolatedAgentJob, }); @@ -113,7 +113,7 @@ describe("CronService", () => { await cron.list({ includeDisabled: true }); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron: done"); - expect(requestReplyHeartbeatNow).toHaveBeenCalled(); + expect(requestHeartbeatNow).toHaveBeenCalled(); cron.stop(); await store.cleanup(); }); @@ -121,7 +121,7 @@ describe("CronService", () => { it("posts last output to main even when isolated job errors", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); - const requestReplyHeartbeatNow = vi.fn(); + const requestHeartbeatNow = vi.fn(); const runIsolatedAgentJob = vi.fn(async () => ({ status: "error" as const, summary: "last output", @@ -133,7 +133,7 @@ describe("CronService", () => { cronEnabled: true, log: noopLogger, enqueueSystemEvent, - requestReplyHeartbeatNow, + requestHeartbeatNow, runIsolatedAgentJob, }); @@ -155,7 +155,7 @@ describe("CronService", () => { expect(enqueueSystemEvent).toHaveBeenCalledWith( "Cron (error): last output", ); - expect(requestReplyHeartbeatNow).toHaveBeenCalled(); + expect(requestHeartbeatNow).toHaveBeenCalled(); cron.stop(); await store.cleanup(); }); @@ -168,7 +168,7 @@ describe("CronService", () => { cronEnabled: true, log: noopLogger, enqueueSystemEvent: vi.fn(), - requestReplyHeartbeatNow: vi.fn(), + requestHeartbeatNow: vi.fn(), runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); @@ -203,7 +203,7 @@ describe("CronService", () => { it("skips invalid main jobs with agentTurn payloads from disk", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); - const requestReplyHeartbeatNow = vi.fn(); + const requestHeartbeatNow = vi.fn(); const atMs = Date.parse("2025-12-13T00:00:01.000Z"); await fs.mkdir(path.dirname(store.storePath), { recursive: true }); @@ -232,7 +232,7 @@ describe("CronService", () => { cronEnabled: true, log: noopLogger, enqueueSystemEvent, - requestReplyHeartbeatNow, + requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); @@ -242,7 +242,7 @@ describe("CronService", () => { await vi.runOnlyPendingTimersAsync(); expect(enqueueSystemEvent).not.toHaveBeenCalled(); - expect(requestReplyHeartbeatNow).not.toHaveBeenCalled(); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); const jobs = await cron.list({ includeDisabled: true }); expect(jobs[0]?.state.lastStatus).toBe("skipped"); @@ -255,14 +255,14 @@ describe("CronService", () => { it("skips main jobs with empty systemEvent text", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); - const requestReplyHeartbeatNow = vi.fn(); + const requestHeartbeatNow = vi.fn(); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, enqueueSystemEvent, - requestReplyHeartbeatNow, + requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); @@ -281,7 +281,7 @@ describe("CronService", () => { await vi.runOnlyPendingTimersAsync(); expect(enqueueSystemEvent).not.toHaveBeenCalled(); - expect(requestReplyHeartbeatNow).not.toHaveBeenCalled(); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); const jobs = await cron.list({ includeDisabled: true }); expect(jobs[0]?.state.lastStatus).toBe("skipped"); @@ -294,14 +294,14 @@ describe("CronService", () => { it("does not schedule timers when cron is disabled", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); - const requestReplyHeartbeatNow = vi.fn(); + const requestHeartbeatNow = vi.fn(); const cron = new CronService({ storePath: store.storePath, cronEnabled: false, log: noopLogger, enqueueSystemEvent, - requestReplyHeartbeatNow, + requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); @@ -324,7 +324,7 @@ describe("CronService", () => { await vi.runOnlyPendingTimersAsync(); expect(enqueueSystemEvent).not.toHaveBeenCalled(); - expect(requestReplyHeartbeatNow).not.toHaveBeenCalled(); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); expect(noopLogger.warn).toHaveBeenCalled(); cron.stop(); @@ -334,14 +334,14 @@ describe("CronService", () => { it("status reports next wake when enabled", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); - const requestReplyHeartbeatNow = vi.fn(); + const requestHeartbeatNow = vi.fn(); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, enqueueSystemEvent, - requestReplyHeartbeatNow, + requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); diff --git a/src/cron/service.ts b/src/cron/service.ts index 87001bdbf..cdda4bc51 100644 --- a/src/cron/service.ts +++ b/src/cron/service.ts @@ -34,7 +34,7 @@ export type CronServiceDeps = { storePath: string; cronEnabled: boolean; enqueueSystemEvent: (text: string) => void; - requestReplyHeartbeatNow: (opts?: { reason?: string }) => void; + requestHeartbeatNow: (opts?: { reason?: string }) => void; runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{ status: "ok" | "error" | "skipped"; summary?: string; @@ -276,7 +276,7 @@ export class CronService { if (!text) return { ok: false }; this.deps.enqueueSystemEvent(text); if (opts.mode === "now") { - this.deps.requestReplyHeartbeatNow({ reason: "wake" }); + this.deps.requestHeartbeatNow({ reason: "wake" }); } return { ok: true }; } @@ -479,7 +479,7 @@ export class CronService { const statusPrefix = status === "ok" ? prefix : `${prefix} (${status})`; this.deps.enqueueSystemEvent(`${statusPrefix}: ${body}`); if (job.wakeMode === "now") { - this.deps.requestReplyHeartbeatNow({ reason: `cron:${job.id}:post` }); + this.deps.requestHeartbeatNow({ reason: `cron:${job.id}:post` }); } } }; @@ -503,7 +503,7 @@ export class CronService { } this.deps.enqueueSystemEvent(text); if (job.wakeMode === "now") { - this.deps.requestReplyHeartbeatNow({ reason: `cron:${job.id}` }); + this.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); } await finish("ok", undefined, text); return; diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 377b2a87c..bb05d26e3 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -76,6 +76,11 @@ import { getLastHeartbeatEvent, onHeartbeatEvent, } from "../infra/heartbeat-events.js"; +import { + setHeartbeatsEnabled, + startHeartbeatRunner, +} from "../infra/heartbeat-runner.js"; +import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { getMachineDisplayName } from "../infra/machine-name.js"; import { approveNodePairing, @@ -129,13 +134,9 @@ import { monitorTelegramProvider } from "../telegram/monitor.js"; import { probeTelegram, type TelegramProbe } from "../telegram/probe.js"; import { sendMessageTelegram } from "../telegram/send.js"; import { normalizeE164, resolveUserPath } from "../utils.js"; -import { - setHeartbeatsEnabled, - type WebProviderStatus, -} from "../web/auto-reply.js"; +import type { WebProviderStatus } from "../web/auto-reply.js"; import { startWebLoginWithQr, waitForWebLogin } from "../web/login-qr.js"; import { sendMessageWhatsApp } from "../web/outbound.js"; -import { requestReplyHeartbeatNow } from "../web/reply-heartbeat-wake.js"; import { getWebAuthAgeMs, logoutWeb, readWebSelfId } from "../web/session.js"; import { assertGatewayAuthConfigured, @@ -1423,7 +1424,7 @@ export async function startGatewayServer( }) => { enqueueSystemEvent(value.text); if (value.mode === "now") { - requestReplyHeartbeatNow({ reason: "hook:wake" }); + requestHeartbeatNow({ reason: "hook:wake" }); } }; @@ -1481,13 +1482,13 @@ export async function startGatewayServer( : `Hook ${value.name} (${result.status})`; enqueueSystemEvent(`${prefix}: ${summary}`.trim()); if (value.wakeMode === "now") { - requestReplyHeartbeatNow({ reason: `hook:${jobId}` }); + requestHeartbeatNow({ reason: `hook:${jobId}` }); } } catch (err) { logHooks.warn(`hook agent failed: ${String(err)}`); enqueueSystemEvent(`Hook ${value.name} (error): ${String(err)}`); if (value.wakeMode === "now") { - requestReplyHeartbeatNow({ reason: `hook:${jobId}:error` }); + requestHeartbeatNow({ reason: `hook:${jobId}:error` }); } } })(); @@ -1758,7 +1759,7 @@ export async function startGatewayServer( storePath: cronStorePath, cronEnabled, enqueueSystemEvent, - requestReplyHeartbeatNow, + requestHeartbeatNow, runIsolatedAgentJob: async ({ job, message }) => { const cfg = loadConfig(); return await runCronIsolatedAgentTurn({ @@ -3360,6 +3361,8 @@ export async function startGatewayServer( broadcast("heartbeat", evt, { dropIfSlow: true }); }); + const heartbeatRunner = startHeartbeatRunner({ cfg: cfgAtStart }); + void cron .start() .catch((err) => logCron.error(`failed to start: ${String(err)}`)); @@ -5970,6 +5973,7 @@ export async function startGatewayServer( await stopWhatsAppProvider(); await stopTelegramProvider(); cron.stop(); + heartbeatRunner.stop(); broadcast("shutdown", { reason, restartExpectedMs, diff --git a/src/infra/heartbeat-runner.test.ts b/src/infra/heartbeat-runner.test.ts new file mode 100644 index 000000000..36f63a4ea --- /dev/null +++ b/src/infra/heartbeat-runner.test.ts @@ -0,0 +1,116 @@ +import { describe, expect, it } from "vitest"; + +import type { ClawdisConfig } from "../config/config.js"; +import { HEARTBEAT_PROMPT } from "../auto-reply/heartbeat.js"; +import { + resolveHeartbeatDeliveryTarget, + resolveHeartbeatIntervalMs, + resolveHeartbeatPrompt, +} from "./heartbeat-runner.js"; + +describe("resolveHeartbeatIntervalMs", () => { + it("returns null when unset or invalid", () => { + expect(resolveHeartbeatIntervalMs({})).toBeNull(); + expect( + resolveHeartbeatIntervalMs({ agent: { heartbeat: { every: "0m" } } }), + ).toBeNull(); + expect( + resolveHeartbeatIntervalMs({ agent: { heartbeat: { every: "oops" } } }), + ).toBeNull(); + }); + + it("parses duration strings with minute defaults", () => { + expect( + resolveHeartbeatIntervalMs({ agent: { heartbeat: { every: "5m" } } }), + ).toBe(5 * 60_000); + expect( + resolveHeartbeatIntervalMs({ agent: { heartbeat: { every: "5" } } }), + ).toBe(5 * 60_000); + expect( + resolveHeartbeatIntervalMs({ agent: { heartbeat: { every: "2h" } } }), + ).toBe(2 * 60 * 60_000); + }); +}); + +describe("resolveHeartbeatPrompt", () => { + it("uses the default prompt when unset", () => { + expect(resolveHeartbeatPrompt({})).toBe(HEARTBEAT_PROMPT); + }); + + it("uses a trimmed override when configured", () => { + const cfg: ClawdisConfig = { + agent: { heartbeat: { prompt: " ping " } }, + }; + expect(resolveHeartbeatPrompt(cfg)).toBe("ping"); + }); +}); + +describe("resolveHeartbeatDeliveryTarget", () => { + const baseEntry = { + sessionId: "sid", + updatedAt: Date.now(), + }; + + it("respects target none", () => { + const cfg: ClawdisConfig = { + agent: { heartbeat: { target: "none" } }, + }; + expect(resolveHeartbeatDeliveryTarget({ cfg, entry: baseEntry })).toEqual({ + channel: "none", + reason: "target-none", + }); + }); + + it("uses last route by default", () => { + const cfg: ClawdisConfig = {}; + const entry = { + ...baseEntry, + lastChannel: "whatsapp" as const, + lastTo: "+1555", + }; + expect(resolveHeartbeatDeliveryTarget({ cfg, entry })).toEqual({ + channel: "whatsapp", + to: "+1555", + }); + }); + + it("skips when last route is webchat", () => { + const cfg: ClawdisConfig = {}; + const entry = { + ...baseEntry, + lastChannel: "webchat" as const, + lastTo: "web", + }; + expect(resolveHeartbeatDeliveryTarget({ cfg, entry })).toEqual({ + channel: "none", + reason: "no-target", + }); + }); + + it("applies allowFrom fallback for WhatsApp targets", () => { + const cfg: ClawdisConfig = { + agent: { heartbeat: { target: "whatsapp", to: "+1999" } }, + routing: { allowFrom: ["+1555", "+1666"] }, + }; + const entry = { + ...baseEntry, + lastChannel: "whatsapp" as const, + lastTo: "+1222", + }; + expect(resolveHeartbeatDeliveryTarget({ cfg, entry })).toEqual({ + channel: "whatsapp", + to: "+1555", + reason: "allowFrom-fallback", + }); + }); + + it("keeps explicit telegram targets", () => { + const cfg: ClawdisConfig = { + agent: { heartbeat: { target: "telegram", to: "123" } }, + }; + expect(resolveHeartbeatDeliveryTarget({ cfg, entry: baseEntry })).toEqual({ + channel: "telegram", + to: "123", + }); + }); +}); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts new file mode 100644 index 000000000..e7a5a30d4 --- /dev/null +++ b/src/infra/heartbeat-runner.ts @@ -0,0 +1,421 @@ +import { chunkText } from "../auto-reply/chunk.js"; +import { HEARTBEAT_PROMPT, stripHeartbeatToken } from "../auto-reply/heartbeat.js"; +import { getReplyFromConfig } from "../auto-reply/reply.js"; +import type { ReplyPayload } from "../auto-reply/types.js"; +import { parseDurationMs } from "../cli/parse-duration.js"; +import type { ClawdisConfig } from "../config/config.js"; +import { loadConfig } from "../config/config.js"; +import { + loadSessionStore, + resolveStorePath, + saveSessionStore, + type SessionEntry, +} from "../config/sessions.js"; +import { createSubsystemLogger } from "../logging.js"; +import { getQueueSize } from "../process/command-queue.js"; +import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; +import { normalizeE164 } from "../utils.js"; +import { sendMessageTelegram } from "../telegram/send.js"; +import { sendMessageWhatsApp } from "../web/outbound.js"; +import { emitHeartbeatEvent } from "./heartbeat-events.js"; +import { + requestHeartbeatNow, + setHeartbeatWakeHandler, + type HeartbeatRunResult, +} from "./heartbeat-wake.js"; + +export type HeartbeatTarget = "last" | "whatsapp" | "telegram" | "none"; + +export type HeartbeatDeliveryTarget = { + channel: "whatsapp" | "telegram" | "none"; + to?: string; + reason?: string; +}; + +type HeartbeatDeps = { + runtime?: RuntimeEnv; + sendWhatsApp?: typeof sendMessageWhatsApp; + sendTelegram?: typeof sendMessageTelegram; + getQueueSize?: (lane?: string) => number; + nowMs?: () => number; +}; + +const log = createSubsystemLogger("gateway/heartbeat"); +let heartbeatsEnabled = true; + +export function setHeartbeatsEnabled(enabled: boolean) { + heartbeatsEnabled = enabled; +} + +export function resolveHeartbeatIntervalMs( + cfg: ClawdisConfig, + overrideEvery?: string, +) { + const raw = overrideEvery ?? cfg.agent?.heartbeat?.every; + if (!raw) return null; + const trimmed = String(raw).trim(); + if (!trimmed) return null; + let ms: number; + try { + ms = parseDurationMs(trimmed, { defaultUnit: "m" }); + } catch { + return null; + } + if (ms <= 0) return null; + return ms; +} + +export function resolveHeartbeatPrompt(cfg: ClawdisConfig) { + const raw = cfg.agent?.heartbeat?.prompt; + const trimmed = typeof raw === "string" ? raw.trim() : ""; + return trimmed || HEARTBEAT_PROMPT; +} + +function resolveHeartbeatSession(cfg: ClawdisConfig) { + const sessionCfg = cfg.session; + const scope = sessionCfg?.scope ?? "per-sender"; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + const sessionKey = scope === "global" ? "global" : mainKey; + const storePath = resolveStorePath(sessionCfg?.store); + const store = loadSessionStore(storePath); + const entry = store[sessionKey]; + return { sessionKey, storePath, store, entry }; +} + +function resolveHeartbeatSender(params: { + allowFrom: Array; + lastTo?: string; + lastChannel?: SessionEntry["lastChannel"]; +}) { + const { allowFrom, lastTo, lastChannel } = params; + const candidates = [ + lastTo?.trim(), + lastChannel === "telegram" && lastTo ? `telegram:${lastTo}` : undefined, + lastChannel === "whatsapp" && lastTo ? `whatsapp:${lastTo}` : undefined, + ].filter((val): val is string => Boolean(val && val.trim())); + + const allowList = allowFrom + .map((entry) => String(entry)) + .filter((entry) => entry && entry !== "*"); + if (allowFrom.includes("*")) { + return candidates[0] ?? "heartbeat"; + } + if (candidates.length > 0 && allowList.length > 0) { + const matched = candidates.find((candidate) => + allowList.includes(candidate), + ); + if (matched) return matched; + } + if (candidates.length > 0 && allowList.length === 0) { + return candidates[0]; + } + if (allowList.length > 0) return allowList[0]; + return candidates[0] ?? "heartbeat"; +} + +export function resolveHeartbeatDeliveryTarget(params: { + cfg: ClawdisConfig; + entry?: SessionEntry; +}): HeartbeatDeliveryTarget { + const { cfg, entry } = params; + const rawTarget = cfg.agent?.heartbeat?.target; + const target: HeartbeatTarget = + rawTarget === "whatsapp" || + rawTarget === "telegram" || + rawTarget === "none" || + rawTarget === "last" + ? rawTarget + : "last"; + if (target === "none") { + return { channel: "none", reason: "target-none" }; + } + + const explicitTo = + typeof cfg.agent?.heartbeat?.to === "string" && + cfg.agent.heartbeat.to.trim() + ? cfg.agent.heartbeat.to.trim() + : undefined; + + const lastChannel = + entry?.lastChannel && entry.lastChannel !== "webchat" + ? entry.lastChannel + : undefined; + const lastTo = typeof entry?.lastTo === "string" ? entry.lastTo.trim() : ""; + + const channel: "whatsapp" | "telegram" | undefined = + target === "last" + ? lastChannel + : target === "whatsapp" || target === "telegram" + ? target + : undefined; + + const to = + explicitTo || + (channel && lastChannel === channel ? lastTo : undefined) || + (target === "last" ? lastTo : undefined); + + if (!channel || !to) { + return { channel: "none", reason: "no-target" }; +} + +async function restoreHeartbeatUpdatedAt(params: { + storePath: string; + sessionKey: string; + updatedAt?: number; +}) { + const { storePath, sessionKey, updatedAt } = params; + if (typeof updatedAt !== "number") return; + const store = loadSessionStore(storePath); + const entry = store[sessionKey]; + if (!entry) return; + if (entry.updatedAt === updatedAt) return; + store[sessionKey] = { ...entry, updatedAt }; + await saveSessionStore(storePath, store); +} + + if (channel !== "whatsapp") { + return { channel, to }; + } + + const rawAllow = cfg.routing?.allowFrom ?? []; + if (rawAllow.includes("*")) return { channel, to }; + const allowFrom = rawAllow + .map((val) => normalizeE164(val)) + .filter((val) => val.length > 1); + if (allowFrom.length === 0) return { channel, to }; + + const normalized = normalizeE164(to); + if (allowFrom.includes(normalized)) return { channel, to: normalized }; + return { channel, to: allowFrom[0], reason: "allowFrom-fallback" }; +} + +function normalizeHeartbeatReply( + payload: ReplyPayload, + responsePrefix?: string, +) { + const stripped = stripHeartbeatToken(payload.text); + const hasMedia = Boolean( + payload.mediaUrl || (payload.mediaUrls?.length ?? 0) > 0, + ); + if (stripped.shouldSkip && !hasMedia) { + return { + shouldSkip: true, + text: "", + hasMedia, + }; + } + let finalText = stripped.text; + if (responsePrefix && finalText && !finalText.startsWith(responsePrefix)) { + finalText = `${responsePrefix} ${finalText}`; + } + return { shouldSkip: false, text: finalText, hasMedia }; +} + +async function deliverHeartbeatReply(params: { + channel: "whatsapp" | "telegram"; + to: string; + text: string; + mediaUrls: string[]; + deps: Required>; +}) { + const { channel, to, text, mediaUrls, deps } = params; + if (channel === "whatsapp") { + if (mediaUrls.length === 0) { + for (const chunk of chunkText(text, 4000)) { + await deps.sendWhatsApp(to, chunk, { verbose: false }); + } + return; + } + let first = true; + for (const url of mediaUrls) { + const caption = first ? text : ""; + first = false; + await deps.sendWhatsApp(to, caption, { verbose: false, mediaUrl: url }); + } + return; + } + + if (mediaUrls.length === 0) { + for (const chunk of chunkText(text, 4000)) { + await deps.sendTelegram(to, chunk, { verbose: false }); + } + return; + } + let first = true; + for (const url of mediaUrls) { + const caption = first ? text : ""; + first = false; + await deps.sendTelegram(to, caption, { verbose: false, mediaUrl: url }); + } +} + +export async function runHeartbeatOnce(opts: { + cfg?: ClawdisConfig; + reason?: string; + deps?: HeartbeatDeps; +}): Promise { + const cfg = opts.cfg ?? loadConfig(); + if (!heartbeatsEnabled) { + return { status: "skipped", reason: "disabled" }; + } + if (!resolveHeartbeatIntervalMs(cfg)) { + return { status: "skipped", reason: "disabled" }; + } + + const queueSize = (opts.deps?.getQueueSize ?? getQueueSize)("main"); + if (queueSize > 0) { + return { status: "skipped", reason: "requests-in-flight" }; + } + + const startedAt = opts.deps?.nowMs?.() ?? Date.now(); + const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg); + const previousUpdatedAt = entry?.updatedAt; + const allowFrom = cfg.routing?.allowFrom ?? []; + const sender = resolveHeartbeatSender({ + allowFrom, + lastTo: entry?.lastTo, + lastChannel: entry?.lastChannel, + }); + const prompt = resolveHeartbeatPrompt(cfg); + const ctx = { + Body: prompt, + From: sender, + To: sender, + Surface: "heartbeat", + }; + + try { + const replyResult = await getReplyFromConfig( + ctx, + { isHeartbeat: true }, + cfg, + ); + const replyPayload = Array.isArray(replyResult) + ? replyResult[0] + : replyResult; + + if ( + !replyPayload || + (!replyPayload.text && + !replyPayload.mediaUrl && + !replyPayload.mediaUrls?.length) + ) { + await restoreHeartbeatUpdatedAt({ + storePath, + sessionKey, + updatedAt: previousUpdatedAt, + }); + emitHeartbeatEvent({ + status: "ok-empty", + reason: opts.reason, + durationMs: Date.now() - startedAt, + }); + return { status: "ran", durationMs: Date.now() - startedAt }; + } + + const normalized = normalizeHeartbeatReply( + replyPayload, + cfg.messages?.responsePrefix, + ); + if (normalized.shouldSkip && !normalized.hasMedia) { + await restoreHeartbeatUpdatedAt({ + storePath, + sessionKey, + updatedAt: previousUpdatedAt, + }); + emitHeartbeatEvent({ + status: "ok-token", + reason: opts.reason, + durationMs: Date.now() - startedAt, + }); + return { status: "ran", durationMs: Date.now() - startedAt }; + } + + const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry }); + const mediaUrls = + replyPayload.mediaUrls ?? (replyPayload.mediaUrl ? [replyPayload.mediaUrl] : []); + + if (delivery.channel === "none" || !delivery.to) { + emitHeartbeatEvent({ + status: "skipped", + reason: delivery.reason ?? "no-target", + preview: normalized.text?.slice(0, 200), + durationMs: Date.now() - startedAt, + hasMedia: mediaUrls.length > 0, + }); + return { status: "ran", durationMs: Date.now() - startedAt }; + } + + const deps = { + sendWhatsApp: opts.deps?.sendWhatsApp ?? sendMessageWhatsApp, + sendTelegram: opts.deps?.sendTelegram ?? sendMessageTelegram, + }; + await deliverHeartbeatReply({ + channel: delivery.channel, + to: delivery.to, + text: normalized.text, + mediaUrls, + deps, + }); + + emitHeartbeatEvent({ + status: "sent", + to: delivery.to, + preview: normalized.text?.slice(0, 200), + durationMs: Date.now() - startedAt, + hasMedia: mediaUrls.length > 0, + }); + return { status: "ran", durationMs: Date.now() - startedAt }; + } catch (err) { + emitHeartbeatEvent({ + status: "failed", + reason: String(err), + durationMs: Date.now() - startedAt, + }); + log.error({ error: String(err) }, "heartbeat failed"); + return { status: "failed", reason: String(err) }; + } +} + +export function startHeartbeatRunner(opts: { + cfg?: ClawdisConfig; + runtime?: RuntimeEnv; + abortSignal?: AbortSignal; +}) { + const cfg = opts.cfg ?? loadConfig(); + const intervalMs = resolveHeartbeatIntervalMs(cfg); + if (!intervalMs) { + log.info({ enabled: false }, "heartbeat: disabled"); + } + + const runtime = opts.runtime ?? defaultRuntime; + const run = async (params?: { reason?: string }) => { + const res = await runHeartbeatOnce({ + cfg, + reason: params?.reason, + deps: { runtime }, + }); + return res; + }; + + setHeartbeatWakeHandler(async (params) => run({ reason: params.reason })); + + let timer: NodeJS.Timeout | null = null; + if (intervalMs) { + timer = setInterval(() => { + requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); + }, intervalMs); + timer.unref?.(); + log.info({ intervalMs }, "heartbeat: started"); + } + + const cleanup = () => { + setHeartbeatWakeHandler(null); + if (timer) clearInterval(timer); + timer = null; + }; + + opts.abortSignal?.addEventListener("abort", cleanup, { once: true }); + + return { stop: cleanup }; +} diff --git a/src/web/reply-heartbeat-wake.ts b/src/infra/heartbeat-wake.ts similarity index 76% rename from src/web/reply-heartbeat-wake.ts rename to src/infra/heartbeat-wake.ts index 1cb4f269f..39c09e7c8 100644 --- a/src/web/reply-heartbeat-wake.ts +++ b/src/infra/heartbeat-wake.ts @@ -1,13 +1,13 @@ -export type ReplyHeartbeatWakeResult = +export type HeartbeatRunResult = | { status: "ran"; durationMs: number } | { status: "skipped"; reason: string } | { status: "failed"; reason: string }; -export type ReplyHeartbeatWakeHandler = (opts: { +export type HeartbeatWakeHandler = (opts: { reason?: string; -}) => Promise; +}) => Promise; -let handler: ReplyHeartbeatWakeHandler | null = null; +let handler: HeartbeatWakeHandler | null = null; let pendingReason: string | null = null; let scheduled = false; let running = false; @@ -51,27 +51,22 @@ function schedule(coalesceMs: number) { timer.unref?.(); } -export function setReplyHeartbeatWakeHandler( - next: ReplyHeartbeatWakeHandler | null, -) { +export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null) { handler = next; if (handler && pendingReason) { schedule(DEFAULT_COALESCE_MS); } } -export function requestReplyHeartbeatNow(opts?: { - reason?: string; - coalesceMs?: number; -}) { +export function requestHeartbeatNow(opts?: { reason?: string; coalesceMs?: number }) { pendingReason = opts?.reason ?? pendingReason ?? "requested"; schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS); } -export function hasReplyHeartbeatWakeHandler() { +export function hasHeartbeatWakeHandler() { return handler !== null; } -export function hasPendingReplyHeartbeatWake() { +export function hasPendingHeartbeatWake() { return pendingReason !== null || Boolean(timer) || scheduled; } diff --git a/src/provider-web.ts b/src/provider-web.ts index 5a3ad2f4c..3d85d326e 100644 --- a/src/provider-web.ts +++ b/src/provider-web.ts @@ -7,7 +7,6 @@ export { monitorWebProvider, resolveHeartbeatRecipients, runWebHeartbeatOnce, - setHeartbeatsEnabled, type WebMonitorTuning, type WebProviderStatus, } from "./web/auto-reply.js"; diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index 03b2b6ebc..a3714074a 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -15,19 +15,11 @@ import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ClawdisConfig } from "../config/config.js"; import { resetLogger, setLoggerOverride } from "../logging.js"; -import * as commandQueue from "../process/command-queue.js"; import { - HEARTBEAT_PROMPT, HEARTBEAT_TOKEN, monitorWebProvider, - resolveHeartbeatRecipients, - resolveReplyHeartbeatIntervalMs, - runWebHeartbeatOnce, SILENT_REPLY_TOKEN, - stripHeartbeatToken, } from "./auto-reply.js"; -import type { sendMessageWhatsApp } from "./outbound.js"; -import { requestReplyHeartbeatNow } from "./reply-heartbeat-wake.js"; import { resetBaileysMocks, resetLoadConfigMock, @@ -107,146 +99,6 @@ const makeSessionStore = async ( }; }; -describe("heartbeat helpers", () => { - it("strips heartbeat token and skips when only token", () => { - expect(stripHeartbeatToken(undefined)).toEqual({ - shouldSkip: true, - text: "", - }); - expect(stripHeartbeatToken(" ")).toEqual({ - shouldSkip: true, - text: "", - }); - expect(stripHeartbeatToken(HEARTBEAT_TOKEN)).toEqual({ - shouldSkip: true, - text: "", - }); - }); - - it("keeps content and removes token when mixed", () => { - expect(stripHeartbeatToken(`ALERT ${HEARTBEAT_TOKEN}`)).toEqual({ - shouldSkip: false, - text: "ALERT", - }); - expect(stripHeartbeatToken(`hello`)).toEqual({ - shouldSkip: false, - text: "hello", - }); - }); - - it("strips repeated OK tails after heartbeat token", () => { - expect(stripHeartbeatToken("HEARTBEAT_OK_OK_OK")).toEqual({ - shouldSkip: true, - text: "", - }); - expect(stripHeartbeatToken("HEARTBEAT_OK_OK")).toEqual({ - shouldSkip: true, - text: "", - }); - expect(stripHeartbeatToken("HEARTBEAT_OK _OK")).toEqual({ - shouldSkip: true, - text: "", - }); - expect(stripHeartbeatToken("HEARTBEAT_OK OK")).toEqual({ - shouldSkip: true, - text: "", - }); - expect(stripHeartbeatToken("ALERT HEARTBEAT_OK_OK")).toEqual({ - shouldSkip: false, - text: "ALERT", - }); - }); - - it("resolves reply heartbeat interval from config and overrides", () => { - const cfgBase: ClawdisConfig = {}; - expect(resolveReplyHeartbeatIntervalMs(cfgBase)).toBeNull(); - expect( - resolveReplyHeartbeatIntervalMs({ - agent: { heartbeat: { every: "5m" } }, - }), - ).toBe(5 * 60_000); - expect( - resolveReplyHeartbeatIntervalMs({ - agent: { heartbeat: { every: "0m" } }, - }), - ).toBeNull(); - expect(resolveReplyHeartbeatIntervalMs(cfgBase, "7m")).toBe(7 * 60_000); - expect( - resolveReplyHeartbeatIntervalMs({ - agent: { heartbeat: { every: "5" } }, - }), - ).toBe(5 * 60_000); - }); -}); - -describe("resolveHeartbeatRecipients", () => { - it("returns the sole session recipient", async () => { - const now = Date.now(); - const store = await makeSessionStore({ - main: { updatedAt: now, lastChannel: "whatsapp", lastTo: "+1000" }, - }); - const cfg: ClawdisConfig = { - routing: { - allowFrom: ["+1999"], - }, - session: { store: store.storePath }, - }; - const result = resolveHeartbeatRecipients(cfg); - expect(result.source).toBe("session-single"); - expect(result.recipients).toEqual(["+1000"]); - await store.cleanup(); - }); - - it("surfaces ambiguity when multiple sessions exist", async () => { - const now = Date.now(); - const store = await makeSessionStore({ - main: { updatedAt: now, lastChannel: "whatsapp", lastTo: "+1000" }, - alt: { updatedAt: now - 10, lastChannel: "whatsapp", lastTo: "+2000" }, - }); - const cfg: ClawdisConfig = { - routing: { - allowFrom: ["+1999"], - }, - session: { store: store.storePath }, - }; - const result = resolveHeartbeatRecipients(cfg); - expect(result.source).toBe("session-ambiguous"); - expect(result.recipients).toEqual(["+1000", "+2000"]); - await store.cleanup(); - }); - - it("filters wildcard allowFrom when no sessions exist", async () => { - const store = await makeSessionStore({}); - const cfg: ClawdisConfig = { - routing: { - allowFrom: ["*"], - }, - session: { store: store.storePath }, - }; - const result = resolveHeartbeatRecipients(cfg); - expect(result.recipients).toHaveLength(0); - expect(result.source).toBe("allowFrom"); - await store.cleanup(); - }); - - it("merges sessions and allowFrom when --all is set", async () => { - const now = Date.now(); - const store = await makeSessionStore({ - main: { updatedAt: now, lastChannel: "whatsapp", lastTo: "+1000" }, - }); - const cfg: ClawdisConfig = { - routing: { - allowFrom: ["+1999"], - }, - session: { store: store.storePath }, - }; - const result = resolveHeartbeatRecipients(cfg, { all: true }); - expect(result.source).toBe("all"); - expect(result.recipients.sort()).toEqual(["+1000", "+1999"].sort()); - await store.cleanup(); - }); -}); - describe("partial reply gating", () => { it("does not send partial replies for WhatsApp surface", async () => { const reply = vi.fn().mockResolvedValue(undefined); @@ -387,249 +239,6 @@ describe("partial reply gating", () => { }); }); -describe("runWebHeartbeatOnce", () => { - it("skips when heartbeat token returned", async () => { - const store = await makeSessionStore(); - const sender: typeof sendMessageWhatsApp = vi.fn(); - const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN })); - await runWebHeartbeatOnce({ - cfg: { - routing: { - allowFrom: ["+1555"], - }, - session: { store: store.storePath }, - }, - to: "+1555", - verbose: false, - sender, - replyResolver: resolver, - }); - expect(resolver).toHaveBeenCalled(); - expect(sender).not.toHaveBeenCalled(); - await store.cleanup(); - }); - - it("sends when alert text present", async () => { - const store = await makeSessionStore(); - const sender: typeof sendMessageWhatsApp = vi - .fn() - .mockResolvedValue({ messageId: "m1", toJid: "jid" }); - const resolver = vi.fn(async () => ({ text: "ALERT" })); - await runWebHeartbeatOnce({ - cfg: { - routing: { - allowFrom: ["+1555"], - }, - session: { store: store.storePath }, - }, - to: "+1555", - verbose: false, - sender, - replyResolver: resolver, - }); - expect(sender).toHaveBeenCalledWith("+1555", "ALERT", { verbose: false }); - await store.cleanup(); - }); - - it("falls back to most recent session when no to is provided", async () => { - const store = await makeSessionStore(); - const storePath = store.storePath; - const sender: typeof sendMessageWhatsApp = vi - .fn() - .mockResolvedValue({ messageId: "m1", toJid: "jid" }); - const resolver = vi.fn(async () => ({ text: "ALERT" })); - const now = Date.now(); - const sessionEntries = { - "+1222": { sessionId: "s1", updatedAt: now - 1000 }, - "+1333": { sessionId: "s2", updatedAt: now }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionEntries)); - await runWebHeartbeatOnce({ - cfg: { - routing: { - allowFrom: ["+1999"], - }, - session: { store: storePath }, - }, - to: "+1999", - verbose: false, - sender, - replyResolver: resolver, - }); - expect(sender).toHaveBeenCalledWith("+1999", "ALERT", { verbose: false }); - await store.cleanup(); - }); - - it("does not refresh updatedAt when heartbeat is skipped", async () => { - const tmpDir = await fs.mkdtemp( - path.join(os.tmpdir(), "clawdis-heartbeat-"), - ); - const storePath = path.join(tmpDir, "sessions.json"); - const now = Date.now(); - const originalUpdated = now - 30 * 60 * 1000; - const store = { - "+1555": { sessionId: "sess1", updatedAt: originalUpdated }, - }; - await fs.writeFile(storePath, JSON.stringify(store)); - - const sender: typeof sendMessageWhatsApp = vi.fn(); - const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN })); - setLoadConfigMock({ - routing: { - allowFrom: ["+1555"], - }, - session: { - store: storePath, - idleMinutes: 60, - heartbeatIdleMinutes: 10, - }, - }); - - await runWebHeartbeatOnce({ - to: "+1555", - verbose: false, - sender, - replyResolver: resolver, - }); - - const after = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(after["+1555"].updatedAt).toBe(originalUpdated); - expect(sender).not.toHaveBeenCalled(); - }); - - it("heartbeat reuses existing session id when last inbound is present", async () => { - const tmpDir = await fs.mkdtemp( - path.join(os.tmpdir(), "clawdis-heartbeat-session-"), - ); - const storePath = path.join(tmpDir, "sessions.json"); - const sessionId = "sess-keep"; - await fs.writeFile( - storePath, - JSON.stringify({ - main: { sessionId, updatedAt: Date.now(), systemSent: false }, - }), - ); - - setLoadConfigMock(() => ({ - routing: { - allowFrom: ["+4367"], - }, - session: { store: storePath, idleMinutes: 60 }, - })); - - const replyResolver = vi.fn().mockResolvedValue({ text: HEARTBEAT_TOKEN }); - const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never; - const cfg: ClawdisConfig = { - routing: { - allowFrom: ["+4367"], - }, - session: { store: storePath, idleMinutes: 60 }, - }; - - await runWebHeartbeatOnce({ - cfg, - to: "+4367", - verbose: false, - replyResolver, - runtime, - }); - - const heartbeatCall = replyResolver.mock.calls.find( - (call) => call[0]?.Body === HEARTBEAT_PROMPT, - ); - expect(heartbeatCall?.[0]?.MessageSid).toBe(sessionId); - }); - - it("heartbeat honors session-id override and seeds store", async () => { - const tmpDir = await fs.mkdtemp( - path.join(os.tmpdir(), "clawdis-heartbeat-override-"), - ); - const storePath = path.join(tmpDir, "sessions.json"); - await fs.writeFile(storePath, JSON.stringify({})); - - const sessionId = "override-123"; - setLoadConfigMock(() => ({ - routing: { - allowFrom: ["+1999"], - }, - session: { store: storePath, idleMinutes: 60 }, - })); - - const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN })); - const cfg: ClawdisConfig = { - routing: { - allowFrom: ["+1999"], - }, - session: { store: storePath, idleMinutes: 60 }, - }; - await runWebHeartbeatOnce({ - cfg, - to: "+1999", - verbose: false, - replyResolver: resolver, - sessionId, - }); - - const heartbeatCall = resolver.mock.calls.find( - (call) => call[0]?.Body === HEARTBEAT_PROMPT, - ); - expect(heartbeatCall?.[0]?.MessageSid).toBe(sessionId); - const raw = await fs.readFile(storePath, "utf-8"); - const stored = raw ? JSON.parse(raw) : {}; - expect(stored.main?.sessionId).toBe(sessionId); - expect(stored.main?.updatedAt).toBeDefined(); - }); - - it("sends overrideBody directly and skips resolver", async () => { - const store = await makeSessionStore(); - const sender: typeof sendMessageWhatsApp = vi - .fn() - .mockResolvedValue({ messageId: "m1", toJid: "jid" }); - const resolver = vi.fn(); - await runWebHeartbeatOnce({ - cfg: { - routing: { - allowFrom: ["+1555"], - }, - session: { store: store.storePath }, - }, - to: "+1555", - verbose: false, - sender, - replyResolver: resolver, - overrideBody: "manual ping", - }); - expect(sender).toHaveBeenCalledWith("+1555", "manual ping", { - verbose: false, - }); - expect(resolver).not.toHaveBeenCalled(); - await store.cleanup(); - }); - - it("dry-run overrideBody prints and skips send", async () => { - const store = await makeSessionStore(); - const sender: typeof sendMessageWhatsApp = vi.fn(); - const resolver = vi.fn(); - await runWebHeartbeatOnce({ - cfg: { - routing: { - allowFrom: ["+1555"], - }, - session: { store: store.storePath }, - }, - to: "+1555", - verbose: false, - sender, - replyResolver: resolver, - overrideBody: "dry", - dryRun: true, - }); - expect(sender).not.toHaveBeenCalled(); - expect(resolver).not.toHaveBeenCalled(); - await store.cleanup(); - }); -}); - describe("web auto-reply", () => { beforeEach(() => { vi.clearAllMocks(); @@ -746,153 +355,6 @@ describe("web auto-reply", () => { }, ); - it("skips reply heartbeat when requests are running", async () => { - const tmpDir = await fs.mkdtemp( - path.join(os.tmpdir(), "clawdis-heartbeat-queue-"), - ); - const storePath = path.join(tmpDir, "sessions.json"); - await fs.writeFile(storePath, JSON.stringify({})); - - const queueSpy = vi.spyOn(commandQueue, "getQueueSize").mockReturnValue(2); - const replyResolver = vi.fn(); - const listenerFactory = vi.fn(async () => { - const onClose = new Promise(() => { - // stay open until aborted - }); - return { close: vi.fn(), onClose }; - }); - const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never; - - setLoadConfigMock(() => ({ - routing: { - allowFrom: ["+1555"], - }, - session: { store: storePath }, - })); - - const controller = new AbortController(); - const run = monitorWebProvider( - false, - listenerFactory, - true, - replyResolver, - runtime, - controller.signal, - { replyHeartbeatEvery: "1m", replyHeartbeatNow: true }, - ); - - try { - await Promise.resolve(); - controller.abort(); - await run; - expect(replyResolver).not.toHaveBeenCalled(); - } finally { - queueSpy.mockRestore(); - } - }); - - it("falls back to main recipient when last inbound is a group chat", async () => { - const now = Date.now(); - const store = await makeSessionStore({ - main: { - sessionId: "sid-main", - updatedAt: now, - lastChannel: "whatsapp", - lastTo: "+1555", - }, - }); - - const replyResolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN })); - let capturedOnMessage: - | ((msg: import("./inbound.js").WebInboundMessage) => Promise) - | undefined; - const listenerFactory = vi.fn( - async (opts: { - onMessage: ( - msg: import("./inbound.js").WebInboundMessage, - ) => Promise; - }) => { - capturedOnMessage = opts.onMessage; - const onClose = new Promise(() => { - // stay open until aborted - }); - return { close: vi.fn(), onClose }; - }, - ); - const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never; - - setLoadConfigMock(() => ({ - routing: { - allowFrom: ["+1555"], - groupChat: { requireMention: true, mentionPatterns: ["@clawd"] }, - }, - session: { store: store.storePath }, - })); - - const controller = new AbortController(); - const run = monitorWebProvider( - false, - listenerFactory, - true, - replyResolver, - runtime, - controller.signal, - { replyHeartbeatEvery: "10000m" }, - ); - - try { - await Promise.resolve(); - expect(capturedOnMessage).toBeDefined(); - - await capturedOnMessage?.({ - body: "hello group", - from: "123@g.us", - to: "+1555", - id: "g1", - sendComposing: vi.fn(), - reply: vi.fn(), - sendMedia: vi.fn(), - chatType: "group", - conversationId: "123@g.us", - chatId: "123@g.us", - }); - - // No mention => no auto-reply for the group message. - await new Promise((resolve) => setTimeout(resolve, 10)); - expect( - replyResolver.mock.calls.some( - (call) => call[0]?.Body !== HEARTBEAT_PROMPT, - ), - ).toBe(false); - - requestReplyHeartbeatNow({ coalesceMs: 0 }); - let heartbeatCall = replyResolver.mock.calls.find( - (call) => - call[0]?.Body === HEARTBEAT_PROMPT && - call[0]?.MessageSid === "sid-main", - ); - const deadline = Date.now() + 1000; - while (!heartbeatCall && Date.now() < deadline) { - await new Promise((resolve) => setTimeout(resolve, 10)); - heartbeatCall = replyResolver.mock.calls.find( - (call) => - call[0]?.Body === HEARTBEAT_PROMPT && - call[0]?.MessageSid === "sid-main", - ); - } - controller.abort(); - await run; - - expect(heartbeatCall).toBeDefined(); - expect(heartbeatCall?.[0]?.From).toBe("+1555"); - expect(heartbeatCall?.[0]?.To).toBe("+1555"); - expect(heartbeatCall?.[0]?.MessageSid).toBe("sid-main"); - } finally { - controller.abort(); - await store.cleanup(); - } - }); - it("processes inbound messages without batching and preserves timestamps", async () => { const originalTz = process.env.TZ; process.env.TZ = "Europe/Vienna"; diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 3c39088c5..b36a38c96 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -5,9 +5,12 @@ import { parseActivationCommand, } from "../auto-reply/group-activation.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; +import { + HEARTBEAT_PROMPT, + stripHeartbeatToken, +} from "../auto-reply/heartbeat.js"; import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import type { ReplyPayload } from "../auto-reply/types.js"; -import { parseDurationMs } from "../cli/parse-duration.js"; import { waitForever } from "../cli/wait.js"; import { loadConfig } from "../config/config.js"; import { @@ -22,7 +25,6 @@ import { isVerbose, logVerbose } from "../globals.js"; import { emitHeartbeatEvent } from "../infra/heartbeat-events.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { createSubsystemLogger, getChildLogger } from "../logging.js"; -import { getQueueSize } from "../process/command-queue.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { isSelfChatMode, jidToE164, normalizeE164 } from "../utils.js"; import { setActiveWebListener } from "./active-listener.js"; @@ -37,8 +39,6 @@ import { resolveReconnectPolicy, sleepWithAbort, } from "./reconnect.js"; -import type { ReplyHeartbeatWakeResult } from "./reply-heartbeat-wake.js"; -import { setReplyHeartbeatWakeHandler } from "./reply-heartbeat-wake.js"; import { formatError, getWebAuthAgeMs, readWebSelfId } from "./session.js"; const WEB_TEXT_LIMIT = 4000; @@ -48,11 +48,6 @@ const whatsappInboundLog = whatsappLog.child("inbound"); const whatsappOutboundLog = whatsappLog.child("outbound"); const whatsappHeartbeatLog = whatsappLog.child("heartbeat"); -let heartbeatsEnabled = true; -export function setHeartbeatsEnabled(enabled: boolean) { - heartbeatsEnabled = enabled; -} - // Send via the active gateway-backed listener. The monitor already owns the single // Baileys session, so use its send API directly. async function sendWithIpcFallback( @@ -73,8 +68,6 @@ type WebInboundMsg = Parameters< export type WebMonitorTuning = { reconnect?: Partial; heartbeatSeconds?: number; - replyHeartbeatEvery?: string; - replyHeartbeatNow?: boolean; sleep?: (ms: number, signal?: AbortSignal) => Promise; statusSink?: (status: WebProviderStatus) => void; }; @@ -82,8 +75,7 @@ export type WebMonitorTuning = { const formatDuration = (ms: number) => ms >= 1000 ? `${(ms / 1000).toFixed(2)}s` : `${ms}ms`; -export const HEARTBEAT_PROMPT = "HEARTBEAT"; -export { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN }; +export { HEARTBEAT_PROMPT, HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN }; export type WebProviderStatus = { running: boolean; @@ -188,41 +180,7 @@ function debugMention( return { wasMentioned: result, details }; } -export function resolveReplyHeartbeatIntervalMs( - cfg: ReturnType, - overrideEvery?: string, -) { - const raw = overrideEvery ?? cfg.agent?.heartbeat?.every; - if (!raw) return null; - const trimmed = String(raw).trim(); - if (!trimmed) return null; - let ms: number; - try { - ms = parseDurationMs(trimmed, { defaultUnit: "m" }); - } catch { - return null; - } - if (ms <= 0) return null; - return ms; -} - -export function stripHeartbeatToken(raw?: string) { - if (!raw) return { shouldSkip: true, text: "" }; - const trimmed = raw.trim(); - if (!trimmed) return { shouldSkip: true, text: "" }; - if (trimmed === HEARTBEAT_TOKEN) return { shouldSkip: true, text: "" }; - const hadToken = trimmed.includes(HEARTBEAT_TOKEN); - let withoutToken = trimmed.replaceAll(HEARTBEAT_TOKEN, "").trim(); - if (hadToken && withoutToken) { - // LLMs sometimes echo malformed HEARTBEAT_OK_OK... tails; strip trailing OK runs to avoid spam. - withoutToken = withoutToken.replace(/[\s_]*OK(?:[\s_]*OK)*$/gi, "").trim(); - } - const shouldSkip = withoutToken.length === 0; - return { - shouldSkip, - text: shouldSkip ? "" : withoutToken || trimmed, - }; -} +export { stripHeartbeatToken }; function isSilentReply(payload?: ReplyPayload): boolean { if (!payload) return false; @@ -427,27 +385,6 @@ export async function runWebHeartbeatOnce(opts: { } } -function getFallbackRecipient(cfg: ReturnType) { - const sessionCfg = cfg.session; - const storePath = resolveStorePath(sessionCfg?.store); - const store = loadSessionStore(storePath); - const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; - const main = store[mainKey]; - const lastTo = typeof main?.lastTo === "string" ? main.lastTo.trim() : ""; - const lastChannel = main?.lastChannel; - - if (lastChannel === "whatsapp" && lastTo) { - return normalizeE164(lastTo); - } - - const allowFrom = - Array.isArray(cfg.routing?.allowFrom) && cfg.routing.allowFrom.length > 0 - ? cfg.routing.allowFrom.filter((v) => v !== "*") - : []; - if (allowFrom.length === 0) return null; - return allowFrom[0] ? normalizeE164(allowFrom[0]) : null; -} - function getSessionRecipients(cfg: ReturnType) { const sessionCfg = cfg.session; const scope = sessionCfg?.scope ?? "per-sender"; @@ -775,10 +712,6 @@ export async function monitorWebProvider( cfg, tuning.heartbeatSeconds, ); - const replyHeartbeatIntervalMs = resolveReplyHeartbeatIntervalMs( - cfg, - tuning.replyHeartbeatEvery, - ); const reconnectPolicy = resolveReconnectPolicy(cfg, tuning.reconnect); const mentionConfig = buildMentionConfig(cfg); const sessionStorePath = resolveStorePath(cfg.session?.store); @@ -940,7 +873,6 @@ export async function monitorWebProvider( const connectionId = newConnectionId(); const startedAt = Date.now(); let heartbeat: NodeJS.Timeout | null = null; - let replyHeartbeatTimer: NodeJS.Timeout | null = null; let watchdogTimer: NodeJS.Timeout | null = null; let lastMessageAt: number | null = null; let handledMessages = 0; @@ -1346,9 +1278,7 @@ export async function monitorWebProvider( const closeListener = async () => { setActiveWebListener(null); - setReplyHeartbeatWakeHandler(null); if (heartbeat) clearInterval(heartbeat); - if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer); if (watchdogTimer) clearInterval(watchdogTimer); if (backgroundTasks.size > 0) { await Promise.allSettled(backgroundTasks); @@ -1363,7 +1293,6 @@ export async function monitorWebProvider( if (keepAlive) { heartbeat = setInterval(() => { - if (!heartbeatsEnabled) return; const authAgeMs = getWebAuthAgeMs(); const minutesSinceLastMessage = lastMessageAt ? Math.floor((Date.now() - lastMessageAt) / 60000) @@ -1420,240 +1349,6 @@ export async function monitorWebProvider( }, WATCHDOG_CHECK_MS); } - const runReplyHeartbeat = async (): Promise => { - const started = Date.now(); - if (!heartbeatsEnabled) { - return { status: "skipped", reason: "disabled" }; - } - const queued = getQueueSize(); - if (queued > 0) { - heartbeatLogger.info( - { connectionId, reason: "requests-in-flight", queued }, - "reply heartbeat skipped", - ); - if (isVerbose()) { - whatsappHeartbeatLog.debug("heartbeat skipped (requests in flight)"); - } - return { status: "skipped", reason: "requests-in-flight" }; - } - if (!replyHeartbeatIntervalMs) { - return { status: "skipped", reason: "disabled" }; - } - let heartbeatInboundMsg = lastInboundMsg; - if (heartbeatInboundMsg?.chatType === "group") { - // Heartbeats should never target group chats. If the last inbound activity - // was in a group, fall back to the main/direct session recipient instead - // of skipping heartbeats entirely. - heartbeatLogger.info( - { connectionId, reason: "last-inbound-group" }, - "reply heartbeat falling back", - ); - heartbeatInboundMsg = null; - } - const tickStart = Date.now(); - if (!heartbeatInboundMsg) { - const fallbackTo = getFallbackRecipient(cfg); - if (!fallbackTo) { - heartbeatLogger.info( - { - connectionId, - reason: "no-recent-inbound", - durationMs: Date.now() - tickStart, - }, - "reply heartbeat skipped", - ); - if (isVerbose()) { - whatsappHeartbeatLog.debug("heartbeat skipped (no recent inbound)"); - } - return { status: "skipped", reason: "no-recent-inbound" }; - } - const snapshot = getSessionSnapshot(cfg, fallbackTo, true); - if (!snapshot.entry) { - heartbeatLogger.info( - { connectionId, to: fallbackTo, reason: "no-session-for-fallback" }, - "reply heartbeat skipped", - ); - if (isVerbose()) { - whatsappHeartbeatLog.debug( - "heartbeat skipped (no session to resume)", - ); - } - return { status: "skipped", reason: "no-session-for-fallback" }; - } - if (isVerbose()) { - heartbeatLogger.info( - { - connectionId, - to: fallbackTo, - reason: "fallback-session", - sessionId: snapshot.entry?.sessionId ?? null, - sessionFresh: snapshot.fresh, - }, - "reply heartbeat start", - ); - } - await runWebHeartbeatOnce({ - cfg, - to: fallbackTo, - verbose, - replyResolver, - sessionId: snapshot.entry.sessionId, - }); - heartbeatLogger.info( - { - connectionId, - to: fallbackTo, - ...snapshot, - durationMs: Date.now() - tickStart, - }, - "reply heartbeat sent (fallback session)", - ); - return { status: "ran", durationMs: Date.now() - started }; - } - - try { - const snapshot = getSessionSnapshot(cfg, heartbeatInboundMsg.from); - if (isVerbose()) { - heartbeatLogger.info( - { - connectionId, - to: heartbeatInboundMsg.from, - intervalMs: replyHeartbeatIntervalMs, - sessionKey: snapshot.key, - sessionId: snapshot.entry?.sessionId ?? null, - sessionFresh: snapshot.fresh, - }, - "reply heartbeat start", - ); - } - const replyResult = await (replyResolver ?? getReplyFromConfig)( - { - Body: HEARTBEAT_PROMPT, - From: heartbeatInboundMsg.from, - To: heartbeatInboundMsg.to, - MessageSid: snapshot.entry?.sessionId, - MediaPath: undefined, - MediaUrl: undefined, - MediaType: undefined, - }, - { - onReplyStart: heartbeatInboundMsg.sendComposing, - isHeartbeat: true, - }, - ); - - const replyPayload = Array.isArray(replyResult) - ? replyResult[0] - : replyResult; - - if ( - !replyPayload || - (!replyPayload.text && - !replyPayload.mediaUrl && - !replyPayload.mediaUrls?.length) - ) { - heartbeatLogger.info( - { - connectionId, - durationMs: Date.now() - tickStart, - reason: "empty-reply", - }, - "reply heartbeat skipped", - ); - if (isVerbose()) { - whatsappHeartbeatLog.debug("heartbeat ok (empty reply)"); - } - return { status: "ran", durationMs: Date.now() - started }; - } - - const stripped = stripHeartbeatToken(replyPayload.text); - const hasMedia = Boolean( - replyPayload.mediaUrl || (replyPayload.mediaUrls?.length ?? 0) > 0, - ); - if (stripped.shouldSkip && !hasMedia) { - heartbeatLogger.info( - { - connectionId, - durationMs: Date.now() - tickStart, - reason: "heartbeat-token", - rawLength: replyPayload.text?.length ?? 0, - }, - "reply heartbeat skipped", - ); - if (isVerbose()) { - whatsappHeartbeatLog.debug("heartbeat ok (HEARTBEAT_OK)"); - } - return { status: "ran", durationMs: Date.now() - started }; - } - - // Apply response prefix if configured (same as regular messages) - let finalText = stripped.text; - const responsePrefix = cfg.messages?.responsePrefix; - if ( - responsePrefix && - finalText && - !finalText.startsWith(responsePrefix) - ) { - finalText = `${responsePrefix} ${finalText}`; - } - - const cleanedReply: ReplyPayload = { - ...replyPayload, - text: finalText, - }; - - await deliverWebReply({ - replyResult: cleanedReply, - msg: heartbeatInboundMsg, - maxMediaBytes, - replyLogger, - connectionId, - }); - - const durationMs = Date.now() - tickStart; - whatsappHeartbeatLog.info( - `heartbeat alert sent (${formatDuration(durationMs)})`, - ); - heartbeatLogger.info( - { - connectionId, - durationMs, - hasMedia, - chars: stripped.text?.length ?? 0, - }, - "reply heartbeat sent", - ); - return { status: "ran", durationMs: Date.now() - started }; - } catch (err) { - const durationMs = Date.now() - tickStart; - heartbeatLogger.warn( - { - connectionId, - error: formatError(err), - durationMs, - }, - "reply heartbeat failed", - ); - whatsappHeartbeatLog.warn( - `heartbeat failed (${formatDuration(durationMs)})`, - ); - return { status: "failed", reason: formatError(err) }; - } - }; - - setReplyHeartbeatWakeHandler(async () => runReplyHeartbeat()); - - if (replyHeartbeatIntervalMs && !replyHeartbeatTimer) { - const intervalMs = replyHeartbeatIntervalMs; - replyHeartbeatTimer = setInterval(() => { - if (!heartbeatsEnabled) return; - void runReplyHeartbeat(); - }, intervalMs); - if (tuning.replyHeartbeatNow) { - void runReplyHeartbeat(); - } - } - whatsappLog.info( "Listening for personal WhatsApp inbound messages. Ctrl+C to stop.", );