gateway: improve conflict handling and logging

This commit is contained in:
Peter Steinberger
2025-12-09 20:07:24 +00:00
parent 0de944be28
commit 5e5845547e
4 changed files with 124 additions and 16 deletions

View File

@@ -22,7 +22,7 @@ pnpm clawdis gateway --force
``` ```
- Binds WebSocket control plane to `127.0.0.1:<port>` (default 18789). - Binds WebSocket control plane to `127.0.0.1:<port>` (default 18789).
- Logs to stdout; use launchd/systemd to keep it alive and rotate logs. - Logs to stdout; use launchd/systemd to keep it alive and rotate logs.
- Pass `--verbose` to mirror debug logging from the log file into stdio when troubleshooting. - Pass `--verbose` to mirror debug logging (handshakes, req/res, events) from the log file into stdio when troubleshooting.
- `--force` uses `lsof` to find listeners on the chosen port, sends SIGTERM, logs what it killed, then starts the gateway (fails fast if `lsof` is missing). - `--force` uses `lsof` to find listeners on the chosen port, sends SIGTERM, logs what it killed, then starts the gateway (fails fast if `lsof` is missing).
- Optional shared secret: pass `--token <value>` or set `CLAWDIS_GATEWAY_TOKEN` to require clients to send `hello.auth.token`. - Optional shared secret: pass `--token <value>` or set `CLAWDIS_GATEWAY_TOKEN` to require clients to send `hello.auth.token`.

View File

@@ -77,9 +77,20 @@ async function probeWebConnect(timeoutMs: number): Promise<HealthConnect> {
elapsedMs: Date.now() - started, elapsedMs: Date.now() - started,
}; };
} catch (err) { } catch (err) {
const status = getStatusCode(err);
// Conflict/duplicate sessions are expected when the primary gateway session
// is already connected. Treat these as healthy so health checks dont flap.
if (status === 409 || status === 440 || status === 515) {
return {
ok: true,
status,
error: "already connected (conflict)",
elapsedMs: Date.now() - started,
};
}
return { return {
ok: false, ok: false,
status: getStatusCode(err), status,
error: err instanceof Error ? err.message : String(err), error: err instanceof Error ? err.message : String(err),
elapsedMs: Date.now() - started, elapsedMs: Date.now() - started,
}; };

View File

@@ -14,7 +14,7 @@ import {
listSystemPresence, listSystemPresence,
upsertPresence, upsertPresence,
} from "../infra/system-presence.js"; } from "../infra/system-presence.js";
import { logError } from "../logger.js"; import { logDebug, logError } from "../logger.js";
import { getResolvedLoggerSettings } from "../logging.js"; import { getResolvedLoggerSettings } from "../logging.js";
import { monitorWebProvider, webAuthExists } from "../providers/web/index.js"; import { monitorWebProvider, webAuthExists } from "../providers/web/index.js";
import { defaultRuntime } from "../runtime.js"; import { defaultRuntime } from "../runtime.js";
@@ -82,6 +82,7 @@ const HANDSHAKE_TIMEOUT_MS = 3000;
const TICK_INTERVAL_MS = 30_000; const TICK_INTERVAL_MS = 30_000;
const DEDUPE_TTL_MS = 5 * 60_000; const DEDUPE_TTL_MS = 5 * 60_000;
const DEDUPE_MAX = 1000; const DEDUPE_MAX = 1000;
const LOG_VALUE_LIMIT = 240;
type DedupeEntry = { type DedupeEntry = {
ts: number; ts: number;
@@ -93,6 +94,35 @@ const dedupe = new Map<string, DedupeEntry>();
const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN; const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN;
function formatForLog(value: unknown): string {
try {
const str =
typeof value === "string" || typeof value === "number"
? String(value)
: JSON.stringify(value);
if (!str) return "";
return str.length > LOG_VALUE_LIMIT ? `${str.slice(0, LOG_VALUE_LIMIT)}...` : str;
} catch {
return String(value);
}
}
function logWs(
direction: "in" | "out",
kind: string,
meta?: Record<string, unknown>,
) {
if (!isVerbose()) return;
const parts = [`gateway/ws ${direction} ${kind}`];
if (meta) {
for (const [key, raw] of Object.entries(meta)) {
if (raw === undefined) continue;
parts.push(`${key}=${formatForLog(raw)}`);
}
}
logDebug(parts.join(" "));
}
function formatError(err: unknown): string { function formatError(err: unknown): string {
if (err instanceof Error) return err.message; if (err instanceof Error) return err.message;
if (typeof err === "string") return err; if (typeof err === "string") return err;
@@ -171,13 +201,22 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
stateVersion?: { presence?: number; health?: number }; stateVersion?: { presence?: number; health?: number };
}, },
) => { ) => {
const eventSeq = ++seq;
const frame = JSON.stringify({ const frame = JSON.stringify({
type: "event", type: "event",
event, event,
payload, payload,
seq: ++seq, seq: eventSeq,
stateVersion: opts?.stateVersion, stateVersion: opts?.stateVersion,
}); });
logWs("out", "event", {
event,
seq: eventSeq,
clients: clients.size,
dropIfSlow: opts?.dropIfSlow,
presenceVersion: opts?.stateVersion?.presence,
healthVersion: opts?.stateVersion?.health,
});
for (const c of clients) { for (const c of clients) {
const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES; const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES;
if (slow && opts?.dropIfSlow) continue; if (slow && opts?.dropIfSlow) continue;
@@ -240,6 +279,10 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
let closed = false; let closed = false;
const connId = randomUUID(); const connId = randomUUID();
const deps = createDefaultDeps(); const deps = createDefaultDeps();
const remoteAddr = (
socket as WebSocket & { _socket?: { remoteAddress?: string } }
)._socket?.remoteAddress;
logWs("in", "connect", { connId, remoteAddr });
const send = (obj: unknown) => { const send = (obj: unknown) => {
try { try {
@@ -279,6 +322,7 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
}, },
); );
} }
logWs("out", "close", { connId });
close(); close();
}); });
@@ -309,6 +353,13 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
maxProtocol < PROTOCOL_VERSION || maxProtocol < PROTOCOL_VERSION ||
minProtocol > PROTOCOL_VERSION minProtocol > PROTOCOL_VERSION
) { ) {
logWs("out", "hello-error", {
connId,
reason: "protocol mismatch",
minProtocol,
maxProtocol,
expected: PROTOCOL_VERSION,
});
send({ send({
type: "hello-error", type: "hello-error",
reason: "protocol mismatch", reason: "protocol mismatch",
@@ -321,6 +372,7 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
// token auth if required // token auth if required
const token = getGatewayToken(); const token = getGatewayToken();
if (token && hello.auth?.token !== token) { if (token && hello.auth?.token !== token) {
logWs("out", "hello-error", { connId, reason: "unauthorized" });
send({ send({
type: "hello-error", type: "hello-error",
reason: "unauthorized", reason: "unauthorized",
@@ -332,9 +384,15 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
// synthesize presence entry for this connection (client fingerprint) // synthesize presence entry for this connection (client fingerprint)
const presenceKey = hello.client.instanceId || connId; const presenceKey = hello.client.instanceId || connId;
const remoteAddr = ( logWs("in", "hello", {
socket as WebSocket & { _socket?: { remoteAddress?: string } } connId,
)._socket?.remoteAddress; client: hello.client.name,
version: hello.client.version,
mode: hello.client.mode,
instanceId: hello.client.instanceId,
platform: hello.client.platform,
token: hello.auth?.token ? "set" : "none",
});
upsertPresence(presenceKey, { upsertPresence(presenceKey, {
host: hello.client.name || os.hostname(), host: hello.client.name || os.hostname(),
ip: remoteAddr, ip: remoteAddr,
@@ -373,6 +431,13 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
// Add the client only after the hello response is ready so no tick/presence // Add the client only after the hello response is ready so no tick/presence
// events reach it before the handshake completes. // events reach it before the handshake completes.
client = { socket, hello, connId }; client = { socket, hello, connId };
logWs("out", "hello-ok", {
connId,
methods: METHODS.length,
events: EVENTS.length,
presence: snapshot.presence.length,
stateVersion: snapshot.stateVersion.presence,
});
send(helloOk); send(helloOk);
clients.add(client); clients.add(client);
return; return;
@@ -392,8 +457,26 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
return; return;
} }
const req = parsed as RequestFrame; const req = parsed as RequestFrame;
const respond = (ok: boolean, payload?: unknown, error?: ErrorShape) => logWs("in", "req", {
connId,
id: req.id,
method: req.method,
});
const respond = (
ok: boolean,
payload?: unknown,
error?: ErrorShape,
meta?: Record<string, unknown>,
) => {
send({ type: "res", id: req.id, ok, payload, error }); send({ type: "res", id: req.id, ok, payload, error });
logWs("out", "res", {
connId,
id: req.id,
ok,
method: req.method,
...meta,
});
};
switch (req.method) { switch (req.method) {
case "health": { case "health": {
@@ -463,7 +546,9 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
const idem = params.idempotencyKey; const idem = params.idempotencyKey;
const cached = dedupe.get(`send:${idem}`); const cached = dedupe.get(`send:${idem}`);
if (cached) { if (cached) {
respond(cached.ok, cached.payload, cached.error); respond(cached.ok, cached.payload, cached.error, {
cached: true,
});
break; break;
} }
const to = params.to.trim(); const to = params.to.trim();
@@ -486,7 +571,7 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
ok: true, ok: true,
payload, payload,
}); });
respond(true, payload, undefined); respond(true, payload, undefined, { provider });
} else { } else {
const result = await sendMessageWhatsApp(to, message, { const result = await sendMessageWhatsApp(to, message, {
mediaUrl: params.mediaUrl, mediaUrl: params.mediaUrl,
@@ -503,12 +588,15 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
ok: true, ok: true,
payload, payload,
}); });
respond(true, payload, undefined); respond(true, payload, undefined, { provider });
} }
} catch (err) { } catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
dedupe.set(`send:${idem}`, { ts: Date.now(), ok: false, error }); dedupe.set(`send:${idem}`, { ts: Date.now(), ok: false, error });
respond(false, undefined, error); respond(false, undefined, error, {
provider,
error: formatForLog(err),
});
} }
break; break;
} }
@@ -537,7 +625,7 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
const idem = params.idempotencyKey; const idem = params.idempotencyKey;
const cached = dedupe.get(`agent:${idem}`); const cached = dedupe.get(`agent:${idem}`);
if (cached) { if (cached) {
respond(cached.ok, cached.payload, cached.error); respond(cached.ok, cached.payload, cached.error, { cached: true });
break; break;
} }
const message = params.message.trim(); const message = params.message.trim();
@@ -550,6 +638,12 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
seq: ++seq, seq: ++seq,
}; };
socket.send(JSON.stringify(ackEvent)); socket.send(JSON.stringify(ackEvent));
logWs("out", "event", {
connId,
event: "agent",
runId,
status: "accepted",
});
try { try {
await agentCommand( await agentCommand(
{ {
@@ -573,7 +667,7 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
ok: true, ok: true,
payload, payload,
}); });
respond(true, payload, undefined); respond(true, payload, undefined, { runId });
} catch (err) { } catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
const payload = { const payload = {
@@ -587,7 +681,10 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
payload, payload,
error, error,
}); });
respond(false, payload, error); respond(false, payload, error, {
runId,
error: formatForLog(err),
});
} }
break; break;
} }
@@ -605,6 +702,7 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
} }
} catch (err) { } catch (err) {
logError(`gateway: parse/handle error: ${String(err)}`); logError(`gateway: parse/handle error: ${String(err)}`);
logWs("out", "parse-error", { connId, error: formatForLog(err) });
// If still in handshake, close; otherwise respond error // If still in handshake, close; otherwise respond error
if (!client) { if (!client) {
close(); close();

View File

@@ -32,7 +32,6 @@ import {
} from "./reconnect.js"; } from "./reconnect.js";
import { formatError, getWebAuthAgeMs, readWebSelfId } from "./session.js"; import { formatError, getWebAuthAgeMs, readWebSelfId } from "./session.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
const WEB_TEXT_LIMIT = 4000; const WEB_TEXT_LIMIT = 4000;
const DEFAULT_GROUP_HISTORY_LIMIT = 50; const DEFAULT_GROUP_HISTORY_LIMIT = 50;