fix(cron): auto-deliver agent output to explicit targets
This commit is contained in:
@@ -80,7 +80,11 @@ export function registerCronAddCommand(cron: Command) {
|
||||
.option("--thinking <level>", "Thinking level for agent jobs (off|minimal|low|medium|high)")
|
||||
.option("--model <model>", "Model override for agent jobs (provider/model or alias)")
|
||||
.option("--timeout-seconds <n>", "Timeout seconds for agent jobs")
|
||||
.option("--deliver", "Deliver agent output", false)
|
||||
.option(
|
||||
"--deliver",
|
||||
"Deliver agent output (required when using last-route delivery without --to)",
|
||||
false,
|
||||
)
|
||||
.option("--channel <channel>", `Delivery channel (${getCronChannelOptions()})`, "last")
|
||||
.option(
|
||||
"--to <dest>",
|
||||
@@ -159,10 +163,10 @@ export function registerCronAddCommand(cron: Command) {
|
||||
: undefined,
|
||||
timeoutSeconds:
|
||||
timeoutSeconds && Number.isFinite(timeoutSeconds) ? timeoutSeconds : undefined,
|
||||
deliver: Boolean(opts.deliver),
|
||||
deliver: opts.deliver ? true : undefined,
|
||||
channel: typeof opts.channel === "string" ? opts.channel : "last",
|
||||
to: typeof opts.to === "string" && opts.to.trim() ? opts.to.trim() : undefined,
|
||||
bestEffortDeliver: Boolean(opts.bestEffortDeliver),
|
||||
bestEffortDeliver: opts.bestEffortDeliver ? true : undefined,
|
||||
};
|
||||
})();
|
||||
|
||||
|
||||
@@ -35,7 +35,11 @@ export function registerCronEditCommand(cron: Command) {
|
||||
.option("--thinking <level>", "Thinking level for agent jobs")
|
||||
.option("--model <model>", "Model override for agent jobs")
|
||||
.option("--timeout-seconds <n>", "Timeout seconds for agent jobs")
|
||||
.option("--deliver", "Deliver agent output", false)
|
||||
.option(
|
||||
"--deliver",
|
||||
"Deliver agent output (required when using last-route delivery without --to)",
|
||||
false,
|
||||
)
|
||||
.option("--channel <channel>", `Delivery channel (${getCronChannelOptions()})`)
|
||||
.option(
|
||||
"--to <dest>",
|
||||
@@ -125,10 +129,10 @@ export function registerCronEditCommand(cron: Command) {
|
||||
thinking,
|
||||
timeoutSeconds:
|
||||
timeoutSeconds && Number.isFinite(timeoutSeconds) ? timeoutSeconds : undefined,
|
||||
deliver: Boolean(opts.deliver),
|
||||
deliver: opts.deliver ? true : undefined,
|
||||
channel: typeof opts.channel === "string" ? opts.channel : undefined,
|
||||
to: typeof opts.to === "string" ? opts.to : undefined,
|
||||
bestEffortDeliver: Boolean(opts.bestEffortDeliver),
|
||||
bestEffortDeliver: opts.bestEffortDeliver ? true : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -203,6 +203,116 @@ describe("runCronIsolatedAgentTurn", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("auto-delivers when explicit target is set without deliver flag", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const storePath = await writeSessionStore(home);
|
||||
const deps: CliDeps = {
|
||||
sendMessageWhatsApp: vi.fn(),
|
||||
sendMessageTelegram: vi.fn().mockResolvedValue({
|
||||
messageId: "t1",
|
||||
chatId: "123",
|
||||
}),
|
||||
sendMessageDiscord: vi.fn(),
|
||||
sendMessageSignal: vi.fn(),
|
||||
sendMessageIMessage: vi.fn(),
|
||||
};
|
||||
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
|
||||
payloads: [{ text: "hello from cron" }],
|
||||
meta: {
|
||||
durationMs: 5,
|
||||
agentMeta: { sessionId: "s", provider: "p", model: "m" },
|
||||
},
|
||||
});
|
||||
|
||||
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
|
||||
process.env.TELEGRAM_BOT_TOKEN = "";
|
||||
try {
|
||||
const res = await runCronIsolatedAgentTurn({
|
||||
cfg: makeCfg(home, storePath, {
|
||||
channels: { telegram: { botToken: "t-1" } },
|
||||
}),
|
||||
deps,
|
||||
job: makeJob({
|
||||
kind: "agentTurn",
|
||||
message: "do it",
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
}),
|
||||
message: "do it",
|
||||
sessionKey: "cron:job-1",
|
||||
lane: "cron",
|
||||
});
|
||||
|
||||
expect(res.status).toBe("ok");
|
||||
expect(deps.sendMessageTelegram).toHaveBeenCalledWith(
|
||||
"123",
|
||||
"hello from cron",
|
||||
expect.objectContaining({ verbose: false }),
|
||||
);
|
||||
} finally {
|
||||
if (prevTelegramToken === undefined) {
|
||||
delete process.env.TELEGRAM_BOT_TOKEN;
|
||||
} else {
|
||||
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it("skips auto-delivery when messaging tool already sent to the target", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const storePath = await writeSessionStore(home);
|
||||
const deps: CliDeps = {
|
||||
sendMessageWhatsApp: vi.fn(),
|
||||
sendMessageTelegram: vi.fn().mockResolvedValue({
|
||||
messageId: "t1",
|
||||
chatId: "123",
|
||||
}),
|
||||
sendMessageDiscord: vi.fn(),
|
||||
sendMessageSignal: vi.fn(),
|
||||
sendMessageIMessage: vi.fn(),
|
||||
};
|
||||
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
|
||||
payloads: [{ text: "sent" }],
|
||||
meta: {
|
||||
durationMs: 5,
|
||||
agentMeta: { sessionId: "s", provider: "p", model: "m" },
|
||||
},
|
||||
didSendViaMessagingTool: true,
|
||||
messagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }],
|
||||
});
|
||||
|
||||
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
|
||||
process.env.TELEGRAM_BOT_TOKEN = "";
|
||||
try {
|
||||
const res = await runCronIsolatedAgentTurn({
|
||||
cfg: makeCfg(home, storePath, {
|
||||
channels: { telegram: { botToken: "t-1" } },
|
||||
}),
|
||||
deps,
|
||||
job: makeJob({
|
||||
kind: "agentTurn",
|
||||
message: "do it",
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
}),
|
||||
message: "do it",
|
||||
sessionKey: "cron:job-1",
|
||||
lane: "cron",
|
||||
});
|
||||
|
||||
expect(res.status).toBe("ok");
|
||||
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
if (prevTelegramToken === undefined) {
|
||||
delete process.env.TELEGRAM_BOT_TOKEN;
|
||||
} else {
|
||||
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it("delivers telegram topic targets via channel send", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const storePath = await writeSessionStore(home);
|
||||
|
||||
@@ -19,6 +19,7 @@ import {
|
||||
resolveThinkingDefault,
|
||||
} from "../../agents/model-selection.js";
|
||||
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
|
||||
import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.js";
|
||||
import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js";
|
||||
import { getSkillsSnapshotVersion } from "../../agents/skills/refresh.js";
|
||||
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
|
||||
@@ -49,6 +50,20 @@ import {
|
||||
} from "./helpers.js";
|
||||
import { resolveCronSession } from "./session.js";
|
||||
|
||||
function matchesMessagingToolDeliveryTarget(
|
||||
target: MessagingToolSend,
|
||||
delivery: { channel: string; to?: string; accountId?: string },
|
||||
): boolean {
|
||||
if (!delivery.to || !target.to) return false;
|
||||
const channel = delivery.channel.trim().toLowerCase();
|
||||
const provider = target.provider?.trim().toLowerCase();
|
||||
if (provider && provider !== "message" && provider !== channel) return false;
|
||||
if (target.accountId && delivery.accountId && target.accountId !== delivery.accountId) {
|
||||
return false;
|
||||
}
|
||||
return target.to === delivery.to;
|
||||
}
|
||||
|
||||
export type RunCronAgentTurnResult = {
|
||||
status: "ok" | "error" | "skipped";
|
||||
summary?: string;
|
||||
@@ -197,14 +212,17 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
params.job.payload.kind === "agentTurn" ? params.job.payload.timeoutSeconds : undefined,
|
||||
});
|
||||
|
||||
const delivery = params.job.payload.kind === "agentTurn" && params.job.payload.deliver === true;
|
||||
const bestEffortDeliver =
|
||||
params.job.payload.kind === "agentTurn" && params.job.payload.bestEffortDeliver === true;
|
||||
const agentPayload = params.job.payload.kind === "agentTurn" ? params.job.payload : null;
|
||||
const deliveryMode =
|
||||
agentPayload?.deliver === true ? "explicit" : agentPayload?.deliver === false ? "off" : "auto";
|
||||
const hasExplicitTarget = Boolean(agentPayload?.to && agentPayload.to.trim());
|
||||
const deliveryRequested =
|
||||
deliveryMode === "explicit" || (deliveryMode === "auto" && hasExplicitTarget);
|
||||
const bestEffortDeliver = agentPayload?.bestEffortDeliver === true;
|
||||
|
||||
const resolvedDelivery = await resolveDeliveryTarget(cfgWithAgentDefaults, agentId, {
|
||||
channel:
|
||||
params.job.payload.kind === "agentTurn" ? (params.job.payload.channel ?? "last") : "last",
|
||||
to: params.job.payload.kind === "agentTurn" ? params.job.payload.to : undefined,
|
||||
channel: agentPayload?.channel ?? "last",
|
||||
to: agentPayload?.to,
|
||||
});
|
||||
|
||||
const base = `[cron:${params.job.id} ${params.job.name}] ${params.message}`.trim();
|
||||
@@ -342,9 +360,20 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
|
||||
// Skip delivery for heartbeat-only responses (HEARTBEAT_OK with no real content).
|
||||
const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg);
|
||||
const skipHeartbeatDelivery = delivery && isHeartbeatOnlyResponse(payloads, ackMaxChars);
|
||||
const skipHeartbeatDelivery = deliveryRequested && isHeartbeatOnlyResponse(payloads, ackMaxChars);
|
||||
const skipMessagingToolDelivery =
|
||||
deliveryRequested &&
|
||||
deliveryMode === "auto" &&
|
||||
runResult.didSendViaMessagingTool === true &&
|
||||
(runResult.messagingToolSentTargets ?? []).some((target) =>
|
||||
matchesMessagingToolDeliveryTarget(target, {
|
||||
channel: resolvedDelivery.channel,
|
||||
to: resolvedDelivery.to,
|
||||
accountId: resolvedDelivery.accountId,
|
||||
}),
|
||||
);
|
||||
|
||||
if (delivery && !skipHeartbeatDelivery) {
|
||||
if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) {
|
||||
if (!resolvedDelivery.to) {
|
||||
const reason =
|
||||
resolvedDelivery.error?.message ?? "Cron delivery requires a recipient (--to).";
|
||||
|
||||
Reference in New Issue
Block a user