Commit b70a2fdd authored by 刘林's avatar 刘林

fix(equip):添加开关控制多数据源Kafka配置信息

parent 314cf33d
...@@ -9,6 +9,7 @@ import org.springframework.kafka.annotation.KafkaListener; ...@@ -9,6 +9,7 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Optional; import java.util.Optional;
...@@ -26,6 +27,8 @@ public class KafkaConsumerService { ...@@ -26,6 +27,8 @@ public class KafkaConsumerService {
@Autowired @Autowired
protected EmqKeeper emqKeeper; protected EmqKeeper emqKeeper;
private static final String MQTT_TOPIC = "romaSite/data/transmit";
/** /**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
* @param message 消息 * @param message 消息
...@@ -53,7 +56,6 @@ public class KafkaConsumerService { ...@@ -53,7 +56,6 @@ public class KafkaConsumerService {
Optional<?> messages = Optional.ofNullable(record.value()); Optional<?> messages = Optional.ofNullable(record.value());
if (messages.isPresent()) { if (messages.isPresent()) {
try { try {
String topic = "romaSite/data/transmit";
JSONObject messageObj = JSONObject.fromObject(record.value()); JSONObject messageObj = JSONObject.fromObject(record.value());
JSONObject data = messageObj.getJSONObject("body"); JSONObject data = messageObj.getJSONObject("body");
if (data.size() == 0){ if (data.size() == 0){
...@@ -61,7 +63,7 @@ public class KafkaConsumerService { ...@@ -61,7 +63,7 @@ public class KafkaConsumerService {
data.put("datatype","state"); data.put("datatype","state");
} }
log.info("接收到Roma消息对象: {}", data); log.info("接收到Roma消息对象: {}", data);
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false); emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace(); e.printStackTrace();
} }
......
...@@ -2,18 +2,18 @@ package com.yeejoin.amos.message.kafka.config; ...@@ -2,18 +2,18 @@ package com.yeejoin.amos.message.kafka.config;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
...@@ -26,13 +26,16 @@ import java.util.Map; ...@@ -26,13 +26,16 @@ import java.util.Map;
@Configuration @Configuration
class KafkaConfig { class KafkaConfig {
@Value("${queue.kafka.bootstrap-servers}") @Value("${kafka.auto-startup:false}")
private boolean autoStartup;
@Value("${queue.kafka.bootstrap-servers:}")
private String bootstrapServers; private String bootstrapServers;
@Value("${queue.kafka.consumer.group-id}") @Value("${queue.kafka.consumer.group-id:}")
private String groupId; private String groupId;
@Value("${queue.kafka.consumer.enable-auto-commit}") @Value("${queue.kafka.consumer.enable-auto-commit:false}")
private boolean enableAutoCommit; private boolean enableAutoCommit;
@Value("${queue.kafka.ssl.enabled:false}") @Value("${queue.kafka.ssl.enabled:false}")
...@@ -56,46 +59,29 @@ class KafkaConfig { ...@@ -56,46 +59,29 @@ class KafkaConfig {
@Value("${queue.kafka.confluent.ssl.algorithm:}") @Value("${queue.kafka.confluent.ssl.algorithm:}")
private String sslAlgorithm; private String sslAlgorithm;
@Bean(name = "kafkaRomaTemplate") @Value("${queue.kafka.max.poll.records:}")
public KafkaTemplate<String, String> kafkaRomaTemplate() { private String maxPollRecords;
return new KafkaTemplate<>(producerFactory());
}
@Bean(name = "kafkaRomaContainerFactory") @Bean(name = "kafkaRomaContainerFactory")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaEsContainerFactory() { KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaEsContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory()); factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); factory.setConcurrency(3);
//Listener配置
factory.getContainerProperties() factory.getContainerProperties()
.setPollTimeout(3000); .setPollTimeout(3000);
factory.getContainerProperties() factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties() factory.getContainerProperties()
.setPollTimeout(15000); .setPollTimeout(15000);
// 禁止消费者监听器自启动
factory.setAutoStartup(autoStartup);
return factory; return factory;
} }
private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public ConsumerFactory<Integer, String> consumerFactory() { public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs()); return new DefaultKafkaConsumerFactory<>(consumerConfigs());
} }
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private Map<String, Object> consumerConfigs() { private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>(); Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
...@@ -111,7 +97,7 @@ class KafkaConfig { ...@@ -111,7 +97,7 @@ class KafkaConfig {
props.put("sasl.jaas.config", saslConfig); props.put("sasl.jaas.config", saslConfig);
props.put("ssl.endpoint.identification.algorithm", sslAlgorithm); props.put("ssl.endpoint.identification.algorithm", sslAlgorithm);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put("max.poll.records", 8); props.put("max.poll.records", maxPollRecords);
props.put("max.poll.interval.ms", "30000"); props.put("max.poll.interval.ms", "30000");
props.put("session.timeout.ms", "30000"); props.put("session.timeout.ms", "30000");
} }
......
...@@ -96,7 +96,5 @@ emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq ...@@ -96,7 +96,5 @@ emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq
#emq.topic= #emq.topic=
#自定义Kafka配置对接交换站 #自定义Kafka配置对接交换站
queue.kafka.bootstrap-servers=121.199.39.218:9092 queue.kafka.topics=null
queue.kafka.consumer.group-id=kafkaRoma kafka.auto-startup=false
queue.kafka.consumer.enable-auto-commit=false \ No newline at end of file
queue.kafka.topics=T_DC_MQ_REALDATA,T_DC_MQ_STATUS
\ No newline at end of file
...@@ -130,4 +130,6 @@ queue.kafka.confluent.sasl.jaas.config=org.apache.kafka.common.security.plain.Pl ...@@ -130,4 +130,6 @@ queue.kafka.confluent.sasl.jaas.config=org.apache.kafka.common.security.plain.Pl
username="io.cs" \ username="io.cs" \
password="=4#4x%pN$Ky2+X5.54ZS+/8WL2Pyu@916--/ycV3.9Bzkq6CZKt!7OZ1uRCPwt65"; password="=4#4x%pN$Ky2+X5.54ZS+/8WL2Pyu@916--/ycV3.9Bzkq6CZKt!7OZ1uRCPwt65";
queue.kafka.confluent.ssl.algorithm= queue.kafka.confluent.ssl.algorithm=
queue.kafka.topics=T_DC_MQ_REALDATA,T_DC_MQ_STATUS queue.kafka.topics=T_DC_MQ_REALDATA,T_DC_MQ_STATUS
\ No newline at end of file queue.kafka.max.poll.records=500
kafka.auto-startup=true
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment