From 6d269710518c68b6264bc0c9d4bf4da691e5aba5 Mon Sep 17 00:00:00 2001 From: Tyler Yust Date: Sun, 25 Jan 2026 14:14:41 -0800 Subject: [PATCH] fix(bluebubbles): add inbound message debouncing to coalesce URL link previews When users send iMessages containing URLs, BlueBubbles sends separate webhook events for the text message and the URL balloon/link preview. This caused Clawdbot to receive them as separate queued messages. This fix adds inbound debouncing (following the pattern from WhatsApp/MS Teams): - Uses the existing createInboundDebouncer utility from plugin-sdk - Adds debounceMs config option to BlueBubblesAccountConfig (default: 500ms) - Routes inbound messages through debouncer before processing - Combines messages from same sender/chat within the debounce window - Handles URLBalloonProvider messages by coalescing with preceding text - Skips debouncing for messages with attachments or control commands Config example: channels.bluebubbles.debounceMs: 500 # milliseconds (0 to disable) Fixes inbound URL message splitting issue. --- extensions/bluebubbles/src/monitor.test.ts | 10 +- extensions/bluebubbles/src/monitor.ts | 184 ++++++++++++++++++++- 2 files changed, 191 insertions(+), 3 deletions(-) diff --git a/extensions/bluebubbles/src/monitor.test.ts b/extensions/bluebubbles/src/monitor.test.ts index 12aef679c..76c9eebf6 100644 --- a/extensions/bluebubbles/src/monitor.test.ts +++ b/extensions/bluebubbles/src/monitor.test.ts @@ -146,8 +146,14 @@ function createMockRuntime(): PluginRuntime { resolveRequireMention: mockResolveRequireMention as unknown as PluginRuntime["channel"]["groups"]["resolveRequireMention"], }, debounce: { - createInboundDebouncer: vi.fn() as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"], - resolveInboundDebounceMs: vi.fn() as unknown as PluginRuntime["channel"]["debounce"]["resolveInboundDebounceMs"], + // Create a pass-through debouncer that immediately calls onFlush + createInboundDebouncer: vi.fn((params: { onFlush: (items: unknown[]) => Promise }) => ({ + enqueue: async (item: unknown) => { + await params.onFlush([item]); + }, + flushKey: vi.fn(), + })) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"], + resolveInboundDebounceMs: vi.fn(() => 0) as unknown as PluginRuntime["channel"]["debounce"]["resolveInboundDebounceMs"], }, commands: { resolveCommandAuthorizedFromAuthorizers: mockResolveCommandAuthorizedFromAuthorizers as unknown as PluginRuntime["channel"]["commands"]["resolveCommandAuthorizedFromAuthorizers"], diff --git a/extensions/bluebubbles/src/monitor.ts b/extensions/bluebubbles/src/monitor.ts index 8635b183e..b754558bb 100644 --- a/extensions/bluebubbles/src/monitor.ts +++ b/extensions/bluebubbles/src/monitor.ts @@ -250,8 +250,185 @@ type WebhookTarget = { statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; }; +/** + * Entry type for debouncing inbound messages. + * Captures the normalized message and its target for later combined processing. + */ +type BlueBubblesDebounceEntry = { + message: NormalizedWebhookMessage; + target: WebhookTarget; +}; + +/** + * Default debounce window for inbound message coalescing (ms). + * This helps combine URL text + link preview balloon messages that BlueBubbles + * sends as separate webhook events. + */ +const DEFAULT_INBOUND_DEBOUNCE_MS = 100; + +/** + * Known URLBalloonProvider bundle IDs that indicate a rich link preview message. + */ +const URL_BALLOON_BUNDLE_IDS = new Set([ + "com.apple.messages.URLBalloonProvider", + "com.apple.messages.richLinkProvider", +]); + +/** + * Checks if a message is a URL balloon/link preview message. + */ +function isUrlBalloonMessage(message: NormalizedWebhookMessage): boolean { + const bundleId = message.balloonBundleId?.trim(); + if (!bundleId) return false; + return URL_BALLOON_BUNDLE_IDS.has(bundleId); +} + +/** + * Combines multiple debounced messages into a single message for processing. + * Used when multiple webhook events arrive within the debounce window. + */ +function combineDebounceEntries(entries: BlueBubblesDebounceEntry[]): NormalizedWebhookMessage { + if (entries.length === 0) { + throw new Error("Cannot combine empty entries"); + } + if (entries.length === 1) { + return entries[0].message; + } + + // Use the first message as the base (typically the text message) + const first = entries[0].message; + const rest = entries.slice(1); + + // Combine text from all entries, filtering out duplicates and empty strings + const seenTexts = new Set(); + const textParts: string[] = []; + + for (const entry of entries) { + const text = entry.message.text.trim(); + if (!text) continue; + // Skip duplicate text (URL might be in both text message and balloon) + const normalizedText = text.toLowerCase(); + if (seenTexts.has(normalizedText)) continue; + seenTexts.add(normalizedText); + textParts.push(text); + } + + // Merge attachments from all entries + const allAttachments = entries.flatMap((e) => e.message.attachments ?? []); + + // Use the latest timestamp + const timestamps = entries + .map((e) => e.message.timestamp) + .filter((t): t is number => typeof t === "number"); + const latestTimestamp = timestamps.length > 0 ? Math.max(...timestamps) : first.timestamp; + + // Collect all message IDs for reference + const messageIds = entries + .map((e) => e.message.messageId) + .filter((id): id is string => Boolean(id)); + + // Prefer reply context from any entry that has it + const entryWithReply = entries.find((e) => e.message.replyToId); + + return { + ...first, + text: textParts.join(" "), + attachments: allAttachments.length > 0 ? allAttachments : first.attachments, + timestamp: latestTimestamp, + // Use first message's ID as primary (for reply reference), but we've coalesced others + messageId: messageIds[0] ?? first.messageId, + // Preserve reply context if present + replyToId: entryWithReply?.message.replyToId ?? first.replyToId, + replyToBody: entryWithReply?.message.replyToBody ?? first.replyToBody, + replyToSender: entryWithReply?.message.replyToSender ?? first.replyToSender, + // Clear balloonBundleId since we've combined (the combined message is no longer just a balloon) + balloonBundleId: undefined, + }; +} + const webhookTargets = new Map(); +/** + * Maps webhook targets to their inbound debouncers. + * Each target gets its own debouncer keyed by a unique identifier. + */ +const targetDebouncers = new Map< + WebhookTarget, + ReturnType +>(); + +/** + * Creates or retrieves a debouncer for a webhook target. + */ +function getOrCreateDebouncer(target: WebhookTarget) { + const existing = targetDebouncers.get(target); + if (existing) return existing; + + const { account, config, runtime, core } = target; + + const debouncer = core.channel.debounce.createInboundDebouncer({ + debounceMs: DEFAULT_INBOUND_DEBOUNCE_MS, + buildKey: (entry) => { + const msg = entry.message; + // Build key from account + chat + sender to coalesce messages from same source + const chatKey = + msg.chatGuid?.trim() ?? + msg.chatIdentifier?.trim() ?? + (msg.chatId ? String(msg.chatId) : "dm"); + return `bluebubbles:${account.accountId}:${chatKey}:${msg.senderId}`; + }, + shouldDebounce: (entry) => { + const msg = entry.message; + // Skip debouncing for messages with attachments - process immediately + if (msg.attachments && msg.attachments.length > 0) return false; + // Skip debouncing for from-me messages (they're just cached, not processed) + if (msg.fromMe) return false; + // Skip debouncing for control commands - process immediately + if (core.channel.text.hasControlCommand(msg.text, config)) return false; + // Debounce normal text messages and URL balloon messages + return true; + }, + onFlush: async (entries) => { + if (entries.length === 0) return; + + // Use target from first entry (all entries have same target due to key structure) + const flushTarget = entries[0].target; + + if (entries.length === 1) { + // Single message - process normally + await processMessage(entries[0].message, flushTarget); + return; + } + + // Multiple messages - combine and process + const combined = combineDebounceEntries(entries); + + if (core.logging.shouldLogVerbose()) { + const count = entries.length; + const preview = combined.text.slice(0, 50); + runtime.log?.( + `[bluebubbles] coalesced ${count} messages: "${preview}${combined.text.length > 50 ? "..." : ""}"`, + ); + } + + await processMessage(combined, flushTarget); + }, + onError: (err) => { + runtime.error?.(`[bluebubbles] debounce flush failed: ${String(err)}`); + }, + }); + + targetDebouncers.set(target, debouncer); + return debouncer; +} + +/** + * Removes a debouncer for a target (called during unregistration). + */ +function removeDebouncer(target: WebhookTarget): void { + targetDebouncers.delete(target); +} + function normalizeWebhookPath(raw: string): string { const trimmed = raw.trim(); if (!trimmed) return "/"; @@ -275,6 +452,8 @@ export function registerBlueBubblesWebhookTarget(target: WebhookTarget): () => v } else { webhookTargets.delete(key); } + // Clean up debouncer when target is unregistered + removeDebouncer(normalizedTarget); }; } @@ -1205,7 +1384,10 @@ export async function handleBlueBubblesWebhookRequest( ); }); } else if (message) { - processMessage(message, target).catch((err) => { + // Route messages through debouncer to coalesce rapid-fire events + // (e.g., text message + URL balloon arriving as separate webhooks) + const debouncer = getOrCreateDebouncer(target); + debouncer.enqueue({ message, target }).catch((err) => { target.runtime.error?.( `[${target.account.accountId}] BlueBubbles webhook failed: ${String(err)}`, );