refactor: add gateway server helper modules
This commit is contained in:
227
src/gateway/server-http.ts
Normal file
227
src/gateway/server-http.ts
Normal file
@@ -0,0 +1,227 @@
|
||||
import {
|
||||
createServer as createHttpServer,
|
||||
type IncomingMessage,
|
||||
type Server as HttpServer,
|
||||
type ServerResponse,
|
||||
} from "node:http";
|
||||
import { type WebSocketServer } from "ws";
|
||||
import { handleA2uiHttpRequest } from "../canvas-host/a2ui.js";
|
||||
import type { CanvasHostHandler } from "../canvas-host/server.js";
|
||||
import { type HooksConfigResolved, extractHookToken, normalizeAgentPayload, normalizeHookHeaders, normalizeWakePayload, readJsonBody } from "./hooks.js";
|
||||
import { applyHookMappings } from "./hooks-mapping.js";
|
||||
import { handleControlUiHttpRequest } from "./control-ui.js";
|
||||
import type { createSubsystemLogger } from "../logging.js";
|
||||
|
||||
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
|
||||
|
||||
type HookDispatchers = {
|
||||
dispatchWakeHook: (value: { text: string; mode: "now" | "next-heartbeat" }) => void;
|
||||
dispatchAgentHook: (value: {
|
||||
message: string;
|
||||
name: string;
|
||||
wakeMode: "now" | "next-heartbeat";
|
||||
sessionKey: string;
|
||||
deliver: boolean;
|
||||
channel:
|
||||
| "last"
|
||||
| "whatsapp"
|
||||
| "telegram"
|
||||
| "discord"
|
||||
| "signal"
|
||||
| "imessage";
|
||||
to?: string;
|
||||
thinking?: string;
|
||||
timeoutSeconds?: number;
|
||||
}) => string;
|
||||
};
|
||||
|
||||
function sendJson(res: ServerResponse, status: number, body: unknown) {
|
||||
res.statusCode = status;
|
||||
res.setHeader("Content-Type", "application/json; charset=utf-8");
|
||||
res.end(JSON.stringify(body));
|
||||
}
|
||||
|
||||
export type HooksRequestHandler = (
|
||||
req: IncomingMessage,
|
||||
res: ServerResponse,
|
||||
) => Promise<boolean>;
|
||||
|
||||
export function createHooksRequestHandler(opts: {
|
||||
hooksConfig: HooksConfigResolved | null;
|
||||
bindHost: string;
|
||||
port: number;
|
||||
logHooks: SubsystemLogger;
|
||||
} & HookDispatchers): HooksRequestHandler {
|
||||
const { hooksConfig, bindHost, port, logHooks, dispatchAgentHook, dispatchWakeHook } = opts;
|
||||
return async (req, res) => {
|
||||
if (!hooksConfig) return false;
|
||||
const url = new URL(req.url ?? "/", `http://${bindHost}:${port}`);
|
||||
const basePath = hooksConfig.basePath;
|
||||
if (url.pathname !== basePath && !url.pathname.startsWith(`${basePath}/`)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const token = extractHookToken(req, url);
|
||||
if (!token || token !== hooksConfig.token) {
|
||||
res.statusCode = 401;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Unauthorized");
|
||||
return true;
|
||||
}
|
||||
|
||||
if (req.method !== "POST") {
|
||||
res.statusCode = 405;
|
||||
res.setHeader("Allow", "POST");
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Method Not Allowed");
|
||||
return true;
|
||||
}
|
||||
|
||||
const subPath = url.pathname.slice(basePath.length).replace(/^\/+/, "");
|
||||
if (!subPath) {
|
||||
res.statusCode = 404;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Not Found");
|
||||
return true;
|
||||
}
|
||||
|
||||
const body = await readJsonBody(req, hooksConfig.maxBodyBytes);
|
||||
if (!body.ok) {
|
||||
const status = body.error === "payload too large" ? 413 : 400;
|
||||
sendJson(res, status, { ok: false, error: body.error });
|
||||
return true;
|
||||
}
|
||||
|
||||
const payload =
|
||||
typeof body.value === "object" && body.value !== null ? body.value : {};
|
||||
const headers = normalizeHookHeaders(req);
|
||||
|
||||
if (subPath === "wake") {
|
||||
const normalized = normalizeWakePayload(payload as Record<string, unknown>);
|
||||
if (!normalized.ok) {
|
||||
sendJson(res, 400, { ok: false, error: normalized.error });
|
||||
return true;
|
||||
}
|
||||
dispatchWakeHook(normalized.value);
|
||||
sendJson(res, 200, { ok: true, mode: normalized.value.mode });
|
||||
return true;
|
||||
}
|
||||
|
||||
if (subPath === "agent") {
|
||||
const normalized = normalizeAgentPayload(payload as Record<string, unknown>);
|
||||
if (!normalized.ok) {
|
||||
sendJson(res, 400, { ok: false, error: normalized.error });
|
||||
return true;
|
||||
}
|
||||
const runId = dispatchAgentHook(normalized.value);
|
||||
sendJson(res, 202, { ok: true, runId });
|
||||
return true;
|
||||
}
|
||||
|
||||
if (hooksConfig.mappings.length > 0) {
|
||||
try {
|
||||
const mapped = await applyHookMappings(hooksConfig.mappings, {
|
||||
payload: payload as Record<string, unknown>,
|
||||
headers,
|
||||
url,
|
||||
path: subPath,
|
||||
});
|
||||
if (mapped) {
|
||||
if (!mapped.ok) {
|
||||
sendJson(res, 400, { ok: false, error: mapped.error });
|
||||
return true;
|
||||
}
|
||||
if (mapped.action === null) {
|
||||
res.statusCode = 204;
|
||||
res.end();
|
||||
return true;
|
||||
}
|
||||
if (mapped.action.kind === "wake") {
|
||||
dispatchWakeHook({
|
||||
text: mapped.action.text,
|
||||
mode: mapped.action.mode,
|
||||
});
|
||||
sendJson(res, 200, { ok: true, mode: mapped.action.mode });
|
||||
return true;
|
||||
}
|
||||
const runId = dispatchAgentHook({
|
||||
message: mapped.action.message,
|
||||
name: mapped.action.name ?? "Hook",
|
||||
wakeMode: mapped.action.wakeMode,
|
||||
sessionKey: mapped.action.sessionKey ?? "",
|
||||
deliver: mapped.action.deliver === true,
|
||||
channel: mapped.action.channel ?? "last",
|
||||
to: mapped.action.to,
|
||||
thinking: mapped.action.thinking,
|
||||
timeoutSeconds: mapped.action.timeoutSeconds,
|
||||
});
|
||||
sendJson(res, 202, { ok: true, runId });
|
||||
return true;
|
||||
}
|
||||
} catch (err) {
|
||||
logHooks.warn(`hook mapping failed: ${String(err)}`);
|
||||
sendJson(res, 500, { ok: false, error: "hook mapping failed" });
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
res.statusCode = 404;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Not Found");
|
||||
return true;
|
||||
};
|
||||
}
|
||||
|
||||
export function createGatewayHttpServer(opts: {
|
||||
canvasHost: CanvasHostHandler | null;
|
||||
controlUiEnabled: boolean;
|
||||
controlUiBasePath: string;
|
||||
handleHooksRequest: HooksRequestHandler;
|
||||
}): HttpServer {
|
||||
const { canvasHost, controlUiEnabled, controlUiBasePath, handleHooksRequest } =
|
||||
opts;
|
||||
const httpServer: HttpServer = createHttpServer((req, res) => {
|
||||
// Don't interfere with WebSocket upgrades; ws handles the 'upgrade' event.
|
||||
if (String(req.headers.upgrade ?? "").toLowerCase() === "websocket") return;
|
||||
|
||||
void (async () => {
|
||||
if (await handleHooksRequest(req, res)) return;
|
||||
if (canvasHost) {
|
||||
if (await handleA2uiHttpRequest(req, res)) return;
|
||||
if (await canvasHost.handleHttpRequest(req, res)) return;
|
||||
}
|
||||
if (controlUiEnabled) {
|
||||
if (
|
||||
handleControlUiHttpRequest(req, res, {
|
||||
basePath: controlUiBasePath,
|
||||
})
|
||||
)
|
||||
return;
|
||||
}
|
||||
|
||||
res.statusCode = 404;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Not Found");
|
||||
})().catch((err) => {
|
||||
res.statusCode = 500;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end(String(err));
|
||||
});
|
||||
});
|
||||
|
||||
return httpServer;
|
||||
}
|
||||
|
||||
export function attachGatewayUpgradeHandler(opts: {
|
||||
httpServer: HttpServer;
|
||||
wss: WebSocketServer;
|
||||
canvasHost: CanvasHostHandler | null;
|
||||
}) {
|
||||
const { httpServer, wss, canvasHost } = opts;
|
||||
httpServer.on("upgrade", (req, socket, head) => {
|
||||
if (canvasHost?.handleUpgrade(req, socket, head)) return;
|
||||
wss.handleUpgrade(req, socket, head, (ws) => {
|
||||
wss.emit("connection", ws, req);
|
||||
});
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user