feat: speed up memory batch indexing
@@ -8,6 +8,8 @@ Docs: https://docs.clawd.bot
- Tools: allow `sessions_spawn` to override thinking level for sub-agent runs.
- Channels: unify thread/topic allowlist matching + command/mention gating helpers across core providers.
- Models: add Qwen Portal OAuth provider support. (#1120) — thanks @mukhtharcm.
- Memory: add `--verbose` logging for memory status + batch indexing details.
- Memory: allow parallel OpenAI batch indexing jobs (default concurrency: 2).

### Fixes

- Memory: apply OpenAI batch defaults even without explicit remote config.

@@ -18,6 +18,11 @@ Related:
clawdbot memory status
clawdbot memory status --deep
clawdbot memory status --deep --index
clawdbot memory status --deep --index --verbose
clawdbot memory index
clawdbot memory search "release checklist"
```

## Options

- `--verbose`: emit debug logs during memory probes and indexing.

@@ -111,8 +111,16 @@ If you don't want to set an API key, use `memorySearch.provider = "local"` or se
Batch indexing (OpenAI only):
- Enabled by default for OpenAI embeddings. Set `agents.defaults.memorySearch.remote.batch.enabled = false` to disable.
- Default behavior waits for batch completion; tune `remote.batch.wait`, `remote.batch.pollIntervalMs`, and `remote.batch.timeoutMinutes` if needed (see the sketch after this list).
- Set `remote.batch.concurrency` to control how many batch jobs we submit in parallel (default: 2).
- Batch mode currently applies only when `memorySearch.provider = "openai"` and uses your OpenAI API key.

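For reference, a minimal config sketch pulling together every batch knob documented in this change, each shown with its documented default. The nesting follows the `agents.defaults.memorySearch` paths used by the config schema; treat this as an illustrative sketch, not a complete config.

```json5
agents: {
  defaults: {
    memorySearch: {
      provider: "openai",
      remote: {
        batch: {
          enabled: true,        // use the OpenAI Batch API for embeddings (default: true)
          wait: true,           // wait for each batch job to complete (default: true)
          concurrency: 2,       // max batch jobs submitted in parallel (default: 2)
          pollIntervalMs: 5000, // how often to poll batch status (default: 5000)
          timeoutMinutes: 60,   // give up waiting after this long (default: 60)
        },
      },
    },
  },
},
```
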
Why OpenAI batch is fast + cheap:
- For large backfills, OpenAI is typically the fastest option we support because we can submit many embedding requests in a single batch job and let OpenAI process them asynchronously.
- OpenAI offers discounted pricing for Batch API workloads, so large indexing runs are usually cheaper than sending the same requests synchronously.
- See the OpenAI Batch API docs and pricing for details:
  - https://platform.openai.com/docs/api-reference/batch
  - https://platform.openai.com/pricing

Config example:

```json5
@@ -123,7 +131,7 @@ agents: {
    model: "text-embedding-3-small",
    fallback: "openai",
    remote: {
-     batch: { enabled: false }
+     batch: { enabled: true, concurrency: 2 }
    },
    sync: { watch: true }
  }

@@ -81,6 +81,7 @@ describe("memory search config", () => {
    expect(resolved?.remote?.batch).toEqual({
      enabled: true,
      wait: true,
      concurrency: 2,
      pollIntervalMs: 5000,
      timeoutMinutes: 60,
    });

@@ -133,6 +134,7 @@ describe("memory search config", () => {
    batch: {
      enabled: true,
      wait: true,
      concurrency: 2,
      pollIntervalMs: 5000,
      timeoutMinutes: 60,
    },

@@ -17,6 +17,7 @@ export type ResolvedMemorySearchConfig = {
  batch?: {
    enabled: boolean;
    wait: boolean;
    concurrency: number;
    pollIntervalMs: number;
    timeoutMinutes: number;
  };

@@ -99,6 +100,10 @@ function mergeConfig(
  const batch = {
    enabled: overrides?.remote?.batch?.enabled ?? defaults?.remote?.batch?.enabled ?? true,
    wait: overrides?.remote?.batch?.wait ?? defaults?.remote?.batch?.wait ?? true,
    concurrency: Math.max(
      1,
      overrides?.remote?.batch?.concurrency ?? defaults?.remote?.batch?.concurrency ?? 2,
    ),
    pollIntervalMs:
      overrides?.remote?.batch?.pollIntervalMs ?? defaults?.remote?.batch?.pollIntervalMs ?? 5000,
    timeoutMinutes:

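The merge resolves each batch field as explicit override, then configured default, then built-in fallback, and clamps concurrency to at least one job. A small illustrative sketch of that rule (hypothetical helper, not part of the commit):

```ts
// Hypothetical stand-alone illustration of the precedence + clamping above.
function resolveConcurrency(override?: number, configured?: number): number {
  // Override wins, then the configured default, then the built-in default of 2;
  // Math.max(1, ...) guarantees we never end up with zero parallel batch jobs.
  return Math.max(1, override ?? configured ?? 2);
}

resolveConcurrency();     // 2 (built-in default)
resolveConcurrency(0, 8); // 1 (clamped up to a single job)
resolveConcurrency(4, 8); // 4 (explicit override wins)
```
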
@@ -17,10 +17,12 @@ vi.mock("../agents/agent-scope.js", () => ({
  resolveDefaultAgentId,
}));

-afterEach(() => {
+afterEach(async () => {
  vi.restoreAllMocks();
  getMemorySearchManager.mockReset();
  process.exitCode = undefined;
  const { setVerbose } = await import("../globals.js");
  setVerbose(false);
});

describe("memory cli", () => {

@@ -135,6 +137,36 @@ describe("memory cli", () => {
    expect(close).toHaveBeenCalled();
  });

  it("enables verbose logging with --verbose", async () => {
    const { registerMemoryCli } = await import("./memory-cli.js");
    const { isVerbose } = await import("../globals.js");
    const close = vi.fn(async () => {});
    getMemorySearchManager.mockResolvedValueOnce({
      manager: {
        probeVectorAvailability: vi.fn(async () => true),
        status: () => ({
          files: 0,
          chunks: 0,
          dirty: false,
          workspaceDir: "/tmp/clawd",
          dbPath: "/tmp/memory.sqlite",
          provider: "openai",
          model: "text-embedding-3-small",
          requestedProvider: "openai",
          vector: { enabled: true, available: true },
        }),
        close,
      },
    });

    const program = new Command();
    program.name("test");
    registerMemoryCli(program);
    await program.parseAsync(["memory", "status", "--verbose"], { from: "user" });

    expect(isVerbose()).toBe(true);
  });

  it("logs close failure after status", async () => {
    const { registerMemoryCli } = await import("./memory-cli.js");
    const { defaultRuntime } = await import("../runtime.js");

@@ -2,6 +2,7 @@ import type { Command } from "commander";

import { resolveDefaultAgentId } from "../agents/agent-scope.js";
import { loadConfig } from "../config/config.js";
import { setVerbose } from "../globals.js";
import { withProgress, withProgressTotals } from "./progress.js";
import { formatErrorMessage, withManager } from "./cli-utils.js";
import { getMemorySearchManager, type MemorySearchManagerResult } from "../memory/index.js";

@@ -14,6 +15,7 @@ type MemoryCommandOptions = {
  json?: boolean;
  deep?: boolean;
  index?: boolean;
  verbose?: boolean;
};

type MemoryManager = NonNullable<MemorySearchManagerResult["manager"]>;

@@ -41,7 +43,9 @@ export function registerMemoryCli(program: Command) {
    .option("--json", "Print JSON")
    .option("--deep", "Probe embedding provider availability")
    .option("--index", "Reindex if dirty (implies --deep)")
    .option("--verbose", "Verbose logging", false)
    .action(async (opts: MemoryCommandOptions) => {
      setVerbose(Boolean(opts.verbose));
      const cfg = loadConfig();
      const agentId = resolveAgent(cfg, opts.agent);
      await withManager<MemoryManager>({

@@ -175,6 +175,7 @@ const FIELD_LABELS: Record<string, string> = {
  "agents.defaults.memorySearch.remote.baseUrl": "Remote Embedding Base URL",
  "agents.defaults.memorySearch.remote.apiKey": "Remote Embedding API Key",
  "agents.defaults.memorySearch.remote.headers": "Remote Embedding Headers",
  "agents.defaults.memorySearch.remote.batch.concurrency": "Remote Batch Concurrency",
  "agents.defaults.memorySearch.model": "Memory Search Model",
  "agents.defaults.memorySearch.fallback": "Memory Search Fallback",
  "agents.defaults.memorySearch.local.modelPath": "Local Embedding Model Path",

@@ -370,6 +371,8 @@ const FIELD_HELP: Record<string, string> = {
    "Enable OpenAI Batch API for memory embeddings (default: true).",
  "agents.defaults.memorySearch.remote.batch.wait":
    "Wait for OpenAI batch completion when indexing (default: true).",
  "agents.defaults.memorySearch.remote.batch.concurrency":
    "Max concurrent OpenAI batch jobs for memory indexing (default: 2).",
  "agents.defaults.memorySearch.remote.batch.pollIntervalMs":
    "Polling interval in ms for OpenAI batch status (default: 5000).",
  "agents.defaults.memorySearch.remote.batch.timeoutMinutes":

@@ -163,6 +163,8 @@ export type MemorySearchConfig = {
  enabled?: boolean;
  /** Wait for batch completion (default: true). */
  wait?: boolean;
  /** Max concurrent batch jobs (default: 2). */
  concurrency?: number;
  /** Poll interval in ms (default: 5000). */
  pollIntervalMs?: number;
  /** Timeout in minutes (default: 60). */

@@ -210,6 +210,7 @@ export const MemorySearchSchema = z
  .object({
    enabled: z.boolean().optional(),
    wait: z.boolean().optional(),
    concurrency: z.number().int().positive().optional(),
    pollIntervalMs: z.number().int().nonnegative().optional(),
    timeoutMinutes: z.number().int().positive().optional(),
  })

@@ -137,6 +137,7 @@ export class MemoryIndexManager {
  private readonly batch: {
    enabled: boolean;
    wait: boolean;
    concurrency: number;
    pollIntervalMs: number;
    timeoutMs: number;
  };

@@ -234,6 +235,7 @@ export class MemoryIndexManager {
    this.batch = {
      enabled: Boolean(batch?.enabled && this.openAi && this.provider.id === "openai"),
      wait: batch?.wait ?? true,
      concurrency: Math.max(1, batch?.concurrency ?? 2),
      pollIntervalMs: batch?.pollIntervalMs ?? 5000,
      timeoutMs: (batch?.timeoutMinutes ?? 60) * 60 * 1000,
    };

@@ -730,6 +732,12 @@ export class MemoryIndexManager {
    const fileEntries = await Promise.all(
      files.map(async (file) => buildFileEntry(file, this.workspaceDir)),
    );
    log.debug("memory sync: indexing memory files", {
      files: fileEntries.length,
      needsFullReindex: params.needsFullReindex,
      batch: this.batch.enabled,
      concurrency: this.getIndexConcurrency(),
    });
    const activePaths = new Set(fileEntries.map((entry) => entry.path));
    if (params.progress) {
      params.progress.total += fileEntries.length;

@@ -782,6 +790,13 @@ export class MemoryIndexManager {
    const files = await this.listSessionFiles();
    const activePaths = new Set(files.map((file) => this.sessionPathForFile(file)));
    const indexAll = params.needsFullReindex || this.sessionsDirtyFiles.size === 0;
    log.debug("memory sync: indexing session files", {
      files: files.length,
      indexAll,
      dirtyFiles: this.sessionsDirtyFiles.size,
      batch: this.batch.enabled,
      concurrency: this.getIndexConcurrency(),
    });
    if (params.progress) {
      params.progress.total += files.length;
      params.progress.report({

@@ -1270,6 +1285,7 @@ export class MemoryIndexManager {
      if (Date.now() - start > this.batch.timeoutMs) {
        throw new Error(`openai batch ${batchId} timed out after ${this.batch.timeoutMs}ms`);
      }
      log.debug(`openai batch ${batchId} ${state}; waiting ${this.batch.pollIntervalMs}ms`);
      await new Promise((resolve) => setTimeout(resolve, this.batch.pollIntervalMs));
      current = undefined;
    }

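The loop above is a plain poll-with-timeout; a generic, self-contained sketch of that pattern (the `fetchStatus` callback and names here are hypothetical stand-ins, not the manager's actual API):

```ts
// Hypothetical generic poller: check status every pollIntervalMs until it is
// "completed", and give up with an error once timeoutMs has elapsed.
async function pollUntilCompleted(
  fetchStatus: () => Promise<string>,
  pollIntervalMs: number,
  timeoutMs: number,
): Promise<void> {
  const start = Date.now();
  for (;;) {
    const state = await fetchStatus();
    if (state === "completed") return;
    if (Date.now() - start > timeoutMs) {
      throw new Error(`timed out after ${timeoutMs}ms (last state: ${state})`);
    }
    await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
  }
}
```
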
@@ -1287,13 +1303,30 @@ export class MemoryIndexManager {

    const { requests, mapping } = this.buildOpenAiBatchRequests(chunks, entry, source);
    const groups = this.splitOpenAiBatchRequests(requests);
    log.debug("memory embeddings: openai batch submit", {
      source,
      chunks: chunks.length,
      requests: requests.length,
      groups: groups.length,
      wait: this.batch.wait,
      concurrency: this.batch.concurrency,
      pollIntervalMs: this.batch.pollIntervalMs,
      timeoutMs: this.batch.timeoutMs,
    });
    const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);

-   for (const group of groups) {
+   const tasks = groups.map((group, groupIndex) => async () => {
      const batchInfo = await this.submitOpenAiBatch(group);
      if (!batchInfo.id) {
        throw new Error("openai batch create failed: missing batch id");
      }
      log.debug("memory embeddings: openai batch created", {
        batchId: batchInfo.id,
        status: batchInfo.status,
        group: groupIndex + 1,
        groups: groups.length,
        requests: group.length,
      });
      if (!this.batch.wait && batchInfo.status !== "completed") {
        throw new Error(
          `openai batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`,

@@ -1349,7 +1382,8 @@ export class MemoryIndexManager {
          `openai batch ${batchInfo.id} missing ${remaining.size} embedding responses`,
        );
      }
    }
    });
    await this.runWithConcurrency(tasks, this.batch.concurrency);

    return embeddings;
  }

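`runWithConcurrency` itself is not shown in this diff; a minimal sketch of how such a task-pool helper is commonly written (hypothetical implementation under that assumption, not the project's actual code):

```ts
// Hypothetical helper: run async task factories with at most `limit` in flight.
async function runWithConcurrency(
  tasks: Array<() => Promise<void>>,
  limit: number,
): Promise<void> {
  let next = 0;
  // Spawn up to `limit` workers; each worker pulls the next pending task until none remain.
  const workers = Array.from(
    { length: Math.max(1, Math.min(limit, tasks.length)) },
    async () => {
      while (next < tasks.length) {
        const index = next;
        next += 1;
        await tasks[index]();
      }
    },
  );
  await Promise.all(workers);
}
```
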
@@ -1412,7 +1446,7 @@ export class MemoryIndexManager {
  }

  private getIndexConcurrency(): number {
-   return this.batch.enabled ? 1 : EMBEDDING_INDEX_CONCURRENCY;
+   return this.batch.enabled ? this.batch.concurrency : EMBEDDING_INDEX_CONCURRENCY;
  }

  private async indexFile(