Commit 9825e3d4 authored by zhengjiawei's avatar zhengjiawei

mqtt

parent 770ee2b4
...@@ -10,7 +10,7 @@ package com.yeejoin.amos.fas.common.enums; ...@@ -10,7 +10,7 @@ package com.yeejoin.amos.fas.common.enums;
*/ */
public enum EquipmentRiskTypeEnum { public enum EquipmentRiskTypeEnum {
HZGJ("alarm", "火灾告警"),GZ("trouble", "故障"); HZGJ("FIREALARM", "火灾告警"),GZ("BREAKDOWN", "故障");
private String code; private String code;
private String type; private String type;
......
package com.yeejoin.amos.fas.business.action; package com.yeejoin.amos.fas.business.action;
import com.yeejoin.amos.fas.business.service.intfc.IEquipmentHandlerService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -12,10 +13,14 @@ public class ContingencyLogListener implements ApplicationListener<ContingencyEv ...@@ -12,10 +13,14 @@ public class ContingencyLogListener implements ApplicationListener<ContingencyEv
@Autowired @Autowired
IRuleRunningSnapshotService ruleRunningSnapshotService; IRuleRunningSnapshotService ruleRunningSnapshotService;
// @Autowired
// IEquipmentHandlerService IEquipmentHandlerService;
@Override @Override
public void onApplicationEvent(ContingencyEvent event) { public void onApplicationEvent(ContingencyEvent event) {
ruleRunningSnapshotService.reacordPlan(event.getTopic(), event.getMsgType(), event.getMsgBody(), event.getContingency()); ruleRunningSnapshotService.reacordPlan(event.getTopic(), event.getMsgType(), event.getMsgBody(), event.getContingency());
// IEquipmentHandlerService.subscribeTopic();
} }
} }
...@@ -135,13 +135,13 @@ public class HandlerMqttMessageImpl implements IEquipmentHandlerService { ...@@ -135,13 +135,13 @@ public class HandlerMqttMessageImpl implements IEquipmentHandlerService {
deviceData.setMonitor(equipment != null ? equipment.getName() : ""); deviceData.setMonitor(equipment != null ? equipment.getName() : "");
deviceData.setId(String.valueOf(equipmentSpecific.getId())); deviceData.setId(String.valueOf(equipmentSpecific.getId()));
deviceData.setCode(equipmentSpecific.getCode()); deviceData.setCode(equipmentSpecific.getCode());
if (EquipmentRiskTypeEnum.HZGJ.getType().equals(equipmentSpecificIndex.getType())) { //设备告警处理逻辑 if (EquipmentRiskTypeEnum.HZGJ.getCode().equals(equipmentSpecificIndex.getType())) { //设备告警处理逻辑
log.info("(报警)Message type is: " + equipmentSpecificIndex.getType()); log.info("(报警)Message type is: " + equipmentSpecificIndex.getType());
// 报警触发调用规则服务 // 报警触发调用规则服务
executeDynamicPlan(deviceData, equipment, equipmentSpecific, toke); executeDynamicPlan(deviceData, equipment, equipmentSpecific, toke);
} else if (EquipmentRiskTypeEnum.GZ.getType().equals(equipmentSpecificIndex.getType())) { // 设备故障处理逻辑 } else if (EquipmentRiskTypeEnum.GZ.getCode().equals(equipmentSpecificIndex.getType())) { // 设备故障处理逻辑
log.info("(故障)Message type is: " + equipmentSpecificIndex.getType()); log.info("(故障)Message type is: " + equipmentSpecificIndex.getType());
long equipId = 0; long equipId = 0;
if(StringUtil.isNotEmpty(equipment)) { if(StringUtil.isNotEmpty(equipment)) {
...@@ -188,7 +188,26 @@ public class HandlerMqttMessageImpl implements IEquipmentHandlerService { ...@@ -188,7 +188,26 @@ public class HandlerMqttMessageImpl implements IEquipmentHandlerService {
// 若登录系统则订阅装备数据 // 若登录系统则订阅装备数据
webMqttSubscribe.adapter.removeTopic(defaultTopic); webMqttSubscribe.adapter.removeTopic(defaultTopic);
String orgCode = reginParams.getCompany().getOrgCode(); String orgCode = reginParams.getCompany().getOrgCode();
String topic = String.format("%s.%s.%s%s", serverName, orgCode, "equipment", "/#"); String topic = String.format("%s.%s%s", serverName, "equipment", "/#");
String[] strs = webMqttSubscribe.adapter.getTopic();
List<String> list = Stream.of(strs).collect(Collectors.toList());
if(list.size() > 0) {
list.forEach(x -> {
if(!(x.equals(topic))) {
webMqttSubscribe.adapter.addTopic(topic);
}
});
} else {
webMqttSubscribe.adapter.addTopic(topic);
}
}
@Override
public void subscribeTopic() {
// 若登录系统则订阅装备数据
webMqttSubscribe.adapter.removeTopic(defaultTopic);
String topic = String.format("%s.%s,%s", serverName, "equipment", "/#");
String[] strs = webMqttSubscribe.adapter.getTopic(); String[] strs = webMqttSubscribe.adapter.getTopic();
List<String> list = Stream.of(strs).collect(Collectors.toList()); List<String> list = Stream.of(strs).collect(Collectors.toList());
if(list.size() > 0) { if(list.size() > 0) {
......
...@@ -12,5 +12,8 @@ import com.yeejoin.amos.fas.business.vo.ReginParams; ...@@ -12,5 +12,8 @@ import com.yeejoin.amos.fas.business.vo.ReginParams;
*/ */
public interface IEquipmentHandlerService { public interface IEquipmentHandlerService {
void handlerMqttMessage(String topic, String message); void handlerMqttMessage(String topic, String message);
void subscribeTopic(ReginParams reginParams); void subscribeTopic(ReginParams reginParams);
void subscribeTopic();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment