diff --git a/CHANGELOG.md b/CHANGELOG.md index 4589a4777..f1c1fe0be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - Agents: add optional auth-profile copy prompt on `agents add` and improve auth error messaging. - Security: expand `clawdbot security audit` checks (model hygiene, config includes, plugin allowlists, exposure matrix) and extend `--fix` to tighten more sensitive state paths. - Security: add `SECURITY.md` reporting policy. +- Channels: add Matrix plugin (external) with docs + onboarding hooks. - Plugins: add Zalo channel plugin with gateway HTTP hooks and onboarding install prompt. (#854) — thanks @longmaba. - Onboarding: add a security checkpoint prompt (docs link + sandboxing hint); require `--accept-risk` for `--non-interactive`. - Docs: expand gateway security hardening guidance and incident response checklist. diff --git a/docs/channels/index.md b/docs/channels/index.md index f6b90ba3b..76a697dc3 100644 --- a/docs/channels/index.md +++ b/docs/channels/index.md @@ -18,6 +18,7 @@ Text is supported everywhere; media and reactions vary by channel. - [Signal](/channels/signal) — signal-cli; privacy-focused. - [iMessage](/channels/imessage) — macOS only; native integration. - [Microsoft Teams](/channels/msteams) — Bot Framework; enterprise support. +- [Matrix](/channels/matrix) — Matrix protocol (plugin, installed separately). - [Zalo](/channels/zalo) — Zalo Bot API; Vietnam's popular messenger (plugin, installed separately). - [WebChat](/web/webchat) — Gateway WebChat UI over WebSocket. diff --git a/docs/channels/matrix.md b/docs/channels/matrix.md new file mode 100644 index 000000000..0b7c731c1 --- /dev/null +++ b/docs/channels/matrix.md @@ -0,0 +1,116 @@ +--- +summary: "Matrix support status, capabilities, and configuration" +read_when: + - Working on Matrix channel features +--- +# Matrix (plugin) + +Status: supported via plugin (matrix-js-sdk). Direct messages, rooms, threads, media, reactions, and polls. + +## Plugin required +Matrix ships as a plugin and is not bundled with the core install. +- Install via CLI: `clawdbot plugins install @clawdbot/matrix` +- Or select **Matrix** during onboarding and confirm the install prompt +- Details: [Plugins](/plugin) + +## Quick setup (beginner) +1) Install the Matrix plugin: + - From a source checkout: `clawdbot plugins install ./extensions/matrix` + - From npm (if published): `clawdbot plugins install @clawdbot/matrix` + - Or pick **Matrix** in onboarding and confirm the install prompt +2) Configure credentials: + - Env: `MATRIX_HOMESERVER`, `MATRIX_USER_ID`, `MATRIX_ACCESS_TOKEN` (or `MATRIX_PASSWORD`) + - Or config: `channels.matrix.*` +3) Restart the gateway (or finish onboarding). +4) DM access defaults to pairing; approve the pairing code on first contact. + +Runtime note: Matrix requires Node.js (Bun is not supported). + +Minimal config: +```json5 +{ + channels: { + matrix: { + enabled: true, + homeserver: "https://matrix.example.org", + userId: "@clawdbot:example.org", + accessToken: "syt_***", + dm: { policy: "pairing" } + } + } +} +``` + +## Encryption (E2EE) +End-to-end encrypted rooms are **not** supported. +- Use unencrypted rooms or disable encryption when creating the room. +- If a room is E2EE, the bot will receive encrypted events and won’t reply. + +## What it is +Matrix is an open messaging protocol. Clawdbot connects as a Matrix user and listens to DMs and rooms. +- A Matrix user account owned by the Gateway. +- Deterministic routing: replies go back to Matrix. +- DMs share the agent's main session; rooms map to group sessions. + +## Access control (DMs) +- Default: `channels.matrix.dm.policy = "pairing"`. Unknown senders get a pairing code. +- Approve via: + - `clawdbot pairing list matrix` + - `clawdbot pairing approve matrix ` +- Public DMs: `channels.matrix.dm.policy="open"` plus `channels.matrix.dm.allowFrom=["*"]`. + +## Rooms (groups) +- Default: `channels.matrix.groupPolicy = "allowlist"` (mention-gated). +- Allowlist rooms with `channels.matrix.rooms`: +```json5 +{ + channels: { + matrix: { + rooms: { + "!roomId:example.org": { requireMention: true } + } + } + } +} +``` +- `requireMention: false` enables auto-reply in that room. + +## Threads +- Reply threading is supported. +- `channels.matrix.replyToMode` controls replies when tagged: + - `off` (default), `first`, `all` + +## Capabilities +| Feature | Status | +|---------|--------| +| Direct messages | ✅ Supported | +| Rooms | ✅ Supported | +| Threads | ✅ Supported | +| Media | ✅ Supported | +| Reactions | ✅ Supported | +| Polls | ✅ Supported | +| Native commands | ✅ Supported | + +## Configuration reference (Matrix) +Full configuration: [Configuration](/gateway/configuration) + +Provider options: +- `channels.matrix.enabled`: enable/disable channel startup. +- `channels.matrix.homeserver`: homeserver URL. +- `channels.matrix.userId`: Matrix user ID. +- `channels.matrix.accessToken`: access token. +- `channels.matrix.password`: password for login (token stored). +- `channels.matrix.deviceName`: device display name. +- `channels.matrix.initialSyncLimit`: initial sync limit. +- `channels.matrix.threadReplies`: `off | inbound | always` (default: inbound). +- `channels.matrix.textChunkLimit`: outbound text chunk size (chars). +- `channels.matrix.dm.policy`: `pairing | allowlist | open | disabled` (default: pairing). +- `channels.matrix.dm.allowFrom`: DM allowlist. `open` requires `"*"`. +- `channels.matrix.groupPolicy`: `allowlist | open | disabled` (default: allowlist). +- `channels.matrix.allowlistOnly`: force allowlist rules for DMs + rooms. +- `channels.matrix.rooms`: per-room settings and allowlist. +- `channels.matrix.replyToMode`: reply-to mode for threads/tags. +- `channels.matrix.mediaMaxMb`: inbound/outbound media cap (MB). +- `channels.matrix.autoJoin`: invite handling (`always | allowlist | off`, default: always). +- `channels.matrix.autoJoinAllowlist`: allowed room IDs/aliases for auto-join. +- `channels.matrix.actions`: per-action tool gating (reactions/messages/pins/memberInfo/channelInfo). diff --git a/docs/docs.json b/docs/docs.json index 2716d1a50..ecf4f88d8 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -905,6 +905,7 @@ "channels/signal", "channels/imessage", "channels/msteams", + "channels/matrix", "channels/zalo", "broadcast-groups", "channels/troubleshooting", diff --git a/extensions/matrix/index.ts b/extensions/matrix/index.ts new file mode 100644 index 000000000..91ab6c07c --- /dev/null +++ b/extensions/matrix/index.ts @@ -0,0 +1,14 @@ +import type { ClawdbotPluginApi } from "../../src/plugins/types.js"; + +import { matrixPlugin } from "./src/channel.js"; + +const plugin = { + id: "matrix", + name: "Matrix", + description: "Matrix channel plugin (matrix-js-sdk)", + register(api: ClawdbotPluginApi) { + api.registerChannel({ plugin: matrixPlugin }); + }, +}; + +export default plugin; diff --git a/extensions/matrix/package.json b/extensions/matrix/package.json new file mode 100644 index 000000000..dfb508146 --- /dev/null +++ b/extensions/matrix/package.json @@ -0,0 +1,13 @@ +{ + "name": "@clawdbot/matrix", + "version": "2026.1.14", + "type": "module", + "description": "Clawdbot Matrix channel plugin", + "clawdbot": { + "extensions": ["./index.ts"] + }, + "dependencies": { + "markdown-it": "14.1.0", + "matrix-js-sdk": "40.0.0" + } +} diff --git a/extensions/matrix/src/actions.ts b/extensions/matrix/src/actions.ts new file mode 100644 index 000000000..03066f9cb --- /dev/null +++ b/extensions/matrix/src/actions.ts @@ -0,0 +1,183 @@ +import { createActionGate, readNumberParam, readStringParam } from "../../../src/agents/tools/common.js"; +import { resolveMatrixAccount } from "./matrix/accounts.js"; +import { handleMatrixAction } from "./tool-actions.js"; +import type { CoreConfig } from "./types.js"; +import type { + ChannelMessageActionAdapter, + ChannelMessageActionContext, + ChannelMessageActionName, + ChannelToolSend, +} from "../../../src/channels/plugins/types.js"; + +export const matrixMessageActions: ChannelMessageActionAdapter = { + listActions: ({ cfg }) => { + const account = resolveMatrixAccount({ cfg: cfg as CoreConfig }); + if (!account.enabled || !account.configured) return []; + const gate = createActionGate((cfg as CoreConfig).channels?.matrix?.actions); + const actions = new Set(["send", "poll"]); + if (gate("reactions")) { + actions.add("react"); + actions.add("reactions"); + } + if (gate("messages")) { + actions.add("read"); + actions.add("edit"); + actions.add("delete"); + } + if (gate("pins")) { + actions.add("pin"); + actions.add("unpin"); + actions.add("list-pins"); + } + if (gate("memberInfo")) actions.add("member-info"); + if (gate("channelInfo")) actions.add("channel-info"); + return Array.from(actions); + }, + supportsAction: ({ action }) => action !== "poll", + extractToolSend: ({ args }): ChannelToolSend | null => { + const action = typeof args.action === "string" ? args.action.trim() : ""; + if (action !== "sendMessage") return null; + const to = typeof args.to === "string" ? args.to : undefined; + if (!to) return null; + return { to }; + }, + handleAction: async (ctx: ChannelMessageActionContext) => { + const { action, params, cfg } = ctx; + const resolveRoomId = () => + readStringParam(params, "roomId") ?? + readStringParam(params, "channelId") ?? + readStringParam(params, "to", { required: true }); + + if (action === "send") { + const to = readStringParam(params, "to", { required: true }); + const content = readStringParam(params, "message", { + required: true, + allowEmpty: true, + }); + const mediaUrl = readStringParam(params, "media", { trim: false }); + const replyTo = readStringParam(params, "replyTo"); + const threadId = readStringParam(params, "threadId"); + return await handleMatrixAction( + { + action: "sendMessage", + to, + content, + mediaUrl: mediaUrl ?? undefined, + replyToId: replyTo ?? undefined, + threadId: threadId ?? undefined, + }, + cfg, + ); + } + + if (action === "react") { + const messageId = readStringParam(params, "messageId", { required: true }); + const emoji = readStringParam(params, "emoji", { allowEmpty: true }); + const remove = typeof params.remove === "boolean" ? params.remove : undefined; + return await handleMatrixAction( + { + action: "react", + roomId: resolveRoomId(), + messageId, + emoji, + remove, + }, + cfg, + ); + } + + if (action === "reactions") { + const messageId = readStringParam(params, "messageId", { required: true }); + const limit = readNumberParam(params, "limit", { integer: true }); + return await handleMatrixAction( + { + action: "reactions", + roomId: resolveRoomId(), + messageId, + limit, + }, + cfg, + ); + } + + if (action === "read") { + const limit = readNumberParam(params, "limit", { integer: true }); + return await handleMatrixAction( + { + action: "readMessages", + roomId: resolveRoomId(), + limit, + before: readStringParam(params, "before"), + after: readStringParam(params, "after"), + }, + cfg, + ); + } + + if (action === "edit") { + const messageId = readStringParam(params, "messageId", { required: true }); + const content = readStringParam(params, "message", { required: true }); + return await handleMatrixAction( + { + action: "editMessage", + roomId: resolveRoomId(), + messageId, + content, + }, + cfg, + ); + } + + if (action === "delete") { + const messageId = readStringParam(params, "messageId", { required: true }); + return await handleMatrixAction( + { + action: "deleteMessage", + roomId: resolveRoomId(), + messageId, + }, + cfg, + ); + } + + if (action === "pin" || action === "unpin" || action === "list-pins") { + const messageId = + action === "list-pins" + ? undefined + : readStringParam(params, "messageId", { required: true }); + return await handleMatrixAction( + { + action: + action === "pin" ? "pinMessage" : action === "unpin" ? "unpinMessage" : "listPins", + roomId: resolveRoomId(), + messageId, + }, + cfg, + ); + } + + if (action === "member-info") { + const userId = readStringParam(params, "userId", { required: true }); + return await handleMatrixAction( + { + action: "memberInfo", + userId, + roomId: readStringParam(params, "roomId") ?? readStringParam(params, "channelId"), + }, + cfg, + ); + } + + if (action === "channel-info") { + return await handleMatrixAction( + { + action: "channelInfo", + roomId: resolveRoomId(), + }, + cfg, + ); + } + + throw new Error(`Action ${action} is not supported for provider matrix.`); + }, +}; diff --git a/extensions/matrix/src/channel.ts b/extensions/matrix/src/channel.ts new file mode 100644 index 000000000..d828da88b --- /dev/null +++ b/extensions/matrix/src/channel.ts @@ -0,0 +1,299 @@ +import type { ChannelPlugin } from "../../../src/channels/plugins/types.js"; +import { + deleteAccountFromConfigSection, + setAccountEnabledInConfigSection, +} from "../../../src/channels/plugins/config-helpers.js"; +import { formatPairingApproveHint } from "../../../src/channels/plugins/helpers.js"; +import { PAIRING_APPROVED_MESSAGE } from "../../../src/channels/plugins/pairing-message.js"; +import { applyAccountNameToChannelSection } from "../../../src/channels/plugins/setup-helpers.js"; +import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "../../../src/routing/session-key.js"; + +import { matrixMessageActions } from "./actions.js"; +import { resolveMatrixGroupRequireMention } from "./group-mentions.js"; +import type { CoreConfig } from "./types.js"; +import { + listMatrixAccountIds, + resolveDefaultMatrixAccountId, + resolveMatrixAccount, + type ResolvedMatrixAccount, +} from "./matrix/accounts.js"; +import { resolveMatrixAuth } from "./matrix/client.js"; +import { normalizeAllowListLower } from "./matrix/monitor/allowlist.js"; +import { probeMatrix } from "./matrix/probe.js"; +import { sendMessageMatrix } from "./matrix/send.js"; +import { matrixOnboardingAdapter } from "./onboarding.js"; +import { matrixOutbound } from "./outbound.js"; + +const meta = { + id: "matrix", + label: "Matrix", + selectionLabel: "Matrix (plugin)", + docsPath: "/channels/matrix", + docsLabel: "matrix", + blurb: "open protocol; configure a homeserver + access token.", + order: 70, + quickstartAllowFrom: true, +}; + +function normalizeMatrixMessagingTarget(raw: string): string | undefined { + let normalized = raw.trim(); + if (!normalized) return undefined; + if (normalized.toLowerCase().startsWith("matrix:")) { + normalized = normalized.slice("matrix:".length).trim(); + } + return normalized ? normalized.toLowerCase() : undefined; +} + +function buildMatrixConfigUpdate( + cfg: CoreConfig, + input: { + homeserver?: string; + userId?: string; + accessToken?: string; + password?: string; + deviceName?: string; + initialSyncLimit?: number; + }, +): CoreConfig { + const existing = cfg.channels?.matrix ?? {}; + return { + ...cfg, + channels: { + ...cfg.channels, + matrix: { + ...existing, + enabled: true, + ...(input.homeserver ? { homeserver: input.homeserver } : {}), + ...(input.userId ? { userId: input.userId } : {}), + ...(input.accessToken ? { accessToken: input.accessToken } : {}), + ...(input.password ? { password: input.password } : {}), + ...(input.deviceName ? { deviceName: input.deviceName } : {}), + ...(typeof input.initialSyncLimit === "number" + ? { initialSyncLimit: input.initialSyncLimit } + : {}), + }, + }, + }; +} + +export const matrixPlugin: ChannelPlugin = { + id: "matrix", + meta, + onboarding: matrixOnboardingAdapter, + pairing: { + idLabel: "matrixUserId", + normalizeAllowEntry: (entry) => entry.replace(/^matrix:/i, ""), + notifyApproval: async ({ id }) => { + await sendMessageMatrix(`user:${id}`, PAIRING_APPROVED_MESSAGE); + }, + }, + capabilities: { + chatTypes: ["direct", "group", "thread"], + polls: true, + reactions: true, + threads: true, + media: true, + }, + reload: { configPrefixes: ["channels.matrix"] }, + config: { + listAccountIds: (cfg) => listMatrixAccountIds(cfg as CoreConfig), + resolveAccount: (cfg, accountId) => + resolveMatrixAccount({ cfg: cfg as CoreConfig, accountId }), + defaultAccountId: (cfg) => resolveDefaultMatrixAccountId(cfg as CoreConfig), + setAccountEnabled: ({ cfg, accountId, enabled }) => + setAccountEnabledInConfigSection({ + cfg: cfg as CoreConfig, + sectionKey: "matrix", + accountId, + enabled, + allowTopLevel: true, + }), + deleteAccount: ({ cfg, accountId }) => + deleteAccountFromConfigSection({ + cfg: cfg as CoreConfig, + sectionKey: "matrix", + accountId, + clearBaseFields: [ + "name", + "homeserver", + "userId", + "accessToken", + "password", + "deviceName", + "initialSyncLimit", + ], + }), + isConfigured: (account) => account.configured, + describeAccount: (account) => ({ + accountId: account.accountId, + name: account.name, + enabled: account.enabled, + configured: account.configured, + baseUrl: account.homeserver, + }), + resolveAllowFrom: ({ cfg }) => + ((cfg as CoreConfig).channels?.matrix?.dm?.allowFrom ?? []).map((entry) => String(entry)), + formatAllowFrom: ({ allowFrom }) => normalizeAllowListLower(allowFrom), + }, + security: { + resolveDmPolicy: ({ account }) => ({ + policy: account.config.dm?.policy ?? "pairing", + allowFrom: account.config.dm?.allowFrom ?? [], + policyPath: "channels.matrix.dm.policy", + allowFromPath: "channels.matrix.dm.allowFrom", + approveHint: formatPairingApproveHint("matrix"), + normalizeEntry: (raw) => raw.replace(/^matrix:/i, "").trim().toLowerCase(), + }), + collectWarnings: ({ account }) => { + const groupPolicy = account.config.groupPolicy ?? "allowlist"; + if (groupPolicy !== "open") return []; + return [ + "- Matrix rooms: groupPolicy=\"open\" allows any room to trigger (mention-gated). Set channels.matrix.groupPolicy=\"allowlist\" + channels.matrix.rooms to restrict rooms.", + ]; + }, + }, + groups: { + resolveRequireMention: resolveMatrixGroupRequireMention, + }, + threading: { + resolveReplyToMode: ({ cfg }) => + (cfg as CoreConfig).channels?.matrix?.replyToMode ?? "off", + }, + messaging: { + normalizeTarget: normalizeMatrixMessagingTarget, + }, + actions: matrixMessageActions, + setup: { + resolveAccountId: ({ accountId }) => normalizeAccountId(accountId), + applyAccountName: ({ cfg, accountId, name }) => + applyAccountNameToChannelSection({ + cfg: cfg as CoreConfig, + channelKey: "matrix", + accountId, + name, + }), + validateInput: ({ input }) => { + if (input.useEnv) return null; + if (!input.homeserver?.trim()) return "Matrix requires --homeserver"; + if (!input.userId?.trim()) return "Matrix requires --user-id"; + if (!input.accessToken?.trim() && !input.password?.trim()) { + return "Matrix requires --access-token or --password"; + } + return null; + }, + applyAccountConfig: ({ cfg, input }) => { + const namedConfig = applyAccountNameToChannelSection({ + cfg: cfg as CoreConfig, + channelKey: "matrix", + accountId: DEFAULT_ACCOUNT_ID, + name: input.name, + }); + if (input.useEnv) { + return { + ...namedConfig, + channels: { + ...namedConfig.channels, + matrix: { + ...namedConfig.channels?.matrix, + enabled: true, + }, + }, + } as CoreConfig; + } + return buildMatrixConfigUpdate(namedConfig as CoreConfig, { + homeserver: input.homeserver?.trim(), + userId: input.userId?.trim(), + accessToken: input.accessToken?.trim(), + password: input.password?.trim(), + deviceName: input.deviceName?.trim(), + initialSyncLimit: input.initialSyncLimit, + }); + }, + }, + outbound: matrixOutbound, + status: { + defaultRuntime: { + accountId: DEFAULT_ACCOUNT_ID, + running: false, + lastStartAt: null, + lastStopAt: null, + lastError: null, + }, + collectStatusIssues: (accounts) => + accounts.flatMap((account) => { + const lastError = typeof account.lastError === "string" ? account.lastError.trim() : ""; + if (!lastError) return []; + return [ + { + channel: "matrix", + accountId: account.accountId, + kind: "runtime", + message: `Channel error: ${lastError}`, + }, + ]; + }), + buildChannelSummary: ({ snapshot }) => ({ + configured: snapshot.configured ?? false, + baseUrl: snapshot.baseUrl ?? null, + running: snapshot.running ?? false, + lastStartAt: snapshot.lastStartAt ?? null, + lastStopAt: snapshot.lastStopAt ?? null, + lastError: snapshot.lastError ?? null, + probe: snapshot.probe, + lastProbeAt: snapshot.lastProbeAt ?? null, + }), + probeAccount: async ({ account, timeoutMs, cfg }) => { + try { + const auth = await resolveMatrixAuth({ cfg: cfg as CoreConfig }); + return await probeMatrix({ + homeserver: auth.homeserver, + accessToken: auth.accessToken, + userId: auth.userId, + timeoutMs, + }); + } catch (err) { + return { + ok: false, + error: err instanceof Error ? err.message : String(err), + elapsedMs: 0, + }; + } + }, + buildAccountSnapshot: ({ account, runtime, probe }) => ({ + accountId: account.accountId, + name: account.name, + enabled: account.enabled, + configured: account.configured, + baseUrl: account.homeserver, + running: runtime?.running ?? false, + lastStartAt: runtime?.lastStartAt ?? null, + lastStopAt: runtime?.lastStopAt ?? null, + lastError: runtime?.lastError ?? null, + probe, + lastProbeAt: runtime?.lastProbeAt ?? null, + lastInboundAt: runtime?.lastInboundAt ?? null, + lastOutboundAt: runtime?.lastOutboundAt ?? null, + }), + }, + gateway: { + startAccount: async (ctx) => { + const account = ctx.account; + ctx.setStatus({ + accountId: account.accountId, + baseUrl: account.homeserver, + }); + ctx.log?.info( + `[${account.accountId}] starting provider (${account.homeserver ?? "matrix"})`, + ); + // Lazy import: the monitor pulls the reply pipeline; avoid ESM init cycles. + const { monitorMatrixProvider } = await import("./matrix/index.js"); + return monitorMatrixProvider({ + runtime: ctx.runtime, + abortSignal: ctx.abortSignal, + mediaMaxMb: account.config.mediaMaxMb, + initialSyncLimit: account.config.initialSyncLimit, + replyToMode: account.config.replyToMode, + }); + }, + }, +}; diff --git a/extensions/matrix/src/group-mentions.ts b/extensions/matrix/src/group-mentions.ts new file mode 100644 index 000000000..d548bc6f4 --- /dev/null +++ b/extensions/matrix/src/group-mentions.ts @@ -0,0 +1,34 @@ +import type { ChannelGroupContext } from "../../../src/channels/plugins/types.js"; + +import { resolveMatrixRoomConfig } from "./matrix/monitor/rooms.js"; +import type { CoreConfig } from "./types.js"; + +export function resolveMatrixGroupRequireMention(params: ChannelGroupContext): boolean { + const rawGroupId = params.groupId?.trim() ?? ""; + let roomId = rawGroupId; + const lower = roomId.toLowerCase(); + if (lower.startsWith("matrix:")) { + roomId = roomId.slice("matrix:".length).trim(); + } + if (roomId.toLowerCase().startsWith("channel:")) { + roomId = roomId.slice("channel:".length).trim(); + } + if (roomId.toLowerCase().startsWith("room:")) { + roomId = roomId.slice("room:".length).trim(); + } + const groupRoom = params.groupRoom?.trim() ?? ""; + const aliases = groupRoom ? [groupRoom] : []; + const cfg = params.cfg as CoreConfig; + const resolved = resolveMatrixRoomConfig({ + rooms: cfg.channels?.matrix?.rooms, + roomId, + aliases, + name: groupRoom || undefined, + }).config; + if (resolved) { + if (resolved.autoReply === true) return false; + if (resolved.autoReply === false) return true; + if (typeof resolved.requireMention === "boolean") return resolved.requireMention; + } + return true; +} diff --git a/extensions/matrix/src/matrix/accounts.ts b/extensions/matrix/src/matrix/accounts.ts new file mode 100644 index 000000000..fa94aec6a --- /dev/null +++ b/extensions/matrix/src/matrix/accounts.ts @@ -0,0 +1,61 @@ +import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "../../../../src/routing/session-key.js"; +import type { CoreConfig, MatrixConfig } from "../types.js"; +import { resolveMatrixConfig } from "./client.js"; +import { credentialsMatchConfig, loadMatrixCredentials } from "./credentials.js"; + +export type ResolvedMatrixAccount = { + accountId: string; + enabled: boolean; + name?: string; + configured: boolean; + homeserver?: string; + userId?: string; + config: MatrixConfig; +}; + +export function listMatrixAccountIds(_cfg: CoreConfig): string[] { + return [DEFAULT_ACCOUNT_ID]; +} + +export function resolveDefaultMatrixAccountId(cfg: CoreConfig): string { + const ids = listMatrixAccountIds(cfg); + if (ids.includes(DEFAULT_ACCOUNT_ID)) return DEFAULT_ACCOUNT_ID; + return ids[0] ?? DEFAULT_ACCOUNT_ID; +} + +export function resolveMatrixAccount(params: { + cfg: CoreConfig; + accountId?: string | null; +}): ResolvedMatrixAccount { + const accountId = normalizeAccountId(params.accountId); + const base = (params.cfg.channels?.matrix ?? {}) as MatrixConfig; + const enabled = base.enabled !== false; + const resolved = resolveMatrixConfig(params.cfg, process.env); + const hasCore = Boolean(resolved.homeserver && resolved.userId); + const hasToken = Boolean(resolved.accessToken || resolved.password); + const stored = loadMatrixCredentials(process.env); + const hasStored = + stored && + resolved.homeserver && + resolved.userId && + credentialsMatchConfig(stored, { + homeserver: resolved.homeserver, + userId: resolved.userId, + }); + const configured = hasCore && (hasToken || Boolean(hasStored)); + return { + accountId, + enabled, + name: base.name?.trim() || undefined, + configured, + homeserver: resolved.homeserver || undefined, + userId: resolved.userId || undefined, + config: base, + }; +} + +export function listEnabledMatrixAccounts(cfg: CoreConfig): ResolvedMatrixAccount[] { + return listMatrixAccountIds(cfg) + .map((accountId) => resolveMatrixAccount({ cfg, accountId })) + .filter((account) => account.enabled); +} diff --git a/extensions/matrix/src/matrix/actions.ts b/extensions/matrix/src/matrix/actions.ts new file mode 100644 index 000000000..4b463b5b5 --- /dev/null +++ b/extensions/matrix/src/matrix/actions.ts @@ -0,0 +1,445 @@ +import type { MatrixClient, MatrixEvent } from "matrix-js-sdk"; +import { + Direction, + EventType, + MatrixError, + MsgType, + RelationType, +} from "matrix-js-sdk"; +import type { + ReactionEventContent, + RoomMessageEventContent, +} from "matrix-js-sdk/lib/@types/events.js"; +import type { + RoomPinnedEventsEventContent, + RoomTopicEventContent, +} from "matrix-js-sdk/lib/@types/state_events.js"; + +import { loadConfig } from "../../../../src/config/config.js"; +import type { CoreConfig } from "../types.js"; +import { getActiveMatrixClient } from "./active-client.js"; +import { + createMatrixClient, + isBunRuntime, + resolveMatrixAuth, + resolveSharedMatrixClient, + waitForMatrixSync, +} from "./client.js"; +import { + reactMatrixMessage, + resolveMatrixRoomId, + sendMessageMatrix, +} from "./send.js"; + +export type MatrixActionClientOpts = { + client?: MatrixClient; + timeoutMs?: number; +}; + +export type MatrixMessageSummary = { + eventId?: string; + sender?: string; + body?: string; + msgtype?: string; + timestamp?: number; + relatesTo?: { + relType?: string; + eventId?: string; + key?: string; + }; +}; + +export type MatrixReactionSummary = { + key: string; + count: number; + users: string[]; +}; + +type MatrixActionClient = { + client: MatrixClient; + stopOnDone: boolean; +}; + +function ensureNodeRuntime() { + if (isBunRuntime()) { + throw new Error("Matrix support requires Node (bun runtime not supported)"); + } +} + +async function resolveActionClient(opts: MatrixActionClientOpts = {}): Promise { + ensureNodeRuntime(); + if (opts.client) return { client: opts.client, stopOnDone: false }; + const active = getActiveMatrixClient(); + if (active) return { client: active, stopOnDone: false }; + const shouldShareClient = Boolean(process.env.CLAWDBOT_GATEWAY_PORT); + if (shouldShareClient) { + const client = await resolveSharedMatrixClient({ + cfg: loadConfig() as CoreConfig, + timeoutMs: opts.timeoutMs, + }); + return { client, stopOnDone: false }; + } + const auth = await resolveMatrixAuth({ cfg: loadConfig() as CoreConfig }); + const client = await createMatrixClient({ + homeserver: auth.homeserver, + userId: auth.userId, + accessToken: auth.accessToken, + localTimeoutMs: opts.timeoutMs, + }); + await client.startClient({ + initialSyncLimit: 0, + lazyLoadMembers: true, + threadSupport: true, + }); + await waitForMatrixSync({ client, timeoutMs: opts.timeoutMs }); + return { client, stopOnDone: true }; +} + +function summarizeMatrixEvent(event: MatrixEvent): MatrixMessageSummary { + const content = event.getContent(); + const relates = content["m.relates_to"]; + let relType: string | undefined; + let eventId: string | undefined; + if (relates) { + if ("rel_type" in relates) { + relType = relates.rel_type; + eventId = relates.event_id; + } else if ("m.in_reply_to" in relates) { + eventId = relates["m.in_reply_to"]?.event_id; + } + } + const relatesTo = + relType || eventId + ? { + relType, + eventId, + } + : undefined; + return { + eventId: event.getId() ?? undefined, + sender: event.getSender() ?? undefined, + body: content.body, + msgtype: content.msgtype, + timestamp: event.getTs() ?? undefined, + relatesTo, + }; +} + +async function readPinnedEvents(client: MatrixClient, roomId: string): Promise { + try { + const content = (await client.getStateEvent( + roomId, + EventType.RoomPinnedEvents, + "", + )) as RoomPinnedEventsEventContent; + const pinned = content.pinned; + return pinned.filter((id) => id.trim().length > 0); + } catch (err) { + const httpStatus = err instanceof MatrixError ? err.httpStatus : undefined; + const errcode = err instanceof MatrixError ? err.errcode : undefined; + if (httpStatus === 404 || errcode === "M_NOT_FOUND") { + return []; + } + throw err; + } +} + +async function fetchEventSummary( + client: MatrixClient, + roomId: string, + eventId: string, +): Promise { + const raw = await client.fetchRoomEvent(roomId, eventId); + const mapper = client.getEventMapper(); + const event = mapper(raw); + if (event.isRedacted()) return null; + return summarizeMatrixEvent(event); +} + +export async function sendMatrixMessage( + to: string, + content: string, + opts: MatrixActionClientOpts & { + mediaUrl?: string; + replyToId?: string; + threadId?: string; + } = {}, +) { + return await sendMessageMatrix(to, content, { + mediaUrl: opts.mediaUrl, + replyToId: opts.replyToId, + threadId: opts.threadId, + client: opts.client, + timeoutMs: opts.timeoutMs, + }); +} + +export async function editMatrixMessage( + roomId: string, + messageId: string, + content: string, + opts: MatrixActionClientOpts = {}, +) { + const trimmed = content.trim(); + if (!trimmed) throw new Error("Matrix edit requires content"); + const { client, stopOnDone } = await resolveActionClient(opts); + try { + const resolvedRoom = await resolveMatrixRoomId(client, roomId); + const newContent = { + msgtype: MsgType.Text, + body: trimmed, + } satisfies RoomMessageEventContent; + const payload: RoomMessageEventContent = { + msgtype: MsgType.Text, + body: `* ${trimmed}`, + "m.new_content": newContent, + "m.relates_to": { + rel_type: RelationType.Replace, + event_id: messageId, + }, + }; + const response = await client.sendMessage(resolvedRoom, payload); + return { eventId: response.event_id ?? null }; + } finally { + if (stopOnDone) client.stopClient(); + } +} + +export async function deleteMatrixMessage( + roomId: string, + messageId: string, + opts: MatrixActionClientOpts & { reason?: string } = {}, +) { + const { client, stopOnDone } = await resolveActionClient(opts); + try { + const resolvedRoom = await resolveMatrixRoomId(client, roomId); + await client.redactEvent(resolvedRoom, messageId, undefined, { + reason: opts.reason, + }); + } finally { + if (stopOnDone) client.stopClient(); + } +} + +export async function readMatrixMessages( + roomId: string, + opts: MatrixActionClientOpts & { + limit?: number; + before?: string; + after?: string; + } = {}, +): Promise<{ + messages: MatrixMessageSummary[]; + nextBatch?: string | null; + prevBatch?: string | null; +}> { + const { client, stopOnDone } = await resolveActionClient(opts); + try { + const resolvedRoom = await resolveMatrixRoomId(client, roomId); + const limit = + typeof opts.limit === "number" && Number.isFinite(opts.limit) + ? Math.max(1, Math.floor(opts.limit)) + : 20; + const token = opts.before?.trim() || opts.after?.trim() || null; + const dir = opts.after ? Direction.Forward : Direction.Backward; + const res = await client.createMessagesRequest(resolvedRoom, token, limit, dir); + const mapper = client.getEventMapper(); + const events = res.chunk.map(mapper); + const messages = events + .filter((event) => event.getType() === EventType.RoomMessage) + .filter((event) => !event.isRedacted()) + .map(summarizeMatrixEvent); + return { + messages, + nextBatch: res.end ?? null, + prevBatch: res.start ?? null, + }; + } finally { + if (stopOnDone) client.stopClient(); + } +} + +export async function listMatrixReactions( + roomId: string, + messageId: string, + opts: MatrixActionClientOpts & { limit?: number } = {}, +): Promise { + const { client, stopOnDone } = await resolveActionClient(opts); + try { + const resolvedRoom = await resolveMatrixRoomId(client, roomId); + const limit = + typeof opts.limit === "number" && Number.isFinite(opts.limit) + ? Math.max(1, Math.floor(opts.limit)) + : 100; + const res = await client.relations( + resolvedRoom, + messageId, + RelationType.Annotation, + EventType.Reaction, + { dir: Direction.Backward, limit }, + ); + const summaries = new Map(); + for (const event of res.events) { + const content = event.getContent(); + const key = content["m.relates_to"].key; + if (!key) continue; + const sender = event.getSender() ?? ""; + const entry: MatrixReactionSummary = summaries.get(key) ?? { + key, + count: 0, + users: [], + }; + entry.count += 1; + if (sender && !entry.users.includes(sender)) { + entry.users.push(sender); + } + summaries.set(key, entry); + } + return Array.from(summaries.values()); + } finally { + if (stopOnDone) client.stopClient(); + } +} + +export async function removeMatrixReactions( + roomId: string, + messageId: string, + opts: MatrixActionClientOpts & { emoji?: string } = {}, +): Promise<{ removed: number }> { + const { client, stopOnDone } = await resolveActionClient(opts); + try { + const resolvedRoom = await resolveMatrixRoomId(client, roomId); + const res = await client.relations( + resolvedRoom, + messageId, + RelationType.Annotation, + EventType.Reaction, + { dir: Direction.Backward, limit: 200 }, + ); + const userId = client.getUserId(); + if (!userId) return { removed: 0 }; + const targetEmoji = opts.emoji?.trim(); + const toRemove = res.events + .filter((event) => event.getSender() === userId) + .filter((event) => { + if (!targetEmoji) return true; + const content = event.getContent(); + return content["m.relates_to"].key === targetEmoji; + }) + .map((event) => event.getId()) + .filter((id): id is string => Boolean(id)); + if (toRemove.length === 0) return { removed: 0 }; + await Promise.all(toRemove.map((id) => client.redactEvent(resolvedRoom, id))); + return { removed: toRemove.length }; + } finally { + if (stopOnDone) client.stopClient(); + } +} + +export async function pinMatrixMessage( + roomId: string, + messageId: string, + opts: MatrixActionClientOpts = {}, +): Promise<{ pinned: string[] }> { + const { client, stopOnDone } = await resolveActionClient(opts); + try { + const resolvedRoom = await resolveMatrixRoomId(client, roomId); + const current = await readPinnedEvents(client, resolvedRoom); + const next = current.includes(messageId) ? current : [...current, messageId]; + const payload: RoomPinnedEventsEventContent = { pinned: next }; + await client.sendStateEvent(resolvedRoom, EventType.RoomPinnedEvents, payload); + return { pinned: next }; + } finally { + if (stopOnDone) client.stopClient(); + } +} + +export async function unpinMatrixMessage( + roomId: string, + messageId: string, + opts: MatrixActionClientOpts = {}, +): Promise<{ pinned: string[] }> { + const { client, stopOnDone } = await resolveActionClient(opts); + try { + const resolvedRoom = await resolveMatrixRoomId(client, roomId); + const current = await readPinnedEvents(client, resolvedRoom); + const next = current.filter((id) => id !== messageId); + const payload: RoomPinnedEventsEventContent = { pinned: next }; + await client.sendStateEvent(resolvedRoom, EventType.RoomPinnedEvents, payload); + return { pinned: next }; + } finally { + if (stopOnDone) client.stopClient(); + } +} + +export async function listMatrixPins( + roomId: string, + opts: MatrixActionClientOpts = {}, +): Promise<{ pinned: string[]; events: MatrixMessageSummary[] }> { + const { client, stopOnDone } = await resolveActionClient(opts); + try { + const resolvedRoom = await resolveMatrixRoomId(client, roomId); + const pinned = await readPinnedEvents(client, resolvedRoom); + const events = ( + await Promise.all( + pinned.map(async (eventId) => { + try { + return await fetchEventSummary(client, resolvedRoom, eventId); + } catch { + return null; + } + }), + ) + ).filter((event): event is MatrixMessageSummary => Boolean(event)); + return { pinned, events }; + } finally { + if (stopOnDone) client.stopClient(); + } +} + +export async function getMatrixMemberInfo( + userId: string, + opts: MatrixActionClientOpts & { roomId?: string } = {}, +) { + const { client, stopOnDone } = await resolveActionClient(opts); + try { + const roomId = opts.roomId ? await resolveMatrixRoomId(client, opts.roomId) : undefined; + const profile = await client.getProfileInfo(userId); + const member = roomId ? client.getRoom(roomId)?.getMember(userId) : undefined; + return { + userId, + profile: { + displayName: profile?.displayname ?? null, + avatarUrl: profile?.avatar_url ?? null, + }, + membership: member?.membership ?? null, + powerLevel: member?.powerLevel ?? null, + displayName: member?.name ?? null, + }; + } finally { + if (stopOnDone) client.stopClient(); + } +} + +export async function getMatrixRoomInfo(roomId: string, opts: MatrixActionClientOpts = {}) { + const { client, stopOnDone } = await resolveActionClient(opts); + try { + const resolvedRoom = await resolveMatrixRoomId(client, roomId); + const room = client.getRoom(resolvedRoom); + const topicEvent = room?.currentState.getStateEvents(EventType.RoomTopic, ""); + const topicContent = topicEvent?.getContent(); + const topic = typeof topicContent?.topic === "string" ? topicContent.topic : undefined; + return { + roomId: resolvedRoom, + name: room?.name ?? null, + topic: topic ?? null, + canonicalAlias: room?.getCanonicalAlias?.() ?? null, + altAliases: room?.getAltAliases?.() ?? [], + memberCount: room?.getJoinedMemberCount?.() ?? null, + }; + } finally { + if (stopOnDone) client.stopClient(); + } +} + +export { reactMatrixMessage }; diff --git a/extensions/matrix/src/matrix/active-client.ts b/extensions/matrix/src/matrix/active-client.ts new file mode 100644 index 000000000..2befe15b6 --- /dev/null +++ b/extensions/matrix/src/matrix/active-client.ts @@ -0,0 +1,11 @@ +import type { MatrixClient } from "matrix-js-sdk"; + +let activeClient: MatrixClient | null = null; + +export function setActiveMatrixClient(client: MatrixClient | null): void { + activeClient = client; +} + +export function getActiveMatrixClient(): MatrixClient | null { + return activeClient; +} diff --git a/extensions/matrix/src/matrix/client.ts b/extensions/matrix/src/matrix/client.ts new file mode 100644 index 000000000..51f1c4631 --- /dev/null +++ b/extensions/matrix/src/matrix/client.ts @@ -0,0 +1,338 @@ +import { ClientEvent, type MatrixClient, SyncState } from "matrix-js-sdk"; + +import { loadConfig } from "../../../../src/config/config.js"; +import type { CoreConfig } from "../types.js"; + +export type MatrixResolvedConfig = { + homeserver: string; + userId: string; + accessToken?: string; + password?: string; + deviceName?: string; + initialSyncLimit?: number; +}; + +export type MatrixAuth = { + homeserver: string; + userId: string; + accessToken: string; + deviceName?: string; + initialSyncLimit?: number; +}; + +type MatrixSdk = typeof import("matrix-js-sdk"); + +type SharedMatrixClientState = { + client: MatrixClient; + key: string; + started: boolean; +}; + +let sharedClientState: SharedMatrixClientState | null = null; +let sharedClientPromise: Promise | null = null; +let sharedClientStartPromise: Promise | null = null; + +export function isBunRuntime(): boolean { + const versions = process.versions as { bun?: string }; + return typeof versions.bun === "string"; +} + +async function loadMatrixSdk(): Promise { + return (await import("matrix-js-sdk")) as MatrixSdk; +} + +function clean(value?: string): string { + return value?.trim() ?? ""; +} + +export function resolveMatrixConfig( + cfg: CoreConfig = loadConfig() as CoreConfig, + env: NodeJS.ProcessEnv = process.env, +): MatrixResolvedConfig { + const matrix = cfg.channels?.matrix ?? {}; + const homeserver = clean(env.MATRIX_HOMESERVER) || clean(matrix.homeserver); + const userId = clean(env.MATRIX_USER_ID) || clean(matrix.userId); + const accessToken = + clean(env.MATRIX_ACCESS_TOKEN) || clean(matrix.accessToken) || undefined; + const password = clean(env.MATRIX_PASSWORD) || clean(matrix.password) || undefined; + const deviceName = + clean(env.MATRIX_DEVICE_NAME) || clean(matrix.deviceName) || undefined; + const initialSyncLimit = + typeof matrix.initialSyncLimit === "number" + ? Math.max(0, Math.floor(matrix.initialSyncLimit)) + : undefined; + return { + homeserver, + userId, + accessToken, + password, + deviceName, + initialSyncLimit, + }; +} + +export async function resolveMatrixAuth(params?: { + cfg?: CoreConfig; + env?: NodeJS.ProcessEnv; +}): Promise { + const cfg = params?.cfg ?? (loadConfig() as CoreConfig); + const env = params?.env ?? process.env; + const resolved = resolveMatrixConfig(cfg, env); + if (!resolved.homeserver) { + throw new Error("Matrix homeserver is required (matrix.homeserver)"); + } + if (!resolved.userId) { + throw new Error("Matrix userId is required (matrix.userId)"); + } + + const { + loadMatrixCredentials, + saveMatrixCredentials, + credentialsMatchConfig, + touchMatrixCredentials, + } = await import("./credentials.js"); + + const cached = loadMatrixCredentials(env); + const cachedCredentials = + cached && + credentialsMatchConfig(cached, { + homeserver: resolved.homeserver, + userId: resolved.userId, + }) + ? cached + : null; + + if (resolved.accessToken) { + if (cachedCredentials && cachedCredentials.accessToken === resolved.accessToken) { + touchMatrixCredentials(env); + } + return { + homeserver: resolved.homeserver, + userId: resolved.userId, + accessToken: resolved.accessToken, + deviceName: resolved.deviceName, + initialSyncLimit: resolved.initialSyncLimit, + }; + } + + if (cachedCredentials) { + touchMatrixCredentials(env); + return { + homeserver: cachedCredentials.homeserver, + userId: cachedCredentials.userId, + accessToken: cachedCredentials.accessToken, + deviceName: resolved.deviceName, + initialSyncLimit: resolved.initialSyncLimit, + }; + } + + if (!resolved.password) { + throw new Error( + "Matrix access token or password is required (matrix.accessToken or matrix.password)", + ); + } + + const sdk = await loadMatrixSdk(); + const loginClient = sdk.createClient({ + baseUrl: resolved.homeserver, + }); + const login = await loginClient.loginRequest({ + type: "m.login.password", + identifier: { type: "m.id.user", user: resolved.userId }, + password: resolved.password, + initial_device_display_name: resolved.deviceName ?? "Clawdbot Gateway", + }); + const accessToken = login.access_token?.trim(); + if (!accessToken) { + throw new Error("Matrix login did not return an access token"); + } + + const auth: MatrixAuth = { + homeserver: resolved.homeserver, + userId: login.user_id ?? resolved.userId, + accessToken, + deviceName: resolved.deviceName, + initialSyncLimit: resolved.initialSyncLimit, + }; + + saveMatrixCredentials({ + homeserver: auth.homeserver, + userId: auth.userId, + accessToken: auth.accessToken, + }); + + return auth; +} + +export async function createMatrixClient(params: { + homeserver: string; + userId: string; + accessToken: string; + localTimeoutMs?: number; +}): Promise { + const sdk = await loadMatrixSdk(); + const store = new sdk.MemoryStore(); + return sdk.createClient({ + baseUrl: params.homeserver, + userId: params.userId, + accessToken: params.accessToken, + localTimeoutMs: params.localTimeoutMs, + store, + }); +} + +function buildSharedClientKey(auth: MatrixAuth): string { + return [auth.homeserver, auth.userId, auth.accessToken].join("|"); +} + +async function createSharedMatrixClient(params: { + auth: MatrixAuth; + timeoutMs?: number; +}): Promise { + const client = await createMatrixClient({ + homeserver: params.auth.homeserver, + userId: params.auth.userId, + accessToken: params.auth.accessToken, + localTimeoutMs: params.timeoutMs, + }); + return { client, key: buildSharedClientKey(params.auth), started: false }; +} + +async function ensureSharedClientStarted(params: { + state: SharedMatrixClientState; + timeoutMs?: number; + initialSyncLimit?: number; +}): Promise { + if (params.state.started) return; + if (sharedClientStartPromise) { + await sharedClientStartPromise; + return; + } + sharedClientStartPromise = (async () => { + const startOpts: Parameters[0] = { + lazyLoadMembers: true, + threadSupport: true, + }; + if (typeof params.initialSyncLimit === "number") { + startOpts.initialSyncLimit = params.initialSyncLimit; + } + await params.state.client.startClient(startOpts); + await waitForMatrixSync({ + client: params.state.client, + timeoutMs: params.timeoutMs, + }); + params.state.started = true; + })(); + try { + await sharedClientStartPromise; + } finally { + sharedClientStartPromise = null; + } +} + +export async function resolveSharedMatrixClient( + params: { + cfg?: CoreConfig; + env?: NodeJS.ProcessEnv; + timeoutMs?: number; + auth?: MatrixAuth; + startClient?: boolean; + } = {}, +): Promise { + const auth = params.auth ?? (await resolveMatrixAuth({ cfg: params.cfg, env: params.env })); + const key = buildSharedClientKey(auth); + const shouldStart = params.startClient !== false; + + if (sharedClientState?.key === key) { + if (shouldStart) { + await ensureSharedClientStarted({ + state: sharedClientState, + timeoutMs: params.timeoutMs, + initialSyncLimit: auth.initialSyncLimit, + }); + } + return sharedClientState.client; + } + + if (sharedClientPromise) { + const pending = await sharedClientPromise; + if (pending.key === key) { + if (shouldStart) { + await ensureSharedClientStarted({ + state: pending, + timeoutMs: params.timeoutMs, + initialSyncLimit: auth.initialSyncLimit, + }); + } + return pending.client; + } + pending.client.stopClient(); + sharedClientState = null; + sharedClientPromise = null; + } + + sharedClientPromise = createSharedMatrixClient({ + auth, + timeoutMs: params.timeoutMs, + }); + try { + const created = await sharedClientPromise; + sharedClientState = created; + if (shouldStart) { + await ensureSharedClientStarted({ + state: created, + timeoutMs: params.timeoutMs, + initialSyncLimit: auth.initialSyncLimit, + }); + } + return created.client; + } finally { + sharedClientPromise = null; + } +} + +export async function waitForMatrixSync(params: { + client: MatrixClient; + timeoutMs?: number; + abortSignal?: AbortSignal; +}): Promise { + const timeoutMs = Math.max(1000, params.timeoutMs ?? 15_000); + if (params.client.getSyncState() === SyncState.Syncing) return; + await new Promise((resolve, reject) => { + let done = false; + let timer: NodeJS.Timeout | undefined; + const cleanup = () => { + if (done) return; + done = true; + params.client.removeListener(ClientEvent.Sync, onSync); + if (params.abortSignal) { + params.abortSignal.removeEventListener("abort", onAbort); + } + if (timer) { + clearTimeout(timer); + timer = undefined; + } + }; + const onSync = (state: SyncState) => { + if (done) return; + if (state === SyncState.Prepared || state === SyncState.Syncing) { + cleanup(); + resolve(); + } + if (state === SyncState.Error) { + cleanup(); + reject(new Error("Matrix sync failed")); + } + }; + const onAbort = () => { + cleanup(); + reject(new Error("Matrix sync aborted")); + }; + params.client.on(ClientEvent.Sync, onSync); + params.abortSignal?.addEventListener("abort", onAbort, { once: true }); + timer = setTimeout(() => { + cleanup(); + reject(new Error("Matrix sync timed out")); + }, timeoutMs); + }); +} diff --git a/extensions/matrix/src/matrix/credentials.ts b/extensions/matrix/src/matrix/credentials.ts new file mode 100644 index 000000000..375fb6aa6 --- /dev/null +++ b/extensions/matrix/src/matrix/credentials.ts @@ -0,0 +1,96 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +import { resolveStateDir } from "../../../../src/config/paths.js"; + +export type MatrixStoredCredentials = { + homeserver: string; + userId: string; + accessToken: string; + createdAt: string; + lastUsedAt?: string; +}; + +const CREDENTIALS_FILENAME = "credentials.json"; + +export function resolveMatrixCredentialsDir( + env: NodeJS.ProcessEnv = process.env, + stateDir: string = resolveStateDir(env, os.homedir), +): string { + return path.join(stateDir, "credentials", "matrix"); +} + +export function resolveMatrixCredentialsPath(env: NodeJS.ProcessEnv = process.env): string { + const dir = resolveMatrixCredentialsDir(env); + return path.join(dir, CREDENTIALS_FILENAME); +} + +export function loadMatrixCredentials( + env: NodeJS.ProcessEnv = process.env, +): MatrixStoredCredentials | null { + const credPath = resolveMatrixCredentialsPath(env); + try { + if (!fs.existsSync(credPath)) return null; + const raw = fs.readFileSync(credPath, "utf-8"); + const parsed = JSON.parse(raw) as Partial; + if ( + typeof parsed.homeserver !== "string" || + typeof parsed.userId !== "string" || + typeof parsed.accessToken !== "string" + ) { + return null; + } + return parsed as MatrixStoredCredentials; + } catch { + return null; + } +} + +export function saveMatrixCredentials( + credentials: Omit, + env: NodeJS.ProcessEnv = process.env, +): void { + const dir = resolveMatrixCredentialsDir(env); + fs.mkdirSync(dir, { recursive: true }); + + const credPath = resolveMatrixCredentialsPath(env); + + const existing = loadMatrixCredentials(env); + const now = new Date().toISOString(); + + const toSave: MatrixStoredCredentials = { + ...credentials, + createdAt: existing?.createdAt ?? now, + lastUsedAt: now, + }; + + fs.writeFileSync(credPath, JSON.stringify(toSave, null, 2), "utf-8"); +} + +export function touchMatrixCredentials(env: NodeJS.ProcessEnv = process.env): void { + const existing = loadMatrixCredentials(env); + if (!existing) return; + + existing.lastUsedAt = new Date().toISOString(); + const credPath = resolveMatrixCredentialsPath(env); + fs.writeFileSync(credPath, JSON.stringify(existing, null, 2), "utf-8"); +} + +export function clearMatrixCredentials(env: NodeJS.ProcessEnv = process.env): void { + const credPath = resolveMatrixCredentialsPath(env); + try { + if (fs.existsSync(credPath)) { + fs.unlinkSync(credPath); + } + } catch { + // ignore + } +} + +export function credentialsMatchConfig( + stored: MatrixStoredCredentials, + config: { homeserver: string; userId: string }, +): boolean { + return stored.homeserver === config.homeserver && stored.userId === config.userId; +} diff --git a/extensions/matrix/src/matrix/deps.ts b/extensions/matrix/src/matrix/deps.ts new file mode 100644 index 000000000..86746fb8a --- /dev/null +++ b/extensions/matrix/src/matrix/deps.ts @@ -0,0 +1,57 @@ +import fs from "node:fs"; +import path from "node:path"; +import { createRequire } from "node:module"; +import { fileURLToPath } from "node:url"; + +import { runCommandWithTimeout } from "../../../../src/process/exec.js"; +import type { RuntimeEnv } from "../../../../src/runtime.js"; + +const MATRIX_SDK_PACKAGE = "matrix-js-sdk"; + +export function isMatrixSdkAvailable(): boolean { + try { + const req = createRequire(import.meta.url); + req.resolve(MATRIX_SDK_PACKAGE); + return true; + } catch { + return false; + } +} + +function resolvePluginRoot(): string { + const currentDir = path.dirname(fileURLToPath(import.meta.url)); + return path.resolve(currentDir, "..", ".."); +} + +export async function ensureMatrixSdkInstalled(params: { + runtime: RuntimeEnv; + confirm?: (message: string) => Promise; +}): Promise { + if (isMatrixSdkAvailable()) return; + const confirm = params.confirm; + if (confirm) { + const ok = await confirm("Matrix requires matrix-js-sdk. Install now?"); + if (!ok) { + throw new Error("Matrix requires matrix-js-sdk (install dependencies first)."); + } + } + + const root = resolvePluginRoot(); + const command = fs.existsSync(path.join(root, "pnpm-lock.yaml")) + ? ["pnpm", "install"] + : ["npm", "install", "--omit=dev", "--silent"]; + params.runtime.log?.(`matrix: installing dependencies via ${command[0]} (${root})…`); + const result = await runCommandWithTimeout(command, { + cwd: root, + timeoutMs: 300_000, + env: { COREPACK_ENABLE_DOWNLOAD_PROMPT: "0" }, + }); + if (result.code !== 0) { + throw new Error( + result.stderr.trim() || result.stdout.trim() || "Matrix dependency install failed.", + ); + } + if (!isMatrixSdkAvailable()) { + throw new Error("Matrix dependency install completed but matrix-js-sdk is still missing."); + } +} diff --git a/extensions/matrix/src/matrix/format.test.ts b/extensions/matrix/src/matrix/format.test.ts new file mode 100644 index 000000000..5ae98c97c --- /dev/null +++ b/extensions/matrix/src/matrix/format.test.ts @@ -0,0 +1,34 @@ +import { describe, expect, it } from "vitest"; + +import { markdownToMatrixHtml } from "./format.js"; + +describe("markdownToMatrixHtml", () => { + it("renders basic inline formatting", () => { + const html = markdownToMatrixHtml("hi _there_ **boss** `code`"); + expect(html).toContain("there"); + expect(html).toContain("boss"); + expect(html).toContain("code"); + }); + + it("renders links as HTML", () => { + const html = markdownToMatrixHtml("see [docs](https://example.com)"); + expect(html).toContain('docs'); + }); + + it("escapes raw HTML", () => { + const html = markdownToMatrixHtml("nope"); + expect(html).toContain("<b>nope</b>"); + expect(html).not.toContain("nope"); + }); + + it("flattens images into alt text", () => { + const html = markdownToMatrixHtml("![alt](https://example.com/img.png)"); + expect(html).toContain("alt"); + expect(html).not.toContain(" { + const html = markdownToMatrixHtml("line1\nline2"); + expect(html).toContain(" escapeHtml(tokens[idx]?.content ?? ""); + +md.renderer.rules.html_block = (tokens, idx) => escapeHtml(tokens[idx]?.content ?? ""); +md.renderer.rules.html_inline = (tokens, idx) => escapeHtml(tokens[idx]?.content ?? ""); + +export function markdownToMatrixHtml(markdown: string): string { + const rendered = md.render(markdown ?? ""); + return rendered.trimEnd(); +} diff --git a/extensions/matrix/src/matrix/index.ts b/extensions/matrix/src/matrix/index.ts new file mode 100644 index 000000000..8729ebc6e --- /dev/null +++ b/extensions/matrix/src/matrix/index.ts @@ -0,0 +1,10 @@ +export { monitorMatrixProvider } from "./monitor/index.js"; +export { probeMatrix } from "./probe.js"; +export { + reactMatrixMessage, + resolveMatrixRoomId, + sendMessageMatrix, + sendPollMatrix, + sendTypingMatrix, +} from "./send.js"; +export { resolveMatrixAuth, resolveSharedMatrixClient } from "./client.js"; diff --git a/extensions/matrix/src/matrix/monitor/allowlist.ts b/extensions/matrix/src/matrix/monitor/allowlist.ts new file mode 100644 index 000000000..a423575e1 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/allowlist.ts @@ -0,0 +1,32 @@ +function normalizeAllowList(list?: Array) { + return (list ?? []).map((entry) => String(entry).trim()).filter(Boolean); +} + +export function normalizeAllowListLower(list?: Array) { + return normalizeAllowList(list).map((entry) => entry.toLowerCase()); +} + +function normalizeMatrixUser(raw?: string | null): string { + return (raw ?? "").trim().toLowerCase(); +} + +export function resolveMatrixAllowListMatches(params: { + allowList: string[]; + userId?: string; + userName?: string; +}) { + const allowList = params.allowList; + if (allowList.length === 0) return false; + if (allowList.includes("*")) return true; + const userId = normalizeMatrixUser(params.userId); + const userName = normalizeMatrixUser(params.userName); + const localPart = userId.startsWith("@") ? (userId.slice(1).split(":")[0] ?? "") : ""; + const candidates = [ + userId, + userId ? `matrix:${userId}` : "", + userId ? `user:${userId}` : "", + userName, + localPart, + ].filter(Boolean); + return candidates.some((value) => allowList.includes(value)); +} diff --git a/extensions/matrix/src/matrix/monitor/auto-join.ts b/extensions/matrix/src/matrix/monitor/auto-join.ts new file mode 100644 index 000000000..c7d24ed5f --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/auto-join.ts @@ -0,0 +1,43 @@ +import type { MatrixClient, MatrixEvent, RoomMember } from "matrix-js-sdk"; +import { RoomMemberEvent } from "matrix-js-sdk"; + +import { danger, logVerbose } from "../../../../../src/globals.js"; +import type { RuntimeEnv } from "../../../../../src/runtime.js"; +import type { CoreConfig } from "../../types.js"; + +export function registerMatrixAutoJoin(params: { + client: MatrixClient; + cfg: CoreConfig; + runtime: RuntimeEnv; +}) { + const { client, cfg, runtime } = params; + const autoJoin = cfg.channels?.matrix?.autoJoin ?? "always"; + const autoJoinAllowlist = cfg.channels?.matrix?.autoJoinAllowlist ?? []; + + client.on(RoomMemberEvent.Membership, async (_event: MatrixEvent, member: RoomMember) => { + if (member.userId !== client.getUserId()) return; + if (member.membership !== "invite") return; + const roomId = member.roomId; + if (autoJoin === "off") return; + if (autoJoin === "allowlist") { + const invitedRoom = client.getRoom(roomId); + const alias = invitedRoom?.getCanonicalAlias?.() ?? ""; + const altAliases = invitedRoom?.getAltAliases?.() ?? []; + const allowed = + autoJoinAllowlist.includes("*") || + autoJoinAllowlist.includes(roomId) || + (alias ? autoJoinAllowlist.includes(alias) : false) || + altAliases.some((value) => autoJoinAllowlist.includes(value)); + if (!allowed) { + logVerbose(`matrix: invite ignored (not in allowlist) room=${roomId}`); + return; + } + } + try { + await client.joinRoom(roomId); + logVerbose(`matrix: joined room ${roomId}`); + } catch (err) { + runtime.error?.(danger(`matrix: failed to join room ${roomId}: ${String(err)}`)); + } + }); +} diff --git a/extensions/matrix/src/matrix/monitor/direct.ts b/extensions/matrix/src/matrix/monitor/direct.ts new file mode 100644 index 000000000..9f64384f8 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/direct.ts @@ -0,0 +1,80 @@ +import type { + AccountDataEvents, + MatrixClient, + MatrixEvent, + Room, + RoomMember, +} from "matrix-js-sdk"; +import { ClientEvent, EventType } from "matrix-js-sdk"; + +function hasDirectFlag(member?: RoomMember | null): boolean { + if (!member?.events.member) return false; + const content = member.events.member.getContent() as { is_direct?: boolean } | undefined; + if (content?.is_direct === true) return true; + const prev = member.events.member.getPrevContent() as { is_direct?: boolean } | undefined; + return prev?.is_direct === true; +} + +export function isLikelyDirectRoom(params: { + room: Room; + senderId: string; + selfId?: string | null; +}): boolean { + if (!params.selfId) return false; + const memberCount = params.room.getJoinedMemberCount?.(); + if (typeof memberCount !== "number" || memberCount !== 2) return false; + return true; +} + +export function isDirectRoomByFlag(params: { + room: Room; + senderId: string; + selfId?: string | null; +}): boolean { + if (!params.selfId) return false; + const selfMember = params.room.getMember(params.selfId); + const senderMember = params.room.getMember(params.senderId); + if (hasDirectFlag(selfMember) || hasDirectFlag(senderMember)) return true; + const inviter = selfMember?.getDMInviter() ?? senderMember?.getDMInviter(); + return Boolean(inviter); +} + +type MatrixDirectAccountData = AccountDataEvents[EventType.Direct]; + +export function createDirectRoomTracker(client: MatrixClient) { + const directMap = new Map>(); + + const updateDirectMap = (content: MatrixDirectAccountData) => { + directMap.clear(); + for (const [userId, rooms] of Object.entries(content)) { + if (!Array.isArray(rooms)) continue; + const ids = rooms.map((roomId) => String(roomId).trim()).filter(Boolean); + if (ids.length === 0) continue; + directMap.set(userId, new Set(ids)); + } + }; + + const initialDirect = client.getAccountData(EventType.Direct); + if (initialDirect) { + updateDirectMap(initialDirect.getContent() ?? {}); + } + + client.on(ClientEvent.AccountData, (event: MatrixEvent) => { + if (event.getType() !== EventType.Direct) return; + updateDirectMap(event.getContent() ?? {}); + }); + + return { + isDirectMessage: (room: Room, senderId: string) => { + const roomId = room.roomId; + const directRooms = directMap.get(senderId); + const selfId = client.getUserId(); + const isDirectByFlag = isDirectRoomByFlag({ room, senderId, selfId }); + return ( + Boolean(directRooms?.has(roomId)) || + isDirectByFlag || + isLikelyDirectRoom({ room, senderId, selfId }) + ); + }, + }; +} diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts new file mode 100644 index 000000000..d441d633d --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -0,0 +1,500 @@ +import type { MatrixEvent, Room } from "matrix-js-sdk"; +import { EventType, RelationType, RoomEvent } from "matrix-js-sdk"; +import type { RoomMessageEventContent } from "matrix-js-sdk/lib/@types/events.js"; + +import { resolveEffectiveMessagesConfig, resolveHumanDelayConfig } from "../../../../../src/agents/identity.js"; +import { chunkMarkdownText, resolveTextChunkLimit } from "../../../../../src/auto-reply/chunk.js"; +import { hasControlCommand } from "../../../../../src/auto-reply/command-detection.js"; +import { shouldHandleTextCommands } from "../../../../../src/auto-reply/commands-registry.js"; +import { formatAgentEnvelope } from "../../../../../src/auto-reply/envelope.js"; +import { dispatchReplyFromConfig } from "../../../../../src/auto-reply/reply/dispatch-from-config.js"; +import { + buildMentionRegexes, + matchesMentionPatterns, +} from "../../../../../src/auto-reply/reply/mentions.js"; +import { createReplyDispatcherWithTyping } from "../../../../../src/auto-reply/reply/reply-dispatcher.js"; +import type { ReplyPayload } from "../../../../../src/auto-reply/types.js"; +import { loadConfig } from "../../../../../src/config/config.js"; +import { resolveStorePath, updateLastRoute } from "../../../../../src/config/sessions.js"; +import { danger, logVerbose, shouldLogVerbose } from "../../../../../src/globals.js"; +import { enqueueSystemEvent } from "../../../../../src/infra/system-events.js"; +import { getChildLogger } from "../../../../../src/logging.js"; +import { + readChannelAllowFromStore, + upsertChannelPairingRequest, +} from "../../../../../src/pairing/pairing-store.js"; +import { resolveAgentRoute } from "../../../../../src/routing/resolve-route.js"; +import type { RuntimeEnv } from "../../../../../src/runtime.js"; +import type { CoreConfig, ReplyToMode } from "../../types.js"; +import { setActiveMatrixClient } from "../active-client.js"; +import { + isBunRuntime, + resolveMatrixAuth, + resolveSharedMatrixClient, +} from "../client.js"; +import { + formatPollAsText, + isPollStartType, + type PollStartContent, + parsePollStartContent, +} from "../poll-types.js"; +import { reactMatrixMessage, sendMessageMatrix, sendTypingMatrix } from "../send.js"; +import { resolveMatrixAllowListMatches, normalizeAllowListLower } from "./allowlist.js"; +import { registerMatrixAutoJoin } from "./auto-join.js"; +import { createDirectRoomTracker } from "./direct.js"; +import { downloadMatrixMedia } from "./media.js"; +import { resolveMentions } from "./mentions.js"; +import { deliverMatrixReplies } from "./replies.js"; +import { resolveMatrixRoomConfig } from "./rooms.js"; +import { resolveMatrixThreadRootId, resolveMatrixThreadTarget } from "./threads.js"; + +export type MonitorMatrixOpts = { + runtime?: RuntimeEnv; + abortSignal?: AbortSignal; + mediaMaxMb?: number; + initialSyncLimit?: number; + replyToMode?: ReplyToMode; +}; + +const DEFAULT_MEDIA_MAX_MB = 20; + +export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promise { + if (isBunRuntime()) { + throw new Error("Matrix provider requires Node (bun runtime not supported)"); + } + const cfg = loadConfig() as CoreConfig; + if (cfg.channels?.matrix?.enabled === false) return; + + const runtime: RuntimeEnv = opts.runtime ?? { + log: console.log, + error: console.error, + exit: (code: number): never => { + throw new Error(`exit ${code}`); + }, + }; + + const auth = await resolveMatrixAuth({ cfg }); + const resolvedInitialSyncLimit = + typeof opts.initialSyncLimit === "number" + ? Math.max(0, Math.floor(opts.initialSyncLimit)) + : auth.initialSyncLimit; + const authWithLimit = + resolvedInitialSyncLimit === auth.initialSyncLimit + ? auth + : { ...auth, initialSyncLimit: resolvedInitialSyncLimit }; + const client = await resolveSharedMatrixClient({ + cfg, + auth: authWithLimit, + startClient: false, + }); + setActiveMatrixClient(client); + + const mentionRegexes = buildMentionRegexes(cfg); + const logger = getChildLogger({ module: "matrix-auto-reply" }); + const allowlistOnly = cfg.channels?.matrix?.allowlistOnly === true; + const groupPolicyRaw = cfg.channels?.matrix?.groupPolicy ?? "allowlist"; + const groupPolicy = allowlistOnly && groupPolicyRaw === "open" ? "allowlist" : groupPolicyRaw; + const replyToMode = opts.replyToMode ?? cfg.channels?.matrix?.replyToMode ?? "off"; + const threadReplies = cfg.channels?.matrix?.threadReplies ?? "inbound"; + const dmConfig = cfg.channels?.matrix?.dm; + const dmEnabled = dmConfig?.enabled ?? true; + const dmPolicyRaw = dmConfig?.policy ?? "pairing"; + const dmPolicy = allowlistOnly && dmPolicyRaw !== "disabled" ? "allowlist" : dmPolicyRaw; + const allowFrom = dmConfig?.allowFrom ?? []; + const textLimit = resolveTextChunkLimit(cfg, "matrix"); + const mediaMaxMb = opts.mediaMaxMb ?? cfg.channels?.matrix?.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB; + const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024; + const startupMs = Date.now(); + const startupGraceMs = 0; + const directTracker = createDirectRoomTracker(client); + registerMatrixAutoJoin({ client, cfg, runtime }); + + const handleTimeline = async ( + event: MatrixEvent, + room: Room | undefined, + toStartOfTimeline?: boolean, + ) => { + try { + if (!room) return; + if (toStartOfTimeline) return; + if (event.getType() === EventType.RoomMessageEncrypted || event.isDecryptionFailure()) { + return; + } + + const eventType = event.getType(); + const isPollEvent = isPollStartType(eventType); + if (eventType !== EventType.RoomMessage && !isPollEvent) return; + if (event.isRedacted()) return; + const senderId = event.getSender(); + if (!senderId) return; + if (senderId === client.getUserId()) return; + const eventTs = event.getTs(); + const eventAge = event.getAge(); + if (typeof eventTs === "number" && eventTs < startupMs - startupGraceMs) { + return; + } + if ( + typeof eventTs !== "number" && + typeof eventAge === "number" && + eventAge > startupGraceMs + ) { + return; + } + + let content = event.getContent(); + if (isPollEvent) { + const pollStartContent = event.getContent(); + const pollSummary = parsePollStartContent(pollStartContent); + if (pollSummary) { + pollSummary.eventId = event.getId() ?? ""; + pollSummary.roomId = room.roomId; + pollSummary.sender = senderId; + pollSummary.senderName = room.getMember(senderId)?.name ?? senderId; + const pollText = formatPollAsText(pollSummary); + content = { + msgtype: "m.text", + body: pollText, + } as unknown as RoomMessageEventContent; + } else { + return; + } + } + + const relates = content["m.relates_to"]; + if (relates && "rel_type" in relates) { + if (relates.rel_type === RelationType.Replace) return; + } + + const roomId = room.roomId; + const isDirectMessage = directTracker.isDirectMessage(room, senderId); + const isRoom = !isDirectMessage; + + if (!isDirectMessage && groupPolicy === "disabled") return; + + const roomAliases = [ + room.getCanonicalAlias?.() ?? "", + ...(room.getAltAliases?.() ?? []), + ].filter(Boolean); + const roomName = room.name ?? undefined; + const roomConfigInfo = resolveMatrixRoomConfig({ + rooms: cfg.channels?.matrix?.rooms, + roomId, + aliases: roomAliases, + name: roomName, + }); + + if (roomConfigInfo.config && !roomConfigInfo.allowed) { + logVerbose(`matrix: room disabled room=${roomId}`); + return; + } + if (groupPolicy === "allowlist") { + if (!roomConfigInfo.allowlistConfigured) { + logVerbose("matrix: drop room message (no allowlist)"); + return; + } + if (!roomConfigInfo.config) { + logVerbose("matrix: drop room message (not in allowlist)"); + return; + } + } + + const senderName = room.getMember(senderId)?.name ?? senderId; + const storeAllowFrom = await readChannelAllowFromStore("matrix").catch(() => []); + const effectiveAllowFrom = normalizeAllowListLower([...allowFrom, ...storeAllowFrom]); + + if (isDirectMessage) { + if (!dmEnabled || dmPolicy === "disabled") return; + if (dmPolicy !== "open") { + const permitted = + effectiveAllowFrom.length > 0 && + resolveMatrixAllowListMatches({ + allowList: effectiveAllowFrom, + userId: senderId, + userName: senderName, + }); + if (!permitted) { + if (dmPolicy === "pairing") { + const { code, created } = await upsertChannelPairingRequest({ + channel: "matrix", + id: senderId, + meta: { name: senderName }, + }); + if (created) { + try { + await sendMessageMatrix( + `room:${roomId}`, + [ + "Clawdbot: access not configured.", + "", + `Pairing code: ${code}`, + "", + "Ask the bot owner to approve with:", + "clawdbot pairing approve matrix ", + ].join("\n"), + { client }, + ); + } catch (err) { + logVerbose(`matrix pairing reply failed for ${senderId}: ${String(err)}`); + } + } + } + return; + } + } + } + + if (isRoom && roomConfigInfo.config?.users?.length) { + const userAllowed = resolveMatrixAllowListMatches({ + allowList: normalizeAllowListLower(roomConfigInfo.config.users), + userId: senderId, + userName: senderName, + }); + if (!userAllowed) { + logVerbose(`matrix: blocked sender ${senderId} (room users allowlist)`); + return; + } + } + + const rawBody = content.body.trim(); + let media: { + path: string; + contentType?: string; + placeholder: string; + } | null = null; + const contentUrl = + "url" in content && typeof content.url === "string" ? content.url : undefined; + if (!rawBody && !contentUrl) { + return; + } + + const contentType = + "info" in content && content.info && "mimetype" in content.info + ? (content.info as { mimetype?: string }).mimetype + : undefined; + if (contentUrl?.startsWith("mxc://")) { + try { + media = await downloadMatrixMedia({ + client, + mxcUrl: contentUrl, + contentType, + maxBytes: mediaMaxBytes, + }); + } catch (err) { + logVerbose(`matrix: media download failed: ${String(err)}`); + } + } + + const bodyText = rawBody || media?.placeholder || ""; + if (!bodyText) return; + + const { wasMentioned, hasExplicitMention } = resolveMentions({ + content, + userId: client.getUserId(), + text: bodyText, + mentionRegexes, + }); + const commandAuthorized = + (!allowlistOnly && effectiveAllowFrom.length === 0) || + resolveMatrixAllowListMatches({ + allowList: effectiveAllowFrom, + userId: senderId, + userName: senderName, + }); + const allowTextCommands = shouldHandleTextCommands({ + cfg, + surface: "matrix", + }); + const shouldRequireMention = isRoom + ? roomConfigInfo.config?.autoReply === true + ? false + : roomConfigInfo.config?.autoReply === false + ? true + : typeof roomConfigInfo.config?.requireMention === "boolean" + ? roomConfigInfo.config.requireMention + : true + : false; + const shouldBypassMention = + allowTextCommands && + isRoom && + shouldRequireMention && + !wasMentioned && + !hasExplicitMention && + commandAuthorized && + hasControlCommand(bodyText); + if (isRoom && shouldRequireMention && !wasMentioned && !shouldBypassMention) { + logger.info({ roomId, reason: "no-mention" }, "skipping room message"); + return; + } + + const messageId = event.getId() ?? ""; + const threadRootId = resolveMatrixThreadRootId({ event, content }); + const threadTarget = resolveMatrixThreadTarget({ + threadReplies, + messageId, + threadRootId, + isThreadRoot: event.isThreadRoot, + }); + + const textWithId = `${bodyText}\n[matrix event id: ${messageId} room: ${roomId}]`; + const body = formatAgentEnvelope({ + channel: "Matrix", + from: senderName, + timestamp: event.getTs() ?? undefined, + body: textWithId, + }); + + const route = resolveAgentRoute({ + cfg, + channel: "matrix", + peer: { + kind: isDirectMessage ? "dm" : "channel", + id: isDirectMessage ? senderId : roomId, + }, + }); + + const groupSystemPrompt = roomConfigInfo.config?.systemPrompt?.trim() || undefined; + const ctxPayload = { + Body: body, + From: isDirectMessage ? `matrix:${senderId}` : `matrix:channel:${roomId}`, + To: `room:${roomId}`, + SessionKey: route.sessionKey, + AccountId: route.accountId, + ChatType: isDirectMessage ? "direct" : "room", + SenderName: senderName, + SenderId: senderId, + SenderUsername: senderId.split(":")[0]?.replace(/^@/, ""), + GroupSubject: isRoom ? (roomName ?? roomId) : undefined, + GroupRoom: isRoom ? (room.getCanonicalAlias?.() ?? roomId) : undefined, + GroupSystemPrompt: isRoom ? groupSystemPrompt : undefined, + Provider: "matrix" as const, + Surface: "matrix" as const, + WasMentioned: isRoom ? wasMentioned : undefined, + MessageSid: messageId, + ReplyToId: threadTarget ? undefined : (event.replyEventId ?? undefined), + MessageThreadId: threadTarget, + Timestamp: event.getTs() ?? undefined, + MediaPath: media?.path, + MediaType: media?.contentType, + MediaUrl: media?.path, + CommandAuthorized: commandAuthorized, + CommandSource: "text" as const, + OriginatingChannel: "matrix" as const, + OriginatingTo: `room:${roomId}`, + }; + + if (isDirectMessage) { + const storePath = resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + await updateLastRoute({ + storePath, + sessionKey: route.mainSessionKey, + channel: "matrix", + to: `room:${roomId}`, + accountId: route.accountId, + }); + } + + if (shouldLogVerbose()) { + const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n"); + logVerbose(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`); + } + + const ackReaction = (cfg.messages?.ackReaction ?? "").trim(); + const ackScope = cfg.messages?.ackReactionScope ?? "group-mentions"; + const shouldAckReaction = () => { + if (!ackReaction) return false; + if (ackScope === "all") return true; + if (ackScope === "direct") return isDirectMessage; + if (ackScope === "group-all") return isRoom; + if (ackScope === "group-mentions") { + if (!isRoom) return false; + if (!shouldRequireMention) return false; + return wasMentioned || shouldBypassMention; + } + return false; + }; + if (shouldAckReaction() && messageId) { + reactMatrixMessage(roomId, messageId, ackReaction, client).catch((err) => { + logVerbose(`matrix react failed for room ${roomId}: ${String(err)}`); + }); + } + + const replyTarget = ctxPayload.To; + if (!replyTarget) { + runtime.error?.(danger("matrix: missing reply target")); + return; + } + + let didSendReply = false; + const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ + responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId).responsePrefix, + humanDelay: resolveHumanDelayConfig(cfg, route.agentId), + deliver: async (payload) => { + await deliverMatrixReplies({ + replies: [payload], + roomId, + client, + runtime, + textLimit, + replyToMode, + threadId: threadTarget, + }); + didSendReply = true; + }, + onError: (err, info) => { + runtime.error?.(danger(`matrix ${info.kind} reply failed: ${String(err)}`)); + }, + onReplyStart: () => sendTypingMatrix(roomId, true, undefined, client).catch(() => {}), + onIdle: () => sendTypingMatrix(roomId, false, undefined, client).catch(() => {}), + }); + + const { queuedFinal, counts } = await dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + skillFilter: roomConfigInfo.config?.skills, + }, + }); + markDispatchIdle(); + if (!queuedFinal) return; + didSendReply = true; + if (shouldLogVerbose()) { + const finalCount = counts.final; + logVerbose(`matrix: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`); + } + if (didSendReply) { + const preview = bodyText.replace(/\s+/g, " ").slice(0, 160); + enqueueSystemEvent(`Matrix message from ${senderName}: ${preview}`, { + sessionKey: route.sessionKey, + contextKey: `matrix:message:${roomId}:${messageId || "unknown"}`, + }); + } + } catch (err) { + runtime.error?.(danger(`matrix handler failed: ${String(err)}`)); + } + }; + + client.on(RoomEvent.Timeline, handleTimeline); + + await resolveSharedMatrixClient({ cfg, auth: authWithLimit, startClient: true }); + runtime.log?.(`matrix: logged in as ${auth.userId}`); + + await new Promise((resolve) => { + const onAbort = () => { + try { + client.stopClient(); + } finally { + setActiveMatrixClient(null); + resolve(); + } + }; + if (opts.abortSignal?.aborted) { + onAbort(); + return; + } + opts.abortSignal?.addEventListener("abort", onAbort, { once: true }); + }); +} diff --git a/extensions/matrix/src/matrix/monitor/media.ts b/extensions/matrix/src/matrix/monitor/media.ts new file mode 100644 index 000000000..8e66755e1 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/media.ts @@ -0,0 +1,58 @@ +import type { MatrixClient } from "matrix-js-sdk"; + +import { saveMediaBuffer } from "../../../../../src/media/store.js"; + +async function fetchMatrixMediaBuffer(params: { + client: MatrixClient; + mxcUrl: string; + maxBytes: number; +}): Promise<{ buffer: Buffer; headerType?: string } | null> { + const url = params.client.mxcUrlToHttp( + params.mxcUrl, + undefined, + undefined, + undefined, + false, + true, + true, + ); + if (!url) return null; + const token = params.client.getAccessToken(); + const res = await fetch(url, { + headers: token ? { Authorization: `Bearer ${token}` } : undefined, + }); + if (!res.ok) { + throw new Error(`Matrix media download failed: HTTP ${res.status}`); + } + const buffer = Buffer.from(await res.arrayBuffer()); + if (buffer.byteLength > params.maxBytes) { + throw new Error("Matrix media exceeds configured size limit"); + } + const headerType = res.headers.get("content-type") ?? undefined; + return { buffer, headerType }; +} + +export async function downloadMatrixMedia(params: { + client: MatrixClient; + mxcUrl: string; + contentType?: string; + maxBytes: number; +}): Promise<{ + path: string; + contentType?: string; + placeholder: string; +} | null> { + const fetched = await fetchMatrixMediaBuffer({ + client: params.client, + mxcUrl: params.mxcUrl, + maxBytes: params.maxBytes, + }); + if (!fetched) return null; + const headerType = fetched.headerType ?? params.contentType ?? undefined; + const saved = await saveMediaBuffer(fetched.buffer, headerType, "inbound", params.maxBytes); + return { + path: saved.path, + contentType: saved.contentType, + placeholder: "[matrix media]", + }; +} diff --git a/extensions/matrix/src/matrix/monitor/mentions.ts b/extensions/matrix/src/matrix/monitor/mentions.ts new file mode 100644 index 000000000..fee2562d1 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/mentions.ts @@ -0,0 +1,22 @@ +import type { RoomMessageEventContent } from "matrix-js-sdk/lib/@types/events.js"; + +import { matchesMentionPatterns } from "../../../../../src/auto-reply/reply/mentions.js"; + +export function resolveMentions(params: { + content: RoomMessageEventContent; + userId?: string | null; + text?: string; + mentionRegexes: RegExp[]; +}) { + const mentions = params.content["m.mentions"] as + | { user_ids?: string[]; room?: boolean } + | undefined; + const mentionedUsers = Array.isArray(mentions?.user_ids) + ? new Set(mentions.user_ids) + : new Set(); + const wasMentioned = + Boolean(mentions?.room) || + (params.userId ? mentionedUsers.has(params.userId) : false) || + matchesMentionPatterns(params.text ?? "", params.mentionRegexes); + return { wasMentioned, hasExplicitMention: Boolean(mentions) }; +} diff --git a/extensions/matrix/src/matrix/monitor/replies.ts b/extensions/matrix/src/matrix/monitor/replies.ts new file mode 100644 index 000000000..e347df1c9 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/replies.ts @@ -0,0 +1,67 @@ +import type { MatrixClient } from "matrix-js-sdk"; + +import { chunkMarkdownText } from "../../../../../src/auto-reply/chunk.js"; +import type { ReplyPayload } from "../../../../../src/auto-reply/types.js"; +import { danger } from "../../../../../src/globals.js"; +import type { RuntimeEnv } from "../../../../../src/runtime.js"; +import { sendMessageMatrix } from "../send.js"; + +export async function deliverMatrixReplies(params: { + replies: ReplyPayload[]; + roomId: string; + client: MatrixClient; + runtime: RuntimeEnv; + textLimit: number; + replyToMode: "off" | "first" | "all"; + threadId?: string; +}): Promise { + const chunkLimit = Math.min(params.textLimit, 4000); + let hasReplied = false; + for (const reply of params.replies) { + if (!reply?.text && !reply?.mediaUrl && !(reply?.mediaUrls?.length ?? 0)) { + params.runtime.error?.(danger("matrix reply missing text/media")); + continue; + } + const replyToIdRaw = reply.replyToId?.trim(); + const replyToId = params.threadId || params.replyToMode === "off" ? undefined : replyToIdRaw; + const mediaList = reply.mediaUrls?.length + ? reply.mediaUrls + : reply.mediaUrl + ? [reply.mediaUrl] + : []; + + const shouldIncludeReply = (id?: string) => + Boolean(id) && (params.replyToMode === "all" || !hasReplied); + + if (mediaList.length === 0) { + for (const chunk of chunkMarkdownText(reply.text ?? "", chunkLimit)) { + const trimmed = chunk.trim(); + if (!trimmed) continue; + await sendMessageMatrix(params.roomId, trimmed, { + client: params.client, + replyToId: shouldIncludeReply(replyToId) ? replyToId : undefined, + threadId: params.threadId, + }); + if (shouldIncludeReply(replyToId)) { + hasReplied = true; + } + } + continue; + } + + let first = true; + for (const mediaUrl of mediaList) { + const caption = first ? (reply.text ?? "") : ""; + await sendMessageMatrix(params.roomId, caption, { + client: params.client, + mediaUrl, + replyToId: shouldIncludeReply(replyToId) ? replyToId : undefined, + threadId: params.threadId, + }); + if (shouldIncludeReply(replyToId)) { + hasReplied = true; + } + first = false; + } + } +} diff --git a/extensions/matrix/src/matrix/monitor/rooms.ts b/extensions/matrix/src/matrix/monitor/rooms.ts new file mode 100644 index 000000000..12c475699 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/rooms.ts @@ -0,0 +1,36 @@ +import type { MatrixConfig, MatrixRoomConfig } from "../../types.js"; + +export type MatrixRoomConfigResolved = { + allowed: boolean; + allowlistConfigured: boolean; + config?: MatrixRoomConfig; +}; + +export function resolveMatrixRoomConfig(params: { + rooms?: MatrixConfig["rooms"]; + roomId: string; + aliases: string[]; + name?: string | null; +}): MatrixRoomConfigResolved { + const rooms = params.rooms ?? {}; + const keys = Object.keys(rooms); + const allowlistConfigured = keys.length > 0; + const candidates = [ + params.roomId, + `room:${params.roomId}`, + ...params.aliases, + params.name ?? "", + ].filter(Boolean); + let matched: MatrixRoomConfigResolved["config"] | undefined; + for (const candidate of candidates) { + if (rooms[candidate]) { + matched = rooms[candidate]; + break; + } + } + if (!matched && rooms["*"]) { + matched = rooms["*"]; + } + const allowed = matched ? matched.enabled !== false && matched.allow !== false : false; + return { allowed, allowlistConfigured, config: matched }; +} diff --git a/extensions/matrix/src/matrix/monitor/threads.ts b/extensions/matrix/src/matrix/monitor/threads.ts new file mode 100644 index 000000000..50a42ac67 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/threads.ts @@ -0,0 +1,49 @@ +import type { MatrixEvent } from "matrix-js-sdk"; +import { RelationType } from "matrix-js-sdk"; +import type { RoomMessageEventContent } from "matrix-js-sdk/lib/@types/events.js"; + +export function resolveMatrixThreadTarget(params: { + threadReplies: "off" | "inbound" | "always"; + messageId: string; + threadRootId?: string; + isThreadRoot?: boolean; +}): string | undefined { + const { threadReplies, messageId, threadRootId } = params; + if (threadReplies === "off") return undefined; + const isThreadRoot = params.isThreadRoot === true; + const hasInboundThread = Boolean(threadRootId && threadRootId !== messageId && !isThreadRoot); + if (threadReplies === "inbound") { + return hasInboundThread ? threadRootId : undefined; + } + if (threadReplies === "always") { + return threadRootId ?? messageId; + } + return undefined; +} + +export function resolveMatrixThreadRootId(params: { + event: MatrixEvent; + content: RoomMessageEventContent; +}): string | undefined { + const fromThread = params.event.getThread?.()?.id; + if (fromThread) return fromThread; + const direct = params.event.threadRootId ?? undefined; + if (direct) return direct; + const relates = params.content["m.relates_to"]; + if (!relates || typeof relates !== "object") return undefined; + if ("rel_type" in relates && relates.rel_type === RelationType.Thread) { + if ("event_id" in relates && typeof relates.event_id === "string") { + return relates.event_id; + } + if ( + "m.in_reply_to" in relates && + typeof relates["m.in_reply_to"] === "object" && + relates["m.in_reply_to"] && + "event_id" in relates["m.in_reply_to"] && + typeof relates["m.in_reply_to"].event_id === "string" + ) { + return relates["m.in_reply_to"].event_id; + } + } + return undefined; +} diff --git a/extensions/matrix/src/matrix/poll-types.ts b/extensions/matrix/src/matrix/poll-types.ts new file mode 100644 index 000000000..68ddc9412 --- /dev/null +++ b/extensions/matrix/src/matrix/poll-types.ts @@ -0,0 +1,144 @@ +/** + * Matrix Poll Types (MSC3381) + * + * Defines types for Matrix poll events: + * - m.poll.start - Creates a new poll + * - m.poll.response - Records a vote + * - m.poll.end - Closes a poll + */ + +import type { PollInput } from "../../../../src/polls.js"; + +export const M_POLL_START = "m.poll.start"; +export const M_POLL_RESPONSE = "m.poll.response"; +export const M_POLL_END = "m.poll.end"; + +export const ORG_POLL_START = "org.matrix.msc3381.poll.start"; +export const ORG_POLL_RESPONSE = "org.matrix.msc3381.poll.response"; +export const ORG_POLL_END = "org.matrix.msc3381.poll.end"; + +export const POLL_EVENT_TYPES = [ + M_POLL_START, + M_POLL_RESPONSE, + M_POLL_END, + ORG_POLL_START, + ORG_POLL_RESPONSE, + ORG_POLL_END, +]; + +export const POLL_START_TYPES = [M_POLL_START, ORG_POLL_START]; +export const POLL_RESPONSE_TYPES = [M_POLL_RESPONSE, ORG_POLL_RESPONSE]; +export const POLL_END_TYPES = [M_POLL_END, ORG_POLL_END]; + +export type PollKind = "m.poll.disclosed" | "m.poll.undisclosed"; + +export type TextContent = { + "m.text"?: string; + "org.matrix.msc1767.text"?: string; + body?: string; +}; + +export type PollAnswer = { + id: string; +} & TextContent; + +export type PollStartContent = { + "m.poll"?: { + question: TextContent; + kind?: PollKind; + max_selections?: number; + answers: PollAnswer[]; + }; + "org.matrix.msc3381.poll.start"?: { + question: TextContent; + kind?: PollKind; + max_selections?: number; + answers: PollAnswer[]; + }; + "m.relates_to"?: { + rel_type: "m.reference"; + event_id: string; + }; +}; + +export type PollSummary = { + eventId: string; + roomId: string; + sender: string; + senderName: string; + question: string; + answers: string[]; + kind: PollKind; + maxSelections: number; +}; + +export function isPollStartType(eventType: string): boolean { + return POLL_START_TYPES.includes(eventType); +} + +export function getTextContent(text?: TextContent): string { + if (!text) return ""; + return text["m.text"] ?? text["org.matrix.msc1767.text"] ?? text.body ?? ""; +} + +export function parsePollStartContent(content: PollStartContent): PollSummary | null { + const poll = content["m.poll"] ?? content["org.matrix.msc3381.poll.start"]; + if (!poll) return null; + + const question = getTextContent(poll.question); + if (!question) return null; + + const answers = poll.answers + .map((answer) => getTextContent(answer)) + .filter((a) => a.trim().length > 0); + + return { + eventId: "", + roomId: "", + sender: "", + senderName: "", + question, + answers, + kind: poll.kind ?? "m.poll.disclosed", + maxSelections: poll.max_selections ?? 1, + }; +} + +export function formatPollAsText(summary: PollSummary): string { + const lines = [ + "[Poll]", + summary.question, + "", + ...summary.answers.map((answer, idx) => `${idx + 1}. ${answer}`), + ]; + return lines.join("\n"); +} + +function buildTextContent(body: string): TextContent { + return { + "m.text": body, + "org.matrix.msc1767.text": body, + }; +} + +export function buildPollStartContent(poll: PollInput): PollStartContent { + const question = poll.question.trim(); + const answers = poll.options + .map((option) => option.trim()) + .filter((option) => option.length > 0) + .map((option, idx) => ({ + id: `answer${idx + 1}`, + ...buildTextContent(option), + })); + + const maxSelections = poll.multiple ? Math.max(1, answers.length) : 1; + + return { + "m.poll": { + question: buildTextContent(question), + kind: poll.multiple ? "m.poll.undisclosed" : "m.poll.disclosed", + max_selections: maxSelections, + answers, + }, + }; +} diff --git a/extensions/matrix/src/matrix/probe.ts b/extensions/matrix/src/matrix/probe.ts new file mode 100644 index 000000000..baf3c502c --- /dev/null +++ b/extensions/matrix/src/matrix/probe.ts @@ -0,0 +1,69 @@ +import { createMatrixClient, isBunRuntime } from "./client.js"; + +export type MatrixProbe = { + ok: boolean; + status?: number | null; + error?: string | null; + elapsedMs: number; + userId?: string | null; +}; + +export async function probeMatrix(params: { + homeserver: string; + accessToken: string; + userId?: string; + timeoutMs: number; +}): Promise { + const started = Date.now(); + const result: MatrixProbe = { + ok: false, + status: null, + error: null, + elapsedMs: 0, + }; + if (isBunRuntime()) { + return { + ...result, + error: "Matrix probe requires Node (bun runtime not supported)", + elapsedMs: Date.now() - started, + }; + } + if (!params.homeserver?.trim()) { + return { + ...result, + error: "missing homeserver", + elapsedMs: Date.now() - started, + }; + } + if (!params.accessToken?.trim()) { + return { + ...result, + error: "missing access token", + elapsedMs: Date.now() - started, + }; + } + try { + const client = await createMatrixClient({ + homeserver: params.homeserver, + userId: params.userId ?? "", + accessToken: params.accessToken, + localTimeoutMs: params.timeoutMs, + }); + const res = await client.whoami(); + result.ok = true; + result.userId = res.user_id ?? null; + + result.elapsedMs = Date.now() - started; + return result; + } catch (err) { + return { + ...result, + status: + typeof err === "object" && err && "httpStatus" in err + ? Number((err as { httpStatus?: number }).httpStatus) + : result.status, + error: err instanceof Error ? err.message : String(err), + elapsedMs: Date.now() - started, + }; + } +} diff --git a/extensions/matrix/src/matrix/send.test.ts b/extensions/matrix/src/matrix/send.test.ts new file mode 100644 index 000000000..91b3da29c --- /dev/null +++ b/extensions/matrix/src/matrix/send.test.ts @@ -0,0 +1,61 @@ +import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; + +vi.mock("../../../../src/config/config.js", () => ({ + loadConfig: () => ({}), +})); + +vi.mock("../../../../src/web/media.js", () => ({ + loadWebMedia: vi.fn().mockResolvedValue({ + buffer: Buffer.from("media"), + fileName: "photo.png", + contentType: "image/png", + kind: "image", + }), +})); + +let sendMessageMatrix: typeof import("./send.js").sendMessageMatrix; + +const makeClient = () => { + const sendMessage = vi.fn().mockResolvedValue({ event_id: "evt1" }); + const uploadContent = vi.fn().mockResolvedValue({ + content_uri: "mxc://example/file", + }); + const client = { + sendMessage, + uploadContent, + } as unknown as import("matrix-js-sdk").MatrixClient; + return { client, sendMessage, uploadContent }; +}; + +describe("sendMessageMatrix media", () => { + beforeAll(async () => { + ({ sendMessageMatrix } = await import("./send.js")); + }); + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("uploads media with url payloads", async () => { + const { client, sendMessage, uploadContent } = makeClient(); + + await sendMessageMatrix("room:!room:example", "caption", { + client, + mediaUrl: "file:///tmp/photo.png", + }); + + const uploadArg = uploadContent.mock.calls[0]?.[0]; + expect(Buffer.isBuffer(uploadArg)).toBe(true); + + const content = sendMessage.mock.calls[0]?.[2] as { + url?: string; + msgtype?: string; + format?: string; + formatted_body?: string; + }; + expect(content.msgtype).toBe("m.file"); + expect(content.format).toBe("org.matrix.custom.html"); + expect(content.formatted_body).toContain("caption"); + expect(content.url).toBe("mxc://example/file"); + }); +}); diff --git a/extensions/matrix/src/matrix/send.ts b/extensions/matrix/src/matrix/send.ts new file mode 100644 index 000000000..47b566eb3 --- /dev/null +++ b/extensions/matrix/src/matrix/send.ts @@ -0,0 +1,389 @@ +import type { AccountDataEvents, MatrixClient } from "matrix-js-sdk"; +import { EventType, MsgType, RelationType } from "matrix-js-sdk"; +import type { + ReactionEventContent, + RoomMessageEventContent, +} from "matrix-js-sdk/lib/@types/events.js"; + +import { chunkMarkdownText, resolveTextChunkLimit } from "../../../../src/auto-reply/chunk.js"; +import { loadConfig } from "../../../../src/config/config.js"; +import type { PollInput } from "../../../../src/polls.js"; +import { loadWebMedia } from "../../../../src/web/media.js"; +import { getActiveMatrixClient } from "./active-client.js"; +import { + createMatrixClient, + isBunRuntime, + resolveMatrixAuth, + resolveSharedMatrixClient, + waitForMatrixSync, +} from "./client.js"; +import { markdownToMatrixHtml } from "./format.js"; +import { buildPollStartContent, M_POLL_START } from "./poll-types.js"; +import type { CoreConfig } from "../types.js"; + +const MATRIX_TEXT_LIMIT = 4000; + +type MatrixDirectAccountData = AccountDataEvents[EventType.Direct]; + +type MatrixReplyRelation = { + "m.in_reply_to": { event_id: string }; +}; + +type MatrixMessageContent = Record & { + msgtype: MsgType; + body: string; +}; + +type MatrixUploadContent = Parameters[0]; + +export type MatrixSendResult = { + messageId: string; + roomId: string; +}; + +export type MatrixSendOpts = { + client?: MatrixClient; + mediaUrl?: string; + replyToId?: string; + threadId?: string | number | null; + timeoutMs?: number; +}; + +function ensureNodeRuntime() { + if (isBunRuntime()) { + throw new Error("Matrix support requires Node (bun runtime not supported)"); + } +} + +function resolveMediaMaxBytes(): number | undefined { + const cfg = loadConfig() as CoreConfig; + if (typeof cfg.channels?.matrix?.mediaMaxMb === "number") { + return cfg.channels.matrix.mediaMaxMb * 1024 * 1024; + } + return undefined; +} + +function normalizeTarget(raw: string): string { + const trimmed = raw.trim(); + if (!trimmed) { + throw new Error("Matrix target is required (room: or #alias)"); + } + return trimmed; +} + +async function resolveDirectRoomId(client: MatrixClient, userId: string): Promise { + const trimmed = userId.trim(); + if (!trimmed.startsWith("@")) { + throw new Error(`Matrix user IDs must be fully qualified (got "${trimmed}")`); + } + const directEvent = client.getAccountData(EventType.Direct); + const directContent = directEvent?.getContent(); + const list = Array.isArray(directContent?.[trimmed]) ? directContent[trimmed] : []; + if (list.length > 0) return list[0]; + const server = await client.getAccountDataFromServer(EventType.Direct); + const serverList = Array.isArray(server?.[trimmed]) ? server[trimmed] : []; + if (serverList.length > 0) return serverList[0]; + throw new Error( + `No m.direct room found for ${trimmed}. Open a DM first so Matrix can set m.direct.`, + ); +} + +export async function resolveMatrixRoomId( + client: MatrixClient, + raw: string, +): Promise { + const target = normalizeTarget(raw); + const lowered = target.toLowerCase(); + if (lowered.startsWith("matrix:")) { + return await resolveMatrixRoomId(client, target.slice("matrix:".length)); + } + if (lowered.startsWith("room:")) { + return await resolveMatrixRoomId(client, target.slice("room:".length)); + } + if (lowered.startsWith("channel:")) { + return await resolveMatrixRoomId(client, target.slice("channel:".length)); + } + if (lowered.startsWith("user:")) { + return await resolveDirectRoomId(client, target.slice("user:".length)); + } + if (target.startsWith("@")) { + return await resolveDirectRoomId(client, target); + } + if (target.startsWith("#")) { + const resolved = await client.getRoomIdForAlias(target); + if (!resolved?.room_id) { + throw new Error(`Matrix alias ${target} could not be resolved`); + } + return resolved.room_id; + } + return target; +} + +function buildMediaContent(params: { + msgtype: MsgType.Image | MsgType.Audio | MsgType.Video | MsgType.File; + body: string; + url: string; + filename?: string; + mimetype?: string; + size: number; + relation?: MatrixReplyRelation; +}): RoomMessageEventContent { + const info = { mimetype: params.mimetype, size: params.size }; + const base: MatrixMessageContent = { + msgtype: params.msgtype, + body: params.body, + filename: params.filename, + info, + url: params.url, + }; + if (params.relation) { + base["m.relates_to"] = params.relation; + } + applyMatrixFormatting(base, params.body); + return base as RoomMessageEventContent; +} + +function buildTextContent(body: string, relation?: MatrixReplyRelation): RoomMessageEventContent { + const content: MatrixMessageContent = relation + ? { + msgtype: MsgType.Text, + body, + "m.relates_to": relation, + } + : { + msgtype: MsgType.Text, + body, + }; + applyMatrixFormatting(content, body); + return content as RoomMessageEventContent; +} + +function applyMatrixFormatting(content: MatrixMessageContent, body: string): void { + const formatted = markdownToMatrixHtml(body ?? ""); + if (!formatted) return; + content.format = "org.matrix.custom.html"; + content.formatted_body = formatted; +} + +function buildReplyRelation(replyToId?: string): MatrixReplyRelation | undefined { + const trimmed = replyToId?.trim(); + if (!trimmed) return undefined; + return { "m.in_reply_to": { event_id: trimmed } }; +} + +async function uploadFile( + client: MatrixClient, + file: MatrixUploadContent | Buffer, + params: { + contentType?: string; + filename?: string; + includeFilename?: boolean; + }, +): Promise { + const upload = await client.uploadContent(file as MatrixUploadContent, { + type: params.contentType, + name: params.filename, + includeFilename: params.includeFilename, + }); + return upload.content_uri; +} + +async function resolveMatrixClient(opts: { + client?: MatrixClient; + timeoutMs?: number; +}): Promise<{ client: MatrixClient; stopOnDone: boolean }> { + ensureNodeRuntime(); + if (opts.client) return { client: opts.client, stopOnDone: false }; + const active = getActiveMatrixClient(); + if (active) return { client: active, stopOnDone: false }; + const shouldShareClient = Boolean(process.env.CLAWDBOT_GATEWAY_PORT); + if (shouldShareClient) { + const client = await resolveSharedMatrixClient({ + timeoutMs: opts.timeoutMs, + }); + return { client, stopOnDone: false }; + } + const auth = await resolveMatrixAuth(); + const client = await createMatrixClient({ + homeserver: auth.homeserver, + userId: auth.userId, + accessToken: auth.accessToken, + localTimeoutMs: opts.timeoutMs, + }); + await client.startClient({ + initialSyncLimit: 0, + lazyLoadMembers: true, + threadSupport: true, + }); + await waitForMatrixSync({ client, timeoutMs: opts.timeoutMs }); + return { client, stopOnDone: true }; +} + +export async function sendMessageMatrix( + to: string, + message: string, + opts: MatrixSendOpts = {}, +): Promise { + const trimmedMessage = message?.trim() ?? ""; + if (!trimmedMessage && !opts.mediaUrl) { + throw new Error("Matrix send requires text or media"); + } + const { client, stopOnDone } = await resolveMatrixClient({ + client: opts.client, + timeoutMs: opts.timeoutMs, + }); + try { + const roomId = await resolveMatrixRoomId(client, to); + const cfg = loadConfig(); + const textLimit = resolveTextChunkLimit(cfg, "matrix"); + const chunkLimit = Math.min(textLimit, MATRIX_TEXT_LIMIT); + const chunks = chunkMarkdownText(trimmedMessage, chunkLimit); + const rawThreadId = opts.threadId; + const threadId = + rawThreadId !== undefined && rawThreadId !== null + ? String(rawThreadId).trim() + : null; + const relation = threadId ? undefined : buildReplyRelation(opts.replyToId); + const sendContent = (content: RoomMessageEventContent) => + client.sendMessage(roomId, threadId ?? undefined, content); + + let lastMessageId = ""; + if (opts.mediaUrl) { + const maxBytes = resolveMediaMaxBytes(); + const media = await loadWebMedia(opts.mediaUrl, maxBytes); + const contentUri = await uploadFile(client, media.buffer, { + contentType: media.contentType, + filename: media.fileName, + }); + const msgtype = MsgType.File; + const [firstChunk, ...rest] = chunks; + const body = firstChunk ?? media.fileName ?? "(file)"; + const content = buildMediaContent({ + msgtype, + body, + url: contentUri, + filename: media.fileName, + mimetype: media.contentType, + size: media.buffer.byteLength, + relation, + }); + const response = await sendContent(content); + lastMessageId = response.event_id ?? lastMessageId; + for (const chunk of rest) { + const text = chunk.trim(); + if (!text) continue; + const followup = buildTextContent(text); + const followupRes = await sendContent(followup); + lastMessageId = followupRes.event_id ?? lastMessageId; + } + } else { + for (const chunk of chunks.length ? chunks : [""]) { + const text = chunk.trim(); + if (!text) continue; + const content = buildTextContent(text, relation); + const response = await sendContent(content); + lastMessageId = response.event_id ?? lastMessageId; + } + } + + return { + messageId: lastMessageId || "unknown", + roomId, + }; + } finally { + if (stopOnDone) { + client.stopClient(); + } + } +} + +export async function sendPollMatrix( + to: string, + poll: PollInput, + opts: MatrixSendOpts = {}, +): Promise<{ eventId: string; roomId: string }> { + if (!poll.question?.trim()) { + throw new Error("Matrix poll requires a question"); + } + if (!poll.options?.length) { + throw new Error("Matrix poll requires options"); + } + const { client, stopOnDone } = await resolveMatrixClient({ + client: opts.client, + timeoutMs: opts.timeoutMs, + }); + + try { + const roomId = await resolveMatrixRoomId(client, to); + const pollContent = buildPollStartContent(poll); + const rawThreadId = opts.threadId; + const threadId = + rawThreadId !== undefined && rawThreadId !== null + ? String(rawThreadId).trim() + : null; + const response = await client.sendEvent( + roomId, + threadId ?? undefined, + M_POLL_START as EventType.RoomMessage, + pollContent as unknown as RoomMessageEventContent, + ); + + return { + eventId: response.event_id ?? "unknown", + roomId, + }; + } finally { + if (stopOnDone) { + client.stopClient(); + } + } +} + +export async function sendTypingMatrix( + roomId: string, + typing: boolean, + timeoutMs?: number, + client?: MatrixClient, +): Promise { + const { client: resolved, stopOnDone } = await resolveMatrixClient({ + client, + timeoutMs, + }); + try { + const resolvedTimeoutMs = typeof timeoutMs === "number" ? timeoutMs : 30_000; + await resolved.sendTyping(roomId, typing, resolvedTimeoutMs); + } finally { + if (stopOnDone) { + resolved.stopClient(); + } + } +} + +export async function reactMatrixMessage( + roomId: string, + messageId: string, + emoji: string, + client?: MatrixClient, +): Promise { + if (!emoji.trim()) { + throw new Error("Matrix reaction requires an emoji"); + } + const { client: resolved, stopOnDone } = await resolveMatrixClient({ + client, + }); + try { + const resolvedRoom = await resolveMatrixRoomId(resolved, roomId); + const reaction: ReactionEventContent = { + "m.relates_to": { + rel_type: RelationType.Annotation, + event_id: messageId, + key: emoji, + }, + }; + await resolved.sendEvent(resolvedRoom, EventType.Reaction, reaction); + } finally { + if (stopOnDone) { + resolved.stopClient(); + } + } +} diff --git a/extensions/matrix/src/onboarding.ts b/extensions/matrix/src/onboarding.ts new file mode 100644 index 000000000..1f00c3581 --- /dev/null +++ b/extensions/matrix/src/onboarding.ts @@ -0,0 +1,267 @@ +import { addWildcardAllowFrom } from "../../../src/channels/plugins/onboarding/helpers.js"; +import type { + ChannelOnboardingAdapter, + ChannelOnboardingDmPolicy, +} from "../../../src/channels/plugins/onboarding-types.js"; +import { formatDocsLink } from "../../../src/terminal/links.js"; +import type { WizardPrompter } from "../../../src/wizard/prompts.js"; +import { resolveMatrixAccount } from "./matrix/accounts.js"; +import { ensureMatrixSdkInstalled, isMatrixSdkAvailable } from "./matrix/deps.js"; +import type { CoreConfig, DmPolicy } from "./types.js"; + +const channel = "matrix" as const; + +function setMatrixDmPolicy(cfg: CoreConfig, policy: DmPolicy) { + const allowFrom = policy === "open" ? addWildcardAllowFrom(cfg.channels?.matrix?.dm?.allowFrom) : undefined; + return { + ...cfg, + channels: { + ...cfg.channels, + matrix: { + ...cfg.channels?.matrix, + dm: { + ...cfg.channels?.matrix?.dm, + policy, + ...(allowFrom ? { allowFrom } : {}), + }, + }, + }, + }; +} + +async function noteMatrixAuthHelp(prompter: WizardPrompter): Promise { + await prompter.note( + [ + "Matrix requires a homeserver URL + user ID.", + "Use an access token or a password (password logs in and stores a token).", + "Env vars supported: MATRIX_HOMESERVER, MATRIX_USER_ID, MATRIX_ACCESS_TOKEN, MATRIX_PASSWORD.", + `Docs: ${formatDocsLink("/channels/matrix", "channels/matrix")}`, + ].join("\n"), + "Matrix setup", + ); +} + +async function promptMatrixAllowFrom(params: { + cfg: CoreConfig; + prompter: WizardPrompter; +}): Promise { + const { cfg, prompter } = params; + const existingAllowFrom = cfg.channels?.matrix?.dm?.allowFrom ?? []; + const entry = await prompter.text({ + message: "Matrix allowFrom (user id)", + placeholder: "@user:server", + initialValue: existingAllowFrom[0] ? String(existingAllowFrom[0]) : undefined, + validate: (value) => { + const raw = String(value ?? "").trim(); + if (!raw) return "Required"; + if (!raw.startsWith("@")) return "Matrix user IDs should start with @"; + if (!raw.includes(":")) return "Matrix user IDs should include a server (:@server)"; + return undefined; + }, + }); + const normalized = String(entry).trim(); + const merged = [ + ...existingAllowFrom.map((item) => String(item).trim()).filter(Boolean), + normalized, + ]; + const unique = [...new Set(merged)]; + + return { + ...cfg, + channels: { + ...cfg.channels, + matrix: { + ...cfg.channels?.matrix, + enabled: true, + dm: { + ...cfg.channels?.matrix?.dm, + policy: "allowlist", + allowFrom: unique, + }, + }, + }, + }; +} + +const dmPolicy: ChannelOnboardingDmPolicy = { + label: "Matrix", + channel, + policyKey: "channels.matrix.dm.policy", + allowFromKey: "channels.matrix.dm.allowFrom", + getCurrent: (cfg) => (cfg as CoreConfig).channels?.matrix?.dm?.policy ?? "pairing", + setPolicy: (cfg, policy) => setMatrixDmPolicy(cfg as CoreConfig, policy), +}; + +export const matrixOnboardingAdapter: ChannelOnboardingAdapter = { + channel, + getStatus: async ({ cfg }) => { + const account = resolveMatrixAccount({ cfg: cfg as CoreConfig }); + const configured = account.configured; + const sdkReady = isMatrixSdkAvailable(); + return { + channel, + configured, + statusLines: [`Matrix: ${configured ? "configured" : "needs homeserver + user id"}`], + selectionHint: !sdkReady + ? "install matrix-js-sdk" + : configured + ? "configured" + : "needs auth", + }; + }, + configure: async ({ cfg, runtime, prompter, forceAllowFrom }) => { + let next = cfg as CoreConfig; + await ensureMatrixSdkInstalled({ + runtime, + confirm: async (message) => + await prompter.confirm({ + message, + initialValue: true, + }), + }); + const existing = next.channels?.matrix ?? {}; + const account = resolveMatrixAccount({ cfg: next }); + if (!account.configured) { + await noteMatrixAuthHelp(prompter); + } + + const envHomeserver = process.env.MATRIX_HOMESERVER?.trim(); + const envUserId = process.env.MATRIX_USER_ID?.trim(); + const envAccessToken = process.env.MATRIX_ACCESS_TOKEN?.trim(); + const envPassword = process.env.MATRIX_PASSWORD?.trim(); + const envReady = Boolean(envHomeserver && envUserId && (envAccessToken || envPassword)); + + if ( + envReady && + !existing.homeserver && + !existing.userId && + !existing.accessToken && + !existing.password + ) { + const useEnv = await prompter.confirm({ + message: "Matrix env vars detected. Use env values?", + initialValue: true, + }); + if (useEnv) { + next = { + ...next, + channels: { + ...next.channels, + matrix: { + ...next.channels?.matrix, + enabled: true, + }, + }, + }; + if (forceAllowFrom) { + next = await promptMatrixAllowFrom({ cfg: next, prompter }); + } + return { cfg: next }; + } + } + + const homeserver = String( + await prompter.text({ + message: "Matrix homeserver URL", + initialValue: existing.homeserver ?? envHomeserver, + validate: (value) => { + const raw = String(value ?? "").trim(); + if (!raw) return "Required"; + if (!/^https?:\/\//i.test(raw)) return "Use a full URL (https://...)"; + return undefined; + }, + }), + ).trim(); + + const userId = String( + await prompter.text({ + message: "Matrix user ID", + initialValue: existing.userId ?? envUserId, + validate: (value) => { + const raw = String(value ?? "").trim(); + if (!raw) return "Required"; + if (!raw.startsWith("@")) return "Matrix user IDs should start with @"; + if (!raw.includes(":")) return "Matrix user IDs should include a server (:@server)"; + return undefined; + }, + }), + ).trim(); + + let accessToken = existing.accessToken ?? ""; + let password = existing.password ?? ""; + + if (accessToken || password) { + const keep = await prompter.confirm({ + message: "Matrix credentials already configured. Keep them?", + initialValue: true, + }); + if (!keep) { + accessToken = ""; + password = ""; + } + } + + if (!accessToken && !password) { + const authMode = (await prompter.select({ + message: "Matrix auth method", + options: [ + { value: "token", label: "Access token" }, + { value: "password", label: "Password (stores token)" }, + ], + })) as "token" | "password"; + + if (authMode === "token") { + accessToken = String( + await prompter.text({ + message: "Matrix access token", + validate: (value) => (value?.trim() ? undefined : "Required"), + }), + ).trim(); + } else { + password = String( + await prompter.text({ + message: "Matrix password", + validate: (value) => (value?.trim() ? undefined : "Required"), + }), + ).trim(); + } + } + + const deviceName = String( + await prompter.text({ + message: "Matrix device name (optional)", + initialValue: existing.deviceName ?? "Clawdbot Gateway", + }), + ).trim(); + + next = { + ...next, + channels: { + ...next.channels, + matrix: { + ...next.channels?.matrix, + enabled: true, + homeserver, + userId, + accessToken: accessToken || undefined, + password: password || undefined, + deviceName: deviceName || undefined, + }, + }, + }; + + if (forceAllowFrom) { + next = await promptMatrixAllowFrom({ cfg: next, prompter }); + } + + return { cfg: next }; + }, + dmPolicy, + disable: (cfg) => ({ + ...(cfg as CoreConfig), + channels: { + ...(cfg as CoreConfig).channels, + matrix: { ...(cfg as CoreConfig).channels?.matrix, enabled: false }, + }, + }), +}; diff --git a/extensions/matrix/src/outbound.ts b/extensions/matrix/src/outbound.ts new file mode 100644 index 000000000..790056486 --- /dev/null +++ b/extensions/matrix/src/outbound.ts @@ -0,0 +1,61 @@ +import { chunkMarkdownText } from "../../../src/auto-reply/chunk.js"; +import type { ChannelOutboundAdapter } from "../../../src/channels/plugins/types.js"; +import { sendMessageMatrix, sendPollMatrix } from "./matrix/send.js"; + +export const matrixOutbound: ChannelOutboundAdapter = { + deliveryMode: "direct", + chunker: chunkMarkdownText, + textChunkLimit: 4000, + resolveTarget: ({ to }) => { + const trimmed = to?.trim(); + if (!trimmed) { + return { + ok: false, + error: new Error("Delivering to Matrix requires --to "), + }; + } + return { ok: true, to: trimmed }; + }, + sendText: async ({ to, text, deps, replyToId, threadId }) => { + const send = deps?.sendMatrix ?? sendMessageMatrix; + const resolvedThreadId = + threadId !== undefined && threadId !== null ? String(threadId) : undefined; + const result = await send(to, text, { + replyToId: replyToId ?? undefined, + threadId: resolvedThreadId, + }); + return { + channel: "matrix", + messageId: result.messageId, + roomId: result.roomId, + }; + }, + sendMedia: async ({ to, text, mediaUrl, deps, replyToId, threadId }) => { + const send = deps?.sendMatrix ?? sendMessageMatrix; + const resolvedThreadId = + threadId !== undefined && threadId !== null ? String(threadId) : undefined; + const result = await send(to, text, { + mediaUrl, + replyToId: replyToId ?? undefined, + threadId: resolvedThreadId, + }); + return { + channel: "matrix", + messageId: result.messageId, + roomId: result.roomId, + }; + }, + sendPoll: async ({ to, poll, threadId }) => { + const resolvedThreadId = + threadId !== undefined && threadId !== null ? String(threadId) : undefined; + const result = await sendPollMatrix(to, poll, { + threadId: resolvedThreadId, + }); + return { + channel: "matrix", + messageId: result.eventId, + roomId: result.roomId, + pollId: result.eventId, + }; + }, +}; diff --git a/extensions/matrix/src/tool-actions.ts b/extensions/matrix/src/tool-actions.ts new file mode 100644 index 000000000..947748a2c --- /dev/null +++ b/extensions/matrix/src/tool-actions.ts @@ -0,0 +1,160 @@ +import type { AgentToolResult } from "@mariozechner/pi-agent-core"; + +import type { CoreConfig } from "./types.js"; +import { + deleteMatrixMessage, + editMatrixMessage, + getMatrixMemberInfo, + getMatrixRoomInfo, + listMatrixPins, + listMatrixReactions, + pinMatrixMessage, + readMatrixMessages, + removeMatrixReactions, + sendMatrixMessage, + unpinMatrixMessage, +} from "./matrix/actions.js"; +import { reactMatrixMessage } from "./matrix/send.js"; +import { + createActionGate, + jsonResult, + readNumberParam, + readReactionParams, + readStringParam, +} from "../../../src/agents/tools/common.js"; + +const messageActions = new Set(["sendMessage", "editMessage", "deleteMessage", "readMessages"]); +const reactionActions = new Set(["react", "reactions"]); +const pinActions = new Set(["pinMessage", "unpinMessage", "listPins"]); + +function readRoomId(params: Record, required = true): string { + const direct = readStringParam(params, "roomId") ?? readStringParam(params, "channelId"); + if (direct) return direct; + if (!required) return readStringParam(params, "to") ?? ""; + return readStringParam(params, "to", { required: true }); +} + +export async function handleMatrixAction( + params: Record, + cfg: CoreConfig, +): Promise> { + const action = readStringParam(params, "action", { required: true }); + const isActionEnabled = createActionGate(cfg.channels?.matrix?.actions); + + if (reactionActions.has(action)) { + if (!isActionEnabled("reactions")) { + throw new Error("Matrix reactions are disabled."); + } + const roomId = readRoomId(params); + const messageId = readStringParam(params, "messageId", { required: true }); + if (action === "react") { + const { emoji, remove, isEmpty } = readReactionParams(params, { + removeErrorMessage: "Emoji is required to remove a Matrix reaction.", + }); + if (remove || isEmpty) { + const result = await removeMatrixReactions(roomId, messageId, { + emoji: remove ? emoji : undefined, + }); + return jsonResult({ ok: true, removed: result.removed }); + } + await reactMatrixMessage(roomId, messageId, emoji); + return jsonResult({ ok: true, added: emoji }); + } + const reactions = await listMatrixReactions(roomId, messageId); + return jsonResult({ ok: true, reactions }); + } + + if (messageActions.has(action)) { + if (!isActionEnabled("messages")) { + throw new Error("Matrix messages are disabled."); + } + switch (action) { + case "sendMessage": { + const to = readStringParam(params, "to", { required: true }); + const content = readStringParam(params, "content", { + required: true, + allowEmpty: true, + }); + const mediaUrl = readStringParam(params, "mediaUrl"); + const replyToId = readStringParam(params, "replyToId") ?? readStringParam(params, "replyTo"); + const threadId = readStringParam(params, "threadId"); + const result = await sendMatrixMessage(to, content, { + mediaUrl: mediaUrl ?? undefined, + replyToId: replyToId ?? undefined, + threadId: threadId ?? undefined, + }); + return jsonResult({ ok: true, result }); + } + case "editMessage": { + const roomId = readRoomId(params); + const messageId = readStringParam(params, "messageId", { required: true }); + const content = readStringParam(params, "content", { required: true }); + const result = await editMatrixMessage(roomId, messageId, content); + return jsonResult({ ok: true, result }); + } + case "deleteMessage": { + const roomId = readRoomId(params); + const messageId = readStringParam(params, "messageId", { required: true }); + const reason = readStringParam(params, "reason"); + await deleteMatrixMessage(roomId, messageId, { reason: reason ?? undefined }); + return jsonResult({ ok: true, deleted: true }); + } + case "readMessages": { + const roomId = readRoomId(params); + const limit = readNumberParam(params, "limit", { integer: true }); + const before = readStringParam(params, "before"); + const after = readStringParam(params, "after"); + const result = await readMatrixMessages(roomId, { + limit: limit ?? undefined, + before: before ?? undefined, + after: after ?? undefined, + }); + return jsonResult({ ok: true, ...result }); + } + default: + break; + } + } + + if (pinActions.has(action)) { + if (!isActionEnabled("pins")) { + throw new Error("Matrix pins are disabled."); + } + const roomId = readRoomId(params); + if (action === "pinMessage") { + const messageId = readStringParam(params, "messageId", { required: true }); + const result = await pinMatrixMessage(roomId, messageId); + return jsonResult({ ok: true, pinned: result.pinned }); + } + if (action === "unpinMessage") { + const messageId = readStringParam(params, "messageId", { required: true }); + const result = await unpinMatrixMessage(roomId, messageId); + return jsonResult({ ok: true, pinned: result.pinned }); + } + const result = await listMatrixPins(roomId); + return jsonResult({ ok: true, pinned: result.pinned, events: result.events }); + } + + if (action === "memberInfo") { + if (!isActionEnabled("memberInfo")) { + throw new Error("Matrix member info is disabled."); + } + const userId = readStringParam(params, "userId", { required: true }); + const roomId = readStringParam(params, "roomId") ?? readStringParam(params, "channelId"); + const result = await getMatrixMemberInfo(userId, { + roomId: roomId ?? undefined, + }); + return jsonResult({ ok: true, member: result }); + } + + if (action === "channelInfo") { + if (!isActionEnabled("channelInfo")) { + throw new Error("Matrix room info is disabled."); + } + const roomId = readRoomId(params); + const result = await getMatrixRoomInfo(roomId); + return jsonResult({ ok: true, room: result }); + } + + throw new Error(`Unsupported Matrix action: ${action}`); +} diff --git a/extensions/matrix/src/types.ts b/extensions/matrix/src/types.ts new file mode 100644 index 000000000..e5c91e2e2 --- /dev/null +++ b/extensions/matrix/src/types.ts @@ -0,0 +1,85 @@ +export type ReplyToMode = "off" | "first" | "all"; +export type GroupPolicy = "open" | "disabled" | "allowlist"; +export type DmPolicy = "pairing" | "allowlist" | "open" | "disabled"; + +export type MatrixDmConfig = { + /** If false, ignore all incoming Matrix DMs. Default: true. */ + enabled?: boolean; + /** Direct message access policy (default: pairing). */ + policy?: DmPolicy; + /** Allowlist for DM senders (matrix user IDs, localparts, or "*"). */ + allowFrom?: Array; +}; + +export type MatrixRoomConfig = { + /** If false, disable the bot in this room (alias for allow: false). */ + enabled?: boolean; + /** Legacy room allow toggle; prefer enabled. */ + allow?: boolean; + /** Require mentioning the bot to trigger replies. */ + requireMention?: boolean; + /** If true, reply without mention requirements. */ + autoReply?: boolean; + /** Optional allowlist for room senders (user IDs or localparts). */ + users?: Array; + /** Optional skill filter for this room. */ + skills?: string[]; + /** Optional system prompt snippet for this room. */ + systemPrompt?: string; +}; + +export type MatrixActionConfig = { + reactions?: boolean; + messages?: boolean; + pins?: boolean; + memberInfo?: boolean; + channelInfo?: boolean; +}; + +export type MatrixConfig = { + /** Optional display name for this account (used in CLI/UI lists). */ + name?: string; + /** If false, do not start Matrix. Default: true. */ + enabled?: boolean; + /** Matrix homeserver URL (https://matrix.example.org). */ + homeserver?: string; + /** Matrix user id (@user:server). */ + userId?: string; + /** Matrix access token. */ + accessToken?: string; + /** Matrix password (used only to fetch access token). */ + password?: string; + /** Optional device name when logging in via password. */ + deviceName?: string; + /** Initial sync limit for startup (default: matrix-js-sdk default). */ + initialSyncLimit?: number; + /** If true, enforce allowlists for groups + DMs regardless of policy. */ + allowlistOnly?: boolean; + /** Group message policy (default: allowlist). */ + groupPolicy?: GroupPolicy; + /** Control reply threading when reply tags are present (off|first|all). */ + replyToMode?: ReplyToMode; + /** How to handle thread replies (off|inbound|always). */ + threadReplies?: "off" | "inbound" | "always"; + /** Outbound text chunk size (chars). Default: 4000. */ + textChunkLimit?: number; + /** Max outbound media size in MB. */ + mediaMaxMb?: number; + /** Auto-join invites (always|allowlist|off). Default: always. */ + autoJoin?: "always" | "allowlist" | "off"; + /** Allowlist for auto-join invites (room IDs, aliases). */ + autoJoinAllowlist?: Array; + /** Direct message policy + allowlist overrides. */ + dm?: MatrixDmConfig; + /** Room config allowlist keyed by room ID, alias, or name. */ + rooms?: Record; + /** Per-action tool gating (default: true for all). */ + actions?: MatrixActionConfig; +}; + +export type CoreConfig = { + channels?: { + matrix?: MatrixConfig; + }; + [key: string]: unknown; +}; diff --git a/src/auto-reply/reply/agent-runner-utils.ts b/src/auto-reply/reply/agent-runner-utils.ts index 5caf4399e..4110dfb18 100644 --- a/src/auto-reply/reply/agent-runner-utils.ts +++ b/src/auto-reply/reply/agent-runner-utils.ts @@ -41,6 +41,7 @@ export function buildThreadingToolContext(params: { To: threadingTo, ReplyToId: sessionCtx.ReplyToId, ThreadLabel: sessionCtx.ThreadLabel, + MessageThreadId: sessionCtx.MessageThreadId, }, hasRepliedRef, }) ?? {} diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 103df3432..590fc0edc 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -216,9 +216,11 @@ export async function runReplyAgent(params: { abortedLastRun: false, }; const agentId = resolveAgentIdFromSessionKey(sessionKey); - const topicId = - typeof sessionCtx.MessageThreadId === "number" ? sessionCtx.MessageThreadId : undefined; - const nextSessionFile = resolveSessionTranscriptPath(nextSessionId, agentId, topicId); + const nextSessionFile = resolveSessionTranscriptPath( + nextSessionId, + agentId, + sessionCtx.MessageThreadId, + ); nextEntry.sessionFile = nextSessionFile; activeSessionStore[sessionKey] = nextEntry; try { diff --git a/src/auto-reply/reply/inbound-dedupe.ts b/src/auto-reply/reply/inbound-dedupe.ts index 00f105001..fdfba0a03 100644 --- a/src/auto-reply/reply/inbound-dedupe.ts +++ b/src/auto-reply/reply/inbound-dedupe.ts @@ -23,7 +23,10 @@ export function buildInboundDedupeKey(ctx: MsgContext): string | null { if (!peerId) return null; const sessionKey = ctx.SessionKey?.trim() ?? ""; const accountId = ctx.AccountId?.trim() ?? ""; - const threadId = typeof ctx.MessageThreadId === "number" ? String(ctx.MessageThreadId) : ""; + const threadId = + ctx.MessageThreadId !== undefined && ctx.MessageThreadId !== null + ? String(ctx.MessageThreadId) + : ""; return [provider, accountId, sessionKey, peerId, threadId, messageId].filter(Boolean).join("|"); } diff --git a/src/auto-reply/reply/queue/types.ts b/src/auto-reply/reply/queue/types.ts index f04828a1c..c64143d75 100644 --- a/src/auto-reply/reply/queue/types.ts +++ b/src/auto-reply/reply/queue/types.ts @@ -36,8 +36,8 @@ export type FollowupRun = { originatingTo?: string; /** Provider account id (multi-account). */ originatingAccountId?: string; - /** Telegram forum topic thread id. */ - originatingThreadId?: number; + /** Thread id for reply routing (Telegram topic id or Matrix thread event id). */ + originatingThreadId?: string | number; run: { agentId: string; agentDir: string; diff --git a/src/auto-reply/reply/route-reply.ts b/src/auto-reply/reply/route-reply.ts index 4a2b19e47..fd774abb6 100644 --- a/src/auto-reply/reply/route-reply.ts +++ b/src/auto-reply/reply/route-reply.ts @@ -27,8 +27,8 @@ export type RouteReplyParams = { sessionKey?: string; /** Provider account id (multi-account). */ accountId?: string; - /** Telegram message thread id (forum topics). */ - threadId?: number; + /** Thread id for replies (Telegram topic id or Matrix thread event id). */ + threadId?: string | number; /** Config for provider-specific settings. */ cfg: ClawdbotConfig; /** Optional abort signal for cooperative cancellation. */ diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index c435deac4..66a4c2815 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -53,8 +53,8 @@ export type MsgContext = { CommandAuthorized?: boolean; CommandSource?: "text" | "native"; CommandTargetSessionKey?: string; - /** Telegram forum topic thread ID. */ - MessageThreadId?: number; + /** Thread identifier (Telegram topic id or Matrix thread event id). */ + MessageThreadId?: string | number; /** Telegram forum supergroup marker. */ IsForum?: boolean; /** diff --git a/src/channels/dock.ts b/src/channels/dock.ts index 908903dc2..000a37fb4 100644 --- a/src/channels/dock.ts +++ b/src/channels/dock.ts @@ -103,11 +103,14 @@ const DOCKS: Record = { }, threading: { resolveReplyToMode: ({ cfg }) => cfg.channels?.telegram?.replyToMode ?? "first", - buildToolContext: ({ context, hasRepliedRef }) => ({ - currentChannelId: context.To?.trim() || undefined, - currentThreadTs: context.ReplyToId, - hasRepliedRef, - }), + buildToolContext: ({ context, hasRepliedRef }) => { + const threadId = context.MessageThreadId ?? context.ReplyToId; + return { + currentChannelId: context.To?.trim() || undefined, + currentThreadTs: threadId != null ? String(threadId) : undefined, + hasRepliedRef, + }; + }, }, }, whatsapp: { diff --git a/src/channels/plugins/catalog.ts b/src/channels/plugins/catalog.ts index 83edbd244..8ae13c26d 100644 --- a/src/channels/plugins/catalog.ts +++ b/src/channels/plugins/catalog.ts @@ -6,10 +6,29 @@ export type ChannelPluginCatalogEntry = { install: { npmSpec: string; localPath?: string; + defaultChoice?: "npm" | "local"; }; }; const CATALOG: ChannelPluginCatalogEntry[] = [ + { + id: "matrix", + meta: { + id: "matrix", + label: "Matrix", + selectionLabel: "Matrix (plugin)", + docsPath: "/channels/matrix", + docsLabel: "matrix", + blurb: "open protocol; install the plugin to enable.", + order: 70, + quickstartAllowFrom: true, + }, + install: { + npmSpec: "@clawdbot/matrix", + localPath: "extensions/matrix", + defaultChoice: "npm", + }, + }, { id: "zalo", meta: { diff --git a/src/channels/plugins/types.adapters.ts b/src/channels/plugins/types.adapters.ts index 10aebc7f8..897a16f95 100644 --- a/src/channels/plugins/types.adapters.ts +++ b/src/channels/plugins/types.adapters.ts @@ -73,7 +73,7 @@ export type ChannelOutboundContext = { mediaUrl?: string; gifPlayback?: boolean; replyToId?: string | null; - threadId?: number | null; + threadId?: string | number | null; accountId?: string | null; deps?: OutboundSendDeps; }; diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index db14ab48c..87213f1a0 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -31,6 +31,12 @@ export type ChannelSetupInput = { httpHost?: string; httpPort?: string; useEnv?: boolean; + homeserver?: string; + userId?: string; + accessToken?: string; + password?: string; + deviceName?: string; + initialSyncLimit?: number; }; export type ChannelStatusIssue = { @@ -196,6 +202,7 @@ export type ChannelThreadingContext = { To?: string; ReplyToId?: string; ThreadLabel?: string; + MessageThreadId?: string | number; }; export type ChannelThreadingToolContext = { diff --git a/src/cli/channels-cli.ts b/src/cli/channels-cli.ts index 73fad1bd6..af6e6a366 100644 --- a/src/cli/channels-cli.ts +++ b/src/cli/channels-cli.ts @@ -32,6 +32,12 @@ const optionNamesAdd = [ "httpHost", "httpPort", "useEnv", + "homeserver", + "userId", + "accessToken", + "password", + "deviceName", + "initialSyncLimit", ] as const; const optionNamesRemove = ["channel", "account", "delete"] as const; @@ -115,6 +121,12 @@ export function registerChannelsCli(program: Command) { .option("--http-url ", "Signal HTTP daemon base URL") .option("--http-host ", "Signal HTTP host") .option("--http-port ", "Signal HTTP port") + .option("--homeserver ", "Matrix homeserver URL") + .option("--user-id ", "Matrix user ID") + .option("--access-token ", "Matrix access token") + .option("--password ", "Matrix password") + .option("--device-name ", "Matrix device name") + .option("--initial-sync-limit ", "Matrix initial sync limit") .option("--use-env", "Use env token (default account only)", false) .action(async (opts, command) => { try { diff --git a/src/commands/channels/add-mutators.ts b/src/commands/channels/add-mutators.ts index 2f154835b..1c846d175 100644 --- a/src/commands/channels/add-mutators.ts +++ b/src/commands/channels/add-mutators.ts @@ -36,6 +36,12 @@ export function applyChannelAccountConfig(params: { httpHost?: string; httpPort?: string; useEnv?: boolean; + homeserver?: string; + userId?: string; + accessToken?: string; + password?: string; + deviceName?: string; + initialSyncLimit?: number; }): ClawdbotConfig { const accountId = normalizeAccountId(params.accountId); const plugin = getChannelPlugin(params.channel); @@ -57,6 +63,12 @@ export function applyChannelAccountConfig(params: { httpHost: params.httpHost, httpPort: params.httpPort, useEnv: params.useEnv, + homeserver: params.homeserver, + userId: params.userId, + accessToken: params.accessToken, + password: params.password, + deviceName: params.deviceName, + initialSyncLimit: params.initialSyncLimit, }; return apply({ cfg: params.cfg, accountId, input }); } diff --git a/src/commands/channels/add.ts b/src/commands/channels/add.ts index 43664110e..04e616c3d 100644 --- a/src/commands/channels/add.ts +++ b/src/commands/channels/add.ts @@ -27,6 +27,12 @@ export type ChannelsAddOptions = { httpHost?: string; httpPort?: string; useEnv?: boolean; + homeserver?: string; + userId?: string; + accessToken?: string; + password?: string; + deviceName?: string; + initialSyncLimit?: number | string; }; export async function channelsAddCommand( @@ -88,9 +94,9 @@ export async function channelsAddCommand( } await writeConfigFile(nextConfig); - await prompter.outro("Channels updated."); - return; - } + await prompter.outro("Channels updated."); + return; + } const channel = normalizeChannelId(opts.channel); if (!channel) { @@ -109,6 +115,12 @@ export async function channelsAddCommand( plugin.setup.resolveAccountId?.({ cfg, accountId: opts.account }) ?? normalizeAccountId(opts.account); const useEnv = opts.useEnv === true; + const initialSyncLimit = + typeof opts.initialSyncLimit === "number" + ? opts.initialSyncLimit + : typeof opts.initialSyncLimit === "string" && opts.initialSyncLimit.trim() + ? Number.parseInt(opts.initialSyncLimit, 10) + : undefined; const validationError = plugin.setup.validateInput?.({ cfg, accountId, @@ -127,6 +139,12 @@ export async function channelsAddCommand( httpUrl: opts.httpUrl, httpHost: opts.httpHost, httpPort: opts.httpPort, + homeserver: opts.homeserver, + userId: opts.userId, + accessToken: opts.accessToken, + password: opts.password, + deviceName: opts.deviceName, + initialSyncLimit, useEnv, }, }); @@ -154,6 +172,12 @@ export async function channelsAddCommand( httpUrl: opts.httpUrl, httpHost: opts.httpHost, httpPort: opts.httpPort, + homeserver: opts.homeserver, + userId: opts.userId, + accessToken: opts.accessToken, + password: opts.password, + deviceName: opts.deviceName, + initialSyncLimit, useEnv, }); diff --git a/src/config/sessions/paths.ts b/src/config/sessions/paths.ts index 829568e7d..63d2f6834 100644 --- a/src/config/sessions/paths.ts +++ b/src/config/sessions/paths.ts @@ -36,10 +36,16 @@ export function resolveDefaultSessionStorePath(agentId?: string): string { export function resolveSessionTranscriptPath( sessionId: string, agentId?: string, - topicId?: number, + topicId?: string | number, ): string { + const safeTopicId = + typeof topicId === "string" + ? encodeURIComponent(topicId) + : typeof topicId === "number" + ? String(topicId) + : undefined; const fileName = - topicId !== undefined ? `${sessionId}-topic-${topicId}.jsonl` : `${sessionId}.jsonl`; + safeTopicId !== undefined ? `${sessionId}-topic-${safeTopicId}.jsonl` : `${sessionId}.jsonl`; return path.join(resolveAgentSessionsDir(agentId), fileName); } diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 839b8266a..51f87d7c3 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -18,6 +18,12 @@ import type { OutboundChannel } from "./targets.js"; export type { NormalizedOutboundPayload } from "./payloads.js"; export { normalizeOutboundPayloads } from "./payloads.js"; +type SendMatrixMessage = ( + to: string, + text: string, + opts?: { mediaUrl?: string; replyToId?: string; threadId?: string; timeoutMs?: number }, +) => Promise<{ messageId: string; roomId: string }>; + export type OutboundSendDeps = { sendWhatsApp?: typeof sendMessageWhatsApp; sendTelegram?: typeof sendMessageTelegram; @@ -25,6 +31,7 @@ export type OutboundSendDeps = { sendSlack?: typeof sendMessageSlack; sendSignal?: typeof sendMessageSignal; sendIMessage?: typeof sendMessageIMessage; + sendMatrix?: SendMatrixMessage; sendMSTeams?: ( to: string, text: string, @@ -37,6 +44,7 @@ export type OutboundDeliveryResult = { messageId: string; chatId?: string; channelId?: string; + roomId?: string; conversationId?: string; timestamp?: number; toJid?: string; @@ -67,7 +75,7 @@ async function createChannelHandler(params: { to: string; accountId?: string; replyToId?: string | null; - threadId?: number | null; + threadId?: string | number | null; deps?: OutboundSendDeps; gifPlayback?: boolean; }): Promise { @@ -99,7 +107,7 @@ function createPluginHandler(params: { to: string; accountId?: string; replyToId?: string | null; - threadId?: number | null; + threadId?: string | number | null; deps?: OutboundSendDeps; gifPlayback?: boolean; }): ChannelHandler | null { @@ -144,7 +152,7 @@ export async function deliverOutboundPayloads(params: { accountId?: string; payloads: ReplyPayload[]; replyToId?: string | null; - threadId?: number | null; + threadId?: string | number | null; deps?: OutboundSendDeps; gifPlayback?: boolean; abortSignal?: AbortSignal; diff --git a/src/infra/outbound/format.ts b/src/infra/outbound/format.ts index c4f02a8ac..8e3656902 100644 --- a/src/infra/outbound/format.ts +++ b/src/infra/outbound/format.ts @@ -10,6 +10,7 @@ export type OutboundDeliveryJson = { mediaUrl: string | null; chatId?: string; channelId?: string; + roomId?: string; conversationId?: string; timestamp?: number; toJid?: string; @@ -20,6 +21,7 @@ type OutboundDeliveryMeta = { messageId?: string; chatId?: string; channelId?: string; + roomId?: string; conversationId?: string; timestamp?: number; toJid?: string; @@ -42,6 +44,7 @@ export function formatOutboundDeliverySummary( if ("chatId" in result) return `${base} (chat ${result.chatId})`; if ("channelId" in result) return `${base} (channel ${result.channelId})`; + if ("roomId" in result) return `${base} (room ${result.roomId})`; if ("conversationId" in result) return `${base} (conversation ${result.conversationId})`; return base; } @@ -69,6 +72,9 @@ export function buildOutboundDeliveryJson(params: { if (result && "channelId" in result && result.channelId !== undefined) { payload.channelId = result.channelId; } + if (result && "roomId" in result && result.roomId !== undefined) { + payload.roomId = result.roomId; + } if (result && "conversationId" in result && result.conversationId !== undefined) { payload.conversationId = result.conversationId; }