Merge pull request #577 from clawdbot/fix/node-bridge-keepalive

fix: harden node bridge keepalive
This commit is contained in:
Peter Steinberger
2026-01-09 14:46:59 +00:00
committed by GitHub
6 changed files with 98 additions and 56 deletions

View File

@@ -3,6 +3,7 @@
## Unreleased ## Unreleased
- macOS: add node bridge heartbeat pings to detect half-open sockets and reconnect cleanly. (#572) — thanks @ngutman - macOS: add node bridge heartbeat pings to detect half-open sockets and reconnect cleanly. (#572) — thanks @ngutman
- Node bridge: harden keepalive + heartbeat handling (TCP keepalive, better disconnects, and keepalive config tests). (#577) — thanks @steipete
- CLI: add `sandbox list` and `sandbox recreate` commands for managing Docker sandbox containers after image/config updates. (#563) — thanks @pasogott - CLI: add `sandbox list` and `sandbox recreate` commands for managing Docker sandbox containers after image/config updates. (#563) — thanks @pasogott
- Providers: add Microsoft Teams provider with polling, attachments, and CLI send support. (#404) — thanks @onutc - Providers: add Microsoft Teams provider with polling, attachments, and CLI send support. (#404) — thanks @onutc
- Commands: accept /models as an alias for /model. - Commands: accept /models as an alias for /model.

View File

@@ -1,6 +1,7 @@
import ClawdbotKit import ClawdbotKit
import Foundation import Foundation
import Network import Network
import OSLog
actor MacNodeBridgeSession { actor MacNodeBridgeSession {
private struct TimeoutError: LocalizedError { private struct TimeoutError: LocalizedError {
@@ -15,8 +16,10 @@ actor MacNodeBridgeSession {
case failed(message: String) case failed(message: String)
} }
private let logger = Logger(subsystem: "com.clawdbot", category: "node.bridge-session")
private let encoder = JSONEncoder() private let encoder = JSONEncoder()
private let decoder = JSONDecoder() private let decoder = JSONDecoder()
private let clock = ContinuousClock()
private var connection: NWConnection? private var connection: NWConnection?
private var queue: DispatchQueue? private var queue: DispatchQueue?
@@ -24,8 +27,7 @@ actor MacNodeBridgeSession {
private var pendingRPC: [String: CheckedContinuation<BridgeRPCResponse, Error>] = [:] private var pendingRPC: [String: CheckedContinuation<BridgeRPCResponse, Error>] = [:]
private var serverEventSubscribers: [UUID: AsyncStream<BridgeEventFrame>.Continuation] = [:] private var serverEventSubscribers: [UUID: AsyncStream<BridgeEventFrame>.Continuation] = [:]
private var pingTask: Task<Void, Never>? private var pingTask: Task<Void, Never>?
private var lastPongAt: Date? private var lastPongAt: ContinuousClock.Instant?
private var lastPingId: String?
private(set) var state: State = .idle private(set) var state: State = .idle
@@ -41,6 +43,12 @@ actor MacNodeBridgeSession {
let params = NWParameters.tcp let params = NWParameters.tcp
params.includePeerToPeer = true params.includePeerToPeer = true
let tcpOptions = NWProtocolTCP.Options()
tcpOptions.enableKeepalive = true
tcpOptions.keepaliveIdle = 30
tcpOptions.keepaliveInterval = 15
tcpOptions.keepaliveCount = 3
params.defaultProtocolStack.transportProtocol = tcpOptions
let connection = NWConnection(to: endpoint, using: params) let connection = NWConnection(to: endpoint, using: params)
let queue = DispatchQueue(label: "com.clawdbot.macos.bridge-session") let queue = DispatchQueue(label: "com.clawdbot.macos.bridge-session")
self.connection = connection self.connection = connection
@@ -50,6 +58,10 @@ actor MacNodeBridgeSession {
connection.start(queue: queue) connection.start(queue: queue)
try await Self.waitForReady(stateStream, timeoutSeconds: 6) try await Self.waitForReady(stateStream, timeoutSeconds: 6)
connection.stateUpdateHandler = { [weak self] state in
guard let self else { return }
Task { await self.handleConnectionState(state) }
}
try await AsyncTimeout.withTimeout( try await AsyncTimeout.withTimeout(
seconds: 6, seconds: 6,
@@ -193,7 +205,6 @@ actor MacNodeBridgeSession {
self.pingTask?.cancel() self.pingTask?.cancel()
self.pingTask = nil self.pingTask = nil
self.lastPongAt = nil self.lastPongAt = nil
self.lastPingId = nil
self.connection?.cancel() self.connection?.cancel()
self.connection = nil self.connection = nil
@@ -300,7 +311,7 @@ actor MacNodeBridgeSession {
private func startPingLoop() { private func startPingLoop() {
self.pingTask?.cancel() self.pingTask?.cancel()
self.lastPongAt = Date() self.lastPongAt = self.clock.now
self.pingTask = Task { [weak self] in self.pingTask = Task { [weak self] in
guard let self else { return } guard let self else { return }
await self.runPingLoop() await self.runPingLoop()
@@ -308,30 +319,29 @@ actor MacNodeBridgeSession {
} }
private func runPingLoop() async { private func runPingLoop() async {
let intervalSeconds = 15.0 let interval: Duration = .seconds(15)
let timeoutSeconds = 45.0 let timeout: Duration = .seconds(45)
while !Task.isCancelled { while !Task.isCancelled {
do { try? await Task.sleep(for: interval)
try await Task.sleep(nanoseconds: UInt64(intervalSeconds * 1_000_000_000))
} catch {
return
}
guard self.connection != nil else { return } guard self.connection != nil else { return }
if let last = self.lastPongAt, if let last = self.lastPongAt {
Date().timeIntervalSince(last) > timeoutSeconds let now = self.clock.now
{ if now > last.advanced(by: timeout) {
await self.disconnect() let age = last.duration(to: now)
return self.logger.warning("Node bridge heartbeat timed out; disconnecting (age: \(String(describing: age), privacy: .public)).")
await self.disconnect()
return
}
} }
let id = UUID().uuidString let id = UUID().uuidString
self.lastPingId = id
do { do {
try await self.send(BridgePing(type: "ping", id: id)) try await self.send(BridgePing(type: "ping", id: id))
} catch { } catch {
self.logger.warning("Node bridge ping send failed; disconnecting (error: \(String(describing: error), privacy: .public)).")
await self.disconnect() await self.disconnect()
return return
} }
@@ -340,7 +350,20 @@ actor MacNodeBridgeSession {
private func notePong(_ pong: BridgePong) { private func notePong(_ pong: BridgePong) {
_ = pong _ = pong
self.lastPongAt = Date() self.lastPongAt = self.clock.now
}
private func handleConnectionState(_ state: NWConnection.State) async {
switch state {
case let .failed(error):
self.logger.warning("Node bridge connection failed; disconnecting (error: \(String(describing: error), privacy: .public)).")
await self.disconnect()
case .cancelled:
self.logger.warning("Node bridge connection cancelled; disconnecting.")
await self.disconnect()
default:
break
}
} }
private static func makeStateStream( private static func makeStateStream(

View File

@@ -52,11 +52,11 @@ describe("Agent-specific sandbox config", () => {
spawnCalls.length = 0; spawnCalls.length = 0;
}); });
it( it(
"should use global sandbox config when no agent-specific config exists", "should use global sandbox config when no agent-specific config exists",
{ timeout: 15_000 }, { timeout: 15_000 },
async () => { async () => {
const { resolveSandboxContext } = await import("./sandbox.js"); const { resolveSandboxContext } = await import("./sandbox.js");
const cfg: ClawdbotConfig = { const cfg: ClawdbotConfig = {
agents: { agents: {
@@ -75,19 +75,19 @@ describe("Agent-specific sandbox config", () => {
}, },
}; };
const context = await resolveSandboxContext({ const context = await resolveSandboxContext({
config: cfg, config: cfg,
sessionKey: "agent:main:main", sessionKey: "agent:main:main",
workspaceDir: "/tmp/test", workspaceDir: "/tmp/test",
}); });
expect(context).toBeDefined(); expect(context).toBeDefined();
expect(context?.enabled).toBe(true); expect(context?.enabled).toBe(true);
}, },
); );
it("should allow agent-specific docker setupCommand overrides", async () => { it("should allow agent-specific docker setupCommand overrides", async () => {
const { resolveSandboxContext } = await import("./sandbox.js"); const { resolveSandboxContext } = await import("./sandbox.js");
const cfg: ClawdbotConfig = { const cfg: ClawdbotConfig = {
agents: { agents: {

View File

@@ -276,28 +276,28 @@ describe("doctor", () => {
exit: vi.fn(), exit: vi.fn(),
}; };
migrateLegacyConfig.mockReturnValue({ migrateLegacyConfig.mockReturnValue({
config: { whatsapp: { allowFrom: ["+15555550123"] } }, config: { whatsapp: { allowFrom: ["+15555550123"] } },
changes: ["Moved routing.allowFrom → whatsapp.allowFrom."], changes: ["Moved routing.allowFrom → whatsapp.allowFrom."],
}); });
await doctorCommand(runtime, { nonInteractive: true }); await doctorCommand(runtime, { nonInteractive: true });
expect(writeConfigFile).toHaveBeenCalledTimes(1); expect(writeConfigFile).toHaveBeenCalledTimes(1);
const written = writeConfigFile.mock.calls[0]?.[0] as Record< const written = writeConfigFile.mock.calls[0]?.[0] as Record<
string, string,
unknown unknown
>; >;
expect((written.whatsapp as Record<string, unknown>)?.allowFrom).toEqual([ expect((written.whatsapp as Record<string, unknown>)?.allowFrom).toEqual([
"+15555550123", "+15555550123",
]); ]);
expect(written.routing).toBeUndefined(); expect(written.routing).toBeUndefined();
}, },
); );
it("migrates legacy Clawdis services", async () => { it("migrates legacy Clawdis services", async () => {
readConfigFileSnapshot.mockResolvedValue({ readConfigFileSnapshot.mockResolvedValue({
path: "/tmp/clawdbot.json", path: "/tmp/clawdbot.json",
exists: true, exists: true,
raw: "{}", raw: "{}",
parsed: {}, parsed: {},

View File

@@ -3,10 +3,10 @@ import net from "node:net";
import os from "node:os"; import os from "node:os";
import path from "node:path"; import path from "node:path";
import { afterAll, beforeAll, describe, expect, it } from "vitest"; import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
import { approveNodePairing, listNodePairing } from "../node-pairing.js"; import { approveNodePairing, listNodePairing } from "../node-pairing.js";
import { startNodeBridgeServer } from "./server.js"; import { configureNodeBridgeSocket, startNodeBridgeServer } from "./server.js";
function createLineReader(socket: net.Socket) { function createLineReader(socket: net.Socket) {
let buffer = ""; let buffer = "";
@@ -70,6 +70,16 @@ describe("node bridge server", () => {
delete process.env.CLAWDBOT_ENABLE_BRIDGE_IN_TESTS; delete process.env.CLAWDBOT_ENABLE_BRIDGE_IN_TESTS;
}); });
it("enables keepalive on sockets", () => {
const socket = {
setNoDelay: vi.fn(),
setKeepAlive: vi.fn(),
};
configureNodeBridgeSocket(socket);
expect(socket.setNoDelay).toHaveBeenCalledWith(true);
expect(socket.setKeepAlive).toHaveBeenCalledWith(true, 15_000);
});
it("rejects hello when not paired", async () => { it("rejects hello when not paired", async () => {
const server = await startNodeBridgeServer({ const server = await startNodeBridgeServer({
host: "127.0.0.1", host: "127.0.0.1",

View File

@@ -160,6 +160,14 @@ function isTestEnv() {
return process.env.NODE_ENV === "test" || Boolean(process.env.VITEST); return process.env.NODE_ENV === "test" || Boolean(process.env.VITEST);
} }
export function configureNodeBridgeSocket(socket: {
setNoDelay: (noDelay?: boolean) => void;
setKeepAlive: (enable?: boolean, initialDelay?: number) => void;
}) {
socket.setNoDelay(true);
socket.setKeepAlive(true, 15_000);
}
function encodeLine(frame: AnyBridgeFrame) { function encodeLine(frame: AnyBridgeFrame) {
return `${JSON.stringify(frame)}\n`; return `${JSON.stringify(frame)}\n`;
} }
@@ -228,7 +236,7 @@ export async function startNodeBridgeServer(
const loopbackHost = "127.0.0.1"; const loopbackHost = "127.0.0.1";
const onConnection = (socket: net.Socket) => { const onConnection = (socket: net.Socket) => {
socket.setNoDelay(true); configureNodeBridgeSocket(socket);
let buffer = ""; let buffer = "";
let isAuthenticated = false; let isAuthenticated = false;