chatSocket.js 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. /**
  2. * 小程序 STOMP over WebSocket 客户端
  3. * 用于聊天实时通信,替代 HTTP 轮询
  4. */
  5. import { UPLOAD_URL } from './request';
  6. // WebSocket 使用与后端一致的基地址
  7. const BASE_URL = UPLOAD_URL;
  8. // 将 HTTP 地址转为 WS 地址
  9. function getWsUrl() {
  10. return BASE_URL.replace(/^http/, 'ws') + '/api/chat/ws/chat/websocket';
  11. }
  12. let socketTask = null;
  13. let isConnected = false;
  14. let subscriptionId = 0;
  15. const subscriptions = {}; // id -> { destination, callback }
  16. let reconnectCount = 0;
  17. const MAX_RECONNECT = 5;
  18. let reconnectTimer = null;
  19. let heartbeatTimer = null;
  20. let connectHeaders = {};
  21. let onConnectCallback = null;
  22. let onDisconnectCallback = null;
  23. /**
  24. * 建立 WebSocket 连接并完成 STOMP 握手
  25. * @param {Object} options
  26. * @param {string} options.token - 认证 token
  27. * @param {Function} options.onConnect - 连接成功回调
  28. * @param {Function} options.onDisconnect - 断开连接回调
  29. */
  30. export function connectChat(options = {}) {
  31. const { token, onConnect, onDisconnect } = options;
  32. onConnectCallback = onConnect || null;
  33. onDisconnectCallback = onDisconnect || null;
  34. connectHeaders = {};
  35. if (token) {
  36. connectHeaders['Authorization'] = 'Bearer ' + token;
  37. }
  38. const wsUrl = getWsUrl();
  39. console.log('[ChatSocket] 连接中...', wsUrl);
  40. socketTask = uni.connectSocket({
  41. url: wsUrl,
  42. header: {
  43. 'Authorization': connectHeaders['Authorization'] || ''
  44. },
  45. success: () => {
  46. console.log('[ChatSocket] 连接请求已发送');
  47. },
  48. fail: (err) => {
  49. console.error('[ChatSocket] 连接失败', err);
  50. scheduleReconnect();
  51. }
  52. });
  53. socketTask.onOpen(() => {
  54. console.log('[ChatSocket] WebSocket 已打开,发送 STOMP CONNECT');
  55. sendStompConnect();
  56. });
  57. socketTask.onMessage((res) => {
  58. handleStompFrame(res.data);
  59. });
  60. socketTask.onClose((res) => {
  61. console.log('[ChatSocket] WebSocket 已关闭', res);
  62. isConnected = false;
  63. stopHeartbeat();
  64. if (onDisconnectCallback) onDisconnectCallback();
  65. scheduleReconnect();
  66. });
  67. socketTask.onError((err) => {
  68. console.error('[ChatSocket] WebSocket 错误', err);
  69. isConnected = false;
  70. });
  71. }
  72. /**
  73. * 发送 STOMP CONNECT 帧
  74. */
  75. function sendStompConnect() {
  76. const headers = {
  77. 'accept-version': '1.1,1.0',
  78. 'heart-beat': '10000,10000',
  79. };
  80. if (connectHeaders['Authorization']) {
  81. headers['Authorization'] = connectHeaders['Authorization'];
  82. }
  83. sendFrame('CONNECT', headers, '');
  84. }
  85. /**
  86. * 处理收到的 STOMP 帧
  87. */
  88. function handleStompFrame(data) {
  89. if (!data || typeof data !== 'string') return;
  90. // STOMP 帧以 \n 分隔,可能一次收到多个帧或帧片段
  91. const frames = data.split('\0');
  92. for (const raw of frames) {
  93. if (!raw.trim()) continue;
  94. parseFrame(raw.trim());
  95. }
  96. }
  97. /**
  98. * 解析单个 STOMP 帧
  99. */
  100. function parseFrame(raw) {
  101. const lines = raw.split('\n');
  102. if (lines.length === 0) return;
  103. const command = lines[0].trim();
  104. const headers = {};
  105. let bodyStart = 0;
  106. // 解析头部
  107. for (let i = 1; i < lines.length; i++) {
  108. const line = lines[i];
  109. if (line === '') {
  110. bodyStart = i + 1;
  111. break;
  112. }
  113. const colonIdx = line.indexOf(':');
  114. if (colonIdx > 0) {
  115. headers[line.substring(0, colonIdx).trim()] = line.substring(colonIdx + 1).trim();
  116. }
  117. }
  118. // 解析 body
  119. const body = lines.slice(bodyStart).join('\n').replace(/\0$/, '').trim();
  120. switch (command) {
  121. case 'CONNECTED':
  122. console.log('[ChatSocket] STOMP 连接成功');
  123. isConnected = true;
  124. reconnectCount = 0;
  125. startHeartbeat();
  126. // 重新订阅
  127. Object.keys(subscriptions).forEach(id => {
  128. const sub = subscriptions[id];
  129. sendFrame('SUBSCRIBE', { id, destination: sub.destination, ack: 'auto' }, '');
  130. });
  131. if (onConnectCallback) onConnectCallback();
  132. break;
  133. case 'MESSAGE':
  134. handleStompMessage(headers, body);
  135. break;
  136. case 'RECEIPT':
  137. console.log('[ChatSocket] 收到回执:', headers['receipt-id']);
  138. break;
  139. case 'ERROR':
  140. console.error('[ChatSocket] STOMP 错误:', headers['message'], body);
  141. break;
  142. case 'HEARTBEAT':
  143. // 心跳响应,忽略
  144. break;
  145. default:
  146. // 可能是心跳 \n
  147. break;
  148. }
  149. }
  150. /**
  151. * 处理 STOMP MESSAGE 帧
  152. */
  153. function handleStompMessage(headers, body) {
  154. const destination = headers['destination'];
  155. if (!destination) return;
  156. let data = body;
  157. try {
  158. data = JSON.parse(body);
  159. } catch (e) {
  160. // 非 JSON,直接作为文本
  161. }
  162. // 匹配订阅
  163. Object.keys(subscriptions).forEach(id => {
  164. const sub = subscriptions[id];
  165. if (destination === sub.destination || destination.startsWith(sub.destination)) {
  166. sub.callback(data, headers);
  167. }
  168. });
  169. }
  170. /**
  171. * 订阅 STOMP 目的地
  172. * @param {string} destination - 订阅地址,如 /topic/session/123
  173. * @param {Function} callback - 消息回调
  174. * @returns {string} 订阅 ID
  175. */
  176. export function subscribe(destination, callback) {
  177. const id = 'sub-' + (++subscriptionId);
  178. subscriptions[id] = { destination, callback };
  179. if (isConnected) {
  180. sendFrame('SUBSCRIBE', { id, destination, ack: 'auto' }, '');
  181. }
  182. return id;
  183. }
  184. /**
  185. * 取消订阅
  186. * @param {string} id - 订阅 ID
  187. */
  188. export function unsubscribe(id) {
  189. if (subscriptions[id]) {
  190. if (isConnected) {
  191. sendFrame('UNSUBSCRIBE', { id }, '');
  192. }
  193. delete subscriptions[id];
  194. }
  195. }
  196. /**
  197. * 通过 WebSocket 发送聊天消息(STOMP SEND)
  198. * @param {number} sessionId - 会话 ID
  199. * @param {string} content - 消息内容
  200. * @param {string} msgType - 消息类型
  201. * @param {string} msgNo - 消息编号
  202. */
  203. export function sendTextByWs(sessionId, content, msgType = 'text', msgNo = '', senderId = null) {
  204. if (!isConnected) {
  205. console.warn('[ChatSocket] 未连接,无法发送消息');
  206. return false;
  207. }
  208. if (!msgNo) {
  209. msgNo = generateMsgNo();
  210. }
  211. const bodyObj = {
  212. msgType,
  213. sessionId,
  214. msgNo,
  215. content
  216. };
  217. // 小程序用户必须传 senderId,否则后端无法获取(非后台登录用户)
  218. if (senderId) {
  219. bodyObj.senderId = senderId;
  220. }
  221. const body = JSON.stringify(bodyObj);
  222. sendFrame('SEND', { destination: '/app/chat/send', 'content-type': 'application/json' }, body);
  223. return true;
  224. }
  225. /**
  226. * 断开连接
  227. */
  228. export function disconnectChat() {
  229. if (socketTask && isConnected) {
  230. sendFrame('DISCONNECT', { receipt: 'disconnect-' + Date.now() }, '');
  231. }
  232. clearAll();
  233. if (socketTask) {
  234. socketTask.close({});
  235. socketTask = null;
  236. }
  237. }
  238. /**
  239. * 清理所有状态
  240. */
  241. function clearAll() {
  242. isConnected = false;
  243. stopHeartbeat();
  244. if (reconnectTimer) {
  245. clearTimeout(reconnectTimer);
  246. reconnectTimer = null;
  247. }
  248. Object.keys(subscriptions).forEach(id => delete subscriptions[id]);
  249. }
  250. /**
  251. * 发送 STOMP 帧
  252. * @param {string} command - STOMP 命令
  253. * @param {Object} headers - 帧头
  254. * @param {string} body - 帧体
  255. */
  256. function sendFrame(command, headers, body) {
  257. if (!socketTask) return;
  258. let frame = command + '\n';
  259. if (headers) {
  260. Object.keys(headers).forEach(key => {
  261. frame += key + ':' + headers[key] + '\n';
  262. });
  263. }
  264. frame += '\n'; // 空行分隔 header 和 body
  265. if (body) {
  266. frame += body;
  267. }
  268. frame += '\0'; // NULL 终止符
  269. socketTask.send({
  270. data: frame,
  271. fail: (err) => {
  272. console.error('[ChatSocket] 发送帧失败:', command, err);
  273. }
  274. });
  275. }
  276. /**
  277. * 心跳机制
  278. */
  279. function startHeartbeat() {
  280. stopHeartbeat();
  281. heartbeatTimer = setInterval(() => {
  282. if (isConnected && socketTask) {
  283. // STOMP 心跳是 \n
  284. socketTask.send({
  285. data: '\n',
  286. fail: () => {}
  287. });
  288. }
  289. }, 10000);
  290. }
  291. function stopHeartbeat() {
  292. if (heartbeatTimer) {
  293. clearInterval(heartbeatTimer);
  294. heartbeatTimer = null;
  295. }
  296. }
  297. /**
  298. * 断线重连
  299. */
  300. function scheduleReconnect() {
  301. if (reconnectCount >= MAX_RECONNECT) {
  302. console.log('[ChatSocket] 达到最大重连次数,停止重连');
  303. return;
  304. }
  305. reconnectCount++;
  306. const delay = Math.min(3000 * reconnectCount, 15000);
  307. console.log(`[ChatSocket] ${delay}ms 后进行第 ${reconnectCount} 次重连...`);
  308. reconnectTimer = setTimeout(() => {
  309. const token = uni.getStorageSync('token');
  310. connectChat({
  311. token,
  312. onConnect: onConnectCallback,
  313. onDisconnect: onDisconnectCallback
  314. });
  315. }, delay);
  316. }
  317. /**
  318. * 生成消息编号
  319. */
  320. export function generateMsgNo() {
  321. return 'MSG_' + Date.now() + '_' + Math.random().toString(36).slice(2, 8);
  322. }
  323. /**
  324. * 检查是否已连接
  325. */
  326. export function isSocketConnected() {
  327. return isConnected;
  328. }