refactor: consolidate reply/media helpers
This commit is contained in:
@@ -22,6 +22,7 @@ import {
|
||||
emitAgentEvent,
|
||||
registerAgentRunContext,
|
||||
} from "../../infra/agent-events.js";
|
||||
import { isAudioFileName } from "../../media/mime.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import {
|
||||
estimateUsageCost,
|
||||
@@ -34,8 +35,11 @@ import type { OriginatingChannelType, TemplateContext } from "../templating.js";
|
||||
import { normalizeVerboseLevel, type VerboseLevel } from "../thinking.js";
|
||||
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
|
||||
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||
import { extractAudioTag } from "./audio-tags.js";
|
||||
import { createBlockReplyPipeline } from "./block-reply-pipeline.js";
|
||||
import { parseAudioTag } from "./audio-tags.js";
|
||||
import {
|
||||
createAudioAsVoiceBuffer,
|
||||
createBlockReplyPipeline,
|
||||
} from "./block-reply-pipeline.js";
|
||||
import { resolveBlockStreamingCoalescing } from "./block-streaming.js";
|
||||
import { createFollowupRunner } from "./followup-runner.js";
|
||||
import {
|
||||
@@ -261,13 +265,12 @@ export async function runReplyAgent(params: {
|
||||
const blockReplyTimeoutMs =
|
||||
opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;
|
||||
|
||||
// Buffer audio blocks to apply [[audio_as_voice]] tag that may come later
|
||||
const bufferedAudioBlocks: ReplyPayload[] = [];
|
||||
let seenAudioAsVoice = false;
|
||||
|
||||
const AUDIO_EXTENSIONS = /\.(opus|mp3|m4a|wav|ogg|aac|flac)$/i;
|
||||
const hasAudioMedia = (urls?: string[]): boolean =>
|
||||
Boolean(urls?.some((u) => AUDIO_EXTENSIONS.test(u)));
|
||||
Boolean(urls?.some((u) => isAudioFileName(u)));
|
||||
const isAudioPayload = (payload: ReplyPayload) =>
|
||||
hasAudioMedia(
|
||||
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : undefined),
|
||||
);
|
||||
const replyToChannel =
|
||||
sessionCtx.OriginatingChannel ??
|
||||
((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as
|
||||
@@ -297,6 +300,7 @@ export async function runReplyAgent(params: {
|
||||
onBlockReply: opts.onBlockReply,
|
||||
timeoutMs: blockReplyTimeoutMs,
|
||||
coalescing: blockReplyCoalescing,
|
||||
buffer: createAudioAsVoiceBuffer({ isAudioPayload }),
|
||||
})
|
||||
: null;
|
||||
|
||||
@@ -546,8 +550,8 @@ export async function runReplyAgent(params: {
|
||||
!payload.audioAsVoice
|
||||
)
|
||||
return;
|
||||
const audioTagResult = extractAudioTag(taggedPayload.text);
|
||||
const cleaned = audioTagResult.cleaned || undefined;
|
||||
const audioTagResult = parseAudioTag(taggedPayload.text);
|
||||
const cleaned = audioTagResult.text || undefined;
|
||||
const hasMedia =
|
||||
Boolean(taggedPayload.mediaUrl) ||
|
||||
(taggedPayload.mediaUrls?.length ?? 0) > 0;
|
||||
@@ -559,11 +563,6 @@ export async function runReplyAgent(params: {
|
||||
)
|
||||
return;
|
||||
|
||||
// Track if we've seen [[audio_as_voice]] from payload or text extraction
|
||||
if (payload.audioAsVoice || audioTagResult.audioAsVoice) {
|
||||
seenAudioAsVoice = true;
|
||||
}
|
||||
|
||||
const blockPayload: ReplyPayload = applyReplyToMode({
|
||||
...taggedPayload,
|
||||
text: cleaned,
|
||||
@@ -579,13 +578,6 @@ export async function runReplyAgent(params: {
|
||||
);
|
||||
});
|
||||
|
||||
// Buffer audio blocks to apply [[audio_as_voice]] that may come later
|
||||
const isAudioBlock = hasAudioMedia(taggedPayload.mediaUrls);
|
||||
if (isAudioBlock) {
|
||||
bufferedAudioBlocks.push(blockPayload);
|
||||
return; // Don't send immediately - wait for potential [[audio_as_voice]] tag
|
||||
}
|
||||
|
||||
blockReplyPipeline?.enqueue(blockPayload);
|
||||
}
|
||||
: undefined,
|
||||
@@ -701,16 +693,6 @@ export async function runReplyAgent(params: {
|
||||
|
||||
const payloadArray = runResult.payloads ?? [];
|
||||
|
||||
if (bufferedAudioBlocks.length > 0 && blockReplyPipeline) {
|
||||
for (const audioPayload of bufferedAudioBlocks) {
|
||||
const finalPayload = seenAudioAsVoice
|
||||
? { ...audioPayload, audioAsVoice: true }
|
||||
: audioPayload;
|
||||
blockReplyPipeline.enqueue(finalPayload);
|
||||
}
|
||||
bufferedAudioBlocks.length = 0;
|
||||
}
|
||||
|
||||
if (blockReplyPipeline) {
|
||||
await blockReplyPipeline.flush({ force: true });
|
||||
blockReplyPipeline.stop();
|
||||
@@ -753,10 +735,10 @@ export async function runReplyAgent(params: {
|
||||
currentMessageId: sessionCtx.MessageSid,
|
||||
})
|
||||
.map((payload) => {
|
||||
const audioTagResult = extractAudioTag(payload.text);
|
||||
const audioTagResult = parseAudioTag(payload.text);
|
||||
return {
|
||||
...payload,
|
||||
text: audioTagResult.cleaned ? audioTagResult.cleaned : undefined,
|
||||
text: audioTagResult.text ? audioTagResult.text : undefined,
|
||||
audioAsVoice: audioTagResult.audioAsVoice,
|
||||
};
|
||||
})
|
||||
|
||||
@@ -1,25 +1,25 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { extractAudioTag } from "./audio-tags.js";
|
||||
import { parseAudioTag } from "./audio-tags.js";
|
||||
|
||||
describe("extractAudioTag", () => {
|
||||
describe("parseAudioTag", () => {
|
||||
it("detects audio_as_voice and strips the tag", () => {
|
||||
const result = extractAudioTag("Hello [[audio_as_voice]] world");
|
||||
const result = parseAudioTag("Hello [[audio_as_voice]] world");
|
||||
expect(result.audioAsVoice).toBe(true);
|
||||
expect(result.hasTag).toBe(true);
|
||||
expect(result.cleaned).toBe("Hello world");
|
||||
expect(result.hadTag).toBe(true);
|
||||
expect(result.text).toBe("Hello world");
|
||||
});
|
||||
|
||||
it("returns empty output for missing text", () => {
|
||||
const result = extractAudioTag(undefined);
|
||||
const result = parseAudioTag(undefined);
|
||||
expect(result.audioAsVoice).toBe(false);
|
||||
expect(result.hasTag).toBe(false);
|
||||
expect(result.cleaned).toBe("");
|
||||
expect(result.hadTag).toBe(false);
|
||||
expect(result.text).toBe("");
|
||||
});
|
||||
|
||||
it("removes tag-only messages", () => {
|
||||
const result = extractAudioTag("[[audio_as_voice]]");
|
||||
const result = parseAudioTag("[[audio_as_voice]]");
|
||||
expect(result.audioAsVoice).toBe(true);
|
||||
expect(result.cleaned).toBe("");
|
||||
expect(result.text).toBe("");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,31 +1 @@
|
||||
/**
 * Strip the `[[audio_as_voice]]` directive from reply text.
 *
 * The tag is matched case-insensitively and every occurrence is removed.
 * When the tag is present the audio is meant to go out as a voice bubble;
 * without it the audio stays a plain file (the backward-compatible default).
 *
 * @param text - Raw reply text; may be undefined or empty.
 * @returns An object with:
 *   - `cleaned`: the text without the tag, with runs of spaces/tabs collapsed
 *     to one space, spaces/tabs around newlines removed, and both ends trimmed.
 *   - `audioAsVoice` and `hasTag`: both `true` exactly when the tag was found.
 */
export function extractAudioTag(text?: string): {
  cleaned: string;
  audioAsVoice: boolean;
  hasTag: boolean;
} {
  if (!text) {
    return { cleaned: "", audioAsVoice: false, hasTag: false };
  }

  // Each tag is swapped for a single space, so the result differs from the
  // input if and only if at least one tag was present.
  const withoutTag = text.replace(/\[\[audio_as_voice\]\]/gi, " ");
  const tagFound = withoutTag !== text;

  // Normalize whitespace left behind by the removal.
  const collapsed = withoutTag.replace(/[ \t]+/g, " ");
  const cleaned = collapsed.replace(/[ \t]*\n[ \t]*/g, "\n").trim();

  return { cleaned, audioAsVoice: tagFound, hasTag: tagFound };
}
|
||||
export { parseAudioTag } from "../../media/audio-tags.js";
|
||||
|
||||
@@ -13,6 +13,28 @@ export type BlockReplyPipeline = {
|
||||
hasSentPayload: (payload: ReplyPayload) => boolean;
|
||||
};
|
||||
|
||||
/**
 * Optional hooks that let a block-reply pipeline hold selected payloads back
 * until the pipeline is flushed.
 */
export type BlockReplyBuffer = {
  /** Observer invoked with a payload before `shouldBuffer` is consulted for it. */
  onEnqueue?: (payload: ReplyPayload) => void;
  /** Returns `true` when the payload should be held instead of sent right away. */
  shouldBuffer: (payload: ReplyPayload) => boolean;
  /** Maps a held payload to the payload that is actually sent at flush time. */
  finalize?: (payload: ReplyPayload) => ReplyPayload;
};
|
||||
|
||||
/**
 * Build a buffer that holds audio payloads so that an `audioAsVoice` flag
 * seen on any payload can be applied to them when they are finalized.
 *
 * @param params.isAudioPayload - Predicate deciding which payloads are held.
 * @returns Hooks sharing one "voice flag seen" state:
 *   - `onEnqueue` records whether any payload carried a truthy `audioAsVoice`.
 *   - `shouldBuffer` delegates to `params.isAudioPayload`.
 *   - `finalize` returns a copy with `audioAsVoice: true` once the flag has
 *     been seen, otherwise the payload itself, unchanged.
 */
export function createAudioAsVoiceBuffer(params: {
  isAudioPayload: (payload: ReplyPayload) => boolean;
}): BlockReplyBuffer {
  // Latches to true the first time a payload asks for voice delivery.
  let voiceRequested = false;

  const onEnqueue = (payload: ReplyPayload): void => {
    if (payload.audioAsVoice) {
      voiceRequested = true;
    }
  };

  const shouldBuffer = (payload: ReplyPayload): boolean =>
    params.isAudioPayload(payload);

  const finalize = (payload: ReplyPayload): ReplyPayload => {
    if (!voiceRequested) return payload;
    return { ...payload, audioAsVoice: true };
  };

  return { onEnqueue, shouldBuffer, finalize };
}
|
||||
|
||||
export function createBlockReplyPayloadKey(payload: ReplyPayload): string {
|
||||
const text = payload.text?.trim() ?? "";
|
||||
const mediaList = payload.mediaUrls?.length
|
||||
@@ -51,12 +73,15 @@ export function createBlockReplyPipeline(params: {
|
||||
) => Promise<void> | void;
|
||||
timeoutMs: number;
|
||||
coalescing?: BlockStreamingCoalescing;
|
||||
buffer?: BlockReplyBuffer;
|
||||
}): BlockReplyPipeline {
|
||||
const { onBlockReply, timeoutMs, coalescing } = params;
|
||||
const { onBlockReply, timeoutMs, coalescing, buffer } = params;
|
||||
const sentKeys = new Set<string>();
|
||||
const pendingKeys = new Set<string>();
|
||||
const seenKeys = new Set<string>();
|
||||
const bufferedKeys = new Set<string>();
|
||||
const bufferedPayloadKeys = new Set<string>();
|
||||
const bufferedPayloads: ReplyPayload[] = [];
|
||||
let sendChain: Promise<void> = Promise.resolve();
|
||||
let aborted = false;
|
||||
let didStream = false;
|
||||
@@ -124,8 +149,37 @@ export function createBlockReplyPipeline(params: {
|
||||
})
|
||||
: null;
|
||||
|
||||
// Offer a payload to the optional buffer.
// Returns true when the buffer claimed it (stored now, or dropped because its
// key is already tracked), false when the caller should handle it normally.
const bufferPayload = (payload: ReplyPayload) => {
  // The observer hook runs for every payload, buffered or not.
  buffer?.onEnqueue?.(payload);
  if (!buffer?.shouldBuffer(payload)) return false;

  const key = createBlockReplyPayloadKey(payload);
  const alreadyTracked = [
    seenKeys,
    sentKeys,
    pendingKeys,
    bufferedPayloadKeys,
  ].some((keys) => keys.has(key));

  if (!alreadyTracked) {
    seenKeys.add(key);
    bufferedPayloadKeys.add(key);
    bufferedPayloads.push(payload);
  }
  return true;
};
|
||||
|
||||
// Send every held payload in arrival order, then reset the buffer state.
const flushBuffered = () => {
  if (bufferedPayloads.length > 0) {
    for (const held of bufferedPayloads) {
      // Let the buffer adjust the payload before it goes out; fall back to the
      // held payload when there is no finalize hook or it returns null/undefined.
      const finalized = buffer?.finalize?.(held);
      // NOTE(review): the meaning of the second argument is defined by
      // sendPayload, which is outside this view — confirm before changing it.
      sendPayload(finalized ?? held, true);
    }
    bufferedPayloads.length = 0;
    bufferedPayloadKeys.clear();
  }
};
|
||||
|
||||
const enqueue = (payload: ReplyPayload) => {
|
||||
if (aborted) return;
|
||||
if (bufferPayload(payload)) return;
|
||||
const hasMedia =
|
||||
Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
||||
if (hasMedia) {
|
||||
@@ -151,6 +205,7 @@ export function createBlockReplyPipeline(params: {
|
||||
|
||||
const flush = async (options?: { force?: boolean }) => {
|
||||
await coalescer?.flush(options);
|
||||
flushBuffered();
|
||||
await sendChain;
|
||||
};
|
||||
|
||||
@@ -162,7 +217,8 @@ export function createBlockReplyPipeline(params: {
|
||||
enqueue,
|
||||
flush,
|
||||
stop,
|
||||
hasBuffered: () => Boolean(coalescer?.hasBuffered()),
|
||||
hasBuffered: () =>
|
||||
Boolean(coalescer?.hasBuffered() || bufferedPayloads.length > 0),
|
||||
didStream: () => didStream,
|
||||
isAborted: () => aborted,
|
||||
hasSentPayload: (payload) => {
|
||||
|
||||
@@ -7,7 +7,7 @@ import { tryFastAbortFromMessage } from "./abort.js";
|
||||
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
|
||||
import { isRoutableChannel, routeReply } from "./route-reply.js";
|
||||
|
||||
type DispatchFromConfigResult = {
|
||||
export type DispatchFromConfigResult = {
|
||||
queuedFinal: boolean;
|
||||
counts: Record<ReplyDispatchKind, number>;
|
||||
};
|
||||
|
||||
34
src/auto-reply/reply/provider-dispatcher.ts
Normal file
34
src/auto-reply/reply/provider-dispatcher.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
import type { ClawdbotConfig } from "../../config/config.js";
|
||||
import type { MsgContext } from "../templating.js";
|
||||
import type { GetReplyOptions } from "../types.js";
|
||||
import type { DispatchFromConfigResult } from "./dispatch-from-config.js";
|
||||
import { dispatchReplyFromConfig } from "./dispatch-from-config.js";
|
||||
import {
|
||||
createReplyDispatcherWithTyping,
|
||||
type ReplyDispatcherWithTypingOptions,
|
||||
} from "./reply-dispatcher.js";
|
||||
|
||||
/**
 * Create a typing-aware reply dispatcher and run a config-driven reply
 * dispatch through it.
 *
 * @param params.ctx - Message context forwarded to `dispatchReplyFromConfig`.
 * @param params.cfg - Configuration forwarded to `dispatchReplyFromConfig`.
 * @param params.dispatcherOptions - Options passed to
 *   `createReplyDispatcherWithTyping`.
 * @param params.replyOptions - Caller reply options. Options produced by the
 *   dispatcher are spread after these, so they win on any overlapping key.
 * @param params.replyResolver - Optional resolver forwarded unchanged.
 * @returns The result of `dispatchReplyFromConfig`.
 * @throws Whatever `dispatchReplyFromConfig` rejects with; the dispatcher is
 *   still marked idle in that case.
 */
export async function dispatchReplyWithBufferedBlockDispatcher(params: {
  ctx: MsgContext;
  cfg: ClawdbotConfig;
  dispatcherOptions: ReplyDispatcherWithTypingOptions;
  replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
  replyResolver?: typeof import("../reply.js").getReplyFromConfig;
}): Promise<DispatchFromConfigResult> {
  const { dispatcher, replyOptions, markDispatchIdle } =
    createReplyDispatcherWithTyping(params.dispatcherOptions);

  try {
    return await dispatchReplyFromConfig({
      ctx: params.ctx,
      cfg: params.cfg,
      dispatcher,
      replyResolver: params.replyResolver,
      replyOptions: {
        ...params.replyOptions,
        ...replyOptions,
      },
    });
  } finally {
    // Run on both success and failure so a rejected dispatch cannot leave the
    // dispatcher without its idle signal.
    markDispatchIdle();
  }
}
|
||||
@@ -22,7 +22,7 @@ export type ReplyDispatcherOptions = {
|
||||
onError?: ReplyDispatchErrorHandler;
|
||||
};
|
||||
|
||||
type ReplyDispatcherWithTypingOptions = Omit<
|
||||
export type ReplyDispatcherWithTypingOptions = Omit<
|
||||
ReplyDispatcherOptions,
|
||||
"onIdle"
|
||||
> & {
|
||||
|
||||
Reference in New Issue
Block a user