fix(agents): enforce single-writer session files

This commit is contained in:
Peter Steinberger
2026-01-11 02:24:25 +00:00
parent 3a113b7752
commit 6668805aca
4 changed files with 220 additions and 69 deletions

View File

@@ -68,6 +68,7 @@ import {
} from "./model-auth.js"; } from "./model-auth.js";
import { ensureClawdbotModelsJson } from "./models-config.js"; import { ensureClawdbotModelsJson } from "./models-config.js";
import type { MessagingToolSend } from "./pi-embedded-messaging.js"; import type { MessagingToolSend } from "./pi-embedded-messaging.js";
import { acquireSessionWriteLock } from "./session-write-lock.js";
export type { MessagingToolSend } from "./pi-embedded-messaging.js"; export type { MessagingToolSend } from "./pi-embedded-messaging.js";
@@ -952,70 +953,79 @@ export async function compactEmbeddedPiSession(params: {
}); });
const systemPrompt = createSystemPromptOverride(appendPrompt); const systemPrompt = createSystemPromptOverride(appendPrompt);
// Pre-warm session file to bring it into OS page cache const sessionLock = await acquireSessionWriteLock({
await prewarmSessionFile(params.sessionFile); sessionFile: params.sessionFile,
const sessionManager = SessionManager.open(params.sessionFile);
trackSessionManagerAccess(params.sessionFile);
const settingsManager = SettingsManager.create(
effectiveWorkspace,
agentDir,
);
const pruning = buildContextPruningExtension({
cfg: params.config,
sessionManager,
provider,
modelId,
model,
}); });
const additionalExtensionPaths = pruning.additionalExtensionPaths;
const { builtInTools, customTools } = splitSdkTools({
tools,
sandboxEnabled: !!sandbox?.enabled,
});
let session: Awaited<ReturnType<typeof createAgentSession>>["session"];
({ session } = await createAgentSession({
cwd: resolvedWorkspace,
agentDir,
authStorage,
modelRegistry,
model,
thinkingLevel: mapThinkingLevel(params.thinkLevel),
systemPrompt,
tools: builtInTools,
customTools,
sessionManager,
settingsManager,
skills: [],
contextFiles: [],
additionalExtensionPaths,
}));
try { try {
const prior = await sanitizeSessionHistory({ // Pre-warm session file to bring it into OS page cache
messages: session.messages, await prewarmSessionFile(params.sessionFile);
modelApi: model.api, const sessionManager = SessionManager.open(params.sessionFile);
trackSessionManagerAccess(params.sessionFile);
const settingsManager = SettingsManager.create(
effectiveWorkspace,
agentDir,
);
const pruning = buildContextPruningExtension({
cfg: params.config,
sessionManager, sessionManager,
sessionId: params.sessionId, provider,
modelId,
model,
}); });
const validated = validateGeminiTurns(prior); const additionalExtensionPaths = pruning.additionalExtensionPaths;
if (validated.length > 0) {
session.agent.replaceMessages(validated); const { builtInTools, customTools } = splitSdkTools({
tools,
sandboxEnabled: !!sandbox?.enabled,
});
let session: Awaited<
ReturnType<typeof createAgentSession>
>["session"];
({ session } = await createAgentSession({
cwd: resolvedWorkspace,
agentDir,
authStorage,
modelRegistry,
model,
thinkingLevel: mapThinkingLevel(params.thinkLevel),
systemPrompt,
tools: builtInTools,
customTools,
sessionManager,
settingsManager,
skills: [],
contextFiles: [],
additionalExtensionPaths,
}));
try {
const prior = await sanitizeSessionHistory({
messages: session.messages,
modelApi: model.api,
sessionManager,
sessionId: params.sessionId,
});
const validated = validateGeminiTurns(prior);
if (validated.length > 0) {
session.agent.replaceMessages(validated);
}
const result = await session.compact(params.customInstructions);
return {
ok: true,
compacted: true,
result: {
summary: result.summary,
firstKeptEntryId: result.firstKeptEntryId,
tokensBefore: result.tokensBefore,
details: result.details,
},
};
} finally {
session.dispose();
} }
const result = await session.compact(params.customInstructions);
return {
ok: true,
compacted: true,
result: {
summary: result.summary,
firstKeptEntryId: result.firstKeptEntryId,
tokensBefore: result.tokensBefore,
details: result.details,
},
};
} finally { } finally {
session.dispose(); await sessionLock.release();
} }
} catch (err) { } catch (err) {
return { return {
@@ -1333,6 +1343,9 @@ export async function runEmbeddedPiAgent(params: {
}); });
const systemPrompt = createSystemPromptOverride(appendPrompt); const systemPrompt = createSystemPromptOverride(appendPrompt);
const sessionLock = await acquireSessionWriteLock({
sessionFile: params.sessionFile,
});
// Pre-warm session file to bring it into OS page cache // Pre-warm session file to bring it into OS page cache
await prewarmSessionFile(params.sessionFile); await prewarmSessionFile(params.sessionFile);
const sessionManager = SessionManager.open(params.sessionFile); const sessionManager = SessionManager.open(params.sessionFile);
@@ -1390,6 +1403,7 @@ export async function runEmbeddedPiAgent(params: {
} }
} catch (err) { } catch (err) {
session.dispose(); session.dispose();
await sessionLock.release();
throw err; throw err;
} }
let aborted = Boolean(params.abortSignal?.aborted); let aborted = Boolean(params.abortSignal?.aborted);
@@ -1419,6 +1433,7 @@ export async function runEmbeddedPiAgent(params: {
}); });
} catch (err) { } catch (err) {
session.dispose(); session.dispose();
await sessionLock.release();
throw err; throw err;
} }
const { const {
@@ -1515,6 +1530,7 @@ export async function runEmbeddedPiAgent(params: {
notifyEmbeddedRunEnded(params.sessionId); notifyEmbeddedRunEnded(params.sessionId);
} }
session.dispose(); session.dispose();
await sessionLock.release();
params.abortSignal?.removeEventListener?.("abort", onAbort); params.abortSignal?.removeEventListener?.("abort", onAbort);
} }
if (promptError && !aborted) { if (promptError && !aborted) {

View File

@@ -92,4 +92,25 @@ describe("sanitizeToolUseResultPairing", () => {
expect(results).toHaveLength(1); expect(results).toHaveLength(1);
expect(results[0]?.toolCallId).toBe("call_1"); expect(results[0]?.toolCallId).toBe("call_1");
}); });
it("drops orphan tool results that do not match any tool call", () => {
const input = [
{ role: "user", content: "hello" },
{
role: "toolResult",
toolCallId: "call_orphan",
toolName: "read",
content: [{ type: "text", text: "orphan" }],
isError: false,
},
{
role: "assistant",
content: [{ type: "text", text: "ok" }],
},
] satisfies AgentMessage[];
const out = sanitizeToolUseResultPairing(input);
expect(out.some((m) => m.role === "toolResult")).toBe(false);
expect(out.map((m) => m.role)).toEqual(["user", "assistant"]);
});
}); });

View File

@@ -90,11 +90,10 @@ export function sanitizeToolUseResultPairing(
const role = (msg as { role?: unknown }).role; const role = (msg as { role?: unknown }).role;
if (role !== "assistant") { if (role !== "assistant") {
if (role === "toolResult") { // Tool results must only appear directly after the matching assistant tool call turn.
pushToolResult(msg as Extract<AgentMessage, { role: "toolResult" }>); // Any "free-floating" toolResult entries in session history can make strict providers
} else { // (Anthropic-compatible APIs, MiniMax, Cloud Code Assist) reject the entire request.
out.push(msg); if (role !== "toolResult") out.push(msg);
}
continue; continue;
} }
@@ -141,7 +140,8 @@ export function sanitizeToolUseResultPairing(
} }
} }
remainder.push(next); // Drop tool results that don't match the current assistant tool calls.
if (nextRole !== "toolResult") remainder.push(next);
} }
out.push(msg); out.push(msg);
@@ -159,11 +159,6 @@ export function sanitizeToolUseResultPairing(
out.push(rem); out.push(rem);
continue; continue;
} }
const remRole = (rem as { role?: unknown }).role;
if (remRole === "toolResult") {
pushToolResult(rem as Extract<AgentMessage, { role: "toolResult" }>);
continue;
}
out.push(rem); out.push(rem);
} }
i = j - 1; i = j - 1;

View File

@@ -0,0 +1,119 @@
import fs from "node:fs/promises";
type LockFilePayload = {
pid: number;
createdAt: string;
};
type HeldLock = {
count: number;
handle: fs.FileHandle;
lockPath: string;
};
const HELD_LOCKS = new Map<string, HeldLock>();
function isAlive(pid: number): boolean {
if (!Number.isFinite(pid) || pid <= 0) return false;
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
async function readLockPayload(
lockPath: string,
): Promise<LockFilePayload | null> {
try {
const raw = await fs.readFile(lockPath, "utf8");
const parsed = JSON.parse(raw) as Partial<LockFilePayload>;
if (typeof parsed.pid !== "number") return null;
if (typeof parsed.createdAt !== "string") return null;
return { pid: parsed.pid, createdAt: parsed.createdAt };
} catch {
return null;
}
}
export async function acquireSessionWriteLock(params: {
sessionFile: string;
timeoutMs?: number;
staleMs?: number;
}): Promise<{
release: () => Promise<void>;
}> {
const timeoutMs = params.timeoutMs ?? 10_000;
const staleMs = params.staleMs ?? 30 * 60 * 1000;
const sessionFile = params.sessionFile;
const lockPath = `${sessionFile}.lock`;
const held = HELD_LOCKS.get(sessionFile);
if (held) {
held.count += 1;
return {
release: async () => {
const current = HELD_LOCKS.get(sessionFile);
if (!current) return;
current.count -= 1;
if (current.count > 0) return;
HELD_LOCKS.delete(sessionFile);
await current.handle.close();
await fs.rm(current.lockPath, { force: true });
},
};
}
const startedAt = Date.now();
let attempt = 0;
while (Date.now() - startedAt < timeoutMs) {
attempt += 1;
try {
const handle = await fs.open(lockPath, "wx");
await handle.writeFile(
JSON.stringify(
{ pid: process.pid, createdAt: new Date().toISOString() },
null,
2,
),
"utf8",
);
HELD_LOCKS.set(sessionFile, { count: 1, handle, lockPath });
return {
release: async () => {
const current = HELD_LOCKS.get(sessionFile);
if (!current) return;
current.count -= 1;
if (current.count > 0) return;
HELD_LOCKS.delete(sessionFile);
await current.handle.close();
await fs.rm(current.lockPath, { force: true });
},
};
} catch (err) {
const code = (err as { code?: unknown }).code;
if (code !== "EEXIST") throw err;
const payload = await readLockPayload(lockPath);
const createdAt = payload?.createdAt
? Date.parse(payload.createdAt)
: NaN;
const stale =
!Number.isFinite(createdAt) || Date.now() - createdAt > staleMs;
const alive = payload?.pid ? isAlive(payload.pid) : false;
if (stale || !alive) {
await fs.rm(lockPath, { force: true });
continue;
}
const delay = Math.min(1000, 50 * attempt);
await new Promise((r) => setTimeout(r, delay));
}
}
const payload = await readLockPayload(lockPath);
const owner = payload?.pid ? `pid=${payload.pid}` : "unknown";
throw new Error(
`session file locked (timeout ${timeoutMs}ms): ${owner} ${lockPath}`,
);
}