Commit 02175635 authored by 刘林's avatar 刘林

fix(equip):对接Iot平台透传数据

parent 8196356b
...@@ -203,4 +203,8 @@ public class EquipmentSpecificIndex extends BaseEntity { ...@@ -203,4 +203,8 @@ public class EquipmentSpecificIndex extends BaseEntity {
@ApiModelProperty(value = "指标值枚举") @ApiModelProperty(value = "指标值枚举")
@TableField("value_enum") @TableField("value_enum")
private String valueEnum; private String valueEnum;
@ApiModelProperty(value = "网关标识")
@TableField(value = "gateway_id")
private String gatewayId;
} }
...@@ -125,6 +125,7 @@ public class EquipmentIotMqttReceiveConfig { ...@@ -125,6 +125,7 @@ public class EquipmentIotMqttReceiveConfig {
list.add("+/+/property"); // 添加iot車輛裝備數據上報事件监听 list.add("+/+/property"); // 添加iot車輛裝備數據上報事件监听
list.add("+/+/event"); // 添加iot事件监听 list.add("+/+/event"); // 添加iot事件监听
list.add("+/+/transmit"); // 添加交换站事件监听 list.add("+/+/transmit"); // 添加交换站事件监听
list.add("+/+/perspective"); // 添加交换站事件监听
String[] arr = list.toArray(new String[list.size()]); String[] arr = list.toArray(new String[list.size()]);
adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttPahoClientFactory(), arr); adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttPahoClientFactory(), arr);
adapter.setCompletionTimeout(completionTimeout); adapter.setCompletionTimeout(completionTimeout);
...@@ -150,6 +151,8 @@ public class EquipmentIotMqttReceiveConfig { ...@@ -150,6 +151,8 @@ public class EquipmentIotMqttReceiveConfig {
mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg); mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg);
}else if (dataType.equals("transmit") && StringUtil.isNotEmpty(msg)){ }else if (dataType.equals("transmit") && StringUtil.isNotEmpty(msg)){
mqttReceiveService.handlerMqttRomaMessage(topic,msg); mqttReceiveService.handlerMqttRomaMessage(topic,msg);
}else if (dataType.equals("perspective") && StringUtil.isNotEmpty(msg)){
mqttReceiveService.handlerMqttIotMessage(topic,msg);
} }
} }
}; };
......
...@@ -114,5 +114,5 @@ public interface EquipmentSpecificIndexMapper extends BaseMapper<EquipmentSpecif ...@@ -114,5 +114,5 @@ public interface EquipmentSpecificIndexMapper extends BaseMapper<EquipmentSpecif
List<EquipmentSpecificIndex> getEquipIndexInIndex(@Param("list") List<String> listIndex); List<EquipmentSpecificIndex> getEquipIndexInIndex(@Param("list") List<String> listIndex);
EquipmentSpecificIndex getEquipmentSpeIndexByIndexAddress(String indexAddress); EquipmentSpecificIndex getEquipmentSpeIndexByIndexAddress(String indexAddress,String gatewayId);
} }
...@@ -40,5 +40,5 @@ public interface IEquipmentSpecificIndexService extends IService<EquipmentSpecif ...@@ -40,5 +40,5 @@ public interface IEquipmentSpecificIndexService extends IService<EquipmentSpecif
* @param indexAddress indexAddress * @param indexAddress indexAddress
* @return EquipmentSpecificIndex * @return EquipmentSpecificIndex
*/ */
EquipmentSpecificIndex getEquipmentSpeIndexByIndexAddress(String indexAddress); EquipmentSpecificIndex getEquipmentSpeIndexByIndexAddress(String indexAddress,String gatewayId);
} }
...@@ -24,4 +24,11 @@ public interface MqttReceiveService { ...@@ -24,4 +24,11 @@ public interface MqttReceiveService {
* @param message 消息内容 * @param message 消息内容
*/ */
void handlerMqttRomaMessage(String topic, String message); void handlerMqttRomaMessage(String topic, String message);
/**
* 处理Iot消息数据
* @param topic 主题
* @param message 消息内容
*/
void handlerMqttIotMessage(String topic, String message);
} }
...@@ -30,7 +30,7 @@ public class EquipmentSpecificIndexServiceImpl extends ServiceImpl<EquipmentSpec ...@@ -30,7 +30,7 @@ public class EquipmentSpecificIndexServiceImpl extends ServiceImpl<EquipmentSpec
} }
@Override @Override
public EquipmentSpecificIndex getEquipmentSpeIndexByIndexAddress(String indexAddress) { public EquipmentSpecificIndex getEquipmentSpeIndexByIndexAddress(String indexAddress,String gatewayId) {
return this.baseMapper.getEquipmentSpeIndexByIndexAddress(indexAddress); return this.baseMapper.getEquipmentSpeIndexByIndexAddress(indexAddress,gatewayId);
} }
} }
...@@ -340,6 +340,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -340,6 +340,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void handlerMqttRomaMessage(String topic, String message) { public void handlerMqttRomaMessage(String topic, String message) {
log.info("接收到Mqtt消息: {}", message);
TopicEntityVo topicEntity = new TopicEntityVo(); TopicEntityVo topicEntity = new TopicEntityVo();
topicEntity.setTopic(topic); topicEntity.setTopic(topic);
topicEntity.setMessage(message); topicEntity.setMessage(message);
...@@ -351,12 +352,21 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -351,12 +352,21 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
JSONObject jsonObject = JSONObject.parseObject(message); JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("datatype"); String dataType = jsonObject.getString("datatype");
String indexAddress = dataType != null ? jsonObject.getString("key") : jsonObject.getString("scadaid"); String indexAddress,value,timeStamp,quality = null;
String quality = jsonObject.getString("quality");
String value = jsonObject.getInteger("value") ==1 ? "true":"false"; //如果消息是遥信类型,进行指标转换
String timeStamp = dataType != null ? jsonObject.getString("time_stamp") : jsonObject.getString("timestamp"); if(dataType != null && dataType.equals("state")){
indexAddress = jsonObject.getString("scadaid");
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByIndexAddress(indexAddress); value = jsonObject.getInteger("value") ==1 ? "true":"false";
timeStamp = jsonObject.getString("timestamp");
}else{
indexAddress = jsonObject.getString("key");
value = jsonObject.getString("value");
timeStamp = jsonObject.getString("time_stamp");
quality = jsonObject.getString("quality");
}
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByIndexAddress(indexAddress,null);
if (equipmentSpeIndex == null){ if (equipmentSpeIndex == null){
return; return;
} }
...@@ -588,6 +598,262 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -588,6 +598,262 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
} }
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerMqttIotMessage(String topic, String message) {
log.info("接收到iot消息: {}", message);
TopicEntityVo topicEntity = new TopicEntityVo();
topicEntity.setTopic(topic);
topicEntity.setMessage(message);
List<IotDataVO> iotDatalist = new ArrayList<>();
List<EquipmentSpecificIndex> equipmentSpecificIndexList = new ArrayList<>();
List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
List<IndexStateVo> indexStateList = new ArrayList<>();
JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId");
String gatewayId = jsonObject.getString("gatewayId");
String value;
//如果消息是遥信类型,进行指标转换
if(dataType != null && dataType.equals("state")){
value = jsonObject.getString("value");
}else{
value = jsonObject.getInteger("value") ==1 ? "true":"false";
}
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByIndexAddress(indexAddress,gatewayId);
if (equipmentSpeIndex == null){
return;
}
equipmentSpeIndex.setValue(value);
equipmentSpeIndex.setValueLabel(valueTranslate(value, equipmentSpeIndex.getValueEnum()));
equipmentSpeIndex.setEquipmentType(topicEntity.getType());
equipmentSpeIndex.setUpdateDate(new Date());
equipmentSpeIndex.setGatewayId(gatewayId);
equipmentSpeIndex.setDataType(dataType);
equipmentSpeIndex.setTraceId(traceId);
equipmentSpeIndex.setUUID(UUIDUtils.getUUID());
IotDataVO iotDataVO = new IotDataVO();
iotDataVO.setKey(equipmentSpeIndex.getNameKey());
iotDataVO.setValue(value);
iotDatalist.add(iotDataVO);
QueryWrapper<EquipmentSpecific> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("id", equipmentSpeIndex.getEquipmentSpecificId());
EquipmentSpecific equipmentSpecific = iEquipmentSpecificSerivce.getOne(queryWrapper);
if (equipmentSpecific == null) {
return;
}
String iotCode = equipmentSpecific.getIotCode();
List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
if (eqIotCodeList.isEmpty()) {
log.info("该数据{}不存在!", iotCode);
return;
}
if (eqIotCodeList.size() > 1) {
log.info("有重复的{}数据!", iotCode);
}
EquipmentSpecificVo equipmentSpecificVo = eqIotCodeList.get(0);
topicEntity.setType(equipmentSpecificVo.getType());
topicEntity.setCode(equipmentSpecificVo.getCode());
//es存储数据
eSeqService.saveESEquiplistSpecificBySystemESVO(equipmentSpeIndex, String.valueOf(equipmentSpecificVo.getSystemId()), equipmentSpecificVo.getSystemName());
//更新装备性能指标
equipmentSpecificIndexService.updateById(equipmentSpeIndex);
// 更新设备表指标状态
iEquipmentSpecificSerivce.updateEquipmentSpecIndexRealtimeData(equipmentSpeIndex);
equipmentSpecificIndexList.add(equipmentSpeIndex);
indexStateList.add(createIndexStateVo(equipmentSpeIndex));
// 添加指标报告
saveEquipmentAlarmReportDay(equipmentSpeIndex);
// 火眼数据构造告警指标逻辑
equipmentSpeIndex = handleTemperatureAlarm(equipmentSpeIndex, iotDatalist);
boolean alarmFlag = false;
Map<String, String> messageBodyMap = new HashMap<>();
//管网压力、泡沫罐信息、水箱液位告警处理
if (iotDataVO.getKey().toLowerCase().equals(CAFS_FoamTank_FoamTankLevel.toLowerCase()) ||
FHS_PipePressureDetector_PipePressure.toLowerCase().equals(iotDataVO.getKey().toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(CAFS_WaterTank_WaterTankLevel.toLowerCase())) {
alarmFlag = doFoamTankLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
}
//消防水池液位处理
if (iotDataVO.getKey().toLowerCase().equals(FHS_FirePoolDevice_WaterLevel.toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(FHS_WirelessliquidDetector_WaterLevel.toLowerCase())) {
alarmFlag = doWaterPoolLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
}
// 遥测数据生成告警事件、日志处理
if (iotDataVO.getKey().toLowerCase().equals(CAFS_FoamTank_FoamTankLevel.toLowerCase()) ||
FHS_PipePressureDetector_PipePressure.toLowerCase().equals(iotDataVO.getKey().toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(CAFS_WaterTank_WaterTankLevel.toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(FHS_FirePoolDevice_WaterLevel.toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(FHS_WirelessliquidDetector_WaterLevel.toLowerCase())) {
handlingAlarms(equipmentSpeIndex, alarmFlag);
}
// 指标告警处理
if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpeIndex, messageBodyMap));
}
// 遥测遥信数据推送云端kafka
JSONObject jsonObjectXf = new JSONObject();
jsonObjectXf.put("data_class", "realdata");
if (equipmentSpeIndex.getIsTrend() == 1) {
jsonObjectXf.put("data_type", "analog");
} else {
jsonObjectXf.put("data_type", "state");
}
String date = DateUtils.date2LongStr(new Date());
jsonObjectXf.put("op_type", "subscribe_emergency");
JSONObject jsonObjectCondition = new JSONObject();
jsonObjectCondition.put("station_psr_id", stationCode);
jsonObjectCondition.put("station_name", stationName);
jsonObjectCondition.put("data_upload_time", date);
jsonObjectXf.put("condition", jsonObjectCondition);
JSONObject jsonObjectData = new JSONObject();
jsonObjectData.put("psrId", stationCode);
jsonObjectData.put("astId", equipmentSpeIndex.getSpecificCode());
jsonObjectData.put("equipType", equipmentSpeIndex.getEquipmentCode());
jsonObjectData.put("name", equipmentSpeIndex.getEquipmentSpecificName() + "-" + equipmentSpeIndex.getEquipmentSpecificIndexName());
if (value.equals("true")) {
jsonObjectData.put("value", "1");
} else if (value.equals("false")) {
jsonObjectData.put("value", "0");
} else {
jsonObjectData.put("value", value);
}
jsonObjectData.put("measurementType", null == equipmentSpeIndex.getEquipmentIndexKey() ? "" : equipmentSpeIndex.getEquipmentIndexKey());
jsonObjectData.put("dateTime", date);
jsonObjectData.put("quality", "0"); // 量测质量码:0 有效,1 无效
List<JSONObject> jsonObjects = Arrays.asList(jsonObjectData);
jsonObjectXf.put("data", jsonObjects);
// 遥测
if (!isOpenTelemetering && equipmentSpeIndex.getIsTrend() == 1) {
} else {
try {
emqKeeper.getMqttClient().publish("emq.xf.created", jsonObjectXf.toString().getBytes(), 1, false);
log.info("遥测遥信数据推送云端kafka成功");
} catch (MqttException e) {
log.error("遥测遥信数据推送云端kafka失败=====>" + e.getMessage());
e.printStackTrace();
}
}
// 报警数据保存
List<EquipmentSpecificAlarmLog> alarmLogs = new ArrayList<>();
if (!ObjectUtils.isEmpty(equipmentSpecificAlarms)) {
equipmentSpecificAlarmService.saveOrUpdateBatch(equipmentSpecificAlarms);
}
// 需要在事务提交之后,否则事务隔离查询不出数据
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
equipmentSpecificAlarms.forEach(action -> {
if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
alarmLogs.add(addEquipAlarmLogRecord(action));
if (ValidationUtil.isEmpty(action.getAlamContent())) {
action.setAlamContent(action.getEquipmentSpecificName() + action.getEquipmentSpecificIndexName());
}
mqttSendGateway.sendToMqtt(TopicEnum.EQDQR.getTopic(), JSONArray.toJSON(action).toString());
} else {
alarmLogs.addAll(upAlarmLogStatus(action.getIotCode(), action.getEquipmentSpecificIndexKey(), action.getTraceId(),
equipmentSpecificAlarmLogService, false));
mqttSendGateway.sendToMqtt(TopicEnum.EQYQR.getTopic(), JSONArray.toJSON(action).toString());
bool = Boolean.TRUE;
}
});
// 直流中心消息推送刷新
publishDataToDCCenterPage(equipmentSpecificIndexList);
// 四横八纵遥测信号信息列表刷新
publishNormalIndexValueToPage(equipmentSpecificIndexList);
if ("zd".equals(system)) {
System.out.println("站端系统----------------");
// 向预控系统发送消息
sendEquipSpecIndexToAutosysTopic(equipmentSpecificIndexList);
// 首页性能指标数据订阅
mqttSendGateway.sendToMqtt(indexTopic, JSON.toJSONString(indexStateList));
// 组态大屏消息推送,设备表实时指标修改
intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
// 数字换流站同步指标修改
syncSpecificIndexsToGS(equipmentSpecificIndexList);
// 则更新拓扑节点数据及告警状态
updateNodeDateByEquipId(equipmentSpecificIndexList);
// 向画布推送
publishDataToCanvas(equipmentSpecificIndexList);
// 向其他系统推送报警
equipmentAlarmLogsToOtherSystems(alarmLogs);
if (equipmentSpecificVo.getEcode() != null) {
String ecode = equipmentSpecificVo.getEcode();
boolean flag = false;
//消防泵
String[] strings = pumpCodes.split(",");
for (String string : strings) {
if (ecode.startsWith(string)) {
//通知>消防应急预案
topicEntity.setType("xfb");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag = true;
break;
}
}
// 消防炮
String[] stringxfp = monitorCodes.split(",");
if (!flag) {
for (String string1 : stringxfp) {
if (ecode.startsWith(string1)) {
//通知>消防应急预案
topicEntity.setType("xfp");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag = true;
break;
}
}
}
//消防水源
if (!flag) {
List<Map> lit = iEquipmentSpecificSerivce.getWater(equipmentSpecificVo.getId());
if (lit != null && lit.size() > 0) {
topicEntity.setType("xfsy");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
}
}
}
}
}
});
}
/** /**
* 物联数据处理 * 物联数据处理
* *
......
...@@ -904,4 +904,15 @@ ...@@ -904,4 +904,15 @@
</sql> </sql>
</changeSet> </changeSet>
<changeSet author="LiuLin" id="1686650059">
<preConditions onFail="MARK_RAN">
<not>
<columnExists tableName="wl_equipment_specific_index" columnName="gateway_id"/>
</not>
</preConditions>
<comment>新增属性字段 gateway_id</comment>
<sql>
alter table `wl_equipment_specific_index` add column `gateway_id` varchar(50) COMMENT '网关标识ID';
</sql>
</changeSet>
</databaseChangeLog> </databaseChangeLog>
\ No newline at end of file
...@@ -558,6 +558,8 @@ ...@@ -558,6 +558,8 @@
LEFT JOIN wl_equipment_index AS wei ON wei.id = wesi.equipment_index_id LEFT JOIN wl_equipment_index AS wei ON wei.id = wesi.equipment_index_id
WHERE WHERE
wesi.index_address = #{indexAddress} wesi.index_address = #{indexAddress}
AND wei.is_iot = true <if test="gatewayId != null">
AND wesi.gateway_id = #{gatewayId}
</if>
</select> </select>
</mapper> </mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment