Commit 5560474f authored by 刘林's avatar 刘林

fix(message): 对接韶山换流站kafka数据

parent 6e20d9bc
...@@ -9,8 +9,6 @@ import org.springframework.kafka.annotation.KafkaListener; ...@@ -9,8 +9,6 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Optional; import java.util.Optional;
...@@ -23,34 +21,36 @@ import java.util.Optional; ...@@ -23,34 +21,36 @@ import java.util.Optional;
@Slf4j @Slf4j
@Service @Service
public class KafkaConsumerService { public class KafkaConsumerService {
private static final String MQTT_TOPIC = "romaSite/data/transmit";
private static final String MQTT_TOPIC_SHAOSHAN = "romaSite/data/shaoshan";
@Autowired @Autowired
protected EmqKeeper emqKeeper; protected EmqKeeper emqKeeper;
private static final String MQTT_TOPIC = "romaSite/data/transmit";
/** /**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
*
* @param message 消息 * @param message 消息
*
*/ */
@KafkaListener(id = "consumerSingle", idIsGroup = false, topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2") @KafkaListener(id = "consumerSingle", idIsGroup = false, topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2")
public void consumerSingle(String message,Acknowledgment ack) { public void consumerSingle(String message, Acknowledgment ack) {
JSONObject messageObj = JSONObject.fromObject(message); JSONObject messageObj = JSONObject.fromObject(message);
try { try {
String topic = messageObj.getString("topic"); String topic = messageObj.getString("topic");
JSONObject data = messageObj.getJSONObject("data"); JSONObject data = messageObj.getJSONObject("data");
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes("UTF-8"), 0,false); emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace(); log.error("解析数据失败,{}", e.getMessage());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (Exception e) {
// log.info("单条消息 ====> message: {}", message);
} }
// log.info("单条消息 ====> message: {}", message);
ack.acknowledge();
} }
/**
* 绍兴,苏州换流站对接Kafka数据
* @param record record
* @param ack ack
*/
@KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "#{'${queue.kafka.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory") @KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "#{'${queue.kafka.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
public void kafkaListener(ConsumerRecord<?, String> record, Acknowledgment ack) { public void kafkaListener(ConsumerRecord<?, String> record, Acknowledgment ack) {
Optional<?> messages = Optional.ofNullable(record.value()); Optional<?> messages = Optional.ofNullable(record.value());
...@@ -58,17 +58,37 @@ public class KafkaConsumerService { ...@@ -58,17 +58,37 @@ public class KafkaConsumerService {
try { try {
JSONObject messageObj = JSONObject.fromObject(record.value()); JSONObject messageObj = JSONObject.fromObject(record.value());
JSONObject data = messageObj.getJSONObject("body"); JSONObject data = messageObj.getJSONObject("body");
if (data.size() == 0){ if (data.isEmpty()) {
data = messageObj; data = messageObj;
data.put("datatype","state"); data.put("datatype", "state");
} }
log.info("接收到Roma消息对象: {}", data); log.info("接收到Roma消息对象: {}", data);
emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false); emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
} catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage());
}
}
}
/**
* 韶山换流对接Kafka
* @param record record
* @param ack ack
*/
@KafkaListener(id = "kafkaConsumer", groupId = "kafkaConsumerGroup", topics = "#{'${queue.kafka.shaoshan.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
public void kafkaConsumer(ConsumerRecord<?, String> record, Acknowledgment ack) {
Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
try {
JSONObject messageObj = JSONObject.fromObject(record.value());
JSONObject data = messageObj.getJSONObject("body");
emqKeeper.getMqttClient().publish(MQTT_TOPIC_SHAOSHAN, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace(); log.error("解析数据失败,{}", e.getMessage());
} }
} }
ack.acknowledge();
} }
...@@ -115,6 +135,4 @@ public class KafkaConsumerService { ...@@ -115,6 +135,4 @@ public class KafkaConsumerService {
// //手动提交offset // //手动提交offset
// ack.acknowledge(); // ack.acknowledge();
// } // }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment