Commit e61a2dc9 authored by litengwei's avatar litengwei

Merge remote-tracking branch 'origin/develop_dl_3.7.0.9' into develop_dl_3.7.0.9

parents c97f4f50 b70a2fdd
......@@ -9,6 +9,7 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
......@@ -26,6 +27,8 @@ public class KafkaConsumerService {
@Autowired
protected EmqKeeper emqKeeper;
private static final String MQTT_TOPIC = "romaSite/data/transmit";
/**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
* @param message 消息
......@@ -53,7 +56,6 @@ public class KafkaConsumerService {
Optional<?> messages = Optional.ofNullable(record.value());
if (messages.isPresent()) {
try {
String topic = "romaSite/data/transmit";
JSONObject messageObj = JSONObject.fromObject(record.value());
JSONObject data = messageObj.getJSONObject("body");
if (data.size() == 0){
......@@ -61,7 +63,7 @@ public class KafkaConsumerService {
data.put("datatype","state");
}
log.info("接收到Roma消息对象: {}", data);
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
} catch (MqttException e) {
e.printStackTrace();
}
......
......@@ -2,18 +2,18 @@ package com.yeejoin.amos.message.kafka.config;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
......@@ -26,13 +26,16 @@ import java.util.Map;
@Configuration
class KafkaConfig {
@Value("${queue.kafka.bootstrap-servers}")
@Value("${kafka.auto-startup:false}")
private boolean autoStartup;
@Value("${queue.kafka.bootstrap-servers:}")
private String bootstrapServers;
@Value("${queue.kafka.consumer.group-id}")
@Value("${queue.kafka.consumer.group-id:}")
private String groupId;
@Value("${queue.kafka.consumer.enable-auto-commit}")
@Value("${queue.kafka.consumer.enable-auto-commit:false}")
private boolean enableAutoCommit;
@Value("${queue.kafka.ssl.enabled:false}")
......@@ -56,46 +59,29 @@ class KafkaConfig {
@Value("${queue.kafka.confluent.ssl.algorithm:}")
private String sslAlgorithm;
/**
 * Template used to publish String key/value messages to the Roma-facing
 * Kafka cluster; exposed under the bean name "kafkaRomaTemplate".
 */
@Bean(name = "kafkaRomaTemplate")
public KafkaTemplate<String, String> kafkaRomaTemplate() {
    ProducerFactory<String, String> factory = producerFactory();
    return new KafkaTemplate<>(factory);
}
@Value("${queue.kafka.max.poll.records:}")
private String maxPollRecords;
/**
 * Listener container factory for the Roma Kafka consumers
 * (bean name "kafkaRomaContainerFactory").
 *
 * Fix: the original called setPollTimeout twice (3000 then 15000); the
 * first call was dead because the later value always overwrote it. Only
 * the effective 15000 ms timeout is kept.
 */
@Bean(name = "kafkaRomaContainerFactory")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaEsContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    // Listener configuration: offsets are committed manually, immediately on ack.
    factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties()
            .setPollTimeout(15000);
    // Auto-start is driven by the kafka.auto-startup property (default false),
    // so listeners stay stopped unless explicitly enabled per environment.
    factory.setAutoStartup(autoStartup);
    return factory;
}
// Builds the producer factory backing kafkaRomaTemplate from producerConfigs().
private ProducerFactory<String, String> producerFactory() {
    Map<String, Object> config = producerConfigs();
    return new DefaultKafkaProducerFactory<>(config);
}
// Builds the consumer factory used by the Roma listener container factory.
public ConsumerFactory<Integer, String> consumerFactory() {
    Map<String, Object> config = consumerConfigs();
    return new DefaultKafkaConsumerFactory<>(config);
}
/**
 * Producer settings: String serialization for keys and values, durable
 * acknowledgements ("all"), 10 retries, 1000-byte batches and a 32 MiB
 * buffer, pointed at the configured bootstrap servers.
 */
private Map<String, Object> producerConfigs() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.RETRIES_CONFIG, 10);
    config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
    config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    return config;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
......@@ -111,7 +97,7 @@ class KafkaConfig {
props.put("sasl.jaas.config", saslConfig);
props.put("ssl.endpoint.identification.algorithm", sslAlgorithm);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put("max.poll.records", 8);
props.put("max.poll.records", maxPollRecords);
props.put("max.poll.interval.ms", "30000");
props.put("session.timeout.ms", "30000");
}
......
......@@ -96,7 +96,5 @@ emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq
#emq.topic=
#自定义Kafka配置对接交换站
queue.kafka.bootstrap-servers=121.199.39.218:9092
queue.kafka.consumer.group-id=kafkaRoma
queue.kafka.consumer.enable-auto-commit=false
queue.kafka.topics=T_DC_MQ_REALDATA,T_DC_MQ_STATUS
\ No newline at end of file
queue.kafka.topics=null
kafka.auto-startup=false
\ No newline at end of file
......@@ -130,4 +130,6 @@ queue.kafka.confluent.sasl.jaas.config=org.apache.kafka.common.security.plain.Pl
username="io.cs" \
password="=4#4x%pN$Ky2+X5.54ZS+/8WL2Pyu@916--/ycV3.9Bzkq6CZKt!7OZ1uRCPwt65";
queue.kafka.confluent.ssl.algorithm=
queue.kafka.topics=T_DC_MQ_REALDATA,T_DC_MQ_STATUS
\ No newline at end of file
queue.kafka.topics=T_DC_MQ_REALDATA,T_DC_MQ_STATUS
queue.kafka.max.poll.records=500
kafka.auto-startup=true
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment