diff --git a/CHANGELOG.md b/CHANGELOG.md index 4788e8446..f695aa2f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -105,6 +105,7 @@ - Telegram: support media groups (multi-image messages). Thanks @obviyus for PR #220. - Telegram/WhatsApp: parse shared locations (pins, places, live) and expose structured ctx fields. Thanks @nachoiacovino for PR #194. - Auto-reply: block unauthorized `/reset` and infer WhatsApp senders from E.164 inputs. +- Auto-reply: reset corrupted Gemini sessions when function-call ordering breaks. Thanks @VACInc for PR #297. - Auto-reply: track compaction count in session status; verbose mode announces auto-compactions. - Telegram: notify users when inbound media exceeds size limits. Thanks @jarvis-medmatic for PR #283. - Telegram: send GIF media as animations (auto-play) and improve filename sniffing. diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts index 017a51ca9..5ae3b6ec1 100644 --- a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts +++ b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { describe, expect, it, vi } from "vitest"; import type { SessionEntry } from "../../config/sessions.js"; +import * as sessions from "../../config/sessions.js"; import type { TemplateContext } from "../templating.js"; import type { GetReplyOptions } from "../types.js"; import type { FollowupRun, QueueSettings } from "./queue.js"; @@ -209,6 +210,151 @@ describe("runReplyAgent typing (heartbeat)", () => { expect(payloads[0]?.text).toContain("count 1"); expect(sessionStore.main.compactionCount).toBe(1); }); + it("resets corrupted Gemini sessions and deletes transcripts", async () => { + const prevStateDir = process.env.CLAWDBOT_STATE_DIR; + const stateDir = await fs.mkdtemp( + path.join(tmpdir(), "clawdbot-session-reset-"), + ); + process.env.CLAWDBOT_STATE_DIR = stateDir; + try { + const sessionId = "session-corrupt"; + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const sessionEntry = { sessionId, updatedAt: Date.now() }; + const sessionStore = { main: sessionEntry }; + + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); + + const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + await fs.writeFile(transcriptPath, "bad", "utf-8"); + + runEmbeddedPiAgentMock.mockImplementationOnce(async () => { + throw new Error( + "function call turn comes immediately after a user turn or after a function response turn", + ); + }); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + const res = await run(); + + expect(res).toMatchObject({ + text: expect.stringContaining("Session history was corrupted"), + }); + expect(sessionStore.main).toBeUndefined(); + await expect(fs.access(transcriptPath)).rejects.toThrow(); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(persisted.main).toBeUndefined(); + } finally { + if (prevStateDir) { + process.env.CLAWDBOT_STATE_DIR = prevStateDir; + } else { + delete process.env.CLAWDBOT_STATE_DIR; + } + } + }); + + it("keeps sessions intact on other errors", async () => { + const prevStateDir = process.env.CLAWDBOT_STATE_DIR; + const stateDir = await fs.mkdtemp( + path.join(tmpdir(), "clawdbot-session-noreset-"), + ); + process.env.CLAWDBOT_STATE_DIR = stateDir; + try { + const sessionId = "session-ok"; + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const sessionEntry = { sessionId, updatedAt: Date.now() }; + const sessionStore = { main: sessionEntry }; + + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); + + const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + await fs.writeFile(transcriptPath, "ok", "utf-8"); + + runEmbeddedPiAgentMock.mockImplementationOnce(async () => { + throw new Error("INVALID_ARGUMENT: some other failure"); + }); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + const res = await run(); + + expect(res).toMatchObject({ + text: expect.stringContaining("Agent failed before reply"), + }); + expect(sessionStore.main).toBeDefined(); + await expect(fs.access(transcriptPath)).resolves.toBeUndefined(); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(persisted.main).toBeDefined(); + } finally { + if (prevStateDir) { + process.env.CLAWDBOT_STATE_DIR = prevStateDir; + } else { + delete process.env.CLAWDBOT_STATE_DIR; + } + } + }); + + it("still replies even if session reset fails to persist", async () => { + const prevStateDir = process.env.CLAWDBOT_STATE_DIR; + const stateDir = await fs.mkdtemp( + path.join(tmpdir(), "clawdbot-session-reset-fail-"), + ); + process.env.CLAWDBOT_STATE_DIR = stateDir; + const saveSpy = vi + .spyOn(sessions, "saveSessionStore") + .mockRejectedValueOnce(new Error("boom")); + try { + const sessionId = "session-corrupt"; + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const sessionEntry = { sessionId, updatedAt: Date.now() }; + const sessionStore = { main: sessionEntry }; + + const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + await fs.writeFile(transcriptPath, "bad", "utf-8"); + + runEmbeddedPiAgentMock.mockImplementationOnce(async () => { + throw new Error( + "function call turn comes immediately after a user turn or after a function response turn", + ); + }); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + const res = await run(); + + expect(res).toMatchObject({ + text: expect.stringContaining("Session history was corrupted"), + }); + expect(sessionStore.main).toBeUndefined(); + await expect(fs.access(transcriptPath)).rejects.toThrow(); + } finally { + saveSpy.mockRestore(); + if (prevStateDir) { + process.env.CLAWDBOT_STATE_DIR = prevStateDir; + } else { + delete process.env.CLAWDBOT_STATE_DIR; + } + } + }); it("rewrites Bun socket errors into friendly text", async () => { runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({ diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 79b85e57a..490b696cd 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -1,4 +1,5 @@ import crypto from "node:crypto"; +import fs from "node:fs"; import { lookupContextTokens } from "../../agents/context.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { runWithModelFallback } from "../../agents/model-fallback.js"; @@ -9,6 +10,7 @@ import { import { hasNonzeroUsage } from "../../agents/usage.js"; import { loadSessionStore, + resolveSessionTranscriptPath, type SessionEntry, saveSessionStore, } from "../../config/sessions.js"; @@ -378,6 +380,42 @@ export async function runReplyAgent(params: { const message = err instanceof Error ? err.message : String(err); const isContextOverflow = /context.*overflow|too large|context window/i.test(message); + const isSessionCorruption = + /function call turn comes immediately after/i.test(message); + + // Auto-recover from Gemini session corruption by resetting the session + if (isSessionCorruption && sessionKey && sessionStore && storePath) { + const corruptedSessionId = sessionEntry?.sessionId; + defaultRuntime.error( + `Session history corrupted (Gemini function call ordering). Resetting session: ${sessionKey}`, + ); + + try { + // Delete transcript file if it exists + if (corruptedSessionId) { + const transcriptPath = + resolveSessionTranscriptPath(corruptedSessionId); + try { + fs.unlinkSync(transcriptPath); + } catch { + // Ignore if file doesn't exist + } + } + + // Remove session entry from store + delete sessionStore[sessionKey]; + await saveSessionStore(storePath, sessionStore); + } catch (cleanupErr) { + defaultRuntime.error( + `Failed to reset corrupted session ${sessionKey}: ${String(cleanupErr)}`, + ); + } + + return finalizeWithFollowup({ + text: "⚠️ Session history was corrupted. I've reset the conversation - please try again!", + }); + } + defaultRuntime.error(`Embedded agent failed before reply: ${message}`); return finalizeWithFollowup({ text: isContextOverflow