Commit a1305698 authored by tianbo's avatar tianbo

refactor(jg): 重构设备注册和转移服务中的ES数据处理逻辑

- 修改EquipClaimServiceImpl中的checkEsData为saveEsData方法 - 重构ESEquipmentCategory的deleteAllWithFallback方法返回类型为void - 重构JgEquipTransferServiceImpl中的ES数据更新和回滚机制 - 重构JgInstallationNoticeServiceImpl中的批量设备ID处理逻辑 - 实现安装通知取消申请时的ES数据回滚功能 - 添加@EnableEsRollback注解支持ES数据操作回滚 - 重构JgReformNoticeServiceImpl中的压力管道数据处理逻辑 - 实现改造通知接受流程中的ES数据保存和删除列表管理
parent f5fb2108
......@@ -113,14 +113,14 @@ public interface ESEquipmentCategory extends PagingAndSortingRepository<ESEquipm
return result.iterator().next();
}
default Iterable<ESEquipmentCategoryDto> deleteAllWithFallback(Iterable<ESEquipmentCategoryDto> dtos) {
default void deleteAllWithFallback(Iterable<ESEquipmentCategoryDto> dtos) {
int batchSize = 2000;
List<ESEquipmentCategoryDto> dtoList = StreamSupport.stream(dtos.spliterator(), false)
.collect(Collectors.toList());
if (dtoList.isEmpty()) {
return Collections.emptyList();
return;
}
log.info("deleteAllWithCache开始处理ES数据,总数量: {}, 批次大小: {}", dtoList.size(), batchSize);
......@@ -160,6 +160,5 @@ public interface ESEquipmentCategory extends PagingAndSortingRepository<ESEquipm
log.info("deleteAllWithCache数据处理完成,总计: 删除{}条", allDeleteData.size());
return dtoList;
}
}
......@@ -923,7 +923,7 @@ public class EquipClaimServiceImpl {
// 技术参数
jgRegisterInfoService.saveOrUpdateEquParams(new LinkedHashMap<>(equipInfo), new LinkedHashMap<>(equipParams), equList, record, timestamp, "edit");
// 更新es
jgRegisterInfoService.checkEsData(record);
jgRegisterInfoService.saveEsData(record);
eventPublisher.publish(new EquipCreateOrEditEvent(this, BusinessTypeEnum.JG_NEW_EQUIP.name(), Sets.newHashSet(record), EquipCreateOrEditEvent.EquipType.equip));
......
......@@ -32,6 +32,7 @@ import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory;
import com.yeejoin.amos.boot.module.common.api.dao.EsEquipmentDao;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.common.api.entity.ESEquipmentInfo;
import com.yeejoin.amos.boot.module.common.api.enums.ConstructionTypeEnum;
import com.yeejoin.amos.boot.module.common.api.enums.CylinderTypeEnum;
import com.yeejoin.amos.boot.module.common.api.mapper.CustomBaseMapper;
import com.yeejoin.amos.boot.module.common.biz.service.impl.EquipmentCategoryService;
......@@ -500,14 +501,16 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
* @return record
*/
private ResponseModel pipelineEquipCreateOrUpdate(Map<String, Object> paramMap, CompanyBo company) {
try {
// 获取表单数据并进行类型检查
LinkedHashMap equipmentInfoForm = castToLinkedHashMap(paramMap.get(EQUIP_INFO_FORM_ID));
// 操作类型
String operateType = ValidationUtil.isEmpty(equipmentInfoForm.get(SEQUENCE_NBR)) ? OPERATESAVE : OPERATEEDIT;
try {
String submitType = String.valueOf(paramMap.get("submitType"));
return ResponseHelper.buildResponse(batchSubmitOrUpdatePipeline(equipmentInfoForm, submitType, company));
} catch (Exception e) {
log.error("操作失败,数据异常: {}", e.getMessage(), e);
handleError(e, String.valueOf(paramMap.getOrDefault(SEQUENCE_NBR, "")));
handleError(e, String.valueOf(paramMap.getOrDefault(SEQUENCE_NBR, "")), operateType);
return ResponseHelper.buildResponse(null);
}
}
......@@ -906,6 +909,8 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
LinkedHashMap equipmentClassForm = (LinkedHashMap) checkAndCast(paramMap.get(EQUIP_CLASS_FORM_ID));
LinkedHashMap equipmentInfoForm = (LinkedHashMap) checkAndCast(paramMap.get(EQUIP_INFO_FORM_ID));
LinkedHashMap equipmentParamsForm = (LinkedHashMap) checkAndCast(paramMap.get(EQUIP_PARAMS_FORM_ID));
// 操作类型
String operateType = ValidationUtil.isEmpty(equipmentInfoForm.get(SEQUENCE_NBR)) ? OPERATESAVE : OPERATEEDIT;
String submitType = String.valueOf(paramMap.get("submitType"));
String record = (String) equipmentInfoForm.get(RECORD);
try {
......@@ -916,7 +921,7 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
// 96333码 字段的唯一性校验
check96333Code(equipmentInfoForm);
} catch (Exception e) {
handleError(e, null);
handleError(e, null, null);
}
// 操作类型
try {
......@@ -924,11 +929,11 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
record = batchSubmitOrUpdate(equipmentClassForm, equipmentInfoForm, equipmentParamsForm, submitType, company);
// 保存Es数据
if (!ObjectUtils.isEmpty(record)) {
checkEsData(record);
saveEsData(record);
}
} catch (Exception e) {
log.error("操作失败,数据异常: " + e.getMessage(), e);
handleError(e, record);
handleError(e, record, operateType);
}
return ResponseHelper.buildResponse(record);
}
......@@ -964,7 +969,7 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
// 历史有使用登记证的场车设备校验车牌号的唯一性
checkCarNumberUniquenessWithHisCC(equipmentInfoForm, record, dataSource);
} catch (Exception e) {
handleError(e, null);
handleError(e, null, null);
}
// 使用登记按照单位办理除外,其余进行编辑校验
......@@ -975,17 +980,19 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
this.checkForEquipEdit(record);
}
// 操作类型
String operateType = ValidationUtil.isEmpty(equipmentInfoForm.get(SEQUENCE_NBR)) ? OPERATESAVE : OPERATEEDIT;
try {
// 保存数据
record = batchSubmitOrUpdate(equipmentClassForm, equipmentInfoForm, equipmentParamsForm, submitType, company);
// 保存Es数据
if (!ObjectUtils.isEmpty(record)) {
checkEsData(record);
saveEsData(record);
throw new RuntimeException("保存数据失败");
}
} catch (Exception e) {
log.error("操作失败,数据异常: " + e.getMessage(), e);
handleError(e, record);
handleError(e, record, operateType);
}
return ResponseHelper.buildResponse(record);
}
......@@ -1178,10 +1185,10 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
}
}
private void handleError(Exception e, String record) {
log.error("处理异常: " + e.getMessage(), e);
private void handleError(Exception e, String record, String operateType) {
log.error("处理设备新增或更新异常: " + e.getMessage(), e);
// 删除数据库数据和ES数据
if (!ObjectUtils.isEmpty(record)) {
if (!ObjectUtils.isEmpty(record) && OPERATESAVE.equals(operateType)) {
List<String> records = new ArrayList<>();
records.add(record);
superviseInfoMapper.deleteDataAll(records);
......@@ -3759,6 +3766,9 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
String companyCode = companyInfoMap.get("creditCode").toString();
constructionInfo.setRecord(record);
constructionInfo.setRecDate(date);
if (!registerInfo.getEquList().equals("5000") && !registerInfo.getEquCategory().equals("4400") && !registerInfo.getEquCategory().equals("2300")) {
constructionInfo.setConstructionType(ConstructionTypeEnum.AZ.getCode());
}
if (companyTypeStr.contains(CompanyTypeEnum.CONSTRUCTION.getCode())) {
constructionInfo.setUscUnitCreditCode(companyCode);
......@@ -4052,14 +4062,10 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
: equipSource;
}
public void checkEsData(String id) {
public void saveEsData(String id) {
Map<String, Object> map = categoryOtherInfoMapper.selectDataById(id);
categoryOtherInfoMapper.updateEsStatus(id);
ESEquipmentCategoryDto dto = JSON.parseObject(toJSONString(map), ESEquipmentCategoryDto.class);
Optional<ESEquipmentCategoryDto> data = esEquipmentCategory.findById(id);
if (!ObjectUtils.isEmpty(data)) {
esEquipmentCategory.deleteById(id);
}
if (!ObjectUtils.isEmpty(dto)) {
long recTime;
long createTime;
......
......@@ -10,12 +10,16 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.github.pagehelper.util.StringUtil;
import com.google.common.collect.Lists;
import com.yeejoin.amos.boot.biz.common.bo.CompanyBo;
import com.yeejoin.amos.boot.biz.common.bo.ReginParams;
import com.yeejoin.amos.boot.biz.common.utils.RedisKey;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory;
import com.yeejoin.amos.boot.module.common.api.dao.EsEquipmentDao;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.common.api.entity.ESEquipmentInfo;
import com.yeejoin.amos.boot.module.common.api.service.ICompensateFlowDataOfRedis;
import com.yeejoin.amos.boot.module.jg.api.dto.*;
import com.yeejoin.amos.boot.module.jg.api.entity.JgEquipTransfer;
import com.yeejoin.amos.boot.module.jg.api.entity.JgEquipTransferEq;
......@@ -33,10 +37,10 @@ import com.yeejoin.amos.boot.module.jg.biz.context.EquipUsedCheckStrategyContext
import com.yeejoin.amos.boot.module.jg.biz.context.FlowingEquipRedisContext;
import com.yeejoin.amos.boot.module.jg.biz.edit.permission.FillingEditPermForCurrentUser;
import com.yeejoin.amos.boot.module.jg.biz.feign.TzsServiceFeignClient;
import com.yeejoin.amos.boot.module.common.api.service.ICompensateFlowDataOfRedis;
import com.yeejoin.amos.boot.module.jg.biz.service.IIdxBizJgRegisterInfoService;
import com.yeejoin.amos.boot.module.ymt.api.common.BaseException;
import com.yeejoin.amos.boot.module.ymt.api.entity.*;
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgProjectContraption;
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgUseInfo;
import com.yeejoin.amos.boot.module.ymt.api.enums.ApplicationFormTypeEnum;
import com.yeejoin.amos.boot.module.ymt.api.enums.EquipmentClassifityEnum;
import com.yeejoin.amos.boot.module.ymt.api.enums.FlowStatusEnum;
......@@ -60,6 +64,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.typroject.tyboot.core.foundation.context.RequestContext;
import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import org.typroject.tyboot.core.rdbms.service.BaseService;
import org.typroject.tyboot.core.restful.exception.instance.BadRequest;
import org.typroject.tyboot.core.restful.utils.ResponseModel;
......@@ -109,6 +114,9 @@ public class JgEquipTransferServiceImpl extends BaseService<JgEquipTransferDto,
@Autowired
private JgResumeInfoServiceImpl jgResumeInfoService;
@Autowired
private ESEquipmentCategory esEquipmentCategory;
/**
* 保存和保存并提交
*
......@@ -504,6 +512,7 @@ public class JgEquipTransferServiceImpl extends BaseService<JgEquipTransferDto,
@Transactional(rollbackFor = Exception.class)
@GlobalTransactional(rollbackFor = Exception.class, timeoutMills = 6000000)
public void accept(JgEquipTransferDto jgEquipTransferDto, String op) {
List<String> recordList = Lists.newArrayList();
String instanceId = jgEquipTransferDto.getInstanceId();
String nextTaskId = jgEquipTransferDto.getNextTaskId();
String lockKey = CommonServiceImpl.buildJgExecuteLockKey(instanceId);
......@@ -556,13 +565,8 @@ public class JgEquipTransferServiceImpl extends BaseService<JgEquipTransferDto,
}
if (jgEquipTransferEqs != null) {
jgEquipTransferEqs.forEach(equipTransferEq -> {
Map<String, Map<String, Object>> resultMap = MapBuilder.<String, Map<String, Object>>create()
.put(equipTransferEq.getEquId(), MapBuilder.<String, Object>create()
.put("USC_UNIT_NAME", jgEquipTransfer.getInstallUnitName())
.put("USC_UNIT_CREDIT_CODE", jgEquipTransfer.getInstallUnitCreditCode())
.build())
.build();
tzsServiceFeignClient.commonUpdateEsDataByIds(resultMap);
recordList.add(equipTransferEq.getEquId());
updateOldEsEquipment(equipTransferEq, jgEquipTransfer);
updateEsEquipment(equipTransferEq, jgEquipTransfer);
writeUsc2UseInfo(equipTransferEq, jgEquipTransfer);
});
......@@ -603,7 +607,8 @@ public class JgEquipTransferServiceImpl extends BaseService<JgEquipTransferDto,
updateById(jgEquipTransfer);
commonService.saveExecuteFlowData2Redis(instanceId, this.buildInstanceRuntimeData(jgEquipTransfer));
this.delRepeatUseEquipData(jgEquipTransfer.getSequenceNbr(), jgEquipTransfer.getApplyStatus(), jgEquipTransfer.getUseUnitCreditCode());
} catch (InterruptedException e) {
} catch (Exception e) {
this.esRollback(recordList);
e.printStackTrace();
} finally {
if (lock.isHeldByCurrentThread()) {
......@@ -612,6 +617,35 @@ public class JgEquipTransferServiceImpl extends BaseService<JgEquipTransferDto,
}
}
private void updateOldEsEquipment(JgEquipTransferEq equipTransferEq, JgEquipTransfer jgEquipTransfer) {
Optional<ESEquipmentCategoryDto> esOptional = esEquipmentCategory.findById(equipTransferEq.getEquId());
if (esOptional.isPresent()) {
ESEquipmentCategoryDto esEquipmentCategoryDto = esOptional.get();
esEquipmentCategoryDto.setUSC_UNIT_NAME(jgEquipTransfer.getInstallUnitName());
esEquipmentCategoryDto.setUSC_UNIT_CREDIT_CODE(jgEquipTransfer.getInstallUnitCreditCode());
esEquipmentCategory.save(esEquipmentCategoryDto);
}
}
private void esRollback(List<String> recordList) {
Iterable<ESEquipmentCategoryDto> esEquipmentCategoryDtos = esEquipmentCategory.findAllById(recordList);
if (!ValidationUtil.isEmpty(esEquipmentCategoryDtos)) {
esEquipmentCategoryDtos.forEach(esEquipmentCategoryDto -> {
esEquipmentCategoryDto.setUSC_UNIT_NAME(null);
esEquipmentCategoryDto.setUSC_UNIT_CREDIT_CODE(null);
});
esEquipmentCategory.saveAll(esEquipmentCategoryDtos);
}
Iterable<ESEquipmentInfo> esEquipmentInfos = esEquipmentDao.findAllById(recordList);
if (!ValidationUtil.isEmpty(esEquipmentInfos)) {
esEquipmentInfos.forEach(esEquipmentInfo -> {
esEquipmentInfo.setUSC_UNIT_NAME(null);
esEquipmentInfo.setUSC_UNIT_CREDIT_CODE(null);
});
esEquipmentDao.saveAll(esEquipmentInfos);
}
}
private void createResume(List<JgEquipTransferEq> equipTransferEqList, JgEquipTransfer equipTransfer, String routePath) {
if(StringUtils.hasText(equipTransfer.getProjectContraptionSeq())){
jgResumeInfoService.saveBatchResume(Collections.singletonList(
......
......@@ -574,7 +574,7 @@ public class ShCarServiceImpl extends BaseService<ShCar, ShCar, ShCarMapper> imp
idxBizJgInspectionDetectionInfoService.save(inspectionDetectionInfo);
// 保存es
idxBizJgRegisterInfoService.checkEsData(equRecord);
idxBizJgRegisterInfoService.saveEsData(equRecord);
equRecords.add(equRecord);
// 更新 equList 中的元素
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment