refactor(auto-reply): split reply flow

This commit is contained in:
Peter Steinberger
2026-01-04 05:47:21 +01:00
parent fd91da2b7f
commit 72a9e58777
17 changed files with 3128 additions and 2235 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,16 @@
const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit"]);
const ABORT_MEMORY = new Map<string, boolean>();
export function isAbortTrigger(text?: string): boolean {
if (!text) return false;
const normalized = text.trim().toLowerCase();
return ABORT_TRIGGERS.has(normalized);
}
export function getAbortMemory(key: string): boolean | undefined {
return ABORT_MEMORY.get(key);
}
export function setAbortMemory(key: string, value: boolean): void {
ABORT_MEMORY.set(key, value);
}

View File

@@ -0,0 +1,449 @@
import crypto from "node:crypto";
import { lookupContextTokens } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import {
queueEmbeddedPiMessage,
runEmbeddedPiAgent,
} from "../../agents/pi-embedded.js";
import {
loadSessionStore,
type SessionEntry,
saveSessionStore,
} from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { defaultRuntime } from "../../runtime.js";
import { stripHeartbeatToken } from "../heartbeat.js";
import type { TemplateContext } from "../templating.js";
import { normalizeVerboseLevel, type VerboseLevel } from "../thinking.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { createFollowupRunner } from "./followup-runner.js";
import {
enqueueFollowupRun,
type FollowupRun,
type QueueSettings,
scheduleFollowupDrain,
} from "./queue.js";
import { extractReplyToTag } from "./reply-tags.js";
import type { TypingController } from "./typing.js";
export async function runReplyAgent(params: {
commandBody: string;
followupRun: FollowupRun;
queueKey: string;
resolvedQueue: QueueSettings;
shouldSteer: boolean;
shouldFollowup: boolean;
isActive: boolean;
isStreaming: boolean;
opts?: GetReplyOptions;
typing: TypingController;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
defaultModel: string;
agentCfgContextTokens?: number;
resolvedVerboseLevel: VerboseLevel;
isNewSession: boolean;
blockStreamingEnabled: boolean;
blockReplyChunking?: {
minChars: number;
maxChars: number;
breakPreference: "paragraph" | "newline" | "sentence";
};
resolvedBlockStreamingBreak: "text_end" | "message_end";
sessionCtx: TemplateContext;
shouldInjectGroupIntro: boolean;
}): Promise<ReplyPayload | ReplyPayload[] | undefined> {
const {
commandBody,
followupRun,
queueKey,
resolvedQueue,
shouldSteer,
shouldFollowup,
isActive,
isStreaming,
opts,
typing,
sessionEntry,
sessionStore,
sessionKey,
storePath,
defaultModel,
agentCfgContextTokens,
resolvedVerboseLevel,
isNewSession,
blockStreamingEnabled,
blockReplyChunking,
resolvedBlockStreamingBreak,
sessionCtx,
shouldInjectGroupIntro,
} = params;
const shouldEmitToolResult = () => {
if (!sessionKey || !storePath) {
return resolvedVerboseLevel === "on";
}
try {
const store = loadSessionStore(storePath);
const entry = store[sessionKey];
const current = normalizeVerboseLevel(entry?.verboseLevel);
if (current) return current === "on";
} catch {
// ignore store read failures
}
return resolvedVerboseLevel === "on";
};
const streamedPayloadKeys = new Set<string>();
const pendingStreamedPayloadKeys = new Set<string>();
const pendingBlockTasks = new Set<Promise<void>>();
let didStreamBlockReply = false;
const buildPayloadKey = (payload: ReplyPayload) => {
const text = payload.text?.trim() ?? "";
const mediaList = payload.mediaUrls?.length
? payload.mediaUrls
: payload.mediaUrl
? [payload.mediaUrl]
: [];
return JSON.stringify({
text,
mediaList,
replyToId: payload.replyToId ?? null,
});
};
if (shouldSteer && isStreaming) {
const steered = queueEmbeddedPiMessage(
followupRun.run.sessionId,
followupRun.prompt,
);
if (steered && !shouldFollowup) {
if (sessionEntry && sessionStore && sessionKey) {
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
typing.cleanup();
return undefined;
}
}
if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) {
enqueueFollowupRun(queueKey, followupRun, resolvedQueue);
if (sessionEntry && sessionStore && sessionKey) {
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
typing.cleanup();
return undefined;
}
const runFollowupTurn = createFollowupRunner({
opts,
typing,
sessionEntry,
sessionStore,
sessionKey,
storePath,
defaultModel,
agentCfgContextTokens,
});
const finalizeWithFollowup = <T>(value: T): T => {
scheduleFollowupDrain(queueKey, runFollowupTurn);
return value;
};
let didLogHeartbeatStrip = false;
try {
const runId = crypto.randomUUID();
if (sessionKey) {
registerAgentRunContext(runId, { sessionKey });
}
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
try {
runResult = await runEmbeddedPiAgent({
sessionId: followupRun.run.sessionId,
sessionKey,
surface: sessionCtx.Surface?.trim().toLowerCase() || undefined,
sessionFile: followupRun.run.sessionFile,
workspaceDir: followupRun.run.workspaceDir,
config: followupRun.run.config,
skillsSnapshot: followupRun.run.skillsSnapshot,
prompt: commandBody,
extraSystemPrompt: followupRun.run.extraSystemPrompt,
ownerNumbers: followupRun.run.ownerNumbers,
enforceFinalTag: followupRun.run.enforceFinalTag,
provider: followupRun.run.provider,
model: followupRun.run.model,
thinkLevel: followupRun.run.thinkLevel,
verboseLevel: followupRun.run.verboseLevel,
timeoutMs: followupRun.run.timeoutMs,
runId,
blockReplyBreak: resolvedBlockStreamingBreak,
blockReplyChunking,
onPartialReply: opts?.onPartialReply
? async (payload) => {
let text = payload.text;
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, { mode: "message" });
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
}
if (
stripped.shouldSkip &&
(payload.mediaUrls?.length ?? 0) === 0
) {
return;
}
text = stripped.text;
}
await typing.startTypingOnText(text);
await opts.onPartialReply?.({
text,
mediaUrls: payload.mediaUrls,
});
}
: undefined,
onBlockReply:
blockStreamingEnabled && opts?.onBlockReply
? async (payload) => {
let text = payload.text;
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, {
mode: "message",
});
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
}
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (stripped.shouldSkip && !hasMedia) return;
text = stripped.text;
}
const tagResult = extractReplyToTag(
text,
sessionCtx.MessageSid,
);
const cleaned = tagResult.cleaned || undefined;
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (!cleaned && !hasMedia) return;
if (cleaned?.trim() === SILENT_REPLY_TOKEN && !hasMedia) return;
const blockPayload: ReplyPayload = {
text: cleaned,
mediaUrls: payload.mediaUrls,
mediaUrl: payload.mediaUrls?.[0],
replyToId: tagResult.replyToId,
};
const payloadKey = buildPayloadKey(blockPayload);
if (
streamedPayloadKeys.has(payloadKey) ||
pendingStreamedPayloadKeys.has(payloadKey)
) {
return;
}
pendingStreamedPayloadKeys.add(payloadKey);
const task = (async () => {
await typing.startTypingOnText(cleaned);
await opts.onBlockReply?.(blockPayload);
})()
.then(() => {
streamedPayloadKeys.add(payloadKey);
didStreamBlockReply = true;
})
.catch((err) => {
logVerbose(`block reply delivery failed: ${String(err)}`);
})
.finally(() => {
pendingStreamedPayloadKeys.delete(payloadKey);
});
pendingBlockTasks.add(task);
void task.finally(() => pendingBlockTasks.delete(task));
}
: undefined,
shouldEmitToolResult,
onToolResult: opts?.onToolResult
? async (payload) => {
let text = payload.text;
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, { mode: "message" });
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
}
if (
stripped.shouldSkip &&
(payload.mediaUrls?.length ?? 0) === 0
) {
return;
}
text = stripped.text;
}
await typing.startTypingOnText(text);
await opts.onToolResult?.({ text, mediaUrls: payload.mediaUrls });
}
: undefined,
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
const isContextOverflow =
/context.*overflow|too large|context window/i.test(message);
defaultRuntime.error(`Embedded agent failed before reply: ${message}`);
return finalizeWithFollowup({
text: isContextOverflow
? "⚠️ Context overflow - conversation too long. Starting fresh might help!"
: `⚠️ Agent failed before reply: ${message}. Check gateway logs for details.`,
});
}
if (
shouldInjectGroupIntro &&
sessionEntry &&
sessionStore &&
sessionKey &&
sessionEntry.groupActivationNeedsSystemIntro
) {
sessionEntry.groupActivationNeedsSystemIntro = false;
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
const payloadArray = runResult.payloads ?? [];
if (payloadArray.length === 0) return finalizeWithFollowup(undefined);
if (pendingBlockTasks.size > 0) {
await Promise.allSettled(pendingBlockTasks);
}
const sanitizedPayloads = opts?.isHeartbeat
? payloadArray
: payloadArray.flatMap((payload) => {
const text = payload.text;
if (!text || !text.includes("HEARTBEAT_OK")) return [payload];
const stripped = stripHeartbeatToken(text, { mode: "message" });
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
}
const hasMedia =
Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
if (stripped.shouldSkip && !hasMedia) return [];
return [{ ...payload, text: stripped.text }];
});
const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads
.map((payload) => {
const { cleaned, replyToId } = extractReplyToTag(
payload.text,
sessionCtx.MessageSid,
);
return {
...payload,
text: cleaned ? cleaned : undefined,
replyToId: replyToId ?? payload.replyToId,
};
})
.filter(
(payload) =>
payload.text ||
payload.mediaUrl ||
(payload.mediaUrls && payload.mediaUrls.length > 0),
);
const shouldDropFinalPayloads =
blockStreamingEnabled && didStreamBlockReply;
const filteredPayloads = shouldDropFinalPayloads
? []
: blockStreamingEnabled
? replyTaggedPayloads.filter(
(payload) => !streamedPayloadKeys.has(buildPayloadKey(payload)),
)
: replyTaggedPayloads;
if (filteredPayloads.length === 0) return finalizeWithFollowup(undefined);
const shouldSignalTyping = filteredPayloads.some((payload) => {
const trimmed = payload.text?.trim();
if (trimmed && trimmed !== SILENT_REPLY_TOKEN) return true;
if (payload.mediaUrl) return true;
if (payload.mediaUrls && payload.mediaUrls.length > 0) return true;
return false;
});
if (shouldSignalTyping) {
await typing.startTypingLoop();
}
if (sessionStore && sessionKey) {
const usage = runResult.meta.agentMeta?.usage;
const modelUsed = runResult.meta.agentMeta?.model ?? defaultModel;
const contextTokensUsed =
agentCfgContextTokens ??
lookupContextTokens(modelUsed) ??
sessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
if (usage) {
const entry = sessionEntry ?? sessionStore[sessionKey];
if (entry) {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens =
input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
const nextEntry = {
...entry,
inputTokens: input,
outputTokens: output,
totalTokens:
promptTokens > 0 ? promptTokens : (usage.total ?? input),
model: modelUsed,
contextTokens: contextTokensUsed ?? entry.contextTokens,
updatedAt: Date.now(),
};
sessionStore[sessionKey] = nextEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
} else if (modelUsed || contextTokensUsed) {
const entry = sessionEntry ?? sessionStore[sessionKey];
if (entry) {
sessionStore[sessionKey] = {
...entry,
model: modelUsed ?? entry.model,
contextTokens: contextTokensUsed ?? entry.contextTokens,
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
}
}
// If verbose is enabled and this is a new session, prepend a session hint.
let finalPayloads = filteredPayloads;
if (resolvedVerboseLevel === "on" && isNewSession) {
finalPayloads = [
{ text: `🧭 New session: ${followupRun.run.sessionId}` },
...finalPayloads,
];
}
return finalizeWithFollowup(
finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
);
} finally {
typing.cleanup();
}
}

View File

@@ -0,0 +1,51 @@
import type { ClawdisConfig } from "../../config/config.js";
import { resolveTextChunkLimit, type TextChunkSurface } from "../chunk.js";
const DEFAULT_BLOCK_STREAM_MIN = 800;
const DEFAULT_BLOCK_STREAM_MAX = 1200;
const BLOCK_CHUNK_SURFACES = new Set<TextChunkSurface>([
"whatsapp",
"telegram",
"discord",
"signal",
"imessage",
"webchat",
]);
function normalizeChunkSurface(surface?: string): TextChunkSurface | undefined {
if (!surface) return undefined;
const cleaned = surface.trim().toLowerCase();
return BLOCK_CHUNK_SURFACES.has(cleaned as TextChunkSurface)
? (cleaned as TextChunkSurface)
: undefined;
}
export function resolveBlockStreamingChunking(
cfg: ClawdisConfig | undefined,
surface?: string,
): {
minChars: number;
maxChars: number;
breakPreference: "paragraph" | "newline" | "sentence";
} {
const surfaceKey = normalizeChunkSurface(surface);
const textLimit = resolveTextChunkLimit(cfg, surfaceKey);
const chunkCfg = cfg?.agent?.blockStreamingChunk;
const maxRequested = Math.max(
1,
Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX),
);
const maxChars = Math.max(1, Math.min(maxRequested, textLimit));
const minRequested = Math.max(
1,
Math.floor(chunkCfg?.minChars ?? DEFAULT_BLOCK_STREAM_MIN),
);
const minChars = Math.min(minRequested, maxChars);
const breakPreference =
chunkCfg?.breakPreference === "newline" ||
chunkCfg?.breakPreference === "sentence"
? chunkCfg.breakPreference
: "paragraph";
return { minChars, maxChars, breakPreference };
}

View File

@@ -0,0 +1,41 @@
import type { SessionEntry } from "../../config/sessions.js";
import { saveSessionStore } from "../../config/sessions.js";
import { setAbortMemory } from "./abort.js";
export async function applySessionHints(params: {
baseBody: string;
abortedLastRun: boolean;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
abortKey?: string;
messageId?: string;
}): Promise<string> {
let prefixedBodyBase = params.baseBody;
const abortedHint = params.abortedLastRun
? "Note: The previous agent run was aborted by the user. Resume carefully or ask for clarification."
: "";
if (abortedHint) {
prefixedBodyBase = `${abortedHint}\n\n${prefixedBodyBase}`;
if (params.sessionEntry && params.sessionStore && params.sessionKey) {
params.sessionEntry.abortedLastRun = false;
params.sessionEntry.updatedAt = Date.now();
params.sessionStore[params.sessionKey] = params.sessionEntry;
if (params.storePath) {
await saveSessionStore(params.storePath, params.sessionStore);
}
} else if (params.abortKey) {
setAbortMemory(params.abortKey, false);
}
}
const messageIdHint = params.messageId?.trim()
? `[message_id: ${params.messageId.trim()}]`
: "";
if (messageIdHint) {
prefixedBodyBase = `${prefixedBodyBase}\n${messageIdHint}`;
}
return prefixedBodyBase;
}

View File

@@ -0,0 +1,303 @@
import type { ClawdisConfig } from "../../config/config.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
import { triggerClawdisRestart } from "../../infra/restart.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { normalizeE164 } from "../../utils.js";
import { resolveHeartbeatSeconds } from "../../web/reconnect.js";
import { getWebAuthAgeMs, webAuthExists } from "../../web/session.js";
import {
normalizeGroupActivation,
parseActivationCommand,
} from "../group-activation.js";
import { parseSendPolicyCommand } from "../send-policy.js";
import { buildStatusMessage } from "../status.js";
import type { MsgContext } from "../templating.js";
import type { ThinkLevel, VerboseLevel } from "../thinking.js";
import type { ReplyPayload } from "../types.js";
import { isAbortTrigger, setAbortMemory } from "./abort.js";
import { stripMentions } from "./mentions.js";
export type CommandContext = {
surface: string;
isWhatsAppSurface: boolean;
ownerList: string[];
isOwnerSender: boolean;
senderE164?: string;
abortKey?: string;
rawBodyNormalized: string;
commandBodyNormalized: string;
from?: string;
to?: string;
};
export function buildCommandContext(params: {
ctx: MsgContext;
cfg: ClawdisConfig;
sessionKey?: string;
isGroup: boolean;
triggerBodyNormalized: string;
}): CommandContext {
const { ctx, cfg, sessionKey, isGroup, triggerBodyNormalized } = params;
const surface = (ctx.Surface ?? "").trim().toLowerCase();
const isWhatsAppSurface =
surface === "whatsapp" ||
(ctx.From ?? "").startsWith("whatsapp:") ||
(ctx.To ?? "").startsWith("whatsapp:");
const configuredAllowFrom = isWhatsAppSurface
? cfg.whatsapp?.allowFrom
: undefined;
const from = (ctx.From ?? "").replace(/^whatsapp:/, "");
const to = (ctx.To ?? "").replace(/^whatsapp:/, "");
const defaultAllowFrom =
isWhatsAppSurface &&
(!configuredAllowFrom || configuredAllowFrom.length === 0) &&
to
? [to]
: undefined;
const allowFrom =
configuredAllowFrom && configuredAllowFrom.length > 0
? configuredAllowFrom
: defaultAllowFrom;
const abortKey = sessionKey ?? (from || undefined) ?? (to || undefined);
const rawBodyNormalized = triggerBodyNormalized;
const commandBodyNormalized = isGroup
? stripMentions(rawBodyNormalized, ctx, cfg)
: rawBodyNormalized;
const senderE164 = normalizeE164(ctx.SenderE164 ?? "");
const ownerCandidates = isWhatsAppSurface
? (allowFrom ?? []).filter((entry) => entry && entry !== "*")
: [];
if (isWhatsAppSurface && ownerCandidates.length === 0 && to) {
ownerCandidates.push(to);
}
const ownerList = ownerCandidates
.map((entry) => normalizeE164(entry))
.filter((entry): entry is string => Boolean(entry));
const isOwnerSender =
Boolean(senderE164) && ownerList.includes(senderE164 ?? "");
return {
surface,
isWhatsAppSurface,
ownerList,
isOwnerSender,
senderE164: senderE164 || undefined,
abortKey,
rawBodyNormalized,
commandBodyNormalized,
from: from || undefined,
to: to || undefined,
};
}
export async function handleCommands(params: {
ctx: MsgContext;
cfg: ClawdisConfig;
command: CommandContext;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
sessionScope: string;
workspaceDir: string;
defaultGroupActivation: () => "always" | "mention";
resolvedThinkLevel?: ThinkLevel;
resolvedVerboseLevel: VerboseLevel;
resolveDefaultThinkingLevel: () => Promise<ThinkLevel | undefined>;
provider: string;
model: string;
contextTokens: number;
isGroup: boolean;
}): Promise<{
reply?: ReplyPayload;
shouldContinue: boolean;
}> {
const {
cfg,
command,
sessionEntry,
sessionStore,
sessionKey,
storePath,
sessionScope,
workspaceDir,
defaultGroupActivation,
resolvedThinkLevel,
resolvedVerboseLevel,
resolveDefaultThinkingLevel,
model,
contextTokens,
isGroup,
} = params;
const activationCommand = parseActivationCommand(
command.commandBodyNormalized,
);
const sendPolicyCommand = parseSendPolicyCommand(
command.commandBodyNormalized,
);
if (activationCommand.hasCommand) {
if (!isGroup) {
return {
shouldContinue: false,
reply: { text: "⚙️ Group activation only applies to group chats." },
};
}
if (!command.isOwnerSender) {
logVerbose(
`Ignoring /activation from non-owner in group: ${command.senderE164 || "<unknown>"}`,
);
return { shouldContinue: false };
}
if (!activationCommand.mode) {
return {
shouldContinue: false,
reply: { text: "⚙️ Usage: /activation mention|always" },
};
}
if (sessionEntry && sessionStore && sessionKey) {
sessionEntry.groupActivation = activationCommand.mode;
sessionEntry.groupActivationNeedsSystemIntro = true;
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
return {
shouldContinue: false,
reply: { text: `⚙️ Group activation set to ${activationCommand.mode}.` },
};
}
if (sendPolicyCommand.hasCommand) {
if (!command.isOwnerSender) {
logVerbose(
`Ignoring /send from non-owner: ${command.senderE164 || "<unknown>"}`,
);
return { shouldContinue: false };
}
if (!sendPolicyCommand.mode) {
return {
shouldContinue: false,
reply: { text: "⚙️ Usage: /send on|off|inherit" },
};
}
if (sessionEntry && sessionStore && sessionKey) {
if (sendPolicyCommand.mode === "inherit") {
delete sessionEntry.sendPolicy;
} else {
sessionEntry.sendPolicy = sendPolicyCommand.mode;
}
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
const label =
sendPolicyCommand.mode === "inherit"
? "inherit"
: sendPolicyCommand.mode === "allow"
? "on"
: "off";
return {
shouldContinue: false,
reply: { text: `⚙️ Send policy set to ${label}.` },
};
}
if (
command.commandBodyNormalized === "/restart" ||
command.commandBodyNormalized === "restart" ||
command.commandBodyNormalized.startsWith("/restart ")
) {
if (isGroup && !command.isOwnerSender) {
logVerbose(
`Ignoring /restart from non-owner in group: ${command.senderE164 || "<unknown>"}`,
);
return { shouldContinue: false };
}
const restartMethod = triggerClawdisRestart();
return {
shouldContinue: false,
reply: {
text: `⚙️ Restarting clawdis via ${restartMethod}; give me a few seconds to come back online.`,
},
};
}
if (
command.commandBodyNormalized === "/status" ||
command.commandBodyNormalized === "status" ||
command.commandBodyNormalized.startsWith("/status ")
) {
if (isGroup && !command.isOwnerSender) {
logVerbose(
`Ignoring /status from non-owner in group: ${command.senderE164 || "<unknown>"}`,
);
return { shouldContinue: false };
}
const webLinked = await webAuthExists();
const webAuthAgeMs = getWebAuthAgeMs();
const heartbeatSeconds = resolveHeartbeatSeconds(cfg, undefined);
const groupActivation = isGroup
? (normalizeGroupActivation(sessionEntry?.groupActivation) ??
defaultGroupActivation())
: undefined;
const statusText = buildStatusMessage({
agent: {
model,
contextTokens,
thinkingDefault: cfg.agent?.thinkingDefault,
verboseDefault: cfg.agent?.verboseDefault,
},
workspaceDir,
sessionEntry,
sessionKey,
sessionScope,
storePath,
groupActivation,
resolvedThink:
resolvedThinkLevel ?? (await resolveDefaultThinkingLevel()),
resolvedVerbose: resolvedVerboseLevel,
webLinked,
webAuthAgeMs,
heartbeatSeconds,
});
return { shouldContinue: false, reply: { text: statusText } };
}
const abortRequested = isAbortTrigger(command.rawBodyNormalized);
if (abortRequested) {
if (sessionEntry && sessionStore && sessionKey) {
sessionEntry.abortedLastRun = true;
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
} else if (command.abortKey) {
setAbortMemory(command.abortKey, true);
}
return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } };
}
const sendPolicy = resolveSendPolicy({
cfg,
entry: sessionEntry,
sessionKey,
surface: sessionEntry?.surface ?? command.surface,
chatType: sessionEntry?.chatType,
});
if (sendPolicy === "deny") {
logVerbose(`Send blocked by policy for session ${sessionKey ?? "unknown"}`);
return { shouldContinue: false };
}
return { shouldContinue: true };
}

View File

@@ -0,0 +1,513 @@
import { lookupContextTokens } from "../../agents/context.js";
import {
DEFAULT_CONTEXT_TOKENS,
DEFAULT_MODEL,
DEFAULT_PROVIDER,
} from "../../agents/defaults.js";
import {
buildModelAliasIndex,
type ModelAliasIndex,
modelKey,
resolveConfiguredModelRef,
resolveModelRefFromString,
} from "../../agents/model-selection.js";
import type { ClawdisConfig } from "../../config/config.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { extractModelDirective } from "../model.js";
import type { MsgContext } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import {
extractThinkDirective,
extractVerboseDirective,
type ThinkLevel,
type VerboseLevel,
} from "./directives.js";
import { stripMentions, stripStructuralPrefixes } from "./mentions.js";
import {
type ModelDirectiveSelection,
resolveModelDirectiveSelection,
} from "./model-selection.js";
import {
extractQueueDirective,
type QueueDropPolicy,
type QueueMode,
} from "./queue.js";
const SYSTEM_MARK = "⚙️";
export type InlineDirectives = {
cleaned: string;
hasThinkDirective: boolean;
thinkLevel?: ThinkLevel;
rawThinkLevel?: string;
hasVerboseDirective: boolean;
verboseLevel?: VerboseLevel;
rawVerboseLevel?: string;
hasModelDirective: boolean;
rawModelDirective?: string;
hasQueueDirective: boolean;
queueMode?: QueueMode;
queueReset: boolean;
rawQueueMode?: string;
debounceMs?: number;
cap?: number;
dropPolicy?: QueueDropPolicy;
rawDebounce?: string;
rawCap?: string;
rawDrop?: string;
hasQueueOptions: boolean;
};
export function parseInlineDirectives(body: string): InlineDirectives {
const {
cleaned: thinkCleaned,
thinkLevel,
rawLevel: rawThinkLevel,
hasDirective: hasThinkDirective,
} = extractThinkDirective(body);
const {
cleaned: verboseCleaned,
verboseLevel,
rawLevel: rawVerboseLevel,
hasDirective: hasVerboseDirective,
} = extractVerboseDirective(thinkCleaned);
const {
cleaned: modelCleaned,
rawModel,
hasDirective: hasModelDirective,
} = extractModelDirective(verboseCleaned);
const {
cleaned: queueCleaned,
queueMode,
queueReset,
rawMode,
debounceMs,
cap,
dropPolicy,
rawDebounce,
rawCap,
rawDrop,
hasDirective: hasQueueDirective,
hasOptions: hasQueueOptions,
} = extractQueueDirective(modelCleaned);
return {
cleaned: queueCleaned,
hasThinkDirective,
thinkLevel,
rawThinkLevel,
hasVerboseDirective,
verboseLevel,
rawVerboseLevel,
hasModelDirective,
rawModelDirective: rawModel,
hasQueueDirective,
queueMode,
queueReset,
rawQueueMode: rawMode,
debounceMs,
cap,
dropPolicy,
rawDebounce,
rawCap,
rawDrop,
hasQueueOptions,
};
}
export function isDirectiveOnly(params: {
directives: InlineDirectives;
cleanedBody: string;
ctx: MsgContext;
cfg: ClawdisConfig;
isGroup: boolean;
}): boolean {
const { directives, cleanedBody, ctx, cfg, isGroup } = params;
if (
!directives.hasThinkDirective &&
!directives.hasVerboseDirective &&
!directives.hasModelDirective &&
!directives.hasQueueDirective
)
return false;
const stripped = stripStructuralPrefixes(cleanedBody ?? "");
const noMentions = isGroup ? stripMentions(stripped, ctx, cfg) : stripped;
return noMentions.length === 0;
}
export async function handleDirectiveOnly(params: {
directives: InlineDirectives;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
defaultProvider: string;
defaultModel: string;
aliasIndex: ModelAliasIndex;
allowedModelKeys: Set<string>;
allowedModelCatalog: Awaited<
ReturnType<typeof import("../../agents/model-catalog.js").loadModelCatalog>
>;
resetModelOverride: boolean;
provider: string;
model: string;
initialModelLabel: string;
formatModelSwitchEvent: (label: string, alias?: string) => string;
}): Promise<ReplyPayload | undefined> {
const {
directives,
sessionEntry,
sessionStore,
sessionKey,
storePath,
defaultProvider,
defaultModel,
aliasIndex,
allowedModelKeys,
allowedModelCatalog,
resetModelOverride,
initialModelLabel,
formatModelSwitchEvent,
} = params;
if (directives.hasModelDirective) {
const isModelListAlias =
directives.rawModelDirective?.trim().toLowerCase() === "status";
if (!directives.rawModelDirective || isModelListAlias) {
if (allowedModelCatalog.length === 0) {
return { text: "No models available." };
}
const current = `${params.provider}/${params.model}`;
const defaultLabel = `${defaultProvider}/${defaultModel}`;
const header =
current === defaultLabel
? `Models (current: ${current}):`
: `Models (current: ${current}, default: ${defaultLabel}):`;
const lines = [header];
if (resetModelOverride) {
lines.push(`(previous selection reset to default)`);
}
for (const entry of allowedModelCatalog) {
const label = `${entry.provider}/${entry.id}`;
const aliases = aliasIndex.byKey.get(label);
const aliasSuffix =
aliases && aliases.length > 0
? ` (alias: ${aliases.join(", ")})`
: "";
const suffix =
entry.name && entry.name !== entry.id ? `${entry.name}` : "";
lines.push(`- ${label}${aliasSuffix}${suffix}`);
}
return { text: lines.join("\n") };
}
}
if (directives.hasThinkDirective && !directives.thinkLevel) {
return {
text: `Unrecognized thinking level "${directives.rawThinkLevel ?? ""}". Valid levels: off, minimal, low, medium, high.`,
};
}
if (directives.hasVerboseDirective && !directives.verboseLevel) {
return {
text: `Unrecognized verbose level "${directives.rawVerboseLevel ?? ""}". Valid levels: off, on.`,
};
}
const queueModeInvalid =
directives.hasQueueDirective &&
!directives.queueMode &&
!directives.queueReset &&
Boolean(directives.rawQueueMode);
const queueDebounceInvalid =
directives.hasQueueDirective &&
directives.rawDebounce !== undefined &&
typeof directives.debounceMs !== "number";
const queueCapInvalid =
directives.hasQueueDirective &&
directives.rawCap !== undefined &&
typeof directives.cap !== "number";
const queueDropInvalid =
directives.hasQueueDirective &&
directives.rawDrop !== undefined &&
!directives.dropPolicy;
if (
queueModeInvalid ||
queueDebounceInvalid ||
queueCapInvalid ||
queueDropInvalid
) {
const errors: string[] = [];
if (queueModeInvalid) {
errors.push(
`Unrecognized queue mode "${directives.rawQueueMode ?? ""}". Valid modes: steer, followup, collect, steer+backlog, interrupt.`,
);
}
if (queueDebounceInvalid) {
errors.push(
`Invalid debounce "${directives.rawDebounce ?? ""}". Use ms/s/m (e.g. debounce:1500ms, debounce:2s).`,
);
}
if (queueCapInvalid) {
errors.push(
`Invalid cap "${directives.rawCap ?? ""}". Use a positive integer (e.g. cap:10).`,
);
}
if (queueDropInvalid) {
errors.push(
`Invalid drop policy "${directives.rawDrop ?? ""}". Use drop:old, drop:new, or drop:summarize.`,
);
}
return { text: errors.join(" ") };
}
let modelSelection: ModelDirectiveSelection | undefined;
if (directives.hasModelDirective && directives.rawModelDirective) {
const resolved = resolveModelDirectiveSelection({
raw: directives.rawModelDirective,
defaultProvider,
defaultModel,
aliasIndex,
allowedModelKeys,
});
if (resolved.error) {
return { text: resolved.error };
}
modelSelection = resolved.selection;
if (modelSelection) {
const nextLabel = `${modelSelection.provider}/${modelSelection.model}`;
if (nextLabel !== initialModelLabel) {
enqueueSystemEvent(
formatModelSwitchEvent(nextLabel, modelSelection.alias),
{
contextKey: `model:${nextLabel}`,
},
);
}
}
}
if (sessionEntry && sessionStore && sessionKey) {
if (directives.hasThinkDirective && directives.thinkLevel) {
if (directives.thinkLevel === "off") delete sessionEntry.thinkingLevel;
else sessionEntry.thinkingLevel = directives.thinkLevel;
}
if (directives.hasVerboseDirective && directives.verboseLevel) {
if (directives.verboseLevel === "off") delete sessionEntry.verboseLevel;
else sessionEntry.verboseLevel = directives.verboseLevel;
}
if (modelSelection) {
if (modelSelection.isDefault) {
delete sessionEntry.providerOverride;
delete sessionEntry.modelOverride;
} else {
sessionEntry.providerOverride = modelSelection.provider;
sessionEntry.modelOverride = modelSelection.model;
}
}
if (directives.hasQueueDirective && directives.queueReset) {
delete sessionEntry.queueMode;
delete sessionEntry.queueDebounceMs;
delete sessionEntry.queueCap;
delete sessionEntry.queueDrop;
} else if (directives.hasQueueDirective) {
if (directives.queueMode) sessionEntry.queueMode = directives.queueMode;
if (typeof directives.debounceMs === "number") {
sessionEntry.queueDebounceMs = directives.debounceMs;
}
if (typeof directives.cap === "number") {
sessionEntry.queueCap = directives.cap;
}
if (directives.dropPolicy) {
sessionEntry.queueDrop = directives.dropPolicy;
}
}
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
const parts: string[] = [];
if (directives.hasThinkDirective && directives.thinkLevel) {
parts.push(
directives.thinkLevel === "off"
? "Thinking disabled."
: `Thinking level set to ${directives.thinkLevel}.`,
);
}
if (directives.hasVerboseDirective && directives.verboseLevel) {
parts.push(
directives.verboseLevel === "off"
? `${SYSTEM_MARK} Verbose logging disabled.`
: `${SYSTEM_MARK} Verbose logging enabled.`,
);
}
if (modelSelection) {
const label = `${modelSelection.provider}/${modelSelection.model}`;
const labelWithAlias = modelSelection.alias
? `${modelSelection.alias} (${label})`
: label;
parts.push(
modelSelection.isDefault
? `Model reset to default (${labelWithAlias}).`
: `Model set to ${labelWithAlias}.`,
);
}
if (directives.hasQueueDirective && directives.queueMode) {
parts.push(`${SYSTEM_MARK} Queue mode set to ${directives.queueMode}.`);
} else if (directives.hasQueueDirective && directives.queueReset) {
parts.push(`${SYSTEM_MARK} Queue mode reset to default.`);
}
if (
directives.hasQueueDirective &&
typeof directives.debounceMs === "number"
) {
parts.push(
`${SYSTEM_MARK} Queue debounce set to ${directives.debounceMs}ms.`,
);
}
if (directives.hasQueueDirective && typeof directives.cap === "number") {
parts.push(`${SYSTEM_MARK} Queue cap set to ${directives.cap}.`);
}
if (directives.hasQueueDirective && directives.dropPolicy) {
parts.push(`${SYSTEM_MARK} Queue drop set to ${directives.dropPolicy}.`);
}
const ack = parts.join(" ").trim();
return { text: ack || "OK." };
}
export async function persistInlineDirectives(params: {
directives: InlineDirectives;
effectiveModelDirective?: string;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
defaultProvider: string;
defaultModel: string;
aliasIndex: ModelAliasIndex;
allowedModelKeys: Set<string>;
provider: string;
model: string;
initialModelLabel: string;
formatModelSwitchEvent: (label: string, alias?: string) => string;
agentCfg: ClawdisConfig["agent"] | undefined;
}): Promise<{ provider: string; model: string; contextTokens: number }> {
const {
directives,
sessionEntry,
sessionStore,
sessionKey,
storePath,
defaultProvider,
defaultModel,
aliasIndex,
allowedModelKeys,
initialModelLabel,
formatModelSwitchEvent,
agentCfg,
} = params;
let { provider, model } = params;
if (sessionEntry && sessionStore && sessionKey) {
let updated = false;
if (directives.hasThinkDirective && directives.thinkLevel) {
if (directives.thinkLevel === "off") {
delete sessionEntry.thinkingLevel;
} else {
sessionEntry.thinkingLevel = directives.thinkLevel;
}
updated = true;
}
if (directives.hasVerboseDirective && directives.verboseLevel) {
if (directives.verboseLevel === "off") {
delete sessionEntry.verboseLevel;
} else {
sessionEntry.verboseLevel = directives.verboseLevel;
}
updated = true;
}
const modelDirective =
directives.hasModelDirective && params.effectiveModelDirective
? params.effectiveModelDirective
: undefined;
if (modelDirective) {
const resolved = resolveModelRefFromString({
raw: modelDirective,
defaultProvider,
aliasIndex,
});
if (resolved) {
const key = modelKey(resolved.ref.provider, resolved.ref.model);
if (allowedModelKeys.size === 0 || allowedModelKeys.has(key)) {
const isDefault =
resolved.ref.provider === defaultProvider &&
resolved.ref.model === defaultModel;
if (isDefault) {
delete sessionEntry.providerOverride;
delete sessionEntry.modelOverride;
} else {
sessionEntry.providerOverride = resolved.ref.provider;
sessionEntry.modelOverride = resolved.ref.model;
}
provider = resolved.ref.provider;
model = resolved.ref.model;
const nextLabel = `${provider}/${model}`;
if (nextLabel !== initialModelLabel) {
enqueueSystemEvent(
formatModelSwitchEvent(nextLabel, resolved.alias),
{
contextKey: `model:${nextLabel}`,
},
);
}
updated = true;
}
}
}
if (directives.hasQueueDirective && directives.queueReset) {
delete sessionEntry.queueMode;
delete sessionEntry.queueDebounceMs;
delete sessionEntry.queueCap;
delete sessionEntry.queueDrop;
updated = true;
}
if (updated) {
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
}
return {
provider,
model,
contextTokens:
agentCfg?.contextTokens ??
lookupContextTokens(model) ??
DEFAULT_CONTEXT_TOKENS,
};
}
export function resolveDefaultModel(params: { cfg: ClawdisConfig }): {
defaultProvider: string;
defaultModel: string;
aliasIndex: ModelAliasIndex;
} {
const mainModel = resolveConfiguredModelRef({
cfg: params.cfg,
defaultProvider: DEFAULT_PROVIDER,
defaultModel: DEFAULT_MODEL,
});
const defaultProvider = mainModel.provider;
const defaultModel = mainModel.model;
const aliasIndex = buildModelAliasIndex({
cfg: params.cfg,
defaultProvider,
});
return { defaultProvider, defaultModel, aliasIndex };
}

View File

@@ -0,0 +1,53 @@
import {
normalizeThinkLevel,
normalizeVerboseLevel,
type ThinkLevel,
type VerboseLevel,
} from "../thinking.js";
export function extractThinkDirective(body?: string): {
cleaned: string;
thinkLevel?: ThinkLevel;
rawLevel?: string;
hasDirective: boolean;
} {
if (!body) return { cleaned: "", hasDirective: false };
// Match the longest keyword first to avoid partial captures (e.g. "/think:high")
const match = body.match(
/(?:^|\s)\/(?:thinking|think|t)\s*:?\s*([a-zA-Z-]+)\b/i,
);
const thinkLevel = normalizeThinkLevel(match?.[1]);
const cleaned = match
? body.replace(match[0], "").replace(/\s+/g, " ").trim()
: body.trim();
return {
cleaned,
thinkLevel,
rawLevel: match?.[1],
hasDirective: !!match,
};
}
export function extractVerboseDirective(body?: string): {
cleaned: string;
verboseLevel?: VerboseLevel;
rawLevel?: string;
hasDirective: boolean;
} {
if (!body) return { cleaned: "", hasDirective: false };
const match = body.match(
/(?:^|\s)\/(?:verbose|v)(?=$|\s|:)\s*:?\s*([a-zA-Z-]+)\b/i,
);
const verboseLevel = normalizeVerboseLevel(match?.[1]);
const cleaned = match
? body.replace(match[0], "").replace(/\s+/g, " ").trim()
: body.trim();
return {
cleaned,
verboseLevel,
rawLevel: match?.[1],
hasDirective: !!match,
};
}
export type { ThinkLevel, VerboseLevel };

View File

@@ -0,0 +1,168 @@
import crypto from "node:crypto";
import { lookupContextTokens } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { defaultRuntime } from "../../runtime.js";
import { stripHeartbeatToken } from "../heartbeat.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import type { FollowupRun } from "./queue.js";
import { extractReplyToTag } from "./reply-tags.js";
import type { TypingController } from "./typing.js";
export function createFollowupRunner(params: {
opts?: GetReplyOptions;
typing: TypingController;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
defaultModel: string;
agentCfgContextTokens?: number;
}): (queued: FollowupRun) => Promise<void> {
const {
opts,
typing,
sessionEntry,
sessionStore,
sessionKey,
storePath,
defaultModel,
agentCfgContextTokens,
} = params;
const sendFollowupPayloads = async (payloads: ReplyPayload[]) => {
if (!opts?.onBlockReply) {
logVerbose("followup queue: no onBlockReply handler; dropping payloads");
return;
}
for (const payload of payloads) {
if (!payload?.text && !payload?.mediaUrl && !payload?.mediaUrls?.length) {
continue;
}
if (
payload.text?.trim() === SILENT_REPLY_TOKEN &&
!payload.mediaUrl &&
!payload.mediaUrls?.length
) {
continue;
}
await typing.startTypingOnText(payload.text);
await opts.onBlockReply(payload);
}
};
return async (queued: FollowupRun) => {
const runId = crypto.randomUUID();
if (queued.run.sessionKey) {
registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey });
}
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
try {
runResult = await runEmbeddedPiAgent({
sessionId: queued.run.sessionId,
sessionKey: queued.run.sessionKey,
surface: queued.run.surface,
sessionFile: queued.run.sessionFile,
workspaceDir: queued.run.workspaceDir,
config: queued.run.config,
skillsSnapshot: queued.run.skillsSnapshot,
prompt: queued.prompt,
extraSystemPrompt: queued.run.extraSystemPrompt,
ownerNumbers: queued.run.ownerNumbers,
enforceFinalTag: queued.run.enforceFinalTag,
provider: queued.run.provider,
model: queued.run.model,
thinkLevel: queued.run.thinkLevel,
verboseLevel: queued.run.verboseLevel,
timeoutMs: queued.run.timeoutMs,
runId,
blockReplyBreak: queued.run.blockReplyBreak,
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
defaultRuntime.error?.(`Followup agent failed before reply: ${message}`);
return;
}
const payloadArray = runResult.payloads ?? [];
if (payloadArray.length === 0) return;
const sanitizedPayloads = payloadArray.flatMap((payload) => {
const text = payload.text;
if (!text || !text.includes("HEARTBEAT_OK")) return [payload];
const stripped = stripHeartbeatToken(text, { mode: "message" });
const hasMedia =
Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
if (stripped.shouldSkip && !hasMedia) return [];
return [{ ...payload, text: stripped.text }];
});
const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads
.map((payload) => {
const { cleaned, replyToId } = extractReplyToTag(payload.text);
return {
...payload,
text: cleaned ? cleaned : undefined,
replyToId: replyToId ?? payload.replyToId,
};
})
.filter(
(payload) =>
payload.text ||
payload.mediaUrl ||
(payload.mediaUrls && payload.mediaUrls.length > 0),
);
if (replyTaggedPayloads.length === 0) return;
if (sessionStore && sessionKey) {
const usage = runResult.meta.agentMeta?.usage;
const modelUsed = runResult.meta.agentMeta?.model ?? defaultModel;
const contextTokensUsed =
agentCfgContextTokens ??
lookupContextTokens(modelUsed) ??
sessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
if (usage) {
const entry = sessionStore[sessionKey];
if (entry) {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens =
input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
sessionStore[sessionKey] = {
...entry,
inputTokens: input,
outputTokens: output,
totalTokens:
promptTokens > 0 ? promptTokens : (usage.total ?? input),
model: modelUsed,
contextTokens: contextTokensUsed ?? entry.contextTokens,
updatedAt: Date.now(),
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
} else if (modelUsed || contextTokensUsed) {
const entry = sessionStore[sessionKey];
if (entry) {
sessionStore[sessionKey] = {
...entry,
model: modelUsed ?? entry.model,
contextTokens: contextTokensUsed ?? entry.contextTokens,
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
}
}
await sendFollowupPayloads(replyTaggedPayloads);
};
}

View File

@@ -0,0 +1,108 @@
import type { ClawdisConfig } from "../../config/config.js";
import type {
GroupKeyResolution,
SessionEntry,
} from "../../config/sessions.js";
import { normalizeGroupActivation } from "../group-activation.js";
import type { TemplateContext } from "../templating.js";
export function resolveGroupRequireMention(params: {
cfg: ClawdisConfig;
ctx: TemplateContext;
groupResolution?: GroupKeyResolution;
}): boolean {
const { cfg, ctx, groupResolution } = params;
const surface = groupResolution?.surface ?? ctx.Surface?.trim().toLowerCase();
const groupId = groupResolution?.id ?? ctx.From?.replace(/^group:/, "");
if (surface === "telegram") {
if (groupId) {
const groupConfig = cfg.telegram?.groups?.[groupId];
if (typeof groupConfig?.requireMention === "boolean") {
return groupConfig.requireMention;
}
}
const groupDefault = cfg.telegram?.groups?.["*"]?.requireMention;
if (typeof groupDefault === "boolean") return groupDefault;
return true;
}
if (surface === "whatsapp") {
if (groupId) {
const groupConfig = cfg.whatsapp?.groups?.[groupId];
if (typeof groupConfig?.requireMention === "boolean") {
return groupConfig.requireMention;
}
}
const groupDefault = cfg.whatsapp?.groups?.["*"]?.requireMention;
if (typeof groupDefault === "boolean") return groupDefault;
return true;
}
if (surface === "imessage") {
if (groupId) {
const groupConfig = cfg.imessage?.groups?.[groupId];
if (typeof groupConfig?.requireMention === "boolean") {
return groupConfig.requireMention;
}
}
const groupDefault = cfg.imessage?.groups?.["*"]?.requireMention;
if (typeof groupDefault === "boolean") return groupDefault;
return true;
}
return true;
}
export function defaultGroupActivation(
requireMention: boolean,
): "always" | "mention" {
return requireMention === false ? "always" : "mention";
}
export function buildGroupIntro(params: {
sessionCtx: TemplateContext;
sessionEntry?: SessionEntry;
defaultActivation: "always" | "mention";
silentToken: string;
}): string {
const activation =
normalizeGroupActivation(params.sessionEntry?.groupActivation) ??
params.defaultActivation;
const subject = params.sessionCtx.GroupSubject?.trim();
const members = params.sessionCtx.GroupMembers?.trim();
const surface = params.sessionCtx.Surface?.trim().toLowerCase();
const surfaceLabel = (() => {
if (!surface) return "chat";
if (surface === "whatsapp") return "WhatsApp";
if (surface === "telegram") return "Telegram";
if (surface === "discord") return "Discord";
if (surface === "webchat") return "WebChat";
return `${surface.at(0)?.toUpperCase() ?? ""}${surface.slice(1)}`;
})();
const subjectLine = subject
? `You are replying inside the ${surfaceLabel} group "${subject}".`
: `You are replying inside a ${surfaceLabel} group chat.`;
const membersLine = members ? `Group members: ${members}.` : undefined;
const activationLine =
activation === "always"
? "Activation: always-on (you receive every group message)."
: "Activation: trigger-only (you are invoked only when explicitly mentioned; recent context may be included).";
const silenceLine =
activation === "always"
? `If no response is needed, reply with exactly "${params.silentToken}" (no other text) so Clawdis stays silent.`
: undefined;
const cautionLine =
activation === "always"
? "Be extremely selective: reply only when you are directly addressed, asked a question, or can add clear value. Otherwise stay silent."
: undefined;
const lurkLine =
"Be a good group participant: lurk and follow the conversation, but only chime in when you have something genuinely helpful or relevant to add. Don't feel obligated to respond to every message — quality over quantity. Even when lurking silently, you can use emoji reactions to acknowledge messages, show support, or react to humor — reactions are always appreciated and don't clutter the chat.";
return [
subjectLine,
membersLine,
activationLine,
silenceLine,
cautionLine,
lurkLine,
]
.filter(Boolean)
.join(" ")
.concat(" Address the specific sender noted in the message context.");
}

View File

@@ -0,0 +1,45 @@
import type { ClawdisConfig } from "../../config/config.js";
import type { MsgContext } from "../templating.js";
export function stripStructuralPrefixes(text: string): string {
// Ignore wrapper labels, timestamps, and sender prefixes so directive-only
// detection still works in group batches that include history/context.
const marker = "[Current message - respond to this]";
const afterMarker = text.includes(marker)
? text.slice(text.indexOf(marker) + marker.length)
: text;
return afterMarker
.replace(/\[[^\]]+\]\s*/g, "")
.replace(/^[ \t]*[A-Za-z0-9+()\-_. ]+:\s*/gm, "")
.replace(/\s+/g, " ")
.trim();
}
export function stripMentions(
text: string,
ctx: MsgContext,
cfg: ClawdisConfig | undefined,
): string {
let result = text;
const patterns = cfg?.routing?.groupChat?.mentionPatterns ?? [];
for (const p of patterns) {
try {
const re = new RegExp(p, "gi");
result = result.replace(re, " ");
} catch {
// ignore invalid regex
}
}
const selfE164 = (ctx.To ?? "").replace(/^whatsapp:/, "");
if (selfE164) {
const esc = selfE164.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
result = result
.replace(new RegExp(esc, "gi"), " ")
.replace(new RegExp(`@${esc}`, "gi"), " ");
}
// Generic mention patterns like @123456789 or plain digits
result = result.replace(/@[0-9+]{5,}/g, " ");
// Discord-style mentions (<@123> or <@!123>)
result = result.replace(/<@!?\d+>/g, " ");
return result.replace(/\s+/g, " ").trim();
}

View File

@@ -0,0 +1,188 @@
import { lookupContextTokens } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { loadModelCatalog } from "../../agents/model-catalog.js";
import {
buildAllowedModelSet,
type ModelAliasIndex,
modelKey,
resolveModelRefFromString,
resolveThinkingDefault,
} from "../../agents/model-selection.js";
import type { ClawdisConfig } from "../../config/config.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import type { ThinkLevel } from "./directives.js";
export type ModelDirectiveSelection = {
provider: string;
model: string;
isDefault: boolean;
alias?: string;
};
type ModelCatalog = Awaited<ReturnType<typeof loadModelCatalog>>;
type ModelSelectionState = {
provider: string;
model: string;
allowedModelKeys: Set<string>;
allowedModelCatalog: ModelCatalog;
resetModelOverride: boolean;
resolveDefaultThinkingLevel: () => Promise<ThinkLevel | undefined>;
needsModelCatalog: boolean;
};
export async function createModelSelectionState(params: {
cfg: ClawdisConfig;
agentCfg: ClawdisConfig["agent"] | undefined;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
defaultProvider: string;
defaultModel: string;
provider: string;
model: string;
hasModelDirective: boolean;
}): Promise<ModelSelectionState> {
const {
cfg,
agentCfg,
sessionEntry,
sessionStore,
sessionKey,
storePath,
defaultProvider,
} = params;
let provider = params.provider;
let model = params.model;
const hasAllowlist = (agentCfg?.allowedModels?.length ?? 0) > 0;
const hasStoredOverride = Boolean(
sessionEntry?.modelOverride || sessionEntry?.providerOverride,
);
const needsModelCatalog =
params.hasModelDirective || hasAllowlist || hasStoredOverride;
let allowedModelKeys = new Set<string>();
let allowedModelCatalog: ModelCatalog = [];
let modelCatalog: ModelCatalog | null = null;
let resetModelOverride = false;
if (needsModelCatalog) {
modelCatalog = await loadModelCatalog({ config: cfg });
const allowed = buildAllowedModelSet({
cfg,
catalog: modelCatalog,
defaultProvider,
});
allowedModelCatalog = allowed.allowedCatalog;
allowedModelKeys = allowed.allowedKeys;
}
if (sessionEntry && sessionStore && sessionKey && hasStoredOverride) {
const overrideProvider =
sessionEntry.providerOverride?.trim() || defaultProvider;
const overrideModel = sessionEntry.modelOverride?.trim();
if (overrideModel) {
const key = modelKey(overrideProvider, overrideModel);
if (allowedModelKeys.size > 0 && !allowedModelKeys.has(key)) {
delete sessionEntry.providerOverride;
delete sessionEntry.modelOverride;
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
resetModelOverride = true;
}
}
}
const storedProviderOverride = sessionEntry?.providerOverride?.trim();
const storedModelOverride = sessionEntry?.modelOverride?.trim();
if (storedModelOverride) {
const candidateProvider = storedProviderOverride || defaultProvider;
const key = modelKey(candidateProvider, storedModelOverride);
if (allowedModelKeys.size === 0 || allowedModelKeys.has(key)) {
provider = candidateProvider;
model = storedModelOverride;
}
}
let defaultThinkingLevel: ThinkLevel | undefined;
const resolveDefaultThinkingLevel = async () => {
if (defaultThinkingLevel) return defaultThinkingLevel;
let catalogForThinking = modelCatalog ?? allowedModelCatalog;
if (!catalogForThinking || catalogForThinking.length === 0) {
modelCatalog = await loadModelCatalog({ config: cfg });
catalogForThinking = modelCatalog;
}
defaultThinkingLevel = resolveThinkingDefault({
cfg,
provider,
model,
catalog: catalogForThinking,
});
return defaultThinkingLevel;
};
return {
provider,
model,
allowedModelKeys,
allowedModelCatalog,
resetModelOverride,
resolveDefaultThinkingLevel,
needsModelCatalog,
};
}
export function resolveModelDirectiveSelection(params: {
raw: string;
defaultProvider: string;
defaultModel: string;
aliasIndex: ModelAliasIndex;
allowedModelKeys: Set<string>;
}): { selection?: ModelDirectiveSelection; error?: string } {
const { raw, defaultProvider, defaultModel, aliasIndex, allowedModelKeys } =
params;
const resolved = resolveModelRefFromString({
raw,
defaultProvider,
aliasIndex,
});
if (!resolved) {
return {
error: `Unrecognized model "${raw}". Use /model to list available models.`,
};
}
const key = modelKey(resolved.ref.provider, resolved.ref.model);
if (allowedModelKeys.size > 0 && !allowedModelKeys.has(key)) {
return {
error: `Model "${resolved.ref.provider}/${resolved.ref.model}" is not allowed. Use /model to list available models.`,
};
}
const isDefault =
resolved.ref.provider === defaultProvider &&
resolved.ref.model === defaultModel;
return {
selection: {
provider: resolved.ref.provider,
model: resolved.ref.model,
isDefault,
alias: resolved.alias,
},
};
}
export function resolveContextTokens(params: {
agentCfg: ClawdisConfig["agent"] | undefined;
model: string;
}): number {
return (
params.agentCfg?.contextTokens ??
lookupContextTokens(params.model) ??
DEFAULT_CONTEXT_TOKENS
);
}

View File

@@ -0,0 +1,473 @@
import type { SkillSnapshot } from "../../agents/skills.js";
import { parseDurationMs } from "../../cli/parse-duration.js";
import type { ClawdisConfig } from "../../config/config.js";
import type { SessionEntry } from "../../config/sessions.js";
import { defaultRuntime } from "../../runtime.js";
import type { ThinkLevel, VerboseLevel } from "./directives.js";
export type QueueMode =
| "steer"
| "followup"
| "collect"
| "steer-backlog"
| "interrupt"
| "queue";
export type QueueDropPolicy = "old" | "new" | "summarize";
export type QueueSettings = {
mode: QueueMode;
debounceMs?: number;
cap?: number;
dropPolicy?: QueueDropPolicy;
};
export type FollowupRun = {
prompt: string;
summaryLine?: string;
enqueuedAt: number;
run: {
sessionId: string;
sessionKey?: string;
surface?: string;
sessionFile: string;
workspaceDir: string;
config: ClawdisConfig;
skillsSnapshot?: SkillSnapshot;
provider: string;
model: string;
thinkLevel?: ThinkLevel;
verboseLevel?: VerboseLevel;
timeoutMs: number;
blockReplyBreak: "text_end" | "message_end";
ownerNumbers?: string[];
extraSystemPrompt?: string;
enforceFinalTag?: boolean;
};
};
type FollowupQueueState = {
items: FollowupRun[];
draining: boolean;
lastEnqueuedAt: number;
mode: QueueMode;
debounceMs: number;
cap: number;
dropPolicy: QueueDropPolicy;
droppedCount: number;
summaryLines: string[];
lastRun?: FollowupRun["run"];
};
const DEFAULT_QUEUE_DEBOUNCE_MS = 1000;
const DEFAULT_QUEUE_CAP = 20;
const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize";
const FOLLOWUP_QUEUES = new Map<string, FollowupQueueState>();
function normalizeQueueMode(raw?: string): QueueMode | undefined {
if (!raw) return undefined;
const cleaned = raw.trim().toLowerCase();
if (cleaned === "queue" || cleaned === "queued") return "steer";
if (
cleaned === "interrupt" ||
cleaned === "interrupts" ||
cleaned === "abort"
)
return "interrupt";
if (cleaned === "steer" || cleaned === "steering") return "steer";
if (
cleaned === "followup" ||
cleaned === "follow-ups" ||
cleaned === "followups"
)
return "followup";
if (cleaned === "collect" || cleaned === "coalesce") return "collect";
if (
cleaned === "steer+backlog" ||
cleaned === "steer-backlog" ||
cleaned === "steer_backlog"
)
return "steer-backlog";
return undefined;
}
function normalizeQueueDropPolicy(raw?: string): QueueDropPolicy | undefined {
if (!raw) return undefined;
const cleaned = raw.trim().toLowerCase();
if (cleaned === "old" || cleaned === "oldest") return "old";
if (cleaned === "new" || cleaned === "newest") return "new";
if (cleaned === "summarize" || cleaned === "summary") return "summarize";
return undefined;
}
function parseQueueDebounce(raw?: string): number | undefined {
if (!raw) return undefined;
try {
const parsed = parseDurationMs(raw.trim(), { defaultUnit: "ms" });
if (!parsed || parsed < 0) return undefined;
return Math.round(parsed);
} catch {
return undefined;
}
}
function parseQueueCap(raw?: string): number | undefined {
if (!raw) return undefined;
const num = Number(raw);
if (!Number.isFinite(num)) return undefined;
const cap = Math.floor(num);
if (cap < 1) return undefined;
return cap;
}
function parseQueueDirectiveArgs(raw: string): {
consumed: number;
queueMode?: QueueMode;
queueReset: boolean;
rawMode?: string;
debounceMs?: number;
cap?: number;
dropPolicy?: QueueDropPolicy;
rawDebounce?: string;
rawCap?: string;
rawDrop?: string;
hasOptions: boolean;
} {
let i = 0;
const len = raw.length;
while (i < len && /\s/.test(raw[i])) i += 1;
if (raw[i] === ":") {
i += 1;
while (i < len && /\s/.test(raw[i])) i += 1;
}
let consumed = i;
let queueMode: QueueMode | undefined;
let queueReset = false;
let rawMode: string | undefined;
let debounceMs: number | undefined;
let cap: number | undefined;
let dropPolicy: QueueDropPolicy | undefined;
let rawDebounce: string | undefined;
let rawCap: string | undefined;
let rawDrop: string | undefined;
let hasOptions = false;
const takeToken = (): string | null => {
if (i >= len) return null;
const start = i;
while (i < len && !/\s/.test(raw[i])) i += 1;
if (start === i) return null;
const token = raw.slice(start, i);
while (i < len && /\s/.test(raw[i])) i += 1;
return token;
};
while (i < len) {
const token = takeToken();
if (!token) break;
const lowered = token.trim().toLowerCase();
if (lowered === "default" || lowered === "reset" || lowered === "clear") {
queueReset = true;
consumed = i;
break;
}
if (lowered.startsWith("debounce:") || lowered.startsWith("debounce=")) {
rawDebounce = token.split(/[:=]/)[1] ?? "";
debounceMs = parseQueueDebounce(rawDebounce);
hasOptions = true;
consumed = i;
continue;
}
if (lowered.startsWith("cap:") || lowered.startsWith("cap=")) {
rawCap = token.split(/[:=]/)[1] ?? "";
cap = parseQueueCap(rawCap);
hasOptions = true;
consumed = i;
continue;
}
if (lowered.startsWith("drop:") || lowered.startsWith("drop=")) {
rawDrop = token.split(/[:=]/)[1] ?? "";
dropPolicy = normalizeQueueDropPolicy(rawDrop);
hasOptions = true;
consumed = i;
continue;
}
const mode = normalizeQueueMode(token);
if (mode) {
queueMode = mode;
rawMode = token;
consumed = i;
continue;
}
// Stop at first unrecognized token.
break;
}
return {
consumed,
queueMode,
queueReset,
rawMode,
debounceMs,
cap,
dropPolicy,
rawDebounce,
rawCap,
rawDrop,
hasOptions,
};
}
export function extractQueueDirective(body?: string): {
cleaned: string;
queueMode?: QueueMode;
queueReset: boolean;
rawMode?: string;
hasDirective: boolean;
debounceMs?: number;
cap?: number;
dropPolicy?: QueueDropPolicy;
rawDebounce?: string;
rawCap?: string;
rawDrop?: string;
hasOptions: boolean;
} {
if (!body)
return {
cleaned: "",
hasDirective: false,
queueReset: false,
hasOptions: false,
};
const re = /(?:^|\s)\/queue(?=$|\s|:)/i;
const match = re.exec(body);
if (!match) {
return {
cleaned: body.trim(),
hasDirective: false,
queueReset: false,
hasOptions: false,
};
}
const start = match.index + match[0].indexOf("/queue");
const argsStart = start + "/queue".length;
const args = body.slice(argsStart);
const parsed = parseQueueDirectiveArgs(args);
const cleanedRaw =
body.slice(0, start) + body.slice(argsStart + parsed.consumed);
const cleaned = cleanedRaw.replace(/\s+/g, " ").trim();
return {
cleaned,
queueMode: parsed.queueMode,
queueReset: parsed.queueReset,
rawMode: parsed.rawMode,
debounceMs: parsed.debounceMs,
cap: parsed.cap,
dropPolicy: parsed.dropPolicy,
rawDebounce: parsed.rawDebounce,
rawCap: parsed.rawCap,
rawDrop: parsed.rawDrop,
hasDirective: true,
hasOptions: parsed.hasOptions,
};
}
function elideText(text: string, limit = 140): string {
if (text.length <= limit) return text;
return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}`;
}
function buildQueueSummaryLine(run: FollowupRun): string {
const base = run.summaryLine?.trim() || run.prompt.trim();
const cleaned = base.replace(/\s+/g, " ").trim();
return elideText(cleaned, 160);
}
function getFollowupQueue(
key: string,
settings: QueueSettings,
): FollowupQueueState {
const existing = FOLLOWUP_QUEUES.get(key);
if (existing) {
existing.mode = settings.mode;
existing.debounceMs =
typeof settings.debounceMs === "number"
? Math.max(0, settings.debounceMs)
: existing.debounceMs;
existing.cap =
typeof settings.cap === "number" && settings.cap > 0
? Math.floor(settings.cap)
: existing.cap;
existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy;
return existing;
}
const created: FollowupQueueState = {
items: [],
draining: false,
lastEnqueuedAt: 0,
mode: settings.mode,
debounceMs:
typeof settings.debounceMs === "number"
? Math.max(0, settings.debounceMs)
: DEFAULT_QUEUE_DEBOUNCE_MS,
cap:
typeof settings.cap === "number" && settings.cap > 0
? Math.floor(settings.cap)
: DEFAULT_QUEUE_CAP,
dropPolicy: settings.dropPolicy ?? DEFAULT_QUEUE_DROP,
droppedCount: 0,
summaryLines: [],
};
FOLLOWUP_QUEUES.set(key, created);
return created;
}
export function enqueueFollowupRun(
key: string,
run: FollowupRun,
settings: QueueSettings,
): boolean {
const queue = getFollowupQueue(key, settings);
queue.lastEnqueuedAt = Date.now();
queue.lastRun = run.run;
const cap = queue.cap;
if (cap > 0 && queue.items.length >= cap) {
if (queue.dropPolicy === "new") {
return false;
}
const dropCount = queue.items.length - cap + 1;
const dropped = queue.items.splice(0, dropCount);
if (queue.dropPolicy === "summarize") {
for (const item of dropped) {
queue.droppedCount += 1;
queue.summaryLines.push(buildQueueSummaryLine(item));
}
while (queue.summaryLines.length > cap) queue.summaryLines.shift();
}
}
queue.items.push(run);
return true;
}
async function waitForQueueDebounce(queue: FollowupQueueState): Promise<void> {
const debounceMs = Math.max(0, queue.debounceMs);
if (debounceMs <= 0) return;
while (true) {
const since = Date.now() - queue.lastEnqueuedAt;
if (since >= debounceMs) return;
await new Promise((resolve) => setTimeout(resolve, debounceMs - since));
}
}
function buildSummaryPrompt(queue: FollowupQueueState): string | undefined {
if (queue.dropPolicy !== "summarize" || queue.droppedCount <= 0) {
return undefined;
}
const lines = [
`[Queue overflow] Dropped ${queue.droppedCount} message${queue.droppedCount === 1 ? "" : "s"} due to cap.`,
];
if (queue.summaryLines.length > 0) {
lines.push("Summary:");
for (const line of queue.summaryLines) {
lines.push(`- ${line}`);
}
}
queue.droppedCount = 0;
queue.summaryLines = [];
return lines.join("\n");
}
function buildCollectPrompt(items: FollowupRun[], summary?: string): string {
const blocks: string[] = ["[Queued messages while agent was busy]"];
if (summary) {
blocks.push(summary);
}
items.forEach((item, idx) => {
blocks.push(`---\nQueued #${idx + 1}\n${item.prompt}`.trim());
});
return blocks.join("\n\n");
}
export function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
): void {
const queue = FOLLOWUP_QUEUES.get(key);
if (!queue || queue.draining) return;
queue.draining = true;
void (async () => {
try {
while (queue.items.length > 0 || queue.droppedCount > 0) {
await waitForQueueDebounce(queue);
if (queue.mode === "collect") {
const items = queue.items.splice(0, queue.items.length);
const summary = buildSummaryPrompt(queue);
const run = items.at(-1)?.run ?? queue.lastRun;
if (!run) break;
const prompt = buildCollectPrompt(items, summary);
await runFollowup({
prompt,
run,
enqueuedAt: Date.now(),
});
continue;
}
const summaryPrompt = buildSummaryPrompt(queue);
if (summaryPrompt) {
const run = queue.lastRun;
if (!run) break;
await runFollowup({
prompt: summaryPrompt,
run,
enqueuedAt: Date.now(),
});
continue;
}
const next = queue.items.shift();
if (!next) break;
await runFollowup(next);
}
} catch (err) {
defaultRuntime.error?.(
`followup queue drain failed for ${key}: ${String(err)}`,
);
} finally {
queue.draining = false;
if (queue.items.length === 0 && queue.droppedCount === 0) {
FOLLOWUP_QUEUES.delete(key);
} else {
scheduleFollowupDrain(key, runFollowup);
}
}
})();
}
function defaultQueueModeForSurface(surface?: string): QueueMode {
const normalized = surface?.trim().toLowerCase();
if (normalized === "discord") return "collect";
if (normalized === "webchat") return "collect";
if (normalized === "whatsapp") return "collect";
if (normalized === "telegram") return "collect";
if (normalized === "imessage") return "collect";
if (normalized === "signal") return "collect";
return "collect";
}
export function resolveQueueSettings(params: {
cfg: ClawdisConfig;
surface?: string;
sessionEntry?: SessionEntry;
inlineMode?: QueueMode;
inlineOptions?: Partial<QueueSettings>;
}): QueueSettings {
const surfaceKey = params.surface?.trim().toLowerCase();
const queueCfg = params.cfg.routing?.queue;
const surfaceModeRaw =
surfaceKey && queueCfg?.bySurface
? (queueCfg.bySurface as Record<string, string | undefined>)[surfaceKey]
: undefined;
const resolvedMode =
params.inlineMode ??
normalizeQueueMode(params.sessionEntry?.queueMode) ??
normalizeQueueMode(surfaceModeRaw) ??
normalizeQueueMode(queueCfg?.mode) ??
defaultQueueModeForSurface(surfaceKey);
const debounceRaw =
params.inlineOptions?.debounceMs ??
params.sessionEntry?.queueDebounceMs ??
queueCfg?.debounceMs ??
DEFAULT_QUEUE_DEBOUNCE_MS;
const capRaw =
params.inlineOptions?.cap ??
params.sessionEntry?.queueCap ??
queueCfg?.cap ??
DEFAULT_QUEUE_CAP;
const dropRaw =
params.inlineOptions?.dropPolicy ??
params.sessionEntry?.queueDrop ??
normalizeQueueDropPolicy(queueCfg?.drop) ??
DEFAULT_QUEUE_DROP;
return {
mode: resolvedMode,
debounceMs:
typeof debounceRaw === "number" ? Math.max(0, debounceRaw) : undefined,
cap:
typeof capRaw === "number" ? Math.max(1, Math.floor(capRaw)) : undefined,
dropPolicy: dropRaw,
};
}

View File

@@ -0,0 +1,35 @@
export function extractReplyToTag(
text?: string,
currentMessageId?: string,
): {
cleaned: string;
replyToId?: string;
hasTag: boolean;
} {
if (!text) return { cleaned: "", hasTag: false };
let cleaned = text;
let replyToId: string | undefined;
let hasTag = false;
const currentMatch = cleaned.match(/\[\[reply_to_current\]\]/i);
if (currentMatch) {
cleaned = cleaned.replace(/\[\[reply_to_current\]\]/gi, " ");
hasTag = true;
if (currentMessageId?.trim()) {
replyToId = currentMessageId.trim();
}
}
const idMatch = cleaned.match(/\[\[reply_to:([^\]\n]+)\]\]/i);
if (idMatch?.[1]) {
cleaned = cleaned.replace(/\[\[reply_to:[^\]\n]+\]\]/gi, " ");
replyToId = idMatch[1].trim();
hasTag = true;
}
cleaned = cleaned
.replace(/[ \t]+/g, " ")
.replace(/[ \t]*\n[ \t]*/g, "\n")
.trim();
return { cleaned, replyToId, hasTag };
}

View File

@@ -0,0 +1,125 @@
import crypto from "node:crypto";
import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js";
import type { ClawdisConfig } from "../../config/config.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import { buildProviderSummary } from "../../infra/provider-summary.js";
import { drainSystemEvents } from "../../infra/system-events.js";
export async function prependSystemEvents(params: {
cfg: ClawdisConfig;
isMainSession: boolean;
isNewSession: boolean;
prefixedBodyBase: string;
}): Promise<string> {
if (!params.isMainSession) return params.prefixedBodyBase;
const compactSystemEvent = (line: string): string | null => {
const trimmed = line.trim();
if (!trimmed) return null;
const lower = trimmed.toLowerCase();
if (lower.includes("reason periodic")) return null;
if (lower.includes("heartbeat")) return null;
if (trimmed.startsWith("Node:")) {
return trimmed.replace(/ · last input [^·]+/i, "").trim();
}
return trimmed;
};
const systemLines: string[] = [];
const queued = drainSystemEvents();
systemLines.push(
...queued.map(compactSystemEvent).filter((v): v is string => Boolean(v)),
);
if (params.isNewSession) {
const summary = await buildProviderSummary(params.cfg);
if (summary.length > 0) systemLines.unshift(...summary);
}
if (systemLines.length === 0) return params.prefixedBodyBase;
const block = systemLines.map((l) => `System: ${l}`).join("\n");
return `${block}\n\n${params.prefixedBodyBase}`;
}
export async function ensureSkillSnapshot(params: {
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
sessionId?: string;
isFirstTurnInSession: boolean;
workspaceDir: string;
cfg: ClawdisConfig;
}): Promise<{
sessionEntry?: SessionEntry;
skillsSnapshot?: SessionEntry["skillsSnapshot"];
systemSent: boolean;
}> {
const {
sessionEntry,
sessionStore,
sessionKey,
storePath,
sessionId,
isFirstTurnInSession,
workspaceDir,
cfg,
} = params;
let nextEntry = sessionEntry;
let systemSent = sessionEntry?.systemSent ?? false;
if (isFirstTurnInSession && sessionStore && sessionKey) {
const current = nextEntry ??
sessionStore[sessionKey] ?? {
sessionId: sessionId ?? crypto.randomUUID(),
updatedAt: Date.now(),
};
const skillSnapshot =
isFirstTurnInSession || !current.skillsSnapshot
? buildWorkspaceSkillSnapshot(workspaceDir, { config: cfg })
: current.skillsSnapshot;
nextEntry = {
...current,
sessionId: sessionId ?? current.sessionId ?? crypto.randomUUID(),
updatedAt: Date.now(),
systemSent: true,
skillsSnapshot: skillSnapshot,
};
sessionStore[sessionKey] = nextEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
systemSent = true;
}
const skillsSnapshot =
nextEntry?.skillsSnapshot ??
(isFirstTurnInSession
? undefined
: buildWorkspaceSkillSnapshot(workspaceDir, { config: cfg }));
if (
skillsSnapshot &&
sessionStore &&
sessionKey &&
!isFirstTurnInSession &&
!nextEntry?.skillsSnapshot
) {
const current = nextEntry ?? {
sessionId: sessionId ?? crypto.randomUUID(),
updatedAt: Date.now(),
};
nextEntry = {
...current,
sessionId: sessionId ?? current.sessionId ?? crypto.randomUUID(),
updatedAt: Date.now(),
skillsSnapshot,
};
sessionStore[sessionKey] = nextEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
return { sessionEntry: nextEntry, skillsSnapshot, systemSent };
}

View File

@@ -0,0 +1,207 @@
import crypto from "node:crypto";
import type { ClawdisConfig } from "../../config/config.js";
import {
buildGroupDisplayName,
DEFAULT_IDLE_MINUTES,
DEFAULT_RESET_TRIGGERS,
type GroupKeyResolution,
loadSessionStore,
resolveGroupSessionKey,
resolveSessionKey,
resolveStorePath,
type SessionEntry,
saveSessionStore,
} from "../../config/sessions.js";
import type { MsgContext, TemplateContext } from "../templating.js";
import { stripMentions, stripStructuralPrefixes } from "./mentions.js";
export type SessionInitResult = {
sessionCtx: TemplateContext;
sessionEntry: SessionEntry;
sessionStore: Record<string, SessionEntry>;
sessionKey: string;
sessionId: string;
isNewSession: boolean;
systemSent: boolean;
abortedLastRun: boolean;
storePath: string;
sessionScope: string;
groupResolution?: GroupKeyResolution;
isGroup: boolean;
bodyStripped?: string;
triggerBodyNormalized: string;
};
export async function initSessionState(params: {
ctx: MsgContext;
cfg: ClawdisConfig;
}): Promise<SessionInitResult> {
const { ctx, cfg } = params;
const sessionCfg = cfg.session;
const mainKey = sessionCfg?.mainKey ?? "main";
const resetTriggers = sessionCfg?.resetTriggers?.length
? sessionCfg.resetTriggers
: DEFAULT_RESET_TRIGGERS;
const idleMinutes = Math.max(
sessionCfg?.idleMinutes ?? DEFAULT_IDLE_MINUTES,
1,
);
const sessionScope = sessionCfg?.scope ?? "per-sender";
const storePath = resolveStorePath(sessionCfg?.store);
const sessionStore: Record<string, SessionEntry> =
loadSessionStore(storePath);
let sessionKey: string | undefined;
let sessionEntry: SessionEntry | undefined;
let sessionId: string | undefined;
let isNewSession = false;
let bodyStripped: string | undefined;
let systemSent = false;
let abortedLastRun = false;
let persistedThinking: string | undefined;
let persistedVerbose: string | undefined;
let persistedModelOverride: string | undefined;
let persistedProviderOverride: string | undefined;
const groupResolution = resolveGroupSessionKey(ctx);
const isGroup =
ctx.ChatType?.trim().toLowerCase() === "group" || Boolean(groupResolution);
const triggerBodyNormalized = stripStructuralPrefixes(ctx.Body ?? "")
.trim()
.toLowerCase();
const rawBody = ctx.Body ?? "";
const trimmedBody = rawBody.trim();
// Timestamp/message prefixes (e.g. "[Dec 4 17:35] ") are added by the
// web inbox before we get here. They prevented reset triggers like "/new"
// from matching, so strip structural wrappers when checking for resets.
const strippedForReset = isGroup
? stripMentions(triggerBodyNormalized, ctx, cfg)
: triggerBodyNormalized;
for (const trigger of resetTriggers) {
if (!trigger) continue;
if (trimmedBody === trigger || strippedForReset === trigger) {
isNewSession = true;
bodyStripped = "";
break;
}
const triggerPrefix = `${trigger} `;
if (
trimmedBody.startsWith(triggerPrefix) ||
strippedForReset.startsWith(triggerPrefix)
) {
isNewSession = true;
bodyStripped = strippedForReset.slice(trigger.length).trimStart();
break;
}
}
sessionKey = resolveSessionKey(sessionScope, ctx, mainKey);
if (groupResolution?.legacyKey && groupResolution.legacyKey !== sessionKey) {
const legacyEntry = sessionStore[groupResolution.legacyKey];
if (legacyEntry && !sessionStore[sessionKey]) {
sessionStore[sessionKey] = legacyEntry;
delete sessionStore[groupResolution.legacyKey];
}
}
const entry = sessionStore[sessionKey];
const idleMs = idleMinutes * 60_000;
const freshEntry = entry && Date.now() - entry.updatedAt <= idleMs;
if (!isNewSession && freshEntry) {
sessionId = entry.sessionId;
systemSent = entry.systemSent ?? false;
abortedLastRun = entry.abortedLastRun ?? false;
persistedThinking = entry.thinkingLevel;
persistedVerbose = entry.verboseLevel;
persistedModelOverride = entry.modelOverride;
persistedProviderOverride = entry.providerOverride;
} else {
sessionId = crypto.randomUUID();
isNewSession = true;
systemSent = false;
abortedLastRun = false;
}
const baseEntry = !isNewSession && freshEntry ? entry : undefined;
sessionEntry = {
...baseEntry,
sessionId,
updatedAt: Date.now(),
systemSent,
abortedLastRun,
// Persist previously stored thinking/verbose levels when present.
thinkingLevel: persistedThinking ?? baseEntry?.thinkingLevel,
verboseLevel: persistedVerbose ?? baseEntry?.verboseLevel,
modelOverride: persistedModelOverride ?? baseEntry?.modelOverride,
providerOverride: persistedProviderOverride ?? baseEntry?.providerOverride,
sendPolicy: baseEntry?.sendPolicy,
queueMode: baseEntry?.queueMode,
queueDebounceMs: baseEntry?.queueDebounceMs,
queueCap: baseEntry?.queueCap,
queueDrop: baseEntry?.queueDrop,
displayName: baseEntry?.displayName,
chatType: baseEntry?.chatType,
surface: baseEntry?.surface,
subject: baseEntry?.subject,
room: baseEntry?.room,
space: baseEntry?.space,
};
if (groupResolution?.surface) {
const surface = groupResolution.surface;
const subject = ctx.GroupSubject?.trim();
const space = ctx.GroupSpace?.trim();
const explicitRoom = ctx.GroupRoom?.trim();
const isRoomSurface = surface === "discord" || surface === "slack";
const nextRoom =
explicitRoom ??
(isRoomSurface && subject && subject.startsWith("#")
? subject
: undefined);
const nextSubject = nextRoom ? undefined : subject;
sessionEntry.chatType = groupResolution.chatType ?? "group";
sessionEntry.surface = surface;
if (nextSubject) sessionEntry.subject = nextSubject;
if (nextRoom) sessionEntry.room = nextRoom;
if (space) sessionEntry.space = space;
sessionEntry.displayName = buildGroupDisplayName({
surface: sessionEntry.surface,
subject: sessionEntry.subject,
room: sessionEntry.room,
space: sessionEntry.space,
id: groupResolution.id,
key: sessionKey,
});
} else if (!sessionEntry.chatType) {
sessionEntry.chatType = "direct";
}
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
const sessionCtx: TemplateContext = {
...ctx,
BodyStripped: bodyStripped ?? ctx.Body,
SessionId: sessionId,
IsNewSession: isNewSession ? "true" : "false",
};
return {
sessionCtx,
sessionEntry,
sessionStore,
sessionKey,
sessionId: sessionId ?? crypto.randomUUID(),
isNewSession,
systemSent,
abortedLastRun,
storePath,
sessionScope,
groupResolution,
isGroup,
bodyStripped,
triggerBodyNormalized,
};
}

View File

@@ -0,0 +1,95 @@
export type TypingController = {
onReplyStart: () => Promise<void>;
startTypingLoop: () => Promise<void>;
startTypingOnText: (text?: string) => Promise<void>;
refreshTypingTtl: () => void;
cleanup: () => void;
};
export function createTypingController(params: {
onReplyStart?: () => Promise<void> | void;
typingIntervalSeconds?: number;
typingTtlMs?: number;
silentToken?: string;
log?: (message: string) => void;
}): TypingController {
const {
onReplyStart,
typingIntervalSeconds = 6,
typingTtlMs = 2 * 60_000,
silentToken,
log,
} = params;
let started = false;
let typingTimer: NodeJS.Timeout | undefined;
let typingTtlTimer: NodeJS.Timeout | undefined;
const typingIntervalMs = typingIntervalSeconds * 1000;
const formatTypingTtl = (ms: number) => {
if (ms % 60_000 === 0) return `${ms / 60_000}m`;
return `${Math.round(ms / 1000)}s`;
};
const cleanup = () => {
if (typingTtlTimer) {
clearTimeout(typingTtlTimer);
typingTtlTimer = undefined;
}
if (typingTimer) {
clearInterval(typingTimer);
typingTimer = undefined;
}
};
const refreshTypingTtl = () => {
if (!typingIntervalMs || typingIntervalMs <= 0) return;
if (typingTtlMs <= 0) return;
if (typingTtlTimer) {
clearTimeout(typingTtlTimer);
}
typingTtlTimer = setTimeout(() => {
if (!typingTimer) return;
log?.(
`typing TTL reached (${formatTypingTtl(typingTtlMs)}); stopping typing indicator`,
);
cleanup();
}, typingTtlMs);
};
const triggerTyping = async () => {
await onReplyStart?.();
};
const ensureStart = async () => {
if (started) return;
started = true;
await triggerTyping();
};
const startTypingLoop = async () => {
if (!onReplyStart) return;
if (typingIntervalMs <= 0) return;
if (typingTimer) return;
await ensureStart();
refreshTypingTtl();
typingTimer = setInterval(() => {
void triggerTyping();
}, typingIntervalMs);
};
const startTypingOnText = async (text?: string) => {
const trimmed = text?.trim();
if (!trimmed) return;
if (silentToken && trimmed === silentToken) return;
refreshTypingTtl();
await startTypingLoop();
};
return {
onReplyStart: ensureStart,
startTypingLoop,
startTypingOnText,
refreshTypingTtl,
cleanup,
};
}