From 92e794dc18127fc1afbbb3e8eff716d4906d3712 Mon Sep 17 00:00:00 2001 From: Tyler Yust <64381258+tyler6204@users.noreply.github.com> Date: Sat, 24 Jan 2026 16:47:10 -0800 Subject: [PATCH] feat: add chunking mode option for BlueBubbles (#1645) * feat: add chunking mode for outbound messages - Introduced `chunkMode` option in various account configurations to allow splitting messages by "length" or "newline". - Updated message processing to handle chunking based on the selected mode. - Added tests for new chunking functionality, ensuring correct behavior for both modes. * feat: enhance chunking mode documentation and configuration - Added `chunkMode` option to the BlueBubbles account configuration, allowing users to choose between "length" and "newline" for message chunking. - Updated documentation to clarify the behavior of the `chunkMode` setting. - Adjusted account merging logic to incorporate the new `chunkMode` configuration. * refactor: simplify chunk mode handling for BlueBubbles - Removed `chunkMode` configuration from various account schemas and types, centralizing chunk mode logic to BlueBubbles only. - Updated `processMessage` to default to "newline" for BlueBubbles chunking. - Adjusted tests to reflect changes in chunk mode handling for BlueBubbles, ensuring proper functionality. * fix: update default chunk mode to 'length' for BlueBubbles - Changed the default value of `chunkMode` from 'newline' to 'length' in the BlueBubbles configuration and related processing functions. - Updated documentation to reflect the new default behavior for chunking messages. - Adjusted tests to ensure the correct default value is returned for BlueBubbles chunk mode. 
--- docs/channels/bluebubbles.md | 1 + extensions/bluebubbles/src/accounts.ts | 3 +- extensions/bluebubbles/src/config-schema.ts | 1 + extensions/bluebubbles/src/monitor.ts | 20 +++- extensions/bluebubbles/src/types.ts | 2 + src/auto-reply/chunk.test.ts | 101 +++++++++++++++++++- src/auto-reply/chunk.ts | 84 +++++++++++++++- src/auto-reply/reply/block-streaming.ts | 20 +++- src/config/zod-schema.providers-core.ts | 1 + src/infra/outbound/deliver.ts | 3 +- src/plugin-sdk/index.ts | 1 + src/plugins/runtime/index.ts | 12 ++- src/plugins/runtime/types.ts | 6 ++ 13 files changed, 247 insertions(+), 8 deletions(-) diff --git a/docs/channels/bluebubbles.md b/docs/channels/bluebubbles.md index cf5faee1d..eed40b681 100644 --- a/docs/channels/bluebubbles.md +++ b/docs/channels/bluebubbles.md @@ -196,6 +196,7 @@ Provider options: - `channels.bluebubbles.sendReadReceipts`: Send read receipts (default: `true`). - `channels.bluebubbles.blockStreaming`: Enable block streaming (default: `true`). - `channels.bluebubbles.textChunkLimit`: Outbound chunk size in chars (default: 4000). +- `channels.bluebubbles.chunkMode`: `length` (default) splits only when exceeding `textChunkLimit`; `newline` splits on every newline and sends each line immediately during streaming. - `channels.bluebubbles.mediaMaxMb`: Inbound media cap in MB (default: 8). - `channels.bluebubbles.historyLimit`: Max group messages for context (0 disables). - `channels.bluebubbles.dmHistoryLimit`: DM history limit. diff --git a/extensions/bluebubbles/src/accounts.ts b/extensions/bluebubbles/src/accounts.ts index 5a4fee8ba..9fc94356d 100644 --- a/extensions/bluebubbles/src/accounts.ts +++ b/extensions/bluebubbles/src/accounts.ts @@ -47,7 +47,8 @@ function mergeBlueBubblesAccountConfig( }; const { accounts: _ignored, ...rest } = base; const account = resolveAccountConfig(cfg, accountId) ?? {}; - return { ...rest, ...account }; + const chunkMode = account.chunkMode ?? rest.chunkMode ?? 
"length"; + return { ...rest, ...account, chunkMode }; } export function resolveBlueBubblesAccount(params: { diff --git a/extensions/bluebubbles/src/config-schema.ts b/extensions/bluebubbles/src/config-schema.ts index 844641b94..dc532e979 100644 --- a/extensions/bluebubbles/src/config-schema.ts +++ b/extensions/bluebubbles/src/config-schema.ts @@ -38,6 +38,7 @@ const bluebubblesAccountSchema = z.object({ historyLimit: z.number().int().min(0).optional(), dmHistoryLimit: z.number().int().min(0).optional(), textChunkLimit: z.number().int().positive().optional(), + chunkMode: z.enum(["length", "newline"]).optional(), mediaMaxMb: z.number().int().positive().optional(), sendReadReceipts: z.boolean().optional(), blockStreaming: z.boolean().optional(), diff --git a/extensions/bluebubbles/src/monitor.ts b/extensions/bluebubbles/src/monitor.ts index 570ca42e0..8635b183e 100644 --- a/extensions/bluebubbles/src/monitor.ts +++ b/extensions/bluebubbles/src/monitor.ts @@ -1851,16 +1851,21 @@ async function processMessage( account.config.textChunkLimit && account.config.textChunkLimit > 0 ? account.config.textChunkLimit : DEFAULT_TEXT_LIMIT; + const chunkMode = account.config.chunkMode ?? "length"; const tableMode = core.channel.text.resolveMarkdownTableMode({ cfg: config, channel: "bluebubbles", accountId: account.accountId, }); const text = core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode); - const chunks = core.channel.text.chunkMarkdownText(text, textLimit); + const chunks = + chunkMode === "newline" + ? 
core.channel.text.chunkTextWithMode(text, textLimit, chunkMode) + : core.channel.text.chunkMarkdownText(text, textLimit); if (!chunks.length && text) chunks.push(text); if (!chunks.length) return; - for (const chunk of chunks) { + for (let i = 0; i < chunks.length; i++) { + const chunk = chunks[i]; const result = await sendMessageBlueBubbles(outboundTarget, chunk, { cfg: config, accountId: account.accountId, @@ -1869,6 +1874,17 @@ async function processMessage( maybeEnqueueOutboundMessageId(result.messageId, chunk); sentMessage = true; statusSink?.({ lastOutboundAt: Date.now() }); + // In newline mode, restart typing after each chunk if more chunks remain + // Small delay allows the Apple API to finish clearing the typing state from message send + if (chunkMode === "newline" && i < chunks.length - 1 && chatGuidForActions) { + await new Promise((r) => setTimeout(r, 150)); + sendBlueBubblesTyping(chatGuidForActions, true, { + cfg: config, + accountId: account.accountId, + }).catch(() => { + // Ignore typing errors + }); + } } }, onReplyStart: async () => { diff --git a/extensions/bluebubbles/src/types.ts b/extensions/bluebubbles/src/types.ts index 6b1da775b..d2aeb4022 100644 --- a/extensions/bluebubbles/src/types.ts +++ b/extensions/bluebubbles/src/types.ts @@ -38,6 +38,8 @@ export type BlueBubblesAccountConfig = { dms?: Record; /** Outbound text chunk size (chars). Default: 4000. */ textChunkLimit?: number; + /** Chunking mode: "length" (default) splits only when exceeding textChunkLimit; "newline" splits on every newline. */ + chunkMode?: "length" | "newline"; blockStreaming?: boolean; /** Merge streamed block replies before sending. 
*/ blockStreamingCoalesce?: Record; diff --git a/src/auto-reply/chunk.test.ts b/src/auto-reply/chunk.test.ts index 17e98739c..7a4b41d0e 100644 --- a/src/auto-reply/chunk.test.ts +++ b/src/auto-reply/chunk.test.ts @@ -1,6 +1,13 @@ import { describe, expect, it } from "vitest"; -import { chunkMarkdownText, chunkText, resolveTextChunkLimit } from "./chunk.js"; +import { + chunkByNewline, + chunkMarkdownText, + chunkText, + chunkTextWithMode, + resolveChunkMode, + resolveTextChunkLimit, +} from "./chunk.js"; function expectFencesBalanced(chunks: string[]) { for (const chunk of chunks) { @@ -231,3 +238,95 @@ describe("chunkMarkdownText", () => { expect(chunks.join("")).toBe(text); }); }); + +describe("chunkByNewline", () => { + it("splits text on newlines", () => { + const text = "Line one\nLine two\nLine three"; + const chunks = chunkByNewline(text, 1000); + expect(chunks).toEqual(["Line one", "Line two", "Line three"]); + }); + + it("filters empty lines", () => { + const text = "Line one\n\n\nLine two\n\nLine three"; + const chunks = chunkByNewline(text, 1000); + expect(chunks).toEqual(["Line one", "Line two", "Line three"]); + }); + + it("trims whitespace from lines", () => { + const text = " Line one \n Line two "; + const chunks = chunkByNewline(text, 1000); + expect(chunks).toEqual(["Line one", "Line two"]); + }); + + it("falls back to length-based for long lines", () => { + const text = "Short line\n" + "a".repeat(50) + "\nAnother short"; + const chunks = chunkByNewline(text, 20); + expect(chunks[0]).toBe("Short line"); + // Long line gets split into multiple chunks + expect(chunks[1].length).toBe(20); + expect(chunks[2].length).toBe(20); + expect(chunks[3].length).toBe(10); + expect(chunks[4]).toBe("Another short"); + }); + + it("returns empty array for empty input", () => { + expect(chunkByNewline("", 100)).toEqual([]); + }); + + it("returns empty array for whitespace-only input", () => { + expect(chunkByNewline(" \n\n ", 100)).toEqual([]); + }); +}); + 
+describe("chunkTextWithMode", () => { + it("uses length-based chunking for length mode", () => { + const text = "Line one\nLine two"; + const chunks = chunkTextWithMode(text, 1000, "length"); + expect(chunks).toEqual(["Line one\nLine two"]); + }); + + it("uses newline-based chunking for newline mode", () => { + const text = "Line one\nLine two"; + const chunks = chunkTextWithMode(text, 1000, "newline"); + expect(chunks).toEqual(["Line one", "Line two"]); + }); +}); + +describe("resolveChunkMode", () => { + it("returns length as default", () => { + expect(resolveChunkMode(undefined, "telegram")).toBe("length"); + expect(resolveChunkMode({}, "discord")).toBe("length"); + expect(resolveChunkMode(undefined, "bluebubbles")).toBe("length"); + }); + + it("returns length for internal channel", () => { + const cfg = { channels: { bluebubbles: { chunkMode: "newline" as const } } }; + expect(resolveChunkMode(cfg, "__internal__")).toBe("length"); + }); + + it("supports provider-level overrides for bluebubbles", () => { + const cfg = { channels: { bluebubbles: { chunkMode: "newline" as const } } }; + expect(resolveChunkMode(cfg, "bluebubbles")).toBe("newline"); + expect(resolveChunkMode(cfg, "discord")).toBe("length"); + }); + + it("supports account-level overrides for bluebubbles", () => { + const cfg = { + channels: { + bluebubbles: { + chunkMode: "length" as const, + accounts: { + primary: { chunkMode: "newline" as const }, + }, + }, + }, + }; + expect(resolveChunkMode(cfg, "bluebubbles", "primary")).toBe("newline"); + expect(resolveChunkMode(cfg, "bluebubbles", "other")).toBe("length"); + }); + + it("ignores chunkMode for non-bluebubbles providers", () => { + const cfg = { channels: { ["telegram" as string]: { chunkMode: "newline" as const } } }; + expect(resolveChunkMode(cfg, "telegram")).toBe("length"); + }); +}); diff --git a/src/auto-reply/chunk.ts b/src/auto-reply/chunk.ts index abbd830a2..281612e37 100644 --- a/src/auto-reply/chunk.ts +++ b/src/auto-reply/chunk.ts @@ 
-10,11 +10,20 @@ import { INTERNAL_MESSAGE_CHANNEL } from "../utils/message-channel.js"; export type TextChunkProvider = ChannelId | typeof INTERNAL_MESSAGE_CHANNEL; +/** + * Chunking mode for outbound messages: + * - "length": Split only when exceeding textChunkLimit (default) + * - "newline": Split on every newline, with fallback to length-based for long lines + */ +export type ChunkMode = "length" | "newline"; + const DEFAULT_CHUNK_LIMIT = 4000; +const DEFAULT_CHUNK_MODE: ChunkMode = "length"; type ProviderChunkConfig = { textChunkLimit?: number; - accounts?: Record; + chunkMode?: ChunkMode; + accounts?: Record; }; function resolveChunkLimitForProvider( @@ -63,6 +72,79 @@ export function resolveTextChunkLimit( return fallback; } +function resolveChunkModeForProvider( + cfgSection: ProviderChunkConfig | undefined, + accountId?: string | null, +): ChunkMode | undefined { + if (!cfgSection) return undefined; + const normalizedAccountId = normalizeAccountId(accountId); + const accounts = cfgSection.accounts; + if (accounts && typeof accounts === "object") { + const direct = accounts[normalizedAccountId]; + if (direct?.chunkMode) { + return direct.chunkMode; + } + const matchKey = Object.keys(accounts).find( + (key) => key.toLowerCase() === normalizedAccountId.toLowerCase(), + ); + const match = matchKey ? accounts[matchKey] : undefined; + if (match?.chunkMode) { + return match.chunkMode; + } + } + return cfgSection.chunkMode; +} + +export function resolveChunkMode( + cfg: ClawdbotConfig | undefined, + provider?: TextChunkProvider, + accountId?: string | null, +): ChunkMode { + if (!provider || provider === INTERNAL_MESSAGE_CHANNEL) return DEFAULT_CHUNK_MODE; + // Chunk mode is only supported for BlueBubbles. + if (provider !== "bluebubbles") return DEFAULT_CHUNK_MODE; + const channelsConfig = cfg?.channels as Record | undefined; + const providerConfig = (channelsConfig?.[provider] ?? 
+ (cfg as Record | undefined)?.[provider]) as ProviderChunkConfig | undefined; + const mode = resolveChunkModeForProvider(providerConfig, accountId); + return mode ?? DEFAULT_CHUNK_MODE; +} + +/** + * Split text on newlines, filtering empty lines. + * Lines exceeding maxLineLength are further split using length-based chunking. + */ +export function chunkByNewline(text: string, maxLineLength: number): string[] { + if (!text) return []; + const lines = text.split("\n"); + const chunks: string[] = []; + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; // skip empty lines + + if (trimmed.length <= maxLineLength) { + chunks.push(trimmed); + } else { + // Long line: fall back to length-based chunking + const subChunks = chunkText(trimmed, maxLineLength); + chunks.push(...subChunks); + } + } + + return chunks; +} + +/** + * Unified chunking function that dispatches based on mode. + */ +export function chunkTextWithMode(text: string, limit: number, mode: ChunkMode): string[] { + if (mode === "newline") { + return chunkByNewline(text, limit); + } + return chunkText(text, limit); +} + export function chunkText(text: string, limit: number): string[] { if (!text) return []; if (limit <= 0) return [text]; diff --git a/src/auto-reply/reply/block-streaming.ts b/src/auto-reply/reply/block-streaming.ts index 52d114500..fb462b107 100644 --- a/src/auto-reply/reply/block-streaming.ts +++ b/src/auto-reply/reply/block-streaming.ts @@ -7,7 +7,7 @@ import { INTERNAL_MESSAGE_CHANNEL, listDeliverableMessageChannels, } from "../../utils/message-channel.js"; -import { resolveTextChunkLimit, type TextChunkProvider } from "../chunk.js"; +import { resolveChunkMode, resolveTextChunkLimit, type TextChunkProvider } from "../chunk.js"; const DEFAULT_BLOCK_STREAM_MIN = 800; const DEFAULT_BLOCK_STREAM_MAX = 1200; @@ -68,6 +68,17 @@ export function resolveBlockStreamingChunking( fallbackLimit: providerChunkLimit, }); const chunkCfg = 
cfg?.agents?.defaults?.blockStreamingChunk; + + // BlueBubbles-only: if chunkMode is "newline", use newline-based streaming + const channelChunkMode = resolveChunkMode(cfg, providerKey, accountId); + if (channelChunkMode === "newline") { + // For newline mode: use very low minChars to flush quickly on newlines + const minChars = Math.max(1, Math.floor(chunkCfg?.minChars ?? 1)); + const maxRequested = Math.max(1, Math.floor(chunkCfg?.maxChars ?? textLimit)); + const maxChars = Math.max(1, Math.min(maxRequested, textLimit)); + return { minChars, maxChars, breakPreference: "newline" }; + } + const maxRequested = Math.max(1, Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX)); const maxChars = Math.max(1, Math.min(maxRequested, textLimit)); const minFallback = DEFAULT_BLOCK_STREAM_MIN; @@ -91,6 +102,13 @@ export function resolveBlockStreamingCoalescing( }, ): BlockStreamingCoalescing | undefined { const providerKey = normalizeChunkProvider(provider); + + // BlueBubbles-only: when chunkMode is "newline", disable coalescing to send each line immediately + const channelChunkMode = resolveChunkMode(cfg, providerKey, accountId); + if (channelChunkMode === "newline") { + return undefined; + } + const providerId = providerKey ? normalizeChannelId(providerKey) : null; const providerChunkLimit = providerId ? 
getChannelDock(providerId)?.outbound?.textChunkLimit diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index 2aee48711..9d1eaa285 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -633,6 +633,7 @@ export const BlueBubblesAccountSchemaBase = z dmHistoryLimit: z.number().int().min(0).optional(), dms: z.record(z.string(), DmConfigSchema.optional()).optional(), textChunkLimit: z.number().int().positive().optional(), + chunkMode: z.enum(["length", "newline"]).optional(), mediaMaxMb: z.number().int().positive().optional(), sendReadReceipts: z.boolean().optional(), blockStreaming: z.boolean().optional(), diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 73f5550e0..2665e9957 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -212,7 +212,8 @@ export async function deliverOutboundPayloads(params: { results.push(await handler.sendText(text)); return; } - for (const chunk of handler.chunker(text, textLimit)) { + const chunks = handler.chunker(text, textLimit); + for (const chunk of chunks) { throwIfAborted(abortSignal); results.push(await handler.sendText(chunk)); } diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index f40d99d82..cb4e95a82 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -112,6 +112,7 @@ export type { WizardPrompter } from "../wizard/prompts.js"; export { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "../routing/session-key.js"; export { resolveAckReaction } from "../agents/identity.js"; export type { ReplyPayload } from "../auto-reply/types.js"; +export type { ChunkMode } from "../auto-reply/chunk.js"; export { SILENT_REPLY_TOKEN, isSilentReplyText } from "../auto-reply/tokens.js"; export { buildPendingHistoryContextFromMap, diff --git a/src/plugins/runtime/index.ts b/src/plugins/runtime/index.ts index 5783711b1..50e0a2d03 100644 --- a/src/plugins/runtime/index.ts +++ 
b/src/plugins/runtime/index.ts @@ -1,6 +1,13 @@ import { createRequire } from "node:module"; -import { chunkMarkdownText, chunkText, resolveTextChunkLimit } from "../../auto-reply/chunk.js"; +import { + chunkByNewline, + chunkMarkdownText, + chunkText, + chunkTextWithMode, + resolveChunkMode, + resolveTextChunkLimit, +} from "../../auto-reply/chunk.js"; import { hasControlCommand, isControlCommandMessage, @@ -160,8 +167,11 @@ export function createPluginRuntime(): PluginRuntime { }, channel: { text: { + chunkByNewline, chunkMarkdownText, chunkText, + chunkTextWithMode, + resolveChunkMode, resolveTextChunkLimit, hasControlCommand, resolveMarkdownTableMode, diff --git a/src/plugins/runtime/types.ts b/src/plugins/runtime/types.ts index 115cb447e..40d936762 100644 --- a/src/plugins/runtime/types.ts +++ b/src/plugins/runtime/types.ts @@ -35,8 +35,11 @@ type ResolveInboundDebounceMs = type ResolveCommandAuthorizedFromAuthorizers = typeof import("../../channels/command-gating.js").resolveCommandAuthorizedFromAuthorizers; type ResolveTextChunkLimit = typeof import("../../auto-reply/chunk.js").resolveTextChunkLimit; +type ResolveChunkMode = typeof import("../../auto-reply/chunk.js").resolveChunkMode; type ChunkMarkdownText = typeof import("../../auto-reply/chunk.js").chunkMarkdownText; type ChunkText = typeof import("../../auto-reply/chunk.js").chunkText; +type ChunkTextWithMode = typeof import("../../auto-reply/chunk.js").chunkTextWithMode; +type ChunkByNewline = typeof import("../../auto-reply/chunk.js").chunkByNewline; type ResolveMarkdownTableMode = typeof import("../../config/markdown-tables.js").resolveMarkdownTableMode; type ConvertMarkdownTables = typeof import("../../markdown/tables.js").convertMarkdownTables; @@ -173,8 +176,11 @@ export type PluginRuntime = { }; channel: { text: { + chunkByNewline: ChunkByNewline; chunkMarkdownText: ChunkMarkdownText; chunkText: ChunkText; + chunkTextWithMode: ChunkTextWithMode; + resolveChunkMode: ResolveChunkMode; 
resolveTextChunkLimit: ResolveTextChunkLimit; hasControlCommand: HasControlCommand; resolveMarkdownTableMode: ResolveMarkdownTableMode;