fix: improve memory status and batch fallback

Peter Steinberger
2026-01-19 22:48:45 +00:00
parent 39dfdccf6c
commit 4bac76e66d
5 changed files with 564 additions and 25 deletions

View File

@@ -70,7 +70,19 @@ export async function listMemoryFiles(workspaceDir: string): Promise<string[]> {
  if (await exists(memoryDir)) {
    await walkDir(memoryDir, result);
  }
  if (result.length <= 1) return result;
  const seen = new Set<string>();
  const deduped: string[] = [];
  for (const entry of result) {
    let key = entry;
    try {
      key = await fs.realpath(entry);
    } catch {}
    if (seen.has(key)) continue;
    seen.add(key);
    deduped.push(entry);
  }
  return deduped;
}

export function hashText(value: string): string {
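
The dedup key here is the resolved real path, so symlinked copies of the same memory file no longer index twice, while the first-seen original path is what callers get back. A minimal standalone sketch of the idea (hypothetical temp-dir layout, not part of this commit):

import { promises as fs } from "node:fs";
import os from "node:os";
import path from "node:path";

async function demo() {
  // Create a file plus a symlink that points at it.
  const dir = await fs.mkdtemp(path.join(os.tmpdir(), "realpath-dedup-"));
  const real = path.join(dir, "notes.md");
  const alias = path.join(dir, "alias.md");
  await fs.writeFile(real, "hello");
  await fs.symlink(real, alias);
  // Both entries resolve to the same canonical path, so a Set keyed on
  // realpath keeps only the first one, which is what listMemoryFiles does.
  const keys = await Promise.all([real, alias].map((p) => fs.realpath(p)));
  console.log(new Set(keys).size); // 1
}

void demo();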

View File

@@ -34,6 +34,9 @@ describe("memory indexing with OpenAI batches", () => {
  beforeEach(async () => {
    embedBatch.mockClear();
    embedQuery.mockClear();
    embedBatch.mockImplementation(async (texts: string[]) =>
      texts.map((_text, index) => [index + 1, 0, 0]),
    );
    workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-mem-batch-"));
    indexPath = path.join(workspaceDir, "index.sqlite");
    await fs.mkdir(path.join(workspaceDir, "memory"));
@@ -246,4 +249,218 @@ describe("memory indexing with OpenAI batches", () => {
    expect(status.chunks).toBeGreaterThan(0);
    expect(batchCreates).toBe(2);
  });

  it("falls back to non-batch on failure and resets failures after success", async () => {
    const content = ["flaky", "batch"].join("\n\n");
    await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-09.md"), content);

    let uploadedRequests: Array<{ custom_id?: string }> = [];
    let mode: "fail" | "ok" = "fail";
    const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
      const url =
        typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
      if (url.endsWith("/files")) {
        const body = init?.body;
        if (!(body instanceof FormData)) {
          throw new Error("expected FormData upload");
        }
        for (const [key, value] of body.entries()) {
          if (key !== "file") continue;
          if (typeof value === "string") {
            uploadedRequests = value
              .split("\n")
              .filter(Boolean)
              .map((line) => JSON.parse(line) as { custom_id?: string });
          } else {
            const text = await value.text();
            uploadedRequests = text
              .split("\n")
              .filter(Boolean)
              .map((line) => JSON.parse(line) as { custom_id?: string });
          }
        }
        return new Response(JSON.stringify({ id: "file_1" }), {
          status: 200,
          headers: { "Content-Type": "application/json" },
        });
      }
      if (url.endsWith("/batches")) {
        if (mode === "fail") {
          return new Response("batch failed", { status: 500 });
        }
        return new Response(JSON.stringify({ id: "batch_1", status: "in_progress" }), {
          status: 200,
          headers: { "Content-Type": "application/json" },
        });
      }
      if (url.endsWith("/batches/batch_1")) {
        return new Response(
          JSON.stringify({ id: "batch_1", status: "completed", output_file_id: "file_out" }),
          { status: 200, headers: { "Content-Type": "application/json" } },
        );
      }
      if (url.endsWith("/files/file_out/content")) {
        const lines = uploadedRequests.map((request, index) =>
          JSON.stringify({
            custom_id: request.custom_id,
            response: {
              status_code: 200,
              body: { data: [{ embedding: [index + 1, 0, 0], index: 0 }] },
            },
          }),
        );
        return new Response(lines.join("\n"), {
          status: 200,
          headers: { "Content-Type": "application/jsonl" },
        });
      }
      throw new Error(`unexpected fetch ${url}`);
    });
    vi.stubGlobal("fetch", fetchMock);

    const cfg = {
      agents: {
        defaults: {
          workspace: workspaceDir,
          memorySearch: {
            provider: "openai",
            model: "text-embedding-3-small",
            store: { path: indexPath },
            sync: { watch: false, onSessionStart: false, onSearch: false },
            query: { minScore: 0 },
            remote: { batch: { enabled: true, wait: true } },
          },
        },
        list: [{ id: "main", default: true }],
      },
    };
    const result = await getMemorySearchManager({ cfg, agentId: "main" });
    expect(result.manager).not.toBeNull();
    if (!result.manager) throw new Error("manager missing");
    manager = result.manager;

    await manager.sync({ force: true });
    expect(embedBatch).toHaveBeenCalled();
    let status = manager.status();
    expect(status.batch?.enabled).toBe(true);
    expect(status.batch?.failures).toBe(1);

    embedBatch.mockClear();
    mode = "ok";
    await fs.writeFile(
      path.join(workspaceDir, "memory", "2026-01-09.md"),
      ["flaky", "batch", "recovery"].join("\n\n"),
    );
    await manager.sync({ force: true });
    status = manager.status();
    expect(status.batch?.enabled).toBe(true);
    expect(status.batch?.failures).toBe(0);
    expect(embedBatch).not.toHaveBeenCalled();
  });

  it("disables batch after repeated failures and skips batch thereafter", async () => {
    const content = ["repeat", "failures"].join("\n\n");
    await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-10.md"), content);

    let uploadedRequests: Array<{ custom_id?: string }> = [];
    const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
      const url =
        typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
      if (url.endsWith("/files")) {
        const body = init?.body;
        if (!(body instanceof FormData)) {
          throw new Error("expected FormData upload");
        }
        for (const [key, value] of body.entries()) {
          if (key !== "file") continue;
          if (typeof value === "string") {
            uploadedRequests = value
              .split("\n")
              .filter(Boolean)
              .map((line) => JSON.parse(line) as { custom_id?: string });
          } else {
            const text = await value.text();
            uploadedRequests = text
              .split("\n")
              .filter(Boolean)
              .map((line) => JSON.parse(line) as { custom_id?: string });
          }
        }
        return new Response(JSON.stringify({ id: "file_1" }), {
          status: 200,
          headers: { "Content-Type": "application/json" },
        });
      }
      if (url.endsWith("/batches")) {
        return new Response("batch failed", { status: 500 });
      }
      if (url.endsWith("/files/file_out/content")) {
        const lines = uploadedRequests.map((request, index) =>
          JSON.stringify({
            custom_id: request.custom_id,
            response: {
              status_code: 200,
              body: { data: [{ embedding: [index + 1, 0, 0], index: 0 }] },
            },
          }),
        );
        return new Response(lines.join("\n"), {
          status: 200,
          headers: { "Content-Type": "application/jsonl" },
        });
      }
      throw new Error(`unexpected fetch ${url}`);
    });
    vi.stubGlobal("fetch", fetchMock);

    const cfg = {
      agents: {
        defaults: {
          workspace: workspaceDir,
          memorySearch: {
            provider: "openai",
            model: "text-embedding-3-small",
            store: { path: indexPath },
            sync: { watch: false, onSessionStart: false, onSearch: false },
            query: { minScore: 0 },
            remote: { batch: { enabled: true, wait: true } },
          },
        },
        list: [{ id: "main", default: true }],
      },
    };
    const result = await getMemorySearchManager({ cfg, agentId: "main" });
    expect(result.manager).not.toBeNull();
    if (!result.manager) throw new Error("manager missing");
    manager = result.manager;

    await manager.sync({ force: true });
    let status = manager.status();
    expect(status.batch?.enabled).toBe(true);
    expect(status.batch?.failures).toBe(1);

    embedBatch.mockClear();
    await fs.writeFile(
      path.join(workspaceDir, "memory", "2026-01-10.md"),
      ["repeat", "failures", "again"].join("\n\n"),
    );
    await manager.sync({ force: true });
    status = manager.status();
    expect(status.batch?.enabled).toBe(false);
    expect(status.batch?.failures).toBeGreaterThanOrEqual(2);

    const fetchCalls = fetchMock.mock.calls.length;
    embedBatch.mockClear();
    await fs.writeFile(
      path.join(workspaceDir, "memory", "2026-01-10.md"),
      ["repeat", "failures", "fallback"].join("\n\n"),
    );
    await manager.sync({ force: true });
    expect(fetchMock.mock.calls.length).toBe(fetchCalls);
    expect(embedBatch).toHaveBeenCalled();
  });
});
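
For reference, the mocked `/files/file_out/content` responses above emulate the OpenAI batch output format as these tests model it: one JSONL line per uploaded request, keyed back to its chunk via `custom_id`. A single line looks like this (values illustrative):

// One output record; real custom_id values echo the uploaded requests.
const outputLine = JSON.stringify({
  custom_id: "chunk-0",
  response: {
    status_code: 200,
    body: { data: [{ embedding: [1, 0, 0], index: 0 }] },
  },
});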

View File

@@ -100,6 +100,7 @@ const EMBEDDING_INDEX_CONCURRENCY = 4;
const EMBEDDING_RETRY_MAX_ATTEMPTS = 3;
const EMBEDDING_RETRY_BASE_DELAY_MS = 500;
const EMBEDDING_RETRY_MAX_DELAY_MS = 8000;
const BATCH_FAILURE_LIMIT = 2;
const log = createSubsystemLogger("memory");
@@ -127,6 +128,10 @@ export class MemoryIndexManager {
    pollIntervalMs: number;
    timeoutMs: number;
  };
  private batchFailureCount = 0;
  private batchFailureLastError?: string;
  private batchFailureLastProvider?: string;
  private batchFailureLock: Promise<void> = Promise.resolve();
  private db: DatabaseSync;
  private readonly sources: Set<MemorySource>;
  private providerKey: string;
@@ -419,6 +424,17 @@ export class MemoryIndexManager {
      loadError?: string;
      dims?: number;
    };
    batch?: {
      enabled: boolean;
      failures: number;
      limit: number;
      wait: boolean;
      concurrency: number;
      pollIntervalMs: number;
      timeoutMs: number;
      lastError?: string;
      lastProvider?: string;
    };
  } {
    const sourceFilter = this.buildSourceFilter();
    const files = this.db
@@ -498,6 +514,17 @@ export class MemoryIndexManager {
        loadError: this.vector.loadError,
        dims: this.vector.dims,
      },
      batch: {
        enabled: this.batch.enabled,
        failures: this.batchFailureCount,
        limit: BATCH_FAILURE_LIMIT,
        wait: this.batch.wait,
        concurrency: this.batch.concurrency,
        pollIntervalMs: this.batch.pollIntervalMs,
        timeoutMs: this.batch.timeoutMs,
        lastError: this.batchFailureLastError,
        lastProvider: this.batchFailureLastProvider,
      },
    };
  }
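
The new `batch` block makes failure state observable without reaching into private fields. A minimal consumer sketch (assuming `manager` is a `MemoryIndexManager` instance, e.g. for a CLI status line):

const status = manager.status();
if (status.batch) {
  const { enabled, failures, limit, lastProvider, lastError } = status.batch;
  const head = `batch: ${enabled ? "on" : "off"} (${failures}/${limit} failures)`;
  const tail = lastError ? ` last ${lastProvider ?? "unknown"} error: ${lastError}` : "";
  console.log(head + tail);
}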
@@ -1538,7 +1565,8 @@ export class MemoryIndexManager {
    entry: MemoryFileEntry | SessionFileEntry,
    source: MemorySource,
  ): Promise<number[][]> {
    const openAi = this.openAi;
    if (!openAi) {
      return this.embedChunksInBatches(chunks);
    }
    if (chunks.length === 0) return [];
@@ -1576,16 +1604,23 @@ export class MemoryIndexManager {
        },
      });
    }
    const batchResult = await this.runBatchWithFallback({
      provider: "openai",
      run: async () =>
        await runOpenAiEmbeddingBatches({
          openAi,
          agentId: this.agentId,
          requests,
          wait: this.batch.wait,
          concurrency: this.batch.concurrency,
          pollIntervalMs: this.batch.pollIntervalMs,
          timeoutMs: this.batch.timeoutMs,
          debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }),
        }),
      fallback: async () => await this.embedChunksInBatches(chunks),
    });
    if (Array.isArray(batchResult)) return batchResult;
    const byCustomId = batchResult;

    const toCache: Array<{ hash: string; embedding: number[] }> = [];
    for (const [customId, embedding] of byCustomId.entries()) {
@@ -1603,7 +1638,8 @@ export class MemoryIndexManager {
    entry: MemoryFileEntry | SessionFileEntry,
    source: MemorySource,
  ): Promise<number[][]> {
    const gemini = this.gemini;
    if (!gemini) {
      return this.embedChunksInBatches(chunks);
    }
    if (chunks.length === 0) return [];
@@ -1638,16 +1674,23 @@ export class MemoryIndexManager {
      });
    }
    const batchResult = await this.runBatchWithFallback({
      provider: "gemini",
      run: async () =>
        await runGeminiEmbeddingBatches({
          gemini,
          agentId: this.agentId,
          requests,
          wait: this.batch.wait,
          concurrency: this.batch.concurrency,
          pollIntervalMs: this.batch.pollIntervalMs,
          timeoutMs: this.batch.timeoutMs,
          debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }),
        }),
      fallback: async () => await this.embedChunksInBatches(chunks),
    });
    if (Array.isArray(batchResult)) return batchResult;
    const byCustomId = batchResult;

    const toCache: Array<{ hash: string; embedding: number[] }> = [];
    for (const [customId, embedding] of byCustomId.entries()) {
@@ -1717,6 +1760,111 @@ export class MemoryIndexManager {
    return results;
  }

  private async withBatchFailureLock<T>(fn: () => Promise<T>): Promise<T> {
    let release: () => void;
    const wait = this.batchFailureLock;
    this.batchFailureLock = new Promise<void>((resolve) => {
      release = resolve;
    });
    await wait;
    try {
      return await fn();
    } finally {
      release!();
    }
  }

  private async resetBatchFailureCount(): Promise<void> {
    await this.withBatchFailureLock(async () => {
      if (this.batchFailureCount > 0) {
        log.debug("memory embeddings: batch recovered; resetting failure count");
      }
      this.batchFailureCount = 0;
      this.batchFailureLastError = undefined;
      this.batchFailureLastProvider = undefined;
    });
  }

  private async recordBatchFailure(params: {
    provider: string;
    message: string;
    attempts?: number;
    forceDisable?: boolean;
  }): Promise<{ disabled: boolean; count: number }> {
    return await this.withBatchFailureLock(async () => {
      if (!this.batch.enabled) {
        return { disabled: true, count: this.batchFailureCount };
      }
      const increment = params.forceDisable
        ? BATCH_FAILURE_LIMIT
        : Math.max(1, params.attempts ?? 1);
      this.batchFailureCount += increment;
      this.batchFailureLastError = params.message;
      this.batchFailureLastProvider = params.provider;
      const disabled = params.forceDisable || this.batchFailureCount >= BATCH_FAILURE_LIMIT;
      if (disabled) {
        this.batch.enabled = false;
      }
      return { disabled, count: this.batchFailureCount };
    });
  }

  private isBatchTimeoutError(message: string): boolean {
    return /timed out|timeout/i.test(message);
  }

  private async runBatchWithTimeoutRetry<T>(params: {
    provider: string;
    run: () => Promise<T>;
  }): Promise<T> {
    try {
      return await params.run();
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      if (this.isBatchTimeoutError(message)) {
        log.warn(`memory embeddings: ${params.provider} batch timed out; retrying once`);
        try {
          return await params.run();
        } catch (retryErr) {
          (retryErr as { batchAttempts?: number }).batchAttempts = 2;
          throw retryErr;
        }
      }
      throw err;
    }
  }
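
Note the interaction with the failure counter: the retry path tags `batchAttempts = 2`, and `recordBatchFailure` increments by the attempt count, so with `BATCH_FAILURE_LIMIT = 2` a batch that times out and then fails its retry disables batching within a single sync. A self-contained sketch of that counter math (names local to this example):

const LIMIT = 2; // mirrors BATCH_FAILURE_LIMIT
let failures = 0;
let enabled = true;

function recordFailure(attempts: number, forceDisable = false) {
  failures += forceDisable ? LIMIT : Math.max(1, attempts);
  if (forceDisable || failures >= LIMIT) enabled = false;
}

recordFailure(2); // one timeout plus one failed retry
console.log(enabled); // false: disabled after a single sync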
  private async runBatchWithFallback<T>(params: {
    provider: string;
    run: () => Promise<T>;
    fallback: () => Promise<number[][]>;
  }): Promise<T | number[][]> {
    if (!this.batch.enabled) {
      return await params.fallback();
    }
    try {
      const result = await this.runBatchWithTimeoutRetry({
        provider: params.provider,
        run: params.run,
      });
      await this.resetBatchFailureCount();
      return result;
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      const attempts = (err as { batchAttempts?: number }).batchAttempts ?? 1;
      const forceDisable = /asyncBatchEmbedContent not available/i.test(message);
      const failure = await this.recordBatchFailure({
        provider: params.provider,
        message,
        attempts,
        forceDisable,
      });
      const suffix = failure.disabled ? "disabling batch" : "keeping batch enabled";
      log.warn(
        `memory embeddings: ${params.provider} batch failed (${failure.count}/${BATCH_FAILURE_LIMIT}); ${suffix}; falling back to non-batch embeddings: ${message}`,
      );
      return await params.fallback();
    }
  }
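
Callers disambiguate the `T | number[][]` union with `Array.isArray`: the batch path resolves to a map keyed by `custom_id`, while the fallback returns plain embedding rows. A hedged sketch of the call-site pattern (`someBatchRun` is a stand-in, not a real helper):

const batchResult = await this.runBatchWithFallback({
  provider: "openai",
  run: async () => await someBatchRun(requests), // resolves to Map<string, number[]>
  fallback: async () => await this.embedChunksInBatches(chunks), // plain number[][]
});
if (Array.isArray(batchResult)) return batchResult; // fallback already embedded everything
const byCustomId = batchResult; // Map from the batch path; merged by custom_id afterwards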
  private getIndexConcurrency(): number {
    return this.batch.enabled ? this.batch.concurrency : EMBEDDING_INDEX_CONCURRENCY;
  }