Commit 1a6a52de authored by 高建强's avatar 高建强

item:稳压泵指标信息缓存提交

parent 3f13d148
...@@ -21,4 +21,6 @@ public class IotDataVO { ...@@ -21,4 +21,6 @@ public class IotDataVO {
* iot数据上报唯一id * iot数据上报唯一id
*/ */
private String traceId; private String traceId;
private String createdTime;
} }
package com.yeejoin.equipmanage.service;
import com.yeejoin.equipmanage.common.vo.IotDataVO;
import java.util.List;
import java.util.Map;
/**
* @author GaoJianqiang
* @date 2023/02/15 16:20
*/
public interface IPressurePumpService {
/**
* redis缓存物联采集数据,内部读取JSON配置指定有效期
* @param message
* @param iotDatalist
*/
void saveDataToRedis(List<IotDataVO> iotDatalist);
/**
* 根据key,模糊查询所有的redis缓存数据
* @param key
* @return
*/
List<IotDataVO> getDataToRedis(String key);
/**
* 获取指标配置JSON信息集合
* @return
*/
List<Map> getNameKeyInfoList();
}
...@@ -193,6 +193,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -193,6 +193,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@Autowired @Autowired
private IEquipmentService equipmentService; private IEquipmentService equipmentService;
@Autowired
private IPressurePumpService pressurePumpService;
@Value("${equipManage.name}") @Value("${equipManage.name}")
private String serverName; private String serverName;
...@@ -313,6 +316,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -313,6 +316,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
log.info(String.format("发送eqm转kafka消息失败:%s", e.getMessage())); log.info(String.format("发送eqm转kafka消息失败:%s", e.getMessage()));
} }
// redis缓存指定指标、指定时长物联数据
pressurePumpService.saveDataToRedis(iotDatalist);
if (!StringUtils.isEmpty(traceId)) { if (!StringUtils.isEmpty(traceId)) {
String finalTraceId = traceId; String finalTraceId = traceId;
List<IotDataVO> collect = iotDatalist.stream().map(x -> { List<IotDataVO> collect = iotDatalist.stream().map(x -> {
......
package com.yeejoin.equipmanage.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.equipmanage.common.vo.IotDataVO;
import com.yeejoin.equipmanage.service.IPressurePumpService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
@Service
public class PressurePumpServiceImpl implements IPressurePumpService {
@Autowired
private RedisUtils redisUtils;
@Value("classpath:/json/nameKeyInfo.json")
private Resource nameKeyInfo;
// 稳压泵标识
private static final String PRESSURE_PUMP = "PressurePump";
@Override
@Async
public void saveDataToRedis(List<IotDataVO> iotDatalist) {
// 获取配置JSON信息集合
List<Map> infoList = getNameKeyInfoList();
if (CollectionUtils.isNotEmpty(infoList)) {
// 过滤出稳压泵标识的信息集合
List<Map> pumpInfoList = infoList.stream().filter(x -> PRESSURE_PUMP.equals(x.get("code"))).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(pumpInfoList)) {
Map map = pumpInfoList.get(0);
String nameKey = map.get("nameKey").toString();
int expire = Integer.parseInt(map.get("expire").toString());
String nowString = DateUtils.getDateNowString();
long timeMillis = System.currentTimeMillis();
for (IotDataVO vo : iotDatalist) {
String key = vo.getKey();
if (nameKey.contains(vo.getKey())) {
vo.setCreatedTime(nowString);
redisUtils.set(String.join(":", PRESSURE_PUMP, key, String.valueOf(timeMillis)), JSONObject.toJSONString(vo), expire);
}
List<IotDataVO> dataToRedis = getDataToRedis(key);
}
}
}
}
@Override
public List<IotDataVO> getDataToRedis(String key) {
List<IotDataVO> list = new ArrayList<>();
Set<String> keys = redisUtils.getKeys(String.join(":", PRESSURE_PUMP, key));
if (CollectionUtils.isNotEmpty(keys)) {
keys.forEach(x -> {
list.add(JSON.parseObject(redisUtils.get(x).toString(), IotDataVO.class));
});
// 时间倒序排序
list.sort((t1, t2) -> t2.getCreatedTime().compareTo(t1.getCreatedTime()));
}
return list;
}
@Override
public List<Map> getNameKeyInfoList() {
String json = null;
try {
json = IOUtils.toString(nameKeyInfo.getInputStream(), java.lang.String.valueOf(StandardCharsets.UTF_8));
} catch (IOException e) {
log.error("获取指标JSON配置信息失败:{}", e.getMessage());
}
return JSONObject.parseArray(json, Map.class);
}
}
[
{
"name": "稳压泵",
"code": "PressurePump",
"nameKey": "FHS_PressurePump_Start",
"expire": 14400
}
]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment