Commit a2d01ed5 authored by tianyiming's avatar tianyiming

refactor: 增加批量处理逻辑以优化设备状态更新和消息发送

parent 5983dd6b
...@@ -143,18 +143,22 @@ public class SafetyProblemTracingGenServiceImpl{ ...@@ -143,18 +143,22 @@ public class SafetyProblemTracingGenServiceImpl{
return; return;
} }
List<String> records = mapList.stream().map(m -> m.get("RECORD").toString()).collect(Collectors.toList()); List<String> records = mapList.stream().map(m -> m.get("RECORD").toString()).collect(Collectors.toList());
// 更新设备状态为未处理(异常) int batchSize = 10000;
idxBizJgOtherInfoService.lambdaUpdate() for (int i = 0; i < records.size(); i += batchSize) {
.in(IdxBizJgOtherInfo::getRecord, records) List<String> batch = records.subList(i, Math.min(i + batchSize, records.size()));
.set(IdxBizJgOtherInfo::getStatus, SafetyProblemStatusEnum.UNHANDLED.getCode()) // 更新设备状态为未处理(异常)
.update(); idxBizJgOtherInfoService.lambdaUpdate()
List<ESEquipmentCategoryDto> esEquipmentCategoryDto = Lists.newArrayList(); .in(IdxBizJgOtherInfo::getRecord, batch)
for (ESEquipmentCategoryDto equipmentCategoryDto : esEquipmentCategory.findAllById(records)) { .set(IdxBizJgOtherInfo::getStatus, SafetyProblemStatusEnum.UNHANDLED.getCode())
equipmentCategoryDto.setProblemStatus(SafetyProblemStatusEnum.UNHANDLED.getCode()); .update();
esEquipmentCategoryDto.add(equipmentCategoryDto); List<ESEquipmentCategoryDto> esEquipmentCategoryDto = Lists.newArrayList();
for (ESEquipmentCategoryDto equipmentCategoryDto : esEquipmentCategory.findAllById(batch)) {
equipmentCategoryDto.setProblemStatus(SafetyProblemStatusEnum.UNHANDLED.getCode());
esEquipmentCategoryDto.add(equipmentCategoryDto);
}
esEquipmentCategory.saveAll(esEquipmentCategoryDto);
sendSafetyProblemMessage(mapList, safetyProblemTypeEnum);
} }
esEquipmentCategory.saveAll(esEquipmentCategoryDto);
sendSafetyProblemMessage(mapList, safetyProblemTypeEnum);
} }
private void updateEnterpriseAndSendMessage(List<Map<String, Object>> mapList) { private void updateEnterpriseAndSendMessage(List<Map<String, Object>> mapList) {
...@@ -191,21 +195,16 @@ public class SafetyProblemTracingGenServiceImpl{ ...@@ -191,21 +195,16 @@ public class SafetyProblemTracingGenServiceImpl{
* @param safetyProblemTypeEnum * @param safetyProblemTypeEnum
*/ */
private void sendSafetyProblemMessage(List<Map<String, Object>> mapList, SafetyProblemTypeEnum safetyProblemTypeEnum) { private void sendSafetyProblemMessage(List<Map<String, Object>> mapList, SafetyProblemTypeEnum safetyProblemTypeEnum) {
int batchSize = 10000; if (CollectionUtil.isNotEmpty(mapList)){
for (int i = 0; i < mapList.size(); i += batchSize) { try {
List<Map<String, Object>> batch = mapList.subList(i, Math.min(i + batchSize, mapList.size())); logger.info("发送安全追溯问题主题---->{}", safetyProblemTypeEnum.getTopic());
if (CollectionUtil.isNotEmpty(mapList)){ emqKeeper.getMqttClient().publish(safetyProblemTypeEnum.getTopic(), JSON.toJSONBytes(mapList), 2, false);
try { logger.info("发送安全追溯问题消息成功---->");
logger.info("发送安全追溯问题主题---->{}", safetyProblemTypeEnum.getTopic()); } catch (MqttException e) {
emqKeeper.getMqttClient().publish(safetyProblemTypeEnum.getTopic(), JSON.toJSONBytes(batch), 2, false); logger.error("发送安全追溯问题设备信息消息失败---->{}", e.getMessage());
logger.info("发送安全追溯问题消息成功---->"); throw new RuntimeException(e);
} catch (MqttException e) {
logger.error("发送安全追溯问题设备信息消息失败---->{}", e.getMessage());
throw new RuntimeException(e);
}
} }
} }
} }
// @Scheduled(cron = "0 0 1 * * ?") // @Scheduled(cron = "0 0 1 * * ?")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment