feat: add memory indexing progress options
This commit is contained in:
@@ -28,3 +28,5 @@ clawdbot memory search "release checklist"
|
||||
## Options
|
||||
|
||||
- `--verbose`: emit debug logs during memory probes and indexing.
|
||||
- `--index-mode auto|batch|direct`: override batch usage when indexing (`direct` favors speed; `batch` favors OpenAI Batch pricing).
|
||||
- `--progress auto|line|log|none`: progress output mode (`log` prints updates even without a TTY).
|
||||
|
||||
@@ -248,6 +248,69 @@ describe("memory cli", () => {
|
||||
expect(close).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("applies index mode overrides", async () => {
|
||||
const { registerMemoryCli } = await import("./memory-cli.js");
|
||||
const close = vi.fn(async () => {});
|
||||
const sync = vi.fn(async () => {});
|
||||
const probeEmbeddingAvailability = vi.fn(async () => ({ ok: true }));
|
||||
loadConfig.mockReturnValueOnce({
|
||||
agents: {
|
||||
defaults: {
|
||||
memorySearch: {
|
||||
remote: {
|
||||
batch: { enabled: true },
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
getMemorySearchManager.mockResolvedValueOnce({
|
||||
manager: {
|
||||
probeVectorAvailability: vi.fn(async () => true),
|
||||
probeEmbeddingAvailability,
|
||||
sync,
|
||||
status: () => ({
|
||||
files: 1,
|
||||
chunks: 1,
|
||||
dirty: false,
|
||||
workspaceDir: "/tmp/clawd",
|
||||
dbPath: "/tmp/memory.sqlite",
|
||||
provider: "openai",
|
||||
model: "text-embedding-3-small",
|
||||
requestedProvider: "openai",
|
||||
vector: { enabled: true, available: true },
|
||||
}),
|
||||
close,
|
||||
},
|
||||
});
|
||||
|
||||
const program = new Command();
|
||||
program.name("test");
|
||||
registerMemoryCli(program);
|
||||
await program.parseAsync(["memory", "status", "--index", "--index-mode", "direct"], {
|
||||
from: "user",
|
||||
});
|
||||
|
||||
expect(getMemorySearchManager).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
cfg: expect.objectContaining({
|
||||
agents: expect.objectContaining({
|
||||
defaults: expect.objectContaining({
|
||||
memorySearch: expect.objectContaining({
|
||||
remote: expect.objectContaining({
|
||||
batch: expect.objectContaining({ enabled: false }),
|
||||
}),
|
||||
}),
|
||||
}),
|
||||
}),
|
||||
}),
|
||||
}),
|
||||
);
|
||||
expect(sync).toHaveBeenCalled();
|
||||
expect(probeEmbeddingAvailability).toHaveBeenCalled();
|
||||
expect(close).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("closes manager after index", async () => {
|
||||
const { registerMemoryCli } = await import("./memory-cli.js");
|
||||
const { defaultRuntime } = await import("../runtime.js");
|
||||
@@ -266,11 +329,33 @@ describe("memory cli", () => {
|
||||
registerMemoryCli(program);
|
||||
await program.parseAsync(["memory", "index"], { from: "user" });
|
||||
|
||||
expect(sync).toHaveBeenCalledWith({ reason: "cli", force: false });
|
||||
expect(sync).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ reason: "cli", force: false, progress: expect.any(Function) }),
|
||||
);
|
||||
expect(close).toHaveBeenCalled();
|
||||
expect(log).toHaveBeenCalledWith("Memory index updated.");
|
||||
});
|
||||
|
||||
it("skips progress when --progress none", async () => {
|
||||
const { registerMemoryCli } = await import("./memory-cli.js");
|
||||
const close = vi.fn(async () => {});
|
||||
const sync = vi.fn(async () => {});
|
||||
getMemorySearchManager.mockResolvedValueOnce({
|
||||
manager: {
|
||||
sync,
|
||||
close,
|
||||
},
|
||||
});
|
||||
|
||||
const program = new Command();
|
||||
program.name("test");
|
||||
registerMemoryCli(program);
|
||||
await program.parseAsync(["memory", "index", "--progress", "none"], { from: "user" });
|
||||
|
||||
expect(sync).toHaveBeenCalledWith({ reason: "cli", force: false });
|
||||
expect(close).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("logs close failures without failing the command", async () => {
|
||||
const { registerMemoryCli } = await import("./memory-cli.js");
|
||||
const { defaultRuntime } = await import("../runtime.js");
|
||||
@@ -291,7 +376,9 @@ describe("memory cli", () => {
|
||||
registerMemoryCli(program);
|
||||
await program.parseAsync(["memory", "index"], { from: "user" });
|
||||
|
||||
expect(sync).toHaveBeenCalledWith({ reason: "cli", force: false });
|
||||
expect(sync).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ reason: "cli", force: false, progress: expect.any(Function) }),
|
||||
);
|
||||
expect(close).toHaveBeenCalled();
|
||||
expect(error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Memory manager close failed: close boom"),
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import type { Command } from "commander";
|
||||
|
||||
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
|
||||
import type { ClawdbotConfig } from "../config/config.js";
|
||||
import type { MemorySearchConfig } from "../config/types.tools.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { setVerbose } from "../globals.js";
|
||||
import { withProgress, withProgressTotals } from "./progress.js";
|
||||
@@ -21,10 +23,14 @@ type MemoryCommandOptions = {
|
||||
json?: boolean;
|
||||
deep?: boolean;
|
||||
index?: boolean;
|
||||
indexMode?: IndexMode;
|
||||
progress?: ProgressMode;
|
||||
verbose?: boolean;
|
||||
};
|
||||
|
||||
type MemoryManager = NonNullable<MemorySearchManagerResult["manager"]>;
|
||||
type IndexMode = "auto" | "batch" | "direct";
|
||||
type ProgressMode = "auto" | "line" | "log" | "none";
|
||||
|
||||
function resolveAgent(cfg: ReturnType<typeof loadConfig>, agent?: string) {
|
||||
const trimmed = agent?.trim();
|
||||
@@ -32,6 +38,68 @@ function resolveAgent(cfg: ReturnType<typeof loadConfig>, agent?: string) {
|
||||
return resolveDefaultAgentId(cfg);
|
||||
}
|
||||
|
||||
function resolveIndexMode(raw?: string): IndexMode {
|
||||
if (!raw) return "auto";
|
||||
const trimmed = raw.trim().toLowerCase();
|
||||
if (trimmed === "batch") return "batch";
|
||||
if (trimmed === "direct") return "direct";
|
||||
return "auto";
|
||||
}
|
||||
|
||||
function resolveProgressMode(raw?: string): ProgressMode {
|
||||
if (!raw) return "auto";
|
||||
const trimmed = raw.trim().toLowerCase();
|
||||
if (trimmed === "line") return "line";
|
||||
if (trimmed === "log") return "log";
|
||||
if (trimmed === "none") return "none";
|
||||
return "auto";
|
||||
}
|
||||
|
||||
function applyIndexMode(cfg: ClawdbotConfig, agentId: string, mode: IndexMode): ClawdbotConfig {
|
||||
if (mode === "auto") return cfg;
|
||||
const enabled = mode === "batch";
|
||||
const patchMemorySearch = (memorySearch?: MemorySearchConfig) => {
|
||||
const remote = memorySearch?.remote;
|
||||
const batch = remote?.batch;
|
||||
return {
|
||||
...memorySearch,
|
||||
remote: {
|
||||
...remote,
|
||||
batch: {
|
||||
...batch,
|
||||
enabled,
|
||||
},
|
||||
},
|
||||
};
|
||||
};
|
||||
const nextAgents = { ...cfg.agents };
|
||||
nextAgents.defaults = {
|
||||
...cfg.agents?.defaults,
|
||||
memorySearch: patchMemorySearch(cfg.agents?.defaults?.memorySearch),
|
||||
};
|
||||
if (cfg.agents?.list?.length) {
|
||||
nextAgents.list = cfg.agents.list.map((agent) =>
|
||||
agent.id === agentId
|
||||
? {
|
||||
...agent,
|
||||
memorySearch: patchMemorySearch(agent.memorySearch),
|
||||
}
|
||||
: agent,
|
||||
);
|
||||
}
|
||||
return { ...cfg, agents: nextAgents };
|
||||
}
|
||||
|
||||
function resolveProgressOptions(
|
||||
mode: ProgressMode,
|
||||
verbose: boolean,
|
||||
): { enabled?: boolean; fallback?: "spinner" | "line" | "log" | "none" } {
|
||||
if (mode === "none") return { enabled: false, fallback: "none" };
|
||||
if (mode === "line") return { fallback: "line" };
|
||||
if (mode === "log") return { fallback: "log" };
|
||||
return { fallback: verbose ? "line" : undefined };
|
||||
}
|
||||
|
||||
export function registerMemoryCli(program: Command) {
|
||||
const memory = program
|
||||
.command("memory")
|
||||
@@ -49,11 +117,21 @@ export function registerMemoryCli(program: Command) {
|
||||
.option("--json", "Print JSON")
|
||||
.option("--deep", "Probe embedding provider availability")
|
||||
.option("--index", "Reindex if dirty (implies --deep)")
|
||||
.option(
|
||||
"--index-mode <mode>",
|
||||
"Index mode (auto|batch|direct) when indexing",
|
||||
"auto",
|
||||
)
|
||||
.option("--progress <mode>", "Progress output (auto|line|log|none)", "auto")
|
||||
.option("--verbose", "Verbose logging", false)
|
||||
.action(async (opts: MemoryCommandOptions) => {
|
||||
setVerbose(Boolean(opts.verbose));
|
||||
const cfg = loadConfig();
|
||||
const agentId = resolveAgent(cfg, opts.agent);
|
||||
const rawCfg = loadConfig();
|
||||
const agentId = resolveAgent(rawCfg, opts.agent);
|
||||
const indexMode = resolveIndexMode(opts.indexMode);
|
||||
const progressMode = resolveProgressMode(opts.progress);
|
||||
const progressOptions = resolveProgressOptions(progressMode, Boolean(opts.verbose));
|
||||
const cfg = applyIndexMode(rawCfg, agentId, indexMode);
|
||||
await withManager<MemoryManager>({
|
||||
getManager: () => getMemorySearchManager({ cfg, agentId }),
|
||||
onMissing: (error) => defaultRuntime.log(error ?? "Memory search disabled."),
|
||||
@@ -67,20 +145,23 @@ export function registerMemoryCli(program: Command) {
|
||||
| undefined;
|
||||
let indexError: string | undefined;
|
||||
if (deep) {
|
||||
await withProgress({ label: "Checking memory…", total: 2 }, async (progress) => {
|
||||
await withProgress(
|
||||
{ label: "Checking memory…", total: 2, ...progressOptions },
|
||||
async (progress) => {
|
||||
progress.setLabel("Probing vector…");
|
||||
await manager.probeVectorAvailability();
|
||||
progress.tick();
|
||||
progress.setLabel("Probing embeddings…");
|
||||
embeddingProbe = await manager.probeEmbeddingAvailability();
|
||||
progress.tick();
|
||||
});
|
||||
},
|
||||
);
|
||||
if (opts.index) {
|
||||
await withProgressTotals(
|
||||
{
|
||||
label: "Indexing memory…",
|
||||
total: 0,
|
||||
fallback: opts.verbose ? "line" : undefined,
|
||||
...progressOptions,
|
||||
},
|
||||
async (update, progress) => {
|
||||
try {
|
||||
@@ -223,9 +304,19 @@ export function registerMemoryCli(program: Command) {
|
||||
.description("Reindex memory files")
|
||||
.option("--agent <id>", "Agent id (default: default agent)")
|
||||
.option("--force", "Force full reindex", false)
|
||||
.option(
|
||||
"--index-mode <mode>",
|
||||
"Index mode (auto|batch|direct) when indexing",
|
||||
"auto",
|
||||
)
|
||||
.option("--progress <mode>", "Progress output (auto|line|log|none)", "auto")
|
||||
.action(async (opts: MemoryCommandOptions & { force?: boolean }) => {
|
||||
const cfg = loadConfig();
|
||||
const agentId = resolveAgent(cfg, opts.agent);
|
||||
const rawCfg = loadConfig();
|
||||
const agentId = resolveAgent(rawCfg, opts.agent);
|
||||
const indexMode = resolveIndexMode(opts.indexMode);
|
||||
const progressMode = resolveProgressMode(opts.progress);
|
||||
const progressOptions = resolveProgressOptions(progressMode, Boolean(opts.verbose));
|
||||
const cfg = applyIndexMode(rawCfg, agentId, indexMode);
|
||||
await withManager<MemoryManager>({
|
||||
getManager: () => getMemorySearchManager({ cfg, agentId }),
|
||||
onMissing: (error) => defaultRuntime.log(error ?? "Memory search disabled."),
|
||||
@@ -234,7 +325,31 @@ export function registerMemoryCli(program: Command) {
|
||||
close: (manager) => manager.close(),
|
||||
run: async (manager) => {
|
||||
try {
|
||||
await manager.sync({ reason: "cli", force: opts.force });
|
||||
if (progressMode === "none") {
|
||||
await manager.sync({ reason: "cli", force: opts.force });
|
||||
} else {
|
||||
await withProgressTotals(
|
||||
{
|
||||
label: "Indexing memory…",
|
||||
total: 0,
|
||||
...progressOptions,
|
||||
},
|
||||
async (update, progress) => {
|
||||
await manager.sync({
|
||||
reason: "cli",
|
||||
force: opts.force,
|
||||
progress: (syncUpdate) => {
|
||||
update({
|
||||
completed: syncUpdate.completed,
|
||||
total: syncUpdate.total,
|
||||
label: syncUpdate.label,
|
||||
});
|
||||
if (syncUpdate.label) progress.setLabel(syncUpdate.label);
|
||||
},
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
defaultRuntime.log("Memory index updated.");
|
||||
} catch (err) {
|
||||
const message = formatErrorMessage(err);
|
||||
|
||||
@@ -10,6 +10,7 @@ import {
|
||||
} from "../../daemon/constants.js";
|
||||
import { resolveGatewayLogPaths } from "../../daemon/launchd.js";
|
||||
import { resolveNodeService } from "../../daemon/node-service.js";
|
||||
import type { GatewayServiceRuntime } from "../../daemon/service-runtime.js";
|
||||
import { isSystemdUserServiceAvailable } from "../../daemon/systemd.js";
|
||||
import { renderSystemdUnavailableHints } from "../../daemon/systemd-hints.js";
|
||||
import { resolveIsNixMode } from "../../config/paths.js";
|
||||
@@ -492,7 +493,11 @@ export async function runNodeDaemonStatus(opts: NodeDaemonStatusOptions = {}) {
|
||||
const [loaded, command, runtime] = await Promise.all([
|
||||
service.isLoaded({ env: process.env }).catch(() => false),
|
||||
service.readCommand(process.env).catch(() => null),
|
||||
service.readRuntime(process.env).catch((err) => ({ status: "unknown", detail: String(err) })),
|
||||
service
|
||||
.readRuntime(process.env)
|
||||
.catch(
|
||||
(err): GatewayServiceRuntime => ({ status: "unknown", detail: String(err) }),
|
||||
),
|
||||
]);
|
||||
|
||||
const payload = {
|
||||
|
||||
47
src/cli/progress.test.ts
Normal file
47
src/cli/progress.test.ts
Normal file
@@ -0,0 +1,47 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
import { createCliProgress } from "./progress.js";
|
||||
|
||||
describe("cli progress", () => {
|
||||
it("logs progress when non-tty and fallback=log", () => {
|
||||
const writes: string[] = [];
|
||||
const stream = {
|
||||
isTTY: false,
|
||||
write: vi.fn((chunk: string) => {
|
||||
writes.push(chunk);
|
||||
}),
|
||||
} as unknown as NodeJS.WriteStream;
|
||||
|
||||
const progress = createCliProgress({
|
||||
label: "Indexing memory...",
|
||||
total: 10,
|
||||
stream,
|
||||
fallback: "log",
|
||||
});
|
||||
progress.setPercent(50);
|
||||
progress.done();
|
||||
|
||||
const output = writes.join("");
|
||||
expect(output).toContain("Indexing memory... 0%");
|
||||
expect(output).toContain("Indexing memory... 50%");
|
||||
});
|
||||
|
||||
it("does not log without a tty when fallback is none", () => {
|
||||
const write = vi.fn();
|
||||
const stream = {
|
||||
isTTY: false,
|
||||
write,
|
||||
} as unknown as NodeJS.WriteStream;
|
||||
|
||||
const progress = createCliProgress({
|
||||
label: "Nope",
|
||||
total: 2,
|
||||
stream,
|
||||
fallback: "none",
|
||||
});
|
||||
progress.setPercent(50);
|
||||
progress.done();
|
||||
|
||||
expect(write).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -17,7 +17,7 @@ type ProgressOptions = {
|
||||
enabled?: boolean;
|
||||
delayMs?: number;
|
||||
stream?: NodeJS.WriteStream;
|
||||
fallback?: "spinner" | "line" | "none";
|
||||
fallback?: "spinner" | "line" | "log" | "none";
|
||||
};
|
||||
|
||||
export type ProgressReporter = {
|
||||
@@ -45,12 +45,14 @@ export function createCliProgress(options: ProgressOptions): ProgressReporter {
|
||||
if (activeProgress > 0) return noopReporter;
|
||||
|
||||
const stream = options.stream ?? process.stderr;
|
||||
if (!stream.isTTY) return noopReporter;
|
||||
const isTty = stream.isTTY;
|
||||
const allowLog = !isTty && options.fallback === "log";
|
||||
if (!isTty && !allowLog) return noopReporter;
|
||||
|
||||
const delayMs = typeof options.delayMs === "number" ? options.delayMs : DEFAULT_DELAY_MS;
|
||||
const canOsc = supportsOscProgress(process.env, stream.isTTY);
|
||||
const allowSpinner = options.fallback === undefined || options.fallback === "spinner";
|
||||
const allowLine = options.fallback === "line";
|
||||
const canOsc = isTty && supportsOscProgress(process.env, isTty);
|
||||
const allowSpinner = isTty && (options.fallback === undefined || options.fallback === "spinner");
|
||||
const allowLine = isTty && options.fallback === "line";
|
||||
|
||||
let started = false;
|
||||
let label = options.label;
|
||||
@@ -61,7 +63,9 @@ export function createCliProgress(options: ProgressOptions): ProgressReporter {
|
||||
options.indeterminate ?? (options.total === undefined || options.total === null);
|
||||
|
||||
activeProgress += 1;
|
||||
registerActiveProgressLine(stream);
|
||||
if (isTty) {
|
||||
registerActiveProgressLine(stream);
|
||||
}
|
||||
|
||||
const controller = canOsc
|
||||
? createOscProgressController({
|
||||
@@ -80,6 +84,23 @@ export function createCliProgress(options: ProgressOptions): ProgressReporter {
|
||||
stream.write(`${theme.accent(label)}${suffix}`);
|
||||
}
|
||||
: null;
|
||||
const renderLog = allowLog
|
||||
? (() => {
|
||||
let lastLine = "";
|
||||
let lastAt = 0;
|
||||
const throttleMs = 250;
|
||||
return () => {
|
||||
if (!started) return;
|
||||
const suffix = indeterminate ? "" : ` ${percent}%`;
|
||||
const nextLine = `${label}${suffix}`;
|
||||
const now = Date.now();
|
||||
if (nextLine === lastLine && now - lastAt < throttleMs) return;
|
||||
lastLine = nextLine;
|
||||
lastAt = now;
|
||||
stream.write(`${nextLine}\n`);
|
||||
};
|
||||
})()
|
||||
: null;
|
||||
let timer: NodeJS.Timeout | null = null;
|
||||
|
||||
const applyState = () => {
|
||||
@@ -94,6 +115,9 @@ export function createCliProgress(options: ProgressOptions): ProgressReporter {
|
||||
if (renderLine) {
|
||||
renderLine();
|
||||
}
|
||||
if (renderLog) {
|
||||
renderLog();
|
||||
}
|
||||
};
|
||||
|
||||
const start = () => {
|
||||
@@ -141,7 +165,9 @@ export function createCliProgress(options: ProgressOptions): ProgressReporter {
|
||||
if (controller) controller.clear();
|
||||
if (spin) spin.stop();
|
||||
clearActiveProgressLine();
|
||||
unregisterActiveProgressLine(stream);
|
||||
if (isTty) {
|
||||
unregisterActiveProgressLine(stream);
|
||||
}
|
||||
activeProgress = Math.max(0, activeProgress - 1);
|
||||
};
|
||||
|
||||
|
||||
@@ -91,13 +91,13 @@ export class BridgeClient {
|
||||
socket.on("connect", () => {
|
||||
this.sendHello();
|
||||
});
|
||||
socket.on("error", (err) => {
|
||||
socket.on("error", (err: Error) => {
|
||||
this.handleDisconnect(err);
|
||||
});
|
||||
socket.on("close", () => {
|
||||
this.handleDisconnect();
|
||||
});
|
||||
socket.on("data", (chunk) => {
|
||||
socket.on("data", (chunk: Buffer) => {
|
||||
this.buffer += chunk.toString("utf8");
|
||||
this.flush();
|
||||
});
|
||||
@@ -124,7 +124,7 @@ export class BridgeClient {
|
||||
}
|
||||
this.connected = false;
|
||||
this.pendingRpc.forEach((pending) => {
|
||||
pending.timer && clearTimeout(pending.timer);
|
||||
if (pending.timer) clearTimeout(pending.timer);
|
||||
pending.reject(new Error("bridge client closed"));
|
||||
});
|
||||
this.pendingRpc.clear();
|
||||
@@ -213,7 +213,7 @@ export class BridgeClient {
|
||||
this.connected = false;
|
||||
this.socket = null;
|
||||
this.pendingRpc.forEach((pending) => {
|
||||
pending.timer && clearTimeout(pending.timer);
|
||||
if (pending.timer) clearTimeout(pending.timer);
|
||||
pending.reject(err ?? new Error("bridge connection closed"));
|
||||
});
|
||||
this.pendingRpc.clear();
|
||||
@@ -286,7 +286,7 @@ export class BridgeClient {
|
||||
const res = frame as BridgeRPCResponseFrame;
|
||||
const pending = this.pendingRpc.get(res.id);
|
||||
if (pending) {
|
||||
pending.timer && clearTimeout(pending.timer);
|
||||
if (pending.timer) clearTimeout(pending.timer);
|
||||
this.pendingRpc.delete(res.id);
|
||||
pending.resolve(res);
|
||||
}
|
||||
|
||||
@@ -335,7 +335,9 @@ export async function runNodeHost(opts: NodeHostRunOptions): Promise<void> {
|
||||
|
||||
const skillBins = new SkillBinsCache(async () => {
|
||||
const res = await client.request("skills.bins", {});
|
||||
const bins = Array.isArray(res?.bins) ? res.bins.map((b) => String(b)) : [];
|
||||
const bins = Array.isArray(res?.bins)
|
||||
? res.bins.map((bin: unknown) => String(bin))
|
||||
: [];
|
||||
return bins;
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user