Commit 548485ce authored by tianbo's avatar tianbo

feat(jg): 添加西安历史有证设备数据同步到ES功能

- 在ComprehensiveStatisticalAnalysisController中为getDataDifferenceV2接口添加TycloudOperation注解 - 在DataHandlerController中新增addDbData2EsBatch接口用于同步西安历史有证设备数据到ES - 在DataHandlerServiceImpl中实现addDbData2EsBatch方法,包含ES数据对比和批量同步逻辑 - 添加getEsData方法用于从ES中查询所有SEQUENCE_NBR数据 - 实现findElementsInANotInB方法用于查找数据库有但ES中缺失的记录 - 在IdxBizJgUseInfoMapper.xml中扩展查询字段,添加problemStatus和关联安装检验信息的JOIN查询
parent 7d02aa63
...@@ -413,4 +413,11 @@ public class DataHandlerController extends BaseController { ...@@ -413,4 +413,11 @@ public class DataHandlerController extends BaseController {
return ResponseHelper.buildResponse(dataHandlerService.receiveOrgFix()); return ResponseHelper.buildResponse(dataHandlerService.receiveOrgFix());
} }
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(httpMethod = "PUT", value = "西安历史有证设备刷入es(es缺失设备)", notes = "西安历史有证设备刷入es(es缺失设备)")
@PutMapping(value = "/equip/addDbData2EsBatch")
public ResponseModel<Object> addDbData2EsBatch() {
dataHandlerService.addDbData2EsBatch();
return ResponseHelper.buildResponse(true);
}
} }
\ No newline at end of file
...@@ -110,6 +110,8 @@ public class DataHandlerServiceImpl { ...@@ -110,6 +110,8 @@ public class DataHandlerServiceImpl {
public static final String STATUS = "STATUS"; public static final String STATUS = "STATUS";
public static final String record = "record"; public static final String record = "record";
public static final String SEQUENCE_NBR = "SEQUENCE_NBR"; public static final String SEQUENCE_NBR = "SEQUENCE_NBR";
public static final String xaDataSource = "jg_his_xa";
private static final int BATCH_SIZE = 1000; private static final int BATCH_SIZE = 1000;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final JgRegistrationHistoryServiceImpl registrationHistoryService; private final JgRegistrationHistoryServiceImpl registrationHistoryService;
...@@ -195,6 +197,8 @@ public class DataHandlerServiceImpl { ...@@ -195,6 +197,8 @@ public class DataHandlerServiceImpl {
private final ReceiveOrgFixService receiveOrgFixService; private final ReceiveOrgFixService receiveOrgFixService;
private final IdxBizJgUseInfoMapper idxBizJgUseInfoMapper;
/** /**
* 安装告知压力管道历史数据修复-详情中的设备列表修改为汇总表格式 * 安装告知压力管道历史数据修复-详情中的设备列表修改为汇总表格式
* *
...@@ -2621,4 +2625,92 @@ public class DataHandlerServiceImpl { ...@@ -2621,4 +2625,92 @@ public class DataHandlerServiceImpl {
public Integer receiveOrgFix() { public Integer receiveOrgFix() {
return receiveOrgFixService.doFix(); return receiveOrgFixService.doFix();
} }
public void addDbData2EsBatch() {
log.info("开始同步西安历史有证设备数据到ES");
StopWatch watch = new StopWatch();
watch.start("同步西安历史有证设备数据到ES");
List<String> extraInEs;
// 获取索引
// 1. 查询数据库中所有jg_his_xa设备 record
LambdaQueryWrapper<IdxBizJgUseInfo> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.select(IdxBizJgUseInfo::getRecord).eq(IdxBizJgUseInfo::getDataSource, xaDataSource);
List<IdxBizJgUseInfo> list = idxBizJgUseInfoMapper.selectList(lambdaQueryWrapper);
List<String> dbRecords = list.stream().map(IdxBizJgUseInfo::getRecord).collect(Collectors.toList());
// 2. 从ES中查询对应记录
List<String> esRecords = getEsData();
// 3. 对比差异(在DB没在ES的记录)
extraInEs = findElementsInANotInB(dbRecords, esRecords);
// 4.extraInEs分批从数据库获取数据写入ES,每批次1000
List<List<String>> extraInEsBatches = Lists.partition(extraInEs, 1000);
AtomicInteger batchIndex = new AtomicInteger(0);
extraInEsBatches.parallelStream().forEach(batch -> {
StopWatch batchWatch = new StopWatch();
batchWatch.start("批次" + batchIndex.get());
int currentIndex = batchIndex.getAndIncrement();
log.info("开始处理第 {} 个批次", currentIndex);
List<Map<String,Object>> batchData = idxBizJgUseInfoMapper.queryDetailBatch(batch);
List<ESEquipmentCategoryDto> esEquipmentInfos = batchData.parallelStream().map(data -> {
ESEquipmentCategoryDto esEquipmentInfo = null;
try {
esEquipmentInfo = new ESEquipmentCategoryDto();
StatisticsDataUpdateService.formatUseDate(data);
BeanUtil.copyProperties(data, esEquipmentInfo, true);
} catch (Exception e) {
log.error("批次{}设备刷数据处理失败:{}", currentIndex, record, e);
}
return esEquipmentInfo;
}).collect(Collectors.toList());
esEquipmentInfos = esEquipmentInfos.stream().filter(e -> StringUtils.isNotEmpty(e.getSEQUENCE_NBR())).collect(Collectors.toList());
if (!esEquipmentInfos.isEmpty()) {
esEquipmentCategory.saveAll(esEquipmentInfos);
}
batchWatch.stop();
log.info("批次{}处理完成,耗时:{}s", currentIndex, batchWatch.getTotalTimeSeconds());
});
watch.stop();
log.info("同步西安历史有证设备数据到ES结束,耗时:{}s", watch.getTotalTimeSeconds());
}
/**
* 从 ES 中拉取索引中所有的 SEQUENCE_NBR
*/
private List<String> getEsData() {
// 分批拉取文档
log.info("ES 查询开始, index={}, 时间={}", DataHandlerServiceImpl.IDX_BIZ_VIEW_JG_ALL, System.currentTimeMillis());
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("DATA_SOURCE", xaDataSource));
// 线程安全的结果容器
List<String> esIds = Collections.synchronizedList(new ArrayList<>());
try {
esSearchService.searchResponseInBatch(DataHandlerServiceImpl.IDX_BIZ_VIEW_JG_ALL, boolQuery, 2000, searchHits -> {
// 每个批次只解析需要的字段
List<String> batchIds = searchHits.stream()
.map(hit -> {
ESEquipmentCategoryDto dto = JSONObject.parseObject(hit.getSourceAsString(), ESEquipmentCategoryDto.class);
return dto == null ? null : dto.getSEQUENCE_NBR();
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
esIds.addAll(batchIds);
});
} catch (Exception ex) {
log.error("ES 查询异常, index={}, 错误={}", DataHandlerServiceImpl.IDX_BIZ_VIEW_JG_ALL, ex.getMessage(), ex);
throw new RuntimeException("ES 查询失败: " + ex.getMessage(), ex);
}
// 去重并返回
return esIds.stream().distinct().collect(Collectors.toList());
}
public static List<String> findElementsInANotInB(List<String> listA, List<String> listB) {
Set<String> bSet = Collections.synchronizedSet(new HashSet<>(listB));
return listA.parallelStream()
.filter(element -> !bSet.contains(element))
.collect(Collectors.toList());
}
} }
...@@ -525,6 +525,7 @@ public class ComprehensiveStatisticalAnalysisController extends BaseController { ...@@ -525,6 +525,7 @@ public class ComprehensiveStatisticalAnalysisController extends BaseController {
* 获取es和数据库中设备数据差异V2 * 获取es和数据库中设备数据差异V2
* @return Object * @return Object
*/ */
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@GetMapping(value = "/getDataDifferenceV2") @GetMapping(value = "/getDataDifferenceV2")
@ApiOperation(httpMethod = "GET", value = "获取es和数据库数据差异V2", notes = "获取es和数据库数据差异V2") @ApiOperation(httpMethod = "GET", value = "获取es和数据库数据差异V2", notes = "获取es和数据库数据差异V2")
public ResponseModel<Object> getDataDifferenceV2() { public ResponseModel<Object> getDataDifferenceV2() {
......
...@@ -122,6 +122,7 @@ ...@@ -122,6 +122,7 @@
ibjui."EQU_STATE", ibjui."EQU_STATE",
ibjui."IS_INTO_MANAGEMENT", ibjui."IS_INTO_MANAGEMENT",
ibjoi."CLAIM_STATUS" AS "STATUS", ibjoi."CLAIM_STATUS" AS "STATUS",
ibjoi."STATUS" AS problemStatus,
ibjfi."FACTORY_NUM", ibjfi."FACTORY_NUM",
ibjfi."PRODUCE_UNIT_NAME", ibjfi."PRODUCE_UNIT_NAME",
ibjfi."PRODUCE_UNIT_CREDIT_CODE", ibjfi."PRODUCE_UNIT_CREDIT_CODE",
...@@ -138,6 +139,24 @@ ...@@ -138,6 +139,24 @@
di."DESIGN_DATE" di."DESIGN_DATE"
FROM FROM
idx_biz_jg_use_info ibjui idx_biz_jg_use_info ibjui
LEFT JOIN (
SELECT DISTINCT ON ("RECORD")
"RECORD",
"USC_UNIT_CREDIT_CODE",
"USC_UNIT_NAME",
"CONSTRUCTION_TYPE",
"USC_DATE"
FROM amos_tzs_biz.idx_biz_jg_construction_info
ORDER BY "RECORD", "REC_DATE" DESC NULLS LAST
) construction_info ON ibjui."RECORD" = construction_info."RECORD"
LEFT JOIN (
SELECT DISTINCT ON ("RECORD")
"RECORD",
"INSPECT_REPORT",
"NEXT_INSPECT_DATE"
FROM amos_tzs_biz.idx_biz_jg_inspection_detection_info
ORDER BY "RECORD", "INSPECT_DATE" DESC NULLS LAST
) inspection_info ON ibjui."RECORD" = inspection_info."RECORD"
LEFT JOIN amos_tzs_biz.idx_biz_jg_supervision_info ibjsi ON ibjui."RECORD" = ibjsi."RECORD" LEFT JOIN amos_tzs_biz.idx_biz_jg_supervision_info ibjsi ON ibjui."RECORD" = ibjsi."RECORD"
LEFT JOIN amos_tzs_biz.idx_biz_jg_register_info ibjri ON ibjui."RECORD" = ibjri."RECORD" LEFT JOIN amos_tzs_biz.idx_biz_jg_register_info ibjri ON ibjui."RECORD" = ibjri."RECORD"
LEFT JOIN amos_tzs_biz.idx_biz_jg_other_info ibjoi ON ibjui."RECORD" = ibjoi."RECORD" LEFT JOIN amos_tzs_biz.idx_biz_jg_other_info ibjoi ON ibjui."RECORD" = ibjoi."RECORD"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment