From 147fccd967e14c9a507ad57234a4bcd23a9ba29c Mon Sep 17 00:00:00 2001 From: Vignesh Natarajan Date: Sat, 17 Jan 2026 19:00:58 -0800 Subject: [PATCH 1/5] Add lobster tool for running local Lobster pipelines --- docs/tools/lobster.md | 64 +++++++ src/agents/clawdbot-tools.ts | 2 + src/agents/tools/lobster-tool.test.ts | 122 ++++++++++++++ src/agents/tools/lobster-tool.ts | 231 ++++++++++++++++++++++++++ 4 files changed, 419 insertions(+) create mode 100644 docs/tools/lobster.md create mode 100644 src/agents/tools/lobster-tool.test.ts create mode 100644 src/agents/tools/lobster-tool.ts diff --git a/docs/tools/lobster.md b/docs/tools/lobster.md new file mode 100644 index 000000000..f54fc3da2 --- /dev/null +++ b/docs/tools/lobster.md @@ -0,0 +1,64 @@ +--- +title: Lobster +description: Run Lobster pipelines (typed workflows) as a first-class Clawdbot tool. +--- + +# Lobster + +The `lobster` tool lets Clawdbot run Lobster pipelines as a **local-first, typed workflow runtime**. + +This is designed for: +- Deterministic orchestration (move multi-step tool workflows out of the LLM) +- Human-in-the-loop approvals that **halt and resume** +- Lower token usage (one `lobster.run` call instead of many tool calls) + +## Security model + +- Lobster runs as a **local subprocess**. +- Lobster does **not** manage OAuth or secrets. +- Side effects still go through Clawdbot tools (messaging, files, etc.). + +Recommendations: +- Prefer configuring `lobsterPath` as an **absolute path** to avoid PATH hijack. +- Use Lobster approvals (`approve`) for any side-effectful step. + +## Actions + +### `run` + +Run a pipeline in tool mode. + +Example: + +```json +{ + "action": "run", + "pipeline": "exec --json \"echo [1]\" | approve --prompt 'ok?'", + "lobsterPath": "/absolute/path/to/lobster", + "timeoutMs": 20000 +} +``` + +### `resume` + +Resume a halted pipeline. + +Example: + +```json +{ + "action": "resume", + "token": "", + "approve": true, + "lobsterPath": "/absolute/path/to/lobster" +} +``` + +## Output + +Lobster returns a JSON envelope: + +- `ok`: boolean +- `status`: `ok` | `needs_approval` | `cancelled` +- `output`: array of items +- `requiresApproval`: approval request object (when `status=needs_approval`) diff --git a/src/agents/clawdbot-tools.ts b/src/agents/clawdbot-tools.ts index 5ae4891dc..c95636fb3 100644 --- a/src/agents/clawdbot-tools.ts +++ b/src/agents/clawdbot-tools.ts @@ -9,6 +9,7 @@ import type { AnyAgentTool } from "./tools/common.js"; import { createCronTool } from "./tools/cron-tool.js"; import { createGatewayTool } from "./tools/gateway-tool.js"; import { createImageTool } from "./tools/image-tool.js"; +import { createLobsterTool } from "./tools/lobster-tool.js"; import { createMessageTool } from "./tools/message-tool.js"; import { createNodesTool } from "./tools/nodes-tool.js"; import { createSessionStatusTool } from "./tools/session-status-tool.js"; @@ -111,6 +112,7 @@ export function createClawdbotTools(options?: { agentSessionKey: options?.agentSessionKey, config: options?.config, }), + createLobsterTool({ sandboxed: options?.sandboxed }), ...(webSearchTool ? [webSearchTool] : []), ...(webFetchTool ? [webFetchTool] : []), ...(imageTool ? [imageTool] : []), diff --git a/src/agents/tools/lobster-tool.test.ts b/src/agents/tools/lobster-tool.test.ts new file mode 100644 index 000000000..26441f2d6 --- /dev/null +++ b/src/agents/tools/lobster-tool.test.ts @@ -0,0 +1,122 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { describe, expect, it } from "vitest"; + +import { createLobsterTool } from "./lobster-tool.js"; + +async function writeFakeLobster(params: { + script: (args: string[]) => unknown; +}) { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-")); + const binPath = path.join(dir, "lobster"); + + const file = `#!/usr/bin/env node\n` + + `const args = process.argv.slice(2);\n` + + `const payload = (${params.script.toString()})(args);\n` + + `process.stdout.write(JSON.stringify(payload));\n`; + + await fs.writeFile(binPath, file, { encoding: "utf8", mode: 0o755 }); + return { dir, binPath }; +} + +describe("lobster tool", () => { + it("runs lobster in tool mode and returns envelope", async () => { + const fake = await writeFakeLobster({ + script: (args) => { + if (args[0] !== "run") throw new Error("expected run"); + return { + ok: true, + status: "ok", + output: [{ hello: "world" }], + requiresApproval: null, + }; + }, + }); + + const tool = createLobsterTool(); + const res = await tool.execute("call1", { + action: "run", + pipeline: "exec --json \"echo [1]\"", + lobsterPath: fake.binPath, + timeoutMs: 1000, + }); + + expect(res.details).toMatchObject({ + ok: true, + status: "ok", + output: [{ hello: "world" }], + requiresApproval: null, + }); + }); + + it("supports resume action", async () => { + const fake = await writeFakeLobster({ + script: (args) => { + if (args[0] !== "resume") throw new Error("expected resume"); + return { + ok: true, + status: "ok", + output: ["resumed"], + requiresApproval: null, + }; + }, + }); + + const tool = createLobsterTool(); + const res = await tool.execute("call2", { + action: "resume", + token: "tok", + approve: true, + lobsterPath: fake.binPath, + timeoutMs: 1000, + }); + + expect(res.details).toMatchObject({ ok: true, status: "ok" }); + }); + + it("rejects non-absolute lobsterPath", async () => { + const tool = createLobsterTool(); + await expect( + tool.execute("call3", { + action: "run", + pipeline: "json", + lobsterPath: "./lobster", + }), + ).rejects.toThrow(/absolute path/); + }); + + it("blocks tool in sandboxed mode", async () => { + const tool = createLobsterTool({ sandboxed: true }); + await expect( + tool.execute("call4", { + action: "run", + pipeline: "json", + lobsterPath: "/usr/bin/true", + }), + ).rejects.toThrow(/not available in sandboxed/); + }); + + it("rejects invalid JSON", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-bad-")); + const binPath = path.join(dir, "lobster"); + await fs.writeFile( + binPath, + `#!/usr/bin/env node\nprocess.stdout.write('not-json');\n`, + { + encoding: "utf8", + mode: 0o755, + }, + ); + + const tool = createLobsterTool(); + await expect( + tool.execute("call5", { + action: "run", + pipeline: "json", + lobsterPath: binPath, + }), + ).rejects.toThrow(/invalid JSON/); + }); +}); diff --git a/src/agents/tools/lobster-tool.ts b/src/agents/tools/lobster-tool.ts new file mode 100644 index 000000000..c01cab81a --- /dev/null +++ b/src/agents/tools/lobster-tool.ts @@ -0,0 +1,231 @@ +import { Type } from "@sinclair/typebox"; +import { spawn } from "node:child_process"; +import path from "node:path"; + +import { stringEnum } from "../schema/typebox.js"; +import type { AnyAgentTool } from "./common.js"; +import { jsonResult, readNumberParam, readStringParam } from "./common.js"; + +const LobsterActions = ["run", "resume"] as const; + +type LobsterToolParams = { + action: (typeof LobsterActions)[number]; + pipeline?: string; + token?: string; + approve?: boolean; + lobsterPath?: string; + cwd?: string; + timeoutMs?: number; + maxStdoutBytes?: number; +}; + +type LobsterEnvelope = + | { + ok: true; + status: "ok" | "needs_approval" | "cancelled"; + output: unknown[]; + requiresApproval: null | { + type: "approval_request"; + prompt: string; + items: unknown[]; + resumeToken?: string; + }; + } + | { + ok: false; + error: { type?: string; message: string }; + }; + +function buildSchema() { + return Type.Object({ + action: stringEnum(LobsterActions), + pipeline: Type.Optional(Type.String({ description: "Lobster pipeline string." })), + token: Type.Optional(Type.String({ description: "Resume token from lobster tool mode." })), + approve: Type.Optional(Type.Boolean({ description: "Approval decision for resume." })), + lobsterPath: Type.Optional( + Type.String({ + description: + "Path to lobster executable. Prefer an absolute path to avoid PATH hijack. Defaults to 'lobster'.", + }), + ), + cwd: Type.Optional( + Type.String({ + description: "Working directory for lobster subprocess.", + }), + ), + timeoutMs: Type.Optional( + Type.Number({ + description: "Subprocess timeout (ms).", + }), + ), + maxStdoutBytes: Type.Optional( + Type.Number({ + description: "Max stdout bytes to read before aborting.", + }), + ), + }); +} + +function resolveExecutablePath(lobsterPathRaw: string | undefined) { + const lobsterPath = lobsterPathRaw?.trim() || "lobster"; + if (lobsterPath !== "lobster" && !path.isAbsolute(lobsterPath)) { + throw new Error("lobsterPath must be an absolute path (or omit to use PATH)"); + } + return lobsterPath; +} + +async function runLobsterSubprocess(params: { + execPath: string; + argv: string[]; + cwd: string; + timeoutMs: number; + maxStdoutBytes: number; +}) { + const { execPath, argv, cwd } = params; + const timeoutMs = Math.max(200, params.timeoutMs); + const maxStdoutBytes = Math.max(1024, params.maxStdoutBytes); + + return await new Promise<{ stdout: string; exitCode: number | null }>((resolve, reject) => { + const child = spawn(execPath, argv, { + cwd, + stdio: ["ignore", "pipe", "pipe"], + env: { + ...process.env, + // Ensure lobster never tries to be interactive. + LOBSTER_MODE: "tool", + }, + }); + + let stdout = ""; + let stdoutBytes = 0; + let stderr = ""; + + child.stdout?.setEncoding("utf8"); + child.stderr?.setEncoding("utf8"); + + child.stdout?.on("data", (chunk) => { + const str = String(chunk); + stdoutBytes += Buffer.byteLength(str, "utf8"); + if (stdoutBytes > maxStdoutBytes) { + try { + child.kill("SIGKILL"); + } finally { + reject(new Error("lobster output exceeded maxStdoutBytes")); + } + return; + } + stdout += str; + }); + + child.stderr?.on("data", (chunk) => { + stderr += String(chunk); + }); + + const timer = setTimeout(() => { + try { + child.kill("SIGKILL"); + } finally { + reject(new Error("lobster subprocess timed out")); + } + }, timeoutMs); + + child.once("error", (err) => { + clearTimeout(timer); + reject(err); + }); + + child.once("exit", (code) => { + clearTimeout(timer); + if (code !== 0) { + reject(new Error(`lobster failed (${code ?? "?"}): ${stderr.trim() || stdout.trim()}`)); + return; + } + resolve({ stdout, exitCode: code }); + }); + }); +} + +function parseEnvelope(stdout: string): LobsterEnvelope { + let parsed: unknown; + try { + parsed = JSON.parse(stdout); + } catch { + throw new Error("lobster returned invalid JSON"); + } + + if (!parsed || typeof parsed !== "object") { + throw new Error("lobster returned invalid JSON envelope"); + } + + const ok = (parsed as { ok?: unknown }).ok; + if (ok === true) { + const env = parsed as LobsterEnvelope; + if (!Array.isArray((env as any).output)) { + throw new Error("lobster tool output must include output[]"); + } + return env; + } + + if (ok === false) { + const env = parsed as LobsterEnvelope; + const msg = (env as any)?.error?.message; + if (typeof msg !== "string" || !msg.trim()) { + throw new Error("lobster error envelope missing error.message"); + } + return env; + } + + throw new Error("lobster returned invalid JSON envelope"); +} + +export function createLobsterTool(options: { sandboxed?: boolean } = {}): AnyAgentTool { + const parameters = buildSchema(); + + return { + label: "Lobster", + name: "lobster", + description: + "Run Lobster pipelines as a local-first, typed workflow runtime (tool mode JSON envelope, resumable approvals).", + parameters, + async execute(_callId, paramsRaw) { + if (options.sandboxed) { + throw new Error("lobster tool is not available in sandboxed mode"); + } + + const params = paramsRaw as Record; + const action = readStringParam(params, "action", { required: true }) as LobsterToolParams["action"]; + + const execPath = resolveExecutablePath(readStringParam(params, "lobsterPath")); + const cwd = readStringParam(params, "cwd", { allowEmpty: false }) || process.cwd(); + + const timeoutMs = readNumberParam(params, "timeoutMs", { integer: true }) ?? 20_000; + const maxStdoutBytes = readNumberParam(params, "maxStdoutBytes", { integer: true }) ?? 512_000; + + let argv: string[]; + if (action === "run") { + const pipeline = readStringParam(params, "pipeline", { required: true, label: "pipeline" })!; + argv = ["run", "--mode", "tool", pipeline]; + } else if (action === "resume") { + const token = readStringParam(params, "token", { required: true, label: "token" })!; + const approve = params["approve"]; + if (typeof approve !== "boolean") { + throw new Error("approve required"); + } + argv = ["resume", "--token", token, "--approve", approve ? "yes" : "no"]; + } else { + throw new Error(`Unknown action: ${action}`); + } + + const { stdout } = await runLobsterSubprocess({ + execPath, + argv, + cwd, + timeoutMs, + maxStdoutBytes, + }); + + const envelope = parseEnvelope(stdout); + return jsonResult(envelope); + }, + }; +} From b2650ba6725f11b936a91e6c008e099d32f2aa47 Mon Sep 17 00:00:00 2001 From: Vignesh Natarajan Date: Sat, 17 Jan 2026 20:18:54 -0800 Subject: [PATCH 2/5] Move lobster integration to optional plugin tool --- docs/tools/lobster.md | 64 ------ extensions/lobster/README.md | 38 ++++ extensions/lobster/index.ts | 7 + extensions/lobster/package.json | 9 + extensions/lobster/src/lobster-tool.test.ts | 87 ++++++++ extensions/lobster/src/lobster-tool.ts | 185 ++++++++++++++++ src/agents/clawdbot-tools.ts | 2 - src/agents/tools/lobster-tool.test.ts | 122 ----------- src/agents/tools/lobster-tool.ts | 231 -------------------- 9 files changed, 326 insertions(+), 419 deletions(-) delete mode 100644 docs/tools/lobster.md create mode 100644 extensions/lobster/README.md create mode 100644 extensions/lobster/index.ts create mode 100644 extensions/lobster/package.json create mode 100644 extensions/lobster/src/lobster-tool.test.ts create mode 100644 extensions/lobster/src/lobster-tool.ts delete mode 100644 src/agents/tools/lobster-tool.test.ts delete mode 100644 src/agents/tools/lobster-tool.ts diff --git a/docs/tools/lobster.md b/docs/tools/lobster.md deleted file mode 100644 index f54fc3da2..000000000 --- a/docs/tools/lobster.md +++ /dev/null @@ -1,64 +0,0 @@ ---- -title: Lobster -description: Run Lobster pipelines (typed workflows) as a first-class Clawdbot tool. ---- - -# Lobster - -The `lobster` tool lets Clawdbot run Lobster pipelines as a **local-first, typed workflow runtime**. - -This is designed for: -- Deterministic orchestration (move multi-step tool workflows out of the LLM) -- Human-in-the-loop approvals that **halt and resume** -- Lower token usage (one `lobster.run` call instead of many tool calls) - -## Security model - -- Lobster runs as a **local subprocess**. -- Lobster does **not** manage OAuth or secrets. -- Side effects still go through Clawdbot tools (messaging, files, etc.). - -Recommendations: -- Prefer configuring `lobsterPath` as an **absolute path** to avoid PATH hijack. -- Use Lobster approvals (`approve`) for any side-effectful step. - -## Actions - -### `run` - -Run a pipeline in tool mode. - -Example: - -```json -{ - "action": "run", - "pipeline": "exec --json \"echo [1]\" | approve --prompt 'ok?'", - "lobsterPath": "/absolute/path/to/lobster", - "timeoutMs": 20000 -} -``` - -### `resume` - -Resume a halted pipeline. - -Example: - -```json -{ - "action": "resume", - "token": "", - "approve": true, - "lobsterPath": "/absolute/path/to/lobster" -} -``` - -## Output - -Lobster returns a JSON envelope: - -- `ok`: boolean -- `status`: `ok` | `needs_approval` | `cancelled` -- `output`: array of items -- `requiresApproval`: approval request object (when `status=needs_approval`) diff --git a/extensions/lobster/README.md b/extensions/lobster/README.md new file mode 100644 index 000000000..13e675b45 --- /dev/null +++ b/extensions/lobster/README.md @@ -0,0 +1,38 @@ +# Lobster (plugin) + +Adds the `lobster` agent tool as an **optional** plugin tool. + +## What this is + +- Lobster is a standalone workflow shell (typed JSON-first pipelines + approvals/resume). +- This plugin integrates Lobster with Clawdbot *without core changes*. + +## Enable + +Because this tool can trigger side effects (via workflows), it is registered with `optional: true`. + +Enable it in an agent allowlist: + +```json +{ + "agents": { + "list": [ + { + "id": "main", + "tools": { + "allow": [ + "lobster" // plugin id (enables all tools from this plugin) + ] + } + } + ] + } +} +``` + +## Security + +- Runs the `lobster` executable as a local subprocess. +- Does not manage OAuth/tokens. +- Uses timeouts, stdout caps, and strict JSON envelope parsing. +- Prefer an absolute `lobsterPath` in production to avoid PATH hijack. diff --git a/extensions/lobster/index.ts b/extensions/lobster/index.ts new file mode 100644 index 000000000..f1c06c554 --- /dev/null +++ b/extensions/lobster/index.ts @@ -0,0 +1,7 @@ +import type { ClawdbotPluginApi } from "../../src/plugins/types.js"; + +import { createLobsterTool } from "./src/lobster-tool.js"; + +export default function register(api: ClawdbotPluginApi) { + api.registerTool(createLobsterTool(api), { optional: true }); +} diff --git a/extensions/lobster/package.json b/extensions/lobster/package.json new file mode 100644 index 000000000..606975434 --- /dev/null +++ b/extensions/lobster/package.json @@ -0,0 +1,9 @@ +{ + "name": "@clawdbot/lobster", + "version": "2026.1.17-1", + "type": "module", + "description": "Lobster workflow tool plugin (typed pipelines + resumable approvals)", + "clawdbot": { + "extensions": ["./index.ts"] + } +} diff --git a/extensions/lobster/src/lobster-tool.test.ts b/extensions/lobster/src/lobster-tool.test.ts new file mode 100644 index 000000000..3b1dae859 --- /dev/null +++ b/extensions/lobster/src/lobster-tool.test.ts @@ -0,0 +1,87 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { describe, expect, it } from "vitest"; + +import type { ClawdbotPluginApi } from "../../../src/plugins/types.js"; +import { createLobsterTool } from "./lobster-tool.js"; + +async function writeFakeLobster(params: { + payload: unknown; +}) { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-plugin-")); + const binPath = path.join(dir, "lobster"); + + const file = `#!/usr/bin/env node\n` + + `process.stdout.write(JSON.stringify(${JSON.stringify(params.payload)}));\n`; + + await fs.writeFile(binPath, file, { encoding: "utf8", mode: 0o755 }); + return { dir, binPath }; +} + +function fakeApi(): ClawdbotPluginApi { + return { + id: "lobster", + name: "lobster", + source: "test", + config: {} as any, + runtime: { version: "test" } as any, + logger: { info() {}, warn() {}, error() {}, debug() {} }, + registerTool() {}, + registerHttpHandler() {}, + registerChannel() {}, + registerGatewayMethod() {}, + registerCli() {}, + registerService() {}, + registerProvider() {}, + resolvePath: (p) => p, + }; +} + +describe("lobster plugin tool", () => { + it("runs lobster and returns parsed envelope in details", async () => { + const fake = await writeFakeLobster({ + payload: { ok: true, status: "ok", output: [{ hello: "world" }], requiresApproval: null }, + }); + + const tool = createLobsterTool(fakeApi()); + const res = await tool.execute("call1", { + action: "run", + pipeline: "noop", + lobsterPath: fake.binPath, + timeoutMs: 1000, + }); + + expect(res.details).toMatchObject({ ok: true, status: "ok" }); + }); + + it("requires absolute lobsterPath when provided", async () => { + const tool = createLobsterTool(fakeApi()); + await expect( + tool.execute("call2", { + action: "run", + pipeline: "noop", + lobsterPath: "./lobster", + }), + ).rejects.toThrow(/absolute path/); + }); + + it("rejects invalid JSON from lobster", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-plugin-bad-")); + const binPath = path.join(dir, "lobster"); + await fs.writeFile(binPath, `#!/usr/bin/env node\nprocess.stdout.write('nope');\n`, { + encoding: "utf8", + mode: 0o755, + }); + + const tool = createLobsterTool(fakeApi()); + await expect( + tool.execute("call3", { + action: "run", + pipeline: "noop", + lobsterPath: binPath, + }), + ).rejects.toThrow(/invalid JSON/); + }); +}); diff --git a/extensions/lobster/src/lobster-tool.ts b/extensions/lobster/src/lobster-tool.ts new file mode 100644 index 000000000..2d76e0821 --- /dev/null +++ b/extensions/lobster/src/lobster-tool.ts @@ -0,0 +1,185 @@ +import { Type } from "@sinclair/typebox"; +import { spawn } from "node:child_process"; +import path from "node:path"; + +import type { ClawdbotPluginApi } from "../../../src/plugins/types.js"; + +type LobsterEnvelope = + | { + ok: true; + status: "ok" | "needs_approval" | "cancelled"; + output: unknown[]; + requiresApproval: null | { + type: "approval_request"; + prompt: string; + items: unknown[]; + resumeToken?: string; + }; + } + | { + ok: false; + error: { type?: string; message: string }; + }; + +function resolveExecutablePath(lobsterPathRaw: string | undefined) { + const lobsterPath = lobsterPathRaw?.trim() || "lobster"; + if (lobsterPath !== "lobster" && !path.isAbsolute(lobsterPath)) { + throw new Error("lobsterPath must be an absolute path (or omit to use PATH)"); + } + return lobsterPath; +} + +async function runLobsterSubprocess(params: { + execPath: string; + argv: string[]; + cwd: string; + timeoutMs: number; + maxStdoutBytes: number; +}) { + const { execPath, argv, cwd } = params; + const timeoutMs = Math.max(200, params.timeoutMs); + const maxStdoutBytes = Math.max(1024, params.maxStdoutBytes); + + return await new Promise<{ stdout: string }>((resolve, reject) => { + const child = spawn(execPath, argv, { + cwd, + stdio: ["ignore", "pipe", "pipe"], + env: { + ...process.env, + LOBSTER_MODE: "tool", + }, + }); + + let stdout = ""; + let stdoutBytes = 0; + let stderr = ""; + + child.stdout?.setEncoding("utf8"); + child.stderr?.setEncoding("utf8"); + + child.stdout?.on("data", (chunk) => { + const str = String(chunk); + stdoutBytes += Buffer.byteLength(str, "utf8"); + if (stdoutBytes > maxStdoutBytes) { + try { + child.kill("SIGKILL"); + } finally { + reject(new Error("lobster output exceeded maxStdoutBytes")); + } + return; + } + stdout += str; + }); + + child.stderr?.on("data", (chunk) => { + stderr += String(chunk); + }); + + const timer = setTimeout(() => { + try { + child.kill("SIGKILL"); + } finally { + reject(new Error("lobster subprocess timed out")); + } + }, timeoutMs); + + child.once("error", (err) => { + clearTimeout(timer); + reject(err); + }); + + child.once("exit", (code) => { + clearTimeout(timer); + if (code !== 0) { + reject(new Error(`lobster failed (${code ?? "?"}): ${stderr.trim() || stdout.trim()}`)); + return; + } + resolve({ stdout }); + }); + }); +} + +function parseEnvelope(stdout: string): LobsterEnvelope { + let parsed: unknown; + try { + parsed = JSON.parse(stdout); + } catch { + throw new Error("lobster returned invalid JSON"); + } + + if (!parsed || typeof parsed !== "object") { + throw new Error("lobster returned invalid JSON envelope"); + } + + const ok = (parsed as { ok?: unknown }).ok; + if (ok === true || ok === false) { + return parsed as LobsterEnvelope; + } + + throw new Error("lobster returned invalid JSON envelope"); +} + +export function createLobsterTool(api: ClawdbotPluginApi) { + return { + name: "lobster", + description: + "Run Lobster pipelines as a local-first workflow runtime (typed JSON envelope + resumable approvals).", + parameters: Type.Object({ + // NOTE: Prefer string enums in tool schemas; some providers reject unions/anyOf. + action: Type.Unsafe<"run" | "resume">({ type: "string", enum: ["run", "resume"] }), + pipeline: Type.Optional(Type.String()), + token: Type.Optional(Type.String()), + approve: Type.Optional(Type.Boolean()), + lobsterPath: Type.Optional(Type.String()), + cwd: Type.Optional(Type.String()), + timeoutMs: Type.Optional(Type.Number()), + maxStdoutBytes: Type.Optional(Type.Number()), + }), + async execute(_id: string, params: Record) { + const action = String(params.action || "").trim(); + if (!action) throw new Error("action required"); + + const execPath = resolveExecutablePath( + typeof params.lobsterPath === "string" ? params.lobsterPath : undefined, + ); + const cwd = typeof params.cwd === "string" && params.cwd.trim() ? params.cwd.trim() : process.cwd(); + const timeoutMs = typeof params.timeoutMs === "number" ? params.timeoutMs : 20_000; + const maxStdoutBytes = typeof params.maxStdoutBytes === "number" ? params.maxStdoutBytes : 512_000; + + const argv = (() => { + if (action === "run") { + const pipeline = typeof params.pipeline === "string" ? params.pipeline : ""; + if (!pipeline.trim()) throw new Error("pipeline required"); + return ["run", "--mode", "tool", pipeline]; + } + if (action === "resume") { + const token = typeof params.token === "string" ? params.token : ""; + if (!token.trim()) throw new Error("token required"); + const approve = params.approve; + if (typeof approve !== "boolean") throw new Error("approve required"); + return ["resume", "--token", token, "--approve", approve ? "yes" : "no"]; + } + throw new Error(`Unknown action: ${action}`); + })(); + + if (api.runtime?.version && api.logger?.debug) { + api.logger.debug(`lobster plugin runtime=${api.runtime.version}`); + } + + const { stdout } = await runLobsterSubprocess({ + execPath, + argv, + cwd, + timeoutMs, + maxStdoutBytes, + }); + + const envelope = parseEnvelope(stdout); + + return { + content: [{ type: "text", text: JSON.stringify(envelope, null, 2) }], + details: envelope, + }; + }, + }; +} diff --git a/src/agents/clawdbot-tools.ts b/src/agents/clawdbot-tools.ts index c95636fb3..5ae4891dc 100644 --- a/src/agents/clawdbot-tools.ts +++ b/src/agents/clawdbot-tools.ts @@ -9,7 +9,6 @@ import type { AnyAgentTool } from "./tools/common.js"; import { createCronTool } from "./tools/cron-tool.js"; import { createGatewayTool } from "./tools/gateway-tool.js"; import { createImageTool } from "./tools/image-tool.js"; -import { createLobsterTool } from "./tools/lobster-tool.js"; import { createMessageTool } from "./tools/message-tool.js"; import { createNodesTool } from "./tools/nodes-tool.js"; import { createSessionStatusTool } from "./tools/session-status-tool.js"; @@ -112,7 +111,6 @@ export function createClawdbotTools(options?: { agentSessionKey: options?.agentSessionKey, config: options?.config, }), - createLobsterTool({ sandboxed: options?.sandboxed }), ...(webSearchTool ? [webSearchTool] : []), ...(webFetchTool ? [webFetchTool] : []), ...(imageTool ? [imageTool] : []), diff --git a/src/agents/tools/lobster-tool.test.ts b/src/agents/tools/lobster-tool.test.ts deleted file mode 100644 index 26441f2d6..000000000 --- a/src/agents/tools/lobster-tool.test.ts +++ /dev/null @@ -1,122 +0,0 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; - -import { describe, expect, it } from "vitest"; - -import { createLobsterTool } from "./lobster-tool.js"; - -async function writeFakeLobster(params: { - script: (args: string[]) => unknown; -}) { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-")); - const binPath = path.join(dir, "lobster"); - - const file = `#!/usr/bin/env node\n` + - `const args = process.argv.slice(2);\n` + - `const payload = (${params.script.toString()})(args);\n` + - `process.stdout.write(JSON.stringify(payload));\n`; - - await fs.writeFile(binPath, file, { encoding: "utf8", mode: 0o755 }); - return { dir, binPath }; -} - -describe("lobster tool", () => { - it("runs lobster in tool mode and returns envelope", async () => { - const fake = await writeFakeLobster({ - script: (args) => { - if (args[0] !== "run") throw new Error("expected run"); - return { - ok: true, - status: "ok", - output: [{ hello: "world" }], - requiresApproval: null, - }; - }, - }); - - const tool = createLobsterTool(); - const res = await tool.execute("call1", { - action: "run", - pipeline: "exec --json \"echo [1]\"", - lobsterPath: fake.binPath, - timeoutMs: 1000, - }); - - expect(res.details).toMatchObject({ - ok: true, - status: "ok", - output: [{ hello: "world" }], - requiresApproval: null, - }); - }); - - it("supports resume action", async () => { - const fake = await writeFakeLobster({ - script: (args) => { - if (args[0] !== "resume") throw new Error("expected resume"); - return { - ok: true, - status: "ok", - output: ["resumed"], - requiresApproval: null, - }; - }, - }); - - const tool = createLobsterTool(); - const res = await tool.execute("call2", { - action: "resume", - token: "tok", - approve: true, - lobsterPath: fake.binPath, - timeoutMs: 1000, - }); - - expect(res.details).toMatchObject({ ok: true, status: "ok" }); - }); - - it("rejects non-absolute lobsterPath", async () => { - const tool = createLobsterTool(); - await expect( - tool.execute("call3", { - action: "run", - pipeline: "json", - lobsterPath: "./lobster", - }), - ).rejects.toThrow(/absolute path/); - }); - - it("blocks tool in sandboxed mode", async () => { - const tool = createLobsterTool({ sandboxed: true }); - await expect( - tool.execute("call4", { - action: "run", - pipeline: "json", - lobsterPath: "/usr/bin/true", - }), - ).rejects.toThrow(/not available in sandboxed/); - }); - - it("rejects invalid JSON", async () => { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-bad-")); - const binPath = path.join(dir, "lobster"); - await fs.writeFile( - binPath, - `#!/usr/bin/env node\nprocess.stdout.write('not-json');\n`, - { - encoding: "utf8", - mode: 0o755, - }, - ); - - const tool = createLobsterTool(); - await expect( - tool.execute("call5", { - action: "run", - pipeline: "json", - lobsterPath: binPath, - }), - ).rejects.toThrow(/invalid JSON/); - }); -}); diff --git a/src/agents/tools/lobster-tool.ts b/src/agents/tools/lobster-tool.ts deleted file mode 100644 index c01cab81a..000000000 --- a/src/agents/tools/lobster-tool.ts +++ /dev/null @@ -1,231 +0,0 @@ -import { Type } from "@sinclair/typebox"; -import { spawn } from "node:child_process"; -import path from "node:path"; - -import { stringEnum } from "../schema/typebox.js"; -import type { AnyAgentTool } from "./common.js"; -import { jsonResult, readNumberParam, readStringParam } from "./common.js"; - -const LobsterActions = ["run", "resume"] as const; - -type LobsterToolParams = { - action: (typeof LobsterActions)[number]; - pipeline?: string; - token?: string; - approve?: boolean; - lobsterPath?: string; - cwd?: string; - timeoutMs?: number; - maxStdoutBytes?: number; -}; - -type LobsterEnvelope = - | { - ok: true; - status: "ok" | "needs_approval" | "cancelled"; - output: unknown[]; - requiresApproval: null | { - type: "approval_request"; - prompt: string; - items: unknown[]; - resumeToken?: string; - }; - } - | { - ok: false; - error: { type?: string; message: string }; - }; - -function buildSchema() { - return Type.Object({ - action: stringEnum(LobsterActions), - pipeline: Type.Optional(Type.String({ description: "Lobster pipeline string." })), - token: Type.Optional(Type.String({ description: "Resume token from lobster tool mode." })), - approve: Type.Optional(Type.Boolean({ description: "Approval decision for resume." })), - lobsterPath: Type.Optional( - Type.String({ - description: - "Path to lobster executable. Prefer an absolute path to avoid PATH hijack. Defaults to 'lobster'.", - }), - ), - cwd: Type.Optional( - Type.String({ - description: "Working directory for lobster subprocess.", - }), - ), - timeoutMs: Type.Optional( - Type.Number({ - description: "Subprocess timeout (ms).", - }), - ), - maxStdoutBytes: Type.Optional( - Type.Number({ - description: "Max stdout bytes to read before aborting.", - }), - ), - }); -} - -function resolveExecutablePath(lobsterPathRaw: string | undefined) { - const lobsterPath = lobsterPathRaw?.trim() || "lobster"; - if (lobsterPath !== "lobster" && !path.isAbsolute(lobsterPath)) { - throw new Error("lobsterPath must be an absolute path (or omit to use PATH)"); - } - return lobsterPath; -} - -async function runLobsterSubprocess(params: { - execPath: string; - argv: string[]; - cwd: string; - timeoutMs: number; - maxStdoutBytes: number; -}) { - const { execPath, argv, cwd } = params; - const timeoutMs = Math.max(200, params.timeoutMs); - const maxStdoutBytes = Math.max(1024, params.maxStdoutBytes); - - return await new Promise<{ stdout: string; exitCode: number | null }>((resolve, reject) => { - const child = spawn(execPath, argv, { - cwd, - stdio: ["ignore", "pipe", "pipe"], - env: { - ...process.env, - // Ensure lobster never tries to be interactive. - LOBSTER_MODE: "tool", - }, - }); - - let stdout = ""; - let stdoutBytes = 0; - let stderr = ""; - - child.stdout?.setEncoding("utf8"); - child.stderr?.setEncoding("utf8"); - - child.stdout?.on("data", (chunk) => { - const str = String(chunk); - stdoutBytes += Buffer.byteLength(str, "utf8"); - if (stdoutBytes > maxStdoutBytes) { - try { - child.kill("SIGKILL"); - } finally { - reject(new Error("lobster output exceeded maxStdoutBytes")); - } - return; - } - stdout += str; - }); - - child.stderr?.on("data", (chunk) => { - stderr += String(chunk); - }); - - const timer = setTimeout(() => { - try { - child.kill("SIGKILL"); - } finally { - reject(new Error("lobster subprocess timed out")); - } - }, timeoutMs); - - child.once("error", (err) => { - clearTimeout(timer); - reject(err); - }); - - child.once("exit", (code) => { - clearTimeout(timer); - if (code !== 0) { - reject(new Error(`lobster failed (${code ?? "?"}): ${stderr.trim() || stdout.trim()}`)); - return; - } - resolve({ stdout, exitCode: code }); - }); - }); -} - -function parseEnvelope(stdout: string): LobsterEnvelope { - let parsed: unknown; - try { - parsed = JSON.parse(stdout); - } catch { - throw new Error("lobster returned invalid JSON"); - } - - if (!parsed || typeof parsed !== "object") { - throw new Error("lobster returned invalid JSON envelope"); - } - - const ok = (parsed as { ok?: unknown }).ok; - if (ok === true) { - const env = parsed as LobsterEnvelope; - if (!Array.isArray((env as any).output)) { - throw new Error("lobster tool output must include output[]"); - } - return env; - } - - if (ok === false) { - const env = parsed as LobsterEnvelope; - const msg = (env as any)?.error?.message; - if (typeof msg !== "string" || !msg.trim()) { - throw new Error("lobster error envelope missing error.message"); - } - return env; - } - - throw new Error("lobster returned invalid JSON envelope"); -} - -export function createLobsterTool(options: { sandboxed?: boolean } = {}): AnyAgentTool { - const parameters = buildSchema(); - - return { - label: "Lobster", - name: "lobster", - description: - "Run Lobster pipelines as a local-first, typed workflow runtime (tool mode JSON envelope, resumable approvals).", - parameters, - async execute(_callId, paramsRaw) { - if (options.sandboxed) { - throw new Error("lobster tool is not available in sandboxed mode"); - } - - const params = paramsRaw as Record; - const action = readStringParam(params, "action", { required: true }) as LobsterToolParams["action"]; - - const execPath = resolveExecutablePath(readStringParam(params, "lobsterPath")); - const cwd = readStringParam(params, "cwd", { allowEmpty: false }) || process.cwd(); - - const timeoutMs = readNumberParam(params, "timeoutMs", { integer: true }) ?? 20_000; - const maxStdoutBytes = readNumberParam(params, "maxStdoutBytes", { integer: true }) ?? 512_000; - - let argv: string[]; - if (action === "run") { - const pipeline = readStringParam(params, "pipeline", { required: true, label: "pipeline" })!; - argv = ["run", "--mode", "tool", pipeline]; - } else if (action === "resume") { - const token = readStringParam(params, "token", { required: true, label: "token" })!; - const approve = params["approve"]; - if (typeof approve !== "boolean") { - throw new Error("approve required"); - } - argv = ["resume", "--token", token, "--approve", approve ? "yes" : "no"]; - } else { - throw new Error(`Unknown action: ${action}`); - } - - const { stdout } = await runLobsterSubprocess({ - execPath, - argv, - cwd, - timeoutMs, - maxStdoutBytes, - }); - - const envelope = parseEnvelope(stdout); - return jsonResult(envelope); - }, - }; -} From e011c764a7adbef8f1a8993b5b302437e776f112 Mon Sep 17 00:00:00 2001 From: Vignesh Natarajan Date: Sat, 17 Jan 2026 20:33:31 -0800 Subject: [PATCH 3/5] Gate lobster plugin tool in sandboxed contexts --- extensions/lobster/index.ts | 8 +++++- extensions/lobster/src/lobster-tool.test.ts | 27 ++++++++++++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/extensions/lobster/index.ts b/extensions/lobster/index.ts index f1c06c554..5f459c939 100644 --- a/extensions/lobster/index.ts +++ b/extensions/lobster/index.ts @@ -3,5 +3,11 @@ import type { ClawdbotPluginApi } from "../../src/plugins/types.js"; import { createLobsterTool } from "./src/lobster-tool.js"; export default function register(api: ClawdbotPluginApi) { - api.registerTool(createLobsterTool(api), { optional: true }); + api.registerTool( + (ctx) => { + if (ctx.sandboxed) return null; + return createLobsterTool(api); + }, + { optional: true }, + ); } diff --git a/extensions/lobster/src/lobster-tool.test.ts b/extensions/lobster/src/lobster-tool.test.ts index 3b1dae859..1c69b3280 100644 --- a/extensions/lobster/src/lobster-tool.test.ts +++ b/extensions/lobster/src/lobster-tool.test.ts @@ -4,7 +4,7 @@ import path from "node:path"; import { describe, expect, it } from "vitest"; -import type { ClawdbotPluginApi } from "../../../src/plugins/types.js"; +import type { ClawdbotPluginApi, ClawdbotPluginToolContext } from "../../../src/plugins/types.js"; import { createLobsterTool } from "./lobster-tool.js"; async function writeFakeLobster(params: { @@ -39,6 +39,20 @@ function fakeApi(): ClawdbotPluginApi { }; } +function fakeCtx(overrides: Partial = {}): ClawdbotPluginToolContext { + return { + config: {} as any, + workspaceDir: "/tmp", + agentDir: "/tmp", + agentId: "main", + sessionKey: "main", + messageChannel: undefined, + agentAccountId: undefined, + sandboxed: false, + ...overrides, + }; +} + describe("lobster plugin tool", () => { it("runs lobster and returns parsed envelope in details", async () => { const fake = await writeFakeLobster({ @@ -84,4 +98,15 @@ describe("lobster plugin tool", () => { }), ).rejects.toThrow(/invalid JSON/); }); + + it("can be gated off in sandboxed contexts", async () => { + const api = fakeApi(); + const factoryTool = (ctx: ClawdbotPluginToolContext) => { + if (ctx.sandboxed) return null; + return createLobsterTool(api); + }; + + expect(factoryTool(fakeCtx({ sandboxed: true }))).toBeNull(); + expect(factoryTool(fakeCtx({ sandboxed: false }))?.name).toBe("lobster"); + }); }); From 032c780a797e0e226cc622533c14f7fc731aeeda Mon Sep 17 00:00:00 2001 From: Vignesh Natarajan Date: Sun, 18 Jan 2026 11:07:47 -0800 Subject: [PATCH 4/5] Add lobster.md documentation --- docs/tools/lobster.md | 109 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 docs/tools/lobster.md diff --git a/docs/tools/lobster.md b/docs/tools/lobster.md new file mode 100644 index 000000000..bb3f4d130 --- /dev/null +++ b/docs/tools/lobster.md @@ -0,0 +1,109 @@ +--- +title: Lobster +description: Typed workflow runtime for Clawdbot — composable pipelines with approval gates. +--- + +# Lobster + +Lobster is a workflow shell that lets Clawdbot run multi-step tool sequences as a single, deterministic operation with explicit approval checkpoints. + +## Why + +Today, complex workflows require many back-and-forth tool calls. Each call costs tokens, and the LLM has to orchestrate every step. Lobster moves that orchestration into a typed runtime: + +- **One call instead of many**: Clawdbot calls `lobster.run(...)` once and gets a structured result. +- **Approvals built in**: Side effects (send email, post comment) halt the workflow until explicitly approved. +- **Resumable**: Halted workflows return a token; approve and resume without re-running everything. + +## Example: Email triage + +Without Lobster: +``` +User: "Check my email and draft replies" +→ clawd calls gmail.list +→ LLM summarizes +→ User: "draft replies to #2 and #5" +→ LLM drafts +→ User: "send #2" +→ clawd calls gmail.send +(repeat daily, no memory of what was triaged) +``` + +With Lobster: +``` +clawd calls: lobster.run("email.triage --limit 20") + +Returns: +{ + "status": "needs_approval", + "output": { + "summary": "5 need replies, 2 need action", + "drafts": [...] + }, + "requiresApproval": { + "prompt": "Send 2 draft replies?", + "resumeToken": "..." + } +} + +User approves → clawd calls: lobster.resume(token, approve: true) +→ Emails sent +``` + +One workflow. Deterministic. Safe. + +## Enable + +Lobster is an **optional** plugin tool. Enable it in your agent config: + +```json +{ + "agents": { + "list": [{ + "id": "main", + "tools": { + "allow": ["lobster"] + } + }] + } +} +``` + +You also need the `lobster` CLI installed locally. + +## Actions + +### `run` + +Execute a Lobster pipeline in tool mode. + +```json +{ + "action": "run", + "pipeline": "gog.gmail.search --query 'newer_than:1d' | email.triage", + "timeoutMs": 30000 +} +``` + +### `resume` + +Continue a halted workflow after approval. + +```json +{ + "action": "resume", + "token": "", + "approve": true +} +``` + +## Security + +- **Local subprocess only** — no network calls from the plugin itself. +- **No secrets** — Lobster doesn't manage OAuth; it calls clawd tools that do. +- **Sandbox-aware** — disabled when `ctx.sandboxed` is true. +- **Hardened** — `lobsterPath` must be absolute if specified; timeouts and output caps enforced. + +## Learn more + +- [Lobster repo](https://github.com/vignesh07/lobster) — runtime, commands, and workflow examples. From 9497ffcc5050d5a636c83c57a183598f12076d9d Mon Sep 17 00:00:00 2001 From: Vignesh Natarajan Date: Sun, 18 Jan 2026 12:11:25 -0800 Subject: [PATCH 5/5] Add SKILL.md to teach Clawdbot when/how to use Lobster --- extensions/lobster/SKILL.md | 90 +++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 extensions/lobster/SKILL.md diff --git a/extensions/lobster/SKILL.md b/extensions/lobster/SKILL.md new file mode 100644 index 000000000..12c4f77ec --- /dev/null +++ b/extensions/lobster/SKILL.md @@ -0,0 +1,90 @@ +# Lobster + +Lobster executes multi-step workflows with approval checkpoints. Use it when: + +- User wants a repeatable automation (triage, monitor, sync) +- Actions need human approval before executing (send, post, delete) +- Multiple tool calls should run as one deterministic operation + +## When to use Lobster + +| User intent | Use Lobster? | +|-------------|--------------| +| "Triage my email" | Yes — multi-step, may send replies | +| "Send a message" | No — single action, use message tool directly | +| "Check my email every morning and ask before replying" | Yes — scheduled workflow with approval | +| "What's the weather?" | No — simple query | +| "Monitor this PR and notify me of changes" | Yes — stateful, recurring | + +## Basic usage + +### Run a pipeline + +```json +{ + "action": "run", + "pipeline": "gog.gmail.search --query 'newer_than:1d' --max 20 | email.triage" +} +``` + +Returns structured result: +```json +{ + "protocolVersion": 1, + "ok": true, + "status": "ok", + "output": [{ "summary": {...}, "items": [...] }], + "requiresApproval": null +} +``` + +### Handle approval + +If the workflow needs approval: +```json +{ + "status": "needs_approval", + "output": [], + "requiresApproval": { + "prompt": "Send 3 draft replies?", + "items": [...], + "resumeToken": "..." + } +} +``` + +Present the prompt to the user. If they approve: +```json +{ + "action": "resume", + "token": "", + "approve": true +} +``` + +## Example workflows + +### Email triage +``` +gog.gmail.search --query 'newer_than:1d' --max 20 | email.triage +``` +Fetches recent emails, classifies into buckets (needs_reply, needs_action, fyi). + +### Email triage with approval gate +``` +gog.gmail.search --query 'newer_than:1d' | email.triage | approve --prompt 'Process these?' +``` +Same as above, but halts for approval before returning. + +## Key behaviors + +- **Deterministic**: Same input → same output (no LLM variance in pipeline execution) +- **Approval gates**: `approve` command halts execution, returns token +- **Resumable**: Use `resume` action with token to continue +- **Structured output**: Always returns JSON envelope with `protocolVersion` + +## Don't use Lobster for + +- Simple single-action requests (just use the tool directly) +- Queries that need LLM interpretation mid-flow +- One-off tasks that won't be repeated