import { resolveAgentConfig, resolveDefaultAgentId } from "../agents/agent-scope.js"; import { resolveEffectiveMessagesConfig } from "../agents/identity.js"; import { DEFAULT_HEARTBEAT_ACK_MAX_CHARS, DEFAULT_HEARTBEAT_EVERY, resolveHeartbeatPrompt as resolveHeartbeatPromptText, stripHeartbeatToken, } from "../auto-reply/heartbeat.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { getChannelPlugin } from "../channels/plugins/index.js"; import type { ChannelHeartbeatDeps } from "../channels/plugins/types.js"; import { parseDurationMs } from "../cli/parse-duration.js"; import type { ClawdbotConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js"; import { loadSessionStore, resolveAgentMainSessionKey, resolveStorePath, saveSessionStore, updateSessionStore, } from "../config/sessions.js"; import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js"; import { formatErrorMessage } from "../infra/errors.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { getQueueSize } from "../process/command-queue.js"; import { CommandLane } from "../process/lanes.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { normalizeAgentId } from "../routing/session-key.js"; import { emitHeartbeatEvent } from "./heartbeat-events.js"; import { type HeartbeatRunResult, type HeartbeatWakeHandler, requestHeartbeatNow, setHeartbeatWakeHandler, } from "./heartbeat-wake.js"; import type { OutboundSendDeps } from "./outbound/deliver.js"; import { deliverOutboundPayloads } from "./outbound/deliver.js"; import { resolveHeartbeatDeliveryTarget } from "./outbound/targets.js"; type HeartbeatDeps = OutboundSendDeps & ChannelHeartbeatDeps & { runtime?: RuntimeEnv; getQueueSize?: (lane?: string) => number; nowMs?: () => number; }; const log = createSubsystemLogger("gateway/heartbeat"); let heartbeatsEnabled = true; export function setHeartbeatsEnabled(enabled: boolean) { heartbeatsEnabled = enabled; } type HeartbeatConfig = AgentDefaultsConfig["heartbeat"]; type HeartbeatAgent = { agentId: string; heartbeat?: HeartbeatConfig; }; export type HeartbeatSummary = { enabled: boolean; every: string; everyMs: number | null; prompt: string; target: string; model?: string; ackMaxChars: number; }; const DEFAULT_HEARTBEAT_TARGET = "last"; type HeartbeatAgentState = { agentId: string; heartbeat?: HeartbeatConfig; intervalMs: number; lastRunMs?: number; nextDueMs: number; }; export type HeartbeatRunner = { stop: () => void; updateConfig: (cfg: ClawdbotConfig) => void; }; function hasExplicitHeartbeatAgents(cfg: ClawdbotConfig) { const list = cfg.agents?.list ?? []; return list.some((entry) => Boolean(entry?.heartbeat)); } export function isHeartbeatEnabledForAgent(cfg: ClawdbotConfig, agentId?: string): boolean { const resolvedAgentId = normalizeAgentId(agentId ?? resolveDefaultAgentId(cfg)); const list = cfg.agents?.list ?? []; const hasExplicit = hasExplicitHeartbeatAgents(cfg); if (hasExplicit) { return list.some( (entry) => Boolean(entry?.heartbeat) && normalizeAgentId(entry?.id) === resolvedAgentId, ); } return resolvedAgentId === resolveDefaultAgentId(cfg); } function resolveHeartbeatConfig( cfg: ClawdbotConfig, agentId?: string, ): HeartbeatConfig | undefined { const defaults = cfg.agents?.defaults?.heartbeat; if (!agentId) return defaults; const overrides = resolveAgentConfig(cfg, agentId)?.heartbeat; if (!defaults && !overrides) return overrides; return { ...defaults, ...overrides }; } export function resolveHeartbeatSummaryForAgent( cfg: ClawdbotConfig, agentId?: string, ): HeartbeatSummary { const defaults = cfg.agents?.defaults?.heartbeat; const overrides = agentId ? resolveAgentConfig(cfg, agentId)?.heartbeat : undefined; const enabled = isHeartbeatEnabledForAgent(cfg, agentId); if (!enabled) { return { enabled: false, every: "disabled", everyMs: null, prompt: resolveHeartbeatPromptText(defaults?.prompt), target: defaults?.target ?? DEFAULT_HEARTBEAT_TARGET, model: defaults?.model, ackMaxChars: Math.max(0, defaults?.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS), }; } const merged = defaults || overrides ? { ...defaults, ...overrides } : undefined; const every = merged?.every ?? defaults?.every ?? overrides?.every ?? DEFAULT_HEARTBEAT_EVERY; const everyMs = resolveHeartbeatIntervalMs(cfg, undefined, merged); const prompt = resolveHeartbeatPromptText( merged?.prompt ?? defaults?.prompt ?? overrides?.prompt, ); const target = merged?.target ?? defaults?.target ?? overrides?.target ?? DEFAULT_HEARTBEAT_TARGET; const model = merged?.model ?? defaults?.model ?? overrides?.model; const ackMaxChars = Math.max( 0, merged?.ackMaxChars ?? defaults?.ackMaxChars ?? overrides?.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS, ); return { enabled: true, every, everyMs, prompt, target, model, ackMaxChars, }; } function resolveHeartbeatAgents(cfg: ClawdbotConfig): HeartbeatAgent[] { const list = cfg.agents?.list ?? []; if (hasExplicitHeartbeatAgents(cfg)) { return list .filter((entry) => entry?.heartbeat) .map((entry) => { const id = normalizeAgentId(entry.id); return { agentId: id, heartbeat: resolveHeartbeatConfig(cfg, id) }; }) .filter((entry) => entry.agentId); } const fallbackId = resolveDefaultAgentId(cfg); return [{ agentId: fallbackId, heartbeat: resolveHeartbeatConfig(cfg, fallbackId) }]; } export function resolveHeartbeatIntervalMs( cfg: ClawdbotConfig, overrideEvery?: string, heartbeat?: HeartbeatConfig, ) { const raw = overrideEvery ?? heartbeat?.every ?? cfg.agents?.defaults?.heartbeat?.every ?? DEFAULT_HEARTBEAT_EVERY; if (!raw) return null; const trimmed = String(raw).trim(); if (!trimmed) return null; let ms: number; try { ms = parseDurationMs(trimmed, { defaultUnit: "m" }); } catch { return null; } if (ms <= 0) return null; return ms; } export function resolveHeartbeatPrompt(cfg: ClawdbotConfig, heartbeat?: HeartbeatConfig) { return resolveHeartbeatPromptText(heartbeat?.prompt ?? cfg.agents?.defaults?.heartbeat?.prompt); } function resolveHeartbeatAckMaxChars(cfg: ClawdbotConfig, heartbeat?: HeartbeatConfig) { return Math.max( 0, heartbeat?.ackMaxChars ?? cfg.agents?.defaults?.heartbeat?.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS, ); } function resolveHeartbeatSession(cfg: ClawdbotConfig, agentId?: string) { const sessionCfg = cfg.session; const scope = sessionCfg?.scope ?? "per-sender"; const resolvedAgentId = normalizeAgentId(agentId ?? resolveDefaultAgentId(cfg)); const sessionKey = scope === "global" ? "global" : resolveAgentMainSessionKey({ cfg, agentId: resolvedAgentId }); const storeAgentId = scope === "global" ? resolveDefaultAgentId(cfg) : resolvedAgentId; const storePath = resolveStorePath(sessionCfg?.store, { agentId: storeAgentId }); const store = loadSessionStore(storePath); const entry = store[sessionKey]; return { sessionKey, storePath, store, entry }; } function resolveHeartbeatReplyPayload( replyResult: ReplyPayload | ReplyPayload[] | undefined, ): ReplyPayload | undefined { if (!replyResult) return undefined; if (!Array.isArray(replyResult)) return replyResult; for (let idx = replyResult.length - 1; idx >= 0; idx -= 1) { const payload = replyResult[idx]; if (!payload) continue; if (payload.text || payload.mediaUrl || (payload.mediaUrls && payload.mediaUrls.length > 0)) { return payload; } } return undefined; } function resolveHeartbeatReasoningPayloads( replyResult: ReplyPayload | ReplyPayload[] | undefined, ): ReplyPayload[] { const payloads = Array.isArray(replyResult) ? replyResult : replyResult ? [replyResult] : []; return payloads.filter((payload) => { const text = typeof payload.text === "string" ? payload.text : ""; return text.trimStart().startsWith("Reasoning:"); }); } function resolveHeartbeatSender(params: { allowFrom: Array; lastTo?: string; provider?: string | null; }) { const { allowFrom, lastTo, provider } = params; const candidates = [ lastTo?.trim(), provider && lastTo ? `${provider}:${lastTo}` : undefined, ].filter((val): val is string => Boolean(val?.trim())); const allowList = allowFrom .map((entry) => String(entry)) .filter((entry) => entry && entry !== "*"); if (allowFrom.includes("*")) { return candidates[0] ?? "heartbeat"; } if (candidates.length > 0 && allowList.length > 0) { const matched = candidates.find((candidate) => allowList.includes(candidate)); if (matched) return matched; } if (candidates.length > 0 && allowList.length === 0) { return candidates[0]; } if (allowList.length > 0) return allowList[0]; return candidates[0] ?? "heartbeat"; } async function restoreHeartbeatUpdatedAt(params: { storePath: string; sessionKey: string; updatedAt?: number; }) { const { storePath, sessionKey, updatedAt } = params; if (typeof updatedAt !== "number") return; const store = loadSessionStore(storePath); const entry = store[sessionKey]; if (!entry) return; const nextUpdatedAt = Math.max(entry.updatedAt ?? 0, updatedAt); if (entry.updatedAt === nextUpdatedAt) return; await updateSessionStore(storePath, (nextStore) => { const nextEntry = nextStore[sessionKey] ?? entry; if (!nextEntry) return; const resolvedUpdatedAt = Math.max(nextEntry.updatedAt ?? 0, updatedAt); if (nextEntry.updatedAt === resolvedUpdatedAt) return; nextStore[sessionKey] = { ...nextEntry, updatedAt: resolvedUpdatedAt }; }); } function normalizeHeartbeatReply( payload: ReplyPayload, responsePrefix: string | undefined, ackMaxChars: number, ) { const stripped = stripHeartbeatToken(payload.text, { mode: "heartbeat", maxAckChars: ackMaxChars, }); const hasMedia = Boolean(payload.mediaUrl || (payload.mediaUrls?.length ?? 0) > 0); if (stripped.shouldSkip && !hasMedia) { return { shouldSkip: true, text: "", hasMedia, }; } let finalText = stripped.text; if (responsePrefix && finalText && !finalText.startsWith(responsePrefix)) { finalText = `${responsePrefix} ${finalText}`; } return { shouldSkip: false, text: finalText, hasMedia }; } export async function runHeartbeatOnce(opts: { cfg?: ClawdbotConfig; agentId?: string; heartbeat?: HeartbeatConfig; reason?: string; deps?: HeartbeatDeps; }): Promise { const cfg = opts.cfg ?? loadConfig(); const agentId = normalizeAgentId(opts.agentId ?? resolveDefaultAgentId(cfg)); const heartbeat = opts.heartbeat ?? resolveHeartbeatConfig(cfg, agentId); if (!heartbeatsEnabled) { return { status: "skipped", reason: "disabled" }; } if (!isHeartbeatEnabledForAgent(cfg, agentId)) { return { status: "skipped", reason: "disabled" }; } if (!resolveHeartbeatIntervalMs(cfg, undefined, heartbeat)) { return { status: "skipped", reason: "disabled" }; } const queueSize = (opts.deps?.getQueueSize ?? getQueueSize)(CommandLane.Main); if (queueSize > 0) { return { status: "skipped", reason: "requests-in-flight" }; } const startedAt = opts.deps?.nowMs?.() ?? Date.now(); const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg, agentId); const previousUpdatedAt = entry?.updatedAt; const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry, heartbeat }); const lastChannel = delivery.lastChannel; const lastAccountId = delivery.lastAccountId; const senderProvider = delivery.channel !== "none" ? delivery.channel : lastChannel; const senderAllowFrom = senderProvider ? (getChannelPlugin(senderProvider)?.config.resolveAllowFrom?.({ cfg, accountId: senderProvider === lastChannel ? lastAccountId : undefined, }) ?? []) : []; const sender = resolveHeartbeatSender({ allowFrom: senderAllowFrom, lastTo: entry?.lastTo, provider: senderProvider, }); const prompt = resolveHeartbeatPrompt(cfg, heartbeat); const ctx = { Body: prompt, From: sender, To: sender, Provider: "heartbeat", SessionKey: sessionKey, }; try { const replyResult = await getReplyFromConfig(ctx, { isHeartbeat: true }, cfg); const replyPayload = resolveHeartbeatReplyPayload(replyResult); const includeReasoning = heartbeat?.includeReasoning === true; const reasoningPayloads = includeReasoning ? resolveHeartbeatReasoningPayloads(replyResult).filter((payload) => payload !== replyPayload) : []; if ( !replyPayload || (!replyPayload.text && !replyPayload.mediaUrl && !replyPayload.mediaUrls?.length) ) { await restoreHeartbeatUpdatedAt({ storePath, sessionKey, updatedAt: previousUpdatedAt, }); emitHeartbeatEvent({ status: "ok-empty", reason: opts.reason, durationMs: Date.now() - startedAt, }); return { status: "ran", durationMs: Date.now() - startedAt }; } const ackMaxChars = resolveHeartbeatAckMaxChars(cfg, heartbeat); const normalized = normalizeHeartbeatReply( replyPayload, resolveEffectiveMessagesConfig(cfg, agentId).responsePrefix, ackMaxChars, ); const shouldSkipMain = normalized.shouldSkip && !normalized.hasMedia; if (shouldSkipMain && reasoningPayloads.length === 0) { await restoreHeartbeatUpdatedAt({ storePath, sessionKey, updatedAt: previousUpdatedAt, }); emitHeartbeatEvent({ status: "ok-token", reason: opts.reason, durationMs: Date.now() - startedAt, }); return { status: "ran", durationMs: Date.now() - startedAt }; } const mediaUrls = replyPayload.mediaUrls ?? (replyPayload.mediaUrl ? [replyPayload.mediaUrl] : []); // Suppress duplicate heartbeats (same payload) within a short window. // This prevents "nagging" when nothing changed but the model repeats the same items. const prevHeartbeatText = typeof entry?.lastHeartbeatText === "string" ? entry.lastHeartbeatText : ""; const prevHeartbeatAt = typeof entry?.lastHeartbeatSentAt === "number" ? entry.lastHeartbeatSentAt : undefined; const isDuplicateMain = !shouldSkipMain && !mediaUrls.length && Boolean(prevHeartbeatText.trim()) && normalized.text.trim() === prevHeartbeatText.trim() && typeof prevHeartbeatAt === "number" && startedAt - prevHeartbeatAt < 24 * 60 * 60 * 1000; if (isDuplicateMain) { await restoreHeartbeatUpdatedAt({ storePath, sessionKey, updatedAt: previousUpdatedAt, }); emitHeartbeatEvent({ status: "skipped", reason: "duplicate", preview: normalized.text.slice(0, 200), durationMs: Date.now() - startedAt, hasMedia: false, }); return { status: "ran", durationMs: Date.now() - startedAt }; } // Reasoning payloads are text-only; any attachments stay on the main reply. const previewText = shouldSkipMain ? reasoningPayloads .map((payload) => payload.text) .filter((text): text is string => Boolean(text?.trim())) .join("\n") : normalized.text; if (delivery.channel === "none" || !delivery.to) { emitHeartbeatEvent({ status: "skipped", reason: delivery.reason ?? "no-target", preview: previewText?.slice(0, 200), durationMs: Date.now() - startedAt, hasMedia: mediaUrls.length > 0, }); return { status: "ran", durationMs: Date.now() - startedAt }; } const deliveryAccountId = delivery.accountId; const heartbeatPlugin = getChannelPlugin(delivery.channel); if (heartbeatPlugin?.heartbeat?.checkReady) { const readiness = await heartbeatPlugin.heartbeat.checkReady({ cfg, accountId: deliveryAccountId, deps: opts.deps, }); if (!readiness.ok) { emitHeartbeatEvent({ status: "skipped", reason: readiness.reason, preview: previewText?.slice(0, 200), durationMs: Date.now() - startedAt, hasMedia: mediaUrls.length > 0, }); log.info("heartbeat: channel not ready", { channel: delivery.channel, reason: readiness.reason, }); return { status: "skipped", reason: readiness.reason }; } } await deliverOutboundPayloads({ cfg, channel: delivery.channel, to: delivery.to, accountId: deliveryAccountId, payloads: [ ...reasoningPayloads, ...(shouldSkipMain ? [] : [ { text: normalized.text, mediaUrls, }, ]), ], deps: opts.deps, }); // Record last delivered heartbeat payload for dedupe. if (!shouldSkipMain && normalized.text.trim()) { const store = loadSessionStore(storePath); const current = store[sessionKey]; if (current) { store[sessionKey] = { ...current, lastHeartbeatText: normalized.text, lastHeartbeatSentAt: startedAt, }; await saveSessionStore(storePath, store); } } emitHeartbeatEvent({ status: "sent", to: delivery.to, preview: previewText?.slice(0, 200), durationMs: Date.now() - startedAt, hasMedia: mediaUrls.length > 0, }); return { status: "ran", durationMs: Date.now() - startedAt }; } catch (err) { const reason = formatErrorMessage(err); emitHeartbeatEvent({ status: "failed", reason, durationMs: Date.now() - startedAt, }); log.error(`heartbeat failed: ${reason}`, { error: reason }); return { status: "failed", reason }; } } export function startHeartbeatRunner(opts: { cfg?: ClawdbotConfig; runtime?: RuntimeEnv; abortSignal?: AbortSignal; runOnce?: typeof runHeartbeatOnce; }): HeartbeatRunner { const runtime = opts.runtime ?? defaultRuntime; const runOnce = opts.runOnce ?? runHeartbeatOnce; const state = { cfg: opts.cfg ?? loadConfig(), runtime, agents: new Map(), timer: null as NodeJS.Timeout | null, stopped: false, }; let initialized = false; const resolveNextDue = (now: number, intervalMs: number, prevState?: HeartbeatAgentState) => { if (typeof prevState?.lastRunMs === "number") { return prevState.lastRunMs + intervalMs; } if (prevState && prevState.intervalMs === intervalMs && prevState.nextDueMs > now) { return prevState.nextDueMs; } return now + intervalMs; }; const scheduleNext = () => { if (state.stopped) return; if (state.timer) { clearTimeout(state.timer); state.timer = null; } if (state.agents.size === 0) return; const now = Date.now(); let nextDue = Number.POSITIVE_INFINITY; for (const agent of state.agents.values()) { if (agent.nextDueMs < nextDue) nextDue = agent.nextDueMs; } if (!Number.isFinite(nextDue)) return; const delay = Math.max(0, nextDue - now); state.timer = setTimeout(() => { requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); }, delay); state.timer.unref?.(); }; const updateConfig = (cfg: ClawdbotConfig) => { if (state.stopped) return; const now = Date.now(); const prevAgents = state.agents; const prevEnabled = prevAgents.size > 0; const nextAgents = new Map(); const intervals: number[] = []; for (const agent of resolveHeartbeatAgents(cfg)) { const intervalMs = resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat); if (!intervalMs) continue; intervals.push(intervalMs); const prevState = prevAgents.get(agent.agentId); const nextDueMs = resolveNextDue(now, intervalMs, prevState); nextAgents.set(agent.agentId, { agentId: agent.agentId, heartbeat: agent.heartbeat, intervalMs, lastRunMs: prevState?.lastRunMs, nextDueMs, }); } state.cfg = cfg; state.agents = nextAgents; const nextEnabled = nextAgents.size > 0; if (!initialized) { if (!nextEnabled) { log.info("heartbeat: disabled", { enabled: false }); } else { log.info("heartbeat: started", { intervalMs: Math.min(...intervals) }); } initialized = true; } else if (prevEnabled !== nextEnabled) { if (!nextEnabled) { log.info("heartbeat: disabled", { enabled: false }); } else { log.info("heartbeat: started", { intervalMs: Math.min(...intervals) }); } } scheduleNext(); }; const run: HeartbeatWakeHandler = async (params) => { if (!heartbeatsEnabled) { return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult; } if (state.agents.size === 0) { return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult; } const reason = params?.reason; const isInterval = reason === "interval"; const startedAt = Date.now(); const now = startedAt; let ran = false; for (const agent of state.agents.values()) { if (isInterval && now < agent.nextDueMs) { continue; } const res = await runOnce({ cfg: state.cfg, agentId: agent.agentId, heartbeat: agent.heartbeat, reason, deps: { runtime: state.runtime }, }); if (res.status === "skipped" && res.reason === "requests-in-flight") { return res; } if (res.status !== "skipped" || res.reason !== "disabled") { agent.lastRunMs = now; agent.nextDueMs = now + agent.intervalMs; } if (res.status === "ran") ran = true; } scheduleNext(); if (ran) return { status: "ran", durationMs: Date.now() - startedAt }; return { status: "skipped", reason: isInterval ? "not-due" : "disabled" }; }; setHeartbeatWakeHandler(async (params) => run({ reason: params.reason })); updateConfig(state.cfg); const cleanup = () => { state.stopped = true; setHeartbeatWakeHandler(null); if (state.timer) clearTimeout(state.timer); state.timer = null; }; opts.abortSignal?.addEventListener("abort", cleanup, { once: true }); return { stop: cleanup, updateConfig }; }