Files
clawdbot/src/memory/sync-session-files.ts
2026-01-19 00:15:44 +00:00

131 lines
3.9 KiB
TypeScript

import type { DatabaseSync } from "node:sqlite";
import { createSubsystemLogger } from "../logging/subsystem.js";
import type { SessionFileEntry } from "./session-files.js";
import {
buildSessionEntry,
listSessionFilesForAgent,
sessionPathForFile,
} from "./session-files.js";
/** Logger for the "memory" subsystem; used for sync diagnostics in this module. */
const log = createSubsystemLogger("memory");

/**
 * Caller-owned progress accumulator. `syncSessionFiles` grows `total`,
 * increments `completed`, and pushes counter snapshots through `report`.
 */
interface ProgressState {
  /** Work items finished so far. */
  completed: number;
  /** Work items expected so far; may grow as further work is added. */
  total: number;
  /** Optional label; not read anywhere in this module. */
  label?: string;
  /** Receives a snapshot of the counters, optionally with a display label. */
  report: (update: { completed: number; total: number; label?: string }) => void;
}
/** Statement type returned by `DatabaseSync.prepare`. */
type SqlStatement = ReturnType<DatabaseSync["prepare"]>;

/**
 * Bring the "sessions" source of the memory index in line with the session
 * files currently listed for an agent.
 *
 * Phase 1 (index): a listed file is considered when `needsFullReindex` is set,
 * when `dirtyFiles` is empty, or when the file's absolute path is in
 * `dirtyFiles`. A considered file is passed to `indexFile` unless
 * `buildSessionEntry` yields nothing or, without a full reindex, its hash
 * equals the stored `files.hash`. Tasks run via `runWithConcurrency`.
 *
 * Phase 2 (prune): for every "sessions" path in `files` that no longer matches
 * a listed file, rows are removed from `files`, the vector table, `chunks`,
 * and (when FTS is enabled and available) the FTS table for `model`. Vector
 * and FTS deletions are best-effort: failures are logged at debug level and
 * otherwise ignored.
 *
 * When `progress` is given, `total` grows by the number of listed files and
 * `completed` advances once per file that finishes phase 1 without throwing;
 * each change is forwarded through `progress.report`.
 *
 * `dirtyFiles` is read but never modified here.
 */
export async function syncSessionFiles(params: {
  agentId: string;
  db: DatabaseSync;
  needsFullReindex: boolean;
  progress?: ProgressState;
  batchEnabled: boolean;
  concurrency: number;
  runWithConcurrency: <T>(tasks: Array<() => Promise<T>>, concurrency: number) => Promise<T[]>;
  indexFile: (entry: SessionFileEntry) => Promise<void>;
  vectorTable: string;
  ftsTable: string;
  ftsEnabled: boolean;
  ftsAvailable: boolean;
  model: string;
  dirtyFiles: Set<string>;
}) {
  const { db, progress } = params;
  const files = await listSessionFilesForAgent(params.agentId);
  const activePaths = new Set(files.map((file) => sessionPathForFile(file)));
  // An empty dirty set places no restriction: every listed file is considered.
  const indexAll = params.needsFullReindex || params.dirtyFiles.size === 0;
  log.debug("memory sync: indexing session files", {
    files: files.length,
    indexAll,
    dirtyFiles: params.dirtyFiles.size,
    batch: params.batchEnabled,
    concurrency: params.concurrency,
  });
  if (progress) {
    progress.total += files.length;
    progress.report({
      completed: progress.completed,
      total: progress.total,
      label: params.batchEnabled ? "Indexing session files (batch)..." : "Indexing session files…",
    });
  }

  // Advance shared progress by one file; no-op when no progress sink was supplied.
  const markFileDone = (): void => {
    if (!progress) return;
    progress.completed += 1;
    progress.report({ completed: progress.completed, total: progress.total });
  };

  // Prepared once and reused by every task instead of being re-prepared per file.
  // StatementSync.get() is synchronous, so concurrent tasks cannot interleave inside it.
  const selectHash = db.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`);

  // Index one file if it is in scope and its content changed (or a full reindex is forced).
  const syncFile = async (absPath: string): Promise<void> => {
    if (!indexAll && !params.dirtyFiles.has(absPath)) return;
    const entry = await buildSessionEntry(absPath);
    if (!entry) return;
    const record = selectHash.get(entry.path, "sessions") as { hash: string } | undefined;
    if (!params.needsFullReindex && record?.hash === entry.hash) return;
    await params.indexFile(entry);
  };

  const tasks = files.map((absPath) => async () => {
    await syncFile(absPath);
    // Skipped when syncFile throws, so a failed file is not counted as completed.
    markFileDone();
  });
  await params.runWithConcurrency(tasks, params.concurrency);

  pruneStaleSessionRows(params, activePaths);
}

/** Render an unknown thrown value as a log-friendly message. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/** Prepare `sql`, or log and return undefined when preparation fails (e.g. a missing table). */
function tryPrepareForCleanup(db: DatabaseSync, sql: string, what: string): SqlStatement | undefined {
  try {
    return db.prepare(sql);
  } catch (err) {
    log.debug(`memory sync: skipping stale ${what} cleanup`, { error: describeError(err) });
    return undefined;
  }
}

/** Run a best-effort cleanup step, logging instead of silently discarding failures. */
function runBestEffort(what: string, path: string, run: () => void): void {
  try {
    run();
  } catch (err) {
    log.debug(`memory sync: stale ${what} cleanup failed`, { path, error: describeError(err) });
  }
}

/**
 * Delete all index rows for "sessions" paths that are not in `activePaths`.
 * `files` and `chunks` deletions must succeed; vector and FTS deletions are best-effort.
 */
function pruneStaleSessionRows(
  params: {
    db: DatabaseSync;
    vectorTable: string;
    ftsTable: string;
    ftsEnabled: boolean;
    ftsAvailable: boolean;
    model: string;
  },
  activePaths: ReadonlySet<string>,
): void {
  const { db } = params;
  const rows = db.prepare(`SELECT path FROM files WHERE source = ?`).all("sessions") as Array<{
    path: string;
  }>;
  const stalePaths = rows.map((row) => row.path).filter((path) => !activePaths.has(path));
  if (stalePaths.length === 0) return;

  // Table names are interpolated (SQL identifiers cannot be bound); they come from
  // the caller's params, not from file contents.
  const deleteFile = db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`);
  const deleteChunks = db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`);
  // Resolves ids through `chunks`, so it must run before `deleteChunks` for the same path.
  const deleteVectors = tryPrepareForCleanup(
    db,
    `DELETE FROM ${params.vectorTable} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
    "vector",
  );
  const deleteFts =
    params.ftsEnabled && params.ftsAvailable
      ? tryPrepareForCleanup(
          db,
          `DELETE FROM ${params.ftsTable} WHERE path = ? AND source = ? AND model = ?`,
          "fts",
        )
      : undefined;

  for (const path of stalePaths) {
    deleteFile.run(path, "sessions");
    if (deleteVectors) {
      runBestEffort("vector", path, () => {
        deleteVectors.run(path, "sessions");
      });
    }
    deleteChunks.run(path, "sessions");
    if (deleteFts) {
      runBestEffort("fts", path, () => {
        deleteFts.run(path, "sessions", params.model);
      });
    }
  }
}