Commit f034e394 authored by KeYong's avatar KeYong

更新

parent e86ab82b
......@@ -6,18 +6,66 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper;
import javax.annotation.PostConstruct;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@Component
public class WebMqttComponent {
@Autowired
private EmqKeeper emqKeeper;
public void publish(String topic, String jsonStr) {
BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();
ExecutorService service = null;
@PostConstruct
public void init() {
service = Executors.newSingleThreadExecutor();
service.execute(new Runnable() {
@Override
public void run() {
while (true) {
try {
this.emqKeeper.getMqttClient().publish(topic, jsonStr.getBytes(), 2, false);
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
Message msg = queue.take();
emqKeeper.getMqttClient().publish(msg.getTopic(), msg.getJsonStr().getBytes(), 0, false);
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
public void publish(String topic, String jsonStr) {
Message msg = new Message(topic, jsonStr);
queue.add(msg);
}
class Message {
private String topic;
private String jsonStr;
public Message(String topic, String jsonStr) {
this.topic = topic;
this.jsonStr = jsonStr;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getJsonStr() {
return jsonStr;
}
public void setJsonStr(String jsonStr) {
this.jsonStr = jsonStr;
}
}
}
......@@ -31,10 +31,10 @@ import org.springframework.messaging.MessageHandler;
@IntegrationComponentScan
public class WebMqttSubscribe {
@Value("${emqx.user-name}")
@Value("${emqx.client-user-name}")
private String userName;
@Value("${emqx.password}")
@Value("${emqx.client-password}")
private String password;
@Value("${emqx.broker}")
......
......@@ -159,21 +159,21 @@ public class ContingencyInstanceImpl implements IContingencyInstance {
ContingencyPlanInstance contingencyPlanInstance = contingencyInstance.updateExtendColumn(planInstance);
ContingencyPlanInstance instance = this.repository.save(contingencyPlanInstance);
// 异步数据同步之消息发送
contingencyPlanInstanceDataSync(instance);
// contingencyPlanInstanceDataSync(instance);
return instance;
}
private void contingencyPlanInstanceDataSync(ContingencyPlanInstance instance) {
if (dataSyncSwitch) {
try {
dataSyncService.asyncInvoke(() -> {
dataSyncService.syncCreatedContingencyPlanInstance(instance);
});
} catch (Exception e) {
log.info("数据同步之消息发送. [method='{}']", "createInstanceRecord==>syncCreatedContingencyPlanInstance", e);
}
}
}
// private void contingencyPlanInstanceDataSync(ContingencyPlanInstance instance) {
// if (dataSyncSwitch) {
// try {
// dataSyncService.asyncInvoke(() -> {
// dataSyncService.syncCreatedContingencyPlanInstance(instance);
// });
// } catch (Exception e) {
// log.info("数据同步之消息发送. [method='{}']", "createInstanceRecord==>syncCreatedContingencyPlanInstance", e);
// }
// }
// }
/**
......@@ -224,8 +224,8 @@ public class ContingencyInstanceImpl implements IContingencyInstance {
batchNo
);
// 异步数据同步之消息发送
contingencyOriginalDataDataSync(batchNo, update);
// 异步数据同步之消息发送 暂时屏蔽 原数字化1.0 同步数据
//contingencyOriginalDataDataSync(batchNo, update);
//使用原始数据触发规则
if ("CONFIRM".equals(buttonState)
......@@ -254,24 +254,7 @@ public class ContingencyInstanceImpl implements IContingencyInstance {
log.info("stepCode:" + stepCode);
equipment = impAndFireEquipMapper.queryImpEqumtByFireEquipmt(Long.parseLong(contingencyRo.getFireEquipmentId()));
if (equipment != null) {
// 获取重点设备胚胎指标
// 获取遥信指标
List<Map> points = fireEquipPointMapper.getPointsByEquipmentIdAndType(equipment.getId(), "SWITCH");
HashMap<String, Integer> telesignallingMap = new HashMap<>();
for (Map map : points) {
telesignallingMap.put(map.get("code") + "", (ObjectUtils.isEmpty(map.get("value")) || "false".equals(map.get("value").toString())) ? 0 : 1);
}
contingencyRo.setTelesignallingMap(telesignallingMap);
//获取遥测指标
points = fireEquipPointMapper.getPointsByEquipmentIdAndType(equipment.getId(), "ANALOGUE");
HashMap<String, Double> telemetryMap = new HashMap<>();
for (Map map : points) {
telemetryMap.put(map.get("code") + "", Double.valueOf(ObjectUtils.isEmpty(map.get("value")) ? "0" : map.get("value").toString()));
}
contingencyRo.setTelemetryMap(telemetryMap);
contingencyRo.setEquipmentCode(equipment.getCode());
Map<String, Object> params = contingencyRo.getParams();
params.put("appKey", RequestContext.getAppKey());
params.put("product", RequestContext.getProduct());
......@@ -290,7 +273,7 @@ public class ContingencyInstanceImpl implements IContingencyInstance {
}
return Optional.ofNullable(equipment);
}
// 异步数据同步之消息发送 暂时屏蔽 原数字化1.0 同步数据
private void contingencyOriginalDataDataSync(String batchNo, int update) {
if (update > 0 && dataSyncSwitch) {
try {
......@@ -383,7 +366,7 @@ public class ContingencyInstanceImpl implements IContingencyInstance {
webMqttComponent.publish(topic, JSON.toJSONString(map));
}
// 异步数据同步之消息发送
contingencyPlanInstanceDataSync(instance);
//contingencyPlanInstanceDataSync(instance);
if ("CONFIRM".equals(buttonState) && ("FIRE_CANCEL".equals(code) || "END_EMERGENCY".equals(code))) {
redisTemplate.delete(RiskSourceServiceImpl.cacheKeyForCanBeRunning());
// 应急处置中断,初始化planStep,json数据
......@@ -425,7 +408,7 @@ public class ContingencyInstanceImpl implements IContingencyInstance {
contingencyPlanInstance.setContent(operateJson);
ContingencyPlanInstance instance = repository.save(contingencyPlanInstance);
// 异步数据同步之消息发送
contingencyPlanInstanceDataSync(instance);
// contingencyPlanInstanceDataSync(instance);
}
}
......@@ -434,7 +417,7 @@ public class ContingencyInstanceImpl implements IContingencyInstance {
public void updateStep(String step, String batchNo) {
int update = iContingencyOriginalDataDao.updateByButtonStep(step, batchNo);
// 异步数据同步之消息发送
contingencyOriginalDataDataSync(batchNo, update);
// contingencyOriginalDataDataSync(batchNo, update);
}
@Override
......@@ -454,20 +437,13 @@ public class ContingencyInstanceImpl implements IContingencyInstance {
map.put("startUserName", startUserName);
}
map.put("buttonJson", buttonJson);
map.put("appKey", RequestContext.getAppKey());
map.put("token", RequestContext.getToken());
map.put("product", RequestContext.getProduct());
if (StringUtils.isBlank(token) || StringUtils.isBlank(product)) {
Toke serverToken = remoteSecurityService.getServerToken();
map.put("token", serverToken.getToke());
map.put("product", serverToken.getProduct());
map.put("appKey", serverToken.getAppKey());
} else {
map.put("token", token);
map.put("product", product);
map.put("appKey", appKey);
}
fireQueue.addLast(map);
//应急指挥给总部推送消息
sendPlanAlarm(batchNo, buttonCode);
// sendPlanAlarm(batchNo, buttonCode);
return Optional.empty();
}
......@@ -589,7 +565,6 @@ public class ContingencyInstanceImpl implements IContingencyInstance {
result.put("msgContext", Collections.EMPTY_MAP);
result.put("msgType", "refreshRecord");
webMqttComponent.publish(topic, JSON.toJSONString(result));
// webMqttComponent.publish(topic, "{\"msgType\":\"refreshRecord\"}");
fireQueue.removeFirst();
}
}
......
......@@ -34,7 +34,7 @@
<dependency>
<groupId>com.yeejoin</groupId>
<artifactId>amos-component-rule</artifactId>
<version>1.7.8-SNAPSHOT</version>
<version>1.8.5-SNAPSHOT</version>
</dependency>
<dependency>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment