Commit a32d0d83 authored by 刘林's avatar 刘林

fix(equip):删除不用代码,添加异步多线程处理

parent 52113eb5
...@@ -28,14 +28,6 @@ public interface MqttReceiveService { ...@@ -28,14 +28,6 @@ public interface MqttReceiveService {
void handlerMqttRomaMessage(String topic, String message); void handlerMqttRomaMessage(String topic, String message);
/** /**
* 处理Iot消息数据
*
* @param topic 主题
* @param message 消息内容
*/
void handlerMqttIotMessage(String topic, String message);
/**
* 中心级接收消息发送至消息服务 * 中心级接收消息发送至消息服务
* *
* @param topic * @param topic
......
...@@ -35,6 +35,7 @@ import org.springframework.beans.BeanUtils; ...@@ -35,6 +35,7 @@ import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronization;
...@@ -46,7 +47,6 @@ import org.typroject.tyboot.component.emq.EmqKeeper; ...@@ -46,7 +47,6 @@ import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.core.foundation.utils.ValidationUtil; import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import org.typroject.tyboot.core.restful.exception.instance.BadRequest; import org.typroject.tyboot.core.restful.exception.instance.BadRequest;
import org.typroject.tyboot.core.restful.utils.ResponseModel; import org.typroject.tyboot.core.restful.utils.ResponseModel;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.math.RoundingMode; import java.math.RoundingMode;
...@@ -367,6 +367,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -367,6 +367,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
} }
@Override @Override
@Async("equipAsyncExecutor")
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void handlerMqttRomaMessage(String topic, String message) { public void handlerMqttRomaMessage(String topic, String message) {
log.info("接收到Mqtt消息: {}", message); log.info("接收到Mqtt消息: {}", message);
...@@ -638,138 +639,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -638,138 +639,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}); });
} }
} }
/**
* 废弃代码,暂时不用,后期删除
* @param topic 主题
* @param message 消息内容
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerMqttIotMessage(String topic, String message) {
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
log.info("接收到iot消息: {}", message);
TopicEntityVo topicEntity = new TopicEntityVo();
topicEntity.setTopic(topic);
topicEntity.setMessage(message);
List<EquipmentSpecificIndex> equipmentSpecificIndexList = new ArrayList<>();
List<IndexStateVo> indexStateList = new ArrayList<>();
JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId");
String deviceCode = jsonObject.getString("deviceCode");
String gatewayId = jsonObject.getString("gatewayId");
String value = jsonObject.getString("value");
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByIndexAddress(indexAddress, gatewayId);
if (equipmentSpeIndex == null) {
return;
}
equipmentSpeIndex.setValue(value);
equipmentSpeIndex.setValueLabel(valueTranslate(value, equipmentSpeIndex.getValueEnum()));
equipmentSpeIndex.setEquipmentType(topicEntity.getType());
equipmentSpeIndex.setUpdateDate(new Date());
equipmentSpeIndex.setGatewayId(gatewayId);
equipmentSpeIndex.setDataType(dataType);
equipmentSpeIndex.setTraceId(traceId);
equipmentSpeIndex.setUUID(UUIDUtils.getUUID());
//更新装备性能指标
//equipmentSpecificIndexService.updateById(equipmentSpeIndex);
tagsMap.put("key", indexAddress + "_" + gatewayId);
fieldsMap.put("traceId", traceId);
fieldsMap.put("address", indexAddress);
fieldsMap.put("value", value);
fieldsMap.put("gatewayId", gatewayId);
fieldsMap.put("dataType", dataType);
fieldsMap.put("equipmentId", equipmentSpeIndex.getEquipmentId());
fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
fieldsMap.put("equipmentIndexKey", equipmentSpeIndex.getEquipmentIndexKey());
fieldsMap.put("isAlarm", equipmentSpeIndex.getIsAlarm().toString());
fieldsMap.put("unit", equipmentSpeIndex.getUnit());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
//保存influxDB库
influxDbConnection.insert("iot_data", tagsMap, fieldsMap);
QueryWrapper<EquipmentSpecific> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("id", equipmentSpeIndex.getEquipmentSpecificId());
EquipmentSpecific equipmentSpecific = iEquipmentSpecificSerivce.getOne(queryWrapper);
if (equipmentSpecific == null) {
return;
}
String iotCode = equipmentSpecific.getIotCode();
List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
if (eqIotCodeList.isEmpty()) {
log.info("该数据{}不存在!", iotCode);
return;
}
if (eqIotCodeList.size() > 1) {
log.info("有重复的{}数据!", iotCode);
}
EquipmentSpecificVo equipmentSpecificVo = eqIotCodeList.get(0);
topicEntity.setType(equipmentSpecificVo.getType());
topicEntity.setCode(equipmentSpecificVo.getCode());
// 更新设备表指标状态
iEquipmentSpecificSerivce.updateEquipmentSpecIndexRealtimeData(equipmentSpeIndex);
equipmentSpecificIndexList.add(equipmentSpeIndex);
indexStateList.add(createIndexStateVo(equipmentSpeIndex));
// 添加指标报告
saveEquipmentAlarmReportDay(equipmentSpeIndex);
// 需要在事务提交之后,否则事务隔离查询不出数据
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
// 直流中心消息推送刷新
publishDataToDCCenterPage(equipmentSpecificIndexList);
// 四横八纵遥测信号信息列表刷新
publishNormalIndexValueToPage(equipmentSpecificIndexList);
if ("zd".equals(system)) {
System.out.println("站端系统----------------");
// 向预控系统发送消息
sendEquipSpecIndexToAutosysTopic(equipmentSpecificIndexList);
// 首页性能指标数据订阅
mqttSendGateway.sendToMqtt(indexTopic, JSON.toJSONString(indexStateList));
// 组态大屏消息推送,设备表实时指标修改
intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
// 数字换流站同步指标修改
syncSpecificIndexsToGS(equipmentSpecificIndexList);
// 则更新拓扑节点数据及告警状态
updateNodeDateByEquipId(equipmentSpecificIndexList);
// 向画布推送
publishDataToCanvas(equipmentSpecificIndexList);
}
}
});
}
/** /**
* 物联数据处理 * 物联数据处理
* *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment