Commit cc41ebf1 authored by litengwei's avatar litengwei

emq连接修改

parent bad77190
...@@ -62,6 +62,6 @@ public class ApplicationRunnerImpl implements ApplicationRunner { ...@@ -62,6 +62,6 @@ public class ApplicationRunnerImpl implements ApplicationRunner {
iSourceStatistics.initAllCategoryStatisticsData(SourceTypeEnum.IOT); iSourceStatistics.initAllCategoryStatisticsData(SourceTypeEnum.IOT);
maintenanceResourceDataService.subscribeTopic(); maintenanceResourceDataService.subscribeTopic();
emqKeeper.getMqttClient().subscribe(ConfigPageTopicEnum.INTEGRATE.getTopic(), 2, integratePageDataListener); emqKeeper.subscript(ConfigPageTopicEnum.INTEGRATE.getTopic(), 2, integratePageDataListener);
} }
} }
...@@ -6,6 +6,7 @@ import com.yeejoin.amos.boot.module.jcs.api.dto.AlertNewsDto; ...@@ -6,6 +6,7 @@ import com.yeejoin.amos.boot.module.jcs.api.dto.AlertNewsDto;
import com.yeejoin.amos.component.rule.config.ClazzUtils; import com.yeejoin.amos.component.rule.config.ClazzUtils;
import com.yeejoin.amos.component.rule.config.RuleConfig; import com.yeejoin.amos.component.rule.config.RuleConfig;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -15,6 +16,7 @@ import org.springframework.boot.ApplicationRunner; ...@@ -15,6 +16,7 @@ import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener;
import org.typroject.tyboot.core.foundation.utils.ValidationUtil; import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import java.util.List; import java.util.List;
...@@ -43,22 +45,25 @@ public class StartLoader implements ApplicationRunner { ...@@ -43,22 +45,25 @@ public class StartLoader implements ApplicationRunner {
public void loadSysParams(){ public void loadSysParams(){
try { try {
emqKeeper.getMqttClient().subscribe(topic, (s, mqttMessage) -> { emqKeeper.subscript(topic, 1, new EmqxListener() {
byte[] payload = mqttMessage.getPayload(); @Override
public void processMessage(String s, MqttMessage mqttMessage) throws Exception {
byte[] payload = mqttMessage.getPayload();
String obj = new String(payload); String obj = new String(payload);
if (!ValidationUtil.isEmpty(obj)) { if (!ValidationUtil.isEmpty(obj)) {
JSONObject json = JSON.parseObject(obj); JSONObject json = JSON.parseObject(obj);
AlertNewsDto alertNewsDto = new AlertNewsDto( "物联警情", AlertNewsDto alertNewsDto = new AlertNewsDto( "物联警情",
"物联设备发生警情,发生位置:"+(json.get("address")!=null?json.get("address").toString():"")+ "物联设备发生警情,发生位置:"+(json.get("address")!=null?json.get("address").toString():"")+
",事发单位:"+(json.get("unitInvolvedName")!=null?json.get("unitInvolvedName").toString():"") ",事发单位:"+(json.get("unitInvolvedName")!=null?json.get("unitInvolvedName").toString():"")
+",联系人:"+(json.get("contactUser")!=null?json.get("contactUser").toString():"") +",联系人:"+(json.get("contactUser")!=null?json.get("contactUser").toString():"")
+",联系电话:"+(json.get("contactPhone")!=null?json.get("contactPhone").toString():"")+".请尽快处理!", +",联系电话:"+(json.get("contactPhone")!=null?json.get("contactPhone").toString():"")+".请尽快处理!",
json.get("id").toString(), json); json.get("id").toString(), json);
emqKeeper.getMqttClient().publish(topicweb, JSON.toJSON(alertNewsDto).toString().getBytes("UTF-8"), 1, false); emqKeeper.getMqttClient().publish(topicweb, JSON.toJSON(alertNewsDto).toString().getBytes("UTF-8"), 1, false);
} }
}
}); });
} catch (MqttException e) { } catch (Exception e) {
logger.info("订阅物联警情异常", e); logger.info("订阅物联警情异常", e);
} }
} }
......
...@@ -74,6 +74,11 @@ public class AmostEquipApplication { ...@@ -74,6 +74,11 @@ public class AmostEquipApplication {
*/ */
@Bean @Bean
void initMqtt() throws MqttException { void initMqtt() throws MqttException {
emqKeeper.getMqttClient().subscribe("+/+/property", 1, carIotListener); try {
emqKeeper.subscript("+/+/property", 1, carIotListener);
} catch (Exception e) {
e.printStackTrace();
logger.error("EMQ初始化连接失败!");
}
} }
} }
...@@ -102,8 +102,8 @@ public class PatrolApplication { ...@@ -102,8 +102,8 @@ public class PatrolApplication {
@Bean @Bean
void initMqtt() { void initMqtt() {
try { try {
emqKeeper.getMqttClient().subscribe(patrolTopic, 1, patrolMqttListener); emqKeeper.subscript(patrolTopic, 1, patrolMqttListener);
} catch (MqttException e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
logger.error("EMQ初始化连接失败!"); logger.error("EMQ初始化连接失败!");
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment