feat(gateway): add OpenAI-compatible HTTP endpoint

This commit is contained in:
Peter Steinberger
2026-01-10 21:29:19 +01:00
parent ab314a22e0
commit dafa8a2881
6 changed files with 783 additions and 0 deletions

View File

@@ -0,0 +1,297 @@
import { describe, expect, it } from "vitest";
import { emitAgentEvent } from "../infra/agent-events.js";
import {
agentCommand,
getFreePort,
installGatewayTestHooks,
} from "./test-helpers.js";
// Register the shared gateway test hooks (mock setup/teardown) for this suite.
installGatewayTestHooks();
/**
 * Boot a gateway on `port`, bound to loopback with token auth ("secret")
 * and the control UI disabled. Imported lazily so the test hooks above can
 * patch dependencies before the server module loads.
 */
async function startServer(port: number) {
  const serverModule = await import("./server.js");
  return serverModule.startGatewayServer(port, {
    host: "127.0.0.1",
    auth: { mode: "token", token: "secret" },
    controlUiEnabled: false,
  });
}
/**
 * POST `body` as JSON to the chat-completions endpoint on `port`.
 * Sends JSON content-type and bearer auth by default; `headers` entries
 * override or extend those defaults.
 */
async function postChatCompletions(
  port: number,
  body: unknown,
  headers?: Record<string, string>,
) {
  const requestHeaders: Record<string, string> = {
    "content-type": "application/json",
    authorization: "Bearer secret",
  };
  for (const [name, value] of Object.entries(headers ?? {})) {
    requestHeaders[name] = value;
  }
  return fetch(`http://127.0.0.1:${port}/v1/chat/completions`, {
    method: "POST",
    headers: requestHeaders,
    body: JSON.stringify(body),
  });
}
/** Extract the payload of every `data:` line from an SSE response body. */
function parseSseDataLines(text: string): string[] {
  const prefix = "data: ";
  const payloads: string[] = [];
  for (const rawLine of text.split("\n")) {
    // Trimming also strips any trailing \r from CRLF framing.
    const line = rawLine.trim();
    if (line.startsWith(prefix)) payloads.push(line.slice(prefix.length));
  }
  return payloads;
}
// End-to-end coverage for the OpenAI-compatible /v1/chat/completions endpoint:
// method/auth gating, agent routing headers, session-key derivation, message
// extraction, and both non-streaming and SSE streaming response shapes.
// Each test starts a real gateway on a free port and closes it in `finally`.
describe("OpenAI-compatible HTTP API (e2e)", () => {
  // Only POST is accepted on the endpoint; anything else is 405.
  it("rejects non-POST", async () => {
    const port = await getFreePort();
    const server = await startServer(port);
    try {
      const res = await fetch(`http://127.0.0.1:${port}/v1/chat/completions`, {
        method: "GET",
        headers: { authorization: "Bearer secret" },
      });
      expect(res.status).toBe(405);
    } finally {
      await server.close({ reason: "test done" });
    }
  });
  // Missing bearer token must yield 401 before any agent work happens.
  it("rejects missing auth", async () => {
    const port = await getFreePort();
    const server = await startServer(port);
    try {
      const res = await fetch(`http://127.0.0.1:${port}/v1/chat/completions`, {
        method: "POST",
        headers: { "content-type": "application/json" },
        body: JSON.stringify({ messages: [{ role: "user", content: "hi" }] }),
      });
      expect(res.status).toBe(401);
    } finally {
      await server.close({ reason: "test done" });
    }
  });
  // x-clawdbot-agent-id should select the agent, visible as the session-key prefix.
  it("routes to a specific agent via header", async () => {
    agentCommand.mockResolvedValueOnce({
      payloads: [{ text: "hello" }],
    } as never);
    const port = await getFreePort();
    const server = await startServer(port);
    try {
      const res = await postChatCompletions(
        port,
        { model: "clawdbot", messages: [{ role: "user", content: "hi" }] },
        { "x-clawdbot-agent-id": "beta" },
      );
      expect(res.status).toBe(200);
      expect(agentCommand).toHaveBeenCalledTimes(1);
      const [opts] = agentCommand.mock.calls[0] ?? [];
      expect(
        (opts as { sessionKey?: string } | undefined)?.sessionKey ?? "",
      ).toMatch(/^agent:beta:/);
    } finally {
      await server.close({ reason: "test done" });
    }
  });
  // An explicit session-key header must win over the derived key.
  it("honors x-clawdbot-session-key override", async () => {
    agentCommand.mockResolvedValueOnce({
      payloads: [{ text: "hello" }],
    } as never);
    const port = await getFreePort();
    const server = await startServer(port);
    try {
      const res = await postChatCompletions(
        port,
        { model: "clawdbot", messages: [{ role: "user", content: "hi" }] },
        {
          "x-clawdbot-agent-id": "beta",
          "x-clawdbot-session-key": "agent:beta:openai:custom",
        },
      );
      expect(res.status).toBe(200);
      const [opts] = agentCommand.mock.calls[0] ?? [];
      expect((opts as { sessionKey?: string } | undefined)?.sessionKey).toBe(
        "agent:beta:openai:custom",
      );
    } finally {
      await server.close({ reason: "test done" });
    }
  });
  // The OpenAI `user` field should produce a stable (non-random) session key.
  it("uses OpenAI user for a stable session key", async () => {
    agentCommand.mockResolvedValueOnce({
      payloads: [{ text: "hello" }],
    } as never);
    const port = await getFreePort();
    const server = await startServer(port);
    try {
      const res = await postChatCompletions(port, {
        user: "alice",
        model: "clawdbot",
        messages: [{ role: "user", content: "hi" }],
      });
      expect(res.status).toBe(200);
      const [opts] = agentCommand.mock.calls[0] ?? [];
      expect(
        (opts as { sessionKey?: string } | undefined)?.sessionKey ?? "",
      ).toContain("openai-user:alice");
    } finally {
      await server.close({ reason: "test done" });
    }
  });
  // Array-style content parts ("text" / "input_text") are joined with newlines.
  it("extracts user message text from array content", async () => {
    agentCommand.mockResolvedValueOnce({
      payloads: [{ text: "hello" }],
    } as never);
    const port = await getFreePort();
    const server = await startServer(port);
    try {
      const res = await postChatCompletions(port, {
        model: "clawdbot",
        messages: [
          {
            role: "user",
            content: [
              { type: "text", text: "hello" },
              { type: "input_text", text: "world" },
            ],
          },
        ],
      });
      expect(res.status).toBe(200);
      const [opts] = agentCommand.mock.calls[0] ?? [];
      expect((opts as { message?: string } | undefined)?.message).toBe(
        "hello\nworld",
      );
    } finally {
      await server.close({ reason: "test done" });
    }
  });
  // stream=false returns a single OpenAI `chat.completion` JSON object.
  it("returns a non-streaming OpenAI chat.completion response", async () => {
    agentCommand.mockResolvedValueOnce({
      payloads: [{ text: "hello" }],
    } as never);
    const port = await getFreePort();
    const server = await startServer(port);
    try {
      const res = await postChatCompletions(port, {
        stream: false,
        model: "clawdbot",
        messages: [{ role: "user", content: "hi" }],
      });
      expect(res.status).toBe(200);
      const json = (await res.json()) as Record<string, unknown>;
      expect(json.object).toBe("chat.completion");
      expect(Array.isArray(json.choices)).toBe(true);
      const choice0 = (json.choices as Array<Record<string, unknown>>)[0] ?? {};
      const msg =
        (choice0.message as Record<string, unknown> | undefined) ?? {};
      expect(msg.role).toBe("assistant");
      expect(msg.content).toBe("hello");
    } finally {
      await server.close({ reason: "test done" });
    }
  });
  // A request with no user-role message is a 400 invalid_request_error.
  it("requires a user message", async () => {
    const port = await getFreePort();
    const server = await startServer(port);
    try {
      const res = await postChatCompletions(port, {
        model: "clawdbot",
        messages: [{ role: "system", content: "yo" }],
      });
      expect(res.status).toBe(400);
      const json = (await res.json()) as Record<string, unknown>;
      expect((json.error as Record<string, unknown> | undefined)?.type).toBe(
        "invalid_request_error",
      );
    } finally {
      await server.close({ reason: "test done" });
    }
  });
  // stream=true: assistant delta events become chat.completion.chunk frames,
  // concatenating back to the full text, terminated by [DONE].
  it("streams SSE chunks when stream=true (delta events)", async () => {
    // Mock emits deltas keyed by the runId the handler passed in.
    agentCommand.mockImplementationOnce(async (opts: unknown) => {
      const runId = (opts as { runId?: string } | undefined)?.runId ?? "";
      emitAgentEvent({ runId, stream: "assistant", data: { delta: "he" } });
      emitAgentEvent({ runId, stream: "assistant", data: { delta: "llo" } });
      return { payloads: [{ text: "hello" }] } as never;
    });
    const port = await getFreePort();
    const server = await startServer(port);
    try {
      const res = await postChatCompletions(port, {
        stream: true,
        model: "clawdbot",
        messages: [{ role: "user", content: "hi" }],
      });
      expect(res.status).toBe(200);
      expect(res.headers.get("content-type") ?? "").toContain(
        "text/event-stream",
      );
      const text = await res.text();
      const data = parseSseDataLines(text);
      expect(data[data.length - 1]).toBe("[DONE]");
      const jsonChunks = data
        .filter((d) => d !== "[DONE]")
        .map((d) => JSON.parse(d) as Record<string, unknown>);
      expect(jsonChunks.some((c) => c.object === "chat.completion.chunk")).toBe(
        true,
      );
      // Reassemble the streamed content from every delta chunk.
      const allContent = jsonChunks
        .flatMap(
          (c) =>
            (c.choices as Array<Record<string, unknown>> | undefined) ?? [],
        )
        .map(
          (choice) =>
            (choice.delta as Record<string, unknown> | undefined)?.content,
        )
        .filter((v): v is string => typeof v === "string")
        .join("");
      expect(allContent).toBe("hello");
    } finally {
      await server.close({ reason: "test done" });
    }
  });
  // stream=true with no delta events: the final payload text is streamed as
  // a single fallback chunk before [DONE].
  it("streams SSE chunks when stream=true (fallback when no deltas)", async () => {
    agentCommand.mockResolvedValueOnce({
      payloads: [{ text: "hello" }],
    } as never);
    const port = await getFreePort();
    const server = await startServer(port);
    try {
      const res = await postChatCompletions(port, {
        stream: true,
        model: "clawdbot",
        messages: [{ role: "user", content: "hi" }],
      });
      expect(res.status).toBe(200);
      const text = await res.text();
      expect(text).toContain("[DONE]");
      expect(text).toContain("hello");
    } finally {
      await server.close({ reason: "test done" });
    }
  });
});

407
src/gateway/openai-http.ts Normal file
View File

@@ -0,0 +1,407 @@
import { randomUUID } from "node:crypto";
import type { IncomingMessage, ServerResponse } from "node:http";
import { createDefaultDeps } from "../cli/deps.js";
import { agentCommand } from "../commands/agent.js";
import { emitAgentEvent, onAgentEvent } from "../infra/agent-events.js";
import {
buildAgentMainSessionKey,
normalizeAgentId,
} from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import { authorizeGatewayConnect, type ResolvedGatewayAuth } from "./auth.js";
import { readJsonBody } from "./hooks.js";
// Configuration for the OpenAI-compatible endpoint handler.
type OpenAiHttpOptions = {
  // Resolved gateway auth settings used to validate the bearer token.
  auth: ResolvedGatewayAuth;
  // Request body size limit in bytes (handler defaults to 1 MiB).
  maxBodyBytes?: number;
};
// Loosely-typed OpenAI chat message; fields are validated at use sites.
type OpenAiChatMessage = {
  role?: unknown;
  content?: unknown;
};
// Loosely-typed chat-completions request body; fields are validated at use sites.
type OpenAiChatCompletionRequest = {
  model?: unknown;
  stream?: unknown;
  messages?: unknown;
  user?: unknown;
};
/** Serialize `body` as JSON and finish the response with the given status. */
function sendJson(res: ServerResponse, status: number, body: unknown) {
  const payload = JSON.stringify(body);
  res.statusCode = status;
  res.setHeader("Content-Type", "application/json; charset=utf-8");
  res.end(payload);
}
/** Case-insensitive single-value header lookup; first entry wins for arrays. */
function getHeader(req: IncomingMessage, name: string): string | undefined {
  const value = req.headers[name.toLowerCase()];
  if (Array.isArray(value)) return value[0];
  return typeof value === "string" ? value : undefined;
}
function getBearerToken(req: IncomingMessage): string | undefined {
const raw = getHeader(req, "authorization")?.trim() ?? "";
if (!raw.toLowerCase().startsWith("bearer ")) return undefined;
const token = raw.slice(7).trim();
return token || undefined;
}
/** Emit one SSE event frame carrying `data` serialized as JSON. */
function writeSse(res: ServerResponse, data: unknown) {
  const frame = `data: ${JSON.stringify(data)}\n\n`;
  res.write(frame);
}
/** Emit the OpenAI end-of-stream sentinel frame. */
function writeDone(res: ServerResponse) {
  const sentinel = "data: [DONE]\n\n";
  res.write(sentinel);
}
/** Treat `val` as a message list, yielding [] for anything non-array. */
function asMessages(val: unknown): OpenAiChatMessage[] {
  if (!Array.isArray(val)) return [];
  return val as OpenAiChatMessage[];
}
/**
 * Flatten OpenAI message `content` into plain text. Strings pass through
 * unchanged; arrays contribute the `text` of "text"/"input_text" parts plus
 * any bare string `input_text` field, joined with newlines (empty parts are
 * dropped). Everything else yields "".
 */
function extractTextContent(content: unknown): string {
  if (typeof content === "string") return content;
  if (!Array.isArray(content)) return "";
  const pieces: string[] = [];
  for (const part of content) {
    if (!part || typeof part !== "object") continue;
    const record = part as { type?: unknown; text?: unknown; input_text?: unknown };
    if (
      (record.type === "text" || record.type === "input_text") &&
      typeof record.text === "string"
    ) {
      pieces.push(record.text);
    } else if (typeof record.input_text === "string") {
      pieces.push(record.input_text);
    }
  }
  return pieces.filter(Boolean).join("\n");
}
function buildAgentPrompt(messagesUnknown: unknown): {
message: string;
extraSystemPrompt?: string;
} {
const messages = asMessages(messagesUnknown);
const systemParts: string[] = [];
let lastUser = "";
for (const msg of messages) {
if (!msg || typeof msg !== "object") continue;
const role = typeof msg.role === "string" ? msg.role.trim() : "";
const content = extractTextContent(msg.content).trim();
if (!role || !content) continue;
if (role === "system") {
systemParts.push(content);
continue;
}
if (role === "user") {
lastUser = content;
}
}
return {
message: lastUser,
extraSystemPrompt:
systemParts.length > 0 ? systemParts.join("\n\n") : undefined,
};
}
/**
 * Pick the target agent from request headers: `x-clawdbot-agent-id` first,
 * then `x-clawdbot-agent`, defaulting to "main"; the result is normalized.
 */
