Commit 8196356b authored by 刘林's avatar 刘林

fix(message):message服务添加Kafka多数据源,以及配置消息转发

parent b646432f
......@@ -2,14 +2,16 @@ package com.yeejoin.amos.message.kafka;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
/**
* kafka 消费服务
......@@ -46,6 +48,26 @@ public class KafkaConsumerService {
ack.acknowledge();
}
/**
 * Consumes records from the Roma Kafka topics and republishes the payload to the
 * local EMQ (MQTT) broker on {@code romaSite/data/transmit} at QoS 0.
 *
 * <p>The record is ALWAYS acknowledged, even when processing fails, so a malformed
 * ("poison pill") message cannot block the partition with endless redeliveries.
 * Previously an unchecked JSONException from {@code JSONObject.fromObject} escaped
 * before {@code ack.acknowledge()} was reached.
 *
 * @param record consumed Kafka record; only its String value is used
 * @param ack    manual-immediate acknowledgment handle (see kafkaRomaContainerFactory)
 */
@KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "#{'${queue.kafka.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
public void kafkaListener(ConsumerRecord<?, String> record, Acknowledgment ack) {
    String payload = record.value();
    if (payload != null) {
        try {
            // Fixed MQTT topic that Roma site data is forwarded on.
            String topic = "romaSite/data/transmit";
            JSONObject messageObj = JSONObject.fromObject(payload);
            // Some messages wrap the payload in a "body" field; fall back to the
            // whole object when that wrapper is absent or empty.
            JSONObject data = messageObj.getJSONObject("body");
            if (data.size() == 0) {
                data = messageObj;
            }
            log.info("接收到Roma消息对象: {}", data);
            emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
        } catch (MqttException e) {
            // MQTT publish failed: log with cause instead of printStackTrace().
            log.error("转发Roma消息到MQTT失败, topic=romaSite/data/transmit", e);
        } catch (Exception e) {
            // JSON parsing (net.sf.json throws unchecked JSONException) or any other
            // unexpected failure: log and fall through so the record is still acked.
            log.error("处理Roma消息失败: {}", payload, e);
        }
    }
    ack.acknowledge();
}
/* @KafkaListener(id = "consumerBatch", topicPartitions = {
@TopicPartition(topic = "hello-batch1", partitions = "0"),
......
//package com.yeejoin.amos.message.kafka.config;
//
//
//import org.apache.kafka.clients.admin.NewTopic;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//
//import java.util.Arrays;
//
///**
// * topic初始化
// *
// * @author litw
// * @create 2022/11/1 10:06
// */
//@Configuration class KafkaConfig {
//
// @Value("${kafka.init.topics}")
// private String topics;
//
// /**
// * 创建一个名为topic.test的Topic并设置分区数为8,分区副本数为2
// */
// @Bean public void initialTopic() {
// String[] split = topics.split(",");
// Arrays.stream(split).forEach(e->{
// new NewTopic(e, 8, (short) 2);
// });
// }
//
//
//}
\ No newline at end of file
package com.yeejoin.amos.message.kafka.config;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
/**
* topic初始化
*
* @author litw
* @create 2022/11/1 10:06
*/
@Configuration
class KafkaConfig {

    /** Kafka cluster address list for the Roma integration (host:port, comma-separated). */
    @Value("${queue.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Consumer group id shared by the Roma listeners. */
    @Value("${queue.kafka.consumer.group-id}")
    private String groupId;

    /** Whether offsets are auto-committed; listeners use manual ack, so normally false. */
    @Value("${queue.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    /** Toggles the SASL_SSL security block below; defaults to plaintext when false. */
    @Value("${queue.kafka.ssl.enabled:false}")
    private boolean sslEnabled;

    @Value("${queue.kafka.ssl.truststore.location:}")
    private String sslTruststoreLocation;

    @Value("${queue.kafka.ssl.truststore.password:}")
    private String sslTruststorePassword;

    @Value("${queue.kafka.confluent.sasl.jaas.config:}")
    private String saslConfig;

    @Value("${queue.kafka.confluent.sasl.mechanism:}")
    private String saslMechanism;

    @Value("${queue.kafka.confluent.security.protocol:}")
    private String securityProtocol;

    @Value("${queue.kafka.confluent.ssl.algorithm:}")
    private String sslAlgorithm;

    /** Producer template for publishing to the Roma cluster. */
    @Bean(name = "kafkaRomaTemplate")
    public KafkaTemplate<String, String> kafkaRomaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Listener container factory for the Roma topics: 3 concurrent consumers,
     * manual-immediate acknowledgment, 15s poll timeout.
     *
     * <p>NOTE(review): the factory is typed {@code <Integer, String>} but the consumer
     * is configured with a String key deserializer — keys accessed as Integer would
     * throw ClassCastException. Left as-is to keep the public signature stable; confirm
     * whether the key type should be String.
     */
    @Bean(name = "kafkaRomaContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaEsContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        // Listener configuration. The original code set the poll timeout twice
        // (3000 then 15000); only the final 15000 ever took effect, so the dead
        // first call has been removed.
        factory.getContainerProperties().setPollTimeout(15000);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Producer settings: acks=all with retries for at-least-once delivery. */
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /** Consumer settings; the SASL_SSL block is only applied when {@link #sslEnabled}. */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        if (sslEnabled) {
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);
            props.put("sasl.mechanism", saslMechanism);
            props.put("sasl.jaas.config", saslConfig);
            props.put("ssl.endpoint.identification.algorithm", sslAlgorithm);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
            // NOTE(review): the poll/session tuning below is general consumer tuning,
            // yet it only applies when SSL is enabled — confirm whether it was meant
            // to live outside this branch. Kept here to preserve behavior.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 8);
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        }
        return props;
    }
}
\ No newline at end of file
......@@ -93,4 +93,10 @@ emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq
#kafka.topics=JKXT2BP-XFYY-Topic
#
##需要监听的eqm消息主题 根据是否是中心级和站端选择需要监听的主题进行配置 emq.iot.created,
#emq.topic=
\ No newline at end of file
#emq.topic=
#自定义Kafka配置对接交换站
queue.kafka.bootstrap-servers=121.199.39.218:9092
queue.kafka.consumer.group-id=kafkaRoma
queue.kafka.consumer.enable-auto-commit=false
queue.kafka.topics=T_DC_MQ_REALDATA,T_DC_MQ_STATUS
\ No newline at end of file
......@@ -112,4 +112,22 @@ emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq
##kafka.topics=JKXT2BP-XFYY-Topic
#需要监听的eqm消息主题 根据是否是中心级和站端选择需要监听的主题进行配置
##emq.topic=
#中心级配置配置结束
\ No newline at end of file
#中心级配置配置结束
#自定义Kafka消息配置对接交换站
queue.kafka.bootstrap-servers=192.168.4.214:29093,192.168.4.215:29093,192.168.4.216:29093
queue.kafka.consumer.group-id=kafkaRoma
queue.kafka.consumer.enable-auto-commit=false
# 是否开启消费者SSL
queue.kafka.ssl.enabled=true
queue.kafka.ssl.truststore.location=D:/client.truststore.jks
queue.kafka.ssl.truststore.password=dms@kafka
queue.kafka.confluent.security.protocol=SASL_SSL
queue.kafka.confluent.sasl.mechanism=PLAIN
#NOTE(review): 明文账号/密码不应提交到代码库,建议改为环境变量或配置中心/密钥管理方式注入
queue.kafka.confluent.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="io.cs" \
password="=4#4x%pN$Ky2+X5.54ZS+/8WL2Pyu@916--/ycV3.9Bzkq6CZKt!7OZ1uRCPwt65";
queue.kafka.confluent.ssl.algorithm=
queue.kafka.topics=T_DC_MQ_REALDATA,T_DC_MQ_STATUS
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment