From 6b107e9e74967a295c5893870c48fe6101c6c1ff Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 9 Jan 2026 11:18:33 +0100 Subject: [PATCH] refactor(msteams): consolidate stores and send context --- src/msteams/conversation-store-fs.ts | 84 +------------------ src/msteams/messenger.ts | 5 ++ src/msteams/monitor-handler.ts | 47 +++++------ src/msteams/monitor.ts | 7 +- src/msteams/polls.ts | 83 +------------------ src/msteams/sdk.ts | 9 ++- src/msteams/send-context.ts | 117 +++++++++++++++++++++++++++ src/msteams/send.ts | 116 +------------------------- src/msteams/store-fs.ts | 86 ++++++++++++++++++++ 9 files changed, 246 insertions(+), 308 deletions(-) create mode 100644 src/msteams/send-context.ts create mode 100644 src/msteams/store-fs.ts diff --git a/src/msteams/conversation-store-fs.ts b/src/msteams/conversation-store-fs.ts index 3b1b3bc93..b0ba6fbaf 100644 --- a/src/msteams/conversation-store-fs.ts +++ b/src/msteams/conversation-store-fs.ts @@ -1,15 +1,10 @@ -import crypto from "node:crypto"; -import fs from "node:fs"; -import path from "node:path"; - -import lockfile from "proper-lockfile"; - import type { MSTeamsConversationStore, MSTeamsConversationStoreEntry, StoredConversationReference, } from "./conversation-store.js"; import { resolveMSTeamsStorePath } from "./storage.js"; +import { readJsonFile, withFileLock, writeJsonFile } from "./store-fs.js"; type ConversationStoreData = { version: 1; @@ -22,83 +17,6 @@ type ConversationStoreData = { const STORE_FILENAME = "msteams-conversations.json"; const MAX_CONVERSATIONS = 1000; const CONVERSATION_TTL_MS = 365 * 24 * 60 * 60 * 1000; -const STORE_LOCK_OPTIONS = { - retries: { - retries: 10, - factor: 2, - minTimeout: 100, - maxTimeout: 10_000, - randomize: true, - }, - stale: 30_000, -} as const; - -function safeParseJson(raw: string): T | null { - try { - return JSON.parse(raw) as T; - } catch { - return null; - } -} - -async function readJsonFile( - filePath: string, - fallback: T, -): Promise<{ value: T; exists: boolean }> { - try { - const raw = await fs.promises.readFile(filePath, "utf-8"); - const parsed = safeParseJson(raw); - if (parsed == null) return { value: fallback, exists: true }; - return { value: parsed, exists: true }; - } catch (err) { - const code = (err as { code?: string }).code; - if (code === "ENOENT") return { value: fallback, exists: false }; - return { value: fallback, exists: false }; - } -} - -async function writeJsonFile(filePath: string, value: unknown): Promise { - const dir = path.dirname(filePath); - await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 }); - const tmp = path.join( - dir, - `${path.basename(filePath)}.${crypto.randomUUID()}.tmp`, - ); - await fs.promises.writeFile(tmp, `${JSON.stringify(value, null, 2)}\n`, { - encoding: "utf-8", - }); - await fs.promises.chmod(tmp, 0o600); - await fs.promises.rename(tmp, filePath); -} - -async function ensureJsonFile(filePath: string, fallback: unknown) { - try { - await fs.promises.access(filePath); - } catch { - await writeJsonFile(filePath, fallback); - } -} - -async function withFileLock( - filePath: string, - fallback: unknown, - fn: () => Promise, -): Promise { - await ensureJsonFile(filePath, fallback); - let release: (() => Promise) | undefined; - try { - release = await lockfile.lock(filePath, STORE_LOCK_OPTIONS); - return await fn(); - } finally { - if (release) { - try { - await release(); - } catch { - // ignore unlock errors - } - } - } -} function parseTimestamp(value: string | undefined): number | null { if (!value) return null; diff --git a/src/msteams/messenger.ts b/src/msteams/messenger.ts index 7b45993cf..82c970002 100644 --- a/src/msteams/messenger.ts +++ b/src/msteams/messenger.ts @@ -25,6 +25,11 @@ export type MSTeamsAdapter = { reference: MSTeamsConversationReference, logic: (context: SendContext) => Promise, ) => Promise; + process: ( + req: unknown, + res: unknown, + logic: (context: unknown) => Promise, + ) => Promise; }; export type MSTeamsReplyRenderOptions = { diff --git a/src/msteams/monitor-handler.ts b/src/msteams/monitor-handler.ts index 05476f700..e10ec1c35 100644 --- a/src/msteams/monitor-handler.ts +++ b/src/msteams/monitor-handler.ts @@ -71,29 +71,30 @@ export function registerMSTeamsHandlers( deps: MSTeamsMessageHandlerDeps, ): T { const handleTeamsMessage = createMSTeamsMessageHandler(deps); + handler.onMessage(async (context, next) => { + try { + await handleTeamsMessage(context as MSTeamsTurnContext); + } catch (err) { + deps.runtime.error?.(danger(`msteams handler failed: ${String(err)}`)); + } + await next(); + }); - return handler - .onMessage(async (context, next) => { - try { - await handleTeamsMessage(context as MSTeamsTurnContext); - } catch (err) { - deps.runtime.error?.(danger(`msteams handler failed: ${String(err)}`)); + handler.onMembersAdded(async (context, next) => { + const membersAdded = + (context as MSTeamsTurnContext).activity?.membersAdded ?? []; + for (const member of membersAdded) { + if ( + member.id !== (context as MSTeamsTurnContext).activity?.recipient?.id + ) { + deps.log.debug("member added", { member: member.id }); + // Don't send welcome message - let the user initiate conversation. } - await next(); - }) - .onMembersAdded(async (context, next) => { - const membersAdded = - (context as MSTeamsTurnContext).activity?.membersAdded ?? []; - for (const member of membersAdded) { - if ( - member.id !== (context as MSTeamsTurnContext).activity?.recipient?.id - ) { - deps.log.debug("member added", { member: member.id }); - // Don't send welcome message - let the user initiate conversation. - } - } - await next(); - }); + } + await next(); + }); + + return handler; } function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { @@ -192,8 +193,8 @@ function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { if (dmPolicy === "pairing") { const request = await upsertProviderPairingRequest({ provider: "msteams", - sender: senderId, - label: senderName, + id: senderId, + meta: { name: senderName }, }); if (request) { log.info("msteams pairing request created", { diff --git a/src/msteams/monitor.ts b/src/msteams/monitor.ts index f859daaa2..a137cd190 100644 --- a/src/msteams/monitor.ts +++ b/src/msteams/monitor.ts @@ -96,9 +96,12 @@ export async function monitorMSTeamsProvider( // Set up the messages endpoint - use configured path and /api/messages as fallback const configuredPath = msteamsCfg.webhook?.path ?? "/api/messages"; const messageHandler = (req: Request, res: Response) => { + type HandlerContext = Parameters<(typeof handler)["run"]>[0]; void adapter - .process(req, res, (context) => handler.run(context)) - .catch((err) => { + .process(req, res, (context: unknown) => + handler.run(context as HandlerContext), + ) + .catch((err: unknown) => { log.error("msteams webhook failed", { error: formatUnknownError(err) }); }); }; diff --git a/src/msteams/polls.ts b/src/msteams/polls.ts index 9a7c9dc6e..db354c820 100644 --- a/src/msteams/polls.ts +++ b/src/msteams/polls.ts @@ -1,10 +1,7 @@ import crypto from "node:crypto"; -import fs from "node:fs"; -import path from "node:path"; - -import lockfile from "proper-lockfile"; import { resolveMSTeamsStorePath } from "./storage.js"; +import { readJsonFile, withFileLock, writeJsonFile } from "./store-fs.js"; export type MSTeamsPollVote = { pollId: string; @@ -50,17 +47,6 @@ type PollStoreData = { const STORE_FILENAME = "msteams-polls.json"; const MAX_POLLS = 1000; const POLL_TTL_MS = 30 * 24 * 60 * 60 * 1000; -const STORE_LOCK_OPTIONS = { - retries: { - retries: 10, - factor: 2, - minTimeout: 100, - maxTimeout: 10_000, - randomize: true, - }, - stale: 30_000, -} as const; - function isRecord(value: unknown): value is Record { return Boolean(value) && typeof value === "object" && !Array.isArray(value); } @@ -239,73 +225,6 @@ export type MSTeamsPollStoreFsOptions = { storePath?: string; }; -function safeParseJson(raw: string): T | null { - try { - return JSON.parse(raw) as T; - } catch { - return null; - } -} - -async function readJsonFile( - filePath: string, - fallback: T, -): Promise<{ value: T; exists: boolean }> { - try { - const raw = await fs.promises.readFile(filePath, "utf-8"); - const parsed = safeParseJson(raw); - if (parsed == null) return { value: fallback, exists: true }; - return { value: parsed, exists: true }; - } catch (err) { - const code = (err as { code?: string }).code; - if (code === "ENOENT") return { value: fallback, exists: false }; - return { value: fallback, exists: false }; - } -} - -async function writeJsonFile(filePath: string, value: unknown): Promise { - const dir = path.dirname(filePath); - await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 }); - const tmp = path.join( - dir, - `${path.basename(filePath)}.${crypto.randomUUID()}.tmp`, - ); - await fs.promises.writeFile(tmp, `${JSON.stringify(value, null, 2)}\n`, { - encoding: "utf-8", - }); - await fs.promises.chmod(tmp, 0o600); - await fs.promises.rename(tmp, filePath); -} - -async function ensureJsonFile(filePath: string, fallback: unknown) { - try { - await fs.promises.access(filePath); - } catch { - await writeJsonFile(filePath, fallback); - } -} - -async function withFileLock( - filePath: string, - fallback: unknown, - fn: () => Promise, -): Promise { - await ensureJsonFile(filePath, fallback); - let release: (() => Promise) | undefined; - try { - release = await lockfile.lock(filePath, STORE_LOCK_OPTIONS); - return await fn(); - } finally { - if (release) { - try { - await release(); - } catch { - // ignore unlock errors - } - } - } -} - function parseTimestamp(value?: string): number | null { if (!value) return null; const parsed = Date.parse(value); diff --git a/src/msteams/sdk.ts b/src/msteams/sdk.ts index 2d0f7a959..a9ccaaf81 100644 --- a/src/msteams/sdk.ts +++ b/src/msteams/sdk.ts @@ -1,8 +1,9 @@ import type { MSTeamsAdapter } from "./messenger.js"; import type { MSTeamsCredentials } from "./token.js"; -export type MSTeamsSdk = Awaited< - ReturnType +export type MSTeamsSdk = typeof import("@microsoft/agents-hosting"); +export type MSTeamsAuthConfig = ReturnType< + MSTeamsSdk["getAuthConfigWithDefaults"] >; export async function loadMSTeamsSdk(): Promise { @@ -12,7 +13,7 @@ export async function loadMSTeamsSdk(): Promise { export function buildMSTeamsAuthConfig( creds: MSTeamsCredentials, sdk: MSTeamsSdk, -) { +): MSTeamsAuthConfig { return sdk.getAuthConfigWithDefaults({ clientId: creds.appId, clientSecret: creds.appPassword, @@ -21,7 +22,7 @@ export function buildMSTeamsAuthConfig( } export function createMSTeamsAdapter( - authConfig: unknown, + authConfig: MSTeamsAuthConfig, sdk: MSTeamsSdk, ): MSTeamsAdapter { return new sdk.CloudAdapter(authConfig) as unknown as MSTeamsAdapter; diff --git a/src/msteams/send-context.ts b/src/msteams/send-context.ts new file mode 100644 index 000000000..2a0f5e2f8 --- /dev/null +++ b/src/msteams/send-context.ts @@ -0,0 +1,117 @@ +import type { ClawdbotConfig } from "../config/types.js"; +import type { getChildLogger as getChildLoggerFn } from "../logging.js"; +import type { + MSTeamsConversationStore, + StoredConversationReference, +} from "./conversation-store.js"; +import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js"; +import type { MSTeamsAdapter } from "./messenger.js"; +import { createMSTeamsAdapter, loadMSTeamsSdkWithAuth } from "./sdk.js"; +import { resolveMSTeamsCredentials } from "./token.js"; + +let _log: ReturnType | undefined; +const getLog = async (): Promise> => { + if (_log) return _log; + const { getChildLogger } = await import("../logging.js"); + _log = getChildLogger({ name: "msteams:send" }); + return _log; +}; + +export type MSTeamsProactiveContext = { + appId: string; + conversationId: string; + ref: StoredConversationReference; + adapter: MSTeamsAdapter; + log: Awaited>; +}; + +/** + * Parse the --to argument into a conversation reference lookup key. + * Supported formats: + * - conversation:19:abc@thread.tacv2 → lookup by conversation ID + * - user:aad-object-id → lookup by user AAD object ID + * - 19:abc@thread.tacv2 → direct conversation ID + */ +function parseRecipient(to: string): { + type: "conversation" | "user"; + id: string; +} { + const trimmed = to.trim(); + if (trimmed.startsWith("conversation:")) { + return { type: "conversation", id: trimmed.slice("conversation:".length) }; + } + if (trimmed.startsWith("user:")) { + return { type: "user", id: trimmed.slice("user:".length) }; + } + // Assume it's a conversation ID if it looks like one + if (trimmed.startsWith("19:") || trimmed.includes("@thread")) { + return { type: "conversation", id: trimmed }; + } + // Otherwise treat as user ID + return { type: "user", id: trimmed }; +} + +/** + * Find a stored conversation reference for the given recipient. + */ +async function findConversationReference(recipient: { + type: "conversation" | "user"; + id: string; + store: MSTeamsConversationStore; +}): Promise<{ + conversationId: string; + ref: StoredConversationReference; +} | null> { + if (recipient.type === "conversation") { + const ref = await recipient.store.get(recipient.id); + if (ref) return { conversationId: recipient.id, ref }; + return null; + } + + const found = await recipient.store.findByUserId(recipient.id); + if (!found) return null; + return { conversationId: found.conversationId, ref: found.reference }; +} + +export async function resolveMSTeamsSendContext(params: { + cfg: ClawdbotConfig; + to: string; +}): Promise { + const msteamsCfg = params.cfg.msteams; + + if (!msteamsCfg?.enabled) { + throw new Error("msteams provider is not enabled"); + } + + const creds = resolveMSTeamsCredentials(msteamsCfg); + if (!creds) { + throw new Error("msteams credentials not configured"); + } + + const store = createMSTeamsConversationStoreFs(); + + // Parse recipient and find conversation reference + const recipient = parseRecipient(params.to); + const found = await findConversationReference({ ...recipient, store }); + + if (!found) { + throw new Error( + `No conversation reference found for ${recipient.type}:${recipient.id}. ` + + `The bot must receive a message from this conversation before it can send proactively.`, + ); + } + + const { conversationId, ref } = found; + const log = await getLog(); + + const { sdk, authConfig } = await loadMSTeamsSdkWithAuth(creds); + const adapter = createMSTeamsAdapter(authConfig, sdk); + + return { + appId: creds.appId, + conversationId, + ref, + adapter: adapter as unknown as MSTeamsAdapter, + log, + }; +} diff --git a/src/msteams/send.ts b/src/msteams/send.ts index dde3b3945..ef43ba102 100644 --- a/src/msteams/send.ts +++ b/src/msteams/send.ts @@ -1,9 +1,5 @@ import type { ClawdbotConfig } from "../config/types.js"; -import type { getChildLogger as getChildLoggerFn } from "../logging.js"; -import type { - MSTeamsConversationStore, - StoredConversationReference, -} from "./conversation-store.js"; +import type { StoredConversationReference } from "./conversation-store.js"; import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js"; import { classifyMSTeamsSendError, @@ -16,16 +12,7 @@ import { sendMSTeamsMessages, } from "./messenger.js"; import { buildMSTeamsPollCard } from "./polls.js"; -import { createMSTeamsAdapter, loadMSTeamsSdkWithAuth } from "./sdk.js"; -import { resolveMSTeamsCredentials } from "./token.js"; - -let _log: ReturnType | undefined; -const getLog = async (): Promise> => { - if (_log) return _log; - const { getChildLogger } = await import("../logging.js"); - _log = getChildLogger({ name: "msteams:send" }); - return _log; -}; +import { resolveMSTeamsSendContext } from "./send-context.js"; export type SendMSTeamsMessageParams = { /** Full config (for credentials) */ @@ -62,54 +49,6 @@ export type SendMSTeamsPollResult = { conversationId: string; }; -/** - * Parse the --to argument into a conversation reference lookup key. - * Supported formats: - * - conversation:19:abc@thread.tacv2 → lookup by conversation ID - * - user:aad-object-id → lookup by user AAD object ID - * - 19:abc@thread.tacv2 → direct conversation ID - */ -function parseRecipient(to: string): { - type: "conversation" | "user"; - id: string; -} { - const trimmed = to.trim(); - if (trimmed.startsWith("conversation:")) { - return { type: "conversation", id: trimmed.slice("conversation:".length) }; - } - if (trimmed.startsWith("user:")) { - return { type: "user", id: trimmed.slice("user:".length) }; - } - // Assume it's a conversation ID if it looks like one - if (trimmed.startsWith("19:") || trimmed.includes("@thread")) { - return { type: "conversation", id: trimmed }; - } - // Otherwise treat as user ID - return { type: "user", id: trimmed }; -} - -/** - * Find a stored conversation reference for the given recipient. - */ -async function findConversationReference(recipient: { - type: "conversation" | "user"; - id: string; - store: MSTeamsConversationStore; -}): Promise<{ - conversationId: string; - ref: StoredConversationReference; -} | null> { - if (recipient.type === "conversation") { - const ref = await recipient.store.get(recipient.id); - if (ref) return { conversationId: recipient.id, ref }; - return null; - } - - const found = await recipient.store.findByUserId(recipient.id); - if (!found) return null; - return { conversationId: found.conversationId, ref: found.reference }; -} - function extractMessageId(response: unknown): string | null { if (!response || typeof response !== "object") return null; if (!("id" in response)) return null; @@ -118,57 +57,6 @@ function extractMessageId(response: unknown): string | null { return id; } -type MSTeamsProactiveContext = { - appId: string; - conversationId: string; - ref: StoredConversationReference; - adapter: MSTeamsAdapter; - log: Awaited>; -}; - -async function resolveMSTeamsSendContext(params: { - cfg: ClawdbotConfig; - to: string; -}): Promise { - const msteamsCfg = params.cfg.msteams; - - if (!msteamsCfg?.enabled) { - throw new Error("msteams provider is not enabled"); - } - - const creds = resolveMSTeamsCredentials(msteamsCfg); - if (!creds) { - throw new Error("msteams credentials not configured"); - } - - const store = createMSTeamsConversationStoreFs(); - - // Parse recipient and find conversation reference - const recipient = parseRecipient(params.to); - const found = await findConversationReference({ ...recipient, store }); - - if (!found) { - throw new Error( - `No conversation reference found for ${recipient.type}:${recipient.id}. ` + - `The bot must receive a message from this conversation before it can send proactively.`, - ); - } - - const { conversationId, ref } = found; - const log = await getLog(); - - const { sdk, authConfig } = await loadMSTeamsSdkWithAuth(creds); - const adapter = createMSTeamsAdapter(authConfig, sdk); - - return { - appId: creds.appId, - conversationId, - ref, - adapter: adapter as unknown as MSTeamsAdapter, - log, - }; -} - async function sendMSTeamsActivity(params: { adapter: MSTeamsAdapter; appId: string; diff --git a/src/msteams/store-fs.ts b/src/msteams/store-fs.ts new file mode 100644 index 000000000..9f3b847d9 --- /dev/null +++ b/src/msteams/store-fs.ts @@ -0,0 +1,86 @@ +import crypto from "node:crypto"; +import fs from "node:fs"; +import path from "node:path"; + +import lockfile from "proper-lockfile"; + +const STORE_LOCK_OPTIONS = { + retries: { + retries: 10, + factor: 2, + minTimeout: 100, + maxTimeout: 10_000, + randomize: true, + }, + stale: 30_000, +} as const; + +function safeParseJson(raw: string): T | null { + try { + return JSON.parse(raw) as T; + } catch { + return null; + } +} + +export async function readJsonFile( + filePath: string, + fallback: T, +): Promise<{ value: T; exists: boolean }> { + try { + const raw = await fs.promises.readFile(filePath, "utf-8"); + const parsed = safeParseJson(raw); + if (parsed == null) return { value: fallback, exists: true }; + return { value: parsed, exists: true }; + } catch (err) { + const code = (err as { code?: string }).code; + if (code === "ENOENT") return { value: fallback, exists: false }; + return { value: fallback, exists: false }; + } +} + +export async function writeJsonFile( + filePath: string, + value: unknown, +): Promise { + const dir = path.dirname(filePath); + await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 }); + const tmp = path.join( + dir, + `${path.basename(filePath)}.${crypto.randomUUID()}.tmp`, + ); + await fs.promises.writeFile(tmp, `${JSON.stringify(value, null, 2)}\n`, { + encoding: "utf-8", + }); + await fs.promises.chmod(tmp, 0o600); + await fs.promises.rename(tmp, filePath); +} + +async function ensureJsonFile(filePath: string, fallback: unknown) { + try { + await fs.promises.access(filePath); + } catch { + await writeJsonFile(filePath, fallback); + } +} + +export async function withFileLock( + filePath: string, + fallback: unknown, + fn: () => Promise, +): Promise { + await ensureJsonFile(filePath, fallback); + let release: (() => Promise) | undefined; + try { + release = await lockfile.lock(filePath, STORE_LOCK_OPTIONS); + return await fn(); + } finally { + if (release) { + try { + await release(); + } catch { + // ignore unlock errors + } + } + } +}