Commit 53a46a65 authored by caotao's avatar caotao

优化消息推送service 与influxdb 代码

parent 1ee21614
...@@ -23,6 +23,7 @@ import io.swagger.annotations.ApiOperation; ...@@ -23,6 +23,7 @@ import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.core.foundation.enumeration.UserType; import org.typroject.tyboot.core.foundation.enumeration.UserType;
...@@ -70,58 +71,60 @@ public class DemoController extends BaseController { ...@@ -70,58 +71,60 @@ public class DemoController extends BaseController {
public StationBasic demoTest() { public StationBasic demoTest() {
return stationBasicServiceimpl.getById(1660231556607774721L); return stationBasicServiceimpl.getById(1660231556607774721L);
} }
@TycloudOperation(needAuth = false, ApiLevel = UserType.AGENCY) @TycloudOperation(needAuth = false, ApiLevel = UserType.AGENCY)
@ApiOperation(value = "demo测试中间库") @ApiOperation(value = "demo测试中间库")
@GetMapping("/test1") @GetMapping("/test1")
public List<Test> demoTest1() { public List<Test> demoTest1() {
return testServiceimpl.getList(); return testServiceimpl.getList();
} }
@TycloudOperation(needAuth = false, ApiLevel = UserType.AGENCY) @TycloudOperation(needAuth = false, ApiLevel = UserType.AGENCY)
@ApiOperation(value = "demo测试influxdb") @ApiOperation(value = "demo测试influxdb")
@GetMapping("/test3") @GetMapping("/test3")
public void demoTest2() { public void demoTest2() {
String sql = "SELECT createdTime ,gatewayId,address,valueLabel FROM \"iot_data\" WHERE equipmentSpecificName=~/.*27风机.*/ and gatewayId='1668801435891929089' GROUP BY equipmentIndex ORDER BY time desc LIMIT 600"; String sql = "SELECT createdTime ,gatewayId,address,valueLabel FROM \"iot_data\" WHERE equipmentSpecificName=~/.*27风机.*/ and gatewayId='1668801435891929089' GROUP BY equipmentIndex ORDER BY time desc LIMIT 600";
List<IndexDto> list=influxDButils.getListData(sql,IndexDto.class); List<IndexDto> list = influxDButils.getListData(sql, IndexDto.class);
System.out.println(list.size()); System.out.println(list.size());
} }
@TycloudOperation(needAuth = false, ApiLevel = UserType.AGENCY) @TycloudOperation(needAuth = false, ApiLevel = UserType.AGENCY)
@ApiOperation(value = "测试获取风速平均值") @ApiOperation(value = "初始化风机")
@GetMapping("/test4") @GetMapping("/test4")
public void demoTest3() { public void demoTest3() {
// Object o=this.monitorFanIndicatorImpl.getIndicatoralueAvage("1668801435891929089","60秒平均风速"); // Object o=this.monitorFanIndicatorImpl.getIndicatoralueAvage("1668801435891929089","60秒平均风速");
// System.out.println(o.toString()); // System.out.println(o.toString());
QueryWrapper<MonitorFanIndicator> QueryWrapper= new QueryWrapper<>(); QueryWrapper<MonitorFanIndicator> QueryWrapper = new QueryWrapper<>();
QueryWrapper.eq("gateway","1668801435891929089"); QueryWrapper.eq("gateway", "1668801435891929089");
long DATE =new Date().getTime(); long DATE = new Date().getTime();
System.out.println(DATE); System.out.println(DATE);
List<MonitorFanIndicator> list=monitorFanIndicatorregionMapper.selectList(QueryWrapper); List<MonitorFanIndicator> list = monitorFanIndicatorregionMapper.selectList(QueryWrapper);
for (MonitorFanIndicator monitorFanIndicator : list) { for (MonitorFanIndicator monitorFanIndicator : list) {
Map<String, String> tag = new HashMap<>(); Map<String, String> tag = new HashMap<>();
Map<String, Object> maps2 = new HashMap<>(); Map<String, Object> maps2 = new HashMap<>();
tag.put("address",monitorFanIndicator.getIndexAddress() ); tag.put("address", monitorFanIndicator.getIndexAddress());
tag.put("dataType", monitorFanIndicator.getDataType()); tag.put("dataType", monitorFanIndicator.getDataType());
tag.put("equipmentSpecificName",monitorFanIndicator.getFanCode() ); tag.put("equipmentSpecificName", monitorFanIndicator.getFanCode());
tag.put("equipmentsIdx",monitorFanIndicator.getAddressGateway() ); tag.put("equipmentsIdx", monitorFanIndicator.getAddressGateway());
tag.put("gatewayId", monitorFanIndicator.getGateway()); tag.put("gatewayId", monitorFanIndicator.getGateway());
tag.put("isAlarm", monitorFanIndicator.getIsAlarm()); tag.put("isAlarm", monitorFanIndicator.getIsAlarm());
maps2.put("createdTime", "2023-07-05 18:30:26"); // maps2.put("createdTime", "2023-07-05 18:30:26");
maps2.put("unit", monitorFanIndicator.getUnit()); // maps2.put("unit", monitorFanIndicator.getUnit());
maps2.put("value", ""); // maps2.put("value", "");
maps2.put("valueLabel","" ); // maps2.put("valueLabel","" );
maps2.put("traceId", ""); // maps2.put("traceId", "");
maps2.put("equipmentIndexName", monitorFanIndicator.getIndicator()); // maps2.put("equipmentIndexName", monitorFanIndicator.getIndicator());
maps2.put("equipmentNumber", monitorFanIndicator.getEquipmentNumber()); // maps2.put("equipmentNumber", monitorFanIndicator.getEquipmentNumber());
maps2.put("frontModule", monitorFanIndicator.getFrontModule()); maps2.put("frontModule", monitorFanIndicator.getFrontModule());
maps2.put("systemType", monitorFanIndicator.getSystemType()); maps2.put("systemType", monitorFanIndicator.getSystemType());
influxDbConnection.insert("indicators_"+monitorFanIndicator.getGateway(), tag, maps2,1688558007051L, TimeUnit.MILLISECONDS); influxDbConnection.insert("indicators_" + monitorFanIndicator.getGateway(), tag, maps2, 1688558007051L, TimeUnit.MILLISECONDS);
} }
} }
@TycloudOperation(needAuth = false, ApiLevel = UserType.AGENCY) @TycloudOperation(needAuth = false, ApiLevel = UserType.AGENCY)
...@@ -129,8 +132,8 @@ public class DemoController extends BaseController { ...@@ -129,8 +132,8 @@ public class DemoController extends BaseController {
@GetMapping("/test5") @GetMapping("/test5")
public void demoTest4() { public void demoTest4() {
HashMap<String, String> hashMap = new HashMap<>(); HashMap<String, String> hashMap = new HashMap<>();
hashMap.put("test", String.valueOf(Math.random() *1000)); hashMap.put("test", String.valueOf(Math.random() * 1000));
hashMap.put("test1", String.valueOf(Math.random() *100)); hashMap.put("test1", String.valueOf(Math.random() * 100));
try { try {
emqKeeper.getMqttClient().publish("test_topic", JSON.toJSON(hashMap).toString().getBytes("UTF-8"), 1, true); emqKeeper.getMqttClient().publish("test_topic", JSON.toJSON(hashMap).toString().getBytes("UTF-8"), 1, true);
emqKeeper.getMqttClient().publish("test_topic1", JSON.toJSON(hashMap).toString().getBytes("UTF-8"), 1, true); emqKeeper.getMqttClient().publish("test_topic1", JSON.toJSON(hashMap).toString().getBytes("UTF-8"), 1, true);
...@@ -140,5 +143,57 @@ public class DemoController extends BaseController { ...@@ -140,5 +143,57 @@ public class DemoController extends BaseController {
} }
} }
@TycloudOperation(needAuth = false, ApiLevel = UserType.AGENCY)
@ApiOperation(value = "初始化升压站")
@GetMapping("/test6")
public void demoTest5() {
// Object o=this.monitorFanIndicatorImpl.getIndicatoralueAvage("1668801435891929089","60秒平均风速");
// System.out.println(o.toString());
QueryWrapper<MonitorFanIndicator> QueryWrapper = new QueryWrapper<>();
QueryWrapper.eq("gateway", "1668801570352926721");
long DATE = new Date().getTime();
System.out.println(DATE);
List<MonitorFanIndicator> list = monitorFanIndicatorregionMapper.selectList(QueryWrapper);
for (MonitorFanIndicator monitorFanIndicator : list) {
Map<String, String> tag = new HashMap<>();
Map<String, Object> maps2 = new HashMap<>();
tag.put("address", monitorFanIndicator.getIndexAddress());
tag.put("dataType", "");
if (!ObjectUtils.isEmpty(monitorFanIndicator.getDataType())) {
tag.put("dataType", monitorFanIndicator.getDataType());
}
tag.put("equipmentSpecificName", monitorFanIndicator.getFanCode());
tag.put("equipmentsIdx", monitorFanIndicator.getAddressGateway());
tag.put("gatewayId", monitorFanIndicator.getGateway());
tag.put("isAlarm", monitorFanIndicator.getIsAlarm());
maps2.put("createdTime", "2023-07-05 18:30:26");
maps2.put("unit", "");
maps2.put("value", "");
maps2.put("valueLabel", "");
maps2.put("traceId", "");
maps2.put("equipmentIndexName", monitorFanIndicator.getIndicator());
maps2.put("equipmentNumber", "");
maps2.put("frontModule", "");
maps2.put("systemType", "");
//升压站的字段显示名称
maps2.put("displayName", "");
if (!ObjectUtils.isEmpty(monitorFanIndicator.getFrontModule())) {
maps2.put("frontModule", monitorFanIndicator.getFrontModule());
}
if (!ObjectUtils.isEmpty(monitorFanIndicator.getSystemType())) {
maps2.put("systemType", monitorFanIndicator.getSystemType());
}
if (!ObjectUtils.isEmpty(monitorFanIndicator.getSystemType())) {
maps2.put("unit", monitorFanIndicator.getUnit());
}
influxDbConnection.insert("indicators_" + monitorFanIndicator.getGateway(), tag, maps2, 1688558007051L, TimeUnit.MILLISECONDS);
}
}
} }
...@@ -26,6 +26,20 @@ import java.util.stream.Collectors; ...@@ -26,6 +26,20 @@ import java.util.stream.Collectors;
@Service @Service
public class MonitoringServiceIMQTTmpl { public class MonitoringServiceIMQTTmpl {
Logger logger = LoggerFactory.getLogger(MonitoringServiceIMQTTmpl.class); Logger logger = LoggerFactory.getLogger(MonitoringServiceIMQTTmpl.class);
//社会贡献Cron表达式
private final String totalSocialContributionCron = "0 0/1 * * * *";
//区域完成情况 Cron表达式
private final String completionOfPowerIndicatorsByProvinceNameCron = "0/30 * * * * *";
//风电站状态Cron表达式
private final String fanStatusStatisticsCron = "0/30 0 * * * *";
//风电站总览Cron表达式
private final String fanstationOverviewCron = "0/30 * * * * *";
//风电站功率曲线图表达式
private final String fanStationPowerBightCron = "0/30 * * * * *";
//风机状态列表Cron表达式
private final String fanStatusListCron = "0/30 * * * * *";
//风机实时数据Cron表达式
private final String fanCurrentDataCron = "0 0/1 * * * *";
/** /**
* 场站mapper * 场站mapper
...@@ -45,10 +59,11 @@ public class MonitoringServiceIMQTTmpl { ...@@ -45,10 +59,11 @@ public class MonitoringServiceIMQTTmpl {
@Autowired @Autowired
CommonServiceImpl commonService; CommonServiceImpl commonService;
/** /**
* 社会贡献定时消息发送 1分钟推送一次 * 社会贡献定时消息发送 1分钟推送一次
*/ */
@Scheduled(cron = "0 0/1 * * * *") @Scheduled(cron = totalSocialContributionCron)
public void getTotalSocialContribution() { public void getTotalSocialContribution() {
Page<SocialContributionDto> socialContributionDtoPage = new Page<SocialContributionDto>(); Page<SocialContributionDto> socialContributionDtoPage = new Page<SocialContributionDto>();
logger.error("--------------------------社会贡献定时执行----------------------------------------------"); logger.error("--------------------------社会贡献定时执行----------------------------------------------");
...@@ -100,7 +115,7 @@ public class MonitoringServiceIMQTTmpl { ...@@ -100,7 +115,7 @@ public class MonitoringServiceIMQTTmpl {
/** /**
* 区域实时数据消息推送-30s一次 * 区域实时数据消息推送-30s一次
*/ */
@Scheduled(cron = "0/30 * * * * *") @Scheduled(cron = completionOfPowerIndicatorsByProvinceNameCron)
public void getCompletionOfPowerIndicatorsByProvinceName() { public void getCompletionOfPowerIndicatorsByProvinceName() {
Page<SocialContributionDto> socialContributionDtoPage = new Page<SocialContributionDto>(); Page<SocialContributionDto> socialContributionDtoPage = new Page<SocialContributionDto>();
logger.error("--------------------------区域实时数据消息开始发送----------------------------------------------"); logger.error("--------------------------区域实时数据消息开始发送----------------------------------------------");
...@@ -157,7 +172,7 @@ public class MonitoringServiceIMQTTmpl { ...@@ -157,7 +172,7 @@ public class MonitoringServiceIMQTTmpl {
/** /**
* 实时推送-风电站风机状态统计数据 * 实时推送-风电站风机状态统计数据
*/ */
@Scheduled(cron = "0/30 0 * * * *") @Scheduled(cron = fanStatusStatisticsCron)
public void getFanStatusStatistics() { public void getFanStatusStatistics() {
List<StationBasic> stationBasicList = stationBasicMapper.selectList(new QueryWrapper<StationBasic>().isNotNull("sequence_nbr").eq("station_type", "FDZ")); List<StationBasic> stationBasicList = stationBasicMapper.selectList(new QueryWrapper<StationBasic>().isNotNull("sequence_nbr").eq("station_type", "FDZ"));
stationBasicList.forEach(stationBasic -> { stationBasicList.forEach(stationBasic -> {
...@@ -180,7 +195,7 @@ public class MonitoringServiceIMQTTmpl { ...@@ -180,7 +195,7 @@ public class MonitoringServiceIMQTTmpl {
/** /**
* 实时推送-风电站风场总概览 * 实时推送-风电站风场总概览
*/ */
@Scheduled(cron = "0/30 * * * * *") @Scheduled(cron = fanstationOverviewCron)
public void getFanstationOverview() { public void getFanstationOverview() {
List<StationBasic> stationBasicList = stationBasicMapper.selectList(new QueryWrapper<StationBasic>().isNotNull("sequence_nbr").eq("station_type", "FDZ")); List<StationBasic> stationBasicList = stationBasicMapper.selectList(new QueryWrapper<StationBasic>().isNotNull("sequence_nbr").eq("station_type", "FDZ"));
stationBasicList.forEach(stationBasic -> { stationBasicList.forEach(stationBasic -> {
...@@ -243,7 +258,7 @@ public class MonitoringServiceIMQTTmpl { ...@@ -243,7 +258,7 @@ public class MonitoringServiceIMQTTmpl {
/** /**
* 实时推送-场站功率曲线总概览 * 实时推送-场站功率曲线总概览
*/ */
@Scheduled(cron = "0/30 * * * * *") @Scheduled(cron = fanStationPowerBightCron)
public void getFanStationPowerBight() { public void getFanStationPowerBight() {
List<StationBasic> stationBasicList = stationBasicMapper.selectList(new QueryWrapper<StationBasic>().isNotNull("sequence_nbr").eq("station_type", "FDZ")); List<StationBasic> stationBasicList = stationBasicMapper.selectList(new QueryWrapper<StationBasic>().isNotNull("sequence_nbr").eq("station_type", "FDZ"));
stationBasicList.forEach(stationBasic -> { stationBasicList.forEach(stationBasic -> {
...@@ -261,7 +276,7 @@ public class MonitoringServiceIMQTTmpl { ...@@ -261,7 +276,7 @@ public class MonitoringServiceIMQTTmpl {
/** /**
* 实时同送-获取各场站的风机列表 * 实时同送-获取各场站的风机列表
*/ */
@Scheduled(cron = "0/30 * * * * *") @Scheduled(cron = fanStatusListCron)
public void getFanStatusList() { public void getFanStatusList() {
List<StationBasic> stationBasicList = stationBasicMapper.selectList(new QueryWrapper<StationBasic>().isNotNull("sequence_nbr").eq("station_type", "FDZ")); List<StationBasic> stationBasicList = stationBasicMapper.selectList(new QueryWrapper<StationBasic>().isNotNull("sequence_nbr").eq("station_type", "FDZ"));
stationBasicList.forEach(stationBasic -> { stationBasicList.forEach(stationBasic -> {
...@@ -281,7 +296,7 @@ public class MonitoringServiceIMQTTmpl { ...@@ -281,7 +296,7 @@ public class MonitoringServiceIMQTTmpl {
}); });
} }
@Scheduled(cron = "0 0/1 * * * *") @Scheduled(cron = fanCurrentDataCron)
public void getFanCurrentData() { public void getFanCurrentData() {
Integer current = 1; Integer current = 1;
Integer size = 99; Integer size = 99;
...@@ -304,7 +319,7 @@ public class MonitoringServiceIMQTTmpl { ...@@ -304,7 +319,7 @@ public class MonitoringServiceIMQTTmpl {
IPage<IndexDto> realTimedata = monitorFanIndicator.getFanIdxInfoByPage(equipmentNumber, stationBasicId, "实时运行数据", current, size, ""); IPage<IndexDto> realTimedata = monitorFanIndicator.getFanIdxInfoByPage(equipmentNumber, stationBasicId, "实时运行数据", current, size, "");
Map<String, Object> windSpeedOfFan = monitorFanIndicator.getDetailsWindSpeed(stationBasic.getFanGatewayId(), equipmentNumber); Map<String, Object> windSpeedOfFan = monitorFanIndicator.getDetailsWindSpeed(stationBasic.getFanGatewayId(), equipmentNumber);
//发电机系统 //发电机系统
IPage<IndexDto> fdjSystem = monitorFanIndicator.getFanIdxInfoByPage(equipmentNumber, stationBasicId, "实时运行数据", current, size,"发电机系统"); IPage<IndexDto> fdjSystem = monitorFanIndicator.getFanIdxInfoByPage(equipmentNumber, stationBasicId, "实时运行数据", current, size, "发电机系统");
List<Map<String, Object>> fdjSystemStatus = monitorFanIndicator.getStatusMonitoring(stationBasic.getFanGatewayId(), equipmentNumber, "发电机系统"); List<Map<String, Object>> fdjSystemStatus = monitorFanIndicator.getStatusMonitoring(stationBasic.getFanGatewayId(), equipmentNumber, "发电机系统");
//机舱与塔筒系统 //机舱与塔筒系统
IPage<IndexDto> jcyttSystem = monitorFanIndicator.getFanIdxInfoByPage(equipmentNumber, stationBasicId, "实时运行数据", current, size, "机舱与塔筒系统"); IPage<IndexDto> jcyttSystem = monitorFanIndicator.getFanIdxInfoByPage(equipmentNumber, stationBasicId, "实时运行数据", current, size, "机舱与塔筒系统");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment