chore: tidy agent event streaming types

This commit is contained in:
Peter Steinberger
2025-12-09 00:57:19 +01:00
parent 8e8e695db9
commit 34d2527606
5 changed files with 27 additions and 15 deletions

View File

@@ -6,6 +6,7 @@ import { piSpec } from "../agents/pi.js";
import type { AgentMeta, AgentToolResult } from "../agents/types.js";
import type { WarelayConfig } from "../config/config.js";
import { isVerbose, logVerbose } from "../globals.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { logError } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { splitMediaFromOutput } from "../media/parse.js";
@@ -13,7 +14,6 @@ import { enqueueCommand } from "../process/command-queue.js";
import type { runCommandWithTimeout } from "../process/exec.js";
import { runPiRpc } from "../process/tau-rpc.js";
import { applyTemplate, type TemplateContext } from "./templating.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import {
formatToolAggregate,
shortenMeta,
@@ -161,7 +161,10 @@ type CommandReplyParams = {
verboseLevel?: "off" | "on";
onPartialReply?: (payload: ReplyPayload) => Promise<void> | void;
runId?: string;
onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void;
onAgentEvent?: (evt: {
stream: string;
data: Record<string, unknown>;
}) => void;
};
export type CommandReplyMeta = {
@@ -555,7 +558,7 @@ export async function runCommandReply(
streamedAny = true;
};
const run = async () => {
const run = async () => {
const runId = params.runId ?? crypto.randomUUID();
const rpcPromptIndex =
promptIndex >= 0 ? promptIndex : finalArgv.length - 1;
@@ -572,15 +575,29 @@ export async function runCommandReply(
}
return copy;
})();
type RpcStreamEvent = {
type?: string;
message?: {
role?: string;
content?: unknown;
toolCallId?: string;
tool_call_id?: string;
} & Record<string, unknown>;
toolName?: string;
toolCallId?: string;
args?: unknown;
[key: string]: unknown;
};
const rpcResult = await runPiRpc({
argv: rpcArgvForRun,
cwd: reply.cwd,
prompt: body,
timeoutMs,
onEvent: (line: string) => {
let ev: any;
let ev: RpcStreamEvent;
try {
ev = JSON.parse(line);
ev = JSON.parse(line) as RpcStreamEvent;
} catch {
return;
}
@@ -639,11 +656,7 @@ export async function runCommandReply(
},
});
if (
pendingToolName &&
toolName &&
toolName !== pendingToolName
) {
if (pendingToolName && toolName && toolName !== pendingToolName) {
flushPendingTool();
}
if (!pendingToolName) pendingToolName = toolName;

View File

@@ -1,5 +1,5 @@
import chalk from "chalk";
import { randomUUID } from "node:crypto";
import chalk from "chalk";
import { Command } from "commander";
import { agentCommand } from "../commands/agent.js";
import { healthCommand } from "../commands/health.js";

View File

@@ -21,11 +21,11 @@ import {
type SessionEntry,
saveSessionStore,
} from "../config/sessions.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { runCommandWithTimeout } from "../process/exec.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { sendViaIpc } from "../web/ipc.js";
import { emitAgentEvent } from "../infra/agent-events.js";
type AgentCommandOpts = {
message: string;
@@ -306,7 +306,7 @@ export async function agentCommand(
},
});
let result;
let result: Awaited<ReturnType<typeof runCommandReply>>;
try {
result = await runCommandReply({
reply: { ...replyCfg, mode: "command" },

View File

@@ -28,4 +28,3 @@ export function onAgentEvent(listener: (evt: AgentEventPayload) => void) {
listeners.add(listener);
return () => listeners.delete(listener);
}

View File

@@ -3,13 +3,13 @@ import net from "node:net";
import { getHealthSnapshot, type HealthSummary } from "../commands/health.js";
import { getStatusSummary, type StatusSummary } from "../commands/status.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { type AgentEventPayload, onAgentEvent } from "./agent-events.js";
import {
emitHeartbeatEvent,
getLastHeartbeatEvent,
type HeartbeatEventPayload,
onHeartbeatEvent,
} from "./heartbeat-events.js";
import { onAgentEvent, type AgentEventPayload } from "./agent-events.js";
type ControlRequest = {
type: "request";