fix: normalize subagent announce delivery origin

Co-authored-by: Adam Holt <mail@adamholt.co.nz>
This commit is contained in:
Peter Steinberger
2026-01-17 03:57:59 +00:00
parent dd68faef23
commit 9f4b7a1683
11 changed files with 295 additions and 91 deletions

View File

@@ -29,7 +29,7 @@
- Health: add per-agent session summaries and account-level health details, and allow selective probes. (#1047) — thanks @gumadeiras. - Health: add per-agent session summaries and account-level health details, and allow selective probes. (#1047) — thanks @gumadeiras.
### Fixes ### Fixes
- Sub-agents: route announce delivery through the correct channel account IDs. (#1061, #1058) — thanks @adam91holt. - Sub-agents: normalize announce delivery origin + queue bucketing by accountId to keep multi-account routing stable. (#1061, #1058) — thanks @adam91holt.
- Repo: fix oxlint config filename and move ignore pattern into config. (#1064) — thanks @connorshea. - Repo: fix oxlint config filename and move ignore pattern into config. (#1064) — thanks @connorshea.
- Messages: `/stop` now hard-aborts queued followups and sub-agent runs; suppress zero-count stop notes. - Messages: `/stop` now hard-aborts queued followups and sub-agent runs; suppress zero-count stop notes.
- Sessions: reset `compactionCount` on `/new` and `/reset`, and preserve `sessions.json` file mode (0600). - Sessions: reset `compactionCount` on `/new` and `/reset`, and preserve `sessions.json` file mode (0600).

View File

@@ -192,7 +192,7 @@ describe("subagent announce formatting", () => {
expect(call?.params?.accountId).toBe("kev"); expect(call?.params?.accountId).toBe("kev");
}); });
it("uses requester accountId for direct announce when not queued", async () => { it("uses requester origin for direct announce when not queued", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
@@ -201,8 +201,7 @@ describe("subagent announce formatting", () => {
childSessionKey: "agent:main:subagent:test", childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct", childRunId: "run-direct",
requesterSessionKey: "agent:main:main", requesterSessionKey: "agent:main:main",
requesterChannel: "whatsapp", requesterOrigin: { channel: "whatsapp", accountId: "acct-123" },
requesterAccountId: "acct-123",
requesterDisplayKey: "main", requesterDisplayKey: "main",
task: "do thing", task: "do thing",
timeoutMs: 1000, timeoutMs: 1000,
@@ -219,6 +218,32 @@ describe("subagent announce formatting", () => {
expect(call?.params?.accountId).toBe("acct-123"); expect(call?.params?.accountId).toBe("acct-123");
}); });
it("normalizes requesterOrigin for direct announce delivery", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-origin",
requesterSessionKey: "agent:main:main",
requesterOrigin: { channel: " whatsapp ", accountId: " acct-987 " },
requesterDisplayKey: "main",
task: "do thing",
timeoutMs: 1000,
cleanup: "keep",
waitForCompletion: false,
startedAt: 10,
endedAt: 20,
outcome: { status: "ok" },
});
expect(didAnnounce).toBe(true);
const call = agentSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.channel).toBe("whatsapp");
expect(call?.params?.accountId).toBe("acct-987");
});
it("splits collect-mode announces when accountId differs", async () => { it("splits collect-mode announces when accountId differs", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
@@ -237,7 +262,7 @@ describe("subagent announce formatting", () => {
childSessionKey: "agent:main:subagent:test", childSessionKey: "agent:main:subagent:test",
childRunId: "run-a", childRunId: "run-a",
requesterSessionKey: "main", requesterSessionKey: "main",
requesterAccountId: "acct-a", requesterOrigin: { accountId: "acct-a" },
requesterDisplayKey: "main", requesterDisplayKey: "main",
task: "do thing", task: "do thing",
timeoutMs: 1000, timeoutMs: 1000,
@@ -252,7 +277,7 @@ describe("subagent announce formatting", () => {
childSessionKey: "agent:main:subagent:test", childSessionKey: "agent:main:subagent:test",
childRunId: "run-b", childRunId: "run-b",
requesterSessionKey: "main", requesterSessionKey: "main",
requesterAccountId: "acct-b", requesterOrigin: { accountId: "acct-b" },
requesterDisplayKey: "main", requesterDisplayKey: "main",
task: "do thing", task: "do thing",
timeoutMs: 1000, timeoutMs: 1000,

View File

@@ -16,7 +16,12 @@ import {
} from "../auto-reply/reply/queue.js"; } from "../auto-reply/reply/queue.js";
import { callGateway } from "../gateway/call.js"; import { callGateway } from "../gateway/call.js";
import { defaultRuntime } from "../runtime.js"; import { defaultRuntime } from "../runtime.js";
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js"; import {
type DeliveryContext,
deliveryContextKey,
mergeDeliveryContext,
normalizeDeliveryContext,
} from "../utils/delivery-context.js";
import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded.js"; import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded.js";
import { readLatestAssistantReply } from "./tools/agent-step.js"; import { readLatestAssistantReply } from "./tools/agent-step.js";
@@ -224,18 +229,17 @@ function hasCrossChannelItems(items: AnnounceQueueItem[]): boolean {
const keys = new Set<string>(); const keys = new Set<string>();
let hasUnkeyed = false; let hasUnkeyed = false;
for (const item of items) { for (const item of items) {
const origin = item.origin; const origin = normalizeDeliveryContext(item.origin);
const channel = origin?.channel; if (!origin) {
const to = origin?.to;
const accountId = origin?.accountId;
if (!channel && !to && !accountId) {
hasUnkeyed = true; hasUnkeyed = true;
continue; continue;
} }
if (!channel || !to) { if (!origin.channel || !origin.to) {
return true; return true;
} }
keys.add([channel, to, accountId || ""].join("|")); const key = deliveryContextKey(origin);
if (!key) return true;
keys.add(key);
} }
if (keys.size === 0) return false; if (keys.size === 0) return false;
if (hasUnkeyed) return true; if (hasUnkeyed) return true;
@@ -348,24 +352,11 @@ function loadRequesterSessionEntry(requesterSessionKey: string) {
return { cfg, entry, canonicalKey }; return { cfg, entry, canonicalKey };
} }
function resolveAnnounceOrigin(params: {
channel?: string;
to?: string;
accountId?: string;
fallbackAccountId?: string;
}) {
return normalizeDeliveryContext({
channel: params.channel,
to: params.to,
accountId: params.accountId ?? params.fallbackAccountId,
});
}
async function maybeQueueSubagentAnnounce(params: { async function maybeQueueSubagentAnnounce(params: {
requesterSessionKey: string; requesterSessionKey: string;
triggerMessage: string; triggerMessage: string;
summaryLine?: string; summaryLine?: string;
requesterAccountId?: string; requesterOrigin?: DeliveryContext;
}): Promise<"steered" | "queued" | "none"> { }): Promise<"steered" | "queued" | "none"> {
const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey); const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey);
const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey); const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey);
@@ -391,12 +382,14 @@ async function maybeQueueSubagentAnnounce(params: {
queueSettings.mode === "steer-backlog" || queueSettings.mode === "steer-backlog" ||
queueSettings.mode === "interrupt"; queueSettings.mode === "interrupt";
if (isActive && (shouldFollowup || queueSettings.mode === "steer")) { if (isActive && (shouldFollowup || queueSettings.mode === "steer")) {
const origin = resolveAnnounceOrigin({ const origin = mergeDeliveryContext(
channel: entry?.lastChannel, {
to: entry?.lastTo, channel: entry?.lastChannel,
accountId: entry?.lastAccountId, to: entry?.lastTo,
fallbackAccountId: params.requesterAccountId, accountId: entry?.lastAccountId,
}); },
params.requesterOrigin,
);
enqueueAnnounce( enqueueAnnounce(
canonicalKey, canonicalKey,
{ {
@@ -469,7 +462,7 @@ async function buildSubagentStatsLine(params: {
export function buildSubagentSystemPrompt(params: { export function buildSubagentSystemPrompt(params: {
requesterSessionKey?: string; requesterSessionKey?: string;
requesterChannel?: string; requesterOrigin?: DeliveryContext;
childSessionKey: string; childSessionKey: string;
label?: string; label?: string;
task?: string; task?: string;
@@ -510,7 +503,9 @@ export function buildSubagentSystemPrompt(params: {
"## Session Context", "## Session Context",
params.label ? `- Label: ${params.label}` : undefined, params.label ? `- Label: ${params.label}` : undefined,
params.requesterSessionKey ? `- Requester session: ${params.requesterSessionKey}.` : undefined, params.requesterSessionKey ? `- Requester session: ${params.requesterSessionKey}.` : undefined,
params.requesterChannel ? `- Requester channel: ${params.requesterChannel}.` : undefined, params.requesterOrigin?.channel
? `- Requester channel: ${params.requesterOrigin.channel}.`
: undefined,
`- Your session: ${params.childSessionKey}.`, `- Your session: ${params.childSessionKey}.`,
"", "",
].filter((line): line is string => line !== undefined); ].filter((line): line is string => line !== undefined);
@@ -526,8 +521,7 @@ export async function runSubagentAnnounceFlow(params: {
childSessionKey: string; childSessionKey: string;
childRunId: string; childRunId: string;
requesterSessionKey: string; requesterSessionKey: string;
requesterChannel?: string; requesterOrigin?: DeliveryContext;
requesterAccountId?: string;
requesterDisplayKey: string; requesterDisplayKey: string;
task: string; task: string;
timeoutMs: number; timeoutMs: number;
@@ -541,6 +535,7 @@ export async function runSubagentAnnounceFlow(params: {
}): Promise<boolean> { }): Promise<boolean> {
let didAnnounce = false; let didAnnounce = false;
try { try {
const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
let reply = params.roundOneReply; let reply = params.roundOneReply;
let outcome: SubagentRunOutcome | undefined = params.outcome; let outcome: SubagentRunOutcome | undefined = params.outcome;
if (!reply && params.waitForCompletion !== false) { if (!reply && params.waitForCompletion !== false) {
@@ -623,7 +618,7 @@ export async function runSubagentAnnounceFlow(params: {
requesterSessionKey: params.requesterSessionKey, requesterSessionKey: params.requesterSessionKey,
triggerMessage, triggerMessage,
summaryLine: taskLabel, summaryLine: taskLabel,
requesterAccountId: params.requesterAccountId, requesterOrigin,
}); });
if (queued === "steered") { if (queued === "steered") {
didAnnounce = true; didAnnounce = true;
@@ -635,10 +630,15 @@ export async function runSubagentAnnounceFlow(params: {
} }
// Send to main agent - it will respond in its own voice // Send to main agent - it will respond in its own voice
const directOrigin = resolveAnnounceOrigin({ let directOrigin = requesterOrigin;
channel: params.requesterChannel, if (!directOrigin) {
accountId: params.requesterAccountId, const { entry } = loadRequesterSessionEntry(params.requesterSessionKey);
}); directOrigin = normalizeDeliveryContext({
channel: entry?.lastChannel ?? entry?.channel,
to: entry?.lastTo,
accountId: entry?.lastAccountId,
});
}
await callGateway({ await callGateway({
method: "agent", method: "agent",
params: { params: {

View File

@@ -52,7 +52,7 @@ describe("subagent registry persistence", () => {
runId: "run-1", runId: "run-1",
childSessionKey: "agent:main:subagent:test", childSessionKey: "agent:main:subagent:test",
requesterSessionKey: "agent:main:main", requesterSessionKey: "agent:main:main",
requesterAccountId: "acct-main", requesterOrigin: { channel: " whatsapp ", accountId: " acct-main " },
requesterDisplayKey: "main", requesterDisplayKey: "main",
task: "do the thing", task: "do the thing",
cleanup: "keep", cleanup: "keep",
@@ -62,8 +62,18 @@ describe("subagent registry persistence", () => {
const raw = await fs.readFile(registryPath, "utf8"); const raw = await fs.readFile(registryPath, "utf8");
const parsed = JSON.parse(raw) as { runs?: Record<string, unknown> }; const parsed = JSON.parse(raw) as { runs?: Record<string, unknown> };
expect(parsed.runs && Object.keys(parsed.runs)).toContain("run-1"); expect(parsed.runs && Object.keys(parsed.runs)).toContain("run-1");
const run = parsed.runs?.["run-1"] as { requesterAccountId?: string } | undefined; const run = parsed.runs?.["run-1"] as
expect(run?.requesterAccountId).toBe("acct-main"); | {
requesterOrigin?: { channel?: string; accountId?: string };
}
| undefined;
expect(run).toBeDefined();
if (run) {
expect("requesterAccountId" in run).toBe(false);
expect("requesterChannel" in run).toBe(false);
}
expect(run?.requesterOrigin?.channel).toBe("whatsapp");
expect(run?.requesterOrigin?.accountId).toBe("acct-main");
// Simulate a process restart: module re-import should load persisted runs // Simulate a process restart: module re-import should load persisted runs
// and trigger the announce flow once the run resolves. // and trigger the announce flow once the run resolves.
@@ -80,17 +90,18 @@ describe("subagent registry persistence", () => {
childSessionKey: string; childSessionKey: string;
childRunId: string; childRunId: string;
requesterSessionKey: string; requesterSessionKey: string;
requesterAccountId?: string; requesterOrigin?: { channel?: string; accountId?: string };
task: string; task: string;
cleanup: string; cleanup: string;
label?: string; label?: string;
}; };
const first = announceSpy.mock.calls[0]?.[0] as unknown as AnnounceParams; const first = announceSpy.mock.calls[0]?.[0] as unknown as AnnounceParams;
expect(first.childSessionKey).toBe("agent:main:subagent:test"); expect(first.childSessionKey).toBe("agent:main:subagent:test");
expect(first.requesterAccountId).toBe("acct-main"); expect(first.requesterOrigin?.channel).toBe("whatsapp");
expect(first.requesterOrigin?.accountId).toBe("acct-main");
}); });
it("skips cleanup when cleanupHandled/announceHandled was persisted", async () => { it("skips cleanup when cleanupHandled was persisted", async () => {
tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-subagent-")); tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-subagent-"));
process.env.CLAWDBOT_STATE_DIR = tempStateDir; process.env.CLAWDBOT_STATE_DIR = tempStateDir;
@@ -130,6 +141,44 @@ describe("subagent registry persistence", () => {
expect(match).toBeFalsy(); expect(match).toBeFalsy();
}); });
it("maps legacy announce fields into cleanup state", async () => {
tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-subagent-"));
process.env.CLAWDBOT_STATE_DIR = tempStateDir;
const registryPath = path.join(tempStateDir, "subagents", "runs.json");
const persisted = {
version: 1,
runs: {
"run-legacy": {
runId: "run-legacy",
childSessionKey: "agent:main:subagent:legacy",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "legacy announce",
cleanup: "keep",
createdAt: 1,
startedAt: 1,
endedAt: 2,
announceCompletedAt: 9,
announceHandled: true,
requesterChannel: "whatsapp",
requesterAccountId: "legacy-account",
},
},
};
await fs.mkdir(path.dirname(registryPath), { recursive: true });
await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8");
vi.resetModules();
const { loadSubagentRegistryFromDisk } = await import("./subagent-registry.store.js");
const runs = loadSubagentRegistryFromDisk();
const entry = runs.get("run-legacy");
expect(entry?.cleanupHandled).toBe(true);
expect(entry?.cleanupCompletedAt).toBe(9);
expect(entry?.requesterOrigin?.channel).toBe("whatsapp");
expect(entry?.requesterOrigin?.accountId).toBe("legacy-account");
});
it("retries cleanup announce after a failed announce", async () => { it("retries cleanup announce after a failed announce", async () => {
tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-subagent-")); tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-subagent-"));
process.env.CLAWDBOT_STATE_DIR = tempStateDir; process.env.CLAWDBOT_STATE_DIR = tempStateDir;

View File

@@ -2,6 +2,7 @@ import path from "node:path";
import { STATE_DIR_CLAWDBOT } from "../config/paths.js"; import { STATE_DIR_CLAWDBOT } from "../config/paths.js";
import { loadJsonFile, saveJsonFile } from "../infra/json-file.js"; import { loadJsonFile, saveJsonFile } from "../infra/json-file.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import type { SubagentRunRecord } from "./subagent-registry.js"; import type { SubagentRunRecord } from "./subagent-registry.js";
export type PersistedSubagentRegistryVersion = 1; export type PersistedSubagentRegistryVersion = 1;
@@ -13,8 +14,13 @@ type PersistedSubagentRegistry = {
const REGISTRY_VERSION = 1 as const; const REGISTRY_VERSION = 1 as const;
type PersistedSubagentRunRecord = Omit<SubagentRunRecord, "announceHandled"> & { type PersistedSubagentRunRecord = SubagentRunRecord;
announceHandled?: boolean;
type LegacySubagentRunRecord = PersistedSubagentRunRecord & {
announceCompletedAt?: unknown;
announceHandled?: unknown;
requesterChannel?: unknown;
requesterAccountId?: unknown;
}; };
export function resolveSubagentRegistryPath(): string { export function resolveSubagentRegistryPath(): string {
@@ -32,25 +38,33 @@ export function loadSubagentRegistryFromDisk(): Map<string, SubagentRunRecord> {
const out = new Map<string, SubagentRunRecord>(); const out = new Map<string, SubagentRunRecord>();
for (const [runId, entry] of Object.entries(runsRaw)) { for (const [runId, entry] of Object.entries(runsRaw)) {
if (!entry || typeof entry !== "object") continue; if (!entry || typeof entry !== "object") continue;
const typed = entry as PersistedSubagentRunRecord; const typed = entry as LegacySubagentRunRecord;
if (!typed.runId || typeof typed.runId !== "string") continue; if (!typed.runId || typeof typed.runId !== "string") continue;
// Back-compat: map legacy announce fields into cleanup fields. const legacyCompletedAt =
const announceCompletedAt =
typeof typed.announceCompletedAt === "number" ? typed.announceCompletedAt : undefined; typeof typed.announceCompletedAt === "number" ? typed.announceCompletedAt : undefined;
const cleanupCompletedAt = const cleanupCompletedAt =
typeof typed.cleanupCompletedAt === "number" ? typed.cleanupCompletedAt : announceCompletedAt; typeof typed.cleanupCompletedAt === "number" ? typed.cleanupCompletedAt : legacyCompletedAt;
const cleanupHandled = const cleanupHandled =
typeof typed.cleanupHandled === "boolean" typeof typed.cleanupHandled === "boolean"
? typed.cleanupHandled ? typed.cleanupHandled
: Boolean(typed.announceHandled ?? announceCompletedAt ?? cleanupCompletedAt); : Boolean(typed.announceHandled ?? cleanupCompletedAt);
const announceHandled = const requesterOrigin = normalizeDeliveryContext(
typeof typed.announceHandled === "boolean" typed.requesterOrigin ?? {
? typed.announceHandled channel: typeof typed.requesterChannel === "string" ? typed.requesterChannel : undefined,
: Boolean(announceCompletedAt); accountId:
typeof typed.requesterAccountId === "string" ? typed.requesterAccountId : undefined,
},
);
const {
announceCompletedAt: _announceCompletedAt,
announceHandled: _announceHandled,
requesterChannel: _channel,
requesterAccountId: _accountId,
...rest
} = typed;
out.set(runId, { out.set(runId, {
...typed, ...rest,
announceCompletedAt, requesterOrigin,
announceHandled,
cleanupCompletedAt, cleanupCompletedAt,
cleanupHandled, cleanupHandled,
}); });
@@ -62,8 +76,7 @@ export function saveSubagentRegistryToDisk(runs: Map<string, SubagentRunRecord>)
const pathname = resolveSubagentRegistryPath(); const pathname = resolveSubagentRegistryPath();
const serialized: Record<string, PersistedSubagentRunRecord> = {}; const serialized: Record<string, PersistedSubagentRunRecord> = {};
for (const [runId, entry] of runs.entries()) { for (const [runId, entry] of runs.entries()) {
const { announceHandled: _ignored, ...persisted } = entry; serialized[runId] = entry;
serialized[runId] = persisted;
} }
const out: PersistedSubagentRegistry = { const out: PersistedSubagentRegistry = {
version: REGISTRY_VERSION, version: REGISTRY_VERSION,

View File

@@ -1,6 +1,10 @@
import { loadConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js";
import { callGateway } from "../gateway/call.js"; import { callGateway } from "../gateway/call.js";
import { onAgentEvent } from "../infra/agent-events.js"; import { onAgentEvent } from "../infra/agent-events.js";
import {
type DeliveryContext,
normalizeDeliveryContext,
} from "../utils/delivery-context.js";
import { runSubagentAnnounceFlow, type SubagentRunOutcome } from "./subagent-announce.js"; import { runSubagentAnnounceFlow, type SubagentRunOutcome } from "./subagent-announce.js";
import { import {
loadSubagentRegistryFromDisk, loadSubagentRegistryFromDisk,
@@ -12,8 +16,7 @@ export type SubagentRunRecord = {
runId: string; runId: string;
childSessionKey: string; childSessionKey: string;
requesterSessionKey: string; requesterSessionKey: string;
requesterChannel?: string; requesterOrigin?: DeliveryContext;
requesterAccountId?: string;
requesterDisplayKey: string; requesterDisplayKey: string;
task: string; task: string;
cleanup: "delete" | "keep"; cleanup: "delete" | "keep";
@@ -23,10 +26,6 @@ export type SubagentRunRecord = {
endedAt?: number; endedAt?: number;
outcome?: SubagentRunOutcome; outcome?: SubagentRunOutcome;
archiveAtMs?: number; archiveAtMs?: number;
/** @deprecated Use cleanupCompletedAt instead */
announceCompletedAt?: number;
/** @deprecated Use cleanupHandled instead */
announceHandled?: boolean;
cleanupCompletedAt?: number; cleanupCompletedAt?: number;
cleanupHandled?: boolean; cleanupHandled?: boolean;
}; };
@@ -55,12 +54,12 @@ function resumeSubagentRun(runId: string) {
if (typeof entry.endedAt === "number" && entry.endedAt > 0) { if (typeof entry.endedAt === "number" && entry.endedAt > 0) {
if (!beginSubagentCleanup(runId)) return; if (!beginSubagentCleanup(runId)) return;
const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin);
void runSubagentAnnounceFlow({ void runSubagentAnnounceFlow({
childSessionKey: entry.childSessionKey, childSessionKey: entry.childSessionKey,
childRunId: entry.runId, childRunId: entry.runId,
requesterSessionKey: entry.requesterSessionKey, requesterSessionKey: entry.requesterSessionKey,
requesterChannel: entry.requesterChannel, requesterOrigin,
requesterAccountId: entry.requesterAccountId,
requesterDisplayKey: entry.requesterDisplayKey, requesterDisplayKey: entry.requesterDisplayKey,
task: entry.task, task: entry.task,
timeoutMs: 30_000, timeoutMs: 30_000,
@@ -196,12 +195,12 @@ function ensureListener() {
if (!beginSubagentCleanup(evt.runId)) { if (!beginSubagentCleanup(evt.runId)) {
return; return;
} }
const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin);
void runSubagentAnnounceFlow({ void runSubagentAnnounceFlow({
childSessionKey: entry.childSessionKey, childSessionKey: entry.childSessionKey,
childRunId: entry.runId, childRunId: entry.runId,
requesterSessionKey: entry.requesterSessionKey, requesterSessionKey: entry.requesterSessionKey,
requesterChannel: entry.requesterChannel, requesterOrigin,
requesterAccountId: entry.requesterAccountId,
requesterDisplayKey: entry.requesterDisplayKey, requesterDisplayKey: entry.requesterDisplayKey,
task: entry.task, task: entry.task,
timeoutMs: 30_000, timeoutMs: 30_000,
@@ -238,9 +237,8 @@ function finalizeSubagentCleanup(runId: string, cleanup: "delete" | "keep", didA
function beginSubagentCleanup(runId: string) { function beginSubagentCleanup(runId: string) {
const entry = subagentRuns.get(runId); const entry = subagentRuns.get(runId);
if (!entry) return false; if (!entry) return false;
// Support legacy field names for backward compatibility if (entry.cleanupCompletedAt) return false;
if (entry.cleanupCompletedAt || entry.announceCompletedAt) return false; if (entry.cleanupHandled) return false;
if (entry.cleanupHandled || entry.announceHandled) return false;
entry.cleanupHandled = true; entry.cleanupHandled = true;
persistSubagentRuns(); persistSubagentRuns();
return true; return true;
@@ -250,8 +248,7 @@ export function registerSubagentRun(params: {
runId: string; runId: string;
childSessionKey: string; childSessionKey: string;
requesterSessionKey: string; requesterSessionKey: string;
requesterChannel?: string; requesterOrigin?: DeliveryContext;
requesterAccountId?: string;
requesterDisplayKey: string; requesterDisplayKey: string;
task: string; task: string;
cleanup: "delete" | "keep"; cleanup: "delete" | "keep";
@@ -263,12 +260,12 @@ export function registerSubagentRun(params: {
const archiveAfterMs = resolveArchiveAfterMs(cfg); const archiveAfterMs = resolveArchiveAfterMs(cfg);
const archiveAtMs = archiveAfterMs ? now + archiveAfterMs : undefined; const archiveAtMs = archiveAfterMs ? now + archiveAfterMs : undefined;
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, params.runTimeoutSeconds); const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, params.runTimeoutSeconds);
const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
subagentRuns.set(params.runId, { subagentRuns.set(params.runId, {
runId: params.runId, runId: params.runId,
childSessionKey: params.childSessionKey, childSessionKey: params.childSessionKey,
requesterSessionKey: params.requesterSessionKey, requesterSessionKey: params.requesterSessionKey,
requesterChannel: params.requesterChannel, requesterOrigin,
requesterAccountId: params.requesterAccountId,
requesterDisplayKey: params.requesterDisplayKey, requesterDisplayKey: params.requesterDisplayKey,
task: params.task, task: params.task,
cleanup: params.cleanup, cleanup: params.cleanup,
@@ -318,12 +315,12 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
mutated = true; mutated = true;
if (mutated) persistSubagentRuns(); if (mutated) persistSubagentRuns();
if (!beginSubagentCleanup(runId)) return; if (!beginSubagentCleanup(runId)) return;
const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin);
void runSubagentAnnounceFlow({ void runSubagentAnnounceFlow({
childSessionKey: entry.childSessionKey, childSessionKey: entry.childSessionKey,
childRunId: entry.runId, childRunId: entry.runId,
requesterSessionKey: entry.requesterSessionKey, requesterSessionKey: entry.requesterSessionKey,
requesterChannel: entry.requesterChannel, requesterOrigin,
requesterAccountId: entry.requesterAccountId,
requesterDisplayKey: entry.requesterDisplayKey, requesterDisplayKey: entry.requesterDisplayKey,
task: entry.task, task: entry.task,
timeoutMs: 30_000, timeoutMs: 30_000,

View File

@@ -9,6 +9,7 @@ import {
normalizeAgentId, normalizeAgentId,
parseAgentSessionKey, parseAgentSessionKey,
} from "../../routing/session-key.js"; } from "../../routing/session-key.js";
import { normalizeDeliveryContext } from "../../utils/delivery-context.js";
import type { GatewayMessageChannel } from "../../utils/message-channel.js"; import type { GatewayMessageChannel } from "../../utils/message-channel.js";
import { resolveAgentConfig } from "../agent-scope.js"; import { resolveAgentConfig } from "../agent-scope.js";
import { AGENT_LANE_SUBAGENT } from "../lanes.js"; import { AGENT_LANE_SUBAGENT } from "../lanes.js";
@@ -67,6 +68,10 @@ export function createSessionsSpawnTool(opts?: {
params.cleanup === "keep" || params.cleanup === "delete" params.cleanup === "keep" || params.cleanup === "delete"
? (params.cleanup as "keep" | "delete") ? (params.cleanup as "keep" | "delete")
: "keep"; : "keep";
const requesterOrigin = normalizeDeliveryContext({
channel: opts?.agentChannel,
accountId: opts?.agentAccountId,
});
const runTimeoutSeconds = (() => { const runTimeoutSeconds = (() => {
const explicit = const explicit =
typeof params.runTimeoutSeconds === "number" && Number.isFinite(params.runTimeoutSeconds) typeof params.runTimeoutSeconds === "number" && Number.isFinite(params.runTimeoutSeconds)
@@ -163,7 +168,7 @@ export function createSessionsSpawnTool(opts?: {
} }
const childSystemPrompt = buildSubagentSystemPrompt({ const childSystemPrompt = buildSubagentSystemPrompt({
requesterSessionKey, requesterSessionKey,
requesterChannel: opts?.agentChannel, requesterOrigin,
childSessionKey, childSessionKey,
label: label || undefined, label: label || undefined,
task, task,
@@ -177,7 +182,7 @@ export function createSessionsSpawnTool(opts?: {
params: { params: {
message: task, message: task,
sessionKey: childSessionKey, sessionKey: childSessionKey,
channel: opts?.agentChannel, channel: requesterOrigin?.channel,
idempotencyKey: childIdem, idempotencyKey: childIdem,
deliver: false, deliver: false,
lane: AGENT_LANE_SUBAGENT, lane: AGENT_LANE_SUBAGENT,
@@ -206,8 +211,7 @@ export function createSessionsSpawnTool(opts?: {
runId: childRunId, runId: childRunId,
childSessionKey, childSessionKey,
requesterSessionKey: requesterInternalKey, requesterSessionKey: requesterInternalKey,
requesterChannel: opts?.agentChannel, requesterOrigin,
requesterAccountId: opts?.agentAccountId,
requesterDisplayKey, requesterDisplayKey,
task, task,
cleanup, cleanup,

View File

@@ -101,4 +101,44 @@ describe("deliverAgentCommandResult", () => {
expect.objectContaining({ accountId: "legacy" }), expect.objectContaining({ accountId: "legacy" }),
); );
}); });
it("does not infer accountId for explicit delivery targets", async () => {
const cfg = {} as ClawdbotConfig;
const deps = {} as CliDeps;
const runtime = {
log: vi.fn(),
error: vi.fn(),
} as unknown as RuntimeEnv;
const sessionEntry = {
lastAccountId: "legacy",
} as SessionEntry;
const result = {
payloads: [{ text: "hi" }],
meta: {},
};
const { deliverAgentCommandResult } = await import("./agent/delivery.js");
await deliverAgentCommandResult({
cfg,
deps,
runtime,
opts: {
message: "hello",
deliver: true,
channel: "whatsapp",
to: "+15551234567",
deliveryTargetMode: "explicit",
},
sessionEntry,
result,
payloads: result.payloads,
});
expect(mocks.resolveOutboundTarget).toHaveBeenCalledWith(
expect.objectContaining({ accountId: undefined, mode: "explicit" }),
);
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({ accountId: undefined }),
);
});
}); });

View File

@@ -26,6 +26,19 @@ type RunResult = Awaited<
ReturnType<(typeof import("../../agents/pi-embedded.js"))["runEmbeddedPiAgent"]> ReturnType<(typeof import("../../agents/pi-embedded.js"))["runEmbeddedPiAgent"]>
>; >;
function resolveDeliveryAccountId(params: {
opts: AgentCommandOpts;
sessionEntry?: SessionEntry;
targetMode: ChannelOutboundTargetMode;
}) {
return (
normalizeAccountId(params.opts.accountId) ??
(params.targetMode === "implicit"
? normalizeAccountId(params.sessionEntry?.lastAccountId)
: undefined)
);
}
export async function deliverAgentCommandResult(params: { export async function deliverAgentCommandResult(params: {
cfg: ClawdbotConfig; cfg: ClawdbotConfig;
deps: CliDeps; deps: CliDeps;
@@ -49,9 +62,7 @@ export async function deliverAgentCommandResult(params: {
const targetMode: ChannelOutboundTargetMode = const targetMode: ChannelOutboundTargetMode =
opts.deliveryTargetMode ?? (opts.to ? "explicit" : "implicit"); opts.deliveryTargetMode ?? (opts.to ? "explicit" : "implicit");
const resolvedAccountId = const resolvedAccountId = resolveDeliveryAccountId({ opts, sessionEntry, targetMode });
normalizeAccountId(opts.accountId) ??
(targetMode === "implicit" ? normalizeAccountId(sessionEntry?.lastAccountId) : undefined);
const resolvedTarget = const resolvedTarget =
deliver && isDeliveryChannelKnown && deliveryChannel deliver && isDeliveryChannelKnown && deliveryChannel
? resolveOutboundTarget({ ? resolveOutboundTarget({

View File

@@ -0,0 +1,45 @@
import { describe, expect, it } from "vitest";
import {
deliveryContextKey,
mergeDeliveryContext,
normalizeDeliveryContext,
} from "./delivery-context.js";
describe("delivery context helpers", () => {
it("normalizes channel/to/accountId and drops empty contexts", () => {
expect(
normalizeDeliveryContext({
channel: " whatsapp ",
to: " +1555 ",
accountId: " acct-1 ",
}),
).toEqual({
channel: "whatsapp",
to: "+1555",
accountId: "acct-1",
});
expect(normalizeDeliveryContext({ channel: " " })).toBeUndefined();
});
it("merges primary values over fallback", () => {
const merged = mergeDeliveryContext(
{ channel: "whatsapp", to: "channel:abc" },
{ channel: "slack", to: "channel:def", accountId: "acct" },
);
expect(merged).toEqual({
channel: "whatsapp",
to: "channel:abc",
accountId: "acct",
});
});
it("builds stable keys only when channel and to are present", () => {
expect(deliveryContextKey({ channel: "whatsapp", to: "+1555" })).toBe(
"whatsapp|+1555|",
);
expect(deliveryContextKey({ channel: "whatsapp" })).toBeUndefined();
});
});

View File

@@ -18,3 +18,23 @@ export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryCon
accountId, accountId,
}; };
} }
export function mergeDeliveryContext(
primary?: DeliveryContext,
fallback?: DeliveryContext,
): DeliveryContext | undefined {
const normalizedPrimary = normalizeDeliveryContext(primary);
const normalizedFallback = normalizeDeliveryContext(fallback);
if (!normalizedPrimary && !normalizedFallback) return undefined;
return normalizeDeliveryContext({
channel: normalizedPrimary?.channel ?? normalizedFallback?.channel,
to: normalizedPrimary?.to ?? normalizedFallback?.to,
accountId: normalizedPrimary?.accountId ?? normalizedFallback?.accountId,
});
}
export function deliveryContextKey(context?: DeliveryContext): string | undefined {
const normalized = normalizeDeliveryContext(context);
if (!normalized?.channel || !normalized?.to) return undefined;
return `${normalized.channel}|${normalized.to}|${normalized.accountId ?? ""}`;
}