From 8479dc97daaf3d34133b958a16a8e7168e98d4a0 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 21 Jan 2026 10:37:52 +0000 Subject: [PATCH] fix: make session memory indexing async --- CHANGELOG.md | 2 + docs/concepts/memory.md | 22 +++- src/agents/memory-search.test.ts | 17 +++ src/agents/memory-search.ts | 26 +++- src/config/schema.ts | 8 +- src/config/types.tools.ts | 6 + src/config/zod-schema.agent-runtime.ts | 7 ++ src/memory/index.test.ts | 4 + src/memory/manager.async-search.test.ts | 82 +++++++++++++ src/memory/manager.ts | 157 ++++++++++++++++++++++-- 10 files changed, 316 insertions(+), 15 deletions(-) create mode 100644 src/memory/manager.async-search.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 12ab594e7..3d3d6f201 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ Docs: https://docs.clawd.bot - macOS: exec approvals now respect wildcard agent allowlists (`*`). - UI: remove the chat stop button and keep the composer aligned to the bottom edge. - Agents: add diagnostics cache trace config and fix cache trace logging edge cases. (#1370) — thanks @parubets. +- Agents: scrub Anthropic refusal test token from prompts and add a live refusal regression probe. +- Memory: make session memory indexing async and delta-gated to avoid blocking searches. ## 2026.1.20 diff --git a/docs/concepts/memory.md b/docs/concepts/memory.md index 03d09a19c..81320e112 100644 --- a/docs/concepts/memory.md +++ b/docs/concepts/memory.md @@ -194,7 +194,7 @@ Local mode: - File type: Markdown only (`MEMORY.md`, `memory/**/*.md`). - Index storage: per-agent SQLite at `~/.clawdbot/memory/.sqlite` (configurable via `agents.defaults.memorySearch.store.path`, supports `{agentId}` token). -- Freshness: watcher on `MEMORY.md` + `memory/` marks the index dirty (debounce 1.5s). Sync runs on session start, on first search when dirty, and optionally on an interval. +- Freshness: watcher on `MEMORY.md` + `memory/` marks the index dirty (debounce 1.5s). Sync is scheduled on session start, on search, or on an interval and runs asynchronously. Session transcripts use delta thresholds to trigger background sync. - Reindex triggers: the index stores the embedding **provider/model + endpoint fingerprint + chunking params**. If any of those change, Clawdbot automatically resets and reindexes the entire store. ### Hybrid search (BM25 + vector) @@ -299,11 +299,29 @@ agents: { Notes: - Session indexing is **opt-in** (off by default). -- Session updates are debounced and indexed lazily on the next `memory_search` (or manual `clawdbot memory index`). +- Session updates are debounced and **indexed asynchronously** once they cross delta thresholds (best-effort). +- `memory_search` never blocks on indexing; results can be slightly stale until background sync finishes. - Results still include snippets only; `memory_get` remains limited to memory files. - Session indexing is isolated per agent (only that agent’s session logs are indexed). - Session logs live on disk (`~/.clawdbot/agents//sessions/*.jsonl`). Any process/user with filesystem access can read them, so treat disk access as the trust boundary. For stricter isolation, run agents under separate OS users or hosts. +Delta thresholds (defaults shown): + +```json5 +agents: { + defaults: { + memorySearch: { + sync: { + sessions: { + deltaBytes: 100000, // ~100 KB + deltaMessages: 50 // JSONL lines + } + } + } + } +} +``` + ### SQLite vector acceleration (sqlite-vec) When the sqlite-vec extension is available, Clawdbot stores embeddings in a diff --git a/src/agents/memory-search.test.ts b/src/agents/memory-search.test.ts index 2670298b9..e6b86ea3d 100644 --- a/src/agents/memory-search.test.ts +++ b/src/agents/memory-search.test.ts @@ -136,6 +136,23 @@ describe("memory search config", () => { }); }); + it("defaults session delta thresholds", () => { + const cfg = { + agents: { + defaults: { + memorySearch: { + provider: "openai", + }, + }, + }, + }; + const resolved = resolveMemorySearchConfig(cfg, "main"); + expect(resolved?.sync.sessions).toEqual({ + deltaBytes: 100000, + deltaMessages: 50, + }); + }); + it("merges remote defaults with agent overrides", () => { const cfg = { agents: { diff --git a/src/agents/memory-search.ts b/src/agents/memory-search.ts index d42333927..c89bad422 100644 --- a/src/agents/memory-search.ts +++ b/src/agents/memory-search.ts @@ -49,6 +49,10 @@ export type ResolvedMemorySearchConfig = { watch: boolean; watchDebounceMs: number; intervalMinutes: number; + sessions: { + deltaBytes: number; + deltaMessages: number; + }; }; query: { maxResults: number; @@ -71,6 +75,8 @@ const DEFAULT_GEMINI_MODEL = "gemini-embedding-001"; const DEFAULT_CHUNK_TOKENS = 400; const DEFAULT_CHUNK_OVERLAP = 80; const DEFAULT_WATCH_DEBOUNCE_MS = 1500; +const DEFAULT_SESSION_DELTA_BYTES = 100_000; +const DEFAULT_SESSION_DELTA_MESSAGES = 50; const DEFAULT_MAX_RESULTS = 6; const DEFAULT_MIN_SCORE = 0.35; const DEFAULT_HYBRID_ENABLED = true; @@ -172,6 +178,16 @@ function mergeConfig( defaults?.sync?.watchDebounceMs ?? DEFAULT_WATCH_DEBOUNCE_MS, intervalMinutes: overrides?.sync?.intervalMinutes ?? defaults?.sync?.intervalMinutes ?? 0, + sessions: { + deltaBytes: + overrides?.sync?.sessions?.deltaBytes ?? + defaults?.sync?.sessions?.deltaBytes ?? + DEFAULT_SESSION_DELTA_BYTES, + deltaMessages: + overrides?.sync?.sessions?.deltaMessages ?? + defaults?.sync?.sessions?.deltaMessages ?? + DEFAULT_SESSION_DELTA_MESSAGES, + }, }; const query = { maxResults: overrides?.query?.maxResults ?? defaults?.query?.maxResults ?? DEFAULT_MAX_RESULTS, @@ -208,6 +224,8 @@ function mergeConfig( const normalizedVectorWeight = sum > 0 ? vectorWeight / sum : DEFAULT_HYBRID_VECTOR_WEIGHT; const normalizedTextWeight = sum > 0 ? textWeight / sum : DEFAULT_HYBRID_TEXT_WEIGHT; const candidateMultiplier = clampInt(hybrid.candidateMultiplier, 1, 20); + const deltaBytes = clampInt(sync.sessions.deltaBytes, 0, Number.MAX_SAFE_INTEGER); + const deltaMessages = clampInt(sync.sessions.deltaMessages, 0, Number.MAX_SAFE_INTEGER); return { enabled, sources, @@ -221,7 +239,13 @@ function mergeConfig( local, store, chunking: { tokens: Math.max(1, chunking.tokens), overlap }, - sync, + sync: { + ...sync, + sessions: { + deltaBytes, + deltaMessages, + }, + }, query: { ...query, minScore, diff --git a/src/config/schema.ts b/src/config/schema.ts index a0955e47f..1ba527439 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -220,6 +220,8 @@ const FIELD_LABELS: Record = { "agents.defaults.memorySearch.sync.onSearch": "Index on Search (Lazy)", "agents.defaults.memorySearch.sync.watch": "Watch Memory Files", "agents.defaults.memorySearch.sync.watchDebounceMs": "Memory Watch Debounce (ms)", + "agents.defaults.memorySearch.sync.sessions.deltaBytes": "Session Delta Bytes", + "agents.defaults.memorySearch.sync.sessions.deltaMessages": "Session Delta Messages", "agents.defaults.memorySearch.query.maxResults": "Memory Search Max Results", "agents.defaults.memorySearch.query.minScore": "Memory Search Min Score", "agents.defaults.memorySearch.query.hybrid.enabled": "Memory Search Hybrid", @@ -474,8 +476,12 @@ const FIELD_HELP: Record = { "agents.defaults.memorySearch.cache.maxEntries": "Optional cap on cached embeddings (best-effort).", "agents.defaults.memorySearch.sync.onSearch": - "Lazy sync: reindex on first search after a change.", + "Lazy sync: schedule a reindex on search after changes.", "agents.defaults.memorySearch.sync.watch": "Watch memory files for changes (chokidar).", + "agents.defaults.memorySearch.sync.sessions.deltaBytes": + "Minimum appended bytes before session transcripts trigger reindex (default: 100000).", + "agents.defaults.memorySearch.sync.sessions.deltaMessages": + "Minimum appended JSONL lines before session transcripts trigger reindex (default: 50).", "plugins.enabled": "Enable plugin/extension loading (default: true).", "plugins.allow": "Optional allowlist of plugin ids; when set, only listed plugins load.", "plugins.deny": "Optional denylist of plugin ids; deny wins over allowlist.", diff --git a/src/config/types.tools.ts b/src/config/types.tools.ts index 7d7b3a57d..b68d9f5dd 100644 --- a/src/config/types.tools.ts +++ b/src/config/types.tools.ts @@ -244,6 +244,12 @@ export type MemorySearchConfig = { watch?: boolean; watchDebounceMs?: number; intervalMinutes?: number; + sessions?: { + /** Minimum appended bytes before session transcripts are reindexed. */ + deltaBytes?: number; + /** Minimum appended JSONL lines before session transcripts are reindexed. */ + deltaMessages?: number; + }; }; /** Query behavior. */ query?: { diff --git a/src/config/zod-schema.agent-runtime.ts b/src/config/zod-schema.agent-runtime.ts index 716ba02d9..40dea6eb4 100644 --- a/src/config/zod-schema.agent-runtime.ts +++ b/src/config/zod-schema.agent-runtime.ts @@ -305,6 +305,13 @@ export const MemorySearchSchema = z watch: z.boolean().optional(), watchDebounceMs: z.number().int().nonnegative().optional(), intervalMinutes: z.number().int().nonnegative().optional(), + sessions: z + .object({ + deltaBytes: z.number().int().nonnegative().optional(), + deltaMessages: z.number().int().nonnegative().optional(), + }) + .strict() + .optional(), }) .strict() .optional(), diff --git a/src/memory/index.test.ts b/src/memory/index.test.ts index 498cf7119..5510e1688 100644 --- a/src/memory/index.test.ts +++ b/src/memory/index.test.ts @@ -153,6 +153,7 @@ describe("memory index", () => { expect(second.manager).not.toBeNull(); if (!second.manager) throw new Error("manager missing"); manager = second.manager; + await second.manager.sync({ reason: "test" }); const results = await second.manager.search("alpha"); expect(results.length).toBeGreaterThan(0); }); @@ -250,6 +251,7 @@ describe("memory index", () => { const status = manager.status(); if (!status.fts?.available) return; + await manager.sync({ force: true }); const results = await manager.search("zebra"); expect(results.length).toBeGreaterThan(0); expect(results[0]?.path).toContain("memory/2026-01-12.md"); @@ -298,6 +300,7 @@ describe("memory index", () => { const status = manager.status(); if (!status.fts?.available) return; + await manager.sync({ force: true }); const results = await manager.search("alpha beta id123"); expect(results.length).toBeGreaterThan(0); const paths = results.map((r) => r.path); @@ -351,6 +354,7 @@ describe("memory index", () => { const status = manager.status(); if (!status.fts?.available) return; + await manager.sync({ force: true }); const results = await manager.search("alpha beta id123"); expect(results.length).toBeGreaterThan(0); const paths = results.map((r) => r.path); diff --git a/src/memory/manager.async-search.test.ts b/src/memory/manager.async-search.test.ts new file mode 100644 index 000000000..1efe714e6 --- /dev/null +++ b/src/memory/manager.async-search.test.ts @@ -0,0 +1,82 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { getMemorySearchManager, type MemoryIndexManager } from "./index.js"; + +const embedBatch = vi.fn(async () => []); +const embedQuery = vi.fn(async () => [0.2, 0.2, 0.2]); + +vi.mock("./embeddings.js", () => ({ + createEmbeddingProvider: async () => ({ + requestedProvider: "openai", + provider: { + id: "openai", + model: "text-embedding-3-small", + embedQuery, + embedBatch, + }, + openAi: { + baseUrl: "https://api.openai.com/v1", + headers: { Authorization: "Bearer test", "Content-Type": "application/json" }, + model: "text-embedding-3-small", + }, + }), +})); + +describe("memory search async sync", () => { + let workspaceDir: string; + let indexPath: string; + let manager: MemoryIndexManager | null = null; + + beforeEach(async () => { + workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-mem-async-")); + indexPath = path.join(workspaceDir, "index.sqlite"); + await fs.mkdir(path.join(workspaceDir, "memory")); + await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-07.md"), "hello\n"); + }); + + afterEach(async () => { + vi.unstubAllGlobals(); + if (manager) { + await manager.close(); + manager = null; + } + await fs.rm(workspaceDir, { recursive: true, force: true }); + }); + + it("does not await sync when searching", async () => { + const cfg = { + agents: { + defaults: { + workspace: workspaceDir, + memorySearch: { + provider: "openai", + model: "text-embedding-3-small", + store: { path: indexPath }, + sync: { watch: false, onSessionStart: false, onSearch: true }, + query: { minScore: 0 }, + remote: { batch: { enabled: true, wait: true } }, + }, + }, + list: [{ id: "main", default: true }], + }, + }; + + const result = await getMemorySearchManager({ cfg, agentId: "main" }); + expect(result.manager).not.toBeNull(); + if (!result.manager) throw new Error("manager missing"); + manager = result.manager; + + const pending = new Promise(() => {}); + (manager as unknown as { sync: () => Promise }).sync = vi.fn(async () => pending); + + const resolved = await Promise.race([ + manager.search("hello").then(() => true), + new Promise((resolve) => setTimeout(() => resolve(false), 50)), + ]); + expect(resolved).toBe(true); + }); +}); diff --git a/src/memory/manager.ts b/src/memory/manager.ts index 0e1a173bc..56e5bb6e9 100644 --- a/src/memory/manager.ts +++ b/src/memory/manager.ts @@ -101,6 +101,7 @@ const EMBEDDING_RETRY_MAX_ATTEMPTS = 3; const EMBEDDING_RETRY_BASE_DELAY_MS = 500; const EMBEDDING_RETRY_MAX_DELAY_MS = 8000; const BATCH_FAILURE_LIMIT = 2; +const SESSION_DELTA_READ_CHUNK_BYTES = 64 * 1024; const log = createSubsystemLogger("memory"); @@ -158,6 +159,11 @@ export class MemoryIndexManager { private dirty = false; private sessionsDirty = false; private sessionsDirtyFiles = new Set(); + private sessionPendingFiles = new Set(); + private sessionDeltas = new Map< + string, + { lastSize: number; pendingBytes: number; pendingMessages: number } + >(); private sessionWarm = new Set(); private syncing: Promise | null = null; @@ -234,9 +240,6 @@ export class MemoryIndexManager { this.ensureSessionListener(); this.ensureIntervalSync(); this.dirty = this.sources.has("memory"); - if (this.sources.has("sessions")) { - this.sessionsDirty = true; - } this.batch = this.resolveBatchConfig(); } @@ -244,7 +247,9 @@ export class MemoryIndexManager { if (!this.settings.sync.onSessionStart) return; const key = sessionKey?.trim() || ""; if (key && this.sessionWarm.has(key)) return; - await this.sync({ reason: "session-start" }); + void this.sync({ reason: "session-start" }).catch((err) => { + log.warn(`memory sync failed (session-start): ${String(err)}`); + }); if (key) this.sessionWarm.add(key); } @@ -256,9 +261,11 @@ export class MemoryIndexManager { sessionKey?: string; }, ): Promise { - await this.warmSession(opts?.sessionKey); + void this.warmSession(opts?.sessionKey); if (this.settings.sync.onSearch && (this.dirty || this.sessionsDirty)) { - await this.sync({ reason: "search" }); + void this.sync({ reason: "search" }).catch((err) => { + log.warn(`memory sync failed (search): ${String(err)}`); + }); } const cleaned = query.trim(); if (!cleaned) return []; @@ -774,14 +781,135 @@ export class MemoryIndexManager { } private scheduleSessionDirty(sessionFile: string) { - this.sessionsDirtyFiles.add(sessionFile); + this.sessionPendingFiles.add(sessionFile); if (this.sessionWatchTimer) return; this.sessionWatchTimer = setTimeout(() => { this.sessionWatchTimer = null; - this.sessionsDirty = true; + void this.processSessionDeltaBatch().catch((err) => { + log.warn(`memory session delta failed: ${String(err)}`); + }); }, SESSION_DIRTY_DEBOUNCE_MS); } + private async processSessionDeltaBatch(): Promise { + if (this.sessionPendingFiles.size === 0) return; + const pending = Array.from(this.sessionPendingFiles); + this.sessionPendingFiles.clear(); + let shouldSync = false; + for (const sessionFile of pending) { + const delta = await this.updateSessionDelta(sessionFile); + if (!delta) continue; + const bytesThreshold = delta.deltaBytes; + const messagesThreshold = delta.deltaMessages; + const bytesHit = + bytesThreshold <= 0 ? delta.pendingBytes > 0 : delta.pendingBytes >= bytesThreshold; + const messagesHit = + messagesThreshold <= 0 + ? delta.pendingMessages > 0 + : delta.pendingMessages >= messagesThreshold; + if (!bytesHit && !messagesHit) continue; + this.sessionsDirtyFiles.add(sessionFile); + this.sessionsDirty = true; + delta.pendingBytes = + bytesThreshold > 0 ? Math.max(0, delta.pendingBytes - bytesThreshold) : 0; + delta.pendingMessages = + messagesThreshold > 0 ? Math.max(0, delta.pendingMessages - messagesThreshold) : 0; + shouldSync = true; + } + if (shouldSync) { + void this.sync({ reason: "session-delta" }).catch((err) => { + log.warn(`memory sync failed (session-delta): ${String(err)}`); + }); + } + } + + private async updateSessionDelta(sessionFile: string): Promise<{ + deltaBytes: number; + deltaMessages: number; + pendingBytes: number; + pendingMessages: number; + } | null> { + const thresholds = this.settings.sync.sessions; + if (!thresholds) return null; + let stat: { size: number }; + try { + stat = await fs.stat(sessionFile); + } catch { + return null; + } + const size = stat.size; + let state = this.sessionDeltas.get(sessionFile); + if (!state) { + state = { lastSize: 0, pendingBytes: 0, pendingMessages: 0 }; + this.sessionDeltas.set(sessionFile, state); + } + const deltaBytes = Math.max(0, size - state.lastSize); + if (deltaBytes === 0 && size === state.lastSize) { + return { + deltaBytes: thresholds.deltaBytes, + deltaMessages: thresholds.deltaMessages, + pendingBytes: state.pendingBytes, + pendingMessages: state.pendingMessages, + }; + } + if (size < state.lastSize) { + state.lastSize = size; + state.pendingBytes += size; + const shouldCountMessages = + thresholds.deltaMessages > 0 && + (thresholds.deltaBytes <= 0 || state.pendingBytes < thresholds.deltaBytes); + if (shouldCountMessages) { + state.pendingMessages += await this.countNewlines(sessionFile, 0, size); + } + } else { + state.pendingBytes += deltaBytes; + const shouldCountMessages = + thresholds.deltaMessages > 0 && + (thresholds.deltaBytes <= 0 || state.pendingBytes < thresholds.deltaBytes); + if (shouldCountMessages) { + state.pendingMessages += await this.countNewlines(sessionFile, state.lastSize, size); + } + state.lastSize = size; + } + this.sessionDeltas.set(sessionFile, state); + return { + deltaBytes: thresholds.deltaBytes, + deltaMessages: thresholds.deltaMessages, + pendingBytes: state.pendingBytes, + pendingMessages: state.pendingMessages, + }; + } + + private async countNewlines(absPath: string, start: number, end: number): Promise { + if (end <= start) return 0; + const handle = await fs.open(absPath, "r"); + try { + let offset = start; + let count = 0; + const buffer = Buffer.alloc(SESSION_DELTA_READ_CHUNK_BYTES); + while (offset < end) { + const toRead = Math.min(buffer.length, end - offset); + const { bytesRead } = await handle.read(buffer, 0, toRead, offset); + if (bytesRead <= 0) break; + for (let i = 0; i < bytesRead; i += 1) { + if (buffer[i] === 10) count += 1; + } + offset += bytesRead; + } + return count; + } finally { + await handle.close(); + } + } + + private resetSessionDelta(absPath: string, size: number): void { + const state = this.sessionDeltas.get(absPath); + if (!state) return; + state.lastSize = size; + state.pendingBytes = 0; + state.pendingMessages = 0; + } + private isSessionFileForAgent(sessionFile: string): boolean { if (!sessionFile) return false; const sessionsDir = resolveSessionTranscriptsDirForAgent(this.agentId); @@ -820,7 +948,8 @@ export class MemoryIndexManager { if (params?.force) return true; const reason = params?.reason; if (reason === "session-start" || reason === "watch") return false; - return this.sessionsDirty || needsFullReindex; + if (needsFullReindex) return true; + return this.sessionsDirty && this.sessionsDirtyFiles.size > 0; } private async syncMemoryFiles(params: { @@ -952,9 +1081,11 @@ export class MemoryIndexManager { total: params.progress.total, }); } + this.resetSessionDelta(absPath, entry.size); return; } await this.indexFile(entry, { source: "sessions", content: entry.content }); + this.resetSessionDelta(absPath, entry.size); if (params.progress) { params.progress.completed += 1; params.progress.report({ @@ -1056,8 +1187,10 @@ export class MemoryIndexManager { await this.syncSessionFiles({ needsFullReindex, progress: progress ?? undefined }); this.sessionsDirty = false; this.sessionsDirtyFiles.clear(); - } else if (this.sources.has("sessions")) { + } else if (this.sessionsDirtyFiles.size > 0) { this.sessionsDirty = true; + } else { + this.sessionsDirty = false; } } catch (err) { const reason = err instanceof Error ? err.message : String(err); @@ -1197,8 +1330,10 @@ export class MemoryIndexManager { await this.syncSessionFiles({ needsFullReindex: true, progress: params.progress }); this.sessionsDirty = false; this.sessionsDirtyFiles.clear(); - } else if (this.sources.has("sessions")) { + } else if (this.sessionsDirtyFiles.size > 0) { this.sessionsDirty = true; + } else { + this.sessionsDirty = false; } nextMeta = {