From 835162fb622b05e39c2a8b2223a9b4c4627e85b3 Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Sun, 18 Jan 2026 16:08:19 +0000
Subject: [PATCH] fix: retry openai batch indexing

---
 src/memory/manager.batch.test.ts |  98 +++++++++++++++++++++
 src/memory/openai-batch.ts       | 147 +++++++++++++++++++++++--------
 2 files changed, 208 insertions(+), 37 deletions(-)

diff --git a/src/memory/manager.batch.test.ts b/src/memory/manager.batch.test.ts
index 9dc28283d..e1c76d80b 100644
--- a/src/memory/manager.batch.test.ts
+++ b/src/memory/manager.batch.test.ts
@@ -148,4 +148,102 @@ describe("memory indexing with OpenAI batches", () => {
     expect(fetchMock).toHaveBeenCalled();
     expect(labels.some((label) => label.toLowerCase().includes("batch"))).toBe(true);
   });
+
+  it("retries OpenAI batch create on transient failures", async () => {
+    const content = ["retry", "the", "batch"].join("\n\n");
+    await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-08.md"), content);
+
+    let uploadedRequests: Array<{ custom_id?: string }> = [];
+    let batchCreates = 0;
+    const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
+      const url =
+        typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
+      if (url.endsWith("/files")) {
+        const body = init?.body;
+        if (!(body instanceof FormData)) {
+          throw new Error("expected FormData upload");
+        }
+        for (const [key, value] of body.entries()) {
+          if (key !== "file") continue;
+          if (typeof value === "string") {
+            uploadedRequests = value
+              .split("\n")
+              .filter(Boolean)
+              .map((line) => JSON.parse(line) as { custom_id?: string });
+          } else {
+            const text = await value.text();
+            uploadedRequests = text
+              .split("\n")
+              .filter(Boolean)
+              .map((line) => JSON.parse(line) as { custom_id?: string });
+          }
+        }
+        return new Response(JSON.stringify({ id: "file_1" }), {
+          status: 200,
+          headers: { "Content-Type": "application/json" },
+        });
+      }
+      if (url.endsWith("/batches")) {
+        batchCreates += 1;
+        if (batchCreates === 1) {
+          return new Response("upstream connect error", { status: 503 });
+        }
+        return new Response(JSON.stringify({ id: "batch_1", status: "in_progress" }), {
+          status: 200,
+          headers: { "Content-Type": "application/json" },
+        });
+      }
+      if (url.endsWith("/batches/batch_1")) {
+        return new Response(
+          JSON.stringify({ id: "batch_1", status: "completed", output_file_id: "file_out" }),
+          { status: 200, headers: { "Content-Type": "application/json" } },
+        );
+      }
+      if (url.endsWith("/files/file_out/content")) {
+        const lines = uploadedRequests.map((request, index) =>
+          JSON.stringify({
+            custom_id: request.custom_id,
+            response: {
+              status_code: 200,
+              body: { data: [{ embedding: [index + 1, 0, 0], index: 0 }] },
+            },
+          }),
+        );
+        return new Response(lines.join("\n"), {
+          status: 200,
+          headers: { "Content-Type": "application/jsonl" },
+        });
+      }
+      throw new Error(`unexpected fetch ${url}`);
+    });
+
+    vi.stubGlobal("fetch", fetchMock);
+
+    const cfg = {
+      agents: {
+        defaults: {
+          workspace: workspaceDir,
+          memorySearch: {
+            provider: "openai",
+            model: "text-embedding-3-small",
+            store: { path: indexPath },
+            sync: { watch: false, onSessionStart: false, onSearch: false },
+            query: { minScore: 0 },
+            remote: { batch: { enabled: true, wait: true } },
+          },
+        },
+        list: [{ id: "main", default: true }],
+      },
+    };
+
+    const result = await getMemorySearchManager({ cfg, agentId: "main" });
+    expect(result.manager).not.toBeNull();
+    if (!result.manager) throw new Error("manager missing");
+    manager = result.manager;
+    await manager.sync({ force: true });
+
+    const status = manager.status();
+    expect(status.chunks).toBeGreaterThan(0);
+    expect(batchCreates).toBe(2);
+  });
 });
diff --git a/src/memory/openai-batch.ts b/src/memory/openai-batch.ts
index 6810bef97..69851f810 100644
--- a/src/memory/openai-batch.ts
+++ b/src/memory/openai-batch.ts
@@ -1,3 +1,4 @@
+import { retryAsync } from "../infra/retry.js";
 import type { OpenAiEmbeddingClient } from "./embeddings.js";
 import { hashText } from "./internal.js";
 
@@ -33,6 +34,30 @@ export type OpenAiBatchOutputLine = {
 export const OPENAI_BATCH_ENDPOINT = "/v1/embeddings";
 const OPENAI_BATCH_COMPLETION_WINDOW = "24h";
 const OPENAI_BATCH_MAX_REQUESTS = 50000;
+const OPENAI_BATCH_RETRY = {
+  attempts: 3,
+  minDelayMs: 500,
+  maxDelayMs: 5000,
+  jitter: 0.1,
+};
+
+type RetryableError = Error & { status?: number };
+
+function isRetryableBatchError(err: unknown): boolean {
+  const status =
+    typeof (err as RetryableError)?.status === "number" ? (err as RetryableError).status : undefined;
+  if (typeof status === "number") {
+    return status === 429 || status >= 500;
+  }
+  const message = err instanceof Error ? err.message : String(err);
+  return /timeout|timed out|ECONNRESET|ECONNREFUSED|EHOSTUNREACH|ENOTFOUND|EAI_AGAIN|network|fetch failed|upstream connect/i.test(
+    message,
+  );
+}
+
+function formatRetryError(err: unknown): string {
+  return err instanceof Error ? err.message : String(err);
+}
 
 function getOpenAiBaseUrl(openAi: OpenAiEmbeddingClient): string {
   return openAi.baseUrl?.replace(/\/$/, "") ?? "";
 }
@@ -63,10 +88,43 @@ function splitOpenAiBatchRequests(requests: OpenAiBatchRequest[]): OpenAiBatchRe
   return groups;
 }
 
