Commit a9806080 authored by suhuiguang's avatar suhuiguang

Merge branch 'develop_tzs_main' of http://39.100.92.250:5000/moa/amos-boot-biz…

Merge branch 'develop_tzs_main' of http://39.100.92.250:5000/moa/amos-boot-biz into develop_tzs_main
parents 69e12383 5e2bcc92
package com.yeejoin.amos.boot.module.common.api.dto; package com.yeejoin.amos.boot.module.common.api.dto;
import com.baomidou.mybatisplus.annotation.TableField;
import com.yeejoin.amos.boot.biz.common.annotation.FieldDisplayDefine;
import lombok.Data; import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Id;
......
...@@ -54,8 +54,8 @@ import com.yeejoin.amos.boot.module.jg.biz.utils.CodeUtil; ...@@ -54,8 +54,8 @@ import com.yeejoin.amos.boot.module.jg.biz.utils.CodeUtil;
import com.yeejoin.amos.boot.module.jg.biz.utils.JsonUtils; import com.yeejoin.amos.boot.module.jg.biz.utils.JsonUtils;
import com.yeejoin.amos.boot.module.ymt.api.entity.*; import com.yeejoin.amos.boot.module.ymt.api.entity.*;
import com.yeejoin.amos.boot.module.ymt.api.enums.ApplicationFormTypeEnum; import com.yeejoin.amos.boot.module.ymt.api.enums.ApplicationFormTypeEnum;
import com.yeejoin.amos.boot.module.ymt.api.enums.EquipmentEnum;
import com.yeejoin.amos.boot.module.ymt.api.enums.EquipmentClassifityEnum; import com.yeejoin.amos.boot.module.ymt.api.enums.EquipmentClassifityEnum;
import com.yeejoin.amos.boot.module.ymt.api.enums.EquipmentEnum;
import com.yeejoin.amos.boot.module.ymt.api.enums.FlowStatusEnum; import com.yeejoin.amos.boot.module.ymt.api.enums.FlowStatusEnum;
import com.yeejoin.amos.boot.module.ymt.api.mapper.*; import com.yeejoin.amos.boot.module.ymt.api.mapper.*;
import com.yeejoin.amos.feign.privilege.Privilege; import com.yeejoin.amos.feign.privilege.Privilege;
...@@ -70,9 +70,8 @@ import org.apache.commons.lang3.math.NumberUtils; ...@@ -70,9 +70,8 @@ import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
import org.typroject.tyboot.core.foundation.context.RequestContext; import org.typroject.tyboot.core.foundation.context.RequestContext;
...@@ -89,9 +88,11 @@ import java.util.*; ...@@ -89,9 +88,11 @@ import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static com.alibaba.fastjson.JSON.toJSONString; import static com.alibaba.fastjson.JSON.toJSONString;
import static com.yeejoin.amos.boot.module.common.api.constant.TZSCommonConstant.EQU_CATEGORY_CODE_GDYLRQ; import static com.yeejoin.amos.boot.module.common.api.constant.TZSCommonConstant.EQU_CATEGORY_CODE_GDYLRQ;
...@@ -174,11 +175,9 @@ public class DataDockServiceImpl { ...@@ -174,11 +175,9 @@ public class DataDockServiceImpl {
private final IdxBizJgConstructionInfoMapper idxBizJgConstructionInfoMapper; private final IdxBizJgConstructionInfoMapper idxBizJgConstructionInfoMapper;
private final IdxBizJgOtherInfoMapper idxBizJgOtherInfoMapper; private final IdxBizJgOtherInfoMapper idxBizJgOtherInfoMapper;
private final IdxBizJgInspectionDetectionInfoMapper idxBizJgInspectionDetectionInfoMapper; private final IdxBizJgInspectionDetectionInfoMapper idxBizJgInspectionDetectionInfoMapper;
private final ESEquipmentCategory esEquipmentCategoryDao;
private final PieLineDataChangeServiceImpl pieLineDataChangeService; private final PieLineDataChangeServiceImpl pieLineDataChangeService;
private final ElevatorDataTransactionService elevatorDataTransactionService;
@Value("${equip.detail.path:/mixuap?appId=1742358052905971713&id=1734100233714954241&formType=detail&record=%s&DATA_SOURCE=%s}") @Value("${equip.detail.path:/mixuap?appId=1742358052905971713&id=1734100233714954241&formType=detail&record=%s&DATA_SOURCE=%s}")
private String equipRoutePath; private String equipRoutePath;
...@@ -2721,17 +2720,29 @@ public class DataDockServiceImpl { ...@@ -2721,17 +2720,29 @@ public class DataDockServiceImpl {
* @param equLists 数据集 * @param equLists 数据集
* @return 保存结果 * @return 保存结果
*/ */
@Transactional
public Set<String> saveElevatorData(List<Map<String, Object>> equLists) { public Set<String> saveElevatorData(List<Map<String, Object>> equLists) {
RequestContextWrapper contextWrapper = RequestContextWrapper.capture(); RequestContextWrapper contextWrapper = RequestContextWrapper.capture();
Set<String> recordSet = Sets.newConcurrentHashSet(); Set<String> successfulRecordSet = Sets.newConcurrentHashSet();
Set<String> resultSet = Sets.newConcurrentHashSet(); Set<String> failedRecordSet = Sets.newConcurrentHashSet();
Set<String> failedResultSet = Sets.newConcurrentHashSet();
// 分批处理大小 // 分批处理大小
int batchSize = 100; int batchSize = 100;
int totalSize = equLists.size(); int totalSize = equLists.size();
int successCount = 0; int successfulCount = 0;
int failedCount = 0; int failedCount = 0;
// 从equList中取出record
Set<String> recordList = equLists.stream().map(equ -> MapUtil.getStr(equ, "record")).collect(Collectors.toSet());
// 记录es数据快照
Map<String, ESEquipmentCategoryDto> originalEsResultMap = StreamSupport.stream(
esEquipmentCategoryDao.findAllById(recordList).spliterator(), false)
.collect(Collectors.toMap(
ESEquipmentCategoryDto::getSEQUENCE_NBR,
Function.identity()
));
// 分批处理数据 // 分批处理数据
for (int i = 0; i < totalSize; i += batchSize) { for (int i = 0; i < totalSize; i += batchSize) {
int endIndex = Math.min(i + batchSize, totalSize); int endIndex = Math.min(i + batchSize, totalSize);
...@@ -2742,7 +2753,7 @@ public class DataDockServiceImpl { ...@@ -2742,7 +2753,7 @@ public class DataDockServiceImpl {
.map(equ -> CompletableFuture.supplyAsync(() -> { .map(equ -> CompletableFuture.supplyAsync(() -> {
contextWrapper.apply(); contextWrapper.apply();
try { try {
Object resultObj = elevatorDataTransactionService.saveSingleElevatorData(equ); Object resultObj = this.saveElevatorDataInTransaction(equ, "jg_his_xa", null);
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
result.put("success", true); result.put("success", true);
result.put("data", resultObj); result.put("data", resultObj);
...@@ -2768,17 +2779,19 @@ public class DataDockServiceImpl { ...@@ -2768,17 +2779,19 @@ public class DataDockServiceImpl {
if ((Boolean) result.get("success")) { if ((Boolean) result.get("success")) {
Object resultObj = result.get("data"); Object resultObj = result.get("data");
if (resultObj instanceof String) { if (resultObj instanceof String) {
recordSet.add(resultObj.toString()); successfulRecordSet.add(resultObj.toString());
successCount++; successfulCount++;
} }
// 记录做了其他业务则不能更新的record
if (resultObj instanceof Map) { if (resultObj instanceof Map) {
resultSet.add(JSONObject.toJSONString(resultObj)); failedResultSet.add(JSONObject.toJSONString(resultObj));
failedCount++; failedCount++;
} }
} else { } else {
Exception e = (Exception) result.get("exception"); Exception e = (Exception) result.get("exception");
Map<String, Object> equ = (Map<String, Object>) result.get("equ"); Map<String, Object> equ = (Map<String, Object>) result.get("equ");
handleProcessException(e, equ, resultSet); failedRecordSet.add(MapUtil.getStr(equ, "record"));
handleProcessException(e, equ, failedResultSet);
failedCount++; failedCount++;
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -2788,25 +2801,49 @@ public class DataDockServiceImpl { ...@@ -2788,25 +2801,49 @@ public class DataDockServiceImpl {
} }
log.info("已处理数据 {}/{} - 成功: {}, 失败: {}", log.info("已处理数据 {}/{} - 成功: {}, 失败: {}",
Math.min(i + batchSize, totalSize), totalSize, successCount, failedCount); Math.min(i + batchSize, totalSize), totalSize, successfulCount, failedCount);
} }
// 发布成功记录的事件 // 发布成功记录的事件
if (!recordSet.isEmpty()) { if (!ValidationUtil.isEmpty(successfulRecordSet)) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { eventPublisher.publish(new EquipCreateOrEditEvent(this, BusinessTypeEnum.JG_NEW_EQUIP.name(), successfulRecordSet, EquipCreateOrEditEvent.EquipType.equip));
@Override }
public void afterCommit() { // 处理失败记录对应es
eventPublisher.publish(new EquipCreateOrEditEvent(this, BusinessTypeEnum.JG_NEW_EQUIP.name(), recordSet, EquipCreateOrEditEvent.EquipType.equip)); if (!ValidationUtil.isEmpty(failedRecordSet)) {
// 分离需要更新和需要删除的记录
List<ESEquipmentCategoryDto> updateList = new ArrayList<>();
List<ESEquipmentCategoryDto> deleteList = new ArrayList<>();
for (String record : failedRecordSet) {
ESEquipmentCategoryDto esEquipment = originalEsResultMap.get(record);
if (esEquipment != null) {
updateList.add(esEquipment);
} else {
// 创建仅包含ID的DTO对象用于删除
ESEquipmentCategoryDto deleteDto = new ESEquipmentCategoryDto();
deleteDto.setSEQUENCE_NBR(record); // 设置主键字段
deleteList.add(deleteDto);
} }
}); }
// 批量更新存在的记录
if (!updateList.isEmpty()) {
esEquipmentCategoryDao.saveAll(updateList);
}
// 批量删除不存在的记录
if (!deleteList.isEmpty()) {
esEquipmentCategoryDao.deleteAll(deleteList);
}
} }
log.info("数据导入完成 - 总计: {}条, 成功: {}条, 失败: {}条", log.info("数据导入完成 - 总计: {}条, 成功: {}条, 失败: {}条",
totalSize, successCount, failedCount); totalSize, successfulCount, failedCount);
log.info("未更新设备record:{}", resultSet); log.info("未更新设备record:{}", failedResultSet);
return resultSet; return failedResultSet;
} }
private void handleProcessException(Exception e, Map<String, Object> equ, Set<String> inUseRecordSet) { private void handleProcessException(Exception e, Map<String, Object> equ, Set<String> failedResultSet) {
try { try {
String traceInfo = Arrays.stream(e.getStackTrace()) String traceInfo = Arrays.stream(e.getStackTrace())
.filter(x -> x.getClassName().contains("com.yeejoin")) .filter(x -> x.getClassName().contains("com.yeejoin"))
...@@ -2818,20 +2855,20 @@ public class DataDockServiceImpl { ...@@ -2818,20 +2855,20 @@ public class DataDockServiceImpl {
.map(String::valueOf) .map(String::valueOf)
.orElse(UUID.randomUUID().toString()); .orElse(UUID.randomUUID().toString());
// esEquipmentCategory.deleteById(record);
ExcelImportErrorLogDto errorLogDto = JSON.parseObject(toJSONString(equ), ExcelImportErrorLogDto.class); ExcelImportErrorLogDto errorLogDto = JSON.parseObject(toJSONString(equ), ExcelImportErrorLogDto.class);
errorLogDto.setErrorInfo(e.getMessage()); errorLogDto.setErrorInfo(e.getMessage());
errorLogDto.setTraceInfo(traceInfo); errorLogDto.setTraceInfo(traceInfo);
errorLogDto.setCreateTime(new Date()); errorLogDto.setCreateTime(new Date());
excelImportErrorLogDao.save(errorLogDto); excelImportErrorLogDao.save(errorLogDto);
inUseRecordSet.add(JSONObject.toJSONString(errorLogDto)); failedResultSet.add(JSONObject.toJSONString(errorLogDto));
log.error("处理数据时发生异常,record: {}", record, e); log.error("处理数据时发生异常,record: {}", record, e);
} catch (Exception handleException) { } catch (Exception handleException) {
log.error("处理异常信息时发生错误", handleException); log.error("处理异常信息时发生错误", handleException);
} }
} }
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public Object saveElevatorDataInTransaction(Map<String, Object> equ, String dataSource, String remark) { public Object saveElevatorDataInTransaction(Map<String, Object> equ, String dataSource, String remark) {
Map<String, String> errorResult = Maps.newConcurrentMap(); Map<String, String> errorResult = Maps.newConcurrentMap();
String record = Optional.ofNullable(equ.get("record")).map(String::valueOf).orElse(UUID.randomUUID().toString()); String record = Optional.ofNullable(equ.get("record")).map(String::valueOf).orElse(UUID.randomUUID().toString());
......
package com.yeejoin.amos.boot.module.jg.biz.service.impl;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Map;
@Service
public class ElevatorDataTransactionService {
private final DataDockServiceImpl dataDockService;
@Autowired
public ElevatorDataTransactionService(@Lazy DataDockServiceImpl dataDockService) {
this.dataDockService = dataDockService;
}
@Transactional(rollbackFor = Exception.class)
public Object saveSingleElevatorData(Map<String, Object> equ) {
return dataDockService.saveElevatorDataInTransaction(equ, "jg_his_xa", null);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment