refactor: migrate messaging plugins to sdk

This commit is contained in:
Peter Steinberger
2026-01-18 08:32:19 +00:00
parent 9241e21114
commit c5e19f5c67
63 changed files with 4082 additions and 376 deletions

View File

@@ -8,9 +8,15 @@ const mocks = vi.hoisted(() => ({
appendAssistantMessageToSessionTranscript: vi.fn(async () => ({ ok: true, sessionFile: "x" })),
}));
vi.mock("../../config/config.js", () => ({
loadConfig: () => ({}),
}));
vi.mock("../../config/config.js", async () => {
const actual = await vi.importActual<typeof import("../../config/config.js")>(
"../../config/config.js",
);
return {
...actual,
loadConfig: () => ({}),
};
});
vi.mock("../../channels/plugins/index.js", () => ({
getChannelPlugin: () => ({ outbound: {} }),

View File

@@ -2,6 +2,7 @@ import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/ind
import type { ChannelId } from "../../channels/plugins/types.js";
import { DEFAULT_CHAT_CHANNEL } from "../../channels/registry.js";
import { loadConfig } from "../../config/config.js";
import { createOutboundSendDeps } from "../../cli/deps.js";
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import type { OutboundChannel } from "../../infra/outbound/targets.js";
@@ -15,7 +16,28 @@ import {
validateSendParams,
} from "../protocol/index.js";
import { formatForLog } from "../ws-log.js";
import type { GatewayRequestHandlers } from "./types.js";
import type { GatewayRequestContext, GatewayRequestHandlers } from "./types.js";
type InflightResult = {
ok: boolean;
payload?: Record<string, unknown>;
error?: ReturnType<typeof errorShape>;
meta?: Record<string, unknown>;
};
const inflightByContext = new WeakMap<
GatewayRequestContext,
Map<string, Promise<InflightResult>>
>();
const getInflightMap = (context: GatewayRequestContext) => {
let inflight = inflightByContext.get(context);
if (!inflight) {
inflight = new Map();
inflightByContext.set(context, inflight);
}
return inflight;
};
export const sendHandlers: GatewayRequestHandlers = {
send: async ({ params, respond, context }) => {
@@ -42,13 +64,22 @@ export const sendHandlers: GatewayRequestHandlers = {
idempotencyKey: string;
};
const idem = request.idempotencyKey;
const cached = context.dedupe.get(`send:${idem}`);
const dedupeKey = `send:${idem}`;
const cached = context.dedupe.get(dedupeKey);
if (cached) {
respond(cached.ok, cached.payload, cached.error, {
cached: true,
});
return;
}
const inflightMap = getInflightMap(context);
const inflight = inflightMap.get(dedupeKey);
if (inflight) {
const result = await inflight;
const meta = result.meta ? { ...result.meta, cached: true } : { cached: true };
respond(result.ok, result.payload, result.error, meta);
return;
}
const to = request.to.trim();
const message = request.message.trim();
const channelInput = typeof request.channel === "string" ? request.channel : undefined;
@@ -66,79 +97,99 @@ export const sendHandlers: GatewayRequestHandlers = {
typeof request.accountId === "string" && request.accountId.trim().length
? request.accountId.trim()
: undefined;
try {
const outboundChannel = channel as Exclude<OutboundChannel, "none">;
const plugin = getChannelPlugin(channel as ChannelId);
if (!plugin) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, `unsupported channel: ${channel}`),
);
return;
}
const cfg = loadConfig();
const resolved = resolveOutboundTarget({
channel: outboundChannel,
to,
cfg,
accountId,
mode: "explicit",
});
if (!resolved.ok) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, String(resolved.error)));
return;
}
const results = await deliverOutboundPayloads({
cfg,
channel: outboundChannel,
to: resolved.to,
accountId,
payloads: [{ text: message, mediaUrl: request.mediaUrl }],
gifPlayback: request.gifPlayback,
mirror:
typeof request.sessionKey === "string" && request.sessionKey.trim()
? {
sessionKey: request.sessionKey.trim(),
agentId: resolveSessionAgentId({
sessionKey: request.sessionKey.trim(),
config: cfg,
}),
text: message,
mediaUrls: request.mediaUrl ? [request.mediaUrl] : undefined,
}
: undefined,
});
const outboundChannel = channel as Exclude<OutboundChannel, "none">;
const plugin = getChannelPlugin(channel as ChannelId);
if (!plugin) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, `unsupported channel: ${channel}`),
);
return;
}
const result = results.at(-1);
if (!result) {
throw new Error("No delivery result");
const work = (async (): Promise<InflightResult> => {
try {
const cfg = loadConfig();
const resolved = resolveOutboundTarget({
channel: outboundChannel,
to,
cfg,
accountId,
mode: "explicit",
});
if (!resolved.ok) {
return {
ok: false,
error: errorShape(ErrorCodes.INVALID_REQUEST, String(resolved.error)),
meta: { channel },
};
}
const outboundDeps = context.deps ? createOutboundSendDeps(context.deps) : undefined;
const results = await deliverOutboundPayloads({
cfg,
channel: outboundChannel,
to: resolved.to,
accountId,
payloads: [{ text: message, mediaUrl: request.mediaUrl }],
gifPlayback: request.gifPlayback,
deps: outboundDeps,
mirror:
typeof request.sessionKey === "string" && request.sessionKey.trim()
? {
sessionKey: request.sessionKey.trim(),
agentId: resolveSessionAgentId({
sessionKey: request.sessionKey.trim(),
config: cfg,
}),
text: message,
mediaUrls: request.mediaUrl ? [request.mediaUrl] : undefined,
}
: undefined,
});
const result = results.at(-1);
if (!result) {
throw new Error("No delivery result");
}
const payload: Record<string, unknown> = {
runId: idem,
messageId: result.messageId,
channel,
};
if ("chatId" in result) payload.chatId = result.chatId;
if ("channelId" in result) payload.channelId = result.channelId;
if ("toJid" in result) payload.toJid = result.toJid;
if ("conversationId" in result) {
payload.conversationId = result.conversationId;
}
context.dedupe.set(dedupeKey, {
ts: Date.now(),
ok: true,
payload,
});
return {
ok: true,
payload,
meta: { channel },
};
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
context.dedupe.set(dedupeKey, {
ts: Date.now(),
ok: false,
error,
});
return { ok: false, error, meta: { channel, error: formatForLog(err) } };
}
const payload: Record<string, unknown> = {
runId: idem,
messageId: result.messageId,
channel,
};
if ("chatId" in result) payload.chatId = result.chatId;
if ("channelId" in result) payload.channelId = result.channelId;
if ("toJid" in result) payload.toJid = result.toJid;
if ("conversationId" in result) {
payload.conversationId = result.conversationId;
}
context.dedupe.set(`send:${idem}`, {
ts: Date.now(),
ok: true,
payload,
});
respond(true, payload, undefined, { channel });
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
context.dedupe.set(`send:${idem}`, {
ts: Date.now(),
ok: false,
error,
});
respond(false, undefined, error, { channel, error: formatForLog(err) });
})();
inflightMap.set(dedupeKey, work);
try {
const result = await work;
respond(result.ok, result.payload, result.error, result.meta);
} finally {
inflightMap.delete(dedupeKey);
}
},
poll: async ({ params, respond, context }) => {