Commit a1dacfae authored by caotao's avatar caotao

1.健康指数计算方式由原来定时计算修改为消息通知后业务计算。(进行中)

parent f2aacfb2
......@@ -3,10 +3,12 @@ package com.yeejoin.amos;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure;
import com.yeejoin.amos.boot.biz.common.utils.oConvertUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.listener.SyncESDataToTdengineMqttListener;
import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
......@@ -18,11 +20,13 @@ import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.typroject.tyboot.component.emq.EmqKeeper;
import java.net.InetAddress;
......@@ -50,6 +54,11 @@ import java.net.InetAddress;
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, DruidDataSourceAutoConfigure.class})
//@SpringBootApplication
public class AmosJxiopAnalyseApplication {
// @Autowired
// private EmqKeeper emqKeeper;
//
// @Autowired
// private SyncESDataToTdengineMqttListener syncESDataToTdengineMqttListener;
private static final Logger logger = LoggerFactory.getLogger(AmosJxiopAnalyseApplication.class);
......@@ -64,4 +73,9 @@ public class AmosJxiopAnalyseApplication {
+ "Application Amos-Biz-Boot-Jxiop-Montior is running! Access URLs:\n\t" + "Swagger文档: \thttp://" + ip + ":" + port
+ path + "/doc.html\n" + "----------------------------------------------------------");
}
// @Bean
// public void initMqtt() throws Exception {
// emqKeeper.subscript("sync_esdata_to_tdengine_notice", 2,syncESDataToTdengineMqttListener );
// }
}
package com.yeejoin.amos.boot.module.jxiop.biz.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.CommonServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqxListener;
/**
* @author Administrator
*/
@Component
@Slf4j
public class SyncESDataToTdengineMqttListener extends EmqxListener {
@Autowired
CommonServiceImpl commonServiceImpl;
@Override
public void processMessage(String topic, MqttMessage mqttMessage) {
log.info(topic + "收到数据同步成功,开始计算健康指数!");
byte[] payload = mqttMessage.getPayload();
String str = new String(payload);
String msg = JSON.parse(str).toString();
JSONObject jsonObject = JSONObject.parseObject(msg);
String flag = jsonObject.get("sync_flag").toString();
if ("success".equals(flag)){
//开始异步计算光伏的健康指数算法
new Thread(()->{
//调用光伏的健康指数算法
commonServiceImpl.healthWarningMinuteByPv();
}).start();
//开始异步计算风机的健康指数算法
new Thread(()->{
//调用风机的健康指数算法
commonServiceImpl.healthWarningMinuteByFan();
}).start();
}
}
}
......@@ -1656,7 +1656,8 @@ public class CommonServiceImpl {
if (analysisVariableIdArray.get(i).toString().equals(obj.getSequenceNbr())){
IdxBizFanHealthIndex idxBizFanHealthIndex = new IdxBizFanHealthIndex();
BeanUtils.copyProperties(obj, idxBizFanHealthIndex,"sequenceNbr");
idxBizFanHealthIndex.setHealthIndex(indexValueArray.getDoubleValue(i)< 0 ? Math.abs(indexValueArray.getDoubleValue(i)) : indexValueArray.getDoubleValue(i));
//2023年10月30日10点05分 移除原来的判断 无论是否小于0都取绝对值
idxBizFanHealthIndex.setHealthIndex( Math.abs(indexValueArray.getDoubleValue(i)));
idxBizFanHealthIndex.setAnalysisObjSeq(obj.getSequenceNbr());
idxBizFanHealthIndex.setRecDate(time);
// idxBizFanHealthIndex.setSequenceNbr(null);
......@@ -1716,7 +1717,7 @@ public class CommonServiceImpl {
for (String s : maps.keySet()) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
List<String> address = maps.get(s).stream().map(IdxBizPvPointProcessVariableClassificationDto::getIndexAddress).collect(Collectors.toList());
boolQueryBuilder.must(QueryBuilders.termsQuery("address.keyword", address)).must(QueryBuilders.matchQuery("gatewayId.keyword", s));
boolQueryBuilder.must(QueryBuilders.termsQuery("address.keyword", address)).must(QueryBuilders.termsQuery("gatewayId.keyword", s));
should.add(boolQueryBuilder);
}
// 创建查询构造器
......@@ -1868,7 +1869,8 @@ public class CommonServiceImpl {
IdxBizPvHealthIndex idxBizPvHealthIndex = new IdxBizPvHealthIndex();
BeanUtils.copyProperties(obj, idxBizPvHealthIndex, "sequenceNbr");
// idxBizPvHealthIndex.setSequenceNbr(null);
idxBizPvHealthIndex.setHealthIndex(indexValueArray.getDoubleValue(i)< 0 ? Math.abs(indexValueArray.getDoubleValue(i)) : indexValueArray.getDoubleValue(i));
//2023年10月30日10点05分 移除原来的判断 无论是否小于0都取绝对值
idxBizPvHealthIndex.setHealthIndex(Math.abs(indexValueArray.getDoubleValue(i)));
idxBizPvHealthIndex.setAnalysisObjSeq(obj.getSequenceNbr());
idxBizPvHealthIndex.setRecDate(time);
idxBizPvHealthIndex.setWeigth(1.0);
......
......@@ -47,5 +47,7 @@ public interface IndicatorDataMapper extends BaseMapper<IndicatorData> {
@Select("select `value`, created_time from iot_data.indicator_data where id =#{id} and ts >= #{startTime} and ts <= #{endTime} ")
List<IndicatorData> selectDataByequipmentIndexNameAndtimeAndEquipmentNumber(@Param("id") String id, @Param("startTime") String startTime, @Param("endTime") String endTime);
@Select("select `value`,`value_f`, address, gateway_id from iot_data.indicator_data where ts >=NOW()-10m and gateway_id = #{gatewayId} and address in(#{addressIds})")
List<IndicatorData> selectDataByGatewayIdAndAddress(@Param("gatewayId") String gatewayId,@Param("addressIds") String addressIds);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment