Commit 1f8a90fd authored by suhuiguang's avatar suhuiguang

fix(jg) :数据质量等级计算

1.导入时计算慢问题处理
parent acd025ae
......@@ -3,6 +3,7 @@ package com.yeejoin.amos.boot.module.common.biz.service.impl;
import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
......@@ -11,9 +12,6 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;
import java.io.IOException;
......@@ -25,6 +23,7 @@ import static com.yeejoin.amos.boot.module.common.api.constant.TZSCommonConstant
@Service
@RequiredArgsConstructor
@Slf4j
public class EquipmentCategoryService {
private final ESEquipmentCategory equipmentCategoryDao;
......@@ -55,4 +54,5 @@ public class EquipmentCategoryService {
throw new RuntimeException(e);
}
}
}
\ No newline at end of file
package com.yeejoin.amos.boot.module.jg.biz.reminder.core.event.service;
import cn.hutool.core.map.MapUtil;
import com.yeejoin.amos.boot.module.ymt.api.dto.EquipWaitRefreshDataQualityScore;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public abstract class DefaultQualityScoreUpdateService implements IQualityScoreUpdate {
......@@ -7,12 +13,16 @@ public abstract class DefaultQualityScoreUpdateService implements IQualityScoreU
/**
* 前置处理-定义目前支持的业务类型,由于目前仅实现使用登记、新增设备、新增装置、大编辑设备、大编辑装置、安装告知准则,
* 所有都实现后返回true即可
*
* @param bizType 业务类型
* @return 是否通过前置检验
*/
protected abstract Boolean shouldProcess(String bizType);
public void doUpdate(String bizType, Set<String> recordOrPIds) {
if (recordOrPIds.isEmpty()) {
return;
}
if (shouldProcess(bizType)) {
doHandle(bizType, recordOrPIds);
}
......@@ -21,6 +31,7 @@ public abstract class DefaultQualityScoreUpdateService implements IQualityScoreU
/**
* 执行
*
* @param bizType 业务类型
* @param recordOrPIds 装置或者设备id集合
*/
......@@ -28,7 +39,17 @@ public abstract class DefaultQualityScoreUpdateService implements IQualityScoreU
/**
* 事后处理
*
* @param recordOrPIds 装置或者设备id集合
*/
protected abstract void afterHandle(Set<String> recordOrPIds);
protected Map<String, Map<String, Object>> buildUpdateFields(List<EquipWaitRefreshDataQualityScore> dataQualityScores) {
Map<String, Map<String, Object>> fieldMap = new HashMap<>();
dataQualityScores.forEach(dataQualityScore -> {
fieldMap.put(dataQualityScore.getRecord(), MapUtil.of("DATA_QUALITY_SCORE", dataQualityScore.getDataQualityScore()));
});
return fieldMap;
}
}
package com.yeejoin.amos.boot.module.jg.biz.reminder.core.event.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.google.common.collect.Lists;
import com.yeejoin.amos.boot.biz.common.entity.TzsBaseEntity;
import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.common.api.constant.TZSCommonConstant;
import com.yeejoin.amos.boot.module.common.biz.constats.Constants;
import com.yeejoin.amos.boot.module.common.biz.refresh.DataRefreshEvent;
import com.yeejoin.amos.boot.module.common.biz.service.impl.EquipmentCategoryService;
import com.yeejoin.amos.boot.module.jg.api.dto.ReminderItemDto;
import com.yeejoin.amos.boot.module.jg.api.enums.BusinessTypeEnum;
import com.yeejoin.amos.boot.module.jg.biz.event.publisher.EventPublisher;
......@@ -16,25 +14,32 @@ import com.yeejoin.amos.boot.module.jg.biz.reminder.core.event.EquipCreateOrEdit
import com.yeejoin.amos.boot.module.jg.biz.reminder.core.event.service.DefaultQualityScoreUpdateService;
import com.yeejoin.amos.boot.module.jg.biz.reminder.dto.MatchItemDto;
import com.yeejoin.amos.boot.module.jg.biz.reminder.service.CommonReminderService;
import com.yeejoin.amos.boot.module.jg.biz.service.impl.EsBulkService;
import com.yeejoin.amos.boot.module.jg.biz.service.impl.IdxBizJgRegisterInfoServiceImpl;
import com.yeejoin.amos.boot.module.jg.biz.service.impl.IdxBizJgUseInfoServiceImpl;
import com.yeejoin.amos.boot.module.ymt.api.dto.EquipWaitRefreshDataQualityScore;
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgRegisterInfo;
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgUseInfo;
import com.yeejoin.amos.boot.module.ymt.api.enums.EquipmentClassifityEnum;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@Component
@RequiredArgsConstructor
@Slf4j
public class EquipQualityScoreUpdateService extends DefaultQualityScoreUpdateService {
private final ESEquipmentCategory equipmentCategoryDao;
private final IdxBizJgUseInfoServiceImpl idxBizJgUseInfoService;
private final IdxBizJgRegisterInfoServiceImpl idxBizJgRegisterInfoService;
......@@ -46,7 +51,7 @@ public class EquipQualityScoreUpdateService extends DefaultQualityScoreUpdateSer
private final GradeStrategyFactory gradeStrategyFactory;
private final EquipmentCategoryService equipmentCategoryService;
private final EsBulkService esBulkService;
private final EventPublisher eventPublisher;
......@@ -113,28 +118,45 @@ public class EquipQualityScoreUpdateService extends DefaultQualityScoreUpdateSer
@Override
protected void doHandle(String bizType, Set<String> recordOrPIds) {
recordOrPIds.forEach(record -> {
Optional<ESEquipmentCategoryDto> op = equipmentCategoryDao.findById(record);
// 压力管道不进行管道登记的计算
if (op.isPresent() && op.get().getEQU_LIST_CODE().equals(EquipmentClassifityEnum.YLGD.getCode())) {
if (recordOrPIds.isEmpty()) {
return;
}
Integer level = this.getReminderLevel(bizType, record);
op.ifPresent(equipmentCategory -> {
equipmentCategory.setDataQualityScore(level);
equipmentCategoryService.saveWithImmediateRefresh(equipmentCategory);
});
LambdaUpdateWrapper<IdxBizJgUseInfo> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.eq(IdxBizJgUseInfo::getRecord, record);
updateWrapper.set(IdxBizJgUseInfo::getDataQualityScore, level);
updateWrapper.set(TzsBaseEntity::getRecDate, new Date());
idxBizJgUseInfoService.update(updateWrapper);
StopWatch watch = new StopWatch();
watch.start();
List<IdxBizJgRegisterInfo> idxBizJgRegisterInfos = idxBizJgRegisterInfoService.list(new LambdaQueryWrapper<IdxBizJgRegisterInfo>().in(IdxBizJgRegisterInfo::getRecord, recordOrPIds).select(IdxBizJgRegisterInfo::getRecord, IdxBizJgRegisterInfo::getEquList));
List<EquipWaitRefreshDataQualityScore> dataQualityScores = idxBizJgRegisterInfos.parallelStream().filter(r -> !EquipmentClassifityEnum.YLGD.getCode().equals(r.getEquList())).map(registerInfo -> {
EquipWaitRefreshDataQualityScore dataQualityScore = new EquipWaitRefreshDataQualityScore();
dataQualityScore.setRecord(registerInfo.getRecord());
try {
Integer level = this.getReminderLevel(bizType, registerInfo.getRecord());
dataQualityScore.setDataQualityScore(level);
} catch (Exception e) {
log.error("设备:「{}」,计算数据质量登记失败。", registerInfo.getRecord(), e);
dataQualityScore.setDataQualityScore(null);
}
return dataQualityScore;
}).collect(Collectors.toList());
if (!dataQualityScores.isEmpty()) {
// 分批数据库更新
Lists.partition(dataQualityScores, 200).forEach(partitionData -> {
try {
idxBizJgUseInfoService.getBaseMapper().updateDataQualityScoreBatch(partitionData, null);
// es更新
esBulkService.bulkUpdateFieldsByIds(this.buildUpdateFields(partitionData), TZSCommonConstant.ES_INDEX_NAME_JG_ALL);
} catch (Exception e) {
log.error("更新数据质量等级失败,数据为:「{}」", partitionData, e);
}
});
}
watch.stop();
log.info("计算数据质量等级数量:「{}」,耗时:「{}」", dataQualityScores.size(), watch.getTotalTimeSeconds());
}
@Override
protected void afterHandle(Set<String> recordOrPIds) {
// 事务提交后发送数据变更消息
this.sendDataRefreshMsgEquip(new ArrayList<>(recordOrPIds));
}
}
......@@ -21,6 +21,7 @@ import com.yeejoin.amos.boot.module.jg.biz.service.impl.IdxBizJgProjectContrapti
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgProjectContraption;
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgUseInfo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
......@@ -34,6 +35,7 @@ import java.util.stream.Collectors;
@Component
@RequiredArgsConstructor
@Slf4j
public class ProjectQualityScoreUpdateService extends DefaultQualityScoreUpdateService {
......@@ -101,6 +103,7 @@ public class ProjectQualityScoreUpdateService extends DefaultQualityScoreUpdateS
@Override
protected void doHandle(String bizType, Set<String> projectContraptionIds) {
projectContraptionIds.forEach(projectContraptionId -> {
try {
Integer level = this.getReminderLevel(bizType, projectContraptionId);
LambdaUpdateWrapper<IdxBizJgProjectContraption> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.eq(IdxBizJgProjectContraption::getSequenceNbr, projectContraptionId);
......@@ -120,6 +123,9 @@ public class ProjectQualityScoreUpdateService extends DefaultQualityScoreUpdateS
}
equipmentCategoryDao.saveAll(projectEquips);
}
} catch (Exception e) {
log.error("装置更新数据质量等级失败,装置 id为:「{}」", projectContraptionId, e);
}
});
}
......
......@@ -3,13 +3,18 @@ package com.yeejoin.amos.boot.module.jg.biz.service.impl;
import com.yeejoin.amos.boot.module.common.api.entity.EsEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
......@@ -39,4 +44,34 @@ public class EsBulkService {
throw new RuntimeException(e);
}
}
/**
* 批量更新指定索引中的文档字段
*
* @param documentIdFieldsMap key: 要更新的文档ID列表 ,value: 需要更新的字段映射(字段名 → 新值)
* @param targetIndex 目标索引名称
* @throws RuntimeException 当ES操作失败时抛出
*/
public void bulkUpdateFieldsByIds(Map<String, Map<String, Object>> documentIdFieldsMap, String targetIndex) {
try {
BulkRequest bulkRequest = new BulkRequest();
documentIdFieldsMap.forEach((documentId, fieldsToUpdate) -> {
// 构建批量请求
UpdateRequest updateRequest = new UpdateRequest(targetIndex, documentId).doc(fieldsToUpdate, XContentType.JSON);
bulkRequest.add(updateRequest);
});
BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
// 更详细的错误日志记录
if (response.hasFailures()) {
Arrays.stream(response.getItems())
.filter(BulkItemResponse::isFailed)
.forEach(item -> log.error("Failed to update {}: {}",
item.getId(),
item.getFailureMessage()));
}
} catch (IOException e) {
log.error("批量更新指定索引中的文档字段: batchSize={}, index={}", documentIdFieldsMap.size(), targetIndex, e);
}
}
}
......@@ -2,9 +2,11 @@ package com.yeejoin.amos.boot.module.ymt.api.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class EquipWaitRefreshDataQualityScore {
private String record;
......
......@@ -31,7 +31,7 @@ public interface IdxBizJgUseInfoMapper extends CustomBaseMapper<IdxBizJgUseInfo>
Integer selectMaxVersionWithParams(@Param("params") Map<String, Object> params);
void updateDataQualityScoreBatch(@Param("equips") List<EquipWaitRefreshDataQualityScore> refreshDataQualityScores, @Param("version") int version);
void updateDataQualityScoreBatch(@Param("equips") List<EquipWaitRefreshDataQualityScore> refreshDataQualityScores, @Param("version") Integer version);
void updateVersionBatch(@Param("records") List<String> records, @Param("version") int version);
......
......@@ -9,10 +9,21 @@
</foreach>
</update>
<update id="updateDataQualityScoreBatch">
<foreach collection="equips" separator=";" item="equip" open="" close="">
UPDATE idx_biz_jg_use_info SET "DATA_QUALITY_SCORE" = #{equip.dataQualityScore} , "VERSION"=#{version} WHERE record = #{equip.record}
UPDATE idx_biz_jg_use_info SET
"DATA_QUALITY_SCORE" = tmp."DATA_QUALITY_SCORE"
<if test="version != null and version !=''">
, "VERSION"=#{version}
</if>
FROM (
<foreach collection="equips" item="equip" separator="UNION ALL">
SELECT
#{equip.record} AS record,
#{equip.dataQualityScore} AS "DATA_QUALITY_SCORE"
</foreach>
) AS tmp
WHERE idx_biz_jg_use_info.record = tmp.record
</update>
<select id="selectXAList" resultType="com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgUseInfo">
select
u."SEQUENCE_NBR",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment