feat(telegram): buffer audio blocks for [[audio_as_voice]] tag support
- Add [[audio_as_voice]] detection to splitMediaFromOutput() - Pass audioAsVoice through onBlockReply callback chain - Buffer audio blocks during streaming, flush at end with correct flag - Non-audio media still streams immediately - Fix: emit payloads with audioAsVoice flag even if text is empty Co-authored-by: Manuel Hettich <17690367+ManuelHettich@users.noreply.github.com>
This commit is contained in:
committed by
Peter Steinberger
parent
60bd65dfac
commit
05a99aa49b
@@ -1044,6 +1044,7 @@ export async function runEmbeddedPiAgent(params: {
|
||||
onBlockReply?: (payload: {
|
||||
text?: string;
|
||||
mediaUrls?: string[];
|
||||
audioAsVoice?: boolean;
|
||||
}) => void | Promise<void>;
|
||||
blockReplyBreak?: "text_end" | "message_end";
|
||||
blockReplyChunking?: BlockReplyChunking;
|
||||
@@ -1640,6 +1641,7 @@ export async function runEmbeddedPiAgent(params: {
|
||||
text: string;
|
||||
media?: string[];
|
||||
isError?: boolean;
|
||||
audioAsVoice?: boolean;
|
||||
}> = [];
|
||||
|
||||
const errorText = lastAssistant
|
||||
@@ -1656,10 +1658,10 @@ export async function runEmbeddedPiAgent(params: {
|
||||
if (inlineToolResults) {
|
||||
for (const { toolName, meta } of toolMetas) {
|
||||
const agg = formatToolAggregate(toolName, meta ? [meta] : []);
|
||||
const { text: cleanedText, mediaUrls } =
|
||||
const { text: cleanedText, mediaUrls, audioAsVoice } =
|
||||
splitMediaFromOutput(agg);
|
||||
if (cleanedText)
|
||||
replyItems.push({ text: cleanedText, media: mediaUrls });
|
||||
replyItems.push({ text: cleanedText, media: mediaUrls, audioAsVoice });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1678,18 +1680,31 @@ export async function runEmbeddedPiAgent(params: {
|
||||
? [fallbackAnswerText]
|
||||
: [];
|
||||
for (const text of answerTexts) {
|
||||
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
|
||||
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0))
|
||||
const { text: cleanedText, mediaUrls, audioAsVoice } =
|
||||
splitMediaFromOutput(text);
|
||||
if (
|
||||
!cleanedText &&
|
||||
(!mediaUrls || mediaUrls.length === 0) &&
|
||||
!audioAsVoice
|
||||
)
|
||||
continue;
|
||||
replyItems.push({ text: cleanedText, media: mediaUrls });
|
||||
replyItems.push({ text: cleanedText, media: mediaUrls, audioAsVoice });
|
||||
}
|
||||
|
||||
// Check if any replyItem has audioAsVoice tag - if so, apply to all media payloads
|
||||
const hasAudioAsVoiceTag = replyItems.some(
|
||||
(item) => item.audioAsVoice,
|
||||
);
|
||||
|
||||
const payloads = replyItems
|
||||
.map((item) => ({
|
||||
text: item.text?.trim() ? item.text.trim() : undefined,
|
||||
mediaUrls: item.media?.length ? item.media : undefined,
|
||||
mediaUrl: item.media?.[0],
|
||||
isError: item.isError,
|
||||
// Apply audioAsVoice to media payloads if tag was found anywhere in response
|
||||
audioAsVoice:
|
||||
item.audioAsVoice || (hasAudioAsVoiceTag && item.media?.length),
|
||||
}))
|
||||
.filter(
|
||||
(p) =>
|
||||
|
||||
@@ -262,6 +262,7 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
onBlockReply?: (payload: {
|
||||
text?: string;
|
||||
mediaUrls?: string[];
|
||||
audioAsVoice?: boolean;
|
||||
}) => void | Promise<void>;
|
||||
blockReplyBreak?: "text_end" | "message_end";
|
||||
blockReplyChunking?: BlockReplyChunking;
|
||||
@@ -436,11 +437,13 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
lastBlockReplyText = chunk;
|
||||
assistantTexts.push(chunk);
|
||||
if (!params.onBlockReply) return;
|
||||
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(chunk);
|
||||
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) return;
|
||||
const { text: cleanedText, mediaUrls, audioAsVoice } = splitMediaFromOutput(chunk);
|
||||
// Skip empty payloads, but always emit if audioAsVoice is set (to propagate the flag)
|
||||
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0) && !audioAsVoice) return;
|
||||
void params.onBlockReply({
|
||||
text: cleanedText,
|
||||
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
|
||||
audioAsVoice,
|
||||
});
|
||||
};
|
||||
|
||||
@@ -859,12 +862,18 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
);
|
||||
} else {
|
||||
lastBlockReplyText = text;
|
||||
const { text: cleanedText, mediaUrls } =
|
||||
const { text: cleanedText, mediaUrls, audioAsVoice } =
|
||||
splitMediaFromOutput(text);
|
||||
if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
|
||||
// Emit if there's content OR audioAsVoice flag (to propagate the flag)
|
||||
if (
|
||||
cleanedText ||
|
||||
(mediaUrls && mediaUrls.length > 0) ||
|
||||
audioAsVoice
|
||||
) {
|
||||
void onBlockReply({
|
||||
text: cleanedText,
|
||||
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
|
||||
audioAsVoice,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -260,6 +260,14 @@ export async function runReplyAgent(params: {
|
||||
const pendingToolTasks = new Set<Promise<void>>();
|
||||
const blockReplyTimeoutMs =
|
||||
opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;
|
||||
|
||||
// Buffer audio blocks to apply [[audio_as_voice]] tag that may come later
|
||||
const bufferedAudioBlocks: ReplyPayload[] = [];
|
||||
let seenAudioAsVoice = false;
|
||||
|
||||
const AUDIO_EXTENSIONS = /\.(opus|mp3|m4a|wav|ogg|aac|flac)$/i;
|
||||
const hasAudioMedia = (urls?: string[]): boolean =>
|
||||
Boolean(urls?.some((u) => AUDIO_EXTENSIONS.test(u)));
|
||||
const replyToChannel =
|
||||
sessionCtx.OriginatingChannel ??
|
||||
((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as
|
||||
@@ -532,23 +540,34 @@ export async function runReplyAgent(params: {
|
||||
},
|
||||
sessionCtx.MessageSid,
|
||||
);
|
||||
if (!isRenderablePayload(taggedPayload)) return;
|
||||
// Let through payloads with audioAsVoice flag even if empty (need to track it)
|
||||
if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice)
|
||||
return;
|
||||
const audioTagResult = extractAudioTag(taggedPayload.text);
|
||||
const cleaned = audioTagResult.cleaned || undefined;
|
||||
const hasMedia =
|
||||
Boolean(taggedPayload.mediaUrl) ||
|
||||
(taggedPayload.mediaUrls?.length ?? 0) > 0;
|
||||
if (!cleaned && !hasMedia) return;
|
||||
// Skip empty payloads unless they have audioAsVoice flag (need to track it)
|
||||
if (!cleaned && !hasMedia && !payload.audioAsVoice) return;
|
||||
if (
|
||||
isSilentReplyText(cleaned, SILENT_REPLY_TOKEN) &&
|
||||
!hasMedia
|
||||
)
|
||||
return;
|
||||
|
||||
// Track if we've seen [[audio_as_voice]] from payload or text extraction
|
||||
if (payload.audioAsVoice || audioTagResult.audioAsVoice) {
|
||||
seenAudioAsVoice = true;
|
||||
}
|
||||
|
||||
const blockPayload: ReplyPayload = applyReplyToMode({
|
||||
...taggedPayload,
|
||||
text: cleaned,
|
||||
audioAsVoice: audioTagResult.audioAsVoice,
|
||||
audioAsVoice:
|
||||
audioTagResult.audioAsVoice || payload.audioAsVoice,
|
||||
});
|
||||
|
||||
void typingSignals
|
||||
.signalTextDelta(taggedPayload.text)
|
||||
.catch((err) => {
|
||||
@@ -556,6 +575,14 @@ export async function runReplyAgent(params: {
|
||||
`block reply typing signal failed: ${String(err)}`,
|
||||
);
|
||||
});
|
||||
|
||||
// Buffer audio blocks to apply [[audio_as_voice]] that may come later
|
||||
const isAudioBlock = hasAudioMedia(taggedPayload.mediaUrls);
|
||||
if (isAudioBlock) {
|
||||
bufferedAudioBlocks.push(blockPayload);
|
||||
return; // Don't send immediately - wait for potential [[audio_as_voice]] tag
|
||||
}
|
||||
|
||||
blockReplyPipeline?.enqueue(blockPayload);
|
||||
}
|
||||
: undefined,
|
||||
@@ -670,6 +697,17 @@ export async function runReplyAgent(params: {
|
||||
}
|
||||
|
||||
const payloadArray = runResult.payloads ?? [];
|
||||
|
||||
if (bufferedAudioBlocks.length > 0 && blockReplyPipeline) {
|
||||
for (const audioPayload of bufferedAudioBlocks) {
|
||||
const finalPayload = seenAudioAsVoice
|
||||
? { ...audioPayload, audioAsVoice: true }
|
||||
: audioPayload;
|
||||
blockReplyPipeline.enqueue(finalPayload);
|
||||
}
|
||||
bufferedAudioBlocks.length = 0;
|
||||
}
|
||||
|
||||
if (blockReplyPipeline) {
|
||||
await blockReplyPipeline.flush({ force: true });
|
||||
blockReplyPipeline.stop();
|
||||
@@ -677,6 +715,7 @@ export async function runReplyAgent(params: {
|
||||
if (pendingToolTasks.size > 0) {
|
||||
await Promise.allSettled(pendingToolTasks);
|
||||
}
|
||||
|
||||
// Drain any late tool/block deliveries before deciding there's "nothing to send".
|
||||
// Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and
|
||||
// keep the typing indicator stuck.
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
// Shared helpers for parsing MEDIA tokens from command/stdout text.
|
||||
|
||||
import { parseFenceSpans } from "../markdown/fences.js";
|
||||
|
||||
// Allow optional wrapping backticks and punctuation after the token; capture the core token.
|
||||
export const MEDIA_TOKEN_RE = /\bMEDIA:\s*`?([^\n]+)`?/gi;
|
||||
|
||||
@@ -22,10 +24,22 @@ function isValidMedia(candidate: string) {
|
||||
);
|
||||
}
|
||||
|
||||
// Check if a character offset is inside any fenced code block
|
||||
function isInsideFence(
|
||||
fenceSpans: Array<{ start: number; end: number }>,
|
||||
offset: number,
|
||||
): boolean {
|
||||
return fenceSpans.some((span) => offset >= span.start && offset < span.end);
|
||||
}
|
||||
|
||||
// Regex to detect [[audio_as_voice]] tag
|
||||
const AUDIO_AS_VOICE_RE = /\[\[audio_as_voice\]\]/gi;
|
||||
|
||||
export function splitMediaFromOutput(raw: string): {
|
||||
text: string;
|
||||
mediaUrls?: string[];
|
||||
mediaUrl?: string; // legacy first item for backward compatibility
|
||||
audioAsVoice?: boolean; // true if [[audio_as_voice]] tag was found
|
||||
} {
|
||||
// KNOWN: Leading whitespace is semantically meaningful in Markdown (lists, indented fences).
|
||||
// We only trim the end; token cleanup below handles removing `MEDIA:` lines.
|
||||
@@ -35,14 +49,26 @@ export function splitMediaFromOutput(raw: string): {
|
||||
const media: string[] = [];
|
||||
let foundMediaToken = false;
|
||||
|
||||
// Parse fenced code blocks to avoid extracting MEDIA tokens from inside them
|
||||
const fenceSpans = parseFenceSpans(trimmedRaw);
|
||||
|
||||
// Collect tokens line by line so we can strip them cleanly.
|
||||
const lines = trimmedRaw.split("\n");
|
||||
const keptLines: string[] = [];
|
||||
|
||||
let lineOffset = 0; // Track character offset for fence checking
|
||||
for (const line of lines) {
|
||||
// Skip MEDIA extraction if this line is inside a fenced code block
|
||||
if (isInsideFence(fenceSpans, lineOffset)) {
|
||||
keptLines.push(line);
|
||||
lineOffset += line.length + 1; // +1 for newline
|
||||
continue;
|
||||
}
|
||||
|
||||
const matches = Array.from(line.matchAll(MEDIA_TOKEN_RE));
|
||||
if (matches.length === 0) {
|
||||
keptLines.push(line);
|
||||
lineOffset += line.length + 1; // +1 for newline
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -86,18 +112,39 @@ export function splitMediaFromOutput(raw: string): {
|
||||
if (cleanedLine) {
|
||||
keptLines.push(cleanedLine);
|
||||
}
|
||||
lineOffset += line.length + 1; // +1 for newline
|
||||
}
|
||||
|
||||
const cleanedText = keptLines
|
||||
let cleanedText = keptLines
|
||||
.join("\n")
|
||||
.replace(/[ \t]+\n/g, "\n")
|
||||
.replace(/[ \t]{2,}/g, " ")
|
||||
.replace(/\n{2,}/g, "\n")
|
||||
.trim();
|
||||
|
||||
if (media.length === 0) {
|
||||
return { text: foundMediaToken ? cleanedText : trimmedRaw };
|
||||
// Detect and strip [[audio_as_voice]] tag
|
||||
const hasAudioAsVoice = AUDIO_AS_VOICE_RE.test(cleanedText);
|
||||
if (hasAudioAsVoice) {
|
||||
cleanedText = cleanedText
|
||||
.replace(AUDIO_AS_VOICE_RE, "")
|
||||
.replace(/[ \t]+/g, " ")
|
||||
.replace(/\n{2,}/g, "\n")
|
||||
.trim();
|
||||
}
|
||||
|
||||
return { text: cleanedText, mediaUrls: media, mediaUrl: media[0] };
|
||||
if (media.length === 0) {
|
||||
const result: ReturnType<typeof splitMediaFromOutput> = {
|
||||
// Return cleaned text if we found a media token OR audio tag, otherwise original
|
||||
text: (foundMediaToken || hasAudioAsVoice) ? cleanedText : trimmedRaw,
|
||||
};
|
||||
if (hasAudioAsVoice) result.audioAsVoice = true;
|
||||
return result;
|
||||
}
|
||||
|
||||
return {
|
||||
text: cleanedText,
|
||||
mediaUrls: media,
|
||||
mediaUrl: media[0],
|
||||
...(hasAudioAsVoice ? { audioAsVoice: true } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1388,6 +1388,7 @@ async function deliverReplies(params: {
|
||||
});
|
||||
} else if (kind === "audio") {
|
||||
const useVoice = reply.audioAsVoice === true; // default false (backward compatible)
|
||||
log.warn(`[DEBUG] Audio media: audioAsVoice=${reply.audioAsVoice}, useVoice=${useVoice}`);
|
||||
if (useVoice) {
|
||||
// Voice message - displays as round playable bubble (opt-in via [[audio_as_voice]])
|
||||
await bot.api.sendVoice(chatId, file, {
|
||||
|
||||
Reference in New Issue
Block a user