Commit fff556ba authored by hezhuozhi's avatar hezhuozhi

Merge branches 'developer' and 'developer' of…

Merge branches 'developer' and 'developer' of http://36.40.66.175:5000/moa/jxdj_zx/amos-boot-zx-biz into developer
parents bed542c8 94d18836
......@@ -30,10 +30,8 @@ import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.typroject.tyboot.component.emq.EmqKeeper;
import java.net.InetAddress;
/**
* <pre>
* 智信能源科技服务启动类
......@@ -51,17 +49,19 @@ import java.net.InetAddress;
@EnableScheduling
@MapperScan({ "org.typroject.tyboot.demo.face.orm.dao*", "org.typroject.tyboot.face.*.orm.dao*",
"org.typroject.tyboot.core.auth.face.orm.dao*", "org.typroject.tyboot.component.*.face.orm.dao*",
"com.yeejoin.amos.boot.module.**.api.mapper", "com.yeejoin.amos.boot.biz.common.dao.mapper","com.yeejoin.amos.boot.module.common.biz.*","com.yeejoin.amos.boot.module.jxiop.api.mapper","com.yeejoin.amos.boot.module.jxiop.biz.tdmapper","com.yeejoin.amos.boot.module.jxiop.biz.mapper5" })
"com.yeejoin.amos.boot.module.**.api.mapper", "com.yeejoin.amos.boot.biz.common.dao.mapper",
"com.yeejoin.amos.boot.module.common.biz.*", "com.yeejoin.amos.boot.module.jxiop.api.mapper",
"com.yeejoin.amos.boot.module.jxiop.biz.tdmapper", "com.yeejoin.amos.boot.module.jxiop.biz.mapper5" })
@ComponentScan(basePackages = { "org.typroject", "com.yeejoin.amos" })
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, DruidDataSourceAutoConfigure.class})
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class, DruidDataSourceAutoConfigure.class })
//@SpringBootApplication
public class AmosJxiopAnalyseApplication {
@Autowired
private EmqKeeper emqKeeper;
//本地是否执行健康指数算法开关
// 本地是否执行健康指数算法开关
@Value("${openHealth:false}")
Boolean openHealth;
@Autowired
@Autowired
private SyncESDataToTdengineMqttListener syncESDataToTdengineMqttListener;
@Autowired
private SyncDasSuccessMqttListener syncDasSuccessMqttListener;
......@@ -76,17 +76,18 @@ public class AmosJxiopAnalyseApplication {
String path = oConvertUtils.getString(env.getProperty("server.servlet.context-path"));
logger.info("\n----------------------------------------------------------\n\t"
+ "Application Amos-Biz-Boot-Jxiop-Montior is running! Access URLs:\n\t" + "Swagger文档: \thttp://" + ip + ":" + port
+ path + "/doc.html\n" + "----------------------------------------------------------");
+ "Application Amos-Biz-Boot-Jxiop-Montior is running! Access URLs:\n\t" + "Swagger文档: \thttp://" + ip
+ ":" + port + path + "/doc.html\n" + "----------------------------------------------------------");
}
@Bean
public void initMqtt() throws Exception {
if (openHealth) {
//订阅固化周期性数据成功的消息
emqKeeper.subscript("sync_esdata_to_tdengine_notice", 1, syncESDataToTdengineMqttListener);
//订阅业务固化同步数据成功消息
emqKeeper.subscript("sync_iotdata_to_tdengine_notice", 1, syncDasSuccessMqttListener);
if (!openHealth) {
return;
}
// 订阅固化周期性数据成功的消息
emqKeeper.subscript("sync_esdata_to_tdengine_notice", 1, syncESDataToTdengineMqttListener);
// 订阅业务固化同步数据成功消息
emqKeeper.subscript("sync_iotdata_to_tdengine_notice", 1, syncDasSuccessMqttListener);
}
}
package com.yeejoin.amos.boot.module.jxiop.biz.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.Date;
/**
*
*
* @author system_generator
* @date 2023-08-15
*/
@Data
@Accessors(chain = true)
@TableName("fan_health_index_latest_data")
public class IdxBizFanHealthIndexLatest {
private static final long serialVersionUID = 1L;
/**
*
*/
@TableId(value = "SEQUENCE_NBR", type = IdType.ID_WORKER_STR)
private String sequenceNbr;
/**
*
*/
@TableField("REC_DATE")
private Date recDate;
/**
* 分析维度类型
*/
@TableField("ANALYSIS_OBJ_TYPE")
private String analysisObjType;
/**
* 分析维度seq
*/
@TableField("ANALYSIS_OBJ_SEQ")
private String analysisObjSeq;
/**
*
*/
@TableField("WEIGTH")
private Double weigth;
/**
*
*/
@TableField("HEALTH_INDEX")
private Double healthIndex;
/**
* 健康等级
*/
@TableField("HEALTH_LEVEL")
private String healthLevel;
/**
*
*/
@TableField("ANALYSIS_TYPE")
private String analysisType;
/**
*
*/
@TableField("ANALYSIS_START_TIME")
private Date analysisStartTime;
/**
*
*/
@TableField("ANALYSIS_END_TIME")
private Date analysisEndTime;
/**
* 片区
*/
@TableField("ARAE")
private String arae;
/**
* 场站
*/
@TableField("STATION")
private String station;
/**
* 子系统
*/
@TableField("SUB_SYSTEM")
private String subSystem;
/**
* 型号
*/
@TableField("NUMBER")
private String number;
/**
* 设备名称
*/
@TableField("EQUIPMENT_NAME")
private String equipmentName;
/**
*
*/
@TableField("GATEWAY_ID")
private String gatewayId;
/**
* 点表地址
*/
@TableField("INDEX_ADDRESS")
private String indexAddress;
@TableField("ANOMALY")
private Double ANOMALY;
/**
* 分析变量名称
*/
@TableField("POINT_NAME")
private String pointName;
@TableField("ANALYSIS_TIME")
private String ANALYSISTIME;
/**
* KKS码
*/
@TableField("KKS")
private String kks;
}
package com.yeejoin.amos.boot.module.jxiop.biz.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.yeejoin.amos.boot.biz.common.entity.BaseEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import java.time.LocalDateTime;
import java.util.Date;
/**
*
*
* @author system_generator
* @date 2023-08-15
*/
@Data
@Accessors(chain = true)
@TableName("pv_health_index_latest_data")
public class IdxBizPvHealthIndexLatest{
private static final long serialVersionUID = 1L;
/**
*
*/
@TableId(value = "SEQUENCE_NBR", type = IdType.ID_WORKER_STR)
private String sequenceNbr;
/**
*
*/
@TableField("REC_DATE")
private Date recDate;
/**
* 分析维度类型
*/
@TableField("ANALYSIS_OBJ_TYPE")
private String analysisObjType;
/**
* 分析维度seq
*/
@TableField("ANALYSIS_OBJ_SEQ")
private String analysisObjSeq;
/**
*
*/
@TableField("WEIGTH")
private Double weigth;
/**
*
*/
@TableField("HEALTH_INDEX")
private Double healthIndex;
/**
* 健康等级
*/
@TableField("HEALTH_LEVEL")
private String healthLevel;
/**
* 分析周期
*/
@TableField("ANALYSIS_TYPE")
private String analysisType;
/**
*
*/
@TableField("ANALYSIS_START_TIME")
private Date analysisStartTime;
/**
*
*/
@TableField("ANALYSIS_END_TIME")
private Date analysisEndTime;
/**
* 片区
*/
@TableField("ARAE")
private String arae;
/**
* 场站
*/
@TableField("STATION")
private String station;
/**
* 子阵
*/
@TableField("SUBARRAY")
private String subarray;
/**
* 厂商
*/
@TableField("MANUFACTURER")
private String manufacturer;
/**
* 设备类型
*/
@TableField("DEVICE_TYPE")
private String deviceType;
/**
* 网关ID
*/
@TableField("GATEWAY_ID")
private String gatewayId;
/**
* 点表地址
*/
@TableField("INDEX_ADDRESS")
private String indexAddress;
/**
* 设备名称
*/
@TableField("EQUIPMENT_NAME")
private String equipmentName;
@TableField("ANOMALY")
private Double ANOMALY;
/**
* 分析变量名称
*/
@TableField("POINT_NAME")
private String pointName;
@TableField("ANALYSIS_TIME")
private String ANALYSISTIME;
/**
* KKS码
*/
@TableField("KKS")
private String kks;
}
......@@ -2,9 +2,14 @@ package com.yeejoin.amos.boot.module.jxiop.biz.listener;
import java.text.ParseException;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -13,6 +18,7 @@ import org.typroject.tyboot.component.emq.EmqxListener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.CommonServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.TdengineTimeServiceImpl;
......@@ -28,12 +34,24 @@ public class SyncESDataToTdengineMqttListener extends EmqxListener {
@Autowired
private CommonServiceImpl commonServiceImpl;
@Autowired
private TdengineTimeServiceImpl tdengineTimeService;
@Autowired
private TdengineTimeServiceImpl tdengineTimeService;
@Autowired
private RedisUtils redisUtils;
private final static String JXIOP_ANALYSE_TIME = "JXIOP_ANALYSE_TIME";
@PostConstruct
public void test() {
// 每次启动清空redis
//redisUtils.set(JXIOP_ANALYSE_TIME, "2024-07-30 13:40:00");
redisUtils.del(JXIOP_ANALYSE_TIME);
}
@Override
public void processMessage(String topic, MqttMessage mqttMessage) throws InterruptedException, ParseException {
public void processMessage(String topic, MqttMessage mqttMessage) throws ParseException {
log.info(topic + "收到数据同步成功,开始计算健康指数!");
byte[] payload = mqttMessage.getPayload();
String str = new String(payload);
......@@ -41,31 +59,77 @@ public class SyncESDataToTdengineMqttListener extends EmqxListener {
JSONObject jsonObject = JSONObject.parseObject(msg);
String flag = jsonObject.get("sync_flag").toString();
if ("success".equals(flag)) {
Date time = new Date();
time = DateUtil.offsetMinute(time, -DateUtil.minute(time) % 10);
String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
time = DateUtil.parse(format, "yyyy-MM-dd HH:mm:00");
ExecutorService excutorService = Executors.newFixedThreadPool(4);
int taskCount = 2;
final CountDownLatch latch = new CountDownLatch(taskCount);
excutorService.submit(()->{
commonServiceImpl.healthWarningMinuteByFan();
latch.countDown();
});
excutorService.submit(()->{
commonServiceImpl.healthWarningMinuteByPv();
latch.countDown();
});
System.out.println("等待所有任务完成..");
latch.await();
System.out.println("所有任务完成");
if (redisUtils.get(JXIOP_ANALYSE_TIME) != null) {
// 如果相差20分钟 按10分钟处理 可能装备发消息晚了
String lastformat = String.valueOf(redisUtils.get(JXIOP_ANALYSE_TIME));
Date oldTime = DateUtil.parse(lastformat, "yyyy-MM-dd HH:mm:00");
long diffInMillies = Math.abs(time.getTime() - oldTime.getTime());
long diffInMinutes = TimeUnit.MINUTES.convert(diffInMillies, TimeUnit.MILLISECONDS);
if (diffInMinutes == 20) {
time = DateUtil.offsetMinute(time, -10);
format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
}
}
redisUtils.set(JXIOP_ANALYSE_TIME, format);
System.out.println(format);
final Date timeF = time;
//区域 全域最后统一生成
tdengineTimeService.insertMomentDataAll(format);
CompletableFuture<String> fan=CompletableFuture.supplyAsync(()->{
commonServiceImpl.healthWarningMinuteByFan(timeF);
String fanResult = "风电任务完成..";
System.out.println(fanResult);
return fanResult;
});
CompletableFuture<String> pv=CompletableFuture.supplyAsync(()->{
commonServiceImpl.healthWarningMinuteByPv(timeF);
String pvResult = "光伏任务完成..";
System.out.println(pvResult);
return pvResult;
});
try {
String fanResult = fan.get();
String pvResult = pv.get();
// 区域 全域最后统一生成
tdengineTimeService.insertMomentDataAll(format);
} catch (InterruptedException | ExecutionException e) {
System.out.println("任务执行异常");
e.printStackTrace();
}
//
// ExecutorService excutorService = Executors.newFixedThreadPool(10);
// int taskCount = 2;
// final CountDownLatch latch = new CountDownLatch(taskCount);
// excutorService.submit(() -> {
// commonServiceImpl.healthWarningMinuteByFan(timeF);
// System.out.println("风电任务完成..");
// latch.countDown();
// });
// excutorService.submit(() -> {
// commonServiceImpl.healthWarningMinuteByPv(timeF);
// System.out.println("光伏任务完成..");
// latch.countDown();
// });
//
// try {
// System.out.println("等待所有任务完成..");
// latch.await();
// System.out.println("所有任务完成");
// } catch (InterruptedException e) {
// System.out.println("任务执行异常");
// e.printStackTrace();
// Thread.currentThread().interrupt();
// }
// // 结束线程池
// excutorService.shutdown();
// 区域 全域最后统一生成
// tdengineTimeService.insertMomentDataAll(format);
// // 开始异步计算光伏的健康指数算法
// new Thread(() -> {
// // 调用光伏的健康指数算法
......@@ -77,19 +141,19 @@ public class SyncESDataToTdengineMqttListener extends EmqxListener {
// commonServiceImpl.healthWarningMinuteByFan();
// }).start();
}
if ("pvsuccess".equals(flag)) {
// 开始异步计算光伏的健康指数算法
new Thread(() -> {
// 调用光伏的健康指数算法
commonServiceImpl.healthWarningMinuteByPv();
}).start();
}
if ("fansuccess".equals(flag)) {
// 开始异步计算光伏的健康指数算法
new Thread(() -> {
// 调用光伏的健康指数算法
commonServiceImpl.healthWarningMinuteByFan();
}).start();
}
// if ("pvsuccess".equals(flag)) {
// // 开始异步计算光伏的健康指数算法
// new Thread(() -> {
// // 调用光伏的健康指数算法
// commonServiceImpl.healthWarningMinuteByPv();
// }).start();
// }
// if ("fansuccess".equals(flag)) {
// // 开始异步计算光伏的健康指数算法
// new Thread(() -> {
// // 调用光伏的健康指数算法
// commonServiceImpl.healthWarningMinuteByFan();
// }).start();
// }
}
}
package com.yeejoin.amos.boot.module.jxiop.biz.mapper2;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanHealthIndexLatest;
/**
* Mapper 接口
*
* @author system_generator
* @date 2023-08-15
*/
public interface IdxBizFanHealthIndexLatestMapper extends BaseMapper<IdxBizFanHealthIndexLatest> {
}
package com.yeejoin.amos.boot.module.jxiop.biz.mapper2;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizPvHealthIndexLatest;
/**
* Mapper 接口
*
* @author system_generator
* @date 2023-08-15
*/
public interface IdxBizPvHealthIndexLatestMapper extends BaseMapper<IdxBizPvHealthIndexLatest> {
}
......@@ -1828,15 +1828,15 @@ public class CommonServiceImpl {
// @Scheduled(cron = "0 0/10 * * * ?")
// @Async("async")
public void healthWarningMinuteByFan() {
public void healthWarningMinuteByFan(Date time) {
if (!openHealth) {
return;
}
Date time = new Date();
time = DateUtil.offsetMinute(time, -DateUtil.minute(time) % 10);
// Date time = new Date();
// time = DateUtil.offsetMinute(time, -DateUtil.minute(time) % 10);
String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
time = DateUtil.parse(format, "yyyy-MM-dd HH:mm:00");
// time = DateUtil.parse(format, "yyyy-MM-dd HH:mm:00");
logger.info("风机---------------------健康指数时间----" + time);
// Calendar calendar = Calendar.getInstance();
List<IdxBizFanPointProcessVariableClassificationDto> data = idxBizFanPointProcessVariableClassificationMapper
......@@ -2455,15 +2455,15 @@ public class CommonServiceImpl {
// @Scheduled(cron = "0 0/10 * * * ?")
// @Async("async")
public void healthWarningMinuteByPv() {
public void healthWarningMinuteByPv(Date time) {
if (!openHealth) {
return;
}
// Calendar calendar = Calendar.getInstance();
Date time = new Date();
time = DateUtil.offsetMinute(time, -DateUtil.minute(time) % 10);
// Date time = new Date();
// time = DateUtil.offsetMinute(time, -DateUtil.minute(time) % 10);
String format = DateUtil.format(time, "yyyy-MM-dd HH:mm:00");
time = DateUtil.parse(format, "yyyy-MM-dd HH:mm:00");
//time = DateUtil.parse(format, "yyyy-MM-dd HH:mm:00");
logger.info("光伏---------------------健康指数时间----" + time);
List<IdxBizPvPointProcessVariableClassificationDto> data = idxBizPvPointProcessVariableClassificationMapper
.getInfluxDBData();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment