import { randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import path from "node:path";
import type { DatabaseSync } from "node:sqlite";
import chokidar, { type FSWatcher } from "chokidar";
import { resolveAgentDir, resolveAgentWorkspaceDir } from "../agents/agent-scope.js";
import type { ResolvedMemorySearchConfig } from "../agents/memory-search.js";
import { resolveMemorySearchConfig } from "../agents/memory-search.js";
import type { ClawdbotConfig } from "../config/config.js";
import { resolveSessionTranscriptsDirForAgent } from "../config/sessions/paths.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import { resolveUserPath } from "../utils.js";
import {
  createEmbeddingProvider,
  type EmbeddingProvider,
  type EmbeddingProviderResult,
  type GeminiEmbeddingClient,
  type OpenAiEmbeddingClient,
} from "./embeddings.js";
import { DEFAULT_GEMINI_EMBEDDING_MODEL } from "./embeddings-gemini.js";
import { DEFAULT_OPENAI_EMBEDDING_MODEL } from "./embeddings-openai.js";
import {
  OPENAI_BATCH_ENDPOINT,
  type OpenAiBatchRequest,
  runOpenAiEmbeddingBatches,
} from "./batch-openai.js";
import { runGeminiEmbeddingBatches, type GeminiBatchRequest } from "./batch-gemini.js";
import {
  buildFileEntry,
  chunkMarkdown,
  ensureDir,
  hashText,
  isMemoryPath,
  listMemoryFiles,
  type MemoryChunk,
  type MemoryFileEntry,
  normalizeRelPath,
  parseEmbedding,
} from "./internal.js";
import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js";
import { searchKeyword, searchVector } from "./manager-search.js";
import { ensureMemoryIndexSchema } from "./memory-schema.js";
import { requireNodeSqlite } from "./sqlite.js";
import { loadSqliteVecExtension } from "./sqlite-vec.js";

type MemorySource = "memory" | "sessions";

export type MemorySearchResult = {
  path: string;
  startLine: number;
  endLine: number;
  score: number;
  snippet: string;
  source: MemorySource;
};

type MemoryIndexMeta = {
  model: string;
  provider: string;
  providerKey?: string;
  chunkTokens: number;
  chunkOverlap: number;
  vectorDims?: number;
};

type SessionFileEntry = {
  path: string;
  absPath: string;
  mtimeMs: number;
  size: number;
  hash: string;
  content: string;
};

type MemorySyncProgressUpdate = {
  completed: number;
  total: number;
  label?: string;
};

type MemorySyncProgressState = {
  completed: number;
  total: number;
  label?: string;
  report: (update: MemorySyncProgressUpdate) => void;
};

const META_KEY = "memory_index_meta_v1";
const SNIPPET_MAX_CHARS = 700;
const VECTOR_TABLE = "chunks_vec";
const FTS_TABLE = "chunks_fts";
const EMBEDDING_CACHE_TABLE = "embedding_cache";
const SESSION_DIRTY_DEBOUNCE_MS = 5000;
const EMBEDDING_BATCH_MAX_TOKENS = 8000;
const EMBEDDING_APPROX_CHARS_PER_TOKEN = 1;
const EMBEDDING_INDEX_CONCURRENCY = 4;
const EMBEDDING_RETRY_MAX_ATTEMPTS = 3;
const EMBEDDING_RETRY_BASE_DELAY_MS = 500;
const EMBEDDING_RETRY_MAX_DELAY_MS = 8000;
const BATCH_FAILURE_LIMIT = 2;
const SESSION_DELTA_READ_CHUNK_BYTES = 64 * 1024;
const VECTOR_LOAD_TIMEOUT_MS = 30_000;
const EMBEDDING_QUERY_TIMEOUT_REMOTE_MS = 60_000;
const EMBEDDING_QUERY_TIMEOUT_LOCAL_MS = 5 * 60_000;
const EMBEDDING_BATCH_TIMEOUT_REMOTE_MS = 2 * 60_000;
const EMBEDDING_BATCH_TIMEOUT_LOCAL_MS = 10 * 60_000;

const log = createSubsystemLogger("memory");

// One shared manager per agent/workspace/settings combination (see MemoryIndexManager.get).
const INDEX_CACHE = new Map<string, MemoryIndexManager>();

const vectorToBlob = (embedding: number[]): Buffer =>
  Buffer.from(new Float32Array(embedding).buffer);

/**
 * Per-agent memory search index: chunks memory and session files, embeds them
 * with the configured provider, stores vectors and FTS rows in SQLite, and
 * serves hybrid (vector + keyword) queries.
 */
export class MemoryIndexManager {
  private readonly cacheKey: string;
  private readonly cfg: ClawdbotConfig;
  private readonly agentId: string;
  private readonly workspaceDir: string;
  private readonly settings: ResolvedMemorySearchConfig;
  private provider: EmbeddingProvider;
  private readonly requestedProvider: "openai" | "local" | "gemini" | "auto";
  private fallbackFrom?: "openai" | "local" | "gemini";
  private fallbackReason?: string;
  private openAi?: OpenAiEmbeddingClient;
  private gemini?: GeminiEmbeddingClient;
  private batch: {
    enabled: boolean;
    wait: boolean;
    concurrency: number;
    pollIntervalMs: number;
    timeoutMs: number;
  };
  private batchFailureCount = 0;
  private batchFailureLastError?: string;
  private batchFailureLastProvider?: string;
  private batchFailureLock: Promise<void> = Promise.resolve();
  private db: DatabaseSync;
  private readonly sources: Set<MemorySource>;
  private providerKey: string;
  private readonly cache: { enabled: boolean; maxEntries?: number };
  private readonly vector: {
    enabled: boolean;
    available: boolean | null;
    extensionPath?: string;
    loadError?: string;
    dims?: number;
  };
  private readonly fts: {
    enabled: boolean;
    available: boolean;
    loadError?: string;
  };
  private vectorReady: Promise<boolean> | null = null;
  private watcher: FSWatcher | null = null;
  private watchTimer: NodeJS.Timeout | null = null;
  private sessionWatchTimer: NodeJS.Timeout | null = null;
  private sessionUnsubscribe: (() => void) | null = null;
  private intervalTimer: NodeJS.Timeout | null = null;
  private closed = false;
  private dirty = false;
  private sessionsDirty = false;
  private sessionsDirtyFiles = new Set<string>();
  private sessionPendingFiles = new Set<string>();
  private sessionDeltas = new Map<
    string,
    { lastSize: number; pendingBytes: number; pendingMessages: number }
  >();
  private sessionWarm = new Set<string>();
  private syncing: Promise<void> | null = null;

  /**
   * Returns the shared manager for an agent, creating and caching it on first
   * use; returns null when memory search is not configured for the agent.
   */
  static async get(params: {
    cfg: ClawdbotConfig;
    agentId: string;
  }): Promise<MemoryIndexManager | null> {
    const { cfg, agentId } = params;
    const settings = resolveMemorySearchConfig(cfg, agentId);
    if (!settings) return null;
    const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId);
    const key = `${agentId}:${workspaceDir}:${JSON.stringify(settings)}`;
    const existing = INDEX_CACHE.get(key);
    if (existing) return existing;
    const providerResult = await createEmbeddingProvider({
      config: cfg,
      agentDir: resolveAgentDir(cfg, agentId),
      provider: settings.provider,
      remote: settings.remote,
      model: settings.model,
      fallback: settings.fallback,
      local: settings.local,
    });
    const manager = new MemoryIndexManager({
      cacheKey: key,
      cfg,
      agentId,
      workspaceDir,
      settings,
      providerResult,
    });
    INDEX_CACHE.set(key, manager);
    return manager;
  }

  private constructor(params: {
    cacheKey: string;
    cfg: ClawdbotConfig;
    agentId: string;
    workspaceDir: string;
    settings: ResolvedMemorySearchConfig;
    providerResult: EmbeddingProviderResult;
  }) {
    this.cacheKey = params.cacheKey;
    this.cfg = params.cfg;
    this.agentId = params.agentId;
    this.workspaceDir = params.workspaceDir;
    this.settings = params.settings;
    this.provider = params.providerResult.provider;
    this.requestedProvider = params.providerResult.requestedProvider;
    this.fallbackFrom = params.providerResult.fallbackFrom;
    this.fallbackReason = params.providerResult.fallbackReason;
    this.openAi = params.providerResult.openAi;
    this.gemini = params.providerResult.gemini;
    this.sources = new Set(params.settings.sources);
    this.db = this.openDatabase();
    this.providerKey = this.computeProviderKey();
    this.cache = {
      enabled: params.settings.cache.enabled,
      maxEntries: params.settings.cache.maxEntries,
    };
    this.fts = { enabled: params.settings.query.hybrid.enabled, available: false };
    this.ensureSchema();
    this.vector = {
      enabled: params.settings.store.vector.enabled,
      available: null,
      extensionPath: params.settings.store.vector.extensionPath,
    };
    const meta = this.readMeta();
    if (meta?.vectorDims) {
      this.vector.dims = meta.vectorDims;
    }
    this.ensureWatcher();
    this.ensureSessionListener();
    this.ensureIntervalSync();
    this.dirty = this.sources.has("memory");
    this.batch = this.resolveBatchConfig();
  }

  async warmSession(sessionKey?: string): Promise<void> {
    if (!this.settings.sync.onSessionStart) return;
    const key = sessionKey?.trim() || "";
    if (key && this.sessionWarm.has(key)) return;
    void this.sync({ reason: "session-start" }).catch((err) => {
      log.warn(`memory sync failed (session-start): ${String(err)}`);
    });
    if (key) this.sessionWarm.add(key);
  }

  async search(
    query: string,
    opts?: {
      maxResults?: number;
      minScore?: number;
      sessionKey?: string;
    },
  ): Promise<MemorySearchResult[]> {
    void this.warmSession(opts?.sessionKey);
    if (this.settings.sync.onSearch && (this.dirty || this.sessionsDirty)) {
      void this.sync({ reason: "search" }).catch((err) => {
        log.warn(`memory sync failed (search): ${String(err)}`);
      });
    }
    const cleaned = query.trim();
    if (!cleaned) return [];
    const minScore = opts?.minScore ?? this.settings.query.minScore;
    const maxResults = opts?.maxResults ?? this.settings.query.maxResults;
    const hybrid = this.settings.query.hybrid;
    const candidates = Math.min(
      200,
      Math.max(1, Math.floor(maxResults * hybrid.candidateMultiplier)),
    );
    const keywordResults = hybrid.enabled
      ? await this.searchKeyword(cleaned, candidates).catch(() => [])
      : [];
    const queryVec = await this.embedQueryWithTimeout(cleaned);
    const hasVector = queryVec.some((v) => v !== 0);
    const vectorResults = hasVector
      ? await this.searchVector(queryVec, candidates).catch(() => [])
      : [];
    if (!hybrid.enabled) {
      return vectorResults.filter((entry) => entry.score >= minScore).slice(0, maxResults);
    }
    const merged = this.mergeHybridResults({
      vector: vectorResults,
      keyword: keywordResults,
      vectorWeight: hybrid.vectorWeight,
      textWeight: hybrid.textWeight,
    });
    return merged.filter((entry) => entry.score >= minScore).slice(0, maxResults);
  }

  private async searchVector(
    queryVec: number[],
    limit: number,
  ): Promise<Array<MemorySearchResult & { id: string }>> {
    const results = await searchVector({
      db: this.db,
      vectorTable: VECTOR_TABLE,
      providerModel: this.provider.model,
      queryVec,
      limit,
      snippetMaxChars: SNIPPET_MAX_CHARS,
      ensureVectorReady: async (dimensions) => await this.ensureVectorReady(dimensions),
      sourceFilterVec: this.buildSourceFilter("c"),
      sourceFilterChunks: this.buildSourceFilter(),
    });
    return results.map((entry) => entry as MemorySearchResult & { id: string });
  }

  private buildFtsQuery(raw: string): string | null {
    return buildFtsQuery(raw);
  }

  private async searchKeyword(
    query: string,
    limit: number,
  ): Promise<Array<MemorySearchResult & { id: string; textScore: number }>> {
    if (!this.fts.enabled || !this.fts.available) return [];
    const sourceFilter = this.buildSourceFilter();
    const results = await searchKeyword({
      db: this.db,
      ftsTable: FTS_TABLE,
      providerModel: this.provider.model,
      query,
      limit,
      snippetMaxChars: SNIPPET_MAX_CHARS,
      sourceFilter,
      buildFtsQuery: (raw) => this.buildFtsQuery(raw),
      bm25RankToScore,
    });
    return results.map((entry) => entry as MemorySearchResult & { id: string; textScore: number });
  }

  private mergeHybridResults(params: {
    vector: Array<MemorySearchResult & { id: string }>;
    keyword: Array<MemorySearchResult & { id: string; textScore: number }>;
    vectorWeight: number;
    textWeight: number;
  }): MemorySearchResult[] {
    const merged = mergeHybridResults({
      vector: params.vector.map((r) => ({
        id: r.id,
        path: r.path,
        startLine: r.startLine,
        endLine: r.endLine,
        source: r.source,
        snippet: r.snippet,
        vectorScore: r.score,
      })),
      keyword:
params.keyword.map((r) => ({ id: r.id, path: r.path, startLine: r.startLine, endLine: r.endLine, source: r.source, snippet: r.snippet, textScore: r.textScore, })), vectorWeight: params.vectorWeight, textWeight: params.textWeight, }); return merged.map((entry) => entry as MemorySearchResult); } async sync(params?: { reason?: string; force?: boolean; progress?: (update: MemorySyncProgressUpdate) => void; }): Promise { if (this.syncing) return this.syncing; this.syncing = this.runSync(params).finally(() => { this.syncing = null; }); return this.syncing; } async readFile(params: { relPath: string; from?: number; lines?: number; }): Promise<{ text: string; path: string }> { const relPath = normalizeRelPath(params.relPath); if (!relPath || !isMemoryPath(relPath)) { throw new Error("path required"); } const absPath = path.resolve(this.workspaceDir, relPath); if (!absPath.startsWith(this.workspaceDir)) { throw new Error("path escapes workspace"); } const content = await fs.readFile(absPath, "utf-8"); if (!params.from && !params.lines) { return { text: content, path: relPath }; } const lines = content.split("\n"); const start = Math.max(1, params.from ?? 1); const count = Math.max(1, params.lines ?? lines.length); const slice = lines.slice(start - 1, start - 1 + count); return { text: slice.join("\n"), path: relPath }; } status(): { files: number; chunks: number; dirty: boolean; workspaceDir: string; dbPath: string; provider: string; model: string; requestedProvider: string; sources: MemorySource[]; sourceCounts: Array<{ source: MemorySource; files: number; chunks: number }>; cache?: { enabled: boolean; entries?: number; maxEntries?: number }; fts?: { enabled: boolean; available: boolean; error?: string }; fallback?: { from: string; reason?: string }; vector?: { enabled: boolean; available?: boolean; extensionPath?: string; loadError?: string; dims?: number; }; batch?: { enabled: boolean; failures: number; limit: number; wait: boolean; concurrency: number; pollIntervalMs: number; timeoutMs: number; lastError?: string; lastProvider?: string; }; } { const sourceFilter = this.buildSourceFilter(); const files = this.db .prepare(`SELECT COUNT(*) as c FROM files WHERE 1=1${sourceFilter.sql}`) .get(...sourceFilter.params) as { c: number; }; const chunks = this.db .prepare(`SELECT COUNT(*) as c FROM chunks WHERE 1=1${sourceFilter.sql}`) .get(...sourceFilter.params) as { c: number; }; const sourceCounts = (() => { const sources = Array.from(this.sources); if (sources.length === 0) return []; const bySource = new Map(); for (const source of sources) { bySource.set(source, { files: 0, chunks: 0 }); } const fileRows = this.db .prepare( `SELECT source, COUNT(*) as c FROM files WHERE 1=1${sourceFilter.sql} GROUP BY source`, ) .all(...sourceFilter.params) as Array<{ source: MemorySource; c: number }>; for (const row of fileRows) { const entry = bySource.get(row.source) ?? { files: 0, chunks: 0 }; entry.files = row.c ?? 0; bySource.set(row.source, entry); } const chunkRows = this.db .prepare( `SELECT source, COUNT(*) as c FROM chunks WHERE 1=1${sourceFilter.sql} GROUP BY source`, ) .all(...sourceFilter.params) as Array<{ source: MemorySource; c: number }>; for (const row of chunkRows) { const entry = bySource.get(row.source) ?? { files: 0, chunks: 0 }; entry.chunks = row.c ?? 0; bySource.set(row.source, entry); } return sources.map((source) => ({ source, ...bySource.get(source)! })); })(); return { files: files?.c ?? 0, chunks: chunks?.c ?? 
0, dirty: this.dirty, workspaceDir: this.workspaceDir, dbPath: this.settings.store.path, provider: this.provider.id, model: this.provider.model, requestedProvider: this.requestedProvider, sources: Array.from(this.sources), sourceCounts, cache: this.cache.enabled ? { enabled: true, entries: ( this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as | { c: number } | undefined )?.c ?? 0, maxEntries: this.cache.maxEntries, } : { enabled: false, maxEntries: this.cache.maxEntries }, fts: { enabled: this.fts.enabled, available: this.fts.available, error: this.fts.loadError, }, fallback: this.fallbackReason ? { from: this.fallbackFrom ?? "local", reason: this.fallbackReason } : undefined, vector: { enabled: this.vector.enabled, available: this.vector.available ?? undefined, extensionPath: this.vector.extensionPath, loadError: this.vector.loadError, dims: this.vector.dims, }, batch: { enabled: this.batch.enabled, failures: this.batchFailureCount, limit: BATCH_FAILURE_LIMIT, wait: this.batch.wait, concurrency: this.batch.concurrency, pollIntervalMs: this.batch.pollIntervalMs, timeoutMs: this.batch.timeoutMs, lastError: this.batchFailureLastError, lastProvider: this.batchFailureLastProvider, }, }; } async probeVectorAvailability(): Promise { if (!this.vector.enabled) return false; return this.ensureVectorReady(); } async probeEmbeddingAvailability(): Promise<{ ok: boolean; error?: string }> { try { await this.embedBatchWithRetry(["ping"]); return { ok: true }; } catch (err) { const message = err instanceof Error ? err.message : String(err); return { ok: false, error: message }; } } async close(): Promise { if (this.closed) return; this.closed = true; if (this.watchTimer) { clearTimeout(this.watchTimer); this.watchTimer = null; } if (this.sessionWatchTimer) { clearTimeout(this.sessionWatchTimer); this.sessionWatchTimer = null; } if (this.intervalTimer) { clearInterval(this.intervalTimer); this.intervalTimer = null; } if (this.watcher) { await this.watcher.close(); this.watcher = null; } if (this.sessionUnsubscribe) { this.sessionUnsubscribe(); this.sessionUnsubscribe = null; } this.db.close(); INDEX_CACHE.delete(this.cacheKey); } private async ensureVectorReady(dimensions?: number): Promise { if (!this.vector.enabled) return false; if (!this.vectorReady) { this.vectorReady = this.withTimeout( this.loadVectorExtension(), VECTOR_LOAD_TIMEOUT_MS, `sqlite-vec load timed out after ${Math.round(VECTOR_LOAD_TIMEOUT_MS / 1000)}s`, ); } let ready = false; try { ready = await this.vectorReady; } catch (err) { const message = err instanceof Error ? err.message : String(err); this.vector.available = false; this.vector.loadError = message; this.vectorReady = null; log.warn(`sqlite-vec unavailable: ${message}`); return false; } if (ready && typeof dimensions === "number" && dimensions > 0) { this.ensureVectorTable(dimensions); } return ready; } private async loadVectorExtension(): Promise { if (this.vector.available !== null) return this.vector.available; if (!this.vector.enabled) { this.vector.available = false; return false; } try { const resolvedPath = this.vector.extensionPath?.trim() ? resolveUserPath(this.vector.extensionPath) : undefined; const loaded = await loadSqliteVecExtension({ db: this.db, extensionPath: resolvedPath }); if (!loaded.ok) throw new Error(loaded.error ?? "unknown sqlite-vec load error"); this.vector.extensionPath = loaded.extensionPath; this.vector.available = true; return true; } catch (err) { const message = err instanceof Error ? 
err.message : String(err); this.vector.available = false; this.vector.loadError = message; log.warn(`sqlite-vec unavailable: ${message}`); return false; } } private ensureVectorTable(dimensions: number): void { if (this.vector.dims === dimensions) return; if (this.vector.dims && this.vector.dims !== dimensions) { this.dropVectorTable(); } this.db.exec( `CREATE VIRTUAL TABLE IF NOT EXISTS ${VECTOR_TABLE} USING vec0(\n` + ` id TEXT PRIMARY KEY,\n` + ` embedding FLOAT[${dimensions}]\n` + `)`, ); this.vector.dims = dimensions; } private dropVectorTable(): void { try { this.db.exec(`DROP TABLE IF EXISTS ${VECTOR_TABLE}`); } catch (err) { const message = err instanceof Error ? err.message : String(err); log.debug(`Failed to drop ${VECTOR_TABLE}: ${message}`); } } private buildSourceFilter(alias?: string): { sql: string; params: MemorySource[] } { const sources = Array.from(this.sources); if (sources.length === 0) return { sql: "", params: [] }; const column = alias ? `${alias}.source` : "source"; const placeholders = sources.map(() => "?").join(", "); return { sql: ` AND ${column} IN (${placeholders})`, params: sources }; } private openDatabase(): DatabaseSync { const dbPath = resolveUserPath(this.settings.store.path); return this.openDatabaseAtPath(dbPath); } private openDatabaseAtPath(dbPath: string): DatabaseSync { const dir = path.dirname(dbPath); ensureDir(dir); const { DatabaseSync } = requireNodeSqlite(); return new DatabaseSync(dbPath, { allowExtension: this.settings.store.vector.enabled }); } private seedEmbeddingCache(sourceDb: DatabaseSync): void { if (!this.cache.enabled) return; try { const rows = sourceDb .prepare( `SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM ${EMBEDDING_CACHE_TABLE}`, ) .all() as Array<{ provider: string; model: string; provider_key: string; hash: string; embedding: string; dims: number | null; updated_at: number; }>; if (!rows.length) return; const insert = this.db.prepare( `INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET embedding=excluded.embedding, dims=excluded.dims, updated_at=excluded.updated_at`, ); this.db.exec("BEGIN"); for (const row of rows) { insert.run( row.provider, row.model, row.provider_key, row.hash, row.embedding, row.dims, row.updated_at, ); } this.db.exec("COMMIT"); } catch (err) { try { this.db.exec("ROLLBACK"); } catch {} throw err; } } private async swapIndexFiles(targetPath: string, tempPath: string): Promise { const backupPath = `${targetPath}.backup-${randomUUID()}`; await this.moveIndexFiles(targetPath, backupPath); try { await this.moveIndexFiles(tempPath, targetPath); } catch (err) { await this.moveIndexFiles(backupPath, targetPath); throw err; } await this.removeIndexFiles(backupPath); } private async moveIndexFiles(sourceBase: string, targetBase: string): Promise { const suffixes = ["", "-wal", "-shm"]; for (const suffix of suffixes) { const source = `${sourceBase}${suffix}`; const target = `${targetBase}${suffix}`; try { await fs.rename(source, target); } catch (err) { if ((err as NodeJS.ErrnoException).code !== "ENOENT") { throw err; } } } } private async removeIndexFiles(basePath: string): Promise { const suffixes = ["", "-wal", "-shm"]; await Promise.all(suffixes.map((suffix) => fs.rm(`${basePath}${suffix}`, { force: true }))); } private ensureSchema() { const result = ensureMemoryIndexSchema({ db: this.db, embeddingCacheTable: EMBEDDING_CACHE_TABLE, ftsTable: FTS_TABLE, ftsEnabled: this.fts.enabled, }); this.fts.available = result.ftsAvailable; if (result.ftsError) { this.fts.loadError = result.ftsError; log.warn(`fts unavailable: ${result.ftsError}`); } } private ensureWatcher() { if (!this.sources.has("memory") || !this.settings.sync.watch || this.watcher) return; const watchPaths = [ path.join(this.workspaceDir, "MEMORY.md"), path.join(this.workspaceDir, "memory"), ]; this.watcher = chokidar.watch(watchPaths, { ignoreInitial: true, awaitWriteFinish: { stabilityThreshold: this.settings.sync.watchDebounceMs, pollInterval: 100, }, }); const markDirty = () => { this.dirty = true; this.scheduleWatchSync(); }; this.watcher.on("add", markDirty); this.watcher.on("change", markDirty); this.watcher.on("unlink", markDirty); } private ensureSessionListener() { if (!this.sources.has("sessions") || this.sessionUnsubscribe) return; this.sessionUnsubscribe = onSessionTranscriptUpdate((update) => { if (this.closed) return; const sessionFile = update.sessionFile; if (!this.isSessionFileForAgent(sessionFile)) return; this.scheduleSessionDirty(sessionFile); }); } private scheduleSessionDirty(sessionFile: string) { this.sessionPendingFiles.add(sessionFile); if (this.sessionWatchTimer) return; this.sessionWatchTimer = setTimeout(() => { this.sessionWatchTimer = null; void this.processSessionDeltaBatch().catch((err) => { log.warn(`memory session delta failed: ${String(err)}`); }); }, SESSION_DIRTY_DEBOUNCE_MS); } private async processSessionDeltaBatch(): Promise { if (this.sessionPendingFiles.size === 0) return; const pending = Array.from(this.sessionPendingFiles); this.sessionPendingFiles.clear(); let shouldSync = false; for (const sessionFile of pending) { const delta = await this.updateSessionDelta(sessionFile); if (!delta) continue; const bytesThreshold = delta.deltaBytes; const messagesThreshold = delta.deltaMessages; const bytesHit = bytesThreshold <= 0 ? delta.pendingBytes > 0 : delta.pendingBytes >= bytesThreshold; const messagesHit = messagesThreshold <= 0 ? 
delta.pendingMessages > 0 : delta.pendingMessages >= messagesThreshold; if (!bytesHit && !messagesHit) continue; this.sessionsDirtyFiles.add(sessionFile); this.sessionsDirty = true; delta.pendingBytes = bytesThreshold > 0 ? Math.max(0, delta.pendingBytes - bytesThreshold) : 0; delta.pendingMessages = messagesThreshold > 0 ? Math.max(0, delta.pendingMessages - messagesThreshold) : 0; shouldSync = true; } if (shouldSync) { void this.sync({ reason: "session-delta" }).catch((err) => { log.warn(`memory sync failed (session-delta): ${String(err)}`); }); } } private async updateSessionDelta(sessionFile: string): Promise<{ deltaBytes: number; deltaMessages: number; pendingBytes: number; pendingMessages: number; } | null> { const thresholds = this.settings.sync.sessions; if (!thresholds) return null; let stat: { size: number }; try { stat = await fs.stat(sessionFile); } catch { return null; } const size = stat.size; let state = this.sessionDeltas.get(sessionFile); if (!state) { state = { lastSize: 0, pendingBytes: 0, pendingMessages: 0 }; this.sessionDeltas.set(sessionFile, state); } const deltaBytes = Math.max(0, size - state.lastSize); if (deltaBytes === 0 && size === state.lastSize) { return { deltaBytes: thresholds.deltaBytes, deltaMessages: thresholds.deltaMessages, pendingBytes: state.pendingBytes, pendingMessages: state.pendingMessages, }; } if (size < state.lastSize) { state.lastSize = size; state.pendingBytes += size; const shouldCountMessages = thresholds.deltaMessages > 0 && (thresholds.deltaBytes <= 0 || state.pendingBytes < thresholds.deltaBytes); if (shouldCountMessages) { state.pendingMessages += await this.countNewlines(sessionFile, 0, size); } } else { state.pendingBytes += deltaBytes; const shouldCountMessages = thresholds.deltaMessages > 0 && (thresholds.deltaBytes <= 0 || state.pendingBytes < thresholds.deltaBytes); if (shouldCountMessages) { state.pendingMessages += await this.countNewlines(sessionFile, state.lastSize, size); } state.lastSize = size; } this.sessionDeltas.set(sessionFile, state); return { deltaBytes: thresholds.deltaBytes, deltaMessages: thresholds.deltaMessages, pendingBytes: state.pendingBytes, pendingMessages: state.pendingMessages, }; } private async countNewlines(absPath: string, start: number, end: number): Promise { if (end <= start) return 0; const handle = await fs.open(absPath, "r"); try { let offset = start; let count = 0; const buffer = Buffer.alloc(SESSION_DELTA_READ_CHUNK_BYTES); while (offset < end) { const toRead = Math.min(buffer.length, end - offset); const { bytesRead } = await handle.read(buffer, 0, toRead, offset); if (bytesRead <= 0) break; for (let i = 0; i < bytesRead; i += 1) { if (buffer[i] === 10) count += 1; } offset += bytesRead; } return count; } finally { await handle.close(); } } private resetSessionDelta(absPath: string, size: number): void { const state = this.sessionDeltas.get(absPath); if (!state) return; state.lastSize = size; state.pendingBytes = 0; state.pendingMessages = 0; } private isSessionFileForAgent(sessionFile: string): boolean { if (!sessionFile) return false; const sessionsDir = resolveSessionTranscriptsDirForAgent(this.agentId); const resolvedFile = path.resolve(sessionFile); const resolvedDir = path.resolve(sessionsDir); return resolvedFile.startsWith(`${resolvedDir}${path.sep}`); } private ensureIntervalSync() { const minutes = this.settings.sync.intervalMinutes; if (!minutes || minutes <= 0 || this.intervalTimer) return; const ms = minutes * 60 * 1000; this.intervalTimer = setInterval(() => { void this.sync({ 
reason: "interval" }).catch((err) => { log.warn(`memory sync failed (interval): ${String(err)}`); }); }, ms); } private scheduleWatchSync() { if (!this.sources.has("memory") || !this.settings.sync.watch) return; if (this.watchTimer) clearTimeout(this.watchTimer); this.watchTimer = setTimeout(() => { this.watchTimer = null; void this.sync({ reason: "watch" }).catch((err) => { log.warn(`memory sync failed (watch): ${String(err)}`); }); }, this.settings.sync.watchDebounceMs); } private shouldSyncSessions( params?: { reason?: string; force?: boolean }, needsFullReindex = false, ) { if (!this.sources.has("sessions")) return false; if (params?.force) return true; const reason = params?.reason; if (reason === "session-start" || reason === "watch") return false; if (needsFullReindex) return true; return this.sessionsDirty && this.sessionsDirtyFiles.size > 0; } private async syncMemoryFiles(params: { needsFullReindex: boolean; progress?: MemorySyncProgressState; }) { const files = await listMemoryFiles(this.workspaceDir); const fileEntries = await Promise.all( files.map(async (file) => buildFileEntry(file, this.workspaceDir)), ); log.debug("memory sync: indexing memory files", { files: fileEntries.length, needsFullReindex: params.needsFullReindex, batch: this.batch.enabled, concurrency: this.getIndexConcurrency(), }); const activePaths = new Set(fileEntries.map((entry) => entry.path)); if (params.progress) { params.progress.total += fileEntries.length; params.progress.report({ completed: params.progress.completed, total: params.progress.total, label: this.batch.enabled ? "Indexing memory files (batch)..." : "Indexing memory files…", }); } const tasks = fileEntries.map((entry) => async () => { const record = this.db .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) .get(entry.path, "memory") as { hash: string } | undefined; if (!params.needsFullReindex && record?.hash === entry.hash) { if (params.progress) { params.progress.completed += 1; params.progress.report({ completed: params.progress.completed, total: params.progress.total, }); } return; } await this.indexFile(entry, { source: "memory" }); if (params.progress) { params.progress.completed += 1; params.progress.report({ completed: params.progress.completed, total: params.progress.total, }); } }); await this.runWithConcurrency(tasks, this.getIndexConcurrency()); const staleRows = this.db .prepare(`SELECT path FROM files WHERE source = ?`) .all("memory") as Array<{ path: string }>; for (const stale of staleRows) { if (activePaths.has(stale.path)) continue; this.db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`).run(stale.path, "memory"); try { this.db .prepare( `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, ) .run(stale.path, "memory"); } catch {} this.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(stale.path, "memory"); if (this.fts.enabled && this.fts.available) { try { this.db .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? 
AND model = ?`) .run(stale.path, "memory", this.provider.model); } catch {} } } } private async syncSessionFiles(params: { needsFullReindex: boolean; progress?: MemorySyncProgressState; }) { const files = await this.listSessionFiles(); const activePaths = new Set(files.map((file) => this.sessionPathForFile(file))); const indexAll = params.needsFullReindex || this.sessionsDirtyFiles.size === 0; log.debug("memory sync: indexing session files", { files: files.length, indexAll, dirtyFiles: this.sessionsDirtyFiles.size, batch: this.batch.enabled, concurrency: this.getIndexConcurrency(), }); if (params.progress) { params.progress.total += files.length; params.progress.report({ completed: params.progress.completed, total: params.progress.total, label: this.batch.enabled ? "Indexing session files (batch)..." : "Indexing session files…", }); } const tasks = files.map((absPath) => async () => { if (!indexAll && !this.sessionsDirtyFiles.has(absPath)) { if (params.progress) { params.progress.completed += 1; params.progress.report({ completed: params.progress.completed, total: params.progress.total, }); } return; } const entry = await this.buildSessionEntry(absPath); if (!entry) { if (params.progress) { params.progress.completed += 1; params.progress.report({ completed: params.progress.completed, total: params.progress.total, }); } return; } const record = this.db .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) .get(entry.path, "sessions") as { hash: string } | undefined; if (!params.needsFullReindex && record?.hash === entry.hash) { if (params.progress) { params.progress.completed += 1; params.progress.report({ completed: params.progress.completed, total: params.progress.total, }); } this.resetSessionDelta(absPath, entry.size); return; } await this.indexFile(entry, { source: "sessions", content: entry.content }); this.resetSessionDelta(absPath, entry.size); if (params.progress) { params.progress.completed += 1; params.progress.report({ completed: params.progress.completed, total: params.progress.total, }); } }); await this.runWithConcurrency(tasks, this.getIndexConcurrency()); const staleRows = this.db .prepare(`SELECT path FROM files WHERE source = ?`) .all("sessions") as Array<{ path: string }>; for (const stale of staleRows) { if (activePaths.has(stale.path)) continue; this.db .prepare(`DELETE FROM files WHERE path = ? AND source = ?`) .run(stale.path, "sessions"); try { this.db .prepare( `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, ) .run(stale.path, "sessions"); } catch {} this.db .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) .run(stale.path, "sessions"); if (this.fts.enabled && this.fts.available) { try { this.db .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) .run(stale.path, "sessions", this.provider.model); } catch {} } } } private createSyncProgress( onProgress: (update: MemorySyncProgressUpdate) => void, ): MemorySyncProgressState { const state: MemorySyncProgressState = { completed: 0, total: 0, label: undefined, report: (update) => { if (update.label) state.label = update.label; const label = update.total > 0 && state.label ? `${state.label} ${update.completed}/${update.total}` : state.label; onProgress({ completed: update.completed, total: update.total, label, }); }, }; return state; } private async runSync(params?: { reason?: string; force?: boolean; progress?: (update: MemorySyncProgressUpdate) => void; }) { const progress = params?.progress ? 
this.createSyncProgress(params.progress) : undefined; if (progress) { progress.report({ completed: progress.completed, total: progress.total, label: "Loading vector extension…", }); } const vectorReady = await this.ensureVectorReady(); const meta = this.readMeta(); const needsFullReindex = params?.force || !meta || meta.model !== this.provider.model || meta.provider !== this.provider.id || meta.providerKey !== this.providerKey || meta.chunkTokens !== this.settings.chunking.tokens || meta.chunkOverlap !== this.settings.chunking.overlap || (vectorReady && !meta?.vectorDims); try { if (needsFullReindex) { await this.runSafeReindex({ reason: params?.reason, force: params?.force, progress: progress ?? undefined, }); return; } const shouldSyncMemory = this.sources.has("memory") && (params?.force || needsFullReindex || this.dirty); const shouldSyncSessions = this.shouldSyncSessions(params, needsFullReindex); if (shouldSyncMemory) { await this.syncMemoryFiles({ needsFullReindex, progress: progress ?? undefined }); this.dirty = false; } if (shouldSyncSessions) { await this.syncSessionFiles({ needsFullReindex, progress: progress ?? undefined }); this.sessionsDirty = false; this.sessionsDirtyFiles.clear(); } else if (this.sessionsDirtyFiles.size > 0) { this.sessionsDirty = true; } else { this.sessionsDirty = false; } } catch (err) { const reason = err instanceof Error ? err.message : String(err); const activated = this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason)); if (activated) { await this.runSafeReindex({ reason: params?.reason ?? "fallback", force: true, progress: progress ?? undefined, }); return; } throw err; } } private shouldFallbackOnError(message: string): boolean { return /embedding|embeddings|batch/i.test(message); } private resolveBatchConfig(): { enabled: boolean; wait: boolean; concurrency: number; pollIntervalMs: number; timeoutMs: number; } { const batch = this.settings.remote?.batch; const enabled = Boolean( batch?.enabled && ((this.openAi && this.provider.id === "openai") || (this.gemini && this.provider.id === "gemini")), ); return { enabled, wait: batch?.wait ?? true, concurrency: Math.max(1, batch?.concurrency ?? 2), pollIntervalMs: batch?.pollIntervalMs ?? 2000, timeoutMs: (batch?.timeoutMinutes ?? 60) * 60 * 1000, }; } private async activateFallbackProvider(reason: string): Promise { const fallback = this.settings.fallback; if (!fallback || fallback === "none" || fallback === this.provider.id) return false; if (this.fallbackFrom) return false; const fallbackFrom = this.provider.id as "openai" | "gemini" | "local"; const fallbackModel = fallback === "gemini" ? DEFAULT_GEMINI_EMBEDDING_MODEL : fallback === "openai" ? 
DEFAULT_OPENAI_EMBEDDING_MODEL : this.settings.model; const fallbackResult = await createEmbeddingProvider({ config: this.cfg, agentDir: resolveAgentDir(this.cfg, this.agentId), provider: fallback, remote: this.settings.remote, model: fallbackModel, fallback: "none", local: this.settings.local, }); this.fallbackFrom = fallbackFrom; this.fallbackReason = reason; this.provider = fallbackResult.provider; this.openAi = fallbackResult.openAi; this.gemini = fallbackResult.gemini; this.providerKey = this.computeProviderKey(); this.batch = this.resolveBatchConfig(); log.warn(`memory embeddings: switched to fallback provider (${fallback})`, { reason }); return true; } private async runSafeReindex(params: { reason?: string; force?: boolean; progress?: MemorySyncProgressState; }): Promise { const dbPath = resolveUserPath(this.settings.store.path); const tempDbPath = `${dbPath}.tmp-${randomUUID()}`; const tempDb = this.openDatabaseAtPath(tempDbPath); const originalDb = this.db; let originalDbClosed = false; const originalState = { ftsAvailable: this.fts.available, ftsError: this.fts.loadError, vectorAvailable: this.vector.available, vectorLoadError: this.vector.loadError, vectorDims: this.vector.dims, vectorReady: this.vectorReady, }; const restoreOriginalState = () => { if (originalDbClosed) { this.db = this.openDatabaseAtPath(dbPath); } else { this.db = originalDb; } this.fts.available = originalState.ftsAvailable; this.fts.loadError = originalState.ftsError; this.vector.available = originalDbClosed ? null : originalState.vectorAvailable; this.vector.loadError = originalState.vectorLoadError; this.vector.dims = originalState.vectorDims; this.vectorReady = originalDbClosed ? null : originalState.vectorReady; }; this.db = tempDb; this.vectorReady = null; this.vector.available = null; this.vector.loadError = undefined; this.vector.dims = undefined; this.fts.available = false; this.fts.loadError = undefined; this.ensureSchema(); let nextMeta: MemoryIndexMeta | null = null; try { this.seedEmbeddingCache(originalDb); const shouldSyncMemory = this.sources.has("memory"); const shouldSyncSessions = this.shouldSyncSessions( { reason: params.reason, force: params.force }, true, ); if (shouldSyncMemory) { await this.syncMemoryFiles({ needsFullReindex: true, progress: params.progress }); this.dirty = false; } if (shouldSyncSessions) { await this.syncSessionFiles({ needsFullReindex: true, progress: params.progress }); this.sessionsDirty = false; this.sessionsDirtyFiles.clear(); } else if (this.sessionsDirtyFiles.size > 0) { this.sessionsDirty = true; } else { this.sessionsDirty = false; } nextMeta = { model: this.provider.model, provider: this.provider.id, providerKey: this.providerKey, chunkTokens: this.settings.chunking.tokens, chunkOverlap: this.settings.chunking.overlap, }; if (this.vector.available && this.vector.dims) { nextMeta.vectorDims = this.vector.dims; } this.writeMeta(nextMeta); this.pruneEmbeddingCacheIfNeeded(); this.db.close(); originalDb.close(); originalDbClosed = true; await this.swapIndexFiles(dbPath, tempDbPath); this.db = this.openDatabaseAtPath(dbPath); this.vectorReady = null; this.vector.available = null; this.vector.loadError = undefined; this.ensureSchema(); this.vector.dims = nextMeta.vectorDims; } catch (err) { try { this.db.close(); } catch {} await this.removeIndexFiles(tempDbPath); restoreOriginalState(); throw err; } } private resetIndex() { this.db.exec(`DELETE FROM files`); this.db.exec(`DELETE FROM chunks`); if (this.fts.enabled && this.fts.available) { try { 
this.db.exec(`DELETE FROM ${FTS_TABLE}`); } catch {} } this.dropVectorTable(); this.vector.dims = undefined; this.sessionsDirtyFiles.clear(); } private readMeta(): MemoryIndexMeta | null { const row = this.db.prepare(`SELECT value FROM meta WHERE key = ?`).get(META_KEY) as | { value: string } | undefined; if (!row?.value) return null; try { return JSON.parse(row.value) as MemoryIndexMeta; } catch { return null; } } private writeMeta(meta: MemoryIndexMeta) { const value = JSON.stringify(meta); this.db .prepare( `INSERT INTO meta (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value`, ) .run(META_KEY, value); } private async listSessionFiles(): Promise { const dir = resolveSessionTranscriptsDirForAgent(this.agentId); try { const entries = await fs.readdir(dir, { withFileTypes: true }); return entries .filter((entry) => entry.isFile()) .map((entry) => entry.name) .filter((name) => name.endsWith(".jsonl")) .map((name) => path.join(dir, name)); } catch { return []; } } private sessionPathForFile(absPath: string): string { return path.join("sessions", path.basename(absPath)).replace(/\\/g, "/"); } private normalizeSessionText(value: string): string { return value .replace(/\s*\n+\s*/g, " ") .replace(/\s+/g, " ") .trim(); } private extractSessionText(content: unknown): string | null { if (typeof content === "string") { const normalized = this.normalizeSessionText(content); return normalized ? normalized : null; } if (!Array.isArray(content)) return null; const parts: string[] = []; for (const block of content) { if (!block || typeof block !== "object") continue; const record = block as { type?: unknown; text?: unknown }; if (record.type !== "text" || typeof record.text !== "string") continue; const normalized = this.normalizeSessionText(record.text); if (normalized) parts.push(normalized); } if (parts.length === 0) return null; return parts.join(" "); } private async buildSessionEntry(absPath: string): Promise { try { const stat = await fs.stat(absPath); const raw = await fs.readFile(absPath, "utf-8"); const lines = raw.split("\n"); const collected: string[] = []; for (const line of lines) { if (!line.trim()) continue; let record: unknown; try { record = JSON.parse(line); } catch { continue; } if ( !record || typeof record !== "object" || (record as { type?: unknown }).type !== "message" ) { continue; } const message = (record as { message?: unknown }).message as | { role?: unknown; content?: unknown } | undefined; if (!message || typeof message.role !== "string") continue; if (message.role !== "user" && message.role !== "assistant") continue; const text = this.extractSessionText(message.content); if (!text) continue; const label = message.role === "user" ? 
"User" : "Assistant"; collected.push(`${label}: ${text}`); } const content = collected.join("\n"); return { path: this.sessionPathForFile(absPath), absPath, mtimeMs: stat.mtimeMs, size: stat.size, hash: hashText(content), content, }; } catch (err) { log.debug(`Failed reading session file ${absPath}: ${String(err)}`); return null; } } private estimateEmbeddingTokens(text: string): number { if (!text) return 0; return Math.ceil(text.length / EMBEDDING_APPROX_CHARS_PER_TOKEN); } private buildEmbeddingBatches(chunks: MemoryChunk[]): MemoryChunk[][] { const batches: MemoryChunk[][] = []; let current: MemoryChunk[] = []; let currentTokens = 0; for (const chunk of chunks) { const estimate = this.estimateEmbeddingTokens(chunk.text); const wouldExceed = current.length > 0 && currentTokens + estimate > EMBEDDING_BATCH_MAX_TOKENS; if (wouldExceed) { batches.push(current); current = []; currentTokens = 0; } if (current.length === 0 && estimate > EMBEDDING_BATCH_MAX_TOKENS) { batches.push([chunk]); continue; } current.push(chunk); currentTokens += estimate; } if (current.length > 0) { batches.push(current); } return batches; } private loadEmbeddingCache(hashes: string[]): Map { if (!this.cache.enabled) return new Map(); if (hashes.length === 0) return new Map(); const unique: string[] = []; const seen = new Set(); for (const hash of hashes) { if (!hash) continue; if (seen.has(hash)) continue; seen.add(hash); unique.push(hash); } if (unique.length === 0) return new Map(); const out = new Map(); const baseParams = [this.provider.id, this.provider.model, this.providerKey]; const batchSize = 400; for (let start = 0; start < unique.length; start += batchSize) { const batch = unique.slice(start, start + batchSize); const placeholders = batch.map(() => "?").join(", "); const rows = this.db .prepare( `SELECT hash, embedding FROM ${EMBEDDING_CACHE_TABLE}\n` + ` WHERE provider = ? AND model = ? AND provider_key = ? AND hash IN (${placeholders})`, ) .all(...baseParams, ...batch) as Array<{ hash: string; embedding: string }>; for (const row of rows) { out.set(row.hash, parseEmbedding(row.embedding)); } } return out; } private upsertEmbeddingCache(entries: Array<{ hash: string; embedding: number[] }>): void { if (!this.cache.enabled) return; if (entries.length === 0) return; const now = Date.now(); const stmt = this.db.prepare( `INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)\n` + ` VALUES (?, ?, ?, ?, ?, ?, ?)\n` + ` ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET\n` + ` embedding=excluded.embedding,\n` + ` dims=excluded.dims,\n` + ` updated_at=excluded.updated_at`, ); for (const entry of entries) { const embedding = entry.embedding ?? []; stmt.run( this.provider.id, this.provider.model, this.providerKey, entry.hash, JSON.stringify(embedding), embedding.length, now, ); } } private pruneEmbeddingCacheIfNeeded(): void { if (!this.cache.enabled) return; const max = this.cache.maxEntries; if (!max || max <= 0) return; const row = this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as | { c: number } | undefined; const count = row?.c ?? 
0; if (count <= max) return; const excess = count - max; this.db .prepare( `DELETE FROM ${EMBEDDING_CACHE_TABLE}\n` + ` WHERE rowid IN (\n` + ` SELECT rowid FROM ${EMBEDDING_CACHE_TABLE}\n` + ` ORDER BY updated_at ASC\n` + ` LIMIT ?\n` + ` )`, ) .run(excess); } private async embedChunksInBatches(chunks: MemoryChunk[]): Promise { if (chunks.length === 0) return []; const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); const missing: Array<{ index: number; chunk: MemoryChunk }> = []; for (let i = 0; i < chunks.length; i += 1) { const chunk = chunks[i]; const hit = chunk?.hash ? cached.get(chunk.hash) : undefined; if (hit && hit.length > 0) { embeddings[i] = hit; } else if (chunk) { missing.push({ index: i, chunk }); } } if (missing.length === 0) return embeddings; const missingChunks = missing.map((m) => m.chunk); const batches = this.buildEmbeddingBatches(missingChunks); const toCache: Array<{ hash: string; embedding: number[] }> = []; let cursor = 0; for (const batch of batches) { const batchEmbeddings = await this.embedBatchWithRetry(batch.map((chunk) => chunk.text)); for (let i = 0; i < batch.length; i += 1) { const item = missing[cursor + i]; const embedding = batchEmbeddings[i] ?? []; if (item) { embeddings[item.index] = embedding; toCache.push({ hash: item.chunk.hash, embedding }); } } cursor += batch.length; } this.upsertEmbeddingCache(toCache); return embeddings; } private computeProviderKey(): string { if (this.provider.id === "openai" && this.openAi) { const entries = Object.entries(this.openAi.headers) .filter(([key]) => key.toLowerCase() !== "authorization") .sort(([a], [b]) => a.localeCompare(b)) .map(([key, value]) => [key, value]); return hashText( JSON.stringify({ provider: "openai", baseUrl: this.openAi.baseUrl, model: this.openAi.model, headers: entries, }), ); } if (this.provider.id === "gemini" && this.gemini) { const entries = Object.entries(this.gemini.headers) .filter(([key]) => { const lower = key.toLowerCase(); return lower !== "authorization" && lower !== "x-goog-api-key"; }) .sort(([a], [b]) => a.localeCompare(b)) .map(([key, value]) => [key, value]); return hashText( JSON.stringify({ provider: "gemini", baseUrl: this.gemini.baseUrl, model: this.gemini.model, headers: entries, }), ); } return hashText(JSON.stringify({ provider: this.provider.id, model: this.provider.model })); } private async embedChunksWithBatch( chunks: MemoryChunk[], entry: MemoryFileEntry | SessionFileEntry, source: MemorySource, ): Promise { if (this.provider.id === "openai" && this.openAi) { return this.embedChunksWithOpenAiBatch(chunks, entry, source); } if (this.provider.id === "gemini" && this.gemini) { return this.embedChunksWithGeminiBatch(chunks, entry, source); } return this.embedChunksInBatches(chunks); } private async embedChunksWithOpenAiBatch( chunks: MemoryChunk[], entry: MemoryFileEntry | SessionFileEntry, source: MemorySource, ): Promise { const openAi = this.openAi; if (!openAi) { return this.embedChunksInBatches(chunks); } if (chunks.length === 0) return []; const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); const missing: Array<{ index: number; chunk: MemoryChunk }> = []; for (let i = 0; i < chunks.length; i += 1) { const chunk = chunks[i]; const hit = chunk?.hash ? 
cached.get(chunk.hash) : undefined; if (hit && hit.length > 0) { embeddings[i] = hit; } else if (chunk) { missing.push({ index: i, chunk }); } } if (missing.length === 0) return embeddings; const requests: OpenAiBatchRequest[] = []; const mapping = new Map(); for (const item of missing) { const chunk = item.chunk; const customId = hashText( `${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`, ); mapping.set(customId, { index: item.index, hash: chunk.hash }); requests.push({ custom_id: customId, method: "POST", url: OPENAI_BATCH_ENDPOINT, body: { model: this.openAi?.model ?? this.provider.model, input: chunk.text, }, }); } const batchResult = await this.runBatchWithFallback({ provider: "openai", run: async () => await runOpenAiEmbeddingBatches({ openAi, agentId: this.agentId, requests, wait: this.batch.wait, concurrency: this.batch.concurrency, pollIntervalMs: this.batch.pollIntervalMs, timeoutMs: this.batch.timeoutMs, debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }), }), fallback: async () => await this.embedChunksInBatches(chunks), }); if (Array.isArray(batchResult)) return batchResult; const byCustomId = batchResult; const toCache: Array<{ hash: string; embedding: number[] }> = []; for (const [customId, embedding] of byCustomId.entries()) { const mapped = mapping.get(customId); if (!mapped) continue; embeddings[mapped.index] = embedding; toCache.push({ hash: mapped.hash, embedding }); } this.upsertEmbeddingCache(toCache); return embeddings; } private async embedChunksWithGeminiBatch( chunks: MemoryChunk[], entry: MemoryFileEntry | SessionFileEntry, source: MemorySource, ): Promise { const gemini = this.gemini; if (!gemini) { return this.embedChunksInBatches(chunks); } if (chunks.length === 0) return []; const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); const missing: Array<{ index: number; chunk: MemoryChunk }> = []; for (let i = 0; i < chunks.length; i += 1) { const chunk = chunks[i]; const hit = chunk?.hash ? 
cached.get(chunk.hash) : undefined; if (hit && hit.length > 0) { embeddings[i] = hit; } else if (chunk) { missing.push({ index: i, chunk }); } } if (missing.length === 0) return embeddings; const requests: GeminiBatchRequest[] = []; const mapping = new Map(); for (const item of missing) { const chunk = item.chunk; const customId = hashText( `${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`, ); mapping.set(customId, { index: item.index, hash: chunk.hash }); requests.push({ custom_id: customId, content: { parts: [{ text: chunk.text }] }, taskType: "RETRIEVAL_DOCUMENT", }); } const batchResult = await this.runBatchWithFallback({ provider: "gemini", run: async () => await runGeminiEmbeddingBatches({ gemini, agentId: this.agentId, requests, wait: this.batch.wait, concurrency: this.batch.concurrency, pollIntervalMs: this.batch.pollIntervalMs, timeoutMs: this.batch.timeoutMs, debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }), }), fallback: async () => await this.embedChunksInBatches(chunks), }); if (Array.isArray(batchResult)) return batchResult; const byCustomId = batchResult; const toCache: Array<{ hash: string; embedding: number[] }> = []; for (const [customId, embedding] of byCustomId.entries()) { const mapped = mapping.get(customId); if (!mapped) continue; embeddings[mapped.index] = embedding; toCache.push({ hash: mapped.hash, embedding }); } this.upsertEmbeddingCache(toCache); return embeddings; } private async embedBatchWithRetry(texts: string[]): Promise { if (texts.length === 0) return []; let attempt = 0; let delayMs = EMBEDDING_RETRY_BASE_DELAY_MS; while (true) { try { const timeoutMs = this.resolveEmbeddingTimeout("batch"); log.debug("memory embeddings: batch start", { provider: this.provider.id, items: texts.length, timeoutMs, }); return await this.withTimeout( this.provider.embedBatch(texts), timeoutMs, `memory embeddings batch timed out after ${Math.round(timeoutMs / 1000)}s`, ); } catch (err) { const message = err instanceof Error ? err.message : String(err); if (!this.isRetryableEmbeddingError(message) || attempt >= EMBEDDING_RETRY_MAX_ATTEMPTS) { throw err; } const waitMs = Math.min( EMBEDDING_RETRY_MAX_DELAY_MS, Math.round(delayMs * (1 + Math.random() * 0.2)), ); log.warn(`memory embeddings rate limited; retrying in ${waitMs}ms`); await new Promise((resolve) => setTimeout(resolve, waitMs)); delayMs *= 2; attempt += 1; } } } private isRetryableEmbeddingError(message: string): boolean { return /(rate[_ ]limit|too many requests|429|resource has been exhausted|5\d\d|cloudflare)/i.test( message, ); } private resolveEmbeddingTimeout(kind: "query" | "batch"): number { const isLocal = this.provider.id === "local"; if (kind === "query") { return isLocal ? EMBEDDING_QUERY_TIMEOUT_LOCAL_MS : EMBEDDING_QUERY_TIMEOUT_REMOTE_MS; } return isLocal ? 
EMBEDDING_BATCH_TIMEOUT_LOCAL_MS : EMBEDDING_BATCH_TIMEOUT_REMOTE_MS; } private async embedQueryWithTimeout(text: string): Promise { const timeoutMs = this.resolveEmbeddingTimeout("query"); log.debug("memory embeddings: query start", { provider: this.provider.id, timeoutMs }); return await this.withTimeout( this.provider.embedQuery(text), timeoutMs, `memory embeddings query timed out after ${Math.round(timeoutMs / 1000)}s`, ); } private async withTimeout( promise: Promise, timeoutMs: number, message: string, ): Promise { if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) return await promise; let timer: NodeJS.Timeout | null = null; const timeoutPromise = new Promise((_, reject) => { timer = setTimeout(() => reject(new Error(message)), timeoutMs); }); try { return (await Promise.race([promise, timeoutPromise])) as T; } finally { if (timer) clearTimeout(timer); } } private async runWithConcurrency(tasks: Array<() => Promise>, limit: number): Promise { if (tasks.length === 0) return []; const resolvedLimit = Math.max(1, Math.min(limit, tasks.length)); const results: T[] = Array.from({ length: tasks.length }); let next = 0; let firstError: unknown = null; const workers = Array.from({ length: resolvedLimit }, async () => { while (true) { if (firstError) return; const index = next; next += 1; if (index >= tasks.length) return; try { results[index] = await tasks[index](); } catch (err) { firstError = err; return; } } }); await Promise.allSettled(workers); if (firstError) throw firstError; return results; } private async withBatchFailureLock(fn: () => Promise): Promise { let release: () => void; const wait = this.batchFailureLock; this.batchFailureLock = new Promise((resolve) => { release = resolve; }); await wait; try { return await fn(); } finally { release!(); } } private async resetBatchFailureCount(): Promise { await this.withBatchFailureLock(async () => { if (this.batchFailureCount > 0) { log.debug("memory embeddings: batch recovered; resetting failure count"); } this.batchFailureCount = 0; this.batchFailureLastError = undefined; this.batchFailureLastProvider = undefined; }); } private async recordBatchFailure(params: { provider: string; message: string; attempts?: number; forceDisable?: boolean; }): Promise<{ disabled: boolean; count: number }> { return await this.withBatchFailureLock(async () => { if (!this.batch.enabled) { return { disabled: true, count: this.batchFailureCount }; } const increment = params.forceDisable ? BATCH_FAILURE_LIMIT : Math.max(1, params.attempts ?? 1); this.batchFailureCount += increment; this.batchFailureLastError = params.message; this.batchFailureLastProvider = params.provider; const disabled = params.forceDisable || this.batchFailureCount >= BATCH_FAILURE_LIMIT; if (disabled) { this.batch.enabled = false; } return { disabled, count: this.batchFailureCount }; }); } private isBatchTimeoutError(message: string): boolean { return /timed out|timeout/i.test(message); } private async runBatchWithTimeoutRetry(params: { provider: string; run: () => Promise; }): Promise { try { return await params.run(); } catch (err) { const message = err instanceof Error ? 
err.message : String(err); if (this.isBatchTimeoutError(message)) { log.warn(`memory embeddings: ${params.provider} batch timed out; retrying once`); try { return await params.run(); } catch (retryErr) { (retryErr as { batchAttempts?: number }).batchAttempts = 2; throw retryErr; } } throw err; } } private async runBatchWithFallback(params: { provider: string; run: () => Promise; fallback: () => Promise; }): Promise { if (!this.batch.enabled) { return await params.fallback(); } try { const result = await this.runBatchWithTimeoutRetry({ provider: params.provider, run: params.run, }); await this.resetBatchFailureCount(); return result; } catch (err) { const message = err instanceof Error ? err.message : String(err); const attempts = (err as { batchAttempts?: number }).batchAttempts ?? 1; const forceDisable = /asyncBatchEmbedContent not available/i.test(message); const failure = await this.recordBatchFailure({ provider: params.provider, message, attempts, forceDisable, }); const suffix = failure.disabled ? "disabling batch" : "keeping batch enabled"; log.warn( `memory embeddings: ${params.provider} batch failed (${failure.count}/${BATCH_FAILURE_LIMIT}); ${suffix}; falling back to non-batch embeddings: ${message}`, ); return await params.fallback(); } } private getIndexConcurrency(): number { return this.batch.enabled ? this.batch.concurrency : EMBEDDING_INDEX_CONCURRENCY; } private async indexFile( entry: MemoryFileEntry | SessionFileEntry, options: { source: MemorySource; content?: string }, ) { const content = options.content ?? (await fs.readFile(entry.absPath, "utf-8")); const chunks = chunkMarkdown(content, this.settings.chunking).filter( (chunk) => chunk.text.trim().length > 0, ); const embeddings = this.batch.enabled ? await this.embedChunksWithBatch(chunks, entry, options.source) : await this.embedChunksInBatches(chunks); const sample = embeddings.find((embedding) => embedding.length > 0); const vectorReady = sample ? await this.ensureVectorReady(sample.length) : false; const now = Date.now(); if (vectorReady) { try { this.db .prepare( `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, ) .run(entry.path, options.source); } catch {} } if (this.fts.enabled && this.fts.available) { try { this.db .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) .run(entry.path, options.source, this.provider.model); } catch {} } this.db .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) .run(entry.path, options.source); for (let i = 0; i < chunks.length; i++) { const chunk = chunks[i]; const embedding = embeddings[i] ?? []; const id = hashText( `${options.source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${this.provider.model}`, ); this.db .prepare( `INSERT INTO chunks (id, path, source, start_line, end_line, hash, model, text, embedding, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT(id) DO UPDATE SET hash=excluded.hash, model=excluded.model, text=excluded.text, embedding=excluded.embedding, updated_at=excluded.updated_at`, ) .run( id, entry.path, options.source, chunk.startLine, chunk.endLine, chunk.hash, this.provider.model, chunk.text, JSON.stringify(embedding), now, ); if (vectorReady && embedding.length > 0) { try { this.db.prepare(`DELETE FROM ${VECTOR_TABLE} WHERE id = ?`).run(id); } catch {} this.db .prepare(`INSERT INTO ${VECTOR_TABLE} (id, embedding) VALUES (?, ?)`) .run(id, vectorToBlob(embedding)); } if (this.fts.enabled && this.fts.available) { this.db .prepare( `INSERT INTO ${FTS_TABLE} (text, id, path, source, model, start_line, end_line)\n` + ` VALUES (?, ?, ?, ?, ?, ?, ?)`, ) .run( chunk.text, id, entry.path, options.source, this.provider.model, chunk.startLine, chunk.endLine, ); } } this.db .prepare( `INSERT INTO files (path, source, hash, mtime, size) VALUES (?, ?, ?, ?, ?) ON CONFLICT(path) DO UPDATE SET source=excluded.source, hash=excluded.hash, mtime=excluded.mtime, size=excluded.size`, ) .run(entry.path, options.source, entry.hash, entry.mtimeMs, entry.size); } }
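
/*
 * Usage sketch (illustrative only; `cfg` is assumed to be a loaded
 * ClawdbotConfig and the agent id is a placeholder):
 *
 *   const manager = await MemoryIndexManager.get({ cfg, agentId: "main" });
 *   if (manager) {
 *     await manager.sync({ reason: "startup" });
 *     const hits = await manager.search("release checklist", { maxResults: 5 });
 *     for (const hit of hits) {
 *       console.log(`${hit.path}:${hit.startLine} score=${hit.score.toFixed(2)}`);
 *     }
 *   }
 */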