merge upstream/main

This commit is contained in:
Peter Steinberger
2026-01-06 23:09:01 +01:00
365 changed files with 19758 additions and 4684 deletions

View File

@@ -51,6 +51,8 @@ function createTyping(): TypingController {
startTypingLoop: vi.fn(async () => {}),
startTypingOnText: vi.fn(async () => {}),
refreshTypingTtl: vi.fn(),
markRunComplete: vi.fn(),
markDispatchIdle: vi.fn(),
cleanup: vi.fn(),
};
}
@@ -70,7 +72,7 @@ function createMinimalRun(params?: {
const typing = createTyping();
const opts = params?.opts;
const sessionCtx = {
Surface: "whatsapp",
Provider: "whatsapp",
MessageSid: "msg",
} as unknown as TemplateContext;
const resolvedQueue = { mode: "interrupt" } as unknown as QueueSettings;
@@ -82,7 +84,7 @@ function createMinimalRun(params?: {
run: {
sessionId: "session",
sessionKey,
surface: "whatsapp",
messageProvider: "whatsapp",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
config: {},
@@ -208,7 +210,6 @@ describe("runReplyAgent typing (heartbeat)", () => {
expect(payloads[0]?.text).toContain("count 1");
expect(sessionStore.main.compactionCount).toBe(1);
});
it("resets corrupted Gemini sessions and deletes transcripts", async () => {
const prevStateDir = process.env.CLAWDBOT_STATE_DIR;
const stateDir = await fs.mkdtemp(
@@ -354,4 +355,26 @@ describe("runReplyAgent typing (heartbeat)", () => {
}
}
});
it("rewrites Bun socket errors into friendly text", async () => {
runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({
payloads: [
{
text: "TypeError: The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()",
isError: true,
},
],
meta: {},
}));
const { run } = createMinimalRun();
const res = await run();
const payloads = Array.isArray(res) ? res : res ? [res] : [];
expect(payloads.length).toBe(1);
expect(payloads[0]?.text).toContain("LLM connection failed");
expect(payloads[0]?.text).toContain(
"socket connection was closed unexpectedly",
);
expect(payloads[0]?.text).toContain("```");
});
});

View File

@@ -7,6 +7,7 @@ import {
queueEmbeddedPiMessage,
runEmbeddedPiAgent,
} from "../../agents/pi-embedded.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import {
loadSessionStore,
resolveSessionTranscriptPath,
@@ -32,6 +33,21 @@ import { extractReplyToTag } from "./reply-tags.js";
import { incrementCompactionCount } from "./session-updates.js";
import type { TypingController } from "./typing.js";
const BUN_FETCH_SOCKET_ERROR_RE = /socket connection was closed unexpectedly/i;
const isBunFetchSocketError = (message?: string) =>
Boolean(message && BUN_FETCH_SOCKET_ERROR_RE.test(message));
const formatBunFetchSocketError = (message: string) => {
const trimmed = message.trim();
return [
"⚠️ LLM connection failed. This could be due to server issues, network problems, or context length exceeded (e.g., with local LLMs like LM Studio). Original error:",
"```",
trimmed || "Unknown error",
"```",
].join("\n");
};
export async function runReplyAgent(params: {
commandBody: string;
followupRun: FollowupRun;
@@ -107,6 +123,7 @@ export async function runReplyAgent(params: {
const streamedPayloadKeys = new Set<string>();
const pendingStreamedPayloadKeys = new Set<string>();
const pendingBlockTasks = new Set<Promise<void>>();
const pendingToolTasks = new Set<Promise<void>>();
let didStreamBlockReply = false;
const buildPayloadKey = (payload: ReplyPayload) => {
const text = payload.text?.trim() ?? "";
@@ -188,9 +205,11 @@ export async function runReplyAgent(params: {
runEmbeddedPiAgent({
sessionId: followupRun.run.sessionId,
sessionKey,
surface: sessionCtx.Surface?.trim().toLowerCase() || undefined,
messageProvider:
sessionCtx.Provider?.trim().toLowerCase() || undefined,
sessionFile: followupRun.run.sessionFile,
workspaceDir: followupRun.run.workspaceDir,
agentDir: followupRun.run.agentDir,
config: followupRun.run.config,
skillsSnapshot: followupRun.run.skillsSnapshot,
prompt: commandBody,
@@ -239,7 +258,8 @@ export async function runReplyAgent(params: {
: undefined,
onAgentEvent: (evt) => {
if (evt.stream !== "compaction") return;
const phase = String(evt.data.phase ?? "");
const phase =
typeof evt.data.phase === "string" ? evt.data.phase : "";
const willRetry = Boolean(evt.data.willRetry);
if (phase === "end" && !willRetry) {
autoCompactionCompleted = true;
@@ -310,33 +330,45 @@ export async function runReplyAgent(params: {
: undefined,
shouldEmitToolResult,
onToolResult: opts?.onToolResult
? async (payload) => {
let text = payload.text;
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, {
mode: "message",
? (payload) => {
// `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them.
// If a tool callback starts typing after the run finalized, we can end up with
// a typing loop that never sees a matching markRunComplete(). Track and drain.
const task = (async () => {
let text = payload.text;
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, {
mode: "message",
});
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose(
"Stripped stray HEARTBEAT_OK token from reply",
);
}
if (
stripped.shouldSkip &&
(payload.mediaUrls?.length ?? 0) === 0
) {
return;
}
text = stripped.text;
}
if (!isHeartbeat) {
await typing.startTypingOnText(text);
}
await opts.onToolResult?.({
text,
mediaUrls: payload.mediaUrls,
});
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose(
"Stripped stray HEARTBEAT_OK token from reply",
);
}
if (
stripped.shouldSkip &&
(payload.mediaUrls?.length ?? 0) === 0
) {
return;
}
text = stripped.text;
}
if (!isHeartbeat) {
await typing.startTypingOnText(text);
}
await opts.onToolResult?.({
text,
mediaUrls: payload.mediaUrls,
});
})()
.catch((err) => {
logVerbose(`tool result delivery failed: ${String(err)}`);
})
.finally(() => {
pendingToolTasks.delete(task);
});
pendingToolTasks.add(task);
}
: undefined,
}),
@@ -408,16 +440,28 @@ export async function runReplyAgent(params: {
}
const payloadArray = runResult.payloads ?? [];
if (payloadArray.length === 0) return finalizeWithFollowup(undefined);
if (pendingBlockTasks.size > 0) {
await Promise.allSettled(pendingBlockTasks);
}
if (pendingToolTasks.size > 0) {
await Promise.allSettled(pendingToolTasks);
}
// Drain any late tool/block deliveries before deciding there's "nothing to send".
// Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and
// keep the typing indicator stuck.
if (payloadArray.length === 0) return finalizeWithFollowup(undefined);
const sanitizedPayloads = isHeartbeat
? payloadArray
: payloadArray.flatMap((payload) => {
const text = payload.text;
if (!text || !text.includes("HEARTBEAT_OK")) return [payload];
let text = payload.text;
if (payload.isError && text && isBunFetchSocketError(text)) {
text = formatBunFetchSocketError(text);
}
if (!text || !text.includes("HEARTBEAT_OK"))
return [{ ...payload, text }];
const stripped = stripHeartbeatToken(text, { mode: "message" });
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
@@ -485,7 +529,7 @@ export async function runReplyAgent(params: {
sessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
if (usage) {
if (hasNonzeroUsage(usage)) {
const entry = sessionEntry ?? sessionStore[sessionKey];
if (entry) {
const input = usage.input ?? 0;
@@ -552,6 +596,6 @@ export async function runReplyAgent(params: {
finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
);
} finally {
typing.cleanup();
typing.markRunComplete();
}
}

View File

@@ -1,10 +1,10 @@
import type { ClawdbotConfig } from "../../config/config.js";
import { resolveTextChunkLimit, type TextChunkSurface } from "../chunk.js";
import { resolveTextChunkLimit, type TextChunkProvider } from "../chunk.js";
const DEFAULT_BLOCK_STREAM_MIN = 800;
const DEFAULT_BLOCK_STREAM_MAX = 1200;
const BLOCK_CHUNK_SURFACES = new Set<TextChunkSurface>([
const BLOCK_CHUNK_PROVIDERS = new Set<TextChunkProvider>([
"whatsapp",
"telegram",
"discord",
@@ -14,24 +14,26 @@ const BLOCK_CHUNK_SURFACES = new Set<TextChunkSurface>([
"webchat",
]);
function normalizeChunkSurface(surface?: string): TextChunkSurface | undefined {
if (!surface) return undefined;
const cleaned = surface.trim().toLowerCase();
return BLOCK_CHUNK_SURFACES.has(cleaned as TextChunkSurface)
? (cleaned as TextChunkSurface)
function normalizeChunkProvider(
provider?: string,
): TextChunkProvider | undefined {
if (!provider) return undefined;
const cleaned = provider.trim().toLowerCase();
return BLOCK_CHUNK_PROVIDERS.has(cleaned as TextChunkProvider)
? (cleaned as TextChunkProvider)
: undefined;
}
export function resolveBlockStreamingChunking(
cfg: ClawdbotConfig | undefined,
surface?: string,
provider?: string,
): {
minChars: number;
maxChars: number;
breakPreference: "paragraph" | "newline" | "sentence";
} {
const surfaceKey = normalizeChunkSurface(surface);
const textLimit = resolveTextChunkLimit(cfg, surfaceKey);
const providerKey = normalizeChunkProvider(provider);
const textLimit = resolveTextChunkLimit(cfg, providerKey);
const chunkCfg = cfg?.agent?.blockStreamingChunk;
const maxRequested = Math.max(
1,

View File

@@ -27,6 +27,7 @@ import { normalizeE164 } from "../../utils.js";
import { resolveHeartbeatSeconds } from "../../web/reconnect.js";
import { getWebAuthAgeMs, webAuthExists } from "../../web/session.js";
import { resolveCommandAuthorization } from "../command-auth.js";
import { shouldHandleTextCommands } from "../commands-registry.js";
import {
normalizeGroupActivation,
parseActivationCommand,
@@ -48,7 +49,8 @@ import { incrementCompactionCount } from "./session-updates.js";
export type CommandContext = {
surface: string;
isWhatsAppSurface: boolean;
provider: string;
isWhatsAppProvider: boolean;
ownerList: string[];
isAuthorizedSender: boolean;
senderE164?: string;
@@ -102,11 +104,7 @@ function extractCompactInstructions(params: {
const trimmed = stripped.trim();
if (!trimmed) return undefined;
const lowered = trimmed.toLowerCase();
const prefix = lowered.startsWith("/compact")
? "/compact"
: lowered.startsWith("compact")
? "compact"
: null;
const prefix = lowered.startsWith("/compact") ? "/compact" : null;
if (!prefix) return undefined;
let rest = trimmed.slice(prefix.length).trimStart();
if (rest.startsWith(":")) rest = rest.slice(1).trimStart();
@@ -127,7 +125,8 @@ export function buildCommandContext(params: {
cfg,
commandAuthorized: params.commandAuthorized,
});
const surface = (ctx.Surface ?? "").trim().toLowerCase();
const surface = (ctx.Surface ?? ctx.Provider ?? "").trim().toLowerCase();
const provider = (ctx.Provider ?? surface).trim().toLowerCase();
const abortKey =
sessionKey ?? (auth.from || undefined) ?? (auth.to || undefined);
const rawBodyNormalized = triggerBodyNormalized;
@@ -137,7 +136,8 @@ export function buildCommandContext(params: {
return {
surface,
isWhatsAppSurface: auth.isWhatsAppSurface,
provider,
isWhatsAppProvider: auth.isWhatsAppProvider,
ownerList: auth.ownerList,
isAuthorizedSender: auth.isAuthorizedSender,
senderE164: auth.senderE164,
@@ -197,9 +197,7 @@ export async function handleCommands(params: {
const resetRequested =
command.commandBodyNormalized === "/reset" ||
command.commandBodyNormalized === "reset" ||
command.commandBodyNormalized === "/new" ||
command.commandBodyNormalized === "new";
command.commandBodyNormalized === "/new";
if (resetRequested && !command.isAuthorizedSender) {
logVerbose(
`Ignoring /reset from unauthorized sender: ${command.senderE164 || "<unknown>"}`,
@@ -213,8 +211,13 @@ export async function handleCommands(params: {
const sendPolicyCommand = parseSendPolicyCommand(
command.commandBodyNormalized,
);
const allowTextCommands = shouldHandleTextCommands({
cfg,
surface: command.surface,
commandSource: ctx.CommandSource,
});
if (activationCommand.hasCommand) {
if (allowTextCommands && activationCommand.hasCommand) {
if (!isGroup) {
return {
shouldContinue: false,
@@ -226,14 +229,14 @@ export async function handleCommands(params: {
? normalizeE164(command.senderE164)
: "";
const isActivationOwner =
!command.isWhatsAppSurface || activationOwnerList.length === 0
!command.isWhatsAppProvider || activationOwnerList.length === 0
? command.isAuthorizedSender
: Boolean(activationSenderE164) &&
activationOwnerList.includes(activationSenderE164);
if (
!command.isAuthorizedSender ||
(command.isWhatsAppSurface && !isActivationOwner)
(command.isWhatsAppProvider && !isActivationOwner)
) {
logVerbose(
`Ignoring /activation from unauthorized sender in group: ${command.senderE164 || "<unknown>"}`,
@@ -261,7 +264,7 @@ export async function handleCommands(params: {
};
}
if (sendPolicyCommand.hasCommand) {
if (allowTextCommands && sendPolicyCommand.hasCommand) {
if (!command.isAuthorizedSender) {
logVerbose(
`Ignoring /send from unauthorized sender: ${command.senderE164 || "<unknown>"}`,
@@ -298,11 +301,7 @@ export async function handleCommands(params: {
};
}
if (
command.commandBodyNormalized === "/restart" ||
command.commandBodyNormalized === "restart" ||
command.commandBodyNormalized.startsWith("/restart ")
) {
if (allowTextCommands && command.commandBodyNormalized === "/restart") {
if (!command.isAuthorizedSender) {
logVerbose(
`Ignoring /restart from unauthorized sender: ${command.senderE164 || "<unknown>"}`,
@@ -318,11 +317,8 @@ export async function handleCommands(params: {
};
}
const helpRequested =
command.commandBodyNormalized === "/help" ||
command.commandBodyNormalized === "help" ||
/(?:^|\s)\/help(?=$|\s|:)\b/i.test(command.commandBodyNormalized);
if (helpRequested) {
const helpRequested = command.commandBodyNormalized === "/help";
if (allowTextCommands && helpRequested) {
if (!command.isAuthorizedSender) {
logVerbose(
`Ignoring /help from unauthorized sender: ${command.senderE164 || "<unknown>"}`,
@@ -334,10 +330,8 @@ export async function handleCommands(params: {
const statusRequested =
directives.hasStatusDirective ||
command.commandBodyNormalized === "/status" ||
command.commandBodyNormalized === "status" ||
command.commandBodyNormalized.startsWith("/status ");
if (statusRequested) {
command.commandBodyNormalized === "/status";
if (allowTextCommands && statusRequested) {
if (!command.isAuthorizedSender) {
logVerbose(
`Ignoring /status from unauthorized sender: ${command.senderE164 || "<unknown>"}`,
@@ -383,9 +377,7 @@ export async function handleCommands(params: {
const compactRequested =
command.commandBodyNormalized === "/compact" ||
command.commandBodyNormalized === "compact" ||
command.commandBodyNormalized.startsWith("/compact ") ||
command.commandBodyNormalized.startsWith("compact ");
command.commandBodyNormalized.startsWith("/compact ");
if (compactRequested) {
if (!command.isAuthorizedSender) {
logVerbose(
@@ -413,7 +405,7 @@ export async function handleCommands(params: {
const result = await compactEmbeddedPiSession({
sessionId,
sessionKey,
surface: command.surface,
messageProvider: command.provider,
sessionFile: resolveSessionTranscriptPath(sessionId),
workspaceDir,
config: cfg,
@@ -462,7 +454,7 @@ export async function handleCommands(params: {
}
const abortRequested = isAbortTrigger(command.rawBodyNormalized);
if (abortRequested) {
if (allowTextCommands && abortRequested) {
if (sessionEntry && sessionStore && sessionKey) {
sessionEntry.abortedLastRun = true;
sessionEntry.updatedAt = Date.now();
@@ -480,7 +472,7 @@ export async function handleCommands(params: {
cfg,
entry: sessionEntry,
sessionKey,
surface: sessionEntry?.surface ?? command.surface,
provider: sessionEntry?.provider ?? command.provider,
chatType: sessionEntry?.chatType,
});
if (sendPolicy === "deny") {

View File

@@ -0,0 +1,46 @@
import type { ClawdbotConfig } from "../../config/config.js";
import { getReplyFromConfig } from "../reply.js";
import type { MsgContext } from "../templating.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
type DispatchFromConfigResult = {
queuedFinal: boolean;
counts: Record<ReplyDispatchKind, number>;
};
export async function dispatchReplyFromConfig(params: {
ctx: MsgContext;
cfg: ClawdbotConfig;
dispatcher: ReplyDispatcher;
replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
replyResolver?: typeof getReplyFromConfig;
}): Promise<DispatchFromConfigResult> {
const replyResult = await (params.replyResolver ?? getReplyFromConfig)(
params.ctx,
{
...params.replyOptions,
onToolResult: (payload: ReplyPayload) => {
params.dispatcher.sendToolResult(payload);
},
onBlockReply: (payload: ReplyPayload) => {
params.dispatcher.sendBlockReply(payload);
},
},
params.cfg,
);
const replies = replyResult
? Array.isArray(replyResult)
? replyResult
: [replyResult]
: [];
let queuedFinal = false;
for (const reply of replies) {
queuedFinal = params.dispatcher.sendFinalReply(reply) || queuedFinal;
}
await params.dispatcher.waitForIdle();
return { queuedFinal, counts: params.dispatcher.getQueuedCounts() };
}

View File

@@ -37,6 +37,8 @@ function createTyping(): TypingController {
startTypingLoop: vi.fn(async () => {}),
startTypingOnText: vi.fn(async () => {}),
refreshTypingTtl: vi.fn(),
markRunComplete: vi.fn(),
markDispatchIdle: vi.fn(),
cleanup: vi.fn(),
};
}
@@ -88,7 +90,7 @@ describe("createFollowupRunner compaction", () => {
run: {
sessionId: "session",
sessionKey: "main",
surface: "whatsapp",
messageProvider: "whatsapp",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
config: {},

View File

@@ -3,6 +3,7 @@ import { lookupContextTokens } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { runWithModelFallback } from "../../agents/model-fallback.js";
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
@@ -58,153 +59,160 @@ export function createFollowupRunner(params: {
};
return async (queued: FollowupRun) => {
const runId = crypto.randomUUID();
if (queued.run.sessionKey) {
registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey });
}
let autoCompactionCompleted = false;
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
let fallbackProvider = queued.run.provider;
let fallbackModel = queued.run.model;
try {
const fallbackResult = await runWithModelFallback({
cfg: queued.run.config,
provider: queued.run.provider,
model: queued.run.model,
run: (provider, model) =>
runEmbeddedPiAgent({
sessionId: queued.run.sessionId,
sessionKey: queued.run.sessionKey,
surface: queued.run.surface,
sessionFile: queued.run.sessionFile,
workspaceDir: queued.run.workspaceDir,
config: queued.run.config,
skillsSnapshot: queued.run.skillsSnapshot,
prompt: queued.prompt,
extraSystemPrompt: queued.run.extraSystemPrompt,
ownerNumbers: queued.run.ownerNumbers,
enforceFinalTag: queued.run.enforceFinalTag,
provider,
model,
authProfileId: queued.run.authProfileId,
thinkLevel: queued.run.thinkLevel,
verboseLevel: queued.run.verboseLevel,
bashElevated: queued.run.bashElevated,
timeoutMs: queued.run.timeoutMs,
runId,
blockReplyBreak: queued.run.blockReplyBreak,
onAgentEvent: (evt) => {
if (evt.stream !== "compaction") return;
const phase = String(evt.data.phase ?? "");
const willRetry = Boolean(evt.data.willRetry);
if (phase === "end" && !willRetry) {
autoCompactionCompleted = true;
}
},
}),
});
runResult = fallbackResult.result;
fallbackProvider = fallbackResult.provider;
fallbackModel = fallbackResult.model;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
defaultRuntime.error?.(`Followup agent failed before reply: ${message}`);
return;
}
const payloadArray = runResult.payloads ?? [];
if (payloadArray.length === 0) return;
const sanitizedPayloads = payloadArray.flatMap((payload) => {
const text = payload.text;
if (!text || !text.includes("HEARTBEAT_OK")) return [payload];
const stripped = stripHeartbeatToken(text, { mode: "message" });
const hasMedia =
Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
if (stripped.shouldSkip && !hasMedia) return [];
return [{ ...payload, text: stripped.text }];
});
const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads
.map((payload) => {
const { cleaned, replyToId } = extractReplyToTag(payload.text);
return {
...payload,
text: cleaned ? cleaned : undefined,
replyToId: replyToId ?? payload.replyToId,
};
})
.filter(
(payload) =>
payload.text ||
payload.mediaUrl ||
(payload.mediaUrls && payload.mediaUrls.length > 0),
);
if (replyTaggedPayloads.length === 0) return;
if (autoCompactionCompleted) {
const count = await incrementCompactionCount({
sessionEntry,
sessionStore,
sessionKey,
storePath,
});
if (queued.run.verboseLevel === "on") {
const suffix = typeof count === "number" ? ` (count ${count})` : "";
replyTaggedPayloads.unshift({
text: `🧹 Auto-compaction complete${suffix}.`,
const runId = crypto.randomUUID();
if (queued.run.sessionKey) {
registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey });
}
let autoCompactionCompleted = false;
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
let fallbackProvider = queued.run.provider;
let fallbackModel = queued.run.model;
try {
const fallbackResult = await runWithModelFallback({
cfg: queued.run.config,
provider: queued.run.provider,
model: queued.run.model,
run: (provider, model) =>
runEmbeddedPiAgent({
sessionId: queued.run.sessionId,
sessionKey: queued.run.sessionKey,
messageProvider: queued.run.messageProvider,
sessionFile: queued.run.sessionFile,
workspaceDir: queued.run.workspaceDir,
config: queued.run.config,
skillsSnapshot: queued.run.skillsSnapshot,
prompt: queued.prompt,
extraSystemPrompt: queued.run.extraSystemPrompt,
ownerNumbers: queued.run.ownerNumbers,
enforceFinalTag: queued.run.enforceFinalTag,
provider,
model,
authProfileId: queued.run.authProfileId,
thinkLevel: queued.run.thinkLevel,
verboseLevel: queued.run.verboseLevel,
bashElevated: queued.run.bashElevated,
timeoutMs: queued.run.timeoutMs,
runId,
blockReplyBreak: queued.run.blockReplyBreak,
onAgentEvent: (evt) => {
if (evt.stream !== "compaction") return;
const phase =
typeof evt.data.phase === "string" ? evt.data.phase : "";
const willRetry = Boolean(evt.data.willRetry);
if (phase === "end" && !willRetry) {
autoCompactionCompleted = true;
}
},
}),
});
runResult = fallbackResult.result;
fallbackProvider = fallbackResult.provider;
fallbackModel = fallbackResult.model;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
defaultRuntime.error?.(
`Followup agent failed before reply: ${message}`,
);
return;
}
}
if (sessionStore && sessionKey) {
const usage = runResult.meta.agentMeta?.usage;
const modelUsed =
runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
const contextTokensUsed =
agentCfgContextTokens ??
lookupContextTokens(modelUsed) ??
sessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
const payloadArray = runResult.payloads ?? [];
if (payloadArray.length === 0) return;
const sanitizedPayloads = payloadArray.flatMap((payload) => {
const text = payload.text;
if (!text || !text.includes("HEARTBEAT_OK")) return [payload];
const stripped = stripHeartbeatToken(text, { mode: "message" });
const hasMedia =
Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
if (stripped.shouldSkip && !hasMedia) return [];
return [{ ...payload, text: stripped.text }];
});
if (usage) {
const entry = sessionStore[sessionKey];
if (entry) {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens =
input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
sessionStore[sessionKey] = {
...entry,
inputTokens: input,
outputTokens: output,
totalTokens:
promptTokens > 0 ? promptTokens : (usage.total ?? input),
modelProvider: fallbackProvider ?? entry.modelProvider,
model: modelUsed,
contextTokens: contextTokensUsed ?? entry.contextTokens,
updatedAt: Date.now(),
const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads
.map((payload) => {
const { cleaned, replyToId } = extractReplyToTag(payload.text);
return {
...payload,
text: cleaned ? cleaned : undefined,
replyToId: replyToId ?? payload.replyToId,
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
})
.filter(
(payload) =>
payload.text ||
payload.mediaUrl ||
(payload.mediaUrls && payload.mediaUrls.length > 0),
);
if (replyTaggedPayloads.length === 0) return;
if (autoCompactionCompleted) {
const count = await incrementCompactionCount({
sessionEntry,
sessionStore,
sessionKey,
storePath,
});
if (queued.run.verboseLevel === "on") {
const suffix = typeof count === "number" ? ` (count ${count})` : "";
replyTaggedPayloads.unshift({
text: `🧹 Auto-compaction complete${suffix}.`,
});
}
} else if (modelUsed || contextTokensUsed) {
const entry = sessionStore[sessionKey];
if (entry) {
sessionStore[sessionKey] = {
...entry,
modelProvider: fallbackProvider ?? entry.modelProvider,
model: modelUsed ?? entry.model,
contextTokens: contextTokensUsed ?? entry.contextTokens,
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
if (sessionStore && sessionKey) {
const usage = runResult.meta.agentMeta?.usage;
const modelUsed =
runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
const contextTokensUsed =
agentCfgContextTokens ??
lookupContextTokens(modelUsed) ??
sessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
if (hasNonzeroUsage(usage)) {
const entry = sessionStore[sessionKey];
if (entry) {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens =
input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
sessionStore[sessionKey] = {
...entry,
inputTokens: input,
outputTokens: output,
totalTokens:
promptTokens > 0 ? promptTokens : (usage.total ?? input),
modelProvider: fallbackProvider ?? entry.modelProvider,
model: modelUsed,
contextTokens: contextTokensUsed ?? entry.contextTokens,
updatedAt: Date.now(),
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
} else if (modelUsed || contextTokensUsed) {
const entry = sessionStore[sessionKey];
if (entry) {
sessionStore[sessionKey] = {
...entry,
modelProvider: fallbackProvider ?? entry.modelProvider,
model: modelUsed ?? entry.model,
contextTokens: contextTokensUsed ?? entry.contextTokens,
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
}
}
}
await sendFollowupPayloads(replyTaggedPayloads);
await sendFollowupPayloads(replyTaggedPayloads);
} finally {
typing.markRunComplete();
}
};
}

View File

@@ -19,13 +19,13 @@ describe("resolveGroupRequireMention", () => {
},
};
const ctx: TemplateContext = {
Surface: "discord",
Provider: "discord",
From: "group:123",
GroupRoom: "#general",
GroupSpace: "145",
};
const groupResolution: GroupKeyResolution = {
surface: "discord",
provider: "discord",
id: "123",
chatType: "group",
};
@@ -44,12 +44,12 @@ describe("resolveGroupRequireMention", () => {
},
};
const ctx: TemplateContext = {
Surface: "slack",
Provider: "slack",
From: "slack:channel:C123",
GroupSubject: "#general",
};
const groupResolution: GroupKeyResolution = {
surface: "slack",
provider: "slack",
id: "C123",
chatType: "group",
};

View File

@@ -1,4 +1,5 @@
import type { ClawdbotConfig } from "../../config/config.js";
import { resolveProviderGroupRequireMention } from "../../config/group-policy.js";
import type {
GroupKeyResolution,
SessionEntry,
@@ -49,44 +50,23 @@ export function resolveGroupRequireMention(params: {
groupResolution?: GroupKeyResolution;
}): boolean {
const { cfg, ctx, groupResolution } = params;
const surface = groupResolution?.surface ?? ctx.Surface?.trim().toLowerCase();
const provider =
groupResolution?.provider ?? ctx.Provider?.trim().toLowerCase();
const groupId = groupResolution?.id ?? ctx.From?.replace(/^group:/, "");
const groupRoom = ctx.GroupRoom?.trim() ?? ctx.GroupSubject?.trim();
const groupSpace = ctx.GroupSpace?.trim();
if (surface === "telegram") {
if (groupId) {
const groupConfig = cfg.telegram?.groups?.[groupId];
if (typeof groupConfig?.requireMention === "boolean") {
return groupConfig.requireMention;
}
}
const groupDefault = cfg.telegram?.groups?.["*"]?.requireMention;
if (typeof groupDefault === "boolean") return groupDefault;
return true;
if (
provider === "telegram" ||
provider === "whatsapp" ||
provider === "imessage"
) {
return resolveProviderGroupRequireMention({
cfg,
provider,
groupId,
});
}
if (surface === "whatsapp") {
if (groupId) {
const groupConfig = cfg.whatsapp?.groups?.[groupId];
if (typeof groupConfig?.requireMention === "boolean") {
return groupConfig.requireMention;
}
}
const groupDefault = cfg.whatsapp?.groups?.["*"]?.requireMention;
if (typeof groupDefault === "boolean") return groupDefault;
return true;
}
if (surface === "imessage") {
if (groupId) {
const groupConfig = cfg.imessage?.groups?.[groupId];
if (typeof groupConfig?.requireMention === "boolean") {
return groupConfig.requireMention;
}
}
const groupDefault = cfg.imessage?.groups?.["*"]?.requireMention;
if (typeof groupDefault === "boolean") return groupDefault;
return true;
}
if (surface === "discord") {
if (provider === "discord") {
const guildEntry = resolveDiscordGuildEntry(
cfg.discord?.guilds,
groupSpace,
@@ -111,7 +91,7 @@ export function resolveGroupRequireMention(params: {
}
return true;
}
if (surface === "slack") {
if (provider === "slack") {
const channels = cfg.slack?.channels ?? {};
const keys = Object.keys(channels);
if (keys.length === 0) return true;
@@ -158,18 +138,18 @@ export function buildGroupIntro(params: {
params.defaultActivation;
const subject = params.sessionCtx.GroupSubject?.trim();
const members = params.sessionCtx.GroupMembers?.trim();
const surface = params.sessionCtx.Surface?.trim().toLowerCase();
const surfaceLabel = (() => {
if (!surface) return "chat";
if (surface === "whatsapp") return "WhatsApp";
if (surface === "telegram") return "Telegram";
if (surface === "discord") return "Discord";
if (surface === "webchat") return "WebChat";
return `${surface.at(0)?.toUpperCase() ?? ""}${surface.slice(1)}`;
const provider = params.sessionCtx.Provider?.trim().toLowerCase();
const providerLabel = (() => {
if (!provider) return "chat";
if (provider === "whatsapp") return "WhatsApp";
if (provider === "telegram") return "Telegram";
if (provider === "discord") return "Discord";
if (provider === "webchat") return "WebChat";
return `${provider.at(0)?.toUpperCase() ?? ""}${provider.slice(1)}`;
})();
const subjectLine = subject
? `You are replying inside the ${surfaceLabel} group "${subject}".`
: `You are replying inside a ${surfaceLabel} group chat.`;
? `You are replying inside the ${providerLabel} group "${subject}".`
: `You are replying inside a ${providerLabel} group chat.`;
const membersLine = members ? `Group members: ${members}.` : undefined;
const activationLine =
activation === "always"

View File

@@ -23,9 +23,11 @@ export type FollowupRun = {
summaryLine?: string;
enqueuedAt: number;
run: {
agentId: string;
agentDir: string;
sessionId: string;
sessionKey?: string;
surface?: string;
messageProvider?: string;
sessionFile: string;
workspaceDir: string;
config: ClawdbotConfig;
@@ -425,8 +427,8 @@ export function scheduleFollowupDrain(
}
})();
}
function defaultQueueModeForSurface(surface?: string): QueueMode {
const normalized = surface?.trim().toLowerCase();
function defaultQueueModeForProvider(provider?: string): QueueMode {
const normalized = provider?.trim().toLowerCase();
if (normalized === "discord") return "collect";
if (normalized === "webchat") return "collect";
if (normalized === "whatsapp") return "collect";
@@ -437,23 +439,23 @@ function defaultQueueModeForSurface(surface?: string): QueueMode {
}
export function resolveQueueSettings(params: {
cfg: ClawdbotConfig;
surface?: string;
provider?: string;
sessionEntry?: SessionEntry;
inlineMode?: QueueMode;
inlineOptions?: Partial<QueueSettings>;
}): QueueSettings {
const surfaceKey = params.surface?.trim().toLowerCase();
const providerKey = params.provider?.trim().toLowerCase();
const queueCfg = params.cfg.routing?.queue;
const surfaceModeRaw =
surfaceKey && queueCfg?.bySurface
? (queueCfg.bySurface as Record<string, string | undefined>)[surfaceKey]
const providerModeRaw =
providerKey && queueCfg?.byProvider
? (queueCfg.byProvider as Record<string, string | undefined>)[providerKey]
: undefined;
const resolvedMode =
params.inlineMode ??
normalizeQueueMode(params.sessionEntry?.queueMode) ??
normalizeQueueMode(surfaceModeRaw) ??
normalizeQueueMode(providerModeRaw) ??
normalizeQueueMode(queueCfg?.mode) ??
defaultQueueModeForSurface(surfaceKey);
defaultQueueModeForProvider(providerKey);
const debounceRaw =
params.inlineOptions?.debounceMs ??
params.sessionEntry?.queueDebounceMs ??

View File

@@ -79,4 +79,18 @@ describe("createReplyDispatcher", () => {
await dispatcher.waitForIdle();
expect(delivered).toEqual(["tool", "block", "final"]);
});
it("fires onIdle when the queue drains", async () => {
const deliver = vi.fn(
async () => await new Promise((resolve) => setTimeout(resolve, 5)),
);
const onIdle = vi.fn();
const dispatcher = createReplyDispatcher({ deliver, onIdle });
dispatcher.sendToolResult({ text: "one" });
dispatcher.sendFinalReply({ text: "two" });
await dispatcher.waitForIdle();
expect(onIdle).toHaveBeenCalledTimes(1);
});
});

View File

@@ -1,6 +1,7 @@
import { stripHeartbeatToken } from "../heartbeat.js";
import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../tokens.js";
import type { ReplyPayload } from "../types.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import type { TypingController } from "./typing.js";
export type ReplyDispatchKind = "tool" | "block" | "final";
@@ -18,10 +19,25 @@ export type ReplyDispatcherOptions = {
deliver: ReplyDispatchDeliverer;
responsePrefix?: string;
onHeartbeatStrip?: () => void;
onIdle?: () => void;
onError?: ReplyDispatchErrorHandler;
};
type ReplyDispatcher = {
type ReplyDispatcherWithTypingOptions = Omit<
ReplyDispatcherOptions,
"onIdle"
> & {
onReplyStart?: () => Promise<void> | void;
onIdle?: () => void;
};
type ReplyDispatcherWithTypingResult = {
dispatcher: ReplyDispatcher;
replyOptions: Pick<GetReplyOptions, "onReplyStart" | "onTypingController">;
markDispatchIdle: () => void;
};
export type ReplyDispatcher = {
sendToolResult: (payload: ReplyPayload) => boolean;
sendBlockReply: (payload: ReplyPayload) => boolean;
sendFinalReply: (payload: ReplyPayload) => boolean;
@@ -70,6 +86,8 @@ export function createReplyDispatcher(
options: ReplyDispatcherOptions,
): ReplyDispatcher {
let sendChain: Promise<void> = Promise.resolve();
// Track in-flight deliveries so we can emit a reliable "idle" signal.
let pending = 0;
// Serialize outbound replies to preserve tool/block/final order.
const queuedCounts: Record<ReplyDispatchKind, number> = {
tool: 0,
@@ -81,10 +99,17 @@ export function createReplyDispatcher(
const normalized = normalizeReplyPayload(payload, options);
if (!normalized) return false;
queuedCounts[kind] += 1;
pending += 1;
sendChain = sendChain
.then(() => options.deliver(normalized, { kind }))
.catch((err) => {
options.onError?.(err, { kind });
})
.finally(() => {
pending -= 1;
if (pending === 0) {
options.onIdle?.();
}
});
return true;
};
@@ -97,3 +122,31 @@ export function createReplyDispatcher(
getQueuedCounts: () => ({ ...queuedCounts }),
};
}
export function createReplyDispatcherWithTyping(
options: ReplyDispatcherWithTypingOptions,
): ReplyDispatcherWithTypingResult {
const { onReplyStart, onIdle, ...dispatcherOptions } = options;
let typingController: TypingController | undefined;
const dispatcher = createReplyDispatcher({
...dispatcherOptions,
onIdle: () => {
typingController?.markDispatchIdle();
onIdle?.();
},
});
return {
dispatcher,
replyOptions: {
onReplyStart,
onTypingController: (typing) => {
typingController = typing;
},
},
markDispatchIdle: () => {
typingController?.markDispatchIdle();
onIdle?.();
},
};
}

View File

@@ -7,6 +7,7 @@ import {
DEFAULT_RESET_TRIGGERS,
type GroupKeyResolution,
loadSessionStore,
resolveAgentIdFromSessionKey,
resolveGroupSessionKey,
resolveSessionKey,
resolveStorePath,
@@ -43,6 +44,7 @@ export async function initSessionState(params: {
const { ctx, cfg, commandAuthorized } = params;
const sessionCfg = cfg.session;
const mainKey = sessionCfg?.mainKey ?? "main";
const agentId = resolveAgentIdFromSessionKey(ctx.SessionKey);
const resetTriggers = sessionCfg?.resetTriggers?.length
? sessionCfg.resetTriggers
: DEFAULT_RESET_TRIGGERS;
@@ -51,12 +53,12 @@ export async function initSessionState(params: {
1,
);
const sessionScope = sessionCfg?.scope ?? "per-sender";
const storePath = resolveStorePath(sessionCfg?.store);
const storePath = resolveStorePath(sessionCfg?.store, { agentId });
const sessionStore: Record<string, SessionEntry> =
loadSessionStore(storePath);
let sessionKey: string | undefined;
let sessionEntry: SessionEntry | undefined;
let sessionEntry: SessionEntry;
let sessionId: string | undefined;
let isNewSession = false;
@@ -154,30 +156,30 @@ export async function initSessionState(params: {
queueDrop: baseEntry?.queueDrop,
displayName: baseEntry?.displayName,
chatType: baseEntry?.chatType,
surface: baseEntry?.surface,
provider: baseEntry?.provider,
subject: baseEntry?.subject,
room: baseEntry?.room,
space: baseEntry?.space,
};
if (groupResolution?.surface) {
const surface = groupResolution.surface;
if (groupResolution?.provider) {
const provider = groupResolution.provider;
const subject = ctx.GroupSubject?.trim();
const space = ctx.GroupSpace?.trim();
const explicitRoom = ctx.GroupRoom?.trim();
const isRoomSurface = surface === "discord" || surface === "slack";
const isRoomProvider = provider === "discord" || provider === "slack";
const nextRoom =
explicitRoom ??
(isRoomSurface && subject && subject.startsWith("#")
(isRoomProvider && subject && subject.startsWith("#")
? subject
: undefined);
const nextSubject = nextRoom ? undefined : subject;
sessionEntry.chatType = groupResolution.chatType ?? "group";
sessionEntry.surface = surface;
sessionEntry.provider = provider;
if (nextSubject) sessionEntry.subject = nextSubject;
if (nextRoom) sessionEntry.room = nextRoom;
if (space) sessionEntry.space = space;
sessionEntry.displayName = buildGroupDisplayName({
surface: sessionEntry.surface,
provider: sessionEntry.provider,
subject: sessionEntry.subject,
room: sessionEntry.room,
space: sessionEntry.space,

View File

@@ -0,0 +1,78 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { createTypingController } from "./typing.js";
describe("typing controller", () => {
afterEach(() => {
vi.useRealTimers();
});
it("stops after run completion and dispatcher idle", async () => {
vi.useFakeTimers();
const onReplyStart = vi.fn(async () => {});
const typing = createTypingController({
onReplyStart,
typingIntervalSeconds: 1,
typingTtlMs: 30_000,
});
await typing.startTypingLoop();
expect(onReplyStart).toHaveBeenCalledTimes(1);
vi.advanceTimersByTime(2_000);
expect(onReplyStart).toHaveBeenCalledTimes(3);
typing.markRunComplete();
vi.advanceTimersByTime(1_000);
expect(onReplyStart).toHaveBeenCalledTimes(4);
typing.markDispatchIdle();
vi.advanceTimersByTime(2_000);
expect(onReplyStart).toHaveBeenCalledTimes(4);
});
it("keeps typing until both idle and run completion are set", async () => {
vi.useFakeTimers();
const onReplyStart = vi.fn(async () => {});
const typing = createTypingController({
onReplyStart,
typingIntervalSeconds: 1,
typingTtlMs: 30_000,
});
await typing.startTypingLoop();
expect(onReplyStart).toHaveBeenCalledTimes(1);
typing.markDispatchIdle();
vi.advanceTimersByTime(2_000);
expect(onReplyStart).toHaveBeenCalledTimes(3);
typing.markRunComplete();
vi.advanceTimersByTime(2_000);
expect(onReplyStart).toHaveBeenCalledTimes(3);
});
it("does not restart typing after it has stopped", async () => {
vi.useFakeTimers();
const onReplyStart = vi.fn(async () => {});
const typing = createTypingController({
onReplyStart,
typingIntervalSeconds: 1,
typingTtlMs: 30_000,
});
await typing.startTypingLoop();
expect(onReplyStart).toHaveBeenCalledTimes(1);
typing.markRunComplete();
typing.markDispatchIdle();
vi.advanceTimersByTime(5_000);
expect(onReplyStart).toHaveBeenCalledTimes(1);
// Late callbacks should be ignored and must not restart the interval.
await typing.startTypingOnText("late tool result");
vi.advanceTimersByTime(5_000);
expect(onReplyStart).toHaveBeenCalledTimes(1);
});
});

View File

@@ -3,6 +3,8 @@ export type TypingController = {
startTypingLoop: () => Promise<void>;
startTypingOnText: (text?: string) => Promise<void>;
refreshTypingTtl: () => void;
markRunComplete: () => void;
markDispatchIdle: () => void;
cleanup: () => void;
};
@@ -21,6 +23,13 @@ export function createTypingController(params: {
log,
} = params;
let started = false;
let active = false;
let runComplete = false;
let dispatchIdle = false;
// Important: callbacks (tool/block streaming) can fire late (after the run completed),
// especially when upstream event emitters don't await async listeners.
// Once we stop typing, we "seal" the controller so late events can't restart typing forever.
let sealed = false;
let typingTimer: NodeJS.Timeout | undefined;
let typingTtlTimer: NodeJS.Timeout | undefined;
const typingIntervalMs = typingIntervalSeconds * 1000;
@@ -30,7 +39,15 @@ export function createTypingController(params: {
return `${Math.round(ms / 1000)}s`;
};
const resetCycle = () => {
started = false;
active = false;
runComplete = false;
dispatchIdle = false;
};
const cleanup = () => {
if (sealed) return;
if (typingTtlTimer) {
clearTimeout(typingTtlTimer);
typingTtlTimer = undefined;
@@ -39,9 +56,12 @@ export function createTypingController(params: {
clearInterval(typingTimer);
typingTimer = undefined;
}
resetCycle();
sealed = true;
};
const refreshTypingTtl = () => {
if (sealed) return;
if (!typingIntervalMs || typingIntervalMs <= 0) return;
if (typingTtlMs <= 0) return;
if (typingTtlTimer) {
@@ -57,16 +77,30 @@ export function createTypingController(params: {
};
const triggerTyping = async () => {
if (sealed) return;
await onReplyStart?.();
};
const ensureStart = async () => {
if (sealed) return;
// Late callbacks after a run completed should never restart typing.
if (runComplete) return;
if (!active) {
active = true;
}
if (started) return;
started = true;
await triggerTyping();
};
const maybeStopOnIdle = () => {
if (!active) return;
// Stop only when the model run is done and the dispatcher queue is empty.
if (runComplete && dispatchIdle) cleanup();
};
const startTypingLoop = async () => {
if (sealed) return;
if (!onReplyStart) return;
if (typingIntervalMs <= 0) return;
if (typingTimer) return;
@@ -78,6 +112,7 @@ export function createTypingController(params: {
};
const startTypingOnText = async (text?: string) => {
if (sealed) return;
const trimmed = text?.trim();
if (!trimmed) return;
if (silentToken && trimmed === silentToken) return;
@@ -85,11 +120,23 @@ export function createTypingController(params: {
await startTypingLoop();
};
const markRunComplete = () => {
runComplete = true;
maybeStopOnIdle();
};
const markDispatchIdle = () => {
dispatchIdle = true;
maybeStopOnIdle();
};
return {
onReplyStart: ensureStart,
startTypingLoop,
startTypingOnText,
refreshTypingTtl,
markRunComplete,
markDispatchIdle,
cleanup,
};
}