From f6d359932ae83da21497f442fb498dfa71c410dd Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 17 Jan 2026 21:56:17 +0000 Subject: [PATCH] fix: parallelize memory embedding indexing --- CHANGELOG.md | 1 + src/memory/manager.embedding-batches.test.ts | 41 ++++++++++ src/memory/manager.ts | 81 +++++++++++++++++--- 3 files changed, 113 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 763de4ccc..2f5ecd4b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Docs: https://docs.clawd.bot ### Fixes - Tools: show exec elevated flag before the command and keep it outside markdown in tool summaries. +- Memory: parallelize embedding indexing with rate-limit retries. - Memory: split overly long lines to keep embeddings under token limits. ## 2026.1.17-1 diff --git a/src/memory/manager.embedding-batches.test.ts b/src/memory/manager.embedding-batches.test.ts index 0bc9e56a1..a11839db2 100644 --- a/src/memory/manager.embedding-batches.test.ts +++ b/src/memory/manager.embedding-batches.test.ts @@ -150,4 +150,45 @@ describe("memory embedding batches", () => { expect(last?.total).toBeGreaterThan(0); expect(last?.completed).toBe(last?.total); }); + + it("retries embeddings on rate limit errors", async () => { + const line = "d".repeat(120); + const content = Array.from({ length: 12 }, () => line).join("\n"); + await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-06.md"), content); + + let calls = 0; + embedBatch.mockImplementation(async (texts: string[]) => { + calls += 1; + if (calls < 3) { + throw new Error("openai embeddings failed: 429 rate limit"); + } + return texts.map(() => [0, 1, 0]); + }); + + const cfg = { + agents: { + defaults: { + workspace: workspaceDir, + memorySearch: { + provider: "openai", + model: "mock-embed", + store: { path: indexPath }, + chunking: { tokens: 200, overlap: 0 }, + sync: { watch: false, onSessionStart: false, onSearch: false }, + query: { minScore: 0 }, + }, + }, + list: [{ id: "main", 
default: true }], + }, + }; + + const result = await getMemorySearchManager({ cfg, agentId: "main" }); + expect(result.manager).not.toBeNull(); + if (!result.manager) throw new Error("manager missing"); + manager = result.manager; + + await manager.sync({ force: true }); + + expect(calls).toBe(3); + }, 10000); }); diff --git a/src/memory/manager.ts b/src/memory/manager.ts index abfa6d580..b5d638fa0 100644 --- a/src/memory/manager.ts +++ b/src/memory/manager.ts @@ -79,6 +79,10 @@ const VECTOR_TABLE = "chunks_vec"; const SESSION_DIRTY_DEBOUNCE_MS = 5000; const EMBEDDING_BATCH_MAX_TOKENS = 8000; const EMBEDDING_APPROX_CHARS_PER_TOKEN = 1; +const EMBEDDING_INDEX_CONCURRENCY = 4; +const EMBEDDING_RETRY_MAX_ATTEMPTS = 3; +const EMBEDDING_RETRY_BASE_DELAY_MS = 500; +const EMBEDDING_RETRY_MAX_DELAY_MS = 8000; const log = createSubsystemLogger("memory"); @@ -396,7 +400,7 @@ export class MemoryIndexManager { async probeEmbeddingAvailability(): Promise<{ ok: boolean; error?: string }> { try { - await this.provider.embedQuery("ping"); + await this.embedBatchWithRetry(["ping"]); return { ok: true }; } catch (err) { const message = err instanceof Error ? err.message : String(err); @@ -685,7 +689,7 @@ export class MemoryIndexManager { }); } - for (const entry of fileEntries) { + const tasks = fileEntries.map((entry) => async () => { const record = this.db .prepare(`SELECT hash FROM files WHERE path = ? 
AND source = ?`) .get(entry.path, "memory") as { hash: string } | undefined; @@ -697,7 +701,7 @@ export class MemoryIndexManager { total: params.progress.total, }); } - continue; + return; } await this.indexFile(entry, { source: "memory" }); if (params.progress) { @@ -707,7 +711,8 @@ export class MemoryIndexManager { total: params.progress.total, }); } - } + }); + await this.runWithConcurrency(tasks, EMBEDDING_INDEX_CONCURRENCY); const staleRows = this.db .prepare(`SELECT path FROM files WHERE source = ?`) @@ -735,7 +740,7 @@ export class MemoryIndexManager { }); } - for (const absPath of files) { + const tasks = files.map((absPath) => async () => { if (!indexAll && !this.sessionsDirtyFiles.has(absPath)) { if (params.progress) { params.progress.completed += 1; @@ -744,7 +749,7 @@ export class MemoryIndexManager { total: params.progress.total, }); } - continue; + return; } const entry = await this.buildSessionEntry(absPath); if (!entry) { @@ -755,7 +760,7 @@ export class MemoryIndexManager { total: params.progress.total, }); } - continue; + return; } const record = this.db .prepare(`SELECT hash FROM files WHERE path = ? 
AND source = ?`) .get(entry.path, "sessions") as { hash: string } | undefined; @@ -768,7 +773,7 @@ total: params.progress.total, }); } - continue; + return; } await this.indexFile(entry, { source: "sessions", content: entry.content }); if (params.progress) { @@ -778,7 +783,8 @@ total: params.progress.total, }); } - } + }); + await this.runWithConcurrency(tasks, EMBEDDING_INDEX_CONCURRENCY); const staleRows = this.db .prepare(`SELECT path FROM files WHERE source = ?`) @@ -1017,7 +1023,7 @@ export class MemoryIndexManager { const batches = this.buildEmbeddingBatches(chunks); const embeddings: number[][] = []; for (const batch of batches) { - const batchEmbeddings = await this.provider.embedBatch(batch.map((chunk) => chunk.text)); + const batchEmbeddings = await this.embedBatchWithRetry(batch.map((chunk) => chunk.text)); for (let i = 0; i < batch.length; i += 1) { embeddings.push(batchEmbeddings[i] ?? []); } @@ -1025,6 +1031,61 @@ export class MemoryIndexManager { return embeddings; } + private async embedBatchWithRetry(texts: string[]): Promise<number[][]> { + if (texts.length === 0) return []; + let attempt = 0; + let delayMs = EMBEDDING_RETRY_BASE_DELAY_MS; + while (true) { + try { + return await this.provider.embedBatch(texts); + } catch (err) { + const message = err instanceof Error ? 
err.message : String(err); + if (!this.isRateLimitError(message) || attempt >= EMBEDDING_RETRY_MAX_ATTEMPTS) { + throw err; + } + const waitMs = Math.min( + EMBEDDING_RETRY_MAX_DELAY_MS, + Math.round(delayMs * (1 + Math.random() * 0.2)), + ); + log.warn(`memory embeddings rate limited; retrying in ${waitMs}ms`); + await new Promise((resolve) => setTimeout(resolve, waitMs)); + delayMs *= 2; + attempt += 1; + } + } + } + + private isRateLimitError(message: string): boolean { + return /(rate[_ ]limit|too many requests|429|resource has been exhausted)/i.test(message); + } + + private async runWithConcurrency<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> { + if (tasks.length === 0) return []; + const resolvedLimit = Math.max(1, Math.min(limit, tasks.length)); + const results: T[] = Array.from({ length: tasks.length }); + let next = 0; + let firstError: unknown | null = null; + + const workers = Array.from({ length: resolvedLimit }, async () => { + while (true) { + if (firstError) return; + const index = next; + next += 1; + if (index >= tasks.length) return; + try { + results[index] = await tasks[index](); + } catch (err) { + firstError = err; + return; + } + } + }); + + await Promise.allSettled(workers); + if (firstError) throw firstError; + return results; + } + private async indexFile( entry: MemoryFileEntry | SessionFileEntry, options: { source: MemorySource; content?: string },