Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
amos-boot-biz
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
项目统一框架
amos-boot-biz
Commits
c7cc8cc3
Commit
c7cc8cc3
authored
Oct 17, 2023
by
李秀明
Browse files
Options
Browse Files
Download
Plain Diff
Merge remote-tracking branch 'origin/develop_dl' into develop_dl
parents
e0475f0f
3425aa4a
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
66 additions
and
96 deletions
+66
-96
DateUtils.java
.../java/com/yeejoin/equipmanage/common/utils/DateUtils.java
+1
-0
EquipmentDetailController.java
...oin/equipmanage/controller/EquipmentDetailController.java
+5
-25
IdxFeign.java
...src/main/java/com/yeejoin/equipmanage/fegin/IdxFeign.java
+3
-1
EquipmentSpecificSerivceImpl.java
...quipmanage/service/impl/EquipmentSpecificSerivceImpl.java
+23
-3
MqttReceiveServiceImpl.java
...join/equipmanage/service/impl/MqttReceiveServiceImpl.java
+1
-0
RiskSourceServiceImpl.java
...ejoin/equipmanage/service/impl/RiskSourceServiceImpl.java
+1
-1
ScrapServiceImpl.java
...om/yeejoin/equipmanage/service/impl/ScrapServiceImpl.java
+8
-0
EquipmentSpecificIndexMapper.xml
...rc/main/resources/mapper/EquipmentSpecificIndexMapper.xml
+1
-1
KafkaConsumerService.java
.../com/yeejoin/amos/message/kafka/KafkaConsumerService.java
+16
-64
application-dev.properties
...ils-message/src/main/resources/application-dev.properties
+1
-1
topic.json
...mos-boot-utils-message/src/main/resources/json/topic.json
+6
-0
No files found.
amos-boot-module/amos-boot-module-api/amos-boot-module-equip-api/src/main/java/com/yeejoin/equipmanage/common/utils/DateUtils.java
View file @
c7cc8cc3
...
@@ -24,6 +24,7 @@ public class DateUtils {
...
@@ -24,6 +24,7 @@ public class DateUtils {
private
static
final
Logger
logs
=
LoggerFactory
.
getLogger
(
DateUtils
.
class
);
private
static
final
Logger
logs
=
LoggerFactory
.
getLogger
(
DateUtils
.
class
);
public
static
final
String
DATE_TIME_PATTERN
=
"yyyy-MM-dd HH:mm:ss"
;
public
static
final
String
DATE_TIME_PATTERN
=
"yyyy-MM-dd HH:mm:ss"
;
public
static
final
String
DATE_TIME_TT_PATTERN
=
"yyyy-MM-dd'T'HH:mm:ss"
;
public
static
final
String
MINUTE_PATTERN
=
"yyyy-MM-dd HH:mm"
;
public
static
final
String
MINUTE_PATTERN
=
"yyyy-MM-dd HH:mm"
;
public
static
final
String
HOUR_PATTERN
=
"yyyy-MM-dd HH"
;
public
static
final
String
HOUR_PATTERN
=
"yyyy-MM-dd HH"
;
public
static
final
String
DATE_PATTERN
=
"yyyy-MM-dd"
;
public
static
final
String
DATE_PATTERN
=
"yyyy-MM-dd"
;
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/controller/EquipmentDetailController.java
View file @
c7cc8cc3
...
@@ -22,6 +22,7 @@ import com.yeejoin.equipmanage.common.enums.SourceTypeEnum;
...
@@ -22,6 +22,7 @@ import com.yeejoin.equipmanage.common.enums.SourceTypeEnum;
import
com.yeejoin.equipmanage.common.utils.*
;
import
com.yeejoin.equipmanage.common.utils.*
;
import
com.yeejoin.equipmanage.common.vo.EquipmentDate
;
import
com.yeejoin.equipmanage.common.vo.EquipmentDate
;
import
com.yeejoin.equipmanage.dto.ExcelDto
;
import
com.yeejoin.equipmanage.dto.ExcelDto
;
import
com.yeejoin.equipmanage.fegin.IdxFeign
;
import
com.yeejoin.equipmanage.mapper.*
;
import
com.yeejoin.equipmanage.mapper.*
;
import
com.yeejoin.equipmanage.service.*
;
import
com.yeejoin.equipmanage.service.*
;
import
com.yeejoin.equipmanage.service.impl.FireFightingSystemServiceImpl
;
import
com.yeejoin.equipmanage.service.impl.FireFightingSystemServiceImpl
;
...
@@ -124,6 +125,9 @@ public class EquipmentDetailController extends AbstractBaseController {
...
@@ -124,6 +125,9 @@ public class EquipmentDetailController extends AbstractBaseController {
@Autowired
@Autowired
IEquipmentIndexService
equipmentIndexService
;
IEquipmentIndexService
equipmentIndexService
;
@Autowired
private
IdxFeign
idxFeign
;
@Value
(
"${iot.code.prefix.have.used:20210003,20210004,20210005}"
)
@Value
(
"${iot.code.prefix.have.used:20210003,20210004,20210005}"
)
private
String
haveUsedIotPrefix
;
private
String
haveUsedIotPrefix
;
...
@@ -310,31 +314,7 @@ public class EquipmentDetailController extends AbstractBaseController {
...
@@ -310,31 +314,7 @@ public class EquipmentDetailController extends AbstractBaseController {
Date
now
=
new
Date
();
Date
now
=
new
Date
();
int
day
=
DateUtils
.
dateBetween
(
now
,
calendar
.
getTime
());
int
day
=
DateUtils
.
dateBetween
(
now
,
calendar
.
getTime
());
if
(
day
>
-
1
){
if
(
day
>
-
1
){
idxFeign
.
handleEquipScrapWhenExpired
(
String
.
valueOf
(
vo
.
getId
()));
//消除设备二维码记录时间
LambdaQueryWrapper
<
EquipQrcodeRecord
>
wrapper
=
new
LambdaQueryWrapper
<>();
wrapper
.
eq
(
EquipQrcodeRecord:
:
getEquipid
,
vo
.
getId
());
wrapper
.
eq
(
EquipQrcodeRecord:
:
getSource
,
"scrap"
);
wrapper
.
isNull
(
EquipQrcodeRecord:
:
getCleanTime
);
EquipQrcodeRecord
equipQrcodeRecord
=
equipQrcodeRecordMapper
.
selectOne
(
wrapper
);
if
(!
ObjectUtils
.
isEmpty
(
equipQrcodeRecord
)){
equipQrcodeRecord
.
setCleanTime
(
new
Date
());
equipQrcodeRecord
.
setCleanReason
(
"设备报废日期更新后消除"
);
equipQrcodeRecordMapper
.
updateById
(
equipQrcodeRecord
);
}
//查询二维码事件记录表中该设备的历史数据
LambdaQueryWrapper
<
EquipQrcodeRecord
>
query
=
new
LambdaQueryWrapper
<>();
query
.
eq
(
EquipQrcodeRecord:
:
getEquipid
,
vo
.
getId
());
query
.
isNull
(
EquipQrcodeRecord:
:
getCleanTime
);
List
<
EquipQrcodeRecord
>
equipQrcodeRecords
=
equipQrcodeRecordMapper
.
selectList
(
query
);
String
status
=
equipQrcodeRecords
.
stream
().
sorted
(
Comparator
.
comparing
(
EquipQrcodeRecord:
:
getStatus
)).
findFirst
().
get
().
getStatus
();
if
(
equipQrcodeRecords
.
size
()
>
0
)
{
//如果记录表中还存在未消除的其他事件 则按照优先级赋码
equipmentSpecificSerivce
.
updateEquipSpecificStatus
(
status
,
String
.
valueOf
(
vo
.
getId
()));
}
else
{
equipmentSpecificSerivce
.
updateEquipSpecificStatus
(
EquipQrcodeColorEnum
.
GREEN
.
getCode
(),
String
.
valueOf
(
vo
.
getId
()));
}
}
}
}
catch
(
ParseException
e
)
{
}
catch
(
ParseException
e
)
{
e
.
printStackTrace
();
e
.
printStackTrace
();
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/fegin/IdxFeign.java
View file @
c7cc8cc3
...
@@ -36,8 +36,10 @@ public interface IdxFeign {
...
@@ -36,8 +36,10 @@ public interface IdxFeign {
@RequestMapping
(
value
=
"/qrcode/scrap/expired/put"
,
method
=
RequestMethod
.
GET
)
@RequestMapping
(
value
=
"/qrcode/scrap/expired/put"
,
method
=
RequestMethod
.
GET
)
ResponseModel
<
JSONObject
>
handleEquipNotScrapWhenExpired
(
@RequestParam
(
"equipId"
)
String
equipId
,
ResponseModel
<
JSONObject
>
handleEquipNotScrapWhenExpired
(
@RequestParam
(
"equipId"
)
String
equipId
,
@RequestParam
(
"bizOrgCode"
)
String
bizOrgCode
,
@RequestParam
(
"bizOrgCode"
)
String
bizOrgCode
,
@RequestParam
(
"bizOrg
Cod
e"
)
String
bizOrgName
);
@RequestParam
(
"bizOrg
Nam
e"
)
String
bizOrgName
);
@RequestMapping
(
value
=
"/qrcode/scrap/expired/clean"
,
method
=
RequestMethod
.
GET
)
ResponseModel
<
JSONObject
>
handleEquipScrapWhenExpired
(
@RequestParam
(
"equipId"
)
String
equipId
);
@RequestMapping
(
value
=
"/eventLog/cleanQrcode"
,
method
=
RequestMethod
.
GET
)
@RequestMapping
(
value
=
"/eventLog/cleanQrcode"
,
method
=
RequestMethod
.
GET
)
ResponseModel
<
JSONObject
>
cleanQrcode
(
@RequestParam
(
"sourceId"
)
String
sourceId
,
ResponseModel
<
JSONObject
>
cleanQrcode
(
@RequestParam
(
"sourceId"
)
String
sourceId
,
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/EquipmentSpecificSerivceImpl.java
View file @
c7cc8cc3
...
@@ -41,6 +41,7 @@ import liquibase.pro.packaged.S;
...
@@ -41,6 +41,7 @@ import liquibase.pro.packaged.S;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.io.IOUtils
;
import
org.apache.commons.io.IOUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.eclipse.paho.client.mqttv3.MqttException
;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.beans.factory.annotation.Value
;
...
@@ -52,6 +53,7 @@ import org.springframework.stereotype.Service;
...
@@ -52,6 +53,7 @@ import org.springframework.stereotype.Service;
import
org.springframework.transaction.annotation.Transactional
;
import
org.springframework.transaction.annotation.Transactional
;
import
org.springframework.util.CollectionUtils
;
import
org.springframework.util.CollectionUtils
;
import
org.springframework.util.ObjectUtils
;
import
org.springframework.util.ObjectUtils
;
import
org.typroject.tyboot.component.emq.EmqKeeper
;
import
org.typroject.tyboot.core.foundation.utils.Bean
;
import
org.typroject.tyboot.core.foundation.utils.Bean
;
import
org.typroject.tyboot.core.foundation.utils.DateTimeUtil
;
import
org.typroject.tyboot.core.foundation.utils.DateTimeUtil
;
import
org.typroject.tyboot.core.restful.exception.instance.BadRequest
;
import
org.typroject.tyboot.core.restful.exception.instance.BadRequest
;
...
@@ -213,6 +215,9 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
...
@@ -213,6 +215,9 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
@Value
(
"${equipment.pressurepump.start}"
)
@Value
(
"${equipment.pressurepump.start}"
)
private
String
pressurePumpStart
;
private
String
pressurePumpStart
;
@Autowired
protected
EmqKeeper
emqKeeper
;
private
final
String
injection
=
"{\n"
+
private
final
String
injection
=
"{\n"
+
"\n"
+
"\n"
+
...
@@ -1986,7 +1991,7 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
...
@@ -1986,7 +1991,7 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
try
{
try
{
if
(
e
.
get
(
"weExpiry"
)
!=
null
)
{
if
(
e
.
get
(
"weExpiry"
)
!=
null
)
{
int
year
=
Integer
.
parseInt
(
e
.
get
(
"weExpiry"
).
toString
());
int
year
=
Integer
.
parseInt
(
e
.
get
(
"weExpiry"
).
toString
());
Date
productDate
=
DateUtils
.
dateParse
(
e
.
get
(
"product"
).
toString
(),
DateUtils
.
DATE_TIME_PATTERN
);
Date
productDate
=
DateUtils
.
dateParse
(
e
.
get
(
"product"
).
toString
(),
DateUtils
.
DATE_TIME_
TT_
PATTERN
);
Calendar
calendar
=
Calendar
.
getInstance
();
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTime
(
productDate
);
calendar
.
setTime
(
productDate
);
calendar
.
add
(
Calendar
.
YEAR
,
year
);
calendar
.
add
(
Calendar
.
YEAR
,
year
);
...
@@ -1995,9 +2000,24 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
...
@@ -1995,9 +2000,24 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
String
scrapTime
=
new
SimpleDateFormat
(
DateUtils
.
DATE_TIME_PATTERN
).
format
(
calendar
.
getTime
());
String
scrapTime
=
new
SimpleDateFormat
(
DateUtils
.
DATE_TIME_PATTERN
).
format
(
calendar
.
getTime
());
int
day
=
DateUtils
.
dateBetween
(
now
,
calendar
.
getTime
());
int
day
=
DateUtils
.
dateBetween
(
now
,
calendar
.
getTime
());
if
(
day
<
Integer
.
parseInt
(
equipmentScrapDay
)
&&
day
>
-
1
)
{
if
(
day
<
Integer
.
parseInt
(
equipmentScrapDay
)
&&
day
>
-
1
)
{
syncSystemctlMsg
(
e
,
scrapTime
,
day
);
syncSystemctlMsg
(
e
,
scrapTime
,
day
);
}
else
if
(
day
<=
-
1
){
}
else
if
(
day
==
-
1
){
// 发送emq消息转kafka
JSONObject
jsonObject
=
new
JSONObject
();
Map
<
String
,
String
>
map
=
new
HashMap
<>();
map
.
put
(
"id"
,
e
.
get
(
"id"
).
toString
());
map
.
put
(
"bizOrgCode"
,
e
.
get
(
"bizOrgCode"
).
toString
());
map
.
put
(
"bizOrgName"
,
e
.
get
(
"bizOrgName"
).
toString
());
jsonObject
.
put
(
"topic"
,
"equip/scrap/put"
);
jsonObject
.
put
(
"data"
,
JSONObject
.
toJSON
(
map
));
try
{
emqKeeper
.
getMqttClient
().
publish
(
"emq.scrap.qrcode.put"
,
jsonObject
.
toString
().
getBytes
(),
1
,
false
);
}
catch
(
MqttException
exp
)
{
log
.
info
(
String
.
format
(
"发送eqm转kafka消息失败:%s"
,
exp
.
getMessage
()));
}
idxFeign
.
handleEquipNotScrapWhenExpired
(
String
.
valueOf
(
e
.
get
(
"id"
)),
String
.
valueOf
(
e
.
get
(
"bizOrgCode"
)),
String
.
valueOf
(
e
.
get
(
"bizOrgName"
)));
idxFeign
.
handleEquipNotScrapWhenExpired
(
String
.
valueOf
(
e
.
get
(
"id"
)),
String
.
valueOf
(
e
.
get
(
"bizOrgCode"
)),
String
.
valueOf
(
e
.
get
(
"bizOrgName"
)));
}
}
}
}
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/MqttReceiveServiceImpl.java
View file @
c7cc8cc3
...
@@ -1010,6 +1010,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
...
@@ -1010,6 +1010,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
equipQrDateDto
.
setSource
(
"equip"
);
equipQrDateDto
.
setSource
(
"equip"
);
equipQrDateDto
.
setType
(
equipmentSpecificIndex
.
getTypeCode
());
equipQrDateDto
.
setType
(
equipmentSpecificIndex
.
getTypeCode
());
equipQrDateDto
.
setData
(
list
);
equipQrDateDto
.
setData
(
list
);
log
.
error
(
"调用规则JSON对象:{}"
,
JSONObject
.
toJSONString
(
equipQrDateDto
));
try
{
try
{
ruleTrigger
.
publish
(
equipQrDateDto
,
"中心配置赋码规则/update-qr-code"
,
null
);
ruleTrigger
.
publish
(
equipQrDateDto
,
"中心配置赋码规则/update-qr-code"
,
null
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/RiskSourceServiceImpl.java
View file @
c7cc8cc3
...
@@ -126,7 +126,7 @@ public class RiskSourceServiceImpl implements IRiskSourceService {
...
@@ -126,7 +126,7 @@ public class RiskSourceServiceImpl implements IRiskSourceService {
}
}
equipInfoVo
.
setId
(
equipmentSpecId
);
equipInfoVo
.
setId
(
equipmentSpecId
);
if
(
StringUtil
.
isNotEmpty
(
equipmentSpecId
))
{
if
(
StringUtil
.
isNotEmpty
(
equipmentSpecId
)
&&
!
source
.
equalsIgnoreCase
(
RiskSourceTypeEnum
.
PATROL
.
getName
())
)
{
EquipmentSpecific
specific
=
equipmentSpecificMapper
.
selectById
(
equipmentSpecId
);
EquipmentSpecific
specific
=
equipmentSpecificMapper
.
selectById
(
equipmentSpecId
);
equipInfoVo
.
setPosition
(
specific
.
getPosition
());
equipInfoVo
.
setPosition
(
specific
.
getPosition
());
equipInfoVo
.
setManufacturerName
(
getEquipmentDetailInfo
(
specific
.
getEquipmentDetailId
()).
getManufacturerName
());
equipInfoVo
.
setManufacturerName
(
getEquipmentDetailInfo
(
specific
.
getEquipmentDetailId
()).
getManufacturerName
());
...
...
amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/ScrapServiceImpl.java
View file @
c7cc8cc3
...
@@ -10,6 +10,7 @@ import com.yeejoin.equipmanage.common.entity.vo.EquipmentOnCarAppVO;
...
@@ -10,6 +10,7 @@ import com.yeejoin.equipmanage.common.entity.vo.EquipmentOnCarAppVO;
import
com.yeejoin.equipmanage.common.enums.*
;
import
com.yeejoin.equipmanage.common.enums.*
;
import
com.yeejoin.equipmanage.common.exception.BaseException
;
import
com.yeejoin.equipmanage.common.exception.BaseException
;
import
com.yeejoin.equipmanage.common.utils.StringUtil
;
import
com.yeejoin.equipmanage.common.utils.StringUtil
;
import
com.yeejoin.equipmanage.fegin.IdxFeign
;
import
com.yeejoin.equipmanage.mapper.CarMapper
;
import
com.yeejoin.equipmanage.mapper.CarMapper
;
import
com.yeejoin.equipmanage.mapper.EquipmentOnCarMapper
;
import
com.yeejoin.equipmanage.mapper.EquipmentOnCarMapper
;
import
com.yeejoin.equipmanage.mapper.ScrapMapper
;
import
com.yeejoin.equipmanage.mapper.ScrapMapper
;
...
@@ -56,6 +57,9 @@ public class ScrapServiceImpl extends ServiceImpl<ScrapMapper, Scrap> implements
...
@@ -56,6 +57,9 @@ public class ScrapServiceImpl extends ServiceImpl<ScrapMapper, Scrap> implements
@Autowired
@Autowired
private
EquipmentOnCarMapper
equipmentOnCarMapper
;
private
EquipmentOnCarMapper
equipmentOnCarMapper
;
@Autowired
private
IdxFeign
idxFeign
;
@Override
@Override
public
Scrap
create
(
List
<
ScrapDetail
>
list
,
String
type
,
AgencyUserModel
agencyUserModel
)
{
public
Scrap
create
(
List
<
ScrapDetail
>
list
,
String
type
,
AgencyUserModel
agencyUserModel
)
{
this
.
scrapCheck
(
type
,
list
);
this
.
scrapCheck
(
type
,
list
);
...
@@ -201,6 +205,10 @@ public class ScrapServiceImpl extends ServiceImpl<ScrapMapper, Scrap> implements
...
@@ -201,6 +205,10 @@ public class ScrapServiceImpl extends ServiceImpl<ScrapMapper, Scrap> implements
// stockDetail.setAmount(stockAmount.subtract(scrapAmount).doubleValue());
// stockDetail.setAmount(stockAmount.subtract(scrapAmount).doubleValue());
stockDetail
.
setAmount
(
stockAmount
.
doubleValue
());
stockDetail
.
setAmount
(
stockAmount
.
doubleValue
());
stockDetailList
.
add
(
stockDetail
);
stockDetailList
.
add
(
stockDetail
);
// 装备报废时消码为绿码
idxFeign
.
handleEquipScrapWhenExpired
(
String
.
valueOf
(
stockDetail
.
getEquipmentSpecificId
()));
}
}
stockDetailService
.
updateBatchById
(
stockDetailList
);
stockDetailService
.
updateBatchById
(
stockDetailList
);
}
}
...
...
amos-boot-system-equip/src/main/resources/mapper/EquipmentSpecificIndexMapper.xml
View file @
c7cc8cc3
...
@@ -492,7 +492,7 @@
...
@@ -492,7 +492,7 @@
left join wl_warehouse_structure str on str.id = wes.warehouse_structure_id
left join wl_warehouse_structure str on str.id = wes.warehouse_structure_id
left join wl_stock_detail wlsd on wes.id = wlsd.equipment_specific_id
left join wl_stock_detail wlsd on wes.id = wlsd.equipment_specific_id
where wed.production_date is not null
where wed.production_date is not null
and wlsd.status != 7
</select>
</select>
<select
id=
"getEquipIndexInIndex"
resultType=
"com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex"
>
<select
id=
"getEquipIndexInIndex"
resultType=
"com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex"
>
SELECT
SELECT
...
...
amos-boot-utils/amos-boot-utils-message/src/main/java/com/yeejoin/amos/message/kafka/KafkaConsumerService.java
View file @
c7cc8cc3
...
@@ -25,9 +25,8 @@ import static com.yeejoin.amos.message.kafka.Constant.*;
...
@@ -25,9 +25,8 @@ import static com.yeejoin.amos.message.kafka.Constant.*;
@Slf4j
@Slf4j
@Service
@Service
public
class
KafkaConsumerService
{
public
class
KafkaConsumerService
{
private
static
final
String
MQTT_TOPIC
=
"romaSite/data/transmit"
;
private
static
final
String
MQTT_TOPIC_SHAOSHAN
=
"romaSite/data/shaoshan"
;
private
static
final
String
MQTT_TOPIC
=
"romaSite/data/transmit"
;
private
static
final
String
PROVINCE_MQTT_TOPIC
=
"province/data/transport"
;
private
static
final
String
PROVINCE_MQTT_TOPIC
=
"province/data/transport"
;
@Autowired
@Autowired
protected
EmqKeeper
emqKeeper
;
protected
EmqKeeper
emqKeeper
;
...
@@ -56,7 +55,7 @@ public class KafkaConsumerService {
...
@@ -56,7 +55,7 @@ public class KafkaConsumerService {
}
}
}
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"kafka失败,当前失败的批次: data:{}
"
,
consumerRecords
);
log
.
error
(
"kafka失败,当前失败的批次: data:{}
, {}"
,
consumerRecords
,
e
);
}
finally
{
}
finally
{
ack
.
acknowledge
();
ack
.
acknowledge
();
}
}
...
@@ -90,73 +89,33 @@ public class KafkaConsumerService {
...
@@ -90,73 +89,33 @@ public class KafkaConsumerService {
}
}
}
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"kafka失败,当前失败的批次: data:{}
"
,
consumerRecords
);
log
.
error
(
"kafka失败,当前失败的批次: data:{}
, {}"
,
consumerRecords
,
e
);
}
finally
{
}
finally
{
ack
.
acknowledge
();
ack
.
acknowledge
();
}
}
}
}
/**
/**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
*
* @param message 消息
*
*/
@KafkaListener
(
id
=
"consumerSingle"
,
idIsGroup
=
false
,
topics
=
"#{'${kafka.topics}'.split(',')}"
,
concurrency
=
"2"
)
public
void
consumerSingle
(
String
message
,
Acknowledgment
ack
)
{
JSONObject
messageObj
=
JSONObject
.
fromObject
(
message
);
try
{
String
topic
=
messageObj
.
getString
(
"topic"
);
JSONObject
data
=
messageObj
.
getJSONObject
(
"data"
);
emqKeeper
.
getMqttClient
().
publish
(
topic
,
data
.
toString
().
getBytes
(
StandardCharsets
.
UTF_8
),
0
,
false
);
ack
.
acknowledge
();
}
catch
(
MqttException
e
)
{
log
.
error
(
"解析数据失败,{}"
,
e
.
getMessage
());
}
}
/**
* 转发苏州,绍兴换流站Kafka数据对emq
* 转发苏州,绍兴换流站Kafka数据对emq
*
*
* @param record record
* @param record record
* 绍兴,苏州换流站对接Kafka数据
* @param ack ack
* @param record record
*/
*/
@KafkaListener
(
id
=
"kafkaRoma"
,
groupId
=
"kafkaRoma"
,
topics
=
"#{'${queue.kafka.topics}'.split(',')}"
,
containerFactory
=
"kafkaRomaContainerFactory"
)
@KafkaListener
(
id
=
"kafkaRoma"
,
groupId
=
"kafkaRoma"
,
topics
=
"#{'${queue.kafka.topics}'.split(',')}"
,
containerFactory
=
"kafkaRomaContainerFactory"
)
public
void
kafkaListener
(
ConsumerRecord
<?,
String
>
record
,
Acknowledgment
ack
)
{
public
void
kafkaListener
(
ConsumerRecord
<?,
String
>
record
,
Acknowledgment
ack
)
{
try
{
Optional
<?>
messages
=
Optional
.
ofNullable
(
record
.
value
());
Optional
<?>
messages
=
Optional
.
ofNullable
(
record
.
value
());
if
(
messages
.
isPresent
())
{
if
(
messages
.
isPresent
())
{
try
{
JSONObject
messageObj
=
JSONObject
.
fromObject
(
record
.
value
());
if
(
messageObj
.
getJSONObject
(
BODY
).
isEmpty
())
{
messageObj
.
put
(
DATA_TYPE
,
STATE
);
}
emqKeeper
.
getMqttClient
().
publish
(
MQTT_TOPIC
,
messageObj
.
toString
().
getBytes
(
StandardCharsets
.
UTF_8
),
0
,
false
);
}
catch
(
MqttException
e
)
{
log
.
error
(
"解析数据失败,{}"
,
e
.
getMessage
());
}
finally
{
ack
.
acknowledge
();
}
}
}
/**
* 韶山换流对接Kafka
* @param record record
* @param ack ack
*/
@KafkaListener
(
id
=
"kafkaConsumer"
,
groupId
=
"kafkaConsumerGroup"
,
topics
=
"#{'${queue.kafka.shaoshan.topics}'.split(',')}"
,
containerFactory
=
"kafkaRomaContainerFactory"
)
public
void
kafkaConsumer
(
ConsumerRecord
<?,
String
>
record
,
Acknowledgment
ack
)
{
Optional
<?>
message
=
Optional
.
ofNullable
(
record
.
value
());
if
(
message
.
isPresent
())
{
try
{
JSONObject
messageObj
=
JSONObject
.
fromObject
(
record
.
value
());
JSONObject
messageObj
=
JSONObject
.
fromObject
(
record
.
value
());
JSONObject
data
=
messageObj
.
getJSONObject
(
"body"
);
if
(
messageObj
.
getJSONObject
(
BODY
).
isEmpty
())
{
emqKeeper
.
getMqttClient
().
publish
(
MQTT_TOPIC_SHAOSHAN
,
data
.
toString
().
getBytes
(
StandardCharsets
.
UTF_8
),
0
,
false
);
messageObj
.
put
(
DATA_TYPE
,
STATE
);
ack
.
acknowledge
();
}
}
catch
(
MqttException
e
)
{
emqKeeper
.
getMqttClient
().
publish
(
MQTT_TOPIC
,
messageObj
.
toString
().
getBytes
(
StandardCharsets
.
UTF_8
),
0
,
false
);
log
.
error
(
"解析数据失败,{}"
,
e
.
getMessage
());
}
}
}
catch
(
MqttException
e
)
{
log
.
error
(
"换流站转发Kafka消息失败"
+
e
.
getMessage
(),
e
);
}
finally
{
ack
.
acknowledge
();
}
}
}
}
...
@@ -253,6 +212,7 @@ public class KafkaConsumerService {
...
@@ -253,6 +212,7 @@ public class KafkaConsumerService {
// }
// }
//
//
//
//
//
//kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup"
//kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup"
//@KafkaListener(topics = "test", groupId = "zhTestGroup")
//@KafkaListener(topics = "test", groupId = "zhTestGroup")
//public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
//public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
...
@@ -262,14 +222,6 @@ public class KafkaConsumerService {
...
@@ -262,14 +222,6 @@ public class KafkaConsumerService {
// //手动提交offset
// //手动提交offset
// ack.acknowledge();
// ack.acknowledge();
//}
//}
// //kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup"
// @KafkaListener(topics = "test", groupId = "zhTestGroup")
// public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
// String value = record.value();
// System.out.println(value);
// System.out.println(record);
// //手动提交offset
// ack.acknowledge();
// }
}
}
amos-boot-utils/amos-boot-utils-message/src/main/resources/application-dev.properties
View file @
c7cc8cc3
...
@@ -88,7 +88,7 @@ emqx.max-inflight=1000
...
@@ -88,7 +88,7 @@ emqx.max-inflight=1000
kafka.topics
=
JKXT2BP-XFZX-Topic
kafka.topics
=
JKXT2BP-XFZX-Topic
#\u9700\u8981\u76D1\u542C\u5F97eqm\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E emq.iot.created,
#\u9700\u8981\u76D1\u542C\u5F97eqm\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E emq.iot.created,
emq.topic
=
emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created,emq.risk.created,emq.mcb.zxj
emq.topic
=
emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created,emq.risk.created,emq.mcb.zxj
,emq.scrap.qrcode.put
##\u4E2D\u5FC3\u7EA7\u914D\u7F6E\u914D\u7F6E
##\u4E2D\u5FC3\u7EA7\u914D\u7F6E\u914D\u7F6E
##\u9700\u8981\u76D1\u542C\u5F97kafka\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E
##\u9700\u8981\u76D1\u542C\u5F97kafka\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E
...
...
amos-boot-utils/amos-boot-utils-message/src/main/resources/json/topic.json
View file @
c7cc8cc3
...
@@ -44,6 +44,12 @@
...
@@ -44,6 +44,12 @@
"emqTopic"
:
"emq.mcb.zxj"
,
"emqTopic"
:
"emq.mcb.zxj"
,
"akkaTopic"
:
"JKXT2BP-XFYY-Topic"
"akkaTopic"
:
"JKXT2BP-XFYY-Topic"
},
},
,
{
"code"
:
"equipScrapQrcodePut"
,
"emqTopic"
:
"emq.scrap.qrcode.put"
,
"akkaTopic"
:
"JKXT2BP-XFYY-Topic"
},
{
{
"code"
:
"mcbsubmit"
,
"code"
:
"mcbsubmit"
,
"emqTopic"
:
"risk.submit"
,
"emqTopic"
:
"risk.submit"
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment