fix(routing): only route to originating channel when cross-provider
When OriginatingChannel matches Surface (same provider), use normal dispatcher. Only route via routeReply() when they differ, ensuring cross-provider messages (e.g., Telegram queued while Slack active) get routed back to their origin.
This commit is contained in:
committed by
Peter Steinberger
parent
9d50ebad7d
commit
2d67ec5bfa
@@ -1,8 +1,10 @@
|
|||||||
import type { ClawdbotConfig } from "../../config/config.js";
|
import type { ClawdbotConfig } from "../../config/config.js";
|
||||||
|
import { logVerbose } from "../../globals.js";
|
||||||
import { getReplyFromConfig } from "../reply.js";
|
import { getReplyFromConfig } from "../reply.js";
|
||||||
import type { MsgContext } from "../templating.js";
|
import type { MsgContext } from "../templating.js";
|
||||||
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||||
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
|
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
|
||||||
|
import { isRoutableChannel, routeReply } from "./route-reply.js";
|
||||||
|
|
||||||
type DispatchFromConfigResult = {
|
type DispatchFromConfigResult = {
|
||||||
queuedFinal: boolean;
|
queuedFinal: boolean;
|
||||||
@@ -16,18 +18,68 @@ export async function dispatchReplyFromConfig(params: {
|
|||||||
replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
|
replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
|
||||||
replyResolver?: typeof getReplyFromConfig;
|
replyResolver?: typeof getReplyFromConfig;
|
||||||
}): Promise<DispatchFromConfigResult> {
|
}): Promise<DispatchFromConfigResult> {
|
||||||
|
const { ctx, cfg, dispatcher } = params;
|
||||||
|
|
||||||
|
// Check if we should route replies to originating channel instead of dispatcher.
|
||||||
|
// Only route when the originating channel is DIFFERENT from the current surface.
|
||||||
|
// This handles cross-provider routing (e.g., message from Telegram being processed
|
||||||
|
// by a shared session that's currently on Slack) while preserving normal dispatcher
|
||||||
|
// flow when the provider handles its own messages.
|
||||||
|
const originatingChannel = ctx.OriginatingChannel;
|
||||||
|
const originatingTo = ctx.OriginatingTo;
|
||||||
|
const currentSurface = ctx.Surface?.toLowerCase();
|
||||||
|
const shouldRouteToOriginating =
|
||||||
|
isRoutableChannel(originatingChannel) &&
|
||||||
|
originatingTo &&
|
||||||
|
originatingChannel !== currentSurface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper to send a payload via route-reply (async).
|
||||||
|
* Only used when actually routing to a different provider.
|
||||||
|
* Note: Only called when shouldRouteToOriginating is true, so
|
||||||
|
* originatingChannel and originatingTo are guaranteed to be defined.
|
||||||
|
*/
|
||||||
|
const sendPayloadAsync = async (payload: ReplyPayload): Promise<void> => {
|
||||||
|
// TypeScript doesn't narrow these from the shouldRouteToOriginating check,
|
||||||
|
// but they're guaranteed non-null when this function is called.
|
||||||
|
if (!originatingChannel || !originatingTo) return;
|
||||||
|
const result = await routeReply({
|
||||||
|
payload,
|
||||||
|
channel: originatingChannel,
|
||||||
|
to: originatingTo,
|
||||||
|
cfg,
|
||||||
|
});
|
||||||
|
if (!result.ok) {
|
||||||
|
logVerbose(
|
||||||
|
`dispatch-from-config: route-reply failed: ${result.error ?? "unknown error"}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
const replyResult = await (params.replyResolver ?? getReplyFromConfig)(
|
const replyResult = await (params.replyResolver ?? getReplyFromConfig)(
|
||||||
params.ctx,
|
ctx,
|
||||||
{
|
{
|
||||||
...params.replyOptions,
|
...params.replyOptions,
|
||||||
onToolResult: (payload: ReplyPayload) => {
|
onToolResult: (payload: ReplyPayload) => {
|
||||||
params.dispatcher.sendToolResult(payload);
|
if (shouldRouteToOriginating) {
|
||||||
|
// Fire-and-forget for streaming tool results when routing.
|
||||||
|
void sendPayloadAsync(payload);
|
||||||
|
} else {
|
||||||
|
// Synchronous dispatch to preserve callback timing.
|
||||||
|
dispatcher.sendToolResult(payload);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
onBlockReply: (payload: ReplyPayload) => {
|
onBlockReply: (payload: ReplyPayload) => {
|
||||||
params.dispatcher.sendBlockReply(payload);
|
if (shouldRouteToOriginating) {
|
||||||
|
// Fire-and-forget for streaming block replies when routing.
|
||||||
|
void sendPayloadAsync(payload);
|
||||||
|
} else {
|
||||||
|
// Synchronous dispatch to preserve callback timing.
|
||||||
|
dispatcher.sendBlockReply(payload);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
params.cfg,
|
cfg,
|
||||||
);
|
);
|
||||||
|
|
||||||
const replies = replyResult
|
const replies = replyResult
|
||||||
@@ -38,9 +90,26 @@ export async function dispatchReplyFromConfig(params: {
|
|||||||
|
|
||||||
let queuedFinal = false;
|
let queuedFinal = false;
|
||||||
for (const reply of replies) {
|
for (const reply of replies) {
|
||||||
queuedFinal = params.dispatcher.sendFinalReply(reply) || queuedFinal;
|
if (shouldRouteToOriginating && originatingChannel && originatingTo) {
|
||||||
|
// Route final reply to originating channel.
|
||||||
|
const result = await routeReply({
|
||||||
|
payload: reply,
|
||||||
|
channel: originatingChannel,
|
||||||
|
to: originatingTo,
|
||||||
|
cfg,
|
||||||
|
});
|
||||||
|
if (!result.ok) {
|
||||||
|
logVerbose(
|
||||||
|
`dispatch-from-config: route-reply (final) failed: ${result.error ?? "unknown error"}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// Mark as queued since we handled it ourselves.
|
||||||
|
queuedFinal = true;
|
||||||
|
} else {
|
||||||
|
queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
await params.dispatcher.waitForIdle();
|
await dispatcher.waitForIdle();
|
||||||
|
|
||||||
return { queuedFinal, counts: params.dispatcher.getQueuedCounts() };
|
return { queuedFinal, counts: dispatcher.getQueuedCounts() };
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -366,6 +366,7 @@ export async function monitorIMessageProvider(
|
|||||||
SenderName: sender,
|
SenderName: sender,
|
||||||
SenderId: sender,
|
SenderId: sender,
|
||||||
Provider: "imessage",
|
Provider: "imessage",
|
||||||
|
Surface: "imessage",
|
||||||
MessageSid: message.id ? String(message.id) : undefined,
|
MessageSid: message.id ? String(message.id) : undefined,
|
||||||
Timestamp: createdAt,
|
Timestamp: createdAt,
|
||||||
MediaPath: mediaPath,
|
MediaPath: mediaPath,
|
||||||
|
|||||||
@@ -463,6 +463,7 @@ export async function monitorSignalProvider(
|
|||||||
SenderName: envelope.sourceName ?? sender,
|
SenderName: envelope.sourceName ?? sender,
|
||||||
SenderId: sender,
|
SenderId: sender,
|
||||||
Provider: "signal" as const,
|
Provider: "signal" as const,
|
||||||
|
Surface: "signal" as const,
|
||||||
MessageSid: envelope.timestamp ? String(envelope.timestamp) : undefined,
|
MessageSid: envelope.timestamp ? String(envelope.timestamp) : undefined,
|
||||||
Timestamp: envelope.timestamp ?? undefined,
|
Timestamp: envelope.timestamp ?? undefined,
|
||||||
MediaPath: mediaPath,
|
MediaPath: mediaPath,
|
||||||
|
|||||||
@@ -823,6 +823,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
|||||||
SenderName: senderName,
|
SenderName: senderName,
|
||||||
SenderId: message.user,
|
SenderId: message.user,
|
||||||
Provider: "slack" as const,
|
Provider: "slack" as const,
|
||||||
|
Surface: "slack" as const,
|
||||||
MessageSid: message.ts,
|
MessageSid: message.ts,
|
||||||
ReplyToId: message.thread_ts ?? message.ts,
|
ReplyToId: message.thread_ts ?? message.ts,
|
||||||
Timestamp: message.ts ? Math.round(Number(message.ts) * 1000) : undefined,
|
Timestamp: message.ts ? Math.round(Number(message.ts) * 1000) : undefined,
|
||||||
|
|||||||
Reference in New Issue
Block a user