import fs from "node:fs/promises"; import path from "node:path"; import type { DatabaseSync } from "node:sqlite"; import chokidar, { type FSWatcher } from "chokidar"; import { resolveAgentDir, resolveAgentWorkspaceDir } from "../agents/agent-scope.js"; import type { ResolvedMemorySearchConfig } from "../agents/memory-search.js"; import { resolveMemorySearchConfig } from "../agents/memory-search.js"; import type { ClawdbotConfig } from "../config/config.js"; import { resolveSessionTranscriptsDirForAgent } from "../config/sessions/paths.js"; import { createSubsystemLogger } from "../logging.js"; import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import { resolveUserPath, truncateUtf16Safe } from "../utils.js"; import { colorize, isRich, theme } from "../terminal/theme.js"; import { createEmbeddingProvider, type EmbeddingProvider, type EmbeddingProviderResult, type OpenAiEmbeddingClient, } from "./embeddings.js"; import { buildFileEntry, chunkMarkdown, cosineSimilarity, ensureDir, hashText, isMemoryPath, listMemoryFiles, type MemoryChunk, type MemoryFileEntry, normalizeRelPath, parseEmbedding, } from "./internal.js"; import { requireNodeSqlite } from "./sqlite.js"; type MemorySource = "memory" | "sessions"; export type MemorySearchResult = { path: string; startLine: number; endLine: number; score: number; snippet: string; source: MemorySource; }; type MemoryIndexMeta = { model: string; provider: string; providerKey?: string; chunkTokens: number; chunkOverlap: number; vectorDims?: number; }; type SessionFileEntry = { path: string; absPath: string; mtimeMs: number; size: number; hash: string; content: string; }; type MemorySyncProgressUpdate = { completed: number; total: number; label?: string; }; type MemorySyncProgressState = { completed: number; total: number; label?: string; report: (update: MemorySyncProgressUpdate) => void; }; type OpenAiBatchRequest = { custom_id: string; method: "POST"; url: "/v1/embeddings"; body: { model: string; input: string; }; }; type OpenAiBatchStatus = { id?: string; status?: string; output_file_id?: string | null; error_file_id?: string | null; }; type OpenAiBatchOutputLine = { custom_id?: string; response?: { status_code?: number; body?: { data?: Array<{ embedding?: number[]; index?: number }>; error?: { message?: string }; }; }; error?: { message?: string }; }; const META_KEY = "memory_index_meta_v1"; const SNIPPET_MAX_CHARS = 700; const VECTOR_TABLE = "chunks_vec"; const FTS_TABLE = "chunks_fts"; const EMBEDDING_CACHE_TABLE = "embedding_cache"; const SESSION_DIRTY_DEBOUNCE_MS = 5000; const EMBEDDING_BATCH_MAX_TOKENS = 8000; const EMBEDDING_APPROX_CHARS_PER_TOKEN = 1; const EMBEDDING_INDEX_CONCURRENCY = 4; const EMBEDDING_RETRY_MAX_ATTEMPTS = 3; const EMBEDDING_RETRY_BASE_DELAY_MS = 500; const EMBEDDING_RETRY_MAX_DELAY_MS = 8000; const OPENAI_BATCH_ENDPOINT = "/v1/embeddings"; const OPENAI_BATCH_COMPLETION_WINDOW = "24h"; const OPENAI_BATCH_MAX_REQUESTS = 50000; const log = createSubsystemLogger("memory"); const INDEX_CACHE = new Map(); const vectorToBlob = (embedding: number[]): Buffer => Buffer.from(new Float32Array(embedding).buffer); export class MemoryIndexManager { private readonly cacheKey: string; private readonly cfg: ClawdbotConfig; private readonly agentId: string; private readonly workspaceDir: string; private readonly settings: ResolvedMemorySearchConfig; private readonly provider: EmbeddingProvider; private readonly requestedProvider: "openai" | "local"; private readonly fallbackReason?: string; private readonly openAi?: OpenAiEmbeddingClient; private readonly batch: { enabled: boolean; wait: boolean; concurrency: number; pollIntervalMs: number; timeoutMs: number; }; private readonly db: DatabaseSync; private readonly sources: Set; private readonly providerKey: string; private readonly cache: { enabled: boolean; maxEntries?: number }; private readonly vector: { enabled: boolean; available: boolean | null; extensionPath?: string; loadError?: string; dims?: number; }; private readonly fts: { enabled: boolean; available: boolean; loadError?: string; }; private vectorReady: Promise | null = null; private watcher: FSWatcher | null = null; private watchTimer: NodeJS.Timeout | null = null; private sessionWatchTimer: NodeJS.Timeout | null = null; private sessionUnsubscribe: (() => void) | null = null; private intervalTimer: NodeJS.Timeout | null = null; private closed = false; private dirty = false; private sessionsDirty = false; private sessionsDirtyFiles = new Set(); private sessionWarm = new Set(); private syncing: Promise | null = null; static async get(params: { cfg: ClawdbotConfig; agentId: string; }): Promise { const { cfg, agentId } = params; const settings = resolveMemorySearchConfig(cfg, agentId); if (!settings) return null; const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId); const key = `${agentId}:${workspaceDir}:${JSON.stringify(settings)}`; const existing = INDEX_CACHE.get(key); if (existing) return existing; const providerResult = await createEmbeddingProvider({ config: cfg, agentDir: resolveAgentDir(cfg, agentId), provider: settings.provider, remote: settings.remote, model: settings.model, fallback: settings.fallback, local: settings.local, }); const manager = new MemoryIndexManager({ cacheKey: key, cfg, agentId, workspaceDir, settings, providerResult, }); INDEX_CACHE.set(key, manager); return manager; } private constructor(params: { cacheKey: string; cfg: ClawdbotConfig; agentId: string; workspaceDir: string; settings: ResolvedMemorySearchConfig; providerResult: EmbeddingProviderResult; }) { this.cacheKey = params.cacheKey; this.cfg = params.cfg; this.agentId = params.agentId; this.workspaceDir = params.workspaceDir; this.settings = params.settings; this.provider = params.providerResult.provider; this.requestedProvider = params.providerResult.requestedProvider; this.fallbackReason = params.providerResult.fallbackReason; this.openAi = params.providerResult.openAi; this.sources = new Set(params.settings.sources); this.db = this.openDatabase(); this.providerKey = this.computeProviderKey(); this.cache = { enabled: params.settings.cache.enabled, maxEntries: params.settings.cache.maxEntries, }; this.fts = { enabled: params.settings.query.hybrid.enabled, available: false }; this.ensureSchema(); this.vector = { enabled: params.settings.store.vector.enabled, available: null, extensionPath: params.settings.store.vector.extensionPath, }; const meta = this.readMeta(); if (meta?.vectorDims) { this.vector.dims = meta.vectorDims; } this.ensureWatcher(); this.ensureSessionListener(); this.ensureIntervalSync(); this.dirty = this.sources.has("memory"); if (this.sources.has("sessions")) { this.sessionsDirty = true; } const batch = params.settings.remote?.batch; this.batch = { enabled: Boolean(batch?.enabled && this.openAi && this.provider.id === "openai"), wait: batch?.wait ?? true, concurrency: Math.max(1, batch?.concurrency ?? 2), pollIntervalMs: batch?.pollIntervalMs ?? 2000, timeoutMs: (batch?.timeoutMinutes ?? 60) * 60 * 1000, }; } async warmSession(sessionKey?: string): Promise { if (!this.settings.sync.onSessionStart) return; const key = sessionKey?.trim() || ""; if (key && this.sessionWarm.has(key)) return; await this.sync({ reason: "session-start" }); if (key) this.sessionWarm.add(key); } async search( query: string, opts?: { maxResults?: number; minScore?: number; sessionKey?: string; }, ): Promise { await this.warmSession(opts?.sessionKey); if (this.settings.sync.onSearch && (this.dirty || this.sessionsDirty)) { await this.sync({ reason: "search" }); } const cleaned = query.trim(); if (!cleaned) return []; const minScore = opts?.minScore ?? this.settings.query.minScore; const maxResults = opts?.maxResults ?? this.settings.query.maxResults; const hybrid = this.settings.query.hybrid; const candidates = Math.min( 200, Math.max(1, Math.floor(maxResults * hybrid.candidateMultiplier)), ); const keywordResults = hybrid.enabled ? await this.searchKeyword(cleaned, candidates).catch(() => []) : []; const queryVec = await this.provider.embedQuery(cleaned); const hasVector = queryVec.some((v) => v !== 0); const vectorResults = hasVector ? await this.searchVector(queryVec, candidates).catch(() => []) : []; if (!hybrid.enabled) { return vectorResults.filter((entry) => entry.score >= minScore).slice(0, maxResults); } const merged = this.mergeHybridResults({ vector: vectorResults, keyword: keywordResults, vectorWeight: hybrid.vectorWeight, textWeight: hybrid.textWeight, }); return merged.filter((entry) => entry.score >= minScore).slice(0, maxResults); } private async searchVector( queryVec: number[], limit: number, ): Promise> { if (queryVec.length === 0 || limit <= 0) return []; if (await this.ensureVectorReady(queryVec.length)) { const sourceFilter = this.buildSourceFilter("c"); const rows = this.db .prepare( `SELECT c.id, c.path, c.start_line, c.end_line, c.text,\n` + ` c.source,\n` + ` vec_distance_cosine(v.embedding, ?) AS dist\n` + ` FROM ${VECTOR_TABLE} v\n` + ` JOIN chunks c ON c.id = v.id\n` + ` WHERE c.model = ?${sourceFilter.sql}\n` + ` ORDER BY dist ASC\n` + ` LIMIT ?`, ) .all(vectorToBlob(queryVec), this.provider.model, ...sourceFilter.params, limit) as Array<{ id: string; path: string; start_line: number; end_line: number; text: string; source: MemorySource; dist: number; }>; return rows.map((row) => ({ id: row.id, path: row.path, startLine: row.start_line, endLine: row.end_line, score: 1 - row.dist, snippet: truncateUtf16Safe(row.text, SNIPPET_MAX_CHARS), source: row.source, })); } const candidates = this.listChunks(); const scored = candidates .map((chunk) => ({ chunk, score: cosineSimilarity(queryVec, chunk.embedding), })) .filter((entry) => Number.isFinite(entry.score)); return scored .sort((a, b) => b.score - a.score) .slice(0, limit) .map((entry) => ({ id: entry.chunk.id, path: entry.chunk.path, startLine: entry.chunk.startLine, endLine: entry.chunk.endLine, score: entry.score, snippet: truncateUtf16Safe(entry.chunk.text, SNIPPET_MAX_CHARS), source: entry.chunk.source, })); } private buildFtsQuery(raw: string): string | null { const tokens = raw .match(/[A-Za-z0-9_]+/g) ?.map((t) => t.trim()) .filter(Boolean) ?? []; if (tokens.length === 0) return null; const quoted = tokens.map((t) => `"${t.replaceAll('"', "")}"`); return quoted.join(" AND "); } private async searchKeyword( query: string, limit: number, ): Promise> { if (!this.fts.enabled || !this.fts.available) return []; if (limit <= 0) return []; const ftsQuery = this.buildFtsQuery(query); if (!ftsQuery) return []; const sourceFilter = this.buildSourceFilter(); const rows = this.db .prepare( `SELECT id, path, source, start_line, end_line, text,\n` + ` bm25(${FTS_TABLE}) AS rank\n` + ` FROM ${FTS_TABLE}\n` + ` WHERE ${FTS_TABLE} MATCH ? AND model = ?${sourceFilter.sql}\n` + ` ORDER BY rank ASC\n` + ` LIMIT ?`, ) .all(ftsQuery, this.provider.model, ...sourceFilter.params, limit) as Array<{ id: string; path: string; source: MemorySource; start_line: number; end_line: number; text: string; rank: number; }>; return rows.map((row) => { const rank = Number.isFinite(row.rank) ? Math.max(0, row.rank) : 999; const textScore = 1 / (1 + rank); return { id: row.id, path: row.path, startLine: row.start_line, endLine: row.end_line, score: textScore, textScore, snippet: truncateUtf16Safe(row.text, SNIPPET_MAX_CHARS), source: row.source, }; }); } private mergeHybridResults(params: { vector: Array; keyword: Array; vectorWeight: number; textWeight: number; }): MemorySearchResult[] { const byId = new Map< string, { id: string; path: string; startLine: number; endLine: number; source: MemorySource; snippet: string; vectorScore: number; textScore: number; } >(); for (const r of params.vector) { byId.set(r.id, { id: r.id, path: r.path, startLine: r.startLine, endLine: r.endLine, source: r.source, snippet: r.snippet, vectorScore: r.score, textScore: 0, }); } for (const r of params.keyword) { const existing = byId.get(r.id); if (existing) { existing.textScore = r.textScore; if (r.snippet && r.snippet.length > 0) existing.snippet = r.snippet; } else { byId.set(r.id, { id: r.id, path: r.path, startLine: r.startLine, endLine: r.endLine, source: r.source, snippet: r.snippet, vectorScore: 0, textScore: r.textScore, }); } } const merged = Array.from(byId.values()).map((entry) => { const score = params.vectorWeight * entry.vectorScore + params.textWeight * entry.textScore; return { path: entry.path, startLine: entry.startLine, endLine: entry.endLine, score, snippet: entry.snippet, source: entry.source, } satisfies MemorySearchResult; }); return merged.sort((a, b) => b.score - a.score); } async sync(params?: { reason?: string; force?: boolean; progress?: (update: MemorySyncProgressUpdate) => void; }): Promise { if (this.syncing) return this.syncing; this.syncing = this.runSync(params).finally(() => { this.syncing = null; }); return this.syncing; } async readFile(params: { relPath: string; from?: number; lines?: number; }): Promise<{ text: string; path: string }> { const relPath = normalizeRelPath(params.relPath); if (!relPath || !isMemoryPath(relPath)) { throw new Error("path required"); } const absPath = path.resolve(this.workspaceDir, relPath); if (!absPath.startsWith(this.workspaceDir)) { throw new Error("path escapes workspace"); } const content = await fs.readFile(absPath, "utf-8"); if (!params.from && !params.lines) { return { text: content, path: relPath }; } const lines = content.split("\n"); const start = Math.max(1, params.from ?? 1); const count = Math.max(1, params.lines ?? lines.length); const slice = lines.slice(start - 1, start - 1 + count); return { text: slice.join("\n"), path: relPath }; } status(): { files: number; chunks: number; dirty: boolean; workspaceDir: string; dbPath: string; provider: string; model: string; requestedProvider: string; sources: MemorySource[]; sourceCounts: Array<{ source: MemorySource; files: number; chunks: number }>; cache?: { enabled: boolean; entries?: number; maxEntries?: number }; fts?: { enabled: boolean; available: boolean; error?: string }; fallback?: { from: string; reason?: string }; vector?: { enabled: boolean; available?: boolean; extensionPath?: string; loadError?: string; dims?: number; }; } { const sourceFilter = this.buildSourceFilter(); const files = this.db .prepare(`SELECT COUNT(*) as c FROM files WHERE 1=1${sourceFilter.sql}`) .get(...sourceFilter.params) as { c: number; }; const chunks = this.db .prepare(`SELECT COUNT(*) as c FROM chunks WHERE 1=1${sourceFilter.sql}`) .get(...sourceFilter.params) as { c: number; }; const sourceCounts = (() => { const sources = Array.from(this.sources); if (sources.length === 0) return []; const bySource = new Map(); for (const source of sources) { bySource.set(source, { files: 0, chunks: 0 }); } const fileRows = this.db .prepare( `SELECT source, COUNT(*) as c FROM files WHERE 1=1${sourceFilter.sql} GROUP BY source`, ) .all(...sourceFilter.params) as Array<{ source: MemorySource; c: number }>; for (const row of fileRows) { const entry = bySource.get(row.source) ?? { files: 0, chunks: 0 }; entry.files = row.c ?? 0; bySource.set(row.source, entry); } const chunkRows = this.db .prepare( `SELECT source, COUNT(*) as c FROM chunks WHERE 1=1${sourceFilter.sql} GROUP BY source`, ) .all(...sourceFilter.params) as Array<{ source: MemorySource; c: number }>; for (const row of chunkRows) { const entry = bySource.get(row.source) ?? { files: 0, chunks: 0 }; entry.chunks = row.c ?? 0; bySource.set(row.source, entry); } return sources.map((source) => ({ source, ...bySource.get(source)! })); })(); return { files: files?.c ?? 0, chunks: chunks?.c ?? 0, dirty: this.dirty, workspaceDir: this.workspaceDir, dbPath: this.settings.store.path, provider: this.provider.id, model: this.provider.model, requestedProvider: this.requestedProvider, sources: Array.from(this.sources), sourceCounts, cache: this.cache.enabled ? { enabled: true, entries: ( this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as | { c: number } | undefined )?.c ?? 0, maxEntries: this.cache.maxEntries, } : { enabled: false, maxEntries: this.cache.maxEntries }, fts: { enabled: this.fts.enabled, available: this.fts.available, error: this.fts.loadError, }, fallback: this.fallbackReason ? { from: "local", reason: this.fallbackReason } : undefined, vector: { enabled: this.vector.enabled, available: this.vector.available ?? undefined, extensionPath: this.vector.extensionPath, loadError: this.vector.loadError, dims: this.vector.dims, }, }; } async probeVectorAvailability(): Promise { if (!this.vector.enabled) return false; return this.ensureVectorReady(); } async probeEmbeddingAvailability(): Promise<{ ok: boolean; error?: string }> { try { await this.embedBatchWithRetry(["ping"]); return { ok: true }; } catch (err) { const message = err instanceof Error ? err.message : String(err); return { ok: false, error: message }; } } async close(): Promise { if (this.closed) return; this.closed = true; if (this.watchTimer) { clearTimeout(this.watchTimer); this.watchTimer = null; } if (this.sessionWatchTimer) { clearTimeout(this.sessionWatchTimer); this.sessionWatchTimer = null; } if (this.intervalTimer) { clearInterval(this.intervalTimer); this.intervalTimer = null; } if (this.watcher) { await this.watcher.close(); this.watcher = null; } if (this.sessionUnsubscribe) { this.sessionUnsubscribe(); this.sessionUnsubscribe = null; } this.db.close(); INDEX_CACHE.delete(this.cacheKey); } private async ensureVectorReady(dimensions?: number): Promise { if (!this.vector.enabled) return false; if (!this.vectorReady) { this.vectorReady = this.loadVectorExtension(); } const ready = await this.vectorReady; if (ready && typeof dimensions === "number" && dimensions > 0) { this.ensureVectorTable(dimensions); } return ready; } private async loadVectorExtension(): Promise { if (this.vector.available !== null) return this.vector.available; if (!this.vector.enabled) { this.vector.available = false; return false; } try { const sqliteVec = await import("sqlite-vec"); const extensionPath = this.vector.extensionPath?.trim() ? resolveUserPath(this.vector.extensionPath) : sqliteVec.getLoadablePath(); this.db.enableLoadExtension(true); if (this.vector.extensionPath?.trim()) { this.db.loadExtension(extensionPath); } else { sqliteVec.load(this.db); } this.vector.extensionPath = extensionPath; this.vector.available = true; return true; } catch (err) { const message = err instanceof Error ? err.message : String(err); this.vector.available = false; this.vector.loadError = message; log.warn(`sqlite-vec unavailable: ${message}`); return false; } } private ensureVectorTable(dimensions: number): void { if (this.vector.dims === dimensions) return; if (this.vector.dims && this.vector.dims !== dimensions) { this.dropVectorTable(); } this.db.exec( `CREATE VIRTUAL TABLE IF NOT EXISTS ${VECTOR_TABLE} USING vec0(\n` + ` id TEXT PRIMARY KEY,\n` + ` embedding FLOAT[${dimensions}]\n` + `)`, ); this.vector.dims = dimensions; } private dropVectorTable(): void { try { this.db.exec(`DROP TABLE IF EXISTS ${VECTOR_TABLE}`); } catch (err) { const message = err instanceof Error ? err.message : String(err); log.debug(`Failed to drop ${VECTOR_TABLE}: ${message}`); } } private buildSourceFilter(alias?: string): { sql: string; params: MemorySource[] } { const sources = Array.from(this.sources); if (sources.length === 0) return { sql: "", params: [] }; const column = alias ? `${alias}.source` : "source"; const placeholders = sources.map(() => "?").join(", "); return { sql: ` AND ${column} IN (${placeholders})`, params: sources }; } private ensureColumn(table: "files" | "chunks", column: string, definition: string): void { const rows = this.db.prepare(`PRAGMA table_info(${table})`).all() as Array<{ name: string; }>; if (rows.some((row) => row.name === column)) return; this.db.exec(`ALTER TABLE ${table} ADD COLUMN ${column} ${definition}`); } private openDatabase(): DatabaseSync { const dbPath = resolveUserPath(this.settings.store.path); const dir = path.dirname(dbPath); ensureDir(dir); const { DatabaseSync } = requireNodeSqlite(); return new DatabaseSync(dbPath, { allowExtension: this.settings.store.vector.enabled }); } private ensureSchema() { this.db.exec(` CREATE TABLE IF NOT EXISTS meta ( key TEXT PRIMARY KEY, value TEXT NOT NULL ); `); this.db.exec(` CREATE TABLE IF NOT EXISTS files ( path TEXT PRIMARY KEY, source TEXT NOT NULL DEFAULT 'memory', hash TEXT NOT NULL, mtime INTEGER NOT NULL, size INTEGER NOT NULL ); `); this.db.exec(` CREATE TABLE IF NOT EXISTS chunks ( id TEXT PRIMARY KEY, path TEXT NOT NULL, source TEXT NOT NULL DEFAULT 'memory', start_line INTEGER NOT NULL, end_line INTEGER NOT NULL, hash TEXT NOT NULL, model TEXT NOT NULL, text TEXT NOT NULL, embedding TEXT NOT NULL, updated_at INTEGER NOT NULL ); `); this.db.exec(` CREATE TABLE IF NOT EXISTS ${EMBEDDING_CACHE_TABLE} ( provider TEXT NOT NULL, model TEXT NOT NULL, provider_key TEXT NOT NULL, hash TEXT NOT NULL, embedding TEXT NOT NULL, dims INTEGER, updated_at INTEGER NOT NULL, PRIMARY KEY (provider, model, provider_key, hash) ); `); this.db.exec( `CREATE INDEX IF NOT EXISTS idx_embedding_cache_updated_at ON ${EMBEDDING_CACHE_TABLE}(updated_at);`, ); if (this.fts.enabled) { try { this.db.exec( `CREATE VIRTUAL TABLE IF NOT EXISTS ${FTS_TABLE} USING fts5(\n` + ` text,\n` + ` id UNINDEXED,\n` + ` path UNINDEXED,\n` + ` source UNINDEXED,\n` + ` model UNINDEXED,\n` + ` start_line UNINDEXED,\n` + ` end_line UNINDEXED\n` + `);`, ); this.fts.available = true; } catch (err) { const message = err instanceof Error ? err.message : String(err); this.fts.available = false; this.fts.loadError = message; log.warn(`fts unavailable: ${message}`); } } this.ensureColumn("files", "source", "TEXT NOT NULL DEFAULT 'memory'"); this.ensureColumn("chunks", "source", "TEXT NOT NULL DEFAULT 'memory'"); this.db.exec(`CREATE INDEX IF NOT EXISTS idx_chunks_path ON chunks(path);`); this.db.exec(`CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source);`); } private ensureWatcher() { if (!this.sources.has("memory") || !this.settings.sync.watch || this.watcher) return; const watchPaths = [ path.join(this.workspaceDir, "MEMORY.md"), path.join(this.workspaceDir, "memory"), ]; this.watcher = chokidar.watch(watchPaths, { ignoreInitial: true, awaitWriteFinish: { stabilityThreshold: this.settings.sync.watchDebounceMs, pollInterval: 100, }, }); const markDirty = () => { this.dirty = true; this.scheduleWatchSync(); }; this.watcher.on("add", markDirty); this.watcher.on("change", markDirty); this.watcher.on("unlink", markDirty); } private ensureSessionListener() { if (!this.sources.has("sessions") || this.sessionUnsubscribe) return; this.sessionUnsubscribe = onSessionTranscriptUpdate((update) => { if (this.closed) return; const sessionFile = update.sessionFile; if (!this.isSessionFileForAgent(sessionFile)) return; this.scheduleSessionDirty(sessionFile); }); } private scheduleSessionDirty(sessionFile: string) { this.sessionsDirtyFiles.add(sessionFile); if (this.sessionWatchTimer) return; this.sessionWatchTimer = setTimeout(() => { this.sessionWatchTimer = null; this.sessionsDirty = true; }, SESSION_DIRTY_DEBOUNCE_MS); } private isSessionFileForAgent(sessionFile: string): boolean { if (!sessionFile) return false; const sessionsDir = resolveSessionTranscriptsDirForAgent(this.agentId); const resolvedFile = path.resolve(sessionFile); const resolvedDir = path.resolve(sessionsDir); return resolvedFile.startsWith(`${resolvedDir}${path.sep}`); } private ensureIntervalSync() { const minutes = this.settings.sync.intervalMinutes; if (!minutes || minutes <= 0 || this.intervalTimer) return; const ms = minutes * 60 * 1000; this.intervalTimer = setInterval(() => { void this.sync({ reason: "interval" }).catch((err) => { log.warn(`memory sync failed (interval): ${String(err)}`); }); }, ms); } private scheduleWatchSync() { if (!this.sources.has("memory") || !this.settings.sync.watch) return; if (this.watchTimer) clearTimeout(this.watchTimer); this.watchTimer = setTimeout(() => { this.watchTimer = null; void this.sync({ reason: "watch" }).catch((err) => { log.warn(`memory sync failed (watch): ${String(err)}`); }); }, this.settings.sync.watchDebounceMs); } private listChunks(): Array<{ id: string; path: string; startLine: number; endLine: number; text: string; embedding: number[]; source: MemorySource; }> { const sourceFilter = this.buildSourceFilter(); const rows = this.db .prepare( `SELECT id, path, start_line, end_line, text, embedding, source FROM chunks WHERE model = ?${sourceFilter.sql}`, ) .all(this.provider.model, ...sourceFilter.params) as Array<{ id: string; path: string; start_line: number; end_line: number; text: string; embedding: string; source: MemorySource; }>; return rows.map((row) => ({ id: row.id, path: row.path, startLine: row.start_line, endLine: row.end_line, text: row.text, embedding: parseEmbedding(row.embedding), source: row.source, })); } private shouldSyncSessions( params?: { reason?: string; force?: boolean }, needsFullReindex = false, ) { if (!this.sources.has("sessions")) return false; if (params?.force) return true; const reason = params?.reason; if (reason === "session-start" || reason === "watch") return false; return this.sessionsDirty || needsFullReindex; } private async syncMemoryFiles(params: { needsFullReindex: boolean; progress?: MemorySyncProgressState; }) { const files = await listMemoryFiles(this.workspaceDir); const fileEntries = await Promise.all( files.map(async (file) => buildFileEntry(file, this.workspaceDir)), ); log.debug("memory sync: indexing memory files", { files: fileEntries.length, needsFullReindex: params.needsFullReindex, batch: this.batch.enabled, concurrency: this.getIndexConcurrency(), }); const activePaths = new Set(fileEntries.map((entry) => entry.path)); if (params.progress) { params.progress.total += fileEntries.length; params.progress.report({ completed: params.progress.completed, total: params.progress.total, label: this.batch.enabled ? "Indexing memory files (batch)..." : "Indexing memory files…", }); } const tasks = fileEntries.map((entry) => async () => { const record = this.db .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) .get(entry.path, "memory") as { hash: string } | undefined; if (!params.needsFullReindex && record?.hash === entry.hash) { if (params.progress) { params.progress.completed += 1; params.progress.report({ completed: params.progress.completed, total: params.progress.total, }); } return; } await this.indexFile(entry, { source: "memory" }); if (params.progress) { params.progress.completed += 1; params.progress.report({ completed: params.progress.completed, total: params.progress.total, }); } }); await this.runWithConcurrency(tasks, this.getIndexConcurrency()); const staleRows = this.db .prepare(`SELECT path FROM files WHERE source = ?`) .all("memory") as Array<{ path: string }>; for (const stale of staleRows) { if (activePaths.has(stale.path)) continue; this.db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`).run(stale.path, "memory"); try { this.db .prepare( `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, ) .run(stale.path, "memory"); } catch {} this.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(stale.path, "memory"); if (this.fts.enabled && this.fts.available) { try { this.db .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) .run(stale.path, "memory", this.provider.model); } catch {} } } } private async syncSessionFiles(params: { needsFullReindex: boolean; progress?: MemorySyncProgressState; }) { const files = await this.listSessionFiles(); const activePaths = new Set(files.map((file) => this.sessionPathForFile(file))); const indexAll = params.needsFullReindex || this.sessionsDirtyFiles.size === 0; log.debug("memory sync: indexing session files", { files: files.length, indexAll, dirtyFiles: this.sessionsDirtyFiles.size, batch: this.batch.enabled, concurrency: this.getIndexConcurrency(), }); if (params.progress) { params.progress.total += files.length; params.progress.report({ completed: params.progress.completed, total: params.progress.total, label: this.batch.enabled ? "Indexing session files (batch)..." : "Indexing session files…", }); } const tasks = files.map((absPath) => async () => { if (!indexAll && !this.sessionsDirtyFiles.has(absPath)) { if (params.progress) { params.progress.completed += 1; params.progress.report({ completed: params.progress.completed, total: params.progress.total, }); } return; } const entry = await this.buildSessionEntry(absPath); if (!entry) { if (params.progress) { params.progress.completed += 1; params.progress.report({ completed: params.progress.completed, total: params.progress.total, }); } return; } const record = this.db .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) .get(entry.path, "sessions") as { hash: string } | undefined; if (!params.needsFullReindex && record?.hash === entry.hash) { if (params.progress) { params.progress.completed += 1; params.progress.report({ completed: params.progress.completed, total: params.progress.total, }); } return; } await this.indexFile(entry, { source: "sessions", content: entry.content }); if (params.progress) { params.progress.completed += 1; params.progress.report({ completed: params.progress.completed, total: params.progress.total, }); } }); await this.runWithConcurrency(tasks, this.getIndexConcurrency()); const staleRows = this.db .prepare(`SELECT path FROM files WHERE source = ?`) .all("sessions") as Array<{ path: string }>; for (const stale of staleRows) { if (activePaths.has(stale.path)) continue; this.db .prepare(`DELETE FROM files WHERE path = ? AND source = ?`) .run(stale.path, "sessions"); try { this.db .prepare( `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, ) .run(stale.path, "sessions"); } catch {} this.db .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) .run(stale.path, "sessions"); if (this.fts.enabled && this.fts.available) { try { this.db .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) .run(stale.path, "sessions", this.provider.model); } catch {} } } } private createSyncProgress( onProgress: (update: MemorySyncProgressUpdate) => void, ): MemorySyncProgressState { const state: MemorySyncProgressState = { completed: 0, total: 0, label: undefined, report: (update) => { if (update.label) state.label = update.label; const label = update.total > 0 && state.label ? `${state.label} ${update.completed}/${update.total}` : state.label; onProgress({ completed: update.completed, total: update.total, label, }); }, }; return state; } private async runSync(params?: { reason?: string; force?: boolean; progress?: (update: MemorySyncProgressUpdate) => void; }) { const progress = params?.progress ? this.createSyncProgress(params.progress) : undefined; const vectorReady = await this.ensureVectorReady(); const meta = this.readMeta(); const needsFullReindex = params?.force || !meta || meta.model !== this.provider.model || meta.provider !== this.provider.id || meta.providerKey !== this.providerKey || meta.chunkTokens !== this.settings.chunking.tokens || meta.chunkOverlap !== this.settings.chunking.overlap || (vectorReady && !meta?.vectorDims); if (needsFullReindex) { this.resetIndex(); } const shouldSyncMemory = this.sources.has("memory") && (params?.force || needsFullReindex || this.dirty); const shouldSyncSessions = this.shouldSyncSessions(params, needsFullReindex); if (shouldSyncMemory) { await this.syncMemoryFiles({ needsFullReindex, progress: progress ?? undefined }); this.dirty = false; } if (shouldSyncSessions) { await this.syncSessionFiles({ needsFullReindex, progress: progress ?? undefined }); this.sessionsDirty = false; this.sessionsDirtyFiles.clear(); } else if (needsFullReindex && this.sources.has("sessions")) { this.sessionsDirty = true; } const nextMeta: MemoryIndexMeta = { model: this.provider.model, provider: this.provider.id, providerKey: this.providerKey, chunkTokens: this.settings.chunking.tokens, chunkOverlap: this.settings.chunking.overlap, }; if (this.vector.available && this.vector.dims) { nextMeta.vectorDims = this.vector.dims; } if (shouldSyncMemory || shouldSyncSessions || needsFullReindex) { this.writeMeta(nextMeta); } if (shouldSyncMemory || shouldSyncSessions || needsFullReindex) { this.pruneEmbeddingCacheIfNeeded(); } } private resetIndex() { this.db.exec(`DELETE FROM files`); this.db.exec(`DELETE FROM chunks`); if (this.fts.enabled && this.fts.available) { try { this.db.exec(`DELETE FROM ${FTS_TABLE}`); } catch {} } this.dropVectorTable(); this.vector.dims = undefined; this.sessionsDirtyFiles.clear(); } private readMeta(): MemoryIndexMeta | null { const row = this.db.prepare(`SELECT value FROM meta WHERE key = ?`).get(META_KEY) as | { value: string } | undefined; if (!row?.value) return null; try { return JSON.parse(row.value) as MemoryIndexMeta; } catch { return null; } } private writeMeta(meta: MemoryIndexMeta) { const value = JSON.stringify(meta); this.db .prepare( `INSERT INTO meta (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value`, ) .run(META_KEY, value); } private async listSessionFiles(): Promise { const dir = resolveSessionTranscriptsDirForAgent(this.agentId); try { const entries = await fs.readdir(dir, { withFileTypes: true }); return entries .filter((entry) => entry.isFile()) .map((entry) => entry.name) .filter((name) => name.endsWith(".jsonl")) .map((name) => path.join(dir, name)); } catch { return []; } } private sessionPathForFile(absPath: string): string { return path.join("sessions", path.basename(absPath)).replace(/\\/g, "/"); } private normalizeSessionText(value: string): string { return value .replace(/\s*\n+\s*/g, " ") .replace(/\s+/g, " ") .trim(); } private extractSessionText(content: unknown): string | null { if (typeof content === "string") { const normalized = this.normalizeSessionText(content); return normalized ? normalized : null; } if (!Array.isArray(content)) return null; const parts: string[] = []; for (const block of content) { if (!block || typeof block !== "object") continue; const record = block as { type?: unknown; text?: unknown }; if (record.type !== "text" || typeof record.text !== "string") continue; const normalized = this.normalizeSessionText(record.text); if (normalized) parts.push(normalized); } if (parts.length === 0) return null; return parts.join(" "); } private async buildSessionEntry(absPath: string): Promise { try { const stat = await fs.stat(absPath); const raw = await fs.readFile(absPath, "utf-8"); const lines = raw.split("\n"); const collected: string[] = []; for (const line of lines) { if (!line.trim()) continue; let record: unknown; try { record = JSON.parse(line); } catch { continue; } if ( !record || typeof record !== "object" || (record as { type?: unknown }).type !== "message" ) { continue; } const message = (record as { message?: unknown }).message as | { role?: unknown; content?: unknown } | undefined; if (!message || typeof message.role !== "string") continue; if (message.role !== "user" && message.role !== "assistant") continue; const text = this.extractSessionText(message.content); if (!text) continue; const label = message.role === "user" ? "User" : "Assistant"; collected.push(`${label}: ${text}`); } const content = collected.join("\n"); return { path: this.sessionPathForFile(absPath), absPath, mtimeMs: stat.mtimeMs, size: stat.size, hash: hashText(content), content, }; } catch (err) { log.debug(`Failed reading session file ${absPath}: ${String(err)}`); return null; } } private estimateEmbeddingTokens(text: string): number { if (!text) return 0; return Math.ceil(text.length / EMBEDDING_APPROX_CHARS_PER_TOKEN); } private buildEmbeddingBatches(chunks: MemoryChunk[]): MemoryChunk[][] { const batches: MemoryChunk[][] = []; let current: MemoryChunk[] = []; let currentTokens = 0; for (const chunk of chunks) { const estimate = this.estimateEmbeddingTokens(chunk.text); const wouldExceed = current.length > 0 && currentTokens + estimate > EMBEDDING_BATCH_MAX_TOKENS; if (wouldExceed) { batches.push(current); current = []; currentTokens = 0; } if (current.length === 0 && estimate > EMBEDDING_BATCH_MAX_TOKENS) { batches.push([chunk]); continue; } current.push(chunk); currentTokens += estimate; } if (current.length > 0) { batches.push(current); } return batches; } private loadEmbeddingCache(hashes: string[]): Map { if (!this.cache.enabled) return new Map(); if (hashes.length === 0) return new Map(); const unique: string[] = []; const seen = new Set(); for (const hash of hashes) { if (!hash) continue; if (seen.has(hash)) continue; seen.add(hash); unique.push(hash); } if (unique.length === 0) return new Map(); const out = new Map(); const baseParams = [this.provider.id, this.provider.model, this.providerKey]; const batchSize = 400; for (let start = 0; start < unique.length; start += batchSize) { const batch = unique.slice(start, start + batchSize); const placeholders = batch.map(() => "?").join(", "); const rows = this.db .prepare( `SELECT hash, embedding FROM ${EMBEDDING_CACHE_TABLE}\n` + ` WHERE provider = ? AND model = ? AND provider_key = ? AND hash IN (${placeholders})`, ) .all(...baseParams, ...batch) as Array<{ hash: string; embedding: string }>; for (const row of rows) { out.set(row.hash, parseEmbedding(row.embedding)); } } return out; } private upsertEmbeddingCache(entries: Array<{ hash: string; embedding: number[] }>): void { if (!this.cache.enabled) return; if (entries.length === 0) return; const now = Date.now(); const stmt = this.db.prepare( `INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)\n` + ` VALUES (?, ?, ?, ?, ?, ?, ?)\n` + ` ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET\n` + ` embedding=excluded.embedding,\n` + ` dims=excluded.dims,\n` + ` updated_at=excluded.updated_at`, ); for (const entry of entries) { const embedding = entry.embedding ?? []; stmt.run( this.provider.id, this.provider.model, this.providerKey, entry.hash, JSON.stringify(embedding), embedding.length, now, ); } } private pruneEmbeddingCacheIfNeeded(): void { if (!this.cache.enabled) return; const max = this.cache.maxEntries; if (!max || max <= 0) return; const row = this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as | { c: number } | undefined; const count = row?.c ?? 0; if (count <= max) return; const excess = count - max; this.db .prepare( `DELETE FROM ${EMBEDDING_CACHE_TABLE}\n` + ` WHERE rowid IN (\n` + ` SELECT rowid FROM ${EMBEDDING_CACHE_TABLE}\n` + ` ORDER BY updated_at ASC\n` + ` LIMIT ?\n` + ` )`, ) .run(excess); } private async embedChunksInBatches(chunks: MemoryChunk[]): Promise { if (chunks.length === 0) return []; const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); const missing: Array<{ index: number; chunk: MemoryChunk }> = []; for (let i = 0; i < chunks.length; i += 1) { const chunk = chunks[i]; const hit = chunk?.hash ? cached.get(chunk.hash) : undefined; if (hit && hit.length > 0) { embeddings[i] = hit; } else if (chunk) { missing.push({ index: i, chunk }); } } if (missing.length === 0) return embeddings; const missingChunks = missing.map((m) => m.chunk); const batches = this.buildEmbeddingBatches(missingChunks); const toCache: Array<{ hash: string; embedding: number[] }> = []; let cursor = 0; for (const batch of batches) { const batchEmbeddings = await this.embedBatchWithRetry(batch.map((chunk) => chunk.text)); for (let i = 0; i < batch.length; i += 1) { const item = missing[cursor + i]; const embedding = batchEmbeddings[i] ?? []; if (item) { embeddings[item.index] = embedding; toCache.push({ hash: item.chunk.hash, embedding }); } } cursor += batch.length; } this.upsertEmbeddingCache(toCache); return embeddings; } private getOpenAiBaseUrl(): string { return this.openAi?.baseUrl?.replace(/\/$/, "") ?? ""; } private getOpenAiHeaders(params: { json: boolean }): Record { const headers = this.openAi?.headers ? { ...this.openAi.headers } : {}; if (params.json) { if (!headers["Content-Type"] && !headers["content-type"]) { headers["Content-Type"] = "application/json"; } } else { delete headers["Content-Type"]; delete headers["content-type"]; } return headers; } private computeProviderKey(): string { if (this.provider.id === "openai" && this.openAi) { const entries = Object.entries(this.openAi.headers) .filter(([key]) => key.toLowerCase() !== "authorization") .sort(([a], [b]) => a.localeCompare(b)) .map(([key, value]) => [key, value]); return hashText( JSON.stringify({ provider: "openai", baseUrl: this.openAi.baseUrl, model: this.openAi.model, headers: entries, }), ); } return hashText(JSON.stringify({ provider: this.provider.id, model: this.provider.model })); } private buildOpenAiBatchRequests( chunks: MemoryChunk[], entry: MemoryFileEntry | SessionFileEntry, source: MemorySource, ): { requests: OpenAiBatchRequest[]; mapping: Map } { const requests: OpenAiBatchRequest[] = []; const mapping = new Map(); for (let i = 0; i < chunks.length; i += 1) { const chunk = chunks[i]; const customId = hashText( `${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${i}`, ); mapping.set(customId, i); requests.push({ custom_id: customId, method: "POST", url: OPENAI_BATCH_ENDPOINT, body: { model: this.openAi?.model ?? this.provider.model, input: chunk.text, }, }); } return { requests, mapping }; } private splitOpenAiBatchRequests(requests: OpenAiBatchRequest[]): OpenAiBatchRequest[][] { if (requests.length <= OPENAI_BATCH_MAX_REQUESTS) return [requests]; const groups: OpenAiBatchRequest[][] = []; for (let i = 0; i < requests.length; i += OPENAI_BATCH_MAX_REQUESTS) { groups.push(requests.slice(i, i + OPENAI_BATCH_MAX_REQUESTS)); } return groups; } private async submitOpenAiBatch(requests: OpenAiBatchRequest[]): Promise { if (!this.openAi) { throw new Error("OpenAI batch requested without an OpenAI embedding client."); } const baseUrl = this.getOpenAiBaseUrl(); const jsonl = requests.map((request) => JSON.stringify(request)).join("\n"); const form = new FormData(); form.append("purpose", "batch"); form.append( "file", new Blob([jsonl], { type: "application/jsonl" }), "memory-embeddings.jsonl", ); const fileRes = await fetch(`${baseUrl}/files`, { method: "POST", headers: this.getOpenAiHeaders({ json: false }), body: form, }); if (!fileRes.ok) { const text = await fileRes.text(); throw new Error(`openai batch file upload failed: ${fileRes.status} ${text}`); } const filePayload = (await fileRes.json()) as { id?: string }; if (!filePayload.id) { throw new Error("openai batch file upload failed: missing file id"); } const batchRes = await fetch(`${baseUrl}/batches`, { method: "POST", headers: this.getOpenAiHeaders({ json: true }), body: JSON.stringify({ input_file_id: filePayload.id, endpoint: OPENAI_BATCH_ENDPOINT, completion_window: OPENAI_BATCH_COMPLETION_WINDOW, metadata: { source: "clawdbot-memory", agent: this.agentId, }, }), }); if (!batchRes.ok) { const text = await batchRes.text(); throw new Error(`openai batch create failed: ${batchRes.status} ${text}`); } return (await batchRes.json()) as OpenAiBatchStatus; } private async fetchOpenAiBatchStatus(batchId: string): Promise { const baseUrl = this.getOpenAiBaseUrl(); const res = await fetch(`${baseUrl}/batches/${batchId}`, { headers: this.getOpenAiHeaders({ json: true }), }); if (!res.ok) { const text = await res.text(); throw new Error(`openai batch status failed: ${res.status} ${text}`); } return (await res.json()) as OpenAiBatchStatus; } private async fetchOpenAiFileContent(fileId: string): Promise { const baseUrl = this.getOpenAiBaseUrl(); const res = await fetch(`${baseUrl}/files/${fileId}/content`, { headers: this.getOpenAiHeaders({ json: true }), }); if (!res.ok) { const text = await res.text(); throw new Error(`openai batch file content failed: ${res.status} ${text}`); } return await res.text(); } private parseOpenAiBatchOutput(text: string): OpenAiBatchOutputLine[] { if (!text.trim()) return []; return text .split("\n") .map((line) => line.trim()) .filter(Boolean) .map((line) => JSON.parse(line) as OpenAiBatchOutputLine); } private async readOpenAiBatchError(errorFileId: string): Promise { try { const content = await this.fetchOpenAiFileContent(errorFileId); const lines = this.parseOpenAiBatchOutput(content); const first = lines.find((line) => line.error?.message || line.response?.body?.error); const message = first?.error?.message ?? (typeof first?.response?.body?.error?.message === "string" ? first?.response?.body?.error?.message : undefined); return message; } catch (err) { const message = err instanceof Error ? err.message : String(err); return message ? `error file unavailable: ${message}` : undefined; } } private async waitForOpenAiBatch( batchId: string, initial?: OpenAiBatchStatus, ): Promise<{ outputFileId: string; errorFileId?: string }> { const start = Date.now(); let current: OpenAiBatchStatus | undefined = initial; while (true) { const status = current ?? (await this.fetchOpenAiBatchStatus(batchId)); const state = status.status ?? "unknown"; if (state === "completed") { log.debug(`openai batch ${batchId} ${state}`, { consoleMessage: this.formatOpenAiBatchConsoleMessage({ batchId, state }), }); if (!status.output_file_id) { throw new Error(`openai batch ${batchId} completed without output file`); } return { outputFileId: status.output_file_id, errorFileId: status.error_file_id ?? undefined, }; } if (["failed", "expired", "cancelled", "canceled"].includes(state)) { const detail = status.error_file_id ? await this.readOpenAiBatchError(status.error_file_id) : undefined; const suffix = detail ? `: ${detail}` : ""; throw new Error(`openai batch ${batchId} ${state}${suffix}`); } if (!this.batch.wait) { throw new Error(`openai batch ${batchId} still ${state}; wait disabled`); } if (Date.now() - start > this.batch.timeoutMs) { throw new Error(`openai batch ${batchId} timed out after ${this.batch.timeoutMs}ms`); } log.debug(`openai batch ${batchId} ${state}; waiting ${this.batch.pollIntervalMs}ms`, { consoleMessage: this.formatOpenAiBatchConsoleMessage({ batchId, state, waitMs: this.batch.pollIntervalMs, }), }); await new Promise((resolve) => setTimeout(resolve, this.batch.pollIntervalMs)); current = undefined; } } private formatOpenAiBatchConsoleMessage(params: { batchId: string; state: string; waitMs?: number; }): string { const rich = isRich(); const normalized = params.state.toLowerCase(); const successStates = new Set(["completed", "succeeded"]); const errorStates = new Set(["failed", "expired", "cancelled", "canceled"]); const warnStates = new Set(["finalizing", "validating"]); let color = theme.info; if (successStates.has(normalized)) color = theme.success; else if (errorStates.has(normalized)) color = theme.error; else if (warnStates.has(normalized)) color = theme.warn; const status = colorize(rich, color, params.state); const suffix = typeof params.waitMs === "number" ? `; waiting ${params.waitMs}ms` : ""; return `openai batch ${params.batchId} ${status}${suffix}`; } private async embedChunksWithBatch( chunks: MemoryChunk[], entry: MemoryFileEntry | SessionFileEntry, source: MemorySource, ): Promise { if (!this.openAi) { return this.embedChunksInBatches(chunks); } if (chunks.length === 0) return []; const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); const missing: Array<{ index: number; chunk: MemoryChunk }> = []; for (let i = 0; i < chunks.length; i += 1) { const chunk = chunks[i]; const hit = chunk?.hash ? cached.get(chunk.hash) : undefined; if (hit && hit.length > 0) { embeddings[i] = hit; } else if (chunk) { missing.push({ index: i, chunk }); } } if (missing.length === 0) return embeddings; const requests: OpenAiBatchRequest[] = []; const mapping = new Map(); for (const item of missing) { const chunk = item.chunk; const customId = hashText( `${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`, ); mapping.set(customId, item.index); requests.push({ custom_id: customId, method: "POST", url: OPENAI_BATCH_ENDPOINT, body: { model: this.openAi?.model ?? this.provider.model, input: chunk.text, }, }); } const groups = this.splitOpenAiBatchRequests(requests); log.debug("memory embeddings: openai batch submit", { source, chunks: chunks.length, requests: requests.length, groups: groups.length, wait: this.batch.wait, concurrency: this.batch.concurrency, pollIntervalMs: this.batch.pollIntervalMs, timeoutMs: this.batch.timeoutMs, }); const toCache: Array<{ hash: string; embedding: number[] }> = []; const tasks = groups.map((group, groupIndex) => async () => { const batchInfo = await this.submitOpenAiBatch(group); if (!batchInfo.id) { throw new Error("openai batch create failed: missing batch id"); } log.debug("memory embeddings: openai batch created", { batchId: batchInfo.id, status: batchInfo.status, group: groupIndex + 1, groups: groups.length, requests: group.length, }); if (!this.batch.wait && batchInfo.status !== "completed") { throw new Error( `openai batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`, ); } const completed = batchInfo.status === "completed" ? { outputFileId: batchInfo.output_file_id ?? "", errorFileId: batchInfo.error_file_id ?? undefined, } : await this.waitForOpenAiBatch(batchInfo.id, batchInfo); if (!completed.outputFileId) { throw new Error(`openai batch ${batchInfo.id} completed without output file`); } const content = await this.fetchOpenAiFileContent(completed.outputFileId); const outputLines = this.parseOpenAiBatchOutput(content); const errors: string[] = []; const remaining = new Set(group.map((request) => request.custom_id)); for (const line of outputLines) { const customId = line.custom_id; if (!customId) continue; const index = mapping.get(customId); if (index === undefined) continue; remaining.delete(customId); if (line.error?.message) { errors.push(`${customId}: ${line.error.message}`); continue; } const response = line.response; const statusCode = response?.status_code ?? 0; if (statusCode >= 400) { const message = response?.body?.error?.message ?? (typeof response?.body === "string" ? response.body : undefined) ?? "unknown error"; errors.push(`${customId}: ${message}`); continue; } const data = response?.body?.data ?? []; const embedding = data[0]?.embedding ?? []; if (embedding.length === 0) { errors.push(`${customId}: empty embedding`); continue; } embeddings[index] = embedding; const chunk = chunks[index]; if (chunk) toCache.push({ hash: chunk.hash, embedding }); } if (errors.length > 0) { throw new Error(`openai batch ${batchInfo.id} failed: ${errors.join("; ")}`); } if (remaining.size > 0) { throw new Error( `openai batch ${batchInfo.id} missing ${remaining.size} embedding responses`, ); } }); await this.runWithConcurrency(tasks, this.batch.concurrency); this.upsertEmbeddingCache(toCache); return embeddings; } private async embedBatchWithRetry(texts: string[]): Promise { if (texts.length === 0) return []; let attempt = 0; let delayMs = EMBEDDING_RETRY_BASE_DELAY_MS; while (true) { try { return await this.provider.embedBatch(texts); } catch (err) { const message = err instanceof Error ? err.message : String(err); if (!this.isRetryableEmbeddingError(message) || attempt >= EMBEDDING_RETRY_MAX_ATTEMPTS) { throw err; } const waitMs = Math.min( EMBEDDING_RETRY_MAX_DELAY_MS, Math.round(delayMs * (1 + Math.random() * 0.2)), ); log.warn(`memory embeddings rate limited; retrying in ${waitMs}ms`); await new Promise((resolve) => setTimeout(resolve, waitMs)); delayMs *= 2; attempt += 1; } } } private isRetryableEmbeddingError(message: string): boolean { return /(rate[_ ]limit|too many requests|429|resource has been exhausted|5\d\d|cloudflare)/i.test( message, ); } private async runWithConcurrency(tasks: Array<() => Promise>, limit: number): Promise { if (tasks.length === 0) return []; const resolvedLimit = Math.max(1, Math.min(limit, tasks.length)); const results: T[] = Array.from({ length: tasks.length }); let next = 0; let firstError: unknown = null; const workers = Array.from({ length: resolvedLimit }, async () => { while (true) { if (firstError) return; const index = next; next += 1; if (index >= tasks.length) return; try { results[index] = await tasks[index](); } catch (err) { firstError = err; return; } } }); await Promise.allSettled(workers); if (firstError) throw firstError; return results; } private getIndexConcurrency(): number { return this.batch.enabled ? this.batch.concurrency : EMBEDDING_INDEX_CONCURRENCY; } private async indexFile( entry: MemoryFileEntry | SessionFileEntry, options: { source: MemorySource; content?: string }, ) { const content = options.content ?? (await fs.readFile(entry.absPath, "utf-8")); const chunks = chunkMarkdown(content, this.settings.chunking).filter( (chunk) => chunk.text.trim().length > 0, ); const embeddings = this.batch.enabled ? await this.embedChunksWithBatch(chunks, entry, options.source) : await this.embedChunksInBatches(chunks); const sample = embeddings.find((embedding) => embedding.length > 0); const vectorReady = sample ? await this.ensureVectorReady(sample.length) : false; const now = Date.now(); if (vectorReady) { try { this.db .prepare( `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, ) .run(entry.path, options.source); } catch {} } if (this.fts.enabled && this.fts.available) { try { this.db .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) .run(entry.path, options.source, this.provider.model); } catch {} } this.db .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) .run(entry.path, options.source); for (let i = 0; i < chunks.length; i++) { const chunk = chunks[i]; const embedding = embeddings[i] ?? []; const id = hashText( `${options.source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${this.provider.model}`, ); this.db .prepare( `INSERT INTO chunks (id, path, source, start_line, end_line, hash, model, text, embedding, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET hash=excluded.hash, model=excluded.model, text=excluded.text, embedding=excluded.embedding, updated_at=excluded.updated_at`, ) .run( id, entry.path, options.source, chunk.startLine, chunk.endLine, chunk.hash, this.provider.model, chunk.text, JSON.stringify(embedding), now, ); if (vectorReady && embedding.length > 0) { this.db .prepare(`INSERT OR REPLACE INTO ${VECTOR_TABLE} (id, embedding) VALUES (?, ?)`) .run(id, vectorToBlob(embedding)); } if (this.fts.enabled && this.fts.available) { this.db .prepare( `INSERT INTO ${FTS_TABLE} (text, id, path, source, model, start_line, end_line)\n` + ` VALUES (?, ?, ?, ?, ?, ?, ?)`, ) .run( chunk.text, id, entry.path, options.source, this.provider.model, chunk.startLine, chunk.endLine, ); } } this.db .prepare( `INSERT INTO files (path, source, hash, mtime, size) VALUES (?, ?, ?, ?, ?) ON CONFLICT(path) DO UPDATE SET source=excluded.source, hash=excluded.hash, mtime=excluded.mtime, size=excluded.size`, ) .run(entry.path, options.source, entry.hash, entry.mtimeMs, entry.size); } }