From d41372b9d9bce72cb7aee6497148ebd93226b5c9 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 10 Jan 2026 18:53:33 +0100 Subject: [PATCH] feat: unify provider history context --- src/auto-reply/reply.raw-body.test.ts | 87 +++++++++++++++++++++++++++ src/auto-reply/reply.ts | 35 +++++++---- src/auto-reply/reply/abort.ts | 6 +- src/auto-reply/reply/commands.ts | 2 +- src/auto-reply/reply/history.test.ts | 63 +++++++++++++++++++ src/auto-reply/reply/history.ts | 64 ++++++++++++++++++++ src/auto-reply/reply/session.ts | 12 ++-- src/auto-reply/templating.ts | 6 +- src/config/config.test.ts | 28 +++++++++ src/config/types.ts | 14 +++++ src/config/zod-schema.ts | 9 ++- src/discord/monitor.ts | 76 +++++++++++------------ src/imessage/monitor.ts | 60 +++++++++++++++++- src/msteams/monitor-handler.ts | 58 +++++++++++++++++- src/signal/monitor.ts | 61 ++++++++++++++++++- src/slack/monitor.tool-result.test.ts | 64 ++++++++++++++++++++ src/slack/monitor.ts | 66 +++++++++++++++++++- src/telegram/bot.ts | 64 +++++++++++++++++++- src/web/auto-reply.ts | 23 ++++--- 19 files changed, 718 insertions(+), 80 deletions(-) create mode 100644 src/auto-reply/reply/history.test.ts create mode 100644 src/auto-reply/reply/history.ts diff --git a/src/auto-reply/reply.raw-body.test.ts b/src/auto-reply/reply.raw-body.test.ts index 640006abb..8f4562210 100644 --- a/src/auto-reply/reply.raw-body.test.ts +++ b/src/auto-reply/reply.raw-body.test.ts @@ -114,6 +114,39 @@ describe("RawBody directive parsing", () => { }); }); + it("CommandBody is honored when RawBody is missing", async () => { + await withTempHome(async (home) => { + vi.mocked(runEmbeddedPiAgent).mockReset(); + + const groupMessageCtx = { + Body: `[Context]\nJake: /verbose on\n[from: Jake]`, + CommandBody: "/verbose on", + From: "+1222", + To: "+1222", + ChatType: "group", + }; + + const res = await getReplyFromConfig( + groupMessageCtx, + {}, + { + agents: { + defaults: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + }, + whatsapp: { allowFrom: ["*"] }, + session: { store: path.join(home, "sessions.json") }, + }, + ); + + const text = Array.isArray(res) ? res[0]?.text : res?.text; + expect(text).toContain("Verbose logging enabled."); + expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); + }); + }); + it("Integration: WhatsApp group message with structural wrapper and RawBody command", async () => { await withTempHome(async (home) => { vi.mocked(runEmbeddedPiAgent).mockReset(); @@ -151,4 +184,58 @@ describe("RawBody directive parsing", () => { expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); }); }); + + it("preserves history when RawBody is provided for command parsing", async () => { + await withTempHome(async (home) => { + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { + durationMs: 1, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + const groupMessageCtx = { + Body: [ + "[Chat messages since your last reply - for context]", + "[WhatsApp ...] Peter: hello", + "", + "[Current message - respond to this]", + "[WhatsApp ...] Jake: /think:high status please", + "[from: Jake McInteer (+6421807830)]", + ].join("\n"), + RawBody: "/think:high status please", + From: "+1222", + To: "+1222", + ChatType: "group", + }; + + const res = await getReplyFromConfig( + groupMessageCtx, + {}, + { + agents: { + defaults: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + }, + whatsapp: { allowFrom: ["*"] }, + session: { store: path.join(home, "sessions.json") }, + }, + ); + + const text = Array.isArray(res) ? res[0]?.text : res?.text; + expect(text).toBe("ok"); + expect(runEmbeddedPiAgent).toHaveBeenCalledOnce(); + const prompt = + vi.mocked(runEmbeddedPiAgent).mock.calls[0]?.[0]?.prompt ?? ""; + expect(prompt).toContain( + "[Chat messages since your last reply - for context]", + ); + expect(prompt).toContain("Peter: hello"); + expect(prompt).toContain("status please"); + expect(prompt).not.toContain("/think:high"); + }); + }); }); diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 6ffe877f5..b66af6286 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -340,10 +340,14 @@ export async function getReplyFromConfig( triggerBodyNormalized, } = sessionState; - // Prefer RawBody (clean message without structural context) for directive parsing. + // Prefer CommandBody/RawBody (clean message without structural context) for directive parsing. // Keep `Body`/`BodyStripped` as the best-available prompt text (may include context). - const rawBody = - sessionCtx.RawBody ?? sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""; + const commandSource = + sessionCtx.CommandBody ?? + sessionCtx.RawBody ?? + sessionCtx.BodyStripped ?? + sessionCtx.Body ?? + ""; const clearInlineDirectives = (cleaned: string): InlineDirectives => ({ cleaned, hasThinkDirective: false, @@ -382,7 +386,7 @@ export async function getReplyFromConfig( .map((entry) => entry.alias?.trim()) .filter((alias): alias is string => Boolean(alias)) .filter((alias) => !reservedCommands.has(alias.toLowerCase())); - let parsedDirectives = parseInlineDirectives(rawBody, { + let parsedDirectives = parseInlineDirectives(commandSource, { modelAliases: configuredAliases, }); if ( @@ -436,7 +440,7 @@ export async function getReplyFromConfig( const existingBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""; const cleanedBody = (() => { if (!existingBody) return parsedDirectives.cleaned; - if (!sessionCtx.RawBody) { + if (!sessionCtx.CommandBody && !sessionCtx.RawBody) { return parseInlineDirectives(existingBody, { modelAliases: configuredAliases, }).cleaned; @@ -786,14 +790,19 @@ export async function getReplyFromConfig( .filter(Boolean) .join("\n\n"); const baseBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""; - // Use RawBody for bare reset detection (clean message without structural context). - const rawBodyTrimmed = (ctx.RawBody ?? ctx.Body ?? "").trim(); + // Use CommandBody/RawBody for bare reset detection (clean message without structural context). + const rawBodyTrimmed = ( + ctx.CommandBody ?? + ctx.RawBody ?? + ctx.Body ?? + "" + ).trim(); const baseBodyTrimmedRaw = baseBody.trim(); if ( allowTextCommands && !commandAuthorized && !baseBodyTrimmedRaw && - hasControlCommand(rawBody) + hasControlCommand(commandSource) ) { typing.cleanup(); return undefined; @@ -863,18 +872,18 @@ export async function getReplyFromConfig( const mediaReplyHint = mediaNote ? "To send an image back, add a line like: MEDIA:https://example.com/image.jpg (no spaces). Keep caption in the text body." : undefined; - let commandBody = mediaNote + let prefixedCommandBody = mediaNote ? [mediaNote, mediaReplyHint, prefixedBody ?? ""] .filter(Boolean) .join("\n") .trim() : prefixedBody; - if (!resolvedThinkLevel && commandBody) { - const parts = commandBody.split(/\s+/); + if (!resolvedThinkLevel && prefixedCommandBody) { + const parts = prefixedCommandBody.split(/\s+/); const maybeLevel = normalizeThinkLevel(parts[0]); if (maybeLevel) { resolvedThinkLevel = maybeLevel; - commandBody = parts.slice(1).join(" ").trim(); + prefixedCommandBody = parts.slice(1).join(" ").trim(); } } if (!resolvedThinkLevel) { @@ -968,7 +977,7 @@ export async function getReplyFromConfig( } return runReplyAgent({ - commandBody, + commandBody: prefixedCommandBody, followupRun, queueKey, resolvedQueue, diff --git a/src/auto-reply/reply/abort.ts b/src/auto-reply/reply/abort.ts index a33dc4b7d..97d092354 100644 --- a/src/auto-reply/reply/abort.ts +++ b/src/auto-reply/reply/abort.ts @@ -81,8 +81,10 @@ export async function tryFastAbortFromMessage(params: { sessionKey: targetKey ?? ctx.SessionKey ?? "", config: cfg, }); - // Use RawBody for abort detection (clean message without structural context). - const raw = stripStructuralPrefixes(ctx.RawBody ?? ctx.Body ?? ""); + // Use RawBody/CommandBody for abort detection (clean message without structural context). + const raw = stripStructuralPrefixes( + ctx.CommandBody ?? ctx.RawBody ?? ctx.Body ?? "", + ); const isGroup = ctx.ChatType?.trim().toLowerCase() === "group"; const stripped = isGroup ? stripMentions(raw, ctx, cfg, agentId) : raw; const normalized = normalizeCommandBody(stripped); diff --git a/src/auto-reply/reply/commands.ts b/src/auto-reply/reply/commands.ts index 894550896..49d8bd589 100644 --- a/src/auto-reply/reply/commands.ts +++ b/src/auto-reply/reply/commands.ts @@ -900,7 +900,7 @@ export async function handleCommands(params: { await waitForEmbeddedPiRunEnd(sessionId, 15_000); } const customInstructions = extractCompactInstructions({ - rawBody: ctx.Body, + rawBody: ctx.CommandBody ?? ctx.RawBody ?? ctx.Body, ctx, cfg, agentId: params.agentId, diff --git a/src/auto-reply/reply/history.test.ts b/src/auto-reply/reply/history.test.ts new file mode 100644 index 000000000..8838e8c8e --- /dev/null +++ b/src/auto-reply/reply/history.test.ts @@ -0,0 +1,63 @@ +import { describe, expect, it } from "vitest"; +import { CURRENT_MESSAGE_MARKER } from "./mentions.js"; +import { + HISTORY_CONTEXT_MARKER, + appendHistoryEntry, + buildHistoryContext, + buildHistoryContextFromEntries, +} from "./history.js"; + +describe("history helpers", () => { + it("returns current message when history is empty", () => { + const result = buildHistoryContext({ + historyText: " ", + currentMessage: "hello", + }); + expect(result).toBe("hello"); + }); + + it("wraps history entries and excludes current by default", () => { + const result = buildHistoryContextFromEntries({ + entries: [ + { sender: "A", body: "one" }, + { sender: "B", body: "two" }, + ], + currentMessage: "current", + formatEntry: (entry) => `${entry.sender}: ${entry.body}`, + }); + + expect(result).toContain(HISTORY_CONTEXT_MARKER); + expect(result).toContain("A: one"); + expect(result).not.toContain("B: two"); + expect(result).toContain(CURRENT_MESSAGE_MARKER); + expect(result).toContain("current"); + }); + + it("trims history to configured limit", () => { + const historyMap = new Map(); + + appendHistoryEntry({ + historyMap, + historyKey: "room", + limit: 2, + entry: { sender: "A", body: "one" }, + }); + appendHistoryEntry({ + historyMap, + historyKey: "room", + limit: 2, + entry: { sender: "B", body: "two" }, + }); + appendHistoryEntry({ + historyMap, + historyKey: "room", + limit: 2, + entry: { sender: "C", body: "three" }, + }); + + expect(historyMap.get("room")?.map((entry) => entry.body)).toEqual([ + "two", + "three", + ]); + }); +}); diff --git a/src/auto-reply/reply/history.ts b/src/auto-reply/reply/history.ts new file mode 100644 index 000000000..2a39ea970 --- /dev/null +++ b/src/auto-reply/reply/history.ts @@ -0,0 +1,64 @@ +import { CURRENT_MESSAGE_MARKER } from "./mentions.js"; + +export const HISTORY_CONTEXT_MARKER = + "[Chat messages since your last reply - for context]"; +export const DEFAULT_GROUP_HISTORY_LIMIT = 50; + +export type HistoryEntry = { + sender: string; + body: string; + timestamp?: number; + messageId?: string; +}; + +export function buildHistoryContext(params: { + historyText: string; + currentMessage: string; + lineBreak?: string; +}): string { + const { historyText, currentMessage } = params; + const lineBreak = params.lineBreak ?? "\n"; + if (!historyText.trim()) return currentMessage; + return [ + HISTORY_CONTEXT_MARKER, + historyText, + "", + CURRENT_MESSAGE_MARKER, + currentMessage, + ].join(lineBreak); +} + +export function appendHistoryEntry(params: { + historyMap: Map; + historyKey: string; + entry: HistoryEntry; + limit: number; +}): HistoryEntry[] { + const { historyMap, historyKey, entry } = params; + if (params.limit <= 0) return []; + const history = historyMap.get(historyKey) ?? []; + history.push(entry); + while (history.length > params.limit) history.shift(); + historyMap.set(historyKey, history); + return history; +} + +export function buildHistoryContextFromEntries(params: { + entries: HistoryEntry[]; + currentMessage: string; + formatEntry: (entry: HistoryEntry) => string; + lineBreak?: string; + excludeLast?: boolean; +}): string { + const lineBreak = params.lineBreak ?? "\n"; + const entries = params.excludeLast === false + ? params.entries + : params.entries.slice(0, -1); + if (entries.length === 0) return params.currentMessage; + const historyText = entries.map(params.formatEntry).join(lineBreak); + return buildHistoryContext({ + historyText, + currentMessage: params.currentMessage, + lineBreak, + }); +} diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index fb769a6c3..89a1458c2 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -136,15 +136,15 @@ export async function initSessionState(params: { resolveGroupSessionKey(sessionCtxForState) ?? undefined; const isGroup = ctx.ChatType?.trim().toLowerCase() === "group" || Boolean(groupResolution); - // Prefer RawBody (clean message) for command detection; fall back to Body - // which may contain structural context (history, sender labels). - const commandSource = ctx.RawBody ?? ctx.Body ?? ""; + // Prefer CommandBody/RawBody (clean message) for command detection; fall back + // to Body which may contain structural context (history, sender labels). + const commandSource = ctx.CommandBody ?? ctx.RawBody ?? ctx.Body ?? ""; const triggerBodyNormalized = stripStructuralPrefixes(commandSource) .trim() .toLowerCase(); - // Use RawBody for reset trigger matching (clean message without structural context). - const rawBody = ctx.RawBody ?? ctx.Body ?? ""; + // Use CommandBody/RawBody for reset trigger matching (clean message without structural context). + const rawBody = commandSource; const trimmedBody = rawBody.trim(); const resetAuthorized = resolveCommandAuthorization({ ctx, @@ -290,7 +290,7 @@ export async function initSessionState(params: { ...ctx, // Keep BodyStripped aligned with Body (best default for agent prompts). // RawBody is reserved for command/directive parsing and may omit context. - BodyStripped: bodyStripped ?? ctx.Body ?? ctx.RawBody, + BodyStripped: bodyStripped ?? ctx.Body ?? ctx.CommandBody ?? ctx.RawBody, SessionId: sessionId, IsNewSession: isNewSession ? "true" : "false", }; diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index 4c5a7cf91..86182d856 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -13,9 +13,13 @@ export type MsgContext = { Body?: string; /** * Raw message body without structural context (history, sender labels). - * Used for command detection. Falls back to Body if not set. + * Legacy alias for CommandBody. Falls back to Body if not set. */ RawBody?: string; + /** + * Prefer for command detection; RawBody is treated as legacy alias. + */ + CommandBody?: string; From?: string; To?: string; SessionKey?: string; diff --git a/src/config/config.test.ts b/src/config/config.test.ts index 1670f4e9d..13c114451 100644 --- a/src/config/config.test.ts +++ b/src/config/config.test.ts @@ -1225,6 +1225,34 @@ describe("legacy config detection", () => { } }); + it("accepts historyLimit overrides per provider and account", async () => { + vi.resetModules(); + const { validateConfigObject } = await import("./config.js"); + const res = validateConfigObject({ + messages: { groupChat: { historyLimit: 12 } }, + whatsapp: { historyLimit: 9, accounts: { work: { historyLimit: 4 } } }, + telegram: { historyLimit: 8, accounts: { ops: { historyLimit: 3 } } }, + slack: { historyLimit: 7, accounts: { ops: { historyLimit: 2 } } }, + signal: { historyLimit: 6 }, + imessage: { historyLimit: 5 }, + msteams: { historyLimit: 4 }, + discord: { historyLimit: 3 }, + }); + expect(res.ok).toBe(true); + if (res.ok) { + expect(res.config.whatsapp?.historyLimit).toBe(9); + expect(res.config.whatsapp?.accounts?.work?.historyLimit).toBe(4); + expect(res.config.telegram?.historyLimit).toBe(8); + expect(res.config.telegram?.accounts?.ops?.historyLimit).toBe(3); + expect(res.config.slack?.historyLimit).toBe(7); + expect(res.config.slack?.accounts?.ops?.historyLimit).toBe(2); + expect(res.config.signal?.historyLimit).toBe(6); + expect(res.config.imessage?.historyLimit).toBe(5); + expect(res.config.msteams?.historyLimit).toBe(4); + expect(res.config.discord?.historyLimit).toBe(3); + } + }); + it('rejects imessage.dmPolicy="open" without allowFrom "*"', async () => { vi.resetModules(); const { validateConfigObject } = await import("./config.js"); diff --git a/src/config/types.ts b/src/config/types.ts index 36c2b3f4d..6ca0d56e0 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -151,6 +151,8 @@ export type WhatsAppConfig = { * - "allowlist": only allow group messages from senders in groupAllowFrom/allowFrom */ groupPolicy?: GroupPolicy; + /** Max group messages to keep as history context (0 disables). */ + historyLimit?: number; /** Outbound text chunk size (chars). Default: 4000. */ textChunkLimit?: number; /** Maximum media file size in MB. Default: 50. */ @@ -187,6 +189,8 @@ export type WhatsAppAccountConfig = { allowFrom?: string[]; groupAllowFrom?: string[]; groupPolicy?: GroupPolicy; + /** Max group messages to keep as history context (0 disables). */ + historyLimit?: number; textChunkLimit?: number; mediaMaxMb?: number; blockStreaming?: boolean; @@ -347,6 +351,8 @@ export type TelegramAccountConfig = { * - "allowlist": only allow group messages from senders in groupAllowFrom/allowFrom */ groupPolicy?: GroupPolicy; + /** Max group messages to keep as history context (0 disables). */ + historyLimit?: number; /** Outbound text chunk size (chars). Default: 4000. */ textChunkLimit?: number; /** Disable block streaming for this account. */ @@ -584,6 +590,8 @@ export type SlackAccountConfig = { * - "allowlist": only allow channels present in slack.channels */ groupPolicy?: GroupPolicy; + /** Max channel messages to keep as history context (0 disables). */ + historyLimit?: number; textChunkLimit?: number; blockStreaming?: boolean; /** Merge streamed block replies before sending. */ @@ -641,6 +649,8 @@ export type SignalAccountConfig = { * - "allowlist": only allow group messages from senders in groupAllowFrom/allowFrom */ groupPolicy?: GroupPolicy; + /** Max group messages to keep as history context (0 disables). */ + historyLimit?: number; /** Outbound text chunk size (chars). Default: 4000. */ textChunkLimit?: number; blockStreaming?: boolean; @@ -714,6 +724,8 @@ export type MSTeamsConfig = { mediaAllowHosts?: Array; /** Default: require @mention to respond in channels/groups. */ requireMention?: boolean; + /** Max group/channel messages to keep as history context (0 disables). */ + historyLimit?: number; /** Default reply style: "thread" replies to the message, "top-level" posts a new message. */ replyStyle?: MSTeamsReplyStyle; /** Per-team config. Key is team ID (from the /team/ URL path segment). */ @@ -748,6 +760,8 @@ export type IMessageAccountConfig = { * - "allowlist": only allow group messages from senders in groupAllowFrom/allowFrom */ groupPolicy?: GroupPolicy; + /** Max group messages to keep as history context (0 disables). */ + historyLimit?: number; /** Include attachments + reactions in watch payloads. */ includeAttachments?: boolean; /** Max outbound media size in MB. */ diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index c63db5593..44212317a 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -217,6 +217,7 @@ const TelegramAccountSchemaBase = z.object({ allowFrom: z.array(z.union([z.string(), z.number()])).optional(), groupAllowFrom: z.array(z.union([z.string(), z.number()])).optional(), groupPolicy: GroupPolicySchema.optional().default("open"), + historyLimit: z.number().int().min(0).optional(), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), draftChunk: BlockStreamingChunkSchema.optional(), @@ -305,12 +306,12 @@ const DiscordAccountSchema = z.object({ enabled: z.boolean().optional(), token: z.string().optional(), groupPolicy: GroupPolicySchema.optional().default("open"), + historyLimit: z.number().int().min(0).optional(), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), maxLinesPerMessage: z.number().int().positive().optional(), mediaMaxMb: z.number().positive().optional(), - historyLimit: z.number().int().min(0).optional(), retry: RetryConfigSchema, actions: z .object({ @@ -377,6 +378,7 @@ const SlackAccountSchema = z.object({ appToken: z.string().optional(), allowBots: z.boolean().optional(), groupPolicy: GroupPolicySchema.optional().default("open"), + historyLimit: z.number().int().min(0).optional(), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), @@ -430,6 +432,7 @@ const SignalAccountSchemaBase = z.object({ allowFrom: z.array(z.union([z.string(), z.number()])).optional(), groupAllowFrom: z.array(z.union([z.string(), z.number()])).optional(), groupPolicy: GroupPolicySchema.optional().default("open"), + historyLimit: z.number().int().min(0).optional(), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), @@ -477,6 +480,7 @@ const IMessageAccountSchemaBase = z.object({ allowFrom: z.array(z.union([z.string(), z.number()])).optional(), groupAllowFrom: z.array(z.union([z.string(), z.number()])).optional(), groupPolicy: GroupPolicySchema.optional().default("open"), + historyLimit: z.number().int().min(0).optional(), includeAttachments: z.boolean().optional(), mediaMaxMb: z.number().int().positive().optional(), textChunkLimit: z.number().int().positive().optional(), @@ -550,6 +554,7 @@ const MSTeamsConfigSchema = z blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), mediaAllowHosts: z.array(z.string()).optional(), requireMention: z.boolean().optional(), + historyLimit: z.number().int().min(0).optional(), replyStyle: MSTeamsReplyStyleSchema.optional(), teams: z.record(z.string(), MSTeamsTeamSchema.optional()).optional(), }) @@ -1286,6 +1291,7 @@ export const ClawdbotSchema = z allowFrom: z.array(z.string()).optional(), groupAllowFrom: z.array(z.string()).optional(), groupPolicy: GroupPolicySchema.optional().default("open"), + historyLimit: z.number().int().min(0).optional(), textChunkLimit: z.number().int().positive().optional(), mediaMaxMb: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), @@ -1324,6 +1330,7 @@ export const ClawdbotSchema = z allowFrom: z.array(z.string()).optional(), groupAllowFrom: z.array(z.string()).optional(), groupPolicy: GroupPolicySchema.optional().default("open"), + historyLimit: z.number().int().min(0).optional(), textChunkLimit: z.number().int().positive().optional(), mediaMaxMb: z.number().int().positive().optional().default(50), blockStreaming: z.boolean().optional(), diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 84b8e2beb..7c8c20a82 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -38,6 +38,11 @@ import { buildMentionRegexes, matchesMentionPatterns, } from "../auto-reply/reply/mentions.js"; +import { + appendHistoryEntry, + buildHistoryContextFromEntries, + type HistoryEntry, +} from "../auto-reply/reply/history.js"; import { createReplyDispatcher, createReplyDispatcherWithTyping, @@ -117,14 +122,6 @@ type DiscordSnapshotMessage = { type DiscordMessageSnapshot = { message?: DiscordSnapshotMessage | null; }; - -type DiscordHistoryEntry = { - sender: string; - body: string; - timestamp?: number; - messageId?: string; -}; - type DiscordReactionEvent = Parameters[0]; type DiscordThreadChannel = { id: string; @@ -392,7 +389,10 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { const textLimit = resolveTextChunkLimit(cfg, "discord", account.accountId); const historyLimit = Math.max( 0, - opts.historyLimit ?? discordCfg.historyLimit ?? 20, + opts.historyLimit ?? + discordCfg.historyLimit ?? + cfg.messages?.groupChat?.historyLimit ?? + 20, ); const replyToMode = opts.replyToMode ?? discordCfg.replyToMode ?? "off"; const dmEnabled = dmConfig?.enabled ?? true; @@ -459,7 +459,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { ); const logger = getChildLogger({ module: "discord-auto-reply" }); - const guildHistories = new Map(); + const guildHistories = new Map(); let botUserId: string | undefined; if (nativeDisabledExplicit) { @@ -604,7 +604,7 @@ export function createDiscordMessageHandler(params: { token: string; runtime: RuntimeEnv; botUserId?: string; - guildHistories: Map; + guildHistories: Map; historyLimit: number; mediaMaxBytes: number; textLimit: number; @@ -888,19 +888,21 @@ export function createDiscordMessageHandler(params: { includeForwarded: true, }); if (isGuildMessage && historyLimit > 0 && textForHistory) { - const history = guildHistories.get(message.channelId) ?? []; - history.push({ - sender: - data.member?.nickname ?? - author.globalName ?? - author.username ?? - author.id, - body: textForHistory, - timestamp: resolveTimestampMs(message.timestamp), - messageId: message.id, + appendHistoryEntry({ + historyMap: guildHistories, + historyKey: message.channelId, + limit: historyLimit, + entry: { + sender: + data.member?.nickname ?? + author.globalName ?? + author.username ?? + author.id, + body: textForHistory, + timestamp: resolveTimestampMs(message.timestamp), + messageId: message.id, + }, }); - while (history.length > historyLimit) history.shift(); - guildHistories.set(message.channelId, history); } const shouldRequireMention = @@ -1049,21 +1051,17 @@ export function createDiscordMessageHandler(params: { if (!isDirectMessage) { const history = historyLimit > 0 ? (guildHistories.get(message.channelId) ?? []) : []; - const historyWithoutCurrent = - history.length > 0 ? history.slice(0, -1) : []; - if (historyWithoutCurrent.length > 0) { - const historyText = historyWithoutCurrent - .map((entry) => - formatAgentEnvelope({ - provider: "Discord", - from: fromLabel, - timestamp: entry.timestamp, - body: `${entry.sender}: ${entry.body} [id:${entry.messageId ?? "unknown"} channel:${message.channelId}]`, - }), - ) - .join("\n"); - combinedBody = `[Chat messages since your last reply - for context]\n${historyText}\n\n[Current message - respond to this]\n${combinedBody}`; - } + combinedBody = buildHistoryContextFromEntries({ + entries: history, + currentMessage: combinedBody, + formatEntry: (entry) => + formatAgentEnvelope({ + provider: "Discord", + from: fromLabel, + timestamp: entry.timestamp, + body: `${entry.sender}: ${entry.body} [id:${entry.messageId ?? "unknown"} channel:${message.channelId}]`, + }), + }); const name = formatDiscordUserTag(author); const id = author.id; combinedBody = `${combinedBody}\n[from: ${name} user id:${id}]`; @@ -1116,6 +1114,7 @@ export function createDiscordMessageHandler(params: { const ctxPayload = { Body: combinedBody, RawBody: baseText, + CommandBody: baseText, From: isDirectMessage ? `discord:${author.id}` : `group:${message.channelId}`, @@ -1649,6 +1648,7 @@ function createDiscordNativeCommand(params: { }); const ctxPayload = { Body: prompt, + CommandBody: prompt, From: isDirectMessage ? `discord:${user.id}` : `group:${channelId}`, To: `slash:${user.id}`, SessionKey: `agent:${route.agentId}:${sessionPrefix}:${user.id}`, diff --git a/src/imessage/monitor.ts b/src/imessage/monitor.ts index 72e4989e7..3da711932 100644 --- a/src/imessage/monitor.ts +++ b/src/imessage/monitor.ts @@ -6,6 +6,12 @@ import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js"; import { hasControlCommand } from "../auto-reply/command-detection.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js"; +import { + DEFAULT_GROUP_HISTORY_LIMIT, + appendHistoryEntry, + buildHistoryContextFromEntries, + type HistoryEntry, +} from "../auto-reply/reply/history.js"; import { buildMentionRegexes, matchesMentionPatterns, @@ -140,6 +146,13 @@ export async function monitorIMessageProvider( accountId: opts.accountId, }); const imessageCfg = accountInfo.config; + const historyLimit = Math.max( + 0, + imessageCfg.historyLimit ?? + cfg.messages?.groupChat?.historyLimit ?? + DEFAULT_GROUP_HISTORY_LIMIT, + ); + const groupHistories = new Map(); const textLimit = resolveTextChunkLimit( cfg, "imessage", @@ -391,10 +404,43 @@ export async function monitorIMessageProvider( timestamp: createdAt, body: bodyText, }); + let combinedBody = body; + const historyKey = isGroup + ? String(chatId ?? chatGuid ?? chatIdentifier ?? "unknown") + : undefined; + if (isGroup && historyKey && historyLimit > 0) { + appendHistoryEntry({ + historyMap: groupHistories, + historyKey, + limit: historyLimit, + entry: { + sender: normalizeIMessageHandle(sender), + body: bodyText, + timestamp: createdAt, + messageId: message.id ? String(message.id) : undefined, + }, + }); + const history = groupHistories.get(historyKey) ?? []; + combinedBody = buildHistoryContextFromEntries({ + entries: history, + currentMessage: combinedBody, + formatEntry: (entry) => + formatAgentEnvelope({ + provider: "iMessage", + from: fromLabel, + timestamp: entry.timestamp, + body: `${entry.sender}: ${entry.body}${ + entry.messageId ? ` [id:${entry.messageId}]` : "" + }`, + }), + }); + } const imessageTo = chatTarget || `imessage:${sender}`; const ctxPayload = { - Body: body, + Body: combinedBody, + RawBody: bodyText, + CommandBody: bodyText, From: isGroup ? `group:${chatId}` : `imessage:${sender}`, To: imessageTo, SessionKey: route.sessionKey, @@ -444,6 +490,7 @@ export async function monitorIMessageProvider( ); } + let didSendReply = false; const dispatcher = createReplyDispatcher({ responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId) .responsePrefix, @@ -458,6 +505,7 @@ export async function monitorIMessageProvider( maxBytes: mediaMaxBytes, textLimit, }); + didSendReply = true; }, onError: (err, info) => { runtime.error?.( @@ -477,7 +525,15 @@ export async function monitorIMessageProvider( : undefined, }, }); - if (!queuedFinal) return; + if (!queuedFinal) { + if (isGroup && historyKey && historyLimit > 0 && didSendReply) { + groupHistories.set(historyKey, []); + } + return; + } + if (isGroup && historyKey && historyLimit > 0 && didSendReply) { + groupHistories.set(historyKey, []); + } }; const client = await createIMessageRpcClient({ diff --git a/src/msteams/monitor-handler.ts b/src/msteams/monitor-handler.ts index 3d4ab2893..4c16706f9 100644 --- a/src/msteams/monitor-handler.ts +++ b/src/msteams/monitor-handler.ts @@ -1,5 +1,11 @@ import { formatAgentEnvelope } from "../auto-reply/envelope.js"; import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js"; +import { + DEFAULT_GROUP_HISTORY_LIMIT, + appendHistoryEntry, + buildHistoryContextFromEntries, + type HistoryEntry, +} from "../auto-reply/reply/history.js"; import type { ClawdbotConfig } from "../config/types.js"; import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; @@ -111,6 +117,13 @@ function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { log, } = deps; const msteamsCfg = cfg.msteams; + const historyLimit = Math.max( + 0, + msteamsCfg?.historyLimit ?? + cfg.messages?.groupChat?.historyLimit ?? + DEFAULT_GROUP_HISTORY_LIMIT, + ); + const conversationHistories = new Map(); return async function handleTeamsMessage(context: MSTeamsTurnContext) { const activity = context.activity; @@ -415,10 +428,42 @@ function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { timestamp, body: rawBody, }); + let combinedBody = body; + const isRoomish = !isDirectMessage; + const historyKey = isRoomish ? conversationId : undefined; + if (isRoomish && historyKey && historyLimit > 0) { + appendHistoryEntry({ + historyMap: conversationHistories, + historyKey, + limit: historyLimit, + entry: { + sender: senderName, + body: rawBody, + timestamp: timestamp?.getTime(), + messageId: activity.id ?? undefined, + }, + }); + const history = conversationHistories.get(historyKey) ?? []; + combinedBody = buildHistoryContextFromEntries({ + entries: history, + currentMessage: combinedBody, + formatEntry: (entry) => + formatAgentEnvelope({ + provider: "Teams", + from: conversationType, + timestamp: entry.timestamp, + body: `${entry.sender}: ${entry.body}${ + entry.messageId ? ` [id:${entry.messageId}]` : "" + }`, + }), + }); + } // Build context payload for agent const ctxPayload = { - Body: body, + Body: combinedBody, + RawBody: rawBody, + CommandBody: rawBody, From: teamsFrom, To: teamsTo, SessionKey: route.sessionKey, @@ -472,13 +517,22 @@ function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { markDispatchIdle(); log.info("dispatch complete", { queuedFinal, counts }); - if (!queuedFinal) return; + const didSendReply = counts.final + counts.tool + counts.block > 0; + if (!queuedFinal) { + if (isRoomish && historyKey && historyLimit > 0 && didSendReply) { + conversationHistories.set(historyKey, []); + } + return; + } if (shouldLogVerbose()) { const finalCount = counts.final; logVerbose( `msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`, ); } + if (isRoomish && historyKey && historyLimit > 0 && didSendReply) { + conversationHistories.set(historyKey, []); + } } catch (err) { log.error("dispatch failed", { error: String(err) }); runtime.error?.(danger(`msteams dispatch failed: ${String(err)}`)); diff --git a/src/signal/monitor.ts b/src/signal/monitor.ts index 34f0c7f44..c0a605b82 100644 --- a/src/signal/monitor.ts +++ b/src/signal/monitor.ts @@ -5,6 +5,12 @@ import { import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js"; +import { + DEFAULT_GROUP_HISTORY_LIMIT, + appendHistoryEntry, + buildHistoryContextFromEntries, + type HistoryEntry, +} from "../auto-reply/reply/history.js"; import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import type { ClawdbotConfig } from "../config/config.js"; @@ -328,6 +334,13 @@ export async function monitorSignalProvider( cfg, accountId: opts.accountId, }); + const historyLimit = Math.max( + 0, + accountInfo.config.historyLimit ?? + cfg.messages?.groupChat?.historyLimit ?? + DEFAULT_GROUP_HISTORY_LIMIT, + ); + const groupHistories = new Map(); const textLimit = resolveTextChunkLimit(cfg, "signal", accountInfo.accountId); const baseUrl = opts.baseUrl?.trim() || accountInfo.baseUrl; const account = opts.account?.trim() || accountInfo.config.account?.trim(); @@ -619,6 +632,38 @@ export async function monitorSignalProvider( timestamp: envelope.timestamp ?? undefined, body: bodyText, }); + let combinedBody = body; + const historyKey = isGroup ? String(groupId ?? "unknown") : undefined; + if (isGroup && historyKey && historyLimit > 0) { + appendHistoryEntry({ + historyMap: groupHistories, + historyKey, + limit: historyLimit, + entry: { + sender: envelope.sourceName ?? senderDisplay, + body: bodyText, + timestamp: envelope.timestamp ?? undefined, + messageId: + typeof envelope.timestamp === "number" + ? String(envelope.timestamp) + : undefined, + }, + }); + const history = groupHistories.get(historyKey) ?? []; + combinedBody = buildHistoryContextFromEntries({ + entries: history, + currentMessage: combinedBody, + formatEntry: (entry) => + formatAgentEnvelope({ + provider: "Signal", + from: fromLabel, + timestamp: entry.timestamp, + body: `${entry.sender}: ${entry.body}${ + entry.messageId ? ` [id:${entry.messageId}]` : "" + }`, + }), + }); + } const route = resolveAgentRoute({ cfg, @@ -633,7 +678,9 @@ export async function monitorSignalProvider( ? `group:${groupId}` : `signal:${senderRecipient}`; const ctxPayload = { - Body: body, + Body: combinedBody, + RawBody: bodyText, + CommandBody: bodyText, From: isGroup ? `group:${groupId ?? "unknown"}` : `signal:${senderRecipient}`, @@ -678,6 +725,7 @@ export async function monitorSignalProvider( ); } + let didSendReply = false; const dispatcher = createReplyDispatcher({ responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId) .responsePrefix, @@ -693,6 +741,7 @@ export async function monitorSignalProvider( maxBytes: mediaMaxBytes, textLimit, }); + didSendReply = true; }, onError: (err, info) => { runtime.error?.( @@ -712,7 +761,15 @@ export async function monitorSignalProvider( : undefined, }, }); - if (!queuedFinal) return; + if (!queuedFinal) { + if (isGroup && historyKey && historyLimit > 0 && didSendReply) { + groupHistories.set(historyKey, []); + } + return; + } + if (isGroup && historyKey && historyLimit > 0 && didSendReply) { + groupHistories.set(historyKey, []); + } }; await runSignalSseLoop({ diff --git a/src/slack/monitor.tool-result.test.ts b/src/slack/monitor.tool-result.test.ts index 387f81c73..b7ef431a8 100644 --- a/src/slack/monitor.tool-result.test.ts +++ b/src/slack/monitor.tool-result.test.ts @@ -1,5 +1,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import { HISTORY_CONTEXT_MARKER } from "../auto-reply/reply/history.js"; +import { CURRENT_MESSAGE_MARKER } from "../auto-reply/reply/mentions.js"; import { monitorSlackProvider } from "./monitor.js"; const sendMock = vi.fn(); @@ -218,6 +220,68 @@ describe("monitorSlackProvider tool results", () => { expect(sendMock.mock.calls[1][1]).toBe("final reply"); }); + it("wraps room history in Body and preserves RawBody", async () => { + config = { + messages: { ackReactionScope: "group-mentions" }, + slack: { + historyLimit: 5, + dm: { enabled: true, policy: "open", allowFrom: ["*"] }, + channels: { "*": { requireMention: false } }, + }, + }; + + let capturedCtx: { Body?: string; RawBody?: string; CommandBody?: string } = + {}; + replyMock.mockImplementation(async (ctx) => { + capturedCtx = ctx ?? {}; + return undefined; + }); + + const controller = new AbortController(); + const run = monitorSlackProvider({ + botToken: "bot-token", + appToken: "app-token", + abortSignal: controller.signal, + }); + + await waitForEvent("message"); + const handler = getSlackHandlers()?.get("message"); + if (!handler) throw new Error("Slack message handler not registered"); + + await handler({ + event: { + type: "message", + user: "U1", + text: "first", + ts: "123", + channel: "C1", + channel_type: "channel", + }, + }); + + await handler({ + event: { + type: "message", + user: "U2", + text: "second", + ts: "124", + channel: "C1", + channel_type: "channel", + }, + }); + + await flush(); + controller.abort(); + await run; + + expect(replyMock).toHaveBeenCalledTimes(2); + expect(capturedCtx.Body).toContain(HISTORY_CONTEXT_MARKER); + expect(capturedCtx.Body).toContain(CURRENT_MESSAGE_MARKER); + expect(capturedCtx.Body).toContain("first"); + expect(capturedCtx.RawBody).toBe("second"); + expect(capturedCtx.CommandBody).toBe("second"); + }); + it("updates assistant thread status when replies start", async () => { replyMock.mockImplementation(async (_ctx, opts) => { await opts?.onReplyStart?.(); diff --git a/src/slack/monitor.ts b/src/slack/monitor.ts index 9bc3d0035..1915ba602 100644 --- a/src/slack/monitor.ts +++ b/src/slack/monitor.ts @@ -28,6 +28,12 @@ import { buildMentionRegexes, matchesMentionPatterns, } from "../auto-reply/reply/mentions.js"; +import { + DEFAULT_GROUP_HISTORY_LIMIT, + appendHistoryEntry, + buildHistoryContextFromEntries, + type HistoryEntry, +} from "../auto-reply/reply/history.js"; import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; @@ -430,6 +436,13 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { cfg, accountId: opts.accountId, }); + const historyLimit = Math.max( + 0, + account.config.historyLimit ?? + cfg.messages?.groupChat?.historyLimit ?? + DEFAULT_GROUP_HISTORY_LIMIT, + ); + const channelHistories = new Map(); const sessionCfg = cfg.session; const sessionScope = sessionCfg?.scope ?? "per-sender"; const mainKey = normalizeMainKey(sessionCfg?.mainKey); @@ -954,6 +967,23 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { : null; const roomLabel = channelName ? `#${channelName}` : `#${message.channel}`; + const isRoomish = isRoom || isGroupDm; + const historyKey = message.channel; + if (isRoomish && historyLimit > 0) { + appendHistoryEntry({ + historyMap: channelHistories, + historyKey, + limit: historyLimit, + entry: { + sender: senderName, + body: rawBody, + timestamp: message.ts + ? Math.round(Number(message.ts) * 1000) + : undefined, + messageId: message.ts, + }, + }); + } const preview = rawBody.replace(/\s+/g, " ").slice(0, 160); const inboundLabel = isDirectMessage @@ -989,7 +1019,25 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { body: textWithId, }); - const isRoomish = isRoom || isGroupDm; + let combinedBody = body; + if (isRoomish && historyLimit > 0) { + const history = channelHistories.get(historyKey) ?? []; + combinedBody = buildHistoryContextFromEntries({ + entries: history, + currentMessage: combinedBody, + formatEntry: (entry) => + formatAgentEnvelope({ + provider: "Slack", + from: roomLabel, + timestamp: entry.timestamp, + body: `${entry.sender}: ${entry.body}${ + entry.messageId + ? ` [id:${entry.messageId} channel:${message.channel}]` + : "" + }`, + }), + }); + } const slackTo = isDirectMessage ? `user:${message.user}` : `channel:${message.channel}`; @@ -1033,7 +1081,9 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { } } const ctxPayload = { - Body: body, + Body: combinedBody, + RawBody: rawBody, + CommandBody: rawBody, From: slackFrom, To: slackTo, SessionKey: sessionKey, @@ -1106,6 +1156,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { status: "is typing...", }); }; + let didSendReply = false; const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId) @@ -1127,6 +1178,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { textLimit, replyThreadTs: effectiveThreadTs, }); + didSendReply = true; hasRepliedRef.value = true; }, onError: (err, info) => { @@ -1166,7 +1218,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { status: "", }); } - if (!queuedFinal) return; + if (!queuedFinal) { + if (isRoomish && historyLimit > 0 && didSendReply) { + channelHistories.set(historyKey, []); + } + return; + } if (shouldLogVerbose()) { const finalCount = counts.final; logVerbose( @@ -1187,6 +1244,9 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { }); }); } + if (isRoomish && historyLimit > 0 && didSendReply) { + channelHistories.set(historyKey, []); + } }; app.event( diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index f10a42a27..b7550a1cf 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -24,6 +24,12 @@ import { buildMentionRegexes, matchesMentionPatterns, } from "../auto-reply/reply/mentions.js"; +import { + DEFAULT_GROUP_HISTORY_LIMIT, + appendHistoryEntry, + buildHistoryContextFromEntries, + type HistoryEntry, +} from "../auto-reply/reply/history.js"; import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; @@ -246,6 +252,13 @@ export function createTelegramBot(opts: TelegramBotOptions) { accountId: opts.accountId, }); const telegramCfg = account.config; + const historyLimit = Math.max( + 0, + telegramCfg.historyLimit ?? + cfg.messages?.groupChat?.historyLimit ?? + DEFAULT_GROUP_HISTORY_LIMIT, + ); + const groupHistories = new Map(); const textLimit = resolveTextChunkLimit(cfg, "telegram", account.accountId); const dmPolicy = telegramCfg.dmPolicy ?? "pairing"; const allowFrom = opts.allowFrom ?? telegramCfg.allowFrom; @@ -642,6 +655,9 @@ export function createTelegramBot(opts: TelegramBotOptions) { replyTarget.id ? ` id:${replyTarget.id}` : "" }]\n${replyTarget.body}\n[/Replying]` : ""; + const groupLabel = isGroup + ? buildGroupLabel(msg, chatId, messageThreadId) + : undefined; const body = formatAgentEnvelope({ provider: "Telegram", from: isGroup @@ -650,6 +666,38 @@ export function createTelegramBot(opts: TelegramBotOptions) { timestamp: msg.date ? msg.date * 1000 : undefined, body: `${bodyText}${replySuffix}`, }); + let combinedBody = body; + const historyKey = isGroup + ? buildTelegramGroupPeerId(chatId, messageThreadId) + : undefined; + if (isGroup && historyKey && historyLimit > 0) { + appendHistoryEntry({ + historyMap: groupHistories, + historyKey, + limit: historyLimit, + entry: { + sender: buildSenderLabel(msg, senderId || chatId), + body: rawBody, + timestamp: msg.date ? msg.date * 1000 : undefined, + messageId: + typeof msg.message_id === "number" + ? String(msg.message_id) + : undefined, + }, + }); + const history = groupHistories.get(historyKey) ?? []; + combinedBody = buildHistoryContextFromEntries({ + entries: history, + currentMessage: combinedBody, + formatEntry: (entry) => + formatAgentEnvelope({ + provider: "Telegram", + from: groupLabel ?? `group:${chatId}`, + timestamp: entry.timestamp, + body: `${entry.sender}: ${entry.body} [id:${entry.messageId ?? "unknown"} chat:${chatId}]`, + }), + }); + } const skillFilter = firstDefined(topicConfig?.skills, groupConfig?.skills); const systemPromptParts = [ @@ -659,7 +707,9 @@ export function createTelegramBot(opts: TelegramBotOptions) { const groupSystemPrompt = systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined; const ctxPayload = { - Body: body, + Body: combinedBody, + RawBody: rawBody, + CommandBody: rawBody, From: isGroup ? buildTelegramGroupFrom(chatId, messageThreadId) : `telegram:${chatId}`, @@ -810,6 +860,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { ? !telegramCfg.blockStreaming : undefined); + let didSendReply = false; const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg, @@ -831,6 +882,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { textLimit, messageThreadId, }); + didSendReply = true; }, onError: (err, info) => { runtime.error?.( @@ -853,7 +905,12 @@ export function createTelegramBot(opts: TelegramBotOptions) { }, }); draftStream?.stop(); - if (!queuedFinal) return; + if (!queuedFinal) { + if (isGroup && historyKey && historyLimit > 0 && didSendReply) { + groupHistories.set(historyKey, []); + } + return; + } if ( removeAckAfterReply && ackReactionPromise && @@ -869,6 +926,9 @@ export function createTelegramBot(opts: TelegramBotOptions) { }); }); } + if (isGroup && historyKey && historyLimit > 0 && didSendReply) { + groupHistories.set(historyKey, []); + } }; const nativeCommands = nativeEnabled ? listNativeCommandSpecs() : []; diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 47c8d8e0e..07fce8c8f 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -21,6 +21,10 @@ import { buildMentionRegexes, normalizeMentionText, } from "../auto-reply/reply/mentions.js"; +import { + buildHistoryContext, + DEFAULT_GROUP_HISTORY_LIMIT, +} from "../auto-reply/reply/history.js"; import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; @@ -75,7 +79,6 @@ import { } from "./reconnect.js"; import { formatError, getWebAuthAgeMs, readWebSelfId } from "./session.js"; -const DEFAULT_GROUP_HISTORY_LIMIT = 50; const whatsappLog = createSubsystemLogger("gateway/providers/whatsapp"); const whatsappInboundLog = whatsappLog.child("inbound"); const whatsappOutboundLog = whatsappLog.child("outbound"); @@ -817,7 +820,10 @@ export async function monitorWebProvider( buildMentionConfig(cfg, agentId); const baseMentionConfig = resolveMentionConfig(); const groupHistoryLimit = - cfg.messages?.groupChat?.historyLimit ?? DEFAULT_GROUP_HISTORY_LIMIT; + cfg.whatsapp?.accounts?.[tuning.accountId ?? ""]?.historyLimit ?? + cfg.whatsapp?.historyLimit ?? + cfg.messages?.groupChat?.historyLimit ?? + DEFAULT_GROUP_HISTORY_LIMIT; const groupHistories = new Map< string, Array<{ sender: string; body: string; timestamp?: number }> @@ -1115,6 +1121,7 @@ export async function monitorWebProvider( const historyWithoutCurrent = history.length > 0 ? history.slice(0, -1) : []; if (historyWithoutCurrent.length > 0) { + const lineBreak = "\\n"; const historyText = historyWithoutCurrent .map((m) => formatAgentEnvelope({ @@ -1124,11 +1131,12 @@ export async function monitorWebProvider( body: `${m.sender}: ${m.body}`, }), ) - .join("\\n"); - combinedBody = `[Chat messages since your last reply - for context]\\n${historyText}\\n\\n[Current message - respond to this]\\n${buildLine( - msg, - route.agentId, - )}`; + .join(lineBreak); + combinedBody = buildHistoryContext({ + historyText, + currentMessage: buildLine(msg, route.agentId), + lineBreak, + }); } // Always surface who sent the triggering message so the agent can address them. const senderLabel = @@ -1222,6 +1230,7 @@ export async function monitorWebProvider( ctx: { Body: combinedBody, RawBody: msg.body, + CommandBody: msg.body, From: msg.from, To: msg.to, SessionKey: route.sessionKey,