Commit 166b2fa0 authored by litengwei's avatar litengwei

messgae修改

parent 1a6bb8a2
package com.yeejoin.amos.message.kafka;
import com.alibaba.excel.metadata.Sheet;
import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.message.utils.ClassToJsonUtil;
import lombok.extern.slf4j.Slf4j;
......@@ -162,7 +163,7 @@ public class KafkaConsumerService {
JSONObject object = JSONObject.fromObject(record.value());
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 2, false);
ack.acknowledge();
} catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage());
......@@ -258,7 +259,7 @@ public class KafkaConsumerService {
if (messageObj.has("topic")) {
topic = messageObj.getString("topic");
data = messageObj.getJSONObject("data");
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 2, false);
ack.acknowledge();
log.info("消息转发成功" + messageObj);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment