Commit 70fd8fd0 authored by zhangsen

Handle calculation issues

parent 8ee7ed3d
...@@ -47,6 +47,17 @@
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.4</version>
</dependency>
<!-- Kafka dependencies -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
</dependencies>
<build>
...
package com.yeejoin.amos.boot.module.jxiop.biz.controller;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yeejoin.amos.boot.biz.common.controller.BaseController;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
...@@ -67,6 +68,30 @@ public class AnalyseController extends BaseController {
}
return ResponseHelper.buildResponse(commonServiceImpl.getFanConditionVariablesByTimeAnalyseThread(startTime, endTime));
}
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@PostMapping(value = "/getFanConditionVariablesByTimeAnalyseNew")
@ApiOperation(httpMethod = "POST", value = "相关性分析 - 风机 - 新", notes = "相关性分析 - 风机 - 新")
public ResponseModel<Object> getFanConditionVariablesByTimeAnalyseNew() throws InterruptedException {
commonServiceImpl.chuli();
return ResponseHelper.buildResponse(null);
}
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@ApiOperation(httpMethod = "GET", value = "相关性分析 - 光伏 - 新", notes = "相关性分析 - 光伏 - 新")
@GetMapping(value = "/getPvConditionVariablesByTimeAnalyseNew")
public ResponseModel<String> getPvConditionVariablesByTimeAnalyseNew(@RequestParam(required = false) String startTime, @RequestParam(required = false) String endTime) throws InterruptedException {
if (StringUtils.isEmpty(startTime) && StringUtils.isEmpty(endTime) ){
startTime = DateUtils.convertDateToString(DateUtil.beginOfYear(new Date()), DateUtils.DATE_TIME_PATTERN);
endTime = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()), DateUtils.DATE_TIME_PATTERN);
}
commonServiceImpl.chuliPv(startTime, endTime);
return ResponseHelper.buildResponse(commonServiceImpl.getPvConditionVariablesByTimeAnalyseThread(startTime, endTime));
}
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@ApiOperation(httpMethod = "GET", value = "相关性分析-风机", notes = "相关性分析-风机")
@GetMapping(value = "/getPvConditionVariablesByTimeAnalyse")
...
package com.yeejoin.amos.boot.module.jxiop.biz.controller;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.kafka.FanConditionVariablesMessage;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.typroject.tyboot.core.foundation.context.RequestContext;
import org.typroject.tyboot.core.foundation.enumeration.UserType;
import org.typroject.tyboot.core.restful.doc.TycloudOperation;
import org.typroject.tyboot.core.restful.utils.ResponseHelper;
import org.typroject.tyboot.core.restful.utils.ResponseModel;
import static com.yeejoin.amos.boot.module.jxiop.biz.kafka.Constant.*;
@RestController
@Api(tags = "智能分析")
@RequestMapping(value = "/kafkaAnalyse")
public class KafkaAnalyseController {
@Autowired
FanConditionVariablesMessage fanConditionVariablesMessage;
@Autowired
RedisUtils redisUtils;
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@PostMapping(value = "/getFanConditionVariables")
@ApiOperation(httpMethod = "POST", value = "计算相关性分析 - 风机 - 新", notes = "计算相关性分析 - 风机 - 新")
public ResponseModel<Object> getFanConditionVariables() {
if (redisUtils.hasKey(kafkaTopicConsumer)) {
return ResponseHelper.buildResponse("计算中");
}
fanConditionVariablesMessage.getFanConditionVariables();
redisUtils.set(kafkaTopicConsumer, RequestContext.getTraceId(), 300);
return ResponseHelper.buildResponse("计算中");
}
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@PostMapping(value = "/getPvConditionVariables")
@ApiOperation(httpMethod = "POST", value = "计算相关性分析 - 光伏 - 新", notes = "计算相关性分析 - 光伏 - 新")
public ResponseModel<Object> getPvConditionVariables() {
if (redisUtils.hasKey(kafkaTopicConsumerPv)) {
return ResponseHelper.buildResponse("计算中");
}
fanConditionVariablesMessage.getPvConditionVariables();
redisUtils.set(kafkaTopicConsumerPv, RequestContext.getTraceId(), 300);
return ResponseHelper.buildResponse("计算中");
}
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@PostMapping(value = "/getFanConditionVariablesGKHF")
@ApiOperation(httpMethod = "POST", value = "工况划分 - 风电 - 新", notes = "工况划分 - 风电 - 新")
public ResponseModel<Object> getFanConditionVariablesGKHF() {
if (redisUtils.hasKey(kafkaTopicConsumerGKHFFan)) {
return ResponseHelper.buildResponse("计算中");
}
fanConditionVariablesMessage.getFanConditionVariablesGKHF();
redisUtils.set(kafkaTopicConsumerGKHFFan, RequestContext.getTraceId(), 300);
return ResponseHelper.buildResponse("计算中");
}
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@PostMapping(value = "/getPvConditionVariablesPvGKHF")
@ApiOperation(httpMethod = "POST", value = "工况划分 - 光伏 - 新", notes = "工况划分 - 光伏 - 新")
public ResponseModel<Object> getPvConditionVariablesPvGKFX() {
if (redisUtils.hasKey(kafkaTopicConsumerGKHFPv)) {
return ResponseHelper.buildResponse("计算中");
}
fanConditionVariablesMessage.getPvConditionVariablesPvGKFX();
redisUtils.set(kafkaTopicConsumerGKHFPv, RequestContext.getTraceId(), 300);
return ResponseHelper.buildResponse("计算中");
}
}
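The four endpoints above share one idempotency pattern: a Redis key doubles as a soft lock, set with a 300 s TTL when the job is kicked off and refreshed to 600 s by the consumers as records arrive, so the "计算中" guard expires on its own once consumption stops. A minimal sketch of the pattern, written against Spring's StringRedisTemplate rather than the project's RedisUtils wrapper (names here are illustrative):

```java
import org.springframework.data.redis.core.StringRedisTemplate;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class JobGuard {
    private final StringRedisTemplate redis;

    public JobGuard(StringRedisTemplate redis) {
        this.redis = redis;
    }

    /** Returns true if the job was started, false if one is already running. */
    public boolean tryStart(String jobKey, String traceId) {
        // setIfAbsent is atomic, avoiding the check-then-set race that the
        // hasKey() + set() sequence in the controller leaves open.
        Boolean started = redis.opsForValue()
                .setIfAbsent(jobKey, traceId, Duration.ofSeconds(300));
        return Boolean.TRUE.equals(started);
    }

    /** Called by consumers to keep the guard alive while work continues. */
    public void heartbeat(String jobKey) {
        redis.expire(jobKey, 600, TimeUnit.SECONDS);
    }
}
```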
package com.yeejoin.amos.boot.module.jxiop.biz.kafka;
import cn.hutool.core.date.DateUtil;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
import java.util.Date;
/**
* @author LiuLin
* @date 2023-09-02 11:02
*/
public interface Constant {
String INSERT = "INSERT";
String UPDATE = "UPDATE";
String DATA = "data";
String TOPIC = "topic";
String TABLE = "table";
String TYPE = "type";
String DB_TYPE = "dbType";
String BODY = "body";
String DATA_TYPE = "datatype";
String STATE = "state";
// Kafka topic: fan correlation
String kafkaTopicConsumer = "FanConditionVariables";
// Kafka topic: PV correlation
String kafkaTopicConsumerPv = "PvConditionVariables";
// Kafka topic: fan working-condition interval division
String kafkaTopicConsumerGKHFFan = "FanConditionVariablesGKHF";
// Kafka topic: PV working-condition interval division
String kafkaTopicConsumerGKHFPv = "PvConditionVariablesGKHF";
// @Value has no effect on interface constants (they are implicitly static final),
// so the look-back window is fixed at 12 months here.
Integer lastMonthNum = 12;
// Evaluated once at class initialization, not per request.
String startTime = DateUtils.convertDateToString(DateUtil.offsetMonth(new Date(), -lastMonthNum), DateUtils.DATE_TIME_PATTERN);
String endTime = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()), DateUtils.DATE_TIME_PATTERN);
// Correlation analysis service
String baseUrlXGX = "http://139.9.171.247:8052/intelligent-analysis/correlation";
// Working-condition division service
String baseUrlGKHF = "http://139.9.171.247:8052/intelligent-analysis/working-condition-division";
}
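Since interface constants cannot be injected (see the note above), a configurable look-back window needs a Spring-managed holder; a minimal sketch (class name is illustrative):

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class AnalysisWindowProperties {
    /** Months to look back from now; overridden by the last.month.num property. */
    @Value("${last.month.num:12}")
    private int lastMonthNum;

    public int getLastMonthNum() {
        return lastMonthNum;
    }
}
```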
package com.yeejoin.amos.boot.module.jxiop.biz.kafka;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanPointProcessVariableClassification;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanPointVarCorrelation;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizPvPointProcessVariableClassification;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizPvPointVarCorrelation;
import com.yeejoin.amos.boot.module.jxiop.biz.mapper2.IdxBizFanPointProcessVariableClassificationMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.mapper2.IdxBizFanPointVarCorrelationMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.mapper2.IdxBizPvPointProcessVariableClassificationMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.mapper2.IdxBizPvPointVarCorrelationMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import static com.yeejoin.amos.boot.module.jxiop.biz.kafka.Constant.*;
@Service
@Slf4j
public class FanConditionVariablesMessage {
@Autowired
private IdxBizFanPointVarCorrelationMapper pointVarCorrelationMapper;
@Autowired
private IdxBizPvPointVarCorrelationMapper pointVarCorrelationMapperPv;
@Autowired
private IdxBizFanPointProcessVariableClassificationMapper classificationMapperFan;
@Autowired
private IdxBizPvPointProcessVariableClassificationMapper classificationMapperPv;
@Autowired
private KafkaProducerService kafkaProducerService;
// Correlation analysis - fan entry point
public void getFanConditionVariables() {
List<IdxBizFanPointVarCorrelation> pointVarCorrelationsList = pointVarCorrelationMapper.selectList(null);
pointVarCorrelationsList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumer, JSON.toJSONString(item)));
}
// Correlation analysis - PV entry point
public void getPvConditionVariables() {
List<IdxBizPvPointVarCorrelation> pointVarCorrelationsList = pointVarCorrelationMapperPv.selectList(null);
pointVarCorrelationsList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumerPv, JSON.toJSONString(item)));
}
// Working-condition division - wind - new
public void getFanConditionVariablesGKHF() {
List<IdxBizFanPointProcessVariableClassification> variableClassificationList = classificationMapperFan.selectList(new QueryWrapper<IdxBizFanPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "工况变量"));
variableClassificationList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumerGKHFFan, JSON.toJSONString(item)));
}
// Working-condition division - PV - new
public void getPvConditionVariablesPvGKFX() {
List<IdxBizPvPointProcessVariableClassification> variableClassificationList = classificationMapperPv.selectList(new QueryWrapper<IdxBizPvPointProcessVariableClassification>().isNotNull("SEQUENCE_NBR").eq("TAG_CODE", "工况变量"));
variableClassificationList.forEach(item -> kafkaProducerService.sendMessageAsync(kafkaTopicConsumerGKHFPv, JSON.toJSONString(item)));
}
}
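Each entry method loads the full table with selectList(null) and then fans every row out as one Kafka message. For large tables the same fan-out can be paged so the rows never sit in memory all at once; a sketch using MyBatis-Plus pagination (the page size of 500 is an arbitrary choice, and the mapper is assumed to extend BaseMapper, as its selectList usage above implies):

```java
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;

// Paged variant of getFanConditionVariables(): stream the table page by page
// instead of materializing every row before producing.
public void getFanConditionVariablesPaged() {
    long current = 1;
    long pages;
    do {
        Page<IdxBizFanPointVarCorrelation> page =
                pointVarCorrelationMapper.selectPage(new Page<>(current, 500), null);
        page.getRecords().forEach(item ->
                kafkaProducerService.sendMessageAsync(kafkaTopicConsumer, JSON.toJSONString(item)));
        pages = page.getPages();
        current++;
    } while (current <= pages);
}
```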
package com.yeejoin.amos.boot.module.jxiop.biz.kafka;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.*;
import com.yeejoin.amos.boot.module.jxiop.biz.mapper2.IdxBizFanPointProcessVariableClassificationMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizFanPointProcessVariableClassificationServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizFanPointVarCorrelationServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizPvPointProcessVariableClassificationServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizPvPointVarCorrelationServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.tdmapper.IndicatorDataMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static com.yeejoin.amos.boot.module.jxiop.biz.kafka.Constant.*;
/**
* Kafka consumer service
*
* @author litw
* @create 2022/11/1 10:06
**/
@Slf4j
@Service
public class KafkaConsumerService {
@Autowired
private IndicatorDataMapper indicatorDataMapper;
@Autowired
RedisUtils redisUtils;
@Autowired
private IdxBizFanPointVarCorrelationServiceImpl idxBizFanPointVarCorrelationService;
@Autowired
private IdxBizPvPointVarCorrelationServiceImpl idxBizPvPointVarCorrelationService;
@Autowired
IdxBizFanPointProcessVariableClassificationServiceImpl idxBizFanPointProcessVariableClassificationService;
@Autowired
IdxBizPvPointProcessVariableClassificationServiceImpl idxBizPvPointProcessVariableClassificationService;
ExecutorService service = Executors.newFixedThreadPool(30);
/**
* Batch-consume Kafka messages [wind farm - correlation]
*
* @param consumerRecords messages
* @param ack ack
*/
@KafkaListener(id = "xgxFanConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumer)
public void listenXGXFan(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
// Offsets are acknowledged before processing, so failed records are logged
// but never redelivered (at-most-once handling).
ack.acknowledge();
CompletableFuture[] completableFutures = consumerRecords.stream().map(m -> CompletableFuture.supplyAsync(() -> consumerRecords(m), service)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutures).join();
}
/**
* Process one wind-farm correlation message
* @param consumerRecord the Kafka record
* @return always true; failures are only logged
*/
boolean consumerRecords(ConsumerRecord<String, String> consumerRecord) {
try {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
IdxBizFanPointVarCorrelation fanPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizFanPointVarCorrelation.class);
List<IndicatorData> tdengineData1 = indicatorDataMapper.selectDataByAddressAndtimeNew(fanPointVarCorrelation.getAnalysisIndexAddress(), startTime, endTime, fanPointVarCorrelation.getAnalysisGatewayId(), fanPointVarCorrelation.getProcessGatewayId(), fanPointVarCorrelation.getProcessIndexAddress());
List<Double> data1 = new ArrayList<>();
List<Double> data2 = new ArrayList<>();
tdengineData1.forEach(item -> {
if (item.getAddress().equals(fanPointVarCorrelation.getAnalysisIndexAddress()) && item.getGatewayId().equals(fanPointVarCorrelation.getAnalysisGatewayId())) {
data1.add(Double.parseDouble(item.getValue()));
} else {
data2.add(Double.parseDouble(item.getValue()));
}
});
// Align the two series by dropping the oldest samples from the longer list.
if (data1.size() < data2.size()) {
int a = data2.size() - data1.size();
for (int i = 0; i < a; i++) {
data2.remove(0);
}
} else if (data2.size() < data1.size()) {
int a = data1.size() - data2.size();
for (int i = 0; i < a; i++) {
data1.remove(0);
}
}
HashMap<String, Object> resultMap = new HashMap<>();
resultMap.put("data1", data1);
resultMap.put("data2", data2);
String response = HttpUtil.createPost(baseUrlXGX).body(JSON.toJSONString(resultMap)).execute().body();
if (response.contains("correlation") && !response.contains("warning")) {
com.alibaba.fastjson.JSONObject jsonObject = JSON.parseObject(response);
fanPointVarCorrelation.setCorrelationCoefficient(jsonObject.getDoubleValue("correlation"));
log.info("------------------------------------------风机相关性::计算成功,待更新表数据----------------------------------------");
} else {
fanPointVarCorrelation.setCorrelationCoefficient(0.0);
}
fanPointVarCorrelation.setRecDate(new Date());
idxBizFanPointVarCorrelationService.saveOrUpdate(fanPointVarCorrelation);
log.info("表数据已更新");
log.info("----------------------------风机相关性--------------分析变量与工况变量相关性分析算法结束----------------------------------------");
log.info("kafka消费zhTestGroup消息{}", consumerRecord);
}
} catch (Exception e) {
log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecord, e);
} finally {
redisUtils.expire(kafkaTopicConsumer, 600);
}
return true;
}
/**
* Batch-consume Kafka messages [PV - correlation]
*
* @param consumerRecords messages
* @param ack ack
*/
@KafkaListener(id = "xgxPvConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumerPv)
public void listenXGXPv(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
ack.acknowledge();
CompletableFuture[] completableFutures = consumerRecords.stream().map(m -> CompletableFuture.supplyAsync(() -> consumerRecordsPv(m), service)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutures).join();
}
/**
* Process one PV correlation message
* @param consumerRecord the Kafka record
* @return always true; failures are only logged
*/
boolean consumerRecordsPv(ConsumerRecord<String, String> consumerRecord) {
try {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
IdxBizPvPointVarCorrelation pvPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizPvPointVarCorrelation.class);
List<IndicatorData> tdengineData1 = indicatorDataMapper.selectDataByAddressAndtimeNew(pvPointVarCorrelation.getAnalysisIndexAddress().toString(), startTime, endTime, pvPointVarCorrelation.getAnalysisGatewayId(), pvPointVarCorrelation.getProcessGatewayId(), pvPointVarCorrelation.getProcessIndexAddress());
List<Double> data1 = new ArrayList<>();
List<Double> data2 = new ArrayList<>();
tdengineData1.forEach(item -> {
if (item.getAddress().equals(pvPointVarCorrelation.getAnalysisIndexAddress()) && item.getGatewayId().equals(pvPointVarCorrelation.getAnalysisGatewayId())) {
data1.add(Double.parseDouble(item.getValue()));
} else {
data2.add(Double.parseDouble(item.getValue()));
}
});
// Align the two series by dropping the oldest samples from the longer list.
if (data1.size() < data2.size()) {
int a = data2.size() - data1.size();
for (int i = 0; i < a; i++) {
data2.remove(0);
}
} else if (data2.size() < data1.size()) {
int a = data1.size() - data2.size();
for (int i = 0; i < a; i++) {
data1.remove(0);
}
}
HashMap<String, Object> resultMap = new HashMap<>();
resultMap.put("data1", data1);
resultMap.put("data2", data2);
String response = HttpUtil.createPost(baseUrlXGX).body(JSON.toJSONString(resultMap)).execute().body();
if (response.contains("correlation") && !response.contains("warning")) {
com.alibaba.fastjson.JSONObject jsonObject = JSON.parseObject(response);
pvPointVarCorrelation.setCorrelationCoefficient(jsonObject.getDoubleValue("correlation"));
log.info("------------------------------------------光伏相关性::计算成功,待更新表数据----------------------------------------");
} else {
pvPointVarCorrelation.setCorrelationCoefficient(0.0);
}
pvPointVarCorrelation.setRecDate(new Date());
idxBizPvPointVarCorrelationService.saveOrUpdate(pvPointVarCorrelation);
log.info("表数据已更新");
log.info("kafka消费zhTestGroup消息{}", consumerRecord);
}
} catch (Exception e) {
log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecord, e);
} finally {
redisUtils.expire(kafkaTopicConsumerPv, 600);
}
return true;
}
/**
* Batch-consume Kafka messages [wind - working-condition division]
*
* @param consumerRecords messages
* @param ack ack
*/
@KafkaListener(id = "GKHFFanConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumerGKHFFan)
public void listenGKHFFan(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
ack.acknowledge();
CompletableFuture[] completableFutures = consumerRecords.stream().map(m -> CompletableFuture.supplyAsync(() -> consumerRecordsGKFXFan(m), service)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutures).join();
}
/**
* Process one wind working-condition division message
* @param consumerRecord the Kafka record
* @return always true; failures are only logged
*/
boolean consumerRecordsGKFXFan(ConsumerRecord<String, String> consumerRecord) {
try {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
IdxBizFanPointProcessVariableClassification fanPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizFanPointProcessVariableClassification.class);
List<IndicatorData> tdengineData1 = indicatorDataMapper.selectDataByAddressAndtime(fanPointVarCorrelation.getIndexAddress(), startTime, endTime, fanPointVarCorrelation.getGatewayId());
HashMap<String, Object> resultMap = new HashMap<>();
resultMap.put("processVariable", tdengineData1.stream().map(t -> Double.parseDouble(t.getValue())).collect(Collectors.toList()));
resultMap.put("processVariableId", fanPointVarCorrelation.getSequenceNbr());
String response = HttpUtil.createPost(baseUrlGKHF).body(JSON.toJSONString(resultMap)).execute().body();
if (response.contains("intervalValue1") && response.contains("processVariableId")) {
com.alibaba.fastjson.JSONObject jsonObject = JSON.parseObject(response);
fanPointVarCorrelation.setIntervalValue5(jsonObject.getDoubleValue("intervalValue5"));
fanPointVarCorrelation.setIntervalValue4(jsonObject.getDoubleValue("intervalValue4"));
fanPointVarCorrelation.setIntervalValue3(jsonObject.getDoubleValue("intervalValue3"));
fanPointVarCorrelation.setIntervalValue2(jsonObject.getDoubleValue("intervalValue2"));
fanPointVarCorrelation.setIntervalValue1(jsonObject.getDoubleValue("intervalValue1"));
log.info("------------------------------------------光伏相关性::计算成功,待更新表数据----------------------------------------");
} else {
fanPointVarCorrelation.setIntervalValue5(0.0);
fanPointVarCorrelation.setIntervalValue4(0.0);
fanPointVarCorrelation.setIntervalValue3(0.0);
fanPointVarCorrelation.setIntervalValue2(0.0);
fanPointVarCorrelation.setIntervalValue1(0.0);
}
fanPointVarCorrelation.setRecDate(new Date());
idxBizFanPointProcessVariableClassificationService.saveOrUpdate(fanPointVarCorrelation);
log.info("表数据已更新");
}
} catch (Exception e) {
log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecord, e);
} finally {
redisUtils.expire(kafkaTopicConsumerGKHFFan, 600);
}
return true;
}
/**
* Batch-consume Kafka messages [PV - working-condition division]
*
* @param consumerRecords messages
* @param ack ack
*/
@KafkaListener(id = "GKHFPvConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumerGKHFPv)
public void listenGKHFPv(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
ack.acknowledge();
CompletableFuture[] completableFutures = consumerRecords.stream().map(m -> CompletableFuture.supplyAsync(() -> consumerRecordsGKFXPv(m), service)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutures).join();
}
/**
* Process one PV working-condition division message
* @param consumerRecord the Kafka record
* @return always true; failures are only logged
*/
boolean consumerRecordsGKFXPv(ConsumerRecord<String, String> consumerRecord) {
try {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
IdxBizPvPointProcessVariableClassification pvPointVarCorrelation = JSON.parseObject(kafkaMessage.get().toString(), IdxBizPvPointProcessVariableClassification.class);
List<IndicatorData> tdengineData1 = indicatorDataMapper.selectDataByAddressAndtime(pvPointVarCorrelation.getIndexAddress(), startTime, endTime, pvPointVarCorrelation.getGatewayId());
HashMap<String, Object> resultMap = new HashMap<>();
resultMap.put("processVariable", tdengineData1.stream().map(t -> Double.parseDouble(t.getValue())).collect(Collectors.toList()));
resultMap.put("processVariableId", pvPointVarCorrelation.getSequenceNbr());
String response = HttpUtil.createPost(baseUrlGKHF).body(JSON.toJSONString(resultMap)).execute().body();
if (response.contains("intervalValue1") && response.contains("processVariableId")) {
com.alibaba.fastjson.JSONObject jsonObject = JSON.parseObject(response);
pvPointVarCorrelation.setIntervalValue5(jsonObject.getDoubleValue("intervalValue5"));
pvPointVarCorrelation.setIntervalValue4(jsonObject.getDoubleValue("intervalValue4"));
pvPointVarCorrelation.setIntervalValue3(jsonObject.getDoubleValue("intervalValue3"));
pvPointVarCorrelation.setIntervalValue2(jsonObject.getDoubleValue("intervalValue2"));
pvPointVarCorrelation.setIntervalValue1(jsonObject.getDoubleValue("intervalValue1"));
log.info("------------------------------------------光伏相关性::计算成功,待更新表数据----------------------------------------");
} else {
pvPointVarCorrelation.setIntervalValue5(0.0);
pvPointVarCorrelation.setIntervalValue4(0.0);
pvPointVarCorrelation.setIntervalValue3(0.0);
pvPointVarCorrelation.setIntervalValue2(0.0);
pvPointVarCorrelation.setIntervalValue1(0.0);
}
pvPointVarCorrelation.setRecDate(new Date());
idxBizPvPointProcessVariableClassificationService.saveOrUpdate(pvPointVarCorrelation);
log.info("表数据已更新");
}
} catch (Exception e) {
log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecord, e);
} finally {
redisUtils.expire(kafkaTopicConsumerGKHFPv, 600);
}
return true;
}
}
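One performance note on the handlers above: trimming the longer series with repeated remove(0) is quadratic, since each call shifts every remaining ArrayList element. An equivalent constant-time alignment can view the tail of the longer list instead; a drop-in helper sketch (not part of the commit):

```java
import java.util.List;

final class SeriesAlign {
    /** Returns a view of xs trimmed at the front to at most n elements. */
    static List<Double> tail(List<Double> xs, int n) {
        return xs.size() <= n ? xs : xs.subList(xs.size() - n, xs.size());
    }
}

// Usage inside the handlers:
//   int n = Math.min(data1.size(), data2.size());
//   data1 = SeriesAlign.tail(data1, n);
//   data2 = SeriesAlign.tail(data2, n);
```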
package com.yeejoin.amos.boot.module.jxiop.biz.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
/**
* Kafka producer service
*
* @author litw
* @create 2022/11/1 10:06
**/
@Slf4j
@Service
public class KafkaProducerService {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
/**
* Send a message asynchronously
* @param topic   the topic
* @param message the message body
*/
public void sendMessageAsync(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
// Register a send callback
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("发送消息(异步) failure! topic : {}, message: {}", topic, message);
}
@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
// log.info("发送消息(异步) success! topic: {}, message: {}", topic, message);
}
});
}
}
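On the Spring Kafka version this module targets, send() returns a ListenableFuture, as used above. Spring Kafka 3.x changes the return type to CompletableFuture, so the same callback would be written roughly as below (a sketch against the newer API, not this project's):

```java
// Equivalent failure logging on Spring Kafka 3.x, where KafkaTemplate.send
// returns a CompletableFuture instead of a ListenableFuture.
kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
    if (ex != null) {
        log.error("发送消息(异步) failure! topic: {}, message: {}", topic, message, ex);
    }
});
```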
...@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
...@@ -35,10 +36,11 @@ import org.springframework.util.ObjectUtils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@Service("commonServiceImpl")
@Configuration
public class CommonServiceImpl {
private static final HashMap<String, Object> cacheExecInfo = new HashMap<>();
// UTC time format
...@@ -52,6 +54,9 @@ public class CommonServiceImpl {
Integer zxzsleepTime;
@Value("${base.url:http://139.9.173.44:30009/maas/maas/processes/api/}")
String baseUrl;
String baseUrlXGX = "http://139.9.173.44:8052/intelligent-analysis/correlation";
// ---------------- Working-condition variable division request properties ----------------
@Value("${gkblhffan.url:74435221-796d-43c0-ae72-319792b8f89e}")
String gkqjhfurlfan;
...@@ -1104,7 +1109,7 @@ public class CommonServiceImpl {
}
@Scheduled(cron = "0 0/10 * * * ?")
public void healthWarningMinuteByGF() {
Date time = new Date();
List<IdxBizPvPointProcessVariableClassificationDto> data = idxBizPvPointProcessVariableClassificationMapper.getInfluxDBData();
Map<String, List<IdxBizPvPointProcessVariableClassificationDto>> maps = data.stream().collect(Collectors.groupingBy(IdxBizPvPointProcessVariableClassificationDto::getGatewayId));
...@@ -1277,4 +1282,172 @@ public class CommonServiceImpl {
resultMap.put("title", indicatorData.getUnit());
return resultMap;
}
private static final BlockingQueue<IdxBizFanPointProcessVariableClassification> fifo = new LinkedBlockingQueue<>(5000);
private static int threadNumber = Runtime.getRuntime().availableProcessors() * 2;
// Entry point ("chuli" = process): fill the fan queue, then drain it with a thread pool.
public void chuli() throws InterruptedException {
this.getFanConditionVariablesByTimeAnalyseNew(null, null);
// Build the consumer pool via Executors
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < threadNumber; i++) {
Consumer consumer = new Consumer(fifo, this);
service.execute(consumer);
}
// Fixed 200 s wait for the consumers to drain the queue (see the shutdown sketch below the class).
Thread.sleep(200000);
// Shut down the executor
service.shutdown();
}
public void info(String gatewayId, String startTime, String endTime, IdxBizFanPointProcessVariableClassification idxBizFanPointProcessVariableClassification){
logger.info("异步执行,gatewayId:{},分析Id:{}", gatewayId, idxBizFanPointProcessVariableClassification.getSequenceNbr());
List<IdxBizFanPointVarCorrelation> gongkuangList = idxBizFanPointVarCorrelationMapper.selectList(new QueryWrapper<IdxBizFanPointVarCorrelation>().eq("ANALYSIS_GATEWAY_ID", idxBizFanPointProcessVariableClassification.getGatewayId()).eq("ANALYSIS_POINT_ID", idxBizFanPointProcessVariableClassification.getSequenceNbr()));
if (gongkuangList.size() > 0) {
foreachHandlerConditionVariabAnalyseFanNew(gatewayId, gongkuangList, startTime, endTime, idxBizFanPointProcessVariableClassification);
}
}
// Iterate and assemble data - fan
public void foreachHandlerConditionVariabAnalyseFanNew(String tableName, List<IdxBizFanPointVarCorrelation> list, String startTime, String endTime, IdxBizFanPointProcessVariableClassification idxBizFanPointProcessVariableClassification) {
List<IndicatorData> returnList = indicatorDataMapper.selectDataByAddressAndtime(idxBizFanPointProcessVariableClassification.getIndexAddress(), startTime, endTime,tableName);
List<Double> data1 = returnList.stream().map(t -> Double.parseDouble(t.getValue())).collect(Collectors.toList());
list.forEach(idxBizFanPointVarCorrelation -> {
logger.info("---------------------------------风机相关性-----------开始查询influxdb--------------------------------");
List<IndicatorData> returnList1 = indicatorDataMapper.selectDataByAddressAndtime(idxBizFanPointVarCorrelation.getProcessIndexAddress(), startTime, endTime,tableName);
List<Double> data2 = returnList1.stream().map(t -> Double.parseDouble(t.getValue())).collect(Collectors.toList());
HashMap<String, Object> map1 = new HashMap<>();
map1.put("data1", data1);
map1.put("data2", data2);
if (!map1.isEmpty()) {
logger.info("------------------------------风机相关性------------分析变量与工况变量相关性分析算法开始----------------------------------------");
logger.info("------------------------------风机相关性------------分析变量与工况变量相关性分析算参数---------------------------------------" + JSON.toJSONString(map1));
String response = HttpUtil.createPost(baseUrlXGX).body(JSON.toJSONString(map1)).execute().body();
if (response.contains("correlation")) {
JSONObject jsonObject = JSONObject.parseObject(response);
idxBizFanPointVarCorrelation.setCorrelationCoefficient(jsonObject.getDoubleValue("correlation"));
logger.info("------------------------------------------风机相关性::相关性更新业务表成功----------------------------------------");
} else {
idxBizFanPointVarCorrelation.setCorrelationCoefficient(0.0);
}
idxBizFanPointVarCorrelation.setRecDate(new Date());
idxBizFanPointVarCorrelationMapper.updateById(idxBizFanPointVarCorrelation);
logger.info("----------------------------风机相关性--------------分析变量与工况变量相关性分析算法结束----------------------------------------");
} else {
idxBizFanPointVarCorrelation.setCorrelationCoefficient(0.0);
idxBizFanPointVarCorrelation.setRecDate(new Date());
idxBizFanPointVarCorrelationMapper.updateById(idxBizFanPointVarCorrelation);
}
});
}
// Correlation analysis - fan entry point
public void getFanConditionVariablesByTimeAnalyseNew(String startTime, String endTime) {
try {
HashMap<String, List<IdxBizFanPointProcessVariableClassification>> idxBizFanPointProcessVariableClassificationHashMap = getIdxBizFanPointProcessVariableClassificationListOfAnaLyse();
idxBizFanPointProcessVariableClassificationHashMap.keySet().forEach(s -> {
List<IdxBizFanPointProcessVariableClassification> list = idxBizFanPointProcessVariableClassificationHashMap.get(s);
fifo.addAll(list);
});
cacheExecInfo.remove(SmartAnalyseEnum.FAN_XGX.getKey());
} catch (Exception exception) {
// NOTE: the catch branch clears the PV key although this is the fan pipeline;
// this looks like a copy-paste slip (FAN_XGX is the key used above).
cacheExecInfo.remove(SmartAnalyseEnum.PV_XGX.getKey());
}
}
/**
 * Correlation analysis - PV methods below
 */
private static final BlockingQueue<IdxBizPvPointProcessVariableClassification> fifoPv = new LinkedBlockingQueue<>(5000);
// PV entry point: fill the queue, then drain it with a thread pool.
public void chuliPv(String startTime, String endTime) throws InterruptedException {
this.getPvConditionVariablesByTimeAnalyseNew();
// Build the consumer pool via Executors
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < threadNumber; i++) {
ConsumerPv consumer = new ConsumerPv(fifoPv, this, startTime, endTime);
service.execute(consumer);
}
// Fixed 200 s wait for the consumers to drain the queue (see the shutdown sketch below the class).
Thread.sleep(200000);
// Shut down the executor
service.shutdown();
}
public void infoPv(String gatewayId, String startTime, String endTime, IdxBizPvPointProcessVariableClassification idxBizPvPointProcessVariableClassification){
logger.info("异步执行,gatewayId:{},分析Id:{}", gatewayId, idxBizPvPointProcessVariableClassification.getSequenceNbr());
List<IdxBizPvPointVarCorrelation> gongkuangList = idxBizPvPointVarCorrelationMapper.selectList(new QueryWrapper<IdxBizPvPointVarCorrelation>().eq("ANALYSIS_GATEWAY_ID", idxBizPvPointProcessVariableClassification.getGatewayId()).eq("ANALYSIS_POINT_ID", idxBizPvPointProcessVariableClassification.getSequenceNbr()));
if (gongkuangList.size() > 0) {
foreachHandlerConditionVariabAnalysePvNew(gatewayId, gongkuangList, startTime, endTime, idxBizPvPointProcessVariableClassification);
}
}
// Iterate and assemble data - PV
public void foreachHandlerConditionVariabAnalysePvNew(String tableName, List<IdxBizPvPointVarCorrelation> list, String startTime, String endTime, IdxBizPvPointProcessVariableClassification idxBizPvPointProcessVariableClassification) {
List<IndicatorData> returnList = indicatorDataMapper.selectDataByAddressAndtime(idxBizPvPointProcessVariableClassification.getIndexAddress(), startTime, endTime,tableName);
List<Double> data1 = returnList.stream().map(t -> Double.parseDouble(t.getValue())).collect(Collectors.toList());
list.forEach(idxBizPvPointVarCorrelation -> {
logger.info("---------------------------------风机相关性-----------开始查询influxdb--------------------------------");
List<IndicatorData> returnList1 = indicatorDataMapper.selectDataByAddressAndtime(idxBizPvPointVarCorrelation.getProcessIndexAddress(), startTime, endTime,tableName);
List<Double> data2 = returnList1.stream().map(t -> Double.parseDouble(t.getValue())).collect(Collectors.toList());
HashMap<String, Object> map1 = new HashMap<>();
map1.put("data1", data1);
map1.put("data2", data2);
if (!map1.isEmpty()) {
logger.info("------------------------------风机相关性------------分析变量与工况变量相关性分析算法开始----------------------------------------");
logger.info("------------------------------风机相关性------------分析变量与工况变量相关性分析算参数---------------------------------------" + JSON.toJSONString(map1));
String response = HttpUtil.createPost(baseUrlXGX).body(JSON.toJSONString(map1)).execute().body();
if (response.contains("correlation")) {
JSONObject jsonObject = JSONObject.parseObject(response);
idxBizPvPointVarCorrelation.setCorrelationCoefficient(jsonObject.getDoubleValue("correlation"));
logger.info("------------------------------------------风机相关性::相关性更新业务表成功----------------------------------------");
} else {
idxBizPvPointVarCorrelation.setCorrelationCoefficient(0.0);
}
idxBizPvPointVarCorrelation.setRecDate(new Date());
idxBizPvPointVarCorrelationMapper.updateById(idxBizPvPointVarCorrelation);
logger.info("----------------------------风机相关性--------------分析变量与工况变量相关性分析算法结束----------------------------------------");
} else {
idxBizPvPointVarCorrelation.setCorrelationCoefficient(0.0);
idxBizPvPointVarCorrelation.setRecDate(new Date());
idxBizPvPointVarCorrelationMapper.updateById(idxBizPvPointVarCorrelation);
}
});
}
// Correlation analysis - PV entry point
public void getPvConditionVariablesByTimeAnalyseNew() {
try {
// NOTE: this duplicates the fan method above verbatim - it loads the *fan*
// classification list and fills the fan queue (fifo), so fifoPv, which
// ConsumerPv polls, is never populated here; apparently a copy-paste slip.
HashMap<String, List<IdxBizFanPointProcessVariableClassification>> idxBizFanPointProcessVariableClassificationHashMap = getIdxBizFanPointProcessVariableClassificationListOfAnaLyse();
idxBizFanPointProcessVariableClassificationHashMap.keySet().forEach(s -> {
List<IdxBizFanPointProcessVariableClassification> list = idxBizFanPointProcessVariableClassificationHashMap.get(s);
fifo.addAll(list);
});
cacheExecInfo.remove(SmartAnalyseEnum.FAN_XGX.getKey());
} catch (Exception exception) {
cacheExecInfo.remove(SmartAnalyseEnum.PV_XGX.getKey());
}
}
}
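chuli() and chuliPv() both park the calling thread for a fixed 200 s before shutting the pool down, regardless of how quickly the queue drains. Since the consumers exit on their own once the queue stays empty (see the 2 s poll timeout in the consumer classes below), an orderly shutdown with the same upper bound is possible; a sketch:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Returns as soon as all consumer threads finish, with the same 200 s cap
// that the fixed Thread.sleep(200000) imposes today.
static void drainAndShutdown(ExecutorService service) throws InterruptedException {
    service.shutdown();                        // stop accepting new tasks
    if (!service.awaitTermination(200, TimeUnit.SECONDS)) {
        service.shutdownNow();                 // cap reached: interrupt stragglers
    }
}
```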
package com.yeejoin.amos.boot.module.jxiop.biz.service.impl;
import cn.hutool.core.date.DateUtil;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanPointProcessVariableClassification;
import org.apache.commons.lang3.ObjectUtils;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Consumer thread
*
* @author jackyuj
*/
public class Consumer implements Runnable {
private final BlockingQueue<IdxBizFanPointProcessVariableClassification> queue;
private final CommonServiceImpl commonService;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public final String startTime = DateUtils.convertDateToString(DateUtil.beginOfYear(new Date()), DateUtils.DATE_TIME_PATTERN);
public final String endTime = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()), DateUtils.DATE_TIME_PATTERN);
// Constructor: this class is instantiated with new, so field injection would
// never run here; dependencies arrive through the constructor instead.
public Consumer(BlockingQueue<IdxBizFanPointProcessVariableClassification> queue, CommonServiceImpl commonService) {
this.queue = queue;
this.commonService = commonService;
}
public void run() {
Random r = new Random();
boolean isRunning = true;
try {
while (isRunning) {
// Wait up to 2s for the next item; a null return means the producers are done.
IdxBizFanPointProcessVariableClassification poll = queue.poll(2, TimeUnit.SECONDS);
if (ObjectUtils.isNotEmpty(poll)) {
commonService.info(poll.getGatewayId(), startTime, endTime, poll);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
} else {
// No data within 2s: assume all producer threads have exited and stop this consumer.
isRunning = false;
}
}
} catch (Exception e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消费者线程!");
}
}
}
\ No newline at end of file
package com.yeejoin.amos.boot.module.jxiop.biz.service.impl;
import cn.hutool.core.date.DateUtil;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizPvPointProcessVariableClassification;
import org.apache.commons.lang3.ObjectUtils;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Consumer thread
*
* @author jackyuj
*/
public class ConsumerPv implements Runnable {
private final BlockingQueue<IdxBizPvPointProcessVariableClassification> queue;
private final CommonServiceImpl commonService;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
String startTime1 = DateUtils.convertDateToString(DateUtil.beginOfYear(new Date()), DateUtils.DATE_TIME_PATTERN);
String endTime1 = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()), DateUtils.DATE_TIME_PATTERN);
// Constructor: instantiated with new, so dependencies arrive through it. The
// start/end arguments always overwrite the defaults initialized above.
public ConsumerPv(BlockingQueue<IdxBizPvPointProcessVariableClassification> queue, CommonServiceImpl commonService, String startTime, String endTime) {
this.queue = queue;
this.commonService = commonService;
this.startTime1 = startTime;
this.endTime1 = endTime;
}
public void run() {
Random r = new Random();
boolean isRunning = true;
try {
while (isRunning) {
// Wait up to 2s for the next item; a null return means the producers are done.
IdxBizPvPointProcessVariableClassification poll = queue.poll(2, TimeUnit.SECONDS);
if (ObjectUtils.isNotEmpty(poll)) {
commonService.infoPv(poll.getGatewayId(), startTime1, endTime1, poll);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
} else {
// No data within 2s: assume all producer threads have exited and stop this consumer.
isRunning = false;
}
}
} catch (Exception e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消费者线程!");
}
}
}
\ No newline at end of file
...@@ -18,4 +18,9 @@ public interface IndicatorDataMapper extends BaseMapper<IndicatorData> {
@Select("select `unit` from iot_data.indicator_data where address=#{address} and gateway_id =#{gatewayId} limit 1")
IndicatorData selectUnitByAddressAndGatewayId(@Param("address")String address, @Param("gatewayId")String gatewayId);
@Select("select `value`, created_time as createdTime, `value_f` as valueF, `address`, gateway_id as gatewayId from iot_data.indicator_data where created_time >= #{startTime} and created_time <= #{endTime} and ((gateway_id =#{gatewayId} and address=#{address}) or (gateway_id =#{gatewayId1} and address=#{address1})) ")
List<IndicatorData> selectDataByAddressAndtimeNew(@Param("address")String address,@Param("startTime") String startTime, @Param("endTime")String endTime, @Param("gatewayId")String gatewayId, @Param("gatewayId1")String gatewayId1, @Param("address1")String address1);
}
## DB properties:
## db1-production database
spring.db1.datasource.type: com.alibaba.druid.pool.DruidDataSource
spring.db1.datasource.url=jdbc:mysql://139.9.173.44:3306/production?allowMultiQueries=true&serverTimezone=GMT%2B8&characterEncoding=utf8
spring.db1.datasource.username=root
spring.db1.datasource.password=Yeejoin@2020
spring.db1.datasource.driver-class-name: com.mysql.cj.jdbc.Driver
## db2-sync_data
spring.db2.datasource.type: com.alibaba.druid.pool.DruidDataSource
spring.db2.datasource.url=jdbc:mysql://139.9.173.44:3306/amos_idx_biz?allowMultiQueries=true&serverTimezone=GMT%2B8&characterEncoding=utf8
spring.db2.datasource.username=root
spring.db2.datasource.password=Yeejoin@2020
spring.db2.datasource.driver-class-name: com.mysql.cj.jdbc.Driver
...@@ -53,10 +53,10 @@ lettuce.timeout=10000
emqx.clean-session=true
emqx.client-id=${spring.application.name}-${random.int[1024,65536]}
emqx.broker=tcp://172.16.10.220:1883
emqx.user-name=admin
emqx.password=public
mqtt.scene.host=mqtt://172.16.10.220:8083/mqtt
mqtt.client.product.id=mqtt
mqtt.topic=topic_mqtt
spring.mqtt.completionTimeout=3000
...@@ -96,7 +96,7 @@ spring.db3.datasource.driver-class-name: com.taosdata.jdbc.rs.RestfulDriver
#spring.influx.bufferLimit=20000
spring.influx.url=http://139.9.173.44:8086
spring.influx.password=Yeejoin@2020
spring.influx.user=root
spring.influx.database=iot_platform
...@@ -126,7 +126,7 @@ amos.secret.key=qaz
#eureka.instance.ip-address=172.16.3.122
spring.activemq.broker-url=tcp://172.16.10.220:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.jms.pub-sub-domain=false
...@@ -136,3 +136,33 @@ myqueue=amos.privilege.v1.JXIOP.AQSC_FDGL.userBusiness
fan.statuts.stattuspath=upload/jxiop/device_status
pictureUrl=upload/jxiop/syz/
#kafka
spring.kafka.bootstrap-servers=139.9.173.44:9092
spring.kafka.producer.retries=1
spring.kafka.producer.bootstrap-servers=139.9.173.44:9092
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=consumerGroup
spring.kafka.consumer.bootstrap-servers=139.9.173.44:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.type=batch
spring.kafka.consumer.max-poll-records=100
spring.kafka.consumer.fetch-max-wait=10000
# Number of months to offset back from the current time (history look-back window)
last.month.num=12
...@@ -75,10 +75,10 @@ station.section=10
gl.sum.column=日发电量,月发电量,年发电量
gl.avg.column=有功功率,日利用小时,瞬时风速
spring.elasticsearch.rest.uris=http://139.9.173.44:9200
spring.elasticsearch.rest.connection-timeout=30000
spring.elasticsearch.rest.username=elastic
spring.elasticsearch.rest.password=Yeejoin@2020
spring.elasticsearch.rest.read-timeout=30000
healthValue_Warn=39
...