diff --git a/docs/gateway/index.md b/docs/gateway/index.md index de553aa2d..a9f0a70b9 100644 --- a/docs/gateway/index.md +++ b/docs/gateway/index.md @@ -28,6 +28,7 @@ pnpm gateway:watch - Disable with `gateway.reload.mode="off"`. - Binds WebSocket control plane to `127.0.0.1:` (default 18789). - The same port also serves HTTP (control UI, hooks, A2UI). Single-port multiplex. + - OpenAI-compatible HTTP API: [`/v1/chat/completions`](/gateway/openai-http-api). - Starts a Canvas file server by default on `canvasHost.port` (default `18793`), serving `http://:18793/__clawdbot__/canvas/` from `~/clawd/canvas`. Disable with `canvasHost.enabled=false` or `CLAWDBOT_SKIP_CANVAS_HOST=1`. - Logs to stdout; use launchd/systemd to keep it alive and rotate logs. - Pass `--verbose` to mirror debug logging (handshakes, req/res, events) from the log file into stdio when troubleshooting. diff --git a/docs/gateway/openai-http-api.md b/docs/gateway/openai-http-api.md new file mode 100644 index 000000000..519158d11 --- /dev/null +++ b/docs/gateway/openai-http-api.md @@ -0,0 +1,72 @@ +--- +summary: "Expose an OpenAI-compatible /v1/chat/completions HTTP endpoint from the Gateway" +read_when: + - Integrating tools that expect OpenAI Chat Completions +--- +# OpenAI-compatible HTTP API + +Clawdbot’s Gateway can serve a small OpenAI-compatible endpoint: + +- `POST /v1/chat/completions` +- Same port as the Gateway (WS + HTTP multiplex): `http://:/v1/chat/completions` + +## Authentication + +Uses the Gateway auth configuration. Send a bearer token: + +- `Authorization: Bearer ` + +Notes: +- When `gateway.auth.mode="token"`, use `gateway.auth.token` (or `CLAWDBOT_GATEWAY_TOKEN`). +- When `gateway.auth.mode="password"`, use `gateway.auth.password` (or `CLAWDBOT_GATEWAY_PASSWORD`). + +## Choosing an agent + +Target a specific Clawdbot agent by id: + +- `x-clawdbot-agent-id: ` (default: `main`) + +Advanced: +- `x-clawdbot-session-key: ` to fully control session routing. + +## Session behavior + +By default the endpoint is **stateless per request** (a new session key is generated each call). + +If the request includes an OpenAI `user` string, the Gateway derives a stable session key from it, so repeated calls can share an agent session. + +## Streaming (SSE) + +Set `stream: true` to receive Server-Sent Events (SSE): + +- `Content-Type: text/event-stream` +- Each event line is `data: ` +- Stream ends with `data: [DONE]` + +## Examples + +Non-streaming: +```bash +curl -sS http://127.0.0.1:18789/v1/chat/completions \ + -H 'Authorization: Bearer YOUR_TOKEN' \ + -H 'Content-Type: application/json' \ + -H 'x-clawdbot-agent-id: main' \ + -d '{ + "model": "clawdbot", + "messages": [{"role":"user","content":"hi"}] + }' +``` + +Streaming: +```bash +curl -N http://127.0.0.1:18789/v1/chat/completions \ + -H 'Authorization: Bearer YOUR_TOKEN' \ + -H 'Content-Type: application/json' \ + -H 'x-clawdbot-agent-id: main' \ + -d '{ + "model": "clawdbot", + "stream": true, + "messages": [{"role":"user","content":"hi"}] + }' +``` + diff --git a/src/gateway/openai-http.e2e.test.ts b/src/gateway/openai-http.e2e.test.ts new file mode 100644 index 000000000..ec85285d5 --- /dev/null +++ b/src/gateway/openai-http.e2e.test.ts @@ -0,0 +1,297 @@ +import { describe, expect, it } from "vitest"; + +import { emitAgentEvent } from "../infra/agent-events.js"; +import { + agentCommand, + getFreePort, + installGatewayTestHooks, +} from "./test-helpers.js"; + +installGatewayTestHooks(); + +async function startServer(port: number) { + const { startGatewayServer } = await import("./server.js"); + return await startGatewayServer(port, { + host: "127.0.0.1", + auth: { mode: "token", token: "secret" }, + controlUiEnabled: false, + }); +} + +async function postChatCompletions( + port: number, + body: unknown, + headers?: Record, +) { + const res = await fetch(`http://127.0.0.1:${port}/v1/chat/completions`, { + method: "POST", + headers: { + "content-type": "application/json", + authorization: "Bearer secret", + ...headers, + }, + body: JSON.stringify(body), + }); + return res; +} + +function parseSseDataLines(text: string): string[] { + return text + .split("\n") + .map((line) => line.trim()) + .filter((line) => line.startsWith("data: ")) + .map((line) => line.slice("data: ".length)); +} + +describe("OpenAI-compatible HTTP API (e2e)", () => { + it("rejects non-POST", async () => { + const port = await getFreePort(); + const server = await startServer(port); + try { + const res = await fetch(`http://127.0.0.1:${port}/v1/chat/completions`, { + method: "GET", + headers: { authorization: "Bearer secret" }, + }); + expect(res.status).toBe(405); + } finally { + await server.close({ reason: "test done" }); + } + }); + + it("rejects missing auth", async () => { + const port = await getFreePort(); + const server = await startServer(port); + try { + const res = await fetch(`http://127.0.0.1:${port}/v1/chat/completions`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ messages: [{ role: "user", content: "hi" }] }), + }); + expect(res.status).toBe(401); + } finally { + await server.close({ reason: "test done" }); + } + }); + + it("routes to a specific agent via header", async () => { + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "hello" }], + } as never); + + const port = await getFreePort(); + const server = await startServer(port); + try { + const res = await postChatCompletions( + port, + { model: "clawdbot", messages: [{ role: "user", content: "hi" }] }, + { "x-clawdbot-agent-id": "beta" }, + ); + expect(res.status).toBe(200); + + expect(agentCommand).toHaveBeenCalledTimes(1); + const [opts] = agentCommand.mock.calls[0] ?? []; + expect( + (opts as { sessionKey?: string } | undefined)?.sessionKey ?? "", + ).toMatch(/^agent:beta:/); + } finally { + await server.close({ reason: "test done" }); + } + }); + + it("honors x-clawdbot-session-key override", async () => { + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "hello" }], + } as never); + + const port = await getFreePort(); + const server = await startServer(port); + try { + const res = await postChatCompletions( + port, + { model: "clawdbot", messages: [{ role: "user", content: "hi" }] }, + { + "x-clawdbot-agent-id": "beta", + "x-clawdbot-session-key": "agent:beta:openai:custom", + }, + ); + expect(res.status).toBe(200); + + const [opts] = agentCommand.mock.calls[0] ?? []; + expect((opts as { sessionKey?: string } | undefined)?.sessionKey).toBe( + "agent:beta:openai:custom", + ); + } finally { + await server.close({ reason: "test done" }); + } + }); + + it("uses OpenAI user for a stable session key", async () => { + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "hello" }], + } as never); + + const port = await getFreePort(); + const server = await startServer(port); + try { + const res = await postChatCompletions(port, { + user: "alice", + model: "clawdbot", + messages: [{ role: "user", content: "hi" }], + }); + expect(res.status).toBe(200); + + const [opts] = agentCommand.mock.calls[0] ?? []; + expect( + (opts as { sessionKey?: string } | undefined)?.sessionKey ?? "", + ).toContain("openai-user:alice"); + } finally { + await server.close({ reason: "test done" }); + } + }); + + it("extracts user message text from array content", async () => { + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "hello" }], + } as never); + + const port = await getFreePort(); + const server = await startServer(port); + try { + const res = await postChatCompletions(port, { + model: "clawdbot", + messages: [ + { + role: "user", + content: [ + { type: "text", text: "hello" }, + { type: "input_text", text: "world" }, + ], + }, + ], + }); + expect(res.status).toBe(200); + + const [opts] = agentCommand.mock.calls[0] ?? []; + expect((opts as { message?: string } | undefined)?.message).toBe( + "hello\nworld", + ); + } finally { + await server.close({ reason: "test done" }); + } + }); + + it("returns a non-streaming OpenAI chat.completion response", async () => { + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "hello" }], + } as never); + + const port = await getFreePort(); + const server = await startServer(port); + try { + const res = await postChatCompletions(port, { + stream: false, + model: "clawdbot", + messages: [{ role: "user", content: "hi" }], + }); + expect(res.status).toBe(200); + const json = (await res.json()) as Record; + expect(json.object).toBe("chat.completion"); + expect(Array.isArray(json.choices)).toBe(true); + const choice0 = (json.choices as Array>)[0] ?? {}; + const msg = + (choice0.message as Record | undefined) ?? {}; + expect(msg.role).toBe("assistant"); + expect(msg.content).toBe("hello"); + } finally { + await server.close({ reason: "test done" }); + } + }); + + it("requires a user message", async () => { + const port = await getFreePort(); + const server = await startServer(port); + try { + const res = await postChatCompletions(port, { + model: "clawdbot", + messages: [{ role: "system", content: "yo" }], + }); + expect(res.status).toBe(400); + const json = (await res.json()) as Record; + expect((json.error as Record | undefined)?.type).toBe( + "invalid_request_error", + ); + } finally { + await server.close({ reason: "test done" }); + } + }); + + it("streams SSE chunks when stream=true (delta events)", async () => { + agentCommand.mockImplementationOnce(async (opts: unknown) => { + const runId = (opts as { runId?: string } | undefined)?.runId ?? ""; + emitAgentEvent({ runId, stream: "assistant", data: { delta: "he" } }); + emitAgentEvent({ runId, stream: "assistant", data: { delta: "llo" } }); + return { payloads: [{ text: "hello" }] } as never; + }); + + const port = await getFreePort(); + const server = await startServer(port); + try { + const res = await postChatCompletions(port, { + stream: true, + model: "clawdbot", + messages: [{ role: "user", content: "hi" }], + }); + expect(res.status).toBe(200); + expect(res.headers.get("content-type") ?? "").toContain( + "text/event-stream", + ); + + const text = await res.text(); + const data = parseSseDataLines(text); + expect(data[data.length - 1]).toBe("[DONE]"); + + const jsonChunks = data + .filter((d) => d !== "[DONE]") + .map((d) => JSON.parse(d) as Record); + expect(jsonChunks.some((c) => c.object === "chat.completion.chunk")).toBe( + true, + ); + const allContent = jsonChunks + .flatMap( + (c) => + (c.choices as Array> | undefined) ?? [], + ) + .map( + (choice) => + (choice.delta as Record | undefined)?.content, + ) + .filter((v): v is string => typeof v === "string") + .join(""); + expect(allContent).toBe("hello"); + } finally { + await server.close({ reason: "test done" }); + } + }); + + it("streams SSE chunks when stream=true (fallback when no deltas)", async () => { + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "hello" }], + } as never); + + const port = await getFreePort(); + const server = await startServer(port); + try { + const res = await postChatCompletions(port, { + stream: true, + model: "clawdbot", + messages: [{ role: "user", content: "hi" }], + }); + expect(res.status).toBe(200); + const text = await res.text(); + expect(text).toContain("[DONE]"); + expect(text).toContain("hello"); + } finally { + await server.close({ reason: "test done" }); + } + }); +}); diff --git a/src/gateway/openai-http.ts b/src/gateway/openai-http.ts new file mode 100644 index 000000000..4194cb33a --- /dev/null +++ b/src/gateway/openai-http.ts @@ -0,0 +1,407 @@ +import { randomUUID } from "node:crypto"; +import type { IncomingMessage, ServerResponse } from "node:http"; + +import { createDefaultDeps } from "../cli/deps.js"; +import { agentCommand } from "../commands/agent.js"; +import { emitAgentEvent, onAgentEvent } from "../infra/agent-events.js"; +import { + buildAgentMainSessionKey, + normalizeAgentId, +} from "../routing/session-key.js"; +import { defaultRuntime } from "../runtime.js"; +import { authorizeGatewayConnect, type ResolvedGatewayAuth } from "./auth.js"; +import { readJsonBody } from "./hooks.js"; + +type OpenAiHttpOptions = { + auth: ResolvedGatewayAuth; + maxBodyBytes?: number; +}; + +type OpenAiChatMessage = { + role?: unknown; + content?: unknown; +}; + +type OpenAiChatCompletionRequest = { + model?: unknown; + stream?: unknown; + messages?: unknown; + user?: unknown; +}; + +function sendJson(res: ServerResponse, status: number, body: unknown) { + res.statusCode = status; + res.setHeader("Content-Type", "application/json; charset=utf-8"); + res.end(JSON.stringify(body)); +} + +function getHeader(req: IncomingMessage, name: string): string | undefined { + const raw = req.headers[name.toLowerCase()]; + if (typeof raw === "string") return raw; + if (Array.isArray(raw)) return raw[0]; + return undefined; +} + +function getBearerToken(req: IncomingMessage): string | undefined { + const raw = getHeader(req, "authorization")?.trim() ?? ""; + if (!raw.toLowerCase().startsWith("bearer ")) return undefined; + const token = raw.slice(7).trim(); + return token || undefined; +} + +function writeSse(res: ServerResponse, data: unknown) { + res.write(`data: ${JSON.stringify(data)}\n\n`); +} + +function writeDone(res: ServerResponse) { + res.write("data: [DONE]\n\n"); +} + +function asMessages(val: unknown): OpenAiChatMessage[] { + return Array.isArray(val) ? (val as OpenAiChatMessage[]) : []; +} + +function extractTextContent(content: unknown): string { + if (typeof content === "string") return content; + if (Array.isArray(content)) { + return content + .map((part) => { + if (!part || typeof part !== "object") return ""; + const type = (part as { type?: unknown }).type; + const text = (part as { text?: unknown }).text; + const inputText = (part as { input_text?: unknown }).input_text; + if (type === "text" && typeof text === "string") return text; + if (type === "input_text" && typeof text === "string") return text; + if (typeof inputText === "string") return inputText; + return ""; + }) + .filter(Boolean) + .join("\n"); + } + return ""; +} + +function buildAgentPrompt(messagesUnknown: unknown): { + message: string; + extraSystemPrompt?: string; +} { + const messages = asMessages(messagesUnknown); + + const systemParts: string[] = []; + let lastUser = ""; + + for (const msg of messages) { + if (!msg || typeof msg !== "object") continue; + const role = typeof msg.role === "string" ? msg.role.trim() : ""; + const content = extractTextContent(msg.content).trim(); + if (!role || !content) continue; + if (role === "system") { + systemParts.push(content); + continue; + } + if (role === "user") { + lastUser = content; + } + } + + return { + message: lastUser, + extraSystemPrompt: + systemParts.length > 0 ? systemParts.join("\n\n") : undefined, + }; +} + +function resolveAgentId(req: IncomingMessage): string { + const raw = + getHeader(req, "x-clawdbot-agent-id")?.trim() || + getHeader(req, "x-clawdbot-agent")?.trim() || + "main"; + return normalizeAgentId(raw); +} + +function resolveSessionKey(params: { + req: IncomingMessage; + agentId: string; + user?: string | undefined; +}): string { + const explicit = getHeader(params.req, "x-clawdbot-session-key")?.trim(); + if (explicit) return explicit; + + // Default: stateless per-request session key, but stable if OpenAI "user" is provided. + const user = params.user?.trim(); + const mainKey = user ? `openai-user:${user}` : `openai:${randomUUID()}`; + return buildAgentMainSessionKey({ agentId: params.agentId, mainKey }); +} + +function coerceRequest(val: unknown): OpenAiChatCompletionRequest { + if (!val || typeof val !== "object") return {}; + return val as OpenAiChatCompletionRequest; +} + +export async function handleOpenAiHttpRequest( + req: IncomingMessage, + res: ServerResponse, + opts: OpenAiHttpOptions, +): Promise { + const url = new URL( + req.url ?? "/", + `http://${req.headers.host || "localhost"}`, + ); + if (url.pathname !== "/v1/chat/completions") return false; + + if (req.method !== "POST") { + res.statusCode = 405; + res.setHeader("Allow", "POST"); + res.setHeader("Content-Type", "text/plain; charset=utf-8"); + res.end("Method Not Allowed"); + return true; + } + + const token = getBearerToken(req); + const authResult = await authorizeGatewayConnect({ + auth: opts.auth, + connectAuth: { token, password: token }, + req, + }); + if (!authResult.ok) { + sendJson(res, 401, { + error: { message: "Unauthorized", type: "unauthorized" }, + }); + return true; + } + + const body = await readJsonBody(req, opts.maxBodyBytes ?? 1024 * 1024); + if (!body.ok) { + sendJson(res, 400, { + error: { message: body.error, type: "invalid_request_error" }, + }); + return true; + } + + const payload = coerceRequest(body.value); + const stream = Boolean(payload.stream); + const model = typeof payload.model === "string" ? payload.model : "clawdbot"; + const user = typeof payload.user === "string" ? payload.user : undefined; + + const agentId = resolveAgentId(req); + const sessionKey = resolveSessionKey({ req, agentId, user }); + const prompt = buildAgentPrompt(payload.messages); + if (!prompt.message) { + sendJson(res, 400, { + error: { + message: "Missing user message in `messages`.", + type: "invalid_request_error", + }, + }); + return true; + } + + const runId = `chatcmpl_${randomUUID()}`; + const deps = createDefaultDeps(); + + if (!stream) { + try { + const result = await agentCommand( + { + message: prompt.message, + extraSystemPrompt: prompt.extraSystemPrompt, + sessionKey, + runId, + deliver: false, + messageProvider: "webchat", + bestEffortDeliver: false, + }, + defaultRuntime, + deps, + ); + + const payloads = ( + result as { payloads?: Array<{ text?: string }> } | null + )?.payloads; + const content = + Array.isArray(payloads) && payloads.length > 0 + ? payloads + .map((p) => (typeof p.text === "string" ? p.text : "")) + .filter(Boolean) + .join("\n\n") + : "No response from Clawdbot."; + + sendJson(res, 200, { + id: runId, + object: "chat.completion", + created: Math.floor(Date.now() / 1000), + model, + choices: [ + { + index: 0, + message: { role: "assistant", content }, + finish_reason: "stop", + }, + ], + usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 }, + }); + } catch (err) { + sendJson(res, 500, { + error: { message: String(err), type: "api_error" }, + }); + } + return true; + } + + res.statusCode = 200; + res.setHeader("Content-Type", "text/event-stream; charset=utf-8"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + res.flushHeaders?.(); + + let wroteRole = false; + let sawAssistantDelta = false; + let closed = false; + + const unsubscribe = onAgentEvent((evt) => { + if (evt.runId !== runId) return; + if (closed) return; + + if (evt.stream === "assistant") { + const delta = evt.data?.delta; + const text = evt.data?.text; + const content = + typeof delta === "string" + ? delta + : typeof text === "string" + ? text + : ""; + if (!content) return; + + if (!wroteRole) { + wroteRole = true; + writeSse(res, { + id: runId, + object: "chat.completion.chunk", + created: Math.floor(Date.now() / 1000), + model, + choices: [{ index: 0, delta: { role: "assistant" } }], + }); + } + + sawAssistantDelta = true; + writeSse(res, { + id: runId, + object: "chat.completion.chunk", + created: Math.floor(Date.now() / 1000), + model, + choices: [ + { + index: 0, + delta: { content }, + finish_reason: null, + }, + ], + }); + return; + } + + if (evt.stream === "lifecycle") { + const phase = evt.data?.phase; + if (phase === "end" || phase === "error") { + closed = true; + unsubscribe(); + writeDone(res); + res.end(); + } + } + }); + + req.on("close", () => { + closed = true; + unsubscribe(); + }); + + void (async () => { + try { + const result = await agentCommand( + { + message: prompt.message, + extraSystemPrompt: prompt.extraSystemPrompt, + sessionKey, + runId, + deliver: false, + messageProvider: "webchat", + bestEffortDeliver: false, + }, + defaultRuntime, + deps, + ); + + if (closed) return; + + if (!sawAssistantDelta) { + if (!wroteRole) { + wroteRole = true; + writeSse(res, { + id: runId, + object: "chat.completion.chunk", + created: Math.floor(Date.now() / 1000), + model, + choices: [{ index: 0, delta: { role: "assistant" } }], + }); + } + + const payloads = ( + result as { payloads?: Array<{ text?: string }> } | null + )?.payloads; + const content = + Array.isArray(payloads) && payloads.length > 0 + ? payloads + .map((p) => (typeof p.text === "string" ? p.text : "")) + .filter(Boolean) + .join("\n\n") + : "No response from Clawdbot."; + + sawAssistantDelta = true; + writeSse(res, { + id: runId, + object: "chat.completion.chunk", + created: Math.floor(Date.now() / 1000), + model, + choices: [ + { + index: 0, + delta: { content }, + finish_reason: null, + }, + ], + }); + } + } catch (err) { + if (closed) return; + writeSse(res, { + id: runId, + object: "chat.completion.chunk", + created: Math.floor(Date.now() / 1000), + model, + choices: [ + { + index: 0, + delta: { content: `Error: ${String(err)}` }, + finish_reason: "stop", + }, + ], + }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "error" }, + }); + } finally { + if (!closed) { + closed = true; + unsubscribe(); + writeDone(res); + res.end(); + } + } + })(); + + return true; +} diff --git a/src/gateway/server-http.ts b/src/gateway/server-http.ts index 6dc106ac9..dbd4105b1 100644 --- a/src/gateway/server-http.ts +++ b/src/gateway/server-http.ts @@ -22,6 +22,7 @@ import { resolveHookProvider, } from "./hooks.js"; import { applyHookMappings } from "./hooks-mapping.js"; +import { handleOpenAiHttpRequest } from "./openai-http.js"; type SubsystemLogger = ReturnType; @@ -206,12 +207,14 @@ export function createGatewayHttpServer(opts: { controlUiEnabled: boolean; controlUiBasePath: string; handleHooksRequest: HooksRequestHandler; + resolvedAuth: import("./auth.js").ResolvedGatewayAuth; }): HttpServer { const { canvasHost, controlUiEnabled, controlUiBasePath, handleHooksRequest, + resolvedAuth, } = opts; const httpServer: HttpServer = createHttpServer((req, res) => { // Don't interfere with WebSocket upgrades; ws handles the 'upgrade' event. @@ -219,6 +222,8 @@ export function createGatewayHttpServer(opts: { void (async () => { if (await handleHooksRequest(req, res)) return; + if (await handleOpenAiHttpRequest(req, res, { auth: resolvedAuth })) + return; if (canvasHost) { if (await handleA2uiHttpRequest(req, res)) return; if (await canvasHost.handleHttpRequest(req, res)) return; diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 2f77446bc..6b51c2065 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -616,6 +616,7 @@ export async function startGatewayServer( controlUiEnabled, controlUiBasePath, handleHooksRequest, + resolvedAuth, }); let bonjourStop: (() => Promise) | null = null; let bridge: Awaited> | null = null;