From 98d0318d4ef0ec48aa13b58064f768e27a1cda79 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 9 Jan 2026 22:56:55 +0000 Subject: [PATCH] refactor: cron payload migration cleanup (#621) * refactor: centralize cron payload migration * test: stabilize block streaming mocks * test: adjust chunker fence-close case --- src/agents/pi-embedded-block-chunker.test.ts | 34 +++++++++++ src/auto-reply/reply.block-streaming.test.ts | 60 ++++++++------------ src/cron/normalize.test.ts | 22 ++++++- src/cron/normalize.ts | 11 +--- src/cron/payload-migration.ts | 38 +++++++++++++ src/cron/service.test.ts | 52 ++++++++++++++++- src/cron/service.ts | 18 +----- 7 files changed, 173 insertions(+), 62 deletions(-) create mode 100644 src/agents/pi-embedded-block-chunker.test.ts create mode 100644 src/cron/payload-migration.ts diff --git a/src/agents/pi-embedded-block-chunker.test.ts b/src/agents/pi-embedded-block-chunker.test.ts new file mode 100644 index 000000000..ae48f3844 --- /dev/null +++ b/src/agents/pi-embedded-block-chunker.test.ts @@ -0,0 +1,34 @@ +import { describe, expect, it } from "vitest"; + +import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js"; + +describe("EmbeddedBlockChunker", () => { + it("breaks at paragraph boundary right after fence close", () => { + const chunker = new EmbeddedBlockChunker({ + minChars: 1, + maxChars: 40, + breakPreference: "paragraph", + }); + + const text = [ + "Intro", + "```js", + "console.log('x')", + "```", + "", + "After first line", + "After second line", + ].join("\n"); + + chunker.append(text); + + const chunks: string[] = []; + chunker.drain({ force: false, emit: (chunk) => chunks.push(chunk) }); + + expect(chunks.length).toBe(1); + expect(chunks[0]).toContain("console.log"); + expect(chunks[0]).toMatch(/```\n?$/); + expect(chunks[0]).not.toContain("After"); + expect(chunker.bufferedText).toMatch(/^After/); + }); +}); diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index c11bd492c..8cb8ab767 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -2,30 +2,29 @@ import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; -import { runEmbeddedPiAgent as runEmbeddedPiAgentRunner } from "/src/agents/pi-embedded.js"; import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js"; import { loadModelCatalog } from "../agents/model-catalog.js"; -import { runEmbeddedPiAgent as runEmbeddedPiAgentAutoReply } from "../agents/pi-embedded.js"; import { getReplyFromConfig } from "./reply.js"; -vi.mock("/src/agents/pi-embedded.js", () => ({ +type RunEmbeddedPiAgent = + typeof import("../agents/pi-embedded.js").runEmbeddedPiAgent; +type RunEmbeddedPiAgentParams = Parameters[0]; + +const piEmbeddedMock = vi.hoisted(() => ({ abortEmbeddedPiRun: vi.fn().mockReturnValue(false), - runEmbeddedPiAgent: vi.fn(), - queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), - resolveEmbeddedSessionLane: (key: string) => - `session:${key.trim() || "main"}`, - isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), - isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false), -})); -vi.mock("../agents/pi-embedded.js", () => ({ - abortEmbeddedPiRun: vi.fn().mockReturnValue(false), - runEmbeddedPiAgent: vi.fn(), + runEmbeddedPiAgent: vi.fn< + ReturnType, + Parameters + >(), queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`, isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false), })); + +vi.mock("/src/agents/pi-embedded.js", () => piEmbeddedMock); +vi.mock("../agents/pi-embedded.js", () => piEmbeddedMock); vi.mock("../agents/model-catalog.js", () => ({ loadModelCatalog: vi.fn(), })); @@ -36,8 +35,11 @@ async function withTempHome(fn: (home: string) => Promise): Promise { describe("block streaming", () => { beforeEach(() => { - vi.mocked(runEmbeddedPiAgentAutoReply).mockReset(); - vi.mocked(runEmbeddedPiAgentRunner).mockReset(); + piEmbeddedMock.abortEmbeddedPiRun.mockReset().mockReturnValue(false); + piEmbeddedMock.queueEmbeddedPiMessage.mockReset().mockReturnValue(false); + piEmbeddedMock.isEmbeddedPiRunActive.mockReset().mockReturnValue(false); + piEmbeddedMock.isEmbeddedPiRunStreaming.mockReset().mockReturnValue(false); + piEmbeddedMock.runEmbeddedPiAgent.mockReset(); vi.mocked(loadModelCatalog).mockResolvedValue([ { id: "claude-opus-4-5", name: "Opus 4.5", provider: "anthropic" }, { id: "gpt-4.1-mini", name: "GPT-4.1 Mini", provider: "openai" }, @@ -63,9 +65,7 @@ describe("block streaming", () => { const onReplyStart = vi.fn(() => typingGate); const onBlockReply = vi.fn().mockResolvedValue(undefined); - const impl = async ( - params: Parameters[0], - ) => { + const impl = async (params: RunEmbeddedPiAgentParams) => { void params.onBlockReply?.({ text: "hello" }); return { payloads: [{ text: "hello" }], @@ -75,8 +75,7 @@ describe("block streaming", () => { }, }; }; - vi.mocked(runEmbeddedPiAgentAutoReply).mockImplementation(impl); - vi.mocked(runEmbeddedPiAgentRunner).mockImplementation(impl); + piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl); const replyPromise = getReplyFromConfig( { @@ -124,9 +123,7 @@ describe("block streaming", () => { seen.push(payload.text ?? ""); }); - const impl = async ( - params: Parameters[0], - ) => { + const impl = async (params: RunEmbeddedPiAgentParams) => { void params.onBlockReply?.({ text: "first" }); void params.onBlockReply?.({ text: "second" }); return { @@ -137,8 +134,7 @@ describe("block streaming", () => { }, }; }; - vi.mocked(runEmbeddedPiAgentAutoReply).mockImplementation(impl); - vi.mocked(runEmbeddedPiAgentRunner).mockImplementation(impl); + piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl); const replyPromise = getReplyFromConfig( { @@ -178,9 +174,7 @@ describe("block streaming", () => { await withTempHome(async (home) => { const onBlockReply = vi.fn().mockResolvedValue(undefined); - const impl = async ( - params: Parameters[0], - ) => { + const impl = async (params: RunEmbeddedPiAgentParams) => { void params.onBlockReply?.({ text: "chunk-1" }); return { payloads: [{ text: "chunk-1\nchunk-2" }], @@ -190,8 +184,7 @@ describe("block streaming", () => { }, }; }; - vi.mocked(runEmbeddedPiAgentAutoReply).mockImplementation(impl); - vi.mocked(runEmbeddedPiAgentRunner).mockImplementation(impl); + piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl); const res = await getReplyFromConfig( { @@ -238,9 +231,7 @@ describe("block streaming", () => { }); }); - const impl = async ( - params: Parameters[0], - ) => { + const impl = async (params: RunEmbeddedPiAgentParams) => { void params.onBlockReply?.({ text: "streamed" }); return { payloads: [{ text: "final" }], @@ -250,8 +241,7 @@ describe("block streaming", () => { }, }; }; - vi.mocked(runEmbeddedPiAgentAutoReply).mockImplementation(impl); - vi.mocked(runEmbeddedPiAgentRunner).mockImplementation(impl); + piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl); const replyPromise = getReplyFromConfig( { diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index 4dc6d830c..529db68b3 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -14,7 +14,7 @@ describe("normalizeCronJobCreate", () => { kind: "agentTurn", message: "hi", deliver: true, - channel: "telegram", + channel: " TeLeGrAm ", to: "7200373102", }, }) as unknown as Record; @@ -23,4 +23,24 @@ describe("normalizeCronJobCreate", () => { expect(payload.provider).toBe("telegram"); expect("channel" in payload).toBe(false); }); + + it("canonicalizes payload.provider casing", () => { + const normalized = normalizeCronJobCreate({ + name: "legacy provider", + enabled: true, + schedule: { kind: "cron", expr: "* * * * *" }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { + kind: "agentTurn", + message: "hi", + deliver: true, + provider: "Telegram", + to: "7200373102", + }, + }) as unknown as Record; + + const payload = normalized.payload as Record; + expect(payload.provider).toBe("telegram"); + }); }); diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index 2a4715c0b..6e29bf277 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -1,3 +1,4 @@ +import { migrateLegacyCronPayload } from "./payload-migration.js"; import type { CronJobCreate, CronJobPatch } from "./types.js"; type UnknownRecord = Record; @@ -34,15 +35,7 @@ function coercePayload(payload: UnknownRecord) { } // Back-compat: older configs used `channel` for delivery provider. - const providerRaw = - typeof payload.provider === "string" ? payload.provider.trim() : ""; - const channelRaw = - typeof payload.channel === "string" ? payload.channel.trim() : ""; - const provider = - (providerRaw || channelRaw).trim().toLowerCase() || - (providerRaw || channelRaw).trim(); - if (!providerRaw && provider) next.provider = provider; - if ("channel" in next) delete next.channel; + migrateLegacyCronPayload(next); return next; } diff --git a/src/cron/payload-migration.ts b/src/cron/payload-migration.ts new file mode 100644 index 000000000..6ca2122b1 --- /dev/null +++ b/src/cron/payload-migration.ts @@ -0,0 +1,38 @@ +type UnknownRecord = Record; + +function readString(value: unknown): string | undefined { + if (typeof value !== "string") return undefined; + return value; +} + +function normalizeProvider(value: string): string { + return value.trim().toLowerCase(); +} + +export function migrateLegacyCronPayload(payload: UnknownRecord): boolean { + let mutated = false; + + const providerValue = readString(payload.provider); + const channelValue = readString(payload.channel); + + const nextProvider = + typeof providerValue === "string" && providerValue.trim().length > 0 + ? normalizeProvider(providerValue) + : typeof channelValue === "string" && channelValue.trim().length > 0 + ? normalizeProvider(channelValue) + : ""; + + if (nextProvider) { + if (providerValue !== nextProvider) { + payload.provider = nextProvider; + mutated = true; + } + } + + if ("channel" in payload) { + delete payload.channel; + mutated = true; + } + + return mutated; +} diff --git a/src/cron/service.test.ts b/src/cron/service.test.ts index 781782936..34762abd8 100644 --- a/src/cron/service.test.ts +++ b/src/cron/service.test.ts @@ -136,7 +136,7 @@ describe("CronService", () => { kind: "agentTurn", message: "hi", deliver: true, - channel: "telegram", + channel: " TeLeGrAm ", to: "7200373102", }, state: {}, @@ -169,6 +169,56 @@ describe("CronService", () => { await store.cleanup(); }); + it("canonicalizes payload.provider casing on load", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + const rawJob = { + id: "legacy-2", + name: "legacy", + enabled: true, + createdAtMs: Date.now(), + updatedAtMs: Date.now(), + schedule: { kind: "cron", expr: "* * * * *" }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { + kind: "agentTurn", + message: "hi", + deliver: true, + provider: "Telegram", + to: "7200373102", + }, + state: {}, + }; + + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile( + store.storePath, + JSON.stringify({ version: 1, jobs: [rawJob] }, null, 2), + "utf-8", + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + const jobs = await cron.list({ includeDisabled: true }); + const job = jobs.find((j) => j.id === rawJob.id); + const payload = job?.payload as unknown as Record; + expect(payload.provider).toBe("telegram"); + + cron.stop(); + await store.cleanup(); + }); + it("posts last output to main even when isolated job errors", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); diff --git a/src/cron/service.ts b/src/cron/service.ts index 2391d57de..87046c62e 100644 --- a/src/cron/service.ts +++ b/src/cron/service.ts @@ -1,6 +1,7 @@ import crypto from "node:crypto"; import { truncateUtf16Safe } from "../utils.js"; +import { migrateLegacyCronPayload } from "./payload-migration.js"; import { computeNextRunAtMs } from "./schedule.js"; import { loadCronStore, saveCronStore } from "./store.js"; import type { @@ -320,22 +321,7 @@ export class CronService { const payload = raw.payload; if (payload && typeof payload === "object" && !Array.isArray(payload)) { - const legacyChannel = - typeof (payload as Record).channel === "string" - ? String((payload as Record).channel).trim() - : ""; - const provider = - typeof (payload as Record).provider === "string" - ? String((payload as Record).provider).trim() - : ""; - // Back-compat: older cron payloads used `channel` for delivery provider. - if (!provider && legacyChannel) { - (payload as Record).provider = - legacyChannel.toLowerCase(); - mutated = true; - } - if ("channel" in (payload as Record)) { - delete (payload as Record).channel; + if (migrateLegacyCronPayload(payload as Record)) { mutated = true; } }