Commit 02642fd1 authored by 张森's avatar 张森

稳压泵启停频次触发预警

给iot发送消息时添加 traceId 字段
parent 36bd7877
......@@ -86,4 +86,12 @@ public interface IotFeign {
ResponseModel<Map<String ,Object>> queryIotDataNum(@RequestParam("timeStart") String timeStart,
@RequestParam("timeEnd") String timeEnd);
@RequestMapping(value = "v1/livedata/queryIotDataNumByIndex", method = RequestMethod.GET, consumes = "application/json")
ResponseModel<Map<String ,Integer>> queryIotDataNumByIndex(@RequestParam(value = "timeStart") String timeStart,
@RequestParam(value = "timeEnd") String timeEnd,
@RequestParam(value = "productKey") String productKey,
@RequestParam(value = "deviceName") String deviceName,
@RequestParam(value = "indexKeys") String indexKeys,
@RequestParam(value = "value") String value);
}
package com.yeejoin.equipmanage.service.impl;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
......@@ -104,6 +105,11 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
* 水箱液位
*/
private final static String CAFS_WaterTank_WaterTankLevel = "CAFS_WaterTank_WaterTankLevel";
private final static String FHS_PressurePump_Start = "FHS_PressurePump_Start";
private final static String FHS_PressurePump_Stop = "FHS_PressurePump_Stop";
private static final String PUMP_JOB_GROUP_NAME = "EQUIP_PUMP_JOB_GROUP_NAME";
private static final String PUMP_TRIGGER_NAME = "EQUIP_PUMP_TRIGGER_NAME";
private static final String PUMP_TRIGGER_GROUP_NAME = "EQUIP_PUMP_TRIGGER_GROUP_NAME";
......@@ -389,7 +395,12 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
//给 iot服务 推送消息 插数据到 influxdb
if (isSendIot) {
mqttSendGateway.sendToMqtt("influxdb/" + topic.substring(0, endIndex), message);
JSONObject messageObj = JSON.parseObject(message);
if (!messageObj.containsKey("traceId")) {
messageObj.put("traceId", System.currentTimeMillis());
}
String messageTraceId = JSON.toJSONString(messageObj);
mqttSendGateway.sendToMqtt("influxdb/" + topic.substring(0, endIndex), messageTraceId);
}
EquipmentSpecificVo vo = eqIotCodeList.get(0);
topicEntity.setType(vo.getType());
......@@ -1120,6 +1131,11 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
iotDataVO.getKey().equalsIgnoreCase(FHS_LevelDetector_WaterLevel)) {
alarmFlag = doWaterPoolLevel(iotDataVO, equipmentSpecificIndex, messageBodyMap);
}
//稳压泵启停次数大于15次触发预警
if (iotDataVO.getKey().equalsIgnoreCase(FHS_PressurePump_Start) ||
iotDataVO.getKey().equalsIgnoreCase(FHS_PressurePump_Stop)) {
doPressurePumInfo(topicEntity, equipmentSpecificIndex);
}
// 遥测数据生成告警事件、日志处理
if (iotDataVO.getKey().equalsIgnoreCase(CAFS_FoamTank_FoamTankLevel) ||
FHS_PipePressureDetector_PipePressure.equalsIgnoreCase(iotDataVO.getKey()) ||
......@@ -1440,6 +1456,65 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
return alarmFlag;
}
private void doPressurePumInfo(TopicEntityVo topicEntity, EquipmentSpecificIndex equipmentSpecificIndex) {
// 查询iot该稳压泵的启停次数 一个小时内
String startDate = DateUtil.format(new Date(), DatePattern.NORM_DATETIME_PATTERN);
String endDate = DateUtil.format(DateUtil.offsetHour(new Date(), -1), DatePattern.NORM_DATETIME_PATTERN);
String prefix = topicEntity.getIotCode().substring(0, 8);
String suffix = topicEntity.getIotCode().substring(8);
ResponseModel<Map<String, Integer>> mapResponseModel = iotFeign.queryIotDataNumByIndex(startDate, endDate, prefix, suffix, FHS_PressurePump_Start + "," + FHS_PressurePump_Stop, "true");
if (200 == mapResponseModel.getStatus()) {
Map<String, Integer> result = mapResponseModel.getResult();
Integer totalNum = result.get("num");
HashMap<String, String> extra = new HashMap<>();
extra.put("useSource", "center");
extra.put("codingSystem", "center");
extra.put("codingType", "equipment");
extra.put("problemReception", "station");
extra.put("bussId", String.valueOf(equipmentSpecificIndex.getEquipmentSpecificId()));
extra.put("clearUniqueCode", "equip-pressure");
TableContentVo tableContentVo = new TableContentVo("报警类型", "text", "稳压泵启停频次过高", "1");
TableContentVo tableContentVo1 = new TableContentVo("报警部位", "text", equipmentSpecificIndex.getLocation(), "2");
TableContentVo tableContentVo2 = new TableContentVo("报警时间", "text", DateUtil.now(), "3");
TableContentVo tableContentVo3 = new TableContentVo("报警对象", "text", equipmentSpecificIndex.getEquipmentSpecificName(), "4");
List<TableContentVo> tableContentVos = Arrays.asList(tableContentVo, tableContentVo1, tableContentVo2, tableContentVo3);
handlePressureWarning(totalNum.toString(),
equipmentSpecificIndex,
String.valueOf(equipmentSpecificIndex.getEquipmentSpecificId()),
"fireIot/data/analysis",
"START_NUM",
extra,
"equip",
tableContentVos);
}
}
private void handlePressureWarning(String indexValue,
EquipmentSpecificIndex equipmentSpecificIndex,
String businessId,
String topic,
String indexKey,
Object extra,
String source,
List<TableContentVo> tableContentVos) {
// 触发预警业务
BizMessage bizMessage = new BizMessage();
bizMessage.setIndexKey(indexKey);
bizMessage.setIndexValue(indexValue);
RiskBizInfoVo riskBizInfoVo = fetchData(equipmentSpecificIndex, extra, source);
riskBizInfoVo.setWarningObjectCode(businessId);
riskBizInfoVo.getDynamicDetails().get(0).setTabContent(tableContentVos);
bizMessage.setBizInfo(riskBizInfoVo);
bizMessage.setTraceId(businessId);
try {
emqKeeper.getMqttClient().publish(topic,
JSON.toJSONString(bizMessage).getBytes(StandardCharsets.UTF_8), 2, false);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 消防水池、工业水池和消防水箱 消息发送
*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment