refactor: harden session store updates

Co-authored-by: Tyler Yust <tyler6204@users.noreply.github.com>
This commit is contained in:
Peter Steinberger
2026-01-15 23:06:42 +00:00
parent 35492f8513
commit 688a0ce439
24 changed files with 441 additions and 215 deletions

View File

@@ -25,7 +25,7 @@ import {
loadSessionStore,
resolveStorePath,
type SessionEntry,
saveSessionStore,
updateSessionStore,
} from "../../config/sessions.js";
import {
formatUsageSummaryLine,
@@ -263,7 +263,9 @@ export function createSessionStatusTool(opts?: {
delete nextEntry.authProfileOverride;
}
store[resolved.key] = nextEntry;
await saveSessionStore(storePath, store);
await updateSessionStore(storePath, (nextStore) => {
nextStore[resolved.key] = nextEntry;
});
resolved.entry = nextEntry;
changedModel = true;
}

View File

@@ -5,7 +5,7 @@ import {
loadSessionStore,
resolveStorePath,
type SessionEntry,
saveSessionStore,
updateSessionStore,
} from "../../config/sessions.js";
import { parseAgentSessionKey } from "../../routing/session-key.js";
import { resolveCommandAuthorization } from "../command-auth.js";
@@ -90,7 +90,13 @@ export async function tryFastAbortFromMessage(params: {
entry.abortedLastRun = true;
entry.updatedAt = Date.now();
store[key] = entry;
await saveSessionStore(storePath, store);
await updateSessionStore(storePath, (nextStore) => {
const nextEntry = nextStore[key] ?? entry;
if (!nextEntry) return;
nextEntry.abortedLastRun = true;
nextEntry.updatedAt = Date.now();
nextStore[key] = nextEntry;
});
} else if (abortKey) {
setAbortMemory(abortKey, true);
}

View File

@@ -14,7 +14,7 @@ import {
resolveAgentIdFromSessionKey,
resolveSessionTranscriptPath,
type SessionEntry,
saveSessionStore,
updateSessionStore,
} from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js";
@@ -383,6 +383,7 @@ export async function runAgentTurnWithFallback(params: {
params.activeSessionStore &&
params.storePath
) {
const sessionKey = params.sessionKey;
const corruptedSessionId = params.getActiveSessionEntry()?.sessionId;
defaultRuntime.error(
`Session history corrupted (Gemini function call ordering). Resetting session: ${params.sessionKey}`,
@@ -399,9 +400,10 @@ export async function runAgentTurnWithFallback(params: {
}
}
// Remove session entry from store
delete params.activeSessionStore[params.sessionKey];
await saveSessionStore(params.storePath, params.activeSessionStore);
// Remove session entry from store using a fresh, locked snapshot.
await updateSessionStore(params.storePath, (store) => {
delete store[sessionKey];
});
} catch (cleanupErr) {
defaultRuntime.error(
`Failed to reset corrupted session ${params.sessionKey}: ${String(cleanupErr)}`,

View File

@@ -10,7 +10,7 @@ import {
resolveAgentIdFromSessionKey,
resolveSessionTranscriptPath,
type SessionEntry,
saveSessionStore,
updateSessionStore,
updateSessionStoreEntry,
} from "../../config/sessions.js";
import type { TypingMode } from "../../config/types.js";
@@ -156,7 +156,9 @@ export async function runReplyAgent(params: {
activeSessionEntry.updatedAt = Date.now();
activeSessionStore[sessionKey] = activeSessionEntry;
if (storePath) {
await saveSessionStore(storePath, activeSessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = activeSessionEntry as SessionEntry;
});
}
}
typing.cleanup();
@@ -170,7 +172,9 @@ export async function runReplyAgent(params: {
activeSessionEntry.updatedAt = Date.now();
activeSessionStore[sessionKey] = activeSessionEntry;
if (storePath) {
await saveSessionStore(storePath, activeSessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = activeSessionEntry as SessionEntry;
});
}
}
typing.cleanup();
@@ -224,7 +228,9 @@ export async function runReplyAgent(params: {
nextEntry.sessionFile = nextSessionFile;
activeSessionStore[sessionKey] = nextEntry;
try {
await saveSessionStore(storePath, activeSessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = nextEntry;
});
} catch (err) {
defaultRuntime.error(
`Failed to persist session reset after compaction failure (${sessionKey}): ${String(err)}`,
@@ -280,7 +286,9 @@ export async function runReplyAgent(params: {
activeSessionEntry.updatedAt = Date.now();
activeSessionStore[sessionKey] = activeSessionEntry;
if (storePath) {
await saveSessionStore(storePath, activeSessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = activeSessionEntry as SessionEntry;
});
}
}

View File

@@ -1,5 +1,5 @@
import type { SessionEntry } from "../../config/sessions.js";
import { saveSessionStore } from "../../config/sessions.js";
import { updateSessionStore } from "../../config/sessions.js";
import { setAbortMemory } from "./abort.js";
export async function applySessionHints(params: {
@@ -23,7 +23,16 @@ export async function applySessionHints(params: {
params.sessionEntry.updatedAt = Date.now();
params.sessionStore[params.sessionKey] = params.sessionEntry;
if (params.storePath) {
await saveSessionStore(params.storePath, params.sessionStore);
const sessionKey = params.sessionKey;
await updateSessionStore(params.storePath, (store) => {
const entry = store[sessionKey] ?? params.sessionEntry;
if (!entry) return;
store[sessionKey] = {
...entry,
abortedLastRun: false,
updatedAt: Date.now(),
};
});
}
} else if (params.abortKey) {
setAbortMemory(params.abortKey, false);

View File

@@ -1,6 +1,6 @@
import { abortEmbeddedPiRun } from "../../agents/pi-embedded.js";
import type { SessionEntry } from "../../config/sessions.js";
import { saveSessionStore } from "../../config/sessions.js";
import { updateSessionStore } from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
import { scheduleGatewaySigusr1Restart, triggerClawdbotRestart } from "../../infra/restart.js";
import { parseAgentSessionKey } from "../../routing/session-key.js";
@@ -71,7 +71,9 @@ export const handleActivationCommand: CommandHandler = async (params, allowTextC
params.sessionEntry.updatedAt = Date.now();
params.sessionStore[params.sessionKey] = params.sessionEntry;
if (params.storePath) {
await saveSessionStore(params.storePath, params.sessionStore);
await updateSessionStore(params.storePath, (store) => {
store[params.sessionKey] = params.sessionEntry as SessionEntry;
});
}
}
return {
@@ -107,7 +109,9 @@ export const handleSendPolicyCommand: CommandHandler = async (params, allowTextC
params.sessionEntry.updatedAt = Date.now();
params.sessionStore[params.sessionKey] = params.sessionEntry;
if (params.storePath) {
await saveSessionStore(params.storePath, params.sessionStore);
await updateSessionStore(params.storePath, (store) => {
store[params.sessionKey] = params.sessionEntry as SessionEntry;
});
}
}
const label =
@@ -190,7 +194,9 @@ export const handleStopCommand: CommandHandler = async (params, allowTextCommand
abortTarget.entry.updatedAt = Date.now();
params.sessionStore[abortTarget.key] = abortTarget.entry;
if (params.storePath) {
await saveSessionStore(params.storePath, params.sessionStore);
await updateSessionStore(params.storePath, (store) => {
store[abortTarget.key] = abortTarget.entry as SessionEntry;
});
}
} else if (params.command.abortKey) {
setAbortMemory(params.command.abortKey, true);
@@ -215,7 +221,9 @@ export const handleAbortTrigger: CommandHandler = async (params, allowTextComman
abortTarget.entry.updatedAt = Date.now();
params.sessionStore[abortTarget.key] = abortTarget.entry;
if (params.storePath) {
await saveSessionStore(params.storePath, params.sessionStore);
await updateSessionStore(params.storePath, (store) => {
store[abortTarget.key] = abortTarget.entry as SessionEntry;
});
}
} else if (params.command.abortKey) {
setAbortMemory(params.command.abortKey, true);

View File

@@ -2,7 +2,7 @@ import { resolveAgentDir, resolveSessionAgentId } from "../../agents/agent-scope
import type { ModelAliasIndex } from "../../agents/model-selection.js";
import { resolveSandboxRuntimeStatus } from "../../agents/sandbox.js";
import type { ClawdbotConfig } from "../../config/config.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import { type SessionEntry, updateSessionStore } from "../../config/sessions.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { applyVerboseOverride } from "../../sessions/level-overrides.js";
import { formatThinkingLevels, formatXHighModelHint, supportsXHighThinking } from "../thinking.js";
@@ -288,7 +288,9 @@ export async function handleDirectiveOnly(params: {
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = sessionEntry;
});
}
if (modelSelection) {
const nextLabel = `${modelSelection.provider}/${modelSelection.model}`;

View File

@@ -14,7 +14,7 @@ import {
resolveModelRefFromString,
} from "../../agents/model-selection.js";
import type { ClawdbotConfig } from "../../config/config.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import { type SessionEntry, updateSessionStore } from "../../config/sessions.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { applyVerboseOverride } from "../../sessions/level-overrides.js";
import { resolveProfileOverride } from "./directive-handling.auth.js";
@@ -184,7 +184,9 @@ export async function persistInlineDirectives(params: {
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = sessionEntry;
});
}
if (elevatedChanged) {
const nextElevated = (sessionEntry.elevatedLevel ?? "off") as ElevatedLevel;

View File

@@ -9,7 +9,7 @@ import type { ClawdbotConfig } from "../../config/config.js";
import {
resolveSessionFilePath,
type SessionEntry,
saveSessionStore,
updateSessionStore,
} from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
import { clearCommandLane, getQueueSize } from "../../process/command-queue.js";
@@ -276,7 +276,9 @@ export async function runPreparedReply(
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = sessionEntry;
});
}
}
}

View File

@@ -10,7 +10,7 @@ import {
resolveThinkingDefault,
} from "../../agents/model-selection.js";
import type { ClawdbotConfig } from "../../config/config.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import { type SessionEntry, updateSessionStore } from "../../config/sessions.js";
import type { ThinkLevel } from "./directives.js";
export type ModelDirectiveSelection = {
@@ -189,7 +189,9 @@ export async function createModelSelectionState(params: {
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = sessionEntry;
});
}
resetModelOverride = true;
}
@@ -218,7 +220,9 @@ export async function createModelSelectionState(params: {
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = sessionEntry;
});
}
}
}

View File

@@ -2,7 +2,7 @@ import crypto from "node:crypto";
import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js";
import type { ClawdbotConfig } from "../../config/config.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import { type SessionEntry, updateSessionStore } from "../../config/sessions.js";
import { buildChannelSummary } from "../../infra/channel-summary.js";
import { drainSystemEventEntries } from "../../infra/system-events.js";
@@ -111,7 +111,9 @@ export async function ensureSkillSnapshot(params: {
};
sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...nextEntry };
if (storePath) {
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = { ...store[sessionKey], ...nextEntry };
});
}
systemSent = true;
}
@@ -143,7 +145,9 @@ export async function ensureSkillSnapshot(params: {
};
sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...nextEntry };
if (storePath) {
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = { ...store[sessionKey], ...nextEntry };
});
}
}
@@ -168,7 +172,13 @@ export async function incrementCompactionCount(params: {
updatedAt: now,
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = {
...store[sessionKey],
compactionCount: nextCount,
updatedAt: now,
};
});
}
return nextCount;
}

View File

@@ -20,7 +20,7 @@ import {
resolveStorePath,
type SessionEntry,
type SessionScope,
saveSessionStore,
updateSessionStore,
} from "../../config/sessions.js";
import { normalizeMainKey } from "../../routing/session-key.js";
import { resolveCommandAuthorization } from "../command-auth.js";
@@ -188,6 +188,11 @@ export async function initSessionState(params: {
}
const baseEntry = !isNewSession && freshEntry ? entry : undefined;
// Track the originating channel/to for announce routing (subagent announce-back).
const lastChannel =
(ctx.OriginatingChannel as string | undefined)?.trim() || baseEntry?.lastChannel;
const lastTo = ctx.OriginatingTo?.trim() || ctx.To?.trim() || baseEntry?.lastTo;
const lastAccountId = ctx.AccountId?.trim() || baseEntry?.lastAccountId;
sessionEntry = {
...baseEntry,
sessionId,
@@ -212,6 +217,10 @@ export async function initSessionState(params: {
subject: baseEntry?.subject,
room: baseEntry?.room,
space: baseEntry?.space,
// Track originating channel for subagent announce routing.
lastChannel,
lastTo,
lastAccountId,
};
if (groupResolution?.channel) {
const channel = groupResolution.channel;
@@ -270,7 +279,15 @@ export async function initSessionState(params: {
);
}
sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...sessionEntry };
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
if (groupResolution?.legacyKey && groupResolution.legacyKey !== sessionKey) {
if (store[groupResolution.legacyKey] && !store[sessionKey]) {
store[sessionKey] = store[groupResolution.legacyKey];
}
delete store[groupResolution.legacyKey];
}
store[sessionKey] = { ...store[sessionKey], ...sessionEntry };
});
const sessionCtx: TemplateContext = {
...ctx,

View File

@@ -36,7 +36,7 @@ import {
resolveAgentIdFromSessionKey,
resolveSessionFilePath,
type SessionEntry,
saveSessionStore,
updateSessionStore,
} from "../config/sessions.js";
import {
clearAgentRunContext,
@@ -173,7 +173,9 @@ export async function agentCommand(
skillsSnapshot,
};
sessionStore[sessionKey] = next;
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = next;
});
sessionEntry = next;
}
@@ -188,7 +190,9 @@ export async function agentCommand(
}
applyVerboseOverride(next, verboseOverride);
sessionStore[sessionKey] = next;
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = next;
});
}
const agentModelPrimary = resolveAgentModelPrimary(cfg, sessionAgentId);
@@ -252,7 +256,9 @@ export async function agentCommand(
delete sessionEntry.modelOverride;
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = sessionEntry;
});
}
}
}
@@ -279,7 +285,9 @@ export async function agentCommand(
sessionEntry.updatedAt = Date.now();
if (sessionStore && sessionKey) {
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = sessionEntry;
});
}
}
}
@@ -307,7 +315,9 @@ export async function agentCommand(
sessionEntry.thinkingLevel = "high";
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = sessionEntry;
});
}
}
const sessionFile = resolveSessionFilePath(sessionId, sessionEntry, {

View File

@@ -4,7 +4,7 @@ import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { isCliProvider } from "../../agents/model-selection.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import type { ClawdbotConfig } from "../../config/config.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import { type SessionEntry, updateSessionStore } from "../../config/sessions.js";
type RunResult = Awaited<
ReturnType<(typeof import("../../agents/pi-embedded.js"))["runEmbeddedPiAgent"]>
@@ -68,5 +68,7 @@ export async function updateSessionStoreAfterAgentRun(params: {
next.totalTokens = promptTokens > 0 ? promptTokens : (usage.total ?? input);
}
sessionStore[sessionKey] = next;
await saveSessionStore(storePath, sessionStore);
await updateSessionStore(storePath, (store) => {
store[sessionKey] = next;
});
}

View File

@@ -12,6 +12,7 @@ import {
resolveSessionTranscriptPath,
resolveSessionTranscriptsDir,
updateLastRoute,
updateSessionStore,
updateSessionStoreEntry,
} from "./sessions.js";
@@ -137,6 +138,56 @@ describe("sessions", () => {
expect(store[mainSessionKey]?.compactionCount).toBe(2);
});
it("updateSessionStore preserves concurrent additions", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-sessions-"));
const storePath = path.join(dir, "sessions.json");
await fs.writeFile(storePath, "{}", "utf-8");
await Promise.all([
updateSessionStore(storePath, (store) => {
store["agent:main:one"] = { sessionId: "sess-1", updatedAt: 1 };
}),
updateSessionStore(storePath, (store) => {
store["agent:main:two"] = { sessionId: "sess-2", updatedAt: 2 };
}),
]);
const store = loadSessionStore(storePath);
expect(store["agent:main:one"]?.sessionId).toBe("sess-1");
expect(store["agent:main:two"]?.sessionId).toBe("sess-2");
});
it("updateSessionStore keeps deletions when concurrent writes happen", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-sessions-"));
const storePath = path.join(dir, "sessions.json");
await fs.writeFile(
storePath,
JSON.stringify(
{
"agent:main:old": { sessionId: "sess-old", updatedAt: 1 },
"agent:main:keep": { sessionId: "sess-keep", updatedAt: 2 },
},
null,
2,
),
"utf-8",
);
await Promise.all([
updateSessionStore(storePath, (store) => {
delete store["agent:main:old"];
}),
updateSessionStore(storePath, (store) => {
store["agent:main:new"] = { sessionId: "sess-new", updatedAt: 3 };
}),
]);
const store = loadSessionStore(storePath);
expect(store["agent:main:old"]).toBeUndefined();
expect(store["agent:main:keep"]?.sessionId).toBe("sess-keep");
expect(store["agent:main:new"]?.sessionId).toBe("sess-new");
});
it("loadSessionStore auto-migrates legacy provider keys to channel keys", async () => {
const mainSessionKey = "agent:main:main";
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-sessions-"));

View File

@@ -45,9 +45,16 @@ export function clearSessionStoreCacheForTest(): void {
SESSION_STORE_CACHE.clear();
}
export function loadSessionStore(storePath: string): Record<string, SessionEntry> {
type LoadSessionStoreOptions = {
skipCache?: boolean;
};
export function loadSessionStore(
storePath: string,
opts: LoadSessionStoreOptions = {},
): Record<string, SessionEntry> {
// Check cache first if enabled
if (isSessionStoreCacheEnabled()) {
if (!opts.skipCache && isSessionStoreCacheEnabled()) {
const cached = SESSION_STORE_CACHE.get(storePath);
if (cached && isSessionStoreCacheValid(cached)) {
const currentMtimeMs = getFileMtimeMs(storePath);
@@ -88,7 +95,7 @@ export function loadSessionStore(storePath: string): Record<string, SessionEntry
}
// Cache the result if caching is enabled
if (isSessionStoreCacheEnabled()) {
if (!opts.skipCache && isSessionStoreCacheEnabled()) {
SESSION_STORE_CACHE.set(storePath, {
store: structuredClone(store), // Store a copy to prevent external mutations
loadedAt: Date.now(),
@@ -168,6 +175,19 @@ export async function saveSessionStore(
});
}
export async function updateSessionStore<T>(
storePath: string,
mutator: (store: Record<string, SessionEntry>) => Promise<T> | T,
): Promise<T> {
return await withSessionStoreLock(storePath, async () => {
// Always re-read inside the lock to avoid clobbering concurrent writers.
const store = loadSessionStore(storePath, { skipCache: true });
const result = await mutator(store);
await saveSessionStoreUnlocked(storePath, store);
return result;
});
}
type SessionStoreLockOptions = {
timeoutMs?: number;
pollIntervalMs?: number;

View File

@@ -30,7 +30,7 @@ import {
} from "../../auto-reply/thinking.js";
import type { CliDeps } from "../../cli/deps.js";
import type { ClawdbotConfig } from "../../config/config.js";
import { resolveSessionTranscriptPath, saveSessionStore } from "../../config/sessions.js";
import { resolveSessionTranscriptPath, updateSessionStore } from "../../config/sessions.js";
import type { AgentDefaultsConfig } from "../../config/types.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
@@ -217,13 +217,17 @@ export async function runCronIsolatedAgentTurn(params: {
skillsSnapshot,
};
cronSession.store[agentSessionKey] = cronSession.sessionEntry;
await saveSessionStore(cronSession.storePath, cronSession.store);
await updateSessionStore(cronSession.storePath, (store) => {
store[agentSessionKey] = cronSession.sessionEntry;
});
}
// Persist systemSent before the run, mirroring the inbound auto-reply behavior.
cronSession.sessionEntry.systemSent = true;
cronSession.store[agentSessionKey] = cronSession.sessionEntry;
await saveSessionStore(cronSession.storePath, cronSession.store);
await updateSessionStore(cronSession.storePath, (store) => {
store[agentSessionKey] = cronSession.sessionEntry;
});
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
let fallbackProvider = provider;
@@ -316,7 +320,9 @@ export async function runCronIsolatedAgentTurn(params: {
promptTokens > 0 ? promptTokens : (usage.total ?? input);
}
cronSession.store[agentSessionKey] = cronSession.sessionEntry;
await saveSessionStore(cronSession.storePath, cronSession.store);
await updateSessionStore(cronSession.storePath, (store) => {
store[agentSessionKey] = cronSession.sessionEntry;
});
}
const firstText = payloads[0]?.text ?? "";
const summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText);

View File

@@ -2,7 +2,7 @@ import { randomUUID } from "node:crypto";
import { normalizeChannelId } from "../channels/plugins/index.js";
import { agentCommand } from "../commands/agent.js";
import { loadConfig } from "../config/config.js";
import { saveSessionStore } from "../config/sessions.js";
import { updateSessionStore } from "../config/sessions.js";
import { normalizeMainKey } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import type { BridgeEvent, BridgeHandlersContext } from "./server-bridge-types.js";
@@ -32,22 +32,23 @@ export const handleBridgeEvent = async (
const cfg = loadConfig();
const rawMainKey = normalizeMainKey(cfg.session?.mainKey);
const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : rawMainKey;
const { storePath, store, entry, canonicalKey } = loadSessionEntry(sessionKey);
const { storePath, entry, canonicalKey } = loadSessionEntry(sessionKey);
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
store[canonicalKey] = {
sessionId,
updatedAt: now,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
reasoningLevel: entry?.reasoningLevel,
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
};
if (storePath) {
await saveSessionStore(storePath, store);
await updateSessionStore(storePath, (store) => {
store[canonicalKey] = {
sessionId,
updatedAt: now,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
reasoningLevel: entry?.reasoningLevel,
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
};
});
}
// Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send).
@@ -102,22 +103,23 @@ export const handleBridgeEvent = async (
const sessionKeyRaw = (link?.sessionKey ?? "").trim();
const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : `node-${nodeId}`;
const { storePath, store, entry, canonicalKey } = loadSessionEntry(sessionKey);
const { storePath, entry, canonicalKey } = loadSessionEntry(sessionKey);
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
store[canonicalKey] = {
sessionId,
updatedAt: now,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
reasoningLevel: entry?.reasoningLevel,
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
};
if (storePath) {
await saveSessionStore(storePath, store);
await updateSessionStore(storePath, (store) => {
store[canonicalKey] = {
sessionId,
updatedAt: now,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
reasoningLevel: entry?.reasoningLevel,
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
};
});
}
void agentCommand(

View File

@@ -8,10 +8,9 @@ import {
} from "../agents/pi-embedded.js";
import { loadConfig } from "../config/config.js";
import {
loadSessionStore,
resolveMainSessionKeyFromConfig,
type SessionEntry,
saveSessionStore,
updateSessionStore,
} from "../config/sessions.js";
import { clearCommandLane } from "../process/command-queue.js";
import {
@@ -126,19 +125,20 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async (
const cfg = loadConfig();
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const storePath = target.storePath;
const store = loadSessionStore(storePath);
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const applied = await applySessionsPatchToStore({
cfg,
store,
storeKey: primaryKey,
patch: p,
loadGatewayModelCatalog: ctx.loadGatewayModelCatalog,
const applied = await updateSessionStore(storePath, async (store) => {
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
return await applySessionsPatchToStore({
cfg,
store,
storeKey: primaryKey,
patch: p,
loadGatewayModelCatalog: ctx.loadGatewayModelCatalog,
});
});
if (!applied.ok) {
return {
@@ -150,7 +150,6 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async (
},
};
}
await saveSessionStore(storePath, store);
const payload: SessionsPatchResult = {
ok: true,
path: storePath,
@@ -182,32 +181,43 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async (
};
}
const { storePath, store, entry } = loadSessionEntry(key);
const now = Date.now();
const next: SessionEntry = {
sessionId: randomUUID(),
updatedAt: now,
systemSent: false,
abortedLastRun: false,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
reasoningLevel: entry?.reasoningLevel,
model: entry?.model,
contextTokens: entry?.contextTokens,
sendPolicy: entry?.sendPolicy,
label: entry?.label,
displayName: entry?.displayName,
chatType: entry?.chatType,
channel: entry?.channel,
subject: entry?.subject,
room: entry?.room,
space: entry?.space,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
skillsSnapshot: entry?.skillsSnapshot,
};
store[key] = next;
await saveSessionStore(storePath, store);
const cfg = loadConfig();
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const storePath = target.storePath;
const next = await updateSessionStore(storePath, (store) => {
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const entry = store[primaryKey];
const now = Date.now();
const nextEntry: SessionEntry = {
sessionId: randomUUID(),
updatedAt: now,
systemSent: false,
abortedLastRun: false,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
reasoningLevel: entry?.reasoningLevel,
model: entry?.model,
contextTokens: entry?.contextTokens,
sendPolicy: entry?.sendPolicy,
label: entry?.label,
displayName: entry?.displayName,
chatType: entry?.chatType,
channel: entry?.channel,
subject: entry?.subject,
room: entry?.room,
space: entry?.space,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
skillsSnapshot: entry?.skillsSnapshot,
};
store[primaryKey] = nextEntry;
return nextEntry;
});
return {
ok: true,
payloadJSON: JSON.stringify({ ok: true, key, entry: next }),
@@ -249,9 +259,11 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async (
const deleteTranscript = typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true;
const { storePath, store, entry } = loadSessionEntry(key);
const cfg = loadConfig();
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const storePath = target.storePath;
const { entry } = loadSessionEntry(key);
const sessionId = entry?.sessionId;
const existed = Boolean(store[key]);
clearCommandLane(resolveEmbeddedSessionLane(key));
if (sessionId && isEmbeddedPiRunActive(sessionId)) {
abortEmbeddedPiRun(sessionId);
@@ -266,8 +278,19 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async (
};
}
}
if (existed) delete store[key];
await saveSessionStore(storePath, store);
const deletion = await updateSessionStore(storePath, (store) => {
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const entryToDelete = store[primaryKey];
const existed = Boolean(entryToDelete);
if (existed) delete store[primaryKey];
return { existed, entry: entryToDelete };
});
const existed = deletion.existed;
const archived: string[] = [];
if (deleteTranscript && sessionId) {
@@ -323,7 +346,20 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async (
? Math.max(1, Math.floor(p.maxLines))
: 400;
const { storePath, store, entry } = loadSessionEntry(key);
const cfg = loadConfig();
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const storePath = target.storePath;
// Resolve entry inside the lock, but compact outside to avoid holding it.
const compactTarget = await updateSessionStore(storePath, (store) => {
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
return { entry: store[primaryKey], primaryKey };
});
const entry = compactTarget.entry;
const sessionId = entry?.sessionId;
if (!sessionId) {
return {
@@ -373,13 +409,14 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async (
fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8");
// Token counts no longer match; clear so status + UI reflect reality after the next turn.
if (store[key]) {
delete store[key].inputTokens;
delete store[key].outputTokens;
delete store[key].totalTokens;
store[key].updatedAt = Date.now();
await saveSessionStore(storePath, store);
}
await updateSessionStore(storePath, (store) => {
const entryToUpdate = store[compactTarget.primaryKey];
if (!entryToUpdate) return;
delete entryToUpdate.inputTokens;
delete entryToUpdate.outputTokens;
delete entryToUpdate.totalTokens;
entryToUpdate.updatedAt = Date.now();
});
return {
ok: true,

View File

@@ -6,7 +6,7 @@ import {
resolveAgentIdFromSessionKey,
resolveAgentMainSessionKey,
type SessionEntry,
saveSessionStore,
updateSessionStore,
} from "../../config/sessions.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { resolveOutboundTarget } from "../../infra/outbound/targets.js";
@@ -136,7 +136,7 @@ export const agentHandlers: GatewayRequestHandlers = {
let cfgForAgent: ReturnType<typeof loadConfig> | undefined;
if (requestedSessionKey) {
const { cfg, storePath, store, entry, canonicalKey } = loadSessionEntry(requestedSessionKey);
const { cfg, storePath, entry, canonicalKey } = loadSessionEntry(requestedSessionKey);
cfgForAgent = cfg;
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
@@ -178,11 +178,10 @@ export const agentHandlers: GatewayRequestHandlers = {
const canonicalSessionKey = canonicalKey;
const agentId = resolveAgentIdFromSessionKey(canonicalSessionKey);
const mainSessionKey = resolveAgentMainSessionKey({ cfg, agentId });
if (store) {
store[canonicalSessionKey] = nextEntry;
if (storePath) {
await saveSessionStore(storePath, store);
}
if (storePath) {
await updateSessionStore(storePath, (store) => {
store[canonicalSessionKey] = nextEntry;
});
}
if (canonicalSessionKey === mainSessionKey || canonicalSessionKey === "global") {
context.addChatRun(idem, {

View File

@@ -9,10 +9,9 @@ import {
} from "../../agents/pi-embedded.js";
import { loadConfig } from "../../config/config.js";
import {
loadSessionStore,
resolveMainSessionKey,
type SessionEntry,
saveSessionStore,
updateSessionStore,
} from "../../config/sessions.js";
import { clearCommandLane } from "../../process/command-queue.js";
import {
@@ -30,6 +29,7 @@ import {
archiveFileOnDisk,
listSessionsFromStore,
loadCombinedSessionStoreForGateway,
loadSessionEntry,
resolveGatewaySessionStoreTarget,
resolveSessionTranscriptCandidates,
type SessionsPatchResult,
@@ -106,26 +106,25 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const cfg = loadConfig();
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const storePath = target.storePath;
const store = loadSessionStore(storePath);
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const applied = await applySessionsPatchToStore({
cfg,
store,
storeKey: primaryKey,
patch: p,
loadGatewayModelCatalog: context.loadGatewayModelCatalog,
const applied = await updateSessionStore(storePath, async (store) => {
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
return await applySessionsPatchToStore({
cfg,
store,
storeKey: primaryKey,
patch: p,
loadGatewayModelCatalog: context.loadGatewayModelCatalog,
});
});
if (!applied.ok) {
respond(false, undefined, applied.error);
return;
}
await saveSessionStore(storePath, store);
const result: SessionsPatchResult = {
ok: true,
path: storePath,
@@ -156,34 +155,35 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const cfg = loadConfig();
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const storePath = target.storePath;
const store = loadSessionStore(storePath);
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const entry = store[primaryKey];
const now = Date.now();
const next: SessionEntry = {
sessionId: randomUUID(),
updatedAt: now,
systemSent: false,
abortedLastRun: false,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
reasoningLevel: entry?.reasoningLevel,
responseUsage: entry?.responseUsage,
model: entry?.model,
contextTokens: entry?.contextTokens,
sendPolicy: entry?.sendPolicy,
label: entry?.label,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
skillsSnapshot: entry?.skillsSnapshot,
};
store[primaryKey] = next;
await saveSessionStore(storePath, store);
const next = await updateSessionStore(storePath, (store) => {
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const entry = store[primaryKey];
const now = Date.now();
const nextEntry: SessionEntry = {
sessionId: randomUUID(),
updatedAt: now,
systemSent: false,
abortedLastRun: false,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
reasoningLevel: entry?.reasoningLevel,
responseUsage: entry?.responseUsage,
model: entry?.model,
contextTokens: entry?.contextTokens,
sendPolicy: entry?.sendPolicy,
label: entry?.label,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
skillsSnapshot: entry?.skillsSnapshot,
};
store[primaryKey] = nextEntry;
return nextEntry;
});
respond(true, { ok: true, key: target.canonicalKey, entry: next }, undefined);
},
"sessions.delete": async ({ params, respond }) => {
@@ -220,14 +220,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const deleteTranscript = typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true;
const storePath = target.storePath;
const store = loadSessionStore(storePath);
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const entry = store[primaryKey];
const { entry } = loadSessionEntry(key);
const sessionId = entry?.sessionId;
const existed = Boolean(entry);
clearCommandLane(resolveEmbeddedSessionLane(target.canonicalKey));
@@ -246,8 +239,15 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
}
if (existed) delete store[primaryKey];
await saveSessionStore(storePath, store);
await updateSessionStore(storePath, (store) => {
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
if (store[primaryKey]) delete store[primaryKey];
});
const archived: string[] = [];
if (deleteTranscript && sessionId) {
@@ -295,14 +295,17 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const cfg = loadConfig();
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const storePath = target.storePath;
const store = loadSessionStore(storePath);
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const entry = store[primaryKey];
// Lock + read in a short critical section; transcript work happens outside.
const compactTarget = await updateSessionStore(storePath, (store) => {
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
return { entry: store[primaryKey], primaryKey };
});
const entry = compactTarget.entry;
const sessionId = entry?.sessionId;
if (!sessionId) {
respond(
@@ -358,13 +361,15 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const keptLines = lines.slice(-maxLines);
fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8");
if (store[primaryKey]) {
delete store[primaryKey].inputTokens;
delete store[primaryKey].outputTokens;
delete store[primaryKey].totalTokens;
store[primaryKey].updatedAt = Date.now();
await saveSessionStore(storePath, store);
}
await updateSessionStore(storePath, (store) => {
const entryKey = compactTarget.primaryKey;
const entryToUpdate = store[entryKey];
if (!entryToUpdate) return;
delete entryToUpdate.inputTokens;
delete entryToUpdate.outputTokens;
delete entryToUpdate.totalTokens;
entryToUpdate.updatedAt = Date.now();
});
respond(
true,

View File

@@ -285,9 +285,12 @@ export function loadCombinedSessionStoreForGateway(cfg: ClawdbotConfig): {
const store = loadSessionStore(storePath);
for (const [key, entry] of Object.entries(store)) {
const canonicalKey = canonicalizeSessionKeyForAgent(agentId, key);
// Merge with existing entry if present (avoid overwriting with less complete data)
const existing = combined[canonicalKey];
combined[canonicalKey] = {
...existing,
...entry,
spawnedBy: canonicalizeSpawnedByForAgent(agentId, entry.spawnedBy),
spawnedBy: canonicalizeSpawnedByForAgent(agentId, entry.spawnedBy ?? existing?.spawnedBy),
};
}
}

View File

@@ -17,7 +17,7 @@ import {
resolveAgentIdFromSessionKey,
resolveMainSessionKey,
resolveStorePath,
saveSessionStore,
updateSessionStore,
} from "../config/sessions.js";
import { formatErrorMessage } from "../infra/errors.js";
import { createSubsystemLogger } from "../logging.js";
@@ -150,8 +150,13 @@ async function restoreHeartbeatUpdatedAt(params: {
if (!entry) return;
const nextUpdatedAt = Math.max(entry.updatedAt ?? 0, updatedAt);
if (entry.updatedAt === nextUpdatedAt) return;
store[sessionKey] = { ...entry, updatedAt: nextUpdatedAt };
await saveSessionStore(storePath, store);
await updateSessionStore(storePath, (nextStore) => {
const nextEntry = nextStore[sessionKey] ?? entry;
if (!nextEntry) return;
const resolvedUpdatedAt = Math.max(nextEntry.updatedAt ?? 0, updatedAt);
if (nextEntry.updatedAt === resolvedUpdatedAt) return;
nextStore[sessionKey] = { ...nextEntry, updatedAt: resolvedUpdatedAt };
});
}
function normalizeHeartbeatReply(

View File

@@ -11,7 +11,7 @@ import {
loadSessionStore,
resolveSessionKey,
resolveStorePath,
saveSessionStore,
updateSessionStore,
} from "../../config/sessions.js";
import { emitHeartbeatEvent } from "../../infra/heartbeat-events.js";
import { getChildLogger } from "../../logging.js";
@@ -72,7 +72,14 @@ export async function runWebHeartbeatOnce(opts: {
sessionId,
updatedAt: Date.now(),
};
await saveSessionStore(storePath, store);
await updateSessionStore(storePath, (nextStore) => {
const nextCurrent = nextStore[sessionKey] ?? current;
nextStore[sessionKey] = {
...nextCurrent,
sessionId,
updatedAt: Date.now(),
};
});
}
const sessionSnapshot = getSessionSnapshot(cfg, to, true);
if (verbose) {
@@ -163,7 +170,14 @@ export async function runWebHeartbeatOnce(opts: {
const store = loadSessionStore(storePath);
if (sessionSnapshot.entry && store[sessionSnapshot.key]) {
store[sessionSnapshot.key].updatedAt = sessionSnapshot.entry.updatedAt;
await saveSessionStore(storePath, store);
await updateSessionStore(storePath, (nextStore) => {
const nextEntry = nextStore[sessionSnapshot.key];
if (!nextEntry) return;
nextStore[sessionSnapshot.key] = {
...nextEntry,
updatedAt: sessionSnapshot.entry.updatedAt,
};
});
}
heartbeatLogger.info(