From 688a0ce4398ddaac01c372c0b7fb5195c37b71a7 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 15 Jan 2026 23:06:42 +0000 Subject: [PATCH] refactor: harden session store updates Co-authored-by: Tyler Yust --- src/agents/tools/session-status-tool.ts | 6 +- src/auto-reply/reply/abort.ts | 10 +- .../reply/agent-runner-execution.ts | 10 +- src/auto-reply/reply/agent-runner.ts | 18 ++- src/auto-reply/reply/body.ts | 13 +- src/auto-reply/reply/commands-session.ts | 18 ++- .../reply/directive-handling.impl.ts | 6 +- .../reply/directive-handling.persist.ts | 6 +- src/auto-reply/reply/get-reply-run.ts | 6 +- src/auto-reply/reply/model-selection.ts | 10 +- src/auto-reply/reply/session-updates.ts | 18 ++- src/auto-reply/reply/session.ts | 21 ++- src/commands/agent.ts | 22 ++- src/commands/agent/session-store.ts | 6 +- src/config/sessions.test.ts | 51 ++++++ src/config/sessions/store.ts | 26 +++- src/cron/isolated-agent/run.ts | 14 +- src/gateway/server-bridge-events.ts | 56 +++---- src/gateway/server-bridge-methods-sessions.ts | 145 +++++++++++------- src/gateway/server-methods/agent.ts | 13 +- src/gateway/server-methods/sessions.ts | 145 +++++++++--------- src/gateway/session-utils.ts | 5 +- src/infra/heartbeat-runner.ts | 11 +- src/web/auto-reply/heartbeat-runner.ts | 20 ++- 24 files changed, 441 insertions(+), 215 deletions(-) diff --git a/src/agents/tools/session-status-tool.ts b/src/agents/tools/session-status-tool.ts index 6a3250130..612b56c67 100644 --- a/src/agents/tools/session-status-tool.ts +++ b/src/agents/tools/session-status-tool.ts @@ -25,7 +25,7 @@ import { loadSessionStore, resolveStorePath, type SessionEntry, - saveSessionStore, + updateSessionStore, } from "../../config/sessions.js"; import { formatUsageSummaryLine, @@ -263,7 +263,9 @@ export function createSessionStatusTool(opts?: { delete nextEntry.authProfileOverride; } store[resolved.key] = nextEntry; - await saveSessionStore(storePath, store); + await updateSessionStore(storePath, (nextStore) => { + nextStore[resolved.key] = nextEntry; + }); resolved.entry = nextEntry; changedModel = true; } diff --git a/src/auto-reply/reply/abort.ts b/src/auto-reply/reply/abort.ts index 32bdca299..43af0f8e1 100644 --- a/src/auto-reply/reply/abort.ts +++ b/src/auto-reply/reply/abort.ts @@ -5,7 +5,7 @@ import { loadSessionStore, resolveStorePath, type SessionEntry, - saveSessionStore, + updateSessionStore, } from "../../config/sessions.js"; import { parseAgentSessionKey } from "../../routing/session-key.js"; import { resolveCommandAuthorization } from "../command-auth.js"; @@ -90,7 +90,13 @@ export async function tryFastAbortFromMessage(params: { entry.abortedLastRun = true; entry.updatedAt = Date.now(); store[key] = entry; - await saveSessionStore(storePath, store); + await updateSessionStore(storePath, (nextStore) => { + const nextEntry = nextStore[key] ?? entry; + if (!nextEntry) return; + nextEntry.abortedLastRun = true; + nextEntry.updatedAt = Date.now(); + nextStore[key] = nextEntry; + }); } else if (abortKey) { setAbortMemory(abortKey, true); } diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index d1f0e276d..f9129566a 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -14,7 +14,7 @@ import { resolveAgentIdFromSessionKey, resolveSessionTranscriptPath, type SessionEntry, - saveSessionStore, + updateSessionStore, } from "../../config/sessions.js"; import { logVerbose } from "../../globals.js"; import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js"; @@ -383,6 +383,7 @@ export async function runAgentTurnWithFallback(params: { params.activeSessionStore && params.storePath ) { + const sessionKey = params.sessionKey; const corruptedSessionId = params.getActiveSessionEntry()?.sessionId; defaultRuntime.error( `Session history corrupted (Gemini function call ordering). Resetting session: ${params.sessionKey}`, @@ -399,9 +400,10 @@ export async function runAgentTurnWithFallback(params: { } } - // Remove session entry from store - delete params.activeSessionStore[params.sessionKey]; - await saveSessionStore(params.storePath, params.activeSessionStore); + // Remove session entry from store using a fresh, locked snapshot. + await updateSessionStore(params.storePath, (store) => { + delete store[sessionKey]; + }); } catch (cleanupErr) { defaultRuntime.error( `Failed to reset corrupted session ${params.sessionKey}: ${String(cleanupErr)}`, diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 590fc0edc..e747a9b51 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -10,7 +10,7 @@ import { resolveAgentIdFromSessionKey, resolveSessionTranscriptPath, type SessionEntry, - saveSessionStore, + updateSessionStore, updateSessionStoreEntry, } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; @@ -156,7 +156,9 @@ export async function runReplyAgent(params: { activeSessionEntry.updatedAt = Date.now(); activeSessionStore[sessionKey] = activeSessionEntry; if (storePath) { - await saveSessionStore(storePath, activeSessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = activeSessionEntry as SessionEntry; + }); } } typing.cleanup(); @@ -170,7 +172,9 @@ export async function runReplyAgent(params: { activeSessionEntry.updatedAt = Date.now(); activeSessionStore[sessionKey] = activeSessionEntry; if (storePath) { - await saveSessionStore(storePath, activeSessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = activeSessionEntry as SessionEntry; + }); } } typing.cleanup(); @@ -224,7 +228,9 @@ export async function runReplyAgent(params: { nextEntry.sessionFile = nextSessionFile; activeSessionStore[sessionKey] = nextEntry; try { - await saveSessionStore(storePath, activeSessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = nextEntry; + }); } catch (err) { defaultRuntime.error( `Failed to persist session reset after compaction failure (${sessionKey}): ${String(err)}`, @@ -280,7 +286,9 @@ export async function runReplyAgent(params: { activeSessionEntry.updatedAt = Date.now(); activeSessionStore[sessionKey] = activeSessionEntry; if (storePath) { - await saveSessionStore(storePath, activeSessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = activeSessionEntry as SessionEntry; + }); } } diff --git a/src/auto-reply/reply/body.ts b/src/auto-reply/reply/body.ts index c6f0bcbaf..ffac41987 100644 --- a/src/auto-reply/reply/body.ts +++ b/src/auto-reply/reply/body.ts @@ -1,5 +1,5 @@ import type { SessionEntry } from "../../config/sessions.js"; -import { saveSessionStore } from "../../config/sessions.js"; +import { updateSessionStore } from "../../config/sessions.js"; import { setAbortMemory } from "./abort.js"; export async function applySessionHints(params: { @@ -23,7 +23,16 @@ export async function applySessionHints(params: { params.sessionEntry.updatedAt = Date.now(); params.sessionStore[params.sessionKey] = params.sessionEntry; if (params.storePath) { - await saveSessionStore(params.storePath, params.sessionStore); + const sessionKey = params.sessionKey; + await updateSessionStore(params.storePath, (store) => { + const entry = store[sessionKey] ?? params.sessionEntry; + if (!entry) return; + store[sessionKey] = { + ...entry, + abortedLastRun: false, + updatedAt: Date.now(), + }; + }); } } else if (params.abortKey) { setAbortMemory(params.abortKey, false); diff --git a/src/auto-reply/reply/commands-session.ts b/src/auto-reply/reply/commands-session.ts index 8a3f8fa4a..1adde3b5f 100644 --- a/src/auto-reply/reply/commands-session.ts +++ b/src/auto-reply/reply/commands-session.ts @@ -1,6 +1,6 @@ import { abortEmbeddedPiRun } from "../../agents/pi-embedded.js"; import type { SessionEntry } from "../../config/sessions.js"; -import { saveSessionStore } from "../../config/sessions.js"; +import { updateSessionStore } from "../../config/sessions.js"; import { logVerbose } from "../../globals.js"; import { scheduleGatewaySigusr1Restart, triggerClawdbotRestart } from "../../infra/restart.js"; import { parseAgentSessionKey } from "../../routing/session-key.js"; @@ -71,7 +71,9 @@ export const handleActivationCommand: CommandHandler = async (params, allowTextC params.sessionEntry.updatedAt = Date.now(); params.sessionStore[params.sessionKey] = params.sessionEntry; if (params.storePath) { - await saveSessionStore(params.storePath, params.sessionStore); + await updateSessionStore(params.storePath, (store) => { + store[params.sessionKey] = params.sessionEntry as SessionEntry; + }); } } return { @@ -107,7 +109,9 @@ export const handleSendPolicyCommand: CommandHandler = async (params, allowTextC params.sessionEntry.updatedAt = Date.now(); params.sessionStore[params.sessionKey] = params.sessionEntry; if (params.storePath) { - await saveSessionStore(params.storePath, params.sessionStore); + await updateSessionStore(params.storePath, (store) => { + store[params.sessionKey] = params.sessionEntry as SessionEntry; + }); } } const label = @@ -190,7 +194,9 @@ export const handleStopCommand: CommandHandler = async (params, allowTextCommand abortTarget.entry.updatedAt = Date.now(); params.sessionStore[abortTarget.key] = abortTarget.entry; if (params.storePath) { - await saveSessionStore(params.storePath, params.sessionStore); + await updateSessionStore(params.storePath, (store) => { + store[abortTarget.key] = abortTarget.entry as SessionEntry; + }); } } else if (params.command.abortKey) { setAbortMemory(params.command.abortKey, true); @@ -215,7 +221,9 @@ export const handleAbortTrigger: CommandHandler = async (params, allowTextComman abortTarget.entry.updatedAt = Date.now(); params.sessionStore[abortTarget.key] = abortTarget.entry; if (params.storePath) { - await saveSessionStore(params.storePath, params.sessionStore); + await updateSessionStore(params.storePath, (store) => { + store[abortTarget.key] = abortTarget.entry as SessionEntry; + }); } } else if (params.command.abortKey) { setAbortMemory(params.command.abortKey, true); diff --git a/src/auto-reply/reply/directive-handling.impl.ts b/src/auto-reply/reply/directive-handling.impl.ts index 752e8984e..8f1e949ed 100644 --- a/src/auto-reply/reply/directive-handling.impl.ts +++ b/src/auto-reply/reply/directive-handling.impl.ts @@ -2,7 +2,7 @@ import { resolveAgentDir, resolveSessionAgentId } from "../../agents/agent-scope import type { ModelAliasIndex } from "../../agents/model-selection.js"; import { resolveSandboxRuntimeStatus } from "../../agents/sandbox.js"; import type { ClawdbotConfig } from "../../config/config.js"; -import { type SessionEntry, saveSessionStore } from "../../config/sessions.js"; +import { type SessionEntry, updateSessionStore } from "../../config/sessions.js"; import { enqueueSystemEvent } from "../../infra/system-events.js"; import { applyVerboseOverride } from "../../sessions/level-overrides.js"; import { formatThinkingLevels, formatXHighModelHint, supportsXHighThinking } from "../thinking.js"; @@ -288,7 +288,9 @@ export async function handleDirectiveOnly(params: { sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; if (storePath) { - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = sessionEntry; + }); } if (modelSelection) { const nextLabel = `${modelSelection.provider}/${modelSelection.model}`; diff --git a/src/auto-reply/reply/directive-handling.persist.ts b/src/auto-reply/reply/directive-handling.persist.ts index 27bfdcc28..d2a4d1d0e 100644 --- a/src/auto-reply/reply/directive-handling.persist.ts +++ b/src/auto-reply/reply/directive-handling.persist.ts @@ -14,7 +14,7 @@ import { resolveModelRefFromString, } from "../../agents/model-selection.js"; import type { ClawdbotConfig } from "../../config/config.js"; -import { type SessionEntry, saveSessionStore } from "../../config/sessions.js"; +import { type SessionEntry, updateSessionStore } from "../../config/sessions.js"; import { enqueueSystemEvent } from "../../infra/system-events.js"; import { applyVerboseOverride } from "../../sessions/level-overrides.js"; import { resolveProfileOverride } from "./directive-handling.auth.js"; @@ -184,7 +184,9 @@ export async function persistInlineDirectives(params: { sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; if (storePath) { - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = sessionEntry; + }); } if (elevatedChanged) { const nextElevated = (sessionEntry.elevatedLevel ?? "off") as ElevatedLevel; diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index c5287bca8..0691b7b9b 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -9,7 +9,7 @@ import type { ClawdbotConfig } from "../../config/config.js"; import { resolveSessionFilePath, type SessionEntry, - saveSessionStore, + updateSessionStore, } from "../../config/sessions.js"; import { logVerbose } from "../../globals.js"; import { clearCommandLane, getQueueSize } from "../../process/command-queue.js"; @@ -276,7 +276,9 @@ export async function runPreparedReply( sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; if (storePath) { - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = sessionEntry; + }); } } } diff --git a/src/auto-reply/reply/model-selection.ts b/src/auto-reply/reply/model-selection.ts index 96929ee62..b8d8333df 100644 --- a/src/auto-reply/reply/model-selection.ts +++ b/src/auto-reply/reply/model-selection.ts @@ -10,7 +10,7 @@ import { resolveThinkingDefault, } from "../../agents/model-selection.js"; import type { ClawdbotConfig } from "../../config/config.js"; -import { type SessionEntry, saveSessionStore } from "../../config/sessions.js"; +import { type SessionEntry, updateSessionStore } from "../../config/sessions.js"; import type { ThinkLevel } from "./directives.js"; export type ModelDirectiveSelection = { @@ -189,7 +189,9 @@ export async function createModelSelectionState(params: { sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; if (storePath) { - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = sessionEntry; + }); } resetModelOverride = true; } @@ -218,7 +220,9 @@ export async function createModelSelectionState(params: { sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; if (storePath) { - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = sessionEntry; + }); } } } diff --git a/src/auto-reply/reply/session-updates.ts b/src/auto-reply/reply/session-updates.ts index c4149e646..c9480d846 100644 --- a/src/auto-reply/reply/session-updates.ts +++ b/src/auto-reply/reply/session-updates.ts @@ -2,7 +2,7 @@ import crypto from "node:crypto"; import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js"; import type { ClawdbotConfig } from "../../config/config.js"; -import { type SessionEntry, saveSessionStore } from "../../config/sessions.js"; +import { type SessionEntry, updateSessionStore } from "../../config/sessions.js"; import { buildChannelSummary } from "../../infra/channel-summary.js"; import { drainSystemEventEntries } from "../../infra/system-events.js"; @@ -111,7 +111,9 @@ export async function ensureSkillSnapshot(params: { }; sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...nextEntry }; if (storePath) { - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = { ...store[sessionKey], ...nextEntry }; + }); } systemSent = true; } @@ -143,7 +145,9 @@ export async function ensureSkillSnapshot(params: { }; sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...nextEntry }; if (storePath) { - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = { ...store[sessionKey], ...nextEntry }; + }); } } @@ -168,7 +172,13 @@ export async function incrementCompactionCount(params: { updatedAt: now, }; if (storePath) { - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = { + ...store[sessionKey], + compactionCount: nextCount, + updatedAt: now, + }; + }); } return nextCount; } diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index e8b873161..e28ea4dea 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -20,7 +20,7 @@ import { resolveStorePath, type SessionEntry, type SessionScope, - saveSessionStore, + updateSessionStore, } from "../../config/sessions.js"; import { normalizeMainKey } from "../../routing/session-key.js"; import { resolveCommandAuthorization } from "../command-auth.js"; @@ -188,6 +188,11 @@ export async function initSessionState(params: { } const baseEntry = !isNewSession && freshEntry ? entry : undefined; + // Track the originating channel/to for announce routing (subagent announce-back). + const lastChannel = + (ctx.OriginatingChannel as string | undefined)?.trim() || baseEntry?.lastChannel; + const lastTo = ctx.OriginatingTo?.trim() || ctx.To?.trim() || baseEntry?.lastTo; + const lastAccountId = ctx.AccountId?.trim() || baseEntry?.lastAccountId; sessionEntry = { ...baseEntry, sessionId, @@ -212,6 +217,10 @@ export async function initSessionState(params: { subject: baseEntry?.subject, room: baseEntry?.room, space: baseEntry?.space, + // Track originating channel for subagent announce routing. + lastChannel, + lastTo, + lastAccountId, }; if (groupResolution?.channel) { const channel = groupResolution.channel; @@ -270,7 +279,15 @@ export async function initSessionState(params: { ); } sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...sessionEntry }; - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + if (groupResolution?.legacyKey && groupResolution.legacyKey !== sessionKey) { + if (store[groupResolution.legacyKey] && !store[sessionKey]) { + store[sessionKey] = store[groupResolution.legacyKey]; + } + delete store[groupResolution.legacyKey]; + } + store[sessionKey] = { ...store[sessionKey], ...sessionEntry }; + }); const sessionCtx: TemplateContext = { ...ctx, diff --git a/src/commands/agent.ts b/src/commands/agent.ts index afd3ea4c8..1f3ad44c6 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -36,7 +36,7 @@ import { resolveAgentIdFromSessionKey, resolveSessionFilePath, type SessionEntry, - saveSessionStore, + updateSessionStore, } from "../config/sessions.js"; import { clearAgentRunContext, @@ -173,7 +173,9 @@ export async function agentCommand( skillsSnapshot, }; sessionStore[sessionKey] = next; - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = next; + }); sessionEntry = next; } @@ -188,7 +190,9 @@ export async function agentCommand( } applyVerboseOverride(next, verboseOverride); sessionStore[sessionKey] = next; - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = next; + }); } const agentModelPrimary = resolveAgentModelPrimary(cfg, sessionAgentId); @@ -252,7 +256,9 @@ export async function agentCommand( delete sessionEntry.modelOverride; sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = sessionEntry; + }); } } } @@ -279,7 +285,9 @@ export async function agentCommand( sessionEntry.updatedAt = Date.now(); if (sessionStore && sessionKey) { sessionStore[sessionKey] = sessionEntry; - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = sessionEntry; + }); } } } @@ -307,7 +315,9 @@ export async function agentCommand( sessionEntry.thinkingLevel = "high"; sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = sessionEntry; + }); } } const sessionFile = resolveSessionFilePath(sessionId, sessionEntry, { diff --git a/src/commands/agent/session-store.ts b/src/commands/agent/session-store.ts index 9fdb6870e..2c70a5039 100644 --- a/src/commands/agent/session-store.ts +++ b/src/commands/agent/session-store.ts @@ -4,7 +4,7 @@ import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { isCliProvider } from "../../agents/model-selection.js"; import { hasNonzeroUsage } from "../../agents/usage.js"; import type { ClawdbotConfig } from "../../config/config.js"; -import { type SessionEntry, saveSessionStore } from "../../config/sessions.js"; +import { type SessionEntry, updateSessionStore } from "../../config/sessions.js"; type RunResult = Awaited< ReturnType<(typeof import("../../agents/pi-embedded.js"))["runEmbeddedPiAgent"]> @@ -68,5 +68,7 @@ export async function updateSessionStoreAfterAgentRun(params: { next.totalTokens = promptTokens > 0 ? promptTokens : (usage.total ?? input); } sessionStore[sessionKey] = next; - await saveSessionStore(storePath, sessionStore); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = next; + }); } diff --git a/src/config/sessions.test.ts b/src/config/sessions.test.ts index d3d8e8d3e..bc94dce3e 100644 --- a/src/config/sessions.test.ts +++ b/src/config/sessions.test.ts @@ -12,6 +12,7 @@ import { resolveSessionTranscriptPath, resolveSessionTranscriptsDir, updateLastRoute, + updateSessionStore, updateSessionStoreEntry, } from "./sessions.js"; @@ -137,6 +138,56 @@ describe("sessions", () => { expect(store[mainSessionKey]?.compactionCount).toBe(2); }); + it("updateSessionStore preserves concurrent additions", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-sessions-")); + const storePath = path.join(dir, "sessions.json"); + await fs.writeFile(storePath, "{}", "utf-8"); + + await Promise.all([ + updateSessionStore(storePath, (store) => { + store["agent:main:one"] = { sessionId: "sess-1", updatedAt: 1 }; + }), + updateSessionStore(storePath, (store) => { + store["agent:main:two"] = { sessionId: "sess-2", updatedAt: 2 }; + }), + ]); + + const store = loadSessionStore(storePath); + expect(store["agent:main:one"]?.sessionId).toBe("sess-1"); + expect(store["agent:main:two"]?.sessionId).toBe("sess-2"); + }); + + it("updateSessionStore keeps deletions when concurrent writes happen", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-sessions-")); + const storePath = path.join(dir, "sessions.json"); + await fs.writeFile( + storePath, + JSON.stringify( + { + "agent:main:old": { sessionId: "sess-old", updatedAt: 1 }, + "agent:main:keep": { sessionId: "sess-keep", updatedAt: 2 }, + }, + null, + 2, + ), + "utf-8", + ); + + await Promise.all([ + updateSessionStore(storePath, (store) => { + delete store["agent:main:old"]; + }), + updateSessionStore(storePath, (store) => { + store["agent:main:new"] = { sessionId: "sess-new", updatedAt: 3 }; + }), + ]); + + const store = loadSessionStore(storePath); + expect(store["agent:main:old"]).toBeUndefined(); + expect(store["agent:main:keep"]?.sessionId).toBe("sess-keep"); + expect(store["agent:main:new"]?.sessionId).toBe("sess-new"); + }); + it("loadSessionStore auto-migrates legacy provider keys to channel keys", async () => { const mainSessionKey = "agent:main:main"; const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-sessions-")); diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 6f993b6aa..c8b5e403a 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -45,9 +45,16 @@ export function clearSessionStoreCacheForTest(): void { SESSION_STORE_CACHE.clear(); } -export function loadSessionStore(storePath: string): Record { +type LoadSessionStoreOptions = { + skipCache?: boolean; +}; + +export function loadSessionStore( + storePath: string, + opts: LoadSessionStoreOptions = {}, +): Record { // Check cache first if enabled - if (isSessionStoreCacheEnabled()) { + if (!opts.skipCache && isSessionStoreCacheEnabled()) { const cached = SESSION_STORE_CACHE.get(storePath); if (cached && isSessionStoreCacheValid(cached)) { const currentMtimeMs = getFileMtimeMs(storePath); @@ -88,7 +95,7 @@ export function loadSessionStore(storePath: string): Record( + storePath: string, + mutator: (store: Record) => Promise | T, +): Promise { + return await withSessionStoreLock(storePath, async () => { + // Always re-read inside the lock to avoid clobbering concurrent writers. + const store = loadSessionStore(storePath, { skipCache: true }); + const result = await mutator(store); + await saveSessionStoreUnlocked(storePath, store); + return result; + }); +} + type SessionStoreLockOptions = { timeoutMs?: number; pollIntervalMs?: number; diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 3654107f9..3d8b3e862 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -30,7 +30,7 @@ import { } from "../../auto-reply/thinking.js"; import type { CliDeps } from "../../cli/deps.js"; import type { ClawdbotConfig } from "../../config/config.js"; -import { resolveSessionTranscriptPath, saveSessionStore } from "../../config/sessions.js"; +import { resolveSessionTranscriptPath, updateSessionStore } from "../../config/sessions.js"; import type { AgentDefaultsConfig } from "../../config/types.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; @@ -217,13 +217,17 @@ export async function runCronIsolatedAgentTurn(params: { skillsSnapshot, }; cronSession.store[agentSessionKey] = cronSession.sessionEntry; - await saveSessionStore(cronSession.storePath, cronSession.store); + await updateSessionStore(cronSession.storePath, (store) => { + store[agentSessionKey] = cronSession.sessionEntry; + }); } // Persist systemSent before the run, mirroring the inbound auto-reply behavior. cronSession.sessionEntry.systemSent = true; cronSession.store[agentSessionKey] = cronSession.sessionEntry; - await saveSessionStore(cronSession.storePath, cronSession.store); + await updateSessionStore(cronSession.storePath, (store) => { + store[agentSessionKey] = cronSession.sessionEntry; + }); let runResult: Awaited>; let fallbackProvider = provider; @@ -316,7 +320,9 @@ export async function runCronIsolatedAgentTurn(params: { promptTokens > 0 ? promptTokens : (usage.total ?? input); } cronSession.store[agentSessionKey] = cronSession.sessionEntry; - await saveSessionStore(cronSession.storePath, cronSession.store); + await updateSessionStore(cronSession.storePath, (store) => { + store[agentSessionKey] = cronSession.sessionEntry; + }); } const firstText = payloads[0]?.text ?? ""; const summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText); diff --git a/src/gateway/server-bridge-events.ts b/src/gateway/server-bridge-events.ts index cd46a4876..fc0720524 100644 --- a/src/gateway/server-bridge-events.ts +++ b/src/gateway/server-bridge-events.ts @@ -2,7 +2,7 @@ import { randomUUID } from "node:crypto"; import { normalizeChannelId } from "../channels/plugins/index.js"; import { agentCommand } from "../commands/agent.js"; import { loadConfig } from "../config/config.js"; -import { saveSessionStore } from "../config/sessions.js"; +import { updateSessionStore } from "../config/sessions.js"; import { normalizeMainKey } from "../routing/session-key.js"; import { defaultRuntime } from "../runtime.js"; import type { BridgeEvent, BridgeHandlersContext } from "./server-bridge-types.js"; @@ -32,22 +32,23 @@ export const handleBridgeEvent = async ( const cfg = loadConfig(); const rawMainKey = normalizeMainKey(cfg.session?.mainKey); const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : rawMainKey; - const { storePath, store, entry, canonicalKey } = loadSessionEntry(sessionKey); + const { storePath, entry, canonicalKey } = loadSessionEntry(sessionKey); const now = Date.now(); const sessionId = entry?.sessionId ?? randomUUID(); - store[canonicalKey] = { - sessionId, - updatedAt: now, - thinkingLevel: entry?.thinkingLevel, - verboseLevel: entry?.verboseLevel, - reasoningLevel: entry?.reasoningLevel, - systemSent: entry?.systemSent, - sendPolicy: entry?.sendPolicy, - lastChannel: entry?.lastChannel, - lastTo: entry?.lastTo, - }; if (storePath) { - await saveSessionStore(storePath, store); + await updateSessionStore(storePath, (store) => { + store[canonicalKey] = { + sessionId, + updatedAt: now, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + reasoningLevel: entry?.reasoningLevel, + systemSent: entry?.systemSent, + sendPolicy: entry?.sendPolicy, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + }; + }); } // Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send). @@ -102,22 +103,23 @@ export const handleBridgeEvent = async ( const sessionKeyRaw = (link?.sessionKey ?? "").trim(); const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : `node-${nodeId}`; - const { storePath, store, entry, canonicalKey } = loadSessionEntry(sessionKey); + const { storePath, entry, canonicalKey } = loadSessionEntry(sessionKey); const now = Date.now(); const sessionId = entry?.sessionId ?? randomUUID(); - store[canonicalKey] = { - sessionId, - updatedAt: now, - thinkingLevel: entry?.thinkingLevel, - verboseLevel: entry?.verboseLevel, - reasoningLevel: entry?.reasoningLevel, - systemSent: entry?.systemSent, - sendPolicy: entry?.sendPolicy, - lastChannel: entry?.lastChannel, - lastTo: entry?.lastTo, - }; if (storePath) { - await saveSessionStore(storePath, store); + await updateSessionStore(storePath, (store) => { + store[canonicalKey] = { + sessionId, + updatedAt: now, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + reasoningLevel: entry?.reasoningLevel, + systemSent: entry?.systemSent, + sendPolicy: entry?.sendPolicy, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + }; + }); } void agentCommand( diff --git a/src/gateway/server-bridge-methods-sessions.ts b/src/gateway/server-bridge-methods-sessions.ts index e593ed437..c89d0aab0 100644 --- a/src/gateway/server-bridge-methods-sessions.ts +++ b/src/gateway/server-bridge-methods-sessions.ts @@ -8,10 +8,9 @@ import { } from "../agents/pi-embedded.js"; import { loadConfig } from "../config/config.js"; import { - loadSessionStore, resolveMainSessionKeyFromConfig, type SessionEntry, - saveSessionStore, + updateSessionStore, } from "../config/sessions.js"; import { clearCommandLane } from "../process/command-queue.js"; import { @@ -126,19 +125,20 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async ( const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); const storePath = target.storePath; - const store = loadSessionStore(storePath); - const primaryKey = target.storeKeys[0] ?? key; - const existingKey = target.storeKeys.find((candidate) => store[candidate]); - if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { - store[primaryKey] = store[existingKey]; - delete store[existingKey]; - } - const applied = await applySessionsPatchToStore({ - cfg, - store, - storeKey: primaryKey, - patch: p, - loadGatewayModelCatalog: ctx.loadGatewayModelCatalog, + const applied = await updateSessionStore(storePath, async (store) => { + const primaryKey = target.storeKeys[0] ?? key; + const existingKey = target.storeKeys.find((candidate) => store[candidate]); + if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { + store[primaryKey] = store[existingKey]; + delete store[existingKey]; + } + return await applySessionsPatchToStore({ + cfg, + store, + storeKey: primaryKey, + patch: p, + loadGatewayModelCatalog: ctx.loadGatewayModelCatalog, + }); }); if (!applied.ok) { return { @@ -150,7 +150,6 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async ( }, }; } - await saveSessionStore(storePath, store); const payload: SessionsPatchResult = { ok: true, path: storePath, @@ -182,32 +181,43 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async ( }; } - const { storePath, store, entry } = loadSessionEntry(key); - const now = Date.now(); - const next: SessionEntry = { - sessionId: randomUUID(), - updatedAt: now, - systemSent: false, - abortedLastRun: false, - thinkingLevel: entry?.thinkingLevel, - verboseLevel: entry?.verboseLevel, - reasoningLevel: entry?.reasoningLevel, - model: entry?.model, - contextTokens: entry?.contextTokens, - sendPolicy: entry?.sendPolicy, - label: entry?.label, - displayName: entry?.displayName, - chatType: entry?.chatType, - channel: entry?.channel, - subject: entry?.subject, - room: entry?.room, - space: entry?.space, - lastChannel: entry?.lastChannel, - lastTo: entry?.lastTo, - skillsSnapshot: entry?.skillsSnapshot, - }; - store[key] = next; - await saveSessionStore(storePath, store); + const cfg = loadConfig(); + const target = resolveGatewaySessionStoreTarget({ cfg, key }); + const storePath = target.storePath; + const next = await updateSessionStore(storePath, (store) => { + const primaryKey = target.storeKeys[0] ?? key; + const existingKey = target.storeKeys.find((candidate) => store[candidate]); + if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { + store[primaryKey] = store[existingKey]; + delete store[existingKey]; + } + const entry = store[primaryKey]; + const now = Date.now(); + const nextEntry: SessionEntry = { + sessionId: randomUUID(), + updatedAt: now, + systemSent: false, + abortedLastRun: false, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + reasoningLevel: entry?.reasoningLevel, + model: entry?.model, + contextTokens: entry?.contextTokens, + sendPolicy: entry?.sendPolicy, + label: entry?.label, + displayName: entry?.displayName, + chatType: entry?.chatType, + channel: entry?.channel, + subject: entry?.subject, + room: entry?.room, + space: entry?.space, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + skillsSnapshot: entry?.skillsSnapshot, + }; + store[primaryKey] = nextEntry; + return nextEntry; + }); return { ok: true, payloadJSON: JSON.stringify({ ok: true, key, entry: next }), @@ -249,9 +259,11 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async ( const deleteTranscript = typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true; - const { storePath, store, entry } = loadSessionEntry(key); + const cfg = loadConfig(); + const target = resolveGatewaySessionStoreTarget({ cfg, key }); + const storePath = target.storePath; + const { entry } = loadSessionEntry(key); const sessionId = entry?.sessionId; - const existed = Boolean(store[key]); clearCommandLane(resolveEmbeddedSessionLane(key)); if (sessionId && isEmbeddedPiRunActive(sessionId)) { abortEmbeddedPiRun(sessionId); @@ -266,8 +278,19 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async ( }; } } - if (existed) delete store[key]; - await saveSessionStore(storePath, store); + const deletion = await updateSessionStore(storePath, (store) => { + const primaryKey = target.storeKeys[0] ?? key; + const existingKey = target.storeKeys.find((candidate) => store[candidate]); + if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { + store[primaryKey] = store[existingKey]; + delete store[existingKey]; + } + const entryToDelete = store[primaryKey]; + const existed = Boolean(entryToDelete); + if (existed) delete store[primaryKey]; + return { existed, entry: entryToDelete }; + }); + const existed = deletion.existed; const archived: string[] = []; if (deleteTranscript && sessionId) { @@ -323,7 +346,20 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async ( ? Math.max(1, Math.floor(p.maxLines)) : 400; - const { storePath, store, entry } = loadSessionEntry(key); + const cfg = loadConfig(); + const target = resolveGatewaySessionStoreTarget({ cfg, key }); + const storePath = target.storePath; + // Resolve entry inside the lock, but compact outside to avoid holding it. + const compactTarget = await updateSessionStore(storePath, (store) => { + const primaryKey = target.storeKeys[0] ?? key; + const existingKey = target.storeKeys.find((candidate) => store[candidate]); + if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { + store[primaryKey] = store[existingKey]; + delete store[existingKey]; + } + return { entry: store[primaryKey], primaryKey }; + }); + const entry = compactTarget.entry; const sessionId = entry?.sessionId; if (!sessionId) { return { @@ -373,13 +409,14 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async ( fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8"); // Token counts no longer match; clear so status + UI reflect reality after the next turn. - if (store[key]) { - delete store[key].inputTokens; - delete store[key].outputTokens; - delete store[key].totalTokens; - store[key].updatedAt = Date.now(); - await saveSessionStore(storePath, store); - } + await updateSessionStore(storePath, (store) => { + const entryToUpdate = store[compactTarget.primaryKey]; + if (!entryToUpdate) return; + delete entryToUpdate.inputTokens; + delete entryToUpdate.outputTokens; + delete entryToUpdate.totalTokens; + entryToUpdate.updatedAt = Date.now(); + }); return { ok: true, diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 625d17d4c..3f15ad320 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -6,7 +6,7 @@ import { resolveAgentIdFromSessionKey, resolveAgentMainSessionKey, type SessionEntry, - saveSessionStore, + updateSessionStore, } from "../../config/sessions.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; import { resolveOutboundTarget } from "../../infra/outbound/targets.js"; @@ -136,7 +136,7 @@ export const agentHandlers: GatewayRequestHandlers = { let cfgForAgent: ReturnType | undefined; if (requestedSessionKey) { - const { cfg, storePath, store, entry, canonicalKey } = loadSessionEntry(requestedSessionKey); + const { cfg, storePath, entry, canonicalKey } = loadSessionEntry(requestedSessionKey); cfgForAgent = cfg; const now = Date.now(); const sessionId = entry?.sessionId ?? randomUUID(); @@ -178,11 +178,10 @@ export const agentHandlers: GatewayRequestHandlers = { const canonicalSessionKey = canonicalKey; const agentId = resolveAgentIdFromSessionKey(canonicalSessionKey); const mainSessionKey = resolveAgentMainSessionKey({ cfg, agentId }); - if (store) { - store[canonicalSessionKey] = nextEntry; - if (storePath) { - await saveSessionStore(storePath, store); - } + if (storePath) { + await updateSessionStore(storePath, (store) => { + store[canonicalSessionKey] = nextEntry; + }); } if (canonicalSessionKey === mainSessionKey || canonicalSessionKey === "global") { context.addChatRun(idem, { diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index e790eef80..e2b9f5a0c 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -9,10 +9,9 @@ import { } from "../../agents/pi-embedded.js"; import { loadConfig } from "../../config/config.js"; import { - loadSessionStore, resolveMainSessionKey, type SessionEntry, - saveSessionStore, + updateSessionStore, } from "../../config/sessions.js"; import { clearCommandLane } from "../../process/command-queue.js"; import { @@ -30,6 +29,7 @@ import { archiveFileOnDisk, listSessionsFromStore, loadCombinedSessionStoreForGateway, + loadSessionEntry, resolveGatewaySessionStoreTarget, resolveSessionTranscriptCandidates, type SessionsPatchResult, @@ -106,26 +106,25 @@ export const sessionsHandlers: GatewayRequestHandlers = { const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); const storePath = target.storePath; - const store = loadSessionStore(storePath); - - const primaryKey = target.storeKeys[0] ?? key; - const existingKey = target.storeKeys.find((candidate) => store[candidate]); - if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { - store[primaryKey] = store[existingKey]; - delete store[existingKey]; - } - const applied = await applySessionsPatchToStore({ - cfg, - store, - storeKey: primaryKey, - patch: p, - loadGatewayModelCatalog: context.loadGatewayModelCatalog, + const applied = await updateSessionStore(storePath, async (store) => { + const primaryKey = target.storeKeys[0] ?? key; + const existingKey = target.storeKeys.find((candidate) => store[candidate]); + if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { + store[primaryKey] = store[existingKey]; + delete store[existingKey]; + } + return await applySessionsPatchToStore({ + cfg, + store, + storeKey: primaryKey, + patch: p, + loadGatewayModelCatalog: context.loadGatewayModelCatalog, + }); }); if (!applied.ok) { respond(false, undefined, applied.error); return; } - await saveSessionStore(storePath, store); const result: SessionsPatchResult = { ok: true, path: storePath, @@ -156,34 +155,35 @@ export const sessionsHandlers: GatewayRequestHandlers = { const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); const storePath = target.storePath; - const store = loadSessionStore(storePath); - const primaryKey = target.storeKeys[0] ?? key; - const existingKey = target.storeKeys.find((candidate) => store[candidate]); - if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { - store[primaryKey] = store[existingKey]; - delete store[existingKey]; - } - const entry = store[primaryKey]; - const now = Date.now(); - const next: SessionEntry = { - sessionId: randomUUID(), - updatedAt: now, - systemSent: false, - abortedLastRun: false, - thinkingLevel: entry?.thinkingLevel, - verboseLevel: entry?.verboseLevel, - reasoningLevel: entry?.reasoningLevel, - responseUsage: entry?.responseUsage, - model: entry?.model, - contextTokens: entry?.contextTokens, - sendPolicy: entry?.sendPolicy, - label: entry?.label, - lastChannel: entry?.lastChannel, - lastTo: entry?.lastTo, - skillsSnapshot: entry?.skillsSnapshot, - }; - store[primaryKey] = next; - await saveSessionStore(storePath, store); + const next = await updateSessionStore(storePath, (store) => { + const primaryKey = target.storeKeys[0] ?? key; + const existingKey = target.storeKeys.find((candidate) => store[candidate]); + if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { + store[primaryKey] = store[existingKey]; + delete store[existingKey]; + } + const entry = store[primaryKey]; + const now = Date.now(); + const nextEntry: SessionEntry = { + sessionId: randomUUID(), + updatedAt: now, + systemSent: false, + abortedLastRun: false, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + reasoningLevel: entry?.reasoningLevel, + responseUsage: entry?.responseUsage, + model: entry?.model, + contextTokens: entry?.contextTokens, + sendPolicy: entry?.sendPolicy, + label: entry?.label, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + skillsSnapshot: entry?.skillsSnapshot, + }; + store[primaryKey] = nextEntry; + return nextEntry; + }); respond(true, { ok: true, key: target.canonicalKey, entry: next }, undefined); }, "sessions.delete": async ({ params, respond }) => { @@ -220,14 +220,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { const deleteTranscript = typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true; const storePath = target.storePath; - const store = loadSessionStore(storePath); - const primaryKey = target.storeKeys[0] ?? key; - const existingKey = target.storeKeys.find((candidate) => store[candidate]); - if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { - store[primaryKey] = store[existingKey]; - delete store[existingKey]; - } - const entry = store[primaryKey]; + const { entry } = loadSessionEntry(key); const sessionId = entry?.sessionId; const existed = Boolean(entry); clearCommandLane(resolveEmbeddedSessionLane(target.canonicalKey)); @@ -246,8 +239,15 @@ export const sessionsHandlers: GatewayRequestHandlers = { return; } } - if (existed) delete store[primaryKey]; - await saveSessionStore(storePath, store); + await updateSessionStore(storePath, (store) => { + const primaryKey = target.storeKeys[0] ?? key; + const existingKey = target.storeKeys.find((candidate) => store[candidate]); + if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { + store[primaryKey] = store[existingKey]; + delete store[existingKey]; + } + if (store[primaryKey]) delete store[primaryKey]; + }); const archived: string[] = []; if (deleteTranscript && sessionId) { @@ -295,14 +295,17 @@ export const sessionsHandlers: GatewayRequestHandlers = { const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); const storePath = target.storePath; - const store = loadSessionStore(storePath); - const primaryKey = target.storeKeys[0] ?? key; - const existingKey = target.storeKeys.find((candidate) => store[candidate]); - if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { - store[primaryKey] = store[existingKey]; - delete store[existingKey]; - } - const entry = store[primaryKey]; + // Lock + read in a short critical section; transcript work happens outside. + const compactTarget = await updateSessionStore(storePath, (store) => { + const primaryKey = target.storeKeys[0] ?? key; + const existingKey = target.storeKeys.find((candidate) => store[candidate]); + if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { + store[primaryKey] = store[existingKey]; + delete store[existingKey]; + } + return { entry: store[primaryKey], primaryKey }; + }); + const entry = compactTarget.entry; const sessionId = entry?.sessionId; if (!sessionId) { respond( @@ -358,13 +361,15 @@ export const sessionsHandlers: GatewayRequestHandlers = { const keptLines = lines.slice(-maxLines); fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8"); - if (store[primaryKey]) { - delete store[primaryKey].inputTokens; - delete store[primaryKey].outputTokens; - delete store[primaryKey].totalTokens; - store[primaryKey].updatedAt = Date.now(); - await saveSessionStore(storePath, store); - } + await updateSessionStore(storePath, (store) => { + const entryKey = compactTarget.primaryKey; + const entryToUpdate = store[entryKey]; + if (!entryToUpdate) return; + delete entryToUpdate.inputTokens; + delete entryToUpdate.outputTokens; + delete entryToUpdate.totalTokens; + entryToUpdate.updatedAt = Date.now(); + }); respond( true, diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index cc4053a71..704351bb6 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -285,9 +285,12 @@ export function loadCombinedSessionStoreForGateway(cfg: ClawdbotConfig): { const store = loadSessionStore(storePath); for (const [key, entry] of Object.entries(store)) { const canonicalKey = canonicalizeSessionKeyForAgent(agentId, key); + // Merge with existing entry if present (avoid overwriting with less complete data) + const existing = combined[canonicalKey]; combined[canonicalKey] = { + ...existing, ...entry, - spawnedBy: canonicalizeSpawnedByForAgent(agentId, entry.spawnedBy), + spawnedBy: canonicalizeSpawnedByForAgent(agentId, entry.spawnedBy ?? existing?.spawnedBy), }; } } diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 60ee7d6ad..7ffad2ed9 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -17,7 +17,7 @@ import { resolveAgentIdFromSessionKey, resolveMainSessionKey, resolveStorePath, - saveSessionStore, + updateSessionStore, } from "../config/sessions.js"; import { formatErrorMessage } from "../infra/errors.js"; import { createSubsystemLogger } from "../logging.js"; @@ -150,8 +150,13 @@ async function restoreHeartbeatUpdatedAt(params: { if (!entry) return; const nextUpdatedAt = Math.max(entry.updatedAt ?? 0, updatedAt); if (entry.updatedAt === nextUpdatedAt) return; - store[sessionKey] = { ...entry, updatedAt: nextUpdatedAt }; - await saveSessionStore(storePath, store); + await updateSessionStore(storePath, (nextStore) => { + const nextEntry = nextStore[sessionKey] ?? entry; + if (!nextEntry) return; + const resolvedUpdatedAt = Math.max(nextEntry.updatedAt ?? 0, updatedAt); + if (nextEntry.updatedAt === resolvedUpdatedAt) return; + nextStore[sessionKey] = { ...nextEntry, updatedAt: resolvedUpdatedAt }; + }); } function normalizeHeartbeatReply( diff --git a/src/web/auto-reply/heartbeat-runner.ts b/src/web/auto-reply/heartbeat-runner.ts index fb1e0b66f..e6ee3d1d9 100644 --- a/src/web/auto-reply/heartbeat-runner.ts +++ b/src/web/auto-reply/heartbeat-runner.ts @@ -11,7 +11,7 @@ import { loadSessionStore, resolveSessionKey, resolveStorePath, - saveSessionStore, + updateSessionStore, } from "../../config/sessions.js"; import { emitHeartbeatEvent } from "../../infra/heartbeat-events.js"; import { getChildLogger } from "../../logging.js"; @@ -72,7 +72,14 @@ export async function runWebHeartbeatOnce(opts: { sessionId, updatedAt: Date.now(), }; - await saveSessionStore(storePath, store); + await updateSessionStore(storePath, (nextStore) => { + const nextCurrent = nextStore[sessionKey] ?? current; + nextStore[sessionKey] = { + ...nextCurrent, + sessionId, + updatedAt: Date.now(), + }; + }); } const sessionSnapshot = getSessionSnapshot(cfg, to, true); if (verbose) { @@ -163,7 +170,14 @@ export async function runWebHeartbeatOnce(opts: { const store = loadSessionStore(storePath); if (sessionSnapshot.entry && store[sessionSnapshot.key]) { store[sessionSnapshot.key].updatedAt = sessionSnapshot.entry.updatedAt; - await saveSessionStore(storePath, store); + await updateSessionStore(storePath, (nextStore) => { + const nextEntry = nextStore[sessionSnapshot.key]; + if (!nextEntry) return; + nextStore[sessionSnapshot.key] = { + ...nextEntry, + updatedAt: sessionSnapshot.entry.updatedAt, + }; + }); } heartbeatLogger.info(