diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5fdb94ec9..17d774c8f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -42,6 +42,7 @@
 - Commands: harden slash command registry and list text-only commands in `/commands`.
 - Models/Auth: show per-agent auth candidates in `/model status`, and add `clawdbot models auth order {get,set,clear}` (per-agent auth rotation overrides). — thanks @steipete
 - Telegram: keep streamMode draft-only; avoid forcing block streaming. (#619) — thanks @rubyrunsstuff
+- Telegram: add `[[audio_as_voice]]` tag support for voice notes with streaming-safe delivery. (#490) — thanks @jarvis-medmatic
 - Debugging: add raw model stream logging flags and document gateway watch mode.
 - Gateway: decode dns-sd escaped UTF-8 in discovery output and show scan progress immediately. — thanks @steipete
 - Agent: add claude-cli/opus-4.5 runner via Claude CLI with resume support (tools disabled).
diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts
index 4812444fc..f2fc0ae35 100644
--- a/src/agents/pi-embedded-runner.ts
+++ b/src/agents/pi-embedded-runner.ts
@@ -1045,6 +1045,7 @@ export async function runEmbeddedPiAgent(params: {
   onBlockReply?: (payload: {
     text?: string;
     mediaUrls?: string[];
+    audioAsVoice?: boolean;
   }) => void | Promise<void>;
   blockReplyBreak?: "text_end" | "message_end";
   blockReplyChunking?: BlockReplyChunking;
@@ -1641,6 +1642,7 @@ export async function runEmbeddedPiAgent(params: {
     text: string;
     media?: string[];
     isError?: boolean;
+    audioAsVoice?: boolean;
   }> = [];

   const errorText = lastAssistant
@@ -1657,10 +1659,17 @@ export async function runEmbeddedPiAgent(params: {
     if (inlineToolResults) {
       for (const { toolName, meta } of toolMetas) {
         const agg = formatToolAggregate(toolName, meta ? [meta] : []);
-        const { text: cleanedText, mediaUrls } =
-          splitMediaFromOutput(agg);
+        const {
+          text: cleanedText,
+          mediaUrls,
+          audioAsVoice,
+        } = splitMediaFromOutput(agg);
         if (cleanedText)
-          replyItems.push({ text: cleanedText, media: mediaUrls });
+          replyItems.push({
+            text: cleanedText,
+            media: mediaUrls,
+            audioAsVoice,
+          });
       }
     }

@@ -1679,18 +1688,37 @@ export async function runEmbeddedPiAgent(params: {
         ? [fallbackAnswerText]
         : [];
     for (const text of answerTexts) {
-      const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
-      if (!cleanedText && (!mediaUrls || mediaUrls.length === 0))
+      const {
+        text: cleanedText,
+        mediaUrls,
+        audioAsVoice,
+      } = splitMediaFromOutput(text);
+      if (
+        !cleanedText &&
+        (!mediaUrls || mediaUrls.length === 0) &&
+        !audioAsVoice
+      )
         continue;
-      replyItems.push({ text: cleanedText, media: mediaUrls });
+      replyItems.push({
+        text: cleanedText,
+        media: mediaUrls,
+        audioAsVoice,
+      });
     }

+    // Check if any replyItem has audioAsVoice tag - if so, apply to all media payloads
+    const hasAudioAsVoiceTag = replyItems.some(
+      (item) => item.audioAsVoice,
+    );
     const payloads = replyItems
       .map((item) => ({
         text: item.text?.trim() ? item.text.trim() : undefined,
         mediaUrls: item.media?.length ? item.media : undefined,
         mediaUrl: item.media?.[0],
         isError: item.isError,
+        // Apply audioAsVoice to media payloads if tag was found anywhere in response
+        audioAsVoice:
+          item.audioAsVoice ||
+          (hasAudioAsVoiceTag && (item.media?.length ?? 0) > 0),
       }))
       .filter(
         (p) =>
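The final-aggregation hunk above is the subtle part of the runner change: a single `[[audio_as_voice]]` hit anywhere in the response marks every media-carrying payload, not just the item the tag appeared in. A minimal sketch of that rule in isolation, with `ReplyItem` as a hypothetical stand-in for the runner's internal item shape:

```ts
// Sketch of the "tag anywhere applies to all media payloads" rule.
// ReplyItem is illustrative only, not the runner's real type.
type ReplyItem = { text?: string; media?: string[]; audioAsVoice?: boolean };

function toPayloads(replyItems: ReplyItem[]) {
  // One [[audio_as_voice]] hit anywhere marks the whole response.
  const hasTag = replyItems.some((item) => item.audioAsVoice);
  return replyItems.map((item) => ({
    text: item.text?.trim() || undefined,
    mediaUrls: item.media?.length ? item.media : undefined,
    // Media-bearing payloads inherit the flag; text-only ones do not.
    audioAsVoice:
      item.audioAsVoice || (hasTag && (item.media?.length ?? 0) > 0),
  }));
}

// toPayloads([{ text: "hi", audioAsVoice: true }, { media: ["/tmp/a.ogg"] }])
// -> the second payload comes back with audioAsVoice: true
```

Text-only payloads stay untouched, so a stray tag cannot force voice delivery onto a plain message.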
diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts
index 4c407a483..001d579f1 100644
--- a/src/agents/pi-embedded-subscribe.ts
+++ b/src/agents/pi-embedded-subscribe.ts
@@ -262,6 +262,7 @@ export function subscribeEmbeddedPiSession(params: {
   onBlockReply?: (payload: {
     text?: string;
     mediaUrls?: string[];
+    audioAsVoice?: boolean;
   }) => void | Promise<void>;
   blockReplyBreak?: "text_end" | "message_end";
   blockReplyChunking?: BlockReplyChunking;
@@ -436,11 +437,15 @@ export function subscribeEmbeddedPiSession(params: {
     lastBlockReplyText = chunk;
     assistantTexts.push(chunk);
     if (!params.onBlockReply) return;
-    const { text: cleanedText, mediaUrls } = splitMediaFromOutput(chunk);
-    if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) return;
+    const { text: cleanedText, mediaUrls, audioAsVoice } =
+      splitMediaFromOutput(chunk);
+    // Skip empty payloads, but always emit if audioAsVoice is set (to propagate the flag)
+    if (!cleanedText && (!mediaUrls || mediaUrls.length === 0) && !audioAsVoice)
+      return;
     void params.onBlockReply({
       text: cleanedText,
       mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
+      audioAsVoice,
     });
   };
@@ -859,12 +864,21 @@ export function subscribeEmbeddedPiSession(params: {
         );
       } else {
         lastBlockReplyText = text;
-        const { text: cleanedText, mediaUrls } =
-          splitMediaFromOutput(text);
-        if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
+        const {
+          text: cleanedText,
+          mediaUrls,
+          audioAsVoice,
+        } = splitMediaFromOutput(text);
+        // Emit if there's content OR audioAsVoice flag (to propagate the flag)
+        if (
+          cleanedText ||
+          (mediaUrls && mediaUrls.length > 0) ||
+          audioAsVoice
+        ) {
           void onBlockReply({
             text: cleanedText,
             mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
+            audioAsVoice,
           });
         }
       }
diff --git a/src/auto-reply/reply/abort.ts b/src/auto-reply/reply/abort.ts
index 07c00babb..f06948626 100644
--- a/src/auto-reply/reply/abort.ts
+++ b/src/auto-reply/reply/abort.ts
@@ -3,7 +3,7 @@ import type { ClawdbotConfig } from "../../config/config.js";
 import {
   loadSessionStore,
   resolveStorePath,
+  type SessionEntry,
   saveSessionStore,
-  type SessionEntry,
 } from "../../config/sessions.js";
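Both subscribe paths above intentionally emit chunks that may carry nothing but the flag, so a downstream `onBlockReply` consumer cannot use empty `text`/`mediaUrls` as a drop signal. A minimal sketch of a tolerant handler, restating the payload shape for illustration:

```ts
// Payload shape restated for illustration; matches the callback type above.
type BlockReplyPayload = {
  text?: string;
  mediaUrls?: string[];
  audioAsVoice?: boolean;
};

let seenAudioAsVoice = false;

const onBlockReply = async (payload: BlockReplyPayload): Promise<void> => {
  // Record the flag even when the chunk has no renderable content.
  if (payload.audioAsVoice) seenAudioAsVoice = true;
  // Only after tracking the flag is it safe to drop flag-only chunks.
  if (!payload.text && !payload.mediaUrls?.length) return;
  console.log("deliver", payload, { seenAudioAsVoice });
};
```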
diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts
index 56a04b5b2..c4878690d 100644
--- a/src/auto-reply/reply/agent-runner.ts
+++ b/src/auto-reply/reply/agent-runner.ts
@@ -260,6 +260,14 @@ export async function runReplyAgent(params: {
   const pendingToolTasks = new Set<Promise<void>>();
   const blockReplyTimeoutMs =
     opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;
+
+  // Buffer audio blocks to apply [[audio_as_voice]] tag that may come later
+  const bufferedAudioBlocks: ReplyPayload[] = [];
+  let seenAudioAsVoice = false;
+
+  const AUDIO_EXTENSIONS = /\.(opus|mp3|m4a|wav|ogg|aac|flac)$/i;
+  const hasAudioMedia = (urls?: string[]): boolean =>
+    Boolean(urls?.some((u) => AUDIO_EXTENSIONS.test(u)));
   const replyToChannel =
     sessionCtx.OriginatingChannel ??
     ((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as
@@ -532,23 +540,37 @@ export async function runReplyAgent(params: {
                 },
                 sessionCtx.MessageSid,
               );
-              if (!isRenderablePayload(taggedPayload)) return;
+              // Let through payloads with audioAsVoice flag even if empty (need to track it)
+              if (
+                !isRenderablePayload(taggedPayload) &&
+                !payload.audioAsVoice
+              )
+                return;
               const audioTagResult = extractAudioTag(taggedPayload.text);
               const cleaned = audioTagResult.cleaned || undefined;
               const hasMedia =
                 Boolean(taggedPayload.mediaUrl) ||
                 (taggedPayload.mediaUrls?.length ?? 0) > 0;
-              if (!cleaned && !hasMedia) return;
+              // Skip empty payloads unless they have audioAsVoice flag (need to track it)
+              if (!cleaned && !hasMedia && !payload.audioAsVoice) return;
               if (
                 isSilentReplyText(cleaned, SILENT_REPLY_TOKEN) &&
                 !hasMedia
               )
                 return;
+
+              // Track if we've seen [[audio_as_voice]] from payload or text extraction
+              if (payload.audioAsVoice || audioTagResult.audioAsVoice) {
+                seenAudioAsVoice = true;
+              }
+
               const blockPayload: ReplyPayload = applyReplyToMode({
                 ...taggedPayload,
                 text: cleaned,
-                audioAsVoice: audioTagResult.audioAsVoice,
+                audioAsVoice:
+                  audioTagResult.audioAsVoice || payload.audioAsVoice,
               });
+
               void typingSignals
                 .signalTextDelta(taggedPayload.text)
                 .catch((err) => {
@@ -556,6 +578,14 @@ export async function runReplyAgent(params: {
                     `block reply typing signal failed: ${String(err)}`,
                   );
                 });
+
+              // Buffer audio blocks to apply [[audio_as_voice]] that may come later
+              const isAudioBlock = hasAudioMedia(taggedPayload.mediaUrls);
+              if (isAudioBlock) {
+                bufferedAudioBlocks.push(blockPayload);
+                return; // Don't send immediately - wait for potential [[audio_as_voice]] tag
+              }
+
               blockReplyPipeline?.enqueue(blockPayload);
             }
           : undefined,
@@ -670,6 +700,17 @@ export async function runReplyAgent(params: {
     }

     const payloadArray = runResult.payloads ?? [];
+
+    if (bufferedAudioBlocks.length > 0 && blockReplyPipeline) {
+      for (const audioPayload of bufferedAudioBlocks) {
+        const finalPayload = seenAudioAsVoice
+          ? { ...audioPayload, audioAsVoice: true }
+          : audioPayload;
+        blockReplyPipeline.enqueue(finalPayload);
+      }
+      bufferedAudioBlocks.length = 0;
+    }
+
     if (blockReplyPipeline) {
       await blockReplyPipeline.flush({ force: true });
       blockReplyPipeline.stop();
@@ -677,6 +718,7 @@ export async function runReplyAgent(params: {
     if (pendingToolTasks.size > 0) {
       await Promise.allSettled(pendingToolTasks);
     }
+
     // Drain any late tool/block deliveries before deciding there's "nothing to send".
     // Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and
     // keep the typing indicator stuck.
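The buffering above is the streaming-safe half of the feature: an audio block can stream out before the model emits `[[audio_as_voice]]`, so audio payloads are held back and retagged at flush time. The same pattern reduced to a standalone sketch (`Payload` and `enqueue` are illustrative stand-ins, not the real pipeline API):

```ts
// Standalone sketch of the buffer-then-retag pattern used in runReplyAgent.
type Payload = { mediaUrls?: string[]; audioAsVoice?: boolean };

const AUDIO_EXTENSIONS = /\.(opus|mp3|m4a|wav|ogg|aac|flac)$/i;
const buffered: Payload[] = [];
let seenVoiceTag = false;

function onBlock(p: Payload, enqueue: (p: Payload) => void): void {
  if (p.audioAsVoice) seenVoiceTag = true;
  // Hold audio back: the tag may arrive in a later text block.
  if (p.mediaUrls?.some((u) => AUDIO_EXTENSIONS.test(u))) {
    buffered.push(p);
    return;
  }
  enqueue(p);
}

function flush(enqueue: (p: Payload) => void): void {
  // At end of run, apply the tag retroactively to all buffered audio.
  for (const p of buffered)
    enqueue(seenVoiceTag ? { ...p, audioAsVoice: true } : p);
  buffered.length = 0;
}
```

Note the ordering consequence: buffered audio is enqueued only after the run completes, so it lands after any later text blocks.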
diff --git a/src/media/parse.test.ts b/src/media/parse.test.ts
new file mode 100644
index 000000000..f910d851c
--- /dev/null
+++ b/src/media/parse.test.ts
@@ -0,0 +1,19 @@
+import { describe, expect, it } from "vitest";
+
+import { splitMediaFromOutput } from "./parse.js";
+
+describe("splitMediaFromOutput", () => {
+  it("detects audio_as_voice tag and strips it", () => {
+    const result = splitMediaFromOutput("Hello [[audio_as_voice]] world");
+    expect(result.audioAsVoice).toBe(true);
+    expect(result.text).toBe("Hello world");
+  });
+
+  it("keeps audio_as_voice detection stable across calls", () => {
+    const input = "Hello [[audio_as_voice]]";
+    const first = splitMediaFromOutput(input);
+    const second = splitMediaFromOutput(input);
+    expect(first.audioAsVoice).toBe(true);
+    expect(second.audioAsVoice).toBe(true);
+  });
+});
diff --git a/src/media/parse.ts b/src/media/parse.ts
index 2a01156c4..38f751992 100644
--- a/src/media/parse.ts
+++ b/src/media/parse.ts
@@ -1,5 +1,7 @@
 // Shared helpers for parsing MEDIA tokens from command/stdout text.

+import { parseFenceSpans } from "../markdown/fences.js";
+
 // Allow optional wrapping backticks and punctuation after the token; capture the core token.
 export const MEDIA_TOKEN_RE = /\bMEDIA:\s*`?([^\n]+)`?/gi;
@@ -22,10 +24,23 @@ function isValidMedia(candidate: string) {
   );
 }

+// Check if a character offset is inside any fenced code block
+function isInsideFence(
+  fenceSpans: Array<{ start: number; end: number }>,
+  offset: number,
+): boolean {
+  return fenceSpans.some((span) => offset >= span.start && offset < span.end);
+}
+
+// Regexes to detect the [[audio_as_voice]] tag (global for stripping, non-global for testing)
+const AUDIO_AS_VOICE_RE = /\[\[audio_as_voice\]\]/gi;
+const AUDIO_AS_VOICE_TEST_RE = /\[\[audio_as_voice\]\]/i;
+
 export function splitMediaFromOutput(raw: string): {
   text: string;
   mediaUrls?: string[];
   mediaUrl?: string; // legacy first item for backward compatibility
+  audioAsVoice?: boolean; // true if [[audio_as_voice]] tag was found
 } {
   // KNOWN: Leading whitespace is semantically meaningful in Markdown (lists, indented fences).
   // We only trim the end; token cleanup below handles removing `MEDIA:` lines.
@@ -35,14 +50,26 @@ export function splitMediaFromOutput(raw: string): {
   const media: string[] = [];
   let foundMediaToken = false;

+  // Parse fenced code blocks to avoid extracting MEDIA tokens from inside them
+  const fenceSpans = parseFenceSpans(trimmedRaw);
+
   // Collect tokens line by line so we can strip them cleanly.
   const lines = trimmedRaw.split("\n");
   const keptLines: string[] = [];
+  let lineOffset = 0; // Track character offset for fence checking
   for (const line of lines) {
+    // Skip MEDIA extraction if this line is inside a fenced code block
+    if (isInsideFence(fenceSpans, lineOffset)) {
+      keptLines.push(line);
+      lineOffset += line.length + 1; // +1 for newline
+      continue;
+    }
+
     const matches = Array.from(line.matchAll(MEDIA_TOKEN_RE));
     if (matches.length === 0) {
       keptLines.push(line);
+      lineOffset += line.length + 1; // +1 for newline
       continue;
     }
@@ -86,18 +113,39 @@ export function splitMediaFromOutput(raw: string): {
     if (cleanedLine) {
       keptLines.push(cleanedLine);
     }
+    lineOffset += line.length + 1; // +1 for newline
   }

-  const cleanedText = keptLines
+  let cleanedText = keptLines
     .join("\n")
     .replace(/[ \t]+\n/g, "\n")
     .replace(/[ \t]{2,}/g, " ")
     .replace(/\n{2,}/g, "\n")
     .trim();

-  if (media.length === 0) {
-    return { text: foundMediaToken ? cleanedText : trimmedRaw };
+  // Detect and strip [[audio_as_voice]] tag
+  const hasAudioAsVoice = AUDIO_AS_VOICE_TEST_RE.test(cleanedText);
+  if (hasAudioAsVoice) {
+    cleanedText = cleanedText
+      .replace(AUDIO_AS_VOICE_RE, "")
+      .replace(/[ \t]+/g, " ")
+      .replace(/\n{2,}/g, "\n")
+      .trim();
   }

-  return { text: cleanedText, mediaUrls: media, mediaUrl: media[0] };
+  if (media.length === 0) {
+    const result: ReturnType<typeof splitMediaFromOutput> = {
+      // Return cleaned text if we found a media token OR audio tag, otherwise original
+      text: foundMediaToken || hasAudioAsVoice ? cleanedText : trimmedRaw,
+    };
+    if (hasAudioAsVoice) result.audioAsVoice = true;
+    return result;
+  }
+
+  return {
+    text: cleanedText,
+    mediaUrls: media,
+    mediaUrl: media[0],
+    ...(hasAudioAsVoice ? { audioAsVoice: true } : {}),
+  };
 }
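For orientation, here is the intended input/output contract of the updated parser, assuming the example path passes `isValidMedia` (whose body sits outside these hunks) and that `parseFenceSpans` covers the full fenced range; the paths are made up:

```ts
import { splitMediaFromOutput } from "./parse.js";

// MEDIA token and voice tag are both stripped; the flag is surfaced.
splitMediaFromOutput("Here you go\nMEDIA: /tmp/reply.ogg\n[[audio_as_voice]]");
// -> { text: "Here you go", mediaUrls: ["/tmp/reply.ogg"],
//      mediaUrl: "/tmp/reply.ogg", audioAsVoice: true }

// A MEDIA line inside a fenced code block is left alone.
const fence = "`".repeat(3);
splitMediaFromOutput(`${fence}\nMEDIA: /tmp/not-real.png\n${fence}`);
// -> text keeps the fenced block verbatim; no mediaUrls extracted
```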
diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts
index aaba18307..8ec6756b4 100644
--- a/src/telegram/bot.ts
+++ b/src/telegram/bot.ts
@@ -64,6 +64,7 @@ import {
   readTelegramAllowFromStore,
   upsertTelegramPairingRequest,
 } from "./pairing-store.js";
+import { resolveTelegramVoiceDecision } from "./voice.js";

 const PARSE_ERR_RE =
   /can't parse entities|parse entities|find end of the entity/i;
@@ -1387,7 +1388,16 @@ async function deliverReplies(params: {
           ...mediaParams,
         });
       } else if (kind === "audio") {
-        const useVoice = reply.audioAsVoice === true; // default false (backward compatible)
+        const { useVoice, reason } = resolveTelegramVoiceDecision({
+          wantsVoice: reply.audioAsVoice === true, // default false (backward compatible)
+          contentType: media.contentType,
+          fileName,
+        });
+        if (reason) {
+          logVerbose(
+            `Telegram voice requested but ${reason}; sending as audio file instead.`,
+          );
+        }
         if (useVoice) {
           // Voice message - displays as round playable bubble (opt-in via [[audio_as_voice]])
           await bot.api.sendVoice(chatId, file, {
diff --git a/src/telegram/send.test.ts b/src/telegram/send.test.ts
index 115f88851..d2172d65d 100644
--- a/src/telegram/send.test.ts
+++ b/src/telegram/send.test.ts
@@ -324,6 +324,40 @@ describe("sendMessageTelegram", () => {
     expect(sendAudio).not.toHaveBeenCalled();
   });

+  it("falls back to audio when asVoice is true but media is not voice compatible", async () => {
+    const chatId = "123";
+    const sendAudio = vi.fn().mockResolvedValue({
+      message_id: 14,
+      chat: { id: chatId },
+    });
+    const sendVoice = vi.fn().mockResolvedValue({
+      message_id: 15,
+      chat: { id: chatId },
+    });
+    const api = { sendAudio, sendVoice } as unknown as {
+      sendAudio: typeof sendAudio;
+      sendVoice: typeof sendVoice;
+    };
+
+    loadWebMedia.mockResolvedValueOnce({
+      buffer: Buffer.from("audio"),
+      contentType: "audio/mpeg",
+      fileName: "clip.mp3",
+    });
+
+    await sendMessageTelegram(chatId, "caption", {
+      token: "tok",
+      api,
+      mediaUrl: "https://example.com/clip.mp3",
+      asVoice: true,
+    });
+
+    expect(sendAudio).toHaveBeenCalledWith(chatId, expect.anything(), {
+      caption: "caption",
+    });
+    expect(sendVoice).not.toHaveBeenCalled();
+  });
+
   it("includes message_thread_id for forum topic messages", async () => {
     const chatId = "-1001234567890";
     const sendMessage = vi.fn().mockResolvedValue({
diff --git a/src/telegram/send.ts b/src/telegram/send.ts
index 8799e32f7..7aed6ee31 100644
--- a/src/telegram/send.ts
+++ b/src/telegram/send.ts
@@ -6,6 +6,7 @@ import type {
 } from "@grammyjs/types";
 import { type ApiClientOptions, Bot, InputFile } from "grammy";
 import { loadConfig } from "../config/config.js";
+import { logVerbose } from "../globals.js";
 import { formatErrorMessage } from "../infra/errors.js";
 import { recordProviderActivity } from "../infra/provider-activity.js";
 import type { RetryConfig } from "../infra/retry.js";
@@ -20,6 +21,7 @@ import {
   parseTelegramTarget,
   stripTelegramInternalPrefixes,
 } from "./targets.js";
+import { resolveTelegramVoiceDecision } from "./voice.js";

 type TelegramSendOpts = {
   token?: string;
@@ -237,7 +239,16 @@ export async function sendMessageTelegram(
       throw wrapChatNotFound(err);
     });
   } else if (kind === "audio") {
-    const useVoice = opts.asVoice === true; // default false (backward compatible)
+    const { useVoice, reason } = resolveTelegramVoiceDecision({
+      wantsVoice: opts.asVoice === true, // default false (backward compatible)
+      contentType: media.contentType,
+      fileName,
+    });
+    if (reason) {
+      logVerbose(
+        `Telegram voice requested but ${reason}; sending as audio file instead.`,
+      );
+    }
     if (useVoice) {
       result = await request(
         () => api.sendVoice(chatId, file, mediaParams),
diff --git a/src/telegram/voice.ts b/src/telegram/voice.ts
new file mode 100644
index 000000000..d1527a4ce
--- /dev/null
+++ b/src/telegram/voice.ts
@@ -0,0 +1,30 @@
+import path from "node:path";
+
+export function isTelegramVoiceCompatible(opts: {
+  contentType?: string | null;
+  fileName?: string | null;
+}): boolean {
+  const mime = opts.contentType?.toLowerCase();
+  if (mime && (mime.includes("ogg") || mime.includes("opus"))) {
+    return true;
+  }
+  const fileName = opts.fileName?.trim();
+  if (!fileName) return false;
+  const ext = path.extname(fileName).toLowerCase();
+  return ext === ".ogg" || ext === ".opus" || ext === ".oga";
+}
+
+export function resolveTelegramVoiceDecision(opts: {
+  wantsVoice: boolean;
+  contentType?: string | null;
+  fileName?: string | null;
+}): { useVoice: boolean; reason?: string } {
+  if (!opts.wantsVoice) return { useVoice: false };
+  if (isTelegramVoiceCompatible(opts)) return { useVoice: true };
+  const contentType = opts.contentType ?? "unknown";
+  const fileName = opts.fileName ?? "unknown";
+  return {
+    useVoice: false,
+    reason: `media is ${contentType} (${fileName})`,
+  };
+}
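The helper's decision table, exercised directly (these calls use only the API defined above):

```ts
import { resolveTelegramVoiceDecision } from "./voice.js";

// OGG/Opus media can go out via sendVoice.
resolveTelegramVoiceDecision({
  wantsVoice: true,
  contentType: "audio/ogg",
  fileName: "note.ogg",
});
// -> { useVoice: true }

// Anything else falls back to sendAudio, with a loggable reason.
resolveTelegramVoiceDecision({
  wantsVoice: true,
  contentType: "audio/mpeg",
  fileName: "clip.mp3",
});
// -> { useVoice: false, reason: "media is audio/mpeg (clip.mp3)" }

// Without the [[audio_as_voice]] opt-in, behavior is unchanged.
resolveTelegramVoiceDecision({ wantsVoice: false });
// -> { useVoice: false }
```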