diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 7fca6d4cf..166b6289a 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -18,7 +18,10 @@ import { updateSessionStore, } from "../../config/sessions.js"; import { logVerbose } from "../../globals.js"; -import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js"; +import { + emitAgentEvent, + registerAgentRunContext, +} from "../../infra/agent-events.js"; import { defaultRuntime } from "../../runtime.js"; import { isMarkdownCapableMessageChannel, @@ -29,11 +32,20 @@ import type { TemplateContext } from "../templating.js"; import type { VerboseLevel } from "../thinking.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; -import { buildThreadingToolContext, resolveEnforceFinalTag } from "./agent-runner-utils.js"; -import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js"; +import { + buildThreadingToolContext, + resolveEnforceFinalTag, +} from "./agent-runner-utils.js"; +import { + createBlockReplyPayloadKey, + type BlockReplyPipeline, +} from "./block-reply-pipeline.js"; import type { FollowupRun } from "./queue.js"; import { parseReplyDirectives } from "./reply-directives.js"; -import { applyReplyTagsToPayload, isRenderablePayload } from "./reply-payloads.js"; +import { + applyReplyTagsToPayload, + isRenderablePayload, +} from "./reply-payloads.js"; import type { TypingSignaler } from "./typing-mode.js"; export type AgentRunLoopResult = @@ -96,9 +108,12 @@ export async function runAgentTurnWithFallback(params: { while (true) { try { const allowPartialStream = !( - params.followupRun.run.reasoningLevel === "stream" && params.opts?.onReasoningStream + params.followupRun.run.reasoningLevel === "stream" && + params.opts?.onReasoningStream ); - const normalizeStreamingText = (payload: ReplyPayload): { text?: string; skip: boolean } => { + const normalizeStreamingText = ( + payload: ReplyPayload, + ): { text?: string; skip: boolean } => { if (!allowPartialStream) return { skip: true }; let text = payload.text; if (!params.isHeartbeat && text?.includes("HEARTBEAT_OK")) { @@ -122,7 +137,9 @@ export async function runAgentTurnWithFallback(params: { if (!sanitized.trim()) return { skip: true }; return { text: sanitized, skip: false }; }; - const handlePartialForTyping = async (payload: ReplyPayload): Promise => { + const handlePartialForTyping = async ( + payload: ReplyPayload, + ): Promise => { const { text, skip } = normalizeStreamingText(payload); if (skip || !text) return undefined; await params.typingSignals.signalTextDelta(text); @@ -157,7 +174,10 @@ export async function runAgentTurnWithFallback(params: { startedAt, }, }); - const cliSessionId = getCliSessionId(params.getActiveSessionEntry(), provider); + const cliSessionId = getCliSessionId( + params.getActiveSessionEntry(), + provider, + ); return runCliAgent({ sessionId: params.followupRun.run.sessionId, sessionKey: params.sessionKey, @@ -207,7 +227,8 @@ export async function runAgentTurnWithFallback(params: { return runEmbeddedPiAgent({ sessionId: params.followupRun.run.sessionId, sessionKey: params.sessionKey, - messageProvider: params.sessionCtx.Provider?.trim().toLowerCase() || undefined, + messageProvider: + params.sessionCtx.Provider?.trim().toLowerCase() || undefined, agentAccountId: params.sessionCtx.AccountId, // Provider threading context for tool auto-injection ...buildThreadingToolContext({ @@ -223,7 +244,10 @@ export async function runAgentTurnWithFallback(params: { prompt: params.commandBody, extraSystemPrompt: params.followupRun.run.extraSystemPrompt, ownerNumbers: params.followupRun.run.ownerNumbers, - enforceFinalTag: resolveEnforceFinalTag(params.followupRun.run, provider), + enforceFinalTag: resolveEnforceFinalTag( + params.followupRun.run, + provider, + ), provider, model, authProfileId, @@ -240,7 +264,9 @@ export async function runAgentTurnWithFallback(params: { params.sessionCtx.Provider, ); if (!channel) return "markdown"; - return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain"; + return isMarkdownCapableMessageChannel(channel) + ? "markdown" + : "plain"; })(), bashElevated: params.followupRun.run.bashElevated, timeoutMs: params.followupRun.run.timeoutMs, @@ -250,7 +276,11 @@ export async function runAgentTurnWithFallback(params: { onPartialReply: allowPartialStream ? async (payload) => { const textForTyping = await handlePartialForTyping(payload); - if (!params.opts?.onPartialReply || textForTyping === undefined) return; + if ( + !params.opts?.onPartialReply || + textForTyping === undefined + ) + return; await params.opts.onPartialReply({ text: textForTyping, mediaUrls: payload.mediaUrls, @@ -261,7 +291,8 @@ export async function runAgentTurnWithFallback(params: { await params.typingSignals.signalMessageStart(); }, onReasoningStream: - params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream + params.typingSignals.shouldStartOnReasoning || + params.opts?.onReasoningStream ? async (payload) => { await params.typingSignals.signalReasoningDelta(); await params.opts?.onReasoningStream?.({ @@ -274,14 +305,16 @@ export async function runAgentTurnWithFallback(params: { // Trigger typing when tools start executing. // Must await to ensure typing indicator starts before tool summaries are emitted. if (evt.stream === "tool") { - const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; + const phase = + typeof evt.data.phase === "string" ? evt.data.phase : ""; if (phase === "start" || phase === "update") { await params.typingSignals.signalToolStart(); } } // Track auto-compaction completion if (evt.stream === "compaction") { - const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; + const phase = + typeof evt.data.phase === "string" ? evt.data.phase : ""; const willRetry = Boolean(evt.data.willRetry); if (phase === "end" && !willRetry) { autoCompactionCompleted = true; @@ -305,14 +338,22 @@ export async function runAgentTurnWithFallback(params: { params.sessionCtx.MessageSid, ); // Let through payloads with audioAsVoice flag even if empty (need to track it) - if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) return; - const parsed = parseReplyDirectives(taggedPayload.text ?? "", { - currentMessageId: params.sessionCtx.MessageSid, - silentToken: SILENT_REPLY_TOKEN, - }); + if ( + !isRenderablePayload(taggedPayload) && + !payload.audioAsVoice + ) + return; + const parsed = parseReplyDirectives( + taggedPayload.text ?? "", + { + currentMessageId: params.sessionCtx.MessageSid, + silentToken: SILENT_REPLY_TOKEN, + }, + ); const cleaned = parsed.text || undefined; const hasRenderableMedia = - Boolean(taggedPayload.mediaUrl) || (taggedPayload.mediaUrls?.length ?? 0) > 0; + Boolean(taggedPayload.mediaUrl) || + (taggedPayload.mediaUrls?.length ?? 0) > 0; // Skip empty payloads unless they have audioAsVoice flag (need to track it) if ( !cleaned && @@ -326,25 +367,35 @@ export async function runAgentTurnWithFallback(params: { const blockPayload: ReplyPayload = params.applyReplyToMode({ ...taggedPayload, text: cleaned, - audioAsVoice: Boolean(parsed.audioAsVoice || payload.audioAsVoice), + audioAsVoice: Boolean( + parsed.audioAsVoice || payload.audioAsVoice, + ), replyToId: taggedPayload.replyToId ?? parsed.replyToId, replyToTag: taggedPayload.replyToTag || parsed.replyToTag, - replyToCurrent: taggedPayload.replyToCurrent || parsed.replyToCurrent, + replyToCurrent: + taggedPayload.replyToCurrent || parsed.replyToCurrent, }); void params.typingSignals .signalTextDelta(cleaned ?? taggedPayload.text) .catch((err) => { - logVerbose(`block reply typing signal failed: ${String(err)}`); + logVerbose( + `block reply typing signal failed: ${String(err)}`, + ); }); // Use pipeline if available (block streaming enabled), otherwise send directly - if (params.blockStreamingEnabled && params.blockReplyPipeline) { + if ( + params.blockStreamingEnabled && + params.blockReplyPipeline + ) { params.blockReplyPipeline.enqueue(blockPayload); } else { // Send directly when flushing before tool execution (no streaming). // Track sent key to avoid duplicate in final payloads. - directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload)); + directlySentBlockKeys.add( + createBlockReplyPayloadKey(blockPayload), + ); await params.opts?.onBlockReply?.(blockPayload); } } @@ -397,10 +448,17 @@ export async function runAgentTurnWithFallback(params: { (await params.resetSessionAfterCompactionFailure(embeddedError.message)) ) { didResetAfterCompactionFailure = true; - continue; + return { + kind: "final", + payload: { + text: "⚠️ Context limit exceeded. I've reset our conversation to start fresh - please try again.\n\nTo prevent this, increase your compaction buffer by setting `agents.defaults.compaction.reserveTokensFloor` to 4000 or higher in your config.", + }, + }; } if (embeddedError?.kind === "role_ordering") { - const didReset = await params.resetSessionAfterRoleOrderingConflict(embeddedError.message); + const didReset = await params.resetSessionAfterRoleOrderingConflict( + embeddedError.message, + ); if (didReset) { return { kind: "final", @@ -418,8 +476,10 @@ export async function runAgentTurnWithFallback(params: { isContextOverflowError(message) || /context.*overflow|too large|context window/i.test(message); const isCompactionFailure = isCompactionFailureError(message); - const isSessionCorruption = /function call turn comes immediately after/i.test(message); - const isRoleOrderingError = /incorrect role information|roles must alternate/i.test(message); + const isSessionCorruption = + /function call turn comes immediately after/i.test(message); + const isRoleOrderingError = + /incorrect role information|roles must alternate/i.test(message); if ( isCompactionFailure && @@ -427,10 +487,16 @@ export async function runAgentTurnWithFallback(params: { (await params.resetSessionAfterCompactionFailure(message)) ) { didResetAfterCompactionFailure = true; - continue; + return { + kind: "final", + payload: { + text: "⚠️ Context limit exceeded during compaction. I've reset our conversation to start fresh - please try again.\n\nTo prevent this, increase your compaction buffer by setting `agents.defaults.compaction.reserveTokensFloor` to 4000 or higher in your config.", + }, + }; } if (isRoleOrderingError) { - const didReset = await params.resetSessionAfterRoleOrderingConflict(message); + const didReset = + await params.resetSessionAfterRoleOrderingConflict(message); if (didReset) { return { kind: "final", @@ -457,7 +523,8 @@ export async function runAgentTurnWithFallback(params: { try { // Delete transcript file if it exists if (corruptedSessionId) { - const transcriptPath = resolveSessionTranscriptPath(corruptedSessionId); + const transcriptPath = + resolveSessionTranscriptPath(corruptedSessionId); try { fs.unlinkSync(transcriptPath); } catch { @@ -507,6 +574,7 @@ export async function runAgentTurnWithFallback(params: { fallbackModel, didLogHeartbeatStrip, autoCompactionCompleted, - directlySentBlockKeys: directlySentBlockKeys.size > 0 ? directlySentBlockKeys : undefined, + directlySentBlockKeys: + directlySentBlockKeys.size > 0 ? directlySentBlockKeys : undefined, }; }