From 5414da9fd42102155de16cbfae3fb768ceaf4f9f Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Tue, 6 Jan 2026 13:43:10 -0800 Subject: [PATCH] fix(routing): handle cross-provider messages in collect mode When queued messages come from different providers (Slack + Telegram), process them individually instead of collecting into a single prompt. This ensures each reply routes back to its originating provider. - Add hasCrossProviderItems() to detect multi-provider queues - Skip collect mode when cross-provider detected - Preserve originatingChannel/originatingTo when collecting same-provider --- src/auto-reply/reply/followup-runner.ts | 10 +++-- src/auto-reply/reply/queue.ts | 52 +++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index e386e33e0..ea88ef983 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -82,9 +82,13 @@ export function createFollowupRunner(params: { cfg: queued.run.config, }); if (!result.ok) { - logVerbose( - `followup queue: route-reply failed: ${result.error ?? "unknown error"}`, - ); + // Log error and fall back to dispatcher if available. + const errorMsg = result.error ?? "unknown error"; + logVerbose(`followup queue: route-reply failed: ${errorMsg}`); + // Fallback: try the dispatcher if routing failed. + if (opts?.onBlockReply) { + await opts.onBlockReply(payload); + } } } else if (opts?.onBlockReply) { await opts.onBlockReply(payload); diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index 2645a8704..51bc9a84d 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -5,6 +5,7 @@ import type { SessionEntry } from "../../config/sessions.js"; import { defaultRuntime } from "../../runtime.js"; import type { OriginatingChannelType } from "../templating.js"; import type { ElevatedLevel, ThinkLevel, VerboseLevel } from "./directives.js"; +import { isRoutableChannel } from "./route-reply.js"; export type QueueMode = | "steer" | "followup" @@ -386,6 +387,33 @@ function buildCollectPrompt(items: FollowupRun[], summary?: string): string { }); return blocks.join("\n\n"); } + +/** + * Checks if queued items have different routable originating channels. + * + * Returns true if messages come from different providers (e.g., Slack + Telegram), + * meaning they cannot be safely collected into one prompt without losing routing. + * Also returns true for a mix of routable and non-routable channels. + */ +function hasCrossProviderItems(items: FollowupRun[]): boolean { + const routableChannels = new Set(); + let hasNonRoutable = false; + + for (const item of items) { + const channel = item.originatingChannel; + if (isRoutableChannel(channel)) { + routableChannels.add(channel); + } else if (channel) { + // Has a channel but it's not routable (whatsapp, webchat). + hasNonRoutable = true; + } + } + + // Cross-provider if: multiple routable channels, or mix of routable + non-routable. + return ( + routableChannels.size > 1 || (routableChannels.size > 0 && hasNonRoutable) + ); +} export function scheduleFollowupDrain( key: string, runFollowup: (run: FollowupRun) => Promise, @@ -398,15 +426,39 @@ export function scheduleFollowupDrain( while (queue.items.length > 0 || queue.droppedCount > 0) { await waitForQueueDebounce(queue); if (queue.mode === "collect") { + // Check if messages span multiple providers. + // If so, process individually to preserve per-message routing. + const isCrossProvider = hasCrossProviderItems(queue.items); + + if (isCrossProvider) { + // Process one at a time to preserve per-message routing info. + const next = queue.items.shift(); + if (!next) break; + await runFollowup(next); + continue; + } + + // Same-provider messages can be safely collected. const items = queue.items.splice(0, queue.items.length); const summary = buildSummaryPrompt(queue); const run = items.at(-1)?.run ?? queue.lastRun; if (!run) break; + + // Preserve originating channel from items when collecting same-provider. + const originatingChannel = items.find( + (i) => i.originatingChannel, + )?.originatingChannel; + const originatingTo = items.find( + (i) => i.originatingTo, + )?.originatingTo; + const prompt = buildCollectPrompt(items, summary); await runFollowup({ prompt, run, enqueuedAt: Date.now(), + originatingChannel, + originatingTo, }); continue; }