Commit 549d2fac authored by zhengjiangtao's avatar zhengjiangtao

科技处设备ping告警,风险告警增加

parent 0ab617cc
...@@ -3,105 +3,374 @@ package com.yeejoin.amos.bank.config; ...@@ -3,105 +3,374 @@ package com.yeejoin.amos.bank.config;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.bank.dao.BankInfoDao; import com.yeejoin.amos.bank.TopographyNode;
import com.yeejoin.amos.bank.common.enums.AlarmPointTypeEnum;
import com.yeejoin.amos.bank.common.enums.DevicePointEnum;
import com.yeejoin.amos.bank.common.enums.DeviceStatusEnum;
import com.yeejoin.amos.bank.dao.entity.Alarm;
import com.yeejoin.amos.bank.dao.entity.AlarmPoint;
import com.yeejoin.amos.bank.dao.mapper.BankInfoMapper; import com.yeejoin.amos.bank.dao.mapper.BankInfoMapper;
import com.yeejoin.amos.bank.entity.BankInfo; import com.yeejoin.amos.bank.dao.repository.IAlarmRepository;
import com.yeejoin.amos.bank.entity.EquipmentQualityVo;
import com.yeejoin.amos.bank.remote.client.RiskModelRemoteClient;
import com.yeejoin.amos.bank.service.IAlarmPointService;
import com.yeejoin.amos.bank.service.IAlarmService;
import com.yeejoin.amos.bank.utils.HttpUtil;
import com.yeejoin.amos.bank.webSocket.AmosWsClient; import com.yeejoin.amos.bank.webSocket.AmosWsClient;
import com.yeejoin.amos.bank.webSocket.WebsocketParam; import com.yeejoin.amos.bank.webSocket.WebsocketParam;
import com.yeejoin.amos.component.feign.config.InnerInvokException;
import com.yeejoin.amos.spc.business.dao.mapper.EquipmentMapper;
import com.yeejoin.amos.spc.business.remote.RemoteWebSocketServer;
import com.yeejoin.amos.spc.core.util.StringUtil; import com.yeejoin.amos.spc.core.util.StringUtil;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener; import org.typroject.tyboot.component.emq.EmqxListener;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties;
@Component @Component
public class EquipmentListener extends EmqxListener implements InitializingBean { public class EquipmentListener extends EmqxListener implements InitializingBean {
private final Logger logger = LogManager.getLogger(EquipmentListener.class); private final Logger logger = LogManager.getLogger(EquipmentListener.class);
@Autowired @Autowired
private EmqKeeper emqKeeper; private EmqKeeper emqKeeper;
@Autowired @Autowired
private BankInfoMapper bankInfoMapper; private BankInfoMapper bankInfoMapper;
@Autowired @Autowired
private AmosWsClient client; private AmosWsClient client;
/** @Autowired
* 监听主题 EquipmentMapper equipmentMapper;
*/
@Override @Autowired
public void afterPropertiesSet() throws Exception { RiskModelRemoteClient riskModelRemoteClient;
emqKeeper.subscript(CommTopic.AISLE_DEVICE.getTopic(), 2, this);
} @Autowired
private RemoteWebSocketServer webSocketServer;
/**
* 接收主题下的消息 @Autowired
*/ private IAlarmPointService alarmPointService;
@Override
public void processMessage(String s, MqttMessage mqttMessage) throws Exception { @Autowired
blueprintMsgtransfer(mqttMessage); private IAlarmService alarmService;
}
@Autowired
private IAlarmRepository iAlarmDao;
/**
* 消息处理 /**
* * 监听主题
* @param mqttMessage */
*/ @Override
private void blueprintMsgtransfer(MqttMessage mqttMessage) { public void afterPropertiesSet() throws Exception {
try { emqKeeper.subscript(CommTopic.AISLE_DEVICE.getTopic(), 2, this);
String jsonStr = new String(mqttMessage.getPayload()); }
logger.info(">>>>>>>>>>>>>>>>>jsonStr", jsonStr);
JSONArray jsonArray = JSON.parseObject(jsonStr).getJSONArray("metricDatas"); /**
if (ObjectUtils.isEmpty(jsonArray)) { * 接收主题下的消息
return; */
} @Override
// JSONArray eqpConfig = parseObject.getJSONArray("eqpConfigMap"); public void processMessage(String s, MqttMessage mqttMessage) throws Exception {
// JSONObject eqpConfigValue = JSONObject.parseObject(eqpConfig.getString(0)); blueprintMsgtransfer(mqttMessage);
// String sourceName = eqpConfigValue.getString("valueStr"); }
jsonArray.forEach(e -> { /**
JSONObject object = (JSONObject) e; * 消息处理
Long eqpId = object.getLong("eqpId"); *
String string = object.getString("metricJson"); * @param mqttMessage
if (!StringUtil.isNotEmpty(string)) { */
return; private void blueprintMsgtransfer(MqttMessage mqttMessage) {
} try {
logger.error(">>>>>>>>>>>>>>>>>string", string); String jsonStr = new String(mqttMessage.getPayload());
JSONObject metricJson = JSONObject.parseObject(string); logger.info(">>>>>>>>>>>>>>>>>jsonStr", jsonStr);
String communication = metricJson.get("communication").toString(); JSONArray jsonArray = JSON.parseObject(jsonStr).getJSONArray("metricDatas");
int status = communication.equals("正常") ? 0 : 1; if (ObjectUtils.isEmpty(jsonArray)) {
int sourceIdStatus = bankInfoMapper.findBySourceId(eqpId); return;
if (status != sourceIdStatus) { }
bankInfoMapper.updateBySourceId(status, eqpId);
jsonArray.forEach(e -> {
Map<String, Object> resultMap = bankInfoMapper.selectBankInfoBySourceId(eqpId); JSONObject object = (JSONObject) e;
resultMap.put("eqpId", eqpId); Long eqpId = object.getLong("eqpId");
pushAisleAlarm(resultMap); String string = object.getString("metricJson");
logger.info("=======aisle状态有变化====="); if (!StringUtil.isNotEmpty(string)) {
} return;
}); }
} catch (Exception e) { logger.error(">>>>>>>>>>>>>>>>>string", string);
logger.error("消息流转报错."); JSONObject metricJson = JSONObject.parseObject(string);
logger.error(e.getMessage(), e); String communication = metricJson.get("communication").toString();
e.printStackTrace(); int status = communication.equals("正常") ? 0 : 1;
} int sourceIdStatus = bankInfoMapper.findBySourceId(eqpId);
} if (status != sourceIdStatus) {
bankInfoMapper.updateBySourceId(status, eqpId);
/**
* 发送通道刷新标识 Map<String, Object> resultMap = bankInfoMapper.selectBankInfoBySourceId(eqpId);
*/ resultMap.put("eqpId", eqpId);
public void pushAisleAlarm(Map<String, Object> result) { pushAisleAlarm(resultMap);
WebsocketParam param = new WebsocketParam("aisleAlarm", JSON.toJSONString(result)); logger.info("=======aisle状态有变化=====");
client.sendMessage(param); }
} });
//告警处理
handleAlarmData(jsonStr);
} catch (Exception e) {
logger.error("消息流转报错.");
logger.error(e.getMessage(), e);
e.printStackTrace();
}
}
/**
* 发送通道刷新标识
*/
public void pushAisleAlarm(Map<String, Object> result) {
WebsocketParam param = new WebsocketParam("aisleAlarm", JSON.toJSONString(result));
client.sendMessage(param);
}
public void handleAlarmData(String jsonStr) {
try {
logger.info("start handle ip equipment alarm..");
JSONObject parseObject = JSON.parseObject(jsonStr);
Long eqpId = parseObject.getLong("eqpId");
JSONArray jsonArray = parseObject.getJSONArray("metricDatas");
JSONArray eqpConfigMap = parseObject.getJSONArray("eqpConfigMap");
// 获取ip
String ipAddress = getEqpConfigMapValueByKey(eqpConfigMap, "IP");
// 所属楼层
String lou = getEqpConfigMapValueByKey(eqpConfigMap, "所属楼层");
// 三维位置
String sanwei = getEqpConfigMapValueByKey(eqpConfigMap, "三维位置");
// 资源ID
String sourceId = getEqpConfigMapValueByKey(eqpConfigMap, "设备ID");
String eqpName = parseObject.getString("eqpName");
// orgcode
String orgcode = parseObject.getString("orgCode");
if (ObjectUtils.isEmpty(jsonArray)) {
return;
}
// 告警信息
String metricKey = "";
// 默认状态为正常
int statusByName = 0;
for (Object e : jsonArray) {
JSONObject object = (JSONObject) e;
String string = object.getString("metricJson");
metricKey = object.getString("metricKey");
if (!StringUtil.isNotEmpty(string)) {
return;
}
logger.error("---------ip equipment--string", string);
System.out.println("----ip equipment chuli recording : " + string);
// {"describe":"状态","communication":"异常"}
JSONObject metricJson = JSONObject.parseObject(string);
for (Map.Entry entry : metricJson.entrySet()) {
// 指标名
System.out.println("----start chuli ip equipment state : " + entry);
String remark = entry.getKey().toString();
String statusName = entry.getValue().toString();
if (remark.equals("communication")) {
// 触发风险合格不合格
statusByName = riskAlarm(eqpId, remark, statusName);
//拓扑闪烁
topographyAlarm(sourceId, statusByName);
}
}
}
;
// 增加报警(跑马灯,首页)
addAlarmMethod(eqpId, statusByName, eqpName, ipAddress, metricKey, orgcode, sanwei, lou);
// 增建告警记录表
addAlarmRecord(eqpId, metricKey, statusByName, orgcode, ipAddress, eqpName);
} catch (Exception e) {
logger.error("消息流转报错.");
logger.error(e.getMessage(), e);
e.printStackTrace();
}
}
private Integer riskAlarm(Long eqpId, String remark, String statusName) {
int statusByName = 0;;
EquipmentQualityVo queryEquipmentPoint = equipmentMapper.queryEquipmentPoint(eqpId, remark);
Long equipmentsPointId = queryEquipmentPoint.getId();
List<Long> riskFactorId = equipmentMapper.judgeEquipmentExists(equipmentsPointId);
if ("异常".equals(statusName)) {
Integer value = DevicePointEnum.getValue("异常");
equipmentMapper.updateEquipmentPointStatus(value, queryEquipmentPoint.getId());
statusByName = value;
} else {
Integer value = DevicePointEnum.getValue("正常");
equipmentMapper.updateEquipmentPointStatus(value, queryEquipmentPoint.getId());
statusByName = value;
}
// 修改rpn值
for (int j = 0; j < riskFactorId.size(); j++) {
try {
riskModelRemoteClient.updateEquipmentAlarmData(riskFactorId.get(j));
System.out.println("----udpate riskFactor sucess");
} catch (InnerInvokException e1) {
logger.error("update rpn fail " + e1.getMessage());
}
}
return statusByName;
}
private void topographyAlarm(String sourceId, Integer statusByName) {
// 查询设备绑定的拓扑图标
TopographyNode node = equipmentMapper.queryNodeBySourceId(sourceId);
// 拓扑闪烁
if (null != node) {
node.setNumber(statusByName);
node.setState(statusByName);
equipmentMapper.updateNodeState(node);
String nodeDetail = (statusByName == 0 ? "[{\"name\":\"通信状态\",\"value\":\"正常\"}]"
: "[{\"name\":\"通信状态\",\"value\":\"异常\"}]");
equipmentMapper.updateNodeDetail(node.getId(), nodeDetail);
try {
//前端页面刷新告警
webSocketServer.sendMessage("refresh", "technologyIp");
} catch (Exception e1) {
logger.error("webSocketServer send self! ");
}
}
}
/**
* 根据eqpConfigMap中的key获取value
* @param eqpConfigMap
* @param key
* @return
*/
public String getEqpConfigMapValueByKey(JSONArray eqpConfigMap, String key) {
// 配置信息
if (null != eqpConfigMap && eqpConfigMap.size() > 0) {
for (Object object : eqpConfigMap) {
JSONObject obj = (JSONObject) object;
String displayName = obj.getString("displayName");
if (displayName.contains(key)) {
String valueStr = obj.getString("valueStr");
return valueStr;
}
}
}
return null;
}
/**
* 增加告警记录
* @param eqpId
* @param metricKey
* @param statusByName
* @param orgCode1
* @param ipAddress
* @param eqpName
*/
public void addAlarmRecord(Long eqpId,String metricKey,Integer statusByName,String orgCode1,String ipAddress,String eqpName) {
List<Alarm> curAlarmList = alarmService.findByQueryColumn(eqpId + metricKey);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if (curAlarmList != null && curAlarmList.size() > 0) {
// if("异常".equals(curAlarmList.get(0).getCurrentState())){
Alarm alarm = curAlarmList.get(0);
alarm.setUpdateDate(df.format(new Date()));
if (statusByName != 0) {
alarm.setCurrentState("异常");
} else {
alarm.setCurrentState("正常");
}
alarm.setQueryColumn(eqpId + metricKey);
alarm.setAlarmSourceType(metricKey);
alarm.setClearPerson(metricKey);
iAlarmDao.save(alarm);
// }
} else {
Alarm alarm = new Alarm();
alarm.setSourceId(eqpId + "");
alarm.setAlarmLevel("紧急告警");
alarm.setAlarmReason("");
alarm.setAlarmSourceIp(ipAddress + "");
alarm.setAlarmSourceName(eqpName);
alarm.setAlarmSourceType(metricKey);
alarm.setClearDate("");
alarm.setClearPerson(metricKey);
alarm.setContinueDate("");
if (statusByName != 0) {
alarm.setCurrentState("异常");
} else {
alarm.setCurrentState("正常");
}
alarm.setEnsureDate("");
alarm.setHappenDate("");
alarm.setEnsurePerson("");
alarm.setOrgCode(orgCode1);
alarm.setUpdateDate(df.format(new Date()));
alarm.setQueryColumn(eqpId + metricKey);
iAlarmDao.save(alarm);
}
}
public void addAlarmMethod(Long eqpId,Integer statusByName,String eqpName,String ipAddress,String metricKey,String orgCode,String sanwei,String lou) throws IOException {
List<AlarmPoint> alarmPoint = alarmPointService.selectPointTypeAndPointId(AlarmPointTypeEnum.设备.getCode(),
eqpId);
if (alarmPoint != null && alarmPoint.size() > 0) {
// 存在报警,判断当前状态是不是合格,合格则删除上次的报警
if (statusByName == 0) {
// 删除告警
alarmPointService.delete(alarmPoint.get(0));
}
} else {
// 不存在,判断当前状态是不是合格,不合格新增,合格不做任何操作
if (statusByName != 0) {
// 增加告警
AlarmPoint alarmPoint1 = new AlarmPoint();
alarmPoint1.setPointId(eqpId);
alarmPoint1.setPointType(AlarmPointTypeEnum.设备.getCode());
alarmPoint1.setUpdateDate(new Date());
alarmPoint1.setIsAlarm(1);// 不合格
alarmPoint1.setContent(eqpName + "-" + ipAddress + "-" + metricKey);
alarmPoint1.setCode(orgCode);
alarmPoint1.setPointName(eqpName);
Map<String, Object> map = new HashMap<>();
map.put("position", sanwei);
map.put("storey", lou);
map.put("levelStr", "impEqu_04");
alarmPoint1.setPointAttrs(JSON.toJSONString(map));
alarmPointService.save(alarmPoint1);
// 推送告警
List<AlarmPoint> list = alarmPointService.findAll();
WebsocketParam param = new WebsocketParam("alarmPoint", JSON.toJSONString(list));
Properties props = PropertiesLoaderUtils.loadAllProperties("application.properties");
String url = (String) props.get("params.remoteWebsocketUrl") + "/generic/sendMessage";
HttpUtil.PostJson(url, JSON.toJSONString(param));
}
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment