fix: harden mac bridge disconnect handling (#676) (thanks @ngutman)

This commit is contained in:
Peter Steinberger
2026-01-10 22:25:05 +01:00
parent 55d2608808
commit 66bc003126
3 changed files with 43 additions and 8 deletions

View File

@@ -12,9 +12,8 @@
- Gateway: add OpenAI-compatible `/v1/chat/completions` HTTP endpoint (auth, SSE streaming, per-agent routing). (#680) — thanks @steipete. - Gateway: add OpenAI-compatible `/v1/chat/completions` HTTP endpoint (auth, SSE streaming, per-agent routing). (#680) — thanks @steipete.
### Fixes ### Fixes
- Block Streaming: enable for all providers, not just Telegram. (#684) — thanks @rubyrunsstuff. - macOS: stabilize bridge tunnels, guard invoke senders on disconnect, and drain stdout/stderr to avoid deadlocks. (#676) — thanks @ngutman.
- Agents/System: clarify sandboxed runtime in system prompt and surface elevated availability when sandboxed. - Agents/System: clarify sandboxed runtime in system prompt and surface elevated availability when sandboxed.
- Agents/System: add reasoning visibility hint + /reasoning and /status guidance in system prompt.
- Auto-reply: prefer `RawBody` for command/directive parsing (WhatsApp + Discord) and prevent fallback runs from clobbering concurrent session updates. (#643) — thanks @mcinteerj. - Auto-reply: prefer `RawBody` for command/directive parsing (WhatsApp + Discord) and prevent fallback runs from clobbering concurrent session updates. (#643) — thanks @mcinteerj.
- WhatsApp: fix group reactions by preserving message IDs and sender JIDs in history; normalize participant phone numbers to JIDs in outbound reactions. (#640) — thanks @mcinteerj. - WhatsApp: fix group reactions by preserving message IDs and sender JIDs in history; normalize participant phone numbers to JIDs in outbound reactions. (#640) — thanks @mcinteerj.
- WhatsApp: expose group participant IDs to the model so reactions can target the right sender. - WhatsApp: expose group participant IDs to the model so reactions can target the right sender.

View File

@@ -27,6 +27,7 @@ actor MacNodeBridgeSession {
private var buffer = Data() private var buffer = Data()
private var pendingRPC: [String: CheckedContinuation<BridgeRPCResponse, Error>] = [:] private var pendingRPC: [String: CheckedContinuation<BridgeRPCResponse, Error>] = [:]
private var serverEventSubscribers: [UUID: AsyncStream<BridgeEventFrame>.Continuation] = [:] private var serverEventSubscribers: [UUID: AsyncStream<BridgeEventFrame>.Continuation] = [:]
private var invokeTasks: [UUID: Task<Void, Never>] = [:]
private var pingTask: Task<Void, Never>? private var pingTask: Task<Void, Never>?
private var lastPongAt: ContinuousClock.Instant? private var lastPongAt: ContinuousClock.Instant?
@@ -142,15 +143,13 @@ actor MacNodeBridgeSession {
case "invoke": case "invoke":
let req = try self.decoder.decode(BridgeInvokeRequest.self, from: nextData) let req = try self.decoder.decode(BridgeInvokeRequest.self, from: nextData)
Task.detached { [weak self] in let taskID = UUID()
let task = Task { [weak self] in
let res = await onInvoke(req) let res = await onInvoke(req)
guard let self else { return } guard let self else { return }
do { await self.sendInvokeResponse(res, taskID: taskID)
try await self.send(res)
} catch {
await self.logInvokeSendFailure(error)
}
} }
self.invokeTasks[taskID] = task
default: default:
continue continue
@@ -226,6 +225,7 @@ actor MacNodeBridgeSession {
self.pingTask = nil self.pingTask = nil
self.lastPongAt = nil self.lastPongAt = nil
self.disconnectHandler = nil self.disconnectHandler = nil
self.cancelInvokeTasks()
self.connection?.cancel() self.connection?.cancel()
self.connection = nil self.connection = nil
@@ -413,6 +413,23 @@ actor MacNodeBridgeSession {
"node bridge invoke response send failed: \(error.localizedDescription, privacy: .public)") "node bridge invoke response send failed: \(error.localizedDescription, privacy: .public)")
} }
private func sendInvokeResponse(_ response: BridgeInvokeResponse, taskID: UUID) async {
defer { self.invokeTasks[taskID] = nil }
if Task.isCancelled { return }
do {
try await self.send(response)
} catch {
await self.logInvokeSendFailure(error)
}
}
private func cancelInvokeTasks() {
for task in self.invokeTasks.values {
task.cancel()
}
self.invokeTasks.removeAll()
}
private static func makeStateStream( private static func makeStateStream(
for connection: NWConnection) -> AsyncStream<NWConnection.State> for connection: NWConnection) -> AsyncStream<NWConnection.State>
{ {

View File

@@ -57,6 +57,25 @@ struct LowCoverageHelperTests {
#expect(result.timedOut == true) #expect(result.timedOut == true)
} }
@Test func shellExecutorDrainsStdoutAndStderr() async {
let script = """
i=0
while [ $i -lt 2000 ]; do
echo "stdout-$i"
echo "stderr-$i" 1>&2
i=$((i+1))
done
"""
let result = await ShellExecutor.runDetailed(
command: ["/bin/sh", "-c", script],
cwd: nil,
env: nil,
timeout: 2)
#expect(result.success == true)
#expect(result.stdout.contains("stdout-1999"))
#expect(result.stderr.contains("stderr-1999"))
}
@Test func pairedNodesStorePersists() async throws { @Test func pairedNodesStorePersists() async throws {
let dir = FileManager.default.temporaryDirectory let dir = FileManager.default.temporaryDirectory
.appendingPathComponent("paired-\(UUID().uuidString)", isDirectory: true) .appendingPathComponent("paired-\(UUID().uuidString)", isDirectory: true)