fix: preserve gateway presence instanceId

This commit is contained in:
Peter Steinberger
2026-01-20 09:51:20 +00:00
parent 2439c31844
commit c440cc2f84
6 changed files with 81 additions and 31 deletions

View File

@@ -220,4 +220,36 @@ describe("subscribeEmbeddedPiSession", () => {
expect(payloads).toHaveLength(1);
expect(payloads[0]?.text).toBe("MEDIA:");
});
it("emits agent events when media arrives without text", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onAgentEvent = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run",
onAgentEvent,
});
handler?.({ type: "message_start", message: { role: "assistant" } });
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: "MEDIA: https://example.com/a.png" },
});
const payloads = onAgentEvent.mock.calls
.map((call) => call[0]?.data as Record<string, unknown> | undefined)
.filter((value): value is Record<string, unknown> => Boolean(value));
expect(payloads).toHaveLength(1);
expect(payloads[0]?.text).toBe("");
expect(payloads[0]?.mediaUrls).toEqual(["https://example.com/a.png"]);
});
});

View File

@@ -156,25 +156,26 @@ export class GatewayClient {
const signedAtMs = Date.now();
const role = this.opts.role ?? "operator";
const scopes = this.opts.scopes ?? ["operator.admin"];
const device = (() => {
if (!this.opts.deviceIdentity) return undefined;
const payload = buildDeviceAuthPayload({
deviceId: this.opts.deviceIdentity.deviceId,
clientId: this.opts.clientName ?? GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT,
clientMode: this.opts.mode ?? GATEWAY_CLIENT_MODES.BACKEND,
role,
scopes,
signedAtMs,
token: this.opts.token ?? null,
});
const signature = signDevicePayload(this.opts.deviceIdentity.privateKeyPem, payload);
return {
id: this.opts.deviceIdentity.deviceId,
publicKey: publicKeyRawBase64UrlFromPem(this.opts.deviceIdentity.publicKeyPem),
signature,
signedAt: signedAtMs,
};
})();
const identity = this.opts.deviceIdentity;
if (!identity) {
throw new Error("gateway client requires a device identity");
}
const payload = buildDeviceAuthPayload({
deviceId: identity.deviceId,
clientId: this.opts.clientName ?? GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT,
clientMode: this.opts.mode ?? GATEWAY_CLIENT_MODES.BACKEND,
role,
scopes,
signedAtMs,
token: this.opts.token ?? null,
});
const signature = signDevicePayload(identity.privateKeyPem, payload);
const device = {
id: identity.deviceId,
publicKey: publicKeyRawBase64UrlFromPem(identity.publicKeyPem),
signature,
signedAt: signedAtMs,
};
const params: ConnectParams = {
minProtocol: this.opts.minProtocol ?? PROTOCOL_VERSION,
maxProtocol: this.opts.maxProtocol ?? PROTOCOL_VERSION,

View File

@@ -141,7 +141,6 @@ import {
type SkillsBinsParams,
SkillsBinsParamsSchema,
type SkillsBinsResult,
SkillsBinsResultSchema,
type SkillsInstallParams,
SkillsInstallParamsSchema,
type SkillsStatusParams,

View File

@@ -57,10 +57,11 @@ describe("gateway node command allowlist", () => {
await connectOk(nodeWs, {
role: "node",
client: {
id: "node-empty",
id: GATEWAY_CLIENT_NAMES.NODE_HOST,
version: "1.0.0",
platform: "ios",
mode: GATEWAY_CLIENT_MODES.NODE,
instanceId: "node-empty",
},
commands: [],
});
@@ -92,10 +93,11 @@ describe("gateway node command allowlist", () => {
await connectOk(nodeWs, {
role: "node",
client: {
id: "node-allowed",
id: GATEWAY_CLIENT_NAMES.NODE_HOST,
version: "1.0.0",
platform: "ios",
mode: GATEWAY_CLIENT_MODES.NODE,
instanceId: "node-allowed",
},
commands: ["canvas.snapshot"],
});

View File

@@ -386,7 +386,7 @@ export function attachGatewayWsMessageHandler(params: {
}
if (device && devicePublicKey) {
const requirePairing = async (reason: string, paired?: { deviceId: string }) => {
const requirePairing = async (reason: string, _paired?: { deviceId: string }) => {
const pairing = await requestDevicePairing({
deviceId: device.id,
publicKey: devicePublicKey,
@@ -444,13 +444,9 @@ export function attachGatewayWsMessageHandler(params: {
const ok = await requirePairing("not-paired");
if (!ok) return;
} else {
const allowedRoles = new Set(
Array.isArray(paired.roles)
? paired.roles
: paired.role
? [paired.role]
: [],
);
const allowedRoles = new Set(
Array.isArray(paired.roles) ? paired.roles : paired.role ? [paired.role] : [],
);
if (allowedRoles.size === 0) {
const ok = await requirePairing("role-upgrade", paired);
if (!ok) return;
@@ -532,7 +528,7 @@ export function attachGatewayWsMessageHandler(params: {
deviceFamily: connectParams.client.deviceFamily,
modelIdentifier: connectParams.client.modelIdentifier,
mode: connectParams.client.mode,
instanceId: connectParams.device?.id ?? instanceId,
instanceId: instanceId ?? connectParams.device?.id,
reason: "connect",
});
incrementPresenceVersion();

View File

@@ -35,4 +35,24 @@ describe("agent-events sequencing", () => {
expect(seen["run-1"]).toEqual([1, 2, 3]);
expect(seen["run-2"]).toEqual([1]);
});
test("preserves compaction ordering on the event bus", async () => {
const phases: Array<string> = [];
const stop = onAgentEvent((evt) => {
if (evt.runId !== "run-1") return;
if (evt.stream !== "compaction") return;
if (typeof evt.data?.phase === "string") phases.push(evt.data.phase);
});
emitAgentEvent({ runId: "run-1", stream: "compaction", data: { phase: "start" } });
emitAgentEvent({
runId: "run-1",
stream: "compaction",
data: { phase: "end", willRetry: false },
});
stop();
expect(phases).toEqual(["start", "end"]);
});
});