From e0812f8c4d7c516a0c8ffd1d02475ef9c5e56c92 Mon Sep 17 00:00:00 2001 From: Onur Date: Wed, 7 Jan 2026 23:36:30 +0300 Subject: [PATCH] feat(msteams): add config reload, DM policy, proper shutdown - Add msteams to config-reload.ts (ProviderKind, ReloadAction, rules) - Add msteams to PairingProvider for pairing code support - Create conversation-store.ts for storing ConversationReference - Implement DM policy check (disabled/pairing/open/allowlist) - Fix WasMentioned to check actual bot mentions via entities - Fix server shutdown by using custom Express server with httpServer.close() - Pass authConfig to CloudAdapter for outbound call authentication - Improve error logging with JSON serialization --- src/gateway/config-reload.ts | 10 +- src/msteams/conversation-store.ts | 122 ++++++++++++++++++++++ src/msteams/monitor.ts | 154 +++++++++++++++++++++++++--- src/pairing/pairing-store.ts | 4 +- tmp/msteams-implementation-guide.md | 26 +++-- 5 files changed, 288 insertions(+), 28 deletions(-) create mode 100644 src/msteams/conversation-store.ts diff --git a/src/gateway/config-reload.ts b/src/gateway/config-reload.ts index 72b459d4e..65303873a 100644 --- a/src/gateway/config-reload.ts +++ b/src/gateway/config-reload.ts @@ -17,7 +17,8 @@ export type ProviderKind = | "discord" | "slack" | "signal" - | "imessage"; + | "imessage" + | "msteams"; export type GatewayReloadPlan = { changedPaths: string[]; @@ -50,7 +51,8 @@ type ReloadAction = | "restart-provider:discord" | "restart-provider:slack" | "restart-provider:signal" - | "restart-provider:imessage"; + | "restart-provider:imessage" + | "restart-provider:msteams"; const DEFAULT_RELOAD_SETTINGS: GatewayReloadSettings = { mode: "hybrid", @@ -75,6 +77,7 @@ const RELOAD_RULES: ReloadRule[] = [ { prefix: "slack", kind: "hot", actions: ["restart-provider:slack"] }, { prefix: "signal", kind: "hot", actions: ["restart-provider:signal"] }, { prefix: "imessage", kind: "hot", actions: ["restart-provider:imessage"] }, + { prefix: "msteams", kind: "hot", actions: ["restart-provider:msteams"] }, { prefix: "identity", kind: "none" }, { prefix: "wizard", kind: "none" }, { prefix: "logging", kind: "none" }, @@ -212,6 +215,9 @@ export function buildGatewayReloadPlan( case "restart-provider:imessage": plan.restartProviders.add("imessage"); break; + case "restart-provider:msteams": + plan.restartProviders.add("msteams"); + break; default: break; } diff --git a/src/msteams/conversation-store.ts b/src/msteams/conversation-store.ts new file mode 100644 index 000000000..d1463d521 --- /dev/null +++ b/src/msteams/conversation-store.ts @@ -0,0 +1,122 @@ +/** + * Conversation store for MS Teams proactive messaging. + * + * Stores ConversationReference objects keyed by conversation ID so we can + * send proactive messages later (after the webhook turn has completed). + */ + +import fs from "node:fs"; +import path from "node:path"; + +import { resolveStateDir } from "../config/paths.js"; + +/** Minimal ConversationReference shape for proactive messaging */ +export type StoredConversationReference = { + /** Activity ID from the last message */ + activityId?: string; + /** User who sent the message */ + user?: { id?: string; name?: string; aadObjectId?: string }; + /** Bot that received the message */ + bot?: { id?: string; name?: string }; + /** Conversation details */ + conversation?: { id?: string; conversationType?: string; tenantId?: string }; + /** Channel ID (usually "msteams") */ + channelId?: string; + /** Service URL for sending messages back */ + serviceUrl?: string; + /** Locale */ + locale?: string; +}; + +type ConversationStoreData = { + version: 1; + conversations: Record; +}; + +const STORE_FILENAME = "msteams-conversations.json"; +const MAX_CONVERSATIONS = 1000; + +function resolveStorePath(): string { + const stateDir = resolveStateDir(process.env); + return path.join(stateDir, STORE_FILENAME); +} + +async function readStore(): Promise { + try { + const raw = await fs.promises.readFile(resolveStorePath(), "utf-8"); + const data = JSON.parse(raw) as ConversationStoreData; + if (data.version !== 1) { + return { version: 1, conversations: {} }; + } + return data; + } catch { + return { version: 1, conversations: {} }; + } +} + +async function writeStore(data: ConversationStoreData): Promise { + const filePath = resolveStorePath(); + const dir = path.dirname(filePath); + await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 }); + await fs.promises.writeFile(filePath, JSON.stringify(data, null, 2), "utf-8"); +} + +/** + * Save a conversation reference for later proactive messaging. + */ +export async function saveConversationReference( + conversationId: string, + reference: StoredConversationReference, +): Promise { + const store = await readStore(); + + // Prune if over limit (keep most recent) + const keys = Object.keys(store.conversations); + if (keys.length >= MAX_CONVERSATIONS) { + const toRemove = keys.slice(0, keys.length - MAX_CONVERSATIONS + 1); + for (const key of toRemove) { + delete store.conversations[key]; + } + } + + store.conversations[conversationId] = reference; + await writeStore(store); +} + +/** + * Get a stored conversation reference. + */ +export async function getConversationReference( + conversationId: string, +): Promise { + const store = await readStore(); + return store.conversations[conversationId] ?? null; +} + +/** + * List all stored conversation references. + */ +export async function listConversationReferences(): Promise< + Array<{ conversationId: string; reference: StoredConversationReference }> +> { + const store = await readStore(); + return Object.entries(store.conversations).map( + ([conversationId, reference]) => ({ + conversationId, + reference, + }), + ); +} + +/** + * Remove a conversation reference. + */ +export async function removeConversationReference( + conversationId: string, +): Promise { + const store = await readStore(); + if (!(conversationId in store.conversations)) return false; + delete store.conversations[conversationId]; + await writeStore(store); + return true; +} diff --git a/src/msteams/monitor.ts b/src/msteams/monitor.ts index 6b624f4ea..278073659 100644 --- a/src/msteams/monitor.ts +++ b/src/msteams/monitor.ts @@ -11,8 +11,16 @@ import type { ClawdbotConfig } from "../config/types.js"; import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { getChildLogger } from "../logging.js"; +import { + readProviderAllowFromStore, + upsertProviderPairingRequest, +} from "../pairing/pairing-store.js"; import { resolveAgentRoute } from "../routing/resolve-route.js"; import type { RuntimeEnv } from "../runtime.js"; +import { + saveConversationReference, + type StoredConversationReference, +} from "./conversation-store.js"; import { resolveMSTeamsCredentials } from "./token.js"; const log = getChildLogger({ name: "msteams" }); @@ -44,6 +52,11 @@ type TeamsActivity = { channelId?: string; serviceUrl?: string; membersAdded?: Array<{ id?: string; name?: string }>; + /** Entities including mentions */ + entities?: Array<{ + type?: string; + mentioned?: { id?: string; name?: string }; + }>; }; type TeamsTurnContext = { @@ -93,9 +106,10 @@ export async function monitorMSTeamsProvider( // Dynamic import to avoid loading SDK when provider is disabled const agentsHosting = await import("@microsoft/agents-hosting"); - const { startServer } = await import("@microsoft/agents-hosting-express"); + const express = await import("express"); - const { ActivityHandler } = agentsHosting; + const { ActivityHandler, CloudAdapter, authorizeJWT, getAuthConfigWithDefaults } = + agentsHosting; // Helper to deliver replies via Teams SDK async function deliverReplies(params: { @@ -136,6 +150,16 @@ export async function monitorMSTeamsProvider( return text.replace(/.*?<\/at>/gi, "").trim(); } + // Check if the bot was mentioned in the activity + function wasBotMentioned(activity: TeamsActivity): boolean { + const botId = activity.recipient?.id; + if (!botId) return false; + const entities = activity.entities ?? []; + return entities.some( + (e) => e.type === "mention" && e.mentioned?.id === botId, + ); + } + // Handler for incoming messages async function handleTeamsMessage(context: TeamsTurnContext) { const activity = context.activity; @@ -172,6 +196,25 @@ export async function monitorMSTeamsProvider( const senderName = from.name ?? from.id; const senderId = from.aadObjectId ?? from.id; + // Save conversation reference for proactive messaging + const conversationRef: StoredConversationReference = { + activityId: activity.id, + user: { id: from.id, name: from.name, aadObjectId: from.aadObjectId }, + bot: activity.recipient + ? { id: activity.recipient.id, name: activity.recipient.name } + : undefined, + conversation: { + id: conversationId, + conversationType, + tenantId: conversation?.tenantId, + }, + channelId: activity.channelId, + serviceUrl: activity.serviceUrl, + }; + saveConversationReference(conversationId, conversationRef).catch((err) => { + log.debug("failed to save conversation reference", { error: String(err) }); + }); + // Build Teams-specific identifiers const teamsFrom = isDirectMessage ? `msteams:${senderId}` @@ -202,6 +245,49 @@ export async function monitorMSTeamsProvider( contextKey: `msteams:message:${conversationId}:${activity.id ?? "unknown"}`, }); + // Check DM policy for direct messages + if (isDirectMessage && msteamsCfg) { + const dmPolicy = msteamsCfg.dmPolicy ?? "pairing"; + const allowFrom = msteamsCfg.allowFrom ?? []; + + if (dmPolicy === "disabled") { + log.debug("dropping dm (dms disabled)"); + return; + } + + if (dmPolicy !== "open") { + // Check allowlist - look up from config and pairing store + const storedAllowFrom = await readProviderAllowFromStore("msteams"); + const effectiveAllowFrom = [ + ...allowFrom.map((v) => String(v).toLowerCase()), + ...storedAllowFrom.map((v) => v.toLowerCase()), + ]; + + const senderLower = senderId.toLowerCase(); + const permitted = effectiveAllowFrom.some( + (entry) => entry === senderLower || entry === "*", + ); + + if (!permitted) { + if (dmPolicy === "pairing") { + const { code, created } = await upsertProviderPairingRequest({ + provider: "msteams", + id: senderId, + meta: { name: senderName }, + }); + const msg = created + ? `👋 Hi ${senderName}! To chat with me, please share this pairing code with my owner: **${code}**` + : `🔑 Your pairing code is: **${code}** — please share it with my owner to get access.`; + await context.sendActivity(msg); + log.info("sent pairing code", { senderId, code }); + } else { + log.debug("dropping unauthorized dm", { senderId, dmPolicy }); + } + return; + } + } + } + // Format the message body with envelope const timestamp = parseTimestamp(activity.timestamp); const body = formatAgentEnvelope({ @@ -226,7 +312,7 @@ export async function monitorMSTeamsProvider( Surface: "msteams" as const, MessageSid: activity.id, Timestamp: timestamp?.getTime() ?? Date.now(), - WasMentioned: !isDirectMessage, + WasMentioned: isDirectMessage || wasBotMentioned(activity), CommandAuthorized: true, OriginatingChannel: "msteams" as const, OriginatingTo: teamsTo, @@ -260,9 +346,16 @@ export async function monitorMSTeamsProvider( }); }, onError: (err, info) => { + const errMsg = + err instanceof Error + ? err.message + : typeof err === "object" + ? JSON.stringify(err) + : String(err); runtime.error?.( - danger(`msteams ${info.kind} reply failed: ${String(err)}`), + danger(`msteams ${info.kind} reply failed: ${errMsg}`), ); + log.error("reply failed", { kind: info.kind, error: err }); }, onReplyStart: sendTypingIndicator, }); @@ -323,28 +416,57 @@ export async function monitorMSTeamsProvider( await next(); }); - // Auth configuration using the new SDK format - const authConfig = { + // Auth configuration - use SDK's defaults merger + const authConfig = getAuthConfigWithDefaults({ clientId: creds.appId, clientSecret: creds.appPassword, tenantId: creds.tenantId, + }); + + // Create our own Express server (instead of using startServer) so we can control shutdown + // Pass authConfig to CloudAdapter so it can authenticate outbound calls + const adapter = new CloudAdapter(authConfig); + const expressApp = express.default(); + expressApp.use(express.json()); + expressApp.use(authorizeJWT(authConfig)); + + // Set up the messages endpoint - use configured path and /api/messages as fallback + const configuredPath = msteamsCfg.webhook?.path ?? "/api/messages"; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const messageHandler = (req: any, res: any) => { + adapter.process(req, res, (context) => handler.run(context)); }; - // Set env vars that startServer reads (it uses loadAuthConfigFromEnv internally) - process.env.clientId = creds.appId; - process.env.clientSecret = creds.appPassword; - process.env.tenantId = creds.tenantId; - process.env.PORT = String(port); + // Listen on configured path and /api/messages (standard Bot Framework path) + expressApp.post(configuredPath, messageHandler); + if (configuredPath !== "/api/messages") { + expressApp.post("/api/messages", messageHandler); + } - // Start the server - const expressApp = startServer(handler, authConfig); + log.debug("listening on paths", { + primary: configuredPath, + fallback: "/api/messages", + }); - log.info(`msteams provider started on port ${port}`); + // Start listening and capture the HTTP server handle + const httpServer = expressApp.listen(port, () => { + log.info(`msteams provider started on port ${port}`); + }); + + httpServer.on("error", (err) => { + log.error("msteams server error", { error: String(err) }); + }); const shutdown = async () => { log.info("shutting down msteams provider"); - // Express app doesn't have a direct close method - // The server is managed by startServer internally + return new Promise((resolve) => { + httpServer.close((err) => { + if (err) { + log.debug("msteams server close error", { error: String(err) }); + } + resolve(); + }); + }); }; // Handle abort signal diff --git a/src/pairing/pairing-store.ts b/src/pairing/pairing-store.ts index f7428467b..718a7cedd 100644 --- a/src/pairing/pairing-store.ts +++ b/src/pairing/pairing-store.ts @@ -27,7 +27,8 @@ export type PairingProvider = | "imessage" | "discord" | "slack" - | "whatsapp"; + | "whatsapp" + | "msteams"; export type PairingRequest = { id: string; @@ -189,6 +190,7 @@ function normalizeAllowEntry(provider: PairingProvider, entry: string): string { if (provider === "signal") return trimmed.replace(/^signal:/i, ""); if (provider === "discord") return trimmed.replace(/^(discord|user):/i, ""); if (provider === "slack") return trimmed.replace(/^(slack|user):/i, ""); + if (provider === "msteams") return trimmed.replace(/^(msteams|user):/i, ""); return trimmed; } diff --git a/tmp/msteams-implementation-guide.md b/tmp/msteams-implementation-guide.md index 04a0889c5..f39052631 100644 --- a/tmp/msteams-implementation-guide.md +++ b/tmp/msteams-implementation-guide.md @@ -840,11 +840,17 @@ Initial recommendation: support this type first; treat other attachment types as - **Tailscale Funnel**: Must be running separately (`tailscale funnel 3978`) - doesn't work well as background task - **Auth errors (401)**: Expected when testing manually without Azure JWT - means endpoint is reachable -### In Progress (2026-01-07 - Session 2) +### Completed (2026-01-07 - Session 2) 6. ✅ **Agent dispatch (sync)**: Wired inbound messages to `dispatchReplyFromConfig()` - replies sent via `context.sendActivity()` within turn 7. ✅ **Typing indicator**: Added typing indicator support via `sendActivities([{ type: "typing" }])` 8. ✅ **Type system updates**: Added `msteams` to `TextChunkProvider`, `OriginatingChannelType`, and route-reply switch +9. ✅ **@mention stripping**: Strip `...` HTML tags from message text +10. ✅ **Session key fix**: Remove `;messageid=...` suffix from conversation ID +11. ✅ **Config reload**: Added msteams to `config-reload.ts` (ProviderKind, ReloadAction, RELOAD_RULES) +12. ✅ **Pairing support**: Added msteams to PairingProvider type +13. ✅ **Conversation store**: Created `src/msteams/conversation-store.ts` for storing ConversationReference +14. ✅ **DM policy**: Implemented DM policy check with pairing support (disabled/pairing/open/allowlist) ### Implementation Notes @@ -868,13 +874,15 @@ await dispatchReplyFromConfig({ ctx: ctxPayload, cfg, dispatcher, replyOptions } - `To`: `user:` (DM) or `conversation:` (group/channel) - `ChatType`: `"direct"` | `"group"` | `"room"` based on conversation type +**DM Policy:** +- `dmPolicy: "disabled"` - Drop all DMs +- `dmPolicy: "open"` - Allow all DMs +- `dmPolicy: "pairing"` (default) - Require pairing code approval +- `dmPolicy: "allowlist"` - Only allow from `allowFrom` list + ### Remaining -9. **Test full agent flow**: Send message in Teams → verify agent responds (not just echo) -10. **Conversation store**: Persist `ConversationReference` by `conversation.id` for proactive messaging -11. **Proactive messaging**: For slow LLM responses, store reference and send replies asynchronously -12. **Access control**: Implement DM policy + pairing (reuse existing pairing store) + mention gating in channels -13. **Config reload**: Add msteams to `config-reload.ts` restart rules -14. **Outbound CLI/gateway sends**: Implement `sendMessageMSTeams` properly; wire `clawdbot send --provider msteams` -15. **Media**: Implement inbound attachment download and outbound strategy -16. **Docs + UI + Onboard**: Write `docs/providers/msteams.md`, add UI config form, update `clawdbot onboard` +15. **Proactive messaging**: For slow LLM responses, use stored ConversationReference to send async replies +16. **Outbound CLI/gateway sends**: Implement `sendMessageMSTeams` properly; wire `clawdbot send --provider msteams` +17. **Media**: Implement inbound attachment download and outbound strategy +18. **Docs + UI + Onboard**: Write `docs/providers/msteams.md`, add UI config form, update `clawdbot onboard`