Commit cccc4d3d authored by tianbo's avatar tianbo

Merge remote-tracking branch 'origin/develop_tzs_bugfix' into develop_tzs_bugfix

parents eb73242b a1c10465
...@@ -283,4 +283,40 @@ public class DataHandlerController extends BaseController { ...@@ -283,4 +283,40 @@ public class DataHandlerController extends BaseController {
return ResponseHelper.buildResponse(dataHandlerService.initFactoryCarNumber2Es()); return ResponseHelper.buildResponse(dataHandlerService.initFactoryCarNumber2Es());
} }
/**
* @apiNote 删除ES和数据库中状态≠已认领的设备
*
* @return
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(httpMethod = "PUT", value = "删除ES和数据库中状态≠已认领的设备(备份后谨慎操作,不可恢复)", notes = "删除ES和数据库中状态≠已认领的设备(备份后谨慎操作,不可恢复)")
@PutMapping(value = "/equip/deleteEquipWithStatusNotInClaimed")
public ResponseModel<Integer> deleteEquipWithStatusNotInClaimed(@RequestParam(value = "indices",defaultValue = "idx_biz_view_jg_all") String indices,
@RequestParam(value = "isDelete",defaultValue = "false") boolean isDelete) throws InterruptedException {
return ResponseHelper.buildResponse(dataHandlerService.deleteEquipWithStatusNotInClaimed(indices, isDelete));
}
/**
* @apiNote 删除数据库中状态≠已认领的设备
*
* @return
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(httpMethod = "PUT", value = "删除数据库中状态≠已认领的设备", notes = "删除数据库中状态≠已认领的设备")
@PutMapping(value = "/equip/deleteEquipExistDB")
public ResponseModel<Integer> deleteEquipExistDB(@RequestParam(value = "isDelete",defaultValue = "false") boolean isDelete) {
return ResponseHelper.buildResponse(dataHandlerService.deleteEquipExistDB(isDelete));
}
/**
* @apiNote 修改SupervisionInfo中ORG_BRANCH_CODE=%50*X%的数据
*
* @return
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(httpMethod = "PUT", value = "修改SupervisionInfo中ORG_BRANCH_CODE=%50*X%的数据", notes = "修改SupervisionInfo中ORG_BRANCH_CODE=%50*X%的数据")
@PutMapping(value = "/equip/deleteEquipExistDB")
public ResponseModel<Integer> deleteEquipExistDB2(@RequestParam(value = "isDelete",defaultValue = "false") boolean isDelete) {
return ResponseHelper.buildResponse(dataHandlerService.deleteEquipExistDB(isDelete));
}
} }
\ No newline at end of file
...@@ -53,6 +53,8 @@ import com.yeejoin.amos.boot.module.ymt.api.mapper.*; ...@@ -53,6 +53,8 @@ import com.yeejoin.amos.boot.module.ymt.api.mapper.*;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
...@@ -60,19 +62,18 @@ import org.springframework.util.ObjectUtils; ...@@ -60,19 +62,18 @@ import org.springframework.util.ObjectUtils;
import org.springframework.util.StopWatch; import org.springframework.util.StopWatch;
import org.typroject.tyboot.core.foundation.context.RequestContext; import org.typroject.tyboot.core.foundation.context.RequestContext;
import org.typroject.tyboot.core.foundation.utils.ValidationUtil; import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import java.lang.reflect.Method;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.alibaba.fastjson.JSON.toJSONString; import static com.alibaba.fastjson.JSON.toJSONString;
import static com.yeejoin.amos.boot.module.jg.biz.service.impl.JgInstallationNoticeServiceImpl.CONSTRUCTION_TYPE; import static com.yeejoin.amos.boot.module.jg.biz.service.impl.JgInstallationNoticeServiceImpl.CONSTRUCTION_TYPE;
import static com.yeejoin.amos.boot.module.jg.biz.service.impl.JgInstallationNoticeServiceImpl.CONSTRUCTION_TYPE_NAME; import static com.yeejoin.amos.boot.module.jg.biz.service.impl.JgInstallationNoticeServiceImpl.CONSTRUCTION_TYPE_NAME;
...@@ -84,7 +85,12 @@ import static com.yeejoin.amos.boot.module.jg.biz.service.impl.JgInstallationNot ...@@ -84,7 +85,12 @@ import static com.yeejoin.amos.boot.module.jg.biz.service.impl.JgInstallationNot
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
public class DataHandlerServiceImpl { public class DataHandlerServiceImpl {
public static final String IDX_BIZ_VIEW_JG_ALL = "idx_biz_view_jg_all";
public static final String IDX_BIZ_EQUIPMENT_INFO = "idx_biz_equipment_info";
public static final String STATUS = "STATUS";
public static final String record = "record";
public static final String SEQUENCE_NBR = "SEQUENCE_NBR";
private static final int BATCH_SIZE = 1000;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final JgRegistrationHistoryServiceImpl registrationHistoryService; private final JgRegistrationHistoryServiceImpl registrationHistoryService;
private final JgInstallationNoticeServiceImpl installationNoticeService; private final JgInstallationNoticeServiceImpl installationNoticeService;
...@@ -143,6 +149,12 @@ public class DataHandlerServiceImpl { ...@@ -143,6 +149,12 @@ public class DataHandlerServiceImpl {
private final IdxBizJgRegisterInfoMapper registerInfoMapper; private final IdxBizJgRegisterInfoMapper registerInfoMapper;
private final JgUseRegistrationManageServiceImpl registrationManageService;
private final SuperviseInfoMapper superviseInfoMapper;
private final IdxBizJgOtherInfoMapper otherInfoMapper;
/** /**
* 安装告知压力管道历史数据修复-详情中的设备列表修改为汇总表格式 * 安装告知压力管道历史数据修复-详情中的设备列表修改为汇总表格式
...@@ -1824,4 +1836,147 @@ public class DataHandlerServiceImpl { ...@@ -1824,4 +1836,147 @@ public class DataHandlerServiceImpl {
watch.stop(); watch.stop();
log.info("增量添加ORG_BRANCH_CODE为50X综合搜索数据-设备信息入库结束,耗时:{}秒", watch.getTotalTimeSeconds()); log.info("增量添加ORG_BRANCH_CODE为50X综合搜索数据-设备信息入库结束,耗时:{}秒", watch.getTotalTimeSeconds());
} }
public Integer deleteEquipWithStatusNotInClaimed(String indices, boolean isDelete) throws InterruptedException {
if (!Arrays.asList(IDX_BIZ_VIEW_JG_ALL, IDX_BIZ_EQUIPMENT_INFO).contains(indices)) {
throw new IllegalArgumentException("非法索引名,只允许使用 idx_biz_view_jg_all 或 idx_biz_equipment_info");
}
List<?> esList;
List<String> records;
if (IDX_BIZ_VIEW_JG_ALL.equals(indices)) {
esList = queryUnclaimedEquipments(indices, ESEquipmentCategoryDto.class);
} else {
esList = queryUnclaimedEquipments(indices, ESEquipmentInfo.class);
}
if (CollectionUtils.isEmpty(esList)) {
return 0;
}
records = extractSequenceNbr(esList);
if (records.isEmpty()) {
return 0;
}
if (isDelete) {
List<String> successDeleteList = Collections.synchronizedList(new ArrayList<>());
List<List<String>> allDataList = Lists.partition(records, BATCH_SIZE);
ExecutorService executor = new ThreadPoolExecutor(
10, // 核心线程数
20, // 最大线程数
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 队列最多允许1000个任务
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略(可换成其他)
);
CountDownLatch latch = new CountDownLatch(allDataList.size());
for (List<String> batch : allDataList) {
executor.submit(() -> {
try {
superviseInfoMapper.deleteDataAll(batch);
successDeleteList.addAll(batch);
} catch (Exception e) {
log.error("批次删除失败: {}", batch, e);
} finally {
latch.countDown();
}
});
}
latch.await();
executor.shutdown();
if (!successDeleteList.isEmpty()) {
List<?> successEsList = esList.stream()
.filter(e -> successDeleteList.contains(getSequenceNbr(e)))
.collect(Collectors.toList());
if (IDX_BIZ_VIEW_JG_ALL.equals(indices)) {
esEquipmentCategory.deleteAll((List<ESEquipmentCategoryDto>) successEsList);
} else {
esEquipmentDao.deleteAll((List<ESEquipmentInfo>) successEsList);
}
}
return successDeleteList.size();
}
return records.size();
}
private String getSequenceNbr(Object e) {
try {
if (e instanceof ESEquipmentCategoryDto) {
return ((ESEquipmentCategoryDto) e).getSEQUENCE_NBR();
} else if (e instanceof ESEquipmentInfo) {
return ((ESEquipmentInfo) e).getSEQUENCE_NBR();
}
} catch (Exception ex) {
log.warn("无法获取sequenceNbr: {}", e, ex);
}
return null;
}
private <T> List<T> queryUnclaimedEquipments(String index, Class<T> clazz) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
BoolQueryBuilder statusFilter = QueryBuilders.boolQuery()
.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(STATUS, "已认领")))
.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery(STATUS)))
.should(QueryBuilders.termQuery(STATUS, ""))
.minimumShouldMatch(1);
boolQuery.must(statusFilter);
try {
return registrationManageService.searchResponse(
index,
boolQuery,
hit -> {
JSONObject json = JSONObject.parseObject(hit.getSourceAsString(), JSONObject.class);
json.put(SEQUENCE_NBR, Objects.toString(json.get(SEQUENCE_NBR), ""));
return json.toJavaObject(clazz);
}
);
} catch (Exception ex) {
log.error("查询未认领设备异常:", ex);
return Collections.emptyList();
}
}
private <T> List<String> extractSequenceNbr(List<T> list) {
return list.stream()
.map(item -> {
try {
Method method = item.getClass().getMethod("getSEQUENCE_NBR");
Object value = method.invoke(item);
return Objects.toString(value, null);
} catch (Exception e) {
log.warn("反射获取 SEQUENCE_NBR 失败", e);
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
public Integer deleteEquipExistDB(boolean isDelete) {
LambdaQueryWrapper<IdxBizJgOtherInfo> wrapper = new LambdaQueryWrapper<>();
wrapper.select(IdxBizJgOtherInfo::getRecord)
.notIn(IdxBizJgOtherInfo::getClaimStatus, "已认领", "");
List<IdxBizJgOtherInfo> jgOtherInfos = otherInfoMapper.selectList(wrapper);
if (CollectionUtils.isEmpty(jgOtherInfos)) {
return 0;
}
List<String> records = jgOtherInfos.stream()
.map(IdxBizJgOtherInfo::getRecord)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (!records.isEmpty() && isDelete) {
superviseInfoMapper.deleteDataAll(records);
}
return records.size();
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment