Commit e7772aa9 authored by litengwei's avatar litengwei

任务 11836

parent c66d7846
...@@ -28,7 +28,7 @@ public class KafkaConsumerService { ...@@ -28,7 +28,7 @@ public class KafkaConsumerService {
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
* @param message 消息 * @param message 消息
*/ */
@KafkaListener(id = "consumerSingle", topics = "#{'${kafka.topics}'.split(',')}") @KafkaListener(id = "consumerSingle", topics = "#{'${kafka.topics}'.split(',')}", errorHandler = "consumerAwareListenerErrorHandler")
public void consumerSingle(String message,Acknowledgment ack) { public void consumerSingle(String message,Acknowledgment ack) {
JSONObject messageObj = JSONObject.fromObject(message); JSONObject messageObj = JSONObject.fromObject(message);
String topic = messageObj.getString("topic"); String topic = messageObj.getString("topic");
......
package com.yeejoin.amos.message.kafka.config; //package com.yeejoin.amos.message.kafka.config;
//
//
import org.apache.kafka.clients.admin.NewTopic; //import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value; //import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; //import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; //import org.springframework.context.annotation.Configuration;
//
import java.util.Arrays; //import java.util.Arrays;
//
/** ///**
* topic初始化 // * topic初始化
* // *
* @author litw // * @author litw
* @create 2022/11/1 10:06 // * @create 2022/11/1 10:06
*/ // */
@Configuration class KafkaConfig { //@Configuration class KafkaConfig {
//
@Value("${kafka.init.topics}") // @Value("${kafka.init.topics}")
private String topics; // private String topics;
//
/** // /**
* 创建一个名为topic.test的Topic并设置分区数为8,分区副本数为2 // * 创建一个名为topic.test的Topic并设置分区数为8,分区副本数为2
*/ // */
@Bean public void initialTopic() { // @Bean public void initialTopic() {
String[] split = topics.split(","); // String[] split = topics.split(",");
Arrays.stream(split).forEach(e->{ // Arrays.stream(split).forEach(e->{
new NewTopic(e, 8, (short) 2); // new NewTopic(e, 8, (short) 2);
}); // });
} // }
//
//
} //}
\ No newline at end of file \ No newline at end of file
...@@ -78,9 +78,17 @@ emqx.client-password=public ...@@ -78,9 +78,17 @@ emqx.client-password=public
emqx.max-inflight=1000 emqx.max-inflight=1000
# 下面个配置默认站端 中心级系统的时候注释掉上边 放开下边
#站端配置
#需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 #需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置
kafka.topics=null.topic kafka.topics=null.topic
kafka.init.topics=
#需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 emq.iot.created, #需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 emq.iot.created,
emq.topic=emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created
\ No newline at end of file
##中心级配置配置
##需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置
#kafka.topics=JKXT2BP-XFYY-Topic
#
##需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 emq.iot.created,
#emq.topic=
\ No newline at end of file
...@@ -98,9 +98,17 @@ emqx.client-password=public ...@@ -98,9 +98,17 @@ emqx.client-password=public
emqx.max-inflight=1000 emqx.max-inflight=1000
# 下面个配置默认站端 中心级系统的时候注释掉上边 放开下边
#站端配置
#需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 #需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置
kafka.topics=null.topic kafka.topics=null.topic
kafka.init.topics=
#需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 emq.iot.created, #需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 emq.iot.created,
emq.topic=emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created
\ No newline at end of file
##中心级配置配置
##需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置
#kafka.topics=JKXT2BP-XFYY-Topic
#
##需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 emq.iot.created,
#emq.topic=
\ No newline at end of file
...@@ -2,26 +2,31 @@ ...@@ -2,26 +2,31 @@
{ {
"code": "iot", "code": "iot",
"emqTopic": "emq.iot.created", "emqTopic": "emq.iot.created",
"akkaTopic": "akka.iot.created" "akkaTopic": "JKXT2BP-XFYY-Topic"
}, },
{ {
"code": "patrol", "code": "patrol",
"emqTopic": "emq.patrol.created", "emqTopic": "emq.patrol.created",
"akkaTopic": "akka.patrol.created" "akkaTopic": "JKXT2BP-XFYY-Topic"
}, },
{ {
"code": "sign", "code": "sign",
"emqTopic": "emq.sign.created", "emqTopic": "emq.sign.created",
"akkaTopic": "akka.sign.created" "akkaTopic": "JKXT2BP-XFYY-Topic"
}, },
{ {
"code": "bussSign", "code": "bussSign",
"emqTopic": "emq.bussSign.created", "emqTopic": "emq.bussSign.created",
"akkaTopic": "akka.bussSign.created" "akkaTopic": "JKXT2BP-XFYY-Topic"
}, },
{ {
"code": "user", "code": "user",
"emqTopic": "emq.user.created", "emqTopic": "emq.user.created",
"akkaTopic": "akka.user.created" "akkaTopic": "JKXT2BP-XFYY-Topic"
},
{
"code": "xf",
"emqTopic": "emq.xf.created",
"akkaTopic": "JKXT2BP-XF-Topic"
} }
] ]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment