fix/heartbeat ok delivery filter (#246)

* cron: skip delivery for HEARTBEAT_OK responses

When an isolated cron job has deliver:true, skip message delivery if the
response is just HEARTBEAT_OK (or contains HEARTBEAT_OK at the edges with
short remaining content <= 30 chars). This allows cron jobs to silently
ack when there is nothing to report, while still delivering actual
content when there is something meaningful to say.

Media is still delivered even if text is HEARTBEAT_OK, since the
presence of media indicates there's something to share.

* fix(heartbeat): make ack padding configurable

* chore(deps): update to latest

---------

Co-authored-by: Josh Lehman <josh@martian.engineering>
This commit is contained in:
Peter Steinberger
2026-01-05 22:52:13 +00:00
committed by GitHub
parent dae7f560a5
commit f790f3f3ba
17 changed files with 549 additions and 397 deletions

View File

@@ -341,328 +341,336 @@ export async function runEmbeddedPiAgent(params: {
let restoreSkillEnv: (() => void) | undefined;
process.chdir(resolvedWorkspace);
try {
const shouldLoadSkillEntries =
!params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
const skillEntries = shouldLoadSkillEntries
? loadWorkspaceSkillEntries(resolvedWorkspace)
: [];
const skillsSnapshot =
params.skillsSnapshot ??
buildWorkspaceSkillSnapshot(resolvedWorkspace, {
const shouldLoadSkillEntries =
!params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
const skillEntries = shouldLoadSkillEntries
? loadWorkspaceSkillEntries(resolvedWorkspace)
: [];
const skillsSnapshot =
params.skillsSnapshot ??
buildWorkspaceSkillSnapshot(resolvedWorkspace, {
config: params.config,
entries: skillEntries,
});
const sandboxSessionKey =
params.sessionKey?.trim() || params.sessionId;
const sandbox = await resolveSandboxContext({
config: params.config,
entries: skillEntries,
});
const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId;
const sandbox = await resolveSandboxContext({
config: params.config,
sessionKey: sandboxSessionKey,
workspaceDir: resolvedWorkspace,
});
restoreSkillEnv = params.skillsSnapshot
? applySkillEnvOverridesFromSnapshot({
snapshot: params.skillsSnapshot,
config: params.config,
})
: applySkillEnvOverrides({
skills: skillEntries ?? [],
config: params.config,
});
const bootstrapFiles =
await loadWorkspaceBootstrapFiles(resolvedWorkspace);
const contextFiles = buildBootstrapContextFiles(bootstrapFiles);
const promptSkills = resolvePromptSkills(skillsSnapshot, skillEntries);
// Tool schemas must be provider-compatible (OpenAI requires top-level `type: "object"`).
// `createClawdbotCodingTools()` normalizes schemas so the session can pass them through unchanged.
const tools = createClawdbotCodingTools({
bash: {
...params.config?.agent?.bash,
elevated: params.bashElevated,
},
sandbox,
surface: params.surface,
sessionKey: params.sessionKey ?? params.sessionId,
config: params.config,
});
const machineName = await getMachineDisplayName();
const runtimeInfo = {
host: machineName,
os: `${os.type()} ${os.release()}`,
arch: os.arch(),
node: process.version,
model: `${provider}/${modelId}`,
};
const sandboxInfo = buildEmbeddedSandboxInfo(sandbox);
const reasoningTagHint = provider === "ollama";
const systemPrompt = buildSystemPrompt({
appendPrompt: buildAgentSystemPromptAppend({
sessionKey: sandboxSessionKey,
workspaceDir: resolvedWorkspace,
defaultThinkLevel: thinkLevel,
extraSystemPrompt: params.extraSystemPrompt,
ownerNumbers: params.ownerNumbers,
reasoningTagHint,
runtimeInfo,
sandboxInfo,
toolNames: tools.map((tool) => tool.name),
}),
contextFiles,
skills: promptSkills,
cwd: resolvedWorkspace,
tools,
});
});
restoreSkillEnv = params.skillsSnapshot
? applySkillEnvOverridesFromSnapshot({
snapshot: params.skillsSnapshot,
config: params.config,
})
: applySkillEnvOverrides({
skills: skillEntries ?? [],
config: params.config,
});
const sessionManager = SessionManager.open(params.sessionFile);
const settingsManager = SettingsManager.create(
resolvedWorkspace,
agentDir,
);
// Split tools into built-in (recognized by pi-coding-agent SDK) and custom (clawdbot-specific)
const builtInToolNames = new Set(["read", "bash", "edit", "write"]);
const builtInTools = tools.filter((t) => builtInToolNames.has(t.name));
const customTools = toToolDefinitions(
tools.filter((t) => !builtInToolNames.has(t.name)),
);
const { session } = await createAgentSession({
cwd: resolvedWorkspace,
agentDir,
authStorage,
modelRegistry,
model,
thinkingLevel,
systemPrompt,
// Built-in tools recognized by pi-coding-agent SDK
tools: builtInTools,
// Custom clawdbot tools (browser, canvas, nodes, cron, etc.)
customTools,
sessionManager,
settingsManager,
skills: promptSkills,
contextFiles,
});
const prior = await sanitizeSessionMessagesImages(
session.messages,
"session:history",
);
if (prior.length > 0) {
session.agent.replaceMessages(prior);
}
let aborted = Boolean(params.abortSignal?.aborted);
const abortRun = () => {
aborted = true;
void session.abort();
};
const queueHandle: EmbeddedPiQueueHandle = {
queueMessage: async (text: string) => {
await session.steer(text);
},
isStreaming: () => session.isStreaming,
abort: abortRun,
};
ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle);
const {
assistantTexts,
toolMetas,
unsubscribe,
waitForCompactionRetry,
} = subscribeEmbeddedPiSession({
session,
runId: params.runId,
verboseLevel: params.verboseLevel,
shouldEmitToolResult: params.shouldEmitToolResult,
onToolResult: params.onToolResult,
onBlockReply: params.onBlockReply,
blockReplyBreak: params.blockReplyBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: params.onPartialReply,
onAgentEvent: params.onAgentEvent,
enforceFinalTag: params.enforceFinalTag,
});
let abortWarnTimer: NodeJS.Timeout | undefined;
const abortTimer = setTimeout(
() => {
log.warn(
`embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`,
);
abortRun();
if (!abortWarnTimer) {
abortWarnTimer = setTimeout(() => {
if (!session.isStreaming) return;
log.warn(
`embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`,
);
}, 10_000);
}
},
Math.max(1, params.timeoutMs),
);
let messagesSnapshot: AgentMessage[] = [];
let sessionIdUsed = session.sessionId;
const onAbort = () => {
abortRun();
};
if (params.abortSignal) {
if (params.abortSignal.aborted) {
onAbort();
} else {
params.abortSignal.addEventListener("abort", onAbort, {
once: true,
});
}
}
let promptError: unknown = null;
try {
const promptStartedAt = Date.now();
log.debug(
`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`,
const bootstrapFiles =
await loadWorkspaceBootstrapFiles(resolvedWorkspace);
const contextFiles = buildBootstrapContextFiles(bootstrapFiles);
const promptSkills = resolvePromptSkills(
skillsSnapshot,
skillEntries,
);
// Tool schemas must be provider-compatible (OpenAI requires top-level `type: "object"`).
// `createClawdbotCodingTools()` normalizes schemas so the session can pass them through unchanged.
const tools = createClawdbotCodingTools({
bash: {
...params.config?.agent?.bash,
elevated: params.bashElevated,
},
sandbox,
surface: params.surface,
sessionKey: params.sessionKey ?? params.sessionId,
config: params.config,
});
const machineName = await getMachineDisplayName();
const runtimeInfo = {
host: machineName,
os: `${os.type()} ${os.release()}`,
arch: os.arch(),
node: process.version,
model: `${provider}/${modelId}`,
};
const sandboxInfo = buildEmbeddedSandboxInfo(sandbox);
const reasoningTagHint = provider === "ollama";
const systemPrompt = buildSystemPrompt({
appendPrompt: buildAgentSystemPromptAppend({
workspaceDir: resolvedWorkspace,
defaultThinkLevel: thinkLevel,
extraSystemPrompt: params.extraSystemPrompt,
ownerNumbers: params.ownerNumbers,
reasoningTagHint,
runtimeInfo,
sandboxInfo,
toolNames: tools.map((tool) => tool.name),
}),
contextFiles,
skills: promptSkills,
cwd: resolvedWorkspace,
tools,
});
const sessionManager = SessionManager.open(params.sessionFile);
const settingsManager = SettingsManager.create(
resolvedWorkspace,
agentDir,
);
// Split tools into built-in (recognized by pi-coding-agent SDK) and custom (clawdbot-specific)
const builtInToolNames = new Set(["read", "bash", "edit", "write"]);
const builtInTools = tools.filter((t) =>
builtInToolNames.has(t.name),
);
const customTools = toToolDefinitions(
tools.filter((t) => !builtInToolNames.has(t.name)),
);
const { session } = await createAgentSession({
cwd: resolvedWorkspace,
agentDir,
authStorage,
modelRegistry,
model,
thinkingLevel,
systemPrompt,
// Built-in tools recognized by pi-coding-agent SDK
tools: builtInTools,
// Custom clawdbot tools (browser, canvas, nodes, cron, etc.)
customTools,
sessionManager,
settingsManager,
skills: promptSkills,
contextFiles,
});
const prior = await sanitizeSessionMessagesImages(
session.messages,
"session:history",
);
if (prior.length > 0) {
session.agent.replaceMessages(prior);
}
let aborted = Boolean(params.abortSignal?.aborted);
const abortRun = () => {
aborted = true;
void session.abort();
};
const queueHandle: EmbeddedPiQueueHandle = {
queueMessage: async (text: string) => {
await session.steer(text);
},
isStreaming: () => session.isStreaming,
abort: abortRun,
};
ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle);
const {
assistantTexts,
toolMetas,
unsubscribe,
waitForCompactionRetry,
} = subscribeEmbeddedPiSession({
session,
runId: params.runId,
verboseLevel: params.verboseLevel,
shouldEmitToolResult: params.shouldEmitToolResult,
onToolResult: params.onToolResult,
onBlockReply: params.onBlockReply,
blockReplyBreak: params.blockReplyBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: params.onPartialReply,
onAgentEvent: params.onAgentEvent,
enforceFinalTag: params.enforceFinalTag,
});
let abortWarnTimer: NodeJS.Timeout | undefined;
const abortTimer = setTimeout(
() => {
log.warn(
`embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`,
);
abortRun();
if (!abortWarnTimer) {
abortWarnTimer = setTimeout(() => {
if (!session.isStreaming) return;
log.warn(
`embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`,
);
}, 10_000);
}
},
Math.max(1, params.timeoutMs),
);
let messagesSnapshot: AgentMessage[] = [];
let sessionIdUsed = session.sessionId;
const onAbort = () => {
abortRun();
};
if (params.abortSignal) {
if (params.abortSignal.aborted) {
onAbort();
} else {
params.abortSignal.addEventListener("abort", onAbort, {
once: true,
});
}
}
let promptError: unknown = null;
try {
await session.prompt(params.prompt);
} catch (err) {
promptError = err;
} finally {
const promptStartedAt = Date.now();
log.debug(
`embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`,
`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`,
);
try {
await session.prompt(params.prompt);
} catch (err) {
promptError = err;
} finally {
log.debug(
`embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`,
);
}
await waitForCompactionRetry();
messagesSnapshot = session.messages.slice();
sessionIdUsed = session.sessionId;
} finally {
clearTimeout(abortTimer);
if (abortWarnTimer) {
clearTimeout(abortWarnTimer);
abortWarnTimer = undefined;
}
unsubscribe();
if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) {
ACTIVE_EMBEDDED_RUNS.delete(params.sessionId);
notifyEmbeddedRunEnded(params.sessionId);
}
session.dispose();
params.abortSignal?.removeEventListener?.("abort", onAbort);
}
await waitForCompactionRetry();
messagesSnapshot = session.messages.slice();
sessionIdUsed = session.sessionId;
} finally {
clearTimeout(abortTimer);
if (abortWarnTimer) {
clearTimeout(abortWarnTimer);
abortWarnTimer = undefined;
if (promptError && !aborted) {
const fallbackThinking = pickFallbackThinkingLevel({
message:
promptError instanceof Error
? promptError.message
: String(promptError),
attempted: attemptedThinking,
});
if (fallbackThinking) {
log.warn(
`unsupported thinking level for ${provider}/${modelId}; retrying with ${fallbackThinking}`,
);
thinkLevel = fallbackThinking;
continue;
}
throw promptError;
}
unsubscribe();
if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) {
ACTIVE_EMBEDDED_RUNS.delete(params.sessionId);
notifyEmbeddedRunEnded(params.sessionId);
}
session.dispose();
params.abortSignal?.removeEventListener?.("abort", onAbort);
}
if (promptError && !aborted) {
const lastAssistant = messagesSnapshot
.slice()
.reverse()
.find((m) => (m as AgentMessage)?.role === "assistant") as
| AssistantMessage
| undefined;
const fallbackThinking = pickFallbackThinkingLevel({
message:
promptError instanceof Error
? promptError.message
: String(promptError),
message: lastAssistant?.errorMessage,
attempted: attemptedThinking,
});
if (fallbackThinking) {
if (fallbackThinking && !aborted) {
log.warn(
`unsupported thinking level for ${provider}/${modelId}; retrying with ${fallbackThinking}`,
);
thinkLevel = fallbackThinking;
continue;
}
throw promptError;
}
const lastAssistant = messagesSnapshot
.slice()
.reverse()
.find((m) => (m as AgentMessage)?.role === "assistant") as
| AssistantMessage
| undefined;
const fallbackThinking = pickFallbackThinkingLevel({
message: lastAssistant?.errorMessage,
attempted: attemptedThinking,
});
if (fallbackThinking && !aborted) {
log.warn(
`unsupported thinking level for ${provider}/${modelId}; retrying with ${fallbackThinking}`,
);
thinkLevel = fallbackThinking;
continue;
}
const fallbackConfigured =
(params.config?.agent?.modelFallbacks?.length ?? 0) > 0;
if (fallbackConfigured && isRateLimitAssistantError(lastAssistant)) {
const message =
lastAssistant?.errorMessage?.trim() ||
(lastAssistant ? formatAssistantErrorText(lastAssistant) : "") ||
"LLM request rate limited.";
throw new Error(message);
}
const usage = lastAssistant?.usage;
const agentMeta: EmbeddedPiAgentMeta = {
sessionId: sessionIdUsed,
provider: lastAssistant?.provider ?? provider,
model: lastAssistant?.model ?? model.id,
usage: usage
? {
input: usage.input,
output: usage.output,
cacheRead: usage.cacheRead,
cacheWrite: usage.cacheWrite,
total: usage.totalTokens,
}
: undefined,
};
const replyItems: Array<{ text: string; media?: string[] }> = [];
const errorText = lastAssistant
? formatAssistantErrorText(lastAssistant)
: undefined;
if (errorText) replyItems.push({ text: errorText });
const inlineToolResults =
params.verboseLevel === "on" &&
!params.onPartialReply &&
!params.onToolResult &&
toolMetas.length > 0;
if (inlineToolResults) {
for (const { toolName, meta } of toolMetas) {
const agg = formatToolAggregate(toolName, meta ? [meta] : []);
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(agg);
if (cleanedText)
replyItems.push({ text: cleanedText, media: mediaUrls });
const fallbackConfigured =
(params.config?.agent?.modelFallbacks?.length ?? 0) > 0;
if (fallbackConfigured && isRateLimitAssistantError(lastAssistant)) {
const message =
lastAssistant?.errorMessage?.trim() ||
(lastAssistant ? formatAssistantErrorText(lastAssistant) : "") ||
"LLM request rate limited.";
throw new Error(message);
}
}
for (const text of assistantTexts.length
? assistantTexts
: lastAssistant
? [extractAssistantText(lastAssistant)]
: []) {
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) continue;
replyItems.push({ text: cleanedText, media: mediaUrls });
}
const usage = lastAssistant?.usage;
const agentMeta: EmbeddedPiAgentMeta = {
sessionId: sessionIdUsed,
provider: lastAssistant?.provider ?? provider,
model: lastAssistant?.model ?? model.id,
usage: usage
? {
input: usage.input,
output: usage.output,
cacheRead: usage.cacheRead,
cacheWrite: usage.cacheWrite,
total: usage.totalTokens,
}
: undefined,
};
const payloads = replyItems
.map((item) => ({
text: item.text?.trim() ? item.text.trim() : undefined,
mediaUrls: item.media?.length ? item.media : undefined,
mediaUrl: item.media?.[0],
}))
.filter(
(p) =>
p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0),
const replyItems: Array<{ text: string; media?: string[] }> = [];
const errorText = lastAssistant
? formatAssistantErrorText(lastAssistant)
: undefined;
if (errorText) replyItems.push({ text: errorText });
const inlineToolResults =
params.verboseLevel === "on" &&
!params.onPartialReply &&
!params.onToolResult &&
toolMetas.length > 0;
if (inlineToolResults) {
for (const { toolName, meta } of toolMetas) {
const agg = formatToolAggregate(toolName, meta ? [meta] : []);
const { text: cleanedText, mediaUrls } =
splitMediaFromOutput(agg);
if (cleanedText)
replyItems.push({ text: cleanedText, media: mediaUrls });
}
}
for (const text of assistantTexts.length
? assistantTexts
: lastAssistant
? [extractAssistantText(lastAssistant)]
: []) {
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0))
continue;
replyItems.push({ text: cleanedText, media: mediaUrls });
}
const payloads = replyItems
.map((item) => ({
text: item.text?.trim() ? item.text.trim() : undefined,
mediaUrls: item.media?.length ? item.media : undefined,
mediaUrl: item.media?.[0],
}))
.filter(
(p) =>
p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0),
);
log.debug(
`embedded run done: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - started} aborted=${aborted}`,
);
log.debug(
`embedded run done: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - started} aborted=${aborted}`,
);
return {
payloads: payloads.length ? payloads : undefined,
meta: {
durationMs: Date.now() - started,
agentMeta,
aborted,
},
};
return {
payloads: payloads.length ? payloads : undefined,
meta: {
durationMs: Date.now() - started,
agentMeta,
aborted,
},
};
} finally {
restoreSkillEnv?.();
process.chdir(prevCwd);