fix: persist topic session files

This commit is contained in:
Peter Steinberger
2026-01-07 22:56:50 +00:00
parent 67d1f61872
commit b2de667b11
6 changed files with 84 additions and 37 deletions

View File

@@ -25,6 +25,7 @@ import type {
VerboseLevel, VerboseLevel,
} from "../auto-reply/thinking.js"; } from "../auto-reply/thinking.js";
import { formatToolAggregate } from "../auto-reply/tool-meta.js"; import { formatToolAggregate } from "../auto-reply/tool-meta.js";
import { isCacheEnabled, resolveCacheTtlMs } from "../config/cache-utils.js";
import type { ClawdbotConfig } from "../config/config.js"; import type { ClawdbotConfig } from "../config/config.js";
import { getMachineDisplayName } from "../infra/machine-name.js"; import { getMachineDisplayName } from "../infra/machine-name.js";
import { createSubsystemLogger } from "../logging.js"; import { createSubsystemLogger } from "../logging.js";
@@ -340,19 +341,14 @@ const SESSION_MANAGER_CACHE = new Map<string, SessionManagerCacheEntry>();
const DEFAULT_SESSION_MANAGER_TTL_MS = 45_000; // 45 seconds const DEFAULT_SESSION_MANAGER_TTL_MS = 45_000; // 45 seconds
function getSessionManagerTtl(): number { function getSessionManagerTtl(): number {
const envTtl = process.env.CLAWDBOT_SESSION_MANAGER_CACHE_TTL_MS; return resolveCacheTtlMs({
if (envTtl) { envValue: process.env.CLAWDBOT_SESSION_MANAGER_CACHE_TTL_MS,
const parsed = Number.parseInt(envTtl, 10); defaultTtlMs: DEFAULT_SESSION_MANAGER_TTL_MS,
if (Number.isFinite(parsed) && parsed >= 0) { });
return parsed;
}
}
return DEFAULT_SESSION_MANAGER_TTL_MS;
} }
function isSessionManagerCacheEnabled(): boolean { function isSessionManagerCacheEnabled(): boolean {
const ttl = getSessionManagerTtl(); return isCacheEnabled(getSessionManagerTtl());
return ttl > 0;
} }
function trackSessionManagerAccess(sessionFile: string): void { function trackSessionManagerAccess(sessionFile: string): void {

View File

@@ -722,9 +722,7 @@ export async function getReplyFromConfig(
resolvedThinkLevel = await modelState.resolveDefaultThinkingLevel(); resolvedThinkLevel = await modelState.resolveDefaultThinkingLevel();
} }
const sessionIdFinal = sessionId ?? crypto.randomUUID(); const sessionIdFinal = sessionId ?? crypto.randomUUID();
const sessionFile = resolveSessionFilePath(sessionIdFinal, sessionEntry, { const sessionFile = resolveSessionFilePath(sessionIdFinal, sessionEntry);
topicId: ctx.MessageThreadId,
});
const queueBodyBase = transcribedText const queueBodyBase = transcribedText
? [threadStarterNote, baseBodyFinal, `Transcript:\n${transcribedText}`] ? [threadStarterNote, baseBodyFinal, `Transcript:\n${transcribedText}`]
.filter(Boolean) .filter(Boolean)

View File

@@ -82,4 +82,31 @@ describe("initSessionState thread forking", () => {
}; };
expect(parsedHeader.parentSession).toBe(parentSessionFile); expect(parsedHeader.parentSession).toBe(parentSessionFile);
}); });
it("records topic-specific session files when MessageThreadId is present", async () => {
const root = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdbot-topic-session-"),
);
const storePath = path.join(root, "sessions.json");
const cfg = {
session: { store: storePath },
} as ClawdbotConfig;
const result = await initSessionState({
ctx: {
Body: "Hello topic",
SessionKey: "agent:main:telegram:group:123:topic:456",
MessageThreadId: 456,
},
cfg,
commandAuthorized: true,
});
const sessionFile = result.sessionEntry.sessionFile;
expect(sessionFile).toBeTruthy();
expect(path.basename(sessionFile ?? "")).toBe(
`${result.sessionEntry.sessionId}-topic-456.jsonl`,
);
});
}); });

View File

@@ -17,6 +17,7 @@ import {
resolveGroupSessionKey, resolveGroupSessionKey,
resolveSessionFilePath, resolveSessionFilePath,
resolveSessionKey, resolveSessionKey,
resolveSessionTranscriptPath,
resolveStorePath, resolveStorePath,
type SessionEntry, type SessionEntry,
type SessionScope, type SessionScope,
@@ -255,6 +256,13 @@ export async function initSessionState(params: {
sessionEntry.sessionFile = forked.sessionFile; sessionEntry.sessionFile = forked.sessionFile;
} }
} }
if (!sessionEntry.sessionFile) {
sessionEntry.sessionFile = resolveSessionTranscriptPath(
sessionEntry.sessionId,
agentId,
ctx.MessageThreadId,
);
}
sessionStore[sessionKey] = sessionEntry; sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore); await saveSessionStore(storePath, sessionStore);

27
src/config/cache-utils.ts Normal file
View File

@@ -0,0 +1,27 @@
import fs from "node:fs";
export function resolveCacheTtlMs(params: {
envValue: string | undefined;
defaultTtlMs: number;
}): number {
const { envValue, defaultTtlMs } = params;
if (envValue) {
const parsed = Number.parseInt(envValue, 10);
if (Number.isFinite(parsed) && parsed >= 0) {
return parsed;
}
}
return defaultTtlMs;
}
export function isCacheEnabled(ttlMs: number): boolean {
return ttlMs > 0;
}
export function getFileMtimeMs(filePath: string): number | undefined {
try {
return fs.statSync(filePath).mtimeMs;
} catch {
return undefined;
}
}

