Move mattermost channel implementation from core to extensions/mattermost plugin. Extract config schema, group mentions, normalize utilities, and all mattermost-specific logic (accounts, client, monitor, probe, send) into the extension. Update imports to use plugin SDK and local modules. Add channel metadata directly in plugin definition instead of using getChatChannelMeta. Update package.json with channel and install configuration.
423 lines
14 KiB
TypeScript
423 lines
14 KiB
TypeScript
import crypto from "node:crypto";
|
|
import {
|
|
abortEmbeddedPiRun,
|
|
isEmbeddedPiRunActive,
|
|
isEmbeddedPiRunStreaming,
|
|
resolveEmbeddedSessionLane,
|
|
} from "../../agents/pi-embedded.js";
|
|
import { resolveSessionAuthProfileOverride } from "../../agents/auth-profiles/session-override.js";
|
|
import type { ExecToolDefaults } from "../../agents/bash-tools.js";
|
|
import type { ClawdbotConfig } from "../../config/config.js";
|
|
import {
|
|
resolveSessionFilePath,
|
|
type SessionEntry,
|
|
updateSessionStore,
|
|
} from "../../config/sessions.js";
|
|
import { logVerbose } from "../../globals.js";
|
|
import { clearCommandLane, getQueueSize } from "../../process/command-queue.js";
|
|
import { normalizeMainKey } from "../../routing/session-key.js";
|
|
import { isReasoningTagProvider } from "../../utils/provider-utils.js";
|
|
import { hasControlCommand } from "../command-detection.js";
|
|
import { buildInboundMediaNote } from "../media-note.js";
|
|
import type { MsgContext, TemplateContext } from "../templating.js";
|
|
import {
|
|
type ElevatedLevel,
|
|
formatXHighModelHint,
|
|
normalizeThinkLevel,
|
|
type ReasoningLevel,
|
|
supportsXHighThinking,
|
|
type ThinkLevel,
|
|
type VerboseLevel,
|
|
} from "../thinking.js";
|
|
import { SILENT_REPLY_TOKEN } from "../tokens.js";
|
|
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
|
import { runReplyAgent } from "./agent-runner.js";
|
|
import { applySessionHints } from "./body.js";
|
|
import { routeReply } from "./route-reply.js";
|
|
import type { buildCommandContext } from "./commands.js";
|
|
import type { InlineDirectives } from "./directive-handling.js";
|
|
import { buildGroupIntro } from "./groups.js";
|
|
import type { createModelSelectionState } from "./model-selection.js";
|
|
import { resolveQueueSettings } from "./queue.js";
|
|
import { ensureSkillSnapshot, prependSystemEvents } from "./session-updates.js";
|
|
import type { TypingController } from "./typing.js";
|
|
import { resolveTypingMode } from "./typing-mode.js";
|
|
|
|
type AgentDefaults = NonNullable<ClawdbotConfig["agents"]>["defaults"];
|
|
type ExecOverrides = Pick<ExecToolDefaults, "host" | "security" | "ask" | "node">;
|
|
|
|
const BARE_SESSION_RESET_PROMPT =
|
|
"A new session was started via /new or /reset. Say hi briefly (1-2 sentences) and ask what the user wants to do next. If the runtime model differs from default_model in the system prompt, mention the default model in the greeting. Do not mention internal steps, files, tools, or reasoning.";
|
|
|
|
type RunPreparedReplyParams = {
|
|
ctx: MsgContext;
|
|
sessionCtx: TemplateContext;
|
|
cfg: ClawdbotConfig;
|
|
agentId: string;
|
|
agentDir: string;
|
|
agentCfg: AgentDefaults;
|
|
sessionCfg: ClawdbotConfig["session"];
|
|
commandAuthorized: boolean;
|
|
command: ReturnType<typeof buildCommandContext>;
|
|
commandSource: string;
|
|
allowTextCommands: boolean;
|
|
directives: InlineDirectives;
|
|
defaultActivation: Parameters<typeof buildGroupIntro>[0]["defaultActivation"];
|
|
resolvedThinkLevel: ThinkLevel | undefined;
|
|
resolvedVerboseLevel: VerboseLevel | undefined;
|
|
resolvedReasoningLevel: ReasoningLevel;
|
|
resolvedElevatedLevel: ElevatedLevel;
|
|
execOverrides?: ExecOverrides;
|
|
elevatedEnabled: boolean;
|
|
elevatedAllowed: boolean;
|
|
blockStreamingEnabled: boolean;
|
|
blockReplyChunking?: {
|
|
minChars: number;
|
|
maxChars: number;
|
|
breakPreference: "paragraph" | "newline" | "sentence";
|
|
};
|
|
resolvedBlockStreamingBreak: "text_end" | "message_end";
|
|
modelState: Awaited<ReturnType<typeof createModelSelectionState>>;
|
|
provider: string;
|
|
model: string;
|
|
perMessageQueueMode?: InlineDirectives["queueMode"];
|
|
perMessageQueueOptions?: {
|
|
debounceMs?: number;
|
|
cap?: number;
|
|
dropPolicy?: InlineDirectives["dropPolicy"];
|
|
};
|
|
typing: TypingController;
|
|
opts?: GetReplyOptions;
|
|
defaultProvider: string;
|
|
defaultModel: string;
|
|
timeoutMs: number;
|
|
isNewSession: boolean;
|
|
resetTriggered: boolean;
|
|
systemSent: boolean;
|
|
sessionEntry?: SessionEntry;
|
|
sessionStore?: Record<string, SessionEntry>;
|
|
sessionKey: string;
|
|
sessionId?: string;
|
|
storePath?: string;
|
|
workspaceDir: string;
|
|
abortedLastRun: boolean;
|
|
};
|
|
|
|
export async function runPreparedReply(
|
|
params: RunPreparedReplyParams,
|
|
): Promise<ReplyPayload | ReplyPayload[] | undefined> {
|
|
const {
|
|
ctx,
|
|
sessionCtx,
|
|
cfg,
|
|
agentId,
|
|
agentDir,
|
|
agentCfg,
|
|
sessionCfg,
|
|
commandAuthorized,
|
|
command,
|
|
commandSource,
|
|
allowTextCommands,
|
|
directives,
|
|
defaultActivation,
|
|
elevatedEnabled,
|
|
elevatedAllowed,
|
|
blockStreamingEnabled,
|
|
blockReplyChunking,
|
|
resolvedBlockStreamingBreak,
|
|
modelState,
|
|
provider,
|
|
model,
|
|
perMessageQueueMode,
|
|
perMessageQueueOptions,
|
|
typing,
|
|
opts,
|
|
defaultProvider,
|
|
defaultModel,
|
|
timeoutMs,
|
|
isNewSession,
|
|
resetTriggered,
|
|
systemSent,
|
|
sessionKey,
|
|
sessionId,
|
|
storePath,
|
|
workspaceDir,
|
|
sessionStore,
|
|
} = params;
|
|
let {
|
|
sessionEntry,
|
|
resolvedThinkLevel,
|
|
resolvedVerboseLevel,
|
|
resolvedReasoningLevel,
|
|
resolvedElevatedLevel,
|
|
execOverrides,
|
|
abortedLastRun,
|
|
} = params;
|
|
let currentSystemSent = systemSent;
|
|
|
|
const isFirstTurnInSession = isNewSession || !currentSystemSent;
|
|
const isGroupChat = sessionCtx.ChatType === "group";
|
|
const originatingChannel =
|
|
(ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider)?.toString().toLowerCase() ?? "";
|
|
const wasMentioned = ctx.WasMentioned === true;
|
|
const isHeartbeat = opts?.isHeartbeat === true;
|
|
const typingMode = resolveTypingMode({
|
|
configured: sessionCfg?.typingMode ?? agentCfg?.typingMode,
|
|
isGroupChat,
|
|
wasMentioned,
|
|
isHeartbeat,
|
|
});
|
|
const shouldInjectGroupIntro = Boolean(
|
|
isGroupChat && (isFirstTurnInSession || sessionEntry?.groupActivationNeedsSystemIntro),
|
|
);
|
|
const groupIntro = shouldInjectGroupIntro
|
|
? buildGroupIntro({
|
|
cfg,
|
|
sessionCtx,
|
|
sessionEntry,
|
|
defaultActivation,
|
|
silentToken: SILENT_REPLY_TOKEN,
|
|
})
|
|
: "";
|
|
const groupSystemPrompt = sessionCtx.GroupSystemPrompt?.trim() ?? "";
|
|
const extraSystemPrompt = [groupIntro, groupSystemPrompt].filter(Boolean).join("\n\n");
|
|
const baseBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? "";
|
|
// Use CommandBody/RawBody for bare reset detection (clean message without structural context).
|
|
const rawBodyTrimmed = (ctx.CommandBody ?? ctx.RawBody ?? ctx.Body ?? "").trim();
|
|
const baseBodyTrimmedRaw = baseBody.trim();
|
|
if (
|
|
allowTextCommands &&
|
|
(!commandAuthorized || !command.isAuthorizedSender) &&
|
|
!baseBodyTrimmedRaw &&
|
|
hasControlCommand(commandSource, cfg)
|
|
) {
|
|
typing.cleanup();
|
|
return undefined;
|
|
}
|
|
const isBareNewOrReset = rawBodyTrimmed === "/new" || rawBodyTrimmed === "/reset";
|
|
const isBareSessionReset =
|
|
isNewSession &&
|
|
((baseBodyTrimmedRaw.length === 0 && rawBodyTrimmed.length > 0) || isBareNewOrReset);
|
|
const baseBodyFinal = isBareSessionReset ? BARE_SESSION_RESET_PROMPT : baseBody;
|
|
const baseBodyTrimmed = baseBodyFinal.trim();
|
|
if (!baseBodyTrimmed) {
|
|
await typing.onReplyStart();
|
|
logVerbose("Inbound body empty after normalization; skipping agent run");
|
|
typing.cleanup();
|
|
return {
|
|
text: "I didn't receive any text in your message. Please resend or add a caption.",
|
|
};
|
|
}
|
|
let prefixedBodyBase = await applySessionHints({
|
|
baseBody: baseBodyFinal,
|
|
abortedLastRun,
|
|
sessionEntry,
|
|
sessionStore,
|
|
sessionKey,
|
|
storePath,
|
|
abortKey: command.abortKey,
|
|
messageId: sessionCtx.MessageSid,
|
|
});
|
|
const isGroupSession = sessionEntry?.chatType === "group" || sessionEntry?.chatType === "channel";
|
|
const isMainSession = !isGroupSession && sessionKey === normalizeMainKey(sessionCfg?.mainKey);
|
|
prefixedBodyBase = await prependSystemEvents({
|
|
cfg,
|
|
sessionKey,
|
|
isMainSession,
|
|
isNewSession,
|
|
prefixedBodyBase,
|
|
});
|
|
const threadStarterBody = ctx.ThreadStarterBody?.trim();
|
|
const threadStarterNote =
|
|
isNewSession && threadStarterBody
|
|
? `[Thread starter - for context]\n${threadStarterBody}`
|
|
: undefined;
|
|
const skillResult = await ensureSkillSnapshot({
|
|
sessionEntry,
|
|
sessionStore,
|
|
sessionKey,
|
|
storePath,
|
|
sessionId,
|
|
isFirstTurnInSession,
|
|
workspaceDir,
|
|
cfg,
|
|
skillFilter: opts?.skillFilter,
|
|
});
|
|
sessionEntry = skillResult.sessionEntry ?? sessionEntry;
|
|
currentSystemSent = skillResult.systemSent;
|
|
const skillsSnapshot = skillResult.skillsSnapshot;
|
|
const prefixedBody = [threadStarterNote, prefixedBodyBase].filter(Boolean).join("\n\n");
|
|
const mediaNote = buildInboundMediaNote(ctx);
|
|
const mediaReplyHint = mediaNote
|
|
? "To send an image back, prefer the message tool (media/path/filePath). If you must inline, use MEDIA:/path or MEDIA:https://example.com/image.jpg (spaces ok, quote if needed). Keep caption in the text body."
|
|
: undefined;
|
|
let prefixedCommandBody = mediaNote
|
|
? [mediaNote, mediaReplyHint, prefixedBody ?? ""].filter(Boolean).join("\n").trim()
|
|
: prefixedBody;
|
|
if (!resolvedThinkLevel && prefixedCommandBody) {
|
|
const parts = prefixedCommandBody.split(/\s+/);
|
|
const maybeLevel = normalizeThinkLevel(parts[0]);
|
|
if (maybeLevel && (maybeLevel !== "xhigh" || supportsXHighThinking(provider, model))) {
|
|
resolvedThinkLevel = maybeLevel;
|
|
prefixedCommandBody = parts.slice(1).join(" ").trim();
|
|
}
|
|
}
|
|
if (!resolvedThinkLevel) {
|
|
resolvedThinkLevel = await modelState.resolveDefaultThinkingLevel();
|
|
}
|
|
if (resolvedThinkLevel === "xhigh" && !supportsXHighThinking(provider, model)) {
|
|
const explicitThink = directives.hasThinkDirective && directives.thinkLevel !== undefined;
|
|
if (explicitThink) {
|
|
typing.cleanup();
|
|
return {
|
|
text: `Thinking level "xhigh" is only supported for ${formatXHighModelHint()}. Use /think high or switch to one of those models.`,
|
|
};
|
|
}
|
|
resolvedThinkLevel = "high";
|
|
if (sessionEntry && sessionStore && sessionKey && sessionEntry.thinkingLevel === "xhigh") {
|
|
sessionEntry.thinkingLevel = "high";
|
|
sessionEntry.updatedAt = Date.now();
|
|
sessionStore[sessionKey] = sessionEntry;
|
|
if (storePath) {
|
|
await updateSessionStore(storePath, (store) => {
|
|
store[sessionKey] = sessionEntry;
|
|
});
|
|
}
|
|
}
|
|
}
|
|
if (resetTriggered && command.isAuthorizedSender) {
|
|
const channel = ctx.OriginatingChannel || (command.channel as any);
|
|
const to = ctx.OriginatingTo || command.from || command.to;
|
|
if (channel && to) {
|
|
const modelLabel = `${provider}/${model}`;
|
|
const defaultLabel = `${defaultProvider}/${defaultModel}`;
|
|
const text =
|
|
modelLabel === defaultLabel
|
|
? `✅ New session started · model: ${modelLabel}`
|
|
: `✅ New session started · model: ${modelLabel} (default: ${defaultLabel})`;
|
|
await routeReply({
|
|
payload: { text },
|
|
channel,
|
|
to,
|
|
sessionKey,
|
|
accountId: ctx.AccountId,
|
|
threadId: ctx.MessageThreadId,
|
|
cfg,
|
|
});
|
|
}
|
|
}
|
|
const sessionIdFinal = sessionId ?? crypto.randomUUID();
|
|
const sessionFile = resolveSessionFilePath(sessionIdFinal, sessionEntry);
|
|
const queueBodyBase = [threadStarterNote, baseBodyFinal].filter(Boolean).join("\n\n");
|
|
const queueMessageId = sessionCtx.MessageSid?.trim();
|
|
const queueMessageIdHint = queueMessageId ? `[message_id: ${queueMessageId}]` : "";
|
|
const queueBodyWithId = queueMessageIdHint
|
|
? `${queueBodyBase}\n${queueMessageIdHint}`
|
|
: queueBodyBase;
|
|
const queuedBody = mediaNote
|
|
? [mediaNote, mediaReplyHint, queueBodyWithId].filter(Boolean).join("\n").trim()
|
|
: queueBodyWithId;
|
|
const resolvedQueue = resolveQueueSettings({
|
|
cfg,
|
|
channel: sessionCtx.Provider,
|
|
sessionEntry,
|
|
inlineMode: perMessageQueueMode,
|
|
inlineOptions: perMessageQueueOptions,
|
|
});
|
|
const sessionLaneKey = resolveEmbeddedSessionLane(sessionKey ?? sessionIdFinal);
|
|
const laneSize = getQueueSize(sessionLaneKey);
|
|
if (resolvedQueue.mode === "interrupt" && laneSize > 0) {
|
|
const cleared = clearCommandLane(sessionLaneKey);
|
|
const aborted = abortEmbeddedPiRun(sessionIdFinal);
|
|
logVerbose(`Interrupting ${sessionLaneKey} (cleared ${cleared}, aborted=${aborted})`);
|
|
}
|
|
const queueKey = sessionKey ?? sessionIdFinal;
|
|
const isActive = isEmbeddedPiRunActive(sessionIdFinal);
|
|
const isStreaming = isEmbeddedPiRunStreaming(sessionIdFinal);
|
|
const shouldSteer = resolvedQueue.mode === "steer" || resolvedQueue.mode === "steer-backlog";
|
|
const shouldFollowup =
|
|
resolvedQueue.mode === "followup" ||
|
|
resolvedQueue.mode === "collect" ||
|
|
resolvedQueue.mode === "steer-backlog";
|
|
const authProfileId = await resolveSessionAuthProfileOverride({
|
|
cfg,
|
|
provider,
|
|
agentDir,
|
|
sessionEntry,
|
|
sessionStore,
|
|
sessionKey,
|
|
storePath,
|
|
isNewSession,
|
|
});
|
|
const authProfileIdSource = sessionEntry?.authProfileOverrideSource;
|
|
const followupRun = {
|
|
prompt: queuedBody,
|
|
messageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid,
|
|
summaryLine: baseBodyTrimmedRaw,
|
|
enqueuedAt: Date.now(),
|
|
// Originating channel for reply routing.
|
|
originatingChannel: ctx.OriginatingChannel,
|
|
originatingTo: ctx.OriginatingTo,
|
|
originatingAccountId: ctx.AccountId,
|
|
originatingThreadId: ctx.MessageThreadId,
|
|
run: {
|
|
agentId,
|
|
agentDir,
|
|
sessionId: sessionIdFinal,
|
|
sessionKey,
|
|
messageProvider: sessionCtx.Provider?.trim().toLowerCase() || undefined,
|
|
agentAccountId: sessionCtx.AccountId,
|
|
sessionFile,
|
|
workspaceDir,
|
|
config: cfg,
|
|
skillsSnapshot,
|
|
provider,
|
|
model,
|
|
authProfileId,
|
|
authProfileIdSource,
|
|
thinkLevel: resolvedThinkLevel,
|
|
verboseLevel: resolvedVerboseLevel,
|
|
reasoningLevel: resolvedReasoningLevel,
|
|
elevatedLevel: resolvedElevatedLevel,
|
|
execOverrides,
|
|
bashElevated: {
|
|
enabled: elevatedEnabled,
|
|
allowed: elevatedAllowed,
|
|
defaultLevel: resolvedElevatedLevel ?? "off",
|
|
},
|
|
timeoutMs,
|
|
blockReplyBreak: resolvedBlockStreamingBreak,
|
|
ownerNumbers: command.ownerList.length > 0 ? command.ownerList : undefined,
|
|
extraSystemPrompt: extraSystemPrompt || undefined,
|
|
...(isReasoningTagProvider(provider) ? { enforceFinalTag: true } : {}),
|
|
},
|
|
};
|
|
|
|
return runReplyAgent({
|
|
commandBody: prefixedCommandBody,
|
|
followupRun,
|
|
queueKey,
|
|
resolvedQueue,
|
|
shouldSteer,
|
|
shouldFollowup,
|
|
isActive,
|
|
isStreaming,
|
|
opts,
|
|
typing,
|
|
sessionEntry,
|
|
sessionStore,
|
|
sessionKey,
|
|
storePath,
|
|
defaultModel,
|
|
agentCfgContextTokens: agentCfg?.contextTokens,
|
|
resolvedVerboseLevel: resolvedVerboseLevel ?? "off",
|
|
isNewSession,
|
|
blockStreamingEnabled,
|
|
blockReplyChunking,
|
|
resolvedBlockStreamingBreak,
|
|
sessionCtx,
|
|
shouldInjectGroupIntro,
|
|
typingMode,
|
|
});
|
|
}
|