Commit 7864b12e authored by 刘林's avatar 刘林

fix(equip):换流站添加消息转发iot,存储influxdb,兼容绍兴换流站消息

parent 85d334d2
...@@ -358,18 +358,24 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -358,18 +358,24 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
JSONObject jsonObject = JSONObject.parseObject(message); JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("datatype"); String dataType = jsonObject.getString("datatype");
String indexAddress, value, timeStamp, quality = null; String indexAddress = null, value, timeStamp, quality = null;
//如果消息是遥信类型,进行指标转换 //如果消息是遥信类型,进行指标转换
if (dataType != null && dataType.equals("state")) { assert dataType != null;
if (dataType.equals("state")) {
indexAddress = jsonObject.getString("scadaid"); indexAddress = jsonObject.getString("scadaid");
value = jsonObject.getInteger("value") == 1 ? "true" : "false"; value = jsonObject.getInteger("value") == 1 ? "true" : "false";
timeStamp = jsonObject.getString("timestamp"); timeStamp = jsonObject.getString("timestamp");
} else { }else if (dataType.equals("analog")){
indexAddress = jsonObject.getString("key"); indexAddress = jsonObject.getString("key");
value = jsonObject.getString("value"); value = jsonObject.getString("value");
timeStamp = jsonObject.getString("time_stamp"); timeStamp = jsonObject.getString("time_stamp");
quality = jsonObject.getString("quality"); quality = jsonObject.getString("quality");
}else {
indexAddress = jsonObject.getString("key");
value = jsonObject.getFloat("value") == 0.0 ? "false" : "true";
timeStamp = jsonObject.getString("time_stamp");
quality = jsonObject.getString("quality");
} }
Map<Object, Object> equipmentIndexKeyMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS_KEY); Map<Object, Object> equipmentIndexKeyMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS_KEY);
if (equipmentIndexKeyMap.get(indexAddress) != null) { if (equipmentIndexKeyMap.get(indexAddress) != null) {
...@@ -389,7 +395,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -389,7 +395,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
iotDataVO.setValue(value); iotDataVO.setValue(value);
iotDatalist.add(iotDataVO); iotDatalist.add(iotDataVO);
// iEquipmentSpecificSerivce.getEquipSpecificDetailsByEquipmentId(equipmentSpeIndex.getEquipmentSpecificId());
QueryWrapper<EquipmentSpecific> queryWrapper = new QueryWrapper<>(); QueryWrapper<EquipmentSpecific> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("id", equipmentSpeIndex.getEquipmentSpecificId()); queryWrapper.eq("id", equipmentSpeIndex.getEquipmentSpecificId());
EquipmentSpecific equipmentSpecific = iEquipmentSpecificSerivce.getOne(queryWrapper); EquipmentSpecific equipmentSpecific = iEquipmentSpecificSerivce.getOne(queryWrapper);
...@@ -397,6 +402,13 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -397,6 +402,13 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
return; return;
} }
String iotCode = equipmentSpecific.getIotCode(); String iotCode = equipmentSpecific.getIotCode();
StringBuilder endIndex = new StringBuilder(iotCode).insert(8, '/');
String iotTopic = "influxdb/" + endIndex;
if (isSendIot) {
JSONObject msg = new JSONObject();
msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
}
List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode); List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment