diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts
index c4878690d..3391666da 100644
--- a/src/auto-reply/reply/agent-runner.ts
+++ b/src/auto-reply/reply/agent-runner.ts
@@ -22,6 +22,7 @@ import {
   emitAgentEvent,
   registerAgentRunContext,
 } from "../../infra/agent-events.js";
+import { isAudioFileName } from "../../media/mime.js";
 import { defaultRuntime } from "../../runtime.js";
 import {
   estimateUsageCost,
@@ -34,8 +35,11 @@ import type { OriginatingChannelType, TemplateContext } from "../templating.js";
 import { normalizeVerboseLevel, type VerboseLevel } from "../thinking.js";
 import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
 import type { GetReplyOptions, ReplyPayload } from "../types.js";
-import { extractAudioTag } from "./audio-tags.js";
-import { createBlockReplyPipeline } from "./block-reply-pipeline.js";
+import { parseAudioTag } from "./audio-tags.js";
+import {
+  createAudioAsVoiceBuffer,
+  createBlockReplyPipeline,
+} from "./block-reply-pipeline.js";
 import { resolveBlockStreamingCoalescing } from "./block-streaming.js";
 import { createFollowupRunner } from "./followup-runner.js";
 import {
@@ -261,13 +265,12 @@ export async function runReplyAgent(params: {
   const blockReplyTimeoutMs =
     opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;
 
-  // Buffer audio blocks to apply [[audio_as_voice]] tag that may come later
-  const bufferedAudioBlocks: ReplyPayload[] = [];
-  let seenAudioAsVoice = false;
-
-  const AUDIO_EXTENSIONS = /\.(opus|mp3|m4a|wav|ogg|aac|flac)$/i;
   const hasAudioMedia = (urls?: string[]): boolean =>
-    Boolean(urls?.some((u) => AUDIO_EXTENSIONS.test(u)));
+    Boolean(urls?.some((u) => isAudioFileName(u)));
+  const isAudioPayload = (payload: ReplyPayload) =>
+    hasAudioMedia(
+      payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : undefined),
+    );
 
   const replyToChannel =
     sessionCtx.OriginatingChannel ??
     ((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as
@@ -297,6 +300,7 @@ export async function runReplyAgent(params: {
         onBlockReply: opts.onBlockReply,
         timeoutMs: blockReplyTimeoutMs,
         coalescing: blockReplyCoalescing,
+        buffer: createAudioAsVoiceBuffer({ isAudioPayload }),
       })
     : null;
 
@@ -546,8 +550,8 @@ export async function runReplyAgent(params: {
               !payload.audioAsVoice
             )
              return;
-            const audioTagResult = extractAudioTag(taggedPayload.text);
-            const cleaned = audioTagResult.cleaned || undefined;
+            const audioTagResult = parseAudioTag(taggedPayload.text);
+            const cleaned = audioTagResult.text || undefined;
             const hasMedia =
               Boolean(taggedPayload.mediaUrl) ||
               (taggedPayload.mediaUrls?.length ?? 0) > 0;
@@ -559,11 +563,6 @@ export async function runReplyAgent(params: {
             )
              return;
 
-            // Track if we've seen [[audio_as_voice]] from payload or text extraction
-            if (payload.audioAsVoice || audioTagResult.audioAsVoice) {
-              seenAudioAsVoice = true;
-            }
-
             const blockPayload: ReplyPayload = applyReplyToMode({
               ...taggedPayload,
               text: cleaned,
@@ -579,13 +578,6 @@ export async function runReplyAgent(params: {
              );
            });
 
-            // Buffer audio blocks to apply [[audio_as_voice]] that may come later
-            const isAudioBlock = hasAudioMedia(taggedPayload.mediaUrls);
-            if (isAudioBlock) {
-              bufferedAudioBlocks.push(blockPayload);
-              return; // Don't send immediately - wait for potential [[audio_as_voice]] tag
-            }
-
             blockReplyPipeline?.enqueue(blockPayload);
           }
         : undefined,
@@ -701,16 +693,6 @@ export async function runReplyAgent(params: {
 
   const payloadArray = runResult.payloads ?? [];
 
-  if (bufferedAudioBlocks.length > 0 && blockReplyPipeline) {
-    for (const audioPayload of bufferedAudioBlocks) {
-      const finalPayload = seenAudioAsVoice
-        ? { ...audioPayload, audioAsVoice: true }
-        : audioPayload;
-      blockReplyPipeline.enqueue(finalPayload);
-    }
-    bufferedAudioBlocks.length = 0;
-  }
-
   if (blockReplyPipeline) {
     await blockReplyPipeline.flush({ force: true });
     blockReplyPipeline.stop();
@@ -753,10 +735,10 @@ export async function runReplyAgent(params: {
       currentMessageId: sessionCtx.MessageSid,
     })
     .map((payload) => {
-      const audioTagResult = extractAudioTag(payload.text);
+      const audioTagResult = parseAudioTag(payload.text);
       return {
         ...payload,
-        text: audioTagResult.cleaned ? audioTagResult.cleaned : undefined,
+        text: audioTagResult.text ? audioTagResult.text : undefined,
         audioAsVoice: audioTagResult.audioAsVoice,
       };
     })
diff --git a/src/auto-reply/reply/audio-tags.test.ts b/src/auto-reply/reply/audio-tags.test.ts
index 4a1b8d16b..48d952c15 100644
--- a/src/auto-reply/reply/audio-tags.test.ts
+++ b/src/auto-reply/reply/audio-tags.test.ts
@@ -1,25 +1,25 @@
 import { describe, expect, it } from "vitest";
 
-import { extractAudioTag } from "./audio-tags.js";
+import { parseAudioTag } from "./audio-tags.js";
 
-describe("extractAudioTag", () => {
+describe("parseAudioTag", () => {
   it("detects audio_as_voice and strips the tag", () => {
-    const result = extractAudioTag("Hello [[audio_as_voice]] world");
+    const result = parseAudioTag("Hello [[audio_as_voice]] world");
     expect(result.audioAsVoice).toBe(true);
-    expect(result.hasTag).toBe(true);
-    expect(result.cleaned).toBe("Hello world");
+    expect(result.hadTag).toBe(true);
+    expect(result.text).toBe("Hello world");
   });
 
   it("returns empty output for missing text", () => {
-    const result = extractAudioTag(undefined);
+    const result = parseAudioTag(undefined);
     expect(result.audioAsVoice).toBe(false);
-    expect(result.hasTag).toBe(false);
-    expect(result.cleaned).toBe("");
+    expect(result.hadTag).toBe(false);
+    expect(result.text).toBe("");
   });
 
   it("removes tag-only messages", () => {
-    const result = extractAudioTag("[[audio_as_voice]]");
+    const result = parseAudioTag("[[audio_as_voice]]");
     expect(result.audioAsVoice).toBe(true);
-    expect(result.cleaned).toBe("");
+    expect(result.text).toBe("");
   });
 });
diff --git a/src/auto-reply/reply/audio-tags.ts b/src/auto-reply/reply/audio-tags.ts
index 4ec9fc15c..8d05068ca 100644
--- a/src/auto-reply/reply/audio-tags.ts
+++ b/src/auto-reply/reply/audio-tags.ts
@@ -1,31 +1 @@
-/**
- * Extract audio mode tag from text.
- * Supports [[audio_as_voice]] to send audio as voice bubble instead of file.
- * Default is file (preserves backward compatibility).
- */
-export function extractAudioTag(text?: string): {
-  cleaned: string;
-  audioAsVoice: boolean;
-  hasTag: boolean;
-} {
-  if (!text) return { cleaned: "", audioAsVoice: false, hasTag: false };
-  let cleaned = text;
-  let audioAsVoice = false; // default: audio file (backward compatible)
-  let hasTag = false;
-
-  // [[audio_as_voice]] -> send as voice bubble (opt-in)
-  const voiceMatch = cleaned.match(/\[\[audio_as_voice\]\]/i);
-  if (voiceMatch) {
-    cleaned = cleaned.replace(/\[\[audio_as_voice\]\]/gi, " ");
-    audioAsVoice = true;
-    hasTag = true;
-  }
-
-  // Clean up whitespace
-  cleaned = cleaned
-    .replace(/[ \t]+/g, " ")
-    .replace(/[ \t]*\n[ \t]*/g, "\n")
-    .trim();
-
-  return { cleaned, audioAsVoice, hasTag };
-}
+export { parseAudioTag } from "../../media/audio-tags.js";
diff --git a/src/auto-reply/reply/block-reply-pipeline.ts b/src/auto-reply/reply/block-reply-pipeline.ts
index a6ff7141d..2ca263be9 100644
--- a/src/auto-reply/reply/block-reply-pipeline.ts
+++ b/src/auto-reply/reply/block-reply-pipeline.ts
@@ -13,6 +13,28 @@ export type BlockReplyPipeline = {
   hasSentPayload: (payload: ReplyPayload) => boolean;
 };
 
+export type BlockReplyBuffer = {
+  shouldBuffer: (payload: ReplyPayload) => boolean;
+  onEnqueue?: (payload: ReplyPayload) => void;
+  finalize?: (payload: ReplyPayload) => ReplyPayload;
+};
+
+export function createAudioAsVoiceBuffer(params: {
+  isAudioPayload: (payload: ReplyPayload) => boolean;
+}): BlockReplyBuffer {
+  let seenAudioAsVoice = false;
+  return {
+    onEnqueue: (payload) => {
+      if (payload.audioAsVoice) {
+        seenAudioAsVoice = true;
+      }
+    },
+    shouldBuffer: (payload) => params.isAudioPayload(payload),
+    finalize: (payload) =>
+      seenAudioAsVoice ? { ...payload, audioAsVoice: true } : payload,
+  };
+}
+
 export function createBlockReplyPayloadKey(payload: ReplyPayload): string {
   const text = payload.text?.trim() ?? "";
   const mediaList = payload.mediaUrls?.length
@@ -51,12 +73,15 @@ export function createBlockReplyPipeline(params: {
   ) => Promise | void;
   timeoutMs: number;
   coalescing?: BlockStreamingCoalescing;
+  buffer?: BlockReplyBuffer;
 }): BlockReplyPipeline {
-  const { onBlockReply, timeoutMs, coalescing } = params;
+  const { onBlockReply, timeoutMs, coalescing, buffer } = params;
   const sentKeys = new Set();
   const pendingKeys = new Set();
   const seenKeys = new Set();
   const bufferedKeys = new Set();
+  const bufferedPayloadKeys = new Set();
+  const bufferedPayloads: ReplyPayload[] = [];
   let sendChain: Promise = Promise.resolve();
   let aborted = false;
   let didStream = false;
@@ -124,8 +149,37 @@ export function createBlockReplyPipeline(params: {
       })
     : null;
 
+  const bufferPayload = (payload: ReplyPayload) => {
+    buffer?.onEnqueue?.(payload);
+    if (!buffer?.shouldBuffer(payload)) return false;
+    const payloadKey = createBlockReplyPayloadKey(payload);
+    if (
+      seenKeys.has(payloadKey) ||
+      sentKeys.has(payloadKey) ||
+      pendingKeys.has(payloadKey) ||
+      bufferedPayloadKeys.has(payloadKey)
+    ) {
+      return true;
+    }
+    seenKeys.add(payloadKey);
+    bufferedPayloadKeys.add(payloadKey);
+    bufferedPayloads.push(payload);
+    return true;
+  };
+
+  const flushBuffered = () => {
+    if (!bufferedPayloads.length) return;
+    for (const payload of bufferedPayloads) {
+      const finalPayload = buffer?.finalize?.(payload) ?? payload;
+      sendPayload(finalPayload, true);
+    }
+    bufferedPayloads.length = 0;
+    bufferedPayloadKeys.clear();
+  };
+
   const enqueue = (payload: ReplyPayload) => {
     if (aborted) return;
+    if (bufferPayload(payload)) return;
     const hasMedia =
       Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
     if (hasMedia) {
@@ -151,6 +205,7 @@ export function createBlockReplyPipeline(params: {
 
   const flush = async (options?: { force?: boolean }) => {
     await coalescer?.flush(options);
+    flushBuffered();
     await sendChain;
   };
 
@@ -162,7 +217,8 @@ export function createBlockReplyPipeline(params: {
     enqueue,
     flush,
     stop,
-    hasBuffered: () => Boolean(coalescer?.hasBuffered()),
+    hasBuffered: () =>
+      Boolean(coalescer?.hasBuffered() || bufferedPayloads.length > 0),
     didStream: () => didStream,
     isAborted: () => aborted,
     hasSentPayload: (payload) => {
diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts
index a2b8d5738..2c07cb4d8 100644
--- a/src/auto-reply/reply/dispatch-from-config.ts
+++ b/src/auto-reply/reply/dispatch-from-config.ts
@@ -7,7 +7,7 @@ import { tryFastAbortFromMessage } from "./abort.js";
 import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
 import { isRoutableChannel, routeReply } from "./route-reply.js";
 
-type DispatchFromConfigResult = {
+export type DispatchFromConfigResult = {
   queuedFinal: boolean;
   counts: Record;
 };
diff --git a/src/auto-reply/reply/provider-dispatcher.ts b/src/auto-reply/reply/provider-dispatcher.ts
new file mode 100644
index 000000000..7826ff117
--- /dev/null
+++ b/src/auto-reply/reply/provider-dispatcher.ts
@@ -0,0 +1,34 @@
+import type { ClawdbotConfig } from "../../config/config.js";
+import type { MsgContext } from "../templating.js";
+import type { GetReplyOptions } from "../types.js";
+import type { DispatchFromConfigResult } from "./dispatch-from-config.js";
+import { dispatchReplyFromConfig } from "./dispatch-from-config.js";
+import {
+  createReplyDispatcherWithTyping,
+  type ReplyDispatcherWithTypingOptions,
+} from "./reply-dispatcher.js";
+
+export async function dispatchReplyWithBufferedBlockDispatcher(params: {
+  ctx: MsgContext;
+  cfg: ClawdbotConfig;
+  dispatcherOptions: ReplyDispatcherWithTypingOptions;
+  replyOptions?: Omit;
+  replyResolver?: typeof import("../reply.js").getReplyFromConfig;
+}): Promise {
+  const { dispatcher, replyOptions, markDispatchIdle } =
+    createReplyDispatcherWithTyping(params.dispatcherOptions);
+
+  const result = await dispatchReplyFromConfig({
+    ctx: params.ctx,
+    cfg: params.cfg,
+    dispatcher,
+    replyResolver: params.replyResolver,
+    replyOptions: {
+      ...params.replyOptions,
+      ...replyOptions,
+    },
+  });
+
+  markDispatchIdle();
+  return result;
+}
diff --git a/src/auto-reply/reply/reply-dispatcher.ts b/src/auto-reply/reply/reply-dispatcher.ts
index 83b33b8d1..02861af3d 100644
--- a/src/auto-reply/reply/reply-dispatcher.ts
+++ b/src/auto-reply/reply/reply-dispatcher.ts
@@ -22,7 +22,7 @@ export type ReplyDispatcherOptions = {
   onError?: ReplyDispatchErrorHandler;
 };
 
-type ReplyDispatcherWithTypingOptions = Omit<
+export type ReplyDispatcherWithTypingOptions = Omit<
   ReplyDispatcherOptions,
   "onIdle"
 > & {
diff --git a/src/media/audio-tags.ts b/src/media/audio-tags.ts
new file mode 100644
index 000000000..4ebb0b6fb
--- /dev/null
+++ b/src/media/audio-tags.ts
@@ -0,0 +1,31 @@
+/**
+ * Extract audio mode tag from text.
+ * Supports [[audio_as_voice]] to send audio as voice bubble instead of file.
+ * Default is file (preserves backward compatibility).
+ */
+export function parseAudioTag(text?: string): {
+  text: string;
+  audioAsVoice: boolean;
+  hadTag: boolean;
+} {
+  if (!text) return { text: "", audioAsVoice: false, hadTag: false };
+  let cleaned = text;
+  let audioAsVoice = false; // default: audio file (backward compatible)
+  let hadTag = false;
+
+  // [[audio_as_voice]] -> send as voice bubble (opt-in)
+  const voiceMatch = cleaned.match(/\[\[audio_as_voice\]\]/i);
+  if (voiceMatch) {
+    cleaned = cleaned.replace(/\[\[audio_as_voice\]\]/gi, " ");
+    audioAsVoice = true;
+    hadTag = true;
+  }
+
+  // Clean up whitespace
+  cleaned = cleaned
+    .replace(/[ \t]+/g, " ")
+    .replace(/[ \t]*\n[ \t]*/g, "\n")
+    .trim();
+
+  return { text: cleaned, audioAsVoice, hadTag };
+}
diff --git a/src/media/fetch.ts b/src/media/fetch.ts
new file mode 100644
index 000000000..55d15a63d
--- /dev/null
+++ b/src/media/fetch.ts
@@ -0,0 +1,125 @@
+import path from "node:path";
+
+import { detectMime, extensionForMime } from "./mime.js";
+
+type FetchMediaResult = {
+  buffer: Buffer;
+  contentType?: string;
+  fileName?: string;
+};
+
+type FetchMediaOptions = {
+  url: string;
+  fetchImpl?: typeof fetch;
+  filePathHint?: string;
+};
+
+function stripQuotes(value: string): string {
+  return value.replace(/^["']|["']$/g, "");
+}
+
+function parseContentDispositionFileName(
+  header?: string | null,
+): string | undefined {
+  if (!header) return undefined;
+  const starMatch = /filename\*\s*=\s*([^;]+)/i.exec(header);
+  if (starMatch?.[1]) {
+    const cleaned = stripQuotes(starMatch[1].trim());
+    const encoded = cleaned.split("''").slice(1).join("''") || cleaned;
+    try {
+      return path.basename(decodeURIComponent(encoded));
+    } catch {
+      return path.basename(encoded);
+    }
+  }
+  const match = /filename\s*=\s*([^;]+)/i.exec(header);
+  if (match?.[1]) return path.basename(stripQuotes(match[1].trim()));
+  return undefined;
+}
+
+async function readErrorBodySnippet(
+  res: Response,
+  maxChars = 200,
+): Promise {
+  try {
+    const text = await res.text();
+    if (!text) return undefined;
+    const collapsed = text.replace(/\s+/g, " ").trim();
+    if (!collapsed) return undefined;
+    if (collapsed.length <= maxChars) return collapsed;
+    return `${collapsed.slice(0, maxChars)}…`;
+  } catch {
+    return undefined;
+  }
+}
+
+export async function fetchRemoteMedia(
+  options: FetchMediaOptions,
+): Promise {
+  const { url, fetchImpl, filePathHint } = options;
+  const fetcher = fetchImpl ?? globalThis.fetch;
+  if (!fetcher) {
+    throw new Error("fetch is not available");
+  }
+
+  let res: Response;
+  try {
+    res = await fetcher(url);
+  } catch (err) {
+    throw new Error(`Failed to fetch media from ${url}: ${String(err)}`);
+  }
+
+  if (!res.ok) {
+    const statusText = res.statusText ? ` ${res.statusText}` : "";
+    const redirected =
+      res.url && res.url !== url ? ` (redirected to ${res.url})` : "";
+    let detail = `HTTP ${res.status}${statusText}`;
+    if (!res.body) {
+      detail = `HTTP ${res.status}${statusText}; empty response body`;
+    } else {
+      const snippet = await readErrorBodySnippet(res);
+      if (snippet) detail += `; body: ${snippet}`;
+    }
+    throw new Error(
+      `Failed to fetch media from ${url}${redirected}: ${detail}`,
+    );
+  }
+
+  const buffer = Buffer.from(await res.arrayBuffer());
+  let fileNameFromUrl: string | undefined;
+  try {
+    const parsed = new URL(url);
+    const base = path.basename(parsed.pathname);
+    fileNameFromUrl = base || undefined;
+  } catch {
+    // ignore parse errors; leave undefined
+  }
+
+  const headerFileName = parseContentDispositionFileName(
+    res.headers.get("content-disposition"),
+  );
+  let fileName =
+    headerFileName ||
+    fileNameFromUrl ||
+    (filePathHint ? path.basename(filePathHint) : undefined);
+
+  const filePathForMime =
+    headerFileName && path.extname(headerFileName)
+      ? headerFileName
+      : (filePathHint ?? url);
+  const contentType = await detectMime({
+    buffer,
+    headerMime: res.headers.get("content-type"),
+    filePath: filePathForMime,
+  });
+  if (fileName && !path.extname(fileName) && contentType) {
+    const ext = extensionForMime(contentType);
+    if (ext) fileName = `${fileName}${ext}`;
+  }
+
+  return {
+    buffer,
+    contentType: contentType ?? undefined,
+    fileName,
+  };
+}
diff --git a/src/media/mime.ts b/src/media/mime.ts
index a53abdb23..308b2ede1 100644
--- a/src/media/mime.ts
+++ b/src/media/mime.ts
@@ -36,6 +36,17 @@ const MIME_BY_EXT: Record = Object.fromEntries(
   Object.entries(EXT_BY_MIME).map(([mime, ext]) => [ext, mime]),
 );
 
+const AUDIO_FILE_EXTENSIONS = new Set([
+  ".aac",
+  ".flac",
+  ".m4a",
+  ".mp3",
+  ".oga",
+  ".ogg",
+  ".opus",
+  ".wav",
+]);
+
 function normalizeHeaderMime(mime?: string | null): string | undefined {
   if (!mime) return undefined;
   const cleaned = mime.split(";")[0]?.trim().toLowerCase();
@@ -52,7 +63,7 @@ async function sniffMime(buffer?: Buffer): Promise {
   }
 }
 
-function extFromPath(filePath?: string): string | undefined {
+export function getFileExtension(filePath?: string | null): string | undefined {
   if (!filePath) return undefined;
   try {
     if (/^https?:\/\//i.test(filePath)) {
@@ -66,6 +77,12 @@
   return ext || undefined;
 }
 
+export function isAudioFileName(fileName?: string | null): boolean {
+  const ext = getFileExtension(fileName);
+  if (!ext) return false;
+  return AUDIO_FILE_EXTENSIONS.has(ext);
+}
+
 export function detectMime(opts: {
   buffer?: Buffer;
   headerMime?: string | null;
@@ -85,7 +102,7 @@ async function detectMimeImpl(opts: {
   headerMime?: string | null;
   filePath?: string;
 }): Promise {
-  const ext = extFromPath(opts.filePath);
+  const ext = getFileExtension(opts.filePath);
   const extMime = ext ? MIME_BY_EXT[ext] : undefined;
   const headerMime = normalizeHeaderMime(opts.headerMime);
 
@@ -112,9 +129,7 @@ export function isGifMedia(opts: {
   fileName?: string | null;
 }): boolean {
   if (opts.contentType?.toLowerCase() === "image/gif") return true;
-  const ext = opts.fileName
-    ? path.extname(opts.fileName).toLowerCase()
-    : undefined;
+  const ext = getFileExtension(opts.fileName);
   return ext === ".gif";
 }
 
diff --git a/src/media/parse.ts b/src/media/parse.ts
index 38f751992..36ca2bef7 100644
--- a/src/media/parse.ts
+++ b/src/media/parse.ts
@@ -1,6 +1,7 @@
 // Shared helpers for parsing MEDIA tokens from command/stdout text.
 
 import { parseFenceSpans } from "../markdown/fences.js";
+import { parseAudioTag } from "./audio-tags.js";
 
 // Allow optional wrapping backticks and punctuation after the token; capture the core token.
 export const MEDIA_TOKEN_RE = /\bMEDIA:\s*`?([^\n]+)`?/gi;
@@ -32,10 +33,6 @@ function isInsideFence(
   return fenceSpans.some((span) => offset >= span.start && offset < span.end);
 }
 
-// Regex to detect [[audio_as_voice]] tag
-const AUDIO_AS_VOICE_RE = /\[\[audio_as_voice\]\]/gi;
-const AUDIO_AS_VOICE_TEST_RE = /\[\[audio_as_voice\]\]/i;
-
 export function splitMediaFromOutput(raw: string): {
   text: string;
   mediaUrls?: string[];
@@ -124,13 +121,10 @@ export function splitMediaFromOutput(raw: string): {
     .trim();
 
   // Detect and strip [[audio_as_voice]] tag
-  const hasAudioAsVoice = AUDIO_AS_VOICE_TEST_RE.test(cleanedText);
-  if (hasAudioAsVoice) {
-    cleanedText = cleanedText
-      .replace(AUDIO_AS_VOICE_RE, "")
-      .replace(/[ \t]+/g, " ")
-      .replace(/\n{2,}/g, "\n")
-      .trim();
+  const audioTagResult = parseAudioTag(cleanedText);
+  const hasAudioAsVoice = audioTagResult.audioAsVoice;
+  if (audioTagResult.hadTag) {
+    cleanedText = audioTagResult.text.replace(/\n{2,}/g, "\n").trim();
   }
 
   if (media.length === 0) {
diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts
index 1a3f84b8d..a0b6170bb 100644
--- a/src/telegram/bot.ts
+++ b/src/telegram/bot.ts
@@ -1,6 +1,4 @@
 // @ts-nocheck
-import { Buffer } from "node:buffer";
-
 import { sequentialize } from "@grammyjs/runner";
 import { apiThrottler } from "@grammyjs/transformer-throttler";
 import type { ApiClientOptions, Message } from "grammy";
@@ -22,12 +20,11 @@ import {
 } from "../auto-reply/commands-registry.js";
 import { formatAgentEnvelope } from "../auto-reply/envelope.js";
 import { resolveBlockStreamingChunking } from "../auto-reply/reply/block-streaming.js";
-import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
 import {
   buildMentionRegexes,
   matchesMentionPatterns,
 } from "../auto-reply/reply/mentions.js";
-import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js";
+import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js";
 import { getReplyFromConfig } from "../auto-reply/reply.js";
 import type { ReplyPayload } from "../auto-reply/types.js";
 import type { ClawdbotConfig, ReplyToMode } from "../config/config.js";
@@ -46,7 +43,8 @@ import { formatErrorMessage } from "../infra/errors.js";
 import { recordProviderActivity } from "../infra/provider-activity.js";
 import { getChildLogger } from "../logging.js";
 import { mediaKindFromMime } from "../media/constants.js";
-import { detectMime, isGifMedia } from "../media/mime.js";
+import { fetchRemoteMedia } from "../media/fetch.js";
+import { isGifMedia } from "../media/mime.js";
 import { saveMediaBuffer } from "../media/store.js";
 import {
   formatLocationText,
@@ -64,7 +62,7 @@ import {
   readTelegramAllowFromStore,
   upsertTelegramPairingRequest,
 } from "./pairing-store.js";
-import { resolveTelegramVoiceDecision } from "./voice.js";
+import { resolveTelegramVoiceSend } from "./voice.js";
 
 const PARSE_ERR_RE =
   /can't parse entities|parse entities|find end of the entity/i;
@@ -805,8 +803,16 @@ export function createTelegramBot(opts: TelegramBotOptions) {
         await draftStream.flush();
       };
 
-      const { dispatcher, replyOptions, markDispatchIdle } =
-        createReplyDispatcherWithTyping({
+      const disableBlockStreaming =
+        Boolean(draftStream) ||
+        (typeof telegramCfg.blockStreaming === "boolean"
+          ? !telegramCfg.blockStreaming
+          : undefined);
+
+      const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
+        ctx: ctxPayload,
+        cfg,
+        dispatcherOptions: {
           responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId)
             .responsePrefix,
           deliver: async (payload, info) => {
@@ -831,20 +837,8 @@ export function createTelegramBot(opts: TelegramBotOptions) {
            );
          },
          onReplyStart: sendTyping,
-        });
-
-      const disableBlockStreaming =
-        Boolean(draftStream) ||
-        (typeof telegramCfg.blockStreaming === "boolean"
-          ? !telegramCfg.blockStreaming
-          : undefined);
-
-      const { queuedFinal } = await dispatchReplyFromConfig({
-        ctx: ctxPayload,
-        cfg,
-        dispatcher,
+        },
         replyOptions: {
-          ...replyOptions,
           skillFilter,
           onPartialReply: draftStream
            ? (payload) => updateDraftFromPartial(payload.text)
            : undefined,
@@ -857,7 +851,6 @@ export function createTelegramBot(opts: TelegramBotOptions) {
           disableBlockStreaming,
         },
       });
-      markDispatchIdle();
       draftStream?.stop();
       if (!queuedFinal) return;
       if (
@@ -1409,16 +1402,12 @@ async function deliverReplies(params: {
             ...mediaParams,
           });
         } else if (kind === "audio") {
-          const { useVoice, reason } = resolveTelegramVoiceDecision({
+          const { useVoice } = resolveTelegramVoiceSend({
             wantsVoice: reply.audioAsVoice === true, // default false (backward compatible)
             contentType: media.contentType,
             fileName,
+            logFallback: logVerbose,
           });
-          if (reason) {
-            logVerbose(
-              `Telegram voice requested but ${reason}; sending as audio file instead.`,
-            );
-          }
           if (useVoice) {
             // Voice message - displays as round playable bubble (opt-in via [[audio_as_voice]])
             await bot.api.sendVoice(chatId, file, {
@@ -1571,19 +1560,17 @@ async function resolveMedia(
     throw new Error("fetch is not available; set telegram.proxy in config");
   }
   const url = `https://api.telegram.org/file/bot${token}/${file.file_path}`;
-  const res = await fetchImpl(url);
-  if (!res.ok) {
-    throw new Error(
-      `Failed to download telegram file: HTTP ${res.status} ${res.statusText}`,
-    );
-  }
-  const data = Buffer.from(await res.arrayBuffer());
-  const mime = await detectMime({
-    buffer: data,
-    headerMime: res.headers.get("content-type"),
-    filePath: file.file_path,
+  const fetched = await fetchRemoteMedia({
+    url,
+    fetchImpl,
+    filePathHint: file.file_path,
   });
-  const saved = await saveMediaBuffer(data, mime, "inbound", maxBytes);
+  const saved = await saveMediaBuffer(
+    fetched.buffer,
+    fetched.contentType,
+    "inbound",
+    maxBytes,
+  );
   let placeholder = "";
   if (msg.photo) placeholder = "";
   else if (msg.video) placeholder = "";
diff --git a/src/telegram/send.ts b/src/telegram/send.ts
index 7aed6ee31..ae7af484a 100644
--- a/src/telegram/send.ts
+++ b/src/telegram/send.ts
@@ -21,7 +21,7 @@ import {
   parseTelegramTarget,
   stripTelegramInternalPrefixes,
 } from "./targets.js";
-import { resolveTelegramVoiceDecision } from "./voice.js";
+import { resolveTelegramVoiceSend } from "./voice.js";
 
 type TelegramSendOpts = {
   token?: string;
@@ -239,16 +239,12 @@ export async function sendMessageTelegram(
         throw wrapChatNotFound(err);
       });
   } else if (kind === "audio") {
-    const { useVoice, reason } = resolveTelegramVoiceDecision({
+    const { useVoice } = resolveTelegramVoiceSend({
       wantsVoice: opts.asVoice === true, // default false (backward compatible)
       contentType: media.contentType,
       fileName,
+      logFallback: logVerbose,
     });
-    if (reason) {
-      logVerbose(
-        `Telegram voice requested but ${reason}; sending as audio file instead.`,
-      );
-    }
     if (useVoice) {
       result = await request(
         () => api.sendVoice(chatId, file, mediaParams),
diff --git a/src/telegram/voice.test.ts b/src/telegram/voice.test.ts
new file mode 100644
index 000000000..e1e74caeb
--- /dev/null
+++ b/src/telegram/voice.test.ts
@@ -0,0 +1,43 @@
+import { describe, expect, it, vi } from "vitest";
+
+import { resolveTelegramVoiceSend } from "./voice.js";
+
+describe("resolveTelegramVoiceSend", () => {
+  it("skips voice when wantsVoice is false", () => {
+    const logFallback = vi.fn();
+    const result = resolveTelegramVoiceSend({
+      wantsVoice: false,
+      contentType: "audio/ogg",
+      fileName: "voice.ogg",
+      logFallback,
+    });
+    expect(result.useVoice).toBe(false);
+    expect(logFallback).not.toHaveBeenCalled();
+  });
+
+  it("logs fallback for incompatible media", () => {
+    const logFallback = vi.fn();
+    const result = resolveTelegramVoiceSend({
+      wantsVoice: true,
+      contentType: "audio/mpeg",
+      fileName: "track.mp3",
+      logFallback,
+    });
+    expect(result.useVoice).toBe(false);
+    expect(logFallback).toHaveBeenCalledWith(
+      "Telegram voice requested but media is audio/mpeg (track.mp3); sending as audio file instead.",
+    );
+  });
+
+  it("keeps voice when compatible", () => {
+    const logFallback = vi.fn();
+    const result = resolveTelegramVoiceSend({
+      wantsVoice: true,
+      contentType: "audio/ogg",
+      fileName: "voice.ogg",
+      logFallback,
+    });
+    expect(result.useVoice).toBe(true);
+    expect(logFallback).not.toHaveBeenCalled();
+  });
+});
diff --git a/src/telegram/voice.ts b/src/telegram/voice.ts
index d1527a4ce..c78ca4fee 100644
--- a/src/telegram/voice.ts
+++ b/src/telegram/voice.ts
@@ -1,4 +1,4 @@
-import path from "node:path";
+import { getFileExtension } from "../media/mime.js";
 
 export function isTelegramVoiceCompatible(opts: {
   contentType?: string | null;
@@ -10,7 +10,8 @@ export function isTelegramVoiceCompatible(opts: {
   }
   const fileName = opts.fileName?.trim();
   if (!fileName) return false;
-  const ext = path.extname(fileName).toLowerCase();
+  const ext = getFileExtension(fileName);
+  if (!ext) return false;
   return ext === ".ogg" || ext === ".opus" || ext === ".oga";
 }
 
@@ -28,3 +29,18 @@ export function resolveTelegramVoiceDecision(opts: {
     reason: `media is ${contentType} (${fileName})`,
   };
 }
+
+export function resolveTelegramVoiceSend(opts: {
+  wantsVoice: boolean;
+  contentType?: string | null;
+  fileName?: string | null;
+  logFallback?: (message: string) => void;
+}): { useVoice: boolean } {
+  const decision = resolveTelegramVoiceDecision(opts);
+  if (decision.reason && opts.logFallback) {
+    opts.logFallback(
+      `Telegram voice requested but ${decision.reason}; sending as audio file instead.`,
+    );
+  }
+  return { useVoice: decision.useVoice };
+}
diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts
index 28b1805aa..3873a067a 100644
--- a/src/web/auto-reply.ts
+++ b/src/web/auto-reply.ts
@@ -17,12 +17,11 @@ import {
   resolveHeartbeatPrompt,
   stripHeartbeatToken,
 } from "../auto-reply/heartbeat.js";
-import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
 import {
   buildMentionRegexes,
   normalizeMentionText,
 } from "../auto-reply/reply/mentions.js";
-import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js";
+import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js";
 import { getReplyFromConfig } from "../auto-reply/reply.js";
 import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
 import type { ReplyPayload } from "../auto-reply/types.js";
@@ -1219,8 +1218,39 @@ export async function monitorWebProvider(
             cfg,
             route.agentId,
           ).responsePrefix;
-          const { dispatcher, replyOptions, markDispatchIdle } =
-            createReplyDispatcherWithTyping({
+          const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
+            ctx: {
+              Body: combinedBody,
+              From: msg.from,
+              To: msg.to,
+              SessionKey: route.sessionKey,
+              AccountId: route.accountId,
+              MessageSid: msg.id,
+              ReplyToId: msg.replyToId,
+              ReplyToBody: msg.replyToBody,
+              ReplyToSender: msg.replyToSender,
+              MediaPath: msg.mediaPath,
+              MediaUrl: msg.mediaUrl,
+              MediaType: msg.mediaType,
+              ChatType: msg.chatType,
+              GroupSubject: msg.groupSubject,
+              GroupMembers: formatGroupMembers(
+                msg.groupParticipants,
+                groupMemberNames.get(groupHistoryKey),
+                msg.senderE164,
+              ),
+              SenderName: msg.senderName,
+              SenderE164: msg.senderE164,
+              WasMentioned: msg.wasMentioned,
+              ...(msg.location ? toLocationContext(msg.location) : {}),
+              Provider: "whatsapp",
+              Surface: "whatsapp",
+              OriginatingChannel: "whatsapp",
+              OriginatingTo: msg.from,
+            },
+            cfg,
+            replyResolver,
+            dispatcherOptions: {
              responsePrefix,
              onHeartbeatStrip: () => {
                if (!didLogHeartbeatStrip) {
@@ -1283,50 +1313,14 @@ export async function monitorWebProvider(
                );
              },
              onReplyStart: msg.sendComposing,
-            });
-
-          const { queuedFinal } = await dispatchReplyFromConfig({
-            ctx: {
-              Body: combinedBody,
-              From: msg.from,
-              To: msg.to,
-              SessionKey: route.sessionKey,
-              AccountId: route.accountId,
-              MessageSid: msg.id,
-              ReplyToId: msg.replyToId,
-              ReplyToBody: msg.replyToBody,
-              ReplyToSender: msg.replyToSender,
-              MediaPath: msg.mediaPath,
-              MediaUrl: msg.mediaUrl,
-              MediaType: msg.mediaType,
-              ChatType: msg.chatType,
-              GroupSubject: msg.groupSubject,
-              GroupMembers: formatGroupMembers(
-                msg.groupParticipants,
-                groupMemberNames.get(groupHistoryKey),
-                msg.senderE164,
-              ),
-              SenderName: msg.senderName,
-              SenderE164: msg.senderE164,
-              WasMentioned: msg.wasMentioned,
-              ...(msg.location ? toLocationContext(msg.location) : {}),
-              Provider: "whatsapp",
-              Surface: "whatsapp",
-              OriginatingChannel: "whatsapp",
-              OriginatingTo: msg.from,
             },
-            cfg,
-            dispatcher,
-            replyResolver,
             replyOptions: {
-              ...replyOptions,
               disableBlockStreaming:
                 typeof cfg.whatsapp?.blockStreaming === "boolean"
                  ? !cfg.whatsapp.blockStreaming
                  : undefined,
             },
           });
-          markDispatchIdle();
           if (!queuedFinal) {
             if (shouldClearGroupHistory && didSendReply) {
               groupHistories.set(groupHistoryKey, []);
diff --git a/src/web/media.ts b/src/web/media.ts
index c0f9de41a..59bc52e0a 100644
--- a/src/web/media.ts
+++ b/src/web/media.ts
@@ -7,6 +7,7 @@ import {
   maxBytesForKind,
   mediaKindFromMime,
 } from "../media/constants.js";
+import { fetchRemoteMedia } from "../media/fetch.js";
 import { resizeToJpeg } from "../media/image-ops.js";
 import { detectMime, extensionForMime } from "../media/mime.js";
 
@@ -22,45 +23,6 @@ type WebMediaOptions = {
   optimizeImages?: boolean;
 };
 
-function stripQuotes(value: string): string {
-  return value.replace(/^["']|["']$/g, "");
-}
-
-function parseContentDispositionFileName(
-  header?: string | null,
-): string | undefined {
-  if (!header) return undefined;
-  const starMatch = /filename\*\s*=\s*([^;]+)/i.exec(header);
-  if (starMatch?.[1]) {
-    const cleaned = stripQuotes(starMatch[1].trim());
-    const encoded = cleaned.split("''").slice(1).join("''") || cleaned;
-    try {
-      return path.basename(decodeURIComponent(encoded));
-    } catch {
-      return path.basename(encoded);
-    }
-  }
-  const match = /filename\s*=\s*([^;]+)/i.exec(header);
-  if (match?.[1]) return path.basename(stripQuotes(match[1].trim()));
-  return undefined;
-}
-
-async function readErrorBodySnippet(
-  res: Response,
-  maxChars = 200,
-): Promise {
-  try {
-    const text = await res.text();
-    if (!text) return undefined;
-    const collapsed = text.replace(/\s+/g, " ").trim();
-    if (!collapsed) return undefined;
-    if (collapsed.length <= maxChars) return collapsed;
-    return `${collapsed.slice(0, maxChars)}…`;
-  } catch {
-    return undefined;
-  }
-}
-
 async function loadWebMediaInternal(
   mediaUrl: string,
   options: WebMediaOptions = {},
@@ -93,53 +55,8 @@ async function loadWebMediaInternal(
   };
 
   if (/^https?:\/\//i.test(mediaUrl)) {
-    let fileNameFromUrl: string | undefined;
-    try {
-      const url = new URL(mediaUrl);
-      const base = path.basename(url.pathname);
-      fileNameFromUrl = base || undefined;
-    } catch {
-      // ignore parse errors; leave undefined
-    }
-    let res: Response;
-    try {
-      res = await fetch(mediaUrl);
-    } catch (err) {
-      throw new Error(`Failed to fetch media from ${mediaUrl}: ${String(err)}`);
-    }
-    if (!res.ok || !res.body) {
-      const statusText = res.statusText ? ` ${res.statusText}` : "";
-      const redirected =
-        res.url && res.url !== mediaUrl ? ` (redirected to ${res.url})` : "";
-      let detail = `HTTP ${res.status}${statusText}`;
-      if (!res.body) {
-        detail = `HTTP ${res.status}${statusText}; empty response body`;
-      } else if (!res.ok) {
-        const snippet = await readErrorBodySnippet(res);
-        if (snippet) detail += `; body: ${snippet}`;
-      }
-      throw new Error(
-        `Failed to fetch media from ${mediaUrl}${redirected}: ${detail}`,
-      );
-    }
-    const array = Buffer.from(await res.arrayBuffer());
-    const headerFileName = parseContentDispositionFileName(
-      res.headers.get("content-disposition"),
-    );
-    let fileName = headerFileName || fileNameFromUrl || undefined;
-    const filePathForMime =
-      headerFileName && path.extname(headerFileName)
-        ? headerFileName
-        : mediaUrl;
-    const contentType = await detectMime({
-      buffer: array,
-      headerMime: res.headers.get("content-type"),
-      filePath: filePathForMime,
-    });
-    if (fileName && !path.extname(fileName) && contentType) {
-      const ext = extensionForMime(contentType);
-      if (ext) fileName = `${fileName}${ext}`;
-    }
+    const fetched = await fetchRemoteMedia({ url: mediaUrl });
+    const { buffer, contentType, fileName } = fetched;
     const kind = mediaKindFromMime(contentType);
     const cap = Math.min(
       maxBytes ?? maxBytesForKind(kind),
@@ -148,28 +65,28 @@ async function loadWebMediaInternal(
     if (kind === "image") {
       // Skip optimization for GIFs to preserve animation.
       if (contentType === "image/gif" || !optimizeImages) {
-        if (array.length > cap) {
+        if (buffer.length > cap) {
          throw new Error(
            `${
              contentType === "image/gif" ? "GIF" : "Media"
            } exceeds ${(cap / (1024 * 1024)).toFixed(0)}MB limit (got ${(
-              array.length / (1024 * 1024)
+              buffer.length / (1024 * 1024)
            ).toFixed(2)}MB)`,
          );
        }
-        return { buffer: array, contentType, kind, fileName };
+        return { buffer, contentType, kind, fileName };
      }
-      return { ...(await optimizeAndClampImage(array, cap)), fileName };
+      return { ...(await optimizeAndClampImage(buffer, cap)), fileName };
    }
-    if (array.length > cap) {
+    if (buffer.length > cap) {
      throw new Error(
        `Media exceeds ${(cap / (1024 * 1024)).toFixed(0)}MB limit (got ${(
-          array.length / (1024 * 1024)
+          buffer.length / (1024 * 1024)
        ).toFixed(2)}MB)`,
      );
    }
    return {
-      buffer: array,
+      buffer,
      contentType: contentType ?? undefined,
      kind,
      fileName,