Commit 71c46ac9 authored by 刘林's avatar 刘林

Merge remote-tracking branch 'origin/develop_dl' into develop_dl

parents 85388bb5 dd88276d
...@@ -78,7 +78,8 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -78,7 +78,8 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
private static Map<String, TemperatureAlarmDto> temperatureMap = new HashMap<>(); private static Map<String, TemperatureAlarmDto> temperatureMap = new HashMap<>();
static IEquipmentSpecificIndexService equipmentSpecificIndexService; static IEquipmentSpecificIndexService equipmentSpecificIndexService;
@Value("${iot.async.flag}")
private boolean iotAsyncExecutorFlag;
@Autowired @Autowired
public void setEquipmentSpecificIndexService(IEquipmentSpecificIndexService equipmentSpecificIndexService) { public void setEquipmentSpecificIndexService(IEquipmentSpecificIndexService equipmentSpecificIndexService) {
MqttReceiveServiceImpl.equipmentSpecificIndexService = equipmentSpecificIndexService; MqttReceiveServiceImpl.equipmentSpecificIndexService = equipmentSpecificIndexService;
...@@ -2547,29 +2548,32 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -2547,29 +2548,32 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@PostConstruct @PostConstruct
public void iotAsyncExecutor() { public void iotAsyncExecutor() {
ThreadPoolTaskExecutor workExecutor = new ThreadPoolTaskExecutor(); if(iotAsyncExecutorFlag) {
// 设置核心线程数 System.out.println("-----------------iotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlag");
int length = Runtime.getRuntime().availableProcessors(); ThreadPoolTaskExecutor workExecutor = new ThreadPoolTaskExecutor();
int size = Math.max(length, 80); // 设置核心线程数
workExecutor.setCorePoolSize(size * 2); int length = Runtime.getRuntime().availableProcessors();
log.info("装备服务初始化,系统线程数:{},运行线程数:{}", length, size); int size = Math.max(length, 80);
// 设置最大线程数 workExecutor.setCorePoolSize(size * 2);
workExecutor.setMaxPoolSize(workExecutor.getCorePoolSize()); log.info("装备服务初始化,系统线程数:{},运行线程数:{}", length, size);
//配置队列大小 // 设置最大线程数
workExecutor.setQueueCapacity(Integer.MAX_VALUE); workExecutor.setMaxPoolSize(workExecutor.getCorePoolSize());
// 设置线程活跃时间(秒) //配置队列大小
workExecutor.setKeepAliveSeconds(60); workExecutor.setQueueCapacity(Integer.MAX_VALUE);
// 设置默认线程名称 // 设置线程活跃时间(秒)
workExecutor.setThreadNamePrefix("装备服务-Iot透传消息消费线程池" + "-"); workExecutor.setKeepAliveSeconds(60);
// 等待所有任务结束后再关闭线程池 // 设置默认线程名称
//当调度器shutdown被调用时,等待当前被调度的任务完成 workExecutor.setThreadNamePrefix("装备服务-Iot透传消息消费线程池" + "-");
workExecutor.setWaitForTasksToCompleteOnShutdown(true); // 等待所有任务结束后再关闭线程池
//执行初始化 //当调度器shutdown被调用时,等待当前被调度的任务完成
workExecutor.initialize(); workExecutor.setWaitForTasksToCompleteOnShutdown(true);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务 //执行初始化
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 workExecutor.initialize();
workExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // rejection-policy:当pool已经达到max size的时候,如何处理新任务
this.dataExecutor = workExecutor; // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
workExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
this.dataExecutor = workExecutor;
}
} }
} }
...@@ -79,9 +79,7 @@ public class ThreadCarMileageTreatment extends Thread { ...@@ -79,9 +79,7 @@ public class ThreadCarMileageTreatment extends Thread {
&& Obj.getDoubleValue("FireCar_Latitude") != 0) { && Obj.getDoubleValue("FireCar_Latitude") != 0) {
filterList.add(list.get(j)); filterList.add(list.get(j));
// 获取第一个不为空的坐标 // 获取第一个不为空的坐标
if (lastObj == null) { lastObj = Obj;
lastObj = Obj;
}
} }
} }
Log.info("----------------------------------------lastobj----------------------"+lastObj.toJSONString()); Log.info("----------------------------------------lastobj----------------------"+lastObj.toJSONString());
...@@ -116,7 +114,7 @@ public class ThreadCarMileageTreatment extends Thread { ...@@ -116,7 +114,7 @@ public class ThreadCarMileageTreatment extends Thread {
} }
last.setTravel(new BigDecimal(travel / 1000).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue()); last.setTravel(new BigDecimal(travel / 1000).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue());
Log.info("----------------------------------------last----------------------"+lastObj.toJSONString()); Log.info("----------------------------------------last----------------------"+lastObj.toJSONString());
wlCarMileageServiceImpl.updateById(last); // wlCarMileageServiceImpl.updateById(last);
} }
} }
} }
......
...@@ -149,4 +149,5 @@ spring.influx.database=iot_platform ...@@ -149,4 +149,5 @@ spring.influx.database=iot_platform
spring.influx.retention_policy=default spring.influx.retention_policy=default
spring.influx.retention_policy_time=30d spring.influx.retention_policy_time=30d
spring.influx.actions=10000 spring.influx.actions=10000
spring.influx.bufferLimit=20000 spring.influx.bufferLimit=20000
\ No newline at end of file iot.async.flag = false
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment