feat(msteams): add config reload, DM policy, proper shutdown

- Add msteams to config-reload.ts (ProviderKind, ReloadAction, rules)
- Add msteams to PairingProvider for pairing code support
- Create conversation-store.ts for storing ConversationReference
- Implement DM policy check (disabled/pairing/open/allowlist)
- Fix WasMentioned to check actual bot mentions via entities
- Fix server shutdown by using custom Express server with httpServer.close()
- Pass authConfig to CloudAdapter for outbound call authentication
- Improve error logging with JSON serialization
This commit is contained in:
Onur
2026-01-07 23:36:30 +03:00
committed by Peter Steinberger
parent 1c73d45106
commit e0812f8c4d
5 changed files with 288 additions and 28 deletions

View File

@@ -17,7 +17,8 @@ export type ProviderKind =
| "discord" | "discord"
| "slack" | "slack"
| "signal" | "signal"
| "imessage"; | "imessage"
| "msteams";
export type GatewayReloadPlan = { export type GatewayReloadPlan = {
changedPaths: string[]; changedPaths: string[];
@@ -50,7 +51,8 @@ type ReloadAction =
| "restart-provider:discord" | "restart-provider:discord"
| "restart-provider:slack" | "restart-provider:slack"
| "restart-provider:signal" | "restart-provider:signal"
| "restart-provider:imessage"; | "restart-provider:imessage"
| "restart-provider:msteams";
const DEFAULT_RELOAD_SETTINGS: GatewayReloadSettings = { const DEFAULT_RELOAD_SETTINGS: GatewayReloadSettings = {
mode: "hybrid", mode: "hybrid",
@@ -75,6 +77,7 @@ const RELOAD_RULES: ReloadRule[] = [
{ prefix: "slack", kind: "hot", actions: ["restart-provider:slack"] }, { prefix: "slack", kind: "hot", actions: ["restart-provider:slack"] },
{ prefix: "signal", kind: "hot", actions: ["restart-provider:signal"] }, { prefix: "signal", kind: "hot", actions: ["restart-provider:signal"] },
{ prefix: "imessage", kind: "hot", actions: ["restart-provider:imessage"] }, { prefix: "imessage", kind: "hot", actions: ["restart-provider:imessage"] },
{ prefix: "msteams", kind: "hot", actions: ["restart-provider:msteams"] },
{ prefix: "identity", kind: "none" }, { prefix: "identity", kind: "none" },
{ prefix: "wizard", kind: "none" }, { prefix: "wizard", kind: "none" },
{ prefix: "logging", kind: "none" }, { prefix: "logging", kind: "none" },
@@ -212,6 +215,9 @@ export function buildGatewayReloadPlan(
case "restart-provider:imessage": case "restart-provider:imessage":
plan.restartProviders.add("imessage"); plan.restartProviders.add("imessage");
break; break;
case "restart-provider:msteams":
plan.restartProviders.add("msteams");
break;
default: default:
break; break;
} }

View File

@@ -0,0 +1,122 @@
/**
* Conversation store for MS Teams proactive messaging.
*
* Stores ConversationReference objects keyed by conversation ID so we can
* send proactive messages later (after the webhook turn has completed).
*/
import fs from "node:fs";
import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
/** Minimal ConversationReference shape for proactive messaging */
export type StoredConversationReference = {
/** Activity ID from the last message */
activityId?: string;
/** User who sent the message */
user?: { id?: string; name?: string; aadObjectId?: string };
/** Bot that received the message */
bot?: { id?: string; name?: string };
/** Conversation details */
conversation?: { id?: string; conversationType?: string; tenantId?: string };
/** Channel ID (usually "msteams") */
channelId?: string;
/** Service URL for sending messages back */
serviceUrl?: string;
/** Locale */
locale?: string;
};
type ConversationStoreData = {
version: 1;
conversations: Record<string, StoredConversationReference>;
};
const STORE_FILENAME = "msteams-conversations.json";
const MAX_CONVERSATIONS = 1000;
function resolveStorePath(): string {
const stateDir = resolveStateDir(process.env);
return path.join(stateDir, STORE_FILENAME);
}
async function readStore(): Promise<ConversationStoreData> {
try {
const raw = await fs.promises.readFile(resolveStorePath(), "utf-8");
const data = JSON.parse(raw) as ConversationStoreData;
if (data.version !== 1) {
return { version: 1, conversations: {} };
}
return data;
} catch {
return { version: 1, conversations: {} };
}
}
async function writeStore(data: ConversationStoreData): Promise<void> {
const filePath = resolveStorePath();
const dir = path.dirname(filePath);
await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 });
await fs.promises.writeFile(filePath, JSON.stringify(data, null, 2), "utf-8");
}
/**
* Save a conversation reference for later proactive messaging.
*/
export async function saveConversationReference(
conversationId: string,
reference: StoredConversationReference,
): Promise<void> {
const store = await readStore();
// Prune if over limit (keep most recent)
const keys = Object.keys(store.conversations);
if (keys.length >= MAX_CONVERSATIONS) {
const toRemove = keys.slice(0, keys.length - MAX_CONVERSATIONS + 1);
for (const key of toRemove) {
delete store.conversations[key];
}
}
store.conversations[conversationId] = reference;
await writeStore(store);
}
/**
* Get a stored conversation reference.
*/
export async function getConversationReference(
conversationId: string,
): Promise<StoredConversationReference | null> {
const store = await readStore();
return store.conversations[conversationId] ?? null;
}
/**
* List all stored conversation references.
*/
export async function listConversationReferences(): Promise<
Array<{ conversationId: string; reference: StoredConversationReference }>
> {
const store = await readStore();
return Object.entries(store.conversations).map(
([conversationId, reference]) => ({
conversationId,
reference,
}),
);
}
/**
* Remove a conversation reference.
*/
export async function removeConversationReference(
conversationId: string,
): Promise<boolean> {
const store = await readStore();
if (!(conversationId in store.conversations)) return false;
delete store.conversations[conversationId];
await writeStore(store);
return true;
}

View File

@@ -11,8 +11,16 @@ import type { ClawdbotConfig } from "../config/types.js";
import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; import { danger, logVerbose, shouldLogVerbose } from "../globals.js";
import { enqueueSystemEvent } from "../infra/system-events.js"; import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js"; import { getChildLogger } from "../logging.js";
import {
readProviderAllowFromStore,
upsertProviderPairingRequest,
} from "../pairing/pairing-store.js";
import { resolveAgentRoute } from "../routing/resolve-route.js"; import { resolveAgentRoute } from "../routing/resolve-route.js";
import type { RuntimeEnv } from "../runtime.js"; import type { RuntimeEnv } from "../runtime.js";
import {
saveConversationReference,
type StoredConversationReference,
} from "./conversation-store.js";
import { resolveMSTeamsCredentials } from "./token.js"; import { resolveMSTeamsCredentials } from "./token.js";
const log = getChildLogger({ name: "msteams" }); const log = getChildLogger({ name: "msteams" });
@@ -44,6 +52,11 @@ type TeamsActivity = {
channelId?: string; channelId?: string;
serviceUrl?: string; serviceUrl?: string;
membersAdded?: Array<{ id?: string; name?: string }>; membersAdded?: Array<{ id?: string; name?: string }>;
/** Entities including mentions */
entities?: Array<{
type?: string;
mentioned?: { id?: string; name?: string };
}>;
}; };
type TeamsTurnContext = { type TeamsTurnContext = {
@@ -93,9 +106,10 @@ export async function monitorMSTeamsProvider(
// Dynamic import to avoid loading SDK when provider is disabled // Dynamic import to avoid loading SDK when provider is disabled
const agentsHosting = await import("@microsoft/agents-hosting"); const agentsHosting = await import("@microsoft/agents-hosting");
const { startServer } = await import("@microsoft/agents-hosting-express"); const express = await import("express");
const { ActivityHandler } = agentsHosting; const { ActivityHandler, CloudAdapter, authorizeJWT, getAuthConfigWithDefaults } =
agentsHosting;
// Helper to deliver replies via Teams SDK // Helper to deliver replies via Teams SDK
async function deliverReplies(params: { async function deliverReplies(params: {
@@ -136,6 +150,16 @@ export async function monitorMSTeamsProvider(
return text.replace(/<at>.*?<\/at>/gi, "").trim(); return text.replace(/<at>.*?<\/at>/gi, "").trim();
} }
// Check if the bot was mentioned in the activity
function wasBotMentioned(activity: TeamsActivity): boolean {
const botId = activity.recipient?.id;
if (!botId) return false;
const entities = activity.entities ?? [];
return entities.some(
(e) => e.type === "mention" && e.mentioned?.id === botId,
);
}
// Handler for incoming messages // Handler for incoming messages
async function handleTeamsMessage(context: TeamsTurnContext) { async function handleTeamsMessage(context: TeamsTurnContext) {
const activity = context.activity; const activity = context.activity;
@@ -172,6 +196,25 @@ export async function monitorMSTeamsProvider(
const senderName = from.name ?? from.id; const senderName = from.name ?? from.id;
const senderId = from.aadObjectId ?? from.id; const senderId = from.aadObjectId ?? from.id;
// Save conversation reference for proactive messaging
const conversationRef: StoredConversationReference = {
activityId: activity.id,
user: { id: from.id, name: from.name, aadObjectId: from.aadObjectId },
bot: activity.recipient
? { id: activity.recipient.id, name: activity.recipient.name }
: undefined,
conversation: {
id: conversationId,
conversationType,
tenantId: conversation?.tenantId,
},
channelId: activity.channelId,
serviceUrl: activity.serviceUrl,
};
saveConversationReference(conversationId, conversationRef).catch((err) => {
log.debug("failed to save conversation reference", { error: String(err) });
});
// Build Teams-specific identifiers // Build Teams-specific identifiers
const teamsFrom = isDirectMessage const teamsFrom = isDirectMessage
? `msteams:${senderId}` ? `msteams:${senderId}`
@@ -202,6 +245,49 @@ export async function monitorMSTeamsProvider(
contextKey: `msteams:message:${conversationId}:${activity.id ?? "unknown"}`, contextKey: `msteams:message:${conversationId}:${activity.id ?? "unknown"}`,
}); });
// Check DM policy for direct messages
if (isDirectMessage && msteamsCfg) {
const dmPolicy = msteamsCfg.dmPolicy ?? "pairing";
const allowFrom = msteamsCfg.allowFrom ?? [];
if (dmPolicy === "disabled") {
log.debug("dropping dm (dms disabled)");
return;
}
if (dmPolicy !== "open") {
// Check allowlist - look up from config and pairing store
const storedAllowFrom = await readProviderAllowFromStore("msteams");
const effectiveAllowFrom = [
...allowFrom.map((v) => String(v).toLowerCase()),
...storedAllowFrom.map((v) => v.toLowerCase()),
];
const senderLower = senderId.toLowerCase();
const permitted = effectiveAllowFrom.some(
(entry) => entry === senderLower || entry === "*",
);
if (!permitted) {
if (dmPolicy === "pairing") {
const { code, created } = await upsertProviderPairingRequest({
provider: "msteams",
id: senderId,
meta: { name: senderName },
});
const msg = created
? `👋 Hi ${senderName}! To chat with me, please share this pairing code with my owner: **${code}**`
: `🔑 Your pairing code is: **${code}** — please share it with my owner to get access.`;
await context.sendActivity(msg);
log.info("sent pairing code", { senderId, code });
} else {
log.debug("dropping unauthorized dm", { senderId, dmPolicy });
}
return;
}
}
}
// Format the message body with envelope // Format the message body with envelope
const timestamp = parseTimestamp(activity.timestamp); const timestamp = parseTimestamp(activity.timestamp);
const body = formatAgentEnvelope({ const body = formatAgentEnvelope({
@@ -226,7 +312,7 @@ export async function monitorMSTeamsProvider(
Surface: "msteams" as const, Surface: "msteams" as const,
MessageSid: activity.id, MessageSid: activity.id,
Timestamp: timestamp?.getTime() ?? Date.now(), Timestamp: timestamp?.getTime() ?? Date.now(),
WasMentioned: !isDirectMessage, WasMentioned: isDirectMessage || wasBotMentioned(activity),
CommandAuthorized: true, CommandAuthorized: true,
OriginatingChannel: "msteams" as const, OriginatingChannel: "msteams" as const,
OriginatingTo: teamsTo, OriginatingTo: teamsTo,
@@ -260,9 +346,16 @@ export async function monitorMSTeamsProvider(
}); });
}, },
onError: (err, info) => { onError: (err, info) => {
const errMsg =
err instanceof Error
? err.message
: typeof err === "object"
? JSON.stringify(err)
: String(err);
runtime.error?.( runtime.error?.(
danger(`msteams ${info.kind} reply failed: ${String(err)}`), danger(`msteams ${info.kind} reply failed: ${errMsg}`),
); );
log.error("reply failed", { kind: info.kind, error: err });
}, },
onReplyStart: sendTypingIndicator, onReplyStart: sendTypingIndicator,
}); });
@@ -323,28 +416,57 @@ export async function monitorMSTeamsProvider(
await next(); await next();
}); });
// Auth configuration using the new SDK format // Auth configuration - use SDK's defaults merger
const authConfig = { const authConfig = getAuthConfigWithDefaults({
clientId: creds.appId, clientId: creds.appId,
clientSecret: creds.appPassword, clientSecret: creds.appPassword,
tenantId: creds.tenantId, tenantId: creds.tenantId,
});
// Create our own Express server (instead of using startServer) so we can control shutdown
// Pass authConfig to CloudAdapter so it can authenticate outbound calls
const adapter = new CloudAdapter(authConfig);
const expressApp = express.default();
expressApp.use(express.json());
expressApp.use(authorizeJWT(authConfig));
// Set up the messages endpoint - use configured path and /api/messages as fallback
const configuredPath = msteamsCfg.webhook?.path ?? "/api/messages";
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const messageHandler = (req: any, res: any) => {
adapter.process(req, res, (context) => handler.run(context));
}; };
// Set env vars that startServer reads (it uses loadAuthConfigFromEnv internally) // Listen on configured path and /api/messages (standard Bot Framework path)
process.env.clientId = creds.appId; expressApp.post(configuredPath, messageHandler);
process.env.clientSecret = creds.appPassword; if (configuredPath !== "/api/messages") {
process.env.tenantId = creds.tenantId; expressApp.post("/api/messages", messageHandler);
process.env.PORT = String(port); }
// Start the server log.debug("listening on paths", {
const expressApp = startServer(handler, authConfig); primary: configuredPath,
fallback: "/api/messages",
});
log.info(`msteams provider started on port ${port}`); // Start listening and capture the HTTP server handle
const httpServer = expressApp.listen(port, () => {
log.info(`msteams provider started on port ${port}`);
});
httpServer.on("error", (err) => {
log.error("msteams server error", { error: String(err) });
});
const shutdown = async () => { const shutdown = async () => {
log.info("shutting down msteams provider"); log.info("shutting down msteams provider");
// Express app doesn't have a direct close method return new Promise<void>((resolve) => {
// The server is managed by startServer internally httpServer.close((err) => {
if (err) {
log.debug("msteams server close error", { error: String(err) });
}
resolve();
});
});
}; };
// Handle abort signal // Handle abort signal

View File

@@ -27,7 +27,8 @@ export type PairingProvider =
| "imessage" | "imessage"
| "discord" | "discord"
| "slack" | "slack"
| "whatsapp"; | "whatsapp"
| "msteams";
export type PairingRequest = { export type PairingRequest = {
id: string; id: string;
@@ -189,6 +190,7 @@ function normalizeAllowEntry(provider: PairingProvider, entry: string): string {
if (provider === "signal") return trimmed.replace(/^signal:/i, ""); if (provider === "signal") return trimmed.replace(/^signal:/i, "");
if (provider === "discord") return trimmed.replace(/^(discord|user):/i, ""); if (provider === "discord") return trimmed.replace(/^(discord|user):/i, "");
if (provider === "slack") return trimmed.replace(/^(slack|user):/i, ""); if (provider === "slack") return trimmed.replace(/^(slack|user):/i, "");
if (provider === "msteams") return trimmed.replace(/^(msteams|user):/i, "");
return trimmed; return trimmed;
} }

View File

@@ -840,11 +840,17 @@ Initial recommendation: support this type first; treat other attachment types as
- **Tailscale Funnel**: Must be running separately (`tailscale funnel 3978`) - doesn't work well as background task - **Tailscale Funnel**: Must be running separately (`tailscale funnel 3978`) - doesn't work well as background task
- **Auth errors (401)**: Expected when testing manually without Azure JWT - means endpoint is reachable - **Auth errors (401)**: Expected when testing manually without Azure JWT - means endpoint is reachable
### In Progress (2026-01-07 - Session 2) ### Completed (2026-01-07 - Session 2)
6.**Agent dispatch (sync)**: Wired inbound messages to `dispatchReplyFromConfig()` - replies sent via `context.sendActivity()` within turn 6.**Agent dispatch (sync)**: Wired inbound messages to `dispatchReplyFromConfig()` - replies sent via `context.sendActivity()` within turn
7.**Typing indicator**: Added typing indicator support via `sendActivities([{ type: "typing" }])` 7.**Typing indicator**: Added typing indicator support via `sendActivities([{ type: "typing" }])`
8.**Type system updates**: Added `msteams` to `TextChunkProvider`, `OriginatingChannelType`, and route-reply switch 8.**Type system updates**: Added `msteams` to `TextChunkProvider`, `OriginatingChannelType`, and route-reply switch
9.**@mention stripping**: Strip `<at>...</at>` HTML tags from message text
10.**Session key fix**: Remove `;messageid=...` suffix from conversation ID
11.**Config reload**: Added msteams to `config-reload.ts` (ProviderKind, ReloadAction, RELOAD_RULES)
12.**Pairing support**: Added msteams to PairingProvider type
13.**Conversation store**: Created `src/msteams/conversation-store.ts` for storing ConversationReference
14.**DM policy**: Implemented DM policy check with pairing support (disabled/pairing/open/allowlist)
### Implementation Notes ### Implementation Notes
@@ -868,13 +874,15 @@ await dispatchReplyFromConfig({ ctx: ctxPayload, cfg, dispatcher, replyOptions }
- `To`: `user:<userId>` (DM) or `conversation:<conversationId>` (group/channel) - `To`: `user:<userId>` (DM) or `conversation:<conversationId>` (group/channel)
- `ChatType`: `"direct"` | `"group"` | `"room"` based on conversation type - `ChatType`: `"direct"` | `"group"` | `"room"` based on conversation type
**DM Policy:**
- `dmPolicy: "disabled"` - Drop all DMs
- `dmPolicy: "open"` - Allow all DMs
- `dmPolicy: "pairing"` (default) - Require pairing code approval
- `dmPolicy: "allowlist"` - Only allow from `allowFrom` list
### Remaining ### Remaining
9. **Test full agent flow**: Send message in Teams → verify agent responds (not just echo) 15. **Proactive messaging**: For slow LLM responses, use stored ConversationReference to send async replies
10. **Conversation store**: Persist `ConversationReference` by `conversation.id` for proactive messaging 16. **Outbound CLI/gateway sends**: Implement `sendMessageMSTeams` properly; wire `clawdbot send --provider msteams`
11. **Proactive messaging**: For slow LLM responses, store reference and send replies asynchronously 17. **Media**: Implement inbound attachment download and outbound strategy
12. **Access control**: Implement DM policy + pairing (reuse existing pairing store) + mention gating in channels 18. **Docs + UI + Onboard**: Write `docs/providers/msteams.md`, add UI config form, update `clawdbot onboard`
13. **Config reload**: Add msteams to `config-reload.ts` restart rules
14. **Outbound CLI/gateway sends**: Implement `sendMessageMSTeams` properly; wire `clawdbot send --provider msteams`
15. **Media**: Implement inbound attachment download and outbound strategy
16. **Docs + UI + Onboard**: Write `docs/providers/msteams.md`, add UI config form, update `clawdbot onboard`