diff --git a/docs/tools/lobster.md b/docs/tools/lobster.md new file mode 100644 index 000000000..bb3f4d130 --- /dev/null +++ b/docs/tools/lobster.md @@ -0,0 +1,109 @@ +--- +title: Lobster +description: Typed workflow runtime for Clawdbot — composable pipelines with approval gates. +--- + +# Lobster + +Lobster is a workflow shell that lets Clawdbot run multi-step tool sequences as a single, deterministic operation with explicit approval checkpoints. + +## Why + +Today, complex workflows require many back-and-forth tool calls. Each call costs tokens, and the LLM has to orchestrate every step. Lobster moves that orchestration into a typed runtime: + +- **One call instead of many**: Clawdbot calls `lobster.run(...)` once and gets a structured result. +- **Approvals built in**: Side effects (send email, post comment) halt the workflow until explicitly approved. +- **Resumable**: Halted workflows return a token; approve and resume without re-running everything. + +## Example: Email triage + +Without Lobster: +``` +User: "Check my email and draft replies" +→ clawd calls gmail.list +→ LLM summarizes +→ User: "draft replies to #2 and #5" +→ LLM drafts +→ User: "send #2" +→ clawd calls gmail.send +(repeat daily, no memory of what was triaged) +``` + +With Lobster: +``` +clawd calls: lobster.run("email.triage --limit 20") + +Returns: +{ + "status": "needs_approval", + "output": { + "summary": "5 need replies, 2 need action", + "drafts": [...] + }, + "requiresApproval": { + "prompt": "Send 2 draft replies?", + "resumeToken": "..." + } +} + +User approves → clawd calls: lobster.resume(token, approve: true) +→ Emails sent +``` + +One workflow. Deterministic. Safe. + +## Enable + +Lobster is an **optional** plugin tool. Enable it in your agent config: + +```json +{ + "agents": { + "list": [{ + "id": "main", + "tools": { + "allow": ["lobster"] + } + }] + } +} +``` + +You also need the `lobster` CLI installed locally. + +## Actions + +### `run` + +Execute a Lobster pipeline in tool mode. + +```json +{ + "action": "run", + "pipeline": "gog.gmail.search --query 'newer_than:1d' | email.triage", + "timeoutMs": 30000 +} +``` + +### `resume` + +Continue a halted workflow after approval. + +```json +{ + "action": "resume", + "token": "", + "approve": true +} +``` + +## Security + +- **Local subprocess only** — no network calls from the plugin itself. +- **No secrets** — Lobster doesn't manage OAuth; it calls clawd tools that do. +- **Sandbox-aware** — disabled when `ctx.sandboxed` is true. +- **Hardened** — `lobsterPath` must be absolute if specified; timeouts and output caps enforced. + +## Learn more + +- [Lobster repo](https://github.com/vignesh07/lobster) — runtime, commands, and workflow examples. diff --git a/extensions/lobster/README.md b/extensions/lobster/README.md new file mode 100644 index 000000000..13e675b45 --- /dev/null +++ b/extensions/lobster/README.md @@ -0,0 +1,38 @@ +# Lobster (plugin) + +Adds the `lobster` agent tool as an **optional** plugin tool. + +## What this is + +- Lobster is a standalone workflow shell (typed JSON-first pipelines + approvals/resume). +- This plugin integrates Lobster with Clawdbot *without core changes*. + +## Enable + +Because this tool can trigger side effects (via workflows), it is registered with `optional: true`. + +Enable it in an agent allowlist: + +```json +{ + "agents": { + "list": [ + { + "id": "main", + "tools": { + "allow": [ + "lobster" // plugin id (enables all tools from this plugin) + ] + } + } + ] + } +} +``` + +## Security + +- Runs the `lobster` executable as a local subprocess. +- Does not manage OAuth/tokens. +- Uses timeouts, stdout caps, and strict JSON envelope parsing. +- Prefer an absolute `lobsterPath` in production to avoid PATH hijack. diff --git a/extensions/lobster/SKILL.md b/extensions/lobster/SKILL.md new file mode 100644 index 000000000..12c4f77ec --- /dev/null +++ b/extensions/lobster/SKILL.md @@ -0,0 +1,90 @@ +# Lobster + +Lobster executes multi-step workflows with approval checkpoints. Use it when: + +- User wants a repeatable automation (triage, monitor, sync) +- Actions need human approval before executing (send, post, delete) +- Multiple tool calls should run as one deterministic operation + +## When to use Lobster + +| User intent | Use Lobster? | +|-------------|--------------| +| "Triage my email" | Yes — multi-step, may send replies | +| "Send a message" | No — single action, use message tool directly | +| "Check my email every morning and ask before replying" | Yes — scheduled workflow with approval | +| "What's the weather?" | No — simple query | +| "Monitor this PR and notify me of changes" | Yes — stateful, recurring | + +## Basic usage + +### Run a pipeline + +```json +{ + "action": "run", + "pipeline": "gog.gmail.search --query 'newer_than:1d' --max 20 | email.triage" +} +``` + +Returns structured result: +```json +{ + "protocolVersion": 1, + "ok": true, + "status": "ok", + "output": [{ "summary": {...}, "items": [...] }], + "requiresApproval": null +} +``` + +### Handle approval + +If the workflow needs approval: +```json +{ + "status": "needs_approval", + "output": [], + "requiresApproval": { + "prompt": "Send 3 draft replies?", + "items": [...], + "resumeToken": "..." + } +} +``` + +Present the prompt to the user. If they approve: +```json +{ + "action": "resume", + "token": "", + "approve": true +} +``` + +## Example workflows + +### Email triage +``` +gog.gmail.search --query 'newer_than:1d' --max 20 | email.triage +``` +Fetches recent emails, classifies into buckets (needs_reply, needs_action, fyi). + +### Email triage with approval gate +``` +gog.gmail.search --query 'newer_than:1d' | email.triage | approve --prompt 'Process these?' +``` +Same as above, but halts for approval before returning. + +## Key behaviors + +- **Deterministic**: Same input → same output (no LLM variance in pipeline execution) +- **Approval gates**: `approve` command halts execution, returns token +- **Resumable**: Use `resume` action with token to continue +- **Structured output**: Always returns JSON envelope with `protocolVersion` + +## Don't use Lobster for + +- Simple single-action requests (just use the tool directly) +- Queries that need LLM interpretation mid-flow +- One-off tasks that won't be repeated diff --git a/extensions/lobster/index.ts b/extensions/lobster/index.ts new file mode 100644 index 000000000..5f459c939 --- /dev/null +++ b/extensions/lobster/index.ts @@ -0,0 +1,13 @@ +import type { ClawdbotPluginApi } from "../../src/plugins/types.js"; + +import { createLobsterTool } from "./src/lobster-tool.js"; + +export default function register(api: ClawdbotPluginApi) { + api.registerTool( + (ctx) => { + if (ctx.sandboxed) return null; + return createLobsterTool(api); + }, + { optional: true }, + ); +} diff --git a/extensions/lobster/package.json b/extensions/lobster/package.json new file mode 100644 index 000000000..606975434 --- /dev/null +++ b/extensions/lobster/package.json @@ -0,0 +1,9 @@ +{ + "name": "@clawdbot/lobster", + "version": "2026.1.17-1", + "type": "module", + "description": "Lobster workflow tool plugin (typed pipelines + resumable approvals)", + "clawdbot": { + "extensions": ["./index.ts"] + } +} diff --git a/extensions/lobster/src/lobster-tool.test.ts b/extensions/lobster/src/lobster-tool.test.ts new file mode 100644 index 000000000..1c69b3280 --- /dev/null +++ b/extensions/lobster/src/lobster-tool.test.ts @@ -0,0 +1,112 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { describe, expect, it } from "vitest"; + +import type { ClawdbotPluginApi, ClawdbotPluginToolContext } from "../../../src/plugins/types.js"; +import { createLobsterTool } from "./lobster-tool.js"; + +async function writeFakeLobster(params: { + payload: unknown; +}) { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-plugin-")); + const binPath = path.join(dir, "lobster"); + + const file = `#!/usr/bin/env node\n` + + `process.stdout.write(JSON.stringify(${JSON.stringify(params.payload)}));\n`; + + await fs.writeFile(binPath, file, { encoding: "utf8", mode: 0o755 }); + return { dir, binPath }; +} + +function fakeApi(): ClawdbotPluginApi { + return { + id: "lobster", + name: "lobster", + source: "test", + config: {} as any, + runtime: { version: "test" } as any, + logger: { info() {}, warn() {}, error() {}, debug() {} }, + registerTool() {}, + registerHttpHandler() {}, + registerChannel() {}, + registerGatewayMethod() {}, + registerCli() {}, + registerService() {}, + registerProvider() {}, + resolvePath: (p) => p, + }; +} + +function fakeCtx(overrides: Partial = {}): ClawdbotPluginToolContext { + return { + config: {} as any, + workspaceDir: "/tmp", + agentDir: "/tmp", + agentId: "main", + sessionKey: "main", + messageChannel: undefined, + agentAccountId: undefined, + sandboxed: false, + ...overrides, + }; +} + +describe("lobster plugin tool", () => { + it("runs lobster and returns parsed envelope in details", async () => { + const fake = await writeFakeLobster({ + payload: { ok: true, status: "ok", output: [{ hello: "world" }], requiresApproval: null }, + }); + + const tool = createLobsterTool(fakeApi()); + const res = await tool.execute("call1", { + action: "run", + pipeline: "noop", + lobsterPath: fake.binPath, + timeoutMs: 1000, + }); + + expect(res.details).toMatchObject({ ok: true, status: "ok" }); + }); + + it("requires absolute lobsterPath when provided", async () => { + const tool = createLobsterTool(fakeApi()); + await expect( + tool.execute("call2", { + action: "run", + pipeline: "noop", + lobsterPath: "./lobster", + }), + ).rejects.toThrow(/absolute path/); + }); + + it("rejects invalid JSON from lobster", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-lobster-plugin-bad-")); + const binPath = path.join(dir, "lobster"); + await fs.writeFile(binPath, `#!/usr/bin/env node\nprocess.stdout.write('nope');\n`, { + encoding: "utf8", + mode: 0o755, + }); + + const tool = createLobsterTool(fakeApi()); + await expect( + tool.execute("call3", { + action: "run", + pipeline: "noop", + lobsterPath: binPath, + }), + ).rejects.toThrow(/invalid JSON/); + }); + + it("can be gated off in sandboxed contexts", async () => { + const api = fakeApi(); + const factoryTool = (ctx: ClawdbotPluginToolContext) => { + if (ctx.sandboxed) return null; + return createLobsterTool(api); + }; + + expect(factoryTool(fakeCtx({ sandboxed: true }))).toBeNull(); + expect(factoryTool(fakeCtx({ sandboxed: false }))?.name).toBe("lobster"); + }); +}); diff --git a/extensions/lobster/src/lobster-tool.ts b/extensions/lobster/src/lobster-tool.ts new file mode 100644 index 000000000..2d76e0821 --- /dev/null +++ b/extensions/lobster/src/lobster-tool.ts @@ -0,0 +1,185 @@ +import { Type } from "@sinclair/typebox"; +import { spawn } from "node:child_process"; +import path from "node:path"; + +import type { ClawdbotPluginApi } from "../../../src/plugins/types.js"; + +type LobsterEnvelope = + | { + ok: true; + status: "ok" | "needs_approval" | "cancelled"; + output: unknown[]; + requiresApproval: null | { + type: "approval_request"; + prompt: string; + items: unknown[]; + resumeToken?: string; + }; + } + | { + ok: false; + error: { type?: string; message: string }; + }; + +function resolveExecutablePath(lobsterPathRaw: string | undefined) { + const lobsterPath = lobsterPathRaw?.trim() || "lobster"; + if (lobsterPath !== "lobster" && !path.isAbsolute(lobsterPath)) { + throw new Error("lobsterPath must be an absolute path (or omit to use PATH)"); + } + return lobsterPath; +} + +async function runLobsterSubprocess(params: { + execPath: string; + argv: string[]; + cwd: string; + timeoutMs: number; + maxStdoutBytes: number; +}) { + const { execPath, argv, cwd } = params; + const timeoutMs = Math.max(200, params.timeoutMs); + const maxStdoutBytes = Math.max(1024, params.maxStdoutBytes); + + return await new Promise<{ stdout: string }>((resolve, reject) => { + const child = spawn(execPath, argv, { + cwd, + stdio: ["ignore", "pipe", "pipe"], + env: { + ...process.env, + LOBSTER_MODE: "tool", + }, + }); + + let stdout = ""; + let stdoutBytes = 0; + let stderr = ""; + + child.stdout?.setEncoding("utf8"); + child.stderr?.setEncoding("utf8"); + + child.stdout?.on("data", (chunk) => { + const str = String(chunk); + stdoutBytes += Buffer.byteLength(str, "utf8"); + if (stdoutBytes > maxStdoutBytes) { + try { + child.kill("SIGKILL"); + } finally { + reject(new Error("lobster output exceeded maxStdoutBytes")); + } + return; + } + stdout += str; + }); + + child.stderr?.on("data", (chunk) => { + stderr += String(chunk); + }); + + const timer = setTimeout(() => { + try { + child.kill("SIGKILL"); + } finally { + reject(new Error("lobster subprocess timed out")); + } + }, timeoutMs); + + child.once("error", (err) => { + clearTimeout(timer); + reject(err); + }); + + child.once("exit", (code) => { + clearTimeout(timer); + if (code !== 0) { + reject(new Error(`lobster failed (${code ?? "?"}): ${stderr.trim() || stdout.trim()}`)); + return; + } + resolve({ stdout }); + }); + }); +} + +function parseEnvelope(stdout: string): LobsterEnvelope { + let parsed: unknown; + try { + parsed = JSON.parse(stdout); + } catch { + throw new Error("lobster returned invalid JSON"); + } + + if (!parsed || typeof parsed !== "object") { + throw new Error("lobster returned invalid JSON envelope"); + } + + const ok = (parsed as { ok?: unknown }).ok; + if (ok === true || ok === false) { + return parsed as LobsterEnvelope; + } + + throw new Error("lobster returned invalid JSON envelope"); +} + +export function createLobsterTool(api: ClawdbotPluginApi) { + return { + name: "lobster", + description: + "Run Lobster pipelines as a local-first workflow runtime (typed JSON envelope + resumable approvals).", + parameters: Type.Object({ + // NOTE: Prefer string enums in tool schemas; some providers reject unions/anyOf. + action: Type.Unsafe<"run" | "resume">({ type: "string", enum: ["run", "resume"] }), + pipeline: Type.Optional(Type.String()), + token: Type.Optional(Type.String()), + approve: Type.Optional(Type.Boolean()), + lobsterPath: Type.Optional(Type.String()), + cwd: Type.Optional(Type.String()), + timeoutMs: Type.Optional(Type.Number()), + maxStdoutBytes: Type.Optional(Type.Number()), + }), + async execute(_id: string, params: Record) { + const action = String(params.action || "").trim(); + if (!action) throw new Error("action required"); + + const execPath = resolveExecutablePath( + typeof params.lobsterPath === "string" ? params.lobsterPath : undefined, + ); + const cwd = typeof params.cwd === "string" && params.cwd.trim() ? params.cwd.trim() : process.cwd(); + const timeoutMs = typeof params.timeoutMs === "number" ? params.timeoutMs : 20_000; + const maxStdoutBytes = typeof params.maxStdoutBytes === "number" ? params.maxStdoutBytes : 512_000; + + const argv = (() => { + if (action === "run") { + const pipeline = typeof params.pipeline === "string" ? params.pipeline : ""; + if (!pipeline.trim()) throw new Error("pipeline required"); + return ["run", "--mode", "tool", pipeline]; + } + if (action === "resume") { + const token = typeof params.token === "string" ? params.token : ""; + if (!token.trim()) throw new Error("token required"); + const approve = params.approve; + if (typeof approve !== "boolean") throw new Error("approve required"); + return ["resume", "--token", token, "--approve", approve ? "yes" : "no"]; + } + throw new Error(`Unknown action: ${action}`); + })(); + + if (api.runtime?.version && api.logger?.debug) { + api.logger.debug(`lobster plugin runtime=${api.runtime.version}`); + } + + const { stdout } = await runLobsterSubprocess({ + execPath, + argv, + cwd, + timeoutMs, + maxStdoutBytes, + }); + + const envelope = parseEnvelope(stdout); + + return { + content: [{ type: "text", text: JSON.stringify(envelope, null, 2) }], + details: envelope, + }; + }, + }; +}