Commit cd0d13a5 authored by hezhuozhi

[Intelligent Analysis] Add booster station and run analysis

parent 88b62f9b
@@ -152,6 +152,9 @@ public class IdxBizFanPointVarCorrelation{
/**
* Matched operating-condition variable
*/
@TableField("ORG_CODE")
private String orgCode;
@TableField("MATCH_PROCESS_PONIT")
private String matchProcessPoint;
}
@@ -152,6 +152,10 @@ public class IdxBizPvPointVarCorrelation{
@TableField("PROCESS_POINT_NAME")
private String processPointName;
@TableField("ORG_CODE")
private String orgCode;
@TableField("MATCH_PROCESS_POINT")
private String matchProcessPoint;
}
@@ -13,21 +13,21 @@ import java.util.Date;
public interface Constant {
// Wind turbine correlation consumer
String kafkaTopicConsumer = "FAN_XGX";
String kafkaTopicConsumer = "FAN_XGX_NEW";
// PV correlation consumer
String kafkaTopicConsumerPv = "PV_XGX";
String kafkaTopicConsumerPv = "PV_XGX_NEW";
// Wind turbine - operating-condition interval division
String kafkaTopicConsumerGKHFFan = "FAN_QJHF";
String kafkaTopicConsumerGKHFFan = "FAN_QJHF_NEW";
// PV - operating-condition interval division
String kafkaTopicConsumerGKHFPv = "PV_QJHF";
String kafkaTopicConsumerGKHFPv = "PV_QJHF_NEW";
// Wind turbine - central value calculation
String kafkaTopicConsumerZXZFan = "FAN_ZXZ";
String kafkaTopicConsumerZXZFan = "FAN_ZXZ_NEW";
// PV - central value calculation
String kafkaTopicConsumerZXZPv = "PV_ZXZ";
String kafkaTopicConsumerZXZPv = "PV_ZXZ_NEW";
}
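The renamed "_NEW" topics above are produced by FanConditionVariablesMessage and consumed by KafkaConsumerService (both appear further down in this commit). As a hedged illustration only, a minimal spring-kafka consumer for the wind-turbine correlation topic could look like the sketch below; the class and method names are assumptions, and only the topic constant, the group id from the properties file, and the IdxBizFanPointVarCorrelation JSON payload come from this commit.

// Illustrative sketch, not part of this commit.
package com.yeejoin.amos.boot.module.jxiop.biz.kafka;

import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanPointVarCorrelation;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class FanCorrelationConsumerSketch {

    // kafkaTopicConsumer now resolves to "FAN_XGX_NEW"; the group id matches
    // spring.kafka.consumer.group-id in the properties below.
    @KafkaListener(topics = Constant.kafkaTopicConsumer, groupId = "consumerGroup")
    public void onMessage(String message) {
        // Each message is one correlation record serialized with fastjson by the producer side.
        IdxBizFanPointVarCorrelation record =
                JSON.parseObject(message, IdxBizFanPointVarCorrelation.class);
        // ... run the correlation analysis for this point pair ...
    }
}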
package com.yeejoin.amos.boot.module.jxiop.biz.kafka;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Sequence;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanPointProcessVariableClassification;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanPointVarCorrelation;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizPvPointProcessVariableClassification;
@@ -16,7 +18,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static com.yeejoin.amos.boot.module.jxiop.biz.kafka.Constant.*;
@@ -37,11 +43,6 @@ public class FanConditionVariablesMessage {
@Autowired
private IdxBizPvPointProcessVariableClassificationMapper classificationMapperPv;
@Autowired
private IdxBizFanPointProcessVariableClassificationMapper idxBizFanPointProcessVariableClassificationMapper;
@Autowired
private IdxBizPvPointProcessVariableClassificationMapper idxBizPvPointProcessVariableClassificationMapper;
@Autowired
private KafkaProducerService kafkaProducerService;
@@ -49,17 +50,96 @@ public class FanConditionVariablesMessage {
// Correlation analysis - wind turbine entry point
@Async
public void getFanConditionVariables() {
List<IdxBizFanPointVarCorrelation> pointVarCorrelationsList = pointVarCorrelationMapper.selectList(new LambdaQueryWrapper<IdxBizFanPointVarCorrelation>());
List<IdxBizFanPointVarCorrelation> pointVarCorrelationsList = initFanPointVar();
log.debug("================== Initialized {} fan correlation records ==============================", pointVarCorrelationsList.size());
pointVarCorrelationsList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumer, JSON.toJSONString(item)));
}
/**
* Initialize the wind-turbine correlation-analysis records
* @return the list of generated correlation records
*/
private List<IdxBizFanPointVarCorrelation> initFanPointVar() {
// Clear the fan correlation table first
pointVarCorrelationMapper.delete(null);
Date recDate = new Date();
Sequence sequence = new Sequence();
List<IdxBizFanPointProcessVariableClassification> gkblList = classificationMapperFan.selectList(new QueryWrapper<IdxBizFanPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "工况变量"));
List<IdxBizFanPointProcessVariableClassification> fxblList = classificationMapperFan.selectList(new QueryWrapper<IdxBizFanPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "分析变量"));
List<IdxBizFanPointVarCorrelation> fanPointVarCorrelations=new ArrayList<>();
// When both operating-condition and analysis variables exist, build pairs: total = operating-condition variables * analysis variables of the same device
if(CollectionUtil.isNotEmpty(gkblList) && CollectionUtil.isNotEmpty(fxblList)){
// Group analysis variables by equipment name
Map<String, List<IdxBizFanPointProcessVariableClassification>> fxblMap = fxblList.stream().collect(Collectors.groupingBy(IdxBizFanPointProcessVariableClassification::getEquipmentName));
gkblList.forEach(item ->{
// Guard against devices that have operating-condition variables but no analysis variables
fxblMap.getOrDefault(item.getEquipmentName(), new ArrayList<>()).forEach(item1->{
IdxBizFanPointVarCorrelation idxBizFanPointVarCorrelation = BeanUtil.copyProperties(item, IdxBizFanPointVarCorrelation.class);
idxBizFanPointVarCorrelation.setSequenceNbr(String.valueOf(sequence.nextId()));
idxBizFanPointVarCorrelation.setRecDate(recDate);
idxBizFanPointVarCorrelation.setCorrelationCoefficient(0.0);
idxBizFanPointVarCorrelation.setAnalysisPointId(Long.valueOf(item1.getSequenceNbr()));
idxBizFanPointVarCorrelation.setProcessPointId(Long.valueOf(item.getSequenceNbr()));
idxBizFanPointVarCorrelation.setDeviceType(null);
idxBizFanPointVarCorrelation.setAnalysisGatewayId(item1.getGatewayId());
idxBizFanPointVarCorrelation.setAnalysisIndexAddress(item1.getIndexAddress());
idxBizFanPointVarCorrelation.setProcessGatewayId(item.getGatewayId());
idxBizFanPointVarCorrelation.setProcessIndexAddress(item.getIndexAddress());
idxBizFanPointVarCorrelation.setAnalysisPointName(item1.getPointName());
idxBizFanPointVarCorrelation.setProcessPointName(item.getPointName());
idxBizFanPointVarCorrelation.setMatchProcessPoint("匹配");
fanPointVarCorrelations.add(idxBizFanPointVarCorrelation);
});
});
}
return fanPointVarCorrelations;
}
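The method above builds the cross product described in the comment: every operating-condition variable of a device is paired with every analysis variable of the same device. A small standalone sketch of that pairing rule, using made-up device and point names (all hypothetical, Java 8 compatible), shows why 2 condition variables and 3 analysis variables yield 6 correlation records.

// Standalone illustration of the pairing rule used in initFanPointVar(); names are invented.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PairingSketch {
    public static void main(String[] args) {
        // {equipmentName, pointName}
        List<String[]> analysisVars = Arrays.asList(
                new String[]{"FAN-01", "gearbox temp"},
                new String[]{"FAN-01", "bearing temp"},
                new String[]{"FAN-01", "active power"});
        List<String[]> conditionVars = Arrays.asList(
                new String[]{"FAN-01", "wind speed"},
                new String[]{"FAN-01", "rotor speed"});

        // Group analysis variables by device, mirroring fxblMap above
        Map<String, List<String[]>> byDevice = analysisVars.stream()
                .collect(Collectors.groupingBy(v -> v[0]));

        List<String> pairs = new ArrayList<>();
        for (String[] c : conditionVars) {
            for (String[] a : byDevice.getOrDefault(c[0], new ArrayList<>())) {
                pairs.add(c[1] + " <-> " + a[1]);
            }
        }
        System.out.println(pairs.size()); // prints 6 = 2 condition vars * 3 analysis vars
    }
}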
// Correlation analysis - PV entry point
@Async
public void getPvConditionVariables() {
List<IdxBizPvPointVarCorrelation> pointVarCorrelationsList = pointVarCorrelationMapperPv.selectList(null);
List<IdxBizPvPointVarCorrelation> pointVarCorrelationsList = initPvPointVar();
log.debug("============== Initialized {} PV correlation records ===============", pointVarCorrelationsList.size());
pointVarCorrelationsList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumerPv, JSON.toJSONString(item)));
}
/**
* Initialize the PV correlation-analysis records
*/
private List<IdxBizPvPointVarCorrelation> initPvPointVar() {
// Clear the PV correlation table first (the PV mapper must be used here, not the fan mapper)
pointVarCorrelationMapperPv.delete(null);
Date recDate = new Date();
Sequence sequence = new Sequence();
List<IdxBizPvPointProcessVariableClassification> gkblList = classificationMapperPv.selectList(new QueryWrapper<IdxBizPvPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "工况变量"));
List<IdxBizPvPointProcessVariableClassification> fxblList = classificationMapperPv.selectList(new QueryWrapper<IdxBizPvPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "分析变量"));
List<IdxBizPvPointVarCorrelation> pvPointVarCorrelations =new ArrayList<>();
// When both operating-condition and analysis variables exist, build pairs: total = operating-condition variables * analysis variables of the same device
if(CollectionUtil.isNotEmpty(gkblList) && CollectionUtil.isNotEmpty(fxblList)){
// Group analysis variables by equipment name
Map<String, List<IdxBizPvPointProcessVariableClassification>> fxblMap = fxblList.stream().collect(Collectors.groupingBy(IdxBizPvPointProcessVariableClassification::getEquipmentName));
gkblList.forEach(item ->{
// Guard against devices that have operating-condition variables but no analysis variables
fxblMap.getOrDefault(item.getEquipmentName(), new ArrayList<>()).forEach(item1->{
IdxBizPvPointVarCorrelation idxBizPvPointVarCorrelation = BeanUtil.copyProperties(item, IdxBizPvPointVarCorrelation.class);
idxBizPvPointVarCorrelation.setSequenceNbr(String.valueOf(sequence.nextId()));
idxBizPvPointVarCorrelation.setRecDate(recDate);
idxBizPvPointVarCorrelation.setCorrelationCoefficient(0.0);
idxBizPvPointVarCorrelation.setAnalysisPointId(item1.getSequenceNbr());
idxBizPvPointVarCorrelation.setProcessPointId(item.getSequenceNbr());
idxBizPvPointVarCorrelation.setDeviceType(item.getDeviceType());
idxBizPvPointVarCorrelation.setAnalysisGatewayId(item1.getGatewayId());
idxBizPvPointVarCorrelation.setAnalysisIndexAddress(Integer.valueOf(item1.getIndexAddress()));
idxBizPvPointVarCorrelation.setProcessGatewayId(item.getGatewayId());
idxBizPvPointVarCorrelation.setProcessIndexAddress(item.getIndexAddress());
idxBizPvPointVarCorrelation.setAnalysisPointName(item1.getPointName());
idxBizPvPointVarCorrelation.setProcessPointName(item.getPointName());
idxBizPvPointVarCorrelation.setMatchProcessPoint("匹配");
pvPointVarCorrelations.add(idxBizPvPointVarCorrelation);
});
});
}
return pvPointVarCorrelations;
}
// Operating-condition division - wind turbine - new
@Async
public void getFanConditionVariablesGKHF() {
@@ -77,14 +157,14 @@ public class FanConditionVariablesMessage {
// Central value - wind turbine - new
@Async
public void getFanConditionVariablesZXZ() {
List<IdxBizFanPointProcessVariableClassification> fenxiList = idxBizFanPointProcessVariableClassificationMapper.selectList(new QueryWrapper<IdxBizFanPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "分析变量"));
List<IdxBizFanPointProcessVariableClassification> fenxiList = classificationMapperFan.selectList(new QueryWrapper<IdxBizFanPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "分析变量"));
fenxiList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumerZXZFan, JSON.toJSONString(item)));
}
// Central value - PV - new
@Async
public void getPvConditionVariablesZXZ() {
List<IdxBizPvPointProcessVariableClassification> fenxiList = idxBizPvPointProcessVariableClassificationMapper.selectList(new QueryWrapper<IdxBizPvPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "分析变量"));
List<IdxBizPvPointProcessVariableClassification> fenxiList = classificationMapperPv.selectList(new QueryWrapper<IdxBizPvPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "分析变量"));
fenxiList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumerZXZPv, JSON.toJSONString(item)));
}
}
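All of the entry points above hand a JSON-serialized entity to kafkaProducerService.sendMessageAsync(topic, json). KafkaProducerService itself is not part of this diff, so the body below is an assumption: a minimal sketch of how such a helper is commonly built on spring-kafka's KafkaTemplate, shown only to make the producer side of the flow concrete.

// Hedged sketch; the real KafkaProducerService is not shown in this commit.
package com.yeejoin.amos.boot.module.jxiop.biz.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class KafkaProducerServiceSketch {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Fire-and-forget send; callers serialize each entity with fastjson beforehand,
    // so the payload arriving here is already a JSON string.
    public void sendMessageAsync(String topic, String message) {
        kafkaTemplate.send(topic, message);
        log.debug("sent message to topic {}", topic);
    }
}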
@@ -247,6 +247,7 @@ public class KafkaConsumerService {
idxBizFanPointVarCentralValue.setSubSystem(analysisVariable.getSubSystem());
idxBizFanPointVarCentralValue.setNumber(analysisVariable.getNumber());
idxBizFanPointVarCentralValue.setEquipmentName(analysisVariable.getEquipmentName());
idxBizFanPointVarCentralValue.setOrgCode(analysisVariable.getOrgCode());
insertList.add(idxBizFanPointVarCentralValue);
}
if (CollectionUtils.isNotEmpty(insertList)) {
@@ -963,6 +964,7 @@ public class KafkaConsumerService {
idxBizPvPointVarCentralValue.setSubarray(analysisVariable.getSubarray());
idxBizPvPointVarCentralValue.setManufacturer(analysisVariable.getManufacturer());
idxBizPvPointVarCentralValue.setEquipmentName(analysisVariable.getEquipmentName());
idxBizPvPointVarCentralValue.setOrgCode(analysisVariable.getOrgCode());
insertList.add(idxBizPvPointVarCentralValue);
}
if (CollectionUtils.isNotEmpty(insertList)) {
package com.yeejoin.amos.boot.module.jxiop.biz.listener;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqxListener;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.CommonServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.TdengineTimeServiceImpl;
import lombok.extern.slf4j.Slf4j;
/**
* @author Administrator
@@ -97,6 +94,12 @@ public class SyncESDataToTdengineMqttListener extends EmqxListener {
String pvResult = pv.get();
// Region and whole-domain data are generated together at the end
tdengineTimeService.insertMomentDataAll(format);
if (isWholeHour(format)){
tdengineTimeService.insertHourData();
}
if (isWholeDay(format)){
tdengineTimeService.insertDayData();
}
} catch (InterruptedException | ExecutionException e) {
System.out.println("Task execution failed");
e.printStackTrace();
@@ -156,4 +159,23 @@ public class SyncESDataToTdengineMqttListener extends EmqxListener {
// }).start();
// }
}
/** Returns true when the timestamp falls exactly on a whole hour (mm:ss == 00:00). */
private boolean isWholeHour(String dateTimeStr) {
try {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime dateTime = LocalDateTime.parse(dateTimeStr, formatter);
return dateTime.getMinute() == 0 && dateTime.getSecond() == 0;
} catch (DateTimeParseException e) {
return false;
}
}
/** Returns true when the timestamp falls exactly on the start of a day (HH:mm:ss == 00:00:00). */
private boolean isWholeDay(String dateTimeStr) {
try {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime dateTime = LocalDateTime.parse(dateTimeStr, formatter);
return dateTime.getMinute() == 0 && dateTime.getSecond() == 0 && dateTime.getHour() == 0;
} catch (DateTimeParseException e) {
return false;
}
}
}
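The two helpers above gate insertHourData() and insertDayData() on the timestamp of the synced batch. A small standalone check (illustration only, duplicating the same format and rules) makes the expected boundary behaviour explicit.

// Quick standalone check of the whole-hour / whole-day boundary logic; illustration only.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class BoundaryCheckSketch {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        System.out.println(isWholeHour("2024-06-01 13:00:00")); // true  -> would trigger insertHourData()
        System.out.println(isWholeHour("2024-06-01 13:05:00")); // false
        System.out.println(isWholeDay("2024-06-01 00:00:00"));  // true  -> would trigger insertDayData()
        System.out.println(isWholeDay("2024-06-01 13:00:00"));  // false
        System.out.println(isWholeHour("not a timestamp"));     // false (parse failure)
    }

    static boolean isWholeHour(String s) {
        try {
            LocalDateTime t = LocalDateTime.parse(s, FORMATTER);
            return t.getMinute() == 0 && t.getSecond() == 0;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    static boolean isWholeDay(String s) {
        try {
            LocalDateTime t = LocalDateTime.parse(s, FORMATTER);
            return t.getMinute() == 0 && t.getSecond() == 0 && t.getHour() == 0;
        } catch (DateTimeParseException e) {
            return false;
        }
    }
}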
@@ -100,7 +100,7 @@ public class TdengineTimeServiceImpl {
/**
* Wind turbine - generate measurement-point, subsystem, equipment, station, and region data by hour
*/
@Scheduled(cron = "0 0 0/1 * * ? ")
// @Scheduled(cron = "0 0 0/1 * * ? ")
public void insertHourData() throws ParseException {
if (!openHealth) {
return;
@@ -191,7 +191,7 @@ public class TdengineTimeServiceImpl {
/**
* Wind turbine - generate measurement-point, subsystem, equipment, station, and region data by day
*/
@Scheduled(cron = "0 05 0 1/1 * ? ")
// @Scheduled(cron = "0 05 0 1/1 * ? ")
public void insertDayData() throws ParseException {
if (!openHealth) {
return;
@@ -26,7 +26,7 @@ spring.db6.datasource.password=Yeejoin@2020
spring.db6.datasource.driver-class-name=com.kingbase8.Driver
## eureka properties:
eureka.instance.hostname=47.92.234.253
eureka.instance.hostname=10.20.1.160
eureka.client.serviceUrl.defaultZone=http://admin:a1234560@${eureka.instance.hostname}:10001/eureka/
## redis properties:
spring.redis.database=1
@@ -166,7 +166,7 @@ pictureUrl=upload/jxiop/syz/
#kafka
spring.kafka.bootstrap-servers=10.20.0.223:9092,10.20.0.133:9200
spring.kafka.bootstrap-servers=10.20.0.223:9092
spring.kafka.producer.retries=1
spring.kafka.producer.bootstrap-servers=10.20.0.223:9092,10.20.0.133:9200
spring.kafka.producer.batch-size=16384
@@ -175,7 +175,7 @@ spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=consumerGroup
spring.kafka.consumer.bootstrap-servers=10.20.0.223:9092,10.20.0.133:9200
spring.kafka.consumer.bootstrap-servers=10.20.0.223:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer