refactor(msteams): split monitor handler and poll store

This commit is contained in:
Peter Steinberger
2026-01-09 10:27:06 +01:00
parent 475d598ecb
commit 8875dbd449
8 changed files with 819 additions and 583 deletions

View File

@@ -155,6 +155,20 @@ clawdbot message poll --provider discord \
--poll-multi --poll-duration-hours 48
```
Send a Teams proactive message:
```
clawdbot message send --provider msteams \
--to conversation:19:abc@thread.tacv2 --message "hi"
```
Create a Teams poll:
```
clawdbot message poll --provider msteams \
--to conversation:19:abc@thread.tacv2 \
--poll-question "Lunch?" \
--poll-option Pizza --poll-option Sushi
```
React in Slack:
```
clawdbot message react --provider slack \

View File

@@ -0,0 +1,77 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { sendMessage, sendPoll } from "./message.js";
const callGatewayMock = vi.fn();
vi.mock("../../gateway/call.js", () => ({
callGateway: (...args: unknown[]) => callGatewayMock(...args),
randomIdempotencyKey: () => "idem-1",
}));
describe("sendMessage provider normalization", () => {
beforeEach(() => {
callGatewayMock.mockReset();
});
it("normalizes Teams alias", async () => {
const sendMSTeams = vi.fn(async () => ({
messageId: "m1",
conversationId: "c1",
}));
const result = await sendMessage({
cfg: {},
to: "conversation:19:abc@thread.tacv2",
content: "hi",
provider: "teams",
deps: { sendMSTeams },
});
expect(sendMSTeams).toHaveBeenCalledWith(
"conversation:19:abc@thread.tacv2",
"hi",
);
expect(result.provider).toBe("msteams");
});
it("normalizes iMessage alias", async () => {
const sendIMessage = vi.fn(async () => ({ messageId: "i1" }));
const result = await sendMessage({
cfg: {},
to: "someone@example.com",
content: "hi",
provider: "imsg",
deps: { sendIMessage },
});
expect(sendIMessage).toHaveBeenCalledWith(
"someone@example.com",
"hi",
expect.any(Object),
);
expect(result.provider).toBe("imessage");
});
});
describe("sendPoll provider normalization", () => {
beforeEach(() => {
callGatewayMock.mockReset();
});
it("normalizes Teams alias for polls", async () => {
callGatewayMock.mockResolvedValueOnce({ messageId: "p1" });
const result = await sendPoll({
cfg: {},
to: "conversation:19:abc@thread.tacv2",
question: "Lunch?",
options: ["Pizza", "Sushi"],
provider: "Teams",
});
const call = callGatewayMock.mock.calls[0]?.[0] as {
params?: Record<string, unknown>;
};
expect(call?.params?.provider).toBe("msteams");
expect(result.provider).toBe("msteams");
});
});

View File

@@ -0,0 +1,547 @@
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js";
import type { ClawdbotConfig } from "../config/types.js";
import { danger, logVerbose, shouldLogVerbose } from "../globals.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import {
readProviderAllowFromStore,
upsertProviderPairingRequest,
} from "../pairing/pairing-store.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import type { RuntimeEnv } from "../runtime.js";
import {
buildMSTeamsAttachmentPlaceholder,
buildMSTeamsGraphMessageUrls,
buildMSTeamsMediaPayload,
downloadMSTeamsGraphMedia,
downloadMSTeamsImageAttachments,
type MSTeamsAttachmentLike,
summarizeMSTeamsHtmlAttachments,
} from "./attachments.js";
import type {
MSTeamsConversationStore,
StoredConversationReference,
} from "./conversation-store.js";
import {
classifyMSTeamsSendError,
formatMSTeamsSendErrorHint,
formatUnknownError,
} from "./errors.js";
import {
extractMSTeamsConversationMessageId,
normalizeMSTeamsConversationId,
parseMSTeamsActivityTimestamp,
stripMSTeamsMentionTags,
wasMSTeamsBotMentioned,
} from "./inbound.js";
import {
type MSTeamsAdapter,
renderReplyPayloadsToMessages,
sendMSTeamsMessages,
} from "./messenger.js";
import {
resolveMSTeamsReplyPolicy,
resolveMSTeamsRouteConfig,
} from "./policy.js";
import { extractMSTeamsPollVote, type MSTeamsPollStore } from "./polls.js";
import type { MSTeamsTurnContext } from "./sdk-types.js";
export type MSTeamsMonitorLogger = {
debug: (message: string, meta?: Record<string, unknown>) => void;
info: (message: string, meta?: Record<string, unknown>) => void;
error: (message: string, meta?: Record<string, unknown>) => void;
};
export type MSTeamsAccessTokenProvider = {
getAccessToken: (scope: string) => Promise<string>;
};
export type MSTeamsActivityHandler = {
onMessage: (
handler: (context: unknown, next: () => Promise<void>) => Promise<void>,
) => MSTeamsActivityHandler;
onMembersAdded: (
handler: (context: unknown, next: () => Promise<void>) => Promise<void>,
) => MSTeamsActivityHandler;
};
export type MSTeamsMessageHandlerDeps = {
cfg: ClawdbotConfig;
runtime: RuntimeEnv;
appId: string;
adapter: MSTeamsAdapter;
tokenProvider: MSTeamsAccessTokenProvider;
textLimit: number;
mediaMaxBytes: number;
conversationStore: MSTeamsConversationStore;
pollStore: MSTeamsPollStore;
log: MSTeamsMonitorLogger;
};
export function registerMSTeamsHandlers<T extends MSTeamsActivityHandler>(
handler: T,
deps: MSTeamsMessageHandlerDeps,
): T {
const handleTeamsMessage = createMSTeamsMessageHandler(deps);
return handler
.onMessage(async (context, next) => {
try {
await handleTeamsMessage(context as MSTeamsTurnContext);
} catch (err) {
deps.runtime.error?.(danger(`msteams handler failed: ${String(err)}`));
}
await next();
})
.onMembersAdded(async (context, next) => {
const membersAdded =
(context as MSTeamsTurnContext).activity?.membersAdded ?? [];
for (const member of membersAdded) {
if (
member.id !== (context as MSTeamsTurnContext).activity?.recipient?.id
) {
deps.log.debug("member added", { member: member.id });
// Don't send welcome message - let the user initiate conversation.
}
}
await next();
});
}
function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
const {
cfg,
runtime,
appId,
adapter,
tokenProvider,
textLimit,
mediaMaxBytes,
conversationStore,
pollStore,
log,
} = deps;
const msteamsCfg = cfg.msteams;
return async function handleTeamsMessage(context: MSTeamsTurnContext) {
const activity = context.activity;
const rawText = activity.text?.trim() ?? "";
const text = stripMSTeamsMentionTags(rawText);
const attachments = Array.isArray(activity.attachments)
? (activity.attachments as unknown as MSTeamsAttachmentLike[])
: [];
const attachmentPlaceholder =
buildMSTeamsAttachmentPlaceholder(attachments);
const rawBody = text || attachmentPlaceholder;
const from = activity.from;
const conversation = activity.conversation;
const attachmentTypes = attachments
.map((att) =>
typeof att.contentType === "string" ? att.contentType : undefined,
)
.filter(Boolean)
.slice(0, 3);
const htmlSummary = summarizeMSTeamsHtmlAttachments(attachments);
log.info("received message", {
rawText: rawText.slice(0, 50),
text: text.slice(0, 50),
attachments: attachments.length,
attachmentTypes,
from: from?.id,
conversation: conversation?.id,
});
if (htmlSummary) {
log.debug("html attachment summary", htmlSummary);
}
if (!from?.id) {
log.debug("skipping message without from.id");
return;
}
// Teams conversation.id may include ";messageid=..." suffix - strip it for session key
const rawConversationId = conversation?.id ?? "";
const conversationId = normalizeMSTeamsConversationId(rawConversationId);
const conversationMessageId =
extractMSTeamsConversationMessageId(rawConversationId);
const conversationType = conversation?.conversationType ?? "personal";
const isGroupChat =
conversationType === "groupChat" || conversation?.isGroup === true;
const isChannel = conversationType === "channel";
const isDirectMessage = !isGroupChat && !isChannel;
const senderName = from.name ?? from.id;
const senderId = from.aadObjectId ?? from.id;
// Check DM policy for direct messages
if (isDirectMessage && msteamsCfg) {
const dmPolicy = msteamsCfg.dmPolicy ?? "pairing";
const allowFrom = msteamsCfg.allowFrom ?? [];
if (dmPolicy === "disabled") {
log.debug("dropping dm (dms disabled)");
return;
}
if (dmPolicy !== "open") {
// Check allowlist - look up from config and pairing store
const storedAllowFrom = await readProviderAllowFromStore("msteams");
const effectiveAllowFrom = [
...allowFrom.map((v) => String(v).toLowerCase()),
...storedAllowFrom,
];
const senderLower = senderId.toLowerCase();
const senderNameLower = senderName.toLowerCase();
const allowed =
effectiveAllowFrom.includes("*") ||
effectiveAllowFrom.includes(senderLower) ||
effectiveAllowFrom.includes(senderNameLower);
if (!allowed) {
if (dmPolicy === "pairing") {
const request = await upsertProviderPairingRequest({
provider: "msteams",
sender: senderId,
label: senderName,
});
if (request) {
log.info("msteams pairing request created", {
sender: senderId,
label: senderName,
});
}
}
log.debug("dropping dm (not allowlisted)", {
sender: senderId,
label: senderName,
});
return;
}
}
}
// Build conversation reference for proactive replies
const agent = activity.recipient;
const teamId = activity.channelData?.team?.id;
const conversationRef: StoredConversationReference = {
activityId: activity.id,
user: { id: from.id, name: from.name, aadObjectId: from.aadObjectId },
agent,
bot: agent ? { id: agent.id, name: agent.name } : undefined,
conversation: {
id: conversationId,
conversationType,
tenantId: conversation?.tenantId,
},
teamId,
channelId: activity.channelId,
serviceUrl: activity.serviceUrl,
locale: activity.locale,
};
conversationStore.upsert(conversationId, conversationRef).catch((err) => {
log.debug("failed to save conversation reference", {
error: formatUnknownError(err),
});
});
const pollVote = extractMSTeamsPollVote(activity);
if (pollVote) {
try {
const poll = await pollStore.recordVote({
pollId: pollVote.pollId,
voterId: senderId,
selections: pollVote.selections,
});
if (!poll) {
log.debug("poll vote ignored (poll not found)", {
pollId: pollVote.pollId,
});
} else {
log.info("recorded poll vote", {
pollId: pollVote.pollId,
voter: senderId,
selections: pollVote.selections,
});
}
} catch (err) {
log.error("failed to record poll vote", {
pollId: pollVote.pollId,
error: formatUnknownError(err),
});
}
return;
}
if (!rawBody) {
log.debug("skipping empty message after stripping mentions");
return;
}
// Build Teams-specific identifiers
const teamsFrom = isDirectMessage
? `msteams:${senderId}`
: isChannel
? `msteams:channel:${conversationId}`
: `msteams:group:${conversationId}`;
const teamsTo = isDirectMessage
? `user:${senderId}`
: `conversation:${conversationId}`;
// Resolve routing
const route = resolveAgentRoute({
cfg,
provider: "msteams",
peer: {
kind: isDirectMessage ? "dm" : isChannel ? "channel" : "group",
id: isDirectMessage ? senderId : conversationId,
},
});
const preview = rawBody.replace(/\s+/g, " ").slice(0, 160);
const inboundLabel = isDirectMessage
? `Teams DM from ${senderName}`
: `Teams message in ${conversationType} from ${senderName}`;
enqueueSystemEvent(`${inboundLabel}: ${preview}`, {
sessionKey: route.sessionKey,
contextKey: `msteams:message:${conversationId}:${activity.id ?? "unknown"}`,
});
// Resolve team/channel config for channels and group chats
const channelId = conversationId;
const { teamConfig, channelConfig } = resolveMSTeamsRouteConfig({
cfg: msteamsCfg,
teamId,
conversationId: channelId,
});
const { requireMention, replyStyle } = resolveMSTeamsReplyPolicy({
isDirectMessage,
globalConfig: msteamsCfg,
teamConfig,
channelConfig,
});
// Check requireMention for channels and group chats
if (!isDirectMessage) {
const mentioned = wasMSTeamsBotMentioned(activity);
if (requireMention && !mentioned) {
log.debug("skipping message (mention required)", {
teamId,
channelId,
requireMention,
mentioned,
});
return;
}
}
// Format the message body with envelope
const timestamp = parseMSTeamsActivityTimestamp(activity.timestamp);
let mediaList = await downloadMSTeamsImageAttachments({
attachments,
maxBytes: mediaMaxBytes,
tokenProvider: {
getAccessToken: (scope) => tokenProvider.getAccessToken(scope),
},
allowHosts: msteamsCfg?.mediaAllowHosts,
});
if (mediaList.length === 0) {
const onlyHtmlAttachments =
attachments.length > 0 &&
attachments.every((att) =>
String(att.contentType ?? "").startsWith("text/html"),
);
if (onlyHtmlAttachments) {
const messageUrls = buildMSTeamsGraphMessageUrls({
conversationType,
conversationId,
messageId: activity.id ?? undefined,
replyToId: activity.replyToId ?? undefined,
conversationMessageId,
channelData: activity.channelData,
});
if (messageUrls.length === 0) {
log.debug("graph message url unavailable", {
conversationType,
hasChannelData: Boolean(activity.channelData),
messageId: activity.id ?? undefined,
replyToId: activity.replyToId ?? undefined,
});
} else {
const attempts: Array<{
url: string;
hostedStatus?: number;
attachmentStatus?: number;
hostedCount?: number;
attachmentCount?: number;
tokenError?: boolean;
}> = [];
for (const messageUrl of messageUrls) {
const graphMedia = await downloadMSTeamsGraphMedia({
messageUrl,
tokenProvider: {
getAccessToken: (scope) => tokenProvider.getAccessToken(scope),
},
maxBytes: mediaMaxBytes,
allowHosts: msteamsCfg?.mediaAllowHosts,
});
attempts.push({
url: messageUrl,
hostedStatus: graphMedia.hostedStatus,
attachmentStatus: graphMedia.attachmentStatus,
hostedCount: graphMedia.hostedCount,
attachmentCount: graphMedia.attachmentCount,
tokenError: graphMedia.tokenError,
});
if (graphMedia.media.length > 0) {
mediaList = graphMedia.media;
break;
}
if (graphMedia.tokenError) break;
}
if (mediaList.length === 0) {
log.debug("graph media fetch empty", { attempts });
}
}
}
}
if (mediaList.length > 0) {
log.debug("downloaded image attachments", { count: mediaList.length });
} else if (htmlSummary?.imgTags) {
log.debug("inline images detected but none downloaded", {
imgTags: htmlSummary.imgTags,
srcHosts: htmlSummary.srcHosts,
dataImages: htmlSummary.dataImages,
cidImages: htmlSummary.cidImages,
});
}
const mediaPayload = buildMSTeamsMediaPayload(mediaList);
const body = formatAgentEnvelope({
provider: "Teams",
from: senderName,
timestamp,
body: rawBody,
});
// Build context payload for agent
const ctxPayload = {
Body: body,
From: teamsFrom,
To: teamsTo,
SessionKey: route.sessionKey,
AccountId: route.accountId,
ChatType: isDirectMessage ? "direct" : isChannel ? "room" : "group",
GroupSubject: !isDirectMessage ? conversationType : undefined,
SenderName: senderName,
SenderId: senderId,
Provider: "msteams" as const,
Surface: "msteams" as const,
MessageSid: activity.id,
Timestamp: timestamp?.getTime() ?? Date.now(),
WasMentioned: isDirectMessage || wasMSTeamsBotMentioned(activity),
CommandAuthorized: true,
OriginatingChannel: "msteams" as const,
OriginatingTo: teamsTo,
...mediaPayload,
};
if (shouldLogVerbose()) {
logVerbose(
`msteams inbound: from=${ctxPayload.From} preview="${preview}"`,
);
}
// Send typing indicator
const sendTypingIndicator = async () => {
try {
await context.sendActivities([{ type: "typing" }]);
} catch {
// Typing indicator is best-effort.
}
};
// Create reply dispatcher
const { dispatcher, replyOptions, markDispatchIdle } =
createReplyDispatcherWithTyping({
responsePrefix: cfg.messages?.responsePrefix,
deliver: async (payload) => {
const messages = renderReplyPayloadsToMessages([payload], {
textChunkLimit: textLimit,
chunkText: true,
mediaMode: "split",
});
await sendMSTeamsMessages({
replyStyle,
adapter,
appId,
conversationRef,
context,
messages,
// Enable default retry/backoff for throttling/transient failures.
retry: {},
onRetry: (event) => {
log.debug("retrying send", {
replyStyle,
...event,
});
},
});
},
onError: (err, info) => {
const errMsg = formatUnknownError(err);
const classification = classifyMSTeamsSendError(err);
const hint = formatMSTeamsSendErrorHint(classification);
runtime.error?.(
danger(
`msteams ${info.kind} reply failed: ${errMsg}${hint ? ` (${hint})` : ""}`,
),
);
log.error("reply failed", {
kind: info.kind,
error: errMsg,
classification,
hint,
});
},
onReplyStart: sendTypingIndicator,
});
// Dispatch to agent
log.info("dispatching to agent", { sessionKey: route.sessionKey });
try {
const { queuedFinal, counts } = await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions,
});
markDispatchIdle();
log.info("dispatch complete", { queuedFinal, counts });
if (!queuedFinal) return;
if (shouldLogVerbose()) {
const finalCount = counts.final;
logVerbose(
`msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`,
);
}
} catch (err) {
log.error("dispatch failed", { error: String(err) });
runtime.error?.(danger(`msteams dispatch failed: ${String(err)}`));
// Try to send error message back to Teams.
try {
await context.sendActivity(
`⚠️ Agent failed: ${err instanceof Error ? err.message : String(err)}`,
);
} catch {
// Best effort.
}
}
};
}

