Files
clawdbot/src/agents/pi-embedded-runner/run/attempt.ts
2026-01-15 01:06:35 +00:00

477 lines
17 KiB
TypeScript

import fs from "node:fs/promises";
import os from "node:os";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { streamSimple } from "@mariozechner/pi-ai";
import { createAgentSession, SessionManager, SettingsManager } from "@mariozechner/pi-coding-agent";
import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js";
import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js";
import { getMachineDisplayName } from "../../../infra/machine-name.js";
import { normalizeMessageChannel } from "../../../utils/message-channel.js";
import { isReasoningTagProvider } from "../../../utils/provider-utils.js";
import { resolveUserPath } from "../../../utils.js";
import { resolveClawdbotAgentDir } from "../../agent-paths.js";
import { resolveSessionAgentIds } from "../../agent-scope.js";
import { resolveModelAuthMode } from "../../model-auth.js";
import {
buildBootstrapContextFiles,
isCloudCodeAssistFormatError,
resolveBootstrapMaxChars,
validateAnthropicTurns,
validateGeminiTurns,
} from "../../pi-embedded-helpers.js";
import { subscribeEmbeddedPiSession } from "../../pi-embedded-subscribe.js";
import {
ensurePiCompactionReserveTokens,
resolveCompactionReserveTokensFloor,
} from "../../pi-settings.js";
import { createClawdbotCodingTools } from "../../pi-tools.js";
import { resolveSandboxContext } from "../../sandbox.js";
import { guardSessionManager } from "../../session-tool-result-guard-wrapper.js";
import { acquireSessionWriteLock } from "../../session-write-lock.js";
import {
applySkillEnvOverrides,
applySkillEnvOverridesFromSnapshot,
loadWorkspaceSkillEntries,
resolveSkillsPromptForRun,
} from "../../skills.js";
import { buildSystemPromptReport } from "../../system-prompt-report.js";
import { filterBootstrapFilesForSession, loadWorkspaceBootstrapFiles } from "../../workspace.js";
import { isAbortError } from "../abort.js";
import { buildEmbeddedExtensionPaths } from "../extensions.js";
import { applyExtraParamsToAgent } from "../extra-params.js";
import { logToolSchemasForGoogle, sanitizeSessionHistory } from "../google.js";
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "../history.js";
import { log } from "../logger.js";
import { buildModelAliasLines } from "../model.js";
import {
clearActiveEmbeddedRun,
type EmbeddedPiQueueHandle,
setActiveEmbeddedRun,
} from "../runs.js";
import { buildEmbeddedSandboxInfo } from "../sandbox-info.js";
import { prewarmSessionFile, trackSessionManagerAccess } from "../session-manager-cache.js";
import { prepareSessionManagerForRun } from "../session-manager-init.js";
import { buildEmbeddedSystemPrompt, createSystemPromptOverride } from "../system-prompt.js";
import { splitSdkTools } from "../tool-split.js";
import {
formatUserTime,
mapThinkingLevel,
resolveExecToolDefaults,
resolveUserTimezone,
} from "../utils.js";
import { resolveSandboxRuntimeStatus } from "../../sandbox/runtime-status.js";
import type { EmbeddedRunAttemptParams, EmbeddedRunAttemptResult } from "./types.js";
export async function runEmbeddedAttempt(
params: EmbeddedRunAttemptParams,
): Promise<EmbeddedRunAttemptResult> {
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
const prevCwd = process.cwd();
const runAbortController = new AbortController();
log.debug(
`embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${params.provider} model=${params.modelId} thinking=${params.thinkLevel} messageChannel=${params.messageChannel ?? params.messageProvider ?? "unknown"}`,
);
await fs.mkdir(resolvedWorkspace, { recursive: true });
const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId;
const sandbox = await resolveSandboxContext({
config: params.config,
sessionKey: sandboxSessionKey,
workspaceDir: resolvedWorkspace,
});
const effectiveWorkspace = sandbox?.enabled
? sandbox.workspaceAccess === "rw"
? resolvedWorkspace
: sandbox.workspaceDir
: resolvedWorkspace;
await fs.mkdir(effectiveWorkspace, { recursive: true });
let restoreSkillEnv: (() => void) | undefined;
process.chdir(effectiveWorkspace);
try {
const shouldLoadSkillEntries = !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
const skillEntries = shouldLoadSkillEntries
? loadWorkspaceSkillEntries(effectiveWorkspace)
: [];
restoreSkillEnv = params.skillsSnapshot
? applySkillEnvOverridesFromSnapshot({
snapshot: params.skillsSnapshot,
config: params.config,
})
: applySkillEnvOverrides({
skills: skillEntries ?? [],
config: params.config,
});
const skillsPrompt = resolveSkillsPromptForRun({
skillsSnapshot: params.skillsSnapshot,
entries: shouldLoadSkillEntries ? skillEntries : undefined,
config: params.config,
workspaceDir: effectiveWorkspace,
});
const bootstrapFiles = filterBootstrapFilesForSession(
await loadWorkspaceBootstrapFiles(effectiveWorkspace),
params.sessionKey ?? params.sessionId,
);
const sessionLabel = params.sessionKey ?? params.sessionId;
const contextFiles = buildBootstrapContextFiles(bootstrapFiles, {
maxChars: resolveBootstrapMaxChars(params.config),
warn: (message) => log.warn(`${message} (sessionKey=${sessionLabel})`),
});
const agentDir = params.agentDir ?? resolveClawdbotAgentDir();
const tools = createClawdbotCodingTools({
exec: {
...resolveExecToolDefaults(params.config),
elevated: params.bashElevated,
},
sandbox,
messageProvider: params.messageChannel ?? params.messageProvider,
agentAccountId: params.agentAccountId,
sessionKey: params.sessionKey ?? params.sessionId,
agentDir,
workspaceDir: effectiveWorkspace,
config: params.config,
abortSignal: runAbortController.signal,
modelProvider: params.model.provider,
modelId: params.modelId,
modelAuthMode: resolveModelAuthMode(params.model.provider, params.config),
currentChannelId: params.currentChannelId,
currentThreadTs: params.currentThreadTs,
replyToMode: params.replyToMode,
hasRepliedRef: params.hasRepliedRef,
});
logToolSchemasForGoogle({ tools, provider: params.provider });
const machineName = await getMachineDisplayName();
const runtimeChannel = normalizeMessageChannel(params.messageChannel ?? params.messageProvider);
const runtimeCapabilities = runtimeChannel
? (resolveChannelCapabilities({
cfg: params.config,
channel: runtimeChannel,
accountId: params.agentAccountId,
}) ?? [])
: undefined;
const runtimeInfo = {
host: machineName,
os: `${os.type()} ${os.release()}`,
arch: os.arch(),
node: process.version,
model: `${params.provider}/${params.modelId}`,
channel: runtimeChannel,
capabilities: runtimeCapabilities,
};
const sandboxInfo = buildEmbeddedSandboxInfo(sandbox, params.bashElevated);
const reasoningTagHint = isReasoningTagProvider(params.provider);
const userTimezone = resolveUserTimezone(params.config?.agents?.defaults?.userTimezone);
const userTime = formatUserTime(new Date(), userTimezone);
const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({
sessionKey: params.sessionKey,
config: params.config,
});
const isDefaultAgent = sessionAgentId === defaultAgentId;
const appendPrompt = buildEmbeddedSystemPrompt({
workspaceDir: effectiveWorkspace,
defaultThinkLevel: params.thinkLevel,
reasoningLevel: params.reasoningLevel ?? "off",
extraSystemPrompt: params.extraSystemPrompt,
ownerNumbers: params.ownerNumbers,
reasoningTagHint,
heartbeatPrompt: isDefaultAgent
? resolveHeartbeatPrompt(params.config?.agents?.defaults?.heartbeat?.prompt)
: undefined,
skillsPrompt,
runtimeInfo,
sandboxInfo,
tools,
modelAliasLines: buildModelAliasLines(params.config),
userTimezone,
userTime,
contextFiles,
});
const systemPromptReport = buildSystemPromptReport({
source: "run",
generatedAt: Date.now(),
sessionId: params.sessionId,
sessionKey: params.sessionKey,
provider: params.provider,
model: params.modelId,
workspaceDir: effectiveWorkspace,
bootstrapMaxChars: resolveBootstrapMaxChars(params.config),
sandbox: (() => {
const runtime = resolveSandboxRuntimeStatus({
cfg: params.config,
sessionKey: params.sessionKey ?? params.sessionId,
});
return { mode: runtime.mode, sandboxed: runtime.sandboxed };
})(),
systemPrompt: appendPrompt,
bootstrapFiles,
injectedFiles: contextFiles,
skillsPrompt,
tools,
});
const systemPrompt = createSystemPromptOverride(appendPrompt);
const sessionLock = await acquireSessionWriteLock({
sessionFile: params.sessionFile,
});
let sessionManager: ReturnType<typeof guardSessionManager> | undefined;
let session: Awaited<ReturnType<typeof createAgentSession>>["session"] | undefined;
try {
const hadSessionFile = await fs
.stat(params.sessionFile)
.then(() => true)
.catch(() => false);
await prewarmSessionFile(params.sessionFile);
sessionManager = guardSessionManager(SessionManager.open(params.sessionFile));
trackSessionManagerAccess(params.sessionFile);
await prepareSessionManagerForRun({
sessionManager,
sessionFile: params.sessionFile,
hadSessionFile,
sessionId: params.sessionId,
cwd: effectiveWorkspace,
});
const settingsManager = SettingsManager.create(effectiveWorkspace, agentDir);
ensurePiCompactionReserveTokens({
settingsManager,
minReserveTokens: resolveCompactionReserveTokensFloor(params.config),
});
const additionalExtensionPaths = buildEmbeddedExtensionPaths({
cfg: params.config,
sessionManager,
provider: params.provider,
modelId: params.modelId,
model: params.model,
});
const { builtInTools, customTools } = splitSdkTools({
tools,
sandboxEnabled: !!sandbox?.enabled,
});
({ session } = await createAgentSession({
cwd: resolvedWorkspace,
agentDir,
authStorage: params.authStorage,
modelRegistry: params.modelRegistry,
model: params.model,
thinkingLevel: mapThinkingLevel(params.thinkLevel),
systemPrompt,
tools: builtInTools,
customTools,
sessionManager,
settingsManager,
skills: [],
contextFiles: [],
additionalExtensionPaths,
}));
if (!session) {
throw new Error("Embedded agent session missing");
}
const activeSession = session;
// Force a stable streamFn reference so vitest can reliably mock @mariozechner/pi-ai.
activeSession.agent.streamFn = streamSimple;
applyExtraParamsToAgent(
activeSession.agent,
params.config,
params.provider,
params.modelId,
params.thinkLevel,
);
try {
const prior = await sanitizeSessionHistory({
messages: activeSession.messages,
modelApi: params.model.api,
sessionManager,
sessionId: params.sessionId,
});
const validatedGemini = validateGeminiTurns(prior);
const validated = validateAnthropicTurns(validatedGemini);
const limited = limitHistoryTurns(
validated,
getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),
);
if (limited.length > 0) {
activeSession.agent.replaceMessages(limited);
}
} catch (err) {
sessionManager.flushPendingToolResults?.();
activeSession.dispose();
throw err;
}
let aborted = Boolean(params.abortSignal?.aborted);
let timedOut = false;
const abortRun = (isTimeout = false) => {
aborted = true;
if (isTimeout) timedOut = true;
runAbortController.abort();
void activeSession.abort();
};
const subscription = subscribeEmbeddedPiSession({
session: activeSession,
runId: params.runId,
verboseLevel: params.verboseLevel,
reasoningMode: params.reasoningLevel ?? "off",
shouldEmitToolResult: params.shouldEmitToolResult,
onToolResult: params.onToolResult,
onReasoningStream: params.onReasoningStream,
onBlockReply: params.onBlockReply,
onBlockReplyFlush: params.onBlockReplyFlush,
blockReplyBreak: params.blockReplyBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: params.onPartialReply,
onAssistantMessageStart: params.onAssistantMessageStart,
onAgentEvent: params.onAgentEvent,
enforceFinalTag: params.enforceFinalTag,
});
const {
assistantTexts,
toolMetas,
unsubscribe,
waitForCompactionRetry,
getMessagingToolSentTexts,
getMessagingToolSentTargets,
didSendViaMessagingTool,
} = subscription;
const queueHandle: EmbeddedPiQueueHandle = {
queueMessage: async (text: string) => {
await activeSession.steer(text);
},
isStreaming: () => activeSession.isStreaming,
isCompacting: () => subscription.isCompacting(),
abort: abortRun,
};
setActiveEmbeddedRun(params.sessionId, queueHandle);
let abortWarnTimer: NodeJS.Timeout | undefined;
const abortTimer = setTimeout(
() => {
log.warn(
`embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`,
);
abortRun(true);
if (!abortWarnTimer) {
abortWarnTimer = setTimeout(() => {
if (!activeSession.isStreaming) return;
log.warn(
`embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`,
);
}, 10_000);
}
},
Math.max(1, params.timeoutMs),
);
let messagesSnapshot: AgentMessage[] = [];
let sessionIdUsed = activeSession.sessionId;
const onAbort = () => abortRun();
if (params.abortSignal) {
if (params.abortSignal.aborted) {
onAbort();
} else {
params.abortSignal.addEventListener("abort", onAbort, {
once: true,
});
}
}
let promptError: unknown = null;
try {
const promptStartedAt = Date.now();
log.debug(`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`);
try {
await activeSession.prompt(params.prompt, { images: params.images });
} catch (err) {
promptError = err;
} finally {
log.debug(
`embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`,
);
}
try {
await waitForCompactionRetry();
} catch (err) {
if (isAbortError(err)) {
if (!promptError) promptError = err;
} else {
throw err;
}
}
messagesSnapshot = activeSession.messages.slice();
sessionIdUsed = activeSession.sessionId;
} finally {
clearTimeout(abortTimer);
if (abortWarnTimer) clearTimeout(abortWarnTimer);
unsubscribe();
clearActiveEmbeddedRun(params.sessionId, queueHandle);
params.abortSignal?.removeEventListener?.("abort", onAbort);
}
const lastAssistant = messagesSnapshot
.slice()
.reverse()
.find((m) => (m as AgentMessage)?.role === "assistant") as AssistantMessage | undefined;
const toolMetasNormalized = toolMetas
.filter(
(entry): entry is { toolName: string; meta?: string } =>
typeof entry.toolName === "string" && entry.toolName.trim().length > 0,
)
.map((entry) => ({ toolName: entry.toolName, meta: entry.meta }));
return {
aborted,
timedOut,
promptError,
sessionIdUsed,
systemPromptReport,
messagesSnapshot,
assistantTexts,
toolMetas: toolMetasNormalized,
lastAssistant,
didSendViaMessagingTool: didSendViaMessagingTool(),
messagingToolSentTexts: getMessagingToolSentTexts(),
messagingToolSentTargets: getMessagingToolSentTargets(),
cloudCodeAssistFormatError: Boolean(
lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage),
),
};
} finally {
// Always tear down the session (and release the lock) before we leave this attempt.
sessionManager?.flushPendingToolResults?.();
session?.dispose();
await sessionLock.release();
}
} finally {
restoreSkillEnv?.();
process.chdir(prevCwd);
}
}