Commit 5890a819 authored by KeYong's avatar KeYong

提交fakfa消息体通用模板

parent faa398ec
package com.yeejoin.amos.message.kafka; package com.yeejoin.amos.message.kafka;
import com.yeejoin.amos.message.utils.ClassToJsonUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject; import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
...@@ -10,6 +11,8 @@ import org.springframework.kafka.annotation.KafkaListener; ...@@ -10,6 +11,8 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import org.springframework.core.io.Resource;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
...@@ -28,10 +31,13 @@ public class KafkaConsumerService { ...@@ -28,10 +31,13 @@ public class KafkaConsumerService {
private static final String MQTT_TOPIC = "romaSite/data/transmit"; private static final String MQTT_TOPIC = "romaSite/data/transmit";
private static final String PROVINCE_MQTT_TOPIC = "province/data/transport"; private static final String PROVINCE_MQTT_TOPIC = "province/data/transport";
private static final String MQTT_TOPIC_EVENT_ALARM = "romaSite/data/eventAlarm";
@Autowired @Autowired
protected EmqKeeper emqKeeper; protected EmqKeeper emqKeeper;
@Value("${system.zxj}") @Value("${system.zxj}")
private boolean isZxj; private boolean isZxj;
@Value("classpath:/json/commonMessage.json")
private Resource commonMessage;
/** /**
* 批量消费kafka消息 * 批量消费kafka消息
...@@ -106,11 +112,13 @@ public class KafkaConsumerService { ...@@ -106,11 +112,13 @@ public class KafkaConsumerService {
try { try {
Optional<?> messages = Optional.ofNullable(record.value()); Optional<?> messages = Optional.ofNullable(record.value());
if (messages.isPresent()) { if (messages.isPresent()) {
JSONObject messageObj = JSONObject.fromObject(record.value()); // JSONObject messageObj = JSONObject.fromObject(record.value());
if (messageObj.getJSONObject(BODY).isEmpty()) { // if (messageObj.getJSONObject(BODY).isEmpty()) {
messageObj.put(DATA_TYPE, STATE); // messageObj.put(DATA_TYPE, STATE);
} // }
emqKeeper.getMqttClient().publish(MQTT_TOPIC, messageObj.toString().getBytes(StandardCharsets.UTF_8), 0, false); JSONObject object = JSONObject.fromObject(record.value());
String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC);
emqKeeper.getMqttClient().publish(MQTT_TOPIC, json.getBytes(StandardCharsets.UTF_8), 0, false);
} }
} catch (MqttException e) { } catch (MqttException e) {
log.error("换流站转发Kafka消息失败" + e.getMessage(), e); log.error("换流站转发Kafka消息失败" + e.getMessage(), e);
...@@ -119,6 +127,38 @@ public class KafkaConsumerService { ...@@ -119,6 +127,38 @@ public class KafkaConsumerService {
} }
} }
@KafkaListener(id = "kafkaConsumerEventAlarm", groupId = "kafkaConsumerGroupEventAlarm", topics = "#{'${queue.kafka.eventAlarm.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
public void kafkaConsumerEventAlarm(ConsumerRecord<?, String> record, Acknowledgment ack) {
Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
try {
// JSONObject messageObj = JSONObject.fromObject(record.value());
// JSONArray dataArray = messageObj.getJSONArray("data");
// JSONArray jsonArray = new JSONArray();
// String timestamp = "";
// for (Object obj : dataArray) {
// JSONObject finallyObj = new JSONObject();
// com.alibaba.fastjson.JSONObject detail = com.alibaba.fastjson.JSONObject.parseObject(com.alibaba.fastjson.JSONObject.toJSONString(obj));
// finallyObj.put("eventtextL1", detail.get("description"));
// finallyObj.put("pointId", detail.get("astId"));
// finallyObj.put("time", detail.get("dateTime"));
// jsonArray.add(finallyObj);
// timestamp = detail.get("dateTime").toString();
// }
// JSONObject jsonObjectMessage = new JSONObject();
// jsonObjectMessage.put("warns", jsonArray);
// jsonObjectMessage.put("timestamp", timestamp);
JSONObject object = JSONObject.fromObject(record.value());
String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
} catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage());
}
}
}
///** ///**
// * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} // * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
......
package com.yeejoin.amos.message.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.springframework.core.io.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author keyong
* @title: ClassToJsonUtil
* <pre>
* @description: TODO
* </pre>
* @date 2024/4/11 11:28
*/
public class ClassToJsonUtil {
public static String class2json(Object obj, Resource commonMessage, String topic) {
String json;
try {
json = IOUtils.toString(commonMessage.getInputStream(), String.valueOf(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new RuntimeException("获取kafka信息模板失败!");
}
List<Map> listModel = JSONObject.parseArray(json, Map.class);
if (0 < listModel.size()) {
List<Map> mapList = listModel.stream().filter(x -> String.valueOf(x.get("topic")).equalsIgnoreCase(topic)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(mapList)) {
Map<String, Object> map = mapList.get(0);
Map<String, Object> entityObj = JSONObject.parseObject(String.valueOf(map.get("data")), Map.class);
Map<String, Object> sourceMap = JSONObject.parseObject(String.valueOf(obj), Map.class);
for (Map.Entry<String, Object> entry : entityObj.entrySet()) {
String fieldName = entry.getKey();
String fieldValue = String.valueOf(entry.getValue());
// 数据表body里面与外面有相同的key值时,若配置了" _body "标识则取body里面的key值,否则直接取body外面的值
Map<String, Object> bodyMap = JSONObject.parseObject(String.valueOf(sourceMap.get("body")), Map.class);
if (-1 < fieldValue.lastIndexOf("_body")) {
entry.setValue(bodyMap.get(fieldName));
} else {
if (sourceMap.containsKey(fieldValue)) {
entry.setValue(sourceMap.get(fieldValue));
} else if (bodyMap.containsKey(fieldValue)) {
entry.setValue(bodyMap.get(fieldValue));
} else {
entry.setValue("");
}
}
}
return JSON.toJSONString(entityObj);
}
}
return null;
}
}
[
{
"topic": "romaSite/data/transmit",
"data": {
"dataType": "dataType",
"value": "value",
"timeStamp": "time_stamp",
"quality": "quality",
"scadaId": "id",
"key": "key",
"disCreate": "disCreate",
"name": "name"
}
},
{
"topic": "romaSite/data/eventAlarm",
"data": {
"timeStamp": "timeStamp",
"warns": [
{
"eventTextL1": "eventTextL1",
"pointId": "pointId",
"time": "time",
"deviceId": "deviceId",
"eventstatus": "eventstatus"
}
]
}
}
]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment