diff --git a/src/agents/pi-embedded-runner/google.ts b/src/agents/pi-embedded-runner/google.ts
index 8158c2c83..605e9b10b 100644
--- a/src/agents/pi-embedded-runner/google.ts
+++ b/src/agents/pi-embedded-runner/google.ts
@@ -1,3 +1,5 @@
+import { EventEmitter } from "node:events";
+
 import type { AgentMessage, AgentTool } from "@mariozechner/pi-agent-core";
 import type { TSchema } from "@sinclair/typebox";
 import type { SessionManager } from "@mariozechner/pi-coding-agent";
@@ -184,10 +186,28 @@ export function logToolSchemasForGoogle(params: { tools: AgentTool[]; provider:
   }
 }
 
+// Event emitter for unhandled compaction failures that escape try-catch blocks.
+// Listeners can use this to trigger session recovery with retry.
+const compactionFailureEmitter = new EventEmitter();
+
+export type CompactionFailureListener = (reason: string) => void;
+
+/**
+ * Register a listener for unhandled compaction failures.
+ * Called when auto-compaction fails in a way that escapes the normal try-catch,
+ * e.g., when the summarization request itself exceeds the model's token limit.
+ * Returns an unsubscribe function.
+ */
+export function onUnhandledCompactionFailure(cb: CompactionFailureListener): () => void {
+  compactionFailureEmitter.on("failure", cb);
+  return () => compactionFailureEmitter.off("failure", cb);
+}
+
 registerUnhandledRejectionHandler((reason) => {
   const message = describeUnknownError(reason);
   if (!isCompactionFailureError(message)) return false;
   log.error(`Auto-compaction failed (unhandled): ${message}`);
+  compactionFailureEmitter.emit("failure", message);
   return true;
 });
diff --git a/src/agents/pi-extensions/compaction-safeguard.test.ts b/src/agents/pi-extensions/compaction-safeguard.test.ts
index d542da4fc..275e10e9f 100644
--- a/src/agents/pi-extensions/compaction-safeguard.test.ts
+++ b/src/agents/pi-extensions/compaction-safeguard.test.ts
@@ -3,7 +3,15 @@ import { describe, expect, it } from "vitest";
 
 import { __testing } from "./compaction-safeguard.js";
 
-const { collectToolFailures, formatToolFailuresSection } = __testing;
+const {
+  collectToolFailures,
+  formatToolFailuresSection,
+  computeAdaptiveChunkRatio,
+  isOversizedForSummary,
+  BASE_CHUNK_RATIO,
+  MIN_CHUNK_RATIO,
+  SAFETY_MARGIN,
+} = __testing;
 
 describe("compaction-safeguard tool failures", () => {
   it("formats tool failures with meta and summary", () => {
@@ -96,3 +104,107 @@
     expect(section).toBe("");
   });
 });
+
+describe("computeAdaptiveChunkRatio", () => {
+  const CONTEXT_WINDOW = 200_000;
+
+  it("returns BASE_CHUNK_RATIO for normal messages", () => {
+    // Small messages: ~1000 characters each, well under 10% of context
+    const messages: AgentMessage[] = [
+      { role: "user", content: "x".repeat(1000), timestamp: Date.now() },
+      {
+        role: "assistant",
+        content: [{ type: "text", text: "y".repeat(1000) }],
+        timestamp: Date.now(),
+      },
+    ];
+
+    const ratio = computeAdaptiveChunkRatio(messages, CONTEXT_WINDOW);
+    expect(ratio).toBe(BASE_CHUNK_RATIO);
+  });
+
+  it("reduces ratio when average message > 10% of context", () => {
+    // Large messages: ~50K tokens each (25% of context)
+    const messages: AgentMessage[] = [
+      { role: "user", content: "x".repeat(50_000 * 4), timestamp: Date.now() },
+      {
+        role: "assistant",
+        content: [{ type: "text", text: "y".repeat(50_000 * 4) }],
+        timestamp: Date.now(),
+      },
+    ];
+
+    const ratio = computeAdaptiveChunkRatio(messages, CONTEXT_WINDOW);
+    expect(ratio).toBeLessThan(BASE_CHUNK_RATIO);
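+    // Assuming estimateTokens() maps ~4 chars to one token (as the sizes above do),
+    // the margined average is ~30% of the window, so the reduction caps out and the
+    // ratio drops to the MIN_CHUNK_RATIO floor.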
+    expect(ratio).toBeGreaterThanOrEqual(MIN_CHUNK_RATIO);
+  });
+
+  it("respects MIN_CHUNK_RATIO floor", () => {
+    // Very large messages that would push ratio below minimum
+    const messages: AgentMessage[] = [
+      { role: "user", content: "x".repeat(150_000 * 4), timestamp: Date.now() },
+    ];
+
+    const ratio = computeAdaptiveChunkRatio(messages, CONTEXT_WINDOW);
+    expect(ratio).toBeGreaterThanOrEqual(MIN_CHUNK_RATIO);
+  });
+
+  it("handles empty message array", () => {
+    const ratio = computeAdaptiveChunkRatio([], CONTEXT_WINDOW);
+    expect(ratio).toBe(BASE_CHUNK_RATIO);
+  });
+
+  it("handles single huge message", () => {
+    // Single massive message
+    const messages: AgentMessage[] = [
+      { role: "user", content: "x".repeat(180_000 * 4), timestamp: Date.now() },
+    ];
+
+    const ratio = computeAdaptiveChunkRatio(messages, CONTEXT_WINDOW);
+    expect(ratio).toBeGreaterThanOrEqual(MIN_CHUNK_RATIO);
+    expect(ratio).toBeLessThanOrEqual(BASE_CHUNK_RATIO);
+  });
+});
+
+describe("isOversizedForSummary", () => {
+  const CONTEXT_WINDOW = 200_000;
+
+  it("returns false for small messages", () => {
+    const msg: AgentMessage = {
+      role: "user",
+      content: "Hello, world!",
+      timestamp: Date.now(),
+    };
+
+    expect(isOversizedForSummary(msg, CONTEXT_WINDOW)).toBe(false);
+  });
+
+  it("returns true for messages > 50% of context", () => {
+    // Message with ~120K tokens (60% of 200K context)
+    // After safety margin (1.2x), effective is 144K which is > 100K (50%)
+    const msg: AgentMessage = {
+      role: "user",
+      content: "x".repeat(120_000 * 4),
+      timestamp: Date.now(),
+    };
+
+    expect(isOversizedForSummary(msg, CONTEXT_WINDOW)).toBe(true);
+  });
+
+  it("applies safety margin", () => {
+    // Message sized so that, after the 1.2x SAFETY_MARGIN, it lands right at 50% of context
+    const halfContextChars = (CONTEXT_WINDOW * 0.5) / SAFETY_MARGIN;
+    const msg: AgentMessage = {
+      role: "user",
+      content: "x".repeat(Math.floor(halfContextChars * 4)),
+      timestamp: Date.now(),
+    };
+
+    // With safety margin applied, this should be at the boundary
+    // The function checks if tokens * SAFETY_MARGIN > contextWindow * 0.5
+    const isOversized = isOversizedForSummary(msg, CONTEXT_WINDOW);
+    // Due to token estimation, this could be either true or false at the boundary
+    expect(typeof isOversized).toBe("boolean");
+  });
+});
diff --git a/src/agents/pi-extensions/compaction-safeguard.ts b/src/agents/pi-extensions/compaction-safeguard.ts
index 9e4c20fef..a6a66637a 100644
--- a/src/agents/pi-extensions/compaction-safeguard.ts
+++ b/src/agents/pi-extensions/compaction-safeguard.ts
@@ -4,7 +4,9 @@ import { estimateTokens, generateSummary } from "@mariozechner/pi-coding-agent";
 
 import { DEFAULT_CONTEXT_TOKENS } from "../defaults.js";
 
-const MAX_CHUNK_RATIO = 0.4;
+const BASE_CHUNK_RATIO = 0.4;
+const MIN_CHUNK_RATIO = 0.15;
+const SAFETY_MARGIN = 1.2; // 20% buffer for estimateTokens() inaccuracy
 const FALLBACK_SUMMARY =
   "Summary unavailable due to context limits. Older messages were truncated.";
 const TURN_PREFIX_INSTRUCTIONS =
@@ -160,6 +162,38 @@ function chunkMessages(messages: AgentMessage[], maxTokens: number): AgentMessag
   return chunks;
 }
 
+/**
+ * Compute adaptive chunk ratio based on average message size.
+ * When messages are large, we use smaller chunks to avoid exceeding model limits.
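+ * Illustrative example (hypothetical numbers): with a 200K-token window and an average
+ * message of ~18K tokens, the 1.2x safety margin yields ~21.6K (~11% of the window), so
+ * the ratio drops from 0.40 to roughly 0.18; once the margined average exceeds ~12.5%,
+ * the result bottoms out at MIN_CHUNK_RATIO (0.15).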
+ */
+function computeAdaptiveChunkRatio(messages: AgentMessage[], contextWindow: number): number {
+  if (messages.length === 0) return BASE_CHUNK_RATIO;
+
+  const totalTokens = messages.reduce((sum, m) => sum + estimateTokens(m), 0);
+  const avgTokens = totalTokens / messages.length;
+
+  // Apply safety margin to account for estimation inaccuracy
+  const safeAvgTokens = avgTokens * SAFETY_MARGIN;
+  const avgRatio = safeAvgTokens / contextWindow;
+
+  // If average message is > 10% of context, reduce chunk ratio
+  if (avgRatio > 0.1) {
+    const reduction = Math.min(avgRatio * 2, BASE_CHUNK_RATIO - MIN_CHUNK_RATIO);
+    return Math.max(MIN_CHUNK_RATIO, BASE_CHUNK_RATIO - reduction);
+  }
+
+  return BASE_CHUNK_RATIO;
+}
+
+/**
+ * Check if a single message is too large to summarize.
+ * If a single message is > 50% of the context, it can't be summarized safely.
+ */
+function isOversizedForSummary(msg: AgentMessage, contextWindow: number): boolean {
+  const tokens = estimateTokens(msg) * SAFETY_MARGIN;
+  return tokens > contextWindow * 0.5;
+}
+
 async function summarizeChunks(params: {
   messages: AgentMessage[];
   model: NonNullable;
   apiKey: string;
   signal: AbortSignal;
@@ -192,6 +226,78 @@ async function summarizeChunks(params: {
   return summary ?? "No prior history.";
 }
 
+/**
+ * Summarize with progressive fallback for handling oversized messages.
+ * If full summarization fails, tries partial summarization excluding oversized messages.
+ */
+async function summarizeWithFallback(params: {
+  messages: AgentMessage[];
+  model: NonNullable;
+  apiKey: string;
+  signal: AbortSignal;
+  reserveTokens: number;
+  maxChunkTokens: number;
+  contextWindow: number;
+  customInstructions?: string;
+  previousSummary?: string;
+}): Promise<string> {
+  const { messages, contextWindow } = params;
+
+  if (messages.length === 0) {
+    return params.previousSummary ?? "No prior history.";
+  }
+
+  // Try full summarization first
+  try {
+    return await summarizeChunks(params);
+  } catch (fullError) {
+    console.warn(
+      `Full summarization failed, trying partial: ${
+        fullError instanceof Error ? fullError.message : String(fullError)
+      }`,
+    );
+  }
+
+  // Fallback 1: Summarize only small messages, note oversized ones
+  const smallMessages: AgentMessage[] = [];
+  const oversizedNotes: string[] = [];
+
+  for (const msg of messages) {
+    if (isOversizedForSummary(msg, contextWindow)) {
+      const role = (msg as { role?: string }).role ?? "message";
+      const tokens = estimateTokens(msg);
+      oversizedNotes.push(
+        `[Large ${role} (~${Math.round(tokens / 1000)}K tokens) omitted from summary]`,
+      );
+    } else {
+      smallMessages.push(msg);
+    }
+  }
+
+  if (smallMessages.length > 0) {
+    try {
+      const partialSummary = await summarizeChunks({
+        ...params,
+        messages: smallMessages,
+      });
+      const notes = oversizedNotes.length > 0 ? `\n\n${oversizedNotes.join("\n")}` : "";
+      return partialSummary + notes;
+    } catch (partialError) {
+      console.warn(
+        `Partial summarization also failed: ${
+          partialError instanceof Error ? partialError.message : String(partialError)
+        }`,
+      );
+    }
+  }
+
+  // Final fallback: Just note what was there
+  return (
+    `Context contained ${messages.length} messages (${oversizedNotes.length} oversized). ` +
+    `Summary unavailable due to size limits.`
+  );
+}
+
 export default function compactionSafeguardExtension(api: ExtensionAPI): void {
   api.on("session_before_compact", async (event, ctx) => {
     const { preparation, customInstructions, signal } = event;
@@ -233,29 +339,35 @@
       1,
       Math.floor(model.contextWindow ?? DEFAULT_CONTEXT_TOKENS),
     );
-    const maxChunkTokens = Math.max(1, Math.floor(contextWindowTokens * MAX_CHUNK_RATIO));
+
+    // Use adaptive chunk ratio based on message sizes
+    const allMessages = [...preparation.messagesToSummarize, ...preparation.turnPrefixMessages];
+    const adaptiveRatio = computeAdaptiveChunkRatio(allMessages, contextWindowTokens);
+    const maxChunkTokens = Math.max(1, Math.floor(contextWindowTokens * adaptiveRatio));
     const reserveTokens = Math.max(1, Math.floor(preparation.settings.reserveTokens));
 
-    const historySummary = await summarizeChunks({
+    const historySummary = await summarizeWithFallback({
       messages: preparation.messagesToSummarize,
       model,
       apiKey,
       signal,
       reserveTokens,
       maxChunkTokens,
+      contextWindow: contextWindowTokens,
       customInstructions,
       previousSummary: preparation.previousSummary,
     });
 
     let summary = historySummary;
     if (preparation.isSplitTurn && preparation.turnPrefixMessages.length > 0) {
-      const prefixSummary = await summarizeChunks({
+      const prefixSummary = await summarizeWithFallback({
        messages: preparation.turnPrefixMessages,
         model,
         apiKey,
         signal,
         reserveTokens,
         maxChunkTokens,
+        contextWindow: contextWindowTokens,
         customInstructions: TURN_PREFIX_INSTRUCTIONS,
       });
       summary = `${historySummary}\n\n---\n\n**Turn Context (split turn):**\n\n${prefixSummary}`;
@@ -293,4 +405,9 @@ export default function compactionSafeguardExtension(api: ExtensionAPI): void {
 export const __testing = {
   collectToolFailures,
   formatToolFailuresSection,
+  computeAdaptiveChunkRatio,
+  isOversizedForSummary,
+  BASE_CHUNK_RATIO,
+  MIN_CHUNK_RATIO,
+  SAFETY_MARGIN,
 } as const;
diff --git a/src/gateway/assistant-identity.ts b/src/gateway/assistant-identity.ts
index 35ff43490..bc0506f50 100644
--- a/src/gateway/assistant-identity.ts
+++ b/src/gateway/assistant-identity.ts
@@ -7,7 +7,8 @@ import { normalizeAgentId } from "../routing/session-key.js";
 const MAX_ASSISTANT_NAME = 50;
 const MAX_ASSISTANT_AVATAR = 200;
 
-export const DEFAULT_ASSISTANT_IDENTITY = {
+export const DEFAULT_ASSISTANT_IDENTITY: AssistantIdentity = {
+  agentId: "main",
   name: "Assistant",
   avatar: "A",
 };
diff --git a/ui/src/styles/components.css b/ui/src/styles/components.css
index 6a047fb6c..b46f55b6f 100644
--- a/ui/src/styles/components.css
+++ b/ui/src/styles/components.css
@@ -415,6 +415,51 @@
   color: var(--danger);
 }
 
+.callout.info {
+  border-color: rgba(92, 156, 255, 0.4);
+  color: var(--accent);
+}
+
+.callout.success {
+  border-color: rgba(92, 255, 128, 0.4);
+  color: var(--positive, #5cff80);
+}
+
+.compaction-indicator {
+  font-size: 13px;
+  padding: 8px 12px;
+  margin-bottom: 8px;
+  animation: compaction-fade-in 0.2s ease-out;
+}
+
+.compaction-indicator--active {
+  animation: compaction-pulse 1.5s ease-in-out infinite;
+}
+
+.compaction-indicator--complete {
+  animation: compaction-fade-in 0.2s ease-out;
+}
+
+@keyframes compaction-fade-in {
+  from {
+    opacity: 0;
+    transform: translateY(-4px);
+  }
+  to {
+    opacity: 1;
+    transform: translateY(0);
+  }
+}
+
+@keyframes compaction-pulse {
+  0%, 100% {
+    opacity: 0.7;
+  }
+  50% {
+    opacity: 1;
+  }
+}
+
 .code-block {
   font-family: var(--mono);
   font-size: 12px;
diff --git a/ui/src/ui/app-gateway.ts b/ui/src/ui/app-gateway.ts
index d6b0bbcb5..edabb574f 100644
--- a/ui/src/ui/app-gateway.ts
+++ b/ui/src/ui/app-gateway.ts
@@ -145,6 +145,14 @@
 }
 
 export function handleGatewayEvent(host: GatewayHost, evt: GatewayEventFrame) {
+  try {
+    handleGatewayEventUnsafe(host, evt);
+  } catch (err) {
+    console.error("[gateway] handleGatewayEvent error:", evt.event, err);
+  }
+}
+
+function handleGatewayEventUnsafe(host: GatewayHost, evt: GatewayEventFrame) {
   host.eventLogBuffer = [
     { ts: Date.now(), event: evt.event, payload: evt.payload },
     ...host.eventLogBuffer,
diff --git a/ui/src/ui/app-render.ts b/ui/src/ui/app-render.ts
index 96f2b729e..4fa30722f 100644
--- a/ui/src/ui/app-render.ts
+++ b/ui/src/ui/app-render.ts
@@ -444,6 +444,7 @@
     showThinking,
     loading: state.chatLoading,
     sending: state.chatSending,
+    compactionStatus: state.compactionStatus,
     assistantAvatarUrl: chatAvatarUrl,
     messages: state.chatMessages,
     toolMessages: state.chatToolMessages,
diff --git a/ui/src/ui/app-tool-stream.ts b/ui/src/ui/app-tool-stream.ts
index b94adada2..5c83c3a79 100644
--- a/ui/src/ui/app-tool-stream.ts
+++ b/ui/src/ui/app-tool-stream.ts
@@ -138,8 +138,59 @@ export function resetToolStream(host: ToolStreamHost) {
   flushToolStreamSync(host);
 }
 
+export type CompactionStatus = {
+  active: boolean;
+  startedAt: number | null;
+  completedAt: number | null;
+};
+
+type CompactionHost = ToolStreamHost & {
+  compactionStatus?: CompactionStatus | null;
+  compactionClearTimer?: number | null;
+};
+
+const COMPACTION_TOAST_DURATION_MS = 5000;
+
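+// Handles events on the "compaction" stream. Illustrative payload shape (assumed from
+// the phase checks below): { stream: "compaction", data: { phase: "start" | "end" } }.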
+export function handleCompactionEvent(host: CompactionHost, payload: AgentEventPayload) {
+  const data = payload.data ?? {};
+  const phase = typeof data.phase === "string" ? data.phase : "";
+
+  // Clear any existing timer
+  if (host.compactionClearTimer != null) {
+    window.clearTimeout(host.compactionClearTimer);
+    host.compactionClearTimer = null;
+  }
+
+  if (phase === "start") {
+    host.compactionStatus = {
+      active: true,
+      startedAt: Date.now(),
+      completedAt: null,
+    };
+  } else if (phase === "end") {
+    host.compactionStatus = {
+      active: false,
+      startedAt: host.compactionStatus?.startedAt ?? null,
+      completedAt: Date.now(),
+    };
+    // Auto-clear the toast after duration
+    host.compactionClearTimer = window.setTimeout(() => {
+      host.compactionStatus = null;
+      host.compactionClearTimer = null;
+    }, COMPACTION_TOAST_DURATION_MS);
+  }
+}
+
 export function handleAgentEvent(host: ToolStreamHost, payload?: AgentEventPayload) {
-  if (!payload || payload.stream !== "tool") return;
+  if (!payload) return;
+
+  // Handle compaction events
+  if (payload.stream === "compaction") {
+    handleCompactionEvent(host as CompactionHost, payload);
+    return;
+  }
+
+  if (payload.stream !== "tool") return;
 
   const sessionKey = typeof payload.sessionKey === "string" ? payload.sessionKey : undefined;
   if (sessionKey && sessionKey !== host.sessionKey) return;
diff --git a/ui/src/ui/app.ts b/ui/src/ui/app.ts
index 94886a393..cd3537f6d 100644
--- a/ui/src/ui/app.ts
+++ b/ui/src/ui/app.ts
@@ -125,6 +125,7 @@ export class ClawdbotApp extends LitElement {
   @state() chatStream: string | null = null;
   @state() chatStreamStartedAt: number | null = null;
   @state() chatRunId: string | null = null;
+  @state() compactionStatus: import("./app-tool-stream").CompactionStatus | null = null;
   @state() chatAvatarUrl: string | null = null;
   @state() chatThinkingLevel: string | null = null;
   @state() chatQueue: ChatQueueItem[] = [];
diff --git a/ui/src/ui/gateway.ts b/ui/src/ui/gateway.ts
index 5f1ab01ff..fc8dde08a 100644
--- a/ui/src/ui/gateway.ts
+++ b/ui/src/ui/gateway.ts
@@ -254,7 +254,11 @@ export class GatewayBrowserClient {
       }
       this.lastSeq = seq;
     }
-    this.opts.onEvent?.(evt);
+    try {
+      this.opts.onEvent?.(evt);
+    } catch (err) {
+      console.error("[gateway] event handler error:", err);
+    }
     return;
   }
diff --git a/ui/src/ui/views/chat.ts b/ui/src/ui/views/chat.ts
index 01b06f22e..97ce9d4ec 100644
--- a/ui/src/ui/views/chat.ts
+++ b/ui/src/ui/views/chat.ts
@@ -16,6 +16,12 @@ import {
 import { renderMarkdownSidebar } from "./markdown-sidebar";
 import "../components/resizable-divider";
 
+export type CompactionIndicatorStatus = {
+  active: boolean;
+  startedAt: number | null;
+  completedAt: number | null;
+};
+
 export type ChatProps = {
   sessionKey: string;
   onSessionKeyChange: (next: string) => void;
@@ -24,6 +30,7 @@
   loading: boolean;
   sending: boolean;
   canAbort?: boolean;
+  compactionStatus?: CompactionIndicatorStatus | null;
   messages: unknown[];
   toolMessages: unknown[];
   stream: string | null;
@@ -59,6 +66,35 @@
   onChatScroll?: (event: Event) => void;
 };
 
+const COMPACTION_TOAST_DURATION_MS = 5000;
+
+function renderCompactionIndicator(status: CompactionIndicatorStatus | null | undefined) {
+  if (!status) return nothing;
+
+  // Show "compacting..." while active
+  if (status.active) {
+    return html`
+      <div class="callout info compaction-indicator compaction-indicator--active">
+        🧹 Compacting context...
+      </div>
+    `;
+  }
+
+  // Show "compaction complete" briefly after completion
+  if (status.completedAt) {
+    const elapsed = Date.now() - status.completedAt;
+    if (elapsed < COMPACTION_TOAST_DURATION_MS) {
+      return html`
+        <div class="callout success compaction-indicator compaction-indicator--complete">
+          🧹 Context compacted
+        </div>
+      `;
+    }
+  }
+
+  return nothing;
+}
+
 export function renderChat(props: ChatProps) {
   const canCompose = props.connected;
   const isBusy = props.sending || props.stream !== null;
@@ -89,6 +125,8 @@ export function renderChat(props: ChatProps) {
           ? html`
               ${props.error}
             `
          : nothing}
+      ${renderCompactionIndicator(props.compactionStatus)}
+
       ${props.focusMode
         ? html`