Explorar o código

导入历史数据异步处理修改

Zhangbw hai 1 mes
pai
achega
b8d406df72

+ 8 - 0
.claude/settings.local.json

@@ -0,0 +1,8 @@
+{
+  "permissions": {
+    "allow": [
+      "Bash(mvn clean)",
+      "Bash(mvn compile -pl ruoyi-modules/yp-stock -am -DskipTests)"
+    ]
+  }
+}

+ 1 - 1
ruoyi-admin/src/main/java/org/dromara/DromaraApplication.java

@@ -13,7 +13,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
  *
  * @author Lion Li
  */
-@EnableAsync
+@EnableAsync(proxyTargetClass = true)
 @EnableScheduling
 @SpringBootApplication
 @ComponentScan(basePackages = {"org.dromara", "com.yingpai"})

+ 4 - 3
ruoyi-modules/yp-stock/src/main/java/com/yingpai/stock/mapper/StockPoolHistoryMapper.java

@@ -52,15 +52,16 @@ public interface StockPoolHistoryMapper extends BaseMapperPlus<StockPoolHistory,
                                          @Param("endDate") LocalDate endDate);
 
     /**
-     * 查询指定股票从指定日期开始向后10天的最高价(用于计算high_trend)
+     * 查询指定股票从指定日期的第二天开始向后10天的最高价(用于计算high_trend)
+     * 不包含当天,从第二天开始计算未来涨幅
      * @param stockCode 股票代码
      * @param startDate 开始日期(记录日期)
      * @param endDate 结束日期(当前导入日期,限制查询范围)
-     * @return 向后10天内最高价,如果没有则返回null
+     * @return 向后10天内最高价(不含当天),如果没有则返回null
      */
     @Select("SELECT MAX(day_highest_price) FROM stock_pool_history " +
             "WHERE stock_code = #{stockCode} " +
-            "AND record_date >= #{startDate} " +
+            "AND record_date > #{startDate} " +
             "AND record_date <= #{endDate} " +
             "AND record_date <= DATE_ADD(#{startDate}, INTERVAL 10 DAY)")
     BigDecimal selectForwardTenDaysHighestPrice(@Param("stockCode") String stockCode,

+ 17 - 0
ruoyi-modules/yp-stock/src/main/java/com/yingpai/stock/service/IStockPoolAsyncService.java

@@ -0,0 +1,17 @@
+package com.yingpai.stock.service;
+
+import java.time.LocalDate;
+
+/**
+ * 股票池异步处理服务接口
+ */
+public interface IStockPoolAsyncService {
+
+    /**
+     * 异步执行导入后的耗时操作
+     * 包括:数据补全、更新强势池10天涨幅、回溯更新high_trend
+     *
+     * @param importDate 导入日期
+     */
+    void asyncProcessAfterImport(LocalDate importDate);
+}

+ 148 - 0
ruoyi-modules/yp-stock/src/main/java/com/yingpai/stock/service/impl/StockPoolAsyncServiceImpl.java

@@ -0,0 +1,148 @@
+package com.yingpai.stock.service.impl;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.yingpai.stock.domain.StockPoolHistory;
+import com.yingpai.stock.mapper.StockPoolHistoryMapper;
+import com.yingpai.stock.service.IStockPoolAsyncService;
+import com.yingpai.stock.service.IStockPoolService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.LocalDate;
+import java.util.List;
+
+/**
+ * 股票池异步处理服务实现类
+ */
+@Slf4j
+@RequiredArgsConstructor
+@Service
+public class StockPoolAsyncServiceImpl implements IStockPoolAsyncService {
+
+    private final StockPoolHistoryMapper stockPoolHistoryMapper;
+    private final IStockPoolService stockPoolService;
+
+    @Override
+    @Async
+    public void asyncProcessAfterImport(LocalDate importDate) {
+        log.info("[异步处理] 开始执行导入后的耗时操作,导入日期: {}, 线程: {}",
+            importDate, Thread.currentThread().getName());
+
+        // 调用数据补全功能,将历史数据补全到stock_pool表
+        try {
+            log.info("[数据补全] 开始补全历史数据到stock_pool表,导入日期: {}", importDate);
+            String completeResult = stockPoolService.completeHistoryData(importDate);
+            log.info("[数据补全] 补全结果: {}", completeResult);
+        } catch (Exception e) {
+            log.error("[数据补全] 补全历史数据失败: {}", e.getMessage(), e);
+        }
+
+        // 更新强势池10天最高涨幅
+        try {
+            log.info("[强势池10天涨幅] 开始更新强势池10天最高涨幅,导入日期: {}", importDate);
+            String gainResult = stockPoolService.updateStrongPoolTenDayGain(importDate);
+            log.info("[强势池10天涨幅] 更新结果: {}", gainResult);
+        } catch (Exception e) {
+            log.error("[强势池10天涨幅] 更新失败: {}", e.getMessage(), e);
+        }
+
+        // 回溯更新过去10天内所有记录的high_trend
+        try {
+            log.info("[回溯更新high_trend] 开始回溯更新,导入日期: {}", importDate);
+            String updateResult = updatePastTenDaysHighTrend(importDate);
+            log.info("[回溯更新high_trend] 更新结果: {}", updateResult);
+        } catch (Exception e) {
+            log.error("[回溯更新high_trend] 更新失败: {}", e.getMessage(), e);
+        }
+
+        log.info("[异步处理] 导入后的耗时操作全部完成,导入日期: {}, 线程: {}",
+            importDate, Thread.currentThread().getName());
+    }
+
+    /**
+     * 回溯更新过去10天内所有记录的high_trend
+     * 当导入新的历史数据后,需要更新过去10天内所有记录的high_trend,
+     * 因为这些记录的"未来10天最高价"可能会因为新数据的加入而改变
+     *
+     * @param importDate 当前导入的日期
+     * @return 更新结果信息
+     */
+    private String updatePastTenDaysHighTrend(LocalDate importDate) {
+        log.info("[回溯更新high_trend] 开始更新,导入日期: {}", importDate);
+
+        // 查询过去10天内的所有历史记录(包括今天)
+        LocalDate tenDaysAgo = importDate.minusDays(10);
+        LambdaQueryWrapper<StockPoolHistory> lqw = Wrappers.lambdaQuery();
+        lqw.ge(StockPoolHistory::getRecordDate, tenDaysAgo)
+           .le(StockPoolHistory::getRecordDate, importDate);
+
+        List<StockPoolHistory> recordsToUpdate = stockPoolHistoryMapper.selectList(lqw);
+
+        if (recordsToUpdate.isEmpty()) {
+            String message = "没有需要回溯更新的历史记录(过去10天内)";
+            log.info("[回溯更新high_trend] {}", message);
+            return message;
+        }
+
+        log.info("[回溯更新high_trend] 找到 {} 条需要更新的记录", recordsToUpdate.size());
+
+        int updatedCount = 0;
+        int skippedCount = 0;
+
+        for (StockPoolHistory record : recordsToUpdate) {
+            try {
+                String stockCode = record.getStockCode();
+                LocalDate recordDate = record.getRecordDate();
+                BigDecimal dayClosePrice = record.getDayClosePrice();
+
+                if (dayClosePrice == null || dayClosePrice.compareTo(BigDecimal.ZERO) == 0) {
+                    log.warn("[回溯更新high_trend] 股票 {} 日期 {} 收盘价为空或为0,跳过",
+                        stockCode, recordDate);
+                    skippedCount++;
+                    continue;
+                }
+
+                // 查询从记录日期到当前导入日期的最高价(向后查询,最多10天)
+                BigDecimal maxHighPrice = stockPoolHistoryMapper.selectForwardTenDaysHighestPrice(
+                    stockCode, recordDate, importDate);
+
+                if (maxHighPrice == null) {
+                    log.warn("[回溯更新high_trend] 股票 {} 在 [{}, {}] 范围内没有最高价数据,跳过",
+                        stockCode, recordDate, importDate);
+                    skippedCount++;
+                    continue;
+                }
+
+                // 计算涨幅:(最高价 - 当天收盘价) / 当天收盘价 * 100
+                BigDecimal highTrend = maxHighPrice.subtract(dayClosePrice)
+                    .divide(dayClosePrice, 4, RoundingMode.HALF_UP)
+                    .multiply(new BigDecimal("100"))
+                    .setScale(2, RoundingMode.HALF_UP);
+
+                // 更新记录
+                record.setHighTrend(highTrend);
+                stockPoolHistoryMapper.updateById(record);
+
+                updatedCount++;
+
+                log.debug("[回溯更新high_trend] 股票 {} 日期 {} 更新成功,收盘价: {}, 最高价: {}, 涨幅: {}%",
+                    stockCode, recordDate, dayClosePrice, maxHighPrice, highTrend);
+
+            } catch (Exception e) {
+                log.error("[回溯更新high_trend] 股票 {} 日期 {} 更新失败: {}",
+                    record.getStockCode(), record.getRecordDate(), e.getMessage(), e);
+                skippedCount++;
+            }
+        }
+
+        String message = String.format("回溯更新完成!成功更新 %d 条,跳过 %d 条",
+            updatedCount, skippedCount);
+        log.info("[回溯更新high_trend] {}", message);
+        return message;
+    }
+}

+ 3 - 125
ruoyi-modules/yp-stock/src/main/java/com/yingpai/stock/service/impl/StockPoolHistoryServiceImpl.java

@@ -14,6 +14,7 @@ import com.yingpai.stock.domain.bo.StockPoolHistoryBo;
 import com.yingpai.stock.domain.vo.StockPoolHistoryVo;
 import com.yingpai.stock.mapper.StockPoolHistoryMapper;
 import com.yingpai.stock.mapper.StockPoolMapper;
+import com.yingpai.stock.service.IStockPoolAsyncService;
 import com.yingpai.stock.service.IStockPoolHistoryService;
 import com.yingpai.stock.service.IStockPoolService;
 import lombok.RequiredArgsConstructor;
@@ -22,7 +23,6 @@ import org.dromara.common.core.utils.StringUtils;
 import org.dromara.common.mybatis.core.page.PageQuery;
 import org.dromara.common.mybatis.core.page.TableDataInfo;
 import org.dromara.common.satoken.utils.LoginHelper;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.multipart.MultipartFile;
@@ -49,6 +49,7 @@ public class StockPoolHistoryServiceImpl implements IStockPoolHistoryService {
     private final StockPoolHistoryMapper baseMapper;
     private final StockPoolMapper stockPoolMapper;
     private final IStockPoolService stockPoolService;
+    private final IStockPoolAsyncService stockPoolAsyncService;
 
     @Override
     public TableDataInfo<StockPoolHistoryVo> queryPageList(StockPoolHistoryBo bo, PageQuery pageQuery) {
@@ -454,7 +455,7 @@ public class StockPoolHistoryServiceImpl implements IStockPoolHistoryService {
         addTopTenToStrongPool(parsedRecordDate);
 
         // 异步执行耗时的后续操作(数据补全、更新涨幅、回溯更新)
-        asyncProcessAfterImport(parsedRecordDate);
+        stockPoolAsyncService.asyncProcessAfterImport(parsedRecordDate);
 
         if (failureNum > 0) {
             String message = String.format("导入完成!成功 %d 条(其中更新 %d 条),失败 %d 条。详细错误信息请查看服务器日志。",
@@ -578,47 +579,6 @@ public class StockPoolHistoryServiceImpl implements IStockPoolHistoryService {
         }
     }
 
-    /**
-     * 异步执行导入后的耗时操作
-     * 包括:数据补全、更新强势池10天涨幅、回溯更新high_trend
-     *
-     * @param importDate 导入日期
-     */
-    @Async
-    public void asyncProcessAfterImport(LocalDate importDate) {
-        log.info("[异步处理] 开始执行导入后的耗时操作,导入日期: {}", importDate);
-
-        // 调用数据补全功能,将历史数据补全到stock_pool表
-        try {
-            log.info("[数据补全] 开始补全历史数据到stock_pool表,导入日期: {}", importDate);
-            String completeResult = stockPoolService.completeHistoryData(importDate);
-            log.info("[数据补全] 补全结果: {}", completeResult);
-        } catch (Exception e) {
-            log.error("[数据补全] 补全历史数据失败: {}", e.getMessage(), e);
-        }
-
-        // 更新强势池10天最高涨幅
-        try {
-            log.info("[强势池10天涨幅] 开始更新强势池10天最高涨幅,导入日期: {}", importDate);
-            String gainResult = stockPoolService.updateStrongPoolTenDayGain(importDate);
-            log.info("[强势池10天涨幅] 更新结果: {}", gainResult);
-        } catch (Exception e) {
-            log.error("[强势池10天涨幅] 更新失败: {}", e.getMessage(), e);
-        }
-
-        // 回溯更新过去10天内所有记录的high_trend
-        try {
-            log.info("[回溯更新high_trend] 开始回溯更新,导入日期: {}", importDate);
-            String updateResult = updatePastTenDaysHighTrend(importDate);
-            log.info("[回溯更新high_trend] 更新结果: {}", updateResult);
-        } catch (Exception e) {
-            log.error("[回溯更新high_trend] 更新失败: {}", e.getMessage(), e);
-        }
-
-        log.info("[异步处理] 导入后的耗时操作全部完成,导入日期: {}", importDate);
-    }
-
-
     /**
      * 从 Map 中获取字符串值
      */
@@ -742,88 +702,6 @@ public class StockPoolHistoryServiceImpl implements IStockPoolHistoryService {
             .setScale(2, RoundingMode.HALF_UP);
     }
 
-    /**
-     * 回溯更新过去10天内所有记录的high_trend
-     * 当导入新的历史数据后,需要更新过去10天内所有记录的high_trend,
-     * 因为这些记录的"未来10天最高价"可能会因为新数据的加入而改变
-     *
-     * @param importDate 当前导入的日期
-     * @return 更新结果信息
-     */
-    private String updatePastTenDaysHighTrend(LocalDate importDate) {
-        log.info("[回溯更新high_trend] 开始更新,导入日期: {}", importDate);
-
-        // 查询过去10天内的所有历史记录(包括今天)
-        LocalDate tenDaysAgo = importDate.minusDays(10);
-        LambdaQueryWrapper<StockPoolHistory> lqw = Wrappers.lambdaQuery();
-        lqw.ge(StockPoolHistory::getRecordDate, tenDaysAgo)
-           .le(StockPoolHistory::getRecordDate, importDate);
-
-        List<StockPoolHistory> recordsToUpdate = baseMapper.selectList(lqw);
-
-        if (recordsToUpdate.isEmpty()) {
-            String message = "没有需要回溯更新的历史记录(过去10天内)";
-            log.info("[回溯更新high_trend] {}", message);
-            return message;
-        }
-
-        log.info("[回溯更新high_trend] 找到 {} 条需要更新的记录", recordsToUpdate.size());
-
-        int updatedCount = 0;
-        int skippedCount = 0;
-
-        for (StockPoolHistory record : recordsToUpdate) {
-            try {
-                String stockCode = record.getStockCode();
-                LocalDate recordDate = record.getRecordDate();
-                BigDecimal dayClosePrice = record.getDayClosePrice();
-
-                if (dayClosePrice == null || dayClosePrice.compareTo(BigDecimal.ZERO) == 0) {
-                    log.warn("[回溯更新high_trend] 股票 {} 日期 {} 收盘价为空或为0,跳过",
-                        stockCode, recordDate);
-                    skippedCount++;
-                    continue;
-                }
-
-                // 查询从记录日期到当前导入日期的最高价(向后查询,最多10天)
-                BigDecimal maxHighPrice = baseMapper.selectForwardTenDaysHighestPrice(
-                    stockCode, recordDate, importDate);
-
-                if (maxHighPrice == null) {
-                    log.warn("[回溯更新high_trend] 股票 {} 在 [{}, {}] 范围内没有最高价数据,跳过",
-                        stockCode, recordDate, importDate);
-                    skippedCount++;
-                    continue;
-                }
-
-                // 计算涨幅:(最高价 - 当天收盘价) / 当天收盘价 * 100
-                BigDecimal highTrend = maxHighPrice.subtract(dayClosePrice)
-                    .divide(dayClosePrice, 4, RoundingMode.HALF_UP)
-                    .multiply(new BigDecimal("100"))
-                    .setScale(2, RoundingMode.HALF_UP);
-
-                // 更新记录
-                record.setHighTrend(highTrend);
-                baseMapper.updateById(record);
-
-                updatedCount++;
-
-                log.debug("[回溯更新high_trend] 股票 {} 日期 {} 更新成功,收盘价: {}, 最高价: {}, 涨幅: {}%",
-                    stockCode, recordDate, dayClosePrice, maxHighPrice, highTrend);
-
-            } catch (Exception e) {
-                log.error("[回溯更新high_trend] 股票 {} 日期 {} 更新失败: {}",
-                    record.getStockCode(), record.getRecordDate(), e.getMessage(), e);
-                skippedCount++;
-            }
-        }
-
-        String message = String.format("回溯更新完成!成功更新 %d 条,跳过 %d 条",
-            updatedCount, skippedCount);
-        log.info("[回溯更新high_trend] {}", message);
-        return message;
-    }
-
     /**
      * 构建查询条件
      */