From 946477413372127d369267e96eadb554d1a80191 Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Sun, 18 Jan 2026 15:29:30 +0000
Subject: [PATCH] feat(memory): add gemini batches + safe reindex

Co-authored-by: Gustavo Madeira Santana

---
 src/memory/batch-gemini.ts | 407 ++++++++++++++++
 src/memory/batch-openai.ts | 362 ++++++++++++++
 src/memory/index.test.ts   |  42 ++
 src/memory/manager.ts      | 939 ++++++++++++++++++++++++++-----------
 src/memory/openai-batch.ts | 437 +----------------
 5 files changed, 1472 insertions(+), 715 deletions(-)
 create mode 100644 src/memory/batch-gemini.ts
 create mode 100644 src/memory/batch-openai.ts

diff --git a/src/memory/batch-gemini.ts b/src/memory/batch-gemini.ts
new file mode 100644
index 000000000..6a4ddb12e
--- /dev/null
+++ b/src/memory/batch-gemini.ts
@@ -0,0 +1,407 @@
+import { createSubsystemLogger } from "../logging.js";
+import type { GeminiEmbeddingClient } from "./embeddings-gemini.js";
+import { hashText } from "./internal.js";
+
+export type GeminiBatchRequest = {
+  custom_id: string;
+  content: { parts: Array<{ text: string }> };
+  taskType: "RETRIEVAL_DOCUMENT" | "RETRIEVAL_QUERY";
+};
+
+export type GeminiBatchStatus = {
+  name?: string;
+  state?: string;
+  outputConfig?: { file?: string; fileId?: string };
+  metadata?: {
+    output?: {
+      responsesFile?: string;
+    };
+  };
+  error?: { message?: string };
+};
+
+export type GeminiBatchOutputLine = {
+  key?: string;
+  custom_id?: string;
+  request_id?: string;
+  embedding?: { values?: number[] };
+  response?: {
+    embedding?: { values?: number[] };
+    error?: { message?: string };
+  };
+  error?: { message?: string };
+};
+
+const GEMINI_BATCH_MAX_REQUESTS = 50000;
+const debugEmbeddings = process.env.CLAWDBOT_DEBUG_MEMORY_EMBEDDINGS === "1";
+const log = createSubsystemLogger("memory/embeddings");
+
+const debugLog = (message: string, meta?: Record<string, unknown>) => {
+  if (!debugEmbeddings) return;
+  const suffix = meta ? ` ${JSON.stringify(meta)}` : "";
+  log.raw(`${message}${suffix}`);
+};
+
+function getGeminiBaseUrl(gemini: GeminiEmbeddingClient): string {
+  return gemini.baseUrl?.replace(/\/$/, "") ?? "";
+}
+
+function getGeminiHeaders(
+  gemini: GeminiEmbeddingClient,
+  params: { json: boolean },
+): Record<string, string> {
+  const headers = gemini.headers ? { ...gemini.headers } : {};
+  if (params.json) {
+    if (!headers["Content-Type"] && !headers["content-type"]) {
+      headers["Content-Type"] = "application/json";
+    }
+  } else {
+    delete headers["Content-Type"];
+    delete headers["content-type"];
+  }
+  return headers;
+}
+
+function getGeminiUploadUrl(baseUrl: string): string {
+  if (baseUrl.includes("/v1beta")) {
+    return baseUrl.replace(/\/v1beta\/?$/, "/upload/v1beta");
+  }
+  return `${baseUrl.replace(/\/$/, "")}/upload`;
+}
+
+function splitGeminiBatchRequests(requests: GeminiBatchRequest[]): GeminiBatchRequest[][] {
+  if (requests.length <= GEMINI_BATCH_MAX_REQUESTS) return [requests];
+  const groups: GeminiBatchRequest[][] = [];
+  for (let i = 0; i < requests.length; i += GEMINI_BATCH_MAX_REQUESTS) {
+    groups.push(requests.slice(i, i + GEMINI_BATCH_MAX_REQUESTS));
+  }
+  return groups;
+}
+
+function buildGeminiUploadBody(params: { jsonl: string; displayName: string }): {
+  body: Blob;
+  contentType: string;
+} {
+  const boundary = `clawdbot-${hashText(params.displayName)}`;
+  const jsonPart = JSON.stringify({
+    file: {
+      displayName: params.displayName,
+      mimeType: "application/jsonl",
+    },
+  });
+  const delimiter = `--${boundary}\r\n`;
+  const closeDelimiter = `--${boundary}--\r\n`;
+  const parts = [
+    `${delimiter}Content-Type: application/json; charset=UTF-8\r\n\r\n${jsonPart}\r\n`,
+    `${delimiter}Content-Type: application/jsonl; charset=UTF-8\r\n\r\n${params.jsonl}\r\n`,
+    closeDelimiter,
+  ];
+  const body = new Blob([parts.join("")], { type: "multipart/related" });
+  return {
+    body,
+    contentType: `multipart/related; boundary=${boundary}`,
+  };
+}
+
+async function submitGeminiBatch(params: {
+  gemini: GeminiEmbeddingClient;
+  requests: GeminiBatchRequest[];
+  agentId: string;
+}): Promise<GeminiBatchStatus> {
+  const baseUrl = getGeminiBaseUrl(params.gemini);
+  const jsonl = params.requests
+    .map((request) =>
+      JSON.stringify({
+        key: request.custom_id,
+        request: {
+          content: request.content,
+          task_type: request.taskType,
+        },
+      }),
+    )
+    .join("\n");
+  const displayName = `memory-embeddings-${hashText(String(Date.now()))}`;
+  const uploadPayload = buildGeminiUploadBody({ jsonl, displayName });
+
+  const uploadUrl = `${getGeminiUploadUrl(baseUrl)}/files?uploadType=multipart`;
+  debugLog("memory embeddings: gemini batch upload", {
+    uploadUrl,
+    baseUrl,
+    requests: params.requests.length,
+  });
+  const fileRes = await fetch(uploadUrl, {
+    method: "POST",
+    headers: {
+      ...getGeminiHeaders(params.gemini, { json: false }),
+      "Content-Type": uploadPayload.contentType,
+    },
+    body: uploadPayload.body,
+  });
+  if (!fileRes.ok) {
+    const text = await fileRes.text();
+    throw new Error(`gemini batch file upload failed: ${fileRes.status} ${text}`);
+  }
+  const filePayload = (await fileRes.json()) as { name?: string; file?: { name?: string } };
+  const fileId = filePayload.name ?? filePayload.file?.name;
+  if (!fileId) {
+    throw new Error("gemini batch file upload failed: missing file id");
+  }
+
+  const batchBody = {
+    batch: {
+      displayName: `memory-embeddings-${params.agentId}`,
+      inputConfig: {
+        file_name: fileId,
+      },
+    },
+  };
+
+  const batchEndpoint = `${baseUrl}/${params.gemini.modelPath}:asyncBatchEmbedContent`;
+  debugLog("memory embeddings: gemini batch create", {
+    batchEndpoint,
+    fileId,
+  });
+  const batchRes = await fetch(batchEndpoint, {
+    method: "POST",
+    headers: getGeminiHeaders(params.gemini, { json: true }),
+    body: JSON.stringify(batchBody),
+  });
+  if (batchRes.ok) {
+    return (await batchRes.json()) as GeminiBatchStatus;
+  }
+  const text = await batchRes.text();
+  if (batchRes.status === 404) {
+    throw new Error(
+      "gemini batch create failed: 404 (asyncBatchEmbedContent not available for this model/baseUrl). Disable remote.batch.enabled or switch providers.",
+    );
+  }
+  throw new Error(`gemini batch create failed: ${batchRes.status} ${text}`);
+}
+
+async function fetchGeminiBatchStatus(params: {
+  gemini: GeminiEmbeddingClient;
+  batchName: string;
+}): Promise<GeminiBatchStatus> {
+  const baseUrl = getGeminiBaseUrl(params.gemini);
+  const name = params.batchName.startsWith("batches/") ? params.batchName : `batches/${params.batchName}`;
+  const statusUrl = `${baseUrl}/${name}`;
+  debugLog("memory embeddings: gemini batch status", { statusUrl });
+  const res = await fetch(statusUrl, {
+    headers: getGeminiHeaders(params.gemini, { json: true }),
+  });
+  if (!res.ok) {
+    const text = await res.text();
+    throw new Error(`gemini batch status failed: ${res.status} ${text}`);
+  }
+  return (await res.json()) as GeminiBatchStatus;
+}
+
+async function fetchGeminiFileContent(params: {
+  gemini: GeminiEmbeddingClient;
+  fileId: string;
+}): Promise<string> {
+  const baseUrl = getGeminiBaseUrl(params.gemini);
+  const file = params.fileId.startsWith("files/") ? params.fileId : `files/${params.fileId}`;
+  const downloadUrl = `${baseUrl}/${file}:download`;
+  debugLog("memory embeddings: gemini batch download", { downloadUrl });
+  const res = await fetch(downloadUrl, {
+    headers: getGeminiHeaders(params.gemini, { json: true }),
+  });
+  if (!res.ok) {
+    const text = await res.text();
+    throw new Error(`gemini batch file content failed: ${res.status} ${text}`);
+  }
+  return await res.text();
+}
+
+function parseGeminiBatchOutput(text: string): GeminiBatchOutputLine[] {
+  if (!text.trim()) return [];
+  return text
+    .split("\n")
+    .map((line) => line.trim())
+    .filter(Boolean)
+    .map((line) => JSON.parse(line) as GeminiBatchOutputLine);
+}
+
+async function waitForGeminiBatch(params: {
+  gemini: GeminiEmbeddingClient;
+  batchName: string;
+  wait: boolean;
+  pollIntervalMs: number;
+  timeoutMs: number;
+  debug?: (message: string, data?: Record<string, unknown>) => void;
+  initial?: GeminiBatchStatus;
+}): Promise<{ outputFileId: string }> {
+  const start = Date.now();
+  let current: GeminiBatchStatus | undefined = params.initial;
+  while (true) {
+    const status =
+      current ??
+      (await fetchGeminiBatchStatus({
+        gemini: params.gemini,
+        batchName: params.batchName,
+      }));
+    const state = status.state ?? "UNKNOWN";
+    if (["SUCCEEDED", "COMPLETED", "DONE"].includes(state)) {
+      const outputFileId =
+        status.outputConfig?.file ??
+        status.outputConfig?.fileId ??
+        status.metadata?.output?.responsesFile;
+      if (!outputFileId) {
+        throw new Error(`gemini batch ${params.batchName} completed without output file`);
+      }
+      return { outputFileId };
+    }
+    if (["FAILED", "CANCELLED", "CANCELED", "EXPIRED"].includes(state)) {
+      const message = status.error?.message ?? "unknown error";
+      throw new Error(`gemini batch ${params.batchName} ${state}: ${message}`);
+    }
+    if (!params.wait) {
+      throw new Error(`gemini batch ${params.batchName} still ${state}; wait disabled`);
+    }
+    if (Date.now() - start > params.timeoutMs) {
+      throw new Error(`gemini batch ${params.batchName} timed out after ${params.timeoutMs}ms`);
+    }
+    params.debug?.(`gemini batch ${params.batchName} ${state}; waiting ${params.pollIntervalMs}ms`);
+    await new Promise((resolve) => setTimeout(resolve, params.pollIntervalMs));
+    current = undefined;
+  }
+}
+
+async function runWithConcurrency<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
+  if (tasks.length === 0) return [];
+  const resolvedLimit = Math.max(1, Math.min(limit, tasks.length));
+  const results: T[] = Array.from({ length: tasks.length });
+  let next = 0;
+  let firstError: unknown = null;
+
+  const workers = Array.from({ length: resolvedLimit }, async () => {
+    while (true) {
+      if (firstError) return;
+      const index = next;
+      next += 1;
+      if (index >= tasks.length) return;
+      try {
+        results[index] = await tasks[index]();
+      } catch (err) {
+        firstError = err;
+        return;
+      }
+    }
+  });
+
+  await Promise.allSettled(workers);
+  if (firstError) throw firstError;
+  return results;
+}
+
+export async function runGeminiEmbeddingBatches(params: {
+  gemini: GeminiEmbeddingClient;
+  agentId: string;
+  requests: GeminiBatchRequest[];
+  wait: boolean;
+  pollIntervalMs: number;
+  timeoutMs: number;
+  concurrency: number;
+  debug?: (message: string, data?: Record<string, unknown>) => void;
+}): Promise<Map<string, number[]>> {
+  if (params.requests.length === 0) return new Map();
+  const groups = splitGeminiBatchRequests(params.requests);
+  const byCustomId = new Map<string, number[]>();
+
+  const tasks = groups.map((group, groupIndex) => async () => {
+    const batchInfo = await submitGeminiBatch({
+      gemini: params.gemini,
+      requests: group,
+      agentId: params.agentId,
+    });
+    const batchName = batchInfo.name ?? "";
+    if (!batchName) {
+      throw new Error("gemini batch create failed: missing batch name");
+    }
+
+    params.debug?.("memory embeddings: gemini batch created", {
+      batchName,
+      state: batchInfo.state,
+      group: groupIndex + 1,
+      groups: groups.length,
+      requests: group.length,
+    });
+
+    if (!params.wait && batchInfo.state && !["SUCCEEDED", "COMPLETED", "DONE"].includes(batchInfo.state)) {
+      throw new Error(
+        `gemini batch ${batchName} submitted; enable remote.batch.wait to await completion`,
+      );
+    }
+
+    const completed =
+      batchInfo.state && ["SUCCEEDED", "COMPLETED", "DONE"].includes(batchInfo.state)
+        ? {
+            outputFileId:
+              batchInfo.outputConfig?.file ??
+              batchInfo.outputConfig?.fileId ??
+              batchInfo.metadata?.output?.responsesFile ??
+ "", + } + : await waitForGeminiBatch({ + gemini: params.gemini, + batchName, + wait: params.wait, + pollIntervalMs: params.pollIntervalMs, + timeoutMs: params.timeoutMs, + debug: params.debug, + initial: batchInfo, + }); + if (!completed.outputFileId) { + throw new Error(`gemini batch ${batchName} completed without output file`); + } + + const content = await fetchGeminiFileContent({ + gemini: params.gemini, + fileId: completed.outputFileId, + }); + const outputLines = parseGeminiBatchOutput(content); + const errors: string[] = []; + const remaining = new Set(group.map((request) => request.custom_id)); + + for (const line of outputLines) { + const customId = line.key ?? line.custom_id ?? line.request_id; + if (!customId) continue; + remaining.delete(customId); + if (line.error?.message) { + errors.push(`${customId}: ${line.error.message}`); + continue; + } + if (line.response?.error?.message) { + errors.push(`${customId}: ${line.response.error.message}`); + continue; + } + const embedding = + line.embedding?.values ?? line.response?.embedding?.values ?? []; + if (embedding.length === 0) { + errors.push(`${customId}: empty embedding`); + continue; + } + byCustomId.set(customId, embedding); + } + + if (errors.length > 0) { + throw new Error(`gemini batch ${batchName} failed: ${errors.join("; ")}`); + } + if (remaining.size > 0) { + throw new Error(`gemini batch ${batchName} missing ${remaining.size} embedding responses`); + } + }); + + params.debug?.("memory embeddings: gemini batch submit", { + requests: params.requests.length, + groups: groups.length, + wait: params.wait, + concurrency: params.concurrency, + pollIntervalMs: params.pollIntervalMs, + timeoutMs: params.timeoutMs, + }); + + await runWithConcurrency(tasks, params.concurrency); + return byCustomId; +} diff --git a/src/memory/batch-openai.ts b/src/memory/batch-openai.ts new file mode 100644 index 000000000..7d729832a --- /dev/null +++ b/src/memory/batch-openai.ts @@ -0,0 +1,362 @@ +import type { OpenAiEmbeddingClient } from "./embeddings-openai.js"; +import { hashText } from "./internal.js"; + +export type OpenAiBatchRequest = { + custom_id: string; + method: "POST"; + url: "/v1/embeddings"; + body: { + model: string; + input: string; + }; +}; + +export type OpenAiBatchStatus = { + id?: string; + status?: string; + output_file_id?: string | null; + error_file_id?: string | null; +}; + +export type OpenAiBatchOutputLine = { + custom_id?: string; + response?: { + status_code?: number; + body?: { + data?: Array<{ embedding?: number[]; index?: number }>; + error?: { message?: string }; + }; + }; + error?: { message?: string }; +}; + +export const OPENAI_BATCH_ENDPOINT = "/v1/embeddings"; +const OPENAI_BATCH_COMPLETION_WINDOW = "24h"; +const OPENAI_BATCH_MAX_REQUESTS = 50000; + +function getOpenAiBaseUrl(openAi: OpenAiEmbeddingClient): string { + return openAi.baseUrl?.replace(/\/$/, "") ?? ""; +} + +function getOpenAiHeaders( + openAi: OpenAiEmbeddingClient, + params: { json: boolean }, +): Record { + const headers = openAi.headers ? 
+  if (params.json) {
+    if (!headers["Content-Type"] && !headers["content-type"]) {
+      headers["Content-Type"] = "application/json";
+    }
+  } else {
+    delete headers["Content-Type"];
+    delete headers["content-type"];
+  }
+  return headers;
+}
+
+function splitOpenAiBatchRequests(requests: OpenAiBatchRequest[]): OpenAiBatchRequest[][] {
+  if (requests.length <= OPENAI_BATCH_MAX_REQUESTS) return [requests];
+  const groups: OpenAiBatchRequest[][] = [];
+  for (let i = 0; i < requests.length; i += OPENAI_BATCH_MAX_REQUESTS) {
+    groups.push(requests.slice(i, i + OPENAI_BATCH_MAX_REQUESTS));
+  }
+  return groups;
+}
+
+async function submitOpenAiBatch(params: {
+  openAi: OpenAiEmbeddingClient;
+  requests: OpenAiBatchRequest[];
+  agentId: string;
+}): Promise<OpenAiBatchStatus> {
+  const baseUrl = getOpenAiBaseUrl(params.openAi);
+  const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n");
+  const form = new FormData();
+  form.append("purpose", "batch");
+  form.append(
+    "file",
+    new Blob([jsonl], { type: "application/jsonl" }),
+    `memory-embeddings.${hashText(String(Date.now()))}.jsonl`,
+  );
+
+  const fileRes = await fetch(`${baseUrl}/files`, {
+    method: "POST",
+    headers: getOpenAiHeaders(params.openAi, { json: false }),
+    body: form,
+  });
+  if (!fileRes.ok) {
+    const text = await fileRes.text();
+    throw new Error(`openai batch file upload failed: ${fileRes.status} ${text}`);
+  }
+  const filePayload = (await fileRes.json()) as { id?: string };
+  if (!filePayload.id) {
+    throw new Error("openai batch file upload failed: missing file id");
+  }
+
+  const batchRes = await fetch(`${baseUrl}/batches`, {
+    method: "POST",
+    headers: getOpenAiHeaders(params.openAi, { json: true }),
+    body: JSON.stringify({
+      input_file_id: filePayload.id,
+      endpoint: OPENAI_BATCH_ENDPOINT,
+      completion_window: OPENAI_BATCH_COMPLETION_WINDOW,
+      metadata: {
+        source: "clawdbot-memory",
+        agent: params.agentId,
+      },
+    }),
+  });
+  if (!batchRes.ok) {
+    const text = await batchRes.text();
+    throw new Error(`openai batch create failed: ${batchRes.status} ${text}`);
+  }
+  return (await batchRes.json()) as OpenAiBatchStatus;
+}
+
+async function fetchOpenAiBatchStatus(params: {
+  openAi: OpenAiEmbeddingClient;
+  batchId: string;
+}): Promise<OpenAiBatchStatus> {
+  const baseUrl = getOpenAiBaseUrl(params.openAi);
+  const res = await fetch(`${baseUrl}/batches/${params.batchId}`, {
+    headers: getOpenAiHeaders(params.openAi, { json: true }),
+  });
+  if (!res.ok) {
+    const text = await res.text();
+    throw new Error(`openai batch status failed: ${res.status} ${text}`);
+  }
+  return (await res.json()) as OpenAiBatchStatus;
+}
+
+async function fetchOpenAiFileContent(params: {
+  openAi: OpenAiEmbeddingClient;
+  fileId: string;
+}): Promise<string> {
+  const baseUrl = getOpenAiBaseUrl(params.openAi);
+  const res = await fetch(`${baseUrl}/files/${params.fileId}/content`, {
+    headers: getOpenAiHeaders(params.openAi, { json: true }),
+  });
+  if (!res.ok) {
+    const text = await res.text();
+    throw new Error(`openai batch file content failed: ${res.status} ${text}`);
+  }
+  return await res.text();
+}
+
+function parseOpenAiBatchOutput(text: string): OpenAiBatchOutputLine[] {
+  if (!text.trim()) return [];
+  return text
+    .split("\n")
+    .map((line) => line.trim())
+    .filter(Boolean)
+    .map((line) => JSON.parse(line) as OpenAiBatchOutputLine);
+}
+
+async function readOpenAiBatchError(params: {
+  openAi: OpenAiEmbeddingClient;
+  errorFileId: string;
+}): Promise<string | undefined> {
+  try {
+    const content = await fetchOpenAiFileContent({
+      openAi: params.openAi,
+      fileId: params.errorFileId,
+    });
+    const lines = parseOpenAiBatchOutput(content);
+    const first = lines.find((line) => line.error?.message || line.response?.body?.error);
+    const message =
+      first?.error?.message ??
+      (typeof first?.response?.body?.error?.message === "string"
+        ? first?.response?.body?.error?.message
+        : undefined);
+    return message;
+  } catch (err) {
+    const message = err instanceof Error ? err.message : String(err);
+    return message ? `error file unavailable: ${message}` : undefined;
+  }
+}
+
+async function waitForOpenAiBatch(params: {
+  openAi: OpenAiEmbeddingClient;
+  batchId: string;
+  wait: boolean;
+  pollIntervalMs: number;
+  timeoutMs: number;
+  debug?: (message: string, data?: Record<string, unknown>) => void;
+  initial?: OpenAiBatchStatus;
+}): Promise<{ outputFileId: string; errorFileId?: string }> {
+  const start = Date.now();
+  let current: OpenAiBatchStatus | undefined = params.initial;
+  while (true) {
+    const status =
+      current ??
+      (await fetchOpenAiBatchStatus({
+        openAi: params.openAi,
+        batchId: params.batchId,
+      }));
+    const state = status.status ?? "unknown";
+    if (state === "completed") {
+      if (!status.output_file_id) {
+        throw new Error(`openai batch ${params.batchId} completed without output file`);
+      }
+      return {
+        outputFileId: status.output_file_id,
+        errorFileId: status.error_file_id ?? undefined,
+      };
+    }
+    if (["failed", "expired", "cancelled", "canceled"].includes(state)) {
+      const detail = status.error_file_id
+        ? await readOpenAiBatchError({ openAi: params.openAi, errorFileId: status.error_file_id })
+        : undefined;
+      const suffix = detail ? `: ${detail}` : "";
+      throw new Error(`openai batch ${params.batchId} ${state}${suffix}`);
+    }
+    if (!params.wait) {
+      throw new Error(`openai batch ${params.batchId} still ${state}; wait disabled`);
+    }
+    if (Date.now() - start > params.timeoutMs) {
+      throw new Error(`openai batch ${params.batchId} timed out after ${params.timeoutMs}ms`);
+    }
+    params.debug?.(`openai batch ${params.batchId} ${state}; waiting ${params.pollIntervalMs}ms`);
+    await new Promise((resolve) => setTimeout(resolve, params.pollIntervalMs));
+    current = undefined;
+  }
+}
+
+async function runWithConcurrency<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
+  if (tasks.length === 0) return [];
+  const resolvedLimit = Math.max(1, Math.min(limit, tasks.length));
+  const results: T[] = Array.from({ length: tasks.length });
+  let next = 0;
+  let firstError: unknown = null;
+
+  const workers = Array.from({ length: resolvedLimit }, async () => {
+    while (true) {
+      if (firstError) return;
+      const index = next;
+      next += 1;
+      if (index >= tasks.length) return;
+      try {
+        results[index] = await tasks[index]();
+      } catch (err) {
+        firstError = err;
+        return;
+      }
+    }
+  });
+
+  await Promise.allSettled(workers);
+  if (firstError) throw firstError;
+  return results;
+}
+
+export async function runOpenAiEmbeddingBatches(params: {
+  openAi: OpenAiEmbeddingClient;
+  agentId: string;
+  requests: OpenAiBatchRequest[];
+  wait: boolean;
+  pollIntervalMs: number;
+  timeoutMs: number;
+  concurrency: number;
+  debug?: (message: string, data?: Record<string, unknown>) => void;
+}): Promise<Map<string, number[]>> {
+  if (params.requests.length === 0) return new Map();
+  const groups = splitOpenAiBatchRequests(params.requests);
+  const byCustomId = new Map<string, number[]>();
+
+  const tasks = groups.map((group, groupIndex) => async () => {
+    const batchInfo = await submitOpenAiBatch({
+      openAi: params.openAi,
+      requests: group,
+      agentId: params.agentId,
+    });
+    if (!batchInfo.id) {
+      throw new Error("openai batch create failed: missing batch id");
+    }
Error("openai batch create failed: missing batch id"); + } + + params.debug?.("memory embeddings: openai batch created", { + batchId: batchInfo.id, + status: batchInfo.status, + group: groupIndex + 1, + groups: groups.length, + requests: group.length, + }); + + if (!params.wait && batchInfo.status !== "completed") { + throw new Error( + `openai batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`, + ); + } + + const completed = + batchInfo.status === "completed" + ? { + outputFileId: batchInfo.output_file_id ?? "", + errorFileId: batchInfo.error_file_id ?? undefined, + } + : await waitForOpenAiBatch({ + openAi: params.openAi, + batchId: batchInfo.id, + wait: params.wait, + pollIntervalMs: params.pollIntervalMs, + timeoutMs: params.timeoutMs, + debug: params.debug, + initial: batchInfo, + }); + if (!completed.outputFileId) { + throw new Error(`openai batch ${batchInfo.id} completed without output file`); + } + + const content = await fetchOpenAiFileContent({ + openAi: params.openAi, + fileId: completed.outputFileId, + }); + const outputLines = parseOpenAiBatchOutput(content); + const errors: string[] = []; + const remaining = new Set(group.map((request) => request.custom_id)); + + for (const line of outputLines) { + const customId = line.custom_id; + if (!customId) continue; + remaining.delete(customId); + if (line.error?.message) { + errors.push(`${customId}: ${line.error.message}`); + continue; + } + const response = line.response; + const statusCode = response?.status_code ?? 0; + if (statusCode >= 400) { + const message = + response?.body?.error?.message ?? + (typeof response?.body === "string" ? response.body : undefined) ?? + "unknown error"; + errors.push(`${customId}: ${message}`); + continue; + } + const data = response?.body?.data ?? []; + const embedding = data[0]?.embedding ?? 
+      if (embedding.length === 0) {
+        errors.push(`${customId}: empty embedding`);
+        continue;
+      }
+      byCustomId.set(customId, embedding);
+    }
+
+    if (errors.length > 0) {
+      throw new Error(`openai batch ${batchInfo.id} failed: ${errors.join("; ")}`);
+    }
+    if (remaining.size > 0) {
+      throw new Error(`openai batch ${batchInfo.id} missing ${remaining.size} embedding responses`);
+    }
+  });
+
+  params.debug?.("memory embeddings: openai batch submit", {
+    requests: params.requests.length,
+    groups: groups.length,
+    wait: params.wait,
+    concurrency: params.concurrency,
+    pollIntervalMs: params.pollIntervalMs,
+    timeoutMs: params.timeoutMs,
+  });
+
+  await runWithConcurrency(tasks, params.concurrency);
+  return byCustomId;
+}
diff --git a/src/memory/index.test.ts b/src/memory/index.test.ts
index 4c920838b..498cf7119 100644
--- a/src/memory/index.test.ts
+++ b/src/memory/index.test.ts
@@ -7,6 +7,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
 import { getMemorySearchManager, type MemoryIndexManager } from "./index.js";
 
 let embedBatchCalls = 0;
+let failEmbeddings = false;
 
 vi.mock("./embeddings.js", () => {
   const embedText = (text: string) => {
@@ -24,6 +25,9 @@ vi.mock("./embeddings.js", () => {
     embedQuery: async (text: string) => embedText(text),
     embedBatch: async (texts: string[]) => {
       embedBatchCalls += 1;
+      if (failEmbeddings) {
+        throw new Error("mock embeddings failed");
+      }
       return texts.map(embedText);
     },
   },
@@ -38,6 +42,7 @@ describe("memory index", () => {
 
   beforeEach(async () => {
     embedBatchCalls = 0;
+    failEmbeddings = false;
    workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-mem-"));
     indexPath = path.join(workspaceDir, "index.sqlite");
     await fs.mkdir(path.join(workspaceDir, "memory"));
@@ -181,6 +186,43 @@ describe("memory index", () => {
     expect(embedBatchCalls).toBe(afterFirst);
   });
 
+  it("preserves existing index when forced reindex fails", async () => {
+    const cfg = {
+      agents: {
+        defaults: {
+          workspace: workspaceDir,
+          memorySearch: {
+            provider: "openai",
+            model: "mock-embed",
+            store: { path: indexPath, vector: { enabled: false } },
+            sync: { watch: false, onSessionStart: false, onSearch: false },
+            query: { minScore: 0 },
+            cache: { enabled: false },
+          },
+        },
+        list: [{ id: "main", default: true }],
+      },
+    };
+    const result = await getMemorySearchManager({ cfg, agentId: "main" });
+    expect(result.manager).not.toBeNull();
+    if (!result.manager) throw new Error("manager missing");
+    manager = result.manager;
+
+    await manager.sync({ force: true });
+    const before = manager.status();
+    expect(before.files).toBeGreaterThan(0);
+
+    failEmbeddings = true;
+    await expect(manager.sync({ force: true })).rejects.toThrow(/mock embeddings failed/i);
+
+    const after = manager.status();
+    expect(after.files).toBe(before.files);
+    expect(after.chunks).toBe(before.chunks);
+
+    const files = await fs.readdir(workspaceDir);
+    expect(files.some((name) => name.includes(".tmp-"))).toBe(false);
+  });
+
   it("finds keyword matches via hybrid search when query embedding is zero", async () => {
     const cfg = {
       agents: {
diff --git a/src/memory/manager.ts b/src/memory/manager.ts
index 0994c7379..11c9a68b6 100644
--- a/src/memory/manager.ts
+++ b/src/memory/manager.ts
@@ -1,3 +1,4 @@
+import { randomUUID } from "node:crypto";
 import fs from "node:fs/promises";
 import path from "node:path";
 
@@ -19,16 +20,21 @@ import {
   type GeminiEmbeddingClient,
   type OpenAiEmbeddingClient,
 } from "./embeddings.js";
+import { DEFAULT_GEMINI_EMBEDDING_MODEL } from "./embeddings-gemini.js";
"./embeddings-gemini.js"; +import { DEFAULT_OPENAI_EMBEDDING_MODEL } from "./embeddings-openai.js"; import { OPENAI_BATCH_ENDPOINT, type OpenAiBatchRequest, runOpenAiEmbeddingBatches, -} from "./openai-batch.js"; +} from "./batch-openai.js"; +import { runGeminiEmbeddingBatches, type GeminiBatchRequest } from "./batch-gemini.js"; import { + buildFileEntry, chunkMarkdown, ensureDir, hashText, isMemoryPath, + listMemoryFiles, type MemoryChunk, type MemoryFileEntry, normalizeRelPath, @@ -37,13 +43,8 @@ import { import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js"; import { searchKeyword, searchVector } from "./manager-search.js"; import { ensureMemoryIndexSchema } from "./memory-schema.js"; -import { computeMemoryManagerCacheKey } from "./manager-cache-key.js"; -import { computeEmbeddingProviderKey } from "./provider-key.js"; import { requireNodeSqlite } from "./sqlite.js"; import { loadSqliteVecExtension } from "./sqlite-vec.js"; -import type { SessionFileEntry } from "./session-files.js"; -import { syncMemoryFiles } from "./sync-memory-files.js"; -import { syncSessionFiles } from "./sync-session-files.js"; type MemorySource = "memory" | "sessions"; @@ -65,6 +66,15 @@ type MemoryIndexMeta = { vectorDims?: number; }; +type SessionFileEntry = { + path: string; + absPath: string; + mtimeMs: number; + size: number; + hash: string; + content: string; +}; + type MemorySyncProgressUpdate = { completed: number; total: number; @@ -104,12 +114,13 @@ export class MemoryIndexManager { private readonly agentId: string; private readonly workspaceDir: string; private readonly settings: ResolvedMemorySearchConfig; - private readonly provider: EmbeddingProvider; - private readonly requestedProvider: "openai" | "gemini" | "local"; - private readonly fallbackReason?: string; - private readonly openAi?: OpenAiEmbeddingClient; - private readonly gemini?: GeminiEmbeddingClient; - private readonly batch: { + private provider: EmbeddingProvider; + private readonly requestedProvider: "openai" | "local" | "gemini" | "auto"; + private fallbackFrom?: "openai" | "local" | "gemini"; + private fallbackReason?: string; + private openAi?: OpenAiEmbeddingClient; + private gemini?: GeminiEmbeddingClient; + private batch: { enabled: boolean; wait: boolean; concurrency: number; @@ -118,7 +129,7 @@ export class MemoryIndexManager { }; private db: DatabaseSync; private readonly sources: Set; - private readonly providerKey: string; + private providerKey: string; private readonly cache: { enabled: boolean; maxEntries?: number }; private readonly vector: { enabled: boolean; @@ -144,7 +155,6 @@ export class MemoryIndexManager { private sessionsDirtyFiles = new Set(); private sessionWarm = new Set(); private syncing: Promise | null = null; - private readonly allowAtomicReindex: boolean; static async get(params: { cfg: ClawdbotConfig; @@ -154,7 +164,7 @@ export class MemoryIndexManager { const settings = resolveMemorySearchConfig(cfg, agentId); if (!settings) return null; const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId); - const key = computeMemoryManagerCacheKey({ agentId, workspaceDir, settings }); + const key = `${agentId}:${workspaceDir}:${JSON.stringify(settings)}`; const existing = INDEX_CACHE.get(key); if (existing) return existing; const providerResult = await createEmbeddingProvider({ @@ -185,7 +195,6 @@ export class MemoryIndexManager { workspaceDir: string; settings: ResolvedMemorySearchConfig; providerResult: EmbeddingProviderResult; - options?: { allowAtomicReindex?: boolean; 
   }) {
     this.cacheKey = params.cacheKey;
     this.cfg = params.cfg;
@@ -194,26 +203,13 @@
     this.settings = params.settings;
     this.provider = params.providerResult.provider;
     this.requestedProvider = params.providerResult.requestedProvider;
+    this.fallbackFrom = params.providerResult.fallbackFrom;
     this.fallbackReason = params.providerResult.fallbackReason;
     this.openAi = params.providerResult.openAi;
     this.gemini = params.providerResult.gemini;
-    this.allowAtomicReindex = params.options?.allowAtomicReindex ?? true;
     this.sources = new Set(params.settings.sources);
     this.db = this.openDatabase();
-    this.providerKey = computeEmbeddingProviderKey({
-      providerId: this.provider.id,
-      providerModel: this.provider.model,
-      openAi: this.openAi
-        ? { baseUrl: this.openAi.baseUrl, model: this.openAi.model, headers: this.openAi.headers }
-        : undefined,
-      gemini: this.gemini
-        ? {
-            baseUrl: this.gemini.baseUrl,
-            model: this.gemini.model,
-            headers: this.gemini.headers,
-          }
-        : undefined,
-    });
+    this.providerKey = this.computeProviderKey();
     this.cache = {
       enabled: params.settings.cache.enabled,
       maxEntries: params.settings.cache.maxEntries,
@@ -229,24 +225,14 @@
     if (meta?.vectorDims) {
       this.vector.dims = meta.vectorDims;
     }
-    const enableBackgroundSync = params.options?.enableBackgroundSync ?? true;
-    if (enableBackgroundSync) {
-      this.ensureWatcher();
-      this.ensureSessionListener();
-      this.ensureIntervalSync();
-    }
+    this.ensureWatcher();
+    this.ensureSessionListener();
+    this.ensureIntervalSync();
 
     this.dirty = this.sources.has("memory");
     if (this.sources.has("sessions")) {
       this.sessionsDirty = true;
     }
 
-    const batch = params.settings.remote?.batch;
-    this.batch = {
-      enabled: Boolean(batch?.enabled && this.openAi && this.provider.id === "openai"),
-      wait: batch?.wait ?? true,
-      concurrency: Math.max(1, batch?.concurrency ?? 2),
-      pollIntervalMs: batch?.pollIntervalMs ?? 2000,
-      timeoutMs: (batch?.timeoutMinutes ?? 60) * 60 * 1000,
-    };
+    this.batch = this.resolveBatchConfig();
   }
 
   async warmSession(sessionKey?: string): Promise<void> {
@@ -502,7 +488,9 @@
         available: this.fts.available,
         error: this.fts.loadError,
       },
-      fallback: this.fallbackReason ? { from: "local", reason: this.fallbackReason } : undefined,
+      fallback: this.fallbackReason
+        ? { from: this.fallbackFrom ?? "local", reason: this.fallbackReason }
+        : undefined,
       vector: {
         enabled: this.vector.enabled,
         available: this.vector.available ?? undefined,
@@ -624,12 +612,96 @@
   private openDatabase(): DatabaseSync {
     const dbPath = resolveUserPath(this.settings.store.path);
+    return this.openDatabaseAtPath(dbPath);
+  }
+
+  private openDatabaseAtPath(dbPath: string): DatabaseSync {
     const dir = path.dirname(dbPath);
     ensureDir(dir);
     const { DatabaseSync } = requireNodeSqlite();
     return new DatabaseSync(dbPath, { allowExtension: this.settings.store.vector.enabled });
   }
 
+  private seedEmbeddingCache(sourceDb: DatabaseSync): void {
+    if (!this.cache.enabled) return;
+    try {
+      const rows = sourceDb
+        .prepare(
+          `SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM ${EMBEDDING_CACHE_TABLE}`,
+        )
+        .all() as Array<{
+        provider: string;
+        model: string;
+        provider_key: string;
+        hash: string;
+        embedding: string;
+        dims: number | null;
+        updated_at: number;
+      }>;
+      if (!rows.length) return;
+      const insert = this.db.prepare(
+        `INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)
+         VALUES (?, ?, ?, ?, ?, ?, ?)
+         ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET
+           embedding=excluded.embedding,
+           dims=excluded.dims,
+           updated_at=excluded.updated_at`,
+      );
+      this.db.exec("BEGIN");
+      for (const row of rows) {
+        insert.run(
+          row.provider,
+          row.model,
+          row.provider_key,
+          row.hash,
+          row.embedding,
+          row.dims,
+          row.updated_at,
+        );
+      }
+      this.db.exec("COMMIT");
+    } catch (err) {
+      try {
+        this.db.exec("ROLLBACK");
+      } catch {}
+      throw err;
+    }
+  }
+
+  private async swapIndexFiles(targetPath: string, tempPath: string): Promise<void> {
+    const backupPath = `${targetPath}.backup-${randomUUID()}`;
+    await this.moveIndexFiles(targetPath, backupPath);
+    try {
+      await this.moveIndexFiles(tempPath, targetPath);
+    } catch (err) {
+      await this.moveIndexFiles(backupPath, targetPath);
+      throw err;
+    }
+    await this.removeIndexFiles(backupPath);
+  }
+
+  private async moveIndexFiles(sourceBase: string, targetBase: string): Promise<void> {
+    const suffixes = ["", "-wal", "-shm"];
+    for (const suffix of suffixes) {
+      const source = `${sourceBase}${suffix}`;
+      const target = `${targetBase}${suffix}`;
+      try {
+        await fs.rename(source, target);
+      } catch (err) {
+        if ((err as NodeJS.ErrnoException).code !== "ENOENT") {
+          throw err;
+        }
+      }
+    }
+  }
+
+  private async removeIndexFiles(basePath: string): Promise<void> {
+    const suffixes = ["", "-wal", "-shm"];
+    await Promise.all(
+      suffixes.map((suffix) => fs.rm(`${basePath}${suffix}`, { force: true })),
+    );
+  }
+
   private ensureSchema() {
     const result = ensureMemoryIndexSchema({
       db: this.db,
@@ -730,44 +802,170 @@
     needsFullReindex: boolean;
     progress?: MemorySyncProgressState;
   }) {
-    await syncMemoryFiles({
-      workspaceDir: this.workspaceDir,
-      db: this.db,
+    const files = await listMemoryFiles(this.workspaceDir);
+    const fileEntries = await Promise.all(
+      files.map(async (file) => buildFileEntry(file, this.workspaceDir)),
+    );
+    log.debug("memory sync: indexing memory files", {
+      files: fileEntries.length,
       needsFullReindex: params.needsFullReindex,
-      progress: params.progress,
-      batchEnabled: this.batch.enabled,
+      batch: this.batch.enabled,
       concurrency: this.getIndexConcurrency(),
-      runWithConcurrency: this.runWithConcurrency.bind(this),
-      indexFile: async (entry) => await this.indexFile(entry, { source: "memory" }),
-      vectorTable: VECTOR_TABLE,
-      ftsTable: FTS_TABLE,
-      ftsEnabled: this.fts.enabled,
-      ftsAvailable: this.fts.available,
-      model: this.provider.model,
     });
+    const activePaths = new Set(fileEntries.map((entry) => entry.path));
+    if (params.progress) {
+      params.progress.total += fileEntries.length;
+      params.progress.report({
+        completed: params.progress.completed,
+        total: params.progress.total,
+        label: this.batch.enabled ? "Indexing memory files (batch)…" : "Indexing memory files…",
+      });
+    }
+
+    const tasks = fileEntries.map((entry) => async () => {
+      const record = this.db
+        .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`)
+        .get(entry.path, "memory") as { hash: string } | undefined;
+      if (!params.needsFullReindex && record?.hash === entry.hash) {
+        if (params.progress) {
+          params.progress.completed += 1;
+          params.progress.report({
+            completed: params.progress.completed,
+            total: params.progress.total,
+          });
+        }
+        return;
+      }
+      await this.indexFile(entry, { source: "memory" });
+      if (params.progress) {
+        params.progress.completed += 1;
+        params.progress.report({
+          completed: params.progress.completed,
+          total: params.progress.total,
+        });
+      }
+    });
+    await this.runWithConcurrency(tasks, this.getIndexConcurrency());
+
+    const staleRows = this.db
+      .prepare(`SELECT path FROM files WHERE source = ?`)
+      .all("memory") as Array<{ path: string }>;
+    for (const stale of staleRows) {
+      if (activePaths.has(stale.path)) continue;
+      this.db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`).run(stale.path, "memory");
+      try {
+        this.db
+          .prepare(
+            `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
+          )
+          .run(stale.path, "memory");
+      } catch {}
+      this.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(stale.path, "memory");
+      if (this.fts.enabled && this.fts.available) {
+        try {
+          this.db
+            .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`)
+            .run(stale.path, "memory", this.provider.model);
+        } catch {}
+      }
+    }
   }
 
   private async syncSessionFiles(params: {
     needsFullReindex: boolean;
     progress?: MemorySyncProgressState;
   }) {
-    await syncSessionFiles({
-      agentId: this.agentId,
-      db: this.db,
-      needsFullReindex: params.needsFullReindex,
-      progress: params.progress,
-      batchEnabled: this.batch.enabled,
+    const files = await this.listSessionFiles();
+    const activePaths = new Set(files.map((file) => this.sessionPathForFile(file)));
+    const indexAll = params.needsFullReindex || this.sessionsDirtyFiles.size === 0;
+    log.debug("memory sync: indexing session files", {
+      files: files.length,
+      indexAll,
+      dirtyFiles: this.sessionsDirtyFiles.size,
+      batch: this.batch.enabled,
       concurrency: this.getIndexConcurrency(),
-      runWithConcurrency: this.runWithConcurrency.bind(this),
-      indexFile: async (entry) =>
-        await this.indexFile(entry, { source: "sessions", content: entry.content }),
-      vectorTable: VECTOR_TABLE,
-      ftsTable: FTS_TABLE,
-      ftsEnabled: this.fts.enabled,
-      ftsAvailable: this.fts.available,
-      model: this.provider.model,
-      dirtyFiles: this.sessionsDirtyFiles,
     });
+    if (params.progress) {
+      params.progress.total += files.length;
+      params.progress.report({
+        completed: params.progress.completed,
+        total: params.progress.total,
: "Indexing session files…", + }); + } + + const tasks = files.map((absPath) => async () => { + if (!indexAll && !this.sessionsDirtyFiles.has(absPath)) { + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + return; + } + const entry = await this.buildSessionEntry(absPath); + if (!entry) { + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + return; + } + const record = this.db + .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) + .get(entry.path, "sessions") as { hash: string } | undefined; + if (!params.needsFullReindex && record?.hash === entry.hash) { + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + return; + } + await this.indexFile(entry, { source: "sessions", content: entry.content }); + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + }); + await this.runWithConcurrency(tasks, this.getIndexConcurrency()); + + const staleRows = this.db + .prepare(`SELECT path FROM files WHERE source = ?`) + .all("sessions") as Array<{ path: string }>; + for (const stale of staleRows) { + if (activePaths.has(stale.path)) continue; + this.db + .prepare(`DELETE FROM files WHERE path = ? AND source = ?`) + .run(stale.path, "sessions"); + try { + this.db + .prepare( + `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, + ) + .run(stale.path, "sessions"); + } catch {} + this.db + .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) + .run(stale.path, "sessions"); + if (this.fts.enabled && this.fts.available) { + try { + this.db + .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) + .run(stale.path, "sessions", this.provider.model); + } catch {} + } + } } private createSyncProgress( @@ -798,7 +996,7 @@ export class MemoryIndexManager { force?: boolean; progress?: (update: MemorySyncProgressUpdate) => void; }) { - const progressCallback = params?.progress; + const progress = params?.progress ? this.createSyncProgress(params.progress) : undefined; const vectorReady = await this.ensureVectorReady(); const meta = this.readMeta(); const needsFullReindex = @@ -810,228 +1008,209 @@ export class MemoryIndexManager { meta.chunkTokens !== this.settings.chunking.tokens || meta.chunkOverlap !== this.settings.chunking.overlap || (vectorReady && !meta?.vectorDims); - if (needsFullReindex && this.allowAtomicReindex) { - await this.runAtomicReindex({ reason: params?.reason, progress: progressCallback }); - return; - } - - const progress = progressCallback ? this.createSyncProgress(progressCallback) : undefined; - if (needsFullReindex) { - this.resetIndex(); - } - - const shouldSyncMemory = - this.sources.has("memory") && (params?.force || needsFullReindex || this.dirty); - const shouldSyncSessions = this.shouldSyncSessions(params, needsFullReindex); - - if (shouldSyncMemory) { - await this.syncMemoryFiles({ needsFullReindex, progress: progress ?? undefined }); - this.dirty = false; - } - - if (shouldSyncSessions) { - await this.syncSessionFiles({ needsFullReindex, progress: progress ?? 
-      await this.syncSessionFiles({ needsFullReindex, progress: progress ?? undefined });
-      this.sessionsDirty = false;
-      this.sessionsDirtyFiles.clear();
-    } else if (needsFullReindex && this.sources.has("sessions")) {
-      this.sessionsDirty = true;
-    }
-
-    const nextMeta: MemoryIndexMeta = {
-      model: this.provider.model,
-      provider: this.provider.id,
-      providerKey: this.providerKey,
-      chunkTokens: this.settings.chunking.tokens,
-      chunkOverlap: this.settings.chunking.overlap,
-    };
-    if (this.vector.available && this.vector.dims) {
-      nextMeta.vectorDims = this.vector.dims;
-    }
-    if (shouldSyncMemory || shouldSyncSessions || needsFullReindex) {
-      this.writeMeta(nextMeta);
-    }
-    if (shouldSyncMemory || shouldSyncSessions || needsFullReindex) {
-      this.pruneEmbeddingCacheIfNeeded();
-    }
-  }
-
-  private createScratchManager(tempPath: string): MemoryIndexManager {
-    const scratchSettings: ResolvedMemorySearchConfig = {
-      ...this.settings,
-      store: {
-        ...this.settings.store,
-        path: tempPath,
-      },
-      sync: {
-        ...this.settings.sync,
-        watch: false,
-        intervalMinutes: 0,
-      },
-    };
-
-    return new MemoryIndexManager({
-      cacheKey: `${this.cacheKey}:scratch:${Date.now()}`,
-      cfg: this.cfg,
-      agentId: this.agentId,
-      workspaceDir: this.workspaceDir,
-      settings: scratchSettings,
-      providerResult: {
-        provider: this.provider,
-        requestedProvider: this.requestedProvider,
-        fallbackReason: this.fallbackReason,
-        openAi: this.openAi,
-        gemini: this.gemini,
-      },
-      options: {
-        allowAtomicReindex: false,
-        enableBackgroundSync: false,
-      },
-    });
-  }
-
-  private buildTempIndexPath(): string {
-    const basePath = resolveUserPath(this.settings.store.path);
-    const dir = path.dirname(basePath);
-    ensureDir(dir);
-    const stamp = `${Date.now()}-${Math.random().toString(16).slice(2, 10)}`;
-    return path.join(dir, `${path.basename(basePath)}.tmp-${stamp}`);
-  }
-
-  private seedEmbeddingCacheFrom(source: DatabaseSync): void {
-    if (!this.cache.enabled) return;
     try {
-      const insert = this.db.prepare(
-        `INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)\n` +
-          `  VALUES (?, ?, ?, ?, ?, ?, ?)\n` +
-          `  ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET\n` +
-          `    embedding=excluded.embedding,\n` +
-          `    dims=excluded.dims,\n` +
-          `    updated_at=excluded.updated_at`,
-      );
-      const select = source.prepare(
-        `SELECT rowid, provider, model, provider_key, hash, embedding, dims, updated_at\n` +
-          `  FROM ${EMBEDDING_CACHE_TABLE}\n` +
-          `  WHERE provider = ? AND model = ? AND provider_key = ? AND rowid > ?\n` +
-          `  ORDER BY rowid\n` +
-          `  LIMIT ?`,
-      );
-      const batchSize = 500;
-      let lastRowId = 0;
-      while (true) {
-        const rows = select.all(
-          this.provider.id,
-          this.provider.model,
-          this.providerKey,
-          lastRowId,
-          batchSize,
-        ) as Array<{
-          rowid: number;
-          provider: string;
-          model: string;
-          provider_key: string;
-          hash: string;
-          embedding: string;
-          dims: number | null;
-          updated_at: number;
-        }>;
-        if (rows.length === 0) break;
-        for (const row of rows) {
-          insert.run(
-            row.provider,
-            row.model,
-            row.provider_key,
-            row.hash,
-            row.embedding,
-            row.dims,
-            row.updated_at,
-          );
-          lastRowId = row.rowid;
-        }
+      if (needsFullReindex) {
+        await this.runSafeReindex({
+          reason: params?.reason,
+          force: params?.force,
+          progress: progress ?? undefined,
+        });
+        return;
       }
-    } catch {
-      // Swallow cache seed errors to avoid blocking indexing.
+
+      const shouldSyncMemory =
+        this.sources.has("memory") && (params?.force || needsFullReindex || this.dirty);
+      const shouldSyncSessions = this.shouldSyncSessions(params, needsFullReindex);
+
+      if (shouldSyncMemory) {
+        await this.syncMemoryFiles({ needsFullReindex, progress: progress ?? undefined });
+        this.dirty = false;
+      }
+
+      if (shouldSyncSessions) {
+        await this.syncSessionFiles({ needsFullReindex, progress: progress ?? undefined });
+        this.sessionsDirty = false;
+        this.sessionsDirtyFiles.clear();
+      } else if (this.sources.has("sessions")) {
+        this.sessionsDirty = true;
+      }
+    } catch (err) {
+      const reason = err instanceof Error ? err.message : String(err);
+      const activated =
+        this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason));
+      if (activated) {
+        await this.runSafeReindex({
+          reason: params?.reason ?? "fallback",
+          force: true,
+          progress: progress ?? undefined,
+        });
+        return;
+      }
+      throw err;
     }
   }
 
-  private reopenDatabase() {
-    this.db = this.openDatabase();
+  private shouldFallbackOnError(message: string): boolean {
+    return /embedding|embeddings|batch/i.test(message);
+  }
+
+  private resolveBatchConfig(): {
+    enabled: boolean;
+    wait: boolean;
+    concurrency: number;
+    pollIntervalMs: number;
+    timeoutMs: number;
+  } {
+    const batch = this.settings.remote?.batch;
+    const enabled = Boolean(
+      batch?.enabled &&
+        ((this.openAi && this.provider.id === "openai") ||
+          (this.gemini && this.provider.id === "gemini")),
+    );
+    return {
+      enabled,
+      wait: batch?.wait ?? true,
+      concurrency: Math.max(1, batch?.concurrency ?? 2),
+      pollIntervalMs: batch?.pollIntervalMs ?? 2000,
+      timeoutMs: (batch?.timeoutMinutes ?? 60) * 60 * 1000,
+    };
+  }
+
+  private async activateFallbackProvider(reason: string): Promise<boolean> {
+    const fallback = this.settings.fallback;
+    if (!fallback || fallback === "none" || fallback === this.provider.id) return false;
+    if (this.fallbackFrom) return false;
+
+    const fallbackModel =
+      fallback === "gemini"
+        ? DEFAULT_GEMINI_EMBEDDING_MODEL
+        : fallback === "openai"
+          ? DEFAULT_OPENAI_EMBEDDING_MODEL
+          : this.settings.model;
+
+    const fallbackResult = await createEmbeddingProvider({
+      config: this.cfg,
+      agentDir: resolveAgentDir(this.cfg, this.agentId),
+      provider: fallback,
+      remote: this.settings.remote,
+      model: fallbackModel,
+      fallback: "none",
+      local: this.settings.local,
+    });
+
+    this.fallbackFrom = this.requestedProvider;
+    this.fallbackReason = reason;
+    this.provider = fallbackResult.provider;
+    this.openAi = fallbackResult.openAi;
+    this.gemini = fallbackResult.gemini;
+    this.providerKey = this.computeProviderKey();
+    this.batch = this.resolveBatchConfig();
+    log.warn(`memory embeddings: switched to fallback provider (${fallback})`, { reason });
+    return true;
+  }
+
+  private async runSafeReindex(params: {
+    reason?: string;
+    force?: boolean;
+    progress?: MemorySyncProgressState;
+  }): Promise<void> {
+    const dbPath = resolveUserPath(this.settings.store.path);
+    const tempDbPath = `${dbPath}.tmp-${randomUUID()}`;
+    const tempDb = this.openDatabaseAtPath(tempDbPath);
+
+    const originalDb = this.db;
+    let originalDbClosed = false;
+    const originalState = {
+      ftsAvailable: this.fts.available,
+      ftsError: this.fts.loadError,
+      vectorAvailable: this.vector.available,
+      vectorLoadError: this.vector.loadError,
+      vectorDims: this.vector.dims,
+      vectorReady: this.vectorReady,
+    };
+
+    const restoreOriginalState = () => {
+      if (originalDbClosed) {
+        this.db = this.openDatabaseAtPath(dbPath);
+      } else {
+        this.db = originalDb;
+      }
+      this.fts.available = originalState.ftsAvailable;
+      this.fts.loadError = originalState.ftsError;
+      this.vector.available = originalDbClosed ? null : originalState.vectorAvailable;
+      this.vector.loadError = originalState.vectorLoadError;
+      this.vector.dims = originalState.vectorDims;
+      this.vectorReady = originalDbClosed ? null : originalState.vectorReady;
+    };
+
+    this.db = tempDb;
+    this.vectorReady = null;
+    this.vector.available = null;
+    this.vector.loadError = undefined;
+    this.vector.dims = undefined;
     this.fts.available = false;
     this.fts.loadError = undefined;
     this.ensureSchema();
-    this.vector.available = null;
-    this.vector.loadError = undefined;
-    this.vectorReady = null;
-    this.vector.dims = undefined;
-    const meta = this.readMeta();
-    if (meta?.vectorDims) {
-      this.vector.dims = meta.vectorDims;
-    }
-  }
 
-  private async swapIndexFile(tempPath: string): Promise<void> {
-    const dbPath = resolveUserPath(this.settings.store.path);
-    const backupPath = `${dbPath}.bak-${Date.now()}`;
-    let hasBackup = false;
-    let shouldReopen = false;
-
-    this.db.close();
+    let nextMeta: MemoryIndexMeta | null = null;
     try {
+      this.seedEmbeddingCache(originalDb);
+      const shouldSyncMemory = this.sources.has("memory");
+      const shouldSyncSessions = this.shouldSyncSessions(
+        { reason: params.reason, force: params.force },
+        true,
+      );
+
+      if (shouldSyncMemory) {
+        await this.syncMemoryFiles({ needsFullReindex: true, progress: params.progress });
+        this.dirty = false;
+      }
+
+      if (shouldSyncSessions) {
+        await this.syncSessionFiles({ needsFullReindex: true, progress: params.progress });
+        this.sessionsDirty = false;
+        this.sessionsDirtyFiles.clear();
+      } else if (this.sources.has("sessions")) {
+        this.sessionsDirty = true;
+      }
+
+      nextMeta = {
+        model: this.provider.model,
+        provider: this.provider.id,
+        providerKey: this.providerKey,
+        chunkTokens: this.settings.chunking.tokens,
+        chunkOverlap: this.settings.chunking.overlap,
+      };
+      if (this.vector.available && this.vector.dims) {
+        nextMeta.vectorDims = this.vector.dims;
+      }
+
+      this.writeMeta(nextMeta);
+      this.pruneEmbeddingCacheIfNeeded();
+
+      this.db.close();
+      originalDb.close();
+      originalDbClosed = true;
+
+      await this.swapIndexFiles(dbPath, tempDbPath);
+
+      this.db = this.openDatabaseAtPath(dbPath);
+      this.vectorReady = null;
+      this.vector.available = null;
+      this.vector.loadError = undefined;
+      this.ensureSchema();
+      this.vector.dims = nextMeta.vectorDims;
+    } catch (err) {
       try {
-        await fs.rename(dbPath, backupPath);
-        hasBackup = true;
-      } catch (err) {
-        const code = (err as NodeJS.ErrnoException).code;
-        if (code !== "ENOENT") throw err;
-      }
-      await fs.rename(tempPath, dbPath);
-      shouldReopen = true;
-      if (hasBackup) {
-        await fs.rm(backupPath, { force: true });
-      }
-    } catch (err) {
-      if (hasBackup) {
-        try {
-          await fs.rename(backupPath, dbPath);
-          shouldReopen = true;
-        } catch {}
-      }
-      if (!shouldReopen) {
-        try {
-          await fs.access(dbPath);
-          shouldReopen = true;
-        } catch {}
-      }
+        this.db.close();
+      } catch {}
+      await this.removeIndexFiles(tempDbPath);
+      restoreOriginalState();
       throw err;
-    } finally {
-      await fs.rm(tempPath, { force: true });
-      if (shouldReopen) {
-        this.reopenDatabase();
-      }
     }
   }
 
-  private async runAtomicReindex(params: {
-    reason?: string;
-    progress?: (update: MemorySyncProgressUpdate) => void;
-  }) {
-    const tempPath = this.buildTempIndexPath();
-    const scratch = this.createScratchManager(tempPath);
-    try {
-      scratch.seedEmbeddingCacheFrom(this.db);
-      await scratch.sync({ reason: params.reason, force: true, progress: params.progress });
-    } catch (err) {
-      await fs.rm(tempPath, { force: true });
-      throw err;
-    } finally {
-      await scratch.close().catch(() => undefined);
-    }
-    await this.swapIndexFile(tempPath);
-    this.dirty = false;
-    this.sessionsDirty = false;
-    this.sessionsDirtyFiles.clear();
-  }
-
   private resetIndex() {
     this.db.exec(`DELETE FROM files`);
     this.db.exec(`DELETE FROM chunks`);
@@ -1066,6 +1245,95 @@
       .run(META_KEY, value);
   }
 
+  private async listSessionFiles(): Promise<string[]> {
+    const dir = resolveSessionTranscriptsDirForAgent(this.agentId);
+    try {
+      const entries = await fs.readdir(dir, { withFileTypes: true });
+      return entries
+        .filter((entry) => entry.isFile())
+        .map((entry) => entry.name)
+        .filter((name) => name.endsWith(".jsonl"))
+        .map((name) => path.join(dir, name));
+    } catch {
+      return [];
+    }
+  }
+
+  private sessionPathForFile(absPath: string): string {
+    return path.join("sessions", path.basename(absPath)).replace(/\\/g, "/");
+  }
+
+  private normalizeSessionText(value: string): string {
+    return value
+      .replace(/\s*\n+\s*/g, " ")
+      .replace(/\s+/g, " ")
+      .trim();
+  }
+
+  private extractSessionText(content: unknown): string | null {
+    if (typeof content === "string") {
+      const normalized = this.normalizeSessionText(content);
+      return normalized ? normalized : null;
+    }
+    if (!Array.isArray(content)) return null;
+    const parts: string[] = [];
+    for (const block of content) {
+      if (!block || typeof block !== "object") continue;
+      const record = block as { type?: unknown; text?: unknown };
+      if (record.type !== "text" || typeof record.text !== "string") continue;
+      const normalized = this.normalizeSessionText(record.text);
+      if (normalized) parts.push(normalized);
+    }
+    if (parts.length === 0) return null;
+    return parts.join(" ");
+  }
+
+  private async buildSessionEntry(absPath: string): Promise<SessionFileEntry | null> {
+    try {
+      const stat = await fs.stat(absPath);
+      const raw = await fs.readFile(absPath, "utf-8");
+      const lines = raw.split("\n");
+      const collected: string[] = [];
+      for (const line of lines) {
+        if (!line.trim()) continue;
+        let record: unknown;
+        try {
+          record = JSON.parse(line);
+        } catch {
+          continue;
+        }
+        if (
+          !record ||
+          typeof record !== "object" ||
+          (record as { type?: unknown }).type !== "message"
+        ) {
+          continue;
+        }
+        const message = (record as { message?: unknown }).message as
+          | { role?: unknown; content?: unknown }
+          | undefined;
+        if (!message || typeof message.role !== "string") continue;
+        if (message.role !== "user" && message.role !== "assistant") continue;
+        const text = this.extractSessionText(message.content);
+        if (!text) continue;
"User" : "Assistant"; + collected.push(`${label}: ${text}`); + } + const content = collected.join("\n"); + return { + path: this.sessionPathForFile(absPath), + absPath, + mtimeMs: stat.mtimeMs, + size: stat.size, + hash: hashText(content), + content, + }; + } catch (err) { + log.debug(`Failed reading session file ${absPath}: ${String(err)}`); + return null; + } + } + private estimateEmbeddingTokens(text: string): number { if (!text) return 0; return Math.ceil(text.length / EMBEDDING_APPROX_CHARS_PER_TOKEN); @@ -1217,10 +1485,59 @@ export class MemoryIndexManager { return embeddings; } + private computeProviderKey(): string { + if (this.provider.id === "openai" && this.openAi) { + const entries = Object.entries(this.openAi.headers) + .filter(([key]) => key.toLowerCase() !== "authorization") + .sort(([a], [b]) => a.localeCompare(b)) + .map(([key, value]) => [key, value]); + return hashText( + JSON.stringify({ + provider: "openai", + baseUrl: this.openAi.baseUrl, + model: this.openAi.model, + headers: entries, + }), + ); + } + if (this.provider.id === "gemini" && this.gemini) { + const entries = Object.entries(this.gemini.headers) + .filter(([key]) => { + const lower = key.toLowerCase(); + return lower !== "authorization" && lower !== "x-goog-api-key"; + }) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([key, value]) => [key, value]); + return hashText( + JSON.stringify({ + provider: "gemini", + baseUrl: this.gemini.baseUrl, + model: this.gemini.model, + headers: entries, + }), + ); + } + return hashText(JSON.stringify({ provider: this.provider.id, model: this.provider.model })); + } + private async embedChunksWithBatch( chunks: MemoryChunk[], entry: MemoryFileEntry | SessionFileEntry, source: MemorySource, + ): Promise { + if (this.provider.id === "openai" && this.openAi) { + return this.embedChunksWithOpenAiBatch(chunks, entry, source); + } + if (this.provider.id === "gemini" && this.gemini) { + return this.embedChunksWithGeminiBatch(chunks, entry, source); + } + return this.embedChunksInBatches(chunks); + } + + private async embedChunksWithOpenAiBatch( + chunks: MemoryChunk[], + entry: MemoryFileEntry | SessionFileEntry, + source: MemorySource, ): Promise { if (!this.openAi) { return this.embedChunksInBatches(chunks); @@ -1282,6 +1599,68 @@ export class MemoryIndexManager { return embeddings; } + private async embedChunksWithGeminiBatch( + chunks: MemoryChunk[], + entry: MemoryFileEntry | SessionFileEntry, + source: MemorySource, + ): Promise { + if (!this.gemini) { + return this.embedChunksInBatches(chunks); + } + if (chunks.length === 0) return []; + const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); + const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); + const missing: Array<{ index: number; chunk: MemoryChunk }> = []; + + for (let i = 0; i < chunks.length; i += 1) { + const chunk = chunks[i]; + const hit = chunk?.hash ? 
+  private async embedChunksWithGeminiBatch(
+    chunks: MemoryChunk[],
+    entry: MemoryFileEntry | SessionFileEntry,
+    source: MemorySource,
+  ): Promise<number[][]> {
+    if (!this.gemini) {
+      return this.embedChunksInBatches(chunks);
+    }
+    if (chunks.length === 0) return [];
+    const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
+    const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
+    const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
+
+    for (let i = 0; i < chunks.length; i += 1) {
+      const chunk = chunks[i];
+      const hit = chunk?.hash ? cached.get(chunk.hash) : undefined;
+      if (hit && hit.length > 0) {
+        embeddings[i] = hit;
+      } else if (chunk) {
+        missing.push({ index: i, chunk });
+      }
+    }
+
+    if (missing.length === 0) return embeddings;
+
+    const requests: GeminiBatchRequest[] = [];
+    const mapping = new Map<string, { index: number; hash: string }>();
+    for (const item of missing) {
+      const chunk = item.chunk;
+      const customId = hashText(
+        `${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`,
+      );
+      mapping.set(customId, { index: item.index, hash: chunk.hash });
+      requests.push({
+        custom_id: customId,
+        content: { parts: [{ text: chunk.text }] },
+        taskType: "RETRIEVAL_DOCUMENT",
+      });
+    }
+
+    const byCustomId = await runGeminiEmbeddingBatches({
+      gemini: this.gemini,
+      agentId: this.agentId,
+      requests,
+      wait: this.batch.wait,
+      concurrency: this.batch.concurrency,
+      pollIntervalMs: this.batch.pollIntervalMs,
+      timeoutMs: this.batch.timeoutMs,
+      debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }),
+    });
+
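+    // Map each returned embedding back to its chunk position via the custom
+    // id, and persist the fresh vectors so later runs hit the cache.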
+    const toCache: Array<{ hash: string; embedding: number[] }> = [];
+    for (const [customId, embedding] of byCustomId.entries()) {
+      const mapped = mapping.get(customId);
+      if (!mapped) continue;
+      embeddings[mapped.index] = embedding;
+      toCache.push({ hash: mapped.hash, embedding });
+    }
+    this.upsertEmbeddingCache(toCache);
+    return embeddings;
+  }
+
   private async embedBatchWithRetry(texts: string[]): Promise<number[][]> {
     if (texts.length === 0) return [];
     let attempt = 0;
diff --git a/src/memory/openai-batch.ts b/src/memory/openai-batch.ts
index 69851f810..b828b7df9 100644
--- a/src/memory/openai-batch.ts
+++ b/src/memory/openai-batch.ts
@@ -1,435 +1,2 @@
-import { retryAsync } from "../infra/retry.js";
-import type { OpenAiEmbeddingClient } from "./embeddings.js";
-import { hashText } from "./internal.js";
-
-export type OpenAiBatchRequest = {
-  custom_id: string;
-  method: "POST";
-  url: "/v1/embeddings";
-  body: {
-    model: string;
-    input: string;
-  };
-};
-
-export type OpenAiBatchStatus = {
-  id?: string;
-  status?: string;
-  output_file_id?: string | null;
-  error_file_id?: string | null;
-};
-
-export type OpenAiBatchOutputLine = {
-  custom_id?: string;
-  response?: {
-    status_code?: number;
-    body?: {
-      data?: Array<{ embedding?: number[]; index?: number }>;
-      error?: { message?: string };
-    };
-  };
-  error?: { message?: string };
-};
-
-export const OPENAI_BATCH_ENDPOINT = "/v1/embeddings";
-const OPENAI_BATCH_COMPLETION_WINDOW = "24h";
-const OPENAI_BATCH_MAX_REQUESTS = 50000;
-const OPENAI_BATCH_RETRY = {
-  attempts: 3,
-  minDelayMs: 500,
-  maxDelayMs: 5000,
-  jitter: 0.1,
-};
-
-type RetryableError = Error & { status?: number };
-
-function isRetryableBatchError(err: unknown): boolean {
-  const status =
-    typeof (err as RetryableError)?.status === "number" ? (err as RetryableError).status : undefined;
-  if (typeof status === "number") {
-    return status === 429 || status >= 500;
-  }
-  const message = err instanceof Error ? err.message : String(err);
-  return /timeout|timed out|ECONNRESET|ECONNREFUSED|EHOSTUNREACH|ENOTFOUND|EAI_AGAIN|network|fetch failed|upstream connect/i.test(
-    message,
-  );
-}
-
-function formatRetryError(err: unknown): string {
-  return err instanceof Error ? err.message : String(err);
-}
-
-function getOpenAiBaseUrl(openAi: OpenAiEmbeddingClient): string {
-  return openAi.baseUrl?.replace(/\/$/, "") ?? "";
-}
-
-function getOpenAiHeaders(
-  openAi: OpenAiEmbeddingClient,
-  params: { json: boolean },
-): Record<string, string> {
-  const headers = openAi.headers ? { ...openAi.headers } : {};
-  if (params.json) {
-    if (!headers["Content-Type"] && !headers["content-type"]) {
-      headers["Content-Type"] = "application/json";
-    }
-  } else {
-    delete headers["Content-Type"];
-    delete headers["content-type"];
-  }
-  return headers;
-}
-
-function splitOpenAiBatchRequests(requests: OpenAiBatchRequest[]): OpenAiBatchRequest[][] {
-  if (requests.length <= OPENAI_BATCH_MAX_REQUESTS) return [requests];
-  const groups: OpenAiBatchRequest[][] = [];
-  for (let i = 0; i < requests.length; i += OPENAI_BATCH_MAX_REQUESTS) {
-    groups.push(requests.slice(i, i + OPENAI_BATCH_MAX_REQUESTS));
-  }
-  return groups;
-}
-
-async function fetchOpenAiWithRetry(params: {
-  openAi: OpenAiEmbeddingClient;
-  url: string;
-  init?: RequestInit;
-  label: string;
-  debug?: (message: string, data?: Record<string, unknown>) => void;
-}): Promise<Response> {
-  return await retryAsync(
-    async () => {
-      const res = await fetch(params.url, params.init);
-      if (!res.ok) {
-        const text = await res.text();
-        const err = new Error(`openai batch ${params.label} failed: ${res.status} ${text}`);
-        (err as RetryableError).status = res.status;
-        throw err;
-      }
-      return res;
-    },
-    {
-      ...OPENAI_BATCH_RETRY,
-      label: params.label,
-      shouldRetry: isRetryableBatchError,
-      onRetry: (info) => {
-        params.debug?.(
-          `openai batch ${params.label} retry ${info.attempt}/${info.maxAttempts} in ${info.delayMs}ms`,
-          { error: formatRetryError(info.err) },
-        );
-      },
-    },
-  );
-}
-
-async function submitOpenAiBatch(params: {
-  openAi: OpenAiEmbeddingClient;
-  requests: OpenAiBatchRequest[];
-  agentId: string;
-  debug?: (message: string, data?: Record<string, unknown>) => void;
-}): Promise<OpenAiBatchStatus> {
-  const baseUrl = getOpenAiBaseUrl(params.openAi);
-  const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n");
-  const form = new FormData();
-  form.append("purpose", "batch");
-  form.append(
-    "file",
-    new Blob([jsonl], { type: "application/jsonl" }),
-    `memory-embeddings.${hashText(String(Date.now()))}.jsonl`,
-  );
-
-  const fileRes = await fetchOpenAiWithRetry({
-    openAi: params.openAi,
-    url: `${baseUrl}/files`,
-    init: {
-      method: "POST",
-      headers: getOpenAiHeaders(params.openAi, { json: false }),
-      body: form,
-    },
-    label: "file upload",
-    debug: params.debug,
-  });
-  const filePayload = (await fileRes.json()) as { id?: string };
-  if (!filePayload.id) {
-    throw new Error("openai batch file upload failed: missing file id");
-  }
-
-  const batchRes = await fetchOpenAiWithRetry({
-    openAi: params.openAi,
-    url: `${baseUrl}/batches`,
-    init: {
-      method: "POST",
-      headers: getOpenAiHeaders(params.openAi, { json: true }),
-      body: JSON.stringify({
-        input_file_id: filePayload.id,
-        endpoint: OPENAI_BATCH_ENDPOINT,
-        completion_window: OPENAI_BATCH_COMPLETION_WINDOW,
-        metadata: {
-          source: "clawdbot-memory",
-          agent: params.agentId,
-        },
-      }),
-    },
-    label: "create",
-    debug: params.debug,
-  });
-  return (await batchRes.json()) as OpenAiBatchStatus;
-}
-
-async function fetchOpenAiBatchStatus(params: {
-  openAi: OpenAiEmbeddingClient;
-  batchId: string;
-  debug?: (message: string, data?: Record<string, unknown>) => void;
-}): Promise<OpenAiBatchStatus> {
-  const baseUrl = getOpenAiBaseUrl(params.openAi);
-  const res = await fetchOpenAiWithRetry({
-    openAi: params.openAi,
-    url: `${baseUrl}/batches/${params.batchId}`,
-    init: { headers: getOpenAiHeaders(params.openAi, { json: true }) },
-    label: "status",
-    debug: params.debug,
-  });
-  return (await res.json()) as OpenAiBatchStatus;
-}
-
-async function fetchOpenAiFileContent(params: {
-  openAi: OpenAiEmbeddingClient;
-  fileId: string;
-  debug?: (message: string, data?: Record<string, unknown>) => void;
-}): Promise<string> {
-  const baseUrl = getOpenAiBaseUrl(params.openAi);
-  const res = await fetchOpenAiWithRetry({
-    openAi: params.openAi,
-    url: `${baseUrl}/files/${params.fileId}/content`,
-    init: { headers: getOpenAiHeaders(params.openAi, { json: true }) },
-    label: "file content",
-    debug: params.debug,
-  });
-  return await res.text();
-}
-
-function parseOpenAiBatchOutput(text: string): OpenAiBatchOutputLine[] {
-  if (!text.trim()) return [];
-  return text
-    .split("\n")
-    .map((line) => line.trim())
-    .filter(Boolean)
-    .map((line) => JSON.parse(line) as OpenAiBatchOutputLine);
-}
-
-async function readOpenAiBatchError(params: {
-  openAi: OpenAiEmbeddingClient;
-  errorFileId: string;
-  debug?: (message: string, data?: Record<string, unknown>) => void;
-}): Promise<string | undefined> {
-  try {
-    const content = await fetchOpenAiFileContent({
-      openAi: params.openAi,
-      fileId: params.errorFileId,
-      debug: params.debug,
-    });
-    const lines = parseOpenAiBatchOutput(content);
-    const first = lines.find((line) => line.error?.message || line.response?.body?.error);
-    const message =
-      first?.error?.message ??
-      (typeof first?.response?.body?.error?.message === "string"
-        ? first?.response?.body?.error?.message
-        : undefined);
-    return message;
-  } catch (err) {
-    const message = err instanceof Error ? err.message : String(err);
-    return message ? `error file unavailable: ${message}` : undefined;
-  }
-}
-
-async function waitForOpenAiBatch(params: {
-  openAi: OpenAiEmbeddingClient;
-  batchId: string;
-  wait: boolean;
-  pollIntervalMs: number;
-  timeoutMs: number;
-  debug?: (message: string, data?: Record<string, unknown>) => void;
-  initial?: OpenAiBatchStatus;
-}): Promise<{ outputFileId: string; errorFileId?: string }> {
-  const start = Date.now();
-  let current: OpenAiBatchStatus | undefined = params.initial;
-  while (true) {
-    const status =
-      current ??
-      (await fetchOpenAiBatchStatus({
-        openAi: params.openAi,
-        batchId: params.batchId,
-        debug: params.debug,
-      }));
-    const state = status.status ?? "unknown";
-    if (state === "completed") {
-      if (!status.output_file_id) {
-        throw new Error(`openai batch ${params.batchId} completed without output file`);
-      }
-      return {
-        outputFileId: status.output_file_id,
-        errorFileId: status.error_file_id ?? undefined,
-      };
-    }
-    if (["failed", "expired", "cancelled", "canceled"].includes(state)) {
-      const detail = status.error_file_id
-        ? await readOpenAiBatchError({
-            openAi: params.openAi,
-            errorFileId: status.error_file_id,
-            debug: params.debug,
-          })
-        : undefined;
-      const suffix = detail ? `: ${detail}` : "";
-      throw new Error(`openai batch ${params.batchId} ${state}${suffix}`);
-    }
-    if (!params.wait) {
-      throw new Error(`openai batch ${params.batchId} still ${state}; wait disabled`);
-    }
-    if (Date.now() - start > params.timeoutMs) {
-      throw new Error(`openai batch ${params.batchId} timed out after ${params.timeoutMs}ms`);
-    }
-    params.debug?.(`openai batch ${params.batchId} ${state}; waiting ${params.pollIntervalMs}ms`);
-    await new Promise((resolve) => setTimeout(resolve, params.pollIntervalMs));
-    current = undefined;
-  }
-}
-
-async function runWithConcurrency<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
-  if (tasks.length === 0) return [];
-  const resolvedLimit = Math.max(1, Math.min(limit, tasks.length));
-  const results: T[] = Array.from({ length: tasks.length });
-  let next = 0;
-  let firstError: unknown = null;
-
-  const workers = Array.from({ length: resolvedLimit }, async () => {
-    while (true) {
-      if (firstError) return;
-      const index = next;
-      next += 1;
-      if (index >= tasks.length) return;
-      try {
-        results[index] = await tasks[index]();
-      } catch (err) {
-        firstError = err;
-        return;
-      }
-    }
-  });
-
-  await Promise.allSettled(workers);
-  if (firstError) throw firstError;
-  return results;
-}
-
-export async function runOpenAiEmbeddingBatches(params: {
-  openAi: OpenAiEmbeddingClient;
-  agentId: string;
-  requests: OpenAiBatchRequest[];
-  wait: boolean;
-  pollIntervalMs: number;
-  timeoutMs: number;
-  concurrency: number;
-  debug?: (message: string, data?: Record<string, unknown>) => void;
-}): Promise<Map<string, number[]>> {
-  if (params.requests.length === 0) return new Map();
-  const groups = splitOpenAiBatchRequests(params.requests);
-  const byCustomId = new Map<string, number[]>();
-
-  const tasks = groups.map((group, groupIndex) => async () => {
-    const batchInfo = await submitOpenAiBatch({
-      openAi: params.openAi,
-      requests: group,
-      agentId: params.agentId,
-      debug: params.debug,
-    });
-    if (!batchInfo.id) {
-      throw new Error("openai batch create failed: missing batch id");
-    }
-
-    params.debug?.("memory embeddings: openai batch created", {
-      batchId: batchInfo.id,
-      status: batchInfo.status,
-      group: groupIndex + 1,
-      groups: groups.length,
-      requests: group.length,
-    });
-
-    if (!params.wait && batchInfo.status !== "completed") {
-      throw new Error(
-        `openai batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`,
-      );
-    }
-
-    const completed =
-      batchInfo.status === "completed"
-        ? {
-            outputFileId: batchInfo.output_file_id ?? "",
-            errorFileId: batchInfo.error_file_id ?? undefined,
-          }
-        : await waitForOpenAiBatch({
-            openAi: params.openAi,
-            batchId: batchInfo.id,
-            wait: params.wait,
-            pollIntervalMs: params.pollIntervalMs,
-            timeoutMs: params.timeoutMs,
-            debug: params.debug,
-            initial: batchInfo,
-          });
-    if (!completed.outputFileId) {
-      throw new Error(`openai batch ${batchInfo.id} completed without output file`);
-    }
-
-    const content = await fetchOpenAiFileContent({
-      openAi: params.openAi,
-      fileId: completed.outputFileId,
-      debug: params.debug,
-    });
-    const outputLines = parseOpenAiBatchOutput(content);
-    const errors: string[] = [];
-    const remaining = new Set(group.map((request) => request.custom_id));
-
-    for (const line of outputLines) {
-      const customId = line.custom_id;
-      if (!customId) continue;
-      remaining.delete(customId);
-      if (line.error?.message) {
-        errors.push(`${customId}: ${line.error.message}`);
-        continue;
-      }
-      const response = line.response;
-      const statusCode = response?.status_code ?? 0;
-      if (statusCode >= 400) {
-        const message =
-          response?.body?.error?.message ??
-          (typeof response?.body === "string" ? response.body : undefined) ??
-          "unknown error";
-        errors.push(`${customId}: ${message}`);
-        continue;
-      }
-      const data = response?.body?.data ?? [];
-      const embedding = data[0]?.embedding ?? [];
-      if (embedding.length === 0) {
-        errors.push(`${customId}: empty embedding`);
-        continue;
-      }
-      byCustomId.set(customId, embedding);
-    }
-
-    if (errors.length > 0) {
-      throw new Error(`openai batch ${batchInfo.id} failed: ${errors.join("; ")}`);
-    }
-    if (remaining.size > 0) {
-      throw new Error(`openai batch ${batchInfo.id} missing ${remaining.size} embedding responses`);
-    }
-  });
-
-  params.debug?.("memory embeddings: openai batch submit", {
-    requests: params.requests.length,
-    groups: groups.length,
-    wait: params.wait,
-    concurrency: params.concurrency,
-    pollIntervalMs: params.pollIntervalMs,
-    timeoutMs: params.timeoutMs,
-  });
-
-  await runWithConcurrency(tasks, params.concurrency);
-  return byCustomId;
-}
+// Deprecated: use ./batch-openai.js
+export * from "./batch-openai.js";