feat: telegram draft streaming

Peter Steinberger
2026-01-07 11:08:11 +01:00
parent e8420bd047
commit a700f9896d
26 changed files with 458 additions and 52 deletions

View File

@@ -659,6 +659,10 @@ export async function runEmbeddedPiAgent(params: {
}) => void | Promise<void>;
blockReplyBreak?: "text_end" | "message_end";
blockReplyChunking?: BlockReplyChunking;
onReasoningStream?: (payload: {
text?: string;
mediaUrls?: string[];
}) => void | Promise<void>;
onToolResult?: (payload: {
text?: string;
mediaUrls?: string[];
@@ -917,9 +921,10 @@ export async function runEmbeddedPiAgent(params: {
session,
runId: params.runId,
verboseLevel: params.verboseLevel,
-includeReasoning: params.reasoningLevel === "on",
+reasoningMode: params.reasoningLevel ?? "off",
shouldEmitToolResult: params.shouldEmitToolResult,
onToolResult: params.onToolResult,
onReasoningStream: params.onReasoningStream,
onBlockReply: params.onBlockReply,
blockReplyBreak: params.blockReplyBreak,
blockReplyChunking: params.blockReplyChunking,
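
Note: callers opt in through the existing reasoningLevel param, which now maps to reasoningMode. A minimal sketch of the wiring (other runEmbeddedPiAgent params elided; deliverDraft is a hypothetical sink):

await runEmbeddedPiAgent({
  // ...required params elided for brevity...
  reasoningLevel: "stream",
  onReasoningStream: async ({ text }) => {
    // deliverDraft stands in for a transport-specific draft sink.
    if (text) await deliverDraft(text);
  },
});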

View File

@@ -147,7 +147,7 @@ describe("subscribeEmbeddedPiSession", () => {
runId: "run",
onBlockReply,
blockReplyBreak: "message_end",
-includeReasoning: true,
+reasoningMode: "on",
});
const assistantMessage = {

View File

@@ -1,7 +1,7 @@
import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage } from "@mariozechner/pi-ai";
import type { AgentSession } from "@mariozechner/pi-coding-agent";
import type { ReasoningLevel } from "../auto-reply/thinking.js";
import { formatToolAggregate } from "../auto-reply/tool-meta.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { createSubsystemLogger } from "../logging.js";
@@ -18,6 +18,8 @@ import {
const THINKING_TAG_RE = /<\s*\/?\s*think(?:ing)?\s*>/gi;
const THINKING_OPEN_RE = /<\s*think(?:ing)?\s*>/i;
const THINKING_CLOSE_RE = /<\s*\/\s*think(?:ing)?\s*>/i;
const THINKING_OPEN_GLOBAL_RE = /<\s*think(?:ing)?\s*>/gi;
const THINKING_CLOSE_GLOBAL_RE = /<\s*\/\s*think(?:ing)?\s*>/gi;
const TOOL_RESULT_MAX_CHARS = 8000;
const log = createSubsystemLogger("agent/embedded");
@@ -87,12 +89,16 @@ export function subscribeEmbeddedPiSession(params: {
session: AgentSession;
runId: string;
verboseLevel?: "off" | "on";
-includeReasoning?: boolean;
+reasoningMode?: ReasoningLevel;
shouldEmitToolResult?: () => boolean;
onToolResult?: (payload: {
text?: string;
mediaUrls?: string[];
}) => void | Promise<void>;
onReasoningStream?: (payload: {
text?: string;
mediaUrls?: string[];
}) => void | Promise<void>;
onBlockReply?: (payload: {
text?: string;
mediaUrls?: string[];
@@ -114,9 +120,15 @@ export function subscribeEmbeddedPiSession(params: {
const toolMetaById = new Map<string, string | undefined>();
const toolSummaryById = new Set<string>();
const blockReplyBreak = params.blockReplyBreak ?? "text_end";
const reasoningMode = params.reasoningMode ?? "off";
const includeReasoning = reasoningMode === "on";
const streamReasoning =
reasoningMode === "stream" &&
typeof params.onReasoningStream === "function";
let deltaBuffer = "";
let blockBuffer = "";
let lastStreamedAssistant: string | undefined;
let lastStreamedReasoning: string | undefined;
let lastBlockReplyText: string | undefined;
let assistantTextBaseline = 0;
let compactionInFlight = false;
@@ -238,6 +250,39 @@ export function subscribeEmbeddedPiSession(params: {
return result.trim();
};
const extractThinkingFromStream = (text: string): string => {
if (!text) return "";
const closed = extractThinkingFromText(text);
if (closed) return closed;
const openMatches = [...text.matchAll(THINKING_OPEN_GLOBAL_RE)];
if (openMatches.length === 0) return "";
const closeMatches = [...text.matchAll(THINKING_CLOSE_GLOBAL_RE)];
const lastOpen = openMatches[openMatches.length - 1];
const lastClose = closeMatches[closeMatches.length - 1];
if (lastClose && (lastClose.index ?? -1) > (lastOpen.index ?? -1)) {
return closed;
}
const start = (lastOpen.index ?? 0) + lastOpen[0].length;
return text.slice(start).trim();
};
const formatReasoningDraft = (text: string): string => {
const trimmed = text.trim();
if (!trimmed) return "";
return `Reasoning:\n${trimmed}`;
};
const emitReasoningStream = (text: string) => {
if (!streamReasoning || !params.onReasoningStream) return;
const formatted = formatReasoningDraft(text);
if (!formatted) return;
if (formatted === lastStreamedReasoning) return;
lastStreamedReasoning = formatted;
void params.onReasoningStream({
text: formatted,
});
};
const resetForCompactionRetry = () => {
assistantTexts.length = 0;
toolMetas.length = 0;
@@ -247,6 +292,7 @@ export function subscribeEmbeddedPiSession(params: {
blockBuffer = "";
blockChunker?.reset();
lastStreamedAssistant = undefined;
lastStreamedReasoning = undefined;
lastBlockReplyText = undefined;
assistantTextBaseline = 0;
};
@@ -266,6 +312,7 @@ export function subscribeEmbeddedPiSession(params: {
blockChunker?.reset();
lastStreamedAssistant = undefined;
lastBlockReplyText = undefined;
lastStreamedReasoning = undefined;
lastReasoningSent = undefined;
assistantTextBaseline = assistantTexts.length;
}
@@ -436,6 +483,11 @@ export function subscribeEmbeddedPiSession(params: {
}
}
if (streamReasoning) {
// Handle partial <think> tags: stream whatever reasoning is visible so far.
emitReasoningStream(extractThinkingFromStream(deltaBuffer));
}
const cleaned = params.enforceFinalTag
? stripThinkingSegments(stripUnpairedThinkingTags(deltaBuffer))
: stripThinkingSegments(deltaBuffer);
@@ -502,17 +554,19 @@ export function subscribeEmbeddedPiSession(params: {
params.enforceFinalTag && cleaned
? (extractFinalText(cleaned)?.trim() ?? cleaned)
: cleaned;
-const rawThinking = params.includeReasoning
-  ? extractAssistantThinking(assistantMessage) ||
-    extractThinkingFromText(rawText)
-  : "";
+const rawThinking =
+  includeReasoning || streamReasoning
+    ? extractAssistantThinking(assistantMessage) ||
+      extractThinkingFromText(rawText)
+    : "";
const formattedReasoning = rawThinking
  ? formatReasoningMarkdown(rawThinking)
  : "";
-const text =
-  baseText && formattedReasoning
-    ? `${formattedReasoning}\n\n${baseText}`
-    : formattedReasoning || baseText;
+const text = includeReasoning
+  ? baseText && formattedReasoning
+    ? `${formattedReasoning}\n\n${baseText}`
+    : formattedReasoning || baseText
+  : baseText;
const addedDuringMessage =
assistantTexts.length > assistantTextBaseline;
@@ -550,6 +604,7 @@ export function subscribeEmbeddedPiSession(params: {
}
const onBlockReply = params.onBlockReply;
const shouldEmitReasoningBlock =
includeReasoning &&
Boolean(formattedReasoning) &&
Boolean(onBlockReply) &&
formattedReasoning !== lastReasoningSent &&
@@ -558,6 +613,9 @@ export function subscribeEmbeddedPiSession(params: {
lastReasoningSent = formattedReasoning;
void onBlockReply({ text: formattedReasoning });
}
if (streamReasoning && rawThinking) {
emitReasoningStream(rawThinking);
}
deltaBuffer = "";
blockBuffer = "";
blockChunker?.reset();
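
Note: a rough trace of the partial-tag handling added above, assuming extractThinkingFromText returns the body of closed think tags (sketch, not part of the commit):

extractThinkingFromStream("<think>checking calendar");         // "checking calendar" (open tag, no close yet)
extractThinkingFromStream("<think>done</think> final answer"); // "done" (closed pair wins)
extractThinkingFromStream("no tags here");                     // "" (nothing to stream)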

View File

@@ -106,6 +106,12 @@ describe("directive parsing", () => {
expect(res.reasoningLevel).toBe("on");
});
it("matches reasoning stream directive", () => {
const res = extractReasoningDirective("/reasoning stream please");
expect(res.hasDirective).toBe(true);
expect(res.reasoningLevel).toBe("stream");
});
it("matches elevated with leading space", () => {
const res = extractElevatedDirective(" please /elevated on now");
expect(res.hasDirective).toBe(true);

View File

@@ -401,7 +401,8 @@ export async function getReplyFromConfig(
agentCfg?.blockStreamingBreak === "message_end"
? "message_end"
: "text_end";
-const blockStreamingEnabled = resolvedBlockStreaming === "on";
+const blockStreamingEnabled =
+  resolvedBlockStreaming === "on" && opts?.disableBlockStreaming !== true;
const blockReplyChunking = blockStreamingEnabled
? resolveBlockStreamingChunking(cfg, sessionCtx.Provider)
: undefined;

View File

@@ -197,6 +197,9 @@ export async function runReplyAgent(params: {
let fallbackProvider = followupRun.run.provider;
let fallbackModel = followupRun.run.model;
try {
const allowPartialStream = !(
followupRun.run.reasoningLevel === "stream" && opts?.onReasoningStream
);
const fallbackResult = await runWithModelFallback({
cfg: followupRun.run.config,
provider: followupRun.run.provider,
@@ -227,32 +230,41 @@ export async function runReplyAgent(params: {
runId,
blockReplyBreak: resolvedBlockStreamingBreak,
blockReplyChunking,
-onPartialReply: opts?.onPartialReply
-  ? async (payload) => {
-      let text = payload.text;
-      if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
-        const stripped = stripHeartbeatToken(text, {
-          mode: "message",
-        });
-        if (stripped.didStrip && !didLogHeartbeatStrip) {
-          didLogHeartbeatStrip = true;
-          logVerbose(
-            "Stripped stray HEARTBEAT_OK token from reply",
-          );
-        }
-        if (
-          stripped.shouldSkip &&
-          (payload.mediaUrls?.length ?? 0) === 0
-        ) {
-          return;
-        }
-        text = stripped.text;
-      }
-      if (!isHeartbeat) {
-        await typing.startTypingOnText(text);
-      }
-      await opts.onPartialReply?.({
-        text,
+onPartialReply:
+  opts?.onPartialReply && allowPartialStream
+    ? async (payload) => {
+        let text = payload.text;
+        if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
+          const stripped = stripHeartbeatToken(text, {
+            mode: "message",
+          });
+          if (stripped.didStrip && !didLogHeartbeatStrip) {
+            didLogHeartbeatStrip = true;
+            logVerbose(
+              "Stripped stray HEARTBEAT_OK token from reply",
+            );
+          }
+          if (
+            stripped.shouldSkip &&
+            (payload.mediaUrls?.length ?? 0) === 0
+          ) {
+            return;
+          }
+          text = stripped.text;
+        }
+        if (!isHeartbeat) {
+          await typing.startTypingOnText(text);
+        }
+        await opts.onPartialReply?.({
+          text,
+          mediaUrls: payload.mediaUrls,
+        });
+      }
+    : undefined,
+onReasoningStream: opts?.onReasoningStream
+  ? async (payload) => {
+      await opts.onReasoningStream?.({
+        text: payload.text,
+        mediaUrls: payload.mediaUrls,
+      });
+    }

View File

@@ -385,7 +385,7 @@ export async function handleDirectiveOnly(params: {
}
if (directives.hasReasoningDirective && !directives.reasoningLevel) {
return {
-text: `Unrecognized reasoning level "${directives.rawReasoningLevel ?? ""}". Valid levels: on, off.`,
+text: `Unrecognized reasoning level "${directives.rawReasoningLevel ?? ""}". Valid levels: on, off, stream.`,
};
}
if (directives.hasElevatedDirective && !directives.elevatedLevel) {
@@ -563,7 +563,9 @@ export async function handleDirectiveOnly(params: {
parts.push(
directives.reasoningLevel === "off"
? `${SYSTEM_MARK} Reasoning visibility disabled.`
-  : `${SYSTEM_MARK} Reasoning visibility enabled.`,
+  : directives.reasoningLevel === "stream"
+    ? `${SYSTEM_MARK} Reasoning stream enabled (Telegram only).`
+    : `${SYSTEM_MARK} Reasoning visibility enabled.`,
);
}
if (directives.hasElevatedDirective && directives.elevatedLevel) {

View File

@@ -17,4 +17,9 @@ describe("normalizeReasoningLevel", () => {
expect(normalizeReasoningLevel("show")).toBe("on");
expect(normalizeReasoningLevel("hide")).toBe("off");
});
it("accepts stream", () => {
expect(normalizeReasoningLevel("stream")).toBe("stream");
expect(normalizeReasoningLevel("streaming")).toBe("stream");
});
});

View File

@@ -1,7 +1,7 @@
export type ThinkLevel = "off" | "minimal" | "low" | "medium" | "high";
export type VerboseLevel = "off" | "on";
export type ElevatedLevel = "off" | "on";
-export type ReasoningLevel = "off" | "on";
+export type ReasoningLevel = "off" | "on" | "stream";
// Normalize user-provided thinking level strings to the canonical enum.
export function normalizeThinkLevel(
@@ -82,5 +82,6 @@ export function normalizeReasoningLevel(
)
)
return "on";
if (["stream", "streaming", "draft", "live"].includes(key)) return "stream";
return undefined;
}
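
Note: every alias funnels into the canonical level, and anything unrecognized falls through to undefined, which triggers the "Unrecognized reasoning level" reply above (sketch):

for (const alias of ["stream", "streaming", "draft", "live"]) {
  normalizeReasoningLevel(alias); // => "stream"
}
normalizeReasoningLevel("nope"); // => undefined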

View File

@@ -5,8 +5,10 @@ export type GetReplyOptions = {
onTypingController?: (typing: TypingController) => void;
isHeartbeat?: boolean;
onPartialReply?: (payload: ReplyPayload) => Promise<void> | void;
onReasoningStream?: (payload: ReplyPayload) => Promise<void> | void;
onBlockReply?: (payload: ReplyPayload) => Promise<void> | void;
onToolResult?: (payload: ReplyPayload) => Promise<void> | void;
disableBlockStreaming?: boolean;
};
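
Note: a channel adapter that can render drafts would wire these options roughly the way the Telegram bot does below (sketch; drafts is a hypothetical draft sink):

const replyOptions: GetReplyOptions = {
  onPartialReply: (p) => drafts.update(p.text ?? ""),
  onReasoningStream: (p) => drafts.update(p.text ?? ""),
  // The draft already streams; block streaming would double-deliver.
  disableBlockStreaming: true,
};
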
export type ReplyPayload = {

View File

@@ -713,6 +713,16 @@ describe("legacy config detection", () => {
}
});
it("defaults telegram.streamMode to partial when telegram section exists", async () => {
vi.resetModules();
const { validateConfigObject } = await import("./config.js");
const res = validateConfigObject({ telegram: {} });
expect(res.ok).toBe(true);
if (res.ok) {
expect(res.config.telegram?.streamMode).toBe("partial");
}
});
it('rejects whatsapp.dmPolicy="open" without allowFrom "*"', async () => {
vi.resetModules();
const { validateConfigObject } = await import("./config.js");

View File

@@ -107,6 +107,7 @@ const FIELD_LABELS: Record<string, string> = {
"talk.apiKey": "Talk API Key",
"telegram.botToken": "Telegram Bot Token",
"telegram.dmPolicy": "Telegram DM Policy",
"telegram.streamMode": "Telegram Stream Mode",
"whatsapp.dmPolicy": "WhatsApp DM Policy",
"signal.dmPolicy": "Signal DM Policy",
"imessage.dmPolicy": "iMessage DM Policy",
@@ -155,6 +156,8 @@ const FIELD_HELP: Record<string, string> = {
'When to send ack reactions ("group-mentions", "group-all", "direct", "all").',
"telegram.dmPolicy":
'Direct message access control ("pairing" recommended). "open" requires telegram.allowFrom=["*"].',
"telegram.streamMode":
"Draft streaming mode for Telegram replies (off | partial | block). Requires private topics + sendMessageDraft.",
"whatsapp.dmPolicy":
'Direct message access control ("pairing" recommended). "open" requires whatsapp.allowFrom=["*"].',
"signal.dmPolicy":

View File

@@ -270,6 +270,8 @@ export type TelegramConfig = {
groupPolicy?: GroupPolicy;
/** Outbound text chunk size (chars). Default: 4000. */
textChunkLimit?: number;
/** Draft streaming mode for Telegram (off|partial|block). Default: partial. */
streamMode?: "off" | "partial" | "block";
mediaMaxMb?: number;
proxy?: string;
webhookUrl?: string;

View File

@@ -793,6 +793,10 @@ export const ClawdbotSchema = z.object({
groupAllowFrom: z.array(z.union([z.string(), z.number()])).optional(),
groupPolicy: GroupPolicySchema.optional().default("open"),
textChunkLimit: z.number().int().positive().optional(),
streamMode: z
.enum(["off", "partial", "block"])
.optional()
.default("partial"),
mediaMaxMb: z.number().positive().optional(),
proxy: z.string().optional(),
webhookUrl: z.string().optional(),
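
Note: with this default, telegram.streamMode resolves to "partial" whenever a telegram section exists (matching the config test above). Opting out is a one-liner (sketch):

// A TelegramConfig that disables draft streaming.
const telegram: TelegramConfig = {
  streamMode: "off", // default "partial"; "block" routes deltas through the block chunker
};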

View File

@@ -446,7 +446,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
ok: false,
error: {
code: ErrorCodes.INVALID_REQUEST,
-message: `invalid reasoningLevel: ${String(raw)}`,
+message: `invalid reasoningLevel: ${String(raw)} (use on|off|stream)`,
},
};
}

View File

@@ -224,7 +224,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
-'invalid reasoningLevel (use "on"|"off")',
+'invalid reasoningLevel (use "on"|"off"|"stream")',
),
);
return;

View File

@@ -4,6 +4,7 @@ import { Buffer } from "node:buffer";
import { apiThrottler } from "@grammyjs/transformer-throttler";
import type { ApiClientOptions, Message } from "grammy";
import { Bot, InputFile, webhookCallback } from "grammy";
import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js";
import {
chunkMarkdownText,
resolveTextChunkLimit,
@@ -14,6 +15,7 @@ import {
listNativeCommandSpecs,
} from "../auto-reply/commands-registry.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import { resolveBlockStreamingChunking } from "../auto-reply/reply/block-streaming.js";
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
import {
buildMentionRegexes,
@@ -43,6 +45,7 @@ import {
import { resolveAgentRoute } from "../routing/resolve-route.js";
import type { RuntimeEnv } from "../runtime.js";
import { loadWebMedia } from "../web/media.js";
import { createTelegramDraftStream } from "./draft-stream.js";
import {
readTelegramAllowFromStore,
upsertTelegramPairingRequest,
@@ -57,6 +60,8 @@ const MEDIA_GROUP_TIMEOUT_MS = 500;
type TelegramMessage = Message.CommonMessage;
type TelegramStreamMode = "off" | "partial" | "block";
type MediaGroupEntry = {
messages: Array<{
msg: TelegramMessage;
@@ -164,6 +169,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
);
};
const replyToMode = opts.replyToMode ?? cfg.telegram?.replyToMode ?? "off";
const streamMode = resolveTelegramStreamMode(cfg);
const nativeEnabled = cfg.commands?.native === true;
const nativeDisabledExplicit = cfg.commands?.native === false;
const useAccessGroups = cfg.commands?.useAccessGroups !== false;
@@ -173,6 +179,23 @@ export function createTelegramBot(opts: TelegramBotOptions) {
(opts.mediaMaxMb ?? cfg.telegram?.mediaMaxMb ?? 5) * 1024 * 1024;
const logger = getChildLogger({ module: "telegram-auto-reply" });
const mentionRegexes = buildMentionRegexes(cfg);
let botHasTopicsEnabled: boolean | undefined;
const resolveBotTopicsEnabled = async (ctx?: TelegramContext) => {
const fromCtx = ctx?.me as { has_topics_enabled?: boolean } | undefined;
if (typeof fromCtx?.has_topics_enabled === "boolean") {
botHasTopicsEnabled = fromCtx.has_topics_enabled;
return botHasTopicsEnabled;
}
if (typeof botHasTopicsEnabled === "boolean") return botHasTopicsEnabled;
try {
const me = (await bot.api.getMe()) as { has_topics_enabled?: boolean };
botHasTopicsEnabled = Boolean(me?.has_topics_enabled);
} catch (err) {
logVerbose(`telegram getMe failed: ${String(err)}`);
botHasTopicsEnabled = false;
}
return botHasTopicsEnabled;
};
const resolveGroupPolicy = (chatId: string | number) =>
resolveProviderGroupPolicy({
cfg,
@@ -397,7 +420,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
kind: isGroup ? "group" : "dm",
id: isGroup
? buildTelegramGroupPeerId(chatId, messageThreadId)
-: String(chatId),
+: buildTelegramDmPeerId(chatId, messageThreadId),
},
});
const ctxPayload = {
@@ -471,10 +494,88 @@ export function createTelegramBot(opts: TelegramBotOptions) {
);
}
const isPrivateChat = msg.chat.type === "private";
const draftMaxChars = Math.min(textLimit, 4096);
const canStreamDraft =
streamMode !== "off" &&
isPrivateChat &&
typeof messageThreadId === "number" &&
(await resolveBotTopicsEnabled(primaryCtx));
const draftStream = canStreamDraft
? createTelegramDraftStream({
api: bot.api,
chatId,
draftId: msg.message_id || Date.now(),
maxChars: draftMaxChars,
messageThreadId,
log: logVerbose,
warn: logVerbose,
})
: undefined;
const draftChunking =
draftStream && streamMode === "block"
? resolveBlockStreamingChunking(cfg, "telegram")
: undefined;
const draftChunker = draftChunking
? new EmbeddedBlockChunker(draftChunking)
: undefined;
let lastPartialText = "";
let draftText = "";
const updateDraftFromPartial = (text?: string) => {
if (!draftStream || !text) return;
if (text === lastPartialText) return;
if (streamMode === "partial") {
lastPartialText = text;
draftStream.update(text);
return;
}
let delta = text;
if (text.startsWith(lastPartialText)) {
delta = text.slice(lastPartialText.length);
} else {
// Streaming buffer reset (or non-monotonic stream). Start fresh.
draftChunker?.reset();
draftText = "";
}
lastPartialText = text;
if (!delta) return;
if (!draftChunker) {
draftText = text;
draftStream.update(draftText);
return;
}
draftChunker.append(delta);
draftChunker.drain({
force: false,
emit: (chunk) => {
draftText += chunk;
draftStream.update(draftText);
},
});
};
const flushDraft = async () => {
if (!draftStream) return;
if (draftChunker?.hasBuffered()) {
draftChunker.drain({
force: true,
emit: (chunk) => {
draftText += chunk;
},
});
draftChunker.reset();
if (draftText) draftStream.update(draftText);
}
await draftStream.flush();
};
const { dispatcher, replyOptions, markDispatchIdle } =
createReplyDispatcherWithTyping({
responsePrefix: cfg.messages?.responsePrefix,
-deliver: async (payload) => {
+deliver: async (payload, info) => {
+  if (info.kind === "final") {
+    await flushDraft();
+    draftStream?.stop();
+  }
await deliverReplies({
replies: [payload],
chatId: String(chatId),
@@ -498,9 +599,21 @@ export function createTelegramBot(opts: TelegramBotOptions) {
ctx: ctxPayload,
cfg,
dispatcher,
-replyOptions,
+replyOptions: {
+  ...replyOptions,
+  onPartialReply: draftStream
+    ? (payload) => updateDraftFromPartial(payload.text)
+    : undefined,
+  onReasoningStream: draftStream
+    ? (payload) => {
+        if (payload.text) draftStream.update(payload.text);
+      }
+    : undefined,
+  disableBlockStreaming: Boolean(draftStream),
+},
});
markDispatchIdle();
draftStream?.stop();
if (!queuedFinal) return;
};
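
Note: the delta handling in updateDraftFromPartial reduces to a prefix check on successive partial snapshots. A self-contained sketch of that rule (hypothetical helper, not in the commit):

function computeDelta(prev: string, next: string): { delta: string; reset: boolean } {
  // Monotonic stream: append only what is new.
  if (next.startsWith(prev)) return { delta: next.slice(prev.length), reset: false };
  // Non-monotonic stream (e.g. buffer reset): start over from the full snapshot.
  return { delta: next, reset: true };
}

computeDelta("Hello", "Hello, world"); // { delta: ", world", reset: false }
computeDelta("Hello", "Goodbye");      // { delta: "Goodbye", reset: true }
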
@@ -602,7 +715,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
kind: isGroup ? "group" : "dm",
id: isGroup
? buildTelegramGroupPeerId(chatId, messageThreadId)
-: String(chatId),
+: buildTelegramDmPeerId(chatId, messageThreadId),
},
});
const ctxPayload = {
@@ -925,6 +1038,14 @@ function buildTelegramThreadParams(messageThreadId?: number) {
: undefined;
}
function resolveTelegramStreamMode(
cfg: ReturnType<typeof loadConfig>,
): TelegramStreamMode {
const raw = cfg.telegram?.streamMode?.trim().toLowerCase();
if (raw === "off" || raw === "partial" || raw === "block") return raw;
return "partial";
}
function buildTelegramGroupPeerId(
chatId: number | string,
messageThreadId?: number,
@@ -934,6 +1055,15 @@ function buildTelegramGroupPeerId(
: String(chatId);
}
function buildTelegramDmPeerId(
chatId: number | string,
messageThreadId?: number,
) {
return messageThreadId != null
? `${chatId}:topic:${messageThreadId}`
: String(chatId);
}
function buildTelegramGroupFrom(
chatId: number | string,
messageThreadId?: number,

View File

@@ -0,0 +1,130 @@
import type { Bot } from "grammy";
const TELEGRAM_DRAFT_MAX_CHARS = 4096;
const DEFAULT_THROTTLE_MS = 300;
export type TelegramDraftStream = {
update: (text: string) => void;
flush: () => Promise<void>;
stop: () => void;
};
export function createTelegramDraftStream(params: {
api: Bot["api"];
chatId: number;
draftId: number;
maxChars?: number;
messageThreadId?: number;
throttleMs?: number;
log?: (message: string) => void;
warn?: (message: string) => void;
}): TelegramDraftStream {
const maxChars = Math.min(
params.maxChars ?? TELEGRAM_DRAFT_MAX_CHARS,
TELEGRAM_DRAFT_MAX_CHARS,
);
const throttleMs = Math.max(50, params.throttleMs ?? DEFAULT_THROTTLE_MS);
const rawDraftId = Number.isFinite(params.draftId)
? Math.trunc(params.draftId)
: 1;
const draftId = rawDraftId === 0 ? 1 : Math.abs(rawDraftId);
const chatId = params.chatId;
const threadParams =
typeof params.messageThreadId === "number"
? { message_thread_id: Math.trunc(params.messageThreadId) }
: undefined;
let lastSentText = "";
let lastSentAt = 0;
let pendingText = "";
let inFlight = false;
let timer: ReturnType<typeof setTimeout> | undefined;
let stopped = false;
const sendDraft = async (text: string) => {
if (stopped) return;
const trimmed = text.trimEnd();
if (!trimmed) return;
if (trimmed.length > maxChars) {
// Drafts are capped at 4096 chars. Stop streaming once we exceed the cap
// so we don't keep sending failing updates or a truncated preview.
stopped = true;
params.warn?.(
`telegram draft stream stopped (draft length ${trimmed.length} > ${maxChars})`,
);
return;
}
if (trimmed === lastSentText) return;
lastSentText = trimmed;
lastSentAt = Date.now();
try {
await params.api.sendMessageDraft(chatId, draftId, trimmed, threadParams);
} catch (err) {
stopped = true;
params.warn?.(
`telegram draft stream failed: ${err instanceof Error ? err.message : String(err)}`,
);
}
};
const flush = async () => {
if (timer) {
clearTimeout(timer);
timer = undefined;
}
if (inFlight) {
schedule();
return;
}
const text = pendingText;
pendingText = "";
if (!text.trim()) {
if (pendingText) schedule();
return;
}
inFlight = true;
try {
await sendDraft(text);
} finally {
inFlight = false;
}
if (pendingText) schedule();
};
const schedule = () => {
if (timer) return;
const delay = Math.max(0, throttleMs - (Date.now() - lastSentAt));
timer = setTimeout(() => {
void flush();
}, delay);
};
const update = (text: string) => {
if (stopped) return;
pendingText = text;
if (inFlight) {
schedule();
return;
}
if (!timer && Date.now() - lastSentAt >= throttleMs) {
void flush();
return;
}
schedule();
};
const stop = () => {
stopped = true;
pendingText = "";
if (timer) {
clearTimeout(timer);
timer = undefined;
}
};
params.log?.(
`telegram draft stream ready (draftId=${draftId}, maxChars=${maxChars}, throttleMs=${throttleMs})`,
);
return { update, flush, stop };
}
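
Note: a usage sketch (hypothetical chat/thread ids; grammY Bot construction elided). The bot above only creates the stream for private chats with a numeric messageThreadId, since drafts require topics:

const draft = createTelegramDraftStream({
  api: bot.api,       // an already-constructed grammY Bot
  chatId: 123456789,  // hypothetical private chat id
  draftId: 42,        // the bot reuses the triggering message_id
  messageThreadId: 7, // hypothetical topic thread id
});

draft.update("Working on it");
draft.update("Working on it…"); // coalesced by the throttle window
await draft.flush();            // push whatever is still pending
draft.stop();                   // final reply sent; drop any queued draft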