diff --git a/CHANGELOG.md b/CHANGELOG.md index b023a40ce..0fe5f253c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -143,6 +143,7 @@ ### Fixes - Packaging: include `dist/memory/**` in the npm tarball (fixes `ERR_MODULE_NOT_FOUND` for `dist/memory/index.js`). - Agents: persist sub-agent registry across gateway restarts and resume announce flow safely. (#831) β€” thanks @roshanasingh4. +- Agents: strip invalid Gemini thought signatures from OpenRouter history to avoid 400s. (#841) ## 2026.1.12-1 diff --git a/src/agents/pi-embedded-helpers.test.ts b/src/agents/pi-embedded-helpers.test.ts new file mode 100644 index 000000000..314e64b01 --- /dev/null +++ b/src/agents/pi-embedded-helpers.test.ts @@ -0,0 +1,755 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import type { AssistantMessage } from "@mariozechner/pi-ai"; +import { describe, expect, it } from "vitest"; +import type { ClawdbotConfig } from "../config/config.js"; +import { + buildBootstrapContextFiles, + classifyFailoverReason, + DEFAULT_BOOTSTRAP_MAX_CHARS, + formatAssistantErrorText, + isAuthErrorMessage, + isBillingErrorMessage, + isCloudCodeAssistFormatError, + isCompactionFailureError, + isContextOverflowError, + isFailoverErrorMessage, + isMessagingToolDuplicate, + normalizeTextForComparison, + resolveBootstrapMaxChars, + sanitizeGoogleTurnOrdering, + sanitizeSessionMessagesImages, + sanitizeToolCallId, + stripThoughtSignatures, +} from "./pi-embedded-helpers.js"; +import { + DEFAULT_AGENTS_FILENAME, + type WorkspaceBootstrapFile, +} from "./workspace.js"; + +const makeFile = ( + overrides: Partial, +): WorkspaceBootstrapFile => ({ + name: DEFAULT_AGENTS_FILENAME, + path: "/tmp/AGENTS.md", + content: "", + missing: false, + ...overrides, +}); +describe("buildBootstrapContextFiles", () => { + it("keeps missing markers", () => { + const files = [makeFile({ missing: true, content: undefined })]; + expect(buildBootstrapContextFiles(files)).toEqual([ + { + path: DEFAULT_AGENTS_FILENAME, + 
content: "[MISSING] Expected at: /tmp/AGENTS.md", + }, + ]); + }); + + it("skips empty or whitespace-only content", () => { + const files = [makeFile({ content: " \n " })]; + expect(buildBootstrapContextFiles(files)).toEqual([]); + }); + + it("truncates large bootstrap content", () => { + const head = `HEAD-${"a".repeat(600)}`; + const tail = `${"b".repeat(300)}-TAIL`; + const long = `${head}${tail}`; + const files = [makeFile({ name: "TOOLS.md", content: long })]; + const warnings: string[] = []; + const maxChars = 200; + const expectedTailChars = Math.floor(maxChars * 0.2); + const [result] = buildBootstrapContextFiles(files, { + maxChars, + warn: (message) => warnings.push(message), + }); + expect(result?.content).toContain( + "[...truncated, read TOOLS.md for full content...]", + ); + expect(result?.content.length).toBeLessThan(long.length); + expect(result?.content.startsWith(long.slice(0, 120))).toBe(true); + expect(result?.content.endsWith(long.slice(-expectedTailChars))).toBe(true); + expect(warnings).toHaveLength(1); + expect(warnings[0]).toContain("TOOLS.md"); + expect(warnings[0]).toContain("limit 200"); + }); + + it("keeps content under the default limit", () => { + const long = "a".repeat(DEFAULT_BOOTSTRAP_MAX_CHARS - 10); + const files = [makeFile({ content: long })]; + const [result] = buildBootstrapContextFiles(files); + expect(result?.content).toBe(long); + expect(result?.content).not.toContain( + "[...truncated, read AGENTS.md for full content...]", + ); + }); +}); + +describe("resolveBootstrapMaxChars", () => { + it("returns default when unset", () => { + expect(resolveBootstrapMaxChars()).toBe(DEFAULT_BOOTSTRAP_MAX_CHARS); + }); + + it("uses configured value when valid", () => { + const cfg = { + agents: { defaults: { bootstrapMaxChars: 12345 } }, + } as ClawdbotConfig; + expect(resolveBootstrapMaxChars(cfg)).toBe(12345); + }); + + it("falls back when invalid", () => { + const cfg = { + agents: { defaults: { bootstrapMaxChars: -1 } }, + } as 
ClawdbotConfig; + expect(resolveBootstrapMaxChars(cfg)).toBe(DEFAULT_BOOTSTRAP_MAX_CHARS); + }); +}); + +describe("isContextOverflowError", () => { + it("matches known overflow hints", () => { + const samples = [ + "request_too_large", + "Request exceeds the maximum size", + "context length exceeded", + "Maximum context length", + "prompt is too long: 208423 tokens > 200000 maximum", + "Context overflow: Summarization failed", + "413 Request Entity Too Large", + ]; + for (const sample of samples) { + expect(isContextOverflowError(sample)).toBe(true); + } + }); + + it("ignores unrelated errors", () => { + expect(isContextOverflowError("rate limit exceeded")).toBe(false); + }); +}); + +describe("isCompactionFailureError", () => { + it("matches compaction overflow failures", () => { + const samples = [ + 'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}', + "auto-compaction failed due to context overflow", + "Compaction failed: prompt is too long", + ]; + for (const sample of samples) { + expect(isCompactionFailureError(sample)).toBe(true); + } + }); + + it("ignores non-compaction overflow errors", () => { + expect(isCompactionFailureError("Context overflow: prompt too large")).toBe( + false, + ); + expect(isCompactionFailureError("rate limit exceeded")).toBe(false); + }); +}); + +describe("isBillingErrorMessage", () => { + it("matches credit / payment failures", () => { + const samples = [ + "Your credit balance is too low to access the Anthropic API.", + "insufficient credits", + "Payment Required", + "HTTP 402 Payment Required", + "plans & billing", + "billing: please upgrade your plan", + ]; + for (const sample of samples) { + expect(isBillingErrorMessage(sample)).toBe(true); + } + }); + + it("ignores unrelated errors", () => { + expect(isBillingErrorMessage("rate limit exceeded")).toBe(false); + expect(isBillingErrorMessage("invalid api key")).toBe(false); + expect(isBillingErrorMessage("context length exceeded")).toBe(false); + }); 
+}); + +describe("isAuthErrorMessage", () => { + it("matches credential validation errors", () => { + const samples = [ + 'No credentials found for profile "anthropic:claude-cli".', + "No API key found for profile openai.", + ]; + for (const sample of samples) { + expect(isAuthErrorMessage(sample)).toBe(true); + } + }); + + it("ignores unrelated errors", () => { + expect(isAuthErrorMessage("rate limit exceeded")).toBe(false); + expect(isAuthErrorMessage("billing issue detected")).toBe(false); + }); +}); + +describe("isFailoverErrorMessage", () => { + it("matches auth/rate/billing/timeout", () => { + const samples = [ + "invalid api key", + "429 rate limit exceeded", + "Your credit balance is too low", + "request timed out", + "invalid request format", + ]; + for (const sample of samples) { + expect(isFailoverErrorMessage(sample)).toBe(true); + } + }); +}); + +describe("classifyFailoverReason", () => { + it("returns a stable reason", () => { + expect(classifyFailoverReason("invalid api key")).toBe("auth"); + expect(classifyFailoverReason("no credentials found")).toBe("auth"); + expect(classifyFailoverReason("no api key found")).toBe("auth"); + expect(classifyFailoverReason("429 too many requests")).toBe("rate_limit"); + expect(classifyFailoverReason("resource has been exhausted")).toBe( + "rate_limit", + ); + expect( + classifyFailoverReason( + '{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}', + ), + ).toBe("rate_limit"); + expect(classifyFailoverReason("invalid request format")).toBe("format"); + expect(classifyFailoverReason("credit balance too low")).toBe("billing"); + expect(classifyFailoverReason("deadline exceeded")).toBe("timeout"); + expect(classifyFailoverReason("string should match pattern")).toBe( + "format", + ); + expect(classifyFailoverReason("bad request")).toBeNull(); + }); + + it("classifies OpenAI usage limit errors as rate_limit", () => { + expect( + classifyFailoverReason( + "You have hit your ChatGPT usage limit 
(plus plan)", + ), + ).toBe("rate_limit"); + }); +}); + +describe("isCloudCodeAssistFormatError", () => { + it("matches format errors", () => { + const samples = [ + "INVALID_REQUEST_ERROR: string should match pattern", + "messages.1.content.1.tool_use.id", + "tool_use.id should match pattern", + "invalid request format", + ]; + for (const sample of samples) { + expect(isCloudCodeAssistFormatError(sample)).toBe(true); + } + }); + + it("ignores unrelated errors", () => { + expect(isCloudCodeAssistFormatError("rate limit exceeded")).toBe(false); + }); +}); + +describe("formatAssistantErrorText", () => { + const makeAssistantError = (errorMessage: string): AssistantMessage => + ({ + stopReason: "error", + errorMessage, + }) as AssistantMessage; + + it("returns a friendly message for context overflow", () => { + const msg = makeAssistantError("request_too_large"); + expect(formatAssistantErrorText(msg)).toContain("Context overflow"); + }); + + it("returns a friendly message for Anthropic role ordering", () => { + const msg = makeAssistantError( + 'messages: roles must alternate between "user" and "assistant"', + ); + expect(formatAssistantErrorText(msg)).toContain( + "Message ordering conflict", + ); + }); + + it("returns a friendly message for Anthropic overload errors", () => { + const msg = makeAssistantError( + '{"type":"error","error":{"details":null,"type":"overloaded_error","message":"Overloaded"},"request_id":"req_123"}', + ); + expect(formatAssistantErrorText(msg)).toBe( + "The AI service is temporarily overloaded. 
Please try again in a moment.", + ); + }); +}); + +describe("sanitizeToolCallId", () => { + it("keeps valid tool call IDs", () => { + expect(sanitizeToolCallId("call_abc-123")).toBe("call_abc-123"); + }); + + it("replaces invalid characters with underscores", () => { + expect(sanitizeToolCallId("call_abc|item:456")).toBe("call_abc_item_456"); + }); + + it("returns default for empty IDs", () => { + expect(sanitizeToolCallId("")).toBe("default_tool_id"); + }); +}); + +describe("sanitizeGoogleTurnOrdering", () => { + it("prepends a synthetic user turn when history starts with assistant", () => { + const input = [ + { + role: "assistant", + content: [ + { type: "toolCall", id: "call_1", name: "exec", arguments: {} }, + ], + }, + ] satisfies AgentMessage[]; + + const out = sanitizeGoogleTurnOrdering(input); + expect(out[0]?.role).toBe("user"); + expect(out[1]?.role).toBe("assistant"); + }); + + it("is a no-op when history starts with user", () => { + const input = [{ role: "user", content: "hi" }] satisfies AgentMessage[]; + const out = sanitizeGoogleTurnOrdering(input); + expect(out).toBe(input); + }); +}); + +describe("sanitizeSessionMessagesImages", () => { + it("removes empty assistant text blocks but preserves tool calls", async () => { + const input = [ + { + role: "assistant", + content: [ + { type: "text", text: "" }, + { type: "toolCall", id: "call_1", name: "read", arguments: {} }, + ], + }, + ] satisfies AgentMessage[]; + + const out = await sanitizeSessionMessagesImages(input, "test"); + + expect(out).toHaveLength(1); + const content = (out[0] as { content?: unknown }).content; + expect(Array.isArray(content)).toBe(true); + expect(content).toHaveLength(1); + expect((content as Array<{ type?: string }>)[0]?.type).toBe("toolCall"); + }); + + it("sanitizes tool ids for assistant blocks and tool results when enabled", async () => { + const input = [ + { + role: "assistant", + content: [ + { type: "toolUse", id: "call_abc|item:123", name: "test", input: {} }, + { 
+ type: "toolCall", + id: "call_abc|item:456", + name: "exec", + arguments: {}, + }, + ], + }, + { + role: "toolResult", + toolUseId: "call_abc|item:123", + content: [{ type: "text", text: "ok" }], + }, + ] satisfies AgentMessage[]; + + const out = await sanitizeSessionMessagesImages(input, "test", { + sanitizeToolCallIds: true, + }); + + const assistant = out[0] as { content?: Array<{ id?: string }> }; + expect(assistant.content?.[0]?.id).toBe("call_abc_item_123"); + expect(assistant.content?.[1]?.id).toBe("call_abc_item_456"); + + const toolResult = out[1] as { toolUseId?: string }; + expect(toolResult.toolUseId).toBe("call_abc_item_123"); + }); + + it("filters whitespace-only assistant text blocks", async () => { + const input = [ + { + role: "assistant", + content: [ + { type: "text", text: " " }, + { type: "text", text: "ok" }, + ], + }, + ] satisfies AgentMessage[]; + + const out = await sanitizeSessionMessagesImages(input, "test"); + + expect(out).toHaveLength(1); + const content = (out[0] as { content?: unknown }).content; + expect(Array.isArray(content)).toBe(true); + expect(content).toHaveLength(1); + expect((content as Array<{ text?: string }>)[0]?.text).toBe("ok"); + }); + + it("drops assistant messages that only contain empty text", async () => { + const input = [ + { role: "user", content: "hello" }, + { role: "assistant", content: [{ type: "text", text: "" }] }, + ] satisfies AgentMessage[]; + + const out = await sanitizeSessionMessagesImages(input, "test"); + + expect(out).toHaveLength(1); + expect(out[0]?.role).toBe("user"); + }); + + it("drops empty assistant error messages", async () => { + const input = [ + { role: "user", content: "hello" }, + { role: "assistant", stopReason: "error", content: [] }, + { role: "assistant", stopReason: "error" }, + ] satisfies AgentMessage[]; + + const out = await sanitizeSessionMessagesImages(input, "test"); + + expect(out).toHaveLength(1); + expect(out[0]?.role).toBe("user"); + }); + + it("leaves non-assistant 
messages unchanged", async () => { + const input = [ + { role: "user", content: "hello" }, + { + role: "toolResult", + toolCallId: "tool-1", + content: [{ type: "text", text: "result" }], + }, + ] satisfies AgentMessage[]; + + const out = await sanitizeSessionMessagesImages(input, "test"); + + expect(out).toHaveLength(2); + expect(out[0]?.role).toBe("user"); + expect(out[1]?.role).toBe("toolResult"); + }); + + it("keeps tool call + tool result IDs unchanged by default", async () => { + const input = [ + { + role: "assistant", + content: [ + { + type: "toolCall", + id: "call_123|fc_456", + name: "read", + arguments: { path: "package.json" }, + }, + ], + }, + { + role: "toolResult", + toolCallId: "call_123|fc_456", + toolName: "read", + content: [{ type: "text", text: "ok" }], + isError: false, + }, + ] satisfies AgentMessage[]; + + const out = await sanitizeSessionMessagesImages(input, "test"); + + const assistant = out[0] as unknown as { role?: string; content?: unknown }; + expect(assistant.role).toBe("assistant"); + expect(Array.isArray(assistant.content)).toBe(true); + const toolCall = ( + assistant.content as Array<{ type?: string; id?: string }> + ).find((b) => b.type === "toolCall"); + expect(toolCall?.id).toBe("call_123|fc_456"); + + const toolResult = out[1] as unknown as { + role?: string; + toolCallId?: string; + }; + expect(toolResult.role).toBe("toolResult"); + expect(toolResult.toolCallId).toBe("call_123|fc_456"); + }); + + it("sanitizes tool call + tool result IDs when enabled", async () => { + const input = [ + { + role: "assistant", + content: [ + { + type: "toolCall", + id: "call_123|fc_456", + name: "read", + arguments: { path: "package.json" }, + }, + ], + }, + { + role: "toolResult", + toolCallId: "call_123|fc_456", + toolName: "read", + content: [{ type: "text", text: "ok" }], + isError: false, + }, + ] satisfies AgentMessage[]; + + const out = await sanitizeSessionMessagesImages(input, "test", { + sanitizeToolCallIds: true, + }); + + const 
assistant = out[0] as unknown as { role?: string; content?: unknown }; + expect(assistant.role).toBe("assistant"); + expect(Array.isArray(assistant.content)).toBe(true); + const toolCall = ( + assistant.content as Array<{ type?: string; id?: string }> + ).find((b) => b.type === "toolCall"); + expect(toolCall?.id).toBe("call_123_fc_456"); + + const toolResult = out[1] as unknown as { + role?: string; + toolCallId?: string; + }; + expect(toolResult.role).toBe("toolResult"); + expect(toolResult.toolCallId).toBe("call_123_fc_456"); + }); + + it("drops assistant blocks after a tool call when enforceToolCallLast is enabled", async () => { + const input = [ + { + role: "assistant", + content: [ + { type: "text", text: "before" }, + { type: "toolCall", id: "call_1", name: "read", arguments: {} }, + { type: "thinking", thinking: "after", thinkingSignature: "sig" }, + { type: "text", text: "after text" }, + ], + }, + ] satisfies AgentMessage[]; + + const out = await sanitizeSessionMessagesImages(input, "test", { + enforceToolCallLast: true, + }); + const assistant = out[0] as { content?: Array<{ type?: string }> }; + expect(assistant.content?.map((b) => b.type)).toEqual(["text", "toolCall"]); + }); + + it("keeps assistant blocks after a tool call when enforceToolCallLast is disabled", async () => { + const input = [ + { + role: "assistant", + content: [ + { type: "text", text: "before" }, + { type: "toolCall", id: "call_1", name: "read", arguments: {} }, + { type: "thinking", thinking: "after", thinkingSignature: "sig" }, + { type: "text", text: "after text" }, + ], + }, + ] satisfies AgentMessage[]; + + const out = await sanitizeSessionMessagesImages(input, "test"); + const assistant = out[0] as { content?: Array<{ type?: string }> }; + expect(assistant.content?.map((b) => b.type)).toEqual([ + "text", + "toolCall", + "thinking", + "text", + ]); + }); +}); + +describe("normalizeTextForComparison", () => { + it("lowercases text", () => { + 
expect(normalizeTextForComparison("Hello World")).toBe("hello world"); + }); + + it("trims whitespace", () => { + expect(normalizeTextForComparison(" hello ")).toBe("hello"); + }); + + it("collapses multiple spaces", () => { + expect(normalizeTextForComparison("hello world")).toBe("hello world"); + }); + + it("strips emoji", () => { + expect(normalizeTextForComparison("Hello πŸ‘‹ World 🌍")).toBe("hello world"); + }); + + it("handles mixed normalization", () => { + expect(normalizeTextForComparison(" Hello πŸ‘‹ WORLD 🌍 ")).toBe( + "hello world", + ); + }); +}); + +describe("stripThoughtSignatures", () => { + it("returns non-array content unchanged", () => { + expect(stripThoughtSignatures("hello")).toBe("hello"); + expect(stripThoughtSignatures(null)).toBe(null); + expect(stripThoughtSignatures(undefined)).toBe(undefined); + expect(stripThoughtSignatures(123)).toBe(123); + }); + + it("removes msg_-prefixed thought_signature from content blocks", () => { + const input = [ + { type: "text", text: "hello", thought_signature: "msg_abc123" }, + { type: "thinking", thinking: "test", thought_signature: "AQID" }, + ]; + const result = stripThoughtSignatures(input); + + expect(result).toHaveLength(2); + expect(result[0]).toEqual({ type: "text", text: "hello" }); + expect(result[1]).toEqual({ + type: "thinking", + thinking: "test", + thought_signature: "AQID", + }); + expect("thought_signature" in result[0]).toBe(false); + expect("thought_signature" in result[1]).toBe(true); + }); + + it("preserves blocks without thought_signature", () => { + const input = [ + { type: "text", text: "hello" }, + { type: "toolCall", id: "call_1", name: "read", arguments: {} }, + ]; + const result = stripThoughtSignatures(input); + + expect(result).toEqual(input); + }); + + it("handles mixed blocks with and without thought_signature", () => { + const input = [ + { type: "text", text: "hello", thought_signature: "msg_abc" }, + { type: "toolCall", id: "call_1", name: "read", arguments: {} }, + { 
type: "thinking", thinking: "hmm", thought_signature: "msg_xyz" }, + ]; + const result = stripThoughtSignatures(input); + + expect(result).toEqual([ + { type: "text", text: "hello" }, + { type: "toolCall", id: "call_1", name: "read", arguments: {} }, + { type: "thinking", thinking: "hmm" }, + ]); + }); + + it("handles empty array", () => { + expect(stripThoughtSignatures([])).toEqual([]); + }); + + it("handles null/undefined blocks in array", () => { + const input = [null, undefined, { type: "text", text: "hello" }]; + const result = stripThoughtSignatures(input); + expect(result).toEqual([null, undefined, { type: "text", text: "hello" }]); + }); + + it("strips non-base64 thought signatures when configured for Gemini", () => { + const input = [ + { type: "text", text: "hello", thought_signature: "msg_abc123" }, + { type: "thinking", thinking: "ok", thought_signature: "c2ln" }, + { type: "toolCall", id: "call_1", thoughtSignature: '{"id":1}' }, + { type: "toolCall", id: "call_2", thoughtSignature: "c2ln" }, + ]; + + const result = stripThoughtSignatures(input, { + allowBase64Only: true, + includeCamelCase: true, + }); + + expect(result).toEqual([ + { type: "text", text: "hello" }, + { type: "thinking", thinking: "ok", thought_signature: "c2ln" }, + { type: "toolCall", id: "call_1" }, + { type: "toolCall", id: "call_2", thoughtSignature: "c2ln" }, + ]); + }); +}); + +describe("sanitizeSessionMessagesImages - thought_signature stripping", () => { + it("strips msg_-prefixed thought_signature from assistant message content blocks", async () => { + const input = [ + { + role: "assistant", + content: [ + { type: "text", text: "hello", thought_signature: "msg_abc123" }, + { + type: "thinking", + thinking: "reasoning", + thought_signature: "AQID", + }, + ], + }, + ] satisfies AgentMessage[]; + + const out = await sanitizeSessionMessagesImages(input, "test"); + + expect(out).toHaveLength(1); + const content = (out[0] as { content?: unknown[] }).content; + 
expect(content).toHaveLength(2); + expect("thought_signature" in ((content?.[0] ?? {}) as object)).toBe(false); + expect( + (content?.[1] as { thought_signature?: unknown })?.thought_signature, + ).toBe("AQID"); + }); +}); + +describe("isMessagingToolDuplicate", () => { + it("returns false for empty sentTexts", () => { + expect(isMessagingToolDuplicate("hello world", [])).toBe(false); + }); + + it("returns false for short texts", () => { + expect(isMessagingToolDuplicate("short", ["short"])).toBe(false); + }); + + it("detects exact duplicates", () => { + expect( + isMessagingToolDuplicate("Hello, this is a test message!", [ + "Hello, this is a test message!", + ]), + ).toBe(true); + }); + + it("detects duplicates with different casing", () => { + expect( + isMessagingToolDuplicate("HELLO, THIS IS A TEST MESSAGE!", [ + "hello, this is a test message!", + ]), + ).toBe(true); + }); + + it("detects duplicates with emoji variations", () => { + expect( + isMessagingToolDuplicate("Hello! πŸ‘‹ This is a test message!", [ + "Hello! 
This is a test message!", + ]), + ).toBe(true); + }); + + it("detects substring duplicates (LLM elaboration)", () => { + expect( + isMessagingToolDuplicate( + 'I sent the message: "Hello, this is a test message!"', + ["Hello, this is a test message!"], + ), + ).toBe(true); + }); + + it("detects when sent text contains block reply (reverse substring)", () => { + expect( + isMessagingToolDuplicate("Hello, this is a test message!", [ + 'I sent the message: "Hello, this is a test message!"', + ]), + ).toBe(true); + }); + + it("returns false for non-matching texts", () => { + expect( + isMessagingToolDuplicate("This is completely different content.", [ + "Hello, this is a test message!", + ]), + ).toBe(false); + }); +}); diff --git a/src/agents/pi-embedded-helpers.ts b/src/agents/pi-embedded-helpers.ts index 48beb95a2..693c2f901 100644 --- a/src/agents/pi-embedded-helpers.ts +++ b/src/agents/pi-embedded-helpers.ts @@ -1,55 +1,995 @@ -export { - buildBootstrapContextFiles, - DEFAULT_BOOTSTRAP_MAX_CHARS, - ensureSessionHeader, - resolveBootstrapMaxChars, - stripThoughtSignatures, -} from "./pi-embedded-helpers/bootstrap.js"; -export { - classifyFailoverReason, - formatRawAssistantErrorForUi, - formatAssistantErrorText, - getApiErrorPayloadFingerprint, - isAuthAssistantError, - isAuthErrorMessage, - isBillingAssistantError, - parseApiErrorInfo, - sanitizeUserFacingText, - isBillingErrorMessage, - isCloudCodeAssistFormatError, - isCompactionFailureError, - isContextOverflowError, - isFailoverAssistantError, - isFailoverErrorMessage, - isOverloadedErrorMessage, - isRawApiErrorPayload, - isRateLimitAssistantError, - isRateLimitErrorMessage, - isTimeoutErrorMessage, -} from "./pi-embedded-helpers/errors.js"; -export { - downgradeGeminiHistory, - downgradeGeminiThinkingBlocks, - isGoogleModelApi, - sanitizeGoogleTurnOrdering, -} from "./pi-embedded-helpers/google.js"; -export { - isEmptyAssistantMessageContent, - sanitizeSessionMessagesImages, -} from 
"./pi-embedded-helpers/images.js"; -export { - isMessagingToolDuplicate, - isMessagingToolDuplicateNormalized, - normalizeTextForComparison, -} from "./pi-embedded-helpers/messaging-dedupe.js"; +import fs from "node:fs/promises"; +import path from "node:path"; -export { pickFallbackThinkingLevel } from "./pi-embedded-helpers/thinking.js"; +import type { + AgentMessage, + AgentToolResult, +} from "@mariozechner/pi-agent-core"; +import type { AssistantMessage } from "@mariozechner/pi-ai"; +import { + normalizeThinkLevel, + type ThinkLevel, +} from "../auto-reply/thinking.js"; +import type { ClawdbotConfig } from "../config/config.js"; +import { formatSandboxToolPolicyBlockedMessage } from "./sandbox.js"; +import { + isValidCloudCodeAssistToolId, + sanitizeToolCallId, + sanitizeToolCallIdsForCloudCodeAssist, +} from "./tool-call-id.js"; +import { sanitizeContentBlocksImages } from "./tool-images.js"; +import type { WorkspaceBootstrapFile } from "./workspace.js"; -export { - mergeConsecutiveUserTurns, - validateAnthropicTurns, - validateGeminiTurns, -} from "./pi-embedded-helpers/turns.js"; -export type { EmbeddedContextFile, FailoverReason } from "./pi-embedded-helpers/types.js"; +export type EmbeddedContextFile = { path: string; content: string }; -export { isValidCloudCodeAssistToolId, sanitizeToolCallId } from "./tool-call-id.js"; +// ── Cross-provider thought_signature sanitization ────────────────────────────── +// Claude's extended thinking feature generates thought_signature fields (message IDs +// like "msg_abc123...") in content blocks. When these are sent to Google's Gemini API, +// it expects Base64-encoded bytes and rejects Claude's format with a 400 error. +// This function strips thought_signature fields to enable cross-provider session sharing. 
+ +type ContentBlockWithSignature = { + thought_signature?: unknown; + thoughtSignature?: unknown; + [key: string]: unknown; +}; + +type ThoughtSignatureSanitizeOptions = { + allowBase64Only?: boolean; + includeCamelCase?: boolean; +}; + +function isBase64Signature(value: string): boolean { + const trimmed = value.trim(); + if (!trimmed) return false; + const compact = trimmed.replace(/\s+/g, ""); + if (!/^[A-Za-z0-9+/=_-]+$/.test(compact)) return false; + const isUrl = compact.includes("-") || compact.includes("_"); + try { + const buf = Buffer.from(compact, isUrl ? "base64url" : "base64"); + if (buf.length === 0) return false; + const encoded = buf.toString(isUrl ? "base64url" : "base64"); + const normalize = (input: string) => input.replace(/=+$/g, ""); + return normalize(encoded) === normalize(compact); + } catch { + return false; + } +} + +/** + * Strips Claude-style thought_signature fields from content blocks. + * + * Gemini expects thought signatures as base64-encoded bytes, but Claude stores message ids + * like "msg_abc123...". We only strip "msg_*" to preserve any provider-valid signatures. + */ +export function stripThoughtSignatures( + content: T, + options?: ThoughtSignatureSanitizeOptions, +): T { + if (!Array.isArray(content)) return content; + const allowBase64Only = options?.allowBase64Only ?? false; + const includeCamelCase = options?.includeCamelCase ?? false; + const shouldStripSignature = (value: unknown): boolean => { + if (!allowBase64Only) { + return typeof value === "string" && value.startsWith("msg_"); + } + return typeof value !== "string" || !isBase64Signature(value); + }; + return content.map((block) => { + if (!block || typeof block !== "object") return block; + const rec = block as ContentBlockWithSignature; + const stripSnake = shouldStripSignature(rec.thought_signature); + const stripCamel = includeCamelCase + ? 
shouldStripSignature(rec.thoughtSignature) + : false; + if (!stripSnake && !stripCamel) { + return block; + } + const next = { ...rec }; + if (stripSnake) delete next.thought_signature; + if (stripCamel) delete next.thoughtSignature; + return next; + }) as T; +} + +export const DEFAULT_BOOTSTRAP_MAX_CHARS = 20_000; +const BOOTSTRAP_HEAD_RATIO = 0.7; +const BOOTSTRAP_TAIL_RATIO = 0.2; + +type TrimBootstrapResult = { + content: string; + truncated: boolean; + maxChars: number; + originalLength: number; +}; + +export function resolveBootstrapMaxChars(cfg?: ClawdbotConfig): number { + const raw = cfg?.agents?.defaults?.bootstrapMaxChars; + if (typeof raw === "number" && Number.isFinite(raw) && raw > 0) { + return Math.floor(raw); + } + return DEFAULT_BOOTSTRAP_MAX_CHARS; +} + +function trimBootstrapContent( + content: string, + fileName: string, + maxChars: number, +): TrimBootstrapResult { + const trimmed = content.trimEnd(); + if (trimmed.length <= maxChars) { + return { + content: trimmed, + truncated: false, + maxChars, + originalLength: trimmed.length, + }; + } + + const headChars = Math.max(1, Math.floor(maxChars * BOOTSTRAP_HEAD_RATIO)); + const tailChars = Math.max(1, Math.floor(maxChars * BOOTSTRAP_TAIL_RATIO)); + const head = trimmed.slice(0, headChars); + const tail = trimmed.slice(-tailChars); + const contentWithMarker = [ + head, + "", + `[...truncated, read ${fileName} for full content...]`, + "", + tail, + ].join("\n"); + return { + content: contentWithMarker, + truncated: true, + maxChars, + originalLength: trimmed.length, + }; +} + +export async function ensureSessionHeader(params: { + sessionFile: string; + sessionId: string; + cwd: string; +}) { + const file = params.sessionFile; + try { + await fs.stat(file); + return; + } catch { + // create + } + await fs.mkdir(path.dirname(file), { recursive: true }); + const sessionVersion = 2; + const entry = { + type: "session", + version: sessionVersion, + id: params.sessionId, + timestamp: new 
Date().toISOString(), + cwd: params.cwd, + }; + await fs.writeFile(file, `${JSON.stringify(entry)}\n`, "utf-8"); +} + +type ContentBlock = AgentToolResult["content"][number]; + +export function isEmptyAssistantMessageContent( + message: Extract, +): boolean { + const content = message.content; + if (content == null) return true; + if (!Array.isArray(content)) return false; + return content.every((block) => { + if (!block || typeof block !== "object") return true; + const rec = block as { type?: unknown; text?: unknown }; + if (rec.type !== "text") return false; + return typeof rec.text !== "string" || rec.text.trim().length === 0; + }); +} + +function isEmptyAssistantErrorMessage( + message: Extract, +): boolean { + if (message.stopReason !== "error") return false; + return isEmptyAssistantMessageContent(message); +} + +export async function sanitizeSessionMessagesImages( + messages: AgentMessage[], + label: string, + options?: { + sanitizeToolCallIds?: boolean; + enforceToolCallLast?: boolean; + sanitizeThoughtSignatures?: ThoughtSignatureSanitizeOptions; + }, +): Promise { + // We sanitize historical session messages because Anthropic can reject a request + // if the transcript contains oversized base64 images (see MAX_IMAGE_DIMENSION_PX). + const sanitizedIds = options?.sanitizeToolCallIds + ? sanitizeToolCallIdsForCloudCodeAssist(messages) + : messages; + const base = sanitizedIds; + const out: AgentMessage[] = []; + for (const msg of base) { + if (!msg || typeof msg !== "object") { + out.push(msg); + continue; + } + + const role = (msg as { role?: unknown }).role; + if (role === "toolResult") { + const toolMsg = msg as Extract; + const content = Array.isArray(toolMsg.content) ? 
toolMsg.content : []; + const nextContent = (await sanitizeContentBlocksImages( + content as ContentBlock[], + label, + )) as unknown as typeof toolMsg.content; + out.push({ ...toolMsg, content: nextContent }); + continue; + } + + if (role === "user") { + const userMsg = msg as Extract; + const content = userMsg.content; + if (Array.isArray(content)) { + const nextContent = (await sanitizeContentBlocksImages( + content as unknown as ContentBlock[], + label, + )) as unknown as typeof userMsg.content; + out.push({ ...userMsg, content: nextContent }); + continue; + } + } + + if (role === "assistant") { + const assistantMsg = msg as Extract; + if (isEmptyAssistantErrorMessage(assistantMsg)) { + continue; + } + const content = assistantMsg.content; + if (Array.isArray(content)) { + // Strip thought_signature fields to enable cross-provider session sharing + const strippedContent = stripThoughtSignatures( + content, + options?.sanitizeThoughtSignatures, + ); + const filteredContent = strippedContent.filter((block) => { + if (!block || typeof block !== "object") return true; + const rec = block as { type?: unknown; text?: unknown }; + if (rec.type !== "text" || typeof rec.text !== "string") return true; + return rec.text.trim().length > 0; + }); + const normalizedContent = options?.enforceToolCallLast + ? 
(() => { + let lastToolIndex = -1; + for (let i = filteredContent.length - 1; i >= 0; i -= 1) { + const block = filteredContent[i]; + if (!block || typeof block !== "object") continue; + const type = (block as { type?: unknown }).type; + if ( + type === "functionCall" || + type === "toolUse" || + type === "toolCall" + ) { + lastToolIndex = i; + break; + } + } + if (lastToolIndex === -1) return filteredContent; + return filteredContent.slice(0, lastToolIndex + 1); + })() + : filteredContent; + const finalContent = (await sanitizeContentBlocksImages( + normalizedContent as unknown as ContentBlock[], + label, + )) as unknown as typeof assistantMsg.content; + if (finalContent.length === 0) { + continue; + } + out.push({ ...assistantMsg, content: finalContent }); + continue; + } + } + + out.push(msg); + } + return out; +} + +const GOOGLE_TURN_ORDER_BOOTSTRAP_TEXT = "(session bootstrap)"; + +export function isGoogleModelApi(api?: string | null): boolean { + return ( + api === "google-gemini-cli" || + api === "google-generative-ai" || + api === "google-antigravity" + ); +} + +export function sanitizeGoogleTurnOrdering( + messages: AgentMessage[], +): AgentMessage[] { + const first = messages[0] as + | { role?: unknown; content?: unknown } + | undefined; + const role = first?.role; + const content = first?.content; + if ( + role === "user" && + typeof content === "string" && + content.trim() === GOOGLE_TURN_ORDER_BOOTSTRAP_TEXT + ) { + return messages; + } + if (role !== "assistant") return messages; + + // Cloud Code Assist rejects histories that begin with a model turn (tool call or text). + // Prepend a tiny synthetic user turn so the rest of the transcript can be used. 
+ const bootstrap: AgentMessage = { + role: "user", + content: GOOGLE_TURN_ORDER_BOOTSTRAP_TEXT, + timestamp: Date.now(), + } as AgentMessage; + + return [bootstrap, ...messages]; +} + +export function buildBootstrapContextFiles( + files: WorkspaceBootstrapFile[], + opts?: { warn?: (message: string) => void; maxChars?: number }, +): EmbeddedContextFile[] { + const maxChars = opts?.maxChars ?? DEFAULT_BOOTSTRAP_MAX_CHARS; + const result: EmbeddedContextFile[] = []; + for (const file of files) { + if (file.missing) { + result.push({ + path: file.name, + content: `[MISSING] Expected at: ${file.path}`, + }); + continue; + } + const trimmed = trimBootstrapContent( + file.content ?? "", + file.name, + maxChars, + ); + if (!trimmed.content) continue; + if (trimmed.truncated) { + opts?.warn?.( + `workspace bootstrap file ${file.name} is ${trimmed.originalLength} chars (limit ${trimmed.maxChars}); truncating in injected context`, + ); + } + result.push({ + path: file.name, + content: trimmed.content, + }); + } + return result; +} + +export function isContextOverflowError(errorMessage?: string): boolean { + if (!errorMessage) return false; + const lower = errorMessage.toLowerCase(); + return ( + lower.includes("request_too_large") || + lower.includes("request exceeds the maximum size") || + lower.includes("context length exceeded") || + lower.includes("maximum context length") || + lower.includes("prompt is too long") || + lower.includes("context overflow") || + (lower.includes("413") && lower.includes("too large")) + ); +} + +export function isCompactionFailureError(errorMessage?: string): boolean { + if (!errorMessage) return false; + if (!isContextOverflowError(errorMessage)) return false; + const lower = errorMessage.toLowerCase(); + return ( + lower.includes("summarization failed") || + lower.includes("auto-compaction") || + lower.includes("compaction failed") || + lower.includes("compaction") + ); +} + +export function formatAssistantErrorText( + msg: AssistantMessage, 
+ opts?: { cfg?: ClawdbotConfig; sessionKey?: string }, +): string | undefined { + if (msg.stopReason !== "error") return undefined; + const raw = (msg.errorMessage ?? "").trim(); + if (!raw) return "LLM request failed with an unknown error."; + + const unknownTool = + raw.match(/unknown tool[:\s]+["']?([a-z0-9_-]+)["']?/i) ?? + raw.match( + /tool\s+["']?([a-z0-9_-]+)["']?\s+(?:not found|is not available)/i, + ); + if (unknownTool?.[1]) { + const rewritten = formatSandboxToolPolicyBlockedMessage({ + cfg: opts?.cfg, + sessionKey: opts?.sessionKey, + toolName: unknownTool[1], + }); + if (rewritten) return rewritten; + } + + // Check for context overflow (413) errors + if (isContextOverflowError(raw)) { + return ( + "Context overflow: prompt too large for the model. " + + "Try again with less input or a larger-context model." + ); + } + + // Check for role ordering errors (Anthropic 400 "Incorrect role information") + // This typically happens when consecutive user messages are sent without + // an assistant response between them, often due to steering/queueing timing. + if (/incorrect role information|roles must alternate/i.test(raw)) { + return ( + "Message ordering conflict - please try again. " + + "If this persists, use /new to start a fresh session." + ); + } + + const invalidRequest = raw.match( + /"type":"invalid_request_error".*?"message":"([^"]+)"/, + ); + if (invalidRequest?.[1]) { + return `LLM request rejected: ${invalidRequest[1]}`; + } + + // Check for overloaded errors (Anthropic API capacity) + if (isOverloadedErrorMessage(raw)) { + return "The AI service is temporarily overloaded. Please try again in a moment."; + } + + // Keep it short for WhatsApp. + return raw.length > 600 ? `${raw.slice(0, 600)}…` : raw; +} + +export function isRateLimitAssistantError( + msg: AssistantMessage | undefined, +): boolean { + if (!msg || msg.stopReason !== "error") return false; + return isRateLimitErrorMessage(msg.errorMessage ?? 
""); +} + +type ErrorPattern = RegExp | string; + +const ERROR_PATTERNS = { + rateLimit: [ + /rate[_ ]limit|too many requests|429/, + "exceeded your current quota", + "resource has been exhausted", + "quota exceeded", + "resource_exhausted", + "usage limit", + ], + overloaded: [ + /overloaded_error|"type"\s*:\s*"overloaded_error"/i, + "overloaded", + ], + timeout: [ + "timeout", + "timed out", + "deadline exceeded", + "context deadline exceeded", + ], + billing: [ + /\b402\b/, + "payment required", + "insufficient credits", + "credit balance", + "plans & billing", + ], + auth: [ + /invalid[_ ]?api[_ ]?key/, + "incorrect api key", + "invalid token", + "authentication", + "unauthorized", + "forbidden", + "access denied", + "expired", + "token has expired", + /\b401\b/, + /\b403\b/, + // Credential validation failures should trigger fallback (#761) + "no credentials found", + "no api key found", + ], + format: [ + "invalid_request_error", + "string should match pattern", + "tool_use.id", + "tool_use_id", + "messages.1.content.1.tool_use.id", + "invalid request format", + ], +} as const; + +function matchesErrorPatterns( + raw: string, + patterns: readonly ErrorPattern[], +): boolean { + if (!raw) return false; + const value = raw.toLowerCase(); + return patterns.some((pattern) => + pattern instanceof RegExp ? 
pattern.test(value) : value.includes(pattern), + ); +} + +export function isRateLimitErrorMessage(raw: string): boolean { + return matchesErrorPatterns(raw, ERROR_PATTERNS.rateLimit); +} + +export function isTimeoutErrorMessage(raw: string): boolean { + return matchesErrorPatterns(raw, ERROR_PATTERNS.timeout); +} + +export function isBillingErrorMessage(raw: string): boolean { + const value = raw.toLowerCase(); + if (!value) return false; + if (matchesErrorPatterns(value, ERROR_PATTERNS.billing)) return true; + return ( + value.includes("billing") && + (value.includes("upgrade") || + value.includes("credits") || + value.includes("payment") || + value.includes("plan")) + ); +} + +export function isBillingAssistantError( + msg: AssistantMessage | undefined, +): boolean { + if (!msg || msg.stopReason !== "error") return false; + return isBillingErrorMessage(msg.errorMessage ?? ""); +} + +export function isAuthErrorMessage(raw: string): boolean { + return matchesErrorPatterns(raw, ERROR_PATTERNS.auth); +} + +export function isOverloadedErrorMessage(raw: string): boolean { + return matchesErrorPatterns(raw, ERROR_PATTERNS.overloaded); +} + +export function isCloudCodeAssistFormatError(raw: string): boolean { + return matchesErrorPatterns(raw, ERROR_PATTERNS.format); +} + +export function isAuthAssistantError( + msg: AssistantMessage | undefined, +): boolean { + if (!msg || msg.stopReason !== "error") return false; + return isAuthErrorMessage(msg.errorMessage ?? 
""); +} + +export type FailoverReason = + | "auth" + | "format" + | "rate_limit" + | "billing" + | "timeout" + | "unknown"; + +export function classifyFailoverReason(raw: string): FailoverReason | null { + if (isRateLimitErrorMessage(raw)) return "rate_limit"; + if (isOverloadedErrorMessage(raw)) return "rate_limit"; // Treat overloaded as rate limit for failover + if (isCloudCodeAssistFormatError(raw)) return "format"; + if (isBillingErrorMessage(raw)) return "billing"; + if (isTimeoutErrorMessage(raw)) return "timeout"; + if (isAuthErrorMessage(raw)) return "auth"; + return null; +} + +export function isFailoverErrorMessage(raw: string): boolean { + return classifyFailoverReason(raw) !== null; +} + +export function isFailoverAssistantError( + msg: AssistantMessage | undefined, +): boolean { + if (!msg || msg.stopReason !== "error") return false; + return isFailoverErrorMessage(msg.errorMessage ?? ""); +} + +function extractSupportedValues(raw: string): string[] { + const match = + raw.match(/supported values are:\s*([^\n.]+)/i) ?? 
+ raw.match(/supported values:\s*([^\n.]+)/i); + if (!match?.[1]) return []; + const fragment = match[1]; + const quoted = Array.from(fragment.matchAll(/['"]([^'"]+)['"]/g)).map( + (entry) => entry[1]?.trim(), + ); + if (quoted.length > 0) { + return quoted.filter((entry): entry is string => Boolean(entry)); + } + return fragment + .split(/,|\band\b/gi) + .map((entry) => entry.replace(/^[^a-zA-Z]+|[^a-zA-Z]+$/g, "").trim()) + .filter(Boolean); +} + +export function pickFallbackThinkingLevel(params: { + message?: string; + attempted: Set; +}): ThinkLevel | undefined { + const raw = params.message?.trim(); + if (!raw) return undefined; + const supported = extractSupportedValues(raw); + if (supported.length === 0) return undefined; + for (const entry of supported) { + const normalized = normalizeThinkLevel(entry); + if (!normalized) continue; + if (params.attempted.has(normalized)) continue; + return normalized; + } + return undefined; +} + +/** + * Validates and fixes conversation turn sequences for Gemini API. + * Gemini requires strict alternating userβ†’assistantβ†’toolβ†’user pattern. + * This function: + * 1. Detects consecutive messages from the same role + * 2. Merges consecutive assistant messages together + * 3. Preserves metadata (usage, stopReason, etc.) + * + * This prevents the "function call turn comes immediately after a user turn or after a function response turn" error. 
+ */ +export function validateGeminiTurns(messages: AgentMessage[]): AgentMessage[] { + if (!Array.isArray(messages) || messages.length === 0) { + return messages; + } + + const result: AgentMessage[] = []; + let lastRole: string | undefined; + + for (const msg of messages) { + if (!msg || typeof msg !== "object") { + result.push(msg); + continue; + } + + const msgRole = (msg as { role?: unknown }).role as string | undefined; + if (!msgRole) { + result.push(msg); + continue; + } + + // Check if this message has the same role as the last one + if (msgRole === lastRole && lastRole === "assistant") { + // Merge consecutive assistant messages + const lastMsg = result[result.length - 1]; + const currentMsg = msg as Extract; + + if (lastMsg && typeof lastMsg === "object") { + const lastAsst = lastMsg as Extract< + AgentMessage, + { role: "assistant" } + >; + + // Merge content blocks + const mergedContent = [ + ...(Array.isArray(lastAsst.content) ? lastAsst.content : []), + ...(Array.isArray(currentMsg.content) ? currentMsg.content : []), + ]; + + // Preserve metadata from the later message (more recent) + const merged: Extract = { + ...lastAsst, + content: mergedContent, + // Take timestamps, usage, stopReason from the newer message if present + ...(currentMsg.usage && { usage: currentMsg.usage }), + ...(currentMsg.stopReason && { stopReason: currentMsg.stopReason }), + ...(currentMsg.errorMessage && { + errorMessage: currentMsg.errorMessage, + }), + }; + + // Replace the last message with merged version + result[result.length - 1] = merged; + continue; + } + } + + // Not a consecutive duplicate, add normally + result.push(msg); + lastRole = msgRole; + } + + return result; +} + +export function mergeConsecutiveUserTurns( + previous: Extract, + current: Extract, +): Extract { + const mergedContent = [ + ...(Array.isArray(previous.content) ? previous.content : []), + ...(Array.isArray(current.content) ? 
current.content : []), + ]; + + // Preserve newest metadata while backfilling timestamp if the latest is missing. + return { + ...current, // newest wins for metadata + content: mergedContent, + timestamp: current.timestamp ?? previous.timestamp, + }; +} + +/** + * Validates and fixes conversation turn sequences for Anthropic API. + * Anthropic requires strict alternating userβ†’assistant pattern. + * This function: + * 1. Detects consecutive user messages + * 2. Merges consecutive user messages together + * 3. Preserves timestamps from the later message + * + * This prevents the "400 Incorrect role information" error that occurs + * when steering messages are injected during streaming and create + * consecutive user messages. + */ +export function validateAnthropicTurns( + messages: AgentMessage[], +): AgentMessage[] { + if (!Array.isArray(messages) || messages.length === 0) { + return messages; + } + + const result: AgentMessage[] = []; + let lastRole: string | undefined; + + for (const msg of messages) { + if (!msg || typeof msg !== "object") { + result.push(msg); + continue; + } + + const msgRole = (msg as { role?: unknown }).role as string | undefined; + if (!msgRole) { + result.push(msg); + continue; + } + + // Check if this message has the same role as the last one + if (msgRole === lastRole && lastRole === "user") { + // Merge consecutive user messages. Base on the newest message so we keep + // fresh metadata (attachments, timestamps, future fields) while + // appending prior content. 
+ const lastMsg = result[result.length - 1]; + const currentMsg = msg as Extract; + + if (lastMsg && typeof lastMsg === "object") { + const lastUser = lastMsg as Extract; + const merged = mergeConsecutiveUserTurns(lastUser, currentMsg); + + // Replace the last message with merged version + result[result.length - 1] = merged; + continue; + } + } + + // Not a consecutive duplicate, add normally + result.push(msg); + lastRole = msgRole; + } + + return result; +} + +// ── Messaging tool duplicate detection ────────────────────────────────────── +// When the agent uses a messaging tool (telegram, discord, slack, message, sessions_send) +// to send a message, we track the text so we can suppress duplicate block replies. +// The LLM sometimes elaborates or wraps the same content, so we use substring matching. + +const MIN_DUPLICATE_TEXT_LENGTH = 10; + +/** + * Normalize text for duplicate comparison. + * - Trims whitespace + * - Lowercases + * - Strips emoji (Emoji_Presentation and Extended_Pictographic) + * - Collapses multiple spaces to single space + */ +export function normalizeTextForComparison(text: string): string { + return text + .trim() + .toLowerCase() + .replace(/\p{Emoji_Presentation}|\p{Extended_Pictographic}/gu, "") + .replace(/\s+/g, " ") + .trim(); +} + +export function isMessagingToolDuplicateNormalized( + normalized: string, + normalizedSentTexts: string[], +): boolean { + if (normalizedSentTexts.length === 0) return false; + if (!normalized || normalized.length < MIN_DUPLICATE_TEXT_LENGTH) + return false; + return normalizedSentTexts.some((normalizedSent) => { + if (!normalizedSent || normalizedSent.length < MIN_DUPLICATE_TEXT_LENGTH) + return false; + return ( + normalized.includes(normalizedSent) || normalizedSent.includes(normalized) + ); + }); +} + +/** + * Check if a text is a duplicate of any previously sent messaging tool text. 
+ * Uses substring matching to handle LLM elaboration (e.g., wrapping in quotes, + * adding context, or slight rephrasing that includes the original). + */ +// ── Tool Call ID Sanitization (Google Cloud Code Assist) ─────────────────────── +// Google Cloud Code Assist rejects tool call IDs that contain invalid characters. +// OpenAI Codex generates IDs like "call_abc123|item_456" with pipe characters, +// but Google requires IDs matching ^[a-zA-Z0-9_-]+$ pattern. +// This function sanitizes tool call IDs by replacing invalid characters with underscores. +export { sanitizeToolCallId, isValidCloudCodeAssistToolId }; + +export function isMessagingToolDuplicate( + text: string, + sentTexts: string[], +): boolean { + if (sentTexts.length === 0) return false; + const normalized = normalizeTextForComparison(text); + if (!normalized || normalized.length < MIN_DUPLICATE_TEXT_LENGTH) + return false; + return isMessagingToolDuplicateNormalized( + normalized, + sentTexts.map(normalizeTextForComparison), + ); +} + +/** + * Downgrades tool calls that are missing `thought_signature` (required by Gemini) + * into text representations, to prevent 400 INVALID_ARGUMENT errors. + * Also converts corresponding tool results into user messages. 
+ */ +type GeminiToolCallBlock = { + type?: unknown; + thought_signature?: unknown; + thoughtSignature?: unknown; + id?: unknown; + toolCallId?: unknown; + name?: unknown; + toolName?: unknown; + arguments?: unknown; + input?: unknown; +}; + +export function downgradeGeminiHistory( + messages: AgentMessage[], +): AgentMessage[] { + const downgradedIds = new Set(); + const out: AgentMessage[] = []; + + const resolveToolResultId = ( + msg: Extract, + ): string | undefined => { + const toolCallId = (msg as { toolCallId?: unknown }).toolCallId; + if (typeof toolCallId === "string" && toolCallId) return toolCallId; + const toolUseId = (msg as { toolUseId?: unknown }).toolUseId; + if (typeof toolUseId === "string" && toolUseId) return toolUseId; + return undefined; + }; + + for (const msg of messages) { + if (!msg || typeof msg !== "object") { + out.push(msg); + continue; + } + + const role = (msg as { role?: unknown }).role; + + if (role === "assistant") { + const assistantMsg = msg as Extract; + if (!Array.isArray(assistantMsg.content)) { + out.push(msg); + continue; + } + + let hasDowngraded = false; + const newContent = assistantMsg.content.map((block) => { + if (!block || typeof block !== "object") return block; + const blockRecord = block as GeminiToolCallBlock; + const type = blockRecord.type; + + // Check for tool calls / function calls + if ( + type === "toolCall" || + type === "functionCall" || + type === "toolUse" + ) { + // Check if thought_signature is missing + // Note: TypeScript doesn't know about thought_signature on standard types + const signature = + blockRecord.thought_signature ?? blockRecord.thoughtSignature; + const hasSignature = Boolean(signature); + + if (!hasSignature) { + const id = + typeof blockRecord.id === "string" + ? blockRecord.id + : typeof blockRecord.toolCallId === "string" + ? blockRecord.toolCallId + : undefined; + const name = + typeof blockRecord.name === "string" + ? 
blockRecord.name + : typeof blockRecord.toolName === "string" + ? blockRecord.toolName + : undefined; + const args = + blockRecord.arguments !== undefined + ? blockRecord.arguments + : blockRecord.input; + + if (id) downgradedIds.add(id); + hasDowngraded = true; + + const argsText = + typeof args === "string" ? args : JSON.stringify(args, null, 2); + + return { + type: "text", + text: `[Tool Call: ${name ?? "unknown"}${ + id ? ` (ID: ${id})` : "" + }]\nArguments: ${argsText}`, + }; + } + } + return block; + }); + + if (hasDowngraded) { + out.push({ ...assistantMsg, content: newContent } as AgentMessage); + } else { + out.push(msg); + } + continue; + } + + if (role === "toolResult") { + const toolMsg = msg as Extract; + const toolResultId = resolveToolResultId(toolMsg); + if (toolResultId && downgradedIds.has(toolResultId)) { + // Convert to User message + let textContent = ""; + if (Array.isArray(toolMsg.content)) { + textContent = toolMsg.content + .map((entry) => { + if (entry && typeof entry === "object") { + const text = (entry as { text?: unknown }).text; + if (typeof text === "string") return text; + } + return JSON.stringify(entry); + }) + .join("\n"); + } else { + textContent = JSON.stringify(toolMsg.content); + } + + out.push({ + role: "user", + content: [ + { + type: "text", + text: `[Tool Result for ID ${toolResultId}]\n${textContent}`, + }, + ], + } as AgentMessage); + + continue; + } + } + + out.push(msg); + } + return out; +} diff --git a/src/agents/pi-embedded-helpers/bootstrap.ts b/src/agents/pi-embedded-helpers/bootstrap.ts index 3e17e35e4..8f49f612e 100644 --- a/src/agents/pi-embedded-helpers/bootstrap.ts +++ b/src/agents/pi-embedded-helpers/bootstrap.ts @@ -9,26 +9,65 @@ import type { EmbeddedContextFile } from "./types.js"; type ContentBlockWithSignature = { thought_signature?: unknown; + thoughtSignature?: unknown; [key: string]: unknown; }; +type ThoughtSignatureSanitizeOptions = { + allowBase64Only?: boolean; + includeCamelCase?: boolean; +}; 
+ +function isBase64Signature(value: string): boolean { + const trimmed = value.trim(); + if (!trimmed) return false; + const compact = trimmed.replace(/\s+/g, ""); + if (!/^[A-Za-z0-9+/=_-]+$/.test(compact)) return false; + const isUrl = compact.includes("-") || compact.includes("_"); + try { + const buf = Buffer.from(compact, isUrl ? "base64url" : "base64"); + if (buf.length === 0) return false; + const encoded = buf.toString(isUrl ? "base64url" : "base64"); + const normalize = (input: string) => input.replace(/=+$/g, ""); + return normalize(encoded) === normalize(compact); + } catch { + return false; + } +} + /** * Strips Claude-style thought_signature fields from content blocks. * * Gemini expects thought signatures as base64-encoded bytes, but Claude stores message ids * like "msg_abc123...". We only strip "msg_*" to preserve any provider-valid signatures. */ -export function stripThoughtSignatures(content: T): T { +export function stripThoughtSignatures( + content: T, + options?: ThoughtSignatureSanitizeOptions, +): T { if (!Array.isArray(content)) return content; + const allowBase64Only = options?.allowBase64Only ?? false; + const includeCamelCase = options?.includeCamelCase ?? false; + const shouldStripSignature = (value: unknown): boolean => { + if (!allowBase64Only) { + return typeof value === "string" && value.startsWith("msg_"); + } + return typeof value !== "string" || !isBase64Signature(value); + }; return content.map((block) => { if (!block || typeof block !== "object") return block; const rec = block as ContentBlockWithSignature; - const signature = rec.thought_signature; - if (typeof signature !== "string" || !signature.startsWith("msg_")) { + const stripSnake = shouldStripSignature(rec.thought_signature); + const stripCamel = includeCamelCase + ? 
shouldStripSignature(rec.thoughtSignature) + : false; + if (!stripSnake && !stripCamel) { return block; } - const { thought_signature: _signature, ...rest } = rec; - return rest; + const next = { ...rec }; + if (stripSnake) delete next.thought_signature; + if (stripCamel) delete next.thoughtSignature; + return next; }) as T; } diff --git a/src/agents/pi-embedded-helpers/google.ts b/src/agents/pi-embedded-helpers/google.ts index 91443e34b..a5fdee75a 100644 --- a/src/agents/pi-embedded-helpers/google.ts +++ b/src/agents/pi-embedded-helpers/google.ts @@ -23,6 +23,7 @@ export { sanitizeGoogleTurnOrdering }; type GeminiToolCallBlock = { type?: unknown; thought_signature?: unknown; + thoughtSignature?: unknown; id?: unknown; toolCallId?: unknown; name?: unknown; @@ -118,7 +119,8 @@ export function downgradeGeminiHistory(messages: AgentMessage[]): AgentMessage[] const blockRecord = block as GeminiToolCallBlock; const type = blockRecord.type; if (type === "toolCall" || type === "functionCall" || type === "toolUse") { - const hasSignature = Boolean(blockRecord.thought_signature); + const signature = blockRecord.thought_signature ?? 
blockRecord.thoughtSignature; + const hasSignature = Boolean(signature); if (!hasSignature) { const id = typeof blockRecord.id === "string" diff --git a/src/agents/pi-embedded-helpers/images.ts b/src/agents/pi-embedded-helpers/images.ts index bb1ddb260..a53c27a00 100644 --- a/src/agents/pi-embedded-helpers/images.ts +++ b/src/agents/pi-embedded-helpers/images.ts @@ -34,6 +34,10 @@ export async function sanitizeSessionMessagesImages( sanitizeToolCallIds?: boolean; enforceToolCallLast?: boolean; preserveSignatures?: boolean; + sanitizeThoughtSignatures?: { + allowBase64Only?: boolean; + includeCamelCase?: boolean; + }; }, ): Promise { // We sanitize historical session messages because Anthropic can reject a request @@ -82,7 +86,7 @@ export async function sanitizeSessionMessagesImages( if (Array.isArray(content)) { const strippedContent = options?.preserveSignatures ? content // Keep signatures for Antigravity Claude - : stripThoughtSignatures(content); // Strip for Gemini + : stripThoughtSignatures(content, options?.sanitizeThoughtSignatures); // Strip for Gemini const filteredContent = strippedContent.filter((block) => { if (!block || typeof block !== "object") return true; diff --git a/src/agents/pi-embedded-runner.google-sanitize-thinking.test.ts b/src/agents/pi-embedded-runner.google-sanitize-thinking.test.ts index 17a58230d..ccda8b578 100644 --- a/src/agents/pi-embedded-runner.google-sanitize-thinking.test.ts +++ b/src/agents/pi-embedded-runner.google-sanitize-thinking.test.ts @@ -145,6 +145,44 @@ describe("sanitizeSessionHistory (google thinking)", () => { expect(assistant.content?.[1]?.text).toBe("internal note"); }); + it("strips non-base64 thought signatures for OpenRouter Gemini", async () => { + const sessionManager = SessionManager.inMemory(); + const input = [ + { + role: "user", + content: "hi", + }, + { + role: "assistant", + content: [ + { type: "text", text: "hello", thought_signature: "msg_abc123" }, + { type: "thinking", thinking: "ok", 
thought_signature: "c2ln" }, + { type: "toolCall", id: "call_1", thoughtSignature: "{\"id\":1}" }, + { type: "toolCall", id: "call_2", thoughtSignature: "c2ln" }, + ], + }, + ] satisfies AgentMessage[]; + + const out = await sanitizeSessionHistory({ + messages: input, + modelApi: "openrouter", + provider: "openrouter", + modelId: "google/gemini-1.5-pro", + sessionManager, + sessionId: "session:openrouter-gemini", + }); + + const assistant = out.find((msg) => (msg as { role?: string }).role === "assistant") as { + content?: Array<{ type?: string; thought_signature?: string; thoughtSignature?: string }>; + }; + expect(assistant.content).toEqual([ + { type: "text", text: "hello" }, + { type: "thinking", thinking: "ok", thought_signature: "c2ln" }, + { type: "toolCall", id: "call_1" }, + { type: "toolCall", id: "call_2", thoughtSignature: "c2ln" }, + ]); + }); + it("downgrades only unsigned thinking blocks when mixed with signed ones", async () => { const sessionManager = SessionManager.inMemory(); const input = [ diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index bdebd0005..0eccb7447 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -1,27 +1,2307 @@ -export type { MessagingToolSend } from "./pi-embedded-messaging.js"; -export { compactEmbeddedPiSession } from "./pi-embedded-runner/compact.js"; -export { applyExtraParamsToAgent, resolveExtraParams } from "./pi-embedded-runner/extra-params.js"; +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { fileURLToPath } from "node:url"; -export { applyGoogleTurnOrderingFix } from "./pi-embedded-runner/google.js"; -export { - getDmHistoryLimitFromSessionKey, - limitHistoryTurns, -} from "./pi-embedded-runner/history.js"; -export { resolveEmbeddedSessionLane } from "./pi-embedded-runner/lanes.js"; -export { runEmbeddedPiAgent } from "./pi-embedded-runner/run.js"; -export { - abortEmbeddedPiRun, - 
isEmbeddedPiRunActive, - isEmbeddedPiRunStreaming, - queueEmbeddedPiMessage, - waitForEmbeddedPiRunEnd, -} from "./pi-embedded-runner/runs.js"; -export { buildEmbeddedSandboxInfo } from "./pi-embedded-runner/sandbox-info.js"; -export { createSystemPromptOverride } from "./pi-embedded-runner/system-prompt.js"; -export { splitSdkTools } from "./pi-embedded-runner/tool-split.js"; -export type { - EmbeddedPiAgentMeta, - EmbeddedPiCompactResult, - EmbeddedPiRunMeta, - EmbeddedPiRunResult, -} from "./pi-embedded-runner/types.js"; +import type { + AgentMessage, + AgentTool, + StreamFn, + ThinkingLevel, +} from "@mariozechner/pi-agent-core"; +import type { + Api, + AssistantMessage, + ImageContent, + Model, + SimpleStreamOptions, +} from "@mariozechner/pi-ai"; +import { streamSimple } from "@mariozechner/pi-ai"; +import { + createAgentSession, + discoverAuthStorage, + discoverModels, + SessionManager, + SettingsManager, +} from "@mariozechner/pi-coding-agent"; +import { resolveHeartbeatPrompt } from "../auto-reply/heartbeat.js"; +import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js"; +import type { + ReasoningLevel, + ThinkLevel, + VerboseLevel, +} from "../auto-reply/thinking.js"; +import { formatToolAggregate } from "../auto-reply/tool-meta.js"; +import { isCacheEnabled, resolveCacheTtlMs } from "../config/cache-utils.js"; +import { resolveChannelCapabilities } from "../config/channel-capabilities.js"; +import type { ClawdbotConfig } from "../config/config.js"; +import { getMachineDisplayName } from "../infra/machine-name.js"; +import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js"; +import { createSubsystemLogger } from "../logging.js"; +import { + type enqueueCommand, + enqueueCommandInLane, +} from "../process/command-queue.js"; +import { normalizeMessageChannel } from "../utils/message-channel.js"; +import { isReasoningTagProvider } from "../utils/provider-utils.js"; +import { resolveUserPath } from 
"../utils.js"; +import { resolveClawdbotAgentDir } from "./agent-paths.js"; +import { resolveSessionAgentIds } from "./agent-scope.js"; +import { + markAuthProfileFailure, + markAuthProfileGood, + markAuthProfileUsed, +} from "./auth-profiles.js"; +import type { ExecElevatedDefaults, ExecToolDefaults } from "./bash-tools.js"; +import { + CONTEXT_WINDOW_HARD_MIN_TOKENS, + CONTEXT_WINDOW_WARN_BELOW_TOKENS, + evaluateContextWindowGuard, + resolveContextWindowInfo, +} from "./context-window-guard.js"; +import { + DEFAULT_CONTEXT_TOKENS, + DEFAULT_MODEL, + DEFAULT_PROVIDER, +} from "./defaults.js"; +import { FailoverError, resolveFailoverStatus } from "./failover-error.js"; +import { + ensureAuthProfileStore, + getApiKeyForModel, + resolveAuthProfileOrder, + resolveModelAuthMode, +} from "./model-auth.js"; +import { normalizeModelCompat } from "./model-compat.js"; +import { ensureClawdbotModelsJson } from "./models-config.js"; +import type { MessagingToolSend } from "./pi-embedded-messaging.js"; +import { + ensurePiCompactionReserveTokens, + resolveCompactionReserveTokensFloor, +} from "./pi-settings.js"; +import { acquireSessionWriteLock } from "./session-write-lock.js"; + +export type { MessagingToolSend } from "./pi-embedded-messaging.js"; + +import { + buildBootstrapContextFiles, + classifyFailoverReason, + downgradeGeminiHistory, + type EmbeddedContextFile, + ensureSessionHeader, + formatAssistantErrorText, + isAuthAssistantError, + isCloudCodeAssistFormatError, + isCompactionFailureError, + isContextOverflowError, + isFailoverAssistantError, + isFailoverErrorMessage, + isGoogleModelApi, + isRateLimitAssistantError, + isTimeoutErrorMessage, + pickFallbackThinkingLevel, + resolveBootstrapMaxChars, + sanitizeGoogleTurnOrdering, + sanitizeSessionMessagesImages, + validateAnthropicTurns, + validateGeminiTurns, +} from "./pi-embedded-helpers.js"; +import { + type BlockReplyChunking, + subscribeEmbeddedPiSession, +} from "./pi-embedded-subscribe.js"; +import { + 
extractAssistantText, + extractAssistantThinking, + formatReasoningMessage, +} from "./pi-embedded-utils.js"; +import { setContextPruningRuntime } from "./pi-extensions/context-pruning/runtime.js"; +import { computeEffectiveSettings } from "./pi-extensions/context-pruning/settings.js"; +import { makeToolPrunablePredicate } from "./pi-extensions/context-pruning/tools.js"; +import { toToolDefinitions } from "./pi-tool-definition-adapter.js"; +import { createClawdbotCodingTools } from "./pi-tools.js"; +import { resolveSandboxContext } from "./sandbox.js"; +import { guardSessionManager } from "./session-tool-result-guard-wrapper.js"; +import { sanitizeToolUseResultPairing } from "./session-transcript-repair.js"; +import { + applySkillEnvOverrides, + applySkillEnvOverridesFromSnapshot, + loadWorkspaceSkillEntries, + resolveSkillsPromptForRun, + type SkillSnapshot, +} from "./skills.js"; +import { buildAgentSystemPrompt } from "./system-prompt.js"; +import { buildToolSummaryMap } from "./tool-summaries.js"; +import { normalizeUsage, type UsageLike } from "./usage.js"; +import { + filterBootstrapFilesForSession, + loadWorkspaceBootstrapFiles, +} from "./workspace.js"; + +// Optional features can be implemented as Pi extensions that run in the same Node process. + +/** + * Resolve provider-specific extraParams from model config. + * Auto-enables thinking mode for GLM-4.x models unless explicitly disabled. 
+ * + * For ZAI GLM-4.x models, we auto-enable thinking via the Z.AI Cloud API format: + * thinking: { type: "enabled", clear_thinking: boolean } + * + * - GLM-4.7: Preserved thinking (clear_thinking: false) - reasoning kept across turns + * - GLM-4.5/4.6: Interleaved thinking (clear_thinking: true) - reasoning cleared each turn + * + * Users can override via config: + * agents.defaults.models["zai/glm-4.7"].params.thinking = { type: "disabled" } + * + * Or disable via runtime flag: --thinking off + * + * @see https://docs.z.ai/guides/capabilities/thinking-mode + * @internal Exported for testing only + */ +export function resolveExtraParams(params: { + cfg: ClawdbotConfig | undefined; + provider: string; + modelId: string; + thinkLevel?: string; +}): Record | undefined { + const modelKey = `${params.provider}/${params.modelId}`; + const modelConfig = params.cfg?.agents?.defaults?.models?.[modelKey]; + let extraParams = modelConfig?.params ? { ...modelConfig.params } : undefined; + + // Auto-enable thinking for ZAI GLM-4.x models when not explicitly configured + // Skip if user explicitly disabled thinking via --thinking off + if (params.provider === "zai" && params.thinkLevel !== "off") { + const modelIdLower = params.modelId.toLowerCase(); + const isGlm4 = modelIdLower.includes("glm-4"); + + if (isGlm4) { + // Check if user has explicitly configured thinking params + const hasThinkingConfig = extraParams?.thinking !== undefined; + + if (!hasThinkingConfig) { + // GLM-4.7 supports preserved thinking (reasoning kept across turns) + // GLM-4.5/4.6 use interleaved thinking (reasoning cleared each turn) + // Z.AI Cloud API format: thinking: { type: "enabled", clear_thinking: boolean } + const isGlm47 = modelIdLower.includes("glm-4.7"); + const clearThinking = !isGlm47; + + extraParams = { + ...extraParams, + thinking: { + type: "enabled", + clear_thinking: clearThinking, + }, + }; + + log.debug( + `auto-enabled thinking for ${modelKey}: type=enabled, 
clear_thinking=${clearThinking}`, + ); + } + } + } + + return extraParams; +} + +/** + * Create a wrapped streamFn that injects extra params (like temperature) from config. + * + * @internal + */ +function createStreamFnWithExtraParams( + baseStreamFn: StreamFn | undefined, + extraParams: Record | undefined, +): StreamFn | undefined { + if (!extraParams || Object.keys(extraParams).length === 0) { + return undefined; // No wrapper needed + } + + const streamParams: Partial = {}; + if (typeof extraParams.temperature === "number") { + streamParams.temperature = extraParams.temperature; + } + if (typeof extraParams.maxTokens === "number") { + streamParams.maxTokens = extraParams.maxTokens; + } + + if (Object.keys(streamParams).length === 0) { + return undefined; + } + + log.debug( + `creating streamFn wrapper with params: ${JSON.stringify(streamParams)}`, + ); + + const underlying = baseStreamFn ?? streamSimple; + const wrappedStreamFn: StreamFn = (model, context, options) => + underlying(model, context, { + ...streamParams, + ...options, // Caller options take precedence + }); + + return wrappedStreamFn; +} + +/** + * Apply extra params (like temperature) to an agent's streamFn. + * + * @internal Exported for testing + */ +export function applyExtraParamsToAgent( + agent: { streamFn?: StreamFn }, + cfg: ClawdbotConfig | undefined, + provider: string, + modelId: string, + thinkLevel?: string, +): void { + const extraParams = resolveExtraParams({ + cfg, + provider, + modelId, + thinkLevel, + }); + const wrappedStreamFn = createStreamFnWithExtraParams( + agent.streamFn, + extraParams, + ); + + if (wrappedStreamFn) { + log.debug( + `applying extraParams to agent streamFn for ${provider}/${modelId}`, + ); + agent.streamFn = wrappedStreamFn; + } +} + +// We configure context pruning per-session via a WeakMap registry keyed by the SessionManager instance. 

// Locate a bundled Pi extension module that lives next to this file.
function resolvePiExtensionPath(id: string): string {
  const self = fileURLToPath(import.meta.url);
  const dir = path.dirname(self);
  // In dev this file is `.ts` (tsx), in production it's `.js`.
  const ext = path.extname(self) === ".ts" ? "ts" : "js";
  return path.join(dir, "pi-extensions", `${id}.${ext}`);
}

// Effective context-window size for a provider/model pair, falling back to
// DEFAULT_CONTEXT_TOKENS when neither config nor the model declares one.
function resolveContextWindowTokens(params: {
  cfg: ClawdbotConfig | undefined;
  provider: string;
  modelId: string;
  model: Model | undefined;
}): number {
  return resolveContextWindowInfo({
    cfg: params.cfg,
    provider: params.provider,
    modelId: params.modelId,
    modelContextWindow: params.model?.contextWindow,
    defaultTokens: DEFAULT_CONTEXT_TOKENS,
  }).tokens;
}

// Register per-session context-pruning runtime state and return the extension
// path to load, or {} when pruning is not enabled in config.
function buildContextPruningExtension(params: {
  cfg: ClawdbotConfig | undefined;
  sessionManager: SessionManager;
  provider: string;
  modelId: string;
  model: Model | undefined;
}): { additionalExtensionPaths?: string[] } {
  const raw = params.cfg?.agents?.defaults?.contextPruning;
  // Only "adaptive" and "aggressive" modes activate the extension.
  if (raw?.mode !== "adaptive" && raw?.mode !== "aggressive") return {};

  const settings = computeEffectiveSettings(raw);
  if (!settings) return {};

  setContextPruningRuntime(params.sessionManager, {
    settings,
    contextWindowTokens: resolveContextWindowTokens(params),
    isToolPrunable: makeToolPrunablePredicate(settings.tools),
  });

  return {
    additionalExtensionPaths: [resolvePiExtensionPath("context-pruning")],
  };
}

// Compaction mode from config; anything other than an explicit "safeguard"
// falls back to "default".
function resolveCompactionMode(cfg?: ClawdbotConfig): "default" | "safeguard" {
  return cfg?.agents?.defaults?.compaction?.mode === "safeguard"
    ? "safeguard"
    : "default";
}

// Full set of Pi extension paths to load for an embedded run:
// transcript sanitizing always, compaction safeguard and context pruning
// conditionally per config.
function buildEmbeddedExtensionPaths(params: {
  cfg: ClawdbotConfig | undefined;
  sessionManager: SessionManager;
  provider: string;
  modelId: string;
  model: Model | undefined;
}): string[] {
  const paths = [resolvePiExtensionPath("transcript-sanitize")];
  if (resolveCompactionMode(params.cfg) === "safeguard") {
    paths.push(resolvePiExtensionPath("compaction-safeguard"));
  }
  const pruning = buildContextPruningExtension(params);
  if (pruning.additionalExtensionPaths) {
    paths.push(...pruning.additionalExtensionPaths);
  }
  return paths;
}

// Identity and token-usage metadata for one embedded agent run.
export type EmbeddedPiAgentMeta = {
  sessionId: string;
  provider: string;
  model: string;
  usage?: {
    input?: number;
    output?: number;
    cacheRead?: number;
    cacheWrite?: number;
    total?: number;
  };
};

// Outcome metadata for a run: wall-clock duration plus optional agent meta,
// abort flag, and a classified fatal error.
export type EmbeddedPiRunMeta = {
  durationMs: number;
  agentMeta?: EmbeddedPiAgentMeta;
  aborted?: boolean;
  error?: {
    kind: "context_overflow" | "compaction_failure";
    message: string;
  };
};

// Render "- alias: provider/model" lines (sorted by alias) for the system
// prompt, from agents.defaults.models entries that declare an alias.
function buildModelAliasLines(cfg?: ClawdbotConfig) {
  const models = cfg?.agents?.defaults?.models ?? {};
  const entries: Array<{ alias: string; model: string }> = [];
  for (const [keyRaw, entryRaw] of Object.entries(models)) {
    const model = String(keyRaw ?? "").trim();
    if (!model) continue;
    const alias = String(
      (entryRaw as { alias?: string } | undefined)?.alias ?? "",
    ).trim();
    if (!alias) continue;
    entries.push({ alias, model });
  }
  return entries
    .sort((a, b) => a.alias.localeCompare(b.alias))
    .map((entry) => `- ${entry.alias}: ${entry.model}`);
}

// Resolved API key plus where it came from (auth profile / env / config).
type ApiKeyInfo = {
  apiKey: string;
  profileId?: string;
  source: string;
};

export type EmbeddedPiRunResult = {
  payloads?: Array<{
    text?: string;
    mediaUrl?: string;
    mediaUrls?: string[];
    replyToId?: string;
    isError?: boolean;
  }>;
  meta: EmbeddedPiRunMeta;
  // True if a messaging tool (telegram, whatsapp, discord, slack, sessions_send)
  // successfully sent a message. Used to suppress agent's confirmation text.
  didSendViaMessagingTool?: boolean;
  // Texts successfully sent via messaging tools during the run.
  messagingToolSentTexts?: string[];
  // Messaging tool targets that successfully sent a message during the run.
  messagingToolSentTargets?: MessagingToolSend[];
};

// Result of an explicit compaction request against a session transcript.
export type EmbeddedPiCompactResult = {
  ok: boolean;
  compacted: boolean;
  reason?: string;
  result?: {
    summary: string;
    firstKeptEntryId: string;
    tokensBefore: number;
    details?: unknown;
  };
};

// Control surface registered for an in-flight run (see ACTIVE_EMBEDDED_RUNS).
type EmbeddedPiQueueHandle = {
  queueMessage: (text: string) => Promise<void>;
  isStreaming: () => boolean;
  isCompacting: () => boolean;
  abort: () => void;
};

const log = createSubsystemLogger("agent/embedded");
// Custom session entry type marking that the Google turn-ordering bootstrap
// was already prepended once for a session.
const GOOGLE_TURN_ORDERING_CUSTOM_TYPE = "google-turn-ordering-bootstrap";
// JSON Schema keywords the Google tool-schema endpoints reject; used only for
// diagnostic logging, not for stripping.
const GOOGLE_SCHEMA_UNSUPPORTED_KEYWORDS = new Set([
  "patternProperties",
  "additionalProperties",
  "$schema",
  "$id",
  "$ref",
  "$defs",
  "definitions",
  "examples",
  "minLength",
  "maxLength",
  "minimum",
  "maximum",
  "multipleOf",
  "pattern",
  "format",
  "minItems",
  "maxItems",
  "uniqueItems",
  "minProperties",
  "maxProperties",
]);

// Walk a JSON schema and collect dotted paths of keywords Google rejects.
function findUnsupportedSchemaKeywords(
  schema: unknown,
  path: string,
): string[] {
  if (!schema || typeof schema !== "object") return [];
  if (Array.isArray(schema)) {
    return schema.flatMap((item, index) =>
      findUnsupportedSchemaKeywords(item, `${path}[${index}]`),
    );
  }
  const record = schema as Record<string, unknown>;
  const violations: string[] = [];
  for (const [key, value] of Object.entries(record)) {
    if (GOOGLE_SCHEMA_UNSUPPORTED_KEYWORDS.has(key)) {
      violations.push(`${path}.${key}`);
    }
    if (value && typeof value === "object") {
      violations.push(
        ...findUnsupportedSchemaKeywords(value, `${path}.${key}`),
      );
    }
  }
  return violations;
}

// Diagnostic-only: snapshot tool schemas and warn about keywords Google's
// tool APIs are known to reject. No-op for non-Google providers.
function logToolSchemasForGoogle(params: {
  tools: AgentTool[];
  provider: string;
}) {
  if (
    params.provider !== "google-antigravity" &&
    params.provider !== "google-gemini-cli"
  ) {
    return;
  }
  const toolNames = params.tools.map((tool, index) => `${index}:${tool.name}`);
  log.info("google tool schema snapshot", {
    provider: params.provider,
    toolCount: params.tools.length,
    tools: toolNames,
  });
  for (const [index, tool] of params.tools.entries()) {
    const violations = findUnsupportedSchemaKeywords(
      tool.parameters,
      `${tool.name}.parameters`,
    );
    if (violations.length > 0) {
      log.warn("google tool schema has unsupported keywords", {
        index,
        tool: tool.name,
        violations: violations.slice(0, 12),
        violationCount: violations.length,
      });
    }
  }
}

// Swallow (but log) unhandled rejections caused by background auto-compaction
// failures; all other rejections propagate to the default handler.
registerUnhandledRejectionHandler((reason) => {
  const message = describeUnknownError(reason);
  if (!isCompactionFailureError(message)) return false;
  log.error(`Auto-compaction failed (unhandled): ${message}`);
  return true;
});

type CustomEntryLike = { type?: unknown; customType?: unknown };

// True when the session transcript already carries the turn-ordering marker.
function hasGoogleTurnOrderingMarker(sessionManager: SessionManager): boolean {
  try {
    return sessionManager
      .getEntries()
      .some(
        (entry) =>
          (entry as CustomEntryLike)?.type === "custom" &&
          (entry as CustomEntryLike)?.customType ===
            GOOGLE_TURN_ORDERING_CUSTOM_TYPE,
      );
  } catch {
    return false;
  }
}

// Persist the turn-ordering marker so the fixup warning is emitted only once.
function markGoogleTurnOrderingMarker(sessionManager: SessionManager): void {
  try {
    sessionManager.appendCustomEntry(GOOGLE_TURN_ORDERING_CUSTOM_TYPE, {
      timestamp: Date.now(),
    });
  } catch {
    // ignore marker persistence failures
  }
}

/**
 * Google APIs require the first turn to be a user turn. When the history
 * starts with an assistant message, prepend a user bootstrap (via
 * sanitizeGoogleTurnOrdering) and record a one-time marker + warning.
 */
export function applyGoogleTurnOrderingFix(params: {
  messages: AgentMessage[];
  modelApi?: string | null;
  sessionManager: SessionManager;
  sessionId: string;
  warn?: (message: string) => void;
}): { messages: AgentMessage[]; didPrepend: boolean } {
  if (!isGoogleModelApi(params.modelApi)) {
    return { messages: params.messages, didPrepend: false };
  }
  const first = params.messages[0] as
    | { role?: unknown; content?: unknown }
    | undefined;
  if (first?.role !== "assistant") {
    return { messages: params.messages, didPrepend: false };
  }
  const sanitized = sanitizeGoogleTurnOrdering(params.messages);
  // sanitizeGoogleTurnOrdering returns the same array when nothing changed.
  const didPrepend = sanitized !== params.messages;
  if (didPrepend && !hasGoogleTurnOrderingMarker(params.sessionManager)) {
    const warn = params.warn ?? ((message: string) => log.warn(message));
    warn(
      `google turn ordering fixup: prepended user bootstrap (sessionId=${params.sessionId})`,
    );
    markGoogleTurnOrderingMarker(params.sessionManager);
  }
  return { messages: sanitized, didPrepend };
}

// Models whose history may carry Gemini thought signatures: native Google
// APIs, plus Gemini models routed through OpenRouter/OpenCode.
function isGeminiThoughtSignatureModel(params: {
  modelApi?: string | null;
  provider?: string;
  modelId?: string;
}): boolean {
  if (isGoogleModelApi(params.modelApi)) return true;
  const provider = (params.provider ?? "").toLowerCase();
  if (provider !== "openrouter" && provider !== "opencode") return false;
  const modelId = (params.modelId ?? "").toLowerCase();
  return modelId.includes("gemini");
}

// Run the full history-sanitizing pipeline before handing messages to the
// provider: image sanitizing (+ provider-specific options), tool-call/result
// pairing repair, Gemini thought-signature downgrade, Google turn ordering.
async function sanitizeSessionHistory(params: {
  messages: AgentMessage[];
  modelApi?: string | null;
  provider?: string;
  modelId?: string;
  sessionManager: SessionManager;
  sessionId: string;
}): Promise<AgentMessage[]> {
  const geminiLike = isGeminiThoughtSignatureModel(params);
  const sanitizedImages = await sanitizeSessionMessagesImages(
    params.messages,
    "session:history",
    {
      sanitizeToolCallIds: isGoogleModelApi(params.modelApi),
      enforceToolCallLast: params.modelApi === "anthropic-messages",
      sanitizeThoughtSignatures: geminiLike
        ? { allowBase64Only: true, includeCamelCase: true }
        : undefined,
    },
  );
  const repairedTools = sanitizeToolUseResultPairing(sanitizedImages);

  // Downgrade tool calls missing thought_signature if using Gemini
  const downgraded = geminiLike
    ? downgradeGeminiHistory(repairedTools)
    : repairedTools;

  return applyGoogleTurnOrderingFix({
    messages: downgraded,
    modelApi: params.modelApi,
    sessionManager: params.sessionManager,
    sessionId: params.sessionId,
  }).messages;
}

/**
 * Limits conversation history to the last N user turns (and their associated
 * assistant responses). This reduces token usage for long-running DM sessions.
 *
 * @param messages - The full message history
 * @param limit - Max number of user turns to keep (undefined = no limit)
 * @returns Messages trimmed to the last `limit` user turns
 */
export function limitHistoryTurns(
  messages: AgentMessage[],
  limit: number | undefined,
): AgentMessage[] {
  if (!limit || limit <= 0 || messages.length === 0) return messages;

  // Count user messages from the end, find cutoff point
  let userCount = 0;
  let lastUserIndex = messages.length;

  for (let i = messages.length - 1; i >= 0; i--) {
    if (messages[i].role === "user") {
      userCount++;
      if (userCount > limit) {
        // We exceeded the limit; keep from the last valid user turn onwards
        return messages.slice(lastUserIndex);
      }
      lastUserIndex = i;
    }
  }
  // Fewer than limit user turns, keep all
  return messages;
}

/**
 * Extracts the provider name and user ID from a session key and looks up
 * dmHistoryLimit from the provider config, with per-DM override support.
 *
 * Session key formats:
 * - `telegram:dm:123` → provider = telegram, userId = 123
 * - `agent:main:telegram:dm:123` → provider = telegram, userId = 123
 *
 * Resolution order:
 * 1. Per-DM override: provider.dms[userId].historyLimit
 * 2. Provider default: provider.dmHistoryLimit
 */
export function getDmHistoryLimitFromSessionKey(
  sessionKey: string | undefined,
  config: ClawdbotConfig | undefined,
): number | undefined {
  if (!sessionKey || !config) return undefined;

  const parts = sessionKey.split(":").filter(Boolean);
  // Handle agent-prefixed keys: agent:<agentId>:<provider>:dm:<userId>
  const providerParts =
    parts.length >= 3 && parts[0] === "agent" ? parts.slice(2) : parts;

  const provider = providerParts[0]?.toLowerCase();
  if (!provider) return undefined;

  // Extract userId: format is provider:dm:userId or provider:dm:userId:...
  // The userId may contain colons (e.g., email addresses), so join remaining parts
  const kind = providerParts[1]?.toLowerCase();
  const userId = providerParts.slice(2).join(":");
  if (kind !== "dm") return undefined;

  // Helper to get limit with per-DM override support
  const getLimit = (
    providerConfig:
      | {
          dmHistoryLimit?: number;
          dms?: Record<string, { historyLimit?: number } | undefined>;
        }
      | undefined,
  ): number | undefined => {
    if (!providerConfig) return undefined;
    // Check per-DM override first
    if (
      userId &&
      kind === "dm" &&
      providerConfig.dms?.[userId]?.historyLimit !== undefined
    ) {
      return providerConfig.dms[userId].historyLimit;
    }
    // Fall back to provider default
    return providerConfig.dmHistoryLimit;
  };

  // Map provider to config key
  switch (provider) {
    case "telegram":
      return getLimit(config.channels?.telegram);
    case "whatsapp":
      return getLimit(config.channels?.whatsapp);
    case "discord":
      return getLimit(config.channels?.discord);
    case "slack":
      return getLimit(config.channels?.slack);
    case "signal":
      return getLimit(config.channels?.signal);
    case "imessage":
      return getLimit(config.channels?.imessage);
    case "msteams":
      return getLimit(config.channels?.msteams);
    default:
      return undefined;
  }
}

// In-flight runs keyed by sessionId; used by the queue/abort/status accessors.
const ACTIVE_EMBEDDED_RUNS = new Map<string, EmbeddedPiQueueHandle>();
type EmbeddedRunWaiter = {
  resolve: (ended: boolean) => void;
  timer: NodeJS.Timeout;
};
// Callers waiting for a run to finish (with a timeout), keyed by sessionId.
const EMBEDDED_RUN_WAITERS = new Map<string, Set<EmbeddedRunWaiter>>();

// ============================================================================
// SessionManager Pre-warming Cache
// ============================================================================

type SessionManagerCacheEntry = {
  sessionFile: string;
  loadedAt: number;
};

const SESSION_MANAGER_CACHE = new Map<string, SessionManagerCacheEntry>();
const DEFAULT_SESSION_MANAGER_TTL_MS = 45_000; // 45 seconds

// TTL for the pre-warm cache; overridable via env for tests/tuning.
function getSessionManagerTtl(): number {
  return resolveCacheTtlMs({
    envValue: process.env.CLAWDBOT_SESSION_MANAGER_CACHE_TTL_MS,
    defaultTtlMs: DEFAULT_SESSION_MANAGER_TTL_MS,
  });
}

function isSessionManagerCacheEnabled(): boolean {
  return isCacheEnabled(getSessionManagerTtl());
}

// Record (or refresh) a session file's last-access time in the cache.
function trackSessionManagerAccess(sessionFile: string): void {
  if (!isSessionManagerCacheEnabled()) return;
  const now = Date.now();
  SESSION_MANAGER_CACHE.set(sessionFile, {
    sessionFile,
    loadedAt: now,
  });
}

// True when the session file was accessed within the TTL window.
function isSessionManagerCached(sessionFile: string): boolean {
  if (!isSessionManagerCacheEnabled()) return false;
  const entry = SESSION_MANAGER_CACHE.get(sessionFile);
  if (!entry) return false;
  const now = Date.now();
  const ttl = getSessionManagerTtl();
  return now - entry.loadedAt <= ttl;
}

// Best-effort pre-read of the session file so a subsequent SessionManager
// load hits the OS page cache. Failures are ignored.
async function prewarmSessionFile(sessionFile: string): Promise<void> {
  if (!isSessionManagerCacheEnabled()) return;
  if (isSessionManagerCached(sessionFile)) return;

  try {
    // Read a small chunk to encourage OS page cache warmup.
    const handle = await fs.open(sessionFile, "r");
    try {
      const buffer = Buffer.alloc(4096);
      await handle.read(buffer, 0, buffer.length, 0);
    } finally {
      await handle.close();
    }
    trackSessionManagerAccess(sessionFile);
  } catch {
    // File doesn't exist yet, SessionManager will create it
  }
}

// Heuristic AbortError detection: explicit name match or an "aborted" message.
const isAbortError = (err: unknown): boolean => {
  if (!err || typeof err !== "object") return false;
  const name = "name" in err ? String(err.name) : "";
  if (name === "AbortError") return true;
  const message =
    "message" in err && typeof err.message === "string"
      ? err.message.toLowerCase()
      : "";
  return message.includes("aborted");
};

// Sandbox facts surfaced to the agent via the system prompt.
type EmbeddedSandboxInfo = {
  enabled: boolean;
  workspaceDir?: string;
  workspaceAccess?: "none" | "ro" | "rw";
  agentWorkspaceMount?: string;
  browserControlUrl?: string;
  browserNoVncUrl?: string;
  hostBrowserAllowed?: boolean;
  allowedControlUrls?: string[];
  allowedControlHosts?: string[];
  allowedControlPorts?: number[];
  elevated?: {
    allowed: boolean;
    defaultLevel: "on" | "off";
  };
};

// Normalize a session key into a "session:"-prefixed command-queue lane.
function resolveSessionLane(key: string) {
  const cleaned = key.trim() || "main";
  return cleaned.startsWith("session:") ? cleaned : `session:${cleaned}`;
}

function resolveGlobalLane(lane?: string) {
  const cleaned = lane?.trim();
  return cleaned ? cleaned : "main";
}

// Validate the configured timezone (by attempting to format with it), else
// fall back to the host timezone, else "UTC".
function resolveUserTimezone(configured?: string): string {
  const trimmed = configured?.trim();
  if (trimmed) {
    try {
      new Intl.DateTimeFormat("en-US", { timeZone: trimmed }).format(
        new Date(),
      );
      return trimmed;
    } catch {
      // ignore invalid timezone
    }
  }
  const host = Intl.DateTimeFormat().resolvedOptions().timeZone;
  return host?.trim() || "UTC";
}

// Format "Weekday YYYY-MM-DD HH:mm" in the given timezone, or undefined when
// formatting fails or parts are missing.
function formatUserTime(date: Date, timeZone: string): string | undefined {
  try {
    const parts = new Intl.DateTimeFormat("en-CA", {
      timeZone,
      weekday: "long",
      year: "numeric",
      month: "2-digit",
      day: "2-digit",
      hour: "2-digit",
      minute: "2-digit",
      hourCycle: "h23",
    }).formatToParts(date);
    const map: Record<string, string> = {};
    for (const part of parts) {
      if (part.type !== "literal") map[part.type] = part.value;
    }
    if (
      !map.weekday ||
      !map.year ||
      !map.month ||
      !map.day ||
      !map.hour ||
      !map.minute
    ) {
      return undefined;
    }
    return `${map.weekday} ${map.year}-${map.month}-${map.day} ${map.hour}:${map.minute}`;
  } catch {
    return undefined;
  }
}

// Best-effort string rendering for unknown thrown values.
function describeUnknownError(error: unknown): string {
  if (error instanceof Error) return error.message;
  if (typeof error === "string") return error;
  try {
    // JSON.stringify can return undefined (e.g. for functions/symbols).
    const serialized = JSON.stringify(error);
    return serialized ?? "Unknown error";
  } catch {
    return "Unknown error";
  }
}

// Project a resolved sandbox context into the prompt-facing info shape;
// undefined when sandboxing is disabled.
export function buildEmbeddedSandboxInfo(
  sandbox?: Awaited<ReturnType<typeof resolveSandboxContext>>,
  execElevated?: ExecElevatedDefaults,
): EmbeddedSandboxInfo | undefined {
  if (!sandbox?.enabled) return undefined;
  const elevatedAllowed = Boolean(
    execElevated?.enabled && execElevated.allowed,
  );
  return {
    enabled: true,
    workspaceDir: sandbox.workspaceDir,
    workspaceAccess: sandbox.workspaceAccess,
    agentWorkspaceMount:
      sandbox.workspaceAccess === "ro" ? "/agent" : undefined,
    browserControlUrl: sandbox.browser?.controlUrl,
    browserNoVncUrl: sandbox.browser?.noVncUrl,
    hostBrowserAllowed: sandbox.browserAllowHostControl,
    allowedControlUrls: sandbox.browserAllowedControlUrls,
    allowedControlHosts: sandbox.browserAllowedControlHosts,
    allowedControlPorts: sandbox.browserAllowedControlPorts,
    ...(elevatedAllowed
      ? {
          elevated: {
            allowed: true,
            defaultLevel: execElevated?.defaultLevel ?? "off",
          },
        }
      : {}),
  };
}

// Thin adapter from run parameters to buildAgentSystemPrompt inputs.
function buildEmbeddedSystemPrompt(params: {
  workspaceDir: string;
  defaultThinkLevel?: ThinkLevel;
  reasoningLevel?: ReasoningLevel;
  extraSystemPrompt?: string;
  ownerNumbers?: string[];
  reasoningTagHint: boolean;
  heartbeatPrompt?: string;
  skillsPrompt?: string;
  runtimeInfo: {
    host: string;
    os: string;
    arch: string;
    node: string;
    model: string;
    provider?: string;
    capabilities?: string[];
  };
  sandboxInfo?: EmbeddedSandboxInfo;
  tools: AgentTool[];
  modelAliasLines: string[];
  userTimezone: string;
  userTime?: string;
  contextFiles?: EmbeddedContextFile[];
}): string {
  return buildAgentSystemPrompt({
    workspaceDir: params.workspaceDir,
    defaultThinkLevel: params.defaultThinkLevel,
    reasoningLevel: params.reasoningLevel,
    extraSystemPrompt: params.extraSystemPrompt,
    ownerNumbers: params.ownerNumbers,
    reasoningTagHint: params.reasoningTagHint,
    heartbeatPrompt: params.heartbeatPrompt,
    skillsPrompt: params.skillsPrompt,
    runtimeInfo: params.runtimeInfo,
    sandboxInfo: params.sandboxInfo,
    toolNames: params.tools.map((tool) => tool.name),
    toolSummaries: buildToolSummaryMap(params.tools),
    modelAliasLines: params.modelAliasLines,
    userTimezone: params.userTimezone,
    userTime: params.userTime,
    contextFiles: params.contextFiles,
  });
}

// Replace the SDK's default system prompt entirely with the provided one.
export function createSystemPromptOverride(
  systemPrompt: string,
): (defaultPrompt: string) => string {
  const trimmed = systemPrompt.trim();
  return () => trimmed;
}

// We always pass tools via `customTools` so our policy filtering, sandbox integration,
// and extended toolset remain consistent across providers.
+ +type AnyAgentTool = AgentTool; + +export function splitSdkTools(options: { + tools: AnyAgentTool[]; + sandboxEnabled: boolean; +}): { + builtInTools: AnyAgentTool[]; + customTools: ReturnType; +} { + // Always pass all tools as customTools so the SDK doesn't "helpfully" swap in + // its own built-in implementations (we need our tool wrappers + policy). + const { tools } = options; + return { + builtInTools: [], + customTools: toToolDefinitions(tools), + }; +} + +export function queueEmbeddedPiMessage( + sessionId: string, + text: string, +): boolean { + const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); + if (!handle) return false; + if (!handle.isStreaming()) return false; + if (handle.isCompacting()) return false; + void handle.queueMessage(text); + return true; +} + +export function abortEmbeddedPiRun(sessionId: string): boolean { + const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); + if (!handle) return false; + handle.abort(); + return true; +} + +export function isEmbeddedPiRunActive(sessionId: string): boolean { + return ACTIVE_EMBEDDED_RUNS.has(sessionId); +} + +export function isEmbeddedPiRunStreaming(sessionId: string): boolean { + const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); + if (!handle) return false; + return handle.isStreaming(); +} + +export function waitForEmbeddedPiRunEnd( + sessionId: string, + timeoutMs = 15_000, +): Promise { + if (!sessionId || !ACTIVE_EMBEDDED_RUNS.has(sessionId)) + return Promise.resolve(true); + return new Promise((resolve) => { + const waiters = EMBEDDED_RUN_WAITERS.get(sessionId) ?? 
new Set(); + const waiter: EmbeddedRunWaiter = { + resolve, + timer: setTimeout( + () => { + waiters.delete(waiter); + if (waiters.size === 0) EMBEDDED_RUN_WAITERS.delete(sessionId); + resolve(false); + }, + Math.max(100, timeoutMs), + ), + }; + waiters.add(waiter); + EMBEDDED_RUN_WAITERS.set(sessionId, waiters); + if (!ACTIVE_EMBEDDED_RUNS.has(sessionId)) { + waiters.delete(waiter); + if (waiters.size === 0) EMBEDDED_RUN_WAITERS.delete(sessionId); + clearTimeout(waiter.timer); + resolve(true); + } + }); +} + +function notifyEmbeddedRunEnded(sessionId: string) { + const waiters = EMBEDDED_RUN_WAITERS.get(sessionId); + if (!waiters || waiters.size === 0) return; + EMBEDDED_RUN_WAITERS.delete(sessionId); + for (const waiter of waiters) { + clearTimeout(waiter.timer); + waiter.resolve(true); + } +} + +export function resolveEmbeddedSessionLane(key: string) { + return resolveSessionLane(key); +} + +function mapThinkingLevel(level?: ThinkLevel): ThinkingLevel { + // pi-agent-core supports "xhigh"; Clawdbot enables it for specific models. + if (!level) return "off"; + return level; +} + +function resolveExecToolDefaults( + config?: ClawdbotConfig, +): ExecToolDefaults | undefined { + const tools = config?.tools; + if (!tools) return undefined; + if (!tools.exec) return tools.bash; + if (!tools.bash) return tools.exec; + return { ...tools.bash, ...tools.exec }; +} + +function resolveModel( + provider: string, + modelId: string, + agentDir?: string, + cfg?: ClawdbotConfig, +): { + model?: Model; + error?: string; + authStorage: ReturnType; + modelRegistry: ReturnType; +} { + const resolvedAgentDir = agentDir ?? resolveClawdbotAgentDir(); + const authStorage = discoverAuthStorage(resolvedAgentDir); + const modelRegistry = discoverModels(authStorage, resolvedAgentDir); + const model = modelRegistry.find(provider, modelId) as Model | null; + if (!model) { + const providers = cfg?.models?.providers ?? {}; + const inlineModels = + providers[provider]?.models ?? 
+ Object.values(providers) + .flatMap((entry) => entry?.models ?? []) + .map((entry) => ({ ...entry, provider })); + const inlineMatch = inlineModels.find((entry) => entry.id === modelId); + if (inlineMatch) { + const normalized = normalizeModelCompat(inlineMatch as Model); + return { + model: normalized, + authStorage, + modelRegistry, + }; + } + const providerCfg = providers[provider]; + if (providerCfg || modelId.startsWith("mock-")) { + const fallbackModel: Model = normalizeModelCompat({ + id: modelId, + name: modelId, + api: providerCfg?.api ?? "openai-responses", + provider, + reasoning: false, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: + providerCfg?.models?.[0]?.contextWindow ?? DEFAULT_CONTEXT_TOKENS, + maxTokens: + providerCfg?.models?.[0]?.maxTokens ?? DEFAULT_CONTEXT_TOKENS, + } as Model); + return { model: fallbackModel, authStorage, modelRegistry }; + } + return { + error: `Unknown model: ${provider}/${modelId}`, + authStorage, + modelRegistry, + }; + } + return { model: normalizeModelCompat(model), authStorage, modelRegistry }; +} + +export async function compactEmbeddedPiSession(params: { + sessionId: string; + sessionKey?: string; + messageChannel?: string; + messageProvider?: string; + agentAccountId?: string; + sessionFile: string; + workspaceDir: string; + agentDir?: string; + config?: ClawdbotConfig; + skillsSnapshot?: SkillSnapshot; + provider?: string; + model?: string; + thinkLevel?: ThinkLevel; + reasoningLevel?: ReasoningLevel; + bashElevated?: ExecElevatedDefaults; + customInstructions?: string; + lane?: string; + enqueue?: typeof enqueueCommand; + extraSystemPrompt?: string; + ownerNumbers?: string[]; +}): Promise { + const sessionLane = resolveSessionLane( + params.sessionKey?.trim() || params.sessionId, + ); + const globalLane = resolveGlobalLane(params.lane); + const enqueueGlobal = + params.enqueue ?? 
+ ((task, opts) => enqueueCommandInLane(globalLane, task, opts)); + return enqueueCommandInLane(sessionLane, () => + enqueueGlobal(async () => { + const resolvedWorkspace = resolveUserPath(params.workspaceDir); + const prevCwd = process.cwd(); + + const provider = + (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER; + const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL; + const agentDir = params.agentDir ?? resolveClawdbotAgentDir(); + await ensureClawdbotModelsJson(params.config, agentDir); + const { model, error, authStorage, modelRegistry } = resolveModel( + provider, + modelId, + agentDir, + params.config, + ); + if (!model) { + return { + ok: false, + compacted: false, + reason: error ?? `Unknown model: ${provider}/${modelId}`, + }; + } + try { + const apiKeyInfo = await getApiKeyForModel({ + model, + cfg: params.config, + }); + + if (model.provider === "github-copilot") { + const { resolveCopilotApiToken } = await import( + "../providers/github-copilot-token.js" + ); + const copilotToken = await resolveCopilotApiToken({ + githubToken: apiKeyInfo.apiKey, + }); + authStorage.setRuntimeApiKey(model.provider, copilotToken.token); + } else { + authStorage.setRuntimeApiKey(model.provider, apiKeyInfo.apiKey); + } + } catch (err) { + return { + ok: false, + compacted: false, + reason: describeUnknownError(err), + }; + } + + await fs.mkdir(resolvedWorkspace, { recursive: true }); + const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId; + const sandbox = await resolveSandboxContext({ + config: params.config, + sessionKey: sandboxSessionKey, + workspaceDir: resolvedWorkspace, + }); + const effectiveWorkspace = sandbox?.enabled + ? sandbox.workspaceAccess === "rw" + ? 
resolvedWorkspace + : sandbox.workspaceDir + : resolvedWorkspace; + await fs.mkdir(effectiveWorkspace, { recursive: true }); + await ensureSessionHeader({ + sessionFile: params.sessionFile, + sessionId: params.sessionId, + cwd: effectiveWorkspace, + }); + + let restoreSkillEnv: (() => void) | undefined; + process.chdir(effectiveWorkspace); + try { + const shouldLoadSkillEntries = + !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills; + const skillEntries = shouldLoadSkillEntries + ? loadWorkspaceSkillEntries(effectiveWorkspace) + : []; + restoreSkillEnv = params.skillsSnapshot + ? applySkillEnvOverridesFromSnapshot({ + snapshot: params.skillsSnapshot, + config: params.config, + }) + : applySkillEnvOverrides({ + skills: skillEntries ?? [], + config: params.config, + }); + const skillsPrompt = resolveSkillsPromptForRun({ + skillsSnapshot: params.skillsSnapshot, + entries: shouldLoadSkillEntries ? skillEntries : undefined, + config: params.config, + workspaceDir: effectiveWorkspace, + }); + + const bootstrapFiles = filterBootstrapFilesForSession( + await loadWorkspaceBootstrapFiles(effectiveWorkspace), + params.sessionKey ?? params.sessionId, + ); + const sessionLabel = params.sessionKey ?? params.sessionId; + const contextFiles = buildBootstrapContextFiles(bootstrapFiles, { + maxChars: resolveBootstrapMaxChars(params.config), + warn: (message) => + log.warn(`${message} (sessionKey=${sessionLabel})`), + }); + const runAbortController = new AbortController(); + const tools = createClawdbotCodingTools({ + exec: { + ...resolveExecToolDefaults(params.config), + elevated: params.bashElevated, + }, + sandbox, + messageProvider: params.messageChannel ?? params.messageProvider, + agentAccountId: params.agentAccountId, + sessionKey: params.sessionKey ?? 
params.sessionId, + agentDir, + workspaceDir: effectiveWorkspace, + config: params.config, + abortSignal: runAbortController.signal, + modelProvider: model.provider, + modelId, + modelAuthMode: resolveModelAuthMode(model.provider, params.config), + // No currentChannelId/currentThreadTs for compaction - not in message context + }); + logToolSchemasForGoogle({ tools, provider }); + const machineName = await getMachineDisplayName(); + const runtimeChannel = normalizeMessageChannel( + params.messageChannel ?? params.messageProvider, + ); + const runtimeCapabilities = runtimeChannel + ? (resolveChannelCapabilities({ + cfg: params.config, + channel: runtimeChannel, + accountId: params.agentAccountId, + }) ?? []) + : undefined; + const runtimeInfo = { + host: machineName, + os: `${os.type()} ${os.release()}`, + arch: os.arch(), + node: process.version, + model: `${provider}/${modelId}`, + channel: runtimeChannel, + capabilities: runtimeCapabilities, + }; + const sandboxInfo = buildEmbeddedSandboxInfo( + sandbox, + params.bashElevated, + ); + const reasoningTagHint = isReasoningTagProvider(provider); + const userTimezone = resolveUserTimezone( + params.config?.agents?.defaults?.userTimezone, + ); + const userTime = formatUserTime(new Date(), userTimezone); + // Only include heartbeat prompt for the default agent + const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({ + sessionKey: params.sessionKey, + config: params.config, + }); + const isDefaultAgent = sessionAgentId === defaultAgentId; + const appendPrompt = buildEmbeddedSystemPrompt({ + workspaceDir: effectiveWorkspace, + defaultThinkLevel: params.thinkLevel, + reasoningLevel: params.reasoningLevel ?? "off", + extraSystemPrompt: params.extraSystemPrompt, + ownerNumbers: params.ownerNumbers, + reasoningTagHint, + heartbeatPrompt: isDefaultAgent + ? 
resolveHeartbeatPrompt( + params.config?.agents?.defaults?.heartbeat?.prompt, + ) + : undefined, + skillsPrompt, + runtimeInfo, + sandboxInfo, + tools, + modelAliasLines: buildModelAliasLines(params.config), + userTimezone, + userTime, + contextFiles, + }); + const systemPrompt = createSystemPromptOverride(appendPrompt); + + const sessionLock = await acquireSessionWriteLock({ + sessionFile: params.sessionFile, + }); + try { + // Pre-warm session file to bring it into OS page cache + await prewarmSessionFile(params.sessionFile); + const sessionManager = guardSessionManager( + SessionManager.open(params.sessionFile), + ); + trackSessionManagerAccess(params.sessionFile); + const settingsManager = SettingsManager.create( + effectiveWorkspace, + agentDir, + ); + ensurePiCompactionReserveTokens({ + settingsManager, + minReserveTokens: resolveCompactionReserveTokensFloor( + params.config, + ), + }); + const additionalExtensionPaths = buildEmbeddedExtensionPaths({ + cfg: params.config, + sessionManager, + provider, + modelId, + model, + }); + + const { builtInTools, customTools } = splitSdkTools({ + tools, + sandboxEnabled: !!sandbox?.enabled, + }); + + let session: Awaited< + ReturnType + >["session"]; + ({ session } = await createAgentSession({ + cwd: resolvedWorkspace, + agentDir, + authStorage, + modelRegistry, + model, + thinkingLevel: mapThinkingLevel(params.thinkLevel), + systemPrompt, + tools: builtInTools, + customTools, + sessionManager, + settingsManager, + skills: [], + contextFiles: [], + additionalExtensionPaths, + })); + + // Wire up config-driven model params (e.g., temperature/maxTokens) + applyExtraParamsToAgent( + session.agent, + params.config, + provider, + modelId, + params.thinkLevel, + ); + + try { + const prior = await sanitizeSessionHistory({ + messages: session.messages, + modelApi: model.api, + provider, + modelId, + sessionManager, + sessionId: params.sessionId, + }); + // Validate turn ordering for both Gemini (consecutive assistant) and 
Anthropic (consecutive user) + const validatedGemini = validateGeminiTurns(prior); + const validated = validateAnthropicTurns(validatedGemini); + const limited = limitHistoryTurns( + validated, + getDmHistoryLimitFromSessionKey(params.sessionKey, params.config), + ); + if (limited.length > 0) { + session.agent.replaceMessages(limited); + } + const result = await session.compact(params.customInstructions); + return { + ok: true, + compacted: true, + result: { + summary: result.summary, + firstKeptEntryId: result.firstKeptEntryId, + tokensBefore: result.tokensBefore, + details: result.details, + }, + }; + } finally { + sessionManager.flushPendingToolResults?.(); + session.dispose(); + } + } finally { + await sessionLock.release(); + } + } catch (err) { + return { + ok: false, + compacted: false, + reason: describeUnknownError(err), + }; + } finally { + restoreSkillEnv?.(); + process.chdir(prevCwd); + } + }), + ); +} + +export async function runEmbeddedPiAgent(params: { + sessionId: string; + sessionKey?: string; + messageChannel?: string; + messageProvider?: string; + agentAccountId?: string; + /** Current channel ID for auto-threading (Slack). */ + currentChannelId?: string; + /** Current thread timestamp for auto-threading (Slack). */ + currentThreadTs?: string; + /** Reply-to mode for Slack auto-threading. */ + replyToMode?: "off" | "first" | "all"; + /** Mutable ref to track if a reply was sent (for "first" mode). */ + hasRepliedRef?: { value: boolean }; + sessionFile: string; + workspaceDir: string; + agentDir?: string; + config?: ClawdbotConfig; + skillsSnapshot?: SkillSnapshot; + prompt: string; + /** Optional image attachments for multimodal messages. 
*/ + images?: ImageContent[]; + provider?: string; + model?: string; + authProfileId?: string; + thinkLevel?: ThinkLevel; + verboseLevel?: VerboseLevel; + reasoningLevel?: ReasoningLevel; + bashElevated?: ExecElevatedDefaults; + timeoutMs: number; + runId: string; + abortSignal?: AbortSignal; + shouldEmitToolResult?: () => boolean; + onPartialReply?: (payload: { + text?: string; + mediaUrls?: string[]; + }) => void | Promise; + onAssistantMessageStart?: () => void | Promise; + onBlockReply?: (payload: { + text?: string; + mediaUrls?: string[]; + audioAsVoice?: boolean; + }) => void | Promise; + /** Flush pending block replies (e.g., before tool execution to preserve message boundaries). */ + onBlockReplyFlush?: () => void | Promise; + blockReplyBreak?: "text_end" | "message_end"; + blockReplyChunking?: BlockReplyChunking; + onReasoningStream?: (payload: { + text?: string; + mediaUrls?: string[]; + }) => void | Promise; + onToolResult?: (payload: { + text?: string; + mediaUrls?: string[]; + }) => void | Promise; + onAgentEvent?: (evt: { + stream: string; + data: Record; + }) => void; + lane?: string; + enqueue?: typeof enqueueCommand; + extraSystemPrompt?: string; + ownerNumbers?: string[]; + enforceFinalTag?: boolean; +}): Promise { + const sessionLane = resolveSessionLane( + params.sessionKey?.trim() || params.sessionId, + ); + const globalLane = resolveGlobalLane(params.lane); + const enqueueGlobal = + params.enqueue ?? + ((task, opts) => enqueueCommandInLane(globalLane, task, opts)); + const runAbortController = new AbortController(); + return enqueueCommandInLane(sessionLane, () => + enqueueGlobal(async () => { + const started = Date.now(); + const resolvedWorkspace = resolveUserPath(params.workspaceDir); + const prevCwd = process.cwd(); + + const provider = + (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER; + const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL; + const agentDir = params.agentDir ?? 
resolveClawdbotAgentDir(); + await ensureClawdbotModelsJson(params.config, agentDir); + const { model, error, authStorage, modelRegistry } = resolveModel( + provider, + modelId, + agentDir, + params.config, + ); + if (!model) { + throw new Error(error ?? `Unknown model: ${provider}/${modelId}`); + } + + const ctxInfo = resolveContextWindowInfo({ + cfg: params.config, + provider, + modelId, + modelContextWindow: model.contextWindow, + defaultTokens: DEFAULT_CONTEXT_TOKENS, + }); + const ctxGuard = evaluateContextWindowGuard({ + info: ctxInfo, + warnBelowTokens: CONTEXT_WINDOW_WARN_BELOW_TOKENS, + hardMinTokens: CONTEXT_WINDOW_HARD_MIN_TOKENS, + }); + if (ctxGuard.shouldWarn) { + log.warn( + `low context window: ${provider}/${modelId} ctx=${ctxGuard.tokens} (warn<${CONTEXT_WINDOW_WARN_BELOW_TOKENS}) source=${ctxGuard.source}`, + ); + } + if (ctxGuard.shouldBlock) { + log.error( + `blocked model (context window too small): ${provider}/${modelId} ctx=${ctxGuard.tokens} (min=${CONTEXT_WINDOW_HARD_MIN_TOKENS}) source=${ctxGuard.source}`, + ); + throw new FailoverError( + `Model context window too small (${ctxGuard.tokens} tokens). Minimum is ${CONTEXT_WINDOW_HARD_MIN_TOKENS}.`, + { reason: "unknown", provider, model: modelId }, + ); + } + const authStore = ensureAuthProfileStore(agentDir); + const explicitProfileId = params.authProfileId?.trim(); + const profileOrder = resolveAuthProfileOrder({ + cfg: params.config, + store: authStore, + provider, + preferredProfile: explicitProfileId, + }); + if (explicitProfileId && !profileOrder.includes(explicitProfileId)) { + throw new Error( + `Auth profile "${explicitProfileId}" is not configured for ${provider}.`, + ); + } + const profileCandidates = + profileOrder.length > 0 ? profileOrder : [undefined]; + let profileIndex = 0; + const initialThinkLevel = params.thinkLevel ?? 
"off"; + let thinkLevel = initialThinkLevel; + const attemptedThinking = new Set(); + let apiKeyInfo: ApiKeyInfo | null = null; + let lastProfileId: string | undefined; + + const resolveApiKeyForCandidate = async (candidate?: string) => { + return getApiKeyForModel({ + model, + cfg: params.config, + profileId: candidate, + store: authStore, + }); + }; + + const applyApiKeyInfo = async (candidate?: string): Promise => { + apiKeyInfo = await resolveApiKeyForCandidate(candidate); + + if (model.provider === "github-copilot") { + const { resolveCopilotApiToken } = await import( + "../providers/github-copilot-token.js" + ); + const copilotToken = await resolveCopilotApiToken({ + githubToken: apiKeyInfo.apiKey, + }); + authStorage.setRuntimeApiKey(model.provider, copilotToken.token); + } else { + authStorage.setRuntimeApiKey(model.provider, apiKeyInfo.apiKey); + } + + lastProfileId = apiKeyInfo.profileId; + }; + + const advanceAuthProfile = async (): Promise => { + let nextIndex = profileIndex + 1; + while (nextIndex < profileCandidates.length) { + const candidate = profileCandidates[nextIndex]; + try { + await applyApiKeyInfo(candidate); + profileIndex = nextIndex; + thinkLevel = initialThinkLevel; + attemptedThinking.clear(); + return true; + } catch (err) { + if (candidate && candidate === explicitProfileId) throw err; + nextIndex += 1; + } + } + return false; + }; + + try { + await applyApiKeyInfo(profileCandidates[profileIndex]); + } catch (err) { + if (profileCandidates[profileIndex] === explicitProfileId) throw err; + const advanced = await advanceAuthProfile(); + if (!advanced) throw err; + } + + while (true) { + const thinkingLevel = mapThinkingLevel(thinkLevel); + attemptedThinking.add(thinkLevel); + + log.debug( + `embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${provider} model=${modelId} thinking=${thinkLevel} messageChannel=${params.messageChannel ?? params.messageProvider ?? 
"unknown"}`, + ); + + await fs.mkdir(resolvedWorkspace, { recursive: true }); + const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId; + const sandbox = await resolveSandboxContext({ + config: params.config, + sessionKey: sandboxSessionKey, + workspaceDir: resolvedWorkspace, + }); + const effectiveWorkspace = sandbox?.enabled + ? sandbox.workspaceAccess === "rw" + ? resolvedWorkspace + : sandbox.workspaceDir + : resolvedWorkspace; + await fs.mkdir(effectiveWorkspace, { recursive: true }); + + let restoreSkillEnv: (() => void) | undefined; + process.chdir(effectiveWorkspace); + try { + const shouldLoadSkillEntries = + !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills; + const skillEntries = shouldLoadSkillEntries + ? loadWorkspaceSkillEntries(effectiveWorkspace) + : []; + restoreSkillEnv = params.skillsSnapshot + ? applySkillEnvOverridesFromSnapshot({ + snapshot: params.skillsSnapshot, + config: params.config, + }) + : applySkillEnvOverrides({ + skills: skillEntries ?? [], + config: params.config, + }); + const skillsPrompt = resolveSkillsPromptForRun({ + skillsSnapshot: params.skillsSnapshot, + entries: shouldLoadSkillEntries ? skillEntries : undefined, + config: params.config, + workspaceDir: effectiveWorkspace, + }); + + const bootstrapFiles = filterBootstrapFilesForSession( + await loadWorkspaceBootstrapFiles(effectiveWorkspace), + params.sessionKey ?? params.sessionId, + ); + const sessionLabel = params.sessionKey ?? params.sessionId; + const contextFiles = buildBootstrapContextFiles(bootstrapFiles, { + maxChars: resolveBootstrapMaxChars(params.config), + warn: (message) => + log.warn(`${message} (sessionKey=${sessionLabel})`), + }); + // Tool schemas must be provider-compatible (OpenAI requires top-level `type: "object"`). + // `createClawdbotCodingTools()` normalizes schemas so the session can pass them through unchanged. 
+ const tools = createClawdbotCodingTools({ + exec: { + ...resolveExecToolDefaults(params.config), + elevated: params.bashElevated, + }, + sandbox, + messageProvider: params.messageChannel ?? params.messageProvider, + agentAccountId: params.agentAccountId, + sessionKey: params.sessionKey ?? params.sessionId, + agentDir, + workspaceDir: effectiveWorkspace, + config: params.config, + abortSignal: runAbortController.signal, + modelProvider: model.provider, + modelId, + modelAuthMode: resolveModelAuthMode(model.provider, params.config), + currentChannelId: params.currentChannelId, + currentThreadTs: params.currentThreadTs, + replyToMode: params.replyToMode, + hasRepliedRef: params.hasRepliedRef, + }); + logToolSchemasForGoogle({ tools, provider }); + const machineName = await getMachineDisplayName(); + const runtimeInfo = { + host: machineName, + os: `${os.type()} ${os.release()}`, + arch: os.arch(), + node: process.version, + model: `${provider}/${modelId}`, + }; + const sandboxInfo = buildEmbeddedSandboxInfo( + sandbox, + params.bashElevated, + ); + const reasoningTagHint = isReasoningTagProvider(provider); + const userTimezone = resolveUserTimezone( + params.config?.agents?.defaults?.userTimezone, + ); + const userTime = formatUserTime(new Date(), userTimezone); + // Only include heartbeat prompt for the default agent + const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({ + sessionKey: params.sessionKey, + config: params.config, + }); + const isDefaultAgent = sessionAgentId === defaultAgentId; + const appendPrompt = buildEmbeddedSystemPrompt({ + workspaceDir: effectiveWorkspace, + defaultThinkLevel: thinkLevel, + reasoningLevel: params.reasoningLevel ?? "off", + extraSystemPrompt: params.extraSystemPrompt, + ownerNumbers: params.ownerNumbers, + reasoningTagHint, + heartbeatPrompt: isDefaultAgent + ? 
resolveHeartbeatPrompt( + params.config?.agents?.defaults?.heartbeat?.prompt, + ) + : undefined, + skillsPrompt, + runtimeInfo, + sandboxInfo, + tools, + modelAliasLines: buildModelAliasLines(params.config), + userTimezone, + userTime, + contextFiles, + }); + const systemPrompt = createSystemPromptOverride(appendPrompt); + + const sessionLock = await acquireSessionWriteLock({ + sessionFile: params.sessionFile, + }); + // Pre-warm session file to bring it into OS page cache + await prewarmSessionFile(params.sessionFile); + const sessionManager = guardSessionManager( + SessionManager.open(params.sessionFile), + ); + trackSessionManagerAccess(params.sessionFile); + const settingsManager = SettingsManager.create( + effectiveWorkspace, + agentDir, + ); + ensurePiCompactionReserveTokens({ + settingsManager, + minReserveTokens: resolveCompactionReserveTokensFloor( + params.config, + ), + }); + const additionalExtensionPaths = buildEmbeddedExtensionPaths({ + cfg: params.config, + sessionManager, + provider, + modelId, + model, + }); + + const { builtInTools, customTools } = splitSdkTools({ + tools, + sandboxEnabled: !!sandbox?.enabled, + }); + + let session: Awaited< + ReturnType + >["session"]; + ({ session } = await createAgentSession({ + cwd: resolvedWorkspace, + agentDir, + authStorage, + modelRegistry, + model, + thinkingLevel, + systemPrompt, + // Built-in tools recognized by pi-coding-agent SDK + tools: builtInTools, + // Custom clawdbot tools (browser, canvas, nodes, cron, etc.) 
+ customTools, + sessionManager, + settingsManager, + skills: [], + contextFiles: [], + additionalExtensionPaths, + })); + + // Wire up config-driven model params (e.g., temperature/maxTokens) + applyExtraParamsToAgent( + session.agent, + params.config, + provider, + modelId, + params.thinkLevel, + ); + + try { + const prior = await sanitizeSessionHistory({ + messages: session.messages, + modelApi: model.api, + provider, + modelId, + sessionManager, + sessionId: params.sessionId, + }); + // Validate turn ordering for both Gemini (consecutive assistant) and Anthropic (consecutive user) + const validatedGemini = validateGeminiTurns(prior); + const validated = validateAnthropicTurns(validatedGemini); + const limited = limitHistoryTurns( + validated, + getDmHistoryLimitFromSessionKey(params.sessionKey, params.config), + ); + if (limited.length > 0) { + session.agent.replaceMessages(limited); + } + } catch (err) { + sessionManager.flushPendingToolResults?.(); + session.dispose(); + await sessionLock.release(); + throw err; + } + let aborted = Boolean(params.abortSignal?.aborted); + let timedOut = false; + const abortRun = (isTimeout = false) => { + aborted = true; + if (isTimeout) timedOut = true; + runAbortController.abort(); + void session.abort(); + }; + let subscription: ReturnType; + try { + subscription = subscribeEmbeddedPiSession({ + session, + runId: params.runId, + verboseLevel: params.verboseLevel, + reasoningMode: params.reasoningLevel ?? 
"off", + shouldEmitToolResult: params.shouldEmitToolResult, + onToolResult: params.onToolResult, + onReasoningStream: params.onReasoningStream, + onBlockReply: params.onBlockReply, + onBlockReplyFlush: params.onBlockReplyFlush, + blockReplyBreak: params.blockReplyBreak, + blockReplyChunking: params.blockReplyChunking, + onPartialReply: params.onPartialReply, + onAssistantMessageStart: params.onAssistantMessageStart, + onAgentEvent: params.onAgentEvent, + enforceFinalTag: params.enforceFinalTag, + }); + } catch (err) { + sessionManager.flushPendingToolResults?.(); + session.dispose(); + await sessionLock.release(); + throw err; + } + const { + assistantTexts, + toolMetas, + unsubscribe, + waitForCompactionRetry, + getMessagingToolSentTexts, + getMessagingToolSentTargets, + didSendViaMessagingTool, + } = subscription; + + const queueHandle: EmbeddedPiQueueHandle = { + queueMessage: async (text: string) => { + await session.steer(text); + }, + isStreaming: () => session.isStreaming, + isCompacting: () => subscription.isCompacting(), + abort: abortRun, + }; + ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle); + + let abortWarnTimer: NodeJS.Timeout | undefined; + const abortTimer = setTimeout( + () => { + log.warn( + `embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`, + ); + abortRun(true); + if (!abortWarnTimer) { + abortWarnTimer = setTimeout(() => { + if (!session.isStreaming) return; + log.warn( + `embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`, + ); + }, 10_000); + } + }, + Math.max(1, params.timeoutMs), + ); + + let messagesSnapshot: AgentMessage[] = []; + let sessionIdUsed = session.sessionId; + const onAbort = () => { + abortRun(); + }; + if (params.abortSignal) { + if (params.abortSignal.aborted) { + onAbort(); + } else { + params.abortSignal.addEventListener("abort", onAbort, { + once: true, + }); + } + } + let promptError: unknown = null; + try { + const 
promptStartedAt = Date.now(); + log.debug( + `embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`, + ); + try { + await session.prompt(params.prompt, { + images: params.images, + }); + } catch (err) { + promptError = err; + } finally { + log.debug( + `embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`, + ); + } + try { + await waitForCompactionRetry(); + } catch (err) { + // Capture AbortError from waitForCompactionRetry to enable fallback/rotation. + if (isAbortError(err)) { + if (!promptError) promptError = err; + } else { + throw err; + } + } + messagesSnapshot = session.messages.slice(); + sessionIdUsed = session.sessionId; + } finally { + clearTimeout(abortTimer); + if (abortWarnTimer) { + clearTimeout(abortWarnTimer); + abortWarnTimer = undefined; + } + unsubscribe(); + if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) { + ACTIVE_EMBEDDED_RUNS.delete(params.sessionId); + notifyEmbeddedRunEnded(params.sessionId); + } + sessionManager.flushPendingToolResults?.(); + session.dispose(); + await sessionLock.release(); + params.abortSignal?.removeEventListener?.("abort", onAbort); + } + if (promptError && !aborted) { + const errorText = describeUnknownError(promptError); + if (isContextOverflowError(errorText)) { + const kind = isCompactionFailureError(errorText) + ? "compaction_failure" + : "context_overflow"; + return { + payloads: [ + { + text: + "Context overflow: prompt too large for the model. 
" + + "Try again with less input or a larger-context model.", + isError: true, + }, + ], + meta: { + durationMs: Date.now() - started, + agentMeta: { + sessionId: sessionIdUsed, + provider, + model: model.id, + }, + error: { kind, message: errorText }, + }, + }; + } + const promptFailoverReason = classifyFailoverReason(errorText); + if ( + promptFailoverReason && + promptFailoverReason !== "timeout" && + lastProfileId + ) { + await markAuthProfileFailure({ + store: authStore, + profileId: lastProfileId, + reason: promptFailoverReason, + cfg: params.config, + agentDir: params.agentDir, + }); + } + if ( + isFailoverErrorMessage(errorText) && + promptFailoverReason !== "timeout" && + (await advanceAuthProfile()) + ) { + continue; + } + const fallbackThinking = pickFallbackThinkingLevel({ + message: errorText, + attempted: attemptedThinking, + }); + if (fallbackThinking) { + log.warn( + `unsupported thinking level for ${provider}/${modelId}; retrying with ${fallbackThinking}`, + ); + thinkLevel = fallbackThinking; + continue; + } + throw promptError; + } + + const lastAssistant = messagesSnapshot + .slice() + .reverse() + .find((m) => (m as AgentMessage)?.role === "assistant") as + | AssistantMessage + | undefined; + + const fallbackThinking = pickFallbackThinkingLevel({ + message: lastAssistant?.errorMessage, + attempted: attemptedThinking, + }); + if (fallbackThinking && !aborted) { + log.warn( + `unsupported thinking level for ${provider}/${modelId}; retrying with ${fallbackThinking}`, + ); + thinkLevel = fallbackThinking; + continue; + } + + const fallbackConfigured = + (params.config?.agents?.defaults?.model?.fallbacks?.length ?? 0) > + 0; + const authFailure = isAuthAssistantError(lastAssistant); + const rateLimitFailure = isRateLimitAssistantError(lastAssistant); + const failoverFailure = isFailoverAssistantError(lastAssistant); + const assistantFailoverReason = classifyFailoverReason( + lastAssistant?.errorMessage ?? 
"", + ); + const cloudCodeAssistFormatError = lastAssistant?.errorMessage + ? isCloudCodeAssistFormatError(lastAssistant.errorMessage) + : false; + + // Treat timeout as potential rate limit (Antigravity hangs on rate limit) + const shouldRotate = (!aborted && failoverFailure) || timedOut; + + if (shouldRotate) { + // Mark current profile for cooldown before rotating + if (lastProfileId) { + const reason = + timedOut || assistantFailoverReason === "timeout" + ? "timeout" + : (assistantFailoverReason ?? "unknown"); + await markAuthProfileFailure({ + store: authStore, + profileId: lastProfileId, + reason, + cfg: params.config, + agentDir: params.agentDir, + }); + if (timedOut) { + log.warn( + `Profile ${lastProfileId} timed out (possible rate limit). Trying next account...`, + ); + } + if (cloudCodeAssistFormatError) { + log.warn( + `Profile ${lastProfileId} hit Cloud Code Assist format error. Tool calls will be sanitized on retry.`, + ); + } + } + const rotated = await advanceAuthProfile(); + if (rotated) { + continue; + } + if (fallbackConfigured) { + const message = + lastAssistant?.errorMessage?.trim() || + (lastAssistant + ? formatAssistantErrorText(lastAssistant, { + cfg: params.config, + sessionKey: params.sessionKey ?? params.sessionId, + }) + : "") || + (timedOut + ? "LLM request timed out." + : rateLimitFailure + ? "LLM request rate limited." + : authFailure + ? "LLM request unauthorized." + : "LLM request failed."); + const status = + resolveFailoverStatus(assistantFailoverReason ?? "unknown") ?? + (isTimeoutErrorMessage(message) ? 408 : undefined); + throw new FailoverError(message, { + reason: assistantFailoverReason ?? "unknown", + provider, + model: modelId, + profileId: lastProfileId, + status, + }); + } + } + + const usage = normalizeUsage(lastAssistant?.usage as UsageLike); + const agentMeta: EmbeddedPiAgentMeta = { + sessionId: sessionIdUsed, + provider: lastAssistant?.provider ?? provider, + model: lastAssistant?.model ?? 
model.id, + usage, + }; + + const replyItems: Array<{ + text: string; + media?: string[]; + isError?: boolean; + audioAsVoice?: boolean; + replyToId?: string; + replyToTag?: boolean; + replyToCurrent?: boolean; + }> = []; + + const errorText = lastAssistant + ? formatAssistantErrorText(lastAssistant, { + cfg: params.config, + sessionKey: params.sessionKey ?? params.sessionId, + }) + : undefined; + + if (errorText) replyItems.push({ text: errorText, isError: true }); + + const inlineToolResults = + params.verboseLevel === "on" && + !params.onPartialReply && + !params.onToolResult && + toolMetas.length > 0; + if (inlineToolResults) { + for (const { toolName, meta } of toolMetas) { + const agg = formatToolAggregate(toolName, meta ? [meta] : []); + const { + text: cleanedText, + mediaUrls, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + } = parseReplyDirectives(agg); + if (cleanedText) + replyItems.push({ + text: cleanedText, + media: mediaUrls, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + }); + } + } + + const reasoningText = + lastAssistant && params.reasoningLevel === "on" + ? formatReasoningMessage(extractAssistantThinking(lastAssistant)) + : ""; + if (reasoningText) replyItems.push({ text: reasoningText }); + + const fallbackAnswerText = lastAssistant + ? extractAssistantText(lastAssistant) + : ""; + const answerTexts = assistantTexts.length + ? assistantTexts + : fallbackAnswerText + ? 
[fallbackAnswerText] + : []; + for (const text of answerTexts) { + const { + text: cleanedText, + mediaUrls, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + } = parseReplyDirectives(text); + if ( + !cleanedText && + (!mediaUrls || mediaUrls.length === 0) && + !audioAsVoice + ) + continue; + replyItems.push({ + text: cleanedText, + media: mediaUrls, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + }); + } + + // Check if any replyItem has audioAsVoice tag - if so, apply to all media payloads + const hasAudioAsVoiceTag = replyItems.some( + (item) => item.audioAsVoice, + ); + const payloads = replyItems + .map((item) => ({ + text: item.text?.trim() ? item.text.trim() : undefined, + mediaUrls: item.media?.length ? item.media : undefined, + mediaUrl: item.media?.[0], + isError: item.isError, + replyToId: item.replyToId, + replyToTag: item.replyToTag, + replyToCurrent: item.replyToCurrent, + // Apply audioAsVoice to media payloads if tag was found anywhere in response + audioAsVoice: + item.audioAsVoice || (hasAudioAsVoiceTag && item.media?.length), + })) + .filter( + (p) => + p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0), + ); + + log.debug( + `embedded run done: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - started} aborted=${aborted}`, + ); + if (lastProfileId) { + await markAuthProfileGood({ + store: authStore, + provider, + profileId: lastProfileId, + }); + // Track usage for round-robin rotation + await markAuthProfileUsed({ + store: authStore, + profileId: lastProfileId, + }); + } + return { + payloads: payloads.length ? 
payloads : undefined, + meta: { + durationMs: Date.now() - started, + agentMeta, + aborted, + }, + didSendViaMessagingTool: didSendViaMessagingTool(), + messagingToolSentTexts: getMessagingToolSentTexts(), + messagingToolSentTargets: getMessagingToolSentTargets(), + }; + } finally { + restoreSkillEnv?.(); + process.chdir(prevCwd); + } + } + }), + ); +} diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index b9ab4a034..6d9e11461 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -308,6 +308,7 @@ export async function compactEmbeddedPiSession(params: { messages: session.messages, modelApi: model.api, modelId, + provider, sessionManager, sessionId: params.sessionId, }); diff --git a/src/agents/pi-embedded-runner/google.ts b/src/agents/pi-embedded-runner/google.ts index d8e170a08..31ee19a59 100644 --- a/src/agents/pi-embedded-runner/google.ts +++ b/src/agents/pi-embedded-runner/google.ts @@ -185,17 +185,26 @@ export async function sanitizeSessionHistory(params: { messages: AgentMessage[]; modelApi?: string | null; modelId?: string; + provider?: string; sessionManager: SessionManager; sessionId: string; }): Promise { const isAntigravityClaudeModel = isAntigravityClaude(params.modelApi, params.modelId); + const provider = (params.provider ?? "").toLowerCase(); + const modelId = (params.modelId ?? "").toLowerCase(); + const isOpenRouterGemini = + (provider === "openrouter" || provider === "opencode") && modelId.includes("gemini"); + const isGeminiLike = isGoogleModelApi(params.modelApi) || isOpenRouterGemini; const sanitizedImages = await sanitizeSessionMessagesImages(params.messages, "session:history", { sanitizeToolCallIds: shouldSanitizeToolCallIds(params.modelApi), enforceToolCallLast: params.modelApi === "anthropic-messages", preserveSignatures: params.modelApi === "google-antigravity" && isAntigravityClaudeModel, + sanitizeThoughtSignatures: isOpenRouterGemini + ? 
{ allowBase64Only: true, includeCamelCase: true } + : undefined, }); const repairedTools = sanitizeToolUseResultPairing(sanitizedImages); - const shouldDowngradeGemini = isGoogleModelApi(params.modelApi) && !isAntigravityClaudeModel; + const shouldDowngradeGemini = isGeminiLike && !isAntigravityClaudeModel; // Gemini rejects unsigned thinking blocks; downgrade them before send to avoid INVALID_ARGUMENT. const downgradedThinking = shouldDowngradeGemini ? downgradeGeminiThinkingBlocks(repairedTools) diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 065132828..40c11de2f 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -323,6 +323,7 @@ export async function runEmbeddedAttempt( messages: activeSession.messages, modelApi: params.model.api, modelId: params.modelId, + provider: params.provider, sessionManager, sessionId: params.sessionId, });