From 428a82e7348e058c7f80f24cc86b0f36504d23f7 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 17 Dec 2025 15:51:31 +0100 Subject: [PATCH] feat(chat): Swift chat parity (abort/sessions/stream) --- .../Sources/Chat/IOSBridgeChatTransport.swift | 27 ++ .../Sources/Clawdis/WebChatSwiftUI.swift | 34 +++ .../Sources/ClawdisChatUI/ChatComposer.swift | 39 ++- .../ClawdisChatUI/ChatMessageViews.swift | 99 +++++++ .../Sources/ClawdisChatUI/ChatModels.swift | 165 ++++++++++- .../Sources/ClawdisChatUI/ChatSessions.swift | 69 +++++ .../Sources/ClawdisChatUI/ChatSheets.swift | 67 +++++ .../Sources/ClawdisChatUI/ChatTransport.swift | 18 ++ .../Sources/ClawdisChatUI/ChatView.swift | 31 ++ .../Sources/ClawdisChatUI/ChatViewModel.swift | 152 ++++++++-- src/agents/pi-embedded.ts | 59 +++- src/commands/agent.ts | 4 + src/gateway/protocol/index.ts | 5 + src/gateway/protocol/schema.ts | 11 + src/gateway/server.test.ts | 136 +++++++++ src/gateway/server.ts | 269 +++++++++++++++++- 16 files changed, 1131 insertions(+), 54 deletions(-) create mode 100644 apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatSessions.swift create mode 100644 apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatSheets.swift diff --git a/apps/ios/Sources/Chat/IOSBridgeChatTransport.swift b/apps/ios/Sources/Chat/IOSBridgeChatTransport.swift index 284dde107..a38b3fe7a 100644 --- a/apps/ios/Sources/Chat/IOSBridgeChatTransport.swift +++ b/apps/ios/Sources/Chat/IOSBridgeChatTransport.swift @@ -9,6 +9,28 @@ struct IOSBridgeChatTransport: ClawdisChatTransport, Sendable { self.bridge = bridge } + func abortRun(sessionKey: String, runId: String) async throws { + struct Params: Codable { + var sessionKey: String + var runId: String + } + let data = try JSONEncoder().encode(Params(sessionKey: sessionKey, runId: runId)) + let json = String(data: data, encoding: .utf8) + _ = try await self.bridge.request(method: "chat.abort", paramsJSON: json, timeoutSeconds: 10) + } + + func listSessions(limit: Int?) async throws -> ClawdisChatSessionsListResponse { + struct Params: Codable { + var includeGlobal: Bool + var includeUnknown: Bool + var limit: Int? + } + let data = try JSONEncoder().encode(Params(includeGlobal: true, includeUnknown: false, limit: limit)) + let json = String(data: data, encoding: .utf8) + let res = try await self.bridge.request(method: "sessions.list", paramsJSON: json, timeoutSeconds: 15) + return try JSONDecoder().decode(ClawdisChatSessionsListResponse.self, from: res) + } + func setActiveSessionKey(_ sessionKey: String) async throws { struct Subscribe: Codable { var sessionKey: String } let data = try JSONEncoder().encode(Subscribe(sessionKey: sessionKey)) @@ -79,6 +101,11 @@ struct IOSBridgeChatTransport: ClawdisChatTransport, Sendable { if let payload = try? JSONDecoder().decode(ClawdisChatEventPayload.self, from: data) { continuation.yield(.chat(payload)) } + case "agent": + guard let json = evt.payloadJSON, let data = json.data(using: .utf8) else { break } + if let payload = try? JSONDecoder().decode(ClawdisAgentEventPayload.self, from: data) { + continuation.yield(.agent(payload)) + } default: break } diff --git a/apps/macos/Sources/Clawdis/WebChatSwiftUI.swift b/apps/macos/Sources/Clawdis/WebChatSwiftUI.swift index 65b00f1ab..d9ae7876d 100644 --- a/apps/macos/Sources/Clawdis/WebChatSwiftUI.swift +++ b/apps/macos/Sources/Clawdis/WebChatSwiftUI.swift @@ -18,6 +18,31 @@ struct MacGatewayChatTransport: ClawdisChatTransport, Sendable { try await GatewayConnection.shared.chatHistory(sessionKey: sessionKey) } + func abortRun(sessionKey: String, runId: String) async throws { + _ = try await GatewayConnection.shared.request( + method: "chat.abort", + params: [ + "sessionKey": AnyCodable(sessionKey), + "runId": AnyCodable(runId), + ], + timeoutMs: 10_000) + } + + func listSessions(limit: Int?) async throws -> ClawdisChatSessionsListResponse { + var params: [String: AnyCodable] = [ + "includeGlobal": AnyCodable(true), + "includeUnknown": AnyCodable(false), + ] + if let limit { + params["limit"] = AnyCodable(limit) + } + let data = try await GatewayConnection.shared.request( + method: "sessions.list", + params: params, + timeoutMs: 15_000) + return try JSONDecoder().decode(ClawdisChatSessionsListResponse.self, from: data) + } + func sendMessage( sessionKey: String, message: String, @@ -88,6 +113,15 @@ struct MacGatewayChatTransport: ClawdisChatTransport, Sendable { return nil } return .chat(chat) + case "agent": + guard let payload = evt.payload else { return nil } + guard let agent = try? JSONDecoder().decode( + ClawdisAgentEventPayload.self, + from: JSONEncoder().encode(payload)) + else { + return nil + } + return .agent(agent) default: return nil } diff --git a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatComposer.swift b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatComposer.swift index eb467dd04..3039759b7 100644 --- a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatComposer.swift +++ b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatComposer.swift @@ -169,19 +169,38 @@ struct ClawdisChatComposer: View { } private var sendButton: some View { - Button { - self.viewModel.send() - } label: { - if self.viewModel.isSending { - ProgressView().controlSize(.small) + Group { + if self.viewModel.pendingRunCount > 0 { + Button { + self.viewModel.abort() + } label: { + if self.viewModel.isAborting { + ProgressView().controlSize(.small) + } else { + Image(systemName: "stop.fill") + .font(.system(size: 13, weight: .semibold)) + } + } + .buttonStyle(.bordered) + .tint(.red) + .controlSize(.small) + .disabled(self.viewModel.isAborting) } else { - Image(systemName: "arrow.up") - .font(.system(size: 13, weight: .semibold)) + Button { + self.viewModel.send() + } label: { + if self.viewModel.isSending { + ProgressView().controlSize(.small) + } else { + Image(systemName: "arrow.up") + .font(.system(size: 13, weight: .semibold)) + } + } + .buttonStyle(.borderedProminent) + .controlSize(.small) + .disabled(!self.viewModel.canSend) } } - .buttonStyle(.borderedProminent) - .controlSize(.small) - .disabled(!self.viewModel.canSend) } #if os(macOS) diff --git a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatMessageViews.swift b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatMessageViews.swift index ff2a38226..818c02996 100644 --- a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatMessageViews.swift +++ b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatMessageViews.swift @@ -158,6 +158,63 @@ struct ChatTypingIndicatorBubble: View { } } +@MainActor +struct ChatStreamingAssistantBubble: View { + let text: String + + var body: some View { + VStack(alignment: .leading, spacing: 10) { + Label("Assistant (streaming)", systemImage: "sparkles") + .font(.caption) + .foregroundStyle(.secondary) + + ChatMarkdownBody(text: self.text) + } + .padding(12) + .background( + RoundedRectangle(cornerRadius: 16, style: .continuous) + .fill(ClawdisChatTheme.subtleCard)) + .overlay( + RoundedRectangle(cornerRadius: 16, style: .continuous) + .strokeBorder(Color.white.opacity(0.10), lineWidth: 1)) + .frame(maxWidth: ChatUIConstants.bubbleMaxWidth, alignment: .leading) + } +} + +@MainActor +struct ChatPendingToolsBubble: View { + let toolCalls: [ClawdisChatPendingToolCall] + + var body: some View { + VStack(alignment: .leading, spacing: 8) { + Label("Running tools…", systemImage: "hammer") + .font(.caption) + .foregroundStyle(.secondary) + + ForEach(self.toolCalls) { call in + HStack(alignment: .firstTextBaseline, spacing: 8) { + Text(call.name) + .font(.footnote.monospaced()) + .lineLimit(1) + Spacer(minLength: 0) + ProgressView().controlSize(.mini) + } + .padding(10) + .background(Color.white.opacity(0.06)) + .clipShape(RoundedRectangle(cornerRadius: 12, style: .continuous)) + } + } + .padding(12) + .background( + RoundedRectangle(cornerRadius: 16, style: .continuous) + .fill(ClawdisChatTheme.subtleCard)) + .overlay( + RoundedRectangle(cornerRadius: 16, style: .continuous) + .strokeBorder(Color.white.opacity(0.10), lineWidth: 1)) + .frame(maxWidth: ChatUIConstants.bubbleMaxWidth, alignment: .leading) + } +} + @MainActor private struct TypingDots: View { @Environment(\.accessibilityReduceMotion) private var reduceMotion @@ -202,6 +259,48 @@ private struct MarkdownTextView: View { } } +@MainActor +private struct ChatMarkdownBody: View { + let text: String + + var body: some View { + let split = ChatMarkdownSplitter.split(markdown: self.text) + VStack(alignment: .leading, spacing: 10) { + ForEach(split.blocks) { block in + switch block.kind { + case .text: + MarkdownTextView(text: block.text) + case let .code(language): + CodeBlockView(code: block.text, language: language) + } + } + + if !split.images.isEmpty { + ForEach( + split.images, + id: \ChatMarkdownSplitter.InlineImage.id) + { (item: ChatMarkdownSplitter.InlineImage) in + if let img = item.image { + ClawdisPlatformImageFactory.image(img) + .resizable() + .scaledToFit() + .frame(maxHeight: 260) + .clipShape(RoundedRectangle(cornerRadius: 12, style: .continuous)) + .overlay( + RoundedRectangle(cornerRadius: 12, style: .continuous) + .strokeBorder(Color.white.opacity(0.12), lineWidth: 1)) + } else { + Text(item.label.isEmpty ? "Image" : item.label) + .font(.footnote) + .foregroundStyle(.secondary) + } + } + } + } + .textSelection(.enabled) + } +} + @MainActor private struct CodeBlockView: View { let code: String diff --git a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatModels.swift b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatModels.swift index 1d7361fce..7b68e13ea 100644 --- a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatModels.swift +++ b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatModels.swift @@ -1,6 +1,8 @@ import ClawdisKit import Foundation +// NOTE: keep this file lightweight; decode must be resilient to varying transcript formats. + #if canImport(AppKit) import AppKit @@ -11,25 +13,125 @@ import UIKit public typealias ClawdisPlatformImage = UIImage #endif +public struct ClawdisChatUsageCost: Codable, Hashable, Sendable { + public let input: Double? + public let output: Double? + public let cacheRead: Double? + public let cacheWrite: Double? + public let total: Double? +} + +public struct ClawdisChatUsage: Codable, Hashable, Sendable { + public let input: Int? + public let output: Int? + public let cacheRead: Int? + public let cacheWrite: Int? + public let cost: ClawdisChatUsageCost? + public let total: Int? + + enum CodingKeys: String, CodingKey { + case input + case output + case cacheRead + case cacheWrite + case cost + case total + case totalTokens + } + + public init(from decoder: Decoder) throws { + let container = try decoder.container(keyedBy: CodingKeys.self) + self.input = try container.decodeIfPresent(Int.self, forKey: .input) + self.output = try container.decodeIfPresent(Int.self, forKey: .output) + self.cacheRead = try container.decodeIfPresent(Int.self, forKey: .cacheRead) + self.cacheWrite = try container.decodeIfPresent(Int.self, forKey: .cacheWrite) + self.cost = try container.decodeIfPresent(ClawdisChatUsageCost.self, forKey: .cost) + self.total = + try container.decodeIfPresent(Int.self, forKey: .total) ?? + container.decodeIfPresent(Int.self, forKey: .totalTokens) + } + + public func encode(to encoder: Encoder) throws { + var container = encoder.container(keyedBy: CodingKeys.self) + try container.encodeIfPresent(self.input, forKey: .input) + try container.encodeIfPresent(self.output, forKey: .output) + try container.encodeIfPresent(self.cacheRead, forKey: .cacheRead) + try container.encodeIfPresent(self.cacheWrite, forKey: .cacheWrite) + try container.encodeIfPresent(self.cost, forKey: .cost) + try container.encodeIfPresent(self.total, forKey: .total) + } +} + public struct ClawdisChatMessageContent: Codable, Hashable, Sendable { public let type: String? public let text: String? + public let thinking: String? + public let thinkingSignature: String? public let mimeType: String? public let fileName: String? - public let content: String? + public let content: AnyCodable? + + // Tool-call fields (when `type == "toolCall"` or similar) + public let id: String? + public let name: String? + public let arguments: AnyCodable? public init( type: String?, text: String?, + thinking: String? = nil, + thinkingSignature: String? = nil, mimeType: String?, fileName: String?, - content: String?) + content: AnyCodable?, + id: String? = nil, + name: String? = nil, + arguments: AnyCodable? = nil) { self.type = type self.text = text + self.thinking = thinking + self.thinkingSignature = thinkingSignature self.mimeType = mimeType self.fileName = fileName self.content = content + self.id = id + self.name = name + self.arguments = arguments + } + + enum CodingKeys: String, CodingKey { + case type + case text + case thinking + case thinkingSignature + case mimeType + case fileName + case content + case id + case name + case arguments + } + + public init(from decoder: Decoder) throws { + let container = try decoder.container(keyedBy: CodingKeys.self) + self.type = try container.decodeIfPresent(String.self, forKey: .type) + self.text = try container.decodeIfPresent(String.self, forKey: .text) + self.thinking = try container.decodeIfPresent(String.self, forKey: .thinking) + self.thinkingSignature = try container.decodeIfPresent(String.self, forKey: .thinkingSignature) + self.mimeType = try container.decodeIfPresent(String.self, forKey: .mimeType) + self.fileName = try container.decodeIfPresent(String.self, forKey: .fileName) + self.id = try container.decodeIfPresent(String.self, forKey: .id) + self.name = try container.decodeIfPresent(String.self, forKey: .name) + self.arguments = try container.decodeIfPresent(AnyCodable.self, forKey: .arguments) + + if let any = try container.decodeIfPresent(AnyCodable.self, forKey: .content) { + self.content = any + } else if let str = try container.decodeIfPresent(String.self, forKey: .content) { + self.content = AnyCodable(str) + } else { + self.content = nil + } } } @@ -38,27 +140,47 @@ public struct ClawdisChatMessage: Codable, Identifiable, Sendable { public let role: String public let content: [ClawdisChatMessageContent] public let timestamp: Double? + public let toolCallId: String? + public let usage: ClawdisChatUsage? + public let stopReason: String? enum CodingKeys: String, CodingKey { - case role, content, timestamp + case role + case content + case timestamp + case toolCallId + case tool_call_id + case usage + case stopReason } public init( id: UUID = .init(), role: String, content: [ClawdisChatMessageContent], - timestamp: Double?) + timestamp: Double?, + toolCallId: String? = nil, + usage: ClawdisChatUsage? = nil, + stopReason: String? = nil) { self.id = id self.role = role self.content = content self.timestamp = timestamp + self.toolCallId = toolCallId + self.usage = usage + self.stopReason = stopReason } public init(from decoder: Decoder) throws { let container = try decoder.container(keyedBy: CodingKeys.self) self.role = try container.decode(String.self, forKey: .role) self.timestamp = try container.decodeIfPresent(Double.self, forKey: .timestamp) + self.toolCallId = + try container.decodeIfPresent(String.self, forKey: .toolCallId) ?? + container.decodeIfPresent(String.self, forKey: .tool_call_id) + self.usage = try container.decodeIfPresent(ClawdisChatUsage.self, forKey: .usage) + self.stopReason = try container.decodeIfPresent(String.self, forKey: .stopReason) if let decoded = try? container.decode([ClawdisChatMessageContent].self, forKey: .content) { self.content = decoded @@ -71,15 +193,30 @@ public struct ClawdisChatMessage: Codable, Identifiable, Sendable { ClawdisChatMessageContent( type: "text", text: text, + thinking: nil, + thinkingSignature: nil, mimeType: nil, fileName: nil, - content: nil), + content: nil, + id: nil, + name: nil, + arguments: nil), ] return } self.content = [] } + + public func encode(to encoder: Encoder) throws { + var container = encoder.container(keyedBy: CodingKeys.self) + try container.encode(self.role, forKey: .role) + try container.encodeIfPresent(self.timestamp, forKey: .timestamp) + try container.encodeIfPresent(self.toolCallId, forKey: .toolCallId) + try container.encodeIfPresent(self.usage, forKey: .usage) + try container.encodeIfPresent(self.stopReason, forKey: .stopReason) + try container.encode(self.content, forKey: .content) + } } public struct ClawdisChatHistoryPayload: Codable, Sendable { @@ -102,6 +239,24 @@ public struct ClawdisChatEventPayload: Codable, Sendable { public let errorMessage: String? } +public struct ClawdisAgentEventPayload: Codable, Sendable, Identifiable { + public var id: String { "\(self.runId)-\(self.seq ?? -1)" } + public let runId: String + public let seq: Int? + public let stream: String + public let ts: Int? + public let data: [String: AnyCodable] +} + +public struct ClawdisChatPendingToolCall: Identifiable, Hashable, Sendable { + public var id: String { self.toolCallId } + public let toolCallId: String + public let name: String + public let args: AnyCodable? + public let startedAt: Double? + public let isError: Bool? +} + public struct ClawdisGatewayHealthOK: Codable, Sendable { public let ok: Bool? } diff --git a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatSessions.swift b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatSessions.swift new file mode 100644 index 000000000..daa1ea70a --- /dev/null +++ b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatSessions.swift @@ -0,0 +1,69 @@ +import Foundation + +public struct ClawdisChatSessionsDefaults: Codable, Sendable { + public let model: String? + public let contextTokens: Int? +} + +public enum ClawdisChatSessionSyncing: Codable, Hashable, Sendable { + case bool(Bool) + case string(String) + + public init(from decoder: Decoder) throws { + let container = try decoder.singleValueContainer() + if let b = try? container.decode(Bool.self) { + self = .bool(b) + return + } + if let s = try? container.decode(String.self) { + self = .string(s) + return + } + throw DecodingError.typeMismatch( + ClawdisChatSessionSyncing.self, + DecodingError.Context( + codingPath: decoder.codingPath, + debugDescription: "Expected Bool or String")) + } + + public func encode(to encoder: Encoder) throws { + var container = encoder.singleValueContainer() + switch self { + case let .bool(b): + try container.encode(b) + case let .string(s): + try container.encode(s) + } + } +} + +public struct ClawdisChatSessionEntry: Codable, Identifiable, Sendable, Hashable { + public var id: String { self.key } + + public let key: String + public let kind: String? + public let updatedAt: Double? + public let sessionId: String? + + public let systemSent: Bool? + public let abortedLastRun: Bool? + public let thinkingLevel: String? + public let verboseLevel: String? + + public let inputTokens: Int? + public let outputTokens: Int? + public let totalTokens: Int? + + public let model: String? + public let contextTokens: Int? + public let syncing: ClawdisChatSessionSyncing? +} + +public struct ClawdisChatSessionsListResponse: Codable, Sendable { + public let ts: Double? + public let path: String? + public let count: Int? + public let defaults: ClawdisChatSessionsDefaults? + public let sessions: [ClawdisChatSessionEntry] +} + diff --git a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatSheets.swift b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatSheets.swift new file mode 100644 index 000000000..e6a752552 --- /dev/null +++ b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatSheets.swift @@ -0,0 +1,67 @@ +import Observation +import SwiftUI + +@MainActor +struct ChatSessionsSheet: View { + @Bindable var viewModel: ClawdisChatViewModel + @Environment(\.dismiss) private var dismiss + + var body: some View { + NavigationStack { + List(self.viewModel.sessions) { session in + Button { + self.viewModel.switchSession(to: session.key) + self.dismiss() + } label: { + VStack(alignment: .leading, spacing: 4) { + Text(session.key) + .font(.system(.body, design: .monospaced)) + .lineLimit(1) + if let updatedAt = session.updatedAt, updatedAt > 0 { + Text(Date(timeIntervalSince1970: updatedAt / 1000).formatted(date: .abbreviated, time: .shortened)) + .font(.caption) + .foregroundStyle(.secondary) + } + } + } + } + .navigationTitle("Sessions") + .toolbar { + #if os(macOS) + ToolbarItem(placement: .automatic) { + Button { + self.viewModel.refreshSessions(limit: 200) + } label: { + Image(systemName: "arrow.clockwise") + } + } + ToolbarItem(placement: .primaryAction) { + Button { + self.dismiss() + } label: { + Image(systemName: "xmark") + } + } + #else + ToolbarItem(placement: .topBarLeading) { + Button { + self.viewModel.refreshSessions(limit: 200) + } label: { + Image(systemName: "arrow.clockwise") + } + } + ToolbarItem(placement: .topBarTrailing) { + Button { + self.dismiss() + } label: { + Image(systemName: "xmark") + } + } + #endif + } + .onAppear { + self.viewModel.refreshSessions(limit: 200) + } + } + } +} diff --git a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatTransport.swift b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatTransport.swift index 3c3fc010f..71343c072 100644 --- a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatTransport.swift +++ b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatTransport.swift @@ -4,6 +4,7 @@ public enum ClawdisChatTransportEvent: Sendable { case health(ok: Bool) case tick case chat(ClawdisChatEventPayload) + case agent(ClawdisAgentEventPayload) case seqGap } @@ -16,6 +17,9 @@ public protocol ClawdisChatTransport: Sendable { idempotencyKey: String, attachments: [ClawdisChatAttachmentPayload]) async throws -> ClawdisChatSendResponse + func abortRun(sessionKey: String, runId: String) async throws + func listSessions(limit: Int?) async throws -> ClawdisChatSessionsListResponse + func requestHealth(timeoutMs: Int) async throws -> Bool func events() -> AsyncStream @@ -24,4 +28,18 @@ public protocol ClawdisChatTransport: Sendable { extension ClawdisChatTransport { public func setActiveSessionKey(_: String) async throws {} + + public func abortRun(sessionKey _: String, runId _: String) async throws { + throw NSError( + domain: "ClawdisChatTransport", + code: 0, + userInfo: [NSLocalizedDescriptionKey: "chat.abort not supported by this transport"]) + } + + public func listSessions(limit _: Int?) async throws -> ClawdisChatSessionsListResponse { + throw NSError( + domain: "ClawdisChatTransport", + code: 0, + userInfo: [NSLocalizedDescriptionKey: "sessions.list not supported by this transport"]) + } } diff --git a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatView.swift b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatView.swift index e0b547cdb..4e93763d8 100644 --- a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatView.swift +++ b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatView.swift @@ -4,6 +4,7 @@ import SwiftUI public struct ClawdisChatView: View { @State private var viewModel: ClawdisChatViewModel @State private var scrollerBottomID = UUID() + @State private var showSessions = false public init(viewModel: ClawdisChatViewModel) { self._viewModel = State(initialValue: viewModel) @@ -24,6 +25,9 @@ public struct ClawdisChatView: View { } .frame(maxWidth: .infinity, maxHeight: .infinity, alignment: .top) .onAppear { self.viewModel.load() } + .sheet(isPresented: self.$showSessions) { + ChatSessionsSheet(viewModel: self.viewModel) + } } private var messageList: some View { @@ -42,6 +46,16 @@ public struct ClawdisChatView: View { .frame(maxWidth: .infinity, alignment: .leading) } + if !self.viewModel.pendingToolCalls.isEmpty { + ChatPendingToolsBubble(toolCalls: self.viewModel.pendingToolCalls) + .frame(maxWidth: .infinity, alignment: .leading) + } + + if let text = self.viewModel.streamingAssistantText, !text.isEmpty { + ChatStreamingAssistantBubble(text: text) + .frame(maxWidth: .infinity, alignment: .leading) + } + Color.clear .frame(height: 1) .id(self.scrollerBottomID) @@ -64,6 +78,23 @@ public struct ClawdisChatView: View { Text(self.viewModel.healthOK ? "Connected" : "Connecting…") .font(.caption) .foregroundStyle(.secondary) + Spacer(minLength: 0) + + Button { + self.showSessions = true + } label: { + Image(systemName: "tray.full") + } + .buttonStyle(.borderless) + .help("Sessions") + + Button { + self.viewModel.refresh() + } label: { + Image(systemName: "arrow.clockwise") + } + .buttonStyle(.borderless) + .help("Refresh") } .padding(.horizontal, 10) .padding(.vertical, 6) diff --git a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatViewModel.swift b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatViewModel.swift index 87f1605d2..29068db5d 100644 --- a/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatViewModel.swift +++ b/apps/shared/ClawdisKit/Sources/ClawdisChatUI/ChatViewModel.swift @@ -20,12 +20,17 @@ public final class ClawdisChatViewModel { public var thinkingLevel: String = "off" public private(set) var isLoading = false public private(set) var isSending = false + public private(set) var isAborting = false public var errorText: String? public var attachments: [ClawdisPendingAttachment] = [] public private(set) var healthOK: Bool = false public private(set) var pendingRunCount: Int = 0 - public let sessionKey: String + public private(set) var sessionKey: String + public private(set) var sessionId: String? + public private(set) var streamingAssistantText: String? + public private(set) var pendingToolCalls: [ClawdisChatPendingToolCall] = [] + public private(set) var sessions: [ClawdisChatSessionEntry] = [] private let transport: any ClawdisChatTransport @ObservationIgnored @@ -38,6 +43,12 @@ public final class ClawdisChatViewModel { private nonisolated(unsafe) var pendingRunTimeoutTasks: [String: Task] = [:] private let pendingRunTimeoutMs: UInt64 = 120_000 + private var pendingToolCallsById: [String: ClawdisChatPendingToolCall] = [:] { + didSet { + self.pendingToolCalls = self.pendingToolCallsById.values.sorted { ($0.startedAt ?? 0) < ($1.startedAt ?? 0) } + } + } + private var lastHealthPollAt: Date? public init(sessionKey: String, transport: any ClawdisChatTransport) { @@ -75,6 +86,18 @@ public final class ClawdisChatViewModel { Task { await self.performSend() } } + public func abort() { + Task { await self.performAbort() } + } + + public func refreshSessions(limit: Int? = nil) { + Task { await self.fetchSessions(limit: limit) } + } + + public func switchSession(to sessionKey: String) { + Task { await self.performSwitchSession(to: sessionKey) } + } + public func addAttachments(urls: [URL]) { Task { await self.loadAttachments(urls: urls) } } @@ -89,7 +112,7 @@ public final class ClawdisChatViewModel { public var canSend: Bool { let trimmed = self.input.trimmingCharacters(in: .whitespacesAndNewlines) - return !self.isSending && (!trimmed.isEmpty || !self.attachments.isEmpty) + return !self.isSending && self.pendingRunCount == 0 && (!trimmed.isEmpty || !self.attachments.isEmpty) } // MARK: - Internals @@ -99,6 +122,9 @@ public final class ClawdisChatViewModel { self.errorText = nil self.healthOK = false self.clearPendingRuns(reason: nil) + self.pendingToolCallsById = [:] + self.streamingAssistantText = nil + self.sessionId = nil defer { self.isLoading = false } do { do { @@ -109,10 +135,12 @@ public final class ClawdisChatViewModel { let payload = try await self.transport.requestHistory(sessionKey: self.sessionKey) self.messages = Self.decodeMessages(payload.messages ?? []) + self.sessionId = payload.sessionId if let level = payload.thinkingLevel, !level.isEmpty { self.thinkingLevel = level } await self.pollHealthIfNeeded(force: true) + await self.fetchSessions(limit: 50) self.errorText = nil } catch { self.errorText = error.localizedDescription @@ -140,15 +168,24 @@ public final class ClawdisChatViewModel { self.errorText = nil let runId = UUID().uuidString let messageText = trimmed.isEmpty && !self.attachments.isEmpty ? "See attached." : trimmed + self.pendingRuns.insert(runId) + self.armPendingRunTimeout(runId: runId) + self.pendingToolCallsById = [:] + self.streamingAssistantText = nil // Optimistically append user message to UI. var userContent: [ClawdisChatMessageContent] = [ ClawdisChatMessageContent( type: "text", text: messageText, + thinking: nil, + thinkingSignature: nil, mimeType: nil, fileName: nil, - content: nil), + content: nil, + id: nil, + name: nil, + arguments: nil), ] let encodedAttachments = self.attachments.map { att -> ClawdisChatAttachmentPayload in ClawdisChatAttachmentPayload( @@ -162,9 +199,14 @@ public final class ClawdisChatViewModel { ClawdisChatMessageContent( type: att.type, text: nil, + thinking: nil, + thinkingSignature: nil, mimeType: att.mimeType, fileName: att.fileName, - content: att.content)) + content: AnyCodable(att.content), + id: nil, + name: nil, + arguments: nil)) } self.messages.append( ClawdisChatMessage( @@ -180,9 +222,13 @@ public final class ClawdisChatViewModel { thinking: self.thinkingLevel, idempotencyKey: runId, attachments: encodedAttachments) - self.pendingRuns.insert(response.runId) - self.armPendingRunTimeout(runId: response.runId) + if response.runId != runId { + self.clearPendingRun(runId) + self.pendingRuns.insert(response.runId) + self.armPendingRunTimeout(runId: response.runId) + } } catch { + self.clearPendingRun(runId) self.errorText = error.localizedDescription chatUILogger.error("chat.send failed \(error.localizedDescription, privacy: .public)") } @@ -192,6 +238,39 @@ public final class ClawdisChatViewModel { self.isSending = false } + private func performAbort() async { + guard !self.pendingRuns.isEmpty else { return } + guard !self.isAborting else { return } + self.isAborting = true + defer { self.isAborting = false } + + let runIds = Array(self.pendingRuns) + for runId in runIds { + do { + try await self.transport.abortRun(sessionKey: self.sessionKey, runId: runId) + } catch { + // Best-effort. + } + } + } + + private func fetchSessions(limit: Int?) async { + do { + let res = try await self.transport.listSessions(limit: limit) + self.sessions = res.sessions + } catch { + // Best-effort. + } + } + + private func performSwitchSession(to sessionKey: String) async { + let next = sessionKey.trimmingCharacters(in: .whitespacesAndNewlines) + guard !next.isEmpty else { return } + guard next != self.sessionKey else { return } + self.sessionKey = next + await self.bootstrap() + } + private func handleTransportEvent(_ evt: ClawdisChatTransportEvent) { switch evt { case let .health(ok): @@ -200,6 +279,8 @@ public final class ClawdisChatViewModel { Task { await self.pollHealthIfNeeded(force: false) } case let .chat(chat): self.handleChatEvent(chat) + case let .agent(agent): + self.handleAgentEvent(agent) case .seqGap: self.errorText = "Event stream interrupted; try refreshing." self.clearPendingRuns(reason: nil) @@ -217,29 +298,66 @@ public final class ClawdisChatViewModel { } switch chat.state { - case "final": - if let raw = chat.message, - let msg = try? ChatPayloadDecoding.decode(raw, as: ClawdisChatMessage.self) - { - self.messages.append(msg) + case "final", "aborted", "error": + if chat.state == "error" { + self.errorText = chat.errorMessage ?? "Chat failed" } if let runId = chat.runId { self.clearPendingRun(runId) } else if self.pendingRuns.count <= 1 { self.clearPendingRuns(reason: nil) } - case "error": - self.errorText = chat.errorMessage ?? "Chat failed" - if let runId = chat.runId { - self.clearPendingRun(runId) - } else if self.pendingRuns.count <= 1 { - self.clearPendingRuns(reason: nil) + self.pendingToolCallsById = [:] + self.streamingAssistantText = nil + Task { await self.refreshHistoryAfterRun() } + default: + break + } + } + + private func handleAgentEvent(_ evt: ClawdisAgentEventPayload) { + if let sessionId, evt.runId != sessionId { + return + } + + switch evt.stream { + case "assistant": + if let text = evt.data["text"]?.value as? String { + self.streamingAssistantText = text + } + case "tool": + guard let phase = evt.data["phase"]?.value as? String else { return } + guard let name = evt.data["name"]?.value as? String else { return } + guard let toolCallId = evt.data["toolCallId"]?.value as? String else { return } + if phase == "start" { + let args = evt.data["args"] + self.pendingToolCallsById[toolCallId] = ClawdisChatPendingToolCall( + toolCallId: toolCallId, + name: name, + args: args, + startedAt: evt.ts.map(Double.init) ?? Date().timeIntervalSince1970 * 1000, + isError: nil) + } else if phase == "result" { + self.pendingToolCallsById[toolCallId] = nil } default: break } } + private func refreshHistoryAfterRun() async { + do { + let payload = try await self.transport.requestHistory(sessionKey: self.sessionKey) + self.messages = Self.decodeMessages(payload.messages ?? []) + self.sessionId = payload.sessionId + if let level = payload.thinkingLevel, !level.isEmpty { + self.thinkingLevel = level + } + } catch { + chatUILogger.error("refresh history failed \(error.localizedDescription, privacy: .public)") + } + } + private func armPendingRunTimeout(runId: String) { self.pendingRunTimeoutTasks[runId]?.cancel() self.pendingRunTimeoutTasks[runId] = Task { [weak self] in diff --git a/src/agents/pi-embedded.ts b/src/agents/pi-embedded.ts index 7f3b51314..fdef78c30 100644 --- a/src/agents/pi-embedded.ts +++ b/src/agents/pi-embedded.ts @@ -141,6 +141,7 @@ export async function runEmbeddedPiAgent(params: { verboseLevel?: VerboseLevel; timeoutMs: number; runId: string; + abortSignal?: AbortSignal; onPartialReply?: (payload: { text?: string; mediaUrls?: string[]; @@ -246,7 +247,7 @@ export async function runEmbeddedPiAgent(params: { const toolMetaById = new Map(); let deltaBuffer = ""; let lastStreamedAssistant: string | undefined; - let aborted = false; + let aborted = Boolean(params.abortSignal?.aborted); const unsubscribe = session.subscribe( (evt: AgentEvent | { type: string; [k: string]: unknown }) => { @@ -342,18 +343,31 @@ export async function runEmbeddedPiAgent(params: { if (chunk) { deltaBuffer += chunk; const next = deltaBuffer.trim(); - if ( - next && - next !== lastStreamedAssistant && - params.onPartialReply - ) { + if (next && next !== lastStreamedAssistant) { lastStreamedAssistant = next; const { text: cleanedText, mediaUrls } = splitMediaFromOutput(next); - void params.onPartialReply({ - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + emitAgentEvent({ + runId: params.runId, + stream: "assistant", + data: { + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }, }); + params.onAgentEvent?.({ + stream: "assistant", + data: { + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }, + }); + if (params.onPartialReply) { + void params.onPartialReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }); + } } } } @@ -385,15 +399,36 @@ export async function runEmbeddedPiAgent(params: { let messagesSnapshot: AppMessage[] = []; let sessionIdUsed = session.sessionId; + const onAbort = () => { + aborted = true; + void session.abort(); + }; + if (params.abortSignal) { + if (params.abortSignal.aborted) { + onAbort(); + } else { + params.abortSignal.addEventListener("abort", onAbort, { once: true }); + } + } + let promptError: unknown | null = null; try { - await session.prompt(params.prompt); - messagesSnapshot = session.messages.slice(); - sessionIdUsed = session.sessionId; + try { + await session.prompt(params.prompt); + } catch (err) { + promptError = err; + } finally { + messagesSnapshot = session.messages.slice(); + sessionIdUsed = session.sessionId; + } } finally { clearTimeout(abortTimer); unsubscribe(); toolDebouncer.flush(); session.dispose(); + params.abortSignal?.removeEventListener?.("abort", onAbort); + } + if (promptError && !aborted) { + throw promptError; } const lastAssistant = messagesSnapshot diff --git a/src/commands/agent.ts b/src/commands/agent.ts index e8163ec07..673d79efa 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -46,6 +46,7 @@ type AgentCommandOpts = { surface?: string; provider?: string; // delivery provider (whatsapp|telegram|...) bestEffortDeliver?: boolean; + abortSignal?: AbortSignal; }; type SessionResolution = { @@ -251,6 +252,7 @@ export async function agentCommand( verboseLevel: resolvedVerboseLevel, timeoutMs, runId: sessionId, + abortSignal: opts.abortSignal, onAgentEvent: (evt) => { emitAgentEvent({ runId: sessionId, @@ -269,6 +271,7 @@ export async function agentCommand( to: opts.to ?? null, sessionId, durationMs: Date.now() - startedAt, + aborted: result.meta.aborted ?? false, }, }); } catch (err) { @@ -308,6 +311,7 @@ export async function agentCommand( model: modelUsed, contextTokens, }; + next.abortedLastRun = result.meta.aborted ?? false; if (usage) { const input = usage.input ?? 0; const output = usage.output ?? 0; diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index e4500a57d..0da06cb59 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -3,6 +3,8 @@ import { type AgentEvent, AgentEventSchema, AgentParamsSchema, + type ChatAbortParams, + ChatAbortParamsSchema, type ChatEvent, ChatEventSchema, ChatHistoryParamsSchema, @@ -137,6 +139,9 @@ export const validateCronRunsParams = ajv.compile(CronRunsParamsSchema); export const validateChatHistoryParams = ajv.compile(ChatHistoryParamsSchema); export const validateChatSendParams = ajv.compile(ChatSendParamsSchema); +export const validateChatAbortParams = ajv.compile( + ChatAbortParamsSchema, +); export const validateChatEvent = ajv.compile(ChatEventSchema); export function formatValidationErrors( diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index a31b223f8..ce9bea42c 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -480,6 +480,14 @@ export const ChatSendParamsSchema = Type.Object( { additionalProperties: false }, ); +export const ChatAbortParamsSchema = Type.Object( + { + sessionKey: NonEmptyString, + runId: NonEmptyString, + }, + { additionalProperties: false }, +); + export const ChatEventSchema = Type.Object( { runId: NonEmptyString, @@ -488,6 +496,7 @@ export const ChatEventSchema = Type.Object( state: Type.Union([ Type.Literal("delta"), Type.Literal("final"), + Type.Literal("aborted"), Type.Literal("error"), ]), message: Type.Optional(Type.Unknown()), @@ -533,6 +542,7 @@ export const ProtocolSchemas: Record = { CronRunLogEntry: CronRunLogEntrySchema, ChatHistoryParams: ChatHistoryParamsSchema, ChatSendParams: ChatSendParamsSchema, + ChatAbortParams: ChatAbortParamsSchema, ChatEvent: ChatEventSchema, TickEvent: TickEventSchema, ShutdownEvent: ShutdownEventSchema, @@ -570,6 +580,7 @@ export type CronRemoveParams = Static; export type CronRunParams = Static; export type CronRunsParams = Static; export type CronRunLogEntry = Static; +export type ChatAbortParams = Static; export type ChatEvent = Static; export type TickEvent = Static; export type ShutdownEvent = Static; diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index 446cdc2a6..444ab2ec6 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -1970,6 +1970,87 @@ describe("gateway server", () => { await server.close(); }); + test("chat.abort cancels an in-flight chat.send", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); + testSessionStorePath = path.join(dir, "sessions.json"); + await fs.writeFile( + testSessionStorePath, + JSON.stringify( + { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + null, + 2, + ), + "utf-8", + ); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const spy = vi.mocked(agentCommand); + spy.mockImplementationOnce(async (opts) => { + const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + await new Promise((resolve) => { + if (!signal) return resolve(); + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + }); + + const abortedEventP = onceMessage( + ws, + (o) => o.type === "event" && o.event === "chat" && o.payload?.state === "aborted", + ); + + ws.send( + JSON.stringify({ + type: "req", + id: "send-abort-1", + method: "chat.send", + params: { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-abort-1", + timeoutMs: 30_000, + }, + }), + ); + + await new Promise((r) => setTimeout(r, 10)); + + ws.send( + JSON.stringify({ + type: "req", + id: "abort-1", + method: "chat.abort", + params: { sessionKey: "main", runId: "idem-abort-1" }, + }), + ); + + const abortRes = await onceMessage( + ws, + (o) => o.type === "res" && o.id === "abort-1", + ); + expect(abortRes.ok).toBe(true); + + const sendRes = await onceMessage( + ws, + (o) => o.type === "res" && o.id === "send-abort-1", + ); + expect(sendRes.ok).toBe(true); + + const evt = await abortedEventP; + expect(evt.payload?.runId).toBe("idem-abort-1"); + expect(evt.payload?.sessionKey).toBe("main"); + + ws.close(); + await server.close(); + }); + test("bridge RPC chat.history returns session messages", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); @@ -2029,6 +2110,54 @@ describe("gateway server", () => { await server.close(); }); + test("bridge RPC sessions.list returns session rows", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); + testSessionStorePath = path.join(dir, "sessions.json"); + await fs.writeFile( + testSessionStorePath, + JSON.stringify( + { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + null, + 2, + ), + "utf-8", + ); + + const port = await getFreePort(); + const server = await startGatewayServer(port); + const bridgeCall = bridgeStartCalls.at(-1); + expect(bridgeCall?.onRequest).toBeDefined(); + + const res = await bridgeCall?.onRequest?.("ios-node", { + id: "r1", + method: "sessions.list", + paramsJSON: JSON.stringify({ + includeGlobal: true, + includeUnknown: false, + limit: 50, + }), + }); + + expect(res?.ok).toBe(true); + const payload = JSON.parse( + String((res as { payloadJSON?: string }).payloadJSON ?? "{}"), + ) as { + sessions?: unknown[]; + count?: number; + path?: string; + }; + expect(Array.isArray(payload.sessions)).toBe(true); + expect(typeof payload.count).toBe("number"); + expect(typeof payload.path).toBe("string"); + + await server.close(); + }); + test("bridge chat events are pushed to subscribed nodes", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); @@ -2092,6 +2221,13 @@ describe("gateway server", () => { // Wait a tick for the bridge send to happen. await new Promise((r) => setTimeout(r, 25)); + expect(bridgeSendEvent).toHaveBeenCalledWith( + expect.objectContaining({ + nodeId: "ios-node", + event: "agent", + }), + ); + expect(bridgeSendEvent).toHaveBeenCalledWith( expect.objectContaining({ nodeId: "ios-node", diff --git a/src/gateway/server.ts b/src/gateway/server.ts index e2aa178ae..78e6bab60 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -103,6 +103,7 @@ import { type SessionsPatchParams, type Snapshot, validateAgentParams, + validateChatAbortParams, validateChatHistoryParams, validateChatSendParams, validateConnectParams, @@ -208,6 +209,7 @@ const METHODS = [ "agent", // WebChat WebSocket-native chat methods "chat.history", + "chat.abort", "chat.send", ]; @@ -282,7 +284,11 @@ const chatRunSessions = new Map< string, { sessionKey: string; clientRunId: string } >(); -const chatRunBuffers = new Map(); +const chatRunBuffers = new Map(); +const chatAbortControllers = new Map< + string, + { controller: AbortController; sessionId: string; sessionKey: string } +>(); const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN; @@ -903,6 +909,120 @@ export async function startGatewayServer( const snap = await refreshHealthSnapshot({ probe: false }); return { ok: true, payloadJSON: JSON.stringify(snap) }; } + case "sessions.list": { + const params = parseParams(); + if (!validateSessionsListParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid sessions.list params: ${formatValidationErrors(validateSessionsListParams.errors)}`, + }, + }; + } + const p = params as SessionsListParams; + const cfg = loadConfig(); + const storePath = resolveStorePath(cfg.inbound?.session?.store); + const store = loadSessionStore(storePath); + const result = listSessionsFromStore({ + cfg, + storePath, + store, + opts: p, + }); + return { ok: true, payloadJSON: JSON.stringify(result) }; + } + case "sessions.patch": { + const params = parseParams(); + if (!validateSessionsPatchParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid sessions.patch params: ${formatValidationErrors(validateSessionsPatchParams.errors)}`, + }, + }; + } + + const p = params as SessionsPatchParams; + const key = String(p.key ?? "").trim(); + if (!key) { + return { + ok: false, + error: { code: ErrorCodes.INVALID_REQUEST, message: "key required" }, + }; + } + + const cfg = loadConfig(); + const storePath = resolveStorePath(cfg.inbound?.session?.store); + const store = loadSessionStore(storePath); + const now = Date.now(); + + const existing = store[key]; + const next: SessionEntry = existing + ? { + ...existing, + updatedAt: Math.max(existing.updatedAt ?? 0, now), + } + : { sessionId: randomUUID(), updatedAt: now }; + + if ("thinkingLevel" in p) { + const raw = p.thinkingLevel; + if (raw === null) { + delete next.thinkingLevel; + } else if (raw !== undefined) { + const normalized = normalizeThinkLevel(String(raw)); + if (!normalized) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid thinkingLevel: ${String(raw)}`, + }, + }; + } + next.thinkingLevel = normalized; + } + } + + if ("verboseLevel" in p) { + const raw = p.verboseLevel; + if (raw === null) { + delete next.verboseLevel; + } else if (raw !== undefined) { + const normalized = normalizeVerboseLevel(String(raw)); + if (!normalized) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid verboseLevel: ${String(raw)}`, + }, + }; + } + next.verboseLevel = normalized; + } + } + + if ("syncing" in p) { + const raw = p.syncing; + if (raw === null) { + delete next.syncing; + } else if (raw !== undefined) { + next.syncing = raw as boolean | string; + } + } + + store[key] = next; + await saveSessionStore(storePath, store); + const payload: SessionsPatchResult = { + ok: true, + path: storePath, + key, + entry: next, + }; + return { ok: true, payloadJSON: JSON.stringify(payload) }; + } case "chat.history": { const params = parseParams(); if (!validateChatHistoryParams(params)) { @@ -945,6 +1065,60 @@ export async function startGatewayServer( }), }; } + case "chat.abort": { + const params = parseParams(); + if (!validateChatAbortParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid chat.abort params: ${formatValidationErrors(validateChatAbortParams.errors)}`, + }, + }; + } + + const { sessionKey, runId } = params as { + sessionKey: string; + runId: string; + }; + const active = chatAbortControllers.get(runId); + if (!active) { + return { + ok: true, + payloadJSON: JSON.stringify({ ok: true, aborted: false }), + }; + } + if (active.sessionKey !== sessionKey) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: "runId does not match sessionKey", + }, + }; + } + + active.controller.abort(); + chatAbortControllers.delete(runId); + chatRunBuffers.delete(runId); + const current = chatRunSessions.get(active.sessionId); + if (current?.clientRunId === runId && current.sessionKey === sessionKey) { + chatRunSessions.delete(active.sessionId); + } + + const payload = { + runId, + sessionKey, + seq: (agentRunSeq.get(active.sessionId) ?? 0) + 1, + state: "aborted" as const, + }; + broadcast("chat", payload); + bridgeSendToSession(sessionKey, "chat", payload); + return { + ok: true, + payloadJSON: JSON.stringify({ ok: true, aborted: true }), + }; + } case "chat.send": { const params = parseParams(); if (!validateChatSendParams(params)) { @@ -1052,6 +1226,13 @@ export async function startGatewayServer( }; } + const abortController = new AbortController(); + chatAbortControllers.set(clientRunId, { + controller: abortController, + sessionId, + sessionKey: p.sessionKey, + }); + try { await agentCommand( { @@ -1061,6 +1242,7 @@ export async function startGatewayServer( deliver: p.deliver, timeout: Math.ceil(timeoutMs / 1000).toString(), surface: `Iris(${nodeId})`, + abortSignal: abortController.signal, }, defaultRuntime, deps, @@ -1095,6 +1277,8 @@ export async function startGatewayServer( message: String(err), }, }; + } finally { + chatAbortControllers.delete(clientRunId); } } default: @@ -1504,26 +1688,25 @@ export async function startGatewayServer( agentRunSeq.set(evt.runId, evt.seq); broadcast("agent", evt); - const chatLink = chatRunSessions.get(evt.runId); - if (chatLink) { - // Map agent bus events to chat events for WS WebChat clients. - // Use clientRunId so the webchat can correlate with its pending promise. - const { sessionKey, clientRunId } = chatLink; + const chatLink = chatRunSessions.get(evt.runId); + if (chatLink) { + // Map agent bus events to chat events for WS WebChat clients. + // Use clientRunId so the webchat can correlate with its pending promise. + const { sessionKey, clientRunId } = chatLink; + bridgeSendToSession(sessionKey, "agent", evt); const base = { runId: clientRunId, sessionKey, seq: evt.seq, }; if (evt.stream === "assistant" && typeof evt.data?.text === "string") { - const buf = chatRunBuffers.get(clientRunId) ?? []; - buf.push(evt.data.text); - chatRunBuffers.set(clientRunId, buf); + chatRunBuffers.set(clientRunId, evt.data.text); } else if ( evt.stream === "job" && typeof evt.data?.state === "string" && (evt.data.state === "done" || evt.data.state === "error") ) { - const text = chatRunBuffers.get(clientRunId)?.join("\n").trim() ?? ""; + const text = chatRunBuffers.get(clientRunId)?.trim() ?? ""; chatRunBuffers.delete(clientRunId); if (evt.data.state === "done") { const payload = { @@ -1962,6 +2145,62 @@ export async function startGatewayServer( }); break; } + case "chat.abort": { + const params = (req.params ?? {}) as Record; + if (!validateChatAbortParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid chat.abort params: ${formatValidationErrors(validateChatAbortParams.errors)}`, + ), + ); + break; + } + const { sessionKey, runId } = params as { + sessionKey: string; + runId: string; + }; + const active = chatAbortControllers.get(runId); + if (!active) { + respond(true, { ok: true, aborted: false }); + break; + } + if (active.sessionKey !== sessionKey) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + "runId does not match sessionKey", + ), + ); + break; + } + + active.controller.abort(); + chatAbortControllers.delete(runId); + chatRunBuffers.delete(runId); + const current = chatRunSessions.get(active.sessionId); + if ( + current?.clientRunId === runId && + current.sessionKey === sessionKey + ) { + chatRunSessions.delete(active.sessionId); + } + + const payload = { + runId, + sessionKey, + seq: (agentRunSeq.get(active.sessionId) ?? 0) + 1, + state: "aborted" as const, + }; + broadcast("chat", payload); + bridgeSendToSession(sessionKey, "chat", payload); + respond(true, { ok: true, aborted: true }); + break; + } case "chat.send": { const params = (req.params ?? {}) as Record; if (!validateChatSendParams(params)) { @@ -2061,6 +2300,13 @@ export async function startGatewayServer( } try { + const abortController = new AbortController(); + chatAbortControllers.set(clientRunId, { + controller: abortController, + sessionId, + sessionKey: p.sessionKey, + }); + await agentCommand( { message: messageWithAttachments, @@ -2069,6 +2315,7 @@ export async function startGatewayServer( deliver: p.deliver, timeout: Math.ceil(timeoutMs / 1000).toString(), surface: "WebChat", + abortSignal: abortController.signal, }, defaultRuntime, deps, @@ -2100,6 +2347,8 @@ export async function startGatewayServer( runId: clientRunId, error: formatForLog(err), }); + } finally { + chatAbortControllers.delete(clientRunId); } break; }