feat: embed pi agent runtime

This commit is contained in:
Peter Steinberger
2025-12-17 11:29:04 +01:00
parent c5867b2876
commit fece42ce0a
42 changed files with 2076 additions and 4009 deletions

View File

@@ -8,17 +8,28 @@ import type { CliDeps } from "../cli/deps.js";
import type { ClawdisConfig } from "../config/config.js";
import type { CronJob } from "./types.js";
vi.mock("../auto-reply/command-reply.js", () => ({
runCommandReply: vi.fn(),
vi.mock("../agents/pi-embedded.js", () => ({
runEmbeddedPiAgent: vi.fn(),
}));
import { runCommandReply } from "../auto-reply/command-reply.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { runCronIsolatedAgentTurn } from "./isolated-agent.js";
async function makeSessionStorePath() {
const dir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdis-cron-sessions-"),
);
async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
const base = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-cron-"));
const previousHome = process.env.HOME;
process.env.HOME = base;
try {
return await fn(base);
} finally {
process.env.HOME = previousHome;
await fs.rm(base, { recursive: true, force: true });
}
}
async function writeSessionStore(home: string) {
const dir = path.join(home, ".clawdis", "sessions");
await fs.mkdir(dir, { recursive: true });
const storePath = path.join(dir, "sessions.json");
await fs.writeFile(
storePath,
@@ -34,26 +45,17 @@ async function makeSessionStorePath() {
null,
2,
),
"utf-8",
);
return {
storePath,
cleanup: async () => {
await fs.rm(dir, { recursive: true, force: true });
},
};
return storePath;
}
function makeCfg(storePath: string): ClawdisConfig {
function makeCfg(home: string, storePath: string): ClawdisConfig {
return {
inbound: {
reply: {
mode: "command",
command: ["echo", "ok"],
session: {
store: storePath,
mainKey: "main",
},
},
workspace: path.join(home, "clawd"),
agent: { provider: "anthropic", model: "claude-opus-4-5" },
session: { store: storePath, mainKey: "main" },
},
} as ClawdisConfig;
}
@@ -76,122 +78,138 @@ function makeJob(payload: CronJob["payload"]): CronJob {
describe("runCronIsolatedAgentTurn", () => {
beforeEach(() => {
vi.mocked(runCommandReply).mockReset();
vi.mocked(runEmbeddedPiAgent).mockReset();
});
it("uses last non-empty agent text as summary", async () => {
const sessions = await makeSessionStorePath();
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
};
vi.mocked(runCommandReply).mockResolvedValue({
payloads: [{ text: "first" }, { text: " " }, { text: " last " }],
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home);
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "first" }, { text: " " }, { text: " last " }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath),
deps,
job: makeJob({ kind: "agentTurn", message: "do it", deliver: false }),
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(res.summary).toBe("last");
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(sessions.storePath),
deps,
job: makeJob({ kind: "agentTurn", message: "do it", deliver: false }),
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(res.summary).toBe("last");
await sessions.cleanup();
});
it("truncates long summaries", async () => {
const sessions = await makeSessionStorePath();
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
};
const long = "a".repeat(2001);
vi.mocked(runCommandReply).mockResolvedValue({
payloads: [{ text: long }],
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home);
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
};
const long = "a".repeat(2001);
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: long }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath),
deps,
job: makeJob({ kind: "agentTurn", message: "do it", deliver: false }),
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(String(res.summary ?? "")).toMatch(/…$/);
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(sessions.storePath),
deps,
job: makeJob({ kind: "agentTurn", message: "do it", deliver: false }),
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(String(res.summary ?? "")).toMatch(/…$/);
await sessions.cleanup();
});
it("fails delivery without a WhatsApp recipient when bestEffortDeliver=false", async () => {
const sessions = await makeSessionStorePath();
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
};
vi.mocked(runCommandReply).mockResolvedValue({
payloads: [{ text: "hello" }],
});
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home);
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "hello" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(sessions.storePath),
deps,
job: makeJob({
kind: "agentTurn",
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath),
deps,
job: makeJob({
kind: "agentTurn",
message: "do it",
deliver: true,
channel: "whatsapp",
bestEffortDeliver: false,
}),
message: "do it",
deliver: true,
channel: "whatsapp",
bestEffortDeliver: false,
}),
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("error");
expect(res.summary).toBe("hello");
expect(String(res.error ?? "")).toMatch(/requires a recipient/i);
expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled();
});
expect(res.status).toBe("error");
expect(res.summary).toBe("hello");
expect(String(res.error ?? "")).toMatch(/requires a recipient/i);
expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled();
await sessions.cleanup();
});
it("skips delivery without a WhatsApp recipient when bestEffortDeliver=true", async () => {
const sessions = await makeSessionStorePath();
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
};
vi.mocked(runCommandReply).mockResolvedValue({
payloads: [{ text: "hello" }],
});
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home);
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "hello" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(sessions.storePath),
deps,
job: makeJob({
kind: "agentTurn",
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath),
deps,
job: makeJob({
kind: "agentTurn",
message: "do it",
deliver: true,
channel: "whatsapp",
bestEffortDeliver: true,
}),
message: "do it",
deliver: true,
channel: "whatsapp",
bestEffortDeliver: true,
}),
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("skipped");
expect(String(res.summary ?? "")).toMatch(/delivery skipped/i);
expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled();
});
expect(res.status).toBe("skipped");
expect(String(res.summary ?? "")).toMatch(/delivery skipped/i);
expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled();
await sessions.cleanup();
});
});

View File

@@ -1,22 +1,27 @@
import crypto from "node:crypto";
import { chunkText } from "../auto-reply/chunk.js";
import { runCommandReply } from "../auto-reply/command-reply.js";
import { lookupContextTokens } from "../agents/context.js";
import {
applyTemplate,
type TemplateContext,
} from "../auto-reply/templating.js";
DEFAULT_CONTEXT_TOKENS,
DEFAULT_MODEL,
DEFAULT_PROVIDER,
} from "../agents/defaults.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import {
DEFAULT_AGENT_WORKSPACE_DIR,
ensureAgentWorkspace,
} from "../agents/workspace.js";
import { chunkText } from "../auto-reply/chunk.js";
import { normalizeThinkLevel } from "../auto-reply/thinking.js";
import type { CliDeps } from "../cli/deps.js";
import type { ClawdisConfig } from "../config/config.js";
import {
DEFAULT_IDLE_MINUTES,
loadSessionStore,
resolveSessionTranscriptPath,
resolveStorePath,
type SessionEntry,
saveSessionStore,
} from "../config/sessions.js";
import { enqueueCommandInLane } from "../process/command-queue.js";
import { normalizeE164 } from "../utils.js";
import type { CronJob } from "./types.js";
@@ -26,21 +31,6 @@ export type RunCronAgentTurnResult = {
error?: string;
};
function assertCommandReplyConfig(cfg: ClawdisConfig) {
const reply = cfg.inbound?.reply;
if (!reply || reply.mode !== "command" || !reply.command?.length) {
throw new Error(
"Configure inbound.reply.mode=command with reply.command before using cron agent jobs.",
);
}
return reply as NonNullable<
NonNullable<ClawdisConfig["inbound"]>["reply"]
> & {
mode: "command";
command: string[];
};
}
function pickSummaryFromOutput(text: string | undefined) {
const clean = (text ?? "").trim();
if (!clean) return undefined;
@@ -72,7 +62,7 @@ function resolveDeliveryTarget(
? jobPayload.to.trim()
: undefined;
const sessionCfg = cfg.inbound?.reply?.session;
const sessionCfg = cfg.inbound?.session;
const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
const storePath = resolveStorePath(sessionCfg?.store);
const store = loadSessionStore(storePath);
@@ -120,7 +110,7 @@ function resolveCronSession(params: {
sessionKey: string;
nowMs: number;
}) {
const sessionCfg = params.cfg.inbound?.reply?.session;
const sessionCfg = params.cfg.inbound?.session;
const idleMinutes = Math.max(
sessionCfg?.idleMinutes ?? DEFAULT_IDLE_MINUTES,
1,
@@ -155,28 +145,28 @@ export async function runCronIsolatedAgentTurn(params: {
sessionKey: string;
lane?: string;
}): Promise<RunCronAgentTurnResult> {
const replyCfg = assertCommandReplyConfig(params.cfg);
const agentCfg = params.cfg.inbound?.agent;
void params.lane;
const workspaceDirRaw =
params.cfg.inbound?.workspace ?? DEFAULT_AGENT_WORKSPACE_DIR;
const workspace = await ensureAgentWorkspace({
dir: workspaceDirRaw,
ensureBootstrapFiles: true,
});
const workspaceDir = workspace.dir;
const provider = agentCfg?.provider?.trim() || DEFAULT_PROVIDER;
const model = agentCfg?.model?.trim() || DEFAULT_MODEL;
const now = Date.now();
const cronSession = resolveCronSession({
cfg: params.cfg,
sessionKey: params.sessionKey,
nowMs: now,
});
const sendSystemOnce = replyCfg.session?.sendSystemOnce === true;
const isFirstTurnInSession =
cronSession.isNewSession || !cronSession.systemSent;
const sessionIntro = replyCfg.session?.sessionIntro
? applyTemplate(replyCfg.session.sessionIntro, {
SessionId: cronSession.sessionEntry.sessionId,
})
: "";
const bodyPrefix = replyCfg.bodyPrefix
? applyTemplate(replyCfg.bodyPrefix, {
SessionId: cronSession.sessionEntry.sessionId,
})
: "";
const thinkOverride = normalizeThinkLevel(replyCfg.thinkingDefault);
const thinkOverride = normalizeThinkLevel(agentCfg?.thinkingDefault);
const jobThink = normalizeThinkLevel(
(params.job.payload.kind === "agentTurn"
? params.job.payload.thinking
@@ -187,7 +177,7 @@ export async function runCronIsolatedAgentTurn(params: {
const timeoutSecondsRaw =
params.job.payload.kind === "agentTurn" && params.job.payload.timeoutSeconds
? params.job.payload.timeoutSeconds
: (replyCfg.timeoutSeconds ?? 600);
: (agentCfg?.timeoutSeconds ?? 600);
const timeoutSeconds = Math.max(Math.floor(timeoutSecondsRaw), 1);
const timeoutMs = timeoutSeconds * 1000;
@@ -212,26 +202,10 @@ export async function runCronIsolatedAgentTurn(params: {
const base =
`[cron:${params.job.id}${params.job.name ? ` ${params.job.name}` : ""}] ${params.message}`.trim();
let commandBody = base;
if (!sendSystemOnce || isFirstTurnInSession) {
commandBody = bodyPrefix ? `${bodyPrefix}${commandBody}` : commandBody;
}
if (sessionIntro) {
commandBody = `${sessionIntro}\n\n${commandBody}`;
}
const templatingCtx: TemplateContext = {
Body: commandBody,
BodyStripped: commandBody,
SessionId: cronSession.sessionEntry.sessionId,
From: resolvedDelivery.to ?? "",
To: resolvedDelivery.to ?? "",
Surface: "Cron",
IsNewSession: cronSession.isNewSession ? "true" : "false",
};
const commandBody = base;
// Persist systemSent before the run, mirroring the inbound auto-reply behavior.
if (sendSystemOnce && isFirstTurnInSession) {
if (isFirstTurnInSession) {
cronSession.sessionEntry.systemSent = true;
cronSession.store[params.sessionKey] = cronSession.sessionEntry;
await saveSessionStore(cronSession.storePath, cronSession.store);
@@ -240,21 +214,23 @@ export async function runCronIsolatedAgentTurn(params: {
await saveSessionStore(cronSession.storePath, cronSession.store);
}
const lane = params.lane?.trim() || "cron";
let runResult: Awaited<ReturnType<typeof runCommandReply>>;
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
try {
runResult = await runCommandReply({
reply: { ...replyCfg, mode: "command" },
templatingCtx,
sendSystemOnce,
isNewSession: cronSession.isNewSession,
isFirstTurnInSession,
systemSent: cronSession.sessionEntry.systemSent ?? false,
timeoutMs,
timeoutSeconds,
const sessionFile = resolveSessionTranscriptPath(
cronSession.sessionEntry.sessionId,
);
runResult = await runEmbeddedPiAgent({
sessionId: cronSession.sessionEntry.sessionId,
sessionFile,
workspaceDir,
prompt: commandBody,
provider,
model,
thinkLevel,
enqueue: (task, opts) => enqueueCommandInLane(lane, task, opts),
verboseLevel:
(cronSession.sessionEntry.verboseLevel as "on" | "off" | undefined) ??
(agentCfg?.verboseDefault as "on" | "off" | undefined),
timeoutMs,
runId: cronSession.sessionEntry.sessionId,
});
} catch (err) {
@@ -262,6 +238,31 @@ export async function runCronIsolatedAgentTurn(params: {
}
const payloads = runResult.payloads ?? [];
// Update token+model fields in the session store.
{
const usage = runResult.meta.agentMeta?.usage;
const modelUsed = runResult.meta.agentMeta?.model ?? model;
const contextTokens =
agentCfg?.contextTokens ??
lookupContextTokens(modelUsed) ??
DEFAULT_CONTEXT_TOKENS;
cronSession.sessionEntry.model = modelUsed;
cronSession.sessionEntry.contextTokens = contextTokens;
if (usage) {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens =
input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
cronSession.sessionEntry.inputTokens = input;
cronSession.sessionEntry.outputTokens = output;
cronSession.sessionEntry.totalTokens =
promptTokens > 0 ? promptTokens : (usage.total ?? input);
}
cronSession.store[params.sessionKey] = cronSession.sessionEntry;
await saveSessionStore(cronSession.storePath, cronSession.store);
}
const firstText = payloads[0]?.text ?? "";
const summary =
pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText);