From 4fdecfb845a06a4b7b4687745d06fe25d5b46fbd Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Sat, 17 Jan 2026 20:10:04 +0000
Subject: [PATCH] fix: split memory embedding batches

---
 CHANGELOG.md                                 |   1 +
 src/memory/manager.embedding-batches.test.ts | 112 +++++++++++++++++++
 src/memory/manager.ts                        |  51 ++++++++-
 3 files changed, 163 insertions(+), 1 deletion(-)
 create mode 100644 src/memory/manager.embedding-batches.test.ts

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6bc7380c3..dc780fd2a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ Docs: https://docs.clawd.bot
 - CLI: add WSL2/systemd unavailable hints in daemon status/doctor output.
 - Status: show both usage windows with reset hints when usage data is available. (#1101) — thanks @rhjoh.
 - Memory: probe sqlite-vec availability in `clawdbot memory status`.
+- Memory: split embedding batches to avoid OpenAI token limits during indexing.
 
 ## 2026.1.16-2
 
diff --git a/src/memory/manager.embedding-batches.test.ts b/src/memory/manager.embedding-batches.test.ts
new file mode 100644
index 000000000..f5f77aff4
--- /dev/null
+++ b/src/memory/manager.embedding-batches.test.ts
@@ -0,0 +1,112 @@
+import fs from "node:fs/promises";
+import os from "node:os";
+import path from "node:path";
+
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+
+import { getMemorySearchManager, type MemoryIndexManager } from "./index.js";
+
+const embedBatch = vi.fn(async (texts: string[]) => texts.map(() => [0, 1, 0]));
+const embedQuery = vi.fn(async () => [0, 1, 0]);
+
+vi.mock("./embeddings.js", () => ({
+  createEmbeddingProvider: async () => ({
+    requestedProvider: "openai",
+    provider: {
+      id: "mock",
+      model: "mock-embed",
+      embedQuery,
+      embedBatch,
+    },
+  }),
+}));
+
+describe("memory embedding batches", () => {
+  let workspaceDir: string;
+  let indexPath: string;
+  let manager: MemoryIndexManager | null = null;
+
+  beforeEach(async () => {
+    embedBatch.mockClear();
+    embedQuery.mockClear();
+    workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-mem-"));
+    indexPath = path.join(workspaceDir, "index.sqlite");
+    await fs.mkdir(path.join(workspaceDir, "memory"));
+  });
+
+  afterEach(async () => {
+    if (manager) {
+      await manager.close();
+      manager = null;
+    }
+    await fs.rm(workspaceDir, { recursive: true, force: true });
+  });
+
+  it("splits large files across multiple embedding batches", async () => {
+    const line = "a".repeat(200);
+    const content = Array.from({ length: 200 }, () => line).join("\n");
+    await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-03.md"), content);
+
+    const cfg = {
+      agents: {
+        defaults: {
+          workspace: workspaceDir,
+          memorySearch: {
+            provider: "openai",
+            model: "mock-embed",
+            store: { path: indexPath },
+            chunking: { tokens: 200, overlap: 0 },
+            sync: { watch: false, onSessionStart: false, onSearch: false },
+            query: { minScore: 0 },
+          },
+        },
+        list: [{ id: "main", default: true }],
+      },
+    };
+
+    const result = await getMemorySearchManager({ cfg, agentId: "main" });
+    expect(result.manager).not.toBeNull();
+    if (!result.manager) throw new Error("manager missing");
+    manager = result.manager;
+    await manager.sync({ force: true });
+
+    const status = manager.status();
+    const totalTexts = embedBatch.mock.calls.reduce(
+      (sum, call) => sum + (call[0]?.length ?? 0),
+      0,
+    );
+    expect(totalTexts).toBe(status.chunks);
+    expect(embedBatch.mock.calls.length).toBeGreaterThan(1);
+  });
+
+  it("keeps small files in a single embedding batch", async () => {
+    const line = "b".repeat(120);
+    const content = Array.from({ length: 12 }, () => line).join("\n");
+    await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-04.md"), content);
+
+    const cfg = {
+      agents: {
+        defaults: {
+          workspace: workspaceDir,
+          memorySearch: {
+            provider: "openai",
+            model: "mock-embed",
+            store: { path: indexPath },
+            chunking: { tokens: 200, overlap: 0 },
+            sync: { watch: false, onSessionStart: false, onSearch: false },
+            query: { minScore: 0 },
+          },
+        },
+        list: [{ id: "main", default: true }],
+      },
+    };
+
+    const result = await getMemorySearchManager({ cfg, agentId: "main" });
+    expect(result.manager).not.toBeNull();
+    if (!result.manager) throw new Error("manager missing");
+    manager = result.manager;
+    await manager.sync({ force: true });
+
+    expect(embedBatch.mock.calls.length).toBe(1);
+  });
+});

diff --git a/src/memory/manager.ts b/src/memory/manager.ts
index be13a9fce..e01e27376 100644
--- a/src/memory/manager.ts
+++ b/src/memory/manager.ts
@@ -25,6 +25,7 @@ import {
   hashText,
   isMemoryPath,
   listMemoryFiles,
+  type MemoryChunk,
   type MemoryFileEntry,
   normalizeRelPath,
   parseEmbedding,
@@ -63,6 +64,8 @@ const META_KEY = "memory_index_meta_v1";
 const SNIPPET_MAX_CHARS = 700;
 const VECTOR_TABLE = "chunks_vec";
 const SESSION_DIRTY_DEBOUNCE_MS = 5000;
+const EMBEDDING_BATCH_MAX_TOKENS = 8000;
+const EMBEDDING_APPROX_CHARS_PER_TOKEN = 2;
 
 const log = createSubsystemLogger("memory");
 
@@ -857,13 +860,59 @@
     }
   }
 
+  private estimateEmbeddingTokens(text: string): number {
+    if (!text) return 0;
+    return Math.ceil(text.length / EMBEDDING_APPROX_CHARS_PER_TOKEN);
+  }
+
+  private buildEmbeddingBatches(chunks: MemoryChunk[]): MemoryChunk[][] {
+    const batches: MemoryChunk[][] = [];
+    let current: MemoryChunk[] = [];
+    let currentTokens = 0;
+
+    for (const chunk of chunks) {
+      const estimate = this.estimateEmbeddingTokens(chunk.text);
+      const wouldExceed =
+        current.length > 0 && currentTokens + estimate > EMBEDDING_BATCH_MAX_TOKENS;
+      if (wouldExceed) {
+        batches.push(current);
+        current = [];
+        currentTokens = 0;
+      }
+      if (current.length === 0 && estimate > EMBEDDING_BATCH_MAX_TOKENS) {
+        batches.push([chunk]);
+        continue;
+      }
+      current.push(chunk);
+      currentTokens += estimate;
+    }
+
+    if (current.length > 0) {
+      batches.push(current);
+    }
+    return batches;
+  }
+
+  private async embedChunksInBatches(chunks: MemoryChunk[]): Promise<number[][]> {
+    if (chunks.length === 0) return [];
+    const batches = this.buildEmbeddingBatches(chunks);
+    const embeddings: number[][] = [];
+    for (const batch of batches) {
+      const batchEmbeddings = await this.provider.embedBatch(batch.map((chunk) => chunk.text));
+      for (let i = 0; i < batch.length; i += 1) {
+        embeddings.push(batchEmbeddings[i] ?? []);
+      }
+    }
+    return embeddings;
+  }
+
   private async indexFile(
     entry: MemoryFileEntry | SessionFileEntry,
     options: { source: MemorySource; content?: string },
   ) {
     const content = options.content ?? (await fs.readFile(entry.absPath, "utf-8"));
     const chunks = chunkMarkdown(content, this.settings.chunking);
-    const embeddings = await this.provider.embedBatch(chunks.map((chunk) => chunk.text));
+    const embeddings = await this.embedChunksInBatches(chunks);
     const sample = embeddings.find((embedding) => embedding.length > 0);
     const vectorReady = sample ? await this.ensureVectorReady(sample.length) : false;
     const now = Date.now();
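
The batching is greedy: chunks accumulate into the current request until the
next chunk's estimated size would push the running total past
EMBEDDING_BATCH_MAX_TOKENS (8000 tokens at roughly 2 characters per token),
and a chunk that is oversized on its own is still embedded, alone in its own
batch, rather than dropped. A standalone TypeScript sketch of the same
strategy follows; the constants and the two branch cases mirror the patch,
while the helper names and the sample sizes are invented for illustration:

// Standalone sketch of the greedy batching above. Constants mirror the
// patch; everything else here is illustrative only.
const EMBEDDING_BATCH_MAX_TOKENS = 8000;
const EMBEDDING_APPROX_CHARS_PER_TOKEN = 2;

function estimateTokens(text: string): number {
  return text ? Math.ceil(text.length / EMBEDDING_APPROX_CHARS_PER_TOKEN) : 0;
}

function buildBatches(texts: string[]): string[][] {
  const batches: string[][] = [];
  let current: string[] = [];
  let currentTokens = 0;
  for (const text of texts) {
    const estimate = estimateTokens(text);
    // Close the running batch when the next text would exceed the cap.
    if (current.length > 0 && currentTokens + estimate > EMBEDDING_BATCH_MAX_TOKENS) {
      batches.push(current);
      current = [];
      currentTokens = 0;
    }
    // An oversized text still goes out, alone in its own batch.
    if (current.length === 0 && estimate > EMBEDDING_BATCH_MAX_TOKENS) {
      batches.push([text]);
      continue;
    }
    current.push(text);
    currentTokens += estimate;
  }
  if (current.length > 0) batches.push(current);
  return batches;
}

// 100 chunks of 400 chars each estimate to 200 tokens apiece; 40 of them
// fill a batch exactly (8000 tokens), so the split comes out [40, 40, 20].
const sizes = buildBatches(Array.from({ length: 100 }, () => "x".repeat(400)))
  .map((batch) => batch.length);
console.log(sizes); // [40, 40, 20]

The 2-characters-per-token estimate is deliberately pessimistic: English text
typically averages around 4 characters per token, so the estimate roughly
doubles the true count and keeps each request comfortably under the limit.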