Commit 0b2b7c02 authored by litengwei's avatar litengwei

10850

parent 2e207ebb
...@@ -25,6 +25,7 @@ import com.yeejoin.equipmanage.remote.RemoteSecurityService; ...@@ -25,6 +25,7 @@ import com.yeejoin.equipmanage.remote.RemoteSecurityService;
import com.yeejoin.equipmanage.service.*; import com.yeejoin.equipmanage.service.*;
import com.yeejoin.equipmanage.utils.BeanUtil; import com.yeejoin.equipmanage.utils.BeanUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
...@@ -36,11 +37,14 @@ import org.springframework.transaction.support.TransactionSynchronizationManager ...@@ -36,11 +37,14 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.core.foundation.utils.ValidationUtil; import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -94,6 +98,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -94,6 +98,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@Autowired @Autowired
MarqueeDataMapper marqueeDataMapper; MarqueeDataMapper marqueeDataMapper;
/** /**
* 泡沫罐KEY * 泡沫罐KEY
*/ */
...@@ -217,6 +222,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -217,6 +222,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
private static Boolean bool = Boolean.FALSE; private static Boolean bool = Boolean.FALSE;
@Autowired
protected EmqKeeper emqKeeper;
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void handlerMqttIncrementMessage(String topic, String message) { public void handlerMqttIncrementMessage(String topic, String message) {
...@@ -264,6 +272,18 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -264,6 +272,18 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
return; return;
} }
log.info(String.format("收到mqtt消息:%s", message)); log.info(String.format("收到mqtt消息:%s", message));
// 发送emq消息转kafka
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", topic);
jsonObject.put("data",message);
try {
emqKeeper.getMqttClient().publish("eqm.iot.created",jsonObject.toString().getBytes(),1,false);
} catch (MqttException e) {
e.printStackTrace();
}
if (!StringUtils.isEmpty(traceId)) { if (!StringUtils.isEmpty(traceId)) {
String finalTraceId = traceId; String finalTraceId = traceId;
List<IotDataVO> collect = iotDatalist.stream().map(x -> { List<IotDataVO> collect = iotDatalist.stream().map(x -> {
......
...@@ -2,6 +2,7 @@ package com.yeejoin.amos.message.eqmx; ...@@ -2,6 +2,7 @@ package com.yeejoin.amos.message.eqmx;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.message.kafka.KafkaProducerService; import com.yeejoin.amos.message.kafka.KafkaProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -14,12 +15,14 @@ import org.typroject.tyboot.component.emq.EmqxListener; ...@@ -14,12 +15,14 @@ import org.typroject.tyboot.component.emq.EmqxListener;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import net.sf.json.JSONObject; import net.sf.json.JSONObject;
@Slf4j
@Component @Component
public class EmqMessageService extends EmqxListener { public class EmqMessageService extends EmqxListener {
...@@ -35,11 +38,14 @@ public class EmqMessageService extends EmqxListener { ...@@ -35,11 +38,14 @@ public class EmqMessageService extends EmqxListener {
private List<Map> list; private List<Map> list;
@Value("${emq.topic}")
private String topics;
private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>(); private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>();
@PostConstruct @PostConstruct
void init() throws Exception { void init() {
new Thread(task_runnable).start(); new Thread(task_runnable).start();
String json = null; String json = null;
...@@ -50,11 +56,13 @@ public class EmqMessageService extends EmqxListener { ...@@ -50,11 +56,13 @@ public class EmqMessageService extends EmqxListener {
} }
list = com.alibaba.fastjson.JSONObject.parseArray(json, Map.class); list = com.alibaba.fastjson.JSONObject.parseArray(json, Map.class);
list.forEach(e->{
String[] split = topics.split(",");
Arrays.stream(split).forEach(e-> {
try { try {
emqKeeper.subscript(e.get("emqTopic").toString(), 0, this); emqKeeper.subscript(e, 1, this);
} catch (Exception exception) { } catch (Exception exception) {
exception.printStackTrace(); log.info("订阅eqm消息失败 ====> message: {}", exception.getMessage());
} }
}); });
} }
...@@ -81,7 +89,7 @@ public class EmqMessageService extends EmqxListener { ...@@ -81,7 +89,7 @@ public class EmqMessageService extends EmqxListener {
} }
}); });
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.info("发送kafka消息失败 ====> message: {}", e.getMessage());
} }
} }
} }
......
...@@ -28,7 +28,7 @@ public class KafkaConsumerService { ...@@ -28,7 +28,7 @@ public class KafkaConsumerService {
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
* @param message 消息 * @param message 消息
*/ */
@KafkaListener(id = "consumerSingle", topics = "#{'${topics}'.split(',')}") @KafkaListener(id = "consumerSingle", topics = "#{'${kafka.topics}'.split(',')}")
public void consumerSingle(String message,Acknowledgment ack) { public void consumerSingle(String message,Acknowledgment ack) {
JSONObject messageObj = JSONObject.fromObject(message); JSONObject messageObj = JSONObject.fromObject(message);
String topic = messageObj.getString("topic"); String topic = messageObj.getString("topic");
......
package com.yeejoin.amos.message.kafka; package com.yeejoin.amos.message.kafka;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaHeaders;
...@@ -16,13 +13,8 @@ import org.springframework.util.concurrent.ListenableFuture; ...@@ -16,13 +13,8 @@ import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.util.concurrent.ListenableFutureCallback;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
...@@ -42,8 +34,6 @@ public class KafkaProducerService { ...@@ -42,8 +34,6 @@ public class KafkaProducerService {
@Resource @Resource
private KafkaTemplate<String, String> kafkaTemplateWithTransaction; private KafkaTemplate<String, String> kafkaTemplateWithTransaction;
@Autowired
protected EmqKeeper emqKeeper;
/** /**
......
...@@ -16,7 +16,7 @@ import java.util.Arrays; ...@@ -16,7 +16,7 @@ import java.util.Arrays;
*/ */
@Configuration class KafkaConfig { @Configuration class KafkaConfig {
@Value("${init.topics}") @Value("${kafka.init.topics}")
private String topics; private String topics;
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment