fix(agents): prevent Anthropic 400 'Incorrect role information' error

Add validateAnthropicTurns() to merge consecutive user messages that can
occur when steering messages are injected during streaming. This prevents
the API from rejecting requests due to improper role alternation.

Changes:
- Add validateAnthropicTurns() function in pi-embedded-helpers.ts
- Integrate validation into sanitization pipeline in pi-embedded-runner.ts
- Add user-friendly error message for role ordering errors
- Add comprehensive tests for the new validation function
This commit is contained in:
Drake Thomsen
2026-01-12 16:41:10 -05:00
committed by Peter Steinberger
parent bb9a9633a8
commit c5fa757ef6
3 changed files with 207 additions and 2 deletions

View File

@@ -15,6 +15,7 @@ import {
sanitizeGoogleTurnOrdering,
sanitizeSessionMessagesImages,
sanitizeToolCallId,
validateAnthropicTurns,
validateGeminiTurns,
} from "./pi-embedded-helpers.js";
import {
@@ -171,6 +172,124 @@ describe("validateGeminiTurns", () => {
});
});
describe("validateAnthropicTurns", () => {
it("should return empty array unchanged", () => {
const result = validateAnthropicTurns([]);
expect(result).toEqual([]);
});
it("should return single message unchanged", () => {
const msgs: AgentMessage[] = [
{
role: "user",
content: [{ type: "text", text: "Hello" }],
},
];
const result = validateAnthropicTurns(msgs);
expect(result).toEqual(msgs);
});
it("should return alternating user/assistant unchanged", () => {
const msgs: AgentMessage[] = [
{ role: "user", content: [{ type: "text", text: "Question" }] },
{
role: "assistant",
content: [{ type: "text", text: "Answer" }],
},
{ role: "user", content: [{ type: "text", text: "Follow-up" }] },
];
const result = validateAnthropicTurns(msgs);
expect(result).toEqual(msgs);
});
it("should merge consecutive user messages", () => {
const msgs: AgentMessage[] = [
{
role: "user",
content: [{ type: "text", text: "First message" }],
timestamp: 1000,
},
{
role: "user",
content: [{ type: "text", text: "Second message" }],
timestamp: 2000,
},
];
const result = validateAnthropicTurns(msgs);
expect(result).toHaveLength(1);
expect(result[0].role).toBe("user");
const content = (result[0] as { content: unknown[] }).content;
expect(content).toHaveLength(2);
expect(content[0]).toEqual({ type: "text", text: "First message" });
expect(content[1]).toEqual({ type: "text", text: "Second message" });
// Should take timestamp from the newer message
expect((result[0] as { timestamp?: number }).timestamp).toBe(2000);
});
it("should merge three consecutive user messages", () => {
const msgs: AgentMessage[] = [
{ role: "user", content: [{ type: "text", text: "One" }] },
{ role: "user", content: [{ type: "text", text: "Two" }] },
{ role: "user", content: [{ type: "text", text: "Three" }] },
];
const result = validateAnthropicTurns(msgs);
expect(result).toHaveLength(1);
const content = (result[0] as { content: unknown[] }).content;
expect(content).toHaveLength(3);
});
it("should not merge consecutive assistant messages", () => {
const msgs: AgentMessage[] = [
{ role: "user", content: [{ type: "text", text: "Question" }] },
{
role: "assistant",
content: [{ type: "text", text: "Answer 1" }],
},
{
role: "assistant",
content: [{ type: "text", text: "Answer 2" }],
},
];
const result = validateAnthropicTurns(msgs);
// validateAnthropicTurns only merges user messages, not assistant
expect(result).toHaveLength(3);
});
it("should handle mixed scenario with steering messages", () => {
// Simulates: user asks -> assistant errors -> steering user message injected
const msgs: AgentMessage[] = [
{ role: "user", content: [{ type: "text", text: "Original question" }] },
{
role: "assistant",
content: [],
stopReason: "error",
errorMessage: "Overloaded",
},
{
role: "user",
content: [{ type: "text", text: "Steering: try again" }],
},
{ role: "user", content: [{ type: "text", text: "Another follow-up" }] },
];
const result = validateAnthropicTurns(msgs);
// The two consecutive user messages at the end should be merged
expect(result).toHaveLength(3);
expect(result[0].role).toBe("user");
expect(result[1].role).toBe("assistant");
expect(result[2].role).toBe("user");
const lastContent = (result[2] as { content: unknown[] }).content;
expect(lastContent).toHaveLength(2);
});
});
describe("buildBootstrapContextFiles", () => {
it("keeps missing markers", () => {
const files = [makeFile({ missing: true, content: undefined })];

View File

@@ -297,6 +297,16 @@ export function formatAssistantErrorText(
);
}
// Check for role ordering errors (Anthropic 400 "Incorrect role information")
// This typically happens when consecutive user messages are sent without
// an assistant response between them, often due to steering/queueing timing.
if (/incorrect role information|roles must alternate/i.test(raw)) {
return (
"Message ordering conflict - please try again. " +
"If this persists, use /new to start a fresh session."
);
}
const invalidRequest = raw.match(
/"type":"invalid_request_error".*?"message":"([^"]+)"/,
);
@@ -553,6 +563,77 @@ export function validateGeminiTurns(messages: AgentMessage[]): AgentMessage[] {
return result;
}
/**
* Validates and fixes conversation turn sequences for Anthropic API.
* Anthropic requires strict alternating user→assistant pattern.
* This function:
* 1. Detects consecutive user messages
* 2. Merges consecutive user messages together
* 3. Preserves timestamps from the later message
*
* This prevents the "400 Incorrect role information" error that occurs
* when steering messages are injected during streaming and create
* consecutive user messages.
*/
export function validateAnthropicTurns(
messages: AgentMessage[],
): AgentMessage[] {
if (!Array.isArray(messages) || messages.length === 0) {
return messages;
}
const result: AgentMessage[] = [];
let lastRole: string | undefined;
for (const msg of messages) {
if (!msg || typeof msg !== "object") {
result.push(msg);
continue;
}
const msgRole = (msg as { role?: unknown }).role as string | undefined;
if (!msgRole) {
result.push(msg);
continue;
}
// Check if this message has the same role as the last one
if (msgRole === lastRole && lastRole === "user") {
// Merge consecutive user messages
const lastMsg = result[result.length - 1];
const currentMsg = msg as Extract<AgentMessage, { role: "user" }>;
if (lastMsg && typeof lastMsg === "object") {
const lastUser = lastMsg as Extract<AgentMessage, { role: "user" }>;
// Merge content blocks
const mergedContent = [
...(Array.isArray(lastUser.content) ? lastUser.content : []),
...(Array.isArray(currentMsg.content) ? currentMsg.content : []),
];
// Preserve timestamp from the later message (more recent)
const merged: Extract<AgentMessage, { role: "user" }> = {
...lastUser,
content: mergedContent,
// Take timestamp from the newer message
...(currentMsg.timestamp && { timestamp: currentMsg.timestamp }),
};
// Replace the last message with merged version
result[result.length - 1] = merged;
continue;
}
}
// Not a consecutive duplicate, add normally
result.push(msg);
lastRole = msgRole;
}
return result;
}
// ── Messaging tool duplicate detection ──────────────────────────────────────
// When the agent uses a messaging tool (telegram, discord, slack, message, sessions_send)
// to send a message, we track the text so we can suppress duplicate block replies.

View File

@@ -100,6 +100,7 @@ import {
pickFallbackThinkingLevel,
sanitizeGoogleTurnOrdering,
sanitizeSessionMessagesImages,
validateAnthropicTurns,
validateGeminiTurns,
} from "./pi-embedded-helpers.js";
import {
@@ -1291,7 +1292,9 @@ export async function compactEmbeddedPiSession(params: {
sessionManager,
sessionId: params.sessionId,
});
const validated = validateGeminiTurns(prior);
// Validate turn ordering for both Gemini (consecutive assistant) and Anthropic (consecutive user)
const validatedGemini = validateGeminiTurns(prior);
const validated = validateAnthropicTurns(validatedGemini);
const limited = limitHistoryTurns(
validated,
getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),
@@ -1714,7 +1717,9 @@ export async function runEmbeddedPiAgent(params: {
sessionManager,
sessionId: params.sessionId,
});
const validated = validateGeminiTurns(prior);
// Validate turn ordering for both Gemini (consecutive assistant) and Anthropic (consecutive user)
const validatedGemini = validateGeminiTurns(prior);
const validated = validateAnthropicTurns(validatedGemini);
const limited = limitHistoryTurns(
validated,
getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),