Commit 6869ab04 authored by litengwei's avatar litengwei

消息组件优化

parent 0b2b7c02
...@@ -279,9 +279,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -279,9 +279,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
jsonObject.put("data",message); jsonObject.put("data",message);
try { try {
emqKeeper.getMqttClient().publish("eqm.iot.created",jsonObject.toString().getBytes(),1,false); emqKeeper.getMqttClient().publish("emq.iot.created",jsonObject.toString().getBytes(),1,false);
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace(); log.info(String.format("发送eqm转kafka消息失败:%s", e.getMessage()));
} }
if (!StringUtils.isEmpty(traceId)) { if (!StringUtils.isEmpty(traceId)) {
......
...@@ -136,6 +136,11 @@ public class SignServiceImpl extends BaseService<SignDto,Sign,SignMapper> implem ...@@ -136,6 +136,11 @@ public class SignServiceImpl extends BaseService<SignDto,Sign,SignMapper> implem
SerializerFeature.WriteMapNullValue); SerializerFeature.WriteMapNullValue);
try { try {
emqKeeper.getMqttClient().publish(personSign, json.getBytes(), RuleConfig.DEFAULT_QOS, false); emqKeeper.getMqttClient().publish(personSign, json.getBytes(), RuleConfig.DEFAULT_QOS, false);
// 发送emq消息转kafka
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", personSign);
jsonObject.put("data",json);
emqKeeper.getMqttClient().publish("emq.sign.created",jsonObject.toString().getBytes(),1,false);
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace(); e.printStackTrace();
} }
......
...@@ -22,5 +22,8 @@ spring.redis.port=6379 ...@@ -22,5 +22,8 @@ spring.redis.port=6379
spring.redis.password=1234560 spring.redis.password=1234560
#需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 #需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置
topics=akka.iot.created,akka.patrol.created kafka.topics=null.topic
init.topics=akka.iot.created,akka.patrol.created kafka.init.topics=akka.iot.created,akka.patrol.created
\ No newline at end of file
#需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置
emq.topic=eqm.iot.created,eqm.patrol.created
\ No newline at end of file
...@@ -6,7 +6,7 @@ spring.jackson.time-zone=GMT+8 ...@@ -6,7 +6,7 @@ spring.jackson.time-zone=GMT+8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.serialization.write-dates-as-timestamps=true spring.jackson.serialization.write-dates-as-timestamps=true
# kafka集群信息 # kafka集群信息
spring.kafka.bootstrap-servers=127.0.0.1:9092 spring.kafka.bootstrap-servers=172.16.3.100:9092
# 生产者配置 # 生产者配置
# 设置大于0的值,则客户端会将发送失败的记录重新发送 # 重试次数 # 设置大于0的值,则客户端会将发送失败的记录重新发送 # 重试次数
spring.kafka.producer.retries=3 spring.kafka.producer.retries=3
......
[ [
{ {
"code": "iot", "code": "iot",
"emqTopic": "eqm.iot.created", "emqTopic": "emq.iot.created",
"akkaTopic": "akka.iot.created" "akkaTopic": "akka.iot.created"
}, },
{ {
"code": "patrol", "code": "patrol",
"emqTopic": "eqm.patrol.created", "emqTopic": "emq.patrol.created",
"akkaTopic": "akka.patrol.created" "akkaTopic": "akka.patrol.created"
},
{
"code": "sign",
"emqTopic": "emq.sign.created",
"akkaTopic": "akka.sign.created"
} }
] ]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment