Commit 4b7e4c6a authored by tianbo's avatar tianbo

feat(es): 添加ES删除操作的回滚功能支持

- 在ESEquipmentCategory接口中新增deleteAllWithFallback方法实现批量删除并缓存回滚数据 - 扩展EsRollbackContext枚举类型增加DELETE类型支持删除操作的回滚数据管理 - 实现删除数据的去重过滤机制避免重复缓存相同SEQUENCE_NBR的数据 - 在EsRollbackAspect切面中添加对删除操作回滚的支持分批处理删除数据回滚 - 优化EsRollbackContext的清理逻辑确保删除类型的上下文数据也被清除
parent e1727802
...@@ -59,6 +59,7 @@ public class EsRollbackAspect { ...@@ -59,6 +59,7 @@ public class EsRollbackAspect {
// 从上下文获取需要回滚的数据 // 从上下文获取需要回滚的数据
List<ESEquipmentCategoryDto> allInsertEsData = EsRollbackContext.getAllInsertEsFromMap(); List<ESEquipmentCategoryDto> allInsertEsData = EsRollbackContext.getAllInsertEsFromMap();
List<ESEquipmentCategoryDto> allUpdateEsData = EsRollbackContext.getAllUpdateEsFromMap(); List<ESEquipmentCategoryDto> allUpdateEsData = EsRollbackContext.getAllUpdateEsFromMap();
List<ESEquipmentCategoryDto> allDeleteEsData = EsRollbackContext.getAllDeleteEsFromMap();
// 防御性检查 // 防御性检查
if (ValidationUtil.isEmpty(allInsertEsData) && ValidationUtil.isEmpty(allUpdateEsData)) { if (ValidationUtil.isEmpty(allInsertEsData) && ValidationUtil.isEmpty(allUpdateEsData)) {
...@@ -72,6 +73,9 @@ public class EsRollbackAspect { ...@@ -72,6 +73,9 @@ public class EsRollbackAspect {
// 分批处理更新数据(回滚为保存) // 分批处理更新数据(回滚为保存)
processBatchSave(allUpdateEsData, "更新数据回滚"); processBatchSave(allUpdateEsData, "更新数据回滚");
// 分批处理删除数据(回滚为保存)
processBatchSave(allDeleteEsData, "删除数据回滚");
log.info("ES数据回滚完成,插入数据条数: {}, 更新数据条数: {}", log.info("ES数据回滚完成,插入数据条数: {}, 更新数据条数: {}",
allInsertEsData.size(), allUpdateEsData.size()); allInsertEsData.size(), allUpdateEsData.size());
......
...@@ -4,10 +4,8 @@ package com.yeejoin.amos.boot.module.common.api.context; ...@@ -4,10 +4,8 @@ package com.yeejoin.amos.boot.module.common.api.context;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto; import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap; import java.util.stream.Collectors;
import java.util.List;
import java.util.Map;
/** /**
* ES回滚上下文管理器 * ES回滚上下文管理器
...@@ -20,7 +18,8 @@ public class EsRollbackContext { ...@@ -20,7 +18,8 @@ public class EsRollbackContext {
public enum EsRollbackType { public enum EsRollbackType {
INSERT, INSERT,
UPDATE UPDATE,
DELETE
} }
private static final ThreadLocal<Map<EsRollbackType, List<ESEquipmentCategoryDto>>> threadLocal = new ThreadLocal<>(); private static final ThreadLocal<Map<EsRollbackType, List<ESEquipmentCategoryDto>>> threadLocal = new ThreadLocal<>();
...@@ -37,6 +36,7 @@ public class EsRollbackContext { ...@@ -37,6 +36,7 @@ public class EsRollbackContext {
context = new HashMap<>(); context = new HashMap<>();
context.put(EsRollbackType.INSERT, new ArrayList<>()); context.put(EsRollbackType.INSERT, new ArrayList<>());
context.put(EsRollbackType.UPDATE, new ArrayList<>()); context.put(EsRollbackType.UPDATE, new ArrayList<>());
context.put(EsRollbackType.DELETE, new ArrayList<>());
threadLocal.set(context); threadLocal.set(context);
} }
return context; return context;
...@@ -50,7 +50,18 @@ public class EsRollbackContext { ...@@ -50,7 +50,18 @@ public class EsRollbackContext {
public static void putAllInsertEsToMap(List<ESEquipmentCategoryDto> esDtos) { public static void putAllInsertEsToMap(List<ESEquipmentCategoryDto> esDtos) {
if (esDtos != null && !esDtos.isEmpty()) { if (esDtos != null && !esDtos.isEmpty()) {
List<ESEquipmentCategoryDto> insertList = getContext().get(EsRollbackType.INSERT); List<ESEquipmentCategoryDto> insertList = getContext().get(EsRollbackType.INSERT);
insertList.addAll(esDtos); // 过滤掉已存在的数据(通过 SEQUENCE_NBR 判断唯一性)
Set<String> existingIds = insertList.stream()
.map(ESEquipmentCategoryDto::getSEQUENCE_NBR)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
List<ESEquipmentCategoryDto> uniqueDtos = esDtos.stream()
.filter(dto -> dto.getSEQUENCE_NBR() != null && !existingIds.contains(dto.getSEQUENCE_NBR()))
.collect(Collectors.toList());
// 只添加不重复的数据
insertList.addAll(uniqueDtos);
} }
} }
...@@ -62,7 +73,41 @@ public class EsRollbackContext { ...@@ -62,7 +73,41 @@ public class EsRollbackContext {
public static void putAllUpdateEsToMap(List<ESEquipmentCategoryDto> esDtos) { public static void putAllUpdateEsToMap(List<ESEquipmentCategoryDto> esDtos) {
if (esDtos != null && !esDtos.isEmpty()) { if (esDtos != null && !esDtos.isEmpty()) {
List<ESEquipmentCategoryDto> updateList = getContext().get(EsRollbackType.UPDATE); List<ESEquipmentCategoryDto> updateList = getContext().get(EsRollbackType.UPDATE);
updateList.addAll(esDtos); // 过滤掉已存在的数据(通过 SEQUENCE_NBR 判断唯一性)
Set<String> existingIds = updateList.stream()
.map(ESEquipmentCategoryDto::getSEQUENCE_NBR)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
List<ESEquipmentCategoryDto> uniqueDtos = esDtos.stream()
.filter(dto -> dto.getSEQUENCE_NBR() != null && !existingIds.contains(dto.getSEQUENCE_NBR()))
.collect(Collectors.toList());
// 只添加不重复的数据
updateList.addAll(uniqueDtos);
}
}
/**
* 添加插入操作的ES数据到上下文
*
* @param esDtos 需要回滚的ES数据列表
*/
public static void putAllDeleteEsToMap(List<ESEquipmentCategoryDto> esDtos) {
if (esDtos != null && !esDtos.isEmpty()) {
List<ESEquipmentCategoryDto> deleteList = getContext().get(EsRollbackType.DELETE);
// 过滤掉已存在的数据(通过 SEQUENCE_NBR 判断唯一性)
Set<String> existingIds = deleteList.stream()
.map(ESEquipmentCategoryDto::getSEQUENCE_NBR)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
List<ESEquipmentCategoryDto> uniqueDtos = esDtos.stream()
.filter(dto -> dto.getSEQUENCE_NBR() != null && !existingIds.contains(dto.getSEQUENCE_NBR()))
.collect(Collectors.toList());
// 只添加不重复的数据
deleteList.addAll(uniqueDtos);
} }
} }
...@@ -74,7 +119,9 @@ public class EsRollbackContext { ...@@ -74,7 +119,9 @@ public class EsRollbackContext {
public static void putInsertEsToMap(ESEquipmentCategoryDto esDto) { public static void putInsertEsToMap(ESEquipmentCategoryDto esDto) {
if (esDto != null) { if (esDto != null) {
List<ESEquipmentCategoryDto> insertList = getContext().get(EsRollbackType.INSERT); List<ESEquipmentCategoryDto> insertList = getContext().get(EsRollbackType.INSERT);
insertList.add(esDto); if (insertList.stream().noneMatch(dto -> dto.getSEQUENCE_NBR().equals(esDto.getSEQUENCE_NBR()))) {
insertList.add(esDto);
}
} }
} }
...@@ -86,7 +133,23 @@ public class EsRollbackContext { ...@@ -86,7 +133,23 @@ public class EsRollbackContext {
public static void putUpdateEsToMap(ESEquipmentCategoryDto esDto) { public static void putUpdateEsToMap(ESEquipmentCategoryDto esDto) {
if (esDto != null) { if (esDto != null) {
List<ESEquipmentCategoryDto> updateList = getContext().get(EsRollbackType.UPDATE); List<ESEquipmentCategoryDto> updateList = getContext().get(EsRollbackType.UPDATE);
updateList.add(esDto); if (updateList.stream().noneMatch(dto -> dto.getSEQUENCE_NBR().equals(esDto.getSEQUENCE_NBR()))) {
updateList.add(esDto);
}
}
}
/**
* 添加单个删除操作的ES数据到上下文
*
* @param esDto 需要回滚的ES数据
*/
public static void putDeleteEsToMap(ESEquipmentCategoryDto esDto) {
if (esDto != null) {
List<ESEquipmentCategoryDto> deleteList = getContext().get(EsRollbackType.DELETE);
if (deleteList.stream().noneMatch(dto -> dto.getSEQUENCE_NBR().equals(esDto.getSEQUENCE_NBR()))) {
deleteList.add(esDto);
}
} }
} }
...@@ -109,6 +172,15 @@ public class EsRollbackContext { ...@@ -109,6 +172,15 @@ public class EsRollbackContext {
} }
/** /**
* 获取所有删除操作的ES数据
*
* @return 删除操作的ES数据列表
*/
public static List<ESEquipmentCategoryDto> getAllDeleteEsFromMap() {
return new ArrayList<>(getContext().get(EsRollbackType.DELETE));
}
/**
* 检查上下文中是否存在需要回滚的数据 * 检查上下文中是否存在需要回滚的数据
* *
* @return 如果存在需要回滚的数据返回true,否则返回false * @return 如果存在需要回滚的数据返回true,否则返回false
...@@ -116,7 +188,8 @@ public class EsRollbackContext { ...@@ -116,7 +188,8 @@ public class EsRollbackContext {
public static boolean hasRollbackData() { public static boolean hasRollbackData() {
Map<EsRollbackType, List<ESEquipmentCategoryDto>> context = getContext(); Map<EsRollbackType, List<ESEquipmentCategoryDto>> context = getContext();
return !context.get(EsRollbackType.INSERT).isEmpty() || return !context.get(EsRollbackType.INSERT).isEmpty() ||
!context.get(EsRollbackType.UPDATE).isEmpty(); !context.get(EsRollbackType.UPDATE).isEmpty() ||
!context.get(EsRollbackType.DELETE).isEmpty();
} }
/** /**
...@@ -129,6 +202,7 @@ public class EsRollbackContext { ...@@ -129,6 +202,7 @@ public class EsRollbackContext {
if (context != null) { if (context != null) {
context.get(EsRollbackType.INSERT).clear(); context.get(EsRollbackType.INSERT).clear();
context.get(EsRollbackType.UPDATE).clear(); context.get(EsRollbackType.UPDATE).clear();
context.get(EsRollbackType.DELETE).clear();
threadLocal.remove(); threadLocal.remove();
} }
log.debug("EsRollbackContext clean end"); log.debug("EsRollbackContext clean end");
......
...@@ -112,4 +112,54 @@ public interface ESEquipmentCategory extends PagingAndSortingRepository<ESEquipm ...@@ -112,4 +112,54 @@ public interface ESEquipmentCategory extends PagingAndSortingRepository<ESEquipm
// 返回处理后的单个对象 // 返回处理后的单个对象
return result.iterator().next(); return result.iterator().next();
} }
default Iterable<ESEquipmentCategoryDto> deleteAllWithFallback(Iterable<ESEquipmentCategoryDto> dtos) {
int batchSize = 2000;
List<ESEquipmentCategoryDto> dtoList = StreamSupport.stream(dtos.spliterator(), false)
.collect(Collectors.toList());
if (dtoList.isEmpty()) {
return Collections.emptyList();
}
log.info("deleteAllWithCache开始处理ES数据,总数量: {}, 批次大小: {}", dtoList.size(), batchSize);
List<ESEquipmentCategoryDto> allDeleteData = new ArrayList<>();
// 分批处理
for (int i = 0; i < dtoList.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, dtoList.size());
List<ESEquipmentCategoryDto> batch = dtoList.subList(i, endIndex);
long startTime = System.currentTimeMillis();
List<String> batchIds = batch.stream()
.map(ESEquipmentCategoryDto::getSEQUENCE_NBR)
.collect(Collectors.toList());
Iterable<ESEquipmentCategoryDto> batchBeforeData = this.findAllById(batchIds);
allDeleteData.addAll(StreamSupport.stream(batchBeforeData.spliterator(), false).collect(Collectors.toList()));
// 批量保存当前批次
this.deleteAll(batch);
long endTime = System.currentTimeMillis();
log.debug("处理批次 {}/{} 完成,耗时: {}ms, 删除: {}",
(i/batchSize + 1),
(int)Math.ceil((double)dtoList.size()/batchSize),
(endTime - startTime),
allDeleteData.size());
}
// 一次性缓存所有数据(避免频繁的上下文操作)
if (!allDeleteData.isEmpty()) {
EsRollbackContext.putAllDeleteEsToMap(allDeleteData);
log.info("es缓存删除数据: {} 条", allDeleteData.size());
}
log.info("deleteAllWithCache数据处理完成,总计: 删除{}条", allDeleteData.size());
return dtoList;
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment