Commit 749ff636 authored by tangwei's avatar tangwei

修改bug

parent fa8df9b9
package com.yeejoin.equipmanage.config;
import com.yeejoin.equipmanage.common.entity.vo.EquipmentSpecificVo;
import com.yeejoin.equipmanage.common.enums.AnalysisReportEnum;
import com.yeejoin.equipmanage.common.enums.ConfigPageTopicEnum;
import com.yeejoin.equipmanage.common.enums.TopicEnum;
import com.yeejoin.equipmanage.common.utils.DateUtils;
import com.yeejoin.equipmanage.common.utils.SpringUtils;
import com.yeejoin.equipmanage.common.utils.StringUtil;
import com.yeejoin.equipmanage.controller.EquipmentDetailController;
import com.yeejoin.equipmanage.mapper.EquipmentSpecificMapper;
import com.yeejoin.equipmanage.service.IEquipmentSpecificSerivce;
import com.yeejoin.equipmanage.service.ISyncDataService;
import com.yeejoin.equipmanage.service.MqttEventReceiveService;
import com.yeejoin.equipmanage.service.MqttReceiveService;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
......@@ -25,9 +30,12 @@ import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannel
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.annotation.Async;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author keyong
......@@ -60,6 +68,12 @@ public class EquipmentIotMqttReceiveConfig {
@Value("${spring.mqtt.completionTimeout}")
private int completionTimeout;
@Autowired(required = false)
org.redisson.api.RedissonClient redisson;
@Value("${spring.redis.mode}")
private String cluster;
@Autowired
EquipmentSpecificMapper equipmentSpecificMapper;
......@@ -140,15 +154,24 @@ public class EquipmentIotMqttReceiveConfig {
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String msg = message.getPayload().toString();
int endIndex = topic.lastIndexOf("/");
if (endIndex > 0 && StringUtil.isNotEmpty(message)) {
String dataType = topic.substring(endIndex + 1);
if (dataType.equals("property") && StringUtil.isNotEmpty(msg)) {
mqttReceiveService.handlerMqttIncrementMessage(topic, msg);
//mqttReceiveService.handlerMqttIncrementMessage(topic, msg);
//异步处理
EquipmentIotMqttReceiveConfig controllerProxy = SpringUtils.getBean(EquipmentIotMqttReceiveConfig.class);
controllerProxy.handlerMqttIncrementMessage(topic, msg);
} else if (dataType.equals("event") && StringUtil.isNotEmpty(msg)) {
mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg);
}
}
};
......@@ -157,4 +180,37 @@ public class EquipmentIotMqttReceiveConfig {
public void setMqttReceiveService(MqttReceiveService mqttReceiveService) {
this.mqttReceiveService = mqttReceiveService;
}
@Async("equipAsyncExecutor")
public void handlerMqttIncrementMessage(String topic, String msg) {
if(cluster.equals("cluster")){
//不同设备加锁,防止消息顺序错乱,此处加锁高并发下小概率会出现顺序错乱,
//为了减少加锁前耗时,引起后续强锁失败,直接用topic(按每个设备锁)减少顺序错乱概率。
RLock lock = redisson.getLock(topic);
try {
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
mqttReceiveService.handlerMqttIncrementMessage(topic, msg);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); //释放锁
}
}else {
//捕获处理异常
try {
mqttReceiveService.handlerMqttIncrementMessage(topic, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
......@@ -49,8 +49,8 @@ public class AnalysisReportSchedulerJob {
//为了便于区分key,增加后缀_redisson
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if (!redisUtils.hasKey(jobName + "_dayReport_key")) {
redisUtils.set(jobName + "_dayReport_key", "1");//增加标识
......@@ -100,8 +100,8 @@ public class AnalysisReportSchedulerJob {
//为了便于区分key,增加后缀_redisson
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if (!redisUtils.hasKey(jobName + "_weekReport_key")) {
redisUtils.set(jobName + "_weekReport_key", "1");//增加标识
......@@ -137,8 +137,8 @@ public class AnalysisReportSchedulerJob {
//为了便于区分key,增加后缀_redisson
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if (!redisUtils.hasKey(jobName + "_monthReport_key")) {
redisUtils.set(jobName + "_monthReport_key", "1");//增加标识
......
......@@ -241,8 +241,8 @@ public class View3dController extends AbstractBaseController {
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
System.out.println("获取锁成功============");
//为了防止重复
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
......
......@@ -68,8 +68,8 @@ public class QuoteCountFlushTiming {
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
System.out.println("获取锁成功============");
//为了防止重复
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
......@@ -117,8 +117,8 @@ public class QuoteCountFlushTiming {
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
System.out.println("获取锁成功============");
//为了防止重复
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
......
......@@ -315,8 +315,8 @@ public class JobService implements IJobService {
//为了便于区分key,增加后缀_redisson
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
redisUtils.set(jobName + "_redisson_key", "1");//增加标识
......@@ -450,8 +450,8 @@ public class JobService implements IJobService {
//为了便于区分key,增加后缀_redisson
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
redisUtils.set(jobName + "_redisson_key", "1");//增加标识
......
......@@ -47,7 +47,8 @@ public class PlanTaskJob {
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
System.out.println("获取锁成功============");
//为了防止重复
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
......@@ -80,8 +81,8 @@ public class PlanTaskJob {
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
System.out.println("获取锁成功============");
//为了防止重复
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
......
......@@ -431,8 +431,8 @@ public class PlanTaskController extends AbstractBaseController {
//为了便于区分key,增加后缀_redisson
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if (!redisUtils.hasKey(jobName + "_pushCarData_key")) {
redisUtils.set(jobName + "_pushCarData_key", "1");//增加标识
......@@ -469,8 +469,8 @@ public class PlanTaskController extends AbstractBaseController {
//为了便于区分key,增加后缀_redisson
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if (!redisUtils.hasKey(jobName + "_taskMessage_key")) {
redisUtils.set(jobName + "_taskMessage_key", "1");//增加标识
......
......@@ -667,8 +667,8 @@ public class JobService implements IJobService {
//为了便于区分key,增加后缀_redisson
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if(!redisUtils.hasKey(jobName+"_redisson_key")) {
redisUtils.set(jobName + "_redisson_key", "1");//增加标识
......@@ -808,8 +808,8 @@ public class JobService implements IJobService {
if("cluster".equals(cluster)) {
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
System.out.println("获取锁成功============");
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
redisUtils.set(jobName + "_redisson_key", "1");//增加标识
......@@ -868,8 +868,8 @@ public class JobService implements IJobService {
if("cluster".equals(cluster)) {
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
redisUtils.set(jobName + "_redisson_key", "1");//增加标识
......
......@@ -47,8 +47,8 @@ public class LatentDanerScheduled {
//为了便于区分key,增加后缀_redisson
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if (!redisUtils.hasKey(jobName + "_updateDangerStateOfOvertime_key")) {
redisUtils.set(jobName + "_redisson_key", "1");//增加标识
......
......@@ -311,8 +311,8 @@ public class JobService implements IJobService {
//为了便于区分key,增加后缀_redisson
RLock lock = redisson.getLock(jobName + "_redisson");
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
redisUtils.set(jobName + "_redisson_key", "1");//增加标识
......@@ -444,8 +444,8 @@ public class JobService implements IJobService {
//为了便于区分key,增加后缀_redisson
RLock lock = redisson.getLock(jobName + "_redisson");
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
redisUtils.set(jobName + "_redisson_key", "1");//增加标识
......@@ -506,8 +506,8 @@ public class JobService implements IJobService {
//为了便于区分key,增加后缀_redisson
RLock lock = redisson.getLock(jobName + "_redisson");
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
//为了防止重复
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
redisUtils.set(jobName + "_redisson_key", "1");//增加标识
......
......@@ -46,8 +46,8 @@ public class PlanTaskJob {
RLock lock = redisson.getLock(jobName);
try {
//拿锁失败10停止尝试, 任务执行超时默认续30s 每个10秒续到30
lock.tryLock(10, TimeUnit.SECONDS);
//拿锁失败10停止尝试,20秒后直接释放锁 无论是返回 true 还是 false,都会继续执行之后的代码。
lock.tryLock(10, 20, TimeUnit.SECONDS);
System.out.println("获取锁成功============");
//为了防止重复
if (!redisUtils.hasKey(jobName + "_redisson_key")) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment