Commit 2ef2c20e authored by KeYong's avatar KeYong

更新message服务消息处理逻辑

parent 5890a819
package com.yeejoin.amos.message.kafka;
import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.message.utils.ClassToJsonUtil;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
......@@ -13,6 +14,8 @@ import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.springframework.core.io.Resource;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
......@@ -116,12 +119,18 @@ public class KafkaConsumerService {
// if (messageObj.getJSONObject(BODY).isEmpty()) {
// messageObj.put(DATA_TYPE, STATE);
// }
// JSONObject object = JSONObject.fromObject(record.value());
// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC);
// emqKeeper.getMqttClient().publish(MQTT_TOPIC, json.getBytes(StandardCharsets.UTF_8), 0, false);
JSONObject object = JSONObject.fromObject(record.value());
String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC);
emqKeeper.getMqttClient().publish(MQTT_TOPIC, json.getBytes(StandardCharsets.UTF_8), 0, false);
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
}
} catch (MqttException e) {
log.error("换流站转发Kafka消息失败" + e.getMessage(), e);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} finally {
ack.acknowledge();
}
......@@ -149,12 +158,18 @@ public class KafkaConsumerService {
// jsonObjectMessage.put("warns", jsonArray);
// jsonObjectMessage.put("timestamp", timestamp);
// JSONObject object = JSONObject.fromObject(record.value());
// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
// emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
JSONObject object = JSONObject.fromObject(record.value());
String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
ack.acknowledge();
} catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
......
package com.yeejoin.amos.message.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.io.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
/**
......@@ -22,8 +24,9 @@ import java.util.stream.Collectors;
*/
public class ClassToJsonUtil {
public static String class2json(Object obj, Resource commonMessage, String topic) {
private static Map<String, Object> map = new HashMap<>();
public static JSONObject class2json(Object obj, Resource commonMessage, String topic) {
String json;
try {
json = IOUtils.toString(commonMessage.getInputStream(), String.valueOf(StandardCharsets.UTF_8));
......@@ -31,34 +34,97 @@ public class ClassToJsonUtil {
throw new RuntimeException("获取kafka信息模板失败!");
}
List<Map> listModel = JSONObject.parseArray(json, Map.class);
if (0 < listModel.size()) {
List<Map> mapList = listModel.stream().filter(x -> String.valueOf(x.get("topic")).equalsIgnoreCase(topic)).collect(Collectors.toList());
List<Map> mapList = listModel.stream().filter(x -> String.valueOf(x.get("kafkaTopic")).equalsIgnoreCase(topic)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(mapList)) {
Map<String, Object> map = mapList.get(0);
Map<String, Object> entityObj = JSONObject.parseObject(String.valueOf(map.get("data")), Map.class);
Map<String, Object> sourceMap = JSONObject.parseObject(String.valueOf(obj), Map.class);
for (Map.Entry<String, Object> entry : entityObj.entrySet()) {
String fieldName = entry.getKey();
String fieldValue = String.valueOf(entry.getValue());
// 数据表body里面与外面有相同的key值时,若配置了" _body "标识则取body里面的key值,否则直接取body外面的值
Map<String, Object> bodyMap = JSONObject.parseObject(String.valueOf(sourceMap.get("body")), Map.class);
if (-1 < fieldValue.lastIndexOf("_body")) {
entry.setValue(bodyMap.get(fieldName));
Map<String, Object> map1 = mapList.get(0);
JSONObject object = JSON.parseObject(String.valueOf(obj));
analysisJson(object, "");
JSONObject res = analyseJson(JSON.toJSONString(map1), map);
res.put("kafkaTopic", map1.get("kafkaTopic"));
res.put("mqTopic", map1.get("mqTopic"));
return res;
}
}
return null;
}
private static JSONObject analysisJson(Object objJson, String flag) {
if (objJson instanceof JSONArray) {//如果obj为json数组
JSONArray objArray = (JSONArray) objJson;
for (int i = 0; i < objArray.size(); i++) {
analysisJson(objArray.get(i), flag);
}
} else if (objJson instanceof JSONObject) {//如果为json对象
JSONObject jsonObject = (JSONObject) objJson;
Iterator it = jsonObject.keySet().iterator();
while (it.hasNext()) {
String key = it.next().toString();
Object object = jsonObject.get(key);
//如果得到的是数组
if (object instanceof JSONArray) {
JSONArray objArray = (JSONArray) object;
String path = "";
if (StringUtils.isNotBlank(flag)) {
path = flag + "." + key;
} else {
if (sourceMap.containsKey(fieldValue)) {
entry.setValue(sourceMap.get(fieldValue));
} else if (bodyMap.containsKey(fieldValue)) {
entry.setValue(bodyMap.get(fieldValue));
path = key;
}
analysisJson(objArray, path);
} else if (object instanceof JSONObject) {//如果key中是一个json对象
String path = "";
if (StringUtils.isNotBlank(flag)) {
path = flag + "." + key;
} else {
entry.setValue("");
path = key;
}
analysisJson((JSONObject) object, path);
} else {//如果key中是其他
String path = "";
if (StringUtils.isNotBlank(flag)) {
path = flag + "." + key;
} else {
path = key;
}
// System.out.println(path+":"+object.toString()+" ");
map.put(path, String.valueOf(object));
}
return JSON.toJSONString(entityObj);
}
} else {//如果key中是其他
// System.out.println(flag+":"+objJson.toString()+" ");
map.put(flag, String.valueOf(objJson));
}
return null;
return JSONObject.parseObject(JSON.toJSONString(map));
}
public static JSONObject analyseJson(String jsonData, Map<String, Object> keyMap) {
SortedMap<String, Object> map = new TreeMap<>();
JSONObject jsonObject = JSON.parseObject(jsonData);
for (String key : jsonObject.keySet()) {
String resKey = keyMap.get(key) == null ? key : String.valueOf(keyMap.get(key));
Object value = jsonObject.get(key);
if (value instanceof JSONArray) {
JSONArray jsonArray = new JSONArray(new LinkedList<>());
JSONArray array = jsonObject.getJSONArray(key);
for (int i = 0; i < array.size(); i++) {
Object object = array.get(i);
if (object instanceof String) {
map.put(resKey, array);
} else {
JSONObject sortJson = analyseJson(String.valueOf(object), keyMap);
jsonArray.add(sortJson);
map.put(resKey, jsonArray);
}
}
} else if (value instanceof JSONObject) {
JSONObject sortJson = analyseJson(String.valueOf(value), keyMap);
map.put(resKey, sortJson);
} else {
map.put(resKey, ObjectUtils.isNotEmpty(keyMap.get(value)) ? keyMap.get(value) : "");
}
}
return new JSONObject(map);
}
}
[
{
"topic": "romaSite/data/transmit",
"kafkaTopic": "k1",
"mqTopic": "romaSite/data/transmit",
"data": {
"dataType": "dataType",
"value": "value",
"timeStamp": "time_stamp",
"quality": "quality",
"scadaId": "id",
"key": "key",
"disCreate": "disCreate",
"name": "name"
"dataType": "condition.station_psr_id",
"value": "condition.station_psr_id",
"timeStamp": "condition.station_psr_id",
"quality": "condition.station_psr_id",
"scadaId": "condition.station_psr_id",
"key": "condition.station_psr_id",
"disCreate": "condition.station_psr_id",
"name": "condition.station_psr_id"
}
},
{
"topic": "romaSite/data/eventAlarm",
"kafkaTopic": "k2",
"mqTopic": "romaSite/data/eventAlarm",
"data": {
"timeStamp": "timeStamp",
"timeStamp": "body.warns.time",
"warns": [
{
"eventTextL1": "eventTextL1",
"pointId": "pointId",
"time": "time",
"eventTextL1": "body.warns.systemid",
"pointId": "body.warns.pointId",
"time": "body.warns.type",
"deviceId": "deviceId",
"eventstatus": "eventstatus"
"eventstatus": "body.warns.content"
}
]
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment