Commit 8e86c1fc authored by zhangsen

Calculation issue fixes; code commit

parent eb3a4bfa
@@ -58,6 +58,17 @@
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
<dependency>
<groupId>tech.tablesaw</groupId>
<artifactId>tablesaw-core</artifactId>
<version>0.43.1</version>
</dependency>
<dependency>
<groupId>tech.tablesaw</groupId>
<artifactId>tablesaw-json</artifactId>
<version>0.43.1</version>
</dependency>
</dependencies>
<build>
......
package com.yeejoin.amos.boot.module.jxiop.biz.ESDto;
-import io.github.classgraph.json.Id;
+//import io.github.classgraph.json.Id;
import lombok.Data;
import lombok.experimental.Accessors;
import nonapi.io.github.classgraph.json.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
......
@@ -15,6 +15,7 @@ import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
import java.sql.SQLException;
/**
* Slave data source configuration
@@ -49,6 +50,18 @@ public class TdEngineConfig {
datasource.setUsername(username);
datasource.setPassword(password);
datasource.setDriverClassName(driverClassName);
datasource.setInitialSize(100); // initial pool size
datasource.setMaxActive(200); // maximum active connections
datasource.setMaxWait(60000); // maximum wait for a connection (ms)
datasource.setMinIdle(5); // minimum idle connections
datasource.setValidationQuery("SELECT 1"); // connection validation query
try {
datasource.setFilters("stat");
} catch (SQLException throwables) {
throwables.printStackTrace();
}
datasource.setQueryTimeout(30); // query timeout (s)
datasource.setConnectionProperties("useUnicode=true;characterEncoding=UTF-8"); // connection properties
return datasource;
}
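The pool sizes above are hard-coded into the bean. A minimal sketch of reading them from configuration instead; the td.datasource.* property names and the class are invented for illustration, not keys defined in this repo:

import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Sketch only: property names (td.datasource.*) are assumptions.
@Configuration
public class TdPoolConfigSketch {

    @Value("${td.datasource.initial-size:100}")
    private int initialSize;

    @Value("${td.datasource.max-active:200}")
    private int maxActive;

    @Bean
    public DruidDataSource tunedTdDataSource() {
        DruidDataSource ds = new DruidDataSource();
        ds.setInitialSize(initialSize); // replaces the hard-coded 100
        ds.setMaxActive(maxActive);     // replaces the hard-coded 200
        return ds;
    }
}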
......
package com.yeejoin.amos.boot.module.jxiop.biz.controller;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IndicatorData;
import com.yeejoin.amos.boot.module.jxiop.biz.kafka.FanConditionVariablesMessage;
import com.yeejoin.amos.boot.module.jxiop.biz.mapper2.IdxBizFanHealthIndexMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.tdmapper.IndicatorDataMapper;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -14,6 +18,8 @@ import org.typroject.tyboot.core.restful.doc.TycloudOperation;
import org.typroject.tyboot.core.restful.utils.ResponseHelper;
import org.typroject.tyboot.core.restful.utils.ResponseModel;
import java.util.List;
import static com.yeejoin.amos.boot.module.jxiop.biz.kafka.Constant.*;
@RestController
@@ -36,7 +42,7 @@ public class KafkaAnalyseController {
}
fanConditionVariablesMessage.getFanConditionVariables();
redisUtils.set(kafkaTopicConsumer, RequestContext.getTraceId(), 300);
-return ResponseHelper.buildResponse("计算中");
+return ResponseHelper.buildResponse("开始计算");
}
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@@ -48,7 +54,7 @@ public class KafkaAnalyseController {
}
fanConditionVariablesMessage.getPvConditionVariables();
redisUtils.set(kafkaTopicConsumerPv, RequestContext.getTraceId(), 300);
-return ResponseHelper.buildResponse("计算中");
+return ResponseHelper.buildResponse("开始计算");
}
@@ -61,7 +67,7 @@ public class KafkaAnalyseController {
}
fanConditionVariablesMessage.getFanConditionVariablesGKHF();
redisUtils.set(kafkaTopicConsumerGKHFFan, RequestContext.getTraceId(), 300);
-return ResponseHelper.buildResponse("计算中");
+return ResponseHelper.buildResponse("开始计算");
}
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@@ -73,6 +79,61 @@ public class KafkaAnalyseController {
}
fanConditionVariablesMessage.getPvConditionVariablesPvGKFX();
redisUtils.set(kafkaTopicConsumerGKHFPv, RequestContext.getTraceId(), 300);
return ResponseHelper.buildResponse("开始计算");
}
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@PostMapping(value = "/getFanConditionVariablesZXZ")
@ApiOperation(httpMethod = "POST", value = "中心值 - 风电 - 新", notes = "中心值 - 风电 - 新")
public ResponseModel<Object> getFanConditionVariablesZXZ() {
if (redisUtils.hasKey(kafkaTopicConsumerZXZFan)) {
return ResponseHelper.buildResponse("计算中");
}
fanConditionVariablesMessage.getFanConditionVariablesZXZ();
redisUtils.set(kafkaTopicConsumerZXZFan, RequestContext.getTraceId(), 300);
return ResponseHelper.buildResponse("开始计算");
}
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@PostMapping(value = "/getPvConditionVariablesZXZ")
@ApiOperation(httpMethod = "POST", value = "中心值 - 光伏 - 新", notes = "工况划分 - 光伏 - 新")
public ResponseModel<Object> getPvConditionVariablesZXZ() {
if (redisUtils.hasKey(kafkaTopicConsumerZXZPv)) {
return ResponseHelper.buildResponse("计算中"); return ResponseHelper.buildResponse("计算中");
} }
fanConditionVariablesMessage.getPvConditionVariablesZXZ();
redisUtils.set(kafkaTopicConsumerZXZPv, RequestContext.getTraceId(), 300);
return ResponseHelper.buildResponse("开始计算");
}
@Autowired
IndicatorDataMapper indicatorDataMapper;
@Autowired
IdxBizFanHealthIndexMapper idxBizFanHealthIndexMapper;
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@GetMapping(value = "/getAddressInfo")
@ApiOperation(httpMethod = "GET", value = "getAddressInfo", notes = "getAddressInfo")
public ResponseModel<List<IndicatorData>> getAddressInfo() {
List<String> addressInfo = idxBizFanHealthIndexMapper.getAddressInfo();
String join = String.join(",", addressInfo);
List<IndicatorData> indicatorData = indicatorDataMapper.selectByAddresses(join, "1668801435891929089");
return ResponseHelper.buildResponse(indicatorData);
}
}
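Each endpoint above guards reruns with redisUtils.hasKey(...) followed by redisUtils.set(...), which leaves a small race between the check and the write. A sketch of an atomic alternative using Spring Data Redis; the class and method names are illustrative, and RedisUtils itself is project-specific:

import java.util.concurrent.TimeUnit;
import org.springframework.data.redis.core.StringRedisTemplate;

// Sketch only: claims the "computation running" flag in one atomic step.
public class ComputationGuard {
    private final StringRedisTemplate redisTemplate;

    public ComputationGuard(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /** Atomically claims the flag for 300 s; returns false if a run is already active. */
    public boolean tryStart(String key, String traceId) {
        return Boolean.TRUE.equals(
                redisTemplate.opsForValue().setIfAbsent(key, traceId, 300, TimeUnit.SECONDS));
    }
}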
@@ -102,7 +102,7 @@ public class IdxBizPvPointProcessVariableClassification{
*
*/
@TableField("POINT_ID")
-private Integer pointId;
+private String pointId;
/**
* Area
......
@@ -11,17 +11,6 @@ import java.util.Date;
* @date 2023-09-02 11:02
*/
public interface Constant {
String INSERT = "INSERT";
String UPDATE = "UPDATE";
String DATA = "data";
String TOPIC = "topic";
String TABLE = "table";
String TYPE = "type";
String DB_TYPE = "dbType";
String BODY = "body";
String DATA_TYPE = "datatype";
String STATE = "state";
// Wind power correlation consumer
String kafkaTopicConsumer = "FanConditionVariables";
@@ -34,16 +23,10 @@ public interface Constant {
// PV operating-condition interval division
String kafkaTopicConsumerGKHFPv = "PvConditionVariablesGKHF";
@Value("${last.month.num:12}")
public Integer lastMonthNum = 12;
String startTime = DateUtils.convertDateToString(DateUtil.offsetMonth(new Date(), -lastMonthNum), DateUtils.DATE_TIME_PATTERN);
String endTime = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()), DateUtils.DATE_TIME_PATTERN);
-// Correlation
+// Wind power central-value calculation
-String baseUrlXGX = "http://139.9.171.247:8052/intelligent-analysis/correlation";
+String kafkaTopicConsumerZXZFan = "FanConditionVariablesGZXZ";
// PV central-value calculation
String kafkaTopicConsumerZXZPv = "PvConditionVariablesZXZ";
-// Condition division
-String baseUrlGKHF = "http://139.9.171.247:8052/intelligent-analysis/working-condition-division";
}
package com.yeejoin.amos.boot.module.jxiop.biz.kafka;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanPointProcessVariableClassification;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanPointVarCorrelation;
@@ -12,6 +13,7 @@ import com.yeejoin.amos.boot.module.jxiop.biz.mapper2.IdxBizPvPointProcessVariab
import com.yeejoin.amos.boot.module.jxiop.biz.mapper2.IdxBizPvPointVarCorrelationMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.List;
@@ -36,29 +38,53 @@ public class FanConditionVariablesMessage {
private IdxBizPvPointProcessVariableClassificationMapper classificationMapperPv;
@Autowired
private IdxBizFanPointProcessVariableClassificationMapper idxBizFanPointProcessVariableClassificationMapper;
@Autowired
private IdxBizPvPointProcessVariableClassificationMapper idxBizPvPointProcessVariableClassificationMapper;
@Autowired
private KafkaProducerService kafkaProducerService;
// Correlation analysis - wind turbine entry
@Async
public void getFanConditionVariables() {
-List<IdxBizFanPointVarCorrelation> pointVarCorrelationsList = pointVarCorrelationMapper.selectList(null);
+List<IdxBizFanPointVarCorrelation> pointVarCorrelationsList = pointVarCorrelationMapper.selectList(new LambdaQueryWrapper<IdxBizFanPointVarCorrelation>());
pointVarCorrelationsList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumer, JSON.toJSONString(item)));
}
// Correlation analysis - PV entry
@Async
public void getPvConditionVariables() {
List<IdxBizPvPointVarCorrelation> pointVarCorrelationsList = pointVarCorrelationMapperPv.selectList(null);
pointVarCorrelationsList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumerPv, JSON.toJSONString(item)));
}
// Condition division - wind - new
@Async
public void getFanConditionVariablesGKHF() {
List<IdxBizFanPointProcessVariableClassification> variableClassificationList = classificationMapperFan.selectList(new QueryWrapper<IdxBizFanPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "工况变量"));
variableClassificationList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumerGKHFFan, JSON.toJSONString(item)));
}
// Condition division - PV - new
@Async
public void getPvConditionVariablesPvGKFX() {
List<IdxBizPvPointProcessVariableClassification> variableClassificationList = classificationMapperPv.selectList(new QueryWrapper<IdxBizPvPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "工况变量"));
variableClassificationList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumerGKHFPv, JSON.toJSONString(item)));
}
// Central value - wind - new
@Async
public void getFanConditionVariablesZXZ() {
List<IdxBizFanPointProcessVariableClassification> fenxiList = idxBizFanPointProcessVariableClassificationMapper.selectList(new QueryWrapper<IdxBizFanPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "分析变量"));
fenxiList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumerZXZFan, JSON.toJSONString(item)));
}
// Central value - PV - new
@Async
public void getPvConditionVariablesZXZ() {
List<IdxBizPvPointProcessVariableClassification> fenxiList = idxBizPvPointProcessVariableClassificationMapper.selectList(new QueryWrapper<IdxBizPvPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "分析变量"));
fenxiList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumerZXZPv, JSON.toJSONString(item)));
}
}
package com.yeejoin.amos.boot.module.jxiop.biz.kafka;
import cn.hutool.core.date.DateUtil;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
import org.springframework.beans.factory.annotation.Value;
import java.util.Date;
public class KafkaConstant {
@Value("${last.month.num:12}")
public Integer lastMonthNum = 12;
String startTime = DateUtils.convertDateToString(DateUtil.offsetMonth(new Date(), -lastMonthNum), DateUtils.DATE_TIME_PATTERN);
String endTime = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()), DateUtils.DATE_TIME_PATTERN);
// 相关性
@Value("${base.url.XGX:http://139.9.171.247:8052/intelligent-analysis/working-condition-division}")
public static final String baseUrlXGX = "http://139.9.171.247:8052/intelligent-analysis/correlation";
// 工况划分
@Value("${base.url.GKHF:http://139.9.171.247:8052/intelligent-analysis/working-condition-division}")
public static final String baseUrlGKHF = "http://139.9.171.247:8052/intelligent-analysis/working-condition-division";
@Value("${spring.kafka.consumer.max-poll-records:30}")
public static final Integer threadNum = 30;
}
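As committed, KafkaConstant is never registered as a Spring bean, and @Value cannot inject into static final fields (nor into lastMonthNum, since the container never instantiates the class), so the annotations above are inert and the literals are the effective values. A minimal sketch of a variant Spring would actually populate; the class name is hypothetical:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

// Sketch only: non-static, non-final fields on a managed bean receive @Value injection.
@Component
public class KafkaAnalysisProperties { // hypothetical name, not in this commit

    @Value("${last.month.num:12}")
    private Integer lastMonthNum;

    @Value("${base.url.XGX:http://139.9.171.247:8052/intelligent-analysis/correlation}")
    private String baseUrlXGX;

    @Value("${base.url.GKHF:http://139.9.171.247:8052/intelligent-analysis/working-condition-division}")
    private String baseUrlGKHF;

    public Integer getLastMonthNum() { return lastMonthNum; }
    public String getBaseUrlXGX() { return baseUrlXGX; }
    public String getBaseUrlGKHF() { return baseUrlGKHF; }
}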
package com.yeejoin.amos.boot.module.jxiop.biz.kafka;
import cn.hutool.core.date.DateUtil;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.*;
-import com.yeejoin.amos.boot.module.jxiop.biz.mapper2.IdxBizFanPointProcessVariableClassificationMapper;
+import com.yeejoin.amos.boot.module.jxiop.biz.mapper2.IdxBizFanPointVarCentralValueMapper;
-import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizFanPointProcessVariableClassificationServiceImpl;
+import com.yeejoin.amos.boot.module.jxiop.biz.mapper2.IdxBizPvPointVarCentralValueMapper;
-import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizFanPointVarCorrelationServiceImpl;
+import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.*;
-import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizPvPointProcessVariableClassificationServiceImpl;
-import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizPvPointVarCorrelationServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.tdmapper.IndicatorDataMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import tech.tablesaw.api.ColumnType;
import tech.tablesaw.api.DoubleColumn;
import tech.tablesaw.api.Table;
import tech.tablesaw.io.json.JsonReadOptions;
import tech.tablesaw.selection.Selection;
import java.util.*;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.*;
-import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
-import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static com.yeejoin.amos.boot.module.jxiop.biz.kafka.Constant.*;
@@ -57,57 +63,226 @@ public class KafkaConsumerService {
@Autowired
IdxBizPvPointProcessVariableClassificationServiceImpl idxBizPvPointProcessVariableClassificationService;
-/**
-* Batch-consume Kafka messages [wind farm correlation]
-*
-* @param consumerRecords messages
-* @param ack ack
-*/
-@KafkaListener(id = "xgxFanConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumer)
-public void listenXGXFan(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
-ack.acknowledge();
@Autowired
IdxBizFanPointVarCentralValueServiceImpl idxBizFanPointVarCentralValueService;
@Autowired
IdxBizFanPointVarCentralValueMapper idxBizFanPointVarCentralValueMapper;
@Autowired
IdxBizPvPointVarCentralValueServiceImpl idxBizPvPointVarCentralValueService;
@Autowired
IdxBizPvPointVarCentralValueMapper idxBizPvPointVarCentralValueMapper;
// Correlation
@Value("${base.url.XGX:http://139.9.171.247:8052/intelligent-analysis/correlation}")
private String baseUrlXGX;
// Condition division
@Value("${base.url.GKHF:http://139.9.171.247:8052/intelligent-analysis/working-condition-division}")
private String baseUrlGKHF;
@Value("${base.url.ZXZ:http://172.16.3.29:8052/intelligent-analysis/central-value}")
private String zxzJsUrlFanBySF;
@Value("${spring.kafka.consumer.max-poll-records:30}")
private Integer threadNum = 30;
@Value("${last.month.num:12}")
private Integer lastMonthNum;
-ExecutorService service = Executors.newFixedThreadPool(30);
+ExecutorService service = Executors.newFixedThreadPool(threadNum);
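// NOTE: @Value injection happens after construction, so the pool above and the
// worker loop in the constructor below always see threadNum's default of 30.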
BlockingQueue<PointData> queue = new LinkedBlockingQueue<>();
public KafkaConsumerService() {
for (int i = 0 ; i < threadNum; i++) {
service.execute(new Runnable() {
@Override
public void run() {
while(true) {
-try {
-CompletableFuture[] completableFutures = consumerRecords.stream().map(m -> CompletableFuture.supplyAsync(() -> consumerRecords(m), service)).toArray(CompletableFuture[]::new);
-CompletableFuture.allOf(completableFutures).join();
-} finally {
-}
-}
try {
PointData pointsData = queue.take();
List<ConsumerRecord<String, String>> consumerRecords = pointsData.getConsumerRecords();
Table table = pointsData.getTable();
if ("xgxFanConsumer".equals(pointsData.getOperatorType())){
execFanCorrelation(consumerRecords, table);
} else if ("xgxPvConsumer".equals(pointsData.getOperatorType())) {
execPvCorrelation(consumerRecords, table);
} else if ("GKHFFanConsumer".equals(pointsData.getOperatorType())) {
consumerRecordsGKFXFan(consumerRecords, table);
} else if ("GKHFPvConsumer".equals(pointsData.getOperatorType())) {
consumerRecordsGKFXPv(consumerRecords, table);
} else if ("ZXZFanConsumer".equals(pointsData.getOperatorType())) {
consumerRecordsZXZFan(consumerRecords, pointsData);
} else if ("ZXZPvConsumer".equals(pointsData.getOperatorType())) {
consumerRecordsZXZPv(consumerRecords, pointsData);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
}
-/**
-* Process a wind-turbine message
-* @param consumerRecord
-* @return
-*/
-boolean consumerRecords(ConsumerRecord<String, String> consumerRecord) {
-try {
-Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
-if (kafkaMessage.isPresent()) {
-IdxBizFanPointVarCorrelation fanPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizFanPointVarCorrelation.class);
-List<IndicatorData> tdengineData1 = indicatorDataMapper.selectDataByAddressAndtimeNew(fanPointVarCorrelation.getAnalysisIndexAddress(), startTime, endTime, fanPointVarCorrelation.getAnalysisGatewayId(), fanPointVarCorrelation.getProcessGatewayId(), fanPointVarCorrelation.getProcessIndexAddress());
-List<Double> data1 = new ArrayList<>();
-List<Double> data2 = new ArrayList<>();
-tdengineData1.forEach(item -> {
-if (item.getAddress().equals(fanPointVarCorrelation.getAnalysisIndexAddress()) && item.getGatewayId().equals(fanPointVarCorrelation.getAnalysisGatewayId())) {
-data1.add(Double.parseDouble(item.getValue()));
-} else {
-data2.add(Double.parseDouble(item.getValue()));
-}
-});
-if (data1.size() < data2.size()) {
-Integer a = data2.size() - data1.size();
-for (int i = 0; i < a; i++) {
-data2.remove(0);
-}
-} else if (data2.size() < data1.size()) {
-Integer a = data1.size() - data2.size();
-for (int i = 0; i < a; i++) {
-data1.remove(0);
-}
-}
private void consumerRecordsZXZFan(List<ConsumerRecord<String, String>> consumerRecords, PointData pointsData) {
Table table = pointsData.getTable();
Map<String, List<IdxBizFanPointProcessVariableClassification>> zxzIds = pointsData.getZxzIds();
for (String id : zxzIds.keySet()) {
List<IdxBizFanPointProcessVariableClassification> variableClassificationList = zxzIds.get(id);
String analysisVariableId = id;
List<IdxBizFanPointProcessVariableClassification> processVariableList = variableClassificationList.stream().filter(v -> !id.equals(v.getSequenceNbr().toString())).collect(Collectors.toList());
IdxBizFanPointProcessVariableClassification analysisVariable = variableClassificationList.stream().filter(v -> id.equals(v.getSequenceNbr().toString())).findFirst().get();
Map<String, Object> data1 = new HashMap<>();
Map<String, Object> data2 = new HashMap<>();
int index = 1;
Table dataTable = Table.create();
int minRow = 0;
for (IdxBizFanPointProcessVariableClassification processVariable : processVariableList) {
Selection selection = table.stringColumn("id").isEqualTo(processVariable.getIndexAddress() + "_" + processVariable.getGatewayId());
DoubleColumn values = table.where(selection).doubleColumn("value");
// track the minimum data length
if (index == 1) {
minRow = values.size();
} else {
minRow = minRow > values.size() ? values.size() : minRow;
}
values.setName("processVariable" + index);
if (!dataTable.isEmpty() && dataTable.rowCount() < values.size()) {
dataTable.addColumns(values.inRange(0, dataTable.rowCount()));
} else {
dataTable.addColumns(values);
}
data1.put("processVariable" + index + "Id", processVariable.getSequenceNbr());
// build the operating-condition interval array
List<Object> IntervalValues = new ArrayList<>();
IntervalValues.add(processVariable.getIntervalValue1());
IntervalValues.add(processVariable.getIntervalValue2());
IntervalValues.add(processVariable.getIntervalValue3());
IntervalValues.add(processVariable.getIntervalValue4());
data2.put("processVariable" + index, IntervalValues);
index++;
}
Selection selection = table.stringColumn("id").isEqualTo(analysisVariable.getIndexAddress() + "_" + analysisVariable.getGatewayId());
DoubleColumn values = table.where(selection).doubleColumn("value");
values.setName("analysisVariable");
if (!dataTable.isEmpty() && dataTable.rowCount() < values.size()) {
dataTable.addColumns(values.inRange(0, dataTable.rowCount()));
} else {
dataTable.addColumns(values);
}
data1.put("analysisVariableId", analysisVariable.getSequenceNbr());
// trim all columns to the same length
dataTable = dataTable.inRange(0, minRow);
List<String> list = dataTable.columnNames();
for (String column : list) {
data1.put(column, dataTable.doubleColumn(column).asDoubleArray());
}
Map<String,Object> requestMap = new HashMap<>();
requestMap.put("data1", data1);
requestMap.put("data2", data2);
String response = HttpUtil.createPost(zxzJsUrlFanBySF).body(JSON.toJSONString(requestMap)).execute().body();
if (response.contains("stdDev")) {
idxBizFanPointVarCentralValueMapper.delete(new QueryWrapper<IdxBizFanPointVarCentralValue>().eq("ANALYSIS_POINT_ID", analysisVariableId));
JSONObject jsonObject = JSON.parseObject(response);
int length = jsonObject.getJSONArray("stdDev").size();
List<IdxBizFanPointVarCentralValue> insertList = new ArrayList<>();
for (int i = 0; i < length; i++) {
IdxBizFanPointVarCentralValue idxBizFanPointVarCentralValue = new IdxBizFanPointVarCentralValue();
idxBizFanPointVarCentralValue.setProcess1Min(jsonObject.getJSONArray("process1Min").getDoubleValue(i));
idxBizFanPointVarCentralValue.setProcess2Min(jsonObject.getJSONArray("process2Min").getDoubleValue(i));
idxBizFanPointVarCentralValue.setProcess3Min(jsonObject.getJSONArray("process3Min").getDoubleValue(i));
idxBizFanPointVarCentralValue.setProcess1Max(jsonObject.getJSONArray("process1Max").getDoubleValue(i));
idxBizFanPointVarCentralValue.setPorcess2Max(jsonObject.getJSONArray("process2Max").getDoubleValue(i));
idxBizFanPointVarCentralValue.setProcess3Max(jsonObject.getJSONArray("process3Max").getDoubleValue(i));
idxBizFanPointVarCentralValue.setAnalysisPointId(analysisVariableId);
idxBizFanPointVarCentralValue.setAnalysisPointName(analysisVariable.getPointName());
idxBizFanPointVarCentralValue.setProcessPoint1Id(data1.get("processVariable1Id").toString());
idxBizFanPointVarCentralValue.setProcessPoint1Name(processVariableList.get(0).getPointName());
idxBizFanPointVarCentralValue.setProcessPoint2Id(data1.get("processVariable2Id").toString());
idxBizFanPointVarCentralValue.setProcessPoint2Name(processVariableList.get(1).getPointName());
idxBizFanPointVarCentralValue.setProcessPoint3Id(data1.get("processVariable3Id").toString());
idxBizFanPointVarCentralValue.setProcessPoint3Name(processVariableList.get(2).getPointName());
idxBizFanPointVarCentralValue.setAnalysisStdDev(jsonObject.getJSONArray("stdDev").getDoubleValue(i));
idxBizFanPointVarCentralValue.setAnalysisCenterValue(jsonObject.getJSONArray("centerValue").getDoubleValue(i));
idxBizFanPointVarCentralValue.setArae(analysisVariable.getArae());
idxBizFanPointVarCentralValue.setStation(analysisVariable.getStation());
idxBizFanPointVarCentralValue.setSubSystem(analysisVariable.getSubSystem());
idxBizFanPointVarCentralValue.setNumber(analysisVariable.getNumber());
idxBizFanPointVarCentralValue.setEquipmentName(analysisVariable.getEquipmentName());
insertList.add(idxBizFanPointVarCentralValue);
} }
if (CollectionUtils.isNotEmpty(insertList)) {
idxBizFanPointVarCentralValueService.saveBatch(insertList);
} }
}
redisUtils.expire(kafkaTopicConsumerZXZFan, 600);
}
}
private void execPvCorrelation(List<ConsumerRecord<String, String>> consumerRecords, Table table) {
consumerRecords.parallelStream().forEach(record -> {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
IdxBizPvPointVarCorrelation pvPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizPvPointVarCorrelation.class);
Selection selection = table.stringColumn("id").isEqualTo(pvPointVarCorrelation.getAnalysisIndexAddress() + "_" + pvPointVarCorrelation.getAnalysisGatewayId());
double[] data1 = table.where(selection).doubleColumn("value").asDoubleArray();
selection = table.stringColumn("id").isEqualTo(pvPointVarCorrelation.getProcessIndexAddress() + "_" + pvPointVarCorrelation.getProcessGatewayId());
double[] data2 = table.where(selection).doubleColumn("value").asDoubleArray();
int shortestLength = Math.min(data1.length, data2.length);
data1 = subset(data1, shortestLength);
data2 = subset(data2, shortestLength);
HashMap<String, Object> resultMap = new HashMap<>();
resultMap.put("data1", data1);
resultMap.put("data2", data2);
String response = HttpUtil.createPost(baseUrlXGX).body(JSON.toJSONString(resultMap)).execute().body();
if (response.contains("correlation") && !response.contains("warning")) {
com.alibaba.fastjson.JSONObject jsonObject = JSON.parseObject(response);
pvPointVarCorrelation.setCorrelationCoefficient(jsonObject.getDoubleValue("correlation"));
log.info("------------------------------------------光伏相关性::计算成功,待更新表数据----------------------------------------");
} else {
pvPointVarCorrelation.setCorrelationCoefficient(0.0);
}
pvPointVarCorrelation.setRecDate(new Date());
idxBizPvPointVarCorrelationService.saveOrUpdate(pvPointVarCorrelation);
}
});
redisUtils.expire(kafkaTopicConsumerPv, 600);
}
private void execFanCorrelation(List<ConsumerRecord<String, String>> consumerRecords, Table table) {
consumerRecords.parallelStream().forEach(record -> {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
IdxBizFanPointVarCorrelation fanPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizFanPointVarCorrelation.class);
Selection selection = table.stringColumn("id").isEqualTo(fanPointVarCorrelation.getAnalysisIndexAddress() + "_" + fanPointVarCorrelation.getAnalysisGatewayId());
double[] data1 = table.where(selection).doubleColumn("value").asDoubleArray();
selection = table.stringColumn("id").isEqualTo(fanPointVarCorrelation.getProcessIndexAddress() + "_" + fanPointVarCorrelation.getProcessGatewayId());
double[] data2 = table.where(selection).doubleColumn("value").asDoubleArray();
int shortestLength = Math.min(data1.length, data2.length);
data1 = subset(data1, shortestLength);
data2 = subset(data2, shortestLength);
HashMap<String, Object> resultMap = new HashMap<>();
resultMap.put("data1", data1);
resultMap.put("data2", data2);
@@ -122,19 +297,101 @@ public class KafkaConsumerService {
fanPointVarCorrelation.setRecDate(new Date());
idxBizFanPointVarCorrelationService.saveOrUpdate(fanPointVarCorrelation);
-log.info("表数据已更新");
-log.info("----------------------------风机相关性--------------分析变量与工况变量相关性分析算法结束----------------------------------------");
-log.info("kafka消费zhTestGroup消息{}", consumerRecord);
-}
-} catch (Exception e) {
-log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecord, e);
-} finally {
-redisUtils.expire(kafkaTopicConsumer, 600);
-}
-return true;
-}
}
});
redisUtils.expire(kafkaTopicConsumer, 600);
}
public static double[] subset(double[] array, int length) {
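// Equivalent to Arrays.copyOf(array, length) for length < array.length.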
if (length >= array.length) {
return array;
} else {
double[] subset = new double[length];
System.arraycopy(array, 0, subset, 0, length);
return subset;
}
}
class PointData {
private List<ConsumerRecord<String, String>> consumerRecords;
private Table table;
private String operatorType;
private Map<String, List<IdxBizFanPointProcessVariableClassification>> zxzIds;
private Map<String, List<IdxBizPvPointProcessVariableClassification>> zxzPvIds;
public PointData(List<ConsumerRecord<String, String>> consumerRecords, Table table, String operatorType) {
this.consumerRecords = consumerRecords;
this.table = table;
this.operatorType = operatorType;
} }
public PointData(List<ConsumerRecord<String, String>> consumerRecords, Table table, String operatorType, Map<String, List<IdxBizFanPointProcessVariableClassification>> zxzIds) {
this.consumerRecords = consumerRecords;
this.table = table;
this.operatorType = operatorType;
this.zxzIds = zxzIds;
}
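// The extra String parameter in the PV constructor below only disambiguates it from the
// Fan constructor above: after type erasure both Map parameters have the same raw type.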
public PointData(List<ConsumerRecord<String, String>> consumerRecords, Table table, String operatorType, Map<String, List<IdxBizPvPointProcessVariableClassification>> zxzPvIds, String notString) {
this.consumerRecords = consumerRecords;
this.table = table;
this.operatorType = operatorType;
this.zxzPvIds = zxzPvIds;
}
public Map<String, List<IdxBizPvPointProcessVariableClassification>> getZxzIdsPv() {
return zxzPvIds;
}
public Map<String, List<IdxBizFanPointProcessVariableClassification>> getZxzIds() {
return zxzIds;
}
public String getOperatorType() {
return operatorType;
}
public List<ConsumerRecord<String, String>> getConsumerRecords() {
return consumerRecords;
}
public Table getTable() {
return table;
}
}
/**
* Batch-consume Kafka messages [wind farm correlation]
*
* @param consumerRecords messages
* @param ack ack
*/
@KafkaListener(id = "xgxFanConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumer)
public void listenXGXFan(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
try {
ack.acknowledge();
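// Acknowledging before the batch is processed gives at-most-once semantics:
// a crash after this point loses the records queued below.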
Map<String, Set<String>> gatewayPoints = new HashMap<>();
consumerRecords.stream().forEach(record -> {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
IdxBizFanPointVarCorrelation fanPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizFanPointVarCorrelation.class);
Set<String> idSet = null;
if (gatewayPoints.containsKey(fanPointVarCorrelation.getAnalysisGatewayId())) {
idSet = gatewayPoints.get(fanPointVarCorrelation.getAnalysisGatewayId());
} else {
idSet = new HashSet<>();
}
idSet.add(fanPointVarCorrelation.getAnalysisIndexAddress());
idSet.add(fanPointVarCorrelation.getProcessIndexAddress());
gatewayPoints.put(fanPointVarCorrelation.getAnalysisGatewayId(), idSet);
}
});
buildExecData(consumerRecords, gatewayPoints, "xgxFanConsumer");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Batch-consume Kafka messages [PV correlation]
@@ -144,67 +401,234 @@ public class KafkaConsumerService {
*/
@KafkaListener(id = "xgxPvConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumerPv)
public void listenXGXPv(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
-ack.acknowledge();
try {
-CompletableFuture[] completableFutures = consumerRecords.stream().map(m -> CompletableFuture.supplyAsync(() -> consumerRecordsPv(m), service)).toArray(CompletableFuture[]::new);
-CompletableFuture.allOf(completableFutures).join();
ack.acknowledge();
Map<String, Set<String>> gatewayPoints = new HashMap<>();
consumerRecords.stream().forEach(record -> {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
IdxBizPvPointVarCorrelation pvPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizPvPointVarCorrelation.class);
Set<String> idSet = null;
if (gatewayPoints.containsKey(pvPointVarCorrelation.getAnalysisGatewayId())) {
idSet = gatewayPoints.get(pvPointVarCorrelation.getAnalysisGatewayId());
} else {
idSet = new HashSet<>();
}
idSet.add(pvPointVarCorrelation.getAnalysisIndexAddress().toString());
idSet.add(pvPointVarCorrelation.getProcessIndexAddress());
gatewayPoints.put(pvPointVarCorrelation.getAnalysisGatewayId(), idSet);
}
});
buildExecData(consumerRecords, gatewayPoints, "xgxPvConsumer");
} finally {
}
}
private void buildZXZExecData(List<ConsumerRecord<String, String>> consumerRecords, Map<String, Set<String>> gatewayPoints, Map<String, List<IdxBizFanPointProcessVariableClassification>> zxzIds, String xgxPvConsumer) {
for (String gatewayId : gatewayPoints.keySet()) {
String join = String.join(",", gatewayPoints.get(gatewayId));
List<IndicatorData> indicatorData = indicatorDataMapper.selectByAddresses(join, gatewayId);
JsonReadOptions options = JsonReadOptions.builderFromString(JSON.toJSONString(indicatorData))
.columnTypes(new Function<String, ColumnType>() {
@Override
public ColumnType apply(String t) {
if (t.equals("value")) {
return ColumnType.DOUBLE;
}
return ColumnType.STRING;
}
}).build();
Table table = Table.read().usingOptions(options);
queue.add(new PointData(consumerRecords, table, xgxPvConsumer, zxzIds));
}
}
private void buildExecData(List<ConsumerRecord<String, String>> consumerRecords, Map<String, Set<String>> gatewayPoints, String xgxPvConsumer) {
for (String gatewayId : gatewayPoints.keySet()) {
String join = String.join(",", gatewayPoints.get(gatewayId));
List<IndicatorData> indicatorData = indicatorDataMapper.selectByAddresses(join, gatewayId);
JsonReadOptions options = JsonReadOptions.builderFromString(JSON.toJSONString(indicatorData))
.columnTypes(new Function<String, ColumnType>() {
@Override
public ColumnType apply(String t) {
if (t.equals("value")) {
return ColumnType.DOUBLE;
}
return ColumnType.STRING;
}
}).build();
Table table = Table.read().usingOptions(options);
queue.add(new PointData(consumerRecords, table, xgxPvConsumer));
}
}
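For readers unfamiliar with Tablesaw, a minimal self-contained sketch of the pattern buildExecData relies on: reading JSON into a table with a typed "value" column, then filtering rows by the "id" column exactly as the consumers above do. The sample data here is invented:

import tech.tablesaw.api.ColumnType;
import tech.tablesaw.api.Table;
import tech.tablesaw.io.json.JsonReadOptions;
import tech.tablesaw.selection.Selection;

public class TablesawPatternDemo {
    public static void main(String[] args) {
        // Invented rows shaped like IndicatorData (id = indexAddress_gatewayId).
        String json = "[{\"id\":\"A1_G1\",\"value\":1.5},"
                + "{\"id\":\"A2_G1\",\"value\":2.5},"
                + "{\"id\":\"A1_G1\",\"value\":3.0}]";
        JsonReadOptions options = JsonReadOptions.builderFromString(json)
                .columnTypes(name -> name.equals("value") ? ColumnType.DOUBLE : ColumnType.STRING)
                .build();
        Table table = Table.read().usingOptions(options);
        // Same selection idiom as the consumer methods above.
        Selection sel = table.stringColumn("id").isEqualTo("A1_G1");
        double[] values = table.where(sel).doubleColumn("value").asDoubleArray();
        System.out.println(values.length); // prints 2
    }
}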
/**
-* Process a PV message
-* @param consumerRecord
+* Batch-consume Kafka messages [wind condition division]
+*
* @param consumerRecords messages
* @param ack ack
*/
@KafkaListener(id = "GKHFFanConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumerGKHFFan)
public void listenGKHFFan(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
try {
ack.acknowledge();
Map<String, Set<String>> gatewayPoints = new HashMap<>();
consumerRecords.stream().forEach(record -> {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
IdxBizFanPointProcessVariableClassification fanPointProcessVariable = JSON.parseObject(kafkaMessage.get().toString(), IdxBizFanPointProcessVariableClassification.class);
Set<String> idSet = null;
if (gatewayPoints.containsKey(fanPointProcessVariable.getGatewayId())) {
idSet = gatewayPoints.get(fanPointProcessVariable.getGatewayId());
} else {
idSet = new HashSet<>();
}
idSet.add(fanPointProcessVariable.getIndexAddress());
gatewayPoints.put(fanPointProcessVariable.getGatewayId(), idSet);
}
});
buildExecData(consumerRecords, gatewayPoints, "GKHFFanConsumer");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Wind condition-division processing
* @param consumerRecords
* @param table
* @return * @return
*/
-boolean consumerRecordsPv(ConsumerRecord<String, String> consumerRecord) {
-try {
-Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
-if (kafkaMessage.isPresent()) {
-IdxBizPvPointVarCorrelation pvPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizPvPointVarCorrelation.class);
-List<IndicatorData> tdengineData1 = indicatorDataMapper.selectDataByAddressAndtimeNew(pvPointVarCorrelation.getAnalysisIndexAddress().toString(), startTime, endTime, pvPointVarCorrelation.getAnalysisGatewayId(), pvPointVarCorrelation.getProcessGatewayId(), pvPointVarCorrelation.getProcessIndexAddress());
-List<Double> data1 = new ArrayList<>();
-List<Double> data2 = new ArrayList<>();
-tdengineData1.forEach(item -> {
-if (item.getAddress().equals(pvPointVarCorrelation.getAnalysisIndexAddress()) && item.getGatewayId().equals(pvPointVarCorrelation.getAnalysisGatewayId())) {
-data1.add(Double.parseDouble(item.getValue()));
-} else {
-data2.add(Double.parseDouble(item.getValue()));
-}
-});
-if (data1.size() < data2.size()) {
-Integer a = data2.size() - data1.size();
-for (int i = 0; i < a; i++) {
-data2.remove(0);
-}
-} else if (data2.size() < data1.size()) {
-Integer a = data1.size() - data2.size();
-for (int i = 0; i < a; i++) {
-data1.remove(0);
-}
-}
boolean consumerRecordsGKFXFan(List<ConsumerRecord<String, String>> consumerRecords, Table table) {
try {
consumerRecords.parallelStream().forEach(record -> {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
IdxBizFanPointProcessVariableClassification fanPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizFanPointProcessVariableClassification.class);
HashMap<String, Object> resultMap = new HashMap<>();
Selection selection = table.stringColumn("id").isEqualTo(fanPointVarCorrelation.getIndexAddress() + "_" + fanPointVarCorrelation.getGatewayId());
double[] data1 = table.where(selection).doubleColumn("value").asDoubleArray();
resultMap.put("processVariable", data1);
resultMap.put("processVariableId", fanPointVarCorrelation.getSequenceNbr());
String response = HttpUtil.createPost(baseUrlGKHF).body(JSON.toJSONString(resultMap)).execute().body();
if (response.contains("intervalValue1") && response.contains("processVariableId")) {
com.alibaba.fastjson.JSONObject jsonObject = JSON.parseObject(response);
fanPointVarCorrelation.setIntervalValue5(jsonObject.getDoubleValue("intervalValue5"));
fanPointVarCorrelation.setIntervalValue4(jsonObject.getDoubleValue("intervalValue4"));
fanPointVarCorrelation.setIntervalValue3(jsonObject.getDoubleValue("intervalValue3"));
fanPointVarCorrelation.setIntervalValue2(jsonObject.getDoubleValue("intervalValue2"));
fanPointVarCorrelation.setIntervalValue1(jsonObject.getDoubleValue("intervalValue1"));
log.info("------------------------------------------光伏相关性::计算成功,待更新表数据----------------------------------------");
} else {
fanPointVarCorrelation.setIntervalValue5(0.0);
fanPointVarCorrelation.setIntervalValue4(0.0);
fanPointVarCorrelation.setIntervalValue3(0.0);
fanPointVarCorrelation.setIntervalValue2(0.0);
fanPointVarCorrelation.setIntervalValue1(0.0);
}
fanPointVarCorrelation.setRecDate(new Date());
idxBizFanPointProcessVariableClassificationService.saveOrUpdate(fanPointVarCorrelation);
log.info("表数据已更新");
}
});
} catch (Exception e) {
log.error("kafka失败,当前失败的批次");
} finally {
redisUtils.expire(kafkaTopicConsumerGKHFFan, 600);
}
return true;
}
/**
* Batch-consume Kafka messages [PV condition division]
*
* @param consumerRecords messages
* @param ack ack
*/
@KafkaListener(id = "GKHFPvConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumerGKHFPv)
public void listenGKHFPv(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
try {
ack.acknowledge();
Map<String, Set<String>> gatewayPoints = new HashMap<>();
consumerRecords.stream().forEach(record -> {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
IdxBizPvPointProcessVariableClassification pvPointProcessVariable = JSON.parseObject(kafkaMessage.get().toString(), IdxBizPvPointProcessVariableClassification.class);
Set<String> idSet = null;
if (gatewayPoints.containsKey(pvPointProcessVariable.getGatewayId())) {
idSet = gatewayPoints.get(pvPointProcessVariable.getGatewayId());
} else {
idSet = new HashSet<>();
}
idSet.add(pvPointProcessVariable.getIndexAddress());
gatewayPoints.put(pvPointProcessVariable.getGatewayId(), idSet);
} }
});
buildExecData(consumerRecords, gatewayPoints, "GKHFPvConsumer");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* PV condition-division processing
* @param consumerRecords
* @return
*/
boolean consumerRecordsGKFXPv(List<ConsumerRecord<String, String>> consumerRecords, Table table) {
try {
consumerRecords.parallelStream().forEach(record -> {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
IdxBizPvPointProcessVariableClassification pvPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizPvPointProcessVariableClassification.class);
-HashMap<String, Object> resultMap = new HashMap<>();
-resultMap.put("data1", data1);
-resultMap.put("data2", data2);
-String response = HttpUtil.createPost(baseUrlXGX).body(JSON.toJSONString(resultMap)).execute().body();
-if (response.contains("correlation") && !response.contains("warning")) {
-com.alibaba.fastjson.JSONObject jsonObject = JSON.parseObject(response);
-pvPointVarCorrelation.setCorrelationCoefficient(jsonObject.getDoubleValue("correlation"));
-log.info("------------------------------------------光伏相关性::计算成功,待更新表数据----------------------------------------");
-} else {
-pvPointVarCorrelation.setCorrelationCoefficient(0.0);
-}
-pvPointVarCorrelation.setRecDate(new Date());
-idxBizPvPointVarCorrelationService.saveOrUpdate(pvPointVarCorrelation);
-log.info("表数据已更新");
-log.info("kafka消费zhTestGroup消息{}", consumerRecord);
-}
-} catch (Exception e) {
-log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecord, e);
-} finally {
-redisUtils.expire(kafkaTopicConsumerPv, 600);
-}
-return true;
-}
HashMap<String, Object> resultMap = new HashMap<>();
Selection selection = table.stringColumn("id").isEqualTo(pvPointVarCorrelation.getIndexAddress() + "_" + pvPointVarCorrelation.getGatewayId());
double[] data1 = table.where(selection).doubleColumn("value").asDoubleArray();
resultMap.put("processVariable", data1);
resultMap.put("processVariableId", pvPointVarCorrelation.getSequenceNbr());
String response = HttpUtil.createPost(baseUrlGKHF).body(JSON.toJSONString(resultMap)).execute().body();
if (response.contains("intervalValue1") && response.contains("processVariableId")) {
com.alibaba.fastjson.JSONObject jsonObject = JSON.parseObject(response);
pvPointVarCorrelation.setIntervalValue5(jsonObject.getDoubleValue("intervalValue5"));
pvPointVarCorrelation.setIntervalValue4(jsonObject.getDoubleValue("intervalValue4"));
pvPointVarCorrelation.setIntervalValue3(jsonObject.getDoubleValue("intervalValue3"));
pvPointVarCorrelation.setIntervalValue2(jsonObject.getDoubleValue("intervalValue2"));
pvPointVarCorrelation.setIntervalValue1(jsonObject.getDoubleValue("intervalValue1"));
log.info("------------------------------------------光伏相关性::计算成功,待更新表数据----------------------------------------");
} else {
pvPointVarCorrelation.setIntervalValue5(0.0);
pvPointVarCorrelation.setIntervalValue4(0.0);
pvPointVarCorrelation.setIntervalValue3(0.0);
pvPointVarCorrelation.setIntervalValue2(0.0);
pvPointVarCorrelation.setIntervalValue1(0.0);
}
pvPointVarCorrelation.setRecDate(new Date());
idxBizPvPointProcessVariableClassificationService.saveOrUpdate(pvPointVarCorrelation);
log.info("表数据已更新");
}
});
} catch (Exception e) {
log.error("kafka失败,当前失败的批次");
} finally {
redisUtils.expire(kafkaTopicConsumerGKHFPv, 600);
}
return true;
}
@@ -217,7 +641,49 @@ public class KafkaConsumerService {
/**
* Batch-consume Kafka messages [wind central value]
*
* @param consumerRecords messages
* @param ack ack
*/
@KafkaListener(id = "ZXZFanConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumerZXZFan)
public void listenZXZFan(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
try {
ack.acknowledge();
Map<String, Set<String>> gatewayPoints = new HashMap<>();
Map<String, List<IdxBizFanPointProcessVariableClassification>> zxzIds = new HashMap<>();
consumerRecords.stream().forEach(record -> {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
IdxBizFanPointProcessVariableClassification fanPointProcessVariable = JSON.parseObject(kafkaMessage.get().toString(), IdxBizFanPointProcessVariableClassification.class);
List<IdxBizFanPointVarCorrelation> gongkuangList = idxBizFanPointVarCorrelationService.list(new QueryWrapper<IdxBizFanPointVarCorrelation>().eq("ANALYSIS_GATEWAY_ID", fanPointProcessVariable.getGatewayId()).eq("ANALYSIS_POINT_ID", fanPointProcessVariable.getSequenceNbr()).orderByDesc("CORRELATION_COEFFICIENT").last("limit 3"));
List<String> processPointIds = gongkuangList.stream().map(idxBizFanPointVarCorrelation -> idxBizFanPointVarCorrelation.getProcessPointId().toString()).collect(Collectors.toList());
List<IdxBizFanPointProcessVariableClassification> idxBizFanPointProcessVariableClassificationList = idxBizFanPointProcessVariableClassificationService.list(new QueryWrapper<IdxBizFanPointProcessVariableClassification>().in("SEQUENCE_NBR", processPointIds));
idxBizFanPointProcessVariableClassificationList.add(fanPointProcessVariable);
zxzIds.put(fanPointProcessVariable.getSequenceNbr(), idxBizFanPointProcessVariableClassificationList);
idxBizFanPointProcessVariableClassificationList.forEach(item -> {
Set<String> idSet = null;
if (gatewayPoints.containsKey(fanPointProcessVariable.getGatewayId())) {
idSet = gatewayPoints.get(fanPointProcessVariable.getGatewayId());
} else {
idSet = new HashSet<>();
}
idSet.add(item.getIndexAddress());
gatewayPoints.put(fanPointProcessVariable.getGatewayId(), idSet);
});
}
});
buildZXZExecData(consumerRecords, gatewayPoints, zxzIds, "ZXZFanConsumer");
} catch (Exception e) {
e.printStackTrace();
}
}
@@ -227,29 +693,277 @@ public class KafkaConsumerService {
/**
-* Batch-consume Kafka messages [wind condition division]
+* Batch-consume Kafka messages [PV central value]
*
* @param consumerRecords messages
* @param ack ack
*/
-@KafkaListener(id = "GKHFFanConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumerGKHFFan)
+@KafkaListener(id = "ZXZPvConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumerZXZPv)
-public void listenGKHFFan(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
+public void listenZXZPv(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
try {
ack.acknowledge(); ack.acknowledge();
Map<String, Set<String>> gatewayPoints = new HashMap<>();
Map<String, List<IdxBizPvPointProcessVariableClassification>> zxzIds = new HashMap<>();
consumerRecords.stream().forEach(record -> {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
IdxBizPvPointProcessVariableClassification pvPointProcessVariable = JSON.parseObject(kafkaMessage.get().toString(), IdxBizPvPointProcessVariableClassification.class);
List<IdxBizPvPointVarCorrelation> gongkuangList = idxBizPvPointVarCorrelationService.list(new QueryWrapper<IdxBizPvPointVarCorrelation>().eq("ANALYSIS_GATEWAY_ID", pvPointProcessVariable.getGatewayId()).eq("ANALYSIS_POINT_ID", pvPointProcessVariable.getSequenceNbr()).orderByDesc("CORRELATION_COEFFICIENT").last("limit 3"));
List<String> processPointIds = gongkuangList.stream().map(idxBizPvPointVarCorrelation -> idxBizPvPointVarCorrelation.getProcessPointId().toString()).collect(Collectors.toList());
List<IdxBizPvPointProcessVariableClassification> idxBizPvPointProcessVariableClassificationList = idxBizPvPointProcessVariableClassificationService.list(new QueryWrapper<IdxBizPvPointProcessVariableClassification>().in("SEQUENCE_NBR", processPointIds));
idxBizPvPointProcessVariableClassificationList.add(pvPointProcessVariable);
zxzIds.put(pvPointProcessVariable.getSequenceNbr(), idxBizPvPointProcessVariableClassificationList);
idxBizPvPointProcessVariableClassificationList.forEach(item -> {
Set<String> idSet = null;
if (gatewayPoints.containsKey(pvPointProcessVariable.getGatewayId())) {
idSet = gatewayPoints.get(pvPointProcessVariable.getGatewayId());
} else {
idSet = new HashSet<>();
}
idSet.add(item.getIndexAddress());
gatewayPoints.put(pvPointProcessVariable.getGatewayId(), idSet);
});
}
});
buildZXZPvExecData(consumerRecords, gatewayPoints, zxzIds, "ZXZFanConsumer");
} catch (Exception e) {
e.printStackTrace();
}
}
private void buildZXZPvExecData(List<ConsumerRecord<String, String>> consumerRecords, Map<String, Set<String>> gatewayPoints, Map<String, List<IdxBizPvPointProcessVariableClassification>> zxzIds, String xgxPvConsumer) {
for (String gatewayId : gatewayPoints.keySet()) {
String join = String.join(",", gatewayPoints.get(gatewayId));
List<IndicatorData> indicatorData = indicatorDataMapper.selectByAddresses(join, gatewayId);
JsonReadOptions options = JsonReadOptions.builderFromString(JSON.toJSONString(indicatorData))
.columnTypes(new Function<String, ColumnType>() {
@Override
public ColumnType apply(String t) {
if (t.equals("value")) {
return ColumnType.DOUBLE;
}
return ColumnType.STRING;
}
}).build();
Table table = Table.read().usingOptions(options);
queue.add(new PointData(consumerRecords, table, xgxPvConsumer, zxzIds, ""));
}
}
/**
* Central value - PV
* @param consumerRecords
* @param pointsData
*/
private void consumerRecordsZXZPv(List<ConsumerRecord<String, String>> consumerRecords, PointData pointsData ) {
Table table = pointsData.getTable();
Map<String, List<IdxBizPvPointProcessVariableClassification>> zxzIds = pointsData.getZxzIdsPv();
for (String id : zxzIds.keySet()) {
List<IdxBizPvPointProcessVariableClassification> variableClassificationList = zxzIds.get(id);
String analysisVariableId = id;
List<IdxBizPvPointProcessVariableClassification> processVariableList = variableClassificationList.stream().filter(v -> !id.equals(v.getSequenceNbr().toString())).collect(Collectors.toList());
IdxBizPvPointProcessVariableClassification analysisVariable = variableClassificationList.stream().filter(v -> id.equals(v.getSequenceNbr().toString())).findFirst().get();
Map<String, Object> data1 = new HashMap<>();
Map<String, Object> data2 = new HashMap<>();
int index = 1;
Table dataTable = Table.create();
int minRow = 0;
for (IdxBizPvPointProcessVariableClassification processVariable : processVariableList) {
Selection selection = table.stringColumn("id").isEqualTo(processVariable.getIndexAddress() + "_" + processVariable.getGatewayId());
DoubleColumn values = table.where(selection).doubleColumn("value");
// track the minimum data length
if (index == 1) {
minRow = values.size();
} else {
minRow = minRow > values.size() ? values.size() : minRow;
}
values.setName("processVariable" + index);
if (!dataTable.isEmpty() && dataTable.rowCount() < values.size()) {
dataTable.addColumns(values.inRange(0, dataTable.rowCount()));
} else {
dataTable.addColumns(values);
}
data1.put("processVariable" + index + "Id", processVariable.getSequenceNbr());
// build the operating-condition interval array
List<Object> IntervalValues = new ArrayList<>();
IntervalValues.add(processVariable.getIntervalValue1());
IntervalValues.add(processVariable.getIntervalValue2());
IntervalValues.add(processVariable.getIntervalValue3());
IntervalValues.add(processVariable.getIntervalValue4());
data2.put("processVariable" + index, IntervalValues);
index++;
}
Selection selection = table.stringColumn("id").isEqualTo(analysisVariable.getIndexAddress() + "_" + analysisVariable.getGatewayId());
DoubleColumn values = table.where(selection).doubleColumn("value");
values.setName("analysisVariable");
if (!dataTable.isEmpty() && dataTable.rowCount() < values.size()) {
dataTable.addColumns(values.inRange(0, dataTable.rowCount()));
} else {
dataTable.addColumns(values);
}
data1.put("analysisVariableId", analysisVariable.getSequenceNbr());
// Trim all columns to the common minimum length
dataTable = dataTable.inRange(0, minRow);
List<String> list = dataTable.columnNames();
for (String column : list) {
data1.put(column, dataTable.doubleColumn(column).asDoubleArray());
}
Map<String,Object> requestMap = new HashMap<>();
requestMap.put("data1", data1);
requestMap.put("data2", data2);
String response = HttpUtil.createPost(zxzJsUrlFanBySF).body(JSON.toJSONString(requestMap)).execute().body();
if (response.contains("stdDev")) {
idxBizPvPointVarCentralValueMapper.delete(new QueryWrapper<IdxBizPvPointVarCentralValue>().eq("ANALYSIS_POINT_ID", analysisVariable.getSequenceNbr()));
JSONObject jsonObject = JSON.parseObject(response);
int length = jsonObject.getJSONArray("stdDev").size();
List<IdxBizPvPointVarCentralValue> insertList = new ArrayList<>();
for (int i = 0; i < length; i++) {
IdxBizPvPointVarCentralValue idxBizPvPointVarCentralValue = new IdxBizPvPointVarCentralValue();
idxBizPvPointVarCentralValue.setProcess1Min(jsonObject.getJSONArray("process1Min").getDoubleValue(i));
idxBizPvPointVarCentralValue.setProcess2Min(jsonObject.getJSONArray("process2Min").getDoubleValue(i));
idxBizPvPointVarCentralValue.setProcess3Min(jsonObject.getJSONArray("process3Min").getDoubleValue(i));
idxBizPvPointVarCentralValue.setProcess1Max(jsonObject.getJSONArray("process1Max").getDoubleValue(i));
idxBizPvPointVarCentralValue.setProcess2Max(jsonObject.getJSONArray("process2Max").getDoubleValue(i));
idxBizPvPointVarCentralValue.setProcess3Max(jsonObject.getJSONArray("process3Max").getDoubleValue(i));
idxBizPvPointVarCentralValue.setAnalysisPointId(jsonObject.getString("analysisVariableId"));
idxBizPvPointVarCentralValue.setAnalysisPointIdName(analysisVariable.getPointName());
idxBizPvPointVarCentralValue.setProcessPoint1Id(jsonObject.getString("processVariable1Id"));
idxBizPvPointVarCentralValue.setProcessPoint1IdName(processVariableList.get(0).getPointName());
idxBizPvPointVarCentralValue.setProcessPoint2Id(jsonObject.getString("processVariable2Id"));
idxBizPvPointVarCentralValue.setProcessPoint2IdName(processVariableList.get(1).getPointName());
idxBizPvPointVarCentralValue.setProcessPoint3Id(jsonObject.getString("processVariable3Id"));
idxBizPvPointVarCentralValue.setProcessPoint3IdName(processVariableList.get(2).getPointName());
idxBizPvPointVarCentralValue.setAnalysisStdDev(jsonObject.getJSONArray("stdDev").getDoubleValue(i));
idxBizPvPointVarCentralValue.setAnalysisCenterValue(jsonObject.getJSONArray("centerValue").getDoubleValue(i));
idxBizPvPointVarCentralValue.setArae(analysisVariable.getArae());
idxBizPvPointVarCentralValue.setStation(analysisVariable.getStation());
idxBizPvPointVarCentralValue.setSubarray(analysisVariable.getSubarray());
idxBizPvPointVarCentralValue.setManufacturer(analysisVariable.getManufacturer());
idxBizPvPointVarCentralValue.setEquipmentName(analysisVariable.getEquipmentName());
insertList.add(idxBizPvPointVarCentralValue);
}
if (CollectionUtils.isNotEmpty(insertList)) {
idxBizPvPointVarCentralValueService.saveBatch(insertList);
}
}
}
redisUtils.expire(kafkaTopicConsumerZXZPv, 600);
}
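// Editor's sketch, under the assumption (borne out by the parsing loop above) that the central-value
// service returns parallel JSON arrays of equal length ("stdDev", "centerValue", "process1Min", ...).
// A defensive pre-check before the row-wise reads could look like this:
private boolean centralValueArraysAligned(JSONObject body) {
int n = body.getJSONArray("stdDev").size();
return body.getJSONArray("centerValue").size() == n
&& body.getJSONArray("process1Min").size() == n
&& body.getJSONArray("process1Max").size() == n;
}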
/**
* Wind turbine message handling - deprecated
* @param consumerRecord
* @return
*/
boolean consumerRecords(ConsumerRecord<String, String> consumerRecord) {
try {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
IdxBizFanPointVarCorrelation fanPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizFanPointVarCorrelation.class);
List<IndicatorData> tdengineData1 = indicatorDataMapper.selectDataById(fanPointVarCorrelation.getAnalysisIndexAddress() + "_" + fanPointVarCorrelation.getAnalysisGatewayId());
// List<IndicatorData> tdengineData1 = indicatorDataMapper.selectDataByAddressAndtimeNotDate(fanPointVarCorrelation.getAnalysisIndexAddress(), fanPointVarCorrelation.getAnalysisGatewayId());
List<IndicatorData> tdengineData2 = indicatorDataMapper.selectDataById(fanPointVarCorrelation.getProcessIndexAddress() + "_" + fanPointVarCorrelation.getProcessGatewayId());
// List<IndicatorData> tdengineData2 = indicatorDataMapper.selectDataByAddressAndtimeNotDate(fanPointVarCorrelation.getProcessIndexAddress(), fanPointVarCorrelation.getProcessGatewayId());
List<Double> data1 = tdengineData1.stream().map(t -> Double.parseDouble(t.getValue())).collect(Collectors.toList());
List<Double> data2 = tdengineData2.stream().map(t -> Double.parseDouble(t.getValue())).collect(Collectors.toList());
// List<Double> data1 = new ArrayList<>();
// List<Double> data2 = new ArrayList<>();
// tdengineData1.forEach(item -> {
// if (item.getAddress().equals(fanPointVarCorrelation.getAnalysisIndexAddress()) && item.getGatewayId().equals(fanPointVarCorrelation.getAnalysisGatewayId())) {
// data1.add(Double.parseDouble(item.getValue()));
// } else {
// data2.add(Double.parseDouble(item.getValue()));
// }
// });
// Align series lengths by dropping the oldest entries from the longer list
if (data1.size() < data2.size()) {
int a = data2.size() - data1.size();
for (int i = 0; i < a; i++) {
data2.remove(0);
}
} else if (data2.size() < data1.size()) {
int a = data1.size() - data2.size();
for (int i = 0; i < a; i++) {
data1.remove(0);
}
}
HashMap<String, Object> resultMap = new HashMap<>();
resultMap.put("data1", data1);
resultMap.put("data2", data2);
String response = HttpUtil.createPost(baseUrlXGX).body(JSON.toJSONString(resultMap)).execute().body();
if (response.contains("correlation") && !response.contains("warning")) {
com.alibaba.fastjson.JSONObject jsonObject = JSON.parseObject(response);
fanPointVarCorrelation.setCorrelationCoefficient(jsonObject.getDoubleValue("correlation"));
log.info("------------------------------------------风机相关性::计算成功,待更新表数据----------------------------------------");
} else {
fanPointVarCorrelation.setCorrelationCoefficient(0.0);
}
fanPointVarCorrelation.setRecDate(new Date());
idxBizFanPointVarCorrelationService.saveOrUpdate(fanPointVarCorrelation);
log.info("表数据已更新");
log.info("----------------------------风机相关性--------------分析变量与工况变量相关性分析算法结束----------------------------------------");
log.info("kafka消费zhTestGroup消息{}", consumerRecord);
}
} catch (Exception e) {
log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecord, e);
} finally {
redisUtils.expire(kafkaTopicConsumer, 600);
}
return true;
}
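// Editor's sketch (assumption, not the committed implementation): the element-by-element
// remove(0) trimming above can also be expressed with subList, keeping the newest n points of each series.
private static List<Double> keepTail(List<Double> data, int n) {
return new ArrayList<>(data.subList(data.size() - n, data.size()));
}
// usage: int n = Math.min(data1.size(), data2.size()); data1 = keepTail(data1, n); data2 = keepTail(data2, n);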
/**
* Wind turbine working-condition division handling - deprecated
* @param consumerRecord
* @return
*/
boolean consumerRecordsGKFXFan(ConsumerRecord<String, String> consumerRecord) {
try {
String startTime = DateUtils.convertDateToString(DateUtil.offsetMonth(new Date(), -lastMonthNum), DateUtils.DATE_TIME_PATTERN);
String endTime = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()), DateUtils.DATE_TIME_PATTERN);
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
IdxBizFanPointProcessVariableClassification fanPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizFanPointProcessVariableClassification.class);
...@@ -289,38 +1003,75 @@ public class KafkaConsumerService {
}
/**
* PV message handling - deprecated
* @param consumerRecord
* @return
*/
@KafkaListener(id = "GKHFPvConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumerGKHFPv) boolean consumerRecordsPv(ConsumerRecord<String, String> consumerRecord) {
public void listenGKHFPv(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
ack.acknowledge(); String startTime = DateUtils.convertDateToString(DateUtil.offsetMonth(new Date(), -lastMonthNum), DateUtils.DATE_TIME_PATTERN);
String endTime = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()), DateUtils.DATE_TIME_PATTERN);
try { try {
CompletableFuture[] completableFutures = consumerRecords.stream().map(m -> CompletableFuture.supplyAsync(() -> consumerRecordsGKFXPv(m), service)).toArray(CompletableFuture[]::new); Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
CompletableFuture.allOf(completableFutures).join(); if (kafkaMessage.isPresent()) {
} finally { IdxBizPvPointVarCorrelation pvPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizPvPointVarCorrelation.class);
List<IndicatorData> tdengineData1 = indicatorDataMapper.selectDataByAddressAndtimeNew(pvPointVarCorrelation.getAnalysisIndexAddress().toString(), startTime, endTime, pvPointVarCorrelation.getAnalysisGatewayId(), pvPointVarCorrelation.getProcessGatewayId(), pvPointVarCorrelation.getProcessIndexAddress());
List<Double> data1 = new ArrayList<>();
List<Double> data2 = new ArrayList<>();
tdengineData1.forEach(item -> {
if (item.getAddress().equals(pvPointVarCorrelation.getAnalysisIndexAddress()) && item.getGatewayId().equals(pvPointVarCorrelation.getAnalysisGatewayId())) {
data1.add(Double.parseDouble(item.getValue()));
} else {
data2.add(Double.parseDouble(item.getValue()));
}
});
// Align series lengths by dropping the oldest entries from the longer list
if (data1.size() < data2.size()) {
int a = data2.size() - data1.size();
for (int i = 0; i < a; i++) {
data2.remove(0);
}
} else if (data2.size() < data1.size()) {
int a = data1.size() - data2.size();
for (int i = 0; i < a; i++) {
data1.remove(0);
}
}
HashMap<String, Object> resultMap = new HashMap<>();
resultMap.put("data1", data1);
resultMap.put("data2", data2);
String response = HttpUtil.createPost(baseUrlXGX).body(JSON.toJSONString(resultMap)).execute().body();
if (response.contains("correlation") && !response.contains("warning")) {
com.alibaba.fastjson.JSONObject jsonObject = JSON.parseObject(response);
pvPointVarCorrelation.setCorrelationCoefficient(jsonObject.getDoubleValue("correlation"));
log.info("------------------------------------------光伏相关性::计算成功,待更新表数据----------------------------------------");
} else {
pvPointVarCorrelation.setCorrelationCoefficient(0.0);
}
pvPointVarCorrelation.setRecDate(new Date());
idxBizPvPointVarCorrelationService.saveOrUpdate(pvPointVarCorrelation);
log.info("表数据已更新");
log.info("kafka消费zhTestGroup消息{}", consumerRecord);
}
} catch (Exception e) {
log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecord, e);
} finally {
redisUtils.expire(kafkaTopicConsumerPv, 600);
}
return true;
}
/**
* PV working-condition division handling - deprecated
* @param consumerRecord
* @return
*/
boolean consumerRecordsGKFXPv(ConsumerRecord<String, String> consumerRecord) {
try {
String startTime = DateUtils.convertDateToString(DateUtil.offsetMonth(new Date(), -lastMonthNum), DateUtils.DATE_TIME_PATTERN);
String endTime = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()), DateUtils.DATE_TIME_PATTERN);
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
IdxBizPvPointProcessVariableClassification pvPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizPvPointProcessVariableClassification.class);
...@@ -360,18 +1111,6 @@ public class KafkaConsumerService {
}
}
...@@ -91,4 +91,7 @@ public interface IdxBizFanHealthIndexMapper extends BaseMapper<IdxBizFanHealthIndex> {
List<IdxBizFanHealthLevel> getHealthLevelInfoList();
List<String> getAddressInfo();
}
...@@ -23,4 +23,11 @@ public interface IndicatorDataMapper extends BaseMapper<IndicatorData> {
List<IndicatorData> selectDataByAddressAndtimeNew(@Param("address")String address, @Param("startTime") String startTime, @Param("endTime")String endTime, @Param("gatewayId")String gatewayId, @Param("gatewayId1")String gatewayId1, @Param("address1")String address1);
@Select("select `value` from iot_data.indicator_data where id =#{id}")
List<IndicatorData> selectDataById (@Param("id")String id);
@Select("select `id`, `value` from iot_data.indicator_data where `address` in (${addresses}) and gateway_id = #{gatewayId}")
List<IndicatorData> selectByAddresses(@Param("addresses") String addresses, @Param("gatewayId") String gatewayId);
}
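// Editor's note on MyBatis semantics: ${addresses} above is spliced into the SQL verbatim (no escaping),
// so callers must pass a pre-formatted, trusted fragment. A hedged sketch for string-typed addresses:
// String addresses = idSet.stream().map(a -> "'" + a + "'").collect(Collectors.joining(","));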
...@@ -142,27 +142,33 @@ pictureUrl=upload/jxiop/syz/
#kafka
spring.kafka.bootstrap-servers=139.9.173.44:9092
spring.kafka.producer.retries=1
spring.kafka.producer.bootstrap-servers=139.9.173.44:9092
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=consumerGroup
spring.kafka.consumer.bootstrap-servers=139.9.173.44:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.type=batch
#Records per poll && thread count
spring.kafka.consumer.max-poll-records=30
#spring.kafka.consumer.fetch-max-wait= 10000
#Months to offset back from the current time (historical window)
last.month.num = 12
#Correlation algorithm endpoint
base.url.XGX=http://139.9.171.247:8052/intelligent-analysis/correlation
#Working-condition division algorithm endpoint
base.url.GKHF=http://139.9.171.247:8052/intelligent-analysis/working-condition-division
#Central-value algorithm endpoint
base.url.ZXZ=http://139.9.171.247:8052/intelligent-analysis/central-value
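#Editor's note (request/response shape inferred from KafkaConsumerService): the correlation endpoint
#is POSTed a JSON body {"data1":[...],"data2":[...]} and answers with a JSON object containing "correlation".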
...@@ -767,4 +767,8 @@
STATUS IS NOT NULL
OR STATUS != ''
</select>
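<!-- Editor's note: the query below samples up to 100 analog point addresses for the hard-coded gateway id -->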
<select id="getAddressInfo" resultType="java.lang.String">
select index_address from wl_equipment_specific_index where gateway_id = '1668801435891929089' and data_type = 'analog' limit 100
</select>
</mapper>