Commit 074becfe authored by lisong's avatar lisong

代码同步

parent ff00c72f
......@@ -143,5 +143,15 @@ public class EquipmentSpecificIndex extends BaseEntity {
@TableField(exist = false)
private String buildId;
@ApiModelProperty(value = "装备系统code")
@TableField(exist = false)
private String specificCode;
@ApiModelProperty(value = "装备定义名称")
@TableField(exist = false)
private String equipmentName;
@ApiModelProperty(value = "是否遥测")
@TableField(exist = false)
private Integer isTrend;
}
package com.yeejoin.equipmanage.service.impl;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.stream.Collectors;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
......@@ -21,6 +15,7 @@ import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import com.alibaba.fastjson.JSON;
......@@ -145,6 +140,18 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@Autowired
private RedisUtils redisUtils;
@Value("${state.code:code}")
private String stationCode;
@Value("${state.name:name}")
private String stationName;
@Value("${is.open.telemetering:false}")
private Boolean isOpenTelemetering;
@Autowired
protected EmqKeeper emqKeeper;
@Autowired
CarMapper carMapper;
......@@ -254,6 +261,18 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
return;
}
log.info(String.format("收到mqtt消息:%s", message));
// 发送emq消息转kafka
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", topic);
jsonObject.put("data",message);
try {
emqKeeper.getMqttClient().publish("emq.iot.created",jsonObject.toString().getBytes(),1,false);
} catch (MqttException e) {
log.info(String.format("发送eqm转kafka消息失败:%s", e.getMessage()));
}
realTimeDateProcessing(topicEntity, iotDatalist);
}
......@@ -685,7 +704,55 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
if(equipmentSpecificIndex.getIsAlarm() !=null && 1 == equipmentSpecificIndex.getIsAlarm()){
equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpecificIndex));
}
// 遥测遥信数据推送云端kafka
JSONObject jsonObjectXf = new JSONObject();
jsonObjectXf.put("data_class", "realdata");
if(equipmentSpeIndex.getIsTrend() == 1) {
jsonObjectXf.put("data_type", "analog");
} else {
jsonObjectXf.put("data_type", "state");
}
String date = DateUtils.date2LongStr(new Date());
jsonObjectXf.put("op_type", "subscribe_emergency");
JSONObject jsonObjectCondition = new JSONObject();
jsonObjectCondition.put("station_psr_id", stationCode);
jsonObjectCondition.put("station_name", stationName);
jsonObjectCondition.put("data_upload_time", date );
jsonObjectXf.put("condition",jsonObjectCondition);
JSONObject jsonObjectData = new JSONObject();
jsonObjectData.put("psrId", stationCode);
jsonObjectData.put("astId", equipmentSpecificIndex.getSpecificCode());
jsonObjectData.put("equipType", equipmentSpecificIndex.getEquipmentCode());
jsonObjectData.put("name", equipmentSpecificIndex.getEquipmentSpecificName()+"-"+equipmentSpecificIndex.getEquipmentSpecificIndexName());
if(value.equals("true")) {
jsonObjectData.put("value","1");
} else if (value.equals("false")) {
jsonObjectData.put("value","0");
} else {
jsonObjectData.put("value", value);
}
jsonObjectData.put("measurementType",equipmentSpecificIndex.getEquipmentIndexKey());
jsonObjectData.put("dateTime",date);
jsonObjectData.put("quality","0"); // 量测质量码:0 有效,1 无效
List<JSONObject> jsonObjects = Arrays.asList(jsonObjectData);
jsonObjectXf.put("data", jsonObjects);
// 遥测
if(!isOpenTelemetering && equipmentSpeIndex.getIsTrend() == 1) {
} else {
try {
emqKeeper.getMqttClient().publish("emq.xf.created",jsonObjectXf.toString().getBytes(),1,false);
log.info("遥测遥信数据推送云端kafka成功");
} catch (MqttException e) {
log.error("遥测遥信数据推送云端kafka失败=====>" + e.getMessage());
e.printStackTrace();
}
}
}
}
});
......
......@@ -73,3 +73,6 @@ param.nrvideo.url=http://198.87.103.158:8001;
#南瑞视频平台通过视频id获取flv格式视频播放地址
param.nrflvbyvoideoid.url=http://192.168.4.159:10010/api/media/live
# 站端标识
state.code=JP
state.name=NAME
\ No newline at end of file
......@@ -45,12 +45,15 @@
wei.type_code AS typeCode,
wei.type_name AS typeName,
wei.name AS indexName,
ed.equipment_name AS equipmentName,
wei.unit AS indexUnitName,
wes.org_code AS orgCode,
wes.code AS specificCode,
ed.`name` AS equipmentSpecificName,
wes.iot_code AS iotCode,
wei.`name` AS equipmentSpecificIndexName,
wei.`value_enum` AS valueEnum,
wei.is_trend AS isTrend,
wes.qr_code AS qrCode,
wesi.update_date AS updateDate,
ed.code AS equipmentCode,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment