From cd84c5ad0886c1c335cec3402640f25f5c0da511 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 12 Dec 2025 16:52:02 +0000 Subject: [PATCH] fix(macos): prevent gateway request double-resume --- .../Sources/Clawdis/GatewayChannel.swift | 5 +- .../GatewayChannelRequestTests.swift | 111 ++++++++++++++++++ 2 files changed, 114 insertions(+), 2 deletions(-) create mode 100644 apps/macos/Tests/ClawdisIPCTests/GatewayChannelRequestTests.swift diff --git a/apps/macos/Sources/Clawdis/GatewayChannel.swift b/apps/macos/Sources/Clawdis/GatewayChannel.swift index ee7c6bd79..6ef99c4c8 100644 --- a/apps/macos/Sources/Clawdis/GatewayChannel.swift +++ b/apps/macos/Sources/Clawdis/GatewayChannel.swift @@ -355,7 +355,8 @@ actor GatewayChannelActor { do { try await self.task?.send(.data(data)) } catch { - self.pending.removeValue(forKey: id) + let wrapped = self.wrap(error, context: "gateway send \(method)") + let waiter = self.pending.removeValue(forKey: id) // Treat send failures as a broken socket: mark disconnected and trigger reconnect. self.connected = false self.task?.cancel(with: .goingAway, reason: nil) @@ -363,7 +364,7 @@ actor GatewayChannelActor { guard let self else { return } await self.scheduleReconnect() } - cont.resume(throwing: self.wrap(error, context: "gateway send \(method)")) + if let waiter { waiter.resume(throwing: wrapped) } } } } diff --git a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelRequestTests.swift b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelRequestTests.swift new file mode 100644 index 000000000..f053d7da8 --- /dev/null +++ b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelRequestTests.swift @@ -0,0 +1,111 @@ +import Foundation +import os +import Testing +@testable import Clawdis + +@Suite struct GatewayChannelRequestTests { + private final class FakeWebSocketTask: WebSocketTasking, @unchecked Sendable { + private let requestSendDelayMs: Int + private let pendingReceiveHandler = + OSAllocatedUnfairLock<(@Sendable (Result) -> Void)?>(initialState: nil) + private let sendCount = OSAllocatedUnfairLock(initialState: 0) + + var state: URLSessionTask.State = .suspended + + init(requestSendDelayMs: Int) { + self.requestSendDelayMs = requestSendDelayMs + } + + func resume() { + self.state = .running + } + + func cancel(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + _ = (closeCode, reason) + self.state = .canceling + let handler = self.pendingReceiveHandler.withLock { handler in + defer { handler = nil } + return handler + } + handler?(Result.failure(URLError(.cancelled))) + } + + func send(_ message: URLSessionWebSocketTask.Message) async throws { + _ = message + let currentSendCount = self.sendCount.withLock { count in + defer { count += 1 } + return count + } + + // First send is the hello frame. Second send is the request frame. + if currentSendCount == 1 { + try await Task.sleep(nanoseconds: UInt64(self.requestSendDelayMs) * 1_000_000) + throw URLError(.cannotConnectToHost) + } + } + + func receive() async throws -> URLSessionWebSocketTask.Message { + .data(Self.helloOkData()) + } + + func receive( + completionHandler: @escaping @Sendable (Result) -> Void) + { + self.pendingReceiveHandler.withLock { $0 = completionHandler } + } + + private static func helloOkData() -> Data { + let json = """ + { + "type": "hello-ok", + "protocol": 1, + "server": { "version": "test", "connId": "test" }, + "features": { "methods": [], "events": [] }, + "snapshot": { + "presence": [ { "ts": 1 } ], + "health": {}, + "stateVersion": { "presence": 0, "health": 0 }, + "uptimeMs": 0 + }, + "policy": { "maxPayload": 1, "maxBufferedBytes": 1, "tickIntervalMs": 30000 } + } + """ + return Data(json.utf8) + } + } + + private final class FakeWebSocketSession: WebSocketSessioning, @unchecked Sendable { + private let requestSendDelayMs: Int + + init(requestSendDelayMs: Int) { + self.requestSendDelayMs = requestSendDelayMs + } + + func makeWebSocketTask(url: URL) -> WebSocketTaskBox { + _ = url + let task = FakeWebSocketTask(requestSendDelayMs: self.requestSendDelayMs) + return WebSocketTaskBox(task: task) + } + } + + @Test func requestTimeoutThenSendFailureDoesNotDoubleResume() async { + let session = FakeWebSocketSession(requestSendDelayMs: 100) + let channel = GatewayChannelActor( + url: URL(string: "ws://example.invalid")!, + token: nil, + session: session) + + do { + _ = try await channel.request(method: "test", params: nil, timeoutMs: 10) + Issue.record("Expected request to time out") + } catch { + let ns = error as NSError + #expect(ns.domain == "Gateway") + #expect(ns.code == 5) + } + + // Give the delayed send failure task time to run; this used to crash due to a double-resume. + try? await Task.sleep(nanoseconds: 250 * 1_000_000) + } +} +