From 34d252760661a4e6cb937fb9c9ed53bd650ac6d6 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 9 Dec 2025 00:57:19 +0100 Subject: [PATCH] chore: tidy agent event streaming types --- src/auto-reply/command-reply.ts | 33 +++++++++++++++++++++++---------- src/cli/program.ts | 2 +- src/commands/agent.ts | 4 ++-- src/infra/agent-events.ts | 1 - src/infra/control-channel.ts | 2 +- 5 files changed, 27 insertions(+), 15 deletions(-) diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index 7b0043b8f..4fa7ed283 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -6,6 +6,7 @@ import { piSpec } from "../agents/pi.js"; import type { AgentMeta, AgentToolResult } from "../agents/types.js"; import type { WarelayConfig } from "../config/config.js"; import { isVerbose, logVerbose } from "../globals.js"; +import { emitAgentEvent } from "../infra/agent-events.js"; import { logError } from "../logger.js"; import { getChildLogger } from "../logging.js"; import { splitMediaFromOutput } from "../media/parse.js"; @@ -13,7 +14,6 @@ import { enqueueCommand } from "../process/command-queue.js"; import type { runCommandWithTimeout } from "../process/exec.js"; import { runPiRpc } from "../process/tau-rpc.js"; import { applyTemplate, type TemplateContext } from "./templating.js"; -import { emitAgentEvent } from "../infra/agent-events.js"; import { formatToolAggregate, shortenMeta, @@ -161,7 +161,10 @@ type CommandReplyParams = { verboseLevel?: "off" | "on"; onPartialReply?: (payload: ReplyPayload) => Promise | void; runId?: string; - onAgentEvent?: (evt: { stream: string; data: Record }) => void; + onAgentEvent?: (evt: { + stream: string; + data: Record; + }) => void; }; export type CommandReplyMeta = { @@ -555,7 +558,7 @@ export async function runCommandReply( streamedAny = true; }; - const run = async () => { + const run = async () => { const runId = params.runId ?? crypto.randomUUID(); const rpcPromptIndex = promptIndex >= 0 ? promptIndex : finalArgv.length - 1; @@ -572,15 +575,29 @@ export async function runCommandReply( } return copy; })(); + type RpcStreamEvent = { + type?: string; + message?: { + role?: string; + content?: unknown; + toolCallId?: string; + tool_call_id?: string; + } & Record; + toolName?: string; + toolCallId?: string; + args?: unknown; + [key: string]: unknown; + }; + const rpcResult = await runPiRpc({ argv: rpcArgvForRun, cwd: reply.cwd, prompt: body, timeoutMs, onEvent: (line: string) => { - let ev: any; + let ev: RpcStreamEvent; try { - ev = JSON.parse(line); + ev = JSON.parse(line) as RpcStreamEvent; } catch { return; } @@ -639,11 +656,7 @@ export async function runCommandReply( }, }); - if ( - pendingToolName && - toolName && - toolName !== pendingToolName - ) { + if (pendingToolName && toolName && toolName !== pendingToolName) { flushPendingTool(); } if (!pendingToolName) pendingToolName = toolName; diff --git a/src/cli/program.ts b/src/cli/program.ts index a58b29a87..432230c4e 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -1,5 +1,5 @@ -import chalk from "chalk"; import { randomUUID } from "node:crypto"; +import chalk from "chalk"; import { Command } from "commander"; import { agentCommand } from "../commands/agent.js"; import { healthCommand } from "../commands/health.js"; diff --git a/src/commands/agent.ts b/src/commands/agent.ts index df2fb72c0..bc064306b 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -21,11 +21,11 @@ import { type SessionEntry, saveSessionStore, } from "../config/sessions.js"; +import { emitAgentEvent } from "../infra/agent-events.js"; import { runCommandWithTimeout } from "../process/exec.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; import { sendViaIpc } from "../web/ipc.js"; -import { emitAgentEvent } from "../infra/agent-events.js"; type AgentCommandOpts = { message: string; @@ -306,7 +306,7 @@ export async function agentCommand( }, }); - let result; + let result: Awaited>; try { result = await runCommandReply({ reply: { ...replyCfg, mode: "command" }, diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts index 3a0f447d1..0acd7c14a 100644 --- a/src/infra/agent-events.ts +++ b/src/infra/agent-events.ts @@ -28,4 +28,3 @@ export function onAgentEvent(listener: (evt: AgentEventPayload) => void) { listeners.add(listener); return () => listeners.delete(listener); } - diff --git a/src/infra/control-channel.ts b/src/infra/control-channel.ts index 658add6dc..9174be1d1 100644 --- a/src/infra/control-channel.ts +++ b/src/infra/control-channel.ts @@ -3,13 +3,13 @@ import net from "node:net"; import { getHealthSnapshot, type HealthSummary } from "../commands/health.js"; import { getStatusSummary, type StatusSummary } from "../commands/status.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; +import { type AgentEventPayload, onAgentEvent } from "./agent-events.js"; import { emitHeartbeatEvent, getLastHeartbeatEvent, type HeartbeatEventPayload, onHeartbeatEvent, } from "./heartbeat-events.js"; -import { onAgentEvent, type AgentEventPayload } from "./agent-events.js"; type ControlRequest = { type: "request";