/** * QQ Bot WebSocket Monitor * * Handles WebSocket connection to QQ Bot Gateway for receiving events. */ import WebSocket from "ws"; import type { MoltbotConfig, MarkdownTableMode } from "clawdbot/plugin-sdk"; import type { ResolvedQQAccount } from "./accounts.js"; import { getAccessToken, getGatewayUrl, sendC2CMessage, sendGroupMessage, } from "./api.js"; import { getQQRuntime } from "./runtime.js"; import { chunkQQText } from "./send.js"; import { OpCode, EventType, DEFAULT_INTENTS, type GatewayPayload, type HelloData, type ReadyData, type QQMessageEvent, type IdentifyData, type ResumeData, } from "./types.js"; export interface QQMonitorOptions { account: ResolvedQQAccount; config: MoltbotConfig; abortSignal: AbortSignal; statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number; sessionId?: string; }) => void; log?: { info: (msg: string) => void; error: (msg: string) => void; }; } export interface QQMonitorResult { stop: () => void; } const QQ_TEXT_LIMIT = 2000; const DEFAULT_MEDIA_MAX_MB = 5; type QQCoreRuntime = ReturnType; /** * Start QQ Bot WebSocket monitor */ export async function monitorQQProvider( options: QQMonitorOptions, ): Promise { const { account, config, abortSignal, statusSink, log } = options; if (!account.appId || !account.appSecret) { throw new Error("QQ appId and appSecret are required"); } const core = getQQRuntime(); let stopped = false; let ws: WebSocket | null = null; let heartbeatInterval: ReturnType | null = null; let sessionId: string | null = null; let lastSeq: number | null = null; let reconnectAttempts = 0; const maxReconnectAttempts = 10; const stop = () => { stopped = true; if (heartbeatInterval) { clearInterval(heartbeatInterval); heartbeatInterval = null; } if (ws) { ws.close(); ws = null; } }; abortSignal.addEventListener("abort", stop, { once: true }); const connect = async () => { if (stopped || abortSignal.aborted) return; try { const token = await getAccessToken(account.appId!, account.appSecret!); const gatewayUrl = await getGatewayUrl(token); log?.info(`[${account.accountId}] Connecting to gateway: ${gatewayUrl}`); ws = new WebSocket(gatewayUrl); ws.on("open", () => { log?.info(`[${account.accountId}] WebSocket connected`); reconnectAttempts = 0; }); ws.on("message", async (data) => { try { const payload = JSON.parse(data.toString()) as GatewayPayload; await handlePayload(payload, token); } catch (err) { log?.error( `[${account.accountId}] Failed to parse message: ${String(err)}`, ); } }); ws.on("close", (code, reason) => { log?.info( `[${account.accountId}] WebSocket closed: ${code} ${reason.toString()}`, ); if (heartbeatInterval) { clearInterval(heartbeatInterval); heartbeatInterval = null; } scheduleReconnect(); }); ws.on("error", (err) => { log?.error(`[${account.accountId}] WebSocket error: ${String(err)}`); }); } catch (err) { log?.error(`[${account.accountId}] Failed to connect: ${String(err)}`); scheduleReconnect(); } }; const scheduleReconnect = () => { if (stopped || abortSignal.aborted) return; if (reconnectAttempts >= maxReconnectAttempts) { log?.error(`[${account.accountId}] Max reconnect attempts reached`); return; } reconnectAttempts++; const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000); log?.info( `[${account.accountId}] Reconnecting in ${delay}ms (attempt ${reconnectAttempts})`, ); setTimeout(connect, delay); }; const handlePayload = async ( payload: GatewayPayload, token: string, ): Promise => { // Update sequence number if (payload.s !== undefined) { lastSeq = payload.s; } switch (payload.op) { case OpCode.Hello: { const hello = payload.d as HelloData; log?.info( `[${account.accountId}] Received Hello, heartbeat interval: ${hello.heartbeat_interval}ms`, ); // Start heartbeat if (heartbeatInterval) clearInterval(heartbeatInterval); heartbeatInterval = setInterval(() => { sendHeartbeat(); }, hello.heartbeat_interval); // Send Identify or Resume if (sessionId && lastSeq !== null) { sendResume(token); } else { sendIdentify(token); } break; } case OpCode.HeartbeatAck: // Heartbeat acknowledged break; case OpCode.Dispatch: statusSink?.({ lastInboundAt: Date.now() }); await handleEvent(payload.t!, payload.d, token); break; case OpCode.Reconnect: log?.info(`[${account.accountId}] Received Reconnect, reconnecting...`); ws?.close(); break; case OpCode.InvalidSession: { const resumable = payload.d as boolean; log?.info( `[${account.accountId}] Invalid session, resumable: ${resumable}`, ); if (!resumable) { sessionId = null; lastSeq = null; } // Wait a bit before reconnecting setTimeout(() => { if (resumable && sessionId) { sendResume(token); } else { sendIdentify(token); } }, 1000 + Math.random() * 4000); break; } } }; const sendHeartbeat = () => { if (!ws || ws.readyState !== WebSocket.OPEN) return; const payload: GatewayPayload = { op: OpCode.Heartbeat, d: lastSeq, }; ws.send(JSON.stringify(payload)); }; const sendIdentify = async (token: string) => { if (!ws || ws.readyState !== WebSocket.OPEN) return; const intents = account.config.intents ?? DEFAULT_INTENTS; const payload: GatewayPayload = { op: OpCode.Identify, d: { token: `QQBot ${token}`, intents, properties: { $os: "linux", $browser: "moltbot", $device: "moltbot", }, }, }; log?.info(`[${account.accountId}] Sending Identify with intents: ${intents}`); ws.send(JSON.stringify(payload)); }; const sendResume = (token: string) => { if (!ws || ws.readyState !== WebSocket.OPEN || !sessionId) return; const payload: GatewayPayload = { op: OpCode.Resume, d: { token: `QQBot ${token}`, session_id: sessionId, seq: lastSeq ?? 0, }, }; log?.info(`[${account.accountId}] Sending Resume for session: ${sessionId}`); ws.send(JSON.stringify(payload)); }; const handleEvent = async ( eventType: string, data: unknown, token: string, ): Promise => { switch (eventType) { case EventType.READY: { const ready = data as ReadyData; sessionId = ready.session_id; statusSink?.({ sessionId }); log?.info( `[${account.accountId}] Ready! Bot: ${ready.user.username} (${ready.user.id})`, ); break; } case EventType.RESUMED: log?.info(`[${account.accountId}] Session resumed`); break; case EventType.C2C_MESSAGE_CREATE: await handleC2CMessage(data as QQMessageEvent, token); break; case EventType.GROUP_AT_MESSAGE_CREATE: await handleGroupMessage(data as QQMessageEvent, token); break; case EventType.DIRECT_MESSAGE_CREATE: case EventType.AT_MESSAGE_CREATE: case EventType.MESSAGE_CREATE: // TODO: Implement channel message handling log?.info(`[${account.accountId}] Received channel event: ${eventType}`); break; default: log?.info(`[${account.accountId}] Unhandled event: ${eventType}`); } }; const handleC2CMessage = async ( message: QQMessageEvent, token: string, ): Promise => { const senderId = message.author.user_openid; if (!senderId) return; log?.info( `[${account.accountId}] C2C message from ${senderId}: ${message.content?.slice(0, 50)}`, ); await processMessageWithPipeline({ message, token, chatType: "c2c", chatId: senderId, senderId, isGroup: false, }); }; const handleGroupMessage = async ( message: QQMessageEvent, token: string, ): Promise => { const groupId = message.group_openid; const senderId = message.author.member_openid; if (!groupId || !senderId) return; log?.info( `[${account.accountId}] Group message in ${groupId} from ${senderId}: ${message.content?.slice(0, 50)}`, ); await processMessageWithPipeline({ message, token, chatType: "group", chatId: groupId, senderId, isGroup: true, }); }; const processMessageWithPipeline = async (params: { message: QQMessageEvent; token: string; chatType: "c2c" | "group"; chatId: string; senderId: string; isGroup: boolean; }): Promise => { const { message, token, chatType, chatId, senderId, isGroup } = params; const rawBody = message.content?.trim() || ""; if (!rawBody) return; const dmPolicy = account.config.dmPolicy ?? "open"; const configAllowFrom = (account.config.allowFrom ?? []).map((v) => String(v), ); // Check authorization for DMs if (!isGroup && dmPolicy !== "open") { const allowed = configAllowFrom.includes("*") || configAllowFrom.some( (entry) => entry.toLowerCase() === senderId.toLowerCase(), ); if (!allowed) { if (dmPolicy === "pairing") { const { code, created } = await core.channel.pairing.upsertPairingRequest({ channel: "qq", id: senderId, meta: {}, }); if (created) { log?.info(`[${account.accountId}] Pairing request from ${senderId}`); const replyText = core.channel.pairing.buildPairingReply({ channel: "qq", idLine: `Your QQ OpenID: ${senderId}`, code, }); await sendC2CMessage(token, senderId, { content: replyText, msg_type: 0, msg_id: message.id, }); statusSink?.({ lastOutboundAt: Date.now() }); } } return; } } // Resolve agent route const route = core.channel.routing.resolveAgentRoute({ cfg: config, channel: "qq", accountId: account.accountId, peer: { kind: isGroup ? "group" : "dm", id: chatId, }, }); const fromLabel = isGroup ? `group:${chatId}` : `user:${senderId}`; const storePath = core.channel.session.resolveStorePath(config.session?.store, { agentId: route.agentId, }); const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(config); const previousTimestamp = core.channel.session.readSessionUpdatedAt({ storePath, sessionKey: route.sessionKey, }); const body = core.channel.reply.formatAgentEnvelope({ channel: "QQ", from: fromLabel, timestamp: new Date(message.timestamp).getTime(), previousTimestamp, envelope: envelopeOptions, body: rawBody, }); const ctxPayload = core.channel.reply.finalizeInboundContext({ Body: body, RawBody: rawBody, CommandBody: rawBody, From: isGroup ? `qq:group:${chatId}` : `qq:${senderId}`, To: `qq:${chatId}`, SessionKey: route.sessionKey, AccountId: route.accountId, ChatType: isGroup ? "group" : "direct", ConversationLabel: fromLabel, SenderId: senderId, Provider: "qq", Surface: "qq", MessageSid: message.id, OriginatingChannel: "qq", OriginatingTo: `qq:${chatId}`, }); await core.channel.session.recordInboundSession({ storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey, ctx: ctxPayload, onRecordError: (err) => { log?.error(`[${account.accountId}] Failed updating session: ${String(err)}`); }, }); const tableMode = core.channel.text.resolveMarkdownTableMode({ cfg: config, channel: "qq", accountId: account.accountId, }); await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg: config, dispatcherOptions: { deliver: async (payload) => { await deliverQQReply({ payload, token, chatType, chatId, msgId: message.id, tableMode, }); }, onError: (err, info) => { log?.error( `[${account.accountId}] QQ ${info.kind} reply failed: ${String(err)}`, ); }, }, }); }; const deliverQQReply = async (params: { payload: { text?: string; mediaUrls?: string[]; mediaUrl?: string }; token: string; chatType: "c2c" | "group"; chatId: string; msgId?: string; tableMode?: MarkdownTableMode; }): Promise => { const { payload, token, chatType, chatId, msgId } = params; const tableMode = params.tableMode ?? "code"; const text = core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode); if (!text) return; const chunkMode = core.channel.text.resolveChunkMode(config, "qq", account.accountId); const chunks = core.channel.text.chunkMarkdownTextWithMode( text, QQ_TEXT_LIMIT, chunkMode, ); const sendFn = chatType === "c2c" ? sendC2CMessage : sendGroupMessage; for (let i = 0; i < chunks.length; i++) { const chunk = chunks[i]; try { await sendFn(token, chatId, { content: chunk, msg_type: 0, msg_id: i === 0 ? msgId : undefined, // Only use msg_id for first chunk msg_seq: i + 1, }); statusSink?.({ lastOutboundAt: Date.now() }); } catch (err) { log?.error(`[${account.accountId}] QQ message send failed: ${String(err)}`); } } }; // Start initial connection await connect(); return { stop }; }