Commit acf7d011 authored by 刘林

fix(equip): migrate the Jiangxi Electric Power Construction (江西电建) IoT integration code into data-equip

parent e96041b9
package com.yeejoin.equipmanage.eqmx;
import com.alibaba.fastjson.JSON;
import com.yeejoin.equipmanage.kafka.KafkaProducerService;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * @author LiuLin
 * @date 2023/6/25
 * @apiNote Forwards EMQ messages to Kafka
 */
@Slf4j
@Component
public class EmqMessageService extends EmqxListener {

    @Autowired
    protected EmqKeeper emqKeeper;

    @Autowired
    protected KafkaProducerService kafkaProducerService;

    /** Comma-separated list of EMQ topics to subscribe to. */
    @Value("${emq.topic}")
    private String emqTopic;

    @Value("${kafka.topic}")
    private String kafkaTopic;

    private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>();

    @PostConstruct
    void init() {
        new Thread(taskRunnable, "emq-to-kafka-forwarder").start();
        Arrays.stream(emqTopic.split(",")).forEach(topic -> {
            try {
                emqKeeper.subscript(topic, 1, this);
            } catch (Exception exception) {
                // Subscription failures should be logged as errors, not info.
                log.error("Failed to subscribe to EMQ topic {} ====> message: {}", topic, exception.getMessage());
            }
        });
    }
    @Override
    public void processMessage(String topic, MqttMessage message) throws Exception {
        // Wrap the raw MQTT payload together with its topic and queue it for forwarding.
        JSONObject result = JSONObject.fromObject(new String(message.getPayload()));
        JSONObject messageResult = new JSONObject();
        messageResult.put("result", result);
        messageResult.put("topic", topic);
        blockingQueue.add(messageResult);
    }
    Runnable taskRunnable = new Runnable() {
        @Override
        public void run() {
            // Drain the queue and forward each message to Kafka until the thread is interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    JSONObject messageResult = blockingQueue.take();
                    JSONObject result = messageResult.getJSONObject("result");
                    // Forward only messages whose topic is in the configured subscription list.
                    if (Arrays.asList(emqTopic.split(",")).contains(messageResult.getString("topic"))) {
                        kafkaProducerService.sendMessageAsync(kafkaTopic, JSON.toJSONString(result));
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and exit the loop.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    log.error("Failed to forward EMQ message to Kafka", e);
                }
            }
        }
    };
}
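For reference, the @Value fields above assume configuration entries along these lines (a minimal sketch; the topic names are illustrative placeholders, not part of this commit, and emq.topic may list several comma-separated MQTT topics):

# application.properties (illustrative values, not part of this commit)
emq.topic=iot/jx/data/topic1,iot/jx/data/topic2
kafka.topic=jiangxi-iot-data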
package com.yeejoin.equipmanage.kafka;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.biz.common.utils.RedisKey;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
import com.yeejoin.equipmanage.common.entity.vo.EquipmentIndexVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * @author LiuLin
 * @date 2023/6/25
 * @apiNote Kafka consumer service
 */
@Slf4j
@Service
public class KafkaConsumerService {

    @Autowired
    private InfluxDbConnection influxDbConnection;

    @Autowired
    protected KafkaProducerService kafkaProducerService;

    /** Replaced with the configured worker pool in iotAsyncExecutor(). */
    private Executor dataExecutor = new ThreadPoolTaskExecutor();

    @Autowired
    private RedisUtils redisUtils;

    @Value("${kafka.alarm.topic}")
    private String alarmTopic;

    // Measurement prefix for real-time IoT messages forwarded into InfluxDB
    private static final String MEASUREMENT = "iot_data_";

    // Measurement prefix for the latest per-indicator values stored in InfluxDB
    private static final String INDICATORS = "indicators_";

    // Fixed timestamp used when writing the latest indicator values to InfluxDB
    private static final Long TIME = 1688558007051L;
    @KafkaListener(topics = "#{'${kafka.topic}'.split(',')}", groupId = "messageConsumerGroup")
    public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
        try {
            if (CollectionUtils.isEmpty(consumerRecords)) {
                return;
            }
            Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
                if (kafkaMessage.isPresent()) {
                    String message = (String) kafkaMessage.get();
                    this.handleMessage(message, equipmentIndexVOMap);
                }
            }
        } catch (Exception e) {
            log.error("Kafka consumption failed, failed batch data: {}", consumerRecords, e);
        } finally {
            // Acknowledge the batch even on failure so a poison batch is not re-consumed.
            ack.acknowledge();
        }
    }
    private void handleMessage(String message, Map<Object, Object> equipmentIndexVOMap) {
        dataExecutor.execute(() -> {
            JSONObject jsonObject = JSONObject.parseObject(message);
            String dataType = jsonObject.getString("dataType");
            String indexAddress = jsonObject.getString("address");
            String traceId = jsonObject.getString("traceId");
            String gatewayId = jsonObject.getString("gatewayId");
            String value = jsonObject.getString("value");
            // Redis hash key: <address>_<gatewayId>
            String key = indexAddress + "_" + gatewayId;
            try {
                if (equipmentIndexVOMap.get(key) != null) {
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
                    log.info("Received IoT message: indicator name: {}, address: {}, value: {}, gateway: {}",
                            equipmentSpeIndex.getEquipmentIndexName(), indexAddress, value, gatewayId);
                    Map<String, String> tagsMap = new HashMap<>();
                    Map<String, Object> fieldsMap = new HashMap<>();
                    tagsMap.put("equipmentsIdx", key);
                    tagsMap.put("address", indexAddress);
                    tagsMap.put("gatewayId", gatewayId);
                    tagsMap.put("dataType", dataType);
                    tagsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
                    tagsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
                    String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
                    fieldsMap.put("traceId", traceId);
                    fieldsMap.put("value", value);
                    fieldsMap.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
                    fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
                    fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
                    fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
                    fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
                    // One time-series record per reading, plus a fixed-timestamp "latest value" record.
                    influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
                    influxDbConnection.insert(INDICATORS + gatewayId, tagsMap, fieldsMap, TIME, TimeUnit.MILLISECONDS);
                    if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
                        // Alarm-enabled indicators are additionally forwarded to the alarm topic.
                        fieldsMap.putAll(tagsMap);
                        kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
                    }
                }
            } catch (Exception e) {
                log.error("Failed to parse and store IoT pass-through message: " + e.getMessage(), e);
            }
        });
    }
    private String valueTranslate(String value, String enumStr) {
        if (ObjectUtils.isEmpty(enumStr)) {
            return "";
        }
        try {
            // valueEnum is a JSON array of {key, label} pairs; map the raw value to its label.
            JSONArray jsonArray = JSONArray.parseArray(enumStr);
            for (int i = 0; i < jsonArray.size(); i++) {
                JSONObject jsonObject = jsonArray.getJSONObject(i);
                if (jsonObject.get("key").equals(value)) {
                    return jsonObject.getString("label");
                }
            }
        } catch (Exception e) {
            log.error("Failed to parse value enum: {}", enumStr, e);
        }
        return "";
    }
    @PostConstruct
    public void iotAsyncExecutor() {
        ThreadPoolTaskExecutor workExecutor = new ThreadPoolTaskExecutor();
        // Core pool size: at least 80, scaled from the number of available processors
        int length = Runtime.getRuntime().availableProcessors();
        int size = Math.max(length, 80);
        workExecutor.setCorePoolSize(size * 2);
        log.info("Equipment service initializing, available processors: {}, base pool size: {}", length, size);
        // Maximum pool size equals the core pool size
        workExecutor.setMaxPoolSize(workExecutor.getCorePoolSize());
        // Unbounded work queue
        workExecutor.setQueueCapacity(Integer.MAX_VALUE);
        // Idle thread keep-alive, in seconds
        workExecutor.setKeepAliveSeconds(60);
        // Thread name prefix for the IoT pass-through consumer pool
        workExecutor.setThreadNamePrefix("EquipService-IotPassthroughConsumer-");
        // On shutdown, wait for in-flight tasks to complete before closing the pool
        workExecutor.setWaitForTasksToCompleteOnShutdown(true);
        // Rejection policy must be set before initialize() or it is never applied:
        // CALLER_RUNS executes the task on the submitting thread when the pool is saturated.
        workExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        workExecutor.initialize();
        this.dataExecutor = workExecutor;
    }
}
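For context, handleMessage above expects each Kafka record value to be a JSON object of roughly the following shape (the field names come from the code; the values here are illustrative, not taken from this commit). The indicator metadata is then looked up in the Redis hash RedisKey.EQUIP_INDEX_ADDRESS under the key address_gatewayId:

{
  "dataType": "float",
  "address": "40001",
  "traceId": "a1b2c3d4",
  "gatewayId": "GW001",
  "value": "23.5"
}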
package com.yeejoin.equipmanage.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * @author LiuLin
 * @date 2023/6/25
 * @apiNote Kafka producer service
 */
@Slf4j
@Service
public class KafkaProducerService {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    // Injected by field name; expected to refer to a transaction-enabled template bean.
    // Currently unused in this class.
    @Resource
    private KafkaTemplate<String, String> kafkaTemplateWithTransaction;

    /**
     * Send a message synchronously.
     * @param topic   topic
     * @param key     message key
     * @param message message payload
     */
    public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
        // Block until the send completes; a maximum wait time may be specified or omitted.
        kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
        log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
    }
    /**
     * Send a message and log the send result.
     * @param topic   topic
     * @param key     message key
     * @param message message payload
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException {
        SendResult<String, String> result = kafkaTemplate.send(topic, key, message).get();
        log.info("The partition the message was sent to: {}", result.getRecordMetadata().partition());
    }
    /**
     * Send a message asynchronously.
     * @param topic   topic
     * @param message message payload
     */
    public void sendMessageAsync(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("Async send failed! topic: {}, message: {}", topic, message, throwable);
            }

            @Override
            public void onSuccess(SendResult<String, String> stringStringSendResult) {
                log.info("Async send succeeded! topic: {}, message: {}", topic, message);
            }
        });
    }
}
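A minimal usage sketch of the producer (the calling class, topic name, and payload below are assumptions for illustration, not part of this commit):

// Illustrative caller showing the async and sync send variants.
@Service
public class IotForwardCaller {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    public void forward(String payload) throws Exception {
        // Fire-and-forget: failures surface only in the async callback's error log.
        kafkaProducerService.sendMessageAsync("jiangxi-iot-data", payload);

        // Blocking variant: waits up to 10 seconds for the broker acknowledgement.
        kafkaProducerService.sendMessageSync("jiangxi-iot-data", "GW001", payload);
    }
}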
package com.yeejoin.equipmanage.kafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
/**
 * @author LiuLin
 * @date 2023/6/25
 * @apiNote Kafka consumer configuration
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String kafkaGroupId;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Concurrency should be less than or equal to the topic's partition count
        factory.setConcurrency(5);
        // Consume records in batches
        factory.setBatchListener(Boolean.TRUE);
        // Manual ack mode is required because the listener method takes an Acknowledgment
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(30000);
        return factory;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        // Offset auto-commit, driven by spring.kafka.consumer.enable-auto-commit
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // Session timeout: if the broker receives no heartbeat from the consumer within this
        // window, it removes the consumer from the group and triggers a rebalance.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Per-partition and total fetch limits; these matter most when accessing over a public network.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
        // Maximum records per poll. Keep this modest: if a batch cannot be consumed before
        // the next poll is due, the group rebalances and consumption stalls.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // Key/value deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group for this instance; instances in the same group load-balance the messages.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        return props;
    }
}
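The @Value fields in this class assume Spring Kafka properties along these lines (a sketch with placeholder values; enable-auto-commit is shown as false to match the manual acknowledgment used in KafkaConsumerService):

# application.properties (illustrative values, not part of this commit)
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=messageConsumerGroup
spring.kafka.consumer.enable-auto-commit=false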