Commit 7d34e32a authored by tianbo's avatar tianbo

feat(common): 添加ES回滚功能支持

- 在ESEquipmentCategory接口中新增saveAllWithCache和saveWithCache方法实现批量缓存操作 - 新增EnableEsRollback注解用于标识需要ES回滚的方法 - 实现EsRollbackAspect切面处理ES数据回滚逻辑 - 创建EsRollbackContext管理ES回滚上下文数据 - 在JgInstallationNoticeServiceImpl中集成ES回滚机制
parent 630deb8d
package com.yeejoin.amos.boot.biz.common.annotation;
import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface EnableEsRollback {
/**
* 是否开启es回滚
*/
boolean enabled() default true;
}
package com.yeejoin.amos.boot.module.common.api.aop;
import com.yeejoin.amos.boot.biz.common.annotation.EnableEsRollback;
import com.yeejoin.amos.boot.module.common.api.context.EsRollbackContext;
import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import java.util.List;
@Aspect
@Component
@Order(value = 0)
@Slf4j
public class EsRollbackAspect {
private final ESEquipmentCategory esEquipmentCategoryDao;
public EsRollbackAspect(ESEquipmentCategory esEquipmentCategoryDao) {
this.esEquipmentCategoryDao = esEquipmentCategoryDao;
}
@Pointcut(value = "@annotation(com.yeejoin.amos.boot.biz.common.annotation.EnableEsRollback)")
public void esRollbackPointcut() {}
@Around("esRollbackPointcut() && @annotation(enableEsRollback)")
public Object handleEsRollback(ProceedingJoinPoint joinPoint, EnableEsRollback enableEsRollback) throws Throwable {
try {
return joinPoint.proceed();
} catch (Exception e) {
// 开启es回滚时需要处理异常
if (enableEsRollback.enabled()) {
rollbackEsData();
}
throw e;
} finally {
// 清理上下文数据
EsRollbackContext.clean();
}
}
/**
* 回滚ES数据
*/
private void rollbackEsData() {
if (!EsRollbackContext.hasRollbackData()) {
log.info("无需要回滚的ES数据");
return;
}
try {
// 从上下文获取需要回滚的数据
List<ESEquipmentCategoryDto> allInsertEsData = EsRollbackContext.getAllInsertEsFromMap();
List<ESEquipmentCategoryDto> allUpdateEsData = EsRollbackContext.getAllUpdateEsFromMap();
// 防御性检查
if (ValidationUtil.isEmpty(allInsertEsData) && ValidationUtil.isEmpty(allUpdateEsData)) {
log.warn("回滚数据为空,跳过回滚操作");
return;
}
// 分批处理插入数据(回滚为删除)
processBatchDelete(allInsertEsData, "插入数据回滚");
// 分批处理更新数据(回滚为保存)
processBatchSave(allUpdateEsData, "更新数据回滚");
log.info("ES数据回滚完成,插入数据条数: {}, 更新数据条数: {}",
allInsertEsData.size(), allUpdateEsData.size());
} catch (Exception e) {
log.error("ES数据回滚失败", e);
throw new RuntimeException("ES回滚操作异常", e);
} finally {
// 清理上下文数据,避免残留
EsRollbackContext.clean();
}
}
/**
* 分批删除ES数据
*/
private void processBatchDelete(List<ESEquipmentCategoryDto> dataList, String operationDesc) {
if (ValidationUtil.isEmpty(dataList)) {
return;
}
int batchSize = 1000; // 批次大小可根据实际情况调整
for (int i = 0; i < dataList.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, dataList.size());
List<ESEquipmentCategoryDto> batch = dataList.subList(i, endIndex);
try {
esEquipmentCategoryDao.deleteAll(batch);
log.debug("{} 批次完成,条数: {}", operationDesc, batch.size());
} catch (Exception e) {
log.error("{} 批次失败,条数: {}", operationDesc, batch.size(), e);
throw e;
}
}
}
/**
* 分批保存ES数据
*/
private void processBatchSave(List<ESEquipmentCategoryDto> dataList, String operationDesc) {
if (ValidationUtil.isEmpty(dataList)) {
return;
}
int batchSize = 1000; // 批次大小可根据实际情况调整
for (int i = 0; i < dataList.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, dataList.size());
List<ESEquipmentCategoryDto> batch = dataList.subList(i, endIndex);
try {
esEquipmentCategoryDao.saveAll(batch);
log.debug("{} 批次完成,条数: {}", operationDesc, batch.size());
} catch (Exception e) {
log.error("{} 批次失败,条数: {}", operationDesc, batch.size(), e);
throw e;
}
}
}
}
package com.yeejoin.amos.boot.module.common.api.context;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* ES回滚上下文管理器
* 用于在事务执行过程中缓存ES操作数据,以便在异常时进行回滚
*
* @author Administrator
*/
@Slf4j
public class EsRollbackContext {
public enum EsRollbackType {
INSERT,
UPDATE
}
private static final ThreadLocal<Map<EsRollbackType, List<ESEquipmentCategoryDto>>> threadLocal = new ThreadLocal<>();
/**
* 获取当前线程的上下文Map
* 如果不存在则创建新的Map
*
* @return 当前线程的上下文Map
*/
public static Map<EsRollbackType, List<ESEquipmentCategoryDto>> getContext() {
Map<EsRollbackType, List<ESEquipmentCategoryDto>> context = threadLocal.get();
if (context == null) {
context = new HashMap<>();
context.put(EsRollbackType.INSERT, new ArrayList<>());
context.put(EsRollbackType.UPDATE, new ArrayList<>());
threadLocal.set(context);
}
return context;
}
/**
* 添加插入操作的ES数据到上下文
*
* @param esDtos 需要回滚的ES数据列表
*/
public static void putAllInsertEsToMap(List<ESEquipmentCategoryDto> esDtos) {
if (esDtos != null && !esDtos.isEmpty()) {
List<ESEquipmentCategoryDto> insertList = getContext().get(EsRollbackType.INSERT);
insertList.addAll(esDtos);
}
}
/**
* 添加更新操作的ES数据到上下文
*
* @param esDtos 需要回滚的ES数据列表
*/
public static void putAllUpdateEsToMap(List<ESEquipmentCategoryDto> esDtos) {
if (esDtos != null && !esDtos.isEmpty()) {
List<ESEquipmentCategoryDto> updateList = getContext().get(EsRollbackType.UPDATE);
updateList.addAll(esDtos);
}
}
/**
* 添加单个插入操作的ES数据到上下文
*
* @param esDto 需要回滚的ES数据
*/
public static void putInsertEsToMap(ESEquipmentCategoryDto esDto) {
if (esDto != null) {
List<ESEquipmentCategoryDto> insertList = getContext().get(EsRollbackType.INSERT);
insertList.add(esDto);
}
}
/**
* 添加单个更新操作的ES数据到上下文
*
* @param esDto 需要回滚的ES数据
*/
public static void putUpdateEsToMap(ESEquipmentCategoryDto esDto) {
if (esDto != null) {
List<ESEquipmentCategoryDto> updateList = getContext().get(EsRollbackType.UPDATE);
updateList.add(esDto);
}
}
/**
* 获取所有插入操作的ES数据
*
* @return 插入操作的ES数据列表
*/
public static List<ESEquipmentCategoryDto> getAllInsertEsFromMap() {
return new ArrayList<>(getContext().get(EsRollbackType.INSERT));
}
/**
* 获取所有更新操作的ES数据
*
* @return 更新操作的ES数据列表
*/
public static List<ESEquipmentCategoryDto> getAllUpdateEsFromMap() {
return new ArrayList<>(getContext().get(EsRollbackType.UPDATE));
}
/**
* 检查上下文中是否存在需要回滚的数据
*
* @return 如果存在需要回滚的数据返回true,否则返回false
*/
public static boolean hasRollbackData() {
Map<EsRollbackType, List<ESEquipmentCategoryDto>> context = getContext();
return !context.get(EsRollbackType.INSERT).isEmpty() ||
!context.get(EsRollbackType.UPDATE).isEmpty();
}
/**
* 清理当前线程的上下文数据
* 需要在事务结束时调用,避免内存泄漏
*/
public static void clean() {
log.debug("EsRollbackContext clean begin");
Map<EsRollbackType, List<ESEquipmentCategoryDto>> context = threadLocal.get();
if (context != null) {
context.get(EsRollbackType.INSERT).clear();
context.get(EsRollbackType.UPDATE).clear();
threadLocal.remove();
}
log.debug("EsRollbackContext clean end");
}
}
package com.yeejoin.amos.boot.module.common.api.dao;
import com.yeejoin.amos.boot.module.common.api.context.EsRollbackContext;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.annotations.Query;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@Repository
public interface ESEquipmentCategory extends PagingAndSortingRepository<ESEquipmentCategoryDto, String> {
Logger log = LoggerFactory.getLogger(ESEquipmentCategory.class);
List<ESEquipmentCategoryDto> findAllByProjectContraptionId(String PROJECT_CONTRAPTION_ID);
@Query("{\"prefix\": {\"ORG_BRANCH_CODE.keyword\": \"?0\"}}")
List<ESEquipmentCategoryDto> findByOrgBranchCodeKeywordStartingWith(String orgBranchCodePrefix);
default Iterable<ESEquipmentCategoryDto> saveAllWithCache(Iterable<ESEquipmentCategoryDto> dtos) {
int batchSize = 2000;
List<ESEquipmentCategoryDto> dtoList = StreamSupport.stream(dtos.spliterator(), false)
.collect(Collectors.toList());
if (dtoList.isEmpty()) {
return Collections.emptyList();
}
log.info("saveAllWithCache开始处理ES数据,总数量: {}, 批次大小: {}", dtoList.size(), batchSize);
List<ESEquipmentCategoryDto> allUpdateData = new ArrayList<>();
List<ESEquipmentCategoryDto> allInsertData = new ArrayList<>();
// 分批处理
for (int i = 0; i < dtoList.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, dtoList.size());
List<ESEquipmentCategoryDto> batch = dtoList.subList(i, endIndex);
long startTime = System.currentTimeMillis();
List<String> batchIds = batch.stream()
.map(ESEquipmentCategoryDto::getSEQUENCE_NBR)
.collect(Collectors.toList());
Iterable<ESEquipmentCategoryDto> batchBeforeData = this.findAllById(batchIds);
Map<String, ESEquipmentCategoryDto> beforeDataMap = StreamSupport.stream(
batchBeforeData.spliterator(), false)
.collect(Collectors.toMap(
ESEquipmentCategoryDto::getSEQUENCE_NBR,
Function.identity(),
(existing, replacement) -> existing
));
// 快速分离更新和插入数据
List<ESEquipmentCategoryDto> batchUpdateData = new ArrayList<>();
List<ESEquipmentCategoryDto> batchInsertData = new ArrayList<>();
for (ESEquipmentCategoryDto dto : batch) {
if (beforeDataMap.containsKey(dto.getSEQUENCE_NBR())) {
batchUpdateData.add(beforeDataMap.get(dto.getSEQUENCE_NBR()));
} else {
batchInsertData.add(dto);
}
}
allUpdateData.addAll(batchUpdateData);
allInsertData.addAll(batchInsertData);
// 批量保存当前批次
this.saveAll(batch);
long endTime = System.currentTimeMillis();
log.debug("处理批次 {}/{} 完成,耗时: {}ms, 更新: {}, 插入: {}",
(i/batchSize + 1),
(int)Math.ceil((double)dtoList.size()/batchSize),
(endTime - startTime),
batchUpdateData.size(),
batchInsertData.size());
}
// 一次性缓存所有数据(避免频繁的上下文操作)
if (!allUpdateData.isEmpty()) {
EsRollbackContext.putAllUpdateEsToMap(allUpdateData);
log.info("es缓存更新数据: {} 条", allUpdateData.size());
}
if (!allInsertData.isEmpty()) {
EsRollbackContext.putAllInsertEsToMap(allInsertData);
log.info("es缓存插入数据: {} 条", allInsertData.size());
}
log.info("saveAllWithCache数据处理完成,总计: 更新{}条, 插入{}条",
allUpdateData.size(), allInsertData.size());
return dtoList;
}
default ESEquipmentCategoryDto saveWithCache(ESEquipmentCategoryDto dto) {
// 将单个对象转换为列表进行处理
List<ESEquipmentCategoryDto> dtoList = Collections.singletonList(dto);
Iterable<ESEquipmentCategoryDto> result = saveAllWithCache(dtoList);
// 返回处理后的单个对象
return result.iterator().next();
}
}
......@@ -726,7 +726,7 @@ public class JgMaintenanceContractServiceImpl extends BaseService<JgMaintenanceC
contract.setNextTaskId(null);
TaskV2Model taskV2Model = updateTaskModel(contract, "0");
// 添加设备维保信息
updateEquipMessage(contract.getSequenceNbr(), taskV2Model);
updateEquipMessage(contract);
// 发送数据刷新消息
this.sendDataRefreshMsg(sequenceNbr);
// 创建设备履历
......@@ -763,9 +763,8 @@ public class JgMaintenanceContractServiceImpl extends BaseService<JgMaintenanceC
* @param id
*/
@SneakyThrows
private void updateEquipMessage(Long id, TaskV2Model taskV2Model) {
JgMaintenanceContract jgMaintenanceContract = this.getBaseMapper().selectById(id);
List<JgMaintenanceContractEq> list = getJgMaintenanceContractEqs(id);
private void updateEquipMessage(JgMaintenanceContract jgMaintenanceContract) {
List<JgMaintenanceContractEq> list = getJgMaintenanceContractEqs(jgMaintenanceContract.getSequenceNbr());
List<IdxBizJgMaintenanceRecordInfo> maintenanceRecordInfoList = new ArrayList<>();
// 循环设备
list.forEach(item -> {
......@@ -785,7 +784,7 @@ public class JgMaintenanceContractServiceImpl extends BaseService<JgMaintenanceC
info.setInformEnd(ObjectUtils.isEmpty(jgMaintenanceContract.getInformEnd()) ? null : jgMaintenanceContract.getInformEnd());
info.setRepairInform(ObjectUtils.isEmpty(jgMaintenanceContract.getMaintenanceContract()) ? null : jgMaintenanceContract.getMaintenanceContract());
// 记录创建业务的id,在维保备案作废时,将对应的维保记录删除
info.setSourceId(id + "");
info.setSourceId(jgMaintenanceContract.getSequenceNbr() + "");
maintenanceRecordInfoList.add(info);
});
......
......@@ -18,7 +18,6 @@ import jdk.nashorn.api.scripting.ScriptObjectMirror;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
......@@ -884,7 +883,7 @@ public class DPSubServiceImpl {
}
}
Object body = apiObj.get("body");
ResponseEntity<String> sresponseEntity = null;
ResponseEntity<String> responseEntity = null;
//如果url以/开头,则调用本服务内接口
if (url != null && url.trim().startsWith("/")) {
url = "http://" + GATEWAY_SERVER_NAME + url;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment