diff --git a/docs/tools/lobster.md b/docs/tools/lobster.md deleted file mode 100644 index f54fc3da2..000000000 --- a/docs/tools/lobster.md +++ /dev/null @@ -1,64 +0,0 @@ ---- -title: Lobster -description: Run Lobster pipelines (typed workflows) as a first-class Clawdbot tool. ---- - -# Lobster - -The `lobster` tool lets Clawdbot run Lobster pipelines as a **local-first, typed workflow runtime**. - -This is designed for: -- Deterministic orchestration (move multi-step tool workflows out of the LLM) -- Human-in-the-loop approvals that **halt and resume** -- Lower token usage (one `lobster.run` call instead of many tool calls) - -## Security model - -- Lobster runs as a **local subprocess**. -- Lobster does **not** manage OAuth or secrets. -- Side effects still go through Clawdbot tools (messaging, files, etc.). - -Recommendations: -- Prefer configuring `lobsterPath` as an **absolute path** to avoid PATH hijack. -- Use Lobster approvals (`approve`) for any side-effectful step. - -## Actions - -### `run` - -Run a pipeline in tool mode. - -Example: - -```json -{ - "action": "run", - "pipeline": "exec --json \"echo [1]\" | approve --prompt 'ok?'", - "lobsterPath": "/absolute/path/to/lobster", - "timeoutMs": 20000 -} -``` - -### `resume` - -Resume a halted pipeline. - -Example: - -```json -{ - "action": "resume", - "token": "", - "approve": true, - "lobsterPath": "/absolute/path/to/lobster" -} -``` - -## Output - -Lobster returns a JSON envelope: - -- `ok`: boolean -- `status`: `ok` | `needs_approval` | `cancelled` -- `output`: array of items -- `requiresApproval`: approval request object (when `status=needs_approval`) diff --git a/extensions/lobster/README.md b/extensions/lobster/README.md new file mode 100644 index 000000000..13e675b45 --- /dev/null +++ b/extensions/lobster/README.md @@ -0,0 +1,38 @@ +# Lobster (plugin) + +Adds the `lobster` agent tool as an **optional** plugin tool. + +## What this is + +- Lobster is a standalone workflow shell (typed JSON-first pipelines + approvals/resume). +- This plugin integrates Lobster with Clawdbot *without core changes*. + +## Enable + +Because this tool can trigger side effects (via workflows), it is registered with `optional: true`. + +Enable it in an agent allowlist: + +```json +{ + "agents": { + "list": [ + { + "id": "main", + "tools": { + "allow": [ + "lobster" // plugin id (enables all tools from this plugin) + ] + } + } + ] + } +} +``` + +## Security + +- Runs the `lobster` executable as a local subprocess. +- Does not manage OAuth/tokens. +- Uses timeouts, stdout caps, and strict JSON envelope parsing. +- Prefer an absolute `lobsterPath` in production to avoid PATH hijack. diff --git a/extensions/lobster/index.ts b/extensions/lobster/index.ts new file mode 100644 index 000000000..f1c06c554 --- /dev/null +++ b/extensions/lobster/index.ts @@ -0,0 +1,7 @@ +import type { ClawdbotPluginApi } from "../../src/plugins/types.js"; + +import { createLobsterTool } from "./src/lobster-tool.js"; + +export default function register(api: ClawdbotPluginApi) { + api.registerTool(createLobsterTool(api), { optional: true }); +} diff --git a/extensions/lobster/package.json b/extensions/lobster/package.json new file mode 100644 index 000000000..606975434 --- /dev/null +++ b/extensions/lobster/package.json @@ -0,0 +1,9 @@ +{ + "name": "@clawdbot/lobster", + "version": "2026.1.17-1", + "type": "module", + "description": "Lobster workflow tool plugin (typed pipelines + resumable approvals)", + "clawdbot": { + "extensions": ["./index.ts"] + } +} diff --git a/extensions/lobster/src/lobster-tool.test.ts b/extensions/lobster/src/lobster-tool.test.ts new file mode 100644 index 000000000..3b1dae859 --- /dev/null +++ b/extensions/lobster/src/lobster-tool.test.ts @@ -0,0 +1,87 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { describe, expect, it } from "vitest"; + +import type { ClawdbotPluginApi } from "../../../src/plugins/types.js"; +import { createLobsterTool } from "./lobster-tool.js"; + +async function writeFakeLobster(params: { + payload: unknown; +}) { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-plugin-")); + const binPath = path.join(dir, "lobster"); + + const file = `#!/usr/bin/env node\n` + + `process.stdout.write(JSON.stringify(${JSON.stringify(params.payload)}));\n`; + + await fs.writeFile(binPath, file, { encoding: "utf8", mode: 0o755 }); + return { dir, binPath }; +} + +function fakeApi(): ClawdbotPluginApi { + return { + id: "lobster", + name: "lobster", + source: "test", + config: {} as any, + runtime: { version: "test" } as any, + logger: { info() {}, warn() {}, error() {}, debug() {} }, + registerTool() {}, + registerHttpHandler() {}, + registerChannel() {}, + registerGatewayMethod() {}, + registerCli() {}, + registerService() {}, + registerProvider() {}, + resolvePath: (p) => p, + }; +} + +describe("lobster plugin tool", () => { + it("runs lobster and returns parsed envelope in details", async () => { + const fake = await writeFakeLobster({ + payload: { ok: true, status: "ok", output: [{ hello: "world" }], requiresApproval: null }, + }); + + const tool = createLobsterTool(fakeApi()); + const res = await tool.execute("call1", { + action: "run", + pipeline: "noop", + lobsterPath: fake.binPath, + timeoutMs: 1000, + }); + + expect(res.details).toMatchObject({ ok: true, status: "ok" }); + }); + + it("requires absolute lobsterPath when provided", async () => { + const tool = createLobsterTool(fakeApi()); + await expect( + tool.execute("call2", { + action: "run", + pipeline: "noop", + lobsterPath: "./lobster", + }), + ).rejects.toThrow(/absolute path/); + }); + + it("rejects invalid JSON from lobster", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-plugin-bad-")); + const binPath = path.join(dir, "lobster"); + await fs.writeFile(binPath, `#!/usr/bin/env node\nprocess.stdout.write('nope');\n`, { + encoding: "utf8", + mode: 0o755, + }); + + const tool = createLobsterTool(fakeApi()); + await expect( + tool.execute("call3", { + action: "run", + pipeline: "noop", + lobsterPath: binPath, + }), + ).rejects.toThrow(/invalid JSON/); + }); +}); diff --git a/extensions/lobster/src/lobster-tool.ts b/extensions/lobster/src/lobster-tool.ts new file mode 100644 index 000000000..2d76e0821 --- /dev/null +++ b/extensions/lobster/src/lobster-tool.ts @@ -0,0 +1,185 @@ +import { Type } from "@sinclair/typebox"; +import { spawn } from "node:child_process"; +import path from "node:path"; + +import type { ClawdbotPluginApi } from "../../../src/plugins/types.js"; + +type LobsterEnvelope = + | { + ok: true; + status: "ok" | "needs_approval" | "cancelled"; + output: unknown[]; + requiresApproval: null | { + type: "approval_request"; + prompt: string; + items: unknown[]; + resumeToken?: string; + }; + } + | { + ok: false; + error: { type?: string; message: string }; + }; + +function resolveExecutablePath(lobsterPathRaw: string | undefined) { + const lobsterPath = lobsterPathRaw?.trim() || "lobster"; + if (lobsterPath !== "lobster" && !path.isAbsolute(lobsterPath)) { + throw new Error("lobsterPath must be an absolute path (or omit to use PATH)"); + } + return lobsterPath; +} + +async function runLobsterSubprocess(params: { + execPath: string; + argv: string[]; + cwd: string; + timeoutMs: number; + maxStdoutBytes: number; +}) { + const { execPath, argv, cwd } = params; + const timeoutMs = Math.max(200, params.timeoutMs); + const maxStdoutBytes = Math.max(1024, params.maxStdoutBytes); + + return await new Promise<{ stdout: string }>((resolve, reject) => { + const child = spawn(execPath, argv, { + cwd, + stdio: ["ignore", "pipe", "pipe"], + env: { + ...process.env, + LOBSTER_MODE: "tool", + }, + }); + + let stdout = ""; + let stdoutBytes = 0; + let stderr = ""; + + child.stdout?.setEncoding("utf8"); + child.stderr?.setEncoding("utf8"); + + child.stdout?.on("data", (chunk) => { + const str = String(chunk); + stdoutBytes += Buffer.byteLength(str, "utf8"); + if (stdoutBytes > maxStdoutBytes) { + try { + child.kill("SIGKILL"); + } finally { + reject(new Error("lobster output exceeded maxStdoutBytes")); + } + return; + } + stdout += str; + }); + + child.stderr?.on("data", (chunk) => { + stderr += String(chunk); + }); + + const timer = setTimeout(() => { + try { + child.kill("SIGKILL"); + } finally { + reject(new Error("lobster subprocess timed out")); + } + }, timeoutMs); + + child.once("error", (err) => { + clearTimeout(timer); + reject(err); + }); + + child.once("exit", (code) => { + clearTimeout(timer); + if (code !== 0) { + reject(new Error(`lobster failed (${code ?? "?"}): ${stderr.trim() || stdout.trim()}`)); + return; + } + resolve({ stdout }); + }); + }); +} + +function parseEnvelope(stdout: string): LobsterEnvelope { + let parsed: unknown; + try { + parsed = JSON.parse(stdout); + } catch { + throw new Error("lobster returned invalid JSON"); + } + + if (!parsed || typeof parsed !== "object") { + throw new Error("lobster returned invalid JSON envelope"); + } + + const ok = (parsed as { ok?: unknown }).ok; + if (ok === true || ok === false) { + return parsed as LobsterEnvelope; + } + + throw new Error("lobster returned invalid JSON envelope"); +} + +export function createLobsterTool(api: ClawdbotPluginApi) { + return { + name: "lobster", + description: + "Run Lobster pipelines as a local-first workflow runtime (typed JSON envelope + resumable approvals).", + parameters: Type.Object({ + // NOTE: Prefer string enums in tool schemas; some providers reject unions/anyOf. + action: Type.Unsafe<"run" | "resume">({ type: "string", enum: ["run", "resume"] }), + pipeline: Type.Optional(Type.String()), + token: Type.Optional(Type.String()), + approve: Type.Optional(Type.Boolean()), + lobsterPath: Type.Optional(Type.String()), + cwd: Type.Optional(Type.String()), + timeoutMs: Type.Optional(Type.Number()), + maxStdoutBytes: Type.Optional(Type.Number()), + }), + async execute(_id: string, params: Record) { + const action = String(params.action || "").trim(); + if (!action) throw new Error("action required"); + + const execPath = resolveExecutablePath( + typeof params.lobsterPath === "string" ? params.lobsterPath : undefined, + ); + const cwd = typeof params.cwd === "string" && params.cwd.trim() ? params.cwd.trim() : process.cwd(); + const timeoutMs = typeof params.timeoutMs === "number" ? params.timeoutMs : 20_000; + const maxStdoutBytes = typeof params.maxStdoutBytes === "number" ? params.maxStdoutBytes : 512_000; + + const argv = (() => { + if (action === "run") { + const pipeline = typeof params.pipeline === "string" ? params.pipeline : ""; + if (!pipeline.trim()) throw new Error("pipeline required"); + return ["run", "--mode", "tool", pipeline]; + } + if (action === "resume") { + const token = typeof params.token === "string" ? params.token : ""; + if (!token.trim()) throw new Error("token required"); + const approve = params.approve; + if (typeof approve !== "boolean") throw new Error("approve required"); + return ["resume", "--token", token, "--approve", approve ? "yes" : "no"]; + } + throw new Error(`Unknown action: ${action}`); + })(); + + if (api.runtime?.version && api.logger?.debug) { + api.logger.debug(`lobster plugin runtime=${api.runtime.version}`); + } + + const { stdout } = await runLobsterSubprocess({ + execPath, + argv, + cwd, + timeoutMs, + maxStdoutBytes, + }); + + const envelope = parseEnvelope(stdout); + + return { + content: [{ type: "text", text: JSON.stringify(envelope, null, 2) }], + details: envelope, + }; + }, + }; +} diff --git a/src/agents/clawdbot-tools.ts b/src/agents/clawdbot-tools.ts index c95636fb3..5ae4891dc 100644 --- a/src/agents/clawdbot-tools.ts +++ b/src/agents/clawdbot-tools.ts @@ -9,7 +9,6 @@ import type { AnyAgentTool } from "./tools/common.js"; import { createCronTool } from "./tools/cron-tool.js"; import { createGatewayTool } from "./tools/gateway-tool.js"; import { createImageTool } from "./tools/image-tool.js"; -import { createLobsterTool } from "./tools/lobster-tool.js"; import { createMessageTool } from "./tools/message-tool.js"; import { createNodesTool } from "./tools/nodes-tool.js"; import { createSessionStatusTool } from "./tools/session-status-tool.js"; @@ -112,7 +111,6 @@ export function createClawdbotTools(options?: { agentSessionKey: options?.agentSessionKey, config: options?.config, }), - createLobsterTool({ sandboxed: options?.sandboxed }), ...(webSearchTool ? [webSearchTool] : []), ...(webFetchTool ? [webFetchTool] : []), ...(imageTool ? [imageTool] : []), diff --git a/src/agents/tools/lobster-tool.test.ts b/src/agents/tools/lobster-tool.test.ts deleted file mode 100644 index 26441f2d6..000000000 --- a/src/agents/tools/lobster-tool.test.ts +++ /dev/null @@ -1,122 +0,0 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; - -import { describe, expect, it } from "vitest"; - -import { createLobsterTool } from "./lobster-tool.js"; - -async function writeFakeLobster(params: { - script: (args: string[]) => unknown; -}) { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-")); - const binPath = path.join(dir, "lobster"); - - const file = `#!/usr/bin/env node\n` + - `const args = process.argv.slice(2);\n` + - `const payload = (${params.script.toString()})(args);\n` + - `process.stdout.write(JSON.stringify(payload));\n`; - - await fs.writeFile(binPath, file, { encoding: "utf8", mode: 0o755 }); - return { dir, binPath }; -} - -describe("lobster tool", () => { - it("runs lobster in tool mode and returns envelope", async () => { - const fake = await writeFakeLobster({ - script: (args) => { - if (args[0] !== "run") throw new Error("expected run"); - return { - ok: true, - status: "ok", - output: [{ hello: "world" }], - requiresApproval: null, - }; - }, - }); - - const tool = createLobsterTool(); - const res = await tool.execute("call1", { - action: "run", - pipeline: "exec --json \"echo [1]\"", - lobsterPath: fake.binPath, - timeoutMs: 1000, - }); - - expect(res.details).toMatchObject({ - ok: true, - status: "ok", - output: [{ hello: "world" }], - requiresApproval: null, - }); - }); - - it("supports resume action", async () => { - const fake = await writeFakeLobster({ - script: (args) => { - if (args[0] !== "resume") throw new Error("expected resume"); - return { - ok: true, - status: "ok", - output: ["resumed"], - requiresApproval: null, - }; - }, - }); - - const tool = createLobsterTool(); - const res = await tool.execute("call2", { - action: "resume", - token: "tok", - approve: true, - lobsterPath: fake.binPath, - timeoutMs: 1000, - }); - - expect(res.details).toMatchObject({ ok: true, status: "ok" }); - }); - - it("rejects non-absolute lobsterPath", async () => { - const tool = createLobsterTool(); - await expect( - tool.execute("call3", { - action: "run", - pipeline: "json", - lobsterPath: "./lobster", - }), - ).rejects.toThrow(/absolute path/); - }); - - it("blocks tool in sandboxed mode", async () => { - const tool = createLobsterTool({ sandboxed: true }); - await expect( - tool.execute("call4", { - action: "run", - pipeline: "json", - lobsterPath: "/usr/bin/true", - }), - ).rejects.toThrow(/not available in sandboxed/); - }); - - it("rejects invalid JSON", async () => { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-bad-")); - const binPath = path.join(dir, "lobster"); - await fs.writeFile( - binPath, - `#!/usr/bin/env node\nprocess.stdout.write('not-json');\n`, - { - encoding: "utf8", - mode: 0o755, - }, - ); - - const tool = createLobsterTool(); - await expect( - tool.execute("call5", { - action: "run", - pipeline: "json", - lobsterPath: binPath, - }), - ).rejects.toThrow(/invalid JSON/); - }); -}); diff --git a/src/agents/tools/lobster-tool.ts b/src/agents/tools/lobster-tool.ts deleted file mode 100644 index c01cab81a..000000000 --- a/src/agents/tools/lobster-tool.ts +++ /dev/null @@ -1,231 +0,0 @@ -import { Type } from "@sinclair/typebox"; -import { spawn } from "node:child_process"; -import path from "node:path"; - -import { stringEnum } from "../schema/typebox.js"; -import type { AnyAgentTool } from "./common.js"; -import { jsonResult, readNumberParam, readStringParam } from "./common.js"; - -const LobsterActions = ["run", "resume"] as const; - -type LobsterToolParams = { - action: (typeof LobsterActions)[number]; - pipeline?: string; - token?: string; - approve?: boolean; - lobsterPath?: string; - cwd?: string; - timeoutMs?: number; - maxStdoutBytes?: number; -}; - -type LobsterEnvelope = - | { - ok: true; - status: "ok" | "needs_approval" | "cancelled"; - output: unknown[]; - requiresApproval: null | { - type: "approval_request"; - prompt: string; - items: unknown[]; - resumeToken?: string; - }; - } - | { - ok: false; - error: { type?: string; message: string }; - }; - -function buildSchema() { - return Type.Object({ - action: stringEnum(LobsterActions), - pipeline: Type.Optional(Type.String({ description: "Lobster pipeline string." })), - token: Type.Optional(Type.String({ description: "Resume token from lobster tool mode." })), - approve: Type.Optional(Type.Boolean({ description: "Approval decision for resume." })), - lobsterPath: Type.Optional( - Type.String({ - description: - "Path to lobster executable. Prefer an absolute path to avoid PATH hijack. Defaults to 'lobster'.", - }), - ), - cwd: Type.Optional( - Type.String({ - description: "Working directory for lobster subprocess.", - }), - ), - timeoutMs: Type.Optional( - Type.Number({ - description: "Subprocess timeout (ms).", - }), - ), - maxStdoutBytes: Type.Optional( - Type.Number({ - description: "Max stdout bytes to read before aborting.", - }), - ), - }); -} - -function resolveExecutablePath(lobsterPathRaw: string | undefined) { - const lobsterPath = lobsterPathRaw?.trim() || "lobster"; - if (lobsterPath !== "lobster" && !path.isAbsolute(lobsterPath)) { - throw new Error("lobsterPath must be an absolute path (or omit to use PATH)"); - } - return lobsterPath; -} - -async function runLobsterSubprocess(params: { - execPath: string; - argv: string[]; - cwd: string; - timeoutMs: number; - maxStdoutBytes: number; -}) { - const { execPath, argv, cwd } = params; - const timeoutMs = Math.max(200, params.timeoutMs); - const maxStdoutBytes = Math.max(1024, params.maxStdoutBytes); - - return await new Promise<{ stdout: string; exitCode: number | null }>((resolve, reject) => { - const child = spawn(execPath, argv, { - cwd, - stdio: ["ignore", "pipe", "pipe"], - env: { - ...process.env, - // Ensure lobster never tries to be interactive. - LOBSTER_MODE: "tool", - }, - }); - - let stdout = ""; - let stdoutBytes = 0; - let stderr = ""; - - child.stdout?.setEncoding("utf8"); - child.stderr?.setEncoding("utf8"); - - child.stdout?.on("data", (chunk) => { - const str = String(chunk); - stdoutBytes += Buffer.byteLength(str, "utf8"); - if (stdoutBytes > maxStdoutBytes) { - try { - child.kill("SIGKILL"); - } finally { - reject(new Error("lobster output exceeded maxStdoutBytes")); - } - return; - } - stdout += str; - }); - - child.stderr?.on("data", (chunk) => { - stderr += String(chunk); - }); - - const timer = setTimeout(() => { - try { - child.kill("SIGKILL"); - } finally { - reject(new Error("lobster subprocess timed out")); - } - }, timeoutMs); - - child.once("error", (err) => { - clearTimeout(timer); - reject(err); - }); - - child.once("exit", (code) => { - clearTimeout(timer); - if (code !== 0) { - reject(new Error(`lobster failed (${code ?? "?"}): ${stderr.trim() || stdout.trim()}`)); - return; - } - resolve({ stdout, exitCode: code }); - }); - }); -} - -function parseEnvelope(stdout: string): LobsterEnvelope { - let parsed: unknown; - try { - parsed = JSON.parse(stdout); - } catch { - throw new Error("lobster returned invalid JSON"); - } - - if (!parsed || typeof parsed !== "object") { - throw new Error("lobster returned invalid JSON envelope"); - } - - const ok = (parsed as { ok?: unknown }).ok; - if (ok === true) { - const env = parsed as LobsterEnvelope; - if (!Array.isArray((env as any).output)) { - throw new Error("lobster tool output must include output[]"); - } - return env; - } - - if (ok === false) { - const env = parsed as LobsterEnvelope; - const msg = (env as any)?.error?.message; - if (typeof msg !== "string" || !msg.trim()) { - throw new Error("lobster error envelope missing error.message"); - } - return env; - } - - throw new Error("lobster returned invalid JSON envelope"); -} - -export function createLobsterTool(options: { sandboxed?: boolean } = {}): AnyAgentTool { - const parameters = buildSchema(); - - return { - label: "Lobster", - name: "lobster", - description: - "Run Lobster pipelines as a local-first, typed workflow runtime (tool mode JSON envelope, resumable approvals).", - parameters, - async execute(_callId, paramsRaw) { - if (options.sandboxed) { - throw new Error("lobster tool is not available in sandboxed mode"); - } - - const params = paramsRaw as Record; - const action = readStringParam(params, "action", { required: true }) as LobsterToolParams["action"]; - - const execPath = resolveExecutablePath(readStringParam(params, "lobsterPath")); - const cwd = readStringParam(params, "cwd", { allowEmpty: false }) || process.cwd(); - - const timeoutMs = readNumberParam(params, "timeoutMs", { integer: true }) ?? 20_000; - const maxStdoutBytes = readNumberParam(params, "maxStdoutBytes", { integer: true }) ?? 512_000; - - let argv: string[]; - if (action === "run") { - const pipeline = readStringParam(params, "pipeline", { required: true, label: "pipeline" })!; - argv = ["run", "--mode", "tool", pipeline]; - } else if (action === "resume") { - const token = readStringParam(params, "token", { required: true, label: "token" })!; - const approve = params["approve"]; - if (typeof approve !== "boolean") { - throw new Error("approve required"); - } - argv = ["resume", "--token", token, "--approve", approve ? "yes" : "no"]; - } else { - throw new Error(`Unknown action: ${action}`); - } - - const { stdout } = await runLobsterSubprocess({ - execPath, - argv, - cwd, - timeoutMs, - maxStdoutBytes, - }); - - const envelope = parseEnvelope(stdout); - return jsonResult(envelope); - }, - }; -}