Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
amos-boot-biz
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
项目统一框架
amos-boot-biz
Commits
281491e4
Commit
281491e4
authored
Jul 10, 2023
by
caotao
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
监盘数据由接口调用改为消息推送
parent
49f3c99a
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
186 additions
and
3 deletions
+186
-3
MonitoringServiceIMQTTmpl.java
...ule/jxiop/biz/service/impl/MonitoringServiceIMQTTmpl.java
+186
-3
No files found.
amos-boot-system-jxiop/amos-boot-module-jxiop-monitor-biz/src/main/java/com/yeejoin/amos/boot/module/jxiop/biz/service/impl/MonitoringServiceIMQTTmpl.java
View file @
281491e4
...
...
@@ -2,7 +2,9 @@ package com.yeejoin.amos.boot.module.jxiop.biz.service.impl;
import
com.alibaba.fastjson.JSON
;
import
com.baomidou.mybatisplus.core.conditions.query.QueryWrapper
;
import
com.baomidou.mybatisplus.core.metadata.IPage
;
import
com.baomidou.mybatisplus.extension.plugins.pagination.Page
;
import
com.yeejoin.amos.boot.module.jxiop.api.dto.IndexDto
;
import
com.yeejoin.amos.boot.module.jxiop.api.entity.Region
;
import
com.yeejoin.amos.boot.module.jxiop.api.entity.StationBasic
;
import
com.yeejoin.amos.boot.module.jxiop.api.mapper.RegionMapper
;
...
...
@@ -16,9 +18,9 @@ import org.springframework.scheduling.annotation.Scheduled;
import
org.springframework.stereotype.Service
;
import
org.typroject.tyboot.component.emq.EmqKeeper
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.*
;
import
java.util.concurrent.atomic.AtomicReference
;
import
java.util.stream.Collectors
;
@Async
@Service
...
...
@@ -37,8 +39,15 @@ public class MonitoringServiceIMQTTmpl {
RegionMapper
regionMapper
;
@Autowired
EmqKeeper
emqKeeper
;
@Autowired
MonitorFanIndicatorImpl
monitorFanIndicator
;
@Autowired
CommonServiceImpl
commonService
;
/**
* 社会贡献定时消息发送 1分钟推送一次
*/
@Scheduled
(
cron
=
"0 0/1 * * * *"
)
public
void
getTotalSocialContribution
()
{
Page
<
SocialContributionDto
>
socialContributionDtoPage
=
new
Page
<
SocialContributionDto
>();
...
...
@@ -88,7 +97,9 @@ public class MonitoringServiceIMQTTmpl {
}
}
/**
* 区域实时数据消息推送-30s一次
*/
@Scheduled
(
cron
=
"0/30 * * * * *"
)
public
void
getCompletionOfPowerIndicatorsByProvinceName
()
{
Page
<
SocialContributionDto
>
socialContributionDtoPage
=
new
Page
<
SocialContributionDto
>();
...
...
@@ -142,4 +153,176 @@ public class MonitoringServiceIMQTTmpl {
logger
.
error
(
"-----------------发送区域实时生产数据消息=================== 失败!"
);
}
}
/**
* 实时推送-风电站风机状态统计数据
*/
@Scheduled
(
cron
=
"0/30 0 * * * *"
)
public
void
getFanStatusStatistics
()
{
List
<
StationBasic
>
stationBasicList
=
stationBasicMapper
.
selectList
(
new
QueryWrapper
<
StationBasic
>().
isNotNull
(
"sequence_nbr"
).
eq
(
"station_type"
,
"FDZ"
));
stationBasicList
.
forEach
(
stationBasic
->
{
List
<
IndexDto
>
fanStatusStatistics
=
monitorFanIndicator
.
getFanStatusStatistics
(
String
.
valueOf
(
stationBasic
.
getSequenceNbr
()));
Page
<
IndexDto
>
page
=
new
Page
<>(
1
,
10
);
List
<
IndexDto
>
collect
=
fanStatusStatistics
.
stream
()
.
limit
(
10
)
.
collect
(
Collectors
.
toList
());
page
.
setTotal
(
fanStatusStatistics
.
size
());
page
.
setRecords
(
collect
);
try
{
emqKeeper
.
getMqttClient
().
publish
(
stationBasic
.
getSequenceNbr
()
+
"_fanStatusStatistics_topic"
,
JSON
.
toJSON
(
page
).
toString
().
getBytes
(
"UTF-8"
),
1
,
true
);
logger
.
info
(
"-----------------发送风电站风机状态数据消息=================== 成功!"
+
JSON
.
toJSONString
(
page
));
}
catch
(
Exception
exception
)
{
logger
.
error
(
"-----------------发送风电站风机状态数据消息=================== 失败!"
);
}
});
}
/**
* 实时推送-风电站风场总概览
*/
@Scheduled
(
cron
=
"0/30 * * * * *"
)
public
void
getFanstationOverview
()
{
List
<
StationBasic
>
stationBasicList
=
stationBasicMapper
.
selectList
(
new
QueryWrapper
<
StationBasic
>().
isNotNull
(
"sequence_nbr"
).
eq
(
"station_type"
,
"FDZ"
));
stationBasicList
.
forEach
(
stationBasic
->
{
String
gatewayId
=
stationBasic
.
getFanGatewayId
();
String
[]
sumColumns
=
new
String
[]{
"日发电量"
,
"月发电量"
,
"年发电量"
};
String
[]
avgColumns
=
new
String
[]{
"有功功率"
,
"瞬时风速"
};
List
<
String
>
columnList
=
Arrays
.
asList
(
sumColumns
);
List
<
String
>
columnLists
=
Arrays
.
asList
(
avgColumns
);
Map
<
String
,
Object
>
columnMap
=
new
HashMap
<>();
for
(
String
column
:
columnList
)
{
Double
result
=
commonService
.
getTotalByIndicatior
(
gatewayId
,
column
);
columnMap
.
put
(
column
,
result
);
}
for
(
String
column
:
columnLists
)
{
Double
result
=
commonService
.
getAvgvalueByIndicatior
(
gatewayId
,
column
);
columnMap
.
put
(
column
,
result
);
}
String
num
=
monitorFanIndicator
.
getFJCount
(
gatewayId
);
columnMap
.
put
(
"风机台数"
,
num
);
Double
capacityl
=
commonService
.
getStationCapactityByStationWerks
(
stationBasic
.
getStationNumber
());
columnMap
.
put
(
"装机容量"
,
capacityl
);
List
<
Map
<
String
,
Object
>>
objects
=
new
ArrayList
<>();
Map
<
String
,
Object
>
data
=
new
HashMap
<>();
data
.
put
(
"title"
,
Float
.
valueOf
(
columnMap
.
get
(
"装机容量"
).
toString
()));
objects
.
add
(
data
);
Map
<
String
,
Object
>
data1
=
new
HashMap
<>();
data1
.
put
(
"title"
,
Float
.
valueOf
(
columnMap
.
get
(
"日发电量"
).
toString
()));
objects
.
add
(
data1
);
Map
<
String
,
Object
>
data2
=
new
HashMap
<>();
data2
.
put
(
"title"
,
columnMap
.
get
(
"风机台数"
).
toString
().
replace
(
".0"
,
""
));
objects
.
add
(
data2
);
Map
<
String
,
Object
>
data3
=
new
HashMap
<>();
data3
.
put
(
"title"
,
Float
.
valueOf
(
columnMap
.
get
(
"月发电量"
).
toString
()));
objects
.
add
(
data3
);
Map
<
String
,
Object
>
data4
=
new
HashMap
<>();
data4
.
put
(
"title"
,
Float
.
valueOf
(
columnMap
.
get
(
"瞬时风速"
).
toString
()));
objects
.
add
(
data4
);
Map
<
String
,
Object
>
data5
=
new
HashMap
<>();
data5
.
put
(
"title"
,
Float
.
valueOf
(
columnMap
.
get
(
"年发电量"
).
toString
()));
objects
.
add
(
data5
);
Map
<
String
,
Object
>
data6
=
new
HashMap
<>();
data6
.
put
(
"title"
,
Float
.
valueOf
(
columnMap
.
get
(
"有功功率"
).
toString
()));
objects
.
add
(
data6
);
Map
<
String
,
Object
>
data7
=
new
HashMap
<>();
data7
.
put
(
"title"
,
7.47
);
objects
.
add
(
data7
);
IPage
<
Map
<
String
,
Object
>>
result
=
new
Page
<>();
result
.
setRecords
(
objects
);
result
.
setCurrent
(
1
);
result
.
setTotal
(
objects
.
size
());
try
{
emqKeeper
.
getMqttClient
().
publish
(
stationBasic
.
getSequenceNbr
()
+
"_fanStationOverview_topic"
,
JSON
.
toJSON
(
result
).
toString
().
getBytes
(
"UTF-8"
),
1
,
true
);
logger
.
info
(
"-----------------发送风电站总概览数据消息=================== 成功!"
+
JSON
.
toJSONString
(
result
));
}
catch
(
Exception
exception
)
{
logger
.
error
(
"-----------------发送风电站总概览数据消息=================== 失败!"
);
}
});
}
/**
* 实时推送-场站功率曲线总概览
*/
@Scheduled
(
cron
=
"0/30 * * * * *"
)
public
void
getFanStationPowerBight
()
{
List
<
StationBasic
>
stationBasicList
=
stationBasicMapper
.
selectList
(
new
QueryWrapper
<
StationBasic
>().
isNotNull
(
"sequence_nbr"
).
eq
(
"station_type"
,
"FDZ"
));
stationBasicList
.
forEach
(
stationBasic
->
{
String
gatewayId
=
stationBasic
.
getFanGatewayId
();
Map
<
String
,
Object
>
detailsWindSpeed
=
monitorFanIndicator
.
getDetailsWindSpeedAll
(
gatewayId
);
try
{
emqKeeper
.
getMqttClient
().
publish
(
stationBasic
.
getSequenceNbr
()
+
"_fanStationPowerBight_topic"
,
JSON
.
toJSON
(
detailsWindSpeed
).
toString
().
getBytes
(
"UTF-8"
),
1
,
true
);
logger
.
info
(
"-----------------发送风电站功率曲线数据消息=================== 成功!"
+
JSON
.
toJSONString
(
detailsWindSpeed
));
}
catch
(
Exception
exception
)
{
logger
.
error
(
"-----------------发送风电站功率曲线数据消息=================== 失败!"
);
}
});
}
/**
* 实时同送-获取各场站的风机列表
*/
@Scheduled
(
cron
=
"0/30 * * * * *"
)
public
void
getFanStatusList
()
{
List
<
StationBasic
>
stationBasicList
=
stationBasicMapper
.
selectList
(
new
QueryWrapper
<
StationBasic
>().
isNotNull
(
"sequence_nbr"
).
eq
(
"station_type"
,
"FDZ"
));
stationBasicList
.
forEach
(
stationBasic
->
{
List
<
IndexDto
>
fanStatusList
=
monitorFanIndicator
.
getFanStatusList
(
String
.
valueOf
(
stationBasic
.
getSequenceNbr
()));
Page
<
IndexDto
>
page
=
new
Page
<>(
1
,
999
);
List
<
IndexDto
>
collect
=
fanStatusList
.
stream
()
.
limit
(
999
)
.
collect
(
Collectors
.
toList
());
page
.
setTotal
(
fanStatusList
.
size
());
page
.
setRecords
(
collect
);
try
{
emqKeeper
.
getMqttClient
().
publish
(
stationBasic
.
getSequenceNbr
()
+
"_fanStationFanStatusList_topic"
,
JSON
.
toJSON
(
page
).
toString
().
getBytes
(
"UTF-8"
),
1
,
true
);
logger
.
info
(
"-----------------发送风电站风机列表数据消息=================== 成功!"
+
JSON
.
toJSONString
(
page
));
}
catch
(
Exception
exception
)
{
logger
.
error
(
"-----------------发送风电站风机列表数据消息=================== 失败!"
);
}
});
}
@Scheduled
(
cron
=
"0 0/1 * * * *"
)
public
void
getFanCurrentData
()
{
List
<
StationBasic
>
stationBasicList
=
stationBasicMapper
.
selectList
(
new
QueryWrapper
<
StationBasic
>().
isNotNull
(
"sequence_nbr"
).
eq
(
"station_type"
,
"FDZ"
));
stationBasicList
.
forEach
(
stationBasic
->
{
List
<
IndexDto
>
fanStatusList
=
monitorFanIndicator
.
getFanStatusList
(
String
.
valueOf
(
stationBasic
.
getSequenceNbr
()));
fanStatusList
.
forEach
(
indexDto
->
{
String
stationBasicId
=
String
.
valueOf
(
stationBasic
.
getSequenceNbr
());
String
equipmentNumber
=
indexDto
.
getEquipmentNumber
();
String
topicPrefix
=
stationBasicId
+
"_"
+
equipmentNumber
;
IndexDto
info
=
monitorFanIndicator
.
getFanBasicInfoByEquipNum
(
equipmentNumber
,
stationBasicId
);
HashMap
<
String
,
Object
>
windspeddInfo
=
new
HashMap
<>();
String
windspeddInfoValue
=
monitorFanIndicator
.
getFanDataByType
(
stationBasicId
,
equipmentNumber
,
"实时监控表计"
,
"风速"
);
windspeddInfo
.
put
(
"value"
,
windspeddInfoValue
);
windspeddInfo
.
put
(
"value"
,
100
);
String
activepower
=
monitorFanIndicator
.
getFanDataByType
(
stationBasicId
,
equipmentNumber
,
"实时监控表计"
,
"有功功率"
);
String
powerFrequency
=
monitorFanIndicator
.
getFanDataByType
(
stationBasicId
,
equipmentNumber
,
"实时监控表计"
,
"电网频率"
);
String
hubSpeed
=
monitorFanIndicator
.
getFanDataByType
(
stationBasicId
,
equipmentNumber
,
"实时监控表计"
,
"轮毂转速"
);
HashMap
<
String
,
List
<
String
>>
realTimeTemperatureResult
=
monitorFanIndicator
.
getRealTimeTemperature
(
equipmentNumber
,
stationBasicId
,
"实时监控表计"
);
IPage
<
IndexDto
>
realTimedata
=
monitorFanIndicator
.
getFanIdxInfoByPage
(
equipmentNumber
,
stationBasicId
,
"实时运行数据"
,
1
,
99
,
""
);
Map
<
String
,
Object
>
windSpeedOfFan
=
monitorFanIndicator
.
getDetailsWindSpeed
(
stationBasic
.
getFanGatewayId
(),
equipmentNumber
);
try
{
//{stationBasicId}_{equipmentNumber}_fanBasicInfo_topic
emqKeeper
.
getMqttClient
().
publish
(
topicPrefix
+
"_fanBasicInfo_topic"
,
JSON
.
toJSON
(
info
).
toString
().
getBytes
(
"UTF-8"
),
1
,
true
);
//{stationBasicId}_{equipmentNumber}_windspeddInfo_topic
emqKeeper
.
getMqttClient
().
publish
(
topicPrefix
+
"_windspeddInfo_topic"
,
JSON
.
toJSON
(
windspeddInfo
).
toString
().
getBytes
(
"UTF-8"
),
1
,
true
);
//{stationBasicId}_{equipmentNumber}_activepower_topic
emqKeeper
.
getMqttClient
().
publish
(
topicPrefix
+
"_activepower_topic"
,
activepower
.
getBytes
(
"UTF-8"
),
1
,
true
);
//{stationBasicId}_{equipmentNumber}_powerFrequency_topic
emqKeeper
.
getMqttClient
().
publish
(
topicPrefix
+
"_powerFrequency_topic"
,
powerFrequency
.
getBytes
(
"UTF-8"
),
1
,
true
);
//{stationBasicId}_{equipmentNumber}_hubSpeed_topic
emqKeeper
.
getMqttClient
().
publish
(
topicPrefix
+
"_hubSpeed_topic"
,
hubSpeed
.
getBytes
(
"UTF-8"
),
1
,
true
);
//{stationBasicId}_{equipmentNumber}_realTimeTemperature_topic
emqKeeper
.
getMqttClient
().
publish
(
topicPrefix
+
"_realTimeTemperature_topic"
,
JSON
.
toJSON
(
realTimeTemperatureResult
).
toString
().
getBytes
(
"UTF-8"
),
1
,
true
);
//{stationBasicId}_{equipmentNumber}_realTimedata_topic
emqKeeper
.
getMqttClient
().
publish
(
topicPrefix
+
"_realTimedata_topic"
,
JSON
.
toJSON
(
realTimedata
).
toString
().
getBytes
(
"UTF-8"
),
1
,
true
);
//{stationBasicId}_{equipmentNumber}_windSpeedOfFan_topic
emqKeeper
.
getMqttClient
().
publish
(
topicPrefix
+
"_windSpeedOfFan_topic"
,
JSON
.
toJSON
(
windSpeedOfFan
).
toString
().
getBytes
(
"UTF-8"
),
1
,
true
);
logger
.
info
(
"-----------------发送风电站风机基础数据消息=================== 成功!"
);
}
catch
(
Exception
exception
)
{
logger
.
error
(
"-----------------发送风电站风机基础数据消息=================== 失败!"
);
}
});
});
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment