import fs from "node:fs/promises";
import os from "node:os";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage, ImageContent } from "@mariozechner/pi-ai";
import { streamSimple } from "@mariozechner/pi-ai";
import { createAgentSession, SessionManager, SettingsManager } from "@mariozechner/pi-coding-agent";
import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js";
import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js";
import { getMachineDisplayName } from "../../../infra/machine-name.js";
import { resolveTelegramInlineButtonsScope } from "../../../telegram/inline-buttons.js";
import { resolveTelegramReactionLevel } from "../../../telegram/reaction-level.js";
import { normalizeMessageChannel } from "../../../utils/message-channel.js";
import { isReasoningTagProvider } from "../../../utils/provider-utils.js";
import { isSubagentSessionKey } from "../../../routing/session-key.js";
import { resolveUserPath } from "../../../utils.js";
import { resolveClawdbotAgentDir } from "../../agent-paths.js";
import { resolveSessionAgentIds } from "../../agent-scope.js";
import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../../bootstrap-files.js";
import { resolveModelAuthMode } from "../../model-auth.js";
import {
  isCloudCodeAssistFormatError,
  resolveBootstrapMaxChars,
  validateAnthropicTurns,
  validateGeminiTurns,
} from "../../pi-embedded-helpers.js";
import { subscribeEmbeddedPiSession } from "../../pi-embedded-subscribe.js";
import {
  ensurePiCompactionReserveTokens,
  resolveCompactionReserveTokensFloor,
} from "../../pi-settings.js";
import { createClawdbotCodingTools } from "../../pi-tools.js";
import { resolveSandboxContext } from "../../sandbox.js";
import { guardSessionManager } from "../../session-tool-result-guard-wrapper.js";
import { acquireSessionWriteLock } from "../../session-write-lock.js";
import {
  applySkillEnvOverrides,
  applySkillEnvOverridesFromSnapshot,
  loadWorkspaceSkillEntries,
  resolveSkillsPromptForRun,
} from "../../skills.js";
import { buildSystemPromptReport } from "../../system-prompt-report.js";
import { isAbortError } from "../abort.js";
import { buildEmbeddedExtensionPaths } from "../extensions.js";
import { applyExtraParamsToAgent } from "../extra-params.js";
import {
  logToolSchemasForGoogle,
  sanitizeSessionHistory,
  sanitizeToolsForGoogle,
} from "../google.js";
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "../history.js";
import { log } from "../logger.js";
import { buildModelAliasLines } from "../model.js";
import {
  clearActiveEmbeddedRun,
  type EmbeddedPiQueueHandle,
  setActiveEmbeddedRun,
} from "../runs.js";
import { buildEmbeddedSandboxInfo } from "../sandbox-info.js";
import { prewarmSessionFile, trackSessionManagerAccess } from "../session-manager-cache.js";
import { prepareSessionManagerForRun } from "../session-manager-init.js";
import { buildEmbeddedSystemPrompt, createSystemPromptOverride } from "../system-prompt.js";
import { splitSdkTools } from "../tool-split.js";
import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../../date-time.js";
import { describeUnknownError, mapThinkingLevel } from "../utils.js";
import { resolveSandboxRuntimeStatus } from "../../sandbox/runtime-status.js";
import { isTimeoutError } from "../../failover-error.js";
import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js";
import { MAX_IMAGE_BYTES } from "../../../media/constants.js";
import type { EmbeddedRunAttemptParams, EmbeddedRunAttemptResult } from "./types.js";
import { detectAndLoadPromptImages } from "./images.js";

/**
 * Largest delay Node's `setTimeout` honours (2^31 - 1 ms). Larger delays are
 * treated as 1 ms, which would make a "very long" run timeout fire immediately.
 */
const MAX_TIMER_DELAY_MS = 2_147_483_647;

/**
 * Runs one embedded Pi agent attempt for a single prompt.
 *
 * Steps: resolve the workspace (and sandbox), apply skill env overrides, build the
 * tool set and system prompt, open the session file under a write lock, run the
 * prompt with timeout / external-abort handling and plugin hooks, then return a
 * snapshot of the session messages plus tool and messaging metadata.
 *
 * Side effects: changes `process.cwd()` to the effective workspace and applies skill
 * environment overrides for the duration of the attempt; both are restored before
 * returning. The session write lock is always released and the session disposed.
 *
 * @param params - Attempt configuration (session, model, prompt, callbacks, limits).
 * @returns The attempt result. Failures of the prompt itself are reported via
 *   `promptError` (and `aborted` / `timedOut`) rather than thrown.
 * @throws Errors raised while preparing the workspace, session or history, and
 *   non-abort errors from the compaction retry wait.
 */
export async function runEmbeddedAttempt(
  params: EmbeddedRunAttemptParams,
): Promise<EmbeddedRunAttemptResult> {
  const resolvedWorkspace = resolveUserPath(params.workspaceDir);
  const prevCwd = process.cwd();
  // Aborted on timeout or when the caller's abortSignal fires; its signal is also
  // handed to the tools so they can stop early.
  const runAbortController = new AbortController();

  log.debug(
    `embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${params.provider} model=${params.modelId} thinking=${params.thinkLevel} messageChannel=${params.messageChannel ?? params.messageProvider ?? "unknown"}`,
  );

  await fs.mkdir(resolvedWorkspace, { recursive: true });

  const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId;
  const sandbox = await resolveSandboxContext({
    config: params.config,
    sessionKey: sandboxSessionKey,
    workspaceDir: resolvedWorkspace,
  });
  // With an enabled sandbox, only "rw" workspace access works directly in the user's
  // workspace; otherwise the sandbox's own workspace directory is used.
  const effectiveWorkspace = sandbox?.enabled
    ? sandbox.workspaceAccess === "rw"
      ? resolvedWorkspace
      : sandbox.workspaceDir
    : resolvedWorkspace;
  await fs.mkdir(effectiveWorkspace, { recursive: true });

  let restoreSkillEnv: (() => void) | undefined;
  // process.cwd() is process-global; it is restored in the outermost finally below.
  process.chdir(effectiveWorkspace);
  try {
    const shouldLoadSkillEntries = !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
    const skillEntries = shouldLoadSkillEntries
      ? loadWorkspaceSkillEntries(effectiveWorkspace)
      : [];
    restoreSkillEnv = params.skillsSnapshot
      ? applySkillEnvOverridesFromSnapshot({
          snapshot: params.skillsSnapshot,
          config: params.config,
        })
      : applySkillEnvOverrides({
          skills: skillEntries ?? [],
          config: params.config,
        });

    const skillsPrompt = resolveSkillsPromptForRun({
      skillsSnapshot: params.skillsSnapshot,
      entries: shouldLoadSkillEntries ? skillEntries : undefined,
      config: params.config,
      workspaceDir: effectiveWorkspace,
    });

    const sessionLabel = params.sessionKey ?? params.sessionId;
    const { bootstrapFiles: hookAdjustedBootstrapFiles, contextFiles } =
      await resolveBootstrapContextForRun({
        workspaceDir: effectiveWorkspace,
        config: params.config,
        sessionKey: params.sessionKey,
        sessionId: params.sessionId,
        warn: makeBootstrapWarn({ sessionLabel, warn: (message) => log.warn(message) }),
      });

    const agentDir = params.agentDir ?? resolveClawdbotAgentDir();

    // Check if the model supports native image input
    const modelHasVision = params.model.input?.includes("image") ?? false;

    const toolsRaw = createClawdbotCodingTools({
      exec: {
        ...params.execOverrides,
        elevated: params.bashElevated,
      },
      sandbox,
      messageProvider: params.messageChannel ?? params.messageProvider,
      agentAccountId: params.agentAccountId,
      sessionKey: params.sessionKey ?? params.sessionId,
      agentDir,
      workspaceDir: effectiveWorkspace,
      config: params.config,
      abortSignal: runAbortController.signal,
      modelProvider: params.model.provider,
      modelId: params.modelId,
      modelAuthMode: resolveModelAuthMode(params.model.provider, params.config),
      currentChannelId: params.currentChannelId,
      currentThreadTs: params.currentThreadTs,
      replyToMode: params.replyToMode,
      hasRepliedRef: params.hasRepliedRef,
      modelHasVision,
    });
    const tools = sanitizeToolsForGoogle({ tools: toolsRaw, provider: params.provider });
    logToolSchemasForGoogle({ tools, provider: params.provider });

    const machineName = await getMachineDisplayName();
    const runtimeChannel = normalizeMessageChannel(params.messageChannel ?? params.messageProvider);
    let runtimeCapabilities = runtimeChannel
      ? (resolveChannelCapabilities({
          cfg: params.config,
          channel: runtimeChannel,
          accountId: params.agentAccountId,
        }) ?? [])
      : undefined;
    if (runtimeChannel === "telegram" && params.config) {
      const inlineButtonsScope = resolveTelegramInlineButtonsScope({
        cfg: params.config,
        accountId: params.agentAccountId ?? undefined,
      });
      if (inlineButtonsScope !== "off") {
        if (!runtimeCapabilities) runtimeCapabilities = [];
        // Case-insensitive check so an already-configured capability is not duplicated.
        if (
          !runtimeCapabilities.some((cap) => String(cap).trim().toLowerCase() === "inlinebuttons")
        ) {
          runtimeCapabilities.push("inlineButtons");
        }
      }
    }
    const reactionGuidance =
      runtimeChannel === "telegram" && params.config
        ? (() => {
            const resolved = resolveTelegramReactionLevel({
              cfg: params.config,
              accountId: params.agentAccountId ?? undefined,
            });
            const level = resolved.agentReactionGuidance;
            return level ? { level, channel: "Telegram" } : undefined;
          })()
        : undefined;

    const runtimeInfo = {
      host: machineName,
      os: `${os.type()} ${os.release()}`,
      arch: os.arch(),
      node: process.version,
      model: `${params.provider}/${params.modelId}`,
      channel: runtimeChannel,
      capabilities: runtimeCapabilities,
    };

    const sandboxInfo = buildEmbeddedSandboxInfo(sandbox, params.bashElevated);
    const reasoningTagHint = isReasoningTagProvider(params.provider);
    const userTimezone = resolveUserTimezone(params.config?.agents?.defaults?.userTimezone);
    const userTimeFormat = resolveUserTimeFormat(params.config?.agents?.defaults?.timeFormat);
    const userTime = formatUserTime(new Date(), userTimezone, userTimeFormat);
    const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({
      sessionKey: params.sessionKey,
      config: params.config,
    });
    const isDefaultAgent = sessionAgentId === defaultAgentId;
    const promptMode = isSubagentSessionKey(params.sessionKey) ? "minimal" : "full";

    const appendPrompt = buildEmbeddedSystemPrompt({
      workspaceDir: effectiveWorkspace,
      defaultThinkLevel: params.thinkLevel,
      reasoningLevel: params.reasoningLevel ?? "off",
      extraSystemPrompt: params.extraSystemPrompt,
      ownerNumbers: params.ownerNumbers,
      reasoningTagHint,
      // Only the default agent receives the heartbeat prompt.
      heartbeatPrompt: isDefaultAgent
        ? resolveHeartbeatPrompt(params.config?.agents?.defaults?.heartbeat?.prompt)
        : undefined,
      skillsPrompt,
      reactionGuidance,
      promptMode,
      runtimeInfo,
      sandboxInfo,
      tools,
      modelAliasLines: buildModelAliasLines(params.config),
      userTimezone,
      userTime,
      userTimeFormat,
      contextFiles,
    });
    const systemPromptReport = buildSystemPromptReport({
      source: "run",
      generatedAt: Date.now(),
      sessionId: params.sessionId,
      sessionKey: params.sessionKey,
      provider: params.provider,
      model: params.modelId,
      workspaceDir: effectiveWorkspace,
      bootstrapMaxChars: resolveBootstrapMaxChars(params.config),
      sandbox: (() => {
        const runtime = resolveSandboxRuntimeStatus({
          cfg: params.config,
          sessionKey: params.sessionKey ?? params.sessionId,
        });
        return { mode: runtime.mode, sandboxed: runtime.sandboxed };
      })(),
      systemPrompt: appendPrompt,
      bootstrapFiles: hookAdjustedBootstrapFiles,
      injectedFiles: contextFiles,
      skillsPrompt,
      tools,
    });
    const systemPrompt = createSystemPromptOverride(appendPrompt);

    const sessionLock = await acquireSessionWriteLock({
      sessionFile: params.sessionFile,
    });

    let sessionManager: ReturnType<typeof guardSessionManager> | undefined;
    let session: Awaited<ReturnType<typeof createAgentSession>>["session"] | undefined;
    try {
      const hadSessionFile = await fs
        .stat(params.sessionFile)
        .then(() => true)
        .catch(() => false);

      await prewarmSessionFile(params.sessionFile);
      sessionManager = guardSessionManager(SessionManager.open(params.sessionFile));
      trackSessionManagerAccess(params.sessionFile);

      await prepareSessionManagerForRun({
        sessionManager,
        sessionFile: params.sessionFile,
        hadSessionFile,
        sessionId: params.sessionId,
        cwd: effectiveWorkspace,
      });

      const settingsManager = SettingsManager.create(effectiveWorkspace, agentDir);
      ensurePiCompactionReserveTokens({
        settingsManager,
        minReserveTokens: resolveCompactionReserveTokensFloor(params.config),
      });

      const additionalExtensionPaths = buildEmbeddedExtensionPaths({
        cfg: params.config,
        sessionManager,
        provider: params.provider,
        modelId: params.modelId,
        model: params.model,
      });

      const { builtInTools, customTools } = splitSdkTools({
        tools,
        sandboxEnabled: !!sandbox?.enabled,
      });

      ({ session } = await createAgentSession({
        // NOTE(review): everything else in this attempt uses effectiveWorkspace (chdir,
        // settings, session prep); confirm resolvedWorkspace is intended here when a
        // non-"rw" sandbox workspace is active.
        cwd: resolvedWorkspace,
        agentDir,
        authStorage: params.authStorage,
        modelRegistry: params.modelRegistry,
        model: params.model,
        thinkingLevel: mapThinkingLevel(params.thinkLevel),
        systemPrompt,
        tools: builtInTools,
        customTools,
        sessionManager,
        settingsManager,
        // Skills and context files are injected through the system prompt above.
        skills: [],
        contextFiles: [],
        additionalExtensionPaths,
      }));
      if (!session) {
        throw new Error("Embedded agent session missing");
      }
      const activeSession = session;

      // Force a stable streamFn reference so vitest can reliably mock @mariozechner/pi-ai.
      activeSession.agent.streamFn = streamSimple;

      applyExtraParamsToAgent(activeSession.agent, params.config, params.provider, params.modelId);

      // Failures here propagate; the outer finally flushes pending tool results and
      // disposes the session exactly once (previously this path disposed it twice).
      const prior = await sanitizeSessionHistory({
        messages: activeSession.messages,
        modelApi: params.model.api,
        modelId: params.modelId,
        provider: params.provider,
        sessionManager,
        sessionId: params.sessionId,
      });
      const validatedGemini = validateGeminiTurns(prior);
      const validated = validateAnthropicTurns(validatedGemini);
      const limited = limitHistoryTurns(
        validated,
        getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),
      );
      if (limited.length > 0) {
        activeSession.agent.replaceMessages(limited);
      }

      let aborted = Boolean(params.abortSignal?.aborted);
      let timedOut = false;

      const getAbortReason = (signal: AbortSignal): unknown =>
        "reason" in signal ? (signal as { reason?: unknown }).reason : undefined;
      const makeTimeoutAbortReason = (): Error => {
        const err = new Error("request timed out");
        err.name = "TimeoutError";
        return err;
      };
      const makeAbortError = (signal: AbortSignal): Error => {
        const reason = getAbortReason(signal);
        const err = reason ? new Error("aborted", { cause: reason }) : new Error("aborted");
        err.name = "AbortError";
        return err;
      };
      // Marks the run aborted, aborts the shared controller (with a TimeoutError reason
      // on timeout unless one is supplied) and asks the session to stop.
      const abortRun = (isTimeout = false, reason?: unknown) => {
        aborted = true;
        if (isTimeout) {
          timedOut = true;
          runAbortController.abort(reason ?? makeTimeoutAbortReason());
        } else {
          runAbortController.abort(reason);
        }
        void activeSession.abort();
      };
      // Settles with `promise`, unless runAbortController aborts first, in which case it
      // rejects with an AbortError. The abort listener is removed either way.
      const abortable = <T>(promise: Promise<T>): Promise<T> => {
        const signal = runAbortController.signal;
        if (signal.aborted) {
          return Promise.reject(makeAbortError(signal));
        }
        return new Promise<T>((resolve, reject) => {
          const onAbort = () => {
            signal.removeEventListener("abort", onAbort);
            reject(makeAbortError(signal));
          };
          signal.addEventListener("abort", onAbort, { once: true });
          promise.then(
            (value) => {
              signal.removeEventListener("abort", onAbort);
              resolve(value);
            },
            (err) => {
              signal.removeEventListener("abort", onAbort);
              reject(err);
            },
          );
        });
      };

      const subscription = subscribeEmbeddedPiSession({
        session: activeSession,
        runId: params.runId,
        verboseLevel: params.verboseLevel,
        reasoningMode: params.reasoningLevel ?? "off",
        toolResultFormat: params.toolResultFormat,
        shouldEmitToolResult: params.shouldEmitToolResult,
        shouldEmitToolOutput: params.shouldEmitToolOutput,
        onToolResult: params.onToolResult,
        onReasoningStream: params.onReasoningStream,
        onBlockReply: params.onBlockReply,
        onBlockReplyFlush: params.onBlockReplyFlush,
        blockReplyBreak: params.blockReplyBreak,
        blockReplyChunking: params.blockReplyChunking,
        onPartialReply: params.onPartialReply,
        onAssistantMessageStart: params.onAssistantMessageStart,
        onAgentEvent: params.onAgentEvent,
        enforceFinalTag: params.enforceFinalTag,
      });

      const {
        assistantTexts,
        toolMetas,
        unsubscribe,
        waitForCompactionRetry,
        getMessagingToolSentTexts,
        getMessagingToolSentTargets,
        didSendViaMessagingTool,
      } = subscription;

      // Handle registered for this session so other code can steer or abort the run.
      const queueHandle: EmbeddedPiQueueHandle = {
        queueMessage: async (text: string) => {
          await activeSession.steer(text);
        },
        isStreaming: () => activeSession.isStreaming,
        isCompacting: () => subscription.isCompacting(),
        abort: abortRun,
      };
      setActiveEmbeddedRun(params.sessionId, queueHandle);

      let abortWarnTimer: NodeJS.Timeout | undefined;
      const abortTimer = setTimeout(
        () => {
          log.warn(
            `embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`,
          );
          abortRun(true);
          if (!abortWarnTimer) {
            // Warn once if the session is still streaming 10s after the timeout abort.
            abortWarnTimer = setTimeout(() => {
              if (!activeSession.isStreaming) return;
              log.warn(
                `embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`,
              );
            }, 10_000);
          }
        },
        // Clamp to Node's maximum timer delay: larger values would fire after ~1 ms.
        Math.min(Math.max(1, params.timeoutMs), MAX_TIMER_DELAY_MS),
      );

      let messagesSnapshot: AgentMessage[] = [];
      let sessionIdUsed = activeSession.sessionId;
      // Forwards the caller's abort, treating it as a timeout when its reason is one.
      const onAbort = () => {
        const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined;
        const timeout = reason ? isTimeoutError(reason) : false;
        abortRun(timeout, reason);
      };
      if (params.abortSignal) {
        if (params.abortSignal.aborted) {
          onAbort();
        } else {
          params.abortSignal.addEventListener("abort", onAbort, {
            once: true,
          });
        }
      }

      // Get hook runner once for both before_agent_start and agent_end hooks
      const hookRunner = getGlobalHookRunner();
      // Hooks receive the agent id resolved by resolveSessionAgentIds (the same id that
      // gates the heartbeat prompt) instead of the first ":"-segment of the session key,
      // which presumably is a key prefix rather than the agent id — confirm key format.
      const hookAgentId = sessionAgentId;

      let promptError: unknown = null;
      try {
        const promptStartedAt = Date.now();

        // Run before_agent_start hooks to allow plugins to inject context
        let effectivePrompt = params.prompt;
        if (hookRunner?.hasHooks("before_agent_start")) {
          try {
            const hookResult = await hookRunner.runBeforeAgentStart(
              {
                prompt: params.prompt,
                messages: activeSession.messages,
              },
              {
                agentId: hookAgentId,
                sessionKey: params.sessionKey,
                workspaceDir: params.workspaceDir,
                messageProvider: params.messageProvider ?? undefined,
              },
            );
            if (hookResult?.prependContext) {
              effectivePrompt = `${hookResult.prependContext}\n\n${params.prompt}`;
              log.debug(
                `hooks: prepended context to prompt (${hookResult.prependContext.length} chars)`,
              );
            }
          } catch (hookErr) {
            // Hook failures are logged and do not fail the run.
            log.warn(`before_agent_start hook failed: ${String(hookErr)}`);
          }
        }

        log.debug(`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`);

        // Repair orphaned trailing user messages so new prompts don't violate role ordering.
        const leafEntry = sessionManager.getLeafEntry();
        if (leafEntry?.type === "message" && leafEntry.message.role === "user") {
          if (leafEntry.parentId) {
            sessionManager.branch(leafEntry.parentId);
          } else {
            sessionManager.resetLeaf();
          }
          const sessionContext = sessionManager.buildSessionContext();
          activeSession.agent.replaceMessages(sessionContext.messages);
          log.warn(
            `Removed orphaned user message to prevent consecutive user turns. ` +
              `runId=${params.runId} sessionId=${params.sessionId}`,
          );
        }

        try {
          // Detect and load images referenced in the prompt for vision-capable models.
          // This eliminates the need for an explicit "view" tool call by injecting
          // images directly into the prompt when the model supports it.
          // Also scans conversation history to enable follow-up questions about earlier images.
          const imageResult = await detectAndLoadPromptImages({
            prompt: effectivePrompt,
            workspaceDir: effectiveWorkspace,
            model: params.model,
            existingImages: params.images,
            historyMessages: activeSession.messages,
            maxBytes: MAX_IMAGE_BYTES,
            // Enforce sandbox path restrictions when sandbox is enabled
            sandboxRoot: sandbox?.enabled ? sandbox.workspaceDir : undefined,
          });

          // Inject history images into their original message positions (mutating the
          // session's message objects in place) so the model sees them in context,
          // e.g. "compare to the first image".
          if (imageResult.historyImagesByIndex.size > 0) {
            for (const [msgIndex, images] of imageResult.historyImagesByIndex) {
              // Bounds check: ensure index is valid before accessing
              if (msgIndex < 0 || msgIndex >= activeSession.messages.length) continue;
              const msg = activeSession.messages[msgIndex];
              if (msg && msg.role === "user") {
                // Convert string content to array format if needed
                if (typeof msg.content === "string") {
                  msg.content = [{ type: "text", text: msg.content }];
                }
                if (Array.isArray(msg.content)) {
                  // Image payloads already present, so repeated turns don't re-add them.
                  const existingImageData = new Set(
                    msg.content
                      .filter(
                        (c): c is ImageContent =>
                          c != null &&
                          typeof c === "object" &&
                          c.type === "image" &&
                          typeof c.data === "string",
                      )
                      .map((c) => c.data),
                  );
                  for (const img of images) {
                    if (!existingImageData.has(img.data)) {
                      msg.content.push(img);
                      // Also dedupe within this batch.
                      existingImageData.add(img.data);
                    }
                  }
                }
              }
            }
          }

          // Only pass images option if there are actually images to pass
          // This avoids potential issues with models that don't expect the images parameter
          if (imageResult.images.length > 0) {
            await abortable(activeSession.prompt(effectivePrompt, { images: imageResult.images }));
          } else {
            await abortable(activeSession.prompt(effectivePrompt));
          }
        } catch (err) {
          // Prompt failures (including aborts/timeouts) are reported, not thrown.
          promptError = err;
        } finally {
          log.debug(
            `embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`,
          );
        }

        try {
          await waitForCompactionRetry();
        } catch (err) {
          if (isAbortError(err)) {
            if (!promptError) promptError = err;
          } else {
            throw err;
          }
        }

        messagesSnapshot = activeSession.messages.slice();
        sessionIdUsed = activeSession.sessionId;

        // Run agent_end hooks to allow plugins to analyze the conversation.
        // Fire-and-forget: not awaited; failures are only logged.
        if (hookRunner?.hasHooks("agent_end")) {
          void hookRunner
            .runAgentEnd(
              {
                messages: messagesSnapshot,
                success: !aborted && !promptError,
                error: promptError ? describeUnknownError(promptError) : undefined,
                durationMs: Date.now() - promptStartedAt,
              },
              {
                agentId: hookAgentId,
                sessionKey: params.sessionKey,
                workspaceDir: params.workspaceDir,
                messageProvider: params.messageProvider ?? undefined,
              },
            )
            .catch((err) => {
              log.warn(`agent_end hook failed: ${String(err)}`);
            });
        }
      } finally {
        clearTimeout(abortTimer);
        if (abortWarnTimer) clearTimeout(abortWarnTimer);
        unsubscribe();
        clearActiveEmbeddedRun(params.sessionId, queueHandle);
        params.abortSignal?.removeEventListener?.("abort", onAbort);
      }

      const lastAssistant = messagesSnapshot
        .slice()
        .reverse()
        .find((m) => (m as AgentMessage)?.role === "assistant") as AssistantMessage | undefined;

      // Keep only tool metas with a non-blank tool name.
      const toolMetasNormalized = toolMetas
        .filter(
          (entry): entry is { toolName: string; meta?: string } =>
            typeof entry.toolName === "string" && entry.toolName.trim().length > 0,
        )
        .map((entry) => ({ toolName: entry.toolName, meta: entry.meta }));

      return {
        aborted,
        timedOut,
        promptError,
        sessionIdUsed,
        systemPromptReport,
        messagesSnapshot,
        assistantTexts,
        toolMetas: toolMetasNormalized,
        lastAssistant,
        didSendViaMessagingTool: didSendViaMessagingTool(),
        messagingToolSentTexts: getMessagingToolSentTexts(),
        messagingToolSentTargets: getMessagingToolSentTargets(),
        cloudCodeAssistFormatError: Boolean(
          lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage),
        ),
      };
    } finally {
      // Always tear down the session (and release the lock) before we leave this attempt.
      sessionManager?.flushPendingToolResults?.();
      session?.dispose();
      await sessionLock.release();
    }
  } finally {
    restoreSkillEnv?.();
    process.chdir(prevCwd);
  }
}