Commit c3788de8 authored by KeYong's avatar KeYong

提交初始化消息发送代码

parent a730ef3d
package com.yeejoin.equipmanage.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.sun.org.apache.xpath.internal.operations.Bool;
import com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex;
import com.yeejoin.equipmanage.common.enums.ConfigPageTopicEnum;
import com.yeejoin.equipmanage.service.IEquipmentSpecificIndexSerivce;
import com.yeejoin.equipmanage.service.IFireFightingSystemService;
import javafx.concurrent.Task;
import lombok.extern.slf4j.Slf4j;
import org.apache.sis.util.Static;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONString;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener;
import java.util.List;
import java.util.Map;
import java.sql.Time;
import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author DELL
......@@ -25,22 +38,50 @@ public class IntegratePageDataListener extends EmqxListener {
@Autowired
IEquipmentSpecificIndexSerivce equipmentSpecificIndexSerivce;
@Autowired
EmqKeeper emqKeeper;
@Override
private boolean initialized = Boolean.TRUE;
@Override
public void processMessage(String topic, MqttMessage message) throws Exception {
if(log.isInfoEnabled()){
log.info("收到消息主题:{},消息内容:{}",topic, message.toString());
}
Map msg = JSON.parseObject(message.toString());
Timer timer = new Timer();
if(msg.containsKey("request")){
String split = "/";
if(topic.contains(split)){
String code = topic.substring(topic.indexOf(split) + 1);
fireFightingSystemService.integrationPageSysData(code, false);
}
} else if ("EQUIP_INDEX_ON_SYSTEM_DETAIL".equalsIgnoreCase(topic)) {
log.warn(msg.toString());
// List<EquipmentSpecificIndex> list = equipmentSpecificIndexSerivce.listByIds();
} else if (ConfigPageTopicEnum.SYSTEMDETAIL.getTopic().equalsIgnoreCase(topic) && initialized) {
if (!ObjectUtils.isEmpty(msg.get("codes"))) {
List<String> list = JSON.parseArray(String.valueOf(msg.get("codes")), String.class);
list.parallelStream().forEach(x -> {
EquipmentSpecificIndex indexEntity = equipmentSpecificIndexSerivce.getById(x);
Map<String, String> map = new HashMap<>();
map.put("code", String.valueOf(indexEntity.getId()));
map.put("value", indexEntity.getValue());
map.put("status", indexEntity.getValue());
try {
emqKeeper.getMqttClient().publish(topic, JSON.toJSONString(map).getBytes(), 1, false);
} catch (MqttException e) {
e.printStackTrace();
}
});
}
initialized = Boolean.FALSE;
}
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
initialized = Boolean.TRUE;
}
};
timer.schedule(timerTask, 3000);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment