chatSocket.js 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  /**
   * STOMP-over-WebSocket client for the mini program.
   * Provides real-time chat messaging in place of HTTP polling.
   */
  5. import { UPLOAD_URL } from './request';
  // The WebSocket endpoint shares its base address with the HTTP backend.
  const BASE_URL = UPLOAD_URL;
  /**
   * Build the WebSocket endpoint URL from the HTTP base address.
   * Only the leading "http" is swapped for "ws", so an "https" base yields "wss".
   * @returns {string} URL of the chat STOMP endpoint
   */
  function getWsUrl() {
    const wsBase = BASE_URL.replace(/^http/, 'ws');
    return `${wsBase}/api/chat/ws/chat/websocket`;
  }
  // ---- Connection state ----
  // SocketTask returned by uni.connectSocket, or null when there is none.
  let socketTask = null;
  // Set to true when a STOMP CONNECTED frame is parsed.
  let isConnected = false;
  // Headers reused for the STOMP CONNECT frame (currently only Authorization).
  let connectHeaders = {};

  // ---- Subscriptions ----
  // Counter used to build subscription ids ("sub-1", "sub-2", ...).
  let subscriptionId = 0;
  // id -> { destination, callback }
  const subscriptions = {};

  // ---- Reconnect / heartbeat bookkeeping ----
  const MAX_RECONNECT = 5;
  let reconnectCount = 0;
  let reconnectTimer = null;
  let heartbeatTimer = null;

  // ---- Caller-supplied lifecycle callbacks ----
  let onConnectCallback = null;
  let onDisconnectCallback = null;
  /**
   * Open the WebSocket connection and start the STOMP handshake.
   *
   * A socket left over from an earlier call is closed first. Events from a
   * socket that is no longer the active one are ignored, so a stale close or
   * error cannot flip the state of the current connection or trigger a
   * reconnect. A close that arrives after the module dropped its socket
   * reference (socketTask === null, as disconnectChat does) still notifies
   * onDisconnect but does not schedule a reconnect.
   *
   * @param {Object} [options]
   * @param {string} [options.token] - auth token, sent as "Bearer <token>"
   * @param {Function} [options.onConnect] - called after the STOMP CONNECTED frame
   * @param {Function} [options.onDisconnect] - called when the socket closes
   */
  export function connectChat(options = {}) {
    const { token, onConnect, onDisconnect } = options;
    onConnectCallback = onConnect || null;
    onDisconnectCallback = onDisconnect || null;
    connectHeaders = {};

    // Cancel a pending reconnect so it cannot open a second connection.
    if (reconnectTimer) {
      clearTimeout(reconnectTimer);
      reconnectTimer = null;
    }

    if (token) {
      connectHeaders['Authorization'] = 'Bearer ' + token;
    }

    // A new handshake is starting: the previous connection (if any) is dropped.
    const previousTask = socketTask;
    socketTask = null;
    isConnected = false;
    stopHeartbeat();
    if (previousTask) {
      try {
        previousTask.close({});
      } catch (err) {
        // The old socket may already be closed; nothing else to release.
        console.error('[ChatSocket] WebSocket 错误', err);
      }
    }

    const wsUrl = getWsUrl();
    console.log('[ChatSocket] 连接中...', wsUrl);

    const task = uni.connectSocket({
      url: wsUrl,
      header: {
        'Authorization': connectHeaders['Authorization'] || ''
      },
      success: () => {
        console.log('[ChatSocket] 连接请求已发送');
      },
      fail: (err) => {
        console.error('[ChatSocket] 连接失败', err);
        scheduleReconnect();
      }
    });
    socketTask = task;

    // True while `task` is still the socket this module is using.
    const isActive = () => socketTask === task;

    task.onOpen(() => {
      if (!isActive()) return;
      console.log('[ChatSocket] WebSocket 已打开,发送 STOMP CONNECT');
      sendStompConnect();
    });

    task.onMessage((res) => {
      if (!isActive()) return;
      handleStompFrame(res.data);
    });

    task.onClose((res) => {
      console.log('[ChatSocket] WebSocket 已关闭', res);
      // socketTask === null means the module released its socket on purpose.
      const closedOnPurpose = socketTask === null;
      // Otherwise, a different active socket means this one was superseded.
      if (!closedOnPurpose && !isActive()) return;
      isConnected = false;
      stopHeartbeat();
      if (onDisconnectCallback) onDisconnectCallback();
      if (!closedOnPurpose) scheduleReconnect();
    });

    task.onError((err) => {
      console.error('[ChatSocket] WebSocket 错误', err);
      if (isActive()) isConnected = false;
    });
  }
  /**
   * Send the STOMP CONNECT frame.
   * Offers protocol versions 1.1 and 1.0, requests 10 s heart-beats in both
   * directions, and forwards the stored Authorization header when one is set.
   */
  function sendStompConnect() {
    const frameHeaders = {
      'accept-version': '1.1,1.0',
      'heart-beat': '10000,10000',
    };
    const authorization = connectHeaders['Authorization'];
    if (authorization) {
      frameHeaders['Authorization'] = authorization;
    }
    sendFrame('CONNECT', frameHeaders, '');
  }
  /**
   * Decode UTF-8 bytes into a string without relying on TextDecoder.
   * Malformed sequences are replaced with U+FFFD.
   * @param {Uint8Array} bytes - UTF-8 encoded data
   * @returns {string} decoded text
   */
  function decodeUtf8Bytes(bytes) {
    let out = '';
    let i = 0;
    while (i < bytes.length) {
      const lead = bytes[i++];
      let codePoint;
      let continuation;
      if (lead < 0x80) {
        codePoint = lead;
        continuation = 0;
      } else if (lead >= 0xc0 && lead < 0xe0) {
        codePoint = lead & 0x1f;
        continuation = 1;
      } else if (lead >= 0xe0 && lead < 0xf0) {
        codePoint = lead & 0x0f;
        continuation = 2;
      } else if (lead >= 0xf0 && lead < 0xf8) {
        codePoint = lead & 0x07;
        continuation = 3;
      } else {
        // Stray continuation byte or invalid lead byte.
        out += '\ufffd';
        continue;
      }
      let wellFormed = true;
      for (let k = 0; k < continuation; k++) {
        if (i >= bytes.length || (bytes[i] & 0xc0) !== 0x80) {
          wellFormed = false;
          break;
        }
        codePoint = (codePoint << 6) | (bytes[i] & 0x3f);
        i++;
      }
      out += wellFormed && codePoint <= 0x10ffff ? String.fromCodePoint(codePoint) : '\ufffd';
    }
    return out;
  }

  /**
   * Handle raw data received from the WebSocket.
   *
   * Accepts a string or an ArrayBuffer. Binary payloads are decoded as UTF-8 so
   * multi-byte characters survive; mapping each byte to one character would
   * garble any non-ASCII text. The text is split on the NUL frame terminator
   * and every non-blank frame is handed to parseFrame.
   *
   * @param {string|ArrayBuffer} data - payload of the socket message event
   */
  function handleStompFrame(data) {
    if (!data) return;

    let text;
    if (typeof data === 'string') {
      text = data;
    } else if (data instanceof ArrayBuffer) {
      const bytes = new Uint8Array(data);
      // TextDecoder may be absent in the host runtime, hence the manual fallback.
      text = typeof TextDecoder !== 'undefined'
        ? new TextDecoder('utf-8').decode(bytes)
        : decodeUtf8Bytes(bytes);
    } else {
      return;
    }

    // STOMP frames are terminated by a NUL character.
    for (const raw of text.split('\0')) {
      const frame = raw.trim();
      if (frame) {
        parseFrame(frame);
      }
    }
  }
  /**
   * Parse one STOMP frame (without its NUL terminator) and act on its command.
   *
   * Layout: command line, header lines ("name:value"), a blank line, then the
   * body. When there is no blank separator line the frame has no body; the
   * body must not fall back to the command and header lines.
   *
   * CONNECTED marks the connection as established, resets the reconnect
   * counter, starts the heartbeat, re-sends SUBSCRIBE for every stored
   * subscription and fires the connect callback. MESSAGE frames go to
   * handleStompMessage. RECEIPT and ERROR are only logged.
   *
   * @param {string} raw - text of a single frame
   */
  function parseFrame(raw) {
    const lines = raw.split('\n');
    const command = lines[0].trim();
    const headers = {};

    // Assume "no body" until a blank separator line is found.
    let bodyStart = lines.length;
    for (let i = 1; i < lines.length; i++) {
      // Tolerate CRLF line endings so the separator is still recognised.
      const line = lines[i].replace(/\r$/, '');
      if (line === '') {
        bodyStart = i + 1;
        break;
      }
      const colonIdx = line.indexOf(':');
      if (colonIdx > 0) {
        headers[line.substring(0, colonIdx).trim()] = line.substring(colonIdx + 1).trim();
      }
    }

    const body = lines.slice(bodyStart).join('\n').replace(/\0$/, '').trim();

    switch (command) {
      case 'CONNECTED':
        console.log('[ChatSocket] STOMP 连接成功');
        isConnected = true;
        reconnectCount = 0;
        startHeartbeat();
        // Restore subscriptions registered before (or across) this connection.
        Object.keys(subscriptions).forEach((id) => {
          const sub = subscriptions[id];
          sendFrame('SUBSCRIBE', { id, destination: sub.destination, ack: 'auto' }, '');
        });
        if (onConnectCallback) onConnectCallback();
        break;
      case 'MESSAGE':
        handleStompMessage(headers, body);
        break;
      case 'RECEIPT':
        console.log('[ChatSocket] 收到回执:', headers['receipt-id']);
        break;
      case 'ERROR':
        console.error('[ChatSocket] STOMP 错误:', headers['message'], body);
        break;
      case 'HEARTBEAT':
      default:
        // Heart-beats and unknown commands are ignored.
        break;
    }
  }
  /**
   * Dispatch a STOMP MESSAGE frame to the matching subscription callback(s).
   *
   * The body is parsed as JSON when possible, otherwise passed on as text.
   * If the frame's `subscription` header names a registered subscription, only
   * that callback runs. Otherwise the destination is matched against each
   * subscription: an exact match, or a prefix match that ends on a path
   * segment boundary, so "/topic/session/1" does not also receive messages
   * for "/topic/session/12".
   *
   * @param {Object} headers - parsed frame headers
   * @param {string} body - frame body text
   */
  function handleStompMessage(headers, body) {
    const destination = headers['destination'];
    if (!destination) return;

    let data = body;
    try {
      data = JSON.parse(body);
    } catch (e) {
      // Not JSON: deliver the raw text.
    }

    // Preferred route: the subscription id echoed back by the server.
    const subId = headers['subscription'];
    if (subId && Object.prototype.hasOwnProperty.call(subscriptions, subId)) {
      subscriptions[subId].callback(data, headers);
      return;
    }

    // Fallback route: match on destination.
    Object.keys(subscriptions).forEach((id) => {
      const sub = subscriptions[id];
      const target = sub.destination;
      const prefix = target.endsWith('/') ? target : target + '/';
      if (destination === target || destination.startsWith(prefix)) {
        sub.callback(data, headers);
      }
    });
  }
  /**
   * Register a subscription for a STOMP destination.
   * The subscription is always stored; the SUBSCRIBE frame is sent right away
   * only when the connection is already established.
   * @param {string} destination - destination to listen on, e.g. /topic/session/123
   * @param {Function} callback - invoked for messages on that destination
   * @returns {string} subscription id
   */
  export function subscribe(destination, callback) {
    subscriptionId += 1;
    const id = `sub-${subscriptionId}`;
    subscriptions[id] = { destination, callback };
    if (!isConnected) {
      return id;
    }
    sendFrame('SUBSCRIBE', { id, destination, ack: 'auto' }, '');
    return id;
  }
  /**
   * Remove a subscription.
   * Unknown ids are ignored. The UNSUBSCRIBE frame is only sent while connected;
   * the stored entry is removed either way.
   * @param {string} id - subscription id returned by subscribe
   */
  export function unsubscribe(id) {
    if (!subscriptions[id]) {
      return;
    }
    if (isConnected) {
      sendFrame('UNSUBSCRIBE', { id }, '');
    }
    delete subscriptions[id];
  }
  /**
   * Send a chat message over the WebSocket as a STOMP SEND frame.
   * @param {number} sessionId - chat session id
   * @param {string} content - message content
   * @param {string} [msgType='text'] - message type
   * @param {string} [msgNo=''] - message number; generated when empty
   * @param {*} [senderId=null] - sender id, included in the payload only when truthy
   * @returns {boolean} false when not connected, true once the frame was handed to sendFrame
   */
  export function sendTextByWs(sessionId, content, msgType = 'text', msgNo = '', senderId = null) {
    if (!isConnected) {
      console.warn('[ChatSocket] 未连接,无法发送消息');
      return false;
    }

    const payload = {
      msgType,
      sessionId,
      msgNo: msgNo || generateMsgNo(),
      content,
    };
    // Mini-program users pass senderId explicitly; it is omitted when falsy.
    if (senderId) {
      payload.senderId = senderId;
    }

    const frameHeaders = {
      destination: '/app/chat/send',
      'content-type': 'application/json',
    };
    sendFrame('SEND', frameHeaders, JSON.stringify(payload));
    return true;
  }
  /**
   * Disconnect from the chat server.
   * Sends a STOMP DISCONNECT frame when a live connection exists, clears the
   * module state, then closes and forgets the socket.
   */
  export function disconnectChat() {
    const hasLiveConnection = Boolean(socketTask) && isConnected;
    if (hasLiveConnection) {
      const receipt = 'disconnect-' + Date.now();
      sendFrame('DISCONNECT', { receipt }, '');
    }

    clearAll();

    if (!socketTask) {
      return;
    }
    socketTask.close({});
    socketTask = null;
  }
  /**
   * Reset all connection bookkeeping.
   * Marks the client as disconnected, stops the heartbeat, cancels a pending
   * reconnect, resets the reconnect attempt counter and drops every stored
   * subscription. Resetting the counter matters because an exhausted retry
   * budget would otherwise carry over to the next session and leave a later
   * connection without any automatic reconnect.
   */
  function clearAll() {
    isConnected = false;
    stopHeartbeat();
    if (reconnectTimer) {
      clearTimeout(reconnectTimer);
      reconnectTimer = null;
    }
    reconnectCount = 0;
    for (const id of Object.keys(subscriptions)) {
      delete subscriptions[id];
    }
  }
  /**
   * Serialize and send one STOMP frame.
   * Does nothing when there is no socket. Send failures are logged only.
   * @param {string} command - STOMP command
   * @param {Object} headers - frame headers, written as "key:value" lines
   * @param {string} body - frame body; omitted when empty
   */
  function sendFrame(command, headers, body) {
    if (!socketTask) return;

    const headerLines = headers
      ? Object.keys(headers).map((key) => key + ':' + headers[key] + '\n')
      : [];

    // command line, header lines, blank separator line, body, NUL terminator
    const frame = command + '\n' + headerLines.join('') + '\n' + (body ? body : '') + '\0';

    socketTask.send({
      data: frame,
      fail: (err) => {
        console.error('[ChatSocket] 发送帧失败:', command, err);
      }
    });
  }
  /**
   * Start the client heartbeat.
   * Any existing heartbeat is stopped first; afterwards a single newline (the
   * STOMP heart-beat) is sent every 10 seconds while connected. Send failures
   * are ignored.
   */
  function startHeartbeat() {
    stopHeartbeat();

    const beat = () => {
      if (!isConnected || !socketTask) {
        return;
      }
      socketTask.send({
        data: '\n',
        fail: () => { }
      });
    };

    heartbeatTimer = setInterval(beat, 10000);
  }
  /**
   * Stop the heartbeat timer, if one is running.
   */
  function stopHeartbeat() {
    if (!heartbeatTimer) {
      return;
    }
    clearInterval(heartbeatTimer);
    heartbeatTimer = null;
  }
  /**
   * Schedule a reconnect attempt with a linearly growing delay
   * (3 s per attempt, capped at 15 s), up to MAX_RECONNECT attempts.
   *
   * Does nothing while an attempt is already pending: a failed connection can
   * be reported more than once (connect failure and close event), and each
   * report must not consume another attempt or orphan the previous timer.
   */
  function scheduleReconnect() {
    if (reconnectTimer) {
      return;
    }
    if (reconnectCount >= MAX_RECONNECT) {
      console.log('[ChatSocket] 达到最大重连次数,停止重连');
      return;
    }

    reconnectCount++;
    const delay = Math.min(3000 * reconnectCount, 15000);
    console.log(`[ChatSocket] ${delay}ms 后进行第 ${reconnectCount} 次重连...`);

    reconnectTimer = setTimeout(() => {
      // The timer has fired, so it is no longer pending.
      reconnectTimer = null;
      const token = uni.getStorageSync('token');
      connectChat({
        token,
        onConnect: onConnectCallback,
        onDisconnect: onDisconnectCallback
      });
    }, delay);
  }
  /**
   * Generate a message number of the form "MSG_<timestamp>_<random>".
   * The random part is up to six base-36 characters taken from Math.random().
   * @returns {string} message number
   */
  export function generateMsgNo() {
    const timestamp = Date.now();
    const randomPart = Math.random().toString(36).slice(2, 8);
    return `MSG_${timestamp}_${randomPart}`;
  }
  /**
   * Report the current value of the module's connection flag.
   * @returns {boolean} the isConnected flag
   */
  export function isSocketConnected() {
    return isConnected;
  }