feat: unify provider history context

This commit is contained in:
Peter Steinberger
2026-01-10 18:53:33 +01:00
parent 8c1d39064d
commit d41372b9d9
19 changed files with 718 additions and 80 deletions

View File

@@ -1,5 +1,7 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { HISTORY_CONTEXT_MARKER } from "../auto-reply/reply/history.js";
import { CURRENT_MESSAGE_MARKER } from "../auto-reply/reply/mentions.js";
import { monitorSlackProvider } from "./monitor.js";
const sendMock = vi.fn();
@@ -218,6 +220,68 @@ describe("monitorSlackProvider tool results", () => {
expect(sendMock.mock.calls[1][1]).toBe("final reply");
});
it("wraps room history in Body and preserves RawBody", async () => {
config = {
messages: { ackReactionScope: "group-mentions" },
slack: {
historyLimit: 5,
dm: { enabled: true, policy: "open", allowFrom: ["*"] },
channels: { "*": { requireMention: false } },
},
};
let capturedCtx: { Body?: string; RawBody?: string; CommandBody?: string } =
{};
replyMock.mockImplementation(async (ctx) => {
capturedCtx = ctx ?? {};
return undefined;
});
const controller = new AbortController();
const run = monitorSlackProvider({
botToken: "bot-token",
appToken: "app-token",
abortSignal: controller.signal,
});
await waitForEvent("message");
const handler = getSlackHandlers()?.get("message");
if (!handler) throw new Error("Slack message handler not registered");
await handler({
event: {
type: "message",
user: "U1",
text: "first",
ts: "123",
channel: "C1",
channel_type: "channel",
},
});
await handler({
event: {
type: "message",
user: "U2",
text: "second",
ts: "124",
channel: "C1",
channel_type: "channel",
},
});
await flush();
controller.abort();
await run;
expect(replyMock).toHaveBeenCalledTimes(2);
expect(capturedCtx.Body).toContain(HISTORY_CONTEXT_MARKER);
expect(capturedCtx.Body).toContain(CURRENT_MESSAGE_MARKER);
expect(capturedCtx.Body).toContain("first");
expect(capturedCtx.RawBody).toBe("second");
expect(capturedCtx.CommandBody).toBe("second");
});
it("updates assistant thread status when replies start", async () => {
replyMock.mockImplementation(async (_ctx, opts) => {
await opts?.onReplyStart?.();

View File

@@ -28,6 +28,12 @@ import {
buildMentionRegexes,
matchesMentionPatterns,
} from "../auto-reply/reply/mentions.js";
import {
DEFAULT_GROUP_HISTORY_LIMIT,
appendHistoryEntry,
buildHistoryContextFromEntries,
type HistoryEntry,
} from "../auto-reply/reply/history.js";
import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
@@ -430,6 +436,13 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
cfg,
accountId: opts.accountId,
});
const historyLimit = Math.max(
0,
account.config.historyLimit ??
cfg.messages?.groupChat?.historyLimit ??
DEFAULT_GROUP_HISTORY_LIMIT,
);
const channelHistories = new Map<string, HistoryEntry[]>();
const sessionCfg = cfg.session;
const sessionScope = sessionCfg?.scope ?? "per-sender";
const mainKey = normalizeMainKey(sessionCfg?.mainKey);
@@ -954,6 +967,23 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
: null;
const roomLabel = channelName ? `#${channelName}` : `#${message.channel}`;
const isRoomish = isRoom || isGroupDm;
const historyKey = message.channel;
if (isRoomish && historyLimit > 0) {
appendHistoryEntry({
historyMap: channelHistories,
historyKey,
limit: historyLimit,
entry: {
sender: senderName,
body: rawBody,
timestamp: message.ts
? Math.round(Number(message.ts) * 1000)
: undefined,
messageId: message.ts,
},
});
}
const preview = rawBody.replace(/\s+/g, " ").slice(0, 160);
const inboundLabel = isDirectMessage
@@ -989,7 +1019,25 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
body: textWithId,
});
const isRoomish = isRoom || isGroupDm;
let combinedBody = body;
if (isRoomish && historyLimit > 0) {
const history = channelHistories.get(historyKey) ?? [];
combinedBody = buildHistoryContextFromEntries({
entries: history,
currentMessage: combinedBody,
formatEntry: (entry) =>
formatAgentEnvelope({
provider: "Slack",
from: roomLabel,
timestamp: entry.timestamp,
body: `${entry.sender}: ${entry.body}${
entry.messageId
? ` [id:${entry.messageId} channel:${message.channel}]`
: ""
}`,
}),
});
}
const slackTo = isDirectMessage
? `user:${message.user}`
: `channel:${message.channel}`;
@@ -1033,7 +1081,9 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
}
}
const ctxPayload = {
Body: body,
Body: combinedBody,
RawBody: rawBody,
CommandBody: rawBody,
From: slackFrom,
To: slackTo,
SessionKey: sessionKey,
@@ -1106,6 +1156,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
status: "is typing...",
});
};
let didSendReply = false;
const { dispatcher, replyOptions, markDispatchIdle } =
createReplyDispatcherWithTyping({
responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId)
@@ -1127,6 +1178,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
textLimit,
replyThreadTs: effectiveThreadTs,
});
didSendReply = true;
hasRepliedRef.value = true;
},
onError: (err, info) => {
@@ -1166,7 +1218,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
status: "",
});
}
if (!queuedFinal) return;
if (!queuedFinal) {
if (isRoomish && historyLimit > 0 && didSendReply) {
channelHistories.set(historyKey, []);
}
return;
}
if (shouldLogVerbose()) {
const finalCount = counts.final;
logVerbose(
@@ -1187,6 +1244,9 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
});
});
}
if (isRoomish && historyLimit > 0 && didSendReply) {
channelHistories.set(historyKey, []);
}
};
app.event(