Commit ee16d6e6 authored by 刘林's avatar 刘林

fix(equip):添加开关控制Kafka多数据源自启动配置信息

parent 9a3c761b
...@@ -916,9 +916,7 @@ ...@@ -916,9 +916,7 @@
</sql> </sql>
</changeSet> </changeSet>
<!--<changeSet author="20230620" id="20230620-1" runAlways="true">
<changeSet author="20230620" id="20230620-1" runAlways="true">
<comment>`dz_point_system`</comment> <comment>`dz_point_system`</comment>
<sql endDelimiter="#"> <sql endDelimiter="#">
CREATE TABLE `dz_point_system` ( CREATE TABLE `dz_point_system` (
...@@ -935,7 +933,7 @@ ...@@ -935,7 +933,7 @@
PRIMARY KEY (`id`) USING BTREE PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic; ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
</sql> </sql>
</changeSet> </changeSet>-->
<changeSet author="20230629" id="20230629-1"> <changeSet author="20230629" id="20230629-1">
<preConditions onFail="MARK_RAN"> <preConditions onFail="MARK_RAN">
......
...@@ -321,8 +321,8 @@ ...@@ -321,8 +321,8 @@
si.equipment_specific_id AS equipmentId, si.equipment_specific_id AS equipmentId,
ei.id, ei.id,
ei.name_key, ei.name_key,
ei.`name` AS perfQuotaName, ei.name AS perfQuotaName,
si.`value`, si.value,
ei.is_iot, ei.is_iot,
si.unit AS unitName, si.unit AS unitName,
ei.sort_num, ei.sort_num,
...@@ -333,7 +333,8 @@ ...@@ -333,7 +333,8 @@
si.data_type, si.data_type,
si.equipment_specific_name, si.equipment_specific_name,
si.equipment_index_name, si.equipment_index_name,
si.is_alarm si.is_alarm,
si.value_enum AS valueEnum
FROM FROM
wl_equipment_specific_index si wl_equipment_specific_index si
LEFT JOIN wl_equipment_index ei ON si.equipment_index_id = ei.id LEFT JOIN wl_equipment_index ei ON si.equipment_index_id = ei.id
......
...@@ -9,6 +9,7 @@ import org.springframework.kafka.annotation.KafkaListener; ...@@ -9,6 +9,7 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Optional; import java.util.Optional;
...@@ -26,6 +27,8 @@ public class KafkaConsumerService { ...@@ -26,6 +27,8 @@ public class KafkaConsumerService {
@Autowired @Autowired
protected EmqKeeper emqKeeper; protected EmqKeeper emqKeeper;
private static final String MQTT_TOPIC = "romaSite/data/transmit";
/** /**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
* @param message 消息 * @param message 消息
...@@ -53,7 +56,6 @@ public class KafkaConsumerService { ...@@ -53,7 +56,6 @@ public class KafkaConsumerService {
Optional<?> messages = Optional.ofNullable(record.value()); Optional<?> messages = Optional.ofNullable(record.value());
if (messages.isPresent()) { if (messages.isPresent()) {
try { try {
String topic = "romaSite/data/transmit";
JSONObject messageObj = JSONObject.fromObject(record.value()); JSONObject messageObj = JSONObject.fromObject(record.value());
JSONObject data = messageObj.getJSONObject("body"); JSONObject data = messageObj.getJSONObject("body");
if (data.size() == 0){ if (data.size() == 0){
...@@ -61,7 +63,7 @@ public class KafkaConsumerService { ...@@ -61,7 +63,7 @@ public class KafkaConsumerService {
data.put("datatype","state"); data.put("datatype","state");
} }
log.info("接收到Roma消息对象: {}", data); log.info("接收到Roma消息对象: {}", data);
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false); emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace(); e.printStackTrace();
} }
......
...@@ -2,18 +2,18 @@ package com.yeejoin.amos.message.kafka.config; ...@@ -2,18 +2,18 @@ package com.yeejoin.amos.message.kafka.config;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
...@@ -26,13 +26,16 @@ import java.util.Map; ...@@ -26,13 +26,16 @@ import java.util.Map;
@Configuration @Configuration
class KafkaConfig { class KafkaConfig {
@Value("${queue.kafka.bootstrap-servers}") @Value("${kafka.auto-startup:false}")
private boolean autoStartup;
@Value("${queue.kafka.bootstrap-servers:}")
private String bootstrapServers; private String bootstrapServers;
@Value("${queue.kafka.consumer.group-id}") @Value("${queue.kafka.consumer.group-id:}")
private String groupId; private String groupId;
@Value("${queue.kafka.consumer.enable-auto-commit}") @Value("${queue.kafka.consumer.enable-auto-commit:false}")
private boolean enableAutoCommit; private boolean enableAutoCommit;
@Value("${queue.kafka.ssl.enabled:false}") @Value("${queue.kafka.ssl.enabled:false}")
...@@ -56,46 +59,29 @@ class KafkaConfig { ...@@ -56,46 +59,29 @@ class KafkaConfig {
@Value("${queue.kafka.confluent.ssl.algorithm:}") @Value("${queue.kafka.confluent.ssl.algorithm:}")
private String sslAlgorithm; private String sslAlgorithm;
@Bean(name = "kafkaRomaTemplate") @Value("${queue.kafka.max.poll.records:}")
public KafkaTemplate<String, String> kafkaRomaTemplate() { private String maxPollRecords;
return new KafkaTemplate<>(producerFactory());
}
@Bean(name = "kafkaRomaContainerFactory") @Bean(name = "kafkaRomaContainerFactory")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaEsContainerFactory() { KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaEsContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory()); factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); factory.setConcurrency(3);
//Listener配置
factory.getContainerProperties() factory.getContainerProperties()
.setPollTimeout(3000); .setPollTimeout(3000);
factory.getContainerProperties() factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties() factory.getContainerProperties()
.setPollTimeout(15000); .setPollTimeout(15000);
// 禁止消费者监听器自启动
factory.setAutoStartup(autoStartup);
return factory; return factory;
} }
private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public ConsumerFactory<Integer, String> consumerFactory() { public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs()); return new DefaultKafkaConsumerFactory<>(consumerConfigs());
} }
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private Map<String, Object> consumerConfigs() { private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>(); Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
...@@ -111,7 +97,7 @@ class KafkaConfig { ...@@ -111,7 +97,7 @@ class KafkaConfig {
props.put("sasl.jaas.config", saslConfig); props.put("sasl.jaas.config", saslConfig);
props.put("ssl.endpoint.identification.algorithm", sslAlgorithm); props.put("ssl.endpoint.identification.algorithm", sslAlgorithm);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put("max.poll.records", 8); props.put("max.poll.records", maxPollRecords);
props.put("max.poll.interval.ms", "30000"); props.put("max.poll.interval.ms", "30000");
props.put("session.timeout.ms", "30000"); props.put("session.timeout.ms", "30000");
} }
......
...@@ -94,9 +94,5 @@ emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq ...@@ -94,9 +94,5 @@ emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq
# #
##需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 emq.iot.created, ##需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 emq.iot.created,
#emq.topic= #emq.topic=
queue.kafka.topics=null
#自定义Kafka配置对接交换站 kafka.auto-startup=true
queue.kafka.bootstrap-servers=121.199.39.218:9092 \ No newline at end of file
queue.kafka.consumer.group-id=kafkaRoma
queue.kafka.consumer.enable-auto-commit=false
queue.kafka.topics=T_DC_MQ_REALDATA,T_DC_MQ_STATUS
\ No newline at end of file
...@@ -130,4 +130,6 @@ queue.kafka.confluent.sasl.jaas.config=org.apache.kafka.common.security.plain.Pl ...@@ -130,4 +130,6 @@ queue.kafka.confluent.sasl.jaas.config=org.apache.kafka.common.security.plain.Pl
username="io.cs" \ username="io.cs" \
password="=4#4x%pN$Ky2+X5.54ZS+/8WL2Pyu@916--/ycV3.9Bzkq6CZKt!7OZ1uRCPwt65"; password="=4#4x%pN$Ky2+X5.54ZS+/8WL2Pyu@916--/ycV3.9Bzkq6CZKt!7OZ1uRCPwt65";
queue.kafka.confluent.ssl.algorithm= queue.kafka.confluent.ssl.algorithm=
queue.kafka.topics=T_DC_MQ_REALDATA,T_DC_MQ_STATUS queue.kafka.topics=T_DC_MQ_REALDATA,T_DC_MQ_STATUS
\ No newline at end of file queue.kafka.max.poll.records=500
kafka.auto-startup=true
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment