fix: harden session caching and topic transcripts

This commit is contained in:
Peter Steinberger
2026-01-07 22:38:41 +00:00
parent 8da4f259dd
commit 67d1f61872
13 changed files with 75 additions and 289 deletions

View File

@@ -278,14 +278,12 @@ export function pickFallbackThinkingLevel(params: {
* Gemini requires strict alternating user→assistant→tool→user pattern.
* This function:
* 1. Detects consecutive messages from the same role
* 2. Merges consecutive assistant/tool messages together
* 2. Merges consecutive assistant messages together
* 3. Preserves metadata (usage, stopReason, etc.)
*
* This prevents the "function call turn comes immediately after a user turn or after a function response turn" error.
*/
export function validateGeminiTurns(
messages: AgentMessage[],
): AgentMessage[] {
export function validateGeminiTurns(messages: AgentMessage[]): AgentMessage[] {
if (!Array.isArray(messages) || messages.length === 0) {
return messages;
}
@@ -299,9 +297,7 @@ export function validateGeminiTurns(
continue;
}
const msgRole = (msg as { role?: unknown }).role as
| string
| undefined;
const msgRole = (msg as { role?: unknown }).role as string | undefined;
if (!msgRole) {
result.push(msg);
continue;

View File

@@ -334,7 +334,6 @@ const EMBEDDED_RUN_WAITERS = new Map<string, Set<EmbeddedRunWaiter>>();
type SessionManagerCacheEntry = {
sessionFile: string;
loadedAt: number;
lastAccessAt: number;
};
const SESSION_MANAGER_CACHE = new Map<string, SessionManagerCacheEntry>();
@@ -362,7 +361,6 @@ function trackSessionManagerAccess(sessionFile: string): void {
SESSION_MANAGER_CACHE.set(sessionFile, {
sessionFile,
loadedAt: now,
lastAccessAt: now,
});
}
@@ -380,9 +378,14 @@ async function prewarmSessionFile(sessionFile: string): Promise<void> {
if (isSessionManagerCached(sessionFile)) return;
try {
// Touch the file to bring it into OS page cache
// This is much faster than letting SessionManager.open() do it cold
await fs.stat(sessionFile);
// Read a small chunk to encourage OS page cache warmup.
const handle = await fs.open(sessionFile, "r");
try {
const buffer = Buffer.alloc(4096);
await handle.read(buffer, 0, buffer.length, 0);
} finally {
await handle.close();
}
trackSessionManagerAccess(sessionFile);
} catch {
// File doesn't exist yet, SessionManager will create it

View File

@@ -1,12 +1,12 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import fs from "node:fs";
import path from "node:path";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
loadSessionStore,
saveSessionStore,
clearSessionStoreCacheForTest,
loadSessionStore,
type SessionEntry,
saveSessionStore,
} from "./sessions.js";
describe("Session Store Cache", () => {
@@ -52,7 +52,7 @@ describe("Session Store Cache", () => {
expect(loaded).toEqual(testStore);
});
it("should cache session store on first load", async () => {
it("should cache session store on first load when file is unchanged", async () => {
const testStore: Record<string, SessionEntry> = {
"session:1": {
sessionId: "id-1",
@@ -63,26 +63,20 @@ describe("Session Store Cache", () => {
await saveSessionStore(storePath, testStore);
const readSpy = vi.spyOn(fs, "readFileSync");
// First load - from disk
const loaded1 = loadSessionStore(storePath);
expect(loaded1).toEqual(testStore);
// Modify file on disk
const modifiedStore: Record<string, SessionEntry> = {
"session:2": {
sessionId: "id-2",
updatedAt: Date.now(),
displayName: "Test Session 2",
},
};
fs.writeFileSync(storePath, JSON.stringify(modifiedStore, null, 2));
// Second load - should still return cached data (not the modified file)
// Second load - should return cached data (no extra disk read)
const loaded2 = loadSessionStore(storePath);
expect(loaded2).toEqual(testStore); // Should be original, not modified
expect(loaded2).toEqual(testStore);
expect(readSpy).toHaveBeenCalledTimes(1);
readSpy.mockRestore();
});
it("should cache multiple calls to the same store path", async () => {
it("should refresh cache when store file changes on disk", async () => {
const testStore: Record<string, SessionEntry> = {
"session:1": {
sessionId: "id-1",
@@ -98,12 +92,16 @@ describe("Session Store Cache", () => {
expect(loaded1).toEqual(testStore);
// Modify file on disk while cache is valid
fs.writeFileSync(storePath, JSON.stringify({ "session:99": { sessionId: "id-99", updatedAt: Date.now() } }, null, 2));
const modifiedStore: Record<string, SessionEntry> = {
"session:99": { sessionId: "id-99", updatedAt: Date.now() },
};
fs.writeFileSync(storePath, JSON.stringify(modifiedStore, null, 2));
const bump = new Date(Date.now() + 2000);
fs.utimesSync(storePath, bump, bump);
// Second load - should still return original cached data
// Second load - should return the updated store
const loaded2 = loadSessionStore(storePath);
expect(loaded2).toEqual(testStore);
expect(loaded2).not.toHaveProperty("session:99");
expect(loaded2).toEqual(modifiedStore);
});
it("should invalidate cache on write", async () => {

View File

@@ -8,6 +8,7 @@ import {
deriveSessionKey,
loadSessionStore,
resolveSessionKey,
resolveSessionTranscriptPath,
resolveSessionTranscriptsDir,
updateLastRoute,
} from "./sessions.js";
@@ -147,4 +148,21 @@ describe("sessions", () => {
);
expect(dir).toBe("/legacy/state/agents/main/sessions");
});
it("includes topic ids in session transcript filenames", () => {
const prev = process.env.CLAWDBOT_STATE_DIR;
process.env.CLAWDBOT_STATE_DIR = "/custom/state";
try {
const sessionFile = resolveSessionTranscriptPath("sess-1", "main", 123);
expect(sessionFile).toBe(
"/custom/state/agents/main/sessions/sess-1-topic-123.jsonl",
);
} finally {
if (prev === undefined) {
delete process.env.CLAWDBOT_STATE_DIR;
} else {
process.env.CLAWDBOT_STATE_DIR = prev;
}
}
});
});

View File

@@ -24,6 +24,7 @@ type SessionStoreCacheEntry = {
store: Record<string, SessionEntry>;
loadedAt: number;
storePath: string;
mtimeMs?: number;
};
const SESSION_STORE_CACHE = new Map<string, SessionStoreCacheEntry>();
@@ -52,6 +53,14 @@ function isSessionStoreCacheValid(entry: SessionStoreCacheEntry): boolean {
return now - entry.loadedAt <= ttl;
}
function getSessionStoreMtimeMs(storePath: string): number | undefined {
try {
return fs.statSync(storePath).mtimeMs;
} catch {
return undefined;
}
}
function invalidateSessionStoreCache(storePath: string): void {
SESSION_STORE_CACHE.delete(storePath);
}
@@ -180,19 +189,22 @@ export function resolveSessionTranscriptPath(
agentId?: string,
topicId?: number,
): string {
const fileName = topicId !== undefined ? `${sessionId}-topic-${topicId}.jsonl` : `${sessionId}.jsonl`;
const fileName =
topicId !== undefined
? `${sessionId}-topic-${topicId}.jsonl`
: `${sessionId}.jsonl`;
return path.join(resolveAgentSessionsDir(agentId), fileName);
}
export function resolveSessionFilePath(
sessionId: string,
entry?: SessionEntry,
opts?: { agentId?: string },
opts?: { agentId?: string; topicId?: number },
): string {
const candidate = entry?.sessionFile?.trim();
return candidate
? candidate
: resolveSessionTranscriptPath(sessionId, opts?.agentId);
: resolveSessionTranscriptPath(sessionId, opts?.agentId, opts?.topicId);
}
export function resolveStorePath(store?: string, opts?: { agentId?: string }) {
@@ -390,19 +402,25 @@ export function loadSessionStore(
if (isSessionStoreCacheEnabled()) {
const cached = SESSION_STORE_CACHE.get(storePath);
if (cached && isSessionStoreCacheValid(cached)) {
// Return a shallow copy to prevent external mutations affecting cache
return { ...cached.store };
const currentMtimeMs = getSessionStoreMtimeMs(storePath);
if (currentMtimeMs === cached.mtimeMs) {
// Return a shallow copy to prevent external mutations affecting cache
return { ...cached.store };
}
invalidateSessionStoreCache(storePath);
}
}
// Cache miss or disabled - load from disk
let store: Record<string, SessionEntry> = {};
let mtimeMs = getSessionStoreMtimeMs(storePath);
try {
const raw = fs.readFileSync(storePath, "utf-8");
const parsed = JSON5.parse(raw);
if (parsed && typeof parsed === "object") {
store = parsed as Record<string, SessionEntry>;
}
mtimeMs = getSessionStoreMtimeMs(storePath) ?? mtimeMs;
} catch {
// ignore missing/invalid store; we'll recreate it
}
@@ -413,6 +431,7 @@ export function loadSessionStore(
store: { ...store }, // Store a copy to prevent external mutations
loadedAt: Date.now(),
storePath,
mtimeMs,
});
}

View File

@@ -31,15 +31,6 @@ export type SessionSendPolicyConfig = {
rules?: SessionSendPolicyRule[];
};
export type SessionCacheConfig = {
/** Enable session store caching (default: true). Set to false to disable. */
enabled?: boolean;
/** Session store cache TTL in milliseconds (default: 45000 = 45s). Set to 0 to disable. */
storeTtlMs?: number;
/** SessionManager cache TTL in milliseconds (default: 45000 = 45s). Set to 0 to disable. */
managerTtlMs?: number;
};
export type SessionConfig = {
scope?: SessionScope;
resetTriggers?: string[];
@@ -50,8 +41,6 @@ export type SessionConfig = {
typingMode?: TypingMode;
mainKey?: string;
sendPolicy?: SessionSendPolicyConfig;
/** Session caching configuration. */
cache?: SessionCacheConfig;
agentToAgent?: {
/** Max ping-pong turns between requester/target (05). Default: 5. */
maxPingPongTurns?: number;