Gqingci hace 4 horas
padre
commit
478a68038b

+ 4 - 4
ruoyi-admin/src/main/resources/application-dev.yml

@@ -47,9 +47,9 @@ spring:
           driverClassName: com.mysql.cj.jdbc.Driver
           # jdbc 所有参数配置参考 https://lionli.blog.csdn.net/article/details/122018562
           # rewriteBatchedStatements=true 批处理优化 大幅提升批量插入更新删除性能(对数据库有性能损耗 使用批量操作应考虑性能问题)
-          url: jdbc:mysql://localhost:3306/ai-talk?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true
+          url: jdbc:mysql://192.168.194.130:3306/ai-talk?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true
           username: root
-          password: 121380
+          password: 123456
 #        # 从库数据源
 #        slave:
 #          lazy: true
@@ -96,13 +96,13 @@ spring:
 spring.data:
   redis:
     # 地址
-    host: 192.168.150.102
+    host: 192.168.194.130
     # 端口,默认为6379
     port: 6379
     # 数据库索引
     database: 0
     # redis 密码必须配置
-    password: hat123
+    password: 123456
     # 连接超时时间
     timeout: 10s
     # 是否开启ssl

+ 2 - 2
ruoyi-extend/ruoyi-snailjob-server/src/main/resources/application-dev.yml

@@ -2,9 +2,9 @@ spring:
   datasource:
     type: com.zaxxer.hikari.HikariDataSource
     driver-class-name: com.mysql.cj.jdbc.Driver
-    url: jdbc:mysql://localhost:3306/ry-vue?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
+    url: jdbc:mysql://192.168.194.130:3306/ry-vue?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
     username: root
-    password: root
+    password: 123456
     hikari:
       connection-timeout: 30000
       validation-timeout: 5000

+ 33 - 0
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/config/ChatAsyncConfig.java

@@ -0,0 +1,33 @@
+package org.dromara.talk.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Executor;
+
+@Configuration
+public class ChatAsyncConfig {
+
+    @Bean("chatStreamExecutor")
+    public Executor chatStreamExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setThreadNamePrefix("chat-stream-");
+        executor.setCorePoolSize(8);
+        executor.setMaxPoolSize(16);
+        executor.setQueueCapacity(200);
+        executor.initialize();
+        return executor;
+    }
+
+    @Bean("chatTtsExecutor")
+    public Executor chatTtsExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setThreadNamePrefix("chat-tts-");
+        executor.setCorePoolSize(4);
+        executor.setMaxPoolSize(8);
+        executor.setQueueCapacity(200);
+        executor.initialize();
+        return executor;
+    }
+}

+ 1 - 1
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/service/IDifyService.java

@@ -11,7 +11,7 @@ public interface IDifyService {
    /**
     * 流式调用Dify工作流(按句子分段)
     */
-   void callWorkflowStream(String userMessage, String agentGender, List<Map<String, String>> ttsVcnList,
+   void callWorkflowStream(String traceId, String userMessage, String agentGender, List<Map<String, String>> ttsVcnList,
                           TalkAgentVo agentConfig, Long userId, String conversationId, String customerPhone, String source,
                           Consumer<String> onTextChunk,
                           SentenceCallback onSentence);

+ 246 - 85
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/service/impl/ChatServiceImpl.java

@@ -16,6 +16,7 @@ import org.dromara.talk.service.IPhoneUserService;
 import org.dromara.talk.service.ITalkAgentService;
 import org.dromara.talk.service.ITalkSessionService;
 import org.dromara.talk.service.ITtsService;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
@@ -23,8 +24,8 @@ import java.io.ByteArrayOutputStream;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -36,6 +37,10 @@ public class ChatServiceImpl implements IChatService {
     private final IDifyService difyService;
     private final ITalkSessionService talkSessionService;
     private final IPhoneUserService phoneUserService;
+    @Qualifier("chatStreamExecutor")
+    private final Executor chatStreamExecutor;
+    @Qualifier("chatTtsExecutor")
+    private final Executor chatTtsExecutor;
 
     // 存储每个用户的最新请求ID
     private final Map<Long, Integer> latestRequestIdMap = new ConcurrentHashMap<>();
@@ -43,46 +48,11 @@ public class ChatServiceImpl implements IChatService {
     // 存储每个会话的对话内容
     private final Map<String, List<Map<String, String>>> conversationMap = new ConcurrentHashMap<>();
 
-    /**
-     * 合成音频并通过 SSE 发送
-     *
-     * @param text 要合成的文本
-     * @param agentConfig 客服配置
-     * @param emitter SSE 发送器
-     */
-    private void synthesizeAndSendAudio(String text, TalkAgentVo agentConfig, SseEmitter emitter) {
-        CountDownLatch latch = new CountDownLatch(1);
-        ByteArrayOutputStream mergedAudioBytes = new ByteArrayOutputStream();
-
-        ttsService.synthesizeStream(text, agentConfig, (audioChunk, status) -> {
-            try {
-                byte[] audioBytes = Base64.getDecoder().decode(audioChunk);
-                mergedAudioBytes.write(audioBytes);
-
-                if (status == 2) {
-                    String mergedAudioBase64 = Base64.getEncoder().encodeToString(mergedAudioBytes.toByteArray());
-                    Map<String, String> audioEvent = new HashMap<>();
-                    audioEvent.put("name", "audio");
-                    audioEvent.put("data", mergedAudioBase64);
-                    emitter.send(SseEmitter.event().data(audioEvent));
-                    latch.countDown();
-                }
-            } catch (Exception e) {
-                log.error("发送音频失败", e);
-                latch.countDown();
-            }
-        });
-
-        try {
-            latch.await(30, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            log.error("等待音频合成被中断", e);
-            Thread.currentThread().interrupt();
-        }
-    }
-
     @Override
     public void processMessageStream(MessageStreamRequest request, SseEmitter emitter) {
+        long requestStartNs = System.nanoTime();
+        String traceId = buildTraceId(request);
+
         // 在主线程中获取用户ID,避免在异步线程中访问ThreadLocal
         Long userId = LoginHelper.getUserId();
         if (userId == null) {
@@ -90,52 +60,63 @@ public class ChatServiceImpl implements IChatService {
             log.warn("获取登录用户ID失败,使用默认值");
         }
 
+        log.info("traceId={} currentTime={} 聊天流开始 requestId={}, userId={}, conversationId={}, agentId={}, isGreeting={}, messageLength={}",
+            traceId, currentTime(),
+            request.getRequestId(), userId, request.getConversationId(), request.getAgentId(),
+            request.getIsGreeting(), request.getMessage() != null ? request.getMessage().length() : 0);
+
         // 更新最新请求ID
         if (request.getRequestId() != null) {
             latestRequestIdMap.put(userId, request.getRequestId());
-            log.info("流式请求 - 更新用户 {} 的最新请求ID为: {}", userId, request.getRequestId());
+            log.info("traceId={} currentTime={} 更新用户 {} 的最新请求ID为: {},elapsedMs={}",
+                traceId, currentTime(), userId, request.getRequestId(), elapsedMs(requestStartNs));
         }
 
         Long finalUserId = userId;
         CompletableFuture.runAsync(() -> {
+            long asyncStartNs = System.nanoTime();
             try {
-
+                log.info("traceId={} currentTime={} 异步处理开始 queueWaitMs={}", traceId, currentTime(), elapsedMs(requestStartNs));
                 // 获取客服配置
                 TalkAgentVo agentConfig = null;
                 if (request.getAgentId() != null) {
+                    long agentQueryStartNs = System.nanoTime();
                     agentConfig = talkAgentService.queryById(request.getAgentId());
+                    log.info("traceId={} currentTime={} 查询客服配置完成 elapsedMs={}, agentId={}, found={}",
+                        traceId, currentTime(), elapsedMs(agentQueryStartNs), request.getAgentId(), agentConfig != null);
                 }
 
                 // 如果是欢迎语,合成语音但不发送text事件(避免前端重复显示)
                 if (Boolean.TRUE.equals(request.getIsGreeting())) {
-                    // 合成欢迎语语音
-                    synthesizeAndSendAudio(request.getMessage(), agentConfig, emitter);
-
-                    // 发送完成信号
-                    Map<String, String> doneEvent = new HashMap<>();
-                    doneEvent.put("name", "done");
-                    doneEvent.put("data", "");
-                    emitter.send(SseEmitter.event()
-                        .data(doneEvent));
-                    emitter.complete();
+                    AtomicReference<CompletableFuture<Void>> greetingAudioChain =
+                        new AtomicReference<>(CompletableFuture.completedFuture(null));
+                    enqueueAudioSynthesis(greetingAudioChain, request.getMessage(), agentConfig, emitter, finalUserId,
+                        request.getRequestId(), traceId, "greeting-tts", requestStartNs);
+                    completeAfterAudio(greetingAudioChain, emitter, traceId, requestStartNs);
                     return;
                 }
 
                 TalkAgentVo finalAgentConfig = agentConfig;
                 Integer finalRequestId = request.getRequestId();
+                long[] firstTextChunkNs = {0L};
+                int[] textChunkCount = {0};
+                int[] sentenceCount = {0};
+                AtomicReference<CompletableFuture<Void>> audioChain =
+                    new AtomicReference<>(CompletableFuture.completedFuture(null));
+                StringBuilder audioBatchBuilder = new StringBuilder();
+                final int audioBatchMaxLength = 120;
 
                 // 初始化对话记录列表
                 String finalConversationId = request.getConversationId();
                 if (finalConversationId != null) {
-                    // 使用 computeIfAbsent 避免竞态条件
-                    conversationMap.computeIfAbsent(finalConversationId, k -> new ArrayList<>());
+                    List<Map<String, String>> messages = getOrCreateConversation(finalConversationId);
 
                     // 添加用户消息(客户)
                     Map<String, String> userMsg = new HashMap<>();
                     userMsg.put("role", "user");
                     userMsg.put("content", request.getMessage());
                     userMsg.put("timestamp", String.valueOf(System.currentTimeMillis()));
-                    conversationMap.get(finalConversationId).add(userMsg);
+                    messages.add(userMsg);
                 }
 
                 // 用于累积AI回复的完整文本
@@ -145,13 +126,16 @@ public class ChatServiceImpl implements IChatService {
                 // 如果是临时 sessionId(数据库中还未被 Dify 更新),则传递 null 给 Dify
                 String difyConversationId = request.getConversationId();
                 if (request.getConversationId() != null) {
+                    long querySessionStartNs = System.nanoTime();
                     TalkSessionVo session = talkSessionService.queryBySessionId(request.getConversationId());
+                    log.info("traceId={} currentTime={} 查询会话完成 elapsedMs={}, requestedConversationId={}, sessionFound={}",
+                        traceId, currentTime(), elapsedMs(querySessionStartNs), request.getConversationId(), session != null);
                     if (session != null && request.getConversationId().equals(session.getSessionId())) {
                         // 检查这个 sessionId 是否是刚创建的(没有对话内容)
                         // 如果没有对话内容,说明这是第一次对话,不应该传递给 Dify
                         if (session.getConversationJson() == null || session.getConversationJson().isEmpty()) {
                             difyConversationId = null;
-                            log.info("检测到临时 sessionId,第一次对话不传递 conversationId 给 Dify");
+                            log.info("traceId={} currentTime={} 检测到临时 sessionId,第一次对话不传递 conversationId 给 Dify", traceId, currentTime());
                         }
                     }
                 }
@@ -159,9 +143,27 @@ public class ChatServiceImpl implements IChatService {
                 // 处理source字段,去除前后空格
                 String source = request.getSource() != null ? request.getSource().trim() : null;
 
-                difyService.callWorkflowStream(request.getMessage(), request.getAgentGender(), request.getTtsVcnList(), agentConfig, finalUserId, difyConversationId, request.getCustomerPhone(), source,
+                log.info("traceId={} currentTime={} 准备调用Dify elapsedMs={}, difyConversationId={}, source={}",
+                    traceId, currentTime(), elapsedMs(requestStartNs), difyConversationId, source);
+
+                difyService.callWorkflowStream(traceId,
+                    request.getMessage(),
+                    request.getAgentGender(),
+                    request.getTtsVcnList(),
+                    agentConfig,
+                    finalUserId,
+                    difyConversationId,
+                    request.getCustomerPhone(),
+                    source,
                     (textChunk) -> {
                         try {
+                            textChunkCount[0]++;
+                            if (firstTextChunkNs[0] == 0L) {
+                                firstTextChunkNs[0] = System.nanoTime();
+                                log.info("traceId={} currentTime={} 首个文本分片发送给前端 elapsedMs={}, chunkLength={}",
+                                    traceId, currentTime(), elapsedMs(requestStartNs), textChunk.length());
+                            }
+
                             // 累积AI回复文本
                             aiReplyBuilder.append(textChunk);
 
@@ -177,30 +179,33 @@ public class ChatServiceImpl implements IChatService {
                     },
                     (sentence, newConversationId, isComplete) -> {
                         try {
-                            log.info("句子回调 - 句子: {}, isComplete: {}", sentence != null ? sentence : "(null)", isComplete);
+                            sentenceCount[0]++;
+                            log.info("traceId={} currentTime={} 句子回调 elapsedMs={}, sentenceIndex={}, sentenceLength={}, isComplete={}, conversationId={}",
+                                traceId, currentTime(), elapsedMs(requestStartNs), sentenceCount[0], sentence != null ? sentence.length() : 0,
+                                isComplete, newConversationId);
 
                             // 发送conversationId
                             if (newConversationId != null) {
-                                // 使用 computeIfAbsent 确保线程安全
-                                conversationMap.computeIfAbsent(newConversationId, k -> new ArrayList<>());
+                                List<Map<String, String>> messages = getOrCreateConversation(newConversationId);
 
                                 // 更新conversationId
                                 if (finalConversationId == null) {
-                                    // 第一次对话,已经在上面创建了列表
+                                    Map<String, String> userMsg = new HashMap<>();
+                                    userMsg.put("role", "user");
+                                    userMsg.put("content", request.getMessage());
+                                    userMsg.put("timestamp", String.valueOf(System.currentTimeMillis()));
+                                    messages.add(userMsg);
+                                    log.info("traceId={} currentTime={} 首次收到Dify conversationId={}, 已补写用户消息", traceId, currentTime(), newConversationId);
                                 } else if (!finalConversationId.equals(newConversationId)) {
                                     // 如果传入的conversationId与Dify返回的不同,说明传入的是临时sessionId
                                     // 需要更新数据库中的sessionId
+                                    long updateSessionIdStartNs = System.nanoTime();
                                     talkSessionService.updateSessionId(finalConversationId, newConversationId);
-                                    log.info("更新临时sessionId {} 为Dify的conversationId {}", finalConversationId, newConversationId);
+                                    moveConversation(finalConversationId, newConversationId);
+                                    log.info("traceId={} currentTime={} 更新临时sessionId为Dify conversationId old={}, new={}, elapsedMs={}",
+                                        traceId, currentTime(), finalConversationId, newConversationId, elapsedMs(updateSessionIdStartNs));
                                 }
 
-                                // 添加用户消息(客户)- 每次对话都要添加
-                                Map<String, String> userMsg = new HashMap<>();
-                                userMsg.put("role", "user");
-                                userMsg.put("content", request.getMessage());
-                                userMsg.put("timestamp", String.valueOf(System.currentTimeMillis()));
-                                conversationMap.get(newConversationId).add(userMsg);
-
                                 Map<String, String> conversationEvent = new HashMap<>();
                                 conversationEvent.put("name", "conversationId");
                                 conversationEvent.put("data", newConversationId);
@@ -219,17 +224,26 @@ public class ChatServiceImpl implements IChatService {
                             }
 
                             if (needAudio && sentence != null && !sentence.trim().isEmpty()) {
-                                log.info("合成句子音频,长度: {}, 内容: {}", sentence.length(), sentence);
-                                synthesizeAndSendAudio(sentence, finalAgentConfig, emitter);
+                                audioBatchBuilder.append(sentence.trim());
+                                if (shouldFlushAudioBatch(audioBatchBuilder, isComplete, audioBatchMaxLength)) {
+                                    String audioText = audioBatchBuilder.toString();
+                                    audioBatchBuilder.setLength(0);
+                                    log.info("traceId={} currentTime={} 音频批次入队 elapsedMs={}, textLength={}",
+                                        traceId, currentTime(), elapsedMs(requestStartNs), audioText.length());
+                                    enqueueAudioSynthesis(audioChain, audioText, finalAgentConfig, emitter, finalUserId,
+                                        finalRequestId, traceId, "stream-tts", requestStartNs);
+                                }
                             }
 
                             // 如果是最后一个句子,发送完成事件
                             if (isComplete) {
-                                log.info("收到完成标志,准备发送done事件");
+                                log.info("traceId={} currentTime={} 收到完成标志 elapsedMs={}, textChunkCount={}, sentenceCount={}, aiReplyLength={}",
+                                    traceId, currentTime(), elapsedMs(requestStartNs), textChunkCount[0], sentenceCount[0], aiReplyBuilder.length());
 
                                 // 保存对话内容到数据库
                                 if (newConversationId != null && finalAgentConfig != null) {
                                     try {
+                                        long saveConversationStartNs = System.nanoTime();
                                         // 获取当前会话的对话内容
                                         List<Map<String, String>> messages = conversationMap.getOrDefault(newConversationId, new ArrayList<>());
 
@@ -248,39 +262,186 @@ public class ChatServiceImpl implements IChatService {
                                         // 保存到数据库
                                         talkSessionService.saveOrUpdateConversation(newConversationId, finalAgentConfig.getId(), conversationJson, null, finalUserId);
 
-                                        log.info("对话内容已保存到数据库,会话ID: {}, 消息数量: {}", newConversationId, messages.size());
+                                        log.info("traceId={} currentTime={} 对话内容已保存到数据库 elapsedMs={}, conversationId={}, messageCount={}",
+                                            traceId, currentTime(), elapsedMs(saveConversationStartNs), newConversationId, messages.size());
 
                                         // 清理内存:对话完成后从 conversationMap 中移除
                                         conversationMap.remove(newConversationId);
-                                        log.debug("已清理会话 {} 的对话内容缓存", newConversationId);
+                                        log.debug("traceId={} 已清理会话 {} 的对话内容缓存", traceId, newConversationId);
                                     } catch (Exception e) {
-                                        log.error("保存对话内容失败", e);
+                                        log.error("traceId={} 保存对话内容失败", traceId, e);
                                     }
                                 }
 
-                                Map<String, String> doneEvent = new HashMap<>();
-                                doneEvent.put("name", "done");
-                                doneEvent.put("data", "");
-                                emitter.send(SseEmitter.event()
-                                    .data(doneEvent));
-                                log.info("done事件已发送,关闭SSE连接");
-                                emitter.complete();
+                                if (audioBatchBuilder.length() > 0) {
+                                    String audioText = audioBatchBuilder.toString();
+                                    audioBatchBuilder.setLength(0);
+                                    log.info("traceId={} currentTime={} 尾部音频批次入队 elapsedMs={}, textLength={}",
+                                        traceId, currentTime(), elapsedMs(requestStartNs), audioText.length());
+                                    enqueueAudioSynthesis(audioChain, audioText, finalAgentConfig, emitter, finalUserId,
+                                        finalRequestId, traceId, "tail-tts", requestStartNs);
+                                }
+                                completeAfterAudio(audioChain, emitter, traceId, requestStartNs);
+                                log.info("traceId={} currentTime={} 已安排在音频队列完成后发送done事件 totalElapsedMs={}",
+                                    traceId, currentTime(), elapsedMs(requestStartNs));
                             }
                         } catch (Exception e) {
-                            log.error("处理句子回调失败", e);
+                            log.error("traceId={} 处理句子回调失败 elapsedMs={}", traceId, elapsedMs(requestStartNs), e);
                             emitter.completeWithError(e);
                         }
                     });
 
+                log.info("traceId={} currentTime={} Dify调用返回到ChatService totalElapsedMs={}, asyncElapsedMs={}",
+                    traceId, currentTime(), elapsedMs(requestStartNs), elapsedMs(asyncStartNs));
+
             } catch (Exception e) {
-                log.error("流式处理失败", e);
+                log.error("traceId={} 流式处理失败 totalElapsedMs={}", traceId, elapsedMs(requestStartNs), e);
                 try {
                     emitter.completeWithError(e);
                 } catch (Exception ex) {
-                    log.error("发送错误失败", ex);
+                    log.error("traceId={} 发送错误失败", traceId, ex);
                 }
             }
+        }, chatStreamExecutor);
+    }
+
+    private List<Map<String, String>> getOrCreateConversation(String conversationId) {
+        return conversationMap.computeIfAbsent(conversationId, key -> Collections.synchronizedList(new ArrayList<>()));
+    }
+
+    private void moveConversation(String oldConversationId, String newConversationId) {
+        List<Map<String, String>> oldMessages = conversationMap.remove(oldConversationId);
+        if (oldMessages == null) {
+            return;
+        }
+        List<Map<String, String>> newMessages = getOrCreateConversation(newConversationId);
+        synchronized (oldMessages) {
+            newMessages.addAll(oldMessages);
+        }
+    }
+
+    private boolean shouldFlushAudioBatch(StringBuilder audioBatchBuilder, boolean isComplete, int audioBatchMaxLength) {
+        return isComplete || audioBatchBuilder.length() >= audioBatchMaxLength;
+    }
+
+    private void enqueueAudioSynthesis(AtomicReference<CompletableFuture<Void>> audioChain,
+                                       String text,
+                                       TalkAgentVo agentConfig,
+                                       SseEmitter emitter,
+                                       Long userId,
+                                       Integer requestId,
+                                       String traceId,
+                                       String stageName,
+                                       long requestStartNs) {
+        if (text == null || text.trim().isEmpty()) {
+            return;
+        }
+        log.info("traceId={} currentTime={} {} 排队等待TTS elapsedMs={}, textLength={}",
+            traceId, currentTime(), stageName, elapsedMs(requestStartNs), text.length());
+        audioChain.getAndUpdate(previous -> previous.thenRunAsync(() ->
+            synthesizeAndSendAudio(text, agentConfig, emitter, userId, requestId, traceId, stageName, requestStartNs), chatTtsExecutor));
+    }
+
+    private void synthesizeAndSendAudio(String text,
+                                        TalkAgentVo agentConfig,
+                                        SseEmitter emitter,
+                                        Long userId,
+                                        Integer requestId,
+                                        String traceId,
+                                        String stageName,
+                                        long requestStartNs) {
+        if (!isLatestRequest(userId, requestId)) {
+            log.info("traceId={} currentTime={} {} 请求ID {} 已过期,跳过音频合成 elapsedMs={}",
+                traceId, currentTime(), stageName, requestId, elapsedMs(requestStartNs));
+            return;
+        }
+
+        long ttsStartNs = System.nanoTime();
+        long[] firstAudioChunkNs = {0L};
+        int[] audioChunkCount = {0};
+        ByteArrayOutputStream mergedAudioBytes = new ByteArrayOutputStream();
+        AtomicReference<RuntimeException> audioFailure = new AtomicReference<>();
+        log.info("traceId={} currentTime={} {} 开始TTS elapsedMs={}, textLength={}",
+            traceId, currentTime(), stageName, elapsedMs(requestStartNs), text.length());
+        ttsService.synthesize(text, agentConfig, new ITtsService.AudioCallback() {
+            @Override
+            public void onAudio(String base64Audio, int status) {
+                try {
+                    audioChunkCount[0]++;
+                    if (firstAudioChunkNs[0] == 0L) {
+                        firstAudioChunkNs[0] = System.nanoTime();
+                        log.info("traceId={} currentTime={} {} 首个TTS音频分片到达 ttsElapsedMs={}, requestElapsedMs={}",
+                            traceId, currentTime(), stageName, elapsedMs(ttsStartNs), elapsedMs(requestStartNs));
+                    }
+                    byte[] audioBytes = Base64.getDecoder().decode(base64Audio);
+                    mergedAudioBytes.write(audioBytes);
+                    if (status == 2 && isLatestRequest(userId, requestId)) {
+                        Map<String, String> audioEvent = new HashMap<>();
+                        audioEvent.put("name", "audio");
+                        audioEvent.put("data", Base64.getEncoder().encodeToString(mergedAudioBytes.toByteArray()));
+                        emitter.send(SseEmitter.event().data(audioEvent));
+                        log.info("traceId={} currentTime={} {} TTS完成并发送音频 ttsElapsedMs={}, requestElapsedMs={}, audioChunkCount={}, mergedBytes={}",
+                            traceId, currentTime(), stageName, elapsedMs(ttsStartNs), elapsedMs(requestStartNs),
+                            audioChunkCount[0], mergedAudioBytes.size());
+                    }
+                } catch (Exception e) {
+                    audioFailure.compareAndSet(null, new RuntimeException("发送音频失败", e));
+                }
+            }
+
+            @Override
+            public void onError(int code, String message) {
+                audioFailure.compareAndSet(null, new RuntimeException("TTS合成失败: " + code + " - " + message));
+            }
         });
+        if (audioFailure.get() != null) {
+            throw audioFailure.get();
+        }
+    }
+
+    private boolean isLatestRequest(Long userId, Integer requestId) {
+        if (requestId == null) {
+            return true;
+        }
+        Integer latestRequestId = latestRequestIdMap.get(userId);
+        return latestRequestId == null || latestRequestId.equals(requestId);
+    }
+
+    private void completeAfterAudio(AtomicReference<CompletableFuture<Void>> audioChain, SseEmitter emitter,
+                                    String traceId, long requestStartNs) {
+        audioChain.get()
+            .whenComplete((unused, throwable) -> {
+                try {
+                    if (throwable != null) {
+                        log.error("traceId={} 音频队列执行失败 totalElapsedMs={}", traceId, elapsedMs(requestStartNs), throwable);
+                        emitter.completeWithError(throwable);
+                        return;
+                    }
+                    Map<String, String> doneEvent = new HashMap<>();
+                    doneEvent.put("name", "done");
+                    doneEvent.put("data", "");
+                    emitter.send(SseEmitter.event().data(doneEvent));
+                    log.info("traceId={} currentTime={} done事件已发送,关闭SSE连接 totalElapsedMs={}",
+                        traceId, currentTime(), elapsedMs(requestStartNs));
+                    emitter.complete();
+                } catch (Exception e) {
+                    log.error("traceId={} 发送done事件失败 totalElapsedMs={}", traceId, elapsedMs(requestStartNs), e);
+                    emitter.completeWithError(e);
+                }
+            });
+    }
+
+    private String buildTraceId(MessageStreamRequest request) {
+        String requestIdPart = request.getRequestId() != null ? String.valueOf(request.getRequestId()) : "noReq";
+        return "chat-" + requestIdPart + "-" + UUID.randomUUID().toString().substring(0, 8);
+    }
+
+    private long elapsedMs(long startNs) {
+        return java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
+    }
+
+    private String currentTime() {
+        return java.time.LocalDateTime.now().toString();
     }
 
     @Override

+ 96 - 36
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/service/impl/DifyServiceImpl.java

@@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
 @RequiredArgsConstructor
 public class DifyServiceImpl implements IDifyService {
     private final DifyConfig difyConfig;
+    private static final String SENTENCE_DELIMITERS = ",,。.!!??";
     private final OkHttpClient httpClient = new OkHttpClient.Builder()
         .connectTimeout(30, java.util.concurrent.TimeUnit.SECONDS)
         .readTimeout(60, java.util.concurrent.TimeUnit.SECONDS)
@@ -27,14 +28,16 @@ public class DifyServiceImpl implements IDifyService {
         .build();
 
     @Override
-    public void callWorkflowStream(String userMessage, String agentGender,
+    public void callWorkflowStream(String traceId, String userMessage, String agentGender,
                                    List<Map<String, String>> ttsVcnList,
                                    TalkAgentVo agentConfig,
                                    Long userId, String conversationId, String customerPhone, String source,
                                    Consumer<String> onTextChunk,
                                    IDifyService.SentenceCallback onSentence) {
+        long workflowStartNs = System.nanoTime();
         try {
-            log.info("流式调用 Dify 工作流 - userId: {}, conversationId: {}", userId, conversationId);
+            log.info("traceId={} currentTime={} Dify开始请求 userId={}, conversationId={}, messageLength={}",
+                traceId, currentTime(), userId, conversationId, userMessage != null ? userMessage.length() : 0);
 
             // 构建请求体
             Map<String, Object> inputs = new HashMap<>();
@@ -43,6 +46,7 @@ public class DifyServiceImpl implements IDifyService {
             inputs.put("agentGender", genderChinese);
             inputs.put("ttsVcnList", ttsVcnList);
             inputs.put("currentVcn", agentConfig != null ? agentConfig.getTtsVcn() : null);
+            inputs.put("user_id", userId != null ? String.valueOf(userId) : "");
             if (customerPhone != null && !customerPhone.isEmpty()) {
                 inputs.put("customerPhone", customerPhone);
             }
@@ -55,6 +59,7 @@ public class DifyServiceImpl implements IDifyService {
             requestBody.set("query", userMessage);
             requestBody.set("response_mode", "streaming");
             requestBody.set("user", "user-" + userId);
+            requestBody.set("send_at", System.currentTimeMillis());
 
             if (conversationId != null && !conversationId.isEmpty()) {
                 requestBody.set("conversation_id", conversationId);
@@ -69,18 +74,26 @@ public class DifyServiceImpl implements IDifyService {
                 .addHeader("Content-Type", "application/json")
                 .build();
 
+            long httpExecuteStartNs = System.nanoTime();
             Response response = httpClient.newCall(request).execute();
 
-            log.info("Dify API 响应状态码: {}", response.code());
+            log.info("traceId={} currentTime={} Dify响应头返回 status={}, headersElapsedMs={}",
+                traceId, currentTime(), response.code(), elapsedMs(httpExecuteStartNs));
 
             if (!response.isSuccessful()) {
                 String errorBody = response.body() != null ? response.body().string() : "无响应体";
-                log.error("Dify API 调用失败,状态码: {}, 响应: {}", response.code(), errorBody);
+                log.error("traceId={} Dify调用失败 status={}, totalElapsedMs={}, response={}",
+                    traceId, response.code(), elapsedMs(workflowStartNs), errorBody);
                 throw new RuntimeException("Dify API 调用失败: " + response.code());
             }
 
             StringBuilder currentSentence = new StringBuilder();
             String newConversationId = null;
+            boolean firstEventLogged = false;
+            boolean firstMessageLogged = false;
+            boolean firstSentenceLogged = false;
+            int chunkCount = 0;
+            int sentenceCount = 0;
 
             try (ResponseBody responseBody = response.body()) {
                 if (responseBody == null) {
@@ -94,51 +107,98 @@ public class DifyServiceImpl implements IDifyService {
                 String line;
                 while ((line = reader.readLine()) != null) {
                     if (line.startsWith("data: ")) {
+                        if (!firstEventLogged) {
+                            firstEventLogged = true;
+                            log.info("traceId={} currentTime={} Dify首个SSE事件到达 elapsedMs={}",
+                                traceId, currentTime(), elapsedMs(workflowStartNs));
+                        }
                         String jsonData = line.substring(6);
 
+                        JSONObject event;
                         try {
-                            JSONObject event = new JSONObject(jsonData);
-                            String eventType = event.getStr("event");
-
-                            if ("message".equals(eventType)) {
-                                String answer = event.getStr("answer");
-                                if (answer != null) {
-                                    currentSentence.append(answer);
-                                    onTextChunk.accept(answer);
-
-                                    // 检查是否包含句子结束符
-                                    String sentenceText = currentSentence.toString();
-                                    if (sentenceText.matches(".*[。!?.!?]\\s*$")) {
-                                        // 发现句子结束,触发回调
-                                        onSentence.onSentence(sentenceText.trim(), newConversationId, false);
-                                        currentSentence.setLength(0);
-                                    }
+                            event = new JSONObject(jsonData);
+                        } catch (Exception e) {
+                            log.warn("解析 SSE 事件失败: {}", line, e);
+                            continue;
+                        }
+
+                        String eventType = event.getStr("event");
+
+                        if ("message".equals(eventType)) {
+                            String answer = event.getStr("answer");
+                            if (answer != null) {
+                                chunkCount++;
+                                if (!firstMessageLogged) {
+                                    firstMessageLogged = true;
+                                    log.info("traceId={} currentTime={} Dify首个文本分片到达 elapsedMs={}, chunkLength={}",
+                                        traceId, currentTime(), elapsedMs(workflowStartNs), answer.length());
                                 }
-                            } else if ("message_end".equals(eventType)) {
-                                newConversationId = event.getStr("conversation_id");
-
-                                // 处理最后剩余的文本(如果有)
-                                if (currentSentence.length() > 0) {
-                                    onSentence.onSentence(currentSentence.toString().trim(), newConversationId, true);
-                                } else {
-                                    // 没有剩余文本,直接标记完成
-                                    onSentence.onSentence("", newConversationId, true);
+                                currentSentence.append(answer);
+                                onTextChunk.accept(answer);
+
+                                String currentText = currentSentence.toString();
+                                int splitIndex;
+                                while ((splitIndex = findSentenceSplitIndex(currentText)) >= 0) {
+                                    String sentenceText = currentText.substring(0, splitIndex + 1).trim();
+                                    if (!sentenceText.isEmpty()) {
+                                        sentenceCount++;
+                                        if (!firstSentenceLogged) {
+                                            firstSentenceLogged = true;
+                                            log.info("traceId={} currentTime={} Dify首句完成 elapsedMs={}, sentenceLength={}",
+                                                traceId, currentTime(), elapsedMs(workflowStartNs), sentenceText.length());
+                                        }
+                                        onSentence.onSentence(sentenceText, newConversationId, false);
+                                    }
+                                    currentText = currentText.substring(splitIndex + 1);
                                 }
-                            } else if ("error".equals(eventType)) {
-                                String errorMessage = event.getStr("message");
-                                log.error("Dify API 返回错误: {}", errorMessage);
-                                throw new RuntimeException("Dify API 错误: " + errorMessage);
+                                currentSentence.setLength(0);
+                                currentSentence.append(currentText);
                             }
-                        } catch (Exception e) {
-                            log.warn("解析 SSE 事件失败: {}", line, e);
+                        } else if ("message_end".equals(eventType)) {
+                            newConversationId = event.getStr("conversation_id");
+                            log.info("traceId={} currentTime={} Dify收到message_end elapsedMs={}, conversationId={}, chunkCount={}, sentenceCount={}",
+                                traceId, currentTime(), elapsedMs(workflowStartNs), newConversationId, chunkCount, sentenceCount);
+
+                            // 处理最后剩余的文本(如果有)
+                            if (currentSentence.length() > 0) {
+                                sentenceCount++;
+                                onSentence.onSentence(currentSentence.toString().trim(), newConversationId, true);
+                            } else {
+                                // 没有剩余文本,直接标记完成
+                                onSentence.onSentence("", newConversationId, true);
+                            }
+                        } else if ("error".equals(eventType)) {
+                            String errorMessage = event.getStr("message");
+                            log.error("Dify API 返回错误: {}", errorMessage);
+                            throw new RuntimeException("Dify API 错误: " + errorMessage);
                         }
                     }
                 }
             }
 
+            log.info("traceId={} currentTime={} Dify流式处理完成 totalElapsedMs={}, chunkCount={}, sentenceCount={}, finalConversationId={}",
+                traceId, currentTime(), elapsedMs(workflowStartNs), chunkCount, sentenceCount, newConversationId);
+
         } catch (Exception e) {
-            log.error("流式调用 Dify 工作流失败", e);
+            log.error("traceId={} 流式调用 Dify 工作流失败 totalElapsedMs={}", traceId, elapsedMs(workflowStartNs), e);
             throw new RuntimeException("AI 服务调用失败", e);
         }
     }
+
+    private int findSentenceSplitIndex(String text) {
+        for (int i = 0; i < text.length(); i++) {
+            if (SENTENCE_DELIMITERS.indexOf(text.charAt(i)) >= 0) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    private long elapsedMs(long startNs) {
+        return java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
+    }
+
+    private String currentTime() {
+        return java.time.LocalDateTime.now().toString();
+    }
 }

+ 32 - 2
ruoyi-modules/yp-talk/src/main/java/org/dromara/talk/service/impl/TtsServiceImpl.java

@@ -18,6 +18,8 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @Service
@@ -34,6 +36,7 @@ public class TtsServiceImpl implements ITtsService {
             String wsUrl = getAuthUrl(HOST_URL, xunfeiConfig.getApiKey(), xunfeiConfig.getApiSecret()).replace("https://", "wss://");
             URI uri = new URI(wsUrl);
             String requestJson = buildRequest(text, agentConfig);
+            CountDownLatch latch = new CountDownLatch(1);
 
             WebSocketClient client = new WebSocketClient(uri) {
                 @Override
@@ -50,27 +53,41 @@ public class TtsServiceImpl implements ITtsService {
                     if (response.code != 0) {
                         log.error("TTS返回错误: code={}, message={}", response.code, response.message);
                         callback.onError(response.code, response.message);
+                        close();
+                        latch.countDown();
                         return;
                     }
 
                     if (response.data != null && response.data.audio != null) {
                         callback.onAudio(response.data.audio, response.data.status);
+                        if (response.data.status == 2) {
+                            close();
+                            latch.countDown();
+                        }
                     }
                 }
 
                 @Override
                 public void onClose(int code, String reason, boolean remote) {
                     log.info("TTS WebSocket连接关闭: code={}, reason={}, remote={}", code, reason, remote);
+                    latch.countDown();
                 }
 
                 @Override
                 public void onError(Exception e) {
                     log.error("TTS WebSocket错误", e);
                     callback.onError(-1, e.getMessage());
+                    latch.countDown();
                 }
             };
 
-            client.connect();
+            if (!client.connectBlocking(10, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("TTS WebSocket连接超时");
+            }
+            if (!latch.await(30, TimeUnit.SECONDS)) {
+                client.close();
+                throw new IllegalStateException("TTS语音合成超时");
+            }
 
         } catch (Exception e) {
             log.error("TTS合成失败", e);
@@ -84,6 +101,7 @@ public class TtsServiceImpl implements ITtsService {
             String wsUrl = getAuthUrl(HOST_URL, xunfeiConfig.getApiKey(), xunfeiConfig.getApiSecret()).replace("https://", "wss://");
             URI uri = new URI(wsUrl);
             String requestJson = buildRequest(text, agentConfig);
+            CountDownLatch latch = new CountDownLatch(1);
 
             WebSocketClient client = new WebSocketClient(uri) {
                 @Override
@@ -105,6 +123,10 @@ public class TtsServiceImpl implements ITtsService {
                     if (response.data != null && response.data.audio != null) {
                         log.info("TTS流式收到音频数据: status={}, 音频长度={}", response.data.status, response.data.audio.length());
                         callback.accept(response.data.audio, response.data.status);
+                        if (response.data.status == 2) {
+                            close();
+                            latch.countDown();
+                        }
                     } else {
                         log.warn("TTS流式响应中没有音频数据: data={}", response.data);
                     }
@@ -113,15 +135,23 @@ public class TtsServiceImpl implements ITtsService {
                 @Override
                 public void onClose(int code, String reason, boolean remote) {
                     log.info("TTS WebSocket连接关闭(流式)");
+                    latch.countDown();
                 }
 
                 @Override
                 public void onError(Exception e) {
                     log.error("TTS WebSocket错误(流式)", e);
+                    latch.countDown();
                 }
             };
 
-            client.connect();
+            if (!client.connectBlocking(10, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("TTS流式 WebSocket连接超时");
+            }
+            if (!latch.await(30, TimeUnit.SECONDS)) {
+                client.close();
+                throw new IllegalStateException("TTS流式语音合成超时");
+            }
 
         } catch (Exception e) {
             log.error("TTS流式合成失败", e);