From ccea3a0615e5171c09d05199faeb7b1bec4bb536 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 17 Jan 2026 05:28:55 +0000 Subject: [PATCH] refactor: unify delivery target resolution Co-authored-by: adam91holt --- src/cron/isolated-agent/delivery-target.ts | 65 ++++---- ...tbeat-runner.returns-default-unset.test.ts | 27 ++++ src/infra/heartbeat-runner.ts | 13 +- src/infra/outbound/targets.test.ts | 95 ++++++++++- src/infra/outbound/targets.ts | 147 ++++++++++++++---- 5 files changed, 280 insertions(+), 67 deletions(-) diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index 45c04f0f1..55759ba95 100644 --- a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -1,4 +1,3 @@ -import { normalizeChannelId } from "../../channels/plugins/index.js"; import type { ChannelId } from "../../channels/plugins/types.js"; import { DEFAULT_CHAT_CHANNEL } from "../../channels/registry.js"; import type { ClawdbotConfig } from "../../config/config.js"; @@ -9,8 +8,7 @@ import { } from "../../config/sessions.js"; import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js"; import type { OutboundChannel } from "../../infra/outbound/targets.js"; -import { resolveOutboundTarget } from "../../infra/outbound/targets.js"; -import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js"; +import { resolveOutboundTarget, resolveSessionDeliveryTarget } from "../../infra/outbound/targets.js"; export async function resolveDeliveryTarget( cfg: ClawdbotConfig, @@ -26,56 +24,63 @@ export async function resolveDeliveryTarget( mode: "explicit" | "implicit"; error?: Error; }> { - const requestedRaw = typeof jobPayload.channel === "string" ? jobPayload.channel : "last"; - const requestedChannelHint = normalizeMessageChannel(requestedRaw) ?? requestedRaw; - const explicitTo = - typeof jobPayload.to === "string" && jobPayload.to.trim() ? jobPayload.to.trim() : undefined; + const requestedChannel = typeof jobPayload.channel === "string" ? jobPayload.channel : "last"; + const explicitTo = typeof jobPayload.to === "string" ? jobPayload.to : undefined; const sessionCfg = cfg.session; const mainSessionKey = resolveAgentMainSessionKey({ cfg, agentId }); const storePath = resolveStorePath(sessionCfg?.store, { agentId }); const store = loadSessionStore(storePath); const main = store[mainSessionKey]; - const lastChannel = - main?.lastChannel && main.lastChannel !== INTERNAL_MESSAGE_CHANNEL - ? normalizeChannelId(main.lastChannel) - : undefined; - const lastTo = typeof main?.lastTo === "string" ? main.lastTo.trim() : ""; - const lastAccountId = main?.lastAccountId; - let channel: Exclude | undefined = - requestedChannelHint === "last" - ? (lastChannel ?? undefined) - : requestedChannelHint === INTERNAL_MESSAGE_CHANNEL - ? undefined - : (normalizeChannelId(requestedChannelHint) ?? undefined); - if (!channel) { + const preliminary = resolveSessionDeliveryTarget({ + entry: main, + requestedChannel, + explicitTo, + allowMismatchedLastTo: true, + }); + + let fallbackChannel: Exclude | undefined; + if (!preliminary.channel) { try { const selection = await resolveMessageChannelSelection({ cfg }); - channel = selection.channel; + fallbackChannel = selection.channel; } catch { - channel = lastChannel ?? DEFAULT_CHAT_CHANNEL; + fallbackChannel = preliminary.lastChannel ?? DEFAULT_CHAT_CHANNEL; } } - const toCandidate = explicitTo ?? (lastTo || undefined); - const mode: "explicit" | "implicit" = explicitTo ? "explicit" : "implicit"; + const resolved = fallbackChannel + ? resolveSessionDeliveryTarget({ + entry: main, + requestedChannel, + explicitTo, + fallbackChannel, + allowMismatchedLastTo: true, + mode: preliminary.mode, + }) + : preliminary; + + const channel = resolved.channel ?? fallbackChannel ?? DEFAULT_CHAT_CHANNEL; + const mode = resolved.mode as "explicit" | "implicit"; + const toCandidate = resolved.to; + if (!toCandidate) { - return { channel, to: undefined, accountId: lastAccountId, mode }; + return { channel, to: undefined, accountId: resolved.accountId, mode }; } - const resolved = resolveOutboundTarget({ + const docked = resolveOutboundTarget({ channel, to: toCandidate, cfg, - accountId: channel === lastChannel ? lastAccountId : undefined, + accountId: resolved.accountId, mode, }); return { channel, - to: resolved.ok ? resolved.to : undefined, - accountId: channel === lastChannel ? lastAccountId : undefined, + to: docked.ok ? docked.to : undefined, + accountId: resolved.accountId, mode, - error: resolved.ok ? undefined : resolved.error, + error: docked.ok ? undefined : docked.error, }; } diff --git a/src/infra/heartbeat-runner.returns-default-unset.test.ts b/src/infra/heartbeat-runner.returns-default-unset.test.ts index a9902f127..dde804234 100644 --- a/src/infra/heartbeat-runner.returns-default-unset.test.ts +++ b/src/infra/heartbeat-runner.returns-default-unset.test.ts @@ -119,6 +119,9 @@ describe("resolveHeartbeatDeliveryTarget", () => { expect(resolveHeartbeatDeliveryTarget({ cfg, entry: baseEntry })).toEqual({ channel: "none", reason: "target-none", + accountId: undefined, + lastChannel: undefined, + lastAccountId: undefined, }); }); @@ -132,6 +135,9 @@ describe("resolveHeartbeatDeliveryTarget", () => { expect(resolveHeartbeatDeliveryTarget({ cfg, entry })).toEqual({ channel: "whatsapp", to: "+1555", + accountId: undefined, + lastChannel: "whatsapp", + lastAccountId: undefined, }); }); @@ -147,6 +153,9 @@ describe("resolveHeartbeatDeliveryTarget", () => { expect(resolveHeartbeatDeliveryTarget({ cfg, entry: baseEntry })).toEqual({ channel: "whatsapp", to: "+555123", + accountId: undefined, + lastChannel: undefined, + lastAccountId: undefined, }); }); @@ -160,6 +169,9 @@ describe("resolveHeartbeatDeliveryTarget", () => { expect(resolveHeartbeatDeliveryTarget({ cfg, entry })).toEqual({ channel: "none", reason: "no-target", + accountId: undefined, + lastChannel: undefined, + lastAccountId: undefined, }); }); @@ -177,6 +189,9 @@ describe("resolveHeartbeatDeliveryTarget", () => { channel: "whatsapp", to: "+1555", reason: "allowFrom-fallback", + accountId: undefined, + lastChannel: "whatsapp", + lastAccountId: undefined, }); }); @@ -192,6 +207,9 @@ describe("resolveHeartbeatDeliveryTarget", () => { expect(resolveHeartbeatDeliveryTarget({ cfg, entry })).toEqual({ channel: "whatsapp", to: "120363401234567890@g.us", + accountId: undefined, + lastChannel: "whatsapp", + lastAccountId: undefined, }); }); @@ -207,6 +225,9 @@ describe("resolveHeartbeatDeliveryTarget", () => { expect(resolveHeartbeatDeliveryTarget({ cfg, entry })).toEqual({ channel: "whatsapp", to: "120363401234567890@g.us", + accountId: undefined, + lastChannel: "whatsapp", + lastAccountId: undefined, }); }); @@ -217,6 +238,9 @@ describe("resolveHeartbeatDeliveryTarget", () => { expect(resolveHeartbeatDeliveryTarget({ cfg, entry: baseEntry })).toEqual({ channel: "telegram", to: "123", + accountId: undefined, + lastChannel: undefined, + lastAccountId: undefined, }); }); @@ -234,6 +258,9 @@ describe("resolveHeartbeatDeliveryTarget", () => { ).toEqual({ channel: "whatsapp", to: "+1555", + accountId: undefined, + lastChannel: "whatsapp", + lastAccountId: undefined, }); }); }); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 001a0abeb..c5609b3ad 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -8,7 +8,7 @@ import { } from "../auto-reply/heartbeat.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; -import { getChannelPlugin, normalizeChannelId } from "../channels/plugins/index.js"; +import { getChannelPlugin } from "../channels/plugins/index.js"; import type { ChannelHeartbeatDeps } from "../channels/plugins/types.js"; import { parseDurationMs } from "../cli/parse-duration.js"; import type { ClawdbotConfig } from "../config/config.js"; @@ -26,7 +26,6 @@ import { createSubsystemLogger } from "../logging.js"; import { getQueueSize } from "../process/command-queue.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { normalizeAgentId } from "../routing/session-key.js"; -import { INTERNAL_MESSAGE_CHANNEL } from "../utils/message-channel.js"; import { emitHeartbeatEvent } from "./heartbeat-events.js"; import { type HeartbeatRunResult, @@ -337,15 +336,13 @@ export async function runHeartbeatOnce(opts: { const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg, agentId); const previousUpdatedAt = entry?.updatedAt; const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry, heartbeat }); - const lastChannel = - entry?.lastChannel && entry.lastChannel !== INTERNAL_MESSAGE_CHANNEL - ? normalizeChannelId(entry.lastChannel) - : undefined; + const lastChannel = delivery.lastChannel; + const lastAccountId = delivery.lastAccountId; const senderProvider = delivery.channel !== "none" ? delivery.channel : lastChannel; const senderAllowFrom = senderProvider ? (getChannelPlugin(senderProvider)?.config.resolveAllowFrom?.({ cfg, - accountId: senderProvider === lastChannel ? entry?.lastAccountId : undefined, + accountId: senderProvider === lastChannel ? lastAccountId : undefined, }) ?? []) : []; const sender = resolveHeartbeatSender({ @@ -460,7 +457,7 @@ export async function runHeartbeatOnce(opts: { return { status: "ran", durationMs: Date.now() - startedAt }; } - const deliveryAccountId = delivery.channel === lastChannel ? entry?.lastAccountId : undefined; + const deliveryAccountId = delivery.accountId; const heartbeatPlugin = getChannelPlugin(delivery.channel); if (heartbeatPlugin?.heartbeat?.checkReady) { const readiness = await heartbeatPlugin.heartbeat.checkReady({ diff --git a/src/infra/outbound/targets.test.ts b/src/infra/outbound/targets.test.ts index bed3bb9ae..0c1d89796 100644 --- a/src/infra/outbound/targets.test.ts +++ b/src/infra/outbound/targets.test.ts @@ -1,7 +1,7 @@ import { describe, expect, it } from "vitest"; import type { ClawdbotConfig } from "../../config/config.js"; -import { resolveOutboundTarget } from "./targets.js"; +import { resolveOutboundTarget, resolveSessionDeliveryTarget } from "./targets.js"; describe("resolveOutboundTarget", () => { it("falls back to whatsapp allowFrom via config", () => { @@ -93,3 +93,96 @@ describe("resolveOutboundTarget", () => { } }); }); + +describe("resolveSessionDeliveryTarget", () => { + it("derives implicit delivery from the last route", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-1", + updatedAt: 1, + lastChannel: " whatsapp ", + lastTo: " +1555 ", + lastAccountId: " acct-1 ", + }, + requestedChannel: "last", + }); + + expect(resolved).toEqual({ + channel: "whatsapp", + to: "+1555", + accountId: "acct-1", + mode: "implicit", + lastChannel: "whatsapp", + lastTo: "+1555", + lastAccountId: "acct-1", + }); + }); + + it("prefers explicit targets without reusing lastTo", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-2", + updatedAt: 1, + lastChannel: "whatsapp", + lastTo: "+1555", + }, + requestedChannel: "telegram", + }); + + expect(resolved).toEqual({ + channel: "telegram", + to: undefined, + accountId: undefined, + mode: "implicit", + lastChannel: "whatsapp", + lastTo: "+1555", + lastAccountId: undefined, + }); + }); + + it("allows mismatched lastTo when configured", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-3", + updatedAt: 1, + lastChannel: "whatsapp", + lastTo: "+1555", + }, + requestedChannel: "telegram", + allowMismatchedLastTo: true, + }); + + expect(resolved).toEqual({ + channel: "telegram", + to: "+1555", + accountId: undefined, + mode: "implicit", + lastChannel: "whatsapp", + lastTo: "+1555", + lastAccountId: undefined, + }); + }); + + it("falls back to a provided channel when requested is unsupported", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-4", + updatedAt: 1, + lastChannel: "whatsapp", + lastTo: "+1555", + }, + requestedChannel: "webchat", + fallbackChannel: "slack", + }); + + expect(resolved).toEqual({ + channel: "slack", + to: undefined, + accountId: undefined, + mode: "implicit", + lastChannel: "whatsapp", + lastTo: "+1555", + lastAccountId: undefined, + }); + }); +}); diff --git a/src/infra/outbound/targets.ts b/src/infra/outbound/targets.ts index 9df3c731f..8302f4c99 100644 --- a/src/infra/outbound/targets.ts +++ b/src/infra/outbound/targets.ts @@ -3,11 +3,16 @@ import type { ChannelId, ChannelOutboundTargetMode } from "../../channels/plugin import type { ClawdbotConfig } from "../../config/config.js"; import type { SessionEntry } from "../../config/sessions.js"; import type { AgentDefaultsConfig } from "../../config/types.agent-defaults.js"; +import { deliveryContextFromSession } from "../../utils/delivery-context.js"; import type { DeliverableMessageChannel, GatewayMessageChannel, } from "../../utils/message-channel.js"; -import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js"; +import { + INTERNAL_MESSAGE_CHANNEL, + isDeliverableMessageChannel, + normalizeMessageChannel, +} from "../../utils/message-channel.js"; export type OutboundChannel = DeliverableMessageChannel | "none"; @@ -17,10 +22,79 @@ export type OutboundTarget = { channel: OutboundChannel; to?: string; reason?: string; + accountId?: string; + lastChannel?: DeliverableMessageChannel; + lastAccountId?: string; }; export type OutboundTargetResolution = { ok: true; to: string } | { ok: false; error: Error }; +export type SessionDeliveryTarget = { + channel?: DeliverableMessageChannel; + to?: string; + accountId?: string; + mode: ChannelOutboundTargetMode; + lastChannel?: DeliverableMessageChannel; + lastTo?: string; + lastAccountId?: string; +}; + +export function resolveSessionDeliveryTarget(params: { + entry?: SessionEntry; + requestedChannel?: GatewayMessageChannel | "last"; + explicitTo?: string; + fallbackChannel?: DeliverableMessageChannel; + allowMismatchedLastTo?: boolean; + mode?: ChannelOutboundTargetMode; +}): SessionDeliveryTarget { + const context = deliveryContextFromSession(params.entry); + const lastChannel = + context?.channel && isDeliverableMessageChannel(context.channel) ? context.channel : undefined; + const lastTo = context?.to; + const lastAccountId = context?.accountId; + + const rawRequested = params.requestedChannel ?? "last"; + const requested = rawRequested === "last" ? "last" : normalizeMessageChannel(rawRequested); + const requestedChannel = + requested === "last" + ? "last" + : requested && isDeliverableMessageChannel(requested) + ? requested + : undefined; + + const explicitTo = + typeof params.explicitTo === "string" && params.explicitTo.trim() + ? params.explicitTo.trim() + : undefined; + + let channel = requestedChannel === "last" ? lastChannel : requestedChannel; + if (!channel && params.fallbackChannel && isDeliverableMessageChannel(params.fallbackChannel)) { + channel = params.fallbackChannel; + } + + let to = explicitTo; + if (!to && lastTo) { + if (channel && channel === lastChannel) { + to = lastTo; + } else if (params.allowMismatchedLastTo) { + to = lastTo; + } + } + + const accountId = channel && channel === lastChannel ? lastAccountId : undefined; + const mode = params.mode ?? (explicitTo ? "explicit" : "implicit"); + + return { + channel, + to, + accountId, + mode, + lastChannel, + lastTo, + lastAccountId, + }; +} + // Channel docking: prefer plugin.outbound.resolveTarget + allowFrom to normalize destinations. export function resolveOutboundTarget(params: { channel: GatewayMessageChannel; @@ -94,48 +168,58 @@ export function resolveHeartbeatDeliveryTarget(params: { } if (target === "none") { - return { channel: "none", reason: "target-none" }; + const base = resolveSessionDeliveryTarget({ entry }); + return { + channel: "none", + reason: "target-none", + accountId: undefined, + lastChannel: base.lastChannel, + lastAccountId: base.lastAccountId, + }; } - const explicitTo = - typeof heartbeat?.to === "string" && heartbeat.to.trim() ? heartbeat.to.trim() : undefined; + const resolvedTarget = resolveSessionDeliveryTarget({ + entry, + requestedChannel: target === "last" ? "last" : target, + explicitTo: heartbeat?.to, + mode: "heartbeat", + }); - const lastChannel = - entry?.lastChannel && entry.lastChannel !== INTERNAL_MESSAGE_CHANNEL - ? normalizeChannelId(entry.lastChannel) - : undefined; - const lastTo = typeof entry?.lastTo === "string" ? entry.lastTo.trim() : ""; - const channel = target === "last" ? lastChannel : target; - - const to = - explicitTo || - (channel && lastChannel === channel ? lastTo : undefined) || - (target === "last" ? lastTo : undefined); - - if (!channel || !to) { - return { channel: "none", reason: "no-target" }; + if (!resolvedTarget.channel || !resolvedTarget.to) { + return { + channel: "none", + reason: "no-target", + accountId: resolvedTarget.accountId, + lastChannel: resolvedTarget.lastChannel, + lastAccountId: resolvedTarget.lastAccountId, + }; } - const accountId = channel === lastChannel ? entry?.lastAccountId : undefined; const resolved = resolveOutboundTarget({ - channel, - to, + channel: resolvedTarget.channel, + to: resolvedTarget.to, cfg, - accountId, + accountId: resolvedTarget.accountId, mode: "heartbeat", }); if (!resolved.ok) { - return { channel: "none", reason: "no-target" }; + return { + channel: "none", + reason: "no-target", + accountId: resolvedTarget.accountId, + lastChannel: resolvedTarget.lastChannel, + lastAccountId: resolvedTarget.lastAccountId, + }; } let reason: string | undefined; - const plugin = getChannelPlugin(channel as ChannelId); + const plugin = getChannelPlugin(resolvedTarget.channel as ChannelId); if (plugin?.config.resolveAllowFrom) { const explicit = resolveOutboundTarget({ - channel, - to, + channel: resolvedTarget.channel, + to: resolvedTarget.to, cfg, - accountId, + accountId: resolvedTarget.accountId, mode: "explicit", }); if (explicit.ok && explicit.to !== resolved.to) { @@ -143,5 +227,12 @@ export function resolveHeartbeatDeliveryTarget(params: { } } - return reason ? { channel, to: resolved.to, reason } : { channel, to: resolved.to }; + return { + channel: resolvedTarget.channel, + to: resolved.to, + reason, + accountId: resolvedTarget.accountId, + lastChannel: resolvedTarget.lastChannel, + lastAccountId: resolvedTarget.lastAccountId, + }; }