Commit c47788d1 authored by maoying's avatar maoying

调整设备实时消息处理代码

parent 73d45681
......@@ -181,7 +181,7 @@ public interface IEquipmentSpecificSerivce extends IService<EquipmentSpecific> {
*
* @param indexs
*/
void updateEquipmentSpecIndexRealtimeData(List<EquipmentSpecificIndex> indexs);
void updateEquipmentSpecIndexRealtimeData(EquipmentSpecificIndex index);
List<EquipmentSpecificVo> getEquipAndCarIotcodeByIotcode(String iotCode);
......
......@@ -1615,10 +1615,9 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
}
@Override
public void updateEquipmentSpecIndexRealtimeData(List<EquipmentSpecificIndex> indexs) {
public void updateEquipmentSpecIndexRealtimeData(EquipmentSpecificIndex index) {
// TODO Auto-generated method stub
if (!ObjectUtils.isEmpty(indexs)) {
EquipmentSpecificIndex index = indexs.get(0);
if (!ObjectUtils.isEmpty(index)) {
EquipmentSpecific es = equipmentSpecificMapper.selectById(index.getEquipmentSpecificId());
es.setRealtimeIotEsIndexId(index.getId());
es.setRealtimeIotIndexKey(index.getNameKey());
......
......@@ -77,7 +77,6 @@ import com.yeejoin.equipmanage.service.IEquipmentIndexService;
import com.yeejoin.equipmanage.service.IEquipmentService;
import com.yeejoin.equipmanage.service.IEquipmentSpecificAlarmLogService;
import com.yeejoin.equipmanage.service.IEquipmentSpecificAlarmService;
import com.yeejoin.equipmanage.service.IEquipmentSpecificIndexSerivce;
import com.yeejoin.equipmanage.service.IEquipmentSpecificIndexService;
import com.yeejoin.equipmanage.service.IEquipmentSpecificSerivce;
import com.yeejoin.equipmanage.service.IFireFightingSystemService;
......@@ -94,996 +93,979 @@ import lombok.extern.slf4j.Slf4j;
/**
* @author keyong
* @title: MqttReceiveServiceImpl
* <pre>
* @description: 增量数据处理
* </pre>
*
* <pre>
* &#64;description: 增量数据处理
* </pre>
*
* @date 2020/11/3 13:39
*/
@Slf4j
@Service
public class MqttReceiveServiceImpl implements MqttReceiveService {
private static Map<String, TemperatureAlarmDto> temperatureMap = new HashMap<>();
private static Map<String, TemperatureAlarmDto> temperatureMap = new HashMap<>();
@Autowired
IEquipmentSpecificIndexService equipmentSpecificIndexService;
@Autowired
IEquipmentSpecificIndexService equipmentSpecificIndexService;
@Autowired
ICarPropertyService carPropertyService;
@Autowired
ICarPropertyService carPropertyService;
@Autowired
IEquipmentSpecificAlarmService equipmentSpecificAlarmService;
@Autowired
IEquipmentSpecificAlarmService equipmentSpecificAlarmService;
@Autowired
IEquipmentSpecificAlarmLogService equipmentSpecificAlarmLogService;
@Autowired
IEquipmentSpecificAlarmLogService equipmentSpecificAlarmLogService;
@Autowired
EquipmentSpecificAlarmMapper equipmentSpecificAlarmMapper;
@Autowired
EquipmentSpecificAlarmLogMapper equipmentSpecificAlarmLogMapper;
@Autowired
EquipmentSpecificAlarmMapper equipmentSpecificAlarmMapper;
@Autowired
@Lazy
IEquipmentIndexService equipmentIndexService;
@Autowired
EquipmentSpecificAlarmLogMapper equipmentSpecificAlarmLogMapper;
@Autowired
EquipmentSpecificIndexMapper equipmentSpecificIndexMapper;
@Autowired
@Lazy
IEquipmentIndexService equipmentIndexService;
@Autowired
EquipmentSpecificMapper equipmentSpecificMapper;
@Autowired
EquipmentSpecificIndexMapper equipmentSpecificIndexMapper;
@Autowired
FireFightingSystemMapper FireFightingSystemMapper;
@Autowired
EquipmentSpecificMapper equipmentSpecificMapper;
@Autowired
IFireFightingSystemService fireFightingSystemService;
@Autowired
FireFightingSystemMapper FireFightingSystemMapper;
@Autowired
MqttSendGateway mqttSendGateway;
@Autowired
IFireFightingSystemService fireFightingSystemService;
@Autowired
private RedisUtils redisUtils;
@Autowired
MqttSendGateway mqttSendGateway;
@Autowired
CarMapper carMapper;
@Autowired
IMainIotMonitorSerivce iMainIotMonitorSerivce;
@Autowired
private IEquipmentSpecificIndexSerivce equipmentSpecificIndexSerivce;
@Autowired
private ISyncDataService syncDataService;
@Autowired
private IEquipmentAlarmReportDayService iEquipmentAlarmReportDayService;
@Autowired
private IEquipmentSpecificSerivce iEquipmentSpecificSerivce;
@Autowired
FireFightingSystemMapper fireFightingSystemMapper;
@Autowired
private SystemctlFeign systemctlFeign;
@Autowired
private RemoteSecurityService remoteSecurityService;
@Autowired
private TopographyService topographyService;
@Autowired
private IEquipmentService equipmentService;
@Value("${equipManage.name}")
private String serverName;
@Value("${mqtt.vehicle.topic}")
private String carTopic;
@Value("${equip.point.equipmentdata.topic}")
private String canvasTopic;
@Value("${equip.index.topic}")
private String indexTopic;
@Value("${spring.redis.expire.time}")
private long redisExpireTime;
@Value("${systemctl.sync.switch}")
private Boolean syncSwitch;
@Value("${systemctl.amos.switch}")
private Boolean amosSwitch;
@Value("${isSendApp}")
private Boolean isSendApp;
private final static Map staticMap = new HashMap();
private static Boolean bool = Boolean.FALSE;
static {
staticMap.put("FireCar_GDLongitude", "0");
staticMap.put("FireCar_GDLatitude", "0");
staticMap.put("FireCar_Speed", "0");
staticMap.put("FireCar_Power", "0");
staticMap.put("FireCar_CourseOverGround", "0");
staticMap.put("time", System.currentTimeMillis());
staticMap.put("FireCar_Start", "false");
}
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerMqttIncrementMessage(String topic, String message) {
TopicEntityVo topicEntity = new TopicEntityVo();
topicEntity.setTopic(topic);
topicEntity.setMessage(message);
int endIndex = topic.lastIndexOf("/");
String iotCode = topic.substring(0, endIndex).replace("/", "");
topicEntity.setIotCode(iotCode);
List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
if (eqIotCodeList.isEmpty()) {
log.info("该数据{}不存在!", iotCode);
return;
}
if (eqIotCodeList.size() > 1) {
log.info("有重复的{}数据!", iotCode);
}
EquipmentSpecificVo vo = eqIotCodeList.get(0);
topicEntity.setType(vo.getType());
topicEntity.setCode(vo.getCode());
JSONObject json = JSONObject.parseObject(message);
Iterator it = json.entrySet().iterator();
List<IotDataVO> iotDatalist = new ArrayList<IotDataVO>();
while (it.hasNext()) {
IotDataVO iotDataVO = new IotDataVO();
Map.Entry<String, Object> entry = (Map.Entry<String, Object>) it.next();
iotDataVO.setKey(entry.getKey());
iotDataVO.setValue(entry.getValue());
iotDatalist.add(iotDataVO);
}
if (ObjectUtils.isEmpty(iotDatalist)) {
return;
}
log.info(String.format("收到mqtt消息:%s", message));
realTimeDateProcessing(topicEntity, iotDatalist);
}
/**
* 物联数据处理
* @param topicEntity
* @param iotDatalist
*/
public void realTimeDateProcessing(TopicEntityVo topicEntity, List<IotDataVO> iotDatalist){
String iotCode = topicEntity.getIotCode();
if(EquipAndCarEnum.equip.type.equals(topicEntity.getType())){
List<EquipmentSpecificIndex> indexList = equipmentSpecificIndexService.getEquipmentSpeIndexBySpeIotCode(iotCode);
if (ObjectUtils.isEmpty(indexList)){
return;
}
equipRealTimeDate(iotDatalist, indexList, topicEntity);
}else{
List<CarProperty> carProperties = carPropertyService.getCarPropListByIotCode(iotCode);
if (ObjectUtils.isEmpty(carProperties)){
return;
}
carRealTimeDate(iotDatalist, carProperties);
}
}
public static List<EquipmentSpecificAlarmLog> upAlarmLogStatus(String iotCode, String equipmentSpecificIndexKey, IEquipmentSpecificAlarmLogService equipmentSpecificAlarmLogService) {
LambdaQueryWrapper<EquipmentSpecificAlarmLog> queryWrapper = new LambdaQueryWrapper();
queryWrapper.eq(EquipmentSpecificAlarmLog::getIotCode, iotCode);
queryWrapper.eq(EquipmentSpecificAlarmLog::getEquipmentSpecificIndexKey, equipmentSpecificIndexKey);
queryWrapper.ne(EquipmentSpecificAlarmLog::getStatus, AlarmStatusEnum.HF.getCode());
List<EquipmentSpecificAlarmLog> logs = equipmentSpecificAlarmLogService.getBaseMapper().selectList(queryWrapper);
logs.forEach(x -> {
x.setCleanTime(new Date());
x.setStatus(AlarmStatusEnum.HF.getCode());
});
if (!logs.isEmpty()) {
equipmentSpecificAlarmLogService.updateBatchById(logs);
}
return logs;
}
public void publishDataToCanvas(List<EquipmentSpecificIndex> indexList) {
if (!ObjectUtils.isEmpty(indexList)) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
EquipmentSpecificIndex index = indexList.stream().filter(x -> x.getUpdateDate() != null)
.sorted(Comparator.comparing(EquipmentSpecificIndex::getUpdateDate).reversed())
.collect(Collectors.toList()).get(0);
EquipmentStateVo equipmentStateVo = new EquipmentStateVo();
equipmentStateVo.setEquipName(index.getEquipmentSpecificName());
equipmentStateVo.setOrgCode(index.getOrgCode());
equipmentStateVo.setSpecificId(index.getEquipmentSpecificId());
equipmentStateVo.setEquipCode(index.getQrCode());
equipmentStateVo.setEquipIotCode(index.getIotCode());
equipmentStateVo.setStatus("");
equipmentStateVo.setColor(index.getEmergencyLevelColor());
// 添加性能指标项
equipmentStateVo.setSpeindexList(fireFightingSystemService.getSpeIndex(index.getEquipmentSpecificId()));
Map<String, Object> topicObject = new HashMap<>();
topicObject.put("equipCode", equipmentStateVo.getEquipCode());
if (TrueOrFalseEnum.real.value.toUpperCase().equals(index.getValue().toUpperCase())) {
topicObject.put("color", equipmentStateVo.getColor());
} else {
topicObject.put("color", "");
}
Map<String, Object> map = JSON.parseObject(JSON.toJSONString(equipmentStateVo));
map.put(canvasTopic, topicObject);
// 发送数据至画布
mqttSendGateway.sendToMqtt(canvasTopic, JSON.toJSONString(map));
}
});
}
}
void syncSystemctlMsg(EquipmentSpecificAlarmLog equipmentSpecificAlarmLog) {
try {
MessageModel model = new MessageModel();
String alarmReason = ValidationUtil.isEmpty(equipmentSpecificAlarmLog.getAlarmReason()) ? "" : equipmentSpecificAlarmLog.getAlarmReason();
if (alarmReason.contains(":")) {
String[] split = alarmReason.split(":");
alarmReason = split[1];
}
model.setTitle(equipmentSpecificAlarmLog.getEquipmentSpecificIndexName());
String body = String.format("警情类型:%s;报警设备:%s;报警位置:%s;报警原因:%s;报警时间:%s",
equipmentSpecificAlarmLog.getEquipmentSpecificIndexName(), equipmentSpecificAlarmLog.getEquipmentSpecificName(),
equipmentSpecificAlarmLog.getLocation(), alarmReason,
DateUtils.date2LongStr(equipmentSpecificAlarmLog.getCreateDate()));
model.setBody(body);
model.setMsgType("iotMonitor");
if (isSendApp) {
model.setIsSendApp(true);
model.setTerminal("APP/WEB");
} else {
model.setIsSendApp(false);
model.setTerminal("WEB");
}
model.setIsSendWeb(true);
model.setCategory(1);
model.setRelationId(String.valueOf(equipmentSpecificAlarmLog.getId()));
Token token = remoteSecurityService.getServerToken();
systemctlFeign.create(token.getAppKey(), token.getProduct(), token.getToke(), model);
log.info(String.format("调用平台消息服务成功:%s", JSON.toJSONString(model)));
} catch (Exception e) {
// e.printStackTrace();
log.error("告警消息同步平台失败:syncSystemctlMsg,===>>>" + e.getMessage());
}
}
public List<EquipmentSpecificAlarm> addIndexAlarmRecord(EquipmentSpecificIndex equipmentSpcIndex, List<IotDataVO> iotDatalist) {
// 处理火眼视频异常
EquipmentSpecificIndex equipmentSpecificIndex = handleTemperatureAlarm(equipmentSpcIndex, iotDatalist);
List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
EquipmentSpecificAlarm equipmentSpecificAlarm = new EquipmentSpecificAlarm();
equipmentSpecificAlarm.setSystemIds(equipmentSpcIndex.getSystemId());
equipmentSpecificAlarm.setSystemCodes(this.getSystemCodeBySpeId(equipmentSpcIndex.getSystemId()));
if (EquipmentRiskTypeEnum.GZ.getCode().equals(equipmentSpecificIndex.getTypeCode()) || EquipmentRiskTypeEnum.HZGJ.getCode().equals(equipmentSpecificIndex.getTypeCode())
|| EquipmentRiskTypeEnum.PB.getCode().equals(equipmentSpecificIndex.getTypeCode())) {
List<EquipmentSpecificAlarm> indexAlarms = equipmentSpecificAlarmMapper.findEquipmentSpecificAlarmByEquipmentSpecificIdAndEquipmentIndexIdAndStatusIstrue(equipmentSpecificIndex.getEquipmentSpecificId(), equipmentSpecificIndex.getEquipmentIndexId());
// NB设备告警
if (verifyNB(equipmentSpecificIndex.getNameKey())) {
return getNbEquipAlarmList(indexAlarms, equipmentSpecificIndex, equipmentSpecificAlarm);
}
// 报警表新增信息
if (ObjectUtils.isEmpty(indexAlarms) && (TrueOrFalseEnum.real.value.equals(equipmentSpecificIndex.getValue()))) {
addEquipmentSpecificAlarm(equipmentSpecificAlarms, equipmentSpecificIndex, equipmentSpecificAlarm);
} else {
// 报警表更新信息
indexAlarms.forEach(action -> {
if (TrueOrFalseEnum.real.value.equals(equipmentSpecificIndex.getValue())) {
// 报警,修改发生频率
action.setFrequency((action.getFrequency() + 1));
} else {
// 报警恢复,修改数据为恢复状态
action.setRecoveryDate(new Date());
action.setEquipmentSpecificIndexValue(TrueOrFalseEnum.fake.value);
action.setStatus(AlarmStatusEnum.HF.getCode());
}
action.setUpdateDate(new Date());
// 更新所在系统,设备可能编辑过,更新所在系统、装备名称、装备定义code
action.setSystemIds(equipmentSpcIndex.getSystemId());
action.setSystemCodes(this.getSystemCodeBySpeId(equipmentSpcIndex.getSystemId()));
action.setEquipmentSpecificName(equipmentSpcIndex.getEquipmentSpecificName());
action.setEquipmentCode(equipmentSpcIndex.getEquipmentCode());
// 冗余字段,alarm_log表更新时使用
action.setEquipmentSpecificCode(equipmentSpcIndex.getEquipmentSpecificCode());
equipmentSpecificAlarms.add(action);
});
}
}
return equipmentSpecificAlarms;
}
private String getSystemCodeBySpeId(String sysIds) {
if (StringUtil.isNotEmpty(sysIds)) {
List<FireFightingSystemEntity> sys = fireFightingSystemMapper.getFightingSysByIds(sysIds.split(","));
return sys.stream().map(FireFightingSystemEntity::getCode).collect(Collectors.joining(","));
} else {
return null;
}
}
private EquipmentAlarmReportDay addEquipAlarmReportRecord(EquipmentSpecificIndex equipmentSpecificIndex) {
EquipmentAlarmReportDay equipmentAlarmReportDay = new EquipmentAlarmReportDay();
equipmentAlarmReportDay.setOrgCode(equipmentSpecificIndex.getOrgCode());
equipmentAlarmReportDay.setReportDate(new Date());
equipmentAlarmReportDay.setUpdateDate(new Date());
equipmentAlarmReportDay.setSystemIds(equipmentSpecificIndex.getSystemId());
equipmentAlarmReportDay.setLastReportDate(new Date());
equipmentAlarmReportDay.setEquipmentSpecificId(equipmentSpecificIndex.getEquipmentSpecificId());
equipmentAlarmReportDay.setEquipmentSpecificName(equipmentSpecificIndex.getEquipmentSpecificName());
equipmentAlarmReportDay.setEquipmentDetailId(equipmentSpecificIndex.getEquipmentDetailId());
equipmentAlarmReportDay.setEquipmentId(equipmentSpecificIndex.getEquipmentId());
equipmentAlarmReportDay.setEquipmentCode(equipmentSpecificIndex.getEquipmentCode());
equipmentAlarmReportDay.setIndexTrueNum(TrueOrFalseEnum.real.value.equals(equipmentSpecificIndex.getValue()) ? 1L : 0L);
equipmentAlarmReportDay.setAlarmType(equipmentSpecificIndex.getTypeCode());
equipmentAlarmReportDay.setAlarmTypeName(equipmentSpecificIndex.getTypeName());
equipmentAlarmReportDay.setIndexId(equipmentSpecificIndex.getEquipmentIndexId());
equipmentAlarmReportDay.setIndexName(equipmentSpecificIndex.getEquipmentSpecificIndexName());
equipmentAlarmReportDay.setIndexType(equipmentSpecificIndex.getNameKey());
equipmentAlarmReportDay.setValue(equipmentSpecificIndex.getValue());
equipmentAlarmReportDay.setIsAlarm(equipmentSpecificIndex.getIsAlarm());
return equipmentAlarmReportDay;
}
private EquipmentSpecificAlarmLog addEquipAlarmLogRecord(EquipmentSpecificAlarm equipmentSpecificAlarm) {
EquipmentSpecificAlarmLog equipmentSpecificAlarmLog = new EquipmentSpecificAlarmLog();
BeanUtils.copyProperties(equipmentSpecificAlarm, equipmentSpecificAlarmLog);
equipmentSpecificAlarmLog.setId(null);
equipmentSpecificAlarmLog.setCreateDate(new Date());
equipmentSpecificAlarmLog.setEquipmentSpecificAlarmId(equipmentSpecificAlarm.getId());
boolean bool = equipmentSpecificAlarmLogService.save(equipmentSpecificAlarmLog);
// 同步告警消息给平台
if (amosSwitch && bool) {
EquipmentSpecificAlarmLog alarmLog = equipmentSpecificAlarmLogService.getById(equipmentSpecificAlarmLog.getId());
new Thread(new Runnable() {
@Override
public void run() {
syncSystemctlMsg(alarmLog);
}
}).start();
}
return equipmentSpecificAlarmLog;
}
/**
* 添加告警
*
* @param equipmentSpecificAlarms
* @param equipmentSpecificIndex
* @return
*/
private List<EquipmentSpecificAlarm> addEquipmentSpecificAlarm(List<EquipmentSpecificAlarm> equipmentSpecificAlarms, EquipmentSpecificIndex equipmentSpecificIndex, EquipmentSpecificAlarm equipmentSpecificAlarm) {
BeanUtils.copyProperties(equipmentSpecificIndex, equipmentSpecificAlarm);
equipmentSpecificAlarm.setId(null);
equipmentSpecificAlarm.setFrequency(1);
equipmentSpecificAlarm.setStatus(AlarmStatusEnum.BJ.getCode());
equipmentSpecificAlarm.setEquipmentSpecificIndexKey(equipmentSpecificIndex.getNameKey());
equipmentSpecificAlarm.setEquipmentSpecificIndexValue(equipmentSpecificIndex.getValue());
equipmentSpecificAlarm.setEquipmentSpecificIndexLabel(equipmentSpecificIndex.getValueLabel());
equipmentSpecificAlarm.setType(equipmentSpecificIndex.getTypeCode());
equipmentSpecificAlarm.setCreateDate(new Date());
equipmentSpecificAlarm.setUpdateDate(new Date());
equipmentSpecificAlarm.setEquipmentCode(equipmentSpecificIndex.getEquipmentCode());
equipmentSpecificAlarm.setEquipmentId(equipmentSpecificIndex.getEquipmentId());
equipmentSpecificAlarm.setEquipmentDetailId(equipmentSpecificIndex.getEquipmentDetailId());
equipmentSpecificAlarm.setEquipmentSpecificCode(equipmentSpecificIndex.getEquipmentSpecificCode());
equipmentSpecificAlarm.setEmergencyLevel(equipmentSpecificIndex.getEmergencyLevel());
equipmentSpecificAlarm.setEmergencyLevelColor(equipmentSpecificIndex.getEmergencyLevelColor());
equipmentSpecificAlarm.setEmergencyLevelDescribe(equipmentSpecificIndex.getEmergencyLevelDescribe());
equipmentSpecificAlarms.add(equipmentSpecificAlarm);
return equipmentSpecificAlarms;
}
// NB装备告警
private List<EquipmentSpecificAlarm> getNbEquipAlarmList(List<EquipmentSpecificAlarm> indexAlarms, EquipmentSpecificIndex equipmentSpecificIndex, EquipmentSpecificAlarm equipmentSpecificAlarm) {
List<EquipmentSpecificAlarm> equipmentSpecificAlarmList = new ArrayList<>();
if (ValidationUtil.isEmpty(indexAlarms)) { // 告警表为空,新增告警数据
addEquipmentSpecificAlarm(equipmentSpecificAlarmList, equipmentSpecificIndex, equipmentSpecificAlarm);
if (!checkStateIsNormal(equipmentSpecificAlarm, equipmentSpecificIndex)) {
return equipmentSpecificAlarmList;
} else {
equipmentSpecificAlarmList.clear();
}
} else {
indexAlarms.forEach(action -> {
// 状态为正常或报警解除
if (checkStateIsNormal(action, equipmentSpecificIndex)) {
//修改报警数据为正常
action.setRecoveryDate(new Date());
action.setStatus(AlarmStatusEnum.HF.getCode());
// 修改之前数据为已处理
action.setResolveResult(action.getAlamReason());
action.setConfirmUserName("系统");
action.setConfirmType(action.getType());
equipmentSpecificAlarmLogService.updateAlarmLogByIotCodeAndIndexKey(action);
} else {
action.setFrequency((action.getFrequency() + 1));
}
// 更新所在系统,设备可能编辑过
action.setSystemIds(equipmentSpecificIndex.getSystemId());
action.setSystemCodes(this.getSystemCodeBySpeId(equipmentSpecificIndex.getSystemId()));
action.setUpdateDate(new Date());
equipmentSpecificAlarmList.add(action);
});
}
return equipmentSpecificAlarmList;
}
private boolean ifSendToGis(List<CarIndexGisVo> list) {
boolean flag = true;
List<CarIndexGisVo> list1 = list.stream().filter(x -> CarForGisEnum.JD.getNameKey().equals(x.getNameKey())).collect(Collectors.toList());
List<CarIndexGisVo> list2 = list.stream().filter(x -> CarForGisEnum.WD.getNameKey().equals(x.getNameKey())).collect(Collectors.toList());
for (CarIndexGisVo gisVo : list) {
if (CarForGisEnum.JD.getNameKey().equals(gisVo.getNameKey())) {
if (!StringUtil.isNotEmpty(gisVo.getValue()) || "0".equals(gisVo.getValue())) {
flag = false;
continue;
}
}
if (CarForGisEnum.WD.getNameKey().equals(gisVo.getNameKey()) || "0".equals(gisVo.getValue())) {
if (!StringUtil.isNotEmpty(gisVo.getValue())) {
flag = false;
continue;
}
}
}
if (list1.size() == 0 || list2.size() == 0) {
flag = false;
}
return flag;
}
/**
* 接收到的IOT数据为火眼存储到Map中
*/
private void iotDataListToCacheMap(List<IotDataVO> iotDatalist) {
List<IotDataVO> iotDataVOs = iotDatalist.stream().filter(x -> "alarmLevel".equals(x.getKey()) || "alarmType".equals(x.getKey()) || "temperature".equals(x.getKey())
|| "ruleTemperature".equals(x.getKey()) || "thermometryUnit".equals(x.getKey()) || "alarmRule".equals(x.getKey())).collect(Collectors.toList());
if (iotDataVOs.size() > 0) {
Map<String, Object> map = iotDatalist.stream().collect(Collectors.toMap(IotDataVO::getKey, IotDataVO::getValue));
putTemperatureMap(map.get("traceId").toString(), map);
}
}
/**
* 处理火眼逻辑合并为一条告警
*/
private EquipmentSpecificIndex handleTemperatureAlarm(EquipmentSpecificIndex equipmentSpecificIndex, List<IotDataVO> iotDatalist) {
List<IotDataVO> collect = iotDatalist.stream().filter(x -> "traceId".equals(x.getKey())).collect(Collectors.toList());
if (collect.size() > 0 && temperatureMapIsEmpty(String.valueOf(collect.get(0).getValue()))) {
String traceId = String.valueOf(collect.get(0).getValue());
TemperatureAlarmDto dto = temperatureMap.get(traceId);
equipmentSpecificIndex.setEquipmentSpecificIndexName(AlarmTypeEnum.getTypeByCode(AlarmTypeEnum.GZGJ.getCode()));
equipmentSpecificIndex.setNameKey(AlarmTypeEnum.GZGJ.getCode());
equipmentSpecificIndex.setAlamReason(TemperatureAlarm.getAlarmContent(dto.getAlarmLevel(), dto.getAlarmType(), dto.getAlarmRule(), dto.getRuleTemperature(), dto.getTemperature(), dto.getThermometryUnit()));
equipmentSpecificIndex.setValue("true");
equipmentSpecificIndex.setIsAlarm(1);
temperatureMap.remove(traceId);
}
return equipmentSpecificIndex;
}
/**
* temperatureMap存储数据
*/
private void putTemperatureMap(String traceId, Map<String, Object> map) {
TemperatureAlarmDto cacheTemperatureAlarmDto = temperatureMap.get(traceId);
if (ValidationUtil.isEmpty(cacheTemperatureAlarmDto)) {
cacheTemperatureAlarmDto = new TemperatureAlarmDto();
}
TemperatureAlarmDto newMap = JSON.parseObject(JSON.toJSONString(map), TemperatureAlarmDto.class);
BeanUtil.copyPropertiesIgnoreNull(newMap, cacheTemperatureAlarmDto);
temperatureMap.put(traceId, cacheTemperatureAlarmDto);
}
private boolean temperatureMapIsEmpty(String traceId) {
TemperatureAlarmDto dto = temperatureMap.get(traceId);
if (!ValidationUtil.isEmpty(dto) && !ValidationUtil.isEmpty(dto.getAlarmLevel()) && !ValidationUtil.isEmpty(dto.getAlarmType()) && !ValidationUtil.isEmpty(dto.getAlarmRule())
&& !ValidationUtil.isEmpty(dto.getRuleTemperature()) && !ValidationUtil.isEmpty(dto.getTemperature()) && !ValidationUtil.isEmpty(dto.getThermometryUnit())) {
return true;
}
return false;
}
/**
* 判断是否为NB物联监测设备参数
*
* @param nameKey
* @return
*/
private boolean verifyNB(String nameKey) {
return nameKey.startsWith("NB_");
}
/**
* 判断状态为正常或报警解除
*/
private boolean checkStateIsNormal(EquipmentSpecificAlarm equipmentSpecificAlarm, EquipmentSpecificIndex equipmentSpecificIndex) {
String enumKey = String.format("%s_%s", equipmentSpecificAlarm.getEquipmentSpecificIndexKey(), equipmentSpecificIndex.getValue());
equipmentSpecificAlarm.setAlamReason(equipmentSpecificAlarm.getEquipmentSpecificIndexName().replace("NB_", "")
+ ":" + NBalarmEnum.getDescByKey(enumKey));
boolean flag = false;
if (!ValidationUtil.isEmpty(equipmentSpecificAlarm.getType())
&& !ValidationUtil.isEmpty(equipmentSpecificAlarm.getIotCode())
&& !ValidationUtil.isEmpty(equipmentSpecificAlarm.getEquipmentSpecificIndexKey())
&& (NBalarmEnum.NB_liquid_level_state_0.getKey().equals(enumKey)
|| NBalarmEnum.NB_error_code_0.getKey().equals(enumKey)
|| NBalarmEnum.NB_battery_state_0.getKey().equals(enumKey)
|| NBalarmEnum.NB_hydraulic_state_0.getKey().equals(enumKey)
|| NBalarmEnum.NB_hydraulic_state_2.getKey().equals(enumKey)
|| NBalarmEnum.NB_hydraulic_state_4.getKey().equals(enumKey)
|| NBalarmEnum.NB_alarm_status_4.getKey().equals(enumKey))) {
flag = true;
}
return flag;
}
/**
* 装备实时数据处理
* @param iotDatalist
* @param indexList
* @param topicEntity
*/
private void equipRealTimeDate(List<IotDataVO> iotDatalist, List<EquipmentSpecificIndex> indexList, TopicEntityVo topicEntity){
List<EquipmentSpecificIndex> equipmentSpecificIndexList = new ArrayList<>();
List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
List<IndexStateVo> indexStateList = new ArrayList<>();
// 存储温度数据至内存中(火眼)
iotDataListToCacheMap(iotDatalist);
iotDatalist.forEach( iotDataVO -> {
for (EquipmentSpecificIndex equipmentSpecificIndex : indexList) {
if (!ObjectUtils.isEmpty(equipmentSpecificIndex.getNameKey())
&& equipmentSpecificIndex.getNameKey().toLowerCase().equals(iotDataVO.getKey().toLowerCase())) {
EquipmentSpecificIndex equipmentSpeIndex = new EquipmentSpecificIndex();
BeanUtils.copyProperties(equipmentSpecificIndex, equipmentSpeIndex);
String value = iotDataVO.getValue().toString();
equipmentSpeIndex.setValue(value);
equipmentSpeIndex.setValueLabel(valueTranslate(value, equipmentSpecificIndex.getValueEnum()));
equipmentSpeIndex.setUpdateDate(new Date());
equipmentSpecificIndexService.updateById(equipmentSpeIndex);
equipmentSpecificIndexList.add(equipmentSpeIndex);
indexStateList.add(createIndexStateVo(equipmentSpeIndex));
// 向预控系统发送消息
sendEquipSpecIndexToAutosysTopic(equipmentSpeIndex);
// 添加指标报告
saveEquipmentAlarmReportDay(equipmentSpeIndex);
//火眼数据构造告警指标逻辑
equipmentSpecificIndex = handleTemperatureAlarm(equipmentSpeIndex, iotDatalist);
//指标告警处理
if(equipmentSpecificIndex.getIsAlarm() !=null && 1 == equipmentSpecificIndex.getIsAlarm()){
equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpecificIndex));
}
}
}
});
// 首页性能指标数据订阅
mqttSendGateway.sendToMqtt(indexTopic, JSON.toJSONString(indexStateList));
//组态大屏消息推送,设备表实时指标修改
intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
//数字换流站同步指标修改
syncSpecificIndexsToGS(equipmentSpecificIndexList);
// 报警数据保存
saveOrUpdateEquipAlarm(equipmentSpecificAlarms);
//则更新拓扑节点数据及告警状态
updateNodeDateByEquipId(equipmentSpecificIndexList);
//向画布推送
publishDataToCanvas(equipmentSpecificIndexList);
}
public void saveOrUpdateEquipAlarm(List<EquipmentSpecificAlarm> equipmentSpecificAlarms){
if(ObjectUtils.isEmpty(equipmentSpecificAlarms)){
return;
}
equipmentSpecificAlarmService.saveOrUpdateBatch(equipmentSpecificAlarms);
List<EquipmentSpecificAlarmLog> equipmentAlarmLogs = new ArrayList<>();
equipmentSpecificAlarms.forEach(action->{
if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
equipmentAlarmLogs.add(addEquipAlarmLogRecord(action));
if (ValidationUtil.isEmpty(action.getAlamContent())) {
action.setAlamContent(action.getEquipmentSpecificName() + action.getEquipmentSpecificIndexName());
}
mqttSendGateway.sendToMqtt(TopicEnum.EQDQR.getTopic(), JSONArray.toJSON(action).toString());
} else {
equipmentAlarmLogs.addAll(upAlarmLogStatus(action.getIotCode(), action.getEquipmentSpecificIndexKey(), equipmentSpecificAlarmLogService));
mqttSendGateway.sendToMqtt(TopicEnum.EQYQR.getTopic(), JSONArray.toJSON(action).toString());
bool = Boolean.TRUE;
}
});
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("seqNo", UUID.randomUUID().toString().replace("-", "").toLowerCase());
mqttSendGateway.sendToMqtt(TopicEnum.ALARM_LOG_INSERT.getTopic(), jsonObject.toString());
mqttSendGateway.sendToMqtt(TopicEnum.EQZXDT.getTopic(), "");
// 数字换流站数据处理(高斯库同步及南瑞告警推送)
if (syncSwitch) {
List<FireEquipmentFireAlarm> alarmList = createFireEquipmentFireAlarmVo(equipmentAlarmLogs);
if (!CollectionUtils.isEmpty(alarmList)) {
Map<String, List<FireEquipmentFireAlarm>> collect = alarmList.stream().collect(Collectors.groupingBy(FireEquipmentFireAlarm::getType));
for (String key : collect.keySet()) {
List<FireEquipmentFireAlarm> list = collect.get(key);
if (!CollectionUtils.isEmpty(list)) {
if ("FIREALARM".equalsIgnoreCase(key)) {
syncDataService.syncCreatedFireEquipAlarm(list);
} else if ("BREAKDOWN".equalsIgnoreCase(key)) {
List<FireEquipmentFaultAlarm> faultAlarms = list.stream().map(x -> {
FireEquipmentFaultAlarm fireEquipmentFaultAlarm = new FireEquipmentFaultAlarm();
BeanUtils.copyProperties(x, fireEquipmentFaultAlarm);
return fireEquipmentFaultAlarm;
}).collect(Collectors.toList());
syncDataService.syncCreatedFireEquiptFaultAlarm(faultAlarms);
} else if ("SHIELD".equalsIgnoreCase(key)) {
List<FireEquipmentDefectAlarm> defectAlarms = list.stream().map(x -> {
FireEquipmentDefectAlarm fireEquipmentFaultAlarm = new FireEquipmentDefectAlarm();
BeanUtils.copyProperties(x, fireEquipmentFaultAlarm);
return fireEquipmentFaultAlarm;
}).collect(Collectors.toList());
syncDataService.syncCreatedFireEquipDefectAlarm(defectAlarms);
}
}
}
}
// 向南瑞平台推送报警消息
if(!bool){
syncDataService.syncCreatedSendAlarm(equipmentAlarmLogs);
}
}
}
});
}
/**
* 组装数字换流站平台告警数据
* @param
* @return
*/
private List<FireEquipmentFireAlarm> createFireEquipmentFireAlarmVo(List<EquipmentSpecificAlarmLog> equipmentAlarmLogs){
Map<String,String> stationInfo = equipmentSpecificMapper.getStationInfo().get(0);
List<FireEquipmentFireAlarm> alarmList = new ArrayList<>();
equipmentAlarmLogs.forEach(action->{
FireEquipmentFireAlarm alarm = new FireEquipmentFireAlarm();
BeanUtils.copyProperties(action, alarm);
alarm.setAliasname(StringUtil.toNotEmptyString(action.getEquipmentSpecificIndexName()));
alarm.setEquipmentMeasurementId(StringUtil.toNotEmptyString(action.getEquipmentIndexId().toString()));
alarm.setEquipmentMeasurementMRid(StringUtil.toNotEmptyString(action.getEquipmentIndexId().toString()));
alarm.setFieldLabel(StringUtil.toNotEmptyString(action.getEquipmentSpecificIndexKey()));
alarm.setFieldName(StringUtil.toNotEmptyString(action.getEquipmentSpecificIndexName()));
alarm.setFireEquipmentId(StringUtil.toNotEmptyString(action.getEquipmentSpecificId().toString()));
alarm.setFireEquipmentMRid(StringUtil.toNotEmptyString(action.getEquipmentSpecificCode()));
alarm.setFireEquipmentName(StringUtil.toNotEmptyString(action.getEquipmentSpecificName()));
alarm.setFrequency(1);
alarm.setId(StringUtil.toNotEmptyString(action.getId().toString()));
alarm.setMrid(action.getId().toString());
alarm.setName(action.getEquipmentSpecificIndexName());
alarm.setRecoveryDate(action.getUpdateDate());
alarm.setStationCode(StringUtil.toNotEmptyString(stationInfo.get("stationCode")));
alarm.setStationName(StringUtil.toNotEmptyString(stationInfo.get("stationName")));
alarm.setValue(StringUtil.toNotEmptyString(action.getEquipmentSpecificIndexValue()));
alarmList.add(alarm);
});
return alarmList;
}
/**
* 高斯库同步指标修改
* @param equipmentSpecificIndexList
*/
private void syncSpecificIndexsToGS(List<EquipmentSpecificIndex> equipmentSpecificIndexList){
if (!ObjectUtils.isEmpty(equipmentSpecificIndexList) && syncSwitch) {
// 数据同步
List<EquipmentIndexVO> fireEquipMeasurementCollect = new ArrayList<>();
equipmentSpecificIndexList.forEach(action->{
EquipmentIndexVO equipmentIndexVO = new EquipmentIndexVO();
BeanUtils.copyProperties(action, equipmentIndexVO);
fireEquipMeasurementCollect.add(equipmentIndexVO);
});
if (0 < fireEquipMeasurementCollect.size()) {
syncDataService.syncCreatedFireEquipMeasurement(fireEquipMeasurementCollect);
}
}
}
private IndexStateVo createIndexStateVo(EquipmentSpecificIndex equipmentSpecificIndex){
IndexStateVo indexStateVo = new IndexStateVo();
BeanUtils.copyProperties(equipmentSpecificIndex, indexStateVo);
indexStateVo.setId(equipmentSpecificIndex.getIotCode() + "_" + equipmentSpecificIndex.getNameKey());
indexStateVo.setData(equipmentSpecificIndex.getValue());
indexStateVo.setIndexKey(equipmentSpecificIndex.getNameKey());
return indexStateVo;
}
public List<EquipmentSpecificAlarm> createIndexAlarmRecord(EquipmentSpecificIndex equipmentSpcIndex) {
// 处理火眼视频异常
List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
EquipmentSpecificAlarm equipmentSpecificAlarm = new EquipmentSpecificAlarm();
equipmentSpecificAlarm.setSystemIds(equipmentSpcIndex.getSystemId());
equipmentSpecificAlarm.setSystemCodes(this.getSystemCodeBySpeId(equipmentSpcIndex.getSystemId()));
List<EquipmentSpecificAlarm> indexAlarms = equipmentSpecificAlarmMapper.findEquipmentSpecificAlarmByEquipmentSpecificIdAndEquipmentIndexIdAndStatusIstrue(equipmentSpcIndex.getEquipmentSpecificId()
, equipmentSpcIndex.getEquipmentIndexId());
// NB设备告警
if (verifyNB(equipmentSpcIndex.getNameKey())) {
return getNbEquipAlarmList(indexAlarms, equipmentSpcIndex, equipmentSpecificAlarm);
}
// 报警表新增信息
if (ObjectUtils.isEmpty(indexAlarms) && (TrueOrFalseEnum.real.value.equals(equipmentSpcIndex.getValue()))) {
addEquipmentSpecificAlarm(equipmentSpecificAlarms, equipmentSpcIndex, equipmentSpecificAlarm);
} else {
// 报警表更新信息
indexAlarms.forEach(action -> {
if (TrueOrFalseEnum.real.value.equals(equipmentSpcIndex.getValue())) {
// 报警,修改发生频率
action.setFrequency((action.getFrequency() + 1));
} else {
// 报警恢复,修改数据为恢复状态
action.setRecoveryDate(new Date());
action.setEquipmentSpecificIndexValue(TrueOrFalseEnum.fake.value);
action.setStatus(AlarmStatusEnum.HF.getCode());
}
action.setUpdateDate(new Date());
// 更新所在系统,设备可能编辑过,更新所在系统、装备名称、装备定义code
action.setSystemIds(equipmentSpcIndex.getSystemId());
action.setSystemCodes(this.getSystemCodeBySpeId(equipmentSpcIndex.getSystemId()));
action.setEquipmentSpecificName(equipmentSpcIndex.getEquipmentSpecificName());
action.setEquipmentCode(equipmentSpcIndex.getEquipmentCode());
// 冗余字段,alarm_log表更新时使用
action.setEquipmentSpecificCode(equipmentSpcIndex.getEquipmentSpecificCode());
equipmentSpecificAlarms.add(action);
});
}
return equipmentSpecificAlarms;
}
/**
* 发送数据至换流站
* @param
*/
private void sendEquipSpecIndexToAutosysTopic(EquipmentSpecificIndex equipmentSpeIndex){
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
String topic = "";
if (TrueOrFalseEnum.real.value.equals(equipmentSpeIndex.getValue()) && EquipmentRiskTypeEnum.HZGJ.getCode().equals(equipmentSpeIndex.getTypeCode())) {
equipmentSpeIndex.setType(EquipmentRiskTypeEnum.HZGJ.getCode());
topic = String.format("%s.%s%s", serverName, "equipment/", RiskLeverForAutoSys.BJ.getCode());
} else if (TrueOrFalseEnum.real.value.equals(equipmentSpeIndex.getValue()) && EquipmentRiskTypeEnum.GZ.getCode().equals(equipmentSpeIndex.getTypeCode())) {
equipmentSpeIndex.setType(EquipmentRiskTypeEnum.GZ.getCode());
topic = String.format("%s.%s%s", serverName, "equipment/", RiskLeverForAutoSys.GZ.getCode());
} else {
equipmentSpeIndex.setType(EquipmentRiskTypeEnum.QT.getCode());
topic = String.format("%s.%s%s", serverName, "equipment/", RiskLeverForAutoSys.JC.getCode());
}
TopicEntityVo topicEntityVo = new TopicEntityVo();
topicEntityVo.setIotCode(equipmentSpeIndex.getIotCode());
topicEntityVo.setTopic(topic);
topicEntityVo.setMessage(JSON.toJSONString(equipmentSpeIndex));
mqttSendGateway.sendToMqtt(topic, JSON.toJSONString(topicEntityVo));
}
});
}
/**
* 组态大屏消息推送,设备表实时指标修改
* @param equipmentSpecificIndexList
* @param topicEntity
*/
public void intePageSysDataRefresh(List<EquipmentSpecificIndex> equipmentSpecificIndexList, TopicEntityVo topicEntity){
//TODO 数字化换流站组态屏数据推送,需要在事务提交之后,否侧事务隔离查询不出数据
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
mqttSendGateway.sendToMqtt(TopicEnum.EQXXTJ.getTopic(), "");
iEquipmentSpecificSerivce.integrationPageSysDataRefresh(topicEntity.getCode());
iEquipmentSpecificSerivce.updateEquipmentSpecIndexRealtimeData(equipmentSpecificIndexList);
}
});
}
/**
* 更新数据报表表
* @param equipmentSpecificIndex
*/
private void saveEquipmentAlarmReportDay(EquipmentSpecificIndex equipmentSpecificIndex){
SimpleDateFormat sdf = new SimpleDateFormat(DateUtils.DATE_PATTERN);
EquipmentAlarmReportDay equipmentAlarmReportDay = addEquipAlarmReportRecord(equipmentSpecificIndex);
LambdaQueryWrapper<EquipmentAlarmReportDay> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(EquipmentAlarmReportDay::getReportDate, sdf.format(equipmentAlarmReportDay.getReportDate())).eq(EquipmentAlarmReportDay::getIndexId, equipmentAlarmReportDay.getIndexId())
.eq(EquipmentAlarmReportDay::getEquipmentSpecificId, equipmentAlarmReportDay.getEquipmentSpecificId());
List<EquipmentAlarmReportDay> reportDayList = iEquipmentAlarmReportDayService.list(wrapper);
if (reportDayList.isEmpty()) {
equipmentAlarmReportDay.setReportDate(new Date());
equipmentAlarmReportDay.setFrequency(1);
iEquipmentAlarmReportDayService.save(equipmentAlarmReportDay);
} else {
EquipmentAlarmReportDay reportDay = reportDayList.get(0);
reportDay.setLastReportDate(new Date());
reportDay.setValue(equipmentAlarmReportDay.getValue());
reportDay.setFrequency(reportDay.getFrequency() + 1);
reportDay.setIndexTrueNum(reportDay.getIndexTrueNum() == null ? equipmentAlarmReportDay.getIndexTrueNum() : reportDay.getIndexTrueNum() + equipmentAlarmReportDay.getIndexTrueNum());
iEquipmentAlarmReportDayService.updateById(reportDay);
}
}
private String valueTranslate(String value, String enumStr){
if(ObjectUtils.isEmpty(enumStr)){
return "";
}
try {
JSONArray jsonArray = JSONArray.parseArray(enumStr);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
if (jsonObject.get("key").equals(value)) {
return jsonObject.getString("label");
}
}
} catch (Exception e) {
e.printStackTrace();
}
return "";
}
private void carRealTimeDate(List<IotDataVO> iotDatalist, List<CarProperty> carProperties){
List<CarProperty> carIndexsList = new ArrayList<>();
iotDatalist.forEach(iotDataVO->{
// 对指标key为labels的数据处理
if (EquipmentIndexLabelsEnum.labels.name.equals(iotDataVO.getKey())) {
StringBuilder sb = new StringBuilder("equipmentOnCar_");
EquipmentIndexLabelsVo labelsVo = new EquipmentIndexLabelsVo();
Object obj = iotDataVO.getValue();
if (obj instanceof JSONArray) {
List<String> labelList = (List<String>) obj;
labelList.forEach(code -> {
String key = sb.append(code).toString();
labelsVo.setEquipmentIotCode(code);
labelsVo.setTime(new Date());
redisUtils.set(key, com.alibaba.fastjson.JSONObject.toJSONString(labelsVo), redisExpireTime);
});
}
}
List<CarPropertyVo> carPropertyVos = new ArrayList<>();
carProperties.forEach(carProperty->{
if (iotDataVO.getKey().equals(carProperty.getNameKey())) {
carProperty.setValue(iotDataVO.getValue().toString());
carProperty.setUpdateDate(new Date());
carPropertyVos.add(carPropertyToCarPropertyVo(carProperty));
carIndexsList.add(carProperty);
}
});
boolean updateBatchById = carPropertyService.updateBatchById(carIndexsList);
if(updateBatchById){
carTransactionSynch(carProperties,carPropertyVos);
}
});
}
/**
* 车辆数据推送及同步
* @param carProperties
* @param carPropertyVos
*/
public void carTransactionSynch(List<CarProperty> carProperties, List<CarPropertyVo> carPropertyVos){
//TODO 数字化换流站组态屏数据推送,需要在事务提交之后,否侧事务隔离查询不出数据
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
List<CarIndexGisVo> list = createCarIndexGisVo(carProperties);
mqttSendGateway.sendToMqtt(TopicEnum.CARZXDT.getTopic(), "");
boolean flag = ifSendToGis(list);
if (flag) {
mqttSendGateway.sendToMqtt(carTopic, JSON.toJSONString(list));
}
if (syncSwitch) {
syncDataService.syncCreatedFireVehicleMeasurement(carPropertyVos);
}
}
});
}
private CarPropertyVo carPropertyToCarPropertyVo(CarProperty property){
CarPropertyVo carPropertyVo = new CarPropertyVo();
carPropertyVo.setCarId(property.getCarId());
carPropertyVo.setCreateDate(property.getCreateDate());
carPropertyVo.setId(property.getId());
carPropertyVo.setIsIot(1);
carPropertyVo.setMRid(property.getEquipmentIndexId().toString());
carPropertyVo.setName(property.getEquipmentIndexName());
carPropertyVo.setNameKey(property.getEquipmentIndexKey());
carPropertyVo.setSort(1);
carPropertyVo.setUnit(property.getUnitName());
carPropertyVo.setValue(property.getValue());
return carPropertyVo;
}
private List<CarIndexGisVo> createCarIndexGisVo(List<CarProperty> carProperties){
List<CarIndexGisVo> list = new ArrayList<>();
long id =0l;
String iotCode = "";
for(CarProperty action : carProperties){
CarIndexGisVo v = new CarIndexGisVo();
id = action.getCarId();
iotCode = action.getIotCode();
v.setId(action.getCarId());
v.setIotCode(action.getIotCode());
v.setNameKey(action.getEquipmentIndexKey());
v.setValue(ObjectUtils.isEmpty(action.getValue())?"0":action.getValue());
list.add(v);
}
CarIndexGisVo time = new CarIndexGisVo();
time.setId(id);
time.setIotCode(iotCode);
time.setNameKey(CarForGisEnum.SJ.getNameKey());
time.setValue(String.valueOf(new Date().getTime()));
list.add(time);
return list;
}
/**
* //若为物联设备,则更新拓扑节点数据及告警状态
* @param indexList
*/
public void updateNodeDateByEquipId(List<EquipmentSpecificIndex> indexList){
if (!ObjectUtils.isEmpty(indexList)) {
EquipmentVo equipmentVo = equipmentService.getEquipBySpecific(indexList.get(0).getEquipmentSpecificId());
if (equipmentVo.getIsIot().equals("1")) {
List<EquipmentSpecificAlarm> alarmList = equipmentSpecificAlarmService.getEquipListBySpecific(true, indexList.get(0).getEquipmentSpecificId());
topographyService.updateNodeDateByEquipId(indexList.get(0).getEquipmentSpecificId(), indexList, alarmList);
}
}
}
@Autowired
private RedisUtils redisUtils;
@Autowired
CarMapper carMapper;
@Autowired
IMainIotMonitorSerivce iMainIotMonitorSerivce;
@Autowired
private ISyncDataService syncDataService;
@Autowired
private IEquipmentAlarmReportDayService iEquipmentAlarmReportDayService;
@Autowired
private IEquipmentSpecificSerivce iEquipmentSpecificSerivce;
@Autowired
FireFightingSystemMapper fireFightingSystemMapper;
@Autowired
private SystemctlFeign systemctlFeign;
@Autowired
private RemoteSecurityService remoteSecurityService;
@Autowired
private TopographyService topographyService;
@Autowired
private IEquipmentService equipmentService;
@Value("${equipManage.name}")
private String serverName;
@Value("${mqtt.vehicle.topic}")
private String carTopic;
@Value("${equip.point.equipmentdata.topic}")
private String canvasTopic;
@Value("${equip.index.topic}")
private String indexTopic;
@Value("${spring.redis.expire.time}")
private long redisExpireTime;
@Value("${systemctl.sync.switch}")
private Boolean syncSwitch;
@Value("${systemctl.amos.switch}")
private Boolean amosSwitch;
@Value("${isSendApp}")
private Boolean isSendApp;
private static Boolean bool = Boolean.FALSE;
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerMqttIncrementMessage(String topic, String message) {
TopicEntityVo topicEntity = new TopicEntityVo();
topicEntity.setTopic(topic);
topicEntity.setMessage(message);
int endIndex = topic.lastIndexOf("/");
String iotCode = topic.substring(0, endIndex).replace("/", "");
topicEntity.setIotCode(iotCode);
List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
if (eqIotCodeList.isEmpty()) {
log.info("该数据{}不存在!", iotCode);
return;
}
if (eqIotCodeList.size() > 1) {
log.info("有重复的{}数据!", iotCode);
}
EquipmentSpecificVo vo = eqIotCodeList.get(0);
topicEntity.setType(vo.getType());
topicEntity.setCode(vo.getCode());
JSONObject json = JSONObject.parseObject(message);
Iterator it = json.entrySet().iterator();
List<IotDataVO> iotDatalist = new ArrayList<IotDataVO>();
while (it.hasNext()) {
IotDataVO iotDataVO = new IotDataVO();
Map.Entry<String, Object> entry = (Map.Entry<String, Object>) it.next();
iotDataVO.setKey(entry.getKey());
iotDataVO.setValue(entry.getValue());
iotDatalist.add(iotDataVO);
}
if (ObjectUtils.isEmpty(iotDatalist)) {
return;
}
log.info(String.format("收到mqtt消息:%s", message));
realTimeDateProcessing(topicEntity, iotDatalist);
}
/**
* 物联数据处理
*
* @param topicEntity
* @param iotDatalist
*/
public void realTimeDateProcessing(TopicEntityVo topicEntity, List<IotDataVO> iotDatalist) {
String iotCode = topicEntity.getIotCode();
if (EquipAndCarEnum.equip.type.equals(topicEntity.getType())) {
List<EquipmentSpecificIndex> indexList = equipmentSpecificIndexService
.getEquipmentSpeIndexBySpeIotCode(iotCode);
if (ObjectUtils.isEmpty(indexList)) {
return;
}
equipRealTimeDate(iotDatalist, indexList, topicEntity);
} else {
List<CarProperty> carProperties = carPropertyService.getCarPropListByIotCode(iotCode);
if (ObjectUtils.isEmpty(carProperties)) {
return;
}
carRealTimeDate(iotDatalist, carProperties);
}
}
/**
* 装备实时数据处理
*
* @param iotDatalist
* @param indexList
* @param topicEntity
*/
public void equipRealTimeDate(List<IotDataVO> iotDatalist, List<EquipmentSpecificIndex> indexList,
TopicEntityVo topicEntity) {
List<EquipmentSpecificIndex> equipmentSpecificIndexList = new ArrayList<>();
List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
List<IndexStateVo> indexStateList = new ArrayList<>();
// 存储温度数据至内存中(火眼)
iotDataListToCacheMap(iotDatalist);
iotDatalist.forEach(iotDataVO -> {
for (EquipmentSpecificIndex equipmentSpecificIndex : indexList) {
if (!ObjectUtils.isEmpty(equipmentSpecificIndex.getNameKey())
&& equipmentSpecificIndex.getNameKey().toLowerCase().equals(iotDataVO.getKey().toLowerCase())) {
EquipmentSpecificIndex equipmentSpeIndex = new EquipmentSpecificIndex();
BeanUtils.copyProperties(equipmentSpecificIndex, equipmentSpeIndex);
String value = iotDataVO.getValue().toString();
equipmentSpeIndex.setValue(value);
equipmentSpeIndex.setValueLabel(valueTranslate(value, equipmentSpecificIndex.getValueEnum()));
equipmentSpeIndex.setUpdateDate(new Date());
equipmentSpecificIndexService.updateById(equipmentSpeIndex);
// 更新设备表指标状态
iEquipmentSpecificSerivce.updateEquipmentSpecIndexRealtimeData(equipmentSpeIndex);
equipmentSpecificIndexList.add(equipmentSpeIndex);
indexStateList.add(createIndexStateVo(equipmentSpeIndex));
// 添加指标报告
saveEquipmentAlarmReportDay(equipmentSpeIndex);
// 火眼数据构造告警指标逻辑
equipmentSpecificIndex = handleTemperatureAlarm(equipmentSpeIndex, iotDatalist);
// 指标告警处理
if (equipmentSpecificIndex.getIsAlarm() != null && 1 == equipmentSpecificIndex.getIsAlarm()) {
equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpecificIndex));
}
}
}
});
// 报警数据保存
List<EquipmentSpecificAlarmLog> alarmLogs = saveOrUpdateEquipAlarm(equipmentSpecificAlarms);
// 需要在事务提交之后,否侧事务隔离查询不出数据
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
// 向预控系统发送消息
sendEquipSpecIndexToAutosysTopic(equipmentSpecificIndexList);
// 首页性能指标数据订阅
mqttSendGateway.sendToMqtt(indexTopic, JSON.toJSONString(indexStateList));
// 组态大屏消息推送,设备表实时指标修改
intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
// 数字换流站同步指标修改
syncSpecificIndexsToGS(equipmentSpecificIndexList);
// 则更新拓扑节点数据及告警状态
updateNodeDateByEquipId(equipmentSpecificIndexList);
// 向画布推送
publishDataToCanvas(equipmentSpecificIndexList);
// 向其他系统推送报警
equipmentAlarmLogsToOtherSystems(alarmLogs);
}
});
}
public void carRealTimeDate(List<IotDataVO> iotDatalist, List<CarProperty> carProperties) {
List<CarProperty> carIndexsList = new ArrayList<>();
iotDatalist.forEach(iotDataVO -> {
// 对指标key为labels的数据处理
if (EquipmentIndexLabelsEnum.labels.name.equals(iotDataVO.getKey())) {
StringBuilder sb = new StringBuilder("equipmentOnCar_");
EquipmentIndexLabelsVo labelsVo = new EquipmentIndexLabelsVo();
Object obj = iotDataVO.getValue();
if (obj instanceof JSONArray) {
List<String> labelList = (List<String>) obj;
labelList.forEach(code -> {
String key = sb.append(code).toString();
labelsVo.setEquipmentIotCode(code);
labelsVo.setTime(new Date());
redisUtils.set(key, com.alibaba.fastjson.JSONObject.toJSONString(labelsVo), redisExpireTime);
});
}
}
List<CarPropertyVo> carPropertyVos = new ArrayList<>();
carProperties.forEach(carProperty -> {
if (iotDataVO.getKey().equals(carProperty.getNameKey())) {
carProperty.setValue(iotDataVO.getValue().toString());
carProperty.setUpdateDate(new Date());
carPropertyVos.add(carPropertyToCarPropertyVo(carProperty));
carIndexsList.add(carProperty);
}
});
boolean updateBatchById = carPropertyService.updateBatchById(carIndexsList);
if (updateBatchById) {
carTransactionSynch(carProperties, carPropertyVos);
}
});
}
public static List<EquipmentSpecificAlarmLog> upAlarmLogStatus(String iotCode, String equipmentSpecificIndexKey,
IEquipmentSpecificAlarmLogService equipmentSpecificAlarmLogService) {
LambdaQueryWrapper<EquipmentSpecificAlarmLog> queryWrapper = new LambdaQueryWrapper();
queryWrapper.eq(EquipmentSpecificAlarmLog::getIotCode, iotCode);
queryWrapper.eq(EquipmentSpecificAlarmLog::getEquipmentSpecificIndexKey, equipmentSpecificIndexKey);
queryWrapper.ne(EquipmentSpecificAlarmLog::getStatus, AlarmStatusEnum.HF.getCode());
List<EquipmentSpecificAlarmLog> logs = equipmentSpecificAlarmLogService.getBaseMapper()
.selectList(queryWrapper);
logs.forEach(x -> {
x.setCleanTime(new Date());
x.setStatus(AlarmStatusEnum.HF.getCode());
});
if (!logs.isEmpty()) {
equipmentSpecificAlarmLogService.updateBatchById(logs);
}
return logs;
}
/**
* 画布数据消息推送
*
* @param indexList
*/
public void publishDataToCanvas(List<EquipmentSpecificIndex> indexList) {
if (!ObjectUtils.isEmpty(indexList)) {
EquipmentSpecificIndex index = indexList.stream().filter(x -> x.getUpdateDate() != null)
.sorted(Comparator.comparing(EquipmentSpecificIndex::getUpdateDate).reversed())
.collect(Collectors.toList()).get(0);
EquipmentStateVo equipmentStateVo = new EquipmentStateVo();
equipmentStateVo.setEquipName(index.getEquipmentSpecificName());
equipmentStateVo.setOrgCode(index.getOrgCode());
equipmentStateVo.setSpecificId(index.getEquipmentSpecificId());
equipmentStateVo.setEquipCode(index.getQrCode());
equipmentStateVo.setEquipIotCode(index.getIotCode());
equipmentStateVo.setStatus("");
equipmentStateVo.setColor(index.getEmergencyLevelColor());
// 添加性能指标项
equipmentStateVo.setSpeindexList(fireFightingSystemService.getSpeIndex(index.getEquipmentSpecificId()));
Map<String, Object> topicObject = new HashMap<>();
topicObject.put("equipCode", equipmentStateVo.getEquipCode());
if (TrueOrFalseEnum.real.value.toUpperCase().equals(index.getValue().toUpperCase())) {
topicObject.put("color", equipmentStateVo.getColor());
} else {
topicObject.put("color", "");
}
Map<String, Object> map = JSON.parseObject(JSON.toJSONString(equipmentStateVo));
map.put(canvasTopic, topicObject);
// 发送数据至画布
mqttSendGateway.sendToMqtt(canvasTopic, JSON.toJSONString(map));
}
}
/**
* 报警消息推送amos平台
*
* @param equipmentSpecificAlarmLog
*/
void syncSystemctlMsg(EquipmentSpecificAlarmLog equipmentSpecificAlarmLog) {
try {
MessageModel model = new MessageModel();
String alarmReason = ValidationUtil.isEmpty(equipmentSpecificAlarmLog.getAlarmReason()) ? ""
: equipmentSpecificAlarmLog.getAlarmReason();
if (alarmReason.contains(":")) {
String[] split = alarmReason.split(":");
alarmReason = split[1];
}
model.setTitle(equipmentSpecificAlarmLog.getEquipmentSpecificIndexName());
String body = String.format("警情类型:%s;报警设备:%s;报警位置:%s;报警原因:%s;报警时间:%s",
equipmentSpecificAlarmLog.getEquipmentSpecificIndexName(),
equipmentSpecificAlarmLog.getEquipmentSpecificName(), equipmentSpecificAlarmLog.getLocation(),
alarmReason, DateUtils.date2LongStr(equipmentSpecificAlarmLog.getCreateDate()));
model.setBody(body);
model.setMsgType("iotMonitor");
if (isSendApp) {
model.setIsSendApp(true);
model.setTerminal("APP/WEB");
} else {
model.setIsSendApp(false);
model.setTerminal("WEB");
}
model.setIsSendWeb(true);
model.setCategory(1);
model.setRelationId(String.valueOf(equipmentSpecificAlarmLog.getId()));
Token token = remoteSecurityService.getServerToken();
systemctlFeign.create(token.getAppKey(), token.getProduct(), token.getToke(), model);
log.info(String.format("调用平台消息服务成功:%s", JSON.toJSONString(model)));
} catch (Exception e) {
// e.printStackTrace();
log.error("告警消息同步平台失败:syncSystemctlMsg,===>>>" + e.getMessage());
}
}
private String getSystemCodeBySpeId(String sysIds) {
if (StringUtil.isNotEmpty(sysIds)) {
List<FireFightingSystemEntity> sys = fireFightingSystemMapper.getFightingSysByIds(sysIds.split(","));
return sys.stream().map(FireFightingSystemEntity::getCode).collect(Collectors.joining(","));
} else {
return null;
}
}
private EquipmentAlarmReportDay addEquipAlarmReportRecord(EquipmentSpecificIndex equipmentSpecificIndex) {
EquipmentAlarmReportDay equipmentAlarmReportDay = new EquipmentAlarmReportDay();
equipmentAlarmReportDay.setOrgCode(equipmentSpecificIndex.getOrgCode());
equipmentAlarmReportDay.setReportDate(new Date());
equipmentAlarmReportDay.setUpdateDate(new Date());
equipmentAlarmReportDay.setSystemIds(equipmentSpecificIndex.getSystemId());
equipmentAlarmReportDay.setLastReportDate(new Date());
equipmentAlarmReportDay.setEquipmentSpecificId(equipmentSpecificIndex.getEquipmentSpecificId());
equipmentAlarmReportDay.setEquipmentSpecificName(equipmentSpecificIndex.getEquipmentSpecificName());
equipmentAlarmReportDay.setEquipmentDetailId(equipmentSpecificIndex.getEquipmentDetailId());
equipmentAlarmReportDay.setEquipmentId(equipmentSpecificIndex.getEquipmentId());
equipmentAlarmReportDay.setEquipmentCode(equipmentSpecificIndex.getEquipmentCode());
equipmentAlarmReportDay
.setIndexTrueNum(TrueOrFalseEnum.real.value.equals(equipmentSpecificIndex.getValue()) ? 1L : 0L);
equipmentAlarmReportDay.setAlarmType(equipmentSpecificIndex.getTypeCode());
equipmentAlarmReportDay.setAlarmTypeName(equipmentSpecificIndex.getTypeName());
equipmentAlarmReportDay.setIndexId(equipmentSpecificIndex.getEquipmentIndexId());
equipmentAlarmReportDay.setIndexName(equipmentSpecificIndex.getEquipmentSpecificIndexName());
equipmentAlarmReportDay.setIndexType(equipmentSpecificIndex.getNameKey());
equipmentAlarmReportDay.setValue(equipmentSpecificIndex.getValue());
equipmentAlarmReportDay.setIsAlarm(equipmentSpecificIndex.getIsAlarm());
return equipmentAlarmReportDay;
}
private EquipmentSpecificAlarmLog addEquipAlarmLogRecord(EquipmentSpecificAlarm equipmentSpecificAlarm) {
EquipmentSpecificAlarmLog equipmentSpecificAlarmLog = new EquipmentSpecificAlarmLog();
BeanUtils.copyProperties(equipmentSpecificAlarm, equipmentSpecificAlarmLog);
equipmentSpecificAlarmLog.setId(null);
equipmentSpecificAlarmLog.setCreateDate(new Date());
equipmentSpecificAlarmLog.setEquipmentSpecificAlarmId(equipmentSpecificAlarm.getId());
boolean bool = equipmentSpecificAlarmLogService.save(equipmentSpecificAlarmLog);
// 同步告警消息给平台
if (amosSwitch && bool) {
EquipmentSpecificAlarmLog alarmLog = equipmentSpecificAlarmLogService
.getById(equipmentSpecificAlarmLog.getId());
new Thread(new Runnable() {
@Override
public void run() {
syncSystemctlMsg(alarmLog);
}
}).start();
}
return equipmentSpecificAlarmLog;
}
/**
* 添加告警
*
* @param equipmentSpecificAlarms
* @param equipmentSpecificIndex
* @return
*/
private List<EquipmentSpecificAlarm> addEquipmentSpecificAlarm(List<EquipmentSpecificAlarm> equipmentSpecificAlarms,
EquipmentSpecificIndex equipmentSpecificIndex, EquipmentSpecificAlarm equipmentSpecificAlarm) {
BeanUtils.copyProperties(equipmentSpecificIndex, equipmentSpecificAlarm);
equipmentSpecificAlarm.setId(null);
equipmentSpecificAlarm.setFrequency(1);
equipmentSpecificAlarm.setStatus(AlarmStatusEnum.BJ.getCode());
equipmentSpecificAlarm.setEquipmentSpecificIndexKey(equipmentSpecificIndex.getNameKey());
equipmentSpecificAlarm.setEquipmentSpecificIndexValue(equipmentSpecificIndex.getValue());
equipmentSpecificAlarm.setEquipmentSpecificIndexLabel(equipmentSpecificIndex.getValueLabel());
equipmentSpecificAlarm.setType(equipmentSpecificIndex.getTypeCode());
equipmentSpecificAlarm.setCreateDate(new Date());
equipmentSpecificAlarm.setUpdateDate(new Date());
equipmentSpecificAlarm.setEquipmentCode(equipmentSpecificIndex.getEquipmentCode());
equipmentSpecificAlarm.setEquipmentId(equipmentSpecificIndex.getEquipmentId());
equipmentSpecificAlarm.setEquipmentDetailId(equipmentSpecificIndex.getEquipmentDetailId());
equipmentSpecificAlarm.setEquipmentSpecificCode(equipmentSpecificIndex.getEquipmentSpecificCode());
equipmentSpecificAlarm.setEmergencyLevel(equipmentSpecificIndex.getEmergencyLevel());
equipmentSpecificAlarm.setEmergencyLevelColor(equipmentSpecificIndex.getEmergencyLevelColor());
equipmentSpecificAlarm.setEmergencyLevelDescribe(equipmentSpecificIndex.getEmergencyLevelDescribe());
equipmentSpecificAlarms.add(equipmentSpecificAlarm);
return equipmentSpecificAlarms;
}
// NB装备告警
private List<EquipmentSpecificAlarm> getNbEquipAlarmList(List<EquipmentSpecificAlarm> indexAlarms,
EquipmentSpecificIndex equipmentSpecificIndex, EquipmentSpecificAlarm equipmentSpecificAlarm) {
List<EquipmentSpecificAlarm> equipmentSpecificAlarmList = new ArrayList<>();
if (ValidationUtil.isEmpty(indexAlarms)) { // 告警表为空,新增告警数据
addEquipmentSpecificAlarm(equipmentSpecificAlarmList, equipmentSpecificIndex, equipmentSpecificAlarm);
if (!checkStateIsNormal(equipmentSpecificAlarm, equipmentSpecificIndex)) {
return equipmentSpecificAlarmList;
} else {
equipmentSpecificAlarmList.clear();
}
} else {
indexAlarms.forEach(action -> {
// 状态为正常或报警解除
if (checkStateIsNormal(action, equipmentSpecificIndex)) {
// 修改报警数据为正常
action.setRecoveryDate(new Date());
action.setStatus(AlarmStatusEnum.HF.getCode());
// 修改之前数据为已处理
action.setResolveResult(action.getAlamReason());
action.setConfirmUserName("系统");
action.setConfirmType(action.getType());
equipmentSpecificAlarmLogService.updateAlarmLogByIotCodeAndIndexKey(action);
} else {
action.setFrequency((action.getFrequency() + 1));
}
// 更新所在系统,设备可能编辑过
action.setSystemIds(equipmentSpecificIndex.getSystemId());
action.setSystemCodes(this.getSystemCodeBySpeId(equipmentSpecificIndex.getSystemId()));
action.setUpdateDate(new Date());
equipmentSpecificAlarmList.add(action);
});
}
return equipmentSpecificAlarmList;
}
private boolean ifSendToGis(List<CarIndexGisVo> list) {
boolean flag = true;
List<CarIndexGisVo> list1 = list.stream().filter(x -> CarForGisEnum.JD.getNameKey().equals(x.getNameKey()))
.collect(Collectors.toList());
List<CarIndexGisVo> list2 = list.stream().filter(x -> CarForGisEnum.WD.getNameKey().equals(x.getNameKey()))
.collect(Collectors.toList());
for (CarIndexGisVo gisVo : list) {
if (CarForGisEnum.JD.getNameKey().equals(gisVo.getNameKey())) {
if (!StringUtil.isNotEmpty(gisVo.getValue()) || "0".equals(gisVo.getValue())) {
flag = false;
continue;
}
}
if (CarForGisEnum.WD.getNameKey().equals(gisVo.getNameKey()) || "0".equals(gisVo.getValue())) {
if (!StringUtil.isNotEmpty(gisVo.getValue())) {
flag = false;
continue;
}
}
}
if (list1.size() == 0 || list2.size() == 0) {
flag = false;
}
return flag;
}
/**
* 接收到的IOT数据为火眼存储到Map中
*/
private void iotDataListToCacheMap(List<IotDataVO> iotDatalist) {
List<IotDataVO> iotDataVOs = iotDatalist.stream()
.filter(x -> "alarmLevel".equals(x.getKey()) || "alarmType".equals(x.getKey())
|| "temperature".equals(x.getKey()) || "ruleTemperature".equals(x.getKey())
|| "thermometryUnit".equals(x.getKey()) || "alarmRule".equals(x.getKey()))
.collect(Collectors.toList());
if (iotDataVOs.size() > 0) {
Map<String, Object> map = iotDatalist.stream()
.collect(Collectors.toMap(IotDataVO::getKey, IotDataVO::getValue));
putTemperatureMap(map.get("traceId").toString(), map);
}
}
/**
* 处理火眼逻辑合并为一条告警
*/
private EquipmentSpecificIndex handleTemperatureAlarm(EquipmentSpecificIndex equipmentSpecificIndex,
List<IotDataVO> iotDatalist) {
List<IotDataVO> collect = iotDatalist.stream().filter(x -> "traceId".equals(x.getKey()))
.collect(Collectors.toList());
if (collect.size() > 0 && temperatureMapIsEmpty(String.valueOf(collect.get(0).getValue()))) {
String traceId = String.valueOf(collect.get(0).getValue());
TemperatureAlarmDto dto = temperatureMap.get(traceId);
equipmentSpecificIndex
.setEquipmentSpecificIndexName(AlarmTypeEnum.getTypeByCode(AlarmTypeEnum.GZGJ.getCode()));
equipmentSpecificIndex.setNameKey(AlarmTypeEnum.GZGJ.getCode());
equipmentSpecificIndex.setAlamReason(
TemperatureAlarm.getAlarmContent(dto.getAlarmLevel(), dto.getAlarmType(), dto.getAlarmRule(),
dto.getRuleTemperature(), dto.getTemperature(), dto.getThermometryUnit()));
equipmentSpecificIndex.setValue("true");
equipmentSpecificIndex.setIsAlarm(1);
temperatureMap.remove(traceId);
}
return equipmentSpecificIndex;
}
/**
* temperatureMap存储数据
*/
private void putTemperatureMap(String traceId, Map<String, Object> map) {
TemperatureAlarmDto cacheTemperatureAlarmDto = temperatureMap.get(traceId);
if (ValidationUtil.isEmpty(cacheTemperatureAlarmDto)) {
cacheTemperatureAlarmDto = new TemperatureAlarmDto();
}
TemperatureAlarmDto newMap = JSON.parseObject(JSON.toJSONString(map), TemperatureAlarmDto.class);
BeanUtil.copyPropertiesIgnoreNull(newMap, cacheTemperatureAlarmDto);
temperatureMap.put(traceId, cacheTemperatureAlarmDto);
}
private boolean temperatureMapIsEmpty(String traceId) {
TemperatureAlarmDto dto = temperatureMap.get(traceId);
if (!ValidationUtil.isEmpty(dto) && !ValidationUtil.isEmpty(dto.getAlarmLevel())
&& !ValidationUtil.isEmpty(dto.getAlarmType()) && !ValidationUtil.isEmpty(dto.getAlarmRule())
&& !ValidationUtil.isEmpty(dto.getRuleTemperature()) && !ValidationUtil.isEmpty(dto.getTemperature())
&& !ValidationUtil.isEmpty(dto.getThermometryUnit())) {
return true;
}
return false;
}
/**
* 判断是否为NB物联监测设备参数
*
* @param nameKey
* @return
*/
private boolean verifyNB(String nameKey) {
return nameKey.startsWith("NB_");
}
/**
* 判断状态为正常或报警解除
*/
private boolean checkStateIsNormal(EquipmentSpecificAlarm equipmentSpecificAlarm,
EquipmentSpecificIndex equipmentSpecificIndex) {
String enumKey = String.format("%s_%s", equipmentSpecificAlarm.getEquipmentSpecificIndexKey(),
equipmentSpecificIndex.getValue());
equipmentSpecificAlarm.setAlamReason(equipmentSpecificAlarm.getEquipmentSpecificIndexName().replace("NB_", "")
+ ":" + NBalarmEnum.getDescByKey(enumKey));
boolean flag = false;
if (!ValidationUtil.isEmpty(equipmentSpecificAlarm.getType())
&& !ValidationUtil.isEmpty(equipmentSpecificAlarm.getIotCode())
&& !ValidationUtil.isEmpty(equipmentSpecificAlarm.getEquipmentSpecificIndexKey())
&& (NBalarmEnum.NB_liquid_level_state_0.getKey().equals(enumKey)
|| NBalarmEnum.NB_error_code_0.getKey().equals(enumKey)
|| NBalarmEnum.NB_battery_state_0.getKey().equals(enumKey)
|| NBalarmEnum.NB_hydraulic_state_0.getKey().equals(enumKey)
|| NBalarmEnum.NB_hydraulic_state_2.getKey().equals(enumKey)
|| NBalarmEnum.NB_hydraulic_state_4.getKey().equals(enumKey)
|| NBalarmEnum.NB_alarm_status_4.getKey().equals(enumKey))) {
flag = true;
}
return flag;
}
public List<EquipmentSpecificAlarmLog> saveOrUpdateEquipAlarm(
List<EquipmentSpecificAlarm> equipmentSpecificAlarms) {
List<EquipmentSpecificAlarmLog> equipmentAlarmLogs = new ArrayList<>();
if (ObjectUtils.isEmpty(equipmentSpecificAlarms)) {
return equipmentAlarmLogs;
}
equipmentSpecificAlarmService.saveOrUpdateBatch(equipmentSpecificAlarms);
equipmentSpecificAlarms.forEach(action -> {
if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
equipmentAlarmLogs.add(addEquipAlarmLogRecord(action));
if (ValidationUtil.isEmpty(action.getAlamContent())) {
action.setAlamContent(action.getEquipmentSpecificName() + action.getEquipmentSpecificIndexName());
}
mqttSendGateway.sendToMqtt(TopicEnum.EQDQR.getTopic(), JSONArray.toJSON(action).toString());
} else {
equipmentAlarmLogs.addAll(upAlarmLogStatus(action.getIotCode(), action.getEquipmentSpecificIndexKey(),
equipmentSpecificAlarmLogService));
mqttSendGateway.sendToMqtt(TopicEnum.EQYQR.getTopic(), JSONArray.toJSON(action).toString());
bool = Boolean.TRUE;
}
});
return equipmentAlarmLogs;
}
/**
* 报警日志同步其他系统
*
* @param equipmentAlarmLogs
*/
public void equipmentAlarmLogsToOtherSystems(List<EquipmentSpecificAlarmLog> equipmentAlarmLogs) {
if (ObjectUtils.isEmpty(equipmentAlarmLogs)) {
return;
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("seqNo", UUID.randomUUID().toString().replace("-", "").toLowerCase());
mqttSendGateway.sendToMqtt(TopicEnum.ALARM_LOG_INSERT.getTopic(), jsonObject.toString());
mqttSendGateway.sendToMqtt(TopicEnum.EQZXDT.getTopic(), "");
// 数字换流站数据处理(高斯库同步及南瑞告警推送)
if (syncSwitch) {
List<FireEquipmentFireAlarm> alarmList = createFireEquipmentFireAlarmVo(equipmentAlarmLogs);
if (!CollectionUtils.isEmpty(alarmList)) {
Map<String, List<FireEquipmentFireAlarm>> collect = alarmList.stream()
.collect(Collectors.groupingBy(FireEquipmentFireAlarm::getType));
for (String key : collect.keySet()) {
List<FireEquipmentFireAlarm> list = collect.get(key);
if (!CollectionUtils.isEmpty(list)) {
if ("FIREALARM".equalsIgnoreCase(key)) {
syncDataService.syncCreatedFireEquipAlarm(list);
} else if ("BREAKDOWN".equalsIgnoreCase(key)) {
List<FireEquipmentFaultAlarm> faultAlarms = list.stream().map(x -> {
FireEquipmentFaultAlarm fireEquipmentFaultAlarm = new FireEquipmentFaultAlarm();
BeanUtils.copyProperties(x, fireEquipmentFaultAlarm);
return fireEquipmentFaultAlarm;
}).collect(Collectors.toList());
syncDataService.syncCreatedFireEquiptFaultAlarm(faultAlarms);
} else if ("SHIELD".equalsIgnoreCase(key)) {
List<FireEquipmentDefectAlarm> defectAlarms = list.stream().map(x -> {
FireEquipmentDefectAlarm fireEquipmentFaultAlarm = new FireEquipmentDefectAlarm();
BeanUtils.copyProperties(x, fireEquipmentFaultAlarm);
return fireEquipmentFaultAlarm;
}).collect(Collectors.toList());
syncDataService.syncCreatedFireEquipDefectAlarm(defectAlarms);
}
}
}
}
// 向南瑞平台推送报警消息
if (!bool) {
syncDataService.syncCreatedSendAlarm(equipmentAlarmLogs);
}
}
}
/**
* 组装数字换流站平台告警数据
*
* @param
* @return
*/
private List<FireEquipmentFireAlarm> createFireEquipmentFireAlarmVo(
List<EquipmentSpecificAlarmLog> equipmentAlarmLogs) {
Map<String, String> stationInfo = equipmentSpecificMapper.getStationInfo().get(0);
List<FireEquipmentFireAlarm> alarmList = new ArrayList<>();
equipmentAlarmLogs.forEach(action -> {
FireEquipmentFireAlarm alarm = new FireEquipmentFireAlarm();
BeanUtils.copyProperties(action, alarm);
alarm.setAliasname(StringUtil.toNotEmptyString(action.getEquipmentSpecificIndexName()));
alarm.setEquipmentMeasurementId(StringUtil.toNotEmptyString(action.getEquipmentIndexId().toString()));
alarm.setEquipmentMeasurementMRid(StringUtil.toNotEmptyString(action.getEquipmentIndexId().toString()));
alarm.setFieldLabel(StringUtil.toNotEmptyString(action.getEquipmentSpecificIndexKey()));
alarm.setFieldName(StringUtil.toNotEmptyString(action.getEquipmentSpecificIndexName()));
alarm.setFireEquipmentId(StringUtil.toNotEmptyString(action.getEquipmentSpecificId().toString()));
alarm.setFireEquipmentMRid(StringUtil.toNotEmptyString(action.getEquipmentSpecificCode()));
alarm.setFireEquipmentName(StringUtil.toNotEmptyString(action.getEquipmentSpecificName()));
alarm.setFrequency(1);
alarm.setId(StringUtil.toNotEmptyString(action.getId().toString()));
alarm.setMrid(action.getId().toString());
alarm.setName(action.getEquipmentSpecificIndexName());
alarm.setRecoveryDate(action.getUpdateDate());
alarm.setStationCode(StringUtil.toNotEmptyString(stationInfo.get("stationCode")));
alarm.setStationName(StringUtil.toNotEmptyString(stationInfo.get("stationName")));
alarm.setValue(StringUtil.toNotEmptyString(action.getEquipmentSpecificIndexValue()));
alarmList.add(alarm);
});
return alarmList;
}
/**
* 高斯库同步指标修改
*
* @param equipmentSpecificIndexList
*/
private void syncSpecificIndexsToGS(List<EquipmentSpecificIndex> equipmentSpecificIndexList) {
if (!ObjectUtils.isEmpty(equipmentSpecificIndexList) && syncSwitch) {
// 数据同步
List<EquipmentIndexVO> fireEquipMeasurementCollect = new ArrayList<>();
equipmentSpecificIndexList.forEach(action -> {
EquipmentIndexVO equipmentIndexVO = new EquipmentIndexVO();
BeanUtils.copyProperties(action, equipmentIndexVO);
fireEquipMeasurementCollect.add(equipmentIndexVO);
});
if (0 < fireEquipMeasurementCollect.size()) {
syncDataService.syncCreatedFireEquipMeasurement(fireEquipMeasurementCollect);
}
}
}
private IndexStateVo createIndexStateVo(EquipmentSpecificIndex equipmentSpecificIndex) {
IndexStateVo indexStateVo = new IndexStateVo();
BeanUtils.copyProperties(equipmentSpecificIndex, indexStateVo);
indexStateVo.setId(equipmentSpecificIndex.getIotCode() + "_" + equipmentSpecificIndex.getNameKey());
indexStateVo.setData(equipmentSpecificIndex.getValue());
indexStateVo.setIndexKey(equipmentSpecificIndex.getNameKey());
return indexStateVo;
}
public List<EquipmentSpecificAlarm> createIndexAlarmRecord(EquipmentSpecificIndex equipmentSpcIndex) {
// 处理火眼视频异常
List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
EquipmentSpecificAlarm equipmentSpecificAlarm = new EquipmentSpecificAlarm();
equipmentSpecificAlarm.setSystemIds(equipmentSpcIndex.getSystemId());
equipmentSpecificAlarm.setSystemCodes(this.getSystemCodeBySpeId(equipmentSpcIndex.getSystemId()));
List<EquipmentSpecificAlarm> indexAlarms = equipmentSpecificAlarmMapper
.findEquipmentSpecificAlarmByEquipmentSpecificIdAndEquipmentIndexIdAndStatusIstrue(
equipmentSpcIndex.getEquipmentSpecificId(), equipmentSpcIndex.getEquipmentIndexId());
// NB设备告警
if (verifyNB(equipmentSpcIndex.getNameKey())) {
return getNbEquipAlarmList(indexAlarms, equipmentSpcIndex, equipmentSpecificAlarm);
}
// 报警表新增信息
if (ObjectUtils.isEmpty(indexAlarms) && (TrueOrFalseEnum.real.value.equals(equipmentSpcIndex.getValue()))) {
addEquipmentSpecificAlarm(equipmentSpecificAlarms, equipmentSpcIndex, equipmentSpecificAlarm);
} else {
// 报警表更新信息
indexAlarms.forEach(action -> {
if (TrueOrFalseEnum.real.value.equals(equipmentSpcIndex.getValue())) {
// 报警,修改发生频率
action.setFrequency((action.getFrequency() + 1));
} else {
// 报警恢复,修改数据为恢复状态
action.setRecoveryDate(new Date());
action.setEquipmentSpecificIndexValue(TrueOrFalseEnum.fake.value);
action.setStatus(AlarmStatusEnum.HF.getCode());
}
action.setUpdateDate(new Date());
// 更新所在系统,设备可能编辑过,更新所在系统、装备名称、装备定义code
action.setSystemIds(equipmentSpcIndex.getSystemId());
action.setSystemCodes(this.getSystemCodeBySpeId(equipmentSpcIndex.getSystemId()));
action.setEquipmentSpecificName(equipmentSpcIndex.getEquipmentSpecificName());
action.setEquipmentCode(equipmentSpcIndex.getEquipmentCode());
// 冗余字段,alarm_log表更新时使用
action.setEquipmentSpecificCode(equipmentSpcIndex.getEquipmentSpecificCode());
action.setBuildId(equipmentSpcIndex.getBuildId());
equipmentSpecificAlarms.add(action);
});
}
return equipmentSpecificAlarms;
}
/**
* 发送数据至换流站
*
* @param
*/
public void sendEquipSpecIndexToAutosysTopic(List<EquipmentSpecificIndex> equipmentSpeIndexs) {
equipmentSpeIndexs.forEach(equipmentSpeIndex -> {
String topic = "";
if (TrueOrFalseEnum.real.value.equals(equipmentSpeIndex.getValue())
&& EquipmentRiskTypeEnum.HZGJ.getCode().equals(equipmentSpeIndex.getTypeCode())) {
equipmentSpeIndex.setType(EquipmentRiskTypeEnum.HZGJ.getCode());
topic = String.format("%s.%s%s", serverName, "equipment/", RiskLeverForAutoSys.BJ.getCode());
} else if (TrueOrFalseEnum.real.value.equals(equipmentSpeIndex.getValue())
&& EquipmentRiskTypeEnum.GZ.getCode().equals(equipmentSpeIndex.getTypeCode())) {
equipmentSpeIndex.setType(EquipmentRiskTypeEnum.GZ.getCode());
topic = String.format("%s.%s%s", serverName, "equipment/", RiskLeverForAutoSys.GZ.getCode());
} else {
equipmentSpeIndex.setType(EquipmentRiskTypeEnum.QT.getCode());
topic = String.format("%s.%s%s", serverName, "equipment/", RiskLeverForAutoSys.JC.getCode());
}
TopicEntityVo topicEntityVo = new TopicEntityVo();
topicEntityVo.setIotCode(equipmentSpeIndex.getIotCode());
topicEntityVo.setTopic(topic);
topicEntityVo.setMessage(JSON.toJSONString(equipmentSpeIndex));
mqttSendGateway.sendToMqtt(topic, JSON.toJSONString(topicEntityVo));
});
}
/**
* 组态大屏消息推送
*
* @param equipmentSpecificIndexList
* @param topicEntity
*/
public void intePageSysDataRefresh(List<EquipmentSpecificIndex> equipmentSpecificIndexList,
TopicEntityVo topicEntity) {
mqttSendGateway.sendToMqtt(TopicEnum.EQXXTJ.getTopic(), "");
iEquipmentSpecificSerivce.integrationPageSysDataRefresh(topicEntity.getCode());
}
/**
* 更新数据报表表
*
* @param equipmentSpecificIndex
*/
private void saveEquipmentAlarmReportDay(EquipmentSpecificIndex equipmentSpecificIndex) {
SimpleDateFormat sdf = new SimpleDateFormat(DateUtils.DATE_PATTERN);
EquipmentAlarmReportDay equipmentAlarmReportDay = addEquipAlarmReportRecord(equipmentSpecificIndex);
LambdaQueryWrapper<EquipmentAlarmReportDay> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(EquipmentAlarmReportDay::getReportDate, sdf.format(equipmentAlarmReportDay.getReportDate()))
.eq(EquipmentAlarmReportDay::getIndexId, equipmentAlarmReportDay.getIndexId())
.eq(EquipmentAlarmReportDay::getEquipmentSpecificId, equipmentAlarmReportDay.getEquipmentSpecificId());
List<EquipmentAlarmReportDay> reportDayList = iEquipmentAlarmReportDayService.list(wrapper);
if (reportDayList.isEmpty()) {
equipmentAlarmReportDay.setReportDate(new Date());
equipmentAlarmReportDay.setFrequency(1);
iEquipmentAlarmReportDayService.save(equipmentAlarmReportDay);
} else {
EquipmentAlarmReportDay reportDay = reportDayList.get(0);
reportDay.setLastReportDate(new Date());
reportDay.setValue(equipmentAlarmReportDay.getValue());
reportDay.setFrequency(reportDay.getFrequency() + 1);
reportDay.setIndexTrueNum(reportDay.getIndexTrueNum() == null ? equipmentAlarmReportDay.getIndexTrueNum()
: reportDay.getIndexTrueNum() + equipmentAlarmReportDay.getIndexTrueNum());
iEquipmentAlarmReportDayService.updateById(reportDay);
}
}
private String valueTranslate(String value, String enumStr) {
if (ObjectUtils.isEmpty(enumStr)) {
return "";
}
try {
JSONArray jsonArray = JSONArray.parseArray(enumStr);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
if (jsonObject.get("key").equals(value)) {
return jsonObject.getString("label");
}
}
} catch (Exception e) {
e.printStackTrace();
}
return "";
}
/**
* 车辆数据推送及同步
*
* @param carProperties
* @param carPropertyVos
*/
public void carTransactionSynch(List<CarProperty> carProperties, List<CarPropertyVo> carPropertyVos) {
// TODO 数字化换流站组态屏数据推送,需要在事务提交之后,否侧事务隔离查询不出数据
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
List<CarIndexGisVo> list = createCarIndexGisVo(carProperties);
mqttSendGateway.sendToMqtt(TopicEnum.CARZXDT.getTopic(), "");
boolean flag = ifSendToGis(list);
if (flag) {
mqttSendGateway.sendToMqtt(carTopic, JSON.toJSONString(list));
}
if (syncSwitch) {
syncDataService.syncCreatedFireVehicleMeasurement(carPropertyVos);
}
}
});
}
private CarPropertyVo carPropertyToCarPropertyVo(CarProperty property) {
CarPropertyVo carPropertyVo = new CarPropertyVo();
carPropertyVo.setCarId(property.getCarId());
carPropertyVo.setCreateDate(property.getCreateDate());
carPropertyVo.setId(property.getId());
carPropertyVo.setIsIot(1);
carPropertyVo.setMRid(property.getEquipmentIndexId().toString());
carPropertyVo.setName(property.getEquipmentIndexName());
carPropertyVo.setNameKey(property.getEquipmentIndexKey());
carPropertyVo.setSort(1);
carPropertyVo.setUnit(property.getUnitName());
carPropertyVo.setValue(property.getValue());
return carPropertyVo;
}
private List<CarIndexGisVo> createCarIndexGisVo(List<CarProperty> carProperties) {
List<CarIndexGisVo> list = new ArrayList<>();
long id = 0l;
String iotCode = "";
for (CarProperty action : carProperties) {
CarIndexGisVo v = new CarIndexGisVo();
id = action.getCarId();
iotCode = action.getIotCode();
v.setId(action.getCarId());
v.setIotCode(action.getIotCode());
v.setNameKey(action.getEquipmentIndexKey());
v.setValue(ObjectUtils.isEmpty(action.getValue()) ? "0" : action.getValue());
list.add(v);
}
CarIndexGisVo time = new CarIndexGisVo();
time.setId(id);
time.setIotCode(iotCode);
time.setNameKey(CarForGisEnum.SJ.getNameKey());
time.setValue(String.valueOf(new Date().getTime()));
list.add(time);
return list;
}
/**
* //若为物联设备,则更新拓扑节点数据及告警状态
*
* @param indexList
*/
public void updateNodeDateByEquipId(List<EquipmentSpecificIndex> indexList) {
if (!ObjectUtils.isEmpty(indexList)) {
EquipmentVo equipmentVo = equipmentService.getEquipBySpecific(indexList.get(0).getEquipmentSpecificId());
if (equipmentVo.getIsIot().equals("1")) {
List<EquipmentSpecificAlarm> alarmList = equipmentSpecificAlarmService.getEquipListBySpecific(true,
indexList.get(0).getEquipmentSpecificId());
topographyService.updateNodeDateByEquipId(indexList.get(0).getEquipmentSpecificId(), indexList,
alarmList);
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment