/**
 * Mini-program STOMP-over-WebSocket client.
 * Used for real-time chat, replacing HTTP polling.
 */
import { UPLOAD_URL } from './request';

// The WebSocket endpoint lives on the same base address as the backend.
const BASE_URL = UPLOAD_URL;

// Heart-beat interval (ms), both advertised in CONNECT and used for the local timer.
const HEARTBEAT_MS = 10000;

// Maximum number of consecutive automatic reconnect attempts.
const MAX_RECONNECT = 5;

/**
 * Convert the HTTP base address into the WebSocket endpoint URL
 * (a leading "http" becomes "ws", so "https" becomes "wss").
 * @returns {string} WebSocket URL of the chat endpoint
 */
function getWsUrl() {
  return BASE_URL.replace(/^http/, 'ws') + '/api/chat/ws/chat/websocket';
}

let socketTask = null;
let isConnected = false;
let subscriptionId = 0;
const subscriptions = {}; // id -> { destination, callback }
let reconnectCount = 0;
let reconnectTimer = null;
let heartbeatTimer = null;
let connectHeaders = {};
let onConnectCallback = null;
let onDisconnectCallback = null;
// False after disconnectChat(): a socket closed on purpose must not be reopened.
let shouldReconnect = false;

/**
 * Open the WebSocket connection and perform the STOMP handshake.
 * An explicit call replaces any existing connection, cancels a pending
 * automatic reconnect and resets the reconnect budget.
 * @param {Object} options
 * @param {string} options.token - auth token, sent as "Bearer <token>"
 * @param {Function} options.onConnect - called after the STOMP CONNECTED frame
 * @param {Function} options.onDisconnect - called when the socket closes
 */
export function connectChat(options = {}) {
  const { token, onConnect, onDisconnect } = options;
  onConnectCallback = onConnect || null;
  onDisconnectCallback = onDisconnect || null;
  connectHeaders = {};
  if (token) {
    connectHeaders['Authorization'] = 'Bearer ' + token;
  }
  shouldReconnect = true;
  reconnectCount = 0;
  cancelReconnect();
  openSocket();
}

/**
 * Create the socket task and wire its event handlers.
 * Every handler ignores events from a socket that has since been replaced,
 * so a late close/error of an old socket cannot corrupt the current state.
 */
function openSocket() {
  const previous = socketTask;
  socketTask = null;
  if (previous) {
    closeQuietly(previous);
  }

  const wsUrl = getWsUrl();
  console.log('[ChatSocket] 连接中...', wsUrl);

  const task = uni.connectSocket({
    url: wsUrl,
    header: {
      'Authorization': connectHeaders['Authorization'] || ''
    },
    success: () => {
      console.log('[ChatSocket] 连接请求已发送');
    },
    fail: (err) => {
      console.error('[ChatSocket] 连接失败', err);
      scheduleReconnect();
    }
  });
  socketTask = task;

  task.onOpen(() => {
    if (task !== socketTask) return;
    console.log('[ChatSocket] WebSocket 已打开,发送 STOMP CONNECT');
    sendStompConnect();
  });

  task.onMessage((res) => {
    if (task !== socketTask) return;
    handleStompFrame(res.data);
  });

  task.onClose((res) => {
    handleSocketClose(task, res);
  });

  task.onError((err) => {
    console.error('[ChatSocket] WebSocket 错误', err);
    if (task === socketTask) {
      isConnected = false;
    }
  });
}

/**
 * React to a socket close event: reset state, notify the caller and,
 * unless the close was requested via disconnectChat(), schedule a reconnect.
 * @param {Object} task - the socket task that closed
 * @param {Object} res - close event payload
 */
function handleSocketClose(task, res) {
  // A newer socket is active: this event belongs to a replaced connection.
  if (socketTask !== null && task !== socketTask) return;

  console.log('[ChatSocket] WebSocket 已关闭', res);
  isConnected = false;
  stopHeartbeat();
  if (onDisconnectCallback) onDisconnectCallback();

  // The callback may itself have called connectChat()/disconnectChat();
  // only reconnect when this socket is still the current one.
  if (task === socketTask) {
    scheduleReconnect();
  }
}

/**
 * Close a socket task that is being replaced.
 * @param {Object} task - socket task to close
 */
function closeQuietly(task) {
  try {
    // The task may already be closed (e.g. before an automatic reconnect),
    // in which case a failed close is expected and carries no information.
    task.close({ fail: () => {} });
  } catch (err) {
    console.warn('[ChatSocket] 关闭旧连接失败', err);
  }
}

/**
 * Send the STOMP CONNECT frame.
 */
function sendStompConnect() {
  const headers = {
    'accept-version': '1.1,1.0',
    'heart-beat': `${HEARTBEAT_MS},${HEARTBEAT_MS}`,
  };
  if (connectHeaders['Authorization']) {
    headers['Authorization'] = connectHeaders['Authorization'];
  }
  sendFrame('CONNECT', headers, '');
}

/**
 * Handle raw data received from the socket.
 * STOMP frames are terminated by a NUL byte, so one WebSocket message may
 * carry several frames; whitespace-only chunks are heart-beats and skipped.
 * @param {*} data - payload of the socket message (only strings are handled)
 */
function handleStompFrame(data) {
  if (!data || typeof data !== 'string') return;
  const frames = data.split('\0');
  for (const raw of frames) {
    const trimmed = raw.trim();
    if (!trimmed) continue;
    parseFrame(trimmed);
  }
}

/**
 * Parse a single STOMP frame and dispatch it by command.
 * @param {string} raw - frame text without the NUL terminator
 */
function parseFrame(raw) {
  const lines = raw.split('\n');
  // Header lines may end in CRLF; the body is left untouched.
  const stripCr = (line) => (line.endsWith('\r') ? line.slice(0, -1) : line);

  const command = stripCr(lines[0]).trim();
  const headers = {};
  // Without a blank separator line the frame has no body. Defaulting to 0
  // here would make the command and header lines be read as the body.
  let bodyStart = lines.length;

  for (let i = 1; i < lines.length; i++) {
    const line = stripCr(lines[i]);
    if (line === '') {
      bodyStart = i + 1;
      break;
    }
    const colonIdx = line.indexOf(':');
    if (colonIdx > 0) {
      const key = line.substring(0, colonIdx).trim();
      // For repeated headers the STOMP spec says the first entry wins.
      if (!Object.prototype.hasOwnProperty.call(headers, key)) {
        headers[key] = line.substring(colonIdx + 1).trim();
      }
    }
  }

  const body = lines.slice(bodyStart).join('\n').replace(/\0$/, '').trim();

  switch (command) {
    case 'CONNECTED':
      handleConnected();
      break;
    case 'MESSAGE':
      handleStompMessage(headers, body);
      break;
    case 'RECEIPT':
      console.log('[ChatSocket] 收到回执:', headers['receipt-id']);
      break;
    case 'ERROR':
      console.error('[ChatSocket] STOMP 错误:', headers['message'], body);
      break;
    default:
      // Unknown command or stray heart-beat data: ignore.
      break;
  }
}

/**
 * Handle the STOMP CONNECTED frame: mark the session as live, start the
 * heart-beat and re-issue every registered subscription.
 */
function handleConnected() {
  console.log('[ChatSocket] STOMP 连接成功');
  isConnected = true;
  reconnectCount = 0;
  startHeartbeat();
  Object.keys(subscriptions).forEach((id) => {
    const sub = subscriptions[id];
    sendFrame('SUBSCRIBE', { id, destination: sub.destination, ack: 'auto' }, '');
  });
  if (onConnectCallback) onConnectCallback();
}

/**
 * Tell whether a message destination belongs to a subscribed destination:
 * an exact match, or a prefix match that ends on a path-segment boundary
 * (so "/topic/session/1" does not match "/topic/session/12").
 * @param {string} destination - destination header of the MESSAGE frame
 * @param {string} subscribed - destination the subscription was created with
 * @returns {boolean}
 */
function matchesDestination(destination, subscribed) {
  if (destination === subscribed) return true;
  const prefix = subscribed.endsWith('/') ? subscribed : subscribed + '/';
  return destination.startsWith(prefix);
}

/**
 * Invoke one subscription callback; an exception thrown by it is logged so
 * it cannot prevent delivery to the remaining subscriptions.
 */
function deliver(sub, data, headers) {
  try {
    sub.callback(data, headers);
  } catch (err) {
    console.error('[ChatSocket] 订阅回调执行出错:', err);
  }
}

/**
 * Handle a STOMP MESSAGE frame and deliver it to the matching subscription.
 * The body is parsed as JSON when possible, otherwise passed through as text.
 * @param {Object} headers - frame headers
 * @param {string} body - frame body
 */
function handleStompMessage(headers, body) {
  const destination = headers['destination'];
  if (!destination) return;

  let data = body;
  try {
    data = JSON.parse(body);
  } catch (e) {
    // Not JSON: deliver the raw text.
  }

  // The broker sends one MESSAGE per subscription and names it in the
  // "subscription" header; routing by it avoids duplicate deliveries and
  // cross-destination leaks.
  const targetId = headers['subscription'];
  if (targetId) {
    if (Object.prototype.hasOwnProperty.call(subscriptions, targetId)) {
      deliver(subscriptions[targetId], data, headers);
    }
    return;
  }

  // Fallback for frames without a subscription header: match by destination.
  Object.keys(subscriptions).forEach((id) => {
    const sub = subscriptions[id];
    if (matchesDestination(destination, sub.destination)) {
      deliver(sub, data, headers);
    }
  });
}

/**
 * Subscribe to a STOMP destination. The subscription is remembered and
 * re-issued automatically after every (re)connect.
 * @param {string} destination - destination, e.g. /topic/session/123
 * @param {Function} callback - called as callback(data, headers) per message
 * @returns {string} subscription id
 */
export function subscribe(destination, callback) {
  const id = 'sub-' + (++subscriptionId);
  subscriptions[id] = { destination, callback };
  if (isConnected) {
    sendFrame('SUBSCRIBE', { id, destination, ack: 'auto' }, '');
  }
  return id;
}

/**
 * Cancel a subscription.
 * @param {string} id - subscription id returned by subscribe()
 */
export function unsubscribe(id) {
  if (subscriptions[id]) {
    if (isConnected) {
      sendFrame('UNSUBSCRIBE', { id }, '');
    }
    delete subscriptions[id];
  }
}

/**
 * Send a chat message over the WebSocket (STOMP SEND).
 * @param {number} sessionId - session id
 * @param {string} content - message content
 * @param {string} msgType - message type
 * @param {string} msgNo - message number; generated when empty
 * @param {number|string|null} senderId - sender id; included when not null
 * @returns {boolean} false when not connected, true once the frame was handed to the socket
 */
export function sendTextByWs(sessionId, content, msgType = 'text', msgNo = '', senderId = null) {
  if (!isConnected) {
    console.warn('[ChatSocket] 未连接,无法发送消息');
    return false;
  }
  if (!msgNo) {
    msgNo = generateMsgNo();
  }
  const bodyObj = { msgType, sessionId, msgNo, content };
  // Mini-program users must pass senderId: the backend cannot derive it for
  // them (they are not back-office logins). An id of 0 is still forwarded.
  if (senderId !== null && senderId !== undefined) {
    bodyObj.senderId = senderId;
  }
  const body = JSON.stringify(bodyObj);
  sendFrame('SEND', {
    destination: '/app/chat/send',
    'content-type': 'application/json'
  }, body);
  return true;
}

/**
 * Disconnect on purpose: sends STOMP DISCONNECT when connected, clears all
 * state (including subscriptions), closes the socket and disables the
 * automatic reconnect until connectChat() is called again.
 */
export function disconnectChat() {
  shouldReconnect = false;
  if (socketTask && isConnected) {
    sendFrame('DISCONNECT', { receipt: 'disconnect-' + Date.now() }, '');
  }
  clearAll();
  if (socketTask) {
    socketTask.close({});
    socketTask = null;
  }
}

/**
 * Cancel a pending automatic reconnect, if any.
 */
function cancelReconnect() {
  if (reconnectTimer) {
    clearTimeout(reconnectTimer);
    reconnectTimer = null;
  }
}

/**
 * Reset connection state: connected flag, heart-beat, pending reconnect
 * and all registered subscriptions.
 */
function clearAll() {
  isConnected = false;
  stopHeartbeat();
  cancelReconnect();
  Object.keys(subscriptions).forEach((id) => delete subscriptions[id]);
}

/**
 * Serialize and send one STOMP frame. Does nothing when there is no socket.
 * @param {string} command - STOMP command
 * @param {Object} headers - frame headers
 * @param {string} body - frame body
 */
function sendFrame(command, headers, body) {
  if (!socketTask) return;
  let frame = command + '\n';
  if (headers) {
    Object.keys(headers).forEach((key) => {
      frame += key + ':' + headers[key] + '\n';
    });
  }
  frame += '\n'; // blank line separates headers from body
  if (body) {
    frame += body;
  }
  frame += '\0'; // NUL terminator
  socketTask.send({
    data: frame,
    fail: (err) => {
      console.error('[ChatSocket] 发送帧失败:', command, err);
    }
  });
}

/**
 * Start the client heart-beat: a single newline every HEARTBEAT_MS while connected.
 */
function startHeartbeat() {
  stopHeartbeat();
  heartbeatTimer = setInterval(() => {
    if (isConnected && socketTask) {
      // A STOMP heart-beat is a bare EOL; a failed beat is not actionable
      // here because a dead socket is reported through onClose/onError.
      socketTask.send({ data: '\n', fail: () => {} });
    }
  }, HEARTBEAT_MS);
}

/**
 * Stop the client heart-beat timer.
 */
function stopHeartbeat() {
  if (heartbeatTimer) {
    clearInterval(heartbeatTimer);
    heartbeatTimer = null;
  }
}

/**
 * Schedule an automatic reconnect with a linearly growing delay (capped at 15s).
 * No-op when the user disconnected on purpose, when a reconnect is already
 * pending, or when MAX_RECONNECT attempts have been used.
 */
function scheduleReconnect() {
  if (!shouldReconnect || reconnectTimer) return;
  if (reconnectCount >= MAX_RECONNECT) {
    console.log('[ChatSocket] 达到最大重连次数,停止重连');
    return;
  }
  reconnectCount++;
  const delay = Math.min(3000 * reconnectCount, 15000);
  console.log(`[ChatSocket] ${delay}ms 后进行第 ${reconnectCount} 次重连...`);
  reconnectTimer = setTimeout(() => {
    reconnectTimer = null;
    // Prefer the token currently in storage (it may have been refreshed);
    // keep the previous Authorization header when storage has none.
    const token = uni.getStorageSync('token');
    if (token) {
      connectHeaders = { 'Authorization': 'Bearer ' + token };
    }
    openSocket();
  }, delay);
}

/**
 * Generate a message number: "MSG_<timestamp>_<random base-36 suffix>".
 * @returns {string}
 */
export function generateMsgNo() {
  return 'MSG_' + Date.now() + '_' + Math.random().toString(36).slice(2, 8);
}

/**
 * Tell whether the STOMP session is currently established.
 * @returns {boolean}
 */
export function isSocketConnected() {
  return isConnected;
}