diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index db64c6048..47b4905a8 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -68,6 +68,7 @@ import { } from "./model-auth.js"; import { ensureClawdbotModelsJson } from "./models-config.js"; import type { MessagingToolSend } from "./pi-embedded-messaging.js"; +import { acquireSessionWriteLock } from "./session-write-lock.js"; export type { MessagingToolSend } from "./pi-embedded-messaging.js"; @@ -952,70 +953,79 @@ export async function compactEmbeddedPiSession(params: { }); const systemPrompt = createSystemPromptOverride(appendPrompt); - // Pre-warm session file to bring it into OS page cache - await prewarmSessionFile(params.sessionFile); - const sessionManager = SessionManager.open(params.sessionFile); - trackSessionManagerAccess(params.sessionFile); - const settingsManager = SettingsManager.create( - effectiveWorkspace, - agentDir, - ); - const pruning = buildContextPruningExtension({ - cfg: params.config, - sessionManager, - provider, - modelId, - model, + const sessionLock = await acquireSessionWriteLock({ + sessionFile: params.sessionFile, }); - const additionalExtensionPaths = pruning.additionalExtensionPaths; - - const { builtInTools, customTools } = splitSdkTools({ - tools, - sandboxEnabled: !!sandbox?.enabled, - }); - - let session: Awaited>["session"]; - ({ session } = await createAgentSession({ - cwd: resolvedWorkspace, - agentDir, - authStorage, - modelRegistry, - model, - thinkingLevel: mapThinkingLevel(params.thinkLevel), - systemPrompt, - tools: builtInTools, - customTools, - sessionManager, - settingsManager, - skills: [], - contextFiles: [], - additionalExtensionPaths, - })); - try { - const prior = await sanitizeSessionHistory({ - messages: session.messages, - modelApi: model.api, + // Pre-warm session file to bring it into OS page cache + await prewarmSessionFile(params.sessionFile); + const sessionManager = SessionManager.open(params.sessionFile); + trackSessionManagerAccess(params.sessionFile); + const settingsManager = SettingsManager.create( + effectiveWorkspace, + agentDir, + ); + const pruning = buildContextPruningExtension({ + cfg: params.config, sessionManager, - sessionId: params.sessionId, + provider, + modelId, + model, }); - const validated = validateGeminiTurns(prior); - if (validated.length > 0) { - session.agent.replaceMessages(validated); + const additionalExtensionPaths = pruning.additionalExtensionPaths; + + const { builtInTools, customTools } = splitSdkTools({ + tools, + sandboxEnabled: !!sandbox?.enabled, + }); + + let session: Awaited< + ReturnType + >["session"]; + ({ session } = await createAgentSession({ + cwd: resolvedWorkspace, + agentDir, + authStorage, + modelRegistry, + model, + thinkingLevel: mapThinkingLevel(params.thinkLevel), + systemPrompt, + tools: builtInTools, + customTools, + sessionManager, + settingsManager, + skills: [], + contextFiles: [], + additionalExtensionPaths, + })); + + try { + const prior = await sanitizeSessionHistory({ + messages: session.messages, + modelApi: model.api, + sessionManager, + sessionId: params.sessionId, + }); + const validated = validateGeminiTurns(prior); + if (validated.length > 0) { + session.agent.replaceMessages(validated); + } + const result = await session.compact(params.customInstructions); + return { + ok: true, + compacted: true, + result: { + summary: result.summary, + firstKeptEntryId: result.firstKeptEntryId, + tokensBefore: result.tokensBefore, + details: result.details, + }, + }; + } finally { + session.dispose(); } - const result = await session.compact(params.customInstructions); - return { - ok: true, - compacted: true, - result: { - summary: result.summary, - firstKeptEntryId: result.firstKeptEntryId, - tokensBefore: result.tokensBefore, - details: result.details, - }, - }; } finally { - session.dispose(); + await sessionLock.release(); } } catch (err) { return { @@ -1333,6 +1343,9 @@ export async function runEmbeddedPiAgent(params: { }); const systemPrompt = createSystemPromptOverride(appendPrompt); + const sessionLock = await acquireSessionWriteLock({ + sessionFile: params.sessionFile, + }); // Pre-warm session file to bring it into OS page cache await prewarmSessionFile(params.sessionFile); const sessionManager = SessionManager.open(params.sessionFile); @@ -1390,6 +1403,7 @@ export async function runEmbeddedPiAgent(params: { } } catch (err) { session.dispose(); + await sessionLock.release(); throw err; } let aborted = Boolean(params.abortSignal?.aborted); @@ -1419,6 +1433,7 @@ export async function runEmbeddedPiAgent(params: { }); } catch (err) { session.dispose(); + await sessionLock.release(); throw err; } const { @@ -1515,6 +1530,7 @@ export async function runEmbeddedPiAgent(params: { notifyEmbeddedRunEnded(params.sessionId); } session.dispose(); + await sessionLock.release(); params.abortSignal?.removeEventListener?.("abort", onAbort); } if (promptError && !aborted) { diff --git a/src/agents/session-transcript-repair.test.ts b/src/agents/session-transcript-repair.test.ts index acb92fa31..7c6819ac4 100644 --- a/src/agents/session-transcript-repair.test.ts +++ b/src/agents/session-transcript-repair.test.ts @@ -92,4 +92,25 @@ describe("sanitizeToolUseResultPairing", () => { expect(results).toHaveLength(1); expect(results[0]?.toolCallId).toBe("call_1"); }); + + it("drops orphan tool results that do not match any tool call", () => { + const input = [ + { role: "user", content: "hello" }, + { + role: "toolResult", + toolCallId: "call_orphan", + toolName: "read", + content: [{ type: "text", text: "orphan" }], + isError: false, + }, + { + role: "assistant", + content: [{ type: "text", text: "ok" }], + }, + ] satisfies AgentMessage[]; + + const out = sanitizeToolUseResultPairing(input); + expect(out.some((m) => m.role === "toolResult")).toBe(false); + expect(out.map((m) => m.role)).toEqual(["user", "assistant"]); + }); }); diff --git a/src/agents/session-transcript-repair.ts b/src/agents/session-transcript-repair.ts index 1c03c1add..7e7f86fe8 100644 --- a/src/agents/session-transcript-repair.ts +++ b/src/agents/session-transcript-repair.ts @@ -90,11 +90,10 @@ export function sanitizeToolUseResultPairing( const role = (msg as { role?: unknown }).role; if (role !== "assistant") { - if (role === "toolResult") { - pushToolResult(msg as Extract); - } else { - out.push(msg); - } + // Tool results must only appear directly after the matching assistant tool call turn. + // Any "free-floating" toolResult entries in session history can make strict providers + // (Anthropic-compatible APIs, MiniMax, Cloud Code Assist) reject the entire request. + if (role !== "toolResult") out.push(msg); continue; } @@ -141,7 +140,8 @@ export function sanitizeToolUseResultPairing( } } - remainder.push(next); + // Drop tool results that don't match the current assistant tool calls. + if (nextRole !== "toolResult") remainder.push(next); } out.push(msg); @@ -159,11 +159,6 @@ export function sanitizeToolUseResultPairing( out.push(rem); continue; } - const remRole = (rem as { role?: unknown }).role; - if (remRole === "toolResult") { - pushToolResult(rem as Extract); - continue; - } out.push(rem); } i = j - 1; diff --git a/src/agents/session-write-lock.ts b/src/agents/session-write-lock.ts new file mode 100644 index 000000000..fd0283ddf --- /dev/null +++ b/src/agents/session-write-lock.ts @@ -0,0 +1,119 @@ +import fs from "node:fs/promises"; + +type LockFilePayload = { + pid: number; + createdAt: string; +}; + +type HeldLock = { + count: number; + handle: fs.FileHandle; + lockPath: string; +}; + +const HELD_LOCKS = new Map(); + +function isAlive(pid: number): boolean { + if (!Number.isFinite(pid) || pid <= 0) return false; + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +async function readLockPayload( + lockPath: string, +): Promise { + try { + const raw = await fs.readFile(lockPath, "utf8"); + const parsed = JSON.parse(raw) as Partial; + if (typeof parsed.pid !== "number") return null; + if (typeof parsed.createdAt !== "string") return null; + return { pid: parsed.pid, createdAt: parsed.createdAt }; + } catch { + return null; + } +} + +export async function acquireSessionWriteLock(params: { + sessionFile: string; + timeoutMs?: number; + staleMs?: number; +}): Promise<{ + release: () => Promise; +}> { + const timeoutMs = params.timeoutMs ?? 10_000; + const staleMs = params.staleMs ?? 30 * 60 * 1000; + const sessionFile = params.sessionFile; + const lockPath = `${sessionFile}.lock`; + + const held = HELD_LOCKS.get(sessionFile); + if (held) { + held.count += 1; + return { + release: async () => { + const current = HELD_LOCKS.get(sessionFile); + if (!current) return; + current.count -= 1; + if (current.count > 0) return; + HELD_LOCKS.delete(sessionFile); + await current.handle.close(); + await fs.rm(current.lockPath, { force: true }); + }, + }; + } + + const startedAt = Date.now(); + let attempt = 0; + while (Date.now() - startedAt < timeoutMs) { + attempt += 1; + try { + const handle = await fs.open(lockPath, "wx"); + await handle.writeFile( + JSON.stringify( + { pid: process.pid, createdAt: new Date().toISOString() }, + null, + 2, + ), + "utf8", + ); + HELD_LOCKS.set(sessionFile, { count: 1, handle, lockPath }); + return { + release: async () => { + const current = HELD_LOCKS.get(sessionFile); + if (!current) return; + current.count -= 1; + if (current.count > 0) return; + HELD_LOCKS.delete(sessionFile); + await current.handle.close(); + await fs.rm(current.lockPath, { force: true }); + }, + }; + } catch (err) { + const code = (err as { code?: unknown }).code; + if (code !== "EEXIST") throw err; + const payload = await readLockPayload(lockPath); + const createdAt = payload?.createdAt + ? Date.parse(payload.createdAt) + : NaN; + const stale = + !Number.isFinite(createdAt) || Date.now() - createdAt > staleMs; + const alive = payload?.pid ? isAlive(payload.pid) : false; + if (stale || !alive) { + await fs.rm(lockPath, { force: true }); + continue; + } + + const delay = Math.min(1000, 50 * attempt); + await new Promise((r) => setTimeout(r, delay)); + } + } + + const payload = await readLockPayload(lockPath); + const owner = payload?.pid ? `pid=${payload.pid}` : "unknown"; + throw new Error( + `session file locked (timeout ${timeoutMs}ms): ${owner} ${lockPath}`, + ); +}