feat(discovery): bonjour beacons + bridge presence
This commit is contained in:
@@ -12,6 +12,28 @@ import { emitHeartbeatEvent } from "../infra/heartbeat-events.js";
|
||||
import { PROTOCOL_VERSION } from "./protocol/index.js";
|
||||
import { startGatewayServer } from "./server.js";
|
||||
|
||||
type BridgeClientInfo = {
|
||||
nodeId: string;
|
||||
displayName?: string;
|
||||
platform?: string;
|
||||
version?: string;
|
||||
remoteIp?: string;
|
||||
};
|
||||
|
||||
type BridgeStartOpts = {
|
||||
onAuthenticated?: (node: BridgeClientInfo) => Promise<void> | void;
|
||||
onDisconnected?: (node: BridgeClientInfo) => Promise<void> | void;
|
||||
onPairRequested?: (request: unknown) => Promise<void> | void;
|
||||
};
|
||||
|
||||
const bridgeStartCalls = vi.hoisted(() => [] as BridgeStartOpts[]);
|
||||
vi.mock("../infra/bridge/server.js", () => ({
|
||||
startNodeBridgeServer: vi.fn(async (opts: BridgeStartOpts) => {
|
||||
bridgeStartCalls.push(opts);
|
||||
return { port: 0, close: async () => {} };
|
||||
}),
|
||||
}));
|
||||
|
||||
let testSessionStorePath: string | undefined;
|
||||
let testAllowFrom: string[] | undefined;
|
||||
let testCronStorePath: string | undefined;
|
||||
@@ -324,6 +346,75 @@ describe("gateway server", () => {
|
||||
}
|
||||
});
|
||||
|
||||
test("emits presence updates for bridge connect/disconnect", async () => {
|
||||
const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-home-"));
|
||||
const prevHome = process.env.HOME;
|
||||
process.env.HOME = homeDir;
|
||||
try {
|
||||
const before = bridgeStartCalls.length;
|
||||
const { server, ws } = await startServerWithClient();
|
||||
try {
|
||||
await connectOk(ws);
|
||||
const bridgeCall = bridgeStartCalls[before];
|
||||
expect(bridgeCall).toBeTruthy();
|
||||
|
||||
const waitPresenceReason = async (reason: string) => {
|
||||
await onceMessage(
|
||||
ws,
|
||||
(o) => {
|
||||
if (o.type !== "event" || o.event !== "presence") return false;
|
||||
const payload = o.payload as { presence?: unknown } | null;
|
||||
const list = payload?.presence;
|
||||
if (!Array.isArray(list)) return false;
|
||||
return list.some(
|
||||
(p) =>
|
||||
typeof p === "object" &&
|
||||
p !== null &&
|
||||
(p as { instanceId?: unknown }).instanceId === "iris-1" &&
|
||||
(p as { reason?: unknown }).reason === reason,
|
||||
);
|
||||
},
|
||||
3000,
|
||||
);
|
||||
};
|
||||
|
||||
const presenceConnectedP = waitPresenceReason("iris-connected");
|
||||
await bridgeCall?.onAuthenticated?.({
|
||||
nodeId: "iris-1",
|
||||
displayName: "Iris",
|
||||
platform: "ios",
|
||||
version: "1.0",
|
||||
remoteIp: "10.0.0.10",
|
||||
});
|
||||
await presenceConnectedP;
|
||||
|
||||
const presenceDisconnectedP = waitPresenceReason("iris-disconnected");
|
||||
await bridgeCall?.onDisconnected?.({
|
||||
nodeId: "iris-1",
|
||||
displayName: "Iris",
|
||||
platform: "ios",
|
||||
version: "1.0",
|
||||
remoteIp: "10.0.0.10",
|
||||
});
|
||||
await presenceDisconnectedP;
|
||||
} finally {
|
||||
try {
|
||||
ws.close();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
await server.close();
|
||||
await fs.rm(homeDir, { recursive: true, force: true });
|
||||
}
|
||||
} finally {
|
||||
if (prevHome === undefined) {
|
||||
delete process.env.HOME;
|
||||
} else {
|
||||
process.env.HOME = prevHome;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
test("supports cron.add and cron.list", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-cron-"));
|
||||
testCronStorePath = path.join(dir, "cron.json");
|
||||
|
||||
@@ -666,7 +666,66 @@ export async function startGatewayServer(
|
||||
const started = await startNodeBridgeServer({
|
||||
host: bridgeHost,
|
||||
port: bridgePort,
|
||||
onAuthenticated: (node) => {
|
||||
const host = node.displayName?.trim() || node.nodeId;
|
||||
const ip = node.remoteIp?.trim();
|
||||
const version = node.version?.trim() || "unknown";
|
||||
const text = `Node: ${host}${ip ? ` (${ip})` : ""} · app ${version} · last input 0s ago · mode remote · reason iris-connected`;
|
||||
upsertPresence(node.nodeId, {
|
||||
host,
|
||||
ip,
|
||||
version,
|
||||
mode: "remote",
|
||||
reason: "iris-connected",
|
||||
lastInputSeconds: 0,
|
||||
instanceId: node.nodeId,
|
||||
text,
|
||||
});
|
||||
presenceVersion += 1;
|
||||
broadcast(
|
||||
"presence",
|
||||
{ presence: listSystemPresence() },
|
||||
{
|
||||
dropIfSlow: true,
|
||||
stateVersion: {
|
||||
presence: presenceVersion,
|
||||
health: healthVersion,
|
||||
},
|
||||
},
|
||||
);
|
||||
},
|
||||
onDisconnected: (node) => {
|
||||
const host = node.displayName?.trim() || node.nodeId;
|
||||
const ip = node.remoteIp?.trim();
|
||||
const version = node.version?.trim() || "unknown";
|
||||
const text = `Node: ${host}${ip ? ` (${ip})` : ""} · app ${version} · last input 0s ago · mode remote · reason iris-disconnected`;
|
||||
upsertPresence(node.nodeId, {
|
||||
host,
|
||||
ip,
|
||||
version,
|
||||
mode: "remote",
|
||||
reason: "iris-disconnected",
|
||||
lastInputSeconds: 0,
|
||||
instanceId: node.nodeId,
|
||||
text,
|
||||
});
|
||||
presenceVersion += 1;
|
||||
broadcast(
|
||||
"presence",
|
||||
{ presence: listSystemPresence() },
|
||||
{
|
||||
dropIfSlow: true,
|
||||
stateVersion: {
|
||||
presence: presenceVersion,
|
||||
health: healthVersion,
|
||||
},
|
||||
},
|
||||
);
|
||||
},
|
||||
onEvent: handleBridgeEvent,
|
||||
onPairRequested: (request) => {
|
||||
broadcast("node.pair.requested", request, { dropIfSlow: true });
|
||||
},
|
||||
});
|
||||
if (started.port > 0) {
|
||||
bridge = started;
|
||||
@@ -680,9 +739,22 @@ export async function startGatewayServer(
|
||||
}
|
||||
|
||||
try {
|
||||
const sshPortEnv = process.env.CLAWDIS_SSH_PORT?.trim();
|
||||
const sshPortParsed = sshPortEnv ? Number.parseInt(sshPortEnv, 10) : NaN;
|
||||
const sshPort =
|
||||
Number.isFinite(sshPortParsed) && sshPortParsed > 0
|
||||
? sshPortParsed
|
||||
: undefined;
|
||||
|
||||
const tailnetDnsEnv = process.env.CLAWDIS_TAILNET_DNS?.trim();
|
||||
const tailnetDns =
|
||||
tailnetDnsEnv && tailnetDnsEnv.length > 0 ? tailnetDnsEnv : undefined;
|
||||
|
||||
const bonjour = await startGatewayBonjourAdvertiser({
|
||||
gatewayPort: port,
|
||||
bridgePort: bridge?.port,
|
||||
sshPort,
|
||||
tailnetDns,
|
||||
});
|
||||
bonjourStop = bonjour.stop;
|
||||
} catch (err) {
|
||||
|
||||
@@ -126,4 +126,131 @@ describe("node bridge server", () => {
|
||||
|
||||
await server.close();
|
||||
});
|
||||
|
||||
it("calls onPairRequested for newly created pending requests", async () => {
|
||||
let requested: { nodeId?: string; requestId?: string } | null = null;
|
||||
const server = await startNodeBridgeServer({
|
||||
host: "127.0.0.1",
|
||||
port: 0,
|
||||
pairingBaseDir: baseDir,
|
||||
onPairRequested: async (req) => {
|
||||
requested = req;
|
||||
},
|
||||
});
|
||||
|
||||
const socket = net.connect({ host: "127.0.0.1", port: server.port });
|
||||
sendLine(socket, { type: "pair-request", nodeId: "n3", platform: "ios" });
|
||||
|
||||
for (let i = 0; i < 40; i += 1) {
|
||||
if (requested) break;
|
||||
await new Promise((r) => setTimeout(r, 25));
|
||||
}
|
||||
|
||||
expect(requested?.nodeId).toBe("n3");
|
||||
expect(typeof requested?.requestId).toBe("string");
|
||||
|
||||
socket.destroy();
|
||||
await server.close();
|
||||
});
|
||||
|
||||
it("passes node metadata to onAuthenticated and onDisconnected", async () => {
|
||||
let lastAuthed: {
|
||||
nodeId?: string;
|
||||
displayName?: string;
|
||||
platform?: string;
|
||||
version?: string;
|
||||
remoteIp?: string;
|
||||
} | null = null;
|
||||
|
||||
let disconnected: {
|
||||
nodeId?: string;
|
||||
displayName?: string;
|
||||
platform?: string;
|
||||
version?: string;
|
||||
remoteIp?: string;
|
||||
} | null = null;
|
||||
|
||||
let resolveDisconnected: (() => void) | null = null;
|
||||
const disconnectedP = new Promise<void>((resolve) => {
|
||||
resolveDisconnected = resolve;
|
||||
});
|
||||
|
||||
const server = await startNodeBridgeServer({
|
||||
host: "127.0.0.1",
|
||||
port: 0,
|
||||
pairingBaseDir: baseDir,
|
||||
onAuthenticated: async (node) => {
|
||||
lastAuthed = node;
|
||||
},
|
||||
onDisconnected: async (node) => {
|
||||
disconnected = node;
|
||||
resolveDisconnected?.();
|
||||
},
|
||||
});
|
||||
|
||||
const socket = net.connect({ host: "127.0.0.1", port: server.port });
|
||||
const readLine = createLineReader(socket);
|
||||
sendLine(socket, {
|
||||
type: "pair-request",
|
||||
nodeId: "n4",
|
||||
displayName: "Iris",
|
||||
platform: "ios",
|
||||
version: "1.0",
|
||||
});
|
||||
|
||||
// Approve the pending request from the gateway side.
|
||||
let reqId: string | undefined;
|
||||
for (let i = 0; i < 40; i += 1) {
|
||||
const list = await listNodePairing(baseDir);
|
||||
const req = list.pending.find((p) => p.nodeId === "n4");
|
||||
if (req) {
|
||||
reqId = req.requestId;
|
||||
break;
|
||||
}
|
||||
await new Promise((r) => setTimeout(r, 25));
|
||||
}
|
||||
expect(reqId).toBeTruthy();
|
||||
if (!reqId) throw new Error("expected a pending requestId");
|
||||
const approved = await approveNodePairing(reqId, baseDir);
|
||||
const token = approved?.node?.token ?? "";
|
||||
expect(token.length).toBeGreaterThan(0);
|
||||
|
||||
const line1 = JSON.parse(await readLine()) as { type: string };
|
||||
expect(line1.type).toBe("pair-ok");
|
||||
const line2 = JSON.parse(await readLine()) as { type: string };
|
||||
expect(line2.type).toBe("hello-ok");
|
||||
socket.destroy();
|
||||
|
||||
const socket2 = net.connect({ host: "127.0.0.1", port: server.port });
|
||||
const readLine2 = createLineReader(socket2);
|
||||
sendLine(socket2, {
|
||||
type: "hello",
|
||||
nodeId: "n4",
|
||||
token,
|
||||
displayName: "Different name",
|
||||
platform: "ios",
|
||||
version: "2.0",
|
||||
});
|
||||
const line3 = JSON.parse(await readLine2()) as { type: string };
|
||||
expect(line3.type).toBe("hello-ok");
|
||||
|
||||
for (let i = 0; i < 40; i += 1) {
|
||||
if (lastAuthed?.nodeId === "n4") break;
|
||||
await new Promise((r) => setTimeout(r, 25));
|
||||
}
|
||||
|
||||
expect(lastAuthed?.nodeId).toBe("n4");
|
||||
// Prefer paired metadata over hello payload (token verifies the stored node record).
|
||||
expect(lastAuthed?.displayName).toBe("Iris");
|
||||
expect(lastAuthed?.platform).toBe("ios");
|
||||
expect(lastAuthed?.version).toBe("1.0");
|
||||
expect(lastAuthed?.remoteIp?.includes("127.0.0.1")).toBe(true);
|
||||
|
||||
socket2.destroy();
|
||||
await disconnectedP;
|
||||
expect(disconnected?.nodeId).toBe("n4");
|
||||
expect(disconnected?.remoteIp?.includes("127.0.0.1")).toBe(true);
|
||||
|
||||
await server.close();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -4,6 +4,7 @@ import os from "node:os";
|
||||
import {
|
||||
getPairedNode,
|
||||
listNodePairing,
|
||||
type NodePairingPendingRequest,
|
||||
requestNodePairing,
|
||||
verifyNodeToken,
|
||||
} from "../node-pairing.js";
|
||||
@@ -64,13 +65,24 @@ export type NodeBridgeServer = {
|
||||
close: () => Promise<void>;
|
||||
};
|
||||
|
||||
export type NodeBridgeClientInfo = {
|
||||
nodeId: string;
|
||||
displayName?: string;
|
||||
platform?: string;
|
||||
version?: string;
|
||||
remoteIp?: string;
|
||||
};
|
||||
|
||||
export type NodeBridgeServerOpts = {
|
||||
host: string;
|
||||
port: number; // 0 = ephemeral
|
||||
pairingBaseDir?: string;
|
||||
onEvent?: (nodeId: string, evt: BridgeEventFrame) => Promise<void> | void;
|
||||
onAuthenticated?: (nodeId: string) => Promise<void> | void;
|
||||
onDisconnected?: (nodeId: string) => Promise<void> | void;
|
||||
onAuthenticated?: (node: NodeBridgeClientInfo) => Promise<void> | void;
|
||||
onDisconnected?: (node: NodeBridgeClientInfo) => Promise<void> | void;
|
||||
onPairRequested?: (
|
||||
request: NodePairingPendingRequest,
|
||||
) => Promise<void> | void;
|
||||
serverName?: string;
|
||||
};
|
||||
|
||||
@@ -109,6 +121,7 @@ export async function startNodeBridgeServer(
|
||||
let buffer = "";
|
||||
let isAuthenticated = false;
|
||||
let nodeId: string | null = null;
|
||||
let nodeInfo: NodeBridgeClientInfo | null = null;
|
||||
const invokeWaiters = new Map<
|
||||
string,
|
||||
{
|
||||
@@ -163,15 +176,22 @@ export async function startNodeBridgeServer(
|
||||
token,
|
||||
opts.pairingBaseDir,
|
||||
);
|
||||
if (!verified.ok) {
|
||||
if (!verified.ok || !verified.node) {
|
||||
sendError("UNAUTHORIZED", "invalid token");
|
||||
return;
|
||||
}
|
||||
|
||||
isAuthenticated = true;
|
||||
connections.set(nodeId, socket);
|
||||
nodeInfo = {
|
||||
nodeId,
|
||||
displayName: verified.node.displayName ?? hello.displayName,
|
||||
platform: verified.node.platform ?? hello.platform,
|
||||
version: verified.node.version ?? hello.version,
|
||||
remoteIp: remoteAddress,
|
||||
};
|
||||
send({ type: "hello-ok", serverName } satisfies BridgeHelloOkFrame);
|
||||
await opts.onAuthenticated?.(nodeId);
|
||||
await opts.onAuthenticated?.(nodeInfo);
|
||||
};
|
||||
|
||||
const waitForApproval = async (request: {
|
||||
@@ -227,6 +247,9 @@ export async function startNodeBridgeServer(
|
||||
},
|
||||
opts.pairingBaseDir,
|
||||
);
|
||||
if (result.created) {
|
||||
await opts.onPairRequested?.(result.request);
|
||||
}
|
||||
|
||||
const wait = await waitForApproval(result.request);
|
||||
if (!wait.ok) {
|
||||
@@ -236,9 +259,16 @@ export async function startNodeBridgeServer(
|
||||
|
||||
isAuthenticated = true;
|
||||
connections.set(nodeId, socket);
|
||||
nodeInfo = {
|
||||
nodeId,
|
||||
displayName: req.displayName,
|
||||
platform: req.platform,
|
||||
version: req.version,
|
||||
remoteIp: remoteAddress,
|
||||
};
|
||||
send({ type: "pair-ok", token: wait.token } satisfies BridgePairOkFrame);
|
||||
send({ type: "hello-ok", serverName } satisfies BridgeHelloOkFrame);
|
||||
await opts.onAuthenticated?.(nodeId);
|
||||
await opts.onAuthenticated?.(nodeInfo);
|
||||
};
|
||||
|
||||
const handleEvent = async (evt: BridgeEventFrame) => {
|
||||
@@ -319,9 +349,9 @@ export async function startNodeBridgeServer(
|
||||
});
|
||||
|
||||
socket.on("close", () => {
|
||||
const id = nodeId;
|
||||
const info = nodeInfo;
|
||||
stop();
|
||||
if (id && isAuthenticated) void opts.onDisconnected?.(id);
|
||||
if (info && isAuthenticated) void opts.onDisconnected?.(info);
|
||||
});
|
||||
socket.on("error", () => {
|
||||
// close handler will run after close
|
||||
|
||||
Reference in New Issue
Block a user