Commit 0fce1fe6 authored by maoying's avatar maoying

添加从南南瑞平台过来的数据转发至中心

parent 277ecdc7
......@@ -537,6 +537,21 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
//向中心级推送从数据站过来的数据实时数据
JSONObject jsonObject = new JSONObject();
String zxTopic = endIndex+"/property";
JSONObject zxmsg = new JSONObject();
zxmsg.put("traceId", equipmentSpeIndex.getId() + "");
zxmsg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
jsonObject.put("topic", zxTopic);
jsonObject.put("data", zxmsg);
try {
emqKeeper.getMqttClient().publish("emq.iot.created", jsonObject.toString().getBytes(), 2, false);
} catch (MqttException e) {
log.info(String.format("发送eqm转kafka消息失败:%s", e.getMessage()));
}
List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
if (eqIotCodeList.isEmpty()) {
......@@ -830,6 +845,22 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
msg.put("traceId", equipmentSpeIndex.getId() + "");
mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
//向中心级推送从数据站过来的数据实时数据
JSONObject jsonObject = new JSONObject();
String zxTopic = endIndex+"/property";
JSONObject zxmsg = new JSONObject();
zxmsg.put("traceId", equipmentSpeIndex.getId() + "");
zxmsg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
jsonObject.put("topic", zxTopic);
jsonObject.put("data", zxmsg);
try {
emqKeeper.getMqttClient().publish("emq.iot.created", jsonObject.toString().getBytes(), 2, false);
} catch (MqttException e) {
log.info(String.format("发送eqm转kafka消息失败:%s", e.getMessage()));
}
List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
if (eqIotCodeList.isEmpty()) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment