fix: thread accountId through subagent announce delivery
Co-authored-by: Adam Holt <adam91holt@users.noreply.github.com>
This commit is contained in:
@@ -153,4 +153,42 @@ describe("subagent announce formatting", () => {
|
||||
);
|
||||
expect(agentSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("queues announce delivery with origin account routing", async () => {
|
||||
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
|
||||
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
|
||||
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
|
||||
sessionStore = {
|
||||
"agent:main:main": {
|
||||
sessionId: "session-456",
|
||||
lastChannel: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
lastAccountId: "kev",
|
||||
queueMode: "collect",
|
||||
queueDebounceMs: 0,
|
||||
},
|
||||
};
|
||||
|
||||
const didAnnounce = await runSubagentAnnounceFlow({
|
||||
childSessionKey: "agent:main:subagent:test",
|
||||
childRunId: "run-999",
|
||||
requesterSessionKey: "main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "do thing",
|
||||
timeoutMs: 1000,
|
||||
cleanup: "keep",
|
||||
waitForCompletion: false,
|
||||
startedAt: 10,
|
||||
endedAt: 20,
|
||||
outcome: { status: "ok" },
|
||||
});
|
||||
|
||||
expect(didAnnounce).toBe(true);
|
||||
await new Promise((r) => setTimeout(r, 5));
|
||||
|
||||
const call = agentSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
|
||||
expect(call?.params?.channel).toBe("whatsapp");
|
||||
expect(call?.params?.to).toBe("+1555");
|
||||
expect(call?.params?.accountId).toBe("kev");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -16,6 +16,10 @@ import {
|
||||
} from "../auto-reply/reply/queue.js";
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import {
|
||||
type DeliveryContext,
|
||||
normalizeDeliveryContext,
|
||||
} from "../utils/delivery-context.js";
|
||||
import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded.js";
|
||||
import { readLatestAssistantReply } from "./tools/agent-step.js";
|
||||
|
||||
@@ -89,9 +93,7 @@ type AnnounceQueueItem = {
|
||||
summaryLine?: string;
|
||||
enqueuedAt: number;
|
||||
sessionKey: string;
|
||||
originatingChannel?: string;
|
||||
originatingTo?: string;
|
||||
originatingAccountId?: string;
|
||||
origin?: DeliveryContext;
|
||||
};
|
||||
|
||||
type AnnounceQueueState = {
|
||||
@@ -225,9 +227,10 @@ function hasCrossChannelItems(items: AnnounceQueueItem[]): boolean {
|
||||
const keys = new Set<string>();
|
||||
let hasUnkeyed = false;
|
||||
for (const item of items) {
|
||||
const channel = item.originatingChannel;
|
||||
const to = item.originatingTo;
|
||||
const accountId = item.originatingAccountId;
|
||||
const origin = item.origin;
|
||||
const channel = origin?.channel;
|
||||
const to = origin?.to;
|
||||
const accountId = origin?.accountId;
|
||||
if (!channel && !to && !accountId) {
|
||||
hasUnkeyed = true;
|
||||
continue;
|
||||
@@ -301,14 +304,15 @@ function scheduleAnnounceDrain(key: string) {
|
||||
}
|
||||
|
||||
async function sendAnnounce(item: AnnounceQueueItem) {
|
||||
const origin = item.origin;
|
||||
await callGateway({
|
||||
method: "agent",
|
||||
params: {
|
||||
sessionKey: item.sessionKey,
|
||||
message: item.prompt,
|
||||
channel: item.originatingChannel,
|
||||
accountId: item.originatingAccountId,
|
||||
to: item.originatingTo,
|
||||
channel: origin?.channel,
|
||||
accountId: origin?.accountId,
|
||||
to: origin?.to,
|
||||
deliver: true,
|
||||
idempotencyKey: crypto.randomUUID(),
|
||||
},
|
||||
@@ -377,6 +381,11 @@ async function maybeQueueSubagentAnnounce(params: {
|
||||
queueSettings.mode === "steer-backlog" ||
|
||||
queueSettings.mode === "interrupt";
|
||||
if (isActive && (shouldFollowup || queueSettings.mode === "steer")) {
|
||||
const origin = normalizeDeliveryContext({
|
||||
channel: entry?.lastChannel,
|
||||
to: entry?.lastTo,
|
||||
accountId: entry?.lastAccountId ?? params.requesterAccountId,
|
||||
});
|
||||
enqueueAnnounce(
|
||||
canonicalKey,
|
||||
{
|
||||
@@ -384,9 +393,7 @@ async function maybeQueueSubagentAnnounce(params: {
|
||||
summaryLine: params.summaryLine,
|
||||
enqueuedAt: Date.now(),
|
||||
sessionKey: canonicalKey,
|
||||
originatingChannel: entry?.lastChannel,
|
||||
originatingTo: entry?.lastTo,
|
||||
originatingAccountId: entry?.lastAccountId ?? params.requesterAccountId,
|
||||
origin,
|
||||
},
|
||||
queueSettings,
|
||||
);
|
||||
@@ -617,14 +624,18 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
}
|
||||
|
||||
// Send to main agent - it will respond in its own voice
|
||||
const directOrigin = normalizeDeliveryContext({
|
||||
channel: params.requesterChannel,
|
||||
accountId: params.requesterAccountId,
|
||||
});
|
||||
await callGateway({
|
||||
method: "agent",
|
||||
params: {
|
||||
sessionKey: params.requesterSessionKey,
|
||||
message: triggerMessage,
|
||||
deliver: true,
|
||||
channel: params.requesterChannel,
|
||||
accountId: params.requesterAccountId,
|
||||
channel: directOrigin?.channel,
|
||||
accountId: directOrigin?.accountId,
|
||||
idempotencyKey: crypto.randomUUID(),
|
||||
},
|
||||
expectFinal: true,
|
||||
|
||||
@@ -52,6 +52,7 @@ describe("subagent registry persistence", () => {
|
||||
runId: "run-1",
|
||||
childSessionKey: "agent:main:subagent:test",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterAccountId: "acct-main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "do the thing",
|
||||
cleanup: "keep",
|
||||
@@ -61,6 +62,8 @@ describe("subagent registry persistence", () => {
|
||||
const raw = await fs.readFile(registryPath, "utf8");
|
||||
const parsed = JSON.parse(raw) as { runs?: Record<string, unknown> };
|
||||
expect(parsed.runs && Object.keys(parsed.runs)).toContain("run-1");
|
||||
const run = parsed.runs?.["run-1"] as { requesterAccountId?: string } | undefined;
|
||||
expect(run?.requesterAccountId).toBe("acct-main");
|
||||
|
||||
// Simulate a process restart: module re-import should load persisted runs
|
||||
// and trigger the announce flow once the run resolves.
|
||||
@@ -77,12 +80,14 @@ describe("subagent registry persistence", () => {
|
||||
childSessionKey: string;
|
||||
childRunId: string;
|
||||
requesterSessionKey: string;
|
||||
requesterAccountId?: string;
|
||||
task: string;
|
||||
cleanup: string;
|
||||
label?: string;
|
||||
};
|
||||
const first = announceSpy.mock.calls[0]?.[0] as unknown as AnnounceParams;
|
||||
expect(first.childSessionKey).toBe("agent:main:subagent:test");
|
||||
expect(first.requesterAccountId).toBe("acct-main");
|
||||
});
|
||||
|
||||
it("skips cleanup when cleanupHandled/announceHandled was persisted", async () => {
|
||||
|
||||
64
src/commands/agent.delivery.test.ts
Normal file
64
src/commands/agent.delivery.test.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { CliDeps } from "../cli/deps.js";
|
||||
import type { ClawdbotConfig } from "../config/config.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import type { SessionEntry } from "../config/sessions.js";
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
deliverOutboundPayloads: vi.fn(async () => []),
|
||||
getChannelPlugin: vi.fn(() => ({})),
|
||||
resolveOutboundTarget: vi.fn(() => ({ ok: true as const, to: "+15551234567" })),
|
||||
}));
|
||||
|
||||
vi.mock("../channels/plugins/index.js", () => ({
|
||||
getChannelPlugin: mocks.getChannelPlugin,
|
||||
normalizeChannelId: (value: string) => value,
|
||||
}));
|
||||
|
||||
vi.mock("../infra/outbound/deliver.js", () => ({
|
||||
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
|
||||
}));
|
||||
|
||||
vi.mock("../infra/outbound/targets.js", () => ({
|
||||
resolveOutboundTarget: mocks.resolveOutboundTarget,
|
||||
}));
|
||||
|
||||
describe("deliverAgentCommandResult", () => {
|
||||
it("prefers explicit accountId for outbound delivery", async () => {
|
||||
const cfg = {} as ClawdbotConfig;
|
||||
const deps = {} as CliDeps;
|
||||
const runtime = {
|
||||
log: vi.fn(),
|
||||
error: vi.fn(),
|
||||
} as unknown as RuntimeEnv;
|
||||
const sessionEntry = {
|
||||
lastAccountId: "default",
|
||||
} as SessionEntry;
|
||||
const result = {
|
||||
payloads: [{ text: "hi" }],
|
||||
meta: {},
|
||||
};
|
||||
|
||||
const { deliverAgentCommandResult } = await import("./agent/delivery.js");
|
||||
await deliverAgentCommandResult({
|
||||
cfg,
|
||||
deps,
|
||||
runtime,
|
||||
opts: {
|
||||
message: "hello",
|
||||
deliver: true,
|
||||
channel: "whatsapp",
|
||||
accountId: "kev",
|
||||
to: "+15551234567",
|
||||
},
|
||||
sessionEntry,
|
||||
result,
|
||||
payloads: result.payloads,
|
||||
});
|
||||
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ accountId: "kev" }),
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -19,6 +19,7 @@ import {
|
||||
isInternalMessageChannel,
|
||||
resolveGatewayMessageChannel,
|
||||
} from "../../utils/message-channel.js";
|
||||
import { normalizeAccountId } from "../../utils/account-id.js";
|
||||
import type { AgentCommandOpts } from "./types.js";
|
||||
|
||||
type RunResult = Awaited<
|
||||
@@ -49,11 +50,8 @@ export async function deliverAgentCommandResult(params: {
|
||||
const targetMode: ChannelOutboundTargetMode =
|
||||
opts.deliveryTargetMode ?? (opts.to ? "explicit" : "implicit");
|
||||
const resolvedAccountId =
|
||||
typeof opts.accountId === "string" && opts.accountId.trim()
|
||||
? opts.accountId.trim()
|
||||
: targetMode === "implicit"
|
||||
? sessionEntry?.lastAccountId
|
||||
: undefined;
|
||||
normalizeAccountId(opts.accountId) ??
|
||||
(targetMode === "implicit" ? normalizeAccountId(sessionEntry?.lastAccountId) : undefined);
|
||||
const resolvedTarget =
|
||||
deliver && isDeliveryChannelKnown && deliveryChannel
|
||||
? resolveOutboundTarget({
|
||||
|
||||
@@ -12,6 +12,7 @@ import { registerAgentRunContext } from "../../infra/agent-events.js";
|
||||
import { resolveOutboundTarget } from "../../infra/outbound/targets.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { resolveSendPolicy } from "../../sessions/send-policy.js";
|
||||
import { normalizeAccountId } from "../../utils/account-id.js";
|
||||
import {
|
||||
INTERNAL_MESSAGE_CHANNEL,
|
||||
isDeliverableMessageChannel,
|
||||
@@ -201,9 +202,8 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
const lastChannel = sessionEntry?.lastChannel;
|
||||
const lastTo = typeof sessionEntry?.lastTo === "string" ? sessionEntry.lastTo.trim() : "";
|
||||
const resolvedAccountId =
|
||||
typeof request.accountId === "string" && request.accountId.trim()
|
||||
? request.accountId.trim()
|
||||
: sessionEntry?.lastAccountId;
|
||||
normalizeAccountId(request.accountId) ??
|
||||
normalizeAccountId(sessionEntry?.lastAccountId);
|
||||
|
||||
const wantsDelivery = request.deliver === true;
|
||||
|
||||
|
||||
@@ -107,6 +107,51 @@ describe("gateway server agent", () => {
|
||||
await server.close();
|
||||
});
|
||||
|
||||
test("agent forwards accountId to agentCommand", async () => {
|
||||
testState.allowFrom = ["+1555"];
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
|
||||
testState.sessionStorePath = path.join(dir, "sessions.json");
|
||||
await fs.writeFile(
|
||||
testState.sessionStorePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
main: {
|
||||
sessionId: "sess-main-account",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
lastAccountId: "default",
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const res = await rpcReq(ws, "agent", {
|
||||
message: "hi",
|
||||
sessionKey: "main",
|
||||
deliver: true,
|
||||
accountId: "kev",
|
||||
idempotencyKey: "idem-agent-account",
|
||||
});
|
||||
expect(res.ok).toBe(true);
|
||||
|
||||
const spy = vi.mocked(agentCommand);
|
||||
const call = spy.mock.calls.at(-1)?.[0] as Record<string, unknown>;
|
||||
expectChannels(call, "whatsapp");
|
||||
expect(call.to).toBe("+1555");
|
||||
expect(call.accountId).toBe("kev");
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
testState.allowFrom = undefined;
|
||||
});
|
||||
|
||||
test("agent forwards image attachments as images[]", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
|
||||
testState.sessionStorePath = path.join(dir, "sessions.json");
|
||||
|
||||
5
src/utils/account-id.ts
Normal file
5
src/utils/account-id.ts
Normal file
@@ -0,0 +1,5 @@
|
||||
export function normalizeAccountId(value?: string): string | undefined {
|
||||
if (typeof value !== "string") return undefined;
|
||||
const trimmed = value.trim();
|
||||
return trimmed || undefined;
|
||||
}
|
||||
21
src/utils/delivery-context.ts
Normal file
21
src/utils/delivery-context.ts
Normal file
@@ -0,0 +1,21 @@
|
||||
import { normalizeAccountId } from "./account-id.js";
|
||||
|
||||
export type DeliveryContext = {
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
};
|
||||
|
||||
export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined {
|
||||
if (!context) return undefined;
|
||||
const channel =
|
||||
typeof context.channel === "string" ? context.channel.trim() : undefined;
|
||||
const to = typeof context.to === "string" ? context.to.trim() : undefined;
|
||||
const accountId = normalizeAccountId(context.accountId);
|
||||
if (!channel && !to && !accountId) return undefined;
|
||||
return {
|
||||
channel: channel || undefined,
|
||||
to: to || undefined,
|
||||
accountId,
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user