diff --git a/apps/macos/Sources/Clawdis/Resources/WebChat/bootstrap.js b/apps/macos/Sources/Clawdis/Resources/WebChat/bootstrap.js index 730ff1720..e0390356e 100644 --- a/apps/macos/Sources/Clawdis/Resources/WebChat/bootstrap.js +++ b/apps/macos/Sources/Clawdis/Resources/WebChat/bootstrap.js @@ -1,10 +1,9 @@ // Bundled entry point for the macOS WKWebView web chat. -// This replaces the inline module script in index.html so we can ship a single JS bundle. +// New version: talks directly to the Gateway WebSocket (chat.* methods), no /rpc or file watchers. /* global window, document */ if (!globalThis.process) { - // Some vendor modules peek at process.env; provide a minimal stub for browser. globalThis.process = { env: {} }; } @@ -18,59 +17,131 @@ const logStatus = (msg) => { } }; -// Keep the WebChat UI in lockstep with the host system theme. -const setupSystemThemeSync = () => { - const mql = window.matchMedia?.("(prefers-color-scheme: dark)"); - if (!mql) return; - - const apply = (isDark) => { - document.documentElement.classList.toggle("dark", isDark); - document.body?.classList.toggle("dark", isDark); - }; - - // Set initial theme immediately. - apply(mql.matches); - - // React to live theme switches (e.g., macOS Light <-> Dark). - const onChange = (event) => apply(event.matches); - if (mql.addEventListener) { - mql.addEventListener("change", onChange); - } else { - mql.addListener(onChange); // Safari < 14 fallback - } +const randomId = () => { + if (globalThis.crypto?.randomUUID) return globalThis.crypto.randomUUID(); + return `id-${Math.random().toString(16).slice(2)}-${Date.now()}`; }; -async function fetchBootstrap() { - const params = new URLSearchParams(window.location.search); - const sessionKey = params.get("session") || "main"; - const infoUrl = new URL(`./info?session=${encodeURIComponent(sessionKey)}`, window.location.href); - const infoResp = await fetch(infoUrl, { credentials: "omit" }); - if (!infoResp.ok) { - throw new Error(`webchat info failed (${infoResp.status})`); +class GatewaySocket { + constructor(url) { + this.url = url; + this.ws = null; + this.pending = new Map(); + this.handlers = new Map(); + } + + async connect() { + return new Promise((resolve, reject) => { + const ws = new WebSocket(this.url); + this.ws = ws; + + ws.onopen = () => { + const hello = { + type: "hello", + minProtocol: 1, + maxProtocol: 1, + client: { + name: "webchat-ui", + version: "dev", + platform: "browser", + mode: "webchat", + instanceId: randomId(), + }, + }; + ws.send(JSON.stringify(hello)); + }; + + ws.onerror = (err) => reject(err); + + ws.onclose = (ev) => { + if (this.pending.size > 0) { + for (const [, p] of this.pending) + p.reject(new Error("gateway closed")); + this.pending.clear(); + } + if (ev.code !== 1000) reject(new Error(`gateway closed ${ev.code}`)); + }; + + ws.onmessage = (ev) => { + let msg; + try { + msg = JSON.parse(ev.data); + } catch { + return; + } + if (msg.type === "hello-ok") { + this.handlers.set("snapshot", msg.snapshot); + resolve(msg); + return; + } + if (msg.type === "event") { + const cb = this.handlers.get(msg.event); + if (cb) cb(msg.payload, msg); + return; + } + if (msg.type === "res") { + const pending = this.pending.get(msg.id); + if (!pending) return; + this.pending.delete(msg.id); + if (msg.ok) pending.resolve(msg.payload); + else pending.reject(new Error(msg.error?.message || "gateway error")); + } + }; + }); + } + + on(event, handler) { + this.handlers.set(event, handler); + } + + async request(method, params, { timeoutMs = 30_000 } = {}) { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + throw new Error("gateway not connected"); + } + const id = randomId(); + const frame = { type: "req", id, method, params }; + this.ws.send(JSON.stringify(frame)); + return new Promise((resolve, reject) => { + this.pending.set(id, { resolve, reject }); + setTimeout(() => { + if (this.pending.has(id)) { + this.pending.delete(id); + reject(new Error(`${method} timed out`)); + } + }, timeoutMs); + }); } - const info = await infoResp.json(); - return { - sessionKey, - basePath: info.basePath || "/webchat/", - initialMessages: Array.isArray(info.initialMessages) ? info.initialMessages : [], - thinkingLevel: typeof info.thinkingLevel === "string" ? info.thinkingLevel : "off", - }; } -function latestTimestamp(messages) { - if (!Array.isArray(messages) || messages.length === 0) return 0; - const withTs = messages.filter((m) => typeof m?.timestamp === "number"); - if (withTs.length === 0) return messages.length; // best-effort monotonic fallback - return withTs[withTs.length - 1].timestamp; -} - -class NativeTransport { - constructor(sessionKey) { +class ChatTransport { + constructor(sessionKey, gateway, healthOkRef) { this.sessionKey = sessionKey; + this.gateway = gateway; + this.healthOkRef = healthOkRef; + this.pendingRuns = new Map(); + + this.gateway.on("chat", (payload) => { + const runId = payload?.runId; + const pending = runId ? this.pendingRuns.get(runId) : null; + if (!pending) return; + if (payload.state === "error") { + pending.reject(new Error(payload.errorMessage || "chat error")); + this.pendingRuns.delete(runId); + return; + } + if (payload.state === "delta") return; // ignore partials for now + pending.resolve(payload); + this.pendingRuns.delete(runId); + }); } - async *run(messages, userMessage, cfg, signal) { - const attachments = userMessage.attachments?.map((a) => ({ + async *run(_messages, userMessage, cfg, _signal) { + if (!this.healthOkRef.current) { + throw new Error("gateway health not OK; cannot send"); + } + + const text = userMessage.content?.[0]?.text ?? ""; + const attachments = (userMessage.attachments || []).map((a) => ({ type: a.type, mimeType: a.mimeType, fileName: a.fileName, @@ -79,79 +150,66 @@ class NativeTransport { ? a.content : btoa(String.fromCharCode(...new Uint8Array(a.content))), })); - const rpcUrl = new URL("./rpc", window.location.href); - const rpcBody = { - text: userMessage.content?.[0]?.text ?? "", - session: this.sessionKey, - attachments, - }; - if (cfg?.thinkingOnce) { - rpcBody.thinkingOnce = cfg.thinkingOnce; - } else if (cfg?.thinkingOverride) { - rpcBody.thinking = cfg.thinkingOverride; - } - const resultResp = await fetch(rpcUrl, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(rpcBody), - signal, + const thinking = + cfg?.thinkingOnce ?? cfg?.thinkingOverride ?? cfg?.thinking ?? undefined; + const runId = randomId(); + + const pending = new Promise((resolve, reject) => { + this.pendingRuns.set(runId, { resolve, reject }); + setTimeout(() => { + if (this.pendingRuns.has(runId)) { + this.pendingRuns.delete(runId); + reject(new Error("chat timed out")); + } + }, 30_000); }); - if (!resultResp.ok) { - throw new Error(`rpc failed (${resultResp.status})`); - } - const body = await resultResp.json(); - if (!body.ok) { - throw new Error(body.error || "rpc error"); - } - const first = Array.isArray(body.payloads) ? body.payloads[0] : undefined; - const text = (first?.text ?? "").toString(); + await this.gateway.request("chat.send", { + sessionKey: this.sessionKey, + message: text, + attachments: attachments.length ? attachments : undefined, + thinking, + idempotencyKey: runId, + timeoutMs: 30_000, + }); - const usage = { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }; - const assistant = { + yield { type: "turn_start" }; + + const payload = await pending; + const message = payload?.message || { role: "assistant", - content: [{ type: "text", text }], - api: cfg.model.api, - provider: cfg.model.provider, - model: cfg.model.id, - usage, - stopReason: "stop", + content: [{ type: "text", text: "" }], timestamp: Date.now(), }; - yield { type: "turn_start" }; - yield { type: "message_start", message: assistant }; - yield { type: "message_end", message: assistant }; + yield { type: "message_start", message }; + yield { type: "message_end", message }; yield { type: "turn_end" }; yield { type: "agent_end" }; } } const startChat = async () => { - logStatus("boot: fetching session info"); - const { initialMessages, sessionKey, thinkingLevel } = await fetchBootstrap(); - logStatus("boot: starting imports"); - // Align UI theme with host OS preference and keep it updated. - setupSystemThemeSync(); - const { Agent } = await import("./agent/agent.js"); const { ChatPanel } = await import("./ChatPanel.js"); - const { AppStorage, setAppStorage } = await import("./storage/app-storage.js"); + const { AppStorage, setAppStorage } = await import( + "./storage/app-storage.js" + ); const { SettingsStore } = await import("./storage/stores/settings-store.js"); - const { ProviderKeysStore } = await import("./storage/stores/provider-keys-store.js"); + const { ProviderKeysStore } = await import( + "./storage/stores/provider-keys-store.js" + ); const { SessionsStore } = await import("./storage/stores/sessions-store.js"); - const { CustomProvidersStore } = await import("./storage/stores/custom-providers-store.js"); - const { IndexedDBStorageBackend } = await import("./storage/backends/indexeddb-storage-backend.js"); + const { CustomProvidersStore } = await import( + "./storage/stores/custom-providers-store.js" + ); + const { IndexedDBStorageBackend } = await import( + "./storage/backends/indexeddb-storage-backend.js" + ); const { getModel } = await import("@mariozechner/pi-ai"); logStatus("boot: modules loaded"); - // Initialize storage with an in-browser IndexedDB backend. + // Storage init const backend = new IndexedDBStorageBackend({ dbName: "clawdis-webchat", version: 1, @@ -167,11 +225,14 @@ const startChat = async () => { const providerKeysStore = new ProviderKeysStore(); const sessionsStore = new SessionsStore(); const customProvidersStore = new CustomProvidersStore(); - - for (const store of [settingsStore, providerKeysStore, sessionsStore, customProvidersStore]) { + for (const store of [ + settingsStore, + providerKeysStore, + sessionsStore, + customProvidersStore, + ]) { store.setBackend(backend); } - const storage = new AppStorage( settingsStore, providerKeysStore, @@ -181,14 +242,47 @@ const startChat = async () => { ); setAppStorage(storage); - // Prepopulate a dummy API key so the UI does not block sends in embedded mode. - const defaultProvider = "anthropic"; + // Seed dummy API key try { - await providerKeysStore.set(defaultProvider, "embedded"); + await providerKeysStore.set("anthropic", "embedded"); } catch (err) { logStatus(`storage warn: could not seed provider key: ${err}`); } + // Gateway WS + const params = new URLSearchParams(window.location.search); + const sessionKey = params.get("session") || "main"; + const wsUrl = (() => { + const u = new URL(window.location.href); + u.protocol = u.protocol.replace("http", "ws"); + u.port = params.get("gatewayPort") || "18789"; + u.pathname = "/"; + u.search = ""; + return u.toString(); + })(); + logStatus("boot: connecting gateway"); + const gateway = new GatewaySocket(wsUrl); + const hello = await gateway.connect(); + const healthOkRef = { current: Boolean(hello?.snapshot?.health?.ok ?? true) }; + + // Update health on demand when we get tick; simplest is to poll health occasionally. + gateway.on("tick", async () => { + try { + const health = await gateway.request("health", {}, { timeoutMs: 5_000 }); + healthOkRef.current = !!health?.ok; + } catch { + healthOkRef.current = false; + } + }); + + logStatus("boot: fetching history"); + const history = await gateway.request("chat.history", { sessionKey }); + const initialMessages = Array.isArray(history?.messages) + ? history.messages + : []; + const thinkingLevel = + typeof history?.thinkingLevel === "string" ? history.thinkingLevel : "off"; + const agent = new Agent({ initialState: { systemPrompt: "You are Clawd (primary session).", @@ -196,7 +290,7 @@ const startChat = async () => { thinkingLevel, messages: initialMessages, }, - transport: new NativeTransport(sessionKey), + transport: new ChatTransport(sessionKey, gateway, healthOkRef), }); const origPrompt = agent.prompt.bind(agent); @@ -222,68 +316,6 @@ const startChat = async () => { mount.textContent = ""; mount.appendChild(panel); logStatus("boot: ready"); - - // Live sync via WebSocket so other transports (WhatsApp/CLI) appear instantly. - let lastSyncedTs = latestTimestamp(initialMessages); - let ws; - let reconnectTimer; - - const applySnapshot = (info) => { - const messages = Array.isArray(info?.messages) ? info.messages : []; - const ts = latestTimestamp(messages); - const thinking = typeof info?.thinkingLevel === "string" ? info.thinkingLevel : "off"; - - if (!agent.state.isStreaming && ts && ts !== lastSyncedTs) { - agent.replaceMessages(messages); - lastSyncedTs = ts; - } - - if (thinking && thinking !== agent.state.thinkingLevel) { - agent.setThinkingLevel(thinking); - if (panel?.agentInterface) { - panel.agentInterface.sessionThinkingLevel = thinking; - panel.agentInterface.pendingThinkingLevel = null; - if (panel.agentInterface._messageEditor) { - panel.agentInterface._messageEditor.thinkingLevel = thinking; - } - } - } - }; - - const connectSocket = () => { - try { - const wsUrl = new URL(`./socket?session=${encodeURIComponent(sessionKey)}`, window.location.href); - wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); - ws = new WebSocket(wsUrl); - - ws.onmessage = (ev) => { - try { - const data = JSON.parse(ev.data); - if (data?.type === "session") applySnapshot(data); - } catch (err) { - console.warn("ws message parse failed", err); - } - }; - - ws.onclose = () => { - ws = null; - if (!reconnectTimer) { - reconnectTimer = setTimeout(() => { - reconnectTimer = null; - connectSocket(); - }, 2000); - } - }; - - ws.onerror = () => { - ws?.close(); - }; - } catch (err) { - console.warn("ws connect failed", err); - } - }; - - connectSocket(); }; startChat().catch((err) => { diff --git a/apps/macos/Sources/Clawdis/Resources/WebChat/webchat.bundle.js b/apps/macos/Sources/Clawdis/Resources/WebChat/webchat.bundle.js index 02f61448a..7219edff6 100644 --- a/apps/macos/Sources/Clawdis/Resources/WebChat/webchat.bundle.js +++ b/apps/macos/Sources/Clawdis/Resources/WebChat/webchat.bundle.js @@ -196336,107 +196336,176 @@ const logStatus = (msg) => { if (el && !el.dataset.booted) el.textContent = msg; } catch {} }; -async function fetchBootstrap() { - const params = new URLSearchParams(window.location.search); - const sessionKey = params.get("session") || "main"; - const infoUrl = new URL(`./info?session=${encodeURIComponent(sessionKey)}`, window.location.href); - const infoResp = await fetch(infoUrl, { credentials: "omit" }); - if (!infoResp.ok) { - throw new Error(`webchat info failed (${infoResp.status})`); +const randomId = () => { + if (globalThis.crypto?.randomUUID) return globalThis.crypto.randomUUID(); + return `id-${Math.random().toString(16).slice(2)}-${Date.now()}`; +}; +var GatewaySocket = class { + constructor(url) { + this.url = url; + this.ws = null; + this.pending = new Map(); + this.handlers = new Map(); } - const info$1 = await infoResp.json(); - return { - sessionKey, - basePath: info$1.basePath || "/webchat/", - initialMessages: Array.isArray(info$1.initialMessages) ? info$1.initialMessages : [], - thinkingLevel: typeof info$1.thinkingLevel === "string" ? info$1.thinkingLevel : "off" - }; -} -function latestTimestamp(messages) { - if (!Array.isArray(messages) || messages.length === 0) return 0; - const withTs = messages.filter((m$3) => typeof m$3?.timestamp === "number"); - if (withTs.length === 0) return messages.length; - return withTs[withTs.length - 1].timestamp; -} -var NativeTransport = class { - constructor(sessionKey) { + async connect() { + return new Promise((resolve, reject) => { + const ws = new WebSocket(this.url); + this.ws = ws; + ws.onopen = () => { + const hello = { + type: "hello", + minProtocol: 1, + maxProtocol: 1, + client: { + name: "webchat-ui", + version: "dev", + platform: "browser", + mode: "webchat", + instanceId: randomId() + } + }; + ws.send(JSON.stringify(hello)); + }; + ws.onerror = (err) => reject(err); + ws.onclose = (ev) => { + if (this.pending.size > 0) { + for (const [, p$3] of this.pending) p$3.reject(new Error("gateway closed")); + this.pending.clear(); + } + if (ev.code !== 1e3) reject(new Error(`gateway closed ${ev.code}`)); + }; + ws.onmessage = (ev) => { + let msg; + try { + msg = JSON.parse(ev.data); + } catch { + return; + } + if (msg.type === "hello-ok") { + this.handlers.set("snapshot", msg.snapshot); + resolve(msg); + return; + } + if (msg.type === "event") { + const cb = this.handlers.get(msg.event); + if (cb) cb(msg.payload, msg); + return; + } + if (msg.type === "res") { + const pending = this.pending.get(msg.id); + if (!pending) return; + this.pending.delete(msg.id); + if (msg.ok) pending.resolve(msg.payload); + else pending.reject(new Error(msg.error?.message || "gateway error")); + } + }; + }); + } + on(event, handler) { + this.handlers.set(event, handler); + } + async request(method, params, { timeoutMs = 3e4 } = {}) { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + throw new Error("gateway not connected"); + } + const id = randomId(); + const frame = { + type: "req", + id, + method, + params + }; + this.ws.send(JSON.stringify(frame)); + return new Promise((resolve, reject) => { + this.pending.set(id, { + resolve, + reject + }); + setTimeout(() => { + if (this.pending.has(id)) { + this.pending.delete(id); + reject(new Error(`${method} timed out`)); + } + }, timeoutMs); + }); + } +}; +var ChatTransport = class { + constructor(sessionKey, gateway, healthOkRef) { this.sessionKey = sessionKey; + this.gateway = gateway; + this.healthOkRef = healthOkRef; + this.pendingRuns = new Map(); + this.gateway.on("chat", (payload) => { + const runId = payload?.runId; + const pending = runId ? this.pendingRuns.get(runId) : null; + if (!pending) return; + if (payload.state === "error") { + pending.reject(new Error(payload.errorMessage || "chat error")); + this.pendingRuns.delete(runId); + return; + } + if (payload.state === "delta") return; + pending.resolve(payload); + this.pendingRuns.delete(runId); + }); } - async *run(messages, userMessage, cfg, signal) { - const attachments = userMessage.attachments?.map((a$2) => ({ + async *run(_messages, userMessage, cfg, _signal) { + if (!this.healthOkRef.current) { + throw new Error("gateway health not OK; cannot send"); + } + const text$2 = userMessage.content?.[0]?.text ?? ""; + const attachments = (userMessage.attachments || []).map((a$2) => ({ type: a$2.type, mimeType: a$2.mimeType, fileName: a$2.fileName, content: typeof a$2.content === "string" ? a$2.content : btoa(String.fromCharCode(...new Uint8Array(a$2.content))) })); - const rpcUrl = new URL("./rpc", window.location.href); - const rpcBody = { - text: userMessage.content?.[0]?.text ?? "", - session: this.sessionKey, - attachments - }; - if (cfg?.thinkingOnce) { - rpcBody.thinkingOnce = cfg.thinkingOnce; - } else if (cfg?.thinkingOverride) { - rpcBody.thinking = cfg.thinkingOverride; - } - const resultResp = await fetch(rpcUrl, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(rpcBody), - signal + const thinking = cfg?.thinkingOnce ?? cfg?.thinkingOverride ?? cfg?.thinking ?? undefined; + const runId = randomId(); + const pending = new Promise((resolve, reject) => { + this.pendingRuns.set(runId, { + resolve, + reject + }); + setTimeout(() => { + if (this.pendingRuns.has(runId)) { + this.pendingRuns.delete(runId); + reject(new Error("chat timed out")); + } + }, 3e4); }); - if (!resultResp.ok) { - throw new Error(`rpc failed (${resultResp.status})`); - } - const body = await resultResp.json(); - if (!body.ok) { - throw new Error(body.error || "rpc error"); - } - const first = Array.isArray(body.payloads) ? body.payloads[0] : undefined; - const text$2 = (first?.text ?? "").toString(); - const usage = { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - cost: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - total: 0 - } - }; - const assistant = { + await this.gateway.request("chat.send", { + sessionKey: this.sessionKey, + message: text$2, + attachments: attachments.length ? attachments : undefined, + thinking, + idempotencyKey: runId, + timeoutMs: 3e4 + }); + yield { type: "turn_start" }; + const payload = await pending; + const message = payload?.message || { role: "assistant", content: [{ type: "text", - text: text$2 + text: "" }], - api: cfg.model.api, - provider: cfg.model.provider, - model: cfg.model.id, - usage, - stopReason: "stop", timestamp: Date.now() }; - yield { type: "turn_start" }; yield { type: "message_start", - message: assistant + message }; yield { type: "message_end", - message: assistant + message }; yield { type: "turn_end" }; yield { type: "agent_end" }; } }; const startChat = async () => { - logStatus("boot: fetching session info"); - const { initialMessages, sessionKey, thinkingLevel } = await fetchBootstrap(); logStatus("boot: starting imports"); const { Agent: Agent$1 } = await Promise.resolve().then(() => (init_agent(), agent_exports)); const { ChatPanel: ChatPanel$1 } = await Promise.resolve().then(() => (init_ChatPanel(), ChatPanel_exports)); @@ -196473,12 +196542,37 @@ const startChat = async () => { } const storage = new AppStorage$1(settingsStore, providerKeysStore, sessionsStore, customProvidersStore, backend); setAppStorage$1(storage); - const defaultProvider = "anthropic"; try { - await providerKeysStore.set(defaultProvider, "embedded"); + await providerKeysStore.set("anthropic", "embedded"); } catch (err) { logStatus(`storage warn: could not seed provider key: ${err}`); } + const params = new URLSearchParams(window.location.search); + const sessionKey = params.get("session") || "main"; + const wsUrl = (() => { + const u$4 = new URL(window.location.href); + u$4.protocol = u$4.protocol.replace("http", "ws"); + u$4.port = params.get("gatewayPort") || "18789"; + u$4.pathname = "/"; + u$4.search = ""; + return u$4.toString(); + })(); + logStatus("boot: connecting gateway"); + const gateway = new GatewaySocket(wsUrl); + const hello = await gateway.connect(); + const healthOkRef = { current: Boolean(hello?.snapshot?.health?.ok ?? true) }; + gateway.on("tick", async () => { + try { + const health = await gateway.request("health", {}, { timeoutMs: 5e3 }); + healthOkRef.current = !!health?.ok; + } catch { + healthOkRef.current = false; + } + }); + logStatus("boot: fetching history"); + const history = await gateway.request("chat.history", { sessionKey }); + const initialMessages = Array.isArray(history?.messages) ? history.messages : []; + const thinkingLevel = typeof history?.thinkingLevel === "string" ? history.thinkingLevel : "off"; const agent = new Agent$1({ initialState: { systemPrompt: "You are Clawd (primary session).", @@ -196486,7 +196580,7 @@ const startChat = async () => { thinkingLevel, messages: initialMessages }, - transport: new NativeTransport(sessionKey) + transport: new ChatTransport(sessionKey, gateway, healthOkRef) }); const origPrompt = agent.prompt.bind(agent); agent.prompt = async (input, attachments) => { @@ -196512,58 +196606,6 @@ const startChat = async () => { mount.textContent = ""; mount.appendChild(panel); logStatus("boot: ready"); - let lastSyncedTs = latestTimestamp(initialMessages); - let ws; - let reconnectTimer; - const applySnapshot = (info$1) => { - const messages = Array.isArray(info$1?.messages) ? info$1.messages : []; - const ts = latestTimestamp(messages); - const thinking = typeof info$1?.thinkingLevel === "string" ? info$1.thinkingLevel : "off"; - if (!agent.state.isStreaming && ts && ts !== lastSyncedTs) { - agent.replaceMessages(messages); - lastSyncedTs = ts; - } - if (thinking && thinking !== agent.state.thinkingLevel) { - agent.setThinkingLevel(thinking); - if (panel?.agentInterface) { - panel.agentInterface.sessionThinkingLevel = thinking; - panel.agentInterface.pendingThinkingLevel = null; - if (panel.agentInterface._messageEditor) { - panel.agentInterface._messageEditor.thinkingLevel = thinking; - } - } - } - }; - const connectSocket = () => { - try { - const wsUrl = new URL(`./socket?session=${encodeURIComponent(sessionKey)}`, window.location.href); - wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); - ws = new WebSocket(wsUrl); - ws.onmessage = (ev) => { - try { - const data = JSON.parse(ev.data); - if (data?.type === "session") applySnapshot(data); - } catch (err) { - console.warn("ws message parse failed", err); - } - }; - ws.onclose = () => { - ws = null; - if (!reconnectTimer) { - reconnectTimer = setTimeout(() => { - reconnectTimer = null; - connectSocket(); - }, 2e3); - } - }; - ws.onerror = () => { - ws?.close(); - }; - } catch (err) { - console.warn("ws connect failed", err); - } - }; - connectSocket(); }; startChat().catch((err) => { const msg = err?.stack || err?.message || String(err); @@ -196575,4 +196617,4 @@ startChat().catch((err) => { document.body.innerText = "Web chat failed to load:\\n" + msg; }); -//#endregion +//#endregion \ No newline at end of file diff --git a/docs/mac/webchat.md b/docs/mac/webchat.md index 75a9b495d..9189d1325 100644 --- a/docs/mac/webchat.md +++ b/docs/mac/webchat.md @@ -14,8 +14,8 @@ The macOS menu bar app opens the gateway’s loopback web chat server in a WKWeb - WK logs: navigation lifecycle, readyState, js location, and JS errors/unhandled rejections are mirrored to OSLog for easier diagnosis. ## How it’s wired -- Assets: `apps/macos/Sources/Clawdis/Resources/WebChat/` contains the `pi-web-ui` dist plus a local import map pointing at bundled vendor modules and a tiny `pi-ai` stub. Everything is served from the gateway at `/` (legacy `/webchat/*` still works). -- Bridge: none. The web UI calls `/webchat/rpc` directly; Swift no longer proxies messages. RPC is handled in-process inside the gateway (no CLI spawn/PATH dependency). +- Assets: `apps/macos/Sources/Clawdis/Resources/WebChat/` contains the `pi-web-ui` dist plus a local import map pointing at bundled vendor modules and a tiny `pi-ai` stub. Everything is served from the static host at `/` (legacy `/webchat/*` still works). +- Bridge: none. The web UI connects directly to the Gateway WebSocket (default 18789) and uses `chat.history`/`chat.send` plus `chat/presence/tick/health` events. No `/rpc` or file-watcher socket path remains. - Session: always primary; multiple transports (WhatsApp/Telegram/Desktop) share the same session key so context is unified. ## Security / surface area diff --git a/docs/refactor/webagent-session.md b/docs/refactor/webagent-session.md new file mode 100644 index 000000000..d26d5e81c --- /dev/null +++ b/docs/refactor/webagent-session.md @@ -0,0 +1,39 @@ +# WebAgent session migration (WS-only) + +Context: web chat currently lives in a WKWebView that loads the pi-web bundle. Sends go over HTTP `/rpc` to the webchat server, and updates come from `/socket` snapshots based on session JSONL file changes. The Gateway itself already speaks WebSocket to the webchat server, and Pi writes the session JSONL files. This doc tracks the plan to move WebChat to a single Gateway WebSocket and drop the HTTP shim/file-watching. + +## Target state +- Gateway WS adds methods: + - `chat.history { sessionKey }` → `{ sessionKey, messages[], thinkingLevel }` (reads the existing JSONL + sessions.json). + - `chat.send { sessionKey, message, attachments?, thinking?, deliver?, timeoutMs<=30000, idempotencyKey }` → `res { runId, status:"accepted" }` or `res ok:false` on validation/timeout. +- Gateway WS emits `chat` events `{ runId, sessionKey, seq, state:"delta"|"final"|"error", message?, errorMessage?, usage?, stopReason? }`. Streaming is optional; minimum is a single `state:"final"` per send. +- Client consumes only WS: bootstrap via `chat.history`, send via `chat.send`, live updates via `chat` events. No file watchers. +- Health gate: client subscribes to `health` and blocks send when health is not OK; 30s client-side timeout for sends. +- Tunneling: only the Gateway WS port needs to be forwarded; HTTP server remains for static assets but no RPC endpoints. + +## Server work (Node) +- Implement `chat.history` and `chat.send` handlers in `src/gateway/server.ts`; update protocol schemas/tests. +- Emit `chat` events by plumbing `agentCommand`/`emitAgentEvent` outputs; include assistant text/tool results. +- Remove `/rpc` and `/socket` routes + file-watch broadcast from `src/webchat/server.ts`; leave static host only. + +## Client work (pi-web bundle) +- Replace `NativeTransport` with a Gateway WS client: + - `hello` → `chat.history` for initial state. + - Listen to `chat/presence/tick/health`; update UI from events only. + - Send via `chat.send`; mark pending until `chat state:final|error`. + - Enforce health gate + 30s timeout. +- Remove reliance on session file snapshots and `/rpc`. + +## Persistence +- Keep passing `--session <.../.clawdis/sessions/{{SessionId}}.jsonl>` to Pi so it continues writing JSONL. The WS history reader uses the same file; no new store introduced. + +## Docs to update when shipping +- `docs/webchat.md` (WS-only flow, methods/events, health gate, tunnel WS port). +- `docs/mac/webchat.md` (WKWebView now talks Gateway WS; `/rpc`/file-watch removed). +- `docs/architecture.md` / `typebox.md` if protocol methods are listed. +- Optional: add a concise Gateway chat protocol appendix if needed. + +## Open decisions +- Streaming granularity: start with `state:"final"` only, or include token/tool deltas immediately? +- Attachments over WS: text-only initially is OK; confirm before wiring binary/upload path. +- Error shape: use `res ok:false` for validation/timeout, `chat state:"error"` for model/runtime failures. diff --git a/docs/webchat.md b/docs/webchat.md index cc1a8ee0a..d8a705f92 100644 --- a/docs/webchat.md +++ b/docs/webchat.md @@ -1,5 +1,5 @@ --- -summary: "Loopback WebChat server and SSH tunnel usage for chat UI" +summary: "Loopback WebChat static host and Gateway WS usage for chat UI" read_when: - Debugging or configuring WebChat access --- @@ -8,23 +8,21 @@ read_when: Updated: 2025-12-09 ## What it is -- A local web UI for chatting with the Gateway. +- A local web UI for chatting with the Gateway, now WS-only for data. - Static assets served by the WebChat HTTP server (default port **18788**, configurable). -- The WebChat backend holds a single WebSocket connection to the Gateway (`ws://127.0.0.1:18789` by default) for all control/data: history fetch, sends, agent runs, presence. +- The browser/WebView connects directly to the Gateway WebSocket (`ws://127.0.0.1:18789` by default) for history, sends, and events. No file watching or HTTP RPC. - Trust model: access is granted by being on localhost or inside your SSH/Tailscale tunnel. No additional auth prompts once you can reach the box. - `webchat.gatewayPort` config can point at a non-default Gateway port if needed. ## Endpoints -- UI is now served at the root: `http://127.0.0.1:/` (legacy `/webchat/` still works). -- `GET /webchat/info?session=` (alias `/info`) → `{ port, sessionId, initialMessages, basePath }` plus history from the Gateway session store. -- `GET /` (or `/webchat/*`) → static assets. -- `POST /webchat/rpc` (alias `/rpc`) → proxies a chat/agent action through the Gateway connection and returns `{ ok, payloads?, error? }`. +- UI is served at the root: `http://127.0.0.1:/` (legacy `/webchat/` still works). +- `GET /` (or `/webchat/*`) → static assets only. No RPC endpoints. +- Data plane is entirely on the Gateway WS (`ws://127.0.0.1:`): methods `chat.history`, `chat.send`; events `chat`, `presence`, `tick`, `health`. ## How it connects -- On startup, the WebChat server dials the Gateway WebSocket and performs the mandatory `hello` handshake; the `hello-ok` snapshot seeds presence + health immediately. -- All outgoing sends/agent calls are requests on that WS; streamed events (`agent`, `presence`, `tick`) are forwarded to the browser client. -- If a seq gap is detected in Gateway events, WebChat auto-refreshes health + presence and broadcasts a `gateway-refresh` to connected browsers. -- If the Gateway WS is unavailable, WebChat fails fast and surfaces the error in the UI. +- Browser/WebView performs Gateway WS `hello`, then calls `chat.history` for bootstrap and `chat.send` for sends; listens to `chat/presence/tick/health` events. +- No session file watching. History comes from the Gateway via `chat.history`. +- If Gateway WS is unavailable, the UI surfaces the error and blocks send. ## Remote use - SSH tunnel example: `ssh -N -L 18788:127.0.0.1:18788 -L 18789:127.0.0.1:18789 user@host`. @@ -36,10 +34,10 @@ Updated: 2025-12-09 - Gateway WS port is set by `clawdis gateway --port`; WebChat expects it at 18789 unless overridden. ## Failure handling -- Clear UI error when the Gateway handshake fails or the WS drops. +- UI errors when the Gateway handshake fails or the WS drops; no HTTP fallback. - WebChat does not attempt fallback transports; the Gateway WS is required. ## Dev notes - Assets live in `apps/macos/Sources/Clawdis/Resources/WebChat`. -- Server implementation: `src/webchat/server.ts`. -- macOS glue: `WebChatWindow.swift` + `WebChatTunnel` for SSH -L helpers. +- Static host: `src/webchat/server.ts` (loopback-only HTTP). +- macOS glue: `WebChatWindow.swift` + `WebChatTunnel` for SSH -L helpers; WKWebView talks directly to Gateway WS. diff --git a/src/auto-reply/envelope.test.ts b/src/auto-reply/envelope.test.ts index 12ef60086..256353186 100644 --- a/src/auto-reply/envelope.test.ts +++ b/src/auto-reply/envelope.test.ts @@ -13,7 +13,9 @@ describe("formatAgentEnvelope", () => { timestamp: ts, body: "hello", }); - expect(body).toBe("[WebChat user1 mac-mini 10.0.0.5 2025-01-02 03:04] hello"); + expect(body).toBe( + "[WebChat user1 mac-mini 10.0.0.5 2025-01-02 03:04] hello", + ); }); it("handles missing optional fields", () => { diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index ff4122564..405be01a2 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -3,7 +3,7 @@ import crypto from "node:crypto"; import { lookupContextTokens } from "../agents/context.js"; import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL } from "../agents/defaults.js"; import { resolveBundledPiBinary } from "../agents/pi-path.js"; -import { loadConfig, type ClawdisConfig } from "../config/config.js"; +import { type ClawdisConfig, loadConfig } from "../config/config.js"; import { DEFAULT_IDLE_MINUTES, DEFAULT_RESET_TRIGGER, diff --git a/src/cli/program.ts b/src/cli/program.ts index 0092afcc0..bdb1efc67 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -14,12 +14,7 @@ import { defaultRuntime } from "../runtime.js"; import { VERSION } from "../version.js"; import { startWebChatServer } from "../webchat/server.js"; import { createDefaultDeps } from "./deps.js"; -import { - forceFreePort, - listPortListeners, - PortProcess, - parseLsofOutput, -} from "./ports.js"; +import { forceFreePort, listPortListeners } from "./ports.js"; export function buildProgram() { const program = new Command(); @@ -217,7 +212,7 @@ Examples: } }); - program + program; const gateway = program .command("gateway") .description("Run the WebSocket Gateway") diff --git a/src/commands/agent.ts b/src/commands/agent.ts index a2d522b70..aad1b3d46 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -12,7 +12,7 @@ import { type VerboseLevel, } from "../auto-reply/thinking.js"; import { type CliDeps, createDefaultDeps } from "../cli/deps.js"; -import { loadConfig, type ClawdisConfig } from "../config/config.js"; +import { type ClawdisConfig, loadConfig } from "../config/config.js"; import { DEFAULT_IDLE_MINUTES, loadSessionStore, diff --git a/src/commands/health.ts b/src/commands/health.ts index a3948d590..f1655385b 100644 --- a/src/commands/health.ts +++ b/src/commands/health.ts @@ -1,6 +1,3 @@ -import fs from "node:fs"; -import path from "node:path"; - import { loadConfig } from "../config/config.js"; import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; import { info } from "../globals.js"; diff --git a/src/commands/send.ts b/src/commands/send.ts index 23bc6be88..3b3cd0000 100644 --- a/src/commands/send.ts +++ b/src/commands/send.ts @@ -1,9 +1,9 @@ import type { CliDeps } from "../cli/deps.js"; import { listPortListeners } from "../cli/ports.js"; -import { info, success } from "../globals.js"; -import type { RuntimeEnv } from "../runtime.js"; import { callGateway, randomIdempotencyKey } from "../gateway/call.js"; import { startGatewayServer } from "../gateway/server.js"; +import { success } from "../globals.js"; +import type { RuntimeEnv } from "../runtime.js"; export async function sendCommand( opts: { @@ -91,7 +91,9 @@ export async function sendCommand( } runtime.log( - success(`✅ Sent via gateway. Message ID: ${result.messageId ?? "unknown"}`), + success( + `✅ Sent via gateway. Message ID: ${result.messageId ?? "unknown"}`, + ), ); if (opts.json) { runtime.log( diff --git a/src/gateway/chat-attachments.ts b/src/gateway/chat-attachments.ts new file mode 100644 index 000000000..7e7d97834 --- /dev/null +++ b/src/gateway/chat-attachments.ts @@ -0,0 +1,51 @@ +export type ChatAttachment = { + type?: string; + mimeType?: string; + fileName?: string; + content?: unknown; +}; + +export function buildMessageWithAttachments( + message: string, + attachments: ChatAttachment[] | undefined, + opts?: { maxBytes?: number }, +): string { + const maxBytes = opts?.maxBytes ?? 2_000_000; // 2 MB + if (!attachments || attachments.length === 0) return message; + + const blocks: string[] = []; + + for (const [idx, att] of attachments.entries()) { + if (!att) continue; + const mime = att.mimeType ?? ""; + const content = att.content; + const label = att.fileName || att.type || `attachment-${idx + 1}`; + + if (typeof content !== "string") { + throw new Error(`attachment ${label}: content must be base64 string`); + } + if (!mime.startsWith("image/")) { + throw new Error(`attachment ${label}: only image/* supported`); + } + + let sizeBytes = 0; + try { + sizeBytes = Buffer.from(content, "base64").byteLength; + } catch { + throw new Error(`attachment ${label}: invalid base64 content`); + } + if (sizeBytes <= 0 || sizeBytes > maxBytes) { + throw new Error( + `attachment ${label}: exceeds size limit (${sizeBytes} > ${maxBytes} bytes)`, + ); + } + + const safeLabel = label.replace(/\s+/g, "_"); + const dataUrl = `![${safeLabel}](data:${mime};base64,${content})`; + blocks.push(dataUrl); + } + + if (blocks.length === 0) return message; + const separator = message.trim().length > 0 ? "\n\n" : ""; + return `${message}${separator}${blocks.join("\n\n")}`; +} diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 65fea133f..cb68697f6 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -3,6 +3,10 @@ import { type AgentEvent, AgentEventSchema, AgentParamsSchema, + type ChatEvent, + ChatEventSchema, + ChatHistoryParamsSchema, + ChatSendParamsSchema, ErrorCodes, type ErrorShape, ErrorShapeSchema, @@ -51,6 +55,9 @@ export const validateRequestFrame = ajv.compile(RequestFrameSchema); export const validateSendParams = ajv.compile(SendParamsSchema); export const validateAgentParams = ajv.compile(AgentParamsSchema); +export const validateChatHistoryParams = ajv.compile(ChatHistoryParamsSchema); +export const validateChatSendParams = ajv.compile(ChatSendParamsSchema); +export const validateChatEvent = ajv.compile(ChatEventSchema); export function formatValidationErrors( errors: ErrorObject[] | null | undefined, @@ -72,8 +79,11 @@ export { ErrorShapeSchema, StateVersionSchema, AgentEventSchema, + ChatEventSchema, SendParamsSchema, AgentParamsSchema, + ChatHistoryParamsSchema, + ChatSendParamsSchema, TickEventSchema, ShutdownEventSchema, ProtocolSchemas, @@ -95,6 +105,7 @@ export type { ErrorShape, StateVersion, AgentEvent, + ChatEvent, TickEvent, ShutdownEvent, }; diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index 80185e95f..f4fddd402 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -219,6 +219,45 @@ export const AgentParamsSchema = Type.Object( { additionalProperties: false }, ); +// WebChat/WebSocket-native chat methods +export const ChatHistoryParamsSchema = Type.Object( + { + sessionKey: NonEmptyString, + }, + { additionalProperties: false }, +); + +export const ChatSendParamsSchema = Type.Object( + { + sessionKey: NonEmptyString, + message: NonEmptyString, + thinking: Type.Optional(Type.String()), + deliver: Type.Optional(Type.Boolean()), + attachments: Type.Optional(Type.Array(Type.Unknown())), + timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })), + idempotencyKey: NonEmptyString, + }, + { additionalProperties: false }, +); + +export const ChatEventSchema = Type.Object( + { + runId: NonEmptyString, + sessionKey: NonEmptyString, + seq: Type.Integer({ minimum: 0 }), + state: Type.Union([ + Type.Literal("delta"), + Type.Literal("final"), + Type.Literal("error"), + ]), + message: Type.Optional(Type.Unknown()), + errorMessage: Type.Optional(Type.String()), + usage: Type.Optional(Type.Unknown()), + stopReason: Type.Optional(Type.String()), + }, + { additionalProperties: false }, +); + export const ProtocolSchemas: Record = { Hello: HelloSchema, HelloOk: HelloOkSchema, @@ -234,6 +273,9 @@ export const ProtocolSchemas: Record = { AgentEvent: AgentEventSchema, SendParams: SendParamsSchema, AgentParams: AgentParamsSchema, + ChatHistoryParams: ChatHistoryParamsSchema, + ChatSendParams: ChatSendParamsSchema, + ChatEvent: ChatEventSchema, TickEvent: TickEventSchema, ShutdownEvent: ShutdownEventSchema, }; @@ -252,6 +294,7 @@ export type PresenceEntry = Static; export type ErrorShape = Static; export type StateVersion = Static; export type AgentEvent = Static; +export type ChatEvent = Static; export type TickEvent = Static; export type ShutdownEvent = Static; diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index b54ee5f40..4b22f980a 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -544,6 +544,54 @@ describe("gateway server", () => { await server.close(); }); + test("chat.send accepts image attachment", { timeout: 12000 }, async () => { + const { server, ws } = await startServerWithClient(); + ws.send( + JSON.stringify({ + type: "hello", + minProtocol: 1, + maxProtocol: 1, + client: { name: "test", version: "1", platform: "test", mode: "test" }, + caps: [], + }), + ); + await onceMessage(ws, (o) => o.type === "hello-ok"); + + const reqId = "chat-img"; + ws.send( + JSON.stringify({ + type: "req", + id: reqId, + method: "chat.send", + params: { + sessionKey: "main", + message: "see image", + idempotencyKey: "idem-img", + attachments: [ + { + type: "image", + mimeType: "image/png", + fileName: "dot.png", + content: + "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/woAAn8B9FD5fHAAAAAASUVORK5CYII=", + }, + ], + }, + }), + ); + + const res = await onceMessage( + ws, + (o) => o.type === "res" && o.id === reqId, + 8000, + ); + expect(res.ok).toBe(true); + expect(res.payload?.runId).toBeDefined(); + + ws.close(); + await server.close(); + }); + test("presence includes client fingerprint", async () => { const { server, ws } = await startServerWithClient(); ws.send( diff --git a/src/gateway/server.ts b/src/gateway/server.ts index dcfa0c0fd..62563ad0f 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -1,15 +1,23 @@ -import chalk from "chalk"; import { randomUUID } from "node:crypto"; +import fs from "node:fs"; import os from "node:os"; +import path from "node:path"; +import chalk from "chalk"; import { type WebSocket, WebSocketServer } from "ws"; -import { GatewayLockError, acquireGatewayLock } from "../infra/gateway-lock.js"; import { createDefaultDeps } from "../cli/deps.js"; import { agentCommand } from "../commands/agent.js"; import { getHealthSnapshot } from "../commands/health.js"; import { getStatusSummary } from "../commands/status.js"; import { loadConfig } from "../config/config.js"; +import { + loadSessionStore, + resolveStorePath, + type SessionEntry, + saveSessionStore, +} from "../config/sessions.js"; import { isVerbose } from "../globals.js"; import { onAgentEvent } from "../infra/agent-events.js"; +import { acquireGatewayLock, GatewayLockError } from "../infra/gateway-lock.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { listSystemPresence, @@ -23,6 +31,7 @@ import { monitorTelegramProvider } from "../telegram/monitor.js"; import { sendMessageTelegram } from "../telegram/send.js"; import { sendMessageWhatsApp } from "../web/outbound.js"; import { ensureWebChatServerFromConfig } from "../webchat/server.js"; +import { buildMessageWithAttachments } from "./chat-attachments.js"; import { ErrorCodes, type ErrorShape, @@ -33,6 +42,8 @@ import { type RequestFrame, type Snapshot, validateAgentParams, + validateChatHistoryParams, + validateChatSendParams, validateHello, validateRequestFrame, validateSendParams, @@ -51,9 +62,12 @@ const METHODS = [ "system-event", "send", "agent", + // WebChat WebSocket-native chat methods + "chat.history", + "chat.send", ]; -const EVENTS = ["agent", "presence", "tick", "shutdown"]; +const EVENTS = ["agent", "chat", "presence", "tick", "shutdown"]; export type GatewayServer = { close: () => Promise; @@ -93,6 +107,9 @@ type DedupeEntry = { error?: ErrorShape; }; const dedupe = new Map(); +// Map runId -> sessionKey for chat events (WS WebChat clients). +const chatRunSessions = new Map(); +const chatRunBuffers = new Map(); const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN; @@ -103,12 +120,73 @@ function formatForLog(value: unknown): string { ? String(value) : JSON.stringify(value); if (!str) return ""; - return str.length > LOG_VALUE_LIMIT ? `${str.slice(0, LOG_VALUE_LIMIT)}...` : str; + return str.length > LOG_VALUE_LIMIT + ? `${str.slice(0, LOG_VALUE_LIMIT)}...` + : str; } catch { return String(value); } } +function readSessionMessages( + sessionId: string, + storePath: string | undefined, +): unknown[] { + const candidates: string[] = []; + if (storePath) { + const dir = path.dirname(storePath); + candidates.push(path.join(dir, `${sessionId}.jsonl`)); + } + candidates.push( + path.join(os.homedir(), ".clawdis", "sessions", `${sessionId}.jsonl`), + ); + candidates.push( + path.join(os.homedir(), ".pi", "agent", "sessions", `${sessionId}.jsonl`), + ); + candidates.push( + path.join( + os.homedir(), + ".tau", + "agent", + "sessions", + "clawdis", + `${sessionId}.jsonl`, + ), + ); + + const filePath = candidates.find((p) => fs.existsSync(p)); + if (!filePath) return []; + + const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/); + const messages: unknown[] = []; + for (const line of lines) { + if (!line.trim()) continue; + try { + const parsed = JSON.parse(line); + // pi/tau logs either raw message or wrapper { message } + if (parsed?.message) { + messages.push(parsed.message); + } else if (parsed?.role && parsed?.content) { + messages.push(parsed); + } + } catch { + // ignore bad lines + } + } + return messages; +} + +function loadSessionEntry(sessionKey: string) { + const cfg = loadConfig(); + const sessionCfg = cfg.inbound?.reply?.session; + const storePath = sessionCfg?.store + ? resolveStorePath(sessionCfg.store) + : resolveStorePath(undefined); + const store = loadSessionStore(storePath); + const entry = store[sessionKey]; + return { cfg, storePath, store, entry }; +} + function logWs( direction: "in" | "out", kind: string, @@ -134,7 +212,9 @@ function logWs( coloredMeta.push(`${chalk.dim(key)}=${formatForLog(value)}`); } } - const line = coloredMeta.length ? `${prefix} ${coloredMeta.join(" ")}` : prefix; + const line = coloredMeta.length + ? `${prefix} ${coloredMeta.join(" ")}` + : prefix; console.log(line); } @@ -143,7 +223,8 @@ function formatError(err: unknown): string { if (typeof err === "string") return err; const status = (err as { status?: unknown })?.status; const code = (err as { code?: unknown })?.code; - if (status || code) return `status=${status ?? "unknown"} code=${code ?? "unknown"}`; + if (status || code) + return `status=${status ?? "unknown"} code=${code ?? "unknown"}`; return JSON.stringify(err, null, 2); } @@ -287,6 +368,48 @@ export async function startGatewayServer(port = 18789): Promise { } agentRunSeq.set(evt.runId, evt.seq); broadcast("agent", evt); + + const sessionKey = chatRunSessions.get(evt.runId); + if (sessionKey) { + // Map agent bus events to chat events for WS WebChat clients. + const base = { + runId: evt.runId, + sessionKey, + seq: evt.seq, + }; + if (evt.stream === "assistant" && typeof evt.data?.text === "string") { + const buf = chatRunBuffers.get(evt.runId) ?? []; + buf.push(evt.data.text); + chatRunBuffers.set(evt.runId, buf); + } else if ( + evt.stream === "job" && + typeof evt.data?.state === "string" && + (evt.data.state === "done" || evt.data.state === "error") + ) { + const text = chatRunBuffers.get(evt.runId)?.join("\n").trim() ?? ""; + chatRunBuffers.delete(evt.runId); + if (evt.data.state === "done") { + broadcast("chat", { + ...base, + state: "final", + message: text + ? { + role: "assistant", + content: [{ type: "text", text }], + timestamp: Date.now(), + } + : undefined, + }); + } else { + broadcast("chat", { + ...base, + state: "error", + errorMessage: evt.data.error ? String(evt.data.error) : undefined, + }); + } + chatRunSessions.delete(evt.runId); + } + } }); wss.on("connection", (socket) => { @@ -500,6 +623,163 @@ export async function startGatewayServer(port = 18789): Promise { respond(true, health, undefined); break; } + case "chat.history": { + const params = (req.params ?? {}) as Record; + if (!validateChatHistoryParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid chat.history params: ${formatValidationErrors(validateChatHistoryParams.errors)}`, + ), + ); + break; + } + const { sessionKey } = params as { sessionKey: string }; + const { storePath, entry } = loadSessionEntry(sessionKey); + const sessionId = entry?.sessionId; + const messages = + sessionId && storePath + ? readSessionMessages(sessionId, storePath) + : []; + const thinkingLevel = + entry?.thinkingLevel ?? + loadConfig().inbound?.reply?.thinkingDefault ?? + "off"; + respond(true, { sessionKey, sessionId, messages, thinkingLevel }); + break; + } + case "chat.send": { + const params = (req.params ?? {}) as Record; + if (!validateChatSendParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid chat.send params: ${formatValidationErrors(validateChatSendParams.errors)}`, + ), + ); + break; + } + const p = params as { + sessionKey: string; + message: string; + thinking?: string; + deliver?: boolean; + attachments?: Array<{ + type?: string; + mimeType?: string; + fileName?: string; + content?: unknown; + }>; + timeoutMs?: number; + idempotencyKey: string; + }; + const timeoutMs = Math.min( + Math.max(p.timeoutMs ?? 30_000, 0), + 30_000, + ); + const normalizedAttachments = + p.attachments?.map((a) => ({ + type: typeof a?.type === "string" ? a.type : undefined, + mimeType: + typeof a?.mimeType === "string" ? a.mimeType : undefined, + fileName: + typeof a?.fileName === "string" ? a.fileName : undefined, + content: + typeof a?.content === "string" + ? a.content + : ArrayBuffer.isView(a?.content) + ? Buffer.from(a.content as ArrayBufferLike).toString( + "base64", + ) + : undefined, + })) ?? []; + let messageWithAttachments = p.message; + if (normalizedAttachments.length > 0) { + try { + messageWithAttachments = buildMessageWithAttachments( + p.message, + normalizedAttachments, + { maxBytes: 5_000_000 }, + ); + } catch (err) { + respond( + false, + undefined, + errorShape(ErrorCodes.INVALID_REQUEST, String(err)), + ); + break; + } + } + const { storePath, store, entry } = loadSessionEntry(p.sessionKey); + const now = Date.now(); + const sessionId = entry?.sessionId ?? randomUUID(); + const sessionEntry: SessionEntry = { + sessionId, + updatedAt: now, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + systemSent: entry?.systemSent, + }; + if (store) { + store[p.sessionKey] = sessionEntry; + if (storePath) { + await saveSessionStore(storePath, store); + } + } + chatRunSessions.set(sessionId, p.sessionKey); + + const idem = p.idempotencyKey; + const cached = dedupe.get(`chat:${idem}`); + if (cached) { + respond(cached.ok, cached.payload, cached.error, { + cached: true, + }); + break; + } + + try { + await agentCommand( + { + message: messageWithAttachments, + sessionId, + thinking: p.thinking, + deliver: p.deliver, + timeout: Math.ceil(timeoutMs / 1000).toString(), + surface: "WebChat", + }, + defaultRuntime, + deps, + ); + const payload = { + runId: sessionId, + status: "ok" as const, + }; + dedupe.set(`chat:${idem}`, { ts: Date.now(), ok: true, payload }); + respond(true, payload, undefined, { runId: sessionId }); + } catch (err) { + const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); + const payload = { + runId: sessionId, + status: "error" as const, + summary: String(err), + }; + dedupe.set(`chat:${idem}`, { + ts: Date.now(), + ok: false, + payload, + error, + }); + respond(false, payload, error, { + runId: sessionId, + error: formatForLog(err), + }); + } + break; + } case "status": { const status = await getStatusSummary(); respond(true, status, undefined); @@ -640,7 +920,9 @@ export async function startGatewayServer(port = 18789): Promise { const idem = params.idempotencyKey; const cached = dedupe.get(`agent:${idem}`); if (cached) { - respond(cached.ok, cached.payload, cached.error, { cached: true }); + respond(cached.ok, cached.payload, cached.error, { + cached: true, + }); break; } const message = params.message.trim(); @@ -773,6 +1055,8 @@ export async function startGatewayServer(port = 18789): Promise { /* ignore */ } } + chatRunSessions.clear(); + chatRunBuffers.clear(); for (const c of clients) { try { c.socket.close(1012, "service restart"); diff --git a/src/infra/provider-summary.ts b/src/infra/provider-summary.ts index e6f673379..8ba07369e 100644 --- a/src/infra/provider-summary.ts +++ b/src/infra/provider-summary.ts @@ -1,5 +1,5 @@ import chalk from "chalk"; -import { loadConfig, type ClawdisConfig } from "../config/config.js"; +import { type ClawdisConfig, loadConfig } from "../config/config.js"; import { normalizeE164 } from "../utils.js"; import { getWebAuthAgeMs, diff --git a/src/infra/restart.ts b/src/infra/restart.ts index 275f5c1eb..46c93b38b 100644 --- a/src/infra/restart.ts +++ b/src/infra/restart.ts @@ -3,9 +3,7 @@ import { spawn } from "node:child_process"; const DEFAULT_LAUNCHD_LABEL = "com.steipete.clawdis"; export function triggerClawdisRestart(): void { - const label = - process.env.CLAWDIS_LAUNCHD_LABEL || - DEFAULT_LAUNCHD_LABEL; + const label = process.env.CLAWDIS_LAUNCHD_LABEL || DEFAULT_LAUNCHD_LABEL; const uid = typeof process.getuid === "function" ? process.getuid() : undefined; const target = uid !== undefined ? `gui/${uid}/${label}` : label; diff --git a/src/logging.ts b/src/logging.ts index b4af3b341..4b479bb20 100644 --- a/src/logging.ts +++ b/src/logging.ts @@ -3,7 +3,7 @@ import path from "node:path"; import util from "node:util"; import { Logger as TsLogger } from "tslog"; -import { loadConfig, type ClawdisConfig } from "./config/config.js"; +import { type ClawdisConfig, loadConfig } from "./config/config.js"; import { isVerbose } from "./globals.js"; // Pin to /tmp so mac Debug UI and docs match; os.tmpdir() can be a per-user diff --git a/src/telegram/send.ts b/src/telegram/send.ts index 59ec33f7b..be40716e6 100644 --- a/src/telegram/send.ts +++ b/src/telegram/send.ts @@ -45,7 +45,8 @@ export async function sendMessageTelegram( const api = opts.api ?? bot?.api; const mediaUrl = opts.mediaUrl?.trim(); - const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + const sleep = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); const sendWithRetry = async (fn: () => Promise, label: string) => { let lastErr: unknown; for (let attempt = 1; attempt <= 3; attempt++) { @@ -53,12 +54,17 @@ export async function sendMessageTelegram( return await fn(); } catch (err) { lastErr = err; - const terminal = attempt === 3 || - !/429|timeout|connect|reset|closed|unavailable|temporarily/i.test(String(err ?? "")); + const terminal = + attempt === 3 || + !/429|timeout|connect|reset|closed|unavailable|temporarily/i.test( + String(err ?? ""), + ); if (terminal) break; const backoff = 400 * attempt; if (opts.verbose) { - console.warn(`telegram send retry ${attempt}/2 for ${label} in ${backoff}ms: ${String(err)}`); + console.warn( + `telegram send retry ${attempt}/2 for ${label} in ${backoff}ms: ${String(err)}`, + ); } await sleep(backoff); } @@ -80,13 +86,25 @@ export async function sendMessageTelegram( | Awaited> | Awaited>; if (kind === "image") { - result = await sendWithRetry(() => api.sendPhoto(chatId, file, { caption }), "photo"); + result = await sendWithRetry( + () => api.sendPhoto(chatId, file, { caption }), + "photo", + ); } else if (kind === "video") { - result = await sendWithRetry(() => api.sendVideo(chatId, file, { caption }), "video"); + result = await sendWithRetry( + () => api.sendVideo(chatId, file, { caption }), + "video", + ); } else if (kind === "audio") { - result = await sendWithRetry(() => api.sendAudio(chatId, file, { caption }), "audio"); + result = await sendWithRetry( + () => api.sendAudio(chatId, file, { caption }), + "audio", + ); } else { - result = await sendWithRetry(() => api.sendDocument(chatId, file, { caption }), "document"); + result = await sendWithRetry( + () => api.sendDocument(chatId, file, { caption }), + "document", + ); } const messageId = String(result?.message_id ?? "unknown"); return { messageId, chatId: String(result?.chat?.id ?? chatId) }; diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index 39d43c06c..fb60b3ed5 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -680,8 +680,12 @@ describe("web auto-reply", () => { expect(resolver).toHaveBeenCalledTimes(1); const args = resolver.mock.calls[0][0]; - expect(args.Body).toContain("[WhatsApp +1 2025-01-01 00:00] [clawdis] first"); - expect(args.Body).toContain("[WhatsApp +1 2025-01-01 01:00] [clawdis] second"); + expect(args.Body).toContain( + "[WhatsApp +1 2025-01-01 00:00] [clawdis] first", + ); + expect(args.Body).toContain( + "[WhatsApp +1 2025-01-01 01:00] [clawdis] second", + ); // Max listeners bumped to avoid warnings in multi-instance test runs expect(process.getMaxListeners?.()).toBeGreaterThanOrEqual(50); diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 89d4d6607..2cb6a8f72 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -1,4 +1,5 @@ import { chunkText } from "../auto-reply/chunk.js"; +import { formatAgentEnvelope } from "../auto-reply/envelope.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { waitForever } from "../cli/wait.js"; @@ -10,7 +11,7 @@ import { resolveStorePath, saveSessionStore, } from "../config/sessions.js"; -import { danger, info, isVerbose, logVerbose, success } from "../globals.js"; +import { danger, isVerbose, logVerbose, success } from "../globals.js"; import { emitHeartbeatEvent } from "../infra/heartbeat-events.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { logInfo } from "../logger.js"; @@ -18,10 +19,10 @@ import { getChildLogger } from "../logging.js"; import { getQueueSize } from "../process/command-queue.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { jidToE164, normalizeE164 } from "../utils.js"; +import { setActiveWebListener } from "./active-listener.js"; import { monitorWebInbox } from "./inbound.js"; import { loadWebMedia } from "./media.js"; import { sendMessageWhatsApp } from "./outbound.js"; -import { setActiveWebListener } from "./active-listener.js"; import { computeBackoff, newConnectionId, @@ -31,7 +32,6 @@ import { sleepWithAbort, } from "./reconnect.js"; import { formatError, getWebAuthAgeMs, readWebSelfId } from "./session.js"; -import { formatAgentEnvelope } from "../auto-reply/envelope.js"; const WEB_TEXT_LIMIT = 4000; const DEFAULT_GROUP_HISTORY_LIMIT = 50; @@ -494,7 +494,8 @@ async function deliverWebReply(params: { ? [replyResult.mediaUrl] : []; - const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + const sleep = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); const sendWithRetry = async ( fn: () => Promise, @@ -1401,11 +1402,11 @@ export async function monitorWebProvider( }, "web reconnect: scheduling retry", ); - runtime.error( - danger( - `WhatsApp Web connection closed (status ${status}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDuration(delay)}… (${errorStr})`, - ), - ); + runtime.error( + danger( + `WhatsApp Web connection closed (status ${status}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDuration(delay)}… (${errorStr})`, + ), + ); await closeListener(); try { await sleep(delay, abortSignal); diff --git a/src/web/outbound.ts b/src/web/outbound.ts index 864405517..1c72a6bb1 100644 --- a/src/web/outbound.ts +++ b/src/web/outbound.ts @@ -6,8 +6,8 @@ import { logVerbose } from "../globals.js"; import { logInfo } from "../logger.js"; import { getChildLogger } from "../logging.js"; import { toWhatsappJid } from "../utils.js"; -import { loadWebMedia } from "./media.js"; import { getActiveWebListener } from "./active-listener.js"; +import { loadWebMedia } from "./media.js"; import { createWaSocket, waitForWaConnection } from "./session.js"; export async function sendMessageWhatsApp( @@ -18,7 +18,9 @@ export async function sendMessageWhatsApp( const correlationId = randomUUID(); const active = getActiveWebListener(); const usingActive = Boolean(active); - const sock = usingActive ? null : await createWaSocket(false, options.verbose); + const sock = usingActive + ? null + : await createWaSocket(false, options.verbose); const logger = getChildLogger({ module: "web-outbound", correlationId, @@ -29,9 +31,12 @@ export async function sendMessageWhatsApp( if (!usingActive) { logInfo("🔌 Connecting to WhatsApp Web…"); logger.info("connecting to whatsapp web"); - await waitForWaConnection(sock!); + if (!sock) { + throw new Error("WhatsApp socket unavailable"); + } + await waitForWaConnection(sock); try { - await sock!.sendPresenceUpdate("composing", jid); + await sock.sendPresenceUpdate("composing", jid); } catch (err) { logVerbose(`Presence update skipped: ${String(err)}`); } @@ -82,6 +87,7 @@ export async function sendMessageWhatsApp( ); const result = usingActive ? await (async () => { + if (!active) throw new Error("Active web listener missing"); let mediaBuffer: Buffer | undefined; let mediaType: string | undefined; if (options.mediaUrl) { @@ -89,13 +95,17 @@ export async function sendMessageWhatsApp( mediaBuffer = media.buffer; mediaType = media.contentType; } - await active!.sendComposingTo(to); - return active!.sendMessage(to, body, mediaBuffer, mediaType); + await active.sendComposingTo(to); + return active.sendMessage(to, body, mediaBuffer, mediaType); })() - : await sock!.sendMessage(jid, payload); + : await (async () => { + if (!sock) throw new Error("WhatsApp socket unavailable"); + return sock.sendMessage(jid, payload); + })(); const messageId = usingActive - ? (result as { messageId?: string })?.messageId ?? "unknown" - : (result as any)?.key?.id ?? "unknown"; + ? ((result as { messageId?: string })?.messageId ?? "unknown") + : ((result as { key?: { id?: string } } | undefined)?.key?.id ?? + "unknown"); logInfo( `✅ Sent via web session. Message ID: ${messageId} -> ${jid}${options.mediaUrl ? " (media)" : ""}`, ); diff --git a/src/webchat/server.test.ts b/src/webchat/server.test.ts index 9191fed13..b55af2c5c 100644 --- a/src/webchat/server.test.ts +++ b/src/webchat/server.test.ts @@ -1,11 +1,7 @@ +import http from "node:http"; import type { AddressInfo } from "node:net"; import { describe, expect, test } from "vitest"; -import { WebSocket } from "ws"; -import { - __forceWebChatSnapshotForTests, - startWebChatServer, - stopWebChatServer, -} from "./server.js"; +import { startWebChatServer, stopWebChatServer } from "./server.js"; async function getFreePort(): Promise { const { createServer } = await import("node:net"); @@ -19,76 +15,30 @@ async function getFreePort(): Promise { }); } -type SnapshotMessage = { - type?: string; - snapshot?: { stateVersion?: { presence?: number } }; -}; -type SessionMessage = { type?: string }; +const fetchText = (url: string) => + new Promise((resolve, reject) => { + http + .get(url, (res) => { + const chunks: Buffer[] = []; + res + .on("data", (c) => + chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(c)), + ) + .on("end", () => resolve(Buffer.concat(chunks).toString("utf-8"))) + .on("error", reject); + }) + .on("error", reject); + }); -describe("webchat server", () => { - test( - "hydrates snapshot to new sockets (offline mock)", - { timeout: 8000 }, - async () => { - const wPort = await getFreePort(); - await startWebChatServer(wPort, undefined, { disableGateway: true }); - const ws = new WebSocket( - `ws://127.0.0.1:${wPort}/webchat/socket?session=test`, - ); - const messages: unknown[] = []; - ws.on("message", (data) => { - try { - messages.push(JSON.parse(String(data))); - } catch { - /* ignore */ - } - }); - - try { - await new Promise((resolve) => ws.once("open", resolve)); - - __forceWebChatSnapshotForTests({ - presence: [], - health: {}, - stateVersion: { presence: 1, health: 1 }, - uptimeMs: 0, - }); - - const waitFor = async ( - pred: (m: unknown) => m is T, - label: string, - ): Promise => { - const start = Date.now(); - while (Date.now() - start < 3000) { - const found = messages.find((m): m is T => { - try { - return pred(m); - } catch { - return false; - } - }); - if (found) return found; - await new Promise((resolve) => setTimeout(resolve, 10)); - } - throw new Error(`timeout waiting for ${label}`); - }; - - const isSessionMessage = (m: unknown): m is SessionMessage => - typeof m === "object" && - m !== null && - (m as SessionMessage).type === "session"; - const isSnapshotMessage = (m: unknown): m is SnapshotMessage => - typeof m === "object" && - m !== null && - (m as SnapshotMessage).type === "gateway-snapshot"; - - await waitFor(isSessionMessage, "session"); - const snap = await waitFor(isSnapshotMessage, "snapshot"); - expect(snap.snapshot?.stateVersion?.presence).toBe(1); - } finally { - ws.close(); - await stopWebChatServer(); - } - }, - ); +describe("webchat server (static only)", () => { + test("serves index.html over loopback", { timeout: 8000 }, async () => { + const port = await getFreePort(); + await startWebChatServer(port); + try { + const body = await fetchText(`http://127.0.0.1:${port}/`); + expect(body.toLowerCase()).toContain("> = new Map(); -let gateway: GatewayClient | null = null; -let gatewayReady = false; -let latestSnapshot: Record | null = null; -let latestPolicy: Record | null = null; function resolveWebRoot() { const here = path.dirname(fileURLToPath(import.meta.url)); @@ -52,151 +33,6 @@ function resolveWebRoot() { throw new Error(`webchat assets not found; tried: ${candidates.join(", ")}`); } -function readBody(req: http.IncomingMessage): Promise { - return new Promise((resolve, reject) => { - const chunks: Buffer[] = []; - req - .on("data", (c) => chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(c))) - .on("end", () => resolve(Buffer.concat(chunks))) - .on("error", reject); - }); -} - -function pickSessionId( - sessionKey: string, - store: Record, -): string | null { - if (store[sessionKey]?.sessionId) return store[sessionKey].sessionId; - const first = Object.values(store)[0]?.sessionId; - return first ?? null; -} - -function readSessionMessages( - sessionId: string, - storePath: string, -): ChatMessage[] { - const dir = path.dirname(storePath); - const candidates = [ - path.join(dir, `${sessionId}.jsonl`), - path.join( - os.homedir(), - ".tau/agent/sessions/clawdis", - `${sessionId}.jsonl`, - ), - ]; - let content: string | null = null; - for (const p of candidates) { - if (fs.existsSync(p)) { - try { - content = fs.readFileSync(p, "utf-8"); - break; - } catch { - // continue - } - } - } - if (!content) return []; - - const messages: ChatMessage[] = []; - for (const line of content.split(/\r?\n/)) { - if (!line.trim()) continue; - try { - const obj = JSON.parse(line); - const msg = obj.message ?? obj; - if (!msg?.role || !msg?.content) continue; - messages.push({ role: msg.role, content: msg.content }); - } catch (err) { - logDebug(`webchat history parse error: ${String(err)}`); - } - } - return messages; -} - -function broadcastSession(sessionKey: string, payload: unknown) { - const conns = wsSessions.get(sessionKey); - if (!conns || conns.size === 0) return; - const msg = JSON.stringify(payload); - for (const ws of conns) { - try { - ws.send(msg); - } catch { - // ignore and let close handler prune - } - } -} - -function broadcastAll(payload: unknown) { - const msg = JSON.stringify(payload); - for (const [, conns] of wsSessions) { - for (const ws of conns) { - try { - ws.send(msg); - } catch { - // ignore - } - } - } -} - -async function handleRpc( - body: unknown, - meta?: { remoteAddress?: string | null; senderHost?: string }, -): Promise<{ ok: boolean; payloads?: RpcPayload[]; error?: string }> { - const payload = body as { - text?: unknown; - thinking?: unknown; - deliver?: unknown; - to?: unknown; - timeout?: unknown; - }; - - const textRaw: string = (payload.text ?? "").toString(); - if (!textRaw.trim()) return { ok: false, error: "empty text" }; - if (!gateway || !gatewayReady) { - return { ok: false, error: "gateway unavailable" }; - } - - const thinking = - typeof payload.thinking === "string" ? payload.thinking : undefined; - const to = typeof payload.to === "string" ? payload.to : undefined; - const deliver = Boolean(payload.deliver); - const timeout = - typeof payload.timeout === "number" ? payload.timeout : undefined; - - const idempotencyKey = randomUUID(); - try { - // Wrap user text with surface + host/IP envelope - const message = formatAgentEnvelope({ - surface: "WebChat", - from: meta?.senderHost ?? os.hostname(), - ip: meta?.remoteAddress ?? undefined, - timestamp: Date.now(), - body: textRaw.trim(), - }); - - // Send agent request; wait for final res (status ok/error) - const res = (await gateway.request( - "agent", - { - message, - thinking, - deliver, - to, - timeout, - idempotencyKey, - }, - { expectFinal: true }, - )) as { status?: string; summary?: string }; - if (res?.status && res.status !== "ok") { - return { ok: false, error: res.summary || res.status }; - } - // The actual agent output is delivered via events; HTTP just returns ack. - return { ok: true, payloads: [] }; - } catch (err) { - return { ok: false, error: String(err) }; - } -} - function notFound(res: http.ServerResponse) { res.statusCode = 404; res.end("Not Found"); @@ -204,19 +40,10 @@ function notFound(res: http.ServerResponse) { export async function startWebChatServer( port = WEBCHAT_DEFAULT_PORT, - gatewayOverrideUrl?: string, - opts?: { disableGateway?: boolean }, -) { +): Promise { if (state) return state; const root = resolveWebRoot(); - // Precompute session store root for file watching - const cfg = loadConfig(); - const sessionCfg = cfg.inbound?.reply?.session; - const storePath = sessionCfg?.store - ? resolveStorePath(sessionCfg.store) - : resolveStorePath(undefined); - const storeDir = path.dirname(storePath); const server = http.createServer(async (req, res) => { if (!req.url) return notFound(res); @@ -230,59 +57,14 @@ export async function startWebChatServer( } const url = new URL(req.url, "http://127.0.0.1"); - const isInfo = url.pathname === "/webchat/info" || url.pathname === "/info"; - const isRpc = url.pathname === "/webchat/rpc" || url.pathname === "/rpc"; - - if (isInfo) { - const sessionKey = url.searchParams.get("session") ?? "main"; - const store = loadSessionStore(storePath); - const sessionId = pickSessionId(sessionKey, store); - const messages = sessionId - ? readSessionMessages(sessionId, storePath) - : []; - res.setHeader("Content-Type", "application/json"); - res.end( - JSON.stringify({ - port, - sessionKey, - storePath, - sessionId, - initialMessages: messages, - basePath: "/", - gatewayConnected: gatewayReady, - gatewaySnapshot: latestSnapshot, - gatewayPolicy: latestPolicy, - }), - ); - return; - } - - if (isRpc && req.method === "POST") { - const bodyBuf = await readBody(req); - let body: Record = {}; - try { - body = JSON.parse(bodyBuf.toString("utf-8")); - } catch { - // ignore - } - const forwarded = - (req.headers["x-forwarded-for"] as string | undefined)?.split(",")[0]?.trim() ?? - req.socket.remoteAddress; - const result = await handleRpc(body, { - remoteAddress: forwarded, - senderHost: os.hostname(), - }); - res.setHeader("Content-Type", "application/json"); - res.end(JSON.stringify(result)); - return; - } if (url.pathname.startsWith("/webchat")) { let rel = url.pathname.replace(/^\/webchat\/?/, ""); if (!rel || rel.endsWith("/")) rel = `${rel}index.html`; const filePath = path.join(root, rel); - if (!filePath.startsWith(root)) return notFound(res); - if (!fs.existsSync(filePath)) return notFound(res); + if (!filePath.startsWith(root) || !fs.existsSync(filePath)) { + return notFound(res); + } const data = fs.readFileSync(filePath); const ext = path.extname(filePath).toLowerCase(); const type = @@ -331,172 +113,6 @@ export async function startWebChatServer( ); }); - // Gateway connection (control/data plane) - const cfgObj = loadConfig() as Record; - if (!opts?.disableGateway) { - const cfgGatewayPort = - (cfgObj.webchat as { gatewayPort?: number } | undefined)?.gatewayPort ?? - 18789; - const gatewayUrl = gatewayOverrideUrl ?? `ws://127.0.0.1:${cfgGatewayPort}`; - const gatewayToken = - process.env.CLAWDIS_GATEWAY_TOKEN ?? - (cfgObj.gateway as { token?: string } | undefined)?.token; - gateway = new GatewayClient({ - url: gatewayUrl, - token: gatewayToken, - clientName: "webchat-backend", - clientVersion: - process.env.CLAWDIS_VERSION ?? process.env.npm_package_version ?? "dev", - platform: process.platform, - mode: "webchat", - instanceId: `webchat-${os.hostname()}`, - onHelloOk: (hello) => { - gatewayReady = true; - latestSnapshot = hello.snapshot as Record; - latestPolicy = hello.policy as Record; - broadcastAll({ - type: "gateway-snapshot", - snapshot: hello.snapshot, - policy: hello.policy, - }); - }, - onEvent: (evt) => { - broadcastAll({ - type: "gateway-event", - event: evt.event, - payload: evt.payload, - seq: evt.seq, - stateVersion: evt.stateVersion, - }); - }, - onClose: () => { - gatewayReady = false; - }, - onGap: async () => { - if (!gatewayReady || !gateway) return; - try { - const [health, presence] = await Promise.all([ - gateway.request("health"), - gateway.request("system-presence"), - ]); - latestSnapshot = { - ...latestSnapshot, - health, - presence, - } as Record; - broadcastAll({ type: "gateway-refresh", health, presence }); - } catch (err) { - logError(`webchat gap refresh failed: ${String(err)}`); - } - }, - }); - gateway.start(); - } - - // WebSocket setup for live session updates. - wss = new WebSocketServer({ noServer: true }); - server.on("upgrade", (req, socket, head) => { - try { - const url = new URL(req.url ?? "", "http://127.0.0.1"); - if (url.pathname !== "/webchat/socket" && url.pathname !== "/socket") { - socket.destroy(); - return; - } - const addr = req.socket.remoteAddress ?? ""; - const isLocal = - addr.startsWith("127.") || - addr === "::1" || - addr.endsWith("127.0.0.1") || - addr.endsWith("::ffff:127.0.0.1"); - if (!isLocal) { - socket.destroy(); - return; - } - const sessionKey = url.searchParams.get("session") ?? "main"; - if (!wss) { - socket.destroy(); - return; - } - wss.handleUpgrade(req, socket, head, (ws: WebSocket) => { - ws.on("close", () => { - const set = wsSessions.get(sessionKey); - if (set) { - set.delete(ws); - if (set.size === 0) wsSessions.delete(sessionKey); - } - }); - wsSessions.set( - sessionKey, - (wsSessions.get(sessionKey) ?? new Set()).add(ws), - ); - // Send initial snapshot - const store = loadSessionStore(storePath); - const sessionId = pickSessionId(sessionKey, store); - const sessionEntry = sessionKey ? store[sessionKey] : undefined; - const persistedThinking = sessionEntry?.thinkingLevel; - const messages = sessionId - ? readSessionMessages(sessionId, storePath) - : []; - ws.send( - JSON.stringify({ - type: "session", - sessionKey, - messages, - thinkingLevel: - typeof persistedThinking === "string" - ? persistedThinking - : (cfg.inbound?.reply?.thinkingDefault ?? "off"), - }), - ); - if (latestSnapshot) { - ws.send( - JSON.stringify({ - type: "gateway-snapshot", - snapshot: latestSnapshot, - policy: latestPolicy, - }), - ); - } - }); - } catch (_err) { - socket.destroy(); - } - }); - - // Watch for session/message file changes and push updates. - try { - if (fs.existsSync(storeDir)) { - fs.watch(storeDir, { persistent: false }, (_event, filename) => { - if (!filename) return; - // On any file change, refresh for active sessions. - for (const sessionKey of wsSessions.keys()) { - try { - const store = loadSessionStore(storePath); - const sessionId = pickSessionId(sessionKey, store); - const sessionEntry = sessionKey ? store[sessionKey] : undefined; - const persistedThinking = sessionEntry?.thinkingLevel; - const messages = sessionId - ? readSessionMessages(sessionId, storePath) - : []; - broadcastSession(sessionKey, { - type: "session", - sessionKey, - messages, - thinkingLevel: - typeof persistedThinking === "string" - ? persistedThinking - : (cfg.inbound?.reply?.thinkingDefault ?? "off"), - }); - } catch { - // ignore - } - } - }); - } - } catch { - // watcher is best-effort - } - state = { server, port }; logDebug(`webchat server listening on 127.0.0.1:${port}`); return state; @@ -504,67 +120,31 @@ export async function startWebChatServer( export async function stopWebChatServer() { if (!state) return; - gatewayReady = false; - gateway?.stop(); - gateway = null; - if (wss) { - for (const client of wss.clients) { - try { - client.close(); - } catch { - /* ignore */ - } - } - await new Promise((resolve) => wss?.close(() => resolve())); - } if (state.server) { await new Promise((resolve) => state?.server.close(() => resolve())); } - wss = null; - wsSessions.clear(); state = null; } -export async function waitForWebChatGatewayReady(timeoutMs = 10000) { - const start = Date.now(); - while (!latestSnapshot) { - if (Date.now() - start > timeoutMs) { - throw new Error("webchat gateway not ready"); - } - await new Promise((resolve) => setTimeout(resolve, 50)); - } +// Legacy no-op: gateway readiness is now handled directly by clients. +export async function waitForWebChatGatewayReady() { + return; } -// Test-only helpers to seed/broadcast without a live Gateway connection. -export function __forceWebChatSnapshotForTests( - snapshot: Record, - policy?: Record, -) { - latestSnapshot = snapshot; - latestPolicy = policy ?? null; - gatewayReady = true; - broadcastAll({ - type: "gateway-snapshot", - snapshot: latestSnapshot, - policy: latestPolicy, - }); +export function __forceWebChatSnapshotForTests() { + // no-op: snapshots now come from the Gateway WS directly. } -export function __broadcastGatewayEventForTests( - event: string, - payload: unknown, -) { - broadcastAll({ type: "gateway-event", event, payload }); +export async function __broadcastGatewayEventForTests() { + // no-op } -export async function ensureWebChatServerFromConfig(opts?: { - gatewayUrl?: string; -}) { +export async function ensureWebChatServerFromConfig() { const cfg = loadConfig(); if (cfg.webchat?.enabled === false) return null; const port = cfg.webchat?.port ?? WEBCHAT_DEFAULT_PORT; try { - return await startWebChatServer(port, opts?.gatewayUrl); + return await startWebChatServer(port); } catch (err) { logDebug(`webchat server failed to start: ${String(err)}`); throw err;