refactor(gateway): split server runtime

This commit is contained in:
Peter Steinberger
2026-01-14 09:11:21 +00:00
parent ea018a68cc
commit d19bc1562b
30 changed files with 3486 additions and 2542 deletions

View File

@@ -0,0 +1,197 @@
import { randomUUID } from "node:crypto";
import { normalizeChannelId } from "../channels/plugins/index.js";
import { agentCommand } from "../commands/agent.js";
import { loadConfig } from "../config/config.js";
import { saveSessionStore } from "../config/sessions.js";
import { normalizeMainKey } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import type {
BridgeEvent,
BridgeHandlersContext,
} from "./server-bridge-types.js";
import { loadSessionEntry } from "./session-utils.js";
import { formatForLog } from "./ws-log.js";
export const handleBridgeEvent = async (
ctx: BridgeHandlersContext,
nodeId: string,
evt: BridgeEvent,
) => {
switch (evt.event) {
case "voice.transcript": {
if (!evt.payloadJSON) return;
let payload: unknown;
try {
payload = JSON.parse(evt.payloadJSON) as unknown;
} catch {
return;
}
const obj =
typeof payload === "object" && payload !== null
? (payload as Record<string, unknown>)
: {};
const text = typeof obj.text === "string" ? obj.text.trim() : "";
if (!text) return;
if (text.length > 20_000) return;
const sessionKeyRaw =
typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : "";
const cfg = loadConfig();
const rawMainKey = normalizeMainKey(cfg.session?.mainKey);
const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : rawMainKey;
const { storePath, store, entry, canonicalKey } =
loadSessionEntry(sessionKey);
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
store[canonicalKey] = {
sessionId,
updatedAt: now,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
reasoningLevel: entry?.reasoningLevel,
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
};
if (storePath) {
await saveSessionStore(storePath, store);
}
// Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send).
// This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId).
ctx.addChatRun(sessionId, {
sessionKey,
clientRunId: `voice-${randomUUID()}`,
});
void agentCommand(
{
message: text,
sessionId,
sessionKey,
thinking: "low",
deliver: false,
messageChannel: "node",
},
defaultRuntime,
ctx.deps,
).catch((err) => {
ctx.logBridge.warn(`agent failed node=${nodeId}: ${formatForLog(err)}`);
});
return;
}
case "agent.request": {
if (!evt.payloadJSON) return;
type AgentDeepLink = {
message?: string;
sessionKey?: string | null;
thinking?: string | null;
deliver?: boolean;
to?: string | null;
channel?: string | null;
timeoutSeconds?: number | null;
key?: string | null;
};
let link: AgentDeepLink | null = null;
try {
link = JSON.parse(evt.payloadJSON) as AgentDeepLink;
} catch {
return;
}
const message = (link?.message ?? "").trim();
if (!message) return;
if (message.length > 20_000) return;
const channelRaw =
typeof link?.channel === "string" ? link.channel.trim() : "";
const channel = normalizeChannelId(channelRaw) ?? undefined;
const to =
typeof link?.to === "string" && link.to.trim()
? link.to.trim()
: undefined;
const deliver = Boolean(link?.deliver) && Boolean(channel);
const sessionKeyRaw = (link?.sessionKey ?? "").trim();
const sessionKey =
sessionKeyRaw.length > 0 ? sessionKeyRaw : `node-${nodeId}`;
const { storePath, store, entry, canonicalKey } =
loadSessionEntry(sessionKey);
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
store[canonicalKey] = {
sessionId,
updatedAt: now,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
reasoningLevel: entry?.reasoningLevel,
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
};
if (storePath) {
await saveSessionStore(storePath, store);
}
void agentCommand(
{
message,
sessionId,
sessionKey,
thinking: link?.thinking ?? undefined,
deliver,
to,
channel,
timeout:
typeof link?.timeoutSeconds === "number"
? link.timeoutSeconds.toString()
: undefined,
messageChannel: "node",
},
defaultRuntime,
ctx.deps,
).catch((err) => {
ctx.logBridge.warn(`agent failed node=${nodeId}: ${formatForLog(err)}`);
});
return;
}
case "chat.subscribe": {
if (!evt.payloadJSON) return;
let payload: unknown;
try {
payload = JSON.parse(evt.payloadJSON) as unknown;
} catch {
return;
}
const obj =
typeof payload === "object" && payload !== null
? (payload as Record<string, unknown>)
: {};
const sessionKey =
typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : "";
if (!sessionKey) return;
ctx.bridgeSubscribe(nodeId, sessionKey);
return;
}
case "chat.unsubscribe": {
if (!evt.payloadJSON) return;
let payload: unknown;
try {
payload = JSON.parse(evt.payloadJSON) as unknown;
} catch {
return;
}
const obj =
typeof payload === "object" && payload !== null
? (payload as Record<string, unknown>)
: {};
const sessionKey =
typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : "";
if (!sessionKey) return;
ctx.bridgeUnsubscribe(nodeId, sessionKey);
return;
}
default:
return;
}
};

View File

@@ -0,0 +1,391 @@
import { randomUUID } from "node:crypto";
import { resolveThinkingDefault } from "../agents/model-selection.js";
import { resolveAgentTimeoutMs } from "../agents/timeout.js";
import { agentCommand } from "../commands/agent.js";
import { mergeSessionEntry, saveSessionStore } from "../config/sessions.js";
import { registerAgentRunContext } from "../infra/agent-events.js";
import { defaultRuntime } from "../runtime.js";
import {
abortChatRunById,
abortChatRunsForSessionKey,
isChatStopCommandText,
resolveChatRunExpiresAtMs,
} from "./chat-abort.js";
import {
type ChatImageContent,
parseMessageWithAttachments,
} from "./chat-attachments.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateChatAbortParams,
validateChatHistoryParams,
validateChatSendParams,
} from "./protocol/index.js";
import type { BridgeMethodHandler } from "./server-bridge-types.js";
import { MAX_CHAT_HISTORY_MESSAGES_BYTES } from "./server-constants.js";
import {
capArrayByJsonBytes,
loadSessionEntry,
readSessionMessages,
resolveSessionModelRef,
} from "./session-utils.js";
export const handleChatBridgeMethods: BridgeMethodHandler = async (
ctx,
nodeId,
method,
params,
) => {
switch (method) {
case "chat.history": {
if (!validateChatHistoryParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid chat.history params: ${formatValidationErrors(validateChatHistoryParams.errors)}`,
},
};
}
const { sessionKey, limit } = params as {
sessionKey: string;
limit?: number;
};
const { cfg, storePath, entry } = loadSessionEntry(sessionKey);
const sessionId = entry?.sessionId;
const rawMessages =
sessionId && storePath
? readSessionMessages(sessionId, storePath, entry?.sessionFile)
: [];
const max = typeof limit === "number" ? limit : 200;
const sliced =
rawMessages.length > max ? rawMessages.slice(-max) : rawMessages;
const capped = capArrayByJsonBytes(
sliced,
MAX_CHAT_HISTORY_MESSAGES_BYTES,
).items;
let thinkingLevel = entry?.thinkingLevel;
if (!thinkingLevel) {
const configured = cfg.agents?.defaults?.thinkingDefault;
if (configured) {
thinkingLevel = configured;
} else {
const { provider, model } = resolveSessionModelRef(cfg, entry);
const catalog = await ctx.loadGatewayModelCatalog();
thinkingLevel = resolveThinkingDefault({
cfg,
provider,
model,
catalog,
});
}
}
return {
ok: true,
payloadJSON: JSON.stringify({
sessionKey,
sessionId,
messages: capped,
thinkingLevel,
}),
};
}
case "chat.abort": {
if (!validateChatAbortParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid chat.abort params: ${formatValidationErrors(validateChatAbortParams.errors)}`,
},
};
}
const { sessionKey, runId } = params as {
sessionKey: string;
runId?: string;
};
const ops = {
chatAbortControllers: ctx.chatAbortControllers,
chatRunBuffers: ctx.chatRunBuffers,
chatDeltaSentAt: ctx.chatDeltaSentAt,
chatAbortedRuns: ctx.chatAbortedRuns,
removeChatRun: ctx.removeChatRun,
agentRunSeq: ctx.agentRunSeq,
broadcast: ctx.broadcast,
bridgeSendToSession: ctx.bridgeSendToSession,
};
if (!runId) {
const res = abortChatRunsForSessionKey(ops, {
sessionKey,
stopReason: "rpc",
});
return {
ok: true,
payloadJSON: JSON.stringify({
ok: true,
aborted: res.aborted,
runIds: res.runIds,
}),
};
}
const active = ctx.chatAbortControllers.get(runId);
if (!active) {
return {
ok: true,
payloadJSON: JSON.stringify({
ok: true,
aborted: false,
runIds: [],
}),
};
}
if (active.sessionKey !== sessionKey) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: "runId does not match sessionKey",
},
};
}
const res = abortChatRunById(ops, {
runId,
sessionKey,
stopReason: "rpc",
});
return {
ok: true,
payloadJSON: JSON.stringify({
ok: true,
aborted: res.aborted,
runIds: res.aborted ? [runId] : [],
}),
};
}
case "chat.send": {
if (!validateChatSendParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid chat.send params: ${formatValidationErrors(validateChatSendParams.errors)}`,
},
};
}
const p = params as {
sessionKey: string;
message: string;
thinking?: string;
deliver?: boolean;
attachments?: Array<{
type?: string;
mimeType?: string;
fileName?: string;
content?: unknown;
}>;
timeoutMs?: number;
idempotencyKey: string;
};
const stopCommand = isChatStopCommandText(p.message);
const normalizedAttachments =
p.attachments
?.map((a) => ({
type: typeof a?.type === "string" ? a.type : undefined,
mimeType: typeof a?.mimeType === "string" ? a.mimeType : undefined,
fileName: typeof a?.fileName === "string" ? a.fileName : undefined,
content:
typeof a?.content === "string"
? a.content
: ArrayBuffer.isView(a?.content)
? Buffer.from(
a.content.buffer,
a.content.byteOffset,
a.content.byteLength,
).toString("base64")
: undefined,
}))
.filter((a) => a.content) ?? [];
let parsedMessage = p.message;
let parsedImages: ChatImageContent[] = [];
if (normalizedAttachments.length > 0) {
try {
const parsed = await parseMessageWithAttachments(
p.message,
normalizedAttachments,
{ maxBytes: 5_000_000, log: ctx.logBridge },
);
parsedMessage = parsed.message;
parsedImages = parsed.images;
} catch (err) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: String(err),
},
};
}
}
const { cfg, storePath, store, entry, canonicalKey } = loadSessionEntry(
p.sessionKey,
);
const timeoutMs = resolveAgentTimeoutMs({
cfg,
overrideMs: p.timeoutMs,
});
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
const sessionEntry = mergeSessionEntry(entry, {
sessionId,
updatedAt: now,
});
const clientRunId = p.idempotencyKey;
registerAgentRunContext(clientRunId, { sessionKey: p.sessionKey });
if (stopCommand) {
const res = abortChatRunsForSessionKey(
{
chatAbortControllers: ctx.chatAbortControllers,
chatRunBuffers: ctx.chatRunBuffers,
chatDeltaSentAt: ctx.chatDeltaSentAt,
chatAbortedRuns: ctx.chatAbortedRuns,
removeChatRun: ctx.removeChatRun,
agentRunSeq: ctx.agentRunSeq,
broadcast: ctx.broadcast,
bridgeSendToSession: ctx.bridgeSendToSession,
},
{ sessionKey: p.sessionKey, stopReason: "stop" },
);
return {
ok: true,
payloadJSON: JSON.stringify({
ok: true,
aborted: res.aborted,
runIds: res.runIds,
}),
};
}
const cached = ctx.dedupe.get(`chat:${clientRunId}`);
if (cached) {
if (cached.ok) {
return { ok: true, payloadJSON: JSON.stringify(cached.payload) };
}
return {
ok: false,
error: cached.error ?? {
code: ErrorCodes.UNAVAILABLE,
message: "request failed",
},
};
}
const activeExisting = ctx.chatAbortControllers.get(clientRunId);
if (activeExisting) {
return {
ok: true,
payloadJSON: JSON.stringify({
runId: clientRunId,
status: "in_flight",
}),
};
}
try {
const abortController = new AbortController();
ctx.chatAbortControllers.set(clientRunId, {
controller: abortController,
sessionId,
sessionKey: p.sessionKey,
startedAtMs: now,
expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }),
});
ctx.addChatRun(clientRunId, {
sessionKey: p.sessionKey,
clientRunId,
});
if (store) {
store[canonicalKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, store);
}
}
const ackPayload = {
runId: clientRunId,
status: "started" as const,
};
void agentCommand(
{
message: parsedMessage,
images: parsedImages.length > 0 ? parsedImages : undefined,
sessionId,
sessionKey: p.sessionKey,
runId: clientRunId,
thinking: p.thinking,
deliver: p.deliver,
timeout: Math.ceil(timeoutMs / 1000).toString(),
messageChannel: `node(${nodeId})`,
abortSignal: abortController.signal,
},
defaultRuntime,
ctx.deps,
)
.then(() => {
ctx.dedupe.set(`chat:${clientRunId}`, {
ts: Date.now(),
ok: true,
payload: { runId: clientRunId, status: "ok" as const },
});
})
.catch((err) => {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
ctx.dedupe.set(`chat:${clientRunId}`, {
ts: Date.now(),
ok: false,
payload: {
runId: clientRunId,
status: "error" as const,
summary: String(err),
},
error,
});
})
.finally(() => {
ctx.chatAbortControllers.delete(clientRunId);
});
return { ok: true, payloadJSON: JSON.stringify(ackPayload) };
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
const payload = {
runId: clientRunId,
status: "error" as const,
summary: String(err),
};
ctx.dedupe.set(`chat:${clientRunId}`, {
ts: Date.now(),
ok: false,
payload,
error,
});
return {
ok: false,
error: error ?? {
code: ErrorCodes.UNAVAILABLE,
message: String(err),
},
};
}
}
default:
return null;
}
};

View File

@@ -0,0 +1,133 @@
import {
resolveAgentWorkspaceDir,
resolveDefaultAgentId,
} from "../agents/agent-scope.js";
import {
CONFIG_PATH_CLAWDBOT,
loadConfig,
parseConfigJson5,
readConfigFileSnapshot,
validateConfigObject,
writeConfigFile,
} from "../config/config.js";
import { buildConfigSchema } from "../config/schema.js";
import { loadClawdbotPlugins } from "../plugins/loader.js";
import {
ErrorCodes,
formatValidationErrors,
validateConfigGetParams,
validateConfigSchemaParams,
validateConfigSetParams,
} from "./protocol/index.js";
import type { BridgeMethodHandler } from "./server-bridge-types.js";
export const handleConfigBridgeMethods: BridgeMethodHandler = async (
_ctx,
_nodeId,
method,
params,
) => {
switch (method) {
case "config.get": {
if (!validateConfigGetParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid config.get params: ${formatValidationErrors(validateConfigGetParams.errors)}`,
},
};
}
const snapshot = await readConfigFileSnapshot();
return { ok: true, payloadJSON: JSON.stringify(snapshot) };
}
case "config.schema": {
if (!validateConfigSchemaParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid config.schema params: ${formatValidationErrors(validateConfigSchemaParams.errors)}`,
},
};
}
const cfg = loadConfig();
const workspaceDir = resolveAgentWorkspaceDir(
cfg,
resolveDefaultAgentId(cfg),
);
const pluginRegistry = loadClawdbotPlugins({
config: cfg,
workspaceDir,
logger: {
info: () => {},
warn: () => {},
error: () => {},
debug: () => {},
},
});
const schema = buildConfigSchema({
plugins: pluginRegistry.plugins.map((plugin) => ({
id: plugin.id,
name: plugin.name,
description: plugin.description,
configUiHints: plugin.configUiHints,
})),
});
return { ok: true, payloadJSON: JSON.stringify(schema) };
}
case "config.set": {
if (!validateConfigSetParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid config.set params: ${formatValidationErrors(validateConfigSetParams.errors)}`,
},
};
}
const rawValue = (params as { raw?: unknown }).raw;
if (typeof rawValue !== "string") {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: "invalid config.set params: raw (string) required",
},
};
}
const parsedRes = parseConfigJson5(rawValue);
if (!parsedRes.ok) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: parsedRes.error,
},
};
}
const validated = validateConfigObject(parsedRes.parsed);
if (!validated.ok) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: "invalid config",
details: { issues: validated.issues },
},
};
}
await writeConfigFile(validated.config);
return {
ok: true,
payloadJSON: JSON.stringify({
ok: true,
path: CONFIG_PATH_CLAWDBOT,
config: validated.config,
}),
};
}
default:
return null;
}
};

View File

@@ -0,0 +1,401 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import {
abortEmbeddedPiRun,
isEmbeddedPiRunActive,
resolveEmbeddedSessionLane,
waitForEmbeddedPiRunEnd,
} from "../agents/pi-embedded.js";
import { loadConfig } from "../config/config.js";
import {
loadSessionStore,
resolveMainSessionKeyFromConfig,
type SessionEntry,
saveSessionStore,
} from "../config/sessions.js";
import { clearCommandLane } from "../process/command-queue.js";
import {
ErrorCodes,
formatValidationErrors,
type SessionsCompactParams,
type SessionsDeleteParams,
type SessionsListParams,
type SessionsPatchParams,
type SessionsResetParams,
type SessionsResolveParams,
validateSessionsCompactParams,
validateSessionsDeleteParams,
validateSessionsListParams,
validateSessionsPatchParams,
validateSessionsResetParams,
validateSessionsResolveParams,
} from "./protocol/index.js";
import type { BridgeMethodHandler } from "./server-bridge-types.js";
import {
archiveFileOnDisk,
listSessionsFromStore,
loadCombinedSessionStoreForGateway,
loadSessionEntry,
resolveGatewaySessionStoreTarget,
resolveSessionTranscriptCandidates,
type SessionsPatchResult,
} from "./session-utils.js";
import { applySessionsPatchToStore } from "./sessions-patch.js";
import { resolveSessionKeyFromResolveParams } from "./sessions-resolve.js";
export const handleSessionsBridgeMethods: BridgeMethodHandler = async (
ctx,
_nodeId,
method,
params,
) => {
switch (method) {
case "sessions.list": {
if (!validateSessionsListParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid sessions.list params: ${formatValidationErrors(validateSessionsListParams.errors)}`,
},
};
}
const p = params as SessionsListParams;
const cfg = loadConfig();
const { storePath, store } = loadCombinedSessionStoreForGateway(cfg);
const result = listSessionsFromStore({
cfg,
storePath,
store,
opts: p,
});
return { ok: true, payloadJSON: JSON.stringify(result) };
}
case "sessions.resolve": {
if (!validateSessionsResolveParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid sessions.resolve params: ${formatValidationErrors(validateSessionsResolveParams.errors)}`,
},
};
}
const p = params as SessionsResolveParams;
const cfg = loadConfig();
const resolved = resolveSessionKeyFromResolveParams({ cfg, p });
if (!resolved.ok) {
return {
ok: false,
error: {
code: resolved.error.code,
message: resolved.error.message,
details: resolved.error.details,
},
};
}
return {
ok: true,
payloadJSON: JSON.stringify({ ok: true, key: resolved.key }),
};
}
case "sessions.patch": {
if (!validateSessionsPatchParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid sessions.patch params: ${formatValidationErrors(validateSessionsPatchParams.errors)}`,
},
};
}
const p = params as SessionsPatchParams;
const key = String(p.key ?? "").trim();
if (!key) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: "key required",
},
};
}
const cfg = loadConfig();
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const storePath = target.storePath;
const store = loadSessionStore(storePath);
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find(
(candidate) => store[candidate],
);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const applied = await applySessionsPatchToStore({
cfg,
store,
storeKey: primaryKey,
patch: p,
loadGatewayModelCatalog: ctx.loadGatewayModelCatalog,
});
if (!applied.ok) {
return {
ok: false,
error: {
code: applied.error.code,
message: applied.error.message,
details: applied.error.details,
},
};
}
await saveSessionStore(storePath, store);
const payload: SessionsPatchResult = {
ok: true,
path: storePath,
key: target.canonicalKey,
entry: applied.entry,
};
return { ok: true, payloadJSON: JSON.stringify(payload) };
}
case "sessions.reset": {
if (!validateSessionsResetParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid sessions.reset params: ${formatValidationErrors(validateSessionsResetParams.errors)}`,
},
};
}
const p = params as SessionsResetParams;
const key = String(p.key ?? "").trim();
if (!key) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: "key required",
},
};
}
const { storePath, store, entry } = loadSessionEntry(key);
const now = Date.now();
const next: SessionEntry = {
sessionId: randomUUID(),
updatedAt: now,
systemSent: false,
abortedLastRun: false,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
reasoningLevel: entry?.reasoningLevel,
model: entry?.model,
contextTokens: entry?.contextTokens,
sendPolicy: entry?.sendPolicy,
label: entry?.label,
displayName: entry?.displayName,
chatType: entry?.chatType,
channel: entry?.channel,
subject: entry?.subject,
room: entry?.room,
space: entry?.space,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
skillsSnapshot: entry?.skillsSnapshot,
};
store[key] = next;
await saveSessionStore(storePath, store);
return {
ok: true,
payloadJSON: JSON.stringify({ ok: true, key, entry: next }),
};
}
case "sessions.delete": {
if (!validateSessionsDeleteParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid sessions.delete params: ${formatValidationErrors(validateSessionsDeleteParams.errors)}`,
},
};
}
const p = params as SessionsDeleteParams;
const key = String(p.key ?? "").trim();
if (!key) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: "key required",
},
};
}
const mainKey = resolveMainSessionKeyFromConfig();
if (key === mainKey) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `Cannot delete the main session (${mainKey}).`,
},
};
}
const deleteTranscript =
typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true;
const { storePath, store, entry } = loadSessionEntry(key);
const sessionId = entry?.sessionId;
const existed = Boolean(store[key]);
clearCommandLane(resolveEmbeddedSessionLane(key));
if (sessionId && isEmbeddedPiRunActive(sessionId)) {
abortEmbeddedPiRun(sessionId);
const ended = await waitForEmbeddedPiRunEnd(sessionId, 15_000);
if (!ended) {
return {
ok: false,
error: {
code: ErrorCodes.UNAVAILABLE,
message: `Session ${key} is still active; try again in a moment.`,
},
};
}
}
if (existed) delete store[key];
await saveSessionStore(storePath, store);
const archived: string[] = [];
if (deleteTranscript && sessionId) {
for (const candidate of resolveSessionTranscriptCandidates(
sessionId,
storePath,
entry?.sessionFile,
)) {
if (!fs.existsSync(candidate)) continue;
try {
archived.push(archiveFileOnDisk(candidate, "deleted"));
} catch {
// Best-effort; deleting the store entry is the main operation.
}
}
}
return {
ok: true,
payloadJSON: JSON.stringify({
ok: true,
key,
deleted: existed,
archived,
}),
};
}
case "sessions.compact": {
if (!validateSessionsCompactParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid sessions.compact params: ${formatValidationErrors(validateSessionsCompactParams.errors)}`,
},
};
}
const p = params as SessionsCompactParams;
const key = String(p.key ?? "").trim();
if (!key) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: "key required",
},
};
}
const maxLines =
typeof p.maxLines === "number" && Number.isFinite(p.maxLines)
? Math.max(1, Math.floor(p.maxLines))
: 400;
const { storePath, store, entry } = loadSessionEntry(key);
const sessionId = entry?.sessionId;
if (!sessionId) {
return {
ok: true,
payloadJSON: JSON.stringify({
ok: true,
key,
compacted: false,
reason: "no sessionId",
}),
};
}
const filePath = resolveSessionTranscriptCandidates(
sessionId,
storePath,
entry?.sessionFile,
).find((candidate) => fs.existsSync(candidate));
if (!filePath) {
return {
ok: true,
payloadJSON: JSON.stringify({
ok: true,
key,
compacted: false,
reason: "no transcript",
}),
};
}
const raw = fs.readFileSync(filePath, "utf-8");
const lines = raw.split(/\r?\n/).filter((l) => l.trim().length > 0);
if (lines.length <= maxLines) {
return {
ok: true,
payloadJSON: JSON.stringify({
ok: true,
key,
compacted: false,
kept: lines.length,
}),
};
}
const archived = archiveFileOnDisk(filePath, "bak");
const keptLines = lines.slice(-maxLines);
fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8");
// Token counts no longer match; clear so status + UI reflect reality after the next turn.
if (store[key]) {
delete store[key].inputTokens;
delete store[key].outputTokens;
delete store[key].totalTokens;
store[key].updatedAt = Date.now();
await saveSessionStore(storePath, store);
}
return {
ok: true,
payloadJSON: JSON.stringify({
ok: true,
key,
compacted: true,
archived,
kept: keptLines.length,
}),
};
}
default:
return null;
}
};

View File

@@ -0,0 +1,81 @@
import {
loadVoiceWakeConfig,
setVoiceWakeTriggers,
} from "../infra/voicewake.js";
import {
ErrorCodes,
formatValidationErrors,
validateModelsListParams,
validateTalkModeParams,
} from "./protocol/index.js";
import type { BridgeMethodHandler } from "./server-bridge-types.js";
import { HEALTH_REFRESH_INTERVAL_MS } from "./server-constants.js";
import { normalizeVoiceWakeTriggers } from "./server-utils.js";
export const handleSystemBridgeMethods: BridgeMethodHandler = async (
ctx,
_nodeId,
method,
params,
) => {
switch (method) {
case "voicewake.get": {
const cfg = await loadVoiceWakeConfig();
return {
ok: true,
payloadJSON: JSON.stringify({ triggers: cfg.triggers }),
};
}
case "voicewake.set": {
const triggers = normalizeVoiceWakeTriggers(params.triggers);
const cfg = await setVoiceWakeTriggers(triggers);
ctx.broadcastVoiceWakeChanged(cfg.triggers);
return {
ok: true,
payloadJSON: JSON.stringify({ triggers: cfg.triggers }),
};
}
case "health": {
const now = Date.now();
const cached = ctx.getHealthCache();
if (cached && now - cached.ts < HEALTH_REFRESH_INTERVAL_MS) {
return { ok: true, payloadJSON: JSON.stringify(cached) };
}
const snap = await ctx.refreshHealthSnapshot({ probe: false });
return { ok: true, payloadJSON: JSON.stringify(snap) };
}
case "talk.mode": {
if (!validateTalkModeParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid talk.mode params: ${formatValidationErrors(validateTalkModeParams.errors)}`,
},
};
}
const payload = {
enabled: (params as { enabled: boolean }).enabled,
phase: (params as { phase?: string }).phase ?? null,
ts: Date.now(),
};
ctx.broadcast("talk.mode", payload, { dropIfSlow: true });
return { ok: true, payloadJSON: JSON.stringify(payload) };
}
case "models.list": {
if (!validateModelsListParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid models.list params: ${formatValidationErrors(validateModelsListParams.errors)}`,
},
};
}
const models = await ctx.loadGatewayModelCatalog();
return { ok: true, payloadJSON: JSON.stringify({ models }) };
}
default:
return null;
}
};

View File

