From c9c95162067ae0e2a21c4ace225c13f665d6aaa4 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 18 Jan 2026 07:02:14 +0000 Subject: [PATCH] refactor(memory): extract sync + status helpers --- src/cli/memory-cli.ts | 44 ++--- src/commands/status.command.ts | 39 ++-- src/memory/headers-fingerprint.ts | 16 ++ src/memory/manager-cache-key.ts | 55 ++++++ src/memory/manager.ts | 312 ++++-------------------------- src/memory/provider-key.ts | 22 +++ src/memory/session-files.ts | 103 ++++++++++ src/memory/status-format.ts | 34 ++++ src/memory/sync-memory-files.ts | 102 ++++++++++ src/memory/sync-session-files.ts | 126 ++++++++++++ 10 files changed, 532 insertions(+), 321 deletions(-) create mode 100644 src/memory/headers-fingerprint.ts create mode 100644 src/memory/manager-cache-key.ts create mode 100644 src/memory/provider-key.ts create mode 100644 src/memory/session-files.ts create mode 100644 src/memory/status-format.ts create mode 100644 src/memory/sync-memory-files.ts create mode 100644 src/memory/sync-session-files.ts diff --git a/src/cli/memory-cli.ts b/src/cli/memory-cli.ts index adf31a1bc..f92dce8ec 100644 --- a/src/cli/memory-cli.ts +++ b/src/cli/memory-cli.ts @@ -6,6 +6,12 @@ import { setVerbose } from "../globals.js"; import { withProgress, withProgressTotals } from "./progress.js"; import { formatErrorMessage, withManager } from "./cli-utils.js"; import { getMemorySearchManager, type MemorySearchManagerResult } from "../memory/index.js"; +import { + resolveMemoryCacheState, + resolveMemoryFtsState, + resolveMemoryVectorState, + type Tone, +} from "../memory/status-format.js"; import { defaultRuntime } from "../runtime.js"; import { formatDocsLink } from "../terminal/links.js"; import { colorize, isRich, theme } from "../terminal/theme.js"; @@ -131,6 +137,8 @@ export function registerMemoryCli(program: Command) { const warn = (text: string) => colorize(rich, theme.warn, text); const accent = (text: string) => colorize(rich, theme.accent, text); const 
label = (text: string) => muted(`${text}:`); + const colorForTone = (tone: Tone) => + tone === "ok" ? theme.success : tone === "warn" ? theme.warn : theme.muted; const lines = [ `${heading("Memory Search")} ${muted(`(${agentId})`)}`, `${label("Provider")} ${info(status.provider)} ${muted( @@ -164,18 +172,9 @@ export function registerMemoryCli(program: Command) { lines.push(`${label("Fallback")} ${warn(status.fallback.from)}`); } if (status.vector) { - const vectorState = status.vector.enabled - ? status.vector.available - ? "ready" - : "unavailable" - : "disabled"; - const vectorColor = - vectorState === "ready" - ? theme.success - : vectorState === "unavailable" - ? theme.warn - : theme.muted; - lines.push(`${label("Vector")} ${colorize(rich, vectorColor, vectorState)}`); + const vectorState = resolveMemoryVectorState(status.vector); + const vectorColor = colorForTone(vectorState.tone); + lines.push(`${label("Vector")} ${colorize(rich, vectorColor, vectorState.state)}`); if (status.vector.dims) { lines.push(`${label("Vector dims")} ${info(String(status.vector.dims))}`); } @@ -187,31 +186,22 @@ export function registerMemoryCli(program: Command) { } } if (status.fts) { - const ftsState = status.fts.enabled - ? status.fts.available - ? "ready" - : "unavailable" - : "disabled"; - const ftsColor = - ftsState === "ready" - ? theme.success - : ftsState === "unavailable" - ? theme.warn - : theme.muted; - lines.push(`${label("FTS")} ${colorize(rich, ftsColor, ftsState)}`); + const ftsState = resolveMemoryFtsState(status.fts); + const ftsColor = colorForTone(ftsState.tone); + lines.push(`${label("FTS")} ${colorize(rich, ftsColor, ftsState.state)}`); if (status.fts.error) { lines.push(`${label("FTS error")} ${warn(status.fts.error)}`); } } if (status.cache) { - const cacheState = status.cache.enabled ? "enabled" : "disabled"; - const cacheColor = status.cache.enabled ? 
theme.success : theme.muted; + const cacheState = resolveMemoryCacheState(status.cache); + const cacheColor = colorForTone(cacheState.tone); const suffix = status.cache.enabled && typeof status.cache.entries === "number" ? ` (${status.cache.entries} entries)` : ""; lines.push( - `${label("Embedding cache")} ${colorize(rich, cacheColor, cacheState)}${suffix}`, + `${label("Embedding cache")} ${colorize(rich, cacheColor, cacheState.state)}${suffix}`, ); if (status.cache.enabled && typeof status.cache.maxEntries === "number") { lines.push(`${label("Cache cap")} ${info(String(status.cache.maxEntries))}`); diff --git a/src/commands/status.command.ts b/src/commands/status.command.ts index fc30a133a..0c1e7b667 100644 --- a/src/commands/status.command.ts +++ b/src/commands/status.command.ts @@ -7,6 +7,12 @@ import type { RuntimeEnv } from "../runtime.js"; import { runSecurityAudit } from "../security/audit.js"; import { renderTable } from "../terminal/table.js"; import { theme } from "../terminal/theme.js"; +import { + resolveMemoryCacheSummary, + resolveMemoryFtsState, + resolveMemoryVectorState, + type Tone, +} from "../memory/status-format.js"; import { formatHealthChannelLines, type HealthSummary } from "./health.js"; import { resolveControlUiLinks } from "./onboard-helpers.js"; import { getDaemonStatusSummary } from "./status.daemon.js"; @@ -250,33 +256,24 @@ export async function statusCommand( parts.push(`${memory.files} files · ${memory.chunks} chunks${dirtySuffix}`); if (memory.sources?.length) parts.push(`sources ${memory.sources.join(", ")}`); if (memoryPlugin.slot) parts.push(`plugin ${memoryPlugin.slot}`); + const colorByTone = (tone: Tone, text: string) => + tone === "ok" ? ok(text) : tone === "warn" ? warn(text) : muted(text); const vector = memory.vector; - parts.push( - vector?.enabled === false - ? muted("vector off") - : vector?.available - ? ok("vector ready") - : vector?.available === false - ? 
warn("vector unavailable") - : muted("vector unknown"), - ); + if (vector) { + const state = resolveMemoryVectorState(vector); + const label = state.state === "disabled" ? "vector off" : `vector ${state.state}`; + parts.push(colorByTone(state.tone, label)); + } const fts = memory.fts; if (fts) { - parts.push( - fts.enabled === false - ? muted("fts off") - : fts.available - ? ok("fts ready") - : warn("fts unavailable"), - ); + const state = resolveMemoryFtsState(fts); + const label = state.state === "disabled" ? "fts off" : `fts ${state.state}`; + parts.push(colorByTone(state.tone, label)); } const cache = memory.cache; if (cache) { - parts.push( - cache.enabled - ? ok(`cache on${typeof cache.entries === "number" ? ` (${cache.entries})` : ""}`) - : muted("cache off"), - ); + const summary = resolveMemoryCacheSummary(cache); + parts.push(colorByTone(summary.tone, summary.text)); } return parts.join(" · "); })(); diff --git a/src/memory/headers-fingerprint.ts b/src/memory/headers-fingerprint.ts new file mode 100644 index 000000000..ec9028e90 --- /dev/null +++ b/src/memory/headers-fingerprint.ts @@ -0,0 +1,16 @@ +function normalizeHeaderName(name: string): string { + return name.trim().toLowerCase(); +} + +export function fingerprintHeaderNames(headers: Record | undefined): string[] { + if (!headers) return []; + const out: string[] = []; + for (const key of Object.keys(headers)) { + const normalized = normalizeHeaderName(key); + if (!normalized) continue; + out.push(normalized); + } + out.sort((a, b) => a.localeCompare(b)); + return out; +} + diff --git a/src/memory/manager-cache-key.ts b/src/memory/manager-cache-key.ts new file mode 100644 index 000000000..763949825 --- /dev/null +++ b/src/memory/manager-cache-key.ts @@ -0,0 +1,55 @@ +import type { ResolvedMemorySearchConfig } from "../agents/memory-search.js"; + +import { hashText } from "./internal.js"; +import { fingerprintHeaderNames } from "./headers-fingerprint.js"; + +export function 
computeMemoryManagerCacheKey(params: { + agentId: string; + workspaceDir: string; + settings: ResolvedMemorySearchConfig; +}): string { + const settings = params.settings; + const fingerprint = hashText( + JSON.stringify({ + enabled: settings.enabled, + sources: [...settings.sources].sort((a, b) => a.localeCompare(b)), + provider: settings.provider, + model: settings.model, + fallback: settings.fallback, + local: { + modelPath: settings.local.modelPath, + modelCacheDir: settings.local.modelCacheDir, + }, + remote: settings.remote + ? { + baseUrl: settings.remote.baseUrl, + headerNames: fingerprintHeaderNames(settings.remote.headers), + batch: settings.remote.batch + ? { + enabled: settings.remote.batch.enabled, + wait: settings.remote.batch.wait, + concurrency: settings.remote.batch.concurrency, + pollIntervalMs: settings.remote.batch.pollIntervalMs, + timeoutMinutes: settings.remote.batch.timeoutMinutes, + } + : undefined, + } + : undefined, + experimental: settings.experimental, + store: { + driver: settings.store.driver, + path: settings.store.path, + vector: { + enabled: settings.store.vector.enabled, + extensionPath: settings.store.vector.extensionPath, + }, + }, + chunking: settings.chunking, + sync: settings.sync, + query: settings.query, + cache: settings.cache, + }), + ); + return `${params.agentId}:${params.workspaceDir}:${fingerprint}`; +} + diff --git a/src/memory/manager.ts b/src/memory/manager.ts index d73b73972..6c7a0c3d8 100644 --- a/src/memory/manager.ts +++ b/src/memory/manager.ts @@ -24,12 +24,10 @@ import { runOpenAiEmbeddingBatches, } from "./openai-batch.js"; import { - buildFileEntry, chunkMarkdown, ensureDir, hashText, isMemoryPath, - listMemoryFiles, type MemoryChunk, type MemoryFileEntry, normalizeRelPath, @@ -38,8 +36,13 @@ import { import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js"; import { searchKeyword, searchVector } from "./manager-search.js"; import { ensureMemoryIndexSchema } from 
"./memory-schema.js"; +import { computeMemoryManagerCacheKey } from "./manager-cache-key.js"; +import { computeEmbeddingProviderKey } from "./provider-key.js"; import { requireNodeSqlite } from "./sqlite.js"; import { loadSqliteVecExtension } from "./sqlite-vec.js"; +import type { SessionFileEntry } from "./session-files.js"; +import { syncMemoryFiles } from "./sync-memory-files.js"; +import { syncSessionFiles } from "./sync-session-files.js"; type MemorySource = "memory" | "sessions"; @@ -61,15 +64,6 @@ type MemoryIndexMeta = { vectorDims?: number; }; -type SessionFileEntry = { - path: string; - absPath: string; - mtimeMs: number; - size: number; - hash: string; - content: string; -}; - type MemorySyncProgressUpdate = { completed: number; total: number; @@ -157,7 +151,7 @@ export class MemoryIndexManager { const settings = resolveMemorySearchConfig(cfg, agentId); if (!settings) return null; const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId); - const key = `${agentId}:${workspaceDir}:${JSON.stringify(settings)}`; + const key = computeMemoryManagerCacheKey({ agentId, workspaceDir, settings }); const existing = INDEX_CACHE.get(key); if (existing) return existing; const providerResult = await createEmbeddingProvider({ @@ -200,7 +194,13 @@ export class MemoryIndexManager { this.openAi = params.providerResult.openAi; this.sources = new Set(params.settings.sources); this.db = this.openDatabase(); - this.providerKey = this.computeProviderKey(); + this.providerKey = computeEmbeddingProviderKey({ + providerId: this.provider.id, + providerModel: this.provider.model, + openAi: this.openAi + ? 
{ baseUrl: this.openAi.baseUrl, model: this.openAi.model, headers: this.openAi.headers } + : undefined, + }); this.cache = { enabled: params.settings.cache.enabled, maxEntries: params.settings.cache.maxEntries, @@ -714,170 +714,43 @@ export class MemoryIndexManager { needsFullReindex: boolean; progress?: MemorySyncProgressState; }) { - const files = await listMemoryFiles(this.workspaceDir); - const fileEntries = await Promise.all( - files.map(async (file) => buildFileEntry(file, this.workspaceDir)), - ); - log.debug("memory sync: indexing memory files", { - files: fileEntries.length, + await syncMemoryFiles({ + workspaceDir: this.workspaceDir, + db: this.db, needsFullReindex: params.needsFullReindex, - batch: this.batch.enabled, + progress: params.progress, + batchEnabled: this.batch.enabled, concurrency: this.getIndexConcurrency(), + runWithConcurrency: this.runWithConcurrency.bind(this), + indexFile: async (entry) => await this.indexFile(entry, { source: "memory" }), + vectorTable: VECTOR_TABLE, + ftsTable: FTS_TABLE, + ftsEnabled: this.fts.enabled, + ftsAvailable: this.fts.available, + model: this.provider.model, }); - const activePaths = new Set(fileEntries.map((entry) => entry.path)); - if (params.progress) { - params.progress.total += fileEntries.length; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - label: this.batch.enabled ? "Indexing memory files (batch)..." : "Indexing memory files…", - }); - } - - const tasks = fileEntries.map((entry) => async () => { - const record = this.db - .prepare(`SELECT hash FROM files WHERE path = ? 
AND source = ?`) - .get(entry.path, "memory") as { hash: string } | undefined; - if (!params.needsFullReindex && record?.hash === entry.hash) { - if (params.progress) { - params.progress.completed += 1; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - }); - } - return; - } - await this.indexFile(entry, { source: "memory" }); - if (params.progress) { - params.progress.completed += 1; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - }); - } - }); - await this.runWithConcurrency(tasks, this.getIndexConcurrency()); - - const staleRows = this.db - .prepare(`SELECT path FROM files WHERE source = ?`) - .all("memory") as Array<{ path: string }>; - for (const stale of staleRows) { - if (activePaths.has(stale.path)) continue; - this.db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`).run(stale.path, "memory"); - try { - this.db - .prepare( - `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, - ) - .run(stale.path, "memory"); - } catch {} - this.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(stale.path, "memory"); - if (this.fts.enabled && this.fts.available) { - try { - this.db - .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? 
AND model = ?`) - .run(stale.path, "memory", this.provider.model); - } catch {} - } - } } private async syncSessionFiles(params: { needsFullReindex: boolean; progress?: MemorySyncProgressState; }) { - const files = await this.listSessionFiles(); - const activePaths = new Set(files.map((file) => this.sessionPathForFile(file))); - const indexAll = params.needsFullReindex || this.sessionsDirtyFiles.size === 0; - log.debug("memory sync: indexing session files", { - files: files.length, - indexAll, - dirtyFiles: this.sessionsDirtyFiles.size, - batch: this.batch.enabled, + await syncSessionFiles({ + agentId: this.agentId, + db: this.db, + needsFullReindex: params.needsFullReindex, + progress: params.progress, + batchEnabled: this.batch.enabled, concurrency: this.getIndexConcurrency(), + runWithConcurrency: this.runWithConcurrency.bind(this), + indexFile: async (entry) => await this.indexFile(entry, { source: "sessions", content: entry.content }), + vectorTable: VECTOR_TABLE, + ftsTable: FTS_TABLE, + ftsEnabled: this.fts.enabled, + ftsAvailable: this.fts.available, + model: this.provider.model, + dirtyFiles: this.sessionsDirtyFiles, }); - if (params.progress) { - params.progress.total += files.length; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - label: this.batch.enabled ? "Indexing session files (batch)..." 
: "Indexing session files…", - }); - } - - const tasks = files.map((absPath) => async () => { - if (!indexAll && !this.sessionsDirtyFiles.has(absPath)) { - if (params.progress) { - params.progress.completed += 1; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - }); - } - return; - } - const entry = await this.buildSessionEntry(absPath); - if (!entry) { - if (params.progress) { - params.progress.completed += 1; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - }); - } - return; - } - const record = this.db - .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) - .get(entry.path, "sessions") as { hash: string } | undefined; - if (!params.needsFullReindex && record?.hash === entry.hash) { - if (params.progress) { - params.progress.completed += 1; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - }); - } - return; - } - await this.indexFile(entry, { source: "sessions", content: entry.content }); - if (params.progress) { - params.progress.completed += 1; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - }); - } - }); - await this.runWithConcurrency(tasks, this.getIndexConcurrency()); - - const staleRows = this.db - .prepare(`SELECT path FROM files WHERE source = ?`) - .all("sessions") as Array<{ path: string }>; - for (const stale of staleRows) { - if (activePaths.has(stale.path)) continue; - this.db - .prepare(`DELETE FROM files WHERE path = ? AND source = ?`) - .run(stale.path, "sessions"); - try { - this.db - .prepare( - `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, - ) - .run(stale.path, "sessions"); - } catch {} - this.db - .prepare(`DELETE FROM chunks WHERE path = ? 
AND source = ?`) - .run(stale.path, "sessions"); - if (this.fts.enabled && this.fts.available) { - try { - this.db - .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) - .run(stale.path, "sessions", this.provider.model); - } catch {} - } - } } private createSyncProgress( @@ -993,95 +866,6 @@ export class MemoryIndexManager { .run(META_KEY, value); } - private async listSessionFiles(): Promise<string[]> { - const dir = resolveSessionTranscriptsDirForAgent(this.agentId); - try { - const entries = await fs.readdir(dir, { withFileTypes: true }); - return entries - .filter((entry) => entry.isFile()) - .map((entry) => entry.name) - .filter((name) => name.endsWith(".jsonl")) - .map((name) => path.join(dir, name)); - } catch { - return []; - } - } - - private sessionPathForFile(absPath: string): string { - return path.join("sessions", path.basename(absPath)).replace(/\\/g, "/"); - } - - private normalizeSessionText(value: string): string { - return value - .replace(/\s*\n+\s*/g, " ") - .replace(/\s+/g, " ") - .trim(); - } - - private extractSessionText(content: unknown): string | null { - if (typeof content === "string") { - const normalized = this.normalizeSessionText(content); - return normalized ? 
normalized : null; - } - if (!Array.isArray(content)) return null; - const parts: string[] = []; - for (const block of content) { - if (!block || typeof block !== "object") continue; - const record = block as { type?: unknown; text?: unknown }; - if (record.type !== "text" || typeof record.text !== "string") continue; - const normalized = this.normalizeSessionText(record.text); - if (normalized) parts.push(normalized); - } - if (parts.length === 0) return null; - return parts.join(" "); - } - - private async buildSessionEntry(absPath: string): Promise<SessionFileEntry | null> { - try { - const stat = await fs.stat(absPath); - const raw = await fs.readFile(absPath, "utf-8"); - const lines = raw.split("\n"); - const collected: string[] = []; - for (const line of lines) { - if (!line.trim()) continue; - let record: unknown; - try { - record = JSON.parse(line); - } catch { - continue; - } - if ( - !record || - typeof record !== "object" || - (record as { type?: unknown }).type !== "message" - ) { - continue; - } - const message = (record as { message?: unknown }).message as - | { role?: unknown; content?: unknown } - | undefined; - if (!message || typeof message.role !== "string") continue; - if (message.role !== "user" && message.role !== "assistant") continue; - const text = this.extractSessionText(message.content); - if (!text) continue; - const label = message.role === "user" ? 
"User" : "Assistant"; - collected.push(`${label}: ${text}`); - } - const content = collected.join("\n"); - return { - path: this.sessionPathForFile(absPath), - absPath, - mtimeMs: stat.mtimeMs, - size: stat.size, - hash: hashText(content), - content, - }; - } catch (err) { - log.debug(`Failed reading session file ${absPath}: ${String(err)}`); - return null; - } - } - private estimateEmbeddingTokens(text: string): number { if (!text) return 0; return Math.ceil(text.length / EMBEDDING_APPROX_CHARS_PER_TOKEN); @@ -1233,24 +1017,6 @@ export class MemoryIndexManager { return embeddings; } - private computeProviderKey(): string { - if (this.provider.id === "openai" && this.openAi) { - const entries = Object.entries(this.openAi.headers) - .filter(([key]) => key.toLowerCase() !== "authorization") - .sort(([a], [b]) => a.localeCompare(b)) - .map(([key, value]) => [key, value]); - return hashText( - JSON.stringify({ - provider: "openai", - baseUrl: this.openAi.baseUrl, - model: this.openAi.model, - headers: entries, - }), - ); - } - return hashText(JSON.stringify({ provider: this.provider.id, model: this.provider.model })); - } - private async embedChunksWithBatch( chunks: MemoryChunk[], entry: MemoryFileEntry | SessionFileEntry, diff --git a/src/memory/provider-key.ts b/src/memory/provider-key.ts new file mode 100644 index 000000000..f7d755c49 --- /dev/null +++ b/src/memory/provider-key.ts @@ -0,0 +1,22 @@ +import { hashText } from "./internal.js"; +import { fingerprintHeaderNames } from "./headers-fingerprint.js"; + +export function computeEmbeddingProviderKey(params: { + providerId: string; + providerModel: string; + openAi?: { baseUrl: string; model: string; headers: Record }; +}): string { + if (params.openAi) { + const headerNames = fingerprintHeaderNames(params.openAi.headers); + return hashText( + JSON.stringify({ + provider: "openai", + baseUrl: params.openAi.baseUrl, + model: params.openAi.model, + headerNames, + }), + ); + } + return hashText(JSON.stringify({ 
provider: params.providerId, model: params.providerModel })); +} + diff --git a/src/memory/session-files.ts b/src/memory/session-files.ts new file mode 100644 index 000000000..364777f18 --- /dev/null +++ b/src/memory/session-files.ts @@ -0,0 +1,103 @@ +import fs from "node:fs/promises"; +import path from "node:path"; + +import { resolveSessionTranscriptsDirForAgent } from "../config/sessions/paths.js"; +import { createSubsystemLogger } from "../logging.js"; +import { hashText } from "./internal.js"; + +const log = createSubsystemLogger("memory"); + +export type SessionFileEntry = { + path: string; + absPath: string; + mtimeMs: number; + size: number; + hash: string; + content: string; +}; + +export async function listSessionFilesForAgent(agentId: string): Promise<string[]> { + const dir = resolveSessionTranscriptsDirForAgent(agentId); + try { + const entries = await fs.readdir(dir, { withFileTypes: true }); + return entries + .filter((entry) => entry.isFile()) + .map((entry) => entry.name) + .filter((name) => name.endsWith(".jsonl")) + .map((name) => path.join(dir, name)); + } catch { + return []; + } +} + +export function sessionPathForFile(absPath: string): string { + return path.join("sessions", path.basename(absPath)).replace(/\\/g, "/"); +} + +function normalizeSessionText(value: string): string { + return value + .replace(/\s*\n+\s*/g, " ") + .replace(/\s+/g, " ") + .trim(); +} + +export function extractSessionText(content: unknown): string | null { + if (typeof content === "string") { + const normalized = normalizeSessionText(content); + return normalized ? 
normalized : null; + } + if (!Array.isArray(content)) return null; + const parts: string[] = []; + for (const block of content) { + if (!block || typeof block !== "object") continue; + const record = block as { type?: unknown; text?: unknown }; + if (record.type !== "text" || typeof record.text !== "string") continue; + const normalized = normalizeSessionText(record.text); + if (normalized) parts.push(normalized); + } + if (parts.length === 0) return null; + return parts.join(" "); +} + +export async function buildSessionEntry(absPath: string): Promise<SessionFileEntry | null> { + try { + const stat = await fs.stat(absPath); + const raw = await fs.readFile(absPath, "utf-8"); + const lines = raw.split("\n"); + const collected: string[] = []; + for (const line of lines) { + if (!line.trim()) continue; + let record: unknown; + try { + record = JSON.parse(line); + } catch { + continue; + } + if (!record || typeof record !== "object" || (record as { type?: unknown }).type !== "message") { + continue; + } + const message = (record as { message?: unknown }).message as + | { role?: unknown; content?: unknown } + | undefined; + if (!message || typeof message.role !== "string") continue; + if (message.role !== "user" && message.role !== "assistant") continue; + const text = extractSessionText(message.content); + if (!text) continue; + const label = message.role === "user" ? 
"User" : "Assistant"; + collected.push(`${label}: ${text}`); + } + const content = collected.join("\n"); + return { + path: sessionPathForFile(absPath), + absPath, + mtimeMs: stat.mtimeMs, + size: stat.size, + hash: hashText(content), + content, + }; + } catch (err) { + log.debug(`Failed reading session file ${absPath}: ${String(err)}`); + return null; + } +} + diff --git a/src/memory/status-format.ts b/src/memory/status-format.ts new file mode 100644 index 000000000..ff28919f2 --- /dev/null +++ b/src/memory/status-format.ts @@ -0,0 +1,34 @@ +export type Tone = "ok" | "warn" | "muted"; + +export function resolveMemoryVectorState(vector: { + enabled: boolean; + available?: boolean; +}): { tone: Tone; state: "ready" | "unavailable" | "disabled" | "unknown" } { + if (vector.enabled === false) return { tone: "muted", state: "disabled" }; + if (vector.available === true) return { tone: "ok", state: "ready" }; + if (vector.available === false) return { tone: "warn", state: "unavailable" }; + return { tone: "muted", state: "unknown" }; +} + +export function resolveMemoryFtsState(fts: { + enabled: boolean; + available: boolean; +}): { tone: Tone; state: "ready" | "unavailable" | "disabled" } { + if (fts.enabled === false) return { tone: "muted", state: "disabled" }; + return fts.available ? { tone: "ok", state: "ready" } : { tone: "warn", state: "unavailable" }; +} + +export function resolveMemoryCacheSummary(cache: { + enabled: boolean; + entries?: number; +}): { tone: Tone; text: string } { + if (!cache.enabled) return { tone: "muted", text: "cache off" }; + const suffix = typeof cache.entries === "number" ? ` (${cache.entries})` : ""; + return { tone: "ok", text: `cache on${suffix}` }; +} + +export function resolveMemoryCacheState(cache: { + enabled: boolean; +}): { tone: Tone; state: "enabled" | "disabled" } { + return cache.enabled ? 
{ tone: "ok", state: "enabled" } : { tone: "muted", state: "disabled" }; +} diff --git a/src/memory/sync-memory-files.ts b/src/memory/sync-memory-files.ts new file mode 100644 index 000000000..d7bcd19c7 --- /dev/null +++ b/src/memory/sync-memory-files.ts @@ -0,0 +1,102 @@ +import type { DatabaseSync } from "node:sqlite"; + +import { createSubsystemLogger } from "../logging.js"; +import { + buildFileEntry, + listMemoryFiles, + type MemoryFileEntry, +} from "./internal.js"; + +const log = createSubsystemLogger("memory"); + +type ProgressState = { + completed: number; + total: number; + label?: string; + report: (update: { completed: number; total: number; label?: string }) => void; +}; + +export async function syncMemoryFiles(params: { + workspaceDir: string; + db: DatabaseSync; + needsFullReindex: boolean; + progress?: ProgressState; + batchEnabled: boolean; + concurrency: number; + runWithConcurrency: (tasks: Array<() => Promise>, concurrency: number) => Promise; + indexFile: (entry: MemoryFileEntry) => Promise; + vectorTable: string; + ftsTable: string; + ftsEnabled: boolean; + ftsAvailable: boolean; + model: string; +}) { + const files = await listMemoryFiles(params.workspaceDir); + const fileEntries = await Promise.all(files.map(async (file) => buildFileEntry(file, params.workspaceDir))); + + log.debug("memory sync: indexing memory files", { + files: fileEntries.length, + needsFullReindex: params.needsFullReindex, + batch: params.batchEnabled, + concurrency: params.concurrency, + }); + + const activePaths = new Set(fileEntries.map((entry) => entry.path)); + if (params.progress) { + params.progress.total += fileEntries.length; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + label: params.batchEnabled ? "Indexing memory files (batch)..." : "Indexing memory files…", + }); + } + + const tasks = fileEntries.map((entry) => async () => { + const record = params.db + .prepare(`SELECT hash FROM files WHERE path = ? 
AND source = ?`) + .get(entry.path, "memory") as { hash: string } | undefined; + if (!params.needsFullReindex && record?.hash === entry.hash) { + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + return; + } + await params.indexFile(entry); + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + }); + + await params.runWithConcurrency(tasks, params.concurrency); + + const staleRows = params.db + .prepare(`SELECT path FROM files WHERE source = ?`) + .all("memory") as Array<{ path: string }>; + for (const stale of staleRows) { + if (activePaths.has(stale.path)) continue; + params.db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`).run(stale.path, "memory"); + try { + params.db + .prepare( + `DELETE FROM ${params.vectorTable} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, + ) + .run(stale.path, "memory"); + } catch {} + params.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(stale.path, "memory"); + if (params.ftsEnabled && params.ftsAvailable) { + try { + params.db + .prepare(`DELETE FROM ${params.ftsTable} WHERE path = ? AND source = ? 
AND model = ?`) + .run(stale.path, "memory", params.model); + } catch {} + } + } +} diff --git a/src/memory/sync-session-files.ts b/src/memory/sync-session-files.ts new file mode 100644 index 000000000..d5c486759 --- /dev/null +++ b/src/memory/sync-session-files.ts @@ -0,0 +1,126 @@ +import type { DatabaseSync } from "node:sqlite"; + +import { createSubsystemLogger } from "../logging.js"; +import type { SessionFileEntry } from "./session-files.js"; +import { buildSessionEntry, listSessionFilesForAgent, sessionPathForFile } from "./session-files.js"; + +const log = createSubsystemLogger("memory"); + +type ProgressState = { + completed: number; + total: number; + label?: string; + report: (update: { completed: number; total: number; label?: string }) => void; +}; + +export async function syncSessionFiles(params: { + agentId: string; + db: DatabaseSync; + needsFullReindex: boolean; + progress?: ProgressState; + batchEnabled: boolean; + concurrency: number; + runWithConcurrency: (tasks: Array<() => Promise<void>>, concurrency: number) => Promise<void>; + indexFile: (entry: SessionFileEntry) => Promise<void>; + vectorTable: string; + ftsTable: string; + ftsEnabled: boolean; + ftsAvailable: boolean; + model: string; + dirtyFiles: Set<string>; +}) { + const files = await listSessionFilesForAgent(params.agentId); + const activePaths = new Set(files.map((file) => sessionPathForFile(file))); + const indexAll = params.needsFullReindex || params.dirtyFiles.size === 0; + + log.debug("memory sync: indexing session files", { + files: files.length, + indexAll, + dirtyFiles: params.dirtyFiles.size, + batch: params.batchEnabled, + concurrency: params.concurrency, + }); + + if (params.progress) { + params.progress.total += files.length; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + label: params.batchEnabled ? "Indexing session files (batch)..." 
: "Indexing session files…", + }); + } + + const tasks = files.map((absPath) => async () => { + if (!indexAll && !params.dirtyFiles.has(absPath)) { + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + return; + } + const entry = await buildSessionEntry(absPath); + if (!entry) { + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + return; + } + const record = params.db + .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) + .get(entry.path, "sessions") as { hash: string } | undefined; + if (!params.needsFullReindex && record?.hash === entry.hash) { + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + return; + } + await params.indexFile(entry); + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + }); + + await params.runWithConcurrency(tasks, params.concurrency); + + const staleRows = params.db + .prepare(`SELECT path FROM files WHERE source = ?`) + .all("sessions") as Array<{ path: string }>; + for (const stale of staleRows) { + if (activePaths.has(stale.path)) continue; + params.db + .prepare(`DELETE FROM files WHERE path = ? AND source = ?`) + .run(stale.path, "sessions"); + try { + params.db + .prepare( + `DELETE FROM ${params.vectorTable} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, + ) + .run(stale.path, "sessions"); + } catch {} + params.db + .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) + .run(stale.path, "sessions"); + if (params.ftsEnabled && params.ftsAvailable) { + try { + params.db + .prepare(`DELETE FROM ${params.ftsTable} WHERE path = ? 
AND source = ? AND model = ?`) + .run(stale.path, "sessions", params.model); + } catch {} + } + } +}