diff --git a/src/agents/subagent-announce.format.test.ts b/src/agents/subagent-announce.format.test.ts index 5d7484e8a..4fc052d3b 100644 --- a/src/agents/subagent-announce.format.test.ts +++ b/src/agents/subagent-announce.format.test.ts @@ -153,4 +153,42 @@ describe("subagent announce formatting", () => { ); expect(agentSpy).not.toHaveBeenCalled(); }); + + it("queues announce delivery with origin account routing", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + sessionStore = { + "agent:main:main": { + sessionId: "session-456", + lastChannel: "whatsapp", + lastTo: "+1555", + lastAccountId: "kev", + queueMode: "collect", + queueDebounceMs: 0, + }, + }; + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-999", + requesterSessionKey: "main", + requesterDisplayKey: "main", + task: "do thing", + timeoutMs: 1000, + cleanup: "keep", + waitForCompletion: false, + startedAt: 10, + endedAt: 20, + outcome: { status: "ok" }, + }); + + expect(didAnnounce).toBe(true); + await new Promise((r) => setTimeout(r, 5)); + + const call = agentSpy.mock.calls[0]?.[0] as { params?: Record }; + expect(call?.params?.channel).toBe("whatsapp"); + expect(call?.params?.to).toBe("+1555"); + expect(call?.params?.accountId).toBe("kev"); + }); }); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 12fc5328d..c7801610f 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -16,6 +16,10 @@ import { } from "../auto-reply/reply/queue.js"; import { callGateway } from "../gateway/call.js"; import { defaultRuntime } from "../runtime.js"; +import { + type DeliveryContext, + normalizeDeliveryContext, +} from "../utils/delivery-context.js"; import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded.js"; import { readLatestAssistantReply } from "./tools/agent-step.js"; @@ -89,9 +93,7 @@ type AnnounceQueueItem = { summaryLine?: string; enqueuedAt: number; sessionKey: string; - originatingChannel?: string; - originatingTo?: string; - originatingAccountId?: string; + origin?: DeliveryContext; }; type AnnounceQueueState = { @@ -225,9 +227,10 @@ function hasCrossChannelItems(items: AnnounceQueueItem[]): boolean { const keys = new Set(); let hasUnkeyed = false; for (const item of items) { - const channel = item.originatingChannel; - const to = item.originatingTo; - const accountId = item.originatingAccountId; + const origin = item.origin; + const channel = origin?.channel; + const to = origin?.to; + const accountId = origin?.accountId; if (!channel && !to && !accountId) { hasUnkeyed = true; continue; @@ -301,14 +304,15 @@ function scheduleAnnounceDrain(key: string) { } async function sendAnnounce(item: AnnounceQueueItem) { + const origin = item.origin; await callGateway({ method: "agent", params: { sessionKey: item.sessionKey, message: item.prompt, - channel: item.originatingChannel, - accountId: item.originatingAccountId, - to: item.originatingTo, + channel: origin?.channel, + accountId: origin?.accountId, + to: origin?.to, deliver: true, idempotencyKey: crypto.randomUUID(), }, @@ -377,6 +381,11 @@ async function maybeQueueSubagentAnnounce(params: { queueSettings.mode === "steer-backlog" || queueSettings.mode === "interrupt"; if (isActive && (shouldFollowup || queueSettings.mode === "steer")) { + const origin = normalizeDeliveryContext({ + channel: entry?.lastChannel, + to: entry?.lastTo, + accountId: entry?.lastAccountId ?? params.requesterAccountId, + }); enqueueAnnounce( canonicalKey, { @@ -384,9 +393,7 @@ async function maybeQueueSubagentAnnounce(params: { summaryLine: params.summaryLine, enqueuedAt: Date.now(), sessionKey: canonicalKey, - originatingChannel: entry?.lastChannel, - originatingTo: entry?.lastTo, - originatingAccountId: entry?.lastAccountId ?? params.requesterAccountId, + origin, }, queueSettings, ); @@ -617,14 +624,18 @@ export async function runSubagentAnnounceFlow(params: { } // Send to main agent - it will respond in its own voice + const directOrigin = normalizeDeliveryContext({ + channel: params.requesterChannel, + accountId: params.requesterAccountId, + }); await callGateway({ method: "agent", params: { sessionKey: params.requesterSessionKey, message: triggerMessage, deliver: true, - channel: params.requesterChannel, - accountId: params.requesterAccountId, + channel: directOrigin?.channel, + accountId: directOrigin?.accountId, idempotencyKey: crypto.randomUUID(), }, expectFinal: true, diff --git a/src/agents/subagent-registry.persistence.test.ts b/src/agents/subagent-registry.persistence.test.ts index 27d145ba6..e44957374 100644 --- a/src/agents/subagent-registry.persistence.test.ts +++ b/src/agents/subagent-registry.persistence.test.ts @@ -52,6 +52,7 @@ describe("subagent registry persistence", () => { runId: "run-1", childSessionKey: "agent:main:subagent:test", requesterSessionKey: "agent:main:main", + requesterAccountId: "acct-main", requesterDisplayKey: "main", task: "do the thing", cleanup: "keep", @@ -61,6 +62,8 @@ describe("subagent registry persistence", () => { const raw = await fs.readFile(registryPath, "utf8"); const parsed = JSON.parse(raw) as { runs?: Record }; expect(parsed.runs && Object.keys(parsed.runs)).toContain("run-1"); + const run = parsed.runs?.["run-1"] as { requesterAccountId?: string } | undefined; + expect(run?.requesterAccountId).toBe("acct-main"); // Simulate a process restart: module re-import should load persisted runs // and trigger the announce flow once the run resolves. @@ -77,12 +80,14 @@ describe("subagent registry persistence", () => { childSessionKey: string; childRunId: string; requesterSessionKey: string; + requesterAccountId?: string; task: string; cleanup: string; label?: string; }; const first = announceSpy.mock.calls[0]?.[0] as unknown as AnnounceParams; expect(first.childSessionKey).toBe("agent:main:subagent:test"); + expect(first.requesterAccountId).toBe("acct-main"); }); it("skips cleanup when cleanupHandled/announceHandled was persisted", async () => { diff --git a/src/commands/agent.delivery.test.ts b/src/commands/agent.delivery.test.ts new file mode 100644 index 000000000..8f5aecd2a --- /dev/null +++ b/src/commands/agent.delivery.test.ts @@ -0,0 +1,64 @@ +import { describe, expect, it, vi } from "vitest"; + +import type { CliDeps } from "../cli/deps.js"; +import type { ClawdbotConfig } from "../config/config.js"; +import type { RuntimeEnv } from "../runtime.js"; +import type { SessionEntry } from "../config/sessions.js"; + +const mocks = vi.hoisted(() => ({ + deliverOutboundPayloads: vi.fn(async () => []), + getChannelPlugin: vi.fn(() => ({})), + resolveOutboundTarget: vi.fn(() => ({ ok: true as const, to: "+15551234567" })), +})); + +vi.mock("../channels/plugins/index.js", () => ({ + getChannelPlugin: mocks.getChannelPlugin, + normalizeChannelId: (value: string) => value, +})); + +vi.mock("../infra/outbound/deliver.js", () => ({ + deliverOutboundPayloads: mocks.deliverOutboundPayloads, +})); + +vi.mock("../infra/outbound/targets.js", () => ({ + resolveOutboundTarget: mocks.resolveOutboundTarget, +})); + +describe("deliverAgentCommandResult", () => { + it("prefers explicit accountId for outbound delivery", async () => { + const cfg = {} as ClawdbotConfig; + const deps = {} as CliDeps; + const runtime = { + log: vi.fn(), + error: vi.fn(), + } as unknown as RuntimeEnv; + const sessionEntry = { + lastAccountId: "default", + } as SessionEntry; + const result = { + payloads: [{ text: "hi" }], + meta: {}, + }; + + const { deliverAgentCommandResult } = await import("./agent/delivery.js"); + await deliverAgentCommandResult({ + cfg, + deps, + runtime, + opts: { + message: "hello", + deliver: true, + channel: "whatsapp", + accountId: "kev", + to: "+15551234567", + }, + sessionEntry, + result, + payloads: result.payloads, + }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ accountId: "kev" }), + ); + }); +}); diff --git a/src/commands/agent/delivery.ts b/src/commands/agent/delivery.ts index 580e3222d..d2bb50137 100644 --- a/src/commands/agent/delivery.ts +++ b/src/commands/agent/delivery.ts @@ -19,6 +19,7 @@ import { isInternalMessageChannel, resolveGatewayMessageChannel, } from "../../utils/message-channel.js"; +import { normalizeAccountId } from "../../utils/account-id.js"; import type { AgentCommandOpts } from "./types.js"; type RunResult = Awaited< @@ -49,11 +50,8 @@ export async function deliverAgentCommandResult(params: { const targetMode: ChannelOutboundTargetMode = opts.deliveryTargetMode ?? (opts.to ? "explicit" : "implicit"); const resolvedAccountId = - typeof opts.accountId === "string" && opts.accountId.trim() - ? opts.accountId.trim() - : targetMode === "implicit" - ? sessionEntry?.lastAccountId - : undefined; + normalizeAccountId(opts.accountId) ?? + (targetMode === "implicit" ? normalizeAccountId(sessionEntry?.lastAccountId) : undefined); const resolvedTarget = deliver && isDeliveryChannelKnown && deliveryChannel ? resolveOutboundTarget({ diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 63452a584..5f6068f83 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -12,6 +12,7 @@ import { registerAgentRunContext } from "../../infra/agent-events.js"; import { resolveOutboundTarget } from "../../infra/outbound/targets.js"; import { defaultRuntime } from "../../runtime.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; +import { normalizeAccountId } from "../../utils/account-id.js"; import { INTERNAL_MESSAGE_CHANNEL, isDeliverableMessageChannel, @@ -201,9 +202,8 @@ export const agentHandlers: GatewayRequestHandlers = { const lastChannel = sessionEntry?.lastChannel; const lastTo = typeof sessionEntry?.lastTo === "string" ? sessionEntry.lastTo.trim() : ""; const resolvedAccountId = - typeof request.accountId === "string" && request.accountId.trim() - ? request.accountId.trim() - : sessionEntry?.lastAccountId; + normalizeAccountId(request.accountId) ?? + normalizeAccountId(sessionEntry?.lastAccountId); const wantsDelivery = request.deliver === true; diff --git a/src/gateway/server.agent.gateway-server-agent-a.test.ts b/src/gateway/server.agent.gateway-server-agent-a.test.ts index e53b80b97..bc0f75735 100644 --- a/src/gateway/server.agent.gateway-server-agent-a.test.ts +++ b/src/gateway/server.agent.gateway-server-agent-a.test.ts @@ -107,6 +107,51 @@ describe("gateway server agent", () => { await server.close(); }); + test("agent forwards accountId to agentCommand", async () => { + testState.allowFrom = ["+1555"]; + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-")); + testState.sessionStorePath = path.join(dir, "sessions.json"); + await fs.writeFile( + testState.sessionStorePath, + JSON.stringify( + { + main: { + sessionId: "sess-main-account", + updatedAt: Date.now(), + lastChannel: "whatsapp", + lastTo: "+1555", + lastAccountId: "default", + }, + }, + null, + 2, + ), + "utf-8", + ); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const res = await rpcReq(ws, "agent", { + message: "hi", + sessionKey: "main", + deliver: true, + accountId: "kev", + idempotencyKey: "idem-agent-account", + }); + expect(res.ok).toBe(true); + + const spy = vi.mocked(agentCommand); + const call = spy.mock.calls.at(-1)?.[0] as Record; + expectChannels(call, "whatsapp"); + expect(call.to).toBe("+1555"); + expect(call.accountId).toBe("kev"); + + ws.close(); + await server.close(); + testState.allowFrom = undefined; + }); + test("agent forwards image attachments as images[]", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-")); testState.sessionStorePath = path.join(dir, "sessions.json"); diff --git a/src/utils/account-id.ts b/src/utils/account-id.ts new file mode 100644 index 000000000..e8992cfd3 --- /dev/null +++ b/src/utils/account-id.ts @@ -0,0 +1,5 @@ +export function normalizeAccountId(value?: string): string | undefined { + if (typeof value !== "string") return undefined; + const trimmed = value.trim(); + return trimmed || undefined; +} diff --git a/src/utils/delivery-context.ts b/src/utils/delivery-context.ts new file mode 100644 index 000000000..a3a1e3956 --- /dev/null +++ b/src/utils/delivery-context.ts @@ -0,0 +1,21 @@ +import { normalizeAccountId } from "./account-id.js"; + +export type DeliveryContext = { + channel?: string; + to?: string; + accountId?: string; +}; + +export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined { + if (!context) return undefined; + const channel = + typeof context.channel === "string" ? context.channel.trim() : undefined; + const to = typeof context.to === "string" ? context.to.trim() : undefined; + const accountId = normalizeAccountId(context.accountId); + if (!channel && !to && !accountId) return undefined; + return { + channel: channel || undefined, + to: to || undefined, + accountId, + }; +}