Commit 16623b7e authored by tianyiming's avatar tianyiming

feat(statistics): 添加气瓶使用表中record同步到监管表功能

feat(jg): 增加压力管道长度刷入es功能
parent 80a6de49
package com.yeejoin.amos.boot.module.common.api.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.yeejoin.amos.boot.biz.common.annotation.FieldDisplayDefine;
import com.yeejoin.amos.boot.biz.common.annotation.TechnicalParameter;
import com.yeejoin.amos.boot.module.common.api.dto.TechParamItem;
import lombok.Data;
......@@ -398,6 +396,12 @@ public class ESEquipmentInfo {
private List<Maintenance> maintenances;
/**
* 压力管道长度
*/
@Field(type = FieldType.Double)
private Double pipeLength;
/**
* 技术参数
*/
@Field(type = FieldType.Nested)
......
......@@ -262,6 +262,15 @@ public class DataHandlerController extends BaseController {
}
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(httpMethod = "PUT", value = "压力管道长度刷入es", notes = "压力管道长度刷入es")
@PutMapping(value = "/equip/addPipeLength2Es")
public ResponseModel<Object> addPipeLength2Es() {
dataHandlerService.addPipeLength2Es();
return ResponseHelper.buildResponse(true);
}
/**
* @apiNote 场车车牌号刷入ES
*
......
......@@ -20,10 +20,12 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.*;
import java.util.stream.Collectors;
@Component
@RequiredArgsConstructor
......@@ -67,6 +69,13 @@ public class EquipmentRefreshHandler implements IDataRefreshHandler {
esEquipmentInfo.setInspections(BeanUtil.copyToList(inspectionDetectionInfos, ESEquipmentInfo.Inspection.class));
esEquipmentInfo.setMaintenances(lastMaintenanceRecordInfo.getSequenceNbr() != null ? Collections.singletonList(BeanUtil.copyProperties(lastMaintenanceRecordInfo, ESEquipmentInfo.Maintenance.class)) : new ArrayList<>());
esEquipmentInfo.setTechParams(this.buildTechParamByEquList(record, esEquipmentInfo.getEQU_LIST_CODE()));
if ("8000".equals(esEquipmentInfo.getEQU_LIST_CODE())) {
List<ESEquipmentInfo.TechParam> techParams = esEquipmentInfo.getTechParams();
List<ESEquipmentInfo.TechParam> pipeLength = techParams.stream().filter(e -> e.getParamKey().equals("pipeLength") && e.getDoubleValue() != null).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(pipeLength)) {
esEquipmentInfo.setPipeLength(pipeLength.get(0).getDoubleValue());
}
}
esEquipmentDao.save(esEquipmentInfo);
break;
default:
......
......@@ -1563,6 +1563,13 @@ public class DataHandlerServiceImpl {
esEquipmentInfo.setInspections(BeanUtil.copyToList(inspectionDetectionInfos, ESEquipmentInfo.Inspection.class));
esEquipmentInfo.setMaintenances(lastMaintenanceRecordInfo.getSequenceNbr() != null ? Collections.singletonList(BeanUtil.copyProperties(lastMaintenanceRecordInfo, ESEquipmentInfo.Maintenance.class)) : new ArrayList<>());
esEquipmentInfo.setTechParams(this.buildTechParamByEquList(record, esEquipmentInfo.getEQU_LIST_CODE()));
if ("8000".equals(esEquipmentInfo.getEQU_LIST_CODE())) {
List<ESEquipmentInfo.TechParam> techParams = esEquipmentInfo.getTechParams();
List<ESEquipmentInfo.TechParam> pipeLength = techParams.stream().filter(e -> e.getParamKey().equals("pipeLength") && e.getDoubleValue() != null).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(pipeLength)) {
esEquipmentInfo.setPipeLength(pipeLength.get(0).getDoubleValue());
}
}
} catch (Exception e) {
// 异常数据跳过
log.error("设备刷数据处理失败:{}", record, e);
......@@ -1662,6 +1669,13 @@ public class DataHandlerServiceImpl {
esEquipmentInfo.setInspections(BeanUtil.copyToList(inspectionDetectionInfos, ESEquipmentInfo.Inspection.class));
esEquipmentInfo.setMaintenances(lastMaintenanceRecordInfo.getSequenceNbr() != null ? Collections.singletonList(BeanUtil.copyProperties(lastMaintenanceRecordInfo, ESEquipmentInfo.Maintenance.class)) : new ArrayList<>());
esEquipmentInfo.setTechParams(this.buildTechParamByEquList(record, esEquipmentInfo.getEQU_LIST_CODE()));
if ("8000".equals(esEquipmentInfo.getEQU_LIST_CODE())) {
List<ESEquipmentInfo.TechParam> techParams = esEquipmentInfo.getTechParams();
List<ESEquipmentInfo.TechParam> pipeLength = techParams.stream().filter(e -> e.getParamKey().equals("pipeLength") && e.getDoubleValue() != null).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(pipeLength)) {
esEquipmentInfo.setPipeLength(pipeLength.get(0).getDoubleValue());
}
}
} catch (Exception e) {
// 异常数据跳过
log.error("设备刷数据处理失败:{}", record, e);
......@@ -1716,4 +1730,98 @@ public class DataHandlerServiceImpl {
return Optional.ofNullable(manage).map(JgUseRegistrationManage::getRegDate).map(d -> d.toInstant().atZone(ZoneId.systemDefault()).toLocalDate()).orElse(null);
}
public void addPipeLength2Es() {
log.info("压力管道长度刷入es-设备信息入库开始");
StopWatch watch = new StopWatch();
watch.start();
Integer count = useInfoService.getBaseMapper().selectPiPeCount();
Integer times = 0;
if (count != 0) {
times = count / 5000;
int last = count % 5000;
if (last > 0) {
times++;
}
} else {
return;
}
Page<String> recordDtoPage = new Page<>();
for (int j = 0; j <= times; j++) {
recordDtoPage.setCurrent(j + 1);
recordDtoPage.setSize(5000);
Page<String> refreshRecords = useInfoService.getBaseMapper().selectPiPeRecords(recordDtoPage);
if(!ObjectUtils.isEmpty(refreshRecords)&& refreshRecords.getRecords().size() > 0){
List<String> records = refreshRecords.getRecords();
StopWatch watch0 = new StopWatch();
watch0.start();
List<Map<String, Object>> details = useInfoService.getBaseMapper().queryDetailBatch(records);
Map<String, Map<String, Object>> recordDetailMap = details.stream().collect(Collectors.toMap(e -> (String) e.get("SEQUENCE_NBR"), Function.identity(), (k1, k2) -> k2));
// 设备最新的维保信息-维度:设备
List<IdxBizJgMaintenanceRecordInfo> lastMaintenanceRecordInfos = maintenanceRecordInfoService.getBaseMapper().selectLastedMainInfoBatch(records);
Map<String, List<IdxBizJgMaintenanceRecordInfo>> recordLastMaintMap = lastMaintenanceRecordInfos.stream().collect(Collectors.groupingBy(IdxBizJgMaintenanceRecordInfo::getRecord));
// 设备、各检验类型下最新的检验信息-维度:设备、检验类型
List<IdxBizJgInspectionDetectionInfo> lastedInspectInfosGroupByInspectType = idxBizJgInspectionDetectionInfoService.getBaseMapper().selectLastedGroupByInspectTypeBatch(records);
Map<String, List<IdxBizJgInspectionDetectionInfo>> recordInspectInfosGroupByInspectTypeMap = lastedInspectInfosGroupByInspectType.stream().collect(Collectors.groupingBy(IdxBizJgInspectionDetectionInfo::getRecord));
// 设备最新的检验信息-维度:设备
Map<String, Optional<IdxBizJgInspectionDetectionInfo>> recordLastInspectionMap = lastedInspectInfosGroupByInspectType.stream().filter(e -> e.getInspectDate() != null).collect(Collectors.groupingBy(IdxBizJgInspectionDetectionInfo::getRecord, Collectors.maxBy(Comparator.comparing(IdxBizJgInspectionDetectionInfo::getInspectDate))));
List<ESEquipmentInfo> esEquipmentInfos = records.parallelStream().map(record -> {
ESEquipmentInfo esEquipmentInfo = null;
try {
esEquipmentInfo = new ESEquipmentInfo();
Map<String, Object> detail = recordDetailMap.get(record);
StatisticsDataUpdateService.formatUseDate(detail);
BeanUtil.copyProperties(detail, esEquipmentInfo, true);
Optional<ESEquipmentCategoryDto> esOptional = esEquipmentCategory.findById(record);
if (esOptional.isPresent()) {
ESEquipmentCategoryDto dto = esOptional.get();
esEquipmentInfo.setUSC_UNIT_CREDIT_CODE(dto.getUSC_UNIT_CREDIT_CODE());
esEquipmentInfo.setUSC_UNIT_NAME(dto.getUSC_UNIT_NAME());
esEquipmentInfo.setIS_DO_BUSINESS(dto.getIS_DO_BUSINESS());
}
// 最新检验信息-维度record
IdxBizJgInspectionDetectionInfo inspectionDetectionInfo = Optional.ofNullable(recordLastInspectionMap.get(record)).flatMap(i -> i).orElse(new IdxBizJgInspectionDetectionInfo());
// 最新维保信息-维度record
IdxBizJgMaintenanceRecordInfo lastMaintenanceRecordInfo = Optional.ofNullable(recordLastMaintMap.get(record)).filter(l -> !l.isEmpty()).map(list -> list.get(0)).orElse(new IdxBizJgMaintenanceRecordInfo());
// 最新检验信息-维度record、检验类型,存最新的一条
List<IdxBizJgInspectionDetectionInfo> inspectionDetectionInfos = recordInspectInfosGroupByInspectTypeMap.getOrDefault(record, new ArrayList<>());
StatisticsDataUpdateService.formatInspectDate(esEquipmentInfo, inspectionDetectionInfo, record);
esEquipmentInfo.setIssueDate(getIssueDate(esEquipmentInfo.getUSE_ORG_CODE()));
esEquipmentInfo.setMAINTAIN_UNIT(lastMaintenanceRecordInfo.getMeUnitCreditCode());
esEquipmentInfo.setMAINTAIN_UNIT_NAME(lastMaintenanceRecordInfo.getMeUnitName());
esEquipmentInfo.setInspections(BeanUtil.copyToList(inspectionDetectionInfos, ESEquipmentInfo.Inspection.class));
esEquipmentInfo.setMaintenances(lastMaintenanceRecordInfo.getSequenceNbr() != null ? Collections.singletonList(BeanUtil.copyProperties(lastMaintenanceRecordInfo, ESEquipmentInfo.Maintenance.class)) : new ArrayList<>());
esEquipmentInfo.setTechParams(this.buildTechParamByEquList(record, esEquipmentInfo.getEQU_LIST_CODE()));
if ("8000".equals(esEquipmentInfo.getEQU_LIST_CODE())) {
List<ESEquipmentInfo.TechParam> techParams = esEquipmentInfo.getTechParams();
List<ESEquipmentInfo.TechParam> pipeLength = techParams.stream().filter(e -> e.getParamKey().equals("pipeLength") && e.getDoubleValue() != null).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(pipeLength)) {
esEquipmentInfo.setPipeLength(pipeLength.get(0).getDoubleValue());
}
}
} catch (Exception e) {
// 异常数据跳过
log.error("设备刷数据处理失败:{}", record, e);
}
return esEquipmentInfo;
}).collect(Collectors.toList());
watch0.stop();
log.warn("多线程处理查询设备详情耗时:{}", watch0.getTotalTimeSeconds());
esEquipmentInfos = esEquipmentInfos.stream().filter(e -> StringUtils.isNotEmpty(e.getSEQUENCE_NBR())).collect(Collectors.toList());
if (!esEquipmentInfos.isEmpty()) {
StopWatch watch1 = new StopWatch();
watch1.start();
esEquipmentDao.saveAll(esEquipmentInfos);
watch1.stop();
log.warn("es批量入库条数:{},耗时:{}s", esEquipmentInfos.size(), watch1.getTotalTimeSeconds());
}
}
}
watch.stop();
log.info("增量添加ORG_BRANCH_CODE为50X综合搜索数据-设备信息入库结束,耗时:{}秒", watch.getTotalTimeSeconds());
}
}
......@@ -2,6 +2,7 @@ package com.yeejoin.amos.boot.module.statistics.api.mapper;
import com.alibaba.fastjson.JSONArray;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.yeejoin.amos.boot.module.statistics.api.entity.TzsCustomFilter;
import org.apache.ibatis.annotations.MapKey;
import org.apache.ibatis.annotations.Param;
......@@ -23,4 +24,8 @@ public interface TzsCustomFilterMapper extends BaseMapper<TzsCustomFilter> {
Integer selectEquipmentCategoryCountByParentId(@Param("parentId") String parentId);
JSONArray queryEquCategory(@Param("type") String type, @Param("description") String description);
Page<String> selectRecords(Page<String> page);
void addGas(@Param("refreshRecords")List<String> refreshRecords);
}
......@@ -28,4 +28,28 @@
<select id="selectEquipmentCategoryCountByParentId" resultType="java.lang.Integer">
select count(1) from tz_equipment_category where is_delete = 0 and parent_id = #{parentId}
</select>
<select id="selectRecords" resultType="java.lang.String">
SELECT
ibjui."RECORD"
FROM
amos_tzs_biz.idx_biz_jg_use_info ibjui
LEFT JOIN amos_tzs_biz.idx_biz_jg_supervision_info ibjsi ON ibjui."RECORD" = ibjsi."RECORD"
LEFT JOIN amos_tzs_biz.idx_biz_jg_register_info ibjri ON ibjui."RECORD" = ibjri."RECORD"
LEFT JOIN amos_tzs_biz.idx_biz_jg_other_info ibjoi ON ibjui."RECORD" = ibjoi."RECORD"
WHERE
"ORG_BRANCH_CODE" IS NULL
AND ibjui."RECORD" is not null
AND ibjsi."RECORD" is null
AND ibjri."EQU_CATEGORY" = '2300'
AND "CLAIM_STATUS" = '已认领'
order by ibjui."RECORD" desc
</select>
<insert id="addGas">
INSERT INTO "amos_tzs_biz"."idx_biz_jg_supervision_info" ("SEQUENCE_NBR", "RECORD")
VALUES
<foreach collection="refreshRecords" item="record" separator=",">
(#{record}, #{record})
</foreach>
</insert>
</mapper>
......@@ -211,6 +211,15 @@ public class ComprehensiveStatisticalAnalysisController extends BaseController {
}
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(httpMethod = "PUT", value = "添加气瓶使用表中record同步到监管表", notes = "添加气瓶使用表中record同步到监管表")
@PutMapping(value = "/equip/addGasRecordToSupervision")
public ResponseModel<Integer> addGasRecordToSupervision() {
return ResponseHelper.buildResponse(statisticalAnalysisService.addGasRecordToSupervision());
}
/**
* 大屏综合统计查询接口
*
......
......@@ -47,6 +47,8 @@ import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.ParsedSum;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
......@@ -301,7 +303,7 @@ public class ComprehensiveStatisticalAnalysisServiceImpl {
String licensesStatusStr = "";
if (!ObjectUtils.isEmpty(filter.get("filterParams"))) {
filterParams = JSONObject.parseObject(JSONObject.toJSONString(filter.get("filterParams")));
String filterType = "advanced";
String filterType = filter.getString("filterType");
// 组装人员过滤条件
this.getPersonBoolQueryBuilder(filterParams, boolMust, filterType);
// 资质判断
......@@ -1290,53 +1292,17 @@ public class ComprehensiveStatisticalAnalysisServiceImpl {
private BigDecimal getPipeLength(BoolQueryBuilder boolMust, SearchSourceBuilder builder, String countField) {
SearchRequest request = new SearchRequest();
request.indices(StatisticalAnalysisEnum.equip.getKey());
BigDecimal pipeLong = new BigDecimal(0);
// String painlessScript =
// "double total = 0.0; " +
// "for (int i = 0; i < doc['techParams.doubleValue'].length; i++) { " +
// " if (doc['techParams.paramKey'][i] == 'pipeLength') { " +
// " try { " +
// " total += Double.parseDouble(doc['techParams.doubleValue'][i].toString()); " +
// " } catch (Exception e) {} " +
// " } " +
// "} " +
// "return total;";
// NestedAggregationBuilder nestedAgg = AggregationBuilders.nested(countField, "techParams");
// SumAggregationBuilder sumAgg = AggregationBuilders.sum("pipeLength")
// .script(new Script(ScriptType.INLINE, "painless", painlessScript, Collections.emptyMap()));
// nestedAgg.subAggregation(sumAgg);
builder.query(boolMust).size(10000000);
BigDecimal pipeLong;
BoolQueryBuilder pipeLengthQuery = QueryBuilderUtils.copyBoolQuery(boolMust);
pipeLengthQuery.must(QueryBuilders.existsQuery("pipLineLength"));
SumAggregationBuilder pipeLengthAgg = AggregationBuilders.sum(countField).field("pipLineLength").missing(0);
builder.aggregation(pipeLengthAgg).size(0);
request.source(builder);
try {
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
JSONArray resultList = new JSONArray();
for (SearchHit hit : response.getHits().getHits()) {
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(hit);
JSONObject dto = jsonObject.getJSONObject("sourceAsMap");
resultList.add(dto.get("techParams"));
}
for (Object object : resultList) {
JSONArray jsonArray = (JSONArray) object;
for (Object o : jsonArray) {
JSONObject jsonObject = (JSONObject) o;
if(jsonObject.getString("paramKey").equals("pipeLength") && jsonObject.containsKey("doubleValue")){
BigDecimal value = new BigDecimal(jsonObject.get("doubleValue").toString());
pipeLong = pipeLong.add(value);
}
}
}
// // 获取最外层聚合结果
// Aggregations aggregations = response.getAggregations();
// // 1. 获取嵌套聚合结果
// ParsedNested nestedItems = aggregations.get(countField);
// 2. 从嵌套聚合中获取过滤聚合结果
// ParsedFilter filteredItems = nestedItems.getAggregations().get("filtered_items");
// 3. 从过滤聚合中获取指标聚合结果
// ParsedSum totalValue = nestedItems.getAggregations().get("pipeLength");
// double value = 0;
ParsedSum sumAgg = response.getAggregations().get(countField);
double value = sumAgg.getValue();
pipeLong = new BigDecimal(value);
} catch (IOException e) {
throw new RuntimeException(e);
}
......@@ -3686,4 +3652,28 @@ public class ComprehensiveStatisticalAnalysisServiceImpl {
return result;
}
public Integer addGasRecordToSupervision() {
Integer count = 80000;
Integer times = 0;
if (count != 0) {
times = count / 5000;
int last = count % 5000;
if (last > 0) {
times++;
}
} else {
return 0;
}
Page<String> recordDtoPage = new Page<>();
for (int j = 0; j <= times; j++) {
recordDtoPage.setCurrent(j + 1);
recordDtoPage.setSize(5000);
Page<String> refreshRecords = tzsCustomFilterMapper.selectRecords(recordDtoPage);
if(!ObjectUtils.isEmpty(refreshRecords)&& refreshRecords.getRecords().size() > 0){
tzsCustomFilterMapper.addGas(refreshRecords.getRecords());
}
}
return null;
}
}
\ No newline at end of file
......@@ -38,4 +38,9 @@ public interface IdxBizJgUseInfoMapper extends BaseMapper<IdxBizJgUseInfo> {
Page<String> selectAddDataRecords(Page<String> page);
Integer selectAddDataRecordsCount();
Integer selectPiPeCount();
Page<String> selectPiPeRecords(Page<String> page);
}
......@@ -154,5 +154,33 @@
<select id="selectAddDataRecordsCount" resultType="java.lang.Integer">
select count(1) from amos_tzs_biz.idx_biz_jg_supervision_info where "ORG_BRANCH_CODE" = '50*X'
</select>
<select id="selectPiPeCount" resultType="java.lang.Integer">
SELECT
count(1)
FROM
amos_tzs_biz.idx_biz_jg_use_info ibjui
LEFT JOIN amos_tzs_biz.idx_biz_jg_supervision_info ibjsi ON ibjui."RECORD" = ibjsi."RECORD"
LEFT JOIN amos_tzs_biz.idx_biz_jg_register_info ibjri ON ibjui."RECORD" = ibjri."RECORD"
LEFT JOIN amos_tzs_biz.idx_biz_jg_other_info ibjoi ON ibjui."RECORD" = ibjoi."RECORD"
WHERE
"ORG_BRANCH_CODE" IS NOT NULL
AND ibjri."EQU_LIST" = '8000'
AND "CLAIM_STATUS" NOT IN ('待认领','已拒领','草稿')
order by ibjui."RECORD" desc
</select>
<select id="selectPiPeRecords" resultType="java.lang.String">
SELECT
ibjui."RECORD"
FROM
amos_tzs_biz.idx_biz_jg_use_info ibjui
LEFT JOIN amos_tzs_biz.idx_biz_jg_supervision_info ibjsi ON ibjui."RECORD" = ibjsi."RECORD"
LEFT JOIN amos_tzs_biz.idx_biz_jg_register_info ibjri ON ibjui."RECORD" = ibjri."RECORD"
LEFT JOIN amos_tzs_biz.idx_biz_jg_other_info ibjoi ON ibjui."RECORD" = ibjoi."RECORD"
WHERE
"ORG_BRANCH_CODE" IS NOT NULL
AND ibjri."EQU_LIST" = '8000'
AND "CLAIM_STATUS" NOT IN ('待认领','已拒领','草稿')
order by ibjui."RECORD" desc
</select>
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment