Commit 7c0f50b3 authored by zhangsen's avatar zhangsen

预警存入 td相关

parent 9f7d2709
package com.yeejoin.amos.boot.module.jxiop.biz.emqx;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
......@@ -7,9 +8,11 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanWarningRecord;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizPvWarningRecord;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.HealthStatusIndicatorServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizFanWarningRecordServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizPvWarningRecordServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.*;
import com.yeejoin.amos.boot.module.jxiop.biz.tdMapper2.FanWaringRecordMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.tdMapper2.PvWaringRecordMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.tdengine.FanWarningRecord;
import com.yeejoin.amos.boot.module.jxiop.biz.tdengine.PvWarningRecord;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -45,6 +48,18 @@ public class WarningRecordStatusMessage extends EmqxListener {
@Autowired
private IdxBizPvWarningRecordServiceImpl idxBizPvWarningRecordService;
@Autowired
private FanWaringRecordMapper fanWaringRecordMapper;
@Autowired
private PvWaringRecordMapper pvWaringRecordMapper;
@Autowired
FanWarningRecordServiceImpl fanWarningRecordService;
@Autowired
PvWarningRecordServiceImpl pvWarningRecordService;
@PostConstruct
void init() throws Exception {
new Thread(taskRunnable).start();
......@@ -96,6 +111,13 @@ public class WarningRecordStatusMessage extends EmqxListener {
lambda.in(IdxBizFanWarningRecord::getSequenceNbr, traceIds);
idxBizFanWarningRecordService.update(lambda);
LambdaUpdateWrapper<FanWarningRecord> lambdaTd = new LambdaUpdateWrapper<>();
lambdaTd.set(FanWarningRecord::getDisposotionState, "已处置");
lambdaTd.set(FanWarningRecord::getStatus, "1");
lambdaTd.set(FanWarningRecord::getDisposotionDate, DateUtil.now());
lambdaTd.in(FanWarningRecord::getTs, traceIds);
fanWarningRecordService.update(lambdaTd);
}
public void jxIopUpdatePv(JSONArray analysisResult) {
......@@ -108,5 +130,14 @@ public class WarningRecordStatusMessage extends EmqxListener {
lambda.set(IdxBizPvWarningRecord::getDisposotionDate, new Date());
lambda.in(IdxBizPvWarningRecord::getSequenceNbr, traceIds);
idxBizPvWarningRecordService.update(lambda);
// td
LambdaUpdateWrapper<PvWarningRecord> lambdaTd = new LambdaUpdateWrapper<>();
lambdaTd.set(PvWarningRecord::getDisposotionState, "已处置");
lambdaTd.set(PvWarningRecord::getStatus, "1");
lambdaTd.set(PvWarningRecord::getDisposotionDate, DateUtil.now());
lambdaTd.in(PvWarningRecord::getTs, traceIds);
pvWarningRecordService.update(lambdaTd);
}
}
package com.yeejoin.amos.boot.module.jxiop.biz.service.impl;
import com.yeejoin.amos.boot.module.jxiop.biz.tdMapper2.FanWaringRecordMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.tdengine.FanWarningRecord;
import org.springframework.stereotype.Service;
import org.typroject.tyboot.core.rdbms.service.BaseService;
@Service
public class FanWarningRecordServiceImpl extends BaseService<FanWarningRecord, FanWarningRecord, FanWaringRecordMapper> {
}
package com.yeejoin.amos.boot.module.jxiop.biz.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.boot.module.jxiop.api.dto.BizMessage;
import com.yeejoin.amos.boot.module.jxiop.api.dto.RiskBizInfoVo;
......@@ -13,8 +15,11 @@ import com.yeejoin.amos.boot.module.jxiop.api.mapper.StationBasicMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.Enum.WarningNameEnum;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.*;
import com.yeejoin.amos.boot.module.jxiop.biz.mapper2.*;
import com.yeejoin.amos.boot.module.jxiop.biz.tdMapper2.*;
import com.yeejoin.amos.boot.module.jxiop.biz.tdengine.*;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
......@@ -71,6 +76,27 @@ public class HealthStatusIndicatorServiceImpl {
@Autowired
IdxBizPvHealthIndexMapper idxBizPvHealthIndexMapper;
@Autowired
PvHealthIndexHourMapper pvHealthIndexHourMapper;
@Autowired
PvHealthIndexDayMapper pvHealthIndexDayMapper;
@Autowired
PvHealthIndexMomentMapper pvHealthIndexMomentMapper;
@Autowired
FanHealthIndexHourMapper fanHealthIndexHourMapper;
@Autowired
FanHealthIndexDayMapper fanHealthIndexDayMapper;
@Autowired
FanHealthIndexMomentMapper fanHealthIndexMomentMapper;
@Autowired
IdxBizPvWarningRuleSetMapper idxBizPvWarningRuleSetMapper;
@Autowired
......@@ -106,6 +132,14 @@ public class HealthStatusIndicatorServiceImpl {
*/
public static final String SMART_ANALYSE_FAN = "smartAnalyseFan";
@Autowired
private FanWaringRecordMapper fanWaringRecordMapper;
@Autowired
private FanWarningRecordServiceImpl fanWarningRecordService;
@Autowired
private PvWaringRecordMapper pvWaringRecordMapper;
/***
* 每一小时获取一次最大粒度内的指数异常数据
......@@ -119,19 +153,21 @@ public class HealthStatusIndicatorServiceImpl {
if (!openHealth){
return;
}
String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
Date date = DateUtils.dateAddHours(time, -8);
log.info("光伏---------------------预警时间----"+time);
// Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.HOUR_OF_DAY,calendar.get(Calendar.HOUR_OF_DAY)-1);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
LambdaQueryWrapper<IdxBizPvHealthIndex> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(IdxBizPvHealthIndex::getAnalysisType,"按时刻");
wrapper.ne(IdxBizPvHealthIndex::getHealthLevel,"安全");
wrapper.ge(IdxBizPvHealthIndex::getRecDate,df.format(calendar.getTime()));
wrapper.orderByDesc(IdxBizPvHealthIndex::getRecDate);
List<IdxBizPvHealthIndex> healthIndices = idxBizPvHealthIndexMapper.selectList(wrapper);
List<String> collect = healthIndices.stream().map(IdxBizPvHealthIndex::getAnalysisObjSeq).collect(Collectors.toList());
LambdaQueryWrapper<PvHealthIndexMoment> wrapper = new LambdaQueryWrapper<>();
wrapper.ne(PvHealthIndexMoment::getHealthLevel,"安全");
wrapper.ge(PvHealthIndexMoment::getTs,date);
wrapper.eq(PvHealthIndexMoment::getAnalysisObjType, "测点");
wrapper.orderByDesc(PvHealthIndexMoment::getTs);
List<PvHealthIndexMoment> healthIndices = pvHealthIndexMomentMapper.selectList(wrapper);
List<String> collect = healthIndices.stream().map(PvHealthIndexMoment::getAnalysisObjSeq).collect(Collectors.toList());
if (null == healthIndices ){
return;
}
......@@ -141,8 +177,9 @@ public class HealthStatusIndicatorServiceImpl {
List<IdxBizPvWarningRuleSet> idxBizPvWarningRules = idxBizPvWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<IdxBizPvHealthIndex>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(IdxBizPvHealthIndex::getGatewayId, Collectors.groupingBy(IdxBizPvHealthIndex::getIndexAddress)));
Map<String, Map<String, List<PvHealthIndexMoment>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(PvHealthIndexMoment::getGatewayId, Collectors.groupingBy(PvHealthIndexMoment::getIndexAddress)));
List<IdxBizPvWarningRecord> idxBizPvWarningRecordList = new ArrayList<>();
List<PvWarningRecord> tdPvWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>();
for (String gateWayId : gateWayMaps.keySet()) {
......@@ -152,9 +189,9 @@ public class HealthStatusIndicatorServiceImpl {
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gateWayId, stationBasic);
Map<String, List<IdxBizPvHealthIndex>> healthDataMaps = gateWayMaps.get(gateWayId);
Map<String, List<PvHealthIndexMoment>> healthDataMaps = gateWayMaps.get(gateWayId);
for (String address : healthDataMaps.keySet()) {
List<IdxBizPvHealthIndex> idxBizPvHealthIndices = healthDataMaps.get(address);
List<PvHealthIndexMoment> idxBizPvHealthIndices = healthDataMaps.get(address);
List<IdxBizPvWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizPvHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
if (ObjectUtils.isEmpty(idxBizPvWarningRuleSets) ){
continue;
......@@ -187,7 +224,7 @@ public class HealthStatusIndicatorServiceImpl {
}
List<Double> healthIndex = idxBizPvHealthIndices.stream().map(IdxBizPvHealthIndex::getHealthIndex).collect(Collectors.toList());
List<Double> healthIndex = idxBizPvHealthIndices.stream().map(PvHealthIndexMoment::getHealthIndex).collect(Collectors.toList());
Double finalHealthValueRisk = healthValueRisk;
long riskNum = healthIndex.stream().filter(e -> e <= finalHealthValueRisk).count();
Double finalHealthValueWarn = healthValueWarn;
......@@ -215,19 +252,18 @@ public class HealthStatusIndicatorServiceImpl {
}
//库里若已存在该测点预警 不生成重复的 若新生预警等级高于历史 则生成
LambdaQueryWrapper<IdxBizPvWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(IdxBizPvWarningRecord::getAnalysisPointId,idxBizPvHealthIndices.get(0).getAnalysisObjSeq());
query.eq(IdxBizPvWarningRecord::getStatus,0);
query.orderByDesc(IdxBizPvWarningRecord::getRecDate);
List<IdxBizPvWarningRecord> idxBizPvWarningRecords = idxBizPvWarningRecordMapper.selectList(query);
LambdaQueryWrapper<PvWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(PvWarningRecord::getAnalysisPointId,idxBizPvHealthIndices.get(0).getAnalysisObjSeq());
query.eq(PvWarningRecord::getStatus,0);
query.orderByDesc(PvWarningRecord::getTs);
List<PvWarningRecord> idxBizPvWarningRecords = pvWaringRecordMapper.selectList(query);
int flag = ObjectUtils.isEmpty(idxBizPvWarningRecords) || WarningNameEnum.getCode(level) > WarningNameEnum.getCode(idxBizPvWarningRecords.get(0).getWarningName()) ? 0 :1;
if (!level.equals("") && flag == 0){
IdxBizPvWarningRecord idxBizPvWarningRecord = new IdxBizPvWarningRecord();
idxBizPvWarningRecord.setKks(idxBizPvHealthIndices.get(0).getKks());
idxBizPvWarningRecord.setRecord(idxBizPvHealthIndices.get(0).getRecord());
idxBizPvWarningRecord.setArae(idxBizPvHealthIndices.get(0).getArae());
idxBizPvWarningRecord.setArae(idxBizPvHealthIndices.get(0).getArea());
idxBizPvWarningRecord.setStation(idxBizPvHealthIndices.get(0).getStation());
idxBizPvWarningRecord.setSubarray(idxBizPvHealthIndices.get(0).getSubarray());
idxBizPvWarningRecord.setGatewayId(gateWayId);
......@@ -245,14 +281,27 @@ public class HealthStatusIndicatorServiceImpl {
idxBizPvWarningRecord.setHealthIndexSeq(idxBizPvHealthIndices.get(0).getHealthIndex().toString());
idxBizPvWarningRecord.setHealthLevel(idxBizPvHealthIndices.get(0).getHealthLevel());
idxBizPvWarningRecordList.add(idxBizPvWarningRecord);
long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
PvWarningRecord pvWarningRecord = new PvWarningRecord();
BeanUtils.copyProperties(idxBizPvWarningRecord, pvWarningRecord, "disposotionDate", "recDate", "CONTENT");
pvWarningRecord.setContent(idxBizPvWarningRecord.getCONTENT());
pvWarningRecord.setRecDate(format);
pvWarningRecord.setTs(timestamp);
tdPvWarningRecordList.add(pvWarningRecord);
// idxBizPvWarningRecordMapper.insert(idxBizPvWarningRecord);
}
}
}
idxBizPvWarningRecordService.saveBatch(idxBizPvWarningRecordList);
if (CollUtil.isNotEmpty(tdPvWarningRecordList)) {
// tdengine插入
pvWaringRecordMapper.saveBatchWarningRecords(tdPvWarningRecordList);
}
// 触发风险模型生成预警处置模块的预警记录
fetchDataPv(idxBizPvWarningRecordList, stationMap);
fetchDataPv(tdPvWarningRecordList, stationMap);
}
......@@ -269,18 +318,20 @@ public class HealthStatusIndicatorServiceImpl {
return;
}
Date time = new Date();
Date date = DateUtils.dateAddHours(time, -8);
Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.HOUR_OF_DAY,calendar.get(Calendar.HOUR_OF_DAY)-5);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
LambdaQueryWrapper<IdxBizPvHealthIndex> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(IdxBizPvHealthIndex::getAnalysisType,"按小时");
wrapper.ne(IdxBizPvHealthIndex::getHealthLevel,"安全");
wrapper.eq(IdxBizPvHealthIndex::getAnalysisObjType,"测点");
wrapper.ge(IdxBizPvHealthIndex::getRecDate,df.format(calendar.getTime()));
wrapper.orderByDesc(IdxBizPvHealthIndex::getRecDate);
List<IdxBizPvHealthIndex> healthIndices = idxBizPvHealthIndexMapper.selectList(wrapper);
List<String> collect = healthIndices.stream().map(IdxBizPvHealthIndex::getAnalysisObjSeq).collect(Collectors.toList());
String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
LambdaQueryWrapper<PvHealthIndexHour> wrapper = new LambdaQueryWrapper<>();
wrapper.ne(PvHealthIndexHour::getHealthLevel,"安全");
wrapper.eq(PvHealthIndexHour::getAnalysisObjType,"测点");
wrapper.ge(PvHealthIndexHour::getTs,date);
wrapper.orderByDesc(PvHealthIndexHour::getTs);
List<PvHealthIndexHour> healthIndices = pvHealthIndexHourMapper.selectList(wrapper);
List<String> collect = healthIndices.stream().map(PvHealthIndexHour::getAnalysisObjSeq).collect(Collectors.toList());
if (null == healthIndices ){
return;
}
......@@ -290,8 +341,9 @@ public class HealthStatusIndicatorServiceImpl {
List<IdxBizPvWarningRuleSet> idxBizPvWarningRules = idxBizPvWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<IdxBizPvHealthIndex>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(IdxBizPvHealthIndex::getGatewayId, Collectors.groupingBy(IdxBizPvHealthIndex::getIndexAddress)));
Map<String, Map<String, List<PvHealthIndexHour>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(PvHealthIndexHour::getGatewayId, Collectors.groupingBy(PvHealthIndexHour::getIndexAddress)));
List<IdxBizPvWarningRecord> idxBizPvWarningRecordList = new ArrayList<>();
List<PvWarningRecord> tdPvWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>();
for (String gateWayId : gateWayMaps.keySet()) {
......@@ -301,9 +353,9 @@ public class HealthStatusIndicatorServiceImpl {
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gateWayId, stationBasic);
Map<String, List<IdxBizPvHealthIndex>> healthDataMaps = gateWayMaps.get(gateWayId);
Map<String, List<PvHealthIndexHour>> healthDataMaps = gateWayMaps.get(gateWayId);
for (String address : healthDataMaps.keySet()) {
List<IdxBizPvHealthIndex> idxBizPvHealthIndices = healthDataMaps.get(address);
List<PvHealthIndexHour> idxBizPvHealthIndices = healthDataMaps.get(address);
List<IdxBizPvWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizPvHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
if (ObjectUtils.isEmpty(idxBizPvWarningRuleSets) ){
continue;
......@@ -338,7 +390,7 @@ public class HealthStatusIndicatorServiceImpl {
}
List<Double> healthIndex = idxBizPvHealthIndices.stream().map(IdxBizPvHealthIndex::getHealthIndex).collect(Collectors.toList());
List<Double> healthIndex = idxBizPvHealthIndices.stream().map(PvHealthIndexHour::getHealthIndex).collect(Collectors.toList());
Double finalHealthValueRisk = healthValueRisk;
long riskNum = healthIndex.stream().filter(e -> e <= finalHealthValueRisk).count();
Double finalHealthValueWarn = healthValueWarn;
......@@ -364,19 +416,18 @@ public class HealthStatusIndicatorServiceImpl {
content = healthValueRiskCount + "小时";
}
//库里若已存在该测点预警 不生成重复的 若新生预警等级高于历史 则生成
LambdaQueryWrapper<IdxBizPvWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(IdxBizPvWarningRecord::getAnalysisPointId,idxBizPvHealthIndices.get(0).getAnalysisObjSeq());
query.eq(IdxBizPvWarningRecord::getStatus,0);
query.orderByDesc(IdxBizPvWarningRecord::getRecDate);
List<IdxBizPvWarningRecord> idxBizPvWarningRecords = idxBizPvWarningRecordMapper.selectList(query);
LambdaQueryWrapper<PvWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(PvWarningRecord::getAnalysisPointId,idxBizPvHealthIndices.get(0).getAnalysisObjSeq());
query.eq(PvWarningRecord::getStatus,0);
query.orderByDesc(PvWarningRecord::getRecDate);
List<PvWarningRecord> idxBizPvWarningRecords = pvWaringRecordMapper.selectList(query);
int flag = ObjectUtils.isEmpty(idxBizPvWarningRecords) || WarningNameEnum.getCode(level) > WarningNameEnum.getCode(idxBizPvWarningRecords.get(0).getWarningName()) ? 0 :1;
if (!level.equals("") && flag == 0){
IdxBizPvWarningRecord idxBizPvWarningRecord = new IdxBizPvWarningRecord();
idxBizPvWarningRecord.setKks(idxBizPvHealthIndices.get(0).getKks());
idxBizPvWarningRecord.setRecord(idxBizPvHealthIndices.get(0).getRecord());
idxBizPvWarningRecord.setArae(idxBizPvHealthIndices.get(0).getArae());
idxBizPvWarningRecord.setArae(idxBizPvHealthIndices.get(0).getArea());
idxBizPvWarningRecord.setStation(idxBizPvHealthIndices.get(0).getStation());
idxBizPvWarningRecord.setSubarray(idxBizPvHealthIndices.get(0).getSubarray());
idxBizPvWarningRecord.setGatewayId(gateWayId);
......@@ -395,12 +446,26 @@ public class HealthStatusIndicatorServiceImpl {
idxBizPvWarningRecord.setHealthLevel(idxBizPvHealthIndices.get(0).getHealthLevel());
idxBizPvWarningRecordList.add(idxBizPvWarningRecord);
//idxBizPvWarningRecordMapper.insert(idxBizPvWarningRecord);
long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
PvWarningRecord pvWarningRecord = new PvWarningRecord();
BeanUtils.copyProperties(idxBizPvWarningRecord, pvWarningRecord, "disposotionDate", "recDate", "CONTENT");
pvWarningRecord.setContent(idxBizPvWarningRecord.getCONTENT());
pvWarningRecord.setRecDate(format);
pvWarningRecord.setTs(timestamp);
tdPvWarningRecordList.add(pvWarningRecord);
}
}
}
idxBizPvWarningRecordService.saveBatch(idxBizPvWarningRecordList);
if (CollUtil.isNotEmpty(tdPvWarningRecordList)) {
// tdengine插入
pvWaringRecordMapper.saveBatchWarningRecords(tdPvWarningRecordList);
}
// 触发风险模型生成预警处置模块的预警记录
fetchDataPv(idxBizPvWarningRecordList, stationMap);
fetchDataPv(tdPvWarningRecordList, stationMap);
}
/***
......@@ -417,27 +482,30 @@ public class HealthStatusIndicatorServiceImpl {
}
Calendar calendar = Calendar.getInstance();
Date time = new Date();
Date date = DateUtils.dateAddHours(time, -8);
calendar.set(Calendar.DAY_OF_MONTH,calendar.get(Calendar.DAY_OF_MONTH)-3);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
LambdaQueryWrapper<IdxBizPvHealthIndex> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(IdxBizPvHealthIndex::getAnalysisType,"按天");
wrapper.ne(IdxBizPvHealthIndex::getHealthLevel,"安全");
wrapper.ge(IdxBizPvHealthIndex::getRecDate,df.format(calendar.getTime()));
wrapper.orderByDesc(IdxBizPvHealthIndex::getRecDate);
List<IdxBizPvHealthIndex> healthIndices = idxBizPvHealthIndexMapper.selectList(wrapper);
String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
LambdaQueryWrapper<PvHealthIndexDay> wrapper = new LambdaQueryWrapper<>();
wrapper.ne(PvHealthIndexDay::getHealthLevel, "安全");
wrapper.eq(PvHealthIndexDay::getAnalysisObjType, "测点");
wrapper.ge(PvHealthIndexDay::getRecDate, date);
wrapper.orderByDesc(PvHealthIndexDay::getTs);
List<PvHealthIndexDay> healthIndices = pvHealthIndexDayMapper.selectList(wrapper);
if (null == healthIndices ){
return;
}
List<String> collect = healthIndices.stream().map(IdxBizPvHealthIndex::getAnalysisObjSeq).collect(Collectors.toList());
List<String> collect = healthIndices.stream().map(PvHealthIndexDay::getAnalysisObjSeq).collect(Collectors.toList());
LambdaQueryWrapper<IdxBizPvWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(IdxBizPvWarningRuleSet::getAnalysisType,"按天");
queryWrapper.in(IdxBizPvWarningRuleSet::getAnalysisPointId, collect);
List<IdxBizPvWarningRuleSet> idxBizPvWarningRules = idxBizPvWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<IdxBizPvHealthIndex>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(IdxBizPvHealthIndex::getGatewayId, Collectors.groupingBy(IdxBizPvHealthIndex::getIndexAddress)));
Map<String, Map<String, List<PvHealthIndexDay>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(PvHealthIndexDay::getGatewayId, Collectors.groupingBy(PvHealthIndexDay::getIndexAddress)));
List<IdxBizPvWarningRecord> idxBizPvWarningRecordList = new ArrayList<>();
List<PvWarningRecord> tdPvWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>();
for (String gateWayId : gateWayMaps.keySet()) {
......@@ -447,9 +515,9 @@ public class HealthStatusIndicatorServiceImpl {
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gateWayId, stationBasic);
Map<String, List<IdxBizPvHealthIndex>> healthDataMaps = gateWayMaps.get(gateWayId);
Map<String, List<PvHealthIndexDay>> healthDataMaps = gateWayMaps.get(gateWayId);
for (String address : healthDataMaps.keySet()) {
List<IdxBizPvHealthIndex> idxBizPvHealthIndices = healthDataMaps.get(address);
List<PvHealthIndexDay> idxBizPvHealthIndices = healthDataMaps.get(address);
List<IdxBizPvWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizPvHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
if (ObjectUtils.isEmpty(idxBizPvWarningRuleSets) ){
continue;
......@@ -481,7 +549,7 @@ public class HealthStatusIndicatorServiceImpl {
}
List<Double> healthIndex = idxBizPvHealthIndices.stream().map(IdxBizPvHealthIndex::getHealthIndex).collect(Collectors.toList());
List<Double> healthIndex = idxBizPvHealthIndices.stream().map(PvHealthIndexDay::getHealthIndex).collect(Collectors.toList());
Double finalHealthValueRisk = healthValueRisk;
long riskNum = healthIndex.stream().filter(e -> e <= finalHealthValueRisk).count();
Double finalHealthValueWarn = healthValueWarn;
......@@ -509,19 +577,18 @@ public class HealthStatusIndicatorServiceImpl {
content = healthValueRiskCount + "天";
}
//库里若已存在该测点预警 不生成重复的 若新生预警等级高于历史 则生成
LambdaQueryWrapper<IdxBizPvWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(IdxBizPvWarningRecord::getAnalysisPointId,idxBizPvHealthIndices.get(0).getAnalysisObjSeq());
query.eq(IdxBizPvWarningRecord::getStatus,0);
query.orderByDesc(IdxBizPvWarningRecord::getRecDate);
List<IdxBizPvWarningRecord> idxBizPvWarningRecords = idxBizPvWarningRecordMapper.selectList(query);
LambdaQueryWrapper<PvWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(PvWarningRecord::getAnalysisPointId,idxBizPvHealthIndices.get(0).getAnalysisObjSeq());
query.eq(PvWarningRecord::getStatus,0);
query.orderByDesc(PvWarningRecord::getTs);
List<PvWarningRecord> idxBizPvWarningRecords = pvWaringRecordMapper.selectList(query);
int flag = ObjectUtils.isEmpty(idxBizPvWarningRecords) || WarningNameEnum.getCode(level) > WarningNameEnum.getCode(idxBizPvWarningRecords.get(0).getWarningName()) ? 0 :1;
if (!level.equals("") && flag == 0){
IdxBizPvWarningRecord idxBizPvWarningRecord = new IdxBizPvWarningRecord();
idxBizPvWarningRecord.setKks(idxBizPvHealthIndices.get(0).getKks());
idxBizPvWarningRecord.setRecord(idxBizPvHealthIndices.get(0).getRecord());
idxBizPvWarningRecord.setArae(idxBizPvHealthIndices.get(0).getArae());
idxBizPvWarningRecord.setArae(idxBizPvHealthIndices.get(0).getArea());
idxBizPvWarningRecord.setStation(idxBizPvHealthIndices.get(0).getStation());
idxBizPvWarningRecord.setSubarray(idxBizPvHealthIndices.get(0).getSubarray());
idxBizPvWarningRecord.setGatewayId(gateWayId);
......@@ -540,12 +607,27 @@ public class HealthStatusIndicatorServiceImpl {
idxBizPvWarningRecord.setHealthLevel(idxBizPvHealthIndices.get(0).getHealthLevel());
idxBizPvWarningRecordList.add(idxBizPvWarningRecord);
//idxBizPvWarningRecordMapper.insert(idxBizPvWarningRecord);
long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
PvWarningRecord pvWarningRecord = new PvWarningRecord();
BeanUtils.copyProperties(idxBizPvWarningRecord, pvWarningRecord, "disposotionDate", "recDate", "CONTENT");
pvWarningRecord.setContent(idxBizPvWarningRecord.getCONTENT());
pvWarningRecord.setRecDate(format);
pvWarningRecord.setTs(timestamp);
tdPvWarningRecordList.add(pvWarningRecord);
}
}
}
idxBizPvWarningRecordService.saveBatch(idxBizPvWarningRecordList);
if (CollUtil.isNotEmpty(tdPvWarningRecordList)) {
// tdengine插入
pvWaringRecordMapper.saveBatchWarningRecords(tdPvWarningRecordList);
}
// 触发风险模型生成预警处置模块的预警记录
fetchDataPv(idxBizPvWarningRecordList, stationMap);
fetchDataPv(tdPvWarningRecordList, stationMap);
}
......@@ -557,28 +639,31 @@ public class HealthStatusIndicatorServiceImpl {
}
// Calendar calendar = Calendar.getInstance();
log.info("风机---------------------预警时间----"+time);
String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
calendar.set(Calendar.HOUR_OF_DAY,calendar.get(Calendar.HOUR_OF_DAY)-1);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
LambdaQueryWrapper<IdxBizFanHealthIndex> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(IdxBizFanHealthIndex::getAnalysisType,"按时刻");
wrapper.ne(IdxBizFanHealthIndex::getHealthLevel,"安全");
wrapper.ge(IdxBizFanHealthIndex::getRecDate,df.format(calendar.getTime()));
wrapper.orderByDesc(IdxBizFanHealthIndex::getRecDate);
List<IdxBizFanHealthIndex> healthIndices = idxBizFanHealthIndexMapper.selectList(wrapper);
// 用ts字段查询时需要减8小时
Date date = DateUtils.dateAddHours(time, -8);
LambdaQueryWrapper<FanHealthIndexMoment> wrapper = new LambdaQueryWrapper<>();
wrapper.ne(FanHealthIndexMoment::getHealthLevel,"安全");
wrapper.eq(FanHealthIndexMoment::getAnalysisObjType, "测点");
wrapper.ge(FanHealthIndexMoment::getTs, date);
wrapper.orderByDesc(FanHealthIndexMoment::getTs);
List<FanHealthIndexMoment> healthIndices = fanHealthIndexMomentMapper.selectList(wrapper);
if (ObjectUtils.isEmpty(healthIndices)){
return;
}
List<String> collect = healthIndices.stream().map(IdxBizFanHealthIndex::getAnalysisObjSeq).collect(Collectors.toList());
List<String> collect = healthIndices.stream().map(FanHealthIndexMoment::getAnalysisObjSeq).collect(Collectors.toList());
LambdaQueryWrapper<IdxBizFanWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(IdxBizFanWarningRuleSet::getAnalysisType,"按时刻");
queryWrapper.in(IdxBizFanWarningRuleSet::getAnalysisPointId,collect);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRules = idxBizFanWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<IdxBizFanHealthIndex>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(IdxBizFanHealthIndex::getGatewayId, Collectors.groupingBy(IdxBizFanHealthIndex::getIndexAddress)));
Map<String, Map<String, List<FanHealthIndexMoment>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(FanHealthIndexMoment::getGatewayId, Collectors.groupingBy(FanHealthIndexMoment::getIndexAddress)));
List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>();
List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>();
for (String gateWayId : gateWayMaps.keySet()) {
......@@ -588,9 +673,9 @@ public class HealthStatusIndicatorServiceImpl {
basicLambdaQueryWrapper.last("limit 1");
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gateWayId, stationBasic);
Map<String, List<IdxBizFanHealthIndex>> healthDataMaps = gateWayMaps.get(gateWayId);
Map<String, List<FanHealthIndexMoment>> healthDataMaps = gateWayMaps.get(gateWayId);
for (String address : healthDataMaps.keySet()) {
List<IdxBizFanHealthIndex> idxBizFanHealthIndices = healthDataMaps.get(address);
List<FanHealthIndexMoment> idxBizFanHealthIndices = healthDataMaps.get(address);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizFanHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
......@@ -628,7 +713,7 @@ public class HealthStatusIndicatorServiceImpl {
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(IdxBizFanHealthIndex::getHealthIndex).collect(Collectors.toList());
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(FanHealthIndexMoment::getHealthIndex).collect(Collectors.toList());
Double finalHealthValueRisk = healthValueRisk;
long riskNum = healthIndex.stream().filter(e -> e <= finalHealthValueRisk).count();
Double finalHealthValueWarn = healthValueWarn;
......@@ -657,19 +742,18 @@ public class HealthStatusIndicatorServiceImpl {
content = healthValueRiskCount*10 + "分钟";
}
//库里若已存在该测点预警 不生成重复的 若新生预警等级高于历史 则生成
LambdaQueryWrapper<IdxBizFanWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(IdxBizFanWarningRecord::getAnalysisPointId,idxBizFanHealthIndices.get(0).getAnalysisObjSeq());
query.eq(IdxBizFanWarningRecord::getStatus,0);
query.orderByDesc(IdxBizFanWarningRecord::getRecDate);
List<IdxBizFanWarningRecord> idxBizFanWarningRecords = idxBizFanWarningRecordMapper.selectList(query);
LambdaQueryWrapper<FanWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(FanWarningRecord::getAnalysisPointId,idxBizFanHealthIndices.get(0).getAnalysisObjSeq());
query.eq(FanWarningRecord::getStatus,0);
query.orderByDesc(FanWarningRecord::getTs);
List<FanWarningRecord> idxBizFanWarningRecords = fanWaringRecordMapper.selectList(query);
int flag = ObjectUtils.isEmpty(idxBizFanWarningRecords) || WarningNameEnum.getCode(level) > WarningNameEnum.getCode(idxBizFanWarningRecords.get(0).getWarningName()) ? 0 :1;
if (!level.equals("") && flag == 0){
IdxBizFanWarningRecord idxBizFanWarningRecord = new IdxBizFanWarningRecord();
idxBizFanWarningRecord.setKks(idxBizFanHealthIndices.get(0).getKks());
idxBizFanWarningRecord.setRecord(idxBizFanHealthIndices.get(0).getRecord());
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(0).getArae());
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(0).getArea());
idxBizFanWarningRecord.setStation(idxBizFanHealthIndices.get(0).getStation());
idxBizFanWarningRecord.setSubSystem(idxBizFanHealthIndices.get(0).getSubSystem());
idxBizFanWarningRecord.setGatewayId(gateWayId);
......@@ -688,12 +772,33 @@ public class HealthStatusIndicatorServiceImpl {
idxBizFanWarningRecord.setHealthLevel(idxBizFanHealthIndices.get(0).getHealthLevel());
idxBizFanWarningRecordList.add(idxBizFanWarningRecord);
//idxBizFanWarningRecordMapper.insert(idxBizFanWarningRecord);
long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
FanWarningRecord fanWarningRecord = new FanWarningRecord();
BeanUtils.copyProperties(idxBizFanWarningRecord, fanWarningRecord, "disposotionDate", "recDate", "CONTENT");
fanWarningRecord.setContent(idxBizFanWarningRecord.getCONTENT());
fanWarningRecord.setRecDate(format);
fanWarningRecord.setTs(timestamp);
fanWarningRecord.setHealthIndex(idxBizFanHealthIndices.get(0).getHealthIndex().toString());
tdFanWarningRecordList.add(fanWarningRecord);
}
}
}
idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList);
// tdengine插入
// fanWarningRecordService.saveBatch(tdFanWarningRecordList);
if (CollUtil.isNotEmpty(tdFanWarningRecordList)) {
fanWaringRecordMapper.saveBatchWarningRecords(tdFanWarningRecordList);
}
// 触发风险模型生成预警处置模块的预警记录
fetchDataFan(idxBizFanWarningRecordList, stationMap);
fetchDataFan(tdFanWarningRecordList, stationMap);
}
/***
......@@ -709,29 +814,31 @@ public class HealthStatusIndicatorServiceImpl {
return;
}
Date time = new Date();
String format = DateUtil.format(time, "yyyy-MM-dd HH:00:00");
Date date = DateUtils.dateAddHours(time, -8);
Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.HOUR_OF_DAY,calendar.get(Calendar.HOUR_OF_DAY)-5);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
LambdaQueryWrapper<IdxBizFanHealthIndex> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(IdxBizFanHealthIndex::getAnalysisType,"按小时");
wrapper.eq(IdxBizFanHealthIndex::getAnalysisObjType,"测点");
wrapper.ne(IdxBizFanHealthIndex::getHealthLevel,"安全");
wrapper.ge(IdxBizFanHealthIndex::getRecDate,df.format(calendar.getTime()));
wrapper.orderByDesc(IdxBizFanHealthIndex::getRecDate);
List<IdxBizFanHealthIndex> healthIndices = idxBizFanHealthIndexMapper.selectList(wrapper);
LambdaQueryWrapper<FanHealthIndexHour> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(FanHealthIndexHour::getAnalysisObjType, "测点");
wrapper.ne(FanHealthIndexHour::getHealthLevel, "安全");
wrapper.ge(FanHealthIndexHour::getTs, date);
wrapper.orderByDesc(FanHealthIndexHour::getTs);
List<FanHealthIndexHour> healthIndices = fanHealthIndexHourMapper.selectList(wrapper);
if (ObjectUtils.isEmpty(healthIndices)){
return;
}
List<String> collect = healthIndices.stream().map(IdxBizFanHealthIndex::getAnalysisObjSeq).collect(Collectors.toList());
List<String> collect = healthIndices.stream().map(FanHealthIndexHour::getAnalysisObjSeq).collect(Collectors.toList());
LambdaQueryWrapper<IdxBizFanWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(IdxBizFanWarningRuleSet::getAnalysisType,"按小时");
queryWrapper.in(IdxBizFanWarningRuleSet::getAnalysisPointId,collect);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRules = idxBizFanWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<IdxBizFanHealthIndex>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(IdxBizFanHealthIndex::getGatewayId, Collectors.groupingBy(IdxBizFanHealthIndex::getIndexAddress)));
Map<String, Map<String, List<FanHealthIndexHour>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(FanHealthIndexHour::getGatewayId, Collectors.groupingBy(FanHealthIndexHour::getIndexAddress)));
List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>();
List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>();
for (String gateWayId : gateWayMaps.keySet()) {
......@@ -741,9 +848,9 @@ public class HealthStatusIndicatorServiceImpl {
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gateWayId, stationBasic);
Map<String, List<IdxBizFanHealthIndex>> healthDataMaps = gateWayMaps.get(gateWayId);
Map<String, List<FanHealthIndexHour>> healthDataMaps = gateWayMaps.get(gateWayId);
for (String address : healthDataMaps.keySet()) {
List<IdxBizFanHealthIndex> idxBizFanHealthIndices = healthDataMaps.get(address);
List<FanHealthIndexHour> idxBizFanHealthIndices = healthDataMaps.get(address);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizFanHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
......@@ -779,7 +886,7 @@ public class HealthStatusIndicatorServiceImpl {
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(IdxBizFanHealthIndex::getHealthIndex).collect(Collectors.toList());
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(FanHealthIndexHour::getHealthIndex).collect(Collectors.toList());
Double finalHealthValueRisk = healthValueRisk;
long riskNum = healthIndex.stream().filter(e -> e <= finalHealthValueRisk).count();
Double finalHealthValueWarn = healthValueWarn;
......@@ -803,19 +910,18 @@ public class HealthStatusIndicatorServiceImpl {
num = ""+healthValueRisk;
content = healthValueRiskCount + "小时";
}
LambdaQueryWrapper<IdxBizFanWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(IdxBizFanWarningRecord::getAnalysisPointId,idxBizFanHealthIndices.get(0).getAnalysisObjSeq());
query.eq(IdxBizFanWarningRecord::getStatus,0);
query.orderByDesc(IdxBizFanWarningRecord::getRecDate);
List<IdxBizFanWarningRecord> idxBizFanWarningRecords = idxBizFanWarningRecordMapper.selectList(query);
LambdaQueryWrapper<FanWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(FanWarningRecord::getAnalysisPointId,idxBizFanHealthIndices.get(0).getAnalysisObjSeq());
query.eq(FanWarningRecord::getStatus,0);
query.orderByDesc(FanWarningRecord::getTs);
List<FanWarningRecord> idxBizFanWarningRecords = fanWaringRecordMapper.selectList(query);
int flag = ObjectUtils.isEmpty(idxBizFanWarningRecords) || WarningNameEnum.getCode(level) > WarningNameEnum.getCode(idxBizFanWarningRecords.get(0).getWarningName()) ? 0 :1;
if (!level.equals("") && flag == 0){
IdxBizFanWarningRecord idxBizFanWarningRecord = new IdxBizFanWarningRecord();
idxBizFanWarningRecord.setKks(idxBizFanHealthIndices.get(0).getKks());
idxBizFanWarningRecord.setRecord(idxBizFanHealthIndices.get(0).getRecord());
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(0).getArae());
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(0).getArea());
idxBizFanWarningRecord.setStation(idxBizFanHealthIndices.get(0).getStation());
idxBizFanWarningRecord.setSubSystem(idxBizFanHealthIndices.get(0).getSubSystem());
idxBizFanWarningRecord.setGatewayId(gateWayId);
......@@ -833,12 +939,27 @@ public class HealthStatusIndicatorServiceImpl {
idxBizFanWarningRecord.setHealthLevel(idxBizFanHealthIndices.get(0).getHealthLevel());
idxBizFanWarningRecord.setPointName(idxBizFanHealthIndices.get(0).getPointName());
idxBizFanWarningRecordList.add(idxBizFanWarningRecord);
long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
FanWarningRecord fanWarningRecord = new FanWarningRecord();
BeanUtils.copyProperties(idxBizFanWarningRecord, fanWarningRecord, "disposotionDate", "recDate", "CONTENT");
fanWarningRecord.setContent(idxBizFanWarningRecord.getCONTENT());
fanWarningRecord.setRecDate(format);
fanWarningRecord.setTs(timestamp);
fanWarningRecord.setHealthIndex(idxBizFanHealthIndices.get(0).getHealthIndex().toString());
tdFanWarningRecordList.add(fanWarningRecord);
}
}
}
idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList);
// tdengine插入
fanWaringRecordMapper.saveBatchWarningRecords(tdFanWarningRecordList);
// 触发风险模型生成预警处置模块的预警记录
fetchDataFan(idxBizFanWarningRecordList, stationMap);
fetchDataFan(tdFanWarningRecordList, stationMap);
}
/***
......@@ -854,28 +975,31 @@ public class HealthStatusIndicatorServiceImpl {
return;
}
Date time = new Date();
String format = DateUtil.format(time, "yyyy-MM-dd 00:00:00");
Date date = DateUtils.dateAddHours(time, -8);
Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.DAY_OF_MONTH,calendar.get(Calendar.DAY_OF_MONTH)-3);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
LambdaQueryWrapper<IdxBizFanHealthIndex> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(IdxBizFanHealthIndex::getAnalysisType,"按天");
wrapper.ne(IdxBizFanHealthIndex::getHealthLevel,"安全");
wrapper.ge(IdxBizFanHealthIndex::getRecDate,df.format(calendar.getTime()));
wrapper.orderByDesc(IdxBizFanHealthIndex::getRecDate);
List<IdxBizFanHealthIndex> healthIndices = idxBizFanHealthIndexMapper.selectList(wrapper);
LambdaQueryWrapper<FanHealthIndexDay> wrapper = new LambdaQueryWrapper<>();
wrapper.ne(FanHealthIndexDay::getHealthLevel,"安全");
wrapper.ge(FanHealthIndexDay::getRecDate,date);
wrapper.eq(FanHealthIndexDay::getAnalysisObjType, "测点");
wrapper.orderByDesc(FanHealthIndexDay::getTs);
List<FanHealthIndexDay> healthIndices = fanHealthIndexDayMapper.selectList(wrapper);
if (ObjectUtils.isEmpty(healthIndices)){
return;
}
List<String> collect = healthIndices.stream().map(IdxBizFanHealthIndex::getAnalysisObjSeq).collect(Collectors.toList());
List<String> collect = healthIndices.stream().map(FanHealthIndexDay::getAnalysisObjSeq).collect(Collectors.toList());
LambdaQueryWrapper<IdxBizFanWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(IdxBizFanWarningRuleSet::getAnalysisType,"按天");
queryWrapper.in(IdxBizFanWarningRuleSet::getAnalysisPointId,collect);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRules = idxBizFanWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<IdxBizFanHealthIndex>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(IdxBizFanHealthIndex::getGatewayId, Collectors.groupingBy(IdxBizFanHealthIndex::getIndexAddress)));
Map<String, Map<String, List<FanHealthIndexDay>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(FanHealthIndexDay::getGatewayId, Collectors.groupingBy(FanHealthIndexDay::getIndexAddress)));
List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>();
List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>();
for (String gateWayId : gateWayMaps.keySet()) {
......@@ -885,9 +1009,9 @@ public class HealthStatusIndicatorServiceImpl {
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gateWayId, stationBasic);
Map<String, List<IdxBizFanHealthIndex>> healthDataMaps = gateWayMaps.get(gateWayId);
Map<String, List<FanHealthIndexDay>> healthDataMaps = gateWayMaps.get(gateWayId);
for (String address : healthDataMaps.keySet()) {
List<IdxBizFanHealthIndex> idxBizFanHealthIndices = healthDataMaps.get(address);
List<FanHealthIndexDay> idxBizFanHealthIndices = healthDataMaps.get(address);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizFanHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
......@@ -924,7 +1048,7 @@ public class HealthStatusIndicatorServiceImpl {
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(IdxBizFanHealthIndex::getHealthIndex).collect(Collectors.toList());
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(FanHealthIndexDay::getHealthIndex).collect(Collectors.toList());
Double finalHealthValueRisk = healthValueRisk;
long riskNum = healthIndex.stream().filter(e -> e <= finalHealthValueRisk).count();
Double finalHealthValueWarn = healthValueWarn;
......@@ -952,18 +1076,17 @@ public class HealthStatusIndicatorServiceImpl {
content = healthValueRiskCount + "天";
}
LambdaQueryWrapper<IdxBizFanWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(IdxBizFanWarningRecord::getAnalysisPointId,idxBizFanHealthIndices.get(0).getAnalysisObjSeq());
query.eq(IdxBizFanWarningRecord::getStatus,0);
query.orderByDesc(IdxBizFanWarningRecord::getRecDate);
List<IdxBizFanWarningRecord> idxBizFanWarningRecords = idxBizFanWarningRecordMapper.selectList(query);
LambdaQueryWrapper<FanWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(FanWarningRecord::getAnalysisPointId,idxBizFanHealthIndices.get(0).getAnalysisObjSeq());
query.eq(FanWarningRecord::getStatus,0);
query.orderByDesc(FanWarningRecord::getTs);
List<FanWarningRecord> idxBizFanWarningRecords = fanWaringRecordMapper.selectList(query);
int flag = ObjectUtils.isEmpty(idxBizFanWarningRecords) || WarningNameEnum.getCode(level) > WarningNameEnum.getCode(idxBizFanWarningRecords.get(0).getWarningName()) ? 0 :1;
if (!level.equals("") && flag == 0){
if (!level.equals("") && flag == 0) {
IdxBizFanWarningRecord idxBizFanWarningRecord = new IdxBizFanWarningRecord();
idxBizFanWarningRecord.setKks(idxBizFanHealthIndices.get(0).getKks());
idxBizFanWarningRecord.setRecord(idxBizFanHealthIndices.get(0).getRecord());
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(0).getArae());
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(0).getArea());
idxBizFanWarningRecord.setStation(idxBizFanHealthIndices.get(0).getStation());
idxBizFanWarningRecord.setSubSystem(idxBizFanHealthIndices.get(0).getSubSystem());
idxBizFanWarningRecord.setGatewayId(gateWayId);
......@@ -981,13 +1104,27 @@ public class HealthStatusIndicatorServiceImpl {
idxBizFanWarningRecord.setHealthLevel(idxBizFanHealthIndices.get(0).getHealthLevel());
idxBizFanWarningRecord.setPointName(idxBizFanHealthIndices.get(0).getPointName());
idxBizFanWarningRecordList.add(idxBizFanWarningRecord);
long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
FanWarningRecord fanWarningRecord = new FanWarningRecord();
BeanUtils.copyProperties(idxBizFanWarningRecord, fanWarningRecord, "disposotionDate", "recDate", "CONTENT");
fanWarningRecord.setContent(idxBizFanWarningRecord.getCONTENT());
fanWarningRecord.setRecDate(format);
fanWarningRecord.setTs(timestamp);
tdFanWarningRecordList.add(fanWarningRecord);
}
}
}
idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList);
// tdengine插入
fanWaringRecordMapper.saveBatchWarningRecords(tdFanWarningRecordList);
// 触发风险模型生成预警处置模块的预警记录
fetchDataFan(idxBizFanWarningRecordList, stationMap);
fetchDataFan(tdFanWarningRecordList, stationMap);
}
......@@ -997,7 +1134,7 @@ public class HealthStatusIndicatorServiceImpl {
* @return
*/
@Async
public void fetchDataPv(List<IdxBizPvWarningRecord> idxBizPvWarningRecords, HashMap<String, StationBasic> stationMap) {
public void fetchDataPv(List<PvWarningRecord> idxBizPvWarningRecords, HashMap<String, StationBasic> stationMap) {
log.info("===开始触发风险预警模型===数量为:{}", idxBizPvWarningRecords.size());
idxBizPvWarningRecords.stream().forEach(idxBizPvWarningRecord -> {
BizMessage bizMessage = new BizMessage();
......@@ -1017,7 +1154,7 @@ public class HealthStatusIndicatorServiceImpl {
detailsVos.add(dynamicDetailsVo);
riskBizInfoVo.setDynamicDetails(detailsVos);
bizMessage.setBizInfo(riskBizInfoVo);
bizMessage.setTraceId2(idxBizPvWarningRecord.getSequenceNbr());
bizMessage.setTraceId2(idxBizPvWarningRecord.getTs().toString());
try {
emqKeeper.getMqttClient().publish(SMART_ANALYSE_PV + "/data/analysis", JSON.toJSONString(bizMessage).getBytes(StandardCharsets.UTF_8), 2, false);
} catch (MqttException e) {
......@@ -1033,7 +1170,7 @@ public class HealthStatusIndicatorServiceImpl {
* @return
*/
@Async
public void fetchDataFan(List<IdxBizFanWarningRecord> idxBizFanWarningRecords, HashMap<String, StationBasic> stationMap) {
public void fetchDataFan(List<FanWarningRecord> idxBizFanWarningRecords, HashMap<String, StationBasic> stationMap) {
log.info("===开始触发风险预警模型===数量为:{}", idxBizFanWarningRecords.size());
idxBizFanWarningRecords.stream().forEach(idxBizFanWarningRecord -> {
BizMessage bizMessage = new BizMessage();
......@@ -1053,7 +1190,7 @@ public class HealthStatusIndicatorServiceImpl {
detailsVos.add(dynamicDetailsVo);
riskBizInfoVo.setDynamicDetails(detailsVos);
bizMessage.setBizInfo(riskBizInfoVo);
bizMessage.setTraceId2(idxBizFanWarningRecord.getSequenceNbr());
bizMessage.setTraceId2(idxBizFanWarningRecord.getTs().toString());
try {
emqKeeper.getMqttClient().publish(SMART_ANALYSE_FAN + "/data/analysis", JSON.toJSONString(bizMessage).getBytes(StandardCharsets.UTF_8), 2, false);
} catch (MqttException e) {
......
package com.yeejoin.amos.boot.module.jxiop.biz.service.impl;
import com.yeejoin.amos.boot.module.jxiop.biz.tdMapper2.PvWaringRecordMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.tdengine.PvWarningRecord;
import org.springframework.stereotype.Service;
import org.typroject.tyboot.core.rdbms.service.BaseService;
@Service
public class PvWarningRecordServiceImpl extends BaseService<PvWarningRecord, PvWarningRecord, PvWaringRecordMapper> {
}
package com.yeejoin.amos.boot.module.jxiop.biz.tdMapper2;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.tdengine.FanHealthIndex;
import com.yeejoin.amos.boot.module.jxiop.biz.tdengine.FanWarningRecord;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import org.apache.ibatis.annotations.Select;
import java.util.List;
......@@ -10,4 +14,6 @@ import java.util.Map;
public interface FanWaringRecordMapper extends BaseMapper<FanWarningRecord> {
public List<Map<String,Object>> selectFanWarningNum(String station);
int saveBatchWarningRecords(@Param("list") List<FanWarningRecord> list);
}
......@@ -3,7 +3,11 @@ package com.yeejoin.amos.boot.module.jxiop.biz.tdMapper2;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.tdengine.FanWarningRecord;
import com.yeejoin.amos.boot.module.jxiop.biz.tdengine.PvWarningRecord;
import org.apache.ibatis.annotations.Param;
import java.util.List;
public interface PvWaringRecordMapper extends BaseMapper<PvWarningRecord> {
int saveBatchWarningRecords(@Param("list") List<PvWarningRecord> list);
}
......@@ -3,9 +3,11 @@ package com.yeejoin.amos.boot.module.jxiop.biz.tdengine;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
@Data
@TableName(value = "", autoResultMap = true)
public class FanWarningRecord {
@TableName(value = "fan_warning_record", autoResultMap = true)
public class FanWarningRecord implements Serializable {
private Long ts;
private String recDate;
private String disposotionState;
......@@ -25,4 +27,5 @@ public class FanWarningRecord {
private String disposotionDate;
private String kks;
private String warningPeriod;
private String status;
}
......@@ -4,10 +4,11 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
@TableName(value = "", autoResultMap = true)
public class PvWarningRecord {
@TableName(value = "pv_warning_record", autoResultMap = true)
public class PvWarningRecord implements Serializable {
private Long ts;
private String recDate;
private String disposotionState;
......@@ -28,4 +29,5 @@ public class PvWarningRecord {
private String disposotionDate;
private String kks;
private String warningPeriod;
private String status;
}
......@@ -2,6 +2,60 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yeejoin.amos.boot.module.jxiop.biz.tdMapper2.FanWaringRecordMapper">
<insert id="saveBatchWarningRecords">
insert
into
fan_warning_record
<!-- (ts,-->
<!-- rec_date,-->
<!-- disposotion_state,-->
<!-- health_index_seq,-->
<!-- health_index,-->
<!-- analysis_point_id,-->
<!-- warning_name,-->
<!-- arae,-->
<!-- station,-->
<!-- subarray,-->
<!-- manufacturer,-->
<!-- device_type,-->
<!-- equipment_name,-->
<!-- gateway_id,-->
<!-- index_address,-->
<!-- content,-->
<!-- point_name,-->
<!-- health_level,-->
<!-- disposotion_date,-->
<!-- kks,-->
<!-- warning_period,-->
<!-- status)-->
values
<foreach collection="list" separator="," item="item" index="index">
(#{item.ts, jdbcType=TIMESTAMP},
#{item.recDate, jdbcType=VARCHAR},
#{item.disposotionState, jdbcType=VARCHAR},
<!-- #{item.healthIndexSeq, jdbcType=VARCHAR},-->
#{item.healthIndex, jdbcType=VARCHAR},
#{item.analysisPointId, jdbcType=VARCHAR},
#{item.warningName, jdbcType=VARCHAR},
#{item.arae, jdbcType=VARCHAR},
#{item.station, jdbcType=VARCHAR},
#{item.subSystem, jdbcType=VARCHAR},
#{item.number, jdbcType=VARCHAR},
#{item.equipmentName, jdbcType=VARCHAR},
#{item.gatewayId, jdbcType=VARCHAR},
#{item.indexAddress, jdbcType=VARCHAR},
#{item.content, jdbcType=VARCHAR},
#{item.pointName, jdbcType=VARCHAR},
#{item.healthLevel, jdbcType=VARCHAR},
#{item.disposotionDate, jdbcType=VARCHAR},
#{item.kks, jdbcType=VARCHAR},
#{item.warningPeriod, jdbcType=VARCHAR},
#{item.status, jdbcType=VARCHAR})
</foreach>
</insert>
<select id="selectFanWarningNum" resultType="map">
SELECT `b`.`EQUIPMENT_NAME` AS `EQUIPMENT_NAME`,
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yeejoin.amos.boot.module.jxiop.biz.tdMapper2.PvWaringRecordMapper">
<insert id="saveBatchWarningRecords">
insert
into
pv_warning_record
(ts,
rec_date,
disposotion_state,
health_index_seq,
health_index,
analysis_point_id,
warning_name,
arae,
station,
subarray,
manufacturer,
device_type,
equipment_name,
gateway_id,
index_address,
content,
point_name,
health_level,
disposotion_date,
kks,
warning_period,
status)
values
<foreach collection="list" separator="," item="item" index="index">
(#{item.ts, jdbcType=TIMESTAMP},
#{item.recDate, jdbcType=VARCHAR},
#{item.disposotionState, jdbcType=VARCHAR},
#{item.healthIndexSeq, jdbcType=VARCHAR},
#{item.healthIndex, jdbcType=VARCHAR},
#{item.analysisPointId, jdbcType=VARCHAR},
#{item.warningName, jdbcType=VARCHAR},
#{item.arae, jdbcType=VARCHAR},
#{item.station, jdbcType=VARCHAR},
#{item.subarray, jdbcType=VARCHAR},
#{item.manufacturer, jdbcType=VARCHAR},
#{item.deviceType, jdbcType=VARCHAR},
#{item.equipmentName, jdbcType=VARCHAR},
#{item.gatewayId, jdbcType=VARCHAR},
#{item.indexAddress, jdbcType=VARCHAR},
#{item.content, jdbcType=VARCHAR},
#{item.pointName, jdbcType=VARCHAR},
#{item.healthLevel, jdbcType=VARCHAR},
#{item.disposotionDate, jdbcType=VARCHAR},
#{item.kks, jdbcType=VARCHAR},
#{item.warningPeriod, jdbcType=VARCHAR},
#{item.status, jdbcType=VARCHAR})
</foreach>
</insert>
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment