feat: configurable heartbeat session
This commit is contained in:
committed by
Peter Steinberger
parent
db61451c67
commit
7725dd6795
@@ -173,6 +173,8 @@ export type AgentDefaultsConfig = {
|
||||
};
|
||||
/** Heartbeat model override (provider/model). */
|
||||
model?: string;
|
||||
/** Session key for heartbeat runs ("main" or explicit session key). */
|
||||
session?: string;
|
||||
/** Delivery target (last|whatsapp|telegram|discord|slack|msteams|signal|imessage|none). */
|
||||
target?:
|
||||
| "last"
|
||||
|
||||
@@ -20,6 +20,7 @@ export const HeartbeatSchema = z
|
||||
.strict()
|
||||
.optional(),
|
||||
model: z.string().optional(),
|
||||
session: z.string().optional(),
|
||||
includeReasoning: z.boolean().optional(),
|
||||
target: z
|
||||
.union([
|
||||
|
||||
@@ -41,7 +41,6 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
heartbeat: {
|
||||
every: "5m",
|
||||
target: "whatsapp",
|
||||
to: "+1555",
|
||||
ackMaxChars: 0,
|
||||
},
|
||||
},
|
||||
@@ -58,6 +57,7 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
},
|
||||
@@ -102,7 +102,6 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
heartbeat: {
|
||||
every: "5m",
|
||||
target: "whatsapp",
|
||||
to: "+1555",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -118,6 +117,7 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
},
|
||||
@@ -164,7 +164,6 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
heartbeat: {
|
||||
every: "5m",
|
||||
target: "whatsapp",
|
||||
to: "+1555",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -180,6 +179,7 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: originalUpdatedAt,
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
},
|
||||
@@ -231,7 +231,7 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
const cfg: ClawdbotConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: { every: "5m", target: "whatsapp", to: "+1555" },
|
||||
heartbeat: { every: "5m", target: "whatsapp" },
|
||||
},
|
||||
},
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
@@ -246,6 +246,7 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
},
|
||||
@@ -291,7 +292,7 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
const cfg: ClawdbotConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: { every: "5m", target: "telegram", to: "123456" },
|
||||
heartbeat: { every: "5m", target: "telegram" },
|
||||
},
|
||||
},
|
||||
channels: { telegram: { botToken: "test-bot-token-123" } },
|
||||
@@ -306,6 +307,7 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "telegram",
|
||||
lastProvider: "telegram",
|
||||
lastTo: "123456",
|
||||
},
|
||||
@@ -357,7 +359,7 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
const cfg: ClawdbotConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: { every: "5m", target: "telegram", to: "123456" },
|
||||
heartbeat: { every: "5m", target: "telegram" },
|
||||
},
|
||||
},
|
||||
channels: {
|
||||
@@ -378,6 +380,7 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "telegram",
|
||||
lastProvider: "telegram",
|
||||
lastTo: "123456",
|
||||
},
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
resolveMainSessionKey,
|
||||
resolveStorePath,
|
||||
} from "../config/sessions.js";
|
||||
import { buildAgentPeerSessionKey } from "../routing/session-key.js";
|
||||
import {
|
||||
isHeartbeatEnabledForAgent,
|
||||
resolveHeartbeatIntervalMs,
|
||||
@@ -332,7 +333,7 @@ describe("runHeartbeatOnce", () => {
|
||||
const cfg: ClawdbotConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: { every: "5m", target: "whatsapp", to: "+1555" },
|
||||
heartbeat: { every: "5m", target: "whatsapp" },
|
||||
},
|
||||
},
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
@@ -395,7 +396,7 @@ describe("runHeartbeatOnce", () => {
|
||||
{ id: "main", default: true },
|
||||
{
|
||||
id: "ops",
|
||||
heartbeat: { every: "5m", target: "whatsapp", to: "+1555", prompt: "Ops check" },
|
||||
heartbeat: { every: "5m", target: "whatsapp", prompt: "Ops check" },
|
||||
},
|
||||
],
|
||||
},
|
||||
@@ -451,6 +452,86 @@ describe("runHeartbeatOnce", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("runs heartbeats in the explicit session key when configured", async () => {
|
||||
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-hb-"));
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
|
||||
try {
|
||||
const groupId = "120363401234567890@g.us";
|
||||
const cfg: ClawdbotConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: {
|
||||
every: "5m",
|
||||
target: "last",
|
||||
},
|
||||
},
|
||||
},
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
session: { store: storePath },
|
||||
};
|
||||
const mainSessionKey = resolveMainSessionKey(cfg);
|
||||
const agentId = resolveAgentIdFromSessionKey(mainSessionKey);
|
||||
const groupSessionKey = buildAgentPeerSessionKey({
|
||||
agentId,
|
||||
channel: "whatsapp",
|
||||
peerKind: "group",
|
||||
peerId: groupId,
|
||||
});
|
||||
cfg.agents?.defaults?.heartbeat && (cfg.agents.defaults.heartbeat.session = groupSessionKey);
|
||||
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
[mainSessionKey]: {
|
||||
sessionId: "sid-main",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
},
|
||||
[groupSessionKey]: {
|
||||
sessionId: "sid-group",
|
||||
updatedAt: Date.now() + 10_000,
|
||||
lastChannel: "whatsapp",
|
||||
lastTo: groupId,
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
);
|
||||
|
||||
replySpy.mockResolvedValue([{ text: "Group alert" }]);
|
||||
const sendWhatsApp = vi.fn().mockResolvedValue({
|
||||
messageId: "m1",
|
||||
toJid: "jid",
|
||||
});
|
||||
|
||||
await runHeartbeatOnce({
|
||||
cfg,
|
||||
deps: {
|
||||
sendWhatsApp,
|
||||
getQueueSize: () => 0,
|
||||
nowMs: () => 0,
|
||||
webAuthExists: async () => true,
|
||||
hasActiveWebListener: () => true,
|
||||
},
|
||||
});
|
||||
|
||||
expect(sendWhatsApp).toHaveBeenCalledTimes(1);
|
||||
expect(sendWhatsApp).toHaveBeenCalledWith(groupId, "Group alert", expect.any(Object));
|
||||
expect(replySpy).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ SessionKey: groupSessionKey }),
|
||||
{ isHeartbeat: true },
|
||||
cfg,
|
||||
);
|
||||
} finally {
|
||||
replySpy.mockRestore();
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("suppresses duplicate heartbeat payloads within 24h", async () => {
|
||||
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-hb-"));
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
@@ -459,7 +540,7 @@ describe("runHeartbeatOnce", () => {
|
||||
const cfg: ClawdbotConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: { every: "5m", target: "whatsapp", to: "+1555" },
|
||||
heartbeat: { every: "5m", target: "whatsapp" },
|
||||
},
|
||||
},
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
@@ -517,7 +598,6 @@ describe("runHeartbeatOnce", () => {
|
||||
heartbeat: {
|
||||
every: "5m",
|
||||
target: "whatsapp",
|
||||
to: "+1555",
|
||||
includeReasoning: true,
|
||||
},
|
||||
},
|
||||
@@ -534,6 +614,7 @@ describe("runHeartbeatOnce", () => {
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
},
|
||||
@@ -588,7 +669,6 @@ describe("runHeartbeatOnce", () => {
|
||||
heartbeat: {
|
||||
every: "5m",
|
||||
target: "whatsapp",
|
||||
to: "+1555",
|
||||
includeReasoning: true,
|
||||
},
|
||||
},
|
||||
@@ -605,6 +685,7 @@ describe("runHeartbeatOnce", () => {
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
},
|
||||
@@ -672,6 +753,7 @@ describe("runHeartbeatOnce", () => {
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
},
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import { resolveAgentConfig, resolveDefaultAgentId } from "../agents/agent-scope.js";
|
||||
import { resolveUserTimezone } from "../agents/date-time.js";
|
||||
import { resolveEffectiveMessagesConfig } from "../agents/identity.js";
|
||||
import {
|
||||
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
|
||||
@@ -16,6 +15,8 @@ import type { ClawdbotConfig } from "../config/config.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
canonicalizeMainSessionAlias,
|
||||
resolveAgentIdFromSessionKey,
|
||||
resolveAgentMainSessionKey,
|
||||
resolveStorePath,
|
||||
saveSessionStore,
|
||||
@@ -25,9 +26,8 @@ import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { getQueueSize } from "../process/command-queue.js";
|
||||
import { CommandLane } from "../process/lanes.js";
|
||||
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
|
||||
import { normalizeAgentId } from "../routing/session-key.js";
|
||||
import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js";
|
||||
import { emitHeartbeatEvent } from "./heartbeat-events.js";
|
||||
import {
|
||||
type HeartbeatRunResult,
|
||||
@@ -70,94 +70,6 @@ export type HeartbeatSummary = {
|
||||
};
|
||||
|
||||
const DEFAULT_HEARTBEAT_TARGET = "last";
|
||||
const ACTIVE_HOURS_TIME_PATTERN = /^([01]\d|2[0-3]|24):([0-5]\d)$/;
|
||||
|
||||
function resolveActiveHoursTimezone(cfg: ClawdbotConfig, raw?: string): string {
|
||||
const trimmed = raw?.trim();
|
||||
if (!trimmed || trimmed === "user") {
|
||||
return resolveUserTimezone(cfg.agents?.defaults?.userTimezone);
|
||||
}
|
||||
if (trimmed === "local") {
|
||||
const host = Intl.DateTimeFormat().resolvedOptions().timeZone;
|
||||
return host?.trim() || "UTC";
|
||||
}
|
||||
try {
|
||||
new Intl.DateTimeFormat("en-US", { timeZone: trimmed }).format(new Date());
|
||||
return trimmed;
|
||||
} catch {
|
||||
return resolveUserTimezone(cfg.agents?.defaults?.userTimezone);
|
||||
}
|
||||
}
|
||||
|
||||
function parseActiveHoursTime(opts: { allow24: boolean }, raw?: string): number | null {
|
||||
if (!raw || !ACTIVE_HOURS_TIME_PATTERN.test(raw)) return null;
|
||||
const [hourStr, minuteStr] = raw.split(":");
|
||||
const hour = Number(hourStr);
|
||||
const minute = Number(minuteStr);
|
||||
if (!Number.isFinite(hour) || !Number.isFinite(minute)) return null;
|
||||
if (hour === 24) {
|
||||
if (!opts.allow24 || minute !== 0) return null;
|
||||
return 24 * 60;
|
||||
}
|
||||
return hour * 60 + minute;
|
||||
}
|
||||
|
||||
function resolveMinutesInTimeZone(nowMs: number, timeZone: string): number | null {
|
||||
try {
|
||||
const parts = new Intl.DateTimeFormat("en-US", {
|
||||
timeZone,
|
||||
hour: "2-digit",
|
||||
minute: "2-digit",
|
||||
hourCycle: "h23",
|
||||
}).formatToParts(new Date(nowMs));
|
||||
const map: Record<string, string> = {};
|
||||
for (const part of parts) {
|
||||
if (part.type !== "literal") map[part.type] = part.value;
|
||||
}
|
||||
const hour = Number(map.hour);
|
||||
const minute = Number(map.minute);
|
||||
if (!Number.isFinite(hour) || !Number.isFinite(minute)) return null;
|
||||
return hour * 60 + minute;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function isWithinActiveHours(
|
||||
cfg: ClawdbotConfig,
|
||||
heartbeat?: HeartbeatConfig,
|
||||
nowMs?: number,
|
||||
): boolean {
|
||||
const active = heartbeat?.activeHours;
|
||||
if (!active) return true;
|
||||
|
||||
const startMin = parseActiveHoursTime({ allow24: false }, active.start);
|
||||
const endMin = parseActiveHoursTime({ allow24: true }, active.end);
|
||||
if (startMin === null || endMin === null) return true;
|
||||
if (startMin === endMin) return true;
|
||||
|
||||
const timeZone = resolveActiveHoursTimezone(cfg, active.timezone);
|
||||
const currentMin = resolveMinutesInTimeZone(nowMs ?? Date.now(), timeZone);
|
||||
if (currentMin === null) return true;
|
||||
|
||||
if (endMin > startMin) {
|
||||
return currentMin >= startMin && currentMin < endMin;
|
||||
}
|
||||
return currentMin >= startMin || currentMin < endMin;
|
||||
}
|
||||
|
||||
type HeartbeatAgentState = {
|
||||
agentId: string;
|
||||
heartbeat?: HeartbeatConfig;
|
||||
intervalMs: number;
|
||||
lastRunMs?: number;
|
||||
nextDueMs: number;
|
||||
};
|
||||
|
||||
export type HeartbeatRunner = {
|
||||
stop: () => void;
|
||||
updateConfig: (cfg: ClawdbotConfig) => void;
|
||||
};
|
||||
|
||||
function hasExplicitHeartbeatAgents(cfg: ClawdbotConfig) {
|
||||
const list = cfg.agents?.list ?? [];
|
||||
@@ -286,17 +198,53 @@ function resolveHeartbeatAckMaxChars(cfg: ClawdbotConfig, heartbeat?: HeartbeatC
|
||||
);
|
||||
}
|
||||
|
||||
function resolveHeartbeatSession(cfg: ClawdbotConfig, agentId?: string) {
|
||||
function resolveHeartbeatSession(
|
||||
cfg: ClawdbotConfig,
|
||||
agentId?: string,
|
||||
heartbeat?: HeartbeatConfig,
|
||||
) {
|
||||
const sessionCfg = cfg.session;
|
||||
const scope = sessionCfg?.scope ?? "per-sender";
|
||||
const resolvedAgentId = normalizeAgentId(agentId ?? resolveDefaultAgentId(cfg));
|
||||
const sessionKey =
|
||||
const mainSessionKey =
|
||||
scope === "global" ? "global" : resolveAgentMainSessionKey({ cfg, agentId: resolvedAgentId });
|
||||
const storeAgentId = scope === "global" ? resolveDefaultAgentId(cfg) : resolvedAgentId;
|
||||
const storePath = resolveStorePath(sessionCfg?.store, { agentId: storeAgentId });
|
||||
const store = loadSessionStore(storePath);
|
||||
const entry = store[sessionKey];
|
||||
return { sessionKey, storePath, store, entry };
|
||||
const mainEntry = store[mainSessionKey];
|
||||
|
||||
if (scope === "global") {
|
||||
return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry };
|
||||
}
|
||||
|
||||
const trimmed = heartbeat?.session?.trim() ?? "";
|
||||
if (!trimmed) {
|
||||
return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry };
|
||||
}
|
||||
|
||||
const normalized = trimmed.toLowerCase();
|
||||
if (normalized === "main" || normalized === "global") {
|
||||
return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry };
|
||||
}
|
||||
|
||||
const candidate = toAgentStoreSessionKey({
|
||||
agentId: resolvedAgentId,
|
||||
requestKey: trimmed,
|
||||
mainKey: cfg.session?.mainKey,
|
||||
});
|
||||
const canonical = canonicalizeMainSessionAlias({
|
||||
cfg,
|
||||
agentId: resolvedAgentId,
|
||||
sessionKey: candidate,
|
||||
});
|
||||
if (canonical !== "global") {
|
||||
const sessionAgentId = resolveAgentIdFromSessionKey(canonical);
|
||||
if (sessionAgentId === normalizeAgentId(resolvedAgentId)) {
|
||||
return { sessionKey: canonical, storePath, store, entry: store[canonical] };
|
||||
}
|
||||
}
|
||||
|
||||
return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry };
|
||||
}
|
||||
|
||||
function resolveHeartbeatReplyPayload(
|
||||
@@ -417,17 +365,13 @@ export async function runHeartbeatOnce(opts: {
|
||||
return { status: "skipped", reason: "disabled" };
|
||||
}
|
||||
|
||||
const startedAt = opts.deps?.nowMs?.() ?? Date.now();
|
||||
if (!isWithinActiveHours(cfg, heartbeat, startedAt)) {
|
||||
return { status: "skipped", reason: "quiet-hours" };
|
||||
}
|
||||
|
||||
const queueSize = (opts.deps?.getQueueSize ?? getQueueSize)(CommandLane.Main);
|
||||
const queueSize = (opts.deps?.getQueueSize ?? getQueueSize)("main");
|
||||
if (queueSize > 0) {
|
||||
return { status: "skipped", reason: "requests-in-flight" };
|
||||
}
|
||||
|
||||
const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg, agentId);
|
||||
const startedAt = opts.deps?.nowMs?.() ?? Date.now();
|
||||
const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg, agentId, heartbeat);
|
||||
const previousUpdatedAt = entry?.updatedAt;
|
||||
const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry, heartbeat });
|
||||
const lastChannel = delivery.lastChannel;
|
||||
@@ -632,97 +576,24 @@ export function startHeartbeatRunner(opts: {
|
||||
cfg?: ClawdbotConfig;
|
||||
runtime?: RuntimeEnv;
|
||||
abortSignal?: AbortSignal;
|
||||
runOnce?: typeof runHeartbeatOnce;
|
||||
}): HeartbeatRunner {
|
||||
}) {
|
||||
const cfg = opts.cfg ?? loadConfig();
|
||||
const heartbeatAgents = resolveHeartbeatAgents(cfg);
|
||||
const intervals = heartbeatAgents
|
||||
.map((agent) => resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat))
|
||||
.filter((value): value is number => typeof value === "number");
|
||||
const intervalMs = intervals.length > 0 ? Math.min(...intervals) : null;
|
||||
if (!intervalMs) {
|
||||
log.info("heartbeat: disabled", { enabled: false });
|
||||
}
|
||||
|
||||
const runtime = opts.runtime ?? defaultRuntime;
|
||||
const runOnce = opts.runOnce ?? runHeartbeatOnce;
|
||||
const state = {
|
||||
cfg: opts.cfg ?? loadConfig(),
|
||||
runtime,
|
||||
agents: new Map<string, HeartbeatAgentState>(),
|
||||
timer: null as NodeJS.Timeout | null,
|
||||
stopped: false,
|
||||
};
|
||||
let initialized = false;
|
||||
|
||||
const resolveNextDue = (now: number, intervalMs: number, prevState?: HeartbeatAgentState) => {
|
||||
if (typeof prevState?.lastRunMs === "number") {
|
||||
return prevState.lastRunMs + intervalMs;
|
||||
}
|
||||
if (prevState && prevState.intervalMs === intervalMs && prevState.nextDueMs > now) {
|
||||
return prevState.nextDueMs;
|
||||
}
|
||||
return now + intervalMs;
|
||||
};
|
||||
|
||||
const scheduleNext = () => {
|
||||
if (state.stopped) return;
|
||||
if (state.timer) {
|
||||
clearTimeout(state.timer);
|
||||
state.timer = null;
|
||||
}
|
||||
if (state.agents.size === 0) return;
|
||||
const now = Date.now();
|
||||
let nextDue = Number.POSITIVE_INFINITY;
|
||||
for (const agent of state.agents.values()) {
|
||||
if (agent.nextDueMs < nextDue) nextDue = agent.nextDueMs;
|
||||
}
|
||||
if (!Number.isFinite(nextDue)) return;
|
||||
const delay = Math.max(0, nextDue - now);
|
||||
state.timer = setTimeout(() => {
|
||||
requestHeartbeatNow({ reason: "interval", coalesceMs: 0 });
|
||||
}, delay);
|
||||
state.timer.unref?.();
|
||||
};
|
||||
|
||||
const updateConfig = (cfg: ClawdbotConfig) => {
|
||||
if (state.stopped) return;
|
||||
const now = Date.now();
|
||||
const prevAgents = state.agents;
|
||||
const prevEnabled = prevAgents.size > 0;
|
||||
const nextAgents = new Map<string, HeartbeatAgentState>();
|
||||
const intervals: number[] = [];
|
||||
for (const agent of resolveHeartbeatAgents(cfg)) {
|
||||
const intervalMs = resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat);
|
||||
if (!intervalMs) continue;
|
||||
intervals.push(intervalMs);
|
||||
const prevState = prevAgents.get(agent.agentId);
|
||||
const nextDueMs = resolveNextDue(now, intervalMs, prevState);
|
||||
nextAgents.set(agent.agentId, {
|
||||
agentId: agent.agentId,
|
||||
heartbeat: agent.heartbeat,
|
||||
intervalMs,
|
||||
lastRunMs: prevState?.lastRunMs,
|
||||
nextDueMs,
|
||||
});
|
||||
}
|
||||
|
||||
state.cfg = cfg;
|
||||
state.agents = nextAgents;
|
||||
const nextEnabled = nextAgents.size > 0;
|
||||
if (!initialized) {
|
||||
if (!nextEnabled) {
|
||||
log.info("heartbeat: disabled", { enabled: false });
|
||||
} else {
|
||||
log.info("heartbeat: started", { intervalMs: Math.min(...intervals) });
|
||||
}
|
||||
initialized = true;
|
||||
} else if (prevEnabled !== nextEnabled) {
|
||||
if (!nextEnabled) {
|
||||
log.info("heartbeat: disabled", { enabled: false });
|
||||
} else {
|
||||
log.info("heartbeat: started", { intervalMs: Math.min(...intervals) });
|
||||
}
|
||||
}
|
||||
|
||||
scheduleNext();
|
||||
};
|
||||
|
||||
const lastRunByAgent = new Map<string, number>();
|
||||
const run: HeartbeatWakeHandler = async (params) => {
|
||||
if (!heartbeatsEnabled) {
|
||||
return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult;
|
||||
}
|
||||
if (state.agents.size === 0) {
|
||||
if (heartbeatAgents.length === 0) {
|
||||
return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult;
|
||||
}
|
||||
|
||||
@@ -732,44 +603,52 @@ export function startHeartbeatRunner(opts: {
|
||||
const now = startedAt;
|
||||
let ran = false;
|
||||
|
||||
for (const agent of state.agents.values()) {
|
||||
if (isInterval && now < agent.nextDueMs) {
|
||||
for (const agent of heartbeatAgents) {
|
||||
const agentIntervalMs = resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat);
|
||||
if (!agentIntervalMs) continue;
|
||||
const lastRun = lastRunByAgent.get(agent.agentId);
|
||||
if (isInterval && typeof lastRun === "number" && now - lastRun < agentIntervalMs) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
const res = await runHeartbeatOnce({
|
||||
cfg,
|
||||
agentId: agent.agentId,
|
||||
heartbeat: agent.heartbeat,
|
||||
reason,
|
||||
deps: { runtime: state.runtime },
|
||||
deps: { runtime },
|
||||
});
|
||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||
return res;
|
||||
}
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
agent.lastRunMs = now;
|
||||
agent.nextDueMs = now + agent.intervalMs;
|
||||
lastRunByAgent.set(agent.agentId, now);
|
||||
}
|
||||
if (res.status === "ran") ran = true;
|
||||
}
|
||||
|
||||
scheduleNext();
|
||||
if (ran) return { status: "ran", durationMs: Date.now() - startedAt };
|
||||
return { status: "skipped", reason: isInterval ? "not-due" : "disabled" };
|
||||
};
|
||||
|
||||
setHeartbeatWakeHandler(async (params) => run({ reason: params.reason }));
|
||||
updateConfig(state.cfg);
|
||||
|
||||
let timer: NodeJS.Timeout | null = null;
|
||||
if (intervalMs) {
|
||||
timer = setInterval(() => {
|
||||
requestHeartbeatNow({ reason: "interval", coalesceMs: 0 });
|
||||
}, intervalMs);
|
||||
timer.unref?.();
|
||||
log.info("heartbeat: started", { intervalMs });
|
||||
}
|
||||
|
||||
const cleanup = () => {
|
||||
state.stopped = true;
|
||||
setHeartbeatWakeHandler(null);
|
||||
if (state.timer) clearTimeout(state.timer);
|
||||
state.timer = null;
|
||||
if (timer) clearInterval(timer);
|
||||
timer = null;
|
||||
};
|
||||
|
||||
opts.abortSignal?.addEventListener("abort", cleanup, { once: true });
|
||||
|
||||
return { stop: cleanup, updateConfig };
|
||||
return { stop: cleanup };
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user