feat: migrate zalo plugin to sdk

This commit is contained in:
Peter Steinberger
2026-01-18 03:34:02 +00:00
parent 5fa1a63978
commit b6d470a679
22 changed files with 182 additions and 654 deletions

View File

@@ -1,14 +1,17 @@
import type { IncomingMessage, ServerResponse } from "node:http";
import type { ResolvedZaloAccount } from "./accounts.js";
import {
finalizeInboundContext,
formatAgentEnvelope,
isControlCommandMessage,
recordSessionMetaFromInbound,
resolveCommandAuthorizedFromAuthorizers,
resolveStorePath,
shouldComputeCommandAuthorized,
type ClawdbotConfig,
} from "clawdbot/plugin-sdk";
import type { ResolvedZaloAccount } from "./accounts.js";
import {
ZaloApiError,
deleteWebhook,
@@ -20,10 +23,8 @@ import {
type ZaloMessage,
type ZaloUpdate,
} from "./api.js";
import { zaloPlugin } from "./channel.js";
import { loadCoreChannelDeps } from "./core-bridge.js";
import { resolveZaloProxyFetch } from "./proxy.js";
import type { CoreConfig } from "./types.js";
import { getZaloRuntime } from "./runtime.js";
export type ZaloRuntimeEnv = {
log?: (message: string) => void;
@@ -33,7 +34,7 @@ export type ZaloRuntimeEnv = {
export type ZaloMonitorOptions = {
token: string;
account: ResolvedZaloAccount;
config: CoreConfig;
config: ClawdbotConfig;
runtime: ZaloRuntimeEnv;
abortSignal: AbortSignal;
useWebhook?: boolean;
@@ -51,9 +52,11 @@ export type ZaloMonitorResult = {
const ZALO_TEXT_LIMIT = 2000;
const DEFAULT_MEDIA_MAX_MB = 5;
function logVerbose(deps: Awaited<ReturnType<typeof loadCoreChannelDeps>>, message: string): void {
if (deps.shouldLogVerbose()) {
console.log(`[zalo] ${message}`);
type ZaloCoreRuntime = ReturnType<typeof getZaloRuntime>;
function logVerbose(core: ZaloCoreRuntime, runtime: ZaloRuntimeEnv, message: string): void {
if (core.logging.shouldLogVerbose()) {
runtime.log?.(`[zalo] ${message}`);
}
}
@@ -100,9 +103,9 @@ async function readJsonBody(req: IncomingMessage, maxBytes: number) {
type WebhookTarget = {
token: string;
account: ResolvedZaloAccount;
config: CoreConfig;
config: ClawdbotConfig;
runtime: ZaloRuntimeEnv;
deps: Awaited<ReturnType<typeof loadCoreChannelDeps>>;
core: ZaloCoreRuntime;
secret: string;
path: string;
mediaMaxMb: number;
@@ -207,7 +210,7 @@ export async function handleZaloWebhookRequest(
target.account,
target.config,
target.runtime,
target.deps,
target.core,
target.mediaMaxMb,
target.statusSink,
target.fetcher,
@@ -223,9 +226,9 @@ export async function handleZaloWebhookRequest(
function startPollingLoop(params: {
token: string;
account: ResolvedZaloAccount;
config: CoreConfig;
config: ClawdbotConfig;
runtime: ZaloRuntimeEnv;
deps: Awaited<ReturnType<typeof loadCoreChannelDeps>>;
core: ZaloCoreRuntime;
abortSignal: AbortSignal;
isStopped: () => boolean;
mediaMaxMb: number;
@@ -237,7 +240,7 @@ function startPollingLoop(params: {
account,
config,
runtime,
deps,
core,
abortSignal,
isStopped,
mediaMaxMb,
@@ -259,7 +262,7 @@ function startPollingLoop(params: {
account,
config,
runtime,
deps,
core,
mediaMaxMb,
statusSink,
fetcher,
@@ -286,9 +289,9 @@ async function processUpdate(
update: ZaloUpdate,
token: string,
account: ResolvedZaloAccount,
config: CoreConfig,
config: ClawdbotConfig,
runtime: ZaloRuntimeEnv,
deps: Awaited<ReturnType<typeof loadCoreChannelDeps>>,
core: ZaloCoreRuntime,
mediaMaxMb: number,
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void,
fetcher?: ZaloFetch,
@@ -304,7 +307,7 @@ async function processUpdate(
account,
config,
runtime,
deps,
core,
statusSink,
fetcher,
);
@@ -316,7 +319,7 @@ async function processUpdate(
account,
config,
runtime,
deps,
core,
mediaMaxMb,
statusSink,
fetcher,
@@ -337,9 +340,9 @@ async function handleTextMessage(
message: ZaloMessage,
token: string,
account: ResolvedZaloAccount,
config: CoreConfig,
config: ClawdbotConfig,
runtime: ZaloRuntimeEnv,
deps: Awaited<ReturnType<typeof loadCoreChannelDeps>>,
core: ZaloCoreRuntime,
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void,
fetcher?: ZaloFetch,
): Promise<void> {
@@ -352,7 +355,7 @@ async function handleTextMessage(
account,
config,
runtime,
deps,
core,
text,
mediaPath: undefined,
mediaType: undefined,
@@ -365,9 +368,9 @@ async function handleImageMessage(
message: ZaloMessage,
token: string,
account: ResolvedZaloAccount,
config: CoreConfig,
config: ClawdbotConfig,
runtime: ZaloRuntimeEnv,
deps: Awaited<ReturnType<typeof loadCoreChannelDeps>>,
core: ZaloCoreRuntime,
mediaMaxMb: number,
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void,
fetcher?: ZaloFetch,
@@ -380,8 +383,8 @@ async function handleImageMessage(
if (photo) {
try {
const maxBytes = mediaMaxMb * 1024 * 1024;
const fetched = await deps.fetchRemoteMedia({ url: photo });
const saved = await deps.saveMediaBuffer(
const fetched = await core.channel.media.fetchRemoteMedia({ url: photo });
const saved = await core.channel.media.saveMediaBuffer(
fetched.buffer,
fetched.contentType,
"inbound",
@@ -400,7 +403,7 @@ async function handleImageMessage(
account,
config,
runtime,
deps,
core,
text: caption,
mediaPath,
mediaType,
@@ -413,9 +416,9 @@ async function processMessageWithPipeline(params: {
message: ZaloMessage;
token: string;
account: ResolvedZaloAccount;
config: CoreConfig;
config: ClawdbotConfig;
runtime: ZaloRuntimeEnv;
deps: Awaited<ReturnType<typeof loadCoreChannelDeps>>;
core: ZaloCoreRuntime;
text?: string;
mediaPath?: string;
mediaType?: string;
@@ -428,7 +431,7 @@ async function processMessageWithPipeline(params: {
account,
config,
runtime,
deps,
core,
text,
mediaPath,
mediaType,
@@ -448,7 +451,7 @@ async function processMessageWithPipeline(params: {
const shouldComputeAuth = shouldComputeCommandAuthorized(rawBody, config);
const storeAllowFrom =
!isGroup && (dmPolicy !== "open" || shouldComputeAuth)
? await deps.readChannelAllowFromStore("zalo").catch(() => [])
? await core.channel.pairing.readAllowFromStore("zalo").catch(() => [])
: [];
const effectiveAllowFrom = [...configAllowFrom, ...storeAllowFrom];
const useAccessGroups = config.commands?.useAccessGroups !== false;
@@ -462,7 +465,7 @@ async function processMessageWithPipeline(params: {
if (!isGroup) {
if (dmPolicy === "disabled") {
logVerbose(deps, `Blocked zalo DM from ${senderId} (dmPolicy=disabled)`);
logVerbose(core, runtime, `Blocked zalo DM from ${senderId} (dmPolicy=disabled)`);
return;
}
@@ -471,21 +474,20 @@ async function processMessageWithPipeline(params: {
if (!allowed) {
if (dmPolicy === "pairing") {
const { code, created } = await deps.upsertChannelPairingRequest({
const { code, created } = await core.channel.pairing.upsertPairingRequest({
channel: "zalo",
id: senderId,
meta: { name: senderName ?? undefined },
pairingAdapter: zaloPlugin.pairing,
});
if (created) {
logVerbose(deps, `zalo pairing request sender=${senderId}`);
logVerbose(core, runtime, `zalo pairing request sender=${senderId}`);
try {
await sendMessage(
token,
{
chat_id: chatId,
text: deps.buildPairingReply({
text: core.channel.pairing.buildPairingReply({
channel: "zalo",
idLine: `Your Zalo user id: ${senderId}`,
code,
@@ -495,18 +497,26 @@ async function processMessageWithPipeline(params: {
);
statusSink?.({ lastOutboundAt: Date.now() });
} catch (err) {
logVerbose(deps, `zalo pairing reply failed for ${senderId}: ${String(err)}`);
logVerbose(
core,
runtime,
`zalo pairing reply failed for ${senderId}: ${String(err)}`,
);
}
}
} else {
logVerbose(deps, `Blocked unauthorized zalo sender ${senderId} (dmPolicy=${dmPolicy})`);
logVerbose(
core,
runtime,
`Blocked unauthorized zalo sender ${senderId} (dmPolicy=${dmPolicy})`,
);
}
return;
}
}
}
const route = deps.resolveAgentRoute({
const route = core.channel.routing.resolveAgentRoute({
cfg: config,
channel: "zalo",
accountId: account.accountId,
@@ -517,16 +527,14 @@ async function processMessageWithPipeline(params: {
});
if (isGroup && isControlCommandMessage(rawBody, config) && commandAuthorized !== true) {
logVerbose(deps, `zalo: drop control command from unauthorized sender ${senderId}`);
logVerbose(core, runtime, `zalo: drop control command from unauthorized sender ${senderId}`);
return;
}
const fromLabel = isGroup
? `group:${chatId}`
: senderName || `user:${senderId}`;
const body = deps.formatAgentEnvelope({
channel: "Zalo",
from: fromLabel,
const fromLabel = isGroup ? `group:${chatId}` : senderName || `user:${senderId}`;
const body = formatAgentEnvelope({
channel: "Zalo",
from: fromLabel,
timestamp: date ? date * 1000 : undefined,
body: rawBody,
});
@@ -565,7 +573,7 @@ async function processMessageWithPipeline(params: {
runtime.error?.(`zalo: failed updating session meta: ${String(err)}`);
});
await deps.dispatchReplyWithBufferedBlockDispatcher({
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg: config,
dispatcherOptions: {
@@ -575,7 +583,7 @@ async function processMessageWithPipeline(params: {
token,
chatId,
runtime,
deps,
core,
statusSink,
fetcher,
});
@@ -592,11 +600,11 @@ async function deliverZaloReply(params: {
token: string;
chatId: string;
runtime: ZaloRuntimeEnv;
deps: Awaited<ReturnType<typeof loadCoreChannelDeps>>;
core: ZaloCoreRuntime;
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
fetcher?: ZaloFetch;
}): Promise<void> {
const { payload, token, chatId, runtime, deps, statusSink, fetcher } = params;
const { payload, token, chatId, runtime, core, statusSink, fetcher } = params;
const mediaList = payload.mediaUrls?.length
? payload.mediaUrls
@@ -620,7 +628,7 @@ async function deliverZaloReply(params: {
}
if (payload.text) {
const chunks = deps.chunkMarkdownText(payload.text, ZALO_TEXT_LIMIT);
const chunks = core.channel.text.chunkMarkdownText(payload.text, ZALO_TEXT_LIMIT);
for (const chunk of chunks) {
try {
await sendMessage(token, { chat_id: chatId, text: chunk }, fetcher);
@@ -649,7 +657,7 @@ export async function monitorZaloProvider(
fetcher: fetcherOverride,
} = options;
const deps = await loadCoreChannelDeps();
const core = getZaloRuntime();
const effectiveMediaMaxMb = account.config.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB;
const fetcher = fetcherOverride ?? resolveZaloProxyFetch(account.config.proxy);
@@ -686,7 +694,7 @@ export async function monitorZaloProvider(
account,
config,
runtime,
deps,
core,
path,
secret: webhookSecret,
statusSink: (patch) => statusSink?.(patch),
@@ -715,7 +723,7 @@ export async function monitorZaloProvider(
account,
config,
runtime,
deps,
core,
abortSignal,
isStopped: () => stopped,
mediaMaxMb: effectiveMediaMaxMb,