Merge pull request #572 from ngutman/fix/mac-node-bridge-ping
fix(macos): add node bridge ping loop (AI-assisted)
This commit is contained in:
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
|
||||||
|
- macOS: add node bridge heartbeat pings to detect half-open sockets and reconnect cleanly. (#572) — thanks @ngutman
|
||||||
- CLI: add `sandbox list` and `sandbox recreate` commands for managing Docker sandbox containers after image/config updates. (#563) — thanks @pasogott
|
- CLI: add `sandbox list` and `sandbox recreate` commands for managing Docker sandbox containers after image/config updates. (#563) — thanks @pasogott
|
||||||
- Providers: add Microsoft Teams provider with polling, attachments, and CLI send support. (#404) — thanks @onutc
|
- Providers: add Microsoft Teams provider with polling, attachments, and CLI send support. (#404) — thanks @onutc
|
||||||
- Commands: accept /models as an alias for /model.
|
- Commands: accept /models as an alias for /model.
|
||||||
|
|||||||
@@ -23,6 +23,9 @@ actor MacNodeBridgeSession {
|
|||||||
private var buffer = Data()
|
private var buffer = Data()
|
||||||
private var pendingRPC: [String: CheckedContinuation<BridgeRPCResponse, Error>] = [:]
|
private var pendingRPC: [String: CheckedContinuation<BridgeRPCResponse, Error>] = [:]
|
||||||
private var serverEventSubscribers: [UUID: AsyncStream<BridgeEventFrame>.Continuation] = [:]
|
private var serverEventSubscribers: [UUID: AsyncStream<BridgeEventFrame>.Continuation] = [:]
|
||||||
|
private var pingTask: Task<Void, Never>?
|
||||||
|
private var lastPongAt: Date?
|
||||||
|
private var lastPingId: String?
|
||||||
|
|
||||||
private(set) var state: State = .idle
|
private(set) var state: State = .idle
|
||||||
|
|
||||||
@@ -77,6 +80,7 @@ actor MacNodeBridgeSession {
|
|||||||
if base.type == "hello-ok" {
|
if base.type == "hello-ok" {
|
||||||
let ok = try self.decoder.decode(BridgeHelloOk.self, from: data)
|
let ok = try self.decoder.decode(BridgeHelloOk.self, from: data)
|
||||||
self.state = .connected(serverName: ok.serverName)
|
self.state = .connected(serverName: ok.serverName)
|
||||||
|
self.startPingLoop()
|
||||||
await onConnected?(ok.serverName)
|
await onConnected?(ok.serverName)
|
||||||
} else if base.type == "error" {
|
} else if base.type == "error" {
|
||||||
let err = try self.decoder.decode(BridgeErrorFrame.self, from: data)
|
let err = try self.decoder.decode(BridgeErrorFrame.self, from: data)
|
||||||
@@ -113,6 +117,10 @@ actor MacNodeBridgeSession {
|
|||||||
let ping = try self.decoder.decode(BridgePing.self, from: nextData)
|
let ping = try self.decoder.decode(BridgePing.self, from: nextData)
|
||||||
try await self.send(BridgePong(type: "pong", id: ping.id))
|
try await self.send(BridgePong(type: "pong", id: ping.id))
|
||||||
|
|
||||||
|
case "pong":
|
||||||
|
let pong = try self.decoder.decode(BridgePong.self, from: nextData)
|
||||||
|
self.notePong(pong)
|
||||||
|
|
||||||
case "invoke":
|
case "invoke":
|
||||||
let req = try self.decoder.decode(BridgeInvokeRequest.self, from: nextData)
|
let req = try self.decoder.decode(BridgeInvokeRequest.self, from: nextData)
|
||||||
let res = await onInvoke(req)
|
let res = await onInvoke(req)
|
||||||
@@ -182,6 +190,11 @@ actor MacNodeBridgeSession {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func disconnect() async {
|
func disconnect() async {
|
||||||
|
self.pingTask?.cancel()
|
||||||
|
self.pingTask = nil
|
||||||
|
self.lastPongAt = nil
|
||||||
|
self.lastPingId = nil
|
||||||
|
|
||||||
self.connection?.cancel()
|
self.connection?.cancel()
|
||||||
self.connection = nil
|
self.connection = nil
|
||||||
self.queue = nil
|
self.queue = nil
|
||||||
@@ -239,12 +252,17 @@ actor MacNodeBridgeSession {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private func send(_ obj: some Encodable) async throws {
|
private func send(_ obj: some Encodable) async throws {
|
||||||
|
guard let connection = self.connection else {
|
||||||
|
throw NSError(domain: "Bridge", code: 15, userInfo: [
|
||||||
|
NSLocalizedDescriptionKey: "not connected",
|
||||||
|
])
|
||||||
|
}
|
||||||
let data = try self.encoder.encode(obj)
|
let data = try self.encoder.encode(obj)
|
||||||
var line = Data()
|
var line = Data()
|
||||||
line.append(data)
|
line.append(data)
|
||||||
line.append(0x0A)
|
line.append(0x0A)
|
||||||
try await withCheckedThrowingContinuation(isolation: self) { (cont: CheckedContinuation<Void, Error>) in
|
try await withCheckedThrowingContinuation(isolation: self) { (cont: CheckedContinuation<Void, Error>) in
|
||||||
self.connection?.send(content: line, completion: .contentProcessed { err in
|
connection.send(content: line, completion: .contentProcessed { err in
|
||||||
if let err { cont.resume(throwing: err) } else { cont.resume(returning: ()) }
|
if let err { cont.resume(throwing: err) } else { cont.resume(returning: ()) }
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -280,6 +298,51 @@ actor MacNodeBridgeSession {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func startPingLoop() {
|
||||||
|
self.pingTask?.cancel()
|
||||||
|
self.lastPongAt = Date()
|
||||||
|
self.pingTask = Task { [weak self] in
|
||||||
|
guard let self else { return }
|
||||||
|
await self.runPingLoop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func runPingLoop() async {
|
||||||
|
let intervalSeconds = 15.0
|
||||||
|
let timeoutSeconds = 45.0
|
||||||
|
|
||||||
|
while !Task.isCancelled {
|
||||||
|
do {
|
||||||
|
try await Task.sleep(nanoseconds: UInt64(intervalSeconds * 1_000_000_000))
|
||||||
|
} catch {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
guard self.connection != nil else { return }
|
||||||
|
|
||||||
|
if let last = self.lastPongAt,
|
||||||
|
Date().timeIntervalSince(last) > timeoutSeconds
|
||||||
|
{
|
||||||
|
await self.disconnect()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
let id = UUID().uuidString
|
||||||
|
self.lastPingId = id
|
||||||
|
do {
|
||||||
|
try await self.send(BridgePing(type: "ping", id: id))
|
||||||
|
} catch {
|
||||||
|
await self.disconnect()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func notePong(_ pong: BridgePong) {
|
||||||
|
_ = pong
|
||||||
|
self.lastPongAt = Date()
|
||||||
|
}
|
||||||
|
|
||||||
private static func makeStateStream(
|
private static func makeStateStream(
|
||||||
for connection: NWConnection) -> AsyncStream<NWConnection.State>
|
for connection: NWConnection) -> AsyncStream<NWConnection.State>
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -0,0 +1,19 @@
|
|||||||
|
import Testing
|
||||||
|
@testable import Clawdbot
|
||||||
|
|
||||||
|
@Suite
|
||||||
|
struct MacNodeBridgeSessionTests {
|
||||||
|
@Test func sendEventThrowsWhenNotConnected() async {
|
||||||
|
let session = MacNodeBridgeSession()
|
||||||
|
|
||||||
|
do {
|
||||||
|
try await session.sendEvent(event: "test", payloadJSON: "{}")
|
||||||
|
Issue.record("Expected sendEvent to throw when disconnected")
|
||||||
|
} catch {
|
||||||
|
let ns = error as NSError
|
||||||
|
#expect(ns.domain == "Bridge")
|
||||||
|
#expect(ns.code == 15)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@@ -179,14 +179,23 @@ async function ensureDevGatewayConfig(opts: { reset?: boolean }) {
|
|||||||
mode: "local",
|
mode: "local",
|
||||||
bind: "loopback",
|
bind: "loopback",
|
||||||
},
|
},
|
||||||
agent: {
|
agents: {
|
||||||
workspace,
|
defaults: {
|
||||||
skipBootstrap: true,
|
workspace,
|
||||||
},
|
skipBootstrap: true,
|
||||||
identity: {
|
},
|
||||||
name: DEV_IDENTITY_NAME,
|
list: [
|
||||||
theme: DEV_IDENTITY_THEME,
|
{
|
||||||
emoji: DEV_IDENTITY_EMOJI,
|
id: "dev",
|
||||||
|
default: true,
|
||||||
|
workspace,
|
||||||
|
identity: {
|
||||||
|
name: DEV_IDENTITY_NAME,
|
||||||
|
theme: DEV_IDENTITY_THEME,
|
||||||
|
emoji: DEV_IDENTITY_EMOJI,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
await ensureDevWorkspace(workspace);
|
await ensureDevWorkspace(workspace);
|
||||||
|
|||||||
@@ -278,7 +278,7 @@ describe("doctor", () => {
|
|||||||
changes: ["Moved routing.allowFrom → whatsapp.allowFrom."],
|
changes: ["Moved routing.allowFrom → whatsapp.allowFrom."],
|
||||||
});
|
});
|
||||||
|
|
||||||
await doctorCommand(runtime);
|
await doctorCommand(runtime, { nonInteractive: true });
|
||||||
|
|
||||||
expect(writeConfigFile).toHaveBeenCalledTimes(1);
|
expect(writeConfigFile).toHaveBeenCalledTimes(1);
|
||||||
const written = writeConfigFile.mock.calls[0]?.[0] as Record<
|
const written = writeConfigFile.mock.calls[0]?.[0] as Record<
|
||||||
|
|||||||
@@ -114,10 +114,13 @@ export async function doctorCommand(
|
|||||||
.join("\n"),
|
.join("\n"),
|
||||||
"Legacy config keys detected",
|
"Legacy config keys detected",
|
||||||
);
|
);
|
||||||
const migrate = await prompter.confirm({
|
const migrate =
|
||||||
message: "Migrate legacy config entries now?",
|
options.nonInteractive === true
|
||||||
initialValue: true,
|
? true
|
||||||
});
|
: await prompter.confirm({
|
||||||
|
message: "Migrate legacy config entries now?",
|
||||||
|
initialValue: true,
|
||||||
|
});
|
||||||
if (migrate) {
|
if (migrate) {
|
||||||
// Legacy migration (2026-01-02, commit: 16420e5b) — normalize per-provider allowlists; move WhatsApp gating into whatsapp.allowFrom.
|
// Legacy migration (2026-01-02, commit: 16420e5b) — normalize per-provider allowlists; move WhatsApp gating into whatsapp.allowFrom.
|
||||||
const { config: migrated, changes } = migrateLegacyConfig(
|
const { config: migrated, changes } = migrateLegacyConfig(
|
||||||
|
|||||||
@@ -170,7 +170,7 @@ export function applyMinimaxHostedProviderConfig(
|
|||||||
cfg: ClawdbotConfig,
|
cfg: ClawdbotConfig,
|
||||||
params?: { baseUrl?: string },
|
params?: { baseUrl?: string },
|
||||||
): ClawdbotConfig {
|
): ClawdbotConfig {
|
||||||
const models = { ...cfg.agent?.models };
|
const models = { ...cfg.agents?.defaults?.models };
|
||||||
models[MINIMAX_HOSTED_MODEL_REF] = {
|
models[MINIMAX_HOSTED_MODEL_REF] = {
|
||||||
...models[MINIMAX_HOSTED_MODEL_REF],
|
...models[MINIMAX_HOSTED_MODEL_REF],
|
||||||
alias: models[MINIMAX_HOSTED_MODEL_REF]?.alias ?? "Minimax",
|
alias: models[MINIMAX_HOSTED_MODEL_REF]?.alias ?? "Minimax",
|
||||||
@@ -206,9 +206,12 @@ export function applyMinimaxHostedProviderConfig(
|
|||||||
|
|
||||||
return {
|
return {
|
||||||
...cfg,
|
...cfg,
|
||||||
agent: {
|
agents: {
|
||||||
...cfg.agent,
|
...cfg.agents,
|
||||||
models,
|
defaults: {
|
||||||
|
...cfg.agents?.defaults,
|
||||||
|
models,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
models: {
|
models: {
|
||||||
mode: cfg.models?.mode ?? "merge",
|
mode: cfg.models?.mode ?? "merge",
|
||||||
@@ -248,17 +251,14 @@ export function applyMinimaxHostedConfig(
|
|||||||
const next = applyMinimaxHostedProviderConfig(cfg, params);
|
const next = applyMinimaxHostedProviderConfig(cfg, params);
|
||||||
return {
|
return {
|
||||||
...next,
|
...next,
|
||||||
agent: {
|
agents: {
|
||||||
...next.agent,
|
...next.agents,
|
||||||
model: {
|
defaults: {
|
||||||
...(next.agent?.model &&
|
...next.agents?.defaults,
|
||||||
"fallbacks" in (next.agent.model as Record<string, unknown>)
|
model: {
|
||||||
? {
|
...(next.agents?.defaults?.model ?? {}),
|
||||||
fallbacks: (next.agent.model as { fallbacks?: string[] })
|
primary: MINIMAX_HOSTED_MODEL_REF,
|
||||||
.fallbacks,
|
},
|
||||||
}
|
|
||||||
: undefined),
|
|
||||||
primary: MINIMAX_HOSTED_MODEL_REF,
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1211,6 +1211,7 @@ export type AgentDefaultsConfig = {
|
|||||||
| "slack"
|
| "slack"
|
||||||
| "signal"
|
| "signal"
|
||||||
| "imessage"
|
| "imessage"
|
||||||
|
| "msteams"
|
||||||
| "none";
|
| "none";
|
||||||
/** Optional delivery override (E.164 for WhatsApp, chat id for Telegram). */
|
/** Optional delivery override (E.164 for WhatsApp, chat id for Telegram). */
|
||||||
to?: string;
|
to?: string;
|
||||||
|
|||||||
@@ -56,8 +56,9 @@ export async function monitorMSTeamsProvider(
|
|||||||
const textLimit = resolveTextChunkLimit(cfg, "msteams");
|
const textLimit = resolveTextChunkLimit(cfg, "msteams");
|
||||||
const MB = 1024 * 1024;
|
const MB = 1024 * 1024;
|
||||||
const mediaMaxBytes =
|
const mediaMaxBytes =
|
||||||
typeof cfg.agent?.mediaMaxMb === "number" && cfg.agent.mediaMaxMb > 0
|
typeof cfg.agents?.defaults?.mediaMaxMb === "number" &&
|
||||||
? Math.floor(cfg.agent.mediaMaxMb * MB)
|
cfg.agents.defaults.mediaMaxMb > 0
|
||||||
|
? Math.floor(cfg.agents.defaults.mediaMaxMb * MB)
|
||||||
: 8 * MB;
|
: 8 * MB;
|
||||||
const conversationStore =
|
const conversationStore =
|
||||||
opts.conversationStore ?? createMSTeamsConversationStoreFs();
|
opts.conversationStore ?? createMSTeamsConversationStoreFs();
|
||||||
|
|||||||
Reference in New Issue
Block a user