diff --git a/CHANGELOG.md b/CHANGELOG.md index 698bd521b..cf47073ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ Docs: https://docs.clawd.bot - Directory: unify `clawdbot directory` across channels and plugin channels. - UI: allow deleting sessions from the Control UI. - Memory: add sqlite-vec vector acceleration with CLI status details. +- Memory: add experimental session transcript indexing for memory_search (opt-in via memorySearch.experimental.sessionMemory + sources). - Skills: add user-invocable skill commands and expanded skill command registration. - Telegram: default reaction level to minimal and enable reaction notifications by default. - Telegram: allow reply-chain messages to bypass mention gating in groups. (#1038) — thanks @adityashaw2. diff --git a/docs/concepts/memory.md b/docs/concepts/memory.md index cecf8d7a8..138f475b3 100644 --- a/docs/concepts/memory.md +++ b/docs/concepts/memory.md @@ -144,6 +144,28 @@ Local mode: - Index storage: per-agent SQLite at `~/.clawdbot/state/memory/<agentId>.sqlite` (configurable via `agents.defaults.memorySearch.store.path`, supports `{agentId}` token). - Freshness: watcher on `MEMORY.md` + `memory/` marks the index dirty (debounce 1.5s). Sync runs on session start, on first search when dirty, and optionally on an interval. Reindex triggers when embedding model/provider or chunk sizes change. +### Session memory search (experimental) + +You can optionally index **session transcripts** and surface them via `memory_search`. +This is gated behind an experimental flag. + +```json5 +agents: { + defaults: { + memorySearch: { + experimental: { sessionMemory: true }, + sources: ["memory", "sessions"] + } + } +} +``` + +Notes: +- Session indexing is **opt-in** (off by default). +- Session updates are debounced and indexed lazily on the next `memory_search` (or manual `clawdbot memory index`). +- Results still include snippets only; `memory_get` remains limited to memory files. 
+- Session indexing is isolated per agent (only that agent’s session logs are indexed). + ### SQLite vector acceleration (sqlite-vec) When the sqlite-vec extension is available, Clawdbot stores embeddings in a diff --git a/src/agents/memory-search.test.ts b/src/agents/memory-search.test.ts index 204e74f97..33a0effc7 100644 --- a/src/agents/memory-search.test.ts +++ b/src/agents/memory-search.test.ts @@ -99,4 +99,42 @@ describe("memory search config", () => { headers: { "X-Default": "on" }, }); }); + + it("gates session sources behind experimental flag", () => { + const cfg = { + agents: { + defaults: { + memorySearch: { + sources: ["memory", "sessions"], + }, + }, + list: [ + { + id: "main", + default: true, + memorySearch: { + experimental: { sessionMemory: false }, + }, + }, + ], + }, + }; + const resolved = resolveMemorySearchConfig(cfg, "main"); + expect(resolved?.sources).toEqual(["memory"]); + }); + + it("allows session sources when experimental flag is enabled", () => { + const cfg = { + agents: { + defaults: { + memorySearch: { + sources: ["memory", "sessions"], + experimental: { sessionMemory: true }, + }, + }, + }, + }; + const resolved = resolveMemorySearchConfig(cfg, "main"); + expect(resolved?.sources).toContain("sessions"); + }); }); diff --git a/src/agents/memory-search.ts b/src/agents/memory-search.ts index 33f5c3ee7..f8a95f357 100644 --- a/src/agents/memory-search.ts +++ b/src/agents/memory-search.ts @@ -8,12 +8,16 @@ import { resolveAgentConfig } from "./agent-scope.js"; export type ResolvedMemorySearchConfig = { enabled: boolean; + sources: Array<"memory" | "sessions">; provider: "openai" | "local"; remote?: { baseUrl?: string; apiKey?: string; headers?: Record; }; + experimental: { + sessionMemory: boolean; + }; fallback: "openai" | "none"; model: string; local: { @@ -51,6 +55,21 @@ const DEFAULT_CHUNK_OVERLAP = 80; const DEFAULT_WATCH_DEBOUNCE_MS = 1500; const DEFAULT_MAX_RESULTS = 6; const DEFAULT_MIN_SCORE = 0.35; +const DEFAULT_SOURCES: 
Array<"memory" | "sessions"> = ["memory"]; + +function normalizeSources( + sources: Array<"memory" | "sessions"> | undefined, + sessionMemoryEnabled: boolean, +): Array<"memory" | "sessions"> { + const normalized = new Set<"memory" | "sessions">(); + const input = sources?.length ? sources : DEFAULT_SOURCES; + for (const source of input) { + if (source === "memory") normalized.add("memory"); + if (source === "sessions" && sessionMemoryEnabled) normalized.add("sessions"); + } + if (normalized.size === 0) normalized.add("memory"); + return Array.from(normalized); +} function resolveStorePath(agentId: string, raw?: string): string { const stateDir = resolveStateDir(process.env, os.homedir); @@ -66,6 +85,8 @@ function mergeConfig( agentId: string, ): ResolvedMemorySearchConfig { const enabled = overrides?.enabled ?? defaults?.enabled ?? true; + const sessionMemory = + overrides?.experimental?.sessionMemory ?? defaults?.experimental?.sessionMemory ?? false; const provider = overrides?.provider ?? defaults?.provider ?? "openai"; const hasRemote = Boolean(defaults?.remote || overrides?.remote); const remote = hasRemote @@ -81,6 +102,7 @@ function mergeConfig( modelPath: overrides?.local?.modelPath ?? defaults?.local?.modelPath, modelCacheDir: overrides?.local?.modelCacheDir ?? defaults?.local?.modelCacheDir, }; + const sources = normalizeSources(overrides?.sources ?? defaults?.sources, sessionMemory); const vector = { enabled: overrides?.store?.vector?.enabled ?? defaults?.store?.vector?.enabled ?? 
true, extensionPath: @@ -114,8 +136,12 @@ function mergeConfig( const minScore = Math.max(0, Math.min(1, query.minScore)); return { enabled, + sources, provider, remote, + experimental: { + sessionMemory, + }, fallback, model, local, diff --git a/src/agents/session-tool-result-guard.ts b/src/agents/session-tool-result-guard.ts index 77326bd62..43294cdfd 100644 --- a/src/agents/session-tool-result-guard.ts +++ b/src/agents/session-tool-result-guard.ts @@ -2,6 +2,7 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { SessionManager } from "@mariozechner/pi-coding-agent"; import { makeMissingToolResult } from "./session-transcript-repair.js"; +import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js"; type ToolCall = { id: string; name?: string }; @@ -111,6 +112,12 @@ export function installSessionToolResultGuard(sessionManager: SessionManager): { const result = originalAppend(sanitized as never); + const sessionFile = (sessionManager as { getSessionFile?: () => string | null }) + .getSessionFile?.(); + if (sessionFile) { + emitSessionTranscriptUpdate(sessionFile); + } + if (toolCalls.length > 0) { for (const call of toolCalls) { pending.set(call.id, call.name); diff --git a/src/agents/tools/memory-tool.ts b/src/agents/tools/memory-tool.ts index 28ea5971f..e571d3828 100644 --- a/src/agents/tools/memory-tool.ts +++ b/src/agents/tools/memory-tool.ts @@ -34,7 +34,7 @@ export function createMemorySearchTool(options: { label: "Memory Search", name: "memory_search", description: - "Mandatory recall step: semantically search MEMORY.md + memory/*.md before answering questions about prior work, decisions, dates, people, preferences, or todos; returns top snippets with path + lines.", + "Mandatory recall step: semantically search MEMORY.md + memory/*.md (and optional session transcripts) before answering questions about prior work, decisions, dates, people, preferences, or todos; returns top snippets with path + lines.", 
parameters: MemorySearchSchema, execute: async (_toolCallId, params) => { const query = readStringParam(params, "query", { required: true }); diff --git a/src/cli/memory-cli.ts b/src/cli/memory-cli.ts index 38e067367..418f4b691 100644 --- a/src/cli/memory-cli.ts +++ b/src/cli/memory-cli.ts @@ -51,6 +51,7 @@ export function registerMemoryCli(program: Command) { `${chalk.bold.cyan("Memory Search")} (${agentId})`, `Provider: ${status.provider} (requested: ${status.requestedProvider})`, status.fallback ? chalk.yellow(`Fallback: ${status.fallback.from}`) : null, + status.sources?.length ? `Sources: ${status.sources.join(", ")}` : null, `Files: ${status.files}`, `Chunks: ${status.chunks}`, `Dirty: ${status.dirty ? "yes" : "no"}`, diff --git a/src/config/schema.ts b/src/config/schema.ts index 11f4aefb6..9fa00a68d 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -168,6 +168,9 @@ const FIELD_LABELS: Record = { "agents.defaults.bootstrapMaxChars": "Bootstrap Max Chars", "agents.defaults.memorySearch": "Memory Search", "agents.defaults.memorySearch.enabled": "Enable Memory Search", + "agents.defaults.memorySearch.sources": "Memory Search Sources", + "agents.defaults.memorySearch.experimental.sessionMemory": + "Memory Search Session Index (Experimental)", "agents.defaults.memorySearch.provider": "Memory Search Provider", "agents.defaults.memorySearch.remote.baseUrl": "Remote Embedding Base URL", "agents.defaults.memorySearch.remote.apiKey": "Remote Embedding API Key", @@ -353,6 +356,10 @@ const FIELD_HELP: Record = { "agents.defaults.models": "Configured model catalog (keys are full provider/model IDs).", "agents.defaults.memorySearch": "Vector search over MEMORY.md and memory/*.md (per-agent overrides supported).", + "agents.defaults.memorySearch.sources": + 'Sources to index for memory search (default: ["memory"]; add "sessions" to include session transcripts).', + "agents.defaults.memorySearch.experimental.sessionMemory": + "Enable experimental session 
transcript indexing for memory search (default: false).", "agents.defaults.memorySearch.provider": 'Embedding provider ("openai" or "local").', "agents.defaults.memorySearch.remote.baseUrl": "Custom OpenAI-compatible base URL (e.g. for Gemini/OpenRouter proxies).", diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index ace198224..f7d873e75 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -6,6 +6,7 @@ import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding import type { SessionEntry } from "./types.js"; import { loadSessionStore, updateSessionStore } from "./store.js"; import { resolveDefaultSessionStorePath, resolveSessionTranscriptPath } from "./paths.js"; +import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; function stripQuery(value: string): string { const noHash = value.split("#")[0] ?? value; @@ -127,5 +128,6 @@ export async function appendAssistantMessageToSessionTranscript(params: { }); } + emitSessionTranscriptUpdate(sessionFile); return { ok: true, sessionFile }; } diff --git a/src/config/types.tools.ts b/src/config/types.tools.ts index 07b94e82b..0d9b2f02a 100644 --- a/src/config/types.tools.ts +++ b/src/config/types.tools.ts @@ -145,6 +145,13 @@ export type AgentToolsConfig = { export type MemorySearchConfig = { /** Enable vector memory search (default: true). */ enabled?: boolean; + /** Sources to index and search (default: ["memory"]). */ + sources?: Array<"memory" | "sessions">; + /** Experimental memory search settings. */ + experimental?: { + /** Enable session transcript indexing (experimental, default: false). */ + sessionMemory?: boolean; + }; /** Embedding provider mode. 
*/ provider?: "openai" | "local"; remote?: { diff --git a/src/config/zod-schema.agent-runtime.ts b/src/config/zod-schema.agent-runtime.ts index 45c02d2a0..f8d06e17a 100644 --- a/src/config/zod-schema.agent-runtime.ts +++ b/src/config/zod-schema.agent-runtime.ts @@ -194,6 +194,12 @@ export const AgentToolsSchema = z export const MemorySearchSchema = z .object({ enabled: z.boolean().optional(), + sources: z.array(z.union([z.literal("memory"), z.literal("sessions")])).optional(), + experimental: z + .object({ + sessionMemory: z.boolean().optional(), + }) + .optional(), provider: z.union([z.literal("openai"), z.literal("local")]).optional(), remote: z .object({ diff --git a/src/memory/manager.ts b/src/memory/manager.ts index 47072ca06..faf9e1392 100644 --- a/src/memory/manager.ts +++ b/src/memory/manager.ts @@ -8,7 +8,9 @@ import { resolveAgentDir, resolveAgentWorkspaceDir } from "../agents/agent-scope import type { ResolvedMemorySearchConfig } from "../agents/memory-search.js"; import { resolveMemorySearchConfig } from "../agents/memory-search.js"; import type { ClawdbotConfig } from "../config/config.js"; +import { resolveSessionTranscriptsDirForAgent } from "../config/sessions/paths.js"; import { createSubsystemLogger } from "../logging.js"; +import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import { resolveUserPath, truncateUtf16Safe } from "../utils.js"; import { createEmbeddingProvider, @@ -29,12 +31,15 @@ import { } from "./internal.js"; import { requireNodeSqlite } from "./sqlite.js"; +type MemorySource = "memory" | "sessions"; + export type MemorySearchResult = { path: string; startLine: number; endLine: number; score: number; snippet: string; + source: MemorySource; }; type MemoryIndexMeta = { @@ -45,9 +50,19 @@ type MemoryIndexMeta = { vectorDims?: number; }; +type SessionFileEntry = { + path: string; + absPath: string; + mtimeMs: number; + size: number; + hash: string; + content: string; +}; + const META_KEY = 
"memory_index_meta_v1"; const SNIPPET_MAX_CHARS = 700; const VECTOR_TABLE = "chunks_vec"; +const SESSION_DIRTY_DEBOUNCE_MS = 5000; const log = createSubsystemLogger("memory"); @@ -66,6 +81,7 @@ export class MemoryIndexManager { private readonly requestedProvider: "openai" | "local"; private readonly fallbackReason?: string; private readonly db: DatabaseSync; + private readonly sources: Set; private readonly vector: { enabled: boolean; available: boolean | null; @@ -76,9 +92,13 @@ export class MemoryIndexManager { private vectorReady: Promise | null = null; private watcher: FSWatcher | null = null; private watchTimer: NodeJS.Timeout | null = null; + private sessionWatchTimer: NodeJS.Timeout | null = null; + private sessionUnsubscribe: (() => void) | null = null; private intervalTimer: NodeJS.Timeout | null = null; private closed = false; private dirty = false; + private sessionsDirty = false; + private sessionsDirtyFiles = new Set(); private sessionWarm = new Set(); private syncing: Promise | null = null; @@ -130,6 +150,7 @@ export class MemoryIndexManager { this.provider = params.providerResult.provider; this.requestedProvider = params.providerResult.requestedProvider; this.fallbackReason = params.providerResult.fallbackReason; + this.sources = new Set(params.settings.sources); this.db = this.openDatabase(); this.ensureSchema(); this.vector = { @@ -142,8 +163,12 @@ export class MemoryIndexManager { this.vector.dims = meta.vectorDims; } this.ensureWatcher(); + this.ensureSessionListener(); this.ensureIntervalSync(); - this.dirty = true; + this.dirty = this.sources.has("memory"); + if (this.sources.has("sessions")) { + this.sessionsDirty = true; + } } async warmSession(sessionKey?: string): Promise { @@ -163,7 +188,7 @@ export class MemoryIndexManager { }, ): Promise { await this.warmSession(opts?.sessionKey); - if (this.settings.sync.onSearch && this.dirty) { + if (this.settings.sync.onSearch && (this.dirty || this.sessionsDirty)) { await this.sync({ reason: 
"search" }); } const cleaned = query.trim(); @@ -173,21 +198,29 @@ export class MemoryIndexManager { const queryVec = await this.provider.embedQuery(cleaned); if (queryVec.length === 0) return []; if (await this.ensureVectorReady(queryVec.length)) { + const sourceFilter = this.buildSourceFilter("c"); const rows = this.db .prepare( `SELECT c.path, c.start_line, c.end_line, c.text, + c.source, vec_distance_cosine(v.embedding, ?) AS dist FROM ${VECTOR_TABLE} v JOIN chunks c ON c.id = v.id - WHERE c.model = ? + WHERE c.model = ?${sourceFilter.sql} ORDER BY dist ASC LIMIT ?`, ) - .all(vectorToBlob(queryVec), this.provider.model, maxResults) as Array<{ + .all( + vectorToBlob(queryVec), + this.provider.model, + ...sourceFilter.params, + maxResults, + ) as Array<{ path: string; start_line: number; end_line: number; text: string; + source: MemorySource; dist: number; }>; return rows @@ -197,6 +230,7 @@ export class MemoryIndexManager { endLine: row.end_line, score: 1 - row.dist, snippet: truncateUtf16Safe(row.text, SNIPPET_MAX_CHARS), + source: row.source, })) .filter((entry) => entry.score >= minScore); } @@ -217,6 +251,7 @@ export class MemoryIndexManager { endLine: entry.chunk.endLine, score: entry.score, snippet: truncateUtf16Safe(entry.chunk.text, SNIPPET_MAX_CHARS), + source: entry.chunk.source, })); } @@ -261,6 +296,7 @@ export class MemoryIndexManager { provider: string; model: string; requestedProvider: string; + sources: MemorySource[]; fallback?: { from: string; reason?: string }; vector?: { enabled: boolean; @@ -270,10 +306,15 @@ export class MemoryIndexManager { dims?: number; }; } { - const files = this.db.prepare(`SELECT COUNT(*) as c FROM files`).get() as { + const sourceFilter = this.buildSourceFilter(); + const files = this.db + .prepare(`SELECT COUNT(*) as c FROM files WHERE 1=1${sourceFilter.sql}`) + .get(...sourceFilter.params) as { c: number; }; - const chunks = this.db.prepare(`SELECT COUNT(*) as c FROM chunks`).get() as { + const chunks = this.db + 
.prepare(`SELECT COUNT(*) as c FROM chunks WHERE 1=1${sourceFilter.sql}`) + .get(...sourceFilter.params) as { c: number; }; return { @@ -285,6 +326,7 @@ export class MemoryIndexManager { provider: this.provider.id, model: this.provider.model, requestedProvider: this.requestedProvider, + sources: Array.from(this.sources), fallback: this.fallbackReason ? { from: "local", reason: this.fallbackReason } : undefined, vector: { enabled: this.vector.enabled, @@ -303,6 +345,10 @@ export class MemoryIndexManager { clearTimeout(this.watchTimer); this.watchTimer = null; } + if (this.sessionWatchTimer) { + clearTimeout(this.sessionWatchTimer); + this.sessionWatchTimer = null; + } if (this.intervalTimer) { clearInterval(this.intervalTimer); this.intervalTimer = null; @@ -311,6 +357,10 @@ export class MemoryIndexManager { await this.watcher.close(); this.watcher = null; } + if (this.sessionUnsubscribe) { + this.sessionUnsubscribe(); + this.sessionUnsubscribe = null; + } this.db.close(); INDEX_CACHE.delete(this.cacheKey); } @@ -379,6 +429,22 @@ export class MemoryIndexManager { } } + private buildSourceFilter(alias?: string): { sql: string; params: MemorySource[] } { + const sources = Array.from(this.sources); + if (sources.length === 0) return { sql: "", params: [] }; + const column = alias ? 
`${alias}.source` : "source"; + const placeholders = sources.map(() => "?").join(", "); + return { sql: ` AND ${column} IN (${placeholders})`, params: sources }; + } + + private ensureColumn(table: "files" | "chunks", column: string, definition: string): void { + const rows = this.db.prepare(`PRAGMA table_info(${table})`).all() as Array<{ + name: string; + }>; + if (rows.some((row) => row.name === column)) return; + this.db.exec(`ALTER TABLE ${table} ADD COLUMN ${column} ${definition}`); + } + private openDatabase(): DatabaseSync { const dbPath = resolveUserPath(this.settings.store.path); const dir = path.dirname(dbPath); @@ -397,6 +463,7 @@ export class MemoryIndexManager { this.db.exec(` CREATE TABLE IF NOT EXISTS files ( path TEXT PRIMARY KEY, + source TEXT NOT NULL DEFAULT 'memory', hash TEXT NOT NULL, mtime INTEGER NOT NULL, size INTEGER NOT NULL @@ -406,6 +473,7 @@ export class MemoryIndexManager { CREATE TABLE IF NOT EXISTS chunks ( id TEXT PRIMARY KEY, path TEXT NOT NULL, + source TEXT NOT NULL DEFAULT 'memory', start_line INTEGER NOT NULL, end_line INTEGER NOT NULL, hash TEXT NOT NULL, @@ -415,11 +483,14 @@ export class MemoryIndexManager { updated_at INTEGER NOT NULL ); `); + this.ensureColumn("files", "source", "TEXT NOT NULL DEFAULT 'memory'"); + this.ensureColumn("chunks", "source", "TEXT NOT NULL DEFAULT 'memory'"); this.db.exec(`CREATE INDEX IF NOT EXISTS idx_chunks_path ON chunks(path);`); + this.db.exec(`CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source);`); } private ensureWatcher() { - if (!this.settings.sync.watch || this.watcher) return; + if (!this.sources.has("memory") || !this.settings.sync.watch || this.watcher) return; const watchPaths = [ path.join(this.workspaceDir, "MEMORY.md"), path.join(this.workspaceDir, "memory"), @@ -440,6 +511,33 @@ export class MemoryIndexManager { this.watcher.on("unlink", markDirty); } + private ensureSessionListener() { + if (!this.sources.has("sessions") || this.sessionUnsubscribe) return; + 
this.sessionUnsubscribe = onSessionTranscriptUpdate((update) => { + if (this.closed) return; + const sessionFile = update.sessionFile; + if (!this.isSessionFileForAgent(sessionFile)) return; + this.scheduleSessionDirty(sessionFile); + }); + } + + private scheduleSessionDirty(sessionFile: string) { + this.sessionsDirtyFiles.add(sessionFile); + if (this.sessionWatchTimer) return; + this.sessionWatchTimer = setTimeout(() => { + this.sessionWatchTimer = null; + this.sessionsDirty = true; + }, SESSION_DIRTY_DEBOUNCE_MS); + } + + private isSessionFileForAgent(sessionFile: string): boolean { + if (!sessionFile) return false; + const sessionsDir = resolveSessionTranscriptsDirForAgent(this.agentId); + const resolvedFile = path.resolve(sessionFile); + const resolvedDir = path.resolve(sessionsDir); + return resolvedFile.startsWith(`${resolvedDir}${path.sep}`); + } + private ensureIntervalSync() { const minutes = this.settings.sync.intervalMinutes; if (!minutes || minutes <= 0 || this.intervalTimer) return; @@ -452,7 +550,7 @@ export class MemoryIndexManager { } private scheduleWatchSync() { - if (!this.settings.sync.watch) return; + if (!this.sources.has("memory") || !this.settings.sync.watch) return; if (this.watchTimer) clearTimeout(this.watchTimer); this.watchTimer = setTimeout(() => { this.watchTimer = null; @@ -468,15 +566,22 @@ export class MemoryIndexManager { endLine: number; text: string; embedding: number[]; + source: MemorySource; }> { + const sourceFilter = this.buildSourceFilter(); const rows = this.db - .prepare(`SELECT path, start_line, end_line, text, embedding FROM chunks WHERE model = ?`) - .all(this.provider.model) as Array<{ + .prepare( + `SELECT path, start_line, end_line, text, embedding, source + FROM chunks + WHERE model = ?${sourceFilter.sql}`, + ) + .all(this.provider.model, ...sourceFilter.params) as Array<{ path: string; start_line: number; end_line: number; text: string; embedding: string; + source: MemorySource; }>; return rows.map((row) => ({ 
path: row.path, @@ -484,9 +589,77 @@ export class MemoryIndexManager { endLine: row.end_line, text: row.text, embedding: parseEmbedding(row.embedding), + source: row.source, })); } + private shouldSyncSessions(params?: { reason?: string; force?: boolean }, needsFullReindex = false) { + if (!this.sources.has("sessions")) return false; + if (params?.force) return true; + const reason = params?.reason; + if (reason === "session-start" || reason === "watch") return false; + return this.sessionsDirty || needsFullReindex; + } + + private async syncMemoryFiles(params: { needsFullReindex: boolean }) { + const files = await listMemoryFiles(this.workspaceDir); + const fileEntries = await Promise.all( + files.map(async (file) => buildFileEntry(file, this.workspaceDir)), + ); + const activePaths = new Set(fileEntries.map((entry) => entry.path)); + + for (const entry of fileEntries) { + const record = this.db + .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) + .get(entry.path, "memory") as { hash: string } | undefined; + if (!params.needsFullReindex && record?.hash === entry.hash) { + continue; + } + await this.indexFile(entry, { source: "memory" }); + } + + const staleRows = this.db + .prepare(`SELECT path FROM files WHERE source = ?`) + .all("memory") as Array<{ path: string }>; + for (const stale of staleRows) { + if (activePaths.has(stale.path)) continue; + this.db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`).run(stale.path, "memory"); + this.db.prepare(`DELETE FROM chunks WHERE path = ? 
AND source = ?`).run(stale.path, "memory"); + } + } + + private async syncSessionFiles(params: { needsFullReindex: boolean }) { + const files = await this.listSessionFiles(); + const activePaths = new Set(files.map((file) => this.sessionPathForFile(file))); + const indexAll = params.needsFullReindex || this.sessionsDirtyFiles.size === 0; + + for (const absPath of files) { + if (!indexAll && !this.sessionsDirtyFiles.has(absPath)) continue; + const entry = await this.buildSessionEntry(absPath); + if (!entry) continue; + const record = this.db + .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) + .get(entry.path, "sessions") as { hash: string } | undefined; + if (!params.needsFullReindex && record?.hash === entry.hash) { + continue; + } + await this.indexFile(entry, { source: "sessions", content: entry.content }); + } + + const staleRows = this.db + .prepare(`SELECT path FROM files WHERE source = ?`) + .all("sessions") as Array<{ path: string }>; + for (const stale of staleRows) { + if (activePaths.has(stale.path)) continue; + this.db + .prepare(`DELETE FROM files WHERE path = ? AND source = ?`) + .run(stale.path, "sessions"); + this.db + .prepare(`DELETE FROM chunks WHERE path = ? 
AND source = ?`) + .run(stale.path, "sessions"); + } + } + private async runSync(params?: { reason?: string; force?: boolean }) { const vectorReady = await this.ensureVectorReady(); const meta = this.readMeta(); @@ -502,29 +675,21 @@ export class MemoryIndexManager { this.resetIndex(); } - const files = await listMemoryFiles(this.workspaceDir); - const fileEntries = await Promise.all( - files.map(async (file) => buildFileEntry(file, this.workspaceDir)), - ); - const activePaths = new Set(fileEntries.map((entry) => entry.path)); + const shouldSyncMemory = + this.sources.has("memory") && (params?.force || needsFullReindex || this.dirty); + const shouldSyncSessions = this.shouldSyncSessions(params, needsFullReindex); - for (const entry of fileEntries) { - const record = this.db.prepare(`SELECT hash FROM files WHERE path = ?`).get(entry.path) as - | { hash: string } - | undefined; - if (!needsFullReindex && record?.hash === entry.hash) { - continue; - } - await this.indexFile(entry); + if (shouldSyncMemory) { + await this.syncMemoryFiles({ needsFullReindex }); + this.dirty = false; } - const staleRows = this.db.prepare(`SELECT path FROM files`).all() as Array<{ - path: string; - }>; - for (const stale of staleRows) { - if (activePaths.has(stale.path)) continue; - this.db.prepare(`DELETE FROM files WHERE path = ?`).run(stale.path); - this.db.prepare(`DELETE FROM chunks WHERE path = ?`).run(stale.path); + if (shouldSyncSessions) { + await this.syncSessionFiles({ needsFullReindex }); + this.sessionsDirty = false; + this.sessionsDirtyFiles.clear(); + } else if (needsFullReindex && this.sources.has("sessions")) { + this.sessionsDirty = true; } const nextMeta: MemoryIndexMeta = { @@ -536,8 +701,9 @@ export class MemoryIndexManager { if (this.vector.available && this.vector.dims) { nextMeta.vectorDims = this.vector.dims; } - this.writeMeta(nextMeta); - this.dirty = false; + if (shouldSyncMemory || shouldSyncSessions || needsFullReindex) { + this.writeMeta(nextMeta); + } } 
private resetIndex() { @@ -545,6 +711,7 @@ export class MemoryIndexManager { this.db.exec(`DELETE FROM chunks`); this.dropVectorTable(); this.vector.dims = undefined; + this.sessionsDirtyFiles.clear(); } private readMeta(): MemoryIndexMeta | null { @@ -568,24 +735,115 @@ export class MemoryIndexManager { .run(META_KEY, value); } - private async indexFile(entry: MemoryFileEntry) { - const content = await fs.readFile(entry.absPath, "utf-8"); + private async listSessionFiles(): Promise { + const dir = resolveSessionTranscriptsDirForAgent(this.agentId); + try { + const entries = await fs.readdir(dir, { withFileTypes: true }); + return entries + .filter((entry) => entry.isFile()) + .map((entry) => entry.name) + .filter((name) => name.endsWith(".jsonl")) + .map((name) => path.join(dir, name)); + } catch { + return []; + } + } + + private sessionPathForFile(absPath: string): string { + return path.join("sessions", path.basename(absPath)).replace(/\\/g, "/"); + } + + private normalizeSessionText(value: string): string { + return value.replace(/\s*\n+\s*/g, " ").replace(/\s+/g, " ").trim(); + } + + private extractSessionText(content: unknown): string | null { + if (typeof content === "string") { + const normalized = this.normalizeSessionText(content); + return normalized ? 
normalized : null; + } + if (!Array.isArray(content)) return null; + const parts: string[] = []; + for (const block of content) { + if (!block || typeof block !== "object") continue; + const record = block as { type?: unknown; text?: unknown }; + if (record.type !== "text" || typeof record.text !== "string") continue; + const normalized = this.normalizeSessionText(record.text); + if (normalized) parts.push(normalized); + } + if (parts.length === 0) return null; + return parts.join(" "); + } + + private async buildSessionEntry(absPath: string): Promise { + try { + const stat = await fs.stat(absPath); + const raw = await fs.readFile(absPath, "utf-8"); + const lines = raw.split("\n"); + const collected: string[] = []; + for (const line of lines) { + if (!line.trim()) continue; + let record: unknown; + try { + record = JSON.parse(line); + } catch { + continue; + } + if ( + !record || + typeof record !== "object" || + (record as { type?: unknown }).type !== "message" + ) { + continue; + } + const message = (record as { message?: unknown }).message as + | { role?: unknown; content?: unknown } + | undefined; + if (!message || typeof message.role !== "string") continue; + if (message.role !== "user" && message.role !== "assistant") continue; + const text = this.extractSessionText(message.content); + if (!text) continue; + const label = message.role === "user" ? "User" : "Assistant"; + collected.push(`${label}: ${text}`); + } + const content = collected.join("\n"); + return { + path: this.sessionPathForFile(absPath), + absPath, + mtimeMs: stat.mtimeMs, + size: stat.size, + hash: hashText(content), + content, + }; + } catch (err) { + log.debug(`Failed reading session file ${absPath}: ${String(err)}`); + return null; + } + } + + private async indexFile( + entry: MemoryFileEntry | SessionFileEntry, + options: { source: MemorySource; content?: string }, + ) { + const content = options.content ?? 
(await fs.readFile(entry.absPath, "utf-8")); const chunks = chunkMarkdown(content, this.settings.chunking); const embeddings = await this.provider.embedBatch(chunks.map((chunk) => chunk.text)); const sample = embeddings.find((embedding) => embedding.length > 0); const vectorReady = sample ? await this.ensureVectorReady(sample.length) : false; const now = Date.now(); - this.db.prepare(`DELETE FROM chunks WHERE path = ?`).run(entry.path); + this.db + .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) + .run(entry.path, options.source); for (let i = 0; i < chunks.length; i++) { const chunk = chunks[i]; const embedding = embeddings[i] ?? []; const id = hashText( - `${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${this.provider.model}`, + `${options.source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${this.provider.model}`, ); this.db .prepare( - `INSERT INTO chunks (id, path, start_line, end_line, hash, model, text, embedding, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + `INSERT INTO chunks (id, path, source, start_line, end_line, hash, model, text, embedding, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET hash=excluded.hash, model=excluded.model, @@ -596,6 +854,7 @@ export class MemoryIndexManager { .run( id, entry.path, + options.source, chunk.startLine, chunk.endLine, chunk.hash, @@ -612,9 +871,13 @@ export class MemoryIndexManager { } this.db .prepare( - `INSERT INTO files (path, hash, mtime, size) VALUES (?, ?, ?, ?) - ON CONFLICT(path) DO UPDATE SET hash=excluded.hash, mtime=excluded.mtime, size=excluded.size`, + `INSERT INTO files (path, source, hash, mtime, size) VALUES (?, ?, ?, ?, ?) 
+ ON CONFLICT(path) DO UPDATE SET + source=excluded.source, + hash=excluded.hash, + mtime=excluded.mtime, + size=excluded.size`, ) - .run(entry.path, entry.hash, entry.mtimeMs, entry.size); + .run(entry.path, options.source, entry.hash, entry.mtimeMs, entry.size); } } diff --git a/src/sessions/transcript-events.ts b/src/sessions/transcript-events.ts new file mode 100644 index 000000000..88a9cd7b7 --- /dev/null +++ b/src/sessions/transcript-events.ts @@ -0,0 +1,23 @@ +type SessionTranscriptUpdate = { + sessionFile: string; +}; + +type SessionTranscriptListener = (update: SessionTranscriptUpdate) => void; + +const SESSION_TRANSCRIPT_LISTENERS = new Set(); + +export function onSessionTranscriptUpdate(listener: SessionTranscriptListener): () => void { + SESSION_TRANSCRIPT_LISTENERS.add(listener); + return () => { + SESSION_TRANSCRIPT_LISTENERS.delete(listener); + }; +} + +export function emitSessionTranscriptUpdate(sessionFile: string): void { + const trimmed = sessionFile.trim(); + if (!trimmed) return; + const update = { sessionFile: trimmed }; + for (const listener of SESSION_TRANSCRIPT_LISTENERS) { + listener(update); + } +}