From 9f4b7a1683b25767fe6aa446651b93cba5f9dfbb Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 17 Jan 2026 03:57:59 +0000 Subject: [PATCH] fix: normalize subagent announce delivery origin Co-authored-by: Adam Holt --- CHANGELOG.md | 2 +- src/agents/subagent-announce.format.test.ts | 35 +++++++-- src/agents/subagent-announce.ts | 74 +++++++++---------- .../subagent-registry.persistence.test.ts | 61 +++++++++++++-- src/agents/subagent-registry.store.ts | 45 +++++++---- src/agents/subagent-registry.ts | 35 ++++----- src/agents/tools/sessions-spawn-tool.ts | 12 ++- src/commands/agent.delivery.test.ts | 40 ++++++++++ src/commands/agent/delivery.ts | 17 ++++- src/utils/delivery-context.test.ts | 45 +++++++++++ src/utils/delivery-context.ts | 20 +++++ 11 files changed, 295 insertions(+), 91 deletions(-) create mode 100644 src/utils/delivery-context.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 7123c9353..9731f62b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,7 @@ - Health: add per-agent session summaries and account-level health details, and allow selective probes. (#1047) — thanks @gumadeiras. ### Fixes -- Sub-agents: route announce delivery through the correct channel account IDs. (#1061, #1058) — thanks @adam91holt. +- Sub-agents: normalize announce delivery origin + queue bucketing by accountId to keep multi-account routing stable. (#1061, #1058) — thanks @adam91holt. - Repo: fix oxlint config filename and move ignore pattern into config. (#1064) — thanks @connorshea. - Messages: `/stop` now hard-aborts queued followups and sub-agent runs; suppress zero-count stop notes. - Sessions: reset `compactionCount` on `/new` and `/reset`, and preserve `sessions.json` file mode (0600). diff --git a/src/agents/subagent-announce.format.test.ts b/src/agents/subagent-announce.format.test.ts index 5fc72d662..3b18f592f 100644 --- a/src/agents/subagent-announce.format.test.ts +++ b/src/agents/subagent-announce.format.test.ts @@ -192,7 +192,7 @@ describe("subagent announce formatting", () => { expect(call?.params?.accountId).toBe("kev"); }); - it("uses requester accountId for direct announce when not queued", async () => { + it("uses requester origin for direct announce when not queued", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); @@ -201,8 +201,7 @@ describe("subagent announce formatting", () => { childSessionKey: "agent:main:subagent:test", childRunId: "run-direct", requesterSessionKey: "agent:main:main", - requesterChannel: "whatsapp", - requesterAccountId: "acct-123", + requesterOrigin: { channel: "whatsapp", accountId: "acct-123" }, requesterDisplayKey: "main", task: "do thing", timeoutMs: 1000, @@ -219,6 +218,32 @@ describe("subagent announce formatting", () => { expect(call?.params?.accountId).toBe("acct-123"); }); + it("normalizes requesterOrigin for direct announce delivery", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-direct-origin", + requesterSessionKey: "agent:main:main", + requesterOrigin: { channel: " whatsapp ", accountId: " acct-987 " }, + requesterDisplayKey: "main", + task: "do thing", + timeoutMs: 1000, + cleanup: "keep", + waitForCompletion: false, + startedAt: 10, + endedAt: 20, + outcome: { status: "ok" }, + }); + + expect(didAnnounce).toBe(true); + const call = agentSpy.mock.calls[0]?.[0] as { params?: Record }; + expect(call?.params?.channel).toBe("whatsapp"); + expect(call?.params?.accountId).toBe("acct-987"); + }); + it("splits collect-mode announces when accountId differs", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); @@ -237,7 +262,7 @@ describe("subagent announce formatting", () => { childSessionKey: "agent:main:subagent:test", childRunId: "run-a", requesterSessionKey: "main", - requesterAccountId: "acct-a", + requesterOrigin: { accountId: "acct-a" }, requesterDisplayKey: "main", task: "do thing", timeoutMs: 1000, @@ -252,7 +277,7 @@ describe("subagent announce formatting", () => { childSessionKey: "agent:main:subagent:test", childRunId: "run-b", requesterSessionKey: "main", - requesterAccountId: "acct-b", + requesterOrigin: { accountId: "acct-b" }, requesterDisplayKey: "main", task: "do thing", timeoutMs: 1000, diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index d4b302f00..b3f7dbf55 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -16,7 +16,12 @@ import { } from "../auto-reply/reply/queue.js"; import { callGateway } from "../gateway/call.js"; import { defaultRuntime } from "../runtime.js"; -import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js"; +import { + type DeliveryContext, + deliveryContextKey, + mergeDeliveryContext, + normalizeDeliveryContext, +} from "../utils/delivery-context.js"; import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded.js"; import { readLatestAssistantReply } from "./tools/agent-step.js"; @@ -224,18 +229,17 @@ function hasCrossChannelItems(items: AnnounceQueueItem[]): boolean { const keys = new Set(); let hasUnkeyed = false; for (const item of items) { - const origin = item.origin; - const channel = origin?.channel; - const to = origin?.to; - const accountId = origin?.accountId; - if (!channel && !to && !accountId) { + const origin = normalizeDeliveryContext(item.origin); + if (!origin) { hasUnkeyed = true; continue; } - if (!channel || !to) { + if (!origin.channel || !origin.to) { return true; } - keys.add([channel, to, accountId || ""].join("|")); + const key = deliveryContextKey(origin); + if (!key) return true; + keys.add(key); } if (keys.size === 0) return false; if (hasUnkeyed) return true; @@ -348,24 +352,11 @@ function loadRequesterSessionEntry(requesterSessionKey: string) { return { cfg, entry, canonicalKey }; } -function resolveAnnounceOrigin(params: { - channel?: string; - to?: string; - accountId?: string; - fallbackAccountId?: string; -}) { - return normalizeDeliveryContext({ - channel: params.channel, - to: params.to, - accountId: params.accountId ?? params.fallbackAccountId, - }); -} - async function maybeQueueSubagentAnnounce(params: { requesterSessionKey: string; triggerMessage: string; summaryLine?: string; - requesterAccountId?: string; + requesterOrigin?: DeliveryContext; }): Promise<"steered" | "queued" | "none"> { const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey); const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey); @@ -391,12 +382,14 @@ async function maybeQueueSubagentAnnounce(params: { queueSettings.mode === "steer-backlog" || queueSettings.mode === "interrupt"; if (isActive && (shouldFollowup || queueSettings.mode === "steer")) { - const origin = resolveAnnounceOrigin({ - channel: entry?.lastChannel, - to: entry?.lastTo, - accountId: entry?.lastAccountId, - fallbackAccountId: params.requesterAccountId, - }); + const origin = mergeDeliveryContext( + { + channel: entry?.lastChannel, + to: entry?.lastTo, + accountId: entry?.lastAccountId, + }, + params.requesterOrigin, + ); enqueueAnnounce( canonicalKey, { @@ -469,7 +462,7 @@ async function buildSubagentStatsLine(params: { export function buildSubagentSystemPrompt(params: { requesterSessionKey?: string; - requesterChannel?: string; + requesterOrigin?: DeliveryContext; childSessionKey: string; label?: string; task?: string; @@ -510,7 +503,9 @@ export function buildSubagentSystemPrompt(params: { "## Session Context", params.label ? `- Label: ${params.label}` : undefined, params.requesterSessionKey ? `- Requester session: ${params.requesterSessionKey}.` : undefined, - params.requesterChannel ? `- Requester channel: ${params.requesterChannel}.` : undefined, + params.requesterOrigin?.channel + ? `- Requester channel: ${params.requesterOrigin.channel}.` + : undefined, `- Your session: ${params.childSessionKey}.`, "", ].filter((line): line is string => line !== undefined); @@ -526,8 +521,7 @@ export async function runSubagentAnnounceFlow(params: { childSessionKey: string; childRunId: string; requesterSessionKey: string; - requesterChannel?: string; - requesterAccountId?: string; + requesterOrigin?: DeliveryContext; requesterDisplayKey: string; task: string; timeoutMs: number; @@ -541,6 +535,7 @@ export async function runSubagentAnnounceFlow(params: { }): Promise { let didAnnounce = false; try { + const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin); let reply = params.roundOneReply; let outcome: SubagentRunOutcome | undefined = params.outcome; if (!reply && params.waitForCompletion !== false) { @@ -623,7 +618,7 @@ export async function runSubagentAnnounceFlow(params: { requesterSessionKey: params.requesterSessionKey, triggerMessage, summaryLine: taskLabel, - requesterAccountId: params.requesterAccountId, + requesterOrigin, }); if (queued === "steered") { didAnnounce = true; @@ -635,10 +630,15 @@ export async function runSubagentAnnounceFlow(params: { } // Send to main agent - it will respond in its own voice - const directOrigin = resolveAnnounceOrigin({ - channel: params.requesterChannel, - accountId: params.requesterAccountId, - }); + let directOrigin = requesterOrigin; + if (!directOrigin) { + const { entry } = loadRequesterSessionEntry(params.requesterSessionKey); + directOrigin = normalizeDeliveryContext({ + channel: entry?.lastChannel ?? entry?.channel, + to: entry?.lastTo, + accountId: entry?.lastAccountId, + }); + } await callGateway({ method: "agent", params: { diff --git a/src/agents/subagent-registry.persistence.test.ts b/src/agents/subagent-registry.persistence.test.ts index e44957374..c3c7e4358 100644 --- a/src/agents/subagent-registry.persistence.test.ts +++ b/src/agents/subagent-registry.persistence.test.ts @@ -52,7 +52,7 @@ describe("subagent registry persistence", () => { runId: "run-1", childSessionKey: "agent:main:subagent:test", requesterSessionKey: "agent:main:main", - requesterAccountId: "acct-main", + requesterOrigin: { channel: " whatsapp ", accountId: " acct-main " }, requesterDisplayKey: "main", task: "do the thing", cleanup: "keep", @@ -62,8 +62,18 @@ describe("subagent registry persistence", () => { const raw = await fs.readFile(registryPath, "utf8"); const parsed = JSON.parse(raw) as { runs?: Record }; expect(parsed.runs && Object.keys(parsed.runs)).toContain("run-1"); - const run = parsed.runs?.["run-1"] as { requesterAccountId?: string } | undefined; - expect(run?.requesterAccountId).toBe("acct-main"); + const run = parsed.runs?.["run-1"] as + | { + requesterOrigin?: { channel?: string; accountId?: string }; + } + | undefined; + expect(run).toBeDefined(); + if (run) { + expect("requesterAccountId" in run).toBe(false); + expect("requesterChannel" in run).toBe(false); + } + expect(run?.requesterOrigin?.channel).toBe("whatsapp"); + expect(run?.requesterOrigin?.accountId).toBe("acct-main"); // Simulate a process restart: module re-import should load persisted runs // and trigger the announce flow once the run resolves. @@ -80,17 +90,18 @@ describe("subagent registry persistence", () => { childSessionKey: string; childRunId: string; requesterSessionKey: string; - requesterAccountId?: string; + requesterOrigin?: { channel?: string; accountId?: string }; task: string; cleanup: string; label?: string; }; const first = announceSpy.mock.calls[0]?.[0] as unknown as AnnounceParams; expect(first.childSessionKey).toBe("agent:main:subagent:test"); - expect(first.requesterAccountId).toBe("acct-main"); + expect(first.requesterOrigin?.channel).toBe("whatsapp"); + expect(first.requesterOrigin?.accountId).toBe("acct-main"); }); - it("skips cleanup when cleanupHandled/announceHandled was persisted", async () => { + it("skips cleanup when cleanupHandled was persisted", async () => { tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-subagent-")); process.env.CLAWDBOT_STATE_DIR = tempStateDir; @@ -130,6 +141,44 @@ describe("subagent registry persistence", () => { expect(match).toBeFalsy(); }); + it("maps legacy announce fields into cleanup state", async () => { + tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-subagent-")); + process.env.CLAWDBOT_STATE_DIR = tempStateDir; + + const registryPath = path.join(tempStateDir, "subagents", "runs.json"); + const persisted = { + version: 1, + runs: { + "run-legacy": { + runId: "run-legacy", + childSessionKey: "agent:main:subagent:legacy", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "legacy announce", + cleanup: "keep", + createdAt: 1, + startedAt: 1, + endedAt: 2, + announceCompletedAt: 9, + announceHandled: true, + requesterChannel: "whatsapp", + requesterAccountId: "legacy-account", + }, + }, + }; + await fs.mkdir(path.dirname(registryPath), { recursive: true }); + await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8"); + + vi.resetModules(); + const { loadSubagentRegistryFromDisk } = await import("./subagent-registry.store.js"); + const runs = loadSubagentRegistryFromDisk(); + const entry = runs.get("run-legacy"); + expect(entry?.cleanupHandled).toBe(true); + expect(entry?.cleanupCompletedAt).toBe(9); + expect(entry?.requesterOrigin?.channel).toBe("whatsapp"); + expect(entry?.requesterOrigin?.accountId).toBe("legacy-account"); + }); + it("retries cleanup announce after a failed announce", async () => { tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-subagent-")); process.env.CLAWDBOT_STATE_DIR = tempStateDir; diff --git a/src/agents/subagent-registry.store.ts b/src/agents/subagent-registry.store.ts index 40f689368..d433deaa3 100644 --- a/src/agents/subagent-registry.store.ts +++ b/src/agents/subagent-registry.store.ts @@ -2,6 +2,7 @@ import path from "node:path"; import { STATE_DIR_CLAWDBOT } from "../config/paths.js"; import { loadJsonFile, saveJsonFile } from "../infra/json-file.js"; +import { normalizeDeliveryContext } from "../utils/delivery-context.js"; import type { SubagentRunRecord } from "./subagent-registry.js"; export type PersistedSubagentRegistryVersion = 1; @@ -13,8 +14,13 @@ type PersistedSubagentRegistry = { const REGISTRY_VERSION = 1 as const; -type PersistedSubagentRunRecord = Omit & { - announceHandled?: boolean; +type PersistedSubagentRunRecord = SubagentRunRecord; + +type LegacySubagentRunRecord = PersistedSubagentRunRecord & { + announceCompletedAt?: unknown; + announceHandled?: unknown; + requesterChannel?: unknown; + requesterAccountId?: unknown; }; export function resolveSubagentRegistryPath(): string { @@ -32,25 +38,33 @@ export function loadSubagentRegistryFromDisk(): Map { const out = new Map(); for (const [runId, entry] of Object.entries(runsRaw)) { if (!entry || typeof entry !== "object") continue; - const typed = entry as PersistedSubagentRunRecord; + const typed = entry as LegacySubagentRunRecord; if (!typed.runId || typeof typed.runId !== "string") continue; - // Back-compat: map legacy announce fields into cleanup fields. - const announceCompletedAt = + const legacyCompletedAt = typeof typed.announceCompletedAt === "number" ? typed.announceCompletedAt : undefined; const cleanupCompletedAt = - typeof typed.cleanupCompletedAt === "number" ? typed.cleanupCompletedAt : announceCompletedAt; + typeof typed.cleanupCompletedAt === "number" ? typed.cleanupCompletedAt : legacyCompletedAt; const cleanupHandled = typeof typed.cleanupHandled === "boolean" ? typed.cleanupHandled - : Boolean(typed.announceHandled ?? announceCompletedAt ?? cleanupCompletedAt); - const announceHandled = - typeof typed.announceHandled === "boolean" - ? typed.announceHandled - : Boolean(announceCompletedAt); + : Boolean(typed.announceHandled ?? cleanupCompletedAt); + const requesterOrigin = normalizeDeliveryContext( + typed.requesterOrigin ?? { + channel: typeof typed.requesterChannel === "string" ? typed.requesterChannel : undefined, + accountId: + typeof typed.requesterAccountId === "string" ? typed.requesterAccountId : undefined, + }, + ); + const { + announceCompletedAt: _announceCompletedAt, + announceHandled: _announceHandled, + requesterChannel: _channel, + requesterAccountId: _accountId, + ...rest + } = typed; out.set(runId, { - ...typed, - announceCompletedAt, - announceHandled, + ...rest, + requesterOrigin, cleanupCompletedAt, cleanupHandled, }); @@ -62,8 +76,7 @@ export function saveSubagentRegistryToDisk(runs: Map) const pathname = resolveSubagentRegistryPath(); const serialized: Record = {}; for (const [runId, entry] of runs.entries()) { - const { announceHandled: _ignored, ...persisted } = entry; - serialized[runId] = persisted; + serialized[runId] = entry; } const out: PersistedSubagentRegistry = { version: REGISTRY_VERSION, diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 6d36bdff7..4fb225365 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -1,6 +1,10 @@ import { loadConfig } from "../config/config.js"; import { callGateway } from "../gateway/call.js"; import { onAgentEvent } from "../infra/agent-events.js"; +import { + type DeliveryContext, + normalizeDeliveryContext, +} from "../utils/delivery-context.js"; import { runSubagentAnnounceFlow, type SubagentRunOutcome } from "./subagent-announce.js"; import { loadSubagentRegistryFromDisk, @@ -12,8 +16,7 @@ export type SubagentRunRecord = { runId: string; childSessionKey: string; requesterSessionKey: string; - requesterChannel?: string; - requesterAccountId?: string; + requesterOrigin?: DeliveryContext; requesterDisplayKey: string; task: string; cleanup: "delete" | "keep"; @@ -23,10 +26,6 @@ export type SubagentRunRecord = { endedAt?: number; outcome?: SubagentRunOutcome; archiveAtMs?: number; - /** @deprecated Use cleanupCompletedAt instead */ - announceCompletedAt?: number; - /** @deprecated Use cleanupHandled instead */ - announceHandled?: boolean; cleanupCompletedAt?: number; cleanupHandled?: boolean; }; @@ -55,12 +54,12 @@ function resumeSubagentRun(runId: string) { if (typeof entry.endedAt === "number" && entry.endedAt > 0) { if (!beginSubagentCleanup(runId)) return; + const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin); void runSubagentAnnounceFlow({ childSessionKey: entry.childSessionKey, childRunId: entry.runId, requesterSessionKey: entry.requesterSessionKey, - requesterChannel: entry.requesterChannel, - requesterAccountId: entry.requesterAccountId, + requesterOrigin, requesterDisplayKey: entry.requesterDisplayKey, task: entry.task, timeoutMs: 30_000, @@ -196,12 +195,12 @@ function ensureListener() { if (!beginSubagentCleanup(evt.runId)) { return; } + const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin); void runSubagentAnnounceFlow({ childSessionKey: entry.childSessionKey, childRunId: entry.runId, requesterSessionKey: entry.requesterSessionKey, - requesterChannel: entry.requesterChannel, - requesterAccountId: entry.requesterAccountId, + requesterOrigin, requesterDisplayKey: entry.requesterDisplayKey, task: entry.task, timeoutMs: 30_000, @@ -238,9 +237,8 @@ function finalizeSubagentCleanup(runId: string, cleanup: "delete" | "keep", didA function beginSubagentCleanup(runId: string) { const entry = subagentRuns.get(runId); if (!entry) return false; - // Support legacy field names for backward compatibility - if (entry.cleanupCompletedAt || entry.announceCompletedAt) return false; - if (entry.cleanupHandled || entry.announceHandled) return false; + if (entry.cleanupCompletedAt) return false; + if (entry.cleanupHandled) return false; entry.cleanupHandled = true; persistSubagentRuns(); return true; @@ -250,8 +248,7 @@ export function registerSubagentRun(params: { runId: string; childSessionKey: string; requesterSessionKey: string; - requesterChannel?: string; - requesterAccountId?: string; + requesterOrigin?: DeliveryContext; requesterDisplayKey: string; task: string; cleanup: "delete" | "keep"; @@ -263,12 +260,12 @@ export function registerSubagentRun(params: { const archiveAfterMs = resolveArchiveAfterMs(cfg); const archiveAtMs = archiveAfterMs ? now + archiveAfterMs : undefined; const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, params.runTimeoutSeconds); + const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin); subagentRuns.set(params.runId, { runId: params.runId, childSessionKey: params.childSessionKey, requesterSessionKey: params.requesterSessionKey, - requesterChannel: params.requesterChannel, - requesterAccountId: params.requesterAccountId, + requesterOrigin, requesterDisplayKey: params.requesterDisplayKey, task: params.task, cleanup: params.cleanup, @@ -318,12 +315,12 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) { mutated = true; if (mutated) persistSubagentRuns(); if (!beginSubagentCleanup(runId)) return; + const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin); void runSubagentAnnounceFlow({ childSessionKey: entry.childSessionKey, childRunId: entry.runId, requesterSessionKey: entry.requesterSessionKey, - requesterChannel: entry.requesterChannel, - requesterAccountId: entry.requesterAccountId, + requesterOrigin, requesterDisplayKey: entry.requesterDisplayKey, task: entry.task, timeoutMs: 30_000, diff --git a/src/agents/tools/sessions-spawn-tool.ts b/src/agents/tools/sessions-spawn-tool.ts index 3c17a8252..e743c07d1 100644 --- a/src/agents/tools/sessions-spawn-tool.ts +++ b/src/agents/tools/sessions-spawn-tool.ts @@ -9,6 +9,7 @@ import { normalizeAgentId, parseAgentSessionKey, } from "../../routing/session-key.js"; +import { normalizeDeliveryContext } from "../../utils/delivery-context.js"; import type { GatewayMessageChannel } from "../../utils/message-channel.js"; import { resolveAgentConfig } from "../agent-scope.js"; import { AGENT_LANE_SUBAGENT } from "../lanes.js"; @@ -67,6 +68,10 @@ export function createSessionsSpawnTool(opts?: { params.cleanup === "keep" || params.cleanup === "delete" ? (params.cleanup as "keep" | "delete") : "keep"; + const requesterOrigin = normalizeDeliveryContext({ + channel: opts?.agentChannel, + accountId: opts?.agentAccountId, + }); const runTimeoutSeconds = (() => { const explicit = typeof params.runTimeoutSeconds === "number" && Number.isFinite(params.runTimeoutSeconds) @@ -163,7 +168,7 @@ export function createSessionsSpawnTool(opts?: { } const childSystemPrompt = buildSubagentSystemPrompt({ requesterSessionKey, - requesterChannel: opts?.agentChannel, + requesterOrigin, childSessionKey, label: label || undefined, task, @@ -177,7 +182,7 @@ export function createSessionsSpawnTool(opts?: { params: { message: task, sessionKey: childSessionKey, - channel: opts?.agentChannel, + channel: requesterOrigin?.channel, idempotencyKey: childIdem, deliver: false, lane: AGENT_LANE_SUBAGENT, @@ -206,8 +211,7 @@ export function createSessionsSpawnTool(opts?: { runId: childRunId, childSessionKey, requesterSessionKey: requesterInternalKey, - requesterChannel: opts?.agentChannel, - requesterAccountId: opts?.agentAccountId, + requesterOrigin, requesterDisplayKey, task, cleanup, diff --git a/src/commands/agent.delivery.test.ts b/src/commands/agent.delivery.test.ts index 34d004c60..c29af2c14 100644 --- a/src/commands/agent.delivery.test.ts +++ b/src/commands/agent.delivery.test.ts @@ -101,4 +101,44 @@ describe("deliverAgentCommandResult", () => { expect.objectContaining({ accountId: "legacy" }), ); }); + + it("does not infer accountId for explicit delivery targets", async () => { + const cfg = {} as ClawdbotConfig; + const deps = {} as CliDeps; + const runtime = { + log: vi.fn(), + error: vi.fn(), + } as unknown as RuntimeEnv; + const sessionEntry = { + lastAccountId: "legacy", + } as SessionEntry; + const result = { + payloads: [{ text: "hi" }], + meta: {}, + }; + + const { deliverAgentCommandResult } = await import("./agent/delivery.js"); + await deliverAgentCommandResult({ + cfg, + deps, + runtime, + opts: { + message: "hello", + deliver: true, + channel: "whatsapp", + to: "+15551234567", + deliveryTargetMode: "explicit", + }, + sessionEntry, + result, + payloads: result.payloads, + }); + + expect(mocks.resolveOutboundTarget).toHaveBeenCalledWith( + expect.objectContaining({ accountId: undefined, mode: "explicit" }), + ); + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ accountId: undefined }), + ); + }); }); diff --git a/src/commands/agent/delivery.ts b/src/commands/agent/delivery.ts index d2bb50137..f0bfa3314 100644 --- a/src/commands/agent/delivery.ts +++ b/src/commands/agent/delivery.ts @@ -26,6 +26,19 @@ type RunResult = Awaited< ReturnType<(typeof import("../../agents/pi-embedded.js"))["runEmbeddedPiAgent"]> >; +function resolveDeliveryAccountId(params: { + opts: AgentCommandOpts; + sessionEntry?: SessionEntry; + targetMode: ChannelOutboundTargetMode; +}) { + return ( + normalizeAccountId(params.opts.accountId) ?? + (params.targetMode === "implicit" + ? normalizeAccountId(params.sessionEntry?.lastAccountId) + : undefined) + ); +} + export async function deliverAgentCommandResult(params: { cfg: ClawdbotConfig; deps: CliDeps; @@ -49,9 +62,7 @@ export async function deliverAgentCommandResult(params: { const targetMode: ChannelOutboundTargetMode = opts.deliveryTargetMode ?? (opts.to ? "explicit" : "implicit"); - const resolvedAccountId = - normalizeAccountId(opts.accountId) ?? - (targetMode === "implicit" ? normalizeAccountId(sessionEntry?.lastAccountId) : undefined); + const resolvedAccountId = resolveDeliveryAccountId({ opts, sessionEntry, targetMode }); const resolvedTarget = deliver && isDeliveryChannelKnown && deliveryChannel ? resolveOutboundTarget({ diff --git a/src/utils/delivery-context.test.ts b/src/utils/delivery-context.test.ts new file mode 100644 index 000000000..1b7065b93 --- /dev/null +++ b/src/utils/delivery-context.test.ts @@ -0,0 +1,45 @@ +import { describe, expect, it } from "vitest"; + +import { + deliveryContextKey, + mergeDeliveryContext, + normalizeDeliveryContext, +} from "./delivery-context.js"; + +describe("delivery context helpers", () => { + it("normalizes channel/to/accountId and drops empty contexts", () => { + expect( + normalizeDeliveryContext({ + channel: " whatsapp ", + to: " +1555 ", + accountId: " acct-1 ", + }), + ).toEqual({ + channel: "whatsapp", + to: "+1555", + accountId: "acct-1", + }); + + expect(normalizeDeliveryContext({ channel: " " })).toBeUndefined(); + }); + + it("merges primary values over fallback", () => { + const merged = mergeDeliveryContext( + { channel: "whatsapp", to: "channel:abc" }, + { channel: "slack", to: "channel:def", accountId: "acct" }, + ); + + expect(merged).toEqual({ + channel: "whatsapp", + to: "channel:abc", + accountId: "acct", + }); + }); + + it("builds stable keys only when channel and to are present", () => { + expect(deliveryContextKey({ channel: "whatsapp", to: "+1555" })).toBe( + "whatsapp|+1555|", + ); + expect(deliveryContextKey({ channel: "whatsapp" })).toBeUndefined(); + }); +}); diff --git a/src/utils/delivery-context.ts b/src/utils/delivery-context.ts index e0a4d4d6f..0cd1ceae6 100644 --- a/src/utils/delivery-context.ts +++ b/src/utils/delivery-context.ts @@ -18,3 +18,23 @@ export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryCon accountId, }; } + +export function mergeDeliveryContext( + primary?: DeliveryContext, + fallback?: DeliveryContext, +): DeliveryContext | undefined { + const normalizedPrimary = normalizeDeliveryContext(primary); + const normalizedFallback = normalizeDeliveryContext(fallback); + if (!normalizedPrimary && !normalizedFallback) return undefined; + return normalizeDeliveryContext({ + channel: normalizedPrimary?.channel ?? normalizedFallback?.channel, + to: normalizedPrimary?.to ?? normalizedFallback?.to, + accountId: normalizedPrimary?.accountId ?? normalizedFallback?.accountId, + }); +} + +export function deliveryContextKey(context?: DeliveryContext): string | undefined { + const normalized = normalizeDeliveryContext(context); + if (!normalized?.channel || !normalized?.to) return undefined; + return `${normalized.channel}|${normalized.to}|${normalized.accountId ?? ""}`; +}