feat: add per-agent heartbeat config
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import { resolveAgentConfig, resolveDefaultAgentId } from "../agents/agent-scope.js";
|
||||
import { resolveEffectiveMessagesConfig } from "../agents/identity.js";
|
||||
import {
|
||||
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
|
||||
@@ -14,20 +15,22 @@ import type { ClawdbotConfig } from "../config/config.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveAgentIdFromSessionKey,
|
||||
resolveMainSessionKey,
|
||||
resolveAgentMainSessionKey,
|
||||
resolveStorePath,
|
||||
saveSessionStore,
|
||||
updateSessionStore,
|
||||
} from "../config/sessions.js";
|
||||
import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { createSubsystemLogger } from "../logging.js";
|
||||
import { getQueueSize } from "../process/command-queue.js";
|
||||
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
|
||||
import { normalizeAgentId } from "../routing/session-key.js";
|
||||
import { INTERNAL_MESSAGE_CHANNEL } from "../utils/message-channel.js";
|
||||
import { emitHeartbeatEvent } from "./heartbeat-events.js";
|
||||
import {
|
||||
type HeartbeatRunResult,
|
||||
type HeartbeatWakeHandler,
|
||||
requestHeartbeatNow,
|
||||
setHeartbeatWakeHandler,
|
||||
} from "./heartbeat-wake.js";
|
||||
@@ -49,8 +52,48 @@ export function setHeartbeatsEnabled(enabled: boolean) {
|
||||
heartbeatsEnabled = enabled;
|
||||
}
|
||||
|
||||
export function resolveHeartbeatIntervalMs(cfg: ClawdbotConfig, overrideEvery?: string) {
|
||||
const raw = overrideEvery ?? cfg.agents?.defaults?.heartbeat?.every ?? DEFAULT_HEARTBEAT_EVERY;
|
||||
type HeartbeatConfig = AgentDefaultsConfig["heartbeat"];
|
||||
type HeartbeatAgent = {
|
||||
agentId: string;
|
||||
heartbeat?: HeartbeatConfig;
|
||||
};
|
||||
|
||||
function resolveHeartbeatConfig(
|
||||
cfg: ClawdbotConfig,
|
||||
agentId?: string,
|
||||
): HeartbeatConfig | undefined {
|
||||
const defaults = cfg.agents?.defaults?.heartbeat;
|
||||
if (!agentId) return defaults;
|
||||
const overrides = resolveAgentConfig(cfg, agentId)?.heartbeat;
|
||||
if (!defaults && !overrides) return overrides;
|
||||
return { ...defaults, ...overrides };
|
||||
}
|
||||
|
||||
function resolveHeartbeatAgents(cfg: ClawdbotConfig): HeartbeatAgent[] {
|
||||
const list = cfg.agents?.list ?? [];
|
||||
const explicit = list.filter((entry) => entry?.heartbeat);
|
||||
if (explicit.length > 0) {
|
||||
return explicit
|
||||
.map((entry) => {
|
||||
const id = normalizeAgentId(entry.id);
|
||||
return { agentId: id, heartbeat: resolveHeartbeatConfig(cfg, id) };
|
||||
})
|
||||
.filter((entry) => entry.agentId);
|
||||
}
|
||||
const fallbackId = resolveDefaultAgentId(cfg);
|
||||
return [{ agentId: fallbackId, heartbeat: resolveHeartbeatConfig(cfg, fallbackId) }];
|
||||
}
|
||||
|
||||
export function resolveHeartbeatIntervalMs(
|
||||
cfg: ClawdbotConfig,
|
||||
overrideEvery?: string,
|
||||
heartbeat?: HeartbeatConfig,
|
||||
) {
|
||||
const raw =
|
||||
overrideEvery ??
|
||||
heartbeat?.every ??
|
||||
cfg.agents?.defaults?.heartbeat?.every ??
|
||||
DEFAULT_HEARTBEAT_EVERY;
|
||||
if (!raw) return null;
|
||||
const trimmed = String(raw).trim();
|
||||
if (!trimmed) return null;
|
||||
@@ -64,23 +107,31 @@ export function resolveHeartbeatIntervalMs(cfg: ClawdbotConfig, overrideEvery?:
|
||||
return ms;
|
||||
}
|
||||
|
||||
export function resolveHeartbeatPrompt(cfg: ClawdbotConfig) {
|
||||
return resolveHeartbeatPromptText(cfg.agents?.defaults?.heartbeat?.prompt);
|
||||
}
|
||||
|
||||
function resolveHeartbeatAckMaxChars(cfg: ClawdbotConfig) {
|
||||
return Math.max(
|
||||
0,
|
||||
cfg.agents?.defaults?.heartbeat?.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
|
||||
export function resolveHeartbeatPrompt(cfg: ClawdbotConfig, heartbeat?: HeartbeatConfig) {
|
||||
return resolveHeartbeatPromptText(
|
||||
heartbeat?.prompt ?? cfg.agents?.defaults?.heartbeat?.prompt,
|
||||
);
|
||||
}
|
||||
|
||||
function resolveHeartbeatSession(cfg: ClawdbotConfig) {
|
||||
function resolveHeartbeatAckMaxChars(cfg: ClawdbotConfig, heartbeat?: HeartbeatConfig) {
|
||||
return Math.max(
|
||||
0,
|
||||
heartbeat?.ackMaxChars ??
|
||||
cfg.agents?.defaults?.heartbeat?.ackMaxChars ??
|
||||
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
|
||||
);
|
||||
}
|
||||
|
||||
function resolveHeartbeatSession(cfg: ClawdbotConfig, agentId?: string) {
|
||||
const sessionCfg = cfg.session;
|
||||
const scope = sessionCfg?.scope ?? "per-sender";
|
||||
const sessionKey = scope === "global" ? "global" : resolveMainSessionKey(cfg);
|
||||
const agentId = resolveAgentIdFromSessionKey(sessionKey);
|
||||
const storePath = resolveStorePath(sessionCfg?.store, { agentId });
|
||||
const resolvedAgentId = normalizeAgentId(agentId ?? resolveDefaultAgentId(cfg));
|
||||
const sessionKey =
|
||||
scope === "global"
|
||||
? "global"
|
||||
: resolveAgentMainSessionKey({ cfg, agentId: resolvedAgentId });
|
||||
const storeAgentId = scope === "global" ? resolveDefaultAgentId(cfg) : resolvedAgentId;
|
||||
const storePath = resolveStorePath(sessionCfg?.store, { agentId: storeAgentId });
|
||||
const store = loadSessionStore(storePath);
|
||||
const entry = store[sessionKey];
|
||||
return { sessionKey, storePath, store, entry };
|
||||
@@ -186,14 +237,18 @@ function normalizeHeartbeatReply(
|
||||
|
||||
export async function runHeartbeatOnce(opts: {
|
||||
cfg?: ClawdbotConfig;
|
||||
agentId?: string;
|
||||
heartbeat?: HeartbeatConfig;
|
||||
reason?: string;
|
||||
deps?: HeartbeatDeps;
|
||||
}): Promise<HeartbeatRunResult> {
|
||||
const cfg = opts.cfg ?? loadConfig();
|
||||
const agentId = normalizeAgentId(opts.agentId ?? resolveDefaultAgentId(cfg));
|
||||
const heartbeat = opts.heartbeat ?? resolveHeartbeatConfig(cfg, agentId);
|
||||
if (!heartbeatsEnabled) {
|
||||
return { status: "skipped", reason: "disabled" };
|
||||
}
|
||||
if (!resolveHeartbeatIntervalMs(cfg)) {
|
||||
if (!resolveHeartbeatIntervalMs(cfg, undefined, heartbeat)) {
|
||||
return { status: "skipped", reason: "disabled" };
|
||||
}
|
||||
|
||||
@@ -203,9 +258,9 @@ export async function runHeartbeatOnce(opts: {
|
||||
}
|
||||
|
||||
const startedAt = opts.deps?.nowMs?.() ?? Date.now();
|
||||
const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg);
|
||||
const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg, agentId);
|
||||
const previousUpdatedAt = entry?.updatedAt;
|
||||
const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry });
|
||||
const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry, heartbeat });
|
||||
const lastChannel =
|
||||
entry?.lastChannel && entry.lastChannel !== INTERNAL_MESSAGE_CHANNEL
|
||||
? normalizeChannelId(entry.lastChannel)
|
||||
@@ -222,18 +277,19 @@ export async function runHeartbeatOnce(opts: {
|
||||
lastTo: entry?.lastTo,
|
||||
provider: senderProvider,
|
||||
});
|
||||
const prompt = resolveHeartbeatPrompt(cfg);
|
||||
const prompt = resolveHeartbeatPrompt(cfg, heartbeat);
|
||||
const ctx = {
|
||||
Body: prompt,
|
||||
From: sender,
|
||||
To: sender,
|
||||
Provider: "heartbeat",
|
||||
SessionKey: sessionKey,
|
||||
};
|
||||
|
||||
try {
|
||||
const replyResult = await getReplyFromConfig(ctx, { isHeartbeat: true }, cfg);
|
||||
const replyPayload = resolveHeartbeatReplyPayload(replyResult);
|
||||
const includeReasoning = cfg.agents?.defaults?.heartbeat?.includeReasoning === true;
|
||||
const includeReasoning = heartbeat?.includeReasoning === true;
|
||||
const reasoningPayloads = includeReasoning
|
||||
? resolveHeartbeatReasoningPayloads(replyResult).filter((payload) => payload !== replyPayload)
|
||||
: [];
|
||||
@@ -255,10 +311,10 @@ export async function runHeartbeatOnce(opts: {
|
||||
return { status: "ran", durationMs: Date.now() - startedAt };
|
||||
}
|
||||
|
||||
const ackMaxChars = resolveHeartbeatAckMaxChars(cfg);
|
||||
const ackMaxChars = resolveHeartbeatAckMaxChars(cfg, heartbeat);
|
||||
const normalized = normalizeHeartbeatReply(
|
||||
replyPayload,
|
||||
resolveEffectiveMessagesConfig(cfg, resolveAgentIdFromSessionKey(sessionKey)).responsePrefix,
|
||||
resolveEffectiveMessagesConfig(cfg, agentId).responsePrefix,
|
||||
ackMaxChars,
|
||||
);
|
||||
const shouldSkipMain = normalized.shouldSkip && !normalized.hasMedia;
|
||||
@@ -409,19 +465,57 @@ export function startHeartbeatRunner(opts: {
|
||||
abortSignal?: AbortSignal;
|
||||
}) {
|
||||
const cfg = opts.cfg ?? loadConfig();
|
||||
const intervalMs = resolveHeartbeatIntervalMs(cfg);
|
||||
const heartbeatAgents = resolveHeartbeatAgents(cfg);
|
||||
const intervals = heartbeatAgents
|
||||
.map((agent) => resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat))
|
||||
.filter((value): value is number => typeof value === "number");
|
||||
const intervalMs = intervals.length > 0 ? Math.min(...intervals) : null;
|
||||
if (!intervalMs) {
|
||||
log.info("heartbeat: disabled", { enabled: false });
|
||||
}
|
||||
|
||||
const runtime = opts.runtime ?? defaultRuntime;
|
||||
const run = async (params?: { reason?: string }) => {
|
||||
const res = await runHeartbeatOnce({
|
||||
cfg,
|
||||
reason: params?.reason,
|
||||
deps: { runtime },
|
||||
});
|
||||
return res;
|
||||
const lastRunByAgent = new Map<string, number>();
|
||||
const run: HeartbeatWakeHandler = async (params) => {
|
||||
if (!heartbeatsEnabled) {
|
||||
return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult;
|
||||
}
|
||||
if (heartbeatAgents.length === 0) {
|
||||
return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult;
|
||||
}
|
||||
|
||||
const reason = params?.reason;
|
||||
const isInterval = reason === "interval";
|
||||
const startedAt = Date.now();
|
||||
const now = startedAt;
|
||||
let ran = false;
|
||||
|
||||
for (const agent of heartbeatAgents) {
|
||||
const agentIntervalMs = resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat);
|
||||
if (!agentIntervalMs) continue;
|
||||
const lastRun = lastRunByAgent.get(agent.agentId);
|
||||
if (isInterval && typeof lastRun === "number" && now - lastRun < agentIntervalMs) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const res = await runHeartbeatOnce({
|
||||
cfg,
|
||||
agentId: agent.agentId,
|
||||
heartbeat: agent.heartbeat,
|
||||
reason,
|
||||
deps: { runtime },
|
||||
});
|
||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||
return res;
|
||||
}
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
lastRunByAgent.set(agent.agentId, now);
|
||||
}
|
||||
if (res.status === "ran") ran = true;
|
||||
}
|
||||
|
||||
if (ran) return { status: "ran", durationMs: Date.now() - startedAt };
|
||||
return { status: "skipped", reason: isInterval ? "not-due" : "disabled" };
|
||||
};
|
||||
|
||||
setHeartbeatWakeHandler(async (params) => run({ reason: params.reason }));
|
||||
|
||||
Reference in New Issue
Block a user