refactor: share queue helpers

Co-authored-by: adam91holt <adam91holt@users.noreply.github.com>
This commit is contained in:
Peter Steinberger
2026-01-17 06:01:37 +00:00
parent 285ed8bac3
commit c7ae5100fa
4 changed files with 149 additions and 173 deletions

View File

@@ -1,84 +1,14 @@
import { defaultRuntime } from "../../../runtime.js";
import {
buildCollectPrompt,
buildQueueSummaryPrompt,
hasCrossChannelItems,
waitForQueueDebounce,
} from "../../../utils/queue-helpers.js";
import { isRoutableChannel } from "../route-reply.js";
import { FOLLOWUP_QUEUES } from "./state.js";
import type { FollowupRun } from "./types.js";
async function waitForQueueDebounce(queue: { debounceMs: number; lastEnqueuedAt: number }) {
const debounceMs = Math.max(0, queue.debounceMs);
if (debounceMs <= 0) return;
while (true) {
const since = Date.now() - queue.lastEnqueuedAt;
if (since >= debounceMs) return;
await new Promise((resolve) => setTimeout(resolve, debounceMs - since));
}
}
function buildSummaryPrompt(queue: {
dropPolicy: "summarize" | "old" | "new";
droppedCount: number;
summaryLines: string[];
}): string | undefined {
if (queue.dropPolicy !== "summarize" || queue.droppedCount <= 0) {
return undefined;
}
const lines = [
`[Queue overflow] Dropped ${queue.droppedCount} message${queue.droppedCount === 1 ? "" : "s"} due to cap.`,
];
if (queue.summaryLines.length > 0) {
lines.push("Summary:");
for (const line of queue.summaryLines) {
lines.push(`- ${line}`);
}
}
queue.droppedCount = 0;
queue.summaryLines = [];
return lines.join("\\n");
}
function buildCollectPrompt(items: FollowupRun[], summary?: string): string {
const blocks: string[] = ["[Queued messages while agent was busy]"];
if (summary) blocks.push(summary);
items.forEach((item, idx) => {
blocks.push(`---\\nQueued #${idx + 1}\\n${item.prompt}`.trim());
});
return blocks.join("\\n\\n");
}
/**
* Checks if queued items have different routable originating channels.
*
* Returns true if messages come from different channels (e.g., Slack + Telegram),
* meaning they cannot be safely collected into one prompt without losing routing.
* Also returns true for a mix of routable and non-routable channels.
*/
function hasCrossChannelItems(items: FollowupRun[]): boolean {
const keys = new Set<string>();
let hasUnkeyed = false;
for (const item of items) {
const channel = item.originatingChannel;
const to = item.originatingTo;
const accountId = item.originatingAccountId;
const threadId = item.originatingThreadId;
if (!channel && !to && !accountId && typeof threadId !== "number") {
hasUnkeyed = true;
continue;
}
if (!isRoutableChannel(channel) || !to) {
return true;
}
keys.add(
[channel, to, accountId || "", typeof threadId === "number" ? String(threadId) : ""].join(
"|",
),
);
}
if (keys.size === 0) return false;
if (hasUnkeyed) return true;
return keys.size > 1;
}
export function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
@@ -105,7 +35,22 @@ export function scheduleFollowupDrain(
// Check if messages span multiple channels.
// If so, process individually to preserve per-message routing.
const isCrossChannel = hasCrossChannelItems(queue.items);
const isCrossChannel = hasCrossChannelItems(queue.items, (item) => {
const channel = item.originatingChannel;
const to = item.originatingTo;
const accountId = item.originatingAccountId;
const threadId = item.originatingThreadId;
if (!channel && !to && !accountId && typeof threadId !== "number") {
return {};
}
if (!isRoutableChannel(channel) || !to) {
return { cross: true };
}
const threadKey = typeof threadId === "number" ? String(threadId) : "";
return {
key: [channel, to, accountId || "", threadKey].join("|"),
};
});
if (isCrossChannel) {
forceIndividualCollect = true;
@@ -116,7 +61,7 @@ export function scheduleFollowupDrain(
}
const items = queue.items.splice(0, queue.items.length);
const summary = buildSummaryPrompt(queue);
const summary = buildQueueSummaryPrompt({ state: queue, noun: "message" });
const run = items.at(-1)?.run ?? queue.lastRun;
if (!run) break;
@@ -130,7 +75,12 @@ export function scheduleFollowupDrain(
(i) => typeof i.originatingThreadId === "number",
)?.originatingThreadId;
const prompt = buildCollectPrompt(items, summary);
const prompt = buildCollectPrompt({
title: "[Queued messages while agent was busy]",
items,
summary,
renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(),
});
await runFollowup({
prompt,
run,
@@ -143,7 +93,7 @@ export function scheduleFollowupDrain(
continue;
}
const summaryPrompt = buildSummaryPrompt(queue);
const summaryPrompt = buildQueueSummaryPrompt({ state: queue, noun: "message" });
if (summaryPrompt) {
const run = queue.lastRun;
if (!run) break;

View File

@@ -1,17 +1,7 @@
import { buildQueueSummaryLine } from "../../../utils/queue-helpers.js";
import { FOLLOWUP_QUEUES, getFollowupQueue } from "./state.js";
import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js";
function elideText(text: string, limit = 140): string {
if (text.length <= limit) return text;
return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}`;
}
function buildQueueSummaryLine(run: FollowupRun): string {
const base = run.summaryLine?.trim() || run.prompt.trim();
const cleaned = base.replace(/\\s+/g, " ").trim();
return elideText(cleaned, 160);
}
function isRunAlreadyQueued(
run: FollowupRun,
items: FollowupRun[],
@@ -62,7 +52,8 @@ export function enqueueFollowupRun(
if (queue.dropPolicy === "summarize") {
for (const item of dropped) {
queue.droppedCount += 1;
queue.summaryLines.push(buildQueueSummaryLine(item));
const base = item.summaryLine?.trim() || item.prompt.trim();
queue.summaryLines.push(buildQueueSummaryLine(base));
}
while (queue.summaryLines.length > cap) queue.summaryLines.shift();
}