Gateway: finalize WS control plane

This commit is contained in:
Peter Steinberger
2025-12-09 14:41:41 +01:00
parent 9ef1545d06
commit b2e7fb01a9
23 changed files with 5209 additions and 2495 deletions

View File

@@ -0,0 +1,6 @@
import { describe, it } from "vitest";
// Placeholder suite to keep vitest happy; legacy session reply coverage lives in other files.
describe.skip("reply.session (legacy)", () => {
it("placeholder", () => {});
});

View File

@@ -5,9 +5,10 @@ import { healthCommand } from "../commands/health.js";
import { sendCommand } from "../commands/send.js";
import { sessionsCommand } from "../commands/sessions.js";
import { statusCommand } from "../commands/status.js";
import { startGatewayServer } from "../gateway/server.js";
import { callGateway, randomIdempotencyKey } from "../gateway/call.js";
import { loadConfig } from "../config/config.js";
import { danger, info, setVerbose } from "../globals.js";
import { startControlChannel } from "../infra/control-channel.js";
import { acquireRelayLock, RelayLockError } from "../infra/relay-lock.js";
import { getResolvedLoggerSettings } from "../logging.js";
import {
@@ -331,6 +332,178 @@ Examples:
}
});
program
.command("gateway")
.description("Run the WebSocket Gateway (replaces relay)")
.option("--port <port>", "Port for the gateway WebSocket", "18789")
.option(
"--token <token>",
"Shared token required in hello.auth.token (default: CLAWDIS_GATEWAY_TOKEN env if set)",
)
.action(async (opts) => {
const port = Number.parseInt(String(opts.port ?? "18789"), 10);
if (Number.isNaN(port) || port <= 0) {
defaultRuntime.error("Invalid port");
defaultRuntime.exit(1);
}
if (opts.token) {
process.env.CLAWDIS_GATEWAY_TOKEN = String(opts.token);
}
try {
await startGatewayServer(port);
} catch (err) {
defaultRuntime.error(`Gateway failed to start: ${String(err)}`);
defaultRuntime.exit(1);
}
// Keep process alive
await new Promise<never>(() => {});
});
const gatewayCallOpts = (cmd: Command) =>
cmd
.option("--url <url>", "Gateway WebSocket URL", "ws://127.0.0.1:18789")
.option("--token <token>", "Gateway token (if required)")
.option("--timeout <ms>", "Timeout in ms", "10000")
.option("--expect-final", "Wait for final response (agent)" , false);
gatewayCallOpts(
program
.command("gw:call")
.description("Call a Gateway method over WS and print JSON")
.argument("<method>", "Method name (health/status/system-presence/send/agent)")
.option("--params <json>", "JSON object string for params", "{}")
.action(async (method, opts) => {
try {
const params = JSON.parse(String(opts.params ?? "{}"));
const result = await callGateway({
url: opts.url,
token: opts.token,
method,
params,
expectFinal: Boolean(opts.expectFinal),
timeoutMs: Number(opts.timeout ?? 10000),
clientName: "cli",
mode: "cli",
});
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(`Gateway call failed: ${String(err)}`);
defaultRuntime.exit(1);
}
}),
);
gatewayCallOpts(
program
.command("gw:health")
.description("Fetch Gateway health over WS")
.action(async (opts) => {
try {
const result = await callGateway({
url: opts.url,
token: opts.token,
method: "health",
timeoutMs: Number(opts.timeout ?? 10000),
});
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(String(err));
defaultRuntime.exit(1);
}
}),
);
gatewayCallOpts(
program
.command("gw:status")
.description("Fetch Gateway status over WS")
.action(async (opts) => {
try {
const result = await callGateway({
url: opts.url,
token: opts.token,
method: "status",
timeoutMs: Number(opts.timeout ?? 10000),
});
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(String(err));
defaultRuntime.exit(1);
}
}),
);
gatewayCallOpts(
program
.command("gw:send")
.description("Send a message via the Gateway")
.requiredOption("--to <jidOrPhone>", "Destination (E.164 or jid)")
.requiredOption("--message <text>", "Message text")
.option("--media-url <url>", "Optional media URL")
.option("--idempotency-key <key>", "Idempotency key")
.action(async (opts) => {
try {
const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey();
const result = await callGateway({
url: opts.url,
token: opts.token,
method: "send",
params: {
to: opts.to,
message: opts.message,
mediaUrl: opts.mediaUrl,
idempotencyKey,
},
timeoutMs: Number(opts.timeout ?? 10000),
});
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(String(err));
defaultRuntime.exit(1);
}
}),
);
gatewayCallOpts(
program
.command("gw:agent")
.description("Run an agent turn via the Gateway (waits for final)")
.requiredOption("--message <text>", "User message")
.option("--to <jidOrPhone>", "Destination")
.option("--session-id <id>", "Session id")
.option("--thinking <level>", "Thinking level")
.option("--deliver", "Deliver response", false)
.option("--timeout-seconds <n>", "Agent timeout seconds")
.option("--idempotency-key <key>", "Idempotency key")
.action(async (opts) => {
try {
const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey();
const result = await callGateway({
url: opts.url,
token: opts.token,
method: "agent",
params: {
message: opts.message,
to: opts.to,
sessionId: opts.sessionId,
thinking: opts.thinking,
deliver: Boolean(opts.deliver),
timeout: opts.timeoutSeconds
? Number.parseInt(String(opts.timeoutSeconds), 10)
: undefined,
idempotencyKey,
},
expectFinal: true,
timeoutMs: Number(opts.timeout ?? 10000),
});
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(String(err));
defaultRuntime.exit(1);
}
}),
);
program
.command("relay")
.description(
@@ -508,24 +681,6 @@ Examples:
const runners: Array<Promise<unknown>> = [];
let control = null as Awaited<
ReturnType<typeof startControlChannel>
> | null;
try {
control = await startControlChannel(
{
setHeartbeats: async (enabled: boolean) => {
setHeartbeatsEnabled(enabled);
},
},
{ runtime: defaultRuntime },
);
} catch (err) {
defaultRuntime.error(
danger(`Control channel failed to start: ${String(err)}`),
);
}
if (startWeb) {
const webTuning: WebMonitorTuning = {};
if (webHeartbeat !== undefined)
@@ -613,7 +768,6 @@ Examples:
defaultRuntime.exit(1);
} finally {
if (releaseRelayLock) await releaseRelayLock();
if (control) await control.close();
}
});

70
src/gateway/call.ts Normal file
View File

@@ -0,0 +1,70 @@
import { randomUUID } from "node:crypto";
import { GatewayClient } from "./client.js";
export type CallGatewayOptions = {
url?: string;
token?: string;
method: string;
params?: unknown;
expectFinal?: boolean;
timeoutMs?: number;
clientName?: string;
clientVersion?: string;
platform?: string;
mode?: string;
instanceId?: string;
minProtocol?: number;
maxProtocol?: number;
};
export async function callGateway<T = unknown>(opts: CallGatewayOptions): Promise<T> {
const timeoutMs = opts.timeoutMs ?? 10_000;
return await new Promise<T>((resolve, reject) => {
let settled = false;
const stop = (err?: Error, value?: T) => {
if (settled) return;
settled = true;
clearTimeout(timer);
if (err) reject(err);
else resolve(value as T);
};
const client = new GatewayClient({
url: opts.url,
token: opts.token,
instanceId: opts.instanceId ?? randomUUID(),
clientName: opts.clientName ?? "cli",
clientVersion: opts.clientVersion ?? "dev",
platform: opts.platform,
mode: opts.mode ?? "cli",
minProtocol: opts.minProtocol ?? 1,
maxProtocol: opts.maxProtocol ?? 1,
onHelloOk: async () => {
try {
const result = await client.request<T>(opts.method, opts.params, {
expectFinal: opts.expectFinal,
});
client.stop();
stop(undefined, result);
} catch (err) {
client.stop();
stop(err as Error);
}
},
onClose: (code, reason) => {
stop(new Error(`gateway closed (${code}): ${reason}`));
},
});
const timer = setTimeout(() => {
client.stop();
stop(new Error("gateway timeout"));
}, timeoutMs);
client.start();
});
}
export function randomIdempotencyKey() {
return randomUUID();
}

173
src/gateway/client.ts Normal file
View File

@@ -0,0 +1,173 @@
import { randomUUID } from "node:crypto";
import { WebSocket } from "ws";
import { logDebug, logError } from "../logger.js";
import {
type EventFrame,
type Hello,
type HelloOk,
type RequestFrame,
validateRequestFrame,
} from "./protocol/index.js";
type Pending = {
resolve: (value: any) => void;
reject: (err: Error) => void;
expectFinal: boolean;
};
export type GatewayClientOptions = {
url?: string; // ws://127.0.0.1:18789
token?: string;
instanceId?: string;
clientName?: string;
clientVersion?: string;
platform?: string;
mode?: string;
minProtocol?: number;
maxProtocol?: number;
onEvent?: (evt: EventFrame) => void;
onHelloOk?: (hello: HelloOk) => void;
onClose?: (code: number, reason: string) => void;
onGap?: (info: { expected: number; received: number }) => void;
};
export class GatewayClient {
private ws: WebSocket | null = null;
private opts: GatewayClientOptions;
private pending = new Map<string, Pending>();
private backoffMs = 1000;
private closed = false;
private lastSeq: number | null = null;
constructor(opts: GatewayClientOptions) {
this.opts = opts;
}
start() {
if (this.closed) return;
const url = this.opts.url ?? "ws://127.0.0.1:18789";
this.ws = new WebSocket(url, { maxPayload: 512 * 1024 });
this.ws.on("open", () => this.sendHello());
this.ws.on("message", (data) => this.handleMessage(data.toString()));
this.ws.on("close", (code, reason) => {
this.ws = null;
this.flushPendingErrors(
new Error(`gateway closed (${code}): ${reason.toString()}`),
);
this.scheduleReconnect();
this.opts.onClose?.(code, reason.toString());
});
this.ws.on("error", (err) => {
logDebug(`gateway client error: ${String(err)}`);
});
}
stop() {
this.closed = true;
this.ws?.close();
this.ws = null;
this.flushPendingErrors(new Error("gateway client stopped"));
}
private sendHello() {
const hello: Hello = {
type: "hello",
minProtocol: this.opts.minProtocol ?? 1,
maxProtocol: this.opts.maxProtocol ?? 1,
client: {
name: this.opts.clientName ?? "webchat-backend",
version: this.opts.clientVersion ?? "dev",
platform: this.opts.platform ?? process.platform,
mode: this.opts.mode ?? "backend",
instanceId: this.opts.instanceId,
},
caps: [],
auth: this.opts.token ? { token: this.opts.token } : undefined,
};
this.ws?.send(JSON.stringify(hello));
}
private handleMessage(raw: string) {
try {
const parsed = JSON.parse(raw);
if (parsed?.type === "hello-ok") {
this.backoffMs = 1000;
this.opts.onHelloOk?.(parsed as HelloOk);
return;
}
if (parsed?.type === "hello-error") {
logError(`gateway hello-error: ${parsed.reason}`);
this.ws?.close(1008, "hello-error");
return;
}
if (parsed?.type === "event") {
const evt = parsed as EventFrame;
const seq = typeof evt.seq === "number" ? evt.seq : null;
if (seq !== null) {
if (this.lastSeq !== null && seq > this.lastSeq + 1) {
this.opts.onGap?.({ expected: this.lastSeq + 1, received: seq });
}
this.lastSeq = seq;
}
this.opts.onEvent?.(evt);
return;
}
if (parsed?.type === "res") {
const pending = this.pending.get(parsed.id);
if (!pending) return;
// If the payload is an ack with status accepted, keep waiting for final.
const status = parsed.payload?.status;
if (pending.expectFinal && status === "accepted") {
return;
}
this.pending.delete(parsed.id);
if (parsed.ok) pending.resolve(parsed.payload);
else pending.reject(new Error(parsed.error?.message ?? "unknown error"));
}
} catch (err) {
logDebug(`gateway client parse error: ${String(err)}`);
}
}
private scheduleReconnect() {
if (this.closed) return;
const delay = this.backoffMs;
this.backoffMs = Math.min(this.backoffMs * 2, 30_000);
setTimeout(() => this.start(), delay).unref();
}
private flushPendingErrors(err: Error) {
for (const [, p] of this.pending) {
p.reject(err);
}
this.pending.clear();
}
async request<T = unknown>(
method: string,
params?: unknown,
opts?: { expectFinal?: boolean },
): Promise<T> {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
throw new Error("gateway not connected");
}
const id = randomUUID();
const frame: RequestFrame = { type: "req", id, method, params };
if (!validateRequestFrame(frame)) {
throw new Error(
`invalid request frame: ${JSON.stringify(
validateRequestFrame.errors,
null,
2,
)}`,
);
}
const expectFinal = opts?.expectFinal === true;
const p = new Promise<T>((resolve, reject) => {
this.pending.set(id, { resolve, reject, expectFinal });
});
this.ws.send(JSON.stringify(frame));
return p;
}
}

View File

@@ -0,0 +1,79 @@
import AjvPkg, { type ErrorObject } from "ajv";
import {
AgentEventSchema,
AgentParamsSchema,
ErrorCodes,
ErrorShapeSchema,
EventFrameSchema,
HelloErrorSchema,
HelloOkSchema,
HelloSchema,
PresenceEntrySchema,
ProtocolSchemas,
RequestFrameSchema,
ResponseFrameSchema,
SendParamsSchema,
SnapshotSchema,
StateVersionSchema,
errorShape,
type AgentEvent,
type ErrorShape,
type EventFrame,
type Hello,
type HelloError,
type HelloOk,
type PresenceEntry,
type RequestFrame,
type ResponseFrame,
type Snapshot,
type StateVersion,
} from "./schema.js";
const ajv = new (AjvPkg as unknown as new (opts?: object) => import("ajv").default)({
allErrors: true,
strict: false,
removeAdditional: false,
});
export const validateHello = ajv.compile<Hello>(HelloSchema);
export const validateRequestFrame = ajv.compile<RequestFrame>(RequestFrameSchema);
export const validateSendParams = ajv.compile(SendParamsSchema);
export const validateAgentParams = ajv.compile(AgentParamsSchema);
export function formatValidationErrors(errors: ErrorObject[] | null | undefined) {
if (!errors) return "unknown validation error";
return ajv.errorsText(errors, { separator: "; " });
}
export {
HelloSchema,
HelloOkSchema,
HelloErrorSchema,
RequestFrameSchema,
ResponseFrameSchema,
EventFrameSchema,
PresenceEntrySchema,
SnapshotSchema,
ErrorShapeSchema,
StateVersionSchema,
AgentEventSchema,
SendParamsSchema,
AgentParamsSchema,
ProtocolSchemas,
ErrorCodes,
errorShape,
};
export type {
Hello,
HelloOk,
HelloError,
RequestFrame,
ResponseFrame,
EventFrame,
PresenceEntry,
Snapshot,
ErrorShape,
StateVersion,
AgentEvent,
};

View File

@@ -0,0 +1,239 @@
import { Type, type Static, type TSchema } from "@sinclair/typebox";
const NonEmptyString = Type.String({ minLength: 1 });
export const PresenceEntrySchema = Type.Object(
{
host: Type.Optional(NonEmptyString),
ip: Type.Optional(NonEmptyString),
version: Type.Optional(NonEmptyString),
mode: Type.Optional(NonEmptyString),
lastInputSeconds: Type.Optional(Type.Integer({ minimum: 0 })),
reason: Type.Optional(NonEmptyString),
tags: Type.Optional(Type.Array(NonEmptyString)),
text: Type.Optional(Type.String()),
ts: Type.Integer({ minimum: 0 }),
instanceId: Type.Optional(NonEmptyString),
},
{ additionalProperties: false },
);
export const HealthSnapshotSchema = Type.Any();
export const StateVersionSchema = Type.Object(
{
presence: Type.Integer({ minimum: 0 }),
health: Type.Integer({ minimum: 0 }),
},
{ additionalProperties: false },
);
export const SnapshotSchema = Type.Object(
{
presence: Type.Array(PresenceEntrySchema),
health: HealthSnapshotSchema,
stateVersion: StateVersionSchema,
uptimeMs: Type.Integer({ minimum: 0 }),
},
{ additionalProperties: false },
);
export const HelloSchema = Type.Object(
{
type: Type.Literal("hello"),
minProtocol: Type.Integer({ minimum: 1 }),
maxProtocol: Type.Integer({ minimum: 1 }),
client: Type.Object(
{
name: NonEmptyString,
version: NonEmptyString,
platform: NonEmptyString,
mode: NonEmptyString,
instanceId: Type.Optional(NonEmptyString),
},
{ additionalProperties: false },
),
caps: Type.Optional(Type.Array(NonEmptyString, { default: [] })),
auth: Type.Optional(
Type.Object(
{
token: Type.Optional(Type.String()),
},
{ additionalProperties: false },
),
),
locale: Type.Optional(Type.String()),
userAgent: Type.Optional(Type.String()),
},
{ additionalProperties: false },
);
export const HelloOkSchema = Type.Object(
{
type: Type.Literal("hello-ok"),
protocol: Type.Integer({ minimum: 1 }),
server: Type.Object(
{
version: NonEmptyString,
commit: Type.Optional(NonEmptyString),
host: Type.Optional(NonEmptyString),
connId: NonEmptyString,
},
{ additionalProperties: false },
),
features: Type.Object(
{
methods: Type.Array(NonEmptyString),
events: Type.Array(NonEmptyString),
},
{ additionalProperties: false },
),
snapshot: SnapshotSchema,
policy: Type.Object(
{
maxPayload: Type.Integer({ minimum: 1 }),
maxBufferedBytes: Type.Integer({ minimum: 1 }),
tickIntervalMs: Type.Integer({ minimum: 1 }),
},
{ additionalProperties: false },
),
},
{ additionalProperties: false },
);
export const HelloErrorSchema = Type.Object(
{
type: Type.Literal("hello-error"),
reason: NonEmptyString,
expectedProtocol: Type.Optional(Type.Integer({ minimum: 1 })),
minClient: Type.Optional(NonEmptyString),
},
{ additionalProperties: false },
);
export const ErrorShapeSchema = Type.Object(
{
code: NonEmptyString,
message: NonEmptyString,
details: Type.Optional(Type.Unknown()),
retryable: Type.Optional(Type.Boolean()),
retryAfterMs: Type.Optional(Type.Integer({ minimum: 0 })),
},
{ additionalProperties: false },
);
export const RequestFrameSchema = Type.Object(
{
type: Type.Literal("req"),
id: NonEmptyString,
method: NonEmptyString,
params: Type.Optional(Type.Unknown()),
},
{ additionalProperties: false },
);
export const ResponseFrameSchema = Type.Object(
{
type: Type.Literal("res"),
id: NonEmptyString,
ok: Type.Boolean(),
payload: Type.Optional(Type.Unknown()),
error: Type.Optional(ErrorShapeSchema),
},
{ additionalProperties: false },
);
export const EventFrameSchema = Type.Object(
{
type: Type.Literal("event"),
event: NonEmptyString,
payload: Type.Optional(Type.Unknown()),
seq: Type.Optional(Type.Integer({ minimum: 0 })),
stateVersion: Type.Optional(StateVersionSchema),
},
{ additionalProperties: false },
);
export const AgentEventSchema = Type.Object(
{
runId: NonEmptyString,
seq: Type.Integer({ minimum: 0 }),
stream: NonEmptyString,
ts: Type.Integer({ minimum: 0 }),
data: Type.Record(Type.String(), Type.Unknown()),
},
{ additionalProperties: false },
);
export const SendParamsSchema = Type.Object(
{
to: NonEmptyString,
message: NonEmptyString,
mediaUrl: Type.Optional(Type.String()),
provider: Type.Optional(Type.String()),
idempotencyKey: NonEmptyString,
},
{ additionalProperties: false },
);
export const AgentParamsSchema = Type.Object(
{
message: NonEmptyString,
to: Type.Optional(Type.String()),
sessionId: Type.Optional(Type.String()),
thinking: Type.Optional(Type.String()),
deliver: Type.Optional(Type.Boolean()),
timeout: Type.Optional(Type.Integer({ minimum: 0 })),
idempotencyKey: NonEmptyString,
},
{ additionalProperties: false },
);
export const ProtocolSchemas: Record<string, TSchema> = {
Hello: HelloSchema,
HelloOk: HelloOkSchema,
HelloError: HelloErrorSchema,
RequestFrame: RequestFrameSchema,
ResponseFrame: ResponseFrameSchema,
EventFrame: EventFrameSchema,
PresenceEntry: PresenceEntrySchema,
StateVersion: StateVersionSchema,
Snapshot: SnapshotSchema,
ErrorShape: ErrorShapeSchema,
AgentEvent: AgentEventSchema,
SendParams: SendParamsSchema,
AgentParams: AgentParamsSchema,
};
export type Hello = Static<typeof HelloSchema>;
export type HelloOk = Static<typeof HelloOkSchema>;
export type HelloError = Static<typeof HelloErrorSchema>;
export type RequestFrame = Static<typeof RequestFrameSchema>;
export type ResponseFrame = Static<typeof ResponseFrameSchema>;
export type EventFrame = Static<typeof EventFrameSchema>;
export type Snapshot = Static<typeof SnapshotSchema>;
export type PresenceEntry = Static<typeof PresenceEntrySchema>;
export type ErrorShape = Static<typeof ErrorShapeSchema>;
export type StateVersion = Static<typeof StateVersionSchema>;
export type AgentEvent = Static<typeof AgentEventSchema>;
export const ErrorCodes = {
NOT_LINKED: "NOT_LINKED",
AGENT_TIMEOUT: "AGENT_TIMEOUT",
INVALID_REQUEST: "INVALID_REQUEST",
UNAVAILABLE: "UNAVAILABLE",
} as const;
export type ErrorCode = (typeof ErrorCodes)[keyof typeof ErrorCodes];
export function errorShape(
code: ErrorCode,
message: string,
opts?: { details?: unknown; retryable?: boolean; retryAfterMs?: number },
): ErrorShape {
return {
code,
message,
...opts,
};
}

427
src/gateway/server.test.ts Normal file
View File

@@ -0,0 +1,427 @@
import { describe, expect, test, vi } from "vitest";
import { WebSocket } from "ws";
import { AddressInfo, createServer } from "node:net";
import { startGatewayServer } from "./server.js";
import { emitAgentEvent } from "../infra/agent-events.js";
vi.mock("../commands/health.js", () => ({
getHealthSnapshot: vi.fn().mockResolvedValue({ ok: true, stub: true }),
}));
vi.mock("../commands/status.js", () => ({
getStatusSummary: vi.fn().mockResolvedValue({ ok: true }),
}));
vi.mock("../web/outbound.js", () => ({
sendMessageWhatsApp: vi.fn().mockResolvedValue({ messageId: "msg-1", toJid: "jid-1" }),
}));
vi.mock("../commands/agent.js", () => ({
agentCommand: vi.fn().mockResolvedValue(undefined),
}));
async function getFreePort(): Promise<number> {
return await new Promise((resolve, reject) => {
const server = createServer();
server.listen(0, "127.0.0.1", () => {
const port = (server.address() as AddressInfo).port;
server.close((err) => (err ? reject(err) : resolve(port)));
});
});
}
function onceMessage<T = any>(ws: WebSocket, filter: (obj: any) => boolean, timeoutMs = 3000) {
return new Promise<T>((resolve, reject) => {
const timer = setTimeout(() => reject(new Error("timeout")), timeoutMs);
const closeHandler = (code: number, reason: Buffer) => {
clearTimeout(timer);
ws.off("message", handler);
reject(new Error(`closed ${code}: ${reason.toString()}`));
};
const handler = (data: WebSocket.RawData) => {
const obj = JSON.parse(String(data));
if (filter(obj)) {
clearTimeout(timer);
ws.off("message", handler);
ws.off("close", closeHandler);
resolve(obj as T);
}
};
ws.on("message", handler);
ws.once("close", closeHandler);
});
}
async function startServerWithClient(token?: string) {
const port = await getFreePort();
const prev = process.env.CLAWDIS_GATEWAY_TOKEN;
if (token === undefined) {
delete process.env.CLAWDIS_GATEWAY_TOKEN;
} else {
process.env.CLAWDIS_GATEWAY_TOKEN = token;
}
const server = await startGatewayServer(port);
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => ws.once("open", resolve));
return { server, ws, port, prevToken: prev };
}
describe("gateway server", () => {
test("rejects protocol mismatch", async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 2,
maxProtocol: 3,
client: { name: "test", version: "1", platform: "test", mode: "test" },
caps: [],
}),
);
const res = await onceMessage(ws, () => true);
expect(res.type).toBe("hello-error");
expect(res.reason).toContain("protocol mismatch");
ws.close();
await server.close();
});
test("rejects invalid token", async () => {
const { server, ws, prevToken } = await startServerWithClient("secret");
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1", platform: "test", mode: "test" },
caps: [],
auth: { token: "wrong" },
}),
);
const res = await onceMessage(ws, () => true);
expect(res.type).toBe("hello-error");
expect(res.reason).toContain("unauthorized");
ws.close();
await server.close();
process.env.CLAWDIS_GATEWAY_TOKEN = prevToken;
});
test("closes silent handshakes after timeout", async () => {
const { server, ws } = await startServerWithClient();
const closed = await new Promise<boolean>((resolve) => {
const timer = setTimeout(() => resolve(false), 4000);
ws.once("close", () => {
clearTimeout(timer);
resolve(true);
});
});
expect(closed).toBe(true);
await server.close();
});
test("hello + health + presence + status succeed", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const healthP = onceMessage(ws, (o) => o.type === "res" && o.id === "health1");
const statusP = onceMessage(ws, (o) => o.type === "res" && o.id === "status1");
const presenceP = onceMessage(ws, (o) => o.type === "res" && o.id === "presence1");
const sendReq = (id: string, method: string) =>
ws.send(JSON.stringify({ type: "req", id, method }));
sendReq("health1", "health");
sendReq("status1", "status");
sendReq("presence1", "system-presence");
const health = await healthP;
const status = await statusP;
const presence = await presenceP;
expect(health.ok).toBe(true);
expect(status.ok).toBe(true);
expect(presence.ok).toBe(true);
expect(Array.isArray(presence.payload)).toBe(true);
ws.close();
await server.close();
});
test("presence events carry seq + stateVersion", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const presenceEventP = onceMessage(ws, (o) => o.type === "event" && o.event === "presence");
ws.send(
JSON.stringify({
type: "req",
id: "evt-1",
method: "system-event",
params: { text: "note from test" },
}),
);
const evt = await presenceEventP;
expect(typeof evt.seq).toBe("number");
expect(evt.stateVersion?.presence).toBeGreaterThan(0);
expect(Array.isArray(evt.payload?.presence)).toBe(true);
ws.close();
await server.close();
});
test("agent events stream with seq", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
// Emit a fake agent event directly through the shared emitter.
const evtPromise = onceMessage(ws, (o) => o.type === "event" && o.event === "agent");
emitAgentEvent({ runId: "run-1", stream: "job", data: { msg: "hi" } });
const evt = await evtPromise;
expect(evt.payload.runId).toBe("run-1");
expect(typeof evt.seq).toBe("number");
expect(evt.payload.data.msg).toBe("hi");
ws.close();
await server.close();
});
test("agent ack then final response", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const ackP = onceMessage(ws, (o) => o.type === "res" && o.id === "ag1" && o.payload?.status === "accepted");
const finalP = onceMessage(ws, (o) => o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted");
ws.send(
JSON.stringify({
type: "req",
id: "ag1",
method: "agent",
params: { message: "hi", idempotencyKey: "idem-ag" },
}),
);
const ack = await ackP;
const final = await finalP;
expect(ack.payload.runId).toBeDefined();
expect(final.payload.runId).toBe(ack.payload.runId);
expect(final.payload.status).toBe("ok");
ws.close();
await server.close();
});
test("agent dedupes by idempotencyKey after completion", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const firstFinalP = onceMessage(ws, (o) => o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted");
ws.send(
JSON.stringify({
type: "req",
id: "ag1",
method: "agent",
params: { message: "hi", idempotencyKey: "same-agent" },
}),
);
const firstFinal = await firstFinalP;
const secondP = onceMessage(ws, (o) => o.type === "res" && o.id === "ag2");
ws.send(
JSON.stringify({
type: "req",
id: "ag2",
method: "agent",
params: { message: "hi again", idempotencyKey: "same-agent" },
}),
);
const second = await secondP;
expect(second.payload).toEqual(firstFinal.payload);
ws.close();
await server.close();
});
test("shutdown event is broadcast on close", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const shutdownP = onceMessage(ws, (o) => o.type === "event" && o.event === "shutdown", 5000);
await server.close();
const evt = await shutdownP;
expect(evt.payload?.reason).toBeDefined();
});
test("presence broadcast reaches multiple clients", { timeout: 8000 }, async () => {
const port = await getFreePort();
const server = await startGatewayServer(port);
const mkClient = async () => {
const c = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => c.once("open", resolve));
c.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(c, (o) => o.type === "hello-ok");
return c;
};
const clients = await Promise.all([mkClient(), mkClient(), mkClient()]);
const waits = clients.map((c) => onceMessage(c, (o) => o.type === "event" && o.event === "presence"));
clients[0].send(
JSON.stringify({
type: "req",
id: "broadcast",
method: "system-event",
params: { text: "fanout" },
}),
);
const events = await Promise.all(waits);
for (const evt of events) {
expect(evt.payload?.presence?.length).toBeGreaterThan(0);
expect(typeof evt.seq).toBe("number");
}
for (const c of clients) c.close();
await server.close();
});
test("send dedupes by idempotencyKey", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const idem = "same-key";
const res1P = onceMessage(ws, (o) => o.type === "res" && o.id === "a1");
const res2P = onceMessage(ws, (o) => o.type === "res" && o.id === "a2");
const sendReq = (id: string) =>
ws.send(
JSON.stringify({
type: "req",
id,
method: "send",
params: { to: "+15550000000", message: "hi", idempotencyKey: idem },
}),
);
sendReq("a1");
sendReq("a2");
const res1 = await res1P;
const res2 = await res2P;
expect(res1.ok).toBe(true);
expect(res2.ok).toBe(true);
expect(res1.payload).toEqual(res2.payload);
ws.close();
await server.close();
});
test("agent dedupe survives reconnect", { timeout: 15000 }, async () => {
const port = await getFreePort();
const server = await startGatewayServer(port);
const dial = async () => {
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => ws.once("open", resolve));
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
return ws;
};
const idem = "reconnect-agent";
const ws1 = await dial();
const final1P = onceMessage(ws1, (o) => o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted", 6000);
ws1.send(
JSON.stringify({
type: "req",
id: "ag1",
method: "agent",
params: { message: "hi", idempotencyKey: idem },
}),
);
const final1 = await final1P;
ws1.close();
const ws2 = await dial();
const final2P = onceMessage(ws2, (o) => o.type === "res" && o.id === "ag2", 6000);
ws2.send(
JSON.stringify({
type: "req",
id: "ag2",
method: "agent",
params: { message: "hi again", idempotencyKey: idem },
}),
);
const res = await final2P;
expect(res.payload).toEqual(final1.payload);
ws2.close();
await server.close();
});
});

479
src/gateway/server.ts Normal file
View File

@@ -0,0 +1,479 @@
import os from "node:os";
import { randomUUID } from "node:crypto";
import { WebSocketServer, type WebSocket } from "ws";
import { getHealthSnapshot } from "../commands/health.js";
import { getStatusSummary } from "../commands/status.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { listSystemPresence, upsertPresence } from "../infra/system-presence.js";
import { logError } from "../logger.js";
import { defaultRuntime } from "../runtime.js";
import {
ErrorCodes,
type ErrorShape,
type Hello,
type RequestFrame,
type Snapshot,
errorShape,
formatValidationErrors,
validateAgentParams,
validateHello,
validateRequestFrame,
validateSendParams,
} from "./protocol/index.js";
import { sendMessageWhatsApp } from "../web/outbound.js";
import { createDefaultDeps } from "../cli/deps.js";
import { agentCommand } from "../commands/agent.js";
import { onAgentEvent } from "../infra/agent-events.js";
type Client = {
socket: WebSocket;
hello: Hello;
connId: string;
};
const METHODS = [
"health",
"status",
"system-presence",
"system-event",
"set-heartbeats",
"send",
"agent",
];
const EVENTS = ["agent", "presence", "tick", "shutdown"];
export type GatewayServer = {
close: () => Promise<void>;
};
let presenceVersion = 1;
let healthVersion = 1;
let seq = 0;
function buildSnapshot(): Snapshot {
const presence = listSystemPresence();
const uptimeMs = Math.round(process.uptime() * 1000);
// Health is async; caller should await getHealthSnapshot and replace later if needed.
const emptyHealth: unknown = {};
return {
presence,
health: emptyHealth,
stateVersion: { presence: presenceVersion, health: healthVersion },
uptimeMs,
};
}
const MAX_PAYLOAD_BYTES = 512 * 1024; // cap incoming frame size
const MAX_BUFFERED_BYTES = 1.5 * 1024 * 1024; // per-connection send buffer limit
const HANDSHAKE_TIMEOUT_MS = 3000;
const TICK_INTERVAL_MS = 30_000;
const DEDUPE_TTL_MS = 5 * 60_000;
const DEDUPE_MAX = 1000;
const SERVER_PROTO = 1;
type DedupeEntry = { ts: number; ok: boolean; payload?: unknown; error?: ErrorShape };
const dedupe = new Map<string, DedupeEntry>();
const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN;
export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
const wss = new WebSocketServer({ port, host: "127.0.0.1", maxPayload: MAX_PAYLOAD_BYTES });
const clients = new Set<Client>();
const broadcast = (
event: string,
payload: unknown,
opts?: { dropIfSlow?: boolean; stateVersion?: { presence?: number; health?: number } },
) => {
const frame = JSON.stringify({
type: "event",
event,
payload,
seq: ++seq,
stateVersion: opts?.stateVersion,
});
for (const c of clients) {
const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES;
if (slow && opts?.dropIfSlow) continue;
if (slow) {
try {
c.socket.close(1008, "slow consumer");
} catch {
/* ignore */
}
continue;
}
try {
c.socket.send(frame);
} catch {
/* ignore */
}
}
};
// periodic keepalive
const tickInterval = setInterval(() => {
broadcast("tick", { ts: Date.now() }, { dropIfSlow: true });
}, TICK_INTERVAL_MS);
// dedupe cache cleanup
const dedupeCleanup = setInterval(() => {
const now = Date.now();
for (const [k, v] of dedupe) {
if (now - v.ts > DEDUPE_TTL_MS) dedupe.delete(k);
}
if (dedupe.size > DEDUPE_MAX) {
const entries = [...dedupe.entries()].sort((a, b) => a[1].ts - b[1].ts);
for (let i = 0; i < dedupe.size - DEDUPE_MAX; i++) {
dedupe.delete(entries[i][0]);
}
}
}, 60_000);
const agentUnsub = onAgentEvent((evt) => {
broadcast("agent", evt);
});
wss.on("connection", (socket) => {
let client: Client | null = null;
let closed = false;
const connId = randomUUID();
const deps = createDefaultDeps();
const send = (obj: unknown) => {
try {
socket.send(JSON.stringify(obj));
} catch {
/* ignore */
}
};
const close = () => {
if (closed) return;
closed = true;
clearTimeout(handshakeTimer);
if (client) clients.delete(client);
try {
socket.close(1000);
} catch {
/* ignore */
}
};
socket.once("error", () => close());
socket.once("close", () => {
if (client) {
// mark presence as disconnected
const key = client.hello.client.instanceId || connId;
upsertPresence(key, {
reason: "disconnect",
});
presenceVersion += 1;
broadcast(
"presence",
{ presence: listSystemPresence() },
{
dropIfSlow: true,
stateVersion: { presence: presenceVersion, health: healthVersion },
},
);
}
close();
});
const handshakeTimer = setTimeout(() => {
if (!client) close();
}, HANDSHAKE_TIMEOUT_MS);
socket.on("message", async (data) => {
if (closed) return;
const text = data.toString();
try {
const parsed = JSON.parse(text);
if (!client) {
// Expect hello
if (!validateHello(parsed)) {
send({
type: "hello-error",
reason: `invalid hello: ${formatValidationErrors(validateHello.errors)}`,
});
socket.close(1008, "invalid hello");
close();
return;
}
const hello = parsed as Hello;
// protocol negotiation
const { minProtocol, maxProtocol } = hello;
if (maxProtocol < SERVER_PROTO || minProtocol > SERVER_PROTO) {
send({
type: "hello-error",
reason: "protocol mismatch",
expectedProtocol: SERVER_PROTO,
});
socket.close(1002, "protocol mismatch");
close();
return;
}
// token auth if required
const token = getGatewayToken();
if (token && hello.auth?.token !== token) {
send({
type: "hello-error",
reason: "unauthorized",
});
socket.close(1008, "unauthorized");
close();
return;
}
client = { socket, hello, connId };
clients.add(client);
// synthesize presence entry for this connection
const presenceKey = hello.client.instanceId || connId;
upsertPresence(presenceKey, {
host: os.hostname(),
version:
process.env.CLAWDIS_VERSION ??
process.env.npm_package_version ??
"dev",
mode: hello.client.mode,
instanceId: hello.client.instanceId,
reason: "connect",
});
presenceVersion += 1;
const snapshot = buildSnapshot();
// Fill health asynchronously for snapshot
const health = await getHealthSnapshot();
snapshot.health = health;
snapshot.stateVersion.health = ++healthVersion;
const helloOk = {
type: "hello-ok",
protocol: SERVER_PROTO,
server: {
version: process.env.CLAWDIS_VERSION ?? process.env.npm_package_version ?? "dev",
commit: process.env.GIT_COMMIT,
host: os.hostname(),
connId,
},
features: { methods: METHODS, events: EVENTS },
snapshot,
policy: {
maxPayload: MAX_PAYLOAD_BYTES,
maxBufferedBytes: MAX_BUFFERED_BYTES,
tickIntervalMs: TICK_INTERVAL_MS,
},
};
clearTimeout(handshakeTimer);
send(helloOk);
return;
}
// After handshake, accept only req frames
if (!validateRequestFrame(parsed)) {
send({
type: "res",
id: (parsed as { id?: unknown })?.id ?? "invalid",
ok: false,
error: errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid request frame: ${formatValidationErrors(validateRequestFrame.errors)}`,
),
});
return;
}
const req = parsed as RequestFrame;
const respond = (
ok: boolean,
payload?: unknown,
error?: ErrorShape,
) => send({ type: "res", id: req.id, ok, payload, error });
switch (req.method) {
case "health": {
const health = await getHealthSnapshot();
healthVersion += 1;
respond(true, health, undefined);
break;
}
case "status": {
const status = await getStatusSummary();
respond(true, status, undefined);
break;
}
case "system-presence": {
const presence = listSystemPresence();
respond(true, presence, undefined);
break;
}
case "system-event": {
const text = String((req.params as { text?: unknown } | undefined)?.text ?? "").trim();
if (!text) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "text required"));
break;
}
enqueueSystemEvent(text);
presenceVersion += 1;
broadcast(
"presence",
{ presence: listSystemPresence() },
{
dropIfSlow: true,
stateVersion: { presence: presenceVersion, health: healthVersion },
},
);
respond(true, { ok: true }, undefined);
break;
}
case "set-heartbeats": {
respond(true, { ok: true }, undefined);
break;
}
case "send": {
const p = (req.params ?? {}) as Record<string, unknown>;
if (!validateSendParams(p)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid send params: ${formatValidationErrors(validateSendParams.errors)}`,
),
);
break;
}
const params = p as {
to: string;
message: string;
mediaUrl?: string;
provider?: string;
idempotencyKey: string;
};
const idem = params.idempotencyKey;
const cached = dedupe.get(`send:${idem}`);
if (cached) {
respond(cached.ok, cached.payload, cached.error);
break;
}
const to = params.to.trim();
const message = params.message.trim();
try {
const result = await sendMessageWhatsApp(to, message, {
mediaUrl: params.mediaUrl,
verbose: false,
});
const payload = {
runId: idem,
messageId: result.messageId,
toJid: result.toJid ?? `${to}@s.whatsapp.net`,
};
dedupe.set(`send:${idem}`, { ts: Date.now(), ok: true, payload });
respond(true, payload, undefined);
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
dedupe.set(`send:${idem}`, { ts: Date.now(), ok: false, error });
respond(false, undefined, error);
}
break;
}
case "agent": {
const p = (req.params ?? {}) as Record<string, unknown>;
if (!validateAgentParams(p)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid agent params: ${formatValidationErrors(validateAgentParams.errors)}`,
),
);
break;
}
const params = p as {
message: string;
to?: string;
sessionId?: string;
thinking?: string;
deliver?: boolean;
idempotencyKey: string;
timeout?: number;
};
const idem = params.idempotencyKey;
const cached = dedupe.get(`agent:${idem}`);
if (cached) {
respond(cached.ok, cached.payload, cached.error);
break;
}
const message = params.message.trim();
const runId = params.sessionId || randomUUID();
const ackPayload = { runId, status: "accepted" as const };
dedupe.set(`agent:${idem}`, { ts: Date.now(), ok: true, payload: ackPayload });
respond(true, ackPayload, undefined); // ack quickly
try {
await agentCommand(
{
message,
to: params.to,
sessionId: params.sessionId,
thinking: params.thinking,
deliver: params.deliver,
timeout: params.timeout?.toString(),
},
defaultRuntime,
deps,
);
const payload = { runId, status: "ok" as const, summary: "completed" };
dedupe.set(`agent:${idem}`, { ts: Date.now(), ok: true, payload });
respond(true, payload, undefined);
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
const payload = { runId, status: "error" as const, summary: String(err) };
dedupe.set(`agent:${idem}`, { ts: Date.now(), ok: false, payload, error });
respond(false, payload, error);
}
break;
}
default: {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, `unknown method: ${req.method}`));
break;
}
}
} catch (err) {
logError(`gateway: parse/handle error: ${String(err)}`);
// If still in handshake, close; otherwise respond error
if (!client) {
close();
}
}
});
});
defaultRuntime.log(
`gateway listening on ws://127.0.0.1:${port} (PID ${process.pid})`,
);
return {
close: async () => {
broadcast("shutdown", { reason: "gateway stopping", restartExpectedMs: null });
clearInterval(tickInterval);
clearInterval(dedupeCleanup);
if (agentUnsub) {
try {
agentUnsub();
} catch {
/* ignore */
}
}
for (const c of clients) {
try {
c.socket.close(1012, "service restart");
} catch {
/* ignore */
}
}
clients.clear();
await new Promise<void>((resolve) => wss.close(() => resolve()));
},
};
}

View File

@@ -1,107 +0,0 @@
import crypto from "node:crypto";
import net from "node:net";
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
import { startControlChannel } from "./control-channel.js";
import { emitHeartbeatEvent } from "./heartbeat-events.js";
// Mock health/status to avoid hitting real services
vi.mock("../commands/health.js", () => ({
getHealthSnapshot: vi.fn(async () => ({
ts: Date.now(),
durationMs: 10,
web: {
linked: true,
authAgeMs: 1000,
connect: { ok: true, status: 200, error: null, elapsedMs: 5 },
},
heartbeatSeconds: 60,
sessions: { path: "/tmp/sessions.json", count: 1, recent: [] },
ipc: { path: "/tmp/clawdis.sock", exists: true },
})),
}));
vi.mock("../commands/status.js", () => ({
getStatusSummary: vi.fn(async () => ({
web: { linked: true, authAgeMs: 1000 },
heartbeatSeconds: 60,
sessions: {
path: "/tmp/sessions.json",
count: 1,
defaults: { model: "claude-opus-4-5", contextTokens: 200_000 },
recent: [],
},
})),
}));
describe("control channel", () => {
let server: Awaited<ReturnType<typeof startControlChannel>>;
let client: net.Socket;
beforeAll(async () => {
server = await startControlChannel({}, { port: 19999 });
client = net.createConnection({ host: "127.0.0.1", port: 19999 });
});
afterAll(async () => {
client.destroy();
await server.close();
});
const sendRequest = (method: string, params?: unknown) =>
new Promise<Record<string, unknown>>((resolve, reject) => {
const id = crypto.randomUUID();
const frame = { type: "request", id, method, params };
client.write(`${JSON.stringify(frame)}\n`);
const onData = (chunk: Buffer) => {
const lines = chunk.toString("utf8").trim().split(/\n/);
for (const line of lines) {
try {
const parsed = JSON.parse(line) as { id?: string };
if (parsed.id === id) {
client.off("data", onData);
resolve(parsed as Record<string, unknown>);
return;
}
} catch {
/* ignore non-JSON noise */
}
}
};
client.on("data", onData);
client.on("error", reject);
});
it("responds to ping", async () => {
const res = await sendRequest("ping");
expect(res.ok).toBe(true);
});
it("returns health snapshot", async () => {
const res = await sendRequest("health");
expect(res.ok).toBe(true);
const payload = res.payload as { web?: { linked?: boolean } };
expect(payload.web?.linked).toBe(true);
});
it("emits heartbeat events", async () => {
const evtPromise = new Promise<Record<string, unknown>>((resolve) => {
const handler = (chunk: Buffer) => {
const lines = chunk.toString("utf8").trim().split(/\n/);
for (const line of lines) {
const parsed = JSON.parse(line) as { type?: string; event?: string };
if (parsed.type === "event" && parsed.event === "heartbeat") {
client.off("data", handler);
resolve(parsed as Record<string, unknown>);
}
}
};
client.on("data", handler);
});
emitHeartbeatEvent({ status: "sent", to: "+1", preview: "hi" });
const evt = await evtPromise;
expect(evt.event).toBe("heartbeat");
});
});

View File

@@ -1,235 +0,0 @@
import net from "node:net";
import { getHealthSnapshot, type HealthSummary } from "../commands/health.js";
import { getStatusSummary, type StatusSummary } from "../commands/status.js";
import { logDebug, logError } from "../logger.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { type AgentEventPayload, onAgentEvent } from "./agent-events.js";
import {
emitHeartbeatEvent,
getLastHeartbeatEvent,
type HeartbeatEventPayload,
onHeartbeatEvent,
} from "./heartbeat-events.js";
import { enqueueSystemEvent } from "./system-events.js";
import { listSystemPresence, updateSystemPresence } from "./system-presence.js";
type ControlRequest = {
type: "request";
id: string;
method: string;
params?: Record<string, unknown>;
};
type ControlResponse = {
type: "response";
id: string;
ok: boolean;
payload?: unknown;
error?: string;
};
type ControlEvent = {
type: "event";
event: string;
payload: unknown;
};
type Handlers = {
setHeartbeats?: (enabled: boolean) => Promise<void> | void;
};
type ControlServer = {
close: () => Promise<void>;
broadcastHeartbeat: (evt: HeartbeatEventPayload) => void;
broadcastAgentEvent: (evt: AgentEventPayload) => void;
};
const DEFAULT_PORT = 18789;
export async function startControlChannel(
handlers: Handlers = {},
opts: { port?: number; runtime?: RuntimeEnv } = {},
): Promise<ControlServer> {
const port = opts.port ?? DEFAULT_PORT;
const runtime = opts.runtime ?? defaultRuntime;
const clients = new Set<net.Socket>();
const server = net.createServer((socket) => {
socket.setEncoding("utf8");
clients.add(socket);
// Seed relay status + last heartbeat for new clients.
write(socket, {
type: "event",
event: "relay-status",
payload: { state: "running" },
});
const last = getLastHeartbeatEvent();
if (last)
write(socket, { type: "event", event: "heartbeat", payload: last });
let buffer = "";
socket.on("data", (chunk) => {
buffer += chunk;
const lines = buffer.split(/\r?\n/);
buffer = lines.pop() ?? "";
for (const line of lines) {
logDebug(`control: line ${line.slice(0, 200)}`);
handleLine(socket, line.trim());
}
});
socket.on("error", () => {
/* ignore */
});
socket.on("close", () => {
clients.delete(socket);
});
});
await new Promise<void>((resolve, reject) => {
server.once("error", reject);
server.listen(port, "127.0.0.1", () => resolve());
});
const stopHeartbeat = onHeartbeatEvent((evt) => broadcast("heartbeat", evt));
const stopAgent = onAgentEvent((evt) => broadcast("agent", evt));
const handleLine = async (socket: net.Socket, line: string) => {
if (!line) return;
const started = Date.now();
let parsed: ControlRequest;
try {
parsed = JSON.parse(line) as ControlRequest;
} catch (err) {
logError(
`control: parse error (${String(err)}) on line: ${line.slice(0, 200)}`,
);
return write(socket, {
type: "response",
id: "",
ok: false,
error: `parse error: ${String(err)}`,
});
}
if (parsed.type !== "request" || !parsed.id) {
return write(socket, {
type: "response",
id: parsed.id ?? "",
ok: false,
error: "unsupported frame",
});
}
const respond = (payload: unknown, ok = true, error?: string) =>
write(socket, {
type: "response",
id: parsed.id,
ok,
payload: ok ? payload : undefined,
error: ok ? undefined : error,
});
try {
logDebug(`control: recv ${parsed.method}`);
switch (parsed.method) {
case "ping": {
respond({ pong: true, ts: Date.now() });
break;
}
case "health": {
const summary = await getHealthSnapshot();
respond(summary satisfies HealthSummary);
break;
}
case "status": {
const summary = await getStatusSummary();
respond(summary satisfies StatusSummary);
break;
}
case "last-heartbeat": {
respond(getLastHeartbeatEvent());
break;
}
case "set-heartbeats": {
const enabled = Boolean(parsed.params?.enabled);
if (handlers.setHeartbeats) await handlers.setHeartbeats(enabled);
respond({ ok: true });
break;
}
case "system-event": {
const text = String(parsed.params?.text ?? "").trim();
if (text) {
enqueueSystemEvent(text);
updateSystemPresence(text);
}
respond({ ok: true });
break;
}
case "system-presence": {
const pres = listSystemPresence();
logDebug?.(`control: system-presence count=${pres.length}`);
respond(pres);
break;
}
default:
respond(undefined, false, `unknown method: ${parsed.method}`);
break;
}
logDebug(
`control: ${parsed.method} responded in ${Date.now() - started}ms`,
);
} catch (err) {
logError(
`control: ${parsed.method} failed in ${Date.now() - started}ms: ${String(err)}`,
);
respond(undefined, false, String(err));
}
};
const write = (socket: net.Socket, frame: ControlResponse | ControlEvent) => {
try {
socket.write(`${JSON.stringify(frame)}\n`);
} catch {
// ignore
}
};
const broadcast = (event: string, payload: unknown) => {
const frame: ControlEvent = { type: "event", event, payload };
const line = `${JSON.stringify(frame)}\n`;
for (const client of [...clients]) {
try {
client.write(line);
} catch {
clients.delete(client);
}
}
};
runtime.log?.(`control channel listening on 127.0.0.1:${port}`);
return {
close: async () => {
stopHeartbeat();
stopAgent();
await new Promise<void>((resolve) => server.close(() => resolve()));
for (const client of [...clients]) {
client.destroy();
}
clients.clear();
},
broadcastHeartbeat: (evt: HeartbeatEventPayload) => {
emitHeartbeatEvent(evt);
broadcast("heartbeat", evt);
},
broadcastAgentEvent: (evt: AgentEventPayload) => {
broadcast("agent", evt);
},
};
}

View File

@@ -7,11 +7,14 @@ export type SystemPresence = {
lastInputSeconds?: number;
mode?: string;
reason?: string;
instanceId?: string;
text: string;
ts: number;
};
const entries = new Map<string, SystemPresence>();
const TTL_MS = 5 * 60 * 1000; // 5 minutes
const MAX_ENTRIES = 200;
function resolvePrimaryIPv4(): string | undefined {
const nets = os.networkInterfaces();
@@ -36,12 +39,12 @@ function initSelfPresence() {
const ip = resolvePrimaryIPv4() ?? undefined;
const version =
process.env.CLAWDIS_VERSION ?? process.env.npm_package_version ?? "unknown";
const text = `Relay: ${host}${ip ? ` (${ip})` : ""} · app ${version} · mode relay · reason self`;
const text = `Gateway: ${host}${ip ? ` (${ip})` : ""} · app ${version} · mode gateway · reason self`;
const selfEntry: SystemPresence = {
host,
ip,
version,
mode: "relay",
mode: "gateway",
reason: "self",
text,
ts: Date.now(),
@@ -105,8 +108,41 @@ export function updateSystemPresence(text: string) {
entries.set(key, parsed);
}
export function upsertPresence(
key: string,
presence: Partial<SystemPresence>,
) {
ensureSelfPresence();
const existing = entries.get(key) ?? ({} as SystemPresence);
const merged: SystemPresence = {
...existing,
...presence,
ts: Date.now(),
text:
presence.text ||
existing.text ||
`Node: ${presence.host ?? existing.host ?? "unknown"} · mode ${
presence.mode ?? existing.mode ?? "unknown"
}`,
};
entries.set(key, merged);
}
export function listSystemPresence(): SystemPresence[] {
ensureSelfPresence();
// prune expired
const now = Date.now();
for (const [k, v] of [...entries]) {
if (now - v.ts > TTL_MS) entries.delete(k);
}
// enforce max size (LRU by ts)
if (entries.size > MAX_ENTRIES) {
const sorted = [...entries.entries()].sort((a, b) => a[1].ts - b[1].ts);
const toDrop = entries.size - MAX_ENTRIES;
for (let i = 0; i < toDrop; i++) {
entries.delete(sorted[i][0]);
}
}
touchSelfPresence();
return [...entries.values()].sort((a, b) => b.ts - a.ts);
}