diff --git a/docs/gateway.md b/docs/gateway.md index 65c5e63b3..b269b102c 100644 --- a/docs/gateway.md +++ b/docs/gateway.md @@ -22,7 +22,7 @@ pnpm clawdis gateway --force ``` - Binds WebSocket control plane to `127.0.0.1:` (default 18789). - Logs to stdout; use launchd/systemd to keep it alive and rotate logs. -- Pass `--verbose` to mirror debug logging from the log file into stdio when troubleshooting. +- Pass `--verbose` to mirror debug logging (handshakes, req/res, events) from the log file into stdio when troubleshooting. - `--force` uses `lsof` to find listeners on the chosen port, sends SIGTERM, logs what it killed, then starts the gateway (fails fast if `lsof` is missing). - Optional shared secret: pass `--token ` or set `CLAWDIS_GATEWAY_TOKEN` to require clients to send `hello.auth.token`. diff --git a/src/commands/health.ts b/src/commands/health.ts index 21a587ba4..2687060e7 100644 --- a/src/commands/health.ts +++ b/src/commands/health.ts @@ -77,9 +77,20 @@ async function probeWebConnect(timeoutMs: number): Promise { elapsedMs: Date.now() - started, }; } catch (err) { + const status = getStatusCode(err); + // Conflict/duplicate sessions are expected when the primary gateway session + // is already connected. Treat these as healthy so health checks don’t flap. + if (status === 409 || status === 440 || status === 515) { + return { + ok: true, + status, + error: "already connected (conflict)", + elapsedMs: Date.now() - started, + }; + } return { ok: false, - status: getStatusCode(err), + status, error: err instanceof Error ? err.message : String(err), elapsedMs: Date.now() - started, }; diff --git a/src/gateway/server.ts b/src/gateway/server.ts index bca925000..d67b88638 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -14,7 +14,7 @@ import { listSystemPresence, upsertPresence, } from "../infra/system-presence.js"; -import { logError } from "../logger.js"; +import { logDebug, logError } from "../logger.js"; import { getResolvedLoggerSettings } from "../logging.js"; import { monitorWebProvider, webAuthExists } from "../providers/web/index.js"; import { defaultRuntime } from "../runtime.js"; @@ -82,6 +82,7 @@ const HANDSHAKE_TIMEOUT_MS = 3000; const TICK_INTERVAL_MS = 30_000; const DEDUPE_TTL_MS = 5 * 60_000; const DEDUPE_MAX = 1000; +const LOG_VALUE_LIMIT = 240; type DedupeEntry = { ts: number; @@ -93,6 +94,35 @@ const dedupe = new Map(); const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN; +function formatForLog(value: unknown): string { + try { + const str = + typeof value === "string" || typeof value === "number" + ? String(value) + : JSON.stringify(value); + if (!str) return ""; + return str.length > LOG_VALUE_LIMIT ? `${str.slice(0, LOG_VALUE_LIMIT)}...` : str; + } catch { + return String(value); + } +} + +function logWs( + direction: "in" | "out", + kind: string, + meta?: Record, +) { + if (!isVerbose()) return; + const parts = [`gateway/ws ${direction} ${kind}`]; + if (meta) { + for (const [key, raw] of Object.entries(meta)) { + if (raw === undefined) continue; + parts.push(`${key}=${formatForLog(raw)}`); + } + } + logDebug(parts.join(" ")); +} + function formatError(err: unknown): string { if (err instanceof Error) return err.message; if (typeof err === "string") return err; @@ -171,13 +201,22 @@ export async function startGatewayServer(port = 18789): Promise { stateVersion?: { presence?: number; health?: number }; }, ) => { + const eventSeq = ++seq; const frame = JSON.stringify({ type: "event", event, payload, - seq: ++seq, + seq: eventSeq, stateVersion: opts?.stateVersion, }); + logWs("out", "event", { + event, + seq: eventSeq, + clients: clients.size, + dropIfSlow: opts?.dropIfSlow, + presenceVersion: opts?.stateVersion?.presence, + healthVersion: opts?.stateVersion?.health, + }); for (const c of clients) { const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES; if (slow && opts?.dropIfSlow) continue; @@ -240,6 +279,10 @@ export async function startGatewayServer(port = 18789): Promise { let closed = false; const connId = randomUUID(); const deps = createDefaultDeps(); + const remoteAddr = ( + socket as WebSocket & { _socket?: { remoteAddress?: string } } + )._socket?.remoteAddress; + logWs("in", "connect", { connId, remoteAddr }); const send = (obj: unknown) => { try { @@ -279,6 +322,7 @@ export async function startGatewayServer(port = 18789): Promise { }, ); } + logWs("out", "close", { connId }); close(); }); @@ -309,6 +353,13 @@ export async function startGatewayServer(port = 18789): Promise { maxProtocol < PROTOCOL_VERSION || minProtocol > PROTOCOL_VERSION ) { + logWs("out", "hello-error", { + connId, + reason: "protocol mismatch", + minProtocol, + maxProtocol, + expected: PROTOCOL_VERSION, + }); send({ type: "hello-error", reason: "protocol mismatch", @@ -321,6 +372,7 @@ export async function startGatewayServer(port = 18789): Promise { // token auth if required const token = getGatewayToken(); if (token && hello.auth?.token !== token) { + logWs("out", "hello-error", { connId, reason: "unauthorized" }); send({ type: "hello-error", reason: "unauthorized", @@ -332,9 +384,15 @@ export async function startGatewayServer(port = 18789): Promise { // synthesize presence entry for this connection (client fingerprint) const presenceKey = hello.client.instanceId || connId; - const remoteAddr = ( - socket as WebSocket & { _socket?: { remoteAddress?: string } } - )._socket?.remoteAddress; + logWs("in", "hello", { + connId, + client: hello.client.name, + version: hello.client.version, + mode: hello.client.mode, + instanceId: hello.client.instanceId, + platform: hello.client.platform, + token: hello.auth?.token ? "set" : "none", + }); upsertPresence(presenceKey, { host: hello.client.name || os.hostname(), ip: remoteAddr, @@ -373,6 +431,13 @@ export async function startGatewayServer(port = 18789): Promise { // Add the client only after the hello response is ready so no tick/presence // events reach it before the handshake completes. client = { socket, hello, connId }; + logWs("out", "hello-ok", { + connId, + methods: METHODS.length, + events: EVENTS.length, + presence: snapshot.presence.length, + stateVersion: snapshot.stateVersion.presence, + }); send(helloOk); clients.add(client); return; @@ -392,8 +457,26 @@ export async function startGatewayServer(port = 18789): Promise { return; } const req = parsed as RequestFrame; - const respond = (ok: boolean, payload?: unknown, error?: ErrorShape) => + logWs("in", "req", { + connId, + id: req.id, + method: req.method, + }); + const respond = ( + ok: boolean, + payload?: unknown, + error?: ErrorShape, + meta?: Record, + ) => { send({ type: "res", id: req.id, ok, payload, error }); + logWs("out", "res", { + connId, + id: req.id, + ok, + method: req.method, + ...meta, + }); + }; switch (req.method) { case "health": { @@ -463,7 +546,9 @@ export async function startGatewayServer(port = 18789): Promise { const idem = params.idempotencyKey; const cached = dedupe.get(`send:${idem}`); if (cached) { - respond(cached.ok, cached.payload, cached.error); + respond(cached.ok, cached.payload, cached.error, { + cached: true, + }); break; } const to = params.to.trim(); @@ -486,7 +571,7 @@ export async function startGatewayServer(port = 18789): Promise { ok: true, payload, }); - respond(true, payload, undefined); + respond(true, payload, undefined, { provider }); } else { const result = await sendMessageWhatsApp(to, message, { mediaUrl: params.mediaUrl, @@ -503,12 +588,15 @@ export async function startGatewayServer(port = 18789): Promise { ok: true, payload, }); - respond(true, payload, undefined); + respond(true, payload, undefined, { provider }); } } catch (err) { const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); dedupe.set(`send:${idem}`, { ts: Date.now(), ok: false, error }); - respond(false, undefined, error); + respond(false, undefined, error, { + provider, + error: formatForLog(err), + }); } break; } @@ -537,7 +625,7 @@ export async function startGatewayServer(port = 18789): Promise { const idem = params.idempotencyKey; const cached = dedupe.get(`agent:${idem}`); if (cached) { - respond(cached.ok, cached.payload, cached.error); + respond(cached.ok, cached.payload, cached.error, { cached: true }); break; } const message = params.message.trim(); @@ -550,6 +638,12 @@ export async function startGatewayServer(port = 18789): Promise { seq: ++seq, }; socket.send(JSON.stringify(ackEvent)); + logWs("out", "event", { + connId, + event: "agent", + runId, + status: "accepted", + }); try { await agentCommand( { @@ -573,7 +667,7 @@ export async function startGatewayServer(port = 18789): Promise { ok: true, payload, }); - respond(true, payload, undefined); + respond(true, payload, undefined, { runId }); } catch (err) { const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); const payload = { @@ -587,7 +681,10 @@ export async function startGatewayServer(port = 18789): Promise { payload, error, }); - respond(false, payload, error); + respond(false, payload, error, { + runId, + error: formatForLog(err), + }); } break; } @@ -605,6 +702,7 @@ export async function startGatewayServer(port = 18789): Promise { } } catch (err) { logError(`gateway: parse/handle error: ${String(err)}`); + logWs("out", "parse-error", { connId, error: formatForLog(err) }); // If still in handshake, close; otherwise respond error if (!client) { close(); diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index f6cd5ef20..404ed73a3 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -32,7 +32,6 @@ import { } from "./reconnect.js"; import { formatError, getWebAuthAgeMs, readWebSelfId } from "./session.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; -import { formatAgentEnvelope } from "../auto-reply/envelope.js"; const WEB_TEXT_LIMIT = 4000; const DEFAULT_GROUP_HISTORY_LIMIT = 50;