Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
amos-boot-bus
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
高建强
amos-boot-bus
Commits
97b07924
Commit
97b07924
authored
Jun 10, 2022
by
KeYong
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
解决冲突
parent
fabb6e9c
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
163 additions
and
183 deletions
+163
-183
DataSyncTypeEnum.java
...ain/java/com/boot/bus/sqlsync/enums/DataSyncTypeEnum.java
+29
-22
ParWorker.java
...n/java/com/boot/bus/sqlsync/async/parallel/ParWorker.java
+4
-4
MqttReceiveConfig.java
...ain/java/com/boot/bus/sqlsync/emqx/MqttReceiveConfig.java
+0
-4
MqttReceiveServiceImpl.java
...boot/bus/sqlsync/service/impl/MqttReceiveServiceImpl.java
+130
-0
SyncMqttMessageService.java
...boot/bus/sqlsync/service/impl/SyncMqttMessageService.java
+0
-153
No files found.
amos-boot-module/amos-boot-module-api/amos-boot-module-sqlsync-api/src/main/java/com/boot/bus/sqlsync/enums/DataSyncTypeEnum.java
View file @
97b07924
package
com
.
boot
.
bus
.
sqlsync
.
enums
;
import
lombok.AllArgsConstructor
;
import
lombok.Getter
;
import
lombok.NoArgsConstructor
;
/**
* <h1>同步数据类型</h1>
*
* @Author SingleTian
* @Date 2021-04-01 09:20
* @author keyong
* @title: DataSyncTypeEnum
* <pre>
* @description: TODO
* </pre>
* @date 2022/6/10 14:08
*/
@Getter
@AllArgsConstructor
@NoArgsConstructor
public
enum
DataSyncTypeEnum
{
// 站端相关枚举
CONTINGENCY_PLAN_DETAIL
(
"11"
,
"cs/v1/fireASF/dataSync"
),
CONTINGENCY_PLAN_OPERATION_RECORD
(
"11"
,
"cs/v1/fireASF/dataSync"
),
CONTINGENCY_ORIGINAL_DATA
(
"11"
,
"cs/v1/fireASF/dataSync"
),
CONTINGENCY_PLAN_INSTANCE
(
"11"
,
"cs/v1/fireASF/dataSync"
);
CONTINGENCY_PLAN_INSTANCE
(
"11"
,
"cs/v1/fireASF/dataSync"
),
// 巡检相关枚举
PATROL_CHECK
(
"11"
,
"cs/v1/fireASF/dataSync"
),
PATROL_CHECK_INPUT
(
"11"
,
"cs/v1/fireASF/dataSync"
),
PATROL_CHECK_SHOT
(
"11"
,
"cs/v1/fireASF/dataSync"
),
PATROL_LATENT_DANGER
(
"11"
,
"cs/v1/fireASF/dataSync"
),
PATROL_LATENT_DANGER_FLOW_RECORD
(
"11"
,
"cs/v1/fireASF/dataSync"
),
PATROL_LATENT_DANGER_PATROL
(
"11"
,
"cs/v1/fireASF/dataSync"
),
PATROL_LATENT_DANGER_PHOTO
(
"11"
,
"cs/v1/fireASF/dataSync"
),
PATROL_PLAN_TASK
(
"11"
,
"cs/v1/fireASF/dataSync"
),
PATROL_PLAN_TASK_DETAIL
(
"11"
,
"cs/v1/fireASF/dataSync"
);
/**
* 资源类型编码
*/
private
final
String
sourceCode
;
private
String
sourceCode
;
/**
* mqtt主题
*/
private
final
String
mqTopic
;
DataSyncTypeEnum
(
String
sourceCode
,
String
mqTopic
)
{
this
.
sourceCode
=
sourceCode
;
this
.
mqTopic
=
mqTopic
;
}
public
String
getSourceCode
()
{
return
sourceCode
;
}
public
String
getMqTopic
()
{
return
mqTopic
;
}
private
String
mqTopic
;
}
amos-boot-module/amos-boot-module-bus/amos-boot-module-sqlsync-bus/src/main/java/com/boot/bus/sqlsync/async/parallel/ParWorker.java
View file @
97b07924
...
...
@@ -7,7 +7,7 @@ import com.boot.bus.sqlsync.async.executor.timer.SystemClock;
import
com.boot.bus.sqlsync.async.worker.WorkResult
;
import
com.boot.bus.sqlsync.async.wrapper.WorkerWrapper
;
import
com.boot.bus.sqlsync.message.DataSyncMessage
;
import
com.boot.bus.sqlsync.service.i
mpl.SyncMqttMessag
eService
;
import
com.boot.bus.sqlsync.service.i
nfc.MqttReceiv
eService
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
...
...
@@ -20,17 +20,17 @@ import java.util.Map;
public
class
ParWorker
implements
IWorker
<
DataSyncMessage
,
String
>,
ICallback
<
DataSyncMessage
,
String
>
{
// @Autowired
private
static
SyncMqttMessag
eService
mqttMessageService
;
private
static
MqttReceiv
eService
mqttMessageService
;
@Autowired
public
void
setDatastore
(
SyncMqttMessag
eService
mqttMessageService
)
{
public
void
setDatastore
(
MqttReceiv
eService
mqttMessageService
)
{
ParWorker
.
mqttMessageService
=
mqttMessageService
;
}
@Override
public
String
action
(
DataSyncMessage
object
,
Map
<
String
,
WorkerWrapper
>
allWrappers
)
{
try
{
mqttMessageService
.
syncData
(
object
);
// mqttMessageService.handlerMqttIncrementMessage
(object);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
...
...
amos-boot-module/amos-boot-module-bus/amos-boot-module-sqlsync-bus/src/main/java/com/boot/bus/sqlsync/emqx/MqttReceiveConfig.java
View file @
97b07924
package
com
.
boot
.
bus
.
sqlsync
.
emqx
;
import
com.boot.bus.sqlsync.service.impl.SyncMqttMessageService
;
import
com.boot.bus.sqlsync.service.infc.MqttReceiveService
;
import
org.eclipse.paho.client.mqttv3.MqttConnectOptions
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
@@ -52,9 +51,6 @@ public class MqttReceiveConfig {
@Autowired
MqttReceiveService
mqttReceiveService
;
@Autowired
private
SyncMqttMessageService
syncMqttMessageService
;
// 全局变量adapter
public
MqttPahoMessageDrivenChannelAdapter
adapter
;
...
...
amos-boot-module/amos-boot-module-bus/amos-boot-module-sqlsync-bus/src/main/java/com/boot/bus/sqlsync/service/impl/MqttReceiveServiceImpl.java
View file @
97b07924
package
com
.
boot
.
bus
.
sqlsync
.
service
.
impl
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.boot.bus.sqlsync.enums.DataSyncOperationEnum
;
import
com.boot.bus.sqlsync.enums.DataSyncTypeEnum
;
import
com.boot.bus.sqlsync.message.DataSyncMessage
;
import
com.boot.bus.sqlsync.service.infc.IContingencyPlanInstance
;
import
com.boot.bus.sqlsync.service.infc.MqttReceiveService
;
import
com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Transactional
;
import
java.io.Serializable
;
import
java.util.List
;
import
java.util.stream.Collectors
;
/**
* @author keyong
...
...
@@ -22,6 +29,8 @@ import java.util.List;
@Slf4j
@Service
public
class
MqttReceiveServiceImpl
implements
MqttReceiveService
{
@Autowired
private
IContingencyPlanInstance
contingencyPlanInstance
;
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
)
...
...
@@ -29,6 +38,127 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
log
.
info
(
String
.
format
(
"收到mqtt消息:%s"
,
message
));
DataSyncMessage
dataSyncMessage
=
JSON
.
parseObject
(
message
,
DataSyncMessage
.
class
);
List
<
Serializable
>
data
=
dataSyncMessage
.
getData
();
DataSyncTypeEnum
type
=
dataSyncMessage
.
getType
();
DataSyncOperationEnum
operation
=
dataSyncMessage
.
getOperation
();
assert
dataSyncMessage
.
getType
()
!=
null
&&
dataSyncMessage
.
getOperation
()
!=
null
:
"同步消息体信息不足"
;
if
(
data
==
null
||
data
.
isEmpty
())
{
return
;
}
switch
(
type
)
{
case
CONTINGENCY_PLAN_INSTANCE:
{
switch
(
operation
)
{
case
DELETE:
{
contingencyPlanInstance
.
astDeleteByIds
(
data
.
stream
().
map
(
x
->
((
JSONObject
)
x
).
toJavaObject
(
ContingencyPlanInstance
.
class
).
getId
()).
collect
(
Collectors
.
toList
()));
break
;
}
default
:
{
contingencyPlanInstance
.
astSaveOrUpdateBatch
(
data
.
stream
().
map
(
x
->
((
JSONObject
)
x
).
toJavaObject
(
ContingencyPlanInstance
.
class
)).
collect
(
Collectors
.
toList
()));
}
}
break
;
}
case
PATROL_CHECK:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
PATROL_CHECK_INPUT:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
PATROL_CHECK_SHOT:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
PATROL_LATENT_DANGER:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
PATROL_LATENT_DANGER_FLOW_RECORD:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
PATROL_LATENT_DANGER_PATROL:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
PATROL_LATENT_DANGER_PHOTO:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
PATROL_PLAN_TASK:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
PATROL_PLAN_TASK_DETAIL:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
default
:
// Async.shutDown();
}
}
}
amos-boot-module/amos-boot-module-bus/amos-boot-module-sqlsync-bus/src/main/java/com/boot/bus/sqlsync/service/impl/SyncMqttMessageService.java
deleted
100644 → 0
View file @
fabb6e9c
package
com
.
boot
.
bus
.
sqlsync
.
service
.
impl
;
import
com.alibaba.fastjson.JSONObject
;
import
com.boot.bus.sqlsync.enums.DataSyncOperationEnum
;
import
com.boot.bus.sqlsync.enums.DataSyncTypeEnum
;
import
com.boot.bus.sqlsync.message.DataSyncMessage
;
import
com.boot.bus.sqlsync.service.infc.IContingencyPlanInstance
;
import
com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
org.springframework.stereotype.Service
;
import
java.io.Serializable
;
import
java.util.List
;
import
java.util.stream.Collectors
;
/**
* @ProjectName: YeeFireDataProcessRoot
* @Package: com.yeejoin.dataprocess.service.impl
* @ClassName: SyncMqttMessageService
* @Author: Jianqiang Gao
* @Description: 同步mqtt消息
* @Date: 2021/7/8 16:54
* @Version: 1.0
*/
@Service
(
"SyncMqttMessageService"
)
public
class
SyncMqttMessageService
{
@Autowired
private
IContingencyPlanInstance
contingencyPlanInstance
;
@Autowired
private
RedisTemplate
redisTemplate
;
/**
* @param message
*/
public
void
syncData
(
DataSyncMessage
message
)
{
DataSyncTypeEnum
type
=
message
.
getType
();
DataSyncOperationEnum
operation
=
message
.
getOperation
();
List
<
Serializable
>
data
=
message
.
getData
();
assert
message
.
getType
()
!=
null
&&
message
.
getOperation
()
!=
null
:
"同步消息体信息不足"
;
if
(
data
==
null
||
data
.
isEmpty
())
{
return
;
}
switch
(
type
)
{
case
CONTINGENCY_PLAN_INSTANCE:
{
switch
(
operation
)
{
case
DELETE:
{
contingencyPlanInstance
.
astDeleteByIds
(
data
.
stream
().
map
(
x
->
((
JSONObject
)
x
).
toJavaObject
(
ContingencyPlanInstance
.
class
).
getId
()).
collect
(
Collectors
.
toList
()));
break
;
}
default
:
{
contingencyPlanInstance
.
astSaveOrUpdateBatch
(
data
.
stream
().
map
(
x
->
((
JSONObject
)
x
).
toJavaObject
(
ContingencyPlanInstance
.
class
)).
collect
(
Collectors
.
toList
()));
}
}
break
;
}
case
CONTINGENCY_PLAN_DETAIL:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
CONTINGENCY_PLAN_DETAIL:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
CONTINGENCY_PLAN_DETAIL:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
CONTINGENCY_PLAN_DETAIL:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
CONTINGENCY_PLAN_DETAIL:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
CONTINGENCY_PLAN_DETAIL:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
CONTINGENCY_PLAN_DETAIL:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
case
CONTINGENCY_PLAN_DETAIL:
{
switch
(
operation
)
{
case
DELETE:
{
break
;
}
default
:
{
}
}
}
default
:
// Async.shutDown();
}
}
}
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment