diff --git a/docs/tools/lobster.md b/docs/tools/lobster.md new file mode 100644 index 000000000..f54fc3da2 --- /dev/null +++ b/docs/tools/lobster.md @@ -0,0 +1,64 @@ +--- +title: Lobster +description: Run Lobster pipelines (typed workflows) as a first-class Clawdbot tool. +--- + +# Lobster + +The `lobster` tool lets Clawdbot run Lobster pipelines as a **local-first, typed workflow runtime**. + +This is designed for: +- Deterministic orchestration (move multi-step tool workflows out of the LLM) +- Human-in-the-loop approvals that **halt and resume** +- Lower token usage (one `lobster.run` call instead of many tool calls) + +## Security model + +- Lobster runs as a **local subprocess**. +- Lobster does **not** manage OAuth or secrets. +- Side effects still go through Clawdbot tools (messaging, files, etc.). + +Recommendations: +- Prefer configuring `lobsterPath` as an **absolute path** to avoid PATH hijack. +- Use Lobster approvals (`approve`) for any side-effectful step. + +## Actions + +### `run` + +Run a pipeline in tool mode. + +Example: + +```json +{ + "action": "run", + "pipeline": "exec --json \"echo [1]\" | approve --prompt 'ok?'", + "lobsterPath": "/absolute/path/to/lobster", + "timeoutMs": 20000 +} +``` + +### `resume` + +Resume a halted pipeline. + +Example: + +```json +{ + "action": "resume", + "token": "", + "approve": true, + "lobsterPath": "/absolute/path/to/lobster" +} +``` + +## Output + +Lobster returns a JSON envelope: + +- `ok`: boolean +- `status`: `ok` | `needs_approval` | `cancelled` +- `output`: array of items +- `requiresApproval`: approval request object (when `status=needs_approval`) diff --git a/src/agents/clawdbot-tools.ts b/src/agents/clawdbot-tools.ts index 5ae4891dc..c95636fb3 100644 --- a/src/agents/clawdbot-tools.ts +++ b/src/agents/clawdbot-tools.ts @@ -9,6 +9,7 @@ import type { AnyAgentTool } from "./tools/common.js"; import { createCronTool } from "./tools/cron-tool.js"; import { createGatewayTool } from "./tools/gateway-tool.js"; import { createImageTool } from "./tools/image-tool.js"; +import { createLobsterTool } from "./tools/lobster-tool.js"; import { createMessageTool } from "./tools/message-tool.js"; import { createNodesTool } from "./tools/nodes-tool.js"; import { createSessionStatusTool } from "./tools/session-status-tool.js"; @@ -111,6 +112,7 @@ export function createClawdbotTools(options?: { agentSessionKey: options?.agentSessionKey, config: options?.config, }), + createLobsterTool({ sandboxed: options?.sandboxed }), ...(webSearchTool ? [webSearchTool] : []), ...(webFetchTool ? [webFetchTool] : []), ...(imageTool ? [imageTool] : []), diff --git a/src/agents/tools/lobster-tool.test.ts b/src/agents/tools/lobster-tool.test.ts new file mode 100644 index 000000000..26441f2d6 --- /dev/null +++ b/src/agents/tools/lobster-tool.test.ts @@ -0,0 +1,122 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { describe, expect, it } from "vitest"; + +import { createLobsterTool } from "./lobster-tool.js"; + +async function writeFakeLobster(params: { + script: (args: string[]) => unknown; +}) { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-")); + const binPath = path.join(dir, "lobster"); + + const file = `#!/usr/bin/env node\n` + + `const args = process.argv.slice(2);\n` + + `const payload = (${params.script.toString()})(args);\n` + + `process.stdout.write(JSON.stringify(payload));\n`; + + await fs.writeFile(binPath, file, { encoding: "utf8", mode: 0o755 }); + return { dir, binPath }; +} + +describe("lobster tool", () => { + it("runs lobster in tool mode and returns envelope", async () => { + const fake = await writeFakeLobster({ + script: (args) => { + if (args[0] !== "run") throw new Error("expected run"); + return { + ok: true, + status: "ok", + output: [{ hello: "world" }], + requiresApproval: null, + }; + }, + }); + + const tool = createLobsterTool(); + const res = await tool.execute("call1", { + action: "run", + pipeline: "exec --json \"echo [1]\"", + lobsterPath: fake.binPath, + timeoutMs: 1000, + }); + + expect(res.details).toMatchObject({ + ok: true, + status: "ok", + output: [{ hello: "world" }], + requiresApproval: null, + }); + }); + + it("supports resume action", async () => { + const fake = await writeFakeLobster({ + script: (args) => { + if (args[0] !== "resume") throw new Error("expected resume"); + return { + ok: true, + status: "ok", + output: ["resumed"], + requiresApproval: null, + }; + }, + }); + + const tool = createLobsterTool(); + const res = await tool.execute("call2", { + action: "resume", + token: "tok", + approve: true, + lobsterPath: fake.binPath, + timeoutMs: 1000, + }); + + expect(res.details).toMatchObject({ ok: true, status: "ok" }); + }); + + it("rejects non-absolute lobsterPath", async () => { + const tool = createLobsterTool(); + await expect( + tool.execute("call3", { + action: "run", + pipeline: "json", + lobsterPath: "./lobster", + }), + ).rejects.toThrow(/absolute path/); + }); + + it("blocks tool in sandboxed mode", async () => { + const tool = createLobsterTool({ sandboxed: true }); + await expect( + tool.execute("call4", { + action: "run", + pipeline: "json", + lobsterPath: "/usr/bin/true", + }), + ).rejects.toThrow(/not available in sandboxed/); + }); + + it("rejects invalid JSON", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-bad-")); + const binPath = path.join(dir, "lobster"); + await fs.writeFile( + binPath, + `#!/usr/bin/env node\nprocess.stdout.write('not-json');\n`, + { + encoding: "utf8", + mode: 0o755, + }, + ); + + const tool = createLobsterTool(); + await expect( + tool.execute("call5", { + action: "run", + pipeline: "json", + lobsterPath: binPath, + }), + ).rejects.toThrow(/invalid JSON/); + }); +}); diff --git a/src/agents/tools/lobster-tool.ts b/src/agents/tools/lobster-tool.ts new file mode 100644 index 000000000..c01cab81a --- /dev/null +++ b/src/agents/tools/lobster-tool.ts @@ -0,0 +1,231 @@ +import { Type } from "@sinclair/typebox"; +import { spawn } from "node:child_process"; +import path from "node:path"; + +import { stringEnum } from "../schema/typebox.js"; +import type { AnyAgentTool } from "./common.js"; +import { jsonResult, readNumberParam, readStringParam } from "./common.js"; + +const LobsterActions = ["run", "resume"] as const; + +type LobsterToolParams = { + action: (typeof LobsterActions)[number]; + pipeline?: string; + token?: string; + approve?: boolean; + lobsterPath?: string; + cwd?: string; + timeoutMs?: number; + maxStdoutBytes?: number; +}; + +type LobsterEnvelope = + | { + ok: true; + status: "ok" | "needs_approval" | "cancelled"; + output: unknown[]; + requiresApproval: null | { + type: "approval_request"; + prompt: string; + items: unknown[]; + resumeToken?: string; + }; + } + | { + ok: false; + error: { type?: string; message: string }; + }; + +function buildSchema() { + return Type.Object({ + action: stringEnum(LobsterActions), + pipeline: Type.Optional(Type.String({ description: "Lobster pipeline string." })), + token: Type.Optional(Type.String({ description: "Resume token from lobster tool mode." })), + approve: Type.Optional(Type.Boolean({ description: "Approval decision for resume." })), + lobsterPath: Type.Optional( + Type.String({ + description: + "Path to lobster executable. Prefer an absolute path to avoid PATH hijack. Defaults to 'lobster'.", + }), + ), + cwd: Type.Optional( + Type.String({ + description: "Working directory for lobster subprocess.", + }), + ), + timeoutMs: Type.Optional( + Type.Number({ + description: "Subprocess timeout (ms).", + }), + ), + maxStdoutBytes: Type.Optional( + Type.Number({ + description: "Max stdout bytes to read before aborting.", + }), + ), + }); +} + +function resolveExecutablePath(lobsterPathRaw: string | undefined) { + const lobsterPath = lobsterPathRaw?.trim() || "lobster"; + if (lobsterPath !== "lobster" && !path.isAbsolute(lobsterPath)) { + throw new Error("lobsterPath must be an absolute path (or omit to use PATH)"); + } + return lobsterPath; +} + +async function runLobsterSubprocess(params: { + execPath: string; + argv: string[]; + cwd: string; + timeoutMs: number; + maxStdoutBytes: number; +}) { + const { execPath, argv, cwd } = params; + const timeoutMs = Math.max(200, params.timeoutMs); + const maxStdoutBytes = Math.max(1024, params.maxStdoutBytes); + + return await new Promise<{ stdout: string; exitCode: number | null }>((resolve, reject) => { + const child = spawn(execPath, argv, { + cwd, + stdio: ["ignore", "pipe", "pipe"], + env: { + ...process.env, + // Ensure lobster never tries to be interactive. + LOBSTER_MODE: "tool", + }, + }); + + let stdout = ""; + let stdoutBytes = 0; + let stderr = ""; + + child.stdout?.setEncoding("utf8"); + child.stderr?.setEncoding("utf8"); + + child.stdout?.on("data", (chunk) => { + const str = String(chunk); + stdoutBytes += Buffer.byteLength(str, "utf8"); + if (stdoutBytes > maxStdoutBytes) { + try { + child.kill("SIGKILL"); + } finally { + reject(new Error("lobster output exceeded maxStdoutBytes")); + } + return; + } + stdout += str; + }); + + child.stderr?.on("data", (chunk) => { + stderr += String(chunk); + }); + + const timer = setTimeout(() => { + try { + child.kill("SIGKILL"); + } finally { + reject(new Error("lobster subprocess timed out")); + } + }, timeoutMs); + + child.once("error", (err) => { + clearTimeout(timer); + reject(err); + }); + + child.once("exit", (code) => { + clearTimeout(timer); + if (code !== 0) { + reject(new Error(`lobster failed (${code ?? "?"}): ${stderr.trim() || stdout.trim()}`)); + return; + } + resolve({ stdout, exitCode: code }); + }); + }); +} + +function parseEnvelope(stdout: string): LobsterEnvelope { + let parsed: unknown; + try { + parsed = JSON.parse(stdout); + } catch { + throw new Error("lobster returned invalid JSON"); + } + + if (!parsed || typeof parsed !== "object") { + throw new Error("lobster returned invalid JSON envelope"); + } + + const ok = (parsed as { ok?: unknown }).ok; + if (ok === true) { + const env = parsed as LobsterEnvelope; + if (!Array.isArray((env as any).output)) { + throw new Error("lobster tool output must include output[]"); + } + return env; + } + + if (ok === false) { + const env = parsed as LobsterEnvelope; + const msg = (env as any)?.error?.message; + if (typeof msg !== "string" || !msg.trim()) { + throw new Error("lobster error envelope missing error.message"); + } + return env; + } + + throw new Error("lobster returned invalid JSON envelope"); +} + +export function createLobsterTool(options: { sandboxed?: boolean } = {}): AnyAgentTool { + const parameters = buildSchema(); + + return { + label: "Lobster", + name: "lobster", + description: + "Run Lobster pipelines as a local-first, typed workflow runtime (tool mode JSON envelope, resumable approvals).", + parameters, + async execute(_callId, paramsRaw) { + if (options.sandboxed) { + throw new Error("lobster tool is not available in sandboxed mode"); + } + + const params = paramsRaw as Record; + const action = readStringParam(params, "action", { required: true }) as LobsterToolParams["action"]; + + const execPath = resolveExecutablePath(readStringParam(params, "lobsterPath")); + const cwd = readStringParam(params, "cwd", { allowEmpty: false }) || process.cwd(); + + const timeoutMs = readNumberParam(params, "timeoutMs", { integer: true }) ?? 20_000; + const maxStdoutBytes = readNumberParam(params, "maxStdoutBytes", { integer: true }) ?? 512_000; + + let argv: string[]; + if (action === "run") { + const pipeline = readStringParam(params, "pipeline", { required: true, label: "pipeline" })!; + argv = ["run", "--mode", "tool", pipeline]; + } else if (action === "resume") { + const token = readStringParam(params, "token", { required: true, label: "token" })!; + const approve = params["approve"]; + if (typeof approve !== "boolean") { + throw new Error("approve required"); + } + argv = ["resume", "--token", token, "--approve", approve ? "yes" : "no"]; + } else { + throw new Error(`Unknown action: ${action}`); + } + + const { stdout } = await runLobsterSubprocess({ + execPath, + argv, + cwd, + timeoutMs, + maxStdoutBytes, + }); + + const envelope = parseEnvelope(stdout); + return jsonResult(envelope); + }, + }; +}