@@ -0,0 +1,281 @@
import type { ModelCatalogEntry } from "../agents/model-catalog.js";
import type {
CanvasHostHandler,
CanvasHostServer,
} from "../canvas-host/server.js";
import { startCanvasHost } from "../canvas-host/server.js";
import type { CliDeps } from "../cli/deps.js";
import type { HealthSummary } from "../commands/health.js";
import {
deriveDefaultBridgePort,
deriveDefaultCanvasHostPort,
} from "../config/port-defaults.js";
import type { NodeBridgeServer } from "../infra/bridge/server.js";
import {
pickPrimaryTailnetIPv4,
pickPrimaryTailnetIPv6,
} from "../infra/tailnet.js";
import type { RuntimeEnv } from "../runtime.js";
import type { ChatAbortControllerEntry } from "./chat-abort.js";
import { createBridgeHandlers } from "./server-bridge.js";
import {
type BridgeListConnectedFn,
type BridgeSendEventFn,
createBridgeSubscriptionManager,
} from "./server-bridge-subscriptions.js";
import type { ChatRunEntry } from "./server-chat.js";
import { startGatewayDiscovery } from "./server-discovery-runtime.js";
import { loadGatewayModelCatalog } from "./server-model-catalog.js";
import { startGatewayNodeBridge } from "./server-node-bridge.js";
import type { DedupeEntry } from "./server-shared.js";
export type GatewayBridgeRuntime = {
bridge: import("../infra/bridge/server.js").NodeBridgeServer | null;
bridgeHost: string | null;
bridgePort: number;
canvasHostServer: CanvasHostServer | null;
nodePresenceTimers: Map<string, ReturnType<typeof setInterval>>;
bonjourStop: (() => Promise<void>) | null;
bridgeSendToSession: (
sessionKey: string,
event: string,
payload: unknown,
) => void;
bridgeSendToAllSubscribed: (event: string, payload: unknown) => void;
broadcastVoiceWakeChanged: (triggers: string[]) => void;
};
export async function startGatewayBridgeRuntime(params: {
cfg: {
bridge?: {
enabled?: boolean;
port?: number;
bind?: "loopback" | "lan" | "auto" | "custom";
};
canvasHost?: { port?: number; root?: string; liveReload?: boolean };
discovery?: { wideArea?: { enabled?: boolean } };
};
port: number;
canvasHostEnabled: boolean;
canvasHost: CanvasHostHandler | null;
canvasRuntime: RuntimeEnv;
allowCanvasHostInTests?: boolean;
machineDisplayName: string;
deps: CliDeps;
broadcast: (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => void;
dedupe: Map<string, DedupeEntry>;
agentRunSeq: Map<string, number>;
chatRunState: { abortedRuns: Map<string, number> };
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
addChatRun: (sessionId: string, entry: ChatRunEntry) => void;
removeChatRun: (
sessionId: string,
clientRunId: string,
sessionKey?: string,
) => ChatRunEntry | undefined;
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
getHealthCache: () => HealthSummary | null;
refreshGatewayHealthSnapshot: (opts?: {
probe?: boolean;
}) => Promise<HealthSummary>;
loadGatewayModelCatalog?: () => Promise<ModelCatalogEntry[]>;
logBridge: { info: (msg: string) => void; warn: (msg: string) => void };
logCanvas: { warn: (msg: string) => void };
logDiscovery: { info: (msg: string) => void; warn: (msg: string) => void };
}): Promise<GatewayBridgeRuntime> {
const wideAreaDiscoveryEnabled =
params.cfg.discovery?.wideArea?.enabled === true;
const bridgeEnabled = (() => {
if (params.cfg.bridge?.enabled !== undefined)
return params.cfg.bridge.enabled === true;
return process.env.CLAWDBOT_BRIDGE_ENABLED !== "0";
})();
const bridgePort = (() => {
if (
typeof params.cfg.bridge?.port === "number" &&
params.cfg.bridge.port > 0
) {
return params.cfg.bridge.port;
}
if (process.env.CLAWDBOT_BRIDGE_PORT !== undefined) {
const parsed = Number.parseInt(process.env.CLAWDBOT_BRIDGE_PORT, 10);
return Number.isFinite(parsed) && parsed > 0
? parsed
: deriveDefaultBridgePort(params.port);
}
return deriveDefaultBridgePort(params.port);
})();
const bridgeHost = (() => {
// Back-compat: allow an env var override when no bind policy is configured.
if (params.cfg.bridge?.bind === undefined) {
const env = process.env.CLAWDBOT_BRIDGE_HOST?.trim();
if (env) return env;
}
const bind =
params.cfg.bridge?.bind ?? (wideAreaDiscoveryEnabled ? "auto" : "lan");
if (bind === "loopback") return "127.0.0.1";
if (bind === "lan") return "0.0.0.0";
const tailnetIPv4 = pickPrimaryTailnetIPv4();
const tailnetIPv6 = pickPrimaryTailnetIPv6();
if (bind === "auto") {
return tailnetIPv4 ?? tailnetIPv6 ?? "0.0.0.0";
}
if (bind === "custom") {
// For bridge, customBindHost is not currently supported on GatewayConfig.
// This will fall back to "0.0.0.0" until we add customBindHost to BridgeConfig.
return "0.0.0.0";
}
return "0.0.0.0";
})();
const canvasHostPort = (() => {
if (process.env.CLAWDBOT_CANVAS_HOST_PORT !== undefined) {
const parsed = Number.parseInt(process.env.CLAWDBOT_CANVAS_HOST_PORT, 10);
if (Number.isFinite(parsed) && parsed > 0) return parsed;
return deriveDefaultCanvasHostPort(params.port);
}
const configured = params.cfg.canvasHost?.port;
if (typeof configured === "number" && configured > 0) return configured;
return deriveDefaultCanvasHostPort(params.port);
})();
let canvasHostServer: CanvasHostServer | null = null;
if (params.canvasHostEnabled && bridgeEnabled && bridgeHost) {
try {
const started = await startCanvasHost({
runtime: params.canvasRuntime,
rootDir: params.cfg.canvasHost?.root,
port: canvasHostPort,
listenHost: bridgeHost,
allowInTests: params.allowCanvasHostInTests,
liveReload: params.cfg.canvasHost?.liveReload,
handler: params.canvasHost ?? undefined,
ownsHandler: params.canvasHost ? false : undefined,
});
if (started.port > 0) {
canvasHostServer = started;
}
} catch (err) {
params.logCanvas.warn(
`failed to start on ${bridgeHost}:${canvasHostPort}: ${String(err)}`,
);
}
}
let bridge: NodeBridgeServer | null = null;
const bridgeSubscriptions = createBridgeSubscriptionManager();
const bridgeSubscribe = bridgeSubscriptions.subscribe;
const bridgeUnsubscribe = bridgeSubscriptions.unsubscribe;
const bridgeUnsubscribeAll = bridgeSubscriptions.unsubscribeAll;
const bridgeSendEvent: BridgeSendEventFn = (opts) => {
bridge?.sendEvent(opts);
};
const bridgeListConnected: BridgeListConnectedFn = () =>
bridge?.listConnected() ?? [];
const bridgeSendToSession = (
sessionKey: string,
event: string,
payload: unknown,
) =>
bridgeSubscriptions.sendToSession(
sessionKey,
event,
payload,
bridgeSendEvent,
);
const bridgeSendToAllSubscribed = (event: string, payload: unknown) =>
bridgeSubscriptions.sendToAllSubscribed(event, payload, bridgeSendEvent);
const bridgeSendToAllConnected = (event: string, payload: unknown) =>
bridgeSubscriptions.sendToAllConnected(
event,
payload,
bridgeListConnected,
bridgeSendEvent,
);
const broadcastVoiceWakeChanged = (triggers: string[]) => {
const payload = { triggers };
params.broadcast("voicewake.changed", payload, { dropIfSlow: true });
bridgeSendToAllConnected("voicewake.changed", payload);
};
const { handleBridgeRequest, handleBridgeEvent } = createBridgeHandlers({
deps: params.deps,
broadcast: params.broadcast,
bridgeSendToSession,
bridgeSubscribe,
bridgeUnsubscribe,
broadcastVoiceWakeChanged,
addChatRun: params.addChatRun,
removeChatRun: params.removeChatRun,
chatAbortControllers: params.chatAbortControllers,
chatAbortedRuns: params.chatRunState.abortedRuns,
chatRunBuffers: params.chatRunBuffers,
chatDeltaSentAt: params.chatDeltaSentAt,
dedupe: params.dedupe,
agentRunSeq: params.agentRunSeq,
getHealthCache: params.getHealthCache,
refreshHealthSnapshot: params.refreshGatewayHealthSnapshot,
loadGatewayModelCatalog:
params.loadGatewayModelCatalog ?? loadGatewayModelCatalog,
logBridge: params.logBridge,
});
const canvasHostPortForBridge = canvasHostServer?.port;
const canvasHostHostForBridge =
canvasHostServer &&
bridgeHost &&
bridgeHost !== "0.0.0.0" &&
bridgeHost !== "::"
? bridgeHost
: undefined;
const bridgeRuntime = await startGatewayNodeBridge({
bridgeEnabled,
bridgePort,
bridgeHost,
machineDisplayName: params.machineDisplayName,
canvasHostPort: canvasHostPortForBridge,
canvasHostHost: canvasHostHostForBridge,
broadcast: params.broadcast,
bridgeUnsubscribeAll,
handleBridgeRequest,
handleBridgeEvent,
logBridge: params.logBridge,
});
bridge = bridgeRuntime.bridge;
const discovery = await startGatewayDiscovery({
machineDisplayName: params.machineDisplayName,
port: params.port,
bridgePort: bridge?.port,
canvasPort: canvasHostPortForBridge,
wideAreaDiscoveryEnabled,
logDiscovery: params.logDiscovery,
});
return {
bridge,
bridgeHost,
bridgePort,
canvasHostServer,
nodePresenceTimers: bridgeRuntime.nodePresenceTimers,
bonjourStop: discovery.bonjourStop,
bridgeSendToSession,
bridgeSendToAllSubscribed,
broadcastVoiceWakeChanged,
};
}

View File

@@ -0,0 +1,66 @@
import type { ModelCatalogEntry } from "../agents/model-catalog.js";
import type { CliDeps } from "../cli/deps.js";
import type { HealthSummary } from "../commands/health.js";
import type { ChatAbortControllerEntry } from "./chat-abort.js";
import type { ChatRunEntry } from "./server-chat.js";
import type { DedupeEntry } from "./server-shared.js";
export type BridgeHandlersContext = {
deps: CliDeps;
broadcast: (
event: string,
payload: unknown,
opts?: { dropIfSlow?: boolean },
) => void;
bridgeSendToSession: (
sessionKey: string,
event: string,
payload: unknown,
) => void;
bridgeSubscribe: (nodeId: string, sessionKey: string) => void;
bridgeUnsubscribe: (nodeId: string, sessionKey: string) => void;
broadcastVoiceWakeChanged: (triggers: string[]) => void;
addChatRun: (sessionId: string, entry: ChatRunEntry) => void;
removeChatRun: (
sessionId: string,
clientRunId: string,
sessionKey?: string,
) => ChatRunEntry | undefined;
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
chatAbortedRuns: Map<string, number>;
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
dedupe: Map<string, DedupeEntry>;
agentRunSeq: Map<string, number>;
getHealthCache: () => HealthSummary | null;
refreshHealthSnapshot: (opts?: { probe?: boolean }) => Promise<HealthSummary>;
loadGatewayModelCatalog: () => Promise<ModelCatalogEntry[]>;
logBridge: { warn: (msg: string) => void };
};
export type BridgeRequest = {
id: string;
method: string;
paramsJSON?: string | null;
};
export type BridgeEvent = {
event: string;
payloadJSON?: string | null;
};
export type BridgeResponse =
| { ok: true; payloadJSON?: string | null }
| {
ok: false;
error: { code: string; message: string; details?: unknown };
};
export type BridgeRequestParams = Record<string, unknown>;
export type BridgeMethodHandler = (
ctx: BridgeHandlersContext,
nodeId: string,
method: string,
params: BridgeRequestParams,
) => Promise<BridgeResponse | null>;

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,56 @@
import type { GatewayWsClient } from "./server/ws-types.js";
import { MAX_BUFFERED_BYTES } from "./server-constants.js";
import { logWs, summarizeAgentEventForWsLog } from "./ws-log.js";
export function createGatewayBroadcaster(params: {
clients: Set<GatewayWsClient>;
}) {
let seq = 0;
const broadcast = (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => {
const eventSeq = ++seq;
const frame = JSON.stringify({
type: "event",
event,
payload,
seq: eventSeq,
stateVersion: opts?.stateVersion,
});
const logMeta: Record<string, unknown> = {
event,
seq: eventSeq,
clients: params.clients.size,
dropIfSlow: opts?.dropIfSlow,
presenceVersion: opts?.stateVersion?.presence,
healthVersion: opts?.stateVersion?.health,
};
if (event === "agent") {
Object.assign(logMeta, summarizeAgentEventForWsLog(payload));
}
logWs("out", "event", logMeta);
for (const c of params.clients) {
const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES;
if (slow && opts?.dropIfSlow) continue;
if (slow) {
try {
c.socket.close(1008, "slow consumer");
} catch {
/* ignore */
}
continue;
}
try {
c.socket.send(frame);
} catch {
/* ignore */
}
}
};
return { broadcast };
}

138
src/gateway/server-close.ts Normal file
View File

@@ -0,0 +1,138 @@
import type { Server as HttpServer } from "node:http";
import type { WebSocketServer } from "ws";
import type {
CanvasHostHandler,
CanvasHostServer,
} from "../canvas-host/server.js";
import {
type ChannelId,
listChannelPlugins,
} from "../channels/plugins/index.js";
import { stopGmailWatcher } from "../hooks/gmail-watcher.js";
import type { NodeBridgeServer } from "../infra/bridge/server.js";
import type { PluginServicesHandle } from "../plugins/services.js";
export function createGatewayCloseHandler(params: {
bonjourStop: (() => Promise<void>) | null;
tailscaleCleanup: (() => Promise<void>) | null;
canvasHost: CanvasHostHandler | null;
canvasHostServer: CanvasHostServer | null;
bridge: NodeBridgeServer | null;
stopChannel: (name: ChannelId, accountId?: string) => Promise<void>;
pluginServices: PluginServicesHandle | null;
cron: { stop: () => void };
heartbeatRunner: { stop: () => void };
nodePresenceTimers: Map<string, ReturnType<typeof setInterval>>;
broadcast: (
event: string,
payload: unknown,
opts?: { dropIfSlow?: boolean },
) => void;
tickInterval: ReturnType<typeof setInterval>;
healthInterval: ReturnType<typeof setInterval>;
dedupeCleanup: ReturnType<typeof setInterval>;
agentUnsub: (() => void) | null;
heartbeatUnsub: (() => void) | null;
chatRunState: { clear: () => void };
clients: Set<{ socket: { close: (code: number, reason: string) => void } }>;
configReloader: { stop: () => Promise<void> };
browserControl: { stop: () => Promise<void> } | null;
wss: WebSocketServer;
httpServer: HttpServer;
}) {
return async (opts?: {
reason?: string;
restartExpectedMs?: number | null;
}) => {
const reasonRaw =
typeof opts?.reason === "string" ? opts.reason.trim() : "";
const reason = reasonRaw || "gateway stopping";
const restartExpectedMs =
typeof opts?.restartExpectedMs === "number" &&
Number.isFinite(opts.restartExpectedMs)
? Math.max(0, Math.floor(opts.restartExpectedMs))
: null;
if (params.bonjourStop) {
try {
await params.bonjourStop();
} catch {
/* ignore */
}
}
if (params.tailscaleCleanup) {
await params.tailscaleCleanup();
}
if (params.canvasHost) {
try {
await params.canvasHost.close();
} catch {
/* ignore */
}
}
if (params.canvasHostServer) {
try {
await params.canvasHostServer.close();
} catch {
/* ignore */
}
}
if (params.bridge) {
try {
await params.bridge.close();
} catch {
/* ignore */
}
}
for (const plugin of listChannelPlugins()) {
await params.stopChannel(plugin.id);
}
if (params.pluginServices) {
await params.pluginServices.stop().catch(() => {});
}
await stopGmailWatcher();
params.cron.stop();
params.heartbeatRunner.stop();
for (const timer of params.nodePresenceTimers.values()) {
clearInterval(timer);
}
params.nodePresenceTimers.clear();
params.broadcast("shutdown", {
reason,
restartExpectedMs,
});
clearInterval(params.tickInterval);
clearInterval(params.healthInterval);
clearInterval(params.dedupeCleanup);
if (params.agentUnsub) {
try {
params.agentUnsub();
} catch {
/* ignore */
}
}
if (params.heartbeatUnsub) {
try {
params.heartbeatUnsub();
} catch {
/* ignore */
}
}
params.chatRunState.clear();
for (const c of params.clients) {
try {
c.socket.close(1012, "service restart");
} catch {
/* ignore */
}
}
params.clients.clear();
await params.configReloader.stop().catch(() => {});
if (params.browserControl) {
await params.browserControl.stop().catch(() => {});
}
await new Promise<void>((resolve) => params.wss.close(() => resolve()));
await new Promise<void>((resolve, reject) =>
params.httpServer.close((err) => (err ? reject(err) : resolve())),
);
};
}

119
src/gateway/server-cron.ts Normal file
View File

@@ -0,0 +1,119 @@
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
import type { CliDeps } from "../cli/deps.js";
import { loadConfig } from "../config/config.js";
import { resolveAgentMainSessionKey } from "../config/sessions.js";
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js";
import { CronService } from "../cron/service.js";
import { resolveCronStorePath } from "../cron/store.js";
import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js";
import { normalizeAgentId } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
export type GatewayCronState = {
cron: CronService;
storePath: string;
cronEnabled: boolean;
};
export function buildGatewayCronService(params: {
cfg: ReturnType<typeof loadConfig>;
deps: CliDeps;
broadcast: (
event: string,
payload: unknown,
opts?: { dropIfSlow?: boolean },
) => void;
}): GatewayCronState {
const cronLogger = getChildLogger({ module: "cron" });
const storePath = resolveCronStorePath(params.cfg.cron?.store);
const cronEnabled =
process.env.CLAWDBOT_SKIP_CRON !== "1" &&
params.cfg.cron?.enabled !== false;
const resolveCronAgent = (requested?: string | null) => {
const runtimeConfig = loadConfig();
const normalized =
typeof requested === "string" && requested.trim()
? normalizeAgentId(requested)
: undefined;
const hasAgent =
normalized !== undefined &&
Array.isArray(runtimeConfig.agents?.list) &&
runtimeConfig.agents.list.some(
(entry) =>
entry &&
typeof entry.id === "string" &&
normalizeAgentId(entry.id) === normalized,
);
const agentId = hasAgent
? normalized
: resolveDefaultAgentId(runtimeConfig);
return { agentId, cfg: runtimeConfig };
};
const cron = new CronService({
storePath,
cronEnabled,
enqueueSystemEvent: (text, opts) => {
const { agentId, cfg: runtimeConfig } = resolveCronAgent(opts?.agentId);
const sessionKey = resolveAgentMainSessionKey({
cfg: runtimeConfig,
agentId,
});
enqueueSystemEvent(text, { sessionKey });
},
requestHeartbeatNow,
runHeartbeatOnce: async (opts) => {
const runtimeConfig = loadConfig();
return await runHeartbeatOnce({
cfg: runtimeConfig,
reason: opts?.reason,
deps: { ...params.deps, runtime: defaultRuntime },
});
},
runIsolatedAgentJob: async ({ job, message }) => {
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
return await runCronIsolatedAgentTurn({
cfg: runtimeConfig,
deps: params.deps,
job,
message,
agentId,
sessionKey: `cron:${job.id}`,
lane: "cron",
});
},
log: getChildLogger({ module: "cron", storePath }),
onEvent: (evt) => {
params.broadcast("cron", evt, { dropIfSlow: true });
if (evt.action === "finished") {
const logPath = resolveCronRunLogPath({
storePath,
jobId: evt.jobId,
});
void appendCronRunLog(logPath, {
ts: Date.now(),
jobId: evt.jobId,
action: "finished",
status: evt.status,
error: evt.error,
summary: evt.summary,
runAtMs: evt.runAtMs,
durationMs: evt.durationMs,
nextRunAtMs: evt.nextRunAtMs,
}).catch((err) => {
cronLogger.warn(
{ err: String(err), logPath },
"cron: run log append failed",
);
});
}
},
});
return { cron, storePath, cronEnabled };
}

View File

@@ -0,0 +1,79 @@
import { startGatewayBonjourAdvertiser } from "../infra/bonjour.js";
import {
pickPrimaryTailnetIPv4,
pickPrimaryTailnetIPv6,
} from "../infra/tailnet.js";
import {
WIDE_AREA_DISCOVERY_DOMAIN,
writeWideAreaBridgeZone,
} from "../infra/widearea-dns.js";
import {
formatBonjourInstanceName,
resolveBonjourCliPath,
resolveTailnetDnsHint,
} from "./server-discovery.js";
export async function startGatewayDiscovery(params: {
machineDisplayName: string;
port: number;
bridgePort?: number;
canvasPort?: number;
wideAreaDiscoveryEnabled: boolean;
logDiscovery: { info: (msg: string) => void; warn: (msg: string) => void };
}) {
let bonjourStop: (() => Promise<void>) | null = null;
const tailnetDns = await resolveTailnetDnsHint();
const sshPortEnv = process.env.CLAWDBOT_SSH_PORT?.trim();
const sshPortParsed = sshPortEnv ? Number.parseInt(sshPortEnv, 10) : NaN;
const sshPort =
Number.isFinite(sshPortParsed) && sshPortParsed > 0
? sshPortParsed
: undefined;
try {
const bonjour = await startGatewayBonjourAdvertiser({
instanceName: formatBonjourInstanceName(params.machineDisplayName),
gatewayPort: params.port,
bridgePort: params.bridgePort,
canvasPort: params.canvasPort,
sshPort,
tailnetDns,
cliPath: resolveBonjourCliPath(),
});
bonjourStop = bonjour.stop;
} catch (err) {
params.logDiscovery.warn(`bonjour advertising failed: ${String(err)}`);
}
if (params.wideAreaDiscoveryEnabled && params.bridgePort) {
const tailnetIPv4 = pickPrimaryTailnetIPv4();
if (!tailnetIPv4) {
params.logDiscovery.warn(
"discovery.wideArea.enabled is true, but no Tailscale IPv4 address was found; skipping unicast DNS-SD zone update",
);
} else {
try {
const tailnetIPv6 = pickPrimaryTailnetIPv6();
const result = await writeWideAreaBridgeZone({
bridgePort: params.bridgePort,
gatewayPort: params.port,
displayName: formatBonjourInstanceName(params.machineDisplayName),
tailnetIPv4,
tailnetIPv6: tailnetIPv6 ?? undefined,
tailnetDns,
sshPort,
cliPath: resolveBonjourCliPath(),
});
params.logDiscovery.info(
`wide-area DNS-SD ${result.changed ? "updated" : "unchanged"} (${WIDE_AREA_DISCOVERY_DOMAIN}${result.zonePath})`,
);
} catch (err) {
params.logDiscovery.warn(
`wide-area discovery update failed: ${String(err)}`,
);
}
}
}
return { bonjourStop };
}

View File

@@ -0,0 +1,13 @@
import type { loadConfig } from "../config/config.js";
import { setCommandLaneConcurrency } from "../process/command-queue.js";
export function applyGatewayLaneConcurrency(
cfg: ReturnType<typeof loadConfig>,
) {
setCommandLaneConcurrency("cron", cfg.cron?.maxConcurrentRuns ?? 1);
setCommandLaneConcurrency("main", cfg.agents?.defaults?.maxConcurrent ?? 1);
setCommandLaneConcurrency(
"subagent",
cfg.agents?.defaults?.subagents?.maxConcurrent ?? 1,
);
}

View File

@@ -0,0 +1,129 @@
import type { HealthSummary } from "../commands/health.js";
import {
abortChatRunById,
type ChatAbortControllerEntry,
} from "./chat-abort.js";
import { setBroadcastHealthUpdate } from "./server/health-state.js";
import type { ChatRunEntry } from "./server-chat.js";
import {
DEDUPE_MAX,
DEDUPE_TTL_MS,
HEALTH_REFRESH_INTERVAL_MS,
TICK_INTERVAL_MS,
} from "./server-constants.js";
import type { DedupeEntry } from "./server-shared.js";
import { formatError } from "./server-utils.js";
export function startGatewayMaintenanceTimers(params: {
broadcast: (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => void;
bridgeSendToAllSubscribed: (event: string, payload: unknown) => void;
getPresenceVersion: () => number;
getHealthVersion: () => number;
refreshGatewayHealthSnapshot: (opts?: {
probe?: boolean;
}) => Promise<HealthSummary>;
logHealth: { error: (msg: string) => void };
dedupe: Map<string, DedupeEntry>;
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
chatRunState: { abortedRuns: Map<string, number> };
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
removeChatRun: (
sessionId: string,
clientRunId: string,
sessionKey?: string,
) => ChatRunEntry | undefined;
agentRunSeq: Map<string, number>;
bridgeSendToSession: (
sessionKey: string,
event: string,
payload: unknown,
) => void;
}): {
tickInterval: ReturnType<typeof setInterval>;
healthInterval: ReturnType<typeof setInterval>;
dedupeCleanup: ReturnType<typeof setInterval>;
} {
setBroadcastHealthUpdate((snap: HealthSummary) => {
params.broadcast("health", snap, {
stateVersion: {
presence: params.getPresenceVersion(),
health: params.getHealthVersion(),
},
});
params.bridgeSendToAllSubscribed("health", snap);
});
// periodic keepalive
const tickInterval = setInterval(() => {
const payload = { ts: Date.now() };
params.broadcast("tick", payload, { dropIfSlow: true });
params.bridgeSendToAllSubscribed("tick", payload);
}, TICK_INTERVAL_MS);
// periodic health refresh to keep cached snapshot warm
const healthInterval = setInterval(() => {
void params
.refreshGatewayHealthSnapshot({ probe: true })
.catch((err) =>
params.logHealth.error(`refresh failed: ${formatError(err)}`),
);
}, HEALTH_REFRESH_INTERVAL_MS);
// Prime cache so first client gets a snapshot without waiting.
void params
.refreshGatewayHealthSnapshot({ probe: true })
.catch((err) =>
params.logHealth.error(`initial refresh failed: ${formatError(err)}`),
);
// dedupe cache cleanup
const dedupeCleanup = setInterval(() => {
const now = Date.now();
for (const [k, v] of params.dedupe) {
if (now - v.ts > DEDUPE_TTL_MS) params.dedupe.delete(k);
}
if (params.dedupe.size > DEDUPE_MAX) {
const entries = [...params.dedupe.entries()].sort(
(a, b) => a[1].ts - b[1].ts,
);
for (let i = 0; i < params.dedupe.size - DEDUPE_MAX; i++) {
params.dedupe.delete(entries[i][0]);
}
}
for (const [runId, entry] of params.chatAbortControllers) {
if (now <= entry.expiresAtMs) continue;
abortChatRunById(
{
chatAbortControllers: params.chatAbortControllers,
chatRunBuffers: params.chatRunBuffers,
chatDeltaSentAt: params.chatDeltaSentAt,
chatAbortedRuns: params.chatRunState.abortedRuns,
removeChatRun: params.removeChatRun,
agentRunSeq: params.agentRunSeq,
broadcast: params.broadcast,
bridgeSendToSession: params.bridgeSendToSession,
},
{ runId, sessionKey: entry.sessionKey, stopReason: "timeout" },
);
}
const ABORTED_RUN_TTL_MS = 60 * 60_000;
for (const [runId, abortedAt] of params.chatRunState.abortedRuns) {
if (now - abortedAt <= ABORTED_RUN_TTL_MS) continue;
params.chatRunState.abortedRuns.delete(runId);
params.chatRunBuffers.delete(runId);
params.chatDeltaSentAt.delete(runId);
}
}, 60_000);
return { tickInterval, healthInterval, dedupeCleanup };
}

View File

@@ -0,0 +1,83 @@
import { listChannelPlugins } from "../channels/plugins/index.js";
const BASE_METHODS = [
"health",
"logs.tail",
"channels.status",
"channels.logout",
"status",
"usage.status",
"config.get",
"config.set",
"config.apply",
"config.schema",
"wizard.start",
"wizard.next",
"wizard.cancel",
"wizard.status",
"talk.mode",
"models.list",
"agents.list",
"skills.status",
"skills.install",
"skills.update",
"update.run",
"voicewake.get",
"voicewake.set",
"sessions.list",
"sessions.patch",
"sessions.reset",
"sessions.delete",
"sessions.compact",
"last-heartbeat",
"set-heartbeats",
"wake",
"node.pair.request",
"node.pair.list",
"node.pair.approve",
"node.pair.reject",
"node.pair.verify",
"node.rename",
"node.list",
"node.describe",
"node.invoke",
"cron.list",
"cron.status",
"cron.add",
"cron.update",
"cron.remove",
"cron.run",
"cron.runs",
"system-presence",
"system-event",
"send",
"agent",
"agent.wait",
// WebChat WebSocket-native chat methods
"chat.history",
"chat.abort",
"chat.send",
];
const CHANNEL_METHODS = listChannelPlugins().flatMap(
(plugin) => plugin.gatewayMethods ?? [],
);
export const GATEWAY_METHODS = Array.from(
new Set([...BASE_METHODS, ...CHANNEL_METHODS]),
);
export const GATEWAY_EVENTS = [
"agent",
"chat",
"presence",
"tick",
"talk.mode",
"shutdown",
"health",
"heartbeat",
"cron",
"node.pair.requested",
"node.pair.resolved",
"voicewake.changed",
];

View File

@@ -0,0 +1,16 @@
type BridgeLike = {
listConnected?: () => Array<{ platform?: string | null }>;
};
const isMobilePlatform = (platform: unknown): boolean => {
const p = typeof platform === "string" ? platform.trim().toLowerCase() : "";
if (!p) return false;
return (
p.startsWith("ios") || p.startsWith("ipados") || p.startsWith("android")
);
};
export function hasConnectedMobileNode(bridge: BridgeLike | null): boolean {
const connected = bridge?.listConnected?.() ?? [];
return connected.some((n) => isMobilePlatform(n.platform));
}

View File

@@ -0,0 +1,19 @@
import {
loadModelCatalog,
type ModelCatalogEntry,
resetModelCatalogCacheForTest,
} from "../agents/model-catalog.js";
import { loadConfig } from "../config/config.js";
export type GatewayModelChoice = ModelCatalogEntry;
// Test-only escape hatch: model catalog is cached at module scope for the
// process lifetime, which is fine for the real gateway daemon, but makes
// isolated unit tests harder. Keep this intentionally obscure.
export function __resetModelCatalogCacheForTest() {
resetModelCatalogCacheForTest();
}
export async function loadGatewayModelCatalog(): Promise<GatewayModelChoice[]> {
return await loadModelCatalog({ config: loadConfig() });
}

View File

@@ -0,0 +1,171 @@
import type { NodeBridgeServer } from "../infra/bridge/server.js";
import { startNodeBridgeServer } from "../infra/bridge/server.js";
import {
listSystemPresence,
upsertPresence,
} from "../infra/system-presence.js";
import { loadVoiceWakeConfig } from "../infra/voicewake.js";
import { isLoopbackAddress } from "./net.js";
import {
getHealthVersion,
getPresenceVersion,
incrementPresenceVersion,
} from "./server/health-state.js";
import type {
BridgeEvent,
BridgeRequest,
BridgeResponse,
} from "./server-bridge-types.js";
export type GatewayNodeBridgeRuntime = {
bridge: NodeBridgeServer | null;
nodePresenceTimers: Map<string, ReturnType<typeof setInterval>>;
};
export async function startGatewayNodeBridge(params: {
bridgeEnabled: boolean;
bridgePort: number;
bridgeHost: string | null;
machineDisplayName: string;
canvasHostPort?: number;
canvasHostHost?: string;
broadcast: (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => void;
bridgeUnsubscribeAll: (nodeId: string) => void;
handleBridgeRequest: (
nodeId: string,
req: BridgeRequest,
) => Promise<BridgeResponse>;
handleBridgeEvent: (nodeId: string, evt: BridgeEvent) => Promise<void> | void;
logBridge: { info: (msg: string) => void; warn: (msg: string) => void };
}): Promise<GatewayNodeBridgeRuntime> {
const nodePresenceTimers = new Map<string, ReturnType<typeof setInterval>>();
const stopNodePresenceTimer = (nodeId: string) => {
const timer = nodePresenceTimers.get(nodeId);
if (timer) {
clearInterval(timer);
}
nodePresenceTimers.delete(nodeId);
};
const beaconNodePresence = (
node: {
nodeId: string;
displayName?: string;
remoteIp?: string;
version?: string;
platform?: string;
deviceFamily?: string;
modelIdentifier?: string;
},
reason: string,
) => {
const host = node.displayName?.trim() || node.nodeId;
const rawIp = node.remoteIp?.trim();
const ip = rawIp && !isLoopbackAddress(rawIp) ? rawIp : undefined;
const version = node.version?.trim() || "unknown";
const platform = node.platform?.trim() || undefined;
const deviceFamily = node.deviceFamily?.trim() || undefined;
const modelIdentifier = node.modelIdentifier?.trim() || undefined;
const text = `Node: ${host}${ip ? ` (${ip})` : ""} · app ${version} · last input 0s ago · mode remote · reason ${reason}`;
upsertPresence(node.nodeId, {
host,
ip,
version,
platform,
deviceFamily,
modelIdentifier,
mode: "remote",
reason,
lastInputSeconds: 0,
instanceId: node.nodeId,
text,
});
incrementPresenceVersion();
params.broadcast(
"presence",
{ presence: listSystemPresence() },
{
dropIfSlow: true,
stateVersion: {
presence: getPresenceVersion(),
health: getHealthVersion(),
},
},
);
};
const startNodePresenceTimer = (node: { nodeId: string }) => {
stopNodePresenceTimer(node.nodeId);
nodePresenceTimers.set(
node.nodeId,
setInterval(() => {
beaconNodePresence(node, "periodic");
}, 180_000),
);
};
if (params.bridgeEnabled && params.bridgePort > 0 && params.bridgeHost) {
try {
const started = await startNodeBridgeServer({
host: params.bridgeHost,
port: params.bridgePort,
serverName: params.machineDisplayName,
canvasHostPort: params.canvasHostPort,
canvasHostHost: params.canvasHostHost,
onRequest: (nodeId, req) => params.handleBridgeRequest(nodeId, req),
onAuthenticated: async (node) => {
beaconNodePresence(node, "node-connected");
startNodePresenceTimer(node);
try {
const cfg = await loadVoiceWakeConfig();
started.sendEvent({
nodeId: node.nodeId,
event: "voicewake.changed",
payloadJSON: JSON.stringify({ triggers: cfg.triggers }),
});
} catch {
// Best-effort only.
}
},
onDisconnected: (node) => {
params.bridgeUnsubscribeAll(node.nodeId);
stopNodePresenceTimer(node.nodeId);
beaconNodePresence(node, "node-disconnected");
},
onEvent: params.handleBridgeEvent,
onPairRequested: (request) => {
params.broadcast("node.pair.requested", request, {
dropIfSlow: true,
});
},
});
if (started.port > 0) {
params.logBridge.info(
`listening on tcp://${params.bridgeHost}:${started.port} (node)`,
);
return { bridge: started, nodePresenceTimers };
}
} catch (err) {
params.logBridge.warn(`failed to start: ${String(err)}`);
}
} else if (
params.bridgeEnabled &&
params.bridgePort > 0 &&
!params.bridgeHost
) {
params.logBridge.warn(
"bind policy requested tailnet IP, but no tailnet interface was found; refusing to start bridge",
);
}
return { bridge: null, nodePresenceTimers };
}

View File

@@ -0,0 +1,42 @@
import type { loadConfig } from "../config/config.js";
import { loadClawdbotPlugins } from "../plugins/loader.js";
import type { GatewayRequestHandler } from "./server-methods/types.js";
export function loadGatewayPlugins(params: {
cfg: ReturnType<typeof loadConfig>;
workspaceDir: string;
log: {
info: (msg: string) => void;
warn: (msg: string) => void;
error: (msg: string) => void;
debug: (msg: string) => void;
};
coreGatewayHandlers: Record<string, GatewayRequestHandler>;
baseMethods: string[];
}) {
const pluginRegistry = loadClawdbotPlugins({
config: params.cfg,
workspaceDir: params.workspaceDir,
logger: {
info: (msg) => params.log.info(msg),
warn: (msg) => params.log.warn(msg),
error: (msg) => params.log.error(msg),
debug: (msg) => params.log.debug(msg),
},
coreGatewayHandlers: params.coreGatewayHandlers,
});
const pluginMethods = Object.keys(pluginRegistry.gatewayHandlers);
const gatewayMethods = Array.from(
new Set([...params.baseMethods, ...pluginMethods]),
);
if (pluginRegistry.diagnostics.length > 0) {
for (const diag of pluginRegistry.diagnostics) {
if (diag.level === "error") {
params.log.warn(`[plugins] ${diag.message}`);
} else {
params.log.info(`[plugins] ${diag.message}`);
}
}
}
return { pluginRegistry, gatewayMethods };
}

View File

@@ -0,0 +1,178 @@
import type { CliDeps } from "../cli/deps.js";
import type { loadConfig } from "../config/config.js";
import { startGmailWatcher, stopGmailWatcher } from "../hooks/gmail-watcher.js";
import { startHeartbeatRunner } from "../infra/heartbeat-runner.js";
import { setCommandLaneConcurrency } from "../process/command-queue.js";
import type { ChannelKind, GatewayReloadPlan } from "./config-reload.js";
import { resolveHooksConfig } from "./hooks.js";
import { startBrowserControlServerIfEnabled } from "./server-browser.js";
import {
buildGatewayCronService,
type GatewayCronState,
} from "./server-cron.js";
type GatewayHotReloadState = {
hooksConfig: ReturnType<typeof resolveHooksConfig>;
heartbeatRunner: { stop: () => void };
cronState: GatewayCronState;
browserControl: Awaited<
ReturnType<typeof startBrowserControlServerIfEnabled>
> | null;
};
export function createGatewayReloadHandlers(params: {
deps: CliDeps;
broadcast: (
event: string,
payload: unknown,
opts?: { dropIfSlow?: boolean },
) => void;
getState: () => GatewayHotReloadState;
setState: (state: GatewayHotReloadState) => void;
startChannel: (name: ChannelKind) => Promise<void>;
stopChannel: (name: ChannelKind) => Promise<void>;
logHooks: {
info: (msg: string) => void;
warn: (msg: string) => void;
error: (msg: string) => void;
};
logBrowser: { error: (msg: string) => void };
logChannels: { info: (msg: string) => void; error: (msg: string) => void };
logCron: { error: (msg: string) => void };
logReload: { info: (msg: string) => void; warn: (msg: string) => void };
}) {
const applyHotReload = async (
plan: GatewayReloadPlan,
nextConfig: ReturnType<typeof loadConfig>,
) => {
const state = params.getState();
const nextState = { ...state };
if (plan.reloadHooks) {
try {
nextState.hooksConfig = resolveHooksConfig(nextConfig);
} catch (err) {
params.logHooks.warn(`hooks config reload failed: ${String(err)}`);
}
}
if (plan.restartHeartbeat) {
state.heartbeatRunner.stop();
nextState.heartbeatRunner = startHeartbeatRunner({ cfg: nextConfig });
}
if (plan.restartCron) {
state.cronState.cron.stop();
nextState.cronState = buildGatewayCronService({
cfg: nextConfig,
deps: params.deps,
broadcast: params.broadcast,
});
void nextState.cronState.cron
.start()
.catch((err) =>
params.logCron.error(`failed to start: ${String(err)}`),
);
}
if (plan.restartBrowserControl) {
if (state.browserControl) {
await state.browserControl.stop().catch(() => {});
}
try {
nextState.browserControl = await startBrowserControlServerIfEnabled();
} catch (err) {
params.logBrowser.error(`server failed to start: ${String(err)}`);
}
}
if (plan.restartGmailWatcher) {
await stopGmailWatcher().catch(() => {});
if (process.env.CLAWDBOT_SKIP_GMAIL_WATCHER !== "1") {
try {
const gmailResult = await startGmailWatcher(nextConfig);
if (gmailResult.started) {
params.logHooks.info("gmail watcher started");
} else if (
gmailResult.reason &&
gmailResult.reason !== "hooks not enabled" &&
gmailResult.reason !== "no gmail account configured"
) {
params.logHooks.warn(
`gmail watcher not started: ${gmailResult.reason}`,
);
}
} catch (err) {
params.logHooks.error(
`gmail watcher failed to start: ${String(err)}`,
);
}
} else {
params.logHooks.info(
"skipping gmail watcher restart (CLAWDBOT_SKIP_GMAIL_WATCHER=1)",
);
}
}
if (plan.restartChannels.size > 0) {
if (
process.env.CLAWDBOT_SKIP_CHANNELS === "1" ||
process.env.CLAWDBOT_SKIP_PROVIDERS === "1"
) {
params.logChannels.info(
"skipping channel reload (CLAWDBOT_SKIP_CHANNELS=1 or CLAWDBOT_SKIP_PROVIDERS=1)",
);
} else {
const restartChannel = async (name: ChannelKind) => {
params.logChannels.info(`restarting ${name} channel`);
await params.stopChannel(name);
await params.startChannel(name);
};
for (const channel of plan.restartChannels) {
await restartChannel(channel);
}
}
}
setCommandLaneConcurrency("cron", nextConfig.cron?.maxConcurrentRuns ?? 1);
setCommandLaneConcurrency(
"main",
nextConfig.agents?.defaults?.maxConcurrent ?? 1,
);
setCommandLaneConcurrency(
"subagent",
nextConfig.agents?.defaults?.subagents?.maxConcurrent ?? 1,
);
if (plan.hotReasons.length > 0) {
params.logReload.info(
`config hot reload applied (${plan.hotReasons.join(", ")})`,
);
} else if (plan.noopPaths.length > 0) {
params.logReload.info(
`config change applied (dynamic reads: ${plan.noopPaths.join(", ")})`,
);
}
params.setState(nextState);
};
const requestGatewayRestart = (
plan: GatewayReloadPlan,
_nextConfig: ReturnType<typeof loadConfig>,
) => {
const reasons = plan.restartReasons.length
? plan.restartReasons.join(", ")
: plan.changedPaths.join(", ");
params.logReload.warn(
`config change requires gateway restart (${reasons})`,
);
if (process.listenerCount("SIGUSR1") === 0) {
params.logReload.warn("no SIGUSR1 listener found; restart skipped");
return;
}
process.emit("SIGUSR1");
};
return { applyHotReload, requestGatewayRestart };
}

View File

@@ -0,0 +1,75 @@
import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js";
import { normalizeChannelId } from "../channels/plugins/index.js";
import type { CliDeps } from "../cli/deps.js";
import { agentCommand } from "../commands/agent.js";
import { resolveMainSessionKeyFromConfig } from "../config/sessions.js";
import { resolveOutboundTarget } from "../infra/outbound/targets.js";
import {
consumeRestartSentinel,
formatRestartSentinelMessage,
summarizeRestartSentinel,
} from "../infra/restart-sentinel.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { defaultRuntime } from "../runtime.js";
import { loadSessionEntry } from "./session-utils.js";
export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
const sentinel = await consumeRestartSentinel();
if (!sentinel) return;
const payload = sentinel.payload;
const sessionKey = payload.sessionKey?.trim();
const message = formatRestartSentinelMessage(payload);
const summary = summarizeRestartSentinel(payload);
if (!sessionKey) {
const mainSessionKey = resolveMainSessionKeyFromConfig();
enqueueSystemEvent(message, { sessionKey: mainSessionKey });
return;
}
const { cfg, entry } = loadSessionEntry(sessionKey);
const lastChannel = entry?.lastChannel;
const lastTo = entry?.lastTo?.trim();
const parsedTarget = resolveAnnounceTargetFromKey(sessionKey);
const channelRaw = lastChannel ?? parsedTarget?.channel;
const channel = channelRaw ? normalizeChannelId(channelRaw) : null;
const to = lastTo || parsedTarget?.to;
if (!channel || !to) {
enqueueSystemEvent(message, { sessionKey });
return;
}
const resolved = resolveOutboundTarget({
channel,
to,
cfg,
accountId: parsedTarget?.accountId ?? entry?.lastAccountId,
mode: "implicit",
});
if (!resolved.ok) {
enqueueSystemEvent(message, { sessionKey });
return;
}
try {
await agentCommand(
{
message,
sessionKey,
to: resolved.to,
channel,
deliver: true,
bestEffortDeliver: true,
messageChannel: channel,
},
defaultRuntime,
params.deps,
);
} catch (err) {
enqueueSystemEvent(`${summary}\n${String(err)}`, { sessionKey });
}
}
export function shouldWakeFromRestartSentinel() {
return !process.env.VITEST && process.env.NODE_ENV !== "test";
}

View File

@@ -0,0 +1,105 @@
import type {
BridgeBindMode,
GatewayAuthConfig,
GatewayTailscaleConfig,
loadConfig,
} from "../config/config.js";
import {
assertGatewayAuthConfigured,
type ResolvedGatewayAuth,
resolveGatewayAuth,
} from "./auth.js";
import { normalizeControlUiBasePath } from "./control-ui.js";
import { resolveHooksConfig } from "./hooks.js";
import { isLoopbackHost, resolveGatewayBindHost } from "./net.js";
export type GatewayRuntimeConfig = {
bindHost: string;
controlUiEnabled: boolean;
openAiChatCompletionsEnabled: boolean;
controlUiBasePath: string;
resolvedAuth: ResolvedGatewayAuth;
authMode: ResolvedGatewayAuth["mode"];
tailscaleConfig: GatewayTailscaleConfig;
tailscaleMode: "off" | "serve" | "funnel";
hooksConfig: ReturnType<typeof resolveHooksConfig>;
canvasHostEnabled: boolean;
};
export async function resolveGatewayRuntimeConfig(params: {
cfg: ReturnType<typeof loadConfig>;
port: number;
bind?: BridgeBindMode;
host?: string;
controlUiEnabled?: boolean;
openAiChatCompletionsEnabled?: boolean;
auth?: GatewayAuthConfig;
tailscale?: GatewayTailscaleConfig;
}): Promise<GatewayRuntimeConfig> {
const bindMode = params.bind ?? params.cfg.gateway?.bind ?? "loopback";
const customBindHost = params.cfg.gateway?.customBindHost;
const bindHost =
params.host ?? (await resolveGatewayBindHost(bindMode, customBindHost));
const controlUiEnabled =
params.controlUiEnabled ?? params.cfg.gateway?.controlUi?.enabled ?? true;
const openAiChatCompletionsEnabled =
params.openAiChatCompletionsEnabled ??
params.cfg.gateway?.http?.endpoints?.chatCompletions?.enabled ??
false;
const controlUiBasePath = normalizeControlUiBasePath(
params.cfg.gateway?.controlUi?.basePath,
);
const authBase = params.cfg.gateway?.auth ?? {};
const authOverrides = params.auth ?? {};
const authConfig = {
...authBase,
...authOverrides,
};
const tailscaleBase = params.cfg.gateway?.tailscale ?? {};
const tailscaleOverrides = params.tailscale ?? {};
const tailscaleConfig = {
...tailscaleBase,
...tailscaleOverrides,
};
const tailscaleMode = tailscaleConfig.mode ?? "off";
const resolvedAuth = resolveGatewayAuth({
authConfig,
env: process.env,
tailscaleMode,
});
const authMode: ResolvedGatewayAuth["mode"] = resolvedAuth.mode;
const hooksConfig = resolveHooksConfig(params.cfg);
const canvasHostEnabled =
process.env.CLAWDBOT_SKIP_CANVAS_HOST !== "1" &&
params.cfg.canvasHost?.enabled !== false;
assertGatewayAuthConfigured(resolvedAuth);
if (tailscaleMode === "funnel" && authMode !== "password") {
throw new Error(
"tailscale funnel requires gateway auth mode=password (set gateway.auth.password or CLAWDBOT_GATEWAY_PASSWORD)",
);
}
if (tailscaleMode !== "off" && !isLoopbackHost(bindHost)) {
throw new Error(
"tailscale serve/funnel requires gateway bind=loopback (127.0.0.1)",
);
}
if (!isLoopbackHost(bindHost) && authMode === "none") {
throw new Error(
`refusing to bind gateway to ${bindHost}:${params.port} without auth (set gateway.auth.token or CLAWDBOT_GATEWAY_TOKEN, or pass --token)`,
);
}
return {
bindHost,
controlUiEnabled,
openAiChatCompletionsEnabled,
controlUiBasePath,
resolvedAuth,
authMode,
tailscaleConfig,
tailscaleMode,
hooksConfig,
canvasHostEnabled,
};
}

View File

@@ -0,0 +1,146 @@
import type { Server as HttpServer } from "node:http";
import { WebSocketServer } from "ws";
import { CANVAS_HOST_PATH } from "../canvas-host/a2ui.js";
import {
type CanvasHostHandler,
createCanvasHostHandler,
} from "../canvas-host/server.js";
import type { CliDeps } from "../cli/deps.js";
import type { createSubsystemLogger } from "../logging.js";
import type { RuntimeEnv } from "../runtime.js";
import type { ResolvedGatewayAuth } from "./auth.js";
import type { ChatAbortControllerEntry } from "./chat-abort.js";
import type { HooksConfigResolved } from "./hooks.js";
import { createGatewayHooksRequestHandler } from "./server/hooks.js";
import { listenGatewayHttpServer } from "./server/http-listen.js";
import type { GatewayWsClient } from "./server/ws-types.js";
import { createGatewayBroadcaster } from "./server-broadcast.js";
import { type ChatRunEntry, createChatRunState } from "./server-chat.js";
import { MAX_PAYLOAD_BYTES } from "./server-constants.js";
import {
attachGatewayUpgradeHandler,
createGatewayHttpServer,
} from "./server-http.js";
import type { DedupeEntry } from "./server-shared.js";
export async function createGatewayRuntimeState(params: {
cfg: {
canvasHost?: { root?: string; enabled?: boolean; liveReload?: boolean };
};
bindHost: string;
port: number;
controlUiEnabled: boolean;
controlUiBasePath: string;
openAiChatCompletionsEnabled: boolean;
resolvedAuth: ResolvedGatewayAuth;
hooksConfig: () => HooksConfigResolved | null;
deps: CliDeps;
canvasRuntime: RuntimeEnv;
canvasHostEnabled: boolean;
allowCanvasHostInTests?: boolean;
logCanvas: { info: (msg: string) => void; warn: (msg: string) => void };
logHooks: ReturnType<typeof createSubsystemLogger>;
}): Promise<{
canvasHost: CanvasHostHandler | null;
httpServer: HttpServer;
wss: WebSocketServer;
clients: Set<GatewayWsClient>;
broadcast: (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => void;
agentRunSeq: Map<string, number>;
dedupe: Map<string, DedupeEntry>;
chatRunState: ReturnType<typeof createChatRunState>;
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
addChatRun: (sessionId: string, entry: ChatRunEntry) => void;
removeChatRun: (
sessionId: string,
clientRunId: string,
sessionKey?: string,
) => ChatRunEntry | undefined;
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
}> {
let canvasHost: CanvasHostHandler | null = null;
if (params.canvasHostEnabled) {
try {
const handler = await createCanvasHostHandler({
runtime: params.canvasRuntime,
rootDir: params.cfg.canvasHost?.root,
basePath: CANVAS_HOST_PATH,
allowInTests: params.allowCanvasHostInTests,
liveReload: params.cfg.canvasHost?.liveReload,
});
if (handler.rootDir) {
canvasHost = handler;
params.logCanvas.info(
`canvas host mounted at http://${params.bindHost}:${params.port}${CANVAS_HOST_PATH}/ (root ${handler.rootDir})`,
);
}
} catch (err) {
params.logCanvas.warn(`canvas host failed to start: ${String(err)}`);
}
}
const handleHooksRequest = createGatewayHooksRequestHandler({
deps: params.deps,
getHooksConfig: params.hooksConfig,
bindHost: params.bindHost,
port: params.port,
logHooks: params.logHooks,
});
const httpServer = createGatewayHttpServer({
canvasHost,
controlUiEnabled: params.controlUiEnabled,
controlUiBasePath: params.controlUiBasePath,
openAiChatCompletionsEnabled: params.openAiChatCompletionsEnabled,
handleHooksRequest,
resolvedAuth: params.resolvedAuth,
});
await listenGatewayHttpServer({
httpServer,
bindHost: params.bindHost,
port: params.port,
});
const wss = new WebSocketServer({
noServer: true,
maxPayload: MAX_PAYLOAD_BYTES,
});
attachGatewayUpgradeHandler({ httpServer, wss, canvasHost });
const clients = new Set<GatewayWsClient>();
const { broadcast } = createGatewayBroadcaster({ clients });
const agentRunSeq = new Map<string, number>();
const dedupe = new Map<string, DedupeEntry>();
const chatRunState = createChatRunState();
const chatRunRegistry = chatRunState.registry;
const chatRunBuffers = chatRunState.buffers;
const chatDeltaSentAt = chatRunState.deltaSentAt;
const addChatRun = chatRunRegistry.add;
const removeChatRun = chatRunRegistry.remove;
const chatAbortControllers = new Map<string, ChatAbortControllerEntry>();
return {
canvasHost,
httpServer,
wss,
clients,
broadcast,
agentRunSeq,
dedupe,
chatRunState,
chatRunBuffers,
chatDeltaSentAt,
addChatRun,
removeChatRun,
chatAbortControllers,
};
}

View File

@@ -0,0 +1,22 @@
import { loadConfig } from "../config/config.js";
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
import {
getAgentRunContext,
registerAgentRunContext,
} from "../infra/agent-events.js";
export function resolveSessionKeyForRun(runId: string) {
const cached = getAgentRunContext(runId)?.sessionKey;
if (cached) return cached;
const cfg = loadConfig();
const storePath = resolveStorePath(cfg.session?.store);
const store = loadSessionStore(storePath);
const found = Object.entries(store).find(
([, entry]) => entry?.sessionId === runId,
);
const sessionKey = found?.[0];
if (sessionKey) {
registerAgentRunContext(runId, { sessionKey });
}
return sessionKey;
}

View File

@@ -0,0 +1,31 @@
import chalk from "chalk";
import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../agents/defaults.js";
import { resolveConfiguredModelRef } from "../agents/model-selection.js";
import type { loadConfig } from "../config/config.js";
import { getResolvedLoggerSettings } from "../logging.js";
export function logGatewayStartup(params: {
cfg: ReturnType<typeof loadConfig>;
bindHost: string;
port: number;
log: { info: (msg: string, meta?: Record<string, unknown>) => void };
isNixMode: boolean;
}) {
const { provider: agentProvider, model: agentModel } =
resolveConfiguredModelRef({
cfg: params.cfg,
defaultProvider: DEFAULT_PROVIDER,
defaultModel: DEFAULT_MODEL,
});
const modelRef = `${agentProvider}/${agentModel}`;
params.log.info(`agent model: ${modelRef}`, {
consoleMessage: `agent model: ${chalk.whiteBright(modelRef)}`,
});
params.log.info(
`listening on ws://${params.bindHost}:${params.port} (PID ${process.pid})`,
);
params.log.info(`log file: ${getResolvedLoggerSettings().file}`);
if (params.isNixMode) {
params.log.info("gateway: running in Nix mode (config managed externally)");
}
}

View File

@@ -0,0 +1,136 @@
import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../agents/defaults.js";
import { loadModelCatalog } from "../agents/model-catalog.js";
import {
getModelRefStatus,
resolveConfiguredModelRef,
resolveHooksGmailModel,
} from "../agents/model-selection.js";
import type { CliDeps } from "../cli/deps.js";
import type { loadConfig } from "../config/config.js";
import { startGmailWatcher } from "../hooks/gmail-watcher.js";
import type { loadClawdbotPlugins } from "../plugins/loader.js";
import {
type PluginServicesHandle,
startPluginServices,
} from "../plugins/services.js";
import { startBrowserControlServerIfEnabled } from "./server-browser.js";
import {
scheduleRestartSentinelWake,
shouldWakeFromRestartSentinel,
} from "./server-restart-sentinel.js";
export async function startGatewaySidecars(params: {
cfg: ReturnType<typeof loadConfig>;
pluginRegistry: ReturnType<typeof loadClawdbotPlugins>;
defaultWorkspaceDir: string;
deps: CliDeps;
startChannels: () => Promise<void>;
log: { warn: (msg: string) => void };
logHooks: {
info: (msg: string) => void;
warn: (msg: string) => void;
error: (msg: string) => void;
};
logChannels: { info: (msg: string) => void; error: (msg: string) => void };
logBrowser: { error: (msg: string) => void };
}) {
// Start clawd browser control server (unless disabled via config).
let browserControl: Awaited<
ReturnType<typeof startBrowserControlServerIfEnabled>
> = null;
try {
browserControl = await startBrowserControlServerIfEnabled();
} catch (err) {
params.logBrowser.error(`server failed to start: ${String(err)}`);
}
// Start Gmail watcher if configured (hooks.gmail.account).
if (process.env.CLAWDBOT_SKIP_GMAIL_WATCHER !== "1") {
try {
const gmailResult = await startGmailWatcher(params.cfg);
if (gmailResult.started) {
params.logHooks.info("gmail watcher started");
} else if (
gmailResult.reason &&
gmailResult.reason !== "hooks not enabled" &&
gmailResult.reason !== "no gmail account configured"
) {
params.logHooks.warn(
`gmail watcher not started: ${gmailResult.reason}`,
);
}
} catch (err) {
params.logHooks.error(`gmail watcher failed to start: ${String(err)}`);
}
}
// Validate hooks.gmail.model if configured.
if (params.cfg.hooks?.gmail?.model) {
const hooksModelRef = resolveHooksGmailModel({
cfg: params.cfg,
defaultProvider: DEFAULT_PROVIDER,
});
if (hooksModelRef) {
const { provider: defaultProvider, model: defaultModel } =
resolveConfiguredModelRef({
cfg: params.cfg,
defaultProvider: DEFAULT_PROVIDER,
defaultModel: DEFAULT_MODEL,
});
const catalog = await loadModelCatalog({ config: params.cfg });
const status = getModelRefStatus({
cfg: params.cfg,
catalog,
ref: hooksModelRef,
defaultProvider,
defaultModel,
});
if (!status.allowed) {
params.logHooks.warn(
`hooks.gmail.model "${status.key}" not in agents.defaults.models allowlist (will use primary instead)`,
);
}
if (!status.inCatalog) {
params.logHooks.warn(
`hooks.gmail.model "${status.key}" not in the model catalog (may fail at runtime)`,
);
}
}
}
// Launch configured channels so gateway replies via the surface the message came from.
// Tests can opt out via CLAWDBOT_SKIP_CHANNELS (or legacy CLAWDBOT_SKIP_PROVIDERS).
const skipChannels =
process.env.CLAWDBOT_SKIP_CHANNELS === "1" ||
process.env.CLAWDBOT_SKIP_PROVIDERS === "1";
if (!skipChannels) {
try {
await params.startChannels();
} catch (err) {
params.logChannels.error(`channel startup failed: ${String(err)}`);
}
} else {
params.logChannels.info(
"skipping channel start (CLAWDBOT_SKIP_CHANNELS=1 or CLAWDBOT_SKIP_PROVIDERS=1)",
);
}
let pluginServices: PluginServicesHandle | null = null;
try {
pluginServices = await startPluginServices({
registry: params.pluginRegistry,
config: params.cfg,
workspaceDir: params.defaultWorkspaceDir,
});
} catch (err) {
params.log.warn(`plugin services failed to start: ${String(err)}`);
}
if (shouldWakeFromRestartSentinel()) {
setTimeout(() => {
void scheduleRestartSentinelWake({ deps: params.deps });
}, 750);
}
return { browserControl, pluginServices };
}

View File

@@ -0,0 +1,60 @@
import {
disableTailscaleFunnel,
disableTailscaleServe,
enableTailscaleFunnel,
enableTailscaleServe,
getTailnetHostname,
} from "../infra/tailscale.js";
export async function startGatewayTailscaleExposure(params: {
tailscaleMode: "off" | "serve" | "funnel";
resetOnExit?: boolean;
port: number;
controlUiBasePath?: string;
logTailscale: { info: (msg: string) => void; warn: (msg: string) => void };
}): Promise<(() => Promise<void>) | null> {
if (params.tailscaleMode === "off") {
return null;
}
try {
if (params.tailscaleMode === "serve") {
await enableTailscaleServe(params.port);
} else {
await enableTailscaleFunnel(params.port);
}
const host = await getTailnetHostname().catch(() => null);
if (host) {
const uiPath = params.controlUiBasePath
? `${params.controlUiBasePath}/`
: "/";
params.logTailscale.info(
`${params.tailscaleMode} enabled: https://${host}${uiPath} (WS via wss://${host})`,
);
} else {
params.logTailscale.info(`${params.tailscaleMode} enabled`);
}
} catch (err) {
params.logTailscale.warn(
`${params.tailscaleMode} failed: ${err instanceof Error ? err.message : String(err)}`,
);
}
if (!params.resetOnExit) {
return null;
}
return async () => {
try {
if (params.tailscaleMode === "serve") {
await disableTailscaleServe();
} else {
await disableTailscaleFunnel();
}
} catch (err) {
params.logTailscale.warn(
`${params.tailscaleMode} cleanup failed: ${err instanceof Error ? err.message : String(err)}`,
);
}
};
}

View File

@@ -0,0 +1,21 @@
import type { WizardSession } from "../wizard/session.js";
export function createWizardSessionTracker() {
const wizardSessions = new Map<string, WizardSession>();
const findRunningWizard = (): string | null => {
for (const [id, session] of wizardSessions) {
if (session.getStatus() === "running") return id;
}
return null;
};
const purgeWizardSession = (id: string) => {
const session = wizardSessions.get(id);
if (!session) return;
if (session.getStatus() === "running") return;
wizardSessions.delete(id);
};
return { wizardSessions, findRunningWizard, purgeWizardSession };
}

View File

@@ -0,0 +1,52 @@
import type { WebSocketServer } from "ws";
import type { createSubsystemLogger } from "../logging.js";
import type { ResolvedGatewayAuth } from "./auth.js";
import { attachGatewayWsConnectionHandler } from "./server/ws-connection.js";
import type { GatewayWsClient } from "./server/ws-types.js";
import type {
GatewayRequestContext,
GatewayRequestHandlers,
} from "./server-methods/types.js";
export function attachGatewayWsHandlers(params: {
wss: WebSocketServer;
clients: Set<GatewayWsClient>;
port: number;
bridgeHost?: string;
canvasHostEnabled: boolean;
canvasHostServerPort?: number;
resolvedAuth: ResolvedGatewayAuth;
gatewayMethods: string[];
events: string[];
logGateway: ReturnType<typeof createSubsystemLogger>;
logHealth: ReturnType<typeof createSubsystemLogger>;
logWsControl: ReturnType<typeof createSubsystemLogger>;
extraHandlers: GatewayRequestHandlers;
broadcast: (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => void;
context: GatewayRequestContext;
}) {
attachGatewayWsConnectionHandler({
wss: params.wss,
clients: params.clients,
port: params.port,
bridgeHost: params.bridgeHost,
canvasHostEnabled: params.canvasHostEnabled,
canvasHostServerPort: params.canvasHostServerPort,
resolvedAuth: params.resolvedAuth,
gatewayMethods: params.gatewayMethods,
events: params.events,
logGateway: params.logGateway,
logHealth: params.logHealth,
logWsControl: params.logWsControl,
extraHandlers: params.extraHandlers,
broadcast: params.broadcast,
buildRequestContext: () => params.context,
});
}

File diff suppressed because it is too large Load Diff