import fs from "node:fs/promises";
import os from "node:os";

import {
  resolveEffectiveMessagesConfig,
  resolveHumanDelayConfig,
  resolveIdentityName,
} from "../../agents/identity.js";
import {
  extractShortModelName,
  type ResponsePrefixContext,
} from "../../auto-reply/reply/response-prefix-template.js";
import { resolveTextChunkLimit } from "../../auto-reply/chunk.js";
import { hasControlCommand } from "../../auto-reply/command-detection.js";
import { formatAgentEnvelope } from "../../auto-reply/envelope.js";
import {
  createInboundDebouncer,
  resolveInboundDebounceMs,
} from "../../auto-reply/inbound-debounce.js";
import { dispatchReplyFromConfig } from "../../auto-reply/reply/dispatch-from-config.js";
import {
  buildPendingHistoryContextFromMap,
  clearHistoryEntries,
  DEFAULT_GROUP_HISTORY_LIMIT,
  recordPendingHistoryEntry,
  type HistoryEntry,
} from "../../auto-reply/reply/history.js";
import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js";
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
import { loadConfig } from "../../config/config.js";
import {
  resolveChannelGroupPolicy,
  resolveChannelGroupRequireMention,
} from "../../config/group-policy.js";
import { resolveStorePath, updateLastRoute } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { waitForTransportReady } from "../../infra/transport-ready.js";
import { mediaKindFromMime } from "../../media/constants.js";
import { buildPairingReply } from "../../pairing/pairing-messages.js";
import {
  readChannelAllowFromStore,
  upsertChannelPairingRequest,
} from "../../pairing/pairing-store.js";
import { resolveAgentRoute } from "../../routing/resolve-route.js";
import { truncateUtf16Safe } from "../../utils.js";
import { resolveIMessageAccount } from "../accounts.js";
import { createIMessageRpcClient } from "../client.js";
import { probeIMessage } from "../probe.js";
import { sendMessageIMessage } from "../send.js";
import {
  formatIMessageChatTarget,
  isAllowedIMessageSender,
  normalizeIMessageHandle,
} from "../targets.js";
import { deliverReplies } from "./deliver.js";
import { normalizeAllowList, resolveRuntime } from "./runtime.js";
import type { IMessagePayload, MonitorIMessageOpts } from "./types.js";

/**
 * Try to detect the remote host from an SSH wrapper script such as:
 *   exec ssh -T clawdbot@192.168.64.3 /opt/homebrew/bin/imsg "$@"
 *   exec ssh -T mac-mini imsg "$@"
 *
 * @param cliPath Path to the configured CLI; the file is read as UTF-8 text.
 * @returns The `user@host` (preferred) or bare `host` token found on an `ssh`
 *   line, or `undefined` when nothing matches or the file cannot be read.
 */
async function detectRemoteHostFromCliPath(cliPath: string): Promise<string | undefined> {
  try {
    // Expand only a bare "~" or a "~/" prefix. "~user/..." is left untouched
    // because it does not refer to the current user's home directory.
    const expanded =
      cliPath === "~" || cliPath.startsWith("~/")
        ? `${os.homedir()}${cliPath.slice(1)}`
        : cliPath;
    const content = await fs.readFile(expanded, "utf8");

    // Match user@host pattern first (e.g., clawdbot@192.168.64.3)
    const userHostMatch = content.match(/\bssh\b[^\n]*?\s+([a-zA-Z0-9._-]+@[a-zA-Z0-9._-]+)/);
    if (userHostMatch) return userHostMatch[1];

    // Fallback: match host-only before imsg command (e.g., ssh -T mac-mini imsg)
    const hostOnlyMatch = content.match(/\bssh\b[^\n]*?\s+([a-zA-Z][a-zA-Z0-9._-]*)\s+\S*\bimsg\b/);
    return hostOnlyMatch?.[1];
  } catch {
    // Best-effort detection: an unreadable path simply means "no remote host".
    return undefined;
  }
}

/**
 * Run the iMessage monitor: wait for the `imsg` RPC transport, subscribe to
 * incoming messages, and route each one through access checks, mention gating
 * and reply dispatch until the RPC client closes or `opts.abortSignal` fires.
 *
 * @param opts Overrides for config-derived settings (account, allow lists,
 *   attachments, CLI/DB paths, abort signal). Values in `opts` take precedence
 *   over the resolved account config.
 * @returns Resolves when the client connection closes or the monitor is aborted.
 * @throws Re-throws subscribe/connection errors unless the abort signal has
 *   fired; also propagates errors from the transport readiness wait.
 */
export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): Promise<void> {
  const runtime = resolveRuntime(opts);
  const cfg = opts.config ?? loadConfig();
  const accountInfo = resolveIMessageAccount({
    cfg,
    accountId: opts.accountId,
  });
  const imessageCfg = accountInfo.config;
  const historyLimit = Math.max(
    0,
    imessageCfg.historyLimit ??
      cfg.messages?.groupChat?.historyLimit ??
      DEFAULT_GROUP_HISTORY_LIMIT,
  );
  // Pending (not yet answered) group messages, keyed by the group history key.
  const groupHistories = new Map<string, HistoryEntry[]>();
  const textLimit = resolveTextChunkLimit(cfg, "imessage", accountInfo.accountId);
  const allowFrom = normalizeAllowList(opts.allowFrom ?? imessageCfg.allowFrom);
  // Group allow list falls back to the DM allow list when none is configured.
  const groupAllowFrom = normalizeAllowList(
    opts.groupAllowFrom ??
      imessageCfg.groupAllowFrom ??
      (imessageCfg.allowFrom && imessageCfg.allowFrom.length > 0 ? imessageCfg.allowFrom : []),
  );
  const groupPolicy = imessageCfg.groupPolicy ?? "open";
  const dmPolicy = imessageCfg.dmPolicy ?? "pairing";
  const includeAttachments = opts.includeAttachments ?? imessageCfg.includeAttachments ?? false;
  const mediaMaxBytes = (opts.mediaMaxMb ?? imessageCfg.mediaMaxMb ?? 16) * 1024 * 1024;
  const cliPath = opts.cliPath ?? imessageCfg.cliPath ?? "imsg";
  const dbPath = opts.dbPath ?? imessageCfg.dbPath;

  // Resolve remoteHost: explicit config, or auto-detect from SSH wrapper script
  let remoteHost = imessageCfg.remoteHost;
  if (!remoteHost && cliPath && cliPath !== "imsg") {
    remoteHost = await detectRemoteHostFromCliPath(cliPath);
    if (remoteHost) {
      logVerbose(`imessage: detected remoteHost=${remoteHost} from cliPath`);
    }
  }

  const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "imessage" });
  const inboundDebouncer = createInboundDebouncer<{ message: IMessagePayload }>({
    debounceMs: inboundDebounceMs,
    // One debounce bucket per account + conversation + sender.
    buildKey: (entry) => {
      const sender = entry.message.sender?.trim();
      if (!sender) return null;
      const conversationId =
        entry.message.chat_id != null
          ? `chat:${entry.message.chat_id}`
          : (entry.message.chat_guid ?? entry.message.chat_identifier ?? "unknown");
      return `imessage:${accountInfo.accountId}:${conversationId}:${sender}`;
    },
    // Only plain text is debounced; empty text, attachments and control
    // commands are handled immediately.
    shouldDebounce: (entry) => {
      const text = entry.message.text?.trim() ?? "";
      if (!text) return false;
      if (entry.message.attachments && entry.message.attachments.length > 0) return false;
      return !hasControlCommand(text, cfg);
    },
    onFlush: async (entries) => {
      const last = entries.at(-1);
      if (!last) return;
      if (entries.length === 1) {
        await handleMessageNow(last.message);
        return;
      }
      // Merge the burst into one synthetic message based on the latest entry.
      const combinedText = entries
        .map((entry) => entry.message.text ?? "")
        .filter(Boolean)
        .join("\n");
      const syntheticMessage: IMessagePayload = {
        ...last.message,
        text: combinedText,
        attachments: null,
      };
      await handleMessageNow(syntheticMessage);
    },
    onError: (err) => {
      runtime.error?.(`imessage debounce flush failed: ${String(err)}`);
    },
  });

  /**
   * Process one (possibly debounce-merged) inbound message: apply group/DM
   * access policy, pairing, mention gating, then dispatch a reply.
   *
   * NOTE(review): this closes over `client`, which is declared further down in
   * the enclosing function. That is only safe if no message is flushed before
   * `createIMessageRpcClient` has resolved — confirm.
   */
  async function handleMessageNow(message: IMessagePayload): Promise<void> {
    const senderRaw = message.sender ?? "";
    const sender = senderRaw.trim();
    if (!sender) return;
    if (message.is_from_me) return;

    const chatId = message.chat_id ?? undefined;
    const chatGuid = message.chat_guid ?? undefined;
    const chatIdentifier = message.chat_identifier ?? undefined;

    const groupIdCandidate = chatId !== undefined ? String(chatId) : undefined;
    const groupListPolicy = groupIdCandidate
      ? resolveChannelGroupPolicy({
          cfg,
          channel: "imessage",
          accountId: accountInfo.accountId,
          groupId: groupIdCandidate,
        })
      : {
          allowlistEnabled: false,
          allowed: true,
          groupConfig: undefined,
          defaultConfig: undefined,
        };

    // Some iMessage threads can have multiple participants but still report
    // is_group=false depending on how Messages stores the identifier.
    // If the owner explicitly configures a chat_id under imessage.groups, treat
    // that thread as a "group" for permission gating and session isolation.
    const treatAsGroupByConfig = Boolean(
      groupIdCandidate && groupListPolicy.allowlistEnabled && groupListPolicy.groupConfig,
    );

    const isGroup = Boolean(message.is_group) || treatAsGroupByConfig;
    if (isGroup && !chatId) return;

    const groupId = isGroup ? groupIdCandidate : undefined;
    // Senders approved via the pairing store extend both allow lists.
    const storeAllowFrom = await readChannelAllowFromStore("imessage").catch(() => []);
    const effectiveDmAllowFrom = Array.from(new Set([...allowFrom, ...storeAllowFrom]))
      .map((v) => String(v).trim())
      .filter(Boolean);
    const effectiveGroupAllowFrom = Array.from(new Set([...groupAllowFrom, ...storeAllowFrom]))
      .map((v) => String(v).trim())
      .filter(Boolean);

    if (isGroup) {
      if (groupPolicy === "disabled") {
        logVerbose("Blocked iMessage group message (groupPolicy: disabled)");
        return;
      }
      if (groupPolicy === "allowlist") {
        if (effectiveGroupAllowFrom.length === 0) {
          logVerbose("Blocked iMessage group message (groupPolicy: allowlist, no groupAllowFrom)");
          return;
        }
        const allowed = isAllowedIMessageSender({
          allowFrom: effectiveGroupAllowFrom,
          sender,
          chatId,
          chatGuid,
          chatIdentifier,
        });
        if (!allowed) {
          logVerbose(`Blocked iMessage sender ${sender} (not in groupAllowFrom)`);
          return;
        }
      }
      if (groupListPolicy.allowlistEnabled && !groupListPolicy.allowed) {
        logVerbose(`imessage: skipping group message (${groupId ?? "unknown"}) not in allowlist`);
        return;
      }
    }

    const dmHasWildcard = effectiveDmAllowFrom.includes("*");
    const dmAuthorized =
      dmPolicy === "open"
        ? true
        : dmHasWildcard ||
          (effectiveDmAllowFrom.length > 0 &&
            isAllowedIMessageSender({
              allowFrom: effectiveDmAllowFrom,
              sender,
              chatId,
              chatGuid,
              chatIdentifier,
            }));

    if (!isGroup) {
      if (dmPolicy === "disabled") return;
      if (!dmAuthorized) {
        if (dmPolicy === "pairing") {
          const senderId = normalizeIMessageHandle(sender);
          const { code, created } = await upsertChannelPairingRequest({
            channel: "imessage",
            id: senderId,
            meta: {
              sender: senderId,
              chatId: chatId ? String(chatId) : undefined,
            },
          });
          // Only reply with the pairing code when the request was newly created.
          if (created) {
            logVerbose(`imessage pairing request sender=${senderId}`);
            try {
              await sendMessageIMessage(
                sender,
                buildPairingReply({
                  channel: "imessage",
                  idLine: `Your iMessage sender id: ${senderId}`,
                  code,
                }),
                {
                  client,
                  maxBytes: mediaMaxBytes,
                  accountId: accountInfo.accountId,
                  ...(chatId ? { chatId } : {}),
                },
              );
            } catch (err) {
              logVerbose(`imessage pairing reply failed for ${senderId}: ${String(err)}`);
            }
          }
        } else {
          logVerbose(`Blocked iMessage sender ${sender} (dmPolicy=${dmPolicy})`);
        }
        return;
      }
    }

    const route = resolveAgentRoute({
      cfg,
      channel: "imessage",
      accountId: accountInfo.accountId,
      peer: {
        kind: isGroup ? "group" : "dm",
        id: isGroup ? String(chatId ?? "unknown") : normalizeIMessageHandle(sender),
      },
    });
    const mentionRegexes = buildMentionRegexes(cfg, route.agentId);
    const messageText = (message.text ?? "").trim();
    const attachments = includeAttachments ? (message.attachments ?? []) : [];
    // First attachment that has a path on disk and is not flagged missing.
    const firstAttachment = attachments.find((entry) => entry?.original_path && !entry?.missing);
    const mediaPath = firstAttachment?.original_path ?? undefined;
    const mediaType = firstAttachment?.mime_type ?? undefined;
    const kind = mediaKindFromMime(mediaType);
    // Attachment-only messages get a non-empty placeholder body so they are not
    // discarded by the empty-body guard below.
    const placeholder = kind
      ? `<media:${kind}>`
      : attachments.length
        ? "<media:attachment>"
        : "";
    const bodyText = messageText || placeholder;
    if (!bodyText) return;

    // Date.parse yields NaN for malformed input; treat that as "no timestamp".
    const parsedCreatedAt = message.created_at ? Date.parse(message.created_at) : Number.NaN;
    const createdAt = Number.isFinite(parsedCreatedAt) ? parsedCreatedAt : undefined;
    const historyKey = isGroup
      ? String(chatId ?? chatGuid ?? chatIdentifier ?? "unknown")
      : undefined;
    const mentioned = isGroup ? matchesMentionPatterns(messageText, mentionRegexes) : true;
    const requireMention = resolveChannelGroupRequireMention({
      cfg,
      channel: "imessage",
      accountId: accountInfo.accountId,
      groupId,
      requireMentionOverride: opts.requireMention,
      overrideOrder: "before-config",
    });
    const canDetectMention = mentionRegexes.length > 0;
    const commandAuthorized = isGroup
      ? effectiveGroupAllowFrom.length > 0
        ? isAllowedIMessageSender({
            allowFrom: effectiveGroupAllowFrom,
            sender,
            chatId,
            chatGuid,
            chatIdentifier,
          })
        : true
      : dmAuthorized;
    // Authorized senders may issue control commands in a group without a mention.
    // `cfg` is passed so detection matches the debouncer's hasControlCommand call.
    const shouldBypassMention =
      isGroup &&
      requireMention &&
      !mentioned &&
      commandAuthorized &&
      hasControlCommand(messageText, cfg);
    const effectiveWasMentioned = mentioned || shouldBypassMention;
    if (isGroup && requireMention && canDetectMention && !mentioned && !shouldBypassMention) {
      logVerbose(`imessage: skipping group message (no mention)`);
      // Remember the skipped message so it can be included as context later.
      if (historyKey && historyLimit > 0) {
        recordPendingHistoryEntry({
          historyMap: groupHistories,
          historyKey,
          limit: historyLimit,
          entry: {
            sender: normalizeIMessageHandle(sender),
            body: bodyText,
            timestamp: createdAt,
            messageId: message.id ? String(message.id) : undefined,
          },
        });
      }
      return;
    }

    const chatTarget = formatIMessageChatTarget(chatId);
    const fromLabel = isGroup
      ? `${message.chat_name || "iMessage Group"} id:${chatId ?? "unknown"}`
      : `${normalizeIMessageHandle(sender)} id:${sender}`;
    const body = formatAgentEnvelope({
      channel: "iMessage",
      from: fromLabel,
      timestamp: createdAt,
      body: bodyText,
    });
    let combinedBody = body;
    if (isGroup && historyKey && historyLimit > 0) {
      combinedBody = buildPendingHistoryContextFromMap({
        historyMap: groupHistories,
        historyKey,
        limit: historyLimit,
        currentMessage: combinedBody,
        formatEntry: (entry) =>
          formatAgentEnvelope({
            channel: "iMessage",
            from: fromLabel,
            timestamp: entry.timestamp,
            body: `${entry.sender}: ${entry.body}${
              entry.messageId ? ` [id:${entry.messageId}]` : ""
            }`,
          }),
      });
    }

    const imessageTo = (isGroup ? chatTarget : undefined) || `imessage:${sender}`;
    const ctxPayload = {
      Body: combinedBody,
      RawBody: bodyText,
      CommandBody: bodyText,
      From: isGroup ? `group:${chatId}` : `imessage:${sender}`,
      To: imessageTo,
      SessionKey: route.sessionKey,
      AccountId: route.accountId,
      ChatType: isGroup ? "group" : "direct",
      GroupSubject: isGroup ? (message.chat_name ?? undefined) : undefined,
      GroupMembers: isGroup ? (message.participants ?? []).filter(Boolean).join(", ") : undefined,
      SenderName: sender,
      SenderId: sender,
      Provider: "imessage",
      Surface: "imessage",
      MessageSid: message.id ? String(message.id) : undefined,
      Timestamp: createdAt,
      MediaPath: mediaPath,
      MediaType: mediaType,
      MediaUrl: mediaPath,
      MediaRemoteHost: remoteHost,
      WasMentioned: effectiveWasMentioned,
      CommandAuthorized: commandAuthorized,
      // Originating channel for reply routing.
      OriginatingChannel: "imessage" as const,
      OriginatingTo: imessageTo,
    };

    if (!isGroup) {
      // Direct messages update the last route of the agent's main session.
      // `sender` is non-empty here (checked at the top of this function).
      const sessionCfg = cfg.session;
      const storePath = resolveStorePath(sessionCfg?.store, {
        agentId: route.agentId,
      });
      await updateLastRoute({
        storePath,
        sessionKey: route.mainSessionKey,
        channel: "imessage",
        to: sender,
        accountId: route.accountId,
      });
    }

    if (shouldLogVerbose()) {
      const preview = truncateUtf16Safe(body, 200).replace(/\n/g, "\\n");
      logVerbose(
        `imessage inbound: chatId=${chatId ?? "unknown"} from=${ctxPayload.From} len=${body.length} preview="${preview}"`,
      );
    }

    // Context for response prefix template interpolation. The object is mutated
    // in place (never reassigned) so the provider closure below sees updates.
    const prefixContext: ResponsePrefixContext = {
      identityName: resolveIdentityName(cfg, route.agentId),
    };

    const dispatcher = createReplyDispatcher({
      responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId).responsePrefix,
      responsePrefixContextProvider: () => prefixContext,
      humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
      deliver: async (payload) => {
        await deliverReplies({
          replies: [payload],
          target: ctxPayload.To,
          client,
          accountId: accountInfo.accountId,
          runtime,
          maxBytes: mediaMaxBytes,
          textLimit,
        });
      },
      onError: (err, info) => {
        runtime.error?.(danger(`imessage ${info.kind} reply failed: ${String(err)}`));
      },
    });

    await dispatchReplyFromConfig({
      ctx: ctxPayload,
      cfg,
      dispatcher,
      replyOptions: {
        disableBlockStreaming:
          typeof accountInfo.config.blockStreaming === "boolean"
            ? !accountInfo.config.blockStreaming
            : undefined,
        onModelSelected: (ctx) => {
          // Mutate the object directly instead of reassigning to ensure the closure sees updates
          prefixContext.provider = ctx.provider;
          prefixContext.model = extractShortModelName(ctx.model);
          prefixContext.modelFull = `${ctx.provider}/${ctx.model}`;
          prefixContext.thinkingLevel = ctx.thinkLevel ?? "off";
        },
      },
    });

    // Pending group history is cleared after dispatch whether or not a final
    // reply was queued.
    if (isGroup && historyKey && historyLimit > 0) {
      clearHistoryEntries({ historyMap: groupHistories, historyKey });
    }
  }

  // Entry point for raw "message" notifications; hands the payload to the debouncer.
  const handleMessage = async (raw: unknown) => {
    const params = raw as { message?: IMessagePayload | null };
    const message = params?.message ?? null;
    if (!message) return;
    await inboundDebouncer.enqueue({ message });
  };

  await waitForTransportReady({
    label: "imsg rpc",
    timeoutMs: 30_000,
    logAfterMs: 10_000,
    logIntervalMs: 10_000,
    pollIntervalMs: 500,
    abortSignal: opts.abortSignal,
    runtime,
    check: async () => {
      const probe = await probeIMessage(2000, { cliPath, dbPath, runtime });
      if (probe.ok) return { ok: true };
      // Fatal probe results abort the wait instead of being retried.
      if (probe.fatal) {
        throw new Error(probe.error ?? "imsg rpc unavailable");
      }
      return { ok: false, error: probe.error ?? "unreachable" };
    },
  });

  if (opts.abortSignal?.aborted) return;

  const client = await createIMessageRpcClient({
    cliPath,
    dbPath,
    runtime,
    onNotification: (msg) => {
      if (msg.method === "message") {
        void handleMessage(msg.params).catch((err) => {
          runtime.error?.(`imessage: handler failed: ${String(err)}`);
        });
      } else if (msg.method === "error") {
        runtime.error?.(`imessage: watch error ${JSON.stringify(msg.params)}`);
      }
    },
  });

  let subscriptionId: number | null = null;
  const abort = opts.abortSignal;
  const onAbort = () => {
    // Explicit null check so a subscription id of 0 is still unsubscribed.
    if (subscriptionId !== null) {
      void client
        .request("watch.unsubscribe", {
          subscription: subscriptionId,
        })
        .catch(() => {
          // Ignore disconnect errors during shutdown.
        });
    }
    void client.stop().catch(() => {
      // Ignore disconnect errors during shutdown.
    });
  };
  abort?.addEventListener("abort", onAbort, { once: true });

  try {
    const result = await client.request<{ subscription?: number }>("watch.subscribe", {
      attachments: includeAttachments,
    });
    subscriptionId = result?.subscription ?? null;
    await client.waitForClose();
  } catch (err) {
    // Errors caused by an intentional abort are not reported.
    if (abort?.aborted) return;
    runtime.error?.(danger(`imessage: monitor failed: ${String(err)}`));
    throw err;
  } finally {
    abort?.removeEventListener("abort", onAbort);
    await client.stop();
  }
}