Commit fbc846bd authored by 张森's avatar 张森

中心触发预警问题处理

parent eaef1c40
......@@ -7,6 +7,7 @@ import com.yeejoin.equipmanage.mapper.EquipmentSpecificMapper;
import com.yeejoin.equipmanage.service.ISyncDataService;
import com.yeejoin.equipmanage.service.MqttEventReceiveService;
import com.yeejoin.equipmanage.service.MqttReceiveService;
import com.yeejoin.equipmanage.service.MqttWaringReceiveService;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
......@@ -69,6 +70,9 @@ public class EquipmentIotMqttReceiveConfig {
private MqttEventReceiveService mqttEventReceiveService;
@Autowired
private MqttWaringReceiveService mqttWaringReceiveService;
private ISyncDataService iSyncDataService;
......@@ -136,6 +140,7 @@ public class EquipmentIotMqttReceiveConfig {
list.add(riskMsgCenterEquipTopic);
list.add(riskMsgCenterPatrolTopic);
list.add("+/+/eventAlarm"); // 添加换流站韶山监听事件 --- shaoshan 修改为 eventAlarm:事件告警 - 统一
list.add("equip/threshold/warning"); // 触发预警相关 - 【站储水量不大于4000、泡沫罐和管网压力等阈值信息、稳压泵信息】- 中心系统使用
String[] arr = list.toArray(new String[list.size()]);
adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttPahoClientFactory(), arr);
adapter.setCompletionTimeout(completionTimeout);
......@@ -163,6 +168,8 @@ public class EquipmentIotMqttReceiveConfig {
mqttReceiveService.handlerMqttRomaMessage(topic,msg);
}else if (dataType.equals("eventAlarm") && StringUtil.isNotEmpty(msg)){ // 告警信号
mqttReceiveService.handlerMqttStationMessage(topic,msg);
} else if (dataType.equals("warning") && StringUtil.isNotEmpty(msg)) {
mqttWaringReceiveService.handlerMqttMessage(topic, msg);
}
}
};
......
package com.yeejoin.equipmanage.service;
public interface MqttWaringReceiveService {
/**
* 处理稳压泵、水池、泡沫罐、管网压力等阈值相关数据
*
* @param topic 主题
* @param message 消息内容
*/
void handlerMqttMessage(String topic, String message);
}
......@@ -1380,9 +1380,8 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
* @param iotDataVO iotDataVO
* @param equipmentSpecificIndex equipmentSpecificIndex
*/
private boolean doFoamTankLevel(IotDataVO iotDataVO, EquipmentSpecificIndex equipmentSpecificIndex, Map<String, String> messageBody) {
public boolean doFoamTankLevel(IotDataVO iotDataVO, EquipmentSpecificIndex equipmentSpecificIndex, Map<String, String> messageBody) {
boolean alarmFlag = false;
MessageModel model = new MessageModel();
Map<String, Object> map = new HashMap<>();
String indexKey = "";
if (iotDataVO.getKey().equalsIgnoreCase(CAFS_FoamTank_FoamTankLevel) || iotDataVO.getKey().equalsIgnoreCase(CAFS_WaterTank_WaterTankLevel)) {
......@@ -1400,9 +1399,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
if (!ObjectUtils.isEmpty(checkValue)) {
nowValue = checkValue;
}
//预警业务 泡沫罐 或 者管网压力
HashMap<String, String> extra = new HashMap<>();
extra.put("useSource", "center");
extra.put("codingSystem", "center");
......@@ -1426,49 +1423,10 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
extra,
"cafsWaterLevelOver",
tableContentVos);
if (nowValue.compareTo(minValue) < 0 || nowValue.compareTo(maxValue) > 0) {
String body = "";
if (nowValue.compareTo(minValue) < 0) {
body = "当前数值 " + nowValue + " 低于最低报警阈值 " + minValue;
} else if (nowValue.compareTo(maxValue) > 0) {
body = "当前数值 " + nowValue + " 超过最高报警阈值 " + maxValue;
}
messageBody.put("messageBody", equipmentSpecificIndex.getEquipmentSpecificName() + "-" + body);
String bodyMain = String.format("%s,- 当前数值%s,%s ,请及时查看处理。",
equipmentSpecificIndex.getEquipmentSpecificName() + "-" + equipmentSpecificIndex.getLocation(),
nowValue,
nowValue.compareTo(minValue) < 0 ? "低于最低报警阈值" + minValue : "超过最高报警阈值" + maxValue
);
model.setTitle("模拟量提醒");
model.setBody(bodyMain);
model.setMsgType("FoamTankOrPipeNetwork");
model.setSendTime(new Date());
model.setIsSendWeb(true);
model.setCategory(1);
model.setRelationId(equipmentSpecificIndex.getEquipmentSpecificId().toString());
model.setIsSendApp(false);
model.setTerminal("WEB");
model.setRecivers(Collections.singletonList("system"));
Map<String, String> ext = new HashMap<>();
ext.put("content", body);
ext.put("type", "模拟量超阈值提醒");
ext.put("name", equipmentSpecificIndex.getEquipmentSpecificName());
ext.put("time", new SimpleDateFormat(DateUtils.DATE_TIME_PATTERN).format(new Date()));
model.setExtras(ext);
// try {
// Token token = remoteSecurityService.getServerToken();
// systemctlFeign.create(token.getAppKey(), token.getProduct(), token.getToke(), model);
// } catch (Exception e) {
// log.error("调用平台出错!");
// }
// log.info(String.format("调用平台消息服务成功:%s", JSON.toJSONString(model)));
alarmFlag = true;
}
return alarmFlag;
}
private void doWaterStationWarning(String bizOrgCode, String bizOrgName) {
public void doWaterStationWarning(String bizOrgCode, String bizOrgName) {
List<Map<String, Object>> result = poolStatisticController.getStationWaterPanelInfo(bizOrgCode).getResult();
String indexValue = result.stream()
.map(map -> (BigDecimal) map.getOrDefault("volumeBigDecimal", new BigDecimal(0)))
......@@ -1520,7 +1478,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
}
private void doPressurePumInfo(TopicEntityVo topicEntity, EquipmentSpecificIndex equipmentSpecificIndex) {
public void doPressurePumInfo(TopicEntityVo topicEntity, EquipmentSpecificIndex equipmentSpecificIndex) {
// 查询iot该稳压泵的启停次数 一个小时内
String endDate = DateUtil.format(new Date(), DatePattern.NORM_DATETIME_PATTERN);
String startDate = DateUtil.format(DateUtil.offsetHour(new Date(), -1), DatePattern.NORM_DATETIME_PATTERN);
......@@ -1585,7 +1543,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
* @param iotDataVO iotDataVO
* @param equipmentSpecificIndex equipmentSpecificIndex
*/
private boolean doWaterPoolLevel(IotDataVO iotDataVO, EquipmentSpecificIndex equipmentSpecificIndex, Map<String, String> messageBody) {
public boolean doWaterPoolLevel(IotDataVO iotDataVO, EquipmentSpecificIndex equipmentSpecificIndex, Map<String, String> messageBody) {
// 权限处理
PermissionInterceptorContext.setDataAuthRule(authKeyEnable);
boolean alarmFlag = false;
......@@ -1640,41 +1598,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
"waterLevelOver",
tableContentVos);
}
if (nowValue!=null&&(nowValue.compareTo(minValue) < 0 || nowValue.compareTo(maxValue) > 0)) {
String body = "";
if (nowValue.compareTo(minValue) < 0) {
body = "当前数值 " + nowValue + " 低于最低报警阈值 " + minValue;
} else if (nowValue.compareTo(maxValue) > 0) {
body = "当前数值 " + nowValue + " 超过最高报警阈值 " + maxValue;
}
String bodyMain = String.format("%s,- 当前数值%s,%s ,请及时查看处理。",
map.get("name"),
nowValue,
nowValue.compareTo(minValue) < 0 ? "低于最低报警阈值" + minValue : "超过最高报警阈值" + maxValue
);
model.setTitle("模拟量提醒");
model.setBody(bodyMain);
model.setMsgType("WaterPoolKey");
model.setSendTime(new Date());
model.setIsSendWeb(true);
model.setCategory(1);
model.setRelationId(map.get("id").toString());
model.setIsSendApp(false);
model.setTerminal("WEB");
model.setRecivers(Collections.singletonList("system"));
Map<String, String> ext = new HashMap<>();
ext.put("content", body);
ext.put("type", "模拟量超阈值提醒");
ext.put("name", (String) map.get("name"));
ext.put("time", new SimpleDateFormat(DateUtils.DATE_TIME_PATTERN).format(new Date()));
model.setExtras(ext);
// Token token = remoteSecurityService.getServerToken();
// systemctlFeign.create(token.getAppKey(), token.getProduct(), token.getToke(), model);
// log.info(String.format("调用平台消息服务成功:%s", JSON.toJSONString(model)));
alarmFlag = true;
messageBody.put("messageBody", map.get("name") + "-" + body);
}
}
return alarmFlag;
}
......
package com.yeejoin.equipmanage.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex;
import com.yeejoin.equipmanage.common.vo.IotDataVO;
import com.yeejoin.equipmanage.common.vo.TopicEntityVo;
import com.yeejoin.equipmanage.service.MqttWaringReceiveService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Map;
@Slf4j
@Service
public class MqttWarningReceiveServiceImpl implements MqttWaringReceiveService {
/**
* 泡沫罐KEY
*/
private final static String CAFS_FoamTank_FoamTankLevel = "CAFS_FoamTank_FoamTankLevel";
/**
* 泡沫罐KEY
*/
private final static String FHS_PipePressureDetector_PipePressure = "FHS_PipePressureDetector_PipePressure";
/**
* 水池信息
*/
private final static String FHS_FirePoolDevice_WaterLevel = "FHS_FirePoolDevice_WaterLevel";
private final static String FHS_LevelDetector_WaterLevel = "FHS_LevelDetector_WaterLevel";
/**
* 水池信息
*/
private final static String FHS_WirelessliquidDetector_WaterLevel = "FHS_WirelessliquidDetector_WaterLevel";
/**
* 水箱液位
*/
private final static String CAFS_WaterTank_WaterTankLevel = "CAFS_WaterTank_WaterTankLevel";
private final static String FHS_PressurePump_Start = "FHS_PressurePump_Start";
private static final String REALTIME_IOT_INDEX_KEY = "realtime_iot_index_key";
private static final String REALTIME_IOT_INDEX_VALUE = "realtime_iot_index_value";
@Autowired
MqttReceiveServiceImpl mqttReceiveService;
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerMqttMessage(String topic, String message) {
try {
JSONObject messageObj = JSON.parseObject(message);
IotDataVO iotDataVO = new IotDataVO();
iotDataVO.setKey(messageObj.get(REALTIME_IOT_INDEX_KEY).toString());
iotDataVO.setValue(messageObj.get(REALTIME_IOT_INDEX_VALUE).toString());
EquipmentSpecificIndex equipmentSpeIndex = new EquipmentSpecificIndex();
equipmentSpeIndex.setEquipmentSpecificId(Long.valueOf(messageObj.get("id").toString()));
equipmentSpeIndex.setEquipmentSpecificName(messageObj.get("name").toString());
equipmentSpeIndex.setEquipmentSpecificCode(messageObj.get("code").toString());
equipmentSpeIndex.setLocation(messageObj.get("position").toString());
equipmentSpeIndex.setBizOrgCode(messageObj.get("biz_org_code").toString());
equipmentSpeIndex.setBizOrgName(messageObj.get("biz_org_name").toString());
equipmentSpeIndex.setIotCode(messageObj.get("iot_code").toString());
isAlarmFlag(equipmentSpeIndex, iotDataVO);
} catch (Exception e) {
e.printStackTrace();
}
}
private void isAlarmFlag(EquipmentSpecificIndex equipmentSpeIndex, IotDataVO iotDataVO) {
//管网压力、泡沫罐信息、水箱液位告警处理
if (iotDataVO.getKey().equalsIgnoreCase(CAFS_FoamTank_FoamTankLevel) ||
FHS_PipePressureDetector_PipePressure.equalsIgnoreCase(iotDataVO.getKey()) ||
iotDataVO.getKey().equalsIgnoreCase(CAFS_WaterTank_WaterTankLevel)) {
mqttReceiveService.doFoamTankLevel(iotDataVO, equipmentSpeIndex, null);
}
//消防水池液位处理
if (iotDataVO.getKey().equalsIgnoreCase(FHS_FirePoolDevice_WaterLevel) ||
iotDataVO.getKey().equalsIgnoreCase(FHS_WirelessliquidDetector_WaterLevel) ||
iotDataVO.getKey().equalsIgnoreCase(FHS_LevelDetector_WaterLevel)) {
mqttReceiveService.doWaterPoolLevel(iotDataVO, equipmentSpeIndex, null);
// 处理每站消防储水量不少于4000m³ 预警问题
mqttReceiveService.doWaterStationWarning(equipmentSpeIndex.getBizOrgCode(), equipmentSpeIndex.getBizOrgName());
}
//稳压泵启动次数大于15次触发预警
if (iotDataVO.getKey().equalsIgnoreCase(FHS_PressurePump_Start) && "true".equals(iotDataVO.getValue().toString())) {
TopicEntityVo topicEntity = new TopicEntityVo();
topicEntity.setIotCode(equipmentSpeIndex.getIotCode());
mqttReceiveService.doPressurePumInfo(topicEntity, equipmentSpeIndex);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment