fix: sync delivery routing context

Co-authored-by: adam91holt <adam91holt@users.noreply.github.com>
This commit is contained in:
Peter Steinberger
2026-01-17 06:01:30 +00:00
parent e59d8c5436
commit 285ed8bac3
10 changed files with 208 additions and 51 deletions

View File

@@ -36,6 +36,7 @@
### Fixes
- Sub-agents: normalize announce delivery origin + queue bucketing by accountId to keep multi-account routing stable. (#1061, #1058) — thanks @adam91holt.
- Gateway: honor explicit delivery targets without implicit accountId fallback; preserve lastAccountId for implicit routing.
- Gateway: avoid reusing last-to/accountId when the requested channel differs; sync deliveryContext with last route fields.
- Repo: fix oxlint config filename and move ignore pattern into config. (#1064) — thanks @connorshea.
- Messages: `/stop` now hard-aborts queued followups and sub-agent runs; suppress zero-count stop notes.
- Messages: include sender labels for live group messages across channels, matching queued/history formatting. (#1059)

View File

@@ -29,6 +29,7 @@ import { normalizeChatType } from "../../channels/chat-type.js";
import { stripMentions, stripStructuralPrefixes } from "./mentions.js";
import { formatInboundBodyWithSenderMeta } from "./inbound-sender-meta.js";
import { normalizeInboundTextNewlines } from "./inbound-text.js";
import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js";
export type SessionInitResult = {
sessionCtx: TemplateContext;
@@ -199,10 +200,20 @@ export async function initSessionState(params: {
const baseEntry = !isNewSession && freshEntry ? entry : undefined;
// Track the originating channel/to for announce routing (subagent announce-back).
const lastChannel =
const lastChannelRaw =
(ctx.OriginatingChannel as string | undefined)?.trim() || baseEntry?.lastChannel;
const lastTo = ctx.OriginatingTo?.trim() || ctx.To?.trim() || baseEntry?.lastTo;
const lastAccountId = ctx.AccountId?.trim() || baseEntry?.lastAccountId;
const lastToRaw = ctx.OriginatingTo?.trim() || ctx.To?.trim() || baseEntry?.lastTo;
const lastAccountIdRaw = ctx.AccountId?.trim() || baseEntry?.lastAccountId;
const deliveryFields = normalizeSessionDeliveryFields({
deliveryContext: {
channel: lastChannelRaw,
to: lastToRaw,
accountId: lastAccountIdRaw,
},
});
const lastChannel = deliveryFields.lastChannel ?? lastChannelRaw;
const lastTo = deliveryFields.lastTo ?? lastToRaw;
const lastAccountId = deliveryFields.lastAccountId ?? lastAccountIdRaw;
sessionEntry = {
...baseEntry,
sessionId,
@@ -227,6 +238,7 @@ export async function initSessionState(params: {
subject: baseEntry?.subject,
room: baseEntry?.room,
space: baseEntry?.space,
deliveryContext: deliveryFields.deliveryContext,
// Track originating channel for subagent announce routing.
lastChannel,
lastTo,

View File

@@ -76,6 +76,7 @@ describe("deliverAgentCommandResult", () => {
} as unknown as RuntimeEnv;
const sessionEntry = {
lastAccountId: "legacy",
lastChannel: "whatsapp",
} as SessionEntry;
const result = {
payloads: [{ text: "hi" }],
@@ -141,4 +142,40 @@ describe("deliverAgentCommandResult", () => {
expect.objectContaining({ accountId: undefined }),
);
});
it("skips session accountId when channel differs", async () => {
const cfg = {} as ClawdbotConfig;
const deps = {} as CliDeps;
const runtime = {
log: vi.fn(),
error: vi.fn(),
} as unknown as RuntimeEnv;
const sessionEntry = {
lastAccountId: "legacy",
lastChannel: "telegram",
} as SessionEntry;
const result = {
payloads: [{ text: "hi" }],
meta: {},
};
const { deliverAgentCommandResult } = await import("./agent/delivery.js");
await deliverAgentCommandResult({
cfg,
deps,
runtime,
opts: {
message: "hello",
deliver: true,
channel: "whatsapp",
},
sessionEntry,
result,
payloads: result.payloads,
});
expect(mocks.resolveOutboundTarget).toHaveBeenCalledWith(
expect.objectContaining({ accountId: undefined, channel: "whatsapp" }),
);
});
});

View File

@@ -30,12 +30,14 @@ function resolveDeliveryAccountId(params: {
opts: AgentCommandOpts;
sessionEntry?: SessionEntry;
targetMode: ChannelOutboundTargetMode;
deliveryChannel?: string;
}) {
const sessionOrigin = deliveryContextFromSession(params.sessionEntry);
return (
normalizeAccountId(params.opts.accountId) ??
(params.targetMode === "implicit" ? normalizeAccountId(sessionOrigin?.accountId) : undefined)
);
const explicit = normalizeAccountId(params.opts.accountId);
if (explicit || params.targetMode !== "implicit") return explicit;
if (!params.deliveryChannel || isInternalMessageChannel(params.deliveryChannel)) return undefined;
if (sessionOrigin?.channel !== params.deliveryChannel) return undefined;
return normalizeAccountId(sessionOrigin?.accountId);
}
export async function deliverAgentCommandResult(params: {
@@ -61,7 +63,12 @@ export async function deliverAgentCommandResult(params: {
const targetMode: ChannelOutboundTargetMode =
opts.deliveryTargetMode ?? (opts.to ? "explicit" : "implicit");
const resolvedAccountId = resolveDeliveryAccountId({ opts, sessionEntry, targetMode });
const resolvedAccountId = resolveDeliveryAccountId({
opts,
sessionEntry,
targetMode,
deliveryChannel,
});
const resolvedTarget =
deliver && isDeliveryChannelKnown && deliveryChannel
? resolveOutboundTarget({

View File

@@ -130,6 +130,10 @@ describe("sessions", () => {
expect(store[mainSessionKey]?.updatedAt).toBeGreaterThanOrEqual(123);
expect(store[mainSessionKey]?.lastChannel).toBe("telegram");
expect(store[mainSessionKey]?.lastTo).toBe("12345");
expect(store[mainSessionKey]?.deliveryContext).toEqual({
channel: "telegram",
to: "12345",
});
expect(store[mainSessionKey]?.responseUsage).toBe("on");
expect(store[mainSessionKey]?.queueDebounceMs).toBe(1234);
expect(store[mainSessionKey]?.reasoningLevel).toBe("on");
@@ -176,6 +180,11 @@ describe("sessions", () => {
expect(store["agent:main:main"]?.lastChannel).toBe("whatsapp");
expect(store["agent:main:main"]?.lastTo).toBe("+1555");
expect(store["agent:main:main"]?.lastAccountId).toBe("acct-1");
expect(store["agent:main:main"]?.deliveryContext).toEqual({
channel: "whatsapp",
to: "+1555",
accountId: "acct-1",
});
});
it("updateSessionStore keeps deletions when concurrent writes happen", async () => {

View File

@@ -4,8 +4,7 @@ import path from "node:path";
import JSON5 from "json5";
import { getFileMtimeMs, isCacheEnabled, resolveCacheTtlMs } from "../cache-utils.js";
import { normalizeAccountId } from "../../utils/account-id.js";
import { normalizeMessageChannel } from "../../utils/message-channel.js";
import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js";
import { mergeSessionEntry, type SessionEntry } from "./types.js";
// ============================================================================
@@ -44,21 +43,23 @@ function invalidateSessionStoreCache(storePath: string): void {
}
function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry {
const normalizedLastChannel = normalizeMessageChannel(entry.lastChannel) ?? undefined;
const normalizedLastTo = typeof entry.lastTo === "string" ? entry.lastTo.trim() : undefined;
const normalizedLastAccountId = normalizeAccountId(entry.lastAccountId);
if (
normalizedLastChannel === entry.lastChannel &&
normalizedLastTo === entry.lastTo &&
normalizedLastAccountId === entry.lastAccountId
) {
return entry;
}
const normalized = normalizeSessionDeliveryFields(entry);
const nextDelivery = normalized.deliveryContext;
const sameDelivery =
(entry.deliveryContext?.channel ?? undefined) === nextDelivery?.channel &&
(entry.deliveryContext?.to ?? undefined) === nextDelivery?.to &&
(entry.deliveryContext?.accountId ?? undefined) === nextDelivery?.accountId;
const sameLast =
entry.lastChannel === normalized.lastChannel &&
entry.lastTo === normalized.lastTo &&
entry.lastAccountId === normalized.lastAccountId;
if (sameDelivery && sameLast) return entry;
return {
...entry,
lastChannel: normalizedLastChannel,
lastTo: normalizedLastTo || undefined,
lastAccountId: normalizedLastAccountId,
deliveryContext: nextDelivery,
lastChannel: normalized.lastChannel,
lastTo: normalized.lastTo,
lastAccountId: normalized.lastAccountId,
};
}
@@ -331,11 +332,24 @@ export async function updateLastRoute(params: {
const store = loadSessionStore(storePath);
const existing = store[sessionKey];
const now = Date.now();
const trimmedAccountId = accountId?.trim();
const resolvedAccountId =
trimmedAccountId && trimmedAccountId.length > 0
? trimmedAccountId
: existing?.lastAccountId ?? existing?.deliveryContext?.accountId;
const normalized = normalizeSessionDeliveryFields({
deliveryContext: {
channel: channel ?? existing?.lastChannel,
to,
accountId: resolvedAccountId,
},
});
const next = mergeSessionEntry(existing, {
updatedAt: Math.max(existing?.updatedAt ?? 0, now),
lastChannel: channel,
lastTo: to?.trim() ? to.trim() : undefined,
lastAccountId: accountId?.trim() ? accountId.trim() : existing?.lastAccountId,
deliveryContext: normalized.deliveryContext,
lastChannel: normalized.lastChannel ?? channel,
lastTo: normalized.lastTo ?? (to?.trim() ? to.trim() : undefined),
lastAccountId: normalized.lastAccountId ?? resolvedAccountId,
});
store[sessionKey] = next;
await saveSessionStoreUnlocked(storePath, store);

View File

@@ -2,6 +2,7 @@ import crypto from "node:crypto";
import type { Skill } from "@mariozechner/pi-coding-agent";
import type { ChannelId } from "../../channels/plugins/types.js";
import type { DeliveryContext } from "../../utils/delivery-context.js";
export type SessionScope = "per-sender" | "global";
@@ -69,6 +70,7 @@ export type SessionEntry = {
subject?: string;
room?: string;
space?: string;
deliveryContext?: DeliveryContext;
lastChannel?: SessionChannelId;
lastTo?: string;
lastAccountId?: string;

View File

@@ -9,10 +9,11 @@ import {
updateSessionStore,
} from "../../config/sessions.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { resolveOutboundTarget } from "../../infra/outbound/targets.js";
import { resolveOutboundTarget, resolveSessionDeliveryTarget } from "../../infra/outbound/targets.js";
import { defaultRuntime } from "../../runtime.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { normalizeAccountId } from "../../utils/account-id.js";
import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js";
import {
INTERNAL_MESSAGE_CHANNEL,
isDeliverableMessageChannel,
@@ -144,6 +145,7 @@ export const agentHandlers: GatewayRequestHandlers = {
const sessionId = entry?.sessionId ?? randomUUID();
const labelValue = request.label?.trim() || entry?.label;
const spawnedByValue = request.spawnedBy?.trim() || entry?.spawnedBy;
const deliveryFields = normalizeSessionDeliveryFields(entry);
const nextEntry: SessionEntry = {
sessionId,
updatedAt: now,
@@ -153,9 +155,10 @@ export const agentHandlers: GatewayRequestHandlers = {
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
skillsSnapshot: entry?.skillsSnapshot,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
lastAccountId: entry?.lastAccountId,
deliveryContext: deliveryFields.deliveryContext,
lastChannel: deliveryFields.lastChannel ?? entry?.lastChannel,
lastTo: deliveryFields.lastTo ?? entry?.lastTo,
lastAccountId: deliveryFields.lastAccountId ?? entry?.lastAccountId,
modelOverride: entry?.modelOverride,
providerOverride: entry?.providerOverride,
label: labelValue,
@@ -198,32 +201,32 @@ export const agentHandlers: GatewayRequestHandlers = {
const runId = idem;
const wantsDelivery = request.deliver === true;
const requestedChannel = normalizeMessageChannel(request.channel) ?? "last";
const lastChannel = sessionEntry?.lastChannel;
const lastTo = typeof sessionEntry?.lastTo === "string" ? sessionEntry.lastTo.trim() : "";
const explicitTo =
typeof request.to === "string" && request.to.trim() ? request.to.trim() : undefined;
const resolvedAccountId =
normalizeAccountId(request.accountId) ??
(explicitTo ? undefined : normalizeAccountId(sessionEntry?.lastAccountId));
const wantsDelivery = request.deliver === true;
const baseDelivery = resolveSessionDeliveryTarget({
entry: sessionEntry,
requestedChannel: requestedChannel === INTERNAL_MESSAGE_CHANNEL ? "last" : requestedChannel,
explicitTo,
});
const resolvedChannel = (() => {
if (requestedChannel === INTERNAL_MESSAGE_CHANNEL) return INTERNAL_MESSAGE_CHANNEL;
if (requestedChannel === "last") {
// WebChat is not a deliverable surface. Treat it as "unset" for routing,
// so VoiceWake and CLI callers don't get stuck with deliver=false.
if (lastChannel && lastChannel !== INTERNAL_MESSAGE_CHANNEL) {
return lastChannel;
if (baseDelivery.channel && baseDelivery.channel !== INTERNAL_MESSAGE_CHANNEL) {
return baseDelivery.channel;
}
return wantsDelivery ? DEFAULT_CHAT_CHANNEL : INTERNAL_MESSAGE_CHANNEL;
}
if (isGatewayMessageChannel(requestedChannel)) return requestedChannel;
if (lastChannel && lastChannel !== INTERNAL_MESSAGE_CHANNEL) {
return lastChannel;
if (baseDelivery.channel && baseDelivery.channel !== INTERNAL_MESSAGE_CHANNEL) {
return baseDelivery.channel;
}
return wantsDelivery ? DEFAULT_CHAT_CHANNEL : INTERNAL_MESSAGE_CHANNEL;
})();
@@ -233,9 +236,19 @@ export const agentHandlers: GatewayRequestHandlers = {
: isDeliverableMessageChannel(resolvedChannel)
? "implicit"
: undefined;
let resolvedTo =
explicitTo ||
(isDeliverableMessageChannel(resolvedChannel) ? lastTo || undefined : undefined);
const resolvedAccountId =
normalizeAccountId(request.accountId) ??
(deliveryTargetMode === "implicit" && resolvedChannel === baseDelivery.lastChannel
? baseDelivery.lastAccountId
: undefined);
let resolvedTo = explicitTo;
if (
!resolvedTo &&
isDeliverableMessageChannel(resolvedChannel) &&
resolvedChannel === baseDelivery.lastChannel
) {
resolvedTo = baseDelivery.lastTo;
}
if (!resolvedTo && isDeliverableMessageChannel(resolvedChannel)) {
const cfg = cfgForAgent ?? loadConfig();
const fallback = resolveOutboundTarget({

View File

@@ -5,6 +5,7 @@ import {
deliveryContextFromSession,
mergeDeliveryContext,
normalizeDeliveryContext,
normalizeSessionDeliveryFields,
} from "./delivery-context.js";
describe("delivery context helpers", () => {
@@ -70,4 +71,21 @@ describe("delivery context helpers", () => {
accountId: undefined,
});
});
it("normalizes delivery fields and mirrors them on session entries", () => {
const normalized = normalizeSessionDeliveryFields({
deliveryContext: { channel: " Slack ", to: " channel:1 ", accountId: " acct-2 " },
lastChannel: " whatsapp ",
lastTo: " +1555 ",
});
expect(normalized.deliveryContext).toEqual({
channel: "whatsapp",
to: "+1555",
accountId: "acct-2",
});
expect(normalized.lastChannel).toBe("whatsapp");
expect(normalized.lastTo).toBe("+1555");
expect(normalized.lastAccountId).toBe("acct-2");
});
});

View File

@@ -1,4 +1,5 @@
import { normalizeAccountId } from "./account-id.js";
import { normalizeMessageChannel } from "./message-channel.js";
export type DeliveryContext = {
channel?: string;
@@ -6,16 +7,20 @@ export type DeliveryContext = {
accountId?: string;
};
type DeliveryContextSessionSource = {
export type DeliveryContextSessionSource = {
channel?: string;
lastChannel?: string;
lastTo?: string;
lastAccountId?: string;
deliveryContext?: DeliveryContext;
};
export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined {
if (!context) return undefined;
const channel = typeof context.channel === "string" ? context.channel.trim() : undefined;
const channel =
typeof context.channel === "string"
? normalizeMessageChannel(context.channel) ?? context.channel.trim()
: undefined;
const to = typeof context.to === "string" ? context.to.trim() : undefined;
const accountId = normalizeAccountId(context.accountId);
if (!channel && !to && !accountId) return undefined;
@@ -26,15 +31,54 @@ export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryCon
};
}
export function normalizeSessionDeliveryFields(
source?: DeliveryContextSessionSource,
): {
deliveryContext?: DeliveryContext;
lastChannel?: string;
lastTo?: string;
lastAccountId?: string;
} {
if (!source) {
return {
deliveryContext: undefined,
lastChannel: undefined,
lastTo: undefined,
lastAccountId: undefined,
};
}
const merged = mergeDeliveryContext(
normalizeDeliveryContext({
channel: source.lastChannel ?? source.channel,
to: source.lastTo,
accountId: source.lastAccountId,
}),
normalizeDeliveryContext(source.deliveryContext),
);
if (!merged) {
return {
deliveryContext: undefined,
lastChannel: undefined,
lastTo: undefined,
lastAccountId: undefined,
};
}
return {
deliveryContext: merged,
lastChannel: merged.channel,
lastTo: merged.to,
lastAccountId: merged.accountId,
};
}
export function deliveryContextFromSession(
entry?: DeliveryContextSessionSource,
): DeliveryContext | undefined {
if (!entry) return undefined;
return normalizeDeliveryContext({
channel: entry.lastChannel ?? entry.channel,
to: entry.lastTo,
accountId: entry.lastAccountId,
});
return normalizeSessionDeliveryFields(entry).deliveryContext;
}
export function mergeDeliveryContext(