import type { DatabaseSync } from "node:sqlite";
import { createSubsystemLogger } from "../logging/subsystem.js";
import type { SessionFileEntry } from "./session-files.js";
import {
  buildSessionEntry,
  listSessionFilesForAgent,
  sessionPathForFile,
} from "./session-files.js";

const log = createSubsystemLogger("memory");

/** Value written to / matched against the `source` column for session-file rows. */
const SESSION_SOURCE = "sessions";

/**
 * Caller-owned progress tracker. This module mutates `total` and `completed`
 * in place and calls `report` after every change.
 */
type ProgressState = {
  completed: number;
  total: number;
  label?: string;
  report: (update: { completed: number; total: number; label?: string }) => void;
};

/** Prepared statement type, derived so no extra `node:sqlite` import is needed. */
type PreparedStatement = ReturnType<DatabaseSync["prepare"]>;

/** Arguments accepted by {@link syncSessionFiles}. */
type SyncSessionFilesParams = {
  agentId: string;
  db: DatabaseSync;
  /** When true every file is re-indexed regardless of its stored hash. */
  needsFullReindex: boolean;
  progress?: ProgressState;
  /** Only affects the progress label and debug log in this function. */
  batchEnabled: boolean;
  concurrency: number;
  /** Executes the per-file tasks; its resolved value is ignored here. */
  runWithConcurrency: (
    tasks: Array<() => Promise<void>>,
    concurrency: number,
  ) => Promise<unknown>;
  /** Indexes a single session entry; its resolved value is ignored here. */
  indexFile: (entry: SessionFileEntry) => Promise<unknown>;
  /** Table names are interpolated into SQL verbatim, so they must be trusted identifiers. */
  vectorTable: string;
  ftsTable: string;
  ftsEnabled: boolean;
  ftsAvailable: boolean;
  model: string;
  /**
   * Files known to have changed. Only read, never mutated. When empty, every
   * listed file is considered (unchanged ones are still skipped via hash).
   * NOTE(review): element type assumed to match the entries returned by
   * `listSessionFilesForAgent` (string paths) — confirm.
   */
  dirtyFiles: ReadonlySet<string>;
};

/** Renders a caught value as a short string for log metadata. */
function describeSyncError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/** Prepares an optional cleanup statement, returning `undefined` (and logging) if preparation fails. */
function tryPrepareCleanupStatement(
  db: DatabaseSync,
  sql: string,
  table: string,
): PreparedStatement | undefined {
  try {
    return db.prepare(sql);
  } catch (err) {
    log.debug("memory sync: skipping session cleanup for table", {
      table,
      error: describeSyncError(err),
    });
    return undefined;
  }
}

/** Deletes index rows (files, vectors, chunks, FTS) for session paths that are no longer active. */
function pruneStaleSessionRows(
  params: Pick<
    SyncSessionFilesParams,
    "db" | "vectorTable" | "ftsTable" | "ftsEnabled" | "ftsAvailable" | "model"
  >,
  activePaths: ReadonlySet<string>,
): void {
  const { db } = params;
  const indexedRows = db
    .prepare(`SELECT path FROM files WHERE source = ?`)
    .all(SESSION_SOURCE) as Array<{ path: string }>;
  const stalePaths = indexedRows
    .map((row) => row.path)
    .filter((path) => !activePaths.has(path));
  if (stalePaths.length === 0) return;

  // Prepare each statement once instead of once per stale row.
  const deleteFile = db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`);
  const deleteChunks = db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`);
  // Vector and FTS cleanup are best-effort: a failure to prepare or run them
  // must not abort the sync (presumably these tables can be absent — confirm).
  const deleteVectors = tryPrepareCleanupStatement(
    db,
    `DELETE FROM ${params.vectorTable} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
    params.vectorTable,
  );
  const deleteFts =
    params.ftsEnabled && params.ftsAvailable
      ? tryPrepareCleanupStatement(
          db,
          `DELETE FROM ${params.ftsTable} WHERE path = ? AND source = ? AND model = ?`,
          params.ftsTable,
        )
      : undefined;

  for (const path of stalePaths) {
    deleteFile.run(path, SESSION_SOURCE);
    // Vectors are located through `chunks`, so they must go before the chunks do.
    if (deleteVectors) {
      try {
        deleteVectors.run(path, SESSION_SOURCE);
      } catch (err) {
        log.debug("memory sync: failed to delete stale session vectors", {
          path,
          error: describeSyncError(err),
        });
      }
    }
    deleteChunks.run(path, SESSION_SOURCE);
    if (deleteFts) {
      try {
        deleteFts.run(path, SESSION_SOURCE, params.model);
      } catch (err) {
        log.debug("memory sync: failed to delete stale session FTS rows", {
          path,
          error: describeSyncError(err),
        });
      }
    }
  }
}

/**
 * Brings the index in line with the agent's current session files.
 *
 * 1. Lists the agent's session files and adds their count to `progress.total`.
 * 2. For each file (run through `runWithConcurrency`): skips it when it is not
 *    dirty (unless everything is being indexed), when no entry can be built,
 *    or when its stored hash equals the entry hash (unless a full reindex was
 *    requested); otherwise calls `indexFile`. Every file, skipped or indexed,
 *    advances `progress.completed` by one.
 * 3. Removes rows for session paths that are no longer listed.
 *
 * Errors from `indexFile`, `runWithConcurrency` and the mandatory SQL
 * statements propagate to the caller.
 */
export async function syncSessionFiles(params: SyncSessionFilesParams): Promise<void> {
  const files = await listSessionFilesForAgent(params.agentId);
  const activePaths = new Set(files.map((file) => sessionPathForFile(file)));
  // With no dirty set to narrow the work, fall back to considering every file.
  const indexAll = params.needsFullReindex || params.dirtyFiles.size === 0;

  log.debug("memory sync: indexing session files", {
    files: files.length,
    indexAll,
    dirtyFiles: params.dirtyFiles.size,
    batch: params.batchEnabled,
    concurrency: params.concurrency,
  });

  if (params.progress) {
    params.progress.total += files.length;
    params.progress.report({
      completed: params.progress.completed,
      total: params.progress.total,
      label: params.batchEnabled
        ? "Indexing session files (batch)..."
        : "Indexing session files…",
    });
  }

  /** Records one finished file (indexed or skipped) and notifies the reporter. */
  const markFileDone = (): void => {
    const progress = params.progress;
    if (!progress) return;
    progress.completed += 1;
    progress.report({ completed: progress.completed, total: progress.total });
  };

  // Prepared on first use so nothing is prepared when no file needs comparing.
  let hashLookup: PreparedStatement | undefined;
  const storedHashFor = (entry: SessionFileEntry): string | undefined => {
    const statement = (hashLookup ??= params.db.prepare(
      `SELECT hash FROM files WHERE path = ? AND source = ?`,
    ));
    const row = statement.get(entry.path, SESSION_SOURCE) as { hash: string } | undefined;
    return row?.hash;
  };

  const tasks = files.map((absPath) => async () => {
    if (!indexAll && !params.dirtyFiles.has(absPath)) {
      markFileDone();
      return;
    }
    const entry = await buildSessionEntry(absPath);
    if (!entry) {
      markFileDone();
      return;
    }
    // The stored hash only matters for incremental runs; skip the query otherwise.
    if (!params.needsFullReindex && storedHashFor(entry) === entry.hash) {
      markFileDone();
      return;
    }
    await params.indexFile(entry);
    markFileDone();
  });

  await params.runWithConcurrency(tasks, params.concurrency);

  pruneStaleSessionRows(params, activePaths);
}