From 9d50ebad7d331b93a01b393c0c3e86c3958bcbdb Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Tue, 6 Jan 2026 10:58:45 -0800 Subject: [PATCH] feat(routing): route replies to originating channel Implement reply routing based on OriginatingChannel/OriginatingTo fields. This ensures replies go back to the provider where the message originated instead of using the session's lastChannel. Changes: - Add OriginatingChannel/OriginatingTo fields to MsgContext (templating.ts) - Add originatingChannel/originatingTo fields to FollowupRun (queue.ts) - Create route-reply.ts with provider-agnostic router - Update all providers (Telegram, Slack, Discord, Signal, iMessage) to pass originating channel info - Update reply.ts to pass originating channel to followupRun - Update followup-runner.ts to use route-reply for originating channels This addresses the issue where messages from one provider (e.g., Slack) would receive replies on a different provider (e.g., Telegram) because the queue used the last active dispatcher instead of the originating one. --- src/auto-reply/reply.ts | 3 + src/auto-reply/reply/followup-runner.ts | 42 +++++++- src/auto-reply/reply/queue.ts | 12 +++ src/auto-reply/reply/route-reply.ts | 135 ++++++++++++++++++++++++ src/auto-reply/templating.ts | 21 ++++ src/discord/monitor.ts | 8 +- src/imessage/monitor.ts | 6 +- src/signal/monitor.ts | 6 +- src/slack/monitor.ts | 13 ++- src/telegram/bot.ts | 4 + 10 files changed, 238 insertions(+), 12 deletions(-) create mode 100644 src/auto-reply/reply/route-reply.ts diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 34c8b0115..b1ea70ef9 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -714,6 +714,9 @@ export async function getReplyFromConfig( prompt: queuedBody, summaryLine: baseBodyTrimmedRaw, enqueuedAt: Date.now(), + // Originating channel for reply routing. + originatingChannel: ctx.OriginatingChannel, + originatingTo: ctx.OriginatingTo, run: { agentId, agentDir, diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 71e183263..e386e33e0 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -13,6 +13,7 @@ import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import type { FollowupRun } from "./queue.js"; import { extractReplyToTag } from "./reply-tags.js"; +import { isRoutableChannel, routeReply } from "./route-reply.js"; import { incrementCompactionCount } from "./session-updates.js"; import type { TypingController } from "./typing.js"; @@ -37,11 +38,28 @@ export function createFollowupRunner(params: { agentCfgContextTokens, } = params; - const sendFollowupPayloads = async (payloads: ReplyPayload[]) => { - if (!opts?.onBlockReply) { + /** + * Sends followup payloads, routing to the originating channel if set. + * + * When originatingChannel/originatingTo are set on the queued run, + * replies are routed directly to that provider instead of using the + * session's current dispatcher. This ensures replies go back to + * where the message originated. + */ + const sendFollowupPayloads = async ( + payloads: ReplyPayload[], + queued: FollowupRun, + ) => { + // Check if we should route to originating channel. + const { originatingChannel, originatingTo } = queued; + const shouldRouteToOriginating = + isRoutableChannel(originatingChannel) && originatingTo; + + if (!shouldRouteToOriginating && !opts?.onBlockReply) { logVerbose("followup queue: no onBlockReply handler; dropping payloads"); return; } + for (const payload of payloads) { if (!payload?.text && !payload?.mediaUrl && !payload?.mediaUrls?.length) { continue; @@ -54,7 +72,23 @@ export function createFollowupRunner(params: { continue; } await typing.startTypingOnText(payload.text); - await opts.onBlockReply(payload); + + // Route to originating channel if set, otherwise fall back to dispatcher. + if (shouldRouteToOriginating) { + const result = await routeReply({ + payload, + channel: originatingChannel, + to: originatingTo, + cfg: queued.run.config, + }); + if (!result.ok) { + logVerbose( + `followup queue: route-reply failed: ${result.error ?? "unknown error"}`, + ); + } + } else if (opts?.onBlockReply) { + await opts.onBlockReply(payload); + } } }; @@ -210,7 +244,7 @@ export function createFollowupRunner(params: { } } - await sendFollowupPayloads(replyTaggedPayloads); + await sendFollowupPayloads(replyTaggedPayloads, queued); } finally { typing.markRunComplete(); } diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index 2d9093cc5..2645a8704 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -3,6 +3,7 @@ import { parseDurationMs } from "../../cli/parse-duration.js"; import type { ClawdbotConfig } from "../../config/config.js"; import type { SessionEntry } from "../../config/sessions.js"; import { defaultRuntime } from "../../runtime.js"; +import type { OriginatingChannelType } from "../templating.js"; import type { ElevatedLevel, ThinkLevel, VerboseLevel } from "./directives.js"; export type QueueMode = | "steer" @@ -22,6 +23,17 @@ export type FollowupRun = { prompt: string; summaryLine?: string; enqueuedAt: number; + /** + * Originating channel for reply routing. + * When set, replies should be routed back to this provider + * instead of using the session's lastChannel. + */ + originatingChannel?: OriginatingChannelType; + /** + * Originating destination for reply routing. + * The chat/channel/user ID where the reply should be sent. + */ + originatingTo?: string; run: { agentId: string; agentDir: string; diff --git a/src/auto-reply/reply/route-reply.ts b/src/auto-reply/reply/route-reply.ts new file mode 100644 index 000000000..55f251b43 --- /dev/null +++ b/src/auto-reply/reply/route-reply.ts @@ -0,0 +1,135 @@ +/** + * Provider-agnostic reply router. + * + * Routes replies to the originating channel based on OriginatingChannel/OriginatingTo + * instead of using the session's lastChannel. This ensures replies go back to the + * provider where the message originated, even when the main session is shared + * across multiple providers. + */ + +import type { ClawdbotConfig } from "../../config/config.js"; +import { sendMessageDiscord } from "../../discord/send.js"; +import { sendMessageIMessage } from "../../imessage/send.js"; +import { sendMessageSignal } from "../../signal/send.js"; +import { sendMessageSlack } from "../../slack/send.js"; +import { sendMessageTelegram } from "../../telegram/send.js"; +import type { OriginatingChannelType } from "../templating.js"; +import type { ReplyPayload } from "../types.js"; + +export type RouteReplyParams = { + /** The reply payload to send. */ + payload: ReplyPayload; + /** The originating channel type (telegram, slack, etc). */ + channel: OriginatingChannelType; + /** The destination chat/channel/user ID. */ + to: string; + /** Config for provider-specific settings. */ + cfg: ClawdbotConfig; +}; + +export type RouteReplyResult = { + /** Whether the reply was sent successfully. */ + ok: boolean; + /** Optional message ID from the provider. */ + messageId?: string; + /** Error message if the send failed. */ + error?: string; +}; + +/** + * Routes a reply payload to the specified channel. + * + * This function provides a unified interface for sending messages to any + * supported provider. It's used by the followup queue to route replies + * back to the originating channel when OriginatingChannel/OriginatingTo + * are set. + */ +export async function routeReply( + params: RouteReplyParams, +): Promise { + const { payload, channel, to } = params; + const text = payload.text ?? ""; + const mediaUrl = payload.mediaUrl ?? payload.mediaUrls?.[0]; + + // Skip empty replies. + if (!text.trim() && !mediaUrl) { + return { ok: true }; + } + + try { + switch (channel) { + case "telegram": { + const result = await sendMessageTelegram(to, text, { mediaUrl }); + return { ok: true, messageId: result.messageId }; + } + + case "slack": { + const result = await sendMessageSlack(to, text, { mediaUrl }); + return { ok: true, messageId: result.messageId }; + } + + case "discord": { + const result = await sendMessageDiscord(to, text, { mediaUrl }); + return { ok: true, messageId: result.messageId }; + } + + case "signal": { + const result = await sendMessageSignal(to, text, { mediaUrl }); + return { ok: true, messageId: result.messageId }; + } + + case "imessage": { + const result = await sendMessageIMessage(to, text, { mediaUrl }); + return { ok: true, messageId: result.messageId }; + } + + case "whatsapp": { + // WhatsApp doesn't have a standalone send function in this codebase. + // Falls through to unknown channel handling. + return { + ok: false, + error: `WhatsApp routing not yet implemented`, + }; + } + + case "webchat": { + // Webchat is typically handled differently (real-time WebSocket). + // Falls through to unknown channel handling. + return { + ok: false, + error: `Webchat routing not supported for queued replies`, + }; + } + + default: { + // Exhaustive check for unknown channel types. + const _exhaustive: never = channel; + return { + ok: false, + error: `Unknown channel: ${String(_exhaustive)}`, + }; + } + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + return { + ok: false, + error: `Failed to route reply to ${channel}: ${message}`, + }; + } +} + +/** + * Checks if a channel type is routable via routeReply. + * + * Some channels (webchat, whatsapp) require special handling and + * cannot be routed through this generic interface. + */ +export function isRoutableChannel( + channel: OriginatingChannelType | undefined, +): channel is "telegram" | "slack" | "discord" | "signal" | "imessage" { + if (!channel) return false; + return ["telegram", "slack", "discord", "signal", "imessage"].includes( + channel, + ); +} diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index 1ba411358..cccdc3663 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -1,3 +1,13 @@ +/** Valid provider channels for message routing. */ +export type OriginatingChannelType = + | "telegram" + | "slack" + | "discord" + | "signal" + | "imessage" + | "whatsapp" + | "webchat"; + export type MsgContext = { Body?: string; From?: string; @@ -38,6 +48,17 @@ export type MsgContext = { MessageThreadId?: number; /** Telegram forum supergroup marker. */ IsForum?: boolean; + /** + * Originating channel for reply routing. + * When set, replies should be routed back to this provider + * instead of using lastChannel from the session. + */ + OriginatingChannel?: OriginatingChannelType; + /** + * Originating destination for reply routing. + * The chat/channel/user ID where the reply should be sent. + */ + OriginatingTo?: string; }; export type TemplateContext = MsgContext & { diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 3fb3285ea..cb1bccc12 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -741,14 +741,13 @@ export function createDiscordMessageHandler(params: { combinedBody = `[Replied message - for context]\n${replyContext}\n\n${combinedBody}`; } + const discordTo = `channel:${message.channelId}`; const ctxPayload = { Body: combinedBody, From: isDirectMessage ? `discord:${author.id}` : `group:${message.channelId}`, - To: isDirectMessage - ? `user:${author.id}` - : `channel:${message.channelId}`, + To: discordTo, SessionKey: route.sessionKey, AccountId: route.accountId, ChatType: isDirectMessage ? "direct" : "group", @@ -772,6 +771,9 @@ export function createDiscordMessageHandler(params: { MediaUrl: media?.path, CommandAuthorized: commandAuthorized, CommandSource: "text" as const, + // Originating channel for reply routing. + OriginatingChannel: "discord" as const, + OriginatingTo: discordTo, }; const replyTarget = ctxPayload.To ?? undefined; if (!replyTarget) { diff --git a/src/imessage/monitor.ts b/src/imessage/monitor.ts index 7db1ac381..3c01425d1 100644 --- a/src/imessage/monitor.ts +++ b/src/imessage/monitor.ts @@ -351,10 +351,11 @@ export async function monitorIMessageProvider( : normalizeIMessageHandle(sender), }, }); + const imessageTo = chatTarget || `imessage:${sender}`; const ctxPayload = { Body: body, From: isGroup ? `group:${chatId}` : `imessage:${sender}`, - To: chatTarget || `imessage:${sender}`, + To: imessageTo, SessionKey: route.sessionKey, AccountId: route.accountId, ChatType: isGroup ? "group" : "direct", @@ -372,6 +373,9 @@ export async function monitorIMessageProvider( MediaUrl: mediaPath, WasMentioned: mentioned, CommandAuthorized: commandAuthorized, + // Originating channel for reply routing. + OriginatingChannel: "imessage" as const, + OriginatingTo: imessageTo, }; if (!isGroup) { diff --git a/src/signal/monitor.ts b/src/signal/monitor.ts index 7c1873a2c..24cb995d2 100644 --- a/src/signal/monitor.ts +++ b/src/signal/monitor.ts @@ -451,10 +451,11 @@ export async function monitorSignalProvider( id: isGroup ? (groupId ?? "unknown") : normalizeE164(sender), }, }); + const signalTo = isGroup ? `group:${groupId}` : `signal:${sender}`; const ctxPayload = { Body: body, From: isGroup ? `group:${groupId ?? "unknown"}` : `signal:${sender}`, - To: isGroup ? `group:${groupId ?? "unknown"}` : `signal:${sender}`, + To: signalTo, SessionKey: route.sessionKey, AccountId: route.accountId, ChatType: isGroup ? "group" : "direct", @@ -468,6 +469,9 @@ export async function monitorSignalProvider( MediaType: mediaType, MediaUrl: mediaPath, CommandAuthorized: commandAuthorized, + // Originating channel for reply routing. + OriginatingChannel: "signal" as const, + OriginatingTo: signalTo, }; if (!isGroup) { diff --git a/src/slack/monitor.ts b/src/slack/monitor.ts index 33581db13..5acbed0d0 100644 --- a/src/slack/monitor.ts +++ b/src/slack/monitor.ts @@ -809,12 +809,13 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { }); const isRoomish = isRoom || isGroupDm; + const slackTo = isDirectMessage + ? `user:${message.user}` + : `channel:${message.channel}`; const ctxPayload = { Body: body, From: slackFrom, - To: isDirectMessage - ? `user:${message.user}` - : `channel:${message.channel}`, + To: slackTo, SessionKey: route.sessionKey, AccountId: route.accountId, ChatType: isDirectMessage ? "direct" : isRoom ? "room" : "group", @@ -830,6 +831,9 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { MediaType: media?.contentType, MediaUrl: media?.path, CommandAuthorized: commandAuthorized, + // Originating channel for reply routing. + OriginatingChannel: "slack" as const, + OriginatingTo: slackTo, }; const replyTarget = ctxPayload.To ?? undefined; @@ -1570,6 +1574,9 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { AccountId: route.accountId, CommandSource: "native" as const, CommandAuthorized: commandAuthorized, + // Originating channel for reply routing. + OriginatingChannel: "slack" as const, + OriginatingTo: `user:${command.user_id}`, }; const replyResult = await getReplyFromConfig(ctxPayload, undefined, cfg); diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index e6f49faaf..f181021a0 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -414,6 +414,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { SenderId: senderId || undefined, SenderUsername: senderUsername || undefined, Provider: "telegram", + Surface: "telegram", MessageSid: String(msg.message_id), ReplyToId: replyTarget?.id, ReplyToBody: replyTarget?.body, @@ -433,6 +434,9 @@ export function createTelegramBot(opts: TelegramBotOptions) { CommandAuthorized: commandAuthorized, MessageThreadId: messageThreadId, IsForum: isForum, + // Originating channel for reply routing. + OriginatingChannel: "telegram" as const, + OriginatingTo: `telegram:${chatId}`, }; if (replyTarget && shouldLogVerbose()) {