Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
amos-boot-biz
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
项目统一框架
amos-boot-biz
Commits
ff70a79c
Commit
ff70a79c
authored
Jun 28, 2023
by
tangwei
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
新增监听
parent
9ef95dee
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
487 additions
and
0 deletions
+487
-0
EquipExecutorConfig.java
...om/yeejoin/amos/api/alarm/config/EquipExecutorConfig.java
+40
-0
BizInfo.java
...src/main/java/com/yeejoin/amos/api/alarm/dto/BizInfo.java
+36
-0
DynamicDetails.java
...n/java/com/yeejoin/amos/api/alarm/dto/DynamicDetails.java
+22
-0
TabContent.java
.../main/java/com/yeejoin/amos/api/alarm/dto/TabContent.java
+23
-0
WarningDto.java
.../main/java/com/yeejoin/amos/api/alarm/dto/WarningDto.java
+33
-0
BaseEntity.java
...in/java/com/yeejoin/amos/api/alarm/entity/BaseEntity.java
+37
-0
PointSystem.java
...n/java/com/yeejoin/amos/api/alarm/entity/PointSystem.java
+48
-0
PointSystemMapper.java
.../com/yeejoin/amos/api/alarm/mapper/PointSystemMapper.java
+16
-0
IPointSystemService.java
...m/yeejoin/amos/api/alarm/service/IPointSystemService.java
+14
-0
AlarmKafkaConsumer.java
...ejoin/amos/api/alarm/service/impl/AlarmKafkaConsumer.java
+76
-0
PointSystemServiceImpl.java
...n/amos/api/alarm/service/impl/PointSystemServiceImpl.java
+142
-0
HttpContentTypeUtil.java
...com/yeejoin/amos/api/alarm/utils/HttpContentTypeUtil.java
+0
-0
No files found.
amos-boot-data/amos-boot-data-alarm/src/main/java/com/yeejoin/amos/api/alarm/config/EquipExecutorConfig.java
0 → 100644
View file @
ff70a79c
package
com
.
yeejoin
.
amos
.
api
.
alarm
.
config
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.scheduling.annotation.EnableAsync
;
import
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
;
import
java.util.concurrent.Executor
;
import
java.util.concurrent.ThreadPoolExecutor
;
@Slf4j
@Configuration
@EnableAsync
public
class
EquipExecutorConfig
{
@Bean
(
name
=
"equipAsyncExecutor"
)
public
Executor
asyncServiceExecutor
()
{
ThreadPoolTaskExecutor
executor
=
new
ThreadPoolTaskExecutor
();
//配置核心线程数
executor
.
setCorePoolSize
(
10
);
//配置最大线程数
executor
.
setMaxPoolSize
(
500
);
//配置队列大小
executor
.
setQueueCapacity
(
2000
);
//配置线程池中的线程的名称前缀
executor
.
setThreadNamePrefix
(
"namePrefix"
);
//线程池维护线程所允许的空闲时间
executor
.
setKeepAliveSeconds
(
30
);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行--拒绝策略
executor
.
setRejectedExecutionHandler
(
new
ThreadPoolExecutor
.
CallerRunsPolicy
());
//执行初始化
executor
.
initialize
();
//等待所有任务结束后再关闭线程池
executor
.
setWaitForTasksToCompleteOnShutdown
(
true
);
return
executor
;
}
}
amos-boot-data/amos-boot-data-alarm/src/main/java/com/yeejoin/amos/api/alarm/dto/BizInfo.java
0 → 100644
View file @
ff70a79c
package
com
.
yeejoin
.
amos
.
api
.
alarm
.
dto
;
import
lombok.Data
;
import
java.util.List
;
/**
* @description:
* @author: tw
* @createDate: 2023/6/19
*/
@Data
public
class
BizInfo
{
private
String
sourceAttributionDesc
;
private
String
sourceAttribution
;
private
List
<
DynamicDetails
>
dynamicDetails
;
private
String
warningObjectCode
;
private
String
warningTime
;
private
String
warningObjectName
;
public
BizInfo
(
String
sourceAttributionDesc
,
String
sourceAttribution
,
List
<
DynamicDetails
>
dynamicDetails
,
String
warningObjectCode
,
String
warningTime
,
String
warningObjectName
)
{
this
.
sourceAttributionDesc
=
sourceAttributionDesc
;
this
.
sourceAttribution
=
sourceAttribution
;
this
.
dynamicDetails
=
dynamicDetails
;
this
.
warningObjectCode
=
warningObjectCode
;
this
.
warningTime
=
warningTime
;
this
.
warningObjectName
=
warningObjectName
;
}
}
amos-boot-data/amos-boot-data-alarm/src/main/java/com/yeejoin/amos/api/alarm/dto/DynamicDetails.java
0 → 100644
View file @
ff70a79c
package
com
.
yeejoin
.
amos
.
api
.
alarm
.
dto
;
import
lombok.Data
;
import
java.util.List
;
/**
* @description:
* @author: tw
* @createDate: 2023/6/19
*/
@Data
public
class
DynamicDetails
{
private
String
tabName
;
private
List
<
TabContent
>
tabContent
;
public
DynamicDetails
(
String
tabName
,
List
<
TabContent
>
tabContent
)
{
this
.
tabName
=
tabName
;
this
.
tabContent
=
tabContent
;
}
}
amos-boot-data/amos-boot-data-alarm/src/main/java/com/yeejoin/amos/api/alarm/dto/TabContent.java
0 → 100644
View file @
ff70a79c
package
com
.
yeejoin
.
amos
.
api
.
alarm
.
dto
;
import
lombok.Data
;
/**
* @description:
* @author: tw
* @createDate: 2023/6/19
*/
@Data
public
class
TabContent
{
private
String
label
;
private
String
type
;
private
Object
value
;
private
String
key
;
public
TabContent
(
String
label
,
String
type
,
Object
value
,
String
key
)
{
this
.
label
=
label
;
this
.
type
=
type
;
this
.
value
=
value
;
this
.
key
=
key
;
}
}
amos-boot-data/amos-boot-data-alarm/src/main/java/com/yeejoin/amos/api/alarm/dto/WarningDto.java
0 → 100644
View file @
ff70a79c
package
com
.
yeejoin
.
amos
.
api
.
alarm
.
dto
;
import
lombok.Data
;
import
java.util.List
;
/**
* @description:
* @author: tw
* @createDate: 2023/6/19
*/
@Data
public
class
WarningDto
{
private
BizInfo
bizInfo
;
private
String
indexKey
;
private
String
indexValue
;
private
String
traceId
;
public
WarningDto
(
String
indexKey
,
String
indexValue
,
String
traceId
,
String
sourceAttributionDesc
,
String
sourceAttribution
,
List
<
DynamicDetails
>
dynamicDetails
,
String
warningObjectCode
,
String
warningTime
,
String
warningObjectName
)
{
this
.
bizInfo
=
new
BizInfo
(
sourceAttributionDesc
,
sourceAttribution
,
dynamicDetails
,
warningObjectCode
,
warningTime
,
warningObjectName
);
this
.
indexKey
=
indexKey
;
this
.
indexValue
=
indexValue
;
this
.
traceId
=
traceId
;
}
}
amos-boot-data/amos-boot-data-alarm/src/main/java/com/yeejoin/amos/api/alarm/entity/BaseEntity.java
0 → 100644
View file @
ff70a79c
package
com
.
yeejoin
.
amos
.
api
.
alarm
.
entity
;
import
com.baomidou.mybatisplus.annotation.FieldFill
;
import
com.baomidou.mybatisplus.annotation.IdType
;
import
com.baomidou.mybatisplus.annotation.TableField
;
import
com.baomidou.mybatisplus.annotation.TableId
;
import
com.fasterxml.jackson.databind.annotation.JsonSerialize
;
import
com.fasterxml.jackson.databind.ser.std.ToStringSerializer
;
import
lombok.Data
;
import
lombok.experimental.Accessors
;
import
java.io.Serializable
;
import
java.util.Date
;
/**
* @description: 公共实体
* @author: duanwei
**/
@Data
@Accessors
(
chain
=
true
)
public
class
BaseEntity
implements
Serializable
{
private
static
final
long
serialVersionUID
=
-
5464322936854328207L
;
@TableId
(
type
=
IdType
.
ID_WORKER
)
@JsonSerialize
(
using
=
ToStringSerializer
.
class
)
private
Long
id
;
/**
* 新增和更新执行
*/
@TableField
(
value
=
"create_date"
,
fill
=
FieldFill
.
INSERT
)
private
Date
createDate
;
}
amos-boot-data/amos-boot-data-alarm/src/main/java/com/yeejoin/amos/api/alarm/entity/PointSystem.java
0 → 100644
View file @
ff70a79c
package
com
.
yeejoin
.
amos
.
api
.
alarm
.
entity
;
import
com.baomidou.mybatisplus.annotation.TableField
;
import
com.baomidou.mybatisplus.annotation.TableName
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
import
lombok.Data
;
import
lombok.EqualsAndHashCode
;
import
lombok.experimental.Accessors
;
/**
* @description:
* @author: tw
* @createDate: 2023/6/19
*/
@Data
@EqualsAndHashCode
(
callSuper
=
true
)
@Accessors
(
chain
=
true
)
@TableName
(
"dz_point_system"
)
@ApiModel
(
value
=
"PointSystem对象"
,
description
=
""
)
public
class
PointSystem
extends
BaseEntity
{
@ApiModelProperty
(
value
=
"场站"
)
@TableField
(
"station"
)
private
String
station
;
@ApiModelProperty
(
value
=
"二维码"
)
@TableField
(
"number"
)
private
String
number
;
@ApiModelProperty
(
value
=
"类型"
)
@TableField
(
"type"
)
private
String
type
;
@ApiModelProperty
(
value
=
"'地址'"
)
@TableField
(
"address"
)
private
String
address
;
@ApiModelProperty
(
value
=
"测点类型"
)
@TableField
(
"point_type"
)
private
String
pointType
;
@ApiModelProperty
(
value
=
"测点值"
)
@TableField
(
"value"
)
private
String
value
;
@ApiModelProperty
(
value
=
"功能码"
)
@TableField
(
"function_num"
)
private
String
functionNum
;
@ApiModelProperty
(
value
=
"kks码"
)
@TableField
(
"kks"
)
private
String
kks
;
}
amos-boot-data/amos-boot-data-alarm/src/main/java/com/yeejoin/amos/api/alarm/mapper/PointSystemMapper.java
0 → 100644
View file @
ff70a79c
package
com
.
yeejoin
.
amos
.
api
.
alarm
.
mapper
;
import
com.baomidou.mybatisplus.core.mapper.BaseMapper
;
import
com.yeejoin.amos.api.alarm.entity.PointSystem
;
/**
* @description:
* @author: tw
* @createDate: 2023/6/19
*/
public
interface
PointSystemMapper
extends
BaseMapper
<
PointSystem
>
{
//推送预警
public
void
sendWarning
();
}
amos-boot-data/amos-boot-data-alarm/src/main/java/com/yeejoin/amos/api/alarm/service/IPointSystemService.java
0 → 100644
View file @
ff70a79c
package
com
.
yeejoin
.
amos
.
api
.
alarm
.
service
;
/**
* @description:
* @author: tw
* @createDate: 2023/6/19
*/
public
interface
IPointSystemService
{
//触发风险预警
public
void
sendWarning
(
String
address
,
String
value
,
String
valueLabe
);
}
amos-boot-data/amos-boot-data-alarm/src/main/java/com/yeejoin/amos/api/alarm/service/impl/AlarmKafkaConsumer.java
0 → 100644
View file @
ff70a79c
package
com
.
yeejoin
.
amos
.
api
.
alarm
.
service
.
impl
;
import
org.apache.kafka.clients.consumer.ConsumerRecord
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.kafka.annotation.KafkaListener
;
import
org.springframework.kafka.annotation.TopicPartition
;
import
org.springframework.kafka.support.Acknowledgment
;
import
org.springframework.stereotype.Service
;
/**
* @description: 监听设备告警信息
* @author: tw
* @createDate: 2023/6/27
*/
@Service
public
class
AlarmKafkaConsumer
{
@Autowired
PointSystemServiceImpl
pointSystemServiceImpl
;
//消费者来处理消息
@KafkaListener
(
id
=
"user1"
,
topics
={
"${kafka.equipment.alarm}"
})
public
void
message1
(
String
record
,
Acknowledgment
ack
){
// 处理业务
String
date
=
record
;
//异步触发预警
pointSystemServiceImpl
.
sendWarningAsync
(
date
);
//手动提交
ack
.
acknowledge
();
}
// public void message1( ConsumerRecord<?, ?> record, Acknowledgment ack){
// // 消费的哪个topic、partition的消息,打印出消息内容
//
// StringBuffer sb = new StringBuffer();
// // 主题
// sb.append(record.topic() + "-");
// // 分区
// sb.append(record.partition() + "-");
// // 需要消费的值
// sb.append(record.value() + "-");
// // 位移
// sb.append(record.offset());
//
// System.out.println( "消费者进行消费:"+ sb);
// ack.acknowledge();
//
// }
// // 简单消费者,groupId可以任意起
// @KafkaListener(id = "Consumer0", groupId = "jf0-group", topics = "jf1", topicPartitions = {
// @TopicPartition(topic = "jf1", partitions = {"0"}),
// }, containerFactory = "kafkaListenerContainerFactory")
// public void consumer0(ConsumerRecord<String, String> records, Acknowledgment ack) {
// this.message1(records,ack);
// }
//
// @KafkaListener(id = "Consumer1", groupId = "jf1-group", topics = "jf1", topicPartitions = {
// @TopicPartition(topic = "jf1", partitions = {"1"}),
// }, containerFactory = "kafkaListenerContainerFactory")
// public void consumer1(ConsumerRecord<String, String> records, Acknowledgment ack) {
// this.message1(records,ack);
// }
//
// @KafkaListener(id = "Consumer2", groupId = "jf2-group", topics = "jf1", topicPartitions = {
// @TopicPartition(topic = "jf1", partitions = {"2"}),
// }, containerFactory = "kafkaListenerContainerFactory")
// public void consumer3(ConsumerRecord<String, String> records, Acknowledgment ack) {
// this.message1(records,ack);
// }
}
amos-boot-data/amos-boot-data-alarm/src/main/java/com/yeejoin/amos/api/alarm/service/impl/PointSystemServiceImpl.java
0 → 100644
View file @
ff70a79c
package
com
.
yeejoin
.
amos
.
api
.
alarm
.
service
.
impl
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONArray
;
import
com.alibaba.fastjson.JSONObject
;
import
com.baomidou.mybatisplus.core.conditions.query.QueryWrapper
;
import
com.baomidou.mybatisplus.extension.service.impl.ServiceImpl
;
import
com.yeejoin.amos.api.alarm.dto.DynamicDetails
;
import
com.yeejoin.amos.api.alarm.dto.TabContent
;
import
com.yeejoin.amos.api.alarm.dto.WarningDto
;
import
com.yeejoin.amos.api.alarm.entity.PointSystem
;
import
com.yeejoin.amos.api.alarm.mapper.PointSystemMapper
;
import
com.yeejoin.amos.api.alarm.service.IPointSystemService
;
import
com.yeejoin.amos.api.alarm.utils.HttpContentTypeUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.scheduling.annotation.Async
;
import
org.springframework.stereotype.Service
;
import
org.typroject.tyboot.component.emq.EmqKeeper
;
import
java.text.SimpleDateFormat
;
import
java.util.*
;
/**
* @description:
* @author: tw
* @createDate: 2023/6/19
*/
@Service
public
class
PointSystemServiceImpl
extends
ServiceImpl
<
PointSystemMapper
,
PointSystem
>
implements
IPointSystemService
{
private
static
final
Logger
logger
=
LogManager
.
getLogger
(
PointSystemServiceImpl
.
class
);
@Autowired
PointSystemMapper
pointSystemMapper
;
@Value
(
"${power.station.url}"
)
private
String
powerStationUrl
;
private
final
String
TABNAME
=
"预警问题"
;
private
final
String
TEXT
=
"text"
;
@Value
(
"${power.station.warning:104/data/analysis}"
)
private
String
STATIONWARNING
;
@Autowired
protected
EmqKeeper
emqKeeper
;
@Async
(
"equipAsyncExecutor"
)
public
void
sendWarningAsync
(
String
date
){
try
{
logger
.
info
(
"收到告警信息"
+
date
);
com
.
alibaba
.
fastjson
.
JSONObject
messageObj
=
JSON
.
parseObject
(
date
);
String
address
=
messageObj
.
get
(
"address"
).
toString
();
String
value
=
messageObj
.
get
(
"value"
).
toString
();
String
valueLabe
=
messageObj
.
get
(
"valueLabel"
).
toString
();
this
.
sendWarning
(
address
,
value
,
valueLabe
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
@Override
public
void
sendWarning
(
String
address
,
String
value
,
String
valueLabe
)
{
try
{
//通过测点地址获取,和对应值 获取kks
QueryWrapper
<
PointSystem
>
pointSystemWrapper
=
new
QueryWrapper
<>();
pointSystemWrapper
.
lambda
().
eq
(
PointSystem:
:
getAddress
,
address
);
pointSystemWrapper
.
lambda
().
eq
(
PointSystem:
:
getValue
,
value
);
PointSystem
pointSystem
=
pointSystemMapper
.
selectOne
(
pointSystemWrapper
);
if
(
pointSystem
==
null
)
{
throw
new
RuntimeException
(
"获取kks码失败!"
);
}
//调用获取设备相关信息
Map
<
String
,
String
>
maps
=
new
HashMap
<>();
maps
.
put
(
"type"
,
"equipinfo"
);
maps
.
put
(
"kksbm"
,
pointSystem
.
getKks
());
String
data
=
HttpContentTypeUtil
.
sendHttpPost
(
powerStationUrl
,
maps
);
if
(
StringUtils
.
isEmpty
(
data
)
||
!(
Boolean
)
JSON
.
parseObject
(
data
).
get
(
"success"
))
{
throw
new
RuntimeException
(
"获取设备信息失败!"
);
}
JSONObject
json
=
JSON
.
parseObject
(
data
);
JSONObject
jsond
=
(
JSONObject
)
json
.
get
(
"dataset"
);
JSONArray
list
=
(
JSONArray
)
jsond
.
get
(
"datas"
);
JSONObject
eqdata
=
null
;
if
(
list
==
null
||
list
.
isEmpty
())
{
throw
new
RuntimeException
(
"获取设备信息失败!"
);
}
eqdata
=
(
JSONObject
)
list
.
get
(
0
);
//组装数据,发送预警
WarningDto
warningDto
=
setWarningDto
(
pointSystem
,
eqdata
,
valueLabe
);
emqKeeper
.
getMqttClient
().
publish
(
STATIONWARNING
,
JSON
.
toJSONString
(
warningDto
).
getBytes
(),
0
,
false
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
throw
new
RuntimeException
(
"预警消息发送失败!"
);
}
}
public
WarningDto
setWarningDto
(
PointSystem
pointSystem
,
JSONObject
eqdata
,
String
valueLabe
){
SimpleDateFormat
sdf
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
);
String
time
=
sdf
.
format
(
new
Date
());
String
warningObjectCode
=
pointSystem
.
getKks
();
List
<
TabContent
>
tabContent
=
new
ArrayList
<>();
tabContent
.
add
(
new
TabContent
(
"KKS编码"
,
TEXT
,
warningObjectCode
,
"key1"
));
tabContent
.
add
(
new
TabContent
(
"设备名称"
,
TEXT
,
eqdata
.
get
(
"kksms"
),
"key2"
));
tabContent
.
add
(
new
TabContent
(
"告警原因"
,
TEXT
,
valueLabe
,
"key3"
));
tabContent
.
add
(
new
TabContent
(
"发生时间"
,
TEXT
,
time
,
"key4"
));
DynamicDetails
dynamicDetails
=
new
DynamicDetails
(
TABNAME
,
tabContent
);
List
<
DynamicDetails
>
dynamicDetailsList
=
new
ArrayList
<>();
dynamicDetailsList
.
add
(
dynamicDetails
);
StringBuilder
indexKey
=
new
StringBuilder
(
pointSystem
.
getStation
())
.
append
(
"#"
)
.
append
(
pointSystem
.
getNumber
())
.
append
(
"#"
)
.
append
(
pointSystem
.
getFunctionNum
());
String
indexValue
=
valueLabe
;
WarningDto
WarningDto
=
new
WarningDto
(
indexKey
.
toString
(),
indexValue
,
null
,
(
String
)
eqdata
.
get
(
"sourceAttributionDesc"
),
(
String
)
eqdata
.
get
(
"sourceAttribution"
),
dynamicDetailsList
,
warningObjectCode
,
time
,
(
String
)
eqdata
.
get
(
"kksms"
)
);
return
WarningDto
;
}
}
amos-boot-data/amos-boot-data-alarm/src/main/java/com/yeejoin/amos/api/alarm/utils/HttpContentTypeUtil.java
0 → 100644
View file @
ff70a79c
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment