Commit f3171b87 authored by 张森's avatar 张森

activeMQ接收消息 更改为 kafka接收

parent ada001db
......@@ -27,6 +27,12 @@
</exclusion>
</exclusions>
</dependency>
<!--kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
</project>
package com.yeejoin.amos.boot.module.jcs.biz.activeMq;
import com.yeejoin.amos.boot.module.jcs.api.service.ICcsToStationUserInfo;
import com.yeejoin.amos.boot.module.jcs.biz.config.StartLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.TextMessage;
/**
* @author DELL
*/
@Component
public class QueueConsumerService {
    // Fixed: logger was created with StartLoader.class, which mislabels every log line.
    private final Logger logger = LoggerFactory.getLogger(QueueConsumerService.class);
    @Autowired
    ICcsToStationUserInfo ccsToStationUserInfo;
    /**
     * Consumes login messages from the ActiveMQ queue and forwards the payload
     * to the converter-station user-info service.
     *
     * @param textMessage JMS text message carrying the login payload; null
     *                    messages or messages with a null body are ignored
     */
    @JmsListener(destination = "amos.privilege.v1.STATE_GRID.AMOS_ADMIN.login", containerFactory = "jmsListenerContainerQueue")
    public void message(TextMessage textMessage) {
        try {
            if (textMessage == null || textMessage.getText() == null) {
                return;
            }
            logger.info("收到activeMQ发送登录信息>>{}", textMessage.getText());
            ccsToStationUserInfo.sendUserInfoToStation(textMessage.getText());
        } catch (Exception e) {
            // Pass the exception as the last argument so SLF4J logs the full
            // stack trace; previously only the message text was recorded.
            logger.error("消费activeMQ发送登录信息失败,{}", e.getMessage(), e);
        }
    }
}
package com.yeejoin.amos.boot.module.jcs.biz.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
/**
*
* @author DELL
*/
@Configuration
@IntegrationComponentScan
public class ActiveMqQueueConfig {

    @Value("${spring.activemq.user}")
    private String usrName;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    /**
     * Creates the connection factory used to reach the ActiveMQ broker,
     * configured from the spring.activemq.* properties.
     *
     * @return the broker connection factory
     */
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
    }

    /**
     * Listener container factory for point-to-point (queue) consumption.
     *
     * @param connectionFactory the broker connection factory to attach
     * @return a JMS listener container factory running in queue mode
     */
    @Bean("jmsListenerContainerQueue")
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Queue semantics: publish/subscribe (topic) mode is disabled.
        factory.setPubSubDomain(false);
        return factory;
    }
}
package com.yeejoin.amos.boot.module.jcs.biz.config;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
/**
* topic初始化
*
* @author litw
* @create 2022/11/1 10:06
*/
@Configuration
class KafkaConfig {
    // Whether listener containers start consuming automatically (default: disabled).
    @Value("${kafka.auto-startup:false}")
    private boolean autoStartup;
    @Value("${queue.kafka.bootstrap-servers:}")
    private String bootstrapServers;
    @Value("${queue.kafka.consumer.group-id:}")
    private String groupId;
    @Value("${queue.kafka.consumer.enable-auto-commit:false}")
    private boolean enableAutoCommit;
    @Value("${queue.kafka.ssl.enabled:false}")
    private boolean sslEnabled;
    @Value("${queue.kafka.ssl.truststore.location:}")
    private String sslTruststoreLocation;
    @Value("${queue.kafka.ssl.truststore.password:}")
    private String sslTruststorePassword;
    @Value("${queue.kafka.confluent.sasl.jaas.config:}")
    private String saslConfig;
    @Value("${queue.kafka.confluent.sasl.mechanism:}")
    private String saslMechanism;
    @Value("${queue.kafka.confluent.security.protocol:}")
    private String securityProtocol;
    @Value("${queue.kafka.confluent.ssl.algorithm:}")
    private String sslAlgorithm;
    @Value("${queue.kafka.max.poll.records:}")
    private String maxPollRecords;
    /**
     * Listener container factory for the ROMA Kafka cluster: 3 concurrent
     * consumers, manual-immediate acknowledgement, 15s poll timeout.
     *
     * @return the container factory registered as "kafkaRomaContainerFactory"
     */
    @Bean(name = "kafkaRomaContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaEsContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        // Offsets are committed manually by the listener, immediately per record.
        factory.getContainerProperties()
                .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Fixed: the original set the poll timeout to 3000 and then immediately
        // overwrote it with 15000; the dead first call has been removed.
        factory.getContainerProperties()
                .setPollTimeout(15000);
        // 禁止消费者监听器自启动 (containers stay stopped unless kafka.auto-startup=true)
        factory.setAutoStartup(autoStartup);
        return factory;
    }
    /**
     * Builds the consumer factory backing the listener containers.
     *
     * @return a consumer factory using the properties from {@link #consumerConfigs()}
     */
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    /**
     * Assembles the raw consumer configuration map. SASL/SSL related settings
     * (and the poll limits bundled with them) are only applied when
     * queue.kafka.ssl.enabled=true.
     *
     * @return the Kafka consumer configuration properties
     */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // Only effective when enable-auto-commit is true (default here is false).
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        if (sslEnabled) {
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);
            props.put("sasl.mechanism", saslMechanism);
            props.put("sasl.jaas.config", saslConfig);
            props.put("ssl.endpoint.identification.algorithm", sslAlgorithm);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
            // NOTE(review): these poll/session limits are only set in SSL mode —
            // confirm that is intentional rather than a copy/paste placement.
            props.put("max.poll.records", maxPollRecords);
            props.put("max.poll.interval.ms", "30000");
            props.put("session.timeout.ms", "30000");
        }
        return props;
    }
}
\ No newline at end of file
package com.yeejoin.amos.boot.module.jcs.biz.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
/**
* kafka 消费者配置类
*
* @author Leo
* @create 2020/12/31 15:09
**/
@Slf4j
@Configuration
public class KafkaConsumerConfiguration {
/**
* 消费异常处理器
* @return
*/
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
//打印消费异常的消息和异常信息
log.error("消费异常的消息 failed! message: {}, exceptionMsg: {}, groupId: {}", message, exception.getMessage(), exception.getGroupId());
return null;
}
};
}
}
package com.yeejoin.amos.boot.module.jcs.biz.kafka;
import com.yeejoin.amos.boot.module.jcs.api.service.ICcsToStationUserInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import java.util.Optional;
/**
* kafka 消费服务
*
* @author litw
* @create 2022/11/1 10:06
**/
@Slf4j
@Service
public class KafkaConsumerService {
@Autowired
ICcsToStationUserInfo ccsToStationUserInfo;
/**
* 转发苏州,绍兴换流站Kafka数据对emq
*
* @param record record
* @param ack ack
*/
@KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "amos.privilege.v1.STATE_GRID.AMOS_ADMIN.login", containerFactory = "kafkaRomaContainerFactory")
public void kafkaListener(ConsumerRecord<?, String> record, Acknowledgment ack) {
try {
Optional<?> messages = Optional.ofNullable(record.value());
if (messages.isPresent()) {
log.info("收到kafka消息发送登录信息>>{}", record.value());
ccsToStationUserInfo.sendUserInfoToStation(record.value());
}
} catch (Exception e) {
log.error("消费kafka消息发送登录信息失败,{}", e.getMessage());
} finally {
ack.acknowledge();
}
}
}
......@@ -18,7 +18,7 @@ spring.redis.password=ENC(lWqTEY3X3h2PiIxJKR073v5L9aiF6MWZlhglsLr+8QrIGeV3M28y+B
## ES properties:
biz.elasticsearch.address=172.16.11.201
biz.elasticsearch.address=172.16.10.245
spring.data.elasticsearch.cluster-name=elasticsearch
spring.data.elasticsearch.cluster-nodes=${biz.elasticsearch.address}:9300
spring.elasticsearch.rest.uris=http://${biz.elasticsearch.address}:9200
......@@ -32,17 +32,6 @@ emqx.broker=tcp://172.16.10.216:1883
emqx.user-name=admin
emqx.password=ENC(yVcJLlSl9/CXSEMT/SjcyzaLAvGU4o8OhU0AVnaF40Olfvo9kT+VxykM6bunDzcb)
#activeMq
spring.activemq.broker-url=tcp://172.16.10.216:61616
spring.activemq.user=admin
spring.activemq.password=ENC(yVcJLlSl9/CXSEMT/SjcyzaLAvGU4o8OhU0AVnaF40Olfvo9kT+VxykM6bunDzcb)
spring.jms.pub-sub-domain=false
#启用连接池
spring.activemq.pool.enabled=true
#最大连接数
spring.activemq.pool.max-connections=100
spring.activemq.packages.trust-all=true
## ???? properties:
rule.definition.load=false
rule.definition.model-package=com.yeejoin.amos.boot.module.jcs.api.dto
......@@ -64,4 +53,16 @@ privilege.fegin.name=AMOS-API-PRIVILEGE
feign.client.config.default.connect-timeout=20000
feign.client.config.default.read-timeout=20000
auth-key-auth-enabled=auth-enabled
\ No newline at end of file
auth-key-auth-enabled=auth-enabled
spring.kafka.consumer.group-id=zhTestGroup
spring.kafka.consumer.bootstrap-servers=172.16.10.241:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.type=batch
spring.kafka.consumer.max-poll-records=1000
spring.kafka.consumer.fetch-max-wait= 1000
\ No newline at end of file
......@@ -196,11 +196,6 @@
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<!--ActiveMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- knife4j -->
<dependency>
<groupId>com.github.xiaoymin</groupId>
......@@ -306,17 +301,17 @@
<repository>
<id>public</id>
<name>public</name>
<url>http://113.142.68.105:8081/nexus/content/repositories/public/</url>
<url>http://47.92.103.240:8081/nexus/content/repositories/public/</url>
</repository>
<repository>
<id>Releases</id>
<name>Releases</name>
<url>http://113.142.68.105:8081/nexus/content/repositories/releases/</url>
<url>http://47.92.103.240:8081/nexus/content/repositories/releases/</url>
</repository>
<repository>
<id>Snapshots</id>
<name>Snapshots</name>
<url>http://113.142.68.105:8081/nexus/content/repositories/snapshots/</url>
<url>http://47.92.103.240:8081/nexus/content/repositories/snapshots/</url>
</repository>
<repository>
<id>com.e-iceblue</id>
......@@ -326,7 +321,7 @@
<repository>
<id>thirdparty</id>
<name>thirdparty</name>
<url>http://113.142.68.105:8081/nexus/content/repositories/thirdparty/</url>
<url>http://47.92.103.240:8081/nexus/content/repositories/thirdparty/</url>
</repository>
</repositories>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment