diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f38e9d34..37931f6e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ ### Fixes - Sub-agents: normalize announce delivery origin + queue bucketing by accountId to keep multi-account routing stable. (#1061, #1058) — thanks @adam91holt. - Gateway: honor explicit delivery targets without implicit accountId fallback; preserve lastAccountId for implicit routing. +- Gateway: avoid reusing last-to/accountId when the requested channel differs; sync deliveryContext with last route fields. - Repo: fix oxlint config filename and move ignore pattern into config. (#1064) — thanks @connorshea. - Messages: `/stop` now hard-aborts queued followups and sub-agent runs; suppress zero-count stop notes. - Messages: include sender labels for live group messages across channels, matching queued/history formatting. (#1059) diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index e86c27600..3e6ff9f16 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -29,6 +29,7 @@ import { normalizeChatType } from "../../channels/chat-type.js"; import { stripMentions, stripStructuralPrefixes } from "./mentions.js"; import { formatInboundBodyWithSenderMeta } from "./inbound-sender-meta.js"; import { normalizeInboundTextNewlines } from "./inbound-text.js"; +import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js"; export type SessionInitResult = { sessionCtx: TemplateContext; @@ -199,10 +200,20 @@ export async function initSessionState(params: { const baseEntry = !isNewSession && freshEntry ? entry : undefined; // Track the originating channel/to for announce routing (subagent announce-back). - const lastChannel = + const lastChannelRaw = (ctx.OriginatingChannel as string | undefined)?.trim() || baseEntry?.lastChannel; - const lastTo = ctx.OriginatingTo?.trim() || ctx.To?.trim() || baseEntry?.lastTo; - const lastAccountId = ctx.AccountId?.trim() || baseEntry?.lastAccountId; + const lastToRaw = ctx.OriginatingTo?.trim() || ctx.To?.trim() || baseEntry?.lastTo; + const lastAccountIdRaw = ctx.AccountId?.trim() || baseEntry?.lastAccountId; + const deliveryFields = normalizeSessionDeliveryFields({ + deliveryContext: { + channel: lastChannelRaw, + to: lastToRaw, + accountId: lastAccountIdRaw, + }, + }); + const lastChannel = deliveryFields.lastChannel ?? lastChannelRaw; + const lastTo = deliveryFields.lastTo ?? lastToRaw; + const lastAccountId = deliveryFields.lastAccountId ?? lastAccountIdRaw; sessionEntry = { ...baseEntry, sessionId, @@ -227,6 +238,7 @@ export async function initSessionState(params: { subject: baseEntry?.subject, room: baseEntry?.room, space: baseEntry?.space, + deliveryContext: deliveryFields.deliveryContext, // Track originating channel for subagent announce routing. lastChannel, lastTo, diff --git a/src/commands/agent.delivery.test.ts b/src/commands/agent.delivery.test.ts index c29af2c14..8eabd336a 100644 --- a/src/commands/agent.delivery.test.ts +++ b/src/commands/agent.delivery.test.ts @@ -76,6 +76,7 @@ describe("deliverAgentCommandResult", () => { } as unknown as RuntimeEnv; const sessionEntry = { lastAccountId: "legacy", + lastChannel: "whatsapp", } as SessionEntry; const result = { payloads: [{ text: "hi" }], @@ -141,4 +142,40 @@ describe("deliverAgentCommandResult", () => { expect.objectContaining({ accountId: undefined }), ); }); + + it("skips session accountId when channel differs", async () => { + const cfg = {} as ClawdbotConfig; + const deps = {} as CliDeps; + const runtime = { + log: vi.fn(), + error: vi.fn(), + } as unknown as RuntimeEnv; + const sessionEntry = { + lastAccountId: "legacy", + lastChannel: "telegram", + } as SessionEntry; + const result = { + payloads: [{ text: "hi" }], + meta: {}, + }; + + const { deliverAgentCommandResult } = await import("./agent/delivery.js"); + await deliverAgentCommandResult({ + cfg, + deps, + runtime, + opts: { + message: "hello", + deliver: true, + channel: "whatsapp", + }, + sessionEntry, + result, + payloads: result.payloads, + }); + + expect(mocks.resolveOutboundTarget).toHaveBeenCalledWith( + expect.objectContaining({ accountId: undefined, channel: "whatsapp" }), + ); + }); }); diff --git a/src/commands/agent/delivery.ts b/src/commands/agent/delivery.ts index f79c57620..5fb97c3a2 100644 --- a/src/commands/agent/delivery.ts +++ b/src/commands/agent/delivery.ts @@ -30,12 +30,14 @@ function resolveDeliveryAccountId(params: { opts: AgentCommandOpts; sessionEntry?: SessionEntry; targetMode: ChannelOutboundTargetMode; + deliveryChannel?: string; }) { const sessionOrigin = deliveryContextFromSession(params.sessionEntry); - return ( - normalizeAccountId(params.opts.accountId) ?? - (params.targetMode === "implicit" ? normalizeAccountId(sessionOrigin?.accountId) : undefined) - ); + const explicit = normalizeAccountId(params.opts.accountId); + if (explicit || params.targetMode !== "implicit") return explicit; + if (!params.deliveryChannel || isInternalMessageChannel(params.deliveryChannel)) return undefined; + if (sessionOrigin?.channel !== params.deliveryChannel) return undefined; + return normalizeAccountId(sessionOrigin?.accountId); } export async function deliverAgentCommandResult(params: { @@ -61,7 +63,12 @@ export async function deliverAgentCommandResult(params: { const targetMode: ChannelOutboundTargetMode = opts.deliveryTargetMode ?? (opts.to ? "explicit" : "implicit"); - const resolvedAccountId = resolveDeliveryAccountId({ opts, sessionEntry, targetMode }); + const resolvedAccountId = resolveDeliveryAccountId({ + opts, + sessionEntry, + targetMode, + deliveryChannel, + }); const resolvedTarget = deliver && isDeliveryChannelKnown && deliveryChannel ? resolveOutboundTarget({ diff --git a/src/config/sessions.test.ts b/src/config/sessions.test.ts index 8c5dff606..f33bae2b2 100644 --- a/src/config/sessions.test.ts +++ b/src/config/sessions.test.ts @@ -130,6 +130,10 @@ describe("sessions", () => { expect(store[mainSessionKey]?.updatedAt).toBeGreaterThanOrEqual(123); expect(store[mainSessionKey]?.lastChannel).toBe("telegram"); expect(store[mainSessionKey]?.lastTo).toBe("12345"); + expect(store[mainSessionKey]?.deliveryContext).toEqual({ + channel: "telegram", + to: "12345", + }); expect(store[mainSessionKey]?.responseUsage).toBe("on"); expect(store[mainSessionKey]?.queueDebounceMs).toBe(1234); expect(store[mainSessionKey]?.reasoningLevel).toBe("on"); @@ -176,6 +180,11 @@ describe("sessions", () => { expect(store["agent:main:main"]?.lastChannel).toBe("whatsapp"); expect(store["agent:main:main"]?.lastTo).toBe("+1555"); expect(store["agent:main:main"]?.lastAccountId).toBe("acct-1"); + expect(store["agent:main:main"]?.deliveryContext).toEqual({ + channel: "whatsapp", + to: "+1555", + accountId: "acct-1", + }); }); it("updateSessionStore keeps deletions when concurrent writes happen", async () => { diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 8378df605..b2639addf 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -4,8 +4,7 @@ import path from "node:path"; import JSON5 from "json5"; import { getFileMtimeMs, isCacheEnabled, resolveCacheTtlMs } from "../cache-utils.js"; -import { normalizeAccountId } from "../../utils/account-id.js"; -import { normalizeMessageChannel } from "../../utils/message-channel.js"; +import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js"; import { mergeSessionEntry, type SessionEntry } from "./types.js"; // ============================================================================ @@ -44,21 +43,23 @@ function invalidateSessionStoreCache(storePath: string): void { } function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry { - const normalizedLastChannel = normalizeMessageChannel(entry.lastChannel) ?? undefined; - const normalizedLastTo = typeof entry.lastTo === "string" ? entry.lastTo.trim() : undefined; - const normalizedLastAccountId = normalizeAccountId(entry.lastAccountId); - if ( - normalizedLastChannel === entry.lastChannel && - normalizedLastTo === entry.lastTo && - normalizedLastAccountId === entry.lastAccountId - ) { - return entry; - } + const normalized = normalizeSessionDeliveryFields(entry); + const nextDelivery = normalized.deliveryContext; + const sameDelivery = + (entry.deliveryContext?.channel ?? undefined) === nextDelivery?.channel && + (entry.deliveryContext?.to ?? undefined) === nextDelivery?.to && + (entry.deliveryContext?.accountId ?? undefined) === nextDelivery?.accountId; + const sameLast = + entry.lastChannel === normalized.lastChannel && + entry.lastTo === normalized.lastTo && + entry.lastAccountId === normalized.lastAccountId; + if (sameDelivery && sameLast) return entry; return { ...entry, - lastChannel: normalizedLastChannel, - lastTo: normalizedLastTo || undefined, - lastAccountId: normalizedLastAccountId, + deliveryContext: nextDelivery, + lastChannel: normalized.lastChannel, + lastTo: normalized.lastTo, + lastAccountId: normalized.lastAccountId, }; } @@ -331,11 +332,24 @@ export async function updateLastRoute(params: { const store = loadSessionStore(storePath); const existing = store[sessionKey]; const now = Date.now(); + const trimmedAccountId = accountId?.trim(); + const resolvedAccountId = + trimmedAccountId && trimmedAccountId.length > 0 + ? trimmedAccountId + : existing?.lastAccountId ?? existing?.deliveryContext?.accountId; + const normalized = normalizeSessionDeliveryFields({ + deliveryContext: { + channel: channel ?? existing?.lastChannel, + to, + accountId: resolvedAccountId, + }, + }); const next = mergeSessionEntry(existing, { updatedAt: Math.max(existing?.updatedAt ?? 0, now), - lastChannel: channel, - lastTo: to?.trim() ? to.trim() : undefined, - lastAccountId: accountId?.trim() ? accountId.trim() : existing?.lastAccountId, + deliveryContext: normalized.deliveryContext, + lastChannel: normalized.lastChannel ?? channel, + lastTo: normalized.lastTo ?? (to?.trim() ? to.trim() : undefined), + lastAccountId: normalized.lastAccountId ?? resolvedAccountId, }); store[sessionKey] = next; await saveSessionStoreUnlocked(storePath, store); diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index f30354680..88dcfd944 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -2,6 +2,7 @@ import crypto from "node:crypto"; import type { Skill } from "@mariozechner/pi-coding-agent"; import type { ChannelId } from "../../channels/plugins/types.js"; +import type { DeliveryContext } from "../../utils/delivery-context.js"; export type SessionScope = "per-sender" | "global"; @@ -69,6 +70,7 @@ export type SessionEntry = { subject?: string; room?: string; space?: string; + deliveryContext?: DeliveryContext; lastChannel?: SessionChannelId; lastTo?: string; lastAccountId?: string; diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 8d5544f1a..37373a1af 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -9,10 +9,11 @@ import { updateSessionStore, } from "../../config/sessions.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; -import { resolveOutboundTarget } from "../../infra/outbound/targets.js"; +import { resolveOutboundTarget, resolveSessionDeliveryTarget } from "../../infra/outbound/targets.js"; import { defaultRuntime } from "../../runtime.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { normalizeAccountId } from "../../utils/account-id.js"; +import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js"; import { INTERNAL_MESSAGE_CHANNEL, isDeliverableMessageChannel, @@ -144,6 +145,7 @@ export const agentHandlers: GatewayRequestHandlers = { const sessionId = entry?.sessionId ?? randomUUID(); const labelValue = request.label?.trim() || entry?.label; const spawnedByValue = request.spawnedBy?.trim() || entry?.spawnedBy; + const deliveryFields = normalizeSessionDeliveryFields(entry); const nextEntry: SessionEntry = { sessionId, updatedAt: now, @@ -153,9 +155,10 @@ export const agentHandlers: GatewayRequestHandlers = { systemSent: entry?.systemSent, sendPolicy: entry?.sendPolicy, skillsSnapshot: entry?.skillsSnapshot, - lastChannel: entry?.lastChannel, - lastTo: entry?.lastTo, - lastAccountId: entry?.lastAccountId, + deliveryContext: deliveryFields.deliveryContext, + lastChannel: deliveryFields.lastChannel ?? entry?.lastChannel, + lastTo: deliveryFields.lastTo ?? entry?.lastTo, + lastAccountId: deliveryFields.lastAccountId ?? entry?.lastAccountId, modelOverride: entry?.modelOverride, providerOverride: entry?.providerOverride, label: labelValue, @@ -198,32 +201,32 @@ export const agentHandlers: GatewayRequestHandlers = { const runId = idem; + const wantsDelivery = request.deliver === true; const requestedChannel = normalizeMessageChannel(request.channel) ?? "last"; - - const lastChannel = sessionEntry?.lastChannel; - const lastTo = typeof sessionEntry?.lastTo === "string" ? sessionEntry.lastTo.trim() : ""; const explicitTo = typeof request.to === "string" && request.to.trim() ? request.to.trim() : undefined; - const resolvedAccountId = - normalizeAccountId(request.accountId) ?? - (explicitTo ? undefined : normalizeAccountId(sessionEntry?.lastAccountId)); - const wantsDelivery = request.deliver === true; + const baseDelivery = resolveSessionDeliveryTarget({ + entry: sessionEntry, + requestedChannel: requestedChannel === INTERNAL_MESSAGE_CHANNEL ? "last" : requestedChannel, + explicitTo, + }); const resolvedChannel = (() => { + if (requestedChannel === INTERNAL_MESSAGE_CHANNEL) return INTERNAL_MESSAGE_CHANNEL; if (requestedChannel === "last") { // WebChat is not a deliverable surface. Treat it as "unset" for routing, // so VoiceWake and CLI callers don't get stuck with deliver=false. - if (lastChannel && lastChannel !== INTERNAL_MESSAGE_CHANNEL) { - return lastChannel; + if (baseDelivery.channel && baseDelivery.channel !== INTERNAL_MESSAGE_CHANNEL) { + return baseDelivery.channel; } return wantsDelivery ? DEFAULT_CHAT_CHANNEL : INTERNAL_MESSAGE_CHANNEL; } if (isGatewayMessageChannel(requestedChannel)) return requestedChannel; - if (lastChannel && lastChannel !== INTERNAL_MESSAGE_CHANNEL) { - return lastChannel; + if (baseDelivery.channel && baseDelivery.channel !== INTERNAL_MESSAGE_CHANNEL) { + return baseDelivery.channel; } return wantsDelivery ? DEFAULT_CHAT_CHANNEL : INTERNAL_MESSAGE_CHANNEL; })(); @@ -233,9 +236,19 @@ export const agentHandlers: GatewayRequestHandlers = { : isDeliverableMessageChannel(resolvedChannel) ? "implicit" : undefined; - let resolvedTo = - explicitTo || - (isDeliverableMessageChannel(resolvedChannel) ? lastTo || undefined : undefined); + const resolvedAccountId = + normalizeAccountId(request.accountId) ?? + (deliveryTargetMode === "implicit" && resolvedChannel === baseDelivery.lastChannel + ? baseDelivery.lastAccountId + : undefined); + let resolvedTo = explicitTo; + if ( + !resolvedTo && + isDeliverableMessageChannel(resolvedChannel) && + resolvedChannel === baseDelivery.lastChannel + ) { + resolvedTo = baseDelivery.lastTo; + } if (!resolvedTo && isDeliverableMessageChannel(resolvedChannel)) { const cfg = cfgForAgent ?? loadConfig(); const fallback = resolveOutboundTarget({ diff --git a/src/utils/delivery-context.test.ts b/src/utils/delivery-context.test.ts index 6a91c7b14..fee2628f8 100644 --- a/src/utils/delivery-context.test.ts +++ b/src/utils/delivery-context.test.ts @@ -5,6 +5,7 @@ import { deliveryContextFromSession, mergeDeliveryContext, normalizeDeliveryContext, + normalizeSessionDeliveryFields, } from "./delivery-context.js"; describe("delivery context helpers", () => { @@ -70,4 +71,21 @@ describe("delivery context helpers", () => { accountId: undefined, }); }); + + it("normalizes delivery fields and mirrors them on session entries", () => { + const normalized = normalizeSessionDeliveryFields({ + deliveryContext: { channel: " Slack ", to: " channel:1 ", accountId: " acct-2 " }, + lastChannel: " whatsapp ", + lastTo: " +1555 ", + }); + + expect(normalized.deliveryContext).toEqual({ + channel: "whatsapp", + to: "+1555", + accountId: "acct-2", + }); + expect(normalized.lastChannel).toBe("whatsapp"); + expect(normalized.lastTo).toBe("+1555"); + expect(normalized.lastAccountId).toBe("acct-2"); + }); }); diff --git a/src/utils/delivery-context.ts b/src/utils/delivery-context.ts index e60084ca6..e24e722f4 100644 --- a/src/utils/delivery-context.ts +++ b/src/utils/delivery-context.ts @@ -1,4 +1,5 @@ import { normalizeAccountId } from "./account-id.js"; +import { normalizeMessageChannel } from "./message-channel.js"; export type DeliveryContext = { channel?: string; @@ -6,16 +7,20 @@ export type DeliveryContext = { accountId?: string; }; -type DeliveryContextSessionSource = { +export type DeliveryContextSessionSource = { channel?: string; lastChannel?: string; lastTo?: string; lastAccountId?: string; + deliveryContext?: DeliveryContext; }; export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined { if (!context) return undefined; - const channel = typeof context.channel === "string" ? context.channel.trim() : undefined; + const channel = + typeof context.channel === "string" + ? normalizeMessageChannel(context.channel) ?? context.channel.trim() + : undefined; const to = typeof context.to === "string" ? context.to.trim() : undefined; const accountId = normalizeAccountId(context.accountId); if (!channel && !to && !accountId) return undefined; @@ -26,15 +31,54 @@ export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryCon }; } +export function normalizeSessionDeliveryFields( + source?: DeliveryContextSessionSource, +): { + deliveryContext?: DeliveryContext; + lastChannel?: string; + lastTo?: string; + lastAccountId?: string; +} { + if (!source) { + return { + deliveryContext: undefined, + lastChannel: undefined, + lastTo: undefined, + lastAccountId: undefined, + }; + } + + const merged = mergeDeliveryContext( + normalizeDeliveryContext({ + channel: source.lastChannel ?? source.channel, + to: source.lastTo, + accountId: source.lastAccountId, + }), + normalizeDeliveryContext(source.deliveryContext), + ); + + if (!merged) { + return { + deliveryContext: undefined, + lastChannel: undefined, + lastTo: undefined, + lastAccountId: undefined, + }; + } + + return { + deliveryContext: merged, + lastChannel: merged.channel, + lastTo: merged.to, + lastAccountId: merged.accountId, + }; +} + export function deliveryContextFromSession( entry?: DeliveryContextSessionSource, ): DeliveryContext | undefined { if (!entry) return undefined; - return normalizeDeliveryContext({ - channel: entry.lastChannel ?? entry.channel, - to: entry.lastTo, - accountId: entry.lastAccountId, - }); + return normalizeSessionDeliveryFields(entry).deliveryContext; } export function mergeDeliveryContext(