Commit cf7ad94f authored by suhuiguang's avatar suhuiguang

fix(设备创建) : 事务回滚

1.自动任务异常处理
parent 894f98fe
......@@ -652,4 +652,7 @@ public class ESEquipmentInfo {
@Field(type = FieldType.Keyword)
private String meMaster1Phone;
}
@Field(type = FieldType.Keyword)
private String version;
}
package com.yeejoin.amos.boot.module.common.biz.service.impl;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static com.yeejoin.amos.boot.module.common.api.constant.TZSCommonConstant.ES_INDEX_NAME_EQUIPMENT_INFO;
@Service
@RequiredArgsConstructor
public class ESEquipmentInfoService {
private final RestHighLevelClient restHighLevelClient;
public List<String> findSequenceNbrByIds(List<String> ids) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.query(QueryBuilders.termsQuery("SEQUENCE_NBR.keyword", ids))
.fetchSource(false);
SearchRequest request = new SearchRequest(ES_INDEX_NAME_EQUIPMENT_INFO)
.source(sourceBuilder);
try {
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
return Arrays.stream(response.getHits().getHits())
.map(SearchHit::getId)
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
......@@ -3,9 +3,26 @@ package com.yeejoin.amos.boot.module.common.biz.service.impl;
import com.yeejoin.amos.boot.module.common.api.dao.ESEquipmentCategory;
import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static com.yeejoin.amos.boot.module.common.api.constant.TZSCommonConstant.ES_INDEX_NAME_JG_ALL;
@Service
@RequiredArgsConstructor
public class EquipmentCategoryService {
......@@ -14,10 +31,28 @@ public class EquipmentCategoryService {
private final ElasticsearchOperations elasticsearchOperations;
private final RestHighLevelClient restHighLevelClient;
public ESEquipmentCategoryDto saveWithImmediateRefresh(ESEquipmentCategoryDto dto) {
ESEquipmentCategoryDto saved = equipmentCategoryDao.save(dto);
// 手动触发索引刷新
elasticsearchOperations.indexOps(ESEquipmentCategoryDto.class).refresh();
return saved;
}
public List<String> findSequenceNbrByIds(List<String> ids) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.query(QueryBuilders.termsQuery("SEQUENCE_NBR.keyword", ids))
.fetchSource(false);
SearchRequest request = new SearchRequest(ES_INDEX_NAME_JG_ALL)
.source(sourceBuilder);
try {
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
return Arrays.stream(response.getHits().getHits())
.map(SearchHit::getId)
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
\ No newline at end of file
......@@ -456,4 +456,13 @@ public class DataHandlerController extends BaseController {
return ResponseHelper.buildResponse(dataHandlerService.insertEquipFromDb2NewEs(paramMap));
}
@TycloudOperation(ApiLevel = UserType.AGENCY)
@PutMapping(value = "/sync-equip/shard")
@ApiOperation(httpMethod = "PUT", value = "分片对齐设备的数据与es数据", notes = "分片对齐设备的数据与es数据")
public ResponseModel<Boolean> syncEquipShard(@RequestParam Integer shard,
@RequestParam Integer slots,
@RequestParam String batchNo){
return ResponseHelper.buildResponse(dataHandlerService.syncEquipShard(shard, slots, batchNo));
}
}
\ No newline at end of file
package com.yeejoin.amos.boot.module.jg.biz.job;
import cn.hutool.core.date.DateUtil;
import com.yeejoin.amos.boot.module.ymt.api.mapper.IdxBizJgUseInfoMapper;
import com.yeejoin.amos.component.feign.model.FeignClientResult;
import com.yeejoin.amos.component.robot.AmosRequestContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.client.RestTemplate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@Component
@RequiredArgsConstructor
@Slf4j
public class DataConsistencyJob {
private final AmosRequestContext requestContext;
private final IdxBizJgUseInfoMapper idxBizJgUseInfoMapper;
private final DiscoveryClient discoveryClient;
private final RestTemplate restTemplate; // 需要提前注入
@Value("${spring.application.name}")
private String applicationName;
@Scheduled(cron = "0 0 3 * * ?")
@SchedulerLock(name = "equipConsistencyJob", lockAtMostFor = "PT1H")
public void execute() {
List<ServiceInstance> instanceList = discoveryClient.getInstances(applicationName);
int slots = instanceList.size();
Executor executor = Executors.newFixedThreadPool(slots);
HttpHeaders headers = this.builderHeaders();
HttpEntity<?> requestEntity = new HttpEntity<>(headers);
List<CompletableFuture<Void>> futures = new ArrayList<>();
String batchNo = DateUtil.today();
for (int i = 0; i < slots; i++) {
int index = i;
ServiceInstance instance = instanceList.get(i);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
String baseUrl = instance.getUri().toString();
String apiUrl = baseUrl + "/jg/dataHandler/sync-equip/shard?shard={shard}&slots={slots}&batchNo={batchNo}";
Map<String, Object> params = new HashMap<>();
params.put("shard", index);
params.put("slots", slots);
params.put("batchNo", batchNo);
restTemplate.exchange(
apiUrl,
HttpMethod.PUT,
requestEntity,
FeignClientResult.class,
params
);
}, executor);
futures.add(future);
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.exceptionally(ex -> {
log.error("任务执行异常", ex);
return null;
})
.join();
}
}
private HttpHeaders builderHeaders() {
HttpHeaders httpheaders = new HttpHeaders();
httpheaders.add("appKey", requestContext.getAppKey());
httpheaders.add("product", requestContext.getProduct());
httpheaders.add("token", requestContext.getToken());
httpheaders.setContentType(MediaType.APPLICATION_JSON);
return httpheaders;
}
}
......@@ -30,6 +30,8 @@ import com.yeejoin.amos.boot.module.common.api.dto.ESEquipmentCategoryDto;
import com.yeejoin.amos.boot.module.common.api.entity.*;
import com.yeejoin.amos.boot.module.common.api.enums.CylinderTypeEnum;
import com.yeejoin.amos.boot.module.common.biz.refresh.cm.RefreshCmService;
import com.yeejoin.amos.boot.module.common.biz.service.impl.ESEquipmentInfoService;
import com.yeejoin.amos.boot.module.common.biz.service.impl.EquipmentCategoryService;
import com.yeejoin.amos.boot.module.common.biz.service.impl.EsSearchServiceImpl;
import com.yeejoin.amos.boot.module.common.biz.utils.RefreshDataUtils;
import com.yeejoin.amos.boot.module.jg.api.common.BizCommonConstant;
......@@ -60,15 +62,21 @@ import com.yeejoin.amos.boot.module.ymt.api.enums.EquipmentEnum;
import com.yeejoin.amos.boot.module.ymt.api.enums.FlowStatusEnum;
import com.yeejoin.amos.boot.module.ymt.api.mapper.*;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
......@@ -98,6 +106,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static com.alibaba.fastjson.JSON.toJSONString;
import static com.yeejoin.amos.boot.module.jg.biz.service.impl.JgInstallationNoticeServiceImpl.CONSTRUCTION_TYPE;
......@@ -219,6 +228,12 @@ public class DataHandlerServiceImpl {
private final IdxBizJgInspectionDetectionInfoMapper inspectionDetectionInfoMapper;
private final ESEquipmentInfoService esEquipmentInfoService;
private final EquipmentCategoryService equipmentCategoryService;
/**
* 安装告知压力管道历史数据修复-详情中的设备列表修改为汇总表格式
*
......@@ -2948,4 +2963,91 @@ public class DataHandlerServiceImpl {
esEquipmentInfos = esEquipmentInfos.stream().filter(e -> StringUtils.isNotEmpty(e.getSEQUENCE_NBR())).collect(Collectors.toList());
return esEquipmentInfos;
}
@SneakyThrows
public boolean syncEquipShard(Integer shard, Integer slots, String batchNo) {
StopWatch watch = new StopWatch();
watch.start();
// 0.数据准备
// 查询数据库 批量
List<String> records = idxBizJgUseInfoMapper.selectRecordList(shard, slots);
List<List<String>> partitions = Lists.partition(records, 2000);
// 查询es-旧索引
List<String> existsOldEsEquipRecords = partitions.parallelStream().map(equipmentCategoryService::findSequenceNbrByIds).flatMap(List::stream).collect(Collectors.toList());
// 标记老索引es-缺少数据
List<String> oldEsMissingRecords = records.parallelStream().filter(r -> !existsOldEsEquipRecords.contains(r)).collect(Collectors.toList());
// 查询es-新索引
List<String> existsNewEsEquipRecords = partitions.parallelStream().map(esEquipmentInfoService::findSequenceNbrByIds).flatMap(List::stream).collect(Collectors.toList());
// 标记新索引es-缺少数据
List<String> newEsMissingRecords = records.parallelStream().filter(r -> !existsNewEsEquipRecords.contains(r)).collect(Collectors.toList());
// 1.分批次保存,数据补充至es
Set<String> allMissingRecords = Stream.concat(oldEsMissingRecords.stream(), newEsMissingRecords.stream()).collect(Collectors.toSet());
List<List<String>> savePartitions = Lists.partition(new ArrayList<>(allMissingRecords), 2000);
savePartitions.parallelStream().forEach(rs->{
List<Map<String, Object>> details = idxBizJgUseInfoMapper.queryDetailBatch(rs);
save2OldEquipEs(details, oldEsMissingRecords, batchNo);
save2NewEquipEs(rs, details, newEsMissingRecords, batchNo);
});
// 2.更新本切片数据的es的版本号,用来清除es的数据标记用
Map<String, Object> updateFields = MapUtil.of("version", batchNo);
partitions.parallelStream().forEach(batch->{
updateEsVersion(batch, updateFields, IDX_BIZ_VIEW_JG_ALL);
updateEsVersion(batch, updateFields, IDX_BIZ_EQUIPMENT_INFO);
});
watch.stop();
log.info("处理耗时:{}", watch.getTotalTimeSeconds());
return true;
}
private void updateEsVersion(List<String> batch, Map<String, Object> updateFields, String index) {
BulkRequest bulkRequest = new BulkRequest();
// 构建批量请求
batch.stream()
.map(id -> new UpdateRequest(index, id)
.doc(updateFields, XContentType.JSON))
.forEach(bulkRequest::add);
try {
BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
// 更详细的错误日志记录
if (response.hasFailures()) {
Arrays.stream(response.getItems())
.filter(BulkItemResponse::isFailed)
.forEach(item -> log.error("Failed to update {}: {}",
item.getId(),
item.getFailureMessage()));
}
} catch (IOException e) {
log.error("批量更新版本号失败: batchSize={}, index={}", batch.size(), index, e);
throw new RuntimeException("ES批量更新失败", e);
}
}
private void save2NewEquipEs(List<String> rs, List<Map<String, Object>> details, List<String> waitSync2EquipmentInfoRecords, String batchNo) {
Map<String, Map<String, Object>> recordDetailMap = details.stream().collect(Collectors.toMap(e -> (String) e.get("SEQUENCE_NBR"), Function.identity(), (k1, k2) -> k2));
List<String> equipmentInfoSaveRecords = rs.stream().filter(waitSync2EquipmentInfoRecords::contains).collect(Collectors.toList());
List<ESEquipmentInfo> esEquipmentInfos = getEsEquipmentInfos(equipmentInfoSaveRecords, recordDetailMap);
esEquipmentInfos.forEach(e->e.setVersion(batchNo));
esBulkService.bulkUpsert(IDX_BIZ_EQUIPMENT_INFO, esEquipmentInfos.stream().map(e -> new EsEntity<>(e.getSEQUENCE_NBR(), e)).collect(Collectors.toList()));
}
private void save2OldEquipEs(List<Map<String, Object>> details, List<String> waitSync2JgAllRecords, String batchNo) {
List<Map<String, Object>> jgAllSaves = details.stream().filter(e-> waitSync2JgAllRecords.contains((String) e.get("SEQUENCE_NBR"))).collect(Collectors.toList());
List<ESEquipmentCategoryDto> esEquipmentInfos = jgAllSaves.parallelStream().map(data -> {
ESEquipmentCategoryDto esEquipmentInfo = null;
try {
esEquipmentInfo = new ESEquipmentCategoryDto();
StatisticsDataUpdateService.formatUseDate(data);
BeanUtil.copyProperties(data, esEquipmentInfo, true);
esEquipmentInfo.setVersion(batchNo);
} catch (Exception e) {
log.error("批次{}设备刷数据处理失败", record, e);
}
return esEquipmentInfo;
}).collect(Collectors.toList());
if(!esEquipmentInfos.isEmpty()){
esBulkService.bulkUpsert(IDX_BIZ_VIEW_JG_ALL, esEquipmentInfos.stream().map(e -> new EsEntity<>(e.getSEQUENCE_NBR(), e)).collect(Collectors.toList()));
}
}
}
......@@ -884,7 +884,7 @@ public class DPSubServiceImpl {
}
}
Object body = apiObj.get("body");
ResponseEntity<String> responseEntity = null;
ResponseEntity<String> sresponseEntity = null;
//如果url以/开头,则调用本服务内接口
if (url != null && url.trim().startsWith("/")) {
url = "http://" + GATEWAY_SERVER_NAME + url;
......
......@@ -58,4 +58,6 @@ public interface IdxBizJgUseInfoMapper extends CustomBaseMapper<IdxBizJgUseInfo>
List<String> selectEquips(@Param("equListCode") String equListCode, @Param("equCategoryCode") String equCategoryCode, @Param("orgCode") String orgCode);
List<String> selectUseInfoOfOneVersionWithParams(@Param("version") Integer version,@Param("params") Map<String, Object> params);
List<String> selectRecordList(@Param("shard") Integer shard, @Param("slots") Integer slots);
}
......@@ -324,5 +324,13 @@
AND r."EQU_CATEGORY" = #{equCategoryCode}
</if>
</select>
<select id="selectRecordList" resultType="java.lang.String">
SELECT
"RECORD"
FROM
idx_biz_jg_use_info
WHERE
ABS(MOD(hashtext("RECORD"), #{slots})) = #{shard}
</select>
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment