Commit 84aeef98 authored by KeYong's avatar KeYong

Merge remote-tracking branch 'origin/develop' into develop

parents 84aaeb9b 9b3d6f8b
package com.boot.bus.sqlsync.emqx;
import org.springframework.stereotype.Component;
/**
* @ProjectName: YeeFireDataProcessRoot
* @Package: com.yeejoin.dataprocess.common.emqtt
* @ClassName: EmqttPredicate
* @Author: Jianqiang Gao
* @Description: EmqttPredicate
* @Date: 2021/3/23 15:57
* @Version: 1.0
*/
@Component
public class EmqttPredicate {
public Boolean test(MqttEvent event) {
//测试内容
return Boolean.FALSE;
}
}
\ No newline at end of file
package com.boot.bus.sqlsync.emqx;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
* @ProjectName: YeeFireDataProcessRoot
* @Package: com.yeejoin.dataprocess.common.emqtt
* @ClassName: MqttEvent
* @Author: Jianqiang Gao
* @Description: MqttEvent
* @Date: 2021/3/23 15:56
* @Version: 1.0
*/
@Getter
public class MqttEvent extends ApplicationEvent {
/**
*
*/
private String topic;
/**
* 发送的消息
*/
private String message;
public MqttEvent(Object source, String topic, String message) {
super(source);
this.topic = topic;
this.message = message;
}
}
\ No newline at end of file
...@@ -23,21 +23,22 @@ import org.springframework.messaging.MessageHandler; ...@@ -23,21 +23,22 @@ import org.springframework.messaging.MessageHandler;
*/ */
@Configuration @Configuration
@IntegrationComponentScan @IntegrationComponentScan
public class MqttConfiguration { public class MqttProduceConfig {
@Value("${spring.mqtt.username}")
@Value("${emqx.user-name}")
private String userName; private String userName;
@Value("${spring.mqtt.password}") @Value("${emqx.password}")
private String password; private String password;
@Value("${spring.mqtt.host-url}") @Value("${emqx.broker}")
private String hostUrl; private String hostUrl;
@Value("${spring.mqtt.client-id}") @Value("${mqtt.client.product.id}")
private String clientId; private String clientId;
@Value("${spring.mqtt.default-topic}") @Value("${mqtt.topic}")
private String defaultTopic; private String defaultTopic;
...@@ -49,7 +50,6 @@ public class MqttConfiguration { ...@@ -49,7 +50,6 @@ public class MqttConfiguration {
mqttConnectOptions.setServerURIs(new String[]{hostUrl}); mqttConnectOptions.setServerURIs(new String[]{hostUrl});
mqttConnectOptions.setKeepAliveInterval(2); mqttConnectOptions.setKeepAliveInterval(2);
mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setConnectionTimeout(0);
return mqttConnectOptions; return mqttConnectOptions;
} }
......
package com.boot.bus.sqlsync.emqx;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @ProjectName: YeeFireDataProcessRoot
* @Package: com.yeejoin.dataprocess.common.dto
* @ClassName: MqttProperties
* @Author: Jianqiang Gao
* @Description: MqttProperties
* @Date: 2021/3/23 15:54
* @Version: 1.0
*/
@ConfigurationProperties("spring.mqtt")
@Component
@Data
public class MqttProperties {
private String username;
private String password;
private String hostUrl;
private String clientId;
private String defaultTopic;
private String completionTimeout;
private Integer keepAlive;
}
\ No newline at end of file
package com.boot.bus.sqlsync.emqx;
import com.boot.bus.sqlsync.service.impl.SyncMqttMessageService;
import com.boot.bus.sqlsync.service.infc.MqttReceiveService;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* @author keyong
* @title: EquipmentIotMQConsumer
* <pre>
* @description: MQTT订阅模式消费类
* </pre>
* @date 2020/10/29 19:23
*/
@Configuration
@IntegrationComponentScan
public class MqttReceiveConfig {
@Value("${emqx.user-name}")
private String userName;
@Value("${emqx.password}")
private String password;
@Value("${emqx.broker}")
private String hostUrl;
@Value("${emqx.client-id}")
private String clientId;
@Value("${mqtt.topic}")
private String defaultTopic;
@Value("${spring.mqtt.completionTimeout}")
private int completionTimeout;
@Autowired
MqttReceiveService mqttReceiveService;
@Autowired
private SyncMqttMessageService syncMqttMessageService;
// 全局变量adapter
public MqttPahoMessageDrivenChannelAdapter adapter;
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(userName);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
mqttConnectOptions.setKeepAliveInterval(20);
mqttConnectOptions.setAutomaticReconnect(true);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttPahoClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
// 接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
//配置client,监听的topic
@Bean
public MessageProducer inbound() {
adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttPahoClientFactory(), defaultTopic.split(","));
adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(0);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String msg = message.getPayload().toString();
syncMqttMessageService.syncData(topic, msg);
};
}
public void setMqttReceiveService(MqttReceiveService mqttReceiveService) {
this.mqttReceiveService = mqttReceiveService;
}
}
package com.boot.bus.sqlsync.emqx;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @ProjectName: YeeFireDataProcessRoot
* @Package: com.yeejoin.dataprocess.common.emqtt
* @ClassName: MqttServer
* @Author: Jianqiang Gao
* @Description: MqttServer
* @Date: 2021/3/23 15:58
* @Version: 1.0
*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttServer {
/**
* 通道发送消息
* @param topic 主题
* @param data 数据
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
}
\ No newline at end of file
package com.boot.bus.sqlsync.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.boot.bus.sqlsync.async.executor.Async;
import com.boot.bus.sqlsync.async.parallel.ParWorker;
import com.boot.bus.sqlsync.async.wrapper.WorkerWrapper;
import com.boot.bus.sqlsync.service.infc.IContingencyPlanInstance;
import com.yeejoin.amos.fas.common.enums.DataSyncOperationEnum;
import com.yeejoin.amos.fas.common.enums.DataSyncTypeEnum;
import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import com.yeejoin.amos.fas.datasync.DataSyncMessage;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* <h1></h1>
*
* @Author SingleTian
* @Date 2021-04-02 09:22
*/
@Slf4j
@Component
public class SyncMqttMessageListener implements IMqttMessageListener {
final Deque<String> redisKeysQueue1 = new LinkedBlockingDeque<>();
final Deque<String> redisKeysQueue2 = new LinkedBlockingDeque<>();
final AtomicBoolean flag = new AtomicBoolean(true);
@Autowired
private IContingencyPlanInstance contingencyPlanInstance;
@Autowired
private RedisTemplate redisTemplate;
private Queue<String> getUsedQueue() {
return flag.get() ? redisKeysQueue1 : redisKeysQueue2;
}
private Queue<String> getUnusedQueue() {
return flag.get() ? redisKeysQueue2 : redisKeysQueue1;
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
DataSyncMessage message = null;
try {
message = DataSyncMessage.bytes2Message(mqttMessage.getPayload());
System.out.println(JSON.toJSONString(message));
log.info("topic:{},message:{}", topic, message);
ParWorker w = new ParWorker();
WorkerWrapper<DataSyncMessage, String> workerWrapper = new WorkerWrapper.Builder<DataSyncMessage, String>()
.worker(w)
.callback(w)
.param(message)
.build();
Async.beginWork(200, workerWrapper);
// syncData(message);
} catch (Exception e) {
log.error("同步mqtt推送数据出错:topic:{},message:{},错误信息:{}", topic, message, e.toString());
} finally {
assert message != null : "";
// 待删除的redis的key值放入队列
getUsedQueue().offer(message.redisKey());
}
}
/**
* 尝试删除redis中mqtt已经发送成功的同步数据
* 每10秒轮循一次
*/
@Scheduled(fixedRate = 10 * 1000)
synchronized private void cleanRedisKeysFinished() {
Queue<String> usedQueue = getUsedQueue();
Queue<String> unusedQueue = getUnusedQueue();
flag.set(!flag.get());
while (!usedQueue.isEmpty()) {
String key = usedQueue.poll();
if (!redisTemplate.delete(key)) {
unusedQueue.offer(key);
}
}
}
/**
* 处理redis中存在但mqtt未接收到的同步数据
* 每小时执行一次,整点触发
*/
@Scheduled(cron = "0 0 * * * ?")
synchronized private void syncByRedis() {
Set<String> keys = new HashSet<>();
for (DataSyncTypeEnum typeEnum : DataSyncTypeEnum.values()) {
keys.addAll(redisTemplate.keys(typeEnum.toString()));
}
if (keys.isEmpty()) {
return;
}
// 为减少redis
new Timer().schedule(new TimerTask() {
@Override
public void run() {
Queue<String> usedQueue = new LinkedBlockingDeque<>(getUsedQueue());
keys.forEach(key -> {
if (!usedQueue.contains(key)) {
String messageStr = String.valueOf(redisTemplate.opsForValue().get(key));
DataSyncMessage message = null;
try {
message = JSON.parseObject(messageStr, DataSyncMessage.class);
syncData(message);
} catch (Exception e) {
log.error("同步redis推送数据出错:key:{},message:{},错误信息:{}", key, message, e.getMessage());
} finally {
getUsedQueue().offer(message.redisKey());
}
}
});
}
}, 1000 * 30);
}
/**
* @param message
*/
private void syncData(DataSyncMessage message) {
DataSyncTypeEnum type = message.getType();
DataSyncOperationEnum operation = message.getOperation();
List<Serializable> data = message.getData();
assert message.getType() != null && message.getOperation() != null : "同步消息体信息不足";
if (data == null || data.isEmpty()) {
return;
}
switch (type) {
case CONTINGENCY_PLAN_INSTANCE: {
switch (operation) {
case DELETE: {
contingencyPlanInstance.astDeleteByIds(data.stream().map(x -> ((JSONObject) x).toJavaObject(ContingencyPlanInstance.class).getId()).collect(Collectors.toList()));
break;
}
default: {
contingencyPlanInstance.astSaveOrUpdateBatch(data.stream().map(x -> ((JSONObject) x).toJavaObject(ContingencyPlanInstance.class)).collect(Collectors.toList()));
}
}
break;
}
default:
}
}
}
package com.boot.bus.sqlsync.service.impl;
import com.boot.bus.sqlsync.service.infc.MqttReceiveService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* @author keyong
* @title: MqttReceiveServiceImpl
*
* <pre>
* &#64;description: 增量数据处理
* </pre>
* @date 2020/11/3 13:39
*/
@Slf4j
@Service
public class MqttReceiveServiceImpl implements MqttReceiveService {
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerMqttIncrementMessage(String topic, String message) {
log.info(String.format("收到mqtt消息:%s", message));
}
}
...@@ -148,4 +148,7 @@ public class SyncMqttMessageService { ...@@ -148,4 +148,7 @@ public class SyncMqttMessageService {
// Async.shutDown(); // Async.shutDown();
} }
} }
public void syncData(String topic, String msg) {
}
} }
\ No newline at end of file
package com.boot.bus.sqlsync.service.infc;
/**
* @author keyong
* @title: MqttReceiveService
* <pre>
* @description: TODO
* </pre>
* @date 2020/11/2 16:59
*/
public interface MqttReceiveService {
/**
* 增量数据处理
* @param topic 主题
* @param message 消息内容
*/
void handlerMqttIncrementMessage(String topic, String message);
}
package com.boot.bus.sqlsync; package com.boot.bus.sqlsync;
import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
...@@ -7,7 +8,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; ...@@ -7,7 +8,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient; import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
...@@ -28,8 +28,8 @@ import java.net.UnknownHostException; ...@@ -28,8 +28,8 @@ import java.net.UnknownHostException;
@EnableAsync @EnableAsync
@EnableScheduling @EnableScheduling
@EnableEurekaClient @EnableEurekaClient
//@ComponentScan(value = {"com.boot.bus.sqlsync", "com.boot.bus.sqlsync.emqx", "com.boot.bus.sqlsync.listener"}) @MapperScan({"com.boot.bus.sqlsync.mapper"})
@ComponentScan(value = {"com.boot.bus.sqlsync", "com.boot.bus.sqlsync.emqx", "com.boot.bus.sqlsync.listener"}) @ComponentScan(value = {"com.boot.bus"})
public class AmosSqlSyncApplication { public class AmosSqlSyncApplication {
private static final Logger logger = LoggerFactory.getLogger(AmosSqlSyncApplication.class); private static final Logger logger = LoggerFactory.getLogger(AmosSqlSyncApplication.class);
......
...@@ -12,18 +12,13 @@ spring.redis.port=6379 ...@@ -12,18 +12,13 @@ spring.redis.port=6379
spring.redis.password=1234560 spring.redis.password=1234560
spring.redis.timeout=0 spring.redis.timeout=0
## emqx properties ## emqx
#emqx.clean-session=true emqx.clean-session=true
#emqx.client-id=${spring.application.name}-${random.int[1024,65536]} emqx.client-id=${spring.application.name}-${random.int[1024,65536]}
#emqx.broker=tcp://172.16.11.201:1883 emqx.broker=tcp://172.16.11.201:1883
#emqx.user-name=admin emqx.user-name=admin
#emqx.password=public emqx.password=public
#emqx.default-topic=mqtt_topic mqtt.scene.host=mqtt://172.16.11.201:8083/mqtt
mqtt.client.product.id=mqtt
spring.mqtt.client-id=${spring.application.name}-${random.int[1024,65536]} mqtt.topic=cs/v1/fireAST/dataSync,cs/v1/fireASF/dataSync
spring.mqtt.default-topic=mqtt_topic
spring.mqtt.host-url=tcp://172.16.11.201:1883
spring.mqtt.username=admin
spring.mqtt.password=public
spring.mqtt.completionTimeout=3000 spring.mqtt.completionTimeout=3000
\ No newline at end of file
spring.mqtt.keepAlive=60
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment