From c440cc2f8450f57182847a5c572d3049b5d32a5e Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 20 Jan 2026 09:51:20 +0000 Subject: [PATCH] fix: preserve gateway presence instanceId --- ...session.subscribeembeddedpisession.test.ts | 32 +++++++++++++++ src/gateway/client.ts | 39 ++++++++++--------- src/gateway/protocol/index.ts | 1 - src/gateway/server.nodes.allowlist.test.ts | 6 ++- .../server/ws-connection/message-handler.ts | 14 +++---- src/infra/agent-events.test.ts | 20 ++++++++++ 6 files changed, 81 insertions(+), 31 deletions(-) diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts index c142d1544..ec38734ac 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts @@ -220,4 +220,36 @@ describe("subscribeEmbeddedPiSession", () => { expect(payloads).toHaveLength(1); expect(payloads[0]?.text).toBe("MEDIA:"); }); + + it("emits agent events when media arrives without text", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onAgentEvent = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + onAgentEvent, + }); + + handler?.({ type: "message_start", message: { role: "assistant" } }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_delta", delta: "MEDIA: https://example.com/a.png" }, + }); + + const payloads = onAgentEvent.mock.calls + .map((call) => call[0]?.data as Record | undefined) + .filter((value): value is Record => Boolean(value)); + expect(payloads).toHaveLength(1); + expect(payloads[0]?.text).toBe(""); + expect(payloads[0]?.mediaUrls).toEqual(["https://example.com/a.png"]); + }); }); diff --git a/src/gateway/client.ts b/src/gateway/client.ts index 67232fd63..28b4805da 100644 --- a/src/gateway/client.ts +++ b/src/gateway/client.ts @@ -156,25 +156,26 @@ export class GatewayClient { const signedAtMs = Date.now(); const role = this.opts.role ?? "operator"; const scopes = this.opts.scopes ?? ["operator.admin"]; - const device = (() => { - if (!this.opts.deviceIdentity) return undefined; - const payload = buildDeviceAuthPayload({ - deviceId: this.opts.deviceIdentity.deviceId, - clientId: this.opts.clientName ?? GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT, - clientMode: this.opts.mode ?? GATEWAY_CLIENT_MODES.BACKEND, - role, - scopes, - signedAtMs, - token: this.opts.token ?? null, - }); - const signature = signDevicePayload(this.opts.deviceIdentity.privateKeyPem, payload); - return { - id: this.opts.deviceIdentity.deviceId, - publicKey: publicKeyRawBase64UrlFromPem(this.opts.deviceIdentity.publicKeyPem), - signature, - signedAt: signedAtMs, - }; - })(); + const identity = this.opts.deviceIdentity; + if (!identity) { + throw new Error("gateway client requires a device identity"); + } + const payload = buildDeviceAuthPayload({ + deviceId: identity.deviceId, + clientId: this.opts.clientName ?? GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT, + clientMode: this.opts.mode ?? GATEWAY_CLIENT_MODES.BACKEND, + role, + scopes, + signedAtMs, + token: this.opts.token ?? null, + }); + const signature = signDevicePayload(identity.privateKeyPem, payload); + const device = { + id: identity.deviceId, + publicKey: publicKeyRawBase64UrlFromPem(identity.publicKeyPem), + signature, + signedAt: signedAtMs, + }; const params: ConnectParams = { minProtocol: this.opts.minProtocol ?? PROTOCOL_VERSION, maxProtocol: this.opts.maxProtocol ?? PROTOCOL_VERSION, diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 02d91a85c..d3c62064e 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -141,7 +141,6 @@ import { type SkillsBinsParams, SkillsBinsParamsSchema, type SkillsBinsResult, - SkillsBinsResultSchema, type SkillsInstallParams, SkillsInstallParamsSchema, type SkillsStatusParams, diff --git a/src/gateway/server.nodes.allowlist.test.ts b/src/gateway/server.nodes.allowlist.test.ts index d2c800010..9cfecaa27 100644 --- a/src/gateway/server.nodes.allowlist.test.ts +++ b/src/gateway/server.nodes.allowlist.test.ts @@ -57,10 +57,11 @@ describe("gateway node command allowlist", () => { await connectOk(nodeWs, { role: "node", client: { - id: "node-empty", + id: GATEWAY_CLIENT_NAMES.NODE_HOST, version: "1.0.0", platform: "ios", mode: GATEWAY_CLIENT_MODES.NODE, + instanceId: "node-empty", }, commands: [], }); @@ -92,10 +93,11 @@ describe("gateway node command allowlist", () => { await connectOk(nodeWs, { role: "node", client: { - id: "node-allowed", + id: GATEWAY_CLIENT_NAMES.NODE_HOST, version: "1.0.0", platform: "ios", mode: GATEWAY_CLIENT_MODES.NODE, + instanceId: "node-allowed", }, commands: ["canvas.snapshot"], }); diff --git a/src/gateway/server/ws-connection/message-handler.ts b/src/gateway/server/ws-connection/message-handler.ts index 5a4aa1f51..1544eefa9 100644 --- a/src/gateway/server/ws-connection/message-handler.ts +++ b/src/gateway/server/ws-connection/message-handler.ts @@ -386,7 +386,7 @@ export function attachGatewayWsMessageHandler(params: { } if (device && devicePublicKey) { - const requirePairing = async (reason: string, paired?: { deviceId: string }) => { + const requirePairing = async (reason: string, _paired?: { deviceId: string }) => { const pairing = await requestDevicePairing({ deviceId: device.id, publicKey: devicePublicKey, @@ -444,13 +444,9 @@ export function attachGatewayWsMessageHandler(params: { const ok = await requirePairing("not-paired"); if (!ok) return; } else { - const allowedRoles = new Set( - Array.isArray(paired.roles) - ? paired.roles - : paired.role - ? [paired.role] - : [], - ); + const allowedRoles = new Set( + Array.isArray(paired.roles) ? paired.roles : paired.role ? [paired.role] : [], + ); if (allowedRoles.size === 0) { const ok = await requirePairing("role-upgrade", paired); if (!ok) return; @@ -532,7 +528,7 @@ export function attachGatewayWsMessageHandler(params: { deviceFamily: connectParams.client.deviceFamily, modelIdentifier: connectParams.client.modelIdentifier, mode: connectParams.client.mode, - instanceId: connectParams.device?.id ?? instanceId, + instanceId: instanceId ?? connectParams.device?.id, reason: "connect", }); incrementPresenceVersion(); diff --git a/src/infra/agent-events.test.ts b/src/infra/agent-events.test.ts index c4b666236..1ce051bdb 100644 --- a/src/infra/agent-events.test.ts +++ b/src/infra/agent-events.test.ts @@ -35,4 +35,24 @@ describe("agent-events sequencing", () => { expect(seen["run-1"]).toEqual([1, 2, 3]); expect(seen["run-2"]).toEqual([1]); }); + + test("preserves compaction ordering on the event bus", async () => { + const phases: Array = []; + const stop = onAgentEvent((evt) => { + if (evt.runId !== "run-1") return; + if (evt.stream !== "compaction") return; + if (typeof evt.data?.phase === "string") phases.push(evt.data.phase); + }); + + emitAgentEvent({ runId: "run-1", stream: "compaction", data: { phase: "start" } }); + emitAgentEvent({ + runId: "run-1", + stream: "compaction", + data: { phase: "end", willRetry: false }, + }); + + stop(); + + expect(phases).toEqual(["start", "end"]); + }); });