452 lines
15 KiB
TypeScript
452 lines
15 KiB
TypeScript
import crypto from "node:crypto";
|
|
import fs from "node:fs/promises";
|
|
import os from "node:os";
|
|
import path from "node:path";
|
|
|
|
import type { AgentTool } from "@mariozechner/pi-agent-core";
|
|
import type { ImageContent } from "@mariozechner/pi-ai";
|
|
import type { ThinkLevel } from "../../auto-reply/thinking.js";
|
|
import type { ClawdbotConfig } from "../../config/config.js";
|
|
import type { CliBackendConfig } from "../../config/types.js";
|
|
import { runExec } from "../../process/exec.js";
|
|
import type { EmbeddedContextFile } from "../pi-embedded-helpers.js";
|
|
import { buildSystemPromptParams } from "../system-prompt-params.js";
|
|
import { resolveDefaultModelForAgent } from "../model-selection.js";
|
|
import { buildAgentSystemPrompt } from "../system-prompt.js";
|
|
import { buildTtsSystemPromptHint } from "../../tts/tts.js";
|
|
|
|
const CLI_RUN_QUEUE = new Map<string, Promise<unknown>>();
|
|
|
|
function escapeRegex(value: string): string {
|
|
return value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
|
|
}
|
|
|
|
export async function cleanupResumeProcesses(
|
|
backend: CliBackendConfig,
|
|
sessionId: string,
|
|
): Promise<void> {
|
|
if (process.platform === "win32") return;
|
|
const resumeArgs = backend.resumeArgs ?? [];
|
|
if (resumeArgs.length === 0) return;
|
|
if (!resumeArgs.some((arg) => arg.includes("{sessionId}"))) return;
|
|
const commandToken = path.basename(backend.command ?? "").trim();
|
|
if (!commandToken) return;
|
|
|
|
const resumeTokens = resumeArgs.map((arg) => arg.replaceAll("{sessionId}", sessionId));
|
|
const pattern = [commandToken, ...resumeTokens]
|
|
.filter(Boolean)
|
|
.map((token) => escapeRegex(token))
|
|
.join(".*");
|
|
if (!pattern) return;
|
|
|
|
try {
|
|
await runExec("pkill", ["-f", pattern]);
|
|
} catch {
|
|
// ignore missing pkill or no matches
|
|
}
|
|
}
|
|
|
|
function buildSessionMatchers(backend: CliBackendConfig): RegExp[] {
|
|
const commandToken = path.basename(backend.command ?? "").trim();
|
|
if (!commandToken) return [];
|
|
const matchers: RegExp[] = [];
|
|
const sessionArg = backend.sessionArg?.trim();
|
|
const sessionArgs = backend.sessionArgs ?? [];
|
|
const resumeArgs = backend.resumeArgs ?? [];
|
|
|
|
const addMatcher = (args: string[]) => {
|
|
if (args.length === 0) return;
|
|
const tokens = [commandToken, ...args];
|
|
const pattern = tokens
|
|
.map((token, index) => {
|
|
const tokenPattern = tokenToRegex(token);
|
|
return index === 0 ? `(?:^|\\s)${tokenPattern}` : `\\s+${tokenPattern}`;
|
|
})
|
|
.join("");
|
|
matchers.push(new RegExp(pattern));
|
|
};
|
|
|
|
if (sessionArgs.some((arg) => arg.includes("{sessionId}"))) {
|
|
addMatcher(sessionArgs);
|
|
} else if (sessionArg) {
|
|
addMatcher([sessionArg, "{sessionId}"]);
|
|
}
|
|
|
|
if (resumeArgs.some((arg) => arg.includes("{sessionId}"))) {
|
|
addMatcher(resumeArgs);
|
|
}
|
|
|
|
return matchers;
|
|
}
|
|
|
|
function tokenToRegex(token: string): string {
|
|
if (!token.includes("{sessionId}")) return escapeRegex(token);
|
|
const parts = token.split("{sessionId}").map((part) => escapeRegex(part));
|
|
return parts.join("\\S+");
|
|
}
|
|
|
|
/**
|
|
* Cleanup suspended Clawdbot CLI processes that have accumulated.
|
|
* Only cleans up if there are more than the threshold (default: 10).
|
|
*/
|
|
export async function cleanupSuspendedCliProcesses(
|
|
backend: CliBackendConfig,
|
|
threshold = 10,
|
|
): Promise<void> {
|
|
if (process.platform === "win32") return;
|
|
const matchers = buildSessionMatchers(backend);
|
|
if (matchers.length === 0) return;
|
|
|
|
try {
|
|
const { stdout } = await runExec("ps", ["-ax", "-o", "pid=,stat=,command="]);
|
|
const suspended: number[] = [];
|
|
for (const line of stdout.split("\n")) {
|
|
const trimmed = line.trim();
|
|
if (!trimmed) continue;
|
|
const match = /^(\d+)\s+(\S+)\s+(.*)$/.exec(trimmed);
|
|
if (!match) continue;
|
|
const pid = Number(match[1]);
|
|
const stat = match[2] ?? "";
|
|
const command = match[3] ?? "";
|
|
if (!Number.isFinite(pid)) continue;
|
|
if (!stat.includes("T")) continue;
|
|
if (!matchers.some((matcher) => matcher.test(command))) continue;
|
|
suspended.push(pid);
|
|
}
|
|
|
|
if (suspended.length > threshold) {
|
|
// Verified locally: stopped (T) processes ignore SIGTERM, so use SIGKILL.
|
|
await runExec("kill", ["-9", ...suspended.map((pid) => String(pid))]);
|
|
}
|
|
} catch {
|
|
// ignore errors - best effort cleanup
|
|
}
|
|
}
|
|
export function enqueueCliRun<T>(key: string, task: () => Promise<T>): Promise<T> {
|
|
const prior = CLI_RUN_QUEUE.get(key) ?? Promise.resolve();
|
|
const chained = prior.catch(() => undefined).then(task);
|
|
const tracked = chained.finally(() => {
|
|
if (CLI_RUN_QUEUE.get(key) === tracked) {
|
|
CLI_RUN_QUEUE.delete(key);
|
|
}
|
|
});
|
|
CLI_RUN_QUEUE.set(key, tracked);
|
|
return chained;
|
|
}
|
|
|
|
type CliUsage = {
|
|
input?: number;
|
|
output?: number;
|
|
cacheRead?: number;
|
|
cacheWrite?: number;
|
|
total?: number;
|
|
};
|
|
|
|
export type CliOutput = {
|
|
text: string;
|
|
sessionId?: string;
|
|
usage?: CliUsage;
|
|
};
|
|
|
|
function buildModelAliasLines(cfg?: ClawdbotConfig) {
|
|
const models = cfg?.agents?.defaults?.models ?? {};
|
|
const entries: Array<{ alias: string; model: string }> = [];
|
|
for (const [keyRaw, entryRaw] of Object.entries(models)) {
|
|
const model = String(keyRaw ?? "").trim();
|
|
if (!model) continue;
|
|
const alias = String((entryRaw as { alias?: string } | undefined)?.alias ?? "").trim();
|
|
if (!alias) continue;
|
|
entries.push({ alias, model });
|
|
}
|
|
return entries
|
|
.sort((a, b) => a.alias.localeCompare(b.alias))
|
|
.map((entry) => `- ${entry.alias}: ${entry.model}`);
|
|
}
|
|
|
|
export function buildSystemPrompt(params: {
|
|
workspaceDir: string;
|
|
config?: ClawdbotConfig;
|
|
defaultThinkLevel?: ThinkLevel;
|
|
extraSystemPrompt?: string;
|
|
ownerNumbers?: string[];
|
|
heartbeatPrompt?: string;
|
|
docsPath?: string;
|
|
tools: AgentTool[];
|
|
contextFiles?: EmbeddedContextFile[];
|
|
modelDisplay: string;
|
|
agentId?: string;
|
|
}) {
|
|
const defaultModelRef = resolveDefaultModelForAgent({
|
|
cfg: params.config ?? {},
|
|
agentId: params.agentId,
|
|
});
|
|
const defaultModelLabel = `${defaultModelRef.provider}/${defaultModelRef.model}`;
|
|
const { runtimeInfo, userTimezone, userTime, userTimeFormat } = buildSystemPromptParams({
|
|
config: params.config,
|
|
agentId: params.agentId,
|
|
workspaceDir: params.workspaceDir,
|
|
cwd: process.cwd(),
|
|
runtime: {
|
|
host: "clawdbot",
|
|
os: `${os.type()} ${os.release()}`,
|
|
arch: os.arch(),
|
|
node: process.version,
|
|
model: params.modelDisplay,
|
|
defaultModel: defaultModelLabel,
|
|
},
|
|
});
|
|
const ttsHint = params.config ? buildTtsSystemPromptHint(params.config) : undefined;
|
|
return buildAgentSystemPrompt({
|
|
workspaceDir: params.workspaceDir,
|
|
defaultThinkLevel: params.defaultThinkLevel,
|
|
extraSystemPrompt: params.extraSystemPrompt,
|
|
ownerNumbers: params.ownerNumbers,
|
|
reasoningTagHint: false,
|
|
heartbeatPrompt: params.heartbeatPrompt,
|
|
docsPath: params.docsPath,
|
|
runtimeInfo,
|
|
toolNames: params.tools.map((tool) => tool.name),
|
|
modelAliasLines: buildModelAliasLines(params.config),
|
|
userTimezone,
|
|
userTime,
|
|
userTimeFormat,
|
|
contextFiles: params.contextFiles,
|
|
ttsHint,
|
|
});
|
|
}
|
|
|
|
export function normalizeCliModel(modelId: string, backend: CliBackendConfig): string {
|
|
const trimmed = modelId.trim();
|
|
if (!trimmed) return trimmed;
|
|
const direct = backend.modelAliases?.[trimmed];
|
|
if (direct) return direct;
|
|
const lower = trimmed.toLowerCase();
|
|
const mapped = backend.modelAliases?.[lower];
|
|
if (mapped) return mapped;
|
|
return trimmed;
|
|
}
|
|
|
|
function toUsage(raw: Record<string, unknown>): CliUsage | undefined {
|
|
const pick = (key: string) =>
|
|
typeof raw[key] === "number" && raw[key] > 0 ? (raw[key] as number) : undefined;
|
|
const input = pick("input_tokens") ?? pick("inputTokens");
|
|
const output = pick("output_tokens") ?? pick("outputTokens");
|
|
const cacheRead =
|
|
pick("cache_read_input_tokens") ?? pick("cached_input_tokens") ?? pick("cacheRead");
|
|
const cacheWrite = pick("cache_write_input_tokens") ?? pick("cacheWrite");
|
|
const total = pick("total_tokens") ?? pick("total");
|
|
if (!input && !output && !cacheRead && !cacheWrite && !total) return undefined;
|
|
return { input, output, cacheRead, cacheWrite, total };
|
|
}
|
|
|
|
function isRecord(value: unknown): value is Record<string, unknown> {
|
|
return Boolean(value && typeof value === "object" && !Array.isArray(value));
|
|
}
|
|
|
|
function collectText(value: unknown): string {
|
|
if (!value) return "";
|
|
if (typeof value === "string") return value;
|
|
if (Array.isArray(value)) return value.map((entry) => collectText(entry)).join("");
|
|
if (!isRecord(value)) return "";
|
|
if (typeof value.text === "string") return value.text;
|
|
if (typeof value.content === "string") return value.content;
|
|
if (Array.isArray(value.content))
|
|
return value.content.map((entry) => collectText(entry)).join("");
|
|
if (isRecord(value.message)) return collectText(value.message);
|
|
return "";
|
|
}
|
|
|
|
function pickSessionId(
|
|
parsed: Record<string, unknown>,
|
|
backend: CliBackendConfig,
|
|
): string | undefined {
|
|
const fields = backend.sessionIdFields ?? [
|
|
"session_id",
|
|
"sessionId",
|
|
"conversation_id",
|
|
"conversationId",
|
|
];
|
|
for (const field of fields) {
|
|
const value = parsed[field];
|
|
if (typeof value === "string" && value.trim()) return value.trim();
|
|
}
|
|
return undefined;
|
|
}
|
|
|
|
export function parseCliJson(raw: string, backend: CliBackendConfig): CliOutput | null {
|
|
const trimmed = raw.trim();
|
|
if (!trimmed) return null;
|
|
let parsed: unknown;
|
|
try {
|
|
parsed = JSON.parse(trimmed);
|
|
} catch {
|
|
return null;
|
|
}
|
|
if (!isRecord(parsed)) return null;
|
|
const sessionId = pickSessionId(parsed, backend);
|
|
const usage = isRecord(parsed.usage) ? toUsage(parsed.usage) : undefined;
|
|
const text =
|
|
collectText(parsed.message) ||
|
|
collectText(parsed.content) ||
|
|
collectText(parsed.result) ||
|
|
collectText(parsed);
|
|
return { text: text.trim(), sessionId, usage };
|
|
}
|
|
|
|
export function parseCliJsonl(raw: string, backend: CliBackendConfig): CliOutput | null {
|
|
const lines = raw
|
|
.split(/\r?\n/g)
|
|
.map((line) => line.trim())
|
|
.filter(Boolean);
|
|
if (lines.length === 0) return null;
|
|
let sessionId: string | undefined;
|
|
let usage: CliUsage | undefined;
|
|
const texts: string[] = [];
|
|
for (const line of lines) {
|
|
let parsed: unknown;
|
|
try {
|
|
parsed = JSON.parse(line);
|
|
} catch {
|
|
continue;
|
|
}
|
|
if (!isRecord(parsed)) continue;
|
|
if (!sessionId) sessionId = pickSessionId(parsed, backend);
|
|
if (!sessionId && typeof parsed.thread_id === "string") {
|
|
sessionId = parsed.thread_id.trim();
|
|
}
|
|
if (isRecord(parsed.usage)) {
|
|
usage = toUsage(parsed.usage) ?? usage;
|
|
}
|
|
const item = isRecord(parsed.item) ? parsed.item : null;
|
|
if (item && typeof item.text === "string") {
|
|
const type = typeof item.type === "string" ? item.type.toLowerCase() : "";
|
|
if (!type || type.includes("message")) {
|
|
texts.push(item.text);
|
|
}
|
|
}
|
|
}
|
|
const text = texts.join("\n").trim();
|
|
if (!text) return null;
|
|
return { text, sessionId, usage };
|
|
}
|
|
|
|
export function resolveSystemPromptUsage(params: {
|
|
backend: CliBackendConfig;
|
|
isNewSession: boolean;
|
|
systemPrompt?: string;
|
|
}): string | null {
|
|
const systemPrompt = params.systemPrompt?.trim();
|
|
if (!systemPrompt) return null;
|
|
const when = params.backend.systemPromptWhen ?? "first";
|
|
if (when === "never") return null;
|
|
if (when === "first" && !params.isNewSession) return null;
|
|
if (!params.backend.systemPromptArg?.trim()) return null;
|
|
return systemPrompt;
|
|
}
|
|
|
|
export function resolveSessionIdToSend(params: {
|
|
backend: CliBackendConfig;
|
|
cliSessionId?: string;
|
|
}): { sessionId?: string; isNew: boolean } {
|
|
const mode = params.backend.sessionMode ?? "always";
|
|
const existing = params.cliSessionId?.trim();
|
|
if (mode === "none") return { sessionId: undefined, isNew: !existing };
|
|
if (mode === "existing") return { sessionId: existing, isNew: !existing };
|
|
if (existing) return { sessionId: existing, isNew: false };
|
|
return { sessionId: crypto.randomUUID(), isNew: true };
|
|
}
|
|
|
|
export function resolvePromptInput(params: { backend: CliBackendConfig; prompt: string }): {
|
|
argsPrompt?: string;
|
|
stdin?: string;
|
|
} {
|
|
const inputMode = params.backend.input ?? "arg";
|
|
if (inputMode === "stdin") {
|
|
return { stdin: params.prompt };
|
|
}
|
|
if (params.backend.maxPromptArgChars && params.prompt.length > params.backend.maxPromptArgChars) {
|
|
return { stdin: params.prompt };
|
|
}
|
|
return { argsPrompt: params.prompt };
|
|
}
|
|
|
|
function resolveImageExtension(mimeType: string): string {
|
|
const normalized = mimeType.toLowerCase();
|
|
if (normalized.includes("png")) return "png";
|
|
if (normalized.includes("jpeg") || normalized.includes("jpg")) return "jpg";
|
|
if (normalized.includes("gif")) return "gif";
|
|
if (normalized.includes("webp")) return "webp";
|
|
return "bin";
|
|
}
|
|
|
|
export function appendImagePathsToPrompt(prompt: string, paths: string[]): string {
|
|
if (!paths.length) return prompt;
|
|
const trimmed = prompt.trimEnd();
|
|
const separator = trimmed ? "\n\n" : "";
|
|
return `${trimmed}${separator}${paths.join("\n")}`;
|
|
}
|
|
|
|
export async function writeCliImages(
|
|
images: ImageContent[],
|
|
): Promise<{ paths: string[]; cleanup: () => Promise<void> }> {
|
|
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-cli-images-"));
|
|
const paths: string[] = [];
|
|
for (let i = 0; i < images.length; i += 1) {
|
|
const image = images[i];
|
|
const ext = resolveImageExtension(image.mimeType);
|
|
const filePath = path.join(tempDir, `image-${i + 1}.${ext}`);
|
|
const buffer = Buffer.from(image.data, "base64");
|
|
await fs.writeFile(filePath, buffer, { mode: 0o600 });
|
|
paths.push(filePath);
|
|
}
|
|
const cleanup = async () => {
|
|
await fs.rm(tempDir, { recursive: true, force: true });
|
|
};
|
|
return { paths, cleanup };
|
|
}
|
|
|
|
export function buildCliArgs(params: {
|
|
backend: CliBackendConfig;
|
|
baseArgs: string[];
|
|
modelId: string;
|
|
sessionId?: string;
|
|
systemPrompt?: string | null;
|
|
imagePaths?: string[];
|
|
promptArg?: string;
|
|
useResume: boolean;
|
|
}): string[] {
|
|
const args: string[] = [...params.baseArgs];
|
|
if (!params.useResume && params.backend.modelArg && params.modelId) {
|
|
args.push(params.backend.modelArg, params.modelId);
|
|
}
|
|
if (!params.useResume && params.systemPrompt && params.backend.systemPromptArg) {
|
|
args.push(params.backend.systemPromptArg, params.systemPrompt);
|
|
}
|
|
if (!params.useResume && params.sessionId) {
|
|
if (params.backend.sessionArgs && params.backend.sessionArgs.length > 0) {
|
|
for (const entry of params.backend.sessionArgs) {
|
|
args.push(entry.replaceAll("{sessionId}", params.sessionId));
|
|
}
|
|
} else if (params.backend.sessionArg) {
|
|
args.push(params.backend.sessionArg, params.sessionId);
|
|
}
|
|
}
|
|
if (params.imagePaths && params.imagePaths.length > 0) {
|
|
const mode = params.backend.imageMode ?? "repeat";
|
|
const imageArg = params.backend.imageArg;
|
|
if (imageArg) {
|
|
if (mode === "list") {
|
|
args.push(imageArg, params.imagePaths.join(","));
|
|
} else {
|
|
for (const imagePath of params.imagePaths) {
|
|
args.push(imageArg, imagePath);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if (params.promptArg !== undefined) {
|
|
args.push(params.promptArg);
|
|
}
|
|
return args;
|
|
}
|