Commit b0b01cf8 authored by suhuiguang's avatar suhuiguang

feat(jg): 西安数据核对

1.核对数据同步接口
parent 520238cc
...@@ -423,8 +423,8 @@ public class DataHandlerController extends BaseController { ...@@ -423,8 +423,8 @@ public class DataHandlerController extends BaseController {
} }
@TycloudOperation(ApiLevel = UserType.AGENCY) @TycloudOperation(ApiLevel = UserType.AGENCY)
@PostMapping(value = "/orgBranchCode2Db") @PutMapping(value = "/sync/es2db/org-branches")
@ApiOperation(httpMethod = "PUT", value = "属地监管部门以es老索引为准刷数据库", notes = "属地监管部门以es为准刷数据库") @ApiOperation(httpMethod = "PUT", value = "1.将属地监管部门以es老索引为准刷数据库", notes = "属地监管部门以es为准刷数据库")
public ResponseModel<Long> orgBranchCode2Db(@ApiParam(value = "设备种类code") @RequestParam String equListCode, public ResponseModel<Long> orgBranchCode2Db(@ApiParam(value = "设备种类code") @RequestParam String equListCode,
@ApiParam(value = "设备类别code") @RequestParam(required = false) String equCategoryCode, @ApiParam(value = "设备类别code") @RequestParam(required = false) String equCategoryCode,
@ApiParam(value = "属地code") @RequestParam String orgBranchCode , @ApiParam(value = "属地code") @RequestParam String orgBranchCode ,
...@@ -434,8 +434,8 @@ public class DataHandlerController extends BaseController { ...@@ -434,8 +434,8 @@ public class DataHandlerController extends BaseController {
@TycloudOperation(ApiLevel = UserType.AGENCY) @TycloudOperation(ApiLevel = UserType.AGENCY)
@PostMapping(value = "/synEquipFromDb2Es") @PutMapping(value = "/sync/db2es/equipments")
@ApiOperation(httpMethod = "PUT", value = "同步设备数据到es新旧索引", notes = "同步设备数据到es新旧索引") @ApiOperation(httpMethod = "PUT", value = "2.同步设备数据到es新旧索引", notes = "同步设备数据到es新旧索引")
public ResponseModel<Integer> synEquipFromDb2Es(@ApiParam(value = "设备种类code") @RequestParam String equListCode, public ResponseModel<Integer> synEquipFromDb2Es(@ApiParam(value = "设备种类code") @RequestParam String equListCode,
@ApiParam(value = "设备类别code") @RequestParam(required = false) String equCategoryCode, @ApiParam(value = "设备类别code") @RequestParam(required = false) String equCategoryCode,
@ApiParam(value = "属地code") @RequestParam String orgBranchCode) { @ApiParam(value = "属地code") @RequestParam String orgBranchCode) {
......
...@@ -9,11 +9,11 @@ import java.util.List; ...@@ -9,11 +9,11 @@ import java.util.List;
import java.util.Map; import java.util.Map;
@Slf4j @Slf4j
public abstract class BatchDataPatcherWithFilter implements HistoricalDataPatcher { public abstract class FilterableBatchDataPatcher implements HistoricalDataPatcher<Map<String, Object>> {
private final ApplicationContext applicationContext; private final ApplicationContext applicationContext;
protected BatchDataPatcherWithFilter(ApplicationContext applicationContext) { protected FilterableBatchDataPatcher(ApplicationContext applicationContext) {
this.applicationContext = applicationContext; this.applicationContext = applicationContext;
} }
...@@ -25,6 +25,7 @@ public abstract class BatchDataPatcherWithFilter implements HistoricalDataPatche ...@@ -25,6 +25,7 @@ public abstract class BatchDataPatcherWithFilter implements HistoricalDataPatche
Integer maxVersion = useInfoService.getBaseMapper().selectMaxVersionWithParams(buildFilter(params)); Integer maxVersion = useInfoService.getBaseMapper().selectMaxVersionWithParams(buildFilter(params));
Integer nextVersion = maxVersion + 1; Integer nextVersion = maxVersion + 1;
List<String> refreshRecords = useInfoService.getBaseMapper().selectUseInfoOfOneVersionWithParams(nextVersion,buildFilter(params)); List<String> refreshRecords = useInfoService.getBaseMapper().selectUseInfoOfOneVersionWithParams(nextVersion,buildFilter(params));
int patchSize = refreshRecords.size();
while (!refreshRecords.isEmpty()) { while (!refreshRecords.isEmpty()) {
refreshRecords.parallelStream().forEach(record -> { refreshRecords.parallelStream().forEach(record -> {
try { try {
...@@ -37,11 +38,12 @@ public abstract class BatchDataPatcherWithFilter implements HistoricalDataPatche ...@@ -37,11 +38,12 @@ public abstract class BatchDataPatcherWithFilter implements HistoricalDataPatche
} }
}); });
useInfoService.getBaseMapper().updateVersionBatch(refreshRecords, nextVersion); useInfoService.getBaseMapper().updateVersionBatch(refreshRecords, nextVersion);
refreshRecords = useInfoService.getBaseMapper().selectUseInfoOfOneVersionAll(nextVersion); refreshRecords = useInfoService.getBaseMapper().selectUseInfoOfOneVersionWithParams(nextVersion,buildFilter(params));
patchSize = patchSize + refreshRecords.size();
} }
watch.stop(); watch.stop();
log.info("数据修补完成,共处理{}条记录,耗时: {}秒", refreshRecords, watch.getTotalTimeSeconds()); log.info("数据修补完成,共处理{}条记录,耗时: {}秒", patchSize, watch.getTotalTimeSeconds());
return null; return patchSize;
} }
protected abstract Map<String, Object> buildFilter(Map<String, Object> params); protected abstract Map<String, Object> buildFilter(Map<String, Object> params);
......
...@@ -9,11 +9,11 @@ import java.util.List; ...@@ -9,11 +9,11 @@ import java.util.List;
import java.util.Map; import java.util.Map;
@Slf4j @Slf4j
public abstract class BatchDataPatcher implements HistoricalDataPatcher { public abstract class FullBatchDataPatcher implements HistoricalDataPatcher<Map<String, Object>> {
private final ApplicationContext applicationContext; private final ApplicationContext applicationContext;
protected BatchDataPatcher(ApplicationContext applicationContext) { protected FullBatchDataPatcher(ApplicationContext applicationContext) {
this.applicationContext = applicationContext; this.applicationContext = applicationContext;
} }
......
package com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher; package com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher;
import java.util.Map; public interface HistoricalDataPatcher<T> {
public interface HistoricalDataPatcher {
/** /**
* 执行批量修补 * 执行批量修补
*
* @return 处理成功的记录数,如果不可计算则返回null * @return 处理成功的记录数,如果不可计算则返回null
*/ */
Integer patchBatchData(Map<String, Object> params); Integer patchBatchData(T params);
} }
...@@ -5,7 +5,7 @@ import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory; ...@@ -5,7 +5,7 @@ import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto; import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.common.api.entity.TzsDataRefreshMessage; import com.yeejoin.amos.boot.module.common.api.entity.TzsDataRefreshMessage;
import com.yeejoin.amos.boot.module.common.biz.refresh.DataRefreshEvent; import com.yeejoin.amos.boot.module.common.biz.refresh.DataRefreshEvent;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher.BatchDataPatcherWithFilter; import com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher.FilterableBatchDataPatcher;
import com.yeejoin.amos.boot.module.jg.biz.refresh.StatisticsDataUpdateService; import com.yeejoin.amos.boot.module.jg.biz.refresh.StatisticsDataUpdateService;
import com.yeejoin.amos.boot.module.jg.biz.refresh.handler.EquipmentRefreshHandler; import com.yeejoin.amos.boot.module.jg.biz.refresh.handler.EquipmentRefreshHandler;
import com.yeejoin.amos.boot.module.ymt.api.mapper.IdxBizJgUseInfoMapper; import com.yeejoin.amos.boot.module.ymt.api.mapper.IdxBizJgUseInfoMapper;
...@@ -18,7 +18,7 @@ import java.util.Optional; ...@@ -18,7 +18,7 @@ import java.util.Optional;
@Component @Component
@Slf4j @Slf4j
public class EquipInsert2EsPatcher extends BatchDataPatcherWithFilter { public class FilterableEquipInsert2EsPatcher extends FilterableBatchDataPatcher {
private final ESEquipmentCategory equipmentCategory; private final ESEquipmentCategory equipmentCategory;
...@@ -27,7 +27,7 @@ public class EquipInsert2EsPatcher extends BatchDataPatcherWithFilter { ...@@ -27,7 +27,7 @@ public class EquipInsert2EsPatcher extends BatchDataPatcherWithFilter {
private final IdxBizJgUseInfoMapper idxBizJgUseInfoMapper; private final IdxBizJgUseInfoMapper idxBizJgUseInfoMapper;
protected EquipInsert2EsPatcher(ApplicationContext applicationContext, ESEquipmentCategory equipmentCategory, EquipmentRefreshHandler refreshHandler, IdxBizJgUseInfoMapper idxBizJgUseInfoMapper) { protected FilterableEquipInsert2EsPatcher(ApplicationContext applicationContext, ESEquipmentCategory equipmentCategory, EquipmentRefreshHandler refreshHandler, IdxBizJgUseInfoMapper idxBizJgUseInfoMapper) {
super(applicationContext); super(applicationContext);
this.equipmentCategory = equipmentCategory; this.equipmentCategory = equipmentCategory;
this.refreshHandler = refreshHandler; this.refreshHandler = refreshHandler;
......
...@@ -5,7 +5,7 @@ import com.yeejoin.amos.boot.biz.common.entity.TzsBaseEntity; ...@@ -5,7 +5,7 @@ import com.yeejoin.amos.boot.biz.common.entity.TzsBaseEntity;
import com.yeejoin.amos.boot.module.common.api.constant.TZSCommonConstant; import com.yeejoin.amos.boot.module.common.api.constant.TZSCommonConstant;
import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory; import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto; import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher.BatchDataPatcher; import com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher.FullBatchDataPatcher;
import com.yeejoin.amos.boot.module.jg.biz.service.impl.IdxBizJgRegisterInfoServiceImpl; import com.yeejoin.amos.boot.module.jg.biz.service.impl.IdxBizJgRegisterInfoServiceImpl;
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgRegisterInfo; import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgRegisterInfo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -19,14 +19,14 @@ import java.util.Optional; ...@@ -19,14 +19,14 @@ import java.util.Optional;
*/ */
@Component @Component
@Slf4j @Slf4j
public class WeatherTankFieldPatcher extends BatchDataPatcher { public class WeatherTankFieldPatcherFull extends FullBatchDataPatcher {
private final IdxBizJgRegisterInfoServiceImpl registerInfoService; private final IdxBizJgRegisterInfoServiceImpl registerInfoService;
private final ESEquipmentCategory equipmentCategory; private final ESEquipmentCategory equipmentCategory;
protected WeatherTankFieldPatcher(ApplicationContext applicationContext, IdxBizJgRegisterInfoServiceImpl registerInfoService, ESEquipmentCategory equipmentCategory) { protected WeatherTankFieldPatcherFull(ApplicationContext applicationContext, IdxBizJgRegisterInfoServiceImpl registerInfoService, ESEquipmentCategory equipmentCategory) {
super(applicationContext); super(applicationContext);
this.registerInfoService = registerInfoService; this.registerInfoService = registerInfoService;
this.equipmentCategory = equipmentCategory; this.equipmentCategory = equipmentCategory;
......
...@@ -44,9 +44,9 @@ import com.yeejoin.amos.boot.module.jg.api.enums.BusinessTypeEnum; ...@@ -44,9 +44,9 @@ import com.yeejoin.amos.boot.module.jg.api.enums.BusinessTypeEnum;
import com.yeejoin.amos.boot.module.jg.api.enums.PipelineEnum; import com.yeejoin.amos.boot.module.jg.api.enums.PipelineEnum;
import com.yeejoin.amos.boot.module.jg.api.enums.SafetyProblemTypeEnum; import com.yeejoin.amos.boot.module.jg.api.enums.SafetyProblemTypeEnum;
import com.yeejoin.amos.boot.module.jg.api.mapper.*; import com.yeejoin.amos.boot.module.jg.api.mapper.*;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.service.EquipInsert2EsPatcher; import com.yeejoin.amos.boot.module.jg.biz.data.fix.service.FilterableEquipInsert2EsPatcher;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.service.ReceiveOrgFixService; import com.yeejoin.amos.boot.module.jg.biz.data.fix.service.ReceiveOrgFixService;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.service.WeatherTankFieldPatcher; import com.yeejoin.amos.boot.module.jg.biz.data.fix.service.WeatherTankFieldPatcherFull;
import com.yeejoin.amos.boot.module.jg.biz.event.publisher.EventPublisher; import com.yeejoin.amos.boot.module.jg.biz.event.publisher.EventPublisher;
import com.yeejoin.amos.boot.module.jg.biz.feign.TzsServiceFeignClient; import com.yeejoin.amos.boot.module.jg.biz.feign.TzsServiceFeignClient;
import com.yeejoin.amos.boot.module.jg.biz.handler.strategy.ProblemHandleStrategy; import com.yeejoin.amos.boot.module.jg.biz.handler.strategy.ProblemHandleStrategy;
...@@ -194,9 +194,9 @@ public class DataHandlerServiceImpl { ...@@ -194,9 +194,9 @@ public class DataHandlerServiceImpl {
private final RestHighLevelClient restHighLevelClient; private final RestHighLevelClient restHighLevelClient;
private final WeatherTankFieldPatcher weatherTankFieldPatcher; private final WeatherTankFieldPatcherFull weatherTankFieldPatcher;
private final EquipInsert2EsPatcher equipInsert2EsPatcher; private final FilterableEquipInsert2EsPatcher equipInsert2EsPatcher;
private final EventPublisher eventPublisher; private final EventPublisher eventPublisher;
...@@ -2748,27 +2748,26 @@ public class DataHandlerServiceImpl { ...@@ -2748,27 +2748,26 @@ public class DataHandlerServiceImpl {
if (StringUtils.isNotEmpty(equCategoryCode)) { if (StringUtils.isNotEmpty(equCategoryCode)) {
boolMust.must(QueryBuilders.boolQuery().must(QueryBuilders.termsQuery("EQU_CATEGORY_CODE", equCategoryCode))); boolMust.must(QueryBuilders.boolQuery().must(QueryBuilders.termsQuery("EQU_CATEGORY_CODE", equCategoryCode)));
} }
// 数据库更新属地的设备总数
long totalUpdate = 0L; long totalUpdate = 0L;
try { try {
String version = DateUtil.today() + seq;
buildVersionFilter(version, boolMust);
CountRequest countRequest = new CountRequest(); CountRequest countRequest = new CountRequest();
countRequest.indices(IDX_BIZ_VIEW_JG_ALL); countRequest.indices(IDX_BIZ_VIEW_JG_ALL);
countRequest.query(boolMust); countRequest.query(boolMust);
CountResponse countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT); CountResponse countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
long total = countResponse.getCount(); long total = countResponse.getCount();
int pageSize = 1000; int pageSize = 1000;
int totalPage = Math.max(1, (int) Math.ceil((double) total / pageSize)); int totalPage = (int) Math.ceil((double) total / pageSize);
String version = DateUtil.today() + seq;
builder.size(pageSize); builder.size(pageSize);
SearchResponse response = null;
buildVersionFilter(version, boolMust);
builder.query(boolMust); builder.query(boolMust);
builder.sort("REC_DATE", SortOrder.DESC); builder.sort("REC_DATE", SortOrder.DESC);
SearchRequest request = new SearchRequest(); SearchRequest request = new SearchRequest();
request.indices(IDX_BIZ_VIEW_JG_ALL); request.indices(IDX_BIZ_VIEW_JG_ALL);
for (int i = 1; i <= totalPage; i++) { for (int i = 1; i <= totalPage; i++) {
builder.from((i - 1) * pageSize);
request.source(builder); request.source(builder);
response = restHighLevelClient.search(request, RequestOptions.DEFAULT); SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
for (org.elasticsearch.search.SearchHit hit : response.getHits()) { for (org.elasticsearch.search.SearchHit hit : response.getHits()) {
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(hit); JSONObject jsonObject = (JSONObject) JSONObject.toJSON(hit);
ESEquipmentCategoryDto equipmentCategoryDto = JSONObject.toJavaObject(jsonObject.getJSONObject("sourceAsMap"), ESEquipmentCategoryDto.class); ESEquipmentCategoryDto equipmentCategoryDto = JSONObject.toJavaObject(jsonObject.getJSONObject("sourceAsMap"), ESEquipmentCategoryDto.class);
...@@ -2777,10 +2776,16 @@ public class DataHandlerServiceImpl { ...@@ -2777,10 +2776,16 @@ public class DataHandlerServiceImpl {
.set(IdxBizJgSupervisionInfo::getOrgBranchCode, equipmentCategoryDto.getOrgBranchCode()) .set(IdxBizJgSupervisionInfo::getOrgBranchCode, equipmentCategoryDto.getOrgBranchCode())
.set(IdxBizJgSupervisionInfo::getOrgBranchName, equipmentCategoryDto.getORG_BRANCH_NAME()) .set(IdxBizJgSupervisionInfo::getOrgBranchName, equipmentCategoryDto.getORG_BRANCH_NAME())
.update(); .update();
Integer recordCount = useInfoService.lambdaQuery().eq(IdxBizJgUseInfo::getRecord, equipmentCategoryDto.getSEQUENCE_NBR()).count();
if (exist) { if (exist) {
totalUpdate = totalUpdate + 1; totalUpdate = totalUpdate + 1;
} }
equipmentCategoryDto.setVersion(version); if (recordCount > 0) {
equipmentCategoryDto.setVersion(version);
} else {
// 数据库不存在的做标记
equipmentCategoryDto.setVersion("-1");
}
esEquipmentCategory.save(equipmentCategoryDto); esEquipmentCategory.save(equipmentCategoryDto);
} }
} }
...@@ -2792,8 +2797,9 @@ public class DataHandlerServiceImpl { ...@@ -2792,8 +2797,9 @@ public class DataHandlerServiceImpl {
private static void buildVersionFilter(String version, BoolQueryBuilder boolMust) { private static void buildVersionFilter(String version, BoolQueryBuilder boolMust) {
BoolQueryBuilder meBuilder = QueryBuilders.boolQuery(); BoolQueryBuilder meBuilder = QueryBuilders.boolQuery();
meBuilder.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.termsQuery("version", version))); meBuilder.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.termsQuery("version", version, "-1")));
meBuilder.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery("version"))); meBuilder.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery("version")));
meBuilder.minimumShouldMatch(1);
boolMust.must(meBuilder); boolMust.must(meBuilder);
} }
......
...@@ -55,5 +55,5 @@ public interface IdxBizJgUseInfoMapper extends CustomBaseMapper<IdxBizJgUseInfo> ...@@ -55,5 +55,5 @@ public interface IdxBizJgUseInfoMapper extends CustomBaseMapper<IdxBizJgUseInfo>
List<String> selectEquipsClaimStatus(); List<String> selectEquipsClaimStatus();
List<String> selectUseInfoOfOneVersionWithParams(@Param("nextVersion") Integer nextVersion,@Param("params") Map<String, Object> params); List<String> selectUseInfoOfOneVersionWithParams(@Param("version") Integer version,@Param("params") Map<String, Object> params);
} }
...@@ -63,18 +63,18 @@ ...@@ -63,18 +63,18 @@
SELECT SELECT
u.record u.record
from from
"idx_biz_jg_use_info" u "idx_biz_jg_use_info" u,
"idx_biz_jg_supervision_info" s, "idx_biz_jg_supervision_info" s,
idx_biz_jg_register_info r idx_biz_jg_register_info r
where where
u.VERSION <![CDATA[ <> ]]> #{version} or u.VERSION is null (u.VERSION <![CDATA[ <> ]]> #{version} or u.VERSION is null)
and u."RECORD" = r."RECORD" and u."RECORD" = s."RECORD"
and s."RECORD" = r."RECORD" and s."RECORD" = r."RECORD"
<if test="params.equListCode != null and params.equListCode != ''"> <if test="params.equListCode != null and params.equListCode != ''">
and r."EQU_LIST" = '3000' and r."EQU_LIST" = #{params.equListCode}
</if> </if>
<if test="params.equCategoryCode != null and params.equCategoryCode != ''"> <if test="params.equCategoryCode != null and params.equCategoryCode != ''">
and r."EQU_CATEGORY" = #{params.equListCode} and r."EQU_CATEGORY" = #{params.equCategoryCode}
</if> </if>
<if test="params.orgBranchCode != null and params.orgBranchCode != ''"> <if test="params.orgBranchCode != null and params.orgBranchCode != ''">
and s."ORG_BRANCH_CODE" like concat(#{params.orgBranchCode},'%') and s."ORG_BRANCH_CODE" like concat(#{params.orgBranchCode},'%')
...@@ -90,17 +90,17 @@ ...@@ -90,17 +90,17 @@
SELECT SELECT
COALESCE(MAX(version),0) as version COALESCE(MAX(version),0) as version
FROM FROM
"idx_biz_jg_use_info" u "idx_biz_jg_use_info" u,
"idx_biz_jg_supervision_info" s, "idx_biz_jg_supervision_info" s,
idx_biz_jg_register_info r idx_biz_jg_register_info r
where where
u."RECORD" = r."RECORD" u."RECORD" = s."RECORD"
and s."RECORD" = r."RECORD" and s."RECORD" = r."RECORD"
<if test="params.equListCode != null and params.equListCode != ''"> <if test="params.equListCode != null and params.equListCode != ''">
and r."EQU_LIST" = '3000' and r."EQU_LIST" = #{params.equListCode}
</if> </if>
<if test="params.equCategoryCode != null and params.equCategoryCode != ''"> <if test="params.equCategoryCode != null and params.equCategoryCode != ''">
and r."EQU_CATEGORY" = #{params.equListCode} and r."EQU_CATEGORY" = #{params.equCategoryCode}
</if> </if>
<if test="params.orgBranchCode != null and params.orgBranchCode != ''"> <if test="params.orgBranchCode != null and params.orgBranchCode != ''">
and s."ORG_BRANCH_CODE" like concat(#{params.orgBranchCode},'%') and s."ORG_BRANCH_CODE" like concat(#{params.orgBranchCode},'%')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment