feat: add ws chat attachments

This commit is contained in:
Peter Steinberger
2025-12-09 23:16:57 +01:00
parent e80e5b0801
commit 1dd5c97ae0
25 changed files with 987 additions and 882 deletions

View File

@@ -13,7 +13,9 @@ describe("formatAgentEnvelope", () => {
timestamp: ts,
body: "hello",
});
expect(body).toBe("[WebChat user1 mac-mini 10.0.0.5 2025-01-02 03:04] hello");
expect(body).toBe(
"[WebChat user1 mac-mini 10.0.0.5 2025-01-02 03:04] hello",
);
});
it("handles missing optional fields", () => {

View File

@@ -3,7 +3,7 @@ import crypto from "node:crypto";
import { lookupContextTokens } from "../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL } from "../agents/defaults.js";
import { resolveBundledPiBinary } from "../agents/pi-path.js";
import { loadConfig, type ClawdisConfig } from "../config/config.js";
import { type ClawdisConfig, loadConfig } from "../config/config.js";
import {
DEFAULT_IDLE_MINUTES,
DEFAULT_RESET_TRIGGER,

View File

@@ -14,12 +14,7 @@ import { defaultRuntime } from "../runtime.js";
import { VERSION } from "../version.js";
import { startWebChatServer } from "../webchat/server.js";
import { createDefaultDeps } from "./deps.js";
import {
forceFreePort,
listPortListeners,
PortProcess,
parseLsofOutput,
} from "./ports.js";
import { forceFreePort, listPortListeners } from "./ports.js";
export function buildProgram() {
const program = new Command();
@@ -217,7 +212,7 @@ Examples:
}
});
program
program;
const gateway = program
.command("gateway")
.description("Run the WebSocket Gateway")

View File

@@ -12,7 +12,7 @@ import {
type VerboseLevel,
} from "../auto-reply/thinking.js";
import { type CliDeps, createDefaultDeps } from "../cli/deps.js";
import { loadConfig, type ClawdisConfig } from "../config/config.js";
import { type ClawdisConfig, loadConfig } from "../config/config.js";
import {
DEFAULT_IDLE_MINUTES,
loadSessionStore,

View File

@@ -1,6 +1,3 @@
import fs from "node:fs";
import path from "node:path";
import { loadConfig } from "../config/config.js";
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
import { info } from "../globals.js";

View File

@@ -1,9 +1,9 @@
import type { CliDeps } from "../cli/deps.js";
import { listPortListeners } from "../cli/ports.js";
import { info, success } from "../globals.js";
import type { RuntimeEnv } from "../runtime.js";
import { callGateway, randomIdempotencyKey } from "../gateway/call.js";
import { startGatewayServer } from "../gateway/server.js";
import { success } from "../globals.js";
import type { RuntimeEnv } from "../runtime.js";
export async function sendCommand(
opts: {
@@ -91,7 +91,9 @@ export async function sendCommand(
}
runtime.log(
success(`✅ Sent via gateway. Message ID: ${result.messageId ?? "unknown"}`),
success(
`✅ Sent via gateway. Message ID: ${result.messageId ?? "unknown"}`,
),
);
if (opts.json) {
runtime.log(

View File

@@ -0,0 +1,51 @@
export type ChatAttachment = {
type?: string;
mimeType?: string;
fileName?: string;
content?: unknown;
};
export function buildMessageWithAttachments(
message: string,
attachments: ChatAttachment[] | undefined,
opts?: { maxBytes?: number },
): string {
const maxBytes = opts?.maxBytes ?? 2_000_000; // 2 MB
if (!attachments || attachments.length === 0) return message;
const blocks: string[] = [];
for (const [idx, att] of attachments.entries()) {
if (!att) continue;
const mime = att.mimeType ?? "";
const content = att.content;
const label = att.fileName || att.type || `attachment-${idx + 1}`;
if (typeof content !== "string") {
throw new Error(`attachment ${label}: content must be base64 string`);
}
if (!mime.startsWith("image/")) {
throw new Error(`attachment ${label}: only image/* supported`);
}
let sizeBytes = 0;
try {
sizeBytes = Buffer.from(content, "base64").byteLength;
} catch {
throw new Error(`attachment ${label}: invalid base64 content`);
}
if (sizeBytes <= 0 || sizeBytes > maxBytes) {
throw new Error(
`attachment ${label}: exceeds size limit (${sizeBytes} > ${maxBytes} bytes)`,
);
}
const safeLabel = label.replace(/\s+/g, "_");
const dataUrl = `![${safeLabel}](data:${mime};base64,${content})`;
blocks.push(dataUrl);
}
if (blocks.length === 0) return message;
const separator = message.trim().length > 0 ? "\n\n" : "";
return `${message}${separator}${blocks.join("\n\n")}`;
}

View File

@@ -3,6 +3,10 @@ import {
type AgentEvent,
AgentEventSchema,
AgentParamsSchema,
type ChatEvent,
ChatEventSchema,
ChatHistoryParamsSchema,
ChatSendParamsSchema,
ErrorCodes,
type ErrorShape,
ErrorShapeSchema,
@@ -51,6 +55,9 @@ export const validateRequestFrame =
ajv.compile<RequestFrame>(RequestFrameSchema);
export const validateSendParams = ajv.compile(SendParamsSchema);
export const validateAgentParams = ajv.compile(AgentParamsSchema);
export const validateChatHistoryParams = ajv.compile(ChatHistoryParamsSchema);
export const validateChatSendParams = ajv.compile(ChatSendParamsSchema);
export const validateChatEvent = ajv.compile(ChatEventSchema);
export function formatValidationErrors(
errors: ErrorObject[] | null | undefined,
@@ -72,8 +79,11 @@ export {
ErrorShapeSchema,
StateVersionSchema,
AgentEventSchema,
ChatEventSchema,
SendParamsSchema,
AgentParamsSchema,
ChatHistoryParamsSchema,
ChatSendParamsSchema,
TickEventSchema,
ShutdownEventSchema,
ProtocolSchemas,
@@ -95,6 +105,7 @@ export type {
ErrorShape,
StateVersion,
AgentEvent,
ChatEvent,
TickEvent,
ShutdownEvent,
};

View File

@@ -219,6 +219,45 @@ export const AgentParamsSchema = Type.Object(
{ additionalProperties: false },
);
// WebChat/WebSocket-native chat methods
export const ChatHistoryParamsSchema = Type.Object(
{
sessionKey: NonEmptyString,
},
{ additionalProperties: false },
);
export const ChatSendParamsSchema = Type.Object(
{
sessionKey: NonEmptyString,
message: NonEmptyString,
thinking: Type.Optional(Type.String()),
deliver: Type.Optional(Type.Boolean()),
attachments: Type.Optional(Type.Array(Type.Unknown())),
timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })),
idempotencyKey: NonEmptyString,
},
{ additionalProperties: false },
);
export const ChatEventSchema = Type.Object(
{
runId: NonEmptyString,
sessionKey: NonEmptyString,
seq: Type.Integer({ minimum: 0 }),
state: Type.Union([
Type.Literal("delta"),
Type.Literal("final"),
Type.Literal("error"),
]),
message: Type.Optional(Type.Unknown()),
errorMessage: Type.Optional(Type.String()),
usage: Type.Optional(Type.Unknown()),
stopReason: Type.Optional(Type.String()),
},
{ additionalProperties: false },
);
export const ProtocolSchemas: Record<string, TSchema> = {
Hello: HelloSchema,
HelloOk: HelloOkSchema,
@@ -234,6 +273,9 @@ export const ProtocolSchemas: Record<string, TSchema> = {
AgentEvent: AgentEventSchema,
SendParams: SendParamsSchema,
AgentParams: AgentParamsSchema,
ChatHistoryParams: ChatHistoryParamsSchema,
ChatSendParams: ChatSendParamsSchema,
ChatEvent: ChatEventSchema,
TickEvent: TickEventSchema,
ShutdownEvent: ShutdownEventSchema,
};
@@ -252,6 +294,7 @@ export type PresenceEntry = Static<typeof PresenceEntrySchema>;
export type ErrorShape = Static<typeof ErrorShapeSchema>;
export type StateVersion = Static<typeof StateVersionSchema>;
export type AgentEvent = Static<typeof AgentEventSchema>;
export type ChatEvent = Static<typeof ChatEventSchema>;
export type TickEvent = Static<typeof TickEventSchema>;
export type ShutdownEvent = Static<typeof ShutdownEventSchema>;

View File

@@ -544,6 +544,54 @@ describe("gateway server", () => {
await server.close();
});
test("chat.send accepts image attachment", { timeout: 12000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const reqId = "chat-img";
ws.send(
JSON.stringify({
type: "req",
id: reqId,
method: "chat.send",
params: {
sessionKey: "main",
message: "see image",
idempotencyKey: "idem-img",
attachments: [
{
type: "image",
mimeType: "image/png",
fileName: "dot.png",
content:
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/woAAn8B9FD5fHAAAAAASUVORK5CYII=",
},
],
},
}),
);
const res = await onceMessage(
ws,
(o) => o.type === "res" && o.id === reqId,
8000,
);
expect(res.ok).toBe(true);
expect(res.payload?.runId).toBeDefined();
ws.close();
await server.close();
});
test("presence includes client fingerprint", async () => {
const { server, ws } = await startServerWithClient();
ws.send(

View File

@@ -1,15 +1,23 @@
import chalk from "chalk";
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import chalk from "chalk";
import { type WebSocket, WebSocketServer } from "ws";
import { GatewayLockError, acquireGatewayLock } from "../infra/gateway-lock.js";
import { createDefaultDeps } from "../cli/deps.js";
import { agentCommand } from "../commands/agent.js";
import { getHealthSnapshot } from "../commands/health.js";
import { getStatusSummary } from "../commands/status.js";
import { loadConfig } from "../config/config.js";
import {
loadSessionStore,
resolveStorePath,
type SessionEntry,
saveSessionStore,
} from "../config/sessions.js";
import { isVerbose } from "../globals.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { acquireGatewayLock, GatewayLockError } from "../infra/gateway-lock.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import {
listSystemPresence,
@@ -23,6 +31,7 @@ import { monitorTelegramProvider } from "../telegram/monitor.js";
import { sendMessageTelegram } from "../telegram/send.js";
import { sendMessageWhatsApp } from "../web/outbound.js";
import { ensureWebChatServerFromConfig } from "../webchat/server.js";
import { buildMessageWithAttachments } from "./chat-attachments.js";
import {
ErrorCodes,
type ErrorShape,
@@ -33,6 +42,8 @@ import {
type RequestFrame,
type Snapshot,
validateAgentParams,
validateChatHistoryParams,
validateChatSendParams,
validateHello,
validateRequestFrame,
validateSendParams,
@@ -51,9 +62,12 @@ const METHODS = [
"system-event",
"send",
"agent",
// WebChat WebSocket-native chat methods
"chat.history",
"chat.send",
];
const EVENTS = ["agent", "presence", "tick", "shutdown"];
const EVENTS = ["agent", "chat", "presence", "tick", "shutdown"];
export type GatewayServer = {
close: () => Promise<void>;
@@ -93,6 +107,9 @@ type DedupeEntry = {
error?: ErrorShape;
};
const dedupe = new Map<string, DedupeEntry>();
// Map runId -> sessionKey for chat events (WS WebChat clients).
const chatRunSessions = new Map<string, string>();
const chatRunBuffers = new Map<string, string[]>();
const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN;
@@ -103,12 +120,73 @@ function formatForLog(value: unknown): string {
? String(value)
: JSON.stringify(value);
if (!str) return "";
return str.length > LOG_VALUE_LIMIT ? `${str.slice(0, LOG_VALUE_LIMIT)}...` : str;
return str.length > LOG_VALUE_LIMIT
? `${str.slice(0, LOG_VALUE_LIMIT)}...`
: str;
} catch {
return String(value);
}
}
function readSessionMessages(
sessionId: string,
storePath: string | undefined,
): unknown[] {
const candidates: string[] = [];
if (storePath) {
const dir = path.dirname(storePath);
candidates.push(path.join(dir, `${sessionId}.jsonl`));
}
candidates.push(
path.join(os.homedir(), ".clawdis", "sessions", `${sessionId}.jsonl`),
);
candidates.push(
path.join(os.homedir(), ".pi", "agent", "sessions", `${sessionId}.jsonl`),
);
candidates.push(
path.join(
os.homedir(),
".tau",
"agent",
"sessions",
"clawdis",
`${sessionId}.jsonl`,
),
);
const filePath = candidates.find((p) => fs.existsSync(p));
if (!filePath) return [];
const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/);
const messages: unknown[] = [];
for (const line of lines) {
if (!line.trim()) continue;
try {
const parsed = JSON.parse(line);
// pi/tau logs either raw message or wrapper { message }
if (parsed?.message) {
messages.push(parsed.message);
} else if (parsed?.role && parsed?.content) {
messages.push(parsed);
}
} catch {
// ignore bad lines
}
}
return messages;
}
function loadSessionEntry(sessionKey: string) {
const cfg = loadConfig();
const sessionCfg = cfg.inbound?.reply?.session;
const storePath = sessionCfg?.store
? resolveStorePath(sessionCfg.store)
: resolveStorePath(undefined);
const store = loadSessionStore(storePath);
const entry = store[sessionKey];
return { cfg, storePath, store, entry };
}
function logWs(
direction: "in" | "out",
kind: string,
@@ -134,7 +212,9 @@ function logWs(
coloredMeta.push(`${chalk.dim(key)}=${formatForLog(value)}`);
}
}
const line = coloredMeta.length ? `${prefix} ${coloredMeta.join(" ")}` : prefix;
const line = coloredMeta.length
? `${prefix} ${coloredMeta.join(" ")}`
: prefix;
console.log(line);
}
@@ -143,7 +223,8 @@ function formatError(err: unknown): string {
if (typeof err === "string") return err;
const status = (err as { status?: unknown })?.status;
const code = (err as { code?: unknown })?.code;
if (status || code) return `status=${status ?? "unknown"} code=${code ?? "unknown"}`;
if (status || code)
return `status=${status ?? "unknown"} code=${code ?? "unknown"}`;
return JSON.stringify(err, null, 2);
}
@@ -287,6 +368,48 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
}
agentRunSeq.set(evt.runId, evt.seq);
broadcast("agent", evt);
const sessionKey = chatRunSessions.get(evt.runId);
if (sessionKey) {
// Map agent bus events to chat events for WS WebChat clients.
const base = {
runId: evt.runId,
sessionKey,
seq: evt.seq,
};
if (evt.stream === "assistant" && typeof evt.data?.text === "string") {
const buf = chatRunBuffers.get(evt.runId) ?? [];
buf.push(evt.data.text);
chatRunBuffers.set(evt.runId, buf);
} else if (
evt.stream === "job" &&
typeof evt.data?.state === "string" &&
(evt.data.state === "done" || evt.data.state === "error")
) {
const text = chatRunBuffers.get(evt.runId)?.join("\n").trim() ?? "";
chatRunBuffers.delete(evt.runId);
if (evt.data.state === "done") {
broadcast("chat", {
...base,
state: "final",
message: text
? {
role: "assistant",
content: [{ type: "text", text }],
timestamp: Date.now(),
}
: undefined,
});
} else {
broadcast("chat", {
...base,
state: "error",
errorMessage: evt.data.error ? String(evt.data.error) : undefined,
});
}
chatRunSessions.delete(evt.runId);
}
}
});
wss.on("connection", (socket) => {
@@ -500,6 +623,163 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
respond(true, health, undefined);
break;
}
case "chat.history": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateChatHistoryParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid chat.history params: ${formatValidationErrors(validateChatHistoryParams.errors)}`,
),
);
break;
}
const { sessionKey } = params as { sessionKey: string };
const { storePath, entry } = loadSessionEntry(sessionKey);
const sessionId = entry?.sessionId;
const messages =
sessionId && storePath
? readSessionMessages(sessionId, storePath)
: [];
const thinkingLevel =
entry?.thinkingLevel ??
loadConfig().inbound?.reply?.thinkingDefault ??
"off";
respond(true, { sessionKey, sessionId, messages, thinkingLevel });
break;
}
case "chat.send": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateChatSendParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid chat.send params: ${formatValidationErrors(validateChatSendParams.errors)}`,
),
);
break;
}
const p = params as {
sessionKey: string;
message: string;
thinking?: string;
deliver?: boolean;
attachments?: Array<{
type?: string;
mimeType?: string;
fileName?: string;
content?: unknown;
}>;
timeoutMs?: number;
idempotencyKey: string;
};
const timeoutMs = Math.min(
Math.max(p.timeoutMs ?? 30_000, 0),
30_000,
);
const normalizedAttachments =
p.attachments?.map((a) => ({
type: typeof a?.type === "string" ? a.type : undefined,
mimeType:
typeof a?.mimeType === "string" ? a.mimeType : undefined,
fileName:
typeof a?.fileName === "string" ? a.fileName : undefined,
content:
typeof a?.content === "string"
? a.content
: ArrayBuffer.isView(a?.content)
? Buffer.from(a.content as ArrayBufferLike).toString(
"base64",
)
: undefined,
})) ?? [];
let messageWithAttachments = p.message;
if (normalizedAttachments.length > 0) {
try {
messageWithAttachments = buildMessageWithAttachments(
p.message,
normalizedAttachments,
{ maxBytes: 5_000_000 },
);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, String(err)),
);
break;
}
}
const { storePath, store, entry } = loadSessionEntry(p.sessionKey);
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: now,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
systemSent: entry?.systemSent,
};
if (store) {
store[p.sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, store);
}
}
chatRunSessions.set(sessionId, p.sessionKey);
const idem = p.idempotencyKey;
const cached = dedupe.get(`chat:${idem}`);
if (cached) {
respond(cached.ok, cached.payload, cached.error, {
cached: true,
});
break;
}
try {
await agentCommand(
{
message: messageWithAttachments,
sessionId,
thinking: p.thinking,
deliver: p.deliver,
timeout: Math.ceil(timeoutMs / 1000).toString(),
surface: "WebChat",
},
defaultRuntime,
deps,
);
const payload = {
runId: sessionId,
status: "ok" as const,
};
dedupe.set(`chat:${idem}`, { ts: Date.now(), ok: true, payload });
respond(true, payload, undefined, { runId: sessionId });
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
const payload = {
runId: sessionId,
status: "error" as const,
summary: String(err),
};
dedupe.set(`chat:${idem}`, {
ts: Date.now(),
ok: false,
payload,
error,
});
respond(false, payload, error, {
runId: sessionId,
error: formatForLog(err),
});
}
break;
}
case "status": {
const status = await getStatusSummary();
respond(true, status, undefined);
@@ -640,7 +920,9 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
const idem = params.idempotencyKey;
const cached = dedupe.get(`agent:${idem}`);
if (cached) {
respond(cached.ok, cached.payload, cached.error, { cached: true });
respond(cached.ok, cached.payload, cached.error, {
cached: true,
});
break;
}
const message = params.message.trim();
@@ -773,6 +1055,8 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
/* ignore */
}
}
chatRunSessions.clear();
chatRunBuffers.clear();
for (const c of clients) {
try {
c.socket.close(1012, "service restart");

View File

@@ -1,5 +1,5 @@
import chalk from "chalk";
import { loadConfig, type ClawdisConfig } from "../config/config.js";
import { type ClawdisConfig, loadConfig } from "../config/config.js";
import { normalizeE164 } from "../utils.js";
import {
getWebAuthAgeMs,

View File

@@ -3,9 +3,7 @@ import { spawn } from "node:child_process";
const DEFAULT_LAUNCHD_LABEL = "com.steipete.clawdis";
export function triggerClawdisRestart(): void {
const label =
process.env.CLAWDIS_LAUNCHD_LABEL ||
DEFAULT_LAUNCHD_LABEL;
const label = process.env.CLAWDIS_LAUNCHD_LABEL || DEFAULT_LAUNCHD_LABEL;
const uid =
typeof process.getuid === "function" ? process.getuid() : undefined;
const target = uid !== undefined ? `gui/${uid}/${label}` : label;

View File

@@ -3,7 +3,7 @@ import path from "node:path";
import util from "node:util";
import { Logger as TsLogger } from "tslog";
import { loadConfig, type ClawdisConfig } from "./config/config.js";
import { type ClawdisConfig, loadConfig } from "./config/config.js";
import { isVerbose } from "./globals.js";
// Pin to /tmp so mac Debug UI and docs match; os.tmpdir() can be a per-user

View File

@@ -45,7 +45,8 @@ export async function sendMessageTelegram(
const api = opts.api ?? bot?.api;
const mediaUrl = opts.mediaUrl?.trim();
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
const sleep = (ms: number) =>
new Promise((resolve) => setTimeout(resolve, ms));
const sendWithRetry = async <T>(fn: () => Promise<T>, label: string) => {
let lastErr: unknown;
for (let attempt = 1; attempt <= 3; attempt++) {
@@ -53,12 +54,17 @@ export async function sendMessageTelegram(
return await fn();
} catch (err) {
lastErr = err;
const terminal = attempt === 3 ||
!/429|timeout|connect|reset|closed|unavailable|temporarily/i.test(String(err ?? ""));
const terminal =
attempt === 3 ||
!/429|timeout|connect|reset|closed|unavailable|temporarily/i.test(
String(err ?? ""),
);
if (terminal) break;
const backoff = 400 * attempt;
if (opts.verbose) {
console.warn(`telegram send retry ${attempt}/2 for ${label} in ${backoff}ms: ${String(err)}`);
console.warn(
`telegram send retry ${attempt}/2 for ${label} in ${backoff}ms: ${String(err)}`,
);
}
await sleep(backoff);
}
@@ -80,13 +86,25 @@ export async function sendMessageTelegram(
| Awaited<ReturnType<typeof api.sendAudio>>
| Awaited<ReturnType<typeof api.sendDocument>>;
if (kind === "image") {
result = await sendWithRetry(() => api.sendPhoto(chatId, file, { caption }), "photo");
result = await sendWithRetry(
() => api.sendPhoto(chatId, file, { caption }),
"photo",
);
} else if (kind === "video") {
result = await sendWithRetry(() => api.sendVideo(chatId, file, { caption }), "video");
result = await sendWithRetry(
() => api.sendVideo(chatId, file, { caption }),
"video",
);
} else if (kind === "audio") {
result = await sendWithRetry(() => api.sendAudio(chatId, file, { caption }), "audio");
result = await sendWithRetry(
() => api.sendAudio(chatId, file, { caption }),
"audio",
);
} else {
result = await sendWithRetry(() => api.sendDocument(chatId, file, { caption }), "document");
result = await sendWithRetry(
() => api.sendDocument(chatId, file, { caption }),
"document",
);
}
const messageId = String(result?.message_id ?? "unknown");
return { messageId, chatId: String(result?.chat?.id ?? chatId) };

View File

@@ -680,8 +680,12 @@ describe("web auto-reply", () => {
expect(resolver).toHaveBeenCalledTimes(1);
const args = resolver.mock.calls[0][0];
expect(args.Body).toContain("[WhatsApp +1 2025-01-01 00:00] [clawdis] first");
expect(args.Body).toContain("[WhatsApp +1 2025-01-01 01:00] [clawdis] second");
expect(args.Body).toContain(
"[WhatsApp +1 2025-01-01 00:00] [clawdis] first",
);
expect(args.Body).toContain(
"[WhatsApp +1 2025-01-01 01:00] [clawdis] second",
);
// Max listeners bumped to avoid warnings in multi-instance test runs
expect(process.getMaxListeners?.()).toBeGreaterThanOrEqual(50);

View File

@@ -1,4 +1,5 @@
import { chunkText } from "../auto-reply/chunk.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import { waitForever } from "../cli/wait.js";
@@ -10,7 +11,7 @@ import {
resolveStorePath,
saveSessionStore,
} from "../config/sessions.js";
import { danger, info, isVerbose, logVerbose, success } from "../globals.js";
import { danger, isVerbose, logVerbose, success } from "../globals.js";
import { emitHeartbeatEvent } from "../infra/heartbeat-events.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { logInfo } from "../logger.js";
@@ -18,10 +19,10 @@ import { getChildLogger } from "../logging.js";
import { getQueueSize } from "../process/command-queue.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { jidToE164, normalizeE164 } from "../utils.js";
import { setActiveWebListener } from "./active-listener.js";
import { monitorWebInbox } from "./inbound.js";
import { loadWebMedia } from "./media.js";
import { sendMessageWhatsApp } from "./outbound.js";
import { setActiveWebListener } from "./active-listener.js";
import {
computeBackoff,
newConnectionId,
@@ -31,7 +32,6 @@ import {
sleepWithAbort,
} from "./reconnect.js";
import { formatError, getWebAuthAgeMs, readWebSelfId } from "./session.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
const WEB_TEXT_LIMIT = 4000;
const DEFAULT_GROUP_HISTORY_LIMIT = 50;
@@ -494,7 +494,8 @@ async function deliverWebReply(params: {
? [replyResult.mediaUrl]
: [];
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
const sleep = (ms: number) =>
new Promise((resolve) => setTimeout(resolve, ms));
const sendWithRetry = async (
fn: () => Promise<unknown>,
@@ -1401,11 +1402,11 @@ export async function monitorWebProvider(
},
"web reconnect: scheduling retry",
);
runtime.error(
danger(
`WhatsApp Web connection closed (status ${status}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDuration(delay)}… (${errorStr})`,
),
);
runtime.error(
danger(
`WhatsApp Web connection closed (status ${status}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDuration(delay)}… (${errorStr})`,
),
);
await closeListener();
try {
await sleep(delay, abortSignal);

View File

@@ -6,8 +6,8 @@ import { logVerbose } from "../globals.js";
import { logInfo } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { toWhatsappJid } from "../utils.js";
import { loadWebMedia } from "./media.js";
import { getActiveWebListener } from "./active-listener.js";
import { loadWebMedia } from "./media.js";
import { createWaSocket, waitForWaConnection } from "./session.js";
export async function sendMessageWhatsApp(
@@ -18,7 +18,9 @@ export async function sendMessageWhatsApp(
const correlationId = randomUUID();
const active = getActiveWebListener();
const usingActive = Boolean(active);
const sock = usingActive ? null : await createWaSocket(false, options.verbose);
const sock = usingActive
? null
: await createWaSocket(false, options.verbose);
const logger = getChildLogger({
module: "web-outbound",
correlationId,
@@ -29,9 +31,12 @@ export async function sendMessageWhatsApp(
if (!usingActive) {
logInfo("🔌 Connecting to WhatsApp Web…");
logger.info("connecting to whatsapp web");
await waitForWaConnection(sock!);
if (!sock) {
throw new Error("WhatsApp socket unavailable");
}
await waitForWaConnection(sock);
try {
await sock!.sendPresenceUpdate("composing", jid);
await sock.sendPresenceUpdate("composing", jid);
} catch (err) {
logVerbose(`Presence update skipped: ${String(err)}`);
}
@@ -82,6 +87,7 @@ export async function sendMessageWhatsApp(
);
const result = usingActive
? await (async () => {
if (!active) throw new Error("Active web listener missing");
let mediaBuffer: Buffer | undefined;
let mediaType: string | undefined;
if (options.mediaUrl) {
@@ -89,13 +95,17 @@ export async function sendMessageWhatsApp(
mediaBuffer = media.buffer;
mediaType = media.contentType;
}
await active!.sendComposingTo(to);
return active!.sendMessage(to, body, mediaBuffer, mediaType);
await active.sendComposingTo(to);
return active.sendMessage(to, body, mediaBuffer, mediaType);
})()
: await sock!.sendMessage(jid, payload);
: await (async () => {
if (!sock) throw new Error("WhatsApp socket unavailable");
return sock.sendMessage(jid, payload);
})();
const messageId = usingActive
? (result as { messageId?: string })?.messageId ?? "unknown"
: (result as any)?.key?.id ?? "unknown";
? ((result as { messageId?: string })?.messageId ?? "unknown")
: ((result as { key?: { id?: string } } | undefined)?.key?.id ??
"unknown");
logInfo(
`✅ Sent via web session. Message ID: ${messageId} -> ${jid}${options.mediaUrl ? " (media)" : ""}`,
);

View File

@@ -1,11 +1,7 @@
import http from "node:http";
import type { AddressInfo } from "node:net";
import { describe, expect, test } from "vitest";
import { WebSocket } from "ws";
import {
__forceWebChatSnapshotForTests,
startWebChatServer,
stopWebChatServer,
} from "./server.js";
import { startWebChatServer, stopWebChatServer } from "./server.js";
async function getFreePort(): Promise<number> {
const { createServer } = await import("node:net");
@@ -19,76 +15,30 @@ async function getFreePort(): Promise<number> {
});
}
type SnapshotMessage = {
type?: string;
snapshot?: { stateVersion?: { presence?: number } };
};
type SessionMessage = { type?: string };
const fetchText = (url: string) =>
new Promise<string>((resolve, reject) => {
http
.get(url, (res) => {
const chunks: Buffer[] = [];
res
.on("data", (c) =>
chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(c)),
)
.on("end", () => resolve(Buffer.concat(chunks).toString("utf-8")))
.on("error", reject);
})
.on("error", reject);
});
describe("webchat server", () => {
test(
"hydrates snapshot to new sockets (offline mock)",
{ timeout: 8000 },
async () => {
const wPort = await getFreePort();
await startWebChatServer(wPort, undefined, { disableGateway: true });
const ws = new WebSocket(
`ws://127.0.0.1:${wPort}/webchat/socket?session=test`,
);
const messages: unknown[] = [];
ws.on("message", (data) => {
try {
messages.push(JSON.parse(String(data)));
} catch {
/* ignore */
}
});
try {
await new Promise<void>((resolve) => ws.once("open", resolve));
__forceWebChatSnapshotForTests({
presence: [],
health: {},
stateVersion: { presence: 1, health: 1 },
uptimeMs: 0,
});
const waitFor = async <T>(
pred: (m: unknown) => m is T,
label: string,
): Promise<T> => {
const start = Date.now();
while (Date.now() - start < 3000) {
const found = messages.find((m): m is T => {
try {
return pred(m);
} catch {
return false;
}
});
if (found) return found;
await new Promise((resolve) => setTimeout(resolve, 10));
}
throw new Error(`timeout waiting for ${label}`);
};
const isSessionMessage = (m: unknown): m is SessionMessage =>
typeof m === "object" &&
m !== null &&
(m as SessionMessage).type === "session";
const isSnapshotMessage = (m: unknown): m is SnapshotMessage =>
typeof m === "object" &&
m !== null &&
(m as SnapshotMessage).type === "gateway-snapshot";
await waitFor(isSessionMessage, "session");
const snap = await waitFor(isSnapshotMessage, "snapshot");
expect(snap.snapshot?.stateVersion?.presence).toBe(1);
} finally {
ws.close();
await stopWebChatServer();
}
},
);
describe("webchat server (static only)", () => {
test("serves index.html over loopback", { timeout: 8000 }, async () => {
const port = await getFreePort();
await startWebChatServer(port);
try {
const body = await fetchText(`http://127.0.0.1:${port}/`);
expect(body.toLowerCase()).toContain("<html");
} finally {
await stopWebChatServer();
}
});
});

View File

@@ -1,18 +1,8 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import http from "node:http";
import os from "node:os";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { type WebSocket, WebSocketServer } from "ws";
import { loadConfig } from "../config/config.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import {
loadSessionStore,
resolveStorePath,
type SessionEntry,
} from "../config/sessions.js";
import { GatewayClient } from "../gateway/client.js";
import { logDebug, logError } from "../logger.js";
const WEBCHAT_DEFAULT_PORT = 18788;
@@ -22,16 +12,7 @@ type WebChatServerState = {
port: number;
};
type ChatMessage = { role: string; content: string };
type RpcPayload = { role: string; content: string };
let state: WebChatServerState | null = null;
let wss: WebSocketServer | null = null;
const wsSessions: Map<string, Set<WebSocket>> = new Map();
let gateway: GatewayClient | null = null;
let gatewayReady = false;
let latestSnapshot: Record<string, unknown> | null = null;
let latestPolicy: Record<string, unknown> | null = null;
function resolveWebRoot() {
const here = path.dirname(fileURLToPath(import.meta.url));
@@ -52,151 +33,6 @@ function resolveWebRoot() {
throw new Error(`webchat assets not found; tried: ${candidates.join(", ")}`);
}
function readBody(req: http.IncomingMessage): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
req
.on("data", (c) => chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(c)))
.on("end", () => resolve(Buffer.concat(chunks)))
.on("error", reject);
});
}
function pickSessionId(
sessionKey: string,
store: Record<string, SessionEntry>,
): string | null {
if (store[sessionKey]?.sessionId) return store[sessionKey].sessionId;
const first = Object.values(store)[0]?.sessionId;
return first ?? null;
}
function readSessionMessages(
sessionId: string,
storePath: string,
): ChatMessage[] {
const dir = path.dirname(storePath);
const candidates = [
path.join(dir, `${sessionId}.jsonl`),
path.join(
os.homedir(),
".tau/agent/sessions/clawdis",
`${sessionId}.jsonl`,
),
];
let content: string | null = null;
for (const p of candidates) {
if (fs.existsSync(p)) {
try {
content = fs.readFileSync(p, "utf-8");
break;
} catch {
// continue
}
}
}
if (!content) return [];
const messages: ChatMessage[] = [];
for (const line of content.split(/\r?\n/)) {
if (!line.trim()) continue;
try {
const obj = JSON.parse(line);
const msg = obj.message ?? obj;
if (!msg?.role || !msg?.content) continue;
messages.push({ role: msg.role, content: msg.content });
} catch (err) {
logDebug(`webchat history parse error: ${String(err)}`);
}
}
return messages;
}
function broadcastSession(sessionKey: string, payload: unknown) {
const conns = wsSessions.get(sessionKey);
if (!conns || conns.size === 0) return;
const msg = JSON.stringify(payload);
for (const ws of conns) {
try {
ws.send(msg);
} catch {
// ignore and let close handler prune
}
}
}
function broadcastAll(payload: unknown) {
const msg = JSON.stringify(payload);
for (const [, conns] of wsSessions) {
for (const ws of conns) {
try {
ws.send(msg);
} catch {
// ignore
}
}
}
}
async function handleRpc(
body: unknown,
meta?: { remoteAddress?: string | null; senderHost?: string },
): Promise<{ ok: boolean; payloads?: RpcPayload[]; error?: string }> {
const payload = body as {
text?: unknown;
thinking?: unknown;
deliver?: unknown;
to?: unknown;
timeout?: unknown;
};
const textRaw: string = (payload.text ?? "").toString();
if (!textRaw.trim()) return { ok: false, error: "empty text" };
if (!gateway || !gatewayReady) {
return { ok: false, error: "gateway unavailable" };
}
const thinking =
typeof payload.thinking === "string" ? payload.thinking : undefined;
const to = typeof payload.to === "string" ? payload.to : undefined;
const deliver = Boolean(payload.deliver);
const timeout =
typeof payload.timeout === "number" ? payload.timeout : undefined;
const idempotencyKey = randomUUID();
try {
// Wrap user text with surface + host/IP envelope
const message = formatAgentEnvelope({
surface: "WebChat",
from: meta?.senderHost ?? os.hostname(),
ip: meta?.remoteAddress ?? undefined,
timestamp: Date.now(),
body: textRaw.trim(),
});
// Send agent request; wait for final res (status ok/error)
const res = (await gateway.request(
"agent",
{
message,
thinking,
deliver,
to,
timeout,
idempotencyKey,
},
{ expectFinal: true },
)) as { status?: string; summary?: string };
if (res?.status && res.status !== "ok") {
return { ok: false, error: res.summary || res.status };
}
// The actual agent output is delivered via events; HTTP just returns ack.
return { ok: true, payloads: [] };
} catch (err) {
return { ok: false, error: String(err) };
}
}
function notFound(res: http.ServerResponse) {
res.statusCode = 404;
res.end("Not Found");
@@ -204,19 +40,10 @@ function notFound(res: http.ServerResponse) {
export async function startWebChatServer(
port = WEBCHAT_DEFAULT_PORT,
gatewayOverrideUrl?: string,
opts?: { disableGateway?: boolean },
) {
): Promise<WebChatServerState | null> {
if (state) return state;
const root = resolveWebRoot();
// Precompute session store root for file watching
const cfg = loadConfig();
const sessionCfg = cfg.inbound?.reply?.session;
const storePath = sessionCfg?.store
? resolveStorePath(sessionCfg.store)
: resolveStorePath(undefined);
const storeDir = path.dirname(storePath);
const server = http.createServer(async (req, res) => {
if (!req.url) return notFound(res);
@@ -230,59 +57,14 @@ export async function startWebChatServer(
}
const url = new URL(req.url, "http://127.0.0.1");
const isInfo = url.pathname === "/webchat/info" || url.pathname === "/info";
const isRpc = url.pathname === "/webchat/rpc" || url.pathname === "/rpc";
if (isInfo) {
const sessionKey = url.searchParams.get("session") ?? "main";
const store = loadSessionStore(storePath);
const sessionId = pickSessionId(sessionKey, store);
const messages = sessionId
? readSessionMessages(sessionId, storePath)
: [];
res.setHeader("Content-Type", "application/json");
res.end(
JSON.stringify({
port,
sessionKey,
storePath,
sessionId,
initialMessages: messages,
basePath: "/",
gatewayConnected: gatewayReady,
gatewaySnapshot: latestSnapshot,
gatewayPolicy: latestPolicy,
}),
);
return;
}
if (isRpc && req.method === "POST") {
const bodyBuf = await readBody(req);
let body: Record<string, unknown> = {};
try {
body = JSON.parse(bodyBuf.toString("utf-8"));
} catch {
// ignore
}
const forwarded =
(req.headers["x-forwarded-for"] as string | undefined)?.split(",")[0]?.trim() ??
req.socket.remoteAddress;
const result = await handleRpc(body, {
remoteAddress: forwarded,
senderHost: os.hostname(),
});
res.setHeader("Content-Type", "application/json");
res.end(JSON.stringify(result));
return;
}
if (url.pathname.startsWith("/webchat")) {
let rel = url.pathname.replace(/^\/webchat\/?/, "");
if (!rel || rel.endsWith("/")) rel = `${rel}index.html`;
const filePath = path.join(root, rel);
if (!filePath.startsWith(root)) return notFound(res);
if (!fs.existsSync(filePath)) return notFound(res);
if (!filePath.startsWith(root) || !fs.existsSync(filePath)) {
return notFound(res);
}
const data = fs.readFileSync(filePath);
const ext = path.extname(filePath).toLowerCase();
const type =
@@ -331,172 +113,6 @@ export async function startWebChatServer(
);
});
// Gateway connection (control/data plane)
const cfgObj = loadConfig() as Record<string, unknown>;
if (!opts?.disableGateway) {
const cfgGatewayPort =
(cfgObj.webchat as { gatewayPort?: number } | undefined)?.gatewayPort ??
18789;
const gatewayUrl = gatewayOverrideUrl ?? `ws://127.0.0.1:${cfgGatewayPort}`;
const gatewayToken =
process.env.CLAWDIS_GATEWAY_TOKEN ??
(cfgObj.gateway as { token?: string } | undefined)?.token;
gateway = new GatewayClient({
url: gatewayUrl,
token: gatewayToken,
clientName: "webchat-backend",
clientVersion:
process.env.CLAWDIS_VERSION ?? process.env.npm_package_version ?? "dev",
platform: process.platform,
mode: "webchat",
instanceId: `webchat-${os.hostname()}`,
onHelloOk: (hello) => {
gatewayReady = true;
latestSnapshot = hello.snapshot as Record<string, unknown>;
latestPolicy = hello.policy as Record<string, unknown>;
broadcastAll({
type: "gateway-snapshot",
snapshot: hello.snapshot,
policy: hello.policy,
});
},
onEvent: (evt) => {
broadcastAll({
type: "gateway-event",
event: evt.event,
payload: evt.payload,
seq: evt.seq,
stateVersion: evt.stateVersion,
});
},
onClose: () => {
gatewayReady = false;
},
onGap: async () => {
if (!gatewayReady || !gateway) return;
try {
const [health, presence] = await Promise.all([
gateway.request("health"),
gateway.request("system-presence"),
]);
latestSnapshot = {
...latestSnapshot,
health,
presence,
} as Record<string, unknown>;
broadcastAll({ type: "gateway-refresh", health, presence });
} catch (err) {
logError(`webchat gap refresh failed: ${String(err)}`);
}
},
});
gateway.start();
}
// WebSocket setup for live session updates.
wss = new WebSocketServer({ noServer: true });
server.on("upgrade", (req, socket, head) => {
try {
const url = new URL(req.url ?? "", "http://127.0.0.1");
if (url.pathname !== "/webchat/socket" && url.pathname !== "/socket") {
socket.destroy();
return;
}
const addr = req.socket.remoteAddress ?? "";
const isLocal =
addr.startsWith("127.") ||
addr === "::1" ||
addr.endsWith("127.0.0.1") ||
addr.endsWith("::ffff:127.0.0.1");
if (!isLocal) {
socket.destroy();
return;
}
const sessionKey = url.searchParams.get("session") ?? "main";
if (!wss) {
socket.destroy();
return;
}
wss.handleUpgrade(req, socket, head, (ws: WebSocket) => {
ws.on("close", () => {
const set = wsSessions.get(sessionKey);
if (set) {
set.delete(ws);
if (set.size === 0) wsSessions.delete(sessionKey);
}
});
wsSessions.set(
sessionKey,
(wsSessions.get(sessionKey) ?? new Set()).add(ws),
);
// Send initial snapshot
const store = loadSessionStore(storePath);
const sessionId = pickSessionId(sessionKey, store);
const sessionEntry = sessionKey ? store[sessionKey] : undefined;
const persistedThinking = sessionEntry?.thinkingLevel;
const messages = sessionId
? readSessionMessages(sessionId, storePath)
: [];
ws.send(
JSON.stringify({
type: "session",
sessionKey,
messages,
thinkingLevel:
typeof persistedThinking === "string"
? persistedThinking
: (cfg.inbound?.reply?.thinkingDefault ?? "off"),
}),
);
if (latestSnapshot) {
ws.send(
JSON.stringify({
type: "gateway-snapshot",
snapshot: latestSnapshot,
policy: latestPolicy,
}),
);
}
});
} catch (_err) {
socket.destroy();
}
});
// Watch for session/message file changes and push updates.
try {
if (fs.existsSync(storeDir)) {
fs.watch(storeDir, { persistent: false }, (_event, filename) => {
if (!filename) return;
// On any file change, refresh for active sessions.
for (const sessionKey of wsSessions.keys()) {
try {
const store = loadSessionStore(storePath);
const sessionId = pickSessionId(sessionKey, store);
const sessionEntry = sessionKey ? store[sessionKey] : undefined;
const persistedThinking = sessionEntry?.thinkingLevel;
const messages = sessionId
? readSessionMessages(sessionId, storePath)
: [];
broadcastSession(sessionKey, {
type: "session",
sessionKey,
messages,
thinkingLevel:
typeof persistedThinking === "string"
? persistedThinking
: (cfg.inbound?.reply?.thinkingDefault ?? "off"),
});
} catch {
// ignore
}
}
});
}
} catch {
// watcher is best-effort
}
state = { server, port };
logDebug(`webchat server listening on 127.0.0.1:${port}`);
return state;
@@ -504,67 +120,31 @@ export async function startWebChatServer(
export async function stopWebChatServer() {
if (!state) return;
gatewayReady = false;
gateway?.stop();
gateway = null;
if (wss) {
for (const client of wss.clients) {
try {
client.close();
} catch {
/* ignore */
}
}
await new Promise<void>((resolve) => wss?.close(() => resolve()));
}
if (state.server) {
await new Promise<void>((resolve) => state?.server.close(() => resolve()));
}
wss = null;
wsSessions.clear();
state = null;
}
export async function waitForWebChatGatewayReady(timeoutMs = 10000) {
const start = Date.now();
while (!latestSnapshot) {
if (Date.now() - start > timeoutMs) {
throw new Error("webchat gateway not ready");
}
await new Promise((resolve) => setTimeout(resolve, 50));
}
// Legacy no-op: gateway readiness is now handled directly by clients.
export async function waitForWebChatGatewayReady() {
return;
}
// Test-only helpers to seed/broadcast without a live Gateway connection.
export function __forceWebChatSnapshotForTests(
snapshot: Record<string, unknown>,
policy?: Record<string, unknown>,
) {
latestSnapshot = snapshot;
latestPolicy = policy ?? null;
gatewayReady = true;
broadcastAll({
type: "gateway-snapshot",
snapshot: latestSnapshot,
policy: latestPolicy,
});
export function __forceWebChatSnapshotForTests() {
// no-op: snapshots now come from the Gateway WS directly.
}
export function __broadcastGatewayEventForTests(
event: string,
payload: unknown,
) {
broadcastAll({ type: "gateway-event", event, payload });
export async function __broadcastGatewayEventForTests() {
// no-op
}
export async function ensureWebChatServerFromConfig(opts?: {
gatewayUrl?: string;
}) {
export async function ensureWebChatServerFromConfig() {
const cfg = loadConfig();
if (cfg.webchat?.enabled === false) return null;
const port = cfg.webchat?.port ?? WEBCHAT_DEFAULT_PORT;
try {
return await startWebChatServer(port, opts?.gatewayUrl);
return await startWebChatServer(port);
} catch (err) {
logDebug(`webchat server failed to start: ${String(err)}`);
throw err;