import MoltbotKit import Foundation import Observation import OSLog import UniformTypeIdentifiers #if canImport(AppKit) import AppKit #elseif canImport(UIKit) import UIKit #endif private let chatUILogger = Logger(subsystem: "com.clawdbot", category: "MoltbotChatUI") @MainActor @Observable public final class MoltbotChatViewModel { public private(set) var messages: [MoltbotChatMessage] = [] public var input: String = "" public var thinkingLevel: String = "off" public private(set) var isLoading = false public private(set) var isSending = false public private(set) var isAborting = false public var errorText: String? public var attachments: [MoltbotPendingAttachment] = [] public private(set) var healthOK: Bool = false public private(set) var pendingRunCount: Int = 0 public private(set) var sessionKey: String public private(set) var sessionId: String? public private(set) var streamingAssistantText: String? public private(set) var pendingToolCalls: [MoltbotChatPendingToolCall] = [] public private(set) var sessions: [MoltbotChatSessionEntry] = [] private let transport: any MoltbotChatTransport @ObservationIgnored private nonisolated(unsafe) var eventTask: Task? private var pendingRuns = Set() { didSet { self.pendingRunCount = self.pendingRuns.count } } @ObservationIgnored private nonisolated(unsafe) var pendingRunTimeoutTasks: [String: Task] = [:] private let pendingRunTimeoutMs: UInt64 = 120_000 private var pendingToolCallsById: [String: MoltbotChatPendingToolCall] = [:] { didSet { self.pendingToolCalls = self.pendingToolCallsById.values .sorted { ($0.startedAt ?? 0) < ($1.startedAt ?? 0) } } } private var lastHealthPollAt: Date? public init(sessionKey: String, transport: any MoltbotChatTransport) { self.sessionKey = sessionKey self.transport = transport self.eventTask = Task { [weak self] in guard let self else { return } let stream = self.transport.events() for await evt in stream { if Task.isCancelled { return } await MainActor.run { [weak self] in self?.handleTransportEvent(evt) } } } } deinit { self.eventTask?.cancel() for (_, task) in self.pendingRunTimeoutTasks { task.cancel() } } public func load() { Task { await self.bootstrap() } } public func refresh() { Task { await self.bootstrap() } } public func send() { Task { await self.performSend() } } public func abort() { Task { await self.performAbort() } } public func refreshSessions(limit: Int? = nil) { Task { await self.fetchSessions(limit: limit) } } public func switchSession(to sessionKey: String) { Task { await self.performSwitchSession(to: sessionKey) } } public var sessionChoices: [MoltbotChatSessionEntry] { let now = Date().timeIntervalSince1970 * 1000 let cutoff = now - (24 * 60 * 60 * 1000) let sorted = self.sessions.sorted { ($0.updatedAt ?? 0) > ($1.updatedAt ?? 0) } var seen = Set() var recent: [MoltbotChatSessionEntry] = [] for entry in sorted { guard !seen.contains(entry.key) else { continue } seen.insert(entry.key) guard (entry.updatedAt ?? 0) >= cutoff else { continue } recent.append(entry) } var result: [MoltbotChatSessionEntry] = [] var included = Set() for entry in recent where !included.contains(entry.key) { result.append(entry) included.insert(entry.key) } if !included.contains(self.sessionKey) { if let current = sorted.first(where: { $0.key == self.sessionKey }) { result.append(current) } else { result.append(self.placeholderSession(key: self.sessionKey)) } } return result } public func addAttachments(urls: [URL]) { Task { await self.loadAttachments(urls: urls) } } public func addImageAttachment(data: Data, fileName: String, mimeType: String) { Task { await self.addImageAttachment(url: nil, data: data, fileName: fileName, mimeType: mimeType) } } public func removeAttachment(_ id: MoltbotPendingAttachment.ID) { self.attachments.removeAll { $0.id == id } } public var canSend: Bool { let trimmed = self.input.trimmingCharacters(in: .whitespacesAndNewlines) return !self.isSending && self.pendingRunCount == 0 && (!trimmed.isEmpty || !self.attachments.isEmpty) } // MARK: - Internals private func bootstrap() async { self.isLoading = true self.errorText = nil self.healthOK = false self.clearPendingRuns(reason: nil) self.pendingToolCallsById = [:] self.streamingAssistantText = nil self.sessionId = nil defer { self.isLoading = false } do { do { try await self.transport.setActiveSessionKey(self.sessionKey) } catch { // Best-effort only; history/send/health still work without push events. } let payload = try await self.transport.requestHistory(sessionKey: self.sessionKey) self.messages = Self.decodeMessages(payload.messages ?? []) self.sessionId = payload.sessionId if let level = payload.thinkingLevel, !level.isEmpty { self.thinkingLevel = level } await self.pollHealthIfNeeded(force: true) await self.fetchSessions(limit: 50) self.errorText = nil } catch { self.errorText = error.localizedDescription chatUILogger.error("bootstrap failed \(error.localizedDescription, privacy: .public)") } } private static func decodeMessages(_ raw: [AnyCodable]) -> [MoltbotChatMessage] { let decoded = raw.compactMap { item in (try? ChatPayloadDecoding.decode(item, as: MoltbotChatMessage.self)) } return Self.dedupeMessages(decoded) } private static func dedupeMessages(_ messages: [MoltbotChatMessage]) -> [MoltbotChatMessage] { var result: [MoltbotChatMessage] = [] result.reserveCapacity(messages.count) var seen = Set() for message in messages { guard let key = Self.dedupeKey(for: message) else { result.append(message) continue } if seen.contains(key) { continue } seen.insert(key) result.append(message) } return result } private static func dedupeKey(for message: MoltbotChatMessage) -> String? { guard let timestamp = message.timestamp else { return nil } let text = message.content.compactMap(\.text).joined(separator: "\n") .trimmingCharacters(in: .whitespacesAndNewlines) guard !text.isEmpty else { return nil } return "\(message.role)|\(timestamp)|\(text)" } private func performSend() async { guard !self.isSending else { return } let trimmed = self.input.trimmingCharacters(in: .whitespacesAndNewlines) guard !trimmed.isEmpty || !self.attachments.isEmpty else { return } guard self.healthOK else { self.errorText = "Gateway health not OK; cannot send" return } self.isSending = true self.errorText = nil let runId = UUID().uuidString let messageText = trimmed.isEmpty && !self.attachments.isEmpty ? "See attached." : trimmed self.pendingRuns.insert(runId) self.armPendingRunTimeout(runId: runId) self.pendingToolCallsById = [:] self.streamingAssistantText = nil // Optimistically append user message to UI. var userContent: [MoltbotChatMessageContent] = [ MoltbotChatMessageContent( type: "text", text: messageText, thinking: nil, thinkingSignature: nil, mimeType: nil, fileName: nil, content: nil, id: nil, name: nil, arguments: nil), ] let encodedAttachments = self.attachments.map { att -> MoltbotChatAttachmentPayload in MoltbotChatAttachmentPayload( type: att.type, mimeType: att.mimeType, fileName: att.fileName, content: att.data.base64EncodedString()) } for att in encodedAttachments { userContent.append( MoltbotChatMessageContent( type: att.type, text: nil, thinking: nil, thinkingSignature: nil, mimeType: att.mimeType, fileName: att.fileName, content: AnyCodable(att.content), id: nil, name: nil, arguments: nil)) } self.messages.append( MoltbotChatMessage( id: UUID(), role: "user", content: userContent, timestamp: Date().timeIntervalSince1970 * 1000)) // Clear input immediately for responsive UX (before network await) self.input = "" self.attachments = [] do { let response = try await self.transport.sendMessage( sessionKey: self.sessionKey, message: messageText, thinking: self.thinkingLevel, idempotencyKey: runId, attachments: encodedAttachments) if response.runId != runId { self.clearPendingRun(runId) self.pendingRuns.insert(response.runId) self.armPendingRunTimeout(runId: response.runId) } } catch { self.clearPendingRun(runId) self.errorText = error.localizedDescription chatUILogger.error("chat.send failed \(error.localizedDescription, privacy: .public)") } self.isSending = false } private func performAbort() async { guard !self.pendingRuns.isEmpty else { return } guard !self.isAborting else { return } self.isAborting = true defer { self.isAborting = false } let runIds = Array(self.pendingRuns) for runId in runIds { do { try await self.transport.abortRun(sessionKey: self.sessionKey, runId: runId) } catch { // Best-effort. } } } private func fetchSessions(limit: Int?) async { do { let res = try await self.transport.listSessions(limit: limit) self.sessions = res.sessions } catch { // Best-effort. } } private func performSwitchSession(to sessionKey: String) async { let next = sessionKey.trimmingCharacters(in: .whitespacesAndNewlines) guard !next.isEmpty else { return } guard next != self.sessionKey else { return } self.sessionKey = next await self.bootstrap() } private func placeholderSession(key: String) -> MoltbotChatSessionEntry { MoltbotChatSessionEntry( key: key, kind: nil, displayName: nil, surface: nil, subject: nil, room: nil, space: nil, updatedAt: nil, sessionId: nil, systemSent: nil, abortedLastRun: nil, thinkingLevel: nil, verboseLevel: nil, inputTokens: nil, outputTokens: nil, totalTokens: nil, model: nil, contextTokens: nil) } private func handleTransportEvent(_ evt: MoltbotChatTransportEvent) { switch evt { case let .health(ok): self.healthOK = ok case .tick: Task { await self.pollHealthIfNeeded(force: false) } case let .chat(chat): self.handleChatEvent(chat) case let .agent(agent): self.handleAgentEvent(agent) case .seqGap: self.errorText = "Event stream interrupted; try refreshing." self.clearPendingRuns(reason: nil) } } private func handleChatEvent(_ chat: MoltbotChatEventPayload) { if let sessionKey = chat.sessionKey, sessionKey != self.sessionKey { return } let isOurRun = chat.runId.flatMap { self.pendingRuns.contains($0) } ?? false if !isOurRun { // Keep multiple clients in sync: if another client finishes a run for our session, refresh history. switch chat.state { case "final", "aborted", "error": self.streamingAssistantText = nil self.pendingToolCallsById = [:] Task { await self.refreshHistoryAfterRun() } default: break } return } switch chat.state { case "final", "aborted", "error": if chat.state == "error" { self.errorText = chat.errorMessage ?? "Chat failed" } if let runId = chat.runId { self.clearPendingRun(runId) } else if self.pendingRuns.count <= 1 { self.clearPendingRuns(reason: nil) } self.pendingToolCallsById = [:] self.streamingAssistantText = nil Task { await self.refreshHistoryAfterRun() } default: break } } private func handleAgentEvent(_ evt: MoltbotAgentEventPayload) { if let sessionId, evt.runId != sessionId { return } switch evt.stream { case "assistant": if let text = evt.data["text"]?.value as? String { self.streamingAssistantText = text } case "tool": guard let phase = evt.data["phase"]?.value as? String else { return } guard let name = evt.data["name"]?.value as? String else { return } guard let toolCallId = evt.data["toolCallId"]?.value as? String else { return } if phase == "start" { let args = evt.data["args"] self.pendingToolCallsById[toolCallId] = MoltbotChatPendingToolCall( toolCallId: toolCallId, name: name, args: args, startedAt: evt.ts.map(Double.init) ?? Date().timeIntervalSince1970 * 1000, isError: nil) } else if phase == "result" { self.pendingToolCallsById[toolCallId] = nil } default: break } } private func refreshHistoryAfterRun() async { do { let payload = try await self.transport.requestHistory(sessionKey: self.sessionKey) self.messages = Self.decodeMessages(payload.messages ?? []) self.sessionId = payload.sessionId if let level = payload.thinkingLevel, !level.isEmpty { self.thinkingLevel = level } } catch { chatUILogger.error("refresh history failed \(error.localizedDescription, privacy: .public)") } } private func armPendingRunTimeout(runId: String) { self.pendingRunTimeoutTasks[runId]?.cancel() self.pendingRunTimeoutTasks[runId] = Task { [weak self] in let timeoutMs = await MainActor.run { self?.pendingRunTimeoutMs ?? 0 } try? await Task.sleep(nanoseconds: timeoutMs * 1_000_000) await MainActor.run { [weak self] in guard let self else { return } guard self.pendingRuns.contains(runId) else { return } self.clearPendingRun(runId) self.errorText = "Timed out waiting for a reply; try again or refresh." } } } private func clearPendingRun(_ runId: String) { self.pendingRuns.remove(runId) self.pendingRunTimeoutTasks[runId]?.cancel() self.pendingRunTimeoutTasks[runId] = nil } private func clearPendingRuns(reason: String?) { for runId in self.pendingRuns { self.pendingRunTimeoutTasks[runId]?.cancel() } self.pendingRunTimeoutTasks.removeAll() self.pendingRuns.removeAll() if let reason, !reason.isEmpty { self.errorText = reason } } private func pollHealthIfNeeded(force: Bool) async { if !force, let last = self.lastHealthPollAt, Date().timeIntervalSince(last) < 10 { return } self.lastHealthPollAt = Date() do { let ok = try await self.transport.requestHealth(timeoutMs: 5000) self.healthOK = ok } catch { self.healthOK = false } } private func loadAttachments(urls: [URL]) async { for url in urls { do { let data = try await Task.detached { try Data(contentsOf: url) }.value await self.addImageAttachment( url: url, data: data, fileName: url.lastPathComponent, mimeType: Self.mimeType(for: url) ?? "application/octet-stream") } catch { await MainActor.run { self.errorText = error.localizedDescription } } } } private static func mimeType(for url: URL) -> String? { let ext = url.pathExtension guard !ext.isEmpty else { return nil } return (UTType(filenameExtension: ext) ?? .data).preferredMIMEType } private func addImageAttachment(url: URL?, data: Data, fileName: String, mimeType: String) async { if data.count > 5_000_000 { self.errorText = "Attachment \(fileName) exceeds 5 MB limit" return } let uti: UTType = { if let url { return UTType(filenameExtension: url.pathExtension) ?? .data } return UTType(mimeType: mimeType) ?? .data }() guard uti.conforms(to: .image) else { self.errorText = "Only image attachments are supported right now" return } let preview = Self.previewImage(data: data) self.attachments.append( MoltbotPendingAttachment( url: url, data: data, fileName: fileName, mimeType: mimeType, preview: preview)) } private static func previewImage(data: Data) -> MoltbotPlatformImage? { #if canImport(AppKit) NSImage(data: data) #elseif canImport(UIKit) UIImage(data: data) #else nil #endif } }