|
|
@@ -117,75 +117,77 @@ public class ProductBaseServiceImpl extends ServiceImpl<ProductBaseMapper, Produ
|
|
|
*/
|
|
|
@Override
|
|
|
public void run(ApplicationArguments args) throws Exception {
|
|
|
- // 检查索引是否存在,不存在则创建
|
|
|
- if (!esMapper.existsIndex("productbasevo")) {
|
|
|
- log.info("索引 [productbasevo] 不存在,正在创建...");
|
|
|
- esMapper.createIndex();
|
|
|
- }
|
|
|
- // 只同步缺失的数据,而不是全量拉取
|
|
|
- long totalInDb = baseMapper.selectCount(new LambdaQueryWrapper<>());
|
|
|
- log.info("DbProduct 数量 :" + totalInDb);
|
|
|
- long totalInEs = esMapper.selectCount(new LambdaEsQueryWrapper<>());
|
|
|
- log.info("EsProduct 数量 :" + totalInEs);
|
|
|
-
|
|
|
- if (totalInEs < totalInDb) {
|
|
|
- log.info("ES 数据不完整,开始同步...");
|
|
|
-
|
|
|
- // 使用LIMIT分页查询,避免一次性加载所有数据
|
|
|
- int pageSize = 1000;
|
|
|
- long offset = 0;
|
|
|
- long totalSynced = 0;
|
|
|
-
|
|
|
- // 使用线程池处理批量插入
|
|
|
- int threadCount = 10;
|
|
|
- ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
|
|
|
-
|
|
|
- try {
|
|
|
- while (true) {
|
|
|
- // 使用LIMIT OFFSET进行分页查询
|
|
|
- QueryWrapper<ProductBase> queryWrapper = Wrappers.query();
|
|
|
- queryWrapper.last("LIMIT " + pageSize + " OFFSET " + offset);
|
|
|
- List<ProductBaseVo> currentBatch = baseMapper.selectAllList(queryWrapper);
|
|
|
- if (CollUtil.isEmpty(currentBatch)) {
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- // 异步插入ES
|
|
|
- long finalOffset = offset;
|
|
|
- int batchNumber = (int) (offset / pageSize) + 1;
|
|
|
- CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
|
|
- try {
|
|
|
- esMapper.insertBatch(currentBatch);
|
|
|
- log.info("成功同步第 {} 批(OFFSET: {}),共 {} 条记录", batchNumber, finalOffset, currentBatch.size());
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("同步第 {} 批数据失败: {}", batchNumber, e.getMessage(), e);
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- }, executorService);
|
|
|
-
|
|
|
- // 等待当前批次完成(控制并发,避免ES压力过大)
|
|
|
- future.join();
|
|
|
-
|
|
|
- totalSynced += currentBatch.size();
|
|
|
- log.info("已同步进度: {}/{}", totalSynced, totalInDb);
|
|
|
-
|
|
|
- // 如果当前批次数据少于pageSize,说明是最后一批
|
|
|
- if (currentBatch.size() < pageSize) {
|
|
|
- break;
|
|
|
- }
|
|
|
+// // 检查索引是否存在,不存在则创建
|
|
|
+// if (!esMapper.existsIndex("productbasevo")) {
|
|
|
+// log.info("索引 [productbasevo] 不存在,正在创建...");
|
|
|
+// esMapper.createIndex();
|
|
|
+// }
|
|
|
+// // 只同步缺失的数据,而不是全量拉取
|
|
|
+// long totalInDb = baseMapper.selectCount(new LambdaQueryWrapper<>());
|
|
|
+// log.info("DbProduct 数量 :" + totalInDb);
|
|
|
+// long totalInEs = esMapper.selectCount(new LambdaEsQueryWrapper<>());
|
|
|
+// log.info("EsProduct 数量 :" + totalInEs);
|
|
|
+//
|
|
|
+// if (totalInEs < totalInDb) {
|
|
|
+// log.info("ES 数据不完整,开始同步...");
|
|
|
+//
|
|
|
+// // 使用LIMIT分页查询,避免一次性加载所有数据
|
|
|
+// int pageSize = 1000;
|
|
|
+// long offset = 0;
|
|
|
+// long totalSynced = 0;
|
|
|
+//
|
|
|
+// // 使用线程池处理批量插入
|
|
|
+// int threadCount = 10;
|
|
|
+// ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
|
|
|
+//
|
|
|
+// try {
|
|
|
+// while (true) {
|
|
|
+// // 使用LIMIT OFFSET进行分页查询
|
|
|
+// QueryWrapper<ProductBase> queryWrapper = Wrappers.query();
|
|
|
+// queryWrapper.last("LIMIT " + pageSize + " OFFSET " + offset);
|
|
|
+// List<ProductBaseVo> currentBatch = baseMapper.selectAllList(queryWrapper);
|
|
|
+// if (CollUtil.isEmpty(currentBatch)) {
|
|
|
+// break;
|
|
|
+// }
|
|
|
+//
|
|
|
+// // 异步插入ES
|
|
|
+// long finalOffset = offset;
|
|
|
+// int batchNumber = (int) (offset / pageSize) + 1;
|
|
|
+// CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
|
|
+// try {
|
|
|
+// esMapper.insertBatch(currentBatch);
|
|
|
+// log.info("成功同步第 {} 批(OFFSET: {}),共 {} 条记录", batchNumber, finalOffset, currentBatch.size());
|
|
|
+// } catch (Exception e) {
|
|
|
+// log.error("同步第 {} 批数据失败: {}", batchNumber, e.getMessage(), e);
|
|
|
+// throw new RuntimeException(e);
|
|
|
+// }
|
|
|
+// }, executorService);
|
|
|
+//
|
|
|
+// // 等待当前批次完成(控制并发,避免ES压力过大)
|
|
|
+// future.join();
|
|
|
+//
|
|
|
+// totalSynced += currentBatch.size();
|
|
|
+// log.info("已同步进度: {}/{}", totalSynced, totalInDb);
|
|
|
+//
|
|
|
+// // 如果当前批次数据少于pageSize,说明是最后一批
|
|
|
+// if (currentBatch.size() < pageSize) {
|
|
|
+// break;
|
|
|
+// }
|
|
|
+//
|
|
|
+// offset += pageSize;
|
|
|
+//
|
|
|
+// // 添加短暂延迟,避免对数据库和ES造成过大压力
|
|
|
+// Thread.sleep(100);
|
|
|
+// }
|
|
|
+//
|
|
|
+// log.info("ES 数据同步完成,共同步 {} 条记录", totalSynced);
|
|
|
+// } finally {
|
|
|
+// executorService.shutdown();
|
|
|
+// }
|
|
|
+// }
|
|
|
+ }
|
|
|
|
|
|
- offset += pageSize;
|
|
|
|
|
|
- // 添加短暂延迟,避免对数据库和ES造成过大压力
|
|
|
- Thread.sleep(100);
|
|
|
- }
|
|
|
-
|
|
|
- log.info("ES 数据同步完成,共同步 {} 条记录", totalSynced);
|
|
|
- } finally {
|
|
|
- executorService.shutdown();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* 查询产品基础信息(包含扩展信息、价格库存、属性、定制信息)
|
|
|
@@ -2279,5 +2281,13 @@ public class ProductBaseServiceImpl extends ServiceImpl<ProductBaseMapper, Produ
|
|
|
build.setTotal(esPageInfo.getTotal());
|
|
|
return build;
|
|
|
}
|
|
|
+ @Override
|
|
|
+ public void syncEsProduct(String productNos){
|
|
|
+ List<String> productNoList = Arrays.asList(productNos.split(","));
|
|
|
+ List<ProductBaseVo> productBaseVos = baseMapper.selectAllList(
|
|
|
+ Wrappers.query(ProductBase.class).in("b.product_no", productNoList)
|
|
|
+ );
|
|
|
+ esMapper.insertBatch(productBaseVos);
|
|
|
+ }
|
|
|
}
|
|
|
|