diff --git a/docs/channels/index.md b/docs/channels/index.md index 80db578dd..b198a8c71 100644 --- a/docs/channels/index.md +++ b/docs/channels/index.md @@ -18,6 +18,7 @@ Text is supported everywhere; media and reactions vary by channel. - [Signal](/channels/signal) — signal-cli; privacy-focused. - [iMessage](/channels/imessage) — macOS only; native integration. - [Microsoft Teams](/channels/msteams) — Bot Framework; enterprise support. +- [Zalo](/channels/zalo) — Zalo Bot API; Vietnam's popular messenger (plugin). - [WebChat](/web/webchat) — Gateway WebChat UI over WebSocket. ## Notes diff --git a/docs/channels/zalo.md b/docs/channels/zalo.md new file mode 100644 index 000000000..38839a117 --- /dev/null +++ b/docs/channels/zalo.md @@ -0,0 +1,159 @@ +--- +summary: "Zalo bot support status, capabilities, and configuration" +read_when: + - Working on Zalo features or webhooks +--- +# Zalo (Bot API) + +Status: experimental. Direct messages only; groups coming soon per Zalo docs. + +## Quick setup (beginner) +1) Install the Zalo plugin: + - From a source checkout: `clawdbot plugins install ./extensions/zalo` + - From npm (if published): `clawdbot plugins install @clawdbot/zalo` + - Or pick **Zalo** in onboarding and confirm the install prompt +2) Set the token: + - Env: `ZALO_BOT_TOKEN=...` + - Or config: `channels.zalo.botToken: "..."`. +3) Restart the gateway (or finish onboarding). +4) DM access is pairing by default; approve the pairing code on first contact. + +Minimal config: +```json5 +{ + channels: { + zalo: { + enabled: true, + botToken: "12345689:abc-xyz", + dmPolicy: "pairing" + } + } +} +``` + +## What it is +- A Zalo Bot API channel owned by the Gateway. +- Deterministic routing: replies go back to Zalo; the model never chooses channels. +- DMs share the agent's main session. +- Groups are not yet supported (Zalo docs state "coming soon"). + +## Setup (fast path) + +### 1) Create a bot token (Zalo Bot Platform) +1) Go to **https://bot.zaloplatforms.com** and sign in. +2) Create a new bot and configure its settings. +3) Copy the bot token (format: `12345689:abc-xyz`). + +### 2) Configure the token (env or config) +Example: + +```json5 +{ + channels: { + zalo: { + enabled: true, + botToken: "12345689:abc-xyz", + dmPolicy: "pairing" + } + } +} +``` + +Env option: `ZALO_BOT_TOKEN=...` (works for the default account only). + +Multi-account support: use `channels.zalo.accounts` with per-account tokens and optional `name`. + +3) Restart the gateway. Zalo starts when a token is resolved (env or config). +4) DM access defaults to pairing. Approve the code when the bot is first contacted. + +## How it works (behavior) +- Inbound messages are normalized into the shared channel envelope with media placeholders. +- Replies always route back to the same Zalo chat. +- Long-polling by default; webhook mode available with `channels.zalo.webhookUrl`. + +## Limits +- Outbound text is chunked to 2000 characters (Zalo API limit). +- Media downloads/uploads are capped by `channels.zalo.mediaMaxMb` (default 5). +- Streaming is blocked by default due to the 2000 char limit making streaming less useful. + +## Access control (DMs) + +### DM access +- Default: `channels.zalo.dmPolicy = "pairing"`. Unknown senders receive a pairing code; messages are ignored until approved (codes expire after 1 hour). +- Approve via: + - `clawdbot pairing list zalo` + - `clawdbot pairing approve zalo ` +- Pairing is the default token exchange. Details: [Pairing](/start/pairing) +- `channels.zalo.allowFrom` accepts numeric user IDs. + +## Long-polling vs webhook +- Default: long-polling (no public URL required). +- Webhook mode: set `channels.zalo.webhookUrl` and `channels.zalo.webhookSecret`. + - The webhook secret must be 8-256 characters. + - Webhook URL must use HTTPS. + - Zalo sends events with `X-Bot-Api-Secret-Token` header for verification. + - Gateway HTTP handles webhook requests at `channels.zalo.webhookPath` (defaults to the webhook URL path). + +**Note:** getUpdates (polling) and webhook are mutually exclusive per Zalo API docs. + +## Supported message types +- **Text messages**: Full support with 2000 character chunking. +- **Image messages**: Download and process inbound images; send images via `sendPhoto`. +- **Stickers**: Logged but not fully processed (no agent response). +- **Unsupported types**: Logged (e.g., messages from protected users). + +## Capabilities +| Feature | Status | +|---------|--------| +| Direct messages | ✅ Supported | +| Groups | ❌ Coming soon (per Zalo docs) | +| Media (images) | ✅ Supported | +| Reactions | ❌ Not supported | +| Threads | ❌ Not supported | +| Polls | ❌ Not supported | +| Native commands | ❌ Not supported | +| Streaming | ⚠️ Blocked (2000 char limit) | + +## Delivery targets (CLI/cron) +- Use a chat id as the target. +- Example: `clawdbot message send --channel zalo --to 123456789 --message "hi"`. + +## Troubleshooting + +**Bot doesn't respond:** +- Check that the token is valid: `clawdbot channels status --probe` +- Verify the sender is approved (pairing or allowFrom) +- Check gateway logs: `clawdbot logs --follow` + +**Webhook not receiving events:** +- Ensure webhook URL uses HTTPS +- Verify secret token is 8-256 characters +- Confirm the gateway HTTP endpoint is reachable on the configured path +- Check that getUpdates polling is not running (they're mutually exclusive) + +## Configuration reference (Zalo) +Full configuration: [Configuration](/gateway/configuration) + +Provider options: +- `channels.zalo.enabled`: enable/disable channel startup. +- `channels.zalo.botToken`: bot token from Zalo Bot Platform. +- `channels.zalo.tokenFile`: read token from file path. +- `channels.zalo.dmPolicy`: `pairing | allowlist | open | disabled` (default: pairing). +- `channels.zalo.allowFrom`: DM allowlist (user IDs). `open` requires `"*"`. +- `channels.zalo.mediaMaxMb`: inbound/outbound media cap (MB, default 5). +- `channels.zalo.webhookUrl`: enable webhook mode (HTTPS required). +- `channels.zalo.webhookSecret`: webhook secret (8-256 chars). +- `channels.zalo.webhookPath`: webhook path on the gateway HTTP server. +- `channels.zalo.proxy`: proxy URL for API requests. + +Multi-account options: +- `channels.zalo.accounts..botToken`: per-account token. +- `channels.zalo.accounts..tokenFile`: per-account token file. +- `channels.zalo.accounts..name`: display name. +- `channels.zalo.accounts..enabled`: enable/disable account. +- `channels.zalo.accounts..dmPolicy`: per-account DM policy. +- `channels.zalo.accounts..allowFrom`: per-account allowlist. +- `channels.zalo.accounts..webhookUrl`: per-account webhook URL. +- `channels.zalo.accounts..webhookSecret`: per-account webhook secret. +- `channels.zalo.accounts..webhookPath`: per-account webhook path. +- `channels.zalo.accounts..proxy`: per-account proxy URL. diff --git a/docs/plugin.md b/docs/plugin.md index e868588ee..78a203d4d 100644 --- a/docs/plugin.md +++ b/docs/plugin.md @@ -37,6 +37,7 @@ Clawdbot plugins are **TypeScript modules** loaded at runtime via jiti. They can register: - Gateway RPC methods +- Gateway HTTP handlers - Agent tools - CLI commands - Background services diff --git a/extensions/zalo/CHANGELOG.md b/extensions/zalo/CHANGELOG.md new file mode 100644 index 000000000..3c2e37730 --- /dev/null +++ b/extensions/zalo/CHANGELOG.md @@ -0,0 +1,10 @@ +# Changelog + +## 0.1.0 + +### Features +- Zalo Bot API channel plugin with token-based auth (env/config/file). +- Direct message support (DMs only) with pairing/allowlist/open/disabled policies. +- Polling and webhook delivery modes. +- Text + image messaging with 2000-char chunking and media size caps. +- Multi-account support with per-account config. diff --git a/extensions/zalo/README.md b/extensions/zalo/README.md new file mode 100644 index 000000000..1d9d3483a --- /dev/null +++ b/extensions/zalo/README.md @@ -0,0 +1,50 @@ +# @clawdbot/zalo + +Zalo channel plugin for Clawdbot (Bot API). + +## Install (local checkout) + +```bash +clawdbot plugins install ./extensions/zalo +``` + +## Install (npm) + +```bash +clawdbot plugins install @clawdbot/zalo +``` + +Onboarding: select Zalo and confirm the install prompt to fetch the plugin automatically. + +## Config + +```json5 +{ + channels: { + zalo: { + enabled: true, + botToken: "12345689:abc-xyz", + dmPolicy: "pairing", + proxy: "http://proxy.local:8080" + } + } +} +``` + +## Webhook mode + +```json5 +{ + channels: { + zalo: { + webhookUrl: "https://example.com/zalo-webhook", + webhookSecret: "your-secret-8-plus-chars", + webhookPath: "/zalo-webhook" + } + } +} +``` + +If `webhookPath` is omitted, the plugin uses the webhook URL path. + +Restart the gateway after config changes. diff --git a/extensions/zalo/index.ts b/extensions/zalo/index.ts new file mode 100644 index 000000000..aa85bead4 --- /dev/null +++ b/extensions/zalo/index.ts @@ -0,0 +1,16 @@ +import type { ClawdbotPluginApi } from "../../src/plugins/types.js"; + +import { zaloDock, zaloPlugin } from "./src/channel.js"; +import { handleZaloWebhookRequest } from "./src/monitor.js"; + +const plugin = { + id: "zalo", + name: "Zalo", + description: "Zalo channel plugin (Bot API)", + register(api: ClawdbotPluginApi) { + api.registerChannel({ plugin: zaloPlugin, dock: zaloDock }); + api.registerHttpHandler(handleZaloWebhookRequest); + }, +}; + +export default plugin; diff --git a/extensions/zalo/package.json b/extensions/zalo/package.json new file mode 100644 index 000000000..fff36d6d4 --- /dev/null +++ b/extensions/zalo/package.json @@ -0,0 +1,12 @@ +{ + "name": "@clawdbot/zalo", + "version": "0.1.0", + "type": "module", + "description": "Clawdbot Zalo channel plugin", + "clawdbot": { + "extensions": ["./index.ts"] + }, + "dependencies": { + "undici": "7.18.2" + } +} diff --git a/extensions/zalo/src/accounts.ts b/extensions/zalo/src/accounts.ts new file mode 100644 index 000000000..c9dc3c069 --- /dev/null +++ b/extensions/zalo/src/accounts.ts @@ -0,0 +1,74 @@ +import type { + CoreConfig, + ResolvedZaloAccount, + ZaloAccountConfig, + ZaloConfig, +} from "./types.js"; +import { resolveZaloToken } from "./token.js"; +import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "./shared/account-ids.js"; + +function listConfiguredAccountIds(cfg: CoreConfig): string[] { + const accounts = (cfg.channels?.zalo as ZaloConfig | undefined)?.accounts; + if (!accounts || typeof accounts !== "object") return []; + return Object.keys(accounts).filter(Boolean); +} + +export function listZaloAccountIds(cfg: CoreConfig): string[] { + const ids = listConfiguredAccountIds(cfg); + if (ids.length === 0) return [DEFAULT_ACCOUNT_ID]; + return ids.sort((a, b) => a.localeCompare(b)); +} + +export function resolveDefaultZaloAccountId(cfg: CoreConfig): string { + const zaloConfig = cfg.channels?.zalo as ZaloConfig | undefined; + if (zaloConfig?.defaultAccount?.trim()) return zaloConfig.defaultAccount.trim(); + const ids = listZaloAccountIds(cfg); + if (ids.includes(DEFAULT_ACCOUNT_ID)) return DEFAULT_ACCOUNT_ID; + return ids[0] ?? DEFAULT_ACCOUNT_ID; +} + +function resolveAccountConfig( + cfg: CoreConfig, + accountId: string, +): ZaloAccountConfig | undefined { + const accounts = (cfg.channels?.zalo as ZaloConfig | undefined)?.accounts; + if (!accounts || typeof accounts !== "object") return undefined; + return accounts[accountId] as ZaloAccountConfig | undefined; +} + +function mergeZaloAccountConfig(cfg: CoreConfig, accountId: string): ZaloAccountConfig { + const raw = (cfg.channels?.zalo ?? {}) as ZaloConfig; + const { accounts: _ignored, defaultAccount: _ignored2, ...base } = raw; + const account = resolveAccountConfig(cfg, accountId) ?? {}; + return { ...base, ...account }; +} + +export function resolveZaloAccount(params: { + cfg: CoreConfig; + accountId?: string | null; +}): ResolvedZaloAccount { + const accountId = normalizeAccountId(params.accountId); + const baseEnabled = (params.cfg.channels?.zalo as ZaloConfig | undefined)?.enabled !== false; + const merged = mergeZaloAccountConfig(params.cfg, accountId); + const accountEnabled = merged.enabled !== false; + const enabled = baseEnabled && accountEnabled; + const tokenResolution = resolveZaloToken( + params.cfg.channels?.zalo as ZaloConfig | undefined, + accountId, + ); + + return { + accountId, + name: merged.name?.trim() || undefined, + enabled, + token: tokenResolution.token, + tokenSource: tokenResolution.source, + config: merged, + }; +} + +export function listEnabledZaloAccounts(cfg: CoreConfig): ResolvedZaloAccount[] { + return listZaloAccountIds(cfg) + .map((accountId) => resolveZaloAccount({ cfg, accountId })) + .filter((account) => account.enabled); +} diff --git a/extensions/zalo/src/actions.ts b/extensions/zalo/src/actions.ts new file mode 100644 index 000000000..aeaece1bc --- /dev/null +++ b/extensions/zalo/src/actions.ts @@ -0,0 +1,59 @@ +import type { ChannelMessageActionAdapter, ChannelMessageActionName } from "../../src/channels/plugins/types.js"; + +import type { CoreConfig } from "./types.js"; +import { listEnabledZaloAccounts } from "./accounts.js"; +import { sendMessageZalo } from "./send.js"; +import { jsonResult, readStringParam } from "./tool-helpers.js"; + +const providerId = "zalo"; + +function listEnabledAccounts(cfg: CoreConfig) { + return listEnabledZaloAccounts(cfg).filter( + (account) => account.enabled && account.tokenSource !== "none", + ); +} + +export const zaloMessageActions: ChannelMessageActionAdapter = { + listActions: ({ cfg }) => { + const accounts = listEnabledAccounts(cfg as CoreConfig); + if (accounts.length === 0) return []; + const actions = new Set(["send"]); + return Array.from(actions); + }, + supportsButtons: () => false, + extractToolSend: ({ args }) => { + const action = typeof args.action === "string" ? args.action.trim() : ""; + if (action !== "sendMessage") return null; + const to = typeof args.to === "string" ? args.to : undefined; + if (!to) return null; + const accountId = typeof args.accountId === "string" ? args.accountId.trim() : undefined; + return { to, accountId }; + }, + handleAction: async ({ action, params, cfg, accountId }) => { + if (action === "send") { + const to = readStringParam(params, "to", { required: true }); + const content = readStringParam(params, "message", { + required: true, + allowEmpty: true, + }); + const mediaUrl = readStringParam(params, "media", { trim: false }); + + const result = await sendMessageZalo(to ?? "", content ?? "", { + accountId: accountId ?? undefined, + mediaUrl: mediaUrl ?? undefined, + cfg: cfg as CoreConfig, + }); + + if (!result.ok) { + return jsonResult({ + ok: false, + error: result.error ?? "Failed to send Zalo message", + }); + } + + return jsonResult({ ok: true, to, messageId: result.messageId }); + } + + throw new Error(`Action ${action} is not supported for provider ${providerId}.`); + }, +}; diff --git a/extensions/zalo/src/api.ts b/extensions/zalo/src/api.ts new file mode 100644 index 000000000..63c04351a --- /dev/null +++ b/extensions/zalo/src/api.ts @@ -0,0 +1,206 @@ +/** + * Zalo Bot API client + * @see https://bot.zaloplatforms.com/docs + */ + +const ZALO_API_BASE = "https://bot-api.zaloplatforms.com"; + +export type ZaloFetch = (input: string, init?: RequestInit) => Promise; + +export type ZaloApiResponse = { + ok: boolean; + result?: T; + error_code?: number; + description?: string; +}; + +export type ZaloBotInfo = { + id: string; + name: string; + avatar?: string; +}; + +export type ZaloMessage = { + message_id: string; + from: { + id: string; + name?: string; + avatar?: string; + }; + chat: { + id: string; + chat_type: "PRIVATE" | "GROUP"; + }; + date: number; + text?: string; + photo?: string; + caption?: string; + sticker?: string; +}; + +export type ZaloUpdate = { + event_name: + | "message.text.received" + | "message.image.received" + | "message.sticker.received" + | "message.unsupported.received"; + message?: ZaloMessage; +}; + +export type ZaloSendMessageParams = { + chat_id: string; + text: string; +}; + +export type ZaloSendPhotoParams = { + chat_id: string; + photo: string; + caption?: string; +}; + +export type ZaloSetWebhookParams = { + url: string; + secret_token: string; +}; + +export type ZaloGetUpdatesParams = { + /** Timeout in seconds (passed as string to API) */ + timeout?: number; +}; + +export class ZaloApiError extends Error { + constructor( + message: string, + public readonly errorCode?: number, + public readonly description?: string, + ) { + super(message); + this.name = "ZaloApiError"; + } + + /** True if this is a long-polling timeout (no updates available) */ + get isPollingTimeout(): boolean { + return this.errorCode === 408; + } +} + +/** + * Call the Zalo Bot API + */ +export async function callZaloApi( + method: string, + token: string, + body?: Record, + options?: { timeoutMs?: number; fetch?: ZaloFetch }, +): Promise> { + const url = `${ZALO_API_BASE}/bot${token}/${method}`; + const controller = new AbortController(); + const timeoutId = options?.timeoutMs + ? setTimeout(() => controller.abort(), options.timeoutMs) + : undefined; + const fetcher = options?.fetch ?? fetch; + + try { + const response = await fetcher(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: body ? JSON.stringify(body) : undefined, + signal: controller.signal, + }); + + const data = (await response.json()) as ZaloApiResponse; + + if (!data.ok) { + throw new ZaloApiError( + data.description ?? `Zalo API error: ${method}`, + data.error_code, + data.description, + ); + } + + return data; + } finally { + if (timeoutId) clearTimeout(timeoutId); + } +} + +/** + * Validate bot token and get bot info + */ +export async function getMe( + token: string, + timeoutMs?: number, + fetcher?: ZaloFetch, +): Promise> { + return callZaloApi("getMe", token, undefined, { timeoutMs, fetch: fetcher }); +} + +/** + * Send a text message + */ +export async function sendMessage( + token: string, + params: ZaloSendMessageParams, + fetcher?: ZaloFetch, +): Promise> { + return callZaloApi("sendMessage", token, params, { fetch: fetcher }); +} + +/** + * Send a photo message + */ +export async function sendPhoto( + token: string, + params: ZaloSendPhotoParams, + fetcher?: ZaloFetch, +): Promise> { + return callZaloApi("sendPhoto", token, params, { fetch: fetcher }); +} + +/** + * Get updates using long polling (dev/testing only) + * Note: Zalo returns a single update per call, not an array like Telegram + */ +export async function getUpdates( + token: string, + params?: ZaloGetUpdatesParams, + fetcher?: ZaloFetch, +): Promise> { + const pollTimeoutSec = params?.timeout ?? 30; + const timeoutMs = (pollTimeoutSec + 5) * 1000; + const body = { timeout: String(pollTimeoutSec) }; + return callZaloApi("getUpdates", token, body, { timeoutMs, fetch: fetcher }); +} + +/** + * Set webhook URL for receiving updates + */ +export async function setWebhook( + token: string, + params: ZaloSetWebhookParams, + fetcher?: ZaloFetch, +): Promise> { + return callZaloApi("setWebhook", token, params, { fetch: fetcher }); +} + +/** + * Delete webhook configuration + */ +export async function deleteWebhook( + token: string, + fetcher?: ZaloFetch, +): Promise> { + return callZaloApi("deleteWebhook", token, undefined, { fetch: fetcher }); +} + +/** + * Get current webhook info + */ +export async function getWebhookInfo( + token: string, + fetcher?: ZaloFetch, +): Promise> { + return callZaloApi("getWebhookInfo", token, undefined, { fetch: fetcher }); +} diff --git a/extensions/zalo/src/channel.ts b/extensions/zalo/src/channel.ts new file mode 100644 index 000000000..d03812653 --- /dev/null +++ b/extensions/zalo/src/channel.ts @@ -0,0 +1,370 @@ +import type { ChannelDock, ChannelPlugin } from "../../src/channels/plugins/types.js"; +import type { ChannelAccountSnapshot } from "../../src/channels/plugins/types.js"; + +import { listZaloAccountIds, resolveDefaultZaloAccountId, resolveZaloAccount, type ResolvedZaloAccount } from "./accounts.js"; +import { zaloMessageActions } from "./actions.js"; +import { + deleteAccountFromConfigSection, + setAccountEnabledInConfigSection, +} from "./shared/channel-config.js"; +import { zaloOnboardingAdapter } from "./onboarding.js"; +import { formatPairingApproveHint, PAIRING_APPROVED_MESSAGE } from "./shared/pairing.js"; +import { resolveZaloProxyFetch } from "./proxy.js"; +import { probeZalo } from "./probe.js"; +import { sendMessageZalo } from "./send.js"; +import { + applyAccountNameToChannelSection, + migrateBaseNameToDefaultAccount, +} from "./shared/channel-setup.js"; +import { collectZaloStatusIssues } from "./status-issues.js"; +import type { CoreConfig } from "./types.js"; +import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "./shared/account-ids.js"; + +const meta = { + id: "zalo", + label: "Zalo", + selectionLabel: "Zalo (Bot API)", + docsPath: "/channels/zalo", + docsLabel: "zalo", + blurb: "Vietnam-focused messaging platform with Bot API.", + aliases: ["zl"], + order: 80, + quickstartAllowFrom: true, +}; + + +function normalizeZaloMessagingTarget(raw: string): string | undefined { + const trimmed = raw?.trim(); + if (!trimmed) return undefined; + return trimmed.replace(/^(zalo|zl):/i, ""); +} + +export const zaloDock: ChannelDock = { + id: "zalo", + capabilities: { + chatTypes: ["direct"], + media: true, + blockStreaming: true, + }, + outbound: { textChunkLimit: 2000 }, + config: { + resolveAllowFrom: ({ cfg, accountId }) => + (resolveZaloAccount({ cfg: cfg as CoreConfig, accountId }).config.allowFrom ?? []).map( + (entry) => String(entry), + ), + formatAllowFrom: ({ allowFrom }) => + allowFrom + .map((entry) => String(entry).trim()) + .filter(Boolean) + .map((entry) => entry.replace(/^(zalo|zl):/i, "")) + .map((entry) => entry.toLowerCase()), + }, + groups: { + resolveRequireMention: () => true, + }, + threading: { + resolveReplyToMode: () => "off", + }, +}; + +export const zaloPlugin: ChannelPlugin = { + id: "zalo", + meta, + onboarding: zaloOnboardingAdapter, + capabilities: { + chatTypes: ["direct"], + media: true, + reactions: false, + threads: false, + polls: false, + nativeCommands: false, + blockStreaming: true, + }, + reload: { configPrefixes: ["channels.zalo"] }, + config: { + listAccountIds: (cfg) => listZaloAccountIds(cfg as CoreConfig), + resolveAccount: (cfg, accountId) => resolveZaloAccount({ cfg: cfg as CoreConfig, accountId }), + defaultAccountId: (cfg) => resolveDefaultZaloAccountId(cfg as CoreConfig), + setAccountEnabled: ({ cfg, accountId, enabled }) => + setAccountEnabledInConfigSection({ + cfg: cfg as CoreConfig, + sectionKey: "zalo", + accountId, + enabled, + allowTopLevel: true, + }), + deleteAccount: ({ cfg, accountId }) => + deleteAccountFromConfigSection({ + cfg: cfg as CoreConfig, + sectionKey: "zalo", + accountId, + clearBaseFields: ["botToken", "tokenFile", "name"], + }), + isConfigured: (account) => Boolean(account.token?.trim()), + describeAccount: (account): ChannelAccountSnapshot => ({ + accountId: account.accountId, + name: account.name, + enabled: account.enabled, + configured: Boolean(account.token?.trim()), + tokenSource: account.tokenSource, + }), + resolveAllowFrom: ({ cfg, accountId }) => + (resolveZaloAccount({ cfg: cfg as CoreConfig, accountId }).config.allowFrom ?? []).map( + (entry) => String(entry), + ), + formatAllowFrom: ({ allowFrom }) => + allowFrom + .map((entry) => String(entry).trim()) + .filter(Boolean) + .map((entry) => entry.replace(/^(zalo|zl):/i, "")) + .map((entry) => entry.toLowerCase()), + }, + security: { + resolveDmPolicy: ({ cfg, accountId, account }) => { + const resolvedAccountId = accountId ?? account.accountId ?? DEFAULT_ACCOUNT_ID; + const useAccountPath = Boolean( + (cfg as CoreConfig).channels?.zalo?.accounts?.[resolvedAccountId], + ); + const basePath = useAccountPath + ? `channels.zalo.accounts.${resolvedAccountId}.` + : "channels.zalo."; + return { + policy: account.config.dmPolicy ?? "pairing", + allowFrom: account.config.allowFrom ?? [], + policyPath: `${basePath}dmPolicy`, + allowFromPath: basePath, + approveHint: formatPairingApproveHint("zalo"), + normalizeEntry: (raw) => raw.replace(/^(zalo|zl):/i, ""), + }; + }, + }, + groups: { + resolveRequireMention: () => true, + }, + threading: { + resolveReplyToMode: () => "off", + }, + actions: zaloMessageActions, + messaging: { + normalizeTarget: normalizeZaloMessagingTarget, + }, + setup: { + resolveAccountId: ({ accountId }) => normalizeAccountId(accountId), + applyAccountName: ({ cfg, accountId, name }) => + applyAccountNameToChannelSection({ + cfg: cfg as CoreConfig, + channelKey: "zalo", + accountId, + name, + }), + validateInput: ({ accountId, input }) => { + if (input.useEnv && accountId !== DEFAULT_ACCOUNT_ID) { + return "ZALO_BOT_TOKEN can only be used for the default account."; + } + if (!input.useEnv && !input.token && !input.tokenFile) { + return "Zalo requires --token or --token-file (or --use-env)."; + } + return null; + }, + applyAccountConfig: ({ cfg, accountId, input }) => { + const namedConfig = applyAccountNameToChannelSection({ + cfg: cfg as CoreConfig, + channelKey: "zalo", + accountId, + name: input.name, + }); + const next = + accountId !== DEFAULT_ACCOUNT_ID + ? migrateBaseNameToDefaultAccount({ + cfg: namedConfig, + channelKey: "zalo", + }) + : namedConfig; + if (accountId === DEFAULT_ACCOUNT_ID) { + return { + ...next, + channels: { + ...next.channels, + zalo: { + ...next.channels?.zalo, + enabled: true, + ...(input.useEnv + ? {} + : input.tokenFile + ? { tokenFile: input.tokenFile } + : input.token + ? { botToken: input.token } + : {}), + }, + }, + } as CoreConfig; + } + return { + ...next, + channels: { + ...next.channels, + zalo: { + ...next.channels?.zalo, + enabled: true, + accounts: { + ...(next.channels?.zalo?.accounts ?? {}), + [accountId]: { + ...(next.channels?.zalo?.accounts?.[accountId] ?? {}), + enabled: true, + ...(input.tokenFile + ? { tokenFile: input.tokenFile } + : input.token + ? { botToken: input.token } + : {}), + }, + }, + }, + }, + } as CoreConfig; + }, + }, + pairing: { + idLabel: "zaloUserId", + normalizeAllowEntry: (entry) => entry.replace(/^(zalo|zl):/i, ""), + notifyApproval: async ({ cfg, id }) => { + const account = resolveZaloAccount({ cfg: cfg as CoreConfig }); + if (!account.token) throw new Error("Zalo token not configured"); + await sendMessageZalo(id, PAIRING_APPROVED_MESSAGE, { token: account.token }); + }, + }, + outbound: { + deliveryMode: "direct", + chunker: (text, limit) => { + if (!text) return []; + if (limit <= 0 || text.length <= limit) return [text]; + const chunks: string[] = []; + let remaining = text; + while (remaining.length > limit) { + const window = remaining.slice(0, limit); + const lastNewline = window.lastIndexOf("\n"); + const lastSpace = window.lastIndexOf(" "); + let breakIdx = lastNewline > 0 ? lastNewline : lastSpace; + if (breakIdx <= 0) breakIdx = limit; + const rawChunk = remaining.slice(0, breakIdx); + const chunk = rawChunk.trimEnd(); + if (chunk.length > 0) chunks.push(chunk); + const brokeOnSeparator = breakIdx < remaining.length && /\s/.test(remaining[breakIdx]); + const nextStart = Math.min(remaining.length, breakIdx + (brokeOnSeparator ? 1 : 0)); + remaining = remaining.slice(nextStart).trimStart(); + } + if (remaining.length) chunks.push(remaining); + return chunks; + }, + textChunkLimit: 2000, + resolveTarget: ({ to }) => { + const trimmed = to?.trim(); + if (!trimmed) { + return { + ok: false, + error: new Error("Delivering to Zalo requires --to "), + }; + } + return { ok: true, to: trimmed }; + }, + sendText: async ({ to, text, accountId, cfg }) => { + const result = await sendMessageZalo(to, text, { + accountId: accountId ?? undefined, + cfg: cfg as CoreConfig, + }); + return { + channel: "zalo", + ok: result.ok, + messageId: result.messageId ?? "", + error: result.error ? new Error(result.error) : undefined, + }; + }, + sendMedia: async ({ to, text, mediaUrl, accountId, cfg }) => { + const result = await sendMessageZalo(to, text, { + accountId: accountId ?? undefined, + mediaUrl, + cfg: cfg as CoreConfig, + }); + return { + channel: "zalo", + ok: result.ok, + messageId: result.messageId ?? "", + error: result.error ? new Error(result.error) : undefined, + }; + }, + }, + status: { + defaultRuntime: { + accountId: DEFAULT_ACCOUNT_ID, + running: false, + lastStartAt: null, + lastStopAt: null, + lastError: null, + }, + collectStatusIssues: collectZaloStatusIssues, + buildChannelSummary: ({ snapshot }) => ({ + configured: snapshot.configured ?? false, + tokenSource: snapshot.tokenSource ?? "none", + running: snapshot.running ?? false, + mode: snapshot.mode ?? null, + lastStartAt: snapshot.lastStartAt ?? null, + lastStopAt: snapshot.lastStopAt ?? null, + lastError: snapshot.lastError ?? null, + probe: snapshot.probe, + lastProbeAt: snapshot.lastProbeAt ?? null, + }), + probeAccount: async ({ account, timeoutMs }) => + probeZalo(account.token, timeoutMs, resolveZaloProxyFetch(account.config.proxy)), + buildAccountSnapshot: ({ account, runtime }) => { + const configured = Boolean(account.token?.trim()); + return { + accountId: account.accountId, + name: account.name, + enabled: account.enabled, + configured, + tokenSource: account.tokenSource, + running: runtime?.running ?? false, + lastStartAt: runtime?.lastStartAt ?? null, + lastStopAt: runtime?.lastStopAt ?? null, + lastError: runtime?.lastError ?? null, + mode: account.config.webhookUrl ? "webhook" : "polling", + lastInboundAt: runtime?.lastInboundAt ?? null, + lastOutboundAt: runtime?.lastOutboundAt ?? null, + dmPolicy: account.config.dmPolicy ?? "pairing", + }; + }, + }, + gateway: { + startAccount: async (ctx) => { + const account = ctx.account; + const token = account.token.trim(); + let zaloBotLabel = ""; + const fetcher = resolveZaloProxyFetch(account.config.proxy); + try { + const probe = await probeZalo(token, 2500, fetcher); + const name = probe.ok ? probe.bot?.name?.trim() : null; + if (name) zaloBotLabel = ` (${name})`; + ctx.setStatus({ + accountId: account.accountId, + bot: probe.bot, + }); + } catch { + // ignore probe errors + } + ctx.log?.info(`[${account.accountId}] starting provider${zaloBotLabel}`); + const { monitorZaloProvider } = await import("./monitor.js"); + return monitorZaloProvider({ + token, + account, + config: ctx.cfg as CoreConfig, + runtime: ctx.runtime, + abortSignal: ctx.abortSignal, + useWebhook: Boolean(account.config.webhookUrl), + webhookUrl: account.config.webhookUrl, + webhookSecret: account.config.webhookSecret, + webhookPath: account.config.webhookPath, + fetcher, + statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }), + }); + }, + }, +}; diff --git a/extensions/zalo/src/core-bridge.ts b/extensions/zalo/src/core-bridge.ts new file mode 100644 index 000000000..46162412b --- /dev/null +++ b/extensions/zalo/src/core-bridge.ts @@ -0,0 +1,171 @@ +import fs from "node:fs"; +import path from "node:path"; +import { fileURLToPath, pathToFileURL } from "node:url"; + +export type CoreChannelDeps = { + chunkMarkdownText: (text: string, limit: number) => string[]; + formatAgentEnvelope: (params: { + channel: string; + from: string; + timestamp?: number; + body: string; + }) => string; + dispatchReplyWithBufferedBlockDispatcher: (params: { + ctx: unknown; + cfg: unknown; + dispatcherOptions: { + deliver: (payload: unknown) => Promise; + onError?: (err: unknown, info: { kind: string }) => void; + }; + }) => Promise; + resolveAgentRoute: (params: { + cfg: unknown; + channel: string; + accountId: string; + peer: { kind: "dm" | "group" | "channel"; id: string }; + }) => { sessionKey: string; accountId: string }; + buildPairingReply: (params: { channel: string; idLine: string; code: string }) => string; + readChannelAllowFromStore: (channel: string) => Promise; + upsertChannelPairingRequest: (params: { + channel: string; + id: string; + meta?: { name?: string }; + }) => Promise<{ code: string; created: boolean }>; + fetchRemoteMedia: (params: { url: string }) => Promise<{ buffer: Buffer; contentType?: string }>; + saveMediaBuffer: ( + buffer: Buffer, + contentType: string | undefined, + type: "inbound" | "outbound", + maxBytes: number, + ) => Promise<{ path: string; contentType: string }>; + shouldLogVerbose: () => boolean; +}; + +let coreRootCache: string | null = null; +let coreDepsPromise: Promise | null = null; + +function findPackageRoot(startDir: string, name: string): string | null { + let dir = startDir; + for (;;) { + const pkgPath = path.join(dir, "package.json"); + try { + if (fs.existsSync(pkgPath)) { + const raw = fs.readFileSync(pkgPath, "utf8"); + const pkg = JSON.parse(raw) as { name?: string }; + if (pkg.name === name) return dir; + } + } catch { + // ignore parse errors + } + const parent = path.dirname(dir); + if (parent === dir) return null; + dir = parent; + } +} + +function resolveClawdbotRoot(): string { + if (coreRootCache) return coreRootCache; + const override = process.env.CLAWDBOT_ROOT?.trim(); + if (override) { + coreRootCache = override; + return override; + } + + const candidates = new Set(); + if (process.argv[1]) { + candidates.add(path.dirname(process.argv[1])); + } + candidates.add(process.cwd()); + try { + const urlPath = fileURLToPath(import.meta.url); + candidates.add(path.dirname(urlPath)); + } catch { + // ignore + } + + for (const start of candidates) { + const found = findPackageRoot(start, "clawdbot"); + if (found) { + coreRootCache = found; + return found; + } + } + + throw new Error( + "Unable to resolve Clawdbot root. Set CLAWDBOT_ROOT to the package root.", + ); +} + +async function importCoreModule(relativePath: string): Promise { + const root = resolveClawdbotRoot(); + const distPath = path.join(root, "dist", relativePath); + if (!fs.existsSync(distPath)) { + throw new Error( + `Missing core module at ${distPath}. Run \`pnpm build\` or install the official package.`, + ); + } + return (await import(pathToFileURL(distPath).href)) as T; +} + +export async function loadCoreChannelDeps(): Promise { + if (coreDepsPromise) return coreDepsPromise; + + coreDepsPromise = (async () => { + const [ + chunk, + envelope, + dispatcher, + routing, + pairingMessages, + pairingStore, + mediaFetch, + mediaStore, + globals, + ] = await Promise.all([ + importCoreModule<{ chunkMarkdownText: CoreChannelDeps["chunkMarkdownText"] }>( + "auto-reply/chunk.js", + ), + importCoreModule<{ formatAgentEnvelope: CoreChannelDeps["formatAgentEnvelope"] }>( + "auto-reply/envelope.js", + ), + importCoreModule<{ + dispatchReplyWithBufferedBlockDispatcher: CoreChannelDeps["dispatchReplyWithBufferedBlockDispatcher"]; + }>("auto-reply/reply/provider-dispatcher.js"), + importCoreModule<{ resolveAgentRoute: CoreChannelDeps["resolveAgentRoute"] }>( + "routing/resolve-route.js", + ), + importCoreModule<{ buildPairingReply: CoreChannelDeps["buildPairingReply"] }>( + "pairing/pairing-messages.js", + ), + importCoreModule<{ + readChannelAllowFromStore: CoreChannelDeps["readChannelAllowFromStore"]; + upsertChannelPairingRequest: CoreChannelDeps["upsertChannelPairingRequest"]; + }>("pairing/pairing-store.js"), + importCoreModule<{ fetchRemoteMedia: CoreChannelDeps["fetchRemoteMedia"] }>( + "media/fetch.js", + ), + importCoreModule<{ saveMediaBuffer: CoreChannelDeps["saveMediaBuffer"] }>( + "media/store.js", + ), + importCoreModule<{ shouldLogVerbose: CoreChannelDeps["shouldLogVerbose"] }>( + "globals.js", + ), + ]); + + return { + chunkMarkdownText: chunk.chunkMarkdownText, + formatAgentEnvelope: envelope.formatAgentEnvelope, + dispatchReplyWithBufferedBlockDispatcher: + dispatcher.dispatchReplyWithBufferedBlockDispatcher, + resolveAgentRoute: routing.resolveAgentRoute, + buildPairingReply: pairingMessages.buildPairingReply, + readChannelAllowFromStore: pairingStore.readChannelAllowFromStore, + upsertChannelPairingRequest: pairingStore.upsertChannelPairingRequest, + fetchRemoteMedia: mediaFetch.fetchRemoteMedia, + saveMediaBuffer: mediaStore.saveMediaBuffer, + shouldLogVerbose: globals.shouldLogVerbose, + }; + })(); + + return coreDepsPromise; +} diff --git a/extensions/zalo/src/monitor.ts b/extensions/zalo/src/monitor.ts new file mode 100644 index 000000000..6770cf53b --- /dev/null +++ b/extensions/zalo/src/monitor.ts @@ -0,0 +1,679 @@ +import type { IncomingMessage, ServerResponse } from "node:http"; + +import type { ResolvedZaloAccount } from "./accounts.js"; +import { + ZaloApiError, + deleteWebhook, + getUpdates, + sendMessage, + sendPhoto, + setWebhook, + type ZaloFetch, + type ZaloMessage, + type ZaloUpdate, +} from "./api.js"; +import { loadCoreChannelDeps } from "./core-bridge.js"; +import { resolveZaloProxyFetch } from "./proxy.js"; +import type { CoreConfig } from "./types.js"; + +export type ZaloRuntimeEnv = { + log?: (message: string) => void; + error?: (message: string) => void; +}; + +export type ZaloMonitorOptions = { + token: string; + account: ResolvedZaloAccount; + config: CoreConfig; + runtime: ZaloRuntimeEnv; + abortSignal: AbortSignal; + useWebhook?: boolean; + webhookUrl?: string; + webhookSecret?: string; + webhookPath?: string; + fetcher?: ZaloFetch; + statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; +}; + +export type ZaloMonitorResult = { + stop: () => void; +}; + +const ZALO_TEXT_LIMIT = 2000; +const DEFAULT_MEDIA_MAX_MB = 5; + +function logVerbose(deps: Awaited>, message: string): void { + if (deps.shouldLogVerbose()) { + console.log(`[zalo] ${message}`); + } +} + +function isSenderAllowed(senderId: string, allowFrom: string[]): boolean { + if (allowFrom.includes("*")) return true; + const normalizedSenderId = senderId.toLowerCase(); + return allowFrom.some((entry) => { + const normalized = entry.toLowerCase().replace(/^(zalo|zl):/i, ""); + return normalized === normalizedSenderId; + }); +} + +async function readJsonBody(req: IncomingMessage, maxBytes: number) { + const chunks: Buffer[] = []; + let total = 0; + return await new Promise<{ ok: boolean; value?: unknown; error?: string }>((resolve) => { + req.on("data", (chunk: Buffer) => { + total += chunk.length; + if (total > maxBytes) { + resolve({ ok: false, error: "payload too large" }); + req.destroy(); + return; + } + chunks.push(chunk); + }); + req.on("end", () => { + try { + const raw = Buffer.concat(chunks).toString("utf8"); + if (!raw.trim()) { + resolve({ ok: false, error: "empty payload" }); + return; + } + resolve({ ok: true, value: JSON.parse(raw) as unknown }); + } catch (err) { + resolve({ ok: false, error: err instanceof Error ? err.message : String(err) }); + } + }); + req.on("error", (err) => { + resolve({ ok: false, error: err instanceof Error ? err.message : String(err) }); + }); + }); +} + +type WebhookTarget = { + token: string; + account: ResolvedZaloAccount; + config: CoreConfig; + runtime: ZaloRuntimeEnv; + deps: Awaited>; + secret: string; + path: string; + mediaMaxMb: number; + statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; + fetcher?: ZaloFetch; +}; + +const webhookTargets = new Map(); + +function normalizeWebhookPath(raw: string): string { + const trimmed = raw.trim(); + if (!trimmed) return "/"; + const withSlash = trimmed.startsWith("/") ? trimmed : `/${trimmed}`; + if (withSlash.length > 1 && withSlash.endsWith("/")) { + return withSlash.slice(0, -1); + } + return withSlash; +} + +function resolveWebhookPath(webhookPath?: string, webhookUrl?: string): string | null { + const trimmedPath = webhookPath?.trim(); + if (trimmedPath) return normalizeWebhookPath(trimmedPath); + if (webhookUrl?.trim()) { + try { + const parsed = new URL(webhookUrl); + return normalizeWebhookPath(parsed.pathname || "/"); + } catch { + return null; + } + } + return null; +} + +export function registerZaloWebhookTarget(target: WebhookTarget): () => void { + const key = normalizeWebhookPath(target.path); + const normalizedTarget = { ...target, path: key }; + const existing = webhookTargets.get(key) ?? []; + const next = [...existing, normalizedTarget]; + webhookTargets.set(key, next); + return () => { + const updated = (webhookTargets.get(key) ?? []).filter( + (entry) => entry !== normalizedTarget, + ); + if (updated.length > 0) { + webhookTargets.set(key, updated); + } else { + webhookTargets.delete(key); + } + }; +} + +export async function handleZaloWebhookRequest( + req: IncomingMessage, + res: ServerResponse, +): Promise { + const url = new URL(req.url ?? "/", "http://localhost"); + const path = normalizeWebhookPath(url.pathname); + const targets = webhookTargets.get(path); + if (!targets || targets.length === 0) return false; + + if (req.method !== "POST") { + res.statusCode = 405; + res.setHeader("Allow", "POST"); + res.end("Method Not Allowed"); + return true; + } + + const headerToken = String(req.headers["x-bot-api-secret-token"] ?? ""); + const target = targets.find((entry) => entry.secret === headerToken); + if (!target) { + res.statusCode = 401; + res.end("unauthorized"); + return true; + } + + const body = await readJsonBody(req, 1024 * 1024); + if (!body.ok) { + res.statusCode = body.error === "payload too large" ? 413 : 400; + res.end(body.error ?? "invalid payload"); + return true; + } + + const payload = body.value as { ok?: boolean; result?: ZaloUpdate }; + if (!payload?.ok || !payload.result) { + res.statusCode = 400; + res.end("invalid payload"); + return true; + } + + target.statusSink?.({ lastInboundAt: Date.now() }); + processUpdate( + payload.result, + target.token, + target.account, + target.config, + target.runtime, + target.deps, + target.mediaMaxMb, + target.statusSink, + target.fetcher, + ).catch((err) => { + target.runtime.error?.(`[${target.account.accountId}] Zalo webhook failed: ${String(err)}`); + }); + + res.statusCode = 200; + res.end("ok"); + return true; +} + +function startPollingLoop(params: { + token: string; + account: ResolvedZaloAccount; + config: CoreConfig; + runtime: ZaloRuntimeEnv; + deps: Awaited>; + abortSignal: AbortSignal; + isStopped: () => boolean; + mediaMaxMb: number; + statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; + fetcher?: ZaloFetch; +}) { + const { + token, + account, + config, + runtime, + deps, + abortSignal, + isStopped, + mediaMaxMb, + statusSink, + fetcher, + } = params; + const pollTimeout = 30; + + const poll = async () => { + if (isStopped() || abortSignal.aborted) return; + + try { + const response = await getUpdates(token, { timeout: pollTimeout }, fetcher); + if (response.ok && response.result) { + statusSink?.({ lastInboundAt: Date.now() }); + await processUpdate( + response.result, + token, + account, + config, + runtime, + deps, + mediaMaxMb, + statusSink, + fetcher, + ); + } + } catch (err) { + if (err instanceof ZaloApiError && err.isPollingTimeout) { + // no updates + } else if (!isStopped() && !abortSignal.aborted) { + console.error(`[${account.accountId}] Zalo polling error:`, err); + await new Promise((resolve) => setTimeout(resolve, 5000)); + } + } + + if (!isStopped() && !abortSignal.aborted) { + setImmediate(poll); + } + }; + + void poll(); +} + +async function processUpdate( + update: ZaloUpdate, + token: string, + account: ResolvedZaloAccount, + config: CoreConfig, + runtime: ZaloRuntimeEnv, + deps: Awaited>, + mediaMaxMb: number, + statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void, + fetcher?: ZaloFetch, +): Promise { + const { event_name, message } = update; + if (!message) return; + + switch (event_name) { + case "message.text.received": + await handleTextMessage( + message, + token, + account, + config, + runtime, + deps, + statusSink, + fetcher, + ); + break; + case "message.image.received": + await handleImageMessage( + message, + token, + account, + config, + runtime, + deps, + mediaMaxMb, + statusSink, + fetcher, + ); + break; + case "message.sticker.received": + console.log(`[${account.accountId}] Received sticker from ${message.from.id}`); + break; + case "message.unsupported.received": + console.log( + `[${account.accountId}] Received unsupported message type from ${message.from.id}`, + ); + break; + } +} + +async function handleTextMessage( + message: ZaloMessage, + token: string, + account: ResolvedZaloAccount, + config: CoreConfig, + runtime: ZaloRuntimeEnv, + deps: Awaited>, + statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void, + fetcher?: ZaloFetch, +): Promise { + const { text } = message; + if (!text?.trim()) return; + + await processMessageWithPipeline({ + message, + token, + account, + config, + runtime, + deps, + text, + mediaPath: undefined, + mediaType: undefined, + statusSink, + fetcher, + }); +} + +async function handleImageMessage( + message: ZaloMessage, + token: string, + account: ResolvedZaloAccount, + config: CoreConfig, + runtime: ZaloRuntimeEnv, + deps: Awaited>, + mediaMaxMb: number, + statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void, + fetcher?: ZaloFetch, +): Promise { + const { photo, caption } = message; + + let mediaPath: string | undefined; + let mediaType: string | undefined; + + if (photo) { + try { + const maxBytes = mediaMaxMb * 1024 * 1024; + const fetched = await deps.fetchRemoteMedia({ url: photo }); + const saved = await deps.saveMediaBuffer( + fetched.buffer, + fetched.contentType, + "inbound", + maxBytes, + ); + mediaPath = saved.path; + mediaType = saved.contentType; + } catch (err) { + console.error(`[${account.accountId}] Failed to download Zalo image:`, err); + } + } + + await processMessageWithPipeline({ + message, + token, + account, + config, + runtime, + deps, + text: caption, + mediaPath, + mediaType, + statusSink, + fetcher, + }); +} + +async function processMessageWithPipeline(params: { + message: ZaloMessage; + token: string; + account: ResolvedZaloAccount; + config: CoreConfig; + runtime: ZaloRuntimeEnv; + deps: Awaited>; + text?: string; + mediaPath?: string; + mediaType?: string; + statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; + fetcher?: ZaloFetch; +}): Promise { + const { + message, + token, + account, + config, + runtime, + deps, + text, + mediaPath, + mediaType, + statusSink, + fetcher, + } = params; + const { from, chat, message_id, date } = message; + + const isGroup = chat.chat_type === "GROUP"; + const chatId = chat.id; + const senderId = from.id; + const senderName = from.name; + + const dmPolicy = account.config.dmPolicy ?? "pairing"; + const configAllowFrom = (account.config.allowFrom ?? []).map((v) => String(v)); + + if (!isGroup) { + if (dmPolicy === "disabled") { + logVerbose(deps, `Blocked zalo DM from ${senderId} (dmPolicy=disabled)`); + return; + } + + if (dmPolicy !== "open") { + const storeAllowFrom = await deps.readChannelAllowFromStore("zalo").catch(() => []); + const effectiveAllowFrom = [...configAllowFrom, ...storeAllowFrom]; + const allowed = isSenderAllowed(senderId, effectiveAllowFrom); + + if (!allowed) { + if (dmPolicy === "pairing") { + const { code, created } = await deps.upsertChannelPairingRequest({ + channel: "zalo", + id: senderId, + meta: { name: senderName ?? undefined }, + }); + + if (created) { + logVerbose(deps, `zalo pairing request sender=${senderId}`); + try { + await sendMessage( + token, + { + chat_id: chatId, + text: deps.buildPairingReply({ + channel: "zalo", + idLine: `Your Zalo user id: ${senderId}`, + code, + }), + }, + fetcher, + ); + statusSink?.({ lastOutboundAt: Date.now() }); + } catch (err) { + logVerbose(deps, `zalo pairing reply failed for ${senderId}: ${String(err)}`); + } + } + } else { + logVerbose(deps, `Blocked unauthorized zalo sender ${senderId} (dmPolicy=${dmPolicy})`); + } + return; + } + } + } + + const route = deps.resolveAgentRoute({ + cfg: config, + channel: "zalo", + accountId: account.accountId, + peer: { + kind: isGroup ? "group" : "dm", + id: chatId, + }, + }); + + const rawBody = text?.trim() || (mediaPath ? "" : ""); + const fromLabel = isGroup + ? `group:${chatId} from ${senderName || senderId}` + : senderName || `user:${senderId}`; + const body = deps.formatAgentEnvelope({ + channel: "Zalo", + from: fromLabel, + timestamp: date ? date * 1000 : undefined, + body: rawBody, + }); + + const ctxPayload = { + Body: body, + RawBody: rawBody, + CommandBody: rawBody, + From: isGroup ? `group:${chatId}` : `zalo:${senderId}`, + To: `zalo:${chatId}`, + SessionKey: route.sessionKey, + AccountId: route.accountId, + ChatType: isGroup ? "group" : "direct", + SenderName: senderName || undefined, + SenderId: senderId, + Provider: "zalo", + Surface: "zalo", + MessageSid: message_id, + MediaPath: mediaPath, + MediaType: mediaType, + MediaUrl: mediaPath, + OriginatingChannel: "zalo", + OriginatingTo: `zalo:${chatId}`, + }; + + await deps.dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg: config, + dispatcherOptions: { + deliver: async (payload) => { + await deliverZaloReply({ + payload, + token, + chatId, + runtime, + deps, + statusSink, + fetcher, + }); + }, + onError: (err, info) => { + runtime.error?.(`[${account.accountId}] Zalo ${info.kind} reply failed: ${String(err)}`); + }, + }, + }); +} + +async function deliverZaloReply(params: { + payload: { text?: string; mediaUrls?: string[]; mediaUrl?: string }; + token: string; + chatId: string; + runtime: ZaloRuntimeEnv; + deps: Awaited>; + statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; + fetcher?: ZaloFetch; +}): Promise { + const { payload, token, chatId, runtime, deps, statusSink, fetcher } = params; + + const mediaList = payload.mediaUrls?.length + ? payload.mediaUrls + : payload.mediaUrl + ? [payload.mediaUrl] + : []; + + if (mediaList.length > 0) { + let first = true; + for (const mediaUrl of mediaList) { + const caption = first ? payload.text : undefined; + first = false; + try { + await sendPhoto(token, { chat_id: chatId, photo: mediaUrl, caption }, fetcher); + statusSink?.({ lastOutboundAt: Date.now() }); + } catch (err) { + runtime.error?.(`Zalo photo send failed: ${String(err)}`); + } + } + return; + } + + if (payload.text) { + const chunks = deps.chunkMarkdownText(payload.text, ZALO_TEXT_LIMIT); + for (const chunk of chunks) { + try { + await sendMessage(token, { chat_id: chatId, text: chunk }, fetcher); + statusSink?.({ lastOutboundAt: Date.now() }); + } catch (err) { + runtime.error?.(`Zalo message send failed: ${String(err)}`); + } + } + } +} + +export async function monitorZaloProvider( + options: ZaloMonitorOptions, +): Promise { + const { + token, + account, + config, + runtime, + abortSignal, + useWebhook, + webhookUrl, + webhookSecret, + webhookPath, + statusSink, + fetcher: fetcherOverride, + } = options; + + const deps = await loadCoreChannelDeps(); + const effectiveMediaMaxMb = account.config.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB; + const fetcher = fetcherOverride ?? resolveZaloProxyFetch(account.config.proxy); + + let stopped = false; + const stopHandlers: Array<() => void> = []; + + const stop = () => { + stopped = true; + for (const handler of stopHandlers) { + handler(); + } + }; + + if (useWebhook) { + if (!webhookUrl || !webhookSecret) { + throw new Error("Zalo webhookUrl and webhookSecret are required for webhook mode"); + } + if (!webhookUrl.startsWith("https://")) { + throw new Error("Zalo webhook URL must use HTTPS"); + } + if (webhookSecret.length < 8 || webhookSecret.length > 256) { + throw new Error("Zalo webhook secret must be 8-256 characters"); + } + + const path = resolveWebhookPath(webhookPath, webhookUrl); + if (!path) { + throw new Error("Zalo webhookPath could not be derived"); + } + + await setWebhook(token, { url: webhookUrl, secret_token: webhookSecret }, fetcher); + + const unregister = registerZaloWebhookTarget({ + token, + account, + config, + runtime, + deps, + path, + secret: webhookSecret, + statusSink: (patch) => statusSink?.(patch), + mediaMaxMb: effectiveMediaMaxMb, + fetcher, + }); + stopHandlers.push(unregister); + abortSignal.addEventListener( + "abort", + () => { + void deleteWebhook(token, fetcher).catch(() => {}); + }, + { once: true }, + ); + return { stop }; + } + + try { + await deleteWebhook(token, fetcher); + } catch { + // ignore + } + + startPollingLoop({ + token, + account, + config, + runtime, + deps, + abortSignal, + isStopped: () => stopped, + mediaMaxMb: effectiveMediaMaxMb, + statusSink, + fetcher, + }); + + return { stop }; +} diff --git a/extensions/zalo/src/onboarding.ts b/extensions/zalo/src/onboarding.ts new file mode 100644 index 000000000..1f60a841b --- /dev/null +++ b/extensions/zalo/src/onboarding.ts @@ -0,0 +1,384 @@ +import type { ChannelOnboardingAdapter, ChannelOnboardingDmPolicy } from "../../src/channels/plugins/onboarding-types.js"; +import type { WizardPrompter } from "../../src/wizard/prompts.js"; + +import { addWildcardAllowFrom, promptAccountId } from "./shared/onboarding.js"; +import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "./shared/account-ids.js"; +import { + listZaloAccountIds, + resolveDefaultZaloAccountId, + resolveZaloAccount, +} from "./accounts.js"; +import type { CoreConfig } from "./types.js"; + +const channel = "zalo" as const; + +type UpdateMode = "polling" | "webhook"; + +function setZaloDmPolicy(cfg: CoreConfig, dmPolicy: "pairing" | "allowlist" | "open" | "disabled") { + const allowFrom = dmPolicy === "open" ? addWildcardAllowFrom(cfg.channels?.zalo?.allowFrom) : undefined; + return { + ...cfg, + channels: { + ...cfg.channels, + zalo: { + ...cfg.channels?.zalo, + dmPolicy, + ...(allowFrom ? { allowFrom } : {}), + }, + }, + } as CoreConfig; +} + +function setZaloUpdateMode( + cfg: CoreConfig, + accountId: string, + mode: UpdateMode, + webhookUrl?: string, + webhookSecret?: string, + webhookPath?: string, +): CoreConfig { + const isDefault = accountId === DEFAULT_ACCOUNT_ID; + if (mode === "polling") { + if (isDefault) { + const { + webhookUrl: _url, + webhookSecret: _secret, + webhookPath: _path, + ...rest + } = cfg.channels?.zalo ?? {}; + return { + ...cfg, + channels: { + ...cfg.channels, + zalo: rest, + }, + } as CoreConfig; + } + const accounts = { ...(cfg.channels?.zalo?.accounts ?? {}) } as Record< + string, + Record + >; + const existing = accounts[accountId] ?? {}; + const { + webhookUrl: _url, + webhookSecret: _secret, + webhookPath: _path, + ...rest + } = existing; + accounts[accountId] = rest; + return { + ...cfg, + channels: { + ...cfg.channels, + zalo: { + ...cfg.channels?.zalo, + accounts, + }, + }, + } as CoreConfig; + } + + if (isDefault) { + return { + ...cfg, + channels: { + ...cfg.channels, + zalo: { + ...cfg.channels?.zalo, + webhookUrl, + webhookSecret, + webhookPath, + }, + }, + } as CoreConfig; + } + + const accounts = { ...(cfg.channels?.zalo?.accounts ?? {}) } as Record< + string, + Record + >; + accounts[accountId] = { + ...(accounts[accountId] ?? {}), + webhookUrl, + webhookSecret, + webhookPath, + }; + return { + ...cfg, + channels: { + ...cfg.channels, + zalo: { + ...cfg.channels?.zalo, + accounts, + }, + }, + } as CoreConfig; +} + +async function noteZaloTokenHelp(prompter: WizardPrompter): Promise { + await prompter.note( + [ + "1) Open Zalo Bot Platform: https://bot.zaloplatforms.com", + "2) Create a bot and get the token", + "3) Token looks like 12345689:abc-xyz", + "Tip: you can also set ZALO_BOT_TOKEN in your env.", + "Docs: https://docs.clawd.bot/channels/zalo", + ].join("\n"), + "Zalo bot token", + ); +} + +async function promptZaloAllowFrom(params: { + cfg: CoreConfig; + prompter: WizardPrompter; + accountId: string; +}): Promise { + const { cfg, prompter, accountId } = params; + const resolved = resolveZaloAccount({ cfg, accountId }); + const existingAllowFrom = resolved.config.allowFrom ?? []; + const entry = await prompter.text({ + message: "Zalo allowFrom (user id)", + placeholder: "123456789", + initialValue: existingAllowFrom[0] ? String(existingAllowFrom[0]) : undefined, + validate: (value) => { + const raw = String(value ?? "").trim(); + if (!raw) return "Required"; + if (!/^\d+$/.test(raw)) return "Use a numeric Zalo user id"; + return undefined; + }, + }); + const normalized = String(entry).trim(); + const merged = [ + ...existingAllowFrom.map((item) => String(item).trim()).filter(Boolean), + normalized, + ]; + const unique = [...new Set(merged)]; + + if (accountId === DEFAULT_ACCOUNT_ID) { + return { + ...cfg, + channels: { + ...cfg.channels, + zalo: { + ...cfg.channels?.zalo, + enabled: true, + dmPolicy: "allowlist", + allowFrom: unique, + }, + }, + } as CoreConfig; + } + + return { + ...cfg, + channels: { + ...cfg.channels, + zalo: { + ...cfg.channels?.zalo, + enabled: true, + accounts: { + ...(cfg.channels?.zalo?.accounts ?? {}), + [accountId]: { + ...(cfg.channels?.zalo?.accounts?.[accountId] ?? {}), + enabled: cfg.channels?.zalo?.accounts?.[accountId]?.enabled ?? true, + dmPolicy: "allowlist", + allowFrom: unique, + }, + }, + }, + }, + } as CoreConfig; +} + +const dmPolicy: ChannelOnboardingDmPolicy = { + label: "Zalo", + channel, + policyKey: "channels.zalo.dmPolicy", + allowFromKey: "channels.zalo.allowFrom", + getCurrent: (cfg) => (cfg.channels?.zalo?.dmPolicy ?? "pairing") as "pairing", + setPolicy: (cfg, policy) => setZaloDmPolicy(cfg as CoreConfig, policy), +}; + +export const zaloOnboardingAdapter: ChannelOnboardingAdapter = { + channel, + dmPolicy, + getStatus: async ({ cfg }) => { + const configured = listZaloAccountIds(cfg as CoreConfig).some((accountId) => + Boolean(resolveZaloAccount({ cfg: cfg as CoreConfig, accountId }).token), + ); + return { + channel, + configured, + statusLines: [`Zalo: ${configured ? "configured" : "needs token"}`], + selectionHint: configured ? "recommended · configured" : "recommended · newcomer-friendly", + quickstartScore: configured ? 1 : 10, + }; + }, + configure: async ({ cfg, prompter, accountOverrides, shouldPromptAccountIds, forceAllowFrom }) => { + const zaloOverride = accountOverrides.zalo?.trim(); + const defaultZaloAccountId = resolveDefaultZaloAccountId(cfg as CoreConfig); + let zaloAccountId = zaloOverride + ? normalizeAccountId(zaloOverride) + : defaultZaloAccountId; + if (shouldPromptAccountIds && !zaloOverride) { + zaloAccountId = await promptAccountId({ + cfg: cfg as CoreConfig, + prompter, + label: "Zalo", + currentId: zaloAccountId, + listAccountIds: listZaloAccountIds, + defaultAccountId: defaultZaloAccountId, + }); + } + + let next = cfg as CoreConfig; + const resolvedAccount = resolveZaloAccount({ cfg: next, accountId: zaloAccountId }); + const accountConfigured = Boolean(resolvedAccount.token); + const allowEnv = zaloAccountId === DEFAULT_ACCOUNT_ID; + const canUseEnv = allowEnv && Boolean(process.env.ZALO_BOT_TOKEN?.trim()); + const hasConfigToken = Boolean( + resolvedAccount.config.botToken || resolvedAccount.config.tokenFile, + ); + + let token: string | null = null; + if (!accountConfigured) { + await noteZaloTokenHelp(prompter); + } + if (canUseEnv && !resolvedAccount.config.botToken) { + const keepEnv = await prompter.confirm({ + message: "ZALO_BOT_TOKEN detected. Use env var?", + initialValue: true, + }); + if (keepEnv) { + next = { + ...next, + channels: { + ...next.channels, + zalo: { + ...next.channels?.zalo, + enabled: true, + }, + }, + } as CoreConfig; + } else { + token = String( + await prompter.text({ + message: "Enter Zalo bot token", + validate: (value) => (value?.trim() ? undefined : "Required"), + }), + ).trim(); + } + } else if (hasConfigToken) { + const keep = await prompter.confirm({ + message: "Zalo token already configured. Keep it?", + initialValue: true, + }); + if (!keep) { + token = String( + await prompter.text({ + message: "Enter Zalo bot token", + validate: (value) => (value?.trim() ? undefined : "Required"), + }), + ).trim(); + } + } else { + token = String( + await prompter.text({ + message: "Enter Zalo bot token", + validate: (value) => (value?.trim() ? undefined : "Required"), + }), + ).trim(); + } + + if (token) { + if (zaloAccountId === DEFAULT_ACCOUNT_ID) { + next = { + ...next, + channels: { + ...next.channels, + zalo: { + ...next.channels?.zalo, + enabled: true, + botToken: token, + }, + }, + } as CoreConfig; + } else { + next = { + ...next, + channels: { + ...next.channels, + zalo: { + ...next.channels?.zalo, + enabled: true, + accounts: { + ...(next.channels?.zalo?.accounts ?? {}), + [zaloAccountId]: { + ...(next.channels?.zalo?.accounts?.[zaloAccountId] ?? {}), + enabled: true, + botToken: token, + }, + }, + }, + }, + } as CoreConfig; + } + } + + const wantsWebhook = await prompter.confirm({ + message: "Use webhook mode for Zalo?", + initialValue: false, + }); + if (wantsWebhook) { + const webhookUrl = String( + await prompter.text({ + message: "Webhook URL (https://...) ", + validate: (value) => (value?.trim()?.startsWith("https://") ? undefined : "HTTPS URL required"), + }), + ).trim(); + const defaultPath = (() => { + try { + return new URL(webhookUrl).pathname || "/zalo-webhook"; + } catch { + return "/zalo-webhook"; + } + })(); + const webhookSecret = String( + await prompter.text({ + message: "Webhook secret (8-256 chars)", + validate: (value) => { + const raw = String(value ?? ""); + if (raw.length < 8 || raw.length > 256) return "8-256 chars"; + return undefined; + }, + }), + ).trim(); + const webhookPath = String( + await prompter.text({ + message: "Webhook path (optional)", + initialValue: defaultPath, + }), + ).trim(); + next = setZaloUpdateMode( + next, + zaloAccountId, + "webhook", + webhookUrl, + webhookSecret, + webhookPath || undefined, + ); + } else { + next = setZaloUpdateMode(next, zaloAccountId, "polling"); + } + + if (forceAllowFrom) { + next = await promptZaloAllowFrom({ + cfg: next, + prompter, + accountId: zaloAccountId, + }); + } + + return { cfg: next, accountId: zaloAccountId }; + }, +}; diff --git a/extensions/zalo/src/probe.ts b/extensions/zalo/src/probe.ts new file mode 100644 index 000000000..ebdb37a34 --- /dev/null +++ b/extensions/zalo/src/probe.ts @@ -0,0 +1,46 @@ +import { getMe, ZaloApiError, type ZaloBotInfo, type ZaloFetch } from "./api.js"; + +export type ZaloProbeResult = { + ok: boolean; + bot?: ZaloBotInfo; + error?: string; + elapsedMs: number; +}; + +export async function probeZalo( + token: string, + timeoutMs = 5000, + fetcher?: ZaloFetch, +): Promise { + if (!token?.trim()) { + return { ok: false, error: "No token provided", elapsedMs: 0 }; + } + + const startTime = Date.now(); + + try { + const response = await getMe(token.trim(), timeoutMs, fetcher); + const elapsedMs = Date.now() - startTime; + + if (response.ok && response.result) { + return { ok: true, bot: response.result, elapsedMs }; + } + + return { ok: false, error: "Invalid response from Zalo API", elapsedMs }; + } catch (err) { + const elapsedMs = Date.now() - startTime; + + if (err instanceof ZaloApiError) { + return { ok: false, error: err.description ?? err.message, elapsedMs }; + } + + if (err instanceof Error) { + if (err.name === "AbortError") { + return { ok: false, error: `Request timed out after ${timeoutMs}ms`, elapsedMs }; + } + return { ok: false, error: err.message, elapsedMs }; + } + + return { ok: false, error: String(err), elapsedMs }; + } +} diff --git a/extensions/zalo/src/proxy.ts b/extensions/zalo/src/proxy.ts new file mode 100644 index 000000000..65d6e04af --- /dev/null +++ b/extensions/zalo/src/proxy.ts @@ -0,0 +1,18 @@ +import { ProxyAgent, fetch as undiciFetch } from "undici"; +import type { Dispatcher } from "undici"; + +import type { ZaloFetch } from "./api.js"; + +const proxyCache = new Map(); + +export function resolveZaloProxyFetch(proxyUrl?: string | null): ZaloFetch | undefined { + const trimmed = proxyUrl?.trim(); + if (!trimmed) return undefined; + const cached = proxyCache.get(trimmed); + if (cached) return cached; + const agent = new ProxyAgent(trimmed); + const fetcher: ZaloFetch = (input, init) => + undiciFetch(input, { ...(init ?? {}), dispatcher: agent as Dispatcher }); + proxyCache.set(trimmed, fetcher); + return fetcher; +} diff --git a/extensions/zalo/src/send.ts b/extensions/zalo/src/send.ts new file mode 100644 index 000000000..202520cc8 --- /dev/null +++ b/extensions/zalo/src/send.ts @@ -0,0 +1,116 @@ +import type { CoreConfig } from "./types.js"; +import type { ZaloFetch } from "./api.js"; +import { sendMessage, sendPhoto } from "./api.js"; +import { resolveZaloAccount } from "./accounts.js"; +import { resolveZaloProxyFetch } from "./proxy.js"; +import { resolveZaloToken } from "./token.js"; + +export type ZaloSendOptions = { + token?: string; + accountId?: string; + cfg?: CoreConfig; + mediaUrl?: string; + caption?: string; + verbose?: boolean; + proxy?: string; +}; + +export type ZaloSendResult = { + ok: boolean; + messageId?: string; + error?: string; +}; + +function resolveSendContext(options: ZaloSendOptions): { + token: string; + fetcher?: ZaloFetch; +} { + if (options.cfg) { + const account = resolveZaloAccount({ + cfg: options.cfg, + accountId: options.accountId, + }); + const token = options.token || account.token; + const proxy = options.proxy ?? account.config.proxy; + return { token, fetcher: resolveZaloProxyFetch(proxy) }; + } + + const token = options.token ?? resolveZaloToken(undefined, options.accountId).token; + const proxy = options.proxy; + return { token: token || process.env.ZALO_BOT_TOKEN?.trim() || "", fetcher: resolveZaloProxyFetch(proxy) }; +} + +export async function sendMessageZalo( + chatId: string, + text: string, + options: ZaloSendOptions = {}, +): Promise { + const { token, fetcher } = resolveSendContext(options); + + if (!token) { + return { ok: false, error: "No Zalo bot token configured" }; + } + + if (!chatId?.trim()) { + return { ok: false, error: "No chat_id provided" }; + } + + if (options.mediaUrl) { + return sendPhotoZalo(chatId, options.mediaUrl, { + ...options, + token, + caption: text || options.caption, + }); + } + + try { + const response = await sendMessage(token, { + chat_id: chatId.trim(), + text: text.slice(0, 2000), + }, fetcher); + + if (response.ok && response.result) { + return { ok: true, messageId: response.result.message_id }; + } + + return { ok: false, error: "Failed to send message" }; + } catch (err) { + return { ok: false, error: err instanceof Error ? err.message : String(err) }; + } +} + +export async function sendPhotoZalo( + chatId: string, + photoUrl: string, + options: ZaloSendOptions = {}, +): Promise { + const { token, fetcher } = resolveSendContext(options); + + if (!token) { + return { ok: false, error: "No Zalo bot token configured" }; + } + + if (!chatId?.trim()) { + return { ok: false, error: "No chat_id provided" }; + } + + if (!photoUrl?.trim()) { + return { ok: false, error: "No photo URL provided" }; + } + + try { + const response = await sendPhoto(token, { + chat_id: chatId.trim(), + photo: photoUrl.trim(), + caption: options.caption?.slice(0, 2000), + }, fetcher); + + if (response.ok && response.result) { + return { ok: true, messageId: response.result.message_id }; + } + + return { ok: false, error: "Failed to send photo" }; + } catch (err) { + return { ok: false, error: err instanceof Error ? err.message : String(err) }; + } +} diff --git a/extensions/zalo/src/shared/account-ids.ts b/extensions/zalo/src/shared/account-ids.ts new file mode 100644 index 000000000..5edcd8376 --- /dev/null +++ b/extensions/zalo/src/shared/account-ids.ts @@ -0,0 +1,15 @@ +export const DEFAULT_ACCOUNT_ID = "default"; + +export function normalizeAccountId(value: string | undefined | null): string { + const trimmed = (value ?? "").trim(); + if (!trimmed) return DEFAULT_ACCOUNT_ID; + if (/^[a-z0-9][a-z0-9_-]{0,63}$/i.test(trimmed)) return trimmed; + return ( + trimmed + .toLowerCase() + .replace(/[^a-z0-9_-]+/g, "-") + .replace(/^-+/, "") + .replace(/-+$/, "") + .slice(0, 64) || DEFAULT_ACCOUNT_ID + ); +} diff --git a/extensions/zalo/src/shared/channel-config.ts b/extensions/zalo/src/shared/channel-config.ts new file mode 100644 index 000000000..184b5cf12 --- /dev/null +++ b/extensions/zalo/src/shared/channel-config.ts @@ -0,0 +1,112 @@ +import { DEFAULT_ACCOUNT_ID } from "./account-ids.js"; + +type ChannelSection = { + accounts?: Record>; + enabled?: boolean; +}; + +type ConfigWithChannels = { + channels?: Record; +}; + +export function setAccountEnabledInConfigSection(params: { + cfg: T; + sectionKey: string; + accountId: string; + enabled: boolean; + allowTopLevel?: boolean; +}): T { + const accountKey = params.accountId || DEFAULT_ACCOUNT_ID; + const channels = params.cfg.channels; + const base = (channels?.[params.sectionKey] as ChannelSection | undefined) ?? undefined; + const hasAccounts = Boolean(base?.accounts); + if (params.allowTopLevel && accountKey === DEFAULT_ACCOUNT_ID && !hasAccounts) { + return { + ...params.cfg, + channels: { + ...channels, + [params.sectionKey]: { + ...base, + enabled: params.enabled, + }, + }, + } as T; + } + + const baseAccounts = (base?.accounts ?? {}) as Record>; + const existing = baseAccounts[accountKey] ?? {}; + return { + ...params.cfg, + channels: { + ...channels, + [params.sectionKey]: { + ...base, + accounts: { + ...baseAccounts, + [accountKey]: { + ...existing, + enabled: params.enabled, + }, + }, + }, + }, + } as T; +} + +export function deleteAccountFromConfigSection(params: { + cfg: T; + sectionKey: string; + accountId: string; + clearBaseFields?: string[]; +}): T { + const accountKey = params.accountId || DEFAULT_ACCOUNT_ID; + const channels = params.cfg.channels as Record | undefined; + const base = (channels?.[params.sectionKey] as ChannelSection | undefined) ?? undefined; + if (!base) return params.cfg; + + const baseAccounts = + base.accounts && typeof base.accounts === "object" ? { ...base.accounts } : undefined; + + if (accountKey !== DEFAULT_ACCOUNT_ID) { + const accounts = baseAccounts ? { ...baseAccounts } : {}; + delete accounts[accountKey]; + return { + ...params.cfg, + channels: { + ...channels, + [params.sectionKey]: { + ...base, + accounts: Object.keys(accounts).length ? accounts : undefined, + }, + }, + } as T; + } + + if (baseAccounts && Object.keys(baseAccounts).length > 0) { + delete baseAccounts[accountKey]; + const baseRecord = { ...(base as Record) }; + for (const field of params.clearBaseFields ?? []) { + if (field in baseRecord) baseRecord[field] = undefined; + } + return { + ...params.cfg, + channels: { + ...channels, + [params.sectionKey]: { + ...baseRecord, + accounts: Object.keys(baseAccounts).length ? baseAccounts : undefined, + }, + }, + } as T; + } + + const nextChannels = { ...channels } as Record; + delete nextChannels[params.sectionKey]; + const nextCfg = { ...params.cfg } as T; + if (Object.keys(nextChannels).length > 0) { + nextCfg.channels = nextChannels as T["channels"]; + } else { + delete nextCfg.channels; + } + return nextCfg; +} diff --git a/extensions/zalo/src/shared/channel-setup.ts b/extensions/zalo/src/shared/channel-setup.ts new file mode 100644 index 000000000..b164ed18f --- /dev/null +++ b/extensions/zalo/src/shared/channel-setup.ts @@ -0,0 +1,114 @@ +import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "./account-ids.js"; + +type ConfigWithChannels = { + channels?: Record; +}; + +type ChannelSectionBase = { + name?: string; + accounts?: Record>; +}; + +function channelHasAccounts(cfg: ConfigWithChannels, channelKey: string): boolean { + const channels = cfg.channels as Record | undefined; + const base = channels?.[channelKey] as ChannelSectionBase | undefined; + return Boolean(base?.accounts && Object.keys(base.accounts).length > 0); +} + +function shouldStoreNameInAccounts(params: { + cfg: ConfigWithChannels; + channelKey: string; + accountId: string; + alwaysUseAccounts?: boolean; +}): boolean { + if (params.alwaysUseAccounts) return true; + if (params.accountId !== DEFAULT_ACCOUNT_ID) return true; + return channelHasAccounts(params.cfg, params.channelKey); +} + +export function applyAccountNameToChannelSection(params: { + cfg: T; + channelKey: string; + accountId: string; + name?: string; + alwaysUseAccounts?: boolean; +}): T { + const trimmed = params.name?.trim(); + if (!trimmed) return params.cfg; + const accountId = normalizeAccountId(params.accountId); + const channels = params.cfg.channels as Record | undefined; + const baseConfig = channels?.[params.channelKey]; + const base = + typeof baseConfig === "object" && baseConfig ? (baseConfig as ChannelSectionBase) : undefined; + const useAccounts = shouldStoreNameInAccounts({ + cfg: params.cfg, + channelKey: params.channelKey, + accountId, + alwaysUseAccounts: params.alwaysUseAccounts, + }); + if (!useAccounts && accountId === DEFAULT_ACCOUNT_ID) { + const safeBase = base ?? {}; + return { + ...params.cfg, + channels: { + ...channels, + [params.channelKey]: { + ...safeBase, + name: trimmed, + }, + }, + } as T; + } + const baseAccounts: Record> = base?.accounts ?? {}; + const existingAccount = baseAccounts[accountId] ?? {}; + const baseWithoutName = + accountId === DEFAULT_ACCOUNT_ID + ? (({ name: _ignored, ...rest }) => rest)(base ?? {}) + : (base ?? {}); + return { + ...params.cfg, + channels: { + ...channels, + [params.channelKey]: { + ...baseWithoutName, + accounts: { + ...baseAccounts, + [accountId]: { + ...existingAccount, + name: trimmed, + }, + }, + }, + }, + } as T; +} + +export function migrateBaseNameToDefaultAccount(params: { + cfg: T; + channelKey: string; + alwaysUseAccounts?: boolean; +}): T { + if (params.alwaysUseAccounts) return params.cfg; + const channels = params.cfg.channels as Record | undefined; + const base = channels?.[params.channelKey] as ChannelSectionBase | undefined; + const baseName = base?.name?.trim(); + if (!baseName) return params.cfg; + const accounts: Record> = { + ...base?.accounts, + }; + const defaultAccount = accounts[DEFAULT_ACCOUNT_ID] ?? {}; + if (!defaultAccount.name) { + accounts[DEFAULT_ACCOUNT_ID] = { ...defaultAccount, name: baseName }; + } + const { name: _ignored, ...rest } = base ?? {}; + return { + ...params.cfg, + channels: { + ...channels, + [params.channelKey]: { + ...rest, + accounts, + }, + }, + } as T; +} diff --git a/extensions/zalo/src/shared/onboarding.ts b/extensions/zalo/src/shared/onboarding.ts new file mode 100644 index 000000000..a998fb778 --- /dev/null +++ b/extensions/zalo/src/shared/onboarding.ts @@ -0,0 +1,53 @@ +import type { WizardPrompter } from "../../../src/wizard/prompts.js"; + +import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "./account-ids.js"; + +export type PromptAccountIdParams = { + cfg: TConfig; + prompter: WizardPrompter; + label: string; + currentId?: string; + listAccountIds: (cfg: TConfig) => string[]; + defaultAccountId: string; +}; + +export async function promptAccountId( + params: PromptAccountIdParams, +): Promise { + const existingIds = params.listAccountIds(params.cfg); + const initial = params.currentId?.trim() || params.defaultAccountId || DEFAULT_ACCOUNT_ID; + const choice = (await params.prompter.select({ + message: `${params.label} account`, + options: [ + ...existingIds.map((id) => ({ + value: id, + label: id === DEFAULT_ACCOUNT_ID ? "default (primary)" : id, + })), + { value: "__new__", label: "Add a new account" }, + ], + initialValue: initial, + })) as string; + + if (choice !== "__new__") return normalizeAccountId(choice); + + const entered = await params.prompter.text({ + message: `New ${params.label} account id`, + validate: (value) => (value?.trim() ? undefined : "Required"), + }); + const normalized = normalizeAccountId(String(entered)); + if (String(entered).trim() !== normalized) { + await params.prompter.note( + `Normalized account id to "${normalized}".`, + `${params.label} account`, + ); + } + return normalized; +} + +export function addWildcardAllowFrom( + allowFrom?: Array | null, +): Array { + const next = (allowFrom ?? []).map((v) => String(v).trim()).filter(Boolean); + if (!next.includes("*")) next.push("*"); + return next; +} diff --git a/extensions/zalo/src/shared/pairing.ts b/extensions/zalo/src/shared/pairing.ts new file mode 100644 index 000000000..91e75fbab --- /dev/null +++ b/extensions/zalo/src/shared/pairing.ts @@ -0,0 +1,6 @@ +export const PAIRING_APPROVED_MESSAGE = + "\u2705 Clawdbot access approved. Send a message to start chatting."; + +export function formatPairingApproveHint(channelId: string): string { + return `Approve via: clawdbot pairing list ${channelId} / clawdbot pairing approve ${channelId} `; +} diff --git a/extensions/zalo/src/status-issues.ts b/extensions/zalo/src/status-issues.ts new file mode 100644 index 000000000..c5ca219f3 --- /dev/null +++ b/extensions/zalo/src/status-issues.ts @@ -0,0 +1,50 @@ +import type { ChannelAccountSnapshot, ChannelStatusIssue } from "../../src/channels/plugins/types.js"; + +type ZaloAccountStatus = { + accountId?: unknown; + enabled?: unknown; + configured?: unknown; + dmPolicy?: unknown; +}; + +const isRecord = (value: unknown): value is Record => + Boolean(value && typeof value === "object"); + +const asString = (value: unknown): string | undefined => + typeof value === "string" ? value : typeof value === "number" ? String(value) : undefined; + +function readZaloAccountStatus(value: ChannelAccountSnapshot): ZaloAccountStatus | null { + if (!isRecord(value)) return null; + return { + accountId: value.accountId, + enabled: value.enabled, + configured: value.configured, + dmPolicy: value.dmPolicy, + }; +} + +export function collectZaloStatusIssues( + accounts: ChannelAccountSnapshot[], +): ChannelStatusIssue[] { + const issues: ChannelStatusIssue[] = []; + for (const entry of accounts) { + const account = readZaloAccountStatus(entry); + if (!account) continue; + const accountId = asString(account.accountId) ?? "default"; + const enabled = account.enabled !== false; + const configured = account.configured === true; + if (!enabled || !configured) continue; + + if (account.dmPolicy === "open") { + issues.push({ + channel: "zalo", + accountId, + kind: "config", + message: + 'Zalo dmPolicy is "open", allowing any user to message the bot without pairing.', + fix: 'Set channels.zalo.dmPolicy to "pairing" or "allowlist" to restrict access.', + }); + } + } + return issues; +} diff --git a/extensions/zalo/src/token.ts b/extensions/zalo/src/token.ts new file mode 100644 index 000000000..be3ee5dd9 --- /dev/null +++ b/extensions/zalo/src/token.ts @@ -0,0 +1,54 @@ +import { readFileSync } from "node:fs"; + +import type { ZaloConfig } from "./types.js"; +import { DEFAULT_ACCOUNT_ID } from "./shared/account-ids.js"; + +export type ZaloTokenResolution = { + token: string; + source: "env" | "config" | "configFile" | "none"; +}; + +export function resolveZaloToken( + config: ZaloConfig | undefined, + accountId?: string | null, +): ZaloTokenResolution { + const resolvedAccountId = accountId ?? DEFAULT_ACCOUNT_ID; + const isDefaultAccount = resolvedAccountId === DEFAULT_ACCOUNT_ID; + const baseConfig = config; + const accountConfig = + resolvedAccountId !== DEFAULT_ACCOUNT_ID + ? (baseConfig?.accounts?.[resolvedAccountId] as ZaloConfig | undefined) + : undefined; + + if (accountConfig) { + const token = accountConfig.botToken?.trim(); + if (token) return { token, source: "config" }; + const tokenFile = accountConfig.tokenFile?.trim(); + if (tokenFile) { + try { + const fileToken = readFileSync(tokenFile, "utf8").trim(); + if (fileToken) return { token: fileToken, source: "configFile" }; + } catch { + // ignore read failures + } + } + } + + if (isDefaultAccount) { + const token = baseConfig?.botToken?.trim(); + if (token) return { token, source: "config" }; + const tokenFile = baseConfig?.tokenFile?.trim(); + if (tokenFile) { + try { + const fileToken = readFileSync(tokenFile, "utf8").trim(); + if (fileToken) return { token: fileToken, source: "configFile" }; + } catch { + // ignore read failures + } + } + const envToken = process.env.ZALO_BOT_TOKEN?.trim(); + if (envToken) return { token: envToken, source: "env" }; + } + + return { token: "", source: "none" }; +} diff --git a/extensions/zalo/src/tool-helpers.ts b/extensions/zalo/src/tool-helpers.ts new file mode 100644 index 000000000..358be47ac --- /dev/null +++ b/extensions/zalo/src/tool-helpers.ts @@ -0,0 +1,30 @@ +export function readStringParam( + params: Record, + key: string, + opts?: { required?: boolean; allowEmpty?: boolean; trim?: boolean }, +): string | undefined { + const raw = params[key]; + if (raw === undefined || raw === null) { + if (opts?.required) throw new Error(`${key} is required`); + return undefined; + } + const value = String(raw); + const trimmed = opts?.trim === false ? value : value.trim(); + if (!opts?.allowEmpty && !trimmed) { + if (opts?.required) throw new Error(`${key} is required`); + return undefined; + } + return trimmed; +} + +export function jsonResult(payload: unknown) { + return { + content: [ + { + type: "text", + text: JSON.stringify(payload, null, 2), + }, + ], + details: payload, + }; +} diff --git a/extensions/zalo/src/types.ts b/extensions/zalo/src/types.ts new file mode 100644 index 000000000..8309654c7 --- /dev/null +++ b/extensions/zalo/src/types.ts @@ -0,0 +1,49 @@ +export type ZaloAccountConfig = { + /** Optional display name for this account (used in CLI/UI lists). */ + name?: string; + /** If false, do not start this Zalo account. Default: true. */ + enabled?: boolean; + /** Bot token from Zalo Bot Creator. */ + botToken?: string; + /** Path to file containing the bot token. */ + tokenFile?: string; + /** Webhook URL for receiving updates (HTTPS required). */ + webhookUrl?: string; + /** Webhook secret token (8-256 chars) for request verification. */ + webhookSecret?: string; + /** Webhook path for the gateway HTTP server (defaults to webhook URL path). */ + webhookPath?: string; + /** Direct message access policy (default: pairing). */ + dmPolicy?: "pairing" | "allowlist" | "open" | "disabled"; + /** Allowlist for DM senders (Zalo user IDs). */ + allowFrom?: Array; + /** Max inbound media size in MB. */ + mediaMaxMb?: number; + /** Proxy URL for API requests. */ + proxy?: string; +}; + +export type ZaloConfig = { + /** Optional per-account Zalo configuration (multi-account). */ + accounts?: Record; + /** Default account ID when multiple accounts are configured. */ + defaultAccount?: string; +} & ZaloAccountConfig; + +export type ZaloTokenSource = "env" | "config" | "configFile" | "none"; + +export type ResolvedZaloAccount = { + accountId: string; + name?: string; + enabled: boolean; + token: string; + tokenSource: ZaloTokenSource; + config: ZaloAccountConfig; +}; + +export type CoreConfig = { + channels?: { + zalo?: ZaloConfig; + }; + [key: string]: unknown; +}; diff --git a/src/gateway/server-http.ts b/src/gateway/server-http.ts index 8e8111fa2..f814d1659 100644 --- a/src/gateway/server-http.ts +++ b/src/gateway/server-http.ts @@ -190,6 +190,7 @@ export function createGatewayHttpServer(opts: { controlUiBasePath: string; openAiChatCompletionsEnabled: boolean; handleHooksRequest: HooksRequestHandler; + handlePluginRequest?: HooksRequestHandler; resolvedAuth: import("./auth.js").ResolvedGatewayAuth; }): HttpServer { const { @@ -198,6 +199,7 @@ export function createGatewayHttpServer(opts: { controlUiBasePath, openAiChatCompletionsEnabled, handleHooksRequest, + handlePluginRequest, resolvedAuth, } = opts; const httpServer: HttpServer = createHttpServer((req, res) => { @@ -206,6 +208,7 @@ export function createGatewayHttpServer(opts: { void (async () => { if (await handleHooksRequest(req, res)) return; + if (handlePluginRequest && (await handlePluginRequest(req, res))) return; if (openAiChatCompletionsEnabled) { if (await handleOpenAiHttpRequest(req, res, { auth: resolvedAuth })) return; } diff --git a/src/gateway/server-runtime-state.ts b/src/gateway/server-runtime-state.ts index 34d6c0dda..45501ead5 100644 --- a/src/gateway/server-runtime-state.ts +++ b/src/gateway/server-runtime-state.ts @@ -10,12 +10,14 @@ import type { ChatAbortControllerEntry } from "./chat-abort.js"; import type { HooksConfigResolved } from "./hooks.js"; import { createGatewayHooksRequestHandler } from "./server/hooks.js"; import { listenGatewayHttpServer } from "./server/http-listen.js"; +import { createGatewayPluginRequestHandler } from "./server/plugins-http.js"; import type { GatewayWsClient } from "./server/ws-types.js"; import { createGatewayBroadcaster } from "./server-broadcast.js"; import { type ChatRunEntry, createChatRunState } from "./server-chat.js"; import { MAX_PAYLOAD_BYTES } from "./server-constants.js"; import { attachGatewayUpgradeHandler, createGatewayHttpServer } from "./server-http.js"; import type { DedupeEntry } from "./server-shared.js"; +import type { PluginRegistry } from "../plugins/registry.js"; export async function createGatewayRuntimeState(params: { cfg: { @@ -28,12 +30,14 @@ export async function createGatewayRuntimeState(params: { openAiChatCompletionsEnabled: boolean; resolvedAuth: ResolvedGatewayAuth; hooksConfig: () => HooksConfigResolved | null; + pluginRegistry: PluginRegistry; deps: CliDeps; canvasRuntime: RuntimeEnv; canvasHostEnabled: boolean; allowCanvasHostInTests?: boolean; logCanvas: { info: (msg: string) => void; warn: (msg: string) => void }; logHooks: ReturnType; + logPlugins: ReturnType; }): Promise<{ canvasHost: CanvasHostHandler | null; httpServer: HttpServer; @@ -89,12 +93,18 @@ export async function createGatewayRuntimeState(params: { logHooks: params.logHooks, }); + const handlePluginRequest = createGatewayPluginRequestHandler({ + registry: params.pluginRegistry, + log: params.logPlugins, + }); + const httpServer = createGatewayHttpServer({ canvasHost, controlUiEnabled: params.controlUiEnabled, controlUiBasePath: params.controlUiBasePath, openAiChatCompletionsEnabled: params.openAiChatCompletionsEnabled, handleHooksRequest, + handlePluginRequest, resolvedAuth: params.resolvedAuth, }); diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 5524a54bd..a59018b29 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -67,6 +67,7 @@ const logHealth = log.child("health"); const logCron = log.child("cron"); const logReload = log.child("reload"); const logHooks = log.child("hooks"); +const logPlugins = log.child("plugins"); const logWsControl = log.child("ws"); const canvasRuntime = runtimeForLogger(logCanvas); @@ -222,12 +223,14 @@ export async function startGatewayServer( openAiChatCompletionsEnabled, resolvedAuth, hooksConfig: () => hooksConfig, + pluginRegistry, deps, canvasRuntime, canvasHostEnabled, allowCanvasHostInTests: opts.allowCanvasHostInTests, logCanvas, logHooks, + logPlugins, }); let bonjourStop: (() => Promise) | null = null; let bridge: import("../infra/bridge/server.js").NodeBridgeServer | null = null; diff --git a/src/gateway/server/__tests__/test-utils.ts b/src/gateway/server/__tests__/test-utils.ts new file mode 100644 index 000000000..1a82c625d --- /dev/null +++ b/src/gateway/server/__tests__/test-utils.ts @@ -0,0 +1,22 @@ +import type { PluginRegistry } from "../../../plugins/registry.js"; + +export const createTestRegistry = ( + overrides: Partial = {}, +): PluginRegistry => { + const base: PluginRegistry = { + plugins: [], + tools: [], + channels: [], + gatewayHandlers: {}, + httpHandlers: [], + cliRegistrars: [], + services: [], + diagnostics: [], + }; + const merged = { ...base, ...overrides }; + return { + ...merged, + gatewayHandlers: merged.gatewayHandlers ?? {}, + httpHandlers: merged.httpHandlers ?? [], + }; +}; diff --git a/src/gateway/server/plugins-http.test.ts b/src/gateway/server/plugins-http.test.ts new file mode 100644 index 000000000..63707ebfd --- /dev/null +++ b/src/gateway/server/plugins-http.test.ts @@ -0,0 +1,77 @@ +import type { IncomingMessage, ServerResponse } from "node:http"; +import { describe, expect, it, vi } from "vitest"; + +import { createGatewayPluginRequestHandler } from "./plugins-http.js"; +import { createTestRegistry } from "./__tests__/test-utils.js"; + +const makeResponse = () => + ({ + headersSent: false, + statusCode: 200, + setHeader: vi.fn(), + end: vi.fn(), + }) as unknown as ServerResponse; + +describe("createGatewayPluginRequestHandler", () => { + it("returns false when no handlers are registered", async () => { + const log = { warn: vi.fn() } as unknown as Parameters< + typeof createGatewayPluginRequestHandler + >[0]["log"]; + const handler = createGatewayPluginRequestHandler({ + registry: createTestRegistry(), + log, + }); + const handled = await handler({} as IncomingMessage, makeResponse()); + expect(handled).toBe(false); + }); + + it("continues until a handler reports it handled the request", async () => { + const first = vi.fn(async () => false); + const second = vi.fn(async () => true); + const handler = createGatewayPluginRequestHandler({ + registry: createTestRegistry({ + httpHandlers: [ + { pluginId: "first", handler: first, source: "first" }, + { pluginId: "second", handler: second, source: "second" }, + ], + }), + log: { warn: vi.fn() } as unknown as Parameters[0]["log"], + }); + + const handled = await handler({} as IncomingMessage, makeResponse()); + expect(handled).toBe(true); + expect(first).toHaveBeenCalledTimes(1); + expect(second).toHaveBeenCalledTimes(1); + }); + + it("logs and responds with 500 when a handler throws", async () => { + const log = { warn: vi.fn() } as unknown as Parameters< + typeof createGatewayPluginRequestHandler + >[0]["log"]; + const handler = createGatewayPluginRequestHandler({ + registry: createTestRegistry({ + httpHandlers: [ + { + pluginId: "boom", + handler: async () => { + throw new Error("boom"); + }, + source: "boom", + }, + ], + }), + log, + }); + + const res = makeResponse(); + const handled = await handler({} as IncomingMessage, res); + expect(handled).toBe(true); + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("boom")); + expect(res.statusCode).toBe(500); + expect(res.setHeader).toHaveBeenCalledWith( + "Content-Type", + "text/plain; charset=utf-8", + ); + expect(res.end).toHaveBeenCalledWith("Internal Server Error"); + }); +}); diff --git a/src/gateway/server/plugins-http.ts b/src/gateway/server/plugins-http.ts new file mode 100644 index 000000000..956b2ea8f --- /dev/null +++ b/src/gateway/server/plugins-http.ts @@ -0,0 +1,36 @@ +import type { IncomingMessage, ServerResponse } from "node:http"; + +import type { createSubsystemLogger } from "../../logging.js"; +import type { PluginRegistry } from "../../plugins/registry.js"; + +type SubsystemLogger = ReturnType; + +export type PluginHttpRequestHandler = ( + req: IncomingMessage, + res: ServerResponse, +) => Promise; + +export function createGatewayPluginRequestHandler(params: { + registry: PluginRegistry; + log: SubsystemLogger; +}): PluginHttpRequestHandler { + const { registry, log } = params; + return async (req, res) => { + if (registry.httpHandlers.length === 0) return false; + for (const entry of registry.httpHandlers) { + try { + const handled = await entry.handler(req, res); + if (handled) return true; + } catch (err) { + log.warn(`plugin http handler failed (${entry.pluginId}): ${String(err)}`); + if (!res.headersSent) { + res.statusCode = 500; + res.setHeader("Content-Type", "text/plain; charset=utf-8"); + res.end("Internal Server Error"); + } + return true; + } + } + return false; + }; +} diff --git a/src/plugins/loader.test.ts b/src/plugins/loader.test.ts index 881982507..f48581c0f 100644 --- a/src/plugins/loader.test.ts +++ b/src/plugins/loader.test.ts @@ -142,4 +142,28 @@ describe("loadClawdbotPlugins", () => { expect(registry.channels.length).toBe(1); expect(registry.channels[0]?.plugin.id).toBe("demo"); }); + + it("registers http handlers", () => { + const plugin = writePlugin({ + id: "http-demo", + body: `export default function (api) { + api.registerHttpHandler(async () => false); +};`, + }); + + const registry = loadClawdbotPlugins({ + cache: false, + workspaceDir: plugin.dir, + config: { + plugins: { + load: { paths: [plugin.file] }, + allow: ["http-demo"], + }, + }, + }); + + expect(registry.httpHandlers.length).toBe(1); + expect(registry.httpHandlers[0]?.pluginId).toBe("http-demo"); + expect(registry.plugins[0]?.httpHandlers).toBe(1); + }); }); diff --git a/src/plugins/loader.ts b/src/plugins/loader.ts index c1df8f282..a3e8f3ec9 100644 --- a/src/plugins/loader.ts +++ b/src/plugins/loader.ts @@ -193,6 +193,7 @@ function createPluginRecord(params: { gatewayMethods: [], cliCommands: [], services: [], + httpHandlers: 0, configSchema: params.configSchema, configUiHints: undefined, }; diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index e9c040b98..2ad93f455 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -10,6 +10,7 @@ import type { ClawdbotPluginApi, ClawdbotPluginChannelRegistration, ClawdbotPluginCliRegistrar, + ClawdbotPluginHttpHandler, ClawdbotPluginService, ClawdbotPluginToolContext, ClawdbotPluginToolFactory, @@ -33,6 +34,12 @@ export type PluginCliRegistration = { source: string; }; +export type PluginHttpRegistration = { + pluginId: string; + handler: ClawdbotPluginHttpHandler; + source: string; +}; + export type PluginChannelRegistration = { pluginId: string; plugin: ChannelPlugin; @@ -62,6 +69,7 @@ export type PluginRecord = { gatewayMethods: string[]; cliCommands: string[]; services: string[]; + httpHandlers: number; configSchema: boolean; configUiHints?: Record; }; @@ -71,6 +79,7 @@ export type PluginRegistry = { tools: PluginToolRegistration[]; channels: PluginChannelRegistration[]; gatewayHandlers: GatewayRequestHandlers; + httpHandlers: PluginHttpRegistration[]; cliRegistrars: PluginCliRegistration[]; services: PluginServiceRegistration[]; diagnostics: PluginDiagnostic[]; @@ -87,6 +96,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { tools: [], channels: [], gatewayHandlers: {}, + httpHandlers: [], cliRegistrars: [], services: [], diagnostics: [], @@ -142,6 +152,18 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { record.gatewayMethods.push(trimmed); }; + const registerHttpHandler = ( + record: PluginRecord, + handler: ClawdbotPluginHttpHandler, + ) => { + record.httpHandlers += 1; + registry.httpHandlers.push({ + pluginId: record.id, + handler, + source: record.source, + }); + }; + const registerChannel = ( record: PluginRecord, registration: ClawdbotPluginChannelRegistration | ChannelPlugin, @@ -220,6 +242,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { pluginConfig: params.pluginConfig, logger: normalizeLogger(registryParams.logger), registerTool: (tool, opts) => registerTool(record, tool, opts), + registerHttpHandler: (handler) => registerHttpHandler(record, handler), registerChannel: (registration) => registerChannel(record, registration), registerGatewayMethod: (method, handler) => registerGatewayMethod(record, method, handler), registerCli: (registrar, opts) => registerCli(record, registrar, opts), diff --git a/src/plugins/types.ts b/src/plugins/types.ts index c4fe25842..f1726b45f 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -1,3 +1,4 @@ +import type { IncomingMessage, ServerResponse } from "node:http"; import type { Command } from "commander"; import type { AnyAgentTool } from "../agents/tools/common.js"; @@ -58,6 +59,11 @@ export type ClawdbotPluginGatewayMethod = { handler: GatewayRequestHandler; }; +export type ClawdbotPluginHttpHandler = ( + req: IncomingMessage, + res: ServerResponse, +) => Promise | boolean; + export type ClawdbotPluginCliContext = { program: Command; config: ClawdbotConfig; @@ -112,6 +118,7 @@ export type ClawdbotPluginApi = { tool: AnyAgentTool | ClawdbotPluginToolFactory, opts?: { name?: string; names?: string[] }, ) => void; + registerHttpHandler: (handler: ClawdbotPluginHttpHandler) => void; registerChannel: (registration: ClawdbotPluginChannelRegistration | ChannelPlugin) => void; registerGatewayMethod: (method: string, handler: GatewayRequestHandler) => void; registerCli: (registrar: ClawdbotPluginCliRegistrar, opts?: { commands?: string[] }) => void;