feat: add models scan and fallbacks
This commit is contained in:
@@ -5,6 +5,7 @@ import {
|
||||
queueEmbeddedPiMessage,
|
||||
runEmbeddedPiAgent,
|
||||
} from "../../agents/pi-embedded.js";
|
||||
import { runWithModelFallback } from "../../agents/model-fallback.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
type SessionEntry,
|
||||
@@ -170,131 +171,154 @@ export async function runReplyAgent(params: {
|
||||
registerAgentRunContext(runId, { sessionKey });
|
||||
}
|
||||
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
|
||||
let fallbackProvider = followupRun.run.provider;
|
||||
let fallbackModel = followupRun.run.model;
|
||||
try {
|
||||
runResult = await runEmbeddedPiAgent({
|
||||
sessionId: followupRun.run.sessionId,
|
||||
sessionKey,
|
||||
surface: sessionCtx.Surface?.trim().toLowerCase() || undefined,
|
||||
sessionFile: followupRun.run.sessionFile,
|
||||
workspaceDir: followupRun.run.workspaceDir,
|
||||
config: followupRun.run.config,
|
||||
skillsSnapshot: followupRun.run.skillsSnapshot,
|
||||
prompt: commandBody,
|
||||
extraSystemPrompt: followupRun.run.extraSystemPrompt,
|
||||
ownerNumbers: followupRun.run.ownerNumbers,
|
||||
enforceFinalTag: followupRun.run.enforceFinalTag,
|
||||
const fallbackResult = await runWithModelFallback({
|
||||
cfg: followupRun.run.config,
|
||||
provider: followupRun.run.provider,
|
||||
model: followupRun.run.model,
|
||||
thinkLevel: followupRun.run.thinkLevel,
|
||||
verboseLevel: followupRun.run.verboseLevel,
|
||||
bashElevated: followupRun.run.bashElevated,
|
||||
timeoutMs: followupRun.run.timeoutMs,
|
||||
runId,
|
||||
blockReplyBreak: resolvedBlockStreamingBreak,
|
||||
blockReplyChunking,
|
||||
onPartialReply: opts?.onPartialReply
|
||||
? async (payload) => {
|
||||
let text = payload.text;
|
||||
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
const stripped = stripHeartbeatToken(text, { mode: "message" });
|
||||
if (stripped.didStrip && !didLogHeartbeatStrip) {
|
||||
didLogHeartbeatStrip = true;
|
||||
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
|
||||
}
|
||||
if (
|
||||
stripped.shouldSkip &&
|
||||
(payload.mediaUrls?.length ?? 0) === 0
|
||||
) {
|
||||
return;
|
||||
}
|
||||
text = stripped.text;
|
||||
}
|
||||
await typing.startTypingOnText(text);
|
||||
await opts.onPartialReply?.({
|
||||
text,
|
||||
mediaUrls: payload.mediaUrls,
|
||||
});
|
||||
}
|
||||
: undefined,
|
||||
onBlockReply:
|
||||
blockStreamingEnabled && opts?.onBlockReply
|
||||
? async (payload) => {
|
||||
let text = payload.text;
|
||||
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
const stripped = stripHeartbeatToken(text, {
|
||||
mode: "message",
|
||||
});
|
||||
if (stripped.didStrip && !didLogHeartbeatStrip) {
|
||||
didLogHeartbeatStrip = true;
|
||||
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
|
||||
run: (provider, model) =>
|
||||
runEmbeddedPiAgent({
|
||||
sessionId: followupRun.run.sessionId,
|
||||
sessionKey,
|
||||
surface: sessionCtx.Surface?.trim().toLowerCase() || undefined,
|
||||
sessionFile: followupRun.run.sessionFile,
|
||||
workspaceDir: followupRun.run.workspaceDir,
|
||||
config: followupRun.run.config,
|
||||
skillsSnapshot: followupRun.run.skillsSnapshot,
|
||||
prompt: commandBody,
|
||||
extraSystemPrompt: followupRun.run.extraSystemPrompt,
|
||||
ownerNumbers: followupRun.run.ownerNumbers,
|
||||
enforceFinalTag: followupRun.run.enforceFinalTag,
|
||||
provider,
|
||||
model,
|
||||
thinkLevel: followupRun.run.thinkLevel,
|
||||
verboseLevel: followupRun.run.verboseLevel,
|
||||
bashElevated: followupRun.run.bashElevated,
|
||||
timeoutMs: followupRun.run.timeoutMs,
|
||||
runId,
|
||||
blockReplyBreak: resolvedBlockStreamingBreak,
|
||||
blockReplyChunking,
|
||||
onPartialReply: opts?.onPartialReply
|
||||
? async (payload) => {
|
||||
let text = payload.text;
|
||||
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
const stripped = stripHeartbeatToken(text, {
|
||||
mode: "message",
|
||||
});
|
||||
if (stripped.didStrip && !didLogHeartbeatStrip) {
|
||||
didLogHeartbeatStrip = true;
|
||||
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
|
||||
}
|
||||
if (
|
||||
stripped.shouldSkip &&
|
||||
(payload.mediaUrls?.length ?? 0) === 0
|
||||
) {
|
||||
return;
|
||||
}
|
||||
text = stripped.text;
|
||||
}
|
||||
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
|
||||
if (stripped.shouldSkip && !hasMedia) return;
|
||||
text = stripped.text;
|
||||
}
|
||||
const tagResult = extractReplyToTag(
|
||||
text,
|
||||
sessionCtx.MessageSid,
|
||||
);
|
||||
const cleaned = tagResult.cleaned || undefined;
|
||||
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
|
||||
if (!cleaned && !hasMedia) return;
|
||||
if (cleaned?.trim() === SILENT_REPLY_TOKEN && !hasMedia) return;
|
||||
const blockPayload: ReplyPayload = {
|
||||
text: cleaned,
|
||||
mediaUrls: payload.mediaUrls,
|
||||
mediaUrl: payload.mediaUrls?.[0],
|
||||
replyToId: tagResult.replyToId,
|
||||
};
|
||||
const payloadKey = buildPayloadKey(blockPayload);
|
||||
if (
|
||||
streamedPayloadKeys.has(payloadKey) ||
|
||||
pendingStreamedPayloadKeys.has(payloadKey)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
pendingStreamedPayloadKeys.add(payloadKey);
|
||||
const task = (async () => {
|
||||
await typing.startTypingOnText(cleaned);
|
||||
await opts.onBlockReply?.(blockPayload);
|
||||
})()
|
||||
.then(() => {
|
||||
streamedPayloadKeys.add(payloadKey);
|
||||
didStreamBlockReply = true;
|
||||
})
|
||||
.catch((err) => {
|
||||
logVerbose(`block reply delivery failed: ${String(err)}`);
|
||||
})
|
||||
.finally(() => {
|
||||
pendingStreamedPayloadKeys.delete(payloadKey);
|
||||
await typing.startTypingOnText(text);
|
||||
await opts.onPartialReply?.({
|
||||
text,
|
||||
mediaUrls: payload.mediaUrls,
|
||||
});
|
||||
pendingBlockTasks.add(task);
|
||||
void task.finally(() => pendingBlockTasks.delete(task));
|
||||
}
|
||||
: undefined,
|
||||
shouldEmitToolResult,
|
||||
onToolResult: opts?.onToolResult
|
||||
? async (payload) => {
|
||||
let text = payload.text;
|
||||
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
const stripped = stripHeartbeatToken(text, { mode: "message" });
|
||||
if (stripped.didStrip && !didLogHeartbeatStrip) {
|
||||
didLogHeartbeatStrip = true;
|
||||
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
|
||||
}
|
||||
if (
|
||||
stripped.shouldSkip &&
|
||||
(payload.mediaUrls?.length ?? 0) === 0
|
||||
) {
|
||||
return;
|
||||
: undefined,
|
||||
onBlockReply:
|
||||
blockStreamingEnabled && opts?.onBlockReply
|
||||
? async (payload) => {
|
||||
let text = payload.text;
|
||||
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
const stripped = stripHeartbeatToken(text, {
|
||||
mode: "message",
|
||||
});
|
||||
if (stripped.didStrip && !didLogHeartbeatStrip) {
|
||||
didLogHeartbeatStrip = true;
|
||||
logVerbose(
|
||||
"Stripped stray HEARTBEAT_OK token from reply",
|
||||
);
|
||||
}
|
||||
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
|
||||
if (stripped.shouldSkip && !hasMedia) return;
|
||||
text = stripped.text;
|
||||
}
|
||||
const tagResult = extractReplyToTag(
|
||||
text,
|
||||
sessionCtx.MessageSid,
|
||||
);
|
||||
const cleaned = tagResult.cleaned || undefined;
|
||||
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
|
||||
if (!cleaned && !hasMedia) return;
|
||||
if (cleaned?.trim() === SILENT_REPLY_TOKEN && !hasMedia)
|
||||
return;
|
||||
const blockPayload: ReplyPayload = {
|
||||
text: cleaned,
|
||||
mediaUrls: payload.mediaUrls,
|
||||
mediaUrl: payload.mediaUrls?.[0],
|
||||
replyToId: tagResult.replyToId,
|
||||
};
|
||||
const payloadKey = buildPayloadKey(blockPayload);
|
||||
if (
|
||||
streamedPayloadKeys.has(payloadKey) ||
|
||||
pendingStreamedPayloadKeys.has(payloadKey)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
pendingStreamedPayloadKeys.add(payloadKey);
|
||||
const task = (async () => {
|
||||
await typing.startTypingOnText(cleaned);
|
||||
await opts.onBlockReply?.(blockPayload);
|
||||
})()
|
||||
.then(() => {
|
||||
streamedPayloadKeys.add(payloadKey);
|
||||
didStreamBlockReply = true;
|
||||
})
|
||||
.catch((err) => {
|
||||
logVerbose(
|
||||
`block reply delivery failed: ${String(err)}`,
|
||||
);
|
||||
})
|
||||
.finally(() => {
|
||||
pendingStreamedPayloadKeys.delete(payloadKey);
|
||||
});
|
||||
pendingBlockTasks.add(task);
|
||||
void task.finally(() => pendingBlockTasks.delete(task));
|
||||
}
|
||||
: undefined,
|
||||
shouldEmitToolResult,
|
||||
onToolResult: opts?.onToolResult
|
||||
? async (payload) => {
|
||||
let text = payload.text;
|
||||
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
const stripped = stripHeartbeatToken(text, {
|
||||
mode: "message",
|
||||
});
|
||||
if (stripped.didStrip && !didLogHeartbeatStrip) {
|
||||
didLogHeartbeatStrip = true;
|
||||
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
|
||||
}
|
||||
if (
|
||||
stripped.shouldSkip &&
|
||||
(payload.mediaUrls?.length ?? 0) === 0
|
||||
) {
|
||||
return;
|
||||
}
|
||||
text = stripped.text;
|
||||
}
|
||||
await typing.startTypingOnText(text);
|
||||
await opts.onToolResult?.({
|
||||
text,
|
||||
mediaUrls: payload.mediaUrls,
|
||||
});
|
||||
}
|
||||
text = stripped.text;
|
||||
}
|
||||
await typing.startTypingOnText(text);
|
||||
await opts.onToolResult?.({ text, mediaUrls: payload.mediaUrls });
|
||||
}
|
||||
: undefined,
|
||||
: undefined,
|
||||
}),
|
||||
});
|
||||
runResult = fallbackResult.result;
|
||||
fallbackProvider = fallbackResult.provider;
|
||||
fallbackModel = fallbackResult.model;
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
const isContextOverflow =
|
||||
@@ -388,7 +412,12 @@ export async function runReplyAgent(params: {
|
||||
|
||||
if (sessionStore && sessionKey) {
|
||||
const usage = runResult.meta.agentMeta?.usage;
|
||||
const modelUsed = runResult.meta.agentMeta?.model ?? defaultModel;
|
||||
const modelUsed =
|
||||
runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
|
||||
const providerUsed =
|
||||
runResult.meta.agentMeta?.provider ??
|
||||
fallbackProvider ??
|
||||
followupRun.run.provider;
|
||||
const contextTokensUsed =
|
||||
agentCfgContextTokens ??
|
||||
lookupContextTokens(modelUsed) ??
|
||||
@@ -408,6 +437,7 @@ export async function runReplyAgent(params: {
|
||||
outputTokens: output,
|
||||
totalTokens:
|
||||
promptTokens > 0 ? promptTokens : (usage.total ?? input),
|
||||
modelProvider: providerUsed,
|
||||
model: modelUsed,
|
||||
contextTokens: contextTokensUsed ?? entry.contextTokens,
|
||||
updatedAt: Date.now(),
|
||||
@@ -422,6 +452,7 @@ export async function runReplyAgent(params: {
|
||||
if (entry) {
|
||||
sessionStore[sessionKey] = {
|
||||
...entry,
|
||||
modelProvider: providerUsed ?? entry.modelProvider,
|
||||
model: modelUsed ?? entry.model,
|
||||
contextTokens: contextTokensUsed ?? entry.contextTokens,
|
||||
};
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import crypto from "node:crypto";
|
||||
import { lookupContextTokens } from "../../agents/context.js";
|
||||
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
|
||||
import { runWithModelFallback } from "../../agents/model-fallback.js";
|
||||
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
|
||||
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
@@ -61,28 +62,39 @@ export function createFollowupRunner(params: {
|
||||
registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey });
|
||||
}
|
||||
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
|
||||
let fallbackProvider = queued.run.provider;
|
||||
let fallbackModel = queued.run.model;
|
||||
try {
|
||||
runResult = await runEmbeddedPiAgent({
|
||||
sessionId: queued.run.sessionId,
|
||||
sessionKey: queued.run.sessionKey,
|
||||
surface: queued.run.surface,
|
||||
sessionFile: queued.run.sessionFile,
|
||||
workspaceDir: queued.run.workspaceDir,
|
||||
config: queued.run.config,
|
||||
skillsSnapshot: queued.run.skillsSnapshot,
|
||||
prompt: queued.prompt,
|
||||
extraSystemPrompt: queued.run.extraSystemPrompt,
|
||||
ownerNumbers: queued.run.ownerNumbers,
|
||||
enforceFinalTag: queued.run.enforceFinalTag,
|
||||
const fallbackResult = await runWithModelFallback({
|
||||
cfg: queued.run.config,
|
||||
provider: queued.run.provider,
|
||||
model: queued.run.model,
|
||||
thinkLevel: queued.run.thinkLevel,
|
||||
verboseLevel: queued.run.verboseLevel,
|
||||
bashElevated: queued.run.bashElevated,
|
||||
timeoutMs: queued.run.timeoutMs,
|
||||
runId,
|
||||
blockReplyBreak: queued.run.blockReplyBreak,
|
||||
run: (provider, model) =>
|
||||
runEmbeddedPiAgent({
|
||||
sessionId: queued.run.sessionId,
|
||||
sessionKey: queued.run.sessionKey,
|
||||
surface: queued.run.surface,
|
||||
sessionFile: queued.run.sessionFile,
|
||||
workspaceDir: queued.run.workspaceDir,
|
||||
config: queued.run.config,
|
||||
skillsSnapshot: queued.run.skillsSnapshot,
|
||||
prompt: queued.prompt,
|
||||
extraSystemPrompt: queued.run.extraSystemPrompt,
|
||||
ownerNumbers: queued.run.ownerNumbers,
|
||||
enforceFinalTag: queued.run.enforceFinalTag,
|
||||
provider,
|
||||
model,
|
||||
thinkLevel: queued.run.thinkLevel,
|
||||
verboseLevel: queued.run.verboseLevel,
|
||||
bashElevated: queued.run.bashElevated,
|
||||
timeoutMs: queued.run.timeoutMs,
|
||||
runId,
|
||||
blockReplyBreak: queued.run.blockReplyBreak,
|
||||
}),
|
||||
});
|
||||
runResult = fallbackResult.result;
|
||||
fallbackProvider = fallbackResult.provider;
|
||||
fallbackModel = fallbackResult.model;
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
defaultRuntime.error?.(`Followup agent failed before reply: ${message}`);
|
||||
@@ -121,7 +133,8 @@ export function createFollowupRunner(params: {
|
||||
|
||||
if (sessionStore && sessionKey) {
|
||||
const usage = runResult.meta.agentMeta?.usage;
|
||||
const modelUsed = runResult.meta.agentMeta?.model ?? defaultModel;
|
||||
const modelUsed =
|
||||
runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
|
||||
const contextTokensUsed =
|
||||
agentCfgContextTokens ??
|
||||
lookupContextTokens(modelUsed) ??
|
||||
@@ -141,6 +154,7 @@ export function createFollowupRunner(params: {
|
||||
outputTokens: output,
|
||||
totalTokens:
|
||||
promptTokens > 0 ? promptTokens : (usage.total ?? input),
|
||||
modelProvider: fallbackProvider ?? entry.modelProvider,
|
||||
model: modelUsed,
|
||||
contextTokens: contextTokensUsed ?? entry.contextTokens,
|
||||
updatedAt: Date.now(),
|
||||
@@ -154,6 +168,7 @@ export function createFollowupRunner(params: {
|
||||
if (entry) {
|
||||
sessionStore[sessionKey] = {
|
||||
...entry,
|
||||
modelProvider: fallbackProvider ?? entry.modelProvider,
|
||||
model: modelUsed ?? entry.model,
|
||||
contextTokens: contextTokensUsed ?? entry.contextTokens,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user