WebChat: loopback snapshot hydration

This commit is contained in:
Peter Steinberger
2025-12-09 14:41:55 +01:00
parent b2e7fb01a9
commit 2746efeb25
3 changed files with 283 additions and 193 deletions

View File

@@ -1,13 +1,10 @@
import crypto from "node:crypto";
import fs from "node:fs";
import http from "node:http";
import os from "node:os";
import path from "node:path";
import { fileURLToPath } from "node:url";
import sharp from "sharp";
import { type WebSocket, WebSocketServer } from "ws";
import { agentCommand } from "../commands/agent.js";
import { loadConfig } from "../config/config.js";
import {
loadSessionStore,
@@ -15,7 +12,8 @@ import {
type SessionEntry,
} from "../config/sessions.js";
import { logDebug, logError } from "../logger.js";
import type { RuntimeEnv } from "../runtime.js";
import { GatewayClient } from "../gateway/client.js";
import { randomUUID } from "node:crypto";
const WEBCHAT_DEFAULT_PORT = 18788;
@@ -25,17 +23,15 @@ type WebChatServerState = {
};
type ChatMessage = { role: string; content: string };
type AttachmentInput = {
content?: string;
mimeType?: string;
fileName?: string;
type?: string;
};
type RpcPayload = { role: string; content: string };
let state: WebChatServerState | null = null;
let wss: WebSocketServer | null = null;
const wsSessions: Map<string, Set<WebSocket>> = new Map();
let gateway: GatewayClient | null = null;
let gatewayReady = false;
let latestSnapshot: Record<string, unknown> | null = null;
let latestPolicy: Record<string, unknown> | null = null;
function resolveWebRoot() {
const here = path.dirname(fileURLToPath(import.meta.url));
@@ -129,186 +125,66 @@ function broadcastSession(sessionKey: string, payload: unknown) {
}
}
async function persistAttachments(
attachments: AttachmentInput[],
sessionId: string,
): Promise<{ placeholder: string; path: string }[]> {
const out: { placeholder: string; path: string }[] = [];
if (!attachments?.length) return out;
const root = path.join(
os.homedir(),
".clawdis",
"webchat-uploads",
sessionId,
);
await fs.promises.mkdir(root, { recursive: true });
let idx = 1;
for (const att of attachments) {
try {
if (!att?.content || typeof att.content !== "string") continue;
const mime =
typeof att.mimeType === "string"
? att.mimeType
: "application/octet-stream";
const baseName = att.fileName || `${att.type || "attachment"}-${idx}`;
const ext = mime.startsWith("image/")
? mime.split("/")[1] || "bin"
: "bin";
const fileName = `${baseName}.${ext}`.replace(/[^a-zA-Z0-9._-]/g, "_");
const buf = Buffer.from(att.content, "base64");
let finalBuf: Buffer = buf;
let meta: { width?: number; height?: number } = {};
if (att.type === "image") {
const image = sharp(buf, { failOn: "none" });
meta = await image.metadata();
const needsResize =
(meta.width && meta.width > 2000) ||
(meta.height && meta.height > 2000);
if (needsResize) {
const resized = await image
.resize({ width: 2000, height: 2000, fit: "inside" })
.toBuffer({ resolveWithObject: true });
finalBuf = resized.data as Buffer;
meta = { width: resized.info.width, height: resized.info.height };
}
function broadcastAll(payload: unknown) {
const msg = JSON.stringify(payload);
for (const [, conns] of wsSessions) {
for (const ws of conns) {
try {
ws.send(msg);
} catch {
// ignore
}
if (finalBuf.length > 6 * 1024 * 1024) {
out.push({
placeholder: `[Attachment too large: ${baseName} (${(finalBuf.length / 1024 / 1024).toFixed(1)} MB)]`,
path: "",
});
idx += 1;
continue;
}
const dest = path.join(root, fileName);
await fs.promises.writeFile(dest, finalBuf);
const sizeLabel = `${(finalBuf.length / 1024).toFixed(0)} KB`;
const dimLabel =
meta?.width && meta?.height ? `, ${meta.width}x${meta.height}` : "";
const placeholder = `[Attachment saved: ${dest} (${mime}${dimLabel}, ${sizeLabel})]`;
out.push({ placeholder, path: dest });
} catch (err) {
out.push({ placeholder: `[Attachment error: ${String(err)}]`, path: "" });
}
idx += 1;
}
return out;
}
function formatMessageWithAttachments(
text: string,
saved: { placeholder: string }[],
): string {
if (!saved || saved.length === 0) return text;
const parts = [text, ...saved.map((s) => `\n\n${s.placeholder}`)];
return parts.join("");
}
async function handleRpc(
body: unknown,
sessionKey: string,
): Promise<{ ok: boolean; payloads?: RpcPayload[]; error?: string }> {
const payload = body as {
text?: unknown;
attachments?: unknown;
thinking?: unknown;
deliver?: unknown;
to?: unknown;
timeout?: unknown;
};
const text: string = (payload.text ?? "").toString();
if (!text.trim()) return { ok: false, error: "empty text" };
const attachments = Array.isArray(payload.attachments)
? (payload.attachments as AttachmentInput[])
: [];
if (!gateway || !gatewayReady) {
return { ok: false, error: "gateway unavailable" };
}
const thinking =
typeof payload.thinking === "string" ? payload.thinking : undefined;
const to = typeof payload.to === "string" ? payload.to : undefined;
const deliver = Boolean(payload.deliver);
const timeout =
typeof payload.timeout === "number" ? payload.timeout : undefined;
const cfg = loadConfig();
const replyCfg = cfg.inbound?.reply;
if (!replyCfg || replyCfg.mode !== "command") {
return { ok: false, error: "agent command mode not configured" };
}
const storePath = replyCfg.session?.store
? resolveStorePath(replyCfg.session.store)
: resolveStorePath(undefined);
const store = loadSessionStore(storePath);
const sessionId = pickSessionId(sessionKey, store) ?? crypto.randomUUID();
const logs: string[] = [];
const runtime: RuntimeEnv = {
log: (msg: string) => void logs.push(String(msg)),
error: (_msg: string) => {},
exit: (code: number) => {
throw new Error(`agent exited ${code}`);
},
};
const idempotencyKey = randomUUID();
try {
const savedAttachments = await persistAttachments(attachments, sessionId);
await agentCommand(
// Send agent request; wait for final res (status ok/error)
const res = (await gateway.request(
"agent",
{
message: formatMessageWithAttachments(text, savedAttachments),
sessionId,
message: text,
thinking,
deliver,
to,
json: true,
surface: "webchat",
timeout,
idempotencyKey,
},
runtime,
);
{ expectFinal: true },
)) as { status?: string; summary?: string };
if (res?.status && res.status !== "ok") {
return { ok: false, error: res.summary || res.status };
}
// The actual agent output is delivered via events; HTTP just returns ack.
return { ok: true, payloads: [] };
} catch (err) {
return { ok: false, error: String(err) };
}
// Push latest session state to any connected webchat clients for this sessionKey.
try {
const cfg = loadConfig();
const sessionCfg = cfg.inbound?.reply?.session;
const storePath = sessionCfg?.store
? resolveStorePath(sessionCfg.store)
: resolveStorePath(undefined);
const store = loadSessionStore(storePath);
const persistedSessionId = pickSessionId(sessionKey, store) ?? sessionId;
const messages = persistedSessionId
? readSessionMessages(persistedSessionId, storePath)
: [];
const sessionEntry = sessionKey ? store[sessionKey] : undefined;
const persistedThinking = sessionEntry?.thinkingLevel;
broadcastSession(sessionKey, {
type: "session",
sessionKey,
messages,
thinkingLevel:
typeof persistedThinking === "string"
? persistedThinking
: (cfg.inbound?.reply?.thinkingDefault ?? "off"),
});
} catch {
// best-effort; ignore broadcast errors
}
const jsonLine = logs.find((l) => l.trim().startsWith("{"));
if (!jsonLine) return { ok: false, error: "no agent output" };
try {
const parsed = JSON.parse(jsonLine);
return { ok: true, payloads: parsed.payloads ?? [] };
} catch (err) {
return { ok: false, error: `parse error: ${String(err)}` };
}
}
function notFound(res: http.ServerResponse) {
@@ -316,7 +192,11 @@ function notFound(res: http.ServerResponse) {
res.end("Not Found");
}
export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) {
export async function startWebChatServer(
port = WEBCHAT_DEFAULT_PORT,
gatewayOverrideUrl?: string,
opts?: { disableGateway?: boolean },
) {
if (state) return state;
const root = resolveWebRoot();
@@ -359,6 +239,9 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) {
sessionId,
initialMessages: messages,
basePath: "/",
gatewayConnected: gatewayReady,
gatewaySnapshot: latestSnapshot,
gatewayPolicy: latestPolicy,
}),
);
return;
@@ -372,9 +255,7 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) {
} catch {
// ignore
}
const sessionKey =
typeof body.session === "string" ? body.session : "main";
const result = await handleRpc(body, sessionKey);
const result = await handleRpc(body);
res.setHeader("Content-Type", "application/json");
res.end(JSON.stringify(result));
return;
@@ -434,6 +315,58 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) {
);
});
// Gateway connection (control/data plane)
const cfgObj = loadConfig() as Record<string, unknown>;
if (!opts?.disableGateway) {
const cfgGatewayPort =
(cfgObj.webchat as { gatewayPort?: number } | undefined)?.gatewayPort ??
18789;
const gatewayUrl = gatewayOverrideUrl ?? `ws://127.0.0.1:${cfgGatewayPort}`;
const gatewayToken =
process.env.CLAWDIS_GATEWAY_TOKEN ??
(cfgObj.gateway as { token?: string } | undefined)?.token;
gateway = new GatewayClient({
url: gatewayUrl,
token: gatewayToken,
clientName: "webchat-backend",
clientVersion:
process.env.CLAWDIS_VERSION ?? process.env.npm_package_version ?? "dev",
platform: process.platform,
mode: "webchat",
instanceId: `webchat-${os.hostname()}`,
onHelloOk: (hello) => {
gatewayReady = true;
latestSnapshot = hello.snapshot as Record<string, unknown>;
latestPolicy = hello.policy as Record<string, unknown>;
broadcastAll({ type: "gateway-snapshot", snapshot: hello.snapshot, policy: hello.policy });
},
onEvent: (evt) => {
broadcastAll({ type: "gateway-event", event: evt.event, payload: evt.payload, seq: evt.seq, stateVersion: evt.stateVersion });
},
onClose: () => {
gatewayReady = false;
},
onGap: async () => {
if (!gatewayReady || !gateway) return;
try {
const [health, presence] = await Promise.all([
gateway.request("health"),
gateway.request("system-presence"),
]);
latestSnapshot = {
...latestSnapshot,
health,
presence,
} as Record<string, unknown>;
broadcastAll({ type: "gateway-refresh", health, presence });
} catch (err) {
logError(`webchat gap refresh failed: ${String(err)}`);
}
},
});
gateway.start();
}
// WebSocket setup for live session updates.
wss = new WebSocketServer({ noServer: true });
server.on("upgrade", (req, socket, head) => {
@@ -443,10 +376,13 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) {
socket.destroy();
return;
}
if (
req.socket.remoteAddress &&
!req.socket.remoteAddress.startsWith("127.")
) {
const addr = req.socket.remoteAddress ?? "";
const isLocal =
addr.startsWith("127.") ||
addr === "::1" ||
addr.endsWith("127.0.0.1") ||
addr.endsWith("::ffff:127.0.0.1");
if (!isLocal) {
socket.destroy();
return;
}
@@ -486,6 +422,15 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) {
: (cfg.inbound?.reply?.thinkingDefault ?? "off"),
}),
);
if (latestSnapshot) {
ws.send(
JSON.stringify({
type: "gateway-snapshot",
snapshot: latestSnapshot,
policy: latestPolicy,
}),
);
}
});
} catch (_err) {
socket.destroy();
@@ -531,6 +476,54 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) {
return state;
}
export async function stopWebChatServer() {
if (!state) return;
gatewayReady = false;
gateway?.stop();
gateway = null;
if (wss) {
for (const client of wss.clients) {
try {
client.close();
} catch {
/* ignore */
}
}
await new Promise<void>((resolve) => wss?.close(() => resolve()));
}
if (state.server) {
await new Promise<void>((resolve) => state?.server.close(() => resolve()));
}
wss = null;
wsSessions.clear();
state = null;
}
export async function waitForWebChatGatewayReady(timeoutMs = 10000) {
const start = Date.now();
while (!latestSnapshot) {
if (Date.now() - start > timeoutMs) {
throw new Error("webchat gateway not ready");
}
await new Promise((resolve) => setTimeout(resolve, 50));
}
}
// Test-only helpers to seed/broadcast without a live Gateway connection.
export function __forceWebChatSnapshotForTests(
snapshot: Record<string, unknown>,
policy?: Record<string, unknown>,
) {
latestSnapshot = snapshot;
latestPolicy = policy ?? null;
gatewayReady = true;
broadcastAll({ type: "gateway-snapshot", snapshot: latestSnapshot, policy: latestPolicy });
}
export function __broadcastGatewayEventForTests(event: string, payload: unknown) {
broadcastAll({ type: "gateway-event", event, payload });
}
export async function ensureWebChatServerFromConfig() {
const cfg = loadConfig();
if (cfg.webchat?.enabled === false) return null;