Commit bf88f772 authored by 刘林's avatar 刘林

fix(equip):优化对接IOT代码,添加kafka消息队列

parent c014e982
......@@ -28,11 +28,11 @@
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.typroject</groupId>-->
<!-- <artifactId>tyboot-component-emq</artifactId>-->
<!-- <version>1.1.20</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-component-emq</artifactId>
<version>1.1.23</version>
</dependency>
<dependency>
<groupId>com.yeejoin</groupId>
......
//package com.yeejoin.equip.eqmx;
//
//import com.alibaba.fastjson.JSON;
//import com.yeejoin.equip.kafka.KafkaProducerService;
//import lombok.extern.slf4j.Slf4j;
//import net.sf.json.JSONObject;
//import org.eclipse.paho.client.mqttv3.MqttMessage;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.stereotype.Component;
//import org.typroject.tyboot.component.emq.EmqKeeper;
//import org.typroject.tyboot.component.emq.EmqxListener;
//import javax.annotation.PostConstruct;
//import java.util.Arrays;
//import java.util.concurrent.BlockingQueue;
//import java.util.concurrent.LinkedBlockingQueue;
//
///**
// * @author LiuLin
// * @date 2023/6/25
// * @apiNote Emq消息转发Kafka
// */
//@Slf4j
//@Component
//public class EmqMessageService extends EmqxListener {
//
// @Autowired
// protected EmqKeeper emqKeeper;
//
// @Autowired
// protected KafkaProducerService kafkaProducerService;
//
// @Value("${emq.topic}")
// private String emqTopic;
//
// @Value("${kafka.topic}")
// private String kafkaTopic;
//
// private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>();
//
// @PostConstruct
// void init() throws Exception {
// emqKeeper.subscript(emqTopic, 1, this);
// }
//
// @Override
// public void processMessage(String topic, MqttMessage message) throws Exception {
// JSONObject result = JSONObject.fromObject(new String(message.getPayload()));
// //JSONObject messageResult = new JSONObject();
// //messageResult.put("result", result);
// //messageResult.put("topic", topic);
// //blockingQueue.add(messageResult);
//
// if (topic.equals(emqTopic)) {
// kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result));
// }
// }
//
// //Runnable task_runnable = new Runnable() {
// // public void run() {
// // int k = 0;
// // boolean b = true;
// // while (b) {
// // k++;
// // b = k < Integer.MAX_VALUE;
// // try {
// // JSONObject messageResult = blockingQueue.take();
// // JSONObject result = messageResult.getJSONObject("result");
// // if ((messageResult.getString("topic")).equals(emqTopic)) {
// // kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result));
// // }
// // } catch (Exception e) {
// // Thread.currentThread().interrupt();
// // }
// // }
// // }
// //};
//}
package com.yeejoin.equip.eqmx;
import com.alibaba.fastjson.JSON;
import com.yeejoin.equip.kafka.KafkaProducerService;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote Emq消息转发Kafka
*/
@Slf4j
@Component
public class EmqMessageService extends EmqxListener {
@Autowired
protected EmqKeeper emqKeeper;
@Autowired
protected KafkaProducerService kafkaProducerService;
@Value("${emq.topic}")
private String emqTopic;
@Value("${kafka.topic}")
private String kafkaTopic;
private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>();
@PostConstruct
void init() {
new Thread(task_runnable).start();
String[] split = emqTopic.split(",");
Arrays.stream(split).forEach(e-> {
try {
emqKeeper.subscript(e, 1, this);
} catch (Exception exception) {
log.info("订阅emq消息失败 ====> message: {}", exception.getMessage());
}
});
}
@Override
public void processMessage(String topic, MqttMessage message) throws Exception {
JSONObject result = JSONObject.fromObject(new String(message.getPayload()));
JSONObject messageResult = new JSONObject();
messageResult.put("result", result);
messageResult.put("topic", topic);
blockingQueue.add(messageResult);
}
Runnable task_runnable = new Runnable() {
public void run() {
int k = 0;
boolean b = true;
while (b) {
k++;
b = k < Integer.MAX_VALUE;
try {
JSONObject messageResult = blockingQueue.take();
JSONObject result = messageResult.getJSONObject("result");
if ((messageResult.getString("topic")).equals(emqTopic)) {
kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result));
}
} catch (Exception e) {
Thread.currentThread().interrupt();
}
}
}
};
}
package com.yeejoin.equip.mqtt;
import com.yeejoin.equip.config.MqttPropertyConfig;
import com.yeejoin.equip.entity.IndicatorData;
import com.yeejoin.equip.mqtt.message.MqttTopicEnum;
import com.yeejoin.equip.utils.ExecutorFactory;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import javax.annotation.Resource;
import java.util.Objects;
import static com.yeejoin.equip.mqtt.message.MqttConstant.*;
/**
* 消息处理器
*
* @author LiuLin
* @date 2023年08月18日 10:56
*/
@Configuration
public class MessageIntegration {
@Resource
private MqttPropertyConfig mqttPropertyConfig;
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{mqttPropertyConfig.getBroker()});
options.setUserName(mqttPropertyConfig.getClientUserName());
options.setPassword(mqttPropertyConfig.getClientPassword().toCharArray());
options.setConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT);
// 设置心跳:1.5*20秒
options.setKeepAliveInterval(mqttPropertyConfig.getKeepAliveInterval());
// 设置最大并发数
options.setMaxInflight(mqttPropertyConfig.getMaxInflight());
options.setAutomaticReconnect(true);
//options.setCleanSession(false);
return options;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectOptions());
return factory;
}
@Bean
public MessageProducerSupport bizInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
mqttPropertyConfig.getBizClientId(),
mqttClientFactory(),
mqttPropertyConfig.getBizTopic()
);
adapter.setCompletionTimeout(DEFAULT_COMPLETION_TIMEOUT);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(QOS_DEFAULT);
return adapter;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
mqttPropertyConfig.getClientId(),
mqttClientFactory()
);
messageHandler.setAsync(true);
messageHandler.setDefaultQos(QOS_DEFAULT);
return messageHandler;
}
@Bean
public IntegrationFlow bizMsgFlow() {
return IntegrationFlows
.from(bizInbound())
.channel(channels -> channels.executor(ExecutorFactory.buildBizExecutor()))
.handle(MessageTransfer::mqttMessage2RawMessage)
//根据Topic后缀进行分流
.<IndicatorData, MqttTopicEnum>route(IndicatorData::getMqttTopicEnum,
mapping -> mapping
.subFlowMapping(MqttTopicEnum.perspective, flow -> flow
.handle("handleMessageService", "processMessage")
.filter(Objects::nonNull)
.handle(mqttOutbound()))
.defaultOutputToParentFlow())
.get();
}
}
//package com.yeejoin.equip.mqtt;
//
//import com.yeejoin.equip.config.MqttPropertyConfig;
//import com.yeejoin.equip.entity.IndicatorData;
//import com.yeejoin.equip.mqtt.message.MqttTopicEnum;
//import com.yeejoin.equip.utils.ExecutorFactory;
//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.integration.annotation.ServiceActivator;
//import org.springframework.integration.channel.DirectChannel;
//import org.springframework.integration.dsl.IntegrationFlow;
//import org.springframework.integration.dsl.IntegrationFlows;
//import org.springframework.integration.endpoint.MessageProducerSupport;
//import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
//import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
//import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
//import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
//import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
//import org.springframework.messaging.MessageChannel;
//import org.springframework.messaging.MessageHandler;
//import javax.annotation.Resource;
//import java.util.Objects;
//import static com.yeejoin.equip.mqtt.message.MqttConstant.*;
//
///**
// * 消息处理器
// *
// * @author LiuLin
// * @date 2023年08月18日 10:56
// */
//@Configuration
//public class MessageIntegration {
//
// @Resource
// private MqttPropertyConfig mqttPropertyConfig;
// @Bean
// public MqttConnectOptions mqttConnectOptions() {
// MqttConnectOptions options = new MqttConnectOptions();
// options.setServerURIs(new String[]{mqttPropertyConfig.getBroker()});
// options.setUserName(mqttPropertyConfig.getClientUserName());
// options.setPassword(mqttPropertyConfig.getClientPassword().toCharArray());
// options.setConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT);
// // 设置心跳:1.5*20秒
// options.setKeepAliveInterval(mqttPropertyConfig.getKeepAliveInterval());
// // 设置最大并发数
// options.setMaxInflight(mqttPropertyConfig.getMaxInflight());
// options.setAutomaticReconnect(true);
// //options.setCleanSession(false);
// return options;
// }
//
// @Bean
// public MqttPahoClientFactory mqttClientFactory() {
// DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// factory.setConnectionOptions(mqttConnectOptions());
// return factory;
// }
//
// @Bean
// public MessageProducerSupport bizInbound() {
// MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
// mqttPropertyConfig.getBizClientId(),
// mqttClientFactory(),
// mqttPropertyConfig.getBizTopic()
// );
// adapter.setCompletionTimeout(DEFAULT_COMPLETION_TIMEOUT);
// adapter.setConverter(new DefaultPahoMessageConverter());
// adapter.setQos(QOS_DEFAULT);
// return adapter;
// }
//
// @Bean
// public MessageChannel mqttOutboundChannel() {
// return new DirectChannel();
// }
//
// @Bean
// @ServiceActivator(inputChannel = "mqttOutboundChannel")
// public MessageHandler mqttOutbound() {
// MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
// mqttPropertyConfig.getClientId(),
// mqttClientFactory()
// );
// messageHandler.setAsync(true);
// messageHandler.setDefaultQos(QOS_DEFAULT);
// return messageHandler;
// }
//
// @Bean
// public IntegrationFlow bizMsgFlow() {
// return IntegrationFlows
// .from(bizInbound())
// .channel(channels -> channels.executor(ExecutorFactory.buildBizExecutor()))
// .handle(MessageTransfer::mqttMessage2RawMessage)
// //根据Topic后缀进行分流
// .<IndicatorData, MqttTopicEnum>route(IndicatorData::getMqttTopicEnum,
// mapping -> mapping
// .subFlowMapping(MqttTopicEnum.perspective, flow -> flow
// .handle("handleMessageService", "processMessage")
// .filter(Objects::nonNull)
// .handle(mqttOutbound()))
// .defaultOutputToParentFlow())
// .get();
// }
//}
package com.yeejoin.equip.mqtt;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.equip.entity.EquipmentIndexVO;
import com.yeejoin.equip.entity.IndicatorData;
import com.yeejoin.equip.mqtt.message.MqttTopicEnum;
import com.yeejoin.equip.utils.RedisUtils;
import com.yeejoin.equip.utils.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import static com.yeejoin.equip.mqtt.message.MqttConstant.*;
/**
* @author LiuLin
* @date 2023年07月13日 09:58
*/
@Slf4j
@Component
public class MessageTransfer {
/**
* 转为原生数据,payload为字节数组
**/
public static IndicatorData mqttMessage2RawMessage(String payload, Map<String, Object> headers) {
//log.info("received raw message, header >>> {}, payload >>> {}", headers, JSONObject.toJSONString(payload));
RedisUtils redisUtils = (RedisUtils) SpringUtils.getBean("redisUtils");
IndicatorData indicatorData = JSON.parseObject(payload, IndicatorData.class);
if(!indicatorData.getSignalType().equals("transformation")){
log.info("received TotalSummon message:address{},gateway:{},time:{},signalType:{}", indicatorData.getAddress(),indicatorData.getGatewayId(),
new Date(),indicatorData.getSignalType());
}
//log.info("received raw message, header >>> {}, payload >>> {}", headers, JSONObject.toJSONString(payload));
try {
String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
String[] topicItems = topic.split(TOPIC_SPLITTER);
indicatorData.setMqttTopicEnum(MqttTopicEnum.of(topicItems[topicItems.length - 1]));
String key = indicatorData.getAddress() + "_" + indicatorData.getGatewayId();
if (redisUtils.hasKey(key)) {
EquipmentIndexVO equipmentSpeIndex = JSONObject.parseObject(redisUtils.get(key),EquipmentIndexVO.class) ;
String valueLabel = valueTranslate(indicatorData.getValue(), equipmentSpeIndex.getValueEnum());
indicatorData.setIsAlarm(String.valueOf(equipmentSpeIndex.getIsAlarm()));
indicatorData.setEquipmentIndexName(equipmentSpeIndex.getEquipmentIndexName());
indicatorData.setEquipmentSpecificName(equipmentSpeIndex.getEquipmentSpecificName());
indicatorData.setUnit(equipmentSpeIndex.getUnitName());
indicatorData.setEquipmentsIdx(key);
indicatorData.setValueLabel(valueLabel.isEmpty() ? indicatorData.getValue() : valueLabel);
if (!Arrays.asList(TRUE, FALSE).contains(indicatorData.getValue())) {
indicatorData.setValueF(Float.parseFloat(indicatorData.getValue()));
}
}else {
return null;
}
} catch (Exception e) {
log.error("mqttMessage2RawMessage解析消息数据异常", e);
}
return indicatorData;
}
private static String valueTranslate(String value, String enumStr) {
if (ObjectUtils.isEmpty(enumStr)) {
return "";
}
try {
JSONArray jsonArray = JSONArray.parseArray(enumStr);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
if (jsonObject.get("key").equals(value)) {
return jsonObject.getString("label");
}
}
} catch (Exception e) {
log.error("告警枚举转换异常" + e.getMessage(), e);
}
return "";
}
}
//package com.yeejoin.equip.mqtt;
//
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONArray;
//import com.alibaba.fastjson.JSONObject;
//import com.yeejoin.equip.entity.EquipmentIndexVO;
//import com.yeejoin.equip.entity.IndicatorData;
//import com.yeejoin.equip.mqtt.message.MqttTopicEnum;
//import com.yeejoin.equip.utils.RedisUtils;
//import com.yeejoin.equip.utils.SpringUtils;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.commons.lang3.ObjectUtils;
//import org.springframework.integration.mqtt.support.MqttHeaders;
//import org.springframework.stereotype.Component;
//import java.util.Arrays;
//import java.util.Date;
//import java.util.Map;
//import static com.yeejoin.equip.mqtt.message.MqttConstant.*;
//
///**
// * @author LiuLin
// * @date 2023年07月13日 09:58
// */
//@Slf4j
//@Component
//public class MessageTransfer {
//
// /**
// * 转为原生数据,payload为字节数组
// **/
// public static IndicatorData mqttMessage2RawMessage(String payload, Map<String, Object> headers) {
// //log.info("received raw message, header >>> {}, payload >>> {}", headers, JSONObject.toJSONString(payload));
// RedisUtils redisUtils = (RedisUtils) SpringUtils.getBean("redisUtils");
// IndicatorData indicatorData = JSON.parseObject(payload, IndicatorData.class);
// if(!indicatorData.getSignalType().equals("transformation")){
// log.info("received TotalSummon message:address{},gateway:{},time:{},signalType:{}", indicatorData.getAddress(),indicatorData.getGatewayId(),
// new Date(),indicatorData.getSignalType());
// }
// //log.info("received raw message, header >>> {}, payload >>> {}", headers, JSONObject.toJSONString(payload));
// try {
// String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
// String[] topicItems = topic.split(TOPIC_SPLITTER);
// indicatorData.setMqttTopicEnum(MqttTopicEnum.of(topicItems[topicItems.length - 1]));
// String key = indicatorData.getAddress() + "_" + indicatorData.getGatewayId();
// if (redisUtils.hasKey(key)) {
// EquipmentIndexVO equipmentSpeIndex = JSONObject.parseObject(redisUtils.get(key),EquipmentIndexVO.class) ;
// String valueLabel = valueTranslate(indicatorData.getValue(), equipmentSpeIndex.getValueEnum());
// indicatorData.setIsAlarm(String.valueOf(equipmentSpeIndex.getIsAlarm()));
// indicatorData.setEquipmentIndexName(equipmentSpeIndex.getEquipmentIndexName());
// indicatorData.setEquipmentSpecificName(equipmentSpeIndex.getEquipmentSpecificName());
// indicatorData.setUnit(equipmentSpeIndex.getUnitName());
// indicatorData.setEquipmentsIdx(key);
// indicatorData.setValueLabel(valueLabel.isEmpty() ? indicatorData.getValue() : valueLabel);
// if (!Arrays.asList(TRUE, FALSE).contains(indicatorData.getValue())) {
// indicatorData.setValueF(Float.parseFloat(indicatorData.getValue()));
// }
// }else {
// return null;
// }
// } catch (Exception e) {
// log.error("mqttMessage2RawMessage解析消息数据异常", e);
// }
// return indicatorData;
// }
//
// private static String valueTranslate(String value, String enumStr) {
// if (ObjectUtils.isEmpty(enumStr)) {
// return "";
// }
// try {
// JSONArray jsonArray = JSONArray.parseArray(enumStr);
// for (int i = 0; i < jsonArray.size(); i++) {
// JSONObject jsonObject = jsonArray.getJSONObject(i);
// if (jsonObject.get("key").equals(value)) {
// return jsonObject.getString("label");
// }
// }
// } catch (Exception e) {
// log.error("告警枚举转换异常" + e.getMessage(), e);
// }
// return "";
// }
//}
package com.yeejoin.equip.service;
import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
import com.yeejoin.equip.entity.IndicatorData;
import com.yeejoin.equip.kafka.KafkaProducerService;
import com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper;
import com.yeejoin.equip.utils.ElasticSearchUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote Emq消息转发Kafka
*/
@Slf4j
@Component("handleMessageService")
public class HandleMessageService {
private static final String MEASUREMENT = "iot_data_";
private static final String TOTAL_DATA_ = "total_data_";
private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
@Autowired
protected KafkaProducerService kafkaProducerService;
@Autowired
private InfluxDbConnection influxDbConnection;
@Autowired
private IndicatorDataMapper indicatorDataMapper;
@Value("${kafka.alarm.topic}")
private String alarmTopic;
@Autowired
private ElasticSearchUtil elasticSearchUtil;
public void processMessage(IndicatorData indicatorData) {
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
tagsMap.put("equipmentsIdx", indicatorData.getEquipmentsIdx());
fieldsMap.put("address", indicatorData.getAddress());
fieldsMap.put("gatewayId", indicatorData.getGatewayId());
fieldsMap.put("dataType", indicatorData.getDataType());
fieldsMap.put("isAlarm", indicatorData.getIsAlarm());
fieldsMap.put("equipmentSpecificName", indicatorData.getEquipmentSpecificName());
fieldsMap.put("value", indicatorData.getValue());
fieldsMap.put("valueLabel", indicatorData.getValueLabel());
fieldsMap.put("equipmentIndexName", indicatorData.getEquipmentIndexName());
fieldsMap.put("unit", indicatorData.getUnit());
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
//更新数据入ES库
Map<String, Object> paramJson = new HashMap<>();
paramJson.put("valueF", indicatorData.getValueF());
paramJson.put("value", indicatorData.getValue());
paramJson.put("valueLabel", indicatorData.getValueLabel());
paramJson.put("createdTime", new Date());
paramJson.put("unit", indicatorData.getUnit());
elasticSearchUtil.updateData(ES_INDEX_NAME_JX, indicatorData.getEquipmentsIdx(), JSON.toJSONString(paramJson));
if (indicatorData.getIsAlarm() != null && "1".equals(indicatorData.getIsAlarm())) {
fieldsMap.putAll(tagsMap);
kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
}
if ("transformation".equals(indicatorData.getSignalType())) {
influxDbConnection.insert(MEASUREMENT + indicatorData.getGatewayId(), tagsMap, fieldsMap);
indicatorDataMapper.insert(indicatorData);
log.info("TDEngine入库成功,{},value:{}",indicatorData.getEquipmentsIdx(),indicatorData.getValue());
}else{
influxDbConnection.insert(TOTAL_DATA_ + indicatorData.getGatewayId(), tagsMap, fieldsMap);
}
} catch (Exception e) {
log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
}
}
}
//package com.yeejoin.equip.service;
//
//import com.alibaba.fastjson.JSON;
//import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
//import com.yeejoin.equip.entity.IndicatorData;
//import com.yeejoin.equip.kafka.KafkaProducerService;
//import com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper;
//import com.yeejoin.equip.utils.ElasticSearchUtil;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.stereotype.Component;
//import java.text.SimpleDateFormat;
//import java.util.Date;
//import java.util.HashMap;
//import java.util.Map;
///**
// * @author LiuLin
// * @date 2023/6/25
// * @apiNote Emq消息转发Kafka
// */
//@Slf4j
//@Component("handleMessageService")
//public class HandleMessageService {
// private static final String MEASUREMENT = "iot_data_";
// private static final String TOTAL_DATA_ = "total_data_";
// private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
// @Autowired
// protected KafkaProducerService kafkaProducerService;
// @Autowired
// private InfluxDbConnection influxDbConnection;
// @Autowired
// private IndicatorDataMapper indicatorDataMapper;
// @Value("${kafka.alarm.topic}")
// private String alarmTopic;
// @Autowired
// private ElasticSearchUtil elasticSearchUtil;
//
// public void processMessage(IndicatorData indicatorData) {
// try {
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Map<String, String> tagsMap = new HashMap<>();
// Map<String, Object> fieldsMap = new HashMap<>();
// tagsMap.put("equipmentsIdx", indicatorData.getEquipmentsIdx());
//
// fieldsMap.put("address", indicatorData.getAddress());
// fieldsMap.put("gatewayId", indicatorData.getGatewayId());
// fieldsMap.put("dataType", indicatorData.getDataType());
// fieldsMap.put("isAlarm", indicatorData.getIsAlarm());
// fieldsMap.put("equipmentSpecificName", indicatorData.getEquipmentSpecificName());
// fieldsMap.put("value", indicatorData.getValue());
// fieldsMap.put("valueLabel", indicatorData.getValueLabel());
// fieldsMap.put("equipmentIndexName", indicatorData.getEquipmentIndexName());
// fieldsMap.put("unit", indicatorData.getUnit());
// fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
//
// //更新数据入ES库
// Map<String, Object> paramJson = new HashMap<>();
// paramJson.put("valueF", indicatorData.getValueF());
// paramJson.put("value", indicatorData.getValue());
// paramJson.put("valueLabel", indicatorData.getValueLabel());
// paramJson.put("createdTime", new Date());
// paramJson.put("unit", indicatorData.getUnit());
// elasticSearchUtil.updateData(ES_INDEX_NAME_JX, indicatorData.getEquipmentsIdx(), JSON.toJSONString(paramJson));
//
// if (indicatorData.getIsAlarm() != null && "1".equals(indicatorData.getIsAlarm())) {
// fieldsMap.putAll(tagsMap);
// kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
// }
//
// if ("transformation".equals(indicatorData.getSignalType())) {
// influxDbConnection.insert(MEASUREMENT + indicatorData.getGatewayId(), tagsMap, fieldsMap);
// indicatorDataMapper.insert(indicatorData);
// log.info("TDEngine入库成功,{},value:{}",indicatorData.getEquipmentsIdx(),indicatorData.getValue());
// }else{
// influxDbConnection.insert(TOTAL_DATA_ + indicatorData.getGatewayId(), tagsMap, fieldsMap);
// }
//
// } catch (Exception e) {
// log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
// }
// }
//}
......@@ -82,6 +82,8 @@ spring.kafka.consumer.max-poll-records=1000
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.type=batch
kafka.alarm.topic=EQUIPMENT_ALARM
kafka.topic=PERSPECTIVE
emq.topic=iot/data/perspective
elasticsearch.address= 139.9.173.44:9200
elasticsearch.username= elastic
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment