feat: add cross-context messaging resolver

Co-authored-by: Thinh Dinh <tobalsan@users.noreply.github.com>
This commit is contained in:
Peter Steinberger
2026-01-17 03:17:08 +00:00
parent 1481a3d90f
commit 46015a3dd8
23 changed files with 859 additions and 60 deletions

View File

@@ -27,10 +27,10 @@ describe("runMessageAction context isolation", () => {
action: "send",
params: {
channel: "slack",
to: "#C123",
to: "#C12345678",
message: "hi",
},
toolContext: { currentChannelId: "C123" },
toolContext: { currentChannelId: "C12345678" },
dryRun: true,
});
@@ -43,10 +43,10 @@ describe("runMessageAction context isolation", () => {
action: "send",
params: {
channel: "slack",
to: "#C123",
to: "#C12345678",
media: "https://example.com/note.ogg",
},
toolContext: { currentChannelId: "C123" },
toolContext: { currentChannelId: "C12345678" },
dryRun: true,
});
@@ -60,44 +60,44 @@ describe("runMessageAction context isolation", () => {
action: "send",
params: {
channel: "slack",
to: "#C123",
to: "#C12345678",
},
toolContext: { currentChannelId: "C123" },
toolContext: { currentChannelId: "C12345678" },
dryRun: true,
}),
).rejects.toThrow(/message required/i);
});
it("blocks send when target differs from current channel", async () => {
await expect(
runMessageAction({
cfg: slackConfig,
action: "send",
params: {
channel: "slack",
to: "channel:C999",
message: "hi",
},
toolContext: { currentChannelId: "C123" },
dryRun: true,
}),
).rejects.toThrow(/Cross-context messaging denied/);
const result = await runMessageAction({
cfg: slackConfig,
action: "send",
params: {
channel: "slack",
to: "channel:C99999999",
message: "hi",
},
toolContext: { currentChannelId: "C12345678", currentChannelProvider: "slack" },
dryRun: true,
});
expect(result.kind).toBe("send");
});
it("blocks thread-reply when channelId differs from current channel", async () => {
await expect(
runMessageAction({
cfg: slackConfig,
action: "thread-reply",
params: {
channel: "slack",
channelId: "C999",
message: "hi",
},
toolContext: { currentChannelId: "C123" },
dryRun: true,
}),
).rejects.toThrow(/Cross-context messaging denied/);
const result = await runMessageAction({
cfg: slackConfig,
action: "thread-reply",
params: {
channel: "slack",
channelId: "C99999999",
message: "hi",
},
toolContext: { currentChannelId: "C12345678", currentChannelProvider: "slack" },
dryRun: true,
});
expect(result.kind).toBe("action");
});
it("allows WhatsApp send when target matches current chat", async () => {
@@ -117,19 +117,19 @@ describe("runMessageAction context isolation", () => {
});
it("blocks WhatsApp send when target differs from current chat", async () => {
await expect(
runMessageAction({
cfg: whatsappConfig,
action: "send",
params: {
channel: "whatsapp",
to: "456@g.us",
message: "hi",
},
toolContext: { currentChannelId: "123@g.us" },
dryRun: true,
}),
).rejects.toThrow(/Cross-context messaging denied/);
const result = await runMessageAction({
cfg: whatsappConfig,
action: "send",
params: {
channel: "whatsapp",
to: "456@g.us",
message: "hi",
},
toolContext: { currentChannelId: "123@g.us", currentChannelProvider: "whatsapp" },
dryRun: true,
});
expect(result.kind).toBe("send");
});
it("allows iMessage send when target matches current handle", async () => {
@@ -149,16 +149,59 @@ describe("runMessageAction context isolation", () => {
});
it("blocks iMessage send when target differs from current handle", async () => {
const result = await runMessageAction({
cfg: whatsappConfig,
action: "send",
params: {
channel: "imessage",
to: "imessage:+15551230000",
message: "hi",
},
toolContext: { currentChannelId: "imessage:+15551234567", currentChannelProvider: "imessage" },
dryRun: true,
});
expect(result.kind).toBe("send");
});
it("blocks cross-provider sends by default", async () => {
await expect(
runMessageAction({
cfg: whatsappConfig,
cfg: slackConfig,
action: "send",
params: {
channel: "imessage",
to: "imessage:+15551230000",
channel: "telegram",
to: "telegram:@ops",
message: "hi",
},
toolContext: { currentChannelId: "imessage:+15551234567" },
toolContext: { currentChannelId: "C12345678", currentChannelProvider: "slack" },
dryRun: true,
}),
).rejects.toThrow(/Cross-context messaging denied/);
});
it("blocks same-provider cross-context when disabled", async () => {
const cfg = {
...slackConfig,
tools: {
message: {
crossContext: {
allowWithinProvider: false,
},
},
},
} as ClawdbotConfig;
await expect(
runMessageAction({
cfg,
action: "send",
params: {
channel: "slack",
to: "channel:C99999999",
message: "hi",
},
toolContext: { currentChannelId: "C12345678", currentChannelProvider: "slack" },
dryRun: true,
}),
).rejects.toThrow(/Cross-context messaging denied/);

View File

@@ -14,10 +14,11 @@ import type {
} from "../../channels/plugins/types.js";
import type { ClawdbotConfig } from "../../config/config.js";
import type { GatewayClientMode, GatewayClientName } from "../../utils/message-channel.js";
import { resolveMessageChannelSelection } from "./channel-selection.js";
import { listConfiguredMessageChannels, resolveMessageChannelSelection } from "./channel-selection.js";
import type { OutboundSendDeps } from "./deliver.js";
import type { MessagePollResult, MessageSendResult } from "./message.js";
import { sendMessage, sendPoll } from "./message.js";
import { lookupDirectoryDisplay, resolveMessagingTarget } from "./target-resolver.js";
export type MessageActionRunnerGateway = {
url?: string;
@@ -53,6 +54,22 @@ export type MessageActionRunResult =
sendResult?: MessageSendResult;
dryRun: boolean;
}
| {
kind: "broadcast";
channel: ChannelId;
action: "broadcast";
handledBy: "core" | "dry-run";
payload: {
results: Array<{
channel: ChannelId;
to: string;
ok: boolean;
error?: string;
result?: MessageSendResult;
}>;
};
dryRun: boolean;
}
| {
kind: "poll";
channel: ChannelId;
@@ -148,11 +165,30 @@ function enforceContextIsolation(params: {
action: ChannelMessageActionName;
params: Record<string, unknown>;
toolContext?: ChannelThreadingToolContext;
cfg: ClawdbotConfig;
}): void {
const currentTarget = params.toolContext?.currentChannelId?.trim();
if (!currentTarget) return;
if (!CONTEXT_GUARDED_ACTIONS.has(params.action)) return;
if (params.cfg.tools?.message?.allowCrossContextSend) return;
const currentProvider = params.toolContext?.currentChannelProvider;
const allowWithinProvider = params.cfg.tools?.message?.crossContext?.allowWithinProvider !== false;
const allowAcrossProviders =
params.cfg.tools?.message?.crossContext?.allowAcrossProviders === true;
if (currentProvider && currentProvider !== params.channel) {
if (!allowAcrossProviders) {
throw new Error(
`Cross-context messaging denied: action=${params.action} target provider "${params.channel}" while bound to "${currentProvider}".`,
);
}
return;
}
if (allowWithinProvider) return;
const target = resolveContextGuardTarget(params.action, params.params);
if (!target) return;
@@ -178,6 +214,99 @@ async function resolveChannel(cfg: ClawdbotConfig, params: Record<string, unknow
return selection.channel;
}
function shouldApplyCrossContextMarker(action: ChannelMessageActionName): boolean {
return action === "send" || action === "poll" || action === "thread-reply" || action === "sticker";
}
async function buildCrossContextMarker(params: {
cfg: ClawdbotConfig;
channel: ChannelId;
target: string;
toolContext?: ChannelThreadingToolContext;
accountId?: string | null;
}) {
const currentTarget = params.toolContext?.currentChannelId?.trim();
if (!currentTarget) return null;
const normalizedTarget =
normalizeTargetForProvider(params.channel, params.target) ?? params.target.toLowerCase();
const normalizedCurrent =
normalizeTargetForProvider(params.channel, currentTarget) ?? currentTarget.toLowerCase();
if (!normalizedTarget || !normalizedCurrent) return null;
if (normalizedTarget === normalizedCurrent) return null;
const markerEnabled = params.cfg.tools?.message?.crossContext?.marker?.enabled !== false;
if (!markerEnabled) return null;
const currentName =
(await lookupDirectoryDisplay({
cfg: params.cfg,
channel: params.channel,
targetId: currentTarget,
accountId: params.accountId ?? undefined,
})) ?? currentTarget;
const originLabel = currentName.startsWith("#") ? currentName : `#${currentName}`;
const markerConfig = params.cfg.tools?.message?.crossContext?.marker;
const prefixTemplate = markerConfig?.prefix ?? "[from {channel}] ";
const suffixTemplate = markerConfig?.suffix ?? "";
const prefix = prefixTemplate.replaceAll("{channel}", originLabel);
const suffix = suffixTemplate.replaceAll("{channel}", originLabel);
const discordEmbeds =
params.channel === "discord"
? [
{
description: `From ${originLabel}`,
},
]
: undefined;
return {
prefix,
suffix,
discordEmbeds,
};
}
async function resolveActionTarget(params: {
cfg: ClawdbotConfig;
channel: ChannelId;
action: ChannelMessageActionName;
args: Record<string, unknown>;
accountId?: string | null;
}): Promise<void> {
const toRaw = typeof params.args.to === "string" ? params.args.to.trim() : "";
if (toRaw) {
const resolved = await resolveMessagingTarget({
cfg: params.cfg,
channel: params.channel,
input: toRaw,
accountId: params.accountId ?? undefined,
});
if (resolved.ok) {
params.args.to = resolved.target.to;
} else {
throw resolved.error;
}
}
const channelIdRaw =
typeof params.args.channelId === "string" ? params.args.channelId.trim() : "";
if (channelIdRaw) {
const resolved = await resolveMessagingTarget({
cfg: params.cfg,
channel: params.channel,
input: channelIdRaw,
accountId: params.accountId ?? undefined,
preferredKind: "group",
});
if (resolved.ok) {
if (resolved.target.kind === "user") {
throw new Error(`Channel id "${channelIdRaw}" resolved to a user target.`);
}
params.args.channelId = resolved.target.to.replace(/^(channel|group):/i, "");
} else {
throw resolved.error;
}
}
}
export async function runMessageAction(
input: RunMessageActionParams,
): Promise<MessageActionRunResult> {
@@ -186,15 +315,93 @@ export async function runMessageAction(
parseButtonsParam(params);
const action = input.action;
if (action === "broadcast") {
const broadcastEnabled = cfg.tools?.message?.broadcast?.enabled !== false;
if (!broadcastEnabled) {
throw new Error("Broadcast is disabled. Set tools.message.broadcast.enabled to true.");
}
const rawTargets = readStringArrayParam(params, "targets", { required: true }) ?? [];
if (rawTargets.length === 0) {
throw new Error("Broadcast requires at least one target in --targets.");
}
const channelHint = readStringParam(params, "channel");
const configured = await listConfiguredMessageChannels(cfg);
if (configured.length === 0) {
throw new Error("Broadcast requires at least one configured channel.");
}
const targetChannels =
channelHint && channelHint.trim().toLowerCase() !== "all"
? [await resolveChannel(cfg, { channel: channelHint })]
: configured;
const results: Array<{
channel: ChannelId;
to: string;
ok: boolean;
error?: string;
result?: MessageSendResult;
}> = [];
for (const targetChannel of targetChannels) {
for (const target of rawTargets) {
try {
const resolved = await resolveMessagingTarget({
cfg,
channel: targetChannel,
input: target,
});
if (!resolved.ok) throw resolved.error;
const sendResult = await runMessageAction({
...input,
action: "send",
params: {
...params,
channel: targetChannel,
to: resolved.target.to,
},
});
results.push({
channel: targetChannel,
to: resolved.target.to,
ok: true,
result: sendResult.kind === "send" ? sendResult.sendResult : undefined,
});
} catch (err) {
results.push({
channel: targetChannel,
to: target,
ok: false,
error: err instanceof Error ? err.message : String(err),
});
}
}
}
return {
kind: "broadcast",
channel: (targetChannels[0] ?? "discord") as ChannelId,
action: "broadcast",
handledBy: input.dryRun ? "dry-run" : "core",
payload: { results },
dryRun: Boolean(input.dryRun),
};
}
const channel = await resolveChannel(cfg, params);
const accountId = readStringParam(params, "accountId") ?? input.defaultAccountId;
const dryRun = Boolean(input.dryRun ?? readBooleanParam(params, "dryRun"));
await resolveActionTarget({
cfg,
channel,
action,
args: params,
accountId,
});
enforceContextIsolation({
channel,
action,
params,
toolContext: input.toolContext,
cfg,
});
const gateway = input.gateway
@@ -226,9 +433,29 @@ export async function runMessageAction(
params.media = parsed.mediaUrls?.[0] || parsed.mediaUrl || undefined;
}
const marker =
shouldApplyCrossContextMarker(action) && input.toolContext
? await buildCrossContextMarker({
cfg,
channel,
target: to,
toolContext: input.toolContext,
accountId: accountId ?? undefined,
})
: null;
const useTextMarker = !(channel === "discord" && marker?.discordEmbeds?.length);
if (useTextMarker && (marker?.prefix || marker?.suffix)) {
const base = params.message ?? "";
params.message = `${marker?.prefix ?? ""}${base}${marker?.suffix ?? ""}`;
message = params.message;
}
const mediaUrl = readStringParam(params, "media", { trim: false });
const gifPlayback = readBooleanParam(params, "gifPlayback") ?? false;
const bestEffort = readBooleanParam(params, "bestEffort");
if (marker?.discordEmbeds && channel === "discord") {
params.embeds = marker.discordEmbeds;
}
if (!dryRun) {
const handled = await dispatchChannelMessageAction({
@@ -302,6 +529,23 @@ export async function runMessageAction(
integer: true,
});
const maxSelections = allowMultiselect ? Math.max(2, options.length) : 1;
const marker =
shouldApplyCrossContextMarker(action) && input.toolContext
? await buildCrossContextMarker({
cfg,
channel,
target: to,
toolContext: input.toolContext,
accountId: accountId ?? undefined,
})
: null;
if (marker?.prefix || marker?.suffix) {
const base = typeof params.message === "string" ? params.message : "";
params.message = `${marker?.prefix ?? ""}${base}${marker?.suffix ?? ""}`;
}
if (marker?.discordEmbeds && channel === "discord") {
params.embeds = marker.discordEmbeds;
}
if (!dryRun) {
const handled = await dispatchChannelMessageAction({

View File

@@ -0,0 +1,312 @@
import { normalizeTargetForProvider } from "../../agents/pi-embedded-messaging.js";
import { getChannelPlugin } from "../../channels/plugins/index.js";
import type {
ChannelDirectoryEntry,
ChannelDirectoryEntryKind,
ChannelId,
} from "../../channels/plugins/types.js";
import type { ClawdbotConfig } from "../../config/config.js";
import { defaultRuntime, type RuntimeEnv } from "../../runtime.js";
export type TargetResolveKind = ChannelDirectoryEntryKind | "channel";
export type ResolvedMessagingTarget = {
to: string;
kind: TargetResolveKind;
display?: string;
source: "normalized" | "directory";
};
export type ResolveMessagingTargetResult =
| { ok: true; target: ResolvedMessagingTarget }
| { ok: false; error: Error; candidates?: ChannelDirectoryEntry[] };
type DirectoryCacheEntry = {
entries: ChannelDirectoryEntry[];
fetchedAt: number;
};
const CACHE_TTL_MS = 30 * 60 * 1000;
const directoryCache = new Map<string, DirectoryCacheEntry>();
let lastConfigRef: ClawdbotConfig | null = null;
function resetCacheIfConfigChanged(cfg: ClawdbotConfig): void {
if (lastConfigRef && lastConfigRef !== cfg) {
directoryCache.clear();
}
lastConfigRef = cfg;
}
function buildCacheKey(params: {
channel: ChannelId;
accountId?: string | null;
kind: ChannelDirectoryEntryKind;
source: "cache" | "live";
}) {
return `${params.channel}:${params.accountId ?? "default"}:${params.kind}:${params.source}`;
}
function normalizeQuery(value: string): string {
return value.trim().toLowerCase();
}
function stripTargetPrefixes(value: string): string {
return value.replace(/^(channel|group|user):/i, "").replace(/^[@#]/, "").trim();
}
function preserveTargetCase(channel: ChannelId, raw: string, normalized: string): string {
if (channel !== "slack") return normalized;
const trimmed = raw.trim();
if (/^channel:/i.test(trimmed) || /^user:/i.test(trimmed)) return trimmed;
if (trimmed.startsWith("#")) return `channel:${trimmed.slice(1).trim()}`;
if (trimmed.startsWith("@")) return `user:${trimmed.slice(1).trim()}`;
return trimmed;
}
function detectTargetKind(raw: string, preferred?: TargetResolveKind): TargetResolveKind {
if (preferred) return preferred;
const trimmed = raw.trim();
if (!trimmed) return "group";
if (trimmed.startsWith("@") || /^<@!?/.test(trimmed) || /^user:/i.test(trimmed)) return "user";
if (trimmed.startsWith("#") || /^channel:/i.test(trimmed) || /^group:/i.test(trimmed)) {
return "group";
}
return "group";
}
function normalizeDirectoryEntryId(channel: ChannelId, entry: ChannelDirectoryEntry): string {
const normalized = normalizeTargetForProvider(channel, entry.id);
return normalized ?? entry.id.trim();
}
function matchesDirectoryEntry(params: {
channel: ChannelId;
entry: ChannelDirectoryEntry;
query: string;
}): boolean {
const query = normalizeQuery(params.query);
if (!query) return false;
const id = stripTargetPrefixes(normalizeDirectoryEntryId(params.channel, params.entry));
const name = params.entry.name ? stripTargetPrefixes(params.entry.name) : "";
const handle = params.entry.handle ? stripTargetPrefixes(params.entry.handle) : "";
const candidates = [id, name, handle].map((value) => normalizeQuery(value)).filter(Boolean);
return candidates.some((value) => value === query || value.includes(query));
}
function resolveMatch(params: {
channel: ChannelId;
entries: ChannelDirectoryEntry[];
query: string;
}) {
const matches = params.entries.filter((entry) =>
matchesDirectoryEntry({ channel: params.channel, entry, query: params.query }),
);
if (matches.length === 0) return { kind: "none" as const };
if (matches.length === 1) return { kind: "single" as const, entry: matches[0] };
return { kind: "ambiguous" as const, entries: matches };
}
function looksLikeId(channel: ChannelId, normalized: string): boolean {
if (!normalized) return false;
const raw = normalized.trim();
switch (channel) {
case "discord": {
const candidate = stripTargetPrefixes(raw);
return /^\d{6,}$/.test(candidate);
}
case "slack": {
const candidate = stripTargetPrefixes(raw);
return /^[A-Z0-9]{8,}$/i.test(candidate);
}
case "msteams": {
return /^conversation:/i.test(raw) || /^user:/i.test(raw) || raw.includes("@thread");
}
case "telegram": {
return /^telegram:/i.test(raw) || raw.startsWith("@");
}
case "whatsapp": {
const candidate = stripTargetPrefixes(raw);
return (
/@/i.test(candidate) ||
/^\+?\d{3,}$/.test(candidate) ||
candidate.toLowerCase().endsWith("@g.us")
);
}
default:
return Boolean(raw);
}
}
async function listDirectoryEntries(params: {
cfg: ClawdbotConfig;
channel: ChannelId;
accountId?: string | null;
kind: ChannelDirectoryEntryKind;
runtime?: RuntimeEnv;
query?: string;
source: "cache" | "live";
}): Promise<ChannelDirectoryEntry[]> {
const plugin = getChannelPlugin(params.channel);
const directory = plugin?.directory;
if (!directory) return [];
const runtime = params.runtime ?? defaultRuntime;
const useLive = params.source === "live";
if (params.kind === "user") {
const fn = useLive ? directory.listPeersLive ?? directory.listPeers : directory.listPeers;
if (!fn) return [];
return await fn({
cfg: params.cfg,
accountId: params.accountId ?? undefined,
query: params.query ?? undefined,
limit: undefined,
runtime,
});
}
const fn = useLive ? directory.listGroupsLive ?? directory.listGroups : directory.listGroups;
if (!fn) return [];
return await fn({
cfg: params.cfg,
accountId: params.accountId ?? undefined,
query: params.query ?? undefined,
limit: undefined,
runtime,
});
}
async function getDirectoryEntries(params: {
cfg: ClawdbotConfig;
channel: ChannelId;
accountId?: string | null;
kind: ChannelDirectoryEntryKind;
query?: string;
runtime?: RuntimeEnv;
preferLiveOnMiss?: boolean;
}): Promise<ChannelDirectoryEntry[]> {
resetCacheIfConfigChanged(params.cfg);
const cacheKey = buildCacheKey({
channel: params.channel,
accountId: params.accountId,
kind: params.kind,
source: "cache",
});
const cached = directoryCache.get(cacheKey);
if (cached && Date.now() - cached.fetchedAt < CACHE_TTL_MS) {
return cached.entries;
}
const entries = await listDirectoryEntries({
cfg: params.cfg,
channel: params.channel,
accountId: params.accountId,
kind: params.kind,
query: params.query,
runtime: params.runtime,
source: "cache",
});
if (entries.length > 0 || !params.preferLiveOnMiss) {
directoryCache.set(cacheKey, { entries, fetchedAt: Date.now() });
return entries;
}
const liveKey = buildCacheKey({
channel: params.channel,
accountId: params.accountId,
kind: params.kind,
source: "live",
});
const liveEntries = await listDirectoryEntries({
cfg: params.cfg,
channel: params.channel,
accountId: params.accountId,
kind: params.kind,
query: params.query,
runtime: params.runtime,
source: "live",
});
directoryCache.set(liveKey, { entries: liveEntries, fetchedAt: Date.now() });
return liveEntries;
}
export async function resolveMessagingTarget(params: {
cfg: ClawdbotConfig;
channel: ChannelId;
input: string;
accountId?: string | null;
preferredKind?: TargetResolveKind;
runtime?: RuntimeEnv;
}): Promise<ResolveMessagingTargetResult> {
const raw = params.input.trim();
if (!raw) {
return { ok: false, error: new Error("Target is required") };
}
const kind = detectTargetKind(raw, params.preferredKind);
const normalized = normalizeTargetForProvider(params.channel, raw) ?? raw;
if (looksLikeId(params.channel, normalized)) {
const directTarget = preserveTargetCase(params.channel, raw, normalized);
return {
ok: true,
target: {
to: directTarget,
kind,
display: stripTargetPrefixes(raw),
source: "normalized",
},
};
}
const query = stripTargetPrefixes(raw);
const entries = await getDirectoryEntries({
cfg: params.cfg,
channel: params.channel,
accountId: params.accountId,
kind: kind === "user" ? "user" : "group",
query,
runtime: params.runtime,
preferLiveOnMiss: true,
});
const match = resolveMatch({ channel: params.channel, entries, query });
if (match.kind === "single") {
const entry = match.entry;
return {
ok: true,
target: {
to: normalizeDirectoryEntryId(params.channel, entry),
kind,
display: entry.name ?? entry.handle ?? stripTargetPrefixes(entry.id),
source: "directory",
},
};
}
if (match.kind === "ambiguous") {
return {
ok: false,
error: new Error(
`Ambiguous target "${raw}". Provide a unique name or an explicit id.`,
),
candidates: match.entries,
};
}
return {
ok: false,
error: new Error(`Unknown target "${raw}" for ${params.channel}.`),
};
}
export async function lookupDirectoryDisplay(params: {
cfg: ClawdbotConfig;
channel: ChannelId;
targetId: string;
accountId?: string | null;
runtime?: RuntimeEnv;
}): Promise<string | undefined> {
const normalized = normalizeTargetForProvider(params.channel, params.targetId) ?? params.targetId;
const candidates = await getDirectoryEntries({
cfg: params.cfg,
channel: params.channel,
accountId: params.accountId,
kind: "group",
runtime: params.runtime,
preferLiveOnMiss: false,
});
const entry = candidates.find(
(candidate) => normalizeDirectoryEntryId(params.channel, candidate) === normalized,
);
return entry?.name ?? entry?.handle ?? undefined;
}