Commit 32ec6a48 authored by tianbo's avatar tianbo

refactor(tcm): 优化监管机构数据同步和设备信息更新逻辑

- 重构了设备类别服务接口和实现类,增加了按监管机构代码前缀查询设备的功能- 优化了监管机构数据同步时设备信息更新的逻辑,提高了数据更新效率 - 修正了设备类别 DTO 中监管机构代码字段的注解,支持 keyword 类型查询
parent 3d9582f9
......@@ -20,4 +20,14 @@ public class TZSCommonConstant {
public static final String STREET = "STREET";
public static final String PLATFORM_FEIGN_RESULT_KEY_COMPANY = "compnay";
public static final String REGULATOR_UNIT_TREE = "REGULATOR_UNIT_TREE";
/**
* 接收机构redis缓存key前置
*/
public static final String PREFIX_NOTICE_RECEIVE_UNIT_TREE = "NOTICE_RECEIVE_UNIT_TREE";
// 管辖机构redis缓存key
public static final String REGULATOR_UNIT_TREE_ALL = "REGULATOR_UNIT_TREE_ALL";
// 行政审批局redis缓存key
public static final String ADMINISTRATION_UNIT_TREE = "ADMINISTRATION_UNIT_TREE";
}
......@@ -2,6 +2,7 @@ package com.yeejoin.amos.boot.module.common.api.dao;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import org.springframework.data.elasticsearch.annotations.Query;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.stereotype.Repository;
......@@ -10,4 +11,8 @@ import java.util.List;
@Repository
public interface ESEquipmentCategory extends PagingAndSortingRepository<ESEquipmentCategoryDto, String> {
List<ESEquipmentCategoryDto> findAllByProjectContraptionId(String PROJECT_CONTRAPTION_ID);
@Query("{\"prefix\": {\"ORG_BRANCH_CODE.keyword\": \"?0\"}}")
List<ESEquipmentCategoryDto> findByOrgBranchCodeKeywordStartingWith(String orgBranchCodePrefix);
}
......@@ -3,9 +3,7 @@ package com.yeejoin.amos.boot.module.common.api.dto;
import lombok.Data;
import lombok.experimental.Accessors;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.*;
/**
* @Author cpp
......@@ -23,8 +21,13 @@ public class ESEquipmentCategoryDto {
@Field(type = FieldType.Text)
private String ORG_BRANCH_NAME;
@Field(type = FieldType.Text)
private String ORG_BRANCH_CODE;
@MultiField(
mainField = @Field(type = FieldType.Text, name = "ORG_BRANCH_CODE"),
otherFields = {
@InnerField(suffix = "keyword", type = FieldType.Keyword)
}
)
private String orgBranchCode;
@Field(type = FieldType.Text)
private String USE_UNIT_NAME;
......
......@@ -382,7 +382,7 @@ public class EquipChangeDataUpdateServiceImpl {
if (optional.isPresent()) {
ESEquipmentCategoryDto esEquipmentCategoryDto = optional.get();
if(equipSupervisionInfoChangeDataDto.getOrgBranchCode() != null) {
esEquipmentCategoryDto.setORG_BRANCH_CODE(equipSupervisionInfoChangeDataDto.getOrgBranchCode());
esEquipmentCategoryDto.setOrgBranchCode(equipSupervisionInfoChangeDataDto.getOrgBranchCode());
esEquipmentCategoryDto.setORG_BRANCH_NAME(equipSupervisionInfoChangeDataDto.getOrgBranchName());
esEquipmentCategory.save(esEquipmentCategoryDto);
}
......
......@@ -2234,7 +2234,7 @@ public class DataDockServiceImpl {
esEquipmentDto.setUSE_PLACE(paramsDto.getProvinceName() + "/" + paramsDto.getCityName() + "/" + paramsDto.getCountyName() + "/" + paramsDto.getStreetName());
esEquipmentDto.setUSE_PLACE_CODE(paramsDto.getProvince() + "#" + paramsDto.getCity() + "#" + paramsDto.getCounty() + "#" + paramsDto.getStreet());
esEquipmentDto.setUSE_SITE_CODE(paramsDto.getUsePlace());
esEquipmentDto.setORG_BRANCH_CODE(paramsDto.getOrgBranchCode());
esEquipmentDto.setOrgBranchCode(paramsDto.getOrgBranchCode());
esEquipmentDto.setORG_BRANCH_NAME(paramsDto.getOrgBranchName());
esEquipmentDto.setEQU_STATE(1);
esEquipmentDto.setEQU_CODE(paramsDto.getEquCode());
......
......@@ -2036,7 +2036,7 @@ public class DataHandlerServiceImpl {
}
List<Map<String, Object>> updateList = esResults.stream()
.filter(dto -> StringUtils.isNotEmpty(dto.getORG_BRANCH_CODE())
.filter(dto -> StringUtils.isNotEmpty(dto.getOrgBranchCode())
&& StringUtils.isNotEmpty(dto.getSEQUENCE_NBR())
)
.map(this::buildUpdateMap)
......@@ -2061,9 +2061,9 @@ public class DataHandlerServiceImpl {
private Map<String, Object> buildUpdateMap(ESEquipmentCategoryDto dto) {
Map<String, Object> map = new HashMap<>();
map.put("record", dto.getSEQUENCE_NBR());
map.put("orgBranchCode", dto.getORG_BRANCH_CODE());
map.put("orgBranchCode", dto.getOrgBranchCode());
map.put("orgBranchName", dto.getORG_BRANCH_NAME());
log.info("==========>record,{},orgBranchCode,{},orgBranchName,{}", dto.getSEQUENCE_NBR(), dto.getORG_BRANCH_CODE(), dto.getORG_BRANCH_NAME());
log.info("==========>record,{},orgBranchCode,{},orgBranchName,{}", dto.getSEQUENCE_NBR(), dto.getOrgBranchCode(), dto.getORG_BRANCH_NAME());
return map;
}
......
......@@ -4472,7 +4472,7 @@ public class IdxBizJgRegisterInfoServiceImpl extends BaseService<IdxBizJgRegiste
esEquipmentDto.setEQU_LIST(equipInfoDto.getEquList());
esEquipmentDto.setEQU_DEFINE_CODE(equipInfoDto.getEquDefineCode());
esEquipmentDto.setSUPERVISORY_CODE(otherInfo.getSupervisoryCode());
esEquipmentDto.setORG_BRANCH_CODE(orgBranchCode);
esEquipmentDto.setOrgBranchCode(orgBranchCode);
esEquipmentDto.setORG_BRANCH_NAME(orgBranchName);
esEquipmentDto.setEQU_DEFINE(equipInfoDto.getEquDefine());
esEquipmentDto.setINFORMATION_SITUATION(otherInfo.getInformationSituation());
......
......@@ -1969,7 +1969,7 @@ public class JgInstallationNoticeServiceImpl extends BaseService<JgInstallationN
ESEquipmentCategoryDto esEquipmentCategoryDto = optional.get();
esEquipmentCategoryDto.setUSE_UNIT_CREDIT_CODE(null);
esEquipmentCategoryDto.setUSE_UNIT_NAME(null);
esEquipmentCategoryDto.setORG_BRANCH_CODE(null);
esEquipmentCategoryDto.setOrgBranchCode(null);
esEquipmentCategoryDto.setORG_BRANCH_NAME(null);
esEquipmentCategoryDto.setSUPERVISORY_CODE(null);
esEquipmentCategoryDto.setCODE96333(null);
......
......@@ -3826,7 +3826,7 @@ public class JgUseRegistrationServiceImpl extends BaseService<JgUseRegistrationD
esEquipmentCategoryDto.setUSE_ORG_CODE(null);
//esEquipmentCategoryDto.setSTATUS(null);
esEquipmentCategoryDto.setEQU_STATE(null);
esEquipmentCategoryDto.setORG_BRANCH_CODE(null);
esEquipmentCategoryDto.setOrgBranchCode(null);
esEquipmentCategoryDto.setORG_BRANCH_NAME(null);
if (jgConstructionInfo != null) {
esEquipmentCategoryDto.setUSC_UNIT_NAME(jgConstructionInfo.getUscUnitName());
......
......@@ -1608,7 +1608,7 @@ public class JgVehicleInformationServiceImpl extends BaseService<JgVehicleInform
esEquipmentCategoryDto.setUSE_ORG_CODE(null);
esEquipmentCategoryDto.setSTATUS(null);
esEquipmentCategoryDto.setEQU_STATE(null);
esEquipmentCategoryDto.setORG_BRANCH_CODE(null);
esEquipmentCategoryDto.setOrgBranchCode(null);
esEquipmentCategoryDto.setORG_BRANCH_NAME(null);
esEquipmentCategory.save(esEquipmentCategoryDto);
}
......
......@@ -11,7 +11,7 @@ import lombok.Getter;
@AllArgsConstructor
public enum PlatformOpMethodTypeEnum {
CREATE("create", "创建"),
INSERT("insert", "插入"),
UPDATE("update", "更新"),
DELETE("delete", "删除");
......
......@@ -99,4 +99,6 @@ public interface TzBaseEnterpriseInfoMapper extends BaseMapper<TzBaseEnterpriseI
void updateRedundantSupervisionOrgCodeUnit(String newOrgName, String newOrgCode, String oldOrgCode);
void updateRedundantSupervisionOrgCodeStatistics(String newOrgCode, String oldOrgCode);
void updateEquipmentSupervisionOrgCode(String newOrgCode, String newSupervisionName, List<String> equipmentList);
}
......@@ -50,5 +50,5 @@ public interface IEquipmentCategoryService {
*/
List<Map<String, Object>> unitEquipTree();
void deleteRegulatorUnitTree();
void deleteAllRegulatorUnitTree();
}
......@@ -421,4 +421,14 @@
set supervisory_unit_org_code = replace(supervisory_unit_org_code, #{oldOrgCode}, #{newOrgCode})
where supervisory_unit_org_code like concat(#{oldOrgCode}, '%');
</update>
<update id="updateEquipmentSupervisionOrgCode">
update idx_biz_jg_supervision_info
set "ORG_BRANCH_CODE" = #{newOrgCode},
"ORG_BRANCH_NAME" = #{newSupervisionName}
where "RECORD" in
<foreach collection ='equipmentList' item='record' open="(" close= ")" separator=",">
#{record}
</foreach>
</update>
</mapper>
......@@ -79,6 +79,7 @@ public class PlatformUserTopicMessage extends EmqxListener {
}
protected void processOrgCodeUpdateMessage(MqttMessage message) {
log.info("平台推送组织架构层级变动消息开始: " + message);
try {
if (ValidationUtil.isEmpty(message)) {
return;
......@@ -88,9 +89,10 @@ public class PlatformUserTopicMessage extends EmqxListener {
String oldOrgCode = jsonObject.getString("oldOrgCode");
tzBaseEnterpriseInfoService.refreshCompanyOrgCode(jsonObject, newOrgCode, oldOrgCode);
} catch (Exception e) {
log.info("平台同步消息失败:{}", e.getMessage());
log.info("处理平台推送组织架构层级变动消息失败:{}", e.getMessage());
e.printStackTrace();
}
log.info("平台推送组织架构层级变动消息结束");
}
protected void processOpMessage(MqttMessage message) {
......@@ -116,7 +118,7 @@ public class PlatformUserTopicMessage extends EmqxListener {
if (!ValidationUtil.isEmpty(dataResult) && StringUtils.isNotEmpty(path)) {
if (path.contains("company")) {
if ("监管机构".equals(dataResult.get("companyType"))) {
equipmentCategoryService.deleteRegulatorUnitTree();
equipmentCategoryService.deleteAllRegulatorUnitTree();
}
tzBaseEnterpriseInfoService.refreshCompanyInfo(dataResult, method);
} else if (path.contains("agencyuser")) {
......
......@@ -70,6 +70,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static com.alibaba.fastjson.JSON.toJSONString;
import static com.yeejoin.amos.boot.module.common.api.constant.TZSCommonConstant.*;
/**
* 装备分类服务实现类
......@@ -87,8 +88,7 @@ public class EquipmentCategoryServiceImpl extends BaseService<EquipmentCategoryD
final static String NOT_CREATE = "0";
@Autowired
private static final String TABLENAME = "tableName";
//管辖机构redis缓存key
private static final String REGULATOR_UNIT_TREE = "REGULATOR_UNIT_TREE";
//行政区划redis缓存key
private static final String PROVINCE = "PROVINCE";
private static final String CITY = "CITY";
......@@ -310,8 +310,11 @@ public class EquipmentCategoryServiceImpl extends BaseService<EquipmentCategoryD
/**
* 删除缓存中管辖机构树
*/
public void deleteRegulatorUnitTree() {
redisUtils.del(REGULATOR_UNIT_TREE);
public void deleteAllRegulatorUnitTree() {
redisUtils.getAndDeletePatternKeys(REGULATOR_UNIT_TREE + "*");
redisUtils.getAndDeletePatternKeys(PREFIX_NOTICE_RECEIVE_UNIT_TREE + "*");
redisUtils.getAndDeletePatternKeys(REGULATOR_UNIT_TREE_ALL);
redisUtils.getAndDeletePatternKeys(ADMINISTRATION_UNIT_TREE);
}
/**
......
......@@ -1416,6 +1416,7 @@ public class TzBaseEnterpriseInfoServiceImpl
// 如果消息中的单位是企业
if (!companyBo.getCompanyType().equals("监管机构")) {
switch (Objects.requireNonNull(PlatformOpMethodTypeEnum.getEnumByCode(method))) {
case CREATE:
case INSERT:
case DELETE:
// 暂无操作
......@@ -1453,6 +1454,7 @@ public class TzBaseEnterpriseInfoServiceImpl
@Transactional
@Async
public void refreshCompanyOrgCode(JSONObject dataResult, String newOrgCode, String oldOrgCode) {
log.info("开始刷新单位orgCode:{}", dataResult);
try {
if (!ValidationUtil.isEmpty(newOrgCode) && !ValidationUtil.isEmpty(oldOrgCode) && !newOrgCode.equals(oldOrgCode)) {
JSONObject newModel = (JSONObject) dataResult.get("newModel");
......@@ -1465,7 +1467,7 @@ public class TzBaseEnterpriseInfoServiceImpl
// 3.2 更新业务表统计表冗余的监管单位orgCode
tzBaseEnterpriseInfoMapper.updateRedundantSupervisionOrgCodeStatistics(newOrgCode, oldOrgCode);
// 4. 更新idx_biz_view_jg_all es设备信息
updateEquipmentJgAllEs(newOrgCode, oldOrgCode, equipmentRecordList);
updateEquipmentJgAllEs(newOrgCode, oldOrgCode, equipmentRecordList, newModel.getString("companyName"));
// 5. 发送数据刷新事件 - 内存分页处理
// 处理企业数据
publishDataRefreshEvents(companySeqList, DataRefreshEvent.DataType.enterprise, 1000);
......@@ -1476,33 +1478,61 @@ public class TzBaseEnterpriseInfoServiceImpl
willInfo("refreshCompanyOrgCode", dataResult, "orgCodeUpdate", e);
log.error("刷新单位orgCode发生异常", e);
}
log.info("刷新单位orgCode结束");
}
private void updateEquipmentJgAllEs(String newOrgCode, String oldOrgCode, List<String> equipmentRecordList) {
if (ValidationUtil.isEmpty(equipmentRecordList)) {
return;
private void updateEquipmentJgAllEs(String newOrgCode, String oldOrgCode, List<String> equipmentRecordList, String supervisionName) {
log.info("开始更新idx_biz_view_jg_all es设备属地监管部门信息,新orgCode{}, 旧orgCode:{}", newOrgCode, oldOrgCode);
if (!ValidationUtil.isEmpty(equipmentRecordList)) {
int batchSize = 1000;
for (int i = 0; i < equipmentRecordList.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, equipmentRecordList.size());
List<String> subList = equipmentRecordList.subList(i, endIndex);
Iterable<ESEquipmentCategoryDto> equipmentCategoryIterator = esEquipmentCategory.findAllById(subList);
List<ESEquipmentCategoryDto> updatedCategories = new ArrayList<>();
equipmentCategoryIterator.forEach(equipmentCategory -> {
if (ValidationUtil.isEmpty(equipmentCategory.getOrgBranchCode())) {
equipmentCategory.setOrgBranchCode(newOrgCode);// 补偿旧数据es属地监管部门为空,则用数据库中设备属地监管部门更新
} else {
equipmentCategory.setOrgBranchCode(equipmentCategory.getOrgBranchCode().replace(oldOrgCode, newOrgCode));
}
updatedCategories.add(equipmentCategory);
});
if (!updatedCategories.isEmpty()) {
esEquipmentCategory.saveAll(updatedCategories);
}
}
}
int batchSize = 1000;
for (int i = 0; i < equipmentRecordList.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, equipmentRecordList.size());
List<String> subList = equipmentRecordList.subList(i, endIndex);
Iterable<ESEquipmentCategoryDto> equipmentCategoryIterator = esEquipmentCategory.findAllById(subList);
List<ESEquipmentCategoryDto> updatedCategories = new ArrayList<>();
equipmentCategoryIterator.forEach(equipmentCategory -> {
if (ValidationUtil.isEmpty(equipmentCategory.getORG_BRANCH_CODE())) {
equipmentCategory.setORG_BRANCH_CODE(newOrgCode);// 补偿旧数据es属地监管部门为空,则用数据库中设备属地监管部门更新
} else {
equipmentCategory.setORG_BRANCH_CODE(equipmentCategory.getORG_BRANCH_CODE().replace(oldOrgCode, newOrgCode));
// 根据旧属地监管部门orgCode查找es对应设备,更新数据库中设备属地字段
List<ESEquipmentCategoryDto> esEquipmentCategoryList = esEquipmentCategory.findByOrgBranchCodeKeywordStartingWith(oldOrgCode);
if (!ValidationUtil.isEmpty(esEquipmentCategoryList)) {
List<String> esRecordList = Lists.newArrayList();
List<ESEquipmentCategoryDto> updatedEsDtoList = ValidationUtil.isEmpty(equipmentRecordList) ? esEquipmentCategoryList : esEquipmentCategoryList.stream().filter(esDto -> !equipmentRecordList.contains(esDto.getSEQUENCE_NBR())).collect(Collectors.toList());
log.info("开始更新es设备有属地监管部门,数据库无属地监管部门的设备信息。需更新数据{}条", updatedEsDtoList.size());
if (!ValidationUtil.isEmpty(updatedEsDtoList)) {
updatedEsDtoList.forEach(esDto -> {
esDto.setOrgBranchCode(esDto.getOrgBranchCode().replace(oldOrgCode, newOrgCode));
esDto.setORG_BRANCH_NAME(supervisionName);
esRecordList.add(esDto.getSEQUENCE_NBR());
});
int batchSize = 1000;
for (int i = 0; i < updatedEsDtoList.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, updatedEsDtoList.size());
List<ESEquipmentCategoryDto> subList = updatedEsDtoList.subList(i, endIndex);
esEquipmentCategory.saveAll(subList);
}
for (int i = 0; i < esRecordList.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, esRecordList.size());
List<String> subList = esRecordList.subList(i, endIndex);
tzBaseEnterpriseInfoMapper.updateEquipmentSupervisionOrgCode(newOrgCode, supervisionName, subList);
}
updatedCategories.add(equipmentCategory);
});
if (!updatedCategories.isEmpty()) {
esEquipmentCategory.saveAll(updatedCategories);
}
log.info("更新数据库设备有属地监管部门,数据库无属地监管部门的设备结束");
}
log.info("更新idx_biz_view_jg_all es设备属地监管部门信息结束");
}
/**
......@@ -1533,12 +1563,12 @@ public class TzBaseEnterpriseInfoServiceImpl
throw new RuntimeException(e);
}
}
}
// 休眠50毫秒再发送
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
// 休眠50毫秒再发送
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
});
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment