Commit 4407faea authored by KeYong's avatar KeYong

处理冲突

parent 5a11c635
...@@ -28,7 +28,6 @@ public class KafkaConsumerService { ...@@ -28,7 +28,6 @@ public class KafkaConsumerService {
private static final String MQTT_TOPIC = "romaSite/data/transmit"; private static final String MQTT_TOPIC = "romaSite/data/transmit";
private static final String MQTT_TOPIC_SHAOSHAN = "romaSite/data/shaoshan"; private static final String MQTT_TOPIC_SHAOSHAN = "romaSite/data/shaoshan";
private static final String MQTT_TOPIC = "romaSite/data/transmit";
private static final String PROVINCE_MQTT_TOPIC = "province/data/transport"; private static final String PROVINCE_MQTT_TOPIC = "province/data/transport";
@Autowired @Autowired
protected EmqKeeper emqKeeper; protected EmqKeeper emqKeeper;
...@@ -95,6 +94,8 @@ public class KafkaConsumerService { ...@@ -95,6 +94,8 @@ public class KafkaConsumerService {
} finally { } finally {
ack.acknowledge(); ack.acknowledge();
} }
}
/** /**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
* *
...@@ -123,27 +124,20 @@ public class KafkaConsumerService { ...@@ -123,27 +124,20 @@ public class KafkaConsumerService {
*/ */
@KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "#{'${queue.kafka.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory") @KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "#{'${queue.kafka.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
public void kafkaListener(ConsumerRecord<?, String> record, Acknowledgment ack) { public void kafkaListener(ConsumerRecord<?, String> record, Acknowledgment ack) {
try {
Optional<?> messages = Optional.ofNullable(record.value()); Optional<?> messages = Optional.ofNullable(record.value());
if (messages.isPresent()) { if (messages.isPresent()) {
JSONObject messageObj = JSONObject.fromObject(record.value()); try {
if (messageObj.getJSONObject(BODY).isEmpty()) { JSONObject messageObj = JSONObject.fromObject(record.value());
messageObj.put(DATA_TYPE, STATE); if (messageObj.getJSONObject(BODY).isEmpty()) {
} messageObj.put(DATA_TYPE, STATE);
emqKeeper.getMqttClient().publish(MQTT_TOPIC, messageObj.toString().getBytes(StandardCharsets.UTF_8), 0, false); }
emqKeeper.getMqttClient().publish(MQTT_TOPIC, messageObj.toString().getBytes(StandardCharsets.UTF_8), 0, false);
JSONObject data = messageObj.getJSONObject("body"); } catch (MqttException e) {
if (data.isEmpty()) { log.error("解析数据失败,{}", e.getMessage());
data = messageObj; } finally {
data.put("datatype", "state"); ack.acknowledge();
} }
log.info("接收到Roma消息对象: {}", data);
emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
} catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage());
} }
}
} }
/** /**
...@@ -163,10 +157,6 @@ public class KafkaConsumerService { ...@@ -163,10 +157,6 @@ public class KafkaConsumerService {
} catch (MqttException e) { } catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage()); log.error("解析数据失败,{}", e.getMessage());
} }
} catch (MqttException e) {
log.error("换流站转发Kafka消息失败" + e.getMessage(), e);
} finally {
ack.acknowledge();
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment