refactor(gateway): split server helpers

This commit is contained in:
Peter Steinberger
2026-01-03 19:33:41 +01:00
parent ce92fac983
commit 6a125b554b
7 changed files with 1722 additions and 1447 deletions

View File

@@ -0,0 +1,144 @@
export type BridgeSendEventFn = (opts: {
nodeId: string;
event: string;
payloadJSON?: string | null;
}) => void;
export type BridgeListConnectedFn = () => Array<{ nodeId: string }>;
export type BridgeSubscriptionManager = {
subscribe: (nodeId: string, sessionKey: string) => void;
unsubscribe: (nodeId: string, sessionKey: string) => void;
unsubscribeAll: (nodeId: string) => void;
sendToSession: (
sessionKey: string,
event: string,
payload: unknown,
sendEvent?: BridgeSendEventFn | null,
) => void;
sendToAllSubscribed: (
event: string,
payload: unknown,
sendEvent?: BridgeSendEventFn | null,
) => void;
sendToAllConnected: (
event: string,
payload: unknown,
listConnected?: BridgeListConnectedFn | null,
sendEvent?: BridgeSendEventFn | null,
) => void;
clear: () => void;
};
export function createBridgeSubscriptionManager(): BridgeSubscriptionManager {
const bridgeNodeSubscriptions = new Map<string, Set<string>>();
const bridgeSessionSubscribers = new Map<string, Set<string>>();
const toPayloadJSON = (payload: unknown) =>
payload ? JSON.stringify(payload) : null;
const subscribe = (nodeId: string, sessionKey: string) => {
const normalizedNodeId = nodeId.trim();
const normalizedSessionKey = sessionKey.trim();
if (!normalizedNodeId || !normalizedSessionKey) return;
let nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId);
if (!nodeSet) {
nodeSet = new Set<string>();
bridgeNodeSubscriptions.set(normalizedNodeId, nodeSet);
}
if (nodeSet.has(normalizedSessionKey)) return;
nodeSet.add(normalizedSessionKey);
let sessionSet = bridgeSessionSubscribers.get(normalizedSessionKey);
if (!sessionSet) {
sessionSet = new Set<string>();
bridgeSessionSubscribers.set(normalizedSessionKey, sessionSet);
}
sessionSet.add(normalizedNodeId);
};
const unsubscribe = (nodeId: string, sessionKey: string) => {
const normalizedNodeId = nodeId.trim();
const normalizedSessionKey = sessionKey.trim();
if (!normalizedNodeId || !normalizedSessionKey) return;
const nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId);
nodeSet?.delete(normalizedSessionKey);
if (nodeSet?.size === 0) bridgeNodeSubscriptions.delete(normalizedNodeId);
const sessionSet = bridgeSessionSubscribers.get(normalizedSessionKey);
sessionSet?.delete(normalizedNodeId);
if (sessionSet?.size === 0)
bridgeSessionSubscribers.delete(normalizedSessionKey);
};
const unsubscribeAll = (nodeId: string) => {
const normalizedNodeId = nodeId.trim();
const nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId);
if (!nodeSet) return;
for (const sessionKey of nodeSet) {
const sessionSet = bridgeSessionSubscribers.get(sessionKey);
sessionSet?.delete(normalizedNodeId);
if (sessionSet?.size === 0) bridgeSessionSubscribers.delete(sessionKey);
}
bridgeNodeSubscriptions.delete(normalizedNodeId);
};
const sendToSession = (
sessionKey: string,
event: string,
payload: unknown,
sendEvent?: BridgeSendEventFn | null,
) => {
const normalizedSessionKey = sessionKey.trim();
if (!normalizedSessionKey || !sendEvent) return;
const subs = bridgeSessionSubscribers.get(normalizedSessionKey);
if (!subs || subs.size === 0) return;
const payloadJSON = toPayloadJSON(payload);
for (const nodeId of subs) {
sendEvent({ nodeId, event, payloadJSON });
}
};
const sendToAllSubscribed = (
event: string,
payload: unknown,
sendEvent?: BridgeSendEventFn | null,
) => {
if (!sendEvent) return;
const payloadJSON = toPayloadJSON(payload);
for (const nodeId of bridgeNodeSubscriptions.keys()) {
sendEvent({ nodeId, event, payloadJSON });
}
};
const sendToAllConnected = (
event: string,
payload: unknown,
listConnected?: BridgeListConnectedFn | null,
sendEvent?: BridgeSendEventFn | null,
) => {
if (!sendEvent || !listConnected) return;
const payloadJSON = toPayloadJSON(payload);
for (const node of listConnected()) {
sendEvent({ nodeId: node.nodeId, event, payloadJSON });
}
};
const clear = () => {
bridgeNodeSubscriptions.clear();
bridgeSessionSubscribers.clear();
};
return {
subscribe,
unsubscribe,
unsubscribeAll,
sendToSession,
sendToAllSubscribed,
sendToAllConnected,
clear,
};
}

