Commit f6ffb190 authored by litengwei's avatar litengwei

消息组件优化

parent 5034ddf6
...@@ -43,6 +43,9 @@ public class SignServiceImpl extends BaseService<SignDto,Sign,SignMapper> implem ...@@ -43,6 +43,9 @@ public class SignServiceImpl extends BaseService<SignDto,Sign,SignMapper> implem
@Value("${mqtt.topic.person.sign}") @Value("${mqtt.topic.person.sign}")
private String personSign; private String personSign;
@Value("${mqtt.topic.person.sign.zxj}")
private String personSignZxj;
/** /**
* 分页查询 * 分页查询
*/ */
...@@ -138,11 +141,28 @@ public class SignServiceImpl extends BaseService<SignDto,Sign,SignMapper> implem ...@@ -138,11 +141,28 @@ public class SignServiceImpl extends BaseService<SignDto,Sign,SignMapper> implem
SerializerFeature.WriteMapNullValue); SerializerFeature.WriteMapNullValue);
try { try {
emqKeeper.getMqttClient().publish(personSign, json.getBytes(), RuleConfig.DEFAULT_QOS, false); emqKeeper.getMqttClient().publish(personSign, json.getBytes(), RuleConfig.DEFAULT_QOS, false);
map.put("personOfDay", ObjectUtils.isEmpty(sign.getPersonOfDay())?"":sign.getPersonOfDay());
map.put("type", ObjectUtils.isEmpty(sign.getType())?"":sign.getType());
map.put("photos", ObjectUtils.isEmpty(sign.getPhotos())?"":sign.getPhotos());
map.put("remarks", ObjectUtils.isEmpty(sign.getRemarks())?"":sign.getRemarks());
map.put("date", ObjectUtils.isEmpty(sign.getDate())?"":sign.getDate());
map.put("recDate", ObjectUtils.isEmpty(sign.getRecDate())?"":sign.getRecDate());
map.put("isDelete", ObjectUtils.isEmpty(sign.getIsDelete())?"":sign.getIsDelete());
map.put("source", ObjectUtils.isEmpty(sign.getSource())?"":sign.getSource());
String json1=JSONObject.toJSONString(map, SerializerFeature.PrettyFormat,
SerializerFeature.WriteMapNullValue);
// 发送emq消息转kafka // 发送emq消息转kafka
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", personSign); jsonObject.put("topic", personSign);
jsonObject.put("data",json); jsonObject.put("data",json);
emqKeeper.getMqttClient().publish("emq.sign.created",jsonObject.toString().getBytes(),1,false); emqKeeper.getMqttClient().publish("emq.sign.created",jsonObject.toString().getBytes(),1,false);
// 发送emq消息转kafka 同步业务库打卡表
JSONObject jsonObject1 = new JSONObject();
jsonObject1.put("topic", personSignZxj);
jsonObject1.put("data",json1);
emqKeeper.getMqttClient().publish("emq.bussSign.created",jsonObject1.toString().getBytes(),1,false);
} catch (MqttException e) { } catch (MqttException e) {
log.info(String.format("发送eqm打卡消息失败:%s", e.getMessage())); log.info(String.format("发送eqm打卡消息失败:%s", e.getMessage()));
} }
......
...@@ -34,7 +34,7 @@ public class KafkaConsumerService { ...@@ -34,7 +34,7 @@ public class KafkaConsumerService {
String topic = messageObj.getString("topic"); String topic = messageObj.getString("topic");
JSONObject data = messageObj.getJSONObject("data"); JSONObject data = messageObj.getJSONObject("data");
try { try {
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes("UTF-8"), 1,false); emqKeeper.getMqttClient().publish(topic, data.toString().getBytes("UTF-8"), 0,false);
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace(); e.printStackTrace();
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
......
...@@ -23,7 +23,7 @@ spring.redis.password=1234560 ...@@ -23,7 +23,7 @@ spring.redis.password=1234560
#需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 #需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置
kafka.topics=null.topic kafka.topics=null.topic
kafka.init.topics=akka.iot.created,akka.patrol.created kafka.init.topics=akka.iot.created,akka.patrol.created,akka.sign.created,akka.bussSign.created,akka.user.created
#需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 #需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置
emq.topic=eqm.iot.created,eqm.patrol.created emq.topic=emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created
\ No newline at end of file \ No newline at end of file
...@@ -13,5 +13,15 @@ ...@@ -13,5 +13,15 @@
"code": "sign", "code": "sign",
"emqTopic": "emq.sign.created", "emqTopic": "emq.sign.created",
"akkaTopic": "akka.sign.created" "akkaTopic": "akka.sign.created"
},
{
"code": "bussSign",
"emqTopic": "emq.bussSign.created",
"akkaTopic": "akka.bussSign.created"
},
{
"code": "user",
"emqTopic": "emq.user.created",
"akkaTopic": "akka.user.created"
} }
] ]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment