Added a standalone cache tracing module and wired it into the embedded
runner so you can capture message flow and the exact context sent to
Anthropic in a separate JSONL file.
What changed
- New tracing module: src/agents/cache-trace.ts (self-contained, env-gated, writes JSONL, computes per-message digests).
- Hook points in src/agents/pi-embedded-runner/run/attempt.ts: logs stage snapshots (loaded/sanitized/limited/prompt/stream/after) and wraps the stream fn to record the real context.messages at send time.
How to enable
- CLAWDBOT_CACHE_TRACE=1 enables tracing.
- CLAWDBOT_CACHE_TRACE_FILE=~/.clawdbot/logs/cache-trace.jsonl overrides the output path (default is $CLAWDBOT_STATE_DIR/logs/cache-trace.jsonl).
- Optional filters:
  - CLAWDBOT_CACHE_TRACE_MESSAGES=0 to omit full messages (digests are still logged).
  - CLAWDBOT_CACHE_TRACE_PROMPT=0 to omit the prompt text.
  - CLAWDBOT_CACHE_TRACE_SYSTEM=0 to omit the system prompt.
What you’ll see
- One JSON object per line with stage, messagesDigest, per-message messageFingerprints, and the actual messages if enabled.
- The most important line is stage: "stream:context" — that is the exact payload pi-mono is sending. If it diverges from earlier stages, you've found the mutation point.
774 lines · 30 KiB · TypeScript
import fs from "node:fs/promises";
|
|
import os from "node:os";
|
|
|
|
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
|
import type { AssistantMessage, ImageContent } from "@mariozechner/pi-ai";
|
|
import { streamSimple } from "@mariozechner/pi-ai";
|
|
import { createAgentSession, SessionManager, SettingsManager } from "@mariozechner/pi-coding-agent";
|
|
|
|
import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js";
|
|
import { listChannelSupportedActions } from "../../channel-tools.js";
|
|
import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js";
|
|
import { getMachineDisplayName } from "../../../infra/machine-name.js";
|
|
import { resolveTelegramInlineButtonsScope } from "../../../telegram/inline-buttons.js";
|
|
import { resolveTelegramReactionLevel } from "../../../telegram/reaction-level.js";
|
|
import { normalizeMessageChannel } from "../../../utils/message-channel.js";
|
|
import { isReasoningTagProvider } from "../../../utils/provider-utils.js";
|
|
import { isSubagentSessionKey } from "../../../routing/session-key.js";
|
|
import { resolveUserPath } from "../../../utils.js";
|
|
import { createCacheTrace } from "../../cache-trace.js";
|
|
import { resolveClawdbotAgentDir } from "../../agent-paths.js";
|
|
import { resolveSessionAgentIds } from "../../agent-scope.js";
|
|
import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../../bootstrap-files.js";
|
|
import { resolveClawdbotDocsPath } from "../../docs-path.js";
|
|
import { resolveModelAuthMode } from "../../model-auth.js";
|
|
import {
|
|
isCloudCodeAssistFormatError,
|
|
resolveBootstrapMaxChars,
|
|
validateAnthropicTurns,
|
|
validateGeminiTurns,
|
|
} from "../../pi-embedded-helpers.js";
|
|
import { subscribeEmbeddedPiSession } from "../../pi-embedded-subscribe.js";
|
|
import {
|
|
ensurePiCompactionReserveTokens,
|
|
resolveCompactionReserveTokensFloor,
|
|
} from "../../pi-settings.js";
|
|
import { createClawdbotCodingTools } from "../../pi-tools.js";
|
|
import { resolveSandboxContext } from "../../sandbox.js";
|
|
import { guardSessionManager } from "../../session-tool-result-guard-wrapper.js";
|
|
import { acquireSessionWriteLock } from "../../session-write-lock.js";
|
|
import {
|
|
applySkillEnvOverrides,
|
|
applySkillEnvOverridesFromSnapshot,
|
|
loadWorkspaceSkillEntries,
|
|
resolveSkillsPromptForRun,
|
|
} from "../../skills.js";
|
|
import { buildSystemPromptReport } from "../../system-prompt-report.js";
|
|
import { resolveDefaultModelForAgent } from "../../model-selection.js";
|
|
|
|
import { isAbortError } from "../abort.js";
|
|
import { buildEmbeddedExtensionPaths } from "../extensions.js";
|
|
import { applyExtraParamsToAgent } from "../extra-params.js";
|
|
import {
|
|
logToolSchemasForGoogle,
|
|
sanitizeSessionHistory,
|
|
sanitizeToolsForGoogle,
|
|
} from "../google.js";
|
|
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "../history.js";
|
|
import { log } from "../logger.js";
|
|
import { buildModelAliasLines } from "../model.js";
|
|
import {
|
|
clearActiveEmbeddedRun,
|
|
type EmbeddedPiQueueHandle,
|
|
setActiveEmbeddedRun,
|
|
} from "../runs.js";
|
|
import { buildEmbeddedSandboxInfo } from "../sandbox-info.js";
|
|
import { prewarmSessionFile, trackSessionManagerAccess } from "../session-manager-cache.js";
|
|
import { prepareSessionManagerForRun } from "../session-manager-init.js";
|
|
import { buildEmbeddedSystemPrompt, createSystemPromptOverride } from "../system-prompt.js";
|
|
import { splitSdkTools } from "../tool-split.js";
|
|
import { toClientToolDefinitions } from "../../pi-tool-definition-adapter.js";
|
|
import { buildSystemPromptParams } from "../../system-prompt-params.js";
|
|
import { describeUnknownError, mapThinkingLevel } from "../utils.js";
|
|
import { resolveSandboxRuntimeStatus } from "../../sandbox/runtime-status.js";
|
|
import { isTimeoutError } from "../../failover-error.js";
|
|
import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js";
|
|
import { MAX_IMAGE_BYTES } from "../../../media/constants.js";
|
|
import type { EmbeddedRunAttemptParams, EmbeddedRunAttemptResult } from "./types.js";
|
|
import { detectAndLoadPromptImages } from "./images.js";
|
|
|
|
/**
 * Runs a single embedded pi-agent attempt for one prompt.
 *
 * Steps, in order:
 * 1. Resolve the workspace (and the sandbox workspace when a sandbox is enabled),
 *    create it, and `chdir` into it for the duration of the attempt.
 * 2. Apply skill env overrides, build the tool set and the embedded system
 *    prompt, and compute a `systemPromptReport` for the result.
 * 3. Acquire the session write lock, open/prepare the session file, create the
 *    agent session, then sanitize, turn-validate and limit its history.
 * 4. Subscribe to session events, run `before_agent_start` hooks, inject
 *    prompt/history images, and send the prompt under the run abort controller
 *    (fired by the timeout or by the caller's `abortSignal`).
 * 5. Fire `agent_end` hooks (not awaited) and assemble the attempt result.
 *
 * When `createCacheTrace` returns a tracer, message snapshots are recorded at
 * each stage and the agent's stream function is wrapped by the tracer.
 *
 * Cleanup: pending tool results are flushed, the session is disposed and the
 * session lock released before returning; skill env overrides and the process
 * cwd are restored in every case.
 *
 * @param params - attempt configuration (model, session, prompt, callbacks, ...).
 * @returns the attempt outcome: abort/timeout flags, prompt error, message
 *   snapshot, assistant texts, tool metadata and messaging-tool side effects.
 * @throws setup failures (workspace, lock, session creation, history
 *   sanitization) and non-abort errors from `waitForCompactionRetry`. Prompt
 *   failures are not thrown; they are returned as `promptError`.
 */
export async function runEmbeddedAttempt(
  params: EmbeddedRunAttemptParams,
): Promise<EmbeddedRunAttemptResult> {
  const resolvedWorkspace = resolveUserPath(params.workspaceDir);
  const prevCwd = process.cwd();
  const runAbortController = new AbortController();

  log.debug(
    `embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${params.provider} model=${params.modelId} thinking=${params.thinkLevel} messageChannel=${params.messageChannel ?? params.messageProvider ?? "unknown"}`,
  );

  await fs.mkdir(resolvedWorkspace, { recursive: true });

  const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId;
  const sandbox = await resolveSandboxContext({
    config: params.config,
    sessionKey: sandboxSessionKey,
    workspaceDir: resolvedWorkspace,
  });
  // A sandbox with "rw" workspace access works in the user's workspace directly;
  // any other enabled sandbox uses its own workspace directory.
  const effectiveWorkspace = sandbox?.enabled
    ? sandbox.workspaceAccess === "rw"
      ? resolvedWorkspace
      : sandbox.workspaceDir
    : resolvedWorkspace;
  await fs.mkdir(effectiveWorkspace, { recursive: true });

  let restoreSkillEnv: (() => void) | undefined;
  process.chdir(effectiveWorkspace);
  try {
    // Only scan the workspace for skills when the snapshot has no resolved skills.
    const shouldLoadSkillEntries = !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
    const skillEntries = shouldLoadSkillEntries
      ? loadWorkspaceSkillEntries(effectiveWorkspace)
      : [];
    restoreSkillEnv = params.skillsSnapshot
      ? applySkillEnvOverridesFromSnapshot({
          snapshot: params.skillsSnapshot,
          config: params.config,
        })
      : applySkillEnvOverrides({
          skills: skillEntries ?? [],
          config: params.config,
        });

    const skillsPrompt = resolveSkillsPromptForRun({
      skillsSnapshot: params.skillsSnapshot,
      entries: shouldLoadSkillEntries ? skillEntries : undefined,
      config: params.config,
      workspaceDir: effectiveWorkspace,
    });

    const sessionLabel = params.sessionKey ?? params.sessionId;
    const { bootstrapFiles: hookAdjustedBootstrapFiles, contextFiles } =
      await resolveBootstrapContextForRun({
        workspaceDir: effectiveWorkspace,
        config: params.config,
        sessionKey: params.sessionKey,
        sessionId: params.sessionId,
        warn: makeBootstrapWarn({ sessionLabel, warn: (message) => log.warn(message) }),
      });

    const agentDir = params.agentDir ?? resolveClawdbotAgentDir();

    // Check if the model supports native image input
    const modelHasVision = params.model.input?.includes("image") ?? false;
    const toolsRaw = createClawdbotCodingTools({
      exec: {
        ...params.execOverrides,
        elevated: params.bashElevated,
      },
      sandbox,
      messageProvider: params.messageChannel ?? params.messageProvider,
      agentAccountId: params.agentAccountId,
      messageTo: params.messageTo,
      messageThreadId: params.messageThreadId,
      sessionKey: params.sessionKey ?? params.sessionId,
      agentDir,
      workspaceDir: effectiveWorkspace,
      config: params.config,
      abortSignal: runAbortController.signal,
      modelProvider: params.model.provider,
      modelId: params.modelId,
      modelAuthMode: resolveModelAuthMode(params.model.provider, params.config),
      currentChannelId: params.currentChannelId,
      currentThreadTs: params.currentThreadTs,
      replyToMode: params.replyToMode,
      hasRepliedRef: params.hasRepliedRef,
      modelHasVision,
    });
    const tools = sanitizeToolsForGoogle({ tools: toolsRaw, provider: params.provider });
    logToolSchemasForGoogle({ tools, provider: params.provider });

    const machineName = await getMachineDisplayName();
    const runtimeChannel = normalizeMessageChannel(params.messageChannel ?? params.messageProvider);
    let runtimeCapabilities = runtimeChannel
      ? (resolveChannelCapabilities({
          cfg: params.config,
          channel: runtimeChannel,
          accountId: params.agentAccountId,
        }) ?? [])
      : undefined;
    // Telegram: advertise inline buttons whenever their scope is not "off".
    if (runtimeChannel === "telegram" && params.config) {
      const inlineButtonsScope = resolveTelegramInlineButtonsScope({
        cfg: params.config,
        accountId: params.agentAccountId ?? undefined,
      });
      if (inlineButtonsScope !== "off") {
        if (!runtimeCapabilities) runtimeCapabilities = [];
        if (
          !runtimeCapabilities.some((cap) => String(cap).trim().toLowerCase() === "inlinebuttons")
        ) {
          runtimeCapabilities.push("inlineButtons");
        }
      }
    }
    const reactionGuidance =
      runtimeChannel === "telegram" && params.config
        ? (() => {
            const resolved = resolveTelegramReactionLevel({
              cfg: params.config,
              accountId: params.agentAccountId ?? undefined,
            });
            const level = resolved.agentReactionGuidance;
            return level ? { level, channel: "Telegram" } : undefined;
          })()
        : undefined;
    const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({
      sessionKey: params.sessionKey,
      config: params.config,
    });
    const sandboxInfo = buildEmbeddedSandboxInfo(sandbox, params.bashElevated);
    const reasoningTagHint = isReasoningTagProvider(params.provider);
    // Resolve channel-specific message actions for system prompt
    const channelActions = runtimeChannel
      ? listChannelSupportedActions({
          cfg: params.config,
          channel: runtimeChannel,
        })
      : undefined;

    const defaultModelRef = resolveDefaultModelForAgent({
      cfg: params.config ?? {},
      agentId: sessionAgentId,
    });
    const defaultModelLabel = `${defaultModelRef.provider}/${defaultModelRef.model}`;
    const { runtimeInfo, userTimezone, userTime, userTimeFormat } = buildSystemPromptParams({
      config: params.config,
      agentId: sessionAgentId,
      runtime: {
        host: machineName,
        os: `${os.type()} ${os.release()}`,
        arch: os.arch(),
        node: process.version,
        model: `${params.provider}/${params.modelId}`,
        defaultModel: defaultModelLabel,
        channel: runtimeChannel,
        capabilities: runtimeCapabilities,
        channelActions,
      },
    });
    const isDefaultAgent = sessionAgentId === defaultAgentId;
    const promptMode = isSubagentSessionKey(params.sessionKey) ? "minimal" : "full";
    const docsPath = await resolveClawdbotDocsPath({
      workspaceDir: effectiveWorkspace,
      argv1: process.argv[1],
      cwd: process.cwd(),
      moduleUrl: import.meta.url,
    });

    const appendPrompt = buildEmbeddedSystemPrompt({
      workspaceDir: effectiveWorkspace,
      defaultThinkLevel: params.thinkLevel,
      reasoningLevel: params.reasoningLevel ?? "off",
      extraSystemPrompt: params.extraSystemPrompt,
      ownerNumbers: params.ownerNumbers,
      reasoningTagHint,
      // Only the default agent gets the heartbeat prompt.
      heartbeatPrompt: isDefaultAgent
        ? resolveHeartbeatPrompt(params.config?.agents?.defaults?.heartbeat?.prompt)
        : undefined,
      skillsPrompt,
      docsPath: docsPath ?? undefined,
      reactionGuidance,
      promptMode,
      runtimeInfo,
      sandboxInfo,
      tools,
      modelAliasLines: buildModelAliasLines(params.config),
      userTimezone,
      userTime,
      userTimeFormat,
      contextFiles,
    });
    const systemPromptReport = buildSystemPromptReport({
      source: "run",
      generatedAt: Date.now(),
      sessionId: params.sessionId,
      sessionKey: params.sessionKey,
      provider: params.provider,
      model: params.modelId,
      workspaceDir: effectiveWorkspace,
      bootstrapMaxChars: resolveBootstrapMaxChars(params.config),
      sandbox: (() => {
        const runtime = resolveSandboxRuntimeStatus({
          cfg: params.config,
          sessionKey: params.sessionKey ?? params.sessionId,
        });
        return { mode: runtime.mode, sandboxed: runtime.sandboxed };
      })(),
      systemPrompt: appendPrompt,
      bootstrapFiles: hookAdjustedBootstrapFiles,
      injectedFiles: contextFiles,
      skillsPrompt,
      tools,
    });
    const systemPrompt = createSystemPromptOverride(appendPrompt);

    const sessionLock = await acquireSessionWriteLock({
      sessionFile: params.sessionFile,
    });

    let sessionManager: ReturnType<typeof guardSessionManager> | undefined;
    let session: Awaited<ReturnType<typeof createAgentSession>>["session"] | undefined;
    try {
      const hadSessionFile = await fs
        .stat(params.sessionFile)
        .then(() => true)
        .catch(() => false);

      await prewarmSessionFile(params.sessionFile);
      sessionManager = guardSessionManager(SessionManager.open(params.sessionFile), {
        agentId: sessionAgentId,
        sessionKey: params.sessionKey,
      });
      trackSessionManagerAccess(params.sessionFile);

      await prepareSessionManagerForRun({
        sessionManager,
        sessionFile: params.sessionFile,
        hadSessionFile,
        sessionId: params.sessionId,
        cwd: effectiveWorkspace,
      });

      const settingsManager = SettingsManager.create(effectiveWorkspace, agentDir);
      ensurePiCompactionReserveTokens({
        settingsManager,
        minReserveTokens: resolveCompactionReserveTokensFloor(params.config),
      });

      const additionalExtensionPaths = buildEmbeddedExtensionPaths({
        cfg: params.config,
        sessionManager,
        provider: params.provider,
        modelId: params.modelId,
        model: params.model,
      });

      const { builtInTools, customTools } = splitSdkTools({
        tools,
        sandboxEnabled: !!sandbox?.enabled,
      });

      // Add client tools (OpenResponses hosted tools) to customTools. Invoking one
      // only records the call; it is surfaced to the caller as `clientToolCall`.
      let clientToolCallDetected: { name: string; params: Record<string, unknown> } | null = null;
      const clientToolDefs = params.clientTools
        ? toClientToolDefinitions(params.clientTools, (toolName, toolParams) => {
            clientToolCallDetected = { name: toolName, params: toolParams };
          })
        : [];

      const allCustomTools = [...customTools, ...clientToolDefs];

      ({ session } = await createAgentSession({
        cwd: resolvedWorkspace,
        agentDir,
        authStorage: params.authStorage,
        modelRegistry: params.modelRegistry,
        model: params.model,
        thinkingLevel: mapThinkingLevel(params.thinkLevel),
        systemPrompt,
        tools: builtInTools,
        customTools: allCustomTools,
        sessionManager,
        settingsManager,
        skills: [],
        contextFiles: [],
        additionalExtensionPaths,
      }));
      if (!session) {
        throw new Error("Embedded agent session missing");
      }
      const activeSession = session;
      const cacheTrace = createCacheTrace({
        cfg: params.config,
        env: process.env,
        runId: params.runId,
        sessionId: activeSession.sessionId,
        sessionKey: params.sessionKey,
        provider: params.provider,
        modelId: params.modelId,
        modelApi: params.model.api,
        workspaceDir: params.workspaceDir,
      });

      // Force a stable streamFn reference so vitest can reliably mock @mariozechner/pi-ai.
      activeSession.agent.streamFn = streamSimple;

      applyExtraParamsToAgent(
        activeSession.agent,
        params.config,
        params.provider,
        params.modelId,
        params.streamParams,
      );

      if (cacheTrace) {
        cacheTrace.recordStage("session:loaded", {
          messages: activeSession.messages,
          // Record the system prompt text itself. `systemPrompt` is the override
          // built for createAgentSession and may not be plain text.
          // NOTE(review): confirm createSystemPromptOverride returns a function wrapper.
          system: appendPrompt,
          note: "after session create",
        });
        // Wrapped last so the tracer sees the stream function actually used for sends.
        activeSession.agent.streamFn = cacheTrace.wrapStreamFn(activeSession.agent.streamFn);
      }

      // Normalize persisted history before prompting. A failure here propagates;
      // the enclosing finally flushes pending tool results and disposes the
      // session exactly once.
      const prior = await sanitizeSessionHistory({
        messages: activeSession.messages,
        modelApi: params.model.api,
        modelId: params.modelId,
        provider: params.provider,
        sessionManager,
        sessionId: params.sessionId,
      });
      cacheTrace?.recordStage("session:sanitized", { messages: prior });
      const validatedGemini = validateGeminiTurns(prior);
      const validated = validateAnthropicTurns(validatedGemini);
      const limited = limitHistoryTurns(
        validated,
        getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),
      );
      cacheTrace?.recordStage("session:limited", { messages: limited });
      if (limited.length > 0) {
        activeSession.agent.replaceMessages(limited);
      }

      let aborted = Boolean(params.abortSignal?.aborted);
      let timedOut = false;
      const getAbortReason = (signal: AbortSignal): unknown =>
        "reason" in signal ? (signal as { reason?: unknown }).reason : undefined;
      const makeTimeoutAbortReason = (): Error => {
        const err = new Error("request timed out");
        err.name = "TimeoutError";
        return err;
      };
      const makeAbortError = (signal: AbortSignal): Error => {
        const reason = getAbortReason(signal);
        const err = reason ? new Error("aborted", { cause: reason }) : new Error("aborted");
        err.name = "AbortError";
        return err;
      };
      // Marks the run aborted, fires the run controller (with a TimeoutError reason
      // for timeouts when none is given) and asks the session to stop.
      const abortRun = (isTimeout = false, reason?: unknown) => {
        aborted = true;
        if (isTimeout) {
          timedOut = true;
          runAbortController.abort(reason ?? makeTimeoutAbortReason());
        } else {
          runAbortController.abort(reason);
        }
        void activeSession.abort();
      };
      // Races `promise` against the run abort signal, rejecting with an AbortError
      // as soon as the run is aborted.
      const abortable = <T>(promise: Promise<T>): Promise<T> => {
        const signal = runAbortController.signal;
        if (signal.aborted) {
          return Promise.reject(makeAbortError(signal));
        }
        return new Promise<T>((resolve, reject) => {
          const onAbort = () => {
            signal.removeEventListener("abort", onAbort);
            reject(makeAbortError(signal));
          };
          signal.addEventListener("abort", onAbort, { once: true });
          promise.then(
            (value) => {
              signal.removeEventListener("abort", onAbort);
              resolve(value);
            },
            (err) => {
              signal.removeEventListener("abort", onAbort);
              reject(err);
            },
          );
        });
      };

      const subscription = subscribeEmbeddedPiSession({
        session: activeSession,
        runId: params.runId,
        verboseLevel: params.verboseLevel,
        reasoningMode: params.reasoningLevel ?? "off",
        toolResultFormat: params.toolResultFormat,
        shouldEmitToolResult: params.shouldEmitToolResult,
        shouldEmitToolOutput: params.shouldEmitToolOutput,
        onToolResult: params.onToolResult,
        onReasoningStream: params.onReasoningStream,
        onBlockReply: params.onBlockReply,
        onBlockReplyFlush: params.onBlockReplyFlush,
        blockReplyBreak: params.blockReplyBreak,
        blockReplyChunking: params.blockReplyChunking,
        onPartialReply: params.onPartialReply,
        onAssistantMessageStart: params.onAssistantMessageStart,
        onAgentEvent: params.onAgentEvent,
        enforceFinalTag: params.enforceFinalTag,
      });

      const {
        assistantTexts,
        toolMetas,
        unsubscribe,
        waitForCompactionRetry,
        getMessagingToolSentTexts,
        getMessagingToolSentTargets,
        didSendViaMessagingTool,
        getLastToolError,
      } = subscription;

      const queueHandle: EmbeddedPiQueueHandle = {
        queueMessage: async (text: string) => {
          await activeSession.steer(text);
        },
        isStreaming: () => activeSession.isStreaming,
        isCompacting: () => subscription.isCompacting(),
        abort: abortRun,
      };
      setActiveEmbeddedRun(params.sessionId, queueHandle);

      let abortWarnTimer: NodeJS.Timeout | undefined;
      const abortTimer = setTimeout(
        () => {
          log.warn(
            `embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`,
          );
          abortRun(true);
          // Warn once more if the session is still streaming 10s after the abort.
          if (!abortWarnTimer) {
            abortWarnTimer = setTimeout(() => {
              if (!activeSession.isStreaming) return;
              log.warn(
                `embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`,
              );
            }, 10_000);
          }
        },
        Math.max(1, params.timeoutMs),
      );

      let messagesSnapshot: AgentMessage[] = [];
      let sessionIdUsed = activeSession.sessionId;
      // Forward the caller's abort signal; a timeout-shaped reason counts as a timeout.
      const onAbort = () => {
        const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined;
        const timeout = reason ? isTimeoutError(reason) : false;
        abortRun(timeout, reason);
      };
      if (params.abortSignal) {
        if (params.abortSignal.aborted) {
          onAbort();
        } else {
          params.abortSignal.addEventListener("abort", onAbort, {
            once: true,
          });
        }
      }

      // Get hook runner once for both before_agent_start and agent_end hooks
      const hookRunner = getGlobalHookRunner();
      // Shared hook context. The agent id is the one resolved for this session
      // (also used for the session manager and system prompt), not the first
      // ":"-separated segment of the raw session key.
      const hookContext = {
        agentId: sessionAgentId,
        sessionKey: params.sessionKey,
        workspaceDir: params.workspaceDir,
        messageProvider: params.messageProvider ?? undefined,
      };

      let promptError: unknown = null;
      try {
        const promptStartedAt = Date.now();

        // Run before_agent_start hooks to allow plugins to inject context.
        // Hook failures are logged and the original prompt is used.
        let effectivePrompt = params.prompt;
        if (hookRunner?.hasHooks("before_agent_start")) {
          try {
            const hookResult = await hookRunner.runBeforeAgentStart(
              {
                prompt: params.prompt,
                messages: activeSession.messages,
              },
              hookContext,
            );
            if (hookResult?.prependContext) {
              effectivePrompt = `${hookResult.prependContext}\n\n${params.prompt}`;
              log.debug(
                `hooks: prepended context to prompt (${hookResult.prependContext.length} chars)`,
              );
            }
          } catch (hookErr) {
            log.warn(`before_agent_start hook failed: ${String(hookErr)}`);
          }
        }

        log.debug(`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`);
        cacheTrace?.recordStage("prompt:before", {
          prompt: effectivePrompt,
          messages: activeSession.messages,
        });

        // Repair orphaned trailing user messages so new prompts don't violate role ordering.
        const leafEntry = sessionManager.getLeafEntry();
        if (leafEntry?.type === "message" && leafEntry.message.role === "user") {
          if (leafEntry.parentId) {
            sessionManager.branch(leafEntry.parentId);
          } else {
            sessionManager.resetLeaf();
          }
          const sessionContext = sessionManager.buildSessionContext();
          activeSession.agent.replaceMessages(sessionContext.messages);
          log.warn(
            `Removed orphaned user message to prevent consecutive user turns. ` +
              `runId=${params.runId} sessionId=${params.sessionId}`,
          );
        }

        try {
          // Detect and load images referenced in the prompt for vision-capable models.
          // This eliminates the need for an explicit "view" tool call by injecting
          // images directly into the prompt when the model supports it.
          // Also scans conversation history to enable follow-up questions about earlier images.
          const imageResult = await detectAndLoadPromptImages({
            prompt: effectivePrompt,
            workspaceDir: effectiveWorkspace,
            model: params.model,
            existingImages: params.images,
            historyMessages: activeSession.messages,
            maxBytes: MAX_IMAGE_BYTES,
            // Enforce sandbox path restrictions when sandbox is enabled
            sandboxRoot: sandbox?.enabled ? sandbox.workspaceDir : undefined,
          });

          // Inject history images into their original message positions.
          // This ensures the model sees images in context (e.g., "compare to the first image").
          // Messages are edited in place, so a message is only touched when it actually
          // gains a new image; rewriting unchanged history would alter the payload sent
          // to the provider for no benefit.
          if (imageResult.historyImagesByIndex.size > 0) {
            for (const [msgIndex, images] of imageResult.historyImagesByIndex) {
              // Bounds check: ensure index is valid before accessing
              if (msgIndex < 0 || msgIndex >= activeSession.messages.length) continue;
              const msg = activeSession.messages[msgIndex];
              if (!msg || msg.role !== "user") continue;
              // Image payloads already on the message (string content has none).
              const seenImageData = new Set<string>(
                Array.isArray(msg.content)
                  ? msg.content
                      .filter(
                        (c): c is ImageContent =>
                          c != null &&
                          typeof c === "object" &&
                          c.type === "image" &&
                          typeof c.data === "string",
                      )
                      .map((c) => c.data)
                  : [],
              );
              // Keep only images not yet attached, deduplicating within this batch too.
              const newImages = images.filter((img) => {
                if (seenImageData.has(img.data)) return false;
                seenImageData.add(img.data);
                return true;
              });
              if (newImages.length === 0) continue;
              // Convert string content to array format so images can be appended.
              if (typeof msg.content === "string") {
                msg.content = [{ type: "text", text: msg.content }];
              }
              if (Array.isArray(msg.content)) {
                msg.content.push(...newImages);
              }
            }
          }

          cacheTrace?.recordStage("prompt:images", {
            prompt: effectivePrompt,
            messages: activeSession.messages,
            note: `images: prompt=${imageResult.images.length} history=${imageResult.historyImagesByIndex.size}`,
          });

          // Only pass images option if there are actually images to pass
          // This avoids potential issues with models that don't expect the images parameter
          if (imageResult.images.length > 0) {
            await abortable(activeSession.prompt(effectivePrompt, { images: imageResult.images }));
          } else {
            await abortable(activeSession.prompt(effectivePrompt));
          }
        } catch (err) {
          promptError = err;
        } finally {
          log.debug(
            `embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`,
          );
        }

        // An abort while waiting for a compaction retry is reported like a prompt
        // abort; any other error propagates.
        try {
          await waitForCompactionRetry();
        } catch (err) {
          if (isAbortError(err)) {
            if (!promptError) promptError = err;
          } else {
            throw err;
          }
        }

        messagesSnapshot = activeSession.messages.slice();
        sessionIdUsed = activeSession.sessionId;
        cacheTrace?.recordStage("session:after", {
          messages: messagesSnapshot,
          note: promptError ? "prompt error" : undefined,
        });

        // Run agent_end hooks to allow plugins to analyze the conversation
        // This is fire-and-forget, so we don't await
        if (hookRunner?.hasHooks("agent_end")) {
          hookRunner
            .runAgentEnd(
              {
                messages: messagesSnapshot,
                success: !aborted && !promptError,
                error: promptError ? describeUnknownError(promptError) : undefined,
                durationMs: Date.now() - promptStartedAt,
              },
              hookContext,
            )
            .catch((err) => {
              log.warn(`agent_end hook failed: ${String(err)}`);
            });
        }
      } finally {
        clearTimeout(abortTimer);
        if (abortWarnTimer) clearTimeout(abortWarnTimer);
        unsubscribe();
        clearActiveEmbeddedRun(params.sessionId, queueHandle);
        params.abortSignal?.removeEventListener?.("abort", onAbort);
      }

      // Most recent assistant message; copied before reversing so the snapshot is untouched.
      const lastAssistant = messagesSnapshot
        .slice()
        .reverse()
        .find((m) => (m as AgentMessage)?.role === "assistant") as AssistantMessage | undefined;

      const toolMetasNormalized = toolMetas
        .filter(
          (entry): entry is { toolName: string; meta?: string } =>
            typeof entry.toolName === "string" && entry.toolName.trim().length > 0,
        )
        .map((entry) => ({ toolName: entry.toolName, meta: entry.meta }));

      return {
        aborted,
        timedOut,
        promptError,
        sessionIdUsed,
        systemPromptReport,
        messagesSnapshot,
        assistantTexts,
        toolMetas: toolMetasNormalized,
        lastAssistant,
        lastToolError: getLastToolError?.(),
        didSendViaMessagingTool: didSendViaMessagingTool(),
        messagingToolSentTexts: getMessagingToolSentTexts(),
        messagingToolSentTargets: getMessagingToolSentTargets(),
        cloudCodeAssistFormatError: Boolean(
          lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage),
        ),
        // Client tool call detected (OpenResponses hosted tools)
        clientToolCall: clientToolCallDetected ?? undefined,
      };
    } finally {
      // Always tear down the session (and release the lock) before we leave this attempt.
      sessionManager?.flushPendingToolResults?.();
      session?.dispose();
      await sessionLock.release();
    }
  } finally {
    restoreSkillEnv?.();
    process.chdir(prevCwd);
  }
}