Commit 0d564a04 authored by litengwei's avatar litengwei

message消费者类修改

parent bbbe8331
......@@ -42,33 +42,33 @@ public class KafkaConsumerService {
@Value("classpath:/json/commonMessage.json")
private Resource commonMessage;
/**
* 批量消费kafka消息
* Kafka消息转emq
*
* @param consumerRecords messages
* @param ack ack
*/
@KafkaListener(id = "consumerSingle", groupId = "zhTestGroup", topics = "#{'${kafka.topics}'.split(',')}")
public void listen1(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
try {
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
JSONObject messageObj = JSONObject.fromObject(kafkaMessage.get());
if (messageObj.has(TOPIC)) {
emqKeeper.getMqttClient().publish(messageObj.optString(TOPIC), messageObj.getJSONObject(DATA).toString()
.getBytes(StandardCharsets.UTF_8), 0, false);
}
log.info("kafka消费zhTestGroup消息{}", messageObj);
}
}
} catch (Exception e) {
log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecords, e);
} finally {
ack.acknowledge();
}
}
// /**
// * 批量消费kafka消息
// * Kafka消息转emq
// *
// * @param consumerRecords messages
// * @param ack ack
// */
// @KafkaListener(id = "consumerSingle", groupId = "zhTestGroup", topics = "#{'${kafka.topics}'.split(',')}")
// public void listen1(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
// try {
// for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
// if (kafkaMessage.isPresent()) {
// JSONObject messageObj = JSONObject.fromObject(kafkaMessage.get());
// if (messageObj.has(TOPIC)) {
// emqKeeper.getMqttClient().publish(messageObj.optString(TOPIC), messageObj.getJSONObject(DATA).toString()
// .getBytes(StandardCharsets.UTF_8), 0, false);
// }
// log.info("kafka消费zhTestGroup消息{}", messageObj);
// }
// }
// } catch (Exception e) {
// log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecords, e);
// } finally {
// ack.acknowledge();
// }
// }
/**
* 批量消费kafka消息
......@@ -175,29 +175,29 @@ public class KafkaConsumerService {
}
///**
// * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
// *
// * @param message 消息
// */
//@KafkaListener(id = "consumerSingle", groupId = "zhTestGroup", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2")
//public void consumerSingle(String message, Acknowledgment ack) {
// JSONObject messageObj = JSONObject.fromObject(message);
// try {
// String topic = null;
// JSONObject data = null;
// if (messageObj.has("topic")) {
// topic = messageObj.getString("topic");
// data = messageObj.getJSONObject("data");
// emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
// ack.acknowledge();
// log.info("消息转发成功" + messageObj);
// }
// } catch (Exception e) {
// log.error("消息转发失败" + e.getMessage(), e);
// }
//}
//
/**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
*
* @param message 消息
*/
@KafkaListener(id = "consumerSingle", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2")
public void consumerSingle(String message, Acknowledgment ack) {
JSONObject messageObj = JSONObject.fromObject(message);
try {
String topic = null;
JSONObject data = null;
if (messageObj.has("topic")) {
topic = messageObj.getString("topic");
data = messageObj.getJSONObject("data");
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
log.info("消息转发成功" + messageObj);
}
} catch (Exception e) {
log.error("消息转发失败" + e.getMessage(), e);
}
}
///**
// * 省级消息转发
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment