Merge pull request #1357 from vignesh07/fix/node-invoke-timeout

fix(node): enforce timeout for node.invoke handlers
This commit is contained in:
Peter Steinberger
2026-01-21 05:49:36 +00:00
committed by GitHub
2 changed files with 90 additions and 1 deletions

View File

@@ -23,6 +23,35 @@ public actor GatewayNodeSession {
private var onConnected: (@Sendable () async -> Void)?
private var onDisconnected: (@Sendable (String) async -> Void)?
private var onInvoke: (@Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)?
static func invokeWithTimeout(
request: BridgeInvokeRequest,
timeoutMs: Int?,
onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse
) async -> BridgeInvokeResponse {
let timeout = max(0, timeoutMs ?? 0)
guard timeout > 0 else {
return await onInvoke(request)
}
return await withTaskGroup(of: BridgeInvokeResponse.self) { group in
group.addTask { await onInvoke(request) }
group.addTask {
try? await Task.sleep(nanoseconds: UInt64(timeout) * 1_000_000)
return BridgeInvokeResponse(
id: request.id,
ok: false,
error: ClawdbotNodeError(
code: .unavailable,
message: "node invoke timed out")
)
}
let first = await group.next()!
group.cancelAll()
return first
}
}
private var serverEventSubscribers: [UUID: AsyncStream<EventFrame>.Continuation] = [:]
private var canvasHostUrl: String?
@@ -167,7 +196,11 @@ public actor GatewayNodeSession {
let request = try self.decoder.decode(NodeInvokeRequestPayload.self, from: data)
guard let onInvoke else { return }
let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON)
let response = await onInvoke(req)
let response = await Self.invokeWithTimeout(
request: req,
timeoutMs: request.timeoutMs,
onInvoke: onInvoke
)
await self.sendInvokeResult(request: request, response: response)
} catch {
self.logger.error("node invoke decode failed: \(error.localizedDescription, privacy: .public)")

View File

@@ -0,0 +1,56 @@
import Foundation
import Testing
@testable import ClawdbotKit
import ClawdbotProtocol
struct GatewayNodeSessionTests {
@Test
func invokeWithTimeoutReturnsUnderlyingResponseBeforeTimeout() async {
let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil)
let response = await GatewayNodeSession.invokeWithTimeout(
request: request,
timeoutMs: 50,
onInvoke: { req in
#expect(req.id == "1")
return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: "{}", error: nil)
}
)
#expect(response.ok == true)
#expect(response.error == nil)
#expect(response.payloadJSON == "{}")
}
@Test
func invokeWithTimeoutReturnsTimeoutError() async {
let request = BridgeInvokeRequest(id: "abc", command: "x", paramsJSON: nil)
let response = await GatewayNodeSession.invokeWithTimeout(
request: request,
timeoutMs: 10,
onInvoke: { _ in
try? await Task.sleep(nanoseconds: 200_000_000) // 200ms
return BridgeInvokeResponse(id: "abc", ok: true, payloadJSON: "{}", error: nil)
}
)
#expect(response.ok == false)
#expect(response.error?.code == .unavailable)
#expect(response.error?.message.contains("timed out") == true)
}
@Test
func invokeWithTimeoutZeroDisablesTimeout() async {
let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil)
let response = await GatewayNodeSession.invokeWithTimeout(
request: request,
timeoutMs: 0,
onInvoke: { req in
try? await Task.sleep(nanoseconds: 5_000_000)
return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: nil, error: nil)
}
)
#expect(response.ok == true)
#expect(response.error == nil)
}
}