fix(auth): lock auth profile updates
This commit is contained in:
@@ -16,6 +16,16 @@ import { resolveClawdbotAgentDir } from "./agent-paths.js";
|
||||
const AUTH_STORE_VERSION = 1;
|
||||
const AUTH_PROFILE_FILENAME = "auth-profiles.json";
|
||||
const LEGACY_AUTH_FILENAME = "auth.json";
|
||||
const AUTH_STORE_LOCK_OPTIONS = {
|
||||
retries: {
|
||||
retries: 10,
|
||||
factor: 2,
|
||||
minTimeout: 100,
|
||||
maxTimeout: 10_000,
|
||||
randomize: true,
|
||||
},
|
||||
stale: 30_000,
|
||||
} as const;
|
||||
|
||||
export type ApiKeyCredential = {
|
||||
type: "api_key";
|
||||
@@ -87,6 +97,49 @@ function ensureAuthStoreFile(pathname: string) {
|
||||
saveJsonFile(pathname, payload);
|
||||
}
|
||||
|
||||
function syncAuthProfileStore(
|
||||
target: AuthProfileStore,
|
||||
source: AuthProfileStore,
|
||||
): void {
|
||||
target.version = source.version;
|
||||
target.profiles = source.profiles;
|
||||
target.lastGood = source.lastGood;
|
||||
target.usageStats = source.usageStats;
|
||||
}
|
||||
|
||||
function updateAuthProfileStoreWithLock(params: {
|
||||
agentDir?: string;
|
||||
updater: (store: AuthProfileStore) => boolean;
|
||||
}): AuthProfileStore | null {
|
||||
const authPath = resolveAuthStorePath(params.agentDir);
|
||||
ensureAuthStoreFile(authPath);
|
||||
|
||||
let release: (() => void) | undefined;
|
||||
try {
|
||||
release = lockfile.lockSync(authPath, AUTH_STORE_LOCK_OPTIONS);
|
||||
const store = ensureAuthProfileStore(params.agentDir);
|
||||
const shouldSave = params.updater(store);
|
||||
if (shouldSave) {
|
||||
saveAuthProfileStore(store, params.agentDir);
|
||||
}
|
||||
return store;
|
||||
} catch {
|
||||
return null;
|
||||
} finally {
|
||||
if (release) {
|
||||
try {
|
||||
release();
|
||||
} catch {
|
||||
try {
|
||||
lockfile.unlockSync(authPath);
|
||||
} catch {
|
||||
// ignore unlock errors
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function buildOAuthApiKey(
|
||||
provider: OAuthProvider,
|
||||
credentials: OAuthCredentials,
|
||||
@@ -112,14 +165,7 @@ async function refreshOAuthTokenWithLock(params: {
|
||||
let release: (() => Promise<void>) | undefined;
|
||||
try {
|
||||
release = await lockfile.lock(authPath, {
|
||||
retries: {
|
||||
retries: 10,
|
||||
factor: 2,
|
||||
minTimeout: 100,
|
||||
maxTimeout: 10_000,
|
||||
randomize: true,
|
||||
},
|
||||
stale: 30_000,
|
||||
...AUTH_STORE_LOCK_OPTIONS,
|
||||
});
|
||||
|
||||
const store = ensureAuthProfileStore(params.agentDir);
|
||||
@@ -355,26 +401,42 @@ export function isProfileInCooldown(
|
||||
|
||||
/**
|
||||
* Mark a profile as successfully used. Resets error count and updates lastUsed.
|
||||
* Re-reads the store from disk to avoid overwriting concurrent updates.
|
||||
* Uses store lock to avoid overwriting concurrent usage updates.
|
||||
*/
|
||||
export function markAuthProfileUsed(params: {
|
||||
store: AuthProfileStore;
|
||||
profileId: string;
|
||||
agentDir?: string;
|
||||
}): void {
|
||||
const { profileId, agentDir } = params;
|
||||
// Re-read from disk to get fresh usageStats from other sessions
|
||||
const freshStore = ensureAuthProfileStore(agentDir);
|
||||
if (!freshStore.profiles[profileId]) return;
|
||||
const { store, profileId, agentDir } = params;
|
||||
const updated = updateAuthProfileStoreWithLock({
|
||||
agentDir,
|
||||
updater: (freshStore) => {
|
||||
if (!freshStore.profiles[profileId]) return false;
|
||||
freshStore.usageStats = freshStore.usageStats ?? {};
|
||||
freshStore.usageStats[profileId] = {
|
||||
...freshStore.usageStats[profileId],
|
||||
lastUsed: Date.now(),
|
||||
errorCount: 0,
|
||||
cooldownUntil: undefined,
|
||||
};
|
||||
return true;
|
||||
},
|
||||
});
|
||||
if (updated) {
|
||||
syncAuthProfileStore(store, updated);
|
||||
return;
|
||||
}
|
||||
if (!store.profiles[profileId]) return;
|
||||
|
||||
freshStore.usageStats = freshStore.usageStats ?? {};
|
||||
freshStore.usageStats[profileId] = {
|
||||
...freshStore.usageStats[profileId],
|
||||
store.usageStats = store.usageStats ?? {};
|
||||
store.usageStats[profileId] = {
|
||||
...store.usageStats[profileId],
|
||||
lastUsed: Date.now(),
|
||||
errorCount: 0,
|
||||
cooldownUntil: undefined,
|
||||
};
|
||||
saveAuthProfileStore(freshStore, agentDir);
|
||||
saveAuthProfileStore(store, agentDir);
|
||||
}
|
||||
|
||||
export function calculateAuthProfileCooldownMs(errorCount: number): number {
|
||||
@@ -388,53 +450,90 @@ export function calculateAuthProfileCooldownMs(errorCount: number): number {
|
||||
/**
|
||||
* Mark a profile as failed/rate-limited. Applies exponential backoff cooldown.
|
||||
* Cooldown times: 1min, 5min, 25min, max 1 hour.
|
||||
* Re-reads the store from disk to avoid overwriting concurrent updates.
|
||||
* Uses store lock to avoid overwriting concurrent usage updates.
|
||||
*/
|
||||
export function markAuthProfileCooldown(params: {
|
||||
store: AuthProfileStore;
|
||||
profileId: string;
|
||||
agentDir?: string;
|
||||
}): void {
|
||||
const { profileId, agentDir } = params;
|
||||
// Re-read from disk to get fresh usageStats from other sessions
|
||||
const freshStore = ensureAuthProfileStore(agentDir);
|
||||
if (!freshStore.profiles[profileId]) return;
|
||||
const { store, profileId, agentDir } = params;
|
||||
const updated = updateAuthProfileStoreWithLock({
|
||||
agentDir,
|
||||
updater: (freshStore) => {
|
||||
if (!freshStore.profiles[profileId]) return false;
|
||||
|
||||
freshStore.usageStats = freshStore.usageStats ?? {};
|
||||
const existing = freshStore.usageStats[profileId] ?? {};
|
||||
freshStore.usageStats = freshStore.usageStats ?? {};
|
||||
const existing = freshStore.usageStats[profileId] ?? {};
|
||||
const errorCount = (existing.errorCount ?? 0) + 1;
|
||||
|
||||
// Exponential backoff: 1min, 5min, 25min, capped at 1h
|
||||
const backoffMs = calculateAuthProfileCooldownMs(errorCount);
|
||||
|
||||
freshStore.usageStats[profileId] = {
|
||||
...existing,
|
||||
errorCount,
|
||||
cooldownUntil: Date.now() + backoffMs,
|
||||
};
|
||||
return true;
|
||||
},
|
||||
});
|
||||
if (updated) {
|
||||
syncAuthProfileStore(store, updated);
|
||||
return;
|
||||
}
|
||||
if (!store.profiles[profileId]) return;
|
||||
|
||||
store.usageStats = store.usageStats ?? {};
|
||||
const existing = store.usageStats[profileId] ?? {};
|
||||
const errorCount = (existing.errorCount ?? 0) + 1;
|
||||
|
||||
// Exponential backoff: 1min, 5min, 25min, capped at 1h
|
||||
const backoffMs = calculateAuthProfileCooldownMs(errorCount);
|
||||
|
||||
freshStore.usageStats[profileId] = {
|
||||
store.usageStats[profileId] = {
|
||||
...existing,
|
||||
errorCount,
|
||||
cooldownUntil: Date.now() + backoffMs,
|
||||
};
|
||||
saveAuthProfileStore(freshStore, agentDir);
|
||||
saveAuthProfileStore(store, agentDir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear cooldown for a profile (e.g., manual reset).
|
||||
* Re-reads the store from disk to avoid overwriting concurrent updates.
|
||||
* Uses store lock to avoid overwriting concurrent usage updates.
|
||||
*/
|
||||
export function clearAuthProfileCooldown(params: {
|
||||
store: AuthProfileStore;
|
||||
profileId: string;
|
||||
agentDir?: string;
|
||||
}): void {
|
||||
const { profileId, agentDir } = params;
|
||||
// Re-read from disk to get fresh usageStats from other sessions
|
||||
const freshStore = ensureAuthProfileStore(agentDir);
|
||||
if (!freshStore.usageStats?.[profileId]) return;
|
||||
const { store, profileId, agentDir } = params;
|
||||
const updated = updateAuthProfileStoreWithLock({
|
||||
agentDir,
|
||||
updater: (freshStore) => {
|
||||
if (!freshStore.usageStats?.[profileId]) return false;
|
||||
|
||||
freshStore.usageStats[profileId] = {
|
||||
...freshStore.usageStats[profileId],
|
||||
freshStore.usageStats[profileId] = {
|
||||
...freshStore.usageStats[profileId],
|
||||
errorCount: 0,
|
||||
cooldownUntil: undefined,
|
||||
};
|
||||
return true;
|
||||
},
|
||||
});
|
||||
if (updated) {
|
||||
syncAuthProfileStore(store, updated);
|
||||
return;
|
||||
}
|
||||
if (!store.usageStats?.[profileId]) return;
|
||||
|
||||
store.usageStats[profileId] = {
|
||||
...store.usageStats[profileId],
|
||||
errorCount: 0,
|
||||
cooldownUntil: undefined,
|
||||
};
|
||||
saveAuthProfileStore(freshStore, agentDir);
|
||||
saveAuthProfileStore(store, agentDir);
|
||||
}
|
||||
|
||||
export function resolveAuthProfileOrder(params: {
|
||||
@@ -600,13 +699,24 @@ export function markAuthProfileGood(params: {
|
||||
profileId: string;
|
||||
agentDir?: string;
|
||||
}): void {
|
||||
const { provider, profileId, agentDir } = params;
|
||||
// Re-read from disk to avoid overwriting concurrent updates
|
||||
const freshStore = ensureAuthProfileStore(agentDir);
|
||||
const profile = freshStore.profiles[profileId];
|
||||
const { store, provider, profileId, agentDir } = params;
|
||||
const updated = updateAuthProfileStoreWithLock({
|
||||
agentDir,
|
||||
updater: (freshStore) => {
|
||||
const profile = freshStore.profiles[profileId];
|
||||
if (!profile || profile.provider !== provider) return false;
|
||||
freshStore.lastGood = { ...freshStore.lastGood, [provider]: profileId };
|
||||
return true;
|
||||
},
|
||||
});
|
||||
if (updated) {
|
||||
syncAuthProfileStore(store, updated);
|
||||
return;
|
||||
}
|
||||
const profile = store.profiles[profileId];
|
||||
if (!profile || profile.provider !== provider) return;
|
||||
freshStore.lastGood = { ...freshStore.lastGood, [provider]: profileId };
|
||||
saveAuthProfileStore(freshStore, agentDir);
|
||||
store.lastGood = { ...store.lastGood, [provider]: profileId };
|
||||
saveAuthProfileStore(store, agentDir);
|
||||
}
|
||||
|
||||
export function resolveAuthStorePathForDisplay(): string {
|
||||
|
||||
@@ -133,6 +133,17 @@ type EmbeddedRunWaiter = {
|
||||
};
|
||||
const EMBEDDED_RUN_WAITERS = new Map<string, Set<EmbeddedRunWaiter>>();
|
||||
|
||||
const isAbortError = (err: unknown): boolean => {
|
||||
if (!err || typeof err !== "object") return false;
|
||||
const name = "name" in err ? String(err.name) : "";
|
||||
if (name === "AbortError") return true;
|
||||
const message =
|
||||
"message" in err && typeof err.message === "string"
|
||||
? err.message.toLowerCase()
|
||||
: "";
|
||||
return message.includes("aborted");
|
||||
};
|
||||
|
||||
type EmbeddedSandboxInfo = {
|
||||
enabled: boolean;
|
||||
workspaceDir?: string;
|
||||
@@ -913,7 +924,11 @@ export async function runEmbeddedPiAgent(params: {
|
||||
await waitForCompactionRetry();
|
||||
} catch (err) {
|
||||
// Capture AbortError from waitForCompactionRetry to enable fallback/rotation
|
||||
if (!promptError) promptError = err;
|
||||
if (isAbortError(err)) {
|
||||
if (!promptError) promptError = err;
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
messagesSnapshot = session.messages.slice();
|
||||
sessionIdUsed = session.sessionId;
|
||||
|
||||
Reference in New Issue
Block a user