Commit 8d0a176d authored by wujiang's avatar wujiang

修改物联告警mqtt

parent fd28cd1e
......@@ -81,4 +81,10 @@ public class EquipmentSpecificIndex {
@TableField("unit")
private String unit;
@TableField("topic")
private String topic;
@TableField("group")
private String group;
}
......@@ -59,4 +59,10 @@ public class PointSystem extends BaseEntity {
@ApiModelProperty(value = "场站缩写")
@TableField("station_abbr")
private String stationAbbr;
@TableField("topic")
private String topic;
@TableField("group")
private String group;
}
......@@ -6,10 +6,9 @@ import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -20,8 +19,8 @@ import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yeejoin.amos.api.alarm.dto.DynamicDetails;
......@@ -267,8 +266,8 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
if ("SYZ".equals(pointSystem.getNumber())) {
// 如果开启升压站预警
if (warnSYZ) {
indexKey =new StringBuilder("WL-").append(pointSystem.getStation()).append("#").append(pointSystem.getNumber())
.append("#").append(pointSystem.getFunctionNum());
indexKey = new StringBuilder("WL-").append(pointSystem.getStation()).append("#")
.append(pointSystem.getNumber()).append("#").append(pointSystem.getFunctionNum());
} else {
System.out.println("升压站预警不发送: " + warningObjectCode + " , " + indexValue);
return null;
......@@ -289,4 +288,91 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
return WarningDto;
}
}
@Async("equipAsyncExecutor")
public void sendWarningMqttAsync(String topic, String msg) {
try {
JSONObject jsonObject = JSONObject.parseObject(msg);
String group = jsonObject.getString("group");
JSONObject valueObj = jsonObject.getJSONObject("value");
Set<String> keys = valueObj.keySet();
String address = null;
String value = null;
for (String key : keys) {
address = key;
value = valueObj.getString(key);
break;
}
this.sendWarningMqtt(address, value, topic, group);
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendWarningMqtt(String address, String value, String topic, String group) {
QueryWrapper<EquipmentSpecificIndex> indexWrapper = new QueryWrapper<>();
indexWrapper.lambda().eq(EquipmentSpecificIndex::getIndexAddress, address);
indexWrapper.lambda().eq(EquipmentSpecificIndex::getTopic, topic);
indexWrapper.lambda().eq(EquipmentSpecificIndex::getGroup, group);
EquipmentSpecificIndex esi = equipmentSpecificIndexMapper.selectOne(indexWrapper);
String isAlarm = String.valueOf(esi.getIsAlarm().intValue());
// 对应 equipment库的wl_equipment_specific_index_alarm_dic表
String[] s = { "1", "7", "9" };
// 如果不满足择返回
if (!Arrays.asList(s).contains(isAlarm)) {
System.out.println("不满足告警类型: " + isAlarm);
return;
}
System.out.println("满足告警消息address: " + address + ",topic: " + topic + " ,value:" + value
+ " ,group: " + group + " ,isAlarm: " + isAlarm);
logger.info("满足告警消息address: " + address + ",topic: " + topic + " ,value:" + value
+ " ,group: " + group + " ,isAlarm: " + isAlarm);
LambdaQueryWrapper<PointSystem> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(PointSystem::getTopic, topic);
wrapper.eq(PointSystem::getGroup, group);
wrapper.eq(PointSystem::getAddress, address);
PointSystem pointSystem = pointSystemMapper.selectOne(wrapper);
String valueLabe=null;
JSONObject eqdata = new JSONObject();
// 调用获取设备相关信息
QueryWrapper<KKSData> KKSDataWrapper = new QueryWrapper<>();
KKSDataWrapper.lambda().eq(KKSData::getKKSBM, pointSystem.getKks());
KKSData KKSData = kksDataMapper.selectOne(KKSDataWrapper);
if (KKSData == null) {
// throw new RuntimeException("kks码查询热工院表不存在:" + pointSystem.getKks());
System.out.println("kks码查询热工院表不存在:" + pointSystem.getKks());
return;
}
eqdata.put("kksms", KKSData.getKKSMS());
QueryWrapper<StationBasic> stationWrapper = new QueryWrapper<>();
stationWrapper.lambda().eq(StationBasic::getStationNumber, pointSystem.getStation());
StationBasic stationBasic = stationBasicMapper.selectOne(stationWrapper);
if (stationBasic != null) {
eqdata.put("sourceAttribution", stationBasic.getProjectOrgCode());
eqdata.put("sourceAttributionDesc", stationBasic.getStationName());
} else {
// throw new RuntimeException("获取场站失败: " + pointSystem.getStation());
System.out.println("获取场站失败: " + pointSystem.getStation());
return;
}
try {
// 组装数据,发送预警
WarningDto warningDto = setWarningDto(pointSystem, eqdata, valueLabe);
if (warningDto != null) {
emqKeeper.getMqttClient().publish(STATIONWARNING, JSON.toJSONString(warningDto).getBytes(), 0, false);
System.out.println("发送预警成功: " + JSON.toJSONString(warningDto));
// logger.info("发送预警成功: " + JSON.toJSONString(warningDto));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment