Files
clawdbot/src/infra/outbound/message-action-runner.ts

768 lines
22 KiB
TypeScript

import path from "node:path";
import { fileURLToPath } from "node:url";
import type { AgentToolResult } from "@mariozechner/pi-agent-core";
import {
readNumberParam,
readStringArrayParam,
readStringParam,
} from "../../agents/tools/common.js";
import { parseReplyDirectives } from "../../auto-reply/reply/reply-directives.js";
import { dispatchChannelMessageAction } from "../../channels/plugins/message-actions.js";
import type {
ChannelId,
ChannelMessageActionName,
ChannelThreadingToolContext,
} from "../../channels/plugins/types.js";
import type { ClawdbotConfig } from "../../config/config.js";
import {
isDeliverableMessageChannel,
normalizeMessageChannel,
type GatewayClientMode,
type GatewayClientName,
} from "../../utils/message-channel.js";
import {
listConfiguredMessageChannels,
resolveMessageChannelSelection,
} from "./channel-selection.js";
import { applyTargetToParams } from "./channel-target.js";
import type { OutboundSendDeps } from "./deliver.js";
import type { MessagePollResult, MessageSendResult } from "./message.js";
import {
applyCrossContextDecoration,
buildCrossContextDecoration,
type CrossContextDecoration,
enforceCrossContextPolicy,
shouldApplyCrossContextMarker,
} from "./outbound-policy.js";
import { executePollAction, executeSendAction } from "./outbound-send-service.js";
import { actionHasTarget, actionRequiresTarget } from "./message-action-spec.js";
import { resolveChannelTarget } from "./target-resolver.js";
import { loadWebMedia } from "../../web/media.js";
import { extensionForMime } from "../../media/mime.js";
export type MessageActionRunnerGateway = {
url?: string;
token?: string;
timeoutMs?: number;
clientName: GatewayClientName;
clientDisplayName?: string;
mode: GatewayClientMode;
};
export type RunMessageActionParams = {
cfg: ClawdbotConfig;
action: ChannelMessageActionName;
params: Record<string, unknown>;
defaultAccountId?: string;
toolContext?: ChannelThreadingToolContext;
gateway?: MessageActionRunnerGateway;
deps?: OutboundSendDeps;
sessionKey?: string;
agentId?: string;
dryRun?: boolean;
};
export type MessageActionRunResult =
| {
kind: "send";
channel: ChannelId;
action: "send";
to: string;
handledBy: "plugin" | "core";
payload: unknown;
toolResult?: AgentToolResult<unknown>;
sendResult?: MessageSendResult;
dryRun: boolean;
}
| {
kind: "broadcast";
channel: ChannelId;
action: "broadcast";
handledBy: "core" | "dry-run";
payload: {
results: Array<{
channel: ChannelId;
to: string;
ok: boolean;
error?: string;
result?: MessageSendResult;
}>;
};
dryRun: boolean;
}
| {
kind: "poll";
channel: ChannelId;
action: "poll";
to: string;
handledBy: "plugin" | "core";
payload: unknown;
toolResult?: AgentToolResult<unknown>;
pollResult?: MessagePollResult;
dryRun: boolean;
}
| {
kind: "action";
channel: ChannelId;
action: Exclude<ChannelMessageActionName, "send" | "poll">;
handledBy: "plugin" | "dry-run";
payload: unknown;
toolResult?: AgentToolResult<unknown>;
dryRun: boolean;
};
export function getToolResult(
result: MessageActionRunResult,
): AgentToolResult<unknown> | undefined {
return "toolResult" in result ? result.toolResult : undefined;
}
function extractToolPayload(result: AgentToolResult<unknown>): unknown {
if (result.details !== undefined) return result.details;
const textBlock = Array.isArray(result.content)
? result.content.find(
(block) =>
block &&
typeof block === "object" &&
(block as { type?: unknown }).type === "text" &&
typeof (block as { text?: unknown }).text === "string",
)
: undefined;
const text = (textBlock as { text?: string } | undefined)?.text;
if (text) {
try {
return JSON.parse(text);
} catch {
return text;
}
}
return result.content ?? result;
}
function applyCrossContextMessageDecoration({
params,
message,
decoration,
preferEmbeds,
}: {
params: Record<string, unknown>;
message: string;
decoration: CrossContextDecoration;
preferEmbeds: boolean;
}): string {
const applied = applyCrossContextDecoration({
message,
decoration,
preferEmbeds,
});
params.message = applied.message;
if (applied.embeds?.length) {
params.embeds = applied.embeds;
}
return applied.message;
}
async function maybeApplyCrossContextMarker(params: {
cfg: ClawdbotConfig;
channel: ChannelId;
action: ChannelMessageActionName;
target: string;
toolContext?: ChannelThreadingToolContext;
accountId?: string | null;
args: Record<string, unknown>;
message: string;
preferEmbeds: boolean;
}): Promise<string> {
if (!shouldApplyCrossContextMarker(params.action) || !params.toolContext) {
return params.message;
}
const decoration = await buildCrossContextDecoration({
cfg: params.cfg,
channel: params.channel,
target: params.target,
toolContext: params.toolContext,
accountId: params.accountId ?? undefined,
});
if (!decoration) return params.message;
return applyCrossContextMessageDecoration({
params: params.args,
message: params.message,
decoration,
preferEmbeds: params.preferEmbeds,
});
}
function readBooleanParam(params: Record<string, unknown>, key: string): boolean | undefined {
const raw = params[key];
if (typeof raw === "boolean") return raw;
if (typeof raw === "string") {
const trimmed = raw.trim().toLowerCase();
if (trimmed === "true") return true;
if (trimmed === "false") return false;
}
return undefined;
}
function resolveAttachmentMaxBytes(params: {
cfg: ClawdbotConfig;
channel: ChannelId;
accountId?: string | null;
}): number | undefined {
const fallback = params.cfg.agents?.defaults?.mediaMaxMb;
if (params.channel !== "bluebubbles") {
return typeof fallback === "number" ? fallback * 1024 * 1024 : undefined;
}
const accountId = typeof params.accountId === "string" ? params.accountId.trim() : "";
const channelCfg = params.cfg.channels?.bluebubbles;
const accountCfg = accountId ? channelCfg?.accounts?.[accountId] : undefined;
const limitMb =
accountCfg?.mediaMaxMb ?? channelCfg?.mediaMaxMb ?? params.cfg.agents?.defaults?.mediaMaxMb;
return typeof limitMb === "number" ? limitMb * 1024 * 1024 : undefined;
}
function inferAttachmentFilename(params: {
mediaHint?: string;
contentType?: string;
}): string | undefined {
const mediaHint = params.mediaHint?.trim();
if (mediaHint) {
try {
if (mediaHint.startsWith("file://")) {
const filePath = fileURLToPath(mediaHint);
const base = path.basename(filePath);
if (base) return base;
} else if (/^https?:\/\//i.test(mediaHint)) {
const url = new URL(mediaHint);
const base = path.basename(url.pathname);
if (base) return base;
} else {
const base = path.basename(mediaHint);
if (base) return base;
}
} catch {
// fall through to content-type based default
}
}
const ext = params.contentType ? extensionForMime(params.contentType) : undefined;
return ext ? `attachment${ext}` : "attachment";
}
function normalizeBase64Payload(params: { base64?: string; contentType?: string }): {
base64?: string;
contentType?: string;
} {
if (!params.base64) return { base64: params.base64, contentType: params.contentType };
const match = /^data:([^;]+);base64,(.*)$/i.exec(params.base64.trim());
if (!match) return { base64: params.base64, contentType: params.contentType };
const [, mime, payload] = match;
return {
base64: payload,
contentType: params.contentType ?? mime,
};
}
async function hydrateSendAttachmentParams(params: {
cfg: ClawdbotConfig;
channel: ChannelId;
accountId?: string | null;
args: Record<string, unknown>;
action: ChannelMessageActionName;
dryRun?: boolean;
}): Promise<void> {
if (params.action !== "sendAttachment") return;
const mediaHint = readStringParam(params.args, "media", { trim: false });
const fileHint =
readStringParam(params.args, "path", { trim: false }) ??
readStringParam(params.args, "filePath", { trim: false });
const contentTypeParam =
readStringParam(params.args, "contentType") ?? readStringParam(params.args, "mimeType");
const caption = readStringParam(params.args, "caption", { allowEmpty: true })?.trim();
const message = readStringParam(params.args, "message", { allowEmpty: true })?.trim();
if (!caption && message) params.args.caption = message;
const rawBuffer = readStringParam(params.args, "buffer", { trim: false });
const normalized = normalizeBase64Payload({
base64: rawBuffer,
contentType: contentTypeParam ?? undefined,
});
if (normalized.base64 !== rawBuffer && normalized.base64) {
params.args.buffer = normalized.base64;
if (normalized.contentType && !contentTypeParam) {
params.args.contentType = normalized.contentType;
}
}
const filename = readStringParam(params.args, "filename");
const mediaSource = mediaHint ?? fileHint;
if (!params.dryRun && !readStringParam(params.args, "buffer", { trim: false }) && mediaSource) {
const maxBytes = resolveAttachmentMaxBytes({
cfg: params.cfg,
channel: params.channel,
accountId: params.accountId,
});
const media = await loadWebMedia(mediaSource, maxBytes);
params.args.buffer = media.buffer.toString("base64");
if (!contentTypeParam && media.contentType) {
params.args.contentType = media.contentType;
}
if (!filename) {
params.args.filename = inferAttachmentFilename({
mediaHint: media.fileName ?? mediaSource,
contentType: media.contentType ?? contentTypeParam ?? undefined,
});
}
} else if (!filename) {
params.args.filename = inferAttachmentFilename({
mediaHint: mediaSource,
contentType: contentTypeParam ?? undefined,
});
}
}
function parseButtonsParam(params: Record<string, unknown>): void {
const raw = params.buttons;
if (typeof raw !== "string") return;
const trimmed = raw.trim();
if (!trimmed) {
delete params.buttons;
return;
}
try {
params.buttons = JSON.parse(trimmed) as unknown;
} catch {
throw new Error("--buttons must be valid JSON");
}
}
async function resolveChannel(cfg: ClawdbotConfig, params: Record<string, unknown>) {
const channelHint = readStringParam(params, "channel");
const selection = await resolveMessageChannelSelection({
cfg,
channel: channelHint,
});
return selection.channel;
}
async function resolveActionTarget(params: {
cfg: ClawdbotConfig;
channel: ChannelId;
action: ChannelMessageActionName;
args: Record<string, unknown>;
accountId?: string | null;
}): Promise<void> {
const toRaw = typeof params.args.to === "string" ? params.args.to.trim() : "";
if (toRaw) {
const resolved = await resolveChannelTarget({
cfg: params.cfg,
channel: params.channel,
input: toRaw,
accountId: params.accountId ?? undefined,
});
if (resolved.ok) {
params.args.to = resolved.target.to;
} else {
throw resolved.error;
}
}
const channelIdRaw =
typeof params.args.channelId === "string" ? params.args.channelId.trim() : "";
if (channelIdRaw) {
const resolved = await resolveChannelTarget({
cfg: params.cfg,
channel: params.channel,
input: channelIdRaw,
accountId: params.accountId ?? undefined,
preferredKind: "group",
});
if (resolved.ok) {
if (resolved.target.kind === "user") {
throw new Error(`Channel id "${channelIdRaw}" resolved to a user target.`);
}
params.args.channelId = resolved.target.to.replace(/^(channel|group):/i, "");
} else {
throw resolved.error;
}
}
}
type ResolvedActionContext = {
cfg: ClawdbotConfig;
params: Record<string, unknown>;
channel: ChannelId;
accountId?: string | null;
dryRun: boolean;
gateway?: MessageActionRunnerGateway;
input: RunMessageActionParams;
};
function resolveGateway(input: RunMessageActionParams): MessageActionRunnerGateway | undefined {
if (!input.gateway) return undefined;
return {
url: input.gateway.url,
token: input.gateway.token,
timeoutMs: input.gateway.timeoutMs,
clientName: input.gateway.clientName,
clientDisplayName: input.gateway.clientDisplayName,
mode: input.gateway.mode,
};
}
async function handleBroadcastAction(
input: RunMessageActionParams,
params: Record<string, unknown>,
): Promise<MessageActionRunResult> {
const broadcastEnabled = input.cfg.tools?.message?.broadcast?.enabled !== false;
if (!broadcastEnabled) {
throw new Error("Broadcast is disabled. Set tools.message.broadcast.enabled to true.");
}
const rawTargets = readStringArrayParam(params, "targets", { required: true }) ?? [];
if (rawTargets.length === 0) {
throw new Error("Broadcast requires at least one target in --targets.");
}
const channelHint = readStringParam(params, "channel");
const configured = await listConfiguredMessageChannels(input.cfg);
if (configured.length === 0) {
throw new Error("Broadcast requires at least one configured channel.");
}
const targetChannels =
channelHint && channelHint.trim().toLowerCase() !== "all"
? [await resolveChannel(input.cfg, { channel: channelHint })]
: configured;
const results: Array<{
channel: ChannelId;
to: string;
ok: boolean;
error?: string;
result?: MessageSendResult;
}> = [];
for (const targetChannel of targetChannels) {
for (const target of rawTargets) {
try {
const resolved = await resolveChannelTarget({
cfg: input.cfg,
channel: targetChannel,
input: target,
});
if (!resolved.ok) throw resolved.error;
const sendResult = await runMessageAction({
...input,
action: "send",
params: {
...params,
channel: targetChannel,
target: resolved.target.to,
},
});
results.push({
channel: targetChannel,
to: resolved.target.to,
ok: true,
result: sendResult.kind === "send" ? sendResult.sendResult : undefined,
});
} catch (err) {
results.push({
channel: targetChannel,
to: target,
ok: false,
error: err instanceof Error ? err.message : String(err),
});
}
}
}
return {
kind: "broadcast",
channel: (targetChannels[0] ?? "discord") as ChannelId,
action: "broadcast",
handledBy: input.dryRun ? "dry-run" : "core",
payload: { results },
dryRun: Boolean(input.dryRun),
};
}
async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActionRunResult> {
const { cfg, params, channel, accountId, dryRun, gateway, input } = ctx;
const action: ChannelMessageActionName = "send";
const to = readStringParam(params, "to", { required: true });
const mediaHint = readStringParam(params, "media", { trim: false });
let message =
readStringParam(params, "message", {
required: !mediaHint,
allowEmpty: true,
}) ?? "";
const parsed = parseReplyDirectives(message);
message = parsed.text;
params.message = message;
if (!params.replyTo && parsed.replyToId) params.replyTo = parsed.replyToId;
if (!params.media) {
params.media = parsed.mediaUrls?.[0] || parsed.mediaUrl || undefined;
}
message = await maybeApplyCrossContextMarker({
cfg,
channel,
action,
target: to,
toolContext: input.toolContext,
accountId,
args: params,
message,
preferEmbeds: true,
});
const mediaUrl = readStringParam(params, "media", { trim: false });
const gifPlayback = readBooleanParam(params, "gifPlayback") ?? false;
const bestEffort = readBooleanParam(params, "bestEffort");
const send = await executeSendAction({
ctx: {
cfg,
channel,
params,
accountId: accountId ?? undefined,
gateway,
toolContext: input.toolContext,
deps: input.deps,
dryRun,
mirror:
input.sessionKey && !dryRun
? {
sessionKey: input.sessionKey,
agentId: input.agentId,
}
: undefined,
},
to,
message,
mediaUrl: mediaUrl || undefined,
gifPlayback,
bestEffort: bestEffort ?? undefined,
});
return {
kind: "send",
channel,
action,
to,
handledBy: send.handledBy,
payload: send.payload,
toolResult: send.toolResult,
sendResult: send.sendResult,
dryRun,
};
}
async function handlePollAction(ctx: ResolvedActionContext): Promise<MessageActionRunResult> {
const { cfg, params, channel, accountId, dryRun, gateway, input } = ctx;
const action: ChannelMessageActionName = "poll";
const to = readStringParam(params, "to", { required: true });
const question = readStringParam(params, "pollQuestion", {
required: true,
});
const options = readStringArrayParam(params, "pollOption", { required: true }) ?? [];
if (options.length < 2) {
throw new Error("pollOption requires at least two values");
}
const allowMultiselect = readBooleanParam(params, "pollMulti") ?? false;
const durationHours = readNumberParam(params, "pollDurationHours", {
integer: true,
});
const maxSelections = allowMultiselect ? Math.max(2, options.length) : 1;
const base = typeof params.message === "string" ? params.message : "";
await maybeApplyCrossContextMarker({
cfg,
channel,
action,
target: to,
toolContext: input.toolContext,
accountId,
args: params,
message: base,
preferEmbeds: true,
});
const poll = await executePollAction({
ctx: {
cfg,
channel,
params,
accountId: accountId ?? undefined,
gateway,
toolContext: input.toolContext,
dryRun,
},
to,
question,
options,
maxSelections,
durationHours: durationHours ?? undefined,
});
return {
kind: "poll",
channel,
action,
to,
handledBy: poll.handledBy,
payload: poll.payload,
toolResult: poll.toolResult,
pollResult: poll.pollResult,
dryRun,
};
}
async function handlePluginAction(ctx: ResolvedActionContext): Promise<MessageActionRunResult> {
const { cfg, params, channel, accountId, dryRun, gateway, input } = ctx;
const action = input.action as Exclude<ChannelMessageActionName, "send" | "poll" | "broadcast">;
if (dryRun) {
return {
kind: "action",
channel,
action,
handledBy: "dry-run",
payload: { ok: true, dryRun: true, channel, action },
dryRun: true,
};
}
const handled = await dispatchChannelMessageAction({
channel,
action,
cfg,
params,
accountId: accountId ?? undefined,
gateway,
toolContext: input.toolContext,
dryRun,
});
if (!handled) {
throw new Error(`Message action ${action} not supported for channel ${channel}.`);
}
return {
kind: "action",
channel,
action,
handledBy: "plugin",
payload: extractToolPayload(handled),
toolResult: handled,
dryRun,
};
}
export async function runMessageAction(
input: RunMessageActionParams,
): Promise<MessageActionRunResult> {
const cfg = input.cfg;
const params = { ...input.params };
parseButtonsParam(params);
const action = input.action;
if (action === "broadcast") {
return handleBroadcastAction(input, params);
}
const explicitTarget = typeof params.target === "string" ? params.target.trim() : "";
const hasLegacyTarget =
(typeof params.to === "string" && params.to.trim().length > 0) ||
(typeof params.channelId === "string" && params.channelId.trim().length > 0);
if (
!explicitTarget &&
!hasLegacyTarget &&
actionRequiresTarget(action) &&
!actionHasTarget(action, params)
) {
const inferredTarget = input.toolContext?.currentChannelId?.trim();
if (inferredTarget) {
params.target = inferredTarget;
}
}
const explicitChannel = typeof params.channel === "string" ? params.channel.trim() : "";
if (!explicitChannel) {
const inferredChannel = normalizeMessageChannel(input.toolContext?.currentChannelProvider);
if (inferredChannel && isDeliverableMessageChannel(inferredChannel)) {
params.channel = inferredChannel;
}
}
applyTargetToParams({ action, args: params });
if (actionRequiresTarget(action)) {
if (!actionHasTarget(action, params)) {
throw new Error(`Action ${action} requires a target.`);
}
}
const channel = await resolveChannel(cfg, params);
const accountId = readStringParam(params, "accountId") ?? input.defaultAccountId;
const dryRun = Boolean(input.dryRun ?? readBooleanParam(params, "dryRun"));
await hydrateSendAttachmentParams({
cfg,
channel,
accountId,
args: params,
action,
dryRun,
});
await resolveActionTarget({
cfg,
channel,
action,
args: params,
accountId,
});
enforceCrossContextPolicy({
channel,
action,
args: params,
toolContext: input.toolContext,
cfg,
});
await hydrateSendAttachmentParams({
cfg,
channel,
accountId,
args: params,
dryRun,
});
const gateway = resolveGateway(input);
if (action === "send") {
return handleSendAction({
cfg,
params,
channel,
accountId,
dryRun,
gateway,
input,
});
}
if (action === "poll") {
return handlePollAction({
cfg,
params,
channel,
accountId,
dryRun,
gateway,
input,
});
}
return handlePluginAction({
cfg,
params,
channel,
accountId,
dryRun,
gateway,
input,
});
}