2131 lines
65 KiB
TypeScript
2131 lines
65 KiB
TypeScript
import { randomUUID } from "node:crypto";
|
|
import type { Server as HttpServer } from "node:http";
|
|
import os from "node:os";
|
|
import chalk from "chalk";
|
|
import { type WebSocket, WebSocketServer } from "ws";
|
|
import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../agents/defaults.js";
|
|
import {
|
|
loadModelCatalog,
|
|
type ModelCatalogEntry,
|
|
resetModelCatalogCacheForTest,
|
|
} from "../agents/model-catalog.js";
|
|
import { resolveConfiguredModelRef } from "../agents/model-selection.js";
|
|
import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js";
|
|
import { CANVAS_HOST_PATH } from "../canvas-host/a2ui.js";
|
|
import {
|
|
type CanvasHostHandler,
|
|
type CanvasHostServer,
|
|
createCanvasHostHandler,
|
|
startCanvasHost,
|
|
} from "../canvas-host/server.js";
|
|
import { createDefaultDeps } from "../cli/deps.js";
|
|
import { agentCommand } from "../commands/agent.js";
|
|
import { getHealthSnapshot, type HealthSummary } from "../commands/health.js";
|
|
import {
|
|
CONFIG_PATH_CLAWDBOT,
|
|
isNixMode,
|
|
loadConfig,
|
|
migrateLegacyConfig,
|
|
readConfigFileSnapshot,
|
|
STATE_DIR_CLAWDBOT,
|
|
writeConfigFile,
|
|
} from "../config/config.js";
|
|
import {
|
|
deriveDefaultBridgePort,
|
|
deriveDefaultCanvasHostPort,
|
|
} from "../config/port-defaults.js";
|
|
import {
|
|
loadSessionStore,
|
|
resolveMainSessionKey,
|
|
resolveStorePath,
|
|
} from "../config/sessions.js";
|
|
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
|
|
import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js";
|
|
import { CronService } from "../cron/service.js";
|
|
import { resolveCronStorePath } from "../cron/store.js";
|
|
import type { CronJob } from "../cron/types.js";
|
|
import { startGmailWatcher, stopGmailWatcher } from "../hooks/gmail-watcher.js";
|
|
import {
|
|
clearAgentRunContext,
|
|
getAgentRunContext,
|
|
onAgentEvent,
|
|
registerAgentRunContext,
|
|
} from "../infra/agent-events.js";
|
|
import { startGatewayBonjourAdvertiser } from "../infra/bonjour.js";
|
|
import { startNodeBridgeServer } from "../infra/bridge/server.js";
|
|
import { resolveCanvasHostUrl } from "../infra/canvas-host-url.js";
|
|
import { GatewayLockError } from "../infra/gateway-lock.js";
|
|
import { onHeartbeatEvent } from "../infra/heartbeat-events.js";
|
|
import { startHeartbeatRunner } from "../infra/heartbeat-runner.js";
|
|
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
|
import { getMachineDisplayName } from "../infra/machine-name.js";
|
|
import { resolveOutboundTarget } from "../infra/outbound/targets.js";
|
|
import { ensureClawdbotCliOnPath } from "../infra/path-env.js";
|
|
import {
|
|
consumeRestartSentinel,
|
|
formatRestartSentinelMessage,
|
|
summarizeRestartSentinel,
|
|
} from "../infra/restart-sentinel.js";
|
|
import { autoMigrateLegacyState } from "../infra/state-migrations.js";
|
|
import { enqueueSystemEvent } from "../infra/system-events.js";
|
|
import {
|
|
listSystemPresence,
|
|
upsertPresence,
|
|
} from "../infra/system-presence.js";
|
|
import {
|
|
pickPrimaryTailnetIPv4,
|
|
pickPrimaryTailnetIPv6,
|
|
} from "../infra/tailnet.js";
|
|
import {
|
|
disableTailscaleFunnel,
|
|
disableTailscaleServe,
|
|
enableTailscaleFunnel,
|
|
enableTailscaleServe,
|
|
getTailnetHostname,
|
|
} from "../infra/tailscale.js";
|
|
import { loadVoiceWakeConfig } from "../infra/voicewake.js";
|
|
import {
|
|
WIDE_AREA_DISCOVERY_DOMAIN,
|
|
writeWideAreaBridgeZone,
|
|
} from "../infra/widearea-dns.js";
|
|
import { rawDataToString } from "../infra/ws.js";
|
|
import {
|
|
createSubsystemLogger,
|
|
getChildLogger,
|
|
getResolvedLoggerSettings,
|
|
runtimeForLogger,
|
|
} from "../logging.js";
|
|
import { setCommandLaneConcurrency } from "../process/command-queue.js";
|
|
import { defaultRuntime } from "../runtime.js";
|
|
import { runOnboardingWizard } from "../wizard/onboarding.js";
|
|
import type { WizardSession } from "../wizard/session.js";
|
|
import {
|
|
assertGatewayAuthConfigured,
|
|
authorizeGatewayConnect,
|
|
type ResolvedGatewayAuth,
|
|
resolveGatewayAuth,
|
|
} from "./auth.js";
|
|
import {
|
|
type GatewayReloadPlan,
|
|
type ProviderKind,
|
|
startGatewayConfigReloader,
|
|
} from "./config-reload.js";
|
|
import { normalizeControlUiBasePath } from "./control-ui.js";
|
|
import { type HookMessageProvider, resolveHooksConfig } from "./hooks.js";
|
|
import {
|
|
isLoopbackAddress,
|
|
isLoopbackHost,
|
|
resolveGatewayBindHost,
|
|
} from "./net.js";
|
|
import {
|
|
type ConnectParams,
|
|
ErrorCodes,
|
|
type ErrorShape,
|
|
errorShape,
|
|
formatValidationErrors,
|
|
PROTOCOL_VERSION,
|
|
type RequestFrame,
|
|
type Snapshot,
|
|
validateConnectParams,
|
|
validateRequestFrame,
|
|
} from "./protocol/index.js";
|
|
import { createBridgeHandlers } from "./server-bridge.js";
|
|
import {
|
|
type BridgeListConnectedFn,
|
|
type BridgeSendEventFn,
|
|
createBridgeSubscriptionManager,
|
|
} from "./server-bridge-subscriptions.js";
|
|
import { startBrowserControlServerIfEnabled } from "./server-browser.js";
|
|
import { createAgentEventHandler, createChatRunState } from "./server-chat.js";
|
|
import {
|
|
DEDUPE_MAX,
|
|
DEDUPE_TTL_MS,
|
|
HANDSHAKE_TIMEOUT_MS,
|
|
HEALTH_REFRESH_INTERVAL_MS,
|
|
MAX_BUFFERED_BYTES,
|
|
MAX_PAYLOAD_BYTES,
|
|
TICK_INTERVAL_MS,
|
|
} from "./server-constants.js";
|
|
import {
|
|
formatBonjourInstanceName,
|
|
resolveBonjourCliPath,
|
|
resolveTailnetDnsHint,
|
|
} from "./server-discovery.js";
|
|
import {
|
|
attachGatewayUpgradeHandler,
|
|
createGatewayHttpServer,
|
|
createHooksRequestHandler,
|
|
} from "./server-http.js";
|
|
import { handleGatewayRequest } from "./server-methods.js";
|
|
import { createProviderManager } from "./server-providers.js";
|
|
import type { DedupeEntry } from "./server-shared.js";
|
|
import { formatError } from "./server-utils.js";
|
|
import { loadSessionEntry } from "./session-utils.js";
|
|
import { formatForLog, logWs, summarizeAgentEventForWsLog } from "./ws-log.js";
|
|
|
|
ensureClawdbotCliOnPath();
|
|
|
|
const log = createSubsystemLogger("gateway");
|
|
const logCanvas = log.child("canvas");
|
|
const logBridge = log.child("bridge");
|
|
const logDiscovery = log.child("discovery");
|
|
const logTailscale = log.child("tailscale");
|
|
const logProviders = log.child("providers");
|
|
const logBrowser = log.child("browser");
|
|
const logHealth = log.child("health");
|
|
const logCron = log.child("cron");
|
|
const logReload = log.child("reload");
|
|
const logHooks = log.child("hooks");
|
|
const logWsControl = log.child("ws");
|
|
const logWhatsApp = logProviders.child("whatsapp");
|
|
const logTelegram = logProviders.child("telegram");
|
|
const logDiscord = logProviders.child("discord");
|
|
const logSlack = logProviders.child("slack");
|
|
const logSignal = logProviders.child("signal");
|
|
const logIMessage = logProviders.child("imessage");
|
|
const logMSTeams = logProviders.child("msteams");
|
|
const canvasRuntime = runtimeForLogger(logCanvas);
|
|
const whatsappRuntimeEnv = runtimeForLogger(logWhatsApp);
|
|
const telegramRuntimeEnv = runtimeForLogger(logTelegram);
|
|
const discordRuntimeEnv = runtimeForLogger(logDiscord);
|
|
const slackRuntimeEnv = runtimeForLogger(logSlack);
|
|
const signalRuntimeEnv = runtimeForLogger(logSignal);
|
|
const imessageRuntimeEnv = runtimeForLogger(logIMessage);
|
|
const msteamsRuntimeEnv = runtimeForLogger(logMSTeams);
|
|
|
|
type GatewayModelChoice = ModelCatalogEntry;
|
|
|
|
// Test-only escape hatch: model catalog is cached at module scope for the
|
|
// process lifetime, which is fine for the real gateway daemon, but makes
|
|
// isolated unit tests harder. Keep this intentionally obscure.
|
|
export function __resetModelCatalogCacheForTest() {
|
|
resetModelCatalogCacheForTest();
|
|
}
|
|
|
|
async function loadGatewayModelCatalog(): Promise<GatewayModelChoice[]> {
|
|
return await loadModelCatalog({ config: loadConfig() });
|
|
}
|
|
|
|
type Client = {
|
|
socket: WebSocket;
|
|
connect: ConnectParams;
|
|
connId: string;
|
|
presenceKey?: string;
|
|
};
|
|
|
|
const METHODS = [
|
|
"health",
|
|
"logs.tail",
|
|
"providers.status",
|
|
"status",
|
|
"usage.status",
|
|
"config.get",
|
|
"config.set",
|
|
"config.apply",
|
|
"config.schema",
|
|
"wizard.start",
|
|
"wizard.next",
|
|
"wizard.cancel",
|
|
"wizard.status",
|
|
"talk.mode",
|
|
"models.list",
|
|
"agents.list",
|
|
"skills.status",
|
|
"skills.install",
|
|
"skills.update",
|
|
"update.run",
|
|
"voicewake.get",
|
|
"voicewake.set",
|
|
"sessions.list",
|
|
"sessions.patch",
|
|
"sessions.reset",
|
|
"sessions.delete",
|
|
"sessions.compact",
|
|
"last-heartbeat",
|
|
"set-heartbeats",
|
|
"wake",
|
|
"node.pair.request",
|
|
"node.pair.list",
|
|
"node.pair.approve",
|
|
"node.pair.reject",
|
|
"node.pair.verify",
|
|
"node.rename",
|
|
"node.list",
|
|
"node.describe",
|
|
"node.invoke",
|
|
"cron.list",
|
|
"cron.status",
|
|
"cron.add",
|
|
"cron.update",
|
|
"cron.remove",
|
|
"cron.run",
|
|
"cron.runs",
|
|
"system-presence",
|
|
"system-event",
|
|
"send",
|
|
"agent",
|
|
"agent.wait",
|
|
"web.login.start",
|
|
"web.login.wait",
|
|
"web.logout",
|
|
"telegram.logout",
|
|
// WebChat WebSocket-native chat methods
|
|
"chat.history",
|
|
"chat.abort",
|
|
"chat.send",
|
|
];
|
|
|
|
const EVENTS = [
|
|
"agent",
|
|
"chat",
|
|
"presence",
|
|
"tick",
|
|
"talk.mode",
|
|
"shutdown",
|
|
"health",
|
|
"heartbeat",
|
|
"cron",
|
|
"node.pair.requested",
|
|
"node.pair.resolved",
|
|
"voicewake.changed",
|
|
];
|
|
|
|
export type GatewayServer = {
|
|
close: (opts?: {
|
|
reason?: string;
|
|
restartExpectedMs?: number | null;
|
|
}) => Promise<void>;
|
|
};
|
|
|
|
export type GatewayServerOptions = {
|
|
/**
|
|
* Bind address policy for the Gateway WebSocket/HTTP server.
|
|
* - loopback: 127.0.0.1
|
|
* - lan: 0.0.0.0
|
|
* - tailnet: bind only to the Tailscale IPv4 address (100.64.0.0/10)
|
|
* - auto: prefer tailnet, else LAN
|
|
*/
|
|
bind?: import("../config/config.js").BridgeBindMode;
|
|
/**
|
|
* Advanced override for the bind host, bypassing bind resolution.
|
|
* Prefer `bind` unless you really need a specific address.
|
|
*/
|
|
host?: string;
|
|
/**
|
|
* If false, do not serve the browser Control UI.
|
|
* Default: config `gateway.controlUi.enabled` (or true when absent).
|
|
*/
|
|
controlUiEnabled?: boolean;
|
|
/**
|
|
* Override gateway auth configuration (merges with config).
|
|
*/
|
|
auth?: import("../config/config.js").GatewayAuthConfig;
|
|
/**
|
|
* Override gateway Tailscale exposure configuration (merges with config).
|
|
*/
|
|
tailscale?: import("../config/config.js").GatewayTailscaleConfig;
|
|
/**
|
|
* Test-only: allow canvas host startup even when NODE_ENV/VITEST would disable it.
|
|
*/
|
|
allowCanvasHostInTests?: boolean;
|
|
/**
|
|
* Test-only: override the onboarding wizard runner.
|
|
*/
|
|
wizardRunner?: (
|
|
opts: import("../commands/onboard-types.js").OnboardOptions,
|
|
runtime: import("../runtime.js").RuntimeEnv,
|
|
prompter: import("../wizard/prompts.js").WizardPrompter,
|
|
) => Promise<void>;
|
|
};
|
|
|
|
let presenceVersion = 1;
|
|
let healthVersion = 1;
|
|
let healthCache: HealthSummary | null = null;
|
|
let healthRefresh: Promise<HealthSummary> | null = null;
|
|
let broadcastHealthUpdate: ((snap: HealthSummary) => void) | null = null;
|
|
|
|
function buildSnapshot(): Snapshot {
|
|
const presence = listSystemPresence();
|
|
const uptimeMs = Math.round(process.uptime() * 1000);
|
|
// Health is async; caller should await getHealthSnapshot and replace later if needed.
|
|
const emptyHealth: unknown = {};
|
|
return {
|
|
presence,
|
|
health: emptyHealth,
|
|
stateVersion: { presence: presenceVersion, health: healthVersion },
|
|
uptimeMs,
|
|
// Surface resolved paths so UIs can display the true config location.
|
|
configPath: CONFIG_PATH_CLAWDBOT,
|
|
stateDir: STATE_DIR_CLAWDBOT,
|
|
};
|
|
}
|
|
|
|
async function refreshHealthSnapshot(_opts?: { probe?: boolean }) {
|
|
if (!healthRefresh) {
|
|
healthRefresh = (async () => {
|
|
const snap = await getHealthSnapshot(undefined);
|
|
healthCache = snap;
|
|
healthVersion += 1;
|
|
if (broadcastHealthUpdate) {
|
|
broadcastHealthUpdate(snap);
|
|
}
|
|
return snap;
|
|
})().finally(() => {
|
|
healthRefresh = null;
|
|
});
|
|
}
|
|
return healthRefresh;
|
|
}
|
|
|
|
export async function startGatewayServer(
|
|
port = 18789,
|
|
opts: GatewayServerOptions = {},
|
|
): Promise<GatewayServer> {
|
|
// Ensure all default port derivations (browser/bridge/canvas) see the actual runtime port.
|
|
process.env.CLAWDBOT_GATEWAY_PORT = String(port);
|
|
|
|
const configSnapshot = await readConfigFileSnapshot();
|
|
if (configSnapshot.legacyIssues.length > 0) {
|
|
if (isNixMode) {
|
|
throw new Error(
|
|
"Legacy config entries detected while running in Nix mode. Update your Nix config to the latest schema and restart.",
|
|
);
|
|
}
|
|
const { config: migrated, changes } = migrateLegacyConfig(
|
|
configSnapshot.parsed,
|
|
);
|
|
if (!migrated) {
|
|
throw new Error(
|
|
'Legacy config entries detected but auto-migration failed. Run "clawdbot doctor" to migrate.',
|
|
);
|
|
}
|
|
await writeConfigFile(migrated);
|
|
if (changes.length > 0) {
|
|
log.info(
|
|
`gateway: migrated legacy config entries:\n${changes
|
|
.map((entry) => `- ${entry}`)
|
|
.join("\n")}`,
|
|
);
|
|
}
|
|
}
|
|
|
|
const cfgAtStart = loadConfig();
|
|
await autoMigrateLegacyState({ cfg: cfgAtStart, log });
|
|
const bindMode = opts.bind ?? cfgAtStart.gateway?.bind ?? "loopback";
|
|
const bindHost = opts.host ?? resolveGatewayBindHost(bindMode);
|
|
if (!bindHost) {
|
|
throw new Error(
|
|
"gateway bind is tailnet, but no tailnet interface was found; refusing to start gateway",
|
|
);
|
|
}
|
|
const controlUiEnabled =
|
|
opts.controlUiEnabled ?? cfgAtStart.gateway?.controlUi?.enabled ?? true;
|
|
const controlUiBasePath = normalizeControlUiBasePath(
|
|
cfgAtStart.gateway?.controlUi?.basePath,
|
|
);
|
|
const authBase = cfgAtStart.gateway?.auth ?? {};
|
|
const authOverrides = opts.auth ?? {};
|
|
const authConfig = {
|
|
...authBase,
|
|
...authOverrides,
|
|
};
|
|
const tailscaleBase = cfgAtStart.gateway?.tailscale ?? {};
|
|
const tailscaleOverrides = opts.tailscale ?? {};
|
|
const tailscaleConfig = {
|
|
...tailscaleBase,
|
|
...tailscaleOverrides,
|
|
};
|
|
const tailscaleMode = tailscaleConfig.mode ?? "off";
|
|
const resolvedAuth = resolveGatewayAuth({
|
|
authConfig,
|
|
env: process.env,
|
|
tailscaleMode,
|
|
});
|
|
const authMode: ResolvedGatewayAuth["mode"] = resolvedAuth.mode;
|
|
let hooksConfig = resolveHooksConfig(cfgAtStart);
|
|
const canvasHostEnabled =
|
|
process.env.CLAWDBOT_SKIP_CANVAS_HOST !== "1" &&
|
|
cfgAtStart.canvasHost?.enabled !== false;
|
|
assertGatewayAuthConfigured(resolvedAuth);
|
|
if (tailscaleMode === "funnel" && authMode !== "password") {
|
|
throw new Error(
|
|
"tailscale funnel requires gateway auth mode=password (set gateway.auth.password or CLAWDBOT_GATEWAY_PASSWORD)",
|
|
);
|
|
}
|
|
if (tailscaleMode !== "off" && !isLoopbackHost(bindHost)) {
|
|
throw new Error(
|
|
"tailscale serve/funnel requires gateway bind=loopback (127.0.0.1)",
|
|
);
|
|
}
|
|
if (!isLoopbackHost(bindHost) && authMode === "none") {
|
|
throw new Error(
|
|
`refusing to bind gateway to ${bindHost}:${port} without auth (set gateway.auth.token or CLAWDBOT_GATEWAY_TOKEN, or pass --token)`,
|
|
);
|
|
}
|
|
|
|
const wizardRunner = opts.wizardRunner ?? runOnboardingWizard;
|
|
const wizardSessions = new Map<string, WizardSession>();
|
|
|
|
const findRunningWizard = (): string | null => {
|
|
for (const [id, session] of wizardSessions) {
|
|
if (session.getStatus() === "running") return id;
|
|
}
|
|
return null;
|
|
};
|
|
|
|
const purgeWizardSession = (id: string) => {
|
|
const session = wizardSessions.get(id);
|
|
if (!session) return;
|
|
if (session.getStatus() === "running") return;
|
|
wizardSessions.delete(id);
|
|
};
|
|
|
|
const dispatchWakeHook = (value: {
|
|
text: string;
|
|
mode: "now" | "next-heartbeat";
|
|
}) => {
|
|
enqueueSystemEvent(value.text);
|
|
if (value.mode === "now") {
|
|
requestHeartbeatNow({ reason: "hook:wake" });
|
|
}
|
|
};
|
|
|
|
const dispatchAgentHook = (value: {
|
|
message: string;
|
|
name: string;
|
|
wakeMode: "now" | "next-heartbeat";
|
|
sessionKey: string;
|
|
deliver: boolean;
|
|
provider: HookMessageProvider;
|
|
to?: string;
|
|
model?: string;
|
|
thinking?: string;
|
|
timeoutSeconds?: number;
|
|
}) => {
|
|
const sessionKey = value.sessionKey.trim()
|
|
? value.sessionKey.trim()
|
|
: `hook:${randomUUID()}`;
|
|
const jobId = randomUUID();
|
|
const now = Date.now();
|
|
const job: CronJob = {
|
|
id: jobId,
|
|
name: value.name,
|
|
enabled: true,
|
|
createdAtMs: now,
|
|
updatedAtMs: now,
|
|
schedule: { kind: "at", atMs: now },
|
|
sessionTarget: "isolated",
|
|
wakeMode: value.wakeMode,
|
|
payload: {
|
|
kind: "agentTurn",
|
|
message: value.message,
|
|
model: value.model,
|
|
thinking: value.thinking,
|
|
timeoutSeconds: value.timeoutSeconds,
|
|
deliver: value.deliver,
|
|
provider: value.provider,
|
|
to: value.to,
|
|
},
|
|
state: { nextRunAtMs: now },
|
|
};
|
|
|
|
const runId = randomUUID();
|
|
void (async () => {
|
|
try {
|
|
const cfg = loadConfig();
|
|
const result = await runCronIsolatedAgentTurn({
|
|
cfg,
|
|
deps,
|
|
job,
|
|
message: value.message,
|
|
sessionKey,
|
|
lane: "cron",
|
|
});
|
|
const summary =
|
|
result.summary?.trim() || result.error?.trim() || result.status;
|
|
const prefix =
|
|
result.status === "ok"
|
|
? `Hook ${value.name}`
|
|
: `Hook ${value.name} (${result.status})`;
|
|
enqueueSystemEvent(`${prefix}: ${summary}`.trim());
|
|
if (value.wakeMode === "now") {
|
|
requestHeartbeatNow({ reason: `hook:${jobId}` });
|
|
}
|
|
} catch (err) {
|
|
logHooks.warn(`hook agent failed: ${String(err)}`);
|
|
enqueueSystemEvent(`Hook ${value.name} (error): ${String(err)}`);
|
|
if (value.wakeMode === "now") {
|
|
requestHeartbeatNow({ reason: `hook:${jobId}:error` });
|
|
}
|
|
}
|
|
})();
|
|
|
|
return runId;
|
|
};
|
|
let canvasHost: CanvasHostHandler | null = null;
|
|
let canvasHostServer: CanvasHostServer | null = null;
|
|
if (canvasHostEnabled) {
|
|
try {
|
|
const handler = await createCanvasHostHandler({
|
|
runtime: canvasRuntime,
|
|
rootDir: cfgAtStart.canvasHost?.root,
|
|
basePath: CANVAS_HOST_PATH,
|
|
allowInTests: opts.allowCanvasHostInTests,
|
|
liveReload: cfgAtStart.canvasHost?.liveReload,
|
|
});
|
|
if (handler.rootDir) {
|
|
canvasHost = handler;
|
|
logCanvas.info(
|
|
`canvas host mounted at http://${bindHost}:${port}${CANVAS_HOST_PATH}/ (root ${handler.rootDir})`,
|
|
);
|
|
}
|
|
} catch (err) {
|
|
logCanvas.warn(`canvas host failed to start: ${String(err)}`);
|
|
}
|
|
}
|
|
|
|
const handleHooksRequest = createHooksRequestHandler({
|
|
getHooksConfig: () => hooksConfig,
|
|
bindHost,
|
|
port,
|
|
logHooks,
|
|
dispatchAgentHook,
|
|
dispatchWakeHook,
|
|
});
|
|
|
|
// Try to use Tailscale funnel URL for serve feature (public URLs)
|
|
let serveBaseUrl: string;
|
|
try {
|
|
const tailnetHost = await getTailnetHostname();
|
|
serveBaseUrl = `https://${tailnetHost}`;
|
|
log.info(`Serve base URL (Tailscale): ${serveBaseUrl}`);
|
|
} catch {
|
|
const serveHost = bindHost === "0.0.0.0" ? "localhost" : bindHost;
|
|
serveBaseUrl = `http://${serveHost}:${port}`;
|
|
log.info(`Serve base URL (local): ${serveBaseUrl}`);
|
|
}
|
|
const httpServer: HttpServer = createGatewayHttpServer({
|
|
canvasHost,
|
|
controlUiEnabled,
|
|
controlUiBasePath,
|
|
handleHooksRequest,
|
|
serveBaseUrl,
|
|
});
|
|
let bonjourStop: (() => Promise<void>) | null = null;
|
|
let bridge: Awaited<ReturnType<typeof startNodeBridgeServer>> | null = null;
|
|
const bridgeSubscriptions = createBridgeSubscriptionManager();
|
|
|
|
const isMobilePlatform = (platform: unknown): boolean => {
|
|
const p = typeof platform === "string" ? platform.trim().toLowerCase() : "";
|
|
if (!p) return false;
|
|
return (
|
|
p.startsWith("ios") || p.startsWith("ipados") || p.startsWith("android")
|
|
);
|
|
};
|
|
|
|
const hasConnectedMobileNode = (): boolean => {
|
|
const connected = bridge?.listConnected?.() ?? [];
|
|
return connected.some((n) => isMobilePlatform(n.platform));
|
|
};
|
|
try {
|
|
await new Promise<void>((resolve, reject) => {
|
|
const onError = (err: NodeJS.ErrnoException) => {
|
|
httpServer.off("listening", onListening);
|
|
reject(err);
|
|
};
|
|
const onListening = () => {
|
|
httpServer.off("error", onError);
|
|
resolve();
|
|
};
|
|
httpServer.once("error", onError);
|
|
httpServer.once("listening", onListening);
|
|
httpServer.listen(port, bindHost);
|
|
});
|
|
} catch (err) {
|
|
const code = (err as NodeJS.ErrnoException).code;
|
|
if (code === "EADDRINUSE") {
|
|
throw new GatewayLockError(
|
|
`another gateway instance is already listening on ws://${bindHost}:${port}`,
|
|
err,
|
|
);
|
|
}
|
|
throw new GatewayLockError(
|
|
`failed to bind gateway socket on ws://${bindHost}:${port}: ${String(err)}`,
|
|
err,
|
|
);
|
|
}
|
|
|
|
const wss = new WebSocketServer({
|
|
noServer: true,
|
|
maxPayload: MAX_PAYLOAD_BYTES,
|
|
});
|
|
attachGatewayUpgradeHandler({ httpServer, wss, canvasHost });
|
|
const clients = new Set<Client>();
|
|
let seq = 0;
|
|
// Track per-run sequence to detect out-of-order/lost agent events.
|
|
const agentRunSeq = new Map<string, number>();
|
|
const dedupe = new Map<string, DedupeEntry>();
|
|
const chatRunState = createChatRunState();
|
|
const chatRunRegistry = chatRunState.registry;
|
|
const chatRunBuffers = chatRunState.buffers;
|
|
const chatDeltaSentAt = chatRunState.deltaSentAt;
|
|
const addChatRun = chatRunRegistry.add;
|
|
const removeChatRun = chatRunRegistry.remove;
|
|
const resolveSessionKeyForRun = (runId: string) => {
|
|
const cached = getAgentRunContext(runId)?.sessionKey;
|
|
if (cached) return cached;
|
|
const cfg = loadConfig();
|
|
const storePath = resolveStorePath(cfg.session?.store);
|
|
const store = loadSessionStore(storePath);
|
|
const found = Object.entries(store).find(
|
|
([, entry]) => entry?.sessionId === runId,
|
|
);
|
|
const sessionKey = found?.[0];
|
|
if (sessionKey) {
|
|
registerAgentRunContext(runId, { sessionKey });
|
|
}
|
|
return sessionKey;
|
|
};
|
|
const chatAbortControllers = new Map<
|
|
string,
|
|
{ controller: AbortController; sessionId: string; sessionKey: string }
|
|
>();
|
|
setCommandLaneConcurrency("cron", cfgAtStart.cron?.maxConcurrentRuns ?? 1);
|
|
setCommandLaneConcurrency(
|
|
"main",
|
|
cfgAtStart.agents?.defaults?.maxConcurrent ?? 1,
|
|
);
|
|
setCommandLaneConcurrency(
|
|
"subagent",
|
|
cfgAtStart.agents?.defaults?.subagents?.maxConcurrent ?? 1,
|
|
);
|
|
|
|
const cronLogger = getChildLogger({
|
|
module: "cron",
|
|
});
|
|
const deps = createDefaultDeps();
|
|
const buildCronService = (cfg: ReturnType<typeof loadConfig>) => {
|
|
const storePath = resolveCronStorePath(cfg.cron?.store);
|
|
const cronEnabled =
|
|
process.env.CLAWDBOT_SKIP_CRON !== "1" && cfg.cron?.enabled !== false;
|
|
const cron = new CronService({
|
|
storePath,
|
|
cronEnabled,
|
|
enqueueSystemEvent: (text) => {
|
|
enqueueSystemEvent(text, { sessionKey: resolveMainSessionKey(cfg) });
|
|
},
|
|
requestHeartbeatNow,
|
|
runIsolatedAgentJob: async ({ job, message }) => {
|
|
const runtimeConfig = loadConfig();
|
|
return await runCronIsolatedAgentTurn({
|
|
cfg: runtimeConfig,
|
|
deps,
|
|
job,
|
|
message,
|
|
sessionKey: `cron:${job.id}`,
|
|
lane: "cron",
|
|
});
|
|
},
|
|
log: getChildLogger({ module: "cron", storePath }),
|
|
onEvent: (evt) => {
|
|
broadcast("cron", evt, { dropIfSlow: true });
|
|
if (evt.action === "finished") {
|
|
const logPath = resolveCronRunLogPath({
|
|
storePath,
|
|
jobId: evt.jobId,
|
|
});
|
|
void appendCronRunLog(logPath, {
|
|
ts: Date.now(),
|
|
jobId: evt.jobId,
|
|
action: "finished",
|
|
status: evt.status,
|
|
error: evt.error,
|
|
summary: evt.summary,
|
|
runAtMs: evt.runAtMs,
|
|
durationMs: evt.durationMs,
|
|
nextRunAtMs: evt.nextRunAtMs,
|
|
}).catch((err) => {
|
|
cronLogger.warn(
|
|
{ err: String(err), logPath },
|
|
"cron: run log append failed",
|
|
);
|
|
});
|
|
}
|
|
},
|
|
});
|
|
return { cron, storePath, cronEnabled };
|
|
};
|
|
|
|
let { cron, storePath: cronStorePath } = buildCronService(cfgAtStart);
|
|
|
|
const providerManager = createProviderManager({
|
|
loadConfig,
|
|
logWhatsApp,
|
|
logTelegram,
|
|
logDiscord,
|
|
logSlack,
|
|
logSignal,
|
|
logIMessage,
|
|
logMSTeams,
|
|
whatsappRuntimeEnv,
|
|
telegramRuntimeEnv,
|
|
discordRuntimeEnv,
|
|
slackRuntimeEnv,
|
|
signalRuntimeEnv,
|
|
imessageRuntimeEnv,
|
|
msteamsRuntimeEnv,
|
|
});
|
|
const {
|
|
getRuntimeSnapshot,
|
|
startProviders,
|
|
startWhatsAppProvider,
|
|
startTelegramProvider,
|
|
startDiscordProvider,
|
|
startSlackProvider,
|
|
startSignalProvider,
|
|
startIMessageProvider,
|
|
startMSTeamsProvider,
|
|
stopWhatsAppProvider,
|
|
stopTelegramProvider,
|
|
stopDiscordProvider,
|
|
stopSlackProvider,
|
|
stopSignalProvider,
|
|
stopIMessageProvider,
|
|
stopMSTeamsProvider,
|
|
markWhatsAppLoggedOut,
|
|
} = providerManager;
|
|
|
|
const broadcast = (
|
|
event: string,
|
|
payload: unknown,
|
|
opts?: {
|
|
dropIfSlow?: boolean;
|
|
stateVersion?: { presence?: number; health?: number };
|
|
},
|
|
) => {
|
|
const eventSeq = ++seq;
|
|
const frame = JSON.stringify({
|
|
type: "event",
|
|
event,
|
|
payload,
|
|
seq: eventSeq,
|
|
stateVersion: opts?.stateVersion,
|
|
});
|
|
const logMeta: Record<string, unknown> = {
|
|
event,
|
|
seq: eventSeq,
|
|
clients: clients.size,
|
|
dropIfSlow: opts?.dropIfSlow,
|
|
presenceVersion: opts?.stateVersion?.presence,
|
|
healthVersion: opts?.stateVersion?.health,
|
|
};
|
|
if (event === "agent") {
|
|
Object.assign(logMeta, summarizeAgentEventForWsLog(payload));
|
|
}
|
|
logWs("out", "event", logMeta);
|
|
for (const c of clients) {
|
|
const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES;
|
|
if (slow && opts?.dropIfSlow) continue;
|
|
if (slow) {
|
|
try {
|
|
c.socket.close(1008, "slow consumer");
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
continue;
|
|
}
|
|
try {
|
|
c.socket.send(frame);
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
}
|
|
};
|
|
|
|
const wideAreaDiscoveryEnabled =
|
|
cfgAtStart.discovery?.wideArea?.enabled === true;
|
|
|
|
const bridgeEnabled = (() => {
|
|
if (cfgAtStart.bridge?.enabled !== undefined)
|
|
return cfgAtStart.bridge.enabled === true;
|
|
return process.env.CLAWDBOT_BRIDGE_ENABLED !== "0";
|
|
})();
|
|
|
|
const bridgePort = (() => {
|
|
if (
|
|
typeof cfgAtStart.bridge?.port === "number" &&
|
|
cfgAtStart.bridge.port > 0
|
|
) {
|
|
return cfgAtStart.bridge.port;
|
|
}
|
|
if (process.env.CLAWDBOT_BRIDGE_PORT !== undefined) {
|
|
const parsed = Number.parseInt(process.env.CLAWDBOT_BRIDGE_PORT, 10);
|
|
return Number.isFinite(parsed) && parsed > 0
|
|
? parsed
|
|
: deriveDefaultBridgePort(port);
|
|
}
|
|
return deriveDefaultBridgePort(port);
|
|
})();
|
|
|
|
const bridgeHost = (() => {
|
|
// Back-compat: allow an env var override when no bind policy is configured.
|
|
if (cfgAtStart.bridge?.bind === undefined) {
|
|
const env = process.env.CLAWDBOT_BRIDGE_HOST?.trim();
|
|
if (env) return env;
|
|
}
|
|
|
|
const bind =
|
|
cfgAtStart.bridge?.bind ?? (wideAreaDiscoveryEnabled ? "tailnet" : "lan");
|
|
if (bind === "loopback") return "127.0.0.1";
|
|
if (bind === "lan") return "0.0.0.0";
|
|
|
|
const tailnetIPv4 = pickPrimaryTailnetIPv4();
|
|
const tailnetIPv6 = pickPrimaryTailnetIPv6();
|
|
if (bind === "tailnet") {
|
|
return tailnetIPv4 ?? tailnetIPv6 ?? null;
|
|
}
|
|
if (bind === "auto") {
|
|
return tailnetIPv4 ?? tailnetIPv6 ?? "0.0.0.0";
|
|
}
|
|
return "0.0.0.0";
|
|
})();
|
|
|
|
const canvasHostPort = (() => {
|
|
if (process.env.CLAWDBOT_CANVAS_HOST_PORT !== undefined) {
|
|
const parsed = Number.parseInt(process.env.CLAWDBOT_CANVAS_HOST_PORT, 10);
|
|
if (Number.isFinite(parsed) && parsed > 0) return parsed;
|
|
return deriveDefaultCanvasHostPort(port);
|
|
}
|
|
const configured = cfgAtStart.canvasHost?.port;
|
|
if (typeof configured === "number" && configured > 0) return configured;
|
|
return deriveDefaultCanvasHostPort(port);
|
|
})();
|
|
|
|
if (canvasHostEnabled && bridgeEnabled && bridgeHost) {
|
|
try {
|
|
const started = await startCanvasHost({
|
|
runtime: canvasRuntime,
|
|
rootDir: cfgAtStart.canvasHost?.root,
|
|
port: canvasHostPort,
|
|
listenHost: bridgeHost,
|
|
allowInTests: opts.allowCanvasHostInTests,
|
|
liveReload: cfgAtStart.canvasHost?.liveReload,
|
|
handler: canvasHost ?? undefined,
|
|
ownsHandler: canvasHost ? false : undefined,
|
|
});
|
|
if (started.port > 0) {
|
|
canvasHostServer = started;
|
|
}
|
|
} catch (err) {
|
|
logCanvas.warn(
|
|
`failed to start on ${bridgeHost}:${canvasHostPort}: ${String(err)}`,
|
|
);
|
|
}
|
|
}
|
|
|
|
const bridgeSubscribe = bridgeSubscriptions.subscribe;
|
|
const bridgeUnsubscribe = bridgeSubscriptions.unsubscribe;
|
|
const bridgeUnsubscribeAll = bridgeSubscriptions.unsubscribeAll;
|
|
const bridgeSendEvent: BridgeSendEventFn = (opts) => {
|
|
bridge?.sendEvent(opts);
|
|
};
|
|
const bridgeListConnected: BridgeListConnectedFn = () =>
|
|
bridge?.listConnected() ?? [];
|
|
const bridgeSendToSession = (
|
|
sessionKey: string,
|
|
event: string,
|
|
payload: unknown,
|
|
) =>
|
|
bridgeSubscriptions.sendToSession(
|
|
sessionKey,
|
|
event,
|
|
payload,
|
|
bridgeSendEvent,
|
|
);
|
|
const bridgeSendToAllSubscribed = (event: string, payload: unknown) =>
|
|
bridgeSubscriptions.sendToAllSubscribed(event, payload, bridgeSendEvent);
|
|
const bridgeSendToAllConnected = (event: string, payload: unknown) =>
|
|
bridgeSubscriptions.sendToAllConnected(
|
|
event,
|
|
payload,
|
|
bridgeListConnected,
|
|
bridgeSendEvent,
|
|
);
|
|
|
|
const broadcastVoiceWakeChanged = (triggers: string[]) => {
|
|
const payload = { triggers };
|
|
broadcast("voicewake.changed", payload, { dropIfSlow: true });
|
|
bridgeSendToAllConnected("voicewake.changed", payload);
|
|
};
|
|
|
|
const { handleBridgeRequest, handleBridgeEvent } = createBridgeHandlers({
|
|
deps,
|
|
broadcast,
|
|
bridgeSendToSession,
|
|
bridgeSubscribe,
|
|
bridgeUnsubscribe,
|
|
broadcastVoiceWakeChanged,
|
|
addChatRun,
|
|
removeChatRun,
|
|
chatAbortControllers,
|
|
chatRunBuffers,
|
|
chatDeltaSentAt,
|
|
dedupe,
|
|
agentRunSeq,
|
|
getHealthCache: () => healthCache,
|
|
refreshHealthSnapshot,
|
|
loadGatewayModelCatalog,
|
|
logBridge,
|
|
});
|
|
|
|
const machineDisplayName = await getMachineDisplayName();
|
|
const canvasHostPortForBridge = canvasHostServer?.port;
|
|
const canvasHostHostForBridge =
|
|
canvasHostServer &&
|
|
bridgeHost &&
|
|
bridgeHost !== "0.0.0.0" &&
|
|
bridgeHost !== "::"
|
|
? bridgeHost
|
|
: undefined;
|
|
|
|
const nodePresenceTimers = new Map<string, ReturnType<typeof setInterval>>();
|
|
|
|
const stopNodePresenceTimer = (nodeId: string) => {
|
|
const timer = nodePresenceTimers.get(nodeId);
|
|
if (timer) {
|
|
clearInterval(timer);
|
|
}
|
|
nodePresenceTimers.delete(nodeId);
|
|
};
|
|
|
|
const beaconNodePresence = (
|
|
node: {
|
|
nodeId: string;
|
|
displayName?: string;
|
|
remoteIp?: string;
|
|
version?: string;
|
|
platform?: string;
|
|
deviceFamily?: string;
|
|
modelIdentifier?: string;
|
|
},
|
|
reason: string,
|
|
) => {
|
|
const host = node.displayName?.trim() || node.nodeId;
|
|
const rawIp = node.remoteIp?.trim();
|
|
const ip = rawIp && !isLoopbackAddress(rawIp) ? rawIp : undefined;
|
|
const version = node.version?.trim() || "unknown";
|
|
const platform = node.platform?.trim() || undefined;
|
|
const deviceFamily = node.deviceFamily?.trim() || undefined;
|
|
const modelIdentifier = node.modelIdentifier?.trim() || undefined;
|
|
const text = `Node: ${host}${ip ? ` (${ip})` : ""} · app ${version} · last input 0s ago · mode remote · reason ${reason}`;
|
|
upsertPresence(node.nodeId, {
|
|
host,
|
|
ip,
|
|
version,
|
|
platform,
|
|
deviceFamily,
|
|
modelIdentifier,
|
|
mode: "remote",
|
|
reason,
|
|
lastInputSeconds: 0,
|
|
instanceId: node.nodeId,
|
|
text,
|
|
});
|
|
presenceVersion += 1;
|
|
broadcast(
|
|
"presence",
|
|
{ presence: listSystemPresence() },
|
|
{
|
|
dropIfSlow: true,
|
|
stateVersion: {
|
|
presence: presenceVersion,
|
|
health: healthVersion,
|
|
},
|
|
},
|
|
);
|
|
};
|
|
|
|
const startNodePresenceTimer = (node: { nodeId: string }) => {
|
|
stopNodePresenceTimer(node.nodeId);
|
|
nodePresenceTimers.set(
|
|
node.nodeId,
|
|
setInterval(() => {
|
|
beaconNodePresence(node, "periodic");
|
|
}, 180_000),
|
|
);
|
|
};
|
|
|
|
if (bridgeEnabled && bridgePort > 0 && bridgeHost) {
|
|
try {
|
|
const started = await startNodeBridgeServer({
|
|
host: bridgeHost,
|
|
port: bridgePort,
|
|
serverName: machineDisplayName,
|
|
canvasHostPort: canvasHostPortForBridge,
|
|
canvasHostHost: canvasHostHostForBridge,
|
|
onRequest: (nodeId, req) => handleBridgeRequest(nodeId, req),
|
|
onAuthenticated: async (node) => {
|
|
beaconNodePresence(node, "node-connected");
|
|
startNodePresenceTimer(node);
|
|
|
|
try {
|
|
const cfg = await loadVoiceWakeConfig();
|
|
started.sendEvent({
|
|
nodeId: node.nodeId,
|
|
event: "voicewake.changed",
|
|
payloadJSON: JSON.stringify({ triggers: cfg.triggers }),
|
|
});
|
|
} catch {
|
|
// Best-effort only.
|
|
}
|
|
},
|
|
onDisconnected: (node) => {
|
|
bridgeUnsubscribeAll(node.nodeId);
|
|
stopNodePresenceTimer(node.nodeId);
|
|
beaconNodePresence(node, "node-disconnected");
|
|
},
|
|
onEvent: handleBridgeEvent,
|
|
onPairRequested: (request) => {
|
|
broadcast("node.pair.requested", request, { dropIfSlow: true });
|
|
},
|
|
});
|
|
if (started.port > 0) {
|
|
bridge = started;
|
|
logBridge.info(
|
|
`listening on tcp://${bridgeHost}:${bridge.port} (node)`,
|
|
);
|
|
}
|
|
} catch (err) {
|
|
logBridge.warn(`failed to start: ${String(err)}`);
|
|
}
|
|
} else if (bridgeEnabled && bridgePort > 0 && !bridgeHost) {
|
|
logBridge.warn(
|
|
"bind policy requested tailnet IP, but no tailnet interface was found; refusing to start bridge",
|
|
);
|
|
}
|
|
|
|
const tailnetDns = await resolveTailnetDnsHint();
|
|
const sshPortEnv = process.env.CLAWDBOT_SSH_PORT?.trim();
|
|
const sshPortParsed = sshPortEnv ? Number.parseInt(sshPortEnv, 10) : NaN;
|
|
const sshPort =
|
|
Number.isFinite(sshPortParsed) && sshPortParsed > 0
|
|
? sshPortParsed
|
|
: undefined;
|
|
|
|
try {
|
|
const bonjour = await startGatewayBonjourAdvertiser({
|
|
instanceName: formatBonjourInstanceName(machineDisplayName),
|
|
gatewayPort: port,
|
|
bridgePort: bridge?.port,
|
|
canvasPort: canvasHostPortForBridge,
|
|
sshPort,
|
|
tailnetDns,
|
|
cliPath: resolveBonjourCliPath(),
|
|
});
|
|
bonjourStop = bonjour.stop;
|
|
} catch (err) {
|
|
logDiscovery.warn(`bonjour advertising failed: ${String(err)}`);
|
|
}
|
|
|
|
if (wideAreaDiscoveryEnabled && bridge?.port) {
|
|
const tailnetIPv4 = pickPrimaryTailnetIPv4();
|
|
if (!tailnetIPv4) {
|
|
logDiscovery.warn(
|
|
"discovery.wideArea.enabled is true, but no Tailscale IPv4 address was found; skipping unicast DNS-SD zone update",
|
|
);
|
|
} else {
|
|
try {
|
|
const tailnetIPv6 = pickPrimaryTailnetIPv6();
|
|
const result = await writeWideAreaBridgeZone({
|
|
bridgePort: bridge.port,
|
|
gatewayPort: port,
|
|
displayName: formatBonjourInstanceName(machineDisplayName),
|
|
tailnetIPv4,
|
|
tailnetIPv6: tailnetIPv6 ?? undefined,
|
|
tailnetDns,
|
|
sshPort,
|
|
cliPath: resolveBonjourCliPath(),
|
|
});
|
|
logDiscovery.info(
|
|
`wide-area DNS-SD ${result.changed ? "updated" : "unchanged"} (${WIDE_AREA_DISCOVERY_DOMAIN} → ${result.zonePath})`,
|
|
);
|
|
} catch (err) {
|
|
logDiscovery.warn(`wide-area discovery update failed: ${String(err)}`);
|
|
}
|
|
}
|
|
}
|
|
|
|
broadcastHealthUpdate = (snap: HealthSummary) => {
|
|
broadcast("health", snap, {
|
|
stateVersion: { presence: presenceVersion, health: healthVersion },
|
|
});
|
|
bridgeSendToAllSubscribed("health", snap);
|
|
};
|
|
|
|
// periodic keepalive
|
|
const tickInterval = setInterval(() => {
|
|
const payload = { ts: Date.now() };
|
|
broadcast("tick", payload, { dropIfSlow: true });
|
|
bridgeSendToAllSubscribed("tick", payload);
|
|
}, TICK_INTERVAL_MS);
|
|
|
|
// periodic health refresh to keep cached snapshot warm
|
|
const healthInterval = setInterval(() => {
|
|
void refreshHealthSnapshot({ probe: true }).catch((err) =>
|
|
logHealth.error(`refresh failed: ${formatError(err)}`),
|
|
);
|
|
}, HEALTH_REFRESH_INTERVAL_MS);
|
|
|
|
// Prime cache so first client gets a snapshot without waiting.
|
|
void refreshHealthSnapshot({ probe: true }).catch((err) =>
|
|
logHealth.error(`initial refresh failed: ${formatError(err)}`),
|
|
);
|
|
|
|
// dedupe cache cleanup
|
|
const dedupeCleanup = setInterval(() => {
|
|
const now = Date.now();
|
|
for (const [k, v] of dedupe) {
|
|
if (now - v.ts > DEDUPE_TTL_MS) dedupe.delete(k);
|
|
}
|
|
if (dedupe.size > DEDUPE_MAX) {
|
|
const entries = [...dedupe.entries()].sort((a, b) => a[1].ts - b[1].ts);
|
|
for (let i = 0; i < dedupe.size - DEDUPE_MAX; i++) {
|
|
dedupe.delete(entries[i][0]);
|
|
}
|
|
}
|
|
}, 60_000);
|
|
|
|
const agentUnsub = onAgentEvent(
|
|
createAgentEventHandler({
|
|
broadcast,
|
|
bridgeSendToSession,
|
|
agentRunSeq,
|
|
chatRunState,
|
|
resolveSessionKeyForRun,
|
|
clearAgentRunContext,
|
|
}),
|
|
);
|
|
|
|
const heartbeatUnsub = onHeartbeatEvent((evt) => {
|
|
broadcast("heartbeat", evt, { dropIfSlow: true });
|
|
});
|
|
|
|
let heartbeatRunner = startHeartbeatRunner({ cfg: cfgAtStart });
|
|
|
|
void cron
|
|
.start()
|
|
.catch((err) => logCron.error(`failed to start: ${String(err)}`));
|
|
|
|
wss.on("connection", (socket, upgradeReq) => {
|
|
let client: Client | null = null;
|
|
let closed = false;
|
|
const openedAt = Date.now();
|
|
const connId = randomUUID();
|
|
const remoteAddr = (
|
|
socket as WebSocket & { _socket?: { remoteAddress?: string } }
|
|
)._socket?.remoteAddress;
|
|
const headerValue = (value: string | string[] | undefined) =>
|
|
Array.isArray(value) ? value[0] : value;
|
|
const requestHost = headerValue(upgradeReq.headers.host);
|
|
const requestOrigin = headerValue(upgradeReq.headers.origin);
|
|
const requestUserAgent = headerValue(upgradeReq.headers["user-agent"]);
|
|
const forwardedFor = headerValue(upgradeReq.headers["x-forwarded-for"]);
|
|
const canvasHostPortForWs =
|
|
canvasHostServer?.port ?? (canvasHost ? port : undefined);
|
|
const canvasHostOverride =
|
|
bridgeHost && bridgeHost !== "0.0.0.0" && bridgeHost !== "::"
|
|
? bridgeHost
|
|
: undefined;
|
|
const canvasHostUrl = resolveCanvasHostUrl({
|
|
canvasPort: canvasHostPortForWs,
|
|
hostOverride: canvasHostServer ? canvasHostOverride : undefined,
|
|
requestHost: upgradeReq.headers.host,
|
|
forwardedProto: upgradeReq.headers["x-forwarded-proto"],
|
|
localAddress: upgradeReq.socket?.localAddress,
|
|
});
|
|
logWs("in", "open", { connId, remoteAddr });
|
|
const isWebchatConnect = (params: ConnectParams | null | undefined) =>
|
|
params?.client?.mode === "webchat" ||
|
|
params?.client?.name === "webchat-ui";
|
|
let handshakeState: "pending" | "connected" | "failed" = "pending";
|
|
let closeCause: string | undefined;
|
|
let closeMeta: Record<string, unknown> = {};
|
|
let lastFrameType: string | undefined;
|
|
let lastFrameMethod: string | undefined;
|
|
let lastFrameId: string | undefined;
|
|
|
|
const setCloseCause = (cause: string, meta?: Record<string, unknown>) => {
|
|
if (!closeCause) closeCause = cause;
|
|
if (meta && Object.keys(meta).length > 0) {
|
|
closeMeta = { ...closeMeta, ...meta };
|
|
}
|
|
};
|
|
|
|
const send = (obj: unknown) => {
|
|
try {
|
|
socket.send(JSON.stringify(obj));
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
};
|
|
|
|
const close = () => {
|
|
if (closed) return;
|
|
closed = true;
|
|
clearTimeout(handshakeTimer);
|
|
if (client) clients.delete(client);
|
|
try {
|
|
socket.close(1000);
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
};
|
|
|
|
socket.once("error", (err) => {
|
|
logWsControl.warn(
|
|
`error conn=${connId} remote=${remoteAddr ?? "?"}: ${formatError(err)}`,
|
|
);
|
|
close();
|
|
});
|
|
socket.once("close", (code, reason) => {
|
|
const durationMs = Date.now() - openedAt;
|
|
const closeContext = {
|
|
cause: closeCause,
|
|
handshake: handshakeState,
|
|
durationMs,
|
|
lastFrameType,
|
|
lastFrameMethod,
|
|
lastFrameId,
|
|
host: requestHost,
|
|
origin: requestOrigin,
|
|
userAgent: requestUserAgent,
|
|
forwardedFor,
|
|
...closeMeta,
|
|
};
|
|
if (!client) {
|
|
logWsControl.warn(
|
|
`closed before connect conn=${connId} remote=${remoteAddr ?? "?"} code=${code ?? "n/a"} reason=${reason?.toString() || "n/a"}`,
|
|
closeContext,
|
|
);
|
|
}
|
|
if (client && isWebchatConnect(client.connect)) {
|
|
logWsControl.info(
|
|
`webchat disconnected code=${code} reason=${reason?.toString() || "n/a"} conn=${connId}`,
|
|
);
|
|
}
|
|
if (client?.presenceKey) {
|
|
// mark presence as disconnected
|
|
upsertPresence(client.presenceKey, {
|
|
reason: "disconnect",
|
|
});
|
|
presenceVersion += 1;
|
|
broadcast(
|
|
"presence",
|
|
{ presence: listSystemPresence() },
|
|
{
|
|
dropIfSlow: true,
|
|
stateVersion: { presence: presenceVersion, health: healthVersion },
|
|
},
|
|
);
|
|
}
|
|
logWs("out", "close", {
|
|
connId,
|
|
code,
|
|
reason: reason?.toString(),
|
|
durationMs,
|
|
cause: closeCause,
|
|
handshake: handshakeState,
|
|
lastFrameType,
|
|
lastFrameMethod,
|
|
lastFrameId,
|
|
});
|
|
close();
|
|
});
|
|
|
|
const handshakeTimer = setTimeout(() => {
|
|
if (!client) {
|
|
handshakeState = "failed";
|
|
setCloseCause("handshake-timeout", {
|
|
handshakeMs: Date.now() - openedAt,
|
|
});
|
|
logWsControl.warn(
|
|
`handshake timeout conn=${connId} remote=${remoteAddr ?? "?"}`,
|
|
);
|
|
close();
|
|
}
|
|
}, HANDSHAKE_TIMEOUT_MS);
|
|
|
|
socket.on("message", async (data) => {
|
|
if (closed) return;
|
|
const text = rawDataToString(data);
|
|
try {
|
|
const parsed = JSON.parse(text);
|
|
const frameType =
|
|
parsed && typeof parsed === "object" && "type" in parsed
|
|
? typeof (parsed as { type?: unknown }).type === "string"
|
|
? String((parsed as { type?: unknown }).type)
|
|
: undefined
|
|
: undefined;
|
|
const frameMethod =
|
|
parsed && typeof parsed === "object" && "method" in parsed
|
|
? typeof (parsed as { method?: unknown }).method === "string"
|
|
? String((parsed as { method?: unknown }).method)
|
|
: undefined
|
|
: undefined;
|
|
const frameId =
|
|
parsed && typeof parsed === "object" && "id" in parsed
|
|
? typeof (parsed as { id?: unknown }).id === "string"
|
|
? String((parsed as { id?: unknown }).id)
|
|
: undefined
|
|
: undefined;
|
|
if (frameType || frameMethod || frameId) {
|
|
lastFrameType = frameType;
|
|
lastFrameMethod = frameMethod;
|
|
lastFrameId = frameId;
|
|
}
|
|
if (!client) {
|
|
// Handshake must be a normal request:
|
|
// { type:"req", method:"connect", params: ConnectParams }.
|
|
if (
|
|
!validateRequestFrame(parsed) ||
|
|
(parsed as RequestFrame).method !== "connect" ||
|
|
!validateConnectParams((parsed as RequestFrame).params)
|
|
) {
|
|
if (validateRequestFrame(parsed)) {
|
|
const req = parsed as RequestFrame;
|
|
send({
|
|
type: "res",
|
|
id: req.id,
|
|
ok: false,
|
|
error: errorShape(
|
|
ErrorCodes.INVALID_REQUEST,
|
|
req.method === "connect"
|
|
? `invalid connect params: ${formatValidationErrors(validateConnectParams.errors)}`
|
|
: "invalid handshake: first request must be connect",
|
|
),
|
|
});
|
|
} else {
|
|
logWsControl.warn(
|
|
`invalid handshake conn=${connId} remote=${remoteAddr ?? "?"}`,
|
|
);
|
|
}
|
|
handshakeState = "failed";
|
|
const handshakeError = validateRequestFrame(parsed)
|
|
? (parsed as RequestFrame).method === "connect"
|
|
? `invalid connect params: ${formatValidationErrors(validateConnectParams.errors)}`
|
|
: "invalid handshake: first request must be connect"
|
|
: "invalid request frame";
|
|
setCloseCause("invalid-handshake", {
|
|
frameType,
|
|
frameMethod,
|
|
frameId,
|
|
handshakeError,
|
|
});
|
|
socket.close(1008, "invalid handshake");
|
|
close();
|
|
return;
|
|
}
|
|
|
|
const frame = parsed as RequestFrame;
|
|
const connectParams = frame.params as ConnectParams;
|
|
|
|
// protocol negotiation
|
|
const { minProtocol, maxProtocol } = connectParams;
|
|
if (
|
|
maxProtocol < PROTOCOL_VERSION ||
|
|
minProtocol > PROTOCOL_VERSION
|
|
) {
|
|
handshakeState = "failed";
|
|
logWsControl.warn(
|
|
`protocol mismatch conn=${connId} remote=${remoteAddr ?? "?"} client=${connectParams.client.name} ${connectParams.client.mode} v${connectParams.client.version}`,
|
|
);
|
|
setCloseCause("protocol-mismatch", {
|
|
minProtocol,
|
|
maxProtocol,
|
|
expectedProtocol: PROTOCOL_VERSION,
|
|
client: connectParams.client.name,
|
|
mode: connectParams.client.mode,
|
|
version: connectParams.client.version,
|
|
});
|
|
send({
|
|
type: "res",
|
|
id: frame.id,
|
|
ok: false,
|
|
error: errorShape(
|
|
ErrorCodes.INVALID_REQUEST,
|
|
"protocol mismatch",
|
|
{
|
|
details: { expectedProtocol: PROTOCOL_VERSION },
|
|
},
|
|
),
|
|
});
|
|
socket.close(1002, "protocol mismatch");
|
|
close();
|
|
return;
|
|
}
|
|
|
|
const authResult = await authorizeGatewayConnect({
|
|
auth: resolvedAuth,
|
|
connectAuth: connectParams.auth,
|
|
req: upgradeReq,
|
|
});
|
|
if (!authResult.ok) {
|
|
handshakeState = "failed";
|
|
logWsControl.warn(
|
|
`unauthorized conn=${connId} remote=${remoteAddr ?? "?"} client=${connectParams.client.name} ${connectParams.client.mode} v${connectParams.client.version}`,
|
|
);
|
|
const authProvided = connectParams.auth?.token
|
|
? "token"
|
|
: connectParams.auth?.password
|
|
? "password"
|
|
: "none";
|
|
setCloseCause("unauthorized", {
|
|
authMode: resolvedAuth.mode,
|
|
authProvided,
|
|
authReason: authResult.reason,
|
|
allowTailscale: resolvedAuth.allowTailscale,
|
|
client: connectParams.client.name,
|
|
mode: connectParams.client.mode,
|
|
version: connectParams.client.version,
|
|
});
|
|
send({
|
|
type: "res",
|
|
id: frame.id,
|
|
ok: false,
|
|
error: errorShape(ErrorCodes.INVALID_REQUEST, "unauthorized"),
|
|
});
|
|
socket.close(1008, "unauthorized");
|
|
close();
|
|
return;
|
|
}
|
|
const authMethod = authResult.method ?? "none";
|
|
|
|
const shouldTrackPresence = connectParams.client.mode !== "cli";
|
|
const presenceKey = shouldTrackPresence
|
|
? connectParams.client.instanceId || connId
|
|
: undefined;
|
|
|
|
logWs("in", "connect", {
|
|
connId,
|
|
client: connectParams.client.name,
|
|
version: connectParams.client.version,
|
|
mode: connectParams.client.mode,
|
|
instanceId: connectParams.client.instanceId,
|
|
platform: connectParams.client.platform,
|
|
auth: authMethod,
|
|
});
|
|
|
|
if (isWebchatConnect(connectParams)) {
|
|
logWsControl.info(
|
|
`webchat connected conn=${connId} remote=${remoteAddr ?? "?"} client=${connectParams.client.name} ${connectParams.client.mode} v${connectParams.client.version}`,
|
|
);
|
|
}
|
|
|
|
if (presenceKey) {
|
|
upsertPresence(presenceKey, {
|
|
host: connectParams.client.name || os.hostname(),
|
|
ip: isLoopbackAddress(remoteAddr) ? undefined : remoteAddr,
|
|
version: connectParams.client.version,
|
|
platform: connectParams.client.platform,
|
|
deviceFamily: connectParams.client.deviceFamily,
|
|
modelIdentifier: connectParams.client.modelIdentifier,
|
|
mode: connectParams.client.mode,
|
|
instanceId: connectParams.client.instanceId,
|
|
reason: "connect",
|
|
});
|
|
presenceVersion += 1;
|
|
}
|
|
|
|
const snapshot = buildSnapshot();
|
|
if (healthCache) {
|
|
snapshot.health = healthCache;
|
|
snapshot.stateVersion.health = healthVersion;
|
|
}
|
|
const helloOk = {
|
|
type: "hello-ok",
|
|
protocol: PROTOCOL_VERSION,
|
|
server: {
|
|
version:
|
|
process.env.CLAWDBOT_VERSION ??
|
|
process.env.npm_package_version ??
|
|
"dev",
|
|
commit: process.env.GIT_COMMIT,
|
|
host: os.hostname(),
|
|
connId,
|
|
},
|
|
features: { methods: METHODS, events: EVENTS },
|
|
snapshot,
|
|
canvasHostUrl,
|
|
policy: {
|
|
maxPayload: MAX_PAYLOAD_BYTES,
|
|
maxBufferedBytes: MAX_BUFFERED_BYTES,
|
|
tickIntervalMs: TICK_INTERVAL_MS,
|
|
},
|
|
};
|
|
|
|
clearTimeout(handshakeTimer);
|
|
client = { socket, connect: connectParams, connId, presenceKey };
|
|
handshakeState = "connected";
|
|
|
|
logWs("out", "hello-ok", {
|
|
connId,
|
|
methods: METHODS.length,
|
|
events: EVENTS.length,
|
|
presence: snapshot.presence.length,
|
|
stateVersion: snapshot.stateVersion.presence,
|
|
});
|
|
|
|
send({ type: "res", id: frame.id, ok: true, payload: helloOk });
|
|
|
|
clients.add(client);
|
|
void refreshHealthSnapshot({ probe: true }).catch((err) =>
|
|
logHealth.error(
|
|
`post-connect health refresh failed: ${formatError(err)}`,
|
|
),
|
|
);
|
|
return;
|
|
}
|
|
|
|
// After handshake, accept only req frames
|
|
if (!validateRequestFrame(parsed)) {
|
|
send({
|
|
type: "res",
|
|
id: (parsed as { id?: unknown })?.id ?? "invalid",
|
|
ok: false,
|
|
error: errorShape(
|
|
ErrorCodes.INVALID_REQUEST,
|
|
`invalid request frame: ${formatValidationErrors(validateRequestFrame.errors)}`,
|
|
),
|
|
});
|
|
return;
|
|
}
|
|
const req = parsed as RequestFrame;
|
|
logWs("in", "req", {
|
|
connId,
|
|
id: req.id,
|
|
method: req.method,
|
|
});
|
|
const respond = (
|
|
ok: boolean,
|
|
payload?: unknown,
|
|
error?: ErrorShape,
|
|
meta?: Record<string, unknown>,
|
|
) => {
|
|
send({ type: "res", id: req.id, ok, payload, error });
|
|
logWs("out", "res", {
|
|
connId,
|
|
id: req.id,
|
|
ok,
|
|
method: req.method,
|
|
errorCode: error?.code,
|
|
errorMessage: error?.message,
|
|
...meta,
|
|
});
|
|
};
|
|
|
|
void (async () => {
|
|
await handleGatewayRequest({
|
|
req,
|
|
respond,
|
|
client,
|
|
isWebchatConnect,
|
|
context: {
|
|
deps,
|
|
cron,
|
|
cronStorePath,
|
|
loadGatewayModelCatalog,
|
|
getHealthCache: () => healthCache,
|
|
refreshHealthSnapshot,
|
|
logHealth,
|
|
incrementPresenceVersion: () => {
|
|
presenceVersion += 1;
|
|
return presenceVersion;
|
|
},
|
|
getHealthVersion: () => healthVersion,
|
|
broadcast,
|
|
bridge,
|
|
bridgeSendToSession,
|
|
hasConnectedMobileNode,
|
|
agentRunSeq,
|
|
chatAbortControllers,
|
|
chatRunBuffers,
|
|
chatDeltaSentAt,
|
|
addChatRun,
|
|
removeChatRun,
|
|
dedupe,
|
|
wizardSessions,
|
|
findRunningWizard,
|
|
purgeWizardSession,
|
|
getRuntimeSnapshot,
|
|
startWhatsAppProvider,
|
|
stopWhatsAppProvider,
|
|
startTelegramProvider,
|
|
stopTelegramProvider,
|
|
startDiscordProvider,
|
|
stopDiscordProvider,
|
|
startSlackProvider,
|
|
stopSlackProvider,
|
|
startSignalProvider,
|
|
stopSignalProvider,
|
|
startIMessageProvider,
|
|
stopIMessageProvider,
|
|
markWhatsAppLoggedOut,
|
|
wizardRunner,
|
|
broadcastVoiceWakeChanged,
|
|
},
|
|
});
|
|
})().catch((err) => {
|
|
log.error(`request handler failed: ${formatForLog(err)}`);
|
|
respond(
|
|
false,
|
|
undefined,
|
|
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
|
|
);
|
|
});
|
|
} catch (err) {
|
|
log.error(`parse/handle error: ${String(err)}`);
|
|
logWs("out", "parse-error", { connId, error: formatForLog(err) });
|
|
// If still in handshake, close; otherwise respond error
|
|
if (!client) {
|
|
close();
|
|
}
|
|
}
|
|
});
|
|
});
|
|
|
|
const { provider: agentProvider, model: agentModel } =
|
|
resolveConfiguredModelRef({
|
|
cfg: cfgAtStart,
|
|
defaultProvider: DEFAULT_PROVIDER,
|
|
defaultModel: DEFAULT_MODEL,
|
|
});
|
|
const modelRef = `${agentProvider}/${agentModel}`;
|
|
log.info(`agent model: ${modelRef}`, {
|
|
consoleMessage: `agent model: ${chalk.whiteBright(modelRef)}`,
|
|
});
|
|
log.info(`listening on ws://${bindHost}:${port} (PID ${process.pid})`);
|
|
log.info(`log file: ${getResolvedLoggerSettings().file}`);
|
|
if (isNixMode) {
|
|
log.info("gateway: running in Nix mode (config managed externally)");
|
|
}
|
|
let tailscaleCleanup: (() => Promise<void>) | null = null;
|
|
if (tailscaleMode !== "off") {
|
|
try {
|
|
if (tailscaleMode === "serve") {
|
|
await enableTailscaleServe(port);
|
|
} else {
|
|
await enableTailscaleFunnel(port);
|
|
}
|
|
const host = await getTailnetHostname().catch(() => null);
|
|
if (host) {
|
|
const uiPath = controlUiBasePath ? `${controlUiBasePath}/` : "/";
|
|
logTailscale.info(
|
|
`${tailscaleMode} enabled: https://${host}${uiPath} (WS via wss://${host})`,
|
|
);
|
|
} else {
|
|
logTailscale.info(`${tailscaleMode} enabled`);
|
|
}
|
|
} catch (err) {
|
|
logTailscale.warn(
|
|
`${tailscaleMode} failed: ${err instanceof Error ? err.message : String(err)}`,
|
|
);
|
|
}
|
|
if (tailscaleConfig.resetOnExit) {
|
|
tailscaleCleanup = async () => {
|
|
try {
|
|
if (tailscaleMode === "serve") {
|
|
await disableTailscaleServe();
|
|
} else {
|
|
await disableTailscaleFunnel();
|
|
}
|
|
} catch (err) {
|
|
logTailscale.warn(
|
|
`${tailscaleMode} cleanup failed: ${err instanceof Error ? err.message : String(err)}`,
|
|
);
|
|
}
|
|
};
|
|
}
|
|
}
|
|
|
|
// Start clawd browser control server (unless disabled via config).
|
|
let browserControl: Awaited<
|
|
ReturnType<typeof startBrowserControlServerIfEnabled>
|
|
> = null;
|
|
try {
|
|
browserControl = await startBrowserControlServerIfEnabled();
|
|
} catch (err) {
|
|
logBrowser.error(`server failed to start: ${String(err)}`);
|
|
}
|
|
|
|
// Start Gmail watcher if configured (hooks.gmail.account).
|
|
if (process.env.CLAWDBOT_SKIP_GMAIL_WATCHER !== "1") {
|
|
try {
|
|
const gmailResult = await startGmailWatcher(cfgAtStart);
|
|
if (gmailResult.started) {
|
|
logHooks.info("gmail watcher started");
|
|
} else if (
|
|
gmailResult.reason &&
|
|
gmailResult.reason !== "hooks not enabled" &&
|
|
gmailResult.reason !== "no gmail account configured"
|
|
) {
|
|
logHooks.warn(`gmail watcher not started: ${gmailResult.reason}`);
|
|
}
|
|
} catch (err) {
|
|
logHooks.error(`gmail watcher failed to start: ${String(err)}`);
|
|
}
|
|
}
|
|
|
|
// Launch configured providers (WhatsApp Web, Discord, Slack, Telegram) so gateway replies via the
|
|
// surface the message came from. Tests can opt out via CLAWDBOT_SKIP_PROVIDERS.
|
|
if (process.env.CLAWDBOT_SKIP_PROVIDERS !== "1") {
|
|
try {
|
|
await startProviders();
|
|
} catch (err) {
|
|
logProviders.error(`provider startup failed: ${String(err)}`);
|
|
}
|
|
} else {
|
|
logProviders.info("skipping provider start (CLAWDBOT_SKIP_PROVIDERS=1)");
|
|
}
|
|
|
|
const scheduleRestartSentinelWake = async () => {
|
|
const sentinel = await consumeRestartSentinel();
|
|
if (!sentinel) return;
|
|
const payload = sentinel.payload;
|
|
const sessionKey = payload.sessionKey?.trim();
|
|
const message = formatRestartSentinelMessage(payload);
|
|
const summary = summarizeRestartSentinel(payload);
|
|
|
|
if (!sessionKey) {
|
|
enqueueSystemEvent(message);
|
|
return;
|
|
}
|
|
|
|
const { cfg, entry } = loadSessionEntry(sessionKey);
|
|
const lastProvider =
|
|
entry?.lastProvider && entry.lastProvider !== "webchat"
|
|
? entry.lastProvider
|
|
: undefined;
|
|
const lastTo = entry?.lastTo?.trim();
|
|
const parsedTarget = resolveAnnounceTargetFromKey(sessionKey);
|
|
const provider = lastProvider ?? parsedTarget?.provider;
|
|
const to = lastTo || parsedTarget?.to;
|
|
if (!provider || !to) {
|
|
enqueueSystemEvent(message);
|
|
return;
|
|
}
|
|
|
|
const resolved = resolveOutboundTarget({
|
|
provider: provider as
|
|
| "whatsapp"
|
|
| "telegram"
|
|
| "discord"
|
|
| "slack"
|
|
| "signal"
|
|
| "imessage"
|
|
| "webchat",
|
|
to,
|
|
allowFrom: cfg.whatsapp?.allowFrom ?? [],
|
|
});
|
|
if (!resolved.ok) {
|
|
enqueueSystemEvent(message);
|
|
return;
|
|
}
|
|
|
|
try {
|
|
await agentCommand(
|
|
{
|
|
message,
|
|
sessionKey,
|
|
to: resolved.to,
|
|
provider,
|
|
deliver: true,
|
|
bestEffortDeliver: true,
|
|
messageProvider: provider,
|
|
},
|
|
defaultRuntime,
|
|
deps,
|
|
);
|
|
} catch (err) {
|
|
enqueueSystemEvent(`${summary}\n${String(err)}`);
|
|
}
|
|
};
|
|
|
|
const shouldWakeFromSentinel =
|
|
!process.env.VITEST && process.env.NODE_ENV !== "test";
|
|
if (shouldWakeFromSentinel) {
|
|
setTimeout(() => {
|
|
void scheduleRestartSentinelWake();
|
|
}, 750);
|
|
}
|
|
|
|
const applyHotReload = async (
|
|
plan: GatewayReloadPlan,
|
|
nextConfig: ReturnType<typeof loadConfig>,
|
|
) => {
|
|
if (plan.reloadHooks) {
|
|
try {
|
|
hooksConfig = resolveHooksConfig(nextConfig);
|
|
} catch (err) {
|
|
logHooks.warn(`hooks config reload failed: ${String(err)}`);
|
|
}
|
|
}
|
|
|
|
if (plan.restartHeartbeat) {
|
|
heartbeatRunner.stop();
|
|
heartbeatRunner = startHeartbeatRunner({ cfg: nextConfig });
|
|
}
|
|
|
|
if (plan.restartCron) {
|
|
cron.stop();
|
|
const next = buildCronService(nextConfig);
|
|
cron = next.cron;
|
|
cronStorePath = next.storePath;
|
|
void cron
|
|
.start()
|
|
.catch((err) => logCron.error(`failed to start: ${String(err)}`));
|
|
}
|
|
|
|
if (plan.restartBrowserControl) {
|
|
if (browserControl) {
|
|
await browserControl.stop().catch(() => {});
|
|
}
|
|
try {
|
|
browserControl = await startBrowserControlServerIfEnabled();
|
|
} catch (err) {
|
|
logBrowser.error(`server failed to start: ${String(err)}`);
|
|
}
|
|
}
|
|
|
|
if (plan.restartGmailWatcher) {
|
|
await stopGmailWatcher().catch(() => {});
|
|
if (process.env.CLAWDBOT_SKIP_GMAIL_WATCHER !== "1") {
|
|
try {
|
|
const gmailResult = await startGmailWatcher(nextConfig);
|
|
if (gmailResult.started) {
|
|
logHooks.info("gmail watcher started");
|
|
} else if (
|
|
gmailResult.reason &&
|
|
gmailResult.reason !== "hooks not enabled" &&
|
|
gmailResult.reason !== "no gmail account configured"
|
|
) {
|
|
logHooks.warn(`gmail watcher not started: ${gmailResult.reason}`);
|
|
}
|
|
} catch (err) {
|
|
logHooks.error(`gmail watcher failed to start: ${String(err)}`);
|
|
}
|
|
} else {
|
|
logHooks.info(
|
|
"skipping gmail watcher restart (CLAWDBOT_SKIP_GMAIL_WATCHER=1)",
|
|
);
|
|
}
|
|
}
|
|
|
|
if (plan.restartProviders.size > 0) {
|
|
if (process.env.CLAWDBOT_SKIP_PROVIDERS === "1") {
|
|
logProviders.info(
|
|
"skipping provider reload (CLAWDBOT_SKIP_PROVIDERS=1)",
|
|
);
|
|
} else {
|
|
const restartProvider = async (
|
|
name: ProviderKind,
|
|
stop: () => Promise<void>,
|
|
start: () => Promise<void>,
|
|
) => {
|
|
logProviders.info(`restarting ${name} provider`);
|
|
await stop();
|
|
await start();
|
|
};
|
|
if (plan.restartProviders.has("whatsapp")) {
|
|
await restartProvider(
|
|
"whatsapp",
|
|
stopWhatsAppProvider,
|
|
startWhatsAppProvider,
|
|
);
|
|
}
|
|
if (plan.restartProviders.has("telegram")) {
|
|
await restartProvider(
|
|
"telegram",
|
|
stopTelegramProvider,
|
|
startTelegramProvider,
|
|
);
|
|
}
|
|
if (plan.restartProviders.has("discord")) {
|
|
await restartProvider(
|
|
"discord",
|
|
stopDiscordProvider,
|
|
startDiscordProvider,
|
|
);
|
|
}
|
|
if (plan.restartProviders.has("slack")) {
|
|
await restartProvider("slack", stopSlackProvider, startSlackProvider);
|
|
}
|
|
if (plan.restartProviders.has("signal")) {
|
|
await restartProvider(
|
|
"signal",
|
|
stopSignalProvider,
|
|
startSignalProvider,
|
|
);
|
|
}
|
|
if (plan.restartProviders.has("imessage")) {
|
|
await restartProvider(
|
|
"imessage",
|
|
stopIMessageProvider,
|
|
startIMessageProvider,
|
|
);
|
|
}
|
|
if (plan.restartProviders.has("msteams")) {
|
|
await restartProvider(
|
|
"msteams",
|
|
stopMSTeamsProvider,
|
|
startMSTeamsProvider,
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
setCommandLaneConcurrency("cron", nextConfig.cron?.maxConcurrentRuns ?? 1);
|
|
setCommandLaneConcurrency(
|
|
"main",
|
|
nextConfig.agents?.defaults?.maxConcurrent ?? 1,
|
|
);
|
|
setCommandLaneConcurrency(
|
|
"subagent",
|
|
nextConfig.agents?.defaults?.subagents?.maxConcurrent ?? 1,
|
|
);
|
|
|
|
if (plan.hotReasons.length > 0) {
|
|
logReload.info(
|
|
`config hot reload applied (${plan.hotReasons.join(", ")})`,
|
|
);
|
|
} else if (plan.noopPaths.length > 0) {
|
|
logReload.info(
|
|
`config change applied (dynamic reads: ${plan.noopPaths.join(", ")})`,
|
|
);
|
|
}
|
|
};
|
|
|
|
const requestGatewayRestart = (
|
|
plan: GatewayReloadPlan,
|
|
_nextConfig: ReturnType<typeof loadConfig>,
|
|
) => {
|
|
const reasons = plan.restartReasons.length
|
|
? plan.restartReasons.join(", ")
|
|
: plan.changedPaths.join(", ");
|
|
logReload.warn(`config change requires gateway restart (${reasons})`);
|
|
if (process.listenerCount("SIGUSR1") === 0) {
|
|
logReload.warn("no SIGUSR1 listener found; restart skipped");
|
|
return;
|
|
}
|
|
process.emit("SIGUSR1");
|
|
};
|
|
|
|
const configReloader = startGatewayConfigReloader({
|
|
initialConfig: cfgAtStart,
|
|
readSnapshot: readConfigFileSnapshot,
|
|
onHotReload: applyHotReload,
|
|
onRestart: requestGatewayRestart,
|
|
log: {
|
|
info: (msg) => logReload.info(msg),
|
|
warn: (msg) => logReload.warn(msg),
|
|
error: (msg) => logReload.error(msg),
|
|
},
|
|
watchPath: CONFIG_PATH_CLAWDBOT,
|
|
});
|
|
|
|
return {
|
|
close: async (opts) => {
|
|
const reasonRaw =
|
|
typeof opts?.reason === "string" ? opts.reason.trim() : "";
|
|
const reason = reasonRaw || "gateway stopping";
|
|
const restartExpectedMs =
|
|
typeof opts?.restartExpectedMs === "number" &&
|
|
Number.isFinite(opts.restartExpectedMs)
|
|
? Math.max(0, Math.floor(opts.restartExpectedMs))
|
|
: null;
|
|
if (bonjourStop) {
|
|
try {
|
|
await bonjourStop();
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
}
|
|
if (tailscaleCleanup) {
|
|
await tailscaleCleanup();
|
|
}
|
|
if (canvasHost) {
|
|
try {
|
|
await canvasHost.close();
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
}
|
|
if (canvasHostServer) {
|
|
try {
|
|
await canvasHostServer.close();
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
}
|
|
if (bridge) {
|
|
try {
|
|
await bridge.close();
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
}
|
|
await stopWhatsAppProvider();
|
|
await stopTelegramProvider();
|
|
await stopDiscordProvider();
|
|
await stopSlackProvider();
|
|
await stopSignalProvider();
|
|
await stopIMessageProvider();
|
|
await stopMSTeamsProvider();
|
|
await stopGmailWatcher();
|
|
cron.stop();
|
|
heartbeatRunner.stop();
|
|
for (const timer of nodePresenceTimers.values()) {
|
|
clearInterval(timer);
|
|
}
|
|
nodePresenceTimers.clear();
|
|
broadcast("shutdown", {
|
|
reason,
|
|
restartExpectedMs,
|
|
});
|
|
clearInterval(tickInterval);
|
|
clearInterval(healthInterval);
|
|
clearInterval(dedupeCleanup);
|
|
if (agentUnsub) {
|
|
try {
|
|
agentUnsub();
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
}
|
|
if (heartbeatUnsub) {
|
|
try {
|
|
heartbeatUnsub();
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
}
|
|
chatRunState.clear();
|
|
for (const c of clients) {
|
|
try {
|
|
c.socket.close(1012, "service restart");
|
|
} catch {
|
|
/* ignore */
|
|
}
|
|
}
|
|
clients.clear();
|
|
await configReloader.stop().catch(() => {});
|
|
if (browserControl) {
|
|
await browserControl.stop().catch(() => {});
|
|
}
|
|
await new Promise<void>((resolve) => wss.close(() => resolve()));
|
|
await new Promise<void>((resolve, reject) =>
|
|
httpServer.close((err) => (err ? reject(err) : resolve())),
|
|
);
|
|
},
|
|
};
|
|
}
|