Commit 0e78acd4 authored by 张森's avatar 张森

kafka消息解析后发送mq消息格式错误 bug修改

parent d9444816
......@@ -67,7 +67,7 @@ public class KafkaConsumerService {
try {
JSONObject object = JSONObject.fromObject(record.value());
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj.getJSONObject("data")).getBytes("UTF-8"), 0, false);
log.info("接收到Roma消息对象: {}", object);
ack.acknowledge();
} catch (MqttException e) {
......
[
{
"kafkaTopic": "k1",
"kafkaTopic": "T_DC_MQ_REALDATA__guanggu",
"mqTopic": "romaSite/data/transmit",
"data": {
"dataType": "condition.station_psr_id",
"value": "condition.station_psr_id",
"timeStamp": "condition.station_psr_id",
"quality": "condition.station_psr_id",
"scadaId": "condition.station_psr_id",
"key": "condition.station_psr_id",
"disCreate": "condition.station_psr_id",
"name": "condition.station_psr_id"
"dataType": "body.datatype",
"value": "body.value",
"timeStamp": "body.time_stamp",
"quality": "body.quality",
"scadaId": "body.key",
"key": "body.key",
"disCreate": "body.station_psr_id",
"name": "body.name"
}
},
{
"kafkaTopic": "k2",
"kafkaTopic": "T_DC_MQ_ALARM__guanggu",
"mqTopic": "romaSite/data/eventAlarm",
"data": {
"timeStamp": "body.warns.time",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment