feat(gateway): add bridge RPC chat history and push

This commit is contained in:
Peter Steinberger
2025-12-14 01:54:44 +00:00
parent dd7be2bfd8
commit dccdc950bf
4 changed files with 688 additions and 5 deletions

View File

@@ -492,6 +492,8 @@ export async function startGatewayServer(
const httpServer: HttpServer = createHttpServer();
let bonjourStop: (() => Promise<void>) | null = null;
let bridge: Awaited<ReturnType<typeof startNodeBridgeServer>> | null = null;
const bridgeNodeSubscriptions = new Map<string, Set<string>>();
const bridgeSessionSubscribers = new Map<string, Set<string>>();
try {
await new Promise<void>((resolve, reject) => {
const onError = (err: NodeJS.ErrnoException) => {
@@ -677,6 +679,311 @@ export async function startGatewayServer(
: 18790;
const bridgeEnabled = process.env.CLAWDIS_BRIDGE_ENABLED !== "0";
const bridgeSubscribe = (nodeId: string, sessionKey: string) => {
const normalizedNodeId = nodeId.trim();
const normalizedSessionKey = sessionKey.trim();
if (!normalizedNodeId || !normalizedSessionKey) return;
let nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId);
if (!nodeSet) {
nodeSet = new Set<string>();
bridgeNodeSubscriptions.set(normalizedNodeId, nodeSet);
}
if (nodeSet.has(normalizedSessionKey)) return;
nodeSet.add(normalizedSessionKey);
let sessionSet = bridgeSessionSubscribers.get(normalizedSessionKey);
if (!sessionSet) {
sessionSet = new Set<string>();
bridgeSessionSubscribers.set(normalizedSessionKey, sessionSet);
}
sessionSet.add(normalizedNodeId);
};
const bridgeUnsubscribe = (nodeId: string, sessionKey: string) => {
const normalizedNodeId = nodeId.trim();
const normalizedSessionKey = sessionKey.trim();
if (!normalizedNodeId || !normalizedSessionKey) return;
const nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId);
nodeSet?.delete(normalizedSessionKey);
if (nodeSet?.size === 0) bridgeNodeSubscriptions.delete(normalizedNodeId);
const sessionSet = bridgeSessionSubscribers.get(normalizedSessionKey);
sessionSet?.delete(normalizedNodeId);
if (sessionSet?.size === 0)
bridgeSessionSubscribers.delete(normalizedSessionKey);
};
const bridgeUnsubscribeAll = (nodeId: string) => {
const normalizedNodeId = nodeId.trim();
const nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId);
if (!nodeSet) return;
for (const sessionKey of nodeSet) {
const sessionSet = bridgeSessionSubscribers.get(sessionKey);
sessionSet?.delete(normalizedNodeId);
if (sessionSet?.size === 0) bridgeSessionSubscribers.delete(sessionKey);
}
bridgeNodeSubscriptions.delete(normalizedNodeId);
};
const bridgeSendToSession = (
sessionKey: string,
event: string,
payload: unknown,
) => {
const normalizedSessionKey = sessionKey.trim();
if (!normalizedSessionKey) return;
const subs = bridgeSessionSubscribers.get(normalizedSessionKey);
if (!subs || subs.size === 0) return;
if (!bridge) return;
const payloadJSON = payload ? JSON.stringify(payload) : null;
for (const nodeId of subs) {
bridge.sendEvent({ nodeId, event, payloadJSON });
}
};
const bridgeSendToAllSubscribed = (event: string, payload: unknown) => {
if (!bridge) return;
const payloadJSON = payload ? JSON.stringify(payload) : null;
for (const nodeId of bridgeNodeSubscriptions.keys()) {
bridge.sendEvent({ nodeId, event, payloadJSON });
}
};
const handleBridgeRequest = async (
nodeId: string,
req: { id: string; method: string; paramsJSON?: string | null },
): Promise<
| { ok: true; payloadJSON?: string | null }
| { ok: false; error: { code: string; message: string; details?: unknown } }
> => {
const method = req.method.trim();
const parseParams = (): Record<string, unknown> => {
const raw = typeof req.paramsJSON === "string" ? req.paramsJSON : "";
const trimmed = raw.trim();
if (!trimmed) return {};
const parsed = JSON.parse(trimmed) as unknown;
return typeof parsed === "object" && parsed !== null
? (parsed as Record<string, unknown>)
: {};
};
try {
switch (method) {
case "health": {
const now = Date.now();
const cached = healthCache;
if (cached && now - cached.ts < HEALTH_REFRESH_INTERVAL_MS) {
return { ok: true, payloadJSON: JSON.stringify(cached) };
}
const snap = await refreshHealthSnapshot({ probe: false });
return { ok: true, payloadJSON: JSON.stringify(snap) };
}
case "chat.history": {
const params = parseParams();
if (!validateChatHistoryParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid chat.history params: ${formatValidationErrors(validateChatHistoryParams.errors)}`,
},
};
}
const { sessionKey } = params as { sessionKey: string };
const { storePath, entry } = loadSessionEntry(sessionKey);
const sessionId = entry?.sessionId;
const messages =
sessionId && storePath
? readSessionMessages(sessionId, storePath)
: [];
const thinkingLevel =
entry?.thinkingLevel ??
loadConfig().inbound?.reply?.thinkingDefault ??
"off";
return {
ok: true,
payloadJSON: JSON.stringify({
sessionKey,
sessionId,
messages,
thinkingLevel,
}),
};
}
case "chat.send": {
const params = parseParams();
if (!validateChatSendParams(params)) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: `invalid chat.send params: ${formatValidationErrors(validateChatSendParams.errors)}`,
},
};
}
const p = params as {
sessionKey: string;
message: string;
thinking?: string;
deliver?: boolean;
attachments?: Array<{
type?: string;
mimeType?: string;
fileName?: string;
content?: unknown;
}>;
timeoutMs?: number;
idempotencyKey: string;
};
const timeoutMs = Math.min(
Math.max(p.timeoutMs ?? 30_000, 0),
30_000,
);
const normalizedAttachments =
p.attachments?.map((a) => ({
type: typeof a?.type === "string" ? a.type : undefined,
mimeType:
typeof a?.mimeType === "string" ? a.mimeType : undefined,
fileName:
typeof a?.fileName === "string" ? a.fileName : undefined,
content:
typeof a?.content === "string"
? a.content
: ArrayBuffer.isView(a?.content)
? Buffer.from(
a.content.buffer,
a.content.byteOffset,
a.content.byteLength,
).toString("base64")
: undefined,
})) ?? [];
let messageWithAttachments = p.message;
if (normalizedAttachments.length > 0) {
try {
messageWithAttachments = buildMessageWithAttachments(
p.message,
normalizedAttachments,
{ maxBytes: 5_000_000 },
);
} catch (err) {
return {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
message: String(err),
},
};
}
}
const { storePath, store, entry } = loadSessionEntry(p.sessionKey);
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: now,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
systemSent: entry?.systemSent,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
};
if (store) {
store[p.sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, store);
}
}
const clientRunId = p.idempotencyKey;
chatRunSessions.set(sessionId, {
sessionKey: p.sessionKey,
clientRunId,
});
const cached = dedupe.get(`chat:${clientRunId}`);
if (cached) {
if (cached.ok) {
return { ok: true, payloadJSON: JSON.stringify(cached.payload) };
}
return {
ok: false,
error: cached.error ?? {
code: ErrorCodes.UNAVAILABLE,
message: "request failed",
},
};
}
try {
await agentCommand(
{
message: messageWithAttachments,
sessionId,
thinking: p.thinking,
deliver: p.deliver,
timeout: Math.ceil(timeoutMs / 1000).toString(),
surface: `Iris(${nodeId})`,
},
defaultRuntime,
deps,
);
const payload = {
runId: clientRunId,
status: "ok" as const,
};
dedupe.set(`chat:${clientRunId}`, {
ts: Date.now(),
ok: true,
payload,
});
return { ok: true, payloadJSON: JSON.stringify(payload) };
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
const payload = {
runId: clientRunId,
status: "error" as const,
summary: String(err),
};
dedupe.set(`chat:${clientRunId}`, {
ts: Date.now(),
ok: false,
payload,
error,
});
return {
ok: false,
error: error ?? {
code: ErrorCodes.UNAVAILABLE,
message: String(err),
},
};
}
}
default:
return {
ok: false,
error: {
code: "FORBIDDEN",
message: "Method not allowed",
details: { method },
},
};
}
} catch (err) {
return {
ok: false,
error: { code: ErrorCodes.INVALID_REQUEST, message: String(err) },
};
}
};
const handleBridgeEvent = async (
nodeId: string,
evt: { event: string; payloadJSON?: string | null },
@@ -807,6 +1114,42 @@ export async function startGatewayServer(
});
return;
}
case "chat.subscribe": {
if (!evt.payloadJSON) return;
let payload: unknown;
try {
payload = JSON.parse(evt.payloadJSON) as unknown;
} catch {
return;
}
const obj =
typeof payload === "object" && payload !== null
? (payload as Record<string, unknown>)
: {};
const sessionKey =
typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : "";
if (!sessionKey) return;
bridgeSubscribe(nodeId, sessionKey);
return;
}
case "chat.unsubscribe": {
if (!evt.payloadJSON) return;
let payload: unknown;
try {
payload = JSON.parse(evt.payloadJSON) as unknown;
} catch {
return;
}
const obj =
typeof payload === "object" && payload !== null
? (payload as Record<string, unknown>)
: {};
const sessionKey =
typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : "";
if (!sessionKey) return;
bridgeUnsubscribe(nodeId, sessionKey);
return;
}
default:
return;
}
@@ -820,6 +1163,7 @@ export async function startGatewayServer(
host: bridgeHost,
port: bridgePort,
serverName: machineDisplayName,
onRequest: (nodeId, req) => handleBridgeRequest(nodeId, req),
onAuthenticated: (node) => {
const host = node.displayName?.trim() || node.nodeId;
const ip = node.remoteIp?.trim();
@@ -851,6 +1195,7 @@ export async function startGatewayServer(
);
},
onDisconnected: (node) => {
bridgeUnsubscribeAll(node.nodeId);
const host = node.displayName?.trim() || node.nodeId;
const ip = node.remoteIp?.trim();
const version = node.version?.trim() || "unknown";
@@ -924,11 +1269,14 @@ export async function startGatewayServer(
broadcast("health", snap, {
stateVersion: { presence: presenceVersion, health: healthVersion },
});
bridgeSendToAllSubscribed("health", snap);
};
// periodic keepalive
const tickInterval = setInterval(() => {
broadcast("tick", { ts: Date.now() }, { dropIfSlow: true });
const payload = { ts: Date.now() };
broadcast("tick", payload, { dropIfSlow: true });
bridgeSendToAllSubscribed("tick", payload);
}, TICK_INTERVAL_MS);
// periodic health refresh to keep cached snapshot warm
@@ -997,7 +1345,7 @@ export async function startGatewayServer(
const text = chatRunBuffers.get(clientRunId)?.join("\n").trim() ?? "";
chatRunBuffers.delete(clientRunId);
if (evt.data.state === "done") {
broadcast("chat", {
const payload = {
...base,
state: "final",
message: text
@@ -1007,13 +1355,17 @@ export async function startGatewayServer(
timestamp: Date.now(),
}
: undefined,
});
};
broadcast("chat", payload);
bridgeSendToSession(sessionKey, "chat", payload);
} else {
broadcast("chat", {
const payload = {
...base,
state: "error",
errorMessage: evt.data.error ? String(evt.data.error) : undefined,
});
};
broadcast("chat", payload);
bridgeSendToSession(sessionKey, "chat", payload);
}
chatRunSessions.delete(evt.runId);
}