Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
amos-boot-biz
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
项目统一框架
amos-boot-biz
Commits
65e68b55
Commit
65e68b55
authored
Feb 28, 2024
by
litengwei
Browse files
Options
Browse Files
Download
Plain Diff
Merge remote-tracking branch 'origin/develop_dl' into develop_dl
parents
d1919f48
62ea3a99
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
109 additions
and
59 deletions
+109
-59
ConfigPageTopicEnum.java
...yeejoin/equipmanage/common/enums/ConfigPageTopicEnum.java
+5
-4
IntegratePageDataListener.java
...ejoin/equipmanage/listener/IntegratePageDataListener.java
+79
-51
ApplicationRunnerImpl.java
...ejoin/equipmanage/service/impl/ApplicationRunnerImpl.java
+2
-1
MqttReceiveServiceImpl.java
...join/equipmanage/service/impl/MqttReceiveServiceImpl.java
+23
-3
No files found.
amos-boot-module/amos-boot-module-api/amos-boot-module-equip-api/src/main/java/com/yeejoin/equipmanage/common/enums/ConfigPageTopicEnum.java
View file @
65e68b55
...
...
@@ -15,13 +15,14 @@ import java.util.List;
public
enum
ConfigPageTopicEnum
{
INTEGRATE
(
"INTEGRATE_TOPIC/#"
,
"集成页面统配主题"
),
SYSTEMDETAIL
(
"EQUIP_INDEX_ON_SYSTEM_DETAIL"
,
"系统详情页面大屏初始化主题"
);
EQUIP_INDICATOR
(
"EQUIP_INDEX_ON_SYSTEM_DETAIL"
,
"系统详情页面测点初始化主题(单测点)"
),
EQUIP_MULTI_INDICATOR
(
"EQUIP_MULTI_INDEX_ON_SYSTEM_DETAIL"
,
"系统详情页面测点初始化主题(设备绑定多测点)"
);
private
String
topic
;
private
String
describe
;
public
static
List
<
String
>
getEnumTopicList
()
{
List
<
String
>
topics
=
new
ArrayList
<>();
for
(
ConfigPageTopicEnum
e
:
ConfigPageTopicEnum
.
values
())
{
...
...
@@ -29,7 +30,7 @@ public enum ConfigPageTopicEnum {
}
return
topics
;
}
public
static
boolean
isEqualsTopic
(
String
topic
)
{
for
(
ConfigPageTopicEnum
e
:
ConfigPageTopicEnum
.
values
())
{
if
(
e
.
getTopic
().
equals
(
topic
)){
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/listener/IntegratePageDataListener.java
View file @
65e68b55
package
com
.
yeejoin
.
equipmanage
.
listener
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONArray
;
import
com.alibaba.fastjson.JSONObject
;
import
com.
sun.org.apache.xpath.internal.operations.Bool
;
import
com.
baomidou.mybatisplus.core.toolkit.Wrappers
;
import
com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex
;
import
com.yeejoin.equipmanage.common.enums.ConfigPageTopicEnum
;
import
com.yeejoin.equipmanage.service.IEquipmentSpecificIndexSerivce
;
import
com.yeejoin.equipmanage.service.IEquipmentSpecificSerivce
;
import
com.yeejoin.equipmanage.service.IFireFightingSystemService
;
import
javafx.concurrent.Task
;
import
com.yeejoin.equipmanage.service.MqttSendGateway
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.sis.util.Static
;
import
org.eclipse.paho.client.mqttv3.MqttException
;
import
org.eclipse.paho.client.mqttv3.MqttMessage
;
import
org.json.JSONString
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
org.springframework.util.ObjectUtils
;
import
org.typroject.tyboot.component.emq.EmqKeeper
;
import
org.typroject.tyboot.component.emq.EmqxListener
;
import
java.sql.Time
;
import
java.util.*
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.ScheduledThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
java.util.stream.Collectors
;
/**
* @author DELL
*/
@Component
@Slf4j
@Component
public
class
IntegratePageDataListener
extends
EmqxListener
{
@Autowired
IFireFightingSystemService
fireFightingSystemService
;
@Autowired
IFireFightingSystemService
fireFightingSystemService
;
@Autowired
IEquipmentSpecificIndexSerivce
equipmentSpecificIndexSeriv
ce
;
@Autowired
IEquipmentSpecificSerivce
equipmentSpecificServi
ce
;
@Autowired
EmqKeeper
emqKeeper
;
private
boolean
initialized
=
Boolean
.
TRUE
;
@Autowired
IEquipmentSpecificIndexSerivce
equipmentSpecificIndexService
;
@Override
static
MqttSendGateway
mqttSendGateway
;
@Autowired
public
void
setMqttSendGateway
(
MqttSendGateway
mqttSendGateway
)
{
IntegratePageDataListener
.
mqttSendGateway
=
mqttSendGateway
;
}
@Override
public
void
processMessage
(
String
topic
,
MqttMessage
message
)
throws
Exception
{
if
(
log
.
isInfoEnabled
())
{
log
.
info
(
"收到消息主题:{},消息内容:{}"
,
topic
,
message
.
toString
());
if
(
log
.
isInfoEnabled
())
{
log
.
info
(
"收到消息主题:{},消息内容:{}"
,
topic
,
message
.
toString
());
}
Map
msg
=
JSON
.
parseObject
(
message
.
toString
());
Timer
timer
=
new
Timer
();
if
(
msg
.
containsKey
(
"request"
)){
String
split
=
"/"
;
if
(
topic
.
contains
(
split
)){
Map
<
String
,
Object
>
msg
=
JSON
.
parseObject
(
message
.
toString
());
if
(
msg
.
containsKey
(
"request"
))
{
String
split
=
"/"
;
if
(
topic
.
contains
(
split
))
{
String
code
=
topic
.
substring
(
topic
.
indexOf
(
split
)
+
1
);
fireFightingSystemService
.
integrationPageSysData
(
code
,
false
);
}
}
else
if
(
ConfigPageTopicEnum
.
SYSTEMDETAIL
.
getTopic
().
equalsIgnoreCase
(
topic
)
&&
initialized
)
{
if
(!
ObjectUtils
.
isEmpty
(
msg
.
get
(
"codes"
)))
{
List
<
String
>
list
=
JSON
.
parseArray
(
String
.
valueOf
(
msg
.
get
(
"codes"
)),
String
.
class
);
list
.
parallelStream
().
forEach
(
x
->
{
EquipmentSpecificIndex
indexEntity
=
equipmentSpecificIndexSerivce
.
getById
(
x
);
Map
<
String
,
String
>
map
=
new
HashMap
<>();
map
.
put
(
"code"
,
String
.
valueOf
(
indexEntity
.
getId
()));
map
.
put
(
"value"
,
indexEntity
.
getValue
());
map
.
put
(
"status"
,
indexEntity
.
getValue
());
try
{
emqKeeper
.
getMqttClient
().
publish
(
topic
,
JSON
.
toJSONString
(
map
).
getBytes
(),
1
,
false
);
}
catch
(
MqttException
e
)
{
e
.
printStackTrace
();
}
});
}
else
if
(
ConfigPageTopicEnum
.
EQUIP_INDICATOR
.
getTopic
().
equalsIgnoreCase
(
topic
)
||
ConfigPageTopicEnum
.
EQUIP_MULTI_INDICATOR
.
getTopic
().
equalsIgnoreCase
(
topic
))
{
this
.
initializeIntegrationPageData
(
topic
,
msg
);
}
}
private
void
initializeIntegrationPageData
(
String
topic
,
Map
<
String
,
Object
>
message
)
{
Object
codes
=
message
.
get
(
"codes"
);
// 设备绑定多测点:codes为"equipment_specific_id"
if
(
ConfigPageTopicEnum
.
EQUIP_MULTI_INDICATOR
.
getTopic
().
equalsIgnoreCase
(
topic
)
&&
Objects
.
nonNull
(
codes
)
&&
!
ObjectUtils
.
isEmpty
(
codes
))
{
List
<
String
>
equipIds
=
JSON
.
parseArray
(
String
.
valueOf
(
codes
),
String
.
class
);
List
<
EquipmentSpecificIndex
>
indices
=
equipmentSpecificIndexService
.
list
(
Wrappers
.<
EquipmentSpecificIndex
>
lambdaQuery
()
.
select
(
EquipmentSpecificIndex:
:
getEquipmentSpecificId
,
EquipmentSpecificIndex:
:
getEquipmentIndexKey
,
EquipmentSpecificIndex:
:
getValue
)
.
in
(
EquipmentSpecificIndex:
:
getEquipmentSpecificId
,
equipIds
)
.
isNotNull
(
EquipmentSpecificIndex:
:
getValue
)
.
ne
(
EquipmentSpecificIndex:
:
getValue
,
""
)
.
orderByDesc
(
EquipmentSpecificIndex:
:
getEmergencyLevel
)
);
Map
<
Long
,
List
<
EquipmentSpecificIndex
>>
groupedIndices
=
indices
.
stream
().
collect
(
Collectors
.
groupingBy
(
EquipmentSpecificIndex:
:
getEquipmentSpecificId
));
JSONObject
emqMessage
;
for
(
Map
.
Entry
<
Long
,
List
<
EquipmentSpecificIndex
>>
entry
:
groupedIndices
.
entrySet
())
{
Long
equipmentSpecificId
=
entry
.
getKey
();
List
<
EquipmentSpecificIndex
>
specificIndexList
=
entry
.
getValue
();
List
<
HashMap
<
String
,
String
>>
valuedIndexes
=
specificIndexList
.
stream
().
map
(
index
->
new
HashMap
<
String
,
String
>()
{{
put
(
"key"
,
index
.
getEquipmentIndexKey
());
put
(
"value"
,
index
.
getValue
());
}}).
collect
(
Collectors
.
toList
());
emqMessage
=
new
JSONObject
()
{{
this
.
put
(
"code"
,
String
.
valueOf
(
equipmentSpecificId
));
this
.
put
(
"valuedIndexes"
,
valuedIndexes
);
}};
mqttSendGateway
.
sendToMqtt
(
ConfigPageTopicEnum
.
EQUIP_MULTI_INDICATOR
.
getTopic
(),
emqMessage
.
toJSONString
());
}
initialized
=
Boolean
.
FALSE
;
}
// 单测点:codes为"equipment_specific_index_id"
if
(
ConfigPageTopicEnum
.
EQUIP_INDICATOR
.
getTopic
().
equalsIgnoreCase
(
topic
)
&&
Objects
.
nonNull
(
codes
)
&&
!
ObjectUtils
.
isEmpty
(
codes
))
{
List
<
String
>
ids
=
JSON
.
parseArray
(
String
.
valueOf
(
codes
),
String
.
class
);
List
<
EquipmentSpecificIndex
>
indices
=
equipmentSpecificIndexService
.
list
(
Wrappers
.<
EquipmentSpecificIndex
>
lambdaQuery
()
.
select
(
EquipmentSpecificIndex:
:
getId
,
EquipmentSpecificIndex:
:
getEquipmentIndexKey
,
EquipmentSpecificIndex:
:
getValue
)
.
in
(
EquipmentSpecificIndex:
:
getId
,
ids
)
);
TimerTask
timerTask
=
new
TimerTask
()
{
@Override
public
void
run
()
{
initialized
=
Boolean
.
TRUE
;
}
};
timer
.
schedule
(
timerTask
,
3000
);
JSONObject
emqMessage
;
for
(
EquipmentSpecificIndex
index
:
indices
)
{
emqMessage
=
new
JSONObject
()
{{
this
.
put
(
"code"
,
String
.
valueOf
(
index
.
getId
()));
this
.
put
(
"value"
,
index
.
getValue
());
this
.
put
(
"status"
,
index
.
getValue
());
}};
mqttSendGateway
.
sendToMqtt
(
ConfigPageTopicEnum
.
EQUIP_INDICATOR
.
getTopic
(),
emqMessage
.
toJSONString
());
}
}
}
}
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/ApplicationRunnerImpl.java
View file @
65e68b55
...
...
@@ -63,6 +63,7 @@ public class ApplicationRunnerImpl implements ApplicationRunner {
maintenanceResourceDataService
.
subscribeTopic
();
emqKeeper
.
subscript
(
ConfigPageTopicEnum
.
INTEGRATE
.
getTopic
(),
2
,
integratePageDataListener
);
emqKeeper
.
subscript
(
ConfigPageTopicEnum
.
SYSTEMDETAIL
.
getTopic
(),
2
,
integratePageDataListener
);
emqKeeper
.
subscript
(
ConfigPageTopicEnum
.
EQUIP_INDICATOR
.
getTopic
(),
2
,
integratePageDataListener
);
emqKeeper
.
subscript
(
ConfigPageTopicEnum
.
EQUIP_MULTI_INDICATOR
.
getTopic
(),
2
,
integratePageDataListener
);
}
}
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/MqttReceiveServiceImpl.java
View file @
65e68b55
...
...
@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray;
import
com.alibaba.fastjson.JSONObject
;
import
com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
;
import
com.baomidou.mybatisplus.core.conditions.query.QueryWrapper
;
import
com.baomidou.mybatisplus.core.toolkit.Wrappers
;
import
com.yeejoin.amos.boot.biz.common.utils.RedisKey
;
import
com.yeejoin.amos.boot.biz.common.utils.RedisUtils
;
import
com.yeejoin.amos.component.influxdb.InfluxDbConnection
;
...
...
@@ -2686,14 +2687,33 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
}
private
void
pushDataToIntegrationPage
(
List
<
EquipmentSpecificIndex
>
specificIndices
)
{
@Async
public
void
pushDataToIntegrationPage
(
List
<
EquipmentSpecificIndex
>
specificIndices
)
{
for
(
EquipmentSpecificIndex
specificIndex
:
specificIndices
)
{
JSONObject
message
=
new
JSONObject
()
{{
Long
equipmentSpecificId
=
specificIndex
.
getEquipmentSpecificId
();
List
<
EquipmentSpecificIndex
>
equipmentSpecificIndices
=
equipmentSpecificIndexMapper
.
selectList
(
Wrappers
.<
EquipmentSpecificIndex
>
lambdaQuery
()
.
eq
(
EquipmentSpecificIndex:
:
getEquipmentSpecificId
,
equipmentSpecificId
)
.
isNotNull
(
EquipmentSpecificIndex:
:
getValue
)
.
ne
(
EquipmentSpecificIndex:
:
getValue
,
""
)
.
orderByDesc
(
EquipmentSpecificIndex:
:
getEmergencyLevel
)
);
List
<
HashMap
<
String
,
String
>>
valuedIndexes
=
equipmentSpecificIndices
.
stream
().
map
(
index
->
new
HashMap
<
String
,
String
>()
{{
put
(
"key"
,
index
.
getEquipmentIndexKey
());
put
(
"value"
,
index
.
getValue
());
}}).
collect
(
Collectors
.
toList
());
JSONObject
message1
=
new
JSONObject
()
{{
put
(
"code"
,
String
.
valueOf
(
specificIndex
.
getId
()));
put
(
"status"
,
specificIndex
.
getValue
());
put
(
"value"
,
specificIndex
.
getValue
());
}};
mqttSendGateway
.
sendToMqtt
(
ConfigPageTopicEnum
.
SYSTEMDETAIL
.
getTopic
(),
message
.
toJSONString
());
JSONObject
message2
=
new
JSONObject
()
{{
put
(
"code"
,
String
.
valueOf
(
specificIndex
.
getEquipmentSpecificId
()));
put
(
"valuedIndexes"
,
valuedIndexes
);
}};
mqttSendGateway
.
sendToMqtt
(
ConfigPageTopicEnum
.
EQUIP_INDICATOR
.
getTopic
(),
message1
.
toJSONString
());
mqttSendGateway
.
sendToMqtt
(
ConfigPageTopicEnum
.
EQUIP_MULTI_INDICATOR
.
getTopic
(),
message2
.
toJSONString
());
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment