Commit 8e7bd069 authored by 张森's avatar 张森

message服务优化配置项

parent 0877e5df
......@@ -74,6 +74,7 @@ public class KafkaConsumerService implements ApplicationRunner {
*/
@KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "#{'${queue.kafka.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
public void kafkaListener(ConsumerRecord<?, String> record, Acknowledgment ack) {
log.info("kafka上报数据:{}", JSON.toJSONString(record));
try {
Optional<?> messages = Optional.ofNullable(record.value());
if (messages.isPresent()) {
......@@ -92,25 +93,6 @@ public class KafkaConsumerService implements ApplicationRunner {
}
}
@KafkaListener(id = "kafkaConsumerEventAlarm", groupId = "kafkaConsumerGroupEventAlarm", topics = "#{'${queue.kafka.eventAlarm.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
public void kafkaConsumerEventAlarm(ConsumerRecord<?, String> record, Acknowledgment ack) {
Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
try {
JSONObject object = JSONObject.fromObject(record.value());
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
if ((StringUtils.isEmpty(filePath)) || (CollectionUtils.isEmpty(codeListInfo)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) {
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
}
ack.acknowledge();
} catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
/**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
*
......
......@@ -109,10 +109,13 @@ system.zxj=false
##\u9700\u8981\u76D1\u542C\u5F97eqm\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E emq.iot.created,
#emq.topic=ccs-user-login-info,sync.execute,data/mcb/warning,emq.risk.qrcode.put,emq.risk.qrcode.clean
queue.kafka.topics=null
kafka.auto-startup=false
# 现场kafka消息 topic 遥测遥信告警都 配在这个配置中(英文逗号隔开即可)
# 原告警topic配置删除queue.kafka.eventAlarm.topics
queue.kafka.topics=T_DC_MQ_REALDATA,T_DC_MQ_STATUS
#浜嬩欢鍛婅瀵规帴Kafka涓婚
queue.kafka.eventAlarm.topics=JKXT2BP-GJ-Topic-site
kafka.auto-startup=false
kafka.station.groupId=shaoxing
#F:\\filterExcel11.xlsx 读取excel文件过滤kafka消息 如不需要将该配置置空,或者不添加该配置即可
filter.excel.path=F:\\filterExcel.xlsx
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment