From 46015a3dd8b1a9c4171eb2e3689a3d93c6c60979 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 17 Jan 2026 03:17:08 +0000 Subject: [PATCH] feat: add cross-context messaging resolver Co-authored-by: Thinh Dinh --- docs/cli/message.md | 14 +- src/agents/clawdbot-tools.ts | 1 + src/agents/tools/discord-actions-messaging.ts | 3 + src/agents/tools/message-tool.ts | 4 + src/auto-reply/reply/agent-runner-utils.ts | 9 +- .../plugins/actions/discord/handle-action.ts | 2 + src/channels/plugins/discord.ts | 31 +- src/channels/plugins/message-action-names.ts | 1 + src/channels/plugins/message-actions.ts | 2 +- src/channels/plugins/types.adapters.ts | 14 + src/channels/plugins/types.core.ts | 1 + src/cli/program/message/register.broadcast.ts | 18 + src/cli/program/register.message.ts | 2 + src/commands/message-format.ts | 28 ++ src/commands/message.test.ts | 6 +- src/config/schema.ts | 21 ++ src/config/types.tools.ts | 27 ++ src/config/zod-schema.agent-runtime.ts | 23 ++ src/discord/send.outbound.ts | 3 + src/discord/send.shared.ts | 10 +- .../outbound/message-action-runner.test.ts | 141 +++++--- src/infra/outbound/message-action-runner.ts | 246 +++++++++++++- src/infra/outbound/target-resolver.ts | 312 ++++++++++++++++++ 23 files changed, 859 insertions(+), 60 deletions(-) create mode 100644 src/cli/program/message/register.broadcast.ts create mode 100644 src/infra/outbound/target-resolver.ts diff --git a/docs/cli/message.md b/docs/cli/message.md index 432db4b66..723825931 100644 --- a/docs/cli/message.md +++ b/docs/cli/message.md @@ -24,16 +24,21 @@ Channel selection: Target formats (`--to`): - WhatsApp: E.164 or group JID - Telegram: chat id or `@username` -- Discord: `channel:` or `user:` (or `<@id>` mention; raw numeric ids are rejected) +- Discord: `channel:` or `user:` (or `<@id>` mention; raw numeric ids are treated as channels) - Slack: `channel:` or `user:` (raw channel id is accepted) - Signal: `+E.164`, `group:`, `signal:+E.164`, `signal:group:`, or `username:`/`u:` - iMessage: handle, `chat_id:`, `chat_guid:`, or `chat_identifier:` - MS Teams: conversation id (`19:...@thread.tacv2`) or `conversation:` or `user:` +Name lookup: +- For supported providers (Discord/Slack/etc), channel names like `Help` or `#help` are resolved via the directory cache. +- On cache miss, Clawdbot will attempt a live directory lookup when the provider supports it. + ## Common flags - `--channel ` - `--account ` +- `--targets ` (repeat; broadcast only) - `--json` - `--dry-run` - `--verbose` @@ -166,6 +171,13 @@ Target formats (`--to`): - `ban`: `--guild-id`, `--user-id` (+ `--delete-days`, `--reason`) - `timeout` also supports `--reason` +### Broadcast + +- `broadcast` + - Channels: any configured channel; use `--channel all` to target all providers + - Required: `--targets` (repeat) + - Optional: `--message`, `--media`, `--dry-run` + ## Examples Send a Discord reply: diff --git a/src/agents/clawdbot-tools.ts b/src/agents/clawdbot-tools.ts index b179795bc..60a7a918d 100644 --- a/src/agents/clawdbot-tools.ts +++ b/src/agents/clawdbot-tools.ts @@ -83,6 +83,7 @@ export function createClawdbotTools(options?: { agentSessionKey: options?.agentSessionKey, config: options?.config, currentChannelId: options?.currentChannelId, + currentChannelProvider: options?.agentChannel, currentThreadTs: options?.currentThreadTs, replyToMode: options?.replyToMode, hasRepliedRef: options?.hasRepliedRef, diff --git a/src/agents/tools/discord-actions-messaging.ts b/src/agents/tools/discord-actions-messaging.ts index df1cc7d1c..309187bdf 100644 --- a/src/agents/tools/discord-actions-messaging.ts +++ b/src/agents/tools/discord-actions-messaging.ts @@ -210,9 +210,12 @@ export async function handleDiscordMessagingAction( }); const mediaUrl = readStringParam(params, "mediaUrl"); const replyTo = readStringParam(params, "replyTo"); + const embeds = + Array.isArray(params.embeds) && params.embeds.length > 0 ? params.embeds : undefined; const result = await sendMessageDiscord(to, content, { mediaUrl, replyTo, + embeds, }); return jsonResult({ ok: true, result }); } diff --git a/src/agents/tools/message-tool.ts b/src/agents/tools/message-tool.ts index d5d72f1e5..bf7e937aa 100644 --- a/src/agents/tools/message-tool.ts +++ b/src/agents/tools/message-tool.ts @@ -26,6 +26,7 @@ const AllMessageActions = CHANNEL_MESSAGE_ACTION_NAMES; const MessageToolCommonSchema = { channel: Type.Optional(Type.String()), to: Type.Optional(Type.String()), + targets: Type.Optional(Type.Array(Type.String())), message: Type.Optional(Type.String()), media: Type.Optional(Type.String()), buttons: Type.Optional( @@ -127,6 +128,7 @@ type MessageToolOptions = { agentSessionKey?: string; config?: ClawdbotConfig; currentChannelId?: string; + currentChannelProvider?: string; currentThreadTs?: string; replyToMode?: "off" | "first" | "all"; hasRepliedRef?: { value: boolean }; @@ -175,11 +177,13 @@ export function createMessageTool(options?: MessageToolOptions): AnyAgentTool { const toolContext = options?.currentChannelId || + options?.currentChannelProvider || options?.currentThreadTs || options?.replyToMode || options?.hasRepliedRef ? { currentChannelId: options?.currentChannelId, + currentChannelProvider: options?.currentChannelProvider, currentThreadTs: options?.currentThreadTs, replyToMode: options?.replyToMode, hasRepliedRef: options?.hasRepliedRef, diff --git a/src/auto-reply/reply/agent-runner-utils.ts b/src/auto-reply/reply/agent-runner-utils.ts index 4110dfb18..62ce6ff71 100644 --- a/src/auto-reply/reply/agent-runner-utils.ts +++ b/src/auto-reply/reply/agent-runner-utils.ts @@ -32,7 +32,7 @@ export function buildThreadingToolContext(params: { : provider === "imessage" && sessionCtx.ChatType === "direct" ? (sessionCtx.From ?? sessionCtx.To) : sessionCtx.To; - return ( + const context = dock.threading.buildToolContext({ cfg: config, accountId: sessionCtx.AccountId, @@ -44,8 +44,11 @@ export function buildThreadingToolContext(params: { MessageThreadId: sessionCtx.MessageThreadId, }, hasRepliedRef, - }) ?? {} - ); + }) ?? {}; + return { + ...context, + currentChannelProvider: provider, + }; } export const isBunFetchSocketError = (message?: string) => diff --git a/src/channels/plugins/actions/discord/handle-action.ts b/src/channels/plugins/actions/discord/handle-action.ts index b2b0a61b0..b052896ec 100644 --- a/src/channels/plugins/actions/discord/handle-action.ts +++ b/src/channels/plugins/actions/discord/handle-action.ts @@ -32,6 +32,7 @@ export async function handleDiscordMessageAction( }); const mediaUrl = readStringParam(params, "media", { trim: false }); const replyTo = readStringParam(params, "replyTo"); + const embeds = Array.isArray(params.embeds) ? params.embeds : undefined; return await handleDiscordAction( { action: "sendMessage", @@ -39,6 +40,7 @@ export async function handleDiscordMessageAction( content, mediaUrl: mediaUrl ?? undefined, replyTo: replyTo ?? undefined, + embeds, }, cfg, ); diff --git a/src/channels/plugins/discord.ts b/src/channels/plugins/discord.ts index 1f2ec5508..9b87d5da9 100644 --- a/src/channels/plugins/discord.ts +++ b/src/channels/plugins/discord.ts @@ -9,7 +9,11 @@ import { collectDiscordAuditChannelIds, } from "../../discord/audit.js"; import { probeDiscord } from "../../discord/probe.js"; -import { sendMessageDiscord, sendPollDiscord } from "../../discord/send.js"; +import { + listGuildChannelsDiscord, + sendMessageDiscord, + sendPollDiscord, +} from "../../discord/send.js"; import { shouldLogVerbose } from "../../globals.js"; import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "../../routing/session-key.js"; import { getChatChannelMeta } from "../registry.js"; @@ -209,6 +213,31 @@ export const discordPlugin: ChannelPlugin = { .map((id) => ({ kind: "group", id }) as const); return groups; }, + listGroupsLive: async ({ cfg, accountId, query, limit }) => { + const account = resolveDiscordAccount({ cfg, accountId }); + const q = query?.trim().toLowerCase() || ""; + const guildIds = Object.keys(account.config.guilds ?? {}).filter((id) => /^\d+$/.test(id)); + const rows: Array<{ kind: "group"; id: string; name?: string; raw?: unknown }> = []; + for (const guildId of guildIds) { + const channels = await listGuildChannelsDiscord(guildId, { + accountId: account.accountId, + }); + for (const channel of channels) { + const name = typeof channel.name === "string" ? channel.name : undefined; + if (q && name && !name.toLowerCase().includes(q)) continue; + rows.push({ + kind: "group", + id: `channel:${channel.id}`, + name: name ?? undefined, + raw: channel, + }); + } + } + const filtered = q ? rows.filter((row) => row.name?.toLowerCase().includes(q)) : rows; + const limited = + typeof limit === "number" && limit > 0 ? filtered.slice(0, limit) : filtered; + return limited; + }, }, actions: discordMessageActions, setup: { diff --git a/src/channels/plugins/message-action-names.ts b/src/channels/plugins/message-action-names.ts index a898e4628..3ed40b9ec 100644 --- a/src/channels/plugins/message-action-names.ts +++ b/src/channels/plugins/message-action-names.ts @@ -1,5 +1,6 @@ export const CHANNEL_MESSAGE_ACTION_NAMES = [ "send", + "broadcast", "poll", "react", "reactions", diff --git a/src/channels/plugins/message-actions.ts b/src/channels/plugins/message-actions.ts index 5134c7ed0..59c712928 100644 --- a/src/channels/plugins/message-actions.ts +++ b/src/channels/plugins/message-actions.ts @@ -5,7 +5,7 @@ import { getChannelPlugin, listChannelPlugins } from "./index.js"; import type { ChannelMessageActionContext, ChannelMessageActionName } from "./types.js"; export function listChannelMessageActions(cfg: ClawdbotConfig): ChannelMessageActionName[] { - const actions = new Set(["send"]); + const actions = new Set(["send", "broadcast"]); for (const plugin of listChannelPlugins()) { const list = plugin.actions?.listActions?.({ cfg }); if (!list) continue; diff --git a/src/channels/plugins/types.adapters.ts b/src/channels/plugins/types.adapters.ts index f2ba9996a..b464c0ed3 100644 --- a/src/channels/plugins/types.adapters.ts +++ b/src/channels/plugins/types.adapters.ts @@ -233,6 +233,13 @@ export type ChannelDirectoryAdapter = { limit?: number | null; runtime: RuntimeEnv; }) => Promise; + listPeersLive?: (params: { + cfg: ClawdbotConfig; + accountId?: string | null; + query?: string | null; + limit?: number | null; + runtime: RuntimeEnv; + }) => Promise; listGroups?: (params: { cfg: ClawdbotConfig; accountId?: string | null; @@ -240,6 +247,13 @@ export type ChannelDirectoryAdapter = { limit?: number | null; runtime: RuntimeEnv; }) => Promise; + listGroupsLive?: (params: { + cfg: ClawdbotConfig; + accountId?: string | null; + query?: string | null; + limit?: number | null; + runtime: RuntimeEnv; + }) => Promise; listGroupMembers?: (params: { cfg: ClawdbotConfig; accountId?: string | null; diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index 227c35169..6adee27b5 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -207,6 +207,7 @@ export type ChannelThreadingContext = { export type ChannelThreadingToolContext = { currentChannelId?: string; + currentChannelProvider?: ChannelId; currentThreadTs?: string; replyToMode?: "off" | "first" | "all"; hasRepliedRef?: { value: boolean }; diff --git a/src/cli/program/message/register.broadcast.ts b/src/cli/program/message/register.broadcast.ts new file mode 100644 index 000000000..ed7ac4c7c --- /dev/null +++ b/src/cli/program/message/register.broadcast.ts @@ -0,0 +1,18 @@ +import type { Command } from "commander"; +import type { MessageCliHelpers } from "./helpers.js"; + +export function registerMessageBroadcastCommand(message: Command, helpers: MessageCliHelpers) { + helpers + .withMessageBase( + message.command("broadcast").description("Broadcast a message to multiple targets"), + ) + .requiredOption( + "--targets ", + "Targets to broadcast to (repeatable, accepts names or ids)", + ) + .option("--message ", "Message to send") + .option("--media ", "Media URL") + .action(async (options: Record) => { + await helpers.runMessageAction("broadcast", options); + }); +} diff --git a/src/cli/program/register.message.ts b/src/cli/program/register.message.ts index 76c157f1b..df211fd6c 100644 --- a/src/cli/program/register.message.ts +++ b/src/cli/program/register.message.ts @@ -18,6 +18,7 @@ import { registerMessageReactionsCommands } from "./message/register.reactions.j import { registerMessageReadEditDeleteCommands } from "./message/register.read-edit-delete.js"; import { registerMessageSendCommand } from "./message/register.send.js"; import { registerMessageThreadCommands } from "./message/register.thread.js"; +import { registerMessageBroadcastCommand } from "./message/register.broadcast.js"; export function registerMessageCommands(program: Command, ctx: ProgramContext) { const message = program @@ -41,6 +42,7 @@ ${theme.muted("Docs:")} ${formatDocsLink("/cli/message", "docs.clawd.bot/cli/mes const helpers = createMessageCliHelpers(message, ctx.messageChannelOptions); registerMessageSendCommand(message, helpers); + registerMessageBroadcastCommand(message, helpers); registerMessagePollCommand(message, helpers); registerMessageReactionsCommands(message, helpers); registerMessageReadEditDeleteCommands(message, helpers); diff --git a/src/commands/message-format.ts b/src/commands/message-format.ts index c531c40b4..25dc069dc 100644 --- a/src/commands/message-format.ts +++ b/src/commands/message-format.ts @@ -238,6 +238,34 @@ export function formatMessageCliText(result: MessageActionRunResult): string[] { return [muted(`[dry-run] would run ${result.action} via ${result.channel}`)]; } + if (result.kind === "broadcast") { + const results = result.payload.results ?? []; + const rows = results.map((entry) => ({ + Channel: resolveChannelLabel(entry.channel), + Target: shortenText(entry.to, 36), + Status: entry.ok ? "ok" : "error", + Error: entry.ok ? "" : shortenText(entry.error ?? "unknown error", 48), + })); + const okCount = results.filter((entry) => entry.ok).length; + const total = results.length; + const headingLine = ok( + `✅ Broadcast complete (${okCount}/${total} succeeded, ${total - okCount} failed)`, + ); + return [ + headingLine, + renderTable({ + width: opts.width, + columns: [ + { key: "Channel", header: "Channel", minWidth: 10 }, + { key: "Target", header: "Target", minWidth: 12, flex: true }, + { key: "Status", header: "Status", minWidth: 6 }, + { key: "Error", header: "Error", minWidth: 20, flex: true }, + ], + rows: rows.slice(0, 50), + }).trimEnd(), + ]; + } + if (result.kind === "send") { if (result.handledBy === "core" && result.sendResult) { const send = result.sendResult; diff --git a/src/commands/message.test.ts b/src/commands/message.test.ts index 6a3841f57..a8a6e9639 100644 --- a/src/commands/message.test.ts +++ b/src/commands/message.test.ts @@ -120,7 +120,7 @@ describe("messageCommand", () => { { action: "send", channel: "whatsapp", - to: "+1", + to: "+15551234567", message: "hi", }, deps, @@ -135,7 +135,7 @@ describe("messageCommand", () => { { action: "poll", channel: "discord", - to: "channel:123", + to: "channel:123456789", pollQuestion: "Snack?", pollOption: ["Pizza", "Sushi"], }, @@ -145,7 +145,7 @@ describe("messageCommand", () => { expect(handleDiscordAction).toHaveBeenCalledWith( expect.objectContaining({ action: "poll", - to: "channel:123", + to: "channel:123456789", }), expect.any(Object), ); diff --git a/src/config/schema.ts b/src/config/schema.ts index 607dccb50..bffe39f49 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -110,6 +110,13 @@ const FIELD_LABELS: Record = { "agents.list[].tools.byProvider": "Agent Tool Policy by Provider", "tools.exec.applyPatch.enabled": "Enable apply_patch", "tools.exec.applyPatch.allowModels": "apply_patch Model Allowlist", + "tools.message.allowCrossContextSend": "Allow Cross-Context Messaging", + "tools.message.crossContext.allowWithinProvider": "Allow Cross-Context (Same Provider)", + "tools.message.crossContext.allowAcrossProviders": "Allow Cross-Context (Across Providers)", + "tools.message.crossContext.marker.enabled": "Cross-Context Marker", + "tools.message.crossContext.marker.prefix": "Cross-Context Marker Prefix", + "tools.message.crossContext.marker.suffix": "Cross-Context Marker Suffix", + "tools.message.broadcast.enabled": "Enable Message Broadcast", "tools.web.search.enabled": "Enable Web Search Tool", "tools.web.search.provider": "Web Search Provider", "tools.web.search.apiKey": "Brave Search API Key", @@ -256,6 +263,20 @@ const FIELD_HELP: Record = { "Experimental. Enables apply_patch for OpenAI models when allowed by tool policy.", "tools.exec.applyPatch.allowModels": 'Optional allowlist of model ids (e.g. "gpt-5.2" or "openai/gpt-5.2").', + "tools.message.allowCrossContextSend": + "Legacy override: allow cross-context sends across all providers.", + "tools.message.crossContext.allowWithinProvider": + "Allow sends to other channels within the same provider (default: true).", + "tools.message.crossContext.allowAcrossProviders": + "Allow sends across different providers (default: false).", + "tools.message.crossContext.marker.enabled": + "Add a visible origin marker when sending cross-context (default: true).", + "tools.message.crossContext.marker.prefix": + 'Text prefix for cross-context markers (supports "{channel}").', + "tools.message.crossContext.marker.suffix": + 'Text suffix for cross-context markers (supports "{channel}").', + "tools.message.broadcast.enabled": + "Enable broadcast action (default: true).", "tools.web.search.enabled": "Enable the web_search tool (requires Brave API key).", "tools.web.search.provider": 'Search provider (only "brave" supported today).', "tools.web.search.apiKey": "Brave Search API key (fallback: BRAVE_API_KEY env var).", diff --git a/src/config/types.tools.ts b/src/config/types.tools.ts index 7f5407300..5c87a3f89 100644 --- a/src/config/types.tools.ts +++ b/src/config/types.tools.ts @@ -134,6 +134,33 @@ export type ToolsConfig = { timeoutSeconds?: number; }; }; + /** Message tool configuration. */ + message?: { + /** + * @deprecated Use tools.message.crossContext settings. + * Allows cross-context sends across providers. + */ + allowCrossContextSend?: boolean; + crossContext?: { + /** Allow sends to other channels within the same provider (default: true). */ + allowWithinProvider?: boolean; + /** Allow sends across different providers (default: false). */ + allowAcrossProviders?: boolean; + /** Cross-context marker configuration. */ + marker?: { + /** Enable origin markers for cross-context sends (default: true). */ + enabled?: boolean; + /** Text prefix template, supports {channel}. */ + prefix?: string; + /** Text suffix template, supports {channel}. */ + suffix?: string; + }; + }; + broadcast?: { + /** Enable broadcast action (default: true). */ + enabled?: boolean; + }; + }; agentToAgent?: { /** Enable agent-to-agent messaging tools. Default: false. */ enabled?: boolean; diff --git a/src/config/zod-schema.agent-runtime.ts b/src/config/zod-schema.agent-runtime.ts index a89f9d378..5946b1b06 100644 --- a/src/config/zod-schema.agent-runtime.ts +++ b/src/config/zod-schema.agent-runtime.ts @@ -288,6 +288,29 @@ export const ToolsSchema = z transcription: ToolsAudioTranscriptionSchema, }) .optional(), + message: z + .object({ + allowCrossContextSend: z.boolean().optional(), + crossContext: z + .object({ + allowWithinProvider: z.boolean().optional(), + allowAcrossProviders: z.boolean().optional(), + marker: z + .object({ + enabled: z.boolean().optional(), + prefix: z.string().optional(), + suffix: z.string().optional(), + }) + .optional(), + }) + .optional(), + broadcast: z + .object({ + enabled: z.boolean().optional(), + }) + .optional(), + }) + .optional(), agentToAgent: z .object({ enabled: z.boolean().optional(), diff --git a/src/discord/send.outbound.ts b/src/discord/send.outbound.ts index 5d9a16888..51d5742e0 100644 --- a/src/discord/send.outbound.ts +++ b/src/discord/send.outbound.ts @@ -25,6 +25,7 @@ type DiscordSendOpts = { rest?: RequestClient; replyTo?: string; retry?: RetryConfig; + embeds?: unknown[]; }; export async function sendMessageDiscord( @@ -51,6 +52,7 @@ export async function sendMessageDiscord( opts.replyTo, request, accountInfo.config.maxLinesPerMessage, + opts.embeds, ); } else { result = await sendDiscordText( @@ -60,6 +62,7 @@ export async function sendMessageDiscord( opts.replyTo, request, accountInfo.config.maxLinesPerMessage, + opts.embeds, ); } } catch (err) { diff --git a/src/discord/send.shared.ts b/src/discord/send.shared.ts index 6b8bf01e5..8d67e23c7 100644 --- a/src/discord/send.shared.ts +++ b/src/discord/send.shared.ts @@ -252,6 +252,7 @@ async function sendDiscordText( replyTo: string | undefined, request: DiscordRequest, maxLinesPerMessage?: number, + embeds?: unknown[], ) { if (!text.trim()) { throw new Error("Message must be non-empty for Discord sends"); @@ -265,7 +266,11 @@ async function sendDiscordText( const res = (await request( () => rest.post(Routes.channelMessages(channelId), { - body: { content: chunks[0], message_reference: messageReference }, + body: { + content: chunks[0], + message_reference: messageReference, + ...(embeds?.length ? { embeds } : {}), + }, }) as Promise<{ id: string; channel_id: string }>, "text", )) as { id: string; channel_id: string }; @@ -280,6 +285,7 @@ async function sendDiscordText( body: { content: chunk, message_reference: isFirst ? messageReference : undefined, + ...(isFirst && embeds?.length ? { embeds } : {}), }, }) as Promise<{ id: string; channel_id: string }>, "text", @@ -300,6 +306,7 @@ async function sendDiscordMedia( replyTo: string | undefined, request: DiscordRequest, maxLinesPerMessage?: number, + embeds?: unknown[], ) { const media = await loadWebMedia(mediaUrl); const chunks = text @@ -316,6 +323,7 @@ async function sendDiscordMedia( body: { content: caption || undefined, message_reference: messageReference, + ...(embeds?.length ? { embeds } : {}), files: [ { data: media.buffer, diff --git a/src/infra/outbound/message-action-runner.test.ts b/src/infra/outbound/message-action-runner.test.ts index 4723b34aa..cee3f9404 100644 --- a/src/infra/outbound/message-action-runner.test.ts +++ b/src/infra/outbound/message-action-runner.test.ts @@ -27,10 +27,10 @@ describe("runMessageAction context isolation", () => { action: "send", params: { channel: "slack", - to: "#C123", + to: "#C12345678", message: "hi", }, - toolContext: { currentChannelId: "C123" }, + toolContext: { currentChannelId: "C12345678" }, dryRun: true, }); @@ -43,10 +43,10 @@ describe("runMessageAction context isolation", () => { action: "send", params: { channel: "slack", - to: "#C123", + to: "#C12345678", media: "https://example.com/note.ogg", }, - toolContext: { currentChannelId: "C123" }, + toolContext: { currentChannelId: "C12345678" }, dryRun: true, }); @@ -60,44 +60,44 @@ describe("runMessageAction context isolation", () => { action: "send", params: { channel: "slack", - to: "#C123", + to: "#C12345678", }, - toolContext: { currentChannelId: "C123" }, + toolContext: { currentChannelId: "C12345678" }, dryRun: true, }), ).rejects.toThrow(/message required/i); }); it("blocks send when target differs from current channel", async () => { - await expect( - runMessageAction({ - cfg: slackConfig, - action: "send", - params: { - channel: "slack", - to: "channel:C999", - message: "hi", - }, - toolContext: { currentChannelId: "C123" }, - dryRun: true, - }), - ).rejects.toThrow(/Cross-context messaging denied/); + const result = await runMessageAction({ + cfg: slackConfig, + action: "send", + params: { + channel: "slack", + to: "channel:C99999999", + message: "hi", + }, + toolContext: { currentChannelId: "C12345678", currentChannelProvider: "slack" }, + dryRun: true, + }); + + expect(result.kind).toBe("send"); }); it("blocks thread-reply when channelId differs from current channel", async () => { - await expect( - runMessageAction({ - cfg: slackConfig, - action: "thread-reply", - params: { - channel: "slack", - channelId: "C999", - message: "hi", - }, - toolContext: { currentChannelId: "C123" }, - dryRun: true, - }), - ).rejects.toThrow(/Cross-context messaging denied/); + const result = await runMessageAction({ + cfg: slackConfig, + action: "thread-reply", + params: { + channel: "slack", + channelId: "C99999999", + message: "hi", + }, + toolContext: { currentChannelId: "C12345678", currentChannelProvider: "slack" }, + dryRun: true, + }); + + expect(result.kind).toBe("action"); }); it("allows WhatsApp send when target matches current chat", async () => { @@ -117,19 +117,19 @@ describe("runMessageAction context isolation", () => { }); it("blocks WhatsApp send when target differs from current chat", async () => { - await expect( - runMessageAction({ - cfg: whatsappConfig, - action: "send", - params: { - channel: "whatsapp", - to: "456@g.us", - message: "hi", - }, - toolContext: { currentChannelId: "123@g.us" }, - dryRun: true, - }), - ).rejects.toThrow(/Cross-context messaging denied/); + const result = await runMessageAction({ + cfg: whatsappConfig, + action: "send", + params: { + channel: "whatsapp", + to: "456@g.us", + message: "hi", + }, + toolContext: { currentChannelId: "123@g.us", currentChannelProvider: "whatsapp" }, + dryRun: true, + }); + + expect(result.kind).toBe("send"); }); it("allows iMessage send when target matches current handle", async () => { @@ -149,16 +149,59 @@ describe("runMessageAction context isolation", () => { }); it("blocks iMessage send when target differs from current handle", async () => { + const result = await runMessageAction({ + cfg: whatsappConfig, + action: "send", + params: { + channel: "imessage", + to: "imessage:+15551230000", + message: "hi", + }, + toolContext: { currentChannelId: "imessage:+15551234567", currentChannelProvider: "imessage" }, + dryRun: true, + }); + + expect(result.kind).toBe("send"); + }); + + it("blocks cross-provider sends by default", async () => { await expect( runMessageAction({ - cfg: whatsappConfig, + cfg: slackConfig, action: "send", params: { - channel: "imessage", - to: "imessage:+15551230000", + channel: "telegram", + to: "telegram:@ops", message: "hi", }, - toolContext: { currentChannelId: "imessage:+15551234567" }, + toolContext: { currentChannelId: "C12345678", currentChannelProvider: "slack" }, + dryRun: true, + }), + ).rejects.toThrow(/Cross-context messaging denied/); + }); + + it("blocks same-provider cross-context when disabled", async () => { + const cfg = { + ...slackConfig, + tools: { + message: { + crossContext: { + allowWithinProvider: false, + }, + }, + }, + } as ClawdbotConfig; + + await expect( + runMessageAction({ + cfg, + action: "send", + params: { + channel: "slack", + to: "channel:C99999999", + message: "hi", + }, + toolContext: { currentChannelId: "C12345678", currentChannelProvider: "slack" }, dryRun: true, }), ).rejects.toThrow(/Cross-context messaging denied/); diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index 7c7ee23fb..93f375db1 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -14,10 +14,11 @@ import type { } from "../../channels/plugins/types.js"; import type { ClawdbotConfig } from "../../config/config.js"; import type { GatewayClientMode, GatewayClientName } from "../../utils/message-channel.js"; -import { resolveMessageChannelSelection } from "./channel-selection.js"; +import { listConfiguredMessageChannels, resolveMessageChannelSelection } from "./channel-selection.js"; import type { OutboundSendDeps } from "./deliver.js"; import type { MessagePollResult, MessageSendResult } from "./message.js"; import { sendMessage, sendPoll } from "./message.js"; +import { lookupDirectoryDisplay, resolveMessagingTarget } from "./target-resolver.js"; export type MessageActionRunnerGateway = { url?: string; @@ -53,6 +54,22 @@ export type MessageActionRunResult = sendResult?: MessageSendResult; dryRun: boolean; } + | { + kind: "broadcast"; + channel: ChannelId; + action: "broadcast"; + handledBy: "core" | "dry-run"; + payload: { + results: Array<{ + channel: ChannelId; + to: string; + ok: boolean; + error?: string; + result?: MessageSendResult; + }>; + }; + dryRun: boolean; + } | { kind: "poll"; channel: ChannelId; @@ -148,11 +165,30 @@ function enforceContextIsolation(params: { action: ChannelMessageActionName; params: Record; toolContext?: ChannelThreadingToolContext; + cfg: ClawdbotConfig; }): void { const currentTarget = params.toolContext?.currentChannelId?.trim(); if (!currentTarget) return; if (!CONTEXT_GUARDED_ACTIONS.has(params.action)) return; + if (params.cfg.tools?.message?.allowCrossContextSend) return; + + const currentProvider = params.toolContext?.currentChannelProvider; + const allowWithinProvider = params.cfg.tools?.message?.crossContext?.allowWithinProvider !== false; + const allowAcrossProviders = + params.cfg.tools?.message?.crossContext?.allowAcrossProviders === true; + + if (currentProvider && currentProvider !== params.channel) { + if (!allowAcrossProviders) { + throw new Error( + `Cross-context messaging denied: action=${params.action} target provider "${params.channel}" while bound to "${currentProvider}".`, + ); + } + return; + } + + if (allowWithinProvider) return; + const target = resolveContextGuardTarget(params.action, params.params); if (!target) return; @@ -178,6 +214,99 @@ async function resolveChannel(cfg: ClawdbotConfig, params: Record; + accountId?: string | null; +}): Promise { + const toRaw = typeof params.args.to === "string" ? params.args.to.trim() : ""; + if (toRaw) { + const resolved = await resolveMessagingTarget({ + cfg: params.cfg, + channel: params.channel, + input: toRaw, + accountId: params.accountId ?? undefined, + }); + if (resolved.ok) { + params.args.to = resolved.target.to; + } else { + throw resolved.error; + } + } + const channelIdRaw = + typeof params.args.channelId === "string" ? params.args.channelId.trim() : ""; + if (channelIdRaw) { + const resolved = await resolveMessagingTarget({ + cfg: params.cfg, + channel: params.channel, + input: channelIdRaw, + accountId: params.accountId ?? undefined, + preferredKind: "group", + }); + if (resolved.ok) { + if (resolved.target.kind === "user") { + throw new Error(`Channel id "${channelIdRaw}" resolved to a user target.`); + } + params.args.channelId = resolved.target.to.replace(/^(channel|group):/i, ""); + } else { + throw resolved.error; + } + } +} + export async function runMessageAction( input: RunMessageActionParams, ): Promise { @@ -186,15 +315,93 @@ export async function runMessageAction( parseButtonsParam(params); const action = input.action; + if (action === "broadcast") { + const broadcastEnabled = cfg.tools?.message?.broadcast?.enabled !== false; + if (!broadcastEnabled) { + throw new Error("Broadcast is disabled. Set tools.message.broadcast.enabled to true."); + } + const rawTargets = readStringArrayParam(params, "targets", { required: true }) ?? []; + if (rawTargets.length === 0) { + throw new Error("Broadcast requires at least one target in --targets."); + } + const channelHint = readStringParam(params, "channel"); + const configured = await listConfiguredMessageChannels(cfg); + if (configured.length === 0) { + throw new Error("Broadcast requires at least one configured channel."); + } + const targetChannels = + channelHint && channelHint.trim().toLowerCase() !== "all" + ? [await resolveChannel(cfg, { channel: channelHint })] + : configured; + const results: Array<{ + channel: ChannelId; + to: string; + ok: boolean; + error?: string; + result?: MessageSendResult; + }> = []; + for (const targetChannel of targetChannels) { + for (const target of rawTargets) { + try { + const resolved = await resolveMessagingTarget({ + cfg, + channel: targetChannel, + input: target, + }); + if (!resolved.ok) throw resolved.error; + const sendResult = await runMessageAction({ + ...input, + action: "send", + params: { + ...params, + channel: targetChannel, + to: resolved.target.to, + }, + }); + results.push({ + channel: targetChannel, + to: resolved.target.to, + ok: true, + result: sendResult.kind === "send" ? sendResult.sendResult : undefined, + }); + } catch (err) { + results.push({ + channel: targetChannel, + to: target, + ok: false, + error: err instanceof Error ? err.message : String(err), + }); + } + } + } + return { + kind: "broadcast", + channel: (targetChannels[0] ?? "discord") as ChannelId, + action: "broadcast", + handledBy: input.dryRun ? "dry-run" : "core", + payload: { results }, + dryRun: Boolean(input.dryRun), + }; + } + const channel = await resolveChannel(cfg, params); const accountId = readStringParam(params, "accountId") ?? input.defaultAccountId; const dryRun = Boolean(input.dryRun ?? readBooleanParam(params, "dryRun")); + await resolveActionTarget({ + cfg, + channel, + action, + args: params, + accountId, + }); + enforceContextIsolation({ channel, action, params, toolContext: input.toolContext, + cfg, }); const gateway = input.gateway @@ -226,9 +433,29 @@ export async function runMessageAction( params.media = parsed.mediaUrls?.[0] || parsed.mediaUrl || undefined; } + const marker = + shouldApplyCrossContextMarker(action) && input.toolContext + ? await buildCrossContextMarker({ + cfg, + channel, + target: to, + toolContext: input.toolContext, + accountId: accountId ?? undefined, + }) + : null; + const useTextMarker = !(channel === "discord" && marker?.discordEmbeds?.length); + if (useTextMarker && (marker?.prefix || marker?.suffix)) { + const base = params.message ?? ""; + params.message = `${marker?.prefix ?? ""}${base}${marker?.suffix ?? ""}`; + message = params.message; + } + const mediaUrl = readStringParam(params, "media", { trim: false }); const gifPlayback = readBooleanParam(params, "gifPlayback") ?? false; const bestEffort = readBooleanParam(params, "bestEffort"); + if (marker?.discordEmbeds && channel === "discord") { + params.embeds = marker.discordEmbeds; + } if (!dryRun) { const handled = await dispatchChannelMessageAction({ @@ -302,6 +529,23 @@ export async function runMessageAction( integer: true, }); const maxSelections = allowMultiselect ? Math.max(2, options.length) : 1; + const marker = + shouldApplyCrossContextMarker(action) && input.toolContext + ? await buildCrossContextMarker({ + cfg, + channel, + target: to, + toolContext: input.toolContext, + accountId: accountId ?? undefined, + }) + : null; + if (marker?.prefix || marker?.suffix) { + const base = typeof params.message === "string" ? params.message : ""; + params.message = `${marker?.prefix ?? ""}${base}${marker?.suffix ?? ""}`; + } + if (marker?.discordEmbeds && channel === "discord") { + params.embeds = marker.discordEmbeds; + } if (!dryRun) { const handled = await dispatchChannelMessageAction({ diff --git a/src/infra/outbound/target-resolver.ts b/src/infra/outbound/target-resolver.ts new file mode 100644 index 000000000..d77895355 --- /dev/null +++ b/src/infra/outbound/target-resolver.ts @@ -0,0 +1,312 @@ +import { normalizeTargetForProvider } from "../../agents/pi-embedded-messaging.js"; +import { getChannelPlugin } from "../../channels/plugins/index.js"; +import type { + ChannelDirectoryEntry, + ChannelDirectoryEntryKind, + ChannelId, +} from "../../channels/plugins/types.js"; +import type { ClawdbotConfig } from "../../config/config.js"; +import { defaultRuntime, type RuntimeEnv } from "../../runtime.js"; + +export type TargetResolveKind = ChannelDirectoryEntryKind | "channel"; + +export type ResolvedMessagingTarget = { + to: string; + kind: TargetResolveKind; + display?: string; + source: "normalized" | "directory"; +}; + +export type ResolveMessagingTargetResult = + | { ok: true; target: ResolvedMessagingTarget } + | { ok: false; error: Error; candidates?: ChannelDirectoryEntry[] }; + +type DirectoryCacheEntry = { + entries: ChannelDirectoryEntry[]; + fetchedAt: number; +}; + +const CACHE_TTL_MS = 30 * 60 * 1000; +const directoryCache = new Map(); +let lastConfigRef: ClawdbotConfig | null = null; + +function resetCacheIfConfigChanged(cfg: ClawdbotConfig): void { + if (lastConfigRef && lastConfigRef !== cfg) { + directoryCache.clear(); + } + lastConfigRef = cfg; +} + +function buildCacheKey(params: { + channel: ChannelId; + accountId?: string | null; + kind: ChannelDirectoryEntryKind; + source: "cache" | "live"; +}) { + return `${params.channel}:${params.accountId ?? "default"}:${params.kind}:${params.source}`; +} + +function normalizeQuery(value: string): string { + return value.trim().toLowerCase(); +} + +function stripTargetPrefixes(value: string): string { + return value.replace(/^(channel|group|user):/i, "").replace(/^[@#]/, "").trim(); +} + +function preserveTargetCase(channel: ChannelId, raw: string, normalized: string): string { + if (channel !== "slack") return normalized; + const trimmed = raw.trim(); + if (/^channel:/i.test(trimmed) || /^user:/i.test(trimmed)) return trimmed; + if (trimmed.startsWith("#")) return `channel:${trimmed.slice(1).trim()}`; + if (trimmed.startsWith("@")) return `user:${trimmed.slice(1).trim()}`; + return trimmed; +} + +function detectTargetKind(raw: string, preferred?: TargetResolveKind): TargetResolveKind { + if (preferred) return preferred; + const trimmed = raw.trim(); + if (!trimmed) return "group"; + if (trimmed.startsWith("@") || /^<@!?/.test(trimmed) || /^user:/i.test(trimmed)) return "user"; + if (trimmed.startsWith("#") || /^channel:/i.test(trimmed) || /^group:/i.test(trimmed)) { + return "group"; + } + return "group"; +} + +function normalizeDirectoryEntryId(channel: ChannelId, entry: ChannelDirectoryEntry): string { + const normalized = normalizeTargetForProvider(channel, entry.id); + return normalized ?? entry.id.trim(); +} + +function matchesDirectoryEntry(params: { + channel: ChannelId; + entry: ChannelDirectoryEntry; + query: string; +}): boolean { + const query = normalizeQuery(params.query); + if (!query) return false; + const id = stripTargetPrefixes(normalizeDirectoryEntryId(params.channel, params.entry)); + const name = params.entry.name ? stripTargetPrefixes(params.entry.name) : ""; + const handle = params.entry.handle ? stripTargetPrefixes(params.entry.handle) : ""; + const candidates = [id, name, handle].map((value) => normalizeQuery(value)).filter(Boolean); + return candidates.some((value) => value === query || value.includes(query)); +} + +function resolveMatch(params: { + channel: ChannelId; + entries: ChannelDirectoryEntry[]; + query: string; +}) { + const matches = params.entries.filter((entry) => + matchesDirectoryEntry({ channel: params.channel, entry, query: params.query }), + ); + if (matches.length === 0) return { kind: "none" as const }; + if (matches.length === 1) return { kind: "single" as const, entry: matches[0] }; + return { kind: "ambiguous" as const, entries: matches }; +} + +function looksLikeId(channel: ChannelId, normalized: string): boolean { + if (!normalized) return false; + const raw = normalized.trim(); + switch (channel) { + case "discord": { + const candidate = stripTargetPrefixes(raw); + return /^\d{6,}$/.test(candidate); + } + case "slack": { + const candidate = stripTargetPrefixes(raw); + return /^[A-Z0-9]{8,}$/i.test(candidate); + } + case "msteams": { + return /^conversation:/i.test(raw) || /^user:/i.test(raw) || raw.includes("@thread"); + } + case "telegram": { + return /^telegram:/i.test(raw) || raw.startsWith("@"); + } + case "whatsapp": { + const candidate = stripTargetPrefixes(raw); + return ( + /@/i.test(candidate) || + /^\+?\d{3,}$/.test(candidate) || + candidate.toLowerCase().endsWith("@g.us") + ); + } + default: + return Boolean(raw); + } +} + +async function listDirectoryEntries(params: { + cfg: ClawdbotConfig; + channel: ChannelId; + accountId?: string | null; + kind: ChannelDirectoryEntryKind; + runtime?: RuntimeEnv; + query?: string; + source: "cache" | "live"; +}): Promise { + const plugin = getChannelPlugin(params.channel); + const directory = plugin?.directory; + if (!directory) return []; + const runtime = params.runtime ?? defaultRuntime; + const useLive = params.source === "live"; + if (params.kind === "user") { + const fn = useLive ? directory.listPeersLive ?? directory.listPeers : directory.listPeers; + if (!fn) return []; + return await fn({ + cfg: params.cfg, + accountId: params.accountId ?? undefined, + query: params.query ?? undefined, + limit: undefined, + runtime, + }); + } + const fn = useLive ? directory.listGroupsLive ?? directory.listGroups : directory.listGroups; + if (!fn) return []; + return await fn({ + cfg: params.cfg, + accountId: params.accountId ?? undefined, + query: params.query ?? undefined, + limit: undefined, + runtime, + }); +} + +async function getDirectoryEntries(params: { + cfg: ClawdbotConfig; + channel: ChannelId; + accountId?: string | null; + kind: ChannelDirectoryEntryKind; + query?: string; + runtime?: RuntimeEnv; + preferLiveOnMiss?: boolean; +}): Promise { + resetCacheIfConfigChanged(params.cfg); + const cacheKey = buildCacheKey({ + channel: params.channel, + accountId: params.accountId, + kind: params.kind, + source: "cache", + }); + const cached = directoryCache.get(cacheKey); + if (cached && Date.now() - cached.fetchedAt < CACHE_TTL_MS) { + return cached.entries; + } + const entries = await listDirectoryEntries({ + cfg: params.cfg, + channel: params.channel, + accountId: params.accountId, + kind: params.kind, + query: params.query, + runtime: params.runtime, + source: "cache", + }); + if (entries.length > 0 || !params.preferLiveOnMiss) { + directoryCache.set(cacheKey, { entries, fetchedAt: Date.now() }); + return entries; + } + const liveKey = buildCacheKey({ + channel: params.channel, + accountId: params.accountId, + kind: params.kind, + source: "live", + }); + const liveEntries = await listDirectoryEntries({ + cfg: params.cfg, + channel: params.channel, + accountId: params.accountId, + kind: params.kind, + query: params.query, + runtime: params.runtime, + source: "live", + }); + directoryCache.set(liveKey, { entries: liveEntries, fetchedAt: Date.now() }); + return liveEntries; +} + +export async function resolveMessagingTarget(params: { + cfg: ClawdbotConfig; + channel: ChannelId; + input: string; + accountId?: string | null; + preferredKind?: TargetResolveKind; + runtime?: RuntimeEnv; +}): Promise { + const raw = params.input.trim(); + if (!raw) { + return { ok: false, error: new Error("Target is required") }; + } + const kind = detectTargetKind(raw, params.preferredKind); + const normalized = normalizeTargetForProvider(params.channel, raw) ?? raw; + if (looksLikeId(params.channel, normalized)) { + const directTarget = preserveTargetCase(params.channel, raw, normalized); + return { + ok: true, + target: { + to: directTarget, + kind, + display: stripTargetPrefixes(raw), + source: "normalized", + }, + }; + } + const query = stripTargetPrefixes(raw); + const entries = await getDirectoryEntries({ + cfg: params.cfg, + channel: params.channel, + accountId: params.accountId, + kind: kind === "user" ? "user" : "group", + query, + runtime: params.runtime, + preferLiveOnMiss: true, + }); + const match = resolveMatch({ channel: params.channel, entries, query }); + if (match.kind === "single") { + const entry = match.entry; + return { + ok: true, + target: { + to: normalizeDirectoryEntryId(params.channel, entry), + kind, + display: entry.name ?? entry.handle ?? stripTargetPrefixes(entry.id), + source: "directory", + }, + }; + } + if (match.kind === "ambiguous") { + return { + ok: false, + error: new Error( + `Ambiguous target "${raw}". Provide a unique name or an explicit id.`, + ), + candidates: match.entries, + }; + } + return { + ok: false, + error: new Error(`Unknown target "${raw}" for ${params.channel}.`), + }; +} + +export async function lookupDirectoryDisplay(params: { + cfg: ClawdbotConfig; + channel: ChannelId; + targetId: string; + accountId?: string | null; + runtime?: RuntimeEnv; +}): Promise { + const normalized = normalizeTargetForProvider(params.channel, params.targetId) ?? params.targetId; + const candidates = await getDirectoryEntries({ + cfg: params.cfg, + channel: params.channel, + accountId: params.accountId, + kind: "group", + runtime: params.runtime, + preferLiveOnMiss: false, + }); + const entry = candidates.find( + (candidate) => normalizeDirectoryEntryId(params.channel, candidate) === normalized, + ); + return entry?.name ?? entry?.handle ?? undefined; +}