fix(telegram): prevent stuck typing after tool runs
This commit is contained in:
@@ -314,6 +314,9 @@ export async function runReplyAgent(params: {
|
|||||||
shouldEmitToolResult,
|
shouldEmitToolResult,
|
||||||
onToolResult: opts?.onToolResult
|
onToolResult: opts?.onToolResult
|
||||||
? (payload) => {
|
? (payload) => {
|
||||||
|
// `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them.
|
||||||
|
// If a tool callback starts typing after the run finalized, we can end up with
|
||||||
|
// a typing loop that never sees a matching markRunComplete(). Track and drain.
|
||||||
const task = (async () => {
|
const task = (async () => {
|
||||||
let text = payload.text;
|
let text = payload.text;
|
||||||
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||||
@@ -384,13 +387,16 @@ export async function runReplyAgent(params: {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const payloadArray = runResult.payloads ?? [];
|
const payloadArray = runResult.payloads ?? [];
|
||||||
if (payloadArray.length === 0) return finalizeWithFollowup(undefined);
|
|
||||||
if (pendingBlockTasks.size > 0) {
|
if (pendingBlockTasks.size > 0) {
|
||||||
await Promise.allSettled(pendingBlockTasks);
|
await Promise.allSettled(pendingBlockTasks);
|
||||||
}
|
}
|
||||||
if (pendingToolTasks.size > 0) {
|
if (pendingToolTasks.size > 0) {
|
||||||
await Promise.allSettled(pendingToolTasks);
|
await Promise.allSettled(pendingToolTasks);
|
||||||
}
|
}
|
||||||
|
// Drain any late tool/block deliveries before deciding there's "nothing to send".
|
||||||
|
// Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and
|
||||||
|
// keep the typing indicator stuck.
|
||||||
|
if (payloadArray.length === 0) return finalizeWithFollowup(undefined);
|
||||||
|
|
||||||
const sanitizedPayloads = isHeartbeat
|
const sanitizedPayloads = isHeartbeat
|
||||||
? payloadArray
|
? payloadArray
|
||||||
|
|||||||
@@ -51,4 +51,28 @@ describe("typing controller", () => {
|
|||||||
vi.advanceTimersByTime(2_000);
|
vi.advanceTimersByTime(2_000);
|
||||||
expect(onReplyStart).toHaveBeenCalledTimes(3);
|
expect(onReplyStart).toHaveBeenCalledTimes(3);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("does not restart typing after it has stopped", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const onReplyStart = vi.fn(async () => {});
|
||||||
|
const typing = createTypingController({
|
||||||
|
onReplyStart,
|
||||||
|
typingIntervalSeconds: 1,
|
||||||
|
typingTtlMs: 30_000,
|
||||||
|
});
|
||||||
|
|
||||||
|
await typing.startTypingLoop();
|
||||||
|
expect(onReplyStart).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
typing.markRunComplete();
|
||||||
|
typing.markDispatchIdle();
|
||||||
|
|
||||||
|
vi.advanceTimersByTime(5_000);
|
||||||
|
expect(onReplyStart).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
// Late callbacks should be ignored and must not restart the interval.
|
||||||
|
await typing.startTypingOnText("late tool result");
|
||||||
|
vi.advanceTimersByTime(5_000);
|
||||||
|
expect(onReplyStart).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -26,6 +26,10 @@ export function createTypingController(params: {
|
|||||||
let active = false;
|
let active = false;
|
||||||
let runComplete = false;
|
let runComplete = false;
|
||||||
let dispatchIdle = false;
|
let dispatchIdle = false;
|
||||||
|
// Important: callbacks (tool/block streaming) can fire late (after the run completed),
|
||||||
|
// especially when upstream event emitters don't await async listeners.
|
||||||
|
// Once we stop typing, we "seal" the controller so late events can't restart typing forever.
|
||||||
|
let sealed = false;
|
||||||
let typingTimer: NodeJS.Timeout | undefined;
|
let typingTimer: NodeJS.Timeout | undefined;
|
||||||
let typingTtlTimer: NodeJS.Timeout | undefined;
|
let typingTtlTimer: NodeJS.Timeout | undefined;
|
||||||
const typingIntervalMs = typingIntervalSeconds * 1000;
|
const typingIntervalMs = typingIntervalSeconds * 1000;
|
||||||
@@ -43,6 +47,7 @@ export function createTypingController(params: {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const cleanup = () => {
|
const cleanup = () => {
|
||||||
|
if (sealed) return;
|
||||||
if (typingTtlTimer) {
|
if (typingTtlTimer) {
|
||||||
clearTimeout(typingTtlTimer);
|
clearTimeout(typingTtlTimer);
|
||||||
typingTtlTimer = undefined;
|
typingTtlTimer = undefined;
|
||||||
@@ -52,9 +57,11 @@ export function createTypingController(params: {
|
|||||||
typingTimer = undefined;
|
typingTimer = undefined;
|
||||||
}
|
}
|
||||||
resetCycle();
|
resetCycle();
|
||||||
|
sealed = true;
|
||||||
};
|
};
|
||||||
|
|
||||||
const refreshTypingTtl = () => {
|
const refreshTypingTtl = () => {
|
||||||
|
if (sealed) return;
|
||||||
if (!typingIntervalMs || typingIntervalMs <= 0) return;
|
if (!typingIntervalMs || typingIntervalMs <= 0) return;
|
||||||
if (typingTtlMs <= 0) return;
|
if (typingTtlMs <= 0) return;
|
||||||
if (typingTtlTimer) {
|
if (typingTtlTimer) {
|
||||||
@@ -70,10 +77,14 @@ export function createTypingController(params: {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const triggerTyping = async () => {
|
const triggerTyping = async () => {
|
||||||
|
if (sealed) return;
|
||||||
await onReplyStart?.();
|
await onReplyStart?.();
|
||||||
};
|
};
|
||||||
|
|
||||||
const ensureStart = async () => {
|
const ensureStart = async () => {
|
||||||
|
if (sealed) return;
|
||||||
|
// Late callbacks after a run completed should never restart typing.
|
||||||
|
if (runComplete) return;
|
||||||
if (!active) {
|
if (!active) {
|
||||||
active = true;
|
active = true;
|
||||||
}
|
}
|
||||||
@@ -89,6 +100,7 @@ export function createTypingController(params: {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const startTypingLoop = async () => {
|
const startTypingLoop = async () => {
|
||||||
|
if (sealed) return;
|
||||||
if (!onReplyStart) return;
|
if (!onReplyStart) return;
|
||||||
if (typingIntervalMs <= 0) return;
|
if (typingIntervalMs <= 0) return;
|
||||||
if (typingTimer) return;
|
if (typingTimer) return;
|
||||||
@@ -100,6 +112,7 @@ export function createTypingController(params: {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const startTypingOnText = async (text?: string) => {
|
const startTypingOnText = async (text?: string) => {
|
||||||
|
if (sealed) return;
|
||||||
const trimmed = text?.trim();
|
const trimmed = text?.trim();
|
||||||
if (!trimmed) return;
|
if (!trimmed) return;
|
||||||
if (silentToken && trimmed === silentToken) return;
|
if (silentToken && trimmed === silentToken) return;
|
||||||
|
|||||||
Reference in New Issue
Block a user