Commit 3f8d7d19 authored by litengwei's avatar litengwei

中心去掉物联信息生成告警信息逻辑

parent 3f79db0f
...@@ -140,7 +140,7 @@ public class EquipmentIotMqttReceiveConfig { ...@@ -140,7 +140,7 @@ public class EquipmentIotMqttReceiveConfig {
adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttPahoClientFactory(), arr); adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttPahoClientFactory(), arr);
adapter.setCompletionTimeout(completionTimeout); adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(0); adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel()); adapter.setOutputChannel(mqttInputChannel());
return adapter; return adapter;
} }
......
...@@ -409,53 +409,56 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -409,53 +409,56 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
String messageTraceId = JSON.toJSONString(messageObj); String messageTraceId = JSON.toJSONString(messageObj);
mqttSendGateway.sendToMqtt("influxdb/" + topic.substring(0, endIndex), messageTraceId); mqttSendGateway.sendToMqtt("influxdb/" + topic.substring(0, endIndex), messageTraceId);
} }
EquipmentSpecificVo vo = eqIotCodeList.get(0);
topicEntity.setType(vo.getType());
topicEntity.setCode(vo.getCode());
JSONObject json = JSONObject.parseObject(message); //
Iterator it = json.entrySet().iterator(); //
List<IotDataVO> iotDatalist = new ArrayList<>(); // EquipmentSpecificVo vo = eqIotCodeList.get(0);
String traceId = ""; // topicEntity.setType(vo.getType());
while (it.hasNext()) { // topicEntity.setCode(vo.getCode());
IotDataVO iotDataVO = new IotDataVO(); //
Map.Entry<String, Object> entry = (Map.Entry<String, Object>) it.next(); // JSONObject json = JSONObject.parseObject(message);
String key = entry.getKey(); // Iterator it = json.entrySet().iterator();
Object value = entry.getValue(); // List<IotDataVO> iotDatalist = new ArrayList<>();
iotDataVO.setKey(key); // String traceId = "";
iotDataVO.setValue(value); // while (it.hasNext()) {
if ("traceId".equalsIgnoreCase(key)) { // IotDataVO iotDataVO = new IotDataVO();
traceId = value.toString(); // Map.Entry<String, Object> entry = (Map.Entry<String, Object>) it.next();
continue; // String key = entry.getKey();
} // Object value = entry.getValue();
iotDatalist.add(iotDataVO); // iotDataVO.setKey(key);
} // iotDataVO.setValue(value);
if (ObjectUtils.isEmpty(iotDatalist)) { // if ("traceId".equalsIgnoreCase(key)) {
return; // traceId = value.toString();
} // continue;
log.info(String.format("收到mqtt消息:%s", message)); // }
// iotDatalist.add(iotDataVO);
// 发送emq消息转kafka // }
JSONObject jsonObject = new JSONObject(); // if (ObjectUtils.isEmpty(iotDatalist)) {
jsonObject.put("topic", topic); // return;
jsonObject.put("data", message); // }
// log.info(String.format("收到mqtt消息:%s", message));
try { //
emqKeeper.getMqttClient().publish("emq.iot.created", jsonObject.toString().getBytes(), 1, false); // // 发送emq消息转kafka
} catch (MqttException e) { // JSONObject jsonObject = new JSONObject();
log.info(String.format("发送eqm转kafka消息失败:%s", e.getMessage())); // jsonObject.put("topic", topic);
} // jsonObject.put("data", message);
//
if (!StringUtils.isEmpty(traceId)) { // try {
String finalTraceId = traceId; // emqKeeper.getMqttClient().publish("emq.iot.created", jsonObject.toString().getBytes(), 1, false);
List<IotDataVO> collect = iotDatalist.stream().map(x -> { // } catch (MqttException e) {
x.setTraceId(finalTraceId); // log.info(String.format("发送eqm转kafka消息失败:%s", e.getMessage()));
return x; // }
}).collect(Collectors.toList()); //
realTimeDateProcessing(topicEntity, collect, vo); // if (!StringUtils.isEmpty(traceId)) {
} else { // String finalTraceId = traceId;
realTimeDateProcessing(topicEntity, iotDatalist, vo); // List<IotDataVO> collect = iotDatalist.stream().map(x -> {
} // x.setTraceId(finalTraceId);
// return x;
// }).collect(Collectors.toList());
// realTimeDateProcessing(topicEntity, collect, vo);
// } else {
// realTimeDateProcessing(topicEntity, iotDatalist, vo);
// }
} }
@Override @Override
...@@ -1929,16 +1932,16 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -1929,16 +1932,16 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
// equipmentSpecificAlarmLog.setStationName(stationName); // equipmentSpecificAlarmLog.setStationName(stationName);
boolean bool = equipmentSpecificAlarmLogService.save(equipmentSpecificAlarmLog); boolean bool = equipmentSpecificAlarmLogService.save(equipmentSpecificAlarmLog);
// 同步告警消息给平台 // 同步告警消息给平台
if (amosSwitch && bool) { // if (amosSwitch && bool) {
EquipmentSpecificAlarmLog alarmLog = equipmentSpecificAlarmLogService // EquipmentSpecificAlarmLog alarmLog = equipmentSpecificAlarmLogService
.getById(equipmentSpecificAlarmLog.getId()); // .getById(equipmentSpecificAlarmLog.getId());
new Thread(new Runnable() { // new Thread(new Runnable() {
@Override // @Override
public void run() { // public void run() {
syncSystemctlMsg(alarmLog); // syncSystemctlMsg(alarmLog);
} // }
}).start(); // }).start();
} // }
return equipmentSpecificAlarmLog; return equipmentSpecificAlarmLog;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment