Commit 5538a8ef authored by 刘林's avatar 刘林

fix(message):修改message转发kafka消息到emq为批量消息

parent 502e67e9
...@@ -7,4 +7,12 @@ package com.yeejoin.amos.message.kafka; ...@@ -7,4 +7,12 @@ package com.yeejoin.amos.message.kafka;
public interface Constant { public interface Constant {
String INSERT = "INSERT"; String INSERT = "INSERT";
String UPDATE = "UPDATE"; String UPDATE = "UPDATE";
String DATA = "data";
String TOPIC = "topic";
String TABLE = "table";
String TYPE = "type";
String DB_TYPE = "dbType";
String BODY = "body";
String DATA_TYPE = "datatype";
String STATE = "state";
} }
package com.yeejoin.amos.message.kafka; package com.yeejoin.amos.message.kafka;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject; import net.sf.json.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -14,7 +12,9 @@ import org.springframework.stereotype.Service; ...@@ -14,7 +12,9 @@ import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import static com.yeejoin.amos.message.kafka.Constant.*;
/** /**
* kafka 消费服务 * kafka 消费服务
...@@ -28,89 +28,155 @@ public class KafkaConsumerService { ...@@ -28,89 +28,155 @@ public class KafkaConsumerService {
private static final String MQTT_TOPIC = "romaSite/data/transmit"; private static final String MQTT_TOPIC = "romaSite/data/transmit";
private static final String PROVINCE_MQTT_TOPIC = "province/data/transport"; private static final String PROVINCE_MQTT_TOPIC = "province/data/transport";
@Value("${system.zxj}")
private boolean isZxj;
@Autowired @Autowired
protected EmqKeeper emqKeeper; protected EmqKeeper emqKeeper;
@Value("${system.zxj}")
private boolean isZxj;
/** /**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 批量消费kafka消息
* Kafka消息转emq
* *
* @param message 消息 * @param consumerRecords messages
* @param ack ack
*/ */
@KafkaListener(id = "consumerSingle", groupId = "zhTestGroup", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2") @KafkaListener(id = "consumerSingle", groupId = "zhTestGroup", topics = "#{'${kafka.topics}'.split(',')}")
public void consumerSingle(String message, Acknowledgment ack) { public void listen1(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
JSONObject messageObj = JSONObject.fromObject(message);
try { try {
String topic = null; for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
JSONObject data = null; Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (messageObj.has("topic")) { if (kafkaMessage.isPresent()) {
topic = messageObj.getString("topic"); JSONObject messageObj = JSONObject.fromObject(kafkaMessage.get());
data = messageObj.getJSONObject("data"); if (messageObj.has(TOPIC)) {
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false); emqKeeper.getMqttClient().publish(messageObj.optString(TOPIC), messageObj.getJSONObject(DATA).toString()
ack.acknowledge(); .getBytes(StandardCharsets.UTF_8), 0, false);
log.info("消息转发成功" + messageObj.toString()); }
log.info("kafka消费zhTestGroup消息{}", messageObj);
}
} }
} catch (Exception e) { } catch (Exception e) {
log.error("消息转发失败" + e.getMessage(), e); log.error("kafka失败,当前失败的批次: data:{}", consumerRecords);
} finally {
ack.acknowledge();
} }
} }
@KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "#{'${queue.kafka.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory") /**
public void kafkaListener(ConsumerRecord<?, String> record, Acknowledgment ack) { * 批量消费kafka消息
Optional<?> messages = Optional.ofNullable(record.value()); * 监听数据表变化kafka数据转发emq
if (messages.isPresent()) { *
* @param consumerRecords messages
* @param ack ack
*/
@KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.topics}'.split(',')}")
public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
try { try {
JSONObject messageObj = JSONObject.fromObject(record.value()); if (isZxj) {
JSONObject data = messageObj.getJSONObject("body"); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
if (data.isEmpty()) { Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
data = messageObj; if (kafkaMessage.isPresent()) {
data.put("datatype", "state"); JSONObject messageObj = JSONObject.fromObject(kafkaMessage.get());
String type = messageObj.optString(TYPE);
String table = messageObj.optString(TABLE);
if (Arrays.asList(INSERT, UPDATE).contains(type) && DBTableTypeEnum.have(table) != null) {
JSONObject data = (JSONObject) messageObj.getJSONArray(DATA).get(0);
data.put(DB_TYPE, type);
data.put(TABLE, table);
emqKeeper.getMqttClient().publish(PROVINCE_MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
log.info("kafka消费province消息{}", messageObj);
}
}
} }
emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
} catch (MqttException e) {
log.error("消息转发失败" + e.getMessage(), e);
} }
} catch (Exception e) {
log.error("kafka失败,当前失败的批次: data:{}", consumerRecords);
} finally {
ack.acknowledge(); ack.acknowledge();
} }
} }
/** /**
* 省级消息转发 * 转发苏州,绍兴换流站Kafka数据对emq
* *
* @param message 省级消息 * @param record record
* @param ack ack * @param ack ack
*/ */
@KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2") @KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "#{'${queue.kafka.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
public void consumerSingle1(String message, Acknowledgment ack) { public void kafkaListener(ConsumerRecord<?, String> record, Acknowledgment ack) {
log.info("省级消息转发打印" + message);
if(isZxj) {
Optional<?> messages = Optional.ofNullable(message);
if (messages.isPresent()) {
try { try {
JSONObject jsonObject = JSONObject.fromObject(message); Optional<?> messages = Optional.ofNullable(record.value());
String type = jsonObject.optString("type"); if (messages.isPresent()) {
String table = jsonObject.optString("table"); JSONObject messageObj = JSONObject.fromObject(record.value());
if (Arrays.asList(Constant.INSERT, Constant.UPDATE).contains(type) && DBTableTypeEnum.have(table) != null) { if (messageObj.getJSONObject(BODY).isEmpty()) {
if (Arrays.asList("INSERT", "UPDATE").contains(type)) { messageObj.put(DATA_TYPE, STATE);
JSONArray array = jsonObject.getJSONArray("data");
JSONObject data = (JSONObject)array.get(0);
data.put("dbType", type);
data.put("table", table);
emqKeeper.getMqttClient().publish(PROVINCE_MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
log.info("省级消息: {}", data);
} }
emqKeeper.getMqttClient().publish(MQTT_TOPIC, messageObj.toString().getBytes(StandardCharsets.UTF_8), 0, false);
} }
} catch (MqttException e) { } catch (MqttException e) {
log.error("消息转发失败" + e.getMessage(), e); log.error("换流站转发Kafka消息失败" + e.getMessage(), e);
} } finally {
ack.acknowledge(); ack.acknowledge();
} }
} }
}
///**
// * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
// *
// * @param message 消息
// */
//@KafkaListener(id = "consumerSingle", groupId = "zhTestGroup", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2")
//public void consumerSingle(String message, Acknowledgment ack) {
// JSONObject messageObj = JSONObject.fromObject(message);
// try {
// String topic = null;
// JSONObject data = null;
// if (messageObj.has("topic")) {
// topic = messageObj.getString("topic");
// data = messageObj.getJSONObject("data");
// emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
// ack.acknowledge();
// log.info("消息转发成功" + messageObj);
// }
// } catch (Exception e) {
// log.error("消息转发失败" + e.getMessage(), e);
// }
//}
//
///**
// * 省级消息转发
// *
// * @param message 省级消息
// * @param ack ack
// */
//@KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2")
//public void consumerSingle1(String message, Acknowledgment ack) {
// log.info("省级消息转发打印" + message);
// if(isZxj) {
// Optional<?> messages = Optional.ofNullable(message);
// if (messages.isPresent()) {
// try {
// JSONObject jsonObject = JSONObject.fromObject(message);
// String type = jsonObject.optString("type");
// String table = jsonObject.optString("table");
// if (Arrays.asList(Constant.INSERT, Constant.UPDATE).contains(type) && DBTableTypeEnum.have(table) != null) {
// if (Arrays.asList("INSERT", "UPDATE").contains(type)) {
// JSONArray array = jsonObject.getJSONArray("data");
// JSONObject data = (JSONObject)array.get(0);
// data.put("dbType", type);
// data.put("table", table);
// emqKeeper.getMqttClient().publish(PROVINCE_MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
// log.info("省级消息: {}", data);
// }
// }
// } catch (MqttException e) {
// log.error("消息转发失败" + e.getMessage(), e);
// }
// ack.acknowledge();
// }
// }
//}
/* @KafkaListener(id = "consumerBatch", topicPartitions = { /* @KafkaListener(id = "consumerBatch", topicPartitions = {
...@@ -147,15 +213,15 @@ public class KafkaConsumerService { ...@@ -147,15 +213,15 @@ public class KafkaConsumerService {
// //
// //
// //
// //kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup" //kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup"
// @KafkaListener(topics = "test", groupId = "zhTestGroup") //@KafkaListener(topics = "test", groupId = "zhTestGroup")
// public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { //public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
// String value = record.value(); // String value = record.value();
// System.out.println(value); // System.out.println(value);
// System.out.println(record); // System.out.println(record);
// //手动提交offset // //手动提交offset
// ack.acknowledge(); // ack.acknowledge();
// } //}
} }
...@@ -67,7 +67,9 @@ spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.S ...@@ -67,7 +67,9 @@ spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.S
# \u624B\u52A8\u8C03\u7528Acknowledgment.acknowledge()\u540E\u7ACB\u5373\u63D0\u4EA4\uFF0C\u4E00\u822C\u4F7F\u7528\u8FD9\u79CD # \u624B\u52A8\u8C03\u7528Acknowledgment.acknowledge()\u540E\u7ACB\u5373\u63D0\u4EA4\uFF0C\u4E00\u822C\u4F7F\u7528\u8FD9\u79CD
# MANUAL_IMMEDIATE # MANUAL_IMMEDIATE
spring.kafka.listener.ack-mode=manual_immediate spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.type=batch
spring.kafka.consumer.max-poll-records=1000
spring.kafka.consumer.fetch-max-wait= 1000
management.health.redis.enabled=false management.health.redis.enabled=false
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment