Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
amos-boot-biz
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
项目统一框架
amos-boot-biz
Commits
2b98f3cb
Commit
2b98f3cb
authored
Nov 21, 2024
by
李秀明
Browse files
Options
Browse Files
Download
Plain Diff
Merge remote-tracking branch 'origin/develop_dl_bugfix_0723' into develop_dl_bugfix_0723
parents
ca7e9143
fbc846bd
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
121 additions
and
81 deletions
+121
-81
EquipmentIotMqttReceiveConfig.java
...oin/equipmanage/config/EquipmentIotMqttReceiveConfig.java
+7
-0
MqttWaringReceiveService.java
...yeejoin/equipmanage/service/MqttWaringReceiveService.java
+12
-0
MqttReceiveServiceImpl.java
...join/equipmanage/service/impl/MqttReceiveServiceImpl.java
+4
-81
MqttWarningReceiveServiceImpl.java
...uipmanage/service/impl/MqttWarningReceiveServiceImpl.java
+98
-0
No files found.
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/config/EquipmentIotMqttReceiveConfig.java
View file @
2b98f3cb
...
...
@@ -7,6 +7,7 @@ import com.yeejoin.equipmanage.mapper.EquipmentSpecificMapper;
import
com.yeejoin.equipmanage.service.ISyncDataService
;
import
com.yeejoin.equipmanage.service.MqttEventReceiveService
;
import
com.yeejoin.equipmanage.service.MqttReceiveService
;
import
com.yeejoin.equipmanage.service.MqttWaringReceiveService
;
import
org.eclipse.paho.client.mqttv3.MqttConnectOptions
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
...
...
@@ -69,6 +70,9 @@ public class EquipmentIotMqttReceiveConfig {
private
MqttEventReceiveService
mqttEventReceiveService
;
@Autowired
private
MqttWaringReceiveService
mqttWaringReceiveService
;
private
ISyncDataService
iSyncDataService
;
...
...
@@ -136,6 +140,7 @@ public class EquipmentIotMqttReceiveConfig {
list
.
add
(
riskMsgCenterEquipTopic
);
list
.
add
(
riskMsgCenterPatrolTopic
);
list
.
add
(
"+/+/eventAlarm"
);
// 添加换流站韶山监听事件 --- shaoshan 修改为 eventAlarm:事件告警 - 统一
list
.
add
(
"equip/threshold/warning"
);
// 触发预警相关 - 【站储水量不大于4000、泡沫罐和管网压力等阈值信息、稳压泵信息】- 中心系统使用
String
[]
arr
=
list
.
toArray
(
new
String
[
list
.
size
()]);
adapter
=
new
MqttPahoMessageDrivenChannelAdapter
(
clientId
+
"_inbound"
,
mqttPahoClientFactory
(),
arr
);
adapter
.
setCompletionTimeout
(
completionTimeout
);
...
...
@@ -163,6 +168,8 @@ public class EquipmentIotMqttReceiveConfig {
mqttReceiveService
.
handlerMqttRomaMessage
(
topic
,
msg
);
}
else
if
(
dataType
.
equals
(
"eventAlarm"
)
&&
StringUtil
.
isNotEmpty
(
msg
)){
// 告警信号
mqttReceiveService
.
handlerMqttStationMessage
(
topic
,
msg
);
}
else
if
(
dataType
.
equals
(
"warning"
)
&&
StringUtil
.
isNotEmpty
(
msg
))
{
mqttWaringReceiveService
.
handlerMqttMessage
(
topic
,
msg
);
}
}
};
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/MqttWaringReceiveService.java
0 → 100644
View file @
2b98f3cb
package
com
.
yeejoin
.
equipmanage
.
service
;
public
interface
MqttWaringReceiveService
{
/**
* 处理稳压泵、水池、泡沫罐、管网压力等阈值相关数据
*
* @param topic 主题
* @param message 消息内容
*/
void
handlerMqttMessage
(
String
topic
,
String
message
);
}
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/MqttReceiveServiceImpl.java
View file @
2b98f3cb
...
...
@@ -1380,9 +1380,8 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
* @param iotDataVO iotDataVO
* @param equipmentSpecificIndex equipmentSpecificIndex
*/
p
rivate
boolean
doFoamTankLevel
(
IotDataVO
iotDataVO
,
EquipmentSpecificIndex
equipmentSpecificIndex
,
Map
<
String
,
String
>
messageBody
)
{
p
ublic
boolean
doFoamTankLevel
(
IotDataVO
iotDataVO
,
EquipmentSpecificIndex
equipmentSpecificIndex
,
Map
<
String
,
String
>
messageBody
)
{
boolean
alarmFlag
=
false
;
MessageModel
model
=
new
MessageModel
();
Map
<
String
,
Object
>
map
=
new
HashMap
<>();
String
indexKey
=
""
;
if
(
iotDataVO
.
getKey
().
equalsIgnoreCase
(
CAFS_FoamTank_FoamTankLevel
)
||
iotDataVO
.
getKey
().
equalsIgnoreCase
(
CAFS_WaterTank_WaterTankLevel
))
{
...
...
@@ -1400,9 +1399,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
if
(!
ObjectUtils
.
isEmpty
(
checkValue
))
{
nowValue
=
checkValue
;
}
//预警业务 泡沫罐 或 者管网压力
HashMap
<
String
,
String
>
extra
=
new
HashMap
<>();
extra
.
put
(
"useSource"
,
"center"
);
extra
.
put
(
"codingSystem"
,
"center"
);
...
...
@@ -1426,49 +1423,10 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
extra
,
"cafsWaterLevelOver"
,
tableContentVos
);
if
(
nowValue
.
compareTo
(
minValue
)
<
0
||
nowValue
.
compareTo
(
maxValue
)
>
0
)
{
String
body
=
""
;
if
(
nowValue
.
compareTo
(
minValue
)
<
0
)
{
body
=
"当前数值 "
+
nowValue
+
" 低于最低报警阈值 "
+
minValue
;
}
else
if
(
nowValue
.
compareTo
(
maxValue
)
>
0
)
{
body
=
"当前数值 "
+
nowValue
+
" 超过最高报警阈值 "
+
maxValue
;
}
messageBody
.
put
(
"messageBody"
,
equipmentSpecificIndex
.
getEquipmentSpecificName
()
+
"-"
+
body
);
String
bodyMain
=
String
.
format
(
"%s,- 当前数值%s,%s ,请及时查看处理。"
,
equipmentSpecificIndex
.
getEquipmentSpecificName
()
+
"-"
+
equipmentSpecificIndex
.
getLocation
(),
nowValue
,
nowValue
.
compareTo
(
minValue
)
<
0
?
"低于最低报警阈值"
+
minValue
:
"超过最高报警阈值"
+
maxValue
);
model
.
setTitle
(
"模拟量提醒"
);
model
.
setBody
(
bodyMain
);
model
.
setMsgType
(
"FoamTankOrPipeNetwork"
);
model
.
setSendTime
(
new
Date
());
model
.
setIsSendWeb
(
true
);
model
.
setCategory
(
1
);
model
.
setRelationId
(
equipmentSpecificIndex
.
getEquipmentSpecificId
().
toString
());
model
.
setIsSendApp
(
false
);
model
.
setTerminal
(
"WEB"
);
model
.
setRecivers
(
Collections
.
singletonList
(
"system"
));
Map
<
String
,
String
>
ext
=
new
HashMap
<>();
ext
.
put
(
"content"
,
body
);
ext
.
put
(
"type"
,
"模拟量超阈值提醒"
);
ext
.
put
(
"name"
,
equipmentSpecificIndex
.
getEquipmentSpecificName
());
ext
.
put
(
"time"
,
new
SimpleDateFormat
(
DateUtils
.
DATE_TIME_PATTERN
).
format
(
new
Date
()));
model
.
setExtras
(
ext
);
// try {
// Token token = remoteSecurityService.getServerToken();
// systemctlFeign.create(token.getAppKey(), token.getProduct(), token.getToke(), model);
// } catch (Exception e) {
// log.error("调用平台出错!");
// }
// log.info(String.format("调用平台消息服务成功:%s", JSON.toJSONString(model)));
alarmFlag
=
true
;
}
return
alarmFlag
;
}
p
rivate
void
doWaterStationWarning
(
String
bizOrgCode
,
String
bizOrgName
)
{
p
ublic
void
doWaterStationWarning
(
String
bizOrgCode
,
String
bizOrgName
)
{
List
<
Map
<
String
,
Object
>>
result
=
poolStatisticController
.
getStationWaterPanelInfo
(
bizOrgCode
).
getResult
();
String
indexValue
=
result
.
stream
()
.
map
(
map
->
(
BigDecimal
)
map
.
getOrDefault
(
"volumeBigDecimal"
,
new
BigDecimal
(
0
)))
...
...
@@ -1520,7 +1478,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
}
p
rivate
void
doPressurePumInfo
(
TopicEntityVo
topicEntity
,
EquipmentSpecificIndex
equipmentSpecificIndex
)
{
p
ublic
void
doPressurePumInfo
(
TopicEntityVo
topicEntity
,
EquipmentSpecificIndex
equipmentSpecificIndex
)
{
// 查询iot该稳压泵的启停次数 一个小时内
String
endDate
=
DateUtil
.
format
(
new
Date
(),
DatePattern
.
NORM_DATETIME_PATTERN
);
String
startDate
=
DateUtil
.
format
(
DateUtil
.
offsetHour
(
new
Date
(),
-
1
),
DatePattern
.
NORM_DATETIME_PATTERN
);
...
...
@@ -1585,7 +1543,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
* @param iotDataVO iotDataVO
* @param equipmentSpecificIndex equipmentSpecificIndex
*/
p
rivate
boolean
doWaterPoolLevel
(
IotDataVO
iotDataVO
,
EquipmentSpecificIndex
equipmentSpecificIndex
,
Map
<
String
,
String
>
messageBody
)
{
p
ublic
boolean
doWaterPoolLevel
(
IotDataVO
iotDataVO
,
EquipmentSpecificIndex
equipmentSpecificIndex
,
Map
<
String
,
String
>
messageBody
)
{
// 权限处理
PermissionInterceptorContext
.
setDataAuthRule
(
authKeyEnable
);
boolean
alarmFlag
=
false
;
...
...
@@ -1640,41 +1598,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
"waterLevelOver"
,
tableContentVos
);
}
if
(
nowValue
!=
null
&&(
nowValue
.
compareTo
(
minValue
)
<
0
||
nowValue
.
compareTo
(
maxValue
)
>
0
))
{
String
body
=
""
;
if
(
nowValue
.
compareTo
(
minValue
)
<
0
)
{
body
=
"当前数值 "
+
nowValue
+
" 低于最低报警阈值 "
+
minValue
;
}
else
if
(
nowValue
.
compareTo
(
maxValue
)
>
0
)
{
body
=
"当前数值 "
+
nowValue
+
" 超过最高报警阈值 "
+
maxValue
;
}
String
bodyMain
=
String
.
format
(
"%s,- 当前数值%s,%s ,请及时查看处理。"
,
map
.
get
(
"name"
),
nowValue
,
nowValue
.
compareTo
(
minValue
)
<
0
?
"低于最低报警阈值"
+
minValue
:
"超过最高报警阈值"
+
maxValue
);
model
.
setTitle
(
"模拟量提醒"
);
model
.
setBody
(
bodyMain
);
model
.
setMsgType
(
"WaterPoolKey"
);
model
.
setSendTime
(
new
Date
());
model
.
setIsSendWeb
(
true
);
model
.
setCategory
(
1
);
model
.
setRelationId
(
map
.
get
(
"id"
).
toString
());
model
.
setIsSendApp
(
false
);
model
.
setTerminal
(
"WEB"
);
model
.
setRecivers
(
Collections
.
singletonList
(
"system"
));
Map
<
String
,
String
>
ext
=
new
HashMap
<>();
ext
.
put
(
"content"
,
body
);
ext
.
put
(
"type"
,
"模拟量超阈值提醒"
);
ext
.
put
(
"name"
,
(
String
)
map
.
get
(
"name"
));
ext
.
put
(
"time"
,
new
SimpleDateFormat
(
DateUtils
.
DATE_TIME_PATTERN
).
format
(
new
Date
()));
model
.
setExtras
(
ext
);
// Token token = remoteSecurityService.getServerToken();
// systemctlFeign.create(token.getAppKey(), token.getProduct(), token.getToke(), model);
// log.info(String.format("调用平台消息服务成功:%s", JSON.toJSONString(model)));
alarmFlag
=
true
;
messageBody
.
put
(
"messageBody"
,
map
.
get
(
"name"
)
+
"-"
+
body
);
}
}
return
alarmFlag
;
}
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/MqttWarningReceiveServiceImpl.java
0 → 100644
View file @
2b98f3cb
package
com
.
yeejoin
.
equipmanage
.
service
.
impl
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex
;
import
com.yeejoin.equipmanage.common.vo.IotDataVO
;
import
com.yeejoin.equipmanage.common.vo.TopicEntityVo
;
import
com.yeejoin.equipmanage.service.MqttWaringReceiveService
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Transactional
;
import
java.util.Map
;
@Slf4j
@Service
public
class
MqttWarningReceiveServiceImpl
implements
MqttWaringReceiveService
{
/**
* 泡沫罐KEY
*/
private
final
static
String
CAFS_FoamTank_FoamTankLevel
=
"CAFS_FoamTank_FoamTankLevel"
;
/**
* 泡沫罐KEY
*/
private
final
static
String
FHS_PipePressureDetector_PipePressure
=
"FHS_PipePressureDetector_PipePressure"
;
/**
* 水池信息
*/
private
final
static
String
FHS_FirePoolDevice_WaterLevel
=
"FHS_FirePoolDevice_WaterLevel"
;
private
final
static
String
FHS_LevelDetector_WaterLevel
=
"FHS_LevelDetector_WaterLevel"
;
/**
* 水池信息
*/
private
final
static
String
FHS_WirelessliquidDetector_WaterLevel
=
"FHS_WirelessliquidDetector_WaterLevel"
;
/**
* 水箱液位
*/
private
final
static
String
CAFS_WaterTank_WaterTankLevel
=
"CAFS_WaterTank_WaterTankLevel"
;
private
final
static
String
FHS_PressurePump_Start
=
"FHS_PressurePump_Start"
;
private
static
final
String
REALTIME_IOT_INDEX_KEY
=
"realtime_iot_index_key"
;
private
static
final
String
REALTIME_IOT_INDEX_VALUE
=
"realtime_iot_index_value"
;
@Autowired
MqttReceiveServiceImpl
mqttReceiveService
;
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
)
public
void
handlerMqttMessage
(
String
topic
,
String
message
)
{
try
{
JSONObject
messageObj
=
JSON
.
parseObject
(
message
);
IotDataVO
iotDataVO
=
new
IotDataVO
();
iotDataVO
.
setKey
(
messageObj
.
get
(
REALTIME_IOT_INDEX_KEY
).
toString
());
iotDataVO
.
setValue
(
messageObj
.
get
(
REALTIME_IOT_INDEX_VALUE
).
toString
());
EquipmentSpecificIndex
equipmentSpeIndex
=
new
EquipmentSpecificIndex
();
equipmentSpeIndex
.
setEquipmentSpecificId
(
Long
.
valueOf
(
messageObj
.
get
(
"id"
).
toString
()));
equipmentSpeIndex
.
setEquipmentSpecificName
(
messageObj
.
get
(
"name"
).
toString
());
equipmentSpeIndex
.
setEquipmentSpecificCode
(
messageObj
.
get
(
"code"
).
toString
());
equipmentSpeIndex
.
setLocation
(
messageObj
.
get
(
"position"
).
toString
());
equipmentSpeIndex
.
setBizOrgCode
(
messageObj
.
get
(
"biz_org_code"
).
toString
());
equipmentSpeIndex
.
setBizOrgName
(
messageObj
.
get
(
"biz_org_name"
).
toString
());
equipmentSpeIndex
.
setIotCode
(
messageObj
.
get
(
"iot_code"
).
toString
());
isAlarmFlag
(
equipmentSpeIndex
,
iotDataVO
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
private
void
isAlarmFlag
(
EquipmentSpecificIndex
equipmentSpeIndex
,
IotDataVO
iotDataVO
)
{
//管网压力、泡沫罐信息、水箱液位告警处理
if
(
iotDataVO
.
getKey
().
equalsIgnoreCase
(
CAFS_FoamTank_FoamTankLevel
)
||
FHS_PipePressureDetector_PipePressure
.
equalsIgnoreCase
(
iotDataVO
.
getKey
())
||
iotDataVO
.
getKey
().
equalsIgnoreCase
(
CAFS_WaterTank_WaterTankLevel
))
{
mqttReceiveService
.
doFoamTankLevel
(
iotDataVO
,
equipmentSpeIndex
,
null
);
}
//消防水池液位处理
if
(
iotDataVO
.
getKey
().
equalsIgnoreCase
(
FHS_FirePoolDevice_WaterLevel
)
||
iotDataVO
.
getKey
().
equalsIgnoreCase
(
FHS_WirelessliquidDetector_WaterLevel
)
||
iotDataVO
.
getKey
().
equalsIgnoreCase
(
FHS_LevelDetector_WaterLevel
))
{
mqttReceiveService
.
doWaterPoolLevel
(
iotDataVO
,
equipmentSpeIndex
,
null
);
// 处理每站消防储水量不少于4000m³ 预警问题
mqttReceiveService
.
doWaterStationWarning
(
equipmentSpeIndex
.
getBizOrgCode
(),
equipmentSpeIndex
.
getBizOrgName
());
}
//稳压泵启动次数大于15次触发预警
if
(
iotDataVO
.
getKey
().
equalsIgnoreCase
(
FHS_PressurePump_Start
)
&&
"true"
.
equals
(
iotDataVO
.
getValue
().
toString
()))
{
TopicEntityVo
topicEntity
=
new
TopicEntityVo
();
topicEntity
.
setIotCode
(
equipmentSpeIndex
.
getIotCode
());
mqttReceiveService
.
doPressurePumInfo
(
topicEntity
,
equipmentSpeIndex
);
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment