| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- /**
- * 小程序 STOMP over WebSocket 客户端
- * 用于聊天实时通信,替代 HTTP 轮询
- */
- import { UPLOAD_URL } from './request';
// The WebSocket endpoint is served from the same base address as the backend
// (the value exported as UPLOAD_URL by ./request).
const BASE_URL = UPLOAD_URL;
/**
 * Build the WebSocket endpoint URL from the HTTP(S) base address.
 *
 * The leading `http` is swapped for `ws`, so `http://` becomes `ws://` and
 * `https://` becomes `wss://`. Trailing slashes on the base address are
 * removed first so the result never contains `//api/...`.
 *
 * @returns {string} Absolute ws:// or wss:// URL of the chat endpoint.
 */
function getWsUrl() {
  const base = BASE_URL.replace(/\/+$/, '');
  return base.replace(/^http/, 'ws') + '/api/chat/ws/chat/websocket';
}
// ---- Connection state shared by every function in this module ----

// SocketTask returned by uni.connectSocket, or null when there is none.
let socketTask = null;
// True between a STOMP CONNECTED frame and the next close/error/disconnect.
let isConnected = false;

// Counter used to mint subscription ids ("sub-1", "sub-2", ...).
let subscriptionId = 0;
// Registry of active subscriptions: id -> { destination, callback }.
const subscriptions = {};

// Number of reconnect attempts made since the last successful CONNECTED.
let reconnectCount = 0;
// Upper bound on consecutive reconnect attempts.
const MAX_RECONNECT = 5;

// Timer handles for the pending reconnect and the heartbeat interval.
let reconnectTimer = null;
let heartbeatTimer = null;

// Headers remembered from the last connectChat() call (e.g. Authorization).
let connectHeaders = {};
// Caller-supplied lifecycle callbacks, or null when not provided.
let onConnectCallback = null;
let onDisconnectCallback = null;
/**
 * Open the WebSocket and start the STOMP handshake.
 *
 * Only the most recently created socket is allowed to drive module state:
 * - a socket left over from an earlier call is closed, and its late events
 *   are ignored;
 * - when the module-level socket has been reset to null (as disconnectChat()
 *   does), the close event still notifies onDisconnect but does not schedule
 *   a reconnect;
 * - each connection attempt asks for at most one reconnect, even if both the
 *   `fail` callback and the close event fire for it.
 *
 * @param {Object} [options]
 * @param {string} [options.token] - Auth token; sent as `Bearer <token>`.
 * @param {Function} [options.onConnect] - Called after STOMP CONNECTED.
 * @param {Function} [options.onDisconnect] - Called when the socket closes.
 */
export function connectChat(options = {}) {
  const { token, onConnect, onDisconnect } = options;
  onConnectCallback = onConnect || null;
  onDisconnectCallback = onDisconnect || null;
  connectHeaders = {};
  if (token) {
    connectHeaders['Authorization'] = 'Bearer ' + token;
  }

  // An explicit connect replaces any reconnect that is still waiting to run;
  // otherwise that timer would open a second socket shortly afterwards.
  if (reconnectTimer) {
    clearTimeout(reconnectTimer);
    reconnectTimer = null;
  }

  // Retire the previous socket so two connections never run side by side.
  const previousTask = socketTask;
  if (previousTask) {
    socketTask = null;
    isConnected = false;
    stopHeartbeat();
    previousTask.close({});
  }

  const wsUrl = getWsUrl();
  console.log('[ChatSocket] 连接中...', wsUrl);

  // Declared before connectSocket() so the `fail` callback can safely read it
  // even if the platform invokes that callback synchronously.
  let task = null;
  let reconnectRequested = false;
  const requestReconnect = () => {
    if (reconnectRequested) return;
    reconnectRequested = true;
    scheduleReconnect();
  };

  task = uni.connectSocket({
    url: wsUrl,
    header: {
      'Authorization': connectHeaders['Authorization'] || ''
    },
    success: () => {
      console.log('[ChatSocket] 连接请求已发送');
    },
    fail: (err) => {
      console.error('[ChatSocket] 连接失败', err);
      // Skip when this attempt was already replaced or deliberately closed.
      if (task !== null && socketTask !== task) return;
      requestReconnect();
    }
  });
  socketTask = task;

  task.onOpen(() => {
    if (socketTask !== task) return;
    console.log('[ChatSocket] WebSocket 已打开,发送 STOMP CONNECT');
    sendStompConnect();
  });

  task.onMessage((res) => {
    if (socketTask !== task) return;
    handleStompFrame(res.data);
  });

  task.onClose((res) => {
    console.log('[ChatSocket] WebSocket 已关闭', res);
    // null means the module-level socket was cleared on purpose.
    const closedOnPurpose = socketTask === null;
    // Any other socket in the slot means this one was superseded.
    if (!closedOnPurpose && socketTask !== task) return;

    isConnected = false;
    stopHeartbeat();
    if (!closedOnPurpose) {
      socketTask = null;
    }
    if (onDisconnectCallback) onDisconnectCallback();
    if (!closedOnPurpose) {
      requestReconnect();
    }
  });

  task.onError((err) => {
    console.error('[ChatSocket] WebSocket 错误', err);
    if (socketTask !== task) return;
    isConnected = false;
  });
}
/**
 * Send the STOMP CONNECT frame on the current socket.
 *
 * Offers protocol versions 1.1 and 1.0, advertises a 10000 ms heart-beat in
 * both directions, and forwards the stored Authorization header when set.
 */
function sendStompConnect() {
  const authorization = connectHeaders['Authorization'];
  const frameHeaders = {
    'accept-version': '1.1,1.0',
    'heart-beat': '10000,10000',
  };
  if (authorization) {
    frameHeaders['Authorization'] = authorization;
  }
  sendFrame('CONNECT', frameHeaders, '');
}
/**
 * Split one WebSocket text payload into STOMP frames and parse each of them.
 *
 * Frames are terminated by a NUL character, so a single payload may carry
 * several frames. Non-string and empty payloads are ignored, and so are
 * chunks that are blank after trimming (e.g. a bare "\n" heart-beat).
 *
 * @param {*} data - Payload delivered by the socket's message event.
 */
function handleStompFrame(data) {
  if (typeof data !== 'string' || data === '') return;

  data
    .split('\0')
    .map((chunk) => chunk.trim())
    .filter((chunk) => chunk !== '')
    .forEach((chunk) => {
      parseFrame(chunk);
    });
}
/**
 * Parse a single STOMP frame and act on its command.
 *
 * Layout: the first line is the command, the following `name:value` lines are
 * headers, an empty line ends the headers, and the remainder is the body.
 *
 * Parsing rules applied here:
 * - A frame without an empty separator line has an empty body. This is the
 *   usual shape of body-less frames, because the caller trims trailing
 *   newlines before handing the frame over.
 * - A trailing "\r" on header lines is dropped so CRLF line endings work.
 * - When a header name is repeated, the first occurrence is kept.
 *
 * @param {string} raw - One frame, without its NUL terminator.
 */
function parseFrame(raw) {
  const lines = raw.split('\n');
  const command = lines[0].trim();
  const headers = {};

  // Defaults to "past the end": no separator line means no body.
  let bodyStart = lines.length;
  for (let i = 1; i < lines.length; i++) {
    const line = lines[i].replace(/\r$/, '');
    if (line === '') {
      bodyStart = i + 1;
      break;
    }
    const colonIdx = line.indexOf(':');
    if (colonIdx > 0) {
      const name = line.substring(0, colonIdx).trim();
      if (!Object.prototype.hasOwnProperty.call(headers, name)) {
        headers[name] = line.substring(colonIdx + 1).trim();
      }
    }
  }

  const body = lines.slice(bodyStart).join('\n').replace(/\0$/, '').trim();

  switch (command) {
    case 'CONNECTED':
      console.log('[ChatSocket] STOMP 连接成功');
      isConnected = true;
      reconnectCount = 0;
      startHeartbeat();
      // Replay every registered subscription on the fresh session.
      Object.keys(subscriptions).forEach(id => {
        const sub = subscriptions[id];
        sendFrame('SUBSCRIBE', { id, destination: sub.destination, ack: 'auto' }, '');
      });
      if (onConnectCallback) onConnectCallback();
      break;
    case 'MESSAGE':
      handleStompMessage(headers, body);
      break;
    case 'RECEIPT':
      console.log('[ChatSocket] 收到回执:', headers['receipt-id']);
      break;
    case 'ERROR':
      console.error('[ChatSocket] STOMP 错误:', headers['message'], body);
      break;
    case 'HEARTBEAT':
      // Nothing to do for this command.
      break;
    default:
      // Unknown command (possibly heart-beat residue): ignore.
      break;
  }
}
/**
 * Deliver a STOMP MESSAGE frame to the matching subscription callback(s).
 *
 * The body is JSON-decoded when possible; otherwise the raw text is passed on.
 *
 * Routing: when the frame carries a `subscription` header naming a registered
 * subscription, only that subscription receives the message. This stops a
 * subscriber of `/topic/session/1` from also receiving `/topic/session/12`,
 * and stops duplicate deliveries when several subscriptions share one
 * destination. Without a usable `subscription` header, matching falls back
 * to comparing destinations (exact or prefix).
 *
 * @param {Object} headers - Parsed frame headers.
 * @param {string} body - Frame body text.
 */
function handleStompMessage(headers, body) {
  const destination = headers['destination'];
  if (!destination) return;

  let data = body;
  try {
    data = JSON.parse(body);
  } catch (e) {
    // Not JSON: hand the text through unchanged.
  }

  const targetId = headers['subscription'];
  if (targetId && Object.prototype.hasOwnProperty.call(subscriptions, targetId)) {
    subscriptions[targetId].callback(data, headers);
    return;
  }

  // Fallback for frames that do not identify a known subscription.
  Object.keys(subscriptions).forEach(id => {
    const sub = subscriptions[id];
    if (destination === sub.destination || destination.startsWith(sub.destination)) {
      sub.callback(data, headers);
    }
  });
}
/**
 * Register a subscription for a STOMP destination.
 *
 * The subscription is always stored locally. The SUBSCRIBE frame is sent
 * right away only when the STOMP session is currently connected.
 *
 * @param {string} destination - Destination to subscribe to, e.g. /topic/session/123.
 * @param {Function} callback - Invoked with (data, headers) for each message.
 * @returns {string} The generated subscription id ("sub-<n>").
 */
export function subscribe(destination, callback) {
  subscriptionId += 1;
  const id = `sub-${subscriptionId}`;

  subscriptions[id] = { destination, callback };

  if (!isConnected) {
    return id;
  }
  sendFrame('SUBSCRIBE', { id, destination, ack: 'auto' }, '');
  return id;
}
/**
 * Remove a subscription by id.
 *
 * Unknown ids are ignored. For a known id, an UNSUBSCRIBE frame is sent when
 * the session is connected, and the local registration is always dropped.
 *
 * @param {string} id - Subscription id returned by subscribe().
 */
export function unsubscribe(id) {
  if (!subscriptions[id]) {
    return;
  }
  if (isConnected) {
    sendFrame('UNSUBSCRIBE', { id }, '');
  }
  delete subscriptions[id];
}
/**
 * Send a chat message as a STOMP SEND frame to /app/chat/send.
 *
 * @param {number} sessionId - Chat session id.
 * @param {string} content - Message content.
 * @param {string} [msgType='text'] - Message type.
 * @param {string} [msgNo=''] - Message number; one is generated when empty.
 * @param {*} [senderId=null] - Sender id; added to the payload only when
 *   truthy. Mini-program users need to pass it, since the backend cannot
 *   derive it for them (they are not admin-console logins).
 * @returns {boolean} false when not connected (nothing is sent); true once
 *   the frame has been handed to sendFrame().
 */
export function sendTextByWs(sessionId, content, msgType = 'text', msgNo = '', senderId = null) {
  if (!isConnected) {
    console.warn('[ChatSocket] 未连接,无法发送消息');
    return false;
  }

  const payload = {
    msgType,
    sessionId,
    msgNo: msgNo || generateMsgNo(),
    content
  };
  if (senderId) {
    payload.senderId = senderId;
  }

  const frameHeaders = { destination: '/app/chat/send', 'content-type': 'application/json' };
  sendFrame('SEND', frameHeaders, JSON.stringify(payload));
  return true;
}
/**
 * Tear the connection down.
 *
 * Sends a STOMP DISCONNECT frame (with a receipt id) when a session is
 * connected, resets module state via clearAll(), then closes the socket and
 * forgets it.
 */
export function disconnectChat() {
  const task = socketTask;

  if (task && isConnected) {
    const receipt = 'disconnect-' + Date.now();
    sendFrame('DISCONNECT', { receipt }, '');
  }

  clearAll();

  if (!task) {
    return;
  }
  task.close({});
  socketTask = null;
}
/**
 * Reset module state: mark the session as disconnected, stop the heartbeat,
 * cancel any pending reconnect timer, and remove every subscription.
 */
function clearAll() {
  isConnected = false;
  stopHeartbeat();

  if (reconnectTimer) {
    clearTimeout(reconnectTimer);
    reconnectTimer = null;
  }

  for (const id of Object.keys(subscriptions)) {
    delete subscriptions[id];
  }
}
/**
 * Serialize a STOMP frame and send it on the current socket.
 *
 * Wire format: command line, one `name:value` line per header, an empty line,
 * the optional body, then a NUL terminator. Does nothing when there is no
 * socket. Send failures are logged together with the command name.
 *
 * @param {string} command - STOMP command, e.g. SEND or SUBSCRIBE.
 * @param {Object} headers - Frame headers (may be null/undefined).
 * @param {string} body - Frame body (may be empty).
 */
function sendFrame(command, headers, body) {
  if (!socketTask) return;

  const headerLines = headers
    ? Object.keys(headers).map((key) => key + ':' + headers[key] + '\n')
    : [];
  const payload = body ? body : '';
  const frame = command + '\n' + headerLines.join('') + '\n' + payload + '\0';

  socketTask.send({
    data: frame,
    fail: (err) => {
      console.error('[ChatSocket] 发送帧失败:', command, err);
    }
  });
}
/**
 * (Re)start the heartbeat.
 *
 * Any existing interval is stopped first. Every 10000 ms, while the session
 * is connected and a socket exists, a single "\n" (the STOMP heart-beat) is
 * sent. Send failures are deliberately ignored.
 */
function startHeartbeat() {
  stopHeartbeat();

  const beat = () => {
    if (!isConnected || !socketTask) {
      return;
    }
    socketTask.send({
      data: '\n',
      fail: () => {}
    });
  };

  heartbeatTimer = setInterval(beat, 10000);
}
/**
 * Stop the heartbeat interval, if one is running, and forget its handle.
 */
function stopHeartbeat() {
  if (!heartbeatTimer) {
    return;
  }
  clearInterval(heartbeatTimer);
  heartbeatTimer = null;
}
/**
 * Schedule one reconnect attempt with a linearly growing delay.
 *
 * The delay is 3000 ms times the attempt number, capped at 15000 ms. Nothing
 * is scheduled once MAX_RECONNECT attempts have been made.
 *
 * A call made while a reconnect is already pending is a no-op. Without that
 * guard, two triggers for the same failure (e.g. a connect failure followed
 * by a close event) would start two timers, count two attempts, and lose the
 * first timer's handle so it could no longer be cancelled.
 *
 * When the timer fires, its handle is cleared and connectChat() is invoked
 * with the token currently held in storage plus the existing callbacks.
 */
function scheduleReconnect() {
  if (reconnectTimer) {
    // A reconnect is already queued; do not stack another one.
    return;
  }
  if (reconnectCount >= MAX_RECONNECT) {
    console.log('[ChatSocket] 达到最大重连次数,停止重连');
    return;
  }

  reconnectCount++;
  const delay = Math.min(3000 * reconnectCount, 15000);
  console.log(`[ChatSocket] ${delay}ms 后进行第 ${reconnectCount} 次重连...`);

  reconnectTimer = setTimeout(() => {
    // The timer has fired: free the slot so later failures can schedule again.
    reconnectTimer = null;
    const token = uni.getStorageSync('token');
    connectChat({
      token,
      onConnect: onConnectCallback,
      onDisconnect: onDisconnectCallback
    });
  }, delay);
}
/**
 * Generate a message number of the form `MSG_<epoch ms>_<random base-36>`.
 *
 * The random suffix is up to six base-36 characters taken from Math.random(),
 * so it is not suitable as a security token.
 *
 * @returns {string} The generated message number.
 */
export function generateMsgNo() {
  const timestamp = Date.now();
  const suffix = Math.random().toString(36).slice(2, 8);
  return `MSG_${timestamp}_${suffix}`;
}
/**
 * Report whether the STOMP session is currently marked as connected.
 *
 * @returns {boolean} Current value of the module's connection flag.
 */
export function isSocketConnected() {
  return isConnected;
}
|