View File

@@ -1,59 +1,14 @@
import type { Request, Response } from "express";
import { resolveTextChunkLimit } from "../auto-reply/chunk.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js";
import type { ClawdbotConfig } from "../config/types.js";
import { danger, logVerbose, shouldLogVerbose } from "../globals.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js";
import {
readProviderAllowFromStore,
upsertProviderPairingRequest,
} from "../pairing/pairing-store.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import type { RuntimeEnv } from "../runtime.js";
import {
buildMSTeamsAttachmentPlaceholder,
buildMSTeamsGraphMessageUrls,
buildMSTeamsMediaPayload,
downloadMSTeamsGraphMedia,
downloadMSTeamsImageAttachments,
type MSTeamsAttachmentLike,
summarizeMSTeamsHtmlAttachments,
} from "./attachments.js";
import type {
MSTeamsConversationStore,
StoredConversationReference,
} from "./conversation-store.js";
import type { MSTeamsConversationStore } from "./conversation-store.js";
import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js";
import {
classifyMSTeamsSendError,
formatMSTeamsSendErrorHint,
formatUnknownError,
} from "./errors.js";
import {
extractMSTeamsConversationMessageId,
normalizeMSTeamsConversationId,
parseMSTeamsActivityTimestamp,
stripMSTeamsMentionTags,
wasMSTeamsBotMentioned,
} from "./inbound.js";
import {
type MSTeamsAdapter,
renderReplyPayloadsToMessages,
sendMSTeamsMessages,
} from "./messenger.js";
import {
resolveMSTeamsReplyPolicy,
resolveMSTeamsRouteConfig,
} from "./policy.js";
import {
createMSTeamsPollStoreFs,
extractMSTeamsPollVote,
type MSTeamsPollStore,
} from "./polls.js";
import type { MSTeamsTurnContext } from "./sdk-types.js";
import { formatUnknownError } from "./errors.js";
import type { MSTeamsAdapter } from "./messenger.js";
import { registerMSTeamsHandlers } from "./monitor-handler.js";
import { createMSTeamsPollStoreFs, type MSTeamsPollStore } from "./polls.js";
import { resolveMSTeamsCredentials } from "./token.js";
const log = getChildLogger({ name: "msteams" });
@@ -130,448 +85,18 @@ export async function monitorMSTeamsProvider(
const tokenProvider = new MsalTokenProvider(authConfig);
const adapter = new CloudAdapter(authConfig);
// Handler for incoming messages
async function handleTeamsMessage(context: MSTeamsTurnContext) {
const activity = context.activity;
const rawText = activity.text?.trim() ?? "";
const text = stripMSTeamsMentionTags(rawText);
const attachments = Array.isArray(activity.attachments)
? (activity.attachments as unknown as MSTeamsAttachmentLike[])
: [];
const attachmentPlaceholder =
buildMSTeamsAttachmentPlaceholder(attachments);
const rawBody = text || attachmentPlaceholder;
const from = activity.from;
const conversation = activity.conversation;
const attachmentTypes = attachments
.map((att) =>
typeof att.contentType === "string" ? att.contentType : undefined,
)
.filter(Boolean)
.slice(0, 3);
const htmlSummary = summarizeMSTeamsHtmlAttachments(attachments);
log.info("received message", {
rawText: rawText.slice(0, 50),
text: text.slice(0, 50),
attachments: attachments.length,
attachmentTypes,
from: from?.id,
conversation: conversation?.id,
});
if (htmlSummary) {
log.debug("html attachment summary", htmlSummary);
}
if (!from?.id) {
log.debug("skipping message without from.id");
return;
}
// Teams conversation.id may include ";messageid=..." suffix - strip it for session key
const rawConversationId = conversation?.id ?? "";
const conversationId = normalizeMSTeamsConversationId(rawConversationId);
const conversationMessageId =
extractMSTeamsConversationMessageId(rawConversationId);
const conversationType = conversation?.conversationType ?? "personal";
const isGroupChat =
conversationType === "groupChat" || conversation?.isGroup === true;
const isChannel = conversationType === "channel";
const isDirectMessage = !isGroupChat && !isChannel;
const senderName = from.name ?? from.id;
const senderId = from.aadObjectId ?? from.id;
// Check DM policy for direct messages
if (isDirectMessage && msteamsCfg) {
const dmPolicy = msteamsCfg.dmPolicy ?? "pairing";
const allowFrom = msteamsCfg.allowFrom ?? [];
if (dmPolicy === "disabled") {
log.debug("dropping dm (dms disabled)");
return;
}
if (dmPolicy !== "open") {
// Check allowlist - look up from config and pairing store
const storedAllowFrom = await readProviderAllowFromStore("msteams");
const effectiveAllowFrom = [
...allowFrom.map((v) => String(v).toLowerCase()),
...storedAllowFrom.map((v) => v.toLowerCase()),
];
const senderLower = senderId.toLowerCase();
const permitted = effectiveAllowFrom.some(
(entry) => entry === senderLower || entry === "*",
);
if (!permitted) {
if (dmPolicy === "pairing") {
const { code, created } = await upsertProviderPairingRequest({
provider: "msteams",
id: senderId,
meta: { name: senderName },
});
const msg = created
? `👋 Hi ${senderName}! To chat with me, please share this pairing code with my owner: **${code}**`
: `🔑 Your pairing code is: **${code}** — please share it with my owner to get access.`;
await context.sendActivity(msg);
log.info("sent pairing code", { senderId, code });
} else {
log.debug("dropping unauthorized dm", { senderId, dmPolicy });
}
return;
}
}
}
// Save conversation reference for proactive messaging
const agent = activity.recipient
? {
id: activity.recipient.id,
name: activity.recipient.name,
aadObjectId: activity.recipient.aadObjectId,
}
: undefined;
const teamId = activity.channelData?.team?.id;
const conversationRef: StoredConversationReference = {
activityId: activity.id,
user: { id: from.id, name: from.name, aadObjectId: from.aadObjectId },
agent,
bot: agent ? { id: agent.id, name: agent.name } : undefined,
conversation: {
id: conversationId,
conversationType,
tenantId: conversation?.tenantId,
},
teamId,
channelId: activity.channelId,
serviceUrl: activity.serviceUrl,
};
conversationStore.upsert(conversationId, conversationRef).catch((err) => {
log.debug("failed to save conversation reference", {
error: formatUnknownError(err),
});
});
const pollVote = extractMSTeamsPollVote(activity);
if (pollVote) {
try {
const poll = await pollStore.recordVote({
pollId: pollVote.pollId,
voterId: senderId,
selections: pollVote.selections,
});
if (!poll) {
log.debug("poll vote ignored (poll not found)", {
pollId: pollVote.pollId,
});
} else {
log.info("recorded poll vote", {
pollId: pollVote.pollId,
voter: senderId,
selections: pollVote.selections,
});
}
} catch (err) {
log.error("failed to record poll vote", {
pollId: pollVote.pollId,
error: formatUnknownError(err),
});
}
return;
}
if (!rawBody) {
log.debug("skipping empty message after stripping mentions");
return;
}
// Build Teams-specific identifiers
const teamsFrom = isDirectMessage
? `msteams:${senderId}`
: isChannel
? `msteams:channel:${conversationId}`
: `msteams:group:${conversationId}`;
const teamsTo = isDirectMessage
? `user:${senderId}`
: `conversation:${conversationId}`;
// Resolve routing
const route = resolveAgentRoute({
cfg,
provider: "msteams",
peer: {
kind: isDirectMessage ? "dm" : isChannel ? "channel" : "group",
id: isDirectMessage ? senderId : conversationId,
},
});
const preview = rawBody.replace(/\s+/g, " ").slice(0, 160);
const inboundLabel = isDirectMessage
? `Teams DM from ${senderName}`
: `Teams message in ${conversationType} from ${senderName}`;
enqueueSystemEvent(`${inboundLabel}: ${preview}`, {
sessionKey: route.sessionKey,
contextKey: `msteams:message:${conversationId}:${activity.id ?? "unknown"}`,
});
// Resolve team/channel config for channels and group chats
const channelId = conversationId;
const { teamConfig, channelConfig } = resolveMSTeamsRouteConfig({
cfg: msteamsCfg,
teamId,
conversationId: channelId,
});
const { requireMention, replyStyle } = resolveMSTeamsReplyPolicy({
isDirectMessage,
globalConfig: msteamsCfg,
teamConfig,
channelConfig,
});
// Check requireMention for channels and group chats
if (!isDirectMessage) {
const mentioned = wasMSTeamsBotMentioned(activity);
if (requireMention && !mentioned) {
log.debug("skipping message (mention required)", {
teamId,
channelId,
requireMention,
mentioned,
});
return;
}
}
// Format the message body with envelope
const timestamp = parseMSTeamsActivityTimestamp(activity.timestamp);
let mediaList = await downloadMSTeamsImageAttachments({
attachments,
maxBytes: mediaMaxBytes,
tokenProvider: {
getAccessToken: (scope) => tokenProvider.getAccessToken(scope),
},
allowHosts: msteamsCfg?.mediaAllowHosts,
});
if (mediaList.length === 0) {
const onlyHtmlAttachments =
attachments.length > 0 &&
attachments.every((att) =>
String(att.contentType ?? "").startsWith("text/html"),
);
if (onlyHtmlAttachments) {
const messageUrls = buildMSTeamsGraphMessageUrls({
conversationType,
conversationId,
messageId: activity.id ?? undefined,
replyToId: activity.replyToId ?? undefined,
conversationMessageId,
channelData: activity.channelData,
});
if (messageUrls.length === 0) {
log.debug("graph message url unavailable", {
conversationType,
hasChannelData: Boolean(activity.channelData),
messageId: activity.id ?? undefined,
replyToId: activity.replyToId ?? undefined,
});
} else {
const attempts: Array<{
url: string;
hostedStatus?: number;
attachmentStatus?: number;
hostedCount?: number;
attachmentCount?: number;
tokenError?: boolean;
}> = [];
for (const messageUrl of messageUrls) {
const graphMedia = await downloadMSTeamsGraphMedia({
messageUrl,
tokenProvider: {
getAccessToken: (scope) => tokenProvider.getAccessToken(scope),
},
maxBytes: mediaMaxBytes,
allowHosts: msteamsCfg?.mediaAllowHosts,
});
attempts.push({
url: messageUrl,
hostedStatus: graphMedia.hostedStatus,
attachmentStatus: graphMedia.attachmentStatus,
hostedCount: graphMedia.hostedCount,
attachmentCount: graphMedia.attachmentCount,
tokenError: graphMedia.tokenError,
});
if (graphMedia.media.length > 0) {
mediaList = graphMedia.media;
break;
}
if (graphMedia.tokenError) break;
}
if (mediaList.length === 0) {
log.debug("graph media fetch empty", { attempts });
}
}
}
}
if (mediaList.length > 0) {
log.debug("downloaded image attachments", { count: mediaList.length });
} else if (htmlSummary?.imgTags) {
log.debug("inline images detected but none downloaded", {
imgTags: htmlSummary.imgTags,
srcHosts: htmlSummary.srcHosts,
dataImages: htmlSummary.dataImages,
cidImages: htmlSummary.cidImages,
});
}
const mediaPayload = buildMSTeamsMediaPayload(mediaList);
const body = formatAgentEnvelope({
provider: "Teams",
from: senderName,
timestamp,
body: rawBody,
});
// Build context payload for agent
const ctxPayload = {
Body: body,
From: teamsFrom,
To: teamsTo,
SessionKey: route.sessionKey,
AccountId: route.accountId,
ChatType: isDirectMessage ? "direct" : isChannel ? "room" : "group",
GroupSubject: !isDirectMessage ? conversationType : undefined,
SenderName: senderName,
SenderId: senderId,
Provider: "msteams" as const,
Surface: "msteams" as const,
MessageSid: activity.id,
Timestamp: timestamp?.getTime() ?? Date.now(),
WasMentioned: isDirectMessage || wasMSTeamsBotMentioned(activity),
CommandAuthorized: true,
OriginatingChannel: "msteams" as const,
OriginatingTo: teamsTo,
...mediaPayload,
};
if (shouldLogVerbose()) {
logVerbose(
`msteams inbound: from=${ctxPayload.From} preview="${preview}"`,
);
}
// Send typing indicator
const sendTypingIndicator = async () => {
try {
await context.sendActivities([{ type: "typing" }]);
} catch {
// Typing indicator is best-effort
}
};
// Create reply dispatcher
const { dispatcher, replyOptions, markDispatchIdle } =
createReplyDispatcherWithTyping({
responsePrefix: cfg.messages?.responsePrefix,
deliver: async (payload) => {
const messages = renderReplyPayloadsToMessages([payload], {
textChunkLimit: textLimit,
chunkText: true,
mediaMode: "split",
});
await sendMSTeamsMessages({
replyStyle,
adapter: adapter as unknown as MSTeamsAdapter,
appId,
conversationRef,
context,
messages,
// Enable default retry/backoff for throttling/transient failures.
retry: {},
onRetry: (event) => {
log.debug("retrying send", {
replyStyle,
...event,
});
},
});
},
onError: (err, info) => {
const errMsg = formatUnknownError(err);
const classification = classifyMSTeamsSendError(err);
const hint = formatMSTeamsSendErrorHint(classification);
runtime.error?.(
danger(
`msteams ${info.kind} reply failed: ${errMsg}${hint ? ` (${hint})` : ""}`,
),
);
log.error("reply failed", {
kind: info.kind,
error: errMsg,
classification,
hint,
});
},
onReplyStart: sendTypingIndicator,
});
// Dispatch to agent
log.info("dispatching to agent", { sessionKey: route.sessionKey });
try {
const { queuedFinal, counts } = await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions,
});
markDispatchIdle();
log.info("dispatch complete", { queuedFinal, counts });
if (!queuedFinal) return;
if (shouldLogVerbose()) {
const finalCount = counts.final;
logVerbose(
`msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`,
);
}
} catch (err) {
log.error("dispatch failed", { error: String(err) });
runtime.error?.(danger(`msteams dispatch failed: ${String(err)}`));
// Try to send error message back to Teams
try {
await context.sendActivity(
`⚠️ Agent failed: ${err instanceof Error ? err.message : String(err)}`,
);
} catch {
// Best effort
}
}
}
// Create activity handler using fluent API
const handler = new ActivityHandler()
.onMessage(async (context, next) => {
try {
await handleTeamsMessage(context as unknown as MSTeamsTurnContext);
} catch (err) {
runtime.error?.(danger(`msteams handler failed: ${String(err)}`));
}
await next();
})
.onMembersAdded(async (context, next) => {
const membersAdded = context.activity?.membersAdded ?? [];
for (const member of membersAdded) {
if (member.id !== context.activity?.recipient?.id) {
log.debug("member added", { member: member.id });
// Don't send welcome message - let the user initiate conversation
}
}
await next();
});
const handler = registerMSTeamsHandlers(new ActivityHandler(), {
cfg,
runtime,
appId,
adapter: adapter as unknown as MSTeamsAdapter,
tokenProvider,
textLimit,
mediaMaxBytes,
conversationStore,
pollStore,
log,
});
// Create Express server
const expressApp = express.default();

View File

@@ -0,0 +1,25 @@
import { describe, expect, it } from "vitest";
import { createMSTeamsPollStoreMemory } from "./polls-store-memory.js";
describe("msteams poll memory store", () => {
it("stores polls and records normalized votes", async () => {
const store = createMSTeamsPollStoreMemory();
await store.createPoll({
id: "poll-1",
question: "Lunch?",
options: ["Pizza", "Sushi"],
maxSelections: 1,
createdAt: new Date().toISOString(),
votes: {},
});
const poll = await store.recordVote({
pollId: "poll-1",
voterId: "user-1",
selections: ["0", "1"],
});
expect(poll?.votes["user-1"]).toEqual(["0"]);
});
});

View File

@@ -0,0 +1,36 @@
import {
type MSTeamsPoll,
type MSTeamsPollStore,
normalizeMSTeamsPollSelections,
} from "./polls.js";
export function createMSTeamsPollStoreMemory(
initial: MSTeamsPoll[] = [],
): MSTeamsPollStore {
const polls = new Map<string, MSTeamsPoll>();
for (const poll of initial) {
polls.set(poll.id, { ...poll });
}
const createPoll = async (poll: MSTeamsPoll) => {
polls.set(poll.id, { ...poll });
};
const getPoll = async (pollId: string) => polls.get(pollId) ?? null;
const recordVote = async (params: {
pollId: string;
voterId: string;
selections: string[];
}) => {
const poll = polls.get(params.pollId);
if (!poll) return null;
const normalized = normalizeMSTeamsPollSelections(poll, params.selections);
poll.votes[params.voterId] = normalized;
poll.updatedAt = new Date().toISOString();
polls.set(poll.id, poll);
return poll;
};
return { createPoll, getPoll, recordVote };
}

View File

@@ -232,13 +232,23 @@ export function buildMSTeamsPollCard(params: {
};
}
function resolveStorePath(
env: NodeJS.ProcessEnv = process.env,
homedir?: () => string,
): string {
const stateDir = homedir
? resolveStateDir(env, homedir)
: resolveStateDir(env);
export type MSTeamsPollStoreFsOptions = {
env?: NodeJS.ProcessEnv;
homedir?: () => string;
stateDir?: string;
storePath?: string;
};
function resolveStorePath(params?: MSTeamsPollStoreFsOptions): string {
if (params?.storePath) {
return params.storePath;
}
if (params?.stateDir) {
return path.join(params.stateDir, STORE_FILENAME);
}
const stateDir = params?.homedir
? resolveStateDir(params.env ?? process.env, params.homedir)
: resolveStateDir(params?.env ?? process.env);
return path.join(stateDir, STORE_FILENAME);
}
@@ -336,7 +346,10 @@ function pruneToLimit(polls: Record<string, MSTeamsPoll>) {
return Object.fromEntries(keep);
}
function normalizePollSelections(poll: MSTeamsPoll, selections: string[]) {
export function normalizeMSTeamsPollSelections(
poll: MSTeamsPoll,
selections: string[],
) {
const maxSelections = Math.max(1, poll.maxSelections);
const mapped = selections
.map((entry) => Number.parseInt(entry, 10))
@@ -348,11 +361,10 @@ function normalizePollSelections(poll: MSTeamsPoll, selections: string[]) {
return Array.from(new Set(limited));
}
export function createMSTeamsPollStoreFs(params?: {
env?: NodeJS.ProcessEnv;
homedir?: () => string;
}): MSTeamsPollStore {
const filePath = resolveStorePath(params?.env, params?.homedir);
export function createMSTeamsPollStoreFs(
params?: MSTeamsPollStoreFsOptions,
): MSTeamsPollStore {
const filePath = resolveStorePath(params);
const empty: PollStoreData = { version: 1, polls: {} };
const readStore = async (): Promise<PollStoreData> => {
@@ -388,7 +400,10 @@ export function createMSTeamsPollStoreFs(params?: {
const data = await readStore();
const poll = data.polls[params.pollId];
if (!poll) return null;
const normalized = normalizePollSelections(poll, params.selections);
const normalized = normalizeMSTeamsPollSelections(
poll,
params.selections,
);
poll.votes[params.voterId] = normalized;
poll.updatedAt = new Date().toISOString();
data.polls[poll.id] = poll;

View File

@@ -117,6 +117,66 @@ function extractMessageId(response: unknown): string | null {
return id;
}
type MSTeamsProactiveContext = {
appId: string;
conversationId: string;
ref: StoredConversationReference;
adapter: MSTeamsAdapter;
log: Awaited<ReturnType<typeof getLog>>;
};
async function resolveMSTeamsSendContext(params: {
cfg: ClawdbotConfig;
to: string;
}): Promise<MSTeamsProactiveContext> {
const msteamsCfg = params.cfg.msteams;
if (!msteamsCfg?.enabled) {
throw new Error("msteams provider is not enabled");
}
const creds = resolveMSTeamsCredentials(msteamsCfg);
if (!creds) {
throw new Error("msteams credentials not configured");
}
const store = createMSTeamsConversationStoreFs();
// Parse recipient and find conversation reference
const recipient = parseRecipient(params.to);
const found = await findConversationReference({ ...recipient, store });
if (!found) {
throw new Error(
`No conversation reference found for ${recipient.type}:${recipient.id}. ` +
`The bot must receive a message from this conversation before it can send proactively.`,
);
}
const { conversationId, ref } = found;
const log = await getLog();
// Dynamic import to avoid loading SDK when not needed
const agentsHosting = await import("@microsoft/agents-hosting");
const { CloudAdapter, getAuthConfigWithDefaults } = agentsHosting;
const authConfig = getAuthConfigWithDefaults({
clientId: creds.appId,
clientSecret: creds.appPassword,
tenantId: creds.tenantId,
});
const adapter = new CloudAdapter(authConfig);
return {
appId: creds.appId,
conversationId,
ref,
adapter: adapter as unknown as MSTeamsAdapter,
log,
};
}
async function sendMSTeamsActivity(params: {
adapter: MSTeamsAdapter;
appId: string;
@@ -151,33 +211,11 @@ export async function sendMessageMSTeams(
params: SendMSTeamsMessageParams,
): Promise<SendMSTeamsMessageResult> {
const { cfg, to, text, mediaUrl } = params;
const msteamsCfg = cfg.msteams;
if (!msteamsCfg?.enabled) {
throw new Error("msteams provider is not enabled");
}
const creds = resolveMSTeamsCredentials(msteamsCfg);
if (!creds) {
throw new Error("msteams credentials not configured");
}
const store = createMSTeamsConversationStoreFs();
// Parse recipient and find conversation reference
const recipient = parseRecipient(to);
const found = await findConversationReference({ ...recipient, store });
if (!found) {
throw new Error(
`No conversation reference found for ${recipient.type}:${recipient.id}. ` +
`The bot must receive a message from this conversation before it can send proactively.`,
);
}
const { conversationId, ref } = found;
const log = await getLog();
const { adapter, appId, conversationId, ref, log } =
await resolveMSTeamsSendContext({
cfg,
to,
});
log.debug("sending proactive message", {
conversationId,
@@ -185,18 +223,6 @@ export async function sendMessageMSTeams(
hasMedia: Boolean(mediaUrl),
});
// Dynamic import to avoid loading SDK when not needed
const agentsHosting = await import("@microsoft/agents-hosting");
const { CloudAdapter, getAuthConfigWithDefaults } = agentsHosting;
const authConfig = getAuthConfigWithDefaults({
clientId: creds.appId,
clientSecret: creds.appPassword,
tenantId: creds.tenantId,
});
const adapter = new CloudAdapter(authConfig);
const message = mediaUrl
? text
? `${text}\n\n${mediaUrl}`
@@ -206,8 +232,8 @@ export async function sendMessageMSTeams(
try {
messageIds = await sendMSTeamsMessages({
replyStyle: "top-level",
adapter: adapter as unknown as MSTeamsAdapter,
appId: creds.appId,
adapter,
appId,
conversationRef: ref,
messages: [message],
// Enable default retry/backoff for throttling/transient failures.
@@ -243,30 +269,11 @@ export async function sendPollMSTeams(
params: SendMSTeamsPollParams,
): Promise<SendMSTeamsPollResult> {
const { cfg, to, question, options, maxSelections } = params;
const msteamsCfg = cfg.msteams;
if (!msteamsCfg?.enabled) {
throw new Error("msteams provider is not enabled");
}
const creds = resolveMSTeamsCredentials(msteamsCfg);
if (!creds) {
throw new Error("msteams credentials not configured");
}
const store = createMSTeamsConversationStoreFs();
const recipient = parseRecipient(to);
const found = await findConversationReference({ ...recipient, store });
if (!found) {
throw new Error(
`No conversation reference found for ${recipient.type}:${recipient.id}. ` +
`The bot must receive a message from this conversation before it can send proactively.`,
);
}
const { conversationId, ref } = found;
const log = await getLog();
const { adapter, appId, conversationId, ref, log } =
await resolveMSTeamsSendContext({
cfg,
to,
});
const pollCard = buildMSTeamsPollCard({
question,
@@ -280,16 +287,6 @@ export async function sendPollMSTeams(
optionCount: pollCard.options.length,
});
const agentsHosting = await import("@microsoft/agents-hosting");
const { CloudAdapter, getAuthConfigWithDefaults } = agentsHosting;
const authConfig = getAuthConfigWithDefaults({
clientId: creds.appId,
clientSecret: creds.appPassword,
tenantId: creds.tenantId,
});
const adapter = new CloudAdapter(authConfig);
const activity = {
type: "message",
text: pollCard.fallbackText,
@@ -304,8 +301,8 @@ export async function sendPollMSTeams(
let messageId: string;
try {
messageId = await sendMSTeamsActivity({
adapter: adapter as unknown as MSTeamsAdapter,
appId: creds.appId,
adapter,
appId,
conversationRef: ref,
activity,
});