Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
amos-boot-biz
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
项目统一框架
amos-boot-biz
Commits
7cfab757
Commit
7cfab757
authored
Feb 24, 2022
by
maoying
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
消息处理代码整理
parent
2df4f85f
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
170 additions
and
68 deletions
+170
-68
CarProperty.java
...va/com/yeejoin/equipmanage/common/entity/CarProperty.java
+15
-9
CarForGisEnum.java
...a/com/yeejoin/equipmanage/common/enums/CarForGisEnum.java
+1
-1
EquipmentIndexLabelsEnum.java
...in/equipmanage/common/enums/EquipmentIndexLabelsEnum.java
+1
-1
MqttReceiveServiceImpl.java
...join/equipmanage/service/impl/MqttReceiveServiceImpl.java
+147
-56
CarPropertyMapper.xml
...tem-equip/src/main/resources/mapper/CarPropertyMapper.xml
+6
-1
No files found.
amos-boot-module/amos-boot-module-api/amos-boot-module-equip-api/src/main/java/com/yeejoin/equipmanage/common/entity/CarProperty.java
View file @
7cfab757
...
...
@@ -52,16 +52,8 @@ public class CarProperty extends BaseEntity {
*/
@TableField
(
value
=
"equipment_index_key"
)
private
String
equipmentIndexKey
;
@TableField
(
exist
=
false
)
private
String
unitName
;
@TableField
(
exist
=
false
)
private
String
perfQuotaName
;
@TableField
(
exist
=
false
)
private
String
groupName
;
@TableField
(
exist
=
false
)
private
String
nameKey
;
/**
* 颜色
...
...
@@ -87,4 +79,18 @@ public class CarProperty extends BaseEntity {
@TableField
(
value
=
"emergency_level_describe"
)
private
String
emergencyLevelDescribe
;
@TableField
(
exist
=
false
)
private
String
unitName
;
@TableField
(
exist
=
false
)
private
String
perfQuotaName
;
@TableField
(
exist
=
false
)
private
String
groupName
;
@TableField
(
exist
=
false
)
private
String
nameKey
;
@TableField
(
exist
=
false
)
private
String
iotCode
;
}
amos-boot-module/amos-boot-module-api/amos-boot-module-equip-api/src/main/java/com/yeejoin/equipmanage/common/enums/CarForGisEnum.java
View file @
7cfab757
...
...
@@ -16,7 +16,7 @@ public enum CarForGisEnum {
DL
(
"电量"
,
"FireCar_Power"
),
DDHX
(
"对地航向"
,
"FireCar_CourseOverGround"
),
SJ
(
"时间"
,
"time"
),
QT
(
"启停"
,
"
state
"
);
QT
(
"启停"
,
"
FireCar_Start
"
);
private
String
describe
;
...
...
amos-boot-module/amos-boot-module-api/amos-boot-module-equip-api/src/main/java/com/yeejoin/equipmanage/common/enums/EquipmentIndexLabelsEnum.java
View file @
7cfab757
...
...
@@ -10,7 +10,7 @@ package com.yeejoin.equipmanage.common.enums;
*/
public
enum
EquipmentIndexLabelsEnum
{
labels
(
"
l
abels"
,
"装备指标编码"
);
labels
(
"
FireCar_L
abels"
,
"装备指标编码"
);
public
final
String
name
;
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/MqttReceiveServiceImpl.java
View file @
7cfab757
...
...
@@ -223,7 +223,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
staticMap
.
put
(
"FireCar_Power"
,
"0"
);
staticMap
.
put
(
"FireCar_CourseOverGround"
,
"0"
);
staticMap
.
put
(
"time"
,
System
.
currentTimeMillis
());
staticMap
.
put
(
"
state
"
,
"false"
);
staticMap
.
put
(
"
FireCar_Start
"
,
"false"
);
}
@Override
...
...
@@ -1116,19 +1116,24 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
if
(
ObjectUtils
.
isEmpty
(
indexList
)){
return
;
}
equipRealTimeDate
(
iotDatalist
,
indexList
);
equipRealTimeDate
(
iotDatalist
,
indexList
,
topicEntity
);
}
else
{
List
<
CarProperty
>
carProperties
=
carPropertyService
.
getCarPropListByIotCode
(
iotCode
);
if
(
ObjectUtils
.
isEmpty
(
carProperties
)){
return
;
}
carRealTimeDate
(
iotDatalist
,
carProperties
);
}
}
private
void
equipRealTimeDate
(
List
<
IotDataVO
>
iotDatalist
,
List
<
EquipmentSpecificIndex
>
indexList
){
/**
* 装备实时数据处理
* @param iotDatalist
* @param indexList
* @param topicEntity
*/
private
void
equipRealTimeDate
(
List
<
IotDataVO
>
iotDatalist
,
List
<
EquipmentSpecificIndex
>
indexList
,
TopicEntityVo
topicEntity
){
List
<
EquipmentSpecificIndex
>
equipmentSpecificIndexList
=
new
ArrayList
<>();
List
<
EquipmentSpecificAlarm
>
equipmentSpecificAlarms
=
new
ArrayList
<>();
List
<
IndexStateVo
>
indexStateList
=
new
ArrayList
<>();
...
...
@@ -1171,15 +1176,18 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
});
//向画布推送
publishDataToCanvas
(
equipmentSpecificIndexList
);
// 首页性能指标数据订阅
mqttSendGateway
.
sendToMqtt
(
indexTopic
,
JSON
.
toJSONString
(
indexStateList
));
// 报警数据保存
saveOrUpdateEquipAlarm
(
equipmentSpecificAlarms
);
//组态大屏消息推送,设备表实时指标修改
intePageSysDataRefresh
(
equipmentSpecificIndexList
,
topicEntity
);
//数字换流站同步指标修改
syncSpecificIndexsToGS
(
equipmentSpecificIndexList
);
// 报警数据保存
saveOrUpdateEquipAlarm
(
equipmentSpecificAlarms
);
}
...
...
@@ -1205,37 +1213,51 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
});
// 数字换流站数据处理(高斯库同步及南瑞告警推送)
if
(
syncSwitch
&&
!
ObjectUtils
.
isEmpty
(
equipmentAlarmLogs
))
{
List
<
FireEquipmentFireAlarm
>
alarmList
=
createFireEquipmentFireAlarmVo
(
equipmentAlarmLogs
);
if
(!
CollectionUtils
.
isEmpty
(
alarmList
))
{
Map
<
String
,
List
<
FireEquipmentFireAlarm
>>
collect
=
alarmList
.
stream
().
collect
(
Collectors
.
groupingBy
(
FireEquipmentFireAlarm:
:
getType
));
for
(
String
key
:
collect
.
keySet
())
{
List
<
FireEquipmentFireAlarm
>
list
=
collect
.
get
(
key
);
if
(!
CollectionUtils
.
isEmpty
(
list
))
{
if
(
"FIREALARM"
.
equalsIgnoreCase
(
key
))
{
syncDataService
.
syncCreatedFireEquipAlarm
(
list
);
}
else
if
(
"BREAKDOWN"
.
equalsIgnoreCase
(
key
))
{
List
<
FireEquipmentFaultAlarm
>
faultAlarms
=
list
.
stream
().
map
(
x
->
{
FireEquipmentFaultAlarm
fireEquipmentFaultAlarm
=
new
FireEquipmentFaultAlarm
();
BeanUtils
.
copyProperties
(
x
,
fireEquipmentFaultAlarm
);
return
fireEquipmentFaultAlarm
;
}).
collect
(
Collectors
.
toList
());
syncDataService
.
syncCreatedFireEquiptFaultAlarm
(
faultAlarms
);
}
else
if
(
"SHIELD"
.
equalsIgnoreCase
(
key
))
{
List
<
FireEquipmentDefectAlarm
>
defectAlarms
=
list
.
stream
().
map
(
x
->
{
FireEquipmentDefectAlarm
fireEquipmentFaultAlarm
=
new
FireEquipmentDefectAlarm
();
BeanUtils
.
copyProperties
(
x
,
fireEquipmentFaultAlarm
);
return
fireEquipmentFaultAlarm
;
}).
collect
(
Collectors
.
toList
());
syncDataService
.
syncCreatedFireEquipDefectAlarm
(
defectAlarms
);
if
(
ObjectUtils
.
isEmpty
(
equipmentAlarmLogs
)){
return
;
}
TransactionSynchronizationManager
.
registerSynchronization
(
new
TransactionSynchronization
()
{
@Override
public
void
afterCommit
()
{
JSONObject
jsonObject
=
new
JSONObject
();
jsonObject
.
put
(
"seqNo"
,
UUID
.
randomUUID
().
toString
().
replace
(
"-"
,
""
).
toLowerCase
());
mqttSendGateway
.
sendToMqtt
(
TopicEnum
.
ALARM_LOG_INSERT
.
getTopic
(),
jsonObject
.
toString
());
// 数字换流站数据处理(高斯库同步及南瑞告警推送)
if
(
syncSwitch
)
{
List
<
FireEquipmentFireAlarm
>
alarmList
=
createFireEquipmentFireAlarmVo
(
equipmentAlarmLogs
);
if
(!
CollectionUtils
.
isEmpty
(
alarmList
))
{
Map
<
String
,
List
<
FireEquipmentFireAlarm
>>
collect
=
alarmList
.
stream
().
collect
(
Collectors
.
groupingBy
(
FireEquipmentFireAlarm:
:
getType
));
for
(
String
key
:
collect
.
keySet
())
{
List
<
FireEquipmentFireAlarm
>
list
=
collect
.
get
(
key
);
if
(!
CollectionUtils
.
isEmpty
(
list
))
{
if
(
"FIREALARM"
.
equalsIgnoreCase
(
key
))
{
syncDataService
.
syncCreatedFireEquipAlarm
(
list
);
}
else
if
(
"BREAKDOWN"
.
equalsIgnoreCase
(
key
))
{
List
<
FireEquipmentFaultAlarm
>
faultAlarms
=
list
.
stream
().
map
(
x
->
{
FireEquipmentFaultAlarm
fireEquipmentFaultAlarm
=
new
FireEquipmentFaultAlarm
();
BeanUtils
.
copyProperties
(
x
,
fireEquipmentFaultAlarm
);
return
fireEquipmentFaultAlarm
;
}).
collect
(
Collectors
.
toList
());
syncDataService
.
syncCreatedFireEquiptFaultAlarm
(
faultAlarms
);
}
else
if
(
"SHIELD"
.
equalsIgnoreCase
(
key
))
{
List
<
FireEquipmentDefectAlarm
>
defectAlarms
=
list
.
stream
().
map
(
x
->
{
FireEquipmentDefectAlarm
fireEquipmentFaultAlarm
=
new
FireEquipmentDefectAlarm
();
BeanUtils
.
copyProperties
(
x
,
fireEquipmentFaultAlarm
);
return
fireEquipmentFaultAlarm
;
}).
collect
(
Collectors
.
toList
());
syncDataService
.
syncCreatedFireEquipDefectAlarm
(
defectAlarms
);
}
}
}
}
// 向南瑞平台推送报警消息
syncDataService
.
syncCreatedSendAlarm
(
equipmentAlarmLogs
);
}
}
syncDataService
.
syncCreatedSendAlarm
(
equipmentAlarmLogs
);
}
});
}
/**
...
...
@@ -1380,25 +1402,21 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
});
}
public
void
send
(){
/**
* 组态大屏消息推送,设备表实时指标修改
* @param equipmentSpecificIndexList
* @param topicEntity
*/
public
void
intePageSysDataRefresh
(
List
<
EquipmentSpecificIndex
>
equipmentSpecificIndexList
,
TopicEntityVo
topicEntity
){
//TODO 数字化换流站组态屏数据推送,需要在事务提交之后,否侧事务隔离查询不出数据
// TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
// @Override
// public void afterCommit() {
// iEquipmentSpecificSerivce.integrationPageSysDataRefresh(topicEntity.getCode());
// iEquipmentSpecificSerivce.updateEquipmentSpecIndexRealtimeData(equipmentSpecificIndexList);
// //数字换流站站使用
// mqttSendGateway.sendToMqtt(TopicEnum.EQZXDT.getTopic(), "");
// if (!equipmentAlarmLogs.isEmpty() || bool) {
// JSONObject jsonObject = new JSONObject();
// jsonObject.put("seqNo", UUID.randomUUID().toString().replace("-", "").toLowerCase());
// mqttSendGateway.sendToMqtt(TopicEnum.ALARM_LOG_INSERT.getTopic(), jsonObject.toString());
// if (syncSwitch) {
// syncDataService.syncCreatedSendAlarm(equipmentAlarmLogs);
// }
// }
// }
// });
TransactionSynchronizationManager
.
registerSynchronization
(
new
TransactionSynchronization
()
{
@Override
public
void
afterCommit
()
{
iEquipmentSpecificSerivce
.
integrationPageSysDataRefresh
(
topicEntity
.
getCode
());
iEquipmentSpecificSerivce
.
updateEquipmentSpecIndexRealtimeData
(
equipmentSpecificIndexList
);
}
});
}
/**
...
...
@@ -1445,8 +1463,81 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
return
""
;
}
private
void
carRealTimeDate
(){
private
void
carRealTimeDate
(
List
<
IotDataVO
>
iotDatalist
,
List
<
CarProperty
>
carProperties
){
List
<
CarProperty
>
carIndexsList
=
new
ArrayList
<>();
iotDatalist
.
forEach
(
iotDataVO
->{
// 对指标key为labels的数据处理
if
(
EquipmentIndexLabelsEnum
.
labels
.
name
.
equals
(
iotDataVO
.
getKey
()))
{
StringBuilder
sb
=
new
StringBuilder
(
"equipmentOnCar_"
);
EquipmentIndexLabelsVo
labelsVo
=
new
EquipmentIndexLabelsVo
();
Object
obj
=
iotDataVO
.
getValue
();
if
(
obj
instanceof
JSONArray
)
{
List
<
String
>
labelList
=
(
List
<
String
>)
obj
;
labelList
.
forEach
(
code
->
{
String
key
=
sb
.
append
(
code
).
toString
();
labelsVo
.
setEquipmentIotCode
(
code
);
labelsVo
.
setTime
(
new
Date
());
redisUtils
.
set
(
key
,
com
.
alibaba
.
fastjson
.
JSONObject
.
toJSONString
(
labelsVo
),
redisExpireTime
);
});
}
}
List
<
CarPropertyVo
>
carPropertyVos
=
new
ArrayList
<>();
carProperties
.
forEach
(
carProperty
->{
CarProperty
property
=
new
CarProperty
();
if
(
iotDataVO
.
getKey
().
equals
(
carProperty
.
getNameKey
()))
{
BeanUtils
.
copyProperties
(
carProperty
,
property
);
property
.
setValue
(
iotDataVO
.
getValue
().
toString
());
carPropertyVos
.
add
(
carPropertyToCarPropertyVo
(
carProperty
));
carIndexsList
.
add
(
property
);
}
});
boolean
updateBatchById
=
carPropertyService
.
updateBatchById
(
carIndexsList
);
if
(
updateBatchById
){
List
<
CarIndexGisVo
>
list
=
createCarIndexGisVo
(
carIndexsList
);
boolean
flag
=
ifSendToGis
(
list
);
if
(
flag
)
{
mqttSendGateway
.
sendToMqtt
(
carTopic
,
JSON
.
toJSONString
(
list
));
}
if
(
syncSwitch
)
{
syncDataService
.
syncCreatedFireVehicleMeasurement
(
carPropertyVos
);
}
}
});
}
private
CarPropertyVo
carPropertyToCarPropertyVo
(
CarProperty
property
){
CarPropertyVo
carPropertyVo
=
new
CarPropertyVo
();
carPropertyVo
.
setCarId
(
property
.
getCarId
());
carPropertyVo
.
setCreateDate
(
property
.
getCreateDate
());
carPropertyVo
.
setId
(
property
.
getId
());
carPropertyVo
.
setIsIot
(
1
);
carPropertyVo
.
setMRid
(
property
.
getEquipmentIndexId
().
toString
());
carPropertyVo
.
setName
(
property
.
getEquipmentIndexName
());
carPropertyVo
.
setNameKey
(
property
.
getEquipmentIndexKey
());
carPropertyVo
.
setSort
(
1
);
carPropertyVo
.
setUnit
(
property
.
getUnitName
());
carPropertyVo
.
setValue
(
property
.
getValue
());
return
carPropertyVo
;
}
private
List
<
CarIndexGisVo
>
createCarIndexGisVo
(
List
<
CarProperty
>
carProperties
){
List
<
CarIndexGisVo
>
list
=
new
ArrayList
<>();
carProperties
.
forEach
(
action
->{
CarIndexGisVo
v
=
new
CarIndexGisVo
();
v
.
setId
(
action
.
getCarId
());
v
.
setIotCode
(
action
.
getIotCode
());
v
.
setNameKey
(
action
.
getEquipmentIndexKey
());
v
.
setValue
(
action
.
getValue
());
list
.
add
(
v
);
});
return
list
;
}
}
amos-boot-system-equip/src/main/resources/mapper/CarPropertyMapper.xml
View file @
7cfab757
...
...
@@ -12,7 +12,12 @@
wcp.is_alarm as isAlarm,
wcp.emergency_level_color as emergencyLevelColor,
wcp.emergency_level as emergencyLevel,
wcp.emergency_level_describe as emergencyLevelDescribe
wcp.emergency_level_describe as emergencyLevelDescribe,
wc.iot_code as iotCode,
wc.`name` as carName,
wc.car_num as carNum,
wei.unit unitName,
wei.group_name as groupName
FROM
wl_car_property wcp
LEFT JOIN wl_car AS wc ON wc.id = wcp.car_id
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment