fix: harden session cache + heartbeat restore
Co-authored-by: Ronak Guliani <ronak-guliani@users.noreply.github.com>
This commit is contained in:
@@ -38,7 +38,7 @@ import {
|
||||
type SessionEntry,
|
||||
saveSessionStore,
|
||||
} from "../config/sessions.js";
|
||||
import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js";
|
||||
import { clearAgentRunContext, emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js";
|
||||
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
|
||||
import { applyVerboseOverride } from "../sessions/level-overrides.js";
|
||||
import { resolveSendPolicy } from "../sessions/send-policy.js";
|
||||
@@ -123,18 +123,19 @@ export async function agentCommand(
|
||||
let sessionEntry = resolvedSessionEntry;
|
||||
const runId = opts.runId?.trim() || sessionId;
|
||||
|
||||
if (opts.deliver === true) {
|
||||
const sendPolicy = resolveSendPolicy({
|
||||
cfg,
|
||||
entry: sessionEntry,
|
||||
sessionKey,
|
||||
channel: sessionEntry?.channel,
|
||||
chatType: sessionEntry?.chatType,
|
||||
});
|
||||
if (sendPolicy === "deny") {
|
||||
throw new Error("send blocked by session policy");
|
||||
try {
|
||||
if (opts.deliver === true) {
|
||||
const sendPolicy = resolveSendPolicy({
|
||||
cfg,
|
||||
entry: sessionEntry,
|
||||
sessionKey,
|
||||
channel: sessionEntry?.channel,
|
||||
chatType: sessionEntry?.chatType,
|
||||
});
|
||||
if (sendPolicy === "deny") {
|
||||
throw new Error("send blocked by session policy");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let resolvedThinkLevel =
|
||||
thinkOnce ??
|
||||
@@ -368,7 +369,7 @@ export async function agentCommand(
|
||||
) {
|
||||
lifecycleEnded = true;
|
||||
}
|
||||
emitAgentEvent({
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: evt.stream,
|
||||
data: evt.data,
|
||||
@@ -435,4 +436,7 @@ export async function agentCommand(
|
||||
result,
|
||||
payloads,
|
||||
});
|
||||
} finally {
|
||||
clearAgentRunContext(runId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,6 +76,32 @@ describe("Session Store Cache", () => {
|
||||
readSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("should not allow cached session mutations to leak across loads", async () => {
|
||||
const testStore: Record<string, SessionEntry> = {
|
||||
"session:1": {
|
||||
sessionId: "id-1",
|
||||
updatedAt: Date.now(),
|
||||
cliSessionIds: { openai: "sess-1" },
|
||||
skillsSnapshot: {
|
||||
prompt: "skills",
|
||||
skills: [{ name: "alpha" }],
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
await saveSessionStore(storePath, testStore);
|
||||
|
||||
const loaded1 = loadSessionStore(storePath);
|
||||
loaded1["session:1"].cliSessionIds = { openai: "mutated" };
|
||||
if (loaded1["session:1"].skillsSnapshot?.skills?.length) {
|
||||
loaded1["session:1"].skillsSnapshot!.skills[0].name = "mutated";
|
||||
}
|
||||
|
||||
const loaded2 = loadSessionStore(storePath);
|
||||
expect(loaded2["session:1"].cliSessionIds?.openai).toBe("sess-1");
|
||||
expect(loaded2["session:1"].skillsSnapshot?.skills?.[0]?.name).toBe("alpha");
|
||||
});
|
||||
|
||||
it("should refresh cache when store file changes on disk", async () => {
|
||||
const testStore: Record<string, SessionEntry> = {
|
||||
"session:1": {
|
||||
|
||||
@@ -52,8 +52,8 @@ export function loadSessionStore(storePath: string): Record<string, SessionEntry
|
||||
if (cached && isSessionStoreCacheValid(cached)) {
|
||||
const currentMtimeMs = getFileMtimeMs(storePath);
|
||||
if (currentMtimeMs === cached.mtimeMs) {
|
||||
// Return a shallow copy to prevent external mutations affecting cache
|
||||
return { ...cached.store };
|
||||
// Return a deep copy to prevent external mutations affecting cache
|
||||
return structuredClone(cached.store);
|
||||
}
|
||||
invalidateSessionStoreCache(storePath);
|
||||
}
|
||||
@@ -90,14 +90,14 @@ export function loadSessionStore(storePath: string): Record<string, SessionEntry
|
||||
// Cache the result if caching is enabled
|
||||
if (isSessionStoreCacheEnabled()) {
|
||||
SESSION_STORE_CACHE.set(storePath, {
|
||||
store: { ...store }, // Store a copy to prevent external mutations
|
||||
store: structuredClone(store), // Store a copy to prevent external mutations
|
||||
loadedAt: Date.now(),
|
||||
storePath,
|
||||
mtimeMs,
|
||||
});
|
||||
}
|
||||
|
||||
return store;
|
||||
return structuredClone(store);
|
||||
}
|
||||
|
||||
async function saveSessionStoreUnlocked(
|
||||
|
||||
@@ -129,6 +129,73 @@ describe("resolveHeartbeatIntervalMs", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("does not regress updatedAt when restoring heartbeat sessions", async () => {
|
||||
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-hb-"));
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
|
||||
try {
|
||||
const originalUpdatedAt = 1000;
|
||||
const bumpedUpdatedAt = 2000;
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
main: {
|
||||
sessionId: "sid",
|
||||
updatedAt: originalUpdatedAt,
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
);
|
||||
|
||||
const cfg: ClawdbotConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: {
|
||||
every: "5m",
|
||||
target: "whatsapp",
|
||||
to: "+1555",
|
||||
},
|
||||
},
|
||||
},
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
session: { store: storePath },
|
||||
};
|
||||
|
||||
replySpy.mockImplementationOnce(async () => {
|
||||
const raw = await fs.readFile(storePath, "utf-8");
|
||||
const parsed = JSON.parse(raw) as { main?: { updatedAt?: number } };
|
||||
if (parsed.main) {
|
||||
parsed.main.updatedAt = bumpedUpdatedAt;
|
||||
}
|
||||
await fs.writeFile(storePath, JSON.stringify(parsed, null, 2));
|
||||
return { text: "" };
|
||||
});
|
||||
|
||||
await runHeartbeatOnce({
|
||||
cfg,
|
||||
deps: {
|
||||
getQueueSize: () => 0,
|
||||
nowMs: () => 0,
|
||||
webAuthExists: async () => true,
|
||||
hasActiveWebListener: () => true,
|
||||
},
|
||||
});
|
||||
|
||||
const finalStore = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
|
||||
main?: { updatedAt?: number };
|
||||
};
|
||||
expect(finalStore.main?.updatedAt).toBe(bumpedUpdatedAt);
|
||||
} finally {
|
||||
replySpy.mockRestore();
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("skips WhatsApp delivery when not linked or running", async () => {
|
||||
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-hb-"));
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
|
||||
@@ -148,8 +148,9 @@ async function restoreHeartbeatUpdatedAt(params: {
|
||||
const store = loadSessionStore(storePath);
|
||||
const entry = store[sessionKey];
|
||||
if (!entry) return;
|
||||
if (entry.updatedAt === updatedAt) return;
|
||||
store[sessionKey] = { ...entry, updatedAt };
|
||||
const nextUpdatedAt = Math.max(entry.updatedAt ?? 0, updatedAt);
|
||||
if (entry.updatedAt === nextUpdatedAt) return;
|
||||
store[sessionKey] = { ...entry, updatedAt: nextUpdatedAt };
|
||||
await saveSessionStore(storePath, store);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user