diff --git a/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayNodeSession.swift b/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayNodeSession.swift index 6bb9db67b..2cc26a51d 100644 --- a/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayNodeSession.swift +++ b/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayNodeSession.swift @@ -23,6 +23,35 @@ public actor GatewayNodeSession { private var onConnected: (@Sendable () async -> Void)? private var onDisconnected: (@Sendable (String) async -> Void)? private var onInvoke: (@Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)? + + static func invokeWithTimeout( + request: BridgeInvokeRequest, + timeoutMs: Int?, + onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse + ) async -> BridgeInvokeResponse { + let timeout = max(0, timeoutMs ?? 0) + guard timeout > 0 else { + return await onInvoke(request) + } + + return await withTaskGroup(of: BridgeInvokeResponse.self) { group in + group.addTask { await onInvoke(request) } + group.addTask { + try? await Task.sleep(nanoseconds: UInt64(timeout) * 1_000_000) + return BridgeInvokeResponse( + id: request.id, + ok: false, + error: ClawdbotNodeError( + code: .unavailable, + message: "node invoke timed out") + ) + } + + let first = await group.next()! + group.cancelAll() + return first + } + } private var serverEventSubscribers: [UUID: AsyncStream.Continuation] = [:] private var canvasHostUrl: String? @@ -167,7 +196,11 @@ public actor GatewayNodeSession { let request = try self.decoder.decode(NodeInvokeRequestPayload.self, from: data) guard let onInvoke else { return } let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON) - let response = await onInvoke(req) + let response = await Self.invokeWithTimeout( + request: req, + timeoutMs: request.timeoutMs, + onInvoke: onInvoke + ) await self.sendInvokeResult(request: request, response: response) } catch { self.logger.error("node invoke decode failed: \(error.localizedDescription, privacy: .public)") diff --git a/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/GatewayNodeSessionTests.swift b/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/GatewayNodeSessionTests.swift new file mode 100644 index 000000000..0fc688f63 --- /dev/null +++ b/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/GatewayNodeSessionTests.swift @@ -0,0 +1,56 @@ +import Foundation +import Testing +@testable import ClawdbotKit +import ClawdbotProtocol + +struct GatewayNodeSessionTests { + @Test + func invokeWithTimeoutReturnsUnderlyingResponseBeforeTimeout() async { + let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil) + let response = await GatewayNodeSession.invokeWithTimeout( + request: request, + timeoutMs: 50, + onInvoke: { req in + #expect(req.id == "1") + return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: "{}", error: nil) + } + ) + + #expect(response.ok == true) + #expect(response.error == nil) + #expect(response.payloadJSON == "{}") + } + + @Test + func invokeWithTimeoutReturnsTimeoutError() async { + let request = BridgeInvokeRequest(id: "abc", command: "x", paramsJSON: nil) + let response = await GatewayNodeSession.invokeWithTimeout( + request: request, + timeoutMs: 10, + onInvoke: { _ in + try? await Task.sleep(nanoseconds: 200_000_000) // 200ms + return BridgeInvokeResponse(id: "abc", ok: true, payloadJSON: "{}", error: nil) + } + ) + + #expect(response.ok == false) + #expect(response.error?.code == .unavailable) + #expect(response.error?.message.contains("timed out") == true) + } + + @Test + func invokeWithTimeoutZeroDisablesTimeout() async { + let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil) + let response = await GatewayNodeSession.invokeWithTimeout( + request: request, + timeoutMs: 0, + onInvoke: { req in + try? await Task.sleep(nanoseconds: 5_000_000) + return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: nil, error: nil) + } + ) + + #expect(response.ok == true) + #expect(response.error == nil) + } +}