Commit 51b78d3d authored by zhangsen's avatar zhangsen

任务22978

消防泵等事件告警模块问题处理
parent 5424cf2c
......@@ -126,7 +126,7 @@ public class EquipmentIotMqttReceiveConfig {
list.add("+/+/event"); // 添加iot事件监听
list.add("+/+/transmit"); // 添加交换站事件监听
list.add("+/+/perspective"); // 添加交换站事件监听
list.add("+/+/shaoshan"); // 添加换流站韶山监听事件
list.add("+/+/eventAlarm"); // 添加换流站韶山监听事件 --- shaoshan 修改为 eventAlarm:事件告警 - 统一
String[] arr = list.toArray(new String[list.size()]);
adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttPahoClientFactory(), arr);
adapter.setCompletionTimeout(completionTimeout);
......@@ -152,7 +152,7 @@ public class EquipmentIotMqttReceiveConfig {
mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg);
}else if (dataType.equals("transmit") && StringUtil.isNotEmpty(msg)){ // 遥信遥测信号
mqttReceiveService.handlerMqttRomaMessage(topic,msg);
}else if (dataType.equals("shaoshan") && StringUtil.isNotEmpty(msg)){ // 告警信号
}else if (dataType.equals("eventAlarm") && StringUtil.isNotEmpty(msg)){ // 告警信号
mqttReceiveService.handlerMqttStationMessage(topic,msg);
}
}
......
package com.yeejoin.amos.message.kafka;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.paho.client.mqttv3.MqttException;
......@@ -22,7 +23,7 @@ import java.util.Optional;
@Service
public class KafkaConsumerService {
private static final String MQTT_TOPIC = "romaSite/data/transmit";
private static final String MQTT_TOPIC_SHAOSHAN = "romaSite/data/shaoshan";
private static final String MQTT_TOPIC_EVENT_ALARM = "romaSite/data/eventAlarm";
@Autowired
protected EmqKeeper emqKeeper;
......@@ -83,7 +84,42 @@ public class KafkaConsumerService {
try {
JSONObject messageObj = JSONObject.fromObject(record.value());
JSONObject data = messageObj.getJSONObject("body");
emqKeeper.getMqttClient().publish(MQTT_TOPIC_SHAOSHAN, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
} catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage());
}
}
}
/**
* 事件告警对接Kafka
* @param record record
* @param ack ack
* groupId = kafkaConsumerGroup
*/
@KafkaListener(id = "kafkaConsumerEventAlarm", groupId = "kafkaConsumerGroupEventAlarm", topics = "#{'${queue.kafka.eventAlarm.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
public void kafkaConsumerEventAlarm(ConsumerRecord<?, String> record, Acknowledgment ack) {
Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
try {
JSONObject messageObj = JSONObject.fromObject(record.value());
JSONArray dataArray = messageObj.getJSONArray("data");
JSONArray jsonArray = new JSONArray();
String timestamp = "";
for (Object obj : dataArray) {
JSONObject finallyObj = new JSONObject();
com.alibaba.fastjson.JSONObject detail = com.alibaba.fastjson.JSONObject.parseObject(com.alibaba.fastjson.JSONObject.toJSONString(obj));
finallyObj.put("eventtextL1", detail.get("description"));
finallyObj.put("pointId", detail.get("astId"));
finallyObj.put("time", detail.get("dateTime"));
jsonArray.add(finallyObj);
timestamp = detail.get("dateTime").toString();
}
JSONObject jsonObjectMessage = new JSONObject();
jsonObjectMessage.put("warns", jsonArray);
jsonObjectMessage.put("timestamp", timestamp);
emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, jsonObjectMessage.toString().getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
} catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage());
......
......@@ -98,3 +98,6 @@ emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq
#自定义Kafka配置对接交换站
queue.kafka.topics=null
kafka.auto-startup=false
#事件告警对接Kafka主题
queue.kafka.eventAlarm.topics=JKXT2BP-GJ-Topic-site
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment