Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
amos-boot-biz
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
项目统一框架
amos-boot-biz
Commits
20970b49
Commit
20970b49
authored
Oct 09, 2023
by
litengwei
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
任务 19992 装备报废逻辑
parent
3539ed33
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
65 additions
and
95 deletions
+65
-95
DateUtils.java
.../java/com/yeejoin/equipmanage/common/utils/DateUtils.java
+1
-0
EquipmentDetailController.java
...oin/equipmanage/controller/EquipmentDetailController.java
+5
-25
IdxFeign.java
...src/main/java/com/yeejoin/equipmanage/fegin/IdxFeign.java
+3
-1
EquipmentSpecificSerivceImpl.java
...quipmanage/service/impl/EquipmentSpecificSerivceImpl.java
+23
-3
MqttReceiveServiceImpl.java
...join/equipmanage/service/impl/MqttReceiveServiceImpl.java
+1
-0
RiskSourceServiceImpl.java
...ejoin/equipmanage/service/impl/RiskSourceServiceImpl.java
+1
-1
ScrapServiceImpl.java
...om/yeejoin/equipmanage/service/impl/ScrapServiceImpl.java
+8
-0
EquipmentSpecificIndexMapper.xml
...rc/main/resources/mapper/EquipmentSpecificIndexMapper.xml
+1
-1
KafkaConsumerService.java
.../com/yeejoin/amos/message/kafka/KafkaConsumerService.java
+16
-64
topic.json
...mos-boot-utils-message/src/main/resources/json/topic.json
+6
-0
No files found.
amos-boot-module/amos-boot-module-api/amos-boot-module-equip-api/src/main/java/com/yeejoin/equipmanage/common/utils/DateUtils.java
View file @
20970b49
...
@@ -24,6 +24,7 @@ public class DateUtils {
...
@@ -24,6 +24,7 @@ public class DateUtils {
private
static
final
Logger
logs
=
LoggerFactory
.
getLogger
(
DateUtils
.
class
);
private
static
final
Logger
logs
=
LoggerFactory
.
getLogger
(
DateUtils
.
class
);
public
static
final
String
DATE_TIME_PATTERN
=
"yyyy-MM-dd HH:mm:ss"
;
public
static
final
String
DATE_TIME_PATTERN
=
"yyyy-MM-dd HH:mm:ss"
;
public
static
final
String
DATE_TIME_TT_PATTERN
=
"yyyy-MM-dd'T'HH:mm:ss"
;
public
static
final
String
MINUTE_PATTERN
=
"yyyy-MM-dd HH:mm"
;
public
static
final
String
MINUTE_PATTERN
=
"yyyy-MM-dd HH:mm"
;
public
static
final
String
HOUR_PATTERN
=
"yyyy-MM-dd HH"
;
public
static
final
String
HOUR_PATTERN
=
"yyyy-MM-dd HH"
;
public
static
final
String
DATE_PATTERN
=
"yyyy-MM-dd"
;
public
static
final
String
DATE_PATTERN
=
"yyyy-MM-dd"
;
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/controller/EquipmentDetailController.java
View file @
20970b49
...
@@ -22,6 +22,7 @@ import com.yeejoin.equipmanage.common.enums.SourceTypeEnum;
...
@@ -22,6 +22,7 @@ import com.yeejoin.equipmanage.common.enums.SourceTypeEnum;
import
com.yeejoin.equipmanage.common.utils.*
;
import
com.yeejoin.equipmanage.common.utils.*
;
import
com.yeejoin.equipmanage.common.vo.EquipmentDate
;
import
com.yeejoin.equipmanage.common.vo.EquipmentDate
;
import
com.yeejoin.equipmanage.dto.ExcelDto
;
import
com.yeejoin.equipmanage.dto.ExcelDto
;
import
com.yeejoin.equipmanage.fegin.IdxFeign
;
import
com.yeejoin.equipmanage.mapper.*
;
import
com.yeejoin.equipmanage.mapper.*
;
import
com.yeejoin.equipmanage.service.*
;
import
com.yeejoin.equipmanage.service.*
;
import
com.yeejoin.equipmanage.service.impl.FireFightingSystemServiceImpl
;
import
com.yeejoin.equipmanage.service.impl.FireFightingSystemServiceImpl
;
...
@@ -124,6 +125,9 @@ public class EquipmentDetailController extends AbstractBaseController {
...
@@ -124,6 +125,9 @@ public class EquipmentDetailController extends AbstractBaseController {
@Autowired
@Autowired
IEquipmentIndexService
equipmentIndexService
;
IEquipmentIndexService
equipmentIndexService
;
@Autowired
private
IdxFeign
idxFeign
;
@Value
(
"${iot.code.prefix.have.used:20210003,20210004,20210005}"
)
@Value
(
"${iot.code.prefix.have.used:20210003,20210004,20210005}"
)
private
String
haveUsedIotPrefix
;
private
String
haveUsedIotPrefix
;
...
@@ -310,31 +314,7 @@ public class EquipmentDetailController extends AbstractBaseController {
...
@@ -310,31 +314,7 @@ public class EquipmentDetailController extends AbstractBaseController {
Date
now
=
new
Date
();
Date
now
=
new
Date
();
int
day
=
DateUtils
.
dateBetween
(
now
,
calendar
.
getTime
());
int
day
=
DateUtils
.
dateBetween
(
now
,
calendar
.
getTime
());
if
(
day
>
-
1
){
if
(
day
>
-
1
){
idxFeign
.
handleEquipScrapWhenExpired
(
String
.
valueOf
(
vo
.
getId
()));
//消除设备二维码记录时间
LambdaQueryWrapper
<
EquipQrcodeRecord
>
wrapper
=
new
LambdaQueryWrapper
<>();
wrapper
.
eq
(
EquipQrcodeRecord:
:
getEquipid
,
vo
.
getId
());
wrapper
.
eq
(
EquipQrcodeRecord:
:
getSource
,
"scrap"
);
wrapper
.
isNull
(
EquipQrcodeRecord:
:
getCleanTime
);
EquipQrcodeRecord
equipQrcodeRecord
=
equipQrcodeRecordMapper
.
selectOne
(
wrapper
);
if
(!
ObjectUtils
.
isEmpty
(
equipQrcodeRecord
)){
equipQrcodeRecord
.
setCleanTime
(
new
Date
());
equipQrcodeRecord
.
setCleanReason
(
"设备报废日期更新后消除"
);
equipQrcodeRecordMapper
.
updateById
(
equipQrcodeRecord
);
}
//查询二维码事件记录表中该设备的历史数据
LambdaQueryWrapper
<
EquipQrcodeRecord
>
query
=
new
LambdaQueryWrapper
<>();
query
.
eq
(
EquipQrcodeRecord:
:
getEquipid
,
vo
.
getId
());
query
.
isNull
(
EquipQrcodeRecord:
:
getCleanTime
);
List
<
EquipQrcodeRecord
>
equipQrcodeRecords
=
equipQrcodeRecordMapper
.
selectList
(
query
);
String
status
=
equipQrcodeRecords
.
stream
().
sorted
(
Comparator
.
comparing
(
EquipQrcodeRecord:
:
getStatus
)).
findFirst
().
get
().
getStatus
();
if
(
equipQrcodeRecords
.
size
()
>
0
)
{
//如果记录表中还存在未消除的其他事件 则按照优先级赋码
equipmentSpecificSerivce
.
updateEquipSpecificStatus
(
status
,
String
.
valueOf
(
vo
.
getId
()));
}
else
{
equipmentSpecificSerivce
.
updateEquipSpecificStatus
(
EquipQrcodeColorEnum
.
GREEN
.
getCode
(),
String
.
valueOf
(
vo
.
getId
()));
}
}
}
}
catch
(
ParseException
e
)
{
}
catch
(
ParseException
e
)
{
e
.
printStackTrace
();
e
.
printStackTrace
();
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/fegin/IdxFeign.java
View file @
20970b49
...
@@ -36,8 +36,10 @@ public interface IdxFeign {
...
@@ -36,8 +36,10 @@ public interface IdxFeign {
@RequestMapping
(
value
=
"/qrcode/scrap/expired/put"
,
method
=
RequestMethod
.
GET
)
@RequestMapping
(
value
=
"/qrcode/scrap/expired/put"
,
method
=
RequestMethod
.
GET
)
ResponseModel
<
JSONObject
>
handleEquipNotScrapWhenExpired
(
@RequestParam
(
"equipId"
)
String
equipId
,
ResponseModel
<
JSONObject
>
handleEquipNotScrapWhenExpired
(
@RequestParam
(
"equipId"
)
String
equipId
,
@RequestParam
(
"bizOrgCode"
)
String
bizOrgCode
,
@RequestParam
(
"bizOrgCode"
)
String
bizOrgCode
,
@RequestParam
(
"bizOrg
Cod
e"
)
String
bizOrgName
);
@RequestParam
(
"bizOrg
Nam
e"
)
String
bizOrgName
);
@RequestMapping
(
value
=
"/qrcode/scrap/expired/clean"
,
method
=
RequestMethod
.
GET
)
ResponseModel
<
JSONObject
>
handleEquipScrapWhenExpired
(
@RequestParam
(
"equipId"
)
String
equipId
);
@RequestMapping
(
value
=
"/eventLog/cleanQrcode"
,
method
=
RequestMethod
.
GET
)
@RequestMapping
(
value
=
"/eventLog/cleanQrcode"
,
method
=
RequestMethod
.
GET
)
ResponseModel
<
JSONObject
>
cleanQrcode
(
@RequestParam
(
"sourceId"
)
String
sourceId
,
ResponseModel
<
JSONObject
>
cleanQrcode
(
@RequestParam
(
"sourceId"
)
String
sourceId
,
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/EquipmentSpecificSerivceImpl.java
View file @
20970b49
...
@@ -41,6 +41,7 @@ import liquibase.pro.packaged.S;
...
@@ -41,6 +41,7 @@ import liquibase.pro.packaged.S;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.io.IOUtils
;
import
org.apache.commons.io.IOUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.eclipse.paho.client.mqttv3.MqttException
;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.beans.factory.annotation.Value
;
...
@@ -52,6 +53,7 @@ import org.springframework.stereotype.Service;
...
@@ -52,6 +53,7 @@ import org.springframework.stereotype.Service;
import
org.springframework.transaction.annotation.Transactional
;
import
org.springframework.transaction.annotation.Transactional
;
import
org.springframework.util.CollectionUtils
;
import
org.springframework.util.CollectionUtils
;
import
org.springframework.util.ObjectUtils
;
import
org.springframework.util.ObjectUtils
;
import
org.typroject.tyboot.component.emq.EmqKeeper
;
import
org.typroject.tyboot.core.foundation.utils.Bean
;
import
org.typroject.tyboot.core.foundation.utils.Bean
;
import
org.typroject.tyboot.core.foundation.utils.DateTimeUtil
;
import
org.typroject.tyboot.core.foundation.utils.DateTimeUtil
;
import
org.typroject.tyboot.core.restful.exception.instance.BadRequest
;
import
org.typroject.tyboot.core.restful.exception.instance.BadRequest
;
...
@@ -213,6 +215,9 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
...
@@ -213,6 +215,9 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
@Value
(
"${equipment.pressurepump.start}"
)
@Value
(
"${equipment.pressurepump.start}"
)
private
String
pressurePumpStart
;
private
String
pressurePumpStart
;
@Autowired
protected
EmqKeeper
emqKeeper
;
private
final
String
injection
=
"{\n"
+
private
final
String
injection
=
"{\n"
+
"\n"
+
"\n"
+
...
@@ -1986,7 +1991,7 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
...
@@ -1986,7 +1991,7 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
try
{
try
{
if
(
e
.
get
(
"weExpiry"
)
!=
null
)
{
if
(
e
.
get
(
"weExpiry"
)
!=
null
)
{
int
year
=
Integer
.
parseInt
(
e
.
get
(
"weExpiry"
).
toString
());
int
year
=
Integer
.
parseInt
(
e
.
get
(
"weExpiry"
).
toString
());
Date
productDate
=
DateUtils
.
dateParse
(
e
.
get
(
"product"
).
toString
(),
DateUtils
.
DATE_TIME_PATTERN
);
Date
productDate
=
DateUtils
.
dateParse
(
e
.
get
(
"product"
).
toString
(),
DateUtils
.
DATE_TIME_
TT_
PATTERN
);
Calendar
calendar
=
Calendar
.
getInstance
();
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTime
(
productDate
);
calendar
.
setTime
(
productDate
);
calendar
.
add
(
Calendar
.
YEAR
,
year
);
calendar
.
add
(
Calendar
.
YEAR
,
year
);
...
@@ -1995,9 +2000,24 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
...
@@ -1995,9 +2000,24 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
String
scrapTime
=
new
SimpleDateFormat
(
DateUtils
.
DATE_TIME_PATTERN
).
format
(
calendar
.
getTime
());
String
scrapTime
=
new
SimpleDateFormat
(
DateUtils
.
DATE_TIME_PATTERN
).
format
(
calendar
.
getTime
());
int
day
=
DateUtils
.
dateBetween
(
now
,
calendar
.
getTime
());
int
day
=
DateUtils
.
dateBetween
(
now
,
calendar
.
getTime
());
if
(
day
<
Integer
.
parseInt
(
equipmentScrapDay
)
&&
day
>
-
1
)
{
if
(
day
<
Integer
.
parseInt
(
equipmentScrapDay
)
&&
day
>
-
1
)
{
syncSystemctlMsg
(
e
,
scrapTime
,
day
);
syncSystemctlMsg
(
e
,
scrapTime
,
day
);
}
else
if
(
day
<=
-
1
){
}
else
if
(
day
==
-
1
){
// 发送emq消息转kafka
JSONObject
jsonObject
=
new
JSONObject
();
Map
<
String
,
String
>
map
=
new
HashMap
<>();
map
.
put
(
"id"
,
e
.
get
(
"id"
).
toString
());
map
.
put
(
"bizOrgCode"
,
e
.
get
(
"bizOrgCode"
).
toString
());
map
.
put
(
"bizOrgName"
,
e
.
get
(
"bizOrgName"
).
toString
());
jsonObject
.
put
(
"topic"
,
"equip/scrap/put"
);
jsonObject
.
put
(
"data"
,
JSONObject
.
toJSON
(
map
));
try
{
emqKeeper
.
getMqttClient
().
publish
(
"emq.scrap.qrcode.put"
,
jsonObject
.
toString
().
getBytes
(),
1
,
false
);
}
catch
(
MqttException
exp
)
{
log
.
info
(
String
.
format
(
"发送eqm转kafka消息失败:%s"
,
exp
.
getMessage
()));
}
idxFeign
.
handleEquipNotScrapWhenExpired
(
String
.
valueOf
(
e
.
get
(
"id"
)),
String
.
valueOf
(
e
.
get
(
"bizOrgCode"
)),
String
.
valueOf
(
e
.
get
(
"bizOrgName"
)));
idxFeign
.
handleEquipNotScrapWhenExpired
(
String
.
valueOf
(
e
.
get
(
"id"
)),
String
.
valueOf
(
e
.
get
(
"bizOrgCode"
)),
String
.
valueOf
(
e
.
get
(
"bizOrgName"
)));
}
}
}
}
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/MqttReceiveServiceImpl.java
View file @
20970b49
...
@@ -1010,6 +1010,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
...
@@ -1010,6 +1010,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
equipQrDateDto
.
setSource
(
"equip"
);
equipQrDateDto
.
setSource
(
"equip"
);
equipQrDateDto
.
setType
(
equipmentSpecificIndex
.
getTypeCode
());
equipQrDateDto
.
setType
(
equipmentSpecificIndex
.
getTypeCode
());
equipQrDateDto
.
setData
(
list
);
equipQrDateDto
.
setData
(
list
);
log
.
error
(
"调用规则JSON对象:{}"
,
JSONObject
.
toJSONString
(
equipQrDateDto
));
try
{
try
{
ruleTrigger
.
publish
(
equipQrDateDto
,
"中心配置赋码规则/update-qr-code"
,
null
);
ruleTrigger
.
publish
(
equipQrDateDto
,
"中心配置赋码规则/update-qr-code"
,
null
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/RiskSourceServiceImpl.java
View file @
20970b49
...
@@ -126,7 +126,7 @@ public class RiskSourceServiceImpl implements IRiskSourceService {
...
@@ -126,7 +126,7 @@ public class RiskSourceServiceImpl implements IRiskSourceService {
}
}
equipInfoVo
.
setId
(
equipmentSpecId
);
equipInfoVo
.
setId
(
equipmentSpecId
);
if
(
StringUtil
.
isNotEmpty
(
equipmentSpecId
))
{
if
(
StringUtil
.
isNotEmpty
(
equipmentSpecId
)
&&
!
source
.
equalsIgnoreCase
(
RiskSourceTypeEnum
.
PATROL
.
getName
())
)
{
EquipmentSpecific
specific
=
equipmentSpecificMapper
.
selectById
(
equipmentSpecId
);
EquipmentSpecific
specific
=
equipmentSpecificMapper
.
selectById
(
equipmentSpecId
);
equipInfoVo
.
setPosition
(
specific
.
getPosition
());
equipInfoVo
.
setPosition
(
specific
.
getPosition
());
equipInfoVo
.
setManufacturerName
(
getEquipmentDetailInfo
(
specific
.
getEquipmentDetailId
()).
getManufacturerName
());
equipInfoVo
.
setManufacturerName
(
getEquipmentDetailInfo
(
specific
.
getEquipmentDetailId
()).
getManufacturerName
());
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/ScrapServiceImpl.java
View file @
20970b49
...
@@ -10,6 +10,7 @@ import com.yeejoin.equipmanage.common.entity.vo.EquipmentOnCarAppVO;
...
@@ -10,6 +10,7 @@ import com.yeejoin.equipmanage.common.entity.vo.EquipmentOnCarAppVO;
import
com.yeejoin.equipmanage.common.enums.*
;
import
com.yeejoin.equipmanage.common.enums.*
;
import
com.yeejoin.equipmanage.common.exception.BaseException
;
import
com.yeejoin.equipmanage.common.exception.BaseException
;
import
com.yeejoin.equipmanage.common.utils.StringUtil
;
import
com.yeejoin.equipmanage.common.utils.StringUtil
;
import
com.yeejoin.equipmanage.fegin.IdxFeign
;
import
com.yeejoin.equipmanage.mapper.CarMapper
;
import
com.yeejoin.equipmanage.mapper.CarMapper
;
import
com.yeejoin.equipmanage.mapper.EquipmentOnCarMapper
;
import
com.yeejoin.equipmanage.mapper.EquipmentOnCarMapper
;
import
com.yeejoin.equipmanage.mapper.ScrapMapper
;
import
com.yeejoin.equipmanage.mapper.ScrapMapper
;
...
@@ -56,6 +57,9 @@ public class ScrapServiceImpl extends ServiceImpl<ScrapMapper, Scrap> implements
...
@@ -56,6 +57,9 @@ public class ScrapServiceImpl extends ServiceImpl<ScrapMapper, Scrap> implements
@Autowired
@Autowired
private
EquipmentOnCarMapper
equipmentOnCarMapper
;
private
EquipmentOnCarMapper
equipmentOnCarMapper
;
@Autowired
private
IdxFeign
idxFeign
;
@Override
@Override
public
Scrap
create
(
List
<
ScrapDetail
>
list
,
String
type
,
AgencyUserModel
agencyUserModel
)
{
public
Scrap
create
(
List
<
ScrapDetail
>
list
,
String
type
,
AgencyUserModel
agencyUserModel
)
{
this
.
scrapCheck
(
type
,
list
);
this
.
scrapCheck
(
type
,
list
);
...
@@ -201,6 +205,10 @@ public class ScrapServiceImpl extends ServiceImpl<ScrapMapper, Scrap> implements
...
@@ -201,6 +205,10 @@ public class ScrapServiceImpl extends ServiceImpl<ScrapMapper, Scrap> implements
// stockDetail.setAmount(stockAmount.subtract(scrapAmount).doubleValue());
// stockDetail.setAmount(stockAmount.subtract(scrapAmount).doubleValue());
stockDetail
.
setAmount
(
stockAmount
.
doubleValue
());
stockDetail
.
setAmount
(
stockAmount
.
doubleValue
());
stockDetailList
.
add
(
stockDetail
);
stockDetailList
.
add
(
stockDetail
);
// 装备报废时消码为绿码
idxFeign
.
handleEquipScrapWhenExpired
(
String
.
valueOf
(
stockDetail
.
getEquipmentSpecificId
()));
}
}
stockDetailService
.
updateBatchById
(
stockDetailList
);
stockDetailService
.
updateBatchById
(
stockDetailList
);
}
}
...
...
amos-boot-system-equip/src/main/resources/mapper/EquipmentSpecificIndexMapper.xml
View file @
20970b49
...
@@ -492,7 +492,7 @@
...
@@ -492,7 +492,7 @@
left join wl_warehouse_structure str on str.id = wes.warehouse_structure_id
left join wl_warehouse_structure str on str.id = wes.warehouse_structure_id
left join wl_stock_detail wlsd on wes.id = wlsd.equipment_specific_id
left join wl_stock_detail wlsd on wes.id = wlsd.equipment_specific_id
where wed.production_date is not null
where wed.production_date is not null
and wlsd.status != 7
</select>
</select>
<select
id=
"getEquipIndexInIndex"
resultType=
"com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex"
>
<select
id=
"getEquipIndexInIndex"
resultType=
"com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex"
>
SELECT
SELECT
...
...
amos-boot-utils/amos-boot-utils-message/src/main/java/com/yeejoin/amos/message/kafka/KafkaConsumerService.java
View file @
20970b49
...
@@ -25,9 +25,8 @@ import static com.yeejoin.amos.message.kafka.Constant.*;
...
@@ -25,9 +25,8 @@ import static com.yeejoin.amos.message.kafka.Constant.*;
@Slf4j
@Slf4j
@Service
@Service
public
class
KafkaConsumerService
{
public
class
KafkaConsumerService
{
private
static
final
String
MQTT_TOPIC
=
"romaSite/data/transmit"
;
private
static
final
String
MQTT_TOPIC_SHAOSHAN
=
"romaSite/data/shaoshan"
;
private
static
final
String
MQTT_TOPIC
=
"romaSite/data/transmit"
;
private
static
final
String
PROVINCE_MQTT_TOPIC
=
"province/data/transport"
;
private
static
final
String
PROVINCE_MQTT_TOPIC
=
"province/data/transport"
;
@Autowired
@Autowired
protected
EmqKeeper
emqKeeper
;
protected
EmqKeeper
emqKeeper
;
...
@@ -56,7 +55,7 @@ public class KafkaConsumerService {
...
@@ -56,7 +55,7 @@ public class KafkaConsumerService {
}
}
}
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"kafka失败,当前失败的批次: data:{}
"
,
consumerRecords
);
log
.
error
(
"kafka失败,当前失败的批次: data:{}
, {}"
,
consumerRecords
,
e
);
}
finally
{
}
finally
{
ack
.
acknowledge
();
ack
.
acknowledge
();
}
}
...
@@ -90,73 +89,33 @@ public class KafkaConsumerService {
...
@@ -90,73 +89,33 @@ public class KafkaConsumerService {
}
}
}
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"kafka失败,当前失败的批次: data:{}
"
,
consumerRecords
);
log
.
error
(
"kafka失败,当前失败的批次: data:{}
, {}"
,
consumerRecords
,
e
);
}
finally
{
}
finally
{
ack
.
acknowledge
();
ack
.
acknowledge
();
}
}
}
}
/**
/**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
*
* @param message 消息
*
*/
@KafkaListener
(
id
=
"consumerSingle"
,
idIsGroup
=
false
,
topics
=
"#{'${kafka.topics}'.split(',')}"
,
concurrency
=
"2"
)
public
void
consumerSingle
(
String
message
,
Acknowledgment
ack
)
{
JSONObject
messageObj
=
JSONObject
.
fromObject
(
message
);
try
{
String
topic
=
messageObj
.
getString
(
"topic"
);
JSONObject
data
=
messageObj
.
getJSONObject
(
"data"
);
emqKeeper
.
getMqttClient
().
publish
(
topic
,
data
.
toString
().
getBytes
(
StandardCharsets
.
UTF_8
),
0
,
false
);
ack
.
acknowledge
();
}
catch
(
MqttException
e
)
{
log
.
error
(
"解析数据失败,{}"
,
e
.
getMessage
());
}
}
/**
* 转发苏州,绍兴换流站Kafka数据对emq
* 转发苏州,绍兴换流站Kafka数据对emq
*
*
* @param record record
* @param record record
* 绍兴,苏州换流站对接Kafka数据
* @param ack ack
* @param record record
*/
*/
@KafkaListener
(
id
=
"kafkaRoma"
,
groupId
=
"kafkaRoma"
,
topics
=
"#{'${queue.kafka.topics}'.split(',')}"
,
containerFactory
=
"kafkaRomaContainerFactory"
)
@KafkaListener
(
id
=
"kafkaRoma"
,
groupId
=
"kafkaRoma"
,
topics
=
"#{'${queue.kafka.topics}'.split(',')}"
,
containerFactory
=
"kafkaRomaContainerFactory"
)
public
void
kafkaListener
(
ConsumerRecord
<?,
String
>
record
,
Acknowledgment
ack
)
{
public
void
kafkaListener
(
ConsumerRecord
<?,
String
>
record
,
Acknowledgment
ack
)
{
try
{
Optional
<?>
messages
=
Optional
.
ofNullable
(
record
.
value
());
Optional
<?>
messages
=
Optional
.
ofNullable
(
record
.
value
());
if
(
messages
.
isPresent
())
{
if
(
messages
.
isPresent
())
{
try
{
JSONObject
messageObj
=
JSONObject
.
fromObject
(
record
.
value
());
if
(
messageObj
.
getJSONObject
(
BODY
).
isEmpty
())
{
messageObj
.
put
(
DATA_TYPE
,
STATE
);
}
emqKeeper
.
getMqttClient
().
publish
(
MQTT_TOPIC
,
messageObj
.
toString
().
getBytes
(
StandardCharsets
.
UTF_8
),
0
,
false
);
}
catch
(
MqttException
e
)
{
log
.
error
(
"解析数据失败,{}"
,
e
.
getMessage
());
}
finally
{
ack
.
acknowledge
();
}
}
}
/**
* 韶山换流对接Kafka
* @param record record
* @param ack ack
*/
@KafkaListener
(
id
=
"kafkaConsumer"
,
groupId
=
"kafkaConsumerGroup"
,
topics
=
"#{'${queue.kafka.shaoshan.topics}'.split(',')}"
,
containerFactory
=
"kafkaRomaContainerFactory"
)
public
void
kafkaConsumer
(
ConsumerRecord
<?,
String
>
record
,
Acknowledgment
ack
)
{
Optional
<?>
message
=
Optional
.
ofNullable
(
record
.
value
());
if
(
message
.
isPresent
())
{
try
{
JSONObject
messageObj
=
JSONObject
.
fromObject
(
record
.
value
());
JSONObject
messageObj
=
JSONObject
.
fromObject
(
record
.
value
());
JSONObject
data
=
messageObj
.
getJSONObject
(
"body"
);
if
(
messageObj
.
getJSONObject
(
BODY
).
isEmpty
())
{
emqKeeper
.
getMqttClient
().
publish
(
MQTT_TOPIC_SHAOSHAN
,
data
.
toString
().
getBytes
(
StandardCharsets
.
UTF_8
),
0
,
false
);
messageObj
.
put
(
DATA_TYPE
,
STATE
);
ack
.
acknowledge
();
}
}
catch
(
MqttException
e
)
{
emqKeeper
.
getMqttClient
().
publish
(
MQTT_TOPIC
,
messageObj
.
toString
().
getBytes
(
StandardCharsets
.
UTF_8
),
0
,
false
);
log
.
error
(
"解析数据失败,{}"
,
e
.
getMessage
());
}
}
}
catch
(
MqttException
e
)
{
log
.
error
(
"换流站转发Kafka消息失败"
+
e
.
getMessage
(),
e
);
}
finally
{
ack
.
acknowledge
();
}
}
}
}
...
@@ -253,6 +212,7 @@ public class KafkaConsumerService {
...
@@ -253,6 +212,7 @@ public class KafkaConsumerService {
// }
// }
//
//
//
//
//
//kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup"
//kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup"
//@KafkaListener(topics = "test", groupId = "zhTestGroup")
//@KafkaListener(topics = "test", groupId = "zhTestGroup")
//public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
//public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
...
@@ -262,14 +222,6 @@ public class KafkaConsumerService {
...
@@ -262,14 +222,6 @@ public class KafkaConsumerService {
// //手动提交offset
// //手动提交offset
// ack.acknowledge();
// ack.acknowledge();
//}
//}
// //kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup"
// @KafkaListener(topics = "test", groupId = "zhTestGroup")
// public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
// String value = record.value();
// System.out.println(value);
// System.out.println(record);
// //手动提交offset
// ack.acknowledge();
// }
}
}
amos-boot-utils/amos-boot-utils-message/src/main/resources/json/topic.json
View file @
20970b49
...
@@ -44,6 +44,12 @@
...
@@ -44,6 +44,12 @@
"emqTopic"
:
"emq.mcb.zxj"
,
"emqTopic"
:
"emq.mcb.zxj"
,
"akkaTopic"
:
"JKXT2BP-XFYY-Topic"
"akkaTopic"
:
"JKXT2BP-XFYY-Topic"
},
},
,
{
"code"
:
"equipScrapQrcodePut"
,
"emqTopic"
:
"emq.scrap.qrcode.put"
,
"akkaTopic"
:
"JKXT2BP-XFYY-Topic"
},
{
{
"code"
:
"mcbsubmit"
,
"code"
:
"mcbsubmit"
,
"emqTopic"
:
"risk.submit"
,
"emqTopic"
:
"risk.submit"
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment