diff --git a/docs/webchat.md b/docs/webchat.md index c36ddb5da..0fc364721 100644 --- a/docs/webchat.md +++ b/docs/webchat.md @@ -1,33 +1,39 @@ -# Web Chat (loopback + SSH tunnel) +# WebChat (loopback + SSH tunnel) -Updated: 2025-12-08 +Updated: 2025-12-09 -## What shipped -- The relay now starts a loopback-only web chat server automatically (default port **18788**, configurable via `webchat.port`). -- Endpoints: - - `GET /webchat/info?session=` → `{port, sessionId, initialMessages, basePath}` plus history from the relay’s session store. - - `GET /webchat/*` → static Web Chat assets. - - `POST /webchat/rpc` → runs the agent in-process and returns `{ ok, payloads?, error? }` (no CLI spawn, no PATH dependency). -- The macOS app simply loads `http://127.0.0.1:/webchat/?session=` (or the SSH-forwarded port in remote mode). No Swift bridge is used for sends; all chat traffic stays inside the Node relay. -- Initial messages are fetched from `/webchat/info`, so history appears immediately. -- Enable/disable via `webchat.enabled` (default **true**); set the port with `webchat.port`. +## What it is +- A local web UI for chatting with the Gateway. +- Static assets served by the WebChat HTTP server (default port **18788**, configurable). +- The WebChat backend holds a single WebSocket connection to the Gateway (`ws://127.0.0.1:18789` by default) for all control/data: history fetch, sends, agent runs, presence. +- Trust model: access is granted by being on localhost or inside your SSH/Tailscale tunnel. No additional auth prompts once you can reach the box. +- `webchat.gatewayPort` config can point at a non-default Gateway port if needed. -## Security -- Loopback only; remote access requires SSH port-forwarding. -- No bearer token; the trust model is “local machine or your SSH tunnel”. +## Endpoints +- `GET /webchat/info?session=` → `{ port, sessionId, initialMessages, basePath }` plus history from the Gateway session store. +- `GET /webchat/*` → static assets. +- `POST /webchat/rpc` → proxies a chat/agent action through the Gateway connection and returns `{ ok, payloads?, error? }`. + +## How it connects +- On startup, the WebChat server dials the Gateway WebSocket and performs the mandatory `hello` handshake; the `hello-ok` snapshot seeds presence + health immediately. +- All outgoing sends/agent calls are requests on that WS; streamed events (`agent`, `presence`, `tick`) are forwarded to the browser client. +- If a seq gap is detected in Gateway events, WebChat auto-refreshes health + presence and broadcasts a `gateway-refresh` to connected browsers. +- If the Gateway WS is unavailable, WebChat fails fast and surfaces the error in the UI. + +## Remote use +- SSH tunnel example: `ssh -N -L 18788:127.0.0.1:18788 -L 18789:127.0.0.1:18789 user@host`. +- Browse to `http://127.0.0.1:18788/webchat/?session=` through the tunnel; the backend WS also rides the tunnel. + +## Config +- `webchat.enabled` (default true) +- `webchat.port` (default 18788) +- Gateway WS port is set by `clawdis gateway --port`; WebChat expects it at 18789 unless overridden. ## Failure handling -- Bootstrap errors show in-app (“Web chat failed to connect …”) instead of hanging. -- The mac app logs tunnel and endpoint details to the `com.steipete.clawdis/WebChat` subsystem. +- Clear UI error when the Gateway handshake fails or the WS drops. +- WebChat does not attempt fallback transports; the Gateway WS is required. ## Dev notes -- Static assets stay in `apps/macos/Sources/Clawdis/Resources/WebChat`; the server reads them directly. -- Server code: `src/webchat/server.ts`. -- CLI entrypoint (optional): `clawdis webchat --json [--port N]` to query/start manually. -- RPC send path is in-process; the relay does not spawn `clawdis` or rely on PATH. -- Mac glue: `WebChatWindow.swift` (bootstrap + tunnel) and `WebChatTunnel` (SSH -L). - -## TODO / nice-to-haves -- Enforce token by default once mobile/remote auth flows are in place. -- Stream responses instead of one-shot payloads. -- Expose a readiness endpoint for health checks. +- Assets live in `apps/macos/Sources/Clawdis/Resources/WebChat`. +- Server implementation: `src/webchat/server.ts`. +- macOS glue: `WebChatWindow.swift` + `WebChatTunnel` for SSH -L helpers. diff --git a/src/webchat/server.test.ts b/src/webchat/server.test.ts new file mode 100644 index 000000000..6e4148beb --- /dev/null +++ b/src/webchat/server.test.ts @@ -0,0 +1,91 @@ +import { describe, expect, test } from "vitest"; +import { WebSocket } from "ws"; +import { + startWebChatServer, + stopWebChatServer, + __forceWebChatSnapshotForTests, + __broadcastGatewayEventForTests, +} from "./server.js"; + +async function getFreePort(): Promise { + const { createServer } = await import("node:net"); + return await new Promise((resolve, reject) => { + const server = createServer(); + server.listen(0, "127.0.0.1", () => { + const port = (server.address() as any).port as number; + server.close((err: Error | null) => (err ? reject(err) : resolve(port))); + }); + }); +} + +function onceMessage(ws: WebSocket, filter: (obj: any) => boolean, timeoutMs = 8000) { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error("timeout")), timeoutMs); + const closeHandler = (code: number, reason: Buffer) => { + clearTimeout(timer); + ws.off("message", handler); + reject(new Error(`closed ${code}: ${reason.toString()}`)); + }; + const handler = (data: WebSocket.RawData) => { + const obj = JSON.parse(String(data)); + if (filter(obj)) { + clearTimeout(timer); + ws.off("message", handler); + ws.off("close", closeHandler); + resolve(obj as T); + } + }; + ws.on("message", handler); + ws.once("close", closeHandler); + }); +} + +describe("webchat server", () => { + test("hydrates snapshot to new sockets (offline mock)", { timeout: 8000 }, async () => { + const wPort = await getFreePort(); + await startWebChatServer(wPort, undefined, { disableGateway: true }); + const ws = new WebSocket(`ws://127.0.0.1:${wPort}/webchat/socket?session=test`); + const messages: any[] = []; + ws.on("message", (data) => { + try { + messages.push(JSON.parse(String(data))); + } catch { + /* ignore */ + } + }); + + try { + await new Promise((resolve) => ws.once("open", resolve)); + + __forceWebChatSnapshotForTests({ + presence: [], + health: {}, + stateVersion: { presence: 1, health: 1 }, + uptimeMs: 0, + }); + + const waitFor = async (pred: (m: any) => boolean, label: string) => { + const start = Date.now(); + while (Date.now() - start < 3000) { + const found = messages.find((m) => { + try { + return pred(m); + } catch { + return false; + } + }); + if (found) return found; + await new Promise((resolve) => setTimeout(resolve, 10)); + } + throw new Error(`timeout waiting for ${label}`); + }; + + await waitFor((m) => m?.type === "session", "session"); + const snap = await waitFor((m) => m?.type === "gateway-snapshot", "snapshot"); + expect(snap.snapshot?.stateVersion?.presence).toBe(1); + } finally { + ws.close(); + await stopWebChatServer(); + } + }); +}); diff --git a/src/webchat/server.ts b/src/webchat/server.ts index b95ec9ddc..1020bd58b 100644 --- a/src/webchat/server.ts +++ b/src/webchat/server.ts @@ -1,13 +1,10 @@ -import crypto from "node:crypto"; import fs from "node:fs"; import http from "node:http"; import os from "node:os"; import path from "node:path"; import { fileURLToPath } from "node:url"; -import sharp from "sharp"; import { type WebSocket, WebSocketServer } from "ws"; -import { agentCommand } from "../commands/agent.js"; import { loadConfig } from "../config/config.js"; import { loadSessionStore, @@ -15,7 +12,8 @@ import { type SessionEntry, } from "../config/sessions.js"; import { logDebug, logError } from "../logger.js"; -import type { RuntimeEnv } from "../runtime.js"; +import { GatewayClient } from "../gateway/client.js"; +import { randomUUID } from "node:crypto"; const WEBCHAT_DEFAULT_PORT = 18788; @@ -25,17 +23,15 @@ type WebChatServerState = { }; type ChatMessage = { role: string; content: string }; -type AttachmentInput = { - content?: string; - mimeType?: string; - fileName?: string; - type?: string; -}; type RpcPayload = { role: string; content: string }; let state: WebChatServerState | null = null; let wss: WebSocketServer | null = null; const wsSessions: Map> = new Map(); +let gateway: GatewayClient | null = null; +let gatewayReady = false; +let latestSnapshot: Record | null = null; +let latestPolicy: Record | null = null; function resolveWebRoot() { const here = path.dirname(fileURLToPath(import.meta.url)); @@ -129,186 +125,66 @@ function broadcastSession(sessionKey: string, payload: unknown) { } } -async function persistAttachments( - attachments: AttachmentInput[], - sessionId: string, -): Promise<{ placeholder: string; path: string }[]> { - const out: { placeholder: string; path: string }[] = []; - if (!attachments?.length) return out; - - const root = path.join( - os.homedir(), - ".clawdis", - "webchat-uploads", - sessionId, - ); - await fs.promises.mkdir(root, { recursive: true }); - - let idx = 1; - for (const att of attachments) { - try { - if (!att?.content || typeof att.content !== "string") continue; - const mime = - typeof att.mimeType === "string" - ? att.mimeType - : "application/octet-stream"; - const baseName = att.fileName || `${att.type || "attachment"}-${idx}`; - const ext = mime.startsWith("image/") - ? mime.split("/")[1] || "bin" - : "bin"; - const fileName = `${baseName}.${ext}`.replace(/[^a-zA-Z0-9._-]/g, "_"); - const buf = Buffer.from(att.content, "base64"); - - let finalBuf: Buffer = buf; - let meta: { width?: number; height?: number } = {}; - - if (att.type === "image") { - const image = sharp(buf, { failOn: "none" }); - meta = await image.metadata(); - const needsResize = - (meta.width && meta.width > 2000) || - (meta.height && meta.height > 2000); - if (needsResize) { - const resized = await image - .resize({ width: 2000, height: 2000, fit: "inside" }) - .toBuffer({ resolveWithObject: true }); - finalBuf = resized.data as Buffer; - meta = { width: resized.info.width, height: resized.info.height }; - } +function broadcastAll(payload: unknown) { + const msg = JSON.stringify(payload); + for (const [, conns] of wsSessions) { + for (const ws of conns) { + try { + ws.send(msg); + } catch { + // ignore } - - if (finalBuf.length > 6 * 1024 * 1024) { - out.push({ - placeholder: `[Attachment too large: ${baseName} (${(finalBuf.length / 1024 / 1024).toFixed(1)} MB)]`, - path: "", - }); - idx += 1; - continue; - } - - const dest = path.join(root, fileName); - await fs.promises.writeFile(dest, finalBuf); - - const sizeLabel = `${(finalBuf.length / 1024).toFixed(0)} KB`; - const dimLabel = - meta?.width && meta?.height ? `, ${meta.width}x${meta.height}` : ""; - const placeholder = `[Attachment saved: ${dest} (${mime}${dimLabel}, ${sizeLabel})]`; - out.push({ placeholder, path: dest }); - } catch (err) { - out.push({ placeholder: `[Attachment error: ${String(err)}]`, path: "" }); } - idx += 1; } - - return out; -} - -function formatMessageWithAttachments( - text: string, - saved: { placeholder: string }[], -): string { - if (!saved || saved.length === 0) return text; - const parts = [text, ...saved.map((s) => `\n\n${s.placeholder}`)]; - return parts.join(""); } async function handleRpc( body: unknown, - sessionKey: string, ): Promise<{ ok: boolean; payloads?: RpcPayload[]; error?: string }> { const payload = body as { text?: unknown; - attachments?: unknown; thinking?: unknown; deliver?: unknown; to?: unknown; + timeout?: unknown; }; const text: string = (payload.text ?? "").toString(); if (!text.trim()) return { ok: false, error: "empty text" }; - const attachments = Array.isArray(payload.attachments) - ? (payload.attachments as AttachmentInput[]) - : []; + if (!gateway || !gatewayReady) { + return { ok: false, error: "gateway unavailable" }; + } + const thinking = typeof payload.thinking === "string" ? payload.thinking : undefined; const to = typeof payload.to === "string" ? payload.to : undefined; const deliver = Boolean(payload.deliver); + const timeout = + typeof payload.timeout === "number" ? payload.timeout : undefined; - const cfg = loadConfig(); - const replyCfg = cfg.inbound?.reply; - if (!replyCfg || replyCfg.mode !== "command") { - return { ok: false, error: "agent command mode not configured" }; - } - - const storePath = replyCfg.session?.store - ? resolveStorePath(replyCfg.session.store) - : resolveStorePath(undefined); - const store = loadSessionStore(storePath); - const sessionId = pickSessionId(sessionKey, store) ?? crypto.randomUUID(); - - const logs: string[] = []; - const runtime: RuntimeEnv = { - log: (msg: string) => void logs.push(String(msg)), - error: (_msg: string) => {}, - exit: (code: number) => { - throw new Error(`agent exited ${code}`); - }, - }; - + const idempotencyKey = randomUUID(); try { - const savedAttachments = await persistAttachments(attachments, sessionId); - - await agentCommand( + // Send agent request; wait for final res (status ok/error) + const res = (await gateway.request( + "agent", { - message: formatMessageWithAttachments(text, savedAttachments), - sessionId, + message: text, thinking, deliver, to, - json: true, - surface: "webchat", + timeout, + idempotencyKey, }, - runtime, - ); + { expectFinal: true }, + )) as { status?: string; summary?: string }; + if (res?.status && res.status !== "ok") { + return { ok: false, error: res.summary || res.status }; + } + // The actual agent output is delivered via events; HTTP just returns ack. + return { ok: true, payloads: [] }; } catch (err) { return { ok: false, error: String(err) }; } - - // Push latest session state to any connected webchat clients for this sessionKey. - try { - const cfg = loadConfig(); - const sessionCfg = cfg.inbound?.reply?.session; - const storePath = sessionCfg?.store - ? resolveStorePath(sessionCfg.store) - : resolveStorePath(undefined); - const store = loadSessionStore(storePath); - const persistedSessionId = pickSessionId(sessionKey, store) ?? sessionId; - const messages = persistedSessionId - ? readSessionMessages(persistedSessionId, storePath) - : []; - const sessionEntry = sessionKey ? store[sessionKey] : undefined; - const persistedThinking = sessionEntry?.thinkingLevel; - broadcastSession(sessionKey, { - type: "session", - sessionKey, - messages, - thinkingLevel: - typeof persistedThinking === "string" - ? persistedThinking - : (cfg.inbound?.reply?.thinkingDefault ?? "off"), - }); - } catch { - // best-effort; ignore broadcast errors - } - - const jsonLine = logs.find((l) => l.trim().startsWith("{")); - if (!jsonLine) return { ok: false, error: "no agent output" }; - try { - const parsed = JSON.parse(jsonLine); - return { ok: true, payloads: parsed.payloads ?? [] }; - } catch (err) { - return { ok: false, error: `parse error: ${String(err)}` }; - } } function notFound(res: http.ServerResponse) { @@ -316,7 +192,11 @@ function notFound(res: http.ServerResponse) { res.end("Not Found"); } -export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) { +export async function startWebChatServer( + port = WEBCHAT_DEFAULT_PORT, + gatewayOverrideUrl?: string, + opts?: { disableGateway?: boolean }, +) { if (state) return state; const root = resolveWebRoot(); @@ -359,6 +239,9 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) { sessionId, initialMessages: messages, basePath: "/", + gatewayConnected: gatewayReady, + gatewaySnapshot: latestSnapshot, + gatewayPolicy: latestPolicy, }), ); return; @@ -372,9 +255,7 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) { } catch { // ignore } - const sessionKey = - typeof body.session === "string" ? body.session : "main"; - const result = await handleRpc(body, sessionKey); + const result = await handleRpc(body); res.setHeader("Content-Type", "application/json"); res.end(JSON.stringify(result)); return; @@ -434,6 +315,58 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) { ); }); + // Gateway connection (control/data plane) + const cfgObj = loadConfig() as Record; + if (!opts?.disableGateway) { + const cfgGatewayPort = + (cfgObj.webchat as { gatewayPort?: number } | undefined)?.gatewayPort ?? + 18789; + const gatewayUrl = gatewayOverrideUrl ?? `ws://127.0.0.1:${cfgGatewayPort}`; + const gatewayToken = + process.env.CLAWDIS_GATEWAY_TOKEN ?? + (cfgObj.gateway as { token?: string } | undefined)?.token; + gateway = new GatewayClient({ + url: gatewayUrl, + token: gatewayToken, + clientName: "webchat-backend", + clientVersion: + process.env.CLAWDIS_VERSION ?? process.env.npm_package_version ?? "dev", + platform: process.platform, + mode: "webchat", + instanceId: `webchat-${os.hostname()}`, + onHelloOk: (hello) => { + gatewayReady = true; + latestSnapshot = hello.snapshot as Record; + latestPolicy = hello.policy as Record; + broadcastAll({ type: "gateway-snapshot", snapshot: hello.snapshot, policy: hello.policy }); + }, + onEvent: (evt) => { + broadcastAll({ type: "gateway-event", event: evt.event, payload: evt.payload, seq: evt.seq, stateVersion: evt.stateVersion }); + }, + onClose: () => { + gatewayReady = false; + }, + onGap: async () => { + if (!gatewayReady || !gateway) return; + try { + const [health, presence] = await Promise.all([ + gateway.request("health"), + gateway.request("system-presence"), + ]); + latestSnapshot = { + ...latestSnapshot, + health, + presence, + } as Record; + broadcastAll({ type: "gateway-refresh", health, presence }); + } catch (err) { + logError(`webchat gap refresh failed: ${String(err)}`); + } + }, + }); + gateway.start(); + } + // WebSocket setup for live session updates. wss = new WebSocketServer({ noServer: true }); server.on("upgrade", (req, socket, head) => { @@ -443,10 +376,13 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) { socket.destroy(); return; } - if ( - req.socket.remoteAddress && - !req.socket.remoteAddress.startsWith("127.") - ) { + const addr = req.socket.remoteAddress ?? ""; + const isLocal = + addr.startsWith("127.") || + addr === "::1" || + addr.endsWith("127.0.0.1") || + addr.endsWith("::ffff:127.0.0.1"); + if (!isLocal) { socket.destroy(); return; } @@ -486,6 +422,15 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) { : (cfg.inbound?.reply?.thinkingDefault ?? "off"), }), ); + if (latestSnapshot) { + ws.send( + JSON.stringify({ + type: "gateway-snapshot", + snapshot: latestSnapshot, + policy: latestPolicy, + }), + ); + } }); } catch (_err) { socket.destroy(); @@ -531,6 +476,54 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) { return state; } +export async function stopWebChatServer() { + if (!state) return; + gatewayReady = false; + gateway?.stop(); + gateway = null; + if (wss) { + for (const client of wss.clients) { + try { + client.close(); + } catch { + /* ignore */ + } + } + await new Promise((resolve) => wss?.close(() => resolve())); + } + if (state.server) { + await new Promise((resolve) => state?.server.close(() => resolve())); + } + wss = null; + wsSessions.clear(); + state = null; +} + +export async function waitForWebChatGatewayReady(timeoutMs = 10000) { + const start = Date.now(); + while (!latestSnapshot) { + if (Date.now() - start > timeoutMs) { + throw new Error("webchat gateway not ready"); + } + await new Promise((resolve) => setTimeout(resolve, 50)); + } +} + +// Test-only helpers to seed/broadcast without a live Gateway connection. +export function __forceWebChatSnapshotForTests( + snapshot: Record, + policy?: Record, +) { + latestSnapshot = snapshot; + latestPolicy = policy ?? null; + gatewayReady = true; + broadcastAll({ type: "gateway-snapshot", snapshot: latestSnapshot, policy: latestPolicy }); +} + +export function __broadcastGatewayEventForTests(event: string, payload: unknown) { + broadcastAll({ type: "gateway-event", event, payload }); +} + export async function ensureWebChatServerFromConfig() { const cfg = loadConfig(); if (cfg.webchat?.enabled === false) return null;