Commit 6f2bd136 authored by KeYong's avatar KeYong

修改message服务kafka接科技、继保等平台消息处理逻辑

parent 7982ff55
package com.yeejoin.amos.message.kafka; package com.yeejoin.amos.message.kafka;
import com.yeejoin.amos.message.utils.ClassToJsonUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONArray; import net.sf.json.JSONArray;
import net.sf.json.JSONObject; import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.Resource;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -28,6 +31,9 @@ public class KafkaConsumerService { ...@@ -28,6 +31,9 @@ public class KafkaConsumerService {
@Autowired @Autowired
protected EmqKeeper emqKeeper; protected EmqKeeper emqKeeper;
@Value("classpath:/json/commonMessage.json")
private Resource commonMessage;
/** /**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
* *
...@@ -57,14 +63,16 @@ public class KafkaConsumerService { ...@@ -57,14 +63,16 @@ public class KafkaConsumerService {
Optional<?> messages = Optional.ofNullable(record.value()); Optional<?> messages = Optional.ofNullable(record.value());
if (messages.isPresent()) { if (messages.isPresent()) {
try { try {
JSONObject messageObj = JSONObject.fromObject(record.value()); JSONObject object = JSONObject.fromObject(record.value());
JSONObject data = messageObj.getJSONObject("body"); String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC);
if (data.isEmpty()) { // JSONObject messageObj = JSONObject.fromObject(record.value());
data = messageObj; // JSONObject data = messageObj.getJSONObject("body");
data.put("datatype", "state"); // if (data.isEmpty()) {
} // data = messageObj;
log.info("接收到Roma消息对象: {}", data); // data.put("datatype", "state");
emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false); // }
log.info("接收到Roma消息对象: {}", object);
emqKeeper.getMqttClient().publish(MQTT_TOPIC, json.getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge(); ack.acknowledge();
} catch (MqttException e) { } catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage()); log.error("解析数据失败,{}", e.getMessage());
...@@ -82,9 +90,11 @@ public class KafkaConsumerService { ...@@ -82,9 +90,11 @@ public class KafkaConsumerService {
Optional<?> message = Optional.ofNullable(record.value()); Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) { if (message.isPresent()) {
try { try {
JSONObject messageObj = JSONObject.fromObject(record.value()); // JSONObject messageObj = JSONObject.fromObject(record.value());
JSONObject data = messageObj.getJSONObject("body"); // JSONObject data = messageObj.getJSONObject("body");
emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, data.toString().getBytes(StandardCharsets.UTF_8), 0, false); JSONObject object = JSONObject.fromObject(record.value());
String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge(); ack.acknowledge();
} catch (MqttException e) { } catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage()); log.error("解析数据失败,{}", e.getMessage());
...@@ -106,23 +116,26 @@ public class KafkaConsumerService { ...@@ -106,23 +116,26 @@ public class KafkaConsumerService {
Optional<?> message = Optional.ofNullable(record.value()); Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) { if (message.isPresent()) {
try { try {
JSONObject messageObj = JSONObject.fromObject(record.value()); // JSONObject messageObj = JSONObject.fromObject(record.value());
JSONArray dataArray = messageObj.getJSONArray("data"); // JSONArray dataArray = messageObj.getJSONArray("data");
JSONArray jsonArray = new JSONArray(); // JSONArray jsonArray = new JSONArray();
String timestamp = ""; // String timestamp = "";
for (Object obj : dataArray) { // for (Object obj : dataArray) {
JSONObject finallyObj = new JSONObject(); // JSONObject finallyObj = new JSONObject();
com.alibaba.fastjson.JSONObject detail = com.alibaba.fastjson.JSONObject.parseObject(com.alibaba.fastjson.JSONObject.toJSONString(obj)); // com.alibaba.fastjson.JSONObject detail = com.alibaba.fastjson.JSONObject.parseObject(com.alibaba.fastjson.JSONObject.toJSONString(obj));
finallyObj.put("eventtextL1", detail.get("description")); // finallyObj.put("eventtextL1", detail.get("description"));
finallyObj.put("pointId", detail.get("astId")); // finallyObj.put("pointId", detail.get("astId"));
finallyObj.put("time", detail.get("dateTime")); // finallyObj.put("time", detail.get("dateTime"));
jsonArray.add(finallyObj); // jsonArray.add(finallyObj);
timestamp = detail.get("dateTime").toString(); // timestamp = detail.get("dateTime").toString();
} // }
JSONObject jsonObjectMessage = new JSONObject(); // JSONObject jsonObjectMessage = new JSONObject();
jsonObjectMessage.put("warns", jsonArray); // jsonObjectMessage.put("warns", jsonArray);
jsonObjectMessage.put("timestamp", timestamp); // jsonObjectMessage.put("timestamp", timestamp);
emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, jsonObjectMessage.toString().getBytes(StandardCharsets.UTF_8), 0, false);
JSONObject object = JSONObject.fromObject(record.value());
String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge(); ack.acknowledge();
} catch (MqttException e) { } catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage()); log.error("解析数据失败,{}", e.getMessage());
......
package com.yeejoin.amos.message.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.springframework.core.io.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author keyong
* @title: ClassToJsonUtil
* <pre>
* @description: TODO
* </pre>
* @date 2024/4/11 11:28
*/
public class ClassToJsonUtil {
public static String class2json(Object obj, Resource commonMessage, String topic) {
String json;
try {
json = IOUtils.toString(commonMessage.getInputStream(), String.valueOf(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new RuntimeException("获取kafka信息模板失败!");
}
List<Map> listModel = JSONObject.parseArray(json, Map.class);
if (0 < listModel.size()) {
List<Map> mapList = listModel.stream().filter(x -> String.valueOf(x.get("topic")).equalsIgnoreCase(topic)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(mapList)) {
Map<String, Object> map = mapList.get(0);
Map<String, Object> entityObj = JSONObject.parseObject(String.valueOf(map.get("data")), Map.class);
Map<String, Object> sourceMap = JSONObject.parseObject(String.valueOf(obj), Map.class);
for (Map.Entry<String, Object> entry : entityObj.entrySet()) {
String fieldName = entry.getKey();
String fieldValue = String.valueOf(entry.getValue());
// 数据表body里面与外面有相同的key值时,若配置了" _body "标识则取body里面的key值,否则直接取body外面的值
Map<String, Object> bodyMap = JSONObject.parseObject(String.valueOf(sourceMap.get("body")), Map.class);
if (-1 < fieldValue.lastIndexOf("_body")) {
entry.setValue(bodyMap.get(fieldName));
} else {
if (sourceMap.containsKey(fieldValue)) {
entry.setValue(sourceMap.get(fieldValue));
} else if (bodyMap.containsKey(fieldValue)) {
entry.setValue(bodyMap.get(fieldValue));
} else {
entry.setValue("");
}
}
}
return JSON.toJSONString(entityObj);
}
}
return null;
}
}
[
{
"topic": "AAA",
"data": {
"dataType": "dataType",
"value": "value",
"timeStamp": "time_stamp",
"quality": "quality",
"scadaId": "id",
"key": "key",
"disCreate": "disCreate",
"name": "name"
}
},
{
"topic": "BBB",
"data": {
"timeStamp": "timeStamp",
"warns": [
{
"eventTextL1": "eventTextL1",
"pointId": "pointId",
"time": "time",
"deviceId": "deviceId",
"eventstatus": "eventstatus"
}
]
}
}
]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment