Commit a5a61190 authored by litengwei's avatar litengwei

巡检bug

parent 66cc3427
...@@ -28,17 +28,19 @@ public class KafkaConsumerService { ...@@ -28,17 +28,19 @@ public class KafkaConsumerService {
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
* @param message 消息 * @param message 消息
*/ */
@KafkaListener(id = "consumerSingle", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2") @KafkaListener(id = "consumerSingle", idIsGroup = false, topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2")
public void consumerSingle(String message,Acknowledgment ack) { public void consumerSingle(String message,Acknowledgment ack) {
JSONObject messageObj = JSONObject.fromObject(message); JSONObject messageObj = JSONObject.fromObject(message);
String topic = messageObj.getString("topic");
JSONObject data = messageObj.getJSONObject("data");
try { try {
String topic = messageObj.getString("topic");
JSONObject data = messageObj.getJSONObject("data");
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes("UTF-8"), 0,false); emqKeeper.getMqttClient().publish(topic, data.toString().getBytes("UTF-8"), 0,false);
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace(); e.printStackTrace();
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
e.printStackTrace(); e.printStackTrace();
} catch (Exception e) {
log.info("单条消息 ====> message: {}", message);
} }
log.info("单条消息 ====> message: {}", message); log.info("单条消息 ====> message: {}", message);
ack.acknowledge(); ack.acknowledge();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment