fix(node): enforce node.invoke timeout in node client
Use the timeout provided on node invoke requests to ensure node clients always respond with a result. This prevents gateway-side node.invoke calls from hanging until the gateway timeout when a node command stalls. Tests: - swift test --filter GatewayNodeSessionTests
This commit is contained in:
@@ -23,6 +23,35 @@ public actor GatewayNodeSession {
|
|||||||
private var onConnected: (@Sendable () async -> Void)?
|
private var onConnected: (@Sendable () async -> Void)?
|
||||||
private var onDisconnected: (@Sendable (String) async -> Void)?
|
private var onDisconnected: (@Sendable (String) async -> Void)?
|
||||||
private var onInvoke: (@Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)?
|
private var onInvoke: (@Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)?
|
||||||
|
|
||||||
|
static func invokeWithTimeout(
|
||||||
|
request: BridgeInvokeRequest,
|
||||||
|
timeoutMs: Int?,
|
||||||
|
onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse
|
||||||
|
) async -> BridgeInvokeResponse {
|
||||||
|
let timeout = max(0, timeoutMs ?? 0)
|
||||||
|
guard timeout > 0 else {
|
||||||
|
return await onInvoke(request)
|
||||||
|
}
|
||||||
|
|
||||||
|
return await withTaskGroup(of: BridgeInvokeResponse.self) { group in
|
||||||
|
group.addTask { await onInvoke(request) }
|
||||||
|
group.addTask {
|
||||||
|
try? await Task.sleep(nanoseconds: UInt64(timeout) * 1_000_000)
|
||||||
|
return BridgeInvokeResponse(
|
||||||
|
id: request.id,
|
||||||
|
ok: false,
|
||||||
|
error: ClawdbotNodeError(
|
||||||
|
code: .unavailable,
|
||||||
|
message: "node invoke timed out")
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
let first = await group.next()!
|
||||||
|
group.cancelAll()
|
||||||
|
return first
|
||||||
|
}
|
||||||
|
}
|
||||||
private var serverEventSubscribers: [UUID: AsyncStream<EventFrame>.Continuation] = [:]
|
private var serverEventSubscribers: [UUID: AsyncStream<EventFrame>.Continuation] = [:]
|
||||||
private var canvasHostUrl: String?
|
private var canvasHostUrl: String?
|
||||||
|
|
||||||
@@ -167,7 +196,11 @@ public actor GatewayNodeSession {
|
|||||||
let request = try self.decoder.decode(NodeInvokeRequestPayload.self, from: data)
|
let request = try self.decoder.decode(NodeInvokeRequestPayload.self, from: data)
|
||||||
guard let onInvoke else { return }
|
guard let onInvoke else { return }
|
||||||
let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON)
|
let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON)
|
||||||
let response = await onInvoke(req)
|
let response = await Self.invokeWithTimeout(
|
||||||
|
request: req,
|
||||||
|
timeoutMs: request.timeoutMs,
|
||||||
|
onInvoke: onInvoke
|
||||||
|
)
|
||||||
await self.sendInvokeResult(request: request, response: response)
|
await self.sendInvokeResult(request: request, response: response)
|
||||||
} catch {
|
} catch {
|
||||||
self.logger.error("node invoke decode failed: \(error.localizedDescription, privacy: .public)")
|
self.logger.error("node invoke decode failed: \(error.localizedDescription, privacy: .public)")
|
||||||
|
|||||||
@@ -0,0 +1,56 @@
|
|||||||
|
import Foundation
|
||||||
|
import Testing
|
||||||
|
@testable import ClawdbotKit
|
||||||
|
import ClawdbotProtocol
|
||||||
|
|
||||||
|
struct GatewayNodeSessionTests {
|
||||||
|
@Test
|
||||||
|
func invokeWithTimeoutReturnsUnderlyingResponseBeforeTimeout() async {
|
||||||
|
let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil)
|
||||||
|
let response = await GatewayNodeSession.invokeWithTimeout(
|
||||||
|
request: request,
|
||||||
|
timeoutMs: 50,
|
||||||
|
onInvoke: { req in
|
||||||
|
#expect(req.id == "1")
|
||||||
|
return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: "{}", error: nil)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
#expect(response.ok == true)
|
||||||
|
#expect(response.error == nil)
|
||||||
|
#expect(response.payloadJSON == "{}")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
func invokeWithTimeoutReturnsTimeoutError() async {
|
||||||
|
let request = BridgeInvokeRequest(id: "abc", command: "x", paramsJSON: nil)
|
||||||
|
let response = await GatewayNodeSession.invokeWithTimeout(
|
||||||
|
request: request,
|
||||||
|
timeoutMs: 10,
|
||||||
|
onInvoke: { _ in
|
||||||
|
try? await Task.sleep(nanoseconds: 200_000_000) // 200ms
|
||||||
|
return BridgeInvokeResponse(id: "abc", ok: true, payloadJSON: "{}", error: nil)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
#expect(response.ok == false)
|
||||||
|
#expect(response.error?.code == .unavailable)
|
||||||
|
#expect(response.error?.message.contains("timed out") == true)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
func invokeWithTimeoutZeroDisablesTimeout() async {
|
||||||
|
let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil)
|
||||||
|
let response = await GatewayNodeSession.invokeWithTimeout(
|
||||||
|
request: request,
|
||||||
|
timeoutMs: 0,
|
||||||
|
onInvoke: { req in
|
||||||
|
try? await Task.sleep(nanoseconds: 5_000_000)
|
||||||
|
return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: nil, error: nil)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
#expect(response.ok == true)
|
||||||
|
#expect(response.error == nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user