Commit c9772102 authored by suhuiguang's avatar suhuiguang

feat(jg): 数据核对

1.数据同步接口性能优化
parent f2c6eeaa
...@@ -436,7 +436,7 @@ public class DataHandlerController extends BaseController { ...@@ -436,7 +436,7 @@ public class DataHandlerController extends BaseController {
@TycloudOperation(ApiLevel = UserType.AGENCY) @TycloudOperation(ApiLevel = UserType.AGENCY)
@PutMapping(value = "/sync/db2es/equipments") @PutMapping(value = "/sync/db2es/equipments")
@ApiOperation(httpMethod = "PUT", value = "2.同步设备数据到es新旧索引", notes = "同步设备数据到es新旧索引") @ApiOperation(httpMethod = "PUT", value = "2.同步设备数据到es新旧索引", notes = "同步设备数据到es新旧索引")
public ResponseModel<Integer> synEquipFromDb2Es(@ApiParam(value = "设备种类code") @RequestParam String equListCode, public ResponseModel<Integer> syncEquipFromDb2Es(@ApiParam(value = "设备种类code") @RequestParam String equListCode,
@ApiParam(value = "设备类别code") @RequestParam(required = false) String equCategoryCode, @ApiParam(value = "设备类别code") @RequestParam(required = false) String equCategoryCode,
@ApiParam(value = "属地code") @RequestParam String orgBranchCode) { @ApiParam(value = "属地code") @RequestParam String orgBranchCode) {
return ResponseHelper.buildResponse(dataHandlerService.synEquipFromDb2Es(equListCode, equCategoryCode, orgBranchCode)); return ResponseHelper.buildResponse(dataHandlerService.synEquipFromDb2Es(equListCode, equCategoryCode, orgBranchCode));
......
...@@ -24,28 +24,41 @@ public abstract class FilterableBatchDataPatcher implements HistoricalDataPatche ...@@ -24,28 +24,41 @@ public abstract class FilterableBatchDataPatcher implements HistoricalDataPatche
IdxBizJgUseInfoServiceImpl useInfoService = applicationContext.getBean(IdxBizJgUseInfoServiceImpl.class); IdxBizJgUseInfoServiceImpl useInfoService = applicationContext.getBean(IdxBizJgUseInfoServiceImpl.class);
Integer maxVersion = useInfoService.getBaseMapper().selectMaxVersionWithParams(buildFilter(params)); Integer maxVersion = useInfoService.getBaseMapper().selectMaxVersionWithParams(buildFilter(params));
Integer nextVersion = maxVersion + 1; Integer nextVersion = maxVersion + 1;
List<String> refreshRecords = useInfoService.getBaseMapper().selectUseInfoOfOneVersionWithParams(nextVersion,buildFilter(params)); List<String> refreshRecords = useInfoService.getBaseMapper().selectUseInfoOfOneVersionWithParams(nextVersion, buildFilter(params));
int patchSize = refreshRecords.size(); int patchSize = refreshRecords.size();
while (!refreshRecords.isEmpty()) { while (!refreshRecords.isEmpty()) {
refreshRecords.parallelStream().forEach(record -> { try {
try { refreshRecords.parallelStream().forEach(record -> {
beforePatching(record); try {
patchSingleRecord(record); beforePatching(record);
afterPatching(record); patchSingleRecord(record);
} catch (Exception e) { afterPatching(record);
// 异常数据跳过 } catch (Exception e) {
log.error("数据修补失败,设备:{}", record, e); // 异常数据跳过
} log.error("单个方式数据修补失败,设备:{}", record, e);
}); }
useInfoService.getBaseMapper().updateVersionBatch(refreshRecords, nextVersion); });
refreshRecords = useInfoService.getBaseMapper().selectUseInfoOfOneVersionWithParams(nextVersion,buildFilter(params)); patchBatchRecord(refreshRecords);
patchSize = patchSize + refreshRecords.size(); } catch (Exception e) {
// 本批次异常数据跳过
log.error("数据修补失败,设备:{}", refreshRecords, e);
} finally {
StopWatch watch1 = new StopWatch();
watch1.start();
useInfoService.getBaseMapper().updateVersionBatch(refreshRecords, nextVersion);
watch1.stop();
log.info("版本号批量更新条数:「{}」, 耗时:「{}」", refreshRecords.size(), watch1.getTotalTimeSeconds());
refreshRecords = useInfoService.getBaseMapper().selectUseInfoOfOneVersionWithParams(nextVersion, buildFilter(params));
patchSize = patchSize + refreshRecords.size();
}
} }
watch.stop(); watch.stop();
log.info("数据修补完成,共处理{}条记录,耗时: {}秒", patchSize, watch.getTotalTimeSeconds()); log.warn("数据修补完成,共处理{}条记录,耗时: {}秒", patchSize, watch.getTotalTimeSeconds());
return patchSize; return patchSize;
} }
protected abstract void patchBatchRecord(List<String> refreshRecords);
protected abstract Map<String, Object> buildFilter(Map<String, Object> params); protected abstract Map<String, Object> buildFilter(Map<String, Object> params);
protected abstract void beforePatching(String record); protected abstract void beforePatching(String record);
......
...@@ -88,11 +88,11 @@ public class EquipmentRefreshHandler implements IDataRefreshHandler { ...@@ -88,11 +88,11 @@ public class EquipmentRefreshHandler implements IDataRefreshHandler {
} }
private List<ESEquipmentInfo.TechParam> buildTechParamByEquList(String record, String equListCode) { public List<ESEquipmentInfo.TechParam> buildTechParamByEquList(String record, String equListCode) {
return StringUtils.isNotEmpty(equListCode) ? statisticsDataUpdateService.getTechParams(equListCode, record) : new ArrayList<>(); return StringUtils.isNotEmpty(equListCode) ? statisticsDataUpdateService.getTechParams(equListCode, record) : new ArrayList<>();
} }
private LocalDate getIssueDate(String useRegistrationCode) { public LocalDate getIssueDate(String useRegistrationCode) {
if (StringUtils.isEmpty(useRegistrationCode)) { if (StringUtils.isEmpty(useRegistrationCode)) {
return null; return null;
} }
......
...@@ -27,10 +27,7 @@ import com.yeejoin.amos.boot.module.common.api.dao.EsBaseEnterpriseInfoDao; ...@@ -27,10 +27,7 @@ import com.yeejoin.amos.boot.module.common.api.dao.EsBaseEnterpriseInfoDao;
import com.yeejoin.amos.boot.module.common.api.dao.EsEquipmentDao; import com.yeejoin.amos.boot.module.common.api.dao.EsEquipmentDao;
import com.yeejoin.amos.boot.module.common.api.dao.EsUserInfoDao; import com.yeejoin.amos.boot.module.common.api.dao.EsUserInfoDao;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto; import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.common.api.entity.ESEquipmentInfo; import com.yeejoin.amos.boot.module.common.api.entity.*;
import com.yeejoin.amos.boot.module.common.api.entity.EsBaseEnterpriseInfo;
import com.yeejoin.amos.boot.module.common.api.entity.EsUserInfo;
import com.yeejoin.amos.boot.module.common.api.entity.TzsUserPermission;
import com.yeejoin.amos.boot.module.common.api.enums.CylinderTypeEnum; import com.yeejoin.amos.boot.module.common.api.enums.CylinderTypeEnum;
import com.yeejoin.amos.boot.module.common.biz.refresh.cm.RefreshCmService; import com.yeejoin.amos.boot.module.common.biz.refresh.cm.RefreshCmService;
import com.yeejoin.amos.boot.module.common.biz.service.impl.EsSearchServiceImpl; import com.yeejoin.amos.boot.module.common.biz.service.impl.EsSearchServiceImpl;
...@@ -207,6 +204,8 @@ public class DataHandlerServiceImpl { ...@@ -207,6 +204,8 @@ public class DataHandlerServiceImpl {
private final IdxBizJgUseInfoMapper idxBizJgUseInfoMapper; private final IdxBizJgUseInfoMapper idxBizJgUseInfoMapper;
private final EsBulkService esBulkService;
/** /**
* 安装告知压力管道历史数据修复-详情中的设备列表修改为汇总表格式 * 安装告知压力管道历史数据修复-详情中的设备列表修改为汇总表格式
* *
...@@ -2783,6 +2782,7 @@ public class DataHandlerServiceImpl { ...@@ -2783,6 +2782,7 @@ public class DataHandlerServiceImpl {
if (!esEquipmentCategoryDtos.isEmpty()) { if (!esEquipmentCategoryDtos.isEmpty()) {
totalUpdate = esEquipmentCategoryDtos.size() + totalUpdate; totalUpdate = esEquipmentCategoryDtos.size() + totalUpdate;
esEquipmentCategory.saveAll(esEquipmentCategoryDtos); esEquipmentCategory.saveAll(esEquipmentCategoryDtos);
esBulkService.bulkUpsert(IDX_BIZ_VIEW_JG_ALL, esEquipmentCategoryDtos.stream().map(e-> new EsEntity<>(e.getSEQUENCE_NBR(), e)).collect(Collectors.toList()));
idxBizJgSupervisionInfoServiceImpl.getBaseMapper().updateOrgBranchCodeBatch(esEquipmentCategoryDtos); idxBizJgSupervisionInfoServiceImpl.getBaseMapper().updateOrgBranchCodeBatch(esEquipmentCategoryDtos);
} }
} }
......
package com.yeejoin.amos.boot.module.jg.biz.service.impl; package com.yeejoin.amos.boot.module.jg.biz.service.impl;
import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.boot.module.common.api.entity.EsEntity; import com.yeejoin.amos.boot.module.common.api.entity.EsEntity;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
import java.util.Map;
@Component @Component
...@@ -22,15 +21,18 @@ public class EsBulkService { ...@@ -22,15 +21,18 @@ public class EsBulkService {
private final RestHighLevelClient restHighLevelClient; private final RestHighLevelClient restHighLevelClient;
private final ElasticsearchConverter converter; // 依赖注入
public <T> void bulkUpsert(String index, List<EsEntity<T>> list) { public <T> void bulkUpsert(String index, List<EsEntity<T>> list) {
BulkRequest request = new BulkRequest(); BulkRequest request = new BulkRequest();
list.forEach(item -> { list.forEach(item -> {
Map<String, Object> docData = converter.mapObject(item.getData());
request.add(new UpdateRequest(index, item.getId()) request.add(new UpdateRequest(index, item.getId())
.doc(JSON.toJSONString(item.getData()), XContentType.JSON) .doc(docData)
.upsert(JSON.toJSONString(item.getData()), XContentType.JSON)); .upsert(docData));
}); });
try { try {
// request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
restHighLevelClient.bulk(request, RequestOptions.DEFAULT); restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
} catch (Exception e) { } catch (Exception e) {
log.error("批量写入数据失败:{}", e.getMessage(), e); log.error("批量写入数据失败:{}", e.getMessage(), e);
......
...@@ -79,7 +79,7 @@ ...@@ -79,7 +79,7 @@
<if test="params.orgBranchCode != null and params.orgBranchCode != ''"> <if test="params.orgBranchCode != null and params.orgBranchCode != ''">
and s."ORG_BRANCH_CODE" like concat(#{params.orgBranchCode},'%') and s."ORG_BRANCH_CODE" like concat(#{params.orgBranchCode},'%')
</if> </if>
limit 10000 limit 1000
</select> </select>
<select id="selectMaxVersion" resultType="java.lang.Integer"> <select id="selectMaxVersion" resultType="java.lang.Integer">
SELECT SELECT
...@@ -120,9 +120,16 @@ ...@@ -120,9 +120,16 @@
WHERE ibjui."RECORD" = #{record} WHERE ibjui."RECORD" = #{record}
</select> </select>
<update id="updateVersionBatch"> <update id="updateVersionBatch">
<foreach collection="records" separator=";" item="record" open="" close=""> UPDATE
UPDATE idx_biz_jg_use_info SET "VERSION"=#{version} WHERE record = #{record} idx_biz_jg_use_info
</foreach> SET
"VERSION"=#{version}
WHERE
record = ANY(ARRAY[
<foreach collection="records" separator="," item="record" open="" close="">
#{record}
</foreach>
])
</update> </update>
<sql id="equip-detail-es"> <sql id="equip-detail-es">
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment