Commit 87a890bc authored by 高建强's avatar 高建强

item:稳压泵统计与分析,多站融合

parent 076ca9aa
......@@ -18,18 +18,21 @@ public interface IPressurePumpService {
* redis缓存物联采集数据,内部读取JSON配置指定有效期
*
* @param iotDatalist
* @param topic
* @param iotCode
* @param bizOrgCode
*/
void saveDataToRedis(List<IotDataVO> iotDatalist, String topic);
void saveDataToRedis(List<IotDataVO> iotDatalist, String iotCode, String bizOrgCode);
/**
* 根据nameKey,模糊查询所有的redis缓存数据
*
* @param infoCode
* @param nameKey
* @param iotCode
* @param bizOrgCode
* @return
*/
List<IotDataVO> getDataToRedis(String infoCode, String nameKey, String iotCode);
List<IotDataVO> getDataToRedis(String infoCode, String nameKey, String iotCode, String bizOrgCode);
/**
* 获取指标配置JSON信息集合
......@@ -87,13 +90,15 @@ public interface IPressurePumpService {
/**
* 根据时间范围,获取redis指定指标或指定指标设备的缓存数据
*
* @param infoCode
* @param nameKey
* @param iotCode
* @param startDate
* @param endDate
* @param bizOrgCode
*/
List<IotDataVO> getDataToRedisByDateBetween(String infoCode, String nameKey, String iotCode, Date startDate, Date endDate);
List<IotDataVO> getDataToRedisByDateBetween(String infoCode, String nameKey, String iotCode, Date startDate, Date endDate, String bizOrgCode);
/**
* 根据时间范围获取iot物联数据集合
......@@ -137,6 +142,7 @@ public interface IPressurePumpService {
/**
* 获取iot指标统计数据
*
* @param startTime
* @param endTime
* @param infoCode
......@@ -146,9 +152,10 @@ public interface IPressurePumpService {
* @param key
* @param fieldKey
* @param expire
* @param bizOrgCode
* @return
*/
List<PressurePumpCountVo> getIotCountData(String startTime, String endTime, String infoCode, String countRedisKey, String prefix, String suffix, String key, String fieldKey, long expire);
List<PressurePumpCountVo> getIotCountData(String startTime, String endTime, String infoCode, String countRedisKey, String prefix, String suffix, String key, String fieldKey, long expire, String bizOrgCode);
/**
* 获取稳压泵时间范围统计数据
......
......@@ -328,7 +328,7 @@ public class EmergencyServiceImpl implements IEmergencyService {
dataMap.put("data", dataList);
yDataList.add(dataMap);
}
List<String> collect = timeList.stream().map(x -> x.substring(4)).collect(Collectors.toList());
List<String> collect = timeList.stream().map(x -> x.substring(5)).collect(Collectors.toList());
map.put("xData", collect);
map.put("yData", yDataList);
}
......@@ -366,7 +366,7 @@ public class EmergencyServiceImpl implements IEmergencyService {
Date endDate = DateUtils.convertStrToDate(endTime, DateUtils.DATE_TIME_PATTERN);
if (DateUtils.dateCompare(startDate, beforeDate) >= 0) {
// 获取redis稳压泵缓存数据,默认JSON配置最近4小时
List<IotDataVO> redisDataList = pressurePumpService.getDataToRedisByDateBetween(PressurePumpRelateEnum.PRESSURE_PUMP.getValue(), pressurePumpStart, iotCode.toString(), startDate, endDate);
List<IotDataVO> redisDataList = pressurePumpService.getDataToRedisByDateBetween(PressurePumpRelateEnum.PRESSURE_PUMP.getValue(), pressurePumpStart, iotCode.toString(), startDate, endDate, bizOrgCode);
if (!CollectionUtils.isEmpty(redisDataList)) {
List<Map<String, String>> dataList = new ArrayList<>();
redisDataList.forEach(y -> {
......@@ -482,11 +482,13 @@ public class EmergencyServiceImpl implements IEmergencyService {
for (Map<String, Object> map : pumpInfoList) {
// iot获取数据,返回并存储redis
Object iotCode = map.get("iotCode");
if (!ObjectUtils.isEmpty(iotCode)) {
Object orgCode = map.get("bizOrgCode");
bizOrgCode = StringUtils.isNotBlank(bizOrgCode) ? bizOrgCode : ObjectUtils.isEmpty(orgCode) ? "" : orgCode.toString();
if (!ObjectUtils.isEmpty(iotCode) && !ObjectUtils.isEmpty(bizOrgCode)) {
String iotCodeStr = iotCode.toString();
String prefix = ObjectUtils.isEmpty(iotCode) ? "" : iotCodeStr.substring(0, 8);
String suffix = ObjectUtils.isEmpty(iotCode) ? "" : iotCodeStr.substring(8);
List<PressurePumpCountVo> dataList = pressurePumpService.getIotCountData(startTime, endTime, PressurePumpRelateEnum.PRESSURE_PUMP.getValue(), countRedisKey, prefix, suffix, PressurePumpRelateEnum.IOT_INDEX_VALUE_TRUE.getValue(), pressurePumpStart, countExpire);
List<PressurePumpCountVo> dataList = pressurePumpService.getIotCountData(startTime, endTime, PressurePumpRelateEnum.PRESSURE_PUMP.getValue(), countRedisKey, prefix, suffix, PressurePumpRelateEnum.IOT_INDEX_VALUE_TRUE.getValue(), pressurePumpStart, countExpire, bizOrgCode);
dataMap.put(iotCodeStr, dataList);
}
}
......
......@@ -316,9 +316,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
log.info(String.format("发送eqm转kafka消息失败:%s", e.getMessage()));
}
// redis缓存指定指标、指定时长物联数据
pressurePumpService.saveDataToRedis(iotDatalist, topic);
if (!StringUtils.isEmpty(traceId)) {
String finalTraceId = traceId;
List<IotDataVO> collect = iotDatalist.stream().map(x -> {
......@@ -339,7 +336,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
*/
public void realTimeDateProcessing(TopicEntityVo topicEntity, List<IotDataVO> iotDatalist,EquipmentSpecificVo vo) {
String iotCode = topicEntity.getIotCode();
if (EquipAndCarEnum.equip.type.equals(topicEntity.getType())) {
List<EquipmentSpecificIndex> indexList = equipmentSpecificIndexService
.getEquipmentSpeIndexBySpeIotCode(iotCode);
......@@ -347,6 +343,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
return;
}
equipRealTimeDate(iotDatalist, indexList, topicEntity,vo);
String bizOrgCode = indexList.get(0).getBizOrgCode();
// redis缓存指定指标、指定时长物联数据
pressurePumpService.saveDataToRedis(iotDatalist, iotCode, bizOrgCode);
} else {
List<CarProperty> carProperties = carPropertyService.getCarPropListByIotCode(iotCode);
if (ObjectUtils.isEmpty(carProperties)) {
......@@ -354,7 +353,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
carRealTimeDate(iotDatalist, carProperties,topicEntity);
}
}
/**
......
......@@ -2,7 +2,6 @@ package com.yeejoin.equipmanage.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.equipmanage.common.enums.PressurePumpRelateEnum;
......@@ -52,11 +51,11 @@ public class PressurePumpServiceImpl implements IPressurePumpService {
@Override
@Async
public void saveDataToRedis(List<IotDataVO> iotDatalist, String topic) {
public void saveDataToRedis(List<IotDataVO> iotDatalist, String iotCode, String bizOrgCode) {
String pressurePumpValue = PressurePumpRelateEnum.PRESSURE_PUMP.getValue();
// 获取配置JSON信息集合
List<Map> list = getNameKeyInfoList(pressurePumpValue);
if (CollectionUtils.isNotEmpty(list)) {
if (CollectionUtils.isNotEmpty(list) && StringUtils.isNotBlank(iotCode) && StringUtils.isNotBlank(bizOrgCode)) {
Map map = list.get(0);
String nameKey = map.get("nameKey").toString();
int expire = Integer.parseInt(map.get("expire").toString());
......@@ -65,19 +64,17 @@ public class PressurePumpServiceImpl implements IPressurePumpService {
for (IotDataVO vo : iotDatalist) {
String key = vo.getKey();
if (nameKey.contains(vo.getKey())) {
String[] split = topic.split("/");
topic = split.length > 2 ? String.join("", split[0], split[1]) : "";
vo.setCreatedTime(nowString);
redisUtils.set(String.join(":", pressurePumpValue, key, topic, String.valueOf(timeMillis)), JSONObject.toJSONString(vo), expire);
redisUtils.set(String.join(":", pressurePumpValue, bizOrgCode, key, iotCode, String.valueOf(timeMillis)), JSON.toJSONString(vo), expire);
}
}
}
}
@Override
public List<IotDataVO> getDataToRedis(String infoCode, String nameKey, String iotCode) {
public List<IotDataVO> getDataToRedis(String infoCode, String nameKey, String iotCode, String bizOrgCode) {
List<IotDataVO> list = new ArrayList<>();
Set<String> keys = redisUtils.getKeys(String.join(":", infoCode, nameKey, StringUtils.isNotEmpty(iotCode) ? iotCode : ""));
Set<String> keys = redisUtils.getKeys(String.join(":", infoCode, bizOrgCode, nameKey, StringUtils.isNotEmpty(iotCode) ? iotCode : ""));
if (CollectionUtils.isNotEmpty(keys)) {
keys.forEach(x -> list.add(JSON.parseObject(redisUtils.get(x).toString(), IotDataVO.class)));
// 时间倒序排序
......@@ -87,9 +84,9 @@ public class PressurePumpServiceImpl implements IPressurePumpService {
}
@Override
public List<IotDataVO> getDataToRedisByDateBetween(String infoCode, String nameKey, String iotCode, Date startDate, Date endDate) {
public List<IotDataVO> getDataToRedisByDateBetween(String infoCode, String nameKey, String iotCode, Date startDate, Date endDate, String bizOrgCode) {
List<IotDataVO> list = new ArrayList<>();
Set<String> keys = redisUtils.getKeys(String.join(":", infoCode, nameKey, StringUtils.isNotEmpty(iotCode) ? iotCode : ""));
Set<String> keys = redisUtils.getKeys(String.join(":", infoCode, bizOrgCode, nameKey, StringUtils.isNotEmpty(iotCode) ? iotCode : ""));
if (CollectionUtils.isNotEmpty(keys)) {
keys.forEach(x -> {
String[] split = x.split(":");
......@@ -109,9 +106,9 @@ public class PressurePumpServiceImpl implements IPressurePumpService {
return list;
}
public List<PressurePumpCountVo> getCountDataToRedisByDateBetween(String infoCode, String countRedisKey, String nameKey, String iotCode, Date startDate, Date endDate) {
public List<PressurePumpCountVo> getCountDataToRedisByDateBetween(String infoCode, String countRedisKey, String nameKey, String iotCode, Date startDate, Date endDate, String bizOrgCode) {
List<PressurePumpCountVo> list = new ArrayList<>();
Set<String> keys = redisUtils.getKeys(String.join(":", infoCode, countRedisKey, nameKey, StringUtils.isNotEmpty(iotCode) ? iotCode : ""));
Set<String> keys = redisUtils.getKeys(String.join(":", infoCode, bizOrgCode, countRedisKey, nameKey, StringUtils.isNotEmpty(iotCode) ? iotCode : ""));
if (CollectionUtils.isNotEmpty(keys)) {
keys.forEach(x -> {
String[] split = x.split(":");
......@@ -188,7 +185,7 @@ public class PressurePumpServiceImpl implements IPressurePumpService {
public Map<String, List<IotDataVO>> getDataList(List<Map<String, Object>> pumpInfoList, String infoCode, String equipmentCode, String top, String nameKey, String bizOrgCode, String iotCode) {
Map<String, List<IotDataVO>> map = new HashMap<>();
// 获取redis稳压泵缓存数据,默认JSON配置最近4小时
List<IotDataVO> dataList = getDataToRedis(infoCode, nameKey, iotCode);
List<IotDataVO> dataList = getDataToRedis(infoCode, nameKey, iotCode, bizOrgCode);
// 过滤指定值数据
List<IotDataVO> dataListFilterTrue = getDataListFilter(dataList, PressurePumpRelateEnum.IOT_INDEX_VALUE_TRUE.getValue());
List<IotDataVO> dataListFilterFalse = getDataListFilter(dataList, PressurePumpRelateEnum.IOT_INDEX_VALUE_FALSE.getValue());
......@@ -280,12 +277,12 @@ public class PressurePumpServiceImpl implements IPressurePumpService {
String iotCodeStr = iotCode.toString();
Date startDate = DateUtils.convertStrToDate(startTime, DateUtils.DATE_PATTERN);
Date endDate = DateUtils.convertStrToDate(endTime, DateUtils.DATE_PATTERN);
List<PressurePumpCountVo> dataList = getCountDataToRedisByDateBetween(PressurePumpRelateEnum.PRESSURE_PUMP.getValue(), countRedisKey, nameKey, iotCodeStr, startDate, endDate);
List<PressurePumpCountVo> dataList = getCountDataToRedisByDateBetween(PressurePumpRelateEnum.PRESSURE_PUMP.getValue(), countRedisKey, nameKey, iotCodeStr, startDate, endDate, bizOrgCode);
Long days = DateUtils.getDurationDays(startTime, endTime, DateUtils.DATE_TIME_PATTERN);
if (CollectionUtils.isEmpty(dataList) || (CollectionUtils.isNotEmpty(dataList) && dataList.size() < days)) {
String prefix = ObjectUtils.isEmpty(iotCode) ? "" : iotCodeStr.substring(0, 8);
String suffix = ObjectUtils.isEmpty(iotCode) ? "" : iotCodeStr.substring(8);
dataList = getIotCountData(startTime, endTime, infoCode, countRedisKey, prefix, suffix, PressurePumpRelateEnum.IOT_INDEX_VALUE_TRUE.getValue(), nameKey, countExpire);
dataList = getIotCountData(startTime, endTime, infoCode, countRedisKey, prefix, suffix, PressurePumpRelateEnum.IOT_INDEX_VALUE_TRUE.getValue(), nameKey, countExpire, bizOrgCode);
}
dataMap.put(iotCodeStr, dataList);
}
......@@ -298,7 +295,7 @@ public class PressurePumpServiceImpl implements IPressurePumpService {
}
@Override
public List<PressurePumpCountVo> getIotCountData(String startTime, String endTime, String infoCode, String countRedisKey, String prefix, String suffix, String key, String fieldKey, long expire) {
public List<PressurePumpCountVo> getIotCountData(String startTime, String endTime, String infoCode, String countRedisKey, String prefix, String suffix, String key, String fieldKey, long expire, String bizOrgCode) {
List<Map<String, String>> dataMapList = getIotCommonListData(startTime, endTime, prefix, suffix, key, fieldKey);
if (CollectionUtils.isNotEmpty(dataMapList) && StringUtils.isNotBlank(key)) {
Map<String, List<Map<String, String>>> dataMap = dataMapList.stream().filter(y -> y.containsKey(PressurePumpRelateEnum.CREATED_TIME.getValue()) && y.get(fieldKey) != null && key.equalsIgnoreCase(y.get(fieldKey))).collect(Collectors.groupingBy(e -> e.get(PressurePumpRelateEnum.CREATED_TIME.getValue()).substring(0, 10)));
......@@ -310,7 +307,7 @@ public class PressurePumpServiceImpl implements IPressurePumpService {
countVo.setTime(time);
countVo.setValue(dataMap.get(time).size());
// 获取的数据存储到redis
String topic = String.join(":", infoCode, countRedisKey, fieldKey, iotCode, time);
String topic = String.join(":", infoCode, bizOrgCode, countRedisKey, fieldKey, iotCode, time);
countVo.setIotCode(topic);
redisUtils.set(topic, JSON.toJSONString(countVo), expire);
dataList.add(countVo);
......
......@@ -195,11 +195,11 @@ public class SupervisionVideoServiceImpl extends ServiceImpl<SupervisionVideoMap
}
// 获取redis稳压泵缓存数据,默认JSON配置最近4小时
List<IotDataVO> DataList = pressurePumpService.getDataToRedis(PressurePumpRelateEnum.PRESSURE_PUMP.getValue(), pressurePumpStart, null);
List<IotDataVO> DataList = pressurePumpService.getDataToRedis(PressurePumpRelateEnum.PRESSURE_PUMP.getValue(), pressurePumpStart, null, bizOrgCode);
if(CollectionUtils.isEmpty(DataList)){
//从influxdb中获取最近一次启停间隔
List<Map<String, String>> iotDataList = pressurePumpService.getIotTopSingleField(top, prefix, null, null, pressurePumpStart);
iotDataList.stream().forEach(e -> {
iotDataList.forEach(e -> {
try {
IotDataVO iotDataVO = (IotDataVO) pressurePumpService.mapToObject(e, IotDataVO.class, pressurePumpStart);
DataList.add(iotDataVO);
......
......@@ -1733,7 +1733,8 @@
wes.realtime_iot_es_index_id realtimeIotSpecificIndexId,
wes.realtime_iot_index_update_date realtiemIotIndexUpdateDate,
wes.realtime_iot_index_id realtimeIotIndexId,
wes.iot_code iotCode
wes.iot_code iotCode,
wes.biz_org_code bizOrgCode
FROM
wl_equipment_specific wes
<where>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment