Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
amos-boot-biz
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
项目统一框架
amos-boot-biz
Commits
fc85e8b2
Commit
fc85e8b2
authored
Feb 26, 2024
by
李秀明
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
feat: 系统图支持拖设备绑定多个测点
parent
c6eaf6fb
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
107 additions
and
49 deletions
+107
-49
IntegratePageDataListener.java
...ejoin/equipmanage/listener/IntegratePageDataListener.java
+91
-48
MqttReceiveServiceImpl.java
...join/equipmanage/service/impl/MqttReceiveServiceImpl.java
+16
-1
No files found.
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/listener/IntegratePageDataListener.java
View file @
fc85e8b2
package
com
.
yeejoin
.
equipmanage
.
listener
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONArray
;
import
com.alibaba.fastjson.JSONObject
;
import
com.
sun.org.apache.xpath.internal.operations.Bool
;
import
com.
baomidou.mybatisplus.core.toolkit.Wrappers
;
import
com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex
;
import
com.yeejoin.equipmanage.common.enums.ConfigPageTopicEnum
;
import
com.yeejoin.equipmanage.service.IEquipmentSpecificIndexSerivce
;
import
com.yeejoin.equipmanage.service.IEquipmentSpecificSerivce
;
import
com.yeejoin.equipmanage.service.IFireFightingSystemService
;
import
javafx.concurrent.Task
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.sis.util.Static
;
import
org.eclipse.paho.client.mqttv3.MqttException
;
import
org.eclipse.paho.client.mqttv3.MqttMessage
;
import
org.json.JSONString
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
org.springframework.util.ObjectUtils
;
import
org.typroject.tyboot.component.emq.EmqKeeper
;
import
org.typroject.tyboot.component.emq.EmqxListener
;
import
java.sql.Time
;
import
java.util.*
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.ScheduledThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
java.util.stream.Collectors
;
/**
* @author DELL
*/
@Component
@Slf4j
@Component
public
class
IntegratePageDataListener
extends
EmqxListener
{
@Autowired
IFireFightingSystemService
fireFightingSystemService
;
@Autowired
IFireFightingSystemService
fireFightingSystemService
;
@Autowired
IEquipmentSpecificSerivce
equipmentSpecificService
;
@Autowired
IEquipmentSpecificIndexSerivce
equipmentSpecificIndexSeriv
ce
;
@Autowired
IEquipmentSpecificIndexSerivce
equipmentSpecificIndexServi
ce
;
@Autowired
@Autowired
EmqKeeper
emqKeeper
;
private
boolean
initialized
=
Boolean
.
TRUE
;
@Override
@Override
public
void
processMessage
(
String
topic
,
MqttMessage
message
)
throws
Exception
{
if
(
log
.
isInfoEnabled
())
{
log
.
info
(
"收到消息主题:{},消息内容:{}"
,
topic
,
message
.
toString
());
if
(
log
.
isInfoEnabled
())
{
log
.
info
(
"收到消息主题:{},消息内容:{}"
,
topic
,
message
.
toString
());
}
Map
msg
=
JSON
.
parseObject
(
message
.
toString
());
Timer
timer
=
new
Timer
();
if
(
msg
.
containsKey
(
"request"
)){
String
split
=
"/"
;
if
(
topic
.
contains
(
split
)){
Map
<
String
,
Object
>
msg
=
JSON
.
parseObject
(
message
.
toString
());
if
(
msg
.
containsKey
(
"request"
))
{
String
split
=
"/"
;
if
(
topic
.
contains
(
split
))
{
String
code
=
topic
.
substring
(
topic
.
indexOf
(
split
)
+
1
);
fireFightingSystemService
.
integrationPageSysData
(
code
,
false
);
}
}
else
if
(
ConfigPageTopicEnum
.
SYSTEMDETAIL
.
getTopic
().
equalsIgnoreCase
(
topic
)
&&
initialized
)
{
if
(!
ObjectUtils
.
isEmpty
(
msg
.
get
(
"codes"
)))
{
List
<
String
>
list
=
JSON
.
parseArray
(
String
.
valueOf
(
msg
.
get
(
"codes"
)),
String
.
class
);
list
.
parallelStream
().
forEach
(
x
->
{
EquipmentSpecificIndex
indexEntity
=
equipmentSpecificIndexSerivce
.
getById
(
x
);
Map
<
String
,
String
>
map
=
new
HashMap
<>();
map
.
put
(
"code"
,
String
.
valueOf
(
indexEntity
.
getId
()));
map
.
put
(
"value"
,
indexEntity
.
getValue
());
map
.
put
(
"status"
,
indexEntity
.
getValue
());
try
{
emqKeeper
.
getMqttClient
().
publish
(
topic
,
JSON
.
toJSONString
(
map
).
getBytes
(),
1
,
false
);
}
catch
(
MqttException
e
)
{
e
.
printStackTrace
();
}
});
}
else
if
(
ConfigPageTopicEnum
.
SYSTEMDETAIL
.
getTopic
().
equalsIgnoreCase
(
topic
))
{
this
.
initializeIntegrationPageData
(
topic
,
msg
);
}
}
private
void
initializeIntegrationPageData
(
String
topic
,
Map
<
String
,
Object
>
message
)
{
Object
multiIndicator
=
message
.
get
(
"multiIndicator"
);
Object
codes
=
message
.
get
(
"codes"
);
if
(
ObjectUtils
.
isEmpty
(
codes
))
{
return
;
}
// 判断是否是设备绑定多测点
boolean
isDeviceBindMultiIndicator
=
Objects
.
nonNull
(
multiIndicator
)
&&
Boolean
.
parseBoolean
(
String
.
valueOf
(
multiIndicator
));
List
<
String
>
ids
=
JSON
.
parseArray
(
String
.
valueOf
(
codes
),
String
.
class
);
// 设备绑定多测点:codes为"equipment_specific_id"
if
(
isDeviceBindMultiIndicator
)
{
List
<
EquipmentSpecificIndex
>
indices
=
equipmentSpecificIndexService
.
list
(
Wrappers
.<
EquipmentSpecificIndex
>
lambdaQuery
()
.
select
(
EquipmentSpecificIndex:
:
getEquipmentSpecificId
,
EquipmentSpecificIndex:
:
getEquipmentIndexKey
,
EquipmentSpecificIndex:
:
getValue
)
.
in
(
EquipmentSpecificIndex:
:
getEquipmentSpecificId
,
ids
)
.
isNotNull
(
EquipmentSpecificIndex:
:
getValue
)
.
ne
(
EquipmentSpecificIndex:
:
getValue
,
"false"
)
.
ne
(
EquipmentSpecificIndex:
:
getValue
,
""
)
.
orderByDesc
(
EquipmentSpecificIndex:
:
getEmergencyLevel
)
);
Map
<
Long
,
List
<
EquipmentSpecificIndex
>>
groupedIndices
=
indices
.
stream
().
collect
(
Collectors
.
groupingBy
(
EquipmentSpecificIndex:
:
getEquipmentSpecificId
));
JSONObject
emqMessage
;
for
(
Map
.
Entry
<
Long
,
List
<
EquipmentSpecificIndex
>>
entry
:
groupedIndices
.
entrySet
())
{
Long
equipmentSpecificId
=
entry
.
getKey
();
List
<
EquipmentSpecificIndex
>
valuedIndexes
=
entry
.
getValue
();
List
<
HashMap
<
String
,
String
>>
indexKey
=
valuedIndexes
.
stream
().
map
(
index
->
new
HashMap
<
String
,
String
>()
{{
put
(
"key"
,
index
.
getEquipmentIndexKey
());
put
(
"value"
,
index
.
getValue
());
}}).
collect
(
Collectors
.
toList
());
emqMessage
=
new
JSONObject
()
{{
this
.
put
(
"code"
,
String
.
valueOf
(
equipmentSpecificId
));
this
.
put
(
"indexKey"
,
indexKey
);
}};
try
{
emqKeeper
.
getMqttClient
().
publish
(
topic
,
JSON
.
toJSONString
(
emqMessage
).
getBytes
(),
1
,
false
);
}
catch
(
MqttException
e
)
{
log
.
error
(
"发布消息失败"
,
e
);
}
}
initialized
=
Boolean
.
FALSE
;
}
// 单测点:codes为"equipment_specific_index_id"
else
{
List
<
EquipmentSpecificIndex
>
indices
=
equipmentSpecificIndexService
.
list
(
Wrappers
.<
EquipmentSpecificIndex
>
lambdaQuery
()
.
select
(
EquipmentSpecificIndex:
:
getId
,
EquipmentSpecificIndex:
:
getEquipmentIndexKey
,
EquipmentSpecificIndex:
:
getValue
)
.
in
(
EquipmentSpecificIndex:
:
getId
,
ids
)
);
JSONObject
emqMessage
;
for
(
EquipmentSpecificIndex
index
:
indices
)
{
emqMessage
=
new
JSONObject
()
{{
this
.
put
(
"code"
,
String
.
valueOf
(
index
.
getId
()));
this
.
put
(
"value"
,
index
.
getValue
());
this
.
put
(
"status"
,
index
.
getValue
());
}};
TimerTask
timerTask
=
new
TimerTask
()
{
@Override
public
void
run
(
)
{
initialized
=
Boolean
.
TRUE
;
}
};
timer
.
schedule
(
timerTask
,
3000
);
try
{
emqKeeper
.
getMqttClient
().
publish
(
topic
,
JSON
.
toJSONString
(
emqMessage
).
getBytes
(),
1
,
false
);
}
catch
(
MqttException
e
)
{
log
.
error
(
"发布消息失败"
,
e
)
;
}
}
}
}
}
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/MqttReceiveServiceImpl.java
View file @
fc85e8b2
...
...
@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray;
import
com.alibaba.fastjson.JSONObject
;
import
com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
;
import
com.baomidou.mybatisplus.core.conditions.query.QueryWrapper
;
import
com.baomidou.mybatisplus.core.toolkit.Wrappers
;
import
com.yeejoin.amos.boot.biz.common.utils.RedisKey
;
import
com.yeejoin.amos.boot.biz.common.utils.RedisUtils
;
import
com.yeejoin.amos.component.influxdb.InfluxDbConnection
;
...
...
@@ -2686,12 +2687,26 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
}
private
void
pushDataToIntegrationPage
(
List
<
EquipmentSpecificIndex
>
specificIndices
)
{
@Async
public
void
pushDataToIntegrationPage
(
List
<
EquipmentSpecificIndex
>
specificIndices
)
{
for
(
EquipmentSpecificIndex
specificIndex
:
specificIndices
)
{
Long
equipmentSpecificId
=
specificIndex
.
getEquipmentSpecificId
();
List
<
EquipmentSpecificIndex
>
equipmentSpecificIndices
=
equipmentSpecificIndexMapper
.
selectList
(
Wrappers
.<
EquipmentSpecificIndex
>
lambdaQuery
()
.
eq
(
EquipmentSpecificIndex:
:
getEquipmentSpecificId
,
equipmentSpecificId
)
.
isNotNull
(
EquipmentSpecificIndex:
:
getValue
)
.
ne
(
EquipmentSpecificIndex:
:
getValue
,
""
)
);
List
<
HashMap
<
String
,
String
>>
indexKey
=
equipmentSpecificIndices
.
stream
().
map
(
index
->
new
HashMap
<
String
,
String
>()
{{
put
(
"key"
,
index
.
getEquipmentIndexKey
());
put
(
"value"
,
index
.
getValue
());
}}).
collect
(
Collectors
.
toList
());
JSONObject
message
=
new
JSONObject
()
{{
put
(
"code"
,
String
.
valueOf
(
specificIndex
.
getId
()));
put
(
"status"
,
specificIndex
.
getValue
());
put
(
"value"
,
specificIndex
.
getValue
());
put
(
"indexKey"
,
indexKey
);
}};
mqttSendGateway
.
sendToMqtt
(
ConfigPageTopicEnum
.
SYSTEMDETAIL
.
getTopic
(),
message
.
toJSONString
());
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment