fix: reschedule heartbeat on hot reload
Co-authored-by: Seb Slight <sebslight@users.noreply.github.com>
This commit is contained in:
@@ -29,6 +29,7 @@ Docs: https://docs.clawd.bot
|
||||
- Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk.
|
||||
- Doctor: clarify plugin auto-enable hint text in the startup banner.
|
||||
- Gateway: clarify unauthorized handshake responses with token/password mismatch guidance.
|
||||
- Gateway: reschedule per-agent heartbeats on config hot reload without restarting the runner.
|
||||
- UI: keep config form enums typed, preserve empty strings, protect sensitive defaults, and deepen config search. (#1315) — thanks @MaudeBot.
|
||||
- UI: preserve ordered list numbering in chat markdown. (#1341) — thanks @bradleypriest.
|
||||
- Web search: infer Perplexity base URL from API key source (direct vs OpenRouter).
|
||||
|
||||
@@ -3,6 +3,7 @@ import type { WebSocketServer } from "ws";
|
||||
import type { CanvasHostHandler, CanvasHostServer } from "../canvas-host/server.js";
|
||||
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
|
||||
import { stopGmailWatcher } from "../hooks/gmail-watcher.js";
|
||||
import type { HeartbeatRunner } from "../infra/heartbeat-runner.js";
|
||||
import type { PluginServicesHandle } from "../plugins/services.js";
|
||||
|
||||
export function createGatewayCloseHandler(params: {
|
||||
@@ -13,7 +14,7 @@ export function createGatewayCloseHandler(params: {
|
||||
stopChannel: (name: ChannelId, accountId?: string) => Promise<void>;
|
||||
pluginServices: PluginServicesHandle | null;
|
||||
cron: { stop: () => void };
|
||||
heartbeatRunner: { stop: () => void };
|
||||
heartbeatRunner: HeartbeatRunner;
|
||||
nodePresenceTimers: Map<string, ReturnType<typeof setInterval>>;
|
||||
broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
|
||||
tickInterval: ReturnType<typeof setInterval>;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import type { CliDeps } from "../cli/deps.js";
|
||||
import type { loadConfig } from "../config/config.js";
|
||||
import { startGmailWatcher, stopGmailWatcher } from "../hooks/gmail-watcher.js";
|
||||
import { startHeartbeatRunner } from "../infra/heartbeat-runner.js";
|
||||
import type { HeartbeatRunner } from "../infra/heartbeat-runner.js";
|
||||
import { resetDirectoryCache } from "../infra/outbound/target-resolver.js";
|
||||
import {
|
||||
authorizeGatewaySigusr1Restart,
|
||||
@@ -18,7 +18,7 @@ import { buildGatewayCronService, type GatewayCronState } from "./server-cron.js
|
||||
|
||||
type GatewayHotReloadState = {
|
||||
hooksConfig: ReturnType<typeof resolveHooksConfig>;
|
||||
heartbeatRunner: { stop: () => void };
|
||||
heartbeatRunner: HeartbeatRunner;
|
||||
cronState: GatewayCronState;
|
||||
browserControl: Awaited<ReturnType<typeof startBrowserControlServerIfEnabled>> | null;
|
||||
};
|
||||
@@ -57,8 +57,7 @@ export function createGatewayReloadHandlers(params: {
|
||||
}
|
||||
|
||||
if (plan.restartHeartbeat) {
|
||||
state.heartbeatRunner.stop();
|
||||
nextState.heartbeatRunner = startHeartbeatRunner({ cfg: nextConfig });
|
||||
nextState.heartbeatRunner.updateConfig(nextConfig);
|
||||
}
|
||||
|
||||
resetDirectoryCache();
|
||||
|
||||
@@ -21,7 +21,11 @@ const hoisted = vi.hoisted(() => {
|
||||
}));
|
||||
|
||||
const heartbeatStop = vi.fn();
|
||||
const startHeartbeatRunner = vi.fn(() => ({ stop: heartbeatStop }));
|
||||
const heartbeatUpdateConfig = vi.fn();
|
||||
const startHeartbeatRunner = vi.fn(() => ({
|
||||
stop: heartbeatStop,
|
||||
updateConfig: heartbeatUpdateConfig,
|
||||
}));
|
||||
|
||||
const startGmailWatcher = vi.fn(async () => ({ started: true }));
|
||||
const stopGmailWatcher = vi.fn(async () => {});
|
||||
@@ -116,6 +120,7 @@ const hoisted = vi.hoisted(() => {
|
||||
browserStop,
|
||||
startBrowserControlServerIfEnabled,
|
||||
heartbeatStop,
|
||||
heartbeatUpdateConfig,
|
||||
startHeartbeatRunner,
|
||||
startGmailWatcher,
|
||||
stopGmailWatcher,
|
||||
@@ -237,8 +242,9 @@ describe("gateway hot reload", () => {
|
||||
expect(hoisted.browserStop).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.startBrowserControlServerIfEnabled).toHaveBeenCalledTimes(2);
|
||||
|
||||
expect(hoisted.startHeartbeatRunner).toHaveBeenCalledTimes(2);
|
||||
expect(hoisted.heartbeatStop).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.startHeartbeatRunner).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.heartbeatUpdateConfig).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.heartbeatUpdateConfig).toHaveBeenCalledWith(nextConfig);
|
||||
|
||||
expect(hoisted.cronInstances.length).toBe(2);
|
||||
expect(hoisted.cronInstances[0].stop).toHaveBeenCalledTimes(1);
|
||||
|
||||
src/infra/heartbeat-runner.scheduler.test.ts
New file (57 lines added)
@@ -0,0 +1,57 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type { ClawdbotConfig } from "../config/config.js";
|
||||
import { startHeartbeatRunner } from "./heartbeat-runner.js";
|
||||
|
||||
describe("startHeartbeatRunner", () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it("updates scheduling when config changes without restart", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date(0));
|
||||
|
||||
const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
|
||||
|
||||
const runner = startHeartbeatRunner({
|
||||
cfg: {
|
||||
agents: { defaults: { heartbeat: { every: "30m" } } },
|
||||
} as ClawdbotConfig,
|
||||
runOnce: runSpy,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000);
|
||||
|
||||
expect(runSpy).toHaveBeenCalledTimes(1);
|
||||
expect(runSpy.mock.calls[0]?.[0]).toEqual(
|
||||
expect.objectContaining({ agentId: "main", reason: "interval" }),
|
||||
);
|
||||
|
||||
runner.updateConfig({
|
||||
agents: {
|
||||
defaults: { heartbeat: { every: "30m" } },
|
||||
list: [
|
||||
{ id: "main", heartbeat: { every: "10m" } },
|
||||
{ id: "ops", heartbeat: { every: "15m" } },
|
||||
],
|
||||
},
|
||||
} as ClawdbotConfig);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(10 * 60_000 + 1_000);
|
||||
|
||||
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||
expect(runSpy.mock.calls[1]?.[0]).toEqual(
|
||||
expect.objectContaining({ agentId: "main", heartbeat: { every: "10m" } }),
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(5 * 60_000 + 1_000);
|
||||
|
||||
expect(runSpy).toHaveBeenCalledTimes(3);
|
||||
expect(runSpy.mock.calls[2]?.[0]).toEqual(
|
||||
expect.objectContaining({ agentId: "ops", heartbeat: { every: "15m" } }),
|
||||
);
|
||||
|
||||
runner.stop();
|
||||
});
|
||||
});
|
||||
@@ -70,6 +70,19 @@ export type HeartbeatSummary = {
|
||||
|
||||
const DEFAULT_HEARTBEAT_TARGET = "last";
|
||||
|
||||
type HeartbeatAgentState = {
|
||||
agentId: string;
|
||||
heartbeat?: HeartbeatConfig;
|
||||
intervalMs: number;
|
||||
lastRunMs?: number;
|
||||
nextDueMs: number;
|
||||
};
|
||||
|
||||
export type HeartbeatRunner = {
|
||||
stop: () => void;
|
||||
updateConfig: (cfg: ClawdbotConfig) => void;
|
||||
};
|
||||
|
||||
function hasExplicitHeartbeatAgents(cfg: ClawdbotConfig) {
|
||||
const list = cfg.agents?.list ?? [];
|
||||
return list.some((entry) => Boolean(entry?.heartbeat));
|
||||
@@ -539,24 +552,97 @@ export function startHeartbeatRunner(opts: {
|
||||
cfg?: ClawdbotConfig;
|
||||
runtime?: RuntimeEnv;
|
||||
abortSignal?: AbortSignal;
|
||||
}) {
|
||||
const cfg = opts.cfg ?? loadConfig();
|
||||
const heartbeatAgents = resolveHeartbeatAgents(cfg);
|
||||
const intervals = heartbeatAgents
|
||||
.map((agent) => resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat))
|
||||
.filter((value): value is number => typeof value === "number");
|
||||
const intervalMs = intervals.length > 0 ? Math.min(...intervals) : null;
|
||||
if (!intervalMs) {
|
||||
log.info("heartbeat: disabled", { enabled: false });
|
||||
}
|
||||
|
||||
runOnce?: typeof runHeartbeatOnce;
|
||||
}): HeartbeatRunner {
|
||||
const runtime = opts.runtime ?? defaultRuntime;
|
||||
const lastRunByAgent = new Map<string, number>();
|
||||
const runOnce = opts.runOnce ?? runHeartbeatOnce;
|
||||
const state = {
|
||||
cfg: opts.cfg ?? loadConfig(),
|
||||
runtime,
|
||||
agents: new Map<string, HeartbeatAgentState>(),
|
||||
timer: null as NodeJS.Timeout | null,
|
||||
stopped: false,
|
||||
};
|
||||
let initialized = false;
|
||||
|
||||
const resolveNextDue = (now: number, intervalMs: number, prevState?: HeartbeatAgentState) => {
|
||||
if (typeof prevState?.lastRunMs === "number") {
|
||||
return prevState.lastRunMs + intervalMs;
|
||||
}
|
||||
if (prevState && prevState.intervalMs === intervalMs && prevState.nextDueMs > now) {
|
||||
return prevState.nextDueMs;
|
||||
}
|
||||
return now + intervalMs;
|
||||
};
|
||||
|
||||
const scheduleNext = () => {
|
||||
if (state.stopped) return;
|
||||
if (state.timer) {
|
||||
clearTimeout(state.timer);
|
||||
state.timer = null;
|
||||
}
|
||||
if (state.agents.size === 0) return;
|
||||
const now = Date.now();
|
||||
let nextDue = Number.POSITIVE_INFINITY;
|
||||
for (const agent of state.agents.values()) {
|
||||
if (agent.nextDueMs < nextDue) nextDue = agent.nextDueMs;
|
||||
}
|
||||
if (!Number.isFinite(nextDue)) return;
|
||||
const delay = Math.max(0, nextDue - now);
|
||||
state.timer = setTimeout(() => {
|
||||
requestHeartbeatNow({ reason: "interval", coalesceMs: 0 });
|
||||
}, delay);
|
||||
state.timer.unref?.();
|
||||
};
|
||||
|
||||
const updateConfig = (cfg: ClawdbotConfig) => {
|
||||
if (state.stopped) return;
|
||||
const now = Date.now();
|
||||
const prevAgents = state.agents;
|
||||
const prevEnabled = prevAgents.size > 0;
|
||||
const nextAgents = new Map<string, HeartbeatAgentState>();
|
||||
const intervals: number[] = [];
|
||||
for (const agent of resolveHeartbeatAgents(cfg)) {
|
||||
const intervalMs = resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat);
|
||||
if (!intervalMs) continue;
|
||||
intervals.push(intervalMs);
|
||||
const prevState = prevAgents.get(agent.agentId);
|
||||
const nextDueMs = resolveNextDue(now, intervalMs, prevState);
|
||||
nextAgents.set(agent.agentId, {
|
||||
agentId: agent.agentId,
|
||||
heartbeat: agent.heartbeat,
|
||||
intervalMs,
|
||||
lastRunMs: prevState?.lastRunMs,
|
||||
nextDueMs,
|
||||
});
|
||||
}
|
||||
|
||||
state.cfg = cfg;
|
||||
state.agents = nextAgents;
|
||||
const nextEnabled = nextAgents.size > 0;
|
||||
if (!initialized) {
|
||||
if (!nextEnabled) {
|
||||
log.info("heartbeat: disabled", { enabled: false });
|
||||
} else {
|
||||
log.info("heartbeat: started", { intervalMs: Math.min(...intervals) });
|
||||
}
|
||||
initialized = true;
|
||||
} else if (prevEnabled !== nextEnabled) {
|
||||
if (!nextEnabled) {
|
||||
log.info("heartbeat: disabled", { enabled: false });
|
||||
} else {
|
||||
log.info("heartbeat: started", { intervalMs: Math.min(...intervals) });
|
||||
}
|
||||
}
|
||||
|
||||
scheduleNext();
|
||||
};
|
||||
|
||||
const run: HeartbeatWakeHandler = async (params) => {
|
||||
if (!heartbeatsEnabled) {
|
||||
return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult;
|
||||
}
|
||||
if (heartbeatAgents.length === 0) {
|
||||
if (state.agents.size === 0) {
|
||||
return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult;
|
||||
}
|
||||
|
||||
@@ -566,52 +652,44 @@ export function startHeartbeatRunner(opts: {
|
||||
const now = startedAt;
|
||||
let ran = false;
|
||||
|
||||
for (const agent of heartbeatAgents) {
|
||||
const agentIntervalMs = resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat);
|
||||
if (!agentIntervalMs) continue;
|
||||
const lastRun = lastRunByAgent.get(agent.agentId);
|
||||
if (isInterval && typeof lastRun === "number" && now - lastRun < agentIntervalMs) {
|
||||
for (const agent of state.agents.values()) {
|
||||
if (isInterval && now < agent.nextDueMs) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const res = await runHeartbeatOnce({
|
||||
cfg,
|
||||
const res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: agent.agentId,
|
||||
heartbeat: agent.heartbeat,
|
||||
reason,
|
||||
deps: { runtime },
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||
return res;
|
||||
}
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
lastRunByAgent.set(agent.agentId, now);
|
||||
agent.lastRunMs = now;
|
||||
agent.nextDueMs = now + agent.intervalMs;
|
||||
}
|
||||
if (res.status === "ran") ran = true;
|
||||
}
|
||||
|
||||
scheduleNext();
|
||||
if (ran) return { status: "ran", durationMs: Date.now() - startedAt };
|
||||
return { status: "skipped", reason: isInterval ? "not-due" : "disabled" };
|
||||
};
|
||||
|
||||
setHeartbeatWakeHandler(async (params) => run({ reason: params.reason }));
|
||||
|
||||
let timer: NodeJS.Timeout | null = null;
|
||||
if (intervalMs) {
|
||||
timer = setInterval(() => {
|
||||
requestHeartbeatNow({ reason: "interval", coalesceMs: 0 });
|
||||
}, intervalMs);
|
||||
timer.unref?.();
|
||||
log.info("heartbeat: started", { intervalMs });
|
||||
}
|
||||
updateConfig(state.cfg);
|
||||
|
||||
const cleanup = () => {
|
||||
state.stopped = true;
|
||||
setHeartbeatWakeHandler(null);
|
||||
if (timer) clearInterval(timer);
|
||||
timer = null;
|
||||
if (state.timer) clearTimeout(state.timer);
|
||||
state.timer = null;
|
||||
};
|
||||
|
||||
opts.abortSignal?.addEventListener("abort", cleanup, { once: true });
|
||||
|
||||
return { stop: cleanup };
|
||||
return { stop: cleanup, updateConfig };
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user