import crypto from "node:crypto"; import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import type { Skill } from "@mariozechner/pi-coding-agent"; import JSON5 from "json5"; import type { MsgContext } from "../auto-reply/templating.js"; import { buildAgentMainSessionKey, DEFAULT_AGENT_ID, DEFAULT_MAIN_KEY, normalizeAgentId, parseAgentSessionKey, } from "../routing/session-key.js"; import { normalizeE164 } from "../utils.js"; import { getFileMtimeMs, isCacheEnabled, resolveCacheTtlMs, } from "./cache-utils.js"; import { resolveStateDir } from "./paths.js"; // ============================================================================ // Session Store Cache with TTL Support // ============================================================================ type SessionStoreCacheEntry = { store: Record; loadedAt: number; storePath: string; mtimeMs?: number; }; const SESSION_STORE_CACHE = new Map(); const DEFAULT_SESSION_STORE_TTL_MS = 45_000; // 45 seconds (between 30-60s) function getSessionStoreTtl(): number { return resolveCacheTtlMs({ envValue: process.env.CLAWDBOT_SESSION_CACHE_TTL_MS, defaultTtlMs: DEFAULT_SESSION_STORE_TTL_MS, }); } function isSessionStoreCacheEnabled(): boolean { return isCacheEnabled(getSessionStoreTtl()); } function isSessionStoreCacheValid(entry: SessionStoreCacheEntry): boolean { const now = Date.now(); const ttl = getSessionStoreTtl(); return now - entry.loadedAt <= ttl; } function invalidateSessionStoreCache(storePath: string): void { SESSION_STORE_CACHE.delete(storePath); } export function clearSessionStoreCacheForTest(): void { SESSION_STORE_CACHE.clear(); } export type SessionScope = "per-sender" | "global"; const GROUP_SURFACES = new Set([ "whatsapp", "telegram", "discord", "signal", "imessage", "webchat", "slack", ]); export type SessionChatType = "direct" | "group" | "room"; export type SessionEntry = { sessionId: string; updatedAt: number; sessionFile?: string; /** Parent session key that spawned this session (used for sandbox session-tool scoping). */ spawnedBy?: string; systemSent?: boolean; abortedLastRun?: boolean; chatType?: SessionChatType; thinkingLevel?: string; verboseLevel?: string; reasoningLevel?: string; elevatedLevel?: string; providerOverride?: string; modelOverride?: string; authProfileOverride?: string; groupActivation?: "mention" | "always"; groupActivationNeedsSystemIntro?: boolean; sendPolicy?: "allow" | "deny"; queueMode?: | "steer" | "followup" | "collect" | "steer-backlog" | "steer+backlog" | "queue" | "interrupt"; queueDebounceMs?: number; queueCap?: number; queueDrop?: "old" | "new" | "summarize"; inputTokens?: number; outputTokens?: number; totalTokens?: number; modelProvider?: string; model?: string; contextTokens?: number; compactionCount?: number; displayName?: string; provider?: string; subject?: string; room?: string; space?: string; lastProvider?: | "whatsapp" | "telegram" | "discord" | "slack" | "signal" | "imessage" | "webchat"; lastTo?: string; lastAccountId?: string; skillsSnapshot?: SessionSkillSnapshot; }; export type GroupKeyResolution = { key: string; legacyKey?: string; provider?: string; id?: string; chatType?: SessionChatType; }; export type SessionSkillSnapshot = { prompt: string; skills: Array<{ name: string; primaryEnv?: string }>; resolvedSkills?: Skill[]; }; function resolveAgentSessionsDir( agentId?: string, env: NodeJS.ProcessEnv = process.env, homedir: () => string = os.homedir, ): string { const root = resolveStateDir(env, homedir); const id = normalizeAgentId(agentId ?? DEFAULT_AGENT_ID); return path.join(root, "agents", id, "sessions"); } export function resolveSessionTranscriptsDir( env: NodeJS.ProcessEnv = process.env, homedir: () => string = os.homedir, ): string { return resolveAgentSessionsDir(DEFAULT_AGENT_ID, env, homedir); } export function resolveSessionTranscriptsDirForAgent( agentId?: string, env: NodeJS.ProcessEnv = process.env, homedir: () => string = os.homedir, ): string { return resolveAgentSessionsDir(agentId, env, homedir); } export function resolveDefaultSessionStorePath(agentId?: string): string { return path.join(resolveAgentSessionsDir(agentId), "sessions.json"); } export const DEFAULT_RESET_TRIGGER = "/new"; export const DEFAULT_RESET_TRIGGERS = ["/new", "/reset"]; export const DEFAULT_IDLE_MINUTES = 60; export function resolveSessionTranscriptPath( sessionId: string, agentId?: string, topicId?: number, ): string { const fileName = topicId !== undefined ? `${sessionId}-topic-${topicId}.jsonl` : `${sessionId}.jsonl`; return path.join(resolveAgentSessionsDir(agentId), fileName); } export function resolveSessionFilePath( sessionId: string, entry?: SessionEntry, opts?: { agentId?: string }, ): string { const candidate = entry?.sessionFile?.trim(); return candidate ? candidate : resolveSessionTranscriptPath(sessionId, opts?.agentId); } export function resolveStorePath(store?: string, opts?: { agentId?: string }) { const agentId = normalizeAgentId(opts?.agentId ?? DEFAULT_AGENT_ID); if (!store) return resolveDefaultSessionStorePath(agentId); if (store.includes("{agentId}")) { const expanded = store.replaceAll("{agentId}", agentId); if (expanded.startsWith("~")) { return path.resolve(expanded.replace(/^~(?=$|[\\/])/, os.homedir())); } return path.resolve(expanded); } if (store.startsWith("~")) return path.resolve(store.replace(/^~(?=$|[\\/])/, os.homedir())); return path.resolve(store); } export function resolveMainSessionKey(cfg?: { session?: { scope?: SessionScope; mainKey?: string }; routing?: { defaultAgentId?: string }; }): string { if (cfg?.session?.scope === "global") return "global"; const agentId = normalizeAgentId( cfg?.routing?.defaultAgentId ?? DEFAULT_AGENT_ID, ); const mainKey = (cfg?.session?.mainKey ?? DEFAULT_MAIN_KEY).trim() || DEFAULT_MAIN_KEY; return buildAgentMainSessionKey({ agentId, mainKey }); } export function resolveAgentIdFromSessionKey( sessionKey?: string | null, ): string { const parsed = parseAgentSessionKey(sessionKey); return normalizeAgentId(parsed?.agentId ?? DEFAULT_AGENT_ID); } export function resolveAgentMainSessionKey(params: { cfg?: { session?: { mainKey?: string } }; agentId: string; }): string { const mainKey = (params.cfg?.session?.mainKey ?? DEFAULT_MAIN_KEY).trim() || DEFAULT_MAIN_KEY; return buildAgentMainSessionKey({ agentId: params.agentId, mainKey }); } function normalizeGroupLabel(raw?: string) { const trimmed = raw?.trim().toLowerCase() ?? ""; if (!trimmed) return ""; const dashed = trimmed.replace(/\s+/g, "-"); const cleaned = dashed.replace(/[^a-z0-9#@._+-]+/g, "-"); return cleaned.replace(/-{2,}/g, "-").replace(/^[-.]+|[-.]+$/g, ""); } function shortenGroupId(value?: string) { const trimmed = value?.trim() ?? ""; if (!trimmed) return ""; if (trimmed.length <= 14) return trimmed; return `${trimmed.slice(0, 6)}...${trimmed.slice(-4)}`; } export function buildGroupDisplayName(params: { provider?: string; subject?: string; room?: string; space?: string; id?: string; key: string; }) { const providerKey = (params.provider?.trim().toLowerCase() || "group").trim(); const room = params.room?.trim(); const space = params.space?.trim(); const subject = params.subject?.trim(); const detail = (room && space ? `${space}${room.startsWith("#") ? "" : "#"}${room}` : room || subject || space || "") || ""; const fallbackId = params.id?.trim() || params.key.replace(/^group:/, ""); const rawLabel = detail || fallbackId; let token = normalizeGroupLabel(rawLabel); if (!token) { token = normalizeGroupLabel(shortenGroupId(rawLabel)); } if (!params.room && token.startsWith("#")) { token = token.replace(/^#+/, ""); } if ( token && !/^[@#]/.test(token) && !token.startsWith("g-") && !token.includes("#") ) { token = `g-${token}`; } return token ? `${providerKey}:${token}` : providerKey; } export function resolveGroupSessionKey( ctx: MsgContext, ): GroupKeyResolution | null { const from = typeof ctx.From === "string" ? ctx.From.trim() : ""; if (!from) return null; const chatType = ctx.ChatType?.trim().toLowerCase(); const isGroup = chatType === "group" || from.startsWith("group:") || from.includes("@g.us") || from.includes(":group:") || from.includes(":channel:"); if (!isGroup) return null; const providerHint = ctx.Provider?.trim().toLowerCase(); const hasLegacyGroupPrefix = from.startsWith("group:"); const raw = ( hasLegacyGroupPrefix ? from.slice("group:".length) : from ).trim(); let provider: string | undefined; let kind: "group" | "channel" | undefined; let id = ""; const parseKind = (value: string) => { if (value === "channel") return "channel"; return "group"; }; const parseParts = (parts: string[]) => { if (parts.length >= 2 && GROUP_SURFACES.has(parts[0])) { provider = parts[0]; if (parts.length >= 3) { const kindCandidate = parts[1]; if (["group", "channel"].includes(kindCandidate)) { kind = parseKind(kindCandidate); id = parts.slice(2).join(":"); } else { id = parts.slice(1).join(":"); } } else { id = parts[1]; } return; } if (parts.length >= 2 && ["group", "channel"].includes(parts[0])) { kind = parseKind(parts[0]); id = parts.slice(1).join(":"); } }; if (hasLegacyGroupPrefix) { const legacyParts = raw.split(":").filter(Boolean); if (legacyParts.length > 1) { parseParts(legacyParts); } else { id = raw; } } else if (from.includes("@g.us") && !from.includes(":")) { id = from; } else { parseParts(from.split(":").filter(Boolean)); if (!id) { id = raw || from; } } const resolvedProvider = provider ?? providerHint; if (!resolvedProvider) { const legacy = hasLegacyGroupPrefix ? `group:${raw}` : `group:${from}`; return { key: legacy, id: raw || from, legacyKey: legacy, chatType: "group", }; } const resolvedKind = kind === "channel" ? "channel" : "group"; const key = `${resolvedProvider}:${resolvedKind}:${id || raw || from}`; let legacyKey: string | undefined; if (hasLegacyGroupPrefix || from.includes("@g.us")) { legacyKey = `group:${id || raw || from}`; } return { key, legacyKey, provider: resolvedProvider, id: id || raw || from, chatType: resolvedKind === "channel" ? "room" : "group", }; } export function loadSessionStore( storePath: string, ): Record { // Check cache first if enabled if (isSessionStoreCacheEnabled()) { const cached = SESSION_STORE_CACHE.get(storePath); if (cached && isSessionStoreCacheValid(cached)) { const currentMtimeMs = getFileMtimeMs(storePath); if (currentMtimeMs === cached.mtimeMs) { // Return a shallow copy to prevent external mutations affecting cache return { ...cached.store }; } invalidateSessionStoreCache(storePath); } } // Cache miss or disabled - load from disk let store: Record = {}; let mtimeMs = getFileMtimeMs(storePath); try { const raw = fs.readFileSync(storePath, "utf-8"); const parsed = JSON5.parse(raw); if (parsed && typeof parsed === "object") { store = parsed as Record; } mtimeMs = getFileMtimeMs(storePath) ?? mtimeMs; } catch { // ignore missing/invalid store; we'll recreate it } // Cache the result if caching is enabled if (isSessionStoreCacheEnabled()) { SESSION_STORE_CACHE.set(storePath, { store: { ...store }, // Store a copy to prevent external mutations loadedAt: Date.now(), storePath, mtimeMs, }); } return store; } export async function saveSessionStore( storePath: string, store: Record, ) { // Invalidate cache on write to ensure consistency invalidateSessionStoreCache(storePath); await fs.promises.mkdir(path.dirname(storePath), { recursive: true }); const json = JSON.stringify(store, null, 2); const tmp = `${storePath}.${process.pid}.${crypto.randomUUID()}.tmp`; try { await fs.promises.writeFile(tmp, json, "utf-8"); await fs.promises.rename(tmp, storePath); } catch (err) { const code = err && typeof err === "object" && "code" in err ? String((err as { code?: unknown }).code) : null; if (code === "ENOENT") { // In tests the temp session-store directory may be deleted while writes are in-flight. // Best-effort: try a direct write (recreating the parent dir), otherwise ignore. try { await fs.promises.mkdir(path.dirname(storePath), { recursive: true }); await fs.promises.writeFile(storePath, json, "utf-8"); } catch (err2) { const code2 = err2 && typeof err2 === "object" && "code" in err2 ? String((err2 as { code?: unknown }).code) : null; if (code2 === "ENOENT") return; throw err2; } return; } throw err; } finally { await fs.promises.rm(tmp, { force: true }); } } export async function updateLastRoute(params: { storePath: string; sessionKey: string; provider: SessionEntry["lastProvider"]; to?: string; accountId?: string; }) { const { storePath, sessionKey, provider, to, accountId } = params; const store = loadSessionStore(storePath); const existing = store[sessionKey]; const now = Date.now(); const next: SessionEntry = { sessionId: existing?.sessionId ?? crypto.randomUUID(), updatedAt: Math.max(existing?.updatedAt ?? 0, now), sessionFile: existing?.sessionFile, systemSent: existing?.systemSent, abortedLastRun: existing?.abortedLastRun, thinkingLevel: existing?.thinkingLevel, verboseLevel: existing?.verboseLevel, providerOverride: existing?.providerOverride, modelOverride: existing?.modelOverride, sendPolicy: existing?.sendPolicy, queueMode: existing?.queueMode, inputTokens: existing?.inputTokens, outputTokens: existing?.outputTokens, totalTokens: existing?.totalTokens, modelProvider: existing?.modelProvider, model: existing?.model, contextTokens: existing?.contextTokens, displayName: existing?.displayName, chatType: existing?.chatType, provider: existing?.provider, subject: existing?.subject, room: existing?.room, space: existing?.space, skillsSnapshot: existing?.skillsSnapshot, lastProvider: provider, lastTo: to?.trim() ? to.trim() : undefined, lastAccountId: accountId?.trim() ? accountId.trim() : existing?.lastAccountId, }; store[sessionKey] = next; await saveSessionStore(storePath, store); return next; } // Decide which session bucket to use (per-sender vs global). export function deriveSessionKey(scope: SessionScope, ctx: MsgContext) { if (scope === "global") return "global"; const resolvedGroup = resolveGroupSessionKey(ctx); if (resolvedGroup) return resolvedGroup.key; const from = ctx.From ? normalizeE164(ctx.From) : ""; return from || "unknown"; } /** * Resolve the session key with a canonical direct-chat bucket (default: "main"). * All non-group direct chats collapse to this bucket; groups stay isolated. */ export function resolveSessionKey( scope: SessionScope, ctx: MsgContext, mainKey?: string, ) { const explicit = ctx.SessionKey?.trim(); if (explicit) return explicit; const raw = deriveSessionKey(scope, ctx); if (scope === "global") return raw; const canonicalMainKey = (mainKey ?? DEFAULT_MAIN_KEY).trim() || DEFAULT_MAIN_KEY; const canonical = buildAgentMainSessionKey({ agentId: DEFAULT_AGENT_ID, mainKey: canonicalMainKey, }); const isGroup = raw.startsWith("group:") || raw.includes(":group:") || raw.includes(":channel:"); if (!isGroup) return canonical; return `agent:${DEFAULT_AGENT_ID}:${raw}`; }