function resolveAgentId(req: IncomingMessage): string {
  const fromHeaders =
    getHeader(req, "x-clawdbot-agent-id")?.trim() ||
    getHeader(req, "x-clawdbot-agent")?.trim();
  return normalizeAgentId(fromHeaders || "main");
}
/**
 * Determine the session key for this request. An explicit
 * `x-clawdbot-session-key` header always wins; otherwise derive a key that
 * is stable per OpenAI `user`, or a fresh random one per request.
 */
function resolveSessionKey(params: {
  req: IncomingMessage;
  agentId: string;
  user?: string | undefined;
}): string {
  const override = getHeader(params.req, "x-clawdbot-session-key")?.trim();
  if (override) return override;
  const trimmedUser = params.user?.trim();
  const mainKey = trimmedUser
    ? `openai-user:${trimmedUser}`
    : `openai:${randomUUID()}`;
  return buildAgentMainSessionKey({ agentId: params.agentId, mainKey });
}
/** Narrow an unknown JSON body to the request shape; non-objects become {}. */
function coerceRequest(val: unknown): OpenAiChatCompletionRequest {
  const isObject = val !== null && typeof val === "object";
  return isObject ? (val as OpenAiChatCompletionRequest) : {};
}
/**
 * Handle `POST /v1/chat/completions` with an OpenAI-compatible contract.
 *
 * Returns `false` when the path does not match (so other handlers can run),
 * `true` once this handler has taken ownership of the response. Supports
 * both a single JSON `chat.completion` response and SSE streaming of
 * `chat.completion.chunk` frames terminated by `data: [DONE]`.
 */
export async function handleOpenAiHttpRequest(
  req: IncomingMessage,
  res: ServerResponse,
  opts: OpenAiHttpOptions,
): Promise<boolean> {
  // Host header only exists to satisfy the URL parser; routing is path-based.
  const url = new URL(
    req.url ?? "/",
    `http://${req.headers.host || "localhost"}`,
  );
  if (url.pathname !== "/v1/chat/completions") return false;
  if (req.method !== "POST") {
    res.statusCode = 405;
    res.setHeader("Allow", "POST");
    res.setHeader("Content-Type", "text/plain; charset=utf-8");
    res.end("Method Not Allowed");
    return true;
  }
  // The bearer token is passed as both token and password so either gateway
  // auth mode can match it — presumably intentional; confirm against auth.ts.
  const token = getBearerToken(req);
  const authResult = await authorizeGatewayConnect({
    auth: opts.auth,
    connectAuth: { token, password: token },
    req,
  });
  if (!authResult.ok) {
    sendJson(res, 401, {
      error: { message: "Unauthorized", type: "unauthorized" },
    });
    return true;
  }
  // Parse the JSON body, capped at 1 MiB unless overridden via options.
  const body = await readJsonBody(req, opts.maxBodyBytes ?? 1024 * 1024);
  if (!body.ok) {
    sendJson(res, 400, {
      error: { message: body.error, type: "invalid_request_error" },
    });
    return true;
  }
  const payload = coerceRequest(body.value);
  const stream = Boolean(payload.stream);
  // `model` is echoed back in responses only; it does not select a backend.
  const model = typeof payload.model === "string" ? payload.model : "clawdbot";
  const user = typeof payload.user === "string" ? payload.user : undefined;
  const agentId = resolveAgentId(req);
  const sessionKey = resolveSessionKey({ req, agentId, user });
  const prompt = buildAgentPrompt(payload.messages);
  if (!prompt.message) {
    sendJson(res, 400, {
      error: {
        message: "Missing user message in `messages`.",
        type: "invalid_request_error",
      },
    });
    return true;
  }
  // runId doubles as the OpenAI completion id and as the correlation key for
  // agent events in the streaming path.
  const runId = `chatcmpl_${randomUUID()}`;
  const deps = createDefaultDeps();
  if (!stream) {
    // Non-streaming: run the agent to completion, then emit one JSON body.
    try {
      const result = await agentCommand(
        {
          message: prompt.message,
          extraSystemPrompt: prompt.extraSystemPrompt,
          sessionKey,
          runId,
          deliver: false,
          messageProvider: "webchat",
          bestEffortDeliver: false,
        },
        defaultRuntime,
        deps,
      );
      // Join all payload texts; fall back to a fixed notice when empty.
      const payloads = (
        result as { payloads?: Array<{ text?: string }> } | null
      )?.payloads;
      const content =
        Array.isArray(payloads) && payloads.length > 0
          ? payloads
              .map((p) => (typeof p.text === "string" ? p.text : ""))
              .filter(Boolean)
              .join("\n\n")
          : "No response from Clawdbot.";
      sendJson(res, 200, {
        id: runId,
        object: "chat.completion",
        created: Math.floor(Date.now() / 1000),
        model,
        choices: [
          {
            index: 0,
            message: { role: "assistant", content },
            finish_reason: "stop",
          },
        ],
        // Token accounting is not tracked; zeros keep clients happy.
        usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 },
      });
    } catch (err) {
      // NOTE(review): String(err) may leak internal details to the client.
      sendJson(res, 500, {
        error: { message: String(err), type: "api_error" },
      });
    }
    return true;
  }
  // Streaming: open the SSE channel before kicking off the agent run.
  res.statusCode = 200;
  res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.flushHeaders?.();
  let wroteRole = false; // first chunk must carry delta.role = "assistant"
  let sawAssistantDelta = false; // suppresses the fallback chunk below
  let closed = false; // guards against writes after end/disconnect
  // Relay assistant deltas for *this* run as chat.completion.chunk frames.
  const unsubscribe = onAgentEvent((evt) => {
    if (evt.runId !== runId) return;
    if (closed) return;
    if (evt.stream === "assistant") {
      // Prefer incremental `delta`; fall back to full `text` if provided.
      const delta = evt.data?.delta;
      const text = evt.data?.text;
      const content =
        typeof delta === "string"
          ? delta
          : typeof text === "string"
            ? text
            : "";
      if (!content) return;
      if (!wroteRole) {
        wroteRole = true;
        writeSse(res, {
          id: runId,
          object: "chat.completion.chunk",
          created: Math.floor(Date.now() / 1000),
          model,
          choices: [{ index: 0, delta: { role: "assistant" } }],
        });
      }
      sawAssistantDelta = true;
      writeSse(res, {
        id: runId,
        object: "chat.completion.chunk",
        created: Math.floor(Date.now() / 1000),
        model,
        choices: [
          {
            index: 0,
            delta: { content },
            finish_reason: null,
          },
        ],
      });
      return;
    }
    if (evt.stream === "lifecycle") {
      // End of run (or error): close the SSE stream immediately.
      // NOTE(review): no final chunk with finish_reason "stop" is emitted in
      // the delta path before [DONE]; most clients tolerate this — confirm.
      const phase = evt.data?.phase;
      if (phase === "end" || phase === "error") {
        closed = true;
        unsubscribe();
        writeDone(res);
        res.end();
      }
    }
  });
  // Client disconnect: stop relaying, but let the agent run finish on its own.
  req.on("close", () => {
    closed = true;
    unsubscribe();
  });
  // Fire-and-forget: the agent run drives the stream via the events above.
  void (async () => {
    try {
      const result = await agentCommand(
        {
          message: prompt.message,
          extraSystemPrompt: prompt.extraSystemPrompt,
          sessionKey,
          runId,
          deliver: false,
          messageProvider: "webchat",
          bestEffortDeliver: false,
        },
        defaultRuntime,
        deps,
      );
      if (closed) return;
      // Fallback: no deltas arrived, so stream the final payload text as one chunk.
      if (!sawAssistantDelta) {
        if (!wroteRole) {
          wroteRole = true;
          writeSse(res, {
            id: runId,
            object: "chat.completion.chunk",
            created: Math.floor(Date.now() / 1000),
            model,
            choices: [{ index: 0, delta: { role: "assistant" } }],
          });
        }
        const payloads = (
          result as { payloads?: Array<{ text?: string }> } | null
        )?.payloads;
        const content =
          Array.isArray(payloads) && payloads.length > 0
            ? payloads
                .map((p) => (typeof p.text === "string" ? p.text : ""))
                .filter(Boolean)
                .join("\n\n")
            : "No response from Clawdbot.";
        sawAssistantDelta = true;
        writeSse(res, {
          id: runId,
          object: "chat.completion.chunk",
          created: Math.floor(Date.now() / 1000),
          model,
          choices: [
            {
              index: 0,
              delta: { content },
              finish_reason: null,
            },
          ],
        });
      }
    } catch (err) {
      if (closed) return;
      // Surface the failure in-band as a final content chunk …
      writeSse(res, {
        id: runId,
        object: "chat.completion.chunk",
        created: Math.floor(Date.now() / 1000),
        model,
        choices: [
          {
            index: 0,
            delta: { content: `Error: ${String(err)}` },
            finish_reason: "stop",
          },
        ],
      });
      // … and emit a synthetic lifecycle error — presumably so other
      // listeners observe the run's termination; confirm against agent-events.
      emitAgentEvent({
        runId,
        stream: "lifecycle",
        data: { phase: "error" },
      });
    } finally {
      // If no lifecycle event closed the stream, close it here exactly once.
      if (!closed) {
        closed = true;
        unsubscribe();
        writeDone(res);
        res.end();
      }
    }
  })();
  return true;
}

View File

@@ -22,6 +22,7 @@ import {
resolveHookProvider,
} from "./hooks.js";
import { applyHookMappings } from "./hooks-mapping.js";
import { handleOpenAiHttpRequest } from "./openai-http.js";
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
@@ -206,12 +207,14 @@ export function createGatewayHttpServer(opts: {
controlUiEnabled: boolean;
controlUiBasePath: string;
handleHooksRequest: HooksRequestHandler;
resolvedAuth: import("./auth.js").ResolvedGatewayAuth;
}): HttpServer {
const {
canvasHost,
controlUiEnabled,
controlUiBasePath,
handleHooksRequest,
resolvedAuth,
} = opts;
const httpServer: HttpServer = createHttpServer((req, res) => {
// Don't interfere with WebSocket upgrades; ws handles the 'upgrade' event.
@@ -219,6 +222,8 @@ export function createGatewayHttpServer(opts: {
void (async () => {
if (await handleHooksRequest(req, res)) return;
if (await handleOpenAiHttpRequest(req, res, { auth: resolvedAuth }))
return;
if (canvasHost) {
if (await handleA2uiHttpRequest(req, res)) return;
if (await canvasHost.handleHttpRequest(req, res)) return;

View File

@@ -616,6 +616,7 @@ export async function startGatewayServer(
controlUiEnabled,
controlUiBasePath,
handleHooksRequest,
resolvedAuth,
});
let bonjourStop: (() => Promise<void>) | null = null;
let bridge: Awaited<ReturnType<typeof startNodeBridgeServer>> | null = null;