225 lines
8.8 KiB
TypeScript
225 lines
8.8 KiB
TypeScript
import { Type } from "@sinclair/typebox";
|
|
import { normalizeCronJobCreate, normalizeCronJobPatch } from "../../cron/normalize.js";
|
|
import { loadConfig } from "../../config/config.js";
|
|
import { truncateUtf16Safe } from "../../utils.js";
|
|
import { optionalStringEnum, stringEnum } from "../schema/typebox.js";
|
|
import { type AnyAgentTool, jsonResult, readStringParam } from "./common.js";
|
|
import { callGatewayTool, type GatewayCallOptions } from "./gateway.js";
|
|
import { resolveInternalSessionKey, resolveMainSessionAlias } from "./sessions-helpers.js";
|
|
|
|
// NOTE: We use Type.Object({}, { additionalProperties: true }) for job/patch
|
|
// instead of CronAddParamsSchema/CronJobPatchSchema because the gateway schemas
|
|
// contain nested unions. Tool schemas need to stay provider-friendly, so we
|
|
// accept "any object" here and validate at runtime.
|
|
|
|
const CRON_ACTIONS = ["status", "list", "add", "update", "remove", "run", "runs", "wake"] as const;
|
|
|
|
const CRON_WAKE_MODES = ["now", "next-heartbeat"] as const;
|
|
|
|
const REMINDER_CONTEXT_MESSAGES = 3;
|
|
const REMINDER_CONTEXT_PER_MESSAGE_MAX = 220;
|
|
const REMINDER_CONTEXT_TOTAL_MAX = 700;
|
|
const REMINDER_CONTEXT_MARKER = "\n\nRecent context:\n";
|
|
|
|
// Flattened schema: runtime validates per-action requirements.
|
|
const CronToolSchema = Type.Object({
|
|
action: stringEnum(CRON_ACTIONS),
|
|
gatewayUrl: Type.Optional(Type.String()),
|
|
gatewayToken: Type.Optional(Type.String()),
|
|
timeoutMs: Type.Optional(Type.Number()),
|
|
includeDisabled: Type.Optional(Type.Boolean()),
|
|
job: Type.Optional(Type.Object({}, { additionalProperties: true })),
|
|
jobId: Type.Optional(Type.String()),
|
|
id: Type.Optional(Type.String()),
|
|
patch: Type.Optional(Type.Object({}, { additionalProperties: true })),
|
|
text: Type.Optional(Type.String()),
|
|
mode: optionalStringEnum(CRON_WAKE_MODES),
|
|
});
|
|
|
|
type CronToolOptions = {
|
|
agentSessionKey?: string;
|
|
};
|
|
|
|
type ChatMessage = {
|
|
role?: unknown;
|
|
content?: unknown;
|
|
};
|
|
|
|
function stripExistingContext(text: string) {
|
|
const index = text.indexOf(REMINDER_CONTEXT_MARKER);
|
|
if (index === -1) return text;
|
|
return text.slice(0, index).trim();
|
|
}
|
|
|
|
function truncateText(input: string, maxLen: number) {
|
|
if (input.length <= maxLen) return input;
|
|
const truncated = truncateUtf16Safe(input, Math.max(0, maxLen - 3)).trimEnd();
|
|
return `${truncated}...`;
|
|
}
|
|
|
|
function normalizeContextText(raw: string) {
|
|
return raw.replace(/\s+/g, " ").trim();
|
|
}
|
|
|
|
function extractMessageText(message: ChatMessage): { role: string; text: string } | null {
|
|
const role = typeof message.role === "string" ? message.role : "";
|
|
if (role !== "user" && role !== "assistant") return null;
|
|
const content = message.content;
|
|
if (typeof content === "string") {
|
|
const normalized = normalizeContextText(content);
|
|
return normalized ? { role, text: normalized } : null;
|
|
}
|
|
if (!Array.isArray(content)) return null;
|
|
const chunks: string[] = [];
|
|
for (const block of content) {
|
|
if (!block || typeof block !== "object") continue;
|
|
if ((block as { type?: unknown }).type !== "text") continue;
|
|
const text = (block as { text?: unknown }).text;
|
|
if (typeof text === "string" && text.trim()) {
|
|
chunks.push(text);
|
|
}
|
|
}
|
|
const joined = normalizeContextText(chunks.join(" "));
|
|
return joined ? { role, text: joined } : null;
|
|
}
|
|
|
|
async function buildReminderContextLines(params: {
|
|
agentSessionKey?: string;
|
|
gatewayOpts: GatewayCallOptions;
|
|
}) {
|
|
const sessionKey = params.agentSessionKey?.trim();
|
|
if (!sessionKey) return [];
|
|
const cfg = loadConfig();
|
|
const { mainKey, alias } = resolveMainSessionAlias(cfg);
|
|
const resolvedKey = resolveInternalSessionKey({ key: sessionKey, alias, mainKey });
|
|
try {
|
|
const res = (await callGatewayTool("chat.history", params.gatewayOpts, {
|
|
sessionKey: resolvedKey,
|
|
limit: 12,
|
|
})) as { messages?: unknown[] };
|
|
const messages = Array.isArray(res?.messages) ? res.messages : [];
|
|
const parsed = messages
|
|
.map((msg) => extractMessageText(msg as ChatMessage))
|
|
.filter((msg): msg is { role: string; text: string } => Boolean(msg));
|
|
const recent = parsed.slice(-REMINDER_CONTEXT_MESSAGES);
|
|
if (recent.length === 0) return [];
|
|
const lines: string[] = [];
|
|
let total = 0;
|
|
for (const entry of recent) {
|
|
const label = entry.role === "user" ? "User" : "Assistant";
|
|
const text = truncateText(entry.text, REMINDER_CONTEXT_PER_MESSAGE_MAX);
|
|
const line = `- ${label}: ${text}`;
|
|
total += line.length;
|
|
if (total > REMINDER_CONTEXT_TOTAL_MAX) break;
|
|
lines.push(line);
|
|
}
|
|
return lines;
|
|
} catch {
|
|
return [];
|
|
}
|
|
}
|
|
|
|
export function createCronTool(opts?: CronToolOptions): AnyAgentTool {
|
|
return {
|
|
label: "Cron",
|
|
name: "cron",
|
|
description:
|
|
"Manage Gateway cron jobs (status/list/add/update/remove/run/runs) and send wake events. Use `jobId` as the canonical identifier; `id` is accepted for compatibility.",
|
|
parameters: CronToolSchema,
|
|
execute: async (_toolCallId, args) => {
|
|
const params = args as Record<string, unknown>;
|
|
const action = readStringParam(params, "action", { required: true });
|
|
const gatewayOpts: GatewayCallOptions = {
|
|
gatewayUrl: readStringParam(params, "gatewayUrl", { trim: false }),
|
|
gatewayToken: readStringParam(params, "gatewayToken", { trim: false }),
|
|
timeoutMs: typeof params.timeoutMs === "number" ? params.timeoutMs : undefined,
|
|
};
|
|
|
|
switch (action) {
|
|
case "status":
|
|
return jsonResult(await callGatewayTool("cron.status", gatewayOpts, {}));
|
|
case "list":
|
|
return jsonResult(
|
|
await callGatewayTool("cron.list", gatewayOpts, {
|
|
includeDisabled: Boolean(params.includeDisabled),
|
|
}),
|
|
);
|
|
case "add": {
|
|
if (!params.job || typeof params.job !== "object") {
|
|
throw new Error("job required");
|
|
}
|
|
const job = normalizeCronJobCreate(params.job) ?? params.job;
|
|
if (
|
|
job &&
|
|
typeof job === "object" &&
|
|
"payload" in job &&
|
|
(job as { payload?: { kind?: string; text?: string } }).payload?.kind === "systemEvent"
|
|
) {
|
|
const payload = (job as { payload: { kind: string; text: string } }).payload;
|
|
if (typeof payload.text === "string" && payload.text.trim()) {
|
|
const contextLines = await buildReminderContextLines({
|
|
agentSessionKey: opts?.agentSessionKey,
|
|
gatewayOpts,
|
|
});
|
|
if (contextLines.length > 0) {
|
|
const baseText = stripExistingContext(payload.text);
|
|
payload.text = `${baseText}${REMINDER_CONTEXT_MARKER}${contextLines.join("\n")}`;
|
|
}
|
|
}
|
|
}
|
|
return jsonResult(await callGatewayTool("cron.add", gatewayOpts, job));
|
|
}
|
|
case "update": {
|
|
const id = readStringParam(params, "jobId") ?? readStringParam(params, "id");
|
|
if (!id) {
|
|
throw new Error("jobId required (id accepted for backward compatibility)");
|
|
}
|
|
if (!params.patch || typeof params.patch !== "object") {
|
|
throw new Error("patch required");
|
|
}
|
|
const patch = normalizeCronJobPatch(params.patch) ?? params.patch;
|
|
return jsonResult(
|
|
await callGatewayTool("cron.update", gatewayOpts, {
|
|
id,
|
|
patch,
|
|
}),
|
|
);
|
|
}
|
|
case "remove": {
|
|
const id = readStringParam(params, "jobId") ?? readStringParam(params, "id");
|
|
if (!id) {
|
|
throw new Error("jobId required (id accepted for backward compatibility)");
|
|
}
|
|
return jsonResult(await callGatewayTool("cron.remove", gatewayOpts, { id }));
|
|
}
|
|
case "run": {
|
|
const id = readStringParam(params, "jobId") ?? readStringParam(params, "id");
|
|
if (!id) {
|
|
throw new Error("jobId required (id accepted for backward compatibility)");
|
|
}
|
|
return jsonResult(await callGatewayTool("cron.run", gatewayOpts, { id }));
|
|
}
|
|
case "runs": {
|
|
const id = readStringParam(params, "jobId") ?? readStringParam(params, "id");
|
|
if (!id) {
|
|
throw new Error("jobId required (id accepted for backward compatibility)");
|
|
}
|
|
return jsonResult(await callGatewayTool("cron.runs", gatewayOpts, { id }));
|
|
}
|
|
case "wake": {
|
|
const text = readStringParam(params, "text", { required: true });
|
|
const mode =
|
|
params.mode === "now" || params.mode === "next-heartbeat"
|
|
? params.mode
|
|
: "next-heartbeat";
|
|
return jsonResult(
|
|
await callGatewayTool("wake", gatewayOpts, { mode, text }, { expectFinal: false }),
|
|
);
|
|
}
|
|
default:
|
|
throw new Error(`Unknown action: ${action}`);
|
|
}
|
|
},
|
|
};
|
|
}
|