Commit dd41da46 authored by 刘林's avatar 刘林

fix(message):添加省级消息转发

parent a32d0d83
package com.yeejoin.amos.message.kafka; package com.yeejoin.amos.message.kafka;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject; import net.sf.json.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -9,9 +11,8 @@ import org.springframework.kafka.annotation.KafkaListener; ...@@ -9,9 +11,8 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Optional; import java.util.Optional;
/** /**
...@@ -24,30 +25,27 @@ import java.util.Optional; ...@@ -24,30 +25,27 @@ import java.util.Optional;
@Service @Service
public class KafkaConsumerService { public class KafkaConsumerService {
private static final String MQTT_TOPIC = "romaSite/data/transmit";
private static final String PROVINCE_MQTT_TOPIC = "province/data/transport";
@Autowired @Autowired
protected EmqKeeper emqKeeper; protected EmqKeeper emqKeeper;
private static final String MQTT_TOPIC = "romaSite/data/transmit";
/** /**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
*
* @param message 消息 * @param message 消息
*/ */
@KafkaListener(id = "consumerSingle", idIsGroup = false, topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2") @KafkaListener(id = "consumerSingle", idIsGroup = false, topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2")
public void consumerSingle(String message,Acknowledgment ack) { public void consumerSingle(String message, Acknowledgment ack) {
JSONObject messageObj = JSONObject.fromObject(message); JSONObject messageObj = JSONObject.fromObject(message);
try { try {
String topic = messageObj.getString("topic"); String topic = messageObj.getString("topic");
JSONObject data = messageObj.getJSONObject("data"); JSONObject data = messageObj.getJSONObject("data");
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes("UTF-8"), 0,false); emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
} catch (MqttException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (Exception e) { } catch (Exception e) {
// log.info("单条消息 ====> message: {}", message); log.error("消息转发失败" + e.getMessage(), e);
} }
// log.info("单条消息 ====> message: {}", message);
ack.acknowledge(); ack.acknowledge();
} }
...@@ -58,17 +56,46 @@ public class KafkaConsumerService { ...@@ -58,17 +56,46 @@ public class KafkaConsumerService {
try { try {
JSONObject messageObj = JSONObject.fromObject(record.value()); JSONObject messageObj = JSONObject.fromObject(record.value());
JSONObject data = messageObj.getJSONObject("body"); JSONObject data = messageObj.getJSONObject("body");
if (data.size() == 0){ if (data.isEmpty()) {
data = messageObj; data = messageObj;
data.put("datatype","state"); data.put("datatype", "state");
} }
log.info("接收到Roma消息对象: {}", data); //log.info("接收到Roma消息对象: {}", data);
emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false); emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace(); log.error("消息转发失败" + e.getMessage(), e);
ack.acknowledge();
}
}
}
/**
* 省级消息转发
* @param message 省级消息
* @param ack ack
*/
@KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2")
public void consumerSingle1(String message, Acknowledgment ack) {
Optional<?> messages = Optional.ofNullable(message);
if (messages.isPresent()) {
try {
JSONObject jsonObject = JSONObject.fromObject(message);
String type = jsonObject.optString("type");
String table = jsonObject.optString("table");
if (StringUtils.isNoneEmpty(type, table)) {
if (Arrays.asList("INSERT", "UPDATE").contains(type)) {
JSONArray array = jsonObject.getJSONArray("data");
JSONObject data = (JSONObject)array.get(0);
data.put("type", type);
data.put("table", table);
emqKeeper.getMqttClient().publish(PROVINCE_MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
}
}
} catch (MqttException e) {
log.error("消息转发失败" + e.getMessage(), e);
ack.acknowledge();
} }
} }
ack.acknowledge();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment