Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
amos-boot-biz
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
项目统一框架
amos-boot-biz
Commits
73ad7afd
Commit
73ad7afd
authored
Nov 01, 2022
by
litengwei
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
消息组件相关 / 后端服务初始化
parent
d2a44297
Hide whitespace changes
Inline
Side-by-side
Showing
13 changed files
with
700 additions
and
0 deletions
+700
-0
pom.xml
amos-boot-utils/amos-boot-utils-message/pom.xml
+66
-0
AmosBootUtilsMessageApplication.java
...ava/com/yeejoin/amos/AmosBootUtilsMessageApplication.java
+53
-0
EmqMessageService.java
...java/com/yeejoin/amos/message/eqmx/EmqMessageService.java
+89
-0
KafkaConsumerService.java
.../com/yeejoin/amos/message/kafka/KafkaConsumerService.java
+77
-0
KafkaProducerService.java
.../com/yeejoin/amos/message/kafka/KafkaProducerService.java
+125
-0
KafkaConfig.java
...va/com/yeejoin/amos/message/kafka/config/KafkaConfig.java
+34
-0
KafkaConsumerConfiguration.java
...amos/message/kafka/config/KafkaConsumerConfiguration.java
+37
-0
application-dev.properties
...ils-message/src/main/resources/application-dev.properties
+26
-0
application.properties
...t-utils-message/src/main/resources/application.properties
+63
-0
topic.json
...mos-boot-utils-message/src/main/resources/json/topic.json
+15
-0
logback-dev.xml
...mos-boot-utils-message/src/main/resources/logback-dev.xml
+48
-0
logback-qa.xml
...amos-boot-utils-message/src/main/resources/logback-qa.xml
+47
-0
AmosBootUtilsMessageApplicationTests.java
...om/yeejoin/amos/AmosBootUtilsMessageApplicationTests.java
+20
-0
No files found.
amos-boot-utils/amos-boot-utils-message/pom.xml
0 → 100644
View file @
73ad7afd
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<parent>
<artifactId>
amos-boot-utils
</artifactId>
<groupId>
com.amosframework.boot
</groupId>
<version>
1.0.0
</version>
</parent>
<artifactId>
amos-boot-utils-message
</artifactId>
<properties>
<tyboot.version>
1.1.23-SNAPSHOT
</tyboot.version>
</properties>
<dependencies>
<dependency>
<groupId>
com.amosframework.boot
</groupId>
<artifactId>
amos-boot-core
</artifactId>
<version>
${amos-biz-boot.version}
</version>
</dependency>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-web
</artifactId>
</dependency>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-test
</artifactId>
<scope>
test
</scope>
</dependency>
<dependency>
<groupId>
net.sf.json-lib
</groupId>
<artifactId>
json-lib
</artifactId>
<version>
2.4
</version>
<classifier>
jdk15
</classifier>
</dependency>
<!--kafka依赖-->
<dependency>
<groupId>
org.springframework.kafka
</groupId>
<artifactId>
spring-kafka
</artifactId>
</dependency>
<dependency>
<groupId>
org.typroject
</groupId>
<artifactId>
tyboot-component-emq
</artifactId>
<version>
${tyboot.version}
</version>
<exclusions>
<exclusion>
<groupId>
org.typroject
</groupId>
<artifactId>
*
</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-maven-plugin
</artifactId>
</plugin>
</plugins>
</build>
</project>
amos-boot-utils/amos-boot-utils-message/src/main/java/com/yeejoin/amos/AmosBootUtilsMessageApplication.java
0 → 100644
View file @
73ad7afd
package
com
.
yeejoin
.
amos
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.boot.SpringApplication
;
import
org.springframework.boot.autoconfigure.SpringBootApplication
;
import
org.springframework.boot.context.properties.EnableConfigurationProperties
;
import
org.springframework.boot.web.servlet.ServletComponentScan
;
import
org.springframework.cloud.client.discovery.EnableDiscoveryClient
;
import
org.springframework.cloud.netflix.eureka.EnableEurekaClient
;
import
org.springframework.cloud.openfeign.EnableFeignClients
;
import
org.springframework.context.ConfigurableApplicationContext
;
import
org.springframework.context.annotation.ComponentScan
;
import
org.springframework.scheduling.annotation.EnableAsync
;
import
org.springframework.core.env.Environment
;
import
java.net.InetAddress
;
import
java.net.UnknownHostException
;
/**
* <pre>
* 服务启动类
* </pre>
*
* @author amos
* @version $Id: YeeAMOSPatrolStart.java, v 0.1 2018年11月26日 下午4:56:29 amos Exp $
*/
@SpringBootApplication
@EnableConfigurationProperties
@ServletComponentScan
@EnableDiscoveryClient
@EnableFeignClients
@EnableAsync
@EnableEurekaClient
@ComponentScan
({
"org.typroject"
,
"com.yeejoin.amos"
})
public
class
AmosBootUtilsMessageApplication
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
AmosBootUtilsMessageApplication
.
class
);
public
static
void
main
(
String
[]
args
)
throws
UnknownHostException
{
ConfigurableApplicationContext
context
=
SpringApplication
.
run
(
AmosBootUtilsMessageApplication
.
class
,
args
);
Environment
env
=
context
.
getEnvironment
();
String
ip
=
InetAddress
.
getLocalHost
().
getHostAddress
();
String
port
=
env
.
getProperty
(
"server.port"
);
String
path
=
env
.
getProperty
(
"server.servlet.context-path"
);
logger
.
info
(
"\n----------------------------------------------------------\n\t"
+
"Application Amos-Biz-Boot is running! Access URLs:\n\t"
+
"Swagger文档: \thttp://"
+
ip
+
":"
+
port
+
path
+
"/doc.html\n"
+
"----------------------------------------------------------"
);
}
}
amos-boot-utils/amos-boot-utils-message/src/main/java/com/yeejoin/amos/message/eqmx/EmqMessageService.java
0 → 100644
View file @
73ad7afd
package
com
.
yeejoin
.
amos
.
message
.
eqmx
;
import
com.alibaba.fastjson.JSON
;
import
com.yeejoin.amos.message.kafka.KafkaProducerService
;
import
org.apache.commons.io.IOUtils
;
import
org.eclipse.paho.client.mqttv3.MqttMessage
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.core.io.Resource
;
import
org.springframework.stereotype.Component
;
import
org.typroject.tyboot.component.emq.EmqKeeper
;
import
org.typroject.tyboot.component.emq.EmqxListener
;
import
javax.annotation.PostConstruct
;
import
java.io.IOException
;
import
java.nio.charset.StandardCharsets
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.LinkedBlockingQueue
;
import
net.sf.json.JSONObject
;
@Component
public
class
EmqMessageService
extends
EmqxListener
{
@Autowired
protected
EmqKeeper
emqKeeper
;
@Autowired
protected
KafkaProducerService
kafkaProducerService
;
@Value
(
"classpath:/json/topic.json"
)
private
Resource
topic
;
private
List
<
Map
>
list
;
private
static
final
BlockingQueue
<
JSONObject
>
blockingQueue
=
new
LinkedBlockingQueue
<>();
@PostConstruct
void
init
()
throws
Exception
{
new
Thread
(
task_runnable
).
start
();
String
json
=
null
;
try
{
json
=
IOUtils
.
toString
(
topic
.
getInputStream
(),
java
.
lang
.
String
.
valueOf
(
StandardCharsets
.
UTF_8
));
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
}
list
=
com
.
alibaba
.
fastjson
.
JSONObject
.
parseArray
(
json
,
Map
.
class
);
list
.
forEach
(
e
->{
try
{
emqKeeper
.
subscript
(
e
.
get
(
"emqTopic"
).
toString
(),
0
,
this
);
}
catch
(
Exception
exception
)
{
exception
.
printStackTrace
();
}
});
}
@Override
public
void
processMessage
(
String
topic
,
MqttMessage
message
)
throws
Exception
{
JSONObject
result
=
JSONObject
.
fromObject
(
new
String
(
message
.
getPayload
()));
JSONObject
messageResult
=
new
JSONObject
();
messageResult
.
put
(
"result"
,
result
);
messageResult
.
put
(
"topic"
,
topic
);
blockingQueue
.
add
(
messageResult
);
}
Runnable
task_runnable
=
new
Runnable
()
{
public
void
run
()
{
while
(
true
)
{
try
{
JSONObject
messageResult
=
blockingQueue
.
take
();
JSONObject
result
=
messageResult
.
getJSONObject
(
"result"
);
// 处理逻辑
list
.
forEach
(
e
->{
if
(
e
.
get
(
"emqTopic"
).
toString
().
equals
(
messageResult
.
getString
(
"topic"
)))
{
kafkaProducerService
.
sendMessageAsync
(
e
.
get
(
"akkaTopic"
).
toString
(),
JSON
.
toJSONString
(
result
));
}
});
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
}
};
}
amos-boot-utils/amos-boot-utils-message/src/main/java/com/yeejoin/amos/message/kafka/KafkaConsumerService.java
0 → 100644
View file @
73ad7afd
package
com
.
yeejoin
.
amos
.
message
.
kafka
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.kafka.clients.consumer.ConsumerRecord
;
import
org.springframework.kafka.annotation.KafkaListener
;
import
org.springframework.kafka.support.Acknowledgment
;
import
org.springframework.stereotype.Service
;
import
java.util.List
;
/**
* kafka 消费服务
*
* @author litw
* @create 2022/11/1 10:06
**/
@Slf4j
@Service
public
class
KafkaConsumerService
{
/**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
* @param message 消息
*/
@KafkaListener
(
id
=
"consumerSingle"
,
topics
=
"#{'${topics}'.split(',')}"
)
public
void
consumerSingle
(
String
message
,
Acknowledgment
ack
)
{
log
.
info
(
"consumerSingle ====> message: {}"
,
message
);
ack
.
acknowledge
();
}
/* @KafkaListener(id = "consumerBatch", topicPartitions = {
@TopicPartition(topic = "hello-batch1", partitions = "0"),
@TopicPartition(topic = "hello-batch2", partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "4"))
})*/
// /**
// * 批量消费消息
// * @param messages
// */
// @KafkaListener(id = "consumerBatch", topics = "test-batch")
// public void consumerBatch(List<ConsumerRecord<String, String>> messages) {
// log.info("consumerBatch =====> messageSize: {}", messages.size());
// log.info(messages.toString());
// }
// /**
// * 指定消费异常处理器
// * @param message
// */
// @KafkaListener(id = "consumerException", topics = "kafka-test-topic", errorHandler = "consumerAwareListenerErrorHandler")
// public void consumerException(String message) {
// throw new RuntimeException("consumer exception");
// }
//
// /**
// * 验证ConsumerInterceptor
// * @param message
// */
// @KafkaListener(id = "interceptor", topics = "consumer-interceptor")
// public void consumerInterceptor(String message) {
// log.info("consumerInterceptor ====> message: {}", message);
// }
//
//
//
// //kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup"
// @KafkaListener(topics = "test", groupId = "zhTestGroup")
// public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
// String value = record.value();
// System.out.println(value);
// System.out.println(record);
// //手动提交offset
// ack.acknowledge();
// }
}
amos-boot-utils/amos-boot-utils-message/src/main/java/com/yeejoin/amos/message/kafka/KafkaProducerService.java
0 → 100644
View file @
73ad7afd
package
com
.
yeejoin
.
amos
.
message
.
kafka
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.kafka.core.KafkaOperations
;
import
org.springframework.kafka.core.KafkaTemplate
;
import
org.springframework.kafka.support.KafkaHeaders
;
import
org.springframework.kafka.support.SendResult
;
import
org.springframework.messaging.Message
;
import
org.springframework.messaging.support.MessageBuilder
;
import
org.springframework.stereotype.Service
;
import
org.springframework.util.concurrent.ListenableFuture
;
import
org.springframework.util.concurrent.ListenableFutureCallback
;
import
javax.annotation.Resource
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
/**
* kafka 生产服务
*
* @author litw
* @create 2022/11/1 10:06
**/
@Slf4j
@Service
public
class
KafkaProducerService
{
@Resource
private
KafkaTemplate
<
String
,
String
>
kafkaTemplate
;
@Resource
private
KafkaTemplate
<
String
,
String
>
kafkaTemplateWithTransaction
;
/**
* 发送消息(同步)
* @param topic 主题
* @param key 键
* @param message 值
*/
public
void
sendMessageSync
(
String
topic
,
String
key
,
String
message
)
throws
InterruptedException
,
ExecutionException
,
TimeoutException
{
//可以指定最长等待时间,也可以不指定
kafkaTemplate
.
send
(
topic
,
message
).
get
(
10
,
TimeUnit
.
SECONDS
);
log
.
info
(
"sendMessageSync => topic: {}, key: {}, message: {}"
,
topic
,
key
,
message
);
//指定key,kafka根据key进行hash,决定存入哪个partition
// kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
//存入指定partition
// kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS);
}
/**
* 发送消息并获取结果
* @param topic
* @param message
* @throws ExecutionException
* @throws InterruptedException
*/
public
void
sendMessageGetResult
(
String
topic
,
String
key
,
String
message
)
throws
ExecutionException
,
InterruptedException
{
SendResult
<
String
,
String
>
result
=
kafkaTemplate
.
send
(
topic
,
message
).
get
();
log
.
info
(
"sendMessageSync => topic: {}, key: {}, message: {}"
,
topic
,
key
,
message
);
log
.
info
(
"The partition the message was sent to: "
+
result
.
getRecordMetadata
().
partition
());
}
/**
* 发送消息(异步)
* @param topic 主题
* @param message 消息内容
*/
public
void
sendMessageAsync
(
String
topic
,
String
message
)
{
ListenableFuture
<
SendResult
<
String
,
String
>>
future
=
kafkaTemplate
.
send
(
topic
,
message
);
//添加回调
future
.
addCallback
(
new
ListenableFutureCallback
<
SendResult
<
String
,
String
>>()
{
@Override
public
void
onFailure
(
Throwable
throwable
)
{
log
.
error
(
"sendMessageAsync failure! topic : {}, message: {}"
,
topic
,
message
);
}
@Override
public
void
onSuccess
(
SendResult
<
String
,
String
>
stringStringSendResult
)
{
log
.
info
(
"sendMessageAsync success! topic: {}, message: {}"
,
topic
,
message
);
}
});
}
/**
* 可以将消息组装成 Message 对象和 ProducerRecord 对象发送
* @param topic
* @param key
* @param message
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
public
void
testMessageBuilder
(
String
topic
,
String
key
,
String
message
)
throws
InterruptedException
,
ExecutionException
,
TimeoutException
{
// 组装消息
Message
msg
=
MessageBuilder
.
withPayload
(
message
)
.
setHeader
(
KafkaHeaders
.
MESSAGE_KEY
,
key
)
.
setHeader
(
KafkaHeaders
.
TOPIC
,
topic
)
.
setHeader
(
KafkaHeaders
.
PREFIX
,
"kafka_"
)
.
build
();
//同步发送
kafkaTemplate
.
send
(
msg
).
get
();
// 组装消息
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
// kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
}
/**
* 以事务方式发送消息
* @param topic
* @param key
* @param message
*/
public
void
sendMessageInTransaction
(
String
topic
,
String
key
,
String
message
)
{
kafkaTemplateWithTransaction
.
executeInTransaction
(
new
KafkaOperations
.
OperationsCallback
<
String
,
String
,
Object
>()
{
@Override
public
Object
doInOperations
(
KafkaOperations
<
String
,
String
>
kafkaOperations
)
{
kafkaOperations
.
send
(
topic
,
key
,
message
);
//出现异常将会中断事务,消息不会发送出去
throw
new
RuntimeException
(
"12"
);
}
});
}
}
amos-boot-utils/amos-boot-utils-message/src/main/java/com/yeejoin/amos/message/kafka/config/KafkaConfig.java
0 → 100644
View file @
73ad7afd
package
com
.
yeejoin
.
amos
.
message
.
kafka
.
config
;
import
org.apache.kafka.clients.admin.NewTopic
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
java.util.Arrays
;
/**
* topic初始化
*
* @author litw
* @create 2022/11/1 10:06
*/
@Configuration
class
KafkaConfig
{
@Value
(
"${init.topics}"
)
private
String
topics
;
/**
* 创建一个名为topic.test的Topic并设置分区数为8,分区副本数为2
*/
@Bean
public
void
initialTopic
()
{
String
[]
split
=
topics
.
split
(
","
);
Arrays
.
stream
(
split
).
forEach
(
e
->{
new
NewTopic
(
e
,
8
,
(
short
)
2
);
});
}
}
\ No newline at end of file
amos-boot-utils/amos-boot-utils-message/src/main/java/com/yeejoin/amos/message/kafka/config/KafkaConsumerConfiguration.java
0 → 100644
View file @
73ad7afd
package
com
.
yeejoin
.
amos
.
message
.
kafka
.
config
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.kafka.clients.consumer.Consumer
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler
;
import
org.springframework.kafka.listener.ListenerExecutionFailedException
;
import
org.springframework.messaging.Message
;
/**
* kafka 消费者配置类
*
* @author Leo
* @create 2020/12/31 15:09
**/
@Slf4j
@Configuration
public
class
KafkaConsumerConfiguration
{
/**
* 消费异常处理器
* @return
*/
@Bean
public
ConsumerAwareListenerErrorHandler
consumerAwareListenerErrorHandler
()
{
return
new
ConsumerAwareListenerErrorHandler
()
{
@Override
public
Object
handleError
(
Message
<?>
message
,
ListenerExecutionFailedException
exception
,
Consumer
<?,
?>
consumer
)
{
//打印消费异常的消息和异常信息
log
.
error
(
"consumer failed! message: {}, exceptionMsg: {}, groupId: {}"
,
message
,
exception
.
getMessage
(),
exception
.
getGroupId
());
return
null
;
}
};
}
}
amos-boot-utils/amos-boot-utils-message/src/main/resources/application-dev.properties
0 → 100644
View file @
73ad7afd
#注册中心地址
eureka.client.service-url.defaultZone
=
http://172.16.11.201:10001/eureka/
eureka.instance.prefer-ip-address
=
true
management.endpoint.health.show-details
=
always
management.endpoints.web.exposure.include
=
*
eureka.instance.health-check-url-path
=
/actuator/health
eureka.instance.metadata-map.management.context-path
=
${server.servlet.context-path}/actuator
eureka.instance.status-page-url-path
=
/actuator/info
eureka.instance.metadata-map.management.api-docs
=
http://localhost:${server.port}${server.servlet.context-path}/swagger-ui.html
## emqx
emqx.clean-session
=
true
emqx.client-id
=
${spring.application.name}-${random.int[1024,65536]}
emqx.broker
=
tcp://172.16.11.201:1883
emqx.client-user-name
=
admin
emqx.client-password
=
public
emqx.max-inflight
=
1000
spring.redis.database
=
1
spring.redis.host
=
172.16.11.201
spring.redis.port
=
6379
spring.redis.password
=
1234560
topics
=
akka.iot.created,akka.patrol.created
init.topics
=
akka.iot.created,akka.patrol.created
\ No newline at end of file
amos-boot-utils/amos-boot-utils-message/src/main/resources/application.properties
0 → 100644
View file @
73ad7afd
spring.application.name
=
AMOS-MESSAGE
server.servlet.context-path
=
/message
server.port
=
8119
spring.profiles.active
=
dev
spring.jackson.time-zone
=
GMT+8
spring.jackson.date-format
=
yyyy-MM-dd HH:mm:ss
spring.jackson.serialization.write-dates-as-timestamps
=
true
# kafka集群信息
spring.kafka.bootstrap-servers
=
127.0.0.1:9092
# 生产者配置
# 设置大于0的值,则客户端会将发送失败的记录重新发送 # 重试次数
spring.kafka.producer.retries
=
3
#16K
spring.kafka.producer.batch-size
=
16384
spring.kafka.producer.buffer-memory
=
33554432
# 应答级别
# acks=0 把消息发送到kafka就认为发送成功
# acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
# acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
spring.kafka.producer.acks
=
1
# 指定消息key和消息体的编解码方式
# # 批量处理的最大大小 单位 byte
# batch-size: 4096
# # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# buffer-memory: 33554432
# # 客户端ID
# client-id: hello-kafka
# # 消息压缩:none、lz4、gzip、snappy,默认为 none。
# compression-type: gzip
spring.kafka.producer.key-serializer
=
org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer
=
org.apache.kafka.common.serialization.StringSerializer
# 消费者组
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset
# latest:重置为分区中最新的offset(消费分区中新产生的数据)
# none:只要有一个分区不存在已提交的offset,就抛出异常
spring.kafka.consumer.group-id
=
zhTestGroup
spring.kafka.consumer.enable-auto-commit
=
false
# 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# # 自动提交的频率 单位 ms
# auto-commit-interval: 1000
# # 批量消费最大数量
# max-poll-records: 100
spring.kafka.consumer.auto-offset-reset
=
earliest
spring.kafka.consumer.key-deserializer
=
org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer
=
org.apache.kafka.common.serialization.StringDeserializer
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
spring.kafka.listener.ack-mode
=
manual_immediate
amos-boot-utils/amos-boot-utils-message/src/main/resources/json/topic.json
0 → 100644
View file @
73ad7afd
[
{
"code"
:
"iot"
,
"emqTopic"
:
"eqm.iot.created"
,
"akkaTopic"
:
"akka.iot.created"
,
"emqCoverAkkaTopic"
:
"emq.iot.cover.akka"
},
{
"code"
:
"patrol"
,
"emqTopic"
:
"eqm.patrol.created"
,
"akkaTopic"
:
"akka.patrol.created"
,
"emqCoverAkkaTopic"
:
"emq.patrol.cover.akka"
}
]
\ No newline at end of file
amos-boot-utils/amos-boot-utils-message/src/main/resources/logback-dev.xml
0 → 100644
View file @
73ad7afd
<?xml version="1.0" encoding="UTF-8"?>
<configuration
debug=
"false"
>
<!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径-->
<property
name=
"LOG_HOME"
value=
"log"
/>
<!-- 按照每天生成日志文件 -->
<appender
name=
"FILE"
class=
"ch.qos.logback.core.rolling.RollingFileAppender"
>
<rollingPolicy
class=
"ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"
>
<!--日志文件输出的文件名-->
<FileNamePattern>
${LOG_HOME}/jpush.log.%d{yyyy-MM-dd}.%i.log
</FileNamePattern>
<!--日志文件保留天数-->
<MaxHistory>
30
</MaxHistory>
<!--日志文件大小-->
<MaxFileSize>
30mb
</MaxFileSize>
</rollingPolicy>
<encoder
class=
"ch.qos.logback.classic.encoder.PatternLayoutEncoder"
>
<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
<pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
</encoder>
</appender>
<!-- 控制台输出 -->
<appender
name=
"STDOUT"
class=
"ch.qos.logback.core.ConsoleAppender"
>
<encoder
class=
"ch.qos.logback.classic.encoder.PatternLayoutEncoder"
>
<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
<pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
</encoder>
</appender>
<!-- show parameters for hibernate sql 专为 Hibernate 定制
<logger name="org.hibernate.type.descriptor.sql.BasicBinder" level="TRACE" />
<logger name="org.hibernate.type.descriptor.sql.BasicExtractor" level="DEBUG" />
<logger name="org.hibernate.SQL" level="DEBUG" />
<logger name="org.hibernate.engine.QueryParameters" level="DEBUG" />
<logger name="org.hibernate.engine.query.HQLQueryPlan" level="DEBUG" />
-->
<!--myibatis log configure-->
<logger
name=
"com.apache.ibatis"
level=
"DEBUG"
/>
<logger
name=
"org.mybatis"
level=
"DEBUG"
/>
<logger
name=
"java.sql.Connection"
level=
"DEBUG"
/>
<logger
name=
"java.sql.Statement"
level=
"DEBUG"
/>
<logger
name=
"java.sql.PreparedStatement"
level=
"DEBUG"
/>
<logger
name=
"org.springframework"
level=
"DEBUG"
/>
<!-- 日志输出级别 -->
<root
level=
"DEBUG"
>
<appender-ref
ref=
"FILE"
/>
<appender-ref
ref=
"STDOUT"
/>
</root>
</configuration>
\ No newline at end of file
amos-boot-utils/amos-boot-utils-message/src/main/resources/logback-qa.xml
0 → 100644
View file @
73ad7afd
<?xml version="1.0" encoding="UTF-8"?>
<configuration
debug=
"false"
>
<!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径-->
<property
name=
"LOG_HOME"
value=
"log"
/>
<!-- 按照每天生成日志文件 -->
<appender
name=
"FILE"
class=
"ch.qos.logback.core.rolling.RollingFileAppender"
>
<rollingPolicy
class=
"ch.qos.logback.core.rolling.TimeBasedRollingPolicy"
>
<!--日志文件输出的文件名-->
<FileNamePattern>
${LOG_HOME}/jpush.%d{yyyy-MM-dd}.%i.log
</FileNamePattern>
<!--日志文件保留天数-->
<MaxHistory>
30
</MaxHistory>
<!--按大小分割同一天的-->
<timeBasedFileNamingAndTriggeringPolicy
class=
"ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"
>
<maxFileSize>
100MB
</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<encoder
class=
"ch.qos.logback.classic.encoder.PatternLayoutEncoder"
>
<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
<pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
</encoder>
</appender>
<!-- show parameters for hibernate sql 专为 Hibernate 定制
<logger name="org.hibernate.type.descriptor.sql.BasicBinder" level="TRACE" />
<logger name="org.hibernate.type.descriptor.sql.BasicExtractor" level="DEBUG" />
<logger name="org.hibernate.SQL" level="DEBUG" />
<logger name="org.hibernate.engine.QueryParameters" level="DEBUG" />
<logger name="org.hibernate.engine.query.HQLQueryPlan" level="DEBUG" />
-->
<!--myibatis log configure-->
<logger
name=
"com.apache.ibatis"
level=
"INFO"
/>
<logger
name=
"org.mybatis"
level=
"INFO"
/>
<logger
name=
"java.sql.Connection"
level=
"INFO"
/>
<logger
name=
"java.sql.Statement"
level=
"INFO"
/>
<logger
name=
"java.sql.PreparedStatement"
level=
"INFO"
/>
<logger
name=
"com.baomidou.mybatisplus"
level=
"INFO"
/>
<logger
name=
"org.typroject"
level=
"INFO"
/>
<logger
name=
"com.yeejoin.amos"
level=
"INFO"
/>
<logger
name=
"org.springframework"
level=
"INFO"
/>
<!-- 日志输出级别 -->
<root
level=
"INFO"
>
<appender-ref
ref=
"FILE"
/>
</root>
</configuration>
\ No newline at end of file
amos-boot-utils/amos-boot-utils-message/src/test/java/com/yeejoin/amos/AmosBootUtilsMessageApplicationTests.java
0 → 100644
View file @
73ad7afd
package
com
.
yeejoin
.
amos
;
import
com.yeejoin.amos.message.kafka.KafkaProducerService
;
import
org.junit.jupiter.api.Test
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.test.context.SpringBootTest
;
@SpringBootTest
class
AmosBootUtilsMessageApplicationTests
{
@Autowired
private
KafkaProducerService
kafkaProducerService
;
@Test
void
contextLoads
()
{
String
msg
=
"hello"
;
kafkaProducerService
.
sendMessageAsync
(
"akka.iot.created"
,
msg
);
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment