From 4a99b9b65118cec5f0c5cc06958e6d40fb448f34 Mon Sep 17 00:00:00 2001 From: juanpablodlc <92012363+juanpablodlc@users.noreply.github.com> Date: Thu, 15 Jan 2026 15:07:19 -0800 Subject: [PATCH] feat(whatsapp): add debounceMs for batching rapid messages (#971) * feat(whatsapp): add debounceMs for batching rapid messages Add a `debounceMs` configuration option to WhatsApp channel settings that batches rapid consecutive messages from the same sender into a single response. This prevents triggering separate agent runs for each message when a user sends multiple short messages in quick succession (e.g., "Hey!", "how are you?", "I was wondering..."). Changes: - Add `debounceMs` config to WhatsAppConfig and WhatsAppAccountConfig - Implement message buffering in `monitorWebInbox` with: - Map-based buffer keyed by sender (DM) or chat ID (groups) - Debounce timer that resets on each new message - Message combination with newline separator - Single message optimization (no modification if only one message) - Wire `debounceMs` through account resolution and monitor tuning - Add UI hints and schema documentation Usage example: { "channels": { "whatsapp": { "debounceMs": 5000 // 5 second window } } } Default behavior: `debounceMs: 0` (disabled by default) Verified: All existing tests pass (3204 tests), TypeScript compilation succeeds with no errors. Implemented with assistance from AI coding tools. Closes #967 * chore: wip inbound debounce * fix: debounce inbound messages across channels (#971) (thanks @juanpablodlc) --------- Co-authored-by: Peter Steinberger --- CHANGELOG.md | 1 + docs/concepts/messages.md | 10 + docs/gateway/configuration.md | 25 ++ src/agents/pi-embedded-runner/compact.ts | 10 +- src/agents/tools/discord-actions.test.ts | 16 +- src/agents/tools/slack-actions.test.ts | 4 +- src/auto-reply/inbound-debounce.test.ts | 48 +++ src/auto-reply/inbound-debounce.ts | 101 +++++ ...uick-model-picker-grouped-by-model.test.ts | 4 +- src/auto-reply/templating.ts | 3 + src/config/schema.ts | 6 + src/config/types.messages.ts | 18 + src/config/types.whatsapp.ts | 4 + src/config/zod-schema.core.ts | 20 + src/config/zod-schema.providers-whatsapp.ts | 2 + src/config/zod-schema.session.ts | 8 +- src/discord/monitor/message-handler.ts | 90 ++++- src/imessage/monitor/monitor-provider.ts | 56 ++- .../monitor-handler/message-handler.ts | 87 ++++- src/signal/monitor/event-handler.ts | 361 +++++++++++------- src/slack/monitor/message-handler.ts | 69 +++- src/telegram/bot-handlers.ts | 75 +++- src/web/accounts.ts | 2 + src/web/auto-reply/monitor.ts | 12 + src/web/auto-reply/types.ts | 2 + src/web/inbound/monitor.ts | 105 +++-- 26 files changed, 927 insertions(+), 212 deletions(-) create mode 100644 src/auto-reply/inbound-debounce.test.ts create mode 100644 src/auto-reply/inbound-debounce.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 12f11f298..47d767ec2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - Agents: add Current Date & Time system prompt section with configurable time format (auto/12/24). - Tools: normalize Slack/Discord message timestamps with `timestampMs`/`timestampUtc` while keeping raw provider fields. - Docs: add Date & Time guide and update prompt/timezone configuration docs. +- Messages: debounce rapid inbound messages across channels with per-connector overrides. (#971) — thanks @juanpablodlc. - Fix: guard model fallback against undefined provider/model values. (#954) — thanks @roshanasingh4. - Memory: make `node-llama-cpp` an optional dependency (avoid Node 25 install failures) and improve local-embeddings fallback/errors. - Browser: add `snapshot refs=aria` (Playwright aria-ref ids) for self-resolving refs across `snapshot` → `act`. diff --git a/docs/concepts/messages.md b/docs/concepts/messages.md index 959c62ca6..7045c915d 100644 --- a/docs/concepts/messages.md +++ b/docs/concepts/messages.md @@ -33,6 +33,16 @@ Channels can redeliver the same message after reconnects. Clawdbot keeps a short-lived cache keyed by channel/account/peer/session/message id so duplicate deliveries do not trigger another agent run. +## Inbound debouncing + +Rapid consecutive messages from the **same sender** can be batched into a single +agent turn via `messages.inbound`. Debouncing is scoped per channel + conversation +and uses the most recent message for reply threading/IDs. + +Notes: +- Debounce applies to **text-only** messages; media/attachments flush immediately. +- Control commands bypass debouncing so they remain standalone. + ## Sessions and devices Sessions are owned by the gateway, not by clients. diff --git a/docs/gateway/configuration.md b/docs/gateway/configuration.md index 790e93036..65988c1a3 100644 --- a/docs/gateway/configuration.md +++ b/docs/gateway/configuration.md @@ -822,6 +822,31 @@ Controls how inbound messages behave when an agent run is already active. } ``` +### `messages.inbound` + +Debounce rapid inbound messages from the **same sender** so multiple back-to-back +messages become a single agent turn. Debouncing is scoped per channel + conversation +and uses the most recent message for reply threading/IDs. + +```json5 +{ + messages: { + inbound: { + debounceMs: 2000, // 0 disables + byChannel: { + whatsapp: 5000, + slack: 1500, + discord: 1500 + } + } + } +} +``` + +Notes: +- Debounce batches **text-only** messages; media/attachments flush immediately. +- Control commands (e.g. `/queue`, `/new`) bypass debouncing so they stay standalone. + ### `commands` (chat command handling) Controls how chat commands are enabled across connectors. diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 3cffa1c08..56dc84ba8 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -54,11 +54,7 @@ import { buildEmbeddedSystemPrompt, createSystemPromptOverride } from "./system- import { splitSdkTools } from "./tool-split.js"; import type { EmbeddedPiCompactResult } from "./types.js"; import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js"; -import { - describeUnknownError, - mapThinkingLevel, - resolveExecToolDefaults, -} from "./utils.js"; +import { describeUnknownError, mapThinkingLevel, resolveExecToolDefaults } from "./utils.js"; export async function compactEmbeddedPiSession(params: { sessionId: string; @@ -227,9 +223,7 @@ export async function compactEmbeddedPiSession(params: { const sandboxInfo = buildEmbeddedSandboxInfo(sandbox, params.bashElevated); const reasoningTagHint = isReasoningTagProvider(provider); const userTimezone = resolveUserTimezone(params.config?.agents?.defaults?.userTimezone); - const userTimeFormat = resolveUserTimeFormat( - params.config?.agents?.defaults?.timeFormat, - ); + const userTimeFormat = resolveUserTimeFormat(params.config?.agents?.defaults?.timeFormat); const userTime = formatUserTime(new Date(), userTimezone, userTimeFormat); const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({ sessionKey: params.sessionKey, diff --git a/src/agents/tools/discord-actions.test.ts b/src/agents/tools/discord-actions.test.ts index 74059ba57..3eead3f40 100644 --- a/src/agents/tools/discord-actions.test.ts +++ b/src/agents/tools/discord-actions.test.ts @@ -138,16 +138,16 @@ describe("handleDiscordMessagingAction", () => { }); it("adds normalized timestamps to readMessages payloads", async () => { - readMessagesDiscord.mockResolvedValueOnce([ - { id: "1", timestamp: "2026-01-15T10:00:00.000Z" }, - ]); + readMessagesDiscord.mockResolvedValueOnce([{ id: "1", timestamp: "2026-01-15T10:00:00.000Z" }]); const result = await handleDiscordMessagingAction( "readMessages", { channelId: "C1" }, enableAllActions, ); - const payload = result.details as { messages: Array<{ timestampMs?: number; timestampUtc?: string }> }; + const payload = result.details as { + messages: Array<{ timestampMs?: number; timestampUtc?: string }>; + }; const expectedMs = Date.parse("2026-01-15T10:00:00.000Z"); expect(payload.messages[0].timestampMs).toBe(expectedMs); @@ -173,16 +173,16 @@ describe("handleDiscordMessagingAction", () => { }); it("adds normalized timestamps to listPins payloads", async () => { - listPinsDiscord.mockResolvedValueOnce([ - { id: "1", timestamp: "2026-01-15T12:00:00.000Z" }, - ]); + listPinsDiscord.mockResolvedValueOnce([{ id: "1", timestamp: "2026-01-15T12:00:00.000Z" }]); const result = await handleDiscordMessagingAction( "listPins", { channelId: "C1" }, enableAllActions, ); - const payload = result.details as { pins: Array<{ timestampMs?: number; timestampUtc?: string }> }; + const payload = result.details as { + pins: Array<{ timestampMs?: number; timestampUtc?: string }>; + }; const expectedMs = Date.parse("2026-01-15T12:00:00.000Z"); expect(payload.pins[0].timestampMs).toBe(expectedMs); diff --git a/src/agents/tools/slack-actions.test.ts b/src/agents/tools/slack-actions.test.ts index f9ffe72f0..611721e02 100644 --- a/src/agents/tools/slack-actions.test.ts +++ b/src/agents/tools/slack-actions.test.ts @@ -334,7 +334,9 @@ describe("handleSlackAction", () => { }); const result = await handleSlackAction({ action: "readMessages", channelId: "C1" }, cfg); - const payload = result.details as { messages: Array<{ timestampMs?: number; timestampUtc?: string }> }; + const payload = result.details as { + messages: Array<{ timestampMs?: number; timestampUtc?: string }>; + }; const expectedMs = Math.round(1735689600.456 * 1000); expect(payload.messages[0].timestampMs).toBe(expectedMs); diff --git a/src/auto-reply/inbound-debounce.test.ts b/src/auto-reply/inbound-debounce.test.ts new file mode 100644 index 000000000..a50d403b4 --- /dev/null +++ b/src/auto-reply/inbound-debounce.test.ts @@ -0,0 +1,48 @@ +import { describe, expect, it, vi } from "vitest"; + +import { createInboundDebouncer } from "./inbound-debounce.js"; + +describe("createInboundDebouncer", () => { + it("debounces and combines items", async () => { + vi.useFakeTimers(); + const calls: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string }>({ + debounceMs: 10, + buildKey: (item) => item.key, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + }, + }); + + await debouncer.enqueue({ key: "a", id: "1" }); + await debouncer.enqueue({ key: "a", id: "2" }); + + expect(calls).toEqual([]); + await vi.advanceTimersByTimeAsync(10); + expect(calls).toEqual([["1", "2"]]); + + vi.useRealTimers(); + }); + + it("flushes buffered items before non-debounced item", async () => { + vi.useFakeTimers(); + const calls: Array = []; + + const debouncer = createInboundDebouncer<{ key: string; id: string; debounce: boolean }>({ + debounceMs: 50, + buildKey: (item) => item.key, + shouldDebounce: (item) => item.debounce, + onFlush: async (items) => { + calls.push(items.map((entry) => entry.id)); + }, + }); + + await debouncer.enqueue({ key: "a", id: "1", debounce: true }); + await debouncer.enqueue({ key: "a", id: "2", debounce: false }); + + expect(calls).toEqual([["1"], ["2"]]); + + vi.useRealTimers(); + }); +}); diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts new file mode 100644 index 000000000..47a29687d --- /dev/null +++ b/src/auto-reply/inbound-debounce.ts @@ -0,0 +1,101 @@ +import type { ClawdbotConfig } from "../config/config.js"; +import type { InboundDebounceByProvider } from "../config/types.messages.js"; + +const resolveMs = (value: unknown): number | undefined => { + if (typeof value !== "number" || !Number.isFinite(value)) return undefined; + return Math.max(0, Math.trunc(value)); +}; + +const resolveChannelOverride = (params: { + byChannel?: InboundDebounceByProvider; + channel: string; +}): number | undefined => { + if (!params.byChannel) return undefined; + const channelKey = params.channel as keyof InboundDebounceByProvider; + return resolveMs(params.byChannel[channelKey]); +}; + +export function resolveInboundDebounceMs(params: { + cfg: ClawdbotConfig; + channel: string; + overrideMs?: number; +}): number { + const inbound = params.cfg.messages?.inbound; + const override = resolveMs(params.overrideMs); + const byChannel = resolveChannelOverride({ + byChannel: inbound?.byChannel, + channel: params.channel, + }); + const base = resolveMs(inbound?.debounceMs); + return override ?? byChannel ?? base ?? 0; +} + +type DebounceBuffer = { + items: T[]; + timeout: ReturnType | null; +}; + +export function createInboundDebouncer(params: { + debounceMs: number; + buildKey: (item: T) => string | null | undefined; + shouldDebounce?: (item: T) => boolean; + onFlush: (items: T[]) => Promise; + onError?: (err: unknown, items: T[]) => void; +}) { + const buffers = new Map>(); + const debounceMs = Math.max(0, Math.trunc(params.debounceMs)); + + const flushBuffer = async (key: string, buffer: DebounceBuffer) => { + buffers.delete(key); + if (buffer.timeout) { + clearTimeout(buffer.timeout); + buffer.timeout = null; + } + if (buffer.items.length === 0) return; + try { + await params.onFlush(buffer.items); + } catch (err) { + params.onError?.(err, buffer.items); + } + }; + + const flushKey = async (key: string) => { + const buffer = buffers.get(key); + if (!buffer) return; + await flushBuffer(key, buffer); + }; + + const scheduleFlush = (key: string, buffer: DebounceBuffer) => { + if (buffer.timeout) clearTimeout(buffer.timeout); + buffer.timeout = setTimeout(() => { + void flushBuffer(key, buffer); + }, debounceMs); + buffer.timeout.unref?.(); + }; + + const enqueue = async (item: T) => { + const key = params.buildKey(item); + const canDebounce = debounceMs > 0 && (params.shouldDebounce?.(item) ?? true); + + if (!canDebounce || !key) { + if (key && buffers.has(key)) { + await flushKey(key); + } + await params.onFlush([item]); + return; + } + + const existing = buffers.get(key); + if (existing) { + existing.items.push(item); + scheduleFlush(key, existing); + return; + } + + const buffer: DebounceBuffer = { items: [item], timeout: null }; + buffers.set(key, buffer); + scheduleFlush(key, buffer); + }; + + return { enqueue, flushKey }; +} diff --git a/src/auto-reply/reply.triggers.trigger-handling.shows-quick-model-picker-grouped-by-model.test.ts b/src/auto-reply/reply.triggers.trigger-handling.shows-quick-model-picker-grouped-by-model.test.ts index 9fc8eca80..1fd21f2f2 100644 --- a/src/auto-reply/reply.triggers.trigger-handling.shows-quick-model-picker-grouped-by-model.test.ts +++ b/src/auto-reply/reply.triggers.trigger-handling.shows-quick-model-picker-grouped-by-model.test.ts @@ -263,9 +263,7 @@ describe("trigger handling", () => { const text = Array.isArray(res) ? res[0]?.text : res?.text; // Selecting the default model shows "reset to default" instead of "set to" - expect(normalizeTestText(text ?? "")).toContain( - "anthropic/claude-opus-4-5", - ); + expect(normalizeTestText(text ?? "")).toContain("anthropic/claude-opus-4-5"); const store = loadSessionStore(cfg.session.store); // When selecting the default, overrides are cleared diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index 03bd488f6..610e2f8dc 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -24,6 +24,9 @@ export type MsgContext = { AccountId?: string; ParentSessionKey?: string; MessageSid?: string; + MessageSids?: string[]; + MessageSidFirst?: string; + MessageSidLast?: string; ReplyToId?: string; ReplyToBody?: string; ReplyToSender?: string; diff --git a/src/config/schema.ts b/src/config/schema.ts index 4b84c154e..f3b09fc45 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -168,6 +168,7 @@ const FIELD_LABELS: Record = { "session.agentToAgent.maxPingPongTurns": "Agent-to-Agent Ping-Pong Turns", "messages.ackReaction": "Ack Reaction Emoji", "messages.ackReactionScope": "Ack Reaction Scope", + "messages.inbound.debounceMs": "Inbound Message Debounce (ms)", "talk.apiKey": "Talk API Key", "channels.whatsapp": "WhatsApp", "channels.telegram": "Telegram", @@ -189,6 +190,7 @@ const FIELD_LABELS: Record = { "channels.telegram.timeoutSeconds": "Telegram API Timeout (seconds)", "channels.whatsapp.dmPolicy": "WhatsApp DM Policy", "channels.whatsapp.selfChatMode": "WhatsApp Self-Phone Mode", + "channels.whatsapp.debounceMs": "WhatsApp Message Debounce (ms)", "channels.signal.dmPolicy": "Signal DM Policy", "channels.imessage.dmPolicy": "iMessage DM Policy", "channels.discord.dm.policy": "Discord DM Policy", @@ -327,6 +329,8 @@ const FIELD_HELP: Record = { "messages.ackReaction": "Emoji reaction used to acknowledge inbound messages (empty disables).", "messages.ackReactionScope": 'When to send ack reactions ("group-mentions", "group-all", "direct", "all").', + "messages.inbound.debounceMs": + "Debounce window (ms) for batching rapid inbound messages from the same sender (0 to disable).", "channels.telegram.dmPolicy": 'Direct message access control ("pairing" recommended). "open" requires channels.telegram.allowFrom=["*"].', "channels.telegram.streamMode": @@ -348,6 +352,8 @@ const FIELD_HELP: Record = { "channels.whatsapp.dmPolicy": 'Direct message access control ("pairing" recommended). "open" requires channels.whatsapp.allowFrom=["*"].', "channels.whatsapp.selfChatMode": "Same-phone setup (bot uses your personal WhatsApp number).", + "channels.whatsapp.debounceMs": + "Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable).", "channels.signal.dmPolicy": 'Direct message access control ("pairing" recommended). "open" requires channels.signal.allowFrom=["*"].', "channels.imessage.dmPolicy": diff --git a/src/config/types.messages.ts b/src/config/types.messages.ts index d137b56d8..61a4655f3 100644 --- a/src/config/types.messages.ts +++ b/src/config/types.messages.ts @@ -17,6 +17,22 @@ export type QueueConfig = { drop?: QueueDropPolicy; }; +export type InboundDebounceByProvider = { + whatsapp?: number; + telegram?: number; + discord?: number; + slack?: number; + signal?: number; + imessage?: number; + msteams?: number; + webchat?: number; +}; + +export type InboundDebounceConfig = { + debounceMs?: number; + byChannel?: InboundDebounceByProvider; +}; + export type BroadcastStrategy = "parallel" | "sequential"; export type BroadcastConfig = { @@ -64,6 +80,8 @@ export type MessagesConfig = { responsePrefix?: string; groupChat?: GroupChatConfig; queue?: QueueConfig; + /** Debounce rapid inbound messages per sender (global + per-channel overrides). */ + inbound?: InboundDebounceConfig; /** Emoji reaction used to acknowledge inbound messages (empty disables). */ ackReaction?: string; /** When to send ack reactions. Default: "group-mentions". */ diff --git a/src/config/types.whatsapp.ts b/src/config/types.whatsapp.ts index c28178f06..28ed34c56 100644 --- a/src/config/types.whatsapp.ts +++ b/src/config/types.whatsapp.ts @@ -75,6 +75,8 @@ export type WhatsAppConfig = { */ group?: "always" | "mentions" | "never"; }; + /** Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable). */ + debounceMs?: number; }; export type WhatsAppAccountConfig = { @@ -131,4 +133,6 @@ export type WhatsAppAccountConfig = { */ group?: "always" | "mentions" | "never"; }; + /** Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable). */ + debounceMs?: number; }; diff --git a/src/config/zod-schema.core.ts b/src/config/zod-schema.core.ts index 89454235d..5510e8d74 100644 --- a/src/config/zod-schema.core.ts +++ b/src/config/zod-schema.core.ts @@ -188,6 +188,19 @@ export const QueueModeBySurfaceSchema = z }) .optional(); +export const DebounceMsBySurfaceSchema = z + .object({ + whatsapp: z.number().int().nonnegative().optional(), + telegram: z.number().int().nonnegative().optional(), + discord: z.number().int().nonnegative().optional(), + slack: z.number().int().nonnegative().optional(), + signal: z.number().int().nonnegative().optional(), + imessage: z.number().int().nonnegative().optional(), + msteams: z.number().int().nonnegative().optional(), + webchat: z.number().int().nonnegative().optional(), + }) + .optional(); + export const QueueSchema = z .object({ mode: QueueModeSchema.optional(), @@ -198,6 +211,13 @@ export const QueueSchema = z }) .optional(); +export const InboundDebounceSchema = z + .object({ + debounceMs: z.number().int().nonnegative().optional(), + byChannel: DebounceMsBySurfaceSchema, + }) + .optional(); + export const TranscribeAudioSchema = z .object({ command: z.array(z.string()).superRefine((value, ctx) => { diff --git a/src/config/zod-schema.providers-whatsapp.ts b/src/config/zod-schema.providers-whatsapp.ts index 962bb5619..171a7c476 100644 --- a/src/config/zod-schema.providers-whatsapp.ts +++ b/src/config/zod-schema.providers-whatsapp.ts @@ -46,6 +46,7 @@ export const WhatsAppAccountSchema = z group: z.enum(["always", "mentions", "never"]).optional().default("mentions"), }) .optional(), + debounceMs: z.number().int().nonnegative().optional().default(0), }) .superRefine((value, ctx) => { if (value.dmPolicy !== "open") return; @@ -101,6 +102,7 @@ export const WhatsAppConfigSchema = z group: z.enum(["always", "mentions", "never"]).optional().default("mentions"), }) .optional(), + debounceMs: z.number().int().nonnegative().optional().default(0), }) .superRefine((value, ctx) => { if (value.dmPolicy !== "open") return; diff --git a/src/config/zod-schema.session.ts b/src/config/zod-schema.session.ts index 389f67719..3a9efe2fc 100644 --- a/src/config/zod-schema.session.ts +++ b/src/config/zod-schema.session.ts @@ -1,6 +1,11 @@ import { z } from "zod"; -import { GroupChatSchema, NativeCommandsSettingSchema, QueueSchema } from "./zod-schema.core.js"; +import { + GroupChatSchema, + InboundDebounceSchema, + NativeCommandsSettingSchema, + QueueSchema, +} from "./zod-schema.core.js"; export const SessionSchema = z .object({ @@ -54,6 +59,7 @@ export const MessagesSchema = z responsePrefix: z.string().optional(), groupChat: GroupChatSchema, queue: QueueSchema, + inbound: InboundDebounceSchema, ackReaction: z.string().optional(), ackReactionScope: z.enum(["group-mentions", "group-all", "direct", "all"]).optional(), removeAckAfterReply: z.boolean().optional(), diff --git a/src/discord/monitor/message-handler.ts b/src/discord/monitor/message-handler.ts index cdce50769..c4335d724 100644 --- a/src/discord/monitor/message-handler.ts +++ b/src/discord/monitor/message-handler.ts @@ -1,11 +1,19 @@ +import type { Client } from "@buape/carbon"; + +import { hasControlCommand } from "../../auto-reply/command-detection.js"; +import { + createInboundDebouncer, + resolveInboundDebounceMs, +} from "../../auto-reply/inbound-debounce.js"; import type { HistoryEntry } from "../../auto-reply/reply/history.js"; import type { ReplyToMode } from "../../config/config.js"; import { danger } from "../../globals.js"; import type { RuntimeEnv } from "../../runtime.js"; import type { DiscordGuildEntryResolved } from "./allow-list.js"; -import type { DiscordMessageHandler } from "./listeners.js"; +import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js"; import { preflightDiscordMessage } from "./message-handler.preflight.js"; import { processDiscordMessage } from "./message-handler.process.js"; +import { resolveDiscordMessageText } from "./message-utils.js"; type LoadedConfig = ReturnType; type DiscordConfig = NonNullable< @@ -32,18 +40,90 @@ export function createDiscordMessageHandler(params: { }): DiscordMessageHandler { const groupPolicy = params.discordConfig?.groupPolicy ?? "open"; const ackReactionScope = params.cfg.messages?.ackReactionScope ?? "group-mentions"; + const debounceMs = resolveInboundDebounceMs({ cfg: params.cfg, channel: "discord" }); - return async (data, client) => { - try { + const debouncer = createInboundDebouncer<{ data: DiscordMessageEvent; client: Client }>({ + debounceMs, + buildKey: (entry) => { + const message = entry.data.message; + const authorId = entry.data.author?.id; + if (!message || !authorId) return null; + const channelId = message.channelId; + if (!channelId) return null; + return `discord:${params.accountId}:${channelId}:${authorId}`; + }, + shouldDebounce: (entry) => { + const message = entry.data.message; + if (!message) return false; + if (message.attachments && message.attachments.length > 0) return false; + const baseText = resolveDiscordMessageText(message, { includeForwarded: false }); + if (!baseText.trim()) return false; + return !hasControlCommand(baseText, params.cfg); + }, + onFlush: async (entries) => { + const last = entries.at(-1); + if (!last) return; + if (entries.length === 1) { + const ctx = await preflightDiscordMessage({ + ...params, + ackReactionScope, + groupPolicy, + data: last.data, + client: last.client, + }); + if (!ctx) return; + await processDiscordMessage(ctx); + return; + } + const combinedBaseText = entries + .map((entry) => resolveDiscordMessageText(entry.data.message, { includeForwarded: false })) + .filter(Boolean) + .join("\n"); + const syntheticMessage = { + ...last.data.message, + content: combinedBaseText, + attachments: [], + message_snapshots: (last.data.message as { message_snapshots?: unknown }).message_snapshots, + messageSnapshots: (last.data.message as { messageSnapshots?: unknown }).messageSnapshots, + rawData: { + ...(last.data.message as { rawData?: Record }).rawData, + }, + }; + const syntheticData: DiscordMessageEvent = { + ...last.data, + message: syntheticMessage, + }; const ctx = await preflightDiscordMessage({ ...params, ackReactionScope, groupPolicy, - data, - client, + data: syntheticData, + client: last.client, }); if (!ctx) return; + if (entries.length > 1) { + const ids = entries.map((entry) => entry.data.message?.id).filter(Boolean) as string[]; + if (ids.length > 0) { + const ctxBatch = ctx as typeof ctx & { + MessageSids?: string[]; + MessageSidFirst?: string; + MessageSidLast?: string; + }; + ctxBatch.MessageSids = ids; + ctxBatch.MessageSidFirst = ids[0]; + ctxBatch.MessageSidLast = ids[ids.length - 1]; + } + } await processDiscordMessage(ctx); + }, + onError: (err) => { + params.runtime.error?.(danger(`discord debounce flush failed: ${String(err)}`)); + }, + }); + + return async (data, client) => { + try { + await debouncer.enqueue({ data, client }); } catch (err) { params.runtime.error?.(danger(`handler failed: ${String(err)}`)); } diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index be98db0a4..2918b5fc8 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -10,6 +10,10 @@ import { import { resolveTextChunkLimit } from "../../auto-reply/chunk.js"; import { hasControlCommand } from "../../auto-reply/command-detection.js"; import { formatAgentEnvelope } from "../../auto-reply/envelope.js"; +import { + createInboundDebouncer, + resolveInboundDebounceMs, +} from "../../auto-reply/inbound-debounce.js"; import { dispatchReplyFromConfig } from "../../auto-reply/reply/dispatch-from-config.js"; import { buildHistoryContextFromMap, @@ -73,11 +77,48 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P const includeAttachments = opts.includeAttachments ?? imessageCfg.includeAttachments ?? false; const mediaMaxBytes = (opts.mediaMaxMb ?? imessageCfg.mediaMaxMb ?? 16) * 1024 * 1024; - const handleMessage = async (raw: unknown) => { - const params = raw as { message?: IMessagePayload | null }; - const message = params?.message ?? null; - if (!message) return; + const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "imessage" }); + const inboundDebouncer = createInboundDebouncer<{ message: IMessagePayload }>({ + debounceMs: inboundDebounceMs, + buildKey: (entry) => { + const sender = entry.message.sender?.trim(); + if (!sender) return null; + const conversationId = + entry.message.chat_id != null + ? `chat:${entry.message.chat_id}` + : (entry.message.chat_guid ?? entry.message.chat_identifier ?? "unknown"); + return `imessage:${accountInfo.accountId}:${conversationId}:${sender}`; + }, + shouldDebounce: (entry) => { + const text = entry.message.text?.trim() ?? ""; + if (!text) return false; + if (entry.message.attachments && entry.message.attachments.length > 0) return false; + return !hasControlCommand(text, cfg); + }, + onFlush: async (entries) => { + const last = entries.at(-1); + if (!last) return; + if (entries.length === 1) { + await handleMessageNow(last.message); + return; + } + const combinedText = entries + .map((entry) => entry.message.text ?? "") + .filter(Boolean) + .join("\n"); + const syntheticMessage: IMessagePayload = { + ...last.message, + text: combinedText, + attachments: null, + }; + await handleMessageNow(syntheticMessage); + }, + onError: (err) => { + runtime.error?.(`imessage debounce flush failed: ${String(err)}`); + }, + }); + async function handleMessageNow(message: IMessagePayload) { const senderRaw = message.sender ?? ""; const sender = senderRaw.trim(); if (!sender) return; @@ -403,6 +444,13 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P if (isGroup && historyKey && historyLimit > 0 && didSendReply) { clearHistoryEntries({ historyMap: groupHistories, historyKey }); } + } + + const handleMessage = async (raw: unknown) => { + const params = raw as { message?: IMessagePayload | null }; + const message = params?.message ?? null; + if (!message) return; + await inboundDebouncer.enqueue({ message }); }; const client = await createIMessageRpcClient({ diff --git a/src/msteams/monitor-handler/message-handler.ts b/src/msteams/monitor-handler/message-handler.ts index d0a5122df..b93ddc3b8 100644 --- a/src/msteams/monitor-handler/message-handler.ts +++ b/src/msteams/monitor-handler/message-handler.ts @@ -1,4 +1,9 @@ +import { hasControlCommand } from "../../auto-reply/command-detection.js"; import { formatAgentEnvelope } from "../../auto-reply/envelope.js"; +import { + createInboundDebouncer, + resolveInboundDebounceMs, +} from "../../auto-reply/inbound-debounce.js"; import { dispatchReplyFromConfig } from "../../auto-reply/reply/dispatch-from-config.js"; import { buildHistoryContextFromMap, @@ -61,14 +66,22 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { DEFAULT_GROUP_HISTORY_LIMIT, ); const conversationHistories = new Map(); + const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "msteams" }); - return async function handleTeamsMessage(context: MSTeamsTurnContext) { + type MSTeamsDebounceEntry = { + context: MSTeamsTurnContext; + rawText: string; + text: string; + attachments: MSTeamsAttachmentLike[]; + wasMentioned: boolean; + }; + + const handleTeamsMessageNow = async (params: MSTeamsDebounceEntry) => { + const context = params.context; const activity = context.activity; - const rawText = activity.text?.trim() ?? ""; - const text = stripMSTeamsMentionTags(rawText); - const attachments = Array.isArray(activity.attachments) - ? (activity.attachments as unknown as MSTeamsAttachmentLike[]) - : []; + const rawText = params.rawText; + const text = params.text; + const attachments = params.attachments; const attachmentPlaceholder = buildMSTeamsAttachmentPlaceholder(attachments); const rawBody = text || attachmentPlaceholder; const from = activity.from; @@ -288,7 +301,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { }); if (!isDirectMessage) { - const mentioned = wasMSTeamsBotMentioned(activity); + const mentioned = params.wasMentioned; if (requireMention && !mentioned) { log.debug("skipping message (mention required)", { teamId, @@ -366,7 +379,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { Surface: "msteams" as const, MessageSid: activity.id, Timestamp: timestamp?.getTime() ?? Date.now(), - WasMentioned: isDirectMessage || wasMSTeamsBotMentioned(activity), + WasMentioned: isDirectMessage || params.wasMentioned, CommandAuthorized: true, OriginatingChannel: "msteams" as const, OriginatingTo: teamsTo, @@ -433,4 +446,62 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { } } }; + + const inboundDebouncer = createInboundDebouncer({ + debounceMs: inboundDebounceMs, + buildKey: (entry) => { + const conversationId = normalizeMSTeamsConversationId( + entry.context.activity.conversation?.id ?? "", + ); + const senderId = + entry.context.activity.from?.aadObjectId ?? entry.context.activity.from?.id ?? ""; + if (!senderId || !conversationId) return null; + return `msteams:${appId}:${conversationId}:${senderId}`; + }, + shouldDebounce: (entry) => { + if (!entry.text.trim()) return false; + if (entry.attachments.length > 0) return false; + return !hasControlCommand(entry.text, cfg); + }, + onFlush: async (entries) => { + const last = entries.at(-1); + if (!last) return; + if (entries.length === 1) { + await handleTeamsMessageNow(last); + return; + } + const combinedText = entries + .map((entry) => entry.text) + .filter(Boolean) + .join("\n"); + if (!combinedText.trim()) return; + const combinedRawText = entries + .map((entry) => entry.rawText) + .filter(Boolean) + .join("\n"); + const wasMentioned = entries.some((entry) => entry.wasMentioned); + await handleTeamsMessageNow({ + context: last.context, + rawText: combinedRawText, + text: combinedText, + attachments: [], + wasMentioned, + }); + }, + onError: (err) => { + runtime.error?.(danger(`msteams debounce flush failed: ${String(err)}`)); + }, + }); + + return async function handleTeamsMessage(context: MSTeamsTurnContext) { + const activity = context.activity; + const rawText = activity.text?.trim() ?? ""; + const text = stripMSTeamsMentionTags(rawText); + const attachments = Array.isArray(activity.attachments) + ? (activity.attachments as unknown as MSTeamsAttachmentLike[]) + : []; + const wasMentioned = wasMSTeamsBotMentioned(activity); + + await inboundDebouncer.enqueue({ context, rawText, text, attachments, wasMentioned }); + }; } diff --git a/src/signal/monitor/event-handler.ts b/src/signal/monitor/event-handler.ts index 0aac7c97b..71dd0b00f 100644 --- a/src/signal/monitor/event-handler.ts +++ b/src/signal/monitor/event-handler.ts @@ -7,7 +7,12 @@ import { extractShortModelName, type ResponsePrefixContext, } from "../../auto-reply/reply/response-prefix-template.js"; +import { hasControlCommand } from "../../auto-reply/command-detection.js"; import { formatAgentEnvelope } from "../../auto-reply/envelope.js"; +import { + createInboundDebouncer, + resolveInboundDebounceMs, +} from "../../auto-reply/inbound-debounce.js"; import { dispatchReplyFromConfig } from "../../auto-reply/reply/dispatch-from-config.js"; import { buildHistoryContextFromMap, clearHistoryEntries } from "../../auto-reply/reply/history.js"; import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js"; @@ -36,6 +41,207 @@ import { sendMessageSignal } from "../send.js"; import type { SignalEventHandlerDeps, SignalReceivePayload } from "./event-handler.types.js"; export function createSignalEventHandler(deps: SignalEventHandlerDeps) { + const inboundDebounceMs = resolveInboundDebounceMs({ cfg: deps.cfg, channel: "signal" }); + + type SignalInboundEntry = { + senderName: string; + senderDisplay: string; + senderRecipient: string; + senderPeerId: string; + groupId?: string; + groupName?: string; + isGroup: boolean; + bodyText: string; + timestamp?: number; + messageId?: string; + mediaPath?: string; + mediaType?: string; + commandAuthorized: boolean; + }; + + async function handleSignalInboundMessage(entry: SignalInboundEntry) { + const fromLabel = entry.isGroup + ? `${entry.groupName ?? "Signal Group"} id:${entry.groupId}` + : `${entry.senderName} id:${entry.senderDisplay}`; + const body = formatAgentEnvelope({ + channel: "Signal", + from: fromLabel, + timestamp: entry.timestamp ?? undefined, + body: entry.bodyText, + }); + let combinedBody = body; + const historyKey = entry.isGroup ? String(entry.groupId ?? "unknown") : undefined; + if (entry.isGroup && historyKey && deps.historyLimit > 0) { + combinedBody = buildHistoryContextFromMap({ + historyMap: deps.groupHistories, + historyKey, + limit: deps.historyLimit, + entry: { + sender: entry.senderName, + body: entry.bodyText, + timestamp: entry.timestamp ?? undefined, + messageId: entry.messageId, + }, + currentMessage: combinedBody, + formatEntry: (historyEntry) => + formatAgentEnvelope({ + channel: "Signal", + from: fromLabel, + timestamp: historyEntry.timestamp, + body: `${historyEntry.sender}: ${historyEntry.body}${ + historyEntry.messageId ? ` [id:${historyEntry.messageId}]` : "" + }`, + }), + }); + } + + const route = resolveAgentRoute({ + cfg: deps.cfg, + channel: "signal", + accountId: deps.accountId, + peer: { + kind: entry.isGroup ? "group" : "dm", + id: entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId, + }, + }); + const signalTo = entry.isGroup ? `group:${entry.groupId}` : `signal:${entry.senderRecipient}`; + const ctxPayload = { + Body: combinedBody, + RawBody: entry.bodyText, + CommandBody: entry.bodyText, + From: entry.isGroup + ? `group:${entry.groupId ?? "unknown"}` + : `signal:${entry.senderRecipient}`, + To: signalTo, + SessionKey: route.sessionKey, + AccountId: route.accountId, + ChatType: entry.isGroup ? "group" : "direct", + GroupSubject: entry.isGroup ? (entry.groupName ?? undefined) : undefined, + SenderName: entry.senderName, + SenderId: entry.senderDisplay, + Provider: "signal" as const, + Surface: "signal" as const, + MessageSid: entry.messageId, + Timestamp: entry.timestamp ?? undefined, + MediaPath: entry.mediaPath, + MediaType: entry.mediaType, + MediaUrl: entry.mediaPath, + CommandAuthorized: entry.commandAuthorized, + OriginatingChannel: "signal" as const, + OriginatingTo: signalTo, + }; + + if (!entry.isGroup) { + const sessionCfg = deps.cfg.session; + const storePath = resolveStorePath(sessionCfg?.store, { + agentId: route.agentId, + }); + await updateLastRoute({ + storePath, + sessionKey: route.mainSessionKey, + channel: "signal", + to: entry.senderRecipient, + accountId: route.accountId, + }); + } + + if (shouldLogVerbose()) { + const preview = body.slice(0, 200).replace(/\\n/g, "\\\\n"); + logVerbose(`signal inbound: from=${ctxPayload.From} len=${body.length} preview="${preview}"`); + } + + let didSendReply = false; + + // Create mutable context for response prefix template interpolation + let prefixContext: ResponsePrefixContext = { + identityName: resolveIdentityName(deps.cfg, route.agentId), + }; + + const dispatcher = createReplyDispatcher({ + responsePrefix: resolveEffectiveMessagesConfig(deps.cfg, route.agentId).responsePrefix, + responsePrefixContextProvider: () => prefixContext, + humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId), + deliver: async (payload) => { + await deps.deliverReplies({ + replies: [payload], + target: ctxPayload.To, + baseUrl: deps.baseUrl, + account: deps.account, + accountId: deps.accountId, + runtime: deps.runtime, + maxBytes: deps.mediaMaxBytes, + textLimit: deps.textLimit, + }); + didSendReply = true; + }, + onError: (err, info) => { + deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`)); + }, + }); + + const { queuedFinal } = await dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg: deps.cfg, + dispatcher, + replyOptions: { + disableBlockStreaming: + typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined, + onModelSelected: (ctx) => { + // Mutate the object directly instead of reassigning to ensure the closure sees updates + prefixContext.provider = ctx.provider; + prefixContext.model = extractShortModelName(ctx.model); + prefixContext.modelFull = `${ctx.provider}/${ctx.model}`; + prefixContext.thinkingLevel = ctx.thinkLevel ?? "off"; + }, + }, + }); + if (!queuedFinal) { + if (entry.isGroup && historyKey && deps.historyLimit > 0 && didSendReply) { + clearHistoryEntries({ historyMap: deps.groupHistories, historyKey }); + } + return; + } + if (entry.isGroup && historyKey && deps.historyLimit > 0 && didSendReply) { + clearHistoryEntries({ historyMap: deps.groupHistories, historyKey }); + } + } + + const inboundDebouncer = createInboundDebouncer({ + debounceMs: inboundDebounceMs, + buildKey: (entry) => { + const conversationId = entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId; + if (!conversationId || !entry.senderPeerId) return null; + return `signal:${deps.accountId}:${conversationId}:${entry.senderPeerId}`; + }, + shouldDebounce: (entry) => { + if (!entry.bodyText.trim()) return false; + if (entry.mediaPath || entry.mediaType) return false; + return !hasControlCommand(entry.bodyText, deps.cfg); + }, + onFlush: async (entries) => { + const last = entries.at(-1); + if (!last) return; + if (entries.length === 1) { + await handleSignalInboundMessage(last); + return; + } + const combinedText = entries + .map((entry) => entry.bodyText) + .filter(Boolean) + .join("\\n"); + if (!combinedText.trim()) return; + await handleSignalInboundMessage({ + ...last, + bodyText: combinedText, + mediaPath: undefined, + mediaType: undefined, + }); + }, + onError: (err) => { + deps.runtime.error?.(`signal debounce flush failed: ${String(err)}`); + }, + }); + return async (event: { event?: string; data?: string }) => { if (event.event !== "receive" || !event.data) return; @@ -230,146 +436,23 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { const bodyText = messageText || placeholder || dataMessage.quote?.text?.trim() || ""; if (!bodyText) return; - const fromLabel = isGroup - ? `${groupName ?? "Signal Group"} id:${groupId}` - : `${envelope.sourceName ?? senderDisplay} id:${senderDisplay}`; - const body = formatAgentEnvelope({ - channel: "Signal", - from: fromLabel, + const senderName = envelope.sourceName ?? senderDisplay; + const messageId = + typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined; + await inboundDebouncer.enqueue({ + senderName, + senderDisplay, + senderRecipient, + senderPeerId, + groupId, + groupName, + isGroup, + bodyText, timestamp: envelope.timestamp ?? undefined, - body: bodyText, + messageId, + mediaPath, + mediaType, + commandAuthorized, }); - let combinedBody = body; - const historyKey = isGroup ? String(groupId ?? "unknown") : undefined; - if (isGroup && historyKey && deps.historyLimit > 0) { - combinedBody = buildHistoryContextFromMap({ - historyMap: deps.groupHistories, - historyKey, - limit: deps.historyLimit, - entry: { - sender: envelope.sourceName ?? senderDisplay, - body: bodyText, - timestamp: envelope.timestamp ?? undefined, - messageId: - typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined, - }, - currentMessage: combinedBody, - formatEntry: (entry) => - formatAgentEnvelope({ - channel: "Signal", - from: fromLabel, - timestamp: entry.timestamp, - body: `${entry.sender}: ${entry.body}${entry.messageId ? ` [id:${entry.messageId}]` : ""}`, - }), - }); - } - - const route = resolveAgentRoute({ - cfg: deps.cfg, - channel: "signal", - accountId: deps.accountId, - peer: { - kind: isGroup ? "group" : "dm", - id: isGroup ? (groupId ?? "unknown") : senderPeerId, - }, - }); - const signalTo = isGroup ? `group:${groupId}` : `signal:${senderRecipient}`; - const ctxPayload = { - Body: combinedBody, - RawBody: bodyText, - CommandBody: bodyText, - From: isGroup ? `group:${groupId ?? "unknown"}` : `signal:${senderRecipient}`, - To: signalTo, - SessionKey: route.sessionKey, - AccountId: route.accountId, - ChatType: isGroup ? "group" : "direct", - GroupSubject: isGroup ? (groupName ?? undefined) : undefined, - SenderName: envelope.sourceName ?? senderDisplay, - SenderId: senderDisplay, - Provider: "signal" as const, - Surface: "signal" as const, - MessageSid: envelope.timestamp ? String(envelope.timestamp) : undefined, - Timestamp: envelope.timestamp ?? undefined, - MediaPath: mediaPath, - MediaType: mediaType, - MediaUrl: mediaPath, - CommandAuthorized: commandAuthorized, - OriginatingChannel: "signal" as const, - OriginatingTo: signalTo, - }; - - if (!isGroup) { - const sessionCfg = deps.cfg.session; - const storePath = resolveStorePath(sessionCfg?.store, { - agentId: route.agentId, - }); - await updateLastRoute({ - storePath, - sessionKey: route.mainSessionKey, - channel: "signal", - to: senderRecipient, - accountId: route.accountId, - }); - } - - if (shouldLogVerbose()) { - const preview = body.slice(0, 200).replace(/\n/g, "\\n"); - logVerbose(`signal inbound: from=${ctxPayload.From} len=${body.length} preview="${preview}"`); - } - - let didSendReply = false; - - // Create mutable context for response prefix template interpolation - let prefixContext: ResponsePrefixContext = { - identityName: resolveIdentityName(deps.cfg, route.agentId), - }; - - const dispatcher = createReplyDispatcher({ - responsePrefix: resolveEffectiveMessagesConfig(deps.cfg, route.agentId).responsePrefix, - responsePrefixContextProvider: () => prefixContext, - humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId), - deliver: async (payload) => { - await deps.deliverReplies({ - replies: [payload], - target: ctxPayload.To, - baseUrl: deps.baseUrl, - account: deps.account, - accountId: deps.accountId, - runtime: deps.runtime, - maxBytes: deps.mediaMaxBytes, - textLimit: deps.textLimit, - }); - didSendReply = true; - }, - onError: (err, info) => { - deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`)); - }, - }); - - const { queuedFinal } = await dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg: deps.cfg, - dispatcher, - replyOptions: { - disableBlockStreaming: - typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined, - onModelSelected: (ctx) => { - // Mutate the object directly instead of reassigning to ensure the closure sees updates - prefixContext.provider = ctx.provider; - prefixContext.model = extractShortModelName(ctx.model); - prefixContext.modelFull = `${ctx.provider}/${ctx.model}`; - prefixContext.thinkingLevel = ctx.thinkLevel ?? "off"; - }, - }, - }); - if (!queuedFinal) { - if (isGroup && historyKey && deps.historyLimit > 0 && didSendReply) { - clearHistoryEntries({ historyMap: deps.groupHistories, historyKey }); - } - return; - } - if (isGroup && historyKey && deps.historyLimit > 0 && didSendReply) { - clearHistoryEntries({ historyMap: deps.groupHistories, historyKey }); - } }; } diff --git a/src/slack/monitor/message-handler.ts b/src/slack/monitor/message-handler.ts index 9ca3c4b76..2926d6ce2 100644 --- a/src/slack/monitor/message-handler.ts +++ b/src/slack/monitor/message-handler.ts @@ -1,3 +1,8 @@ +import { hasControlCommand } from "../../auto-reply/command-detection.js"; +import { + createInboundDebouncer, + resolveInboundDebounceMs, +} from "../../auto-reply/inbound-debounce.js"; import type { ResolvedSlackAccount } from "../accounts.js"; import type { SlackMessageEvent } from "../types.js"; import type { SlackMonitorContext } from "./context.js"; @@ -14,6 +19,66 @@ export function createSlackMessageHandler(params: { account: ResolvedSlackAccount; }): SlackMessageHandler { const { ctx, account } = params; + const debounceMs = resolveInboundDebounceMs({ cfg: ctx.cfg, channel: "slack" }); + + const debouncer = createInboundDebouncer<{ + message: SlackMessageEvent; + opts: { source: "message" | "app_mention"; wasMentioned?: boolean }; + }>({ + debounceMs, + buildKey: (entry) => { + const senderId = entry.message.user ?? entry.message.bot_id; + if (!senderId) return null; + const threadKey = entry.message.thread_ts + ? `${entry.message.channel}:${entry.message.thread_ts}` + : entry.message.channel; + return `slack:${ctx.accountId}:${threadKey}:${senderId}`; + }, + shouldDebounce: (entry) => { + const text = entry.message.text ?? ""; + if (!text.trim()) return false; + if (entry.message.files && entry.message.files.length > 0) return false; + return !hasControlCommand(text, ctx.cfg); + }, + onFlush: async (entries) => { + const last = entries.at(-1); + if (!last) return; + const combinedText = + entries.length === 1 + ? (last.message.text ?? "") + : entries + .map((entry) => entry.message.text ?? "") + .filter(Boolean) + .join("\n"); + const combinedMentioned = entries.some((entry) => Boolean(entry.opts.wasMentioned)); + const syntheticMessage: SlackMessageEvent = { + ...last.message, + text: combinedText, + }; + const prepared = await prepareSlackMessage({ + ctx, + account, + message: syntheticMessage, + opts: { + ...last.opts, + wasMentioned: combinedMentioned || last.opts.wasMentioned, + }, + }); + if (!prepared) return; + if (entries.length > 1) { + const ids = entries.map((entry) => entry.message.ts).filter(Boolean) as string[]; + if (ids.length > 0) { + prepared.ctxPayload.MessageSids = ids; + prepared.ctxPayload.MessageSidFirst = ids[0]; + prepared.ctxPayload.MessageSidLast = ids[ids.length - 1]; + } + } + await dispatchPreparedSlackMessage(prepared); + }, + onError: (err) => { + ctx.runtime.error?.(`slack inbound debounce flush failed: ${String(err)}`); + }, + }); return async (message, opts) => { if (opts.source === "message" && message.type !== "message") return; @@ -26,8 +91,6 @@ export function createSlackMessageHandler(params: { return; } if (ctx.markMessageSeen(message.channel, message.ts)) return; - const prepared = await prepareSlackMessage({ ctx, account, message, opts }); - if (!prepared) return; - await dispatchPreparedSlackMessage(prepared); + await debouncer.enqueue({ message, opts }); }; } diff --git a/src/telegram/bot-handlers.ts b/src/telegram/bot-handlers.ts index c49c0944e..3e2c421a1 100644 --- a/src/telegram/bot-handlers.ts +++ b/src/telegram/bot-handlers.ts @@ -1,4 +1,9 @@ // @ts-nocheck +import { hasControlCommand } from "../auto-reply/command-detection.js"; +import { + createInboundDebouncer, + resolveInboundDebounceMs, +} from "../auto-reply/inbound-debounce.js"; import { loadConfig } from "../config/config.js"; import { writeConfigFile } from "../config/io.js"; import { danger, logVerbose, warn } from "../globals.js"; @@ -43,6 +48,61 @@ export const registerTelegramHandlers = ({ const textFragmentBuffer = new Map(); let textFragmentProcessing: Promise = Promise.resolve(); + const debounceMs = resolveInboundDebounceMs({ cfg, channel: "telegram" }); + type TelegramDebounceEntry = { + ctx: unknown; + msg: TelegramMessage; + allMedia: Array<{ path: string; contentType?: string }>; + storeAllowFrom: string[]; + debounceKey: string | null; + botUsername?: string; + }; + const inboundDebouncer = createInboundDebouncer({ + debounceMs, + buildKey: (entry) => entry.debounceKey, + shouldDebounce: (entry) => { + if (entry.allMedia.length > 0) return false; + const text = entry.msg.text ?? entry.msg.caption ?? ""; + if (!text.trim()) return false; + return !hasControlCommand(text, cfg, { botUsername: entry.botUsername }); + }, + onFlush: async (entries) => { + const last = entries.at(-1); + if (!last) return; + if (entries.length === 1) { + await processMessage(last.ctx, last.allMedia, last.storeAllowFrom); + return; + } + const combinedText = entries + .map((entry) => entry.msg.text ?? entry.msg.caption ?? "") + .filter(Boolean) + .join("\n"); + if (!combinedText.trim()) return; + const first = entries[0]; + const baseCtx = first.ctx as { me?: unknown; getFile?: unknown } & Record; + const getFile = + typeof baseCtx.getFile === "function" ? baseCtx.getFile.bind(baseCtx) : async () => ({}); + const syntheticMessage: TelegramMessage = { + ...first.msg, + text: combinedText, + caption: undefined, + caption_entities: undefined, + entities: undefined, + date: last.msg.date ?? first.msg.date, + }; + const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined; + await processMessage( + { message: syntheticMessage, me: baseCtx.me, getFile }, + [], + first.storeAllowFrom, + messageIdOverride ? { messageIdOverride } : undefined, + ); + }, + onError: (err) => { + runtime.error?.(danger(`telegram debounce flush failed: ${String(err)}`)); + }, + }); + const processMediaGroup = async (entry: MediaGroupEntry) => { try { entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); @@ -403,7 +463,20 @@ export const registerTelegramHandlers = ({ throw mediaErr; } const allMedia = media ? [{ path: media.path, contentType: media.contentType }] : []; - await processMessage(ctx, allMedia, storeAllowFrom); + const senderId = msg.from?.id ? String(msg.from.id) : ""; + const conversationKey = + resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId); + const debounceKey = senderId + ? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}` + : null; + await inboundDebouncer.enqueue({ + ctx, + msg, + allMedia, + storeAllowFrom, + debounceKey, + botUsername: ctx.me?.username, + }); } catch (err) { runtime.error?.(danger(`handler failed: ${String(err)}`)); } diff --git a/src/web/accounts.ts b/src/web/accounts.ts index 618b57d2d..7a4467390 100644 --- a/src/web/accounts.ts +++ b/src/web/accounts.ts @@ -26,6 +26,7 @@ export type ResolvedWhatsAppAccount = { blockStreaming?: boolean; ackReaction?: WhatsAppAccountConfig["ackReaction"]; groups?: WhatsAppAccountConfig["groups"]; + debounceMs?: number; }; function listConfiguredAccountIds(cfg: ClawdbotConfig): string[] { @@ -153,6 +154,7 @@ export function resolveWhatsAppAccount(params: { blockStreaming: accountCfg?.blockStreaming ?? rootCfg?.blockStreaming, ackReaction: accountCfg?.ackReaction ?? rootCfg?.ackReaction, groups: accountCfg?.groups ?? rootCfg?.groups, + debounceMs: accountCfg?.debounceMs ?? rootCfg?.debounceMs, }; } diff --git a/src/web/auto-reply/monitor.ts b/src/web/auto-reply/monitor.ts index c8d139032..b5f4383e0 100644 --- a/src/web/auto-reply/monitor.ts +++ b/src/web/auto-reply/monitor.ts @@ -1,5 +1,7 @@ import { DEFAULT_GROUP_HISTORY_LIMIT } from "../../auto-reply/reply/history.js"; import { getReplyFromConfig } from "../../auto-reply/reply.js"; +import { hasControlCommand } from "../../auto-reply/command-detection.js"; +import { resolveInboundDebounceMs } from "../../auto-reply/inbound-debounce.js"; import { waitForever } from "../../cli/wait.js"; import { loadConfig } from "../../config/config.js"; import { logVerbose } from "../../globals.js"; @@ -169,12 +171,22 @@ export async function monitorWebChannel( account, }); + const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "whatsapp" }); + const shouldDebounce = (msg: WebInboundMsg) => { + if (msg.mediaPath || msg.mediaType) return false; + if (msg.location) return false; + if (msg.replyToId || msg.replyToBody) return false; + return !hasControlCommand(msg.body, cfg); + }; + const listener = await (listenerFactory ?? monitorWebInbox)({ verbose, accountId: account.accountId, authDir: account.authDir, mediaMaxMb: account.mediaMaxMb, sendReadReceipts: account.sendReadReceipts, + debounceMs: inboundDebounceMs, + shouldDebounce, onMessage: async (msg: WebInboundMsg) => { handledMessages += 1; lastMessageAt = Date.now(); diff --git a/src/web/auto-reply/types.ts b/src/web/auto-reply/types.ts index db135f4c8..cb6ce4ce4 100644 --- a/src/web/auto-reply/types.ts +++ b/src/web/auto-reply/types.ts @@ -30,4 +30,6 @@ export type WebMonitorTuning = { statusSink?: (status: WebChannelStatus) => void; /** WhatsApp account id. Default: "default". */ accountId?: string; + /** Debounce window (ms) for batching rapid consecutive messages from the same sender. */ + debounceMs?: number; }; diff --git a/src/web/inbound/monitor.ts b/src/web/inbound/monitor.ts index 3c92fe4a2..b6e8a575b 100644 --- a/src/web/inbound/monitor.ts +++ b/src/web/inbound/monitor.ts @@ -5,6 +5,7 @@ import { logVerbose, shouldLogVerbose } from "../../globals.js"; import { recordChannelActivity } from "../../infra/channel-activity.js"; import { createSubsystemLogger, getChildLogger } from "../../logging.js"; import { saveMediaBuffer } from "../../media/store.js"; +import { createInboundDebouncer } from "../../auto-reply/inbound-debounce.js"; import { jidToE164, resolveJidToE164 } from "../../utils.js"; import { createWaSocket, getStatusCode, waitForWaConnection } from "../session.js"; import { checkInboundAccessControl } from "./access-control.js"; @@ -28,6 +29,10 @@ export async function monitorWebInbox(options: { mediaMaxMb?: number; /** Send read receipts for incoming messages (default true). */ sendReadReceipts?: boolean; + /** Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable). */ + debounceMs?: number; + /** Optional debounce gating predicate. */ + shouldDebounce?: (msg: WebInboundMessage) => boolean; }) { const inboundLogger = getChildLogger({ module: "web-inbound" }); const inboundConsoleLog = createSubsystemLogger("gateway/channels/whatsapp").child("inbound"); @@ -56,6 +61,45 @@ export async function monitorWebInbox(options: { const selfJid = sock.user?.id; const selfE164 = selfJid ? jidToE164(selfJid) : null; + const debouncer = createInboundDebouncer({ + debounceMs: options.debounceMs ?? 0, + buildKey: (msg) => { + const senderKey = + msg.chatType === "group" + ? (msg.senderJid ?? msg.senderE164 ?? msg.senderName ?? msg.from) + : msg.from; + if (!senderKey) return null; + const conversationKey = msg.chatType === "group" ? msg.chatId : msg.from; + return `${msg.accountId}:${conversationKey}:${senderKey}`; + }, + shouldDebounce: options.shouldDebounce, + onFlush: async (entries) => { + const last = entries.at(-1); + if (!last) return; + if (entries.length === 1) { + await options.onMessage(last); + return; + } + const mentioned = new Set(); + for (const entry of entries) { + for (const jid of entry.mentionedJids ?? []) mentioned.add(jid); + } + const combinedBody = entries + .map((entry) => entry.body) + .filter(Boolean) + .join("\n"); + const combinedMessage: WebInboundMessage = { + ...last, + body: combinedBody, + mentionedJids: mentioned.size > 0 ? Array.from(mentioned) : undefined, + }; + await options.onMessage(combinedMessage); + }, + onError: (err) => { + inboundLogger.error({ error: String(err) }, "failed handling inbound web message"); + inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`); + }, + }); const groupMetaCache = new Map< string, { subject?: string; participants?: string[]; expires: number } @@ -217,38 +261,37 @@ export async function monitorWebInbox(options: { { from, to: selfE164 ?? "me", body, mediaPath, mediaType, timestamp }, "inbound message", ); + const inboundMessage: WebInboundMessage = { + id, + from, + conversationId: from, + to: selfE164 ?? "me", + accountId: access.resolvedAccountId, + body, + pushName: senderName, + timestamp, + chatType: group ? "group" : "direct", + chatId: remoteJid, + senderJid: participantJid, + senderE164: senderE164 ?? undefined, + senderName, + replyToId: replyContext?.id, + replyToBody: replyContext?.body, + replyToSender: replyContext?.sender, + groupSubject, + groupParticipants, + mentionedJids: mentionedJids ?? undefined, + selfJid, + selfE164, + location: location ?? undefined, + sendComposing, + reply, + sendMedia, + mediaPath, + mediaType, + }; try { - const task = Promise.resolve( - options.onMessage({ - id, - from, - conversationId: from, - to: selfE164 ?? "me", - accountId: access.resolvedAccountId, - body, - pushName: senderName, - timestamp, - chatType: group ? "group" : "direct", - chatId: remoteJid, - senderJid: participantJid, - senderE164: senderE164 ?? undefined, - senderName, - replyToId: replyContext?.id, - replyToBody: replyContext?.body, - replyToSender: replyContext?.sender, - groupSubject, - groupParticipants, - mentionedJids: mentionedJids ?? undefined, - selfJid, - selfE164, - location: location ?? undefined, - sendComposing, - reply, - sendMedia, - mediaPath, - mediaType, - }), - ); + const task = Promise.resolve(debouncer.enqueue(inboundMessage)); void task.catch((err) => { inboundLogger.error({ error: String(err) }, "failed handling inbound web message"); inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`);