feat: add gateway webhooks
This commit is contained in:
@@ -65,7 +65,7 @@ import {
|
||||
} from "../cron/run-log.js";
|
||||
import { CronService } from "../cron/service.js";
|
||||
import { resolveCronStorePath } from "../cron/store.js";
|
||||
import type { CronJobCreate, CronJobPatch } from "../cron/types.js";
|
||||
import type { CronJob, CronJobCreate, CronJobPatch } from "../cron/types.js";
|
||||
import { isVerbose } from "../globals.js";
|
||||
import { onAgentEvent } from "../infra/agent-events.js";
|
||||
import { startGatewayBonjourAdvertiser } from "../infra/bonjour.js";
|
||||
@@ -146,6 +146,109 @@ import { handleControlUiHttpRequest } from "./control-ui.js";
|
||||
|
||||
ensureClawdisCliOnPath();
|
||||
|
||||
const DEFAULT_HOOKS_PATH = "/hooks";
|
||||
const DEFAULT_HOOKS_MAX_BODY_BYTES = 256 * 1024;
|
||||
|
||||
type HooksConfigResolved = {
|
||||
basePath: string;
|
||||
token: string;
|
||||
maxBodyBytes: number;
|
||||
};
|
||||
|
||||
function resolveHooksConfig(cfg: ClawdisConfig): HooksConfigResolved | null {
|
||||
if (cfg.hooks?.enabled !== true) return null;
|
||||
const token = cfg.hooks?.token?.trim();
|
||||
if (!token) {
|
||||
throw new Error("hooks.enabled requires hooks.token");
|
||||
}
|
||||
const rawPath = cfg.hooks?.path?.trim() || DEFAULT_HOOKS_PATH;
|
||||
const withSlash = rawPath.startsWith("/") ? rawPath : `/${rawPath}`;
|
||||
const trimmed =
|
||||
withSlash.length > 1 ? withSlash.replace(/\/+$/, "") : withSlash;
|
||||
if (trimmed === "/") {
|
||||
throw new Error("hooks.path may not be '/'");
|
||||
}
|
||||
return {
|
||||
basePath: trimmed,
|
||||
token,
|
||||
maxBodyBytes: DEFAULT_HOOKS_MAX_BODY_BYTES,
|
||||
};
|
||||
}
|
||||
|
||||
function extractHookToken(
|
||||
req: IncomingMessage,
|
||||
url: URL,
|
||||
): string | undefined {
|
||||
const auth =
|
||||
typeof req.headers.authorization === "string"
|
||||
? req.headers.authorization.trim()
|
||||
: "";
|
||||
if (auth.toLowerCase().startsWith("bearer ")) {
|
||||
const token = auth.slice(7).trim();
|
||||
if (token) return token;
|
||||
}
|
||||
const headerToken =
|
||||
typeof req.headers["x-clawdis-token"] === "string"
|
||||
? req.headers["x-clawdis-token"].trim()
|
||||
: "";
|
||||
if (headerToken) return headerToken;
|
||||
const queryToken = url.searchParams.get("token");
|
||||
if (queryToken) return queryToken.trim();
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async function readJsonBody(
|
||||
req: IncomingMessage,
|
||||
maxBytes: number,
|
||||
): Promise<{ ok: true; value: unknown } | { ok: false; error: string }> {
|
||||
return await new Promise((resolve) => {
|
||||
let done = false;
|
||||
let total = 0;
|
||||
const chunks: Buffer[] = [];
|
||||
req.on("data", (chunk: Buffer) => {
|
||||
if (done) return;
|
||||
total += chunk.length;
|
||||
if (total > maxBytes) {
|
||||
done = true;
|
||||
resolve({ ok: false, error: "payload too large" });
|
||||
req.destroy();
|
||||
return;
|
||||
}
|
||||
chunks.push(chunk);
|
||||
});
|
||||
req.on("end", () => {
|
||||
if (done) return;
|
||||
done = true;
|
||||
const raw = Buffer.concat(chunks).toString("utf-8").trim();
|
||||
if (!raw) {
|
||||
resolve({ ok: true, value: {} });
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(raw) as unknown;
|
||||
resolve({ ok: true, value: parsed });
|
||||
} catch (err) {
|
||||
resolve({ ok: false, error: String(err) });
|
||||
}
|
||||
});
|
||||
req.on("error", (err) => {
|
||||
if (done) return;
|
||||
done = true;
|
||||
resolve({ ok: false, error: String(err) });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function sendJson(
|
||||
res: import("node:http").ServerResponse,
|
||||
status: number,
|
||||
body: unknown,
|
||||
) {
|
||||
res.statusCode = status;
|
||||
res.setHeader("Content-Type", "application/json; charset=utf-8");
|
||||
res.end(JSON.stringify(body));
|
||||
}
|
||||
|
||||
const log = createSubsystemLogger("gateway");
|
||||
const logCanvas = log.child("canvas");
|
||||
const logBridge = log.child("bridge");
|
||||
@@ -155,6 +258,7 @@ const logProviders = log.child("providers");
|
||||
const logBrowser = log.child("browser");
|
||||
const logHealth = log.child("health");
|
||||
const logCron = log.child("cron");
|
||||
const logHooks = log.child("hooks");
|
||||
const logWsControl = log.child("ws");
|
||||
const logWhatsApp = logProviders.child("whatsapp");
|
||||
const logTelegram = logProviders.child("telegram");
|
||||
@@ -1174,6 +1278,7 @@ export async function startGatewayServer(
|
||||
password,
|
||||
allowTailscale,
|
||||
};
|
||||
const hooksConfig = resolveHooksConfig(cfgAtStart);
|
||||
const canvasHostEnabled =
|
||||
process.env.CLAWDIS_SKIP_CANVAS_HOST !== "1" &&
|
||||
cfgAtStart.canvasHost?.enabled !== false;
|
||||
@@ -1194,6 +1299,7 @@ export async function startGatewayServer(
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
let canvasHost: CanvasHostHandler | null = null;
|
||||
let canvasHostServer: CanvasHostServer | null = null;
|
||||
if (canvasHostEnabled) {
|
||||
@@ -1215,11 +1321,200 @@ export async function startGatewayServer(
|
||||
}
|
||||
}
|
||||
|
||||
const handleHooksRequest = async (
|
||||
req: IncomingMessage,
|
||||
res: import("node:http").ServerResponse,
|
||||
): Promise<boolean> => {
|
||||
if (!hooksConfig) return false;
|
||||
const url = new URL(req.url ?? "/", `http://${bindHost}:${port}`);
|
||||
const basePath = hooksConfig.basePath;
|
||||
if (
|
||||
url.pathname !== basePath &&
|
||||
!url.pathname.startsWith(`${basePath}/`)
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const token = extractHookToken(req, url);
|
||||
if (!token || token !== hooksConfig.token) {
|
||||
res.statusCode = 401;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Unauthorized");
|
||||
return true;
|
||||
}
|
||||
|
||||
if (req.method !== "POST") {
|
||||
res.statusCode = 405;
|
||||
res.setHeader("Allow", "POST");
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Method Not Allowed");
|
||||
return true;
|
||||
}
|
||||
|
||||
const subPath = url.pathname
|
||||
.slice(basePath.length)
|
||||
.replace(/^\/+/, "");
|
||||
if (!subPath) {
|
||||
res.statusCode = 404;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Not Found");
|
||||
return true;
|
||||
}
|
||||
|
||||
const body = await readJsonBody(req, hooksConfig.maxBodyBytes);
|
||||
if (!body.ok) {
|
||||
const status = body.error === "payload too large" ? 413 : 400;
|
||||
sendJson(res, status, { ok: false, error: body.error });
|
||||
return true;
|
||||
}
|
||||
|
||||
const payload =
|
||||
typeof body.value === "object" && body.value !== null ? body.value : {};
|
||||
|
||||
if (subPath === "wake") {
|
||||
const text =
|
||||
typeof (payload as { text?: unknown }).text === "string"
|
||||
? (payload as { text?: string }).text.trim()
|
||||
: "";
|
||||
if (!text) {
|
||||
sendJson(res, 400, { ok: false, error: "text required" });
|
||||
return true;
|
||||
}
|
||||
const modeRaw = (payload as { mode?: unknown }).mode;
|
||||
const mode =
|
||||
modeRaw === "next-heartbeat" ? "next-heartbeat" : "now";
|
||||
enqueueSystemEvent(text);
|
||||
if (mode === "now") {
|
||||
requestReplyHeartbeatNow({ reason: "hook:wake" });
|
||||
}
|
||||
sendJson(res, 200, { ok: true, mode });
|
||||
return true;
|
||||
}
|
||||
|
||||
if (subPath === "agent") {
|
||||
const message =
|
||||
typeof (payload as { message?: unknown }).message === "string"
|
||||
? (payload as { message?: string }).message.trim()
|
||||
: "";
|
||||
if (!message) {
|
||||
sendJson(res, 400, { ok: false, error: "message required" });
|
||||
return true;
|
||||
}
|
||||
|
||||
const nameRaw = (payload as { name?: unknown }).name;
|
||||
const name =
|
||||
typeof nameRaw === "string" && nameRaw.trim() ? nameRaw.trim() : "Hook";
|
||||
const wakeModeRaw = (payload as { wakeMode?: unknown }).wakeMode;
|
||||
const wakeMode =
|
||||
wakeModeRaw === "next-heartbeat" ? "next-heartbeat" : "now";
|
||||
const sessionKeyRaw = (payload as { sessionKey?: unknown }).sessionKey;
|
||||
const sessionKey =
|
||||
typeof sessionKeyRaw === "string" && sessionKeyRaw.trim()
|
||||
? sessionKeyRaw.trim()
|
||||
: `hook:${randomUUID()}`;
|
||||
|
||||
const channelRaw = (payload as { channel?: unknown }).channel;
|
||||
const channel =
|
||||
channelRaw === "whatsapp" ||
|
||||
channelRaw === "telegram" ||
|
||||
channelRaw === "last"
|
||||
? channelRaw
|
||||
: channelRaw === undefined
|
||||
? undefined
|
||||
: null;
|
||||
if (channel === null) {
|
||||
sendJson(res, 400, {
|
||||
ok: false,
|
||||
error: "channel must be last|whatsapp|telegram",
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
const toRaw = (payload as { to?: unknown }).to;
|
||||
const to =
|
||||
typeof toRaw === "string" && toRaw.trim() ? toRaw.trim() : undefined;
|
||||
const deliver = (payload as { deliver?: unknown }).deliver === true;
|
||||
const thinkingRaw = (payload as { thinking?: unknown }).thinking;
|
||||
const thinking =
|
||||
typeof thinkingRaw === "string" && thinkingRaw.trim()
|
||||
? thinkingRaw.trim()
|
||||
: undefined;
|
||||
const timeoutRaw = (payload as { timeoutSeconds?: unknown }).timeoutSeconds;
|
||||
const timeoutSeconds =
|
||||
typeof timeoutRaw === "number" && Number.isFinite(timeoutRaw) && timeoutRaw > 0
|
||||
? Math.floor(timeoutRaw)
|
||||
: undefined;
|
||||
|
||||
const jobId = randomUUID();
|
||||
const now = Date.now();
|
||||
const job: CronJob = {
|
||||
id: jobId,
|
||||
name,
|
||||
enabled: true,
|
||||
createdAtMs: now,
|
||||
updatedAtMs: now,
|
||||
schedule: { kind: "at", atMs: now },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode,
|
||||
payload: {
|
||||
kind: "agentTurn",
|
||||
message,
|
||||
thinking,
|
||||
timeoutSeconds,
|
||||
deliver,
|
||||
channel: channel ?? "last",
|
||||
to,
|
||||
},
|
||||
state: { nextRunAtMs: now },
|
||||
};
|
||||
|
||||
const runId = randomUUID();
|
||||
sendJson(res, 202, { ok: true, runId });
|
||||
|
||||
void (async () => {
|
||||
try {
|
||||
const cfg = loadConfig();
|
||||
const result = await runCronIsolatedAgentTurn({
|
||||
cfg,
|
||||
deps,
|
||||
job,
|
||||
message,
|
||||
sessionKey,
|
||||
lane: "cron",
|
||||
});
|
||||
const summary =
|
||||
result.summary?.trim() ||
|
||||
result.error?.trim() ||
|
||||
result.status;
|
||||
const prefix =
|
||||
result.status === "ok" ? `Hook ${name}` : `Hook ${name} (${result.status})`;
|
||||
enqueueSystemEvent(`${prefix}: ${summary}`.trim());
|
||||
if (wakeMode === "now") {
|
||||
requestReplyHeartbeatNow({ reason: `hook:${jobId}` });
|
||||
}
|
||||
} catch (err) {
|
||||
logHooks.warn({ err: String(err) }, "hook agent failed");
|
||||
enqueueSystemEvent(`Hook ${name} (error): ${String(err)}`);
|
||||
if (wakeMode === "now") {
|
||||
requestReplyHeartbeatNow({ reason: `hook:${jobId}:error` });
|
||||
}
|
||||
}
|
||||
})();
|
||||
return true;
|
||||
}
|
||||
|
||||
res.statusCode = 404;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Not Found");
|
||||
return true;
|
||||
};
|
||||
|
||||
const httpServer: HttpServer = createHttpServer((req, res) => {
|
||||
// Don't interfere with WebSocket upgrades; ws handles the 'upgrade' event.
|
||||
if (String(req.headers.upgrade ?? "").toLowerCase() === "websocket") return;
|
||||
|
||||
void (async () => {
|
||||
if (await handleHooksRequest(req, res)) return;
|
||||
if (canvasHost) {
|
||||
if (await handleA2uiHttpRequest(req, res)) return;
|
||||
if (await canvasHost.handleHttpRequest(req, res)) return;
|
||||
|
||||
Reference in New Issue
Block a user