Commit 8d2ad62a authored by 刘林's avatar 刘林

fix:(大屏):大屏数据处理接口修改

parent 63a7ed76
......@@ -442,4 +442,11 @@ public class DataHandlerController extends BaseController {
return ResponseHelper.buildResponse(dataHandlerService.synEquipFromDb2Es(equListCode, equCategoryCode, orgBranchCode));
}
@TycloudOperation(ApiLevel = UserType.AGENCY)
@PutMapping(value = "/insert/equip/db2newes")
@ApiOperation(httpMethod = "PUT", value = "1.添加数据库设备到ES新索引", notes = "添加数据库设备到ES新索引")
public ResponseModel<Integer> insertEquipFromDb2NewEs(@RequestBody Map<String, String> paramMap) {
return ResponseHelper.buildResponse(dataHandlerService.insertEquipFromDb2NewEs(paramMap));
}
}
\ No newline at end of file
......@@ -49,6 +49,7 @@ import com.yeejoin.amos.boot.module.jg.biz.feign.TzsServiceFeignClient;
import com.yeejoin.amos.boot.module.jg.biz.handler.strategy.ProblemHandleStrategy;
import com.yeejoin.amos.boot.module.jg.biz.listener.SafetyProblemTopicMessage;
import com.yeejoin.amos.boot.module.jg.biz.refresh.StatisticsDataUpdateService;
import com.yeejoin.amos.boot.module.jg.biz.refresh.handler.EquipmentRefreshHandler;
import com.yeejoin.amos.boot.module.jg.biz.reminder.core.event.EquipCreateOrEditEvent;
import com.yeejoin.amos.boot.module.ymt.api.dto.EquipWaitRefreshDataQualityScore;
import com.yeejoin.amos.boot.module.ymt.api.dto.ProjectWaitRefreshDataQualityScore;
......@@ -70,6 +71,7 @@ import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Value;
......@@ -206,6 +208,14 @@ public class DataHandlerServiceImpl {
private final EsBulkService esBulkService;
private final EquipmentRefreshHandler refreshHandler;
private final IdxBizJgSupervisionInfoMapper idxBizJgSupervisionInfoMapper;
private final IdxBizJgMaintenanceRecordInfoMapper maintenanceRecordInfoMapper;
private final IdxBizJgInspectionDetectionInfoMapper inspectionDetectionInfoMapper;
/**
* 安装告知压力管道历史数据修复-详情中的设备列表修改为汇总表格式
*
......@@ -2767,7 +2777,7 @@ public class DataHandlerServiceImpl {
request.source(builder);
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
List<ESEquipmentCategoryDto> esEquipmentCategoryDtos = new ArrayList<>();
for (org.elasticsearch.search.SearchHit hit : response.getHits()) {
for (SearchHit hit : response.getHits()) {
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(hit);
ESEquipmentCategoryDto equipmentCategoryDto = JSONObject.toJavaObject(jsonObject.getJSONObject("sourceAsMap"), ESEquipmentCategoryDto.class);
Integer recordCount = useInfoService.lambdaQuery().eq(IdxBizJgUseInfo::getRecord, equipmentCategoryDto.getSEQUENCE_NBR()).count();
......@@ -2807,4 +2817,107 @@ public class DataHandlerServiceImpl {
p.put("orgBranchCode", orgBranchCode);
return equipInsert2EsPatcher.patchBatchData(p);
}
public Integer insertEquipFromDb2NewEs(Map<String, String> paramMap) {
List<String> refreshRecords = Collections.emptyList();
if (!ValidationUtil.isEmpty(MapUtil.getStr(paramMap, "records"))) {
refreshRecords = Stream.of(MapUtil.getStr(paramMap, "records").split(",")).map(String::trim).collect(Collectors.toList());
this.patchBatchRecord(refreshRecords);
}
return refreshRecords.size();
}
protected void patchBatchRecord(List<String> refreshRecords) {
log.info("批量处理设备信息到es开始");
StopWatch watch = new StopWatch();
watch.start("批量查询设备信息");
List<Map<String, Object>> details = idxBizJgUseInfoMapper.queryDetailBatch(refreshRecords);
watch.stop();
Map<String, Map<String, Object>> recordDetailMap = details.stream().collect(Collectors.toMap(e -> (String) e.get("SEQUENCE_NBR"), Function.identity(), (k1, k2) -> k2));
// watch.start("组装es设备老索引更新及新增的对象数据");
// // 组装es设备老索引更新及新增的对象数据
// List<ESEquipmentCategoryDto> esEquipmentCategoryDtos = getEsEquipmentCategoryDtos(refreshRecords, recordDetailMap);
// watch.stop();
watch.start("组装es设备新索引更新及新增的对象数据");
// 组装es设备新索引更新及新增的对象数据
List<ESEquipmentInfo> esEquipmentInfos = getEsEquipmentInfos(refreshRecords, recordDetailMap);
watch.stop();
watch.start("es设备新旧索引保存");
// 多线程保存
List<CompletableFuture<Void>> futures = new ArrayList<>();
// if (!esEquipmentCategoryDtos.isEmpty()) {
// futures.add(CompletableFuture.runAsync(() -> {
// StopWatch watch4 = new StopWatch();
// watch4.start();
// esBulkService.bulkUpsert(IDX_BIZ_VIEW_JG_ALL, esEquipmentCategoryDtos.stream().map(e -> new EsEntity<>(e.getSEQUENCE_NBR(), e)).collect(Collectors.toList()));
// watch4.stop();
// log.warn("[设备老索引] 批量入库 {} 条,耗时 {}s",
// esEquipmentCategoryDtos.size(), watch4.getTotalTimeSeconds());
// }));
// }
if (!esEquipmentInfos.isEmpty()) {
futures.add(CompletableFuture.runAsync(() -> {
StopWatch watch4 = new StopWatch();
watch4.start();
esBulkService.bulkUpsert(IDX_BIZ_EQUIPMENT_INFO, esEquipmentInfos.stream().map(e -> new EsEntity<>(e.getSEQUENCE_NBR(), e)).collect(Collectors.toList()));
watch4.stop();
log.warn("[设备新索引] 批量入库 {} 条,耗时 {}s",
esEquipmentInfos.size(), watch4.getTotalTimeSeconds());
}));
}
// 等待所有任务完成(阻塞当前线程)
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
watch.stop();
log.warn("新索引数据补充匹配补充,总耗时情况:{}", watch.prettyPrint());
}
private List<ESEquipmentInfo> getEsEquipmentInfos(List<String> refreshRecords, Map<String, Map<String, Object>> recordDetailMap) {
// 设备最新的维保信息-维度:设备
List<IdxBizJgMaintenanceRecordInfo> lastMaintenanceRecordInfos = maintenanceRecordInfoMapper.selectLastedMainInfoBatch(refreshRecords);
Map<String, List<IdxBizJgMaintenanceRecordInfo>> recordLastMaintMap = lastMaintenanceRecordInfos.stream().collect(Collectors.groupingBy(IdxBizJgMaintenanceRecordInfo::getRecord));
// 设备、各检验类型下最新的检验信息-维度:设备、检验类型
List<IdxBizJgInspectionDetectionInfo> lastedInspectInfosGroupByInspectType = inspectionDetectionInfoMapper.selectLastedGroupByInspectTypeBatch(refreshRecords);
Map<String, List<IdxBizJgInspectionDetectionInfo>> recordInspectInfosGroupByInspectTypeMap = lastedInspectInfosGroupByInspectType.stream().collect(Collectors.groupingBy(IdxBizJgInspectionDetectionInfo::getRecord));
// 设备最新的检验信息-维度:设备
Map<String, Optional<IdxBizJgInspectionDetectionInfo>> recordLastInspectionMap = lastedInspectInfosGroupByInspectType.stream().filter(e -> e.getNextInspectDate() != null).collect(Collectors.groupingBy(IdxBizJgInspectionDetectionInfo::getRecord, Collectors.maxBy(Comparator.comparing(IdxBizJgInspectionDetectionInfo::getNextInspectDate))));
List<ESEquipmentInfo> esEquipmentInfos = refreshRecords.parallelStream().map(record -> {
ESEquipmentInfo esEquipmentInfo = null;
try {
esEquipmentInfo = new ESEquipmentInfo();
Map<String, Object> detail = recordDetailMap.get(record);
StatisticsDataUpdateService.formatUseDate(detail);
BeanUtil.copyProperties(detail, esEquipmentInfo, true);
// 最新检验信息-维度record
IdxBizJgInspectionDetectionInfo inspectionDetectionInfo = Optional.ofNullable(recordLastInspectionMap.get(record)).flatMap(i -> i).orElse(new IdxBizJgInspectionDetectionInfo());
// 最新维保信息-维度record
IdxBizJgMaintenanceRecordInfo lastMaintenanceRecordInfo = Optional.ofNullable(recordLastMaintMap.get(record)).filter(l -> !l.isEmpty()).map(list -> list.get(0)).orElse(new IdxBizJgMaintenanceRecordInfo());
// 最新检验信息-维度record、检验类型,存最新的一条
List<IdxBizJgInspectionDetectionInfo> inspectionDetectionInfos = recordInspectInfosGroupByInspectTypeMap.getOrDefault(record, new ArrayList<>());
StatisticsDataUpdateService.formatInspectDate(esEquipmentInfo, inspectionDetectionInfo, record);
esEquipmentInfo.setIssueDate(refreshHandler.getIssueDate(esEquipmentInfo.getUSE_ORG_CODE()));
esEquipmentInfo.setMAINTAIN_UNIT(lastMaintenanceRecordInfo.getMeUnitCreditCode());
esEquipmentInfo.setMAINTAIN_UNIT_NAME(lastMaintenanceRecordInfo.getMeUnitName());
esEquipmentInfo.setInspections(BeanUtil.copyToList(inspectionDetectionInfos, ESEquipmentInfo.Inspection.class));
esEquipmentInfo.setMaintenances(lastMaintenanceRecordInfo.getSequenceNbr() != null ? Collections.singletonList(BeanUtil.copyProperties(lastMaintenanceRecordInfo, ESEquipmentInfo.Maintenance.class)) : new ArrayList<>());
esEquipmentInfo.setTechParams(refreshHandler.buildTechParamByEquList(record, esEquipmentInfo.getEQU_LIST_CODE()));
if ("8000".equals(esEquipmentInfo.getEQU_LIST_CODE())) {
List<ESEquipmentInfo.TechParam> techParams = esEquipmentInfo.getTechParams();
List<ESEquipmentInfo.TechParam> pipeLength = techParams.stream().filter(e -> e.getParamKey().equals("pipeLength") && e.getDoubleValue() != null).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(pipeLength)) {
esEquipmentInfo.setPipeLength(pipeLength.get(0).getDoubleValue());
}
}
} catch (Exception e) {
// 异常数据跳过
log.error("准备新设备索引数据失败:{}", record, e);
}
return esEquipmentInfo;
}).collect(Collectors.toList());
esEquipmentInfos.remove(null);
esEquipmentInfos = esEquipmentInfos.stream().filter(e -> StringUtils.isNotEmpty(e.getSEQUENCE_NBR())).collect(Collectors.toList());
return esEquipmentInfos;
}
}
......@@ -528,7 +528,10 @@ public class ComprehensiveStatisticalAnalysisController extends BaseController {
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@GetMapping(value = "/getDataDifferenceV2")
@ApiOperation(httpMethod = "GET", value = "获取es和数据库数据差异V2", notes = "获取es和数据库数据差异V2")
public ResponseModel<Object> getDataDifferenceV2() {
return ResponseHelper.buildResponse(statisticalAnalysisService.getDataDifferenceV2());
public ResponseModel<Object> getDataDifferenceV2(@RequestParam(value = "index") String index,
@RequestParam(value = "equListCode", required = false) String equListCode,
@RequestParam(value = "equCategoryCode", required = false) String equCategoryCode,
@RequestParam(value = "orgCode") String orgCode) {
return ResponseHelper.buildResponse(statisticalAnalysisService.getDataDifferenceV2(index, equListCode, equCategoryCode, orgCode));
}
}
......@@ -4715,44 +4715,60 @@ public class ComprehensiveStatisticalAnalysisServiceImpl {
/**
* 获取数据库与ES数据差异:找出ES比数据库多的记录ID
*/
public Object getDataDifferenceV2() {
public Object getDataDifferenceV2(String index, String equListCode, String equCategoryCode, String orgCode) {
Map<String, Object> result = new HashMap<>();
List<String> extraInEs;
List<String> extraInDb;
// 获取索引
String index = "idx_biz_view_jg_all";
// 1. 查询数据库中所有设备 record
//List<String> dbRecords = idxBizJgUseInfoMapper.selectEquipsClaimStatus();
LambdaQueryWrapper<IdxBizJgUseInfo> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.select(IdxBizJgUseInfo::getRecord);
List<IdxBizJgUseInfo> list = idxBizJgUseInfoMapper.selectList(lambdaQueryWrapper);
List<String> dbRecords = list.stream().map(IdxBizJgUseInfo::getRecord).collect(Collectors.toList());
// LambdaQueryWrapper<IdxBizJgUseInfo> lambdaQueryWrapper = new LambdaQueryWrapper<>();
// lambdaQueryWrapper.select(IdxBizJgUseInfo::getRecord);
// lambdaQueryWrapper.eq(IdxBizJgUseInfo::getRecord, index);
List<String> dbRecords = idxBizJgUseInfoMapper.selectEquips(equListCode, equCategoryCode, orgCode);
log.info("数据库设备记录数: {}", dbRecords.size());
// 2. 从ES中查询对应记录
List<String> esRecords = getEsDataV2(index);
List<String> esRecords = getEsDataV2(index, equListCode, equCategoryCode, orgCode);
log.info("ES设备记录数: {}", esRecords.size());
// 3. 对比差异(ES比DB多的记录)
extraInEs = findElementsInANotInB(esRecords, dbRecords);
log.info("ES比数据库多的记录数: {}", extraInEs.size());
extraInDb = findElementsInANotInB(dbRecords, esRecords);
result.put("extraInEs", extraInEs);
result.put("extraIndb", extraInDb);
return result;
}
/**
* 从 ES 中拉取索引中所有的 SEQUENCE_NBR(match_all)
*/
private List<String> getEsDataV2(String index) {
private List<String> getEsDataV2(String index, String equListCode, String equCategoryCode, String orgCode) {
// 使用 match_all 查询,拉取所有文档(通过 scroll 分批)
log.info("ES 全量查询开始, index={}, 时间={}", index, System.currentTimeMillis());
//BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
// .must(QueryBuilders.termsQuery("STATUS", Collections.singletonList("已认领")));
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must(QueryBuilders.matchAllQuery());
if (index.equals("idx_biz_view_jg_all")){
boolQuery.must(QueryBuilders.wildcardQuery("ORG_BRANCH_CODE.keyword", QueryParser.escape(orgCode) + "*"));
}else{
boolQuery.must(QueryBuilders.wildcardQuery("ORG_BRANCH_CODE", QueryParser.escape(orgCode) + "*"));
}
// 设备种类编码
if (!ObjectUtils.isEmpty(equListCode)) {
BoolQueryBuilder elcBuilder = QueryBuilders.boolQuery();
elcBuilder.must(QueryBuilders.matchPhraseQuery("EQU_LIST_CODE", equListCode));
boolQuery.must(elcBuilder);
}
//设备类别编码
if (!ObjectUtils.isEmpty(equCategoryCode)) {
BoolQueryBuilder elcBuilder = QueryBuilders.boolQuery();
elcBuilder.must(QueryBuilders.matchPhraseQuery("EQU_CATEGORY_CODE", equCategoryCode));
boolQuery.must(elcBuilder);
}
// 线程安全的结果容器
List<String> esIds = Collections.synchronizedList(new ArrayList<>());
try {
esSearchService.searchResponseInBatch(index, boolQuery, 2000, searchHits -> {
// 每个批次只解析需要的字段
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment