From 72eb240c3b8eab6ce08c9dcc423cabcee9b5f679 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 9 Dec 2025 17:02:58 +0100 Subject: [PATCH] gateway: harden ws protocol and liveness --- .../Sources/Clawdis/GatewayChannel.swift | 31 ++++++++++++++- src/commands/agent.ts | 38 +++++++------------ src/gateway/call.ts | 5 ++- src/gateway/client.ts | 34 +++++++++++++++++ src/gateway/server.ts | 35 +++++++++++------ src/infra/agent-events.ts | 7 +++- 6 files changed, 108 insertions(+), 42 deletions(-) diff --git a/apps/macos/Sources/Clawdis/GatewayChannel.swift b/apps/macos/Sources/Clawdis/GatewayChannel.swift index f3c068b3a..0f11f4dde 100644 --- a/apps/macos/Sources/Clawdis/GatewayChannel.swift +++ b/apps/macos/Sources/Clawdis/GatewayChannel.swift @@ -26,6 +26,8 @@ private actor GatewayChannelActor { private var backoffMs: Double = 500 private var shouldReconnect = true private var lastSeq: Int? + private var lastTick: Date? + private var tickIntervalMs: Double = 30_000 private let decoder = JSONDecoder() private let encoder = JSONEncoder() @@ -49,8 +51,8 @@ private actor GatewayChannelActor { private func sendHello() async throws { let hello: [String: Any] = [ "type": "hello", - "minProtocol": 1, - "maxProtocol": 1, + "minProtocol": GATEWAY_PROTOCOL_VERSION, + "maxProtocol": GATEWAY_PROTOCOL_VERSION, "client": [ "name": "clawdis-mac", "version": Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String ?? "dev", @@ -80,6 +82,12 @@ private actor GatewayChannelActor { guard let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], let type = obj["type"] as? String else { return false } if type == "hello-ok" { + if let policy = obj["policy"] as? [String: Any], + let tick = policy["tickIntervalMs"] as? Double { + self.tickIntervalMs = tick + } + self.lastTick = Date() + Task { await self.watchTicks() } NotificationCenter.default.post(name: .gatewaySnapshot, object: nil, userInfo: obj) return true } @@ -134,14 +142,33 @@ private actor GatewayChannelActor { } self.lastSeq = seq } + if evt.event == "tick" { self.lastTick = Date() } NotificationCenter.default.post(name: .gatewayEvent, object: frame) case .helloOk: + self.lastTick = Date() NotificationCenter.default.post(name: .gatewaySnapshot, object: frame) default: break } } + private func watchTicks() async { + let tolerance = self.tickIntervalMs * 2 + while self.connected { + try? await Task.sleep(nanoseconds: UInt64(tolerance * 1_000_000)) + guard self.connected else { return } + if let last = self.lastTick { + let delta = Date().timeIntervalSince(last) * 1000 + if delta > tolerance { + self.logger.error("gateway tick missed; reconnecting") + self.connected = false + await self.scheduleReconnect() + return + } + } + } + } + private func scheduleReconnect() async { guard self.shouldReconnect else { return } let delay = self.backoffMs / 1000 diff --git a/src/commands/agent.ts b/src/commands/agent.ts index c80b0dfe9..d1e6b8a23 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -25,7 +25,6 @@ import { emitAgentEvent } from "../infra/agent-events.js"; import { runCommandWithTimeout } from "../process/exec.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; -import { sendViaIpc } from "../web/ipc.js"; type AgentCommandOpts = { message: string; @@ -420,30 +419,19 @@ export async function agentCommand( if (deliver && targetTo) { const text = payload.text ?? ""; const media = mediaList; - // Prefer IPC to reuse the running relay; fall back to direct web send. - let sentViaIpc = false; - const ipcResult = await sendViaIpc(targetTo, text, media[0]); - if (ipcResult) { - sentViaIpc = ipcResult.success; - if (ipcResult.success && media.length > 1) { - for (const extra of media.slice(1)) { - await sendViaIpc(targetTo, "", extra); - } - } - } - if (!sentViaIpc) { - if (text || media.length === 0) { - await deps.sendMessageWhatsApp(targetTo, text, { - verbose: false, - mediaUrl: media[0], - }); - } - for (const extra of media.slice(1)) { - await deps.sendMessageWhatsApp(targetTo, "", { - verbose: false, - mediaUrl: extra, - }); - } + if (!text && media.length === 0) continue; + + const primaryMedia = media[0]; + await deps.sendMessageWhatsApp(targetTo, text, { + verbose: false, + mediaUrl: primaryMedia, + }); + + for (const extra of media.slice(1)) { + await deps.sendMessageWhatsApp(targetTo, "", { + verbose: false, + mediaUrl: extra, + }); } } } diff --git a/src/gateway/call.ts b/src/gateway/call.ts index 0d1661710..a137d3749 100644 --- a/src/gateway/call.ts +++ b/src/gateway/call.ts @@ -1,5 +1,6 @@ import { randomUUID } from "node:crypto"; import { GatewayClient } from "./client.js"; +import { PROTOCOL_VERSION } from "./protocol/index.js"; export type CallGatewayOptions = { url?: string; @@ -39,8 +40,8 @@ export async function callGateway( clientVersion: opts.clientVersion ?? "dev", platform: opts.platform, mode: opts.mode ?? "cli", - minProtocol: opts.minProtocol ?? 1, - maxProtocol: opts.maxProtocol ?? 1, + minProtocol: opts.minProtocol ?? PROTOCOL_VERSION, + maxProtocol: opts.maxProtocol ?? PROTOCOL_VERSION, onHelloOk: async () => { try { const result = await client.request(opts.method, opts.params, { diff --git a/src/gateway/client.ts b/src/gateway/client.ts index 6c0b024fb..936f48823 100644 --- a/src/gateway/client.ts +++ b/src/gateway/client.ts @@ -39,6 +39,10 @@ export class GatewayClient { private backoffMs = 1000; private closed = false; private lastSeq: number | null = null; + // Track last tick to detect silent stalls. + private lastTick: number | null = null; + private tickIntervalMs = 30_000; + private tickTimer: NodeJS.Timeout | null = null; constructor(opts: GatewayClientOptions) { this.opts = opts; @@ -66,6 +70,10 @@ export class GatewayClient { stop() { this.closed = true; + if (this.tickTimer) { + clearInterval(this.tickTimer); + this.tickTimer = null; + } this.ws?.close(); this.ws = null; this.flushPendingErrors(new Error("gateway client stopped")); @@ -94,6 +102,12 @@ export class GatewayClient { const parsed = JSON.parse(raw); if (parsed?.type === "hello-ok") { this.backoffMs = 1000; + this.tickIntervalMs = + typeof parsed.policy?.tickIntervalMs === "number" + ? parsed.policy.tickIntervalMs + : 30_000; + this.lastTick = Date.now(); + this.startTickWatch(); this.opts.onHelloOk?.(parsed as HelloOk); return; } @@ -111,6 +125,9 @@ export class GatewayClient { } this.lastSeq = seq; } + if (evt.event === "tick") { + this.lastTick = Date.now(); + } this.opts.onEvent?.(evt); return; } @@ -134,6 +151,10 @@ export class GatewayClient { private scheduleReconnect() { if (this.closed) return; + if (this.tickTimer) { + clearInterval(this.tickTimer); + this.tickTimer = null; + } const delay = this.backoffMs; this.backoffMs = Math.min(this.backoffMs * 2, 30_000); setTimeout(() => this.start(), delay).unref(); @@ -146,6 +167,19 @@ export class GatewayClient { this.pending.clear(); } + private startTickWatch() { + if (this.tickTimer) clearInterval(this.tickTimer); + const interval = Math.max(this.tickIntervalMs, 1000); + this.tickTimer = setInterval(() => { + if (this.closed) return; + if (!this.lastTick) return; + const gap = Date.now() - this.lastTick; + if (gap > this.tickIntervalMs * 2) { + this.ws?.close(4000, "tick timeout"); + } + }, interval); + } + async request( method: string, params?: unknown, diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 083896501..98d53f520 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -40,7 +40,6 @@ const METHODS = [ "status", "system-presence", "system-event", - "set-heartbeats", "send", "agent", ]; @@ -54,6 +53,8 @@ export type GatewayServer = { let presenceVersion = 1; let healthVersion = 1; let seq = 0; +// Track per-run sequence to detect out-of-order/lost agent events. +const agentRunSeq = new Map(); function buildSnapshot(): Snapshot { const presence = listSystemPresence(); @@ -147,6 +148,21 @@ export async function startGatewayServer(port = 18789): Promise { }, 60_000); const agentUnsub = onAgentEvent((evt) => { + const last = agentRunSeq.get(evt.runId) ?? 0; + if (evt.seq !== last + 1) { + // Fan out an error event so clients can refresh the stream on gaps. + broadcast("agent", { + runId: evt.runId, + stream: "error", + ts: Date.now(), + data: { + reason: "seq gap", + expected: last + 1, + received: evt.seq, + }, + }); + } + agentRunSeq.set(evt.runId, evt.seq); broadcast("agent", evt); }); @@ -247,14 +263,15 @@ export async function startGatewayServer(port = 18789): Promise { client = { socket, hello, connId }; clients.add(client); - // synthesize presence entry for this connection + // synthesize presence entry for this connection (client fingerprint) const presenceKey = hello.client.instanceId || connId; + const remoteAddr = ( + socket as WebSocket & { _socket?: { remoteAddress?: string } } + )._socket?.remoteAddress; upsertPresence(presenceKey, { - host: os.hostname(), - version: - process.env.CLAWDIS_VERSION ?? - process.env.npm_package_version ?? - "dev", + host: hello.client.name || os.hostname(), + ip: remoteAddr, + version: hello.client.version, mode: hello.client.mode, instanceId: hello.client.instanceId, reason: "connect", @@ -352,10 +369,6 @@ export async function startGatewayServer(port = 18789): Promise { respond(true, { ok: true }, undefined); break; } - case "set-heartbeats": { - respond(true, { ok: true }, undefined); - break; - } case "send": { const p = (req.params ?? {}) as Record; if (!validateSendParams(p)) { diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts index 0acd7c14a..929815615 100644 --- a/src/infra/agent-events.ts +++ b/src/infra/agent-events.ts @@ -6,13 +6,16 @@ export type AgentEventPayload = { data: Record; }; -let seq = 0; +// Keep per-run counters so streams stay strictly monotonic per runId. +const seqByRun = new Map(); const listeners = new Set<(evt: AgentEventPayload) => void>(); export function emitAgentEvent(event: Omit) { + const nextSeq = (seqByRun.get(event.runId) ?? 0) + 1; + seqByRun.set(event.runId, nextSeq); const enriched: AgentEventPayload = { ...event, - seq: ++seq, + seq: nextSeq, ts: Date.now(), }; for (const listener of listeners) {