Commit 2208eb06 authored by 刘林's avatar 刘林

fix(jg):修改SupervisionInfo中ORG_BRANCH_CODE=%50*X%的数据

parent c14c28cc
......@@ -291,7 +291,32 @@ public class DataHandlerController extends BaseController {
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(httpMethod = "PUT", value = "删除ES和数据库中状态≠已认领的设备(备份后谨慎操作,不可恢复)", notes = "删除ES和数据库中状态≠已认领的设备(备份后谨慎操作,不可恢复)")
@PutMapping(value = "/equip/deleteEquipWithStatusNotInClaimed")
public ResponseModel<Integer> deleteEquipWithStatusNotInClaimed(@RequestParam(value = "indices",defaultValue = "idx_biz_view_jg_all") String indices) {
return ResponseHelper.buildResponse(dataHandlerService.deleteEquipWithStatusNotInClaimed(indices));
public ResponseModel<Integer> deleteEquipWithStatusNotInClaimed(@RequestParam(value = "indices",defaultValue = "idx_biz_view_jg_all") String indices,
@RequestParam(value = "isDelete",defaultValue = "false") boolean isDelete) throws InterruptedException {
return ResponseHelper.buildResponse(dataHandlerService.deleteEquipWithStatusNotInClaimed(indices, isDelete));
}
/**
* @apiNote 删除数据库中状态≠已认领的设备
*
* @return
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(httpMethod = "PUT", value = "删除数据库中状态≠已认领的设备", notes = "删除数据库中状态≠已认领的设备")
@PutMapping(value = "/equip/deleteEquipExistDB")
public ResponseModel<Integer> deleteEquipExistDB(@RequestParam(value = "isDelete",defaultValue = "false") boolean isDelete) {
return ResponseHelper.buildResponse(dataHandlerService.deleteEquipExistDB(isDelete));
}
/**
* @apiNote 修改SupervisionInfo中ORG_BRANCH_CODE=%50*X%的数据
*
* @return
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(httpMethod = "PUT", value = "修改SupervisionInfo中ORG_BRANCH_CODE=%50*X%的数据", notes = "修改SupervisionInfo中ORG_BRANCH_CODE=%50*X%的数据")
@PutMapping(value = "/equip/deleteEquipExistDB")
public ResponseModel<Integer> deleteEquipExistDB2(@RequestParam(value = "isDelete",defaultValue = "false") boolean isDelete) {
return ResponseHelper.buildResponse(dataHandlerService.deleteEquipExistDB(isDelete));
}
}
\ No newline at end of file
......@@ -69,7 +69,7 @@ import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
......@@ -153,6 +153,8 @@ public class DataHandlerServiceImpl {
private final SuperviseInfoMapper superviseInfoMapper;
private final IdxBizJgOtherInfoMapper otherInfoMapper;
/**
* 安装告知压力管道历史数据修复-详情中的设备列表修改为汇总表格式
......@@ -1835,38 +1837,90 @@ public class DataHandlerServiceImpl {
log.info("增量添加ORG_BRANCH_CODE为50X综合搜索数据-设备信息入库结束,耗时:{}秒", watch.getTotalTimeSeconds());
}
public Integer deleteEquipWithStatusNotInClaimed(String indices) {
public Integer deleteEquipWithStatusNotInClaimed(String indices, boolean isDelete) throws InterruptedException {
if (!Arrays.asList(IDX_BIZ_VIEW_JG_ALL, IDX_BIZ_EQUIPMENT_INFO).contains(indices)) {
throw new IllegalArgumentException("非法索引名,只允许使用 idx_biz_view_jg_all 或 idx_biz_equipment_info");
}
List<String> records = new ArrayList<>();
List<?> esList;
List<String> records;
if (IDX_BIZ_VIEW_JG_ALL.equals(indices)) {
List<ESEquipmentCategoryDto> esList = queryUnclaimedEquipments(indices, ESEquipmentCategoryDto.class);
if (!CollectionUtils.isEmpty(esList)) {
records = extractSequenceNbr(esList);
esEquipmentCategory.deleteAll(esList);
}
esList = queryUnclaimedEquipments(indices, ESEquipmentCategoryDto.class);
} else {
List<ESEquipmentInfo> esList = queryUnclaimedEquipments(indices, ESEquipmentInfo.class);
if (!CollectionUtils.isEmpty(esList)) {
records = extractSequenceNbr(esList);
esEquipmentDao.deleteAll(esList);
}
esList = queryUnclaimedEquipments(indices, ESEquipmentInfo.class);
}
if (CollectionUtils.isEmpty(esList)) {
return 0;
}
records = extractSequenceNbr(esList);
if (records.isEmpty()) {
return 0;
}
List<List<String>> allDataList = Lists.partition(records, BATCH_SIZE);
for (List<String> tempDataList : allDataList) {
if (!CollectionUtils.isEmpty(tempDataList)) {
superviseInfoMapper.deleteDataAll(tempDataList);
if (isDelete) {
List<String> successDeleteList = Collections.synchronizedList(new ArrayList<>());
List<List<String>> allDataList = Lists.partition(records, BATCH_SIZE);
ExecutorService executor = new ThreadPoolExecutor(
10, // 核心线程数
20, // 最大线程数
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 队列最多允许1000个任务
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略(可换成其他)
);
CountDownLatch latch = new CountDownLatch(allDataList.size());
for (List<String> batch : allDataList) {
executor.submit(() -> {
try {
superviseInfoMapper.deleteDataAll(batch);
successDeleteList.addAll(batch);
} catch (Exception e) {
log.error("批次删除失败: {}", batch, e);
} finally {
latch.countDown();
}
});
}
latch.await();
executor.shutdown();
if (!successDeleteList.isEmpty()) {
List<?> successEsList = esList.stream()
.filter(e -> successDeleteList.contains(getSequenceNbr(e)))
.collect(Collectors.toList());
if (IDX_BIZ_VIEW_JG_ALL.equals(indices)) {
esEquipmentCategory.deleteAll((List<ESEquipmentCategoryDto>) successEsList);
} else {
esEquipmentDao.deleteAll((List<ESEquipmentInfo>) successEsList);
}
}
return successDeleteList.size();
}
return records.size();
}
private String getSequenceNbr(Object e) {
try {
if (e instanceof ESEquipmentCategoryDto) {
return ((ESEquipmentCategoryDto) e).getSEQUENCE_NBR();
} else if (e instanceof ESEquipmentInfo) {
return ((ESEquipmentInfo) e).getSEQUENCE_NBR();
}
} catch (Exception ex) {
log.warn("无法获取sequenceNbr: {}", e, ex);
}
return null;
}
private <T> List<T> queryUnclaimedEquipments(String index, Class<T> clazz) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
BoolQueryBuilder statusFilter = QueryBuilders.boolQuery()
......@@ -1907,4 +1961,22 @@ public class DataHandlerServiceImpl {
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
public Integer deleteEquipExistDB(boolean isDelete) {
LambdaQueryWrapper<IdxBizJgOtherInfo> wrapper = new LambdaQueryWrapper<>();
wrapper.select(IdxBizJgOtherInfo::getRecord)
.notIn(IdxBizJgOtherInfo::getClaimStatus, "已认领", "");
List<IdxBizJgOtherInfo> jgOtherInfos = otherInfoMapper.selectList(wrapper);
if (CollectionUtils.isEmpty(jgOtherInfos)) {
return 0;
}
List<String> records = jgOtherInfos.stream()
.map(IdxBizJgOtherInfo::getRecord)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (!records.isEmpty() && isDelete) {
superviseInfoMapper.deleteDataAll(records);
}
return records.size();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment