Commit 8fc74636 authored by caotao's avatar caotao

健康指数日月年问题修改。

parent b3d8ddb0
...@@ -26,6 +26,12 @@ public interface IdxBizFanWarningRuleSetMapper extends BaseMapper<IdxBizFanWarni ...@@ -26,6 +26,12 @@ public interface IdxBizFanWarningRuleSetMapper extends BaseMapper<IdxBizFanWarni
Integer getMaxWaringCycleOfMinutesByGatewayId(String gatewayId); Integer getMaxWaringCycleOfMinutesByGatewayId(String gatewayId);
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按小时'") @Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按小时'")
Integer getMaxWaringCycleOfHour(); Integer getMaxWaringCycleOfHour();
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按小时' and GATEWAY_ID = #{gatewayId}")
Integer getMaxWaringCycleOfHourByGatewayId(String gatewayId);
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按天'") @Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按天'")
Integer getMaxWaringCycleOfDay(); Integer getMaxWaringCycleOfDay();
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按天' and GATEWAY_ID = #{gatewayId}")
Integer getMaxWaringCycleOfDayByGatewayId(String gatewayId);
@Select("SELECT GATEWAY_ID FROM idx_biz_pv_warning_rule_set GROUP BY GATEWAY_ID")
List<String> getGatewayIds();
} }
...@@ -6,6 +6,8 @@ import io.swagger.models.auth.In; ...@@ -6,6 +6,8 @@ import io.swagger.models.auth.In;
import javafx.beans.binding.DoubleExpression; import javafx.beans.binding.DoubleExpression;
import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Select;
import java.util.List;
/** /**
* Mapper 接口 * Mapper 接口
* *
...@@ -20,6 +22,12 @@ public interface IdxBizPvWarningRuleSetMapper extends BaseMapper<IdxBizPvWarning ...@@ -20,6 +22,12 @@ public interface IdxBizPvWarningRuleSetMapper extends BaseMapper<IdxBizPvWarning
Integer getMaxWaringCycleOfMinutesByGatewayId(String gateWayId); Integer getMaxWaringCycleOfMinutesByGatewayId(String gateWayId);
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按小时'") @Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按小时'")
Integer getMaxWaringCycleOfHour(); Integer getMaxWaringCycleOfHour();
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按小时' and GATEWAY_ID = #{gateWayId}")
Integer getMaxWaringCycleOfHourByGatewayId(String gateWayId);
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按天'") @Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按天'")
Integer getMaxWaringCycleOfDay(); Integer getMaxWaringCycleOfDay();
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按天 and GATEWAY_ID = #{gateWayId}'")
Integer getMaxWaringCycleOfDayByGatewayId(String gateWayId);
@Select("SELECT GATEWAY_ID FROM idx_biz_pv_warning_rule_set GROUP BY GATEWAY_ID")
List<String> getGatewayIds();
} }
...@@ -899,6 +899,197 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -899,6 +899,197 @@ public class HealthStatusIndicatorServiceImpl {
fetchDataPv(tdPvWarningRecordList, stationMap); fetchDataPv(tdPvWarningRecordList, stationMap);
} }
// @Scheduled(cron = "0 0 0/1 * * ?")
// @Scheduled(cron = "0 0/5 * * * ?")
@Async("async")
public void healthWarningHourGFNew() {
if (!openHealth) {
return;
}
Date time = new Date();
// Date date = DateUtils.dateAddHours(time, -13);
// Calendar calendar = Calendar.getInstance();
// calendar.set(Calendar.HOUR_OF_DAY,calendar.get(Calendar.HOUR_OF_DAY)-5);
// SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
List<String> gateWayIds = idxBizPvWarningRuleSetMapper.getGatewayIds();
gateWayIds.forEach(gateWayId -> {
new Thread(() -> {
Integer maxWaringCycle = idxBizPvWarningRuleSetMapper.getMaxWaringCycleOfHourByGatewayId(gateWayId) + AnalyseOffset;
String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
Date date = DateUtil.offsetHour(time, -8);
date = DateUtil.offsetHour(date, 0 - maxWaringCycle);
LambdaQueryWrapper<PvHealthIndexHour> wrapper = new LambdaQueryWrapper<>();
//wrapper.ne(PvHealthIndexHour::getHealthLevel,"安全");
wrapper.eq(PvHealthIndexHour::getGatewayId, gateWayId);
wrapper.eq(PvHealthIndexHour::getAnalysisObjType, "测点");
wrapper.ge(PvHealthIndexHour::getTs, date);
Date dateMax = DateUtil.offsetHour(time, -8);
wrapper.le(PvHealthIndexHour::getTs, dateMax);
wrapper.orderByAsc(PvHealthIndexHour::getTs);
List<PvHealthIndexHour> healthIndices = pvHealthIndexHourMapper.selectList(wrapper);
List<String> collect = healthIndices.stream().map(PvHealthIndexHour::getAnalysisObjSeq).collect(Collectors.toList());
if (null == healthIndices) {
return;
}
LambdaQueryWrapper<IdxBizPvWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(IdxBizPvWarningRuleSet::getAnalysisType, WarningPeriodEnum.HOUR.getName());
queryWrapper.in(IdxBizPvWarningRuleSet::getAnalysisPointId, collect);
List<IdxBizPvWarningRuleSet> idxBizPvWarningRules = idxBizPvWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<PvHealthIndexHour>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(PvHealthIndexHour::getGatewayId, Collectors.groupingBy(PvHealthIndexHour::getIndexAddress)));
List<IdxBizPvWarningRecord> idxBizPvWarningRecordList = new ArrayList<>();
List<PvWarningRecord> tdPvWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>();
// for (String gateWayId : gateWayMaps.keySet()) {
LambdaQueryWrapper<StationBasic> basicLambdaQueryWrapper = new LambdaQueryWrapper<>();
basicLambdaQueryWrapper.eq(StationBasic::getFanGatewayId, gateWayId);
basicLambdaQueryWrapper.last("limit 1");
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gateWayId, stationBasic);
Map<String, List<PvHealthIndexHour>> healthDataMaps = gateWayMaps.get(gateWayId);
for (String address : healthDataMaps.keySet()) {
List<PvHealthIndexHour> idxBizPvHealthIndices = healthDataMaps.get(address);
List<IdxBizPvWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizPvHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
if (ObjectUtils.isEmpty(idxBizPvWarningRuleSets)) {
continue;
}
Double healthValueWarn = 0.0;
Double healthValueRisk = 0.0;
Double healthValueNotice = 0.0;
int healthValueWarnCount = 0;
int healthValueRiskCount = 0;
int healthValueNoticeCount = 0;
int healthValueMinCount = 0;
int riskNum = 0;
int warnNum = 0;
int noticeNum = 0;
for (IdxBizPvWarningRuleSet e : idxBizPvWarningRuleSets) {
switch (e.getWarningName()) {
case "警告":
healthValueWarn = Double.parseDouble(e.getWarningIf().substring(2));
healthValueWarnCount = Integer.parseInt(e.getWarningCycle());
break;
case "危险":
healthValueRisk = Double.parseDouble(e.getWarningIf().substring(2));
healthValueRiskCount = Integer.parseInt(e.getWarningCycle());
break;
case "注意":
healthValueNotice = Double.parseDouble(e.getWarningIf().substring(2));
healthValueNoticeCount = Integer.parseInt(e.getWarningCycle());
break;
}
}
List<Double> healthIndex = idxBizPvHealthIndices.stream().map(PvHealthIndexHour::getHealthIndex).collect(Collectors.toList());
// Double finalHealthValueRisk = healthValueRisk;
// long riskNum = healthIndex.subList(healthIndex.size()>healthValueRiskCount? (int) (healthIndex.size() - healthValueRiskCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueRisk).count();
// Double finalHealthValueWarn = healthValueWarn;
// long warnNum = healthIndex.subList(healthIndex.size()>healthValueWarnCount? (int) (healthIndex.size() - healthValueWarnCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueWarn).count();
// Double finalHealthValueNotice = healthValueNotice;
// long noticeNum = healthIndex.subList(healthIndex.size()>healthValueNoticeCount? (int) (healthIndex.size() - healthValueNoticeCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueNotice).count();
Double finalHealthValueRisk = healthValueRisk;
if (healthIndex.size() >= healthValueRiskCount) {
List<Double> healthIndexList = healthIndex.subList(healthIndex.size() - healthValueRiskCount, healthIndex.size());
riskNum = (int) healthIndexList.stream().filter(e -> e <= finalHealthValueRisk).count();
}
Double finalHealthValueWarn = healthValueWarn;
if (healthIndex.size() >= healthValueWarnCount) {
List<Double> healthIndexList = healthIndex.subList(healthIndex.size() - healthValueWarnCount, healthIndex.size());
warnNum = (int) healthIndexList.stream().filter(e -> e <= finalHealthValueWarn).count();
}
Double finalHealthValueNotice = healthValueNotice;
if (healthIndex.size() >= healthValueNoticeCount) {
List<Double> healthIndexList = healthIndex.subList(healthIndex.size() - healthValueNoticeCount, healthIndex.size());
noticeNum = (int) healthIndexList.stream().filter(e -> e <= finalHealthValueNotice).count();
}
String level = "";
String content = "";
String num = "";
if (noticeNum == healthValueNoticeCount) {
level = "注意";
num = "" + healthValueNotice;
content = healthValueNoticeCount + "小时";
}
if (warnNum == healthValueWarnCount) {
level = "警告";
num = "" + healthValueWarn;
content = healthValueWarnCount + "小时";
}
if (riskNum == healthValueRiskCount) {
level = "危险";
num = "" + healthValueRisk;
content = healthValueRiskCount + "小时";
}
//库里若已存在该测点预警 不生成重复的 若新生预警等级高于历史 则生成
LambdaQueryWrapper<PvWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(PvWarningRecord::getAnalysisPointId, idxBizPvHealthIndices.get(0).getAnalysisObjSeq());
query.eq(PvWarningRecord::getStatus, "0");
query.isNull(PvWarningRecord::getDisposotionDate);
query.orderByDesc(PvWarningRecord::getRecDate);
List<PvWarningRecord> idxBizPvWarningRecords = pvWaringRecordMapper.selectList(query);
int flag = ObjectUtils.isEmpty(idxBizPvWarningRecords) || WarningNameEnum.getCode(level) > WarningNameEnum.getCode(idxBizPvWarningRecords.get(0).getWarningName()) ? 0 : 1;
Boolean timeFlag = format.equals(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getRecDate());
if (!level.equals("") && flag == 0 && timeFlag) {
IdxBizPvWarningRecord idxBizPvWarningRecord = new IdxBizPvWarningRecord();
idxBizPvWarningRecord.setKks(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getKks());
idxBizPvWarningRecord.setArae(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getArea());
idxBizPvWarningRecord.setStation(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getStation());
idxBizPvWarningRecord.setSubarray(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getSubarray());
idxBizPvWarningRecord.setGatewayId(gateWayId);
idxBizPvWarningRecord.setIndexAddress(address);
idxBizPvWarningRecord.setEquipmentName(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getEquipmentName());
idxBizPvWarningRecord.setAnalysisPointId(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getAnalysisObjSeq());
idxBizPvWarningRecord.setDisposotionState("待确认");
idxBizPvWarningRecord.setStatus("0");
idxBizPvWarningRecord.setWarningName(level);
idxBizPvWarningRecord.setCONTENT(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getPointName() + "连续" + content + "健康指数≤" + num);
idxBizPvWarningRecord.setRecDate(time);
idxBizPvWarningRecord.setWarningPeriod(WarningPeriodEnum.HOUR.getName());
idxBizPvWarningRecord.setManufacturer(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getManufacturer());
idxBizPvWarningRecord.setPointName(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getPointName());
idxBizPvWarningRecord.setHealthIndexSeq(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getHealthIndex().toString());
idxBizPvWarningRecord.setHealthLevel(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getHealthLevel());
idxBizPvWarningRecordList.add(idxBizPvWarningRecord);
//idxBizPvWarningRecordMapper.insert(idxBizPvWarningRecord);
long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
PvWarningRecord pvWarningRecord = new PvWarningRecord();
BeanUtils.copyProperties(idxBizPvWarningRecord, pvWarningRecord, "disposotionDate", "recDate", "CONTENT");
pvWarningRecord.setContent(idxBizPvWarningRecord.getCONTENT());
pvWarningRecord.setRecDate(format);
pvWarningRecord.setTs(timestamp);
pvWarningRecord.setHealthIndex(String.format(CommonConstans.Onedecimalplaces, idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getHealthIndex()));
pvWarningRecord.setOrgCode(idxBizPvHealthIndices.get(0).getOrgCode());
tdPvWarningRecordList.add(pvWarningRecord);
}
}
// }
// idxBizPvWarningRecordService.saveBatch(idxBizPvWarningRecordList);
if (CollUtil.isNotEmpty(tdPvWarningRecordList)) {
// tdengine插入
log.info("==================光伏按小时产生预警数据成功", JSON.toJSONString(tdPvWarningRecordList));
pvWaringRecordMapper.saveBatchWarningRecords(tdPvWarningRecordList);
}
// 触发风险模型生成预警处置模块的预警记录
fetchDataPv(tdPvWarningRecordList, stationMap);
}).start();
});
}
/*** /***
* 每三天取一次最大粒度内的指数异常数据 * 每三天取一次最大粒度内的指数异常数据
* 判断三天内数据是否符合预警规则 符合则报警并在redis中缓存 同一级别的预警记录下次不生成 * 判断三天内数据是否符合预警规则 符合则报警并在redis中缓存 同一级别的预警记录下次不生成
...@@ -1090,56 +1281,651 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1090,56 +1281,651 @@ public class HealthStatusIndicatorServiceImpl {
fetchDataPv(tdPvWarningRecordList, stationMap); fetchDataPv(tdPvWarningRecordList, stationMap);
} }
// @Scheduled(cron = "0 0 */1 * * ?") // @Scheduled(cron = "0 0 0 0/1 * ? ")
@Async("async") @Async("async")
//@PostConstruct public void healthWarningDayGFNew() {
public void healthWarningMinute(Date time) {
if (!openHealth) { if (!openHealth) {
return; return;
} }
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // Calendar calendar = Calendar.getInstance();
// Date time=null; Date time = new Date();
// try { //三天 + 8小时
// time = simpleDateFormat.parse("2024-03-14 13:50:00"); // Date date = DateUtils.dateAddHours(time, -80);
// } catch (ParseException e1) { // calendar.set(Calendar.DAY_OF_MONTH,calendar.get(Calendar.DAY_OF_MONTH)-3);
// // TODO Auto-generated catch block // SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
// e1.printStackTrace(); List<String> gateWayIds = idxBizPvWarningRuleSetMapper.getGatewayIds();
// } gateWayIds.forEach(gateWayId -> {
// Calendar calendar = Calendar.getInstance(); new Thread(() -> {
log.info("风机---------------------预警时间----" + time); Integer maxWaringCycle = idxBizPvWarningRuleSetMapper.getMaxWaringCycleOfDayByGatewayId(gateWayId) + AnalyseOffset;
String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00"); String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
// calendar.set(Calendar.HOUR_OF_DAY,calendar.get(Calendar.HOUR_OF_DAY)-1);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
// 用ts字段查询时需要减8小时
// Date date = DateUtils.dateAddHours(time, -8);
Integer maxWaringCycle = idxBizFanWarningRuleSetMapper.getMaxWaringCycleOfMinutes() + AnalyseOffset;
Date date = DateUtil.offsetHour(time, -8); Date date = DateUtil.offsetHour(time, -8);
date = DateUtil.offsetMinute(date, 0 - (maxWaringCycle * 10)); date = DateUtil.offsetDay(date, 0 - maxWaringCycle);
LambdaQueryWrapper<FanHealthIndexMoment> wrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<PvHealthIndexDay> wrapper = new LambdaQueryWrapper<>();
//wrapper.ne(FanHealthIndexMoment::getHealthLevel,"安全"); //wrapper.ne(PvHealthIndexDay::getHealthLevel, "安全");
wrapper.eq(FanHealthIndexMoment::getAnalysisObjType, "测点"); wrapper.eq(PvHealthIndexDay::getAnalysisObjType, "测点");
wrapper.ge(FanHealthIndexMoment::getTs, date); wrapper.eq(PvHealthIndexDay::getGatewayId, gateWayId);
wrapper.ge(PvHealthIndexDay::getTs, date);
Date dateMax = DateUtil.offsetHour(time, -8);
wrapper.le(PvHealthIndexDay::getTs, dateMax);
wrapper.orderByAsc(PvHealthIndexDay::getTs);
List<PvHealthIndexDay> healthIndices = pvHealthIndexDayMapper.selectList(wrapper);
if (null == healthIndices) {
return;
}
List<String> collect = healthIndices.stream().map(PvHealthIndexDay::getAnalysisObjSeq).collect(Collectors.toList());
LambdaQueryWrapper<IdxBizPvWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(IdxBizPvWarningRuleSet::getAnalysisType, WarningPeriodEnum.DAY.getName());
queryWrapper.in(IdxBizPvWarningRuleSet::getAnalysisPointId, collect);
List<IdxBizPvWarningRuleSet> idxBizPvWarningRules = idxBizPvWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<PvHealthIndexDay>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(PvHealthIndexDay::getGatewayId, Collectors.groupingBy(PvHealthIndexDay::getIndexAddress)));
List<IdxBizPvWarningRecord> idxBizPvWarningRecordList = new ArrayList<>();
List<PvWarningRecord> tdPvWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>();
// for (String gateWayId : gateWayMaps.keySet()) {
LambdaQueryWrapper<StationBasic> basicLambdaQueryWrapper = new LambdaQueryWrapper<>();
basicLambdaQueryWrapper.eq(StationBasic::getFanGatewayId, gateWayId);
basicLambdaQueryWrapper.last("limit 1");
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gateWayId, stationBasic);
Map<String, List<PvHealthIndexDay>> healthDataMaps = gateWayMaps.get(gateWayId);
for (String address : healthDataMaps.keySet()) {
List<PvHealthIndexDay> idxBizPvHealthIndices = healthDataMaps.get(address);
List<IdxBizPvWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizPvHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
if (ObjectUtils.isEmpty(idxBizPvWarningRuleSets)) {
continue;
}
Double healthValueWarn = 0.0;
Double healthValueRisk = 0.0;
Double healthValueNotice = 0.0;
int healthValueWarnCount = 0;
int healthValueRiskCount = 0;
int healthValueNoticeCount = 0;
int healthValueMinCount = 0;
int riskNum = 0;
int warnNum = 0;
int noticeNum = 0;
for (IdxBizPvWarningRuleSet e : idxBizPvWarningRuleSets) {
switch (e.getWarningName()) {
case "警告":
healthValueWarn = Double.parseDouble(e.getWarningIf().substring(2));
healthValueWarnCount = Integer.parseInt(e.getWarningCycle());
break;
case "危险":
healthValueRisk = Double.parseDouble(e.getWarningIf().substring(2));
healthValueRiskCount = Integer.parseInt(e.getWarningCycle());
break;
case "注意":
healthValueNotice = Double.parseDouble(e.getWarningIf().substring(2));
healthValueNoticeCount = Integer.parseInt(e.getWarningCycle());
break;
}
}
List<Double> healthIndex = idxBizPvHealthIndices.stream().map(PvHealthIndexDay::getHealthIndex).collect(Collectors.toList());
// Double finalHealthValueRisk = healthValueRisk;
// long riskNum = healthIndex.subList(healthIndex.size()>healthValueRiskCount? (int) (healthIndex.size() - healthValueRiskCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueRisk).count();
// Double finalHealthValueWarn = healthValueWarn;
// long warnNum = healthIndex.subList(healthIndex.size()>healthValueWarnCount? (int) (healthIndex.size() - healthValueWarnCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueWarn).count();
// Double finalHealthValueNotice = healthValueNotice;
// long noticeNum = healthIndex.subList(healthIndex.size()>healthValueNoticeCount? (int) (healthIndex.size() - healthValueNoticeCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueNotice).count();
Double finalHealthValueRisk = healthValueRisk;
if (healthIndex.size() >= healthValueRiskCount) {
List<Double> healthIndexList = healthIndex.subList(healthIndex.size() - healthValueRiskCount, healthIndex.size());
riskNum = (int) healthIndexList.stream().filter(e -> e <= finalHealthValueRisk).count();
}
Double finalHealthValueWarn = healthValueWarn;
if (healthIndex.size() >= healthValueWarnCount) {
List<Double> healthIndexList = healthIndex.subList(healthIndex.size() - healthValueWarnCount, healthIndex.size());
warnNum = (int) healthIndexList.stream().filter(e -> e <= finalHealthValueWarn).count();
}
Double finalHealthValueNotice = healthValueNotice;
if (healthIndex.size() >= healthValueNoticeCount) {
List<Double> healthIndexList = healthIndex.subList(healthIndex.size() - healthValueNoticeCount, healthIndex.size());
noticeNum = (int) healthIndexList.stream().filter(e -> e <= finalHealthValueNotice).count();
}
String level = "";
String content = "";
String num = "";
if (noticeNum == healthValueNoticeCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_notice_day","notice");
level = "注意";
num = "" + healthValueNotice;
content = healthValueNoticeCount + "天";
}
if (warnNum == healthValueWarnCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_warn_day","warn");
level = "警告";
num = "" + healthValueWarn;
content = healthValueWarnCount + "天";
}
if (riskNum == healthValueRiskCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_risk_day","risk");
level = "危险";
num = "" + healthValueRisk;
content = healthValueRiskCount + "天";
}
//库里若已存在该测点预警 不生成重复的 若新生预警等级高于历史 则生成
LambdaQueryWrapper<PvWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(PvWarningRecord::getAnalysisPointId, idxBizPvHealthIndices.get(0).getAnalysisObjSeq());
query.eq(PvWarningRecord::getStatus, "0");
query.isNull(PvWarningRecord::getDisposotionDate);
query.orderByDesc(PvWarningRecord::getTs);
List<PvWarningRecord> idxBizPvWarningRecords = pvWaringRecordMapper.selectList(query);
int flag = ObjectUtils.isEmpty(idxBizPvWarningRecords) || WarningNameEnum.getCode(level) > WarningNameEnum.getCode(idxBizPvWarningRecords.get(0).getWarningName()) ? 0 : 1;
Boolean timeFlag = format.equals(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getRecDate());
if (!level.equals("") && flag == 0 && timeFlag) {
IdxBizPvWarningRecord idxBizPvWarningRecord = new IdxBizPvWarningRecord();
idxBizPvWarningRecord.setKks(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getKks());
idxBizPvWarningRecord.setArae(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getArea());
idxBizPvWarningRecord.setStation(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getStation());
idxBizPvWarningRecord.setSubarray(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getSubarray());
idxBizPvWarningRecord.setGatewayId(gateWayId);
idxBizPvWarningRecord.setIndexAddress(address);
idxBizPvWarningRecord.setEquipmentName(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getEquipmentName());
idxBizPvWarningRecord.setAnalysisPointId(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getAnalysisObjSeq());
idxBizPvWarningRecord.setDisposotionState("待确认");
idxBizPvWarningRecord.setStatus("0");
idxBizPvWarningRecord.setWarningName(level);
idxBizPvWarningRecord.setCONTENT(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getPointName() + "连续" + content + "健康指数≤" + num);
idxBizPvWarningRecord.setRecDate(time);
idxBizPvWarningRecord.setWarningPeriod(WarningPeriodEnum.DAY.getName());
idxBizPvWarningRecord.setManufacturer(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getManufacturer());
idxBizPvWarningRecord.setPointName(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getPointName());
idxBizPvWarningRecord.setHealthIndexSeq(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getHealthIndex().toString());
idxBizPvWarningRecord.setHealthLevel(idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getHealthLevel());
idxBizPvWarningRecordList.add(idxBizPvWarningRecord);
//idxBizPvWarningRecordMapper.insert(idxBizPvWarningRecord);
long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
PvWarningRecord pvWarningRecord = new PvWarningRecord();
BeanUtils.copyProperties(idxBizPvWarningRecord, pvWarningRecord, "disposotionDate", "recDate", "CONTENT");
pvWarningRecord.setContent(idxBizPvWarningRecord.getCONTENT());
pvWarningRecord.setRecDate(format);
pvWarningRecord.setTs(timestamp);
pvWarningRecord.setHealthIndex(String.format(CommonConstans.Onedecimalplaces, idxBizPvHealthIndices.get(idxBizPvHealthIndices.size() - 1).getHealthIndex()));
pvWarningRecord.setOrgCode(idxBizPvHealthIndices.get(0).getOrgCode());
tdPvWarningRecordList.add(pvWarningRecord);
}
}
// }
// idxBizPvWarningRecordService.saveBatch(idxBizPvWarningRecordList);
if (CollUtil.isNotEmpty(tdPvWarningRecordList)) {
log.info("==================光伏按天产生预警数据成功", JSON.toJSONString(tdPvWarningRecordList));
// tdengine插入
pvWaringRecordMapper.saveBatchWarningRecords(tdPvWarningRecordList);
}
// 触发风险模型生成预警处置模块的预警记录
fetchDataPv(tdPvWarningRecordList, stationMap);
}).start();
});
}
// @Scheduled(cron = "0 0 */1 * * ?")
@Async("async")
//@PostConstruct
public void healthWarningMinute(Date time) {
if (!openHealth) {
return;
}
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Date time=null;
// try {
// time = simpleDateFormat.parse("2024-03-14 13:50:00");
// } catch (ParseException e1) {
// // TODO Auto-generated catch block
// e1.printStackTrace();
// }
// Calendar calendar = Calendar.getInstance();
log.info("风机---------------------预警时间----" + time);
String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
// calendar.set(Calendar.HOUR_OF_DAY,calendar.get(Calendar.HOUR_OF_DAY)-1);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
// 用ts字段查询时需要减8小时
// Date date = DateUtils.dateAddHours(time, -8);
Integer maxWaringCycle = idxBizFanWarningRuleSetMapper.getMaxWaringCycleOfMinutes() + AnalyseOffset;
Date date = DateUtil.offsetHour(time, -8);
date = DateUtil.offsetMinute(date, 0 - (maxWaringCycle * 10));
LambdaQueryWrapper<FanHealthIndexMoment> wrapper = new LambdaQueryWrapper<>();
//wrapper.ne(FanHealthIndexMoment::getHealthLevel,"安全");
wrapper.eq(FanHealthIndexMoment::getAnalysisObjType, "测点");
wrapper.ge(FanHealthIndexMoment::getTs, date);
Date dateMax = DateUtil.offsetHour(time, -8);
wrapper.le(FanHealthIndexMoment::getTs, dateMax);
// wrapper.eq(FanHealthIndexMoment::getIndexAddress, "18547");
wrapper.orderByAsc(FanHealthIndexMoment::getTs);
//查询最大连续时间规则的测点对象
List<FanHealthIndexMoment> healthIndices = fanHealthIndexMomentMapper.selectList(wrapper);
if (ObjectUtils.isEmpty(healthIndices)) {
return;
}
List<String> collect = healthIndices.stream().map(FanHealthIndexMoment::getAnalysisObjSeq).collect(Collectors.toList());
LambdaQueryWrapper<IdxBizFanWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(IdxBizFanWarningRuleSet::getAnalysisType, WarningPeriodEnum.MINUTES.getName());
queryWrapper.in(IdxBizFanWarningRuleSet::getAnalysisPointId, collect);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRules = idxBizFanWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<FanHealthIndexMoment>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(FanHealthIndexMoment::getGatewayId, Collectors.groupingBy(FanHealthIndexMoment::getIndexAddress)));
List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>();
List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>();
for (String gateWayId : gateWayMaps.keySet()) {
LambdaQueryWrapper<StationBasic> basicLambdaQueryWrapper = new LambdaQueryWrapper<>();
basicLambdaQueryWrapper.eq(StationBasic::getFanGatewayId, gateWayId);
basicLambdaQueryWrapper.last("limit 1");
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gateWayId, stationBasic);
Map<String, List<FanHealthIndexMoment>> healthDataMaps = gateWayMaps.get(gateWayId);
for (String address : healthDataMaps.keySet()) {
//获取指定测点的健康指数列表
List<FanHealthIndexMoment> idxBizFanHealthIndices = healthDataMaps.get(address);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizFanHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
if (ObjectUtils.isEmpty(idxBizPvWarningRuleSets)) {
continue;
}
Double healthValueWarn = 0.0;
Double healthValueRisk = 0.0;
Double healthValueNotice = 0.0;
int healthValueWarnCount = 0;
int healthValueRiskCount = 0;
int healthValueNoticeCount = 0;
int healthValueMinCount = 0;
int riskNum = 0;
int warnNum = 0;
int noticeNum = 0;
for (IdxBizFanWarningRuleSet e : idxBizPvWarningRuleSets) {
switch (e.getWarningName()) {
case "警告":
healthValueWarn = Double.parseDouble(e.getWarningIf().substring(2));
healthValueWarnCount = Integer.parseInt(e.getWarningCycle());
break;
case "危险":
healthValueRisk = Double.parseDouble(e.getWarningIf().substring(2));
healthValueRiskCount = Integer.parseInt(e.getWarningCycle());
break;
case "注意":
healthValueNotice = Double.parseDouble(e.getWarningIf().substring(2));
healthValueNoticeCount = Integer.parseInt(e.getWarningCycle());
break;
}
}
//获取指定测点的健康指数列表
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(FanHealthIndexMoment::getHealthIndex).collect(Collectors.toList());
// Double finalHealthValueRisk = healthValueRisk;
// long riskNum = healthIndex.subList(healthIndex.size()>healthValueRiskCount? (int) (healthIndex.size() - healthValueRiskCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueRisk).count();
// Double finalHealthValueWarn = healthValueWarn;
// long warnNum = healthIndex.subList(healthIndex.size()>healthValueWarnCount? (int) (healthIndex.size() - healthValueWarnCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueWarn).count();
// Double finalHealthValueNotice = healthValueNotice;
// long noticeNum = healthIndex.subList(healthIndex.size()>healthValueNoticeCount? (int) (healthIndex.size() - healthValueNoticeCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueNotice).count();
Double finalHealthValueRisk = healthValueRisk;
if (healthIndex.size() >= healthValueRiskCount) {
List<Double> healthIndexList = healthIndex.subList(healthIndex.size() - healthValueRiskCount, healthIndex.size());
riskNum = (int) healthIndexList.stream().filter(e -> e <= finalHealthValueRisk).count();
}
Double finalHealthValueWarn = healthValueWarn;
if (healthIndex.size() >= healthValueWarnCount) {
List<Double> healthIndexList = healthIndex.subList(healthIndex.size() - healthValueWarnCount, healthIndex.size());
warnNum = (int) healthIndexList.stream().filter(e -> e <= finalHealthValueWarn).count();
}
Double finalHealthValueNotice = healthValueNotice;
if (healthIndex.size() >= healthValueNoticeCount) {
List<Double> healthIndexList = healthIndex.subList(healthIndex.size() - healthValueNoticeCount, healthIndex.size());
noticeNum = (int) healthIndexList.stream().filter(e -> e <= finalHealthValueNotice).count();
}
String level = "";
String content = "";
String num = "";
if (noticeNum == healthValueNoticeCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_notice_day","notice");
level = "注意";
num = "" + healthValueNotice;
content = healthValueNoticeCount * 10 + "分钟";
}
if (warnNum == healthValueWarnCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_warn_day","warn");
level = "警告";
num = "" + healthValueWarn;
content = healthValueWarnCount * 10 + "分钟";
}
if (riskNum == healthValueRiskCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_risk_day","risk");
level = "危险";
num = "" + healthValueRisk;
content = healthValueRiskCount * 10 + "分钟";
}
//库里若已存在该测点预警 不生成重复的 若新生预警等级高于历史 则生成
LambdaQueryWrapper<FanWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(FanWarningRecord::getAnalysisPointId, idxBizFanHealthIndices.get(0).getAnalysisObjSeq());
query.eq(FanWarningRecord::getStatus, "0");
query.isNull(FanWarningRecord::getDisposotionDate);
query.orderByDesc(FanWarningRecord::getTs);
List<FanWarningRecord> idxBizFanWarningRecords = fanWaringRecordMapper.selectList(query);
int flag = ObjectUtils.isEmpty(idxBizFanWarningRecords) || WarningNameEnum.getCode(level) > WarningNameEnum.getCode(idxBizFanWarningRecords.get(0).getWarningName()) ? 0 : 1;
Boolean timeFlag = format.equals(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getRecDate());
if (!level.equals("") && flag == 0 && timeFlag) {
IdxBizFanWarningRecord idxBizFanWarningRecord = new IdxBizFanWarningRecord();
idxBizFanWarningRecord.setKks(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getKks());
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getArea());
idxBizFanWarningRecord.setStation(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getStation());
idxBizFanWarningRecord.setSubSystem(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getSubSystem());
idxBizFanWarningRecord.setGatewayId(gateWayId);
idxBizFanWarningRecord.setIndexAddress(address);
idxBizFanWarningRecord.setEquipmentName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getEquipmentName());
idxBizFanWarningRecord.setAnalysisPointId(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getAnalysisObjSeq());
idxBizFanWarningRecord.setDisposotionState("待确认");
idxBizFanWarningRecord.setStatus("0");
idxBizFanWarningRecord.setWarningName(level);
idxBizFanWarningRecord.setCONTENT(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName() + "连续" + content + "健康指数≤" + num);
idxBizFanWarningRecord.setRecDate(time);
idxBizFanWarningRecord.setWarningPeriod(WarningPeriodEnum.MINUTES.getName());
idxBizFanWarningRecord.setNumber(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getNumber());
idxBizFanWarningRecord.setPointName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName());
idxBizFanWarningRecord.setHealthIndexSeq(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex().toString());
idxBizFanWarningRecord.setHealthLevel(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthLevel());
idxBizFanWarningRecordList.add(idxBizFanWarningRecord);
//idxBizFanWarningRecordMapper.insert(idxBizFanWarningRecord);
long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
FanWarningRecord fanWarningRecord = new FanWarningRecord();
BeanUtils.copyProperties(idxBizFanWarningRecord, fanWarningRecord, "disposotionDate", "recDate", "CONTENT");
fanWarningRecord.setContent(idxBizFanWarningRecord.getCONTENT());
fanWarningRecord.setRecDate(format);
fanWarningRecord.setTs(timestamp);
fanWarningRecord.setHealthIndex(String.format(CommonConstans.Onedecimalplaces, idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex()));
fanWarningRecord.setOrgCode(idxBizFanHealthIndices.get(0).getOrgCode());
tdFanWarningRecordList.add(fanWarningRecord);
}
}
}
// idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList);
// tdengine插入
// fanWarningRecordService.saveBatch(tdFanWarningRecordList);
if (CollUtil.isNotEmpty(tdFanWarningRecordList)) {
fanWaringRecordMapper.saveBatchWarningRecords(tdFanWarningRecordList);
}
// 触发风险模型生成预警处置模块的预警记录
fetchDataFan(tdFanWarningRecordList, stationMap);
}
@Async("async")
//@PostConstruct
public void healthWarningMinute(Date time, String gatewayId) {
if (!openHealth) {
return;
}
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Date time=null;
// try {
// time = simpleDateFormat.parse("2024-03-14 13:50:00");
// } catch (ParseException e1) {
// // TODO Auto-generated catch block
// e1.printStackTrace();
// }
// Calendar calendar = Calendar.getInstance();
log.info("风机---------------------预警时间----" + time);
String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
// calendar.set(Calendar.HOUR_OF_DAY,calendar.get(Calendar.HOUR_OF_DAY)-1);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
// 用ts字段查询时需要减8小时
// Date date = DateUtils.dateAddHours(time, -8);
Integer maxWaringCycle = idxBizFanWarningRuleSetMapper.getMaxWaringCycleOfMinutesByGatewayId(gatewayId) + AnalyseOffset;
Date date = DateUtil.offsetHour(time, -8);
date = DateUtil.offsetMinute(date, 0 - (maxWaringCycle * 10));
LambdaQueryWrapper<FanHealthIndexMoment> wrapper = new LambdaQueryWrapper<>();
//wrapper.ne(FanHealthIndexMoment::getHealthLevel,"安全");
wrapper.eq(FanHealthIndexMoment::getAnalysisObjType, "测点");
wrapper.ge(FanHealthIndexMoment::getTs, date);
wrapper.eq(FanHealthIndexMoment::getGatewayId, gatewayId);
Date dateMax = DateUtil.offsetHour(time, -8);
wrapper.le(FanHealthIndexMoment::getTs, dateMax);
// wrapper.eq(FanHealthIndexMoment::getIndexAddress, "18547");
wrapper.orderByAsc(FanHealthIndexMoment::getTs);
//查询最大连续时间规则的测点对象
List<FanHealthIndexMoment> healthIndices = fanHealthIndexMomentMapper.selectList(wrapper);
if (ObjectUtils.isEmpty(healthIndices)) {
return;
}
List<String> collect = healthIndices.stream().map(FanHealthIndexMoment::getAnalysisObjSeq).collect(Collectors.toList());
LambdaQueryWrapper<IdxBizFanWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(IdxBizFanWarningRuleSet::getAnalysisType, WarningPeriodEnum.MINUTES.getName());
queryWrapper.eq(IdxBizFanWarningRuleSet::getGatewayId, gatewayId);
queryWrapper.in(IdxBizFanWarningRuleSet::getAnalysisPointId, collect);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRules = idxBizFanWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<FanHealthIndexMoment>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(FanHealthIndexMoment::getGatewayId, Collectors.groupingBy(FanHealthIndexMoment::getIndexAddress)));
List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>();
List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>();
// for (String gateWayId : gateWayMaps.keySet()) {
LambdaQueryWrapper<StationBasic> basicLambdaQueryWrapper = new LambdaQueryWrapper<>();
basicLambdaQueryWrapper.eq(StationBasic::getFanGatewayId, gatewayId);
basicLambdaQueryWrapper.last("limit 1");
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gatewayId, stationBasic);
Map<String, List<FanHealthIndexMoment>> healthDataMaps = gateWayMaps.get(gatewayId);
for (String address : healthDataMaps.keySet()) {
//获取指定测点的健康指数列表
List<FanHealthIndexMoment> idxBizFanHealthIndices = healthDataMaps.get(address);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizFanHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
if (ObjectUtils.isEmpty(idxBizPvWarningRuleSets)) {
continue;
}
Double healthValueWarn = 0.0;
Double healthValueRisk = 0.0;
Double healthValueNotice = 0.0;
int healthValueWarnCount = 0;
int healthValueRiskCount = 0;
int healthValueNoticeCount = 0;
int healthValueMinCount = 0;
int riskNum = 0;
int warnNum = 0;
int noticeNum = 0;
for (IdxBizFanWarningRuleSet e : idxBizPvWarningRuleSets) {
switch (e.getWarningName()) {
case "警告":
healthValueWarn = Double.parseDouble(e.getWarningIf().substring(2));
healthValueWarnCount = Integer.parseInt(e.getWarningCycle());
break;
case "危险":
healthValueRisk = Double.parseDouble(e.getWarningIf().substring(2));
healthValueRiskCount = Integer.parseInt(e.getWarningCycle());
break;
case "注意":
healthValueNotice = Double.parseDouble(e.getWarningIf().substring(2));
healthValueNoticeCount = Integer.parseInt(e.getWarningCycle());
break;
}
}
//获取指定测点的健康指数列表
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(FanHealthIndexMoment::getHealthIndex).collect(Collectors.toList());
// Double finalHealthValueRisk = healthValueRisk;
// long riskNum = healthIndex.subList(healthIndex.size()>healthValueRiskCount? (int) (healthIndex.size() - healthValueRiskCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueRisk).count();
// Double finalHealthValueWarn = healthValueWarn;
// long warnNum = healthIndex.subList(healthIndex.size()>healthValueWarnCount? (int) (healthIndex.size() - healthValueWarnCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueWarn).count();
// Double finalHealthValueNotice = healthValueNotice;
// long noticeNum = healthIndex.subList(healthIndex.size()>healthValueNoticeCount? (int) (healthIndex.size() - healthValueNoticeCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueNotice).count();
Double finalHealthValueRisk = healthValueRisk;
if (healthIndex.size() >= healthValueRiskCount) {
List<Double> healthIndexList = healthIndex.subList(healthIndex.size() - healthValueRiskCount, healthIndex.size());
riskNum = (int) healthIndexList.stream().filter(e -> e <= finalHealthValueRisk).count();
}
Double finalHealthValueWarn = healthValueWarn;
if (healthIndex.size() >= healthValueWarnCount) {
List<Double> healthIndexList = healthIndex.subList(healthIndex.size() - healthValueWarnCount, healthIndex.size());
warnNum = (int) healthIndexList.stream().filter(e -> e <= finalHealthValueWarn).count();
}
Double finalHealthValueNotice = healthValueNotice;
if (healthIndex.size() >= healthValueNoticeCount) {
List<Double> healthIndexList = healthIndex.subList(healthIndex.size() - healthValueNoticeCount, healthIndex.size());
noticeNum = (int) healthIndexList.stream().filter(e -> e <= finalHealthValueNotice).count();
}
String level = "";
String content = "";
String num = "";
if (noticeNum == healthValueNoticeCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_notice_day","notice");
level = "注意";
num = "" + healthValueNotice;
content = healthValueNoticeCount * 10 + "分钟";
}
if (warnNum == healthValueWarnCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_warn_day","warn");
level = "警告";
num = "" + healthValueWarn;
content = healthValueWarnCount * 10 + "分钟";
}
if (riskNum == healthValueRiskCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_risk_day","risk");
level = "危险";
num = "" + healthValueRisk;
content = healthValueRiskCount * 10 + "分钟";
}
//库里若已存在该测点预警 不生成重复的 若新生预警等级高于历史 则生成
LambdaQueryWrapper<FanWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(FanWarningRecord::getAnalysisPointId, idxBizFanHealthIndices.get(0).getAnalysisObjSeq());
query.eq(FanWarningRecord::getStatus, "0");
query.isNull(FanWarningRecord::getDisposotionDate);
query.orderByDesc(FanWarningRecord::getTs);
List<FanWarningRecord> idxBizFanWarningRecords = fanWaringRecordMapper.selectList(query);
int flag = ObjectUtils.isEmpty(idxBizFanWarningRecords) || WarningNameEnum.getCode(level) > WarningNameEnum.getCode(idxBizFanWarningRecords.get(0).getWarningName()) ? 0 : 1;
Boolean timeFlag = format.equals(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getRecDate());
if (!level.equals("") && flag == 0 && timeFlag) {
IdxBizFanWarningRecord idxBizFanWarningRecord = new IdxBizFanWarningRecord();
idxBizFanWarningRecord.setKks(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getKks());
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getArea());
idxBizFanWarningRecord.setStation(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getStation());
idxBizFanWarningRecord.setSubSystem(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getSubSystem());
idxBizFanWarningRecord.setGatewayId(gatewayId);
idxBizFanWarningRecord.setIndexAddress(address);
idxBizFanWarningRecord.setEquipmentName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getEquipmentName());
idxBizFanWarningRecord.setAnalysisPointId(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getAnalysisObjSeq());
idxBizFanWarningRecord.setDisposotionState("待确认");
idxBizFanWarningRecord.setStatus("0");
idxBizFanWarningRecord.setWarningName(level);
idxBizFanWarningRecord.setCONTENT(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName() + "连续" + content + "健康指数≤" + num);
idxBizFanWarningRecord.setRecDate(time);
idxBizFanWarningRecord.setWarningPeriod(WarningPeriodEnum.MINUTES.getName());
idxBizFanWarningRecord.setNumber(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getNumber());
idxBizFanWarningRecord.setPointName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName());
idxBizFanWarningRecord.setHealthIndexSeq(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex().toString());
idxBizFanWarningRecord.setHealthLevel(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthLevel());
idxBizFanWarningRecordList.add(idxBizFanWarningRecord);
//idxBizFanWarningRecordMapper.insert(idxBizFanWarningRecord);
long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
FanWarningRecord fanWarningRecord = new FanWarningRecord();
BeanUtils.copyProperties(idxBizFanWarningRecord, fanWarningRecord, "disposotionDate", "recDate", "CONTENT");
fanWarningRecord.setContent(idxBizFanWarningRecord.getCONTENT());
fanWarningRecord.setRecDate(format);
fanWarningRecord.setTs(timestamp);
fanWarningRecord.setHealthIndex(String.format(CommonConstans.Onedecimalplaces, idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex()));
fanWarningRecord.setOrgCode(idxBizFanHealthIndices.get(0).getOrgCode());
tdFanWarningRecordList.add(fanWarningRecord);
}
}
// }
// idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList);
// tdengine插入
// fanWarningRecordService.saveBatch(tdFanWarningRecordList);
if (CollUtil.isNotEmpty(tdFanWarningRecordList)) {
fanWaringRecordMapper.saveBatchWarningRecords(tdFanWarningRecordList);
}
// 触发风险模型生成预警处置模块的预警记录
fetchDataFan(tdFanWarningRecordList, stationMap);
}
/***
* 每五小时获取一次最大粒度内的指数异常数据
* 判断五小时内数据是否符合预警规则 符合则报警并在redis中缓存 同一级别的预警记录下次不生成
*
*/
@Scheduled(cron = "0 0 0/1 * * ?")
@Async("async")
public void healthWarningHour() {
if (!openHealth) {
return;
}
Date time = new Date();
String format = DateUtil.format(time, "yyyy-MM-dd HH:00:00");
// Date date = DateUtils.dateAddHours(time, -13);
// Calendar calendar = Calendar.getInstance();
// calendar.set(Calendar.HOUR_OF_DAY,calendar.get(Calendar.HOUR_OF_DAY)-5);
// SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
Integer maxWaringCycle = idxBizFanWarningRuleSetMapper.getMaxWaringCycleOfHour();
Date date = DateUtil.offsetHour(time, -8);
date = DateUtil.offsetHour(date, 0 - (maxWaringCycle));
LambdaQueryWrapper<FanHealthIndexHour> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(FanHealthIndexHour::getAnalysisObjType, "测点");
//wrapper.ne(FanHealthIndexHour::getHealthLevel, "安全");
wrapper.ge(FanHealthIndexHour::getTs, date);
Date dateMax = DateUtil.offsetHour(time, -8); Date dateMax = DateUtil.offsetHour(time, -8);
wrapper.le(FanHealthIndexMoment::getTs, dateMax); wrapper.le(FanHealthIndexHour::getTs, dateMax);
// wrapper.eq(FanHealthIndexMoment::getIndexAddress, "18547"); wrapper.orderByAsc(FanHealthIndexHour::getTs);
wrapper.orderByAsc(FanHealthIndexMoment::getTs); List<FanHealthIndexHour> healthIndices = fanHealthIndexHourMapper.selectList(wrapper);
//查询最大连续时间规则的测点对象
List<FanHealthIndexMoment> healthIndices = fanHealthIndexMomentMapper.selectList(wrapper);
if (ObjectUtils.isEmpty(healthIndices)) { if (ObjectUtils.isEmpty(healthIndices)) {
return; return;
} }
List<String> collect = healthIndices.stream().map(FanHealthIndexMoment::getAnalysisObjSeq).collect(Collectors.toList()); List<String> collect = healthIndices.stream().map(FanHealthIndexHour::getAnalysisObjSeq).collect(Collectors.toList());
LambdaQueryWrapper<IdxBizFanWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<IdxBizFanWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(IdxBizFanWarningRuleSet::getAnalysisType, WarningPeriodEnum.MINUTES.getName()); queryWrapper.eq(IdxBizFanWarningRuleSet::getAnalysisType, WarningPeriodEnum.HOUR.getName());
queryWrapper.in(IdxBizFanWarningRuleSet::getAnalysisPointId, collect); queryWrapper.in(IdxBizFanWarningRuleSet::getAnalysisPointId, collect);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRules = idxBizFanWarningRuleSetMapper.selectList(queryWrapper); List<IdxBizFanWarningRuleSet> idxBizPvWarningRules = idxBizFanWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<FanHealthIndexMoment>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(FanHealthIndexMoment::getGatewayId, Collectors.groupingBy(FanHealthIndexMoment::getIndexAddress))); Map<String, Map<String, List<FanHealthIndexHour>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(FanHealthIndexHour::getGatewayId, Collectors.groupingBy(FanHealthIndexHour::getIndexAddress)));
List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>(); List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>();
List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>(); List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>(); HashMap<String, StationBasic> stationMap = new HashMap<>();
for (String gateWayId : gateWayMaps.keySet()) { for (String gateWayId : gateWayMaps.keySet()) {
...@@ -1148,21 +1934,23 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1148,21 +1934,23 @@ public class HealthStatusIndicatorServiceImpl {
basicLambdaQueryWrapper.last("limit 1"); basicLambdaQueryWrapper.last("limit 1");
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper); StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gateWayId, stationBasic); stationMap.put(gateWayId, stationBasic);
Map<String, List<FanHealthIndexMoment>> healthDataMaps = gateWayMaps.get(gateWayId);
Map<String, List<FanHealthIndexHour>> healthDataMaps = gateWayMaps.get(gateWayId);
for (String address : healthDataMaps.keySet()) { for (String address : healthDataMaps.keySet()) {
//获取指定测点的健康指数列表 List<FanHealthIndexHour> idxBizFanHealthIndices = healthDataMaps.get(address);
List<FanHealthIndexMoment> idxBizFanHealthIndices = healthDataMaps.get(address);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizFanHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList()); List<IdxBizFanWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizFanHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
if (ObjectUtils.isEmpty(idxBizPvWarningRuleSets)) { if (ObjectUtils.isEmpty(idxBizPvWarningRuleSets)) {
continue; continue;
} }
Double healthValueWarn = 0.0; Double healthValueWarn = 0.0;
Double healthValueRisk = 0.0; Double healthValueRisk = 0.0;
Double healthValueNotice = 0.0; Double healthValueNotice = 0.0;
// long healthValueNoticeCount = 0;
// long healthValueRiskCount = 0;
// long healthValueWarnCount = 0;
int healthValueWarnCount = 0; int healthValueWarnCount = 0;
int healthValueRiskCount = 0; int healthValueRiskCount = 0;
int healthValueNoticeCount = 0; int healthValueNoticeCount = 0;
...@@ -1190,9 +1978,7 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1190,9 +1978,7 @@ public class HealthStatusIndicatorServiceImpl {
} }
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(FanHealthIndexHour::getHealthIndex).collect(Collectors.toList());
//获取指定测点的健康指数列表
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(FanHealthIndexMoment::getHealthIndex).collect(Collectors.toList());
// Double finalHealthValueRisk = healthValueRisk; // Double finalHealthValueRisk = healthValueRisk;
// long riskNum = healthIndex.subList(healthIndex.size()>healthValueRiskCount? (int) (healthIndex.size() - healthValueRiskCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueRisk).count(); // long riskNum = healthIndex.subList(healthIndex.size()>healthValueRiskCount? (int) (healthIndex.size() - healthValueRiskCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueRisk).count();
// Double finalHealthValueWarn = healthValueWarn; // Double finalHealthValueWarn = healthValueWarn;
...@@ -1218,24 +2004,21 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1218,24 +2004,21 @@ public class HealthStatusIndicatorServiceImpl {
String content = ""; String content = "";
String num = ""; String num = "";
if (noticeNum == healthValueNoticeCount) { if (noticeNum == healthValueNoticeCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_notice_day","notice");
level = "注意"; level = "注意";
num = "" + healthValueNotice; num = "" + healthValueNotice;
content = healthValueNoticeCount * 10 + "分钟"; content = healthValueNoticeCount + "小时";
} }
if (warnNum == healthValueWarnCount) { if (warnNum == healthValueWarnCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_warn_day","warn");
level = "警告"; level = "警告";
num = "" + healthValueWarn; num = "" + healthValueWarn;
content = healthValueWarnCount * 10 + "分钟"; content = healthValueWarnCount + "小时";
} }
if (riskNum == healthValueRiskCount) { if (riskNum == healthValueRiskCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_risk_day","risk");
level = "危险"; level = "危险";
num = "" + healthValueRisk; num = "" + healthValueRisk;
content = healthValueRiskCount * 10 + "分钟"; content = healthValueRiskCount + "小时";
} }
//库里若已存在该测点预警 不生成重复的 若新生预警等级高于历史 则生成
LambdaQueryWrapper<FanWarningRecord> query = new LambdaQueryWrapper<>(); LambdaQueryWrapper<FanWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(FanWarningRecord::getAnalysisPointId, idxBizFanHealthIndices.get(0).getAnalysisObjSeq()); query.eq(FanWarningRecord::getAnalysisPointId, idxBizFanHealthIndices.get(0).getAnalysisObjSeq());
query.eq(FanWarningRecord::getStatus, "0"); query.eq(FanWarningRecord::getStatus, "0");
...@@ -1260,17 +2043,17 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1260,17 +2043,17 @@ public class HealthStatusIndicatorServiceImpl {
idxBizFanWarningRecord.setWarningName(level); idxBizFanWarningRecord.setWarningName(level);
idxBizFanWarningRecord.setCONTENT(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName() + "连续" + content + "健康指数≤" + num); idxBizFanWarningRecord.setCONTENT(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName() + "连续" + content + "健康指数≤" + num);
idxBizFanWarningRecord.setRecDate(time); idxBizFanWarningRecord.setRecDate(time);
idxBizFanWarningRecord.setWarningPeriod(WarningPeriodEnum.MINUTES.getName()); idxBizFanWarningRecord.setWarningPeriod(WarningPeriodEnum.HOUR.getName());
idxBizFanWarningRecord.setNumber(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getNumber()); idxBizFanWarningRecord.setNumber(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getNumber());
idxBizFanWarningRecord.setPointName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName());
idxBizFanWarningRecord.setHealthIndexSeq(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex().toString()); idxBizFanWarningRecord.setHealthIndexSeq(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex().toString());
idxBizFanWarningRecord.setHealthLevel(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthLevel()); idxBizFanWarningRecord.setHealthLevel(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthLevel());
idxBizFanWarningRecord.setPointName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName());
idxBizFanWarningRecordList.add(idxBizFanWarningRecord); idxBizFanWarningRecordList.add(idxBizFanWarningRecord);
//idxBizFanWarningRecordMapper.insert(idxBizFanWarningRecord);
long currentTimeMillis = System.currentTimeMillis(); long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime(); long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000; long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
FanWarningRecord fanWarningRecord = new FanWarningRecord(); FanWarningRecord fanWarningRecord = new FanWarningRecord();
BeanUtils.copyProperties(idxBizFanWarningRecord, fanWarningRecord, "disposotionDate", "recDate", "CONTENT"); BeanUtils.copyProperties(idxBizFanWarningRecord, fanWarningRecord, "disposotionDate", "recDate", "CONTENT");
fanWarningRecord.setContent(idxBizFanWarningRecord.getCONTENT()); fanWarningRecord.setContent(idxBizFanWarningRecord.getCONTENT());
...@@ -1279,96 +2062,84 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1279,96 +2062,84 @@ public class HealthStatusIndicatorServiceImpl {
fanWarningRecord.setHealthIndex(String.format(CommonConstans.Onedecimalplaces, idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex())); fanWarningRecord.setHealthIndex(String.format(CommonConstans.Onedecimalplaces, idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex()));
fanWarningRecord.setOrgCode(idxBizFanHealthIndices.get(0).getOrgCode()); fanWarningRecord.setOrgCode(idxBizFanHealthIndices.get(0).getOrgCode());
tdFanWarningRecordList.add(fanWarningRecord); tdFanWarningRecordList.add(fanWarningRecord);
} }
} }
} }
// idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList); // idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList);
if (CollUtil.isNotEmpty(tdFanWarningRecordList)) {
// tdengine插入 // tdengine插入
// fanWarningRecordService.saveBatch(tdFanWarningRecordList);
if (CollUtil.isNotEmpty(tdFanWarningRecordList)) {
fanWaringRecordMapper.saveBatchWarningRecords(tdFanWarningRecordList); fanWaringRecordMapper.saveBatchWarningRecords(tdFanWarningRecordList);
} }
// 触发风险模型生成预警处置模块的预警记录 // 触发风险模型生成预警处置模块的预警记录
fetchDataFan(tdFanWarningRecordList, stationMap); fetchDataFan(tdFanWarningRecordList, stationMap);
} }
// @Scheduled(cron = "0 0 0/1 * * ?")
@Async("async") @Async("async")
//@PostConstruct public void healthWarningHourNew() {
public void healthWarningMinute(Date time, String gatewayId) {
if (!openHealth) { if (!openHealth) {
return; return;
} }
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date time = new Date();
// Date time=null; String format = DateUtil.format(time, "yyyy-MM-dd HH:00:00");
// try { // Date date = DateUtils.dateAddHours(time, -13);
// time = simpleDateFormat.parse("2024-03-14 13:50:00"); // Calendar calendar = Calendar.getInstance();
// } catch (ParseException e1) { // calendar.set(Calendar.HOUR_OF_DAY,calendar.get(Calendar.HOUR_OF_DAY)-5);
// // TODO Auto-generated catch block // SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
// e1.printStackTrace(); List<String> gateWayIds = idxBizFanWarningRuleSetMapper.getGatewayIds();
// } gateWayIds.forEach(gateWayId -> {
// Calendar calendar = Calendar.getInstance(); new Thread(() -> {
log.info("风机---------------------预警时间----" + time); Integer maxWaringCycle = idxBizFanWarningRuleSetMapper.getMaxWaringCycleOfHourByGatewayId(gateWayId);
String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
// calendar.set(Calendar.HOUR_OF_DAY,calendar.get(Calendar.HOUR_OF_DAY)-1);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
// 用ts字段查询时需要减8小时
// Date date = DateUtils.dateAddHours(time, -8);
Integer maxWaringCycle = idxBizFanWarningRuleSetMapper.getMaxWaringCycleOfMinutesByGatewayId(gatewayId) + AnalyseOffset;
Date date = DateUtil.offsetHour(time, -8); Date date = DateUtil.offsetHour(time, -8);
date = DateUtil.offsetMinute(date, 0 - (maxWaringCycle * 10)); date = DateUtil.offsetHour(date, 0 - (maxWaringCycle));
LambdaQueryWrapper<FanHealthIndexMoment> wrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<FanHealthIndexHour> wrapper = new LambdaQueryWrapper<>();
//wrapper.ne(FanHealthIndexMoment::getHealthLevel,"安全"); wrapper.eq(FanHealthIndexHour::getAnalysisObjType, "测点");
wrapper.eq(FanHealthIndexMoment::getAnalysisObjType, "测点"); wrapper.eq(FanHealthIndexHour::getGatewayId, gateWayId);
wrapper.ge(FanHealthIndexMoment::getTs, date); //wrapper.ne(FanHealthIndexHour::getHealthLevel, "安全");
wrapper.eq(FanHealthIndexMoment::getGatewayId, gatewayId); wrapper.ge(FanHealthIndexHour::getTs, date);
Date dateMax = DateUtil.offsetHour(time, -8); Date dateMax = DateUtil.offsetHour(time, -8);
wrapper.le(FanHealthIndexMoment::getTs, dateMax); wrapper.le(FanHealthIndexHour::getTs, dateMax);
// wrapper.eq(FanHealthIndexMoment::getIndexAddress, "18547"); wrapper.orderByAsc(FanHealthIndexHour::getTs);
wrapper.orderByAsc(FanHealthIndexMoment::getTs); List<FanHealthIndexHour> healthIndices = fanHealthIndexHourMapper.selectList(wrapper);
//查询最大连续时间规则的测点对象
List<FanHealthIndexMoment> healthIndices = fanHealthIndexMomentMapper.selectList(wrapper);
if (ObjectUtils.isEmpty(healthIndices)) { if (ObjectUtils.isEmpty(healthIndices)) {
return; return;
} }
List<String> collect = healthIndices.stream().map(FanHealthIndexMoment::getAnalysisObjSeq).collect(Collectors.toList()); List<String> collect = healthIndices.stream().map(FanHealthIndexHour::getAnalysisObjSeq).collect(Collectors.toList());
LambdaQueryWrapper<IdxBizFanWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<IdxBizFanWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(IdxBizFanWarningRuleSet::getAnalysisType, WarningPeriodEnum.MINUTES.getName()); queryWrapper.eq(IdxBizFanWarningRuleSet::getAnalysisType, WarningPeriodEnum.HOUR.getName());
queryWrapper.eq(IdxBizFanWarningRuleSet::getGatewayId, gatewayId);
queryWrapper.in(IdxBizFanWarningRuleSet::getAnalysisPointId, collect); queryWrapper.in(IdxBizFanWarningRuleSet::getAnalysisPointId, collect);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRules = idxBizFanWarningRuleSetMapper.selectList(queryWrapper); List<IdxBizFanWarningRuleSet> idxBizPvWarningRules = idxBizFanWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<FanHealthIndexMoment>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(FanHealthIndexMoment::getGatewayId, Collectors.groupingBy(FanHealthIndexMoment::getIndexAddress))); Map<String, Map<String, List<FanHealthIndexHour>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(FanHealthIndexHour::getGatewayId, Collectors.groupingBy(FanHealthIndexHour::getIndexAddress)));
List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>(); List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>();
List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>(); List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>(); HashMap<String, StationBasic> stationMap = new HashMap<>();
// for (String gateWayId : gateWayMaps.keySet()) { // for (String gateWayId : gateWayMaps.keySet()) {
LambdaQueryWrapper<StationBasic> basicLambdaQueryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<StationBasic> basicLambdaQueryWrapper = new LambdaQueryWrapper<>();
basicLambdaQueryWrapper.eq(StationBasic::getFanGatewayId, gatewayId); basicLambdaQueryWrapper.eq(StationBasic::getFanGatewayId, gateWayId);
basicLambdaQueryWrapper.last("limit 1"); basicLambdaQueryWrapper.last("limit 1");
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper); StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gatewayId, stationBasic); stationMap.put(gateWayId, stationBasic);
Map<String, List<FanHealthIndexMoment>> healthDataMaps = gateWayMaps.get(gatewayId);
Map<String, List<FanHealthIndexHour>> healthDataMaps = gateWayMaps.get(gateWayId);
for (String address : healthDataMaps.keySet()) { for (String address : healthDataMaps.keySet()) {
//获取指定测点的健康指数列表 List<FanHealthIndexHour> idxBizFanHealthIndices = healthDataMaps.get(address);
List<FanHealthIndexMoment> idxBizFanHealthIndices = healthDataMaps.get(address);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizFanHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList()); List<IdxBizFanWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizFanHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
if (ObjectUtils.isEmpty(idxBizPvWarningRuleSets)) { if (ObjectUtils.isEmpty(idxBizPvWarningRuleSets)) {
continue; continue;
} }
Double healthValueWarn = 0.0; Double healthValueWarn = 0.0;
Double healthValueRisk = 0.0; Double healthValueRisk = 0.0;
Double healthValueNotice = 0.0; Double healthValueNotice = 0.0;
// long healthValueNoticeCount = 0;
// long healthValueRiskCount = 0;
// long healthValueWarnCount = 0;
int healthValueWarnCount = 0; int healthValueWarnCount = 0;
int healthValueRiskCount = 0; int healthValueRiskCount = 0;
int healthValueNoticeCount = 0; int healthValueNoticeCount = 0;
...@@ -1396,9 +2167,7 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1396,9 +2167,7 @@ public class HealthStatusIndicatorServiceImpl {
} }
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(FanHealthIndexHour::getHealthIndex).collect(Collectors.toList());
//获取指定测点的健康指数列表
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(FanHealthIndexMoment::getHealthIndex).collect(Collectors.toList());
// Double finalHealthValueRisk = healthValueRisk; // Double finalHealthValueRisk = healthValueRisk;
// long riskNum = healthIndex.subList(healthIndex.size()>healthValueRiskCount? (int) (healthIndex.size() - healthValueRiskCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueRisk).count(); // long riskNum = healthIndex.subList(healthIndex.size()>healthValueRiskCount? (int) (healthIndex.size() - healthValueRiskCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueRisk).count();
// Double finalHealthValueWarn = healthValueWarn; // Double finalHealthValueWarn = healthValueWarn;
...@@ -1424,24 +2193,21 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1424,24 +2193,21 @@ public class HealthStatusIndicatorServiceImpl {
String content = ""; String content = "";
String num = ""; String num = "";
if (noticeNum == healthValueNoticeCount) { if (noticeNum == healthValueNoticeCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_notice_day","notice");
level = "注意"; level = "注意";
num = "" + healthValueNotice; num = "" + healthValueNotice;
content = healthValueNoticeCount * 10 + "分钟"; content = healthValueNoticeCount + "小时";
} }
if (warnNum == healthValueWarnCount) { if (warnNum == healthValueWarnCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_warn_day","warn");
level = "警告"; level = "警告";
num = "" + healthValueWarn; num = "" + healthValueWarn;
content = healthValueWarnCount * 10 + "分钟"; content = healthValueWarnCount + "小时";
} }
if (riskNum == healthValueRiskCount) { if (riskNum == healthValueRiskCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_risk_day","risk");
level = "危险"; level = "危险";
num = "" + healthValueRisk; num = "" + healthValueRisk;
content = healthValueRiskCount * 10 + "分钟"; content = healthValueRiskCount + "小时";
} }
//库里若已存在该测点预警 不生成重复的 若新生预警等级高于历史 则生成
LambdaQueryWrapper<FanWarningRecord> query = new LambdaQueryWrapper<>(); LambdaQueryWrapper<FanWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(FanWarningRecord::getAnalysisPointId, idxBizFanHealthIndices.get(0).getAnalysisObjSeq()); query.eq(FanWarningRecord::getAnalysisPointId, idxBizFanHealthIndices.get(0).getAnalysisObjSeq());
query.eq(FanWarningRecord::getStatus, "0"); query.eq(FanWarningRecord::getStatus, "0");
...@@ -1457,7 +2223,7 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1457,7 +2223,7 @@ public class HealthStatusIndicatorServiceImpl {
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getArea()); idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getArea());
idxBizFanWarningRecord.setStation(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getStation()); idxBizFanWarningRecord.setStation(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getStation());
idxBizFanWarningRecord.setSubSystem(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getSubSystem()); idxBizFanWarningRecord.setSubSystem(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getSubSystem());
idxBizFanWarningRecord.setGatewayId(gatewayId); idxBizFanWarningRecord.setGatewayId(gateWayId);
idxBizFanWarningRecord.setIndexAddress(address); idxBizFanWarningRecord.setIndexAddress(address);
idxBizFanWarningRecord.setEquipmentName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getEquipmentName()); idxBizFanWarningRecord.setEquipmentName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getEquipmentName());
idxBizFanWarningRecord.setAnalysisPointId(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getAnalysisObjSeq()); idxBizFanWarningRecord.setAnalysisPointId(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getAnalysisObjSeq());
...@@ -1466,17 +2232,17 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1466,17 +2232,17 @@ public class HealthStatusIndicatorServiceImpl {
idxBizFanWarningRecord.setWarningName(level); idxBizFanWarningRecord.setWarningName(level);
idxBizFanWarningRecord.setCONTENT(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName() + "连续" + content + "健康指数≤" + num); idxBizFanWarningRecord.setCONTENT(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName() + "连续" + content + "健康指数≤" + num);
idxBizFanWarningRecord.setRecDate(time); idxBizFanWarningRecord.setRecDate(time);
idxBizFanWarningRecord.setWarningPeriod(WarningPeriodEnum.MINUTES.getName()); idxBizFanWarningRecord.setWarningPeriod(WarningPeriodEnum.HOUR.getName());
idxBizFanWarningRecord.setNumber(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getNumber()); idxBizFanWarningRecord.setNumber(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getNumber());
idxBizFanWarningRecord.setPointName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName());
idxBizFanWarningRecord.setHealthIndexSeq(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex().toString()); idxBizFanWarningRecord.setHealthIndexSeq(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex().toString());
idxBizFanWarningRecord.setHealthLevel(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthLevel()); idxBizFanWarningRecord.setHealthLevel(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthLevel());
idxBizFanWarningRecord.setPointName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName());
idxBizFanWarningRecordList.add(idxBizFanWarningRecord); idxBizFanWarningRecordList.add(idxBizFanWarningRecord);
//idxBizFanWarningRecordMapper.insert(idxBizFanWarningRecord);
long currentTimeMillis = System.currentTimeMillis(); long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime(); long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000; long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
FanWarningRecord fanWarningRecord = new FanWarningRecord(); FanWarningRecord fanWarningRecord = new FanWarningRecord();
BeanUtils.copyProperties(idxBizFanWarningRecord, fanWarningRecord, "disposotionDate", "recDate", "CONTENT"); BeanUtils.copyProperties(idxBizFanWarningRecord, fanWarningRecord, "disposotionDate", "recDate", "CONTENT");
fanWarningRecord.setContent(idxBizFanWarningRecord.getCONTENT()); fanWarningRecord.setContent(idxBizFanWarningRecord.getCONTENT());
...@@ -1485,62 +2251,64 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1485,62 +2251,64 @@ public class HealthStatusIndicatorServiceImpl {
fanWarningRecord.setHealthIndex(String.format(CommonConstans.Onedecimalplaces, idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex())); fanWarningRecord.setHealthIndex(String.format(CommonConstans.Onedecimalplaces, idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex()));
fanWarningRecord.setOrgCode(idxBizFanHealthIndices.get(0).getOrgCode()); fanWarningRecord.setOrgCode(idxBizFanHealthIndices.get(0).getOrgCode());
tdFanWarningRecordList.add(fanWarningRecord); tdFanWarningRecordList.add(fanWarningRecord);
} }
} }
// } // }
// idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList); // idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList);
if (CollUtil.isNotEmpty(tdFanWarningRecordList)) {
// tdengine插入 // tdengine插入
// fanWarningRecordService.saveBatch(tdFanWarningRecordList);
if (CollUtil.isNotEmpty(tdFanWarningRecordList)) {
fanWaringRecordMapper.saveBatchWarningRecords(tdFanWarningRecordList); fanWaringRecordMapper.saveBatchWarningRecords(tdFanWarningRecordList);
} }
// 触发风险模型生成预警处置模块的预警记录 // 触发风险模型生成预警处置模块的预警记录
fetchDataFan(tdFanWarningRecordList, stationMap); fetchDataFan(tdFanWarningRecordList, stationMap);
}).start();
});
} }
/*** /***
* 每五小时获取一次最大粒度内的指数异常数据 * 每三天取一次最大粒度内的指数异常数据
* 判断五小时内数据是否符合预警规则 符合则报警并在redis中缓存 同一级别的预警记录下次不生成 * 判断三天内数据是否符合预警规则 符合则报警并在redis中缓存 同一级别的预警记录下次不生成
* *
*/ */
@Scheduled(cron = "0 0 0/1 * * ?") @Scheduled(cron = "0 0 0 0/1 * ? ")
@Async("async") @Async("async")
public void healthWarningHour() { public void healthWarningDay() {
if (!openHealth) { if (!openHealth) {
return; return;
} }
Date time = new Date(); Date time = new Date();
String format = DateUtil.format(time, "yyyy-MM-dd HH:00:00"); String format = DateUtil.format(time, "yyyy-MM-dd 00:00:00");
// Date date = DateUtils.dateAddHours(time, -13); //三天 + 8小时
// Date date = DateUtils.dateAddHours(time, -80);
// Calendar calendar = Calendar.getInstance(); // Calendar calendar = Calendar.getInstance();
// calendar.set(Calendar.HOUR_OF_DAY,calendar.get(Calendar.HOUR_OF_DAY)-5); // calendar.set(Calendar.DAY_OF_MONTH,calendar.get(Calendar.DAY_OF_MONTH)-3);
// SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm"); // SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
Integer maxWaringCycle = idxBizFanWarningRuleSetMapper.getMaxWaringCycleOfHour(); Integer maxWaringCycle = idxBizFanWarningRuleSetMapper.getMaxWaringCycleOfDay();
Date date = DateUtil.offsetHour(time, -8); Date date = DateUtil.offsetHour(time, -8);
date = DateUtil.offsetHour(date, 0 - (maxWaringCycle)); date = DateUtil.offsetDay(date, 0 - (maxWaringCycle));
LambdaQueryWrapper<FanHealthIndexHour> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(FanHealthIndexHour::getAnalysisObjType, "测点"); LambdaQueryWrapper<FanHealthIndexDay> wrapper = new LambdaQueryWrapper<>();
//wrapper.ne(FanHealthIndexHour::getHealthLevel, "安全"); // wrapper.ne(FanHealthIndexDay::getHealthLevel,"安全");
wrapper.ge(FanHealthIndexHour::getTs, date); wrapper.ge(FanHealthIndexDay::getRecDate, date);
wrapper.eq(FanHealthIndexDay::getAnalysisObjType, "测点");
Date dateMax = DateUtil.offsetHour(time, -8); Date dateMax = DateUtil.offsetHour(time, -8);
wrapper.le(FanHealthIndexHour::getTs, dateMax); wrapper.le(FanHealthIndexDay::getRecDate, dateMax);
wrapper.orderByAsc(FanHealthIndexHour::getTs); wrapper.orderByAsc(FanHealthIndexDay::getTs);
List<FanHealthIndexHour> healthIndices = fanHealthIndexHourMapper.selectList(wrapper); List<FanHealthIndexDay> healthIndices = fanHealthIndexDayMapper.selectList(wrapper);
if (ObjectUtils.isEmpty(healthIndices)) { if (ObjectUtils.isEmpty(healthIndices)) {
return; return;
} }
List<String> collect = healthIndices.stream().map(FanHealthIndexHour::getAnalysisObjSeq).collect(Collectors.toList()); List<String> collect = healthIndices.stream().map(FanHealthIndexDay::getAnalysisObjSeq).collect(Collectors.toList());
LambdaQueryWrapper<IdxBizFanWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<IdxBizFanWarningRuleSet> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(IdxBizFanWarningRuleSet::getAnalysisType, WarningPeriodEnum.HOUR.getName()); queryWrapper.eq(IdxBizFanWarningRuleSet::getAnalysisType, WarningPeriodEnum.DAY.getName());
queryWrapper.in(IdxBizFanWarningRuleSet::getAnalysisPointId, collect); queryWrapper.in(IdxBizFanWarningRuleSet::getAnalysisPointId, collect);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRules = idxBizFanWarningRuleSetMapper.selectList(queryWrapper); List<IdxBizFanWarningRuleSet> idxBizPvWarningRules = idxBizFanWarningRuleSetMapper.selectList(queryWrapper);
Map<String, Map<String, List<FanHealthIndexHour>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(FanHealthIndexHour::getGatewayId, Collectors.groupingBy(FanHealthIndexHour::getIndexAddress))); Map<String, Map<String, List<FanHealthIndexDay>>> gateWayMaps = healthIndices.stream().collect(Collectors.groupingBy(FanHealthIndexDay::getGatewayId, Collectors.groupingBy(FanHealthIndexDay::getIndexAddress)));
List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>(); List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>();
List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>(); List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>(); HashMap<String, StationBasic> stationMap = new HashMap<>();
...@@ -1552,9 +2320,9 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1552,9 +2320,9 @@ public class HealthStatusIndicatorServiceImpl {
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper); StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gateWayId, stationBasic); stationMap.put(gateWayId, stationBasic);
Map<String, List<FanHealthIndexHour>> healthDataMaps = gateWayMaps.get(gateWayId); Map<String, List<FanHealthIndexDay>> healthDataMaps = gateWayMaps.get(gateWayId);
for (String address : healthDataMaps.keySet()) { for (String address : healthDataMaps.keySet()) {
List<FanHealthIndexHour> idxBizFanHealthIndices = healthDataMaps.get(address); List<FanHealthIndexDay> idxBizFanHealthIndices = healthDataMaps.get(address);
List<IdxBizFanWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizFanHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList()); List<IdxBizFanWarningRuleSet> idxBizPvWarningRuleSets = idxBizPvWarningRules.stream().filter(t -> t.getAnalysisPointId().equals(idxBizFanHealthIndices.get(0).getAnalysisObjSeq())).collect(Collectors.toList());
...@@ -1566,8 +2334,9 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1566,8 +2334,9 @@ public class HealthStatusIndicatorServiceImpl {
Double healthValueNotice = 0.0; Double healthValueNotice = 0.0;
// long healthValueNoticeCount = 0; // long healthValueNoticeCount = 0;
// long healthValueRiskCount = 0;
// long healthValueWarnCount = 0; // long healthValueWarnCount = 0;
// long healthValueRiskCount = 0;
int healthValueWarnCount = 0; int healthValueWarnCount = 0;
int healthValueRiskCount = 0; int healthValueRiskCount = 0;
int healthValueNoticeCount = 0; int healthValueNoticeCount = 0;
...@@ -1576,7 +2345,6 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1576,7 +2345,6 @@ public class HealthStatusIndicatorServiceImpl {
int warnNum = 0; int warnNum = 0;
int noticeNum = 0; int noticeNum = 0;
for (IdxBizFanWarningRuleSet e : idxBizPvWarningRuleSets) { for (IdxBizFanWarningRuleSet e : idxBizPvWarningRuleSets) {
switch (e.getWarningName()) { switch (e.getWarningName()) {
case "警告": case "警告":
...@@ -1595,7 +2363,8 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1595,7 +2363,8 @@ public class HealthStatusIndicatorServiceImpl {
} }
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(FanHealthIndexHour::getHealthIndex).collect(Collectors.toList());
List<Double> healthIndex = idxBizFanHealthIndices.stream().map(FanHealthIndexDay::getHealthIndex).collect(Collectors.toList());
// Double finalHealthValueRisk = healthValueRisk; // Double finalHealthValueRisk = healthValueRisk;
// long riskNum = healthIndex.subList(healthIndex.size()>healthValueRiskCount? (int) (healthIndex.size() - healthValueRiskCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueRisk).count(); // long riskNum = healthIndex.subList(healthIndex.size()>healthValueRiskCount? (int) (healthIndex.size() - healthValueRiskCount) :0,healthIndex.size()).stream().filter(e -> e <= finalHealthValueRisk).count();
// Double finalHealthValueWarn = healthValueWarn; // Double finalHealthValueWarn = healthValueWarn;
...@@ -1621,21 +2390,24 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1621,21 +2390,24 @@ public class HealthStatusIndicatorServiceImpl {
String content = ""; String content = "";
String num = ""; String num = "";
if (noticeNum == healthValueNoticeCount) { if (noticeNum == healthValueNoticeCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_notice_day","notice");
level = "注意"; level = "注意";
num = "" + healthValueNotice; num = "" + healthValueNotice;
content = healthValueNoticeCount + "小时"; content = healthValueNoticeCount + "";
} }
if (warnNum == healthValueWarnCount) { if (warnNum == healthValueWarnCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_warn_day","warn");
level = "警告"; level = "警告";
num = "" + healthValueWarn; num = "" + healthValueWarn;
content = healthValueWarnCount + "小时"; content = healthValueWarnCount + "";
} }
if (riskNum == healthValueRiskCount) { if (riskNum == healthValueRiskCount) {
// redisUtils.set(gateWayId+"_"+address+"_health_risk_day","risk");
level = "危险"; level = "危险";
num = "" + healthValueRisk; num = "" + healthValueRisk;
content = healthValueRiskCount + "小时"; content = healthValueRiskCount + "";
} }
LambdaQueryWrapper<FanWarningRecord> query = new LambdaQueryWrapper<>(); LambdaQueryWrapper<FanWarningRecord> query = new LambdaQueryWrapper<>();
query.eq(FanWarningRecord::getAnalysisPointId, idxBizFanHealthIndices.get(0).getAnalysisObjSeq()); query.eq(FanWarningRecord::getAnalysisPointId, idxBizFanHealthIndices.get(0).getAnalysisObjSeq());
query.eq(FanWarningRecord::getStatus, "0"); query.eq(FanWarningRecord::getStatus, "0");
...@@ -1660,17 +2432,17 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1660,17 +2432,17 @@ public class HealthStatusIndicatorServiceImpl {
idxBizFanWarningRecord.setWarningName(level); idxBizFanWarningRecord.setWarningName(level);
idxBizFanWarningRecord.setCONTENT(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName() + "连续" + content + "健康指数≤" + num); idxBizFanWarningRecord.setCONTENT(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName() + "连续" + content + "健康指数≤" + num);
idxBizFanWarningRecord.setRecDate(time); idxBizFanWarningRecord.setRecDate(time);
idxBizFanWarningRecord.setWarningPeriod(WarningPeriodEnum.HOUR.getName()); idxBizFanWarningRecord.setWarningPeriod(WarningPeriodEnum.DAY.getName());
idxBizFanWarningRecord.setNumber(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getNumber()); idxBizFanWarningRecord.setNumber(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getNumber());
idxBizFanWarningRecord.setHealthIndexSeq(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex().toString()); idxBizFanWarningRecord.setHealthIndexSeq(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthIndex().toString());
idxBizFanWarningRecord.setHealthLevel(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthLevel()); idxBizFanWarningRecord.setHealthLevel(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getHealthLevel());
idxBizFanWarningRecord.setPointName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName()); idxBizFanWarningRecord.setPointName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getPointName());
idxBizFanWarningRecordList.add(idxBizFanWarningRecord); idxBizFanWarningRecordList.add(idxBizFanWarningRecord);
long currentTimeMillis = System.currentTimeMillis(); long currentTimeMillis = System.currentTimeMillis();
long nanoTime = System.nanoTime(); long nanoTime = System.nanoTime();
long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000; long timestamp = currentTimeMillis * 1000000 + nanoTime % 1000000;
FanWarningRecord fanWarningRecord = new FanWarningRecord(); FanWarningRecord fanWarningRecord = new FanWarningRecord();
BeanUtils.copyProperties(idxBizFanWarningRecord, fanWarningRecord, "disposotionDate", "recDate", "CONTENT"); BeanUtils.copyProperties(idxBizFanWarningRecord, fanWarningRecord, "disposotionDate", "recDate", "CONTENT");
fanWarningRecord.setContent(idxBizFanWarningRecord.getCONTENT()); fanWarningRecord.setContent(idxBizFanWarningRecord.getCONTENT());
...@@ -1682,6 +2454,7 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1682,6 +2454,7 @@ public class HealthStatusIndicatorServiceImpl {
} }
} }
} }
// idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList); // idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList);
if (CollUtil.isNotEmpty(tdFanWarningRecordList)) { if (CollUtil.isNotEmpty(tdFanWarningRecordList)) {
...@@ -1692,15 +2465,9 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1692,15 +2465,9 @@ public class HealthStatusIndicatorServiceImpl {
fetchDataFan(tdFanWarningRecordList, stationMap); fetchDataFan(tdFanWarningRecordList, stationMap);
} }
/*** // @Scheduled(cron = "0 0 0 0/1 * ? ")
* 每三天取一次最大粒度内的指数异常数据
* 判断三天内数据是否符合预警规则 符合则报警并在redis中缓存 同一级别的预警记录下次不生成
*
*/
@Scheduled(cron = "0 0 0 0/1 * ? ")
@Async("async") @Async("async")
public void healthWarningDay() { public void healthWarningDayNew() {
if (!openHealth) { if (!openHealth) {
return; return;
} }
...@@ -1711,7 +2478,9 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1711,7 +2478,9 @@ public class HealthStatusIndicatorServiceImpl {
// Calendar calendar = Calendar.getInstance(); // Calendar calendar = Calendar.getInstance();
// calendar.set(Calendar.DAY_OF_MONTH,calendar.get(Calendar.DAY_OF_MONTH)-3); // calendar.set(Calendar.DAY_OF_MONTH,calendar.get(Calendar.DAY_OF_MONTH)-3);
// SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd"); // SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
Integer maxWaringCycle = idxBizFanWarningRuleSetMapper.getMaxWaringCycleOfDay(); List<String> gatewayIds = idxBizFanWarningRuleSetMapper.getGatewayIds();
gatewayIds.stream().forEach(gatewayId->{
Integer maxWaringCycle = idxBizFanWarningRuleSetMapper.getMaxWaringCycleOfDayByGatewayId(gatewayId);
Date date = DateUtil.offsetHour(time, -8); Date date = DateUtil.offsetHour(time, -8);
date = DateUtil.offsetDay(date, 0 - (maxWaringCycle)); date = DateUtil.offsetDay(date, 0 - (maxWaringCycle));
...@@ -1737,15 +2506,15 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1737,15 +2506,15 @@ public class HealthStatusIndicatorServiceImpl {
List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>(); List<IdxBizFanWarningRecord> idxBizFanWarningRecordList = new ArrayList<>();
List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>(); List<FanWarningRecord> tdFanWarningRecordList = new ArrayList<>();
HashMap<String, StationBasic> stationMap = new HashMap<>(); HashMap<String, StationBasic> stationMap = new HashMap<>();
for (String gateWayId : gateWayMaps.keySet()) { // for (String gateWayId : gateWayMaps.keySet()) {
LambdaQueryWrapper<StationBasic> basicLambdaQueryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<StationBasic> basicLambdaQueryWrapper = new LambdaQueryWrapper<>();
basicLambdaQueryWrapper.eq(StationBasic::getFanGatewayId, gateWayId); basicLambdaQueryWrapper.eq(StationBasic::getFanGatewayId, gatewayId);
basicLambdaQueryWrapper.last("limit 1"); basicLambdaQueryWrapper.last("limit 1");
StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper); StationBasic stationBasic = stationBasicMapper.selectOne(basicLambdaQueryWrapper);
stationMap.put(gateWayId, stationBasic); stationMap.put(gatewayId, stationBasic);
Map<String, List<FanHealthIndexDay>> healthDataMaps = gateWayMaps.get(gateWayId); Map<String, List<FanHealthIndexDay>> healthDataMaps = gateWayMaps.get(gatewayId);
for (String address : healthDataMaps.keySet()) { for (String address : healthDataMaps.keySet()) {
List<FanHealthIndexDay> idxBizFanHealthIndices = healthDataMaps.get(address); List<FanHealthIndexDay> idxBizFanHealthIndices = healthDataMaps.get(address);
...@@ -1848,7 +2617,7 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1848,7 +2617,7 @@ public class HealthStatusIndicatorServiceImpl {
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getArea()); idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getArea());
idxBizFanWarningRecord.setStation(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getStation()); idxBizFanWarningRecord.setStation(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getStation());
idxBizFanWarningRecord.setSubSystem(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getSubSystem()); idxBizFanWarningRecord.setSubSystem(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getSubSystem());
idxBizFanWarningRecord.setGatewayId(gateWayId); idxBizFanWarningRecord.setGatewayId(gatewayId);
idxBizFanWarningRecord.setIndexAddress(address); idxBizFanWarningRecord.setIndexAddress(address);
idxBizFanWarningRecord.setEquipmentName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getEquipmentName()); idxBizFanWarningRecord.setEquipmentName(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getEquipmentName());
idxBizFanWarningRecord.setAnalysisPointId(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getAnalysisObjSeq()); idxBizFanWarningRecord.setAnalysisPointId(idxBizFanHealthIndices.get(idxBizFanHealthIndices.size() - 1).getAnalysisObjSeq());
...@@ -1878,7 +2647,7 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1878,7 +2647,7 @@ public class HealthStatusIndicatorServiceImpl {
tdFanWarningRecordList.add(fanWarningRecord); tdFanWarningRecordList.add(fanWarningRecord);
} }
} }
} // }
// idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList); // idxBizFanWarningRecordService.saveBatch(idxBizFanWarningRecordList);
if (CollUtil.isNotEmpty(tdFanWarningRecordList)) { if (CollUtil.isNotEmpty(tdFanWarningRecordList)) {
...@@ -1888,6 +2657,8 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1888,6 +2657,8 @@ public class HealthStatusIndicatorServiceImpl {
} }
// 触发风险模型生成预警处置模块的预警记录 // 触发风险模型生成预警处置模块的预警记录
fetchDataFan(tdFanWarningRecordList, stationMap); fetchDataFan(tdFanWarningRecordList, stationMap);
});
} }
public String getJumpUrlByInfo(String sbbm, List<JumpConfig> jumpConfigs) { public String getJumpUrlByInfo(String sbbm, List<JumpConfig> jumpConfigs) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment