Merge pull request #728 from pkrmf/feature/dm-history-limit

feat: add configurable DM history limits with per-chat overrides
This commit is contained in:
Ayaan Zaidi
2026-01-11 22:25:12 +05:30
committed by GitHub
4 changed files with 451 additions and 4 deletions

View File

@@ -11,6 +11,8 @@ import {
applyGoogleTurnOrderingFix,
buildEmbeddedSandboxInfo,
createSystemPromptOverride,
getDmHistoryLimitFromSessionKey,
limitHistoryTurns,
runEmbeddedPiAgent,
splitSdkTools,
} from "./pi-embedded-runner.js";
@@ -279,6 +281,280 @@ describe("applyGoogleTurnOrderingFix", () => {
});
});
describe("limitHistoryTurns", () => {
  // Builds a synthetic history with one text message per role. The message
  // text carries the original index so assertions can tell which messages
  // survived trimming.
  const makeMessages = (roles: ("user" | "assistant")[]): AgentMessage[] =>
    roles.map((role, i) => ({
      role,
      content: [{ type: "text", text: `message ${i}` }],
    }));

  // The `toBe` (identity) assertions below are deliberate: when nothing is
  // trimmed, the function is expected to hand back the same array instance.
  it("returns all messages when limit is undefined", () => {
    const messages = makeMessages(["user", "assistant", "user", "assistant"]);
    expect(limitHistoryTurns(messages, undefined)).toBe(messages);
  });

  it("returns all messages when limit is 0", () => {
    const messages = makeMessages(["user", "assistant", "user", "assistant"]);
    expect(limitHistoryTurns(messages, 0)).toBe(messages);
  });

  it("returns all messages when limit is negative", () => {
    const messages = makeMessages(["user", "assistant", "user", "assistant"]);
    expect(limitHistoryTurns(messages, -1)).toBe(messages);
  });

  it("returns empty array when messages is empty", () => {
    expect(limitHistoryTurns([], 5)).toEqual([]);
  });

  it("keeps all messages when fewer user turns than limit", () => {
    const messages = makeMessages(["user", "assistant", "user", "assistant"]);
    expect(limitHistoryTurns(messages, 10)).toBe(messages);
  });

  it("limits to last N user turns", () => {
    const messages = makeMessages([
      "user",
      "assistant",
      "user",
      "assistant",
      "user",
      "assistant",
    ]);
    const limited = limitHistoryTurns(messages, 2);
    expect(limited.length).toBe(4);
    expect(limited[0].content).toEqual([{ type: "text", text: "message 2" }]);
  });

  it("handles single user turn limit", () => {
    const messages = makeMessages([
      "user",
      "assistant",
      "user",
      "assistant",
      "user",
      "assistant",
    ]);
    const limited = limitHistoryTurns(messages, 1);
    expect(limited.length).toBe(2);
    expect(limited[0].content).toEqual([{ type: "text", text: "message 4" }]);
    expect(limited[1].content).toEqual([{ type: "text", text: "message 5" }]);
  });

  it("handles messages with multiple assistant responses per user turn", () => {
    // Case 1: the multi-reply turn is the one that gets dropped.
    const messages = makeMessages([
      "user",
      "assistant",
      "assistant",
      "user",
      "assistant",
    ]);
    const limited = limitHistoryTurns(messages, 1);
    expect(limited.length).toBe(2);
    expect(limited[0].role).toBe("user");
    expect(limited[1].role).toBe("assistant");

    // Case 2: the multi-reply turn is the one that is kept; every reply that
    // follows the retained user message must survive.
    const trailing = makeMessages([
      "user",
      "assistant",
      "user",
      "assistant",
      "assistant",
    ]);
    const kept = limitHistoryTurns(trailing, 1);
    expect(kept.length).toBe(3);
    expect(kept.map((m) => m.role)).toEqual(["user", "assistant", "assistant"]);
    expect(kept[0].content).toEqual([{ type: "text", text: "message 2" }]);
  });

  it("preserves message content integrity", () => {
    const messages: AgentMessage[] = [
      { role: "user", content: [{ type: "text", text: "first" }] },
      {
        role: "assistant",
        content: [{ type: "toolCall", id: "1", name: "bash", arguments: {} }],
      },
      { role: "user", content: [{ type: "text", text: "second" }] },
      { role: "assistant", content: [{ type: "text", text: "response" }] },
    ];
    const limited = limitHistoryTurns(messages, 1);
    expect(limited[0].content).toEqual([{ type: "text", text: "second" }]);
    expect(limited[1].content).toEqual([{ type: "text", text: "response" }]);
  });
});
describe("getDmHistoryLimitFromSessionKey", () => {
  // Every provider key the function is expected to recognise. Shared by the
  // table-style tests below so the list cannot drift between them.
  const SUPPORTED_PROVIDERS = [
    "telegram",
    "whatsapp",
    "discord",
    "slack",
    "signal",
    "imessage",
    "msteams",
  ] as const;

  it("returns undefined when sessionKey is undefined", () => {
    expect(getDmHistoryLimitFromSessionKey(undefined, {})).toBeUndefined();
  });

  it("returns undefined when config is undefined", () => {
    expect(
      getDmHistoryLimitFromSessionKey("telegram:dm:123", undefined),
    ).toBeUndefined();
  });

  it("returns dmHistoryLimit for telegram provider", () => {
    const config = { telegram: { dmHistoryLimit: 15 } } as ClawdbotConfig;
    expect(getDmHistoryLimitFromSessionKey("telegram:dm:123", config)).toBe(15);
  });

  it("returns dmHistoryLimit for whatsapp provider", () => {
    const config = { whatsapp: { dmHistoryLimit: 20 } } as ClawdbotConfig;
    expect(getDmHistoryLimitFromSessionKey("whatsapp:dm:123", config)).toBe(20);
  });

  it("returns dmHistoryLimit for agent-prefixed session keys", () => {
    const config = { telegram: { dmHistoryLimit: 10 } } as ClawdbotConfig;
    expect(
      getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123", config),
    ).toBe(10);
  });

  it("returns undefined for non-dm session kinds", () => {
    const config = {
      slack: { dmHistoryLimit: 10 },
      telegram: { dmHistoryLimit: 15 },
    } as ClawdbotConfig;
    expect(
      getDmHistoryLimitFromSessionKey("agent:beta:slack:channel:C1", config),
    ).toBeUndefined();
    expect(
      getDmHistoryLimitFromSessionKey("telegram:slash:123", config),
    ).toBeUndefined();
  });

  it("returns undefined for unknown provider", () => {
    const config = { telegram: { dmHistoryLimit: 15 } } as ClawdbotConfig;
    expect(
      getDmHistoryLimitFromSessionKey("unknown:dm:123", config),
    ).toBeUndefined();
  });

  it("returns undefined when provider config has no dmHistoryLimit", () => {
    const config = { telegram: {} } as ClawdbotConfig;
    expect(
      getDmHistoryLimitFromSessionKey("telegram:dm:123", config),
    ).toBeUndefined();
  });

  it("handles all supported providers", () => {
    for (const provider of SUPPORTED_PROVIDERS) {
      const config = { [provider]: { dmHistoryLimit: 5 } } as ClawdbotConfig;
      expect(
        getDmHistoryLimitFromSessionKey(`${provider}:dm:123`, config),
      ).toBe(5);
    }
  });

  it("handles per-DM overrides for all supported providers", () => {
    for (const provider of SUPPORTED_PROVIDERS) {
      // Test per-DM override takes precedence
      const configWithOverride = {
        [provider]: {
          dmHistoryLimit: 20,
          dms: { user123: { historyLimit: 7 } },
        },
      } as ClawdbotConfig;
      expect(
        getDmHistoryLimitFromSessionKey(
          `${provider}:dm:user123`,
          configWithOverride,
        ),
      ).toBe(7);
      // Test fallback to provider default when user not in dms
      expect(
        getDmHistoryLimitFromSessionKey(
          `${provider}:dm:otheruser`,
          configWithOverride,
        ),
      ).toBe(20);
      // Test with agent-prefixed key
      expect(
        getDmHistoryLimitFromSessionKey(
          `agent:main:${provider}:dm:user123`,
          configWithOverride,
        ),
      ).toBe(7);
    }
  });

  it("returns per-DM override when set", () => {
    const config = {
      telegram: {
        dmHistoryLimit: 15,
        dms: { "123": { historyLimit: 5 } },
      },
    } as ClawdbotConfig;
    expect(getDmHistoryLimitFromSessionKey("telegram:dm:123", config)).toBe(5);
  });

  it("falls back to provider default when per-DM not set", () => {
    const config = {
      telegram: {
        dmHistoryLimit: 15,
        dms: { "456": { historyLimit: 5 } },
      },
    } as ClawdbotConfig;
    expect(getDmHistoryLimitFromSessionKey("telegram:dm:123", config)).toBe(15);
  });

  it("returns per-DM override for agent-prefixed keys", () => {
    const config = {
      telegram: {
        dmHistoryLimit: 20,
        dms: { "789": { historyLimit: 3 } },
      },
    } as ClawdbotConfig;
    expect(
      getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:789", config),
    ).toBe(3);
  });

  it("handles email-style userId", () => {
    const config = {
      msteams: {
        dmHistoryLimit: 10,
        dms: { "user@example.com": { historyLimit: 7 } },
      },
    } as ClawdbotConfig;
    expect(
      getDmHistoryLimitFromSessionKey("msteams:dm:user@example.com", config),
    ).toBe(7);
  });

  it("handles userId with colons", () => {
    // The key is split on ":"; everything after the `dm` segment has to be
    // re-joined so an ID that itself contains colons still matches its
    // `dms` entry.
    const config = {
      msteams: {
        dmHistoryLimit: 10,
        dms: { "tenant:user:42": { historyLimit: 7 } },
      },
    } as ClawdbotConfig;
    expect(
      getDmHistoryLimitFromSessionKey("msteams:dm:tenant:user:42", config),
    ).toBe(7);
    expect(
      getDmHistoryLimitFromSessionKey(
        "agent:main:msteams:dm:tenant:user:42",
        config,
      ),
    ).toBe(7);
  });

  it("returns undefined when per-DM historyLimit is not set", () => {
    const config = {
      telegram: {
        dms: { "123": {} },
      },
    } as ClawdbotConfig;
    expect(
      getDmHistoryLimitFromSessionKey("telegram:dm:123", config),
    ).toBeUndefined();
  });

  it("returns 0 when per-DM historyLimit is explicitly 0 (unlimited)", () => {
    // An explicit 0 must win over the provider default rather than being
    // treated as "unset".
    const config = {
      telegram: {
        dmHistoryLimit: 15,
        dms: { "123": { historyLimit: 0 } },
      },
    } as ClawdbotConfig;
    expect(getDmHistoryLimitFromSessionKey("telegram:dm:123", config)).toBe(0);
  });
});
describe("runEmbeddedPiAgent", () => {
it("writes models.json into the provided agentDir", async () => {
const agentDir = await fs.mkdtemp(

View File

@@ -413,6 +413,113 @@ async function sanitizeSessionHistory(params: {
}).messages;
}
/**
 * Trims a conversation down to its most recent user turns. A turn begins at a
 * `user` message and runs up to (but not including) the next `user` message,
 * so replies stay attached to the prompt they follow. Intended to reduce token
 * usage for long-running DM sessions.
 *
 * @param messages - The full message history
 * @param limit - Maximum number of user turns to retain; `undefined`, `0`, and
 *   negative values all mean "no limit"
 * @returns The input array itself when nothing has to be dropped, otherwise a
 *   new array that starts at the oldest retained `user` message
 */
export function limitHistoryTurns(
  messages: AgentMessage[],
  limit: number | undefined,
): AgentMessage[] {
  // No effective limit configured: hand the history back untouched.
  if (!limit || limit <= 0) return messages;
  if (messages.length === 0) return messages;

  let seenUserTurns = 0;
  // Index of the oldest user message that still fits within the limit.
  let keepFrom = messages.length;

  // Walk backwards from the newest message, counting user turns as we go.
  let cursor = messages.length;
  while (cursor-- > 0) {
    if (messages[cursor].role !== "user") continue;

    seenUserTurns += 1;
    if (seenUserTurns > limit) {
      // This user message is one too many; cut just after it.
      return messages.slice(keepFrom);
    }
    keepFrom = cursor;
  }

  // The whole history fits within the limit.
  return messages;
}
/**
 * Resolves the DM history limit that applies to a session, based on its key.
 *
 * Recognised key shapes (empty `:`-separated segments are ignored):
 * - `<provider>:dm:<userId>`, e.g. `telegram:dm:123`
 * - `agent:<agentId>:<provider>:dm:<userId>`, e.g. `agent:main:telegram:dm:123`
 *
 * Everything after the `dm` segment is re-joined with `:` to form the userId,
 * so an ID that itself contains colons stays intact. Provider and kind are
 * matched case-insensitively; the userId is used as written.
 *
 * Lookup precedence:
 * 1. Per-DM override: `provider.dms[userId].historyLimit` (any defined value,
 *    including `0`, is returned as-is)
 * 2. Provider default: `provider.dmHistoryLimit`
 *
 * @param sessionKey - Session key to parse
 * @param config - Configuration holding the per-provider sections
 * @returns The configured limit, or `undefined` when the key or config is
 *   missing, the session kind is not `dm`, the provider is not one of the
 *   handled ones, or no limit is configured
 */
export function getDmHistoryLimitFromSessionKey(
  sessionKey: string | undefined,
  config: ClawdbotConfig | undefined,
): number | undefined {
  if (!sessionKey || !config) return undefined;

  const segments = sessionKey.split(":").filter(Boolean);
  // Agent-scoped keys carry an `agent:<agentId>` prefix ahead of the provider.
  const isAgentScoped = segments.length >= 3 && segments[0] === "agent";
  const [providerSegment, kindSegment, ...userIdSegments] = isAgentScoped
    ? segments.slice(2)
    : segments;

  const provider = providerSegment?.toLowerCase();
  if (!provider) return undefined;
  if (kindSegment?.toLowerCase() !== "dm") return undefined;

  const userId = userIdSegments.join(":");

  // Pick the config section that belongs to the provider named in the key.
  let providerConfig:
    | {
        dmHistoryLimit?: number;
        dms?: Record<string, { historyLimit?: number }>;
      }
    | undefined;
  switch (provider) {
    case "telegram":
      providerConfig = config.telegram;
      break;
    case "whatsapp":
      providerConfig = config.whatsapp;
      break;
    case "discord":
      providerConfig = config.discord;
      break;
    case "slack":
      providerConfig = config.slack;
      break;
    case "signal":
      providerConfig = config.signal;
      break;
    case "imessage":
      providerConfig = config.imessage;
      break;
    case "msteams":
      providerConfig = config.msteams;
      break;
    default:
      return undefined;
  }
  if (!providerConfig) return undefined;

  // A per-DM override wins whenever it is defined; otherwise use the
  // provider-wide default (which may itself be undefined).
  const override = userId
    ? providerConfig.dms?.[userId]?.historyLimit
    : undefined;
  return override !== undefined ? override : providerConfig.dmHistoryLimit;
}
const ACTIVE_EMBEDDED_RUNS = new Map<string, EmbeddedPiQueueHandle>();
type EmbeddedRunWaiter = {
resolve: (ended: boolean) => void;
@@ -1026,8 +1133,12 @@ export async function compactEmbeddedPiSession(params: {
sessionId: params.sessionId,
});
const validated = validateGeminiTurns(prior);
if (validated.length > 0) {
session.agent.replaceMessages(validated);
const limited = limitHistoryTurns(
validated,
getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),
);
if (limited.length > 0) {
session.agent.replaceMessages(limited);
}
const result = await session.compact(params.customInstructions);
return {
@@ -1417,8 +1528,12 @@ export async function runEmbeddedPiAgent(params: {
sessionId: params.sessionId,
});
const validated = validateGeminiTurns(prior);
if (validated.length > 0) {
session.agent.replaceMessages(validated);
const limited = limitHistoryTurns(
validated,
getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),
);
if (limited.length > 0) {
session.agent.replaceMessages(limited);
}
} catch (err) {
session.dispose();

View File

@@ -147,6 +147,10 @@ export type WhatsAppConfig = {
groupPolicy?: GroupPolicy;
/** Max group messages to keep as history context (0 disables). */
historyLimit?: number;
/** Max DM turns to keep as history context. */
dmHistoryLimit?: number;
/** Per-DM config overrides keyed by user ID. */
dms?: Record<string, DmConfig>;
/** Outbound text chunk size (chars). Default: 4000. */
textChunkLimit?: number;
/** Maximum media file size in MB. Default: 50. */
@@ -200,6 +204,10 @@ export type WhatsAppAccountConfig = {
groupPolicy?: GroupPolicy;
/** Max group messages to keep as history context (0 disables). */
historyLimit?: number;
/** Max DM turns to keep as history context. */
dmHistoryLimit?: number;
/** Per-DM config overrides keyed by user ID. */
dms?: Record<string, DmConfig>;
textChunkLimit?: number;
mediaMaxMb?: number;
blockStreaming?: boolean;
@@ -379,6 +387,10 @@ export type TelegramAccountConfig = {
groupPolicy?: GroupPolicy;
/** Max group messages to keep as history context (0 disables). */
historyLimit?: number;
/** Max DM turns to keep as history context. */
dmHistoryLimit?: number;
/** Per-DM config overrides keyed by user ID. */
dms?: Record<string, DmConfig>;
/** Outbound text chunk size (chars). Default: 4000. */
textChunkLimit?: number;
/** Disable block streaming for this account. */
@@ -522,6 +534,10 @@ export type DiscordAccountConfig = {
maxLinesPerMessage?: number;
mediaMaxMb?: number;
historyLimit?: number;
/** Max DM turns to keep as history context. */
dmHistoryLimit?: number;
/** Per-DM config overrides keyed by user ID. */
dms?: Record<string, DmConfig>;
/** Retry policy for outbound Discord API calls. */
retry?: OutboundRetryConfig;
/** Per-action tool gating (default: true for all). */
@@ -618,6 +634,10 @@ export type SlackAccountConfig = {
groupPolicy?: GroupPolicy;
/** Max channel messages to keep as history context (0 disables). */
historyLimit?: number;
/** Max DM turns to keep as history context. */
dmHistoryLimit?: number;
/** Per-DM config overrides keyed by user ID. */
dms?: Record<string, DmConfig>;
textChunkLimit?: number;
blockStreaming?: boolean;
/** Merge streamed block replies before sending. */
@@ -677,6 +697,10 @@ export type SignalAccountConfig = {
groupPolicy?: GroupPolicy;
/** Max group messages to keep as history context (0 disables). */
historyLimit?: number;
/** Max DM turns to keep as history context. */
dmHistoryLimit?: number;
/** Per-DM config overrides keyed by user ID. */
dms?: Record<string, DmConfig>;
/** Outbound text chunk size (chars). Default: 4000. */
textChunkLimit?: number;
blockStreaming?: boolean;
@@ -752,6 +776,10 @@ export type MSTeamsConfig = {
requireMention?: boolean;
/** Max group/channel messages to keep as history context (0 disables). */
historyLimit?: number;
/** Max DM turns to keep as history context. */
dmHistoryLimit?: number;
/** Per-DM config overrides keyed by user ID. */
dms?: Record<string, DmConfig>;
/** Default reply style: "thread" replies to the message, "top-level" posts a new message. */
replyStyle?: MSTeamsReplyStyle;
/** Per-team config. Key is team ID (from the /team/ URL path segment). */
@@ -788,6 +816,10 @@ export type IMessageAccountConfig = {
groupPolicy?: GroupPolicy;
/** Max group messages to keep as history context (0 disables). */
historyLimit?: number;
/** Max DM turns to keep as history context. */
dmHistoryLimit?: number;
/** Per-DM config overrides keyed by user ID. */
dms?: Record<string, DmConfig>;
/** Include attachments + reactions in watch payloads. */
includeAttachments?: boolean;
/** Max outbound media size in MB. */
@@ -925,6 +957,10 @@ export type GroupChatConfig = {
historyLimit?: number;
};
/** Per-DM config override; provider configs hold these in `dms`, keyed by user ID. */
export type DmConfig = {
/** Max DM turns to keep as history context for this DM. */
historyLimit?: number;
};
export type QueueConfig = {
mode?: QueueMode;
byProvider?: QueueModeByProvider;

View File

@@ -62,6 +62,10 @@ const GroupChatSchema = z
})
.optional();
// Validation schema for a single per-DM override entry.
const DmConfigSchema = z.object({
// Optional; when present it must be an integer >= 0.
historyLimit: z.number().int().min(0).optional(),
});
const IdentitySchema = z
.object({
name: z.string().optional(),
@@ -273,6 +277,8 @@ const TelegramAccountSchemaBase = z.object({
groupAllowFrom: z.array(z.union([z.string(), z.number()])).optional(),
groupPolicy: GroupPolicySchema.optional().default("open"),
historyLimit: z.number().int().min(0).optional(),
dmHistoryLimit: z.number().int().min(0).optional(),
dms: z.record(z.string(), DmConfigSchema.optional()).optional(),
textChunkLimit: z.number().int().positive().optional(),
blockStreaming: z.boolean().optional(),
draftChunk: BlockStreamingChunkSchema.optional(),
@@ -362,6 +368,8 @@ const DiscordAccountSchema = z.object({
token: z.string().optional(),
groupPolicy: GroupPolicySchema.optional().default("open"),
historyLimit: z.number().int().min(0).optional(),
dmHistoryLimit: z.number().int().min(0).optional(),
dms: z.record(z.string(), DmConfigSchema.optional()).optional(),
textChunkLimit: z.number().int().positive().optional(),
blockStreaming: z.boolean().optional(),
blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(),
@@ -434,6 +442,8 @@ const SlackAccountSchema = z.object({
allowBots: z.boolean().optional(),
groupPolicy: GroupPolicySchema.optional().default("open"),
historyLimit: z.number().int().min(0).optional(),
dmHistoryLimit: z.number().int().min(0).optional(),
dms: z.record(z.string(), DmConfigSchema.optional()).optional(),
textChunkLimit: z.number().int().positive().optional(),
blockStreaming: z.boolean().optional(),
blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(),
@@ -488,6 +498,8 @@ const SignalAccountSchemaBase = z.object({
groupAllowFrom: z.array(z.union([z.string(), z.number()])).optional(),
groupPolicy: GroupPolicySchema.optional().default("open"),
historyLimit: z.number().int().min(0).optional(),
dmHistoryLimit: z.number().int().min(0).optional(),
dms: z.record(z.string(), DmConfigSchema.optional()).optional(),
textChunkLimit: z.number().int().positive().optional(),
blockStreaming: z.boolean().optional(),
blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(),
@@ -536,6 +548,8 @@ const IMessageAccountSchemaBase = z.object({
groupAllowFrom: z.array(z.union([z.string(), z.number()])).optional(),
groupPolicy: GroupPolicySchema.optional().default("open"),
historyLimit: z.number().int().min(0).optional(),
dmHistoryLimit: z.number().int().min(0).optional(),
dms: z.record(z.string(), DmConfigSchema.optional()).optional(),
includeAttachments: z.boolean().optional(),
mediaMaxMb: z.number().int().positive().optional(),
textChunkLimit: z.number().int().positive().optional(),
@@ -610,6 +624,8 @@ const MSTeamsConfigSchema = z
mediaAllowHosts: z.array(z.string()).optional(),
requireMention: z.boolean().optional(),
historyLimit: z.number().int().min(0).optional(),
dmHistoryLimit: z.number().int().min(0).optional(),
dms: z.record(z.string(), DmConfigSchema.optional()).optional(),
replyStyle: MSTeamsReplyStyleSchema.optional(),
teams: z.record(z.string(), MSTeamsTeamSchema.optional()).optional(),
})
@@ -1354,6 +1370,8 @@ export const ClawdbotSchema = z
groupAllowFrom: z.array(z.string()).optional(),
groupPolicy: GroupPolicySchema.optional().default("open"),
historyLimit: z.number().int().min(0).optional(),
dmHistoryLimit: z.number().int().min(0).optional(),
dms: z.record(z.string(), DmConfigSchema.optional()).optional(),
textChunkLimit: z.number().int().positive().optional(),
mediaMaxMb: z.number().int().positive().optional(),
blockStreaming: z.boolean().optional(),
@@ -1403,6 +1421,8 @@ export const ClawdbotSchema = z
groupAllowFrom: z.array(z.string()).optional(),
groupPolicy: GroupPolicySchema.optional().default("open"),
historyLimit: z.number().int().min(0).optional(),
dmHistoryLimit: z.number().int().min(0).optional(),
dms: z.record(z.string(), DmConfigSchema.optional()).optional(),
textChunkLimit: z.number().int().positive().optional(),
mediaMaxMb: z.number().int().positive().optional().default(50),
blockStreaming: z.boolean().optional(),