Commit 98a7ca1e authored by tangwei's avatar tangwei

修改bug

parent 60dd03cb
...@@ -156,7 +156,8 @@ public class EquipmentIotMqttReceiveConfig { ...@@ -156,7 +156,8 @@ public class EquipmentIotMqttReceiveConfig {
return message -> { return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
//异步处理
EquipmentIotMqttReceiveConfig controllerProxy = SpringUtils.getBean(EquipmentIotMqttReceiveConfig.class);
String msg = message.getPayload().toString(); String msg = message.getPayload().toString();
int endIndex = topic.lastIndexOf("/"); int endIndex = topic.lastIndexOf("/");
if (endIndex > 0 && StringUtil.isNotEmpty(message)) { if (endIndex > 0 && StringUtil.isNotEmpty(message)) {
...@@ -164,14 +165,13 @@ public class EquipmentIotMqttReceiveConfig { ...@@ -164,14 +165,13 @@ public class EquipmentIotMqttReceiveConfig {
if (dataType.equals("property") && StringUtil.isNotEmpty(msg)) { if (dataType.equals("property") && StringUtil.isNotEmpty(msg)) {
//mqttReceiveService.handlerMqttIncrementMessage(topic, msg); //mqttReceiveService.handlerMqttIncrementMessage(topic, msg);
//异步处理
EquipmentIotMqttReceiveConfig controllerProxy = SpringUtils.getBean(EquipmentIotMqttReceiveConfig.class);
controllerProxy.namePrefixhandlerMqttIncrementMessage(topic, msg); controllerProxy.namePrefixhandlerMqttIncrementMessage(topic, msg);
} else if (dataType.equals("event") && StringUtil.isNotEmpty(msg)) { } else if (dataType.equals("event") && StringUtil.isNotEmpty(msg)) {
mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg); // mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg);
controllerProxy.namePrefixhandlerMqttIncrementMessageEven(topic, msg);
} }
} }
}; };
...@@ -181,7 +181,10 @@ public class EquipmentIotMqttReceiveConfig { ...@@ -181,7 +181,10 @@ public class EquipmentIotMqttReceiveConfig {
this.mqttReceiveService = mqttReceiveService; this.mqttReceiveService = mqttReceiveService;
} }
@Async("equipAsyncExecutor")
public void namePrefixhandlerMqttIncrementMessageEven(String topic, String msg) {
mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg);
}
@Async("equipAsyncExecutor") @Async("equipAsyncExecutor")
public void namePrefixhandlerMqttIncrementMessage(String topic, String msg) { public void namePrefixhandlerMqttIncrementMessage(String topic, String msg) {
...@@ -192,18 +195,10 @@ public class EquipmentIotMqttReceiveConfig { ...@@ -192,18 +195,10 @@ public class EquipmentIotMqttReceiveConfig {
RLock lock = redisson.getLock(topic); RLock lock = redisson.getLock(topic);
try { try {
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。 //拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
boolean flag= lock.tryLock(2, 20, TimeUnit.SECONDS); boolean flag= lock.tryLock(10, 20, TimeUnit.SECONDS);
System.out.println("msg==================================="+flag);
for (int i = 0; i <= 1000000; i++)
{
System.out.println("msg==================================="+i);
}
//为了防止重复
mqttReceiveService.handlerMqttIncrementMessage(topic, msg); mqttReceiveService.handlerMqttIncrementMessage(topic, msg);
System.out.println("msg==================================="+msg);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment