fix(chat): reduce system spam and cap history

This commit is contained in:
Peter Steinberger
2025-12-16 20:35:03 +01:00
parent 49a9f74753
commit e1e3da946f
4 changed files with 202 additions and 46 deletions

View File

@@ -676,23 +676,24 @@ export async function getReplyFromConfig(
}
}
// Prepend queued system events and (for new main sessions) a provider snapshot.
// For new main sessions, prepend a provider snapshot.
// Note: We intentionally do NOT prepend queued system events to the user prompt,
// since that bloats session logs (token cost) and clutters chat history.
const isGroupSession =
typeof ctx.From === "string" &&
(ctx.From.includes("@g.us") || ctx.From.startsWith("group:"));
const isMainSession =
!isGroupSession && sessionKey === (sessionCfg?.mainKey ?? "main");
if (isMainSession) {
const systemLines: string[] = [];
const queued = drainSystemEvents();
systemLines.push(...queued);
// Drain (discard) queued system events so they remain ephemeral.
// They are still available via presence/health in the gateway UI.
drainSystemEvents();
if (isNewSession) {
const summary = await buildProviderSummary(cfg);
if (summary.length > 0) systemLines.unshift(...summary);
}
if (systemLines.length > 0) {
const block = systemLines.map((l) => `System: ${l}`).join("\n");
prefixedBodyBase = `${block}\n\n${prefixedBodyBase}`;
if (summary.length > 0) {
const block = summary.map((l) => `System: ${l}`).join("\n");
prefixedBodyBase = `${block}\n\n${prefixedBodyBase}`;
}
}
}
if (

View File

@@ -1832,14 +1832,10 @@ describe("gateway server", () => {
expect(cappedMsgs.length).toBe(200);
expect(firstContentText(cappedMsgs[0])).toBe("b1300");
const maxRes = await rpcReq<{ messages?: unknown[] }>(
ws,
"chat.history",
{
sessionKey: "main",
limit: 1000,
},
);
const maxRes = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", {
sessionKey: "main",
limit: 1000,
});
expect(maxRes.ok).toBe(true);
const maxMsgs = maxRes.payload?.messages ?? [];
expect(maxMsgs.length).toBe(1000);
@@ -1849,6 +1845,97 @@ describe("gateway server", () => {
await server.close();
});
// Integration test for the two chat.history safeguards exercised below:
//  1. a leading run of "System: ..." lines is removed from a user message;
//  2. the returned messages array is trimmed so its JSON encoding stays
//     within 6 MiB (the same figure as the gateway's
//     MAX_CHAT_HISTORY_MESSAGES_BYTES constant).
test("chat.history strips injected System blocks and caps payload bytes", async () => {
// Local helper: returns content[0].text when the message has that shape and
// the text is a string, otherwise undefined.
const firstContentText = (msg: unknown): string | undefined => {
if (!msg || typeof msg !== "object") return undefined;
const content = (msg as { content?: unknown }).content;
if (!Array.isArray(content) || content.length === 0) return undefined;
const first = content[0];
if (!first || typeof first !== "object") return undefined;
const text = (first as { text?: unknown }).text;
return typeof text === "string" ? text : undefined;
};
// Session store fixture: a temp dir holding sessions.json with one "main"
// entry that points at session id "sess-main".
// NOTE(review): testSessionStorePath is declared outside this test; presumably
// the server under test reads the store from it - confirm.
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
testSessionStorePath = path.join(dir, "sessions.json");
await fs.writeFile(
testSessionStorePath,
JSON.stringify(
{
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
},
},
null,
2,
),
"utf-8",
);
// Phase 1 fixture: one user message whose text starts with two "System: "
// lines, then a blank line, then the real user text.
const injected =
"System: Node: Peters Mac · app 2.0.0 · last input 0s ago · mode local · reason periodic\n" +
"System: WhatsApp gateway connected.\n\n" +
"Hello from user";
await fs.writeFile(
path.join(dir, "sess-main.jsonl"),
JSON.stringify({
message: {
role: "user",
content: [{ type: "text", text: injected }],
timestamp: Date.now(),
},
}),
"utf-8",
);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
// Expect the single message back with only the user text remaining.
const scrubbedRes = await rpcReq<{ messages?: unknown[] }>(
ws,
"chat.history",
{ sessionKey: "main", limit: 5 },
);
expect(scrubbedRes.ok).toBe(true);
const scrubbedMsgs = scrubbedRes.payload?.messages ?? [];
expect(scrubbedMsgs.length).toBe(1);
expect(firstContentText(scrubbedMsgs[0])).toBe("Hello from user");
// Phase 2 fixture: overwrite the same transcript with 60 messages of a bit
// over 300,000 bytes each (about 18 MB in total), well above the 6 MiB limit.
const bigText = "x".repeat(300_000);
const largeLines: string[] = [];
for (let i = 0; i < 60; i += 1) {
largeLines.push(
JSON.stringify({
message: {
role: "user",
content: [{ type: "text", text: `${i}:${bigText}` }],
timestamp: Date.now() + i,
},
}),
);
}
await fs.writeFile(
path.join(dir, "sess-main.jsonl"),
largeLines.join("\n"),
"utf-8",
);
// The requested limit (1000) is larger than the 60 stored messages, so any
// reduction in the response must come from the byte cap.
const cappedRes = await rpcReq<{ messages?: unknown[] }>(
ws,
"chat.history",
{ sessionKey: "main", limit: 1000 },
);
expect(cappedRes.ok).toBe(true);
const cappedMsgs = cappedRes.payload?.messages ?? [];
const bytes = Buffer.byteLength(JSON.stringify(cappedMsgs), "utf8");
expect(bytes).toBeLessThanOrEqual(6 * 1024 * 1024);
expect(cappedMsgs.length).toBeLessThan(60);
ws.close();
await server.close();
});
test("chat.send does not overwrite last delivery route", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
testSessionStorePath = path.join(dir, "sessions.json");

View File

@@ -254,6 +254,7 @@ function buildSnapshot(): Snapshot {
const MAX_PAYLOAD_BYTES = 512 * 1024; // cap incoming frame size
const MAX_BUFFERED_BYTES = 1.5 * 1024 * 1024; // per-connection send buffer limit
const MAX_CHAT_HISTORY_MESSAGES_BYTES = 6 * 1024 * 1024; // keep history responses comfortably under client WS limits
const HANDSHAKE_TIMEOUT_MS = 10_000;
const TICK_INTERVAL_MS = 30_000;
const HEALTH_REFRESH_INTERVAL_MS = 60_000;
@@ -350,6 +351,66 @@ function readSessionMessages(
return messages;
}
/**
 * Removes a leading block of `System: ` lines from a message text.
 *
 * The block is only removed when the text starts with `System: `, contains a
 * blank-line separator (`\n\n`), and every line before that separator also
 * starts with `System: `. In every other case the text is returned untouched.
 *
 * @param text - Raw message text.
 * @returns The text after the first blank-line separator, or `text` itself
 *   when no qualifying block is present.
 */
function stripInjectedSystemBlock(text: string): string {
  const marker = "System: ";
  if (!text.startsWith(marker)) return text;

  const boundary = text.indexOf("\n\n");
  // No separator found (the index cannot be 0 here because the text starts
  // with the marker, but the original guard is kept as-is).
  if (boundary <= 0) return text;

  for (const line of text.slice(0, boundary).split("\n")) {
    // A single non-marker line means this is ordinary user text.
    if (!line.startsWith(marker)) return text;
  }

  // Skip the two newline characters that form the separator.
  return text.slice(boundary + 2);
}
/**
 * Returns the messages with any leading `System: ` block removed from the
 * first text part of each user message.
 *
 * Only messages shaped like `{ role: "user", content: [{ type: "text", text }, ...] }`
 * are considered; everything else passes through by reference. Inputs are
 * never mutated: a changed message is replaced by a shallow copy with a new
 * `content` array. When nothing changed, the original `messages` array itself
 * is returned.
 *
 * @param messages - History entries of unknown shape.
 * @returns The same array when no message was altered, otherwise a new array.
 */
function scrubInjectedSystemBlocks(messages: unknown[]): unknown[] {
  const isRecord = (value: unknown): value is Record<string, unknown> =>
    typeof value === "object" && value !== null;

  let touched = false;

  const scrubOne = (entry: unknown): unknown => {
    if (!isRecord(entry) || entry.role !== "user") return entry;

    const parts = entry.content;
    if (!Array.isArray(parts) || parts.length === 0) return entry;

    const head: unknown = parts[0];
    if (!isRecord(head) || head.type !== "text") return entry;

    const raw = head.text;
    if (typeof raw !== "string") return entry;

    const cleaned = stripInjectedSystemBlock(raw);
    if (cleaned === raw) return entry;

    touched = true;
    // Copy-on-write: only the first content part and the wrappers are cloned.
    const replacedParts = [...parts];
    replacedParts[0] = { ...head, text: cleaned };
    return { ...entry, content: replacedParts };
  };

  const result = messages.map((entry) => scrubOne(entry));
  return touched ? result : messages;
}
/**
 * Measures how many UTF-8 bytes `value` occupies once JSON-encoded.
 *
 * Values that `JSON.stringify` omits (it returns `undefined` for `undefined`,
 * functions and symbols) are counted as the 4 bytes of `null`, which is how
 * they are emitted when they appear as array elements. Previously this case
 * only worked by accident: `Buffer.byteLength(undefined)` threw and the size
 * of `String(value)` was counted instead (e.g. 9 bytes for `undefined`, or a
 * function's whole source text), over-estimating the encoded size.
 *
 * @param value - Any value.
 * @returns Byte length of the JSON encoding. If `JSON.stringify` throws
 *   (for example on circular structures or BigInt), the byte length of
 *   `String(value)` is returned as a best-effort estimate.
 */
function jsonUtf8Bytes(value: unknown): number {
  let encoded: string | undefined;
  try {
    // The lib typing claims `string`, but the runtime result may be undefined.
    encoded = JSON.stringify(value) as string | undefined;
  } catch {
    return Buffer.byteLength(String(value), "utf8");
  }
  return Buffer.byteLength(encoded ?? "null", "utf8");
}
/**
 * Drops items from the front of `items` until the estimated JSON size of the
 * remaining array is at most `maxBytes`.
 *
 * The estimate is: 2 bytes for the brackets, one byte per separating comma,
 * plus `jsonUtf8Bytes` of each item. The last item is always kept, so the
 * result can still exceed `maxBytes` when that single item is larger than the
 * cap.
 *
 * @param items - Array to trim; never mutated.
 * @param maxBytes - Upper bound for the estimated encoded size.
 * @returns The kept items (the original array when nothing was dropped, else
 *   a slice) and the estimated encoded size of those kept items.
 */
function capArrayByJsonBytes<T>(
  items: T[],
  maxBytes: number,
): { items: T[]; bytes: number } {
  const count = items.length;
  if (count === 0) return { items, bytes: 2 };

  // Start with the brackets and commas, then add each item's encoded size.
  const sizes: number[] = [];
  let total = 2 + (count - 1);
  for (const item of items) {
    const size = jsonUtf8Bytes(item);
    sizes.push(size);
    total += size;
  }

  // Drop items from the front while over budget, but never the final one.
  let dropped = 0;
  for (; total > maxBytes && dropped < count - 1; dropped += 1) {
    total -= sizes[dropped] + 1; // the item plus its trailing comma
  }

  return { items: dropped === 0 ? items : items.slice(dropped), bytes: total };
}
function loadSessionEntry(sessionKey: string) {
const cfg = loadConfig();
const sessionCfg = cfg.inbound?.reply?.session;
@@ -853,8 +914,13 @@ export async function startGatewayServer(
? readSessionMessages(sessionId, storePath)
: [];
const max = typeof limit === "number" ? limit : 200;
const messages =
const sliced =
rawMessages.length > max ? rawMessages.slice(-max) : rawMessages;
const scrubbed = scrubInjectedSystemBlocks(sliced);
const capped = capArrayByJsonBytes(
scrubbed,
MAX_CHAT_HISTORY_MESSAGES_BYTES,
).items;
const thinkingLevel =
entry?.thinkingLevel ??
loadConfig().inbound?.reply?.thinkingDefault ??
@@ -864,7 +930,7 @@ export async function startGatewayServer(
payloadJSON: JSON.stringify({
sessionKey,
sessionId,
messages,
messages: capped,
thinkingLevel,
}),
};
@@ -1827,13 +1893,23 @@ export async function startGatewayServer(
const defaultLimit = 200;
const requested = typeof limit === "number" ? limit : defaultLimit;
const max = Math.min(hardMax, requested);
const messages =
const sliced =
rawMessages.length > max ? rawMessages.slice(-max) : rawMessages;
const scrubbed = scrubInjectedSystemBlocks(sliced);
const capped = capArrayByJsonBytes(
scrubbed,
MAX_CHAT_HISTORY_MESSAGES_BYTES,
).items;
const thinkingLevel =
entry?.thinkingLevel ??
loadConfig().inbound?.reply?.thinkingDefault ??
"off";
respond(true, { sessionKey, sessionId, messages, thinkingLevel });
respond(true, {
sessionKey,
sessionId,
messages: capped,
thinkingLevel,
});
break;
}
case "chat.send": {
@@ -2336,7 +2412,14 @@ export async function startGatewayServer(
reason,
tags,
});
enqueueSystemEvent(text);
const normalizedReason = (reason ?? "").toLowerCase();
const looksPeriodic =
normalizedReason.startsWith("periodic") ||
normalizedReason === "heartbeat";
const isNodePresenceLine = text.startsWith("Node:");
if (!(isNodePresenceLine && looksPeriodic)) {
enqueueSystemEvent(text);
}
presenceVersion += 1;
broadcast(
"presence",