Commit dc7c769e authored by tianbo's avatar tianbo

refactor(安全追溯): 优化消息发送和处理逻辑

- 修改 BizEmqPublisher 中的消息发送逻辑,增加对 MQTT 客户端为空的判断 - 仅发送类型为 update 的检验检测结果入库消息,并使用列表封装消息体 - 优化 SafetyProblemTopicMessage 中的消息处理逻辑,增加异常捕获和日志记录 - 修复消息处理线程的中断逻辑,确保线程可以正确退出
parent 95fed24d
...@@ -2,6 +2,7 @@ package com.yeejoin.amos.boot.module.jg.biz.listener; ...@@ -2,6 +2,7 @@ package com.yeejoin.amos.boot.module.jg.biz.listener;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils; import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
...@@ -74,24 +75,27 @@ public class SafetyProblemTopicMessage extends EmqxListener { ...@@ -74,24 +75,27 @@ public class SafetyProblemTopicMessage extends EmqxListener {
@PostConstruct @PostConstruct
void init() throws Exception { void init() throws Exception {
log.info("订阅安全追溯问题主题:{}", buildTopic(problemEventTopic));
emqKeeper.subscript(buildTopic(problemEventTopic), 2, this); emqKeeper.subscript(buildTopic(problemEventTopic), 2, this);
executorService = Executors.newFixedThreadPool(threadNumber); executorService = Executors.newFixedThreadPool(threadNumber);
for (int i = 0; i < threadNumber; i++) { for (int i = 0; i < threadNumber; i++) {
executorService.execute(() -> { executorService.execute(() -> {
try { try {
while (true) { while (!Thread.currentThread().isInterrupted()) {
SafetyProblemEvent safetyProblemMessageEvent = blockingQueue.take(); SafetyProblemEvent safetyProblemMessageEvent = blockingQueue.take();
JSONArray jsonObject = JSON.parseArray(safetyProblemMessageEvent.getMessage().toString()); try {
log.info("接收到问题生产消息:{}", jsonObject); log.info("接收到问题生产原始消息:{}", safetyProblemMessageEvent.getMessage());
SafetyProblemEventHandler eventHandler = SafetyProblemEventHandlerFactory.createProblemHandler(safetyProblemMessageEvent.getTopic()); JSONArray jsonObject = JSON.parseArray(safetyProblemMessageEvent.getMessage().toString());
eventHandler.handle(safetyProblemMessageEvent); log.info("接收到问题生产消息:{}", jsonObject);
log.info("处理问题生产消息完成"); SafetyProblemEventHandler eventHandler = SafetyProblemEventHandlerFactory.createProblemHandler(safetyProblemMessageEvent.getTopic());
eventHandler.handle(safetyProblemMessageEvent);
log.info("处理问题生产消息完成");
} catch (JSONException | ClassCastException | IllegalArgumentException e) {
log.error("处理消息异常", e);
}
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("处理线程被中断,准备退出", e);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
log.error("处理问题生产消息异常", e);
throw new RuntimeException(e);
} }
}); });
} }
...@@ -149,7 +153,8 @@ public class SafetyProblemTopicMessage extends EmqxListener { ...@@ -149,7 +153,8 @@ public class SafetyProblemTopicMessage extends EmqxListener {
safetyProblemTracing -> new QueryWrapper<>() safetyProblemTracing -> new QueryWrapper<>()
.eq("problem_type_code", safetyProblemTracing.getProblemTypeCode()) .eq("problem_type_code", safetyProblemTracing.getProblemTypeCode())
.eq("source_id", safetyProblemTracing.getSourceId()) .eq("source_id", safetyProblemTracing.getSourceId())
.eq("problem_status_code", SafetyProblemStatusEnum.UNHANDLED.getCode()));} .eq("problem_status_code", SafetyProblemStatusEnum.UNHANDLED.getCode()));
}
} }
public static void generateUnitProblem(JSONArray jsonArray, SafetyProblemTypeEnum problemTypeEnum, SafetyProblemTracingServiceImpl safetyProblemTracingService) { public static void generateUnitProblem(JSONArray jsonArray, SafetyProblemTypeEnum problemTypeEnum, SafetyProblemTracingServiceImpl safetyProblemTracingService) {
...@@ -189,7 +194,8 @@ public class SafetyProblemTopicMessage extends EmqxListener { ...@@ -189,7 +194,8 @@ public class SafetyProblemTopicMessage extends EmqxListener {
safetyProblemTracing -> new QueryWrapper<>() safetyProblemTracing -> new QueryWrapper<>()
.eq("problem_type_code", safetyProblemTracing.getProblemTypeCode()) .eq("problem_type_code", safetyProblemTracing.getProblemTypeCode())
.eq("source_id", safetyProblemTracing.getSourceId()) .eq("source_id", safetyProblemTracing.getSourceId())
.eq("problem_status_code", SafetyProblemStatusEnum.UNHANDLED.getCode()));} .eq("problem_status_code", SafetyProblemStatusEnum.UNHANDLED.getCode()));
}
} }
public static void generatePersonnelProblem(JSONArray jsonArray, SafetyProblemTypeEnum problemTypeEnum, SafetyProblemTracingServiceImpl safetyProblemTracingService) { public static void generatePersonnelProblem(JSONArray jsonArray, SafetyProblemTypeEnum problemTypeEnum, SafetyProblemTracingServiceImpl safetyProblemTracingService) {
......
...@@ -7,6 +7,9 @@ import org.eclipse.paho.client.mqttv3.MqttException; ...@@ -7,6 +7,9 @@ import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import java.util.ArrayList;
import java.util.List;
/** /**
* @author Administrator * @author Administrator
*/ */
...@@ -14,7 +17,7 @@ import org.typroject.tyboot.component.emq.EmqKeeper; ...@@ -14,7 +17,7 @@ import org.typroject.tyboot.component.emq.EmqKeeper;
@Slf4j @Slf4j
public class BizEmqPublisher { public class BizEmqPublisher {
private EmqKeeper emqKeeper; private final EmqKeeper emqKeeper;
public BizEmqPublisher(EmqKeeper emqKeeper) { public BizEmqPublisher(EmqKeeper emqKeeper) {
this.emqKeeper = emqKeeper; this.emqKeeper = emqKeeper;
...@@ -24,10 +27,19 @@ public class BizEmqPublisher { ...@@ -24,10 +27,19 @@ public class BizEmqPublisher {
* 检验检测结果入库消息 * 检验检测结果入库消息
*/ */
private static final String INSPECTION_DETECTION_SAVE_TO_DB_TOPIC_PREFIX = "safetyProblemTracing/jy/bj"; private static final String INSPECTION_DETECTION_SAVE_TO_DB_TOPIC_PREFIX = "safetyProblemTracing/jy/bj";
private static final String INSPECTION_DETECTION_SAVE_TO_DB_TOPIC_TYPE_UPDATE = "update";
public void sendInspectionMsgAfterSave(InspectionDetectionInfo info, String type) { public void sendInspectionMsgAfterSave(InspectionDetectionInfo info, String type) {
try { try {
emqKeeper.getMqttClient().publish(this.buildSave2DbEmqTopic(type), JSON.toJSONBytes(info), 2, false); if (emqKeeper.getMqttClient() == null) {
return;
}
// 安全追溯目前只需要type=update的信息
if (INSPECTION_DETECTION_SAVE_TO_DB_TOPIC_TYPE_UPDATE.equals(type)) {
List<InspectionDetectionInfo> list = new ArrayList<>();
list.add(info);
emqKeeper.getMqttClient().publish(this.buildSave2DbEmqTopic(type), JSON.toJSONBytes(list), 2, false);
}
} catch (MqttException e) { } catch (MqttException e) {
log.error("发送检验检测信息入库消息失败", e); log.error("发送检验检测信息入库消息失败", e);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment