refactor(msteams): consolidate stores and send context

This commit is contained in:
Peter Steinberger
2026-01-09 11:18:33 +01:00
parent 6d223303eb
commit 6b107e9e74
9 changed files with 246 additions and 308 deletions

View File

@@ -1,15 +1,10 @@
import crypto from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import lockfile from "proper-lockfile";
import type {
MSTeamsConversationStore,
MSTeamsConversationStoreEntry,
StoredConversationReference,
} from "./conversation-store.js";
import { resolveMSTeamsStorePath } from "./storage.js";
import { readJsonFile, withFileLock, writeJsonFile } from "./store-fs.js";
type ConversationStoreData = {
version: 1;
@@ -22,83 +17,6 @@ type ConversationStoreData = {
const STORE_FILENAME = "msteams-conversations.json";
const MAX_CONVERSATIONS = 1000;
const CONVERSATION_TTL_MS = 365 * 24 * 60 * 60 * 1000;
const STORE_LOCK_OPTIONS = {
retries: {
retries: 10,
factor: 2,
minTimeout: 100,
maxTimeout: 10_000,
randomize: true,
},
stale: 30_000,
} as const;
function safeParseJson<T>(raw: string): T | null {
try {
return JSON.parse(raw) as T;
} catch {
return null;
}
}
async function readJsonFile<T>(
filePath: string,
fallback: T,
): Promise<{ value: T; exists: boolean }> {
try {
const raw = await fs.promises.readFile(filePath, "utf-8");
const parsed = safeParseJson<T>(raw);
if (parsed == null) return { value: fallback, exists: true };
return { value: parsed, exists: true };
} catch (err) {
const code = (err as { code?: string }).code;
if (code === "ENOENT") return { value: fallback, exists: false };
return { value: fallback, exists: false };
}
}
async function writeJsonFile(filePath: string, value: unknown): Promise<void> {
const dir = path.dirname(filePath);
await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 });
const tmp = path.join(
dir,
`${path.basename(filePath)}.${crypto.randomUUID()}.tmp`,
);
await fs.promises.writeFile(tmp, `${JSON.stringify(value, null, 2)}\n`, {
encoding: "utf-8",
});
await fs.promises.chmod(tmp, 0o600);
await fs.promises.rename(tmp, filePath);
}
async function ensureJsonFile(filePath: string, fallback: unknown) {
try {
await fs.promises.access(filePath);
} catch {
await writeJsonFile(filePath, fallback);
}
}
async function withFileLock<T>(
filePath: string,
fallback: unknown,
fn: () => Promise<T>,
): Promise<T> {
await ensureJsonFile(filePath, fallback);
let release: (() => Promise<void>) | undefined;
try {
release = await lockfile.lock(filePath, STORE_LOCK_OPTIONS);
return await fn();
} finally {
if (release) {
try {
await release();
} catch {
// ignore unlock errors
}
}
}
}
function parseTimestamp(value: string | undefined): number | null {
if (!value) return null;

View File

@@ -25,6 +25,11 @@ export type MSTeamsAdapter = {
reference: MSTeamsConversationReference,
logic: (context: SendContext) => Promise<void>,
) => Promise<void>;
process: (
req: unknown,
res: unknown,
logic: (context: unknown) => Promise<void>,
) => Promise<void>;
};
export type MSTeamsReplyRenderOptions = {

View File

@@ -71,29 +71,30 @@ export function registerMSTeamsHandlers<T extends MSTeamsActivityHandler>(
deps: MSTeamsMessageHandlerDeps,
): T {
const handleTeamsMessage = createMSTeamsMessageHandler(deps);
handler.onMessage(async (context, next) => {
try {
await handleTeamsMessage(context as MSTeamsTurnContext);
} catch (err) {
deps.runtime.error?.(danger(`msteams handler failed: ${String(err)}`));
}
await next();
});
return handler
.onMessage(async (context, next) => {
try {
await handleTeamsMessage(context as MSTeamsTurnContext);
} catch (err) {
deps.runtime.error?.(danger(`msteams handler failed: ${String(err)}`));
handler.onMembersAdded(async (context, next) => {
const membersAdded =
(context as MSTeamsTurnContext).activity?.membersAdded ?? [];
for (const member of membersAdded) {
if (
member.id !== (context as MSTeamsTurnContext).activity?.recipient?.id
) {
deps.log.debug("member added", { member: member.id });
// Don't send welcome message - let the user initiate conversation.
}
await next();
})
.onMembersAdded(async (context, next) => {
const membersAdded =
(context as MSTeamsTurnContext).activity?.membersAdded ?? [];
for (const member of membersAdded) {
if (
member.id !== (context as MSTeamsTurnContext).activity?.recipient?.id
) {
deps.log.debug("member added", { member: member.id });
// Don't send welcome message - let the user initiate conversation.
}
}
await next();
});
}
await next();
});
return handler;
}
function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
@@ -192,8 +193,8 @@ function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
if (dmPolicy === "pairing") {
const request = await upsertProviderPairingRequest({
provider: "msteams",
sender: senderId,
label: senderName,
id: senderId,
meta: { name: senderName },
});
if (request) {
log.info("msteams pairing request created", {

View File

@@ -96,9 +96,12 @@ export async function monitorMSTeamsProvider(
// Set up the messages endpoint - use configured path and /api/messages as fallback
const configuredPath = msteamsCfg.webhook?.path ?? "/api/messages";
const messageHandler = (req: Request, res: Response) => {
type HandlerContext = Parameters<(typeof handler)["run"]>[0];
void adapter
.process(req, res, (context) => handler.run(context))
.catch((err) => {
.process(req, res, (context: unknown) =>
handler.run(context as HandlerContext),
)
.catch((err: unknown) => {
log.error("msteams webhook failed", { error: formatUnknownError(err) });
});
};

View File

@@ -1,10 +1,7 @@
import crypto from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import lockfile from "proper-lockfile";
import { resolveMSTeamsStorePath } from "./storage.js";
import { readJsonFile, withFileLock, writeJsonFile } from "./store-fs.js";
export type MSTeamsPollVote = {
pollId: string;
@@ -50,17 +47,6 @@ type PollStoreData = {
const STORE_FILENAME = "msteams-polls.json";
const MAX_POLLS = 1000;
const POLL_TTL_MS = 30 * 24 * 60 * 60 * 1000;
const STORE_LOCK_OPTIONS = {
retries: {
retries: 10,
factor: 2,
minTimeout: 100,
maxTimeout: 10_000,
randomize: true,
},
stale: 30_000,
} as const;
function isRecord(value: unknown): value is Record<string, unknown> {
return Boolean(value) && typeof value === "object" && !Array.isArray(value);
}
@@ -239,73 +225,6 @@ export type MSTeamsPollStoreFsOptions = {
storePath?: string;
};
function safeParseJson<T>(raw: string): T | null {
try {
return JSON.parse(raw) as T;
} catch {
return null;
}
}
async function readJsonFile<T>(
filePath: string,
fallback: T,
): Promise<{ value: T; exists: boolean }> {
try {
const raw = await fs.promises.readFile(filePath, "utf-8");
const parsed = safeParseJson<T>(raw);
if (parsed == null) return { value: fallback, exists: true };
return { value: parsed, exists: true };
} catch (err) {
const code = (err as { code?: string }).code;
if (code === "ENOENT") return { value: fallback, exists: false };
return { value: fallback, exists: false };
}
}
async function writeJsonFile(filePath: string, value: unknown): Promise<void> {
const dir = path.dirname(filePath);
await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 });
const tmp = path.join(
dir,
`${path.basename(filePath)}.${crypto.randomUUID()}.tmp`,
);
await fs.promises.writeFile(tmp, `${JSON.stringify(value, null, 2)}\n`, {
encoding: "utf-8",
});
await fs.promises.chmod(tmp, 0o600);
await fs.promises.rename(tmp, filePath);
}
async function ensureJsonFile(filePath: string, fallback: unknown) {
try {
await fs.promises.access(filePath);
} catch {
await writeJsonFile(filePath, fallback);
}
}
async function withFileLock<T>(
filePath: string,
fallback: unknown,
fn: () => Promise<T>,
): Promise<T> {
await ensureJsonFile(filePath, fallback);
let release: (() => Promise<void>) | undefined;
try {
release = await lockfile.lock(filePath, STORE_LOCK_OPTIONS);
return await fn();
} finally {
if (release) {
try {
await release();
} catch {
// ignore unlock errors
}
}
}
}
function parseTimestamp(value?: string): number | null {
if (!value) return null;
const parsed = Date.parse(value);

View File

@@ -1,8 +1,9 @@
import type { MSTeamsAdapter } from "./messenger.js";
import type { MSTeamsCredentials } from "./token.js";
export type MSTeamsSdk = Awaited<
ReturnType<typeof import("@microsoft/agents-hosting")>
export type MSTeamsSdk = typeof import("@microsoft/agents-hosting");
export type MSTeamsAuthConfig = ReturnType<
MSTeamsSdk["getAuthConfigWithDefaults"]
>;
export async function loadMSTeamsSdk(): Promise<MSTeamsSdk> {
@@ -12,7 +13,7 @@ export async function loadMSTeamsSdk(): Promise<MSTeamsSdk> {
export function buildMSTeamsAuthConfig(
creds: MSTeamsCredentials,
sdk: MSTeamsSdk,
) {
): MSTeamsAuthConfig {
return sdk.getAuthConfigWithDefaults({
clientId: creds.appId,
clientSecret: creds.appPassword,
@@ -21,7 +22,7 @@ export function buildMSTeamsAuthConfig(
}
export function createMSTeamsAdapter(
authConfig: unknown,
authConfig: MSTeamsAuthConfig,
sdk: MSTeamsSdk,
): MSTeamsAdapter {
return new sdk.CloudAdapter(authConfig) as unknown as MSTeamsAdapter;

117
src/msteams/send-context.ts Normal file
View File

@@ -0,0 +1,117 @@
import type { ClawdbotConfig } from "../config/types.js";
import type { getChildLogger as getChildLoggerFn } from "../logging.js";
import type {
MSTeamsConversationStore,
StoredConversationReference,
} from "./conversation-store.js";
import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js";
import type { MSTeamsAdapter } from "./messenger.js";
import { createMSTeamsAdapter, loadMSTeamsSdkWithAuth } from "./sdk.js";
import { resolveMSTeamsCredentials } from "./token.js";
let _log: ReturnType<typeof getChildLoggerFn> | undefined;
const getLog = async (): Promise<ReturnType<typeof getChildLoggerFn>> => {
if (_log) return _log;
const { getChildLogger } = await import("../logging.js");
_log = getChildLogger({ name: "msteams:send" });
return _log;
};
export type MSTeamsProactiveContext = {
appId: string;
conversationId: string;
ref: StoredConversationReference;
adapter: MSTeamsAdapter;
log: Awaited<ReturnType<typeof getLog>>;
};
/**
* Parse the --to argument into a conversation reference lookup key.
* Supported formats:
* - conversation:19:abc@thread.tacv2 → lookup by conversation ID
* - user:aad-object-id → lookup by user AAD object ID
* - 19:abc@thread.tacv2 → direct conversation ID
*/
function parseRecipient(to: string): {
type: "conversation" | "user";
id: string;
} {
const trimmed = to.trim();
if (trimmed.startsWith("conversation:")) {
return { type: "conversation", id: trimmed.slice("conversation:".length) };
}
if (trimmed.startsWith("user:")) {
return { type: "user", id: trimmed.slice("user:".length) };
}
// Assume it's a conversation ID if it looks like one
if (trimmed.startsWith("19:") || trimmed.includes("@thread")) {
return { type: "conversation", id: trimmed };
}
// Otherwise treat as user ID
return { type: "user", id: trimmed };
}
/**
* Find a stored conversation reference for the given recipient.
*/
async function findConversationReference(recipient: {
type: "conversation" | "user";
id: string;
store: MSTeamsConversationStore;
}): Promise<{
conversationId: string;
ref: StoredConversationReference;
} | null> {
if (recipient.type === "conversation") {
const ref = await recipient.store.get(recipient.id);
if (ref) return { conversationId: recipient.id, ref };
return null;
}
const found = await recipient.store.findByUserId(recipient.id);
if (!found) return null;
return { conversationId: found.conversationId, ref: found.reference };
}
export async function resolveMSTeamsSendContext(params: {
cfg: ClawdbotConfig;
to: string;
}): Promise<MSTeamsProactiveContext> {
const msteamsCfg = params.cfg.msteams;
if (!msteamsCfg?.enabled) {
throw new Error("msteams provider is not enabled");
}
const creds = resolveMSTeamsCredentials(msteamsCfg);
if (!creds) {
throw new Error("msteams credentials not configured");
}
const store = createMSTeamsConversationStoreFs();
// Parse recipient and find conversation reference
const recipient = parseRecipient(params.to);
const found = await findConversationReference({ ...recipient, store });
if (!found) {
throw new Error(
`No conversation reference found for ${recipient.type}:${recipient.id}. ` +
`The bot must receive a message from this conversation before it can send proactively.`,
);
}
const { conversationId, ref } = found;
const log = await getLog();
const { sdk, authConfig } = await loadMSTeamsSdkWithAuth(creds);
const adapter = createMSTeamsAdapter(authConfig, sdk);
return {
appId: creds.appId,
conversationId,
ref,
adapter: adapter as unknown as MSTeamsAdapter,
log,
};
}

View File

@@ -1,9 +1,5 @@
import type { ClawdbotConfig } from "../config/types.js";
import type { getChildLogger as getChildLoggerFn } from "../logging.js";
import type {
MSTeamsConversationStore,
StoredConversationReference,
} from "./conversation-store.js";
import type { StoredConversationReference } from "./conversation-store.js";
import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js";
import {
classifyMSTeamsSendError,
@@ -16,16 +12,7 @@ import {
sendMSTeamsMessages,
} from "./messenger.js";
import { buildMSTeamsPollCard } from "./polls.js";
import { createMSTeamsAdapter, loadMSTeamsSdkWithAuth } from "./sdk.js";
import { resolveMSTeamsCredentials } from "./token.js";
let _log: ReturnType<typeof getChildLoggerFn> | undefined;
const getLog = async (): Promise<ReturnType<typeof getChildLoggerFn>> => {
if (_log) return _log;
const { getChildLogger } = await import("../logging.js");
_log = getChildLogger({ name: "msteams:send" });
return _log;
};
import { resolveMSTeamsSendContext } from "./send-context.js";
export type SendMSTeamsMessageParams = {
/** Full config (for credentials) */
@@ -62,54 +49,6 @@ export type SendMSTeamsPollResult = {
conversationId: string;
};
/**
* Parse the --to argument into a conversation reference lookup key.
* Supported formats:
* - conversation:19:abc@thread.tacv2 → lookup by conversation ID
* - user:aad-object-id → lookup by user AAD object ID
* - 19:abc@thread.tacv2 → direct conversation ID
*/
function parseRecipient(to: string): {
type: "conversation" | "user";
id: string;
} {
const trimmed = to.trim();
if (trimmed.startsWith("conversation:")) {
return { type: "conversation", id: trimmed.slice("conversation:".length) };
}
if (trimmed.startsWith("user:")) {
return { type: "user", id: trimmed.slice("user:".length) };
}
// Assume it's a conversation ID if it looks like one
if (trimmed.startsWith("19:") || trimmed.includes("@thread")) {
return { type: "conversation", id: trimmed };
}
// Otherwise treat as user ID
return { type: "user", id: trimmed };
}
/**
* Find a stored conversation reference for the given recipient.
*/
async function findConversationReference(recipient: {
type: "conversation" | "user";
id: string;
store: MSTeamsConversationStore;
}): Promise<{
conversationId: string;
ref: StoredConversationReference;
} | null> {
if (recipient.type === "conversation") {
const ref = await recipient.store.get(recipient.id);
if (ref) return { conversationId: recipient.id, ref };
return null;
}
const found = await recipient.store.findByUserId(recipient.id);
if (!found) return null;
return { conversationId: found.conversationId, ref: found.reference };
}
function extractMessageId(response: unknown): string | null {
if (!response || typeof response !== "object") return null;
if (!("id" in response)) return null;
@@ -118,57 +57,6 @@ function extractMessageId(response: unknown): string | null {
return id;
}
type MSTeamsProactiveContext = {
appId: string;
conversationId: string;
ref: StoredConversationReference;
adapter: MSTeamsAdapter;
log: Awaited<ReturnType<typeof getLog>>;
};
async function resolveMSTeamsSendContext(params: {
cfg: ClawdbotConfig;
to: string;
}): Promise<MSTeamsProactiveContext> {
const msteamsCfg = params.cfg.msteams;
if (!msteamsCfg?.enabled) {
throw new Error("msteams provider is not enabled");
}
const creds = resolveMSTeamsCredentials(msteamsCfg);
if (!creds) {
throw new Error("msteams credentials not configured");
}
const store = createMSTeamsConversationStoreFs();
// Parse recipient and find conversation reference
const recipient = parseRecipient(params.to);
const found = await findConversationReference({ ...recipient, store });
if (!found) {
throw new Error(
`No conversation reference found for ${recipient.type}:${recipient.id}. ` +
`The bot must receive a message from this conversation before it can send proactively.`,
);
}
const { conversationId, ref } = found;
const log = await getLog();
const { sdk, authConfig } = await loadMSTeamsSdkWithAuth(creds);
const adapter = createMSTeamsAdapter(authConfig, sdk);
return {
appId: creds.appId,
conversationId,
ref,
adapter: adapter as unknown as MSTeamsAdapter,
log,
};
}
async function sendMSTeamsActivity(params: {
adapter: MSTeamsAdapter;
appId: string;

86
src/msteams/store-fs.ts Normal file
View File

@@ -0,0 +1,86 @@
import crypto from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import lockfile from "proper-lockfile";
const STORE_LOCK_OPTIONS = {
retries: {
retries: 10,
factor: 2,
minTimeout: 100,
maxTimeout: 10_000,
randomize: true,
},
stale: 30_000,
} as const;
function safeParseJson<T>(raw: string): T | null {
try {
return JSON.parse(raw) as T;
} catch {
return null;
}
}
export async function readJsonFile<T>(
filePath: string,
fallback: T,
): Promise<{ value: T; exists: boolean }> {
try {
const raw = await fs.promises.readFile(filePath, "utf-8");
const parsed = safeParseJson<T>(raw);
if (parsed == null) return { value: fallback, exists: true };
return { value: parsed, exists: true };
} catch (err) {
const code = (err as { code?: string }).code;
if (code === "ENOENT") return { value: fallback, exists: false };
return { value: fallback, exists: false };
}
}
export async function writeJsonFile(
filePath: string,
value: unknown,
): Promise<void> {
const dir = path.dirname(filePath);
await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 });
const tmp = path.join(
dir,
`${path.basename(filePath)}.${crypto.randomUUID()}.tmp`,
);
await fs.promises.writeFile(tmp, `${JSON.stringify(value, null, 2)}\n`, {
encoding: "utf-8",
});
await fs.promises.chmod(tmp, 0o600);
await fs.promises.rename(tmp, filePath);
}
async function ensureJsonFile(filePath: string, fallback: unknown) {
try {
await fs.promises.access(filePath);
} catch {
await writeJsonFile(filePath, fallback);
}
}
export async function withFileLock<T>(
filePath: string,
fallback: unknown,
fn: () => Promise<T>,
): Promise<T> {
await ensureJsonFile(filePath, fallback);
let release: (() => Promise<void>) | undefined;
try {
release = await lockfile.lock(filePath, STORE_LOCK_OPTIONS);
return await fn();
} finally {
if (release) {
try {
await release();
} catch {
// ignore unlock errors
}
}
}
}