fix: parallelize memory embedding indexing
This commit is contained in:
@@ -8,6 +8,7 @@ Docs: https://docs.clawd.bot
|
|||||||
|
|
||||||
### Fixes

- Tools: show exec elevated flag before the command and keep it outside markdown in tool summaries.
- Memory: parallelize embedding indexing with rate-limit retries.
- Memory: split overly long lines to keep embeddings under token limits.

## 2026.1.17-1
|
|||||||
@@ -150,4 +150,45 @@ describe("memory embedding batches", () => {
|
|||||||
expect(last?.total).toBeGreaterThan(0);
|
expect(last?.total).toBeGreaterThan(0);
|
||||||
expect(last?.completed).toBe(last?.total);
|
expect(last?.completed).toBe(last?.total);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("retries embeddings on rate limit errors", async () => {
|
||||||
|
const line = "d".repeat(120);
|
||||||
|
const content = Array.from({ length: 12 }, () => line).join("\n");
|
||||||
|
await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-06.md"), content);
|
||||||
|
|
||||||
|
let calls = 0;
|
||||||
|
embedBatch.mockImplementation(async (texts: string[]) => {
|
||||||
|
calls += 1;
|
||||||
|
if (calls < 3) {
|
||||||
|
throw new Error("openai embeddings failed: 429 rate limit");
|
||||||
|
}
|
||||||
|
return texts.map(() => [0, 1, 0]);
|
||||||
|
});
|
||||||
|
|
||||||
|
const cfg = {
|
||||||
|
agents: {
|
||||||
|
defaults: {
|
||||||
|
workspace: workspaceDir,
|
||||||
|
memorySearch: {
|
||||||
|
provider: "openai",
|
||||||
|
model: "mock-embed",
|
||||||
|
store: { path: indexPath },
|
||||||
|
chunking: { tokens: 200, overlap: 0 },
|
||||||
|
sync: { watch: false, onSessionStart: false, onSearch: false },
|
||||||
|
query: { minScore: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
list: [{ id: "main", default: true }],
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = await getMemorySearchManager({ cfg, agentId: "main" });
|
||||||
|
expect(result.manager).not.toBeNull();
|
||||||
|
if (!result.manager) throw new Error("manager missing");
|
||||||
|
manager = result.manager;
|
||||||
|
|
||||||
|
await manager.sync({ force: true });
|
||||||
|
|
||||||
|
expect(calls).toBe(3);
|
||||||
|
}, 10000);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -79,6 +79,10 @@ const VECTOR_TABLE = "chunks_vec";
|
|||||||
const SESSION_DIRTY_DEBOUNCE_MS = 5000;
|
const SESSION_DIRTY_DEBOUNCE_MS = 5000;
|
||||||
const EMBEDDING_BATCH_MAX_TOKENS = 8000;
|
const EMBEDDING_BATCH_MAX_TOKENS = 8000;
|
||||||
const EMBEDDING_APPROX_CHARS_PER_TOKEN = 1;
|
const EMBEDDING_APPROX_CHARS_PER_TOKEN = 1;
|
||||||
|
const EMBEDDING_INDEX_CONCURRENCY = 4;
|
||||||
|
const EMBEDDING_RETRY_MAX_ATTEMPTS = 3;
|
||||||
|
const EMBEDDING_RETRY_BASE_DELAY_MS = 500;
|
||||||
|
const EMBEDDING_RETRY_MAX_DELAY_MS = 8000;
|
||||||
|
|
||||||
const log = createSubsystemLogger("memory");
|
const log = createSubsystemLogger("memory");
|
||||||
|
|
||||||
@@ -396,7 +400,7 @@ export class MemoryIndexManager {
|
|||||||
|
|
||||||
async probeEmbeddingAvailability(): Promise<{ ok: boolean; error?: string }> {
|
async probeEmbeddingAvailability(): Promise<{ ok: boolean; error?: string }> {
|
||||||
try {
|
try {
|
||||||
await this.provider.embedQuery("ping");
|
await this.embedBatchWithRetry(["ping"]);
|
||||||
return { ok: true };
|
return { ok: true };
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
const message = err instanceof Error ? err.message : String(err);
|
const message = err instanceof Error ? err.message : String(err);
|
||||||
@@ -685,7 +689,7 @@ export class MemoryIndexManager {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const entry of fileEntries) {
|
const tasks = fileEntries.map((entry) => async () => {
|
||||||
const record = this.db
|
const record = this.db
|
||||||
.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`)
|
.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`)
|
||||||
.get(entry.path, "memory") as { hash: string } | undefined;
|
.get(entry.path, "memory") as { hash: string } | undefined;
|
||||||
@@ -697,7 +701,7 @@ export class MemoryIndexManager {
|
|||||||
total: params.progress.total,
|
total: params.progress.total,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
continue;
|
return;
|
||||||
}
|
}
|
||||||
await this.indexFile(entry, { source: "memory" });
|
await this.indexFile(entry, { source: "memory" });
|
||||||
if (params.progress) {
|
if (params.progress) {
|
||||||
@@ -707,7 +711,8 @@ export class MemoryIndexManager {
|
|||||||
total: params.progress.total,
|
total: params.progress.total,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
|
await this.runWithConcurrency(tasks, EMBEDDING_INDEX_CONCURRENCY);
|
||||||
|
|
||||||
const staleRows = this.db
|
const staleRows = this.db
|
||||||
.prepare(`SELECT path FROM files WHERE source = ?`)
|
.prepare(`SELECT path FROM files WHERE source = ?`)
|
||||||
@@ -735,7 +740,7 @@ export class MemoryIndexManager {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const absPath of files) {
|
const tasks = files.map((absPath) => async () => {
|
||||||
if (!indexAll && !this.sessionsDirtyFiles.has(absPath)) {
|
if (!indexAll && !this.sessionsDirtyFiles.has(absPath)) {
|
||||||
if (params.progress) {
|
if (params.progress) {
|
||||||
params.progress.completed += 1;
|
params.progress.completed += 1;
|
||||||
@@ -744,7 +749,7 @@ export class MemoryIndexManager {
|
|||||||
total: params.progress.total,
|
total: params.progress.total,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
continue;
|
return;
|
||||||
}
|
}
|
||||||
const entry = await this.buildSessionEntry(absPath);
|
const entry = await this.buildSessionEntry(absPath);
|
||||||
if (!entry) {
|
if (!entry) {
|
||||||
@@ -755,7 +760,7 @@ export class MemoryIndexManager {
|
|||||||
total: params.progress.total,
|
total: params.progress.total,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
continue;
|
return;
|
||||||
}
|
}
|
||||||
const record = this.db
|
const record = this.db
|
||||||
.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`)
|
.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`)
|
||||||
@@ -768,7 +773,7 @@ export class MemoryIndexManager {
|
|||||||
total: params.progress.total,
|
total: params.progress.total,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
continue;
|
return;
|
||||||
}
|
}
|
||||||
await this.indexFile(entry, { source: "sessions", content: entry.content });
|
await this.indexFile(entry, { source: "sessions", content: entry.content });
|
||||||
if (params.progress) {
|
if (params.progress) {
|
||||||
@@ -778,7 +783,8 @@ export class MemoryIndexManager {
|
|||||||
total: params.progress.total,
|
total: params.progress.total,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
|
await this.runWithConcurrency(tasks, EMBEDDING_INDEX_CONCURRENCY);
|
||||||
|
|
||||||
const staleRows = this.db
|
const staleRows = this.db
|
||||||
.prepare(`SELECT path FROM files WHERE source = ?`)
|
.prepare(`SELECT path FROM files WHERE source = ?`)
|
||||||
@@ -1017,7 +1023,7 @@ export class MemoryIndexManager {
|
|||||||
const batches = this.buildEmbeddingBatches(chunks);
|
const batches = this.buildEmbeddingBatches(chunks);
|
||||||
const embeddings: number[][] = [];
|
const embeddings: number[][] = [];
|
||||||
for (const batch of batches) {
|
for (const batch of batches) {
|
||||||
const batchEmbeddings = await this.provider.embedBatch(batch.map((chunk) => chunk.text));
|
const batchEmbeddings = await this.embedBatchWithRetry(batch.map((chunk) => chunk.text));
|
||||||
for (let i = 0; i < batch.length; i += 1) {
|
for (let i = 0; i < batch.length; i += 1) {
|
||||||
embeddings.push(batchEmbeddings[i] ?? []);
|
embeddings.push(batchEmbeddings[i] ?? []);
|
||||||
}
|
}
|
||||||
@@ -1025,6 +1031,61 @@ export class MemoryIndexManager {
|
|||||||
return embeddings;
|
return embeddings;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async embedBatchWithRetry(texts: string[]): Promise<number[][]> {
|
||||||
|
if (texts.length === 0) return [];
|
||||||
|
let attempt = 0;
|
||||||
|
let delayMs = EMBEDDING_RETRY_BASE_DELAY_MS;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return await this.provider.embedBatch(texts);
|
||||||
|
} catch (err) {
|
||||||
|
const message = err instanceof Error ? err.message : String(err);
|
||||||
|
if (!this.isRateLimitError(message) || attempt >= EMBEDDING_RETRY_MAX_ATTEMPTS) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
const waitMs = Math.min(
|
||||||
|
EMBEDDING_RETRY_MAX_DELAY_MS,
|
||||||
|
Math.round(delayMs * (1 + Math.random() * 0.2)),
|
||||||
|
);
|
||||||
|
log.warn(`memory embeddings rate limited; retrying in ${waitMs}ms`);
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, waitMs));
|
||||||
|
delayMs *= 2;
|
||||||
|
attempt += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private isRateLimitError(message: string): boolean {
|
||||||
|
return /(rate[_ ]limit|too many requests|429|resource has been exhausted)/i.test(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async runWithConcurrency<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
|
||||||
|
if (tasks.length === 0) return [];
|
||||||
|
const resolvedLimit = Math.max(1, Math.min(limit, tasks.length));
|
||||||
|
const results: T[] = Array.from({ length: tasks.length });
|
||||||
|
let next = 0;
|
||||||
|
let firstError: unknown | null = null;
|
||||||
|
|
||||||
|
const workers = Array.from({ length: resolvedLimit }, async () => {
|
||||||
|
while (true) {
|
||||||
|
if (firstError) return;
|
||||||
|
const index = next;
|
||||||
|
next += 1;
|
||||||
|
if (index >= tasks.length) return;
|
||||||
|
try {
|
||||||
|
results[index] = await tasks[index]();
|
||||||
|
} catch (err) {
|
||||||
|
firstError = err;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
await Promise.allSettled(workers);
|
||||||
|
if (firstError) throw firstError;
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
private async indexFile(
|
private async indexFile(
|
||||||
entry: MemoryFileEntry | SessionFileEntry,
|
entry: MemoryFileEntry | SessionFileEntry,
|
||||||
options: { source: MemorySource; content?: string },
|
options: { source: MemorySource; content?: string },
|
||||||
|
|||||||
Reference in New Issue
Block a user