import { Type } from "@sinclair/typebox";
import { spawn } from "node:child_process";
import path from "node:path";

import type { ClawdbotPluginApi } from "../../../src/plugins/types.js";

/**
 * Shape of the JSON object this module expects on lobster's stdout.
 * Only the `ok` discriminant is validated at runtime (see `parseEnvelope`);
 * the remaining fields are trusted as declared here.
 */
type LobsterEnvelope =
  | {
      ok: true;
      status: "ok" | "needs_approval" | "cancelled";
      output: unknown[];
      requiresApproval: null | {
        type: "approval_request";
        prompt: string;
        items: unknown[];
        resumeToken?: string;
      };
    }
  | {
      ok: false;
      error: { type?: string; message: string };
    };

/** Timeout used when the caller supplies no (or a non-finite) `timeoutMs`. */
const DEFAULT_TIMEOUT_MS = 20_000;

/** Stdout cap used when the caller supplies no (or a non-finite) `maxStdoutBytes`. */
const DEFAULT_MAX_STDOUT_BYTES = 512_000;

/** Largest delay `setTimeout` accepts; larger delays are reset to 1 ms by Node. */
const MAX_TIMER_DELAY_MS = 2_147_483_647;

/**
 * Resolves the executable to spawn.
 *
 * @param lobsterPathRaw Optional user-supplied path; blank or missing means "lobster" (looked up on PATH).
 * @returns The literal `"lobster"` or the trimmed absolute path.
 * @throws Error when a non-default path is not absolute.
 */
function resolveExecutablePath(lobsterPathRaw: string | undefined) {
  const lobsterPath = lobsterPathRaw?.trim() || "lobster";
  if (lobsterPath !== "lobster" && !path.isAbsolute(lobsterPath)) {
    throw new Error("lobsterPath must be an absolute path (or omit to use PATH)");
  }
  return lobsterPath;
}

/** Returns true when `err` is an object whose `code` property is `"EINVAL"`. */
function isWindowsSpawnEINVAL(err: unknown) {
  if (!err || typeof err !== "object") return false;
  const code = (err as { code?: unknown }).code;
  return code === "EINVAL";
}

/** Returns `value` when it is a finite number, otherwise `fallback`. */
function finiteNumberOr(value: unknown, fallback: number): number {
  return typeof value === "number" && Number.isFinite(value) ? value : fallback;
}

/**
 * Spawns lobster once and collects its stdout.
 *
 * The child runs with `LOBSTER_MODE=tool`; `NODE_OPTIONS` is dropped from the
 * child environment when it contains `--inspect`.
 *
 * @param params Executable, arguments, working directory and limits. `timeoutMs`
 *   is clamped to at least 200 ms and `maxStdoutBytes` to at least 1024 bytes.
 * @param useShell Passed through as the `shell` spawn option.
 * @returns The full stdout text when the child closes with exit code 0.
 * @throws Error on spawn failure, non-zero exit, timeout, or stdout overflow.
 */
async function runLobsterSubprocessOnce(
  params: {
    execPath: string;
    argv: string[];
    cwd: string;
    timeoutMs: number;
    maxStdoutBytes: number;
  },
  useShell: boolean,
) {
  const { execPath, argv, cwd } = params;
  // Keep the delay inside the range setTimeout honours; out-of-range delays fire after ~1 ms.
  const timeoutMs = Math.min(Math.max(200, params.timeoutMs), MAX_TIMER_DELAY_MS);
  const maxStdoutBytes = Math.max(1024, params.maxStdoutBytes);

  const env: NodeJS.ProcessEnv = { ...process.env, LOBSTER_MODE: "tool" };
  const nodeOptions = env.NODE_OPTIONS ?? "";
  if (nodeOptions.includes("--inspect")) {
    delete env.NODE_OPTIONS;
  }

  return await new Promise<{ stdout: string }>((resolve, reject) => {
    const child = spawn(execPath, argv, {
      cwd,
      stdio: ["ignore", "pipe", "pipe"],
      env,
      shell: useShell,
      windowsHide: useShell ? true : undefined,
    });

    let stdout = "";
    let stdoutBytes = 0;
    let stderr = "";
    let settled = false;

    const timer = setTimeout(() => {
      killAndFail("lobster subprocess timed out");
    }, timeoutMs);

    // Settle the promise exactly once and always release the timer.
    const finish = (action: () => void) => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      action();
    };

    const killAndFail = (message: string) => {
      finish(() => {
        try {
          child.kill("SIGKILL");
        } catch {
          // Best effort: a failed kill must not escape the event callback.
        }
        reject(new Error(message));
      });
    };

    child.stdout?.setEncoding("utf8");
    child.stderr?.setEncoding("utf8");

    child.stdout?.on("data", (chunk) => {
      if (settled) return;
      const str = String(chunk);
      stdoutBytes += Buffer.byteLength(str, "utf8");
      if (stdoutBytes > maxStdoutBytes) {
        killAndFail("lobster output exceeded maxStdoutBytes");
        return;
      }
      stdout += str;
    });

    child.stderr?.on("data", (chunk) => {
      stderr += String(chunk);
    });

    child.once("error", (err) => {
      finish(() => reject(err));
    });

    // Wait for "close" rather than "exit": stdio streams may still hold
    // unread data when "exit" fires, which would truncate the JSON envelope.
    child.once("close", (code) => {
      finish(() => {
        if (code !== 0) {
          reject(new Error(`lobster failed (${code ?? "?"}): ${stderr.trim() || stdout.trim()}`));
          return;
        }
        resolve({ stdout });
      });
    });
  });
}

/**
 * Runs lobster without a shell; on win32, retries once with `shell: true`
 * when the first attempt fails with an `EINVAL` error code.
 *
 * @throws Whatever error the final attempt produced.
 */
async function runLobsterSubprocess(params: {
  execPath: string;
  argv: string[];
  cwd: string;
  timeoutMs: number;
  maxStdoutBytes: number;
}) {
  try {
    return await runLobsterSubprocessOnce(params, false);
  } catch (err) {
    if (process.platform === "win32" && isWindowsSpawnEINVAL(err)) {
      return await runLobsterSubprocessOnce(params, true);
    }
    throw err;
  }
}

/**
 * Parses lobster's stdout as a JSON envelope.
 *
 * @throws Error when stdout is not JSON, is not an object, or lacks a boolean `ok`.
 */
function parseEnvelope(stdout: string): LobsterEnvelope {
  let parsed: unknown;
  try {
    parsed = JSON.parse(stdout);
  } catch {
    throw new Error("lobster returned invalid JSON");
  }

  if (!parsed || typeof parsed !== "object") {
    throw new Error("lobster returned invalid JSON envelope");
  }

  const ok = (parsed as { ok?: unknown }).ok;
  if (ok === true || ok === false) {
    // Only the discriminant is checked; the rest of the shape is trusted.
    return parsed as LobsterEnvelope;
  }

  throw new Error("lobster returned invalid JSON envelope");
}

/**
 * Builds the lobster command line for a tool action.
 *
 * @throws Error when a required parameter for the action is missing or the action is unknown.
 */
function buildLobsterArgv(action: string, params: Record<string, unknown>): string[] {
  if (action === "run") {
    const pipeline = typeof params.pipeline === "string" ? params.pipeline : "";
    if (!pipeline.trim()) throw new Error("pipeline required");

    const argv = ["run", "--mode", "tool", pipeline];
    const argsJson = typeof params.argsJson === "string" ? params.argsJson : "";
    if (argsJson.trim()) {
      argv.push("--args-json", argsJson);
    }
    return argv;
  }

  if (action === "resume") {
    const token = typeof params.token === "string" ? params.token : "";
    if (!token.trim()) throw new Error("token required");
    const approve = params.approve;
    if (typeof approve !== "boolean") throw new Error("approve required");
    return ["resume", "--token", token, "--approve", approve ? "yes" : "no"];
  }

  throw new Error(`Unknown action: ${action}`);
}

/**
 * Creates the `lobster` tool definition: a parameter schema plus an `execute`
 * function that spawns lobster and returns its parsed JSON envelope.
 *
 * @param api Plugin API; only `runtime.version` and `logger.debug` are read here.
 */
export function createLobsterTool(api: ClawdbotPluginApi) {
  return {
    name: "lobster",
    description:
      "Run Lobster pipelines as a local-first workflow runtime (typed JSON envelope + resumable approvals).",
    parameters: Type.Object({
      // NOTE: Prefer string enums in tool schemas; some providers reject unions/anyOf.
      action: Type.Unsafe<"run" | "resume">({ type: "string", enum: ["run", "resume"] }),
      pipeline: Type.Optional(Type.String()),
      argsJson: Type.Optional(Type.String()),
      token: Type.Optional(Type.String()),
      approve: Type.Optional(Type.Boolean()),
      lobsterPath: Type.Optional(Type.String()),
      cwd: Type.Optional(Type.String()),
      timeoutMs: Type.Optional(Type.Number()),
      maxStdoutBytes: Type.Optional(Type.Number()),
    }),
    async execute(_id: string, params: Record<string, unknown>) {
      const action = String(params.action || "").trim();
      if (!action) throw new Error("action required");

      const execPath = resolveExecutablePath(
        typeof params.lobsterPath === "string" ? params.lobsterPath : undefined,
      );
      const cwd =
        typeof params.cwd === "string" && params.cwd.trim() ? params.cwd.trim() : process.cwd();
      // NaN/Infinity would defeat the timeout and the stdout cap, so fall back to defaults.
      const timeoutMs = finiteNumberOr(params.timeoutMs, DEFAULT_TIMEOUT_MS);
      const maxStdoutBytes = finiteNumberOr(params.maxStdoutBytes, DEFAULT_MAX_STDOUT_BYTES);

      const argv = buildLobsterArgv(action, params);

      if (api.runtime?.version && api.logger?.debug) {
        api.logger.debug(`lobster plugin runtime=${api.runtime.version}`);
      }

      const { stdout } = await runLobsterSubprocess({
        execPath,
        argv,
        cwd,
        timeoutMs,
        maxStdoutBytes,
      });

      const envelope = parseEnvelope(stdout);

      return {
        content: [{ type: "text", text: JSON.stringify(envelope, null, 2) }],
        details: envelope,
      };
    },
  };
}