diff --git a/apps/macos/Sources/Clawdis/Resources/WebChat/bootstrap.js b/apps/macos/Sources/Clawdis/Resources/WebChat/bootstrap.js index bcccdd2c0..70b032db4 100644 --- a/apps/macos/Sources/Clawdis/Resources/WebChat/bootstrap.js +++ b/apps/macos/Sources/Clawdis/Resources/WebChat/bootstrap.js @@ -198,42 +198,67 @@ const startChat = async () => { mount.appendChild(panel); logStatus("boot: ready"); - // Periodically sync messages + thinking from the session store so webchat stays - // in lock-step with other transports (e.g., WhatsApp or CLI). + // Live sync via WebSocket so other transports (WhatsApp/CLI) appear instantly. let lastSyncedTs = latestTimestamp(initialMessages); - const syncIntervalMs = 3000; - async function syncFromSession() { - if (agent.state.isStreaming) return; // avoid stomping in-flight turns - try { - const infoUrl = new URL(`./info?session=${encodeURIComponent(sessionKey)}`, window.location.href); - const resp = await fetch(infoUrl, { credentials: "omit" }); - if (!resp.ok) return; - const info = await resp.json(); - const messages = Array.isArray(info.initialMessages) ? info.initialMessages : []; - const ts = latestTimestamp(messages); - const thinking = typeof info.thinkingLevel === "string" ? info.thinkingLevel : "off"; + let ws; + let reconnectTimer; - if (ts && ts !== lastSyncedTs) { - agent.replaceMessages(messages); - lastSyncedTs = ts; - } + const applySnapshot = (info) => { + const messages = Array.isArray(info?.messages) ? info.messages : []; + const ts = latestTimestamp(messages); + const thinking = typeof info?.thinkingLevel === "string" ? info.thinkingLevel : "off"; - if (thinking && thinking !== agent.state.thinkingLevel) { - agent.setThinkingLevel(thinking); - if (panel?.agentInterface) { - panel.agentInterface.sessionThinkingLevel = thinking; - panel.agentInterface.pendingThinkingLevel = null; - if (panel.agentInterface._messageEditor) { - panel.agentInterface._messageEditor.thinkingLevel = thinking; - } + if (!agent.state.isStreaming && ts && ts !== lastSyncedTs) { + agent.replaceMessages(messages); + lastSyncedTs = ts; + } + + if (thinking && thinking !== agent.state.thinkingLevel) { + agent.setThinkingLevel(thinking); + if (panel?.agentInterface) { + panel.agentInterface.sessionThinkingLevel = thinking; + panel.agentInterface.pendingThinkingLevel = null; + if (panel.agentInterface._messageEditor) { + panel.agentInterface._messageEditor.thinkingLevel = thinking; } } - } catch (err) { - console.warn("session sync failed", err); } - } + }; - setInterval(syncFromSession, syncIntervalMs); + const connectSocket = () => { + try { + const wsUrl = new URL(`./socket?session=${encodeURIComponent(sessionKey)}`, window.location.href); + wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); + ws = new WebSocket(wsUrl); + + ws.onmessage = (ev) => { + try { + const data = JSON.parse(ev.data); + if (data?.type === "session") applySnapshot(data); + } catch (err) { + console.warn("ws message parse failed", err); + } + }; + + ws.onclose = () => { + ws = null; + if (!reconnectTimer) { + reconnectTimer = setTimeout(() => { + reconnectTimer = null; + connectSocket(); + }, 2000); + } + }; + + ws.onerror = () => { + ws?.close(); + }; + } catch (err) { + console.warn("ws connect failed", err); + } + }; + + connectSocket(); }; startChat().catch((err) => { diff --git a/apps/macos/Sources/Clawdis/Resources/WebChat/webchat.bundle.js b/apps/macos/Sources/Clawdis/Resources/WebChat/webchat.bundle.js index da7daf911..596ebf1fa 100644 --- a/apps/macos/Sources/Clawdis/Resources/WebChat/webchat.bundle.js +++ b/apps/macos/Sources/Clawdis/Resources/WebChat/webchat.bundle.js @@ -196455,36 +196455,57 @@ const startChat = async () => { mount.appendChild(panel); logStatus("boot: ready"); let lastSyncedTs = latestTimestamp(initialMessages); - const syncIntervalMs = 3e3; - async function syncFromSession() { - if (agent.state.isStreaming) return; - try { - const infoUrl = new URL(`./info?session=${encodeURIComponent(sessionKey)}`, window.location.href); - const resp = await fetch(infoUrl, { credentials: "omit" }); - if (!resp.ok) return; - const info$1 = await resp.json(); - const messages = Array.isArray(info$1.initialMessages) ? info$1.initialMessages : []; - const ts = latestTimestamp(messages); - const thinking = typeof info$1.thinkingLevel === "string" ? info$1.thinkingLevel : "off"; - if (ts && ts !== lastSyncedTs) { - agent.replaceMessages(messages); - lastSyncedTs = ts; - } - if (thinking && thinking !== agent.state.thinkingLevel) { - agent.setThinkingLevel(thinking); - if (panel?.agentInterface) { - panel.agentInterface.sessionThinkingLevel = thinking; - panel.agentInterface.pendingThinkingLevel = null; - if (panel.agentInterface._messageEditor) { - panel.agentInterface._messageEditor.thinkingLevel = thinking; - } + let ws; + let reconnectTimer; + const applySnapshot = (info$1) => { + const messages = Array.isArray(info$1?.messages) ? info$1.messages : []; + const ts = latestTimestamp(messages); + const thinking = typeof info$1?.thinkingLevel === "string" ? info$1.thinkingLevel : "off"; + if (!agent.state.isStreaming && ts && ts !== lastSyncedTs) { + agent.replaceMessages(messages); + lastSyncedTs = ts; + } + if (thinking && thinking !== agent.state.thinkingLevel) { + agent.setThinkingLevel(thinking); + if (panel?.agentInterface) { + panel.agentInterface.sessionThinkingLevel = thinking; + panel.agentInterface.pendingThinkingLevel = null; + if (panel.agentInterface._messageEditor) { + panel.agentInterface._messageEditor.thinkingLevel = thinking; } } - } catch (err) { - console.warn("session sync failed", err); } - } - setInterval(syncFromSession, syncIntervalMs); + }; + const connectSocket = () => { + try { + const wsUrl = new URL(`./socket?session=${encodeURIComponent(sessionKey)}`, window.location.href); + wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); + ws = new WebSocket(wsUrl); + ws.onmessage = (ev) => { + try { + const data = JSON.parse(ev.data); + if (data?.type === "session") applySnapshot(data); + } catch (err) { + console.warn("ws message parse failed", err); + } + }; + ws.onclose = () => { + ws = null; + if (!reconnectTimer) { + reconnectTimer = setTimeout(() => { + reconnectTimer = null; + connectSocket(); + }, 2e3); + } + }; + ws.onerror = () => { + ws?.close(); + }; + } catch (err) { + console.warn("ws connect failed", err); + } + }; + connectSocket(); }; startChat().catch((err) => { const msg = err?.stack || err?.message || String(err); diff --git a/package.json b/package.json index 7230a486b..1ee3f7aad 100644 --- a/package.json +++ b/package.json @@ -45,6 +45,7 @@ "sharp": "^0.34.5", "tslog": "^4.10.2", "undici": "^7.16.0", + "ws": "^8.18.0", "zod": "^4.1.13" }, "devDependencies": { diff --git a/src/webchat/server.ts b/src/webchat/server.ts index d9e68c84b..d980ed314 100644 --- a/src/webchat/server.ts +++ b/src/webchat/server.ts @@ -5,6 +5,7 @@ import os from "node:os"; import path from "node:path"; import { fileURLToPath } from "node:url"; import sharp from "sharp"; +import { WebSocketServer, WebSocket } from "ws"; import { agentCommand } from "../commands/agent.js"; import { loadConfig } from "../config/config.js"; @@ -33,6 +34,8 @@ type AttachmentInput = { type RpcPayload = { role: string; content: string }; let state: WebChatServerState | null = null; +let wss: WebSocketServer | null = null; +const wsSessions: Map> = new Map(); function resolveWebRoot() { const here = path.dirname(fileURLToPath(import.meta.url)); @@ -93,6 +96,19 @@ function readSessionMessages(sessionId: string, storePath: string): ChatMessage[ return messages; } +function broadcastSession(sessionKey: string, payload: any) { + const conns = wsSessions.get(sessionKey); + if (!conns || conns.size === 0) return; + const msg = JSON.stringify(payload); + for (const ws of conns) { + try { + ws.send(msg); + } catch { + // ignore and let close handler prune + } + } +} + async function persistAttachments( attachments: AttachmentInput[], sessionId: string, @@ -225,6 +241,33 @@ async function handleRpc( return { ok: false, error: String(err) }; } + // Push latest session state to any connected webchat clients for this sessionKey. + try { + const cfg = loadConfig(); + const sessionCfg = cfg.inbound?.reply?.session; + const storePath = sessionCfg?.store + ? resolveStorePath(sessionCfg.store) + : resolveStorePath(undefined); + const store = loadSessionStore(storePath); + const persistedSessionId = pickSessionId(sessionKey, store) ?? sessionId; + const messages = persistedSessionId + ? readSessionMessages(persistedSessionId, storePath) + : []; + const sessionEntry = sessionKey ? store[sessionKey] : undefined; + const persistedThinking = sessionEntry?.thinkingLevel; + broadcastSession(sessionKey, { + type: "session", + sessionKey, + messages, + thinkingLevel: + typeof persistedThinking === "string" + ? persistedThinking + : cfg.inbound?.reply?.thinkingDefault ?? "off", + }); + } catch { + // best-effort; ignore broadcast errors + } + const jsonLine = logs.find((l) => l.trim().startsWith("{")); if (!jsonLine) return { ok: false, error: "no agent output" }; try { @@ -244,6 +287,14 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) { if (state) return state; const root = resolveWebRoot(); + // Precompute session store root for file watching + const cfg = loadConfig(); + const sessionCfg = cfg.inbound?.reply?.session; + const storePath = sessionCfg?.store + ? resolveStorePath(sessionCfg.store) + : resolveStorePath(undefined); + const storeDir = path.dirname(storePath); + const server = http.createServer(async (req, res) => { if (!req.url) return notFound(res); if (req.socket.remoteAddress && !req.socket.remoteAddress.startsWith("127.")) { @@ -258,11 +309,6 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) { if (isInfo) { const sessionKey = url.searchParams.get("session") ?? "main"; - const cfg = loadConfig(); - const sessionCfg = cfg.inbound?.reply?.session; - const storePath = sessionCfg?.store - ? resolveStorePath(sessionCfg.store) - : resolveStorePath(undefined); const store = loadSessionStore(storePath); const sessionId = pickSessionId(sessionKey, store); const messages = sessionId @@ -345,6 +391,84 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) { server.listen(port, "127.0.0.1", () => resolve()); }); + // WebSocket setup for live session updates. + wss = new WebSocketServer({ noServer: true }); + server.on("upgrade", (req, socket, head) => { + try { + const url = new URL(req.url ?? "", "http://127.0.0.1"); + if (url.pathname !== "/webchat/socket" && url.pathname !== "/socket") { + socket.destroy(); + return; + } + if (req.socket.remoteAddress && !req.socket.remoteAddress.startsWith("127.")) { + socket.destroy(); + return; + } + const sessionKey = url.searchParams.get("session") ?? "main"; + wss!.handleUpgrade(req, socket, head, (ws) => { + ws.on("close", () => { + const set = wsSessions.get(sessionKey); + if (set) { + set.delete(ws); + if (set.size === 0) wsSessions.delete(sessionKey); + } + }); + wsSessions.set(sessionKey, (wsSessions.get(sessionKey) ?? new Set()).add(ws)); + // Send initial snapshot + const store = loadSessionStore(storePath); + const sessionId = pickSessionId(sessionKey, store); + const sessionEntry = sessionKey ? store[sessionKey] : undefined; + const persistedThinking = sessionEntry?.thinkingLevel; + const messages = sessionId ? readSessionMessages(sessionId, storePath) : []; + ws.send( + JSON.stringify({ + type: "session", + sessionKey, + messages, + thinkingLevel: + typeof persistedThinking === "string" + ? persistedThinking + : cfg.inbound?.reply?.thinkingDefault ?? "off", + }), + ); + }); + } catch (err) { + socket.destroy(); + } + }); + + // Watch for session/message file changes and push updates. + try { + if (fs.existsSync(storeDir)) { + fs.watch(storeDir, { persistent: false }, (event, filename) => { + if (!filename) return; + // On any file change, refresh for active sessions. + for (const sessionKey of wsSessions.keys()) { + try { + const store = loadSessionStore(storePath); + const sessionId = pickSessionId(sessionKey, store); + const sessionEntry = sessionKey ? store[sessionKey] : undefined; + const persistedThinking = sessionEntry?.thinkingLevel; + const messages = sessionId ? readSessionMessages(sessionId, storePath) : []; + broadcastSession(sessionKey, { + type: "session", + sessionKey, + messages, + thinkingLevel: + typeof persistedThinking === "string" + ? persistedThinking + : cfg.inbound?.reply?.thinkingDefault ?? "off", + }); + } catch { + // ignore + } + } + }); + } + } catch { + // watcher is best-effort + } + state = { server, port }; logDebug(`webchat server listening on 127.0.0.1:${port}`); return state;