+async function fetchOpenAiWithRetry(params: {
+  openAi: OpenAiEmbeddingClient;
+  url: string;
+  init?: RequestInit;
+  label: string;
+  debug?: (message: string, data?: Record<string, unknown>) => void;
+}): Promise<Response> {
+  return await retryAsync(
+    async () => {
+      const res = await fetch(params.url, params.init);
+      if (!res.ok) {
+        const text = await res.text();
+        const err = new Error(`openai batch ${params.label} failed: ${res.status} ${text}`);
+        (err as RetryableError).status = res.status;
+        throw err;
+      }
+      return res;
+    },
+    {
+      ...OPENAI_BATCH_RETRY,
+      label: params.label,
+      shouldRetry: isRetryableBatchError,
+      onRetry: (info) => {
+        params.debug?.(
+          `openai batch ${params.label} retry ${info.attempt}/${info.maxAttempts} in ${info.delayMs}ms`,
+          { error: formatRetryError(info.err) },
+        );
+      },
+    },
+  );
+}
+
 async function submitOpenAiBatch(params: {
   openAi: OpenAiEmbeddingClient;
   requests: OpenAiBatchRequest[];
   agentId: string;
+  debug?: (message: string, data?: Record<string, unknown>) => void;
 }): Promise<OpenAiBatchStatus> {
   const baseUrl = getOpenAiBaseUrl(params.openAi);
   const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n");
@@ -78,67 +136,73 @@ async function submitOpenAiBatch(params: {
     `memory-embeddings.${hashText(String(Date.now()))}.jsonl`,
   );
 
-  const fileRes = await fetch(`${baseUrl}/files`, {
-    method: "POST",
-    headers: getOpenAiHeaders(params.openAi, { json: false }),
-    body: form,
+  const fileRes = await fetchOpenAiWithRetry({
+    openAi: params.openAi,
+    url: `${baseUrl}/files`,
+    init: {
+      method: "POST",
+      headers: getOpenAiHeaders(params.openAi, { json: false }),
+      body: form,
+    },
+    label: "file upload",
+    debug: params.debug,
   });
-  if (!fileRes.ok) {
-    const text = await fileRes.text();
-    throw new Error(`openai batch file upload failed: ${fileRes.status} ${text}`);
-  }
   const filePayload = (await fileRes.json()) as { id?: string };
   if (!filePayload.id) {
     throw new Error("openai batch file upload failed: missing file id");
   }
 
-  const batchRes = await fetch(`${baseUrl}/batches`, {
-    method: "POST",
-    headers: getOpenAiHeaders(params.openAi, { json: true }),
-    body: JSON.stringify({
-      input_file_id: filePayload.id,
-      endpoint: OPENAI_BATCH_ENDPOINT,
-      completion_window: OPENAI_BATCH_COMPLETION_WINDOW,
-      metadata: {
-        source: "clawdbot-memory",
-        agent: params.agentId,
-      },
-    }),
+  const batchRes = await fetchOpenAiWithRetry({
+    openAi: params.openAi,
+    url: `${baseUrl}/batches`,
+    init: {
+      method: "POST",
+      headers: getOpenAiHeaders(params.openAi, { json: true }),
+      body: JSON.stringify({
+        input_file_id: filePayload.id,
+        endpoint: OPENAI_BATCH_ENDPOINT,
+        completion_window: OPENAI_BATCH_COMPLETION_WINDOW,
+        metadata: {
+          source: "clawdbot-memory",
+          agent: params.agentId,
+        },
+      }),
+    },
+    label: "create",
+    debug: params.debug,
   });
-  if (!batchRes.ok) {
-    const text = await batchRes.text();
-    throw new Error(`openai batch create failed: ${batchRes.status} ${text}`);
-  }
   return (await batchRes.json()) as OpenAiBatchStatus;
 }
 
 async function fetchOpenAiBatchStatus(params: {
   openAi: OpenAiEmbeddingClient;
   batchId: string;
+  debug?: (message: string, data?: Record<string, unknown>) => void;
 }): Promise<OpenAiBatchStatus> {
   const baseUrl = getOpenAiBaseUrl(params.openAi);
-  const res = await fetch(`${baseUrl}/batches/${params.batchId}`, {
-    headers: getOpenAiHeaders(params.openAi, { json: true }),
+  const res = await fetchOpenAiWithRetry({
+    openAi: params.openAi,
+    url: `${baseUrl}/batches/${params.batchId}`,
+    init: { headers: getOpenAiHeaders(params.openAi, { json: true }) },
+    label: "status",
+    debug: params.debug,
   });
-  if (!res.ok) {
-    const text = await res.text();
-    throw new Error(`openai batch status failed: ${res.status} ${text}`);
-  }
   return (await res.json()) as OpenAiBatchStatus;
 }
 
 async function fetchOpenAiFileContent(params: {
   openAi: OpenAiEmbeddingClient;
   fileId: string;
+  debug?: (message: string, data?: Record<string, unknown>) => void;
 }): Promise<string> {
   const baseUrl = getOpenAiBaseUrl(params.openAi);
-  const res = await fetch(`${baseUrl}/files/${params.fileId}/content`, {
-    headers: getOpenAiHeaders(params.openAi, { json: true }),
+  const res = await fetchOpenAiWithRetry({
+    openAi: params.openAi,
+    url: `${baseUrl}/files/${params.fileId}/content`,
+    init: { headers: getOpenAiHeaders(params.openAi, { json: true }) },
+    label: "file content",
+    debug: params.debug,
   });
-  if (!res.ok) {
-    const text = await res.text();
-    throw new Error(`openai batch file content failed: ${res.status} ${text}`);
-  }
   return await res.text();
 }
 
@@ -154,11 +218,13 @@ function parseOpenAiBatchOutput(text: string): OpenAiBatchOutputLine[] {
 async function readOpenAiBatchError(params: {
   openAi: OpenAiEmbeddingClient;
   errorFileId: string;
+  debug?: (message: string, data?: Record<string, unknown>) => void;
 }): Promise<string | undefined> {
   try {
     const content = await fetchOpenAiFileContent({
       openAi: params.openAi,
       fileId: params.errorFileId,
+      debug: params.debug,
     });
     const lines = parseOpenAiBatchOutput(content);
     const first = lines.find((line) => line.error?.message || line.response?.body?.error);
@@ -191,6 +257,7 @@ async function waitForOpenAiBatch(params: {
       (await fetchOpenAiBatchStatus({
         openAi: params.openAi,
         batchId: params.batchId,
+        debug: params.debug,
       }));
     const state = status.status ?? "unknown";
     if (state === "completed") {
@@ -204,7 +271,11 @@
     }
     if (["failed", "expired", "cancelled", "canceled"].includes(state)) {
       const detail = status.error_file_id
-        ? await readOpenAiBatchError({ openAi: params.openAi, errorFileId: status.error_file_id })
+        ? await readOpenAiBatchError({
+            openAi: params.openAi,
+            errorFileId: status.error_file_id,
+            debug: params.debug,
+          })
         : undefined;
       const suffix = detail ? `: ${detail}` : "";
       throw new Error(`openai batch ${params.batchId} ${state}${suffix}`);
@@ -267,6 +338,7 @@ export async function runOpenAiEmbeddingBatches(params: {
       openAi: params.openAi,
       requests: group,
       agentId: params.agentId,
+      debug: params.debug,
     });
     if (!batchInfo.id) {
       throw new Error("openai batch create failed: missing batch id");
@@ -308,6 +380,7 @@ export async function runOpenAiEmbeddingBatches(params: {
     const content = await fetchOpenAiFileContent({
       openAi: params.openAi,
       fileId: completed.outputFileId,
+      debug: params.debug,
     });
     const outputLines = parseOpenAiBatchOutput(content);
     const errors: string[] = [];