Commit 90a9b502 authored by tianbo's avatar tianbo

feat(data): 优化数据库设备数据同步到ES功能

- 在DataDockServiceImpl中添加监管机构处理逻辑 - 修改DataHandlerController中的API接口,支持参数传递 - 更新DataHandlerServiceImpl实现,支持按数据源查询和指定记录同步 - 添加对设备属地监管部门字段的支持 - 优化同步逻辑,支持批量处理和按需同步功能
parent 73448b6a
......@@ -991,4 +991,7 @@ public class XiAnEquipInfoExcelDto extends BaseDto {
@ExcelIgnore
private Integer isCompleteXa;
@ApiModelProperty(value = "属地监管部门")
@ExcelProperty(value = "属地监管部门")
private String orgBranchName;
}
\ No newline at end of file
......@@ -16,6 +16,7 @@ import org.typroject.tyboot.core.restful.utils.ResponseModel;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* 用于业务变更过程中历史数据处理的控制层
......@@ -414,10 +415,10 @@ public class DataHandlerController extends BaseController {
}
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(httpMethod = "PUT", value = "西安历史有证设备刷入es(es缺失设备)", notes = "西安历史有证设备刷入es(es缺失设备)")
@PutMapping(value = "/equip/addDbData2EsBatch")
public ResponseModel<Object> addDbData2EsBatch() {
dataHandlerService.addDbData2EsBatch();
@ApiOperation(httpMethod = "PUT", value = "数据库设备刷入es(es缺失设备)", notes = "数据库设备刷入es(es缺失设备)")
@PostMapping(value = "/equip/addDbData2EsBatch")
public ResponseModel<Object> addDbData2EsBatch(@RequestBody Map<String, String> paramMap) {
dataHandlerService.addDbData2EsBatch(paramMap);
return ResponseHelper.buildResponse(true);
}
}
\ No newline at end of file
......@@ -535,6 +535,7 @@ public class DataDockServiceImpl {
}
supervisionInfo.setRecord(record);
supervisionInfo.setRecDate(new Date());
this.handleSupervisionOffice(equ);
if (!ValidationUtil.isEmpty(equ.get("orgBranchCode"))){
supervisionInfo.setOrgBranchCode(String.valueOf(equ.get("orgBranchCode")));
supervisionInfo.setOrgBranchName(String.valueOf(equ.get("orgBranchName")));
......@@ -2796,7 +2797,7 @@ public class DataDockServiceImpl {
this.saveFactoryInfo(equ, record);
IdxBizJgRegisterInfo registerInfo = this.saveRegisterInfo(equ, record, equCategory, null);
// 西安导入电梯属地监管部门处理
this.handleSupervisionOffice(equ);
// this.handleSupervisionOffice(equ);
this.saveSupervisionInfo(equ, record);
IdxBizJgOtherInfo otherInfo = this.saveOtherInfo(equ, record, equList);
this.saveInspectInfo(equ, record);
......
......@@ -3,6 +3,7 @@ package com.yeejoin.amos.boot.module.jg.biz.service.impl;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
......@@ -93,6 +94,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.alibaba.fastjson.JSON.toJSONString;
import static com.yeejoin.amos.boot.module.jg.biz.service.impl.JgInstallationNoticeServiceImpl.CONSTRUCTION_TYPE;
......@@ -111,7 +113,6 @@ public class DataHandlerServiceImpl {
public static final String record = "record";
public static final String SEQUENCE_NBR = "SEQUENCE_NBR";
public static final String xaDataSource = "jg_his_xa";
private static final int BATCH_SIZE = 1000;
private final ObjectMapper objectMapper;
private final JgRegistrationHistoryServiceImpl registrationHistoryService;
......@@ -2626,26 +2627,40 @@ public class DataHandlerServiceImpl {
return receiveOrgFixService.doFix();
}
public void addDbData2EsBatch() {
log.info("开始同步西安历史有证设备数据到ES");
public void addDbData2EsBatch(Map<String, String> paramMap) {
log.info("开始同步数据库设备数据到ES");
StopWatch watch = new StopWatch();
watch.start("同步西安历史有证设备数据到ES");
watch.start("同步设备数据到ES");
String dataSource = MapUtil.getStr(paramMap, "dataSource");
if (!ValidationUtil.isEmpty(MapUtil.getStr(paramMap, "records"))) {
List<String> records = Stream.of(MapUtil.getStr(paramMap, "records").split(",")).map(String::trim).collect(Collectors.toList());
queryAndSave2Es(records);
} else {
List<String> extraInEs;
// 获取索引
// 1. 查询数据库中所有jg_his_xa设备 record
List<String> dbRecords;
// 1. 查询数据库中所有设备 record
LambdaQueryWrapper<IdxBizJgUseInfo> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.select(IdxBizJgUseInfo::getRecord).eq(IdxBizJgUseInfo::getDataSource, xaDataSource);
lambdaQueryWrapper.select(IdxBizJgUseInfo::getRecord).likeLeft(!ValidationUtil.isEmpty(dataSource), IdxBizJgUseInfo::getDataSource, dataSource);
List<IdxBizJgUseInfo> list = idxBizJgUseInfoMapper.selectList(lambdaQueryWrapper);
List<String> dbRecords = list.stream().map(IdxBizJgUseInfo::getRecord).collect(Collectors.toList());
dbRecords = list.stream().map(IdxBizJgUseInfo::getRecord).collect(Collectors.toList());
// 2. 从ES中查询对应记录
List<String> esRecords = getEsData();
List<String> esRecords = getEsData(dataSource);
// 3. 对比差异(在DB没在ES的记录)
extraInEs = findElementsInANotInB(dbRecords, esRecords);
queryAndSave2Es(extraInEs);
}
watch.stop();
log.info("同步数据库设备数据到ES结束,耗时:{}s", watch.getTotalTimeSeconds());
}
private void queryAndSave2Es(List<String> records) {
// 4.extraInEs分批从数据库获取数据写入ES,每批次1000
List<List<String>> extraInEsBatches = Lists.partition(extraInEs, 1000);
List<List<String>> extraInEsBatches = Lists.partition(records, 1000);
AtomicInteger batchIndex = new AtomicInteger(0);
extraInEsBatches.parallelStream().forEach(batch -> {
StopWatch batchWatch = new StopWatch();
......@@ -2671,18 +2686,19 @@ public class DataHandlerServiceImpl {
batchWatch.stop();
log.info("批次{}处理完成,耗时:{}s", currentIndex, batchWatch.getTotalTimeSeconds());
});
watch.stop();
log.info("同步西安历史有证设备数据到ES结束,耗时:{}s", watch.getTotalTimeSeconds());
}
/**
* 从 ES 中拉取索引中所有的 SEQUENCE_NBR
*/
private List<String> getEsData() {
private List<String> getEsData(String dataSource) {
// 分批拉取文档
log.info("ES 查询开始, index={}, 时间={}", DataHandlerServiceImpl.IDX_BIZ_VIEW_JG_ALL, System.currentTimeMillis());
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("DATA_SOURCE", xaDataSource));
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
if (!ValidationUtil.isEmpty(dataSource)) {
boolQuery = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("DATA_SOURCE", dataSource));
}
// 线程安全的结果容器
List<String> esIds = Collections.synchronizedList(new ArrayList<>());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment