From 8846ffec643682940a1c0f9fc9bd193c45d36b18 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 12 Dec 2025 23:33:12 +0000 Subject: [PATCH] fix: expose heartbeat controls and harden mac CLI --- .gitignore | 1 + .../Sources/Clawdis/ControlChannel.swift | 11 ++- apps/macos/Sources/ClawdisCLI/main.swift | 11 ++- .../GatewayEndpointStoreTests.swift | 11 +-- docs/mac/canvas.md | 18 +++++ src/gateway/server.test.ts | 81 +++++++++++++++++++ src/gateway/server.ts | 50 +++++++++++- 7 files changed, 171 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 1405d1165..e6cef82e9 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ apps/macos/.build/ bin/clawdis-mac apps/macos/.build-local/ apps/macos/.swiftpm/ +Core/ apps/ios/*.xcodeproj/ apps/ios/*.xcworkspace/ apps/ios/.swiftpm/ diff --git a/apps/macos/Sources/Clawdis/ControlChannel.swift b/apps/macos/Sources/Clawdis/ControlChannel.swift index f74afdc2b..69e56a2a6 100644 --- a/apps/macos/Sources/Clawdis/ControlChannel.swift +++ b/apps/macos/Sources/Clawdis/ControlChannel.swift @@ -111,8 +111,8 @@ final class ControlChannel: ObservableObject { } func lastHeartbeat() async throws -> ControlHeartbeatEvent? { - // Heartbeat removed in new protocol - nil + let data = try await self.request(method: "last-heartbeat") + return try JSONDecoder().decode(ControlHeartbeatEvent?.self, from: data) } func request( @@ -219,6 +219,13 @@ final class ControlChannel: ObservableObject { AgentEventStore.shared.append(agent) self.routeWorkActivity(from: agent) } + case let .event(evt) where evt.event == "heartbeat": + if let payload = evt.payload, + let heartbeat = try? GatewayPayloadDecoding.decode(payload, as: ControlHeartbeatEvent.self), + let data = try? JSONEncoder().encode(heartbeat) + { + NotificationCenter.default.post(name: .controlHeartbeat, object: data) + } case let .event(evt) where evt.event == "shutdown": self.state = .degraded("gateway shutdown") case .snapshot: diff --git a/apps/macos/Sources/ClawdisCLI/main.swift b/apps/macos/Sources/ClawdisCLI/main.swift index 5b46d899b..d7d8589e6 100644 --- a/apps/macos/Sources/ClawdisCLI/main.swift +++ b/apps/macos/Sources/ClawdisCLI/main.swift @@ -491,6 +491,7 @@ struct ClawdisCLI { shutdown(fd, SHUT_WR) var data = Data() + let decoder = JSONDecoder() var buffer = [UInt8](repeating: 0, count: 8192) let bufSize = buffer.count while true { @@ -501,14 +502,20 @@ struct ClawdisCLI { until: deadline, timeoutSeconds: timeoutSeconds) let n = buffer.withUnsafeMutableBytes { read(fd, $0.baseAddress!, bufSize) } - if n > 0 { data.append(buffer, count: n); continue } + if n > 0 { + data.append(buffer, count: n) + if let resp = try? decoder.decode(Response.self, from: data) { + return resp + } + continue + } if n == 0 { break } if n == -1, errno == EINTR { continue } if n == -1, errno == EAGAIN { continue } throw POSIXError(POSIXErrorCode(rawValue: errno) ?? .EIO) } guard !data.isEmpty else { throw POSIXError(.ECONNRESET) } - return try JSONDecoder().decode(Response.self, from: data) + return try decoder.decode(Response.self, from: data) } private static func rpcTimeoutSeconds(for request: Request) -> TimeInterval { diff --git a/apps/macos/Tests/ClawdisIPCTests/GatewayEndpointStoreTests.swift b/apps/macos/Tests/ClawdisIPCTests/GatewayEndpointStoreTests.swift index ce0132a9b..f6140c970 100644 --- a/apps/macos/Tests/ClawdisIPCTests/GatewayEndpointStoreTests.swift +++ b/apps/macos/Tests/ClawdisIPCTests/GatewayEndpointStoreTests.swift @@ -39,7 +39,7 @@ import Testing #expect(cfg.token == "t") } - @Test func remoteWithoutTunnelIsUnavailable() async throws { + @Test func remoteWithoutTunnelRecoversByEnsuringTunnel() async throws { let mode = ModeBox(.remote) let store = GatewayEndpointStore(deps: .init( mode: { mode.get() }, @@ -48,12 +48,9 @@ import Testing remotePortIfRunning: { nil }, ensureRemoteTunnel: { 18789 })) - do { - _ = try await store.requireConfig() - Issue.record("expected requireConfig to throw") - } catch { - #expect(error.localizedDescription.contains("no active control tunnel")) - } + let cfg = try await store.requireConfig() + #expect(cfg.url.absoluteString == "ws://127.0.0.1:18789") + #expect(cfg.token == nil) } @Test func ensureRemoteTunnelPublishesReadyState() async throws { diff --git a/docs/mac/canvas.md b/docs/mac/canvas.md index 85a0ef789..e9c558a89 100644 --- a/docs/mac/canvas.md +++ b/docs/mac/canvas.md @@ -82,6 +82,24 @@ This should be modeled after `WebChatManager`/`WebChatWindowController` but targ Related: - For “invoke the agent again from UI” flows, prefer the macOS deep link scheme (`clawdis://agent?...`) so *any* UI surface (Canvas, WebChat, native views) can trigger a new agent run. See `docs/clawdis-mac.md`. +## Triggering agent runs from Canvas (deep links) + +Canvas can trigger new agent runs via the macOS app deep-link scheme: +- `clawdis://agent?...` + +This is intentionally separate from `clawdis-canvas://…` (which is only for serving local Canvas files into the `WKWebView`). + +Suggested patterns: +- HTML: render links/buttons that navigate to `clawdis://agent?message=...`. +- JS: set `window.location.href = 'clawdis://agent?...'` for “run this now” actions. + +Implementation note (important): +- In `WKWebView`, intercept `clawdis://…` navigations in `WKNavigationDelegate` and forward them to the app, e.g. by calling `DeepLinkHandler.shared.handle(url:)` and returning `.cancel` for the navigation. + +Safety: +- `clawdis://agent` is disabled by default and must be enabled in **Clawdis → Settings → Debug** (“Allow URL scheme (agent)”). +- Without a `key` query param, the app will prompt for confirmation before invoking the agent. + ## Security / guardrails Recommended defaults: diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index e301364b7..1c04ea864 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -8,6 +8,7 @@ import { WebSocket } from "ws"; import { agentCommand } from "../commands/agent.js"; import { emitAgentEvent } from "../infra/agent-events.js"; import { GatewayLockError } from "../infra/gateway-lock.js"; +import { emitHeartbeatEvent } from "../infra/heartbeat-events.js"; import { PROTOCOL_VERSION } from "./protocol/index.js"; import { startGatewayServer } from "./server.js"; @@ -172,6 +173,86 @@ async function connectOk( } describe("gateway server", () => { + test("broadcasts heartbeat events and serves last-heartbeat", async () => { + type HeartbeatPayload = { + ts: number; + status: string; + to?: string; + preview?: string; + durationMs?: number; + hasMedia?: boolean; + reason?: string; + }; + type EventFrame = { + type: "event"; + event: string; + payload?: HeartbeatPayload | null; + }; + type ResFrame = { + type: "res"; + id: string; + ok: boolean; + payload?: unknown; + }; + + const { server, ws } = await startServerWithClient(); + ws.send( + JSON.stringify({ + type: "hello", + minProtocol: 1, + maxProtocol: 1, + client: { name: "test", version: "1", platform: "test", mode: "test" }, + caps: [], + }), + ); + await onceMessage(ws, (o) => o.type === "hello-ok"); + + const waitHeartbeat = onceMessage( + ws, + (o) => o.type === "event" && o.event === "heartbeat", + ); + emitHeartbeatEvent({ status: "sent", to: "+123", preview: "ping" }); + const evt = await waitHeartbeat; + expect(evt.payload?.status).toBe("sent"); + expect(typeof evt.payload?.ts).toBe("number"); + + ws.send( + JSON.stringify({ + type: "req", + id: "hb-last", + method: "last-heartbeat", + }), + ); + const last = await onceMessage( + ws, + (o) => o.type === "res" && o.id === "hb-last", + ); + expect(last.ok).toBe(true); + const lastPayload = last.payload as HeartbeatPayload | null | undefined; + expect(lastPayload?.status).toBe("sent"); + expect(lastPayload?.ts).toBe(evt.payload?.ts); + + ws.send( + JSON.stringify({ + type: "req", + id: "hb-toggle-off", + method: "set-heartbeats", + params: { enabled: false }, + }), + ); + const toggle = await onceMessage( + ws, + (o) => o.type === "res" && o.id === "hb-toggle-off", + ); + expect(toggle.ok).toBe(true); + expect((toggle.payload as { enabled?: boolean } | undefined)?.enabled).toBe( + false, + ); + + ws.close(); + await server.close(); + }); + test("agent falls back to allowFrom when lastTo is stale", async () => { testAllowFrom = ["+436769770569"]; const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 21f3453d0..e603b7ac0 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -22,6 +22,10 @@ import { import { isVerbose } from "../globals.js"; import { onAgentEvent } from "../infra/agent-events.js"; import { GatewayLockError } from "../infra/gateway-lock.js"; +import { + getLastHeartbeatEvent, + onHeartbeatEvent, +} from "../infra/heartbeat-events.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { listSystemPresence, @@ -35,6 +39,7 @@ import { defaultRuntime } from "../runtime.js"; import { monitorTelegramProvider } from "../telegram/monitor.js"; import { sendMessageTelegram } from "../telegram/send.js"; import { normalizeE164 } from "../utils.js"; +import { setHeartbeatsEnabled } from "../web/auto-reply.js"; import { sendMessageWhatsApp } from "../web/outbound.js"; import { ensureWebChatServerFromConfig } from "../webchat/server.js"; import { buildMessageWithAttachments } from "./chat-attachments.js"; @@ -65,6 +70,8 @@ type Client = { const METHODS = [ "health", "status", + "last-heartbeat", + "set-heartbeats", "system-presence", "system-event", "send", @@ -74,7 +81,15 @@ const METHODS = [ "chat.send", ]; -const EVENTS = ["agent", "chat", "presence", "tick", "shutdown", "health"]; +const EVENTS = [ + "agent", + "chat", + "presence", + "tick", + "shutdown", + "health", + "heartbeat", +]; export type GatewayServer = { close: () => Promise; @@ -494,6 +509,10 @@ export async function startGatewayServer( } }); + const heartbeatUnsub = onHeartbeatEvent((evt) => { + broadcast("heartbeat", evt, { dropIfSlow: true }); + }); + wss.on("connection", (socket) => { let client: Client | null = null; let closed = false; @@ -974,6 +993,28 @@ export async function startGatewayServer( respond(true, status, undefined); break; } + case "last-heartbeat": { + respond(true, getLastHeartbeatEvent(), undefined); + break; + } + case "set-heartbeats": { + const params = (req.params ?? {}) as Record; + const enabled = params.enabled; + if (typeof enabled !== "boolean") { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + "invalid set-heartbeats params: enabled (boolean) required", + ), + ); + break; + } + setHeartbeatsEnabled(enabled); + respond(true, { ok: true, enabled }, undefined); + break; + } case "system-presence": { const presence = listSystemPresence(); respond(true, presence, undefined); @@ -1399,6 +1440,13 @@ export async function startGatewayServer( /* ignore */ } } + if (heartbeatUnsub) { + try { + heartbeatUnsub(); + } catch { + /* ignore */ + } + } chatRunSessions.clear(); chatRunBuffers.clear(); for (const c of clients) {