Forráskód Böngészése

针对dify工作流的流式输出修改

Zhangbw 2 hónapja
szülő
commit
691b3b2e73

+ 4 - 2
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/controller/api/ChatController.java

@@ -56,10 +56,12 @@ public class ChatController {
         String conversationId = (String) request.get("conversationId");
         Boolean isGreeting = request.get("isGreeting") != null ?
             Boolean.valueOf(request.get("isGreeting").toString()) : false;
+        Integer requestId = request.get("requestId") != null ?
+            Integer.valueOf(request.get("requestId").toString()) : null;
 
-        log.info("收到用户消息: {}, 客服ID: {}, 客服性别: {}, 对话ID: {}, 是否欢迎语: {}", userMessage, agentId, agentGender, conversationId, isGreeting);
+        log.info("收到用户消息: {}, 客服ID: {}, 客服性别: {}, 对话ID: {}, 是否欢迎语: {}, 请求ID: {}", userMessage, agentId, agentGender, conversationId, isGreeting, requestId);
 
-        return chatService.processMessage(userMessage, agentId, agentGender, ttsVcnList, conversationId, isGreeting);
+        return chatService.processMessage(userMessage, agentId, agentGender, ttsVcnList, conversationId, isGreeting, requestId);
     }
 
     /**

+ 2 - 1
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/service/IChatService.java

@@ -17,7 +17,8 @@ public interface IChatService {
      * @param ttsVcnList 发言人字典列表
      * @param conversationId 对话ID(用于上下文回顾)
      * @param isGreeting 是否为欢迎语(true时跳过dify工作流)
+     * @param requestId 请求ID(用于判断是否为最新请求)
      * @return 响应数据(包含回复文本和音频)
      */
-    Map<String, Object> processMessage(String userMessage, Long agentId, String agentGender, List<Map<String, String>> ttsVcnList, String conversationId, Boolean isGreeting);
+    Map<String, Object> processMessage(String userMessage, Long agentId, String agentGender, List<Map<String, String>> ttsVcnList, String conversationId, Boolean isGreeting, Integer requestId);
 }

+ 37 - 13
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/service/impl/ChatServiceImpl.java

@@ -25,9 +25,27 @@ public class ChatServiceImpl implements IChatService {
     private final ITalkAgentService talkAgentService;
     private final IDifyService difyService;
 
+    // 存储每个用户的最新请求ID
+    private final Map<Long, Integer> latestRequestIdMap = new java.util.concurrent.ConcurrentHashMap<>();
+
     @Override
-    public Map<String, Object> processMessage(String userMessage, Long agentId, String agentGender, List<Map<String, String>> ttsVcnList, String conversationId, Boolean isGreeting) {
-        log.info("处理用户消息: {}, 客服ID: {}, 客服性别: {}, 对话ID: {}, 是否欢迎语: {}", userMessage, agentId, agentGender, conversationId, isGreeting);
+    public Map<String, Object> processMessage(String userMessage, Long agentId, String agentGender, List<Map<String, String>> ttsVcnList, String conversationId, Boolean isGreeting, Integer requestId) {
+        log.info("处理用户消息: {}, 客服ID: {}, 客服性别: {}, 对话ID: {}, 是否欢迎语: {}, 请求ID: {}", userMessage, agentId, agentGender, conversationId, isGreeting, requestId);
+
+        // 获取当前登录用户ID
+        Long userId = null;
+        try {
+            userId = StpUtil.getLoginIdAsLong();
+        } catch (Exception e) {
+            log.warn("获取登录用户ID失败,使用默认值", e);
+            userId = 0L;
+        }
+
+        // 更新最新请求ID
+        if (requestId != null && userId != null) {
+            latestRequestIdMap.put(userId, requestId);
+            log.info("更新用户 {} 的最新请求ID为: {}", userId, requestId);
+        }
 
         // 获取客服配置
         TalkAgentVo agentConfig = null;
@@ -43,15 +61,6 @@ public class ChatServiceImpl implements IChatService {
             log.info("处理欢迎语,跳过 Dify 工作流");
             reply = userMessage;
         } else {
-            // 获取当前登录用户ID
-            Long userId = null;
-            try {
-                userId = StpUtil.getLoginIdAsLong();
-            } catch (Exception e) {
-                log.warn("获取登录用户ID失败,使用默认值", e);
-                userId = 0L;
-            }
-
             // 调用 Dify 生成回复
             Map<String, String> aiResult = generateReply(userMessage, agentGender, ttsVcnList, agentConfig, userId, conversationId);
             reply = aiResult.get("replyText");
@@ -64,8 +73,23 @@ public class ChatServiceImpl implements IChatService {
             }
         }
 
-        // 合成语音(传递客服配置)
-        String audioBase64 = synthesizeAudio(reply, agentConfig);
+        // 检查是否需要合成音频(只为最新请求合成音频)
+        boolean needAudio = true;
+        if (requestId != null && userId != null) {
+            Integer latestRequestId = latestRequestIdMap.get(userId);
+            if (latestRequestId != null && !latestRequestId.equals(requestId)) {
+                needAudio = false;
+                log.info("请求ID {} 不是最新请求(最新为 {}),跳过音频合成", requestId, latestRequestId);
+            }
+        }
+
+        // 合成语音(只有最新请求才合成)
+        String audioBase64 = null;
+        if (needAudio) {
+            audioBase64 = synthesizeAudio(reply, agentConfig);
+        } else {
+            log.info("跳过音频合成");
+        }
 
         // 构建响应
         Map<String, Object> response = new HashMap<>();

+ 61 - 26
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/service/impl/DifyServiceImpl.java

@@ -31,7 +31,7 @@ public class DifyServiceImpl implements IDifyService {
             JSONObject requestBody = new JSONObject();
             requestBody.set("inputs", inputs);
             requestBody.set("query", userMessage);
-            requestBody.set("response_mode", "blocking");
+            requestBody.set("response_mode", "streaming");
             requestBody.set("user", "user-" + userId);
 
             // 如果有对话ID,添加到请求中以支持上下文回顾
@@ -55,40 +55,74 @@ public class DifyServiceImpl implements IDifyService {
                 .build();
 
             Response response = httpClient.newCall(request).execute();
-            String responseBody = response.body().string();
 
-            log.info("Dify API 响应: {}", responseBody);
+            // 处理流式响应
+            StringBuilder fullAnswer = new StringBuilder();
+            String newConversationId = null;
+            String ttsVcn = null;
 
-            // 解析响应
-            JSONObject result = new JSONObject(responseBody);
-            Map<String, String> aiResponse = new HashMap<>();
+            try (ResponseBody responseBody = response.body()) {
+                if (responseBody == null) {
+                    throw new RuntimeException("响应体为空");
+                }
 
-            // 检查是否有错误响应
-            if (result.containsKey("code") && result.containsKey("message")) {
-                String errorCode = result.getStr("code");
-                String errorMessage = result.getStr("message");
-                log.error("Dify API 返回错误: code={}, message={}", errorCode, errorMessage);
-                throw new RuntimeException("Dify API 错误: " + errorMessage);
+                // 逐行读取 SSE 流
+                java.io.BufferedReader reader = new java.io.BufferedReader(
+                    new java.io.InputStreamReader(responseBody.byteStream(), java.nio.charset.StandardCharsets.UTF_8)
+                );
+
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    // SSE 格式: data: {...}
+                    if (line.startsWith("data: ")) {
+                        String jsonData = line.substring(6); // 去掉 "data: " 前缀
+
+                        try {
+                            JSONObject event = new JSONObject(jsonData);
+                            String eventType = event.getStr("event");
+
+                            if ("message".equals(eventType)) {
+                                // 累积回复文本
+                                String answer = event.getStr("answer");
+                                if (answer != null) {
+                                    fullAnswer.append(answer);
+                                }
+                            } else if ("message_end".equals(eventType)) {
+                                // 最后一个事件,获取 conversation_id
+                                newConversationId = event.getStr("conversation_id");
+
+                                // 从 metadata 中获取 outputs
+                                if (event.containsKey("metadata")) {
+                                    JSONObject metadata = event.getJSONObject("metadata");
+                                    if (metadata != null && metadata.containsKey("outputs")) {
+                                        JSONObject outputs = metadata.getJSONObject("outputs");
+                                        if (outputs != null) {
+                                            ttsVcn = outputs.getStr("ttsVcn");
+                                        }
+                                    }
+                                }
+                            } else if ("error".equals(eventType)) {
+                                String errorMessage = event.getStr("message");
+                                log.error("Dify API 返回错误: {}", errorMessage);
+                                throw new RuntimeException("Dify API 错误: " + errorMessage);
+                            }
+                        } catch (Exception e) {
+                            log.warn("解析 SSE 事件失败: {}", line, e);
+                        }
+                    }
+                }
             }
 
-            // Dify API 直接返回 answer 字段,不是嵌套在 data 中
-            String replyText = result.getStr("answer");
-            if (replyText == null) {
-                log.error("Dify API 响应中没有 answer 字段,完整响应: {}", responseBody);
+            Map<String, String> aiResponse = new HashMap<>();
+
+            String replyText = fullAnswer.toString();
+            if (replyText.isEmpty()) {
+                log.error("Dify API 未返回任何回复文本");
                 throw new RuntimeException("Dify API 响应格式错误");
             }
 
             aiResponse.put("replyText", replyText);
 
-            // 从 outputs 中获取 ttsVcn(如果有的话)
-            String ttsVcn = null;
-            if (result.containsKey("outputs")) {
-                JSONObject outputs = result.getJSONObject("outputs");
-                if (outputs != null) {
-                    ttsVcn = outputs.getStr("ttsVcn");
-                }
-            }
-
             // 如果AI没有返回发言人,使用客服配置中的默认发言人
             if (ttsVcn == null) {
                 ttsVcn = inputs.get("currentVcn") != null ? inputs.get("currentVcn").toString() : "x4_yezi";
@@ -96,11 +130,12 @@ public class DifyServiceImpl implements IDifyService {
             aiResponse.put("ttsVcn", ttsVcn);
 
             // 返回对话ID以便前端保存,用于下次请求
-            String newConversationId = result.getStr("conversation_id");
             if (newConversationId != null) {
                 aiResponse.put("conversationId", newConversationId);
             }
 
+            log.info("Dify 流式响应处理完成,回复长度: {}", replyText.length());
+
             return aiResponse;
         } catch (Exception e) {
             log.error("调用 Dify 工作流失败", e);