Commit 3cd28fc4 authored by suhuiguang's avatar suhuiguang

feat(jg): es数据同步

1.es增加字段是否球罐后,数据的同步
parent e1fb30cd
...@@ -40,5 +40,9 @@ public class TZSCommonConstant { ...@@ -40,5 +40,9 @@ public class TZSCommonConstant {
public static final String ES_INDEX_NAME_EQUIPMENT_INFO = "idx_biz_equipment_info"; public static final String ES_INDEX_NAME_EQUIPMENT_INFO = "idx_biz_equipment_info";
public static final String ES_INDEX_NAME_JG_ALL = "idx_biz_view_jg_all"; public static final String ES_INDEX_NAME_JG_ALL = "idx_biz_view_jg_all";
/**
* 固定式压力容器
*/
public static final String EQU_CATEGORY_CODE_GDYLRQ = "2100";
} }
...@@ -375,4 +375,12 @@ public class DataHandlerController extends BaseController { ...@@ -375,4 +375,12 @@ public class DataHandlerController extends BaseController {
@RequestParam(value = "isDelete", defaultValue = "false") boolean isDelete) { @RequestParam(value = "isDelete", defaultValue = "false") boolean isDelete) {
return ResponseHelper.buildResponse(dataHandlerService.deleteEquipIsNotClaimed(useUnitCreditCode, equList, dataSource, isDelete)); return ResponseHelper.buildResponse(dataHandlerService.deleteEquipIsNotClaimed(useUnitCreditCode, equList, dataSource, isDelete));
} }
@TycloudOperation(ApiLevel = UserType.AGENCY)
@PutMapping(value = "/initTank2Es")
@ApiOperation(value = "固定压力容器,初始是否球罐到es")
public ResponseModel<Integer> initTank2Es(){
Integer dealNum = dataHandlerService.initTank2Es();
return ResponseHelper.buildResponse(dealNum);
}
} }
\ No newline at end of file
package com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher;
import com.yeejoin.amos.boot.module.jg.biz.service.impl.IdxBizJgUseInfoServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.util.StopWatch;
import java.util.List;
@Slf4j
public abstract class BatchDataPatcher implements HistoricalDataPatcher {
private final ApplicationContext applicationContext;
protected BatchDataPatcher(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
public Integer patchBatchData() {
StopWatch watch = new StopWatch();
watch.start();
IdxBizJgUseInfoServiceImpl useInfoService = applicationContext.getBean(IdxBizJgUseInfoServiceImpl.class);
Integer maxVersion = useInfoService.getBaseMapper().selectMaxVersion();
Integer nextVersion = maxVersion + 1;
List<String> refreshRecords = useInfoService.getBaseMapper().selectUseInfoOfOneVersionAll(nextVersion);
while (!refreshRecords.isEmpty()) {
refreshRecords.parallelStream().forEach(record -> {
try {
beforePatching(record);
patchSingleRecord(record);
afterPatching(record);
} catch (Exception e) {
// 异常数据跳过
log.error("数据修补失败,设备:{}", record, e);
}
});
useInfoService.getBaseMapper().updateVersionBatch(refreshRecords, nextVersion);
refreshRecords = useInfoService.getBaseMapper().selectUseInfoOfOneVersionAll(nextVersion);
}
watch.stop();
log.info("数据修补完成,共处理{}条记录,耗时: {}秒", refreshRecords, watch.getTotalTimeSeconds());
return null;
}
protected abstract void beforePatching(String record);
protected abstract void patchSingleRecord(String record);
protected abstract void afterPatching(String record);
}
package com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher;
public interface HistoricalDataPatcher {
/**
* 执行批量修补
* @return 处理成功的记录数,如果不可计算则返回null
*/
Integer patchBatchData();
}
package com.yeejoin.amos.boot.module.jg.biz.data.fix.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.yeejoin.amos.boot.biz.common.entity.TzsBaseEntity;
import com.yeejoin.amos.boot.module.common.api.constant.TZSCommonConstant;
import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher.BatchDataPatcher;
import com.yeejoin.amos.boot.module.jg.biz.service.impl.IdxBizJgRegisterInfoServiceImpl;
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgRegisterInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* 是否球阀数据修补处理器
*/
@Component
@Slf4j
public class WeatherTankFieldPatcher extends BatchDataPatcher {
private final IdxBizJgRegisterInfoServiceImpl registerInfoService;
private final ESEquipmentCategory equipmentCategory;
protected WeatherTankFieldPatcher(ApplicationContext applicationContext, IdxBizJgRegisterInfoServiceImpl registerInfoService, ESEquipmentCategory equipmentCategory) {
super(applicationContext);
this.registerInfoService = registerInfoService;
this.equipmentCategory = equipmentCategory;
}
@Override
protected void patchSingleRecord(String record) {
IdxBizJgRegisterInfo registerInfo = registerInfoService.getBaseMapper().selectOne(
new LambdaQueryWrapper<IdxBizJgRegisterInfo>()
.eq(IdxBizJgRegisterInfo::getRecord, record)
.select(IdxBizJgRegisterInfo::getRecord,
TzsBaseEntity::getSequenceNbr,
IdxBizJgRegisterInfo::getEquCategory,
IdxBizJgRegisterInfo::getWhetherSphericalTank));
// 是否球阀只在固定式压力容器式才需要
if (registerInfo == null || !TZSCommonConstant.EQU_CATEGORY_CODE_GDYLRQ.equals(registerInfo.getEquCategory())) {
return;
}
Optional<ESEquipmentCategoryDto> op = equipmentCategory.findById(registerInfo.getRecord());
if (op.isPresent()) {
ESEquipmentCategoryDto equipmentCategoryDto = op.get();
equipmentCategoryDto.setWhetherSphericalTank(registerInfo.getWhetherSphericalTank());
equipmentCategory.save(equipmentCategoryDto);
}
}
@Override
protected void beforePatching(String record) {
}
@Override
protected void afterPatching(String record) {
}
}
...@@ -19,8 +19,8 @@ public interface IBizDataChangeHandleStrategy { ...@@ -19,8 +19,8 @@ public interface IBizDataChangeHandleStrategy {
* @return 业务类型 * @return 业务类型
*/ */
String canHandleBizType(); String canHandleBizType();
/** /**
* 获取变更信息详情-二级详情 * 获取变更信息详情-二级详情
* *
* @param applyNo 单据或者设备唯一,单据编辑时用来查询eq表 * @param applyNo 单据或者设备唯一,单据编辑时用来查询eq表
......
...@@ -40,6 +40,7 @@ import com.yeejoin.amos.boot.module.jg.api.entity.*; ...@@ -40,6 +40,7 @@ import com.yeejoin.amos.boot.module.jg.api.entity.*;
import com.yeejoin.amos.boot.module.jg.api.enums.PipelineEnum; import com.yeejoin.amos.boot.module.jg.api.enums.PipelineEnum;
import com.yeejoin.amos.boot.module.jg.api.enums.SafetyProblemTypeEnum; import com.yeejoin.amos.boot.module.jg.api.enums.SafetyProblemTypeEnum;
import com.yeejoin.amos.boot.module.jg.api.mapper.*; import com.yeejoin.amos.boot.module.jg.api.mapper.*;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.service.WeatherTankFieldPatcher;
import com.yeejoin.amos.boot.module.jg.biz.feign.TzsServiceFeignClient; import com.yeejoin.amos.boot.module.jg.biz.feign.TzsServiceFeignClient;
import com.yeejoin.amos.boot.module.jg.biz.handler.strategy.ProblemHandleStrategy; import com.yeejoin.amos.boot.module.jg.biz.handler.strategy.ProblemHandleStrategy;
import com.yeejoin.amos.boot.module.jg.biz.listener.SafetyProblemTopicMessage; import com.yeejoin.amos.boot.module.jg.biz.listener.SafetyProblemTopicMessage;
...@@ -170,6 +171,8 @@ public class DataHandlerServiceImpl { ...@@ -170,6 +171,8 @@ public class DataHandlerServiceImpl {
private final RestHighLevelClient restHighLevelClient; private final RestHighLevelClient restHighLevelClient;
private final WeatherTankFieldPatcher weatherTankFieldPatcher;
/** /**
* 安装告知压力管道历史数据修复-详情中的设备列表修改为汇总表格式 * 安装告知压力管道历史数据修复-详情中的设备列表修改为汇总表格式
...@@ -2282,7 +2285,7 @@ public class DataHandlerServiceImpl { ...@@ -2282,7 +2285,7 @@ public class DataHandlerServiceImpl {
public Integer deleteEquipIsNotClaimed(String useUnitCreditCode, String equList, public Integer deleteEquipIsNotClaimed(String useUnitCreditCode, String equList,
String dataSource, boolean isDelete) { String dataSource, boolean isDelete) {
List<IdxBizJgRegisterInfo> jgRegisterInfoList = registerInfoMapper.selectDeleteEquipIsNotClaimed(useUnitCreditCode, equList, dataSource ); List<IdxBizJgRegisterInfo> jgRegisterInfoList = registerInfoMapper.selectDeleteEquipIsNotClaimed(useUnitCreditCode, equList, dataSource);
List<String> records = Optional.ofNullable(jgRegisterInfoList) List<String> records = Optional.ofNullable(jgRegisterInfoList)
.orElse(Collections.emptyList()) .orElse(Collections.emptyList())
.stream() .stream()
...@@ -2301,4 +2304,9 @@ public class DataHandlerServiceImpl { ...@@ -2301,4 +2304,9 @@ public class DataHandlerServiceImpl {
} }
return records.size(); return records.size();
} }
public Integer initTank2Es() {
return weatherTankFieldPatcher.patchBatchData();
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment