fix: reset session after compaction overflow

This commit is contained in:
Peter Steinberger
2026-01-12 00:28:02 +00:00
parent 32df2ef7bd
commit 67743325ee
6 changed files with 485 additions and 320 deletions

View File

@@ -29,6 +29,7 @@
- Agents: treat message tool errors as failures so fallback replies still send; require `to` + `message` for `action=send`. (#717) — thanks @theglove44. - Agents: treat message tool errors as failures so fallback replies still send; require `to` + `message` for `action=send`. (#717) — thanks @theglove44.
- Agents: route subagent transcripts to the target agent sessions directory and add regression coverage. (#708) — thanks @xMikeMickelson. - Agents: route subagent transcripts to the target agent sessions directory and add regression coverage. (#708) — thanks @xMikeMickelson.
- Agents/Tools: preserve action enums when flattening tool schemas. (#708) — thanks @xMikeMickelson. - Agents/Tools: preserve action enums when flattening tool schemas. (#708) — thanks @xMikeMickelson.
- Agents: reset sessions and retry when auto-compaction overflows instead of crashing the gateway.
## 2026.1.10 ## 2026.1.10

View File

@@ -7,6 +7,7 @@ import {
formatAssistantErrorText, formatAssistantErrorText,
isBillingErrorMessage, isBillingErrorMessage,
isCloudCodeAssistFormatError, isCloudCodeAssistFormatError,
isCompactionFailureError,
isContextOverflowError, isContextOverflowError,
isFailoverErrorMessage, isFailoverErrorMessage,
isMessagingToolDuplicate, isMessagingToolDuplicate,
@@ -208,6 +209,8 @@ describe("isContextOverflowError", () => {
"Request exceeds the maximum size", "Request exceeds the maximum size",
"context length exceeded", "context length exceeded",
"Maximum context length", "Maximum context length",
"prompt is too long: 208423 tokens > 200000 maximum",
"Context overflow: Summarization failed",
"413 Request Entity Too Large", "413 Request Entity Too Large",
]; ];
for (const sample of samples) { for (const sample of samples) {
@@ -220,6 +223,26 @@ describe("isContextOverflowError", () => {
}); });
}); });
describe("isCompactionFailureError", () => {
it("matches compaction overflow failures", () => {
const samples = [
"Context overflow: Summarization failed: 400 {\"message\":\"prompt is too long\"}",
"auto-compaction failed due to context overflow",
"Compaction failed: prompt is too long",
];
for (const sample of samples) {
expect(isCompactionFailureError(sample)).toBe(true);
}
});
it("ignores non-compaction overflow errors", () => {
expect(
isCompactionFailureError("Context overflow: prompt too large"),
).toBe(false);
expect(isCompactionFailureError("rate limit exceeded")).toBe(false);
});
});
describe("isBillingErrorMessage", () => { describe("isBillingErrorMessage", () => {
it("matches credit / payment failures", () => { it("matches credit / payment failures", () => {
const samples = [ const samples = [

View File

@@ -244,10 +244,24 @@ export function isContextOverflowError(errorMessage?: string): boolean {
lower.includes("request exceeds the maximum size") || lower.includes("request exceeds the maximum size") ||
lower.includes("context length exceeded") || lower.includes("context length exceeded") ||
lower.includes("maximum context length") || lower.includes("maximum context length") ||
lower.includes("prompt is too long") ||
lower.includes("context overflow") ||
(lower.includes("413") && lower.includes("too large")) (lower.includes("413") && lower.includes("too large"))
); );
} }
export function isCompactionFailureError(errorMessage?: string): boolean {
if (!errorMessage) return false;
if (!isContextOverflowError(errorMessage)) return false;
const lower = errorMessage.toLowerCase();
return (
lower.includes("summarization failed") ||
lower.includes("auto-compaction") ||
lower.includes("compaction failed") ||
lower.includes("compaction")
);
}
export function formatAssistantErrorText( export function formatAssistantErrorText(
msg: AssistantMessage, msg: AssistantMessage,
opts?: { cfg?: ClawdbotConfig; sessionKey?: string }, opts?: { cfg?: ClawdbotConfig; sessionKey?: string },

View File

@@ -36,6 +36,7 @@ import { isCacheEnabled, resolveCacheTtlMs } from "../config/cache-utils.js";
import type { ClawdbotConfig } from "../config/config.js"; import type { ClawdbotConfig } from "../config/config.js";
import { resolveProviderCapabilities } from "../config/provider-capabilities.js"; import { resolveProviderCapabilities } from "../config/provider-capabilities.js";
import { getMachineDisplayName } from "../infra/machine-name.js"; import { getMachineDisplayName } from "../infra/machine-name.js";
import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js";
import { createSubsystemLogger } from "../logging.js"; import { createSubsystemLogger } from "../logging.js";
import { import {
type enqueueCommand, type enqueueCommand,
@@ -85,6 +86,7 @@ import {
formatAssistantErrorText, formatAssistantErrorText,
isAuthAssistantError, isAuthAssistantError,
isCloudCodeAssistFormatError, isCloudCodeAssistFormatError,
isCompactionFailureError,
isContextOverflowError, isContextOverflowError,
isFailoverAssistantError, isFailoverAssistantError,
isFailoverErrorMessage, isFailoverErrorMessage,
@@ -408,6 +410,13 @@ type EmbeddedPiQueueHandle = {
const log = createSubsystemLogger("agent/embedded"); const log = createSubsystemLogger("agent/embedded");
const GOOGLE_TURN_ORDERING_CUSTOM_TYPE = "google-turn-ordering-bootstrap"; const GOOGLE_TURN_ORDERING_CUSTOM_TYPE = "google-turn-ordering-bootstrap";
registerUnhandledRejectionHandler((reason) => {
const message = describeUnknownError(reason);
if (!isCompactionFailureError(message)) return false;
log.error(`Auto-compaction failed (unhandled): ${message}`);
return true;
});
type CustomEntryLike = { type?: unknown; customType?: unknown }; type CustomEntryLike = { type?: unknown; customType?: unknown };
function hasGoogleTurnOrderingMarker(sessionManager: SessionManager): boolean { function hasGoogleTurnOrderingMarker(sessionManager: SessionManager): boolean {

View File

@@ -375,6 +375,57 @@ describe("runReplyAgent typing (heartbeat)", () => {
} }
}); });
it("retries after compaction failure by resetting the session", async () => {
const prevStateDir = process.env.CLAWDBOT_STATE_DIR;
const stateDir = await fs.mkdtemp(
path.join(tmpdir(), "clawdbot-session-compaction-reset-"),
);
process.env.CLAWDBOT_STATE_DIR = stateDir;
try {
const sessionId = "session";
const storePath = path.join(stateDir, "sessions", "sessions.json");
const sessionEntry = { sessionId, updatedAt: Date.now() };
const sessionStore = { main: sessionEntry };
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8");
runEmbeddedPiAgentMock
.mockImplementationOnce(async () => {
throw new Error(
"Context overflow: Summarization failed: 400 {\"message\":\"prompt is too long\"}",
);
})
.mockImplementationOnce(async () => ({
payloads: [{ text: "ok" }],
meta: {},
}));
const callsBefore = runEmbeddedPiAgentMock.mock.calls.length;
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(runEmbeddedPiAgentMock.mock.calls.length - callsBefore).toBe(2);
const payload = Array.isArray(res) ? res[0] : res;
expect(payload).toMatchObject({ text: "ok" });
expect(sessionStore.main.sessionId).not.toBe(sessionId);
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId);
} finally {
if (prevStateDir) {
process.env.CLAWDBOT_STATE_DIR = prevStateDir;
} else {
delete process.env.CLAWDBOT_STATE_DIR;
}
}
});
it("still replies even if session reset fails to persist", async () => { it("still replies even if session reset fails to persist", async () => {
const prevStateDir = process.env.CLAWDBOT_STATE_DIR; const prevStateDir = process.env.CLAWDBOT_STATE_DIR;
const stateDir = await fs.mkdtemp( const stateDir = await fs.mkdtemp(

View File

@@ -7,6 +7,10 @@ import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { resolveModelAuthMode } from "../../agents/model-auth.js"; import { resolveModelAuthMode } from "../../agents/model-auth.js";
import { runWithModelFallback } from "../../agents/model-fallback.js"; import { runWithModelFallback } from "../../agents/model-fallback.js";
import { isCliProvider } from "../../agents/model-selection.js"; import { isCliProvider } from "../../agents/model-selection.js";
import {
isCompactionFailureError,
isContextOverflowError,
} from "../../agents/pi-embedded-helpers.js";
import { import {
queueEmbeddedPiMessage, queueEmbeddedPiMessage,
runEmbeddedPiAgent, runEmbeddedPiAgent,
@@ -15,6 +19,7 @@ import { hasNonzeroUsage, type NormalizedUsage } from "../../agents/usage.js";
import type { ClawdbotConfig } from "../../config/config.js"; import type { ClawdbotConfig } from "../../config/config.js";
import { import {
loadSessionStore, loadSessionStore,
resolveAgentIdFromSessionKey,
resolveSessionTranscriptPath, resolveSessionTranscriptPath,
type SessionEntry, type SessionEntry,
saveSessionStore, saveSessionStore,
@@ -231,6 +236,10 @@ export async function runReplyAgent(params: {
typingMode, typingMode,
} = params; } = params;
let activeSessionEntry = sessionEntry;
let activeSessionStore = sessionStore;
let activeIsNewSession = isNewSession;
const isHeartbeat = opts?.isHeartbeat === true; const isHeartbeat = opts?.isHeartbeat === true;
const typingSignals = createTypingSignaler({ const typingSignals = createTypingSignaler({
typing, typing,
@@ -303,11 +312,11 @@ export async function runReplyAgent(params: {
followupRun.prompt, followupRun.prompt,
); );
if (steered && !shouldFollowup) { if (steered && !shouldFollowup) {
if (sessionEntry && sessionStore && sessionKey) { if (activeSessionEntry && activeSessionStore && sessionKey) {
sessionEntry.updatedAt = Date.now(); activeSessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry; activeSessionStore[sessionKey] = activeSessionEntry;
if (storePath) { if (storePath) {
await saveSessionStore(storePath, sessionStore); await saveSessionStore(storePath, activeSessionStore);
} }
} }
typing.cleanup(); typing.cleanup();
@@ -317,11 +326,11 @@ export async function runReplyAgent(params: {
if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) { if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) {
enqueueFollowupRun(queueKey, followupRun, resolvedQueue); enqueueFollowupRun(queueKey, followupRun, resolvedQueue);
if (sessionEntry && sessionStore && sessionKey) { if (activeSessionEntry && activeSessionStore && sessionKey) {
sessionEntry.updatedAt = Date.now(); activeSessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry; activeSessionStore[sessionKey] = activeSessionEntry;
if (storePath) { if (storePath) {
await saveSessionStore(storePath, sessionStore); await saveSessionStore(storePath, activeSessionStore);
} }
} }
typing.cleanup(); typing.cleanup();
@@ -332,8 +341,8 @@ export async function runReplyAgent(params: {
opts, opts,
typing, typing,
typingMode, typingMode,
sessionEntry, sessionEntry: activeSessionEntry,
sessionStore, sessionStore: activeSessionStore,
sessionKey, sessionKey,
storePath, storePath,
defaultModel, defaultModel,
@@ -348,6 +357,46 @@ export async function runReplyAgent(params: {
let didLogHeartbeatStrip = false; let didLogHeartbeatStrip = false;
let autoCompactionCompleted = false; let autoCompactionCompleted = false;
let responseUsageLine: string | undefined; let responseUsageLine: string | undefined;
const resetSessionAfterCompactionFailure = async (
reason: string,
): Promise<boolean> => {
if (!sessionKey || !activeSessionStore || !storePath) return false;
const nextSessionId = crypto.randomUUID();
const nextEntry: SessionEntry = {
...(activeSessionStore[sessionKey] ?? activeSessionEntry),
sessionId: nextSessionId,
updatedAt: Date.now(),
systemSent: false,
abortedLastRun: false,
};
const agentId = resolveAgentIdFromSessionKey(sessionKey);
const topicId =
typeof sessionCtx.MessageThreadId === "number"
? sessionCtx.MessageThreadId
: undefined;
const nextSessionFile = resolveSessionTranscriptPath(
nextSessionId,
agentId,
topicId,
);
nextEntry.sessionFile = nextSessionFile;
activeSessionStore[sessionKey] = nextEntry;
try {
await saveSessionStore(storePath, activeSessionStore);
} catch (err) {
defaultRuntime.error(
`Failed to persist session reset after compaction failure (${sessionKey}): ${String(err)}`,
);
}
followupRun.run.sessionId = nextSessionId;
followupRun.run.sessionFile = nextSessionFile;
activeSessionEntry = nextEntry;
activeIsNewSession = true;
defaultRuntime.error(
`Auto-compaction failed (${reason}). Restarting session ${sessionKey} -> ${nextSessionId} and retrying.`,
);
return true;
};
try { try {
const runId = crypto.randomUUID(); const runId = crypto.randomUUID();
if (sessionKey) { if (sessionKey) {
@@ -359,342 +408,360 @@ export async function runReplyAgent(params: {
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>; let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
let fallbackProvider = followupRun.run.provider; let fallbackProvider = followupRun.run.provider;
let fallbackModel = followupRun.run.model; let fallbackModel = followupRun.run.model;
try { let didResetAfterCompactionFailure = false;
const allowPartialStream = !( while (true) {
followupRun.run.reasoningLevel === "stream" && opts?.onReasoningStream try {
); const allowPartialStream = !(
const fallbackResult = await runWithModelFallback({ followupRun.run.reasoningLevel === "stream" && opts?.onReasoningStream
cfg: followupRun.run.config, );
provider: followupRun.run.provider, const fallbackResult = await runWithModelFallback({
model: followupRun.run.model, cfg: followupRun.run.config,
run: (provider, model) => { provider: followupRun.run.provider,
if (isCliProvider(provider, followupRun.run.config)) { model: followupRun.run.model,
const startedAt = Date.now(); run: (provider, model) => {
emitAgentEvent({ if (isCliProvider(provider, followupRun.run.config)) {
runId, const startedAt = Date.now();
stream: "lifecycle", emitAgentEvent({
data: { runId,
phase: "start", stream: "lifecycle",
startedAt, data: {
}, phase: "start",
}); startedAt,
const cliSessionId = getCliSessionId(sessionEntry, provider); },
return runCliAgent({ });
const cliSessionId = getCliSessionId(activeSessionEntry, provider);
return runCliAgent({
sessionId: followupRun.run.sessionId,
sessionKey,
sessionFile: followupRun.run.sessionFile,
workspaceDir: followupRun.run.workspaceDir,
config: followupRun.run.config,
prompt: commandBody,
provider,
model,
thinkLevel: followupRun.run.thinkLevel,
timeoutMs: followupRun.run.timeoutMs,
runId,
extraSystemPrompt: followupRun.run.extraSystemPrompt,
ownerNumbers: followupRun.run.ownerNumbers,
cliSessionId,
})
.then((result) => {
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "end",
startedAt,
endedAt: Date.now(),
},
});
return result;
})
.catch((err) => {
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "error",
startedAt,
endedAt: Date.now(),
error: err instanceof Error ? err.message : String(err),
},
});
throw err;
});
}
return runEmbeddedPiAgent({
sessionId: followupRun.run.sessionId, sessionId: followupRun.run.sessionId,
sessionKey, sessionKey,
messageProvider:
sessionCtx.Provider?.trim().toLowerCase() || undefined,
agentAccountId: sessionCtx.AccountId,
// Provider threading context for tool auto-injection
...buildThreadingToolContext({
sessionCtx,
config: followupRun.run.config,
hasRepliedRef: opts?.hasRepliedRef,
}),
sessionFile: followupRun.run.sessionFile, sessionFile: followupRun.run.sessionFile,
workspaceDir: followupRun.run.workspaceDir, workspaceDir: followupRun.run.workspaceDir,
agentDir: followupRun.run.agentDir,
config: followupRun.run.config, config: followupRun.run.config,
skillsSnapshot: followupRun.run.skillsSnapshot,
prompt: commandBody, prompt: commandBody,
provider,
model,
thinkLevel: followupRun.run.thinkLevel,
timeoutMs: followupRun.run.timeoutMs,
runId,
extraSystemPrompt: followupRun.run.extraSystemPrompt, extraSystemPrompt: followupRun.run.extraSystemPrompt,
ownerNumbers: followupRun.run.ownerNumbers, ownerNumbers: followupRun.run.ownerNumbers,
cliSessionId, enforceFinalTag: followupRun.run.enforceFinalTag,
}) provider,
.then((result) => { model,
emitAgentEvent({ authProfileId: followupRun.run.authProfileId,
runId, thinkLevel: followupRun.run.thinkLevel,
stream: "lifecycle", verboseLevel: followupRun.run.verboseLevel,
data: { reasoningLevel: followupRun.run.reasoningLevel,
phase: "end", bashElevated: followupRun.run.bashElevated,
startedAt, timeoutMs: followupRun.run.timeoutMs,
endedAt: Date.now(), runId,
}, blockReplyBreak: resolvedBlockStreamingBreak,
}); blockReplyChunking,
return result; onPartialReply:
}) opts?.onPartialReply && allowPartialStream
.catch((err) => { ? async (payload) => {
emitAgentEvent({ let text = payload.text;
runId, if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
stream: "lifecycle", const stripped = stripHeartbeatToken(text, {
data: { mode: "message",
phase: "error", });
startedAt, if (stripped.didStrip && !didLogHeartbeatStrip) {
endedAt: Date.now(), didLogHeartbeatStrip = true;
error: err instanceof Error ? err.message : String(err), logVerbose(
}, "Stripped stray HEARTBEAT_OK token from reply",
}); );
throw err; }
}); if (
} stripped.shouldSkip &&
return runEmbeddedPiAgent({ (payload.mediaUrls?.length ?? 0) === 0
sessionId: followupRun.run.sessionId, ) {
sessionKey, return;
messageProvider: }
sessionCtx.Provider?.trim().toLowerCase() || undefined, text = stripped.text;
agentAccountId: sessionCtx.AccountId,
// Provider threading context for tool auto-injection
...buildThreadingToolContext({
sessionCtx,
config: followupRun.run.config,
hasRepliedRef: opts?.hasRepliedRef,
}),
sessionFile: followupRun.run.sessionFile,
workspaceDir: followupRun.run.workspaceDir,
agentDir: followupRun.run.agentDir,
config: followupRun.run.config,
skillsSnapshot: followupRun.run.skillsSnapshot,
prompt: commandBody,
extraSystemPrompt: followupRun.run.extraSystemPrompt,
ownerNumbers: followupRun.run.ownerNumbers,
enforceFinalTag: followupRun.run.enforceFinalTag,
provider,
model,
authProfileId: followupRun.run.authProfileId,
thinkLevel: followupRun.run.thinkLevel,
verboseLevel: followupRun.run.verboseLevel,
reasoningLevel: followupRun.run.reasoningLevel,
bashElevated: followupRun.run.bashElevated,
timeoutMs: followupRun.run.timeoutMs,
runId,
blockReplyBreak: resolvedBlockStreamingBreak,
blockReplyChunking,
onPartialReply:
opts?.onPartialReply && allowPartialStream
? async (payload) => {
let text = payload.text;
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, {
mode: "message",
});
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose(
"Stripped stray HEARTBEAT_OK token from reply",
);
} }
if ( if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) return;
stripped.shouldSkip && await typingSignals.signalTextDelta(text);
(payload.mediaUrls?.length ?? 0) === 0 await opts.onPartialReply?.({
) {
return;
}
text = stripped.text;
}
if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) return;
await typingSignals.signalTextDelta(text);
await opts.onPartialReply?.({
text,
mediaUrls: payload.mediaUrls,
});
}
: undefined,
onReasoningStream:
typingSignals.shouldStartOnReasoning || opts?.onReasoningStream
? async (payload) => {
await typingSignals.signalReasoningDelta();
await opts?.onReasoningStream?.({
text: payload.text,
mediaUrls: payload.mediaUrls,
});
}
: undefined,
onAgentEvent: (evt) => {
// Trigger typing when tools start executing
if (evt.stream === "tool") {
const phase =
typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "start") {
void typingSignals.signalToolStart();
}
}
// Track auto-compaction completion
if (evt.stream === "compaction") {
const phase =
typeof evt.data.phase === "string" ? evt.data.phase : "";
const willRetry = Boolean(evt.data.willRetry);
if (phase === "end" && !willRetry) {
autoCompactionCompleted = true;
}
}
},
onBlockReply:
blockStreamingEnabled && opts?.onBlockReply
? async (payload) => {
let text = payload.text;
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, {
mode: "message",
});
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose(
"Stripped stray HEARTBEAT_OK token from reply",
);
}
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (stripped.shouldSkip && !hasMedia) return;
text = stripped.text;
}
const taggedPayload = applyReplyTagsToPayload(
{
text, text,
mediaUrls: payload.mediaUrls, mediaUrls: payload.mediaUrls,
mediaUrl: payload.mediaUrls?.[0], });
}, }
sessionCtx.MessageSid, : undefined,
); onReasoningStream:
// Let through payloads with audioAsVoice flag even if empty (need to track it) typingSignals.shouldStartOnReasoning || opts?.onReasoningStream
if ( ? async (payload) => {
!isRenderablePayload(taggedPayload) && await typingSignals.signalReasoningDelta();
!payload.audioAsVoice await opts?.onReasoningStream?.({
) text: payload.text,
return; mediaUrls: payload.mediaUrls,
const parsed = parseReplyDirectives( });
taggedPayload.text ?? "", }
{ : undefined,
currentMessageId: sessionCtx.MessageSid, onAgentEvent: (evt) => {
silentToken: SILENT_REPLY_TOKEN, // Trigger typing when tools start executing
}, if (evt.stream === "tool") {
); const phase =
const cleaned = parsed.text || undefined; typeof evt.data.phase === "string" ? evt.data.phase : "";
const hasMedia = if (phase === "start") {
Boolean(taggedPayload.mediaUrl) || void typingSignals.signalToolStart();
(taggedPayload.mediaUrls?.length ?? 0) > 0; }
// Skip empty payloads unless they have audioAsVoice flag (need to track it) }
if ( // Track auto-compaction completion
!cleaned && if (evt.stream === "compaction") {
!hasMedia && const phase =
!payload.audioAsVoice && typeof evt.data.phase === "string" ? evt.data.phase : "";
!parsed.audioAsVoice const willRetry = Boolean(evt.data.willRetry);
) if (phase === "end" && !willRetry) {
return; autoCompactionCompleted = true;
if (parsed.isSilent && !hasMedia) return; }
}
},
onBlockReply:
blockStreamingEnabled && opts?.onBlockReply
? async (payload) => {
let text = payload.text;
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, {
mode: "message",
});
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose(
"Stripped stray HEARTBEAT_OK token from reply",
);
}
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (stripped.shouldSkip && !hasMedia) return;
text = stripped.text;
}
const taggedPayload = applyReplyTagsToPayload(
{
text,
mediaUrls: payload.mediaUrls,
mediaUrl: payload.mediaUrls?.[0],
},
sessionCtx.MessageSid,
);
// Let through payloads with audioAsVoice flag even if empty (need to track it)
if (
!isRenderablePayload(taggedPayload) &&
!payload.audioAsVoice
)
return;
const parsed = parseReplyDirectives(
taggedPayload.text ?? "",
{
currentMessageId: sessionCtx.MessageSid,
silentToken: SILENT_REPLY_TOKEN,
},
);
const cleaned = parsed.text || undefined;
const hasMedia =
Boolean(taggedPayload.mediaUrl) ||
(taggedPayload.mediaUrls?.length ?? 0) > 0;
// Skip empty payloads unless they have audioAsVoice flag (need to track it)
if (
!cleaned &&
!hasMedia &&
!payload.audioAsVoice &&
!parsed.audioAsVoice
)
return;
if (parsed.isSilent && !hasMedia) return;
const blockPayload: ReplyPayload = applyReplyToMode({ const blockPayload: ReplyPayload = applyReplyToMode({
...taggedPayload, ...taggedPayload,
text: cleaned, text: cleaned,
audioAsVoice: Boolean( audioAsVoice: Boolean(
parsed.audioAsVoice || payload.audioAsVoice, parsed.audioAsVoice || payload.audioAsVoice,
), ),
replyToId: taggedPayload.replyToId ?? parsed.replyToId, replyToId: taggedPayload.replyToId ?? parsed.replyToId,
replyToTag: taggedPayload.replyToTag || parsed.replyToTag, replyToTag:
replyToCurrent: taggedPayload.replyToTag || parsed.replyToTag,
taggedPayload.replyToCurrent || parsed.replyToCurrent, replyToCurrent:
}); taggedPayload.replyToCurrent || parsed.replyToCurrent,
});
void typingSignals void typingSignals
.signalTextDelta(cleaned ?? taggedPayload.text) .signalTextDelta(cleaned ?? taggedPayload.text)
.catch((err) => {
logVerbose(
`block reply typing signal failed: ${String(err)}`,
);
});
blockReplyPipeline?.enqueue(blockPayload);
}
: undefined,
shouldEmitToolResult,
onToolResult: opts?.onToolResult
? (payload) => {
// `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them.
// If a tool callback starts typing after the run finalized, we can end up with
// a typing loop that never sees a matching markRunComplete(). Track and drain.
const task = (async () => {
let text = payload.text;
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, {
mode: "message",
});
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose(
"Stripped stray HEARTBEAT_OK token from reply",
);
}
if (
stripped.shouldSkip &&
(payload.mediaUrls?.length ?? 0) === 0
) {
return;
}
text = stripped.text;
}
await typingSignals.signalTextDelta(text);
await opts.onToolResult?.({
text,
mediaUrls: payload.mediaUrls,
});
})()
.catch((err) => { .catch((err) => {
logVerbose( logVerbose(
`block reply typing signal failed: ${String(err)}`, `tool result delivery failed: ${String(err)}`,
); );
})
.finally(() => {
pendingToolTasks.delete(task);
}); });
pendingToolTasks.add(task);
blockReplyPipeline?.enqueue(blockPayload);
} }
: undefined, : undefined,
shouldEmitToolResult, });
onToolResult: opts?.onToolResult },
? (payload) => { });
// `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them. runResult = fallbackResult.result;
// If a tool callback starts typing after the run finalized, we can end up with fallbackProvider = fallbackResult.provider;
// a typing loop that never sees a matching markRunComplete(). Track and drain. fallbackModel = fallbackResult.model;
const task = (async () => { break;
let text = payload.text; } catch (err) {
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { const message = err instanceof Error ? err.message : String(err);
const stripped = stripHeartbeatToken(text, { const isContextOverflow =
mode: "message", isContextOverflowError(message) ||
}); /context.*overflow|too large|context window/i.test(message);
if (stripped.didStrip && !didLogHeartbeatStrip) { const isCompactionFailure = isCompactionFailureError(message);
didLogHeartbeatStrip = true; const isSessionCorruption =
logVerbose( /function call turn comes immediately after/i.test(message);
"Stripped stray HEARTBEAT_OK token from reply",
);
}
if (
stripped.shouldSkip &&
(payload.mediaUrls?.length ?? 0) === 0
) {
return;
}
text = stripped.text;
}
await typingSignals.signalTextDelta(text);
await opts.onToolResult?.({
text,
mediaUrls: payload.mediaUrls,
});
})()
.catch((err) => {
logVerbose(`tool result delivery failed: ${String(err)}`);
})
.finally(() => {
pendingToolTasks.delete(task);
});
pendingToolTasks.add(task);
}
: undefined,
});
},
});
runResult = fallbackResult.result;
fallbackProvider = fallbackResult.provider;
fallbackModel = fallbackResult.model;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
const isContextOverflow =
/context.*overflow|too large|context window/i.test(message);
const isSessionCorruption =
/function call turn comes immediately after/i.test(message);
// Auto-recover from Gemini session corruption by resetting the session if (
if (isSessionCorruption && sessionKey && sessionStore && storePath) { isCompactionFailure &&
const corruptedSessionId = sessionEntry?.sessionId; !didResetAfterCompactionFailure &&
defaultRuntime.error( (await resetSessionAfterCompactionFailure(message))
`Session history corrupted (Gemini function call ordering). Resetting session: ${sessionKey}`, ) {
); didResetAfterCompactionFailure = true;
continue;
try {
// Delete transcript file if it exists
if (corruptedSessionId) {
const transcriptPath =
resolveSessionTranscriptPath(corruptedSessionId);
try {
fs.unlinkSync(transcriptPath);
} catch {
// Ignore if file doesn't exist
}
}
// Remove session entry from store
delete sessionStore[sessionKey];
await saveSessionStore(storePath, sessionStore);
} catch (cleanupErr) {
defaultRuntime.error(
`Failed to reset corrupted session ${sessionKey}: ${String(cleanupErr)}`,
);
} }
// Auto-recover from Gemini session corruption by resetting the session
if (isSessionCorruption && sessionKey && activeSessionStore && storePath) {
const corruptedSessionId = activeSessionEntry?.sessionId;
defaultRuntime.error(
`Session history corrupted (Gemini function call ordering). Resetting session: ${sessionKey}`,
);
try {
// Delete transcript file if it exists
if (corruptedSessionId) {
const transcriptPath =
resolveSessionTranscriptPath(corruptedSessionId);
try {
fs.unlinkSync(transcriptPath);
} catch {
// Ignore if file doesn't exist
}
}
// Remove session entry from store
delete activeSessionStore[sessionKey];
await saveSessionStore(storePath, activeSessionStore);
} catch (cleanupErr) {
defaultRuntime.error(
`Failed to reset corrupted session ${sessionKey}: ${String(cleanupErr)}`,
);
}
return finalizeWithFollowup({
text: "⚠️ Session history was corrupted. I've reset the conversation - please try again!",
});
}
defaultRuntime.error(`Embedded agent failed before reply: ${message}`);
return finalizeWithFollowup({ return finalizeWithFollowup({
text: "⚠️ Session history was corrupted. I've reset the conversation - please try again!", text: isContextOverflow
? "⚠️ Context overflow - conversation too long. Starting fresh might help!"
: `⚠️ Agent failed before reply: ${message}. Check gateway logs for details.`,
}); });
} }
defaultRuntime.error(`Embedded agent failed before reply: ${message}`);
return finalizeWithFollowup({
text: isContextOverflow
? "⚠️ Context overflow - conversation too long. Starting fresh might help!"
: `⚠️ Agent failed before reply: ${message}. Check gateway logs for details.`,
});
} }
if ( if (
shouldInjectGroupIntro && shouldInjectGroupIntro &&
sessionEntry && activeSessionEntry &&
sessionStore && activeSessionStore &&
sessionKey && sessionKey &&
sessionEntry.groupActivationNeedsSystemIntro activeSessionEntry.groupActivationNeedsSystemIntro
) { ) {
sessionEntry.groupActivationNeedsSystemIntro = false; activeSessionEntry.groupActivationNeedsSystemIntro = false;
sessionEntry.updatedAt = Date.now(); activeSessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry; activeSessionStore[sessionKey] = activeSessionEntry;
if (storePath) { if (storePath) {
await saveSessionStore(storePath, sessionStore); await saveSessionStore(storePath, activeSessionStore);
} }
} }
@@ -814,7 +881,7 @@ export async function runReplyAgent(params: {
const contextTokensUsed = const contextTokensUsed =
agentCfgContextTokens ?? agentCfgContextTokens ??
lookupContextTokens(modelUsed) ?? lookupContextTokens(modelUsed) ??
sessionEntry?.contextTokens ?? activeSessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS; DEFAULT_CONTEXT_TOKENS;
if (storePath && sessionKey) { if (storePath && sessionKey) {
@@ -884,9 +951,9 @@ export async function runReplyAgent(params: {
} }
const responseUsageEnabled = const responseUsageEnabled =
(sessionEntry?.responseUsage ?? (activeSessionEntry?.responseUsage ??
(sessionKey (sessionKey
? sessionStore?.[sessionKey]?.responseUsage ? activeSessionStore?.[sessionKey]?.responseUsage
: undefined)) === "on"; : undefined)) === "on";
if (responseUsageEnabled && hasNonzeroUsage(usage)) { if (responseUsageEnabled && hasNonzeroUsage(usage)) {
const authMode = resolveModelAuthMode(providerUsed, cfg); const authMode = resolveModelAuthMode(providerUsed, cfg);
@@ -910,8 +977,8 @@ export async function runReplyAgent(params: {
let finalPayloads = replyPayloads; let finalPayloads = replyPayloads;
if (autoCompactionCompleted) { if (autoCompactionCompleted) {
const count = await incrementCompactionCount({ const count = await incrementCompactionCount({
sessionEntry, sessionEntry: activeSessionEntry,
sessionStore, sessionStore: activeSessionStore,
sessionKey, sessionKey,
storePath, storePath,
}); });
@@ -923,7 +990,7 @@ export async function runReplyAgent(params: {
]; ];
} }
} }
if (resolvedVerboseLevel === "on" && isNewSession) { if (resolvedVerboseLevel === "on" && activeIsNewSession) {
finalPayloads = [ finalPayloads = [
{ text: `🧭 New session: ${followupRun.run.sessionId}` }, { text: `🧭 New session: ${followupRun.run.sessionId}` },
...finalPayloads, ...finalPayloads,