From b2de667b11272d3784fc1ff593dac57dd46d1439 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 7 Jan 2026 22:56:50 +0000 Subject: [PATCH] fix: persist topic session files --- src/agents/pi-embedded-runner.ts | 16 +++++------- src/auto-reply/reply.ts | 4 +-- src/auto-reply/reply/session.test.ts | 27 +++++++++++++++++++ src/auto-reply/reply/session.ts | 8 ++++++ src/config/cache-utils.ts | 27 +++++++++++++++++++ src/config/sessions.ts | 39 +++++++++++----------------- 6 files changed, 84 insertions(+), 37 deletions(-) create mode 100644 src/config/cache-utils.ts diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index ec15416f1..cf428d64f 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -25,6 +25,7 @@ import type { VerboseLevel, } from "../auto-reply/thinking.js"; import { formatToolAggregate } from "../auto-reply/tool-meta.js"; +import { isCacheEnabled, resolveCacheTtlMs } from "../config/cache-utils.js"; import type { ClawdbotConfig } from "../config/config.js"; import { getMachineDisplayName } from "../infra/machine-name.js"; import { createSubsystemLogger } from "../logging.js"; @@ -340,19 +341,14 @@ const SESSION_MANAGER_CACHE = new Map(); const DEFAULT_SESSION_MANAGER_TTL_MS = 45_000; // 45 seconds function getSessionManagerTtl(): number { - const envTtl = process.env.CLAWDBOT_SESSION_MANAGER_CACHE_TTL_MS; - if (envTtl) { - const parsed = Number.parseInt(envTtl, 10); - if (Number.isFinite(parsed) && parsed >= 0) { - return parsed; - } - } - return DEFAULT_SESSION_MANAGER_TTL_MS; + return resolveCacheTtlMs({ + envValue: process.env.CLAWDBOT_SESSION_MANAGER_CACHE_TTL_MS, + defaultTtlMs: DEFAULT_SESSION_MANAGER_TTL_MS, + }); } function isSessionManagerCacheEnabled(): boolean { - const ttl = getSessionManagerTtl(); - return ttl > 0; + return isCacheEnabled(getSessionManagerTtl()); } function trackSessionManagerAccess(sessionFile: string): void { diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 9f481978d..e7f95bb0d 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -722,9 +722,7 @@ export async function getReplyFromConfig( resolvedThinkLevel = await modelState.resolveDefaultThinkingLevel(); } const sessionIdFinal = sessionId ?? crypto.randomUUID(); - const sessionFile = resolveSessionFilePath(sessionIdFinal, sessionEntry, { - topicId: ctx.MessageThreadId, - }); + const sessionFile = resolveSessionFilePath(sessionIdFinal, sessionEntry); const queueBodyBase = transcribedText ? [threadStarterNote, baseBodyFinal, `Transcript:\n${transcribedText}`] .filter(Boolean) diff --git a/src/auto-reply/reply/session.test.ts b/src/auto-reply/reply/session.test.ts index f511aceab..abcfe6996 100644 --- a/src/auto-reply/reply/session.test.ts +++ b/src/auto-reply/reply/session.test.ts @@ -82,4 +82,31 @@ describe("initSessionState thread forking", () => { }; expect(parsedHeader.parentSession).toBe(parentSessionFile); }); + + it("records topic-specific session files when MessageThreadId is present", async () => { + const root = await fs.mkdtemp( + path.join(os.tmpdir(), "clawdbot-topic-session-"), + ); + const storePath = path.join(root, "sessions.json"); + + const cfg = { + session: { store: storePath }, + } as ClawdbotConfig; + + const result = await initSessionState({ + ctx: { + Body: "Hello topic", + SessionKey: "agent:main:telegram:group:123:topic:456", + MessageThreadId: 456, + }, + cfg, + commandAuthorized: true, + }); + + const sessionFile = result.sessionEntry.sessionFile; + expect(sessionFile).toBeTruthy(); + expect(path.basename(sessionFile ?? "")).toBe( + `${result.sessionEntry.sessionId}-topic-456.jsonl`, + ); + }); }); diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index 872e798f0..0b141d82a 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -17,6 +17,7 @@ import { resolveGroupSessionKey, resolveSessionFilePath, resolveSessionKey, + resolveSessionTranscriptPath, resolveStorePath, type SessionEntry, type SessionScope, @@ -255,6 +256,13 @@ export async function initSessionState(params: { sessionEntry.sessionFile = forked.sessionFile; } } + if (!sessionEntry.sessionFile) { + sessionEntry.sessionFile = resolveSessionTranscriptPath( + sessionEntry.sessionId, + agentId, + ctx.MessageThreadId, + ); + } sessionStore[sessionKey] = sessionEntry; await saveSessionStore(storePath, sessionStore); diff --git a/src/config/cache-utils.ts b/src/config/cache-utils.ts new file mode 100644 index 000000000..df0178764 --- /dev/null +++ b/src/config/cache-utils.ts @@ -0,0 +1,27 @@ +import fs from "node:fs"; + +export function resolveCacheTtlMs(params: { + envValue: string | undefined; + defaultTtlMs: number; +}): number { + const { envValue, defaultTtlMs } = params; + if (envValue) { + const parsed = Number.parseInt(envValue, 10); + if (Number.isFinite(parsed) && parsed >= 0) { + return parsed; + } + } + return defaultTtlMs; +} + +export function isCacheEnabled(ttlMs: number): boolean { + return ttlMs > 0; +} + +export function getFileMtimeMs(filePath: string): number | undefined { + try { + return fs.statSync(filePath).mtimeMs; + } catch { + return undefined; + } +} diff --git a/src/config/sessions.ts b/src/config/sessions.ts index 6f30474c0..cbb348a27 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -14,6 +14,11 @@ import { parseAgentSessionKey, } from "../routing/session-key.js"; import { normalizeE164 } from "../utils.js"; +import { + getFileMtimeMs, + isCacheEnabled, + resolveCacheTtlMs, +} from "./cache-utils.js"; import { resolveStateDir } from "./paths.js"; // ============================================================================ @@ -31,20 +36,14 @@ const SESSION_STORE_CACHE = new Map(); const DEFAULT_SESSION_STORE_TTL_MS = 45_000; // 45 seconds (between 30-60s) function getSessionStoreTtl(): number { - // Allow runtime override via environment variable - const envTtl = process.env.CLAWDBOT_SESSION_CACHE_TTL_MS; - if (envTtl) { - const parsed = Number.parseInt(envTtl, 10); - if (Number.isFinite(parsed) && parsed >= 0) { - return parsed; - } - } - return DEFAULT_SESSION_STORE_TTL_MS; + return resolveCacheTtlMs({ + envValue: process.env.CLAWDBOT_SESSION_CACHE_TTL_MS, + defaultTtlMs: DEFAULT_SESSION_STORE_TTL_MS, + }); } function isSessionStoreCacheEnabled(): boolean { - const ttl = getSessionStoreTtl(); - return ttl > 0; + return isCacheEnabled(getSessionStoreTtl()); } function isSessionStoreCacheValid(entry: SessionStoreCacheEntry): boolean { @@ -53,14 +52,6 @@ function isSessionStoreCacheValid(entry: SessionStoreCacheEntry): boolean { return now - entry.loadedAt <= ttl; } -function getSessionStoreMtimeMs(storePath: string): number | undefined { - try { - return fs.statSync(storePath).mtimeMs; - } catch { - return undefined; - } -} - function invalidateSessionStoreCache(storePath: string): void { SESSION_STORE_CACHE.delete(storePath); } @@ -199,12 +190,12 @@ export function resolveSessionTranscriptPath( export function resolveSessionFilePath( sessionId: string, entry?: SessionEntry, - opts?: { agentId?: string; topicId?: number }, + opts?: { agentId?: string }, ): string { const candidate = entry?.sessionFile?.trim(); return candidate ? candidate - : resolveSessionTranscriptPath(sessionId, opts?.agentId, opts?.topicId); + : resolveSessionTranscriptPath(sessionId, opts?.agentId); } export function resolveStorePath(store?: string, opts?: { agentId?: string }) { @@ -402,7 +393,7 @@ export function loadSessionStore( if (isSessionStoreCacheEnabled()) { const cached = SESSION_STORE_CACHE.get(storePath); if (cached && isSessionStoreCacheValid(cached)) { - const currentMtimeMs = getSessionStoreMtimeMs(storePath); + const currentMtimeMs = getFileMtimeMs(storePath); if (currentMtimeMs === cached.mtimeMs) { // Return a shallow copy to prevent external mutations affecting cache return { ...cached.store }; @@ -413,14 +404,14 @@ export function loadSessionStore( // Cache miss or disabled - load from disk let store: Record = {}; - let mtimeMs = getSessionStoreMtimeMs(storePath); + let mtimeMs = getFileMtimeMs(storePath); try { const raw = fs.readFileSync(storePath, "utf-8"); const parsed = JSON5.parse(raw); if (parsed && typeof parsed === "object") { store = parsed as Record; } - mtimeMs = getSessionStoreMtimeMs(storePath) ?? mtimeMs; + mtimeMs = getFileMtimeMs(storePath) ?? mtimeMs; } catch { // ignore missing/invalid store; we'll recreate it }