Commit e0cd0a5a authored by KeYong's avatar KeYong

Merge remote-tracking branch 'origin/dev_upgrade' into dev_upgrade

parents 2de2f32e 2eac6e81
...@@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils; ...@@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
...@@ -41,6 +42,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager ...@@ -41,6 +42,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener;
import org.typroject.tyboot.core.foundation.utils.Bean; import org.typroject.tyboot.core.foundation.utils.Bean;
import org.typroject.tyboot.core.foundation.utils.ValidationUtil; import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
...@@ -799,8 +801,11 @@ public class ContingencyPlanServiceImpl implements IContingencyPlanService { ...@@ -799,8 +801,11 @@ public class ContingencyPlanServiceImpl implements IContingencyPlanService {
@Override @Override
public void subscribeTopic() { public void subscribeTopic() {
try { try {
emqKeeper.getMqttClient().subscribe(DELETE_SYNC_PLAN_DOC, (s, mqttMessage) -> { emqKeeper.subscript(DELETE_SYNC_PLAN_DOC, 1, new EmqxListener() {
byte[] payload = mqttMessage.getPayload(); @Override
public void processMessage(String topic, MqttMessage message) {
byte[] payload = message.getPayload();
try { try {
List<Long> ids = (List<Long>) ClazzUtils.deserializableObject(payload); List<Long> ids = (List<Long>) ClazzUtils.deserializableObject(payload);
if (!ValidationUtil.isEmpty(ids)) { if (!ValidationUtil.isEmpty(ids)) {
...@@ -809,13 +814,16 @@ public class ContingencyPlanServiceImpl implements IContingencyPlanService { ...@@ -809,13 +814,16 @@ public class ContingencyPlanServiceImpl implements IContingencyPlanService {
} catch (Exception e) { } catch (Exception e) {
logger.error("预案文档删除同步出错", e); logger.error("预案文档删除同步出错", e);
} }
}
}); });
} catch (MqttException e) { } catch (Exception e) {
logger.fatal("订阅文档删除同步消息失败,资源删除或取消无法同步", e); logger.fatal("订阅文档删除同步消息失败,资源删除或取消无法同步", e);
} }
try { try {
emqKeeper.getMqttClient().subscribe(DELETE_SYNC_PLAN_RULE, (s, mqttMessage) -> { emqKeeper.subscript(DELETE_SYNC_PLAN_RULE, 1, new EmqxListener() {
byte[] payload = mqttMessage.getPayload(); @Override
public void processMessage(String topic, MqttMessage message) {
byte[] payload = message.getPayload();
try { try {
String[] ids = new String(payload).split(","); String[] ids = new String(payload).split(",");
if (!ValidationUtil.isEmpty(ids)) { if (!ValidationUtil.isEmpty(ids)) {
...@@ -824,13 +832,16 @@ public class ContingencyPlanServiceImpl implements IContingencyPlanService { ...@@ -824,13 +832,16 @@ public class ContingencyPlanServiceImpl implements IContingencyPlanService {
} catch (Exception e) { } catch (Exception e) {
logger.error("预案规则删除同步出错", e); logger.error("预案规则删除同步出错", e);
} }
}
}); });
} catch (MqttException e) { } catch (Exception e) {
logger.fatal("订阅规则删除同步消息失败,资源删除或取消无法同步", e); logger.fatal("订阅规则删除同步消息失败,资源删除或取消无法同步", e);
} }
try { try {
emqKeeper.getMqttClient().subscribe(VIEW_3D_OPEN_STATUS, (s, mqttMessage) -> { emqKeeper.subscript(VIEW_3D_OPEN_STATUS, 1, new EmqxListener() {
Map msg = JSON.parseObject(mqttMessage.toString()); @Override
public void processMessage(String topic, MqttMessage message) {
Map msg = JSON.parseObject(message.toString());
if (msg.containsKey("status") && msg.containsKey("seq")) { if (msg.containsKey("status") && msg.containsKey("seq")) {
String seq = msg.get("seq").toString(); String seq = msg.get("seq").toString();
String status = msg.get("status").toString(); String status = msg.get("status").toString();
...@@ -841,8 +852,9 @@ public class ContingencyPlanServiceImpl implements IContingencyPlanService { ...@@ -841,8 +852,9 @@ public class ContingencyPlanServiceImpl implements IContingencyPlanService {
redisTemplate.opsForValue().set(key, status); redisTemplate.opsForValue().set(key, status);
} }
} }
}
}); });
} catch (MqttException e) { } catch (Exception e) {
logger.fatal("订阅规则删除同步消息失败,资源删除或取消无法同步", e); logger.fatal("订阅规则删除同步消息失败,资源删除或取消无法同步", e);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment