chore: make pi-only rpc with fixed sessions

This commit is contained in:
Peter Steinberger
2025-12-05 17:50:02 +00:00
parent b3e50cbb33
commit fcf0c28132
33 changed files with 217 additions and 1565 deletions

View File

@@ -1,4 +1,5 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { type AgentKind, getAgentSpec } from "../agents/index.js";
@@ -203,75 +204,6 @@ function normalizeToolResults(
.filter((tr) => tr.text.length > 0);
}
export function summarizeClaudeMetadata(payload: unknown): string | undefined {
if (!payload || typeof payload !== "object") return undefined;
const obj = payload as Record<string, unknown>;
const parts: string[] = [];
if (typeof obj.duration_ms === "number") {
parts.push(`duration=${obj.duration_ms}ms`);
}
if (typeof obj.duration_api_ms === "number") {
parts.push(`api=${obj.duration_api_ms}ms`);
}
if (typeof obj.num_turns === "number") {
parts.push(`turns=${obj.num_turns}`);
}
if (typeof obj.total_cost_usd === "number") {
parts.push(`cost=$${obj.total_cost_usd.toFixed(4)}`);
}
const usage = obj.usage;
if (usage && typeof usage === "object") {
const serverToolUse = (
usage as { server_tool_use?: Record<string, unknown> }
).server_tool_use;
if (serverToolUse && typeof serverToolUse === "object") {
const toolCalls = Object.values(serverToolUse).reduce<number>(
(sum, val) => {
if (typeof val === "number") return sum + val;
return sum;
},
0,
);
if (toolCalls > 0) parts.push(`tool_calls=${toolCalls}`);
}
}
const modelUsage = obj.modelUsage;
if (modelUsage && typeof modelUsage === "object") {
const models = Object.keys(modelUsage as Record<string, unknown>);
if (models.length) {
const display =
models.length > 2
? `${models.slice(0, 2).join(",")}+${models.length - 2}`
: models.join(",");
parts.push(`models=${display}`);
}
}
return parts.length ? parts.join(", ") : undefined;
}
function appendThinkingCue(body: string, level?: ThinkLevel): string {
if (!level || level === "off") return body;
const cue = (() => {
switch (level) {
case "high":
return "ultrathink";
case "medium":
return "think harder";
case "low":
return "think hard";
case "minimal":
return "think";
default:
return "";
}
})();
return [body.trim(), cue].filter(Boolean).join(" ");
}
export async function runCommandReply(
params: CommandReplyParams,
): Promise<CommandReplyResult> {
@@ -300,11 +232,11 @@ export async function runCommandReply(
if (!reply.command?.length) {
throw new Error("reply.command is required for mode=command");
}
const agentCfg = reply.agent ?? { kind: "claude" };
const agentKind: AgentKind = agentCfg.kind ?? "claude";
const agentCfg = reply.agent ?? { kind: "pi" };
const agentKind: AgentKind = agentCfg.kind ?? "pi";
const agent = getAgentSpec(agentKind);
let argv = reply.command.map((part) => applyTemplate(part, templatingCtx));
const isAgentInvocation = agent.isInvocation(argv);
const templatePrefix =
reply.template && (!sendSystemOnce || isFirstTurnInSession || !systemSent)
? applyTemplate(reply.template, templatingCtx)
@@ -318,23 +250,12 @@ export async function runCommandReply(
// Session args prepared (templated) and injected generically
if (reply.session) {
const defaultSessionArgs = (() => {
switch (agentCfg.kind) {
case "claude":
return {
newArgs: ["--session-id", "{{SessionId}}"],
resumeArgs: ["--resume", "{{SessionId}}"],
};
case "gemini":
// Gemini CLI supports --resume <id>; starting a new session needs no flag.
return { newArgs: [], resumeArgs: ["--resume", "{{SessionId}}"] };
default:
return {
newArgs: ["--session", "{{SessionId}}"],
resumeArgs: ["--session", "{{SessionId}}"],
};
}
})();
const defaultSessionDir = path.join(os.homedir(), ".clawdis", "sessions");
const sessionPath = path.join(defaultSessionDir, "{{SessionId}}.jsonl");
const defaultSessionArgs = {
newArgs: ["--session", sessionPath],
resumeArgs: ["--session", sessionPath],
};
const defaultNew = defaultSessionArgs.newArgs;
const defaultResume = defaultSessionArgs.resumeArgs;
const sessionArgList = (
@@ -343,10 +264,24 @@ export async function runCommandReply(
: (reply.session.sessionArgResume ?? defaultResume)
).map((p) => applyTemplate(p, templatingCtx));
// If we are writing session files, ensure the directory exists.
const sessionFlagIndex = sessionArgList.indexOf("--session");
const sessionPathArg =
sessionFlagIndex >= 0 ? sessionArgList[sessionFlagIndex + 1] : undefined;
if (sessionPathArg && !sessionPathArg.includes("://")) {
const dir = path.dirname(sessionPathArg);
try {
await fs.mkdir(dir, { recursive: true });
} catch {
// best-effort
}
}
// Tau (pi agent) needs --continue to reload prior messages when resuming.
// Without it, pi starts from a blank state even though we pass the session file path.
if (
agentKind === "pi" &&
isAgentInvocation &&
!isNewSession &&
!sessionArgList.includes("--continue")
) {
@@ -366,25 +301,21 @@ export async function runCommandReply(
}
}
if (thinkLevel && thinkLevel !== "off") {
if (agentKind === "pi") {
const hasThinkingFlag = argv.some(
(p, i) =>
p === "--thinking" ||
(i > 0 && argv[i - 1] === "--thinking") ||
p.startsWith("--thinking="),
);
if (!hasThinkingFlag) {
argv.splice(bodyIndex, 0, "--thinking", thinkLevel);
bodyIndex += 2;
}
} else if (argv[bodyIndex]) {
argv[bodyIndex] = appendThinkingCue(argv[bodyIndex] ?? "", thinkLevel);
const shouldApplyAgent = isAgentInvocation;
if (shouldApplyAgent && thinkLevel && thinkLevel !== "off") {
const hasThinkingFlag = argv.some(
(p, i) =>
p === "--thinking" ||
(i > 0 && argv[i - 1] === "--thinking") ||
p.startsWith("--thinking="),
);
if (!hasThinkingFlag) {
argv.splice(bodyIndex, 0, "--thinking", thinkLevel);
bodyIndex += 2;
}
}
const shouldApplyAgent = agent.isInvocation(argv);
let finalArgv = shouldApplyAgent
const finalArgv = shouldApplyAgent
? agent.buildArgs({
argv,
bodyIndex,
@@ -397,22 +328,6 @@ export async function runCommandReply(
})
: argv;
// For pi/tau: prefer RPC mode so auto-compaction and streaming events run server-side.
let rpcInput: string | undefined;
if (agentKind === "pi") {
const bodyArg = finalArgv[bodyIndex] ?? templatingCtx.Body ?? "";
rpcInput = JSON.stringify({ type: "prompt", message: bodyArg }) + "\n";
// Remove body argument (RPC expects stdin JSON instead of positional message)
finalArgv = finalArgv.filter((_, idx) => idx !== bodyIndex);
// Force --mode rpc
const modeIdx = finalArgv.findIndex((v) => v === "--mode");
if (modeIdx >= 0 && finalArgv[modeIdx + 1]) {
finalArgv[modeIdx + 1] = "rpc";
} else {
finalArgv.push("--mode", "rpc");
}
}
logVerbose(
`Running command auto-reply: ${finalArgv.join(" ")}${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}`,
);
@@ -475,7 +390,7 @@ export async function runCommandReply(
const run = async () => {
// Prefer long-lived tau RPC for pi agent to avoid cold starts.
if (agentKind === "pi") {
if (agentKind === "pi" && shouldApplyAgent) {
const promptIndex = finalArgv.length - 1;
const body = finalArgv[promptIndex] ?? "";
// Build rpc args without the prompt body; force --mode rpc.
@@ -601,7 +516,6 @@ export async function runCommandReply(
return await commandRunner(finalArgv, {
timeoutMs,
cwd: reply.cwd,
input: rpcInput,
});
};
@@ -640,13 +554,16 @@ export async function runCommandReply(
);
};
const parsed = trimmed ? agent.parseOutput(trimmed) : undefined;
const parserProvided = !!parsed;
const parsed =
shouldApplyAgent && trimmed ? agent.parseOutput(trimmed) : undefined;
const _parserProvided = shouldApplyAgent && !!parsed;
// Collect assistant texts and tool results from parseOutput (tau RPC can emit many).
const parsedTexts =
parsed?.texts?.map((t) => t.trim()).filter(Boolean) ?? [];
const parsedToolResults = normalizeToolResults(parsed?.toolResults);
const hasParsedContent =
parsedTexts.length > 0 || parsedToolResults.length > 0;
type ReplyItem = { text: string; media?: string[] };
const replyItems: ReplyItem[] = [];
@@ -716,7 +633,7 @@ export async function runCommandReply(
}
// If parser gave nothing, fall back to raw stdout as a single message.
if (replyItems.length === 0 && trimmed && !parserProvided) {
if (replyItems.length === 0 && trimmed && !hasParsedContent) {
const { text: cleanedText, mediaUrls: mediaFound } =
splitMediaFromOutput(trimmed);
if (cleanedText || mediaFound?.length) {