Zhangbw 2 месяцев назад
Родитель
Сommit
9ab0c37568

+ 10 - 12
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/controller/api/ChatController.java

@@ -1,10 +1,12 @@
 package org.dromara.talk.controller.api;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.dromara.common.core.domain.dto.DictDataDTO;
 import org.dromara.common.core.service.DictService;
 import org.dromara.common.mybatis.core.page.TableDataInfo;
+import org.dromara.common.satoken.utils.LoginHelper;
 import org.dromara.talk.domain.bo.TalkAgentBo;
 import org.dromara.talk.domain.bo.TalkSessionBo;
 import org.dromara.talk.domain.vo.TalkAgentVo;
@@ -97,7 +99,7 @@ public class ChatController {
             // 获取当前登录用户ID
             Long userId = 0L;
             try {
-                userId = org.dromara.common.satoken.utils.LoginHelper.getUserId();
+                userId = LoginHelper.getUserId();
             } catch (Exception e) {
                 log.warn("获取登录用户ID失败,使用默认值", e);
             }
@@ -125,7 +127,7 @@ public class ChatController {
      * 挂断电话
      * 用户挂断电话后,更新会话结束时间并将客服状态改回0(正常)
      *
-     * @param id 客服ID
+     * @param id      客服ID
      * @param request 包含sessionId的请求体
      * @return 更新结果
      */
@@ -155,7 +157,7 @@ public class ChatController {
                         messages.add(message);
                     }
 
-                    String conversationJson = new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(messages);
+                    String conversationJson = new ObjectMapper().writeValueAsString(messages);
                     talkSessionService.saveOrUpdateConversation(sessionId, id, conversationJson, null, null);
                     log.info("挂断时保存对话内容,会话ID: {}, 消息数量: {}", sessionId, messages.size());
                 } catch (Exception e) {
@@ -180,21 +182,17 @@ public class ChatController {
     @PostMapping("/message/stream")
     public SseEmitter handleMessageStream(@RequestBody Map<String, Object> request) {
         String userMessage = (String) request.get("message");
-        Long agentId = request.get("agentId") != null ?
-            Long.valueOf(request.get("agentId").toString()) : null;
+        Long agentId = request.get("agentId") != null ? Long.valueOf(request.get("agentId").toString()) : null;
         String agentGender = (String) request.get("agentGender");
         List<Map<String, String>> ttsVcnList = (List<Map<String, String>>) request.get("ttsVcnList");
         String conversationId = (String) request.get("conversationId");
-        Boolean isGreeting = request.get("isGreeting") != null ?
-            Boolean.valueOf(request.get("isGreeting").toString()) : false;
-        Integer requestId = request.get("requestId") != null ?
-            Integer.valueOf(request.get("requestId").toString()) : null;
-        String customerPhone = (String) request.get("customerPhone");
+        Boolean isGreeting = request.get("isGreeting") != null ? Boolean.valueOf(request.get("isGreeting").toString()) : false;
+        Integer requestId = request.get("requestId") != null ? Integer.valueOf(request.get("requestId").toString()) : null;
 
-        log.info("收到流式消息请求: {}, 客服ID: {}, 对话ID: {}, 请求ID: {}, 客户手机: {}", userMessage, agentId, conversationId, requestId, customerPhone);
+        log.info("收到流式消息请求: {}, 客服ID: {}, 对话ID: {}, 请求ID: {}", userMessage, agentId, conversationId, requestId);
 
         SseEmitter emitter = new SseEmitter(60000L);
-        chatService.processMessageStream(userMessage, agentId, agentGender, ttsVcnList, conversationId, isGreeting, requestId, customerPhone, emitter);
+        chatService.processMessageStream(userMessage, agentId, agentGender, ttsVcnList, conversationId, isGreeting, requestId, emitter);
         return emitter;
     }
 

+ 1 - 2
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/service/IChatService.java

@@ -20,8 +20,7 @@ public interface IChatService {
      * @param conversationId 对话ID
      * @param isGreeting 是否为欢迎语
      * @param requestId 请求ID(用于判断是否为最新请求)
-     * @param customerPhone 客户手机号
      * @param emitter SSE发射器
      */
-    void processMessageStream(String userMessage, Long agentId, String agentGender, List<Map<String, String>> ttsVcnList, String conversationId, Boolean isGreeting, Integer requestId, String customerPhone, SseEmitter emitter);
+    void processMessageStream(String userMessage, Long agentId, String agentGender, List<Map<String, String>> ttsVcnList, String conversationId, Boolean isGreeting, Integer requestId, SseEmitter emitter);
 }

+ 52 - 95
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/service/impl/ChatServiceImpl.java

@@ -38,21 +38,50 @@ public class ChatServiceImpl implements IChatService {
     // 存储每个会话的对话内容
     private final Map<String, List<Map<String, String>>> conversationMap = new ConcurrentHashMap<>();
 
-    @Override
-    public void processMessageStream(String userMessage, Long agentId, String agentGender,
-                                     List<Map<String, String>> ttsVcnList, String conversationId,
-                                     Boolean isGreeting, Integer requestId, String customerPhone, SseEmitter emitter) {
-        // 如果没有传递customerPhone,尝试从数据库中查询
-        if (customerPhone == null && conversationId != null) {
-            TalkSessionVo session = talkSessionService.queryBySessionId(conversationId);
-            if (session != null) {
-                customerPhone = session.getCustomerPhone();
-                log.info("从数据库查询到客户手机号: {}", customerPhone);
+    /**
+     * 合成音频并通过 SSE 发送
+     *
+     * @param text 要合成的文本
+     * @param agentConfig 客服配置
+     * @param emitter SSE 发送器
+     * @return 是否成功
+     */
+    private boolean synthesizeAndSendAudio(String text, TalkAgentVo agentConfig, SseEmitter emitter) {
+        CountDownLatch latch = new CountDownLatch(1);
+        ByteArrayOutputStream mergedAudioBytes = new ByteArrayOutputStream();
+
+        ttsService.synthesizeStream(text, agentConfig, (audioChunk, status) -> {
+            try {
+                byte[] audioBytes = Base64.getDecoder().decode(audioChunk);
+                mergedAudioBytes.write(audioBytes);
+
+                if (status == 2) {
+                    String mergedAudioBase64 = Base64.getEncoder().encodeToString(mergedAudioBytes.toByteArray());
+                    Map<String, String> audioEvent = new HashMap<>();
+                    audioEvent.put("name", "audio");
+                    audioEvent.put("data", mergedAudioBase64);
+                    emitter.send(SseEmitter.event().data(audioEvent));
+                    latch.countDown();
+                }
+            } catch (Exception e) {
+                log.error("发送音频失败", e);
+                latch.countDown();
             }
-        }
+        });
 
-        String finalCustomerPhone = customerPhone;
+        try {
+            return latch.await(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.error("等待音频合成被中断", e);
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
 
+    @Override
+    public void processMessageStream(String userMessage, Long agentId, String agentGender,
+                                     List<Map<String, String>> ttsVcnList, String conversationId,
+                                     Boolean isGreeting, Integer requestId, SseEmitter emitter) {
         // 在主线程中获取用户ID,避免在异步线程中访问ThreadLocal
         Long userId = LoginHelper.getUserId();
         if (userId == null) {
@@ -78,39 +107,8 @@ public class ChatServiceImpl implements IChatService {
 
                 // 如果是欢迎语,合成语音但不发送text事件(避免前端重复显示)
                 if (Boolean.TRUE.equals(isGreeting)) {
-                    TalkAgentVo finalAgentConfig = agentConfig;
-
                     // 合成欢迎语语音
-                    CountDownLatch latch = new CountDownLatch(1);
-                    ByteArrayOutputStream mergedAudioBytes = new ByteArrayOutputStream();
-
-                    ttsService.synthesizeStream(userMessage, finalAgentConfig, (audioChunk, status) -> {
-                        try {
-                            byte[] audioBytes = Base64.getDecoder().decode(audioChunk);
-                            mergedAudioBytes.write(audioBytes);
-
-                            if (status == 2) {
-                                String mergedAudioBase64 = Base64.getEncoder().encodeToString(mergedAudioBytes.toByteArray());
-                                Map<String, String> audioEvent = new HashMap<>();
-                                audioEvent.put("name", "audio");
-                                audioEvent.put("data", mergedAudioBase64);
-                                emitter.send(SseEmitter.event()
-                                    .data(audioEvent));
-                                latch.countDown();
-                            }
-                        } catch (Exception e) {
-                            log.error("发送欢迎语音频失败", e);
-                            latch.countDown();
-                        }
-                    });
-
-                    // 等待音频合成完成
-                    try {
-                        latch.await(30, java.util.concurrent.TimeUnit.SECONDS);
-                    } catch (InterruptedException e) {
-                        log.error("等待欢迎语音频合成被中断", e);
-                        Thread.currentThread().interrupt();
-                    }
+                    synthesizeAndSendAudio(userMessage, agentConfig, emitter);
 
                     // 发送完成信号
                     Map<String, String> doneEvent = new HashMap<>();
@@ -128,7 +126,8 @@ public class ChatServiceImpl implements IChatService {
                 // 初始化对话记录列表
                 String finalConversationId = conversationId;
                 if (finalConversationId != null) {
-                    conversationMap.putIfAbsent(finalConversationId, new ArrayList<>());
+                    // 使用 computeIfAbsent 避免竞态条件
+                    conversationMap.computeIfAbsent(finalConversationId, k -> new ArrayList<>());
 
                     // 添加用户消息(客户)
                     Map<String, String> userMsg = new HashMap<>();
@@ -178,8 +177,8 @@ public class ChatServiceImpl implements IChatService {
 
                             // 发送conversationId
                             if (newConversationId != null) {
-                                // 确保conversationMap中有对应的列表
-                                conversationMap.putIfAbsent(newConversationId, new ArrayList<>());
+                                // 使用 computeIfAbsent 确保线程安全
+                                conversationMap.computeIfAbsent(newConversationId, k -> new ArrayList<>());
 
                                 // 更新conversationId
                                 if (finalConversationId == null) {
@@ -217,53 +216,7 @@ public class ChatServiceImpl implements IChatService {
 
                             if (needAudio && sentence != null && !sentence.trim().isEmpty()) {
                                 log.info("合成句子音频,长度: {}, 内容: {}", sentence.length(), sentence);
-
-                                // 使用 CountDownLatch 等待音频合成完成
-                                CountDownLatch latch = new CountDownLatch(1);
-
-                                // 用于累积同一句子的所有音频片段(字节数组)
-                                ByteArrayOutputStream mergedAudioBytes = new ByteArrayOutputStream();
-
-                                // 对每个句子进行 TTS 合成
-                                ttsService.synthesizeStream(sentence, finalAgentConfig, (audioChunk, status) -> {
-                                    try {
-                                        // 解码base64音频片段并累积到字节流
-                                        byte[] audioBytes = Base64.getDecoder().decode(audioChunk);
-                                        mergedAudioBytes.write(audioBytes);
-
-                                        // 当音频合成完成时(status=2),发送合并后的音频
-                                        if (status == 2) {
-                                            // 将合并后的字节数组重新编码为base64
-                                            String mergedAudioBase64 = Base64.getEncoder().encodeToString(mergedAudioBytes.toByteArray());
-
-                                            Map<String, String> audioEvent = new HashMap<>();
-                                            audioEvent.put("name", "audio");
-                                            audioEvent.put("data", mergedAudioBase64);
-                                            emitter.send(SseEmitter.event()
-                                                .data(audioEvent));
-
-                                            log.info("句子音频合成完成,合并后长度: {}, 释放锁", mergedAudioBase64.length());
-                                            latch.countDown();
-                                        }
-                                    } catch (Exception e) {
-                                        log.error("发送音频失败", e);
-                                        latch.countDown(); // 出错时也要释放锁
-                                    }
-                                });
-
-                                // 等待当前句子的音频合成完成
-                                try {
-                                    log.info("等待句子音频合成完成...");
-                                    boolean completed = latch.await(30, TimeUnit.SECONDS);
-                                    if (completed) {
-                                        log.info("句子音频合成等待完成");
-                                    } else {
-                                        log.warn("句子音频合成等待超时");
-                                    }
-                                } catch (InterruptedException e) {
-                                    log.error("等待音频合成被中断", e);
-                                    Thread.currentThread().interrupt();
-                                }
+                                synthesizeAndSendAudio(sentence, finalAgentConfig, emitter);
                             }
 
                             // 如果是最后一个句子,发送完成事件
@@ -289,9 +242,13 @@ public class ChatServiceImpl implements IChatService {
                                         String conversationJson = new ObjectMapper().writeValueAsString(messages);
 
                                         // 保存到数据库
-                                        talkSessionService.saveOrUpdateConversation(newConversationId, finalAgentConfig.getId(), conversationJson, finalCustomerPhone, finalUserId);
+                                        talkSessionService.saveOrUpdateConversation(newConversationId, finalAgentConfig.getId(), conversationJson, null, finalUserId);
 
                                         log.info("对话内容已保存到数据库,会话ID: {}, 消息数量: {}", newConversationId, messages.size());
+
+                                        // 清理内存:对话完成后从 conversationMap 中移除
+                                        conversationMap.remove(newConversationId);
+                                        log.debug("已清理会话 {} 的对话内容缓存", newConversationId);
                                     } catch (Exception e) {
                                         log.error("保存对话内容失败", e);
                                     }

+ 12 - 11
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/service/impl/DifyServiceImpl.java

@@ -54,17 +54,17 @@ public class DifyServiceImpl implements IDifyService {
                 .addHeader("Content-Type", "application/json")
                 .build();
 
-            Response response = httpClient.newCall(request).execute();
-
-            // 处理流式响应
-            StringBuilder fullAnswer = new StringBuilder();
-            String newConversationId = null;
-            String ttsVcn = null;
-
-            try (ResponseBody responseBody = response.body()) {
-                if (responseBody == null) {
-                    throw new RuntimeException("响应体为空");
-                }
+            // 使用 try-with-resources 确保 response 被正确关闭
+            try (Response response = httpClient.newCall(request).execute()) {
+                // 处理流式响应
+                StringBuilder fullAnswer = new StringBuilder();
+                String newConversationId = null;
+                String ttsVcn = null;
+
+                try (ResponseBody responseBody = response.body()) {
+                    if (responseBody == null) {
+                        throw new RuntimeException("响应体为空");
+                    }
 
                 // 逐行读取 SSE 流
                 java.io.BufferedReader reader = new java.io.BufferedReader(
@@ -137,6 +137,7 @@ public class DifyServiceImpl implements IDifyService {
             log.info("Dify 流式响应处理完成,回复长度: {}", replyText.length());
 
             return aiResponse;
+            } // 关闭 try (Response response = ...)
         } catch (Exception e) {
             log.error("调用 Dify 工作流失败", e);
             throw new RuntimeException("AI 服务调用失败", e);