fix(routing): handle cross-provider messages in collect mode
When queued messages come from different providers (Slack + Telegram), process them individually instead of collecting into a single prompt. This ensures each reply routes back to its originating provider. - Add hasCrossProviderItems() to detect multi-provider queues - Skip collect mode when cross-provider detected - Preserve originatingChannel/originatingTo when collecting same-provider
This commit is contained in:
committed by
Peter Steinberger
parent
2d67ec5bfa
commit
5414da9fd4
@@ -82,9 +82,13 @@ export function createFollowupRunner(params: {
|
|||||||
cfg: queued.run.config,
|
cfg: queued.run.config,
|
||||||
});
|
});
|
||||||
if (!result.ok) {
|
if (!result.ok) {
|
||||||
logVerbose(
|
// Log error and fall back to dispatcher if available.
|
||||||
`followup queue: route-reply failed: ${result.error ?? "unknown error"}`,
|
const errorMsg = result.error ?? "unknown error";
|
||||||
);
|
logVerbose(`followup queue: route-reply failed: ${errorMsg}`);
|
||||||
|
// Fallback: try the dispatcher if routing failed.
|
||||||
|
if (opts?.onBlockReply) {
|
||||||
|
await opts.onBlockReply(payload);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (opts?.onBlockReply) {
|
} else if (opts?.onBlockReply) {
|
||||||
await opts.onBlockReply(payload);
|
await opts.onBlockReply(payload);
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import type { SessionEntry } from "../../config/sessions.js";
|
|||||||
import { defaultRuntime } from "../../runtime.js";
|
import { defaultRuntime } from "../../runtime.js";
|
||||||
import type { OriginatingChannelType } from "../templating.js";
|
import type { OriginatingChannelType } from "../templating.js";
|
||||||
import type { ElevatedLevel, ThinkLevel, VerboseLevel } from "./directives.js";
|
import type { ElevatedLevel, ThinkLevel, VerboseLevel } from "./directives.js";
|
||||||
|
import { isRoutableChannel } from "./route-reply.js";
|
||||||
export type QueueMode =
|
export type QueueMode =
|
||||||
| "steer"
|
| "steer"
|
||||||
| "followup"
|
| "followup"
|
||||||
@@ -386,6 +387,33 @@ function buildCollectPrompt(items: FollowupRun[], summary?: string): string {
|
|||||||
});
|
});
|
||||||
return blocks.join("\n\n");
|
return blocks.join("\n\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if queued items have different routable originating channels.
|
||||||
|
*
|
||||||
|
* Returns true if messages come from different providers (e.g., Slack + Telegram),
|
||||||
|
* meaning they cannot be safely collected into one prompt without losing routing.
|
||||||
|
* Also returns true for a mix of routable and non-routable channels.
|
||||||
|
*/
|
||||||
|
function hasCrossProviderItems(items: FollowupRun[]): boolean {
|
||||||
|
const routableChannels = new Set<string>();
|
||||||
|
let hasNonRoutable = false;
|
||||||
|
|
||||||
|
for (const item of items) {
|
||||||
|
const channel = item.originatingChannel;
|
||||||
|
if (isRoutableChannel(channel)) {
|
||||||
|
routableChannels.add(channel);
|
||||||
|
} else if (channel) {
|
||||||
|
// Has a channel but it's not routable (whatsapp, webchat).
|
||||||
|
hasNonRoutable = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cross-provider if: multiple routable channels, or mix of routable + non-routable.
|
||||||
|
return (
|
||||||
|
routableChannels.size > 1 || (routableChannels.size > 0 && hasNonRoutable)
|
||||||
|
);
|
||||||
|
}
|
||||||
export function scheduleFollowupDrain(
|
export function scheduleFollowupDrain(
|
||||||
key: string,
|
key: string,
|
||||||
runFollowup: (run: FollowupRun) => Promise<void>,
|
runFollowup: (run: FollowupRun) => Promise<void>,
|
||||||
@@ -398,15 +426,39 @@ export function scheduleFollowupDrain(
|
|||||||
while (queue.items.length > 0 || queue.droppedCount > 0) {
|
while (queue.items.length > 0 || queue.droppedCount > 0) {
|
||||||
await waitForQueueDebounce(queue);
|
await waitForQueueDebounce(queue);
|
||||||
if (queue.mode === "collect") {
|
if (queue.mode === "collect") {
|
||||||
|
// Check if messages span multiple providers.
|
||||||
|
// If so, process individually to preserve per-message routing.
|
||||||
|
const isCrossProvider = hasCrossProviderItems(queue.items);
|
||||||
|
|
||||||
|
if (isCrossProvider) {
|
||||||
|
// Process one at a time to preserve per-message routing info.
|
||||||
|
const next = queue.items.shift();
|
||||||
|
if (!next) break;
|
||||||
|
await runFollowup(next);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Same-provider messages can be safely collected.
|
||||||
const items = queue.items.splice(0, queue.items.length);
|
const items = queue.items.splice(0, queue.items.length);
|
||||||
const summary = buildSummaryPrompt(queue);
|
const summary = buildSummaryPrompt(queue);
|
||||||
const run = items.at(-1)?.run ?? queue.lastRun;
|
const run = items.at(-1)?.run ?? queue.lastRun;
|
||||||
if (!run) break;
|
if (!run) break;
|
||||||
|
|
||||||
|
// Preserve originating channel from items when collecting same-provider.
|
||||||
|
const originatingChannel = items.find(
|
||||||
|
(i) => i.originatingChannel,
|
||||||
|
)?.originatingChannel;
|
||||||
|
const originatingTo = items.find(
|
||||||
|
(i) => i.originatingTo,
|
||||||
|
)?.originatingTo;
|
||||||
|
|
||||||
const prompt = buildCollectPrompt(items, summary);
|
const prompt = buildCollectPrompt(items, summary);
|
||||||
await runFollowup({
|
await runFollowup({
|
||||||
prompt,
|
prompt,
|
||||||
run,
|
run,
|
||||||
enqueuedAt: Date.now(),
|
enqueuedAt: Date.now(),
|
||||||
|
originatingChannel,
|
||||||
|
originatingTo,
|
||||||
});
|
});
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user