feat(gateway): add config hot reload
This commit is contained in:
@@ -96,6 +96,11 @@ import {
|
||||
import { createBridgeHandlers } from "./server-bridge.js";
|
||||
import { createBridgeSubscriptionManager } from "./server-bridge-subscriptions.js";
|
||||
import { startBrowserControlServerIfEnabled } from "./server-browser.js";
|
||||
import {
|
||||
startGatewayConfigReloader,
|
||||
type GatewayReloadPlan,
|
||||
type ProviderKind,
|
||||
} from "./config-reload.js";
|
||||
import { createAgentEventHandler, createChatRunState } from "./server-chat.js";
|
||||
import {
|
||||
DEDUPE_MAX,
|
||||
@@ -133,6 +138,7 @@ const logProviders = log.child("providers");
|
||||
const logBrowser = log.child("browser");
|
||||
const logHealth = log.child("health");
|
||||
const logCron = log.child("cron");
|
||||
const logReload = log.child("reload");
|
||||
const logHooks = log.child("hooks");
|
||||
const logWsControl = log.child("ws");
|
||||
const logWhatsApp = logProviders.child("whatsapp");
|
||||
@@ -408,7 +414,7 @@ export async function startGatewayServer(
|
||||
password,
|
||||
allowTailscale,
|
||||
};
|
||||
const hooksConfig = resolveHooksConfig(cfgAtStart);
|
||||
let hooksConfig = resolveHooksConfig(cfgAtStart);
|
||||
const canvasHostEnabled =
|
||||
process.env.CLAWDIS_SKIP_CANVAS_HOST !== "1" &&
|
||||
cfgAtStart.canvasHost?.enabled !== false;
|
||||
@@ -554,7 +560,7 @@ export async function startGatewayServer(
|
||||
}
|
||||
|
||||
const handleHooksRequest = createHooksRequestHandler({
|
||||
hooksConfig,
|
||||
getHooksConfig: () => hooksConfig,
|
||||
bindHost,
|
||||
port,
|
||||
logHooks,
|
||||
@@ -650,57 +656,61 @@ export async function startGatewayServer(
|
||||
setCommandLaneConcurrency("cron", cfgAtStart.cron?.maxConcurrentRuns ?? 1);
|
||||
setCommandLaneConcurrency("main", cfgAtStart.agent?.maxConcurrent ?? 1);
|
||||
|
||||
const cronStorePath = resolveCronStorePath(cfgAtStart.cron?.store);
|
||||
const cronLogger = getChildLogger({
|
||||
module: "cron",
|
||||
storePath: cronStorePath,
|
||||
});
|
||||
const deps = createDefaultDeps();
|
||||
const cronEnabled =
|
||||
process.env.CLAWDIS_SKIP_CRON !== "1" && cfgAtStart.cron?.enabled !== false;
|
||||
const cron = new CronService({
|
||||
storePath: cronStorePath,
|
||||
cronEnabled,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob: async ({ job, message }) => {
|
||||
const cfg = loadConfig();
|
||||
return await runCronIsolatedAgentTurn({
|
||||
cfg,
|
||||
deps,
|
||||
job,
|
||||
message,
|
||||
sessionKey: `cron:${job.id}`,
|
||||
lane: "cron",
|
||||
});
|
||||
},
|
||||
log: cronLogger,
|
||||
onEvent: (evt) => {
|
||||
broadcast("cron", evt, { dropIfSlow: true });
|
||||
if (evt.action === "finished") {
|
||||
const logPath = resolveCronRunLogPath({
|
||||
storePath: cronStorePath,
|
||||
jobId: evt.jobId,
|
||||
const buildCronService = (cfg: ReturnType<typeof loadConfig>) => {
|
||||
const storePath = resolveCronStorePath(cfg.cron?.store);
|
||||
const cronEnabled =
|
||||
process.env.CLAWDIS_SKIP_CRON !== "1" && cfg.cron?.enabled !== false;
|
||||
const cron = new CronService({
|
||||
storePath,
|
||||
cronEnabled,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob: async ({ job, message }) => {
|
||||
const runtimeConfig = loadConfig();
|
||||
return await runCronIsolatedAgentTurn({
|
||||
cfg: runtimeConfig,
|
||||
deps,
|
||||
job,
|
||||
message,
|
||||
sessionKey: `cron:${job.id}`,
|
||||
lane: "cron",
|
||||
});
|
||||
void appendCronRunLog(logPath, {
|
||||
ts: Date.now(),
|
||||
jobId: evt.jobId,
|
||||
action: "finished",
|
||||
status: evt.status,
|
||||
error: evt.error,
|
||||
summary: evt.summary,
|
||||
runAtMs: evt.runAtMs,
|
||||
durationMs: evt.durationMs,
|
||||
nextRunAtMs: evt.nextRunAtMs,
|
||||
}).catch((err) => {
|
||||
cronLogger.warn(
|
||||
{ err: String(err), logPath },
|
||||
"cron: run log append failed",
|
||||
);
|
||||
});
|
||||
}
|
||||
},
|
||||
});
|
||||
},
|
||||
log: getChildLogger({ module: "cron", storePath }),
|
||||
onEvent: (evt) => {
|
||||
broadcast("cron", evt, { dropIfSlow: true });
|
||||
if (evt.action === "finished") {
|
||||
const logPath = resolveCronRunLogPath({
|
||||
storePath,
|
||||
jobId: evt.jobId,
|
||||
});
|
||||
void appendCronRunLog(logPath, {
|
||||
ts: Date.now(),
|
||||
jobId: evt.jobId,
|
||||
action: "finished",
|
||||
status: evt.status,
|
||||
error: evt.error,
|
||||
summary: evt.summary,
|
||||
runAtMs: evt.runAtMs,
|
||||
durationMs: evt.durationMs,
|
||||
nextRunAtMs: evt.nextRunAtMs,
|
||||
}).catch((err) => {
|
||||
cronLogger.warn(
|
||||
{ err: String(err), logPath },
|
||||
"cron: run log append failed",
|
||||
);
|
||||
});
|
||||
}
|
||||
},
|
||||
});
|
||||
return { cron, storePath, cronEnabled };
|
||||
};
|
||||
|
||||
let { cron, storePath: cronStorePath } = buildCronService(cfgAtStart);
|
||||
|
||||
const providerManager = createProviderManager({
|
||||
loadConfig,
|
||||
@@ -719,6 +729,10 @@ export async function startGatewayServer(
|
||||
getRuntimeSnapshot,
|
||||
startProviders,
|
||||
startWhatsAppProvider,
|
||||
startTelegramProvider,
|
||||
startDiscordProvider,
|
||||
startSignalProvider,
|
||||
startIMessageProvider,
|
||||
stopWhatsAppProvider,
|
||||
stopTelegramProvider,
|
||||
stopDiscordProvider,
|
||||
@@ -1122,7 +1136,7 @@ export async function startGatewayServer(
|
||||
broadcast("heartbeat", evt, { dropIfSlow: true });
|
||||
});
|
||||
|
||||
const heartbeatRunner = startHeartbeatRunner({ cfg: cfgAtStart });
|
||||
let heartbeatRunner = startHeartbeatRunner({ cfg: cfgAtStart });
|
||||
|
||||
void cron
|
||||
.start()
|
||||
@@ -1585,6 +1599,160 @@ export async function startGatewayServer(
|
||||
logProviders.info("skipping provider start (CLAWDIS_SKIP_PROVIDERS=1)");
|
||||
}
|
||||
|
||||
const applyHotReload = async (
|
||||
plan: GatewayReloadPlan,
|
||||
nextConfig: ReturnType<typeof loadConfig>,
|
||||
) => {
|
||||
if (plan.reloadHooks) {
|
||||
try {
|
||||
hooksConfig = resolveHooksConfig(nextConfig);
|
||||
} catch (err) {
|
||||
logHooks.warn(`hooks config reload failed: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (plan.restartHeartbeat) {
|
||||
heartbeatRunner.stop();
|
||||
heartbeatRunner = startHeartbeatRunner({ cfg: nextConfig });
|
||||
}
|
||||
|
||||
if (plan.restartCron) {
|
||||
cron.stop();
|
||||
const next = buildCronService(nextConfig);
|
||||
cron = next.cron;
|
||||
cronStorePath = next.storePath;
|
||||
void cron
|
||||
.start()
|
||||
.catch((err) => logCron.error(`failed to start: ${String(err)}`));
|
||||
}
|
||||
|
||||
if (plan.restartBrowserControl) {
|
||||
if (browserControl) {
|
||||
await browserControl.stop().catch(() => {});
|
||||
}
|
||||
try {
|
||||
browserControl = await startBrowserControlServerIfEnabled();
|
||||
} catch (err) {
|
||||
logBrowser.error(`server failed to start: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (plan.restartGmailWatcher) {
|
||||
await stopGmailWatcher().catch(() => {});
|
||||
if (process.env.CLAWDIS_SKIP_GMAIL_WATCHER !== "1") {
|
||||
try {
|
||||
const gmailResult = await startGmailWatcher(nextConfig);
|
||||
if (gmailResult.started) {
|
||||
logHooks.info("gmail watcher started");
|
||||
} else if (
|
||||
gmailResult.reason &&
|
||||
gmailResult.reason !== "hooks not enabled" &&
|
||||
gmailResult.reason !== "no gmail account configured"
|
||||
) {
|
||||
logHooks.warn(`gmail watcher not started: ${gmailResult.reason}`);
|
||||
}
|
||||
} catch (err) {
|
||||
logHooks.error(`gmail watcher failed to start: ${String(err)}`);
|
||||
}
|
||||
} else {
|
||||
logHooks.info(
|
||||
"skipping gmail watcher restart (CLAWDIS_SKIP_GMAIL_WATCHER=1)",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (plan.restartProviders.size > 0) {
|
||||
if (process.env.CLAWDIS_SKIP_PROVIDERS === "1") {
|
||||
logProviders.info("skipping provider reload (CLAWDIS_SKIP_PROVIDERS=1)");
|
||||
} else {
|
||||
const restartProvider = async (
|
||||
name: ProviderKind,
|
||||
stop: () => Promise<void>,
|
||||
start: () => Promise<void>,
|
||||
) => {
|
||||
logProviders.info(`restarting ${name} provider`);
|
||||
await stop();
|
||||
await start();
|
||||
};
|
||||
if (plan.restartProviders.has("whatsapp")) {
|
||||
await restartProvider(
|
||||
"whatsapp",
|
||||
stopWhatsAppProvider,
|
||||
startWhatsAppProvider,
|
||||
);
|
||||
}
|
||||
if (plan.restartProviders.has("telegram")) {
|
||||
await restartProvider(
|
||||
"telegram",
|
||||
stopTelegramProvider,
|
||||
startTelegramProvider,
|
||||
);
|
||||
}
|
||||
if (plan.restartProviders.has("discord")) {
|
||||
await restartProvider(
|
||||
"discord",
|
||||
stopDiscordProvider,
|
||||
startDiscordProvider,
|
||||
);
|
||||
}
|
||||
if (plan.restartProviders.has("signal")) {
|
||||
await restartProvider(
|
||||
"signal",
|
||||
stopSignalProvider,
|
||||
startSignalProvider,
|
||||
);
|
||||
}
|
||||
if (plan.restartProviders.has("imessage")) {
|
||||
await restartProvider(
|
||||
"imessage",
|
||||
stopIMessageProvider,
|
||||
startIMessageProvider,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
setCommandLaneConcurrency(
|
||||
"cron",
|
||||
nextConfig.cron?.maxConcurrentRuns ?? 1,
|
||||
);
|
||||
setCommandLaneConcurrency("main", nextConfig.agent?.maxConcurrent ?? 1);
|
||||
|
||||
if (plan.hotReasons.length > 0) {
|
||||
logReload.info(
|
||||
`config hot reload applied (${plan.hotReasons.join(", ")})`,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
const requestGatewayRestart = (
|
||||
plan: GatewayReloadPlan,
|
||||
_nextConfig: ReturnType<typeof loadConfig>,
|
||||
) => {
|
||||
const reasons = plan.restartReasons.length
|
||||
? plan.restartReasons.join(", ")
|
||||
: plan.changedPaths.join(", ");
|
||||
logReload.warn(`config change requires gateway restart (${reasons})`);
|
||||
if (process.listenerCount("SIGUSR1") === 0) {
|
||||
logReload.warn("no SIGUSR1 listener found; restart skipped");
|
||||
return;
|
||||
}
|
||||
process.emit("SIGUSR1");
|
||||
};
|
||||
|
||||
const configReloader = startGatewayConfigReloader({
|
||||
initialConfig: cfgAtStart,
|
||||
readSnapshot: readConfigFileSnapshot,
|
||||
onHotReload: applyHotReload,
|
||||
onRestart: requestGatewayRestart,
|
||||
log: {
|
||||
info: (msg) => logReload.info(msg),
|
||||
warn: (msg) => logReload.warn(msg),
|
||||
error: (msg) => logReload.error(msg),
|
||||
},
|
||||
watchPath: CONFIG_PATH_CLAWDIS,
|
||||
});
|
||||
|
||||
return {
|
||||
close: async (opts) => {
|
||||
const reasonRaw =
|
||||
@@ -1664,6 +1832,7 @@ export async function startGatewayServer(
|
||||
}
|
||||
}
|
||||
clients.clear();
|
||||
await configReloader.stop().catch(() => {});
|
||||
if (browserControl) {
|
||||
await browserControl.stop().catch(() => {});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user