Commit 35b81f8b authored by wujiang's avatar wujiang

提交代码

parent df36b923
package com.yeejoin.equip.eqmx;
import com.yeejoin.equip.kafka.KafkaProducerService;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
@Data@Accessors(chain = true)
public class ConnectionConfigDTO implements Serializable {
private static final long serialVersionUID = 1L;
private String id;
private String name;
private String communicationType;
private String url;
private String clientId;
private String username;
private String password;
private String topics;
private Long interval;
/**
* 请求方式 http
*/
private String requestMethod;
/**
* 请求头 http
*/
private String requestHeader;
/**
* protocols websocket
*/
private String protocols;
private String msgType;
private KafkaProducerService kafkaProducerService;
private String kafkaTopic;
}
package com.yeejoin.equip.eqmx;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.yeejoin.equip.kafka.KafkaProducerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@Slf4j
public class ConnectionManager {
private final Map<String, Connector> connectors = new ConcurrentHashMap<>();
@Value("${mqtt.data.type:normal}")
private String msgType;
@Value("${kafka.topic}")
private String kafkaTopic;
@Autowired
private KafkaProducerService kafkaProducerService;
@PostConstruct
public void initFromDb() {
List<ConnectionConfigDTO> connectionConfigDTOS = new ArrayList<>();
ConnectionConfigDTO xz = new ConnectionConfigDTO();
xz.setId("1");
xz.setCommunicationType("mqtt");
xz.setClientId("xiazao");
xz.setTopics("/farm/xiazao/yitihua");
xz.setUrl("tcp://kafka-external.ruioutech.com:1883");
xz.setUsername("test");
xz.setPassword("test_Zxnk@2025");
xz.setKafkaTopic(kafkaTopic);
xz.setKafkaProducerService(kafkaProducerService);
connectionConfigDTOS.add(xz);
ConnectionConfigDTO th = new ConnectionConfigDTO();
th.setId("2");
th.setCommunicationType("mqtt");
th.setClientId("taihe");
th.setTopics("/solar/taihe/yitihua");
th.setUrl("tcp://kafka-external.ruioutech.com:1883");
th.setUsername("test");
th.setPassword("test_Zxnk@2025");
th.setKafkaTopic(kafkaTopic);
th.setKafkaProducerService(kafkaProducerService);
connectionConfigDTOS.add(th);
connectionConfigDTOS.forEach(cfg -> {
try {
cfg.setMsgType(msgType);
addConnector(cfg);
} catch (Exception e) {
log.error("初始化连接失败: {}", JSONObject.toJSONString(cfg), e);
}
});
}
public void addConnector(ConnectionConfigDTO configDTO) {
try {
if (connectors.containsKey(configDTO.getId())) throw new IllegalArgumentException("ID 已存在: " + configDTO.getId());
Connector conn = new MqttConnector(configDTO);
// Connector conn = switch (configDTO.getCommunicationType().toLowerCase()) {
// case "mqtt" -> new MqttConnector(configDTO);
// default -> throw new IllegalArgumentException("未知类型: " + configDTO.getCommunicationType());
// };
conn.connect();
connectors.put(configDTO.getId(), conn);
log.info("{} client {} connect success", configDTO.getCommunicationType().toUpperCase(), configDTO.getName());
} catch (Exception e){
log.error("初始化连接失败: {}", JSONObject.toJSONString(configDTO), e);
}
}
public void modifyConnector(ConnectionConfigDTO configDTO) {
try {
removeConnector(configDTO.getId());
addConnector(configDTO);
} catch (Exception e){
log.error("修改连接失败: {}", JSONObject.toJSONString(configDTO), e);
}
}
public void removeConnector(String id) throws Exception {
Connector conn = connectors.remove(id);
if (conn != null) {
conn.disconnect();
}
}
public void removeConnector(List<String> ids){
if(ids!=null&&ids.isEmpty())
{
ids.forEach(id -> {
try {
removeConnector(id);
} catch (Exception e) {
log.error("删除连接失败: {}", id, e);
}
});
}
}
public Map<String, Connector> getConnectors() {
return connectors;
}
public Connector getConnector(String id) {
return connectors.get(id);
}
public Map<String, Connector> listConnectors() {
return connectors;
}
@PreDestroy
public void shutdown() {
connectors.values().forEach(c -> {
try {
c.disconnect();
} catch (Exception ignored) {
}
});
}
}
package com.yeejoin.equip.eqmx;
public interface Connector {
void connect() throws Exception;
void disconnect() throws Exception;
String getId();
boolean isConnected();
boolean checkAlive();
}
package com.yeejoin.equip.eqmx; //package com.yeejoin.equip.eqmx;
//
import com.alibaba.fastjson.JSON; //import com.alibaba.fastjson.JSON;
import com.yeejoin.equip.kafka.KafkaProducerService; //import com.yeejoin.equip.kafka.KafkaProducerService;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject; //import net.sf.json.JSONObject;
import org.eclipse.paho.client.mqttv3.MqttMessage; //import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; //import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper; //import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener; //import org.typroject.tyboot.component.emq.EmqxListener;
import javax.annotation.PostConstruct; //import javax.annotation.PostConstruct;
import java.util.concurrent.*; //import java.util.concurrent.*;
//
/** ///**
* @author LiuLin // * @author LiuLin
* @date 2023/6/25 // * @date 2023/6/25
* @apiNote Emq消息转发Kafka // * @apiNote Emq消息转发Kafka
*/ // */
@Slf4j //@Slf4j
@Component //@Component
public class EmqMessageService extends EmqxListener { //public class EmqMessageService extends EmqxListener {
//
@Autowired // @Autowired
protected EmqKeeper emqKeeper; // protected EmqKeeper emqKeeper;
//
@Autowired // @Autowired
protected KafkaProducerService kafkaProducerService; // protected KafkaProducerService kafkaProducerService;
//
@Value("${emq.topic}") // @Value("${emq.topic}")
private String emqTopic; // private String emqTopic;
//
@Value("${kafka.topic}") // @Value("${kafka.topic}")
private String kafkaTopic; // private String kafkaTopic;
ExecutorService service = Executors.newFixedThreadPool(10); // ExecutorService service = Executors.newFixedThreadPool(10);
private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>(); // private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>();
//
@PostConstruct // @PostConstruct
void init() throws Exception { // void init() throws Exception {
service.execute(new NumberThread()); // service.execute(new NumberThread());
emqKeeper.subscript(emqTopic, 0, this); // emqKeeper.subscript(emqTopic, 0, this);
} // // emqKeeper.subscript(emqTopic, 0, this);
// }
@Override //
public void processMessage(String topic, MqttMessage message) throws Exception { // @Override
JSONObject result = JSONObject.fromObject(new String(message.getPayload())); // public void processMessage(String topic, MqttMessage message) throws Exception {
JSONObject messageResult = new JSONObject(); // JSONObject result = JSONObject.fromObject(new String(message.getPayload()));
messageResult.put("result", result); // JSONObject messageResult = new JSONObject();
messageResult.put("topic", topic); // messageResult.put("result", result);
blockingQueue.add(messageResult); // messageResult.put("topic", topic);
} // blockingQueue.add(messageResult);
class NumberThread implements Runnable { // }
@Override // class NumberThread implements Runnable {
public void run() { // @Override
while (true) { // public void run() {
try { // while (true) {
JSONObject messageResult = blockingQueue.take(); // try {
JSONObject result = messageResult.getJSONObject("result"); // JSONObject messageResult = blockingQueue.take();
if ((messageResult.getString("topic")).equals(emqTopic)) { // JSONObject result = messageResult.getJSONObject("result");
String dataType = result.getString("dataType"); // if ((messageResult.getString("topic")).equals(emqTopic)) {
String address = result.getString("address"); // String dataType = result.getString("dataType");
String gatewayId = result.getString("gatewayId"); // String address = result.getString("address");
String value = result.getString("value"); // String gatewayId = result.getString("gatewayId");
String signalType = result.getString("signalType"); // String value = result.getString("value");
log.info("===========接收IOT订阅消息,address:{},gatewayId:{},dateType:{},value:{},signalType:{}", address,gatewayId,dataType,value,signalType); // String signalType = result.getString("signalType");
kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result)); // log.info("===========接收MQTT订阅消息,address:{},gatewayId:{},dateType:{},value:{},signalType:{}", address,gatewayId,dataType,value,signalType);
} // //log.info("===========接收IOT订阅消息,address:{},gatewayId:{},dateType:{},value:{},signalType:{}", address,gatewayId,dataType,value,signalType);
} catch (Exception e) { // kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result));
Thread.currentThread().interrupt(); // }
} // } catch (Exception e) {
} // Thread.currentThread().interrupt();
} // }
} // }
} // }
// }
//}
package com.yeejoin.equip.eqmx;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.equip.kafka.KafkaProducerService;
import com.yeejoin.equip.utils.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class MqttConnector implements Connector, MqttCallback {
// 定时重连线程池
private static final ScheduledExecutorService reconnectScheduler = Executors.newScheduledThreadPool(4);
// 初始重连延迟时间(秒)
private static final int INITIAL_RECONNECT_DELAY = 5;
// 最大重连延迟时间(秒)
private static final int MAX_RECONNECT_DELAY = 60;
// 最大重连尝试次数
private static final int MAX_RECONNECT_ATTEMPTS = 10;
private MqttClient mqttClient;
private final AtomicBoolean connected = new AtomicBoolean(false);
private final ConnectionConfigDTO connectionConfig;
// 记录当前重连尝试次数
private final AtomicInteger reconnectAttempts = new AtomicInteger(0);
// 用于管理重连任务
private ScheduledFuture<?> reconnectFuture;
public MqttConnector(ConnectionConfigDTO connectionConfig) {
this.connectionConfig = connectionConfig;
}
@Override
public void connect() throws MqttException {
if (connectionConfig==null) {
log.info("mqtt config is null or not enabled");
return;
}
log.info("mqtt config: {}", connectionConfig);
MqttConnectOptions options = buildConnectOptions();
mqttClient = new MqttClient(connectionConfig.getUrl(), "equip" + connectionConfig.getClientId(), new MemoryPersistence());
mqttClient.setCallback(this);
mqttClient.connect(options);
mqttClient.setTimeToWait(5000);
mqttClient.subscribe(connectionConfig.getTopics());
connected.set(true);
// 连接成功,重置重连尝试次数
reconnectAttempts.set(0);
// 取消重连任务
cancelReconnectTask();
}
@Override
public void disconnect() throws MqttException {
if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.disconnect();
mqttClient.close();
}
connected.set(false);
// 断开连接时取消重连任务
cancelReconnectTask();
}
@Override
public void connectionLost(Throwable throwable) {
connected.set(false);
log.error("mqtt connection lost", throwable);
// 启动重连流程
scheduleReconnect();
}
@Override
public void messageArrived(String topic, MqttMessage message) {
// log.info("mqtt message: {}", message);
String address="";
String gatewayId="";
String value="";
if("/farm/xiazao/yitihua".equals(topic))
{
com.alibaba.fastjson.JSONObject msg = com.alibaba.fastjson.JSONObject.parseObject(message.toString());
String node = msg.getString("node");
if(node.contains("fan"))
{
gatewayId="1668801435891929089";
}else if(node.contains("station"))
{
gatewayId="1668801570352926721";
}else
{
return;
}
String values = msg.getString("values");
JSONObject vobj = JSONObject.parseObject(values,JSONObject.class);
for(String key:vobj.keySet())
{
JSONObject result = new JSONObject();
result.put("address",key);
result.put("value",vobj.get(key));
result.put("gatewayId",gatewayId);
//log.info("===========接收MQTT订阅消息,address:{},gatewayId:{},value:{}", address,gatewayId,value);
//log.info("===========接收IOT订阅消息,address:{},gatewayId:{},dateType:{},value:{},signalType:{}", address,gatewayId,dataType,value,signalType);
this.connectionConfig.getKafkaProducerService().sendMessageAsync(this.connectionConfig.getKafkaTopic(), com.alibaba.fastjson.JSONObject.toJSONString(result));
}
}
else if("/solar/taihe/yitihua".equals(topic))
{
com.alibaba.fastjson.JSONObject msg = com.alibaba.fastjson.JSONObject.parseObject(message.toString());
String node = msg.getString("node");
if(node.contains("solar"))
{
gatewayId="1669524885619085313";
}else if(node.contains("station"))
{
gatewayId="1669525017559306241";
}else
{
return;
}
String values = msg.getString("values");
JSONObject vobj = JSONObject.parseObject(values,JSONObject.class);
for(String key:vobj.keySet())
{
JSONObject result = new JSONObject();
result.put("address",key);
result.put("value",vobj.get(key));
result.put("gatewayId",gatewayId);
//log.info("===========接收MQTT订阅消息,address:{},gatewayId:{},value:{}", address,gatewayId,value);
//log.info("===========接收IOT订阅消息,address:{},gatewayId:{},dateType:{},value:{},signalType:{}", address,gatewayId,dataType,value,signalType);
this.connectionConfig.getKafkaProducerService().sendMessageAsync(this.connectionConfig.getKafkaTopic(), com.alibaba.fastjson.JSONObject.toJSONString(result));
}
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
log.info("delivery to {} success: {}", token.getTopics(), token.getMessage());
} catch (Exception e) {
log.error("delivery to {} failed: {}", token.getTopics(), e.getMessage());
}
}
@Override
public String getId() {
return this.connectionConfig.getId();
}
@Override
public boolean isConnected() {
return connected.get();
}
@Override
public boolean checkAlive() {
// 主动检查MQTT连接状态
boolean isAlive = mqttClient != null && mqttClient.isConnected();
// 更新连接状态缓存
connected.set(isAlive);
return isAlive;
}
private MqttConnectOptions buildConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(false);
options.setCleanSession(true);
options.setConnectionTimeout(10);
options.setUserName(this.connectionConfig.getUsername());
// String password = "";
// try {
// password = SymmetricEncryptionUtil.decrypt(this.connectionConfig.getPassword());
// } catch (Exception e) {
// log.error("decrypt mqtt password error,pw={}", this.connectionConfig.getPassword(), e);
// }
options.setPassword(this.connectionConfig.getPassword().toCharArray());
return options;
}
private void scheduleReconnect() {
// 若已有未完成的重连任务,不再重复创建
if (reconnectFuture != null && !reconnectFuture.isDone()) {
return;
}
// 若达到最大重连尝试次数,停止重连
if (reconnectAttempts.get() >= MAX_RECONNECT_ATTEMPTS) {
log.error("Reached maximum reconnect attempts ({}) for MQTT client [{}]. Stopping reconnection.",
MAX_RECONNECT_ATTEMPTS, connectionConfig.getId());
return;
}
int currentAttempt = reconnectAttempts.incrementAndGet();
// 计算重连延迟时间,采用指数退避策略
int delay = Math.min(INITIAL_RECONNECT_DELAY * (int) Math.pow(2, currentAttempt - 1), MAX_RECONNECT_DELAY);
log.info("Scheduling MQTT client [{}] reconnect attempt {} in {} seconds", connectionConfig.getId(), currentAttempt, delay);
reconnectFuture = reconnectScheduler.schedule(() -> {
try {
if (!connected.get()) {
log.info("Trying to reconnect MQTT client [{}]...", connectionConfig.getId());
connect();
if (connected.get()) {
log.info("MQTT client [{}] reconnect success", connectionConfig.getId());
}
}
} catch (Exception e) {
log.error("MQTT client [{}] reconnect attempt {} failed",
connectionConfig.getId(), currentAttempt, e);
// 重连失败,继续尝试下一次重连
scheduleReconnect();
}
}, delay, TimeUnit.SECONDS);
}
private void cancelReconnectTask() {
if (reconnectFuture != null) {
reconnectFuture.cancel(true);
reconnectFuture = null;
}
}
}
\ No newline at end of file
...@@ -68,7 +68,7 @@ public class KafkaProducerService { ...@@ -68,7 +68,7 @@ public class KafkaProducerService {
String address = jsonObject.getString("address"); String address = jsonObject.getString("address");
String gatewayId = jsonObject.getString("gatewayId"); String gatewayId = jsonObject.getString("gatewayId");
String value = jsonObject.getString("value"); String value = jsonObject.getString("value");
log.info("===========Kafka发送消息 success! address: {}, gatewayId: {},value:{}", address, gatewayId,value); //log.info("===========Kafka发送消息 success! address: {}, gatewayId: {},value:{}", address, gatewayId,value);
} }
}); });
} }
......
...@@ -80,7 +80,7 @@ public class JXDZExecute { ...@@ -80,7 +80,7 @@ public class JXDZExecute {
List<MockWindStationDataEntity> list1 =mockWindStationDataMapper.selectList( List<MockWindStationDataEntity> list1 =mockWindStationDataMapper.selectList(
new LambdaQueryWrapper<MockWindStationDataEntity>() new LambdaQueryWrapper<MockWindStationDataEntity>()
.eq(MockWindStationDataEntity::getDay,day) // .eq(MockWindStationDataEntity::getDay,day)
.eq(MockWindStationDataEntity::getHour,hour) .eq(MockWindStationDataEntity::getHour,hour)
); );
List<WindStationDataVO> windStationDataVOList = new ArrayList<>(); List<WindStationDataVO> windStationDataVOList = new ArrayList<>();
...@@ -92,7 +92,7 @@ public class JXDZExecute { ...@@ -92,7 +92,7 @@ public class JXDZExecute {
} }
List<MockPvStationDataEntity> list2 =mockPvStationDataMapper.selectList( List<MockPvStationDataEntity> list2 =mockPvStationDataMapper.selectList(
new LambdaQueryWrapper<MockPvStationDataEntity>() new LambdaQueryWrapper<MockPvStationDataEntity>()
.eq(MockPvStationDataEntity::getDay,day) // .eq(MockPvStationDataEntity::getDay,day)
.eq(MockPvStationDataEntity::getHour,hour) .eq(MockPvStationDataEntity::getHour,hour)
); );
List<PvStationDataVO> pvStationDataVOList = new ArrayList<>(); List<PvStationDataVO> pvStationDataVOList = new ArrayList<>();
...@@ -104,7 +104,7 @@ public class JXDZExecute { ...@@ -104,7 +104,7 @@ public class JXDZExecute {
} }
List<MockWindDeviceDataEntity> list3 =mockWindDeviceDataMapper.selectList( List<MockWindDeviceDataEntity> list3 =mockWindDeviceDataMapper.selectList(
new LambdaQueryWrapper<MockWindDeviceDataEntity>() new LambdaQueryWrapper<MockWindDeviceDataEntity>()
.eq(MockWindDeviceDataEntity::getDay,day) // .eq(MockWindDeviceDataEntity::getDay,day)
.eq(MockWindDeviceDataEntity::getHour,hour) .eq(MockWindDeviceDataEntity::getHour,hour)
); );
List<WindDeviceDataVO> windDeviceDataVOList = new ArrayList<>(); List<WindDeviceDataVO> windDeviceDataVOList = new ArrayList<>();
...@@ -116,7 +116,7 @@ public class JXDZExecute { ...@@ -116,7 +116,7 @@ public class JXDZExecute {
} }
List<MockPvDeviceDataEntity> list4 =mockPvDeviceDataMapper.selectList( List<MockPvDeviceDataEntity> list4 =mockPvDeviceDataMapper.selectList(
new LambdaQueryWrapper<MockPvDeviceDataEntity>() new LambdaQueryWrapper<MockPvDeviceDataEntity>()
.eq(MockPvDeviceDataEntity::getDay,day) // .eq(MockPvDeviceDataEntity::getDay,day)
.eq(MockPvDeviceDataEntity::getHour,hour) .eq(MockPvDeviceDataEntity::getHour,hour)
); );
List<PvDeviceDataVO> pvDeviceDataVOList = new ArrayList<>(); List<PvDeviceDataVO> pvDeviceDataVOList = new ArrayList<>();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment