diff --git a/src/agents/pi-embedded-helpers.test.ts b/src/agents/pi-embedded-helpers.test.ts index 5bc3f1dcb..c738bf4af 100644 --- a/src/agents/pi-embedded-helpers.test.ts +++ b/src/agents/pi-embedded-helpers.test.ts @@ -15,6 +15,7 @@ import { sanitizeGoogleTurnOrdering, sanitizeSessionMessagesImages, sanitizeToolCallId, + validateAnthropicTurns, validateGeminiTurns, } from "./pi-embedded-helpers.js"; import { @@ -171,6 +172,124 @@ describe("validateGeminiTurns", () => { }); }); +describe("validateAnthropicTurns", () => { + it("should return empty array unchanged", () => { + const result = validateAnthropicTurns([]); + expect(result).toEqual([]); + }); + + it("should return single message unchanged", () => { + const msgs: AgentMessage[] = [ + { + role: "user", + content: [{ type: "text", text: "Hello" }], + }, + ]; + const result = validateAnthropicTurns(msgs); + expect(result).toEqual(msgs); + }); + + it("should return alternating user/assistant unchanged", () => { + const msgs: AgentMessage[] = [ + { role: "user", content: [{ type: "text", text: "Question" }] }, + { + role: "assistant", + content: [{ type: "text", text: "Answer" }], + }, + { role: "user", content: [{ type: "text", text: "Follow-up" }] }, + ]; + const result = validateAnthropicTurns(msgs); + expect(result).toEqual(msgs); + }); + + it("should merge consecutive user messages", () => { + const msgs: AgentMessage[] = [ + { + role: "user", + content: [{ type: "text", text: "First message" }], + timestamp: 1000, + }, + { + role: "user", + content: [{ type: "text", text: "Second message" }], + timestamp: 2000, + }, + ]; + + const result = validateAnthropicTurns(msgs); + + expect(result).toHaveLength(1); + expect(result[0].role).toBe("user"); + const content = (result[0] as { content: unknown[] }).content; + expect(content).toHaveLength(2); + expect(content[0]).toEqual({ type: "text", text: "First message" }); + expect(content[1]).toEqual({ type: "text", text: "Second message" }); + // Should take timestamp from the newer message + expect((result[0] as { timestamp?: number }).timestamp).toBe(2000); + }); + + it("should merge three consecutive user messages", () => { + const msgs: AgentMessage[] = [ + { role: "user", content: [{ type: "text", text: "One" }] }, + { role: "user", content: [{ type: "text", text: "Two" }] }, + { role: "user", content: [{ type: "text", text: "Three" }] }, + ]; + + const result = validateAnthropicTurns(msgs); + + expect(result).toHaveLength(1); + const content = (result[0] as { content: unknown[] }).content; + expect(content).toHaveLength(3); + }); + + it("should not merge consecutive assistant messages", () => { + const msgs: AgentMessage[] = [ + { role: "user", content: [{ type: "text", text: "Question" }] }, + { + role: "assistant", + content: [{ type: "text", text: "Answer 1" }], + }, + { + role: "assistant", + content: [{ type: "text", text: "Answer 2" }], + }, + ]; + + const result = validateAnthropicTurns(msgs); + + // validateAnthropicTurns only merges user messages, not assistant + expect(result).toHaveLength(3); + }); + + it("should handle mixed scenario with steering messages", () => { + // Simulates: user asks -> assistant errors -> steering user message injected + const msgs: AgentMessage[] = [ + { role: "user", content: [{ type: "text", text: "Original question" }] }, + { + role: "assistant", + content: [], + stopReason: "error", + errorMessage: "Overloaded", + }, + { + role: "user", + content: [{ type: "text", text: "Steering: try again" }], + }, + { role: "user", content: [{ type: "text", text: "Another follow-up" }] }, + ]; + + const result = validateAnthropicTurns(msgs); + + // The two consecutive user messages at the end should be merged + expect(result).toHaveLength(3); + expect(result[0].role).toBe("user"); + expect(result[1].role).toBe("assistant"); + expect(result[2].role).toBe("user"); + const lastContent = (result[2] as { content: unknown[] }).content; + expect(lastContent).toHaveLength(2); + }); +}); + describe("buildBootstrapContextFiles", () => { it("keeps missing markers", () => { const files = [makeFile({ missing: true, content: undefined })]; diff --git a/src/agents/pi-embedded-helpers.ts b/src/agents/pi-embedded-helpers.ts index 793c68257..d1910303b 100644 --- a/src/agents/pi-embedded-helpers.ts +++ b/src/agents/pi-embedded-helpers.ts @@ -297,6 +297,16 @@ export function formatAssistantErrorText( ); } + // Check for role ordering errors (Anthropic 400 "Incorrect role information") + // This typically happens when consecutive user messages are sent without + // an assistant response between them, often due to steering/queueing timing. + if (/incorrect role information|roles must alternate/i.test(raw)) { + return ( + "Message ordering conflict - please try again. " + + "If this persists, use /new to start a fresh session." + ); + } + const invalidRequest = raw.match( /"type":"invalid_request_error".*?"message":"([^"]+)"/, ); @@ -553,6 +563,77 @@ export function validateGeminiTurns(messages: AgentMessage[]): AgentMessage[] { return result; } +/** + * Validates and fixes conversation turn sequences for Anthropic API. + * Anthropic requires strict alternating user→assistant pattern. + * This function: + * 1. Detects consecutive user messages + * 2. Merges consecutive user messages together + * 3. Preserves timestamps from the later message + * + * This prevents the "400 Incorrect role information" error that occurs + * when steering messages are injected during streaming and create + * consecutive user messages. + */ +export function validateAnthropicTurns( + messages: AgentMessage[], +): AgentMessage[] { + if (!Array.isArray(messages) || messages.length === 0) { + return messages; + } + + const result: AgentMessage[] = []; + let lastRole: string | undefined; + + for (const msg of messages) { + if (!msg || typeof msg !== "object") { + result.push(msg); + continue; + } + + const msgRole = (msg as { role?: unknown }).role as string | undefined; + if (!msgRole) { + result.push(msg); + continue; + } + + // Check if this message has the same role as the last one + if (msgRole === lastRole && lastRole === "user") { + // Merge consecutive user messages + const lastMsg = result[result.length - 1]; + const currentMsg = msg as Extract; + + if (lastMsg && typeof lastMsg === "object") { + const lastUser = lastMsg as Extract; + + // Merge content blocks + const mergedContent = [ + ...(Array.isArray(lastUser.content) ? lastUser.content : []), + ...(Array.isArray(currentMsg.content) ? currentMsg.content : []), + ]; + + // Preserve timestamp from the later message (more recent) + const merged: Extract = { + ...lastUser, + content: mergedContent, + // Take timestamp from the newer message + ...(currentMsg.timestamp && { timestamp: currentMsg.timestamp }), + }; + + // Replace the last message with merged version + result[result.length - 1] = merged; + continue; + } + } + + // Not a consecutive duplicate, add normally + result.push(msg); + lastRole = msgRole; + } + + return result; +} + // ── Messaging tool duplicate detection ────────────────────────────────────── // When the agent uses a messaging tool (telegram, discord, slack, message, sessions_send) // to send a message, we track the text so we can suppress duplicate block replies. diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index d6fa1664e..591b9e5b6 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -100,6 +100,7 @@ import { pickFallbackThinkingLevel, sanitizeGoogleTurnOrdering, sanitizeSessionMessagesImages, + validateAnthropicTurns, validateGeminiTurns, } from "./pi-embedded-helpers.js"; import { @@ -1291,7 +1292,9 @@ export async function compactEmbeddedPiSession(params: { sessionManager, sessionId: params.sessionId, }); - const validated = validateGeminiTurns(prior); + // Validate turn ordering for both Gemini (consecutive assistant) and Anthropic (consecutive user) + const validatedGemini = validateGeminiTurns(prior); + const validated = validateAnthropicTurns(validatedGemini); const limited = limitHistoryTurns( validated, getDmHistoryLimitFromSessionKey(params.sessionKey, params.config), @@ -1714,7 +1717,9 @@ export async function runEmbeddedPiAgent(params: { sessionManager, sessionId: params.sessionId, }); - const validated = validateGeminiTurns(prior); + // Validate turn ordering for both Gemini (consecutive assistant) and Anthropic (consecutive user) + const validatedGemini = validateGeminiTurns(prior); + const validated = validateAnthropicTurns(validatedGemini); const limited = limitHistoryTurns( validated, getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),