diff --git a/apps/macos/Sources/Clawdis/ControlChannel.swift b/apps/macos/Sources/Clawdis/ControlChannel.swift index d8921ef7d..c7242edff 100644 --- a/apps/macos/Sources/Clawdis/ControlChannel.swift +++ b/apps/macos/Sources/Clawdis/ControlChannel.swift @@ -186,6 +186,7 @@ final class ControlChannel: ObservableObject { private var mode: Mode = .local private var localPort: UInt16 = 18789 private var pingTask: Task? + private var activeJobs: Int = 0 @Published private(set) var state: ConnectionState = .disconnected @Published private(set) var lastPingMs: Double? @@ -430,7 +431,15 @@ final class ControlChannel: ObservableObject { private func handleAgentEvent(_ event: ControlAgentEvent) { if event.stream == "job" { if let state = event.data["state"]?.value as? String { - let working = state.lowercased() == "started" || state.lowercased() == "streaming" + switch state.lowercased() { + case "started", "streaming": + self.activeJobs &+= 1 + case "done", "error": + self.activeJobs = max(0, self.activeJobs - 1) + default: + break + } + let working = self.activeJobs > 0 Task { @MainActor in AppStateStore.shared.setWorking(working) } diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index d67eae7e8..108fb039b 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -13,6 +13,7 @@ import { enqueueCommand } from "../process/command-queue.js"; import type { runCommandWithTimeout } from "../process/exec.js"; import { runPiRpc } from "../process/tau-rpc.js"; import { applyTemplate, type TemplateContext } from "./templating.js"; +import { emitAgentEvent } from "../infra/agent-events.js"; import { formatToolAggregate, shortenMeta, @@ -159,6 +160,7 @@ type CommandReplyParams = { thinkLevel?: ThinkLevel; verboseLevel?: "off" | "on"; onPartialReply?: (payload: ReplyPayload) => Promise | void; + runId?: string; }; export type CommandReplyMeta = { @@ -552,7 +554,8 @@ export async function runCommandReply( streamedAny = true; }; - const run = async () => { + const run = async () => { + const runId = params.runId ?? crypto.randomUUID(); const rpcPromptIndex = promptIndex >= 0 ? promptIndex : finalArgv.length - 1; const body = promptArg ?? ""; @@ -573,103 +576,88 @@ export async function runCommandReply( cwd: reply.cwd, prompt: body, timeoutMs, - onEvent: onPartialReply - ? (line: string) => { - try { - const ev = JSON.parse(line) as { - type?: string; - message?: { - role?: string; - content?: unknown[]; - details?: Record; - arguments?: Record; - toolCallId?: string; - tool_call_id?: string; - toolName?: string; - name?: string; - }; - toolCallId?: string; - toolName?: string; - args?: Record; - }; - if (!enableToolStreaming) return; - // Capture metadata as soon as the tool starts (from args). - if (ev.type === "tool_execution_start") { - const toolName = ev.toolName; - const meta = inferToolMeta({ - toolName, - name: ev.toolName, - arguments: ev.args, - }); - if (ev.toolCallId) { - toolMetaById.set(ev.toolCallId, meta); - } - if (meta) { - if ( - pendingToolName && - toolName && - toolName !== pendingToolName - ) { - flushPendingTool(); - } - if (!pendingToolName) pendingToolName = toolName; - pendingMetas.push(meta); - if ( - TOOL_RESULT_FLUSH_COUNT > 0 && - pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT - ) { - flushPendingTool(); - } else { - if (pendingTimer) clearTimeout(pendingTimer); - pendingTimer = setTimeout( - flushPendingTool, - TOOL_RESULT_DEBOUNCE_MS, - ); - } - } - } - if ( - enableToolStreaming && - (ev.type === "message" || ev.type === "message_end") && - ev.message?.role === "tool_result" && - Array.isArray(ev.message.content) - ) { - const toolName = inferToolName(ev.message); - const toolCallId = - ev.message.toolCallId ?? ev.message.tool_call_id; - const meta = - inferToolMeta(ev.message) ?? - (toolCallId ? toolMetaById.get(toolCallId) : undefined); - if ( - pendingToolName && - toolName && - toolName !== pendingToolName - ) { - flushPendingTool(); - } - if (!pendingToolName) pendingToolName = toolName; - if (meta) pendingMetas.push(meta); - if ( - TOOL_RESULT_FLUSH_COUNT > 0 && - pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT - ) { - flushPendingTool(); - return; - } - if (pendingTimer) clearTimeout(pendingTimer); - pendingTimer = setTimeout( - flushPendingTool, - TOOL_RESULT_DEBOUNCE_MS, - ); - } - if (ev.type === "message_end") { - streamAssistantFinal(ev.message); - } - } catch { - // ignore malformed lines - } + onEvent: (line: string) => { + let ev: any; + try { + ev = JSON.parse(line); + } catch { + return; + } + + // Forward tool lifecycle events to the agent bus. + if (enableToolStreaming && ev.type === "tool_execution_start") { + emitAgentEvent({ + runId, + stream: "tool", + data: { + phase: "start", + name: ev.toolName, + toolCallId: ev.toolCallId, + args: ev.args, + }, + }); + } + + if ( + enableToolStreaming && + (ev.type === "message" || ev.type === "message_end") && + ev.message?.role === "tool_result" && + Array.isArray(ev.message.content) + ) { + const toolName = inferToolName(ev.message); + const toolCallId = ev.message.toolCallId ?? ev.message.tool_call_id; + const meta = + inferToolMeta(ev.message) ?? + (toolCallId ? toolMetaById.get(toolCallId) : undefined); + + emitAgentEvent({ + runId, + stream: "tool", + data: { + phase: "result", + name: toolName, + toolCallId, + meta, + }, + }); + + if ( + pendingToolName && + toolName && + toolName !== pendingToolName + ) { + flushPendingTool(); } - : undefined, + if (!pendingToolName) pendingToolName = toolName; + if (meta) pendingMetas.push(meta); + if ( + TOOL_RESULT_FLUSH_COUNT > 0 && + pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT + ) { + flushPendingTool(); + return; + } + if (pendingTimer) clearTimeout(pendingTimer); + pendingTimer = setTimeout( + flushPendingTool, + TOOL_RESULT_DEBOUNCE_MS, + ); + } + + if (ev.type === "message_end") { + streamAssistantFinal(ev.message); + } + + // Preserve existing partial reply hook when provided. + if (onPartialReply && ev.message?.role === "assistant") { + // Let the existing logic reuse the already-parsed message. + try { + streamAssistantFinal(ev.message); + } catch { + /* ignore */ + } + } + }, }); flushPendingTool(); return rpcResult; diff --git a/src/commands/agent.ts b/src/commands/agent.ts index ad534407a..df2fb72c0 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -320,6 +320,7 @@ export async function agentCommand( commandRunner: runCommandWithTimeout, thinkLevel: resolvedThinkLevel, verboseLevel: resolvedVerboseLevel, + runId: sessionId, }); emitAgentEvent({ runId: sessionId,