Commit 63f45e00 authored by tianbo's avatar tianbo

refactor(jg): 优化设备注册信息的ES数据同步和删除逻辑

- 将selectDataById方法重命名为selectJgAllViewDataById以提高语义清晰度 - 在设备注册服务中实现ES数据删除的回滚机制,确保数据一致性 - 重构删除逻辑,先查询原有ES数据再执行删除操作 - 统一ES数据构建方法,提高代码复用性 - 完善异常处理机制,在操作失败时恢复ES数据状态
parent 8fd6fe17
......@@ -842,7 +842,7 @@ public class DataDockServiceImpl {
* @param record 设备唯一编码
*/
private void saveEquInfoToEs(String record, String isCompleteXa) {
Map<String, Object> map = categoryOtherInfoMapper.selectDataById(record);
Map<String, Object> map = categoryOtherInfoMapper.selectJgAllViewDataById(record);
ESEquipmentCategoryDto equipmentCategoryDto = JSON.parseObject(toJSONString(map), ESEquipmentCategoryDto.class);
if (!ObjectUtils.isEmpty(equipmentCategoryDto)) {
long recTime;
......@@ -2949,7 +2949,7 @@ public class DataDockServiceImpl {
idxBizJgUseInfoService.updateById(useInfo);
// es更新
Map<String, Object> map = categoryOtherInfoMapper.selectDataById(useInfo.getRecord());
Map<String, Object> map = categoryOtherInfoMapper.selectJgAllViewDataById(useInfo.getRecord());
ESEquipmentCategoryDto equipmentCategoryDto = JSON.parseObject(toJSONString(map), ESEquipmentCategoryDto.class);
equipmentCategoryDto.setIS_INTO_MANAGEMENT(Boolean.TRUE);
esEquipmentCategory.save(equipmentCategoryDto);
......
......@@ -106,7 +106,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static com.alibaba.fastjson.JSON.toJSONString;
import static com.yeejoin.amos.boot.module.jg.biz.service.impl.JgInstallationNoticeServiceImpl.CONSTRUCTION_TYPE;
......@@ -1141,7 +1140,7 @@ public class DataHandlerServiceImpl {
}
String[] recordArr = records.trim().split(",");
for (String record : recordArr) {
Map<String, Object> map = categoryOtherInfoMapper.selectDataById(record);
Map<String, Object> map = categoryOtherInfoMapper.selectJgAllViewDataById(record);
categoryOtherInfoMapper.updateEsStatus(record);
ESEquipmentCategoryDto dto = JSON.parseObject(toJSONString(map), ESEquipmentCategoryDto.class);
Optional<ESEquipmentCategoryDto> data = esEquipmentCategory.findById(record);
......
......@@ -113,6 +113,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static com.alibaba.fastjson.JSON.toJSONString;
import static com.yeejoin.amos.boot.module.common.api.enums.CylinderTypeEnum.SPECIAL_CYLINDER;
......@@ -501,16 +502,28 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
* @return record
*/
private ResponseModel pipelineEquipCreateOrUpdate(Map<String, Object> paramMap, CompanyBo company) {
List<ESEquipmentCategoryDto> addEsEquipCategoryDtos = new ArrayList<>();
List<ESEquipmentCategoryDto> deleteEsEquipCategoryDtos = new ArrayList<>();
List<ESEquipmentInfo> addEsEquipInfos = new ArrayList<>();
List<ESEquipmentInfo> deleteEsEquipInfos = new ArrayList<>();
// 获取表单数据并进行类型检查
LinkedHashMap equipmentInfoForm = castToLinkedHashMap(paramMap.get(EQUIP_INFO_FORM_ID));
// 操作类型
String operateType = ValidationUtil.isEmpty(equipmentInfoForm.get(SEQUENCE_NBR)) ? OPERATESAVE : OPERATEEDIT;
try {
String submitType = String.valueOf(paramMap.get("submitType"));
return ResponseHelper.buildResponse(batchSubmitOrUpdatePipeline(equipmentInfoForm, submitType, company));
return ResponseHelper.buildResponse(batchSubmitOrUpdatePipeline(equipmentInfoForm, submitType, company, addEsEquipCategoryDtos, deleteEsEquipCategoryDtos, addEsEquipInfos, deleteEsEquipInfos));
} catch (Exception e) {
log.error("操作失败,数据异常: {}", e.getMessage(), e);
handleError(e, String.valueOf(paramMap.getOrDefault(SEQUENCE_NBR, "")), operateType);
// 回滚es:新增的需要删除、删除的需要用快照数据保存
if (!ValidationUtil.isEmpty(addEsEquipCategoryDtos)) {
esEquipmentCategory.deleteAll(addEsEquipCategoryDtos);
esEquipmentDao.deleteAll(this.buildEquipData(addEsEquipCategoryDtos.stream().map(ESEquipmentCategoryDto::getSEQUENCE_NBR).collect(Collectors.toList())));
}
if (!ValidationUtil.isEmpty(deleteEsEquipCategoryDtos)) {
esEquipmentCategory.saveAll(deleteEsEquipCategoryDtos);
}
if (!ValidationUtil.isEmpty(deleteEsEquipInfos)) {
esEquipmentDao.saveAll(deleteEsEquipInfos);
}
return ResponseHelper.buildResponse(null);
}
}
......@@ -535,7 +548,7 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
return new ArrayList<>(15);
}
private Long batchSubmitOrUpdatePipeline(LinkedHashMap equipmentInfoForm, String submitType, CompanyBo company) {
private Long batchSubmitOrUpdatePipeline(LinkedHashMap equipmentInfoForm, String submitType, CompanyBo company, List<ESEquipmentCategoryDto> addEsEquipCategoryDtos, List<ESEquipmentCategoryDto> deleteEsEquipCategoryDtos, List<ESEquipmentInfo> addEsEquipInfos, List<ESEquipmentInfo> deleteEsEquipInfos) {
Date date = new Date();
String operateType = ValidationUtil.isEmpty(equipmentInfoForm.get(SEQUENCE_NBR)) ? OPERATESAVE : OPERATEEDIT;
// 设备是否复制而来,复制来的设备走新增
......@@ -558,10 +571,14 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
.stream()
.map(IdxBizJgUseInfo::getRecord)
.collect(Collectors.toList());
idxBizJgRegisterInfoService.batchDeleteByRecord(MapBuilder.<String, Object>create()
.put("recordList", records)
.put("equList", equipmentInfoForm.get("EQU_LIST"))
.build());
// 删除涉及的19张表的数据
superviseInfoMapper.deleteDataAll(records);
// 先查询旧es数据,用于后续报错回滚
deleteEsEquipCategoryDtos.addAll(StreamSupport.stream(esEquipmentCategory.findAllById(records).spliterator(), false).collect(Collectors.toList()));
deleteEsEquipInfos.addAll(StreamSupport.stream(esEquipmentDao.findAllById(records).spliterator(), false).collect(Collectors.toList()));
// 删除两个es
esEquipmentCategory.deleteAll(this.buildEquipCategoryData(records));
esEquipmentDao.deleteAll(this.buildEquipData(records));
}
List<Map<String, Object>> pipelineList = (List<Map<String, Object>>) equipmentInfoForm.get(PIPELINE_LIST);
......@@ -837,6 +854,7 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
esEquipmentDto.setPRODUCT_NAME(pipelineInfo.getPipeName());
esEquipmentDto.setProjectContraptionId(String.valueOf(sequenceNbr));
esEquipmentCategoryList.add(esEquipmentDto);
addEsEquipCategoryDtos.add(esEquipmentDto);
}
idxBizJgUseInfoService.saveOrUpdateBatch(useInfoList);
iIdxBizJgDesignInfoService.saveOrUpdateBatch(designInfoList);
......@@ -911,8 +929,12 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
LinkedHashMap equipmentParamsForm = (LinkedHashMap) checkAndCast(paramMap.get(EQUIP_PARAMS_FORM_ID));
// 操作类型
String operateType = ValidationUtil.isEmpty(equipmentInfoForm.get(SEQUENCE_NBR)) ? OPERATESAVE : OPERATEEDIT;
// 设备是否复制而来,复制来的设备走新增
boolean isCopy = !ValidationUtil.isEmpty(equipmentInfoForm.get(IS_COPY));
operateType = isCopy ? OPERATESAVE : operateType;
String submitType = String.valueOf(paramMap.get("submitType"));
String record = (String) equipmentInfoForm.get(RECORD);
ESEquipmentCategoryDto esEquipmentCategoryDto = esEquipmentCategory.findById(record).orElse(null);
try {
// 设备代码 字段的唯一性校验
checkEquCodeUniqueness(equipmentInfoForm);
......@@ -921,7 +943,7 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
// 96333码 字段的唯一性校验
check96333Code(equipmentInfoForm);
} catch (Exception e) {
handleError(e, null, null);
handleError(e, null, null, null);
}
// 操作类型
try {
......@@ -933,7 +955,7 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
}
} catch (Exception e) {
log.error("操作失败,数据异常: " + e.getMessage(), e);
handleError(e, record, operateType);
handleError(e, record, operateType, esEquipmentCategoryDto);
}
return ResponseHelper.buildResponse(record);
}
......@@ -948,7 +970,7 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
String submitType = String.valueOf(paramMap.get("submitType"));
String record = (String) equipmentInfoForm.get(RECORD);
String dataSource = (String) equipmentInfoForm.get(DATA_SOURCE);
ESEquipmentCategoryDto esEquipmentCategoryDto = esEquipmentCategory.findById(record).orElse(null);
if (dataSource.contains("his")) {
// 使用登记证编号校验
this.checkUseRegistrationCodeIsNotNUll(equipmentInfoForm);
......@@ -969,7 +991,7 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
// 历史有使用登记证的场车设备校验车牌号的唯一性
checkCarNumberUniquenessWithHisCC(equipmentInfoForm, record, dataSource);
} catch (Exception e) {
handleError(e, null, null);
handleError(e, null, null, null);
}
// 使用登记按照单位办理除外,其余进行编辑校验
......@@ -981,6 +1003,9 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
}
// 操作类型
String operateType = ValidationUtil.isEmpty(equipmentInfoForm.get(SEQUENCE_NBR)) ? OPERATESAVE : OPERATEEDIT;
// 设备是否复制而来,复制来的设备走新增
boolean isCopy = !ValidationUtil.isEmpty(equipmentInfoForm.get(IS_COPY));
operateType = isCopy ? OPERATESAVE : operateType;
try {
// 保存数据
record = batchSubmitOrUpdate(equipmentClassForm, equipmentInfoForm, equipmentParamsForm, submitType, company);
......@@ -991,7 +1016,7 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
} catch (Exception e) {
log.error("操作失败,数据异常: " + e.getMessage(), e);
handleError(e, record, operateType);
handleError(e, record, operateType, esEquipmentCategoryDto);
}
return ResponseHelper.buildResponse(record);
}
......@@ -1184,7 +1209,7 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
}
}
private void handleError(Exception e, String record, String operateType) {
private void handleError(Exception e, String record, String operateType, ESEquipmentCategoryDto esEquipmentCategoryDto) {
log.error("处理设备新增或更新异常: " + e.getMessage(), e);
// 删除数据库数据和ES数据
if (!ObjectUtils.isEmpty(record) && OPERATESAVE.equals(operateType)) {
......@@ -1193,6 +1218,9 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
superviseInfoMapper.deleteDataAll(records);
esEquipmentCategory.deleteById(record);
}
if (!ValidationUtil.isEmpty(record) && OPERATEEDIT.equals(operateType) && !ValidationUtil.isEmpty(esEquipmentCategoryDto)) {
esEquipmentCategory.save(esEquipmentCategoryDto);
}
throw new BadRequest(e.getMessage());
}
......@@ -1220,34 +1248,49 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
@Override
@Transactional(rollbackFor = Exception.class)
public boolean batchDeleteByRecord(Map<String, Object> map) {
Object recordList = map.get("recordList");
List<String> records = new ArrayList<>();
List<ESEquipmentCategoryDto> list = new ArrayList<>();
// 删除ES数据
if (recordList.toString().contains("[")) {
for (String record : (List<String>) recordList) {
records.add(record);
ESEquipmentCategoryDto esEquipmentCategoryDto = new ESEquipmentCategoryDto();
esEquipmentCategoryDto.setSEQUENCE_NBR(record);
list.add(esEquipmentCategoryDto);
List<ESEquipmentCategoryDto> originalEsList1 = new ArrayList<>();
List<ESEquipmentInfo> originalEsList2 = new ArrayList<>();
try {
Object recordList = map.get("recordList");
// 删除ES数据
if (recordList.toString().contains("[")) {
records.addAll((List<String>) recordList);
} else {
records.add(recordList.toString());
}
} else {
records.add(recordList.toString());
ESEquipmentCategoryDto esEquipmentCategoryDto = new ESEquipmentCategoryDto();
esEquipmentCategoryDto.setSEQUENCE_NBR(recordList.toString());
list.add(esEquipmentCategoryDto);
this.checkForDelete(records);
if (CollUtil.isNotEmpty(records)) {
// 删除涉及的19张表的数据
superviseInfoMapper.deleteDataAll(records);
}
if (CollUtil.isNotEmpty(records)) {
originalEsList1 = StreamSupport.stream(esEquipmentCategory.findAllById(records).spliterator(), false).collect(Collectors.toList());
originalEsList2 = StreamSupport.stream(esEquipmentDao.findAllById(records).spliterator(), false).collect(Collectors.toList());
// 删除es中的数据
esEquipmentCategory.deleteAll(this.buildEquipCategoryData(records));
esEquipmentDao.deleteAll(this.buildEquipData(records));
}
return true;
} catch (Exception e) {
log.error("批量删除设备注册信息异常: " + e.getMessage(), e);
this.rollbackEsInfo(originalEsList1, originalEsList2);
throw new BadRequest(e.getMessage());
}
this.checkForDelete(records);
if (CollUtil.isNotEmpty(records)) {
// 删除涉及的19张表的数据
superviseInfoMapper.deleteDataAll(records);
}
/**
* 回滚es删除的数据
* @param originalEsList1 esEquipmentCategoryDto集合
* @param originalEsList2 esEquipmentInfo集合
*/
private void rollbackEsInfo(List<ESEquipmentCategoryDto> originalEsList1, List<ESEquipmentInfo> originalEsList2) {
if (CollUtil.isNotEmpty(originalEsList1)) {
esEquipmentCategory.saveAll(originalEsList1);
}
if (CollUtil.isNotEmpty(list)) {
// 删除es中的数据
esEquipmentCategory.deleteAll(list);
esEquipmentDao.deleteAll(this.buildEquipData(records));
if (CollUtil.isNotEmpty(originalEsList2)) {
esEquipmentDao.saveAll(originalEsList2);
}
return true;
}
private Iterable<? extends ESEquipmentInfo> buildEquipData(List<String> records) {
......@@ -1258,6 +1301,14 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
}).collect(Collectors.toList());
}
private Iterable<? extends ESEquipmentCategoryDto> buildEquipCategoryData(List<String> records) {
return records.stream().map(record->{
ESEquipmentCategoryDto esEquipmentInfo = new ESEquipmentCategoryDto();
esEquipmentInfo.setSEQUENCE_NBR(record);
return esEquipmentInfo;
}).collect(Collectors.toList());
}
/**
* 删除校验,被引用时不可删除
*
......@@ -4062,8 +4113,7 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
}
public void saveEsData(String id) {
Map<String, Object> map = categoryOtherInfoMapper.selectDataById(id);
categoryOtherInfoMapper.updateEsStatus(id);
Map<String, Object> map = categoryOtherInfoMapper.selectJgAllViewDataById(id);
ESEquipmentCategoryDto dto = JSON.parseObject(toJSONString(map), ESEquipmentCategoryDto.class);
if (!ObjectUtils.isEmpty(dto)) {
long recTime;
......
......@@ -13,7 +13,9 @@ import com.yeejoin.amos.boot.biz.common.utils.RedisKey;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.boot.biz.common.utils.SnowflakeIdUtil;
import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory;
import com.yeejoin.amos.boot.module.common.api.dao.EsEquipmentDao;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.common.api.entity.ESEquipmentInfo;
import com.yeejoin.amos.boot.module.jg.api.dto.ShCarDto;
import com.yeejoin.amos.boot.module.jg.api.entity.*;
import com.yeejoin.amos.boot.module.jg.api.enums.BusinessTypeEnum;
......@@ -94,6 +96,8 @@ public class ShCarServiceImpl extends BaseService<ShCar, ShCar, ShCarMapper> imp
@Resource
private ESEquipmentCategory esEquipmentCategory;
@Resource
private EsEquipmentDao esEquipmentDao;
@Resource
private TzsServiceFeignClient tzsServiceFeignClient;
@Resource
private JgVehicleInformationServiceImpl vehicleInformationService;
......@@ -707,13 +711,23 @@ public class ShCarServiceImpl extends BaseService<ShCar, ShCar, ShCarMapper> imp
* 认领过程中报错,回滚第一步的es数据
*/
public void handleClaimFailed(List<String> records) {
if (!records.isEmpty()) {
records.forEach(record -> {
Optional<ESEquipmentCategoryDto> data = esEquipmentCategory.findById(record);
if (!ObjectUtils.isEmpty(data)) {
esEquipmentCategory.deleteById(record);
}
});
}
esEquipmentCategory.deleteAll(this.buildEquipCategoryData(records));
esEquipmentDao.deleteAll(this.buildEquipData(records));
}
private Iterable<? extends ESEquipmentInfo> buildEquipData(List<String> records) {
return records.stream().map(record->{
ESEquipmentInfo esEquipmentInfo = new ESEquipmentInfo();
esEquipmentInfo.setSEQUENCE_NBR(record);
return esEquipmentInfo;
}).collect(Collectors.toList());
}
private Iterable<? extends ESEquipmentCategoryDto> buildEquipCategoryData(List<String> records) {
return records.stream().map(record->{
ESEquipmentCategoryDto esEquipmentInfo = new ESEquipmentCategoryDto();
esEquipmentInfo.setSEQUENCE_NBR(record);
return esEquipmentInfo;
}).collect(Collectors.toList());
}
}
\ No newline at end of file
......@@ -30,7 +30,7 @@ public interface CategoryOtherInfoMapper extends BaseMapper<CategoryOtherInfo> {
int updateOtherInfo(String supervisorCode, String editStatus);
Map<String, Object> selectDataById(String id);
Map<String, Object> selectJgAllViewDataById(String id);
CategoryOtherInfo queryInitCode(@Param("initCode") @NonNull String initCode);
......
......@@ -73,7 +73,7 @@
LEFT JOIN idx_biz_jg_use_info ibjui ON ibjoi.RECORD = ibjui.RECORD
WHERE ibjoi."RECORD" = #{record}
</select>
<select id="selectDataById" resultType="java.util.Map">
<select id="selectJgAllViewDataById" resultType="java.util.Map">
SELECT "SEQUENCE_NBR",
"REC_DATE",
"CREATE_DATE",
......
......@@ -1664,7 +1664,7 @@ public class EquipmentCategoryServiceImpl extends BaseService<EquipmentCategoryD
@Override
public void checkEsData(String id) {
Map<String, Object> map = categoryOtherInfoMapper.selectDataById(id);
Map<String, Object> map = categoryOtherInfoMapper.selectJgAllViewDataById(id);
categoryOtherInfoMapper.updateEsStatus(id);
ESEquipmentCategoryDto dto = JSON.parseObject(toJSONString(map), ESEquipmentCategoryDto.class);
Optional<ESEquipmentCategoryDto> data = esEquipmentCategory.findById(id);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment