import { randomUUID } from "node:crypto"; import fs from "node:fs"; import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../../agents/pi-embedded.js"; import { stopSubagentsForRequester } from "../../auto-reply/reply/abort.js"; import { clearSessionQueues } from "../../auto-reply/reply/queue.js"; import { loadConfig } from "../../config/config.js"; import { resolveMainSessionKey, type SessionEntry, updateSessionStore, } from "../../config/sessions.js"; import { ErrorCodes, errorShape, formatValidationErrors, validateSessionsCompactParams, validateSessionsDeleteParams, validateSessionsListParams, validateSessionsPatchParams, validateSessionsResetParams, validateSessionsResolveParams, } from "../protocol/index.js"; import { archiveFileOnDisk, listSessionsFromStore, loadCombinedSessionStoreForGateway, loadSessionEntry, resolveGatewaySessionStoreTarget, resolveSessionTranscriptCandidates, type SessionsPatchResult, } from "../session-utils.js"; import { applySessionsPatchToStore } from "../sessions-patch.js"; import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js"; import type { GatewayRequestHandlers } from "./types.js"; export const sessionsHandlers: GatewayRequestHandlers = { "sessions.list": ({ params, respond }) => { if (!validateSessionsListParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.list params: ${formatValidationErrors(validateSessionsListParams.errors)}`, ), ); return; } const p = params as import("../protocol/index.js").SessionsListParams; const cfg = loadConfig(); const { storePath, store } = loadCombinedSessionStoreForGateway(cfg); const result = listSessionsFromStore({ cfg, storePath, store, opts: p, }); respond(true, result, undefined); }, "sessions.resolve": ({ params, respond }) => { if (!validateSessionsResolveParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.resolve params: ${formatValidationErrors(validateSessionsResolveParams.errors)}`, ), ); return; } const p = params as import("../protocol/index.js").SessionsResolveParams; const cfg = loadConfig(); const resolved = resolveSessionKeyFromResolveParams({ cfg, p }); if (!resolved.ok) { respond(false, undefined, resolved.error); return; } respond(true, { ok: true, key: resolved.key }, undefined); }, "sessions.patch": async ({ params, respond, context }) => { if (!validateSessionsPatchParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.patch params: ${formatValidationErrors(validateSessionsPatchParams.errors)}`, ), ); return; } const p = params as import("../protocol/index.js").SessionsPatchParams; const key = String(p.key ?? "").trim(); if (!key) { respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); return; } const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); const storePath = target.storePath; const applied = await updateSessionStore(storePath, async (store) => { const primaryKey = target.storeKeys[0] ?? key; const existingKey = target.storeKeys.find((candidate) => store[candidate]); if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { store[primaryKey] = store[existingKey]; delete store[existingKey]; } return await applySessionsPatchToStore({ cfg, store, storeKey: primaryKey, patch: p, loadGatewayModelCatalog: context.loadGatewayModelCatalog, }); }); if (!applied.ok) { respond(false, undefined, applied.error); return; } const result: SessionsPatchResult = { ok: true, path: storePath, key: target.canonicalKey, entry: applied.entry, }; respond(true, result, undefined); }, "sessions.reset": async ({ params, respond }) => { if (!validateSessionsResetParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.reset params: ${formatValidationErrors(validateSessionsResetParams.errors)}`, ), ); return; } const p = params as import("../protocol/index.js").SessionsResetParams; const key = String(p.key ?? "").trim(); if (!key) { respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); return; } const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); const storePath = target.storePath; const next = await updateSessionStore(storePath, (store) => { const primaryKey = target.storeKeys[0] ?? key; const existingKey = target.storeKeys.find((candidate) => store[candidate]); if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { store[primaryKey] = store[existingKey]; delete store[existingKey]; } const entry = store[primaryKey]; const now = Date.now(); const nextEntry: SessionEntry = { sessionId: randomUUID(), updatedAt: now, systemSent: false, abortedLastRun: false, thinkingLevel: entry?.thinkingLevel, verboseLevel: entry?.verboseLevel, reasoningLevel: entry?.reasoningLevel, responseUsage: entry?.responseUsage, model: entry?.model, contextTokens: entry?.contextTokens, sendPolicy: entry?.sendPolicy, label: entry?.label, lastChannel: entry?.lastChannel, lastTo: entry?.lastTo, skillsSnapshot: entry?.skillsSnapshot, }; store[primaryKey] = nextEntry; return nextEntry; }); respond(true, { ok: true, key: target.canonicalKey, entry: next }, undefined); }, "sessions.delete": async ({ params, respond }) => { if (!validateSessionsDeleteParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.delete params: ${formatValidationErrors(validateSessionsDeleteParams.errors)}`, ), ); return; } const p = params as import("../protocol/index.js").SessionsDeleteParams; const key = String(p.key ?? "").trim(); if (!key) { respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); return; } const cfg = loadConfig(); const mainKey = resolveMainSessionKey(cfg); const target = resolveGatewaySessionStoreTarget({ cfg, key }); if (target.canonicalKey === mainKey) { respond( false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, `Cannot delete the main session (${mainKey}).`), ); return; } const deleteTranscript = typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true; const storePath = target.storePath; const { entry } = loadSessionEntry(key); const sessionId = entry?.sessionId; const existed = Boolean(entry); const queueKeys = new Set(target.storeKeys); queueKeys.add(target.canonicalKey); if (sessionId) queueKeys.add(sessionId); clearSessionQueues([...queueKeys]); stopSubagentsForRequester({ cfg, requesterSessionKey: target.canonicalKey }); if (sessionId) { abortEmbeddedPiRun(sessionId); const ended = await waitForEmbeddedPiRunEnd(sessionId, 15_000); if (!ended) { respond( false, undefined, errorShape( ErrorCodes.UNAVAILABLE, `Session ${key} is still active; try again in a moment.`, ), ); return; } } await updateSessionStore(storePath, (store) => { const primaryKey = target.storeKeys[0] ?? key; const existingKey = target.storeKeys.find((candidate) => store[candidate]); if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { store[primaryKey] = store[existingKey]; delete store[existingKey]; } if (store[primaryKey]) delete store[primaryKey]; }); const archived: string[] = []; if (deleteTranscript && sessionId) { for (const candidate of resolveSessionTranscriptCandidates( sessionId, storePath, entry?.sessionFile, target.agentId, )) { if (!fs.existsSync(candidate)) continue; try { archived.push(archiveFileOnDisk(candidate, "deleted")); } catch { // Best-effort. } } } respond(true, { ok: true, key: target.canonicalKey, deleted: existed, archived }, undefined); }, "sessions.compact": async ({ params, respond }) => { if (!validateSessionsCompactParams(params)) { respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `invalid sessions.compact params: ${formatValidationErrors(validateSessionsCompactParams.errors)}`, ), ); return; } const p = params as import("../protocol/index.js").SessionsCompactParams; const key = String(p.key ?? "").trim(); if (!key) { respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); return; } const maxLines = typeof p.maxLines === "number" && Number.isFinite(p.maxLines) ? Math.max(1, Math.floor(p.maxLines)) : 400; const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); const storePath = target.storePath; // Lock + read in a short critical section; transcript work happens outside. const compactTarget = await updateSessionStore(storePath, (store) => { const primaryKey = target.storeKeys[0] ?? key; const existingKey = target.storeKeys.find((candidate) => store[candidate]); if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { store[primaryKey] = store[existingKey]; delete store[existingKey]; } return { entry: store[primaryKey], primaryKey }; }); const entry = compactTarget.entry; const sessionId = entry?.sessionId; if (!sessionId) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, reason: "no sessionId", }, undefined, ); return; } const filePath = resolveSessionTranscriptCandidates( sessionId, storePath, entry?.sessionFile, target.agentId, ).find((candidate) => fs.existsSync(candidate)); if (!filePath) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, reason: "no transcript", }, undefined, ); return; } const raw = fs.readFileSync(filePath, "utf-8"); const lines = raw.split(/\r?\n/).filter((l) => l.trim().length > 0); if (lines.length <= maxLines) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, kept: lines.length, }, undefined, ); return; } const archived = archiveFileOnDisk(filePath, "bak"); const keptLines = lines.slice(-maxLines); fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8"); await updateSessionStore(storePath, (store) => { const entryKey = compactTarget.primaryKey; const entryToUpdate = store[entryKey]; if (!entryToUpdate) return; delete entryToUpdate.inputTokens; delete entryToUpdate.outputTokens; delete entryToUpdate.totalTokens; entryToUpdate.updatedAt = Date.now(); }); respond( true, { ok: true, key: target.canonicalKey, compacted: true, archived, kept: keptLines.length, }, undefined, ); }, };