Webchat: push updates over WebSocket

This commit is contained in:
Peter Steinberger
2025-12-08 16:19:25 +00:00
parent 421924b73f
commit 7144a0fb9b
4 changed files with 232 additions and 61 deletions

View File

@@ -198,42 +198,67 @@ const startChat = async () => {
mount.appendChild(panel);
logStatus("boot: ready");
// Periodically sync messages + thinking from the session store so webchat stays
// in lock-step with other transports (e.g., WhatsApp or CLI).
// Live sync via WebSocket so other transports (WhatsApp/CLI) appear instantly.
let lastSyncedTs = latestTimestamp(initialMessages);
const syncIntervalMs = 3000;
async function syncFromSession() {
if (agent.state.isStreaming) return; // avoid stomping in-flight turns
try {
const infoUrl = new URL(`./info?session=${encodeURIComponent(sessionKey)}`, window.location.href);
const resp = await fetch(infoUrl, { credentials: "omit" });
if (!resp.ok) return;
const info = await resp.json();
const messages = Array.isArray(info.initialMessages) ? info.initialMessages : [];
const ts = latestTimestamp(messages);
const thinking = typeof info.thinkingLevel === "string" ? info.thinkingLevel : "off";
let ws;
let reconnectTimer;
if (ts && ts !== lastSyncedTs) {
agent.replaceMessages(messages);
lastSyncedTs = ts;
}
const applySnapshot = (info) => {
const messages = Array.isArray(info?.messages) ? info.messages : [];
const ts = latestTimestamp(messages);
const thinking = typeof info?.thinkingLevel === "string" ? info.thinkingLevel : "off";
if (thinking && thinking !== agent.state.thinkingLevel) {
agent.setThinkingLevel(thinking);
if (panel?.agentInterface) {
panel.agentInterface.sessionThinkingLevel = thinking;
panel.agentInterface.pendingThinkingLevel = null;
if (panel.agentInterface._messageEditor) {
panel.agentInterface._messageEditor.thinkingLevel = thinking;
}
if (!agent.state.isStreaming && ts && ts !== lastSyncedTs) {
agent.replaceMessages(messages);
lastSyncedTs = ts;
}
if (thinking && thinking !== agent.state.thinkingLevel) {
agent.setThinkingLevel(thinking);
if (panel?.agentInterface) {
panel.agentInterface.sessionThinkingLevel = thinking;
panel.agentInterface.pendingThinkingLevel = null;
if (panel.agentInterface._messageEditor) {
panel.agentInterface._messageEditor.thinkingLevel = thinking;
}
}
} catch (err) {
console.warn("session sync failed", err);
}
}
};
setInterval(syncFromSession, syncIntervalMs);
const connectSocket = () => {
try {
const wsUrl = new URL(`./socket?session=${encodeURIComponent(sessionKey)}`, window.location.href);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
ws = new WebSocket(wsUrl);
ws.onmessage = (ev) => {
try {
const data = JSON.parse(ev.data);
if (data?.type === "session") applySnapshot(data);
} catch (err) {
console.warn("ws message parse failed", err);
}
};
ws.onclose = () => {
ws = null;
if (!reconnectTimer) {
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
connectSocket();
}, 2000);
}
};
ws.onerror = () => {
ws?.close();
};
} catch (err) {
console.warn("ws connect failed", err);
}
};
connectSocket();
};
startChat().catch((err) => {

View File

@@ -196455,36 +196455,57 @@ const startChat = async () => {
mount.appendChild(panel);
logStatus("boot: ready");
let lastSyncedTs = latestTimestamp(initialMessages);
const syncIntervalMs = 3e3;
async function syncFromSession() {
if (agent.state.isStreaming) return;
try {
const infoUrl = new URL(`./info?session=${encodeURIComponent(sessionKey)}`, window.location.href);
const resp = await fetch(infoUrl, { credentials: "omit" });
if (!resp.ok) return;
const info$1 = await resp.json();
const messages = Array.isArray(info$1.initialMessages) ? info$1.initialMessages : [];
const ts = latestTimestamp(messages);
const thinking = typeof info$1.thinkingLevel === "string" ? info$1.thinkingLevel : "off";
if (ts && ts !== lastSyncedTs) {
agent.replaceMessages(messages);
lastSyncedTs = ts;
}
if (thinking && thinking !== agent.state.thinkingLevel) {
agent.setThinkingLevel(thinking);
if (panel?.agentInterface) {
panel.agentInterface.sessionThinkingLevel = thinking;
panel.agentInterface.pendingThinkingLevel = null;
if (panel.agentInterface._messageEditor) {
panel.agentInterface._messageEditor.thinkingLevel = thinking;
}
let ws;
let reconnectTimer;
const applySnapshot = (info$1) => {
const messages = Array.isArray(info$1?.messages) ? info$1.messages : [];
const ts = latestTimestamp(messages);
const thinking = typeof info$1?.thinkingLevel === "string" ? info$1.thinkingLevel : "off";
if (!agent.state.isStreaming && ts && ts !== lastSyncedTs) {
agent.replaceMessages(messages);
lastSyncedTs = ts;
}
if (thinking && thinking !== agent.state.thinkingLevel) {
agent.setThinkingLevel(thinking);
if (panel?.agentInterface) {
panel.agentInterface.sessionThinkingLevel = thinking;
panel.agentInterface.pendingThinkingLevel = null;
if (panel.agentInterface._messageEditor) {
panel.agentInterface._messageEditor.thinkingLevel = thinking;
}
}
} catch (err) {
console.warn("session sync failed", err);
}
}
setInterval(syncFromSession, syncIntervalMs);
};
const connectSocket = () => {
try {
const wsUrl = new URL(`./socket?session=${encodeURIComponent(sessionKey)}`, window.location.href);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
ws = new WebSocket(wsUrl);
ws.onmessage = (ev) => {
try {
const data = JSON.parse(ev.data);
if (data?.type === "session") applySnapshot(data);
} catch (err) {
console.warn("ws message parse failed", err);
}
};
ws.onclose = () => {
ws = null;
if (!reconnectTimer) {
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
connectSocket();
}, 2e3);
}
};
ws.onerror = () => {
ws?.close();
};
} catch (err) {
console.warn("ws connect failed", err);
}
};
connectSocket();
};
startChat().catch((err) => {
const msg = err?.stack || err?.message || String(err);

View File

@@ -45,6 +45,7 @@
"sharp": "^0.34.5",
"tslog": "^4.10.2",
"undici": "^7.16.0",
"ws": "^8.18.0",
"zod": "^4.1.13"
},
"devDependencies": {

View File

@@ -5,6 +5,7 @@ import os from "node:os";
import path from "node:path";
import { fileURLToPath } from "node:url";
import sharp from "sharp";
import { WebSocketServer, WebSocket } from "ws";
import { agentCommand } from "../commands/agent.js";
import { loadConfig } from "../config/config.js";
@@ -33,6 +34,8 @@ type AttachmentInput = {
type RpcPayload = { role: string; content: string };
let state: WebChatServerState | null = null;
let wss: WebSocketServer | null = null;
const wsSessions: Map<string, Set<WebSocket>> = new Map();
function resolveWebRoot() {
const here = path.dirname(fileURLToPath(import.meta.url));
@@ -93,6 +96,19 @@ function readSessionMessages(sessionId: string, storePath: string): ChatMessage[
return messages;
}
function broadcastSession(sessionKey: string, payload: any) {
const conns = wsSessions.get(sessionKey);
if (!conns || conns.size === 0) return;
const msg = JSON.stringify(payload);
for (const ws of conns) {
try {
ws.send(msg);
} catch {
// ignore and let close handler prune
}
}
}
async function persistAttachments(
attachments: AttachmentInput[],
sessionId: string,
@@ -225,6 +241,33 @@ async function handleRpc(
return { ok: false, error: String(err) };
}
// Push latest session state to any connected webchat clients for this sessionKey.
try {
const cfg = loadConfig();
const sessionCfg = cfg.inbound?.reply?.session;
const storePath = sessionCfg?.store
? resolveStorePath(sessionCfg.store)
: resolveStorePath(undefined);
const store = loadSessionStore(storePath);
const persistedSessionId = pickSessionId(sessionKey, store) ?? sessionId;
const messages = persistedSessionId
? readSessionMessages(persistedSessionId, storePath)
: [];
const sessionEntry = sessionKey ? store[sessionKey] : undefined;
const persistedThinking = sessionEntry?.thinkingLevel;
broadcastSession(sessionKey, {
type: "session",
sessionKey,
messages,
thinkingLevel:
typeof persistedThinking === "string"
? persistedThinking
: cfg.inbound?.reply?.thinkingDefault ?? "off",
});
} catch {
// best-effort; ignore broadcast errors
}
const jsonLine = logs.find((l) => l.trim().startsWith("{"));
if (!jsonLine) return { ok: false, error: "no agent output" };
try {
@@ -244,6 +287,14 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) {
if (state) return state;
const root = resolveWebRoot();
// Precompute session store root for file watching
const cfg = loadConfig();
const sessionCfg = cfg.inbound?.reply?.session;
const storePath = sessionCfg?.store
? resolveStorePath(sessionCfg.store)
: resolveStorePath(undefined);
const storeDir = path.dirname(storePath);
const server = http.createServer(async (req, res) => {
if (!req.url) return notFound(res);
if (req.socket.remoteAddress && !req.socket.remoteAddress.startsWith("127.")) {
@@ -258,11 +309,6 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) {
if (isInfo) {
const sessionKey = url.searchParams.get("session") ?? "main";
const cfg = loadConfig();
const sessionCfg = cfg.inbound?.reply?.session;
const storePath = sessionCfg?.store
? resolveStorePath(sessionCfg.store)
: resolveStorePath(undefined);
const store = loadSessionStore(storePath);
const sessionId = pickSessionId(sessionKey, store);
const messages = sessionId
@@ -345,6 +391,84 @@ export async function startWebChatServer(port = WEBCHAT_DEFAULT_PORT) {
server.listen(port, "127.0.0.1", () => resolve());
});
// WebSocket setup for live session updates.
wss = new WebSocketServer({ noServer: true });
server.on("upgrade", (req, socket, head) => {
try {
const url = new URL(req.url ?? "", "http://127.0.0.1");
if (url.pathname !== "/webchat/socket" && url.pathname !== "/socket") {
socket.destroy();
return;
}
if (req.socket.remoteAddress && !req.socket.remoteAddress.startsWith("127.")) {
socket.destroy();
return;
}
const sessionKey = url.searchParams.get("session") ?? "main";
wss!.handleUpgrade(req, socket, head, (ws) => {
ws.on("close", () => {
const set = wsSessions.get(sessionKey);
if (set) {
set.delete(ws);
if (set.size === 0) wsSessions.delete(sessionKey);
}
});
wsSessions.set(sessionKey, (wsSessions.get(sessionKey) ?? new Set()).add(ws));
// Send initial snapshot
const store = loadSessionStore(storePath);
const sessionId = pickSessionId(sessionKey, store);
const sessionEntry = sessionKey ? store[sessionKey] : undefined;
const persistedThinking = sessionEntry?.thinkingLevel;
const messages = sessionId ? readSessionMessages(sessionId, storePath) : [];
ws.send(
JSON.stringify({
type: "session",
sessionKey,
messages,
thinkingLevel:
typeof persistedThinking === "string"
? persistedThinking
: cfg.inbound?.reply?.thinkingDefault ?? "off",
}),
);
});
} catch (err) {
socket.destroy();
}
});
// Watch for session/message file changes and push updates.
try {
if (fs.existsSync(storeDir)) {
fs.watch(storeDir, { persistent: false }, (event, filename) => {
if (!filename) return;
// On any file change, refresh for active sessions.
for (const sessionKey of wsSessions.keys()) {
try {
const store = loadSessionStore(storePath);
const sessionId = pickSessionId(sessionKey, store);
const sessionEntry = sessionKey ? store[sessionKey] : undefined;
const persistedThinking = sessionEntry?.thinkingLevel;
const messages = sessionId ? readSessionMessages(sessionId, storePath) : [];
broadcastSession(sessionKey, {
type: "session",
sessionKey,
messages,
thinkingLevel:
typeof persistedThinking === "string"
? persistedThinking
: cfg.inbound?.reply?.thinkingDefault ?? "off",
});
} catch {
// ignore
}
}
});
}
} catch {
// watcher is best-effort
}
state = { server, port };
logDebug(`webchat server listening on 127.0.0.1:${port}`);
return state;