diff --git a/CHANGELOG.md b/CHANGELOG.md index ace8110ca..d9bf627f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ Docs: https://docs.clawd.bot - Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk. - Doctor: clarify plugin auto-enable hint text in the startup banner. - Gateway: clarify unauthorized handshake responses with token/password mismatch guidance. +- Gateway: reschedule per-agent heartbeats on config hot reload without restarting the runner. - UI: keep config form enums typed, preserve empty strings, protect sensitive defaults, and deepen config search. (#1315) — thanks @MaudeBot. - UI: preserve ordered list numbering in chat markdown. (#1341) — thanks @bradleypriest. - Web search: infer Perplexity base URL from API key source (direct vs OpenRouter). diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts index 4e3c0e3c1..3bd40f118 100644 --- a/src/gateway/server-close.ts +++ b/src/gateway/server-close.ts @@ -3,6 +3,7 @@ import type { WebSocketServer } from "ws"; import type { CanvasHostHandler, CanvasHostServer } from "../canvas-host/server.js"; import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js"; import { stopGmailWatcher } from "../hooks/gmail-watcher.js"; +import type { HeartbeatRunner } from "../infra/heartbeat-runner.js"; import type { PluginServicesHandle } from "../plugins/services.js"; export function createGatewayCloseHandler(params: { @@ -13,7 +14,7 @@ export function createGatewayCloseHandler(params: { stopChannel: (name: ChannelId, accountId?: string) => Promise; pluginServices: PluginServicesHandle | null; cron: { stop: () => void }; - heartbeatRunner: { stop: () => void }; + heartbeatRunner: HeartbeatRunner; nodePresenceTimers: Map>; broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void; tickInterval: ReturnType; diff --git a/src/gateway/server-reload-handlers.ts b/src/gateway/server-reload-handlers.ts index 9d18af6cd..6cea5ac02 100644 --- a/src/gateway/server-reload-handlers.ts +++ b/src/gateway/server-reload-handlers.ts @@ -1,7 +1,7 @@ import type { CliDeps } from "../cli/deps.js"; import type { loadConfig } from "../config/config.js"; import { startGmailWatcher, stopGmailWatcher } from "../hooks/gmail-watcher.js"; -import { startHeartbeatRunner } from "../infra/heartbeat-runner.js"; +import type { HeartbeatRunner } from "../infra/heartbeat-runner.js"; import { resetDirectoryCache } from "../infra/outbound/target-resolver.js"; import { authorizeGatewaySigusr1Restart, @@ -18,7 +18,7 @@ import { buildGatewayCronService, type GatewayCronState } from "./server-cron.js type GatewayHotReloadState = { hooksConfig: ReturnType; - heartbeatRunner: { stop: () => void }; + heartbeatRunner: HeartbeatRunner; cronState: GatewayCronState; browserControl: Awaited> | null; }; @@ -57,8 +57,7 @@ export function createGatewayReloadHandlers(params: { } if (plan.restartHeartbeat) { - state.heartbeatRunner.stop(); - nextState.heartbeatRunner = startHeartbeatRunner({ cfg: nextConfig }); + nextState.heartbeatRunner.updateConfig(nextConfig); } resetDirectoryCache(); diff --git a/src/gateway/server.reload.test.ts b/src/gateway/server.reload.test.ts index 83aca6eb2..a7db2dfbf 100644 --- a/src/gateway/server.reload.test.ts +++ b/src/gateway/server.reload.test.ts @@ -21,7 +21,11 @@ const hoisted = vi.hoisted(() => { })); const heartbeatStop = vi.fn(); - const startHeartbeatRunner = vi.fn(() => ({ stop: heartbeatStop })); + const heartbeatUpdateConfig = vi.fn(); + const startHeartbeatRunner = vi.fn(() => ({ + stop: heartbeatStop, + updateConfig: heartbeatUpdateConfig, + })); const startGmailWatcher = vi.fn(async () => ({ started: true })); const stopGmailWatcher = vi.fn(async () => {}); @@ -116,6 +120,7 @@ const hoisted = vi.hoisted(() => { browserStop, startBrowserControlServerIfEnabled, heartbeatStop, + heartbeatUpdateConfig, startHeartbeatRunner, startGmailWatcher, stopGmailWatcher, @@ -237,8 +242,9 @@ describe("gateway hot reload", () => { expect(hoisted.browserStop).toHaveBeenCalledTimes(1); expect(hoisted.startBrowserControlServerIfEnabled).toHaveBeenCalledTimes(2); - expect(hoisted.startHeartbeatRunner).toHaveBeenCalledTimes(2); - expect(hoisted.heartbeatStop).toHaveBeenCalledTimes(1); + expect(hoisted.startHeartbeatRunner).toHaveBeenCalledTimes(1); + expect(hoisted.heartbeatUpdateConfig).toHaveBeenCalledTimes(1); + expect(hoisted.heartbeatUpdateConfig).toHaveBeenCalledWith(nextConfig); expect(hoisted.cronInstances.length).toBe(2); expect(hoisted.cronInstances[0].stop).toHaveBeenCalledTimes(1); diff --git a/src/infra/heartbeat-runner.scheduler.test.ts b/src/infra/heartbeat-runner.scheduler.test.ts new file mode 100644 index 000000000..fe34de6ca --- /dev/null +++ b/src/infra/heartbeat-runner.scheduler.test.ts @@ -0,0 +1,57 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { ClawdbotConfig } from "../config/config.js"; +import { startHeartbeatRunner } from "./heartbeat-runner.js"; + +describe("startHeartbeatRunner", () => { + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it("updates scheduling when config changes without restart", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date(0)); + + const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + + const runner = startHeartbeatRunner({ + cfg: { + agents: { defaults: { heartbeat: { every: "30m" } } }, + } as ClawdbotConfig, + runOnce: runSpy, + }); + + await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000); + + expect(runSpy).toHaveBeenCalledTimes(1); + expect(runSpy.mock.calls[0]?.[0]).toEqual( + expect.objectContaining({ agentId: "main", reason: "interval" }), + ); + + runner.updateConfig({ + agents: { + defaults: { heartbeat: { every: "30m" } }, + list: [ + { id: "main", heartbeat: { every: "10m" } }, + { id: "ops", heartbeat: { every: "15m" } }, + ], + }, + } as ClawdbotConfig); + + await vi.advanceTimersByTimeAsync(10 * 60_000 + 1_000); + + expect(runSpy).toHaveBeenCalledTimes(2); + expect(runSpy.mock.calls[1]?.[0]).toEqual( + expect.objectContaining({ agentId: "main", heartbeat: { every: "10m" } }), + ); + + await vi.advanceTimersByTimeAsync(5 * 60_000 + 1_000); + + expect(runSpy).toHaveBeenCalledTimes(3); + expect(runSpy.mock.calls[2]?.[0]).toEqual( + expect.objectContaining({ agentId: "ops", heartbeat: { every: "15m" } }), + ); + + runner.stop(); + }); +}); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 25ba6036b..820cbdf5c 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -70,6 +70,19 @@ export type HeartbeatSummary = { const DEFAULT_HEARTBEAT_TARGET = "last"; +type HeartbeatAgentState = { + agentId: string; + heartbeat?: HeartbeatConfig; + intervalMs: number; + lastRunMs?: number; + nextDueMs: number; +}; + +export type HeartbeatRunner = { + stop: () => void; + updateConfig: (cfg: ClawdbotConfig) => void; +}; + function hasExplicitHeartbeatAgents(cfg: ClawdbotConfig) { const list = cfg.agents?.list ?? []; return list.some((entry) => Boolean(entry?.heartbeat)); @@ -539,24 +552,97 @@ export function startHeartbeatRunner(opts: { cfg?: ClawdbotConfig; runtime?: RuntimeEnv; abortSignal?: AbortSignal; -}) { - const cfg = opts.cfg ?? loadConfig(); - const heartbeatAgents = resolveHeartbeatAgents(cfg); - const intervals = heartbeatAgents - .map((agent) => resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat)) - .filter((value): value is number => typeof value === "number"); - const intervalMs = intervals.length > 0 ? Math.min(...intervals) : null; - if (!intervalMs) { - log.info("heartbeat: disabled", { enabled: false }); - } - + runOnce?: typeof runHeartbeatOnce; +}): HeartbeatRunner { const runtime = opts.runtime ?? defaultRuntime; - const lastRunByAgent = new Map(); + const runOnce = opts.runOnce ?? runHeartbeatOnce; + const state = { + cfg: opts.cfg ?? loadConfig(), + runtime, + agents: new Map(), + timer: null as NodeJS.Timeout | null, + stopped: false, + }; + let initialized = false; + + const resolveNextDue = (now: number, intervalMs: number, prevState?: HeartbeatAgentState) => { + if (typeof prevState?.lastRunMs === "number") { + return prevState.lastRunMs + intervalMs; + } + if (prevState && prevState.intervalMs === intervalMs && prevState.nextDueMs > now) { + return prevState.nextDueMs; + } + return now + intervalMs; + }; + + const scheduleNext = () => { + if (state.stopped) return; + if (state.timer) { + clearTimeout(state.timer); + state.timer = null; + } + if (state.agents.size === 0) return; + const now = Date.now(); + let nextDue = Number.POSITIVE_INFINITY; + for (const agent of state.agents.values()) { + if (agent.nextDueMs < nextDue) nextDue = agent.nextDueMs; + } + if (!Number.isFinite(nextDue)) return; + const delay = Math.max(0, nextDue - now); + state.timer = setTimeout(() => { + requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); + }, delay); + state.timer.unref?.(); + }; + + const updateConfig = (cfg: ClawdbotConfig) => { + if (state.stopped) return; + const now = Date.now(); + const prevAgents = state.agents; + const prevEnabled = prevAgents.size > 0; + const nextAgents = new Map(); + const intervals: number[] = []; + for (const agent of resolveHeartbeatAgents(cfg)) { + const intervalMs = resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat); + if (!intervalMs) continue; + intervals.push(intervalMs); + const prevState = prevAgents.get(agent.agentId); + const nextDueMs = resolveNextDue(now, intervalMs, prevState); + nextAgents.set(agent.agentId, { + agentId: agent.agentId, + heartbeat: agent.heartbeat, + intervalMs, + lastRunMs: prevState?.lastRunMs, + nextDueMs, + }); + } + + state.cfg = cfg; + state.agents = nextAgents; + const nextEnabled = nextAgents.size > 0; + if (!initialized) { + if (!nextEnabled) { + log.info("heartbeat: disabled", { enabled: false }); + } else { + log.info("heartbeat: started", { intervalMs: Math.min(...intervals) }); + } + initialized = true; + } else if (prevEnabled !== nextEnabled) { + if (!nextEnabled) { + log.info("heartbeat: disabled", { enabled: false }); + } else { + log.info("heartbeat: started", { intervalMs: Math.min(...intervals) }); + } + } + + scheduleNext(); + }; + const run: HeartbeatWakeHandler = async (params) => { if (!heartbeatsEnabled) { return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult; } - if (heartbeatAgents.length === 0) { + if (state.agents.size === 0) { return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult; } @@ -566,52 +652,44 @@ export function startHeartbeatRunner(opts: { const now = startedAt; let ran = false; - for (const agent of heartbeatAgents) { - const agentIntervalMs = resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat); - if (!agentIntervalMs) continue; - const lastRun = lastRunByAgent.get(agent.agentId); - if (isInterval && typeof lastRun === "number" && now - lastRun < agentIntervalMs) { + for (const agent of state.agents.values()) { + if (isInterval && now < agent.nextDueMs) { continue; } - const res = await runHeartbeatOnce({ - cfg, + const res = await runOnce({ + cfg: state.cfg, agentId: agent.agentId, heartbeat: agent.heartbeat, reason, - deps: { runtime }, + deps: { runtime: state.runtime }, }); if (res.status === "skipped" && res.reason === "requests-in-flight") { return res; } if (res.status !== "skipped" || res.reason !== "disabled") { - lastRunByAgent.set(agent.agentId, now); + agent.lastRunMs = now; + agent.nextDueMs = now + agent.intervalMs; } if (res.status === "ran") ran = true; } + scheduleNext(); if (ran) return { status: "ran", durationMs: Date.now() - startedAt }; return { status: "skipped", reason: isInterval ? "not-due" : "disabled" }; }; setHeartbeatWakeHandler(async (params) => run({ reason: params.reason })); - - let timer: NodeJS.Timeout | null = null; - if (intervalMs) { - timer = setInterval(() => { - requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); - }, intervalMs); - timer.unref?.(); - log.info("heartbeat: started", { intervalMs }); - } + updateConfig(state.cfg); const cleanup = () => { + state.stopped = true; setHeartbeatWakeHandler(null); - if (timer) clearInterval(timer); - timer = null; + if (state.timer) clearTimeout(state.timer); + state.timer = null; }; opts.abortSignal?.addEventListener("abort", cleanup, { once: true }); - return { stop: cleanup }; + return { stop: cleanup, updateConfig }; }