diff --git a/CHANGELOG.md b/CHANGELOG.md index 13dd783b7..224f73d2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -108,6 +108,7 @@ - Control UI: avoid Slack config ReferenceError by reading slack config snapshots. Thanks @sreekaransrinath for PR #249. - Auth: rotate across multiple OAuth profiles with cooldown tracking and email-based profile IDs. Thanks @mukhtharcm for PR #269. - Auth: fix multi-account OAuth rotation so round-robin alternates instead of pinning to lastGood. Thanks @mukhtharcm for PR #281. +- Auth: lock auth profile usage updates and fail fast on 429s during rotation. Thanks @mukhtharcm for PR #342. - Configure: stop auto-writing `auth.order` for newly added auth profiles (round-robin default unless explicitly pinned). - Telegram: honor routing.groupChat.mentionPatterns for group mention gating. Thanks Kevin Kern (@regenrek) for PR #242. - Telegram: gate groups via `telegram.groups` allowlist (align with WhatsApp/iMessage). Thanks @kitze for PR #241. diff --git a/src/agents/auth-profiles.ts b/src/agents/auth-profiles.ts index 07b8f44ba..8b746707a 100644 --- a/src/agents/auth-profiles.ts +++ b/src/agents/auth-profiles.ts @@ -16,6 +16,16 @@ import { resolveClawdbotAgentDir } from "./agent-paths.js"; const AUTH_STORE_VERSION = 1; const AUTH_PROFILE_FILENAME = "auth-profiles.json"; const LEGACY_AUTH_FILENAME = "auth.json"; +const AUTH_STORE_LOCK_OPTIONS = { + retries: { + retries: 10, + factor: 2, + minTimeout: 100, + maxTimeout: 10_000, + randomize: true, + }, + stale: 30_000, +} as const; export type ApiKeyCredential = { type: "api_key"; @@ -87,6 +97,49 @@ function ensureAuthStoreFile(pathname: string) { saveJsonFile(pathname, payload); } +function syncAuthProfileStore( + target: AuthProfileStore, + source: AuthProfileStore, +): void { + target.version = source.version; + target.profiles = source.profiles; + target.lastGood = source.lastGood; + target.usageStats = source.usageStats; +} + +function updateAuthProfileStoreWithLock(params: { + agentDir?: string; + updater: (store: AuthProfileStore) => boolean; +}): AuthProfileStore | null { + const authPath = resolveAuthStorePath(params.agentDir); + ensureAuthStoreFile(authPath); + + let release: (() => void) | undefined; + try { + release = lockfile.lockSync(authPath, AUTH_STORE_LOCK_OPTIONS); + const store = ensureAuthProfileStore(params.agentDir); + const shouldSave = params.updater(store); + if (shouldSave) { + saveAuthProfileStore(store, params.agentDir); + } + return store; + } catch { + return null; + } finally { + if (release) { + try { + release(); + } catch { + try { + lockfile.unlockSync(authPath); + } catch { + // ignore unlock errors + } + } + } + } +} + function buildOAuthApiKey( provider: OAuthProvider, credentials: OAuthCredentials, @@ -112,14 +165,7 @@ async function refreshOAuthTokenWithLock(params: { let release: (() => Promise) | undefined; try { release = await lockfile.lock(authPath, { - retries: { - retries: 10, - factor: 2, - minTimeout: 100, - maxTimeout: 10_000, - randomize: true, - }, - stale: 30_000, + ...AUTH_STORE_LOCK_OPTIONS, }); const store = ensureAuthProfileStore(params.agentDir); @@ -355,26 +401,42 @@ export function isProfileInCooldown( /** * Mark a profile as successfully used. Resets error count and updates lastUsed. - * Re-reads the store from disk to avoid overwriting concurrent updates. + * Uses store lock to avoid overwriting concurrent usage updates. */ export function markAuthProfileUsed(params: { store: AuthProfileStore; profileId: string; agentDir?: string; }): void { - const { profileId, agentDir } = params; - // Re-read from disk to get fresh usageStats from other sessions - const freshStore = ensureAuthProfileStore(agentDir); - if (!freshStore.profiles[profileId]) return; + const { store, profileId, agentDir } = params; + const updated = updateAuthProfileStoreWithLock({ + agentDir, + updater: (freshStore) => { + if (!freshStore.profiles[profileId]) return false; + freshStore.usageStats = freshStore.usageStats ?? {}; + freshStore.usageStats[profileId] = { + ...freshStore.usageStats[profileId], + lastUsed: Date.now(), + errorCount: 0, + cooldownUntil: undefined, + }; + return true; + }, + }); + if (updated) { + syncAuthProfileStore(store, updated); + return; + } + if (!store.profiles[profileId]) return; - freshStore.usageStats = freshStore.usageStats ?? {}; - freshStore.usageStats[profileId] = { - ...freshStore.usageStats[profileId], + store.usageStats = store.usageStats ?? {}; + store.usageStats[profileId] = { + ...store.usageStats[profileId], lastUsed: Date.now(), errorCount: 0, cooldownUntil: undefined, }; - saveAuthProfileStore(freshStore, agentDir); + saveAuthProfileStore(store, agentDir); } export function calculateAuthProfileCooldownMs(errorCount: number): number { @@ -388,53 +450,90 @@ export function calculateAuthProfileCooldownMs(errorCount: number): number { /** * Mark a profile as failed/rate-limited. Applies exponential backoff cooldown. * Cooldown times: 1min, 5min, 25min, max 1 hour. - * Re-reads the store from disk to avoid overwriting concurrent updates. + * Uses store lock to avoid overwriting concurrent usage updates. */ export function markAuthProfileCooldown(params: { store: AuthProfileStore; profileId: string; agentDir?: string; }): void { - const { profileId, agentDir } = params; - // Re-read from disk to get fresh usageStats from other sessions - const freshStore = ensureAuthProfileStore(agentDir); - if (!freshStore.profiles[profileId]) return; + const { store, profileId, agentDir } = params; + const updated = updateAuthProfileStoreWithLock({ + agentDir, + updater: (freshStore) => { + if (!freshStore.profiles[profileId]) return false; - freshStore.usageStats = freshStore.usageStats ?? {}; - const existing = freshStore.usageStats[profileId] ?? {}; + freshStore.usageStats = freshStore.usageStats ?? {}; + const existing = freshStore.usageStats[profileId] ?? {}; + const errorCount = (existing.errorCount ?? 0) + 1; + + // Exponential backoff: 1min, 5min, 25min, capped at 1h + const backoffMs = calculateAuthProfileCooldownMs(errorCount); + + freshStore.usageStats[profileId] = { + ...existing, + errorCount, + cooldownUntil: Date.now() + backoffMs, + }; + return true; + }, + }); + if (updated) { + syncAuthProfileStore(store, updated); + return; + } + if (!store.profiles[profileId]) return; + + store.usageStats = store.usageStats ?? {}; + const existing = store.usageStats[profileId] ?? {}; const errorCount = (existing.errorCount ?? 0) + 1; // Exponential backoff: 1min, 5min, 25min, capped at 1h const backoffMs = calculateAuthProfileCooldownMs(errorCount); - freshStore.usageStats[profileId] = { + store.usageStats[profileId] = { ...existing, errorCount, cooldownUntil: Date.now() + backoffMs, }; - saveAuthProfileStore(freshStore, agentDir); + saveAuthProfileStore(store, agentDir); } /** * Clear cooldown for a profile (e.g., manual reset). - * Re-reads the store from disk to avoid overwriting concurrent updates. + * Uses store lock to avoid overwriting concurrent usage updates. */ export function clearAuthProfileCooldown(params: { store: AuthProfileStore; profileId: string; agentDir?: string; }): void { - const { profileId, agentDir } = params; - // Re-read from disk to get fresh usageStats from other sessions - const freshStore = ensureAuthProfileStore(agentDir); - if (!freshStore.usageStats?.[profileId]) return; + const { store, profileId, agentDir } = params; + const updated = updateAuthProfileStoreWithLock({ + agentDir, + updater: (freshStore) => { + if (!freshStore.usageStats?.[profileId]) return false; - freshStore.usageStats[profileId] = { - ...freshStore.usageStats[profileId], + freshStore.usageStats[profileId] = { + ...freshStore.usageStats[profileId], + errorCount: 0, + cooldownUntil: undefined, + }; + return true; + }, + }); + if (updated) { + syncAuthProfileStore(store, updated); + return; + } + if (!store.usageStats?.[profileId]) return; + + store.usageStats[profileId] = { + ...store.usageStats[profileId], errorCount: 0, cooldownUntil: undefined, }; - saveAuthProfileStore(freshStore, agentDir); + saveAuthProfileStore(store, agentDir); } export function resolveAuthProfileOrder(params: { @@ -600,13 +699,24 @@ export function markAuthProfileGood(params: { profileId: string; agentDir?: string; }): void { - const { provider, profileId, agentDir } = params; - // Re-read from disk to avoid overwriting concurrent updates - const freshStore = ensureAuthProfileStore(agentDir); - const profile = freshStore.profiles[profileId]; + const { store, provider, profileId, agentDir } = params; + const updated = updateAuthProfileStoreWithLock({ + agentDir, + updater: (freshStore) => { + const profile = freshStore.profiles[profileId]; + if (!profile || profile.provider !== provider) return false; + freshStore.lastGood = { ...freshStore.lastGood, [provider]: profileId }; + return true; + }, + }); + if (updated) { + syncAuthProfileStore(store, updated); + return; + } + const profile = store.profiles[profileId]; if (!profile || profile.provider !== provider) return; - freshStore.lastGood = { ...freshStore.lastGood, [provider]: profileId }; - saveAuthProfileStore(freshStore, agentDir); + store.lastGood = { ...store.lastGood, [provider]: profileId }; + saveAuthProfileStore(store, agentDir); } export function resolveAuthStorePathForDisplay(): string { diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index 3339a6031..af60309b1 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -133,6 +133,17 @@ type EmbeddedRunWaiter = { }; const EMBEDDED_RUN_WAITERS = new Map>(); +const isAbortError = (err: unknown): boolean => { + if (!err || typeof err !== "object") return false; + const name = "name" in err ? String(err.name) : ""; + if (name === "AbortError") return true; + const message = + "message" in err && typeof err.message === "string" + ? err.message.toLowerCase() + : ""; + return message.includes("aborted"); +}; + type EmbeddedSandboxInfo = { enabled: boolean; workspaceDir?: string; @@ -913,7 +924,11 @@ export async function runEmbeddedPiAgent(params: { await waitForCompactionRetry(); } catch (err) { // Capture AbortError from waitForCompactionRetry to enable fallback/rotation - if (!promptError) promptError = err; + if (isAbortError(err)) { + if (!promptError) promptError = err; + } else { + throw err; + } } messagesSnapshot = session.messages.slice(); sessionIdUsed = session.sessionId;