diff --git a/src/gateway/server-bridge-subscriptions.ts b/src/gateway/server-bridge-subscriptions.ts new file mode 100644 index 000000000..ded9f04c5 --- /dev/null +++ b/src/gateway/server-bridge-subscriptions.ts @@ -0,0 +1,144 @@ +export type BridgeSendEventFn = (opts: { + nodeId: string; + event: string; + payloadJSON?: string | null; +}) => void; + +export type BridgeListConnectedFn = () => Array<{ nodeId: string }>; + +export type BridgeSubscriptionManager = { + subscribe: (nodeId: string, sessionKey: string) => void; + unsubscribe: (nodeId: string, sessionKey: string) => void; + unsubscribeAll: (nodeId: string) => void; + sendToSession: ( + sessionKey: string, + event: string, + payload: unknown, + sendEvent?: BridgeSendEventFn | null, + ) => void; + sendToAllSubscribed: ( + event: string, + payload: unknown, + sendEvent?: BridgeSendEventFn | null, + ) => void; + sendToAllConnected: ( + event: string, + payload: unknown, + listConnected?: BridgeListConnectedFn | null, + sendEvent?: BridgeSendEventFn | null, + ) => void; + clear: () => void; +}; + +export function createBridgeSubscriptionManager(): BridgeSubscriptionManager { + const bridgeNodeSubscriptions = new Map>(); + const bridgeSessionSubscribers = new Map>(); + + const toPayloadJSON = (payload: unknown) => + payload ? JSON.stringify(payload) : null; + + const subscribe = (nodeId: string, sessionKey: string) => { + const normalizedNodeId = nodeId.trim(); + const normalizedSessionKey = sessionKey.trim(); + if (!normalizedNodeId || !normalizedSessionKey) return; + + let nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId); + if (!nodeSet) { + nodeSet = new Set(); + bridgeNodeSubscriptions.set(normalizedNodeId, nodeSet); + } + if (nodeSet.has(normalizedSessionKey)) return; + nodeSet.add(normalizedSessionKey); + + let sessionSet = bridgeSessionSubscribers.get(normalizedSessionKey); + if (!sessionSet) { + sessionSet = new Set(); + bridgeSessionSubscribers.set(normalizedSessionKey, sessionSet); + } + sessionSet.add(normalizedNodeId); + }; + + const unsubscribe = (nodeId: string, sessionKey: string) => { + const normalizedNodeId = nodeId.trim(); + const normalizedSessionKey = sessionKey.trim(); + if (!normalizedNodeId || !normalizedSessionKey) return; + + const nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId); + nodeSet?.delete(normalizedSessionKey); + if (nodeSet?.size === 0) bridgeNodeSubscriptions.delete(normalizedNodeId); + + const sessionSet = bridgeSessionSubscribers.get(normalizedSessionKey); + sessionSet?.delete(normalizedNodeId); + if (sessionSet?.size === 0) + bridgeSessionSubscribers.delete(normalizedSessionKey); + }; + + const unsubscribeAll = (nodeId: string) => { + const normalizedNodeId = nodeId.trim(); + const nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId); + if (!nodeSet) return; + for (const sessionKey of nodeSet) { + const sessionSet = bridgeSessionSubscribers.get(sessionKey); + sessionSet?.delete(normalizedNodeId); + if (sessionSet?.size === 0) bridgeSessionSubscribers.delete(sessionKey); + } + bridgeNodeSubscriptions.delete(normalizedNodeId); + }; + + const sendToSession = ( + sessionKey: string, + event: string, + payload: unknown, + sendEvent?: BridgeSendEventFn | null, + ) => { + const normalizedSessionKey = sessionKey.trim(); + if (!normalizedSessionKey || !sendEvent) return; + const subs = bridgeSessionSubscribers.get(normalizedSessionKey); + if (!subs || subs.size === 0) return; + + const payloadJSON = toPayloadJSON(payload); + for (const nodeId of subs) { + sendEvent({ nodeId, event, payloadJSON }); + } + }; + + const sendToAllSubscribed = ( + event: string, + payload: unknown, + sendEvent?: BridgeSendEventFn | null, + ) => { + if (!sendEvent) return; + const payloadJSON = toPayloadJSON(payload); + for (const nodeId of bridgeNodeSubscriptions.keys()) { + sendEvent({ nodeId, event, payloadJSON }); + } + }; + + const sendToAllConnected = ( + event: string, + payload: unknown, + listConnected?: BridgeListConnectedFn | null, + sendEvent?: BridgeSendEventFn | null, + ) => { + if (!sendEvent || !listConnected) return; + const payloadJSON = toPayloadJSON(payload); + for (const node of listConnected()) { + sendEvent({ nodeId: node.nodeId, event, payloadJSON }); + } + }; + + const clear = () => { + bridgeNodeSubscriptions.clear(); + bridgeSessionSubscribers.clear(); + }; + + return { + subscribe, + unsubscribe, + unsubscribeAll, + sendToSession, + sendToAllSubscribed, + sendToAllConnected, + clear, + }; +} diff --git a/src/gateway/server-bridge.ts b/src/gateway/server-bridge.ts new file mode 100644 index 000000000..e88498cb7 --- /dev/null +++ b/src/gateway/server-bridge.ts @@ -0,0 +1,1148 @@ +import fs from "node:fs"; +import { randomUUID } from "node:crypto"; +import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../agents/defaults.js"; +import { + buildAllowedModelSet, + buildModelAliasIndex, + modelKey, + resolveConfiguredModelRef, + resolveModelRefFromString, + resolveThinkingDefault, +} from "../agents/model-selection.js"; +import { normalizeGroupActivation } from "../auto-reply/group-activation.js"; +import { + normalizeThinkLevel, + normalizeVerboseLevel, +} from "../auto-reply/thinking.js"; +import { buildConfigSchema } from "../config/schema.js"; +import { + CONFIG_PATH_CLAWDIS, + loadConfig, + parseConfigJson5, + readConfigFileSnapshot, + validateConfigObject, + writeConfigFile, +} from "../config/config.js"; +import { + loadSessionStore, + resolveStorePath, + type SessionEntry, + saveSessionStore, +} from "../config/sessions.js"; +import type { CliDeps } from "../cli/deps.js"; +import { agentCommand } from "../commands/agent.js"; +import { type HealthSummary } from "../commands/health.js"; +import { defaultRuntime } from "../runtime.js"; +import { loadVoiceWakeConfig, setVoiceWakeTriggers } from "../infra/voicewake.js"; +import { + archiveFileOnDisk, + capArrayByJsonBytes, + listSessionsFromStore, + loadSessionEntry, + readSessionMessages, + resolveSessionModelRef, + resolveSessionTranscriptCandidates, + type SessionsPatchResult, +} from "./session-utils.js"; +import { buildMessageWithAttachments } from "./chat-attachments.js"; +import { normalizeVoiceWakeTriggers } from "./server-utils.js"; +import { + ErrorCodes, + errorShape, + formatValidationErrors, + type SessionsCompactParams, + type SessionsDeleteParams, + type SessionsListParams, + type SessionsPatchParams, + type SessionsResetParams, + validateChatAbortParams, + validateChatHistoryParams, + validateChatSendParams, + validateConfigGetParams, + validateConfigSchemaParams, + validateConfigSetParams, + validateModelsListParams, + validateSessionsCompactParams, + validateSessionsDeleteParams, + validateSessionsListParams, + validateSessionsPatchParams, + validateSessionsResetParams, + validateTalkModeParams, +} from "./protocol/index.js"; +import { + HEALTH_REFRESH_INTERVAL_MS, + MAX_CHAT_HISTORY_MESSAGES_BYTES, +} from "./server-constants.js"; +import { formatForLog } from "./ws-log.js"; +import type { ChatRunEntry } from "./server-chat.js"; +import type { DedupeEntry } from "./server-shared.js"; +import type { ModelCatalogEntry } from "../agents/model-catalog.js"; + +export type BridgeHandlersContext = { + deps: CliDeps; + broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void; + bridgeSendToSession: ( + sessionKey: string, + event: string, + payload: unknown, + ) => void; + bridgeSubscribe: (nodeId: string, sessionKey: string) => void; + bridgeUnsubscribe: (nodeId: string, sessionKey: string) => void; + broadcastVoiceWakeChanged: (triggers: string[]) => void; + addChatRun: (sessionId: string, entry: ChatRunEntry) => void; + removeChatRun: ( + sessionId: string, + clientRunId: string, + sessionKey?: string, + ) => ChatRunEntry | undefined; + chatAbortControllers: Map< + string, + { controller: AbortController; sessionId: string; sessionKey: string } + >; + chatRunBuffers: Map; + chatDeltaSentAt: Map; + dedupe: Map; + agentRunSeq: Map; + getHealthCache: () => HealthSummary | null; + refreshHealthSnapshot: (opts?: { probe?: boolean }) => Promise; + loadGatewayModelCatalog: () => Promise; + logBridge: { warn: (msg: string) => void }; +}; + +export function createBridgeHandlers(ctx: BridgeHandlersContext) { + const handleBridgeRequest = async ( + nodeId: string, + req: { id: string; method: string; paramsJSON?: string | null }, + ): Promise< + | { ok: true; payloadJSON?: string | null } + | { ok: false; error: { code: string; message: string; details?: unknown } } + > => { + const method = req.method.trim(); + + const parseParams = (): Record => { + const raw = typeof req.paramsJSON === "string" ? req.paramsJSON : ""; + const trimmed = raw.trim(); + if (!trimmed) return {}; + const parsed = JSON.parse(trimmed) as unknown; + return typeof parsed === "object" && parsed !== null + ? (parsed as Record) + : {}; + }; + + try { + switch (method) { + case "voicewake.get": { + const cfg = await loadVoiceWakeConfig(); + return { + ok: true, + payloadJSON: JSON.stringify({ triggers: cfg.triggers }), + }; + } + case "voicewake.set": { + const params = parseParams(); + const triggers = normalizeVoiceWakeTriggers(params.triggers); + const cfg = await setVoiceWakeTriggers(triggers); + ctx.broadcastVoiceWakeChanged(cfg.triggers); + return { + ok: true, + payloadJSON: JSON.stringify({ triggers: cfg.triggers }), + }; + } + case "health": { + const now = Date.now(); + const cached = ctx.getHealthCache(); + if (cached && now - cached.ts < HEALTH_REFRESH_INTERVAL_MS) { + return { ok: true, payloadJSON: JSON.stringify(cached) }; + } + const snap = await ctx.refreshHealthSnapshot({ probe: false }); + return { ok: true, payloadJSON: JSON.stringify(snap) }; + } + case "config.get": { + const params = parseParams(); + if (!validateConfigGetParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid config.get params: ${formatValidationErrors(validateConfigGetParams.errors)}`, + }, + }; + } + const snapshot = await readConfigFileSnapshot(); + return { ok: true, payloadJSON: JSON.stringify(snapshot) }; + } + case "config.schema": { + const params = parseParams(); + if (!validateConfigSchemaParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid config.schema params: ${formatValidationErrors(validateConfigSchemaParams.errors)}`, + }, + }; + } + const schema = buildConfigSchema(); + return { ok: true, payloadJSON: JSON.stringify(schema) }; + } + case "config.set": { + const params = parseParams(); + if (!validateConfigSetParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid config.set params: ${formatValidationErrors(validateConfigSetParams.errors)}`, + }, + }; + } + const rawValue = (params as { raw?: unknown }).raw; + if (typeof rawValue !== "string") { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: "invalid config.set params: raw (string) required", + }, + }; + } + const parsedRes = parseConfigJson5(rawValue); + if (!parsedRes.ok) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: parsedRes.error, + }, + }; + } + const validated = validateConfigObject(parsedRes.parsed); + if (!validated.ok) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: "invalid config", + details: { issues: validated.issues }, + }, + }; + } + await writeConfigFile(validated.config); + return { + ok: true, + payloadJSON: JSON.stringify({ + ok: true, + path: CONFIG_PATH_CLAWDIS, + config: validated.config, + }), + }; + } + case "talk.mode": { + const params = parseParams(); + if (!validateTalkModeParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid talk.mode params: ${formatValidationErrors(validateTalkModeParams.errors)}`, + }, + }; + } + const payload = { + enabled: (params as { enabled: boolean }).enabled, + phase: (params as { phase?: string }).phase ?? null, + ts: Date.now(), + }; + ctx.broadcast("talk.mode", payload, { dropIfSlow: true }); + return { ok: true, payloadJSON: JSON.stringify(payload) }; + } + case "models.list": { + const params = parseParams(); + if (!validateModelsListParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid models.list params: ${formatValidationErrors(validateModelsListParams.errors)}`, + }, + }; + } + const models = await ctx.loadGatewayModelCatalog(); + return { ok: true, payloadJSON: JSON.stringify({ models }) }; + } + case "sessions.list": { + const params = parseParams(); + if (!validateSessionsListParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid sessions.list params: ${formatValidationErrors(validateSessionsListParams.errors)}`, + }, + }; + } + const p = params as SessionsListParams; + const cfg = loadConfig(); + const storePath = resolveStorePath(cfg.session?.store); + const store = loadSessionStore(storePath); + const result = listSessionsFromStore({ + cfg, + storePath, + store, + opts: p, + }); + return { ok: true, payloadJSON: JSON.stringify(result) }; + } + case "sessions.patch": { + const params = parseParams(); + if (!validateSessionsPatchParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid sessions.patch params: ${formatValidationErrors(validateSessionsPatchParams.errors)}`, + }, + }; + } + + const p = params as SessionsPatchParams; + const key = String(p.key ?? "").trim(); + if (!key) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: "key required", + }, + }; + } + + const cfg = loadConfig(); + const storePath = resolveStorePath(cfg.session?.store); + const store = loadSessionStore(storePath); + const now = Date.now(); + + const existing = store[key]; + const next: SessionEntry = existing + ? { + ...existing, + updatedAt: Math.max(existing.updatedAt ?? 0, now), + } + : { sessionId: randomUUID(), updatedAt: now }; + + if ("thinkingLevel" in p) { + const raw = p.thinkingLevel; + if (raw === null) { + delete next.thinkingLevel; + } else if (raw !== undefined) { + const normalized = normalizeThinkLevel(String(raw)); + if (!normalized) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid thinkingLevel: ${String(raw)}`, + }, + }; + } + next.thinkingLevel = normalized; + } + } + + if ("verboseLevel" in p) { + const raw = p.verboseLevel; + if (raw === null) { + delete next.verboseLevel; + } else if (raw !== undefined) { + const normalized = normalizeVerboseLevel(String(raw)); + if (!normalized) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid verboseLevel: ${String(raw)}`, + }, + }; + } + next.verboseLevel = normalized; + } + } + + if ("model" in p) { + const raw = p.model; + if (raw === null) { + delete next.providerOverride; + delete next.modelOverride; + } else if (raw !== undefined) { + const trimmed = String(raw).trim(); + if (!trimmed) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: "invalid model: empty", + }, + }; + } + const resolvedDefault = resolveConfiguredModelRef({ + cfg, + defaultProvider: DEFAULT_PROVIDER, + defaultModel: DEFAULT_MODEL, + }); + const aliasIndex = buildModelAliasIndex({ + cfg, + defaultProvider: resolvedDefault.provider, + }); + const resolved = resolveModelRefFromString({ + raw: trimmed, + defaultProvider: resolvedDefault.provider, + aliasIndex, + }); + if (!resolved) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid model: ${trimmed}`, + }, + }; + } + const catalog = await ctx.loadGatewayModelCatalog(); + const allowed = buildAllowedModelSet({ + cfg, + catalog, + defaultProvider: resolvedDefault.provider, + }); + const key = modelKey(resolved.ref.provider, resolved.ref.model); + if (!allowed.allowAny && !allowed.allowedKeys.has(key)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `model not allowed: ${key}`, + }, + }; + } + if ( + resolved.ref.provider === resolvedDefault.provider && + resolved.ref.model === resolvedDefault.model + ) { + delete next.providerOverride; + delete next.modelOverride; + } else { + next.providerOverride = resolved.ref.provider; + next.modelOverride = resolved.ref.model; + } + } + } + + if ("groupActivation" in p) { + const raw = p.groupActivation; + if (raw === null) { + delete next.groupActivation; + } else if (raw !== undefined) { + const normalized = normalizeGroupActivation(String(raw)); + if (!normalized) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid groupActivation: ${String(raw)}`, + }, + }; + } + next.groupActivation = normalized; + } + } + + store[key] = next; + await saveSessionStore(storePath, store); + const payload: SessionsPatchResult = { + ok: true, + path: storePath, + key, + entry: next, + }; + return { ok: true, payloadJSON: JSON.stringify(payload) }; + } + case "sessions.reset": { + const params = parseParams(); + if (!validateSessionsResetParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid sessions.reset params: ${formatValidationErrors(validateSessionsResetParams.errors)}`, + }, + }; + } + + const p = params as SessionsResetParams; + const key = String(p.key ?? "").trim(); + if (!key) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: "key required", + }, + }; + } + + const { storePath, store, entry } = loadSessionEntry(key); + const now = Date.now(); + const next: SessionEntry = { + sessionId: randomUUID(), + updatedAt: now, + systemSent: false, + abortedLastRun: false, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + model: entry?.model, + contextTokens: entry?.contextTokens, + displayName: entry?.displayName, + chatType: entry?.chatType, + surface: entry?.surface, + subject: entry?.subject, + room: entry?.room, + space: entry?.space, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + skillsSnapshot: entry?.skillsSnapshot, + }; + store[key] = next; + await saveSessionStore(storePath, store); + return { + ok: true, + payloadJSON: JSON.stringify({ ok: true, key, entry: next }), + }; + } + case "sessions.delete": { + const params = parseParams(); + if (!validateSessionsDeleteParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid sessions.delete params: ${formatValidationErrors(validateSessionsDeleteParams.errors)}`, + }, + }; + } + + const p = params as SessionsDeleteParams; + const key = String(p.key ?? "").trim(); + if (!key) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: "key required", + }, + }; + } + + const deleteTranscript = + typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true; + + const { storePath, store, entry } = loadSessionEntry(key); + const sessionId = entry?.sessionId; + const existed = Boolean(store[key]); + if (existed) delete store[key]; + await saveSessionStore(storePath, store); + + const archived: string[] = []; + if (deleteTranscript && sessionId) { + for (const candidate of resolveSessionTranscriptCandidates( + sessionId, + storePath, + )) { + if (!fs.existsSync(candidate)) continue; + try { + archived.push(archiveFileOnDisk(candidate, "deleted")); + } catch { + // Best-effort; deleting the store entry is the main operation. + } + } + } + + return { + ok: true, + payloadJSON: JSON.stringify({ + ok: true, + key, + deleted: existed, + archived, + }), + }; + } + case "sessions.compact": { + const params = parseParams(); + if (!validateSessionsCompactParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid sessions.compact params: ${formatValidationErrors(validateSessionsCompactParams.errors)}`, + }, + }; + } + + const p = params as SessionsCompactParams; + const key = String(p.key ?? "").trim(); + if (!key) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: "key required", + }, + }; + } + + const maxLines = + typeof p.maxLines === "number" && Number.isFinite(p.maxLines) + ? Math.max(1, Math.floor(p.maxLines)) + : 400; + + const { storePath, store, entry } = loadSessionEntry(key); + const sessionId = entry?.sessionId; + if (!sessionId) { + return { + ok: true, + payloadJSON: JSON.stringify({ + ok: true, + key, + compacted: false, + reason: "no sessionId", + }), + }; + } + + const filePath = resolveSessionTranscriptCandidates( + sessionId, + storePath, + ).find((candidate) => fs.existsSync(candidate)); + if (!filePath) { + return { + ok: true, + payloadJSON: JSON.stringify({ + ok: true, + key, + compacted: false, + reason: "no transcript", + }), + }; + } + + const raw = fs.readFileSync(filePath, "utf-8"); + const lines = raw.split(/\r?\n/).filter((l) => l.trim().length > 0); + if (lines.length <= maxLines) { + return { + ok: true, + payloadJSON: JSON.stringify({ + ok: true, + key, + compacted: false, + kept: lines.length, + }), + }; + } + + const archived = archiveFileOnDisk(filePath, "bak"); + const keptLines = lines.slice(-maxLines); + fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8"); + + // Token counts no longer match; clear so status + UI reflect reality after the next turn. + if (store[key]) { + delete store[key].inputTokens; + delete store[key].outputTokens; + delete store[key].totalTokens; + store[key].updatedAt = Date.now(); + await saveSessionStore(storePath, store); + } + + return { + ok: true, + payloadJSON: JSON.stringify({ + ok: true, + key, + compacted: true, + archived, + kept: keptLines.length, + }), + }; + } + case "chat.history": { + const params = parseParams(); + if (!validateChatHistoryParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid chat.history params: ${formatValidationErrors(validateChatHistoryParams.errors)}`, + }, + }; + } + const { sessionKey, limit } = params as { + sessionKey: string; + limit?: number; + }; + const { cfg, storePath, entry } = loadSessionEntry(sessionKey); + const sessionId = entry?.sessionId; + const rawMessages = + sessionId && storePath + ? readSessionMessages(sessionId, storePath) + : []; + const max = typeof limit === "number" ? limit : 200; + const sliced = + rawMessages.length > max ? rawMessages.slice(-max) : rawMessages; + const capped = capArrayByJsonBytes( + sliced, + MAX_CHAT_HISTORY_MESSAGES_BYTES, + ).items; + let thinkingLevel = entry?.thinkingLevel; + if (!thinkingLevel) { + const configured = cfg.agent?.thinkingDefault; + if (configured) { + thinkingLevel = configured; + } else { + const { provider, model } = resolveSessionModelRef(cfg, entry); + const catalog = await ctx.loadGatewayModelCatalog(); + thinkingLevel = resolveThinkingDefault({ + cfg, + provider, + model, + catalog, + }); + } + } + return { + ok: true, + payloadJSON: JSON.stringify({ + sessionKey, + sessionId, + messages: capped, + thinkingLevel, + }), + }; + } + case "chat.abort": { + const params = parseParams(); + if (!validateChatAbortParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid chat.abort params: ${formatValidationErrors(validateChatAbortParams.errors)}`, + }, + }; + } + + const { sessionKey, runId } = params as { + sessionKey: string; + runId: string; + }; + const active = ctx.chatAbortControllers.get(runId); + if (!active) { + return { + ok: true, + payloadJSON: JSON.stringify({ ok: true, aborted: false }), + }; + } + if (active.sessionKey !== sessionKey) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: "runId does not match sessionKey", + }, + }; + } + + active.controller.abort(); + ctx.chatAbortControllers.delete(runId); + ctx.chatRunBuffers.delete(runId); + ctx.chatDeltaSentAt.delete(runId); + ctx.removeChatRun(active.sessionId, runId, sessionKey); + + const payload = { + runId, + sessionKey, + seq: (ctx.agentRunSeq.get(active.sessionId) ?? 0) + 1, + state: "aborted" as const, + }; + ctx.broadcast("chat", payload); + ctx.bridgeSendToSession(sessionKey, "chat", payload); + return { + ok: true, + payloadJSON: JSON.stringify({ ok: true, aborted: true }), + }; + } + case "chat.send": { + const params = parseParams(); + if (!validateChatSendParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid chat.send params: ${formatValidationErrors(validateChatSendParams.errors)}`, + }, + }; + } + + const p = params as { + sessionKey: string; + message: string; + thinking?: string; + deliver?: boolean; + attachments?: Array<{ + type?: string; + mimeType?: string; + fileName?: string; + content?: unknown; + }>; + timeoutMs?: number; + idempotencyKey: string; + }; + const timeoutMs = Math.min( + Math.max(p.timeoutMs ?? 30_000, 0), + 30_000, + ); + const normalizedAttachments = + p.attachments?.map((a) => ({ + type: typeof a?.type === "string" ? a.type : undefined, + mimeType: + typeof a?.mimeType === "string" ? a.mimeType : undefined, + fileName: + typeof a?.fileName === "string" ? a.fileName : undefined, + content: + typeof a?.content === "string" + ? a.content + : ArrayBuffer.isView(a?.content) + ? Buffer.from( + a.content.buffer, + a.content.byteOffset, + a.content.byteLength, + ).toString("base64") + : undefined, + })) ?? []; + + let messageWithAttachments = p.message; + if (normalizedAttachments.length > 0) { + try { + messageWithAttachments = buildMessageWithAttachments( + p.message, + normalizedAttachments, + { maxBytes: 5_000_000 }, + ); + } catch (err) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: String(err), + }, + }; + } + } + + const { storePath, store, entry } = loadSessionEntry(p.sessionKey); + const now = Date.now(); + const sessionId = entry?.sessionId ?? randomUUID(); + const sessionEntry: SessionEntry = { + sessionId, + updatedAt: now, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + systemSent: entry?.systemSent, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + }; + const clientRunId = p.idempotencyKey; + + const cached = ctx.dedupe.get(`chat:${clientRunId}`); + if (cached) { + if (cached.ok) { + return { ok: true, payloadJSON: JSON.stringify(cached.payload) }; + } + return { + ok: false, + error: cached.error ?? { + code: ErrorCodes.UNAVAILABLE, + message: "request failed", + }, + }; + } + + try { + const abortController = new AbortController(); + ctx.chatAbortControllers.set(clientRunId, { + controller: abortController, + sessionId, + sessionKey: p.sessionKey, + }); + ctx.addChatRun(sessionId, { + sessionKey: p.sessionKey, + clientRunId, + }); + + if (store) { + store[p.sessionKey] = sessionEntry; + if (storePath) { + await saveSessionStore(storePath, store); + } + } + + await agentCommand( + { + message: messageWithAttachments, + sessionId, + thinking: p.thinking, + deliver: p.deliver, + timeout: Math.ceil(timeoutMs / 1000).toString(), + surface: `Node(${nodeId})`, + abortSignal: abortController.signal, + }, + defaultRuntime, + ctx.deps, + ); + const payload = { + runId: clientRunId, + status: "ok" as const, + }; + ctx.dedupe.set(`chat:${clientRunId}`, { + ts: Date.now(), + ok: true, + payload, + }); + return { ok: true, payloadJSON: JSON.stringify(payload) }; + } catch (err) { + const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); + const payload = { + runId: clientRunId, + status: "error" as const, + summary: String(err), + }; + ctx.dedupe.set(`chat:${clientRunId}`, { + ts: Date.now(), + ok: false, + payload, + error, + }); + return { + ok: false, + error: error ?? { + code: ErrorCodes.UNAVAILABLE, + message: String(err), + }, + }; + } finally { + ctx.chatAbortControllers.delete(clientRunId); + } + } + default: + return { + ok: false, + error: { + code: "FORBIDDEN", + message: "Method not allowed", + details: { method }, + }, + }; + } + } catch (err) { + return { + ok: false, + error: { code: ErrorCodes.INVALID_REQUEST, message: String(err) }, + }; + } + }; + + const handleBridgeEvent = async ( + nodeId: string, + evt: { event: string; payloadJSON?: string | null }, + ) => { + switch (evt.event) { + case "voice.transcript": { + if (!evt.payloadJSON) return; + let payload: unknown; + try { + payload = JSON.parse(evt.payloadJSON) as unknown; + } catch { + return; + } + const obj = + typeof payload === "object" && payload !== null + ? (payload as Record) + : {}; + const text = typeof obj.text === "string" ? obj.text.trim() : ""; + if (!text) return; + if (text.length > 20_000) return; + const sessionKeyRaw = + typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : ""; + const mainKey = + (loadConfig().session?.mainKey ?? "main").trim() || "main"; + const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : mainKey; + const { storePath, store, entry } = loadSessionEntry(sessionKey); + const now = Date.now(); + const sessionId = entry?.sessionId ?? randomUUID(); + store[sessionKey] = { + sessionId, + updatedAt: now, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + systemSent: entry?.systemSent, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + }; + if (storePath) { + await saveSessionStore(storePath, store); + } + + // Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send). + // This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId). + ctx.addChatRun(sessionId, { + sessionKey, + clientRunId: `voice-${randomUUID()}`, + }); + + void agentCommand( + { + message: text, + sessionId, + thinking: "low", + deliver: false, + surface: "Node", + }, + defaultRuntime, + ctx.deps, + ).catch((err) => { + ctx.logBridge.warn( + `agent failed node=${nodeId}: ${formatForLog(err)}`, + ); + }); + return; + } + case "agent.request": { + if (!evt.payloadJSON) return; + type AgentDeepLink = { + message?: string; + sessionKey?: string | null; + thinking?: string | null; + deliver?: boolean; + to?: string | null; + channel?: string | null; + timeoutSeconds?: number | null; + key?: string | null; + }; + let link: AgentDeepLink | null = null; + try { + link = JSON.parse(evt.payloadJSON) as AgentDeepLink; + } catch { + return; + } + const message = (link?.message ?? "").trim(); + if (!message) return; + if (message.length > 20_000) return; + + const channelRaw = + typeof link?.channel === "string" ? link.channel.trim() : ""; + const channel = channelRaw.toLowerCase(); + const provider = + channel === "whatsapp" || + channel === "telegram" || + channel === "signal" || + channel === "imessage" + ? channel + : undefined; + const to = + typeof link?.to === "string" && link.to.trim() + ? link.to.trim() + : undefined; + const deliver = Boolean(link?.deliver) && Boolean(provider); + + const sessionKeyRaw = (link?.sessionKey ?? "").trim(); + const sessionKey = + sessionKeyRaw.length > 0 ? sessionKeyRaw : `node-${nodeId}`; + const { storePath, store, entry } = loadSessionEntry(sessionKey); + const now = Date.now(); + const sessionId = entry?.sessionId ?? randomUUID(); + store[sessionKey] = { + sessionId, + updatedAt: now, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + systemSent: entry?.systemSent, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + }; + if (storePath) { + await saveSessionStore(storePath, store); + } + + void agentCommand( + { + message, + sessionId, + thinking: link?.thinking ?? undefined, + deliver, + to, + provider, + timeout: + typeof link?.timeoutSeconds === "number" + ? link.timeoutSeconds.toString() + : undefined, + surface: "Node", + }, + defaultRuntime, + ctx.deps, + ).catch((err) => { + ctx.logBridge.warn( + `agent failed node=${nodeId}: ${formatForLog(err)}`, + ); + }); + return; + } + case "chat.subscribe": { + if (!evt.payloadJSON) return; + let payload: unknown; + try { + payload = JSON.parse(evt.payloadJSON) as unknown; + } catch { + return; + } + const obj = + typeof payload === "object" && payload !== null + ? (payload as Record) + : {}; + const sessionKey = + typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : ""; + if (!sessionKey) return; + ctx.bridgeSubscribe(nodeId, sessionKey); + return; + } + case "chat.unsubscribe": { + if (!evt.payloadJSON) return; + let payload: unknown; + try { + payload = JSON.parse(evt.payloadJSON) as unknown; + } catch { + return; + } + const obj = + typeof payload === "object" && payload !== null + ? (payload as Record) + : {}; + const sessionKey = + typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : ""; + if (!sessionKey) return; + ctx.bridgeUnsubscribe(nodeId, sessionKey); + return; + } + default: + return; + } + }; + + return { handleBridgeRequest, handleBridgeEvent }; +} diff --git a/src/gateway/server-browser.ts b/src/gateway/server-browser.ts new file mode 100644 index 000000000..8eb19b88b --- /dev/null +++ b/src/gateway/server-browser.ts @@ -0,0 +1,17 @@ +export type BrowserControlServer = { + stop: () => Promise; +}; + +export async function startBrowserControlServerIfEnabled(): Promise< + BrowserControlServer | null +> { + if (process.env.CLAWDIS_SKIP_BROWSER_CONTROL_SERVER === "1") return null; + // Lazy import: keeps startup fast, but still bundles for the embedded + // gateway (bun --compile) via the static specifier path. + const override = process.env.CLAWDIS_BROWSER_CONTROL_MODULE?.trim(); + const mod = override + ? await import(override) + : await import("../browser/server.js"); + await mod.startBrowserControlServerFromConfig(); + return { stop: mod.stopBrowserControlServer }; +} diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts new file mode 100644 index 000000000..2d0687e47 --- /dev/null +++ b/src/gateway/server-chat.ts @@ -0,0 +1,248 @@ +import type { AgentEventPayload } from "../infra/agent-events.js"; +import { formatForLog } from "./ws-log.js"; + +export type ChatRunEntry = { + sessionKey: string; + clientRunId: string; +}; + +export type ChatRunRegistry = { + add: (sessionId: string, entry: ChatRunEntry) => void; + peek: (sessionId: string) => ChatRunEntry | undefined; + shift: (sessionId: string) => ChatRunEntry | undefined; + remove: ( + sessionId: string, + clientRunId: string, + sessionKey?: string, + ) => ChatRunEntry | undefined; + clear: () => void; +}; + +export function createChatRunRegistry(): ChatRunRegistry { + const chatRunSessions = new Map(); + + const add = (sessionId: string, entry: ChatRunEntry) => { + const queue = chatRunSessions.get(sessionId); + if (queue) { + queue.push(entry); + } else { + chatRunSessions.set(sessionId, [entry]); + } + }; + + const peek = (sessionId: string) => chatRunSessions.get(sessionId)?.[0]; + + const shift = (sessionId: string) => { + const queue = chatRunSessions.get(sessionId); + if (!queue || queue.length === 0) return undefined; + const entry = queue.shift(); + if (!queue.length) chatRunSessions.delete(sessionId); + return entry; + }; + + const remove = ( + sessionId: string, + clientRunId: string, + sessionKey?: string, + ) => { + const queue = chatRunSessions.get(sessionId); + if (!queue || queue.length === 0) return undefined; + const idx = queue.findIndex( + (entry) => + entry.clientRunId === clientRunId && + (sessionKey ? entry.sessionKey === sessionKey : true), + ); + if (idx < 0) return undefined; + const [entry] = queue.splice(idx, 1); + if (!queue.length) chatRunSessions.delete(sessionId); + return entry; + }; + + const clear = () => { + chatRunSessions.clear(); + }; + + return { add, peek, shift, remove, clear }; +} + +export type ChatRunState = { + registry: ChatRunRegistry; + buffers: Map; + deltaSentAt: Map; + clear: () => void; +}; + +export function createChatRunState(): ChatRunState { + const registry = createChatRunRegistry(); + const buffers = new Map(); + const deltaSentAt = new Map(); + + const clear = () => { + registry.clear(); + buffers.clear(); + deltaSentAt.clear(); + }; + + return { + registry, + buffers, + deltaSentAt, + clear, + }; +} + +export type ChatEventBroadcast = ( + event: string, + payload: unknown, + opts?: { dropIfSlow?: boolean }, +) => void; + +export type BridgeSendToSession = ( + sessionKey: string, + event: string, + payload: unknown, +) => void; + +export type AgentEventHandlerOptions = { + broadcast: ChatEventBroadcast; + bridgeSendToSession: BridgeSendToSession; + agentRunSeq: Map; + chatRunState: ChatRunState; + resolveSessionKeyForRun: (runId: string) => string | undefined; + clearAgentRunContext: (runId: string) => void; +}; + +export function createAgentEventHandler({ + broadcast, + bridgeSendToSession, + agentRunSeq, + chatRunState, + resolveSessionKeyForRun, + clearAgentRunContext, +}: AgentEventHandlerOptions) { + const emitChatDelta = ( + sessionKey: string, + clientRunId: string, + seq: number, + text: string, + ) => { + chatRunState.buffers.set(clientRunId, text); + const now = Date.now(); + const last = chatRunState.deltaSentAt.get(clientRunId) ?? 0; + if (now - last < 150) return; + chatRunState.deltaSentAt.set(clientRunId, now); + const payload = { + runId: clientRunId, + sessionKey, + seq, + state: "delta" as const, + message: { + role: "assistant", + content: [{ type: "text", text }], + timestamp: now, + }, + }; + broadcast("chat", payload, { dropIfSlow: true }); + bridgeSendToSession(sessionKey, "chat", payload); + }; + + const emitChatFinal = ( + sessionKey: string, + clientRunId: string, + seq: number, + jobState: "done" | "error", + error?: unknown, + ) => { + const text = chatRunState.buffers.get(clientRunId)?.trim() ?? ""; + chatRunState.buffers.delete(clientRunId); + chatRunState.deltaSentAt.delete(clientRunId); + if (jobState === "done") { + const payload = { + runId: clientRunId, + sessionKey, + seq, + state: "final" as const, + message: text + ? { + role: "assistant", + content: [{ type: "text", text }], + timestamp: Date.now(), + } + : undefined, + }; + broadcast("chat", payload); + bridgeSendToSession(sessionKey, "chat", payload); + return; + } + const payload = { + runId: clientRunId, + sessionKey, + seq, + state: "error" as const, + errorMessage: error ? formatForLog(error) : undefined, + }; + broadcast("chat", payload); + bridgeSendToSession(sessionKey, "chat", payload); + }; + + return (evt: AgentEventPayload) => { + const last = agentRunSeq.get(evt.runId) ?? 0; + if (evt.seq !== last + 1) { + broadcast("agent", { + runId: evt.runId, + stream: "error", + ts: Date.now(), + data: { + reason: "seq gap", + expected: last + 1, + received: evt.seq, + }, + }); + } + agentRunSeq.set(evt.runId, evt.seq); + broadcast("agent", evt); + + const chatLink = chatRunState.registry.peek(evt.runId); + const sessionKey = + chatLink?.sessionKey ?? resolveSessionKeyForRun(evt.runId); + const jobState = + evt.stream === "job" && typeof evt.data?.state === "string" + ? (evt.data.state as "done" | "error" | string) + : null; + + if (sessionKey) { + bridgeSendToSession(sessionKey, "agent", evt); + if (evt.stream === "assistant" && typeof evt.data?.text === "string") { + const clientRunId = chatLink?.clientRunId ?? evt.runId; + emitChatDelta(sessionKey, clientRunId, evt.seq, evt.data.text); + } else if (jobState === "done" || jobState === "error") { + if (chatLink) { + const finished = chatRunState.registry.shift(evt.runId); + if (!finished) { + clearAgentRunContext(evt.runId); + return; + } + emitChatFinal( + finished.sessionKey, + finished.clientRunId, + evt.seq, + jobState, + evt.data?.error, + ); + } else { + emitChatFinal( + sessionKey, + evt.runId, + evt.seq, + jobState, + evt.data?.error, + ); + } + } + } + + if (jobState === "done" || jobState === "error") { + clearAgentRunContext(evt.runId); + } + }; +} diff --git a/src/gateway/server-discovery.ts b/src/gateway/server-discovery.ts new file mode 100644 index 000000000..7418736db --- /dev/null +++ b/src/gateway/server-discovery.ts @@ -0,0 +1,76 @@ +import fs from "node:fs"; +import path from "node:path"; +import { getTailnetHostname } from "../infra/tailscale.js"; +import { runExec } from "../process/exec.js"; + +export type ResolveBonjourCliPathOptions = { + env?: NodeJS.ProcessEnv; + argv?: string[]; + execPath?: string; + cwd?: string; + statSync?: (path: string) => fs.Stats; +}; + +export function formatBonjourInstanceName(displayName: string) { + const trimmed = displayName.trim(); + if (!trimmed) return "Clawdis"; + if (/clawdis/i.test(trimmed)) return trimmed; + return `${trimmed} (Clawdis)`; +} + +export function resolveBonjourCliPath( + opts: ResolveBonjourCliPathOptions = {}, +): string | undefined { + const env = opts.env ?? process.env; + const envPath = env.CLAWDIS_CLI_PATH?.trim(); + if (envPath) return envPath; + + const statSync = opts.statSync ?? fs.statSync; + const isFile = (candidate: string) => { + try { + return statSync(candidate).isFile(); + } catch { + return false; + } + }; + + const execPath = opts.execPath ?? process.execPath; + const execDir = path.dirname(execPath); + const siblingCli = path.join(execDir, "clawdis"); + if (isFile(siblingCli)) return siblingCli; + + const argv = opts.argv ?? process.argv; + const argvPath = argv[1]; + if (argvPath && isFile(argvPath)) { + const base = path.basename(argvPath); + if (!base.includes("gateway-daemon")) return argvPath; + } + + const cwd = opts.cwd ?? process.cwd(); + const distCli = path.join(cwd, "dist", "index.js"); + if (isFile(distCli)) return distCli; + const binCli = path.join(cwd, "bin", "clawdis.js"); + if (isFile(binCli)) return binCli; + + return undefined; +} + +export async function resolveTailnetDnsHint(opts?: { + env?: NodeJS.ProcessEnv; + exec?: typeof runExec; +}): Promise { + const env = opts?.env ?? process.env; + const envRaw = env.CLAWDIS_TAILNET_DNS?.trim(); + const envValue = envRaw && envRaw.length > 0 ? envRaw.replace(/\.$/, "") : ""; + if (envValue) return envValue; + + const exec = + opts?.exec ?? + ((command, args) => + runExec(command, args, { timeoutMs: 1500, maxBuffer: 200_000 })); + try { + return await getTailnetHostname(exec); + } catch { + return undefined; + } +} diff --git a/src/gateway/server-shared.ts b/src/gateway/server-shared.ts new file mode 100644 index 000000000..919fafe5d --- /dev/null +++ b/src/gateway/server-shared.ts @@ -0,0 +1,8 @@ +import type { ErrorShape } from "./protocol/index.js"; + +export type DedupeEntry = { + ts: number; + ok: boolean; + payload?: unknown; + error?: ErrorShape; +}; diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 4d0041d14..a998a86c7 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -1,8 +1,6 @@ import { randomUUID } from "node:crypto"; -import fs from "node:fs"; import type { Server as HttpServer } from "node:http"; import os from "node:os"; -import path from "node:path"; import chalk from "chalk"; import { type WebSocket, WebSocketServer } from "ws"; import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../agents/defaults.js"; @@ -11,19 +9,7 @@ import { type ModelCatalogEntry, resetModelCatalogCacheForTest, } from "../agents/model-catalog.js"; -import { - buildAllowedModelSet, - buildModelAliasIndex, - modelKey, - resolveConfiguredModelRef, - resolveModelRefFromString, - resolveThinkingDefault, -} from "../agents/model-selection.js"; -import { normalizeGroupActivation } from "../auto-reply/group-activation.js"; -import { - normalizeThinkLevel, - normalizeVerboseLevel, -} from "../auto-reply/thinking.js"; +import { resolveConfiguredModelRef } from "../agents/model-selection.js"; import { CANVAS_HOST_PATH } from "../canvas-host/a2ui.js"; import { type CanvasHostHandler, @@ -32,25 +18,19 @@ import { startCanvasHost, } from "../canvas-host/server.js"; import { createDefaultDeps } from "../cli/deps.js"; -import { agentCommand } from "../commands/agent.js"; import { getHealthSnapshot, type HealthSummary } from "../commands/health.js"; import { CONFIG_PATH_CLAWDIS, isNixMode, loadConfig, migrateLegacyConfig, - parseConfigJson5, readConfigFileSnapshot, STATE_DIR_CLAWDIS, - validateConfigObject, writeConfigFile, } from "../config/config.js"; -import { buildConfigSchema } from "../config/schema.js"; import { loadSessionStore, resolveStorePath, - type SessionEntry, - saveSessionStore, } from "../config/sessions.js"; import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js"; import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js"; @@ -91,7 +71,6 @@ import { } from "../infra/tailscale.js"; import { loadVoiceWakeConfig, - setVoiceWakeTriggers, } from "../infra/voicewake.js"; import { WIDE_AREA_DISCOVERY_DOMAIN, @@ -105,8 +84,6 @@ import { runtimeForLogger, } from "../logging.js"; import { setCommandLaneConcurrency } from "../process/command-queue.js"; -import { runExec } from "../process/exec.js"; -import { defaultRuntime } from "../runtime.js"; import { runOnboardingWizard } from "../wizard/onboarding.js"; import type { WizardSession } from "../wizard/session.js"; import { @@ -114,7 +91,6 @@ import { authorizeGatewayConnect, type ResolvedGatewayAuth, } from "./auth.js"; -import { buildMessageWithAttachments } from "./chat-attachments.js"; import { normalizeControlUiBasePath } from "./control-ui.js"; import { resolveHooksConfig } from "./hooks.js"; import { @@ -128,10 +104,21 @@ import { HANDSHAKE_TIMEOUT_MS, HEALTH_REFRESH_INTERVAL_MS, MAX_BUFFERED_BYTES, - MAX_CHAT_HISTORY_MESSAGES_BYTES, MAX_PAYLOAD_BYTES, TICK_INTERVAL_MS, } from "./server-constants.js"; +import { startBrowserControlServerIfEnabled } from "./server-browser.js"; +import { createBridgeHandlers } from "./server-bridge.js"; +import { createBridgeSubscriptionManager } from "./server-bridge-subscriptions.js"; +import { + createAgentEventHandler, + createChatRunState, +} from "./server-chat.js"; +import { + formatBonjourInstanceName, + resolveBonjourCliPath, + resolveTailnetDnsHint, +} from "./server-discovery.js"; import { attachGatewayUpgradeHandler, createGatewayHttpServer, @@ -139,17 +126,8 @@ import { } from "./server-http.js"; import { handleGatewayRequest } from "./server-methods.js"; import { createProviderManager } from "./server-providers.js"; -import { formatError, normalizeVoiceWakeTriggers } from "./server-utils.js"; -import { - archiveFileOnDisk, - capArrayByJsonBytes, - listSessionsFromStore, - loadSessionEntry, - readSessionMessages, - resolveSessionModelRef, - resolveSessionTranscriptCandidates, - type SessionsPatchResult, -} from "./session-utils.js"; +import { formatError } from "./server-utils.js"; +import type { DedupeEntry } from "./server-shared.js"; import { formatForLog, logWs, summarizeAgentEventForWsLog } from "./ws-log.js"; ensureClawdisCliOnPath(); @@ -177,51 +155,6 @@ const discordRuntimeEnv = runtimeForLogger(logDiscord); const signalRuntimeEnv = runtimeForLogger(logSignal); const imessageRuntimeEnv = runtimeForLogger(logIMessage); -function resolveBonjourCliPath(): string | undefined { - const envPath = process.env.CLAWDIS_CLI_PATH?.trim(); - if (envPath) return envPath; - - const isFile = (candidate: string) => { - try { - return fs.statSync(candidate).isFile(); - } catch { - return false; - } - }; - - const execDir = path.dirname(process.execPath); - const siblingCli = path.join(execDir, "clawdis"); - if (isFile(siblingCli)) return siblingCli; - - const argvPath = process.argv[1]; - if (argvPath && isFile(argvPath)) { - const base = path.basename(argvPath); - if (!base.includes("gateway-daemon")) return argvPath; - } - - const cwd = process.cwd(); - const distCli = path.join(cwd, "dist", "index.js"); - if (isFile(distCli)) return distCli; - const binCli = path.join(cwd, "bin", "clawdis.js"); - if (isFile(binCli)) return binCli; - - return undefined; -} - -let stopBrowserControlServerIfStarted: (() => Promise) | null = null; - -async function startBrowserControlServerIfEnabled(): Promise { - if (process.env.CLAWDIS_SKIP_BROWSER_CONTROL_SERVER === "1") return; - // Lazy import: keeps startup fast, but still bundles for the embedded - // gateway (bun --compile) via the static specifier path. - const override = process.env.CLAWDIS_BROWSER_CONTROL_MODULE?.trim(); - const mod = override - ? await import(override) - : await import("../browser/server.js"); - stopBrowserControlServerIfStarted = mod.stopBrowserControlServer; - await mod.startBrowserControlServerFromConfig(); -} - type GatewayModelChoice = ModelCatalogEntry; // Test-only escape hatch: model catalog is cached at module scope for the @@ -243,27 +176,9 @@ import { formatValidationErrors, PROTOCOL_VERSION, type RequestFrame, - type SessionsCompactParams, - type SessionsDeleteParams, - type SessionsListParams, - type SessionsPatchParams, - type SessionsResetParams, type Snapshot, - validateChatAbortParams, - validateChatHistoryParams, - validateChatSendParams, - validateConfigGetParams, - validateConfigSchemaParams, - validateConfigSetParams, validateConnectParams, - validateModelsListParams, validateRequestFrame, - validateSessionsCompactParams, - validateSessionsDeleteParams, - validateSessionsListParams, - validateSessionsPatchParams, - validateSessionsResetParams, - validateTalkModeParams, } from "./protocol/index.js"; type Client = { @@ -273,27 +188,6 @@ type Client = { presenceKey?: string; }; -function formatBonjourInstanceName(displayName: string) { - const trimmed = displayName.trim(); - if (!trimmed) return "Clawdis"; - if (/clawdis/i.test(trimmed)) return trimmed; - return `${trimmed} (Clawdis)`; -} - -async function resolveTailnetDnsHint(): Promise { - const envRaw = process.env.CLAWDIS_TAILNET_DNS?.trim(); - const env = envRaw && envRaw.length > 0 ? envRaw.replace(/\.$/, "") : ""; - if (env) return env; - - const exec: typeof runExec = (command, args) => - runExec(command, args, { timeoutMs: 1500, maxBuffer: 200_000 }); - try { - return await getTailnetHostname(exec); - } catch { - return undefined; - } -} - const METHODS = [ "health", "providers.status", @@ -435,13 +329,6 @@ function buildSnapshot(): Snapshot { }; } -type DedupeEntry = { - ts: number; - ok: boolean; - payload?: unknown; - error?: ErrorShape; -}; - async function refreshHealthSnapshot(_opts?: { probe?: boolean }) { if (!healthRefresh) { healthRefresh = (async () => { @@ -691,8 +578,7 @@ export async function startGatewayServer( }); let bonjourStop: (() => Promise) | null = null; let bridge: Awaited> | null = null; - const bridgeNodeSubscriptions = new Map>(); - const bridgeSessionSubscribers = new Map>(); + const bridgeSubscriptions = createBridgeSubscriptionManager(); const isMobilePlatform = (platform: unknown): boolean => { const p = typeof platform === "string" ? platform.trim().toLowerCase() : ""; @@ -744,48 +630,12 @@ export async function startGatewayServer( // Track per-run sequence to detect out-of-order/lost agent events. const agentRunSeq = new Map(); const dedupe = new Map(); - // Map agent runId -> pending chat runs for WebChat clients. - const chatRunSessions = new Map< - string, - Array<{ sessionKey: string; clientRunId: string }> - >(); - const addChatRun = ( - sessionId: string, - entry: { sessionKey: string; clientRunId: string }, - ) => { - const queue = chatRunSessions.get(sessionId); - if (queue) { - queue.push(entry); - } else { - chatRunSessions.set(sessionId, [entry]); - } - }; - const peekChatRun = (sessionId: string) => - chatRunSessions.get(sessionId)?.[0]; - const shiftChatRun = (sessionId: string) => { - const queue = chatRunSessions.get(sessionId); - if (!queue || queue.length === 0) return undefined; - const entry = queue.shift(); - if (!queue.length) chatRunSessions.delete(sessionId); - return entry; - }; - const removeChatRun = ( - sessionId: string, - clientRunId: string, - sessionKey?: string, - ) => { - const queue = chatRunSessions.get(sessionId); - if (!queue || queue.length === 0) return undefined; - const idx = queue.findIndex( - (entry) => - entry.clientRunId === clientRunId && - (sessionKey ? entry.sessionKey === sessionKey : true), - ); - if (idx < 0) return undefined; - const [entry] = queue.splice(idx, 1); - if (!queue.length) chatRunSessions.delete(sessionId); - return entry; - }; + const chatRunState = createChatRunState(); + const chatRunRegistry = chatRunState.registry; + const chatRunBuffers = chatRunState.buffers; + const chatDeltaSentAt = chatRunState.deltaSentAt; + const addChatRun = chatRunRegistry.add; + const removeChatRun = chatRunRegistry.remove; const resolveSessionKeyForRun = (runId: string) => { const cached = getAgentRunContext(runId)?.sessionKey; if (cached) return cached; @@ -801,8 +651,6 @@ export async function startGatewayServer( } return sessionKey; }; - const chatRunBuffers = new Map(); - const chatDeltaSentAt = new Map(); const chatAbortControllers = new Map< string, { controller: AbortController; sessionId: string; sessionKey: string } @@ -1005,86 +853,33 @@ export async function startGatewayServer( } } - const bridgeSubscribe = (nodeId: string, sessionKey: string) => { - const normalizedNodeId = nodeId.trim(); - const normalizedSessionKey = sessionKey.trim(); - if (!normalizedNodeId || !normalizedSessionKey) return; - - let nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId); - if (!nodeSet) { - nodeSet = new Set(); - bridgeNodeSubscriptions.set(normalizedNodeId, nodeSet); - } - if (nodeSet.has(normalizedSessionKey)) return; - nodeSet.add(normalizedSessionKey); - - let sessionSet = bridgeSessionSubscribers.get(normalizedSessionKey); - if (!sessionSet) { - sessionSet = new Set(); - bridgeSessionSubscribers.set(normalizedSessionKey, sessionSet); - } - sessionSet.add(normalizedNodeId); - }; - - const bridgeUnsubscribe = (nodeId: string, sessionKey: string) => { - const normalizedNodeId = nodeId.trim(); - const normalizedSessionKey = sessionKey.trim(); - if (!normalizedNodeId || !normalizedSessionKey) return; - - const nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId); - nodeSet?.delete(normalizedSessionKey); - if (nodeSet?.size === 0) bridgeNodeSubscriptions.delete(normalizedNodeId); - - const sessionSet = bridgeSessionSubscribers.get(normalizedSessionKey); - sessionSet?.delete(normalizedNodeId); - if (sessionSet?.size === 0) - bridgeSessionSubscribers.delete(normalizedSessionKey); - }; - - const bridgeUnsubscribeAll = (nodeId: string) => { - const normalizedNodeId = nodeId.trim(); - const nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId); - if (!nodeSet) return; - for (const sessionKey of nodeSet) { - const sessionSet = bridgeSessionSubscribers.get(sessionKey); - sessionSet?.delete(normalizedNodeId); - if (sessionSet?.size === 0) bridgeSessionSubscribers.delete(sessionKey); - } - bridgeNodeSubscriptions.delete(normalizedNodeId); - }; - + const bridgeSubscribe = bridgeSubscriptions.subscribe; + const bridgeUnsubscribe = bridgeSubscriptions.unsubscribe; + const bridgeUnsubscribeAll = bridgeSubscriptions.unsubscribeAll; const bridgeSendToSession = ( sessionKey: string, event: string, payload: unknown, - ) => { - const normalizedSessionKey = sessionKey.trim(); - if (!normalizedSessionKey) return; - const subs = bridgeSessionSubscribers.get(normalizedSessionKey); - if (!subs || subs.size === 0) return; - if (!bridge) return; - - const payloadJSON = payload ? JSON.stringify(payload) : null; - for (const nodeId of subs) { - bridge.sendEvent({ nodeId, event, payloadJSON }); - } - }; - - const bridgeSendToAllSubscribed = (event: string, payload: unknown) => { - if (!bridge) return; - const payloadJSON = payload ? JSON.stringify(payload) : null; - for (const nodeId of bridgeNodeSubscriptions.keys()) { - bridge.sendEvent({ nodeId, event, payloadJSON }); - } - }; - - const bridgeSendToAllConnected = (event: string, payload: unknown) => { - if (!bridge) return; - const payloadJSON = payload ? JSON.stringify(payload) : null; - for (const node of bridge.listConnected()) { - bridge.sendEvent({ nodeId: node.nodeId, event, payloadJSON }); - } - }; + ) => + bridgeSubscriptions.sendToSession( + sessionKey, + event, + payload, + bridge ? (opts) => bridge.sendEvent(opts) : undefined, + ); + const bridgeSendToAllSubscribed = (event: string, payload: unknown) => + bridgeSubscriptions.sendToAllSubscribed( + event, + payload, + bridge ? (opts) => bridge.sendEvent(opts) : undefined, + ); + const bridgeSendToAllConnected = (event: string, payload: unknown) => + bridgeSubscriptions.sendToAllConnected( + event, + payload, + bridge ? () => bridge.listConnected() : undefined, + bridge ? (opts) => bridge.sendEvent(opts) : undefined, + ); const broadcastVoiceWakeChanged = (triggers: string[]) => { const payload = { triggers }; @@ -1092,1032 +887,25 @@ export async function startGatewayServer( bridgeSendToAllConnected("voicewake.changed", payload); }; - const handleBridgeRequest = async ( - nodeId: string, - req: { id: string; method: string; paramsJSON?: string | null }, - ): Promise< - | { ok: true; payloadJSON?: string | null } - | { ok: false; error: { code: string; message: string; details?: unknown } } - > => { - const method = req.method.trim(); - - const parseParams = (): Record => { - const raw = typeof req.paramsJSON === "string" ? req.paramsJSON : ""; - const trimmed = raw.trim(); - if (!trimmed) return {}; - const parsed = JSON.parse(trimmed) as unknown; - return typeof parsed === "object" && parsed !== null - ? (parsed as Record) - : {}; - }; - - try { - switch (method) { - case "voicewake.get": { - const cfg = await loadVoiceWakeConfig(); - return { - ok: true, - payloadJSON: JSON.stringify({ triggers: cfg.triggers }), - }; - } - case "voicewake.set": { - const params = parseParams(); - const triggers = normalizeVoiceWakeTriggers(params.triggers); - const cfg = await setVoiceWakeTriggers(triggers); - broadcastVoiceWakeChanged(cfg.triggers); - return { - ok: true, - payloadJSON: JSON.stringify({ triggers: cfg.triggers }), - }; - } - case "health": { - const now = Date.now(); - const cached = healthCache; - if (cached && now - cached.ts < HEALTH_REFRESH_INTERVAL_MS) { - return { ok: true, payloadJSON: JSON.stringify(cached) }; - } - const snap = await refreshHealthSnapshot({ probe: false }); - return { ok: true, payloadJSON: JSON.stringify(snap) }; - } - case "config.get": { - const params = parseParams(); - if (!validateConfigGetParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid config.get params: ${formatValidationErrors(validateConfigGetParams.errors)}`, - }, - }; - } - const snapshot = await readConfigFileSnapshot(); - return { ok: true, payloadJSON: JSON.stringify(snapshot) }; - } - case "config.schema": { - const params = parseParams(); - if (!validateConfigSchemaParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid config.schema params: ${formatValidationErrors(validateConfigSchemaParams.errors)}`, - }, - }; - } - const schema = buildConfigSchema(); - return { ok: true, payloadJSON: JSON.stringify(schema) }; - } - case "config.set": { - const params = parseParams(); - if (!validateConfigSetParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid config.set params: ${formatValidationErrors(validateConfigSetParams.errors)}`, - }, - }; - } - const rawValue = (params as { raw?: unknown }).raw; - if (typeof rawValue !== "string") { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: "invalid config.set params: raw (string) required", - }, - }; - } - const parsedRes = parseConfigJson5(rawValue); - if (!parsedRes.ok) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: parsedRes.error, - }, - }; - } - const validated = validateConfigObject(parsedRes.parsed); - if (!validated.ok) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: "invalid config", - details: { issues: validated.issues }, - }, - }; - } - await writeConfigFile(validated.config); - return { - ok: true, - payloadJSON: JSON.stringify({ - ok: true, - path: CONFIG_PATH_CLAWDIS, - config: validated.config, - }), - }; - } - case "talk.mode": { - const params = parseParams(); - if (!validateTalkModeParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid talk.mode params: ${formatValidationErrors(validateTalkModeParams.errors)}`, - }, - }; - } - const payload = { - enabled: (params as { enabled: boolean }).enabled, - phase: (params as { phase?: string }).phase ?? null, - ts: Date.now(), - }; - broadcast("talk.mode", payload, { dropIfSlow: true }); - return { ok: true, payloadJSON: JSON.stringify(payload) }; - } - case "models.list": { - const params = parseParams(); - if (!validateModelsListParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid models.list params: ${formatValidationErrors(validateModelsListParams.errors)}`, - }, - }; - } - const models = await loadGatewayModelCatalog(); - return { ok: true, payloadJSON: JSON.stringify({ models }) }; - } - case "sessions.list": { - const params = parseParams(); - if (!validateSessionsListParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid sessions.list params: ${formatValidationErrors(validateSessionsListParams.errors)}`, - }, - }; - } - const p = params as SessionsListParams; - const cfg = loadConfig(); - const storePath = resolveStorePath(cfg.session?.store); - const store = loadSessionStore(storePath); - const result = listSessionsFromStore({ - cfg, - storePath, - store, - opts: p, - }); - return { ok: true, payloadJSON: JSON.stringify(result) }; - } - case "sessions.patch": { - const params = parseParams(); - if (!validateSessionsPatchParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid sessions.patch params: ${formatValidationErrors(validateSessionsPatchParams.errors)}`, - }, - }; - } - - const p = params as SessionsPatchParams; - const key = String(p.key ?? "").trim(); - if (!key) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: "key required", - }, - }; - } - - const cfg = loadConfig(); - const storePath = resolveStorePath(cfg.session?.store); - const store = loadSessionStore(storePath); - const now = Date.now(); - - const existing = store[key]; - const next: SessionEntry = existing - ? { - ...existing, - updatedAt: Math.max(existing.updatedAt ?? 0, now), - } - : { sessionId: randomUUID(), updatedAt: now }; - - if ("thinkingLevel" in p) { - const raw = p.thinkingLevel; - if (raw === null) { - delete next.thinkingLevel; - } else if (raw !== undefined) { - const normalized = normalizeThinkLevel(String(raw)); - if (!normalized) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid thinkingLevel: ${String(raw)}`, - }, - }; - } - next.thinkingLevel = normalized; - } - } - - if ("verboseLevel" in p) { - const raw = p.verboseLevel; - if (raw === null) { - delete next.verboseLevel; - } else if (raw !== undefined) { - const normalized = normalizeVerboseLevel(String(raw)); - if (!normalized) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid verboseLevel: ${String(raw)}`, - }, - }; - } - next.verboseLevel = normalized; - } - } - - if ("model" in p) { - const raw = p.model; - if (raw === null) { - delete next.providerOverride; - delete next.modelOverride; - } else if (raw !== undefined) { - const trimmed = String(raw).trim(); - if (!trimmed) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: "invalid model: empty", - }, - }; - } - const resolvedDefault = resolveConfiguredModelRef({ - cfg, - defaultProvider: DEFAULT_PROVIDER, - defaultModel: DEFAULT_MODEL, - }); - const aliasIndex = buildModelAliasIndex({ - cfg, - defaultProvider: resolvedDefault.provider, - }); - const resolved = resolveModelRefFromString({ - raw: trimmed, - defaultProvider: resolvedDefault.provider, - aliasIndex, - }); - if (!resolved) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid model: ${trimmed}`, - }, - }; - } - const catalog = await loadGatewayModelCatalog(); - const allowed = buildAllowedModelSet({ - cfg, - catalog, - defaultProvider: resolvedDefault.provider, - }); - const key = modelKey(resolved.ref.provider, resolved.ref.model); - if (!allowed.allowAny && !allowed.allowedKeys.has(key)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `model not allowed: ${key}`, - }, - }; - } - if ( - resolved.ref.provider === resolvedDefault.provider && - resolved.ref.model === resolvedDefault.model - ) { - delete next.providerOverride; - delete next.modelOverride; - } else { - next.providerOverride = resolved.ref.provider; - next.modelOverride = resolved.ref.model; - } - } - } - - if ("groupActivation" in p) { - const raw = p.groupActivation; - if (raw === null) { - delete next.groupActivation; - } else if (raw !== undefined) { - const normalized = normalizeGroupActivation(String(raw)); - if (!normalized) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid groupActivation: ${String(raw)}`, - }, - }; - } - next.groupActivation = normalized; - } - } - - store[key] = next; - await saveSessionStore(storePath, store); - const payload: SessionsPatchResult = { - ok: true, - path: storePath, - key, - entry: next, - }; - return { ok: true, payloadJSON: JSON.stringify(payload) }; - } - case "sessions.reset": { - const params = parseParams(); - if (!validateSessionsResetParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid sessions.reset params: ${formatValidationErrors(validateSessionsResetParams.errors)}`, - }, - }; - } - - const p = params as SessionsResetParams; - const key = String(p.key ?? "").trim(); - if (!key) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: "key required", - }, - }; - } - - const { storePath, store, entry } = loadSessionEntry(key); - const now = Date.now(); - const next: SessionEntry = { - sessionId: randomUUID(), - updatedAt: now, - systemSent: false, - abortedLastRun: false, - thinkingLevel: entry?.thinkingLevel, - verboseLevel: entry?.verboseLevel, - model: entry?.model, - contextTokens: entry?.contextTokens, - displayName: entry?.displayName, - chatType: entry?.chatType, - surface: entry?.surface, - subject: entry?.subject, - room: entry?.room, - space: entry?.space, - lastChannel: entry?.lastChannel, - lastTo: entry?.lastTo, - skillsSnapshot: entry?.skillsSnapshot, - }; - store[key] = next; - await saveSessionStore(storePath, store); - return { - ok: true, - payloadJSON: JSON.stringify({ ok: true, key, entry: next }), - }; - } - case "sessions.delete": { - const params = parseParams(); - if (!validateSessionsDeleteParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid sessions.delete params: ${formatValidationErrors(validateSessionsDeleteParams.errors)}`, - }, - }; - } - - const p = params as SessionsDeleteParams; - const key = String(p.key ?? "").trim(); - if (!key) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: "key required", - }, - }; - } - - const deleteTranscript = - typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true; - - const { storePath, store, entry } = loadSessionEntry(key); - const sessionId = entry?.sessionId; - const existed = Boolean(store[key]); - if (existed) delete store[key]; - await saveSessionStore(storePath, store); - - const archived: string[] = []; - if (deleteTranscript && sessionId) { - for (const candidate of resolveSessionTranscriptCandidates( - sessionId, - storePath, - )) { - if (!fs.existsSync(candidate)) continue; - try { - archived.push(archiveFileOnDisk(candidate, "deleted")); - } catch { - // Best-effort; deleting the store entry is the main operation. - } - } - } - - return { - ok: true, - payloadJSON: JSON.stringify({ - ok: true, - key, - deleted: existed, - archived, - }), - }; - } - case "sessions.compact": { - const params = parseParams(); - if (!validateSessionsCompactParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid sessions.compact params: ${formatValidationErrors(validateSessionsCompactParams.errors)}`, - }, - }; - } - - const p = params as SessionsCompactParams; - const key = String(p.key ?? "").trim(); - if (!key) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: "key required", - }, - }; - } - - const maxLines = - typeof p.maxLines === "number" && Number.isFinite(p.maxLines) - ? Math.max(1, Math.floor(p.maxLines)) - : 400; - - const { storePath, store, entry } = loadSessionEntry(key); - const sessionId = entry?.sessionId; - if (!sessionId) { - return { - ok: true, - payloadJSON: JSON.stringify({ - ok: true, - key, - compacted: false, - reason: "no sessionId", - }), - }; - } - - const filePath = resolveSessionTranscriptCandidates( - sessionId, - storePath, - ).find((candidate) => fs.existsSync(candidate)); - if (!filePath) { - return { - ok: true, - payloadJSON: JSON.stringify({ - ok: true, - key, - compacted: false, - reason: "no transcript", - }), - }; - } - - const raw = fs.readFileSync(filePath, "utf-8"); - const lines = raw.split(/\r?\n/).filter((l) => l.trim().length > 0); - if (lines.length <= maxLines) { - return { - ok: true, - payloadJSON: JSON.stringify({ - ok: true, - key, - compacted: false, - kept: lines.length, - }), - }; - } - - const archived = archiveFileOnDisk(filePath, "bak"); - const keptLines = lines.slice(-maxLines); - fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8"); - - // Token counts no longer match; clear so status + UI reflect reality after the next turn. - if (store[key]) { - delete store[key].inputTokens; - delete store[key].outputTokens; - delete store[key].totalTokens; - store[key].updatedAt = Date.now(); - await saveSessionStore(storePath, store); - } - - return { - ok: true, - payloadJSON: JSON.stringify({ - ok: true, - key, - compacted: true, - archived, - kept: keptLines.length, - }), - }; - } - case "chat.history": { - const params = parseParams(); - if (!validateChatHistoryParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid chat.history params: ${formatValidationErrors(validateChatHistoryParams.errors)}`, - }, - }; - } - const { sessionKey, limit } = params as { - sessionKey: string; - limit?: number; - }; - const { cfg, storePath, entry } = loadSessionEntry(sessionKey); - const sessionId = entry?.sessionId; - const rawMessages = - sessionId && storePath - ? readSessionMessages(sessionId, storePath) - : []; - const max = typeof limit === "number" ? limit : 200; - const sliced = - rawMessages.length > max ? rawMessages.slice(-max) : rawMessages; - const capped = capArrayByJsonBytes( - sliced, - MAX_CHAT_HISTORY_MESSAGES_BYTES, - ).items; - let thinkingLevel = entry?.thinkingLevel; - if (!thinkingLevel) { - const configured = cfg.agent?.thinkingDefault; - if (configured) { - thinkingLevel = configured; - } else { - const { provider, model } = resolveSessionModelRef(cfg, entry); - const catalog = await loadGatewayModelCatalog(); - thinkingLevel = resolveThinkingDefault({ - cfg, - provider, - model, - catalog, - }); - } - } - return { - ok: true, - payloadJSON: JSON.stringify({ - sessionKey, - sessionId, - messages: capped, - thinkingLevel, - }), - }; - } - case "chat.abort": { - const params = parseParams(); - if (!validateChatAbortParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid chat.abort params: ${formatValidationErrors(validateChatAbortParams.errors)}`, - }, - }; - } - - const { sessionKey, runId } = params as { - sessionKey: string; - runId: string; - }; - const active = chatAbortControllers.get(runId); - if (!active) { - return { - ok: true, - payloadJSON: JSON.stringify({ ok: true, aborted: false }), - }; - } - if (active.sessionKey !== sessionKey) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: "runId does not match sessionKey", - }, - }; - } - - active.controller.abort(); - chatAbortControllers.delete(runId); - chatRunBuffers.delete(runId); - chatDeltaSentAt.delete(runId); - removeChatRun(active.sessionId, runId, sessionKey); - - const payload = { - runId, - sessionKey, - seq: (agentRunSeq.get(active.sessionId) ?? 0) + 1, - state: "aborted" as const, - }; - broadcast("chat", payload); - bridgeSendToSession(sessionKey, "chat", payload); - return { - ok: true, - payloadJSON: JSON.stringify({ ok: true, aborted: true }), - }; - } - case "chat.send": { - const params = parseParams(); - if (!validateChatSendParams(params)) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: `invalid chat.send params: ${formatValidationErrors(validateChatSendParams.errors)}`, - }, - }; - } - - const p = params as { - sessionKey: string; - message: string; - thinking?: string; - deliver?: boolean; - attachments?: Array<{ - type?: string; - mimeType?: string; - fileName?: string; - content?: unknown; - }>; - timeoutMs?: number; - idempotencyKey: string; - }; - const timeoutMs = Math.min( - Math.max(p.timeoutMs ?? 30_000, 0), - 30_000, - ); - const normalizedAttachments = - p.attachments?.map((a) => ({ - type: typeof a?.type === "string" ? a.type : undefined, - mimeType: - typeof a?.mimeType === "string" ? a.mimeType : undefined, - fileName: - typeof a?.fileName === "string" ? a.fileName : undefined, - content: - typeof a?.content === "string" - ? a.content - : ArrayBuffer.isView(a?.content) - ? Buffer.from( - a.content.buffer, - a.content.byteOffset, - a.content.byteLength, - ).toString("base64") - : undefined, - })) ?? []; - - let messageWithAttachments = p.message; - if (normalizedAttachments.length > 0) { - try { - messageWithAttachments = buildMessageWithAttachments( - p.message, - normalizedAttachments, - { maxBytes: 5_000_000 }, - ); - } catch (err) { - return { - ok: false, - error: { - code: ErrorCodes.INVALID_REQUEST, - message: String(err), - }, - }; - } - } - - const { storePath, store, entry } = loadSessionEntry(p.sessionKey); - const now = Date.now(); - const sessionId = entry?.sessionId ?? randomUUID(); - const sessionEntry: SessionEntry = { - sessionId, - updatedAt: now, - thinkingLevel: entry?.thinkingLevel, - verboseLevel: entry?.verboseLevel, - systemSent: entry?.systemSent, - lastChannel: entry?.lastChannel, - lastTo: entry?.lastTo, - }; - const clientRunId = p.idempotencyKey; - - const cached = dedupe.get(`chat:${clientRunId}`); - if (cached) { - if (cached.ok) { - return { ok: true, payloadJSON: JSON.stringify(cached.payload) }; - } - return { - ok: false, - error: cached.error ?? { - code: ErrorCodes.UNAVAILABLE, - message: "request failed", - }, - }; - } - - try { - const abortController = new AbortController(); - chatAbortControllers.set(clientRunId, { - controller: abortController, - sessionId, - sessionKey: p.sessionKey, - }); - addChatRun(sessionId, { sessionKey: p.sessionKey, clientRunId }); - - if (store) { - store[p.sessionKey] = sessionEntry; - if (storePath) { - await saveSessionStore(storePath, store); - } - } - - await agentCommand( - { - message: messageWithAttachments, - sessionId, - thinking: p.thinking, - deliver: p.deliver, - timeout: Math.ceil(timeoutMs / 1000).toString(), - surface: `Node(${nodeId})`, - abortSignal: abortController.signal, - }, - defaultRuntime, - deps, - ); - const payload = { - runId: clientRunId, - status: "ok" as const, - }; - dedupe.set(`chat:${clientRunId}`, { - ts: Date.now(), - ok: true, - payload, - }); - return { ok: true, payloadJSON: JSON.stringify(payload) }; - } catch (err) { - const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); - const payload = { - runId: clientRunId, - status: "error" as const, - summary: String(err), - }; - dedupe.set(`chat:${clientRunId}`, { - ts: Date.now(), - ok: false, - payload, - error, - }); - return { - ok: false, - error: error ?? { - code: ErrorCodes.UNAVAILABLE, - message: String(err), - }, - }; - } finally { - chatAbortControllers.delete(clientRunId); - } - } - default: - return { - ok: false, - error: { - code: "FORBIDDEN", - message: "Method not allowed", - details: { method }, - }, - }; - } - } catch (err) { - return { - ok: false, - error: { code: ErrorCodes.INVALID_REQUEST, message: String(err) }, - }; - } - }; - - const handleBridgeEvent = async ( - nodeId: string, - evt: { event: string; payloadJSON?: string | null }, - ) => { - switch (evt.event) { - case "voice.transcript": { - if (!evt.payloadJSON) return; - let payload: unknown; - try { - payload = JSON.parse(evt.payloadJSON) as unknown; - } catch { - return; - } - const obj = - typeof payload === "object" && payload !== null - ? (payload as Record) - : {}; - const text = typeof obj.text === "string" ? obj.text.trim() : ""; - if (!text) return; - if (text.length > 20_000) return; - const sessionKeyRaw = - typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : ""; - const mainKey = - (loadConfig().session?.mainKey ?? "main").trim() || "main"; - const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : mainKey; - const { storePath, store, entry } = loadSessionEntry(sessionKey); - const now = Date.now(); - const sessionId = entry?.sessionId ?? randomUUID(); - store[sessionKey] = { - sessionId, - updatedAt: now, - thinkingLevel: entry?.thinkingLevel, - verboseLevel: entry?.verboseLevel, - systemSent: entry?.systemSent, - lastChannel: entry?.lastChannel, - lastTo: entry?.lastTo, - }; - if (storePath) { - await saveSessionStore(storePath, store); - } - - // Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send). - // This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId). - addChatRun(sessionId, { - sessionKey, - clientRunId: `voice-${randomUUID()}`, - }); - - void agentCommand( - { - message: text, - sessionId, - thinking: "low", - deliver: false, - surface: "Node", - }, - defaultRuntime, - deps, - ).catch((err) => { - logBridge.warn(`agent failed node=${nodeId}: ${formatForLog(err)}`); - }); - return; - } - case "agent.request": { - if (!evt.payloadJSON) return; - type AgentDeepLink = { - message?: string; - sessionKey?: string | null; - thinking?: string | null; - deliver?: boolean; - to?: string | null; - channel?: string | null; - timeoutSeconds?: number | null; - key?: string | null; - }; - let link: AgentDeepLink | null = null; - try { - link = JSON.parse(evt.payloadJSON) as AgentDeepLink; - } catch { - return; - } - const message = (link?.message ?? "").trim(); - if (!message) return; - if (message.length > 20_000) return; - - const channelRaw = - typeof link?.channel === "string" ? link.channel.trim() : ""; - const channel = channelRaw.toLowerCase(); - const provider = - channel === "whatsapp" || - channel === "telegram" || - channel === "signal" || - channel === "imessage" - ? channel - : undefined; - const to = - typeof link?.to === "string" && link.to.trim() - ? link.to.trim() - : undefined; - const deliver = Boolean(link?.deliver) && Boolean(provider); - - const sessionKeyRaw = (link?.sessionKey ?? "").trim(); - const sessionKey = - sessionKeyRaw.length > 0 ? sessionKeyRaw : `node-${nodeId}`; - const { storePath, store, entry } = loadSessionEntry(sessionKey); - const now = Date.now(); - const sessionId = entry?.sessionId ?? randomUUID(); - store[sessionKey] = { - sessionId, - updatedAt: now, - thinkingLevel: entry?.thinkingLevel, - verboseLevel: entry?.verboseLevel, - systemSent: entry?.systemSent, - lastChannel: entry?.lastChannel, - lastTo: entry?.lastTo, - }; - if (storePath) { - await saveSessionStore(storePath, store); - } - - void agentCommand( - { - message, - sessionId, - thinking: link?.thinking ?? undefined, - deliver, - to, - provider, - timeout: - typeof link?.timeoutSeconds === "number" - ? link.timeoutSeconds.toString() - : undefined, - surface: "Node", - }, - defaultRuntime, - deps, - ).catch((err) => { - logBridge.warn(`agent failed node=${nodeId}: ${formatForLog(err)}`); - }); - return; - } - case "chat.subscribe": { - if (!evt.payloadJSON) return; - let payload: unknown; - try { - payload = JSON.parse(evt.payloadJSON) as unknown; - } catch { - return; - } - const obj = - typeof payload === "object" && payload !== null - ? (payload as Record) - : {}; - const sessionKey = - typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : ""; - if (!sessionKey) return; - bridgeSubscribe(nodeId, sessionKey); - return; - } - case "chat.unsubscribe": { - if (!evt.payloadJSON) return; - let payload: unknown; - try { - payload = JSON.parse(evt.payloadJSON) as unknown; - } catch { - return; - } - const obj = - typeof payload === "object" && payload !== null - ? (payload as Record) - : {}; - const sessionKey = - typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : ""; - if (!sessionKey) return; - bridgeUnsubscribe(nodeId, sessionKey); - return; - } - default: - return; - } - }; + const { handleBridgeRequest, handleBridgeEvent } = createBridgeHandlers({ + deps, + broadcast, + bridgeSendToSession, + bridgeSubscribe, + bridgeUnsubscribe, + broadcastVoiceWakeChanged, + addChatRun, + removeChatRun, + chatAbortControllers, + chatRunBuffers, + chatDeltaSentAt, + dedupe, + agentRunSeq, + getHealthCache: () => healthCache, + refreshHealthSnapshot, + loadGatewayModelCatalog, + logBridge, + }); const machineDisplayName = await getMachineDisplayName(); const canvasHostPortForBridge = canvasHostServer?.port; @@ -2327,172 +1115,16 @@ export async function startGatewayServer( } }, 60_000); - const agentUnsub = onAgentEvent((evt) => { - const last = agentRunSeq.get(evt.runId) ?? 0; - if (evt.seq !== last + 1) { - // Fan out an error event so clients can refresh the stream on gaps. - broadcast("agent", { - runId: evt.runId, - stream: "error", - ts: Date.now(), - data: { - reason: "seq gap", - expected: last + 1, - received: evt.seq, - }, - }); - } - agentRunSeq.set(evt.runId, evt.seq); - broadcast("agent", evt); - - const chatLink = peekChatRun(evt.runId); - const sessionKey = - chatLink?.sessionKey ?? resolveSessionKeyForRun(evt.runId); - const jobState = - evt.stream === "job" && typeof evt.data?.state === "string" - ? evt.data.state - : null; - - if (sessionKey) { - if (chatLink) { - // Map agent bus events to chat events for WS WebChat clients. - // Use clientRunId so the webchat can correlate with its pending promise. - const { clientRunId } = chatLink; - bridgeSendToSession(sessionKey, "agent", evt); - if (evt.stream === "assistant" && typeof evt.data?.text === "string") { - const base = { - runId: clientRunId, - sessionKey, - seq: evt.seq, - }; - chatRunBuffers.set(clientRunId, evt.data.text); - const now = Date.now(); - const last = chatDeltaSentAt.get(clientRunId) ?? 0; - // Throttle UI delta events so slow clients don't accumulate unbounded buffers. - if (now - last >= 150) { - chatDeltaSentAt.set(clientRunId, now); - const payload = { - ...base, - state: "delta" as const, - message: { - role: "assistant", - content: [{ type: "text", text: evt.data.text }], - timestamp: now, - }, - }; - broadcast("chat", payload, { dropIfSlow: true }); - bridgeSendToSession(sessionKey, "chat", payload); - } - } else if (jobState === "done" || jobState === "error") { - const finished = shiftChatRun(evt.runId); - if (!finished) { - if (jobState) clearAgentRunContext(evt.runId); - return; - } - const { sessionKey: finishedSessionKey, clientRunId: finishedRunId } = - finished; - const base = { - runId: finishedRunId, - sessionKey: finishedSessionKey, - seq: evt.seq, - }; - const text = chatRunBuffers.get(finishedRunId)?.trim() ?? ""; - chatRunBuffers.delete(finishedRunId); - chatDeltaSentAt.delete(finishedRunId); - if (jobState === "done") { - const payload = { - ...base, - state: "final", - message: text - ? { - role: "assistant", - content: [{ type: "text", text }], - timestamp: Date.now(), - } - : undefined, - }; - broadcast("chat", payload); - bridgeSendToSession(finishedSessionKey, "chat", payload); - } else { - const payload = { - ...base, - state: "error", - errorMessage: evt.data.error - ? formatForLog(evt.data.error) - : undefined, - }; - broadcast("chat", payload); - bridgeSendToSession(finishedSessionKey, "chat", payload); - } - } - } else { - const clientRunId = evt.runId; - bridgeSendToSession(sessionKey, "agent", evt); - if (evt.stream === "assistant" && typeof evt.data?.text === "string") { - const base = { - runId: clientRunId, - sessionKey, - seq: evt.seq, - }; - chatRunBuffers.set(clientRunId, evt.data.text); - const now = Date.now(); - const last = chatDeltaSentAt.get(clientRunId) ?? 0; - if (now - last >= 150) { - chatDeltaSentAt.set(clientRunId, now); - const payload = { - ...base, - state: "delta" as const, - message: { - role: "assistant", - content: [{ type: "text", text: evt.data.text }], - timestamp: now, - }, - }; - broadcast("chat", payload, { dropIfSlow: true }); - bridgeSendToSession(sessionKey, "chat", payload); - } - } else if (jobState === "done" || jobState === "error") { - const base = { - runId: clientRunId, - sessionKey, - seq: evt.seq, - }; - const text = chatRunBuffers.get(clientRunId)?.trim() ?? ""; - chatRunBuffers.delete(clientRunId); - chatDeltaSentAt.delete(clientRunId); - if (jobState === "done") { - const payload = { - ...base, - state: "final", - message: text - ? { - role: "assistant", - content: [{ type: "text", text }], - timestamp: Date.now(), - } - : undefined, - }; - broadcast("chat", payload); - bridgeSendToSession(sessionKey, "chat", payload); - } else { - const payload = { - ...base, - state: "error", - errorMessage: evt.data.error - ? formatForLog(evt.data.error) - : undefined, - }; - broadcast("chat", payload); - bridgeSendToSession(sessionKey, "chat", payload); - } - } - } - } - - if (jobState === "done" || jobState === "error") { - clearAgentRunContext(evt.runId); - } - }); + const agentUnsub = onAgentEvent( + createAgentEventHandler({ + broadcast, + bridgeSendToSession, + agentRunSeq, + chatRunState, + resolveSessionKeyForRun, + clearAgentRunContext, + }), + ); const heartbeatUnsub = onHeartbeatEvent((evt) => { broadcast("heartbeat", evt, { dropIfSlow: true }); @@ -2922,8 +1554,11 @@ export async function startGatewayServer( } // Start clawd browser control server (unless disabled via config). + let browserControl: Awaited< + ReturnType + > = null; try { - await startBrowserControlServerIfEnabled(); + browserControl = await startBrowserControlServerIfEnabled(); } catch (err) { logBrowser.error(`server failed to start: ${String(err)}`); } @@ -3028,8 +1663,7 @@ export async function startGatewayServer( /* ignore */ } } - chatRunSessions.clear(); - chatRunBuffers.clear(); + chatRunState.clear(); for (const c of clients) { try { c.socket.close(1012, "service restart"); @@ -3038,8 +1672,8 @@ export async function startGatewayServer( } } clients.clear(); - if (stopBrowserControlServerIfStarted) { - await stopBrowserControlServerIfStarted().catch(() => {}); + if (browserControl) { + await browserControl.stop().catch(() => {}); } await new Promise((resolve) => wss.close(() => resolve())); await new Promise((resolve, reject) =>