fix: make session memory indexing async

This commit is contained in:
Peter Steinberger
2026-01-21 10:37:52 +00:00
parent 86ddd3c69c
commit 8479dc97da
10 changed files with 316 additions and 15 deletions

View File

@@ -153,6 +153,7 @@ describe("memory index", () => {
expect(second.manager).not.toBeNull();
if (!second.manager) throw new Error("manager missing");
manager = second.manager;
await second.manager.sync({ reason: "test" });
const results = await second.manager.search("alpha");
expect(results.length).toBeGreaterThan(0);
});
@@ -250,6 +251,7 @@ describe("memory index", () => {
const status = manager.status();
if (!status.fts?.available) return;
await manager.sync({ force: true });
const results = await manager.search("zebra");
expect(results.length).toBeGreaterThan(0);
expect(results[0]?.path).toContain("memory/2026-01-12.md");
@@ -298,6 +300,7 @@ describe("memory index", () => {
const status = manager.status();
if (!status.fts?.available) return;
await manager.sync({ force: true });
const results = await manager.search("alpha beta id123");
expect(results.length).toBeGreaterThan(0);
const paths = results.map((r) => r.path);
@@ -351,6 +354,7 @@ describe("memory index", () => {
const status = manager.status();
if (!status.fts?.available) return;
await manager.sync({ force: true });
const results = await manager.search("alpha beta id123");
expect(results.length).toBeGreaterThan(0);
const paths = results.map((r) => r.path);

View File

@@ -0,0 +1,82 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { getMemorySearchManager, type MemoryIndexManager } from "./index.js";
const embedBatch = vi.fn(async () => []);
const embedQuery = vi.fn(async () => [0.2, 0.2, 0.2]);
vi.mock("./embeddings.js", () => ({
createEmbeddingProvider: async () => ({
requestedProvider: "openai",
provider: {
id: "openai",
model: "text-embedding-3-small",
embedQuery,
embedBatch,
},
openAi: {
baseUrl: "https://api.openai.com/v1",
headers: { Authorization: "Bearer test", "Content-Type": "application/json" },
model: "text-embedding-3-small",
},
}),
}));
describe("memory search async sync", () => {
let workspaceDir: string;
let indexPath: string;
let manager: MemoryIndexManager | null = null;
beforeEach(async () => {
workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-mem-async-"));
indexPath = path.join(workspaceDir, "index.sqlite");
await fs.mkdir(path.join(workspaceDir, "memory"));
await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-07.md"), "hello\n");
});
afterEach(async () => {
vi.unstubAllGlobals();
if (manager) {
await manager.close();
manager = null;
}
await fs.rm(workspaceDir, { recursive: true, force: true });
});
it("does not await sync when searching", async () => {
const cfg = {
agents: {
defaults: {
workspace: workspaceDir,
memorySearch: {
provider: "openai",
model: "text-embedding-3-small",
store: { path: indexPath },
sync: { watch: false, onSessionStart: false, onSearch: true },
query: { minScore: 0 },
remote: { batch: { enabled: true, wait: true } },
},
},
list: [{ id: "main", default: true }],
},
};
const result = await getMemorySearchManager({ cfg, agentId: "main" });
expect(result.manager).not.toBeNull();
if (!result.manager) throw new Error("manager missing");
manager = result.manager;
const pending = new Promise<void>(() => {});
(manager as unknown as { sync: () => Promise<void> }).sync = vi.fn(async () => pending);
const resolved = await Promise.race([
manager.search("hello").then(() => true),
new Promise<boolean>((resolve) => setTimeout(() => resolve(false), 50)),
]);
expect(resolved).toBe(true);
});
});

View File

@@ -101,6 +101,7 @@ const EMBEDDING_RETRY_MAX_ATTEMPTS = 3;
const EMBEDDING_RETRY_BASE_DELAY_MS = 500;
const EMBEDDING_RETRY_MAX_DELAY_MS = 8000;
const BATCH_FAILURE_LIMIT = 2;
const SESSION_DELTA_READ_CHUNK_BYTES = 64 * 1024;
const log = createSubsystemLogger("memory");
@@ -158,6 +159,11 @@ export class MemoryIndexManager {
private dirty = false;
private sessionsDirty = false;
private sessionsDirtyFiles = new Set<string>();
private sessionPendingFiles = new Set<string>();
private sessionDeltas = new Map<
string,
{ lastSize: number; pendingBytes: number; pendingMessages: number }
>();
private sessionWarm = new Set<string>();
private syncing: Promise<void> | null = null;
@@ -234,9 +240,6 @@ export class MemoryIndexManager {
this.ensureSessionListener();
this.ensureIntervalSync();
this.dirty = this.sources.has("memory");
if (this.sources.has("sessions")) {
this.sessionsDirty = true;
}
this.batch = this.resolveBatchConfig();
}
@@ -244,7 +247,9 @@ export class MemoryIndexManager {
if (!this.settings.sync.onSessionStart) return;
const key = sessionKey?.trim() || "";
if (key && this.sessionWarm.has(key)) return;
await this.sync({ reason: "session-start" });
void this.sync({ reason: "session-start" }).catch((err) => {
log.warn(`memory sync failed (session-start): ${String(err)}`);
});
if (key) this.sessionWarm.add(key);
}
@@ -256,9 +261,11 @@ export class MemoryIndexManager {
sessionKey?: string;
},
): Promise<MemorySearchResult[]> {
await this.warmSession(opts?.sessionKey);
void this.warmSession(opts?.sessionKey);
if (this.settings.sync.onSearch && (this.dirty || this.sessionsDirty)) {
await this.sync({ reason: "search" });
void this.sync({ reason: "search" }).catch((err) => {
log.warn(`memory sync failed (search): ${String(err)}`);
});
}
const cleaned = query.trim();
if (!cleaned) return [];
@@ -774,14 +781,135 @@ export class MemoryIndexManager {
}
private scheduleSessionDirty(sessionFile: string) {
this.sessionsDirtyFiles.add(sessionFile);
this.sessionPendingFiles.add(sessionFile);
if (this.sessionWatchTimer) return;
this.sessionWatchTimer = setTimeout(() => {
this.sessionWatchTimer = null;
this.sessionsDirty = true;
void this.processSessionDeltaBatch().catch((err) => {
log.warn(`memory session delta failed: ${String(err)}`);
});
}, SESSION_DIRTY_DEBOUNCE_MS);
}
private async processSessionDeltaBatch(): Promise<void> {
if (this.sessionPendingFiles.size === 0) return;
const pending = Array.from(this.sessionPendingFiles);
this.sessionPendingFiles.clear();
let shouldSync = false;
for (const sessionFile of pending) {
const delta = await this.updateSessionDelta(sessionFile);
if (!delta) continue;
const bytesThreshold = delta.deltaBytes;
const messagesThreshold = delta.deltaMessages;
const bytesHit =
bytesThreshold <= 0 ? delta.pendingBytes > 0 : delta.pendingBytes >= bytesThreshold;
const messagesHit =
messagesThreshold <= 0
? delta.pendingMessages > 0
: delta.pendingMessages >= messagesThreshold;
if (!bytesHit && !messagesHit) continue;
this.sessionsDirtyFiles.add(sessionFile);
this.sessionsDirty = true;
delta.pendingBytes =
bytesThreshold > 0 ? Math.max(0, delta.pendingBytes - bytesThreshold) : 0;
delta.pendingMessages =
messagesThreshold > 0 ? Math.max(0, delta.pendingMessages - messagesThreshold) : 0;
shouldSync = true;
}
if (shouldSync) {
void this.sync({ reason: "session-delta" }).catch((err) => {
log.warn(`memory sync failed (session-delta): ${String(err)}`);
});
}
}
private async updateSessionDelta(sessionFile: string): Promise<{
deltaBytes: number;
deltaMessages: number;
pendingBytes: number;
pendingMessages: number;
} | null> {
const thresholds = this.settings.sync.sessions;
if (!thresholds) return null;
let stat: { size: number };
try {
stat = await fs.stat(sessionFile);
} catch {
return null;
}
const size = stat.size;
let state = this.sessionDeltas.get(sessionFile);
if (!state) {
state = { lastSize: 0, pendingBytes: 0, pendingMessages: 0 };
this.sessionDeltas.set(sessionFile, state);
}
const deltaBytes = Math.max(0, size - state.lastSize);
if (deltaBytes === 0 && size === state.lastSize) {
return {
deltaBytes: thresholds.deltaBytes,
deltaMessages: thresholds.deltaMessages,
pendingBytes: state.pendingBytes,
pendingMessages: state.pendingMessages,
};
}
if (size < state.lastSize) {
state.lastSize = size;
state.pendingBytes += size;
const shouldCountMessages =
thresholds.deltaMessages > 0 &&
(thresholds.deltaBytes <= 0 || state.pendingBytes < thresholds.deltaBytes);
if (shouldCountMessages) {
state.pendingMessages += await this.countNewlines(sessionFile, 0, size);
}
} else {
state.pendingBytes += deltaBytes;
const shouldCountMessages =
thresholds.deltaMessages > 0 &&
(thresholds.deltaBytes <= 0 || state.pendingBytes < thresholds.deltaBytes);
if (shouldCountMessages) {
state.pendingMessages += await this.countNewlines(sessionFile, state.lastSize, size);
}
state.lastSize = size;
}
this.sessionDeltas.set(sessionFile, state);
return {
deltaBytes: thresholds.deltaBytes,
deltaMessages: thresholds.deltaMessages,
pendingBytes: state.pendingBytes,
pendingMessages: state.pendingMessages,
};
}
private async countNewlines(absPath: string, start: number, end: number): Promise<number> {
if (end <= start) return 0;
const handle = await fs.open(absPath, "r");
try {
let offset = start;
let count = 0;
const buffer = Buffer.alloc(SESSION_DELTA_READ_CHUNK_BYTES);
while (offset < end) {
const toRead = Math.min(buffer.length, end - offset);
const { bytesRead } = await handle.read(buffer, 0, toRead, offset);
if (bytesRead <= 0) break;
for (let i = 0; i < bytesRead; i += 1) {
if (buffer[i] === 10) count += 1;
}
offset += bytesRead;
}
return count;
} finally {
await handle.close();
}
}
private resetSessionDelta(absPath: string, size: number): void {
const state = this.sessionDeltas.get(absPath);
if (!state) return;
state.lastSize = size;
state.pendingBytes = 0;
state.pendingMessages = 0;
}
private isSessionFileForAgent(sessionFile: string): boolean {
if (!sessionFile) return false;
const sessionsDir = resolveSessionTranscriptsDirForAgent(this.agentId);
@@ -820,7 +948,8 @@ export class MemoryIndexManager {
if (params?.force) return true;
const reason = params?.reason;
if (reason === "session-start" || reason === "watch") return false;
return this.sessionsDirty || needsFullReindex;
if (needsFullReindex) return true;
return this.sessionsDirty && this.sessionsDirtyFiles.size > 0;
}
private async syncMemoryFiles(params: {
@@ -952,9 +1081,11 @@ export class MemoryIndexManager {
total: params.progress.total,
});
}
this.resetSessionDelta(absPath, entry.size);
return;
}
await this.indexFile(entry, { source: "sessions", content: entry.content });
this.resetSessionDelta(absPath, entry.size);
if (params.progress) {
params.progress.completed += 1;
params.progress.report({
@@ -1056,8 +1187,10 @@ export class MemoryIndexManager {
await this.syncSessionFiles({ needsFullReindex, progress: progress ?? undefined });
this.sessionsDirty = false;
this.sessionsDirtyFiles.clear();
} else if (this.sources.has("sessions")) {
} else if (this.sessionsDirtyFiles.size > 0) {
this.sessionsDirty = true;
} else {
this.sessionsDirty = false;
}
} catch (err) {
const reason = err instanceof Error ? err.message : String(err);
@@ -1197,8 +1330,10 @@ export class MemoryIndexManager {
await this.syncSessionFiles({ needsFullReindex: true, progress: params.progress });
this.sessionsDirty = false;
this.sessionsDirtyFiles.clear();
} else if (this.sources.has("sessions")) {
} else if (this.sessionsDirtyFiles.size > 0) {
this.sessionsDirty = true;
} else {
this.sessionsDirty = false;
}
nextMeta = {