diff --git a/apps/macos/Sources/Clawdis/AgentRPC.swift b/apps/macos/Sources/Clawdis/AgentRPC.swift index 87d0540c6..6c5d7b53b 100644 --- a/apps/macos/Sources/Clawdis/AgentRPC.swift +++ b/apps/macos/Sources/Clawdis/AgentRPC.swift @@ -62,17 +62,19 @@ actor AgentRPC { func send( text: String, thinking: String?, - session: String, + sessionKey: String, deliver: Bool, - to: String?) async -> (ok: Bool, text: String?, error: String?) + to: String?, + channel: String? = nil) async -> (ok: Bool, text: String?, error: String?) { do { let params: [String: Any] = [ "message": text, - "sessionId": session, + "sessionKey": sessionKey, "thinking": thinking ?? "default", "deliver": deliver, "to": to ?? "", + "channel": channel ?? "", "idempotencyKey": UUID().uuidString, ] _ = try await self.controlRequest(method: "agent", params: ControlRequestParams(raw: params)) diff --git a/apps/macos/Sources/Clawdis/ControlRequestHandler.swift b/apps/macos/Sources/Clawdis/ControlRequestHandler.swift index 43da27283..30a578c33 100644 --- a/apps/macos/Sources/Clawdis/ControlRequestHandler.swift +++ b/apps/macos/Sources/Clawdis/ControlRequestHandler.swift @@ -57,9 +57,10 @@ enum ControlRequestHandler { let rpcResult = await AgentRPC.shared.send( text: trimmed, thinking: thinking, - session: sessionKey, + sessionKey: sessionKey, deliver: deliver, - to: to) + to: to, + channel: nil) return rpcResult.ok ? Response(ok: true, message: rpcResult.text ?? "sent") : Response(ok: false, message: rpcResult.error ?? "failed to send") diff --git a/apps/macos/Sources/Clawdis/VoiceWakeForwarder.swift b/apps/macos/Sources/Clawdis/VoiceWakeForwarder.swift index 91e2e2576..cc2b368e1 100644 --- a/apps/macos/Sources/Clawdis/VoiceWakeForwarder.swift +++ b/apps/macos/Sources/Clawdis/VoiceWakeForwarder.swift @@ -33,10 +33,11 @@ enum VoiceWakeForwarder { } struct ForwardOptions: Sendable { - var session: String = "main" + var sessionKey: String = "main" var thinking: String = "low" var deliver: Bool = true var to: String? + var channel: String = "last" } @discardableResult @@ -45,12 +46,15 @@ enum VoiceWakeForwarder { options: ForwardOptions = ForwardOptions()) async -> Result { let payload = Self.prefixedTranscript(transcript) + let channel = options.channel.trimmingCharacters(in: .whitespacesAndNewlines).lowercased() + let deliver = options.deliver && channel != "webchat" let result = await AgentRPC.shared.send( text: payload, thinking: options.thinking, - session: options.session, - deliver: options.deliver, - to: options.to) + sessionKey: options.sessionKey, + deliver: deliver, + to: options.to, + channel: channel) if result.ok { self.logger.info("voice wake forward ok") diff --git a/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift b/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift index e1737d652..6b7c4a475 100644 --- a/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift +++ b/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift @@ -28,8 +28,8 @@ public struct Hello: Codable { caps: [String]?, auth: [String: AnyCodable]?, locale: String?, - useragent: String?) - { + useragent: String? + ) { self.type = type self.minprotocol = minprotocol self.maxprotocol = maxprotocol @@ -39,7 +39,6 @@ public struct Hello: Codable { self.locale = locale self.useragent = useragent } - private enum CodingKeys: String, CodingKey { case type case minprotocol = "minProtocol" @@ -66,8 +65,8 @@ public struct HelloOk: Codable { server: [String: AnyCodable], features: [String: AnyCodable], snapshot: Snapshot, - policy: [String: AnyCodable]) - { + policy: [String: AnyCodable] + ) { self.type = type self._protocol = _protocol self.server = server @@ -75,7 +74,6 @@ public struct HelloOk: Codable { self.snapshot = snapshot self.policy = policy } - private enum CodingKeys: String, CodingKey { case type case _protocol = "protocol" @@ -96,14 +94,13 @@ public struct HelloError: Codable { type: String, reason: String, expectedprotocol: Int?, - minclient: String?) - { + minclient: String? + ) { self.type = type self.reason = reason self.expectedprotocol = expectedprotocol self.minclient = minclient } - private enum CodingKeys: String, CodingKey { case type case reason @@ -122,14 +119,13 @@ public struct RequestFrame: Codable { type: String, id: String, method: String, - params: AnyCodable?) - { + params: AnyCodable? + ) { self.type = type self.id = id self.method = method self.params = params } - private enum CodingKeys: String, CodingKey { case type case id @@ -150,15 +146,14 @@ public struct ResponseFrame: Codable { id: String, ok: Bool, payload: AnyCodable?, - error: [String: AnyCodable]?) - { + error: [String: AnyCodable]? + ) { self.type = type self.id = id self.ok = ok self.payload = payload self.error = error } - private enum CodingKeys: String, CodingKey { case type case id @@ -180,15 +175,14 @@ public struct EventFrame: Codable { event: String, payload: AnyCodable?, seq: Int?, - stateversion: [String: AnyCodable]?) - { + stateversion: [String: AnyCodable]? + ) { self.type = type self.event = event self.payload = payload self.seq = seq self.stateversion = stateversion } - private enum CodingKeys: String, CodingKey { case type case event @@ -220,8 +214,8 @@ public struct PresenceEntry: Codable { tags: [String]?, text: String?, ts: Int, - instanceid: String?) - { + instanceid: String? + ) { self.host = host self.ip = ip self.version = version @@ -233,7 +227,6 @@ public struct PresenceEntry: Codable { self.ts = ts self.instanceid = instanceid } - private enum CodingKeys: String, CodingKey { case host case ip @@ -254,12 +247,11 @@ public struct StateVersion: Codable { public init( presence: Int, - health: Int) - { + health: Int + ) { self.presence = presence self.health = health } - private enum CodingKeys: String, CodingKey { case presence case health @@ -276,14 +268,13 @@ public struct Snapshot: Codable { presence: [PresenceEntry], health: AnyCodable, stateversion: StateVersion, - uptimems: Int) - { + uptimems: Int + ) { self.presence = presence self.health = health self.stateversion = stateversion self.uptimems = uptimems } - private enum CodingKeys: String, CodingKey { case presence case health @@ -304,15 +295,14 @@ public struct ErrorShape: Codable { message: String, details: AnyCodable?, retryable: Bool?, - retryafterms: Int?) - { + retryafterms: Int? + ) { self.code = code self.message = message self.details = details self.retryable = retryable self.retryafterms = retryafterms } - private enum CodingKeys: String, CodingKey { case code case message @@ -334,15 +324,14 @@ public struct AgentEvent: Codable { seq: Int, stream: String, ts: Int, - data: [String: AnyCodable]) - { + data: [String: AnyCodable] + ) { self.runid = runid self.seq = seq self.stream = stream self.ts = ts self.data = data } - private enum CodingKeys: String, CodingKey { case runid = "runId" case seq @@ -364,15 +353,14 @@ public struct SendParams: Codable { message: String, mediaurl: String?, provider: String?, - idempotencykey: String) - { + idempotencykey: String + ) { self.to = to self.message = message self.mediaurl = mediaurl self.provider = provider self.idempotencykey = idempotencykey } - private enum CodingKeys: String, CodingKey { case to case message @@ -386,8 +374,10 @@ public struct AgentParams: Codable { public let message: String public let to: String? public let sessionid: String? + public let sessionkey: String? public let thinking: String? public let deliver: Bool? + public let channel: String? public let timeout: Int? public let idempotencykey: String @@ -395,40 +385,135 @@ public struct AgentParams: Codable { message: String, to: String?, sessionid: String?, + sessionkey: String?, thinking: String?, deliver: Bool?, + channel: String?, timeout: Int?, - idempotencykey: String) - { + idempotencykey: String + ) { self.message = message self.to = to self.sessionid = sessionid + self.sessionkey = sessionkey self.thinking = thinking self.deliver = deliver + self.channel = channel self.timeout = timeout self.idempotencykey = idempotencykey } - private enum CodingKeys: String, CodingKey { case message case to case sessionid = "sessionId" + case sessionkey = "sessionKey" case thinking case deliver + case channel case timeout case idempotencykey = "idempotencyKey" } } +public struct ChatHistoryParams: Codable { + public let sessionkey: String + + public init( + sessionkey: String + ) { + self.sessionkey = sessionkey + } + private enum CodingKeys: String, CodingKey { + case sessionkey = "sessionKey" + } +} + +public struct ChatSendParams: Codable { + public let sessionkey: String + public let message: String + public let thinking: String? + public let deliver: Bool? + public let attachments: [AnyCodable]? + public let timeoutms: Int? + public let idempotencykey: String + + public init( + sessionkey: String, + message: String, + thinking: String?, + deliver: Bool?, + attachments: [AnyCodable]?, + timeoutms: Int?, + idempotencykey: String + ) { + self.sessionkey = sessionkey + self.message = message + self.thinking = thinking + self.deliver = deliver + self.attachments = attachments + self.timeoutms = timeoutms + self.idempotencykey = idempotencykey + } + private enum CodingKeys: String, CodingKey { + case sessionkey = "sessionKey" + case message + case thinking + case deliver + case attachments + case timeoutms = "timeoutMs" + case idempotencykey = "idempotencyKey" + } +} + +public struct ChatEvent: Codable { + public let runid: String + public let sessionkey: String + public let seq: Int + public let state: AnyCodable + public let message: AnyCodable? + public let errormessage: String? + public let usage: AnyCodable? + public let stopreason: String? + + public init( + runid: String, + sessionkey: String, + seq: Int, + state: AnyCodable, + message: AnyCodable?, + errormessage: String?, + usage: AnyCodable?, + stopreason: String? + ) { + self.runid = runid + self.sessionkey = sessionkey + self.seq = seq + self.state = state + self.message = message + self.errormessage = errormessage + self.usage = usage + self.stopreason = stopreason + } + private enum CodingKeys: String, CodingKey { + case runid = "runId" + case sessionkey = "sessionKey" + case seq + case state + case message + case errormessage = "errorMessage" + case usage + case stopreason = "stopReason" + } +} + public struct TickEvent: Codable { public let ts: Int public init( - ts: Int) - { + ts: Int + ) { self.ts = ts } - private enum CodingKeys: String, CodingKey { case ts } @@ -440,12 +525,11 @@ public struct ShutdownEvent: Codable { public init( reason: String, - restartexpectedms: Int?) - { + restartexpectedms: Int? + ) { self.reason = reason self.restartexpectedms = restartexpectedms } - private enum CodingKeys: String, CodingKey { case reason case restartexpectedms = "restartExpectedMs" @@ -469,17 +553,17 @@ public enum GatewayFrame: Codable { } switch type { case "hello": - self = try .hello(Self.decodePayload(Hello.self, from: raw)) + self = .hello(try Self.decodePayload(Hello.self, from: raw)) case "hello-ok": - self = try .helloOk(Self.decodePayload(HelloOk.self, from: raw)) + self = .helloOk(try Self.decodePayload(HelloOk.self, from: raw)) case "hello-error": - self = try .helloError(Self.decodePayload(HelloError.self, from: raw)) + self = .helloError(try Self.decodePayload(HelloError.self, from: raw)) case "req": - self = try .req(Self.decodePayload(RequestFrame.self, from: raw)) + self = .req(try Self.decodePayload(RequestFrame.self, from: raw)) case "res": - self = try .res(Self.decodePayload(ResponseFrame.self, from: raw)) + self = .res(try Self.decodePayload(ResponseFrame.self, from: raw)) case "event": - self = try .event(Self.decodePayload(EventFrame.self, from: raw)) + self = .event(try Self.decodePayload(EventFrame.self, from: raw)) default: self = .unknown(type: type, raw: raw) } @@ -487,26 +571,23 @@ public enum GatewayFrame: Codable { public func encode(to encoder: Encoder) throws { switch self { - case let .hello(v): try v.encode(to: encoder) - case let .helloOk(v): try v.encode(to: encoder) - case let .helloError(v): try v.encode(to: encoder) - case let .req(v): try v.encode(to: encoder) - case let .res(v): try v.encode(to: encoder) - case let .event(v): try v.encode(to: encoder) - case let .unknown(_, raw): + case .hello(let v): try v.encode(to: encoder) + case .helloOk(let v): try v.encode(to: encoder) + case .helloError(let v): try v.encode(to: encoder) + case .req(let v): try v.encode(to: encoder) + case .res(let v): try v.encode(to: encoder) + case .event(let v): try v.encode(to: encoder) + case .unknown(_, let raw): var container = encoder.singleValueContainer() try container.encode(raw) } } + private static func decodePayload(_ type: T.Type, from raw: [String: AnyCodable]) throws -> T { - // Re-encode the already-decoded map using `JSONEncoder` instead of - // `JSONSerialization` because `AnyCodable` values are not bridged to - // Objective-C types and `JSONSerialization` throws an ObjC exception, - // crashing the app (seen on macOS 26.1). `JSONEncoder` understands - // `Encodable` values and stays in Swift land. - let data = try JSONEncoder().encode(raw) + let data = try JSONSerialization.data(withJSONObject: raw) let decoder = JSONDecoder() return try decoder.decode(T.self, from: data) } + } diff --git a/apps/macos/Sources/ClawdisProtocol/Protocol.swift b/apps/macos/Sources/ClawdisProtocol/Protocol.swift deleted file mode 100644 index 60841ecd4..000000000 --- a/apps/macos/Sources/ClawdisProtocol/Protocol.swift +++ /dev/null @@ -1,6 +0,0 @@ -// Legacy shim: Protocol definitions now live in GatewayModels.swift generated from TypeBox. -// Kept to satisfy existing project references. -import Foundation - -@available(*, deprecated, message: "Use GatewayModels.swift (GatewayFrame and payload structs)") -public enum LegacyProtocolShim {} diff --git a/apps/macos/Tests/ClawdisIPCTests/VoiceWakeForwarderTests.swift b/apps/macos/Tests/ClawdisIPCTests/VoiceWakeForwarderTests.swift index 13ef28f9e..1307888d5 100644 --- a/apps/macos/Tests/ClawdisIPCTests/VoiceWakeForwarderTests.swift +++ b/apps/macos/Tests/ClawdisIPCTests/VoiceWakeForwarderTests.swift @@ -13,9 +13,10 @@ import Testing @Test func forwardOptionsDefaults() { let opts = VoiceWakeForwarder.ForwardOptions() - #expect(opts.session == "main") + #expect(opts.sessionKey == "main") #expect(opts.thinking == "low") #expect(opts.deliver == true) #expect(opts.to == nil) + #expect(opts.channel == "last") } } diff --git a/dist/protocol.schema.json b/dist/protocol.schema.json index a1e1b542f..f22d0d7a9 100644 --- a/dist/protocol.schema.json +++ b/dist/protocol.schema.json @@ -1108,12 +1108,18 @@ "sessionId": { "type": "string" }, + "sessionKey": { + "type": "string" + }, "thinking": { "type": "string" }, "deliver": { "type": "boolean" }, + "channel": { + "type": "string" + }, "timeout": { "minimum": 0, "type": "integer" @@ -1128,6 +1134,104 @@ "idempotencyKey" ] }, + "ChatHistoryParams": { + "additionalProperties": false, + "type": "object", + "properties": { + "sessionKey": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "sessionKey" + ] + }, + "ChatSendParams": { + "additionalProperties": false, + "type": "object", + "properties": { + "sessionKey": { + "minLength": 1, + "type": "string" + }, + "message": { + "minLength": 1, + "type": "string" + }, + "thinking": { + "type": "string" + }, + "deliver": { + "type": "boolean" + }, + "attachments": { + "type": "array", + "items": {} + }, + "timeoutMs": { + "minimum": 0, + "type": "integer" + }, + "idempotencyKey": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "sessionKey", + "message", + "idempotencyKey" + ] + }, + "ChatEvent": { + "additionalProperties": false, + "type": "object", + "properties": { + "runId": { + "minLength": 1, + "type": "string" + }, + "sessionKey": { + "minLength": 1, + "type": "string" + }, + "seq": { + "minimum": 0, + "type": "integer" + }, + "state": { + "anyOf": [ + { + "const": "delta", + "type": "string" + }, + { + "const": "final", + "type": "string" + }, + { + "const": "error", + "type": "string" + } + ] + }, + "message": {}, + "errorMessage": { + "type": "string" + }, + "usage": {}, + "stopReason": { + "type": "string" + } + }, + "required": [ + "runId", + "sessionKey", + "seq", + "state" + ] + }, "TickEvent": { "additionalProperties": false, "type": "object", diff --git a/scripts/protocol-gen.ts b/scripts/protocol-gen.ts index f0629ae37..a915791a9 100644 --- a/scripts/protocol-gen.ts +++ b/scripts/protocol-gen.ts @@ -2,12 +2,6 @@ import { promises as fs } from "node:fs"; import path from "node:path"; import { fileURLToPath } from "node:url"; import { ProtocolSchemas } from "../src/gateway/protocol/schema.js"; -import { - InputData, - JSONSchemaInput, - JSONSchemaStore, - quicktype, -} from "quicktype-core"; const __dirname = path.dirname(fileURLToPath(import.meta.url)); const repoRoot = path.resolve(__dirname, ".."); @@ -53,40 +47,8 @@ async function writeJsonSchema() { return { jsonSchemaPath, schemaString: JSON.stringify(rootSchema) }; } -async function writeSwiftModels(schemaString: string) { - const schemaInput = new JSONSchemaInput(new JSONSchemaStore()); - await schemaInput.addSource({ name: "ClawdisGateway", schema: schemaString }); - - const inputData = new InputData(); - inputData.addInput(schemaInput); - - const qtResult = await quicktype({ - inputData, - lang: "swift", - topLevel: "GatewayFrame", - rendererOptions: { - "struct-or-class": "struct", - "immutable-types": "true", - "accessLevel": "public", - }, - }); - - const swiftDir = path.join( - repoRoot, - "apps", - "macos", - "Sources", - "ClawdisProtocol", - ); - await fs.mkdir(swiftDir, { recursive: true }); - const swiftPath = path.join(swiftDir, "Protocol.swift"); - await fs.writeFile(swiftPath, qtResult.lines.join("\n")); - console.log(`wrote ${swiftPath}`); -} - async function main() { - const { schemaString } = await writeJsonSchema(); - await writeSwiftModels(schemaString); + await writeJsonSchema(); } main().catch((err) => { diff --git a/src/commands/agent.ts b/src/commands/agent.ts index aad1b3d46..9a709397b 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -1,4 +1,5 @@ import crypto from "node:crypto"; +import { chunkText } from "../auto-reply/chunk.js"; import { runCommandReply } from "../auto-reply/command-reply.js"; import { applyTemplate, @@ -36,6 +37,8 @@ type AgentCommandOpts = { timeout?: string; deliver?: boolean; surface?: string; + provider?: string; + bestEffortDeliver?: boolean; }; type SessionResolution = { @@ -369,11 +372,47 @@ export async function agentCommand( const payloads = result.payloads ?? []; const deliver = opts.deliver === true; - const targetTo = opts.to ? normalizeE164(opts.to) : allowFrom[0]; - if (deliver && !targetTo) { - throw new Error( - "Delivering to WhatsApp requires --to or inbound.allowFrom[0]", - ); + const bestEffortDeliver = opts.bestEffortDeliver === true; + const provider = (opts.provider ?? "whatsapp").toLowerCase(); + + const whatsappTarget = opts.to ? normalizeE164(opts.to) : allowFrom[0]; + const telegramTarget = opts.to?.trim() || undefined; + + const logDeliveryError = (err: unknown) => { + const message = `Delivery failed (${provider}): ${String(err)}`; + runtime.error?.(message); + if (!runtime.error) runtime.log(message); + }; + + if (deliver) { + if (provider === "whatsapp" && !whatsappTarget) { + const err = new Error( + "Delivering to WhatsApp requires --to or inbound.allowFrom[0]", + ); + if (!bestEffortDeliver) throw err; + logDeliveryError(err); + } + if (provider === "telegram" && !telegramTarget) { + const err = new Error("Delivering to Telegram requires --to "); + if (!bestEffortDeliver) throw err; + logDeliveryError(err); + } + if (provider === "webchat") { + const err = new Error( + "Delivering to WebChat is not supported via `clawdis agent`; use WebChat RPC instead.", + ); + if (!bestEffortDeliver) throw err; + logDeliveryError(err); + } + if ( + provider !== "whatsapp" && + provider !== "telegram" && + provider !== "webchat" + ) { + const err = new Error(`Unknown provider: ${provider}`); + if (!bestEffortDeliver) throw err; + logDeliveryError(err); + } } if (opts.json) { @@ -414,22 +453,55 @@ export async function agentCommand( runtime.log(lines.join("\n")); } - if (deliver && targetTo) { - const text = payload.text ?? ""; - const media = mediaList; - if (!text && media.length === 0) continue; + if (!deliver) continue; - const primaryMedia = media[0]; - await deps.sendMessageWhatsApp(targetTo, text, { - verbose: false, - mediaUrl: primaryMedia, - }); + const text = payload.text ?? ""; + const media = mediaList; + if (!text && media.length === 0) continue; - for (const extra of media.slice(1)) { - await deps.sendMessageWhatsApp(targetTo, "", { + if (provider === "whatsapp" && whatsappTarget) { + try { + const primaryMedia = media[0]; + await deps.sendMessageWhatsApp(whatsappTarget, text, { verbose: false, - mediaUrl: extra, + mediaUrl: primaryMedia, }); + + for (const extra of media.slice(1)) { + await deps.sendMessageWhatsApp(whatsappTarget, "", { + verbose: false, + mediaUrl: extra, + }); + } + } catch (err) { + if (!bestEffortDeliver) throw err; + logDeliveryError(err); + } + continue; + } + + if (provider === "telegram" && telegramTarget) { + try { + if (media.length === 0) { + for (const chunk of chunkText(text, 4000)) { + await deps.sendMessageTelegram(telegramTarget, chunk, { + verbose: false, + }); + } + } else { + let first = true; + for (const url of media) { + const caption = first ? text : ""; + first = false; + await deps.sendMessageTelegram(telegramTarget, caption, { + verbose: false, + mediaUrl: url, + }); + } + } + } catch (err) { + if (!bestEffortDeliver) throw err; + logDeliveryError(err); } } } diff --git a/src/config/sessions.ts b/src/config/sessions.ts index e5f16b754..e5223bdbf 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -1,3 +1,4 @@ +import crypto from "node:crypto"; import fs from "node:fs"; import os from "node:os"; import path from "node:path"; @@ -20,6 +21,8 @@ export type SessionEntry = { totalTokens?: number; model?: string; contextTokens?: number; + lastChannel?: "whatsapp" | "telegram" | "webchat"; + lastTo?: string; // Optional flag to mirror Mac app UI and future sync states. syncing?: boolean | string; }; @@ -66,6 +69,37 @@ export async function saveSessionStore( ); } +export async function updateLastRoute(params: { + storePath: string; + sessionKey: string; + channel: SessionEntry["lastChannel"]; + to?: string; +}) { + const { storePath, sessionKey, channel, to } = params; + const store = loadSessionStore(storePath); + const existing = store[sessionKey]; + const now = Date.now(); + const next: SessionEntry = { + sessionId: existing?.sessionId ?? crypto.randomUUID(), + updatedAt: Math.max(existing?.updatedAt ?? 0, now), + systemSent: existing?.systemSent, + abortedLastRun: existing?.abortedLastRun, + thinkingLevel: existing?.thinkingLevel, + verboseLevel: existing?.verboseLevel, + inputTokens: existing?.inputTokens, + outputTokens: existing?.outputTokens, + totalTokens: existing?.totalTokens, + model: existing?.model, + contextTokens: existing?.contextTokens, + syncing: existing?.syncing, + lastChannel: channel, + lastTo: to?.trim() ? to.trim() : undefined, + }; + store[sessionKey] = next; + await saveSessionStore(storePath, store); + return next; +} + // Decide which session bucket to use (per-sender vs global). export function deriveSessionKey(scope: SessionScope, ctx: MsgContext) { if (scope === "global") return "global"; diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index f4fddd402..e998ada14 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -211,8 +211,10 @@ export const AgentParamsSchema = Type.Object( message: NonEmptyString, to: Type.Optional(Type.String()), sessionId: Type.Optional(Type.String()), + sessionKey: Type.Optional(Type.String()), thinking: Type.Optional(Type.String()), deliver: Type.Optional(Type.Boolean()), + channel: Type.Optional(Type.String()), timeout: Type.Optional(Type.Integer({ minimum: 0 })), idempotencyKey: NonEmptyString, }, diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index 3551d6784..9407a67fe 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -2,8 +2,8 @@ import { type AddressInfo, createServer } from "node:net"; import { describe, expect, test, vi } from "vitest"; import { WebSocket } from "ws"; import { emitAgentEvent } from "../infra/agent-events.js"; -import { startGatewayServer } from "./server.js"; import { GatewayLockError } from "../infra/gateway-lock.js"; +import { startGatewayServer } from "./server.js"; vi.mock("../commands/health.js", () => ({ getHealthSnapshot: vi.fn().mockResolvedValue({ ok: true, stub: true }), @@ -35,7 +35,10 @@ async function getFreePort(): Promise { }); } -async function occupyPort(): Promise<{ server: ReturnType; port: number }> { +async function occupyPort(): Promise<{ + server: ReturnType; + port: number; +}> { return await new Promise((resolve, reject) => { const server = createServer(); server.once("error", reject); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 49f8b409f..43b3ca613 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -1,8 +1,11 @@ import { randomUUID } from "node:crypto"; import fs from "node:fs"; +import { + createServer as createHttpServer, + type Server as HttpServer, +} from "node:http"; import os from "node:os"; import path from "node:path"; -import { createServer as createHttpServer, type Server as HttpServer } from "node:http"; import chalk from "chalk"; import { type WebSocket, WebSocketServer } from "ws"; import { createDefaultDeps } from "../cli/deps.js"; @@ -850,7 +853,9 @@ export async function startGatewayServer( break; } } - const { storePath, store, entry } = loadSessionEntry(p.sessionKey); + const { cfg, storePath, store, entry } = loadSessionEntry( + p.sessionKey, + ); const now = Date.now(); const sessionId = entry?.sessionId ?? randomUUID(); const sessionEntry: SessionEntry = { @@ -859,7 +864,15 @@ export async function startGatewayServer( thinkingLevel: entry?.thinkingLevel, verboseLevel: entry?.verboseLevel, systemSent: entry?.systemSent, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, }; + const mainKey = + (cfg.inbound?.reply?.session?.mainKey ?? "main").trim() || "main"; + if (p.sessionKey === mainKey) { + sessionEntry.lastChannel = "webchat"; + delete sessionEntry.lastTo; + } if (store) { store[p.sessionKey] = sessionEntry; if (storePath) { @@ -1081,8 +1094,10 @@ export async function startGatewayServer( message: string; to?: string; sessionId?: string; + sessionKey?: string; thinking?: string; deliver?: boolean; + channel?: string; idempotencyKey: string; timeout?: number; }; @@ -1095,7 +1110,90 @@ export async function startGatewayServer( break; } const message = params.message.trim(); - const runId = params.sessionId || randomUUID(); + + const requestedSessionKey = + typeof params.sessionKey === "string" && params.sessionKey.trim() + ? params.sessionKey.trim() + : undefined; + let resolvedSessionId = params.sessionId?.trim() || undefined; + let sessionEntry: SessionEntry | undefined; + let bestEffortDeliver = false; + + if (requestedSessionKey) { + const { cfg, storePath, store, entry } = + loadSessionEntry(requestedSessionKey); + const now = Date.now(); + const sessionId = entry?.sessionId ?? randomUUID(); + sessionEntry = { + sessionId, + updatedAt: now, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + systemSent: entry?.systemSent, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + }; + if (store) { + store[requestedSessionKey] = sessionEntry; + if (storePath) { + await saveSessionStore(storePath, store); + } + } + resolvedSessionId = sessionId; + const mainKey = + (cfg.inbound?.reply?.session?.mainKey ?? "main").trim() || + "main"; + if (requestedSessionKey === mainKey) { + chatRunSessions.set(sessionId, requestedSessionKey); + bestEffortDeliver = true; + } + } + + const runId = resolvedSessionId || randomUUID(); + + const requestedChannelRaw = + typeof params.channel === "string" ? params.channel.trim() : ""; + const requestedChannel = requestedChannelRaw + ? requestedChannelRaw.toLowerCase() + : "last"; + + const lastChannel = sessionEntry?.lastChannel; + const lastTo = + typeof sessionEntry?.lastTo === "string" + ? sessionEntry.lastTo.trim() + : ""; + + const resolvedChannel = (() => { + if (requestedChannel === "last") { + return lastChannel ?? "whatsapp"; + } + if ( + requestedChannel === "whatsapp" || + requestedChannel === "telegram" || + requestedChannel === "webchat" + ) { + return requestedChannel; + } + return lastChannel ?? "whatsapp"; + })(); + + const resolvedTo = (() => { + const explicit = + typeof params.to === "string" && params.to.trim() + ? params.to.trim() + : undefined; + if (explicit) return explicit; + if ( + resolvedChannel === "whatsapp" || + resolvedChannel === "telegram" + ) { + return lastTo || undefined; + } + return undefined; + })(); + + const deliver = + params.deliver === true && resolvedChannel !== "webchat"; // Acknowledge via event to avoid double res frames const ackEvent = { type: "event", @@ -1114,11 +1212,14 @@ export async function startGatewayServer( await agentCommand( { message, - to: params.to, - sessionId: params.sessionId, + to: resolvedTo, + sessionId: resolvedSessionId, thinking: params.thinking, - deliver: params.deliver, + deliver, + provider: resolvedChannel, timeout: params.timeout?.toString(), + bestEffortDeliver, + surface: "VoiceWake", }, defaultRuntime, deps, diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 5a42beedd..fadf2708e 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -10,6 +10,7 @@ import { formatAgentEnvelope } from "../auto-reply/envelope.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { loadConfig } from "../config/config.js"; +import { resolveStorePath, updateLastRoute } from "../config/sessions.js"; import { danger, logVerbose } from "../globals.js"; import { getChildLogger } from "../logging.js"; import { mediaKindFromMime } from "../media/constants.js"; @@ -145,6 +146,18 @@ export function createTelegramBot(opts: TelegramBotOptions) { MediaUrl: media?.path, }; + if (!isGroup) { + const sessionCfg = cfg.inbound?.reply?.session; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + const storePath = resolveStorePath(sessionCfg?.store); + await updateLastRoute({ + storePath, + sessionKey: mainKey, + channel: "telegram", + to: String(chatId), + }); + } + if (logVerbose()) { const preview = body.slice(0, 200).replace(/\n/g, "\\n"); logVerbose( diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index c12ffdb88..aa84f1071 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -10,6 +10,7 @@ import { resolveSessionKey, resolveStorePath, saveSessionStore, + updateLastRoute, } from "../config/sessions.js"; import { danger, isVerbose, logVerbose, success } from "../globals.js"; import { emitHeartbeatEvent } from "../infra/heartbeat-events.js"; @@ -849,6 +850,23 @@ export async function monitorWebProvider( `\n[${tsDisplay}] ${fromDisplay} -> ${latest.to}: ${combinedBody}`, ); + if (latest.chatType !== "group") { + const sessionCfg = cfg.inbound?.reply?.session; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + const storePath = resolveStorePath(sessionCfg?.store); + const to = latest.senderE164 + ? normalizeE164(latest.senderE164) + : jidToE164(latest.from); + if (to) { + await updateLastRoute({ + storePath, + sessionKey: mainKey, + channel: "whatsapp", + to, + }); + } + } + const replyResult = await (replyResolver ?? getReplyFromConfig)( { Body: combinedBody,