View File

@@ -14,6 +14,11 @@ import {
parseAgentSessionKey, parseAgentSessionKey,
} from "../routing/session-key.js"; } from "../routing/session-key.js";
import { normalizeE164 } from "../utils.js"; import { normalizeE164 } from "../utils.js";
import {
getFileMtimeMs,
isCacheEnabled,
resolveCacheTtlMs,
} from "./cache-utils.js";
import { resolveStateDir } from "./paths.js"; import { resolveStateDir } from "./paths.js";
// ============================================================================ // ============================================================================
@@ -31,20 +36,14 @@ const SESSION_STORE_CACHE = new Map<string, SessionStoreCacheEntry>();
const DEFAULT_SESSION_STORE_TTL_MS = 45_000; // 45 seconds (between 30-60s) const DEFAULT_SESSION_STORE_TTL_MS = 45_000; // 45 seconds (between 30-60s)
function getSessionStoreTtl(): number { function getSessionStoreTtl(): number {
// Allow runtime override via environment variable return resolveCacheTtlMs({
const envTtl = process.env.CLAWDBOT_SESSION_CACHE_TTL_MS; envValue: process.env.CLAWDBOT_SESSION_CACHE_TTL_MS,
if (envTtl) { defaultTtlMs: DEFAULT_SESSION_STORE_TTL_MS,
const parsed = Number.parseInt(envTtl, 10); });
if (Number.isFinite(parsed) && parsed >= 0) {
return parsed;
}
}
return DEFAULT_SESSION_STORE_TTL_MS;
} }
function isSessionStoreCacheEnabled(): boolean { function isSessionStoreCacheEnabled(): boolean {
const ttl = getSessionStoreTtl(); return isCacheEnabled(getSessionStoreTtl());
return ttl > 0;
} }
function isSessionStoreCacheValid(entry: SessionStoreCacheEntry): boolean { function isSessionStoreCacheValid(entry: SessionStoreCacheEntry): boolean {
@@ -53,14 +52,6 @@ function isSessionStoreCacheValid(entry: SessionStoreCacheEntry): boolean {
return now - entry.loadedAt <= ttl; return now - entry.loadedAt <= ttl;
} }
function getSessionStoreMtimeMs(storePath: string): number | undefined {
try {
return fs.statSync(storePath).mtimeMs;
} catch {
return undefined;
}
}
function invalidateSessionStoreCache(storePath: string): void { function invalidateSessionStoreCache(storePath: string): void {
SESSION_STORE_CACHE.delete(storePath); SESSION_STORE_CACHE.delete(storePath);
} }
@@ -199,12 +190,12 @@ export function resolveSessionTranscriptPath(
export function resolveSessionFilePath( export function resolveSessionFilePath(
sessionId: string, sessionId: string,
entry?: SessionEntry, entry?: SessionEntry,
opts?: { agentId?: string; topicId?: number }, opts?: { agentId?: string },
): string { ): string {
const candidate = entry?.sessionFile?.trim(); const candidate = entry?.sessionFile?.trim();
return candidate return candidate
? candidate ? candidate
: resolveSessionTranscriptPath(sessionId, opts?.agentId, opts?.topicId); : resolveSessionTranscriptPath(sessionId, opts?.agentId);
} }
export function resolveStorePath(store?: string, opts?: { agentId?: string }) { export function resolveStorePath(store?: string, opts?: { agentId?: string }) {
@@ -402,7 +393,7 @@ export function loadSessionStore(
if (isSessionStoreCacheEnabled()) { if (isSessionStoreCacheEnabled()) {
const cached = SESSION_STORE_CACHE.get(storePath); const cached = SESSION_STORE_CACHE.get(storePath);
if (cached && isSessionStoreCacheValid(cached)) { if (cached && isSessionStoreCacheValid(cached)) {
const currentMtimeMs = getSessionStoreMtimeMs(storePath); const currentMtimeMs = getFileMtimeMs(storePath);
if (currentMtimeMs === cached.mtimeMs) { if (currentMtimeMs === cached.mtimeMs) {
// Return a shallow copy to prevent external mutations affecting cache // Return a shallow copy to prevent external mutations affecting cache
return { ...cached.store }; return { ...cached.store };
@@ -413,14 +404,14 @@ export function loadSessionStore(
// Cache miss or disabled - load from disk // Cache miss or disabled - load from disk
let store: Record<string, SessionEntry> = {}; let store: Record<string, SessionEntry> = {};
let mtimeMs = getSessionStoreMtimeMs(storePath); let mtimeMs = getFileMtimeMs(storePath);
try { try {
const raw = fs.readFileSync(storePath, "utf-8"); const raw = fs.readFileSync(storePath, "utf-8");
const parsed = JSON5.parse(raw); const parsed = JSON5.parse(raw);
if (parsed && typeof parsed === "object") { if (parsed && typeof parsed === "object") {
store = parsed as Record<string, SessionEntry>; store = parsed as Record<string, SessionEntry>;
} }
mtimeMs = getSessionStoreMtimeMs(storePath) ?? mtimeMs; mtimeMs = getFileMtimeMs(storePath) ?? mtimeMs;
} catch { } catch {
// ignore missing/invalid store; we'll recreate it // ignore missing/invalid store; we'll recreate it
} }