Commit 302e26b8 authored by litengwei's avatar litengwei

省测卡片问题

parent 2274f6e7
...@@ -41,7 +41,7 @@ public class KafkaConsumerService { ...@@ -41,7 +41,7 @@ public class KafkaConsumerService {
* *
* @param message 消息 * @param message 消息
*/ */
@KafkaListener(id = "consumerSingle", idIsGroup = false, topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2") @KafkaListener(id = "consumerSingle", groupId = "zhTestGroup", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2")
public void consumerSingle(String message, Acknowledgment ack) { public void consumerSingle(String message, Acknowledgment ack) {
JSONObject messageObj = JSONObject.fromObject(message); JSONObject messageObj = JSONObject.fromObject(message);
try { try {
...@@ -51,11 +51,12 @@ public class KafkaConsumerService { ...@@ -51,11 +51,12 @@ public class KafkaConsumerService {
topic = messageObj.getString("topic"); topic = messageObj.getString("topic");
data = messageObj.getJSONObject("data"); data = messageObj.getJSONObject("data");
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false); emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
log.info("消息转发成功" + messageObj.toString());
} }
} catch (Exception e) { } catch (Exception e) {
log.error("消息转发失败" + e.getMessage(), e); log.error("消息转发失败" + e.getMessage(), e);
} }
ack.acknowledge();
} }
@KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "#{'${queue.kafka.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory") @KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "#{'${queue.kafka.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment