diff --git a/CHANGELOG.md b/CHANGELOG.md index 51b822391..f85bfe3da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ - Agents: treat message tool errors as failures so fallback replies still send; require `to` + `message` for `action=send`. (#717) — thanks @theglove44. - Agents: route subagent transcripts to the target agent sessions directory and add regression coverage. (#708) — thanks @xMikeMickelson. - Agents/Tools: preserve action enums when flattening tool schemas. (#708) — thanks @xMikeMickelson. +- Agents: reset sessions and retry when auto-compaction overflows instead of crashing the gateway. ## 2026.1.10 diff --git a/src/agents/pi-embedded-helpers.test.ts b/src/agents/pi-embedded-helpers.test.ts index 4d981663b..3b3f5e3fb 100644 --- a/src/agents/pi-embedded-helpers.test.ts +++ b/src/agents/pi-embedded-helpers.test.ts @@ -7,6 +7,7 @@ import { formatAssistantErrorText, isBillingErrorMessage, isCloudCodeAssistFormatError, + isCompactionFailureError, isContextOverflowError, isFailoverErrorMessage, isMessagingToolDuplicate, @@ -208,6 +209,8 @@ describe("isContextOverflowError", () => { "Request exceeds the maximum size", "context length exceeded", "Maximum context length", + "prompt is too long: 208423 tokens > 200000 maximum", + "Context overflow: Summarization failed", "413 Request Entity Too Large", ]; for (const sample of samples) { @@ -220,6 +223,26 @@ describe("isContextOverflowError", () => { }); }); +describe("isCompactionFailureError", () => { + it("matches compaction overflow failures", () => { + const samples = [ + "Context overflow: Summarization failed: 400 {\"message\":\"prompt is too long\"}", + "auto-compaction failed due to context overflow", + "Compaction failed: prompt is too long", + ]; + for (const sample of samples) { + expect(isCompactionFailureError(sample)).toBe(true); + } + }); + + it("ignores non-compaction overflow errors", () => { + expect( + isCompactionFailureError("Context overflow: prompt too large"), + ).toBe(false); + expect(isCompactionFailureError("rate limit exceeded")).toBe(false); + }); +}); + describe("isBillingErrorMessage", () => { it("matches credit / payment failures", () => { const samples = [ diff --git a/src/agents/pi-embedded-helpers.ts b/src/agents/pi-embedded-helpers.ts index 755d6290f..7b24dc732 100644 --- a/src/agents/pi-embedded-helpers.ts +++ b/src/agents/pi-embedded-helpers.ts @@ -244,10 +244,24 @@ export function isContextOverflowError(errorMessage?: string): boolean { lower.includes("request exceeds the maximum size") || lower.includes("context length exceeded") || lower.includes("maximum context length") || + lower.includes("prompt is too long") || + lower.includes("context overflow") || (lower.includes("413") && lower.includes("too large")) ); } +export function isCompactionFailureError(errorMessage?: string): boolean { + if (!errorMessage) return false; + if (!isContextOverflowError(errorMessage)) return false; + const lower = errorMessage.toLowerCase(); + return ( + lower.includes("summarization failed") || + lower.includes("auto-compaction") || + lower.includes("compaction failed") || + lower.includes("compaction") + ); +} + export function formatAssistantErrorText( msg: AssistantMessage, opts?: { cfg?: ClawdbotConfig; sessionKey?: string }, diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index 9083e9e59..a5aa190ce 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -36,6 +36,7 @@ import { isCacheEnabled, resolveCacheTtlMs } from "../config/cache-utils.js"; import type { ClawdbotConfig } from "../config/config.js"; import { resolveProviderCapabilities } from "../config/provider-capabilities.js"; import { getMachineDisplayName } from "../infra/machine-name.js"; +import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js"; import { createSubsystemLogger } from "../logging.js"; import { type enqueueCommand, @@ -85,6 +86,7 @@ import { formatAssistantErrorText, isAuthAssistantError, isCloudCodeAssistFormatError, + isCompactionFailureError, isContextOverflowError, isFailoverAssistantError, isFailoverErrorMessage, @@ -408,6 +410,13 @@ type EmbeddedPiQueueHandle = { const log = createSubsystemLogger("agent/embedded"); const GOOGLE_TURN_ORDERING_CUSTOM_TYPE = "google-turn-ordering-bootstrap"; +registerUnhandledRejectionHandler((reason) => { + const message = describeUnknownError(reason); + if (!isCompactionFailureError(message)) return false; + log.error(`Auto-compaction failed (unhandled): ${message}`); + return true; +}); + type CustomEntryLike = { type?: unknown; customType?: unknown }; function hasGoogleTurnOrderingMarker(sessionManager: SessionManager): boolean { diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts index 91f3086ca..4375d44b1 100644 --- a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts +++ b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts @@ -375,6 +375,57 @@ describe("runReplyAgent typing (heartbeat)", () => { } }); + it("retries after compaction failure by resetting the session", async () => { + const prevStateDir = process.env.CLAWDBOT_STATE_DIR; + const stateDir = await fs.mkdtemp( + path.join(tmpdir(), "clawdbot-session-compaction-reset-"), + ); + process.env.CLAWDBOT_STATE_DIR = stateDir; + try { + const sessionId = "session"; + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const sessionEntry = { sessionId, updatedAt: Date.now() }; + const sessionStore = { main: sessionEntry }; + + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); + + runEmbeddedPiAgentMock + .mockImplementationOnce(async () => { + throw new Error( + "Context overflow: Summarization failed: 400 {\"message\":\"prompt is too long\"}", + ); + }) + .mockImplementationOnce(async () => ({ + payloads: [{ text: "ok" }], + meta: {}, + })); + + const callsBefore = runEmbeddedPiAgentMock.mock.calls.length; + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + const res = await run(); + + expect(runEmbeddedPiAgentMock.mock.calls.length - callsBefore).toBe(2); + const payload = Array.isArray(res) ? res[0] : res; + expect(payload).toMatchObject({ text: "ok" }); + expect(sessionStore.main.sessionId).not.toBe(sessionId); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId); + } finally { + if (prevStateDir) { + process.env.CLAWDBOT_STATE_DIR = prevStateDir; + } else { + delete process.env.CLAWDBOT_STATE_DIR; + } + } + }); + it("still replies even if session reset fails to persist", async () => { const prevStateDir = process.env.CLAWDBOT_STATE_DIR; const stateDir = await fs.mkdtemp( diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 45a54543b..7721cb164 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -7,6 +7,10 @@ import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { resolveModelAuthMode } from "../../agents/model-auth.js"; import { runWithModelFallback } from "../../agents/model-fallback.js"; import { isCliProvider } from "../../agents/model-selection.js"; +import { + isCompactionFailureError, + isContextOverflowError, +} from "../../agents/pi-embedded-helpers.js"; import { queueEmbeddedPiMessage, runEmbeddedPiAgent, @@ -15,6 +19,7 @@ import { hasNonzeroUsage, type NormalizedUsage } from "../../agents/usage.js"; import type { ClawdbotConfig } from "../../config/config.js"; import { loadSessionStore, + resolveAgentIdFromSessionKey, resolveSessionTranscriptPath, type SessionEntry, saveSessionStore, @@ -231,6 +236,10 @@ export async function runReplyAgent(params: { typingMode, } = params; + let activeSessionEntry = sessionEntry; + let activeSessionStore = sessionStore; + let activeIsNewSession = isNewSession; + const isHeartbeat = opts?.isHeartbeat === true; const typingSignals = createTypingSignaler({ typing, @@ -303,11 +312,11 @@ export async function runReplyAgent(params: { followupRun.prompt, ); if (steered && !shouldFollowup) { - if (sessionEntry && sessionStore && sessionKey) { - sessionEntry.updatedAt = Date.now(); - sessionStore[sessionKey] = sessionEntry; + if (activeSessionEntry && activeSessionStore && sessionKey) { + activeSessionEntry.updatedAt = Date.now(); + activeSessionStore[sessionKey] = activeSessionEntry; if (storePath) { - await saveSessionStore(storePath, sessionStore); + await saveSessionStore(storePath, activeSessionStore); } } typing.cleanup(); @@ -317,11 +326,11 @@ export async function runReplyAgent(params: { if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) { enqueueFollowupRun(queueKey, followupRun, resolvedQueue); - if (sessionEntry && sessionStore && sessionKey) { - sessionEntry.updatedAt = Date.now(); - sessionStore[sessionKey] = sessionEntry; + if (activeSessionEntry && activeSessionStore && sessionKey) { + activeSessionEntry.updatedAt = Date.now(); + activeSessionStore[sessionKey] = activeSessionEntry; if (storePath) { - await saveSessionStore(storePath, sessionStore); + await saveSessionStore(storePath, activeSessionStore); } } typing.cleanup(); @@ -332,8 +341,8 @@ export async function runReplyAgent(params: { opts, typing, typingMode, - sessionEntry, - sessionStore, + sessionEntry: activeSessionEntry, + sessionStore: activeSessionStore, sessionKey, storePath, defaultModel, @@ -348,6 +357,46 @@ export async function runReplyAgent(params: { let didLogHeartbeatStrip = false; let autoCompactionCompleted = false; let responseUsageLine: string | undefined; + const resetSessionAfterCompactionFailure = async ( + reason: string, + ): Promise => { + if (!sessionKey || !activeSessionStore || !storePath) return false; + const nextSessionId = crypto.randomUUID(); + const nextEntry: SessionEntry = { + ...(activeSessionStore[sessionKey] ?? activeSessionEntry), + sessionId: nextSessionId, + updatedAt: Date.now(), + systemSent: false, + abortedLastRun: false, + }; + const agentId = resolveAgentIdFromSessionKey(sessionKey); + const topicId = + typeof sessionCtx.MessageThreadId === "number" + ? sessionCtx.MessageThreadId + : undefined; + const nextSessionFile = resolveSessionTranscriptPath( + nextSessionId, + agentId, + topicId, + ); + nextEntry.sessionFile = nextSessionFile; + activeSessionStore[sessionKey] = nextEntry; + try { + await saveSessionStore(storePath, activeSessionStore); + } catch (err) { + defaultRuntime.error( + `Failed to persist session reset after compaction failure (${sessionKey}): ${String(err)}`, + ); + } + followupRun.run.sessionId = nextSessionId; + followupRun.run.sessionFile = nextSessionFile; + activeSessionEntry = nextEntry; + activeIsNewSession = true; + defaultRuntime.error( + `Auto-compaction failed (${reason}). Restarting session ${sessionKey} -> ${nextSessionId} and retrying.`, + ); + return true; + }; try { const runId = crypto.randomUUID(); if (sessionKey) { @@ -359,342 +408,360 @@ export async function runReplyAgent(params: { let runResult: Awaited>; let fallbackProvider = followupRun.run.provider; let fallbackModel = followupRun.run.model; - try { - const allowPartialStream = !( - followupRun.run.reasoningLevel === "stream" && opts?.onReasoningStream - ); - const fallbackResult = await runWithModelFallback({ - cfg: followupRun.run.config, - provider: followupRun.run.provider, - model: followupRun.run.model, - run: (provider, model) => { - if (isCliProvider(provider, followupRun.run.config)) { - const startedAt = Date.now(); - emitAgentEvent({ - runId, - stream: "lifecycle", - data: { - phase: "start", - startedAt, - }, - }); - const cliSessionId = getCliSessionId(sessionEntry, provider); - return runCliAgent({ + let didResetAfterCompactionFailure = false; + while (true) { + try { + const allowPartialStream = !( + followupRun.run.reasoningLevel === "stream" && opts?.onReasoningStream + ); + const fallbackResult = await runWithModelFallback({ + cfg: followupRun.run.config, + provider: followupRun.run.provider, + model: followupRun.run.model, + run: (provider, model) => { + if (isCliProvider(provider, followupRun.run.config)) { + const startedAt = Date.now(); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "start", + startedAt, + }, + }); + const cliSessionId = getCliSessionId(activeSessionEntry, provider); + return runCliAgent({ + sessionId: followupRun.run.sessionId, + sessionKey, + sessionFile: followupRun.run.sessionFile, + workspaceDir: followupRun.run.workspaceDir, + config: followupRun.run.config, + prompt: commandBody, + provider, + model, + thinkLevel: followupRun.run.thinkLevel, + timeoutMs: followupRun.run.timeoutMs, + runId, + extraSystemPrompt: followupRun.run.extraSystemPrompt, + ownerNumbers: followupRun.run.ownerNumbers, + cliSessionId, + }) + .then((result) => { + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "end", + startedAt, + endedAt: Date.now(), + }, + }); + return result; + }) + .catch((err) => { + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "error", + startedAt, + endedAt: Date.now(), + error: err instanceof Error ? err.message : String(err), + }, + }); + throw err; + }); + } + return runEmbeddedPiAgent({ sessionId: followupRun.run.sessionId, sessionKey, + messageProvider: + sessionCtx.Provider?.trim().toLowerCase() || undefined, + agentAccountId: sessionCtx.AccountId, + // Provider threading context for tool auto-injection + ...buildThreadingToolContext({ + sessionCtx, + config: followupRun.run.config, + hasRepliedRef: opts?.hasRepliedRef, + }), sessionFile: followupRun.run.sessionFile, workspaceDir: followupRun.run.workspaceDir, + agentDir: followupRun.run.agentDir, config: followupRun.run.config, + skillsSnapshot: followupRun.run.skillsSnapshot, prompt: commandBody, - provider, - model, - thinkLevel: followupRun.run.thinkLevel, - timeoutMs: followupRun.run.timeoutMs, - runId, extraSystemPrompt: followupRun.run.extraSystemPrompt, ownerNumbers: followupRun.run.ownerNumbers, - cliSessionId, - }) - .then((result) => { - emitAgentEvent({ - runId, - stream: "lifecycle", - data: { - phase: "end", - startedAt, - endedAt: Date.now(), - }, - }); - return result; - }) - .catch((err) => { - emitAgentEvent({ - runId, - stream: "lifecycle", - data: { - phase: "error", - startedAt, - endedAt: Date.now(), - error: err instanceof Error ? err.message : String(err), - }, - }); - throw err; - }); - } - return runEmbeddedPiAgent({ - sessionId: followupRun.run.sessionId, - sessionKey, - messageProvider: - sessionCtx.Provider?.trim().toLowerCase() || undefined, - agentAccountId: sessionCtx.AccountId, - // Provider threading context for tool auto-injection - ...buildThreadingToolContext({ - sessionCtx, - config: followupRun.run.config, - hasRepliedRef: opts?.hasRepliedRef, - }), - sessionFile: followupRun.run.sessionFile, - workspaceDir: followupRun.run.workspaceDir, - agentDir: followupRun.run.agentDir, - config: followupRun.run.config, - skillsSnapshot: followupRun.run.skillsSnapshot, - prompt: commandBody, - extraSystemPrompt: followupRun.run.extraSystemPrompt, - ownerNumbers: followupRun.run.ownerNumbers, - enforceFinalTag: followupRun.run.enforceFinalTag, - provider, - model, - authProfileId: followupRun.run.authProfileId, - thinkLevel: followupRun.run.thinkLevel, - verboseLevel: followupRun.run.verboseLevel, - reasoningLevel: followupRun.run.reasoningLevel, - bashElevated: followupRun.run.bashElevated, - timeoutMs: followupRun.run.timeoutMs, - runId, - blockReplyBreak: resolvedBlockStreamingBreak, - blockReplyChunking, - onPartialReply: - opts?.onPartialReply && allowPartialStream - ? async (payload) => { - let text = payload.text; - if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { - const stripped = stripHeartbeatToken(text, { - mode: "message", - }); - if (stripped.didStrip && !didLogHeartbeatStrip) { - didLogHeartbeatStrip = true; - logVerbose( - "Stripped stray HEARTBEAT_OK token from reply", - ); + enforceFinalTag: followupRun.run.enforceFinalTag, + provider, + model, + authProfileId: followupRun.run.authProfileId, + thinkLevel: followupRun.run.thinkLevel, + verboseLevel: followupRun.run.verboseLevel, + reasoningLevel: followupRun.run.reasoningLevel, + bashElevated: followupRun.run.bashElevated, + timeoutMs: followupRun.run.timeoutMs, + runId, + blockReplyBreak: resolvedBlockStreamingBreak, + blockReplyChunking, + onPartialReply: + opts?.onPartialReply && allowPartialStream + ? async (payload) => { + let text = payload.text; + if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { + const stripped = stripHeartbeatToken(text, { + mode: "message", + }); + if (stripped.didStrip && !didLogHeartbeatStrip) { + didLogHeartbeatStrip = true; + logVerbose( + "Stripped stray HEARTBEAT_OK token from reply", + ); + } + if ( + stripped.shouldSkip && + (payload.mediaUrls?.length ?? 0) === 0 + ) { + return; + } + text = stripped.text; } - if ( - stripped.shouldSkip && - (payload.mediaUrls?.length ?? 0) === 0 - ) { - return; - } - text = stripped.text; - } - if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) return; - await typingSignals.signalTextDelta(text); - await opts.onPartialReply?.({ - text, - mediaUrls: payload.mediaUrls, - }); - } - : undefined, - onReasoningStream: - typingSignals.shouldStartOnReasoning || opts?.onReasoningStream - ? async (payload) => { - await typingSignals.signalReasoningDelta(); - await opts?.onReasoningStream?.({ - text: payload.text, - mediaUrls: payload.mediaUrls, - }); - } - : undefined, - onAgentEvent: (evt) => { - // Trigger typing when tools start executing - if (evt.stream === "tool") { - const phase = - typeof evt.data.phase === "string" ? evt.data.phase : ""; - if (phase === "start") { - void typingSignals.signalToolStart(); - } - } - // Track auto-compaction completion - if (evt.stream === "compaction") { - const phase = - typeof evt.data.phase === "string" ? evt.data.phase : ""; - const willRetry = Boolean(evt.data.willRetry); - if (phase === "end" && !willRetry) { - autoCompactionCompleted = true; - } - } - }, - onBlockReply: - blockStreamingEnabled && opts?.onBlockReply - ? async (payload) => { - let text = payload.text; - if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { - const stripped = stripHeartbeatToken(text, { - mode: "message", - }); - if (stripped.didStrip && !didLogHeartbeatStrip) { - didLogHeartbeatStrip = true; - logVerbose( - "Stripped stray HEARTBEAT_OK token from reply", - ); - } - const hasMedia = (payload.mediaUrls?.length ?? 0) > 0; - if (stripped.shouldSkip && !hasMedia) return; - text = stripped.text; - } - const taggedPayload = applyReplyTagsToPayload( - { + if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) return; + await typingSignals.signalTextDelta(text); + await opts.onPartialReply?.({ text, mediaUrls: payload.mediaUrls, - mediaUrl: payload.mediaUrls?.[0], - }, - sessionCtx.MessageSid, - ); - // Let through payloads with audioAsVoice flag even if empty (need to track it) - if ( - !isRenderablePayload(taggedPayload) && - !payload.audioAsVoice - ) - return; - const parsed = parseReplyDirectives( - taggedPayload.text ?? "", - { - currentMessageId: sessionCtx.MessageSid, - silentToken: SILENT_REPLY_TOKEN, - }, - ); - const cleaned = parsed.text || undefined; - const hasMedia = - Boolean(taggedPayload.mediaUrl) || - (taggedPayload.mediaUrls?.length ?? 0) > 0; - // Skip empty payloads unless they have audioAsVoice flag (need to track it) - if ( - !cleaned && - !hasMedia && - !payload.audioAsVoice && - !parsed.audioAsVoice - ) - return; - if (parsed.isSilent && !hasMedia) return; + }); + } + : undefined, + onReasoningStream: + typingSignals.shouldStartOnReasoning || opts?.onReasoningStream + ? async (payload) => { + await typingSignals.signalReasoningDelta(); + await opts?.onReasoningStream?.({ + text: payload.text, + mediaUrls: payload.mediaUrls, + }); + } + : undefined, + onAgentEvent: (evt) => { + // Trigger typing when tools start executing + if (evt.stream === "tool") { + const phase = + typeof evt.data.phase === "string" ? evt.data.phase : ""; + if (phase === "start") { + void typingSignals.signalToolStart(); + } + } + // Track auto-compaction completion + if (evt.stream === "compaction") { + const phase = + typeof evt.data.phase === "string" ? evt.data.phase : ""; + const willRetry = Boolean(evt.data.willRetry); + if (phase === "end" && !willRetry) { + autoCompactionCompleted = true; + } + } + }, + onBlockReply: + blockStreamingEnabled && opts?.onBlockReply + ? async (payload) => { + let text = payload.text; + if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { + const stripped = stripHeartbeatToken(text, { + mode: "message", + }); + if (stripped.didStrip && !didLogHeartbeatStrip) { + didLogHeartbeatStrip = true; + logVerbose( + "Stripped stray HEARTBEAT_OK token from reply", + ); + } + const hasMedia = (payload.mediaUrls?.length ?? 0) > 0; + if (stripped.shouldSkip && !hasMedia) return; + text = stripped.text; + } + const taggedPayload = applyReplyTagsToPayload( + { + text, + mediaUrls: payload.mediaUrls, + mediaUrl: payload.mediaUrls?.[0], + }, + sessionCtx.MessageSid, + ); + // Let through payloads with audioAsVoice flag even if empty (need to track it) + if ( + !isRenderablePayload(taggedPayload) && + !payload.audioAsVoice + ) + return; + const parsed = parseReplyDirectives( + taggedPayload.text ?? "", + { + currentMessageId: sessionCtx.MessageSid, + silentToken: SILENT_REPLY_TOKEN, + }, + ); + const cleaned = parsed.text || undefined; + const hasMedia = + Boolean(taggedPayload.mediaUrl) || + (taggedPayload.mediaUrls?.length ?? 0) > 0; + // Skip empty payloads unless they have audioAsVoice flag (need to track it) + if ( + !cleaned && + !hasMedia && + !payload.audioAsVoice && + !parsed.audioAsVoice + ) + return; + if (parsed.isSilent && !hasMedia) return; - const blockPayload: ReplyPayload = applyReplyToMode({ - ...taggedPayload, - text: cleaned, - audioAsVoice: Boolean( - parsed.audioAsVoice || payload.audioAsVoice, - ), - replyToId: taggedPayload.replyToId ?? parsed.replyToId, - replyToTag: taggedPayload.replyToTag || parsed.replyToTag, - replyToCurrent: - taggedPayload.replyToCurrent || parsed.replyToCurrent, - }); + const blockPayload: ReplyPayload = applyReplyToMode({ + ...taggedPayload, + text: cleaned, + audioAsVoice: Boolean( + parsed.audioAsVoice || payload.audioAsVoice, + ), + replyToId: taggedPayload.replyToId ?? parsed.replyToId, + replyToTag: + taggedPayload.replyToTag || parsed.replyToTag, + replyToCurrent: + taggedPayload.replyToCurrent || parsed.replyToCurrent, + }); - void typingSignals - .signalTextDelta(cleaned ?? taggedPayload.text) + void typingSignals + .signalTextDelta(cleaned ?? taggedPayload.text) + .catch((err) => { + logVerbose( + `block reply typing signal failed: ${String(err)}`, + ); + }); + + blockReplyPipeline?.enqueue(blockPayload); + } + : undefined, + shouldEmitToolResult, + onToolResult: opts?.onToolResult + ? (payload) => { + // `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them. + // If a tool callback starts typing after the run finalized, we can end up with + // a typing loop that never sees a matching markRunComplete(). Track and drain. + const task = (async () => { + let text = payload.text; + if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { + const stripped = stripHeartbeatToken(text, { + mode: "message", + }); + if (stripped.didStrip && !didLogHeartbeatStrip) { + didLogHeartbeatStrip = true; + logVerbose( + "Stripped stray HEARTBEAT_OK token from reply", + ); + } + if ( + stripped.shouldSkip && + (payload.mediaUrls?.length ?? 0) === 0 + ) { + return; + } + text = stripped.text; + } + await typingSignals.signalTextDelta(text); + await opts.onToolResult?.({ + text, + mediaUrls: payload.mediaUrls, + }); + })() .catch((err) => { logVerbose( - `block reply typing signal failed: ${String(err)}`, + `tool result delivery failed: ${String(err)}`, ); + }) + .finally(() => { + pendingToolTasks.delete(task); }); - - blockReplyPipeline?.enqueue(blockPayload); + pendingToolTasks.add(task); } : undefined, - shouldEmitToolResult, - onToolResult: opts?.onToolResult - ? (payload) => { - // `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them. - // If a tool callback starts typing after the run finalized, we can end up with - // a typing loop that never sees a matching markRunComplete(). Track and drain. - const task = (async () => { - let text = payload.text; - if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { - const stripped = stripHeartbeatToken(text, { - mode: "message", - }); - if (stripped.didStrip && !didLogHeartbeatStrip) { - didLogHeartbeatStrip = true; - logVerbose( - "Stripped stray HEARTBEAT_OK token from reply", - ); - } - if ( - stripped.shouldSkip && - (payload.mediaUrls?.length ?? 0) === 0 - ) { - return; - } - text = stripped.text; - } - await typingSignals.signalTextDelta(text); - await opts.onToolResult?.({ - text, - mediaUrls: payload.mediaUrls, - }); - })() - .catch((err) => { - logVerbose(`tool result delivery failed: ${String(err)}`); - }) - .finally(() => { - pendingToolTasks.delete(task); - }); - pendingToolTasks.add(task); - } - : undefined, - }); - }, - }); - runResult = fallbackResult.result; - fallbackProvider = fallbackResult.provider; - fallbackModel = fallbackResult.model; - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - const isContextOverflow = - /context.*overflow|too large|context window/i.test(message); - const isSessionCorruption = - /function call turn comes immediately after/i.test(message); + }); + }, + }); + runResult = fallbackResult.result; + fallbackProvider = fallbackResult.provider; + fallbackModel = fallbackResult.model; + break; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + const isContextOverflow = + isContextOverflowError(message) || + /context.*overflow|too large|context window/i.test(message); + const isCompactionFailure = isCompactionFailureError(message); + const isSessionCorruption = + /function call turn comes immediately after/i.test(message); - // Auto-recover from Gemini session corruption by resetting the session - if (isSessionCorruption && sessionKey && sessionStore && storePath) { - const corruptedSessionId = sessionEntry?.sessionId; - defaultRuntime.error( - `Session history corrupted (Gemini function call ordering). Resetting session: ${sessionKey}`, - ); - - try { - // Delete transcript file if it exists - if (corruptedSessionId) { - const transcriptPath = - resolveSessionTranscriptPath(corruptedSessionId); - try { - fs.unlinkSync(transcriptPath); - } catch { - // Ignore if file doesn't exist - } - } - - // Remove session entry from store - delete sessionStore[sessionKey]; - await saveSessionStore(storePath, sessionStore); - } catch (cleanupErr) { - defaultRuntime.error( - `Failed to reset corrupted session ${sessionKey}: ${String(cleanupErr)}`, - ); + if ( + isCompactionFailure && + !didResetAfterCompactionFailure && + (await resetSessionAfterCompactionFailure(message)) + ) { + didResetAfterCompactionFailure = true; + continue; } + // Auto-recover from Gemini session corruption by resetting the session + if (isSessionCorruption && sessionKey && activeSessionStore && storePath) { + const corruptedSessionId = activeSessionEntry?.sessionId; + defaultRuntime.error( + `Session history corrupted (Gemini function call ordering). Resetting session: ${sessionKey}`, + ); + + try { + // Delete transcript file if it exists + if (corruptedSessionId) { + const transcriptPath = + resolveSessionTranscriptPath(corruptedSessionId); + try { + fs.unlinkSync(transcriptPath); + } catch { + // Ignore if file doesn't exist + } + } + + // Remove session entry from store + delete activeSessionStore[sessionKey]; + await saveSessionStore(storePath, activeSessionStore); + } catch (cleanupErr) { + defaultRuntime.error( + `Failed to reset corrupted session ${sessionKey}: ${String(cleanupErr)}`, + ); + } + + return finalizeWithFollowup({ + text: "⚠️ Session history was corrupted. I've reset the conversation - please try again!", + }); + } + + defaultRuntime.error(`Embedded agent failed before reply: ${message}`); return finalizeWithFollowup({ - text: "⚠️ Session history was corrupted. I've reset the conversation - please try again!", + text: isContextOverflow + ? "⚠️ Context overflow - conversation too long. Starting fresh might help!" + : `⚠️ Agent failed before reply: ${message}. Check gateway logs for details.`, }); } - - defaultRuntime.error(`Embedded agent failed before reply: ${message}`); - return finalizeWithFollowup({ - text: isContextOverflow - ? "⚠️ Context overflow - conversation too long. Starting fresh might help!" - : `⚠️ Agent failed before reply: ${message}. Check gateway logs for details.`, - }); } if ( shouldInjectGroupIntro && - sessionEntry && - sessionStore && + activeSessionEntry && + activeSessionStore && sessionKey && - sessionEntry.groupActivationNeedsSystemIntro + activeSessionEntry.groupActivationNeedsSystemIntro ) { - sessionEntry.groupActivationNeedsSystemIntro = false; - sessionEntry.updatedAt = Date.now(); - sessionStore[sessionKey] = sessionEntry; + activeSessionEntry.groupActivationNeedsSystemIntro = false; + activeSessionEntry.updatedAt = Date.now(); + activeSessionStore[sessionKey] = activeSessionEntry; if (storePath) { - await saveSessionStore(storePath, sessionStore); + await saveSessionStore(storePath, activeSessionStore); } } @@ -814,7 +881,7 @@ export async function runReplyAgent(params: { const contextTokensUsed = agentCfgContextTokens ?? lookupContextTokens(modelUsed) ?? - sessionEntry?.contextTokens ?? + activeSessionEntry?.contextTokens ?? DEFAULT_CONTEXT_TOKENS; if (storePath && sessionKey) { @@ -884,9 +951,9 @@ export async function runReplyAgent(params: { } const responseUsageEnabled = - (sessionEntry?.responseUsage ?? + (activeSessionEntry?.responseUsage ?? (sessionKey - ? sessionStore?.[sessionKey]?.responseUsage + ? activeSessionStore?.[sessionKey]?.responseUsage : undefined)) === "on"; if (responseUsageEnabled && hasNonzeroUsage(usage)) { const authMode = resolveModelAuthMode(providerUsed, cfg); @@ -910,8 +977,8 @@ export async function runReplyAgent(params: { let finalPayloads = replyPayloads; if (autoCompactionCompleted) { const count = await incrementCompactionCount({ - sessionEntry, - sessionStore, + sessionEntry: activeSessionEntry, + sessionStore: activeSessionStore, sessionKey, storePath, }); @@ -923,7 +990,7 @@ export async function runReplyAgent(params: { ]; } } - if (resolvedVerboseLevel === "on" && isNewSession) { + if (resolvedVerboseLevel === "on" && activeIsNewSession) { finalPayloads = [ { text: `🧭 New session: ${followupRun.run.sessionId}` }, ...finalPayloads,