1148
src/gateway/server-bridge.ts Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,17 @@
export type BrowserControlServer = {
stop: () => Promise<void>;
};
export async function startBrowserControlServerIfEnabled(): Promise<
BrowserControlServer | null
> {
if (process.env.CLAWDIS_SKIP_BROWSER_CONTROL_SERVER === "1") return null;
// Lazy import: keeps startup fast, but still bundles for the embedded
// gateway (bun --compile) via the static specifier path.
const override = process.env.CLAWDIS_BROWSER_CONTROL_MODULE?.trim();
const mod = override
? await import(override)
: await import("../browser/server.js");
await mod.startBrowserControlServerFromConfig();
return { stop: mod.stopBrowserControlServer };
}

248
src/gateway/server-chat.ts Normal file
View File

@@ -0,0 +1,248 @@
import type { AgentEventPayload } from "../infra/agent-events.js";
import { formatForLog } from "./ws-log.js";
export type ChatRunEntry = {
sessionKey: string;
clientRunId: string;
};
export type ChatRunRegistry = {
add: (sessionId: string, entry: ChatRunEntry) => void;
peek: (sessionId: string) => ChatRunEntry | undefined;
shift: (sessionId: string) => ChatRunEntry | undefined;
remove: (
sessionId: string,
clientRunId: string,
sessionKey?: string,
) => ChatRunEntry | undefined;
clear: () => void;
};
export function createChatRunRegistry(): ChatRunRegistry {
const chatRunSessions = new Map<string, ChatRunEntry[]>();
const add = (sessionId: string, entry: ChatRunEntry) => {
const queue = chatRunSessions.get(sessionId);
if (queue) {
queue.push(entry);
} else {
chatRunSessions.set(sessionId, [entry]);
}
};
const peek = (sessionId: string) => chatRunSessions.get(sessionId)?.[0];
const shift = (sessionId: string) => {
const queue = chatRunSessions.get(sessionId);
if (!queue || queue.length === 0) return undefined;
const entry = queue.shift();
if (!queue.length) chatRunSessions.delete(sessionId);
return entry;
};
const remove = (
sessionId: string,
clientRunId: string,
sessionKey?: string,
) => {
const queue = chatRunSessions.get(sessionId);
if (!queue || queue.length === 0) return undefined;
const idx = queue.findIndex(
(entry) =>
entry.clientRunId === clientRunId &&
(sessionKey ? entry.sessionKey === sessionKey : true),
);
if (idx < 0) return undefined;
const [entry] = queue.splice(idx, 1);
if (!queue.length) chatRunSessions.delete(sessionId);
return entry;
};
const clear = () => {
chatRunSessions.clear();
};
return { add, peek, shift, remove, clear };
}
export type ChatRunState = {
registry: ChatRunRegistry;
buffers: Map<string, string>;
deltaSentAt: Map<string, number>;
clear: () => void;
};
export function createChatRunState(): ChatRunState {
const registry = createChatRunRegistry();
const buffers = new Map<string, string>();
const deltaSentAt = new Map<string, number>();
const clear = () => {
registry.clear();
buffers.clear();
deltaSentAt.clear();
};
return {
registry,
buffers,
deltaSentAt,
clear,
};
}
export type ChatEventBroadcast = (
event: string,
payload: unknown,
opts?: { dropIfSlow?: boolean },
) => void;
export type BridgeSendToSession = (
sessionKey: string,
event: string,
payload: unknown,
) => void;
export type AgentEventHandlerOptions = {
broadcast: ChatEventBroadcast;
bridgeSendToSession: BridgeSendToSession;
agentRunSeq: Map<string, number>;
chatRunState: ChatRunState;
resolveSessionKeyForRun: (runId: string) => string | undefined;
clearAgentRunContext: (runId: string) => void;
};
export function createAgentEventHandler({
broadcast,
bridgeSendToSession,
agentRunSeq,
chatRunState,
resolveSessionKeyForRun,
clearAgentRunContext,
}: AgentEventHandlerOptions) {
const emitChatDelta = (
sessionKey: string,
clientRunId: string,
seq: number,
text: string,
) => {
chatRunState.buffers.set(clientRunId, text);
const now = Date.now();
const last = chatRunState.deltaSentAt.get(clientRunId) ?? 0;
if (now - last < 150) return;
chatRunState.deltaSentAt.set(clientRunId, now);
const payload = {
runId: clientRunId,
sessionKey,
seq,
state: "delta" as const,
message: {
role: "assistant",
content: [{ type: "text", text }],
timestamp: now,
},
};
broadcast("chat", payload, { dropIfSlow: true });
bridgeSendToSession(sessionKey, "chat", payload);
};
const emitChatFinal = (
sessionKey: string,
clientRunId: string,
seq: number,
jobState: "done" | "error",
error?: unknown,
) => {
const text = chatRunState.buffers.get(clientRunId)?.trim() ?? "";
chatRunState.buffers.delete(clientRunId);
chatRunState.deltaSentAt.delete(clientRunId);
if (jobState === "done") {
const payload = {
runId: clientRunId,
sessionKey,
seq,
state: "final" as const,
message: text
? {
role: "assistant",
content: [{ type: "text", text }],
timestamp: Date.now(),
}
: undefined,
};
broadcast("chat", payload);
bridgeSendToSession(sessionKey, "chat", payload);
return;
}
const payload = {
runId: clientRunId,
sessionKey,
seq,
state: "error" as const,
errorMessage: error ? formatForLog(error) : undefined,
};
broadcast("chat", payload);
bridgeSendToSession(sessionKey, "chat", payload);
};
return (evt: AgentEventPayload) => {
const last = agentRunSeq.get(evt.runId) ?? 0;
if (evt.seq !== last + 1) {
broadcast("agent", {
runId: evt.runId,
stream: "error",
ts: Date.now(),
data: {
reason: "seq gap",
expected: last + 1,
received: evt.seq,
},
});
}
agentRunSeq.set(evt.runId, evt.seq);
broadcast("agent", evt);
const chatLink = chatRunState.registry.peek(evt.runId);
const sessionKey =
chatLink?.sessionKey ?? resolveSessionKeyForRun(evt.runId);
const jobState =
evt.stream === "job" && typeof evt.data?.state === "string"
? (evt.data.state as "done" | "error" | string)
: null;
if (sessionKey) {
bridgeSendToSession(sessionKey, "agent", evt);
if (evt.stream === "assistant" && typeof evt.data?.text === "string") {
const clientRunId = chatLink?.clientRunId ?? evt.runId;
emitChatDelta(sessionKey, clientRunId, evt.seq, evt.data.text);
} else if (jobState === "done" || jobState === "error") {
if (chatLink) {
const finished = chatRunState.registry.shift(evt.runId);
if (!finished) {
clearAgentRunContext(evt.runId);
return;
}
emitChatFinal(
finished.sessionKey,
finished.clientRunId,
evt.seq,
jobState,
evt.data?.error,
);
} else {
emitChatFinal(
sessionKey,
evt.runId,
evt.seq,
jobState,
evt.data?.error,
);
}
}
}
if (jobState === "done" || jobState === "error") {
clearAgentRunContext(evt.runId);
}
};
}

View File

@@ -0,0 +1,76 @@
import fs from "node:fs";
import path from "node:path";
import { getTailnetHostname } from "../infra/tailscale.js";
import { runExec } from "../process/exec.js";
export type ResolveBonjourCliPathOptions = {
env?: NodeJS.ProcessEnv;
argv?: string[];
execPath?: string;
cwd?: string;
statSync?: (path: string) => fs.Stats;
};
export function formatBonjourInstanceName(displayName: string) {
const trimmed = displayName.trim();
if (!trimmed) return "Clawdis";
if (/clawdis/i.test(trimmed)) return trimmed;
return `${trimmed} (Clawdis)`;
}
export function resolveBonjourCliPath(
opts: ResolveBonjourCliPathOptions = {},
): string | undefined {
const env = opts.env ?? process.env;
const envPath = env.CLAWDIS_CLI_PATH?.trim();
if (envPath) return envPath;
const statSync = opts.statSync ?? fs.statSync;
const isFile = (candidate: string) => {
try {
return statSync(candidate).isFile();
} catch {
return false;
}
};
const execPath = opts.execPath ?? process.execPath;
const execDir = path.dirname(execPath);
const siblingCli = path.join(execDir, "clawdis");
if (isFile(siblingCli)) return siblingCli;
const argv = opts.argv ?? process.argv;
const argvPath = argv[1];
if (argvPath && isFile(argvPath)) {
const base = path.basename(argvPath);
if (!base.includes("gateway-daemon")) return argvPath;
}
const cwd = opts.cwd ?? process.cwd();
const distCli = path.join(cwd, "dist", "index.js");
if (isFile(distCli)) return distCli;
const binCli = path.join(cwd, "bin", "clawdis.js");
if (isFile(binCli)) return binCli;
return undefined;
}
export async function resolveTailnetDnsHint(opts?: {
env?: NodeJS.ProcessEnv;
exec?: typeof runExec;
}): Promise<string | undefined> {
const env = opts?.env ?? process.env;
const envRaw = env.CLAWDIS_TAILNET_DNS?.trim();
const envValue = envRaw && envRaw.length > 0 ? envRaw.replace(/\.$/, "") : "";
if (envValue) return envValue;
const exec =
opts?.exec ??
((command, args) =>
runExec(command, args, { timeoutMs: 1500, maxBuffer: 200_000 }));
try {
return await getTailnetHostname(exec);
} catch {
return undefined;
}
}

View File

@@ -0,0 +1,8 @@
import type { ErrorShape } from "./protocol/index.js";
export type DedupeEntry = {
ts: number;
ok: boolean;
payload?: unknown;
error?: ErrorShape;
};

File diff suppressed because it is too large Load Diff