From 61e385b33163b6f4dd8a5afb81ea3a71c3d886fe Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 16 Jan 2026 00:46:07 +0000 Subject: [PATCH] feat: add per-agent heartbeat config --- CHANGELOG.md | 1 + docs/channels/whatsapp.md | 6 +- docs/gateway/configuration.md | 7 +- docs/gateway/heartbeat.md | 49 ++++- docs/reference/templates/AGENTS.md | 2 +- docs/start/clawd.md | 2 +- docs/tools/thinking.md | 4 +- src/agents/agent-scope.ts | 2 + src/config/types.agents.ts | 2 + src/config/zod-schema.agent-runtime.ts | 1 + ...espects-ackmaxchars-heartbeat-acks.test.ts | 197 +++++++++--------- ...tbeat-runner.returns-default-unset.test.ts | 187 +++++++++++++---- src/infra/heartbeat-runner.ts | 156 +++++++++++--- src/infra/outbound/targets.ts | 10 +- 14 files changed, 441 insertions(+), 185 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ae0ee14aa..2aab56422 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Plugins: add provider auth registry + `clawdbot models auth login` for plugin-driven OAuth/API key flows. - Onboarding: prompt to modify/disable/delete when reconfiguring existing channel accounts and keep channel selection looping until Finished. - TUI: show provider/model labels for the active session and default model. +- Heartbeat: add per-agent heartbeat configuration and multi-agent docs example. - Fix: list model picker entries as provider/model pairs for explicit selection. (#970) — thanks @mcinteerj. - Fix: persist `gateway.mode=local` after selecting Local run mode in `clawdbot configure`, even if no other sections are chosen. - Daemon: fix profile-aware service label resolution (env-driven) and add coverage for launchd/systemd/schtasks. (#969) — thanks @bjesuiter. diff --git a/docs/channels/whatsapp.md b/docs/channels/whatsapp.md index e66e18769..97b089981 100644 --- a/docs/channels/whatsapp.md +++ b/docs/channels/whatsapp.md @@ -296,8 +296,9 @@ WhatsApp can automatically send emoji reactions to incoming messages immediately ## Heartbeats - **Gateway heartbeat** logs connection health (`web.heartbeatSeconds`, default 60s). -- **Agent heartbeat** is global (`agents.defaults.heartbeat.*`) and runs in the main session. - - Uses the configured heartbeat prompt (default: `Read HEARTBEAT.md if exists. Consider outstanding tasks. Checkup sometimes on your human during (user local) day time.`) + `HEARTBEAT_OK` skip behavior. +- **Agent heartbeat** can be configured per agent (`agents.list[].heartbeat`) or globally + via `agents.defaults.heartbeat` (fallback when no per-agent entries are set). + - Uses the configured heartbeat prompt (default: `Read HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.`) + `HEARTBEAT_OK` skip behavior. - Delivery defaults to the last used channel (or configured target). ## Reconnect behavior @@ -330,6 +331,7 @@ WhatsApp can automatically send emoji reactions to incoming messages immediately - `agents.defaults.heartbeat.model` (optional override) - `agents.defaults.heartbeat.target` - `agents.defaults.heartbeat.to` +- `agents.list[].heartbeat.*` (per-agent overrides) - `session.*` (scope, idle, store, mainKey) - `web.enabled` (disable channel startup when false) - `web.heartbeatSeconds` diff --git a/docs/gateway/configuration.md b/docs/gateway/configuration.md index 65988c1a3..0e3ac6967 100644 --- a/docs/gateway/configuration.md +++ b/docs/gateway/configuration.md @@ -1675,9 +1675,14 @@ Z.AI models are available as `zai/` (e.g. `zai/glm-4.7`) and require - `includeReasoning`: when `true`, heartbeats will also deliver the separate `Reasoning:` message when available (same shape as `/reasoning on`). Default: `false`. - `target`: optional delivery channel (`last`, `whatsapp`, `telegram`, `discord`, `slack`, `signal`, `imessage`, `none`). Default: `last`. - `to`: optional recipient override (channel-specific id, e.g. E.164 for WhatsApp, chat id for Telegram). -- `prompt`: optional override for the heartbeat body (default: `Read HEARTBEAT.md if exists. Consider outstanding tasks. Checkup sometimes on your human during (user local) day time.`). Overrides are sent verbatim; include a `Read HEARTBEAT.md if exists` line if you still want the file read. +- `prompt`: optional override for the heartbeat body (default: `Read HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.`). Overrides are sent verbatim; include a `Read HEARTBEAT.md` line if you still want the file read. - `ackMaxChars`: max chars allowed after `HEARTBEAT_OK` before delivery (default: 300). +Per-agent heartbeats: +- Set `agents.list[].heartbeat` to enable or override heartbeat settings for a specific agent. +- If any agent entry defines `heartbeat`, **only those agents** run heartbeats; defaults + become the shared baseline for those agents. + Heartbeats run full agent turns. Shorter intervals burn more tokens; be mindful of `every`, keep `HEARTBEAT.md` tiny, and/or choose a cheaper `model`. diff --git a/docs/gateway/heartbeat.md b/docs/gateway/heartbeat.md index 88157a641..639ee23cb 100644 --- a/docs/gateway/heartbeat.md +++ b/docs/gateway/heartbeat.md @@ -33,9 +33,9 @@ Example config: ## Defaults -- Interval: `30m` (set `agents.defaults.heartbeat.every`; use `0m` to disable). +- Interval: `30m` (set `agents.defaults.heartbeat.every` or per-agent `agents.list[].heartbeat.every`; use `0m` to disable). - Prompt body (configurable via `agents.defaults.heartbeat.prompt`): - `Read HEARTBEAT.md if exists. Consider outstanding tasks. Checkup sometimes on your human during (user local) day time.` + `Read HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.` - The heartbeat prompt is sent **verbatim** as the user message. The system prompt includes a “Heartbeat” section and the run is flagged internally. @@ -49,8 +49,8 @@ The default prompt is intentionally broad: by using your configured local timezone (see [/concepts/timezone](/concepts/timezone)). If you want a heartbeat to do something very specific (e.g. “check Gmail PubSub -stats” or “verify gateway health”), set `agents.defaults.heartbeat.prompt` to a -custom body (sent verbatim). +stats” or “verify gateway health”), set `agents.defaults.heartbeat.prompt` (or +`agents.list[].heartbeat.prompt`) to a custom body (sent verbatim). ## Response contract @@ -77,7 +77,7 @@ and logged; a message that is only `HEARTBEAT_OK` is dropped. includeReasoning: false, // default: false (deliver separate Reasoning: message when available) target: "last", // last | whatsapp | telegram | discord | slack | signal | imessage | none to: "+15551234567", // optional channel-specific override - prompt: "Read HEARTBEAT.md if exists. Consider outstanding tasks. Checkup sometimes on your human during (user local) day time.", + prompt: "Read HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.", ackMaxChars: 300 // max chars allowed after HEARTBEAT_OK } } @@ -85,6 +85,39 @@ and logged; a message that is only `HEARTBEAT_OK` is dropped. } ``` +### Per-agent heartbeats + +If any `agents.list[]` entry includes a `heartbeat` block, **only those agents** +run heartbeats. The per-agent block merges on top of `agents.defaults.heartbeat` +(so you can set shared defaults once and override per agent). + +Example: two agents, only the second agent runs heartbeats. + +```json5 +{ + agents: { + defaults: { + heartbeat: { + every: "30m", + target: "last" + } + }, + list: [ + { id: "main", default: true }, + { + id: "ops", + heartbeat: { + every: "1h", + target: "whatsapp", + to: "+15551234567", + prompt: "Read HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK." + } + } + ] + } +} +``` + ### Field notes - `every`: heartbeat interval (duration string; default unit = minutes). @@ -100,7 +133,8 @@ and logged; a message that is only `HEARTBEAT_OK` is dropped. ## Delivery behavior -- Heartbeats run in the **main session** (`main`, or `global` when scope is global). +- Heartbeats run in each agent’s **main session** (`agent::`), or `global` + when `session.scope = "global"`. - If the main queue is busy, the heartbeat is skipped and retried later. - If `target` resolves to no external destination, the run still happens but no outbound message is sent. @@ -149,6 +183,9 @@ You can enqueue a system event and trigger an immediate heartbeat with: clawdbot wake --text "Check for urgent follow-ups" --mode now ``` +If multiple agents have `heartbeat` configured, a manual wake runs each of those +agent heartbeats immediately. + Use `--mode next-heartbeat` to wait for the next scheduled tick. ## Reasoning delivery (optional) diff --git a/docs/reference/templates/AGENTS.md b/docs/reference/templates/AGENTS.md index 776bace8e..e38dcea82 100644 --- a/docs/reference/templates/AGENTS.md +++ b/docs/reference/templates/AGENTS.md @@ -108,7 +108,7 @@ Skills provide your tools. When you need one, check its `SKILL.md`. Keep local n When you receive a heartbeat poll (message matches the configured heartbeat prompt), don't just reply `HEARTBEAT_OK` every time. Use heartbeats productively! Default heartbeat prompt: -`Read HEARTBEAT.md if exists. Consider outstanding tasks. Checkup sometimes on your human during (user local) day time.` +`Read HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.` You are free to edit `HEARTBEAT.md` with a short checklist or reminders. Keep it small to limit token burn. diff --git a/docs/start/clawd.md b/docs/start/clawd.md index c06a687e9..f57b8dc0d 100644 --- a/docs/start/clawd.md +++ b/docs/start/clawd.md @@ -175,7 +175,7 @@ Example: ## Heartbeats (proactive mode) By default, Clawdbot runs a heartbeat every 30 minutes with the prompt: -`Read HEARTBEAT.md if exists. Consider outstanding tasks. Checkup sometimes on your human during (user local) day time.` +`Read HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.` Set `agents.defaults.heartbeat.every: "0m"` to disable. - If the agent replies with `HEARTBEAT_OK` (optionally with short padding; see `agents.defaults.heartbeat.ackMaxChars`), Clawdbot suppresses outbound delivery for that heartbeat. diff --git a/docs/tools/thinking.md b/docs/tools/thinking.md index b11603866..8fde4fa3e 100644 --- a/docs/tools/thinking.md +++ b/docs/tools/thinking.md @@ -50,8 +50,8 @@ read_when: - Elevated mode docs live in [Elevated mode](/tools/elevated). ## Heartbeats -- Heartbeat probe body is the configured heartbeat prompt (default: `Read HEARTBEAT.md if exists. Consider outstanding tasks. Checkup sometimes on your human during (user local) day time.`). Inline directives in a heartbeat message apply as usual (but avoid changing session defaults from heartbeats). -- Heartbeat delivery defaults to the final payload only. To also send the separate `Reasoning:` message (when available), set `agents.defaults.heartbeat.includeReasoning: true`. +- Heartbeat probe body is the configured heartbeat prompt (default: `Read HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.`). Inline directives in a heartbeat message apply as usual (but avoid changing session defaults from heartbeats). +- Heartbeat delivery defaults to the final payload only. To also send the separate `Reasoning:` message (when available), set `agents.defaults.heartbeat.includeReasoning: true` or per-agent `agents.list[].heartbeat.includeReasoning: true`. ## Web chat UI - The web chat thinking selector mirrors the session's stored level from the inbound session store/config when the page loads. diff --git a/src/agents/agent-scope.ts b/src/agents/agent-scope.ts index dcaad42b4..db48002e3 100644 --- a/src/agents/agent-scope.ts +++ b/src/agents/agent-scope.ts @@ -22,6 +22,7 @@ type ResolvedAgentConfig = { model?: AgentEntry["model"]; memorySearch?: AgentEntry["memorySearch"]; humanDelay?: AgentEntry["humanDelay"]; + heartbeat?: AgentEntry["heartbeat"]; identity?: AgentEntry["identity"]; groupChat?: AgentEntry["groupChat"]; subagents?: AgentEntry["subagents"]; @@ -89,6 +90,7 @@ export function resolveAgentConfig( : undefined, memorySearch: entry.memorySearch, humanDelay: entry.humanDelay, + heartbeat: entry.heartbeat, identity: entry.identity, groupChat: entry.groupChat, subagents: typeof entry.subagents === "object" && entry.subagents ? entry.subagents : undefined, diff --git a/src/config/types.agents.ts b/src/config/types.agents.ts index 214a07603..f083c1897 100644 --- a/src/config/types.agents.ts +++ b/src/config/types.agents.ts @@ -27,6 +27,8 @@ export type AgentConfig = { memorySearch?: MemorySearchConfig; /** Human-like delay between block replies for this agent. */ humanDelay?: HumanDelayConfig; + /** Optional per-agent heartbeat overrides. */ + heartbeat?: AgentDefaultsConfig["heartbeat"]; identity?: IdentityConfig; groupChat?: GroupChatConfig; subagents?: { diff --git a/src/config/zod-schema.agent-runtime.ts b/src/config/zod-schema.agent-runtime.ts index a7b06a8c4..a89f9d378 100644 --- a/src/config/zod-schema.agent-runtime.ts +++ b/src/config/zod-schema.agent-runtime.ts @@ -255,6 +255,7 @@ export const AgentEntrySchema = z.object({ model: AgentModelSchema.optional(), memorySearch: MemorySearchSchema, humanDelay: HumanDelaySchema.optional(), + heartbeat: HeartbeatSchema, identity: IdentitySchema, groupChat: GroupChatSchema, subagents: z diff --git a/src/infra/heartbeat-runner.respects-ackmaxchars-heartbeat-acks.test.ts b/src/infra/heartbeat-runner.respects-ackmaxchars-heartbeat-acks.test.ts index 09a2154cb..b809416e2 100644 --- a/src/infra/heartbeat-runner.respects-ackmaxchars-heartbeat-acks.test.ts +++ b/src/infra/heartbeat-runner.respects-ackmaxchars-heartbeat-acks.test.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { describe, expect, it, vi } from "vitest"; import * as replyModule from "../auto-reply/reply.js"; import type { ClawdbotConfig } from "../config/config.js"; +import { resolveMainSessionKey } from "../config/sessions.js"; import { runHeartbeatOnce } from "./heartbeat-runner.js"; // Avoid pulling optional runtime deps during isolated runs. @@ -15,22 +16,6 @@ describe("resolveHeartbeatIntervalMs", () => { const storePath = path.join(tmpDir, "sessions.json"); const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); try { - await fs.writeFile( - storePath, - JSON.stringify( - { - main: { - sessionId: "sid", - updatedAt: Date.now(), - lastProvider: "whatsapp", - lastTo: "+1555", - }, - }, - null, - 2, - ), - ); - const cfg: ClawdbotConfig = { agents: { defaults: { @@ -45,6 +30,23 @@ describe("resolveHeartbeatIntervalMs", () => { channels: { whatsapp: { allowFrom: ["*"] } }, session: { store: storePath }, }; + const sessionKey = resolveMainSessionKey(cfg); + + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId: "sid", + updatedAt: Date.now(), + lastProvider: "whatsapp", + lastTo: "+1555", + }, + }, + null, + 2, + ), + ); replySpy.mockResolvedValue({ text: "HEARTBEAT_OK 🦞" }); const sendWhatsApp = vi.fn().mockResolvedValue({ @@ -75,22 +77,6 @@ describe("resolveHeartbeatIntervalMs", () => { const storePath = path.join(tmpDir, "sessions.json"); const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); try { - await fs.writeFile( - storePath, - JSON.stringify( - { - main: { - sessionId: "sid", - updatedAt: Date.now(), - lastProvider: "whatsapp", - lastTo: "+1555", - }, - }, - null, - 2, - ), - ); - const cfg: ClawdbotConfig = { agents: { defaults: { @@ -104,6 +90,23 @@ describe("resolveHeartbeatIntervalMs", () => { channels: { whatsapp: { allowFrom: ["*"] } }, session: { store: storePath }, }; + const sessionKey = resolveMainSessionKey(cfg); + + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId: "sid", + updatedAt: Date.now(), + lastProvider: "whatsapp", + lastTo: "+1555", + }, + }, + null, + 2, + ), + ); replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); const sendWhatsApp = vi.fn().mockResolvedValue({ @@ -136,22 +139,6 @@ describe("resolveHeartbeatIntervalMs", () => { try { const originalUpdatedAt = 1000; const bumpedUpdatedAt = 2000; - await fs.writeFile( - storePath, - JSON.stringify( - { - main: { - sessionId: "sid", - updatedAt: originalUpdatedAt, - lastProvider: "whatsapp", - lastTo: "+1555", - }, - }, - null, - 2, - ), - ); - const cfg: ClawdbotConfig = { agents: { defaults: { @@ -165,12 +152,32 @@ describe("resolveHeartbeatIntervalMs", () => { channels: { whatsapp: { allowFrom: ["*"] } }, session: { store: storePath }, }; + const sessionKey = resolveMainSessionKey(cfg); + + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId: "sid", + updatedAt: originalUpdatedAt, + lastProvider: "whatsapp", + lastTo: "+1555", + }, + }, + null, + 2, + ), + ); replySpy.mockImplementationOnce(async () => { const raw = await fs.readFile(storePath, "utf-8"); - const parsed = JSON.parse(raw) as { main?: { updatedAt?: number } }; - if (parsed.main) { - parsed.main.updatedAt = bumpedUpdatedAt; + const parsed = JSON.parse(raw) as Record; + if (parsed[sessionKey]) { + parsed[sessionKey] = { + ...parsed[sessionKey], + updatedAt: bumpedUpdatedAt, + }; } await fs.writeFile(storePath, JSON.stringify(parsed, null, 2)); return { text: "" }; @@ -186,10 +193,11 @@ describe("resolveHeartbeatIntervalMs", () => { }, }); - const finalStore = JSON.parse(await fs.readFile(storePath, "utf-8")) as { - main?: { updatedAt?: number }; - }; - expect(finalStore.main?.updatedAt).toBe(bumpedUpdatedAt); + const finalStore = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + { updatedAt?: number } | undefined + >; + expect(finalStore[sessionKey]?.updatedAt).toBe(bumpedUpdatedAt); } finally { replySpy.mockRestore(); await fs.rm(tmpDir, { recursive: true, force: true }); @@ -201,11 +209,22 @@ describe("resolveHeartbeatIntervalMs", () => { const storePath = path.join(tmpDir, "sessions.json"); const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); try { + const cfg: ClawdbotConfig = { + agents: { + defaults: { + heartbeat: { every: "5m", target: "whatsapp", to: "+1555" }, + }, + }, + channels: { whatsapp: { allowFrom: ["*"] } }, + session: { store: storePath }, + }; + const sessionKey = resolveMainSessionKey(cfg); + await fs.writeFile( storePath, JSON.stringify( { - main: { + [sessionKey]: { sessionId: "sid", updatedAt: Date.now(), lastProvider: "whatsapp", @@ -217,16 +236,6 @@ describe("resolveHeartbeatIntervalMs", () => { ), ); - const cfg: ClawdbotConfig = { - agents: { - defaults: { - heartbeat: { every: "5m", target: "whatsapp", to: "+1555" }, - }, - }, - channels: { whatsapp: { allowFrom: ["*"] } }, - session: { store: storePath }, - }; - replySpy.mockResolvedValue({ text: "Heartbeat alert" }); const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "m1", @@ -260,11 +269,22 @@ describe("resolveHeartbeatIntervalMs", () => { const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN; process.env.TELEGRAM_BOT_TOKEN = ""; try { + const cfg: ClawdbotConfig = { + agents: { + defaults: { + heartbeat: { every: "5m", target: "telegram", to: "123456" }, + }, + }, + channels: { telegram: { botToken: "test-bot-token-123" } }, + session: { store: storePath }, + }; + const sessionKey = resolveMainSessionKey(cfg); + await fs.writeFile( storePath, JSON.stringify( { - main: { + [sessionKey]: { sessionId: "sid", updatedAt: Date.now(), lastProvider: "telegram", @@ -276,16 +296,6 @@ describe("resolveHeartbeatIntervalMs", () => { ), ); - const cfg: ClawdbotConfig = { - agents: { - defaults: { - heartbeat: { every: "5m", target: "telegram", to: "123456" }, - }, - }, - channels: { telegram: { botToken: "test-bot-token-123" } }, - session: { store: storePath }, - }; - replySpy.mockResolvedValue({ text: "Hello from heartbeat" }); const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", @@ -325,22 +335,6 @@ describe("resolveHeartbeatIntervalMs", () => { const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN; process.env.TELEGRAM_BOT_TOKEN = ""; try { - await fs.writeFile( - storePath, - JSON.stringify( - { - main: { - sessionId: "sid", - updatedAt: Date.now(), - lastProvider: "telegram", - lastTo: "123456", - }, - }, - null, - 2, - ), - ); - const cfg: ClawdbotConfig = { agents: { defaults: { @@ -356,6 +350,23 @@ describe("resolveHeartbeatIntervalMs", () => { }, session: { store: storePath }, }; + const sessionKey = resolveMainSessionKey(cfg); + + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId: "sid", + updatedAt: Date.now(), + lastProvider: "telegram", + lastTo: "123456", + }, + }, + null, + 2, + ), + ); replySpy.mockResolvedValue({ text: "Hello from heartbeat" }); const sendTelegram = vi.fn().mockResolvedValue({ diff --git a/src/infra/heartbeat-runner.returns-default-unset.test.ts b/src/infra/heartbeat-runner.returns-default-unset.test.ts index 40592757d..6d5ebe07f 100644 --- a/src/infra/heartbeat-runner.returns-default-unset.test.ts +++ b/src/infra/heartbeat-runner.returns-default-unset.test.ts @@ -7,6 +7,7 @@ import * as replyModule from "../auto-reply/reply.js"; import type { ClawdbotConfig } from "../config/config.js"; import { resolveAgentIdFromSessionKey, + resolveAgentMainSessionKey, resolveMainSessionKey, resolveStorePath, } from "../config/sessions.js"; @@ -55,6 +56,16 @@ describe("resolveHeartbeatIntervalMs", () => { }), ).toBe(2 * 60 * 60_000); }); + + it("uses explicit heartbeat overrides when provided", () => { + expect( + resolveHeartbeatIntervalMs( + { agents: { defaults: { heartbeat: { every: "30m" } } } }, + undefined, + { every: "5m" }, + ), + ).toBe(5 * 60_000); + }); }); describe("resolveHeartbeatPrompt", () => { @@ -183,6 +194,23 @@ describe("resolveHeartbeatDeliveryTarget", () => { to: "123", }); }); + + it("prefers per-agent heartbeat overrides when provided", () => { + const cfg: ClawdbotConfig = { + agents: { defaults: { heartbeat: { target: "telegram", to: "123" } } }, + }; + const heartbeat = { target: "whatsapp", to: "+1555" } as const; + expect( + resolveHeartbeatDeliveryTarget({ + cfg, + entry: { ...baseEntry, lastChannel: "whatsapp", lastTo: "+1999" }, + heartbeat, + }), + ).toEqual({ + channel: "whatsapp", + to: "+1555", + }); + }); }); describe("runHeartbeatOnce", () => { @@ -191,11 +219,22 @@ describe("runHeartbeatOnce", () => { const storePath = path.join(tmpDir, "sessions.json"); const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); try { + const cfg: ClawdbotConfig = { + agents: { + defaults: { + heartbeat: { every: "5m", target: "whatsapp", to: "+1555" }, + }, + }, + channels: { whatsapp: { allowFrom: ["*"] } }, + session: { store: storePath }, + }; + const sessionKey = resolveMainSessionKey(cfg); + await fs.writeFile( storePath, JSON.stringify( { - main: { + [sessionKey]: { sessionId: "sid", updatedAt: Date.now(), lastChannel: "whatsapp", @@ -207,16 +246,6 @@ describe("runHeartbeatOnce", () => { ), ); - const cfg: ClawdbotConfig = { - agents: { - defaults: { - heartbeat: { every: "5m", target: "whatsapp", to: "+1555" }, - }, - }, - channels: { whatsapp: { allowFrom: ["*"] } }, - session: { store: storePath }, - }; - replySpy.mockResolvedValue([{ text: "Let me check..." }, { text: "Final alert" }]); const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "m1", @@ -242,6 +271,76 @@ describe("runHeartbeatOnce", () => { } }); + it("uses per-agent heartbeat overrides and session keys", async () => { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-hb-")); + const storePath = path.join(tmpDir, "sessions.json"); + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + try { + const cfg: ClawdbotConfig = { + agents: { + defaults: { + heartbeat: { every: "30m", prompt: "Default prompt" }, + }, + list: [ + { id: "main", default: true }, + { + id: "ops", + heartbeat: { every: "5m", target: "whatsapp", to: "+1555", prompt: "Ops check" }, + }, + ], + }, + channels: { whatsapp: { allowFrom: ["*"] } }, + session: { store: storePath }, + }; + const sessionKey = resolveAgentMainSessionKey({ cfg, agentId: "ops" }); + + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId: "sid", + updatedAt: Date.now(), + lastChannel: "whatsapp", + lastTo: "+1555", + }, + }, + null, + 2, + ), + ); + + replySpy.mockResolvedValue([{ text: "Final alert" }]); + const sendWhatsApp = vi.fn().mockResolvedValue({ + messageId: "m1", + toJid: "jid", + }); + + await runHeartbeatOnce({ + cfg, + agentId: "ops", + deps: { + sendWhatsApp, + getQueueSize: () => 0, + nowMs: () => 0, + webAuthExists: async () => true, + hasActiveWebListener: () => true, + }, + }); + + expect(sendWhatsApp).toHaveBeenCalledTimes(1); + expect(sendWhatsApp).toHaveBeenCalledWith("+1555", "Final alert", expect.any(Object)); + expect(replySpy).toHaveBeenCalledWith( + expect.objectContaining({ Body: "Ops check", SessionKey: sessionKey }), + { isHeartbeat: true }, + cfg, + ); + } finally { + replySpy.mockRestore(); + await fs.rm(tmpDir, { recursive: true, force: true }); + } + }); + it("suppresses duplicate heartbeat payloads within 24h", async () => { const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-hb-")); const storePath = path.join(tmpDir, "sessions.json"); @@ -302,22 +401,6 @@ describe("runHeartbeatOnce", () => { const storePath = path.join(tmpDir, "sessions.json"); const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); try { - await fs.writeFile( - storePath, - JSON.stringify( - { - main: { - sessionId: "sid", - updatedAt: Date.now(), - lastProvider: "whatsapp", - lastTo: "+1555", - }, - }, - null, - 2, - ), - ); - const cfg: ClawdbotConfig = { agents: { defaults: { @@ -332,6 +415,23 @@ describe("runHeartbeatOnce", () => { channels: { whatsapp: { allowFrom: ["*"] } }, session: { store: storePath }, }; + const sessionKey = resolveMainSessionKey(cfg); + + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId: "sid", + updatedAt: Date.now(), + lastProvider: "whatsapp", + lastTo: "+1555", + }, + }, + null, + 2, + ), + ); replySpy.mockResolvedValue([ { text: "Reasoning:\n_Because it helps_" }, @@ -372,22 +472,6 @@ describe("runHeartbeatOnce", () => { const storePath = path.join(tmpDir, "sessions.json"); const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); try { - await fs.writeFile( - storePath, - JSON.stringify( - { - main: { - sessionId: "sid", - updatedAt: Date.now(), - lastProvider: "whatsapp", - lastTo: "+1555", - }, - }, - null, - 2, - ), - ); - const cfg: ClawdbotConfig = { agents: { defaults: { @@ -402,6 +486,23 @@ describe("runHeartbeatOnce", () => { channels: { whatsapp: { allowFrom: ["*"] } }, session: { store: storePath }, }; + const sessionKey = resolveMainSessionKey(cfg); + + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId: "sid", + updatedAt: Date.now(), + lastProvider: "whatsapp", + lastTo: "+1555", + }, + }, + null, + 2, + ), + ); replySpy.mockResolvedValue([ { text: "Reasoning:\n_Because it helps_" }, diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 0f03d773e..0ccee5f71 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -1,3 +1,4 @@ +import { resolveAgentConfig, resolveDefaultAgentId } from "../agents/agent-scope.js"; import { resolveEffectiveMessagesConfig } from "../agents/identity.js"; import { DEFAULT_HEARTBEAT_ACK_MAX_CHARS, @@ -14,20 +15,22 @@ import type { ClawdbotConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js"; import { loadSessionStore, - resolveAgentIdFromSessionKey, - resolveMainSessionKey, + resolveAgentMainSessionKey, resolveStorePath, saveSessionStore, updateSessionStore, } from "../config/sessions.js"; +import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js"; import { formatErrorMessage } from "../infra/errors.js"; import { createSubsystemLogger } from "../logging.js"; import { getQueueSize } from "../process/command-queue.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; +import { normalizeAgentId } from "../routing/session-key.js"; import { INTERNAL_MESSAGE_CHANNEL } from "../utils/message-channel.js"; import { emitHeartbeatEvent } from "./heartbeat-events.js"; import { type HeartbeatRunResult, + type HeartbeatWakeHandler, requestHeartbeatNow, setHeartbeatWakeHandler, } from "./heartbeat-wake.js"; @@ -49,8 +52,48 @@ export function setHeartbeatsEnabled(enabled: boolean) { heartbeatsEnabled = enabled; } -export function resolveHeartbeatIntervalMs(cfg: ClawdbotConfig, overrideEvery?: string) { - const raw = overrideEvery ?? cfg.agents?.defaults?.heartbeat?.every ?? DEFAULT_HEARTBEAT_EVERY; +type HeartbeatConfig = AgentDefaultsConfig["heartbeat"]; +type HeartbeatAgent = { + agentId: string; + heartbeat?: HeartbeatConfig; +}; + +function resolveHeartbeatConfig( + cfg: ClawdbotConfig, + agentId?: string, +): HeartbeatConfig | undefined { + const defaults = cfg.agents?.defaults?.heartbeat; + if (!agentId) return defaults; + const overrides = resolveAgentConfig(cfg, agentId)?.heartbeat; + if (!defaults && !overrides) return overrides; + return { ...defaults, ...overrides }; +} + +function resolveHeartbeatAgents(cfg: ClawdbotConfig): HeartbeatAgent[] { + const list = cfg.agents?.list ?? []; + const explicit = list.filter((entry) => entry?.heartbeat); + if (explicit.length > 0) { + return explicit + .map((entry) => { + const id = normalizeAgentId(entry.id); + return { agentId: id, heartbeat: resolveHeartbeatConfig(cfg, id) }; + }) + .filter((entry) => entry.agentId); + } + const fallbackId = resolveDefaultAgentId(cfg); + return [{ agentId: fallbackId, heartbeat: resolveHeartbeatConfig(cfg, fallbackId) }]; +} + +export function resolveHeartbeatIntervalMs( + cfg: ClawdbotConfig, + overrideEvery?: string, + heartbeat?: HeartbeatConfig, +) { + const raw = + overrideEvery ?? + heartbeat?.every ?? + cfg.agents?.defaults?.heartbeat?.every ?? + DEFAULT_HEARTBEAT_EVERY; if (!raw) return null; const trimmed = String(raw).trim(); if (!trimmed) return null; @@ -64,23 +107,31 @@ export function resolveHeartbeatIntervalMs(cfg: ClawdbotConfig, overrideEvery?: return ms; } -export function resolveHeartbeatPrompt(cfg: ClawdbotConfig) { - return resolveHeartbeatPromptText(cfg.agents?.defaults?.heartbeat?.prompt); -} - -function resolveHeartbeatAckMaxChars(cfg: ClawdbotConfig) { - return Math.max( - 0, - cfg.agents?.defaults?.heartbeat?.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS, +export function resolveHeartbeatPrompt(cfg: ClawdbotConfig, heartbeat?: HeartbeatConfig) { + return resolveHeartbeatPromptText( + heartbeat?.prompt ?? cfg.agents?.defaults?.heartbeat?.prompt, ); } -function resolveHeartbeatSession(cfg: ClawdbotConfig) { +function resolveHeartbeatAckMaxChars(cfg: ClawdbotConfig, heartbeat?: HeartbeatConfig) { + return Math.max( + 0, + heartbeat?.ackMaxChars ?? + cfg.agents?.defaults?.heartbeat?.ackMaxChars ?? + DEFAULT_HEARTBEAT_ACK_MAX_CHARS, + ); +} + +function resolveHeartbeatSession(cfg: ClawdbotConfig, agentId?: string) { const sessionCfg = cfg.session; const scope = sessionCfg?.scope ?? "per-sender"; - const sessionKey = scope === "global" ? "global" : resolveMainSessionKey(cfg); - const agentId = resolveAgentIdFromSessionKey(sessionKey); - const storePath = resolveStorePath(sessionCfg?.store, { agentId }); + const resolvedAgentId = normalizeAgentId(agentId ?? resolveDefaultAgentId(cfg)); + const sessionKey = + scope === "global" + ? "global" + : resolveAgentMainSessionKey({ cfg, agentId: resolvedAgentId }); + const storeAgentId = scope === "global" ? resolveDefaultAgentId(cfg) : resolvedAgentId; + const storePath = resolveStorePath(sessionCfg?.store, { agentId: storeAgentId }); const store = loadSessionStore(storePath); const entry = store[sessionKey]; return { sessionKey, storePath, store, entry }; @@ -186,14 +237,18 @@ function normalizeHeartbeatReply( export async function runHeartbeatOnce(opts: { cfg?: ClawdbotConfig; + agentId?: string; + heartbeat?: HeartbeatConfig; reason?: string; deps?: HeartbeatDeps; }): Promise { const cfg = opts.cfg ?? loadConfig(); + const agentId = normalizeAgentId(opts.agentId ?? resolveDefaultAgentId(cfg)); + const heartbeat = opts.heartbeat ?? resolveHeartbeatConfig(cfg, agentId); if (!heartbeatsEnabled) { return { status: "skipped", reason: "disabled" }; } - if (!resolveHeartbeatIntervalMs(cfg)) { + if (!resolveHeartbeatIntervalMs(cfg, undefined, heartbeat)) { return { status: "skipped", reason: "disabled" }; } @@ -203,9 +258,9 @@ export async function runHeartbeatOnce(opts: { } const startedAt = opts.deps?.nowMs?.() ?? Date.now(); - const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg); + const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg, agentId); const previousUpdatedAt = entry?.updatedAt; - const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry }); + const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry, heartbeat }); const lastChannel = entry?.lastChannel && entry.lastChannel !== INTERNAL_MESSAGE_CHANNEL ? normalizeChannelId(entry.lastChannel) @@ -222,18 +277,19 @@ export async function runHeartbeatOnce(opts: { lastTo: entry?.lastTo, provider: senderProvider, }); - const prompt = resolveHeartbeatPrompt(cfg); + const prompt = resolveHeartbeatPrompt(cfg, heartbeat); const ctx = { Body: prompt, From: sender, To: sender, Provider: "heartbeat", + SessionKey: sessionKey, }; try { const replyResult = await getReplyFromConfig(ctx, { isHeartbeat: true }, cfg); const replyPayload = resolveHeartbeatReplyPayload(replyResult); - const includeReasoning = cfg.agents?.defaults?.heartbeat?.includeReasoning === true; + const includeReasoning = heartbeat?.includeReasoning === true; const reasoningPayloads = includeReasoning ? resolveHeartbeatReasoningPayloads(replyResult).filter((payload) => payload !== replyPayload) : []; @@ -255,10 +311,10 @@ export async function runHeartbeatOnce(opts: { return { status: "ran", durationMs: Date.now() - startedAt }; } - const ackMaxChars = resolveHeartbeatAckMaxChars(cfg); + const ackMaxChars = resolveHeartbeatAckMaxChars(cfg, heartbeat); const normalized = normalizeHeartbeatReply( replyPayload, - resolveEffectiveMessagesConfig(cfg, resolveAgentIdFromSessionKey(sessionKey)).responsePrefix, + resolveEffectiveMessagesConfig(cfg, agentId).responsePrefix, ackMaxChars, ); const shouldSkipMain = normalized.shouldSkip && !normalized.hasMedia; @@ -409,19 +465,57 @@ export function startHeartbeatRunner(opts: { abortSignal?: AbortSignal; }) { const cfg = opts.cfg ?? loadConfig(); - const intervalMs = resolveHeartbeatIntervalMs(cfg); + const heartbeatAgents = resolveHeartbeatAgents(cfg); + const intervals = heartbeatAgents + .map((agent) => resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat)) + .filter((value): value is number => typeof value === "number"); + const intervalMs = intervals.length > 0 ? Math.min(...intervals) : null; if (!intervalMs) { log.info("heartbeat: disabled", { enabled: false }); } const runtime = opts.runtime ?? defaultRuntime; - const run = async (params?: { reason?: string }) => { - const res = await runHeartbeatOnce({ - cfg, - reason: params?.reason, - deps: { runtime }, - }); - return res; + const lastRunByAgent = new Map(); + const run: HeartbeatWakeHandler = async (params) => { + if (!heartbeatsEnabled) { + return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult; + } + if (heartbeatAgents.length === 0) { + return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult; + } + + const reason = params?.reason; + const isInterval = reason === "interval"; + const startedAt = Date.now(); + const now = startedAt; + let ran = false; + + for (const agent of heartbeatAgents) { + const agentIntervalMs = resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat); + if (!agentIntervalMs) continue; + const lastRun = lastRunByAgent.get(agent.agentId); + if (isInterval && typeof lastRun === "number" && now - lastRun < agentIntervalMs) { + continue; + } + + const res = await runHeartbeatOnce({ + cfg, + agentId: agent.agentId, + heartbeat: agent.heartbeat, + reason, + deps: { runtime }, + }); + if (res.status === "skipped" && res.reason === "requests-in-flight") { + return res; + } + if (res.status !== "skipped" || res.reason !== "disabled") { + lastRunByAgent.set(agent.agentId, now); + } + if (res.status === "ran") ran = true; + } + + if (ran) return { status: "ran", durationMs: Date.now() - startedAt }; + return { status: "skipped", reason: isInterval ? "not-due" : "disabled" }; }; setHeartbeatWakeHandler(async (params) => run({ reason: params.reason })); diff --git a/src/infra/outbound/targets.ts b/src/infra/outbound/targets.ts index e3de6b39f..9235079ba 100644 --- a/src/infra/outbound/targets.ts +++ b/src/infra/outbound/targets.ts @@ -2,6 +2,7 @@ import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/ind import type { ChannelId, ChannelOutboundTargetMode } from "../../channels/plugins/types.js"; import type { ClawdbotConfig } from "../../config/config.js"; import type { SessionEntry } from "../../config/sessions.js"; +import type { AgentDefaultsConfig } from "../../config/types.agent-defaults.js"; import type { DeliverableMessageChannel, GatewayMessageChannel, @@ -79,9 +80,11 @@ export function resolveOutboundTarget(params: { export function resolveHeartbeatDeliveryTarget(params: { cfg: ClawdbotConfig; entry?: SessionEntry; + heartbeat?: AgentDefaultsConfig["heartbeat"]; }): OutboundTarget { const { cfg, entry } = params; - const rawTarget = cfg.agents?.defaults?.heartbeat?.target; + const heartbeat = params.heartbeat ?? cfg.agents?.defaults?.heartbeat; + const rawTarget = heartbeat?.target; let target: HeartbeatTarget = "last"; if (rawTarget === "none" || rawTarget === "last") { target = rawTarget; @@ -95,10 +98,7 @@ export function resolveHeartbeatDeliveryTarget(params: { } const explicitTo = - typeof cfg.agents?.defaults?.heartbeat?.to === "string" && - cfg.agents.defaults.heartbeat.to.trim() - ? cfg.agents.defaults.heartbeat.to.trim() - : undefined; + typeof heartbeat?.to === "string" && heartbeat.to.trim() ? heartbeat.to.trim() : undefined; const lastChannel = entry?.lastChannel && entry.lastChannel !== INTERNAL_MESSAGE_CHANNEL