fix: retry openai batch indexing

This commit is contained in:
Peter Steinberger
2026-01-18 16:08:19 +00:00
parent 82883095fe
commit 835162fb62
2 changed files with 208 additions and 37 deletions

View File

@@ -148,4 +148,102 @@ describe("memory indexing with OpenAI batches", () => {
expect(fetchMock).toHaveBeenCalled();
expect(labels.some((label) => label.toLowerCase().includes("batch"))).toBe(true);
});
it("retries OpenAI batch create on transient failures", async () => {
const content = ["retry", "the", "batch"].join("\n\n");
await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-08.md"), content);
let uploadedRequests: Array<{ custom_id?: string }> = [];
let batchCreates = 0;
const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
const url =
typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
if (url.endsWith("/files")) {
const body = init?.body;
if (!(body instanceof FormData)) {
throw new Error("expected FormData upload");
}
for (const [key, value] of body.entries()) {
if (key !== "file") continue;
if (typeof value === "string") {
uploadedRequests = value
.split("\n")
.filter(Boolean)
.map((line) => JSON.parse(line) as { custom_id?: string });
} else {
const text = await value.text();
uploadedRequests = text
.split("\n")
.filter(Boolean)
.map((line) => JSON.parse(line) as { custom_id?: string });
}
}
return new Response(JSON.stringify({ id: "file_1" }), {
status: 200,
headers: { "Content-Type": "application/json" },
});
}
if (url.endsWith("/batches")) {
batchCreates += 1;
if (batchCreates === 1) {
return new Response("upstream connect error", { status: 503 });
}
return new Response(JSON.stringify({ id: "batch_1", status: "in_progress" }), {
status: 200,
headers: { "Content-Type": "application/json" },
});
}
if (url.endsWith("/batches/batch_1")) {
return new Response(
JSON.stringify({ id: "batch_1", status: "completed", output_file_id: "file_out" }),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
}
if (url.endsWith("/files/file_out/content")) {
const lines = uploadedRequests.map((request, index) =>
JSON.stringify({
custom_id: request.custom_id,
response: {
status_code: 200,
body: { data: [{ embedding: [index + 1, 0, 0], index: 0 }] },
},
}),
);
return new Response(lines.join("\n"), {
status: 200,
headers: { "Content-Type": "application/jsonl" },
});
}
throw new Error(`unexpected fetch ${url}`);
});
vi.stubGlobal("fetch", fetchMock);
const cfg = {
agents: {
defaults: {
workspace: workspaceDir,
memorySearch: {
provider: "openai",
model: "text-embedding-3-small",
store: { path: indexPath },
sync: { watch: false, onSessionStart: false, onSearch: false },
query: { minScore: 0 },
remote: { batch: { enabled: true, wait: true } },
},
},
list: [{ id: "main", default: true }],
},
};
const result = await getMemorySearchManager({ cfg, agentId: "main" });
expect(result.manager).not.toBeNull();
if (!result.manager) throw new Error("manager missing");
manager = result.manager;
await manager.sync({ force: true });
const status = manager.status();
expect(status.chunks).toBeGreaterThan(0);
expect(batchCreates).toBe(2);
});
});

View File

@@ -1,3 +1,4 @@
import { retryAsync } from "../infra/retry.js";
import type { OpenAiEmbeddingClient } from "./embeddings.js";
import { hashText } from "./internal.js";
@@ -33,6 +34,30 @@ export type OpenAiBatchOutputLine = {
export const OPENAI_BATCH_ENDPOINT = "/v1/embeddings";
const OPENAI_BATCH_COMPLETION_WINDOW = "24h";
const OPENAI_BATCH_MAX_REQUESTS = 50000;
const OPENAI_BATCH_RETRY = {
attempts: 3,
minDelayMs: 500,
maxDelayMs: 5000,
jitter: 0.1,
};
type RetryableError = Error & { status?: number };
function isRetryableBatchError(err: unknown): boolean {
const status =
typeof (err as RetryableError)?.status === "number" ? (err as RetryableError).status : undefined;
if (typeof status === "number") {
return status === 429 || status >= 500;
}
const message = err instanceof Error ? err.message : String(err);
return /timeout|timed out|ECONNRESET|ECONNREFUSED|EHOSTUNREACH|ENOTFOUND|EAI_AGAIN|network|fetch failed|upstream connect/i.test(
message,
);
}
function formatRetryError(err: unknown): string {
return err instanceof Error ? err.message : String(err);
}
function getOpenAiBaseUrl(openAi: OpenAiEmbeddingClient): string {
return openAi.baseUrl?.replace(/\/$/, "") ?? "";
@@ -63,10 +88,43 @@ function splitOpenAiBatchRequests(requests: OpenAiBatchRequest[]): OpenAiBatchRe
return groups;
}
async function fetchOpenAiWithRetry(params: {
openAi: OpenAiEmbeddingClient;
url: string;
init?: RequestInit;
label: string;
debug?: (message: string, data?: Record<string, unknown>) => void;
}): Promise<Response> {
return await retryAsync(
async () => {
const res = await fetch(params.url, params.init);
if (!res.ok) {
const text = await res.text();
const err = new Error(`openai batch ${params.label} failed: ${res.status} ${text}`);
(err as RetryableError).status = res.status;
throw err;
}
return res;
},
{
...OPENAI_BATCH_RETRY,
label: params.label,
shouldRetry: isRetryableBatchError,
onRetry: (info) => {
params.debug?.(
`openai batch ${params.label} retry ${info.attempt}/${info.maxAttempts} in ${info.delayMs}ms`,
{ error: formatRetryError(info.err) },
);
},
},
);
}
async function submitOpenAiBatch(params: {
openAi: OpenAiEmbeddingClient;
requests: OpenAiBatchRequest[];
agentId: string;
debug?: (message: string, data?: Record<string, unknown>) => void;
}): Promise<OpenAiBatchStatus> {
const baseUrl = getOpenAiBaseUrl(params.openAi);
const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n");
@@ -78,67 +136,73 @@ async function submitOpenAiBatch(params: {
`memory-embeddings.${hashText(String(Date.now()))}.jsonl`,
);
const fileRes = await fetch(`${baseUrl}/files`, {
method: "POST",
headers: getOpenAiHeaders(params.openAi, { json: false }),
body: form,
const fileRes = await fetchOpenAiWithRetry({
openAi: params.openAi,
url: `${baseUrl}/files`,
init: {
method: "POST",
headers: getOpenAiHeaders(params.openAi, { json: false }),
body: form,
},
label: "file upload",
debug: params.debug,
});
if (!fileRes.ok) {
const text = await fileRes.text();
throw new Error(`openai batch file upload failed: ${fileRes.status} ${text}`);
}
const filePayload = (await fileRes.json()) as { id?: string };
if (!filePayload.id) {
throw new Error("openai batch file upload failed: missing file id");
}
const batchRes = await fetch(`${baseUrl}/batches`, {
method: "POST",
headers: getOpenAiHeaders(params.openAi, { json: true }),
body: JSON.stringify({
input_file_id: filePayload.id,
endpoint: OPENAI_BATCH_ENDPOINT,
completion_window: OPENAI_BATCH_COMPLETION_WINDOW,
metadata: {
source: "clawdbot-memory",
agent: params.agentId,
},
}),
const batchRes = await fetchOpenAiWithRetry({
openAi: params.openAi,
url: `${baseUrl}/batches`,
init: {
method: "POST",
headers: getOpenAiHeaders(params.openAi, { json: true }),
body: JSON.stringify({
input_file_id: filePayload.id,
endpoint: OPENAI_BATCH_ENDPOINT,
completion_window: OPENAI_BATCH_COMPLETION_WINDOW,
metadata: {
source: "clawdbot-memory",
agent: params.agentId,
},
}),
},
label: "create",
debug: params.debug,
});
if (!batchRes.ok) {
const text = await batchRes.text();
throw new Error(`openai batch create failed: ${batchRes.status} ${text}`);
}
return (await batchRes.json()) as OpenAiBatchStatus;
}
async function fetchOpenAiBatchStatus(params: {
openAi: OpenAiEmbeddingClient;
batchId: string;
debug?: (message: string, data?: Record<string, unknown>) => void;
}): Promise<OpenAiBatchStatus> {
const baseUrl = getOpenAiBaseUrl(params.openAi);
const res = await fetch(`${baseUrl}/batches/${params.batchId}`, {
headers: getOpenAiHeaders(params.openAi, { json: true }),
const res = await fetchOpenAiWithRetry({
openAi: params.openAi,
url: `${baseUrl}/batches/${params.batchId}`,
init: { headers: getOpenAiHeaders(params.openAi, { json: true }) },
label: "status",
debug: params.debug,
});
if (!res.ok) {
const text = await res.text();
throw new Error(`openai batch status failed: ${res.status} ${text}`);
}
return (await res.json()) as OpenAiBatchStatus;
}
async function fetchOpenAiFileContent(params: {
openAi: OpenAiEmbeddingClient;
fileId: string;
debug?: (message: string, data?: Record<string, unknown>) => void;
}): Promise<string> {
const baseUrl = getOpenAiBaseUrl(params.openAi);
const res = await fetch(`${baseUrl}/files/${params.fileId}/content`, {
headers: getOpenAiHeaders(params.openAi, { json: true }),
const res = await fetchOpenAiWithRetry({
openAi: params.openAi,
url: `${baseUrl}/files/${params.fileId}/content`,
init: { headers: getOpenAiHeaders(params.openAi, { json: true }) },
label: "file content",
debug: params.debug,
});
if (!res.ok) {
const text = await res.text();
throw new Error(`openai batch file content failed: ${res.status} ${text}`);
}
return await res.text();
}
@@ -154,11 +218,13 @@ function parseOpenAiBatchOutput(text: string): OpenAiBatchOutputLine[] {
async function readOpenAiBatchError(params: {
openAi: OpenAiEmbeddingClient;
errorFileId: string;
debug?: (message: string, data?: Record<string, unknown>) => void;
}): Promise<string | undefined> {
try {
const content = await fetchOpenAiFileContent({
openAi: params.openAi,
fileId: params.errorFileId,
debug: params.debug,
});
const lines = parseOpenAiBatchOutput(content);
const first = lines.find((line) => line.error?.message || line.response?.body?.error);
@@ -191,6 +257,7 @@ async function waitForOpenAiBatch(params: {
(await fetchOpenAiBatchStatus({
openAi: params.openAi,
batchId: params.batchId,
debug: params.debug,
}));
const state = status.status ?? "unknown";
if (state === "completed") {
@@ -204,7 +271,11 @@ async function waitForOpenAiBatch(params: {
}
if (["failed", "expired", "cancelled", "canceled"].includes(state)) {
const detail = status.error_file_id
? await readOpenAiBatchError({ openAi: params.openAi, errorFileId: status.error_file_id })
? await readOpenAiBatchError({
openAi: params.openAi,
errorFileId: status.error_file_id,
debug: params.debug,
})
: undefined;
const suffix = detail ? `: ${detail}` : "";
throw new Error(`openai batch ${params.batchId} ${state}${suffix}`);
@@ -267,6 +338,7 @@ export async function runOpenAiEmbeddingBatches(params: {
openAi: params.openAi,
requests: group,
agentId: params.agentId,
debug: params.debug,
});
if (!batchInfo.id) {
throw new Error("openai batch create failed: missing batch id");
@@ -308,6 +380,7 @@ export async function runOpenAiEmbeddingBatches(params: {
const content = await fetchOpenAiFileContent({
openAi: params.openAi,
fileId: completed.outputFileId,
debug: params.debug,
});
const outputLines = parseOpenAiBatchOutput(content);
const errors: string[] = [];