import crypto from "node:crypto"; import { lookupContextTokens } from "../../agents/context.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { runWithModelFallback } from "../../agents/model-fallback.js"; import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; import { hasNonzeroUsage } from "../../agents/usage.js"; import { type SessionEntry, updateSessionStoreEntry, } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { logVerbose } from "../../globals.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; import { defaultRuntime } from "../../runtime.js"; import { stripHeartbeatToken } from "../heartbeat.js"; import type { OriginatingChannelType } from "../templating.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import type { FollowupRun } from "./queue.js"; import { applyReplyThreading, filterMessagingToolDuplicates, shouldSuppressMessagingToolReplies, } from "./reply-payloads.js"; import { resolveReplyToMode } from "./reply-threading.js"; import { isRoutableChannel, routeReply } from "./route-reply.js"; import { incrementCompactionCount } from "./session-updates.js"; import type { TypingController } from "./typing.js"; import { createTypingSignaler } from "./typing-mode.js"; export function createFollowupRunner(params: { opts?: GetReplyOptions; typing: TypingController; typingMode: TypingMode; sessionEntry?: SessionEntry; sessionStore?: Record; sessionKey?: string; storePath?: string; defaultModel: string; agentCfgContextTokens?: number; }): (queued: FollowupRun) => Promise { const { opts, typing, typingMode, sessionEntry, sessionStore, sessionKey, storePath, defaultModel, agentCfgContextTokens, } = params; const typingSignals = createTypingSignaler({ typing, mode: typingMode, isHeartbeat: opts?.isHeartbeat === true, }); /** * Sends followup payloads, routing to the originating channel if set. * * When originatingChannel/originatingTo are set on the queued run, * replies are routed directly to that provider instead of using the * session's current dispatcher. This ensures replies go back to * where the message originated. */ const sendFollowupPayloads = async ( payloads: ReplyPayload[], queued: FollowupRun, ) => { // Check if we should route to originating channel. const { originatingChannel, originatingTo } = queued; const shouldRouteToOriginating = isRoutableChannel(originatingChannel) && originatingTo; if (!shouldRouteToOriginating && !opts?.onBlockReply) { logVerbose("followup queue: no onBlockReply handler; dropping payloads"); return; } for (const payload of payloads) { if (!payload?.text && !payload?.mediaUrl && !payload?.mediaUrls?.length) { continue; } if ( isSilentReplyText(payload.text, SILENT_REPLY_TOKEN) && !payload.mediaUrl && !payload.mediaUrls?.length ) { continue; } await typingSignals.signalTextDelta(payload.text); // Route to originating channel if set, otherwise fall back to dispatcher. if (shouldRouteToOriginating) { const result = await routeReply({ payload, channel: originatingChannel, to: originatingTo, sessionKey: queued.run.sessionKey, accountId: queued.originatingAccountId, threadId: queued.originatingThreadId, cfg: queued.run.config, }); if (!result.ok) { // Log error and fall back to dispatcher if available. const errorMsg = result.error ?? "unknown error"; logVerbose(`followup queue: route-reply failed: ${errorMsg}`); // Fallback: try the dispatcher if routing failed. if (opts?.onBlockReply) { await opts.onBlockReply(payload); } } } else if (opts?.onBlockReply) { await opts.onBlockReply(payload); } } }; return async (queued: FollowupRun) => { await typingSignals.signalRunStart(); try { const runId = crypto.randomUUID(); if (queued.run.sessionKey) { registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey, verboseLevel: queued.run.verboseLevel, }); } let autoCompactionCompleted = false; let runResult: Awaited>; let fallbackProvider = queued.run.provider; let fallbackModel = queued.run.model; try { const fallbackResult = await runWithModelFallback({ cfg: queued.run.config, provider: queued.run.provider, model: queued.run.model, run: (provider, model) => runEmbeddedPiAgent({ sessionId: queued.run.sessionId, sessionKey: queued.run.sessionKey, messageProvider: queued.run.messageProvider, agentAccountId: queued.run.agentAccountId, sessionFile: queued.run.sessionFile, workspaceDir: queued.run.workspaceDir, config: queued.run.config, skillsSnapshot: queued.run.skillsSnapshot, prompt: queued.prompt, extraSystemPrompt: queued.run.extraSystemPrompt, ownerNumbers: queued.run.ownerNumbers, enforceFinalTag: queued.run.enforceFinalTag, provider, model, authProfileId: queued.run.authProfileId, thinkLevel: queued.run.thinkLevel, verboseLevel: queued.run.verboseLevel, reasoningLevel: queued.run.reasoningLevel, bashElevated: queued.run.bashElevated, timeoutMs: queued.run.timeoutMs, runId, blockReplyBreak: queued.run.blockReplyBreak, onAgentEvent: (evt) => { if (evt.stream !== "compaction") return; const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; const willRetry = Boolean(evt.data.willRetry); if (phase === "end" && !willRetry) { autoCompactionCompleted = true; } }, }), }); runResult = fallbackResult.result; fallbackProvider = fallbackResult.provider; fallbackModel = fallbackResult.model; } catch (err) { const message = err instanceof Error ? err.message : String(err); defaultRuntime.error?.( `Followup agent failed before reply: ${message}`, ); return; } const payloadArray = runResult.payloads ?? []; if (payloadArray.length === 0) return; const sanitizedPayloads = payloadArray.flatMap((payload) => { const text = payload.text; if (!text || !text.includes("HEARTBEAT_OK")) return [payload]; const stripped = stripHeartbeatToken(text, { mode: "message" }); const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; if (stripped.shouldSkip && !hasMedia) return []; return [{ ...payload, text: stripped.text }]; }); const replyToChannel = queued.originatingChannel ?? (queued.run.messageProvider?.toLowerCase() as | OriginatingChannelType | undefined); const replyToMode = resolveReplyToMode( queued.run.config, replyToChannel, queued.originatingAccountId, ); const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({ payloads: sanitizedPayloads, replyToMode, replyToChannel, }); const dedupedPayloads = filterMessagingToolDuplicates({ payloads: replyTaggedPayloads, sentTexts: runResult.messagingToolSentTexts ?? [], }); const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({ messageProvider: queued.run.messageProvider, messagingToolSentTargets: runResult.messagingToolSentTargets, originatingTo: queued.originatingTo, accountId: queued.run.agentAccountId, }); const finalPayloads = suppressMessagingToolReplies ? [] : dedupedPayloads; if (finalPayloads.length === 0) return; if (autoCompactionCompleted) { const count = await incrementCompactionCount({ sessionEntry, sessionStore, sessionKey, storePath, }); if (queued.run.verboseLevel === "on") { const suffix = typeof count === "number" ? ` (count ${count})` : ""; finalPayloads.unshift({ text: `🧹 Auto-compaction complete${suffix}.`, }); } } if (storePath && sessionKey) { const usage = runResult.meta.agentMeta?.usage; const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel; const contextTokensUsed = agentCfgContextTokens ?? lookupContextTokens(modelUsed) ?? sessionEntry?.contextTokens ?? DEFAULT_CONTEXT_TOKENS; if (hasNonzeroUsage(usage)) { try { await updateSessionStoreEntry({ storePath, sessionKey, update: async (entry) => { const input = usage.input ?? 0; const output = usage.output ?? 0; const promptTokens = input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); return { inputTokens: input, outputTokens: output, totalTokens: promptTokens > 0 ? promptTokens : (usage.total ?? input), modelProvider: fallbackProvider ?? entry.modelProvider, model: modelUsed, contextTokens: contextTokensUsed ?? entry.contextTokens, updatedAt: Date.now(), }; }, }); } catch (err) { logVerbose( `failed to persist followup usage update: ${String(err)}`, ); } } else if (modelUsed || contextTokensUsed) { try { await updateSessionStoreEntry({ storePath, sessionKey, update: async (entry) => ({ modelProvider: fallbackProvider ?? entry.modelProvider, model: modelUsed ?? entry.model, contextTokens: contextTokensUsed ?? entry.contextTokens, updatedAt: Date.now(), }), }); } catch (err) { logVerbose( `failed to persist followup model/context update: ${String(err)}`, ); } } } await sendFollowupPayloads(finalPayloads, queued); } finally { typing.markRunComplete(); } }; }