Commit ce382de3 authored by wujiang's avatar wujiang

添加物联告警缓存

parent 4388d6ea
......@@ -2,6 +2,7 @@ package com.yeejoin.amos.api.alarm.service.impl;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
......@@ -18,6 +19,9 @@ public class AlarmKafkaConsumer {
@Autowired
PointSystemServiceImpl pointSystemServiceImpl;
@Value("${warning.wait.time:1000}")
private long waitTime;
// 消费者来处理消息
@KafkaListener(id = "alarmInfo", topics = { "${kafka.equipment.alarm}" })
......@@ -29,7 +33,7 @@ public class AlarmKafkaConsumer {
pointSystemServiceImpl.sendWarningAsync(date);
// 等待1s
Thread.sleep(1000L);
Thread.sleep(waitTime);
// 手动提交
ack.acknowledge();
......
......@@ -6,7 +6,6 @@ import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
......@@ -39,6 +38,7 @@ import com.yeejoin.amos.api.alarm.mapper2.JumpConfigMapper;
import com.yeejoin.amos.api.alarm.mapper2.KKSDataMapper;
import com.yeejoin.amos.api.alarm.mapper2.StationBasicMapper;
import com.yeejoin.amos.api.alarm.service.IPointSystemService;
import com.yeejoin.amos.api.alarm.utils.RedisUtils;
/**
* @description:
......@@ -74,6 +74,18 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
@Autowired
private StationBasicMapper stationBasicMapper;
@Autowired
private RedisUtils redisUtil;
@Value("${warning.redis.use:false}")
private boolean redisUse;
@Value("${warning.redis.limit.time:3600}")
private long limitTime;
@Value("${warning.SYZ:false}")
private boolean warnSYZ;
public String getJumpUrlByInfo(String sbbm) {
List<JumpConfig> jumpConfigs = jumpConfigMapper.selectList(null);
Map<String, String> collect = jumpConfigs.stream()
......@@ -115,9 +127,9 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
// @PostConstruct
// public void test() {
// String address = "16469";
// String address = "22163";
// String value = "1.0";
// String valueLabe = "1.0";
// String valueLabe = "扇区1限功率";
// String gatewayId = "1668801435891929089";
// String isAlarm = "1";
// this.sendWarning(address, value, valueLabe, gatewayId, isAlarm);
......@@ -146,7 +158,10 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
pointSystemWrapper.lambda().eq(PointSystem::getGatewayId, gatewayId);
List<PointSystem> pointSystems = pointSystemMapper.selectList(pointSystemWrapper);
if (pointSystems == null || pointSystems.size() < 1) {
throw new RuntimeException("获取kks码失败!");
// throw new RuntimeException("获取kks码失败: "+"address: " + address + ",gatewayId:
// " + gatewayId + " ,value:" + value);
System.out.println("获取kks码失败: " + "address:" + address + ", gatewayId:" + gatewayId + " ,value:" + value);
return;
}
PointSystem pointSystem = pointSystems.get(0);
......@@ -159,10 +174,12 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
KKSData KKSData = kksDataMapper.selectOne(KKSDataWrapper);
JSONObject eqdata = new JSONObject();
if (KKSData == null) {
throw new RuntimeException("kks码查询热工院表不存在:" + pointSystem.getKks());
// throw new RuntimeException("kks码查询热工院表不存在:" + pointSystem.getKks());
System.out.println("kks码查询热工院表不存在:" + pointSystem.getKks());
return;
}
eqdata.put("kksms", KKSData.getKKSMS());
QueryWrapper<StationBasic> stationWrapper = new QueryWrapper<>();
stationWrapper.lambda().eq(StationBasic::getStationNumber, pointSystem.getStation());
StationBasic stationBasic = stationBasicMapper.selectOne(stationWrapper);
......@@ -170,7 +187,9 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
eqdata.put("sourceAttribution", stationBasic.getProjectOrgCode());
eqdata.put("sourceAttributionDesc", stationBasic.getStationName());
} else {
throw new RuntimeException("获取场站失败!");
// throw new RuntimeException("获取场站失败: " + pointSystem.getStation());
System.out.println("获取场站失败: " + pointSystem.getStation());
return;
}
try {
// Map<String, String> maps = new HashMap<>();
......@@ -192,9 +211,11 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
// eqdata = (JSONObject) list.get(0);
// 组装数据,发送预警
WarningDto warningDto = setWarningDto(pointSystem, eqdata, valueLabe);
emqKeeper.getMqttClient().publish(STATIONWARNING, JSON.toJSONString(warningDto).getBytes(), 0, false);
System.out.println("发送预警成功: " + JSON.toJSONString(warningDto));
logger.info("发送预警成功: " + JSON.toJSONString(warningDto));
if (warningDto != null) {
emqKeeper.getMqttClient().publish(STATIONWARNING, JSON.toJSONString(warningDto).getBytes(), 0, false);
System.out.println("发送预警成功: " + JSON.toJSONString(warningDto));
logger.info("发送预警成功: " + JSON.toJSONString(warningDto));
}
} catch (Exception e) {
e.printStackTrace();
}
......@@ -212,16 +233,11 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
DynamicDetails dynamicDetails = new DynamicDetails(TABNAME, tabContent);
List<DynamicDetails> dynamicDetailsList = new ArrayList<>();
dynamicDetailsList.add(dynamicDetails);
StringBuilder indexKey = new StringBuilder(pointSystem.getStationAbbr()).append("#")
.append(pointSystem.getNumber()).append("#").append(pointSystem.getFunctionNum());
StringBuilder indexKey = new StringBuilder(pointSystem.getStation()).append("#").append(pointSystem.getNumber())
.append("#").append(pointSystem.getFunctionNum());
if (pointSystem.getNumber() == null) {
indexKey = new StringBuilder(pointSystem.getStationAbbr()).append("#").append(pointSystem.getFunctionNum());
}
// 如果是升压站
if ("SYZ".equals(pointSystem.getNumber())) {
indexKey = new StringBuilder(pointSystem.getStationAbbr()).append("#").append(pointSystem.getNumber())
.append("#").append(pointSystem.getFunctionNum());
}
QueryWrapper<EquipmentSpecificIndex> indexWrapper = new QueryWrapper<>();
indexWrapper.lambda().eq(EquipmentSpecificIndex::getIndexAddress, pointSystem.getAddress());
......@@ -232,24 +248,44 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
if ("遥信".equals(pointSystem.getType())) {
indexValue = esi.getEquipmentIndexName();
} else if ("遥测".equals(pointSystem.getType())) {
if ("XZ".equals(pointSystem.getStationAbbr())) {
String json = esi.getValueEnum();
JSONArray arr = JSONArray.parseArray(json);
List<JSONObject> list = arr.toJavaList(JSONObject.class);
Map<String, String> map = list.stream()
.collect(Collectors.toMap(i -> i.getString("key"), i -> i.getString("label")));
indexValue = map.get(valueLabe);
} else if ("TH".equals(pointSystem.getStationAbbr())) {
if ("W005".equals(pointSystem.getStation())) {
// String json = esi.getValueEnum();
// JSONArray arr = JSONArray.parseArray(json);
// List<JSONObject> list = arr.toJavaList(JSONObject.class);
// Map<String, String> map = list.stream()
// .collect(Collectors.toMap(i -> i.getString("key"), i -> i.getString("label")));
indexValue = valueLabe;
} else if ("P001".equals(pointSystem.getStation())) {
}
} else {
}
WarningDto WarningDto = new WarningDto(indexKey.toString(), indexValue, null,
(String) eqdata.get("sourceAttributionDesc"), (String) eqdata.get("sourceAttribution"),
dynamicDetailsList, warningObjectCode, time, (String) eqdata.get("kksms"), "equip",
getJumpUrlByInfo(warningObjectCode));
return WarningDto;
}
}
// 如果是升压站
if ("SYZ".equals(pointSystem.getNumber())) {
// 如果开启升压站预警
if (warnSYZ) {
indexKey = new StringBuilder(pointSystem.getStation()).append("#").append(pointSystem.getNumber())
.append("#").append(pointSystem.getFunctionNum());
} else {
System.out.println("升压站预警不发送: " + warningObjectCode + " , " + indexValue);
return null;
}
}
String key = "104_warning:" + warningObjectCode + "_" + indexKey.toString() + "_" + indexValue;
// 添加缓存机制 在有限的时间内防止一直触发
if (redisUtil.get(key) != null&&redisUse) {
System.out.println("预警缓存存在,不触发: " + key);
return null;
} else {
WarningDto WarningDto = new WarningDto(indexKey.toString(), indexValue, null,
(String) eqdata.get("sourceAttributionDesc"), (String) eqdata.get("sourceAttribution"),
dynamicDetailsList, warningObjectCode, time, (String) eqdata.get("kksms"), "equip",
getJumpUrlByInfo(warningObjectCode));
redisUtil.set(key, indexValue, limitTime);
return WarningDto;
}
}
}
\ No newline at end of file
package com.yeejoin.amos.api.alarm.utils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.typroject.tyboot.component.cache.enumeration.CacheType;
import org.typroject.tyboot.core.foundation.context.RequestContext;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @author DELL
*/
@Component
public class RedisUtils {
public static String VAR_SPLITOR = ":";
public static Long DEFAULT_SESSION_EXPIRATION = 2592000L;
@Autowired
private RedisTemplate redisTemplate;
/**
* 指定缓存失效时间
*
* @param key 键
* @param time 时间(秒)
* @return
*/
public boolean expire(String key, long time) {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
return true;
} else {
throw new RuntimeException("超时时间小于0");
}
}
/**
* 根据key 获取过期时间
*
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
*
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
return redisTemplate.hasKey(key);
}
/**
* 删除缓存
*
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================
/**
* 普通缓存获取
*
* @param key 键
* @return 值
*/
public Object get(String key) {
Object object = redisTemplate.opsForValue().get(key);
return key == null ? null : object;
}
/**
* 普通缓存放入
*
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
redisTemplate.opsForValue().set(key, value);
return true;
}
/**
* 普通缓存放入并设置时间
*
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
this.set(key, value);
}
return true;
}
/**
* 递增
*
* @param key 键
* @param by 要增加几(大于0)
* @return
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
*
* @param key 键
* @param by 要减少几(小于0)
* @return
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
// ================================Map=================================
/**
* HashGet
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return 值
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
*
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
*
* @param key 键
* @param map 对应多个键值
* @return true 成功 false 失败
*/
public boolean hmset(String key, Map<String, Object> map) {
redisTemplate.opsForHash().putAll(key, map);
return true;
}
/**
* HashSet 并设置时间
*
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
redisTemplate.opsForHash().put(key, item, value);
return true;
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
* @return
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
* @return
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* 根据key获取Set中的所有值
*
* @param key 键
* @return
*/
public Set<Object> sGet(String key) {
return redisTemplate.opsForSet().members(key);
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
return redisTemplate.opsForSet().isMember(key, value);
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
return redisTemplate.opsForSet().add(key, values);
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
final Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0) {
expire(key, time);
}
return count;
}
/**
* 获取set缓存的长度
*
* @param key 键
* @return
*/
public long sGetSetSize(String key) {
return redisTemplate.opsForSet().size(key);
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
final Long count = redisTemplate.opsForSet().remove(key, values);
return count;
}
// ===============================list=================================
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
* @return
*/
public List<Object> lGet(String key, long start, long end) {
return redisTemplate.opsForList().range(key, start, end);
}
/**
* 获取list缓存的长度
*
* @param key 键
* @return
*/
public long lGetListSize(String key) {
return redisTemplate.opsForList().size(key);
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
* @return
*/
public Object lGetIndex(String key, long index) {
return redisTemplate.opsForList().index(key, index);
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, Object value) {
redisTemplate.opsForList().rightPush(key, value);
return true;
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, Object value, long time) {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0) {
expire(key, time);
}
return true;
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0) {
expire(key, time);
}
return true;
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
redisTemplate.opsForList().set(key, index, value);
return true;
}
public Long getAndDeletePatternKeys(String pattern) {
Set<String> keys = redisTemplate.keys(pattern);
if (!CollectionUtils.isEmpty(keys)) {
return redisTemplate.delete(keys);
}
return null;
}
public Set<String> getPatternKeys(String pattern) {
return redisTemplate.keys(pattern);
}
/**
* 获取指定前缀key列表
*
* @return
*/
public Set<String> getKeys(String prefix) {
return redisTemplate.keys(prefix.concat("*"));
}
public Boolean refresh(String token) {
String cacheKey = genKey(new String[]{CacheType.ERASABLE.name(), "SESSION_TOKEN", RequestContext.getProduct(), token});
boolean hasKey = redisTemplate.hasKey(cacheKey);
if (hasKey) {
redisTemplate.expire(cacheKey, DEFAULT_SESSION_EXPIRATION, TimeUnit.SECONDS);
}
return hasKey;
}
public static String genKey(String... keyMembers) {
return StringUtils.join(keyMembers, VAR_SPLITOR).toUpperCase();
}
}
......@@ -36,3 +36,12 @@ kafka.equipment.test=test88888
power.station.url=http://172.16.4.29:80/prod-api/fdgl/process/DataInterface
#电站104采集预警
power.station.warning=104/data/analysis
#104发送告警间隔时间
warning.wait.time=100
#104缓存预警过滤
warning.redis.use=false
#104缓存预警过滤过期时间单位秒
warning.redis.limit.time=3600
#104升压站预警发送
warning.SYZ=false
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment