refactor: route channel runtime via plugin api

This commit is contained in:
Peter Steinberger
2026-01-18 11:00:19 +00:00
parent 676d41d415
commit ee6e534ccb
82 changed files with 1253 additions and 3167 deletions

View File

@@ -1,25 +1,10 @@
import {
buildPendingHistoryContextFromMap,
clearHistoryEntries,
createInboundDebouncer,
danger,
DEFAULT_GROUP_HISTORY_LIMIT,
readChannelAllowFromStore,
recordSessionMetaFromInbound,
recordPendingHistoryEntry,
resolveAgentRoute,
resolveCommandAuthorizedFromAuthorizers,
resolveInboundDebounceMs,
resolveMentionGating,
resolveStorePath,
dispatchReplyFromConfig,
finalizeInboundContext,
formatAgentEnvelope,
formatAllowlistMatchMeta,
hasControlCommand,
logVerbose,
shouldLogVerbose,
upsertChannelPairingRequest,
type HistoryEntry,
} from "clawdbot/plugin-sdk";
@@ -50,6 +35,7 @@ import { createMSTeamsReplyDispatcher } from "../reply-dispatcher.js";
import { recordMSTeamsSentMessage, wasMSTeamsMessageSent } from "../sent-message-cache.js";
import type { MSTeamsTurnContext } from "../sdk-types.js";
import { resolveMSTeamsInboundMedia } from "./inbound-media.js";
import { getMSTeamsRuntime } from "../runtime.js";
export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
const {
@@ -64,6 +50,12 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
pollStore,
log,
} = deps;
const core = getMSTeamsRuntime();
const logVerboseMessage = (message: string) => {
if (core.logging.shouldLogVerbose()) {
log.debug(message);
}
};
const msteamsCfg = cfg.channels?.msteams;
const historyLimit = Math.max(
0,
@@ -72,7 +64,10 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
DEFAULT_GROUP_HISTORY_LIMIT,
);
const conversationHistories = new Map<string, HistoryEntry[]>();
const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "msteams" });
const inboundDebounceMs = core.channel.debounce.resolveInboundDebounceMs({
cfg,
channel: "msteams",
});
type MSTeamsDebounceEntry = {
context: MSTeamsTurnContext;
@@ -126,7 +121,9 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
const senderName = from.name ?? from.id;
const senderId = from.aadObjectId ?? from.id;
const storedAllowFrom = await readChannelAllowFromStore("msteams").catch(() => []);
const storedAllowFrom = await core.channel.pairing
.readAllowFromStore("msteams")
.catch(() => []);
const useAccessGroups = cfg.commands?.useAccessGroups !== false;
// Check DM policy for direct messages.
@@ -151,7 +148,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
if (!allowMatch.allowed) {
if (dmPolicy === "pairing") {
const request = await upsertChannelPairingRequest({
const request = await core.channel.pairing.upsertPairingRequest({
channel: "msteams",
id: senderId,
meta: { name: senderName },
@@ -254,15 +251,15 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
senderId,
senderName,
});
const commandAuthorized = resolveCommandAuthorizedFromAuthorizers({
const commandAuthorized = core.channel.commands.resolveCommandAuthorizedFromAuthorizers({
useAccessGroups,
authorizers: [
{ configured: effectiveDmAllowFrom.length > 0, allowed: ownerAllowedForCommands },
{ configured: effectiveGroupAllowFrom.length > 0, allowed: groupAllowedForCommands },
],
});
if (hasControlCommand(text, cfg) && !commandAuthorized) {
logVerbose(`msteams: drop control command from unauthorized sender ${senderId}`);
if (core.channel.text.hasControlCommand(text, cfg) && !commandAuthorized) {
logVerboseMessage(`msteams: drop control command from unauthorized sender ${senderId}`);
return;
}
@@ -329,7 +326,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
: `msteams:group:${conversationId}`;
const teamsTo = isDirectMessage ? `user:${senderId}` : `conversation:${conversationId}`;
const route = resolveAgentRoute({
const route = core.channel.routing.resolveAgentRoute({
cfg,
channel: "msteams",
peer: {
@@ -343,7 +340,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
? `Teams DM from ${senderName}`
: `Teams message in ${conversationType} from ${senderName}`;
enqueueSystemEvent(`${inboundLabel}: ${preview}`, {
core.system.enqueueSystemEvent(`${inboundLabel}: ${preview}`, {
sessionKey: route.sessionKey,
contextKey: `msteams:message:${conversationId}:${activity.id ?? "unknown"}`,
});
@@ -409,7 +406,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
const mediaPayload = buildMSTeamsMediaPayload(mediaList);
const envelopeFrom = isDirectMessage ? senderName : conversationType;
const body = formatAgentEnvelope({
const body = core.channel.reply.formatAgentEnvelope({
channel: "Teams",
from: envelopeFrom,
timestamp,
@@ -425,7 +422,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
limit: historyLimit,
currentMessage: combinedBody,
formatEntry: (entry) =>
formatAgentEnvelope({
core.channel.reply.formatAgentEnvelope({
channel: "Teams",
from: conversationType,
timestamp: entry.timestamp,
@@ -434,7 +431,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
});
}
const ctxPayload = finalizeInboundContext({
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: combinedBody,
RawBody: rawBody,
CommandBody: rawBody,
@@ -458,20 +455,18 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
...mediaPayload,
});
const storePath = resolveStorePath(cfg.session?.store, {
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
void recordSessionMetaFromInbound({
void core.channel.session.recordSessionMetaFromInbound({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
logVerbose(`msteams: failed updating session meta: ${String(err)}`);
logVerboseMessage(`msteams: failed updating session meta: ${String(err)}`);
});
if (shouldLogVerbose()) {
logVerbose(`msteams inbound: from=${ctxPayload.From} preview="${preview}"`);
}
logVerboseMessage(`msteams inbound: from=${ctxPayload.From} preview="${preview}"`);
const { dispatcher, replyOptions, markDispatchIdle } = createMSTeamsReplyDispatcher({
cfg,
@@ -493,7 +488,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
log.info("dispatching to agent", { sessionKey: route.sessionKey });
try {
const { queuedFinal, counts } = await dispatchReplyFromConfig({
const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
@@ -513,18 +508,16 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
}
return;
}
if (shouldLogVerbose()) {
const finalCount = counts.final;
logVerbose(
`msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`,
);
}
const finalCount = counts.final;
logVerboseMessage(
`msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`,
);
if (isRoomish && historyKey && historyLimit > 0) {
clearHistoryEntries({ historyMap: conversationHistories, historyKey });
}
} catch (err) {
log.error("dispatch failed", { error: String(err) });
runtime.error?.(danger(`msteams dispatch failed: ${String(err)}`));
runtime.error?.(`msteams dispatch failed: ${String(err)}`);
try {
await context.sendActivity(
`⚠️ Agent failed: ${err instanceof Error ? err.message : String(err)}`,
@@ -535,7 +528,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
}
};
const inboundDebouncer = createInboundDebouncer<MSTeamsDebounceEntry>({
const inboundDebouncer = core.channel.debounce.createInboundDebouncer<MSTeamsDebounceEntry>({
debounceMs: inboundDebounceMs,
buildKey: (entry) => {
const conversationId = normalizeMSTeamsConversationId(
@@ -549,7 +542,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
shouldDebounce: (entry) => {
if (!entry.text.trim()) return false;
if (entry.attachments.length > 0) return false;
return !hasControlCommand(entry.text, cfg);
return !core.channel.text.hasControlCommand(entry.text, cfg);
},
onFlush: async (entries) => {
const last = entries.at(-1);
@@ -579,7 +572,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
});
},
onError: (err) => {
runtime.error?.(danger(`msteams debounce flush failed: ${String(err)}`));
runtime.error?.(`msteams debounce flush failed: ${String(err)}`);
},
});