From 9717f2d3740ef2f788e72e30f648c20ad5187d9a Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 22 Dec 2025 20:45:22 +0000 Subject: [PATCH] fix: bump pi deps and fix lint --- package.json | 6 +- pnpm-lock.yaml | 46 +- src/agents/pi-embedded-helpers.ts | 106 +++++ src/agents/pi-embedded-runner.ts | 413 ++++++++++++++++ src/agents/pi-embedded-subscribe.ts | 215 +++++++++ src/agents/pi-embedded.ts | 700 +--------------------------- src/auto-reply/reply.ts | 4 +- src/auto-reply/status.ts | 7 +- src/gateway/protocol/index.ts | 4 +- src/gateway/protocol/schema.ts | 6 +- src/gateway/server.test.ts | 35 +- src/gateway/server.ts | 11 +- src/web/auto-reply.test.ts | 2 +- src/web/auto-reply.ts | 14 +- 14 files changed, 812 insertions(+), 757 deletions(-) create mode 100644 src/agents/pi-embedded-helpers.ts create mode 100644 src/agents/pi-embedded-runner.ts create mode 100644 src/agents/pi-embedded-subscribe.ts diff --git a/package.json b/package.json index 759032407..205707144 100644 --- a/package.json +++ b/package.json @@ -67,9 +67,9 @@ "dependencies": { "@grammyjs/transformer-throttler": "^1.2.1", "@homebridge/ciao": "^1.3.4", - "@mariozechner/pi-agent-core": "^0.26.0", - "@mariozechner/pi-ai": "^0.26.0", - "@mariozechner/pi-coding-agent": "^0.26.0", + "@mariozechner/pi-agent-core": "^0.27.1", + "@mariozechner/pi-ai": "^0.27.1", + "@mariozechner/pi-coding-agent": "^0.27.1", "@sinclair/typebox": "^0.34.41", "@whiskeysockets/baileys": "7.0.0-rc.9", "ajv": "^8.17.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6ca49372d..7e8aa3f59 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -15,14 +15,14 @@ importers: specifier: ^1.3.4 version: 1.3.4 '@mariozechner/pi-agent-core': - specifier: ^0.26.0 - version: 0.26.1(ws@8.18.3)(zod@4.2.1) + specifier: ^0.27.1 + version: 0.27.1(ws@8.18.3)(zod@4.2.1) '@mariozechner/pi-ai': - specifier: ^0.26.0 - version: 0.26.1(ws@8.18.3)(zod@4.2.1) + specifier: ^0.27.1 + version: 0.27.1(ws@8.18.3)(zod@4.2.1) '@mariozechner/pi-coding-agent': - specifier: ^0.26.0 - version: 0.26.1(ws@8.18.3)(zod@4.2.1) + specifier: ^0.27.1 + version: 0.27.1(ws@8.18.3)(zod@4.2.1) '@sinclair/typebox': specifier: ^0.34.41 version: 0.34.41 @@ -659,21 +659,21 @@ packages: peerDependencies: lit: ^3.3.1 - '@mariozechner/pi-agent-core@0.26.1': - resolution: {integrity: sha512-yH15oPK9l8F2vGrz2mXl0dRydKkw0x4p1WChVuQALqDaFOf48V2XbLS7SvTE3qx095ylNp/Q+RQ+NiB5I2myFA==} + '@mariozechner/pi-agent-core@0.27.1': + resolution: {integrity: sha512-EctMh4kIVc4HqsX6vbqkLqTey6/TmyxFg/vi2GQe03dkHYJkufiSOWp3Kei5emMGNjaNL0kPeJ3H0nwfobsWCg==} engines: {node: '>=20.0.0'} - '@mariozechner/pi-ai@0.26.1': - resolution: {integrity: sha512-VEH9kwQoo0N1KtBQnAHDZaIwe0nLwikGytNvjCV3RltQirywwUUsw0xQ/2YUXaN3vl3nqDO/VY1qgdSnVZE5iA==} + '@mariozechner/pi-ai@0.27.1': + resolution: {integrity: sha512-Y9eMs1vdHCIzpVZdg4MBMWlz9LKsyXlGDZVShPoSXgYU8Kg3/3DbkBVwBsEA7mTedKS6FDueWv9TYsFPydWMww==} engines: {node: '>=20.0.0'} - '@mariozechner/pi-coding-agent@0.26.1': - resolution: {integrity: sha512-o1WOhzwPQTiUBNxlANDXJ9bTOIIpxxkwRh9+nnz9F28uEzkSfTrJLTgJoWxuRAU7Xvj5//pkKYaUPfhCd69R9g==} + '@mariozechner/pi-coding-agent@0.27.1': + resolution: {integrity: sha512-dbuXgrz1b0O4GWiqyDuzurxH30PSwdb8gbHGb4NnHxTP8ECwsDbC2ykTQpg12iLBbzwB2Ze0Q+iuvnjoaonuAw==} engines: {node: '>=20.0.0'} hasBin: true - '@mariozechner/pi-tui@0.26.1': - resolution: {integrity: sha512-qGKS4SwxJw4pinttl3UvzylC1IuB31QpuoM3X36mz/GmLq52RNYnriK4si52GpeTrqNm8vXDpeevI0zhPQPjYw==} + '@mariozechner/pi-tui@0.27.1': + resolution: {integrity: sha512-rTVQ031UR/KKJ3/6f/pRLH7Fg/hOqVnXnVEY37J2txBWd6r9uZClAL0RThGlH2xHsB0+qgER2kgiS/Kwf9I76g==} engines: {node: '>=20.0.0'} '@mistralai/mistralai@1.10.0': @@ -2991,10 +2991,10 @@ snapshots: transitivePeerDependencies: - tailwindcss - '@mariozechner/pi-agent-core@0.26.1(ws@8.18.3)(zod@4.2.1)': + '@mariozechner/pi-agent-core@0.27.1(ws@8.18.3)(zod@4.2.1)': dependencies: - '@mariozechner/pi-ai': 0.26.1(ws@8.18.3)(zod@4.2.1) - '@mariozechner/pi-tui': 0.26.1 + '@mariozechner/pi-ai': 0.27.1(ws@8.18.3)(zod@4.2.1) + '@mariozechner/pi-tui': 0.27.1 transitivePeerDependencies: - '@modelcontextprotocol/sdk' - bufferutil @@ -3003,7 +3003,7 @@ snapshots: - ws - zod - '@mariozechner/pi-ai@0.26.1(ws@8.18.3)(zod@4.2.1)': + '@mariozechner/pi-ai@0.27.1(ws@8.18.3)(zod@4.2.1)': dependencies: '@anthropic-ai/sdk': 0.71.2(zod@4.2.1) '@google/genai': 1.34.0 @@ -3023,11 +3023,11 @@ snapshots: - ws - zod - '@mariozechner/pi-coding-agent@0.26.1(ws@8.18.3)(zod@4.2.1)': + '@mariozechner/pi-coding-agent@0.27.1(ws@8.18.3)(zod@4.2.1)': dependencies: - '@mariozechner/pi-agent-core': 0.26.1(ws@8.18.3)(zod@4.2.1) - '@mariozechner/pi-ai': 0.26.1(ws@8.18.3)(zod@4.2.1) - '@mariozechner/pi-tui': 0.26.1 + '@mariozechner/pi-agent-core': 0.27.1(ws@8.18.3)(zod@4.2.1) + '@mariozechner/pi-ai': 0.27.1(ws@8.18.3)(zod@4.2.1) + '@mariozechner/pi-tui': 0.27.1 chalk: 5.6.2 cli-highlight: 2.1.11 diff: 8.0.2 @@ -3042,7 +3042,7 @@ snapshots: - ws - zod - '@mariozechner/pi-tui@0.26.1': + '@mariozechner/pi-tui@0.27.1': dependencies: '@types/mime-types': 2.1.4 chalk: 5.6.2 diff --git a/src/agents/pi-embedded-helpers.ts b/src/agents/pi-embedded-helpers.ts new file mode 100644 index 000000000..0514102bc --- /dev/null +++ b/src/agents/pi-embedded-helpers.ts @@ -0,0 +1,106 @@ +import fs from "node:fs/promises"; +import path from "node:path"; + +import type { AppMessage } from "@mariozechner/pi-agent-core"; +import type { AgentToolResult, AssistantMessage } from "@mariozechner/pi-ai"; + +import { sanitizeContentBlocksImages } from "./pi-tools.js"; +import type { WorkspaceBootstrapFile } from "./workspace.js"; + +export type EmbeddedContextFile = { path: string; content: string }; + +export async function ensureSessionHeader(params: { + sessionFile: string; + sessionId: string; + cwd: string; +}) { + const file = params.sessionFile; + try { + await fs.stat(file); + return; + } catch { + // create + } + await fs.mkdir(path.dirname(file), { recursive: true }); + const entry = { + type: "session", + id: params.sessionId, + timestamp: new Date().toISOString(), + cwd: params.cwd, + }; + await fs.writeFile(file, `${JSON.stringify(entry)}\n`, "utf-8"); +} + +type ContentBlock = AgentToolResult["content"][number]; + +export async function sanitizeSessionMessagesImages( + messages: AppMessage[], + label: string, +): Promise { + // We sanitize historical session messages because Anthropic can reject a request + // if the transcript contains oversized base64 images (see MAX_IMAGE_DIMENSION_PX). + const out: AppMessage[] = []; + for (const msg of messages) { + if (!msg || typeof msg !== "object") { + out.push(msg); + continue; + } + + const role = (msg as { role?: unknown }).role; + if (role === "toolResult") { + const toolMsg = msg as Extract; + const content = Array.isArray(toolMsg.content) ? toolMsg.content : []; + const nextContent = (await sanitizeContentBlocksImages( + content as ContentBlock[], + label, + )) as unknown as typeof toolMsg.content; + out.push({ ...toolMsg, content: nextContent }); + continue; + } + + if (role === "user") { + const userMsg = msg as Extract; + const content = userMsg.content; + if (Array.isArray(content)) { + const nextContent = (await sanitizeContentBlocksImages( + content as unknown as ContentBlock[], + label, + )) as unknown as typeof userMsg.content; + out.push({ ...userMsg, content: nextContent }); + continue; + } + } + + out.push(msg); + } + return out; +} + +export function buildBootstrapContextFiles( + files: WorkspaceBootstrapFile[], +): EmbeddedContextFile[] { + return files.map((file) => ({ + path: file.name, + content: file.missing + ? `[MISSING] Expected at: ${file.path}` + : (file.content ?? ""), + })); +} + +export function formatAssistantErrorText( + msg: AssistantMessage, +): string | undefined { + if (msg.stopReason !== "error") return undefined; + const raw = (msg.errorMessage ?? "").trim(); + if (!raw) return "LLM request failed with an unknown error."; + + const invalidRequest = raw.match( + /"type":"invalid_request_error".*?"message":"([^"]+)"/, + ); + if (invalidRequest?.[1]) { + return `LLM request rejected: ${invalidRequest[1]}`; + } + + // Keep it short for WhatsApp. + return raw.length > 600 ? `${raw.slice(0, 600)}…` : raw; +} diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts new file mode 100644 index 000000000..ebae7b70a --- /dev/null +++ b/src/agents/pi-embedded-runner.ts @@ -0,0 +1,413 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import type { AppMessage, ThinkingLevel } from "@mariozechner/pi-agent-core"; +import type { Api, AssistantMessage, Model } from "@mariozechner/pi-ai"; +import { + buildSystemPrompt, + createAgentSession, + defaultGetApiKey, + findModelByProviderAndId, + SessionManager, + SettingsManager, + type Skill, +} from "@mariozechner/pi-coding-agent"; +import type { ThinkLevel, VerboseLevel } from "../auto-reply/thinking.js"; +import { formatToolAggregate } from "../auto-reply/tool-meta.js"; +import type { ClawdisConfig } from "../config/config.js"; +import { splitMediaFromOutput } from "../media/parse.js"; +import { enqueueCommand } from "../process/command-queue.js"; +import { resolveUserPath } from "../utils.js"; +import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "./defaults.js"; +import { + buildBootstrapContextFiles, + ensureSessionHeader, + formatAssistantErrorText, + sanitizeSessionMessagesImages, +} from "./pi-embedded-helpers.js"; +import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; +import { extractAssistantText } from "./pi-embedded-utils.js"; +import { createClawdisCodingTools } from "./pi-tools.js"; +import { + applySkillEnvOverrides, + applySkillEnvOverridesFromSnapshot, + buildWorkspaceSkillSnapshot, + loadWorkspaceSkillEntries, + type SkillEntry, + type SkillSnapshot, +} from "./skills.js"; +import { buildAgentSystemPromptAppend } from "./system-prompt.js"; +import { loadWorkspaceBootstrapFiles } from "./workspace.js"; + +export type EmbeddedPiAgentMeta = { + sessionId: string; + provider: string; + model: string; + usage?: { + input?: number; + output?: number; + cacheRead?: number; + cacheWrite?: number; + total?: number; + }; +}; + +export type EmbeddedPiRunMeta = { + durationMs: number; + agentMeta?: EmbeddedPiAgentMeta; + aborted?: boolean; +}; + +export type EmbeddedPiRunResult = { + payloads?: Array<{ + text?: string; + mediaUrl?: string; + mediaUrls?: string[]; + }>; + meta: EmbeddedPiRunMeta; +}; + +type EmbeddedPiQueueHandle = { + queueMessage: (text: string) => Promise; + isStreaming: () => boolean; +}; + +const ACTIVE_EMBEDDED_RUNS = new Map(); + +export function queueEmbeddedPiMessage( + sessionId: string, + text: string, +): boolean { + const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); + if (!handle) return false; + if (!handle.isStreaming()) return false; + void handle.queueMessage(text); + return true; +} + +function mapThinkingLevel(level?: ThinkLevel): ThinkingLevel { + // pi-agent-core supports "xhigh" too; Clawdis doesn't surface it for now. + if (!level) return "off"; + return level; +} + +function resolveModel( + provider: string, + modelId: string, + agentDir?: string, +): { model?: Model; error?: string } { + const model = findModelByProviderAndId( + provider, + modelId, + agentDir, + ) as Model | null; + if (!model) return { error: `Unknown model: ${provider}/${modelId}` }; + return { model }; +} + +const defaultApiKey = defaultGetApiKey(); + +async function getApiKeyForModel(model: Model): Promise { + if (model.provider === "anthropic") { + const oauthEnv = process.env.ANTHROPIC_OAUTH_TOKEN; + if (oauthEnv?.trim()) return oauthEnv.trim(); + } + const key = await defaultApiKey(model); + if (key) return key; + throw new Error(`No API key found for provider "${model.provider}"`); +} + +function resolvePromptSkills( + snapshot: SkillSnapshot, + entries: SkillEntry[], +): Skill[] { + if (snapshot.resolvedSkills?.length) { + return snapshot.resolvedSkills; + } + + const snapshotNames = snapshot.skills.map((entry) => entry.name); + if (snapshotNames.length === 0) return []; + + const entryByName = new Map( + entries.map((entry) => [entry.skill.name, entry.skill]), + ); + return snapshotNames + .map((name) => entryByName.get(name)) + .filter((skill): skill is Skill => Boolean(skill)); +} + +export async function runEmbeddedPiAgent(params: { + sessionId: string; + sessionFile: string; + workspaceDir: string; + config?: ClawdisConfig; + skillsSnapshot?: SkillSnapshot; + prompt: string; + provider?: string; + model?: string; + thinkLevel?: ThinkLevel; + verboseLevel?: VerboseLevel; + timeoutMs: number; + runId: string; + abortSignal?: AbortSignal; + shouldEmitToolResult?: () => boolean; + onPartialReply?: (payload: { + text?: string; + mediaUrls?: string[]; + }) => void | Promise; + onToolResult?: (payload: { + text?: string; + mediaUrls?: string[]; + }) => void | Promise; + onAgentEvent?: (evt: { + stream: string; + data: Record; + }) => void; + enqueue?: typeof enqueueCommand; +}): Promise { + const enqueue = params.enqueue ?? enqueueCommand; + return enqueue(async () => { + const started = Date.now(); + const resolvedWorkspace = resolveUserPath(params.workspaceDir); + const prevCwd = process.cwd(); + + const provider = + (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER; + const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL; + const agentDir = + process.env.PI_CODING_AGENT_DIR ?? + path.join(os.homedir(), ".pi", "agent"); + const { model, error } = resolveModel(provider, modelId, agentDir); + if (!model) { + throw new Error(error ?? `Unknown model: ${provider}/${modelId}`); + } + + const thinkingLevel = mapThinkingLevel(params.thinkLevel); + + await fs.mkdir(resolvedWorkspace, { recursive: true }); + await ensureSessionHeader({ + sessionFile: params.sessionFile, + sessionId: params.sessionId, + cwd: resolvedWorkspace, + }); + + let restoreSkillEnv: (() => void) | undefined; + process.chdir(resolvedWorkspace); + try { + const shouldLoadSkillEntries = + !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills; + const skillEntries = shouldLoadSkillEntries + ? loadWorkspaceSkillEntries(resolvedWorkspace) + : []; + const skillsSnapshot = + params.skillsSnapshot ?? + buildWorkspaceSkillSnapshot(resolvedWorkspace, { + config: params.config, + entries: skillEntries, + }); + restoreSkillEnv = params.skillsSnapshot + ? applySkillEnvOverridesFromSnapshot({ + snapshot: params.skillsSnapshot, + config: params.config, + }) + : applySkillEnvOverrides({ + skills: skillEntries ?? [], + config: params.config, + }); + + const bootstrapFiles = + await loadWorkspaceBootstrapFiles(resolvedWorkspace); + const contextFiles = buildBootstrapContextFiles(bootstrapFiles); + const promptSkills = resolvePromptSkills(skillsSnapshot, skillEntries); + const tools = createClawdisCodingTools(); + const systemPrompt = buildSystemPrompt({ + appendPrompt: buildAgentSystemPromptAppend({ + workspaceDir: resolvedWorkspace, + defaultThinkLevel: params.thinkLevel, + }), + contextFiles, + skills: promptSkills, + cwd: resolvedWorkspace, + tools, + }); + + const sessionManager = SessionManager.open(params.sessionFile, agentDir); + const settingsManager = SettingsManager.create( + resolvedWorkspace, + agentDir, + ); + + const { session } = await createAgentSession({ + cwd: resolvedWorkspace, + agentDir, + model, + thinkingLevel, + systemPrompt, + // TODO(steipete): Once pi-mono publishes file-magic MIME detection in `read` image payloads, + // remove `createClawdisCodingTools()` and use upstream `codingTools` again. + tools, + sessionManager, + settingsManager, + getApiKey: async (m) => { + return await getApiKeyForModel(m as Model); + }, + skills: promptSkills, + contextFiles, + }); + + const prior = await sanitizeSessionMessagesImages( + session.messages, + "session:history", + ); + if (prior.length > 0) { + session.agent.replaceMessages(prior); + } + const queueHandle: EmbeddedPiQueueHandle = { + queueMessage: async (text: string) => { + await session.queueMessage(text); + }, + isStreaming: () => session.isStreaming, + }; + ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle); + let aborted = Boolean(params.abortSignal?.aborted); + + const { + assistantTexts, + toolMetas, + unsubscribe, + flush: flushToolDebouncer, + } = subscribeEmbeddedPiSession({ + session, + runId: params.runId, + verboseLevel: params.verboseLevel, + shouldEmitToolResult: params.shouldEmitToolResult, + onToolResult: params.onToolResult, + onPartialReply: params.onPartialReply, + onAgentEvent: params.onAgentEvent, + }); + + const abortTimer = setTimeout( + () => { + aborted = true; + void session.abort(); + }, + Math.max(1, params.timeoutMs), + ); + + let messagesSnapshot: AppMessage[] = []; + let sessionIdUsed = session.sessionId; + const onAbort = () => { + aborted = true; + void session.abort(); + }; + if (params.abortSignal) { + if (params.abortSignal.aborted) { + onAbort(); + } else { + params.abortSignal.addEventListener("abort", onAbort, { once: true }); + } + } + let promptError: unknown | null = null; + try { + try { + await session.prompt(params.prompt); + } catch (err) { + promptError = err; + } finally { + messagesSnapshot = session.messages.slice(); + sessionIdUsed = session.sessionId; + } + } finally { + clearTimeout(abortTimer); + unsubscribe(); + flushToolDebouncer(); + if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) { + ACTIVE_EMBEDDED_RUNS.delete(params.sessionId); + } + session.dispose(); + params.abortSignal?.removeEventListener?.("abort", onAbort); + } + if (promptError && !aborted) { + throw promptError; + } + + const lastAssistant = messagesSnapshot + .slice() + .reverse() + .find((m) => (m as AppMessage)?.role === "assistant") as + | AssistantMessage + | undefined; + + const usage = lastAssistant?.usage; + const agentMeta: EmbeddedPiAgentMeta = { + sessionId: sessionIdUsed, + provider: lastAssistant?.provider ?? provider, + model: lastAssistant?.model ?? model.id, + usage: usage + ? { + input: usage.input, + output: usage.output, + cacheRead: usage.cacheRead, + cacheWrite: usage.cacheWrite, + total: usage.totalTokens, + } + : undefined, + }; + + const replyItems: Array<{ text: string; media?: string[] }> = []; + + const errorText = lastAssistant + ? formatAssistantErrorText(lastAssistant) + : undefined; + if (errorText) replyItems.push({ text: errorText }); + + const inlineToolResults = + params.verboseLevel === "on" && + !params.onPartialReply && + !params.onToolResult && + toolMetas.length > 0; + if (inlineToolResults) { + for (const { toolName, meta } of toolMetas) { + const agg = formatToolAggregate(toolName, meta ? [meta] : []); + const { text: cleanedText, mediaUrls } = splitMediaFromOutput(agg); + if (cleanedText) + replyItems.push({ text: cleanedText, media: mediaUrls }); + } + } + + for (const text of assistantTexts.length + ? assistantTexts + : lastAssistant + ? [extractAssistantText(lastAssistant)] + : []) { + const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); + if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) continue; + replyItems.push({ text: cleanedText, media: mediaUrls }); + } + + const payloads = replyItems + .map((item) => ({ + text: item.text?.trim() ? item.text.trim() : undefined, + mediaUrls: item.media?.length ? item.media : undefined, + mediaUrl: item.media?.[0], + })) + .filter( + (p) => + p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0), + ); + + return { + payloads: payloads.length ? payloads : undefined, + meta: { + durationMs: Date.now() - started, + agentMeta, + aborted, + }, + }; + } finally { + restoreSkillEnv?.(); + process.chdir(prevCwd); + } + }); +} diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts new file mode 100644 index 000000000..68b265259 --- /dev/null +++ b/src/agents/pi-embedded-subscribe.ts @@ -0,0 +1,215 @@ +import type { AgentEvent, AppMessage } from "@mariozechner/pi-agent-core"; +import type { AssistantMessage } from "@mariozechner/pi-ai"; +import type { AgentSession } from "@mariozechner/pi-coding-agent"; + +import { + createToolDebouncer, + formatToolAggregate, +} from "../auto-reply/tool-meta.js"; +import { emitAgentEvent } from "../infra/agent-events.js"; +import { splitMediaFromOutput } from "../media/parse.js"; +import { + extractAssistantText, + inferToolMetaFromArgs, +} from "./pi-embedded-utils.js"; + +export function subscribeEmbeddedPiSession(params: { + session: AgentSession; + runId: string; + verboseLevel?: "off" | "on"; + shouldEmitToolResult?: () => boolean; + onToolResult?: (payload: { + text?: string; + mediaUrls?: string[]; + }) => void | Promise; + onPartialReply?: (payload: { + text?: string; + mediaUrls?: string[]; + }) => void | Promise; + onAgentEvent?: (evt: { + stream: string; + data: Record; + }) => void; +}) { + const assistantTexts: string[] = []; + const toolMetas: Array<{ toolName?: string; meta?: string }> = []; + const toolMetaById = new Map(); + let deltaBuffer = ""; + let lastStreamedAssistant: string | undefined; + + const toolDebouncer = createToolDebouncer((toolName, metas) => { + if (!params.onPartialReply) return; + const text = formatToolAggregate(toolName, metas); + const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); + void params.onPartialReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }); + }); + + const unsubscribe = params.session.subscribe( + (evt: AgentEvent | { type: string; [k: string]: unknown }) => { + if (evt.type === "tool_execution_start") { + const toolName = String( + (evt as AgentEvent & { toolName: string }).toolName, + ); + const toolCallId = String( + (evt as AgentEvent & { toolCallId: string }).toolCallId, + ); + const args = (evt as AgentEvent & { args: unknown }).args; + const meta = inferToolMetaFromArgs(toolName, args); + toolMetaById.set(toolCallId, meta); + + emitAgentEvent({ + runId: params.runId, + stream: "tool", + data: { + phase: "start", + name: toolName, + toolCallId, + args: args as Record, + }, + }); + params.onAgentEvent?.({ + stream: "tool", + data: { phase: "start", name: toolName, toolCallId }, + }); + } + + if (evt.type === "tool_execution_end") { + const toolName = String( + (evt as AgentEvent & { toolName: string }).toolName, + ); + const toolCallId = String( + (evt as AgentEvent & { toolCallId: string }).toolCallId, + ); + const isError = Boolean( + (evt as AgentEvent & { isError: boolean }).isError, + ); + const meta = toolMetaById.get(toolCallId); + toolMetas.push({ toolName, meta }); + toolDebouncer.push(toolName, meta); + + emitAgentEvent({ + runId: params.runId, + stream: "tool", + data: { + phase: "result", + name: toolName, + toolCallId, + meta, + isError, + }, + }); + params.onAgentEvent?.({ + stream: "tool", + data: { + phase: "result", + name: toolName, + toolCallId, + meta, + isError, + }, + }); + + const emitToolResult = + typeof params.shouldEmitToolResult === "function" + ? params.shouldEmitToolResult() + : params.verboseLevel === "on"; + if (emitToolResult && params.onToolResult) { + const agg = formatToolAggregate(toolName, meta ? [meta] : undefined); + const { text: cleanedText, mediaUrls } = splitMediaFromOutput(agg); + if (cleanedText || (mediaUrls && mediaUrls.length > 0)) { + try { + void params.onToolResult({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }); + } catch { + // ignore tool result delivery failures + } + } + } + } + + if (evt.type === "message_update") { + const msg = (evt as AgentEvent & { message: AppMessage }).message; + if (msg?.role === "assistant") { + const assistantEvent = ( + evt as AgentEvent & { assistantMessageEvent?: unknown } + ).assistantMessageEvent; + const assistantRecord = + assistantEvent && typeof assistantEvent === "object" + ? (assistantEvent as Record) + : undefined; + const evtType = + typeof assistantRecord?.type === "string" + ? assistantRecord.type + : ""; + if ( + evtType === "text_delta" || + evtType === "text_start" || + evtType === "text_end" + ) { + const chunk = + typeof assistantRecord?.delta === "string" + ? assistantRecord.delta + : typeof assistantRecord?.content === "string" + ? assistantRecord.content + : ""; + if (chunk) { + deltaBuffer += chunk; + const next = deltaBuffer.trim(); + if (next && next !== lastStreamedAssistant) { + lastStreamedAssistant = next; + const { text: cleanedText, mediaUrls } = + splitMediaFromOutput(next); + emitAgentEvent({ + runId: params.runId, + stream: "assistant", + data: { + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }, + }); + params.onAgentEvent?.({ + stream: "assistant", + data: { + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }, + }); + if (params.onPartialReply) { + void params.onPartialReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }); + } + } + } + } + } + } + + if (evt.type === "message_end") { + const msg = (evt as AgentEvent & { message: AppMessage }).message; + if (msg?.role === "assistant") { + const text = extractAssistantText(msg as AssistantMessage); + if (text) assistantTexts.push(text); + deltaBuffer = ""; + } + } + + if (evt.type === "agent_end") { + toolDebouncer.flush(); + } + }, + ); + + return { + assistantTexts, + toolMetas, + unsubscribe, + flush: () => toolDebouncer.flush(), + }; +} diff --git a/src/agents/pi-embedded.ts b/src/agents/pi-embedded.ts index 70683e586..3611934e2 100644 --- a/src/agents/pi-embedded.ts +++ b/src/agents/pi-embedded.ts @@ -1,691 +1,9 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; - -import type { - AgentEvent, - AppMessage, - ThinkingLevel, -} from "@mariozechner/pi-agent-core"; -import type { - AgentToolResult, - Api, - AssistantMessage, - Model, -} from "@mariozechner/pi-ai"; -import { - buildSystemPrompt, - createAgentSession, - defaultGetApiKey, - findModel, - SessionManager, - SettingsManager, - type Skill, -} from "@mariozechner/pi-coding-agent"; -import type { ThinkLevel, VerboseLevel } from "../auto-reply/thinking.js"; -import { - createToolDebouncer, - formatToolAggregate, -} from "../auto-reply/tool-meta.js"; -import type { ClawdisConfig } from "../config/config.js"; -import { emitAgentEvent } from "../infra/agent-events.js"; -import { splitMediaFromOutput } from "../media/parse.js"; -import { enqueueCommand } from "../process/command-queue.js"; -import { resolveUserPath } from "../utils.js"; -import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "./defaults.js"; -import { - extractAssistantText, - inferToolMetaFromArgs, -} from "./pi-embedded-utils.js"; -import { - createClawdisCodingTools, - sanitizeContentBlocksImages, -} from "./pi-tools.js"; -import { - applySkillEnvOverrides, - applySkillEnvOverridesFromSnapshot, - buildWorkspaceSkillSnapshot, - loadWorkspaceSkillEntries, - type SkillEntry, - type SkillSnapshot, -} from "./skills.js"; -import { buildAgentSystemPromptAppend } from "./system-prompt.js"; -import { - loadWorkspaceBootstrapFiles, - type WorkspaceBootstrapFile, -} from "./workspace.js"; - -export type EmbeddedPiAgentMeta = { - sessionId: string; - provider: string; - model: string; - usage?: { - input?: number; - output?: number; - cacheRead?: number; - cacheWrite?: number; - total?: number; - }; -}; - -export type EmbeddedPiRunMeta = { - durationMs: number; - agentMeta?: EmbeddedPiAgentMeta; - aborted?: boolean; -}; - -export type EmbeddedPiRunResult = { - payloads?: Array<{ - text?: string; - mediaUrl?: string; - mediaUrls?: string[]; - }>; - meta: EmbeddedPiRunMeta; -}; - -type EmbeddedPiQueueHandle = { - queueMessage: (text: string) => Promise; - isStreaming: () => boolean; -}; - -const ACTIVE_EMBEDDED_RUNS = new Map(); - -export function queueEmbeddedPiMessage( - sessionId: string, - text: string, -): boolean { - const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); - if (!handle) return false; - if (!handle.isStreaming()) return false; - void handle.queueMessage(text); - return true; -} - -function mapThinkingLevel(level?: ThinkLevel): ThinkingLevel { - // pi-agent-core supports "xhigh" too; Clawdis doesn't surface it for now. - if (!level) return "off"; - return level; -} - -function resolveModel( - provider: string, - modelId: string, - agentDir?: string, -): { model?: Model; error?: string } { - const result = findModel(provider, modelId, agentDir); - return { - model: (result.model ?? undefined) as Model | undefined, - error: result.error ?? undefined, - }; -} - -async function ensureSessionHeader(params: { - sessionFile: string; - sessionId: string; - cwd: string; - provider: string; - modelId: string; - thinkingLevel: ThinkingLevel; -}) { - const file = params.sessionFile; - try { - await fs.stat(file); - return; - } catch { - // create - } - await fs.mkdir(path.dirname(file), { recursive: true }); - const entry = { - type: "session", - id: params.sessionId, - timestamp: new Date().toISOString(), - cwd: params.cwd, - provider: params.provider, - modelId: params.modelId, - thinkingLevel: params.thinkingLevel, - }; - await fs.writeFile(file, `${JSON.stringify(entry)}\n`, "utf-8"); -} - -const defaultApiKey = defaultGetApiKey(); - -async function getApiKeyForModel(model: { provider: string }): Promise { - if (model.provider === "anthropic") { - const oauthEnv = process.env.ANTHROPIC_OAUTH_TOKEN; - if (oauthEnv?.trim()) return oauthEnv.trim(); - } - const key = await defaultApiKey(model as unknown as Model); - if (key) return key; - throw new Error(`No API key found for provider "${model.provider}"`); -} - -type ContentBlock = AgentToolResult["content"][number]; - -type ContextFile = { path: string; content: string }; - -async function sanitizeSessionMessagesImages( - messages: AppMessage[], - label: string, -): Promise { - // We sanitize historical session messages because Anthropic can reject a request - // if the transcript contains oversized base64 images (see MAX_IMAGE_DIMENSION_PX). - const out: AppMessage[] = []; - for (const msg of messages) { - if (!msg || typeof msg !== "object") { - out.push(msg); - continue; - } - - const role = (msg as { role?: unknown }).role; - if (role === "toolResult") { - const toolMsg = msg as Extract; - const content = Array.isArray(toolMsg.content) ? toolMsg.content : []; - const nextContent = (await sanitizeContentBlocksImages( - content as ContentBlock[], - label, - )) as unknown as typeof toolMsg.content; - out.push({ ...toolMsg, content: nextContent }); - continue; - } - - if (role === "user") { - const userMsg = msg as Extract; - const content = userMsg.content; - if (Array.isArray(content)) { - const nextContent = (await sanitizeContentBlocksImages( - content as unknown as ContentBlock[], - label, - )) as unknown as typeof userMsg.content; - out.push({ ...userMsg, content: nextContent }); - continue; - } - } - - out.push(msg); - } - return out; -} - -function buildBootstrapContextFiles( - files: WorkspaceBootstrapFile[], -): ContextFile[] { - return files.map((file) => ({ - path: file.name, - content: file.missing - ? `[MISSING] Expected at: ${file.path}` - : (file.content ?? ""), - })); -} - -function resolvePromptSkills( - snapshot: SkillSnapshot, - entries: SkillEntry[], -): Skill[] { - if (snapshot.resolvedSkills?.length) { - return snapshot.resolvedSkills; - } - - const snapshotNames = snapshot.skills.map((entry) => entry.name); - if (snapshotNames.length === 0) return []; - - const entryByName = new Map( - entries.map((entry) => [entry.skill.name, entry.skill]), - ); - return snapshotNames - .map((name) => entryByName.get(name)) - .filter((skill): skill is Skill => Boolean(skill)); -} - -function formatAssistantErrorText(msg: AssistantMessage): string | undefined { - if (msg.stopReason !== "error") return undefined; - const raw = (msg.errorMessage ?? "").trim(); - if (!raw) return "LLM request failed with an unknown error."; - - const invalidRequest = raw.match( - /"type":"invalid_request_error".*?"message":"([^"]+)"/, - ); - if (invalidRequest?.[1]) { - return `LLM request rejected: ${invalidRequest[1]}`; - } - - // Keep it short for WhatsApp. - return raw.length > 600 ? `${raw.slice(0, 600)}…` : raw; -} - -export async function runEmbeddedPiAgent(params: { - sessionId: string; - sessionFile: string; - workspaceDir: string; - config?: ClawdisConfig; - skillsSnapshot?: SkillSnapshot; - prompt: string; - provider?: string; - model?: string; - thinkLevel?: ThinkLevel; - verboseLevel?: VerboseLevel; - timeoutMs: number; - runId: string; - abortSignal?: AbortSignal; - shouldEmitToolResult?: () => boolean; - onPartialReply?: (payload: { - text?: string; - mediaUrls?: string[]; - }) => void | Promise; - onToolResult?: (payload: { - text?: string; - mediaUrls?: string[]; - }) => void | Promise; - onAgentEvent?: (evt: { - stream: string; - data: Record; - }) => void; - enqueue?: typeof enqueueCommand; -}): Promise { - const enqueue = params.enqueue ?? enqueueCommand; - return enqueue(async () => { - const started = Date.now(); - const resolvedWorkspace = resolveUserPath(params.workspaceDir); - const prevCwd = process.cwd(); - - const provider = - (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER; - const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL; - const agentDir = - process.env.PI_CODING_AGENT_DIR ?? - path.join(os.homedir(), ".pi", "agent"); - const { model, error } = resolveModel(provider, modelId, agentDir); - if (!model) { - throw new Error(error ?? `Unknown model: ${provider}/${modelId}`); - } - - const thinkingLevel = mapThinkingLevel(params.thinkLevel); - - await fs.mkdir(resolvedWorkspace, { recursive: true }); - await ensureSessionHeader({ - sessionFile: params.sessionFile, - sessionId: params.sessionId, - cwd: resolvedWorkspace, - provider, - modelId, - thinkingLevel, - }); - - let restoreSkillEnv: (() => void) | undefined; - process.chdir(resolvedWorkspace); - try { - const shouldLoadSkillEntries = - !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills; - const skillEntries = shouldLoadSkillEntries - ? loadWorkspaceSkillEntries(resolvedWorkspace) - : []; - const skillsSnapshot = - params.skillsSnapshot ?? - buildWorkspaceSkillSnapshot(resolvedWorkspace, { - config: params.config, - entries: skillEntries, - }); - restoreSkillEnv = params.skillsSnapshot - ? applySkillEnvOverridesFromSnapshot({ - snapshot: params.skillsSnapshot, - config: params.config, - }) - : applySkillEnvOverrides({ - skills: skillEntries ?? [], - config: params.config, - }); - - const bootstrapFiles = - await loadWorkspaceBootstrapFiles(resolvedWorkspace); - const contextFiles = buildBootstrapContextFiles(bootstrapFiles); - const promptSkills = resolvePromptSkills(skillsSnapshot, skillEntries); - const tools = createClawdisCodingTools(); - const systemPrompt = buildSystemPrompt({ - appendPrompt: buildAgentSystemPromptAppend({ - workspaceDir: resolvedWorkspace, - defaultThinkLevel: params.thinkLevel, - }), - contextFiles, - skills: promptSkills, - cwd: resolvedWorkspace, - }); - - const sessionManager = SessionManager.open(params.sessionFile, agentDir); - const settingsManager = SettingsManager.create( - resolvedWorkspace, - agentDir, - ); - - const { session } = await createAgentSession({ - cwd: resolvedWorkspace, - agentDir, - model, - thinkingLevel, - systemPrompt, - // TODO(steipete): Once pi-mono publishes file-magic MIME detection in `read` image payloads, - // remove `createClawdisCodingTools()` and use upstream `codingTools` again. - tools, - sessionManager, - settingsManager, - getApiKey: getApiKeyForModel, - skills: promptSkills, - contextFiles, - }); - - const prior = await sanitizeSessionMessagesImages( - session.messages, - "session:history", - ); - if (prior.length > 0) { - session.agent.replaceMessages(prior); - } - const queueHandle: EmbeddedPiQueueHandle = { - queueMessage: async (text: string) => { - await session.queueMessage(text); - }, - isStreaming: () => session.isStreaming, - }; - ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle); - - const assistantTexts: string[] = []; - const toolDebouncer = createToolDebouncer((toolName, metas) => { - if (!params.onPartialReply) return; - const text = formatToolAggregate(toolName, metas); - const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); - void params.onPartialReply({ - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }); - }); - - const toolMetas: Array<{ toolName?: string; meta?: string }> = []; - const toolMetaById = new Map(); - let deltaBuffer = ""; - let lastStreamedAssistant: string | undefined; - let aborted = Boolean(params.abortSignal?.aborted); - - const unsubscribe = session.subscribe( - (evt: AgentEvent | { type: string; [k: string]: unknown }) => { - if (evt.type === "tool_execution_start") { - const toolName = String( - (evt as AgentEvent & { toolName: string }).toolName, - ); - const toolCallId = String( - (evt as AgentEvent & { toolCallId: string }).toolCallId, - ); - const args = (evt as AgentEvent & { args: unknown }).args; - const meta = inferToolMetaFromArgs(toolName, args); - toolMetaById.set(toolCallId, meta); - - emitAgentEvent({ - runId: params.runId, - stream: "tool", - data: { - phase: "start", - name: toolName, - toolCallId, - args: args as Record, - }, - }); - params.onAgentEvent?.({ - stream: "tool", - data: { phase: "start", name: toolName, toolCallId }, - }); - } - - if (evt.type === "tool_execution_end") { - const toolName = String( - (evt as AgentEvent & { toolName: string }).toolName, - ); - const toolCallId = String( - (evt as AgentEvent & { toolCallId: string }).toolCallId, - ); - const isError = Boolean( - (evt as AgentEvent & { isError: boolean }).isError, - ); - const meta = toolMetaById.get(toolCallId); - toolMetas.push({ toolName, meta }); - toolDebouncer.push(toolName, meta); - - emitAgentEvent({ - runId: params.runId, - stream: "tool", - data: { - phase: "result", - name: toolName, - toolCallId, - meta, - isError, - }, - }); - params.onAgentEvent?.({ - stream: "tool", - data: { - phase: "result", - name: toolName, - toolCallId, - meta, - isError, - }, - }); - const emitToolResult = - typeof params.shouldEmitToolResult === "function" - ? params.shouldEmitToolResult() - : params.verboseLevel === "on"; - if (emitToolResult && params.onToolResult) { - const agg = formatToolAggregate( - toolName, - meta ? [meta] : undefined, - ); - const { text: cleanedText, mediaUrls } = - splitMediaFromOutput(agg); - if (cleanedText || (mediaUrls && mediaUrls.length > 0)) { - try { - void params.onToolResult({ - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }); - } catch { - // ignore tool result delivery failures - } - } - } - } - - if (evt.type === "message_update") { - const msg = (evt as AgentEvent & { message: AppMessage }).message; - if (msg?.role === "assistant") { - const assistantEvent = ( - evt as AgentEvent & { assistantMessageEvent?: unknown } - ).assistantMessageEvent; - const assistantRecord = - assistantEvent && typeof assistantEvent === "object" - ? (assistantEvent as Record) - : undefined; - const evtType = - typeof assistantRecord?.type === "string" - ? assistantRecord.type - : ""; - if ( - evtType === "text_delta" || - evtType === "text_start" || - evtType === "text_end" - ) { - const chunk = - typeof assistantRecord?.delta === "string" - ? assistantRecord.delta - : typeof assistantRecord?.content === "string" - ? assistantRecord.content - : ""; - if (chunk) { - deltaBuffer += chunk; - const next = deltaBuffer.trim(); - if (next && next !== lastStreamedAssistant) { - lastStreamedAssistant = next; - const { text: cleanedText, mediaUrls } = - splitMediaFromOutput(next); - emitAgentEvent({ - runId: params.runId, - stream: "assistant", - data: { - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }, - }); - params.onAgentEvent?.({ - stream: "assistant", - data: { - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }, - }); - if (params.onPartialReply) { - void params.onPartialReply({ - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }); - } - } - } - } - } - } - - if (evt.type === "message_end") { - const msg = (evt as AgentEvent & { message: AppMessage }).message; - if (msg?.role === "assistant") { - const text = extractAssistantText(msg as AssistantMessage); - if (text) assistantTexts.push(text); - deltaBuffer = ""; - } - } - - if (evt.type === "agent_end") { - toolDebouncer.flush(); - } - }, - ); - - const abortTimer = setTimeout( - () => { - aborted = true; - void session.abort(); - }, - Math.max(1, params.timeoutMs), - ); - - let messagesSnapshot: AppMessage[] = []; - let sessionIdUsed = session.sessionId; - const onAbort = () => { - aborted = true; - void session.abort(); - }; - if (params.abortSignal) { - if (params.abortSignal.aborted) { - onAbort(); - } else { - params.abortSignal.addEventListener("abort", onAbort, { once: true }); - } - } - let promptError: unknown | null = null; - try { - try { - await session.prompt(params.prompt); - } catch (err) { - promptError = err; - } finally { - messagesSnapshot = session.messages.slice(); - sessionIdUsed = session.sessionId; - } - } finally { - clearTimeout(abortTimer); - unsubscribe(); - toolDebouncer.flush(); - if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) { - ACTIVE_EMBEDDED_RUNS.delete(params.sessionId); - } - session.dispose(); - params.abortSignal?.removeEventListener?.("abort", onAbort); - } - if (promptError && !aborted) { - throw promptError; - } - - const lastAssistant = messagesSnapshot - .slice() - .reverse() - .find((m) => (m as AppMessage)?.role === "assistant") as - | AssistantMessage - | undefined; - - const usage = lastAssistant?.usage; - const agentMeta: EmbeddedPiAgentMeta = { - sessionId: sessionIdUsed, - provider: lastAssistant?.provider ?? provider, - model: lastAssistant?.model ?? model.id, - usage: usage - ? { - input: usage.input, - output: usage.output, - cacheRead: usage.cacheRead, - cacheWrite: usage.cacheWrite, - total: usage.totalTokens, - } - : undefined, - }; - - const replyItems: Array<{ text: string; media?: string[] }> = []; - - const errorText = lastAssistant - ? formatAssistantErrorText(lastAssistant) - : undefined; - if (errorText) replyItems.push({ text: errorText }); - - const inlineToolResults = - params.verboseLevel === "on" && - !params.onPartialReply && - !params.onToolResult && - toolMetas.length > 0; - if (inlineToolResults) { - for (const { toolName, meta } of toolMetas) { - const agg = formatToolAggregate(toolName, meta ? [meta] : []); - const { text: cleanedText, mediaUrls } = splitMediaFromOutput(agg); - if (cleanedText) - replyItems.push({ text: cleanedText, media: mediaUrls }); - } - } - - for (const text of assistantTexts.length - ? assistantTexts - : lastAssistant - ? [extractAssistantText(lastAssistant)] - : []) { - const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); - if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) continue; - replyItems.push({ text: cleanedText, media: mediaUrls }); - } - - const payloads = replyItems - .map((item) => ({ - text: item.text?.trim() ? item.text.trim() : undefined, - mediaUrls: item.media?.length ? item.media : undefined, - mediaUrl: item.media?.[0], - })) - .filter( - (p) => - p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0), - ); - - return { - payloads: payloads.length ? payloads : undefined, - meta: { - durationMs: Date.now() - started, - agentMeta, - aborted, - }, - }; - } finally { - restoreSkillEnv?.(); - process.chdir(prevCwd); - } - }); -} +export type { + EmbeddedPiAgentMeta, + EmbeddedPiRunMeta, + EmbeddedPiRunResult, +} from "./pi-embedded-runner.js"; +export { + queueEmbeddedPiMessage, + runEmbeddedPiAgent, +} from "./pi-embedded-runner.js"; diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 7b251a69e..7752859b9 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -35,8 +35,8 @@ import { normalizeE164 } from "../utils.js"; import { resolveHeartbeatSeconds } from "../web/reconnect.js"; import { getWebAuthAgeMs, webAuthExists } from "../web/session.js"; import { - parseActivationCommand, normalizeGroupActivation, + parseActivationCommand, } from "./group-activation.js"; import { buildStatusMessage } from "./status.js"; import type { MsgContext, TemplateContext } from "./templating.js"; @@ -46,9 +46,9 @@ import { type ThinkLevel, type VerboseLevel, } from "./thinking.js"; +import { SILENT_REPLY_TOKEN } from "./tokens.js"; import { isAudio, transcribeInboundAudio } from "./transcription.js"; import type { GetReplyOptions, ReplyPayload } from "./types.js"; -import { SILENT_REPLY_TOKEN } from "./tokens.js"; export type { GetReplyOptions, ReplyPayload } from "./types.js"; diff --git a/src/auto-reply/status.ts b/src/auto-reply/status.ts index 39af72a48..774cf0894 100644 --- a/src/auto-reply/status.ts +++ b/src/auto-reply/status.ts @@ -181,10 +181,9 @@ export function buildStatusMessage(args: StatusArgs): string { .filter(Boolean) .join(" • "); - const groupActivationLine = - args.sessionKey?.startsWith("group:") - ? `Group activation: ${entry?.groupActivation ?? "mention"}` - : undefined; + const groupActivationLine = args.sessionKey?.startsWith("group:") + ? `Group activation: ${entry?.groupActivation ?? "mention"}` + : undefined; const contextLine = `Context: ${formatTokens( totalTokens, diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index c3162d432..1a0c69fed 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -71,12 +71,12 @@ import { type ResponseFrame, ResponseFrameSchema, SendParamsSchema, - type SessionsListParams, - SessionsListParamsSchema, type SessionsCompactParams, SessionsCompactParamsSchema, type SessionsDeleteParams, SessionsDeleteParamsSchema, + type SessionsListParams, + SessionsListParamsSchema, type SessionsPatchParams, SessionsPatchParamsSchema, type SessionsResetParams, diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index a27da20e0..72b4fa803 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -292,7 +292,11 @@ export const SessionsPatchParamsSchema = Type.Object( thinkingLevel: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), verboseLevel: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), groupActivation: Type.Optional( - Type.Union([Type.Literal("mention"), Type.Literal("always"), Type.Null()]), + Type.Union([ + Type.Literal("mention"), + Type.Literal("always"), + Type.Null(), + ]), ), syncing: Type.Optional( Type.Union([Type.Boolean(), NonEmptyString, Type.Null()]), diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index 05f92abdd..e54f3557e 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -3390,14 +3390,16 @@ describe("gateway server", () => { await fs.writeFile( path.join(dir, "sess-main.jsonl"), - Array.from({ length: 10 }) - .map((_, idx) => JSON.stringify({ role: "user", content: `line ${idx}` })) - .join("\n") + "\n", + `${Array.from({ length: 10 }) + .map((_, idx) => + JSON.stringify({ role: "user", content: `line ${idx}` }), + ) + .join("\n")}\n`, "utf-8", ); await fs.writeFile( path.join(dir, "sess-group.jsonl"), - JSON.stringify({ role: "user", content: "group line 0" }) + "\n", + `${JSON.stringify({ role: "user", content: "group line 0" })}\n`, "utf-8", ); @@ -3532,8 +3534,9 @@ describe("gateway server", () => { .filter((l) => l.trim().length > 0); expect(compactedLines).toHaveLength(3); const filesAfterCompact = await fs.readdir(dir); - expect(filesAfterCompact.some((f) => f.startsWith("sess-main.jsonl.bak."))) - .toBe(true); + expect( + filesAfterCompact.some((f) => f.startsWith("sess-main.jsonl.bak.")), + ).toBe(true); const deleted = await rpcReq<{ ok: true; deleted: boolean }>( ws, @@ -3546,17 +3549,19 @@ describe("gateway server", () => { sessions: Array<{ key: string }>; }>(ws, "sessions.list", {}); expect(listAfterDelete.ok).toBe(true); - expect(listAfterDelete.payload?.sessions.some((s) => s.key === "group:dev")) - .toBe(false); + expect( + listAfterDelete.payload?.sessions.some((s) => s.key === "group:dev"), + ).toBe(false); const filesAfterDelete = await fs.readdir(dir); - expect(filesAfterDelete.some((f) => f.startsWith("sess-group.jsonl.deleted."))) - .toBe(true); + expect( + filesAfterDelete.some((f) => f.startsWith("sess-group.jsonl.deleted.")), + ).toBe(true); - const reset = await rpcReq<{ ok: true; key: string; entry: { sessionId: string } }>( - ws, - "sessions.reset", - { key: "main" }, - ); + const reset = await rpcReq<{ + ok: true; + key: string; + entry: { sessionId: string }; + }>(ws, "sessions.reset", { key: "main" }); expect(reset.ok).toBe(true); expect(reset.payload?.key).toBe("main"); expect(reset.payload?.entry.sessionId).not.toBe("sess-main"); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 4ff0e0a5a..b4bc7f32d 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -271,9 +271,9 @@ import { formatValidationErrors, PROTOCOL_VERSION, type RequestFrame, - type SessionsListParams, type SessionsCompactParams, type SessionsDeleteParams, + type SessionsListParams, type SessionsPatchParams, type SessionsResetParams, type Snapshot, @@ -303,9 +303,9 @@ import { validateProvidersStatusParams, validateRequestFrame, validateSendParams, - validateSessionsListParams, validateSessionsCompactParams, validateSessionsDeleteParams, + validateSessionsListParams, validateSessionsPatchParams, validateSessionsResetParams, validateSkillsInstallParams, @@ -718,7 +718,6 @@ function readSessionMessages( if (!line.trim()) continue; try { const parsed = JSON.parse(line); - // pi/tau logs either raw message or wrapper { message } if (parsed?.message) { messages.push(parsed.message); } else if (parsed?.role && parsed?.content) { @@ -2183,8 +2182,10 @@ export async function startGatewayServer( }; } - const filePath = resolveSessionTranscriptCandidates(sessionId, storePath) - .find((candidate) => fs.existsSync(candidate)); + const filePath = resolveSessionTranscriptCandidates( + sessionId, + storePath, + ).find((candidate) => fs.existsSync(candidate)); if (!filePath) { return { ok: true, diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index 6bd23e194..18277331e 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -19,11 +19,11 @@ import * as commandQueue from "../process/command-queue.js"; import { HEARTBEAT_PROMPT, HEARTBEAT_TOKEN, - SILENT_REPLY_TOKEN, monitorWebProvider, resolveHeartbeatRecipients, resolveReplyHeartbeatMinutes, runWebHeartbeatOnce, + SILENT_REPLY_TOKEN, stripHeartbeatToken, } from "./auto-reply.js"; import type { sendMessageWhatsApp } from "./outbound.js"; diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 3f6871be3..ec14ff97e 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -1,15 +1,12 @@ import { chunkText } from "../auto-reply/chunk.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; -import { getReplyFromConfig } from "../auto-reply/reply.js"; -import type { ReplyPayload } from "../auto-reply/types.js"; import { normalizeGroupActivation, parseActivationCommand, } from "../auto-reply/group-activation.js"; -import { - HEARTBEAT_TOKEN, - SILENT_REPLY_TOKEN, -} from "../auto-reply/tokens.js"; +import { getReplyFromConfig } from "../auto-reply/reply.js"; +import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; +import type { ReplyPayload } from "../auto-reply/types.js"; import { waitForever } from "../cli/wait.js"; import { loadConfig } from "../config/config.js"; import { @@ -1244,10 +1241,7 @@ export async function monitorWebProvider( if (msg.chatType === "group") { noteGroupMember(conversationId, msg.senderE164, msg.senderName); - const commandBody = stripMentionsForCommand( - msg.body, - msg.selfE164, - ); + const commandBody = stripMentionsForCommand(msg.body, msg.selfE164); const activationCommand = parseActivationCommand(commandBody); const isOwner = isOwnerSender(msg); const statusCommand = isStatusCommand(commandBody);