Commit e4289e0e authored by suhuiguang's avatar suhuiguang

Merge branch 'develop_tzs_test' into develop_tzs_register

# Conflicts: # amos-boot-system-tzs/amos-boot-module-jg/amos-boot-module-jg-biz/src/main/java/com/yeejoin/amos/boot/module/jg/biz/edit/event/listener/ChangeEquipImpactCertListener.java
parents 940dcba0 62616758
......@@ -10,6 +10,7 @@ import java.lang.annotation.*;
public @interface FieldDisplayDefine {
String value();
/**
* 字段别名 兼容前端使用
*
......@@ -21,15 +22,53 @@ public @interface FieldDisplayDefine {
String format() default "yyyy-MM-dd";
/**
* 是否是数据库字典
*
* @return 是否
*/
boolean isExist() default true;
/**
* 处理器bean 名称
*
* @return bean 名称唯一
*/
String typeHandler() default "defaultTypeHandler";
/**
* 字典 key
*
* @return 字典key
*/
String dictCode() default "";
/**
* 字典类型
*
* @return 平台字典、还是业务字典
*/
DictType dictType() default DictType.no;
/**
* 是否冗余字段,如行政区划名称,冗余时则该字段不再记录变更日志
*
* @return 是否冗余
*/
boolean isRepeatColumn() default false;
enum DictType {
/**
* 业务字典
*/
cb,
/**
* 平台字典
*/
platform,
/**
* 非字典
*/
no
}
}
......@@ -219,4 +219,10 @@ public class ESEquipmentCategoryDto {
*/
@Field(type = FieldType.Keyword)
private String whetherSphericalTank;
/**
* 版本号
*/
@Field(type = FieldType.Keyword)
private String version;
}
......@@ -36,7 +36,7 @@ public class TechParamsVesselChangeFieldDto extends BaseTechParamsFieldDto {
@FieldDisplayDefine(value = "总容积")
private String totalVolume;
@FieldDisplayDefine(value = "充装介质")
@FieldDisplayDefine(value = "充装介质", dictCode = "FILLING_MEDIUM", dictType = FieldDisplayDefine.DictType.platform)
private String chargingMedium;
@FieldDisplayDefine(value = "规格")
......
......@@ -52,5 +52,5 @@ public interface JgChangeRegistrationUnitMapper extends CustomBaseMapper<JgChang
List<CompanyEquipCountDto> queryForFlowingEquipList();
List<Map<String, Object>> getEstateUnitInfo(List<String> records);
List<Map<String, Object>> getEstateUnitInfo(@Param("records") List<String> records);
}
......@@ -126,21 +126,4 @@
and a.audit_status in ('三级待受理', '二级待受理', '一级待受理')
GROUP BY a.use_unit_credit_code
</select>
<select id="getEstateUnitInfo" resultType="java.util.Map">
SELECT
CONCAT(jui.ESTATE_UNIT_CREDIT_CODE, '_', jui.ESTATE_UNIT_NAME) AS estateUnitName
FROM
idx_biz_jg_register_info ri
LEFT JOIN idx_biz_jg_use_info jui on ri.RECORD = jui.RECORD
WHERE ri."EQU_CATEGORY" = '2300'
AND jui."DATA_SOURCE" like 'jg%'
AND ri.whether_vehicle_cylinder = 1
AND jui."ESTATE_UNIT_NAME" IS NOT NULL
AND jui."ESTATE_UNIT_CREDIT_CODE" IS NOT NULL
AND jui.record in
<foreach collection="records" item="record" open="(" close=")" separator=",">
#{record}
</foreach>
</select>
</mapper>
......@@ -178,4 +178,21 @@
and a.status in ('三级待受理', '二级待受理', '一级待受理')
GROUP BY a.use_unit_credit_code
</select>
<select id="getEstateUnitInfo" resultType="java.util.Map">
SELECT
CONCAT(jui.ESTATE_UNIT_CREDIT_CODE, '_', jui.ESTATE_UNIT_NAME) AS estateUnitName
FROM
idx_biz_jg_register_info ri
LEFT JOIN idx_biz_jg_use_info jui on ri.RECORD = jui.RECORD
WHERE ri."EQU_CATEGORY" = '2300'
AND jui."DATA_SOURCE" like 'jg%'
AND ri.whether_vehicle_cylinder = 1
AND jui."ESTATE_UNIT_NAME" IS NOT NULL
AND jui."ESTATE_UNIT_CREDIT_CODE" IS NOT NULL
AND jui.record in
<foreach collection="records" item="record" open="(" close=")" separator=",">
#{record}
</foreach>
</select>
</mapper>
......@@ -421,4 +421,25 @@ public class DataHandlerController extends BaseController {
dataHandlerService.addDbData2EsBatch(paramMap);
return ResponseHelper.buildResponse(true);
}
@TycloudOperation(ApiLevel = UserType.AGENCY)
@PutMapping(value = "/sync/es2db/org-branches")
@ApiOperation(httpMethod = "PUT", value = "1.将属地监管部门以es老索引为准刷数据库", notes = "属地监管部门以es为准刷数据库")
public ResponseModel<Long> orgBranchCode2Db(@ApiParam(value = "设备种类code") @RequestParam String equListCode,
@ApiParam(value = "设备类别code") @RequestParam(required = false) String equCategoryCode,
@ApiParam(value = "属地code") @RequestParam String orgBranchCode ,
@ApiParam(value = "序列号,不能重复") @RequestParam Integer seq) {
return ResponseHelper.buildResponse(dataHandlerService.orgBranchCode2Db(equListCode, equCategoryCode, orgBranchCode, seq));
}
@TycloudOperation(ApiLevel = UserType.AGENCY)
@PutMapping(value = "/sync/db2es/equipments")
@ApiOperation(httpMethod = "PUT", value = "2.同步设备数据到es新旧索引", notes = "同步设备数据到es新旧索引")
public ResponseModel<Integer> syncEquipFromDb2Es(@ApiParam(value = "设备种类code") @RequestParam String equListCode,
@ApiParam(value = "设备类别code") @RequestParam(required = false) String equCategoryCode,
@ApiParam(value = "属地code") @RequestParam String orgBranchCode) {
return ResponseHelper.buildResponse(dataHandlerService.synEquipFromDb2Es(equListCode, equCategoryCode, orgBranchCode));
}
}
\ No newline at end of file
package com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher;
import com.yeejoin.amos.boot.module.jg.biz.service.impl.IdxBizJgUseInfoServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.util.StopWatch;
import java.util.List;
import java.util.Map;
@Slf4j
public abstract class FilterableBatchDataPatcher implements HistoricalDataPatcher<Map<String, Object>> {
private final ApplicationContext applicationContext;
protected FilterableBatchDataPatcher(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
public Integer patchBatchData(Map<String, Object> params) {
StopWatch watch = new StopWatch();
watch.start();
IdxBizJgUseInfoServiceImpl useInfoService = applicationContext.getBean(IdxBizJgUseInfoServiceImpl.class);
Integer maxVersion = useInfoService.getBaseMapper().selectMaxVersionWithParams(buildFilter(params));
Integer nextVersion = maxVersion + 1;
List<String> refreshRecords = useInfoService.getBaseMapper().selectUseInfoOfOneVersionWithParams(nextVersion, buildFilter(params));
int patchSize = refreshRecords.size();
while (!refreshRecords.isEmpty()) {
try {
refreshRecords.parallelStream().forEach(record -> {
try {
beforePatching(record);
patchSingleRecord(record);
afterPatching(record);
} catch (Exception e) {
// 异常数据跳过
log.error("单个方式数据修补失败,设备:{}", record, e);
}
});
patchBatchRecord(refreshRecords);
} catch (Exception e) {
// 本批次异常数据跳过
log.error("数据修补失败,设备:{}", refreshRecords, e);
} finally {
StopWatch watch1 = new StopWatch();
watch1.start();
useInfoService.getBaseMapper().updateVersionBatch(refreshRecords, nextVersion);
watch1.stop();
log.info("版本号批量更新条数:「{}」, 耗时:「{}」", refreshRecords.size(), watch1.getTotalTimeSeconds());
refreshRecords = useInfoService.getBaseMapper().selectUseInfoOfOneVersionWithParams(nextVersion, buildFilter(params));
patchSize = patchSize + refreshRecords.size();
}
}
watch.stop();
log.warn("数据修补完成,共处理{}条记录,耗时: {}秒", patchSize, watch.getTotalTimeSeconds());
return patchSize;
}
protected abstract void patchBatchRecord(List<String> refreshRecords);
protected abstract Map<String, Object> buildFilter(Map<String, Object> params);
protected abstract void beforePatching(String record);
protected abstract void patchSingleRecord(String record);
protected abstract void afterPatching(String record);
}
......@@ -6,18 +6,19 @@ import org.springframework.context.ApplicationContext;
import org.springframework.util.StopWatch;
import java.util.List;
import java.util.Map;
@Slf4j
public abstract class BatchDataPatcher implements HistoricalDataPatcher {
public abstract class FullBatchDataPatcher implements HistoricalDataPatcher<Map<String, Object>> {
private final ApplicationContext applicationContext;
protected BatchDataPatcher(ApplicationContext applicationContext) {
protected FullBatchDataPatcher(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
public Integer patchBatchData() {
public Integer patchBatchData(Map<String, Object> params) {
StopWatch watch = new StopWatch();
watch.start();
IdxBizJgUseInfoServiceImpl useInfoService = applicationContext.getBean(IdxBizJgUseInfoServiceImpl.class);
......
package com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher;
public interface HistoricalDataPatcher {
public interface HistoricalDataPatcher<T> {
/**
* 执行批量修补
*
* @return 处理成功的记录数,如果不可计算则返回null
*/
Integer patchBatchData();
Integer patchBatchData(T params);
}
package com.yeejoin.amos.boot.module.jg.biz.data.fix.service;
import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.common.api.entity.ESEquipmentInfo;
import com.yeejoin.amos.boot.module.common.api.entity.EsEntity;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher.FilterableBatchDataPatcher;
import com.yeejoin.amos.boot.module.jg.biz.edit.utils.JsonDiffUtil;
import com.yeejoin.amos.boot.module.jg.biz.refresh.StatisticsDataUpdateService;
import com.yeejoin.amos.boot.module.jg.biz.refresh.handler.EquipmentRefreshHandler;
import com.yeejoin.amos.boot.module.jg.biz.service.impl.EsBulkService;
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgInspectionDetectionInfo;
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgMaintenanceRecordInfo;
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgSupervisionInfo;
import com.yeejoin.amos.boot.module.ymt.api.mapper.IdxBizJgInspectionDetectionInfoMapper;
import com.yeejoin.amos.boot.module.ymt.api.mapper.IdxBizJgMaintenanceRecordInfoMapper;
import com.yeejoin.amos.boot.module.ymt.api.mapper.IdxBizJgSupervisionInfoMapper;
import com.yeejoin.amos.boot.module.ymt.api.mapper.IdxBizJgUseInfoMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StopWatch;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.yeejoin.amos.boot.module.jg.biz.service.impl.DataHandlerServiceImpl.IDX_BIZ_EQUIPMENT_INFO;
import static com.yeejoin.amos.boot.module.jg.biz.service.impl.DataHandlerServiceImpl.IDX_BIZ_VIEW_JG_ALL;
@Component
@Slf4j
public class FilterableEquipInsert2EsPatcher extends FilterableBatchDataPatcher {
private final ESEquipmentCategory equipmentCategory;
private final EquipmentRefreshHandler refreshHandler;
private final IdxBizJgUseInfoMapper idxBizJgUseInfoMapper;
private final IdxBizJgSupervisionInfoMapper idxBizJgSupervisionInfoMapper;
private final IdxBizJgMaintenanceRecordInfoMapper maintenanceRecordInfoMapper;
private final IdxBizJgInspectionDetectionInfoMapper inspectionDetectionInfoMapper;
private final EsBulkService esBulkService;
protected FilterableEquipInsert2EsPatcher(ApplicationContext applicationContext, ESEquipmentCategory equipmentCategory, EquipmentRefreshHandler refreshHandler, IdxBizJgUseInfoMapper idxBizJgUseInfoMapper, IdxBizJgSupervisionInfoMapper idxBizJgSupervisionInfoMapper, IdxBizJgMaintenanceRecordInfoMapper maintenanceRecordInfoMapper, IdxBizJgInspectionDetectionInfoMapper inspectionDetectionInfoMapper, EsBulkService esBulkService) {
super(applicationContext);
this.equipmentCategory = equipmentCategory;
this.refreshHandler = refreshHandler;
this.idxBizJgUseInfoMapper = idxBizJgUseInfoMapper;
this.idxBizJgSupervisionInfoMapper = idxBizJgSupervisionInfoMapper;
this.maintenanceRecordInfoMapper = maintenanceRecordInfoMapper;
this.inspectionDetectionInfoMapper = inspectionDetectionInfoMapper;
this.esBulkService = esBulkService;
}
@Override
protected void patchBatchRecord(List<String> refreshRecords) {
log.info("批量处理设备信息到es开始");
StopWatch watch = new StopWatch();
watch.start("批量查询设备信息");
List<Map<String, Object>> details = idxBizJgUseInfoMapper.queryDetailBatch(refreshRecords);
watch.stop();
Map<String, Map<String, Object>> recordDetailMap = details.stream().collect(Collectors.toMap(e -> (String) e.get("SEQUENCE_NBR"), Function.identity(), (k1, k2) -> k2));
watch.start("组装es设备老索引更新及新增的对象数据");
// 组装es设备老索引更新及新增的对象数据
List<ESEquipmentCategoryDto> esEquipmentCategoryDtos = getEsEquipmentCategoryDtos(refreshRecords, recordDetailMap);
watch.stop();
watch.start("组装es设备新索引更新及新增的对象数据");
// 组装es设备新索引更新及新增的对象数据
List<ESEquipmentInfo> esEquipmentInfos = getEsEquipmentInfos(refreshRecords, recordDetailMap);
watch.stop();
watch.start("es设备新旧索引保存");
// 多线程保存
List<CompletableFuture<Void>> futures = new ArrayList<>();
if (!esEquipmentCategoryDtos.isEmpty()) {
futures.add(CompletableFuture.runAsync(() -> {
StopWatch watch4 = new StopWatch();
watch4.start();
esBulkService.bulkUpsert(IDX_BIZ_VIEW_JG_ALL, esEquipmentCategoryDtos.stream().map(e -> new EsEntity<>(e.getSEQUENCE_NBR(), e)).collect(Collectors.toList()));
watch4.stop();
log.warn("[设备老索引] 批量入库 {} 条,耗时 {}s",
esEquipmentCategoryDtos.size(), watch4.getTotalTimeSeconds());
}));
}
if (!esEquipmentInfos.isEmpty()) {
futures.add(CompletableFuture.runAsync(() -> {
StopWatch watch4 = new StopWatch();
watch4.start();
esBulkService.bulkUpsert(IDX_BIZ_EQUIPMENT_INFO, esEquipmentInfos.stream().map(e -> new EsEntity<>(e.getSEQUENCE_NBR(), e)).collect(Collectors.toList()));
watch4.stop();
log.warn("[设备新索引] 批量入库 {} 条,耗时 {}s",
esEquipmentInfos.size(), watch4.getTotalTimeSeconds());
}));
}
// 等待所有任务完成(阻塞当前线程)
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
watch.stop();
log.warn("新索引数据补充匹配补充,总耗时情况:{}", watch.prettyPrint());
}
private List<ESEquipmentInfo> getEsEquipmentInfos(List<String> refreshRecords, Map<String, Map<String, Object>> recordDetailMap) {
// 设备最新的维保信息-维度:设备
List<IdxBizJgMaintenanceRecordInfo> lastMaintenanceRecordInfos = maintenanceRecordInfoMapper.selectLastedMainInfoBatch(refreshRecords);
Map<String, List<IdxBizJgMaintenanceRecordInfo>> recordLastMaintMap = lastMaintenanceRecordInfos.stream().collect(Collectors.groupingBy(IdxBizJgMaintenanceRecordInfo::getRecord));
// 设备、各检验类型下最新的检验信息-维度:设备、检验类型
List<IdxBizJgInspectionDetectionInfo> lastedInspectInfosGroupByInspectType = inspectionDetectionInfoMapper.selectLastedGroupByInspectTypeBatch(refreshRecords);
Map<String, List<IdxBizJgInspectionDetectionInfo>> recordInspectInfosGroupByInspectTypeMap = lastedInspectInfosGroupByInspectType.stream().collect(Collectors.groupingBy(IdxBizJgInspectionDetectionInfo::getRecord));
// 设备最新的检验信息-维度:设备
Map<String, Optional<IdxBizJgInspectionDetectionInfo>> recordLastInspectionMap = lastedInspectInfosGroupByInspectType.stream().filter(e -> e.getNextInspectDate() != null).collect(Collectors.groupingBy(IdxBizJgInspectionDetectionInfo::getRecord, Collectors.maxBy(Comparator.comparing(IdxBizJgInspectionDetectionInfo::getNextInspectDate))));
List<ESEquipmentInfo> esEquipmentInfos = refreshRecords.parallelStream().map(record -> {
ESEquipmentInfo esEquipmentInfo = null;
try {
esEquipmentInfo = new ESEquipmentInfo();
Map<String, Object> detail = recordDetailMap.get(record);
StatisticsDataUpdateService.formatUseDate(detail);
BeanUtil.copyProperties(detail, esEquipmentInfo, true);
// 最新检验信息-维度record
IdxBizJgInspectionDetectionInfo inspectionDetectionInfo = Optional.ofNullable(recordLastInspectionMap.get(record)).flatMap(i -> i).orElse(new IdxBizJgInspectionDetectionInfo());
// 最新维保信息-维度record
IdxBizJgMaintenanceRecordInfo lastMaintenanceRecordInfo = Optional.ofNullable(recordLastMaintMap.get(record)).filter(l -> !l.isEmpty()).map(list -> list.get(0)).orElse(new IdxBizJgMaintenanceRecordInfo());
// 最新检验信息-维度record、检验类型,存最新的一条
List<IdxBizJgInspectionDetectionInfo> inspectionDetectionInfos = recordInspectInfosGroupByInspectTypeMap.getOrDefault(record, new ArrayList<>());
StatisticsDataUpdateService.formatInspectDate(esEquipmentInfo, inspectionDetectionInfo, record);
esEquipmentInfo.setIssueDate(refreshHandler.getIssueDate(esEquipmentInfo.getUSE_ORG_CODE()));
esEquipmentInfo.setMAINTAIN_UNIT(lastMaintenanceRecordInfo.getMeUnitCreditCode());
esEquipmentInfo.setMAINTAIN_UNIT_NAME(lastMaintenanceRecordInfo.getMeUnitName());
esEquipmentInfo.setInspections(BeanUtil.copyToList(inspectionDetectionInfos, ESEquipmentInfo.Inspection.class));
esEquipmentInfo.setMaintenances(lastMaintenanceRecordInfo.getSequenceNbr() != null ? Collections.singletonList(BeanUtil.copyProperties(lastMaintenanceRecordInfo, ESEquipmentInfo.Maintenance.class)) : new ArrayList<>());
esEquipmentInfo.setTechParams(refreshHandler.buildTechParamByEquList(record, esEquipmentInfo.getEQU_LIST_CODE()));
if ("8000".equals(esEquipmentInfo.getEQU_LIST_CODE())) {
List<ESEquipmentInfo.TechParam> techParams = esEquipmentInfo.getTechParams();
List<ESEquipmentInfo.TechParam> pipeLength = techParams.stream().filter(e -> e.getParamKey().equals("pipeLength") && e.getDoubleValue() != null).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(pipeLength)) {
esEquipmentInfo.setPipeLength(pipeLength.get(0).getDoubleValue());
}
}
} catch (Exception e) {
// 异常数据跳过
log.error("准备新设备索引数据失败:{}", record, e);
}
return esEquipmentInfo;
}).collect(Collectors.toList());
esEquipmentInfos.remove(null);
esEquipmentInfos = esEquipmentInfos.stream().filter(e -> StringUtils.isNotEmpty(e.getSEQUENCE_NBR())).collect(Collectors.toList());
return esEquipmentInfos;
}
private List<ESEquipmentCategoryDto> getEsEquipmentCategoryDtos(List<String> refreshRecords, Map<String, Map<String, Object>> recordDetailMap) {
List<ESEquipmentCategoryDto> esEquipmentCategoryDtos = refreshRecords.parallelStream().map(record -> {
ESEquipmentCategoryDto esEquipmentInfo = null;
Optional<ESEquipmentCategoryDto> op = equipmentCategory.findById(record);
try {
if (op.isPresent()) { // 存在更新
IdxBizJgSupervisionInfo supervisionInfo = idxBizJgSupervisionInfoMapper.selectOne(new LambdaQueryWrapper<IdxBizJgSupervisionInfo>().eq(IdxBizJgSupervisionInfo::getRecord, record)
.select(IdxBizJgSupervisionInfo::getRecord, IdxBizJgSupervisionInfo::getOrgBranchCode, IdxBizJgSupervisionInfo::getOrgBranchName));
esEquipmentInfo = op.get();
if (supervisionInfo != null && StringUtils.isNotEmpty(supervisionInfo.getOrgBranchCode()) && JsonDiffUtil.isNullOrEmpty(esEquipmentInfo.getOrgBranchCode())) {
esEquipmentInfo.setOrgBranchCode(supervisionInfo.getOrgBranchCode());
esEquipmentInfo.setORG_BRANCH_NAME(supervisionInfo.getOrgBranchName());
}
} else { // 不存在创建
Map<String, Object> detail = recordDetailMap.get(record);
esEquipmentInfo = new ESEquipmentCategoryDto();
StatisticsDataUpdateService.formatUseDate(detail);
BeanUtil.copyProperties(detail, esEquipmentInfo, true);
}
} catch (Exception e) {
log.error("准备老设备索引数据失败:{}", record, e);
}
return esEquipmentInfo;
}).collect(Collectors.toList());
esEquipmentCategoryDtos.remove(null);
return esEquipmentCategoryDtos;
}
@Override
protected Map<String, Object> buildFilter(Map<String, Object> params) {
return params;
}
@Override
protected void beforePatching(String record) {
}
@Override
protected void patchSingleRecord(String record) {
}
@Override
protected void afterPatching(String record) {
}
}
......@@ -5,7 +5,7 @@ import com.yeejoin.amos.boot.biz.common.entity.TzsBaseEntity;
import com.yeejoin.amos.boot.module.common.api.constant.TZSCommonConstant;
import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher.BatchDataPatcher;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.patcher.FullBatchDataPatcher;
import com.yeejoin.amos.boot.module.jg.biz.service.impl.IdxBizJgRegisterInfoServiceImpl;
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgRegisterInfo;
import lombok.extern.slf4j.Slf4j;
......@@ -19,14 +19,14 @@ import java.util.Optional;
*/
@Component
@Slf4j
public class WeatherTankFieldPatcher extends BatchDataPatcher {
public class WeatherTankFieldPatcherFull extends FullBatchDataPatcher {
private final IdxBizJgRegisterInfoServiceImpl registerInfoService;
private final ESEquipmentCategory equipmentCategory;
protected WeatherTankFieldPatcher(ApplicationContext applicationContext, IdxBizJgRegisterInfoServiceImpl registerInfoService, ESEquipmentCategory equipmentCategory) {
protected WeatherTankFieldPatcherFull(ApplicationContext applicationContext, IdxBizJgRegisterInfoServiceImpl registerInfoService, ESEquipmentCategory equipmentCategory) {
super(applicationContext);
this.registerInfoService = registerInfoService;
this.equipmentCategory = equipmentCategory;
......
......@@ -168,11 +168,11 @@ public class ChangeEquipImpactCertListener {
manage.setReceiveCompanyCode(afterValue);
manage.setReceiveOrgName(CommonCustomConverter.CompanyCodeConverter.getNameByCode(afterValue));
break;
case "fillingMedium":
case "chargingMedium":
manage.setFillingMedium(
Arrays.stream(manage.getFillingMedium().split(","))
.map(String::trim)
.map(v -> v.equals(beforeValue) ? afterValue : v)
.map(v -> v.equals(meta.getDisplayOldValue()) ? meta.getDisplayNewValue() : v)
.distinct()
.collect(Collectors.joining(","))
);
......
......@@ -21,6 +21,8 @@ public class FormatService {
private final CbDataDictTypeHandler dataDictTypeHandler;
private final PlatformDictTypeHandler platformDictTypeHandler;
private final Map<String, TypeHandler<String>> handlerCache = new ConcurrentHashMap<>();
@Value("${type-handler.default:defaultTypeHandler}")
......@@ -28,9 +30,15 @@ public class FormatService {
public String format(FieldDisplayDefine displayDefine, String value) {
// 字典优先
if (StringUtils.isNotEmpty(displayDefine.dictCode())) {
// 1.1兼容之前的业务字典,有字典配置时默认是业务字典
if(StringUtils.isNotEmpty(displayDefine.dictCode()) && (displayDefine.dictType().equals(FieldDisplayDefine.DictType.no) || displayDefine.dictType().equals(FieldDisplayDefine.DictType.cb))){
return dataDictTypeHandler.handle(displayDefine.dictCode(), value);
}
// 1.2平台字典
if(StringUtils.isNotEmpty(displayDefine.dictCode()) && displayDefine.dictType().equals(FieldDisplayDefine.DictType.platform)){
return platformDictTypeHandler.handle(displayDefine.dictCode(), value);
}
// 其次是自定义的处理器
try {
TypeHandler<String> handler = handlerCache.computeIfAbsent(displayDefine.typeHandler(),
......
......@@ -28,7 +28,8 @@ public class PlatformDictTypeHandler implements DictTypeHandler {
if (StringUtils.isEmpty(dictCode)){
return null;
}
return cache.computeIfAbsent(dictCode, k -> {
String key = dictType + "_" + dictCode;
return cache.computeIfAbsent(key, k -> {
try {
List<DictionarieValueModel> result = Systemctl.dictionarieClient.dictValues(dictType).getResult();
......
......@@ -88,11 +88,11 @@ public class EquipmentRefreshHandler implements IDataRefreshHandler {
}
private List<ESEquipmentInfo.TechParam> buildTechParamByEquList(String record, String equListCode) {
public List<ESEquipmentInfo.TechParam> buildTechParamByEquList(String record, String equListCode) {
return StringUtils.isNotEmpty(equListCode) ? statisticsDataUpdateService.getTechParams(equListCode, record) : new ArrayList<>();
}
private LocalDate getIssueDate(String useRegistrationCode) {
public LocalDate getIssueDate(String useRegistrationCode) {
if (StringUtils.isEmpty(useRegistrationCode)) {
return null;
}
......
......@@ -27,10 +27,7 @@ import com.yeejoin.amos.boot.module.common.api.dao.EsBaseEnterpriseInfoDao;
import com.yeejoin.amos.boot.module.common.api.dao.EsEquipmentDao;
import com.yeejoin.amos.boot.module.common.api.dao.EsUserInfoDao;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.common.api.entity.ESEquipmentInfo;
import com.yeejoin.amos.boot.module.common.api.entity.EsBaseEnterpriseInfo;
import com.yeejoin.amos.boot.module.common.api.entity.EsUserInfo;
import com.yeejoin.amos.boot.module.common.api.entity.TzsUserPermission;
import com.yeejoin.amos.boot.module.common.api.entity.*;
import com.yeejoin.amos.boot.module.common.api.enums.CylinderTypeEnum;
import com.yeejoin.amos.boot.module.common.biz.refresh.cm.RefreshCmService;
import com.yeejoin.amos.boot.module.common.biz.service.impl.EsSearchServiceImpl;
......@@ -44,8 +41,9 @@ import com.yeejoin.amos.boot.module.jg.api.enums.BusinessTypeEnum;
import com.yeejoin.amos.boot.module.jg.api.enums.PipelineEnum;
import com.yeejoin.amos.boot.module.jg.api.enums.SafetyProblemTypeEnum;
import com.yeejoin.amos.boot.module.jg.api.mapper.*;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.service.FilterableEquipInsert2EsPatcher;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.service.ReceiveOrgFixService;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.service.WeatherTankFieldPatcher;
import com.yeejoin.amos.boot.module.jg.biz.data.fix.service.WeatherTankFieldPatcherFull;
import com.yeejoin.amos.boot.module.jg.biz.event.publisher.EventPublisher;
import com.yeejoin.amos.boot.module.jg.biz.feign.TzsServiceFeignClient;
import com.yeejoin.amos.boot.module.jg.biz.handler.strategy.ProblemHandleStrategy;
......@@ -68,9 +66,12 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
......@@ -150,7 +151,7 @@ public class DataHandlerServiceImpl {
private final JgChangeVehicleRegistrationUnitMapper jgChangeVehicleRegistrationUnitMapper;
private final JgChangeRegistrationTransferMapper jgChangeRegistrationTransferMapper;
private final JgUseRegistrationManageServiceImpl jgUseRegistrationManageServiceImpl;
private final JgChangeRegistrationReformMapper jgChangeRegistrationReformMapper;
private final JgChangeRegistrationReformMapper jgChangeRegistrationReformMapper;
private final JgReformNoticeMapper jgReformNoticeMapper;
......@@ -176,6 +177,8 @@ public class DataHandlerServiceImpl {
private final IdxBizJgRegisterInfoMapper registerInfoMapper;
private final IdxBizJgSupervisionInfoServiceImpl idxBizJgSupervisionInfoServiceImpl;
@Value("${jyjc.open.online: true}")
private Boolean onlineJyjc;
......@@ -187,8 +190,9 @@ public class DataHandlerServiceImpl {
private final RestHighLevelClient restHighLevelClient;
private final WeatherTankFieldPatcher weatherTankFieldPatcher;
private final WeatherTankFieldPatcherFull weatherTankFieldPatcher;
private final FilterableEquipInsert2EsPatcher equipInsert2EsPatcher;
private final EventPublisher eventPublisher;
......@@ -200,6 +204,8 @@ public class DataHandlerServiceImpl {
private final IdxBizJgUseInfoMapper idxBizJgUseInfoMapper;
private final EsBulkService esBulkService;
/**
* 安装告知压力管道历史数据修复-详情中的设备列表修改为汇总表格式
*
......@@ -2341,7 +2347,7 @@ public class DataHandlerServiceImpl {
public Integer initTank2Es() {
return weatherTankFieldPatcher.patchBatchData();
return weatherTankFieldPatcher.patchBatchData(null);
}
@Transactional(rollbackFor = Exception.class)
......@@ -2508,80 +2514,81 @@ public class DataHandlerServiceImpl {
/**
* 压力管道-已完成及作废状态的单据的历史数据管道长度补充
*
* @return 是否成功
*/
public Long pipeLenFix() {
cn.hutool.core.date.StopWatch watch = new cn.hutool.core.date.StopWatch();
// 1.已完成及作废状态的安装告知
List<JgInstallationNotice> notices = installationNoticeService.list(new LambdaQueryWrapper<JgInstallationNotice>()
.and(w->w.eq(JgInstallationNotice::getNoticeStatus, FlowStatusEnum.TO_BE_FINISHED.getCode() + "")
.or().eq(JgInstallationNotice::getNoticeStatus, FlowStatusEnum.TO_BE_DISCARD.getCode() +""))
List<JgInstallationNotice> notices = installationNoticeService.list(new LambdaQueryWrapper<JgInstallationNotice>()
.and(w -> w.eq(JgInstallationNotice::getNoticeStatus, FlowStatusEnum.TO_BE_FINISHED.getCode() + "")
.or().eq(JgInstallationNotice::getNoticeStatus, FlowStatusEnum.TO_BE_DISCARD.getCode() + ""))
.eq(JgInstallationNotice::getEquListCode, EquipmentClassifityEnum.YLGD.getCode())
.select(BaseEntity::getSequenceNbr)
);
watch.start("安装告知" + notices.size());
notices.parallelStream().forEach(n->{
notices.parallelStream().forEach(n -> {
JSONObject jsonObject = commonServiceImpl.queryHistoryData(n.getSequenceNbr());
if(jsonObject != null){
if (jsonObject != null) {
JSONArray jsonArray = jsonObject.getJSONArray("deviceList");
List<JSONObject> pipelines = jsonArray.stream().map(e->{
List<JSONObject> pipelines = jsonArray.stream().map(e -> {
JSONObject pipeline = JSONObject.parseObject(e.toString());
if(!pipeline.containsKey(BizCommonConstant.PIPE_LENGTH)){
if (!pipeline.containsKey(BizCommonConstant.PIPE_LENGTH)) {
pipeline.put(BizCommonConstant.PIPE_LENGTH, pipeline.get("pipeLength"));
}
return pipeline;
}).collect(Collectors.toList());
jsonObject.put("deviceList", pipelines);
commonServiceImpl.saveOrUpdateHistory(null, jsonObject, null, n.getSequenceNbr() + "" );
commonServiceImpl.saveOrUpdateHistory(null, jsonObject, null, n.getSequenceNbr() + "");
}
});
watch.stop();
// 2.已完成及作废状态的使用登记
List<JgUseRegistration> useRegistrations = useRegistrationService.list(new LambdaQueryWrapper<JgUseRegistration>()
.and(w->w.eq(JgUseRegistration::getStatus, FlowStatusEnum.TO_BE_FINISHED.getName()).or()
.and(w -> w.eq(JgUseRegistration::getStatus, FlowStatusEnum.TO_BE_FINISHED.getName()).or()
.eq(JgUseRegistration::getStatus, FlowStatusEnum.TO_BE_DISCARD.getName()))
.ne(JgUseRegistration::getProjectContraptionId, "")
.select(BaseEntity::getSequenceNbr));
watch.start("使用登记" + useRegistrations.size() );
useRegistrations.parallelStream().forEach(u->{
watch.start("使用登记" + useRegistrations.size());
useRegistrations.parallelStream().forEach(u -> {
JSONObject jsonObject = commonServiceImpl.queryHistoryData(u.getSequenceNbr());
if(jsonObject != null){
if (jsonObject != null) {
String pipelistKey;
if(jsonObject.containsKey("equipmentLists")){
if (jsonObject.containsKey("equipmentLists")) {
pipelistKey = "equipmentLists";
} else {
pipelistKey = "pipelineList";
}
JSONArray jsonArray = jsonObject.getJSONArray(pipelistKey);
Optional.ofNullable(jsonArray).ifPresent(d->{
List<JSONObject> pipelines = d.stream().map(e->{
Optional.ofNullable(jsonArray).ifPresent(d -> {
List<JSONObject> pipelines = d.stream().map(e -> {
JSONObject pipeline = JSONObject.parseObject(e.toString());
if(!pipeline.containsKey(BizCommonConstant.PIPE_LENGTH)){
if (!pipeline.containsKey(BizCommonConstant.PIPE_LENGTH)) {
pipeline.put(BizCommonConstant.PIPE_LENGTH, pipeline.get("pipeLength"));
}
return pipeline;
}).collect(Collectors.toList());
jsonObject.put(pipelistKey, pipelines);
commonServiceImpl.saveOrUpdateHistory(null, jsonObject, null, u.getSequenceNbr() + "" );
commonServiceImpl.saveOrUpdateHistory(null, jsonObject, null, u.getSequenceNbr() + "");
});
}
});
watch.stop();
// 3.已完成及作废状态的改造变更登记
List<JgChangeRegistrationReform> changeRegistrationReforms = jgChangeRegistrationReformMapper.selectList(new LambdaQueryWrapper<JgChangeRegistrationReform>()
.and(w->w.eq(JgChangeRegistrationReform::getStatus, FlowStatusEnum.TO_BE_FINISHED.getName()).or()
.and(w -> w.eq(JgChangeRegistrationReform::getStatus, FlowStatusEnum.TO_BE_FINISHED.getName()).or()
.eq(JgChangeRegistrationReform::getStatus, FlowStatusEnum.TO_BE_DISCARD.getName()))
.ne(JgChangeRegistrationReform::getProjectContraptionId, "")
.select(JgChangeRegistrationReform::getApplyNo));
watch.start("改造变更登记" + changeRegistrationReforms.size());
changeRegistrationReforms.parallelStream().forEach(u->{
changeRegistrationReforms.parallelStream().forEach(u -> {
JSONObject jsonObject = commonServiceImpl.queryHistoryData(u.getApplyNo());
if(jsonObject != null){
if (jsonObject != null) {
JSONArray jsonArray = jsonObject.getJSONArray("equipmentLists");
Optional.ofNullable(jsonArray).ifPresent(d->{
List<JSONObject> pipelines = d.stream().map(e->{
Optional.ofNullable(jsonArray).ifPresent(d -> {
List<JSONObject> pipelines = d.stream().map(e -> {
JSONObject pipeline = JSONObject.parseObject(e.toString());
if(!pipeline.containsKey(BizCommonConstant.PIPE_LENGTH)){
if (!pipeline.containsKey(BizCommonConstant.PIPE_LENGTH)) {
pipeline.put(BizCommonConstant.PIPE_LENGTH, pipeline.get("pipeLength"));
}
return pipeline;
......@@ -2594,31 +2601,31 @@ public class DataHandlerServiceImpl {
watch.stop();
// 4.已完成及作废状态的改造告知
List<JgReformNotice> reformNotices = jgReformNoticeMapper.selectList(new LambdaQueryWrapper<JgReformNotice>()
.and(w->w.eq(JgReformNotice::getNoticeStatus, FlowStatusEnum.TO_BE_FINISHED.getCode() + "").or()
.and(w -> w.eq(JgReformNotice::getNoticeStatus, FlowStatusEnum.TO_BE_FINISHED.getCode() + "").or()
.eq(JgReformNotice::getNoticeStatus, FlowStatusEnum.TO_BE_DISCARD.getCode() + ""))
.ne(JgReformNotice::getProjectContraptionId, "")
.select(BaseEntity::getSequenceNbr));
watch.start("改造告知" + reformNotices.size());
reformNotices.parallelStream().forEach(u->{
reformNotices.parallelStream().forEach(u -> {
JSONObject jsonObject = commonServiceImpl.queryHistoryData(u.getSequenceNbr());
if(jsonObject != null){
if (jsonObject != null) {
JSONArray jsonArray = jsonObject.getJSONArray("deviceList");
Optional.ofNullable(jsonArray).ifPresent(d->{
List<JSONObject> pipelines = d.stream().map(e->{
Optional.ofNullable(jsonArray).ifPresent(d -> {
List<JSONObject> pipelines = d.stream().map(e -> {
JSONObject pipeline = JSONObject.parseObject(e.toString());
if(!pipeline.containsKey(BizCommonConstant.PIPE_LENGTH)){
if (!pipeline.containsKey(BizCommonConstant.PIPE_LENGTH)) {
pipeline.put(BizCommonConstant.PIPE_LENGTH, pipeline.get("pipeLength"));
}
return pipeline;
}).collect(Collectors.toList());
jsonObject.put("deviceList", pipelines);
commonServiceImpl.saveOrUpdateHistory(null, jsonObject, null, u.getSequenceNbr() + "" );
commonServiceImpl.saveOrUpdateHistory(null, jsonObject, null, u.getSequenceNbr() + "");
});
}
});
watch.stop();
int num = notices.size() + useRegistrations.size() + reformNotices.size() + changeRegistrationReforms.size();
log.info("压力管道业务单据补充字段pipeLengthText,总处理数量:{}, 耗时信息:{}",num, watch.prettyPrint(TimeUnit.SECONDS));
log.info("压力管道业务单据补充字段pipeLengthText,总处理数量:{}, 耗时信息:{}", num, watch.prettyPrint(TimeUnit.SECONDS));
return (long) (num);
}
......@@ -2667,7 +2674,7 @@ public class DataHandlerServiceImpl {
batchWatch.start("批次" + batchIndex.get());
int currentIndex = batchIndex.getAndIncrement();
log.info("开始处理第 {} 个批次", currentIndex);
List<Map<String,Object>> batchData = idxBizJgUseInfoMapper.queryDetailBatch(batch);
List<Map<String, Object>> batchData = idxBizJgUseInfoMapper.queryDetailBatch(batch);
List<ESEquipmentCategoryDto> esEquipmentInfos = batchData.parallelStream().map(data -> {
ESEquipmentCategoryDto esEquipmentInfo = null;
try {
......@@ -2697,7 +2704,7 @@ public class DataHandlerServiceImpl {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
if (!ValidationUtil.isEmpty(dataSource)) {
boolQuery = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("DATA_SOURCE", dataSource));
.must(QueryBuilders.termQuery("DATA_SOURCE", dataSource));
}
// 线程安全的结果容器
List<String> esIds = Collections.synchronizedList(new ArrayList<>());
......@@ -2729,4 +2736,75 @@ public class DataHandlerServiceImpl {
.filter(element -> !bSet.contains(element))
.collect(Collectors.toList());
}
public Long orgBranchCode2Db(String equListCode, String equCategoryCode, String orgBranchCode, Integer seq) {
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.trackTotalHits(true);
BoolQueryBuilder boolMust = QueryBuilders.boolQuery();
boolMust.must(QueryBuilders.boolQuery().must(QueryBuilders.wildcardQuery("ORG_BRANCH_CODE.keyword", QueryParser.escape(orgBranchCode) + "*")));
boolMust.must(QueryBuilders.boolQuery().must(QueryBuilders.termsQuery("EQU_LIST_CODE", equListCode)));
if (StringUtils.isNotEmpty(equCategoryCode)) {
boolMust.must(QueryBuilders.boolQuery().must(QueryBuilders.termsQuery("EQU_CATEGORY_CODE", equCategoryCode)));
}
// 数据库更新属地的设备总数
long totalUpdate = 0L;
try {
String version = DateUtil.today() + seq;
buildVersionFilter(version, boolMust);
CountRequest countRequest = new CountRequest();
countRequest.indices(IDX_BIZ_VIEW_JG_ALL);
countRequest.query(boolMust);
CountResponse countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
long total = countResponse.getCount();
int pageSize = 1000;
int totalPage = (int) Math.ceil((double) total / pageSize);
builder.size(pageSize);
builder.query(boolMust);
builder.sort("REC_DATE", SortOrder.DESC);
SearchRequest request = new SearchRequest();
request.indices(IDX_BIZ_VIEW_JG_ALL);
for (int i = 1; i <= totalPage; i++) {
request.source(builder);
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
List<ESEquipmentCategoryDto> esEquipmentCategoryDtos = new ArrayList<>();
for (org.elasticsearch.search.SearchHit hit : response.getHits()) {
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(hit);
ESEquipmentCategoryDto equipmentCategoryDto = JSONObject.toJavaObject(jsonObject.getJSONObject("sourceAsMap"), ESEquipmentCategoryDto.class);
Integer recordCount = useInfoService.lambdaQuery().eq(IdxBizJgUseInfo::getRecord, equipmentCategoryDto.getSEQUENCE_NBR()).count();
if (recordCount > 0) {
equipmentCategoryDto.setVersion(version);
} else {
// 数据库不存在的做标记
equipmentCategoryDto.setVersion("-1");
}
esEquipmentCategoryDtos.add(equipmentCategoryDto);
}
if (!esEquipmentCategoryDtos.isEmpty()) {
totalUpdate = esEquipmentCategoryDtos.size() + totalUpdate;
esEquipmentCategory.saveAll(esEquipmentCategoryDtos);
esBulkService.bulkUpsert(IDX_BIZ_VIEW_JG_ALL, esEquipmentCategoryDtos.stream().map(e-> new EsEntity<>(e.getSEQUENCE_NBR(), e)).collect(Collectors.toList()));
idxBizJgSupervisionInfoServiceImpl.getBaseMapper().updateOrgBranchCodeBatch(esEquipmentCategoryDtos);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return totalUpdate;
}
private static void buildVersionFilter(String version, BoolQueryBuilder boolMust) {
BoolQueryBuilder meBuilder = QueryBuilders.boolQuery();
meBuilder.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.termsQuery("version", version, "-1")));
meBuilder.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery("version")));
meBuilder.minimumShouldMatch(1);
boolMust.must(meBuilder);
}
public Integer synEquipFromDb2Es(String equListCode, String equCategoryCode, String orgBranchCode) {
Map<String, Object> p = new HashMap<>();
p.put("equListCode", equListCode);
p.put("equCategoryCode", equCategoryCode);
p.put("orgBranchCode", orgBranchCode);
return equipInsert2EsPatcher.patchBatchData(p);
}
}
package com.yeejoin.amos.boot.module.jg.biz.service.impl;
import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.boot.module.common.api.entity.EsEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
@Component
......@@ -22,15 +21,18 @@ public class EsBulkService {
private final RestHighLevelClient restHighLevelClient;
private final ElasticsearchConverter converter; // 依赖注入
public <T> void bulkUpsert(String index, List<EsEntity<T>> list) {
BulkRequest request = new BulkRequest();
list.forEach(item -> {
Map<String, Object> docData = converter.mapObject(item.getData());
request.add(new UpdateRequest(index, item.getId())
.doc(JSON.toJSONString(item.getData()), XContentType.JSON)
.upsert(JSON.toJSONString(item.getData()), XContentType.JSON));
.doc(docData)
.upsert(docData));
});
try {
// request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
} catch (Exception e) {
log.error("批量写入数据失败:{}", e.getMessage(), e);
......
......@@ -4808,18 +4808,14 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
}
batchInsert(idxBizJgUseInfoMapper, useInfoList, "使用信息");
batchInsert(idxBizJgRegisterInfoMapper, registerInfoList, "注册信息");
List<CompletableFuture<Void>> futures = new ArrayList<>();
futures.add(CompletableFuture.runAsync(() -> batchInsert(idxBizJgSupervisionInfoMapper, supervisionInfoList, "监督信息"), executor));
futures.add(CompletableFuture.runAsync(() -> batchInsert(idxBizJgDesignInfoMapper, designInfoList, "设计信息"), executor));
futures.add(CompletableFuture.runAsync(() -> batchInsert(idxBizJgFactoryInfoMapper, factoryInfoList, "制造信息"), executor));
futures.add(CompletableFuture.runAsync(() -> batchInsert(otherInfoMapper, otherInfoList, "其他信息"), executor));
futures.add(CompletableFuture.runAsync(() -> batchInsert(idxBizJgTechParamsVesselMapper, paramsVesselList, "容器参数信息"), executor));
futures.add(CompletableFuture.runAsync(() -> batchInsert(idxBizJgInspectionDetectionInfoMapper, inspectionDetectionInfoList, "检验检测信息"), executor));
futures.add(CompletableFuture.runAsync(() -> batchInsert(certificateChangeRecordEqMapper, jgCertificateChangeRecordEqList, "登记证关系信息"), executor));
futures.add(CompletableFuture.runAsync(() -> esEquipmentCategory.saveAll(esEquipmentCategoryList), executor));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
batchInsert(idxBizJgSupervisionInfoMapper, supervisionInfoList, "监督信息");
batchInsert(idxBizJgDesignInfoMapper, designInfoList, "设计信息");
batchInsert(idxBizJgFactoryInfoMapper, factoryInfoList, "制造信息");
batchInsert(otherInfoMapper, otherInfoList, "其他信息");
batchInsert(idxBizJgTechParamsVesselMapper, paramsVesselList, "容器参数信息");
batchInsert(idxBizJgInspectionDetectionInfoMapper, inspectionDetectionInfoList, "检验检测信息");
batchInsert(certificateChangeRecordEqMapper, jgCertificateChangeRecordEqList, "登记证关系信息");
esEquipmentCategory.saveAll(esEquipmentCategoryList);
// 使用事务同步回调确保事件在事务提交后发送
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
......@@ -4836,8 +4832,6 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
});
return String.format("导入完成,成功导入: %d 条数据!", useInfoList.size());
}
// 定义线程池,CPU 核数 * 2,避免阻塞主线程
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
/** 通用批量插入方法 */
public <T> void batchInsert(CustomBaseMapper<T> mapper, List<T> list, String name) {
......
......@@ -33,6 +33,7 @@ import com.yeejoin.amos.boot.module.jg.biz.context.FlowingEquipRedisContext;
import com.yeejoin.amos.boot.module.jg.biz.edit.permission.FillingEditPermForCurrentUser;
import com.yeejoin.amos.boot.module.jg.biz.event.publisher.EventPublisher;
import com.yeejoin.amos.boot.module.jg.biz.feign.TzsServiceFeignClient;
import com.yeejoin.amos.boot.module.jg.biz.utils.CodeUtil;
import com.yeejoin.amos.boot.module.ymt.api.entity.*;
import com.yeejoin.amos.boot.module.ymt.api.enums.ApplicationFormTypeEnum;
import com.yeejoin.amos.boot.module.ymt.api.enums.EquimentEnum;
......@@ -135,6 +136,8 @@ public class JgChangeVehicleRegistrationUnitServiceImpl extends BaseService<JgCh
@Autowired
private JgResumeInfoServiceImpl jgResumeInfoService;
@Autowired
private CodeUtil codeUtil;
public void changeData(JgChangeVehicleRegistrationUnit dto, CompanyBo company) {
if (!ObjectUtils.isEmpty(dto.getReceiveCompanyCode())) {
......@@ -589,8 +592,8 @@ public class JgChangeVehicleRegistrationUnitServiceImpl extends BaseService<JgCh
jgCertificateChangeRecord.setRoutePath(taskV2Model.getRoutePath());
if ("0".equals(jgChangeVehicleRegistrationUnit.getChangeType()) && !ObjectUtils.isEmpty(collect)) {
// 区外变更
// 生成使用登记证编号
String receiveCompanyCode = jgChangeVehicleRegistrationUnit.getReceiveCompanyCode();
// 生成使用登记证编号,根据行政审批局单位code获取行政区划code
String receiveCompanyCode = codeUtil.getCityRegionCode(jgChangeVehicleRegistrationUnit.getReceiveCompanyCode());
CompanyModel receiveCompanyResult = Privilege.companyClient.queryByCompanyCode(receiveCompanyCode).getResult();
//查询到局级
// 如果不是局级公司,则查询其上级公司信息
......
......@@ -457,7 +457,11 @@ public class JyjcInspectionResultServiceImpl extends BaseService<JyjcInspectionR
* @return null or 非空的字符串
*/
private String nullIfNullOrEmpty(String str) {
return CompareUtils.isNullOrEmpty(str) ? null : str;
return CompareUtils.isNullOrEmpty(str) ? null : trimNoIllegalChar(str);
}
private String trimNoIllegalChar(String str) {
return "\\".equals(str) || "/".equals(str) ? null : str;
}
private String getTableName(String paramType) {
......
package com.yeejoin.amos.boot.module.ymt.api.mapper;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.common.api.mapper.CustomBaseMapper;
import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgSupervisionInfo;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* 监督管理信息表 Mapper 接口
......@@ -11,4 +15,5 @@ import com.yeejoin.amos.boot.module.ymt.api.entity.IdxBizJgSupervisionInfo;
*/
public interface IdxBizJgSupervisionInfoMapper extends CustomBaseMapper<IdxBizJgSupervisionInfo> {
void updateOrgBranchCodeBatch(@Param("dtos") List<ESEquipmentCategoryDto> esEquipmentCategoryDtos);
}
......@@ -29,6 +29,8 @@ public interface IdxBizJgUseInfoMapper extends CustomBaseMapper<IdxBizJgUseInfo>
Integer selectMaxVersion();
Integer selectMaxVersionWithParams(@Param("params") Map<String, Object> params);
void updateDataQualityScoreBatch(@Param("equips") List<EquipWaitRefreshDataQualityScore> refreshDataQualityScores, @Param("version") int version);
void updateVersionBatch(@Param("records") List<String> records, @Param("version") int version);
......@@ -52,4 +54,6 @@ public interface IdxBizJgUseInfoMapper extends CustomBaseMapper<IdxBizJgUseInfo>
List<IdxBizJgOtherInfo> selectOneMockRecord(@Param("equList") String equList, @Param("equCategory") String equCategory, @Param("equDefine") String equDefine, @Param("useUnitCode") String useUnitCode, @Param("size") Integer size);
List<String> selectEquipsClaimStatus();
List<String> selectUseInfoOfOneVersionWithParams(@Param("version") Integer version,@Param("params") Map<String, Object> params);
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yeejoin.amos.boot.module.ymt.api.mapper.IdxBizJgSupervisionInfoMapper">
<update id="updateOrgBranchCodeBatch">
UPDATE idx_biz_jg_supervision_info SET
"ORG_BRANCH_CODE" = temp_table."ORG_BRANCH_CODE",
"ORG_BRANCH_NAME" = temp_table."ORG_BRANCH_NAME"
FROM (
VALUES
<foreach collection="dtos" separator="," item="dto" index="index">
(#{dto.orgBranchCode}, #{dto.ORG_BRANCH_NAME}, #{dto.SEQUENCE_NBR})
</foreach>
) AS temp_table("ORG_BRANCH_CODE", "ORG_BRANCH_NAME", "RECORD")
WHERE idx_biz_jg_supervision_info."RECORD" = temp_table."RECORD"
</update>
</mapper>
......@@ -59,11 +59,53 @@
ui.VERSION <![CDATA[ <> ]]> #{version} or ui.VERSION is null
limit 10000
</select>
<select id="selectUseInfoOfOneVersionWithParams" resultType="java.lang.String">
SELECT
u.record
from
"idx_biz_jg_use_info" u,
"idx_biz_jg_supervision_info" s,
idx_biz_jg_register_info r
where
(u.VERSION <![CDATA[ <> ]]> #{version} or u.VERSION is null)
and u."RECORD" = s."RECORD"
and s."RECORD" = r."RECORD"
<if test="params.equListCode != null and params.equListCode != ''">
and r."EQU_LIST" = #{params.equListCode}
</if>
<if test="params.equCategoryCode != null and params.equCategoryCode != ''">
and r."EQU_CATEGORY" = #{params.equCategoryCode}
</if>
<if test="params.orgBranchCode != null and params.orgBranchCode != ''">
and s."ORG_BRANCH_CODE" like concat(#{params.orgBranchCode},'%')
</if>
limit 1000
</select>
<select id="selectMaxVersion" resultType="java.lang.Integer">
SELECT
COALESCE(MAX(version),0) as version
FROM "idx_biz_jg_use_info"
</select>
<select id="selectMaxVersionWithParams" resultType="java.lang.Integer">
SELECT
COALESCE(MAX(version),0) as version
FROM
"idx_biz_jg_use_info" u,
"idx_biz_jg_supervision_info" s,
idx_biz_jg_register_info r
where
u."RECORD" = s."RECORD"
and s."RECORD" = r."RECORD"
<if test="params.equListCode != null and params.equListCode != ''">
and r."EQU_LIST" = #{params.equListCode}
</if>
<if test="params.equCategoryCode != null and params.equCategoryCode != ''">
and r."EQU_CATEGORY" = #{params.equCategoryCode}
</if>
<if test="params.orgBranchCode != null and params.orgBranchCode != ''">
and s."ORG_BRANCH_CODE" like concat(#{params.orgBranchCode},'%')
</if>
</select>
<select id="queryDetail" resultType="java.util.Map">
<include refid="equip-detail-es"/>
WHERE
......@@ -78,9 +120,16 @@
WHERE ibjui."RECORD" = #{record}
</select>
<update id="updateVersionBatch">
<foreach collection="records" separator=";" item="record" open="" close="">
UPDATE idx_biz_jg_use_info SET "VERSION"=#{version} WHERE record = #{record}
</foreach>
UPDATE
idx_biz_jg_use_info
SET
"VERSION"=#{version}
WHERE
record = ANY(ARRAY[
<foreach collection="records" separator="," item="record" open="" close="">
#{record}
</foreach>
])
</update>
<sql id="equip-detail-es">
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment