From 269a3c4000cae58f3953696c3d511ce0238a9baa Mon Sep 17 00:00:00 2001 From: Onur Date: Thu, 8 Jan 2026 03:22:16 +0300 Subject: [PATCH] feat(msteams): add outbound sends and fix reply delivery - Add sendMessageMSTeams for proactive messaging via CLI/gateway - Wire msteams into outbound delivery, heartbeat targets, and gateway send - Fix reply delivery to use SDK's getConversationReference() for proper bot info, avoiding "Activity Recipient undefined" errors - Use proactive messaging for replies to post as top-level messages (not threaded) by omitting activityId from conversation reference - Add lazy logger in send.ts to avoid test initialization issues --- src/cli/deps.ts | 3 + src/gateway/server-methods/send.ts | 21 +++ src/gateway/server.ts | 7 + src/infra/outbound/deliver.ts | 27 +++- src/infra/outbound/targets.ts | 18 ++- src/msteams/monitor.ts | 66 +++++++-- src/msteams/send.ts | 231 +++++++++++++++++++++++++++-- 7 files changed, 340 insertions(+), 33 deletions(-) diff --git a/src/cli/deps.ts b/src/cli/deps.ts index 41a1118d0..aab4366c2 100644 --- a/src/cli/deps.ts +++ b/src/cli/deps.ts @@ -1,5 +1,6 @@ import { sendMessageDiscord } from "../discord/send.js"; import { sendMessageIMessage } from "../imessage/send.js"; +import { sendMessageMSTeams } from "../msteams/send.js"; import { logWebSelfId, sendMessageWhatsApp } from "../providers/web/index.js"; import { sendMessageSignal } from "../signal/send.js"; import { sendMessageSlack } from "../slack/send.js"; @@ -12,6 +13,7 @@ export type CliDeps = { sendMessageSlack: typeof sendMessageSlack; sendMessageSignal: typeof sendMessageSignal; sendMessageIMessage: typeof sendMessageIMessage; + sendMessageMSTeams: typeof sendMessageMSTeams; }; export function createDefaultDeps(): CliDeps { @@ -22,6 +24,7 @@ export function createDefaultDeps(): CliDeps { sendMessageSlack, sendMessageSignal, sendMessageIMessage, + sendMessageMSTeams, }; } diff --git a/src/gateway/server-methods/send.ts b/src/gateway/server-methods/send.ts index 7da830bbf..ce0509122 100644 --- a/src/gateway/server-methods/send.ts +++ b/src/gateway/server-methods/send.ts @@ -2,6 +2,7 @@ import { loadConfig } from "../../config/config.js"; import { sendMessageDiscord, sendPollDiscord } from "../../discord/index.js"; import { shouldLogVerbose } from "../../globals.js"; import { sendMessageIMessage } from "../../imessage/index.js"; +import { sendMessageMSTeams } from "../../msteams/send.js"; import { sendMessageSignal } from "../../signal/index.js"; import { sendMessageSlack } from "../../slack/send.js"; import { sendMessageTelegram } from "../../telegram/send.js"; @@ -141,6 +142,26 @@ export const sendHandlers: GatewayRequestHandlers = { payload, }); respond(true, payload, undefined, { provider }); + } else if (provider === "msteams") { + const cfg = loadConfig(); + const result = await sendMessageMSTeams({ + cfg, + to, + text: message, + mediaUrl: request.mediaUrl, + }); + const payload = { + runId: idem, + messageId: result.messageId, + conversationId: result.conversationId, + provider, + }; + context.dedupe.set(`send:${idem}`, { + ts: Date.now(), + ok: true, + payload, + }); + respond(true, payload, undefined, { provider }); } else { const cfg = loadConfig(); const targetAccountId = diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 3c244d1ba..5b2b70a5e 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -1964,6 +1964,13 @@ export async function startGatewayServer( startIMessageProvider, ); } + if (plan.restartProviders.has("msteams")) { + await restartProvider( + "msteams", + stopMSTeamsProvider, + startMSTeamsProvider, + ); + } } } diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 7b915715d..f38576f5f 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -7,6 +7,7 @@ import type { ReplyPayload } from "../../auto-reply/types.js"; import type { ClawdbotConfig } from "../../config/config.js"; import { sendMessageDiscord } from "../../discord/send.js"; import { sendMessageIMessage } from "../../imessage/send.js"; +import { sendMessageMSTeams } from "../../msteams/send.js"; import { normalizeAccountId } from "../../routing/session-key.js"; import { sendMessageSignal } from "../../signal/send.js"; import { sendMessageSlack } from "../../slack/send.js"; @@ -28,6 +29,11 @@ export type OutboundSendDeps = { sendSlack?: typeof sendMessageSlack; sendSignal?: typeof sendMessageSignal; sendIMessage?: typeof sendMessageIMessage; + sendMSTeams?: ( + to: string, + text: string, + opts?: { mediaUrl?: string }, + ) => Promise<{ messageId: string; conversationId: string }>; }; export type OutboundDeliveryResult = @@ -36,7 +42,8 @@ export type OutboundDeliveryResult = | { provider: "discord"; messageId: string; channelId: string } | { provider: "slack"; messageId: string; channelId: string } | { provider: "signal"; messageId: string; timestamp?: number } - | { provider: "imessage"; messageId: string }; + | { provider: "imessage"; messageId: string } + | { provider: "msteams"; messageId: string; conversationId: string }; type Chunker = (text: string, limit: number) => string[]; @@ -50,6 +57,7 @@ const providerCaps: Record< slack: { chunker: null }, signal: { chunker: chunkText }, imessage: { chunker: chunkText }, + msteams: { chunker: chunkMarkdownText }, }; type ProviderHandler = { @@ -204,6 +212,17 @@ function createProviderHandler(params: { })), }), }, + msteams: { + chunker: providerCaps.msteams.chunker, + sendText: async (text) => ({ + provider: "msteams", + ...(await deps.sendMSTeams(to, text)), + }), + sendMedia: async (caption, mediaUrl) => ({ + provider: "msteams", + ...(await deps.sendMSTeams(to, caption, { mediaUrl })), + }), + }, }; return handlers[params.provider]; @@ -222,6 +241,11 @@ export async function deliverOutboundPayloads(params: { }): Promise { const { cfg, provider, to, payloads } = params; const accountId = params.accountId; + const defaultSendMSTeams = async ( + to: string, + text: string, + opts?: { mediaUrl?: string }, + ) => sendMessageMSTeams({ cfg, to, text, mediaUrl: opts?.mediaUrl }); const deps = { sendWhatsApp: params.deps?.sendWhatsApp ?? sendMessageWhatsApp, sendTelegram: params.deps?.sendTelegram ?? sendMessageTelegram, @@ -229,6 +253,7 @@ export async function deliverOutboundPayloads(params: { sendSlack: params.deps?.sendSlack ?? sendMessageSlack, sendSignal: params.deps?.sendSignal ?? sendMessageSignal, sendIMessage: params.deps?.sendIMessage ?? sendMessageIMessage, + sendMSTeams: params.deps?.sendMSTeams ?? defaultSendMSTeams, }; const results: OutboundDeliveryResult[] = []; const handler = createProviderHandler({ diff --git a/src/infra/outbound/targets.ts b/src/infra/outbound/targets.ts index 59328a4d0..6d526f851 100644 --- a/src/infra/outbound/targets.ts +++ b/src/infra/outbound/targets.ts @@ -9,6 +9,7 @@ export type OutboundProvider = | "slack" | "signal" | "imessage" + | "msteams" | "none"; export type HeartbeatTarget = OutboundProvider | "last"; @@ -31,6 +32,7 @@ export function resolveOutboundTarget(params: { | "slack" | "signal" | "imessage" + | "msteams" | "webchat"; to?: string; allowFrom?: string[]; @@ -104,6 +106,17 @@ export function resolveOutboundTarget(params: { } return { ok: true, to: trimmed }; } + if (params.provider === "msteams") { + if (!trimmed) { + return { + ok: false, + error: new Error( + "Delivering to MS Teams requires --to ", + ), + }; + } + return { ok: true, to: trimmed }; + } return { ok: false, error: new Error( @@ -125,6 +138,7 @@ export function resolveHeartbeatDeliveryTarget(params: { rawTarget === "slack" || rawTarget === "signal" || rawTarget === "imessage" || + rawTarget === "msteams" || rawTarget === "none" || rawTarget === "last" ? rawTarget @@ -152,6 +166,7 @@ export function resolveHeartbeatDeliveryTarget(params: { | "slack" | "signal" | "imessage" + | "msteams" | undefined = target === "last" ? lastProvider @@ -160,7 +175,8 @@ export function resolveHeartbeatDeliveryTarget(params: { target === "discord" || target === "slack" || target === "signal" || - target === "imessage" + target === "imessage" || + target === "msteams" ? target : undefined; diff --git a/src/msteams/monitor.ts b/src/msteams/monitor.ts index 0a8b0217b..f6d90fd4e 100644 --- a/src/msteams/monitor.ts +++ b/src/msteams/monitor.ts @@ -96,6 +96,7 @@ export async function monitorMSTeamsProvider( log.error("msteams credentials not configured"); return { app: null, shutdown: async () => {} }; } + const appId = creds.appId; // Extract for use in closures const runtime: RuntimeEnv = opts.runtime ?? { log: console.log, @@ -117,34 +118,74 @@ export async function monitorMSTeamsProvider( const { ActivityHandler, CloudAdapter, authorizeJWT, getAuthConfigWithDefaults } = agentsHosting; - // Helper to deliver replies via Teams SDK + // Auth configuration - create early so adapter is available for deliverReplies + const authConfig = getAuthConfigWithDefaults({ + clientId: creds.appId, + clientSecret: creds.appPassword, + tenantId: creds.tenantId, + }); + const adapter = new CloudAdapter(authConfig); + + // Helper to deliver replies as top-level messages (not threaded) + // We use proactive messaging to avoid threading to the original message async function deliverReplies(params: { replies: ReplyPayload[]; - context: TeamsTurnContext; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + context: any; // TurnContext from SDK - has activity.getConversationReference() + adapter: InstanceType; + appId: string; }) { const chunkLimit = Math.min(textLimit, 4000); + + // Get conversation reference from SDK's activity (includes proper bot info) + // Then remove activityId to avoid threading + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const fullRef = params.context.activity.getConversationReference() as any; + const conversationRef = { + ...fullRef, + activityId: undefined, // Remove to post as top-level message, not thread + }; + // Also strip the messageid suffix from conversation.id if present + if (conversationRef.conversation?.id) { + conversationRef.conversation = { + ...conversationRef.conversation, + id: conversationRef.conversation.id.split(";")[0], + }; + } + for (const payload of params.replies) { const mediaList = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); const text = payload.text ?? ""; if (!text && mediaList.length === 0) continue; + const sendMessage = async (message: string) => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await (params.adapter as any).continueConversation( + params.appId, + conversationRef, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + async (ctx: any) => { + await ctx.sendActivity({ type: "message", text: message }); + }, + ); + }; + if (mediaList.length === 0) { for (const chunk of chunkMarkdownText(text, chunkLimit)) { const trimmed = chunk.trim(); if (!trimmed || trimmed === SILENT_REPLY_TOKEN) continue; - await params.context.sendActivity(trimmed); + await sendMessage(trimmed); } } else { // For media, send text first then media URLs as separate messages if (text.trim() && text.trim() !== SILENT_REPLY_TOKEN) { for (const chunk of chunkMarkdownText(text, chunkLimit)) { - await params.context.sendActivity(chunk); + await sendMessage(chunk); } } for (const mediaUrl of mediaList) { - // Teams supports adaptive cards for rich media, but for now just send URL - await params.context.sendActivity(mediaUrl); + await sendMessage(mediaUrl); } } } @@ -377,6 +418,8 @@ export async function monitorMSTeamsProvider( await deliverReplies({ replies: [payload], context, + adapter, + appId, }); }, onError: (err, info) => { @@ -450,16 +493,7 @@ export async function monitorMSTeamsProvider( await next(); }); - // Auth configuration - use SDK's defaults merger - const authConfig = getAuthConfigWithDefaults({ - clientId: creds.appId, - clientSecret: creds.appPassword, - tenantId: creds.tenantId, - }); - - // Create our own Express server (instead of using startServer) so we can control shutdown - // Pass authConfig to CloudAdapter so it can authenticate outbound calls - const adapter = new CloudAdapter(authConfig); + // Create Express server const expressApp = express.default(); expressApp.use(express.json()); expressApp.use(authorizeJWT(authConfig)); diff --git a/src/msteams/send.ts b/src/msteams/send.ts index 3e62c75f7..0daf2a7c1 100644 --- a/src/msteams/send.ts +++ b/src/msteams/send.ts @@ -1,25 +1,226 @@ -import type { MSTeamsConfig } from "../config/types.js"; -import { getChildLogger } from "../logging.js"; +import type { ClawdbotConfig } from "../config/types.js"; +import type { getChildLogger as getChildLoggerFn } from "../logging.js"; +import { + getConversationReference, + listConversationReferences, + type StoredConversationReference, +} from "./conversation-store.js"; +import { resolveMSTeamsCredentials } from "./token.js"; -const log = getChildLogger({ name: "msteams:send" }); +// Lazy logger to avoid initialization order issues in tests +let _log: ReturnType | undefined; +const getLog = (): ReturnType => { + if (!_log) { + // Dynamic import to defer initialization + // eslint-disable-next-line @typescript-eslint/no-require-imports + const { getChildLogger } = require("../logging.js") as { + getChildLogger: typeof getChildLoggerFn; + }; + _log = getChildLogger({ name: "msteams:send" }); + } + return _log; +}; export type SendMSTeamsMessageParams = { - cfg: MSTeamsConfig; - conversationId: string; + /** Full config (for credentials) */ + cfg: ClawdbotConfig; + /** Conversation ID or user ID to send to */ + to: string; + /** Message text */ text: string; - serviceUrl: string; + /** Optional media URL */ + mediaUrl?: string; }; export type SendMSTeamsMessageResult = { - ok: boolean; - messageId?: string; - error?: string; + messageId: string; + conversationId: string; }; -export async function sendMessageMSTeams( - _params: SendMSTeamsMessageParams, -): Promise { - // TODO: Implement using CloudAdapter.continueConversationAsync - log.warn("sendMessageMSTeams not yet implemented"); - return { ok: false, error: "not implemented" }; +/** + * Parse the --to argument into a conversation reference lookup key. + * Supported formats: + * - conversation:19:abc@thread.tacv2 → lookup by conversation ID + * - user:aad-object-id → lookup by user AAD object ID + * - 19:abc@thread.tacv2 → direct conversation ID + */ +function parseRecipient(to: string): { + type: "conversation" | "user"; + id: string; +} { + const trimmed = to.trim(); + if (trimmed.startsWith("conversation:")) { + return { type: "conversation", id: trimmed.slice("conversation:".length) }; + } + if (trimmed.startsWith("user:")) { + return { type: "user", id: trimmed.slice("user:".length) }; + } + // Assume it's a conversation ID if it looks like one + if (trimmed.startsWith("19:") || trimmed.includes("@thread")) { + return { type: "conversation", id: trimmed }; + } + // Otherwise treat as user ID + return { type: "user", id: trimmed }; +} + +/** + * Find a stored conversation reference for the given recipient. + */ +async function findConversationReference( + recipient: { type: "conversation" | "user"; id: string }, +): Promise<{ conversationId: string; ref: StoredConversationReference } | null> { + if (recipient.type === "conversation") { + const ref = await getConversationReference(recipient.id); + if (ref) return { conversationId: recipient.id, ref }; + return null; + } + + // Search by user AAD object ID + const all = await listConversationReferences(); + for (const { conversationId, reference } of all) { + if (reference.user?.aadObjectId === recipient.id) { + return { conversationId, ref: reference }; + } + if (reference.user?.id === recipient.id) { + return { conversationId, ref: reference }; + } + } + return null; +} + +// Type matching @microsoft/agents-activity ConversationReference +type ConversationReferenceShape = { + activityId?: string; + user?: { id: string; name?: string }; + bot?: { id: string; name?: string }; + conversation: { id: string; conversationType?: string; tenantId?: string }; + channelId: string; + serviceUrl?: string; + locale?: string; +}; + +/** + * Build a Bot Framework ConversationReference from our stored format. + * Note: activityId is intentionally omitted so proactive messages post as + * top-level messages rather than replies/threads. + */ +function buildConversationReference( + ref: StoredConversationReference, +): ConversationReferenceShape { + if (!ref.conversation?.id) { + throw new Error("Invalid stored reference: missing conversation.id"); + } + return { + // activityId omitted to avoid creating reply threads + user: ref.user?.id ? { id: ref.user.id, name: ref.user.name } : undefined, + bot: ref.bot?.id ? { id: ref.bot.id, name: ref.bot.name } : undefined, + conversation: { + id: ref.conversation.id, + conversationType: ref.conversation.conversationType, + tenantId: ref.conversation.tenantId, + }, + channelId: ref.channelId ?? "msteams", + serviceUrl: ref.serviceUrl, + locale: ref.locale, + }; +} + +/** + * Send a message to a Teams conversation or user. + * + * Uses the stored ConversationReference from previous interactions. + * The bot must have received at least one message from the conversation + * before proactive messaging works. + */ +export async function sendMessageMSTeams( + params: SendMSTeamsMessageParams, +): Promise { + const { cfg, to, text, mediaUrl } = params; + const msteamsCfg = cfg.msteams; + + if (!msteamsCfg?.enabled) { + throw new Error("msteams provider is not enabled"); + } + + const creds = resolveMSTeamsCredentials(msteamsCfg); + if (!creds) { + throw new Error("msteams credentials not configured"); + } + + // Parse recipient and find conversation reference + const recipient = parseRecipient(to); + const found = await findConversationReference(recipient); + + if (!found) { + throw new Error( + `No conversation reference found for ${recipient.type}:${recipient.id}. ` + + `The bot must receive a message from this conversation before it can send proactively.`, + ); + } + + const { conversationId, ref } = found; + const conversationRef = buildConversationReference(ref); + + getLog().debug("sending proactive message", { + conversationId, + textLength: text.length, + hasMedia: Boolean(mediaUrl), + }); + + // Dynamic import to avoid loading SDK when not needed + const agentsHosting = await import("@microsoft/agents-hosting"); + const { CloudAdapter, getAuthConfigWithDefaults } = agentsHosting; + + const authConfig = getAuthConfigWithDefaults({ + clientId: creds.appId, + clientSecret: creds.appPassword, + tenantId: creds.tenantId, + }); + + const adapter = new CloudAdapter(authConfig); + + let messageId = "unknown"; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await (adapter as any).continueConversation( + creds.appId, + conversationRef, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + async (context: any) => { + // Build the activity + const activity = { + type: "message", + text: mediaUrl ? (text ? `${text}\n\n${mediaUrl}` : mediaUrl) : text, + }; + const response = await context.sendActivity(activity); + if (response?.id) { + messageId = response.id; + } + }, + ); + + getLog().info("sent proactive message", { conversationId, messageId }); + + return { + messageId, + conversationId, + }; +} + +/** + * List all known conversation references (for debugging/CLI). + */ +export async function listMSTeamsConversations(): Promise< + Array<{ + conversationId: string; + userName?: string; + conversationType?: string; + }> +> { + const all = await listConversationReferences(); + return all.map(({ conversationId, reference }) => ({ + conversationId, + userName: reference.user?.name, + conversationType: reference.conversation?.conversationType, + })); }