Commit dd88276d authored by caotao's avatar caotao

新增数据透传开关

parent fe5471c1
...@@ -82,7 +82,8 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -82,7 +82,8 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
private static Map<String, TemperatureAlarmDto> temperatureMap = new HashMap<>(); private static Map<String, TemperatureAlarmDto> temperatureMap = new HashMap<>();
static IEquipmentSpecificIndexService equipmentSpecificIndexService; static IEquipmentSpecificIndexService equipmentSpecificIndexService;
@Value("${iot.async.flag}")
private boolean iotAsyncExecutorFlag;
@Autowired @Autowired
public void setEquipmentSpecificIndexService(IEquipmentSpecificIndexService equipmentSpecificIndexService) { public void setEquipmentSpecificIndexService(IEquipmentSpecificIndexService equipmentSpecificIndexService) {
MqttReceiveServiceImpl.equipmentSpecificIndexService = equipmentSpecificIndexService; MqttReceiveServiceImpl.equipmentSpecificIndexService = equipmentSpecificIndexService;
...@@ -2548,29 +2549,32 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -2548,29 +2549,32 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@PostConstruct @PostConstruct
public void iotAsyncExecutor() { public void iotAsyncExecutor() {
ThreadPoolTaskExecutor workExecutor = new ThreadPoolTaskExecutor(); if(iotAsyncExecutorFlag) {
// 设置核心线程数 System.out.println("-----------------iotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlag");
int length = Runtime.getRuntime().availableProcessors(); ThreadPoolTaskExecutor workExecutor = new ThreadPoolTaskExecutor();
int size = Math.max(length, 80); // 设置核心线程数
workExecutor.setCorePoolSize(size * 2); int length = Runtime.getRuntime().availableProcessors();
log.info("装备服务初始化,系统线程数:{},运行线程数:{}", length, size); int size = Math.max(length, 80);
// 设置最大线程数 workExecutor.setCorePoolSize(size * 2);
workExecutor.setMaxPoolSize(workExecutor.getCorePoolSize()); log.info("装备服务初始化,系统线程数:{},运行线程数:{}", length, size);
//配置队列大小 // 设置最大线程数
workExecutor.setQueueCapacity(Integer.MAX_VALUE); workExecutor.setMaxPoolSize(workExecutor.getCorePoolSize());
// 设置线程活跃时间(秒) //配置队列大小
workExecutor.setKeepAliveSeconds(60); workExecutor.setQueueCapacity(Integer.MAX_VALUE);
// 设置默认线程名称 // 设置线程活跃时间(秒)
workExecutor.setThreadNamePrefix("装备服务-Iot透传消息消费线程池" + "-"); workExecutor.setKeepAliveSeconds(60);
// 等待所有任务结束后再关闭线程池 // 设置默认线程名称
//当调度器shutdown被调用时,等待当前被调度的任务完成 workExecutor.setThreadNamePrefix("装备服务-Iot透传消息消费线程池" + "-");
workExecutor.setWaitForTasksToCompleteOnShutdown(true); // 等待所有任务结束后再关闭线程池
//执行初始化 //当调度器shutdown被调用时,等待当前被调度的任务完成
workExecutor.initialize(); workExecutor.setWaitForTasksToCompleteOnShutdown(true);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务 //执行初始化
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 workExecutor.initialize();
workExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // rejection-policy:当pool已经达到max size的时候,如何处理新任务
this.dataExecutor = workExecutor; // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
workExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
this.dataExecutor = workExecutor;
}
} }
} }
...@@ -149,4 +149,5 @@ spring.influx.database=iot_platform ...@@ -149,4 +149,5 @@ spring.influx.database=iot_platform
spring.influx.retention_policy=default spring.influx.retention_policy=default
spring.influx.retention_policy_time=30d spring.influx.retention_policy_time=30d
spring.influx.actions=10000 spring.influx.actions=10000
spring.influx.bufferLimit=20000 spring.influx.bufferLimit=20000
\ No newline at end of file iot.async.flag = false
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment