Commit a1b208c5 authored by litengwei's avatar litengwei

消息组件相关 / 后端服务初始化

parent e017e1ca
package com.yeejoin.amos.message.kafka; package com.yeejoin.amos.message.kafka;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord; import net.sf.json.JSONObject;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper;
import java.util.List; import java.io.UnsupportedEncodingException;
/** /**
* kafka 消费服务 * kafka 消费服务
...@@ -18,13 +21,26 @@ import java.util.List; ...@@ -18,13 +21,26 @@ import java.util.List;
@Service @Service
public class KafkaConsumerService { public class KafkaConsumerService {
@Autowired
protected EmqKeeper emqKeeper;
/** /**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
* @param message 消息 * @param message 消息
*/ */
@KafkaListener(id = "consumerSingle", topics = "#{'${topics}'.split(',')}") @KafkaListener(id = "consumerSingle", topics = "#{'${topics}'.split(',')}")
public void consumerSingle(String message,Acknowledgment ack) { public void consumerSingle(String message,Acknowledgment ack) {
log.info("consumerSingle ====> message: {}", message); JSONObject messageObj = JSONObject.fromObject(message);
String topic = messageObj.getString("topic");
JSONObject data = messageObj.getJSONObject("data");
try {
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes("UTF-8"), 1,false);
} catch (MqttException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
log.info("单条消息 ====> message: {}", message);
ack.acknowledge(); ack.acknowledge();
} }
......
package com.yeejoin.amos.message.kafka; package com.yeejoin.amos.message.kafka;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaHeaders;
...@@ -10,8 +14,15 @@ import org.springframework.messaging.support.MessageBuilder; ...@@ -10,8 +14,15 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.util.concurrent.ListenableFutureCallback;
import org.typroject.tyboot.component.emq.EmqKeeper;
import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
...@@ -31,6 +42,10 @@ public class KafkaProducerService { ...@@ -31,6 +42,10 @@ public class KafkaProducerService {
@Resource @Resource
private KafkaTemplate<String, String> kafkaTemplateWithTransaction; private KafkaTemplate<String, String> kafkaTemplateWithTransaction;
@Autowired
protected EmqKeeper emqKeeper;
/** /**
* 发送消息(同步) * 发送消息(同步)
* @param topic 主题 * @param topic 主题
...@@ -71,12 +86,12 @@ public class KafkaProducerService { ...@@ -71,12 +86,12 @@ public class KafkaProducerService {
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override @Override
public void onFailure(Throwable throwable) { public void onFailure(Throwable throwable) {
log.error("sendMessageAsync failure! topic : {}, message: {}", topic, message); log.error("发送消息(异步) failure! topic : {}, message: {}", topic, message);
} }
@Override @Override
public void onSuccess(SendResult<String, String> stringStringSendResult) { public void onSuccess(SendResult<String, String> stringStringSendResult) {
log.info("sendMessageAsync success! topic: {}, message: {}", topic, message); log.info("发送消息(异步) success! topic: {}, message: {}", topic, message);
} }
}); });
} }
......
...@@ -29,7 +29,7 @@ public class KafkaConsumerConfiguration { ...@@ -29,7 +29,7 @@ public class KafkaConsumerConfiguration {
@Override @Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) { public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
//打印消费异常的消息和异常信息 //打印消费异常的消息和异常信息
log.error("consumer failed! message: {}, exceptionMsg: {}, groupId: {}", message, exception.getMessage(), exception.getGroupId()); log.error("消费异常的消息 failed! message: {}, exceptionMsg: {}, groupId: {}", message, exception.getMessage(), exception.getGroupId());
return null; return null;
} }
}; };
......
...@@ -21,5 +21,6 @@ spring.redis.host=172.16.11.201 ...@@ -21,5 +21,6 @@ spring.redis.host=172.16.11.201
spring.redis.port=6379 spring.redis.port=6379
spring.redis.password=1234560 spring.redis.password=1234560
#需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置
topics=akka.iot.created,akka.patrol.created topics=akka.iot.created,akka.patrol.created
init.topics=akka.iot.created,akka.patrol.created init.topics=akka.iot.created,akka.patrol.created
\ No newline at end of file
...@@ -2,13 +2,11 @@ ...@@ -2,13 +2,11 @@
{ {
"code": "iot", "code": "iot",
"emqTopic": "eqm.iot.created", "emqTopic": "eqm.iot.created",
"akkaTopic": "akka.iot.created", "akkaTopic": "akka.iot.created"
"emqCoverAkkaTopic": "emq.iot.cover.akka"
}, },
{ {
"code": "patrol", "code": "patrol",
"emqTopic": "eqm.patrol.created", "emqTopic": "eqm.patrol.created",
"akkaTopic": "akka.patrol.created", "akkaTopic": "akka.patrol.created"
"emqCoverAkkaTopic": "emq.patrol.cover.akka"
} }
] ]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment