diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index 7a416c60c..5ef32ea65 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -1044,6 +1044,7 @@ export async function runEmbeddedPiAgent(params: { onBlockReply?: (payload: { text?: string; mediaUrls?: string[]; + audioAsVoice?: boolean; }) => void | Promise; blockReplyBreak?: "text_end" | "message_end"; blockReplyChunking?: BlockReplyChunking; @@ -1640,6 +1641,7 @@ export async function runEmbeddedPiAgent(params: { text: string; media?: string[]; isError?: boolean; + audioAsVoice?: boolean; }> = []; const errorText = lastAssistant @@ -1656,10 +1658,10 @@ export async function runEmbeddedPiAgent(params: { if (inlineToolResults) { for (const { toolName, meta } of toolMetas) { const agg = formatToolAggregate(toolName, meta ? [meta] : []); - const { text: cleanedText, mediaUrls } = + const { text: cleanedText, mediaUrls, audioAsVoice } = splitMediaFromOutput(agg); if (cleanedText) - replyItems.push({ text: cleanedText, media: mediaUrls }); + replyItems.push({ text: cleanedText, media: mediaUrls, audioAsVoice }); } } @@ -1678,18 +1680,31 @@ export async function runEmbeddedPiAgent(params: { ? [fallbackAnswerText] : []; for (const text of answerTexts) { - const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); - if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) + const { text: cleanedText, mediaUrls, audioAsVoice } = + splitMediaFromOutput(text); + if ( + !cleanedText && + (!mediaUrls || mediaUrls.length === 0) && + !audioAsVoice + ) continue; - replyItems.push({ text: cleanedText, media: mediaUrls }); + replyItems.push({ text: cleanedText, media: mediaUrls, audioAsVoice }); } + // Check if any replyItem has audioAsVoice tag - if so, apply to all media payloads + const hasAudioAsVoiceTag = replyItems.some( + (item) => item.audioAsVoice, + ); + const payloads = replyItems .map((item) => ({ text: item.text?.trim() ? 
item.text.trim() : undefined, mediaUrls: item.media?.length ? item.media : undefined, mediaUrl: item.media?.[0], isError: item.isError, + // Apply audioAsVoice to media payloads if tag was found anywhere in response + audioAsVoice: + item.audioAsVoice || (hasAudioAsVoiceTag && item.media?.length), })) .filter( (p) => diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 4c407a483..6dc5957cd 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -262,6 +262,7 @@ export function subscribeEmbeddedPiSession(params: { onBlockReply?: (payload: { text?: string; mediaUrls?: string[]; + audioAsVoice?: boolean; }) => void | Promise; blockReplyBreak?: "text_end" | "message_end"; blockReplyChunking?: BlockReplyChunking; @@ -436,11 +437,13 @@ export function subscribeEmbeddedPiSession(params: { lastBlockReplyText = chunk; assistantTexts.push(chunk); if (!params.onBlockReply) return; - const { text: cleanedText, mediaUrls } = splitMediaFromOutput(chunk); - if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) return; + const { text: cleanedText, mediaUrls, audioAsVoice } = splitMediaFromOutput(chunk); + // Skip empty payloads, but always emit if audioAsVoice is set (to propagate the flag) + if (!cleanedText && (!mediaUrls || mediaUrls.length === 0) && !audioAsVoice) return; void params.onBlockReply({ text: cleanedText, mediaUrls: mediaUrls?.length ? 
mediaUrls : undefined, + audioAsVoice, }); }; @@ -859,12 +862,18 @@ export function subscribeEmbeddedPiSession(params: { ); } else { lastBlockReplyText = text; - const { text: cleanedText, mediaUrls } = + const { text: cleanedText, mediaUrls, audioAsVoice } = splitMediaFromOutput(text); - if (cleanedText || (mediaUrls && mediaUrls.length > 0)) { + // Emit if there's content OR audioAsVoice flag (to propagate the flag) + if ( + cleanedText || + (mediaUrls && mediaUrls.length > 0) || + audioAsVoice + ) { void onBlockReply({ text: cleanedText, mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + audioAsVoice, }); } } diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 56a04b5b2..1063dcdd7 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -260,6 +260,14 @@ export async function runReplyAgent(params: { const pendingToolTasks = new Set>(); const blockReplyTimeoutMs = opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS; + + // Buffer audio blocks to apply [[audio_as_voice]] tag that may come later + const bufferedAudioBlocks: ReplyPayload[] = []; + let seenAudioAsVoice = false; + + const AUDIO_EXTENSIONS = /\.(opus|mp3|m4a|wav|ogg|aac|flac)$/i; + const hasAudioMedia = (urls?: string[]): boolean => + Boolean(urls?.some((u) => AUDIO_EXTENSIONS.test(u))); const replyToChannel = sessionCtx.OriginatingChannel ?? ((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as @@ -532,23 +540,34 @@ export async function runReplyAgent(params: { }, sessionCtx.MessageSid, ); - if (!isRenderablePayload(taggedPayload)) return; + // Let through payloads with audioAsVoice flag even if empty (need to track it) + if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) + return; const audioTagResult = extractAudioTag(taggedPayload.text); const cleaned = audioTagResult.cleaned || undefined; const hasMedia = Boolean(taggedPayload.mediaUrl) || (taggedPayload.mediaUrls?.length ?? 
0) > 0; - if (!cleaned && !hasMedia) return; + // Skip empty payloads unless they have audioAsVoice flag (need to track it) + if (!cleaned && !hasMedia && !payload.audioAsVoice) return; if ( isSilentReplyText(cleaned, SILENT_REPLY_TOKEN) && !hasMedia ) return; + + // Track if we've seen [[audio_as_voice]] from payload or text extraction + if (payload.audioAsVoice || audioTagResult.audioAsVoice) { + seenAudioAsVoice = true; + } + const blockPayload: ReplyPayload = applyReplyToMode({ ...taggedPayload, text: cleaned, - audioAsVoice: audioTagResult.audioAsVoice, + audioAsVoice: + audioTagResult.audioAsVoice || payload.audioAsVoice, }); + void typingSignals .signalTextDelta(taggedPayload.text) .catch((err) => { @@ -556,6 +575,14 @@ export async function runReplyAgent(params: { `block reply typing signal failed: ${String(err)}`, ); }); + + // Buffer audio blocks to apply [[audio_as_voice]] that may come later + const isAudioBlock = hasAudioMedia(taggedPayload.mediaUrls); + if (isAudioBlock) { + bufferedAudioBlocks.push(blockPayload); + return; // Don't send immediately - wait for potential [[audio_as_voice]] tag + } + blockReplyPipeline?.enqueue(blockPayload); } : undefined, @@ -670,6 +697,17 @@ export async function runReplyAgent(params: { } const payloadArray = runResult.payloads ?? []; + + if (bufferedAudioBlocks.length > 0 && blockReplyPipeline) { + for (const audioPayload of bufferedAudioBlocks) { + const finalPayload = seenAudioAsVoice + ? { ...audioPayload, audioAsVoice: true } + : audioPayload; + blockReplyPipeline.enqueue(finalPayload); + } + bufferedAudioBlocks.length = 0; + } + if (blockReplyPipeline) { await blockReplyPipeline.flush({ force: true }); blockReplyPipeline.stop(); @@ -677,6 +715,7 @@ export async function runReplyAgent(params: { if (pendingToolTasks.size > 0) { await Promise.allSettled(pendingToolTasks); } + // Drain any late tool/block deliveries before deciding there's "nothing to send". // Otherwise, a late typing trigger (e.g. 
from a tool callback) can outlive the run and // keep the typing indicator stuck. diff --git a/src/media/parse.ts b/src/media/parse.ts index 2a01156c4..08f6d492e 100644 --- a/src/media/parse.ts +++ b/src/media/parse.ts @@ -1,5 +1,7 @@ // Shared helpers for parsing MEDIA tokens from command/stdout text. +import { parseFenceSpans } from "../markdown/fences.js"; + // Allow optional wrapping backticks and punctuation after the token; capture the core token. export const MEDIA_TOKEN_RE = /\bMEDIA:\s*`?([^\n]+)`?/gi; @@ -22,10 +24,22 @@ function isValidMedia(candidate: string) { ); } +// Check if a character offset is inside any fenced code block +function isInsideFence( + fenceSpans: Array<{ start: number; end: number }>, + offset: number, +): boolean { + return fenceSpans.some((span) => offset >= span.start && offset < span.end); +} + +// Regex to detect [[audio_as_voice]] tag +const AUDIO_AS_VOICE_RE = /\[\[audio_as_voice\]\]/gi; + export function splitMediaFromOutput(raw: string): { text: string; mediaUrls?: string[]; mediaUrl?: string; // legacy first item for backward compatibility + audioAsVoice?: boolean; // true if [[audio_as_voice]] tag was found } { // KNOWN: Leading whitespace is semantically meaningful in Markdown (lists, indented fences). // We only trim the end; token cleanup below handles removing `MEDIA:` lines. @@ -35,14 +49,26 @@ export function splitMediaFromOutput(raw: string): { const media: string[] = []; let foundMediaToken = false; + // Parse fenced code blocks to avoid extracting MEDIA tokens from inside them + const fenceSpans = parseFenceSpans(trimmedRaw); + // Collect tokens line by line so we can strip them cleanly. 
const lines = trimmedRaw.split("\n"); const keptLines: string[] = []; + let lineOffset = 0; // Track character offset for fence checking for (const line of lines) { + // Skip MEDIA extraction if this line is inside a fenced code block + if (isInsideFence(fenceSpans, lineOffset)) { + keptLines.push(line); + lineOffset += line.length + 1; // +1 for newline + continue; + } + const matches = Array.from(line.matchAll(MEDIA_TOKEN_RE)); if (matches.length === 0) { keptLines.push(line); + lineOffset += line.length + 1; // +1 for newline continue; } @@ -86,18 +112,39 @@ export function splitMediaFromOutput(raw: string): { if (cleanedLine) { keptLines.push(cleanedLine); } + lineOffset += line.length + 1; // +1 for newline } - const cleanedText = keptLines + let cleanedText = keptLines .join("\n") .replace(/[ \t]+\n/g, "\n") .replace(/[ \t]{2,}/g, " ") .replace(/\n{2,}/g, "\n") .trim(); - if (media.length === 0) { - return { text: foundMediaToken ? cleanedText : trimmedRaw }; + // Detect and strip [[audio_as_voice]] tag + const hasAudioAsVoice = AUDIO_AS_VOICE_RE.test(cleanedText); + if (hasAudioAsVoice) { + cleanedText = cleanedText + .replace(AUDIO_AS_VOICE_RE, "") + .replace(/[ \t]+/g, " ") + .replace(/\n{2,}/g, "\n") + .trim(); } - return { text: cleanedText, mediaUrls: media, mediaUrl: media[0] }; + if (media.length === 0) { + const result: ReturnType = { + // Return cleaned text if we found a media token OR audio tag, otherwise original + text: (foundMediaToken || hasAudioAsVoice) ? cleanedText : trimmedRaw, + }; + if (hasAudioAsVoice) result.audioAsVoice = true; + return result; + } + + return { + text: cleanedText, + mediaUrls: media, + mediaUrl: media[0], + ...(hasAudioAsVoice ? 
{ audioAsVoice: true } : {}), + }; } diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index aaba18307..80404a185 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -1388,6 +1388,7 @@ async function deliverReplies(params: { }); } else if (kind === "audio") { const useVoice = reply.audioAsVoice === true; // default false (backward compatible) + log.debug(`Audio media: audioAsVoice=${reply.audioAsVoice}, useVoice=${useVoice}`); if (useVoice) { // Voice message - displays as round playable bubble (opt-in via [[audio_as_voice]]) await bot.api.sendVoice(chatId, file, {