diff --git a/src/agents/pi-embedded-runner.test.ts b/src/agents/pi-embedded-runner.test.ts index 6c85bff4b..c1a7ffae7 100644 --- a/src/agents/pi-embedded-runner.test.ts +++ b/src/agents/pi-embedded-runner.test.ts @@ -11,6 +11,8 @@ import { applyGoogleTurnOrderingFix, buildEmbeddedSandboxInfo, createSystemPromptOverride, + getDmHistoryLimitFromSessionKey, + limitHistoryTurns, runEmbeddedPiAgent, splitSdkTools, } from "./pi-embedded-runner.js"; @@ -279,6 +281,280 @@ describe("applyGoogleTurnOrderingFix", () => { }); }); +describe("limitHistoryTurns", () => { + const makeMessages = (roles: ("user" | "assistant")[]): AgentMessage[] => + roles.map((role, i) => ({ + role, + content: [{ type: "text", text: `message ${i}` }], + })); + + it("returns all messages when limit is undefined", () => { + const messages = makeMessages(["user", "assistant", "user", "assistant"]); + expect(limitHistoryTurns(messages, undefined)).toBe(messages); + }); + + it("returns all messages when limit is 0", () => { + const messages = makeMessages(["user", "assistant", "user", "assistant"]); + expect(limitHistoryTurns(messages, 0)).toBe(messages); + }); + + it("returns all messages when limit is negative", () => { + const messages = makeMessages(["user", "assistant", "user", "assistant"]); + expect(limitHistoryTurns(messages, -1)).toBe(messages); + }); + + it("returns empty array when messages is empty", () => { + expect(limitHistoryTurns([], 5)).toEqual([]); + }); + + it("keeps all messages when fewer user turns than limit", () => { + const messages = makeMessages(["user", "assistant", "user", "assistant"]); + expect(limitHistoryTurns(messages, 10)).toBe(messages); + }); + + it("limits to last N user turns", () => { + const messages = makeMessages([ + "user", + "assistant", + "user", + "assistant", + "user", + "assistant", + ]); + const limited = limitHistoryTurns(messages, 2); + expect(limited.length).toBe(4); + expect(limited[0].content).toEqual([{ type: "text", text: "message 2" }]); + }); + + it("handles single user turn limit", () => { + const messages = makeMessages([ + "user", + "assistant", + "user", + "assistant", + "user", + "assistant", + ]); + const limited = limitHistoryTurns(messages, 1); + expect(limited.length).toBe(2); + expect(limited[0].content).toEqual([{ type: "text", text: "message 4" }]); + expect(limited[1].content).toEqual([{ type: "text", text: "message 5" }]); + }); + + it("handles messages with multiple assistant responses per user turn", () => { + const messages = makeMessages([ + "user", + "assistant", + "assistant", + "user", + "assistant", + ]); + const limited = limitHistoryTurns(messages, 1); + expect(limited.length).toBe(2); + expect(limited[0].role).toBe("user"); + expect(limited[1].role).toBe("assistant"); + }); + + it("preserves message content integrity", () => { + const messages: AgentMessage[] = [ + { role: "user", content: [{ type: "text", text: "first" }] }, + { + role: "assistant", + content: [{ type: "toolCall", id: "1", name: "bash", arguments: {} }], + }, + { role: "user", content: [{ type: "text", text: "second" }] }, + { role: "assistant", content: [{ type: "text", text: "response" }] }, + ]; + const limited = limitHistoryTurns(messages, 1); + expect(limited[0].content).toEqual([{ type: "text", text: "second" }]); + expect(limited[1].content).toEqual([{ type: "text", text: "response" }]); + }); +}); + +describe("getDmHistoryLimitFromSessionKey", () => { + it("returns undefined when sessionKey is undefined", () => { + expect(getDmHistoryLimitFromSessionKey(undefined, {})).toBeUndefined(); + }); + + it("returns undefined when config is undefined", () => { + expect( + getDmHistoryLimitFromSessionKey("telegram:dm:123", undefined), + ).toBeUndefined(); + }); + + it("returns dmHistoryLimit for telegram provider", () => { + const config = { telegram: { dmHistoryLimit: 15 } } as ClawdbotConfig; + expect(getDmHistoryLimitFromSessionKey("telegram:dm:123", config)).toBe(15); + }); + + it("returns dmHistoryLimit for whatsapp provider", () => { + const config = { whatsapp: { dmHistoryLimit: 20 } } as ClawdbotConfig; + expect(getDmHistoryLimitFromSessionKey("whatsapp:dm:123", config)).toBe(20); + }); + + it("returns dmHistoryLimit for agent-prefixed session keys", () => { + const config = { telegram: { dmHistoryLimit: 10 } } as ClawdbotConfig; + expect( + getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123", config), + ).toBe(10); + }); + + it("returns undefined for non-dm session kinds", () => { + const config = { + slack: { dmHistoryLimit: 10 }, + telegram: { dmHistoryLimit: 15 }, + } as ClawdbotConfig; + expect( + getDmHistoryLimitFromSessionKey("agent:beta:slack:channel:C1", config), + ).toBeUndefined(); + expect( + getDmHistoryLimitFromSessionKey("telegram:slash:123", config), + ).toBeUndefined(); + }); + + it("returns undefined for unknown provider", () => { + const config = { telegram: { dmHistoryLimit: 15 } } as ClawdbotConfig; + expect( + getDmHistoryLimitFromSessionKey("unknown:dm:123", config), + ).toBeUndefined(); + }); + + it("returns undefined when provider config has no dmHistoryLimit", () => { + const config = { telegram: {} } as ClawdbotConfig; + expect( + getDmHistoryLimitFromSessionKey("telegram:dm:123", config), + ).toBeUndefined(); + }); + + it("handles all supported providers", () => { + const providers = [ + "telegram", + "whatsapp", + "discord", + "slack", + "signal", + "imessage", + "msteams", + ] as const; + + for (const provider of providers) { + const config = { [provider]: { dmHistoryLimit: 5 } } as ClawdbotConfig; + expect( + getDmHistoryLimitFromSessionKey(`${provider}:dm:123`, config), + ).toBe(5); + } + }); + + it("handles per-DM overrides for all supported providers", () => { + const providers = [ + "telegram", + "whatsapp", + "discord", + "slack", + "signal", + "imessage", + "msteams", + ] as const; + + for (const provider of providers) { + // Test per-DM override takes precedence + const configWithOverride = { + [provider]: { + dmHistoryLimit: 20, + dms: { user123: { historyLimit: 7 } }, + }, + } as ClawdbotConfig; + expect( + getDmHistoryLimitFromSessionKey( + `${provider}:dm:user123`, + configWithOverride, + ), + ).toBe(7); + + // Test fallback to provider default when user not in dms + expect( + getDmHistoryLimitFromSessionKey( + `${provider}:dm:otheruser`, + configWithOverride, + ), + ).toBe(20); + + // Test with agent-prefixed key + expect( + getDmHistoryLimitFromSessionKey( + `agent:main:${provider}:dm:user123`, + configWithOverride, + ), + ).toBe(7); + } + }); + + it("returns per-DM override when set", () => { + const config = { + telegram: { + dmHistoryLimit: 15, + dms: { "123": { historyLimit: 5 } }, + }, + } as ClawdbotConfig; + expect(getDmHistoryLimitFromSessionKey("telegram:dm:123", config)).toBe(5); + }); + + it("falls back to provider default when per-DM not set", () => { + const config = { + telegram: { + dmHistoryLimit: 15, + dms: { "456": { historyLimit: 5 } }, + }, + } as ClawdbotConfig; + expect(getDmHistoryLimitFromSessionKey("telegram:dm:123", config)).toBe(15); + }); + + it("returns per-DM override for agent-prefixed keys", () => { + const config = { + telegram: { + dmHistoryLimit: 20, + dms: { "789": { historyLimit: 3 } }, + }, + } as ClawdbotConfig; + expect( + getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:789", config), + ).toBe(3); + }); + + it("handles userId with colons (e.g., email)", () => { + const config = { + msteams: { + dmHistoryLimit: 10, + dms: { "user@example.com": { historyLimit: 7 } }, + }, + } as ClawdbotConfig; + expect( + getDmHistoryLimitFromSessionKey("msteams:dm:user@example.com", config), + ).toBe(7); + }); + + it("returns undefined when per-DM historyLimit is not set", () => { + const config = { + telegram: { + dms: { "123": {} }, + }, + } as ClawdbotConfig; + expect( + getDmHistoryLimitFromSessionKey("telegram:dm:123", config), + ).toBeUndefined(); + }); + + it("returns 0 when per-DM historyLimit is explicitly 0 (unlimited)", () => { + const config = { + telegram: { + dmHistoryLimit: 15, + dms: { "123": { historyLimit: 0 } }, + }, + } as ClawdbotConfig; + expect(getDmHistoryLimitFromSessionKey("telegram:dm:123", config)).toBe(0); + }); +}); + describe("runEmbeddedPiAgent", () => { it("writes models.json into the provided agentDir", async () => { const agentDir = await fs.mkdtemp( diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index 48516bf54..3932f49fd 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -413,6 +413,113 @@ async function sanitizeSessionHistory(params: { }).messages; } +/** + * Limits conversation history to the last N user turns (and their associated + * assistant responses). This reduces token usage for long-running DM sessions. + * + * @param messages - The full message history + * @param limit - Max number of user turns to keep (undefined = no limit) + * @returns Messages trimmed to the last `limit` user turns + */ +export function limitHistoryTurns( + messages: AgentMessage[], + limit: number | undefined, +): AgentMessage[] { + if (!limit || limit <= 0 || messages.length === 0) return messages; + + // Count user messages from the end, find cutoff point + let userCount = 0; + let lastUserIndex = messages.length; + + for (let i = messages.length - 1; i >= 0; i--) { + if (messages[i].role === "user") { + userCount++; + if (userCount > limit) { + // We exceeded the limit; keep from the last valid user turn onwards + return messages.slice(lastUserIndex); + } + lastUserIndex = i; + } + } + // Fewer than limit user turns, keep all + return messages; +} + +/** + * Extracts the provider name and user ID from a session key and looks up + * dmHistoryLimit from the provider config, with per-DM override support. + * + * Session key formats: + * - `telegram:dm:123` → provider = telegram, userId = 123 + * - `agent:main:telegram:dm:123` → provider = telegram, userId = 123 + * + * Resolution order: + * 1. Per-DM override: provider.dms[userId].historyLimit + * 2. Provider default: provider.dmHistoryLimit + */ +export function getDmHistoryLimitFromSessionKey( + sessionKey: string | undefined, + config: ClawdbotConfig | undefined, +): number | undefined { + if (!sessionKey || !config) return undefined; + + const parts = sessionKey.split(":").filter(Boolean); + // Handle agent-prefixed keys: agent:::... + const providerParts = + parts.length >= 3 && parts[0] === "agent" ? parts.slice(2) : parts; + + const provider = providerParts[0]?.toLowerCase(); + if (!provider) return undefined; + + // Extract userId: format is provider:dm:userId or provider:dm:userId:... + // The userId may contain colons (e.g., email addresses), so join remaining parts + const kind = providerParts[1]?.toLowerCase(); + const userId = providerParts.slice(2).join(":"); + if (kind !== "dm") return undefined; + + // Helper to get limit with per-DM override support + const getLimit = ( + providerConfig: + | { + dmHistoryLimit?: number; + dms?: Record; + } + | undefined, + ): number | undefined => { + if (!providerConfig) return undefined; + // Check per-DM override first + if ( + userId && + kind === "dm" && + providerConfig.dms?.[userId]?.historyLimit !== undefined + ) { + return providerConfig.dms[userId].historyLimit; + } + // Fall back to provider default + return providerConfig.dmHistoryLimit; + }; + + // Map provider to config key + switch (provider) { + case "telegram": + return getLimit(config.telegram); + case "whatsapp": + return getLimit(config.whatsapp); + case "discord": + return getLimit(config.discord); + case "slack": + return getLimit(config.slack); + case "signal": + return getLimit(config.signal); + case "imessage": + return getLimit(config.imessage); + case "msteams": + return getLimit(config.msteams); + default: + return undefined; + } +} + const ACTIVE_EMBEDDED_RUNS = new Map(); type EmbeddedRunWaiter = { resolve: (ended: boolean) => void; @@ -1026,8 +1133,12 @@ export async function compactEmbeddedPiSession(params: { sessionId: params.sessionId, }); const validated = validateGeminiTurns(prior); - if (validated.length > 0) { - session.agent.replaceMessages(validated); + const limited = limitHistoryTurns( + validated, + getDmHistoryLimitFromSessionKey(params.sessionKey, params.config), + ); + if (limited.length > 0) { + session.agent.replaceMessages(limited); } const result = await session.compact(params.customInstructions); return { @@ -1417,8 +1528,12 @@ export async function runEmbeddedPiAgent(params: { sessionId: params.sessionId, }); const validated = validateGeminiTurns(prior); - if (validated.length > 0) { - session.agent.replaceMessages(validated); + const limited = limitHistoryTurns( + validated, + getDmHistoryLimitFromSessionKey(params.sessionKey, params.config), + ); + if (limited.length > 0) { + session.agent.replaceMessages(limited); } } catch (err) { session.dispose(); diff --git a/src/config/types.ts b/src/config/types.ts index 8a827a0ee..9bf8b6f06 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -147,6 +147,10 @@ export type WhatsAppConfig = { groupPolicy?: GroupPolicy; /** Max group messages to keep as history context (0 disables). */ historyLimit?: number; + /** Max DM turns to keep as history context. */ + dmHistoryLimit?: number; + /** Per-DM config overrides keyed by user ID. */ + dms?: Record; /** Outbound text chunk size (chars). Default: 4000. */ textChunkLimit?: number; /** Maximum media file size in MB. Default: 50. */ @@ -200,6 +204,10 @@ export type WhatsAppAccountConfig = { groupPolicy?: GroupPolicy; /** Max group messages to keep as history context (0 disables). */ historyLimit?: number; + /** Max DM turns to keep as history context. */ + dmHistoryLimit?: number; + /** Per-DM config overrides keyed by user ID. */ + dms?: Record; textChunkLimit?: number; mediaMaxMb?: number; blockStreaming?: boolean; @@ -379,6 +387,10 @@ export type TelegramAccountConfig = { groupPolicy?: GroupPolicy; /** Max group messages to keep as history context (0 disables). */ historyLimit?: number; + /** Max DM turns to keep as history context. */ + dmHistoryLimit?: number; + /** Per-DM config overrides keyed by user ID. */ + dms?: Record; /** Outbound text chunk size (chars). Default: 4000. */ textChunkLimit?: number; /** Disable block streaming for this account. */ @@ -522,6 +534,10 @@ export type DiscordAccountConfig = { maxLinesPerMessage?: number; mediaMaxMb?: number; historyLimit?: number; + /** Max DM turns to keep as history context. */ + dmHistoryLimit?: number; + /** Per-DM config overrides keyed by user ID. */ + dms?: Record; /** Retry policy for outbound Discord API calls. */ retry?: OutboundRetryConfig; /** Per-action tool gating (default: true for all). */ @@ -618,6 +634,10 @@ export type SlackAccountConfig = { groupPolicy?: GroupPolicy; /** Max channel messages to keep as history context (0 disables). */ historyLimit?: number; + /** Max DM turns to keep as history context. */ + dmHistoryLimit?: number; + /** Per-DM config overrides keyed by user ID. */ + dms?: Record; textChunkLimit?: number; blockStreaming?: boolean; /** Merge streamed block replies before sending. */ @@ -677,6 +697,10 @@ export type SignalAccountConfig = { groupPolicy?: GroupPolicy; /** Max group messages to keep as history context (0 disables). */ historyLimit?: number; + /** Max DM turns to keep as history context. */ + dmHistoryLimit?: number; + /** Per-DM config overrides keyed by user ID. */ + dms?: Record; /** Outbound text chunk size (chars). Default: 4000. */ textChunkLimit?: number; blockStreaming?: boolean; @@ -752,6 +776,10 @@ export type MSTeamsConfig = { requireMention?: boolean; /** Max group/channel messages to keep as history context (0 disables). */ historyLimit?: number; + /** Max DM turns to keep as history context. */ + dmHistoryLimit?: number; + /** Per-DM config overrides keyed by user ID. */ + dms?: Record; /** Default reply style: "thread" replies to the message, "top-level" posts a new message. */ replyStyle?: MSTeamsReplyStyle; /** Per-team config. Key is team ID (from the /team/ URL path segment). */ @@ -788,6 +816,10 @@ export type IMessageAccountConfig = { groupPolicy?: GroupPolicy; /** Max group messages to keep as history context (0 disables). */ historyLimit?: number; + /** Max DM turns to keep as history context. */ + dmHistoryLimit?: number; + /** Per-DM config overrides keyed by user ID. */ + dms?: Record; /** Include attachments + reactions in watch payloads. */ includeAttachments?: boolean; /** Max outbound media size in MB. */ @@ -925,6 +957,10 @@ export type GroupChatConfig = { historyLimit?: number; }; +export type DmConfig = { + historyLimit?: number; +}; + export type QueueConfig = { mode?: QueueMode; byProvider?: QueueModeByProvider; diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index f7fa0c7c9..c30dba4bd 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -62,6 +62,10 @@ const GroupChatSchema = z }) .optional(); +const DmConfigSchema = z.object({ + historyLimit: z.number().int().min(0).optional(), +}); + const IdentitySchema = z .object({ name: z.string().optional(), @@ -273,6 +277,8 @@ const TelegramAccountSchemaBase = z.object({ groupAllowFrom: z.array(z.union([z.string(), z.number()])).optional(), groupPolicy: GroupPolicySchema.optional().default("open"), historyLimit: z.number().int().min(0).optional(), + dmHistoryLimit: z.number().int().min(0).optional(), + dms: z.record(z.string(), DmConfigSchema.optional()).optional(), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), draftChunk: BlockStreamingChunkSchema.optional(), @@ -362,6 +368,8 @@ const DiscordAccountSchema = z.object({ token: z.string().optional(), groupPolicy: GroupPolicySchema.optional().default("open"), historyLimit: z.number().int().min(0).optional(), + dmHistoryLimit: z.number().int().min(0).optional(), + dms: z.record(z.string(), DmConfigSchema.optional()).optional(), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), @@ -434,6 +442,8 @@ const SlackAccountSchema = z.object({ allowBots: z.boolean().optional(), groupPolicy: GroupPolicySchema.optional().default("open"), historyLimit: z.number().int().min(0).optional(), + dmHistoryLimit: z.number().int().min(0).optional(), + dms: z.record(z.string(), DmConfigSchema.optional()).optional(), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), @@ -488,6 +498,8 @@ const SignalAccountSchemaBase = z.object({ groupAllowFrom: z.array(z.union([z.string(), z.number()])).optional(), groupPolicy: GroupPolicySchema.optional().default("open"), historyLimit: z.number().int().min(0).optional(), + dmHistoryLimit: z.number().int().min(0).optional(), + dms: z.record(z.string(), DmConfigSchema.optional()).optional(), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), @@ -536,6 +548,8 @@ const IMessageAccountSchemaBase = z.object({ groupAllowFrom: z.array(z.union([z.string(), z.number()])).optional(), groupPolicy: GroupPolicySchema.optional().default("open"), historyLimit: z.number().int().min(0).optional(), + dmHistoryLimit: z.number().int().min(0).optional(), + dms: z.record(z.string(), DmConfigSchema.optional()).optional(), includeAttachments: z.boolean().optional(), mediaMaxMb: z.number().int().positive().optional(), textChunkLimit: z.number().int().positive().optional(), @@ -610,6 +624,8 @@ const MSTeamsConfigSchema = z mediaAllowHosts: z.array(z.string()).optional(), requireMention: z.boolean().optional(), historyLimit: z.number().int().min(0).optional(), + dmHistoryLimit: z.number().int().min(0).optional(), + dms: z.record(z.string(), DmConfigSchema.optional()).optional(), replyStyle: MSTeamsReplyStyleSchema.optional(), teams: z.record(z.string(), MSTeamsTeamSchema.optional()).optional(), }) @@ -1354,6 +1370,8 @@ export const ClawdbotSchema = z groupAllowFrom: z.array(z.string()).optional(), groupPolicy: GroupPolicySchema.optional().default("open"), historyLimit: z.number().int().min(0).optional(), + dmHistoryLimit: z.number().int().min(0).optional(), + dms: z.record(z.string(), DmConfigSchema.optional()).optional(), textChunkLimit: z.number().int().positive().optional(), mediaMaxMb: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), @@ -1403,6 +1421,8 @@ export const ClawdbotSchema = z groupAllowFrom: z.array(z.string()).optional(), groupPolicy: GroupPolicySchema.optional().default("open"), historyLimit: z.number().int().min(0).optional(), + dmHistoryLimit: z.number().int().min(0).optional(), + dms: z.record(z.string(), DmConfigSchema.optional()).optional(), textChunkLimit: z.number().int().positive().optional(), mediaMaxMb: z.number().int().positive().optional().default(50), blockStreaming: z.boolean().optional(),