Commit 738177bd authored by maoying's avatar maoying

优化接收iot数据逻辑处理

parent d2d857a9
......@@ -31,7 +31,7 @@ public interface IdxFeign {
ResponseModel<JSONObject> queryDefectByBatchId(@PathVariable(value = "alarmLogId") String alarmLogId);
@RequestMapping(value = "/defect/check/list", method = RequestMethod.POST)
JSONObject queryDefectByCodes(@RequestBody(required = false) List<String> codes, @RequestParam String checkId);
JSONObject queryDefectByCodes(@RequestBody(required = false) List<String> codes, @RequestParam("checkId") String checkId);
@RequestMapping(value = "/qrcode/scrap/expired/put", method = RequestMethod.GET)
ResponseModel<JSONObject> handleEquipNotScrapWhenExpired(@RequestParam("equipId") String equipId,
......
......@@ -33,6 +33,14 @@ public interface EquipmentSpecificIndexMapper extends BaseMapper<EquipmentSpecif
* @Date 2020/11/3 17:58
*/
List<EquipmentSpecificIndex> getEquipmentSpeIndexBySpeIotCode(String iotCode);
/**
*
* @param iotCode
* @param indexKey
* @return
*/
List<EquipmentSpecificIndex> getEquipmentSpeIndexBySpeIotCodeAndIndexKey(@Param("iotCode") String iotCode, @Param("indexKey") String indexKey);
/**
* <pre>
......
......@@ -197,9 +197,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
*/
private final static String CAFS_WaterTank_WaterTankLevel = "CAFS_WaterTank_WaterTankLevel";
private final static String FHS_PressurePump_Start = "FHS_PressurePump_Start";
// private final static String FHS_PressurePump_Start = "FHS_PressurePump_Start";
private final static String FHS_PressurePump_Stop = "FHS_PressurePump_Stop";
// private final static String FHS_PressurePump_Stop = "FHS_PressurePump_Stop";
private static final String PUMP_JOB_GROUP_NAME = "EQUIP_PUMP_JOB_GROUP_NAME";
private static final String PUMP_TRIGGER_NAME = "EQUIP_PUMP_TRIGGER_NAME";
......@@ -518,7 +518,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
traceId = value.toString();
continue;
}
if (iotDataVO.getKey().equalsIgnoreCase(FHS_PressurePump_Start) && "true".equals(iotDataVO.getValue().toString())) {
if (iotDataVO.getKey().equalsIgnoreCase(pressurePumpStart) && "true".equals(iotDataVO.getValue().toString())) {
Integer pumNumHour = getPumNumHour(topicEntity);
iotDataVO.setPumNum(pumNumHour);
}
......@@ -822,10 +822,10 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
mqttSendGateway.sendToMqtt(indexTopic, JSON.toJSONString(indexStateList));
// 组态大屏消息推送,设备表实时指标修改
intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
//intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
// 数字换流站同步指标修改
syncSpecificIndexsToGS(equipmentSpecificIndexList);
//syncSpecificIndexsToGS(equipmentSpecificIndexList);
// 则更新拓扑节点数据及告警状态
updateNodeDateByEquipId(equipmentSpecificIndexList);
......@@ -1134,10 +1134,10 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
mqttSendGateway.sendToMqtt(indexTopic, JSON.toJSONString(indexStateList));
// 组态大屏消息推送,设备表实时指标修改
intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
//intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
// 数字换流站同步指标修改
syncSpecificIndexsToGS(equipmentSpecificIndexList);
// syncSpecificIndexsToGS(equipmentSpecificIndexList);
// 则更新拓扑节点数据及告警状态
updateNodeDateByEquipId(equipmentSpecificIndexList);
......@@ -1207,15 +1207,15 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
public void realTimeDateProcessing(TopicEntityVo topicEntity, List<IotDataVO> iotDatalist, EquipmentSpecificVo vo) {
String iotCode = topicEntity.getIotCode();
if (EquipAndCarEnum.equip.type.equals(topicEntity.getType())) {
List<EquipmentSpecificIndex> indexList = equipmentSpecificIndexService
.getEquipmentSpeIndexBySpeIotCode(iotCode);
List<EquipmentSpecificIndex> indexList = null;
// equipmentSpecificIndexService.getEquipmentSpeIndexBySpeIotCode(iotCode);
if (ObjectUtils.isEmpty(indexList)) {
return;
}
equipRealTimeDate(iotDatalist, indexList, topicEntity, vo);
String bizOrgCode = indexList.get(0).getBizOrgCode();
// redis缓存指定指标、指定时长物联数据
pressurePumpService.saveDataToRedis(iotDatalist, iotCode, bizOrgCode);
// String bizOrgCode = indexList.get(0).getBizOrgCode();
// // redis缓存指定指标、指定时长物联数据
// pressurePumpService.saveDataToRedis(iotDatalist, iotCode, bizOrgCode);
} else {
List<CarProperty> carProperties = carPropertyService.getCarPropListByIotCode(iotCode);
if (ObjectUtils.isEmpty(carProperties)) {
......@@ -1253,7 +1253,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
* @param indexList
* @param topicEntity
*/
public void equipRealTimeDate(List<IotDataVO> iotDatalist, List<EquipmentSpecificIndex> indexList,
public void equipRealTimeDate(List<IotDataVO> iotDatalist, List<EquipmentSpecificIndex> indexList1,
TopicEntityVo topicEntity, EquipmentSpecificVo vo) {
List<EquipmentSpecificIndex> equipmentSpecificIndexList = new ArrayList<>();
List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
......@@ -1262,10 +1262,12 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
iotDataListToCacheMap(iotDatalist);
iotDatalist.forEach(iotDataVO -> {
boolean alarmFlag = false;
for (EquipmentSpecificIndex equipmentSpecificIndex : indexList) {
if (!ObjectUtils.isEmpty(equipmentSpecificIndex.getNameKey())
&& equipmentSpecificIndex.getNameKey().equalsIgnoreCase(iotDataVO.getKey())) {
List<EquipmentSpecificIndex> indexs = equipmentSpecificIndexMapper.getEquipmentSpeIndexBySpeIotCodeAndIndexKey(topicEntity.getIotCode(), iotDataVO.getKey());
if(StringUtils.isEmpty(indexs)){
return;
}
for (EquipmentSpecificIndex equipmentSpecificIndex : indexs) {
if (!ObjectUtils.isEmpty(equipmentSpecificIndex.getNameKey())) {
EquipmentSpecificIndex equipmentSpeIndex = new EquipmentSpecificIndex();
BeanUtils.copyProperties(equipmentSpecificIndex, equipmentSpeIndex);
String value = iotDataVO.getValue().toString();
......@@ -1311,11 +1313,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
indexStateList.add(createIndexStateVo(equipmentSpeIndex));
// 推送数据到组态大屏
pushDataToIntegrationPage(equipmentSpecificIndexList);
// 火眼数据构造告警指标逻辑
equipmentSpecificIndex = handleTemperatureAlarm(equipmentSpeIndex, iotDatalist);
// 告警日志表消息内容
......@@ -1335,7 +1333,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
doWaterStationWarning(equipmentSpecificIndex.getBizOrgCode(), equipmentSpecificIndex.getBizOrgName());
}
//稳压泵启动次数大于15次触发预警
if (iotDataVO.getKey().equalsIgnoreCase(FHS_PressurePump_Start) && "true".equals(iotDataVO.getValue().toString())) {
if (iotDataVO.getKey().equalsIgnoreCase(pressurePumpStart) && "true".equals(iotDataVO.getValue().toString())) {
doPressurePumInfo(iotDataVO.getPumNum(), equipmentSpecificIndex);
}
// 遥测数据生成告警事件、日志处理
......@@ -1351,7 +1349,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
saveEquipmentAlarmReportDay(equipmentSpeIndex, alarmFlag);
// 指标告警处理
if (equipmentSpecificIndex.getIsAlarm() != null && 1 == equipmentSpecificIndex.getIsAlarm() && !equipmentSpecificIndex.getEquipmentIndexKey().equals(pressurePumpStart)) {
if (equipmentSpecificIndex.getIsAlarm() != null && 1 == equipmentSpecificIndex.getIsAlarm()
// && !equipmentSpecificIndex.getEquipmentIndexKey().equals(pressurePumpStart)
) {
equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpecificIndex, messageBodyMap));
}
// 遥测遥信数据推送云端kafka
......@@ -1437,7 +1437,11 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
iotDatalist.forEach(iotDataVO -> {
// redis缓存指定指标、指定时长物联数据
pressurePumpService.saveDataToRedis(iotDatalist, vo.getIotCode(), vo.getBizOrgCode());
iotDatalist.forEach(iotDataVO -> {
String indexKey = iotDataVO.getKey();
String indexValue = iotDataVO.getValue().toString();
// 稳压泵启停信号处理
......@@ -1469,7 +1473,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
// 四横八纵遥测信号信息列表刷新
publishNormalIndexValueToPage(equipmentSpecificIndexList);
// publishNormalIndexValueToPage(equipmentSpecificIndexList);
// 触发风险---> 站端发送消息到Message服务
publishDataToMessage(equipmentSpecificIndexList, isAlarm);
......@@ -1484,13 +1488,19 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
mqttSendGateway.sendToMqtt(indexTopic, JSON.toJSONString(indexStateList));
// 组态大屏消息推送,设备表实时指标修改
intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
//intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
// 数字换流站同步指标修改
syncSpecificIndexsToGS(equipmentSpecificIndexList);
// 数字换流站同步指标修改
//syncSpecificIndexsToGS(equipmentSpecificIndexList);
// 则更新拓扑节点数据及告警状态
updateNodeDateByEquipId(equipmentSpecificIndexList);
// 推送数据到组态大屏
pushDataToIntegrationPage(equipmentSpecificIndexList);
// 推送数据到组态大屏
pushDataToIntegrationPage(equipmentSpecificIndexList);
// 向画布推送
publishDataToCanvas(equipmentSpecificIndexList);
......@@ -1755,7 +1765,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
String startDate = DateUtil.format(DateUtil.offsetHour(new Date(), -1), DatePattern.NORM_DATETIME_PATTERN);
String prefix = topicEntity.getIotCode().substring(0, 8);
String suffix = topicEntity.getIotCode().substring(8);
ResponseModel<Map<String, Integer>> mapResponseModel = iotFeign.queryIotDataNumByIndex(startDate, endDate, prefix, suffix, FHS_PressurePump_Start, "true");
ResponseModel<Map<String, Integer>> mapResponseModel = iotFeign.queryIotDataNumByIndex(startDate, endDate, prefix, suffix, pressurePumpStart, "true");
if (200 == mapResponseModel.getStatus()) {
Map<String, Integer> result = mapResponseModel.getResult();
Integer totalNum = result.get("num");
......
......@@ -81,6 +81,57 @@
wes.iot_code = #{iotCode}
and wei.is_iot = true
</select>
<select id="getEquipmentSpeIndexBySpeIotCodeAndIndexKey"
resultType="com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex">
SELECT wesi.id AS id,
wei.name_key AS nameKey,
wesi.value AS value,
wesi.equipment_specific_id AS equipmentSpecificId,
wesi.equipment_index_id AS equipmentIndexId,
wesi.equipment_index_name AS equipmentIndexName,
wesi.equipment_index_key AS equipmentIndexKey,
wesi.value_label AS valueLabel,
wei.type_code AS typeCode,
wei.type_name AS typeName,
wei.name AS indexName,
wei.unit AS indexUnitName,
wes.org_code AS orgCode,
wes.name AS equipmentSpecificName,
ed.equipment_name AS equipmentName,
wes.iot_code AS iotCode,
wes.code AS specificCode,
wei.`name` AS equipmentSpecificIndexName,
wei.`value_enum` AS valueEnum,
wei.is_trend AS isTrend,
wes.qr_code AS qrCode,
wesi.update_date AS updateDate,
ed.code AS equipmentCode,
ed.equipment_id AS equipmentId,
ed.id AS equipmentDetailId,
wes.code as equipmentSpecificCode,
wes.system_id as systemId,
wesi.is_alarm as isAlarm,
wesi.emergency_level_color as emergencyLevelColor,
wesi.emergency_level as emergencyLevel,
wesi.emergency_level_describe as emergencyLevelDescribe,
TRIM(CONCAT_WS(' ',wes.position,sd.description)) AS location,
sd.warehouse_structure_id AS buildId,
wes.biz_org_name AS bizOrgName,
wes.biz_org_code AS bizOrgCode,
wesi.alarm_rule AS alarmRule,
wesi.formula
FROM
wl_equipment_specific_index AS wesi
LEFT JOIN wl_equipment_specific AS wes
ON wes.id = wesi.equipment_specific_id
LEFT JOIN wl_equipment_detail ed ON ed.id = wes.equipment_detail_id
LEFT JOIN wl_equipment_index AS wei ON wei.id = wesi.equipment_index_id
LEFT JOIN wl_stock_detail sd ON sd.equipment_specific_id = wes.id
WHERE
wes.iot_code = #{iotCode}
AND wesi.equipment_index_key = #{indexKey}
</select>
<select id="getEquipmentSpeIndexByIotCode"
resultType="com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex">
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment