refactor: cron payload migration cleanup (#621)
* refactor: centralize cron payload migration * test: stabilize block streaming mocks * test: adjust chunker fence-close case
This commit is contained in:
committed by
GitHub
parent
e3c340fd38
commit
98d0318d4e
34
src/agents/pi-embedded-block-chunker.test.ts
Normal file
34
src/agents/pi-embedded-block-chunker.test.ts
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
|
||||||
|
import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
|
||||||
|
|
||||||
|
describe("EmbeddedBlockChunker", () => {
|
||||||
|
it("breaks at paragraph boundary right after fence close", () => {
|
||||||
|
const chunker = new EmbeddedBlockChunker({
|
||||||
|
minChars: 1,
|
||||||
|
maxChars: 40,
|
||||||
|
breakPreference: "paragraph",
|
||||||
|
});
|
||||||
|
|
||||||
|
const text = [
|
||||||
|
"Intro",
|
||||||
|
"```js",
|
||||||
|
"console.log('x')",
|
||||||
|
"```",
|
||||||
|
"",
|
||||||
|
"After first line",
|
||||||
|
"After second line",
|
||||||
|
].join("\n");
|
||||||
|
|
||||||
|
chunker.append(text);
|
||||||
|
|
||||||
|
const chunks: string[] = [];
|
||||||
|
chunker.drain({ force: false, emit: (chunk) => chunks.push(chunk) });
|
||||||
|
|
||||||
|
expect(chunks.length).toBe(1);
|
||||||
|
expect(chunks[0]).toContain("console.log");
|
||||||
|
expect(chunks[0]).toMatch(/```\n?$/);
|
||||||
|
expect(chunks[0]).not.toContain("After");
|
||||||
|
expect(chunker.bufferedText).toMatch(/^After/);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -2,30 +2,29 @@ import path from "node:path";
|
|||||||
|
|
||||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
import { runEmbeddedPiAgent as runEmbeddedPiAgentRunner } from "/src/agents/pi-embedded.js";
|
|
||||||
import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js";
|
import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js";
|
||||||
import { loadModelCatalog } from "../agents/model-catalog.js";
|
import { loadModelCatalog } from "../agents/model-catalog.js";
|
||||||
import { runEmbeddedPiAgent as runEmbeddedPiAgentAutoReply } from "../agents/pi-embedded.js";
|
|
||||||
import { getReplyFromConfig } from "./reply.js";
|
import { getReplyFromConfig } from "./reply.js";
|
||||||
|
|
||||||
vi.mock("/src/agents/pi-embedded.js", () => ({
|
type RunEmbeddedPiAgent =
|
||||||
|
typeof import("../agents/pi-embedded.js").runEmbeddedPiAgent;
|
||||||
|
type RunEmbeddedPiAgentParams = Parameters<RunEmbeddedPiAgent>[0];
|
||||||
|
|
||||||
|
const piEmbeddedMock = vi.hoisted(() => ({
|
||||||
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
|
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
|
||||||
runEmbeddedPiAgent: vi.fn(),
|
runEmbeddedPiAgent: vi.fn<
|
||||||
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
|
ReturnType<RunEmbeddedPiAgent>,
|
||||||
resolveEmbeddedSessionLane: (key: string) =>
|
Parameters<RunEmbeddedPiAgent>
|
||||||
`session:${key.trim() || "main"}`,
|
>(),
|
||||||
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
|
|
||||||
isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
|
|
||||||
}));
|
|
||||||
vi.mock("../agents/pi-embedded.js", () => ({
|
|
||||||
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
|
|
||||||
runEmbeddedPiAgent: vi.fn(),
|
|
||||||
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
|
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
|
||||||
resolveEmbeddedSessionLane: (key: string) =>
|
resolveEmbeddedSessionLane: (key: string) =>
|
||||||
`session:${key.trim() || "main"}`,
|
`session:${key.trim() || "main"}`,
|
||||||
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
|
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
|
||||||
isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
|
isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
vi.mock("/src/agents/pi-embedded.js", () => piEmbeddedMock);
|
||||||
|
vi.mock("../agents/pi-embedded.js", () => piEmbeddedMock);
|
||||||
vi.mock("../agents/model-catalog.js", () => ({
|
vi.mock("../agents/model-catalog.js", () => ({
|
||||||
loadModelCatalog: vi.fn(),
|
loadModelCatalog: vi.fn(),
|
||||||
}));
|
}));
|
||||||
@@ -36,8 +35,11 @@ async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
|
|||||||
|
|
||||||
describe("block streaming", () => {
|
describe("block streaming", () => {
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
vi.mocked(runEmbeddedPiAgentAutoReply).mockReset();
|
piEmbeddedMock.abortEmbeddedPiRun.mockReset().mockReturnValue(false);
|
||||||
vi.mocked(runEmbeddedPiAgentRunner).mockReset();
|
piEmbeddedMock.queueEmbeddedPiMessage.mockReset().mockReturnValue(false);
|
||||||
|
piEmbeddedMock.isEmbeddedPiRunActive.mockReset().mockReturnValue(false);
|
||||||
|
piEmbeddedMock.isEmbeddedPiRunStreaming.mockReset().mockReturnValue(false);
|
||||||
|
piEmbeddedMock.runEmbeddedPiAgent.mockReset();
|
||||||
vi.mocked(loadModelCatalog).mockResolvedValue([
|
vi.mocked(loadModelCatalog).mockResolvedValue([
|
||||||
{ id: "claude-opus-4-5", name: "Opus 4.5", provider: "anthropic" },
|
{ id: "claude-opus-4-5", name: "Opus 4.5", provider: "anthropic" },
|
||||||
{ id: "gpt-4.1-mini", name: "GPT-4.1 Mini", provider: "openai" },
|
{ id: "gpt-4.1-mini", name: "GPT-4.1 Mini", provider: "openai" },
|
||||||
@@ -63,9 +65,7 @@ describe("block streaming", () => {
|
|||||||
const onReplyStart = vi.fn(() => typingGate);
|
const onReplyStart = vi.fn(() => typingGate);
|
||||||
const onBlockReply = vi.fn().mockResolvedValue(undefined);
|
const onBlockReply = vi.fn().mockResolvedValue(undefined);
|
||||||
|
|
||||||
const impl = async (
|
const impl = async (params: RunEmbeddedPiAgentParams) => {
|
||||||
params: Parameters<typeof runEmbeddedPiAgentRunner>[0],
|
|
||||||
) => {
|
|
||||||
void params.onBlockReply?.({ text: "hello" });
|
void params.onBlockReply?.({ text: "hello" });
|
||||||
return {
|
return {
|
||||||
payloads: [{ text: "hello" }],
|
payloads: [{ text: "hello" }],
|
||||||
@@ -75,8 +75,7 @@ describe("block streaming", () => {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
vi.mocked(runEmbeddedPiAgentAutoReply).mockImplementation(impl);
|
piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl);
|
||||||
vi.mocked(runEmbeddedPiAgentRunner).mockImplementation(impl);
|
|
||||||
|
|
||||||
const replyPromise = getReplyFromConfig(
|
const replyPromise = getReplyFromConfig(
|
||||||
{
|
{
|
||||||
@@ -124,9 +123,7 @@ describe("block streaming", () => {
|
|||||||
seen.push(payload.text ?? "");
|
seen.push(payload.text ?? "");
|
||||||
});
|
});
|
||||||
|
|
||||||
const impl = async (
|
const impl = async (params: RunEmbeddedPiAgentParams) => {
|
||||||
params: Parameters<typeof runEmbeddedPiAgentRunner>[0],
|
|
||||||
) => {
|
|
||||||
void params.onBlockReply?.({ text: "first" });
|
void params.onBlockReply?.({ text: "first" });
|
||||||
void params.onBlockReply?.({ text: "second" });
|
void params.onBlockReply?.({ text: "second" });
|
||||||
return {
|
return {
|
||||||
@@ -137,8 +134,7 @@ describe("block streaming", () => {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
vi.mocked(runEmbeddedPiAgentAutoReply).mockImplementation(impl);
|
piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl);
|
||||||
vi.mocked(runEmbeddedPiAgentRunner).mockImplementation(impl);
|
|
||||||
|
|
||||||
const replyPromise = getReplyFromConfig(
|
const replyPromise = getReplyFromConfig(
|
||||||
{
|
{
|
||||||
@@ -178,9 +174,7 @@ describe("block streaming", () => {
|
|||||||
await withTempHome(async (home) => {
|
await withTempHome(async (home) => {
|
||||||
const onBlockReply = vi.fn().mockResolvedValue(undefined);
|
const onBlockReply = vi.fn().mockResolvedValue(undefined);
|
||||||
|
|
||||||
const impl = async (
|
const impl = async (params: RunEmbeddedPiAgentParams) => {
|
||||||
params: Parameters<typeof runEmbeddedPiAgentRunner>[0],
|
|
||||||
) => {
|
|
||||||
void params.onBlockReply?.({ text: "chunk-1" });
|
void params.onBlockReply?.({ text: "chunk-1" });
|
||||||
return {
|
return {
|
||||||
payloads: [{ text: "chunk-1\nchunk-2" }],
|
payloads: [{ text: "chunk-1\nchunk-2" }],
|
||||||
@@ -190,8 +184,7 @@ describe("block streaming", () => {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
vi.mocked(runEmbeddedPiAgentAutoReply).mockImplementation(impl);
|
piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl);
|
||||||
vi.mocked(runEmbeddedPiAgentRunner).mockImplementation(impl);
|
|
||||||
|
|
||||||
const res = await getReplyFromConfig(
|
const res = await getReplyFromConfig(
|
||||||
{
|
{
|
||||||
@@ -238,9 +231,7 @@ describe("block streaming", () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
const impl = async (
|
const impl = async (params: RunEmbeddedPiAgentParams) => {
|
||||||
params: Parameters<typeof runEmbeddedPiAgentRunner>[0],
|
|
||||||
) => {
|
|
||||||
void params.onBlockReply?.({ text: "streamed" });
|
void params.onBlockReply?.({ text: "streamed" });
|
||||||
return {
|
return {
|
||||||
payloads: [{ text: "final" }],
|
payloads: [{ text: "final" }],
|
||||||
@@ -250,8 +241,7 @@ describe("block streaming", () => {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
vi.mocked(runEmbeddedPiAgentAutoReply).mockImplementation(impl);
|
piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl);
|
||||||
vi.mocked(runEmbeddedPiAgentRunner).mockImplementation(impl);
|
|
||||||
|
|
||||||
const replyPromise = getReplyFromConfig(
|
const replyPromise = getReplyFromConfig(
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ describe("normalizeCronJobCreate", () => {
|
|||||||
kind: "agentTurn",
|
kind: "agentTurn",
|
||||||
message: "hi",
|
message: "hi",
|
||||||
deliver: true,
|
deliver: true,
|
||||||
channel: "telegram",
|
channel: " TeLeGrAm ",
|
||||||
to: "7200373102",
|
to: "7200373102",
|
||||||
},
|
},
|
||||||
}) as unknown as Record<string, unknown>;
|
}) as unknown as Record<string, unknown>;
|
||||||
@@ -23,4 +23,24 @@ describe("normalizeCronJobCreate", () => {
|
|||||||
expect(payload.provider).toBe("telegram");
|
expect(payload.provider).toBe("telegram");
|
||||||
expect("channel" in payload).toBe(false);
|
expect("channel" in payload).toBe(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("canonicalizes payload.provider casing", () => {
|
||||||
|
const normalized = normalizeCronJobCreate({
|
||||||
|
name: "legacy provider",
|
||||||
|
enabled: true,
|
||||||
|
schedule: { kind: "cron", expr: "* * * * *" },
|
||||||
|
sessionTarget: "isolated",
|
||||||
|
wakeMode: "now",
|
||||||
|
payload: {
|
||||||
|
kind: "agentTurn",
|
||||||
|
message: "hi",
|
||||||
|
deliver: true,
|
||||||
|
provider: "Telegram",
|
||||||
|
to: "7200373102",
|
||||||
|
},
|
||||||
|
}) as unknown as Record<string, unknown>;
|
||||||
|
|
||||||
|
const payload = normalized.payload as Record<string, unknown>;
|
||||||
|
expect(payload.provider).toBe("telegram");
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import { migrateLegacyCronPayload } from "./payload-migration.js";
|
||||||
import type { CronJobCreate, CronJobPatch } from "./types.js";
|
import type { CronJobCreate, CronJobPatch } from "./types.js";
|
||||||
|
|
||||||
type UnknownRecord = Record<string, unknown>;
|
type UnknownRecord = Record<string, unknown>;
|
||||||
@@ -34,15 +35,7 @@ function coercePayload(payload: UnknownRecord) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Back-compat: older configs used `channel` for delivery provider.
|
// Back-compat: older configs used `channel` for delivery provider.
|
||||||
const providerRaw =
|
migrateLegacyCronPayload(next);
|
||||||
typeof payload.provider === "string" ? payload.provider.trim() : "";
|
|
||||||
const channelRaw =
|
|
||||||
typeof payload.channel === "string" ? payload.channel.trim() : "";
|
|
||||||
const provider =
|
|
||||||
(providerRaw || channelRaw).trim().toLowerCase() ||
|
|
||||||
(providerRaw || channelRaw).trim();
|
|
||||||
if (!providerRaw && provider) next.provider = provider;
|
|
||||||
if ("channel" in next) delete next.channel;
|
|
||||||
return next;
|
return next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
38
src/cron/payload-migration.ts
Normal file
38
src/cron/payload-migration.ts
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
type UnknownRecord = Record<string, unknown>;
|
||||||
|
|
||||||
|
function readString(value: unknown): string | undefined {
|
||||||
|
if (typeof value !== "string") return undefined;
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
function normalizeProvider(value: string): string {
|
||||||
|
return value.trim().toLowerCase();
|
||||||
|
}
|
||||||
|
|
||||||
|
export function migrateLegacyCronPayload(payload: UnknownRecord): boolean {
|
||||||
|
let mutated = false;
|
||||||
|
|
||||||
|
const providerValue = readString(payload.provider);
|
||||||
|
const channelValue = readString(payload.channel);
|
||||||
|
|
||||||
|
const nextProvider =
|
||||||
|
typeof providerValue === "string" && providerValue.trim().length > 0
|
||||||
|
? normalizeProvider(providerValue)
|
||||||
|
: typeof channelValue === "string" && channelValue.trim().length > 0
|
||||||
|
? normalizeProvider(channelValue)
|
||||||
|
: "";
|
||||||
|
|
||||||
|
if (nextProvider) {
|
||||||
|
if (providerValue !== nextProvider) {
|
||||||
|
payload.provider = nextProvider;
|
||||||
|
mutated = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ("channel" in payload) {
|
||||||
|
delete payload.channel;
|
||||||
|
mutated = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return mutated;
|
||||||
|
}
|
||||||
@@ -136,7 +136,7 @@ describe("CronService", () => {
|
|||||||
kind: "agentTurn",
|
kind: "agentTurn",
|
||||||
message: "hi",
|
message: "hi",
|
||||||
deliver: true,
|
deliver: true,
|
||||||
channel: "telegram",
|
channel: " TeLeGrAm ",
|
||||||
to: "7200373102",
|
to: "7200373102",
|
||||||
},
|
},
|
||||||
state: {},
|
state: {},
|
||||||
@@ -169,6 +169,56 @@ describe("CronService", () => {
|
|||||||
await store.cleanup();
|
await store.cleanup();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("canonicalizes payload.provider casing on load", async () => {
|
||||||
|
const store = await makeStorePath();
|
||||||
|
const enqueueSystemEvent = vi.fn();
|
||||||
|
const requestHeartbeatNow = vi.fn();
|
||||||
|
|
||||||
|
const rawJob = {
|
||||||
|
id: "legacy-2",
|
||||||
|
name: "legacy",
|
||||||
|
enabled: true,
|
||||||
|
createdAtMs: Date.now(),
|
||||||
|
updatedAtMs: Date.now(),
|
||||||
|
schedule: { kind: "cron", expr: "* * * * *" },
|
||||||
|
sessionTarget: "isolated",
|
||||||
|
wakeMode: "now",
|
||||||
|
payload: {
|
||||||
|
kind: "agentTurn",
|
||||||
|
message: "hi",
|
||||||
|
deliver: true,
|
||||||
|
provider: "Telegram",
|
||||||
|
to: "7200373102",
|
||||||
|
},
|
||||||
|
state: {},
|
||||||
|
};
|
||||||
|
|
||||||
|
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||||
|
await fs.writeFile(
|
||||||
|
store.storePath,
|
||||||
|
JSON.stringify({ version: 1, jobs: [rawJob] }, null, 2),
|
||||||
|
"utf-8",
|
||||||
|
);
|
||||||
|
|
||||||
|
const cron = new CronService({
|
||||||
|
storePath: store.storePath,
|
||||||
|
cronEnabled: true,
|
||||||
|
log: noopLogger,
|
||||||
|
enqueueSystemEvent,
|
||||||
|
requestHeartbeatNow,
|
||||||
|
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||||
|
});
|
||||||
|
|
||||||
|
await cron.start();
|
||||||
|
const jobs = await cron.list({ includeDisabled: true });
|
||||||
|
const job = jobs.find((j) => j.id === rawJob.id);
|
||||||
|
const payload = job?.payload as unknown as Record<string, unknown>;
|
||||||
|
expect(payload.provider).toBe("telegram");
|
||||||
|
|
||||||
|
cron.stop();
|
||||||
|
await store.cleanup();
|
||||||
|
});
|
||||||
|
|
||||||
it("posts last output to main even when isolated job errors", async () => {
|
it("posts last output to main even when isolated job errors", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
const enqueueSystemEvent = vi.fn();
|
const enqueueSystemEvent = vi.fn();
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import crypto from "node:crypto";
|
import crypto from "node:crypto";
|
||||||
|
|
||||||
import { truncateUtf16Safe } from "../utils.js";
|
import { truncateUtf16Safe } from "../utils.js";
|
||||||
|
import { migrateLegacyCronPayload } from "./payload-migration.js";
|
||||||
import { computeNextRunAtMs } from "./schedule.js";
|
import { computeNextRunAtMs } from "./schedule.js";
|
||||||
import { loadCronStore, saveCronStore } from "./store.js";
|
import { loadCronStore, saveCronStore } from "./store.js";
|
||||||
import type {
|
import type {
|
||||||
@@ -320,22 +321,7 @@ export class CronService {
|
|||||||
|
|
||||||
const payload = raw.payload;
|
const payload = raw.payload;
|
||||||
if (payload && typeof payload === "object" && !Array.isArray(payload)) {
|
if (payload && typeof payload === "object" && !Array.isArray(payload)) {
|
||||||
const legacyChannel =
|
if (migrateLegacyCronPayload(payload as Record<string, unknown>)) {
|
||||||
typeof (payload as Record<string, unknown>).channel === "string"
|
|
||||||
? String((payload as Record<string, unknown>).channel).trim()
|
|
||||||
: "";
|
|
||||||
const provider =
|
|
||||||
typeof (payload as Record<string, unknown>).provider === "string"
|
|
||||||
? String((payload as Record<string, unknown>).provider).trim()
|
|
||||||
: "";
|
|
||||||
// Back-compat: older cron payloads used `channel` for delivery provider.
|
|
||||||
if (!provider && legacyChannel) {
|
|
||||||
(payload as Record<string, unknown>).provider =
|
|
||||||
legacyChannel.toLowerCase();
|
|
||||||
mutated = true;
|
|
||||||
}
|
|
||||||
if ("channel" in (payload as Record<string, unknown>)) {
|
|
||||||
delete (payload as Record<string, unknown>).channel;
|
|
||||||
mutated = true;
|
mutated = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user