Commit 31b96beb authored by tangwei's avatar tangwei

监听消息

parent 7b3c85d9
......@@ -52,6 +52,13 @@
</exclusion>
</exclusions>
</dependency>
<!--kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-component-emq</artifactId>
......
......@@ -39,7 +39,7 @@ import java.net.InetAddress;
@EnableEurekaClient
@EnableScheduling
@MapperScan(value = { "org.typroject.tyboot.*.*.face.orm.dao", "com.yeejoin.amos.api.*.face.orm.dao", "org.typroject.tyboot.face.*.orm.dao*",
"com.yeejoin.amos.boot.biz.common.dao.mapper" })
"com.yeejoin.amos.api.*.mapper","com.yeejoin.amos.boot.biz.common.dao.mapper" })
@ComponentScan({ "org.typroject", "com.yeejoin.amos" })
public class AlarmApplication {
......
//package com.yeejoin.amos.api.alarm.config;
//
//import org.apache.kafka.clients.admin.AdminClient;
//import org.apache.kafka.clients.admin.AdminClientConfig;
//import org.apache.kafka.clients.admin.NewTopic;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.kafka.core.KafkaAdmin;
//
//import java.util.HashMap;
//import java.util.Map;
//
//import static org.bouncycastle.asn1.pkcs.PKCSObjectIdentifiers.md5;
//
//@Configuration
//public class KafkaInitialConfiguration {
//
//
//
//
// /***
// * 创建top 10个分区1个副本
// * 通过bean创建(bean的名字为initialTopic)
// * @return
// */
//
// @Bean
// public NewTopic initialTopic1() {
//
// return new NewTopic("jf1",3, (short) 1 );
// }
//
//
// @Bean
// public KafkaAdmin kafkaAdmin() {
// Map<String, Object> props = new HashMap<>();
// //配置Kafka实例的连接地址
// props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "121.199.39.218:9092");
// KafkaAdmin admin = new KafkaAdmin(props);
// return admin;
// }
//
// @Bean
// public AdminClient adminClient() {
// return AdminClient.create(kafkaAdmin().getConfig());
// }
//
//
//
//}
//package com.yeejoin.amos.api.alarm.service.impl;
//
//import com.alibaba.fastjson.JSON;
//import org.apache.kafka.clients.admin.NewTopic;
//import org.apache.kafka.clients.producer.ProducerRecord;
//import org.checkerframework.checker.units.qual.K;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.kafka.core.KafkaTemplate;
//import org.springframework.kafka.support.SendResult;
//import org.springframework.scheduling.annotation.Scheduled;
//import org.springframework.stereotype.Service;
//import org.springframework.util.concurrent.ListenableFuture;
//import org.springframework.util.concurrent.ListenableFutureCallback;
//
//import javax.annotation.PostConstruct;
//import javax.annotation.Resource;
//
///**
// * @description:
// * @author: tw
// * @createDate: 2023/6/28
// */
//@Service
//public class producerServers {
//
//
// @Autowired
// private KafkaTemplate<String, String> kafkaTemplate;
//
//@Scheduled(fixedRate = 60000)
// public void send(){
// String gg1="1668801435891929089@18873";
// String gg2="1668801435891929089@18874";
// String gg3="1668801435891929089@18875";
// String gg4="1668801435891929089@18876";
// String gg5="1668801435891929089@18877";
// String gg6="1668801435891929089@18878";
// String gg7="1668801435891929089@18879";
// String gg8="1668801435891929089@18880";
//
//
// String topic="jf1";
//
// ProducerRecord<String, String> producerRecord1 = new ProducerRecord<String, String>( topic, gg1.hashCode()%3, gg1.hashCode()%3+"", gg1+"==============="+gg1.hashCode()%5);
// ProducerRecord<String, String> producerRecord2 = new ProducerRecord<String, String>( topic, gg2.hashCode()%3,gg2.hashCode()%3+"", gg2+"==============="+gg2.hashCode()%5);
// ProducerRecord<String, String> producerRecord3 = new ProducerRecord<String, String>( topic, gg3.hashCode()%3,gg3.hashCode()%3+"", gg3+"==============="+gg3.hashCode()%5);
// ProducerRecord<String, String> producerRecord4 = new ProducerRecord<String, String>( topic, gg4.hashCode()%3,gg4.hashCode()%3+"", gg4+"==============="+gg4.hashCode()%5);
// ProducerRecord<String, String> producerRecord5 = new ProducerRecord<String, String>( topic, gg5.hashCode()%3,gg5.hashCode()%3+"", gg5+"==============="+gg5.hashCode()%5);
// ProducerRecord<String, String> producerRecord6 = new ProducerRecord<String, String>( topic, gg6.hashCode()%3,gg6.hashCode()%3+"", gg6+"==============="+gg6.hashCode()%5);
// ProducerRecord<String, String> producerRecord7 = new ProducerRecord<String, String>( topic, gg7.hashCode()%3,gg7.hashCode()%3+"", gg7+"==============="+gg7.hashCode()%5);
// ProducerRecord<String, String> producerRecord8 = new ProducerRecord<String, String>( topic, gg8.hashCode()%3,gg8.hashCode()%3+"", gg8+"==============="+gg8.hashCode()%5);
//
// System.out.println(gg1.hashCode()%3);
// System.out.println(gg2.hashCode()%3);
// System.out.println(gg3.hashCode()%3);
// System.out.println(gg4.hashCode()%3);
// System.out.println(gg5.hashCode()%3);
// System.out.println(gg6.hashCode()%3);
// System.out.println(gg7.hashCode()%3);
// System.out.println(gg8.hashCode()%3);
//
// kafkaTemplate.send(producerRecord1);
// kafkaTemplate.send(producerRecord2);
// kafkaTemplate.send(producerRecord3);
// kafkaTemplate.send(producerRecord4);
// kafkaTemplate.send(producerRecord5);
// kafkaTemplate.send(producerRecord6);
// kafkaTemplate.send(producerRecord7);
// kafkaTemplate.send(producerRecord8);
//
//
//
//
// }
//
//
//
//
//
//}
......@@ -11,3 +11,51 @@ redis.cache.failure.time=10800
# mybatis-plus
mybatis-plus.mapper-locations=classpath:mapper/*Mapper.xml
#消费者所在组的名称
#消费者 的broker地址
spring.kafka.consumer.bootstrap-servers=121.199.39.218:9092
# 是否自动提交
spring.kafka.consumer.enable-auto-commit=false
#offset的消费位置
spring.kafka.consumer.auto-offset-reset=earliest
# 配置序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#手动提交方式
spring.kafka.listener.ack-mode=manual_immediate
#监听类型
spring.kafka.listener.type=single
# 并发
#spring.kafka.listener.concurrency=5
# 发生错误后,消息重发的次数。
spring.kafka.producer.retries=1
#配置kafak produce的broker地址
spring.kafka.producer.bootstrap-servers=121.199.39.218:9092
#默认批处理大小(以字节为单位)
spring.kafka.producer.batch-size=16384
#生产者可以用来缓冲等待发送到服务器的记录的内存总字节数
spring.kafka.producer.buffer-memory=33554432
# producer配置序列化
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka默认的String序列化器
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.equipment.alarm=EQUIPMENT_ALARM88
#电站对接第三方查询设备kks码
power.station.url=http://139.9.169.123:5024/prod-api/fdgl/process/DataInterface
#电站104采集预警
power.station.warning=104/data/analysis
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment