Commit 4fdb52b4 authored by 刘林

fix(equip): optimize the IoT integration code and the ES batch queries

parent 750b645c
package com.yeejoin.equip.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.Date;

/**
 * @author LiuLin
 * @date 2023-10-11 09:31
 */
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Book {
    private String value;
    private Float valueF;
    private String valueLabel;
    private String unit;
    private Date createdTime;
}
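For reference, Book is the partial document that the new batch update writes into the jxiop_equipments index. A minimal sketch of the JSON it serializes to — the sample values are made up, only the field names come from the class above:

import com.alibaba.fastjson.JSON;
import com.yeejoin.equip.entity.Book;
import java.util.Date;

public class BookJsonDemo {
    public static void main(String[] args) {
        // hypothetical reading: value "66.0", unit kW; field names come from Book
        Book book = new Book("66.0", 66.0f, "66.0", "kW", new Date());
        // fastjson renders the partial document sent to ES, roughly:
        // {"createdTime":1697..., "unit":"kW", "value":"66.0", "valueF":66.0, "valueLabel":"66.0"}
        System.out.println(JSON.toJSONString(book));
    }
}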
package com.yeejoin.equip.entity;

import lombok.Getter;

/**
 * @author LiuLin
 * @date 2023-10-11 09:31
 */
@Getter
public final class EsEntity<T> {
    private String id;
    private T data;

    public EsEntity() {
    }

    public EsEntity(String id, T data) {
        this.data = data;
        this.id = id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public void setData(T data) {
        this.data = data;
    }
}
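EsEntity simply pairs a document id with its payload; together with Book it feeds the new updateBatch helper in ElasticSearchUtil further down in this commit. A minimal usage sketch — the id here is a made-up address_gatewayId key, and elasticSearchUtil would normally be injected with @Autowired:

import com.yeejoin.equip.entity.Book;
import com.yeejoin.equip.entity.EsEntity;
import com.yeejoin.equip.utils.ElasticSearchUtil;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

class EsBatchUpdateSketch {
    static void upsertLatestValues(ElasticSearchUtil elasticSearchUtil) {
        List<EsEntity<Book>> batch = new ArrayList<>();
        // the document id follows the address_gatewayId convention used throughout this commit
        batch.add(new EsEntity<>("4071_1668801435891929089",
                new Book("66.0", 66.0f, "66.0", "kW", new Date())));
        elasticSearchUtil.updateBatch("jxiop_equipments", batch);
    }
}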
...@@ -63,7 +63,7 @@ public class EmqMessageService extends EmqxListener {
                String gatewayId = result.getString("gatewayId");
                String value = result.getString("value");
                String signalType = result.getString("signalType");
-               log.info("订阅emq消息 ====> address:{},gatewayId:{},dateType:{},value:{},signalType:{}", address, gatewayId, dataType, value, signalType);
+               log.info("===========接收IOT订阅消息,address:{},gatewayId:{},dataType:{},value:{},signalType:{}", address, gatewayId, dataType, value, signalType);
                kafkaProducerService.sendMessageAsync(kafkaTopic, JSON.toJSONString(result));
            }
        } catch (Exception e) {
......
...@@ -5,21 +5,25 @@
//import com.alibaba.fastjson.JSONObject;
//import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
//import com.yeejoin.equip.entity.EquipmentIndexVO;
//import com.yeejoin.equip.entity.IndicatorData;
//import com.yeejoin.equip.eqmx.EmqMessageService;
//import com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper;
//import com.yeejoin.equip.utils.ElasticSearchUtil;
//import com.yeejoin.equip.utils.RedisUtils;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.commons.lang3.ObjectUtils;
//import org.apache.kafka.clients.consumer.ConsumerRecord;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.kafka.annotation.KafkaListener;
//import org.springframework.kafka.support.Acknowledgment;
//import org.springframework.stereotype.Service;
//
//import javax.annotation.PostConstruct;
//import java.text.SimpleDateFormat;
//import java.util.*;
//import java.util.concurrent.ExecutorService;
//import java.util.concurrent.Executors;
//
///**
// * @author LiuLin
...@@ -30,63 +34,79 @@
//@Service
//public class KafkaConsumerService {
//
//    // literal boolean values carried by on/off signals
//    private static final String TRUE = "true";
//    private static final String FALSE = "false";
//    private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
//
//    private static final String MEASUREMENT = "iot_data_";
//    private static final String TOTAL_DATA_ = "total_data_";
//    @Autowired
//    protected KafkaProducerService kafkaProducerService;
//
//    @Autowired
//    private RedisUtils redisUtils;
//
//    @Autowired
//    private ElasticSearchUtil elasticSearchUtil;
//
//    @Autowired
//    private InfluxDbConnection influxDbConnection;
//
//    @Autowired
//    private IndicatorDataMapper indicatorDataMapper;
//    ExecutorService service = Executors.newFixedThreadPool(10);
//
//    @Value("${kafka.alarm.topic}")
//    private String alarmTopic;
//
//    /**
//     * Consume kafka messages in batches
//     * and forward them on to emq
//     *
//     * @param consumerRecords messages
//     * @param ack ack
//     */
//    @KafkaListener(id = "consumerSingle", topics = "#{'${kafka.topic}'.split(',')}", groupId = "messageConsumerGroup")
//    public void listen1(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
//        try {
//            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
//                Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
//                kafkaMessage.ifPresent(o -> this.handleMessage((String) o));
//            }
//        } catch (Exception e) {
//            log.error("kafka失败,当前失败的批次: data:{}", consumerRecords);
//        } finally {
//            ack.acknowledge();
//        }
//    }
//
//    private void handleMessage(String record) {
//        JSONObject jsonObject = JSONObject.parseObject(record);
//        IndicatorData indicatorData = JSON.parseObject(record, IndicatorData.class);
//        String dataType = jsonObject.getString("dataType");
//        String indexAddress = jsonObject.getString("address");
//        String gatewayId = jsonObject.getString("gatewayId");
//        String value = jsonObject.getString("value");
//        String key = indexAddress + "_" + gatewayId;
//        String signalType = jsonObject.getString("signalType");
//        log.info("接收Kafka消息! address: {}, gatewayId: {},value:{}", indexAddress, gatewayId, value);
//        try {
//            if (redisUtils.hasKey(key)) {
//                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//                EquipmentIndexVO equipmentSpeIndex = JSONObject.parseObject(redisUtils.get(key), EquipmentIndexVO.class);
//                String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
//
//                // update the latest values in ES
//                Map<String, Object> paramJson = new HashMap<>();
//                if (!Arrays.asList(TRUE, FALSE).contains(value)) {
//                    paramJson.put("valueF", Float.parseFloat(value));
//                }
//                paramJson.put("value", value);
//                paramJson.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
//                paramJson.put("createdTime", new Date());
//                paramJson.put("unit", equipmentSpeIndex.getUnitName());
//                elasticSearchUtil.updateData(ES_INDEX_NAME_JX, key, JSON.toJSONString(paramJson));
//
//                Map<String, String> tagsMap = new HashMap<>();
//                Map<String, Object> fieldsMap = new HashMap<>();
...@@ -94,40 +114,41 @@
//                fieldsMap.put("address", indexAddress);
//                fieldsMap.put("gatewayId", gatewayId);
//                fieldsMap.put("dataType", dataType);
//                fieldsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
//                fieldsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
//
//                fieldsMap.put("value", value);
//                fieldsMap.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
//                fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
//                fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
//                fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
//
//                indicatorData.setIsAlarm(String.valueOf(equipmentSpeIndex.getIsAlarm()));
//                indicatorData.setEquipmentIndexName(equipmentSpeIndex.getEquipmentIndexName());
//                indicatorData.setEquipmentSpecificName(equipmentSpeIndex.getEquipmentSpecificName());
//                indicatorData.setUnit(equipmentSpeIndex.getUnitName());
//                indicatorData.setEquipmentsIdx(key);
//                indicatorData.setValueLabel(valueLabel.isEmpty() ? value : valueLabel);
//
//                // change-of-value signals are written to influxdb and TDengine
//                if ("transformation".equalsIgnoreCase(signalType)) {
//                    influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
//                    indicatorDataMapper.insert(indicatorData);
//                    log.info("TDEngine入库成功,{},value:{}", indicatorData.getEquipmentsIdx(), indicatorData.getValue());
//                } else {
//                    influxDbConnection.insert(TOTAL_DATA_ + indicatorData.getGatewayId(), tagsMap, fieldsMap);
//                    log.info("总召入库,key:{}", indicatorData.getEquipmentsIdx());
//                }
//
//                if (0 != equipmentSpeIndex.getIsAlarm()) {
//                    fieldsMap.putAll(tagsMap);
//                    kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
//                    log.info("===========发送告警信息,key:{}", indicatorData.getEquipmentsIdx());
//                }
//            }
//        } catch (Exception e) {
//            log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
//        }
//    }
//
//    private String valueTranslate(String value, String enumStr) {
...@@ -143,7 +164,7 @@
//                }
//            }
//        } catch (Exception e) {
//            log.error("告警枚举转换异常" + e.getMessage(), e);
//        }
//        return "";
//    }
......
//package com.yeejoin.equip.kafka;
//
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONArray;
//import com.alibaba.fastjson.JSONObject;
//import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
//import com.yeejoin.equip.config.KafkaConsumerConfig;
//import com.yeejoin.equip.entity.EquipmentIndexVO;
//import com.yeejoin.equip.entity.IndicatorData;
//import com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper;
//import com.yeejoin.equip.utils.ElasticSearchUtil;
//import com.yeejoin.equip.utils.RedisUtils;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.commons.lang3.ObjectUtils;
//import org.apache.kafka.clients.consumer.ConsumerRecord;
//import org.apache.kafka.clients.consumer.ConsumerRecords;
//import org.apache.kafka.clients.consumer.KafkaConsumer;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.boot.CommandLineRunner;
//import org.springframework.stereotype.Component;
//import java.text.SimpleDateFormat;
//import java.time.Duration;
//import java.util.*;
//import java.util.concurrent.*;
//import java.util.concurrent.atomic.AtomicLong;
//
///**
// * @author LiuLin
// * @date 2023-08-01 17:27
// */
//@Slf4j
//@Component
//public class KafkaConsumerWithThread implements CommandLineRunner {
//    final private static AtomicLong sendThreadPoolCounter = new AtomicLong(0);
//    final public static ExecutorService pooledExecutor =
//            Executors.newFixedThreadPool(60 + Runtime.getRuntime().availableProcessors(),
//                    createThreadFactory());
//    // prefix for real-time IoT messages forwarded into influxdb
//    private static final String MEASUREMENT = "iot_data_";
//    private static final String TOTAL_DATA_ = "total_data_";
//    private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
//    // literal boolean values carried by on/off signals
//    private static final String TRUE = "true";
//    private static final String FALSE = "false";
//    @Autowired
//    protected KafkaProducerService kafkaProducerService;
//    @Autowired
//    private KafkaConsumerConfig consumerConfig;
//    @Autowired
//    private InfluxDbConnection influxDbConnection;
//    @Autowired
//    private RedisUtils redisUtils;
//    @Autowired
//    private IndicatorDataMapper indicatorDataMapper;
//
//    @Value("${kafka.alarm.topic}")
//    private String alarmTopic;
//
//    @Value("${kafka.topic}")
//    private String topic;
//
//    @Autowired
//    private ElasticSearchUtil elasticSearchUtil;
//
//    private static ThreadFactory createThreadFactory() {
//        return runnable -> {
//            Thread thread = new Thread(runnable);
//            thread.setName(String.format("kafka-consumer-iot-pool-%d", KafkaConsumerWithThread.sendThreadPoolCounter.getAndIncrement()));
//            return thread;
//        };
//    }
//
//    @Override
//    public void run(String... args) {
//        Thread thread = new Thread(new KafkaConsumerThread(consumerConfig.consumerConfigs(), topic));
//        thread.start();
//    }
//
//    private void processRecord(ConsumerRecord<String, String> record) {
//        JSONObject jsonObject = JSONObject.parseObject(record.value());
//        IndicatorData indicatorData = JSON.parseObject(record.value(), IndicatorData.class);
//        String dataType = jsonObject.getString("dataType");
//        String indexAddress = jsonObject.getString("address");
//        String gatewayId = jsonObject.getString("gatewayId");
//        String value = jsonObject.getString("value");
//        String key = indexAddress + "_" + gatewayId;
//        String signalType = jsonObject.getString("signalType");
//        log.info("接收Kafka消息! address: {}, gatewayId: {},value:{}", indexAddress, gatewayId, value);
//        try {
//            if (redisUtils.hasKey(key)) {
//                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//                EquipmentIndexVO equipmentSpeIndex = JSONObject.parseObject(redisUtils.get(key), EquipmentIndexVO.class);
//                String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
//
//                // update the latest values in ES
//                Map<String, Object> paramJson = new HashMap<>();
//                if (!Arrays.asList(TRUE, FALSE).contains(value)) {
//                    paramJson.put("valueF", Float.parseFloat(value));
//                }
//                paramJson.put("value", value);
//                paramJson.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
//                paramJson.put("createdTime", new Date());
//                paramJson.put("unit", equipmentSpeIndex.getUnitName());
//                elasticSearchUtil.updateData(ES_INDEX_NAME_JX, key, JSON.toJSONString(paramJson));
//
//                Map<String, String> tagsMap = new HashMap<>();
//                Map<String, Object> fieldsMap = new HashMap<>();
//                tagsMap.put("equipmentsIdx", key);
//                fieldsMap.put("address", indexAddress);
//                fieldsMap.put("gatewayId", gatewayId);
//                fieldsMap.put("dataType", dataType);
//                fieldsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
//                fieldsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
//
//                fieldsMap.put("value", value);
//                fieldsMap.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
//                fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
//                fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
//                fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
//
//                indicatorData.setIsAlarm(String.valueOf(equipmentSpeIndex.getIsAlarm()));
//                indicatorData.setEquipmentIndexName(equipmentSpeIndex.getEquipmentIndexName());
//                indicatorData.setEquipmentSpecificName(equipmentSpeIndex.getEquipmentSpecificName());
//                indicatorData.setUnit(equipmentSpeIndex.getUnitName());
//                indicatorData.setEquipmentsIdx(key);
//                indicatorData.setValueLabel(valueLabel.isEmpty() ? value : valueLabel);
//
//                // change-of-value signals are written to influxdb and TDengine
//                if ("transformation".equalsIgnoreCase(signalType)) {
//                    influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
//                    indicatorDataMapper.insert(indicatorData);
//                    log.info("TDEngine入库成功,{},value:{}", indicatorData.getEquipmentsIdx(), indicatorData.getValue());
//                } else {
//                    influxDbConnection.insert(TOTAL_DATA_ + indicatorData.getGatewayId(), tagsMap, fieldsMap);
//                    log.info("总召入库,key:{}", indicatorData.getEquipmentsIdx());
//                }
//
//                if (0 != equipmentSpeIndex.getIsAlarm()) {
//                    fieldsMap.putAll(tagsMap);
//                    kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
//                    log.info("===========发送告警信息,key:{}", indicatorData.getEquipmentsIdx());
//                }
//            }
//        } catch (Exception e) {
//            log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
//        }
//    }
//
//    private String valueTranslate(String value, String enumStr) {
//        if (ObjectUtils.isEmpty(enumStr)) {
//            return "";
//        }
//        try {
//            JSONArray jsonArray = JSONArray.parseArray(enumStr);
//            for (int i = 0; i < jsonArray.size(); i++) {
//                JSONObject jsonObject = jsonArray.getJSONObject(i);
//                if (jsonObject.get("key").equals(value)) {
//                    return jsonObject.getString("label");
//                }
//            }
//        } catch (Exception e) {
//            log.error("告警枚举转换异常" + e.getMessage(), e);
//        }
//        return "";
//    }
//
//    public class KafkaConsumerThread implements Runnable {
//        private final KafkaConsumer<String, String> kafkaConsumer;
//
//        public KafkaConsumerThread(Properties props, String topic) {
//            this.kafkaConsumer = new KafkaConsumer<>(props);
//            this.kafkaConsumer.subscribe(Collections.singletonList(topic));
//        }
//
//        @Override
//        public void run() {
//            while (true) {
//                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
//                for (ConsumerRecord<String, String> record : records) {
//                    pooledExecutor.submit(() -> {
//                        processRecord(record);
//                    });
//                }
//            }
//        }
//    }
//}
package com.yeejoin.equip.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.equip.config.KafkaConsumerConfig;
import com.yeejoin.equip.entity.Book;
import com.yeejoin.equip.entity.EquipmentIndexVO;
import com.yeejoin.equip.entity.EsEntity;
import com.yeejoin.equip.entity.IndicatorData;
import com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper;
import com.yeejoin.equip.utils.ElasticSearchUtil;
import com.yeejoin.equip.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * @author LiuLin
 * @date 2023-08-01 17:27
 */
@Slf4j
@Component
public class KafkaConsumerWorker implements CommandLineRunner {
    final private static AtomicLong sendThreadPoolCounter = new AtomicLong(0);
    // kept from the previous implementation; not referenced by this class
    final public static ExecutorService pooledExecutor =
            Executors.newFixedThreadPool(100 + Runtime.getRuntime().availableProcessors(),
                    createThreadFactory());
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = 6 * CPU_COUNT;
    private static final int MAX_POOL_SIZE = 6 * CPU_COUNT + 2;
    private static final ThreadPoolExecutor exec = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000)
    );
    private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
    private static final String TRANSFORMATION = "transformation";
    // literal boolean values; such messages carry no numeric reading
    private static final String TRUE = "true";
    private static final String FALSE = "false";
    @Autowired
    protected KafkaProducerService kafkaProducerService;
    @Autowired
    private KafkaConsumerConfig consumerConfig;
    @Autowired
    private RedisUtils redisUtils;
    @Autowired
    private IndicatorDataMapper indicatorDataMapper;
    @Value("${kafka.alarm.topic}")
    private String alarmTopic;
    @Value("${kafka.topic}")
    private String topic;
    @Autowired
    private ElasticSearchUtil elasticSearchUtil;

    private static ThreadFactory createThreadFactory() {
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(String.format("kafka-consumer-iot-pool-%d", KafkaConsumerWorker.sendThreadPoolCounter.getAndIncrement()));
            return thread;
        };
    }

    @Override
    public void run(String... args) {
        Thread thread = new Thread(new KafkaConsumerThread(consumerConfig.consumerConfigs(), topic));
        thread.start();
    }

    private Optional<IndicatorData> processSignal(ConsumerRecord<String, String> record) {
        JSONObject jsonObject = JSONObject.parseObject(record.value());
        String address = jsonObject.getString("address");
        String gatewayId = jsonObject.getString("gatewayId");
        String value = jsonObject.getString("value");
        String key = address + "_" + gatewayId;
        log.info("===========收到Kafka消息,key:{},value:{}", key, value);
        IndicatorData indicatorData = JSON.parseObject(record.value(), IndicatorData.class);
        if (redisUtils.hasKey(key)) {
            EquipmentIndexVO equipmentSpeIndex = JSONObject.parseObject(redisUtils.get(key), EquipmentIndexVO.class);
            String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
            indicatorData.setIsAlarm(String.valueOf(equipmentSpeIndex.getIsAlarm()));
            indicatorData.setEquipmentIndexName(equipmentSpeIndex.getEquipmentIndexName());
            indicatorData.setEquipmentSpecificName(equipmentSpeIndex.getEquipmentSpecificName());
            indicatorData.setUnit(equipmentSpeIndex.getUnitName());
            indicatorData.setEquipmentsIdx(key);
            indicatorData.setValueLabel(valueLabel.isEmpty() ? value : valueLabel);
            indicatorData.setValueF(!Arrays.asList(TRUE, FALSE).contains(value) ? Float.parseFloat(value) : 0);
            // send the alarm message
            if (0 != equipmentSpeIndex.getIsAlarm()) {
                kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(indicatorData));
                log.info("===========发送告警信息,key:{}", indicatorData.getEquipmentsIdx());
            }
            return Optional.of(indicatorData);
        }
        return Optional.empty();
    }

    private void processRecord(ConsumerRecords<String, String> records) {
        Map<String, List<IndicatorData>> data = StreamSupport.stream(records.spliterator(), true)
                .map(this::processSignal)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.groupingBy(IndicatorData::getGatewayId));
        data.forEach((gatewayId, list) -> {
            //1.update the latest values in ES with one bulk request per gateway
            List<EsEntity<Book>> batchList = new ArrayList<>(list.size());
            list.forEach(item -> batchList.add(new EsEntity<>(item.getEquipmentsIdx(),
                    new Book(item.getValue(), item.getValueF(), item.getValueLabel(),
                            item.getUnit(), new Date()))));
            elasticSearchUtil.updateBatch(ES_INDEX_NAME_JX, batchList);
            //2.save change-of-value rows to TDengine, one batch per gateway
            List<IndicatorData> tdDataList = list.stream().filter(t -> Objects.equals(t.getSignalType(), TRANSFORMATION)).collect(Collectors.toList());
            // an empty list would expand to an invalid INSERT statement in the mapper
            if (!tdDataList.isEmpty()) {
                indicatorDataMapper.insertBatch(tdDataList, gatewayId);
                tdDataList.forEach(s -> log.info("===========TDEngine入库成功,id:【{}】,value:【{}】修改成功", s.getEquipmentsIdx(), s.getValueF()));
            }
        });
    }

    private String valueTranslate(String value, String enumStr) {
        if (ObjectUtils.isEmpty(enumStr)) {
            return "";
        }
        try {
            JSONArray jsonArray = JSONArray.parseArray(enumStr);
            for (int i = 0; i < jsonArray.size(); i++) {
                JSONObject jsonObject = jsonArray.getJSONObject(i);
                if (jsonObject.get("key").equals(value)) {
                    return jsonObject.getString("label");
                }
            }
        } catch (Exception e) {
            log.error("告警枚举转换异常" + e.getMessage(), e);
        }
        return "";
    }

    public class KafkaConsumerThread implements Runnable {
        private final KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties props, String topic) {
            this.kafkaConsumer = new KafkaConsumer<>(props);
            this.kafkaConsumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    exec.submit(() -> processRecord(records));
                }
                // commit after dispatch; processing is asynchronous, so delivery is at most once
                kafkaConsumer.commitSync();
                // exec.shutdown() must not be called inside this loop: a shut-down pool
                // would reject every subsequent batch with RejectedExecutionException
            }
        }
    }
}
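The consumer thread above polls forever. If the service ever needs to stop it cleanly, the usual Kafka pattern is consumer.wakeup() from another thread plus a WakeupException handler around the poll loop. A minimal sketch, not part of this commit:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;

class ShutdownAwareLoop {
    static void runLoop(KafkaConsumer<String, String> consumer) {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // dispatch records to the worker pool here
            }
        } catch (WakeupException e) {
            // thrown by poll() after another thread calls consumer.wakeup()
        } finally {
            consumer.close(); // releases the network resources and the group membership
        }
    }
}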
package com.yeejoin.equip.kafka;
+import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
...@@ -63,7 +64,11 @@ public class KafkaProducerService {
                }
                @Override
                public void onSuccess(SendResult<String, String> stringStringSendResult) {
-                   //log.info("发送消息(异步) success! topic: {}, message: {}", topic, message);
+                   JSONObject jsonObject = JSONObject.parseObject(message);
+                   String address = jsonObject.getString("address");
+                   String gatewayId = jsonObject.getString("gatewayId");
+                   String value = jsonObject.getString("value");
+                   log.info("===========Kafka发送消息 success! address: {}, gatewayId: {},value:{}", address, gatewayId, value);
                }
            });
        }
......
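Only the onSuccess callback is visible in this hunk. For context, a sketch of what the surrounding sendMessageAsync plausibly looks like with Spring Kafka's ListenableFuture API; the onFailure body here is an assumption, not part of the commit:

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Slf4j
class KafkaProducerServiceSketch {
    private final KafkaTemplate<String, String> kafkaTemplate;

    KafkaProducerServiceSketch(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessageAsync(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("Kafka send failed, topic: {}", topic, ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                // the commit logs address/gatewayId/value parsed from the message here
            }
        });
    }
}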
//package com.yeejoin.equip.kafka;
//
//import org.apache.kafka.clients.consumer.ConsumerConfig;
//import org.apache.kafka.clients.consumer.ConsumerRecords;
//import org.apache.kafka.clients.consumer.KafkaConsumer;
//import org.apache.kafka.common.serialization.StringDeserializer;
//import javax.annotation.PostConstruct;
//import java.time.Duration;
//import java.util.Collections;
//import java.util.Properties;
//import java.util.concurrent.ExecutorService;
//import java.util.concurrent.Executors;
//
///**
// * @author LiuLin
// * @date 2023-10-11 09:31
// */
//public class WorkerConsumer {
// private static final ExecutorService executor = Executors.newFixedThreadPool(100);
// @PostConstruct
// void init() throws Exception {
// String topicName = "topic_t40";
// KafkaConsumer<String, String> consumer = getKafkaConsumer();
// consumer.subscribe(Collections.singletonList(topicName));
// try {
// while (true) {
// ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// if(!records.isEmpty()){
// executor.execute(new MessageHandler(records));
// }
// }
// }finally {
// consumer.close();
// }
// }
//
// private static KafkaConsumer<String, String> getKafkaConsumer() {
// Properties props = new Properties();
// props.put("bootstrap.servers", "localhost:9092");
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_w");
// props.put("client.id", "client_02");
// props.put("enable.auto.commit", true);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//
// return new KafkaConsumer<>(props);
// }
//
//
// static class MessageHandler implements Runnable{
//
// private final ConsumerRecords<String, String> records;
//
// public MessageHandler(ConsumerRecords<String, String> records) {
// this.records = records;
// }
//
// @Override
// public void run() {
// records.forEach(record -> {
// System.out.println(" 开始处理消息: " + record.value() + ", partition " + record.partition());
// });
// }
// }
//}
...@@ -16,6 +16,8 @@ public interface IndicatorDataMapper {
    int insert(IndicatorData indicatorData);

+   int insertBatch(@Param("list") List<IndicatorData> indicatorDataList, @Param("gatewayId") String gatewayId);

    void createDB();

    void createTable();
......
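The new insertBatch writes one multi-row statement into a per-gateway sub-table, so callers are expected to group rows by gatewayId first, as KafkaConsumerWorker does. A minimal sketch:

import com.yeejoin.equip.entity.IndicatorData;
import com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class BatchInsertSketch {
    static void insertByGateway(IndicatorDataMapper mapper, List<IndicatorData> rows) {
        // one insertBatch call per gateway, matching the mapper's per-gateway sub-tables;
        // groupingBy never produces empty groups, so each call has at least one row
        Map<String, List<IndicatorData>> byGateway =
                rows.stream().collect(Collectors.groupingBy(IndicatorData::getGatewayId));
        byGateway.forEach((gatewayId, list) -> mapper.insertBatch(list, gatewayId));
    }
}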
package com.yeejoin.equip.utils;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.yeejoin.equip.entity.Book;
+import com.yeejoin.equip.entity.EsEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
...@@ -21,9 +30,12 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.Function;

/**
...@@ -48,6 +60,7 @@ public class ElasticSearchUtil {
     * @return
     */
    public boolean updateData(String indexName, String id, String paramJson) {
+       log.info("更新ES数据,id:{}", id);
        UpdateRequest updateRequest = new UpdateRequest(indexName, id);
        // upsert: create the document if it does not exist in the index yet
        updateRequest.docAsUpsert(true);
...@@ -75,6 +88,90 @@
    }

    /**
+     * Single-document update
+     *
+     * @param indexName index name
+     * @param id        document id
+     * @param data      document content
+     * @return true if the document was updated
+     * @throws IOException e
+     */
+    public boolean updateData(String indexName, String id, Object data) throws IOException {
+        UpdateRequest updateRequest = new UpdateRequest(indexName, id);
+        // serialize the POJO into the partial document
+        String jsonString = JSONObject.toJSONString(data);
+        Map<String, Object> jsonMap = JSONObject.parseObject(jsonString);
+        updateRequest.doc(jsonMap);
+        updateRequest.timeout(TimeValue.timeValueSeconds(1));
+        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+        UpdateResponse update = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
+        // getResult() (not getGetResult()) carries the operation outcome
+        return update.getResult().equals(DocWriteResponse.Result.UPDATED);
+    }
+
+    /**
+     * Batch update: applies the same partial document to every id in idList
+     *
+     * @param indexName index name
+     * @param idList    document ids (required)
+     * @param map       partial document applied to each id
+     * @return true if the whole bulk request succeeded
+     */
+    public boolean update(String indexName, List<String> idList, Map<String, Object> map) {
+        // build one bulk request out of the individual updates
+        BulkRequest bulkRequest = new BulkRequest();
+        for (String id : idList) {
+            UpdateRequest updateRequest = new UpdateRequest(indexName, id).doc(map);
+            bulkRequest.add(updateRequest);
+        }
+        try {
+            bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+            BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
+            // hasFailures() is true when something failed, so success is its negation
+            return !bulk.hasFailures();
+        } catch (IOException e) {
+            log.error("索引:[{}]", indexName, e);
+            return false;
+        }
+    }
+
+    /**
+     * Description: batch update
+     *
+     * @param index index
+     * @param list  entities to update
+     * @author LiuLin
+     */
+    public <T> void updateBatch(String index, List<EsEntity<T>> list) {
+        BulkRequest request = new BulkRequest();
+        list.forEach(item -> request.add(new UpdateRequest(index, item.getId())
+                .doc(JSON.toJSONString(item.getData()), XContentType.JSON)));
+        try {
+            restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
+            list.forEach(s -> log.info("===========索引:【{}】,主键:【{}】修改成功", index, s.getId()));
+        } catch (Exception e) {
+            log.error("索引:[{}]", index, e);
+        }
+    }
+
+    /**
+     * Description: batch insert
+     *
+     * @param index index
+     * @param list  entities to insert
+     * @author LiuLin
+     */
+    public <T> void insertBatch(String index, List<EsEntity<T>> list) {
+        BulkRequest request = new BulkRequest();
+        list.forEach(item -> request.add(new IndexRequest(index).id(item.getId())
+                .source(JSON.toJSONString(item.getData()), XContentType.JSON)));
+        try {
+            restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }

    /**
     * Asynchronous ES update
     *
     * @param indexName index name
...@@ -86,26 +183,28 @@
        updateRequest.docAsUpsert(true);
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        updateRequest.doc(paramJson, XContentType.JSON);
        restHighLevelClient.updateAsync(updateRequest, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
            @Override
            public void onResponse(UpdateResponse updateResponse) {
                if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
                    log.info("索引:【{}】,主键:【{}】修改成功", indexName, id);
                }
            }

            @Override
            public void onFailure(Exception e) {
                log.error("索引:[{}],主键:【{}】", indexName, id, e);
            }
        });
    }

    /**
     * Build the SearchResponse
     *
     * @param indices index names
     * @param query   queryBuilder
     * @param fun     mapping function
     * @param <T>     return type
     * @return List, each hit converted to T via fun
     * @throws Exception e
     */
......
...@@ -14,12 +14,21 @@ spring.datasource.mysql-server.hikari.connection-timeout= 60000
spring.datasource.mysql-server.hikari.connection-test-query= SELECT 1
#TDengine
spring.datasource.tdengine-server.driver-class-name=com.taosdata.jdbc.rs.RestfulDriver
-spring.datasource.tdengine-server.jdbc-url = jdbc:TAOS-RS://139.9.170.47:6041/iot_data_test?user=root&password=taosdata&timezone=GMT%2b8&allowMultiQueries=true
+spring.datasource.tdengine-server.jdbc-url = jdbc:TAOS-RS://139.9.170.47:6041/iot_data?user=root&password=taosdata&timezone=GMT%2b8&allowMultiQueries=true
spring.datasource.tdengine-server.username=root
spring.datasource.tdengine-server.password=taosdata
+spring.datasource.tdengine-server.type=com.zaxxer.hikari.HikariDataSource
+spring.datasource.tdengine-server.hikari.minimum-idle= 30
+spring.datasource.tdengine-server.hikari.maximum-pool-size= 150
+spring.datasource.tdengine-server.hikari.auto-commit= true
+spring.datasource.tdengine-server.hikari.pool-name=TDEngineDruidCP
+spring.datasource.tdengine-server.hikari.idle-timeout= 500000
+spring.datasource.tdengine-server.hikari.max-lifetime= 1800000
+spring.datasource.tdengine-server.hikari.connection-timeout= 60000
+spring.datasource.tdengine-server.hikari.connection-test-query= show tables
spring.redis.database=0
-spring.redis.host=172.16.11.201
+spring.redis.host=139.9.173.44
spring.redis.port=6379
spring.redis.password=yeejoin@2020
spring.redis.timeout=3000
...@@ -37,9 +46,9 @@ eureka.instance.lease-renewal-interval-in-seconds=5
eureka.instance.metadata-map.management.context-path=${server.servlet.context-path}/actuator
eureka.instance.status-page-url-path=/actuator/info
eureka.instance.metadata-map.management.api-docs=http://localhost:${server.port}${server.servlet.context-path}/doc.html
-eureka.instance.hostname= 172.16.11.201
+eureka.instance.hostname= 139.9.173.44
eureka.instance.prefer-ip-address = true
-eureka.client.serviceUrl.defaultZone=http://${spring.security.user.name}:${spring.security.user.password}@172.16.11.201:10001/eureka/
+eureka.client.serviceUrl.defaultZone=http://${spring.security.user.name}:${spring.security.user.password}@139.9.173.44:10001/eureka/
spring.security.user.name=admin
spring.security.user.password=a1234560
...@@ -53,16 +62,6 @@ emqx.max-inflight=1000
emqx.keep-alive-interval=10
emqx.biz-topic[0]= iot/data/perspective
-# influxDB
-spring.influx.url=http://139.9.173.44:8086
-spring.influx.password=Yeejoin@2020
-spring.influx.user=root
-spring.influx.database=iot_platform_test
-spring.influx.retention_policy=default
-spring.influx.retention_policy_time=30d
-spring.influx.actions=10000
-spring.influx.bufferLimit=20000
#kafka
spring.kafka.bootstrap-servers=139.9.173.44:9092
spring.kafka.producer.retries=1
...@@ -74,7 +73,7 @@ spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=messageConsumerGroup
spring.kafka.consumer.bootstrap-servers=139.9.173.44:9092
-spring.kafka.consumer.enable-auto-commit=false
+spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
...@@ -90,8 +89,8 @@ elasticsearch.address= 139.9.173.44:9200
elasticsearch.username= elastic
elasticsearch.password= Yeejoin@2020
elasticsearch.scheme= http
-elasticsearch.connectTimeout= 5000
-elasticsearch.socketTimeout= 5000
-elasticsearch.connectionRequestTimeout= 5000
+elasticsearch.connectTimeout= 50000
+elasticsearch.socketTimeout= 50000
+elasticsearch.connectionRequestTimeout= 50000
elasticsearch.maxConnectNum= 1000
elasticsearch.maxConnectPerRoute= 1000
\ No newline at end of file
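KafkaConsumerConfig.consumerConfigs() is referenced by KafkaConsumerWorker but not included in this commit; a plausible minimal version, assuming it mirrors the spring.kafka.consumer.* values above:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;

class KafkaConsumerConfigSketch {
    Properties consumerConfigs() {
        Properties props = new Properties();
        // values taken from the application.properties shown above
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "139.9.173.44:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "messageConsumerGroup");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }
}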
...@@ -3,28 +3,58 @@
<mapper namespace="com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper">

    <!-- create the database, specifying the compression ratio -->
    <update id="createDB">
        create database if not exists iot_data vgroups 10 buffer 10 COMP 2 PRECISION 'ns';
    </update>

    <!-- create the super table -->
    <update id="createTable">
-       CREATE STABLE if not exists indicator
+       create STABLE if not exists s_indicator_his
        (created_time timestamp,
-       `value` VARCHAR(12),
-       `value_f` float,
-       value_label VARCHAR(24),
-       unit NCHAR(12))
-       TAGS (address binary(64),
-       gateway_id binary(64),
+       address binary(64),
        equipments_idx NCHAR(64),
        data_type NCHAR(12),
        is_alarm BIGINT,
        equipment_index_name VARCHAR(200),
-       equipment_specific_name VARCHAR(200));
+       equipment_specific_name VARCHAR(200),
+       `value` VARCHAR(12),
+       `value_f` float,
+       value_label VARCHAR(24),
+       unit NCHAR(12))
+       TAGS (gateway_id binary(64));
    </update>

+   <insert id="insertBatch" parameterType="java.util.List">
+       insert into
+       <foreach separator=" " collection="list" item="item" index="index">
+           indicator_his_#{gatewayId,jdbcType=VARCHAR} USING s_indicator_his
+           TAGS (#{item.gatewayId,jdbcType=VARCHAR})
+           <!-- NOW + #{index}a staggers rows by one millisecond; the value order must match s_indicator_his -->
+           VALUES (NOW + #{index}a,
+           #{item.address,jdbcType=VARCHAR},
+           #{item.equipmentsIdx,jdbcType=VARCHAR},
+           #{item.dataType,jdbcType=VARCHAR},
+           #{item.isAlarm,jdbcType=VARCHAR},
+           #{item.equipmentIndexName,jdbcType=VARCHAR},
+           #{item.equipmentSpecificName,jdbcType=VARCHAR},
+           #{item.value,jdbcType=VARCHAR},
+           #{item.valueF,jdbcType=FLOAT},
+           #{item.valueLabel,jdbcType=VARCHAR},
+           #{item.unit,jdbcType=VARCHAR})
+       </foreach>
+   </insert>

+   <!-- <insert id="insertBatch" parameterType="java.util.List">-->
+   <!-- INSERT INTO indicator_#{gatewayId,jdbcType=VARCHAR} (created_time, `value`,`value_f`, value_label,unit,-->
+   <!-- address,gateway_id,equipments_idx,data_type,is_alarm,equipment_index_name,equipment_specific_name)-->
+   <!-- VALUES-->
+   <!-- <foreach collection="list" item="item" separator="UNION ALL" index="index">-->
+   <!-- SELECT NOW + #{index}a, #{item.value}, #{item.valueF}, #{item.valueLabel}, #{item.unit},-->
+   <!-- #{item.address}, #{item.gatewayId}, #{item.equipmentsIdx}, #{item.dataType}, #{item.isAlarm},-->
+   <!-- #{item.equipmentSpecificName},#{item.equipmentIndexName}-->
+   <!-- </foreach>-->
+   <!-- </insert>-->

    <insert id="insert" parameterType="com.yeejoin.equip.entity.IndicatorData">
        insert into indicator_#{gatewayId,jdbcType=VARCHAR} USING indicator
        TAGS (#{address,jdbcType=VARCHAR},
        #{gatewayId,jdbcType=VARCHAR},
...@@ -33,10 +63,6 @@
        #{isAlarm,jdbcType=VARCHAR},
        #{equipmentSpecificName,jdbcType=VARCHAR},
        #{equipmentIndexName,jdbcType=VARCHAR})
-       VALUES (NOW,
-       #{value,jdbcType=VARCHAR},
-       #{valueF,jdbcType=FLOAT},
-       #{valueLabel,jdbcType=VARCHAR},
-       #{unit,jdbcType=VARCHAR})
+       VALUES (NOW, #{value,jdbcType=VARCHAR}, #{valueF,jdbcType=FLOAT}, #{valueLabel,jdbcType=VARCHAR}, #{unit,jdbcType=VARCHAR})
    </insert>
</mapper>
\ No newline at end of file
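With two rows for a hypothetical gateway gw1, the foreach above expands to a single multi-sub-table TDengine statement of this shape (all values invented, name columns abbreviated to '...'):

insert into
  indicator_his_gw1 USING s_indicator_his TAGS ('gw1')
    VALUES (NOW + 0a, '4071', '4071_gw1', 'analog', 0, '...', '...', '66.0', 66.0, '66.0', 'kW')
  indicator_his_gw1 USING s_indicator_his TAGS ('gw1')
    VALUES (NOW + 1a, '4072', '4072_gw1', 'analog', 0, '...', '...', '12.0', 12.0, '12.0', 'A')

The NOW + #{index}a offset shifts each row by index milliseconds ('a' is TDengine's millisecond unit), so rows written in the same batch do not collide on the timestamp primary key.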
...@@ -311,12 +311,12 @@
        <repository>
            <id>Releases</id>
            <name>Releases</name>
-           <url>http://36.46.149.14:8081/nexus/content/repositories/releases/</url>
+           <url>http://113.142.68.105:8081/nexus/content/repositories/releases/</url>
        </repository>
        <repository>
            <id>Snapshots</id>
            <name>Snapshots</name>
-           <url>http://36.46.149.14:8081/nexus/content/repositories/snapshots/</url>
+           <url>http://113.142.68.105:8081/nexus/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>com.e-iceblue</id>
...@@ -326,7 +326,7 @@
        <repository>
            <id>thirdparty</id>
            <name>thirdparty</name>
-           <url>http://36.46.149.14:8081/nexus/content/repositories/thirdparty/</url>
+           <url>http://113.142.68.105:8081/nexus/content/repositories/thirdparty/</url>
        </repository>
    </repositories>
......