diff --git a/CHANGELOG.md b/CHANGELOG.md index 98ee02e4c..938e39cc6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - macOS: add node bridge heartbeat pings to detect half-open sockets and reconnect cleanly. (#572) — thanks @ngutman +- Node bridge: harden keepalive + heartbeat handling (TCP keepalive, better disconnects, and keepalive config tests). (#577) — thanks @steipete - CLI: add `sandbox list` and `sandbox recreate` commands for managing Docker sandbox containers after image/config updates. (#563) — thanks @pasogott - Providers: add Microsoft Teams provider with polling, attachments, and CLI send support. (#404) — thanks @onutc - Commands: accept /models as an alias for /model. diff --git a/apps/macos/Sources/Clawdbot/NodeMode/MacNodeBridgeSession.swift b/apps/macos/Sources/Clawdbot/NodeMode/MacNodeBridgeSession.swift index 7c8f5ec7e..c33aa12a3 100644 --- a/apps/macos/Sources/Clawdbot/NodeMode/MacNodeBridgeSession.swift +++ b/apps/macos/Sources/Clawdbot/NodeMode/MacNodeBridgeSession.swift @@ -1,6 +1,7 @@ import ClawdbotKit import Foundation import Network +import OSLog actor MacNodeBridgeSession { private struct TimeoutError: LocalizedError { @@ -15,8 +16,10 @@ actor MacNodeBridgeSession { case failed(message: String) } + private let logger = Logger(subsystem: "com.clawdbot", category: "node.bridge-session") private let encoder = JSONEncoder() private let decoder = JSONDecoder() + private let clock = ContinuousClock() private var connection: NWConnection? private var queue: DispatchQueue? @@ -24,8 +27,7 @@ actor MacNodeBridgeSession { private var pendingRPC: [String: CheckedContinuation] = [:] private var serverEventSubscribers: [UUID: AsyncStream.Continuation] = [:] private var pingTask: Task? - private var lastPongAt: Date? - private var lastPingId: String? + private var lastPongAt: ContinuousClock.Instant? private(set) var state: State = .idle @@ -41,6 +43,12 @@ actor MacNodeBridgeSession { let params = NWParameters.tcp params.includePeerToPeer = true + let tcpOptions = NWProtocolTCP.Options() + tcpOptions.enableKeepalive = true + tcpOptions.keepaliveIdle = 30 + tcpOptions.keepaliveInterval = 15 + tcpOptions.keepaliveCount = 3 + params.defaultProtocolStack.transportProtocol = tcpOptions let connection = NWConnection(to: endpoint, using: params) let queue = DispatchQueue(label: "com.clawdbot.macos.bridge-session") self.connection = connection @@ -50,6 +58,10 @@ actor MacNodeBridgeSession { connection.start(queue: queue) try await Self.waitForReady(stateStream, timeoutSeconds: 6) + connection.stateUpdateHandler = { [weak self] state in + guard let self else { return } + Task { await self.handleConnectionState(state) } + } try await AsyncTimeout.withTimeout( seconds: 6, @@ -193,7 +205,6 @@ actor MacNodeBridgeSession { self.pingTask?.cancel() self.pingTask = nil self.lastPongAt = nil - self.lastPingId = nil self.connection?.cancel() self.connection = nil @@ -300,7 +311,7 @@ actor MacNodeBridgeSession { private func startPingLoop() { self.pingTask?.cancel() - self.lastPongAt = Date() + self.lastPongAt = self.clock.now self.pingTask = Task { [weak self] in guard let self else { return } await self.runPingLoop() @@ -308,30 +319,29 @@ actor MacNodeBridgeSession { } private func runPingLoop() async { - let intervalSeconds = 15.0 - let timeoutSeconds = 45.0 + let interval: Duration = .seconds(15) + let timeout: Duration = .seconds(45) while !Task.isCancelled { - do { - try await Task.sleep(nanoseconds: UInt64(intervalSeconds * 1_000_000_000)) - } catch { - return - } + try? await Task.sleep(for: interval) guard self.connection != nil else { return } - if let last = self.lastPongAt, - Date().timeIntervalSince(last) > timeoutSeconds - { - await self.disconnect() - return + if let last = self.lastPongAt { + let now = self.clock.now + if now > last.advanced(by: timeout) { + let age = last.duration(to: now) + self.logger.warning("Node bridge heartbeat timed out; disconnecting (age: \(String(describing: age), privacy: .public)).") + await self.disconnect() + return + } } let id = UUID().uuidString - self.lastPingId = id do { try await self.send(BridgePing(type: "ping", id: id)) } catch { + self.logger.warning("Node bridge ping send failed; disconnecting (error: \(String(describing: error), privacy: .public)).") await self.disconnect() return } @@ -340,7 +350,20 @@ actor MacNodeBridgeSession { private func notePong(_ pong: BridgePong) { _ = pong - self.lastPongAt = Date() + self.lastPongAt = self.clock.now + } + + private func handleConnectionState(_ state: NWConnection.State) async { + switch state { + case let .failed(error): + self.logger.warning("Node bridge connection failed; disconnecting (error: \(String(describing: error), privacy: .public)).") + await self.disconnect() + case .cancelled: + self.logger.warning("Node bridge connection cancelled; disconnecting.") + await self.disconnect() + default: + break + } } private static func makeStateStream( diff --git a/src/agents/sandbox-agent-config.test.ts b/src/agents/sandbox-agent-config.test.ts index ef8401198..fb2ae76ea 100644 --- a/src/agents/sandbox-agent-config.test.ts +++ b/src/agents/sandbox-agent-config.test.ts @@ -52,11 +52,11 @@ describe("Agent-specific sandbox config", () => { spawnCalls.length = 0; }); - it( - "should use global sandbox config when no agent-specific config exists", - { timeout: 15_000 }, - async () => { - const { resolveSandboxContext } = await import("./sandbox.js"); + it( + "should use global sandbox config when no agent-specific config exists", + { timeout: 15_000 }, + async () => { + const { resolveSandboxContext } = await import("./sandbox.js"); const cfg: ClawdbotConfig = { agents: { @@ -75,19 +75,19 @@ describe("Agent-specific sandbox config", () => { }, }; - const context = await resolveSandboxContext({ - config: cfg, - sessionKey: "agent:main:main", - workspaceDir: "/tmp/test", - }); + const context = await resolveSandboxContext({ + config: cfg, + sessionKey: "agent:main:main", + workspaceDir: "/tmp/test", + }); - expect(context).toBeDefined(); - expect(context?.enabled).toBe(true); - }, - ); + expect(context).toBeDefined(); + expect(context?.enabled).toBe(true); + }, + ); - it("should allow agent-specific docker setupCommand overrides", async () => { - const { resolveSandboxContext } = await import("./sandbox.js"); + it("should allow agent-specific docker setupCommand overrides", async () => { + const { resolveSandboxContext } = await import("./sandbox.js"); const cfg: ClawdbotConfig = { agents: { diff --git a/src/commands/doctor.test.ts b/src/commands/doctor.test.ts index e6ef9905d..ba61c0bcd 100644 --- a/src/commands/doctor.test.ts +++ b/src/commands/doctor.test.ts @@ -276,28 +276,28 @@ describe("doctor", () => { exit: vi.fn(), }; - migrateLegacyConfig.mockReturnValue({ - config: { whatsapp: { allowFrom: ["+15555550123"] } }, - changes: ["Moved routing.allowFrom → whatsapp.allowFrom."], - }); + migrateLegacyConfig.mockReturnValue({ + config: { whatsapp: { allowFrom: ["+15555550123"] } }, + changes: ["Moved routing.allowFrom → whatsapp.allowFrom."], + }); - await doctorCommand(runtime, { nonInteractive: true }); + await doctorCommand(runtime, { nonInteractive: true }); - expect(writeConfigFile).toHaveBeenCalledTimes(1); - const written = writeConfigFile.mock.calls[0]?.[0] as Record< - string, - unknown - >; - expect((written.whatsapp as Record)?.allowFrom).toEqual([ - "+15555550123", - ]); - expect(written.routing).toBeUndefined(); - }, - ); + expect(writeConfigFile).toHaveBeenCalledTimes(1); + const written = writeConfigFile.mock.calls[0]?.[0] as Record< + string, + unknown + >; + expect((written.whatsapp as Record)?.allowFrom).toEqual([ + "+15555550123", + ]); + expect(written.routing).toBeUndefined(); + }, + ); - it("migrates legacy Clawdis services", async () => { - readConfigFileSnapshot.mockResolvedValue({ - path: "/tmp/clawdbot.json", + it("migrates legacy Clawdis services", async () => { + readConfigFileSnapshot.mockResolvedValue({ + path: "/tmp/clawdbot.json", exists: true, raw: "{}", parsed: {}, diff --git a/src/infra/bridge/server.test.ts b/src/infra/bridge/server.test.ts index 7fbb29831..c60de4ca5 100644 --- a/src/infra/bridge/server.test.ts +++ b/src/infra/bridge/server.test.ts @@ -3,10 +3,10 @@ import net from "node:net"; import os from "node:os"; import path from "node:path"; -import { afterAll, beforeAll, describe, expect, it } from "vitest"; +import { afterAll, beforeAll, describe, expect, it, vi } from "vitest"; import { approveNodePairing, listNodePairing } from "../node-pairing.js"; -import { startNodeBridgeServer } from "./server.js"; +import { configureNodeBridgeSocket, startNodeBridgeServer } from "./server.js"; function createLineReader(socket: net.Socket) { let buffer = ""; @@ -70,6 +70,16 @@ describe("node bridge server", () => { delete process.env.CLAWDBOT_ENABLE_BRIDGE_IN_TESTS; }); + it("enables keepalive on sockets", () => { + const socket = { + setNoDelay: vi.fn(), + setKeepAlive: vi.fn(), + }; + configureNodeBridgeSocket(socket); + expect(socket.setNoDelay).toHaveBeenCalledWith(true); + expect(socket.setKeepAlive).toHaveBeenCalledWith(true, 15_000); + }); + it("rejects hello when not paired", async () => { const server = await startNodeBridgeServer({ host: "127.0.0.1", diff --git a/src/infra/bridge/server.ts b/src/infra/bridge/server.ts index bff99abec..10c5e7db9 100644 --- a/src/infra/bridge/server.ts +++ b/src/infra/bridge/server.ts @@ -160,6 +160,14 @@ function isTestEnv() { return process.env.NODE_ENV === "test" || Boolean(process.env.VITEST); } +export function configureNodeBridgeSocket(socket: { + setNoDelay: (noDelay?: boolean) => void; + setKeepAlive: (enable?: boolean, initialDelay?: number) => void; +}) { + socket.setNoDelay(true); + socket.setKeepAlive(true, 15_000); +} + function encodeLine(frame: AnyBridgeFrame) { return `${JSON.stringify(frame)}\n`; } @@ -228,7 +236,7 @@ export async function startNodeBridgeServer( const loopbackHost = "127.0.0.1"; const onConnection = (socket: net.Socket) => { - socket.setNoDelay(true); + configureNodeBridgeSocket(socket); let buffer = ""; let isAuthenticated = false;