Commit 0ea7e3df authored by 刘林's avatar 刘林

fix(equip):江西数据中心添加多线程以及缓存预热处理消息

parent 47f280da
......@@ -43,6 +43,11 @@ public class RedisKey {
/** 企业用户注册前缀 */
public static final String FLC_USER_TEL = "flc_tel_";
/**
* 装备指标Key值
*/
public static final String EQUIP_INDEX_ADDRESS = "equip_index_address";
/** 驼峰转下划线(简单写法,效率低于 ) */
public static String humpToLine(String str) {
return str.replaceAll("[A-Z]", "_$0").toLowerCase();
......
......@@ -69,4 +69,13 @@ public class EquipmentIndexVO {
@ApiModelProperty(value = "指标枚举")
private String valueEnum;
@ApiModelProperty(value = "信号的索引键key,用于唯一索引信号")
private String indexAddress;
@ApiModelProperty(value = "测点类型,analog/state")
private String dataType;
@ApiModelProperty(value = "网关标识")
private String gatewayId;
}
package com.yeejoin.equipmanage.config;
import com.yeejoin.amos.boot.biz.common.utils.RedisKey;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.equipmanage.common.entity.vo.EquipmentIndexVO;
import com.yeejoin.equipmanage.mapper.EquipmentSpecificIndexMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author LiuLin
* @date 2023/6/15
* @apiNote
*/
@Component
@Slf4j
public class EquipmentIndexCacheRunner implements CommandLineRunner {
@Resource
private EquipmentSpecificIndexMapper equipmentSpecificIndexMapper;
@Resource
private RedisUtils redisUtils;
@Override
public void run(String... args) throws Exception {
log.info(">>服务启动执行,执行预加载数据等操作");
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS);
List<EquipmentIndexVO> equipSpecificIndexList = equipmentSpecificIndexMapper.getEquipSpecificIndexList(null);
Map<String, Object> equipmentIndexVOMap = equipSpecificIndexList.stream()
.filter(v -> v.getGatewayId() != null)
.collect(Collectors.toMap(vo -> vo.getIndexAddress() + "_" + vo.getGatewayId(), Function.identity()));
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS, equipmentIndexVOMap);
}
}
......@@ -76,10 +76,12 @@ public class EquipmentIotMqttReceiveConfig {
public void setEquipmentSpecificMapper(EquipmentSpecificMapper equipmentSpecificMapper) {
this.equipmentSpecificMapper = equipmentSpecificMapper;
}
@Autowired
public void setMqttEventReceiveService(MqttEventReceiveService mqttEventReceiveService) {
this.mqttEventReceiveService = mqttEventReceiveService;
}
@Autowired
public void setiSyncDataService(ISyncDataService iSyncDataService) {
this.iSyncDataService = iSyncDataService;
......@@ -156,10 +158,10 @@ public class EquipmentIotMqttReceiveConfig {
mqttReceiveService.handlerMqttIncrementMessage(topic, msg);
} else if (dataType.equals("event") && StringUtil.isNotEmpty(msg)) {
mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg);
}else if (dataType.equals("transmit") && StringUtil.isNotEmpty(msg)){
mqttReceiveService.handlerMqttRomaMessage(topic,msg);
}else if (dataType.equals("perspective") && StringUtil.isNotEmpty(msg)){
mqttReceiveService.handlerMqttIotMessage(topic,msg);
} else if (dataType.equals("transmit") && StringUtil.isNotEmpty(msg)) {
mqttReceiveService.handlerMqttRomaMessage(topic, msg);
} else if (dataType.equals("perspective") && StringUtil.isNotEmpty(msg)) {
mqttReceiveService.handlerIotMessage(topic, msg);
} else if (dataType.equals("trigger") && StringUtil.isNotEmpty(msg)) {
mqttReceiveService.handleDataToRiskModel(topic, msg);
}
......
......@@ -13,6 +13,7 @@ public interface MqttReceiveService {
/**
* 增量数据处理
*
* @param topic 主题
* @param message 消息内容
*/
......@@ -20,6 +21,7 @@ public interface MqttReceiveService {
/**
* 处理交换站消息数据
*
* @param topic 主题
* @param message 消息内容
*/
......@@ -27,14 +29,25 @@ public interface MqttReceiveService {
/**
* 处理Iot消息数据
*
* @param topic 主题
* @param message 消息内容
*/
void handlerMqttIotMessage(String topic, String message);
/**
* 中心级接收消息发送至消息服务
*
* @param topic
* @param message
*/
void handleDataToRiskModel(String topic, String message);
/**
* 处理Iot消息数据
*
* @param topic 主题
* @param message 消息内容
*/
void handlerIotMessage(String topic, String message);
}
......@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yeejoin.amos.boot.biz.common.utils.RedisKey;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
import com.yeejoin.amos.feign.systemctl.model.MessageModel;
......@@ -34,6 +35,7 @@ import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
......@@ -46,11 +48,14 @@ import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import org.typroject.tyboot.core.restful.exception.instance.BadRequest;
import org.typroject.tyboot.core.restful.utils.ResponseModel;
import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
......@@ -119,6 +124,8 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@Autowired
private InfluxDbConnection influxDbConnection;
private Executor dataExecutor = new ThreadPoolTaskExecutor();
/**
* 泡沫罐KEY
*/
......@@ -628,8 +635,62 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerIotMessage(String topic, String message) {
Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
dataExecutor.execute(new Runnable() {
@Override
public void run() {
log.info("接收到iot消息: {}", message);
JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId");
String deviceCode = jsonObject.getString("deviceCode");
String gatewayId = jsonObject.getString("gatewayId");
String value = jsonObject.getString("value");
String key = indexAddress + "_" + gatewayId;
try {
if (equipmentIndexVOMap.get(key) != null) {
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
tagsMap.put("key", key);
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
fieldsMap.put("traceId", traceId);
fieldsMap.put("address", indexAddress);
fieldsMap.put("value", value);
fieldsMap.put("valueLabel", valueLabel.equals("") ? value : valueLabel);
fieldsMap.put("gatewayId", gatewayId);
fieldsMap.put("dataType", dataType);
fieldsMap.put("equipmentId", equipmentSpeIndex.getEquipmentId());
fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
//保存influxDB库
influxDbConnection.insert("iot_data", tagsMap, fieldsMap);
}
} catch (Exception e) {
log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
} finally {
}
}
});
}
/**
* 废弃代码,暂时不用,后期删除
* @param topic 主题
* @param message 消息内容
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerMqttIotMessage(String topic, String message) {
//influxdb
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
......@@ -648,24 +709,11 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
String deviceCode = jsonObject.getString("deviceCode");
String gatewayId = jsonObject.getString("gatewayId");
String value = jsonObject.getString("value");
String key = indexAddress + "_" + gatewayId;
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByIndexAddress(indexAddress, gatewayId);
if (equipmentSpeIndex == null) {
return;
}
equipmentSpeIndex.setValue(value);
equipmentSpeIndex.setValueLabel(valueTranslate(value, equipmentSpeIndex.getValueEnum()));
equipmentSpeIndex.setEquipmentType(topicEntity.getType());
equipmentSpeIndex.setUpdateDate(new Date());
equipmentSpeIndex.setGatewayId(gatewayId);
equipmentSpeIndex.setDataType(dataType);
equipmentSpeIndex.setTraceId(traceId);
equipmentSpeIndex.setUUID(UUIDUtils.getUUID());
//更新装备性能指标
//equipmentSpecificIndexService.updateById(equipmentSpeIndex);
Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
if (equipmentIndexVOMap.get(key) != null) {
EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
tagsMap.put("key", indexAddress + "_" + gatewayId);
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
......@@ -676,16 +724,34 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
fieldsMap.put("gatewayId", gatewayId);
fieldsMap.put("dataType", dataType);
fieldsMap.put("equipmentId", equipmentSpeIndex.getEquipmentId());
fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
fieldsMap.put("equipmentIndexKey", equipmentSpeIndex.getEquipmentIndexKey());
fieldsMap.put("isAlarm", equipmentSpeIndex.getIsAlarm().toString());
fieldsMap.put("unit", equipmentSpeIndex.getIndexUnitName());
// fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
// fieldsMap.put("equipmentIndexKey", equipmentSpeIndex.getEquipmentIndexKey());
// fieldsMap.put("isAlarm", equipmentSpeIndex.getIsAlarm().toString());
fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
//保存influxDB库
influxDbConnection.insert("iot_data", tagsMap, fieldsMap);
}
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByIndexAddress(indexAddress, gatewayId);
if (equipmentSpeIndex == null) {
return;
}
equipmentSpeIndex.setValue(value);
equipmentSpeIndex.setValueLabel(valueTranslate(value, equipmentSpeIndex.getValueEnum()));
equipmentSpeIndex.setEquipmentType(topicEntity.getType());
equipmentSpeIndex.setUpdateDate(new Date());
equipmentSpeIndex.setGatewayId(gatewayId);
equipmentSpeIndex.setDataType(dataType);
equipmentSpeIndex.setTraceId(traceId);
equipmentSpeIndex.setUUID(UUIDUtils.getUUID());
//更新装备性能指标
//equipmentSpecificIndexService.updateById(equipmentSpeIndex);
QueryWrapper<EquipmentSpecific> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("id", equipmentSpeIndex.getEquipmentSpecificId());
......@@ -2473,4 +2539,32 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
}
}
@PostConstruct
public void iotAsyncExecutor() {
ThreadPoolTaskExecutor workExecutor = new ThreadPoolTaskExecutor();
// 设置核心线程数
int length = Runtime.getRuntime().availableProcessors();
int size = Math.max(length, 80);
workExecutor.setCorePoolSize(size * 2);
log.info("装备服务初始化,系统线程数:{},运行线程数:{}", length, size);
// 设置最大线程数
workExecutor.setMaxPoolSize(workExecutor.getCorePoolSize());
//配置队列大小
workExecutor.setQueueCapacity(Integer.MAX_VALUE);
// 设置线程活跃时间(秒)
workExecutor.setKeepAliveSeconds(60);
// 设置默认线程名称
workExecutor.setThreadNamePrefix("装备服务-Iot透传消息消费线程池" + "-");
// 等待所有任务结束后再关闭线程池
//当调度器shutdown被调用时,等待当前被调度的任务完成
workExecutor.setWaitForTasksToCompleteOnShutdown(true);
//执行初始化
workExecutor.initialize();
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
workExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
this.dataExecutor = workExecutor;
}
}
......@@ -324,10 +324,13 @@
ei.`name` AS perfQuotaName,
si.`value`,
ei.is_iot,
ei.unit AS unitName,
si.unit AS unitName,
ei.sort_num,
si.create_date,
si.update_date
si.update_date,
si.index_address,
si.gateway_id,
si.data_type
FROM
wl_equipment_specific_index si
LEFT JOIN wl_equipment_index ei ON si.equipment_index_id = ei.id
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment