diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 7eba4cf4b..b9ec5a2c6 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -1,8 +1,10 @@ import type { ClawdbotConfig } from "../../config/config.js"; +import { logVerbose } from "../../globals.js"; import { getReplyFromConfig } from "../reply.js"; import type { MsgContext } from "../templating.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; +import { isRoutableChannel, routeReply } from "./route-reply.js"; type DispatchFromConfigResult = { queuedFinal: boolean; @@ -16,18 +18,68 @@ export async function dispatchReplyFromConfig(params: { replyOptions?: Omit; replyResolver?: typeof getReplyFromConfig; }): Promise { + const { ctx, cfg, dispatcher } = params; + + // Check if we should route replies to originating channel instead of dispatcher. + // Only route when the originating channel is DIFFERENT from the current surface. + // This handles cross-provider routing (e.g., message from Telegram being processed + // by a shared session that's currently on Slack) while preserving normal dispatcher + // flow when the provider handles its own messages. + const originatingChannel = ctx.OriginatingChannel; + const originatingTo = ctx.OriginatingTo; + const currentSurface = ctx.Surface?.toLowerCase(); + const shouldRouteToOriginating = + isRoutableChannel(originatingChannel) && + originatingTo && + originatingChannel !== currentSurface; + + /** + * Helper to send a payload via route-reply (async). + * Only used when actually routing to a different provider. + * Note: Only called when shouldRouteToOriginating is true, so + * originatingChannel and originatingTo are guaranteed to be defined. + */ + const sendPayloadAsync = async (payload: ReplyPayload): Promise => { + // TypeScript doesn't narrow these from the shouldRouteToOriginating check, + // but they're guaranteed non-null when this function is called. + if (!originatingChannel || !originatingTo) return; + const result = await routeReply({ + payload, + channel: originatingChannel, + to: originatingTo, + cfg, + }); + if (!result.ok) { + logVerbose( + `dispatch-from-config: route-reply failed: ${result.error ?? "unknown error"}`, + ); + } + }; + const replyResult = await (params.replyResolver ?? getReplyFromConfig)( - params.ctx, + ctx, { ...params.replyOptions, onToolResult: (payload: ReplyPayload) => { - params.dispatcher.sendToolResult(payload); + if (shouldRouteToOriginating) { + // Fire-and-forget for streaming tool results when routing. + void sendPayloadAsync(payload); + } else { + // Synchronous dispatch to preserve callback timing. + dispatcher.sendToolResult(payload); + } }, onBlockReply: (payload: ReplyPayload) => { - params.dispatcher.sendBlockReply(payload); + if (shouldRouteToOriginating) { + // Fire-and-forget for streaming block replies when routing. + void sendPayloadAsync(payload); + } else { + // Synchronous dispatch to preserve callback timing. + dispatcher.sendBlockReply(payload); + } }, }, - params.cfg, + cfg, ); const replies = replyResult @@ -38,9 +90,26 @@ export async function dispatchReplyFromConfig(params: { let queuedFinal = false; for (const reply of replies) { - queuedFinal = params.dispatcher.sendFinalReply(reply) || queuedFinal; + if (shouldRouteToOriginating && originatingChannel && originatingTo) { + // Route final reply to originating channel. + const result = await routeReply({ + payload: reply, + channel: originatingChannel, + to: originatingTo, + cfg, + }); + if (!result.ok) { + logVerbose( + `dispatch-from-config: route-reply (final) failed: ${result.error ?? "unknown error"}`, + ); + } + // Mark as queued since we handled it ourselves. + queuedFinal = true; + } else { + queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal; + } } - await params.dispatcher.waitForIdle(); + await dispatcher.waitForIdle(); - return { queuedFinal, counts: params.dispatcher.getQueuedCounts() }; + return { queuedFinal, counts: dispatcher.getQueuedCounts() }; } diff --git a/src/imessage/monitor.ts b/src/imessage/monitor.ts index 3c01425d1..d913b81ab 100644 --- a/src/imessage/monitor.ts +++ b/src/imessage/monitor.ts @@ -366,6 +366,7 @@ export async function monitorIMessageProvider( SenderName: sender, SenderId: sender, Provider: "imessage", + Surface: "imessage", MessageSid: message.id ? String(message.id) : undefined, Timestamp: createdAt, MediaPath: mediaPath, diff --git a/src/signal/monitor.ts b/src/signal/monitor.ts index 24cb995d2..15bf8ed4b 100644 --- a/src/signal/monitor.ts +++ b/src/signal/monitor.ts @@ -463,6 +463,7 @@ export async function monitorSignalProvider( SenderName: envelope.sourceName ?? sender, SenderId: sender, Provider: "signal" as const, + Surface: "signal" as const, MessageSid: envelope.timestamp ? String(envelope.timestamp) : undefined, Timestamp: envelope.timestamp ?? undefined, MediaPath: mediaPath, diff --git a/src/slack/monitor.ts b/src/slack/monitor.ts index 5acbed0d0..6db05d41e 100644 --- a/src/slack/monitor.ts +++ b/src/slack/monitor.ts @@ -823,6 +823,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { SenderName: senderName, SenderId: message.user, Provider: "slack" as const, + Surface: "slack" as const, MessageSid: message.ts, ReplyToId: message.thread_ts ?? message.ts, Timestamp: message.ts ? Math.round(Number(message.ts) * 1000) : undefined,