diff --git a/CHANGELOG.md b/CHANGELOG.md index d8d1e547f..64a07d9ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ - Skills: add user-invocable skill commands and expanded skill command registration. - Telegram: default reaction level to minimal and enable reaction notifications by default. - Telegram: allow reply-chain messages to bypass mention gating in groups. (#1038) — thanks @adityashaw2. +- Messages: mirror delivered outbound text/media into session transcripts. (#1031) — thanks @TSavo. - Cron: isolated cron jobs now start a fresh session id on every run to prevent context buildup. - Docs: add `/help` hub, Node/npm PATH guide, and expand directory CLI docs. - Config: support env var substitution in config values. (#1044) — thanks @sebslight. diff --git a/README.md b/README.md index 7c309d421..ca985e004 100644 --- a/README.md +++ b/README.md @@ -470,7 +470,6 @@ Special thanks to @andrewting19 for the Anthropic OAuth tool-name fix. Core contributors: - @cpojer — Telegram onboarding UX + docs -- @ThomsenDrake — Internal hooks system Thanks to all clawtributors: @@ -478,21 +477,21 @@ Thanks to all clawtributors: steipete bohdanpodvirnyi joaohlisboa mneves75 MatthieuBizien rahthakor vrknetha joshp123 mukhtharcm maxsumrall xadenryan Tobias Bischoff juanpablodlc hsrvc magimetal meaningfool NicholasSpisak abhisekbasu1 claude jamesgroat Hyaxia dantelex daveonkels radek-paclt mteam88 Eng. Juan Combetto dbhurley Mariano Belinky julianengel benithors - nachx639 sreekaransrinath gupsammy cristip73 nachoiacovino Vasanth Rao Naik Sabavat cpojer lc0rp scald andranik-sahakyan - davidguttman sleontenko sircrumpet peschee rafaelreis-r ratulsarna lutr0 thewilloftheshadow gumadeiras emanuelst - KristijanJovanovski CashWilliams rdev osolmaz kiranjd sebslight sheeek onutc manuelhettich minghinmatthewlam - myfunc buddyh mcinteerj timkrase gerardward2007 obviyus tosh-hamburg azade-c bjesuiter danielz1z - Josh Phillips roshanasingh4 superman32432432 Yurii Chukhlib antons austinm911 blacksmith-sh[bot] grp06 HeimdallStrategy imfing - jalehman jarvis-medmatic kkarimi mahmoudashraf93 petter-b pkrmf RandyVentures dan-dr erikpr1994 jonasjancarik - Keith the Silly Goose L36 Server mitschabaude-bot neist chrisrodz Friederike Seiler gabriel-trigo iamadig Kit koala73 - manmal ngutman ogulcancelik pasogott petradonka VACInc wes-davis zats Chris Taylor Django Navarro - evalexpr henrino3 oswalpalash pcty-nextgen-service-account rubyrunsstuff Syhids Aaron Konyer adam91holt erik-agens fcatuhe - ivanrvpereira jayhickey jeffersonwarrior jeffersonwarrior Jonathan D. Rhyne (DJ-D) jverdi mickahouan mjrussell mkbehr p6l-richard - philipp-spiess robaxelsen Sash Catanzarite tyler6204 VAC zknicker alejandro maza andrewting19 Asleep123 bolismauro - cash-echo-bot Clawd conhecendocontato Drake Thomsen gtsifrikas HazAT hrdwdmrbl hugobarauna Jamie Openshaw Jarvis - Jefferson Nunn kitze levifig Lloyd longmaba loukotal Marc martinpucik Miles mrdbstn - MSch Mustafa Tag Eldeen ndraiman nexty5870 prathamdby reeltimeapps RLTCmpe Rolf Fredheim Rony Kelner Samrat Jha - siraht snopoke The Admiral Ubuntu voidserf wstock YuriNachos Zach Knickerbocker Alphonse-arianee Azade - carlulsoe ddyo Erik latitudeki5223 Manuel Maly Mourad Boustani pcty-nextgen-ios-builder Quentin Randy Torres ronak-guliani - thesash William Stock Szpadel + timolins nachx639 sreekaransrinath gupsammy cristip73 nachoiacovino Vasanth Rao Naik Sabavat cpojer lc0rp scald + andranik-sahakyan davidguttman sleontenko sircrumpet peschee rafaelreis-r ratulsarna thewilloftheshadow lutr0 gumadeiras + emanuelst KristijanJovanovski CashWilliams rdev osolmaz kiranjd adityashaw2 sebslight sheeek onutc + manuelhettich minghinmatthewlam myfunc buddyh mcinteerj timkrase gerardward2007 obviyus tosh-hamburg azade-c + bjesuiter danielz1z Josh Phillips roshanasingh4 YuriNachos superman32432432 Yurii Chukhlib antons austinm911 blacksmith-sh[bot] + grp06 HeimdallStrategy imfing jalehman jarvis-medmatic kkarimi mahmoudashraf93 petter-b pkrmf RandyVentures + dan-dr erikpr1994 jonasjancarik Keith the Silly Goose L36 Server Marc mitschabaude-bot neist chrisrodz Friederike Seiler + gabriel-trigo iamadig Kit koala73 manmal ngutman ogulcancelik pasogott petradonka rubyrunsstuff + VACInc wes-davis zats Chris Taylor Django Navarro evalexpr henrino3 oswalpalash pcty-nextgen-service-account Syhids + tyler6204 Aaron Konyer adam91holt erik-agens fcatuhe ivanrvpereira jayhickey jeffersonwarrior jeffersonwarrior Jonathan D. Rhyne (DJ-D) + jverdi mickahouan mjrussell mkbehr p6l-richard philipp-spiess robaxelsen Sash Catanzarite VAC zknicker + alejandro maza andrewting19 Asleep123 bolismauro cash-echo-bot Clawd conhecendocontato Drake Thomsen gtsifrikas HazAT + hrdwdmrbl hugobarauna Jamie Openshaw Jarvis Jefferson Nunn kitze levifig Lloyd longmaba loukotal + martinpucik Miles mrdbstn MSch Mustafa Tag Eldeen ndraiman nexty5870 prathamdby reeltimeapps RLTCmpe + Rolf Fredheim Rony Kelner Samrat Jha siraht snopoke suminhthanh The Admiral thesash Ubuntu voidserf + wstock Zach Knickerbocker Alphonse-arianee Azade carlulsoe ddyo Erik latitudeki5223 Manuel Maly Mourad Boustani + pcty-nextgen-ios-builder Quentin Randy Torres ronak-guliani William Stock

diff --git a/apps/macos/Sources/ClawdbotProtocol/GatewayModels.swift b/apps/macos/Sources/ClawdbotProtocol/GatewayModels.swift index 6be608f70..5e8ac5e2c 100644 --- a/apps/macos/Sources/ClawdbotProtocol/GatewayModels.swift +++ b/apps/macos/Sources/ClawdbotProtocol/GatewayModels.swift @@ -347,6 +347,7 @@ public struct SendParams: Codable, Sendable { public let gifplayback: Bool? public let channel: String? public let accountid: String? + public let sessionkey: String? public let idempotencykey: String public init( @@ -356,6 +357,7 @@ public struct SendParams: Codable, Sendable { gifplayback: Bool?, channel: String?, accountid: String?, + sessionkey: String? = nil, idempotencykey: String ) { self.to = to @@ -364,6 +366,7 @@ public struct SendParams: Codable, Sendable { self.gifplayback = gifplayback self.channel = channel self.accountid = accountid + self.sessionkey = sessionkey self.idempotencykey = idempotencykey } private enum CodingKeys: String, CodingKey { @@ -373,6 +376,7 @@ public struct SendParams: Codable, Sendable { case gifplayback = "gifPlayback" case channel case accountid = "accountId" + case sessionkey = "sessionKey" case idempotencykey = "idempotencyKey" } } diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index 3fc3fedde..1cfaba9e3 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -101,7 +101,9 @@ Common `agentTurn` fields: - `bestEffortDeliver`: avoid failing the job if delivery fails. Isolation options (only for `session=isolated`): -- `postToMainPrefix` (CLI: `--post-prefix`): prefix for the summary system event in main. +- `postToMainPrefix` (CLI: `--post-prefix`): prefix for the system event in main. +- `postToMainMode`: `summary` (default) or `full`. +- `postToMainMaxChars`: max chars when `postToMainMode=full` (default 8000). ### Model and thinking overrides Isolated jobs (`agentTurn`) can override the model and thinking level: diff --git a/src/agents/clawdbot-tools.ts b/src/agents/clawdbot-tools.ts index 28e71bf19..e9efa14a4 100644 --- a/src/agents/clawdbot-tools.ts +++ b/src/agents/clawdbot-tools.ts @@ -80,6 +80,7 @@ export function createClawdbotTools(options?: { }), createMessageTool({ agentAccountId: options?.agentAccountId, + agentSessionKey: options?.agentSessionKey, config: options?.config, currentChannelId: options?.currentChannelId, currentThreadTs: options?.currentThreadTs, diff --git a/src/agents/tools/message-tool.test.ts b/src/agents/tools/message-tool.test.ts new file mode 100644 index 000000000..92a343818 --- /dev/null +++ b/src/agents/tools/message-tool.test.ts @@ -0,0 +1,78 @@ +import { describe, expect, it, vi } from "vitest"; + +import type { MessageActionRunResult } from "../../infra/outbound/message-action-runner.js"; +import { createMessageTool } from "./message-tool.js"; + +const mocks = vi.hoisted(() => ({ + runMessageAction: vi.fn(), + appendAssistantMessageToSessionTranscript: vi.fn(async () => ({ ok: true, sessionFile: "x" })), +})); + +vi.mock("../../infra/outbound/message-action-runner.js", () => ({ + runMessageAction: mocks.runMessageAction, +})); + +vi.mock("../../config/sessions.js", async () => { + const actual = await vi.importActual( + "../../config/sessions.js", + ); + return { + ...actual, + appendAssistantMessageToSessionTranscript: mocks.appendAssistantMessageToSessionTranscript, + }; +}); + +describe("message tool mirroring", () => { + it("mirrors media filename for plugin-handled sends", async () => { + mocks.appendAssistantMessageToSessionTranscript.mockClear(); + mocks.runMessageAction.mockResolvedValue({ + kind: "send", + action: "send", + channel: "telegram", + handledBy: "plugin", + payload: {}, + dryRun: false, + } satisfies MessageActionRunResult); + + const tool = createMessageTool({ + agentSessionKey: "agent:main:main", + config: {} as never, + }); + + await tool.execute("1", { + action: "send", + to: "telegram:123", + message: "", + media: "https://example.com/files/report.pdf?sig=1", + }); + + expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith( + expect.objectContaining({ text: "report.pdf" }), + ); + }); + + it("does not mirror on dry-run", async () => { + mocks.appendAssistantMessageToSessionTranscript.mockClear(); + mocks.runMessageAction.mockResolvedValue({ + kind: "send", + action: "send", + channel: "telegram", + handledBy: "plugin", + payload: {}, + dryRun: true, + } satisfies MessageActionRunResult); + + const tool = createMessageTool({ + agentSessionKey: "agent:main:main", + config: {} as never, + }); + + await tool.execute("1", { + action: "send", + to: "telegram:123", + message: "hi", + }); + + expect(mocks.appendAssistantMessageToSessionTranscript).not.toHaveBeenCalled(); + }); +}); diff --git a/src/agents/tools/message-tool.ts b/src/agents/tools/message-tool.ts index 95ad0311f..d5d72f1e5 100644 --- a/src/agents/tools/message-tool.ts +++ b/src/agents/tools/message-tool.ts @@ -9,8 +9,13 @@ import { } from "../../channels/plugins/types.js"; import type { ClawdbotConfig } from "../../config/config.js"; import { loadConfig } from "../../config/config.js"; +import { + appendAssistantMessageToSessionTranscript, + resolveMirroredTranscriptText, +} from "../../config/sessions.js"; import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "../../gateway/protocol/client-info.js"; import { runMessageAction } from "../../infra/outbound/message-action-runner.js"; +import { resolveSessionAgentId } from "../agent-scope.js"; import { normalizeAccountId } from "../../routing/session-key.js"; import { stringEnum } from "../schema/typebox.js"; import type { AnyAgentTool } from "./common.js"; @@ -119,6 +124,7 @@ const MessageToolSchema = buildMessageToolSchemaFromActions(AllMessageActions, { type MessageToolOptions = { agentAccountId?: string; + agentSessionKey?: string; config?: ClawdbotConfig; currentChannelId?: string; currentThreadTs?: string; @@ -187,8 +193,36 @@ export function createMessageTool(options?: MessageToolOptions): AnyAgentTool { defaultAccountId: accountId ?? undefined, gateway, toolContext, + sessionKey: options?.agentSessionKey, + agentId: options?.agentSessionKey + ? resolveSessionAgentId({ sessionKey: options.agentSessionKey, config: cfg }) + : undefined, }); + if ( + action === "send" && + options?.agentSessionKey && + !result.dryRun && + result.handledBy === "plugin" + ) { + const mediaUrl = typeof params.media === "string" ? params.media : undefined; + const mirrorText = resolveMirroredTranscriptText({ + text: typeof params.message === "string" ? params.message : undefined, + mediaUrls: mediaUrl ? [mediaUrl] : undefined, + }); + if (mirrorText) { + const agentId = resolveSessionAgentId({ + sessionKey: options.agentSessionKey, + config: cfg, + }); + await appendAssistantMessageToSessionTranscript({ + agentId, + sessionKey: options.agentSessionKey, + text: mirrorText, + }); + } + } + if (result.toolResult) return result.toolResult; return jsonResult(result.payload); }, diff --git a/src/auto-reply/reply/route-reply.test.ts b/src/auto-reply/reply/route-reply.test.ts index 7466ff860..0a918cdda 100644 --- a/src/auto-reply/reply/route-reply.test.ts +++ b/src/auto-reply/reply/route-reply.test.ts @@ -17,6 +17,7 @@ const mocks = vi.hoisted(() => ({ sendMessageSlack: vi.fn(async () => ({ messageId: "m1", channelId: "c1" })), sendMessageTelegram: vi.fn(async () => ({ messageId: "m1", chatId: "c1" })), sendMessageWhatsApp: vi.fn(async () => ({ messageId: "m1", toJid: "jid" })), + deliverOutboundPayloads: vi.fn(), })); vi.mock("../../discord/send.js", () => ({ @@ -37,12 +38,25 @@ vi.mock("../../telegram/send.js", () => ({ vi.mock("../../web/outbound.js", () => ({ sendMessageWhatsApp: mocks.sendMessageWhatsApp, })); +vi.mock("../../infra/outbound/deliver.js", async () => { + const actual = await vi.importActual( + "../../infra/outbound/deliver.js", + ); + return { + ...actual, + deliverOutboundPayloads: mocks.deliverOutboundPayloads, + }; +}); +const actualDeliver = await vi.importActual( + "../../infra/outbound/deliver.js", +); const { routeReply } = await import("./route-reply.js"); describe("routeReply", () => { beforeEach(() => { setActivePluginRegistry(emptyRegistry); + mocks.deliverOutboundPayloads.mockImplementation(actualDeliver.deliverOutboundPayloads); }); afterEach(() => { @@ -261,6 +275,25 @@ describe("routeReply", () => { }), ); }); + + it("passes mirror data when sessionKey is set", async () => { + mocks.deliverOutboundPayloads.mockResolvedValue([]); + await routeReply({ + payload: { text: "hi" }, + channel: "slack", + to: "channel:C123", + sessionKey: "agent:main:main", + cfg: {} as never, + }); + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + mirror: expect.objectContaining({ + sessionKey: "agent:main:main", + text: "hi", + }), + }), + ); + }); }); const createRegistry = (channels: PluginRegistry["channels"]): PluginRegistry => ({ diff --git a/src/auto-reply/reply/route-reply.ts b/src/auto-reply/reply/route-reply.ts index fd774abb6..357a4ba49 100644 --- a/src/auto-reply/reply/route-reply.ts +++ b/src/auto-reply/reply/route-reply.ts @@ -113,7 +113,16 @@ export async function routeReply(params: RouteReplyParams): Promise", "Prefix for summary system event", "Cron") + .option("--post-prefix ", "Prefix for main-session post", "Cron") + .option( + "--post-mode ", + "What to post back to main for isolated jobs (summary|full)", + "summary", + ) + .option("--post-max-chars ", "Max chars when --post-mode=full (default 8000)", "8000") .option("--json", "Output JSON", false) .action(async (opts: GatewayRpcOpts & Record) => { try { @@ -174,6 +180,14 @@ export function registerCronAddCommand(cron: Command) { typeof opts.postPrefix === "string" && opts.postPrefix.trim() ? opts.postPrefix.trim() : "Cron", + postToMainMode: + opts.postMode === "full" || opts.postMode === "summary" + ? opts.postMode + : undefined, + postToMainMaxChars: + typeof opts.postMaxChars === "string" && /^\d+$/.test(opts.postMaxChars) + ? Number.parseInt(opts.postMaxChars, 10) + : undefined, } : undefined; diff --git a/src/config/sessions.ts b/src/config/sessions.ts index d5e80bbde..4113fc206 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -4,3 +4,4 @@ export * from "./sessions/paths.js"; export * from "./sessions/session-key.js"; export * from "./sessions/store.js"; export * from "./sessions/types.js"; +export * from "./sessions/transcript.js"; diff --git a/src/config/sessions/transcript.test.ts b/src/config/sessions/transcript.test.ts new file mode 100644 index 000000000..540ebd047 --- /dev/null +++ b/src/config/sessions/transcript.test.ts @@ -0,0 +1,114 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { + appendAssistantMessageToSessionTranscript, + resolveMirroredTranscriptText, +} from "./transcript.js"; + +describe("resolveMirroredTranscriptText", () => { + it("prefers media filenames over text", () => { + const result = resolveMirroredTranscriptText({ + text: "caption here", + mediaUrls: ["https://example.com/files/report.pdf?sig=123"], + }); + expect(result).toBe("report.pdf"); + }); + + it("returns trimmed text when no media", () => { + const result = resolveMirroredTranscriptText({ text: " hello " }); + expect(result).toBe("hello"); + }); +}); + +describe("appendAssistantMessageToSessionTranscript", () => { + let tempDir: string; + let storePath: string; + let sessionsDir: string; + + beforeEach(() => { + tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "transcript-test-")); + sessionsDir = path.join(tempDir, "agents", "main", "sessions"); + fs.mkdirSync(sessionsDir, { recursive: true }); + storePath = path.join(sessionsDir, "sessions.json"); + }); + + afterEach(() => { + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it("returns error for missing sessionKey", async () => { + const result = await appendAssistantMessageToSessionTranscript({ + sessionKey: "", + text: "test", + storePath, + }); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.reason).toBe("missing sessionKey"); + } + }); + + it("returns error for empty text", async () => { + const result = await appendAssistantMessageToSessionTranscript({ + sessionKey: "test-session", + text: " ", + storePath, + }); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.reason).toBe("empty text"); + } + }); + + it("returns error for unknown sessionKey", async () => { + fs.writeFileSync(storePath, JSON.stringify({}), "utf-8"); + const result = await appendAssistantMessageToSessionTranscript({ + sessionKey: "nonexistent", + text: "test message", + storePath, + }); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.reason).toContain("unknown sessionKey"); + } + }); + + it("creates transcript file and appends message for valid session", async () => { + const sessionId = "test-session-id"; + const sessionKey = "test-session"; + const store = { + [sessionKey]: { + sessionId, + chatType: "direct", + channel: "discord", + }, + }; + fs.writeFileSync(storePath, JSON.stringify(store), "utf-8"); + + const result = await appendAssistantMessageToSessionTranscript({ + sessionKey, + text: "Hello from delivery mirror!", + storePath, + }); + + expect(result.ok).toBe(true); + if (result.ok) { + expect(fs.existsSync(result.sessionFile)).toBe(true); + + const lines = fs.readFileSync(result.sessionFile, "utf-8").trim().split("\n"); + expect(lines.length).toBe(2); // header + message + + const header = JSON.parse(lines[0]); + expect(header.type).toBe("session"); + expect(header.id).toBe(sessionId); + + const messageLine = JSON.parse(lines[1]); + expect(messageLine.type).toBe("message"); + expect(messageLine.message.role).toBe("assistant"); + expect(messageLine.message.content[0].type).toBe("text"); + expect(messageLine.message.content[0].text).toBe("Hello from delivery mirror!"); + } + }); +}); diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts new file mode 100644 index 000000000..ace198224 --- /dev/null +++ b/src/config/sessions/transcript.ts @@ -0,0 +1,131 @@ +import fs from "node:fs"; +import path from "node:path"; + +import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent"; + +import type { SessionEntry } from "./types.js"; +import { loadSessionStore, updateSessionStore } from "./store.js"; +import { resolveDefaultSessionStorePath, resolveSessionTranscriptPath } from "./paths.js"; + +function stripQuery(value: string): string { + const noHash = value.split("#")[0] ?? value; + return noHash.split("?")[0] ?? noHash; +} + +function extractFileNameFromMediaUrl(value: string): string | null { + const trimmed = value.trim(); + if (!trimmed) return null; + const cleaned = stripQuery(trimmed); + try { + const parsed = new URL(cleaned); + const base = path.basename(parsed.pathname); + if (!base) return null; + try { + return decodeURIComponent(base); + } catch { + return base; + } + } catch { + const base = path.basename(cleaned); + if (!base || base === "/" || base === ".") return null; + return base; + } +} + +export function resolveMirroredTranscriptText(params: { + text?: string; + mediaUrls?: string[]; +}): string | null { + const mediaUrls = params.mediaUrls?.filter((url) => url && url.trim()) ?? []; + if (mediaUrls.length > 0) { + const names = mediaUrls + .map((url) => extractFileNameFromMediaUrl(url)) + .filter((name): name is string => Boolean(name && name.trim())); + if (names.length > 0) return names.join(", "); + return "media"; + } + + const text = params.text ?? ""; + const trimmed = text.trim(); + return trimmed ? trimmed : null; +} + +async function ensureSessionHeader(params: { + sessionFile: string; + sessionId: string; +}): Promise { + if (fs.existsSync(params.sessionFile)) return; + await fs.promises.mkdir(path.dirname(params.sessionFile), { recursive: true }); + const header = { + type: "session", + version: CURRENT_SESSION_VERSION, + id: params.sessionId, + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }; + await fs.promises.writeFile(params.sessionFile, `${JSON.stringify(header)}\n`, "utf-8"); +} + +export async function appendAssistantMessageToSessionTranscript(params: { + agentId?: string; + sessionKey: string; + text?: string; + mediaUrls?: string[]; + /** Optional override for store path (mostly for tests). */ + storePath?: string; +}): Promise<{ ok: true; sessionFile: string } | { ok: false; reason: string }> { + const sessionKey = params.sessionKey.trim(); + if (!sessionKey) return { ok: false, reason: "missing sessionKey" }; + + const mirrorText = resolveMirroredTranscriptText({ + text: params.text, + mediaUrls: params.mediaUrls, + }); + if (!mirrorText) return { ok: false, reason: "empty text" }; + + const storePath = params.storePath ?? resolveDefaultSessionStorePath(params.agentId); + const store = loadSessionStore(storePath, { skipCache: true }); + const entry = store[sessionKey] as SessionEntry | undefined; + if (!entry?.sessionId) return { ok: false, reason: `unknown sessionKey: ${sessionKey}` }; + + const sessionFile = + entry.sessionFile?.trim() || resolveSessionTranscriptPath(entry.sessionId, params.agentId); + + await ensureSessionHeader({ sessionFile, sessionId: entry.sessionId }); + + const sessionManager = SessionManager.open(sessionFile); + sessionManager.appendMessage({ + role: "assistant", + content: [{ type: "text", text: mirrorText }], + api: "openai-responses", + provider: "clawdbot", + model: "delivery-mirror", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + }, + }, + stopReason: "stop", + timestamp: Date.now(), + }); + + if (!entry.sessionFile || entry.sessionFile !== sessionFile) { + await updateSessionStore(storePath, (current) => { + current[sessionKey] = { + ...entry, + sessionFile, + }; + }); + } + + return { ok: true, sessionFile }; +} diff --git a/src/cron/isolated-agent/helpers.ts b/src/cron/isolated-agent/helpers.ts index 97da8dca1..cd257618d 100644 --- a/src/cron/isolated-agent/helpers.ts +++ b/src/cron/isolated-agent/helpers.ts @@ -25,6 +25,14 @@ export function pickSummaryFromPayloads(payloads: Array<{ text?: string | undefi return undefined; } +export function pickLastNonEmptyTextFromPayloads(payloads: Array<{ text?: string | undefined }>) { + for (let i = payloads.length - 1; i >= 0; i--) { + const clean = (payloads[i]?.text ?? "").trim(); + if (clean) return clean; + } + return undefined; +} + /** * Check if all payloads are just heartbeat ack responses (HEARTBEAT_OK). * Returns true if delivery should be skipped because there's no real content. diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 99db389a2..88e8b2369 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -41,6 +41,7 @@ import type { CronJob } from "../types.js"; import { resolveDeliveryTarget } from "./delivery-target.js"; import { isHeartbeatOnlyResponse, + pickLastNonEmptyTextFromPayloads, pickSummaryFromOutput, pickSummaryFromPayloads, resolveHeartbeatAckMaxChars, @@ -50,6 +51,8 @@ import { resolveCronSession } from "./session.js"; export type RunCronAgentTurnResult = { status: "ok" | "error" | "skipped"; summary?: string; + /** Last non-empty agent text output (not truncated). */ + outputText?: string; error?: string; }; @@ -333,6 +336,7 @@ export async function runCronIsolatedAgentTurn(params: { } const firstText = payloads[0]?.text ?? ""; const summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText); + const outputText = pickLastNonEmptyTextFromPayloads(payloads); // Skip delivery for heartbeat-only responses (HEARTBEAT_OK with no real content). const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg); @@ -346,12 +350,14 @@ export async function runCronIsolatedAgentTurn(params: { return { status: "error", summary, + outputText, error: reason, }; } return { status: "skipped", summary: `Delivery skipped (${reason}).`, + outputText, }; } try { @@ -366,11 +372,11 @@ export async function runCronIsolatedAgentTurn(params: { }); } catch (err) { if (!bestEffortDeliver) { - return { status: "error", summary, error: String(err) }; + return { status: "error", summary, outputText, error: String(err) }; } - return { status: "ok", summary }; + return { status: "ok", summary, outputText }; } } - return { status: "ok", summary }; + return { status: "ok", summary, outputText }; } diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 0f2980354..ab094c20b 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -30,6 +30,8 @@ export type CronServiceDeps = { runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{ status: "ok" | "error" | "skipped"; summary?: string; + /** Last non-empty agent text output (not truncated). */ + outputText?: string; error?: string; }>; onEvent?: (evt: CronEvent) => void; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 5417d2581..370f5d116 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -66,7 +66,12 @@ export async function executeJob( let deleted = false; - const finish = async (status: "ok" | "error" | "skipped", err?: string, summary?: string) => { + const finish = async ( + status: "ok" | "error" | "skipped", + err?: string, + summary?: string, + outputText?: string, + ) => { const endedAt = state.deps.nowMs(); job.state.runningAtMs = undefined; job.state.lastRunAtMs = startedAt; @@ -108,7 +113,19 @@ export async function executeJob( if (job.sessionTarget === "isolated") { const prefix = job.isolation?.postToMainPrefix?.trim() || "Cron"; - const body = (summary ?? err ?? status).trim(); + const mode = job.isolation?.postToMainMode ?? "summary"; + + let body = (summary ?? err ?? status).trim(); + if (mode === "full") { + // Prefer full agent output if available; fall back to summary. + const maxCharsRaw = job.isolation?.postToMainMaxChars; + const maxChars = Number.isFinite(maxCharsRaw) ? Math.max(0, maxCharsRaw as number) : 8000; + const fullText = (outputText ?? "").trim(); + if (fullText) { + body = fullText.length > maxChars ? `${fullText.slice(0, maxChars)}…` : fullText; + } + } + const statusPrefix = status === "ok" ? prefix : `${prefix} (${status})`; state.deps.enqueueSystemEvent(`${statusPrefix}: ${body}`, { agentId: job.agentId, @@ -182,9 +199,10 @@ export async function executeJob( job, message: job.payload.message, }); - if (res.status === "ok") await finish("ok", undefined, res.summary); - else if (res.status === "skipped") await finish("skipped", undefined, res.summary); - else await finish("error", res.error ?? "cron job failed", res.summary); + if (res.status === "ok") await finish("ok", undefined, res.summary, res.outputText); + else if (res.status === "skipped") + await finish("skipped", undefined, res.summary, res.outputText); + else await finish("error", res.error ?? "cron job failed", res.summary, res.outputText); } catch (err) { await finish("error", String(err)); } finally { diff --git a/src/cron/types.ts b/src/cron/types.ts index 684f901f0..88d5f9d3d 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -27,6 +27,14 @@ export type CronPayload = export type CronIsolation = { postToMainPrefix?: string; + /** + * What to post back into the main session after an isolated run. + * - summary: small status/summary line (default) + * - full: the agent's final text output (optionally truncated) + */ + postToMainMode?: "summary" | "full"; + /** Max chars when postToMainMode="full". Default: 8000. */ + postToMainMaxChars?: number; }; export type CronJobState = { diff --git a/src/gateway/protocol/schema/agent.ts b/src/gateway/protocol/schema/agent.ts index 2217e0419..cf0b8be7f 100644 --- a/src/gateway/protocol/schema/agent.ts +++ b/src/gateway/protocol/schema/agent.ts @@ -21,6 +21,8 @@ export const SendParamsSchema = Type.Object( gifPlayback: Type.Optional(Type.Boolean()), channel: Type.Optional(Type.String()), accountId: Type.Optional(Type.String()), + /** Optional session key for mirroring delivered output back into the transcript. */ + sessionKey: Type.Optional(Type.String()), idempotencyKey: NonEmptyString, }, { additionalProperties: false }, diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index 3a04e330b..b3f95cfe3 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -55,6 +55,8 @@ export const CronPayloadSchema = Type.Union([ export const CronIsolationSchema = Type.Object( { postToMainPrefix: Type.Optional(Type.String()), + postToMainMode: Type.Optional(Type.Union([Type.Literal("summary"), Type.Literal("full")])), + postToMainMaxChars: Type.Optional(Type.Integer({ minimum: 0 })), }, { additionalProperties: false }, ); diff --git a/src/gateway/server-methods/send.test.ts b/src/gateway/server-methods/send.test.ts new file mode 100644 index 000000000..001dbd19e --- /dev/null +++ b/src/gateway/server-methods/send.test.ts @@ -0,0 +1,102 @@ +import { describe, expect, it, vi } from "vitest"; + +import type { GatewayRequestContext } from "./types.js"; +import { sendHandlers } from "./send.js"; + +const mocks = vi.hoisted(() => ({ + deliverOutboundPayloads: vi.fn(), + appendAssistantMessageToSessionTranscript: vi.fn(async () => ({ ok: true, sessionFile: "x" })), +})); + +vi.mock("../../config/config.js", () => ({ + loadConfig: () => ({}), +})); + +vi.mock("../../channels/plugins/index.js", () => ({ + getChannelPlugin: () => ({ outbound: {} }), + normalizeChannelId: (value: string) => value, +})); + +vi.mock("../../infra/outbound/targets.js", () => ({ + resolveOutboundTarget: () => ({ ok: true, to: "resolved" }), +})); + +vi.mock("../../infra/outbound/deliver.js", () => ({ + deliverOutboundPayloads: mocks.deliverOutboundPayloads, +})); + +vi.mock("../../config/sessions.js", async () => { + const actual = await vi.importActual( + "../../config/sessions.js", + ); + return { + ...actual, + appendAssistantMessageToSessionTranscript: mocks.appendAssistantMessageToSessionTranscript, + }; +}); + +const makeContext = (): GatewayRequestContext => + ({ + dedupe: new Map(), + }) as unknown as GatewayRequestContext; + +describe("gateway send mirroring", () => { + it("does not mirror when delivery returns no results", async () => { + mocks.deliverOutboundPayloads.mockResolvedValue([]); + + const respond = vi.fn(); + await sendHandlers.send({ + params: { + to: "channel:C1", + message: "hi", + channel: "slack", + idempotencyKey: "idem-1", + sessionKey: "agent:main:main", + }, + respond, + context: makeContext(), + req: { type: "req", id: "1", method: "send" }, + client: null, + isWebchatConnect: () => false, + }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + mirror: expect.objectContaining({ + sessionKey: "agent:main:main", + }), + }), + ); + }); + + it("mirrors media filenames when delivery succeeds", async () => { + mocks.deliverOutboundPayloads.mockResolvedValue([{ messageId: "m1", channel: "slack" }]); + + const respond = vi.fn(); + await sendHandlers.send({ + params: { + to: "channel:C1", + message: "caption", + mediaUrl: "https://example.com/files/report.pdf?sig=1", + channel: "slack", + idempotencyKey: "idem-2", + sessionKey: "agent:main:main", + }, + respond, + context: makeContext(), + req: { type: "req", id: "1", method: "send" }, + client: null, + isWebchatConnect: () => false, + }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + mirror: expect.objectContaining({ + sessionKey: "agent:main:main", + text: "caption", + mediaUrls: ["https://example.com/files/report.pdf?sig=1"], + }), + }), + ); + }); +}); diff --git a/src/gateway/server-methods/send.ts b/src/gateway/server-methods/send.ts index d8a20292c..477c03b19 100644 --- a/src/gateway/server-methods/send.ts +++ b/src/gateway/server-methods/send.ts @@ -3,6 +3,7 @@ import type { ChannelId } from "../../channels/plugins/types.js"; import { DEFAULT_CHAT_CHANNEL } from "../../channels/registry.js"; import { loadConfig } from "../../config/config.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; +import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import type { OutboundChannel } from "../../infra/outbound/targets.js"; import { resolveOutboundTarget } from "../../infra/outbound/targets.js"; import { normalizePollInput } from "../../polls.js"; @@ -37,6 +38,7 @@ export const sendHandlers: GatewayRequestHandlers = { gifPlayback?: boolean; channel?: string; accountId?: string; + sessionKey?: string; idempotencyKey: string; }; const idem = request.idempotencyKey; @@ -94,7 +96,20 @@ export const sendHandlers: GatewayRequestHandlers = { accountId, payloads: [{ text: message, mediaUrl: request.mediaUrl }], gifPlayback: request.gifPlayback, + mirror: + typeof request.sessionKey === "string" && request.sessionKey.trim() + ? { + sessionKey: request.sessionKey.trim(), + agentId: resolveSessionAgentId({ + sessionKey: request.sessionKey.trim(), + config: cfg, + }), + text: message, + mediaUrls: request.mediaUrl ? [request.mediaUrl] : undefined, + } + : undefined, }); + const result = results.at(-1); if (!result) { throw new Error("No delivery result"); diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 9cf66a924..11fce5505 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -2,7 +2,22 @@ import { describe, expect, it, vi } from "vitest"; import type { ClawdbotConfig } from "../../config/config.js"; import { markdownToSignalTextChunks } from "../../signal/format.js"; -import { deliverOutboundPayloads, normalizeOutboundPayloads } from "./deliver.js"; + +const mocks = vi.hoisted(() => ({ + appendAssistantMessageToSessionTranscript: vi.fn(async () => ({ ok: true, sessionFile: "x" })), +})); + +vi.mock("../../config/sessions.js", async () => { + const actual = await vi.importActual( + "../../config/sessions.js", + ); + return { + ...actual, + appendAssistantMessageToSessionTranscript: mocks.appendAssistantMessageToSessionTranscript, + }; +}); + +const { deliverOutboundPayloads, normalizeOutboundPayloads } = await import("./deliver.js"); describe("deliverOutboundPayloads", () => { it("chunks telegram markdown and passes through accountId", async () => { @@ -193,4 +208,29 @@ describe("deliverOutboundPayloads", () => { expect(onError).toHaveBeenCalledTimes(1); expect(results).toEqual([{ channel: "whatsapp", messageId: "w2", toJid: "jid" }]); }); + + it("mirrors delivered output when mirror options are provided", async () => { + const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" }); + const cfg: ClawdbotConfig = { + channels: { telegram: { botToken: "tok-1", textChunkLimit: 2 } }, + }; + mocks.appendAssistantMessageToSessionTranscript.mockClear(); + + await deliverOutboundPayloads({ + cfg, + channel: "telegram", + to: "123", + payloads: [{ text: "caption", mediaUrl: "https://example.com/files/report.pdf?sig=1" }], + deps: { sendTelegram }, + mirror: { + sessionKey: "agent:main:main", + text: "caption", + mediaUrls: ["https://example.com/files/report.pdf?sig=1"], + }, + }); + + expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith( + expect.objectContaining({ text: "report.pdf" }), + ); + }); }); diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 51f87d7c3..21fffe807 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -11,6 +11,10 @@ import { sendMessageSignal } from "../../signal/send.js"; import type { sendMessageSlack } from "../../slack/send.js"; import type { sendMessageTelegram } from "../../telegram/send.js"; import type { sendMessageWhatsApp } from "../../web/outbound.js"; +import { + appendAssistantMessageToSessionTranscript, + resolveMirroredTranscriptText, +} from "../../config/sessions.js"; import type { NormalizedOutboundPayload } from "./payloads.js"; import { normalizeOutboundPayloads } from "./payloads.js"; import type { OutboundChannel } from "./targets.js"; @@ -159,6 +163,12 @@ export async function deliverOutboundPayloads(params: { bestEffort?: boolean; onError?: (err: unknown, payload: NormalizedOutboundPayload) => void; onPayload?: (payload: NormalizedOutboundPayload) => void; + mirror?: { + sessionKey: string; + agentId?: string; + text?: string; + mediaUrls?: string[]; + }; }): Promise { const { cfg, channel, to, payloads } = params; const accountId = params.accountId; @@ -279,5 +289,18 @@ export async function deliverOutboundPayloads(params: { params.onError?.(err, payload); } } + if (params.mirror && results.length > 0) { + const mirrorText = resolveMirroredTranscriptText({ + text: params.mirror.text, + mediaUrls: params.mirror.mediaUrls, + }); + if (mirrorText) { + await appendAssistantMessageToSessionTranscript({ + agentId: params.mirror.agentId, + sessionKey: params.mirror.sessionKey, + text: mirrorText, + }); + } + } return results; } diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index c2b4f0eb0..7c7ee23fb 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -36,6 +36,8 @@ export type RunMessageActionParams = { toolContext?: ChannelThreadingToolContext; gateway?: MessageActionRunnerGateway; deps?: OutboundSendDeps; + sessionKey?: string; + agentId?: string; dryRun?: boolean; }; @@ -265,6 +267,13 @@ export async function runMessageAction( bestEffort: bestEffort ?? undefined, deps: input.deps, gateway, + mirror: + input.sessionKey && !dryRun + ? { + sessionKey: input.sessionKey, + agentId: input.agentId, + } + : undefined, }); return { diff --git a/src/infra/outbound/message.ts b/src/infra/outbound/message.ts index ce1aa210b..34a400255 100644 --- a/src/infra/outbound/message.ts +++ b/src/infra/outbound/message.ts @@ -42,6 +42,10 @@ type MessageSendParams = { cfg?: ClawdbotConfig; gateway?: MessageGatewayOptions; idempotencyKey?: string; + mirror?: { + sessionKey: string; + agentId?: string; + }; }; export type MessageSendResult = { @@ -142,6 +146,13 @@ export async function sendMessage(params: MessageSendParams): Promise