fix(bluebubbles): add inbound message debouncing to coalesce URL link previews
When users send iMessages containing URLs, BlueBubbles sends separate webhook events for the text message and the URL balloon/link preview. This caused Clawdbot to receive them as separate queued messages. This fix adds inbound debouncing (following the pattern from WhatsApp/MS Teams): - Uses the existing createInboundDebouncer utility from plugin-sdk - Adds debounceMs config option to BlueBubblesAccountConfig (default: 500ms) - Routes inbound messages through debouncer before processing - Combines messages from same sender/chat within the debounce window - Handles URLBalloonProvider messages by coalescing with preceding text - Skips debouncing for messages with attachments or control commands Config example: channels.bluebubbles.debounceMs: 500 # milliseconds (0 to disable) Fixes inbound URL message splitting issue.
This commit is contained in:
@@ -146,8 +146,14 @@ function createMockRuntime(): PluginRuntime {
|
|||||||
resolveRequireMention: mockResolveRequireMention as unknown as PluginRuntime["channel"]["groups"]["resolveRequireMention"],
|
resolveRequireMention: mockResolveRequireMention as unknown as PluginRuntime["channel"]["groups"]["resolveRequireMention"],
|
||||||
},
|
},
|
||||||
debounce: {
|
debounce: {
|
||||||
createInboundDebouncer: vi.fn() as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"],
|
// Create a pass-through debouncer that immediately calls onFlush
|
||||||
resolveInboundDebounceMs: vi.fn() as unknown as PluginRuntime["channel"]["debounce"]["resolveInboundDebounceMs"],
|
createInboundDebouncer: vi.fn((params: { onFlush: (items: unknown[]) => Promise<void> }) => ({
|
||||||
|
enqueue: async (item: unknown) => {
|
||||||
|
await params.onFlush([item]);
|
||||||
|
},
|
||||||
|
flushKey: vi.fn(),
|
||||||
|
})) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"],
|
||||||
|
resolveInboundDebounceMs: vi.fn(() => 0) as unknown as PluginRuntime["channel"]["debounce"]["resolveInboundDebounceMs"],
|
||||||
},
|
},
|
||||||
commands: {
|
commands: {
|
||||||
resolveCommandAuthorizedFromAuthorizers: mockResolveCommandAuthorizedFromAuthorizers as unknown as PluginRuntime["channel"]["commands"]["resolveCommandAuthorizedFromAuthorizers"],
|
resolveCommandAuthorizedFromAuthorizers: mockResolveCommandAuthorizedFromAuthorizers as unknown as PluginRuntime["channel"]["commands"]["resolveCommandAuthorizedFromAuthorizers"],
|
||||||
|
|||||||
@@ -250,8 +250,185 @@ type WebhookTarget = {
|
|||||||
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
|
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Entry type for debouncing inbound messages.
|
||||||
|
* Captures the normalized message and its target for later combined processing.
|
||||||
|
*/
|
||||||
|
type BlueBubblesDebounceEntry = {
|
||||||
|
message: NormalizedWebhookMessage;
|
||||||
|
target: WebhookTarget;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default debounce window for inbound message coalescing (ms).
|
||||||
|
* This helps combine URL text + link preview balloon messages that BlueBubbles
|
||||||
|
* sends as separate webhook events.
|
||||||
|
*/
|
||||||
|
const DEFAULT_INBOUND_DEBOUNCE_MS = 100;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Known URLBalloonProvider bundle IDs that indicate a rich link preview message.
|
||||||
|
*/
|
||||||
|
const URL_BALLOON_BUNDLE_IDS = new Set([
|
||||||
|
"com.apple.messages.URLBalloonProvider",
|
||||||
|
"com.apple.messages.richLinkProvider",
|
||||||
|
]);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if a message is a URL balloon/link preview message.
|
||||||
|
*/
|
||||||
|
function isUrlBalloonMessage(message: NormalizedWebhookMessage): boolean {
|
||||||
|
const bundleId = message.balloonBundleId?.trim();
|
||||||
|
if (!bundleId) return false;
|
||||||
|
return URL_BALLOON_BUNDLE_IDS.has(bundleId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Combines multiple debounced messages into a single message for processing.
|
||||||
|
* Used when multiple webhook events arrive within the debounce window.
|
||||||
|
*/
|
||||||
|
function combineDebounceEntries(entries: BlueBubblesDebounceEntry[]): NormalizedWebhookMessage {
|
||||||
|
if (entries.length === 0) {
|
||||||
|
throw new Error("Cannot combine empty entries");
|
||||||
|
}
|
||||||
|
if (entries.length === 1) {
|
||||||
|
return entries[0].message;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use the first message as the base (typically the text message)
|
||||||
|
const first = entries[0].message;
|
||||||
|
const rest = entries.slice(1);
|
||||||
|
|
||||||
|
// Combine text from all entries, filtering out duplicates and empty strings
|
||||||
|
const seenTexts = new Set<string>();
|
||||||
|
const textParts: string[] = [];
|
||||||
|
|
||||||
|
for (const entry of entries) {
|
||||||
|
const text = entry.message.text.trim();
|
||||||
|
if (!text) continue;
|
||||||
|
// Skip duplicate text (URL might be in both text message and balloon)
|
||||||
|
const normalizedText = text.toLowerCase();
|
||||||
|
if (seenTexts.has(normalizedText)) continue;
|
||||||
|
seenTexts.add(normalizedText);
|
||||||
|
textParts.push(text);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge attachments from all entries
|
||||||
|
const allAttachments = entries.flatMap((e) => e.message.attachments ?? []);
|
||||||
|
|
||||||
|
// Use the latest timestamp
|
||||||
|
const timestamps = entries
|
||||||
|
.map((e) => e.message.timestamp)
|
||||||
|
.filter((t): t is number => typeof t === "number");
|
||||||
|
const latestTimestamp = timestamps.length > 0 ? Math.max(...timestamps) : first.timestamp;
|
||||||
|
|
||||||
|
// Collect all message IDs for reference
|
||||||
|
const messageIds = entries
|
||||||
|
.map((e) => e.message.messageId)
|
||||||
|
.filter((id): id is string => Boolean(id));
|
||||||
|
|
||||||
|
// Prefer reply context from any entry that has it
|
||||||
|
const entryWithReply = entries.find((e) => e.message.replyToId);
|
||||||
|
|
||||||
|
return {
|
||||||
|
...first,
|
||||||
|
text: textParts.join(" "),
|
||||||
|
attachments: allAttachments.length > 0 ? allAttachments : first.attachments,
|
||||||
|
timestamp: latestTimestamp,
|
||||||
|
// Use first message's ID as primary (for reply reference), but we've coalesced others
|
||||||
|
messageId: messageIds[0] ?? first.messageId,
|
||||||
|
// Preserve reply context if present
|
||||||
|
replyToId: entryWithReply?.message.replyToId ?? first.replyToId,
|
||||||
|
replyToBody: entryWithReply?.message.replyToBody ?? first.replyToBody,
|
||||||
|
replyToSender: entryWithReply?.message.replyToSender ?? first.replyToSender,
|
||||||
|
// Clear balloonBundleId since we've combined (the combined message is no longer just a balloon)
|
||||||
|
balloonBundleId: undefined,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
const webhookTargets = new Map<string, WebhookTarget[]>();
|
const webhookTargets = new Map<string, WebhookTarget[]>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maps webhook targets to their inbound debouncers.
|
||||||
|
* Each target gets its own debouncer keyed by a unique identifier.
|
||||||
|
*/
|
||||||
|
const targetDebouncers = new Map<
|
||||||
|
WebhookTarget,
|
||||||
|
ReturnType<BlueBubblesCoreRuntime["channel"]["debounce"]["createInboundDebouncer"]>
|
||||||
|
>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates or retrieves a debouncer for a webhook target.
|
||||||
|
*/
|
||||||
|
function getOrCreateDebouncer(target: WebhookTarget) {
|
||||||
|
const existing = targetDebouncers.get(target);
|
||||||
|
if (existing) return existing;
|
||||||
|
|
||||||
|
const { account, config, runtime, core } = target;
|
||||||
|
|
||||||
|
const debouncer = core.channel.debounce.createInboundDebouncer<BlueBubblesDebounceEntry>({
|
||||||
|
debounceMs: DEFAULT_INBOUND_DEBOUNCE_MS,
|
||||||
|
buildKey: (entry) => {
|
||||||
|
const msg = entry.message;
|
||||||
|
// Build key from account + chat + sender to coalesce messages from same source
|
||||||
|
const chatKey =
|
||||||
|
msg.chatGuid?.trim() ??
|
||||||
|
msg.chatIdentifier?.trim() ??
|
||||||
|
(msg.chatId ? String(msg.chatId) : "dm");
|
||||||
|
return `bluebubbles:${account.accountId}:${chatKey}:${msg.senderId}`;
|
||||||
|
},
|
||||||
|
shouldDebounce: (entry) => {
|
||||||
|
const msg = entry.message;
|
||||||
|
// Skip debouncing for messages with attachments - process immediately
|
||||||
|
if (msg.attachments && msg.attachments.length > 0) return false;
|
||||||
|
// Skip debouncing for from-me messages (they're just cached, not processed)
|
||||||
|
if (msg.fromMe) return false;
|
||||||
|
// Skip debouncing for control commands - process immediately
|
||||||
|
if (core.channel.text.hasControlCommand(msg.text, config)) return false;
|
||||||
|
// Debounce normal text messages and URL balloon messages
|
||||||
|
return true;
|
||||||
|
},
|
||||||
|
onFlush: async (entries) => {
|
||||||
|
if (entries.length === 0) return;
|
||||||
|
|
||||||
|
// Use target from first entry (all entries have same target due to key structure)
|
||||||
|
const flushTarget = entries[0].target;
|
||||||
|
|
||||||
|
if (entries.length === 1) {
|
||||||
|
// Single message - process normally
|
||||||
|
await processMessage(entries[0].message, flushTarget);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Multiple messages - combine and process
|
||||||
|
const combined = combineDebounceEntries(entries);
|
||||||
|
|
||||||
|
if (core.logging.shouldLogVerbose()) {
|
||||||
|
const count = entries.length;
|
||||||
|
const preview = combined.text.slice(0, 50);
|
||||||
|
runtime.log?.(
|
||||||
|
`[bluebubbles] coalesced ${count} messages: "${preview}${combined.text.length > 50 ? "..." : ""}"`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
await processMessage(combined, flushTarget);
|
||||||
|
},
|
||||||
|
onError: (err) => {
|
||||||
|
runtime.error?.(`[bluebubbles] debounce flush failed: ${String(err)}`);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
targetDebouncers.set(target, debouncer);
|
||||||
|
return debouncer;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a debouncer for a target (called during unregistration).
|
||||||
|
*/
|
||||||
|
function removeDebouncer(target: WebhookTarget): void {
|
||||||
|
targetDebouncers.delete(target);
|
||||||
|
}
|
||||||
|
|
||||||
function normalizeWebhookPath(raw: string): string {
|
function normalizeWebhookPath(raw: string): string {
|
||||||
const trimmed = raw.trim();
|
const trimmed = raw.trim();
|
||||||
if (!trimmed) return "/";
|
if (!trimmed) return "/";
|
||||||
@@ -275,6 +452,8 @@ export function registerBlueBubblesWebhookTarget(target: WebhookTarget): () => v
|
|||||||
} else {
|
} else {
|
||||||
webhookTargets.delete(key);
|
webhookTargets.delete(key);
|
||||||
}
|
}
|
||||||
|
// Clean up debouncer when target is unregistered
|
||||||
|
removeDebouncer(normalizedTarget);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1205,7 +1384,10 @@ export async function handleBlueBubblesWebhookRequest(
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
} else if (message) {
|
} else if (message) {
|
||||||
processMessage(message, target).catch((err) => {
|
// Route messages through debouncer to coalesce rapid-fire events
|
||||||
|
// (e.g., text message + URL balloon arriving as separate webhooks)
|
||||||
|
const debouncer = getOrCreateDebouncer(target);
|
||||||
|
debouncer.enqueue({ message, target }).catch((err) => {
|
||||||
target.runtime.error?.(
|
target.runtime.error?.(
|
||||||
`[${target.account.accountId}] BlueBubbles webhook failed: ${String(err)}`,
|
`[${target.account.accountId}] BlueBubbles webhook failed: ${String(err)}`,
|
||||||
);
|
);
|
||||||
|
|||||||
Reference in New Issue
Block a user