Commit ca374051 authored by 刘林's avatar 刘林

fix(equip):江西电建添加ES依赖以及更新指标数据

parent b9678e18
......@@ -89,6 +89,12 @@
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
......
package com.yeejoin.equip.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
@Configuration
@EnableAsync
public class EquipExecutorConfig {
@Bean(name = "equipAsyncExecutor")
public Executor asyncServiceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(10);
//配置最大线程数
executor.setMaxPoolSize(500);
//配置队列大小
executor.setQueueCapacity(2000);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("namePrefix");
//线程池维护线程所允许的空闲时间
executor.setKeepAliveSeconds(30);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行--拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
//等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
......@@ -5,9 +5,9 @@ import com.yeejoin.equip.mapper.EquipmentSpecificIndexMapper;
import com.yeejoin.equip.utils.RedisKey;
import com.yeejoin.equip.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
......@@ -22,20 +22,25 @@ import java.util.stream.Collectors;
@Slf4j
public class EquipmentIndexCacheRunner implements CommandLineRunner {
@Autowired
@Resource
private EquipmentSpecificIndexMapper equipmentSpecificIndexMapper;
@Autowired
@Resource
private RedisUtils redisUtils;
@Override
public void run(String... args) throws Exception {
log.info(">>服务启动执行,执行预加载数据等操作");
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS);
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS_KEY);
List<EquipmentIndexVO> equipSpecificIndexList = equipmentSpecificIndexMapper.getEquipSpecificIndexList(null);
Map<String, Object> equipmentIndexVOMap = equipSpecificIndexList.stream()
.filter(v -> v.getGatewayId() != null && v.getIndexAddress() !=null)
.filter(v -> v.getGatewayId() != null)
.collect(Collectors.toMap(vo -> vo.getIndexAddress() + "_" + vo.getGatewayId(), Function.identity(),(v1, v2) -> v1));
Map<String, Object> equipmentIndexKeyMap = equipSpecificIndexList.stream()
.filter(v -> v.getIndexAddress() != null && v.getGatewayId() == null)
.collect(Collectors.toMap(EquipmentIndexVO::getIndexAddress, Function.identity(),(v1, v2) -> v1));
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS, equipmentIndexVOMap);
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS_KEY, equipmentIndexKeyMap);
}
}
}
\ No newline at end of file
......@@ -3,16 +3,10 @@ package com.yeejoin.equip.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* @author LiuLin
......@@ -32,27 +26,13 @@ public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 设置并发量,小于或者等于 Topic 的分区数
factory.setConcurrency(5);
// 设置为批量监听
factory.setBatchListener(Boolean.TRUE);
factory.getContainerProperties().setPollTimeout(30000);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
@Bean("consumerConfig")
public Properties consumerConfigs() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
// 自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//两次Poll之间的最大允许间隔。
//消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
......@@ -70,5 +50,4 @@ public class KafkaConsumerConfig {
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
return props;
}
}
......@@ -3,12 +3,12 @@ package com.yeejoin.equip.entity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
@ApiModel(value = "性能指标详情返回vo实体", description = "性能指标详情返回vo实体")
public class EquipmentIndexVO {
public class EquipmentIndexVO implements Serializable {
@ApiModelProperty(value = "id")
private Long id;
......@@ -84,6 +84,7 @@ public class EquipmentIndexVO {
@ApiModelProperty(value = "装备名称")
private String equipmentSpecificName;
@ApiModelProperty(value = "指标名称")
@ApiModelProperty(value = "装备指标名称")
private String equipmentIndexName;
}
package com.yeejoin.equip.kafka;
import com.yeejoin.equip.service.KafkaMessageService;
import com.yeejoin.equip.utils.RedisKey;
import com.yeejoin.equip.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.*;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote kafka 消费服务类
*/
@Slf4j
@Service
public class KafkaConsumerService {
@Autowired
protected KafkaProducerService kafkaProducerService;
@Autowired
private KafkaMessageService kafkaMessageService;
@Autowired
private RedisUtils redisUtils;
@KafkaListener(topics = "#{'${kafka.topic}'.split(',')}", groupId = "messageConsumerGroup")
public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
try {
if (CollectionUtils.isEmpty(consumerRecords)) {
return;
}
Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
String message = (String) kafkaMessage.get();
kafkaMessageService.handlerMessage(message,equipmentIndexVOMap);
}
}
} catch (Exception e) {
log.error("kafka失败,当前失败的批次。data:{}", consumerRecords);
e.printStackTrace();
} finally {
ack.acknowledge();
}
}
}
\ No newline at end of file
//package com.yeejoin.equip.kafka;
//
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONArray;
//import com.alibaba.fastjson.JSONObject;
//import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
//import com.yeejoin.equip.entity.EquipmentIndexVO;
//import com.yeejoin.equip.utils.ElasticSearchUtil;
//import com.yeejoin.equip.utils.RedisKey;
//import com.yeejoin.equip.utils.RedisUtils;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.kafka.clients.consumer.ConsumerRecord;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.kafka.annotation.KafkaListener;
//import org.springframework.kafka.support.Acknowledgment;
//import org.springframework.stereotype.Service;
//import org.springframework.util.CollectionUtils;
//import org.springframework.util.ObjectUtils;
//
//import java.text.SimpleDateFormat;
//import java.util.*;
//
///**
// * @author LiuLin
// * @date 2023/6/25
// * @apiNote kafka 消费服务类
// */
//@Slf4j
//@Service
//public class KafkaConsumerService {
//
// //iot转发实时消息存入influxdb前缀
// private static final String MEASUREMENT = "iot_data_";
// //装备更新最新消息存入influxdb前缀
// private static final String TRUE = "true";
// private static final String FALSE = "false";
// //装备更新最新消息存入influxdb固定时间
// private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
// @Autowired
// protected KafkaProducerService kafkaProducerService;
// @Autowired
// private InfluxDbConnection influxDbConnection;
// @Autowired
// private RedisUtils redisUtils;
// @Value("${kafka.alarm.topic}")
// private String alarmTopic;
// @Autowired
// private ElasticSearchUtil elasticSearchUtil;
//
// @KafkaListener(topics = "#{'${kafka.topic}'.split(',')}", groupId = "messageConsumerGroup")
// public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
// try {
// if (CollectionUtils.isEmpty(consumerRecords)) {
// return;
// }
// Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
//
// for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
// if (kafkaMessage.isPresent()) {
// String message = (String) kafkaMessage.get();
// this.handleMessage(message, equipmentIndexVOMap);
// }
// }
// } catch (Exception e) {
// log.error("kafka失败,当前失败的批次。data:{}", consumerRecords);
// e.printStackTrace();
// } finally {
// ack.acknowledge();
// }
// }
//
// private void handleMessage(String message, Map<Object, Object> equipmentIndexVOMap) {
//
// JSONObject jsonObject = JSONObject.parseObject(message);
// String dataType = jsonObject.getString("dataType");
// String indexAddress = jsonObject.getString("address");
// String traceId = jsonObject.getString("traceId");
// String gatewayId = jsonObject.getString("gatewayId");
// String value = jsonObject.getString("value");
// String key = indexAddress + "_" + gatewayId;
//
// try {
// if (equipmentIndexVOMap.get(key) != null) {
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
// log.info("接收到iot消息: 指标名称:{},地址:{},值:{},网关{}",
// equipmentSpeIndex.getEquipmentIndexName(), indexAddress, value, gatewayId);
//
// Map<String, String> tagsMap = new HashMap<>();
// Map<String, Object> fieldsMap = new HashMap<>();
// tagsMap.put("equipmentsIdx", key);
// tagsMap.put("address", indexAddress);
// tagsMap.put("gatewayId", gatewayId);
// tagsMap.put("dataType", dataType);
// tagsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
// tagsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
// tagsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
// tagsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
//
// String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
// fieldsMap.put("traceId", traceId);
// fieldsMap.put("value", value);
// fieldsMap.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
// fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
// fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
// influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
//
// //更新数据入ES库
// Map<String, Object> paramJson = new HashMap<>();
// if (Arrays.asList(TRUE, FALSE).contains(value)) {
// paramJson.put("value", value);
// } else {
// paramJson.put("value", Float.parseFloat(value));
// }
// paramJson.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
// paramJson.put("createdTime", simpleDateFormat.format(new Date()));
// paramJson.put("unit", equipmentSpeIndex.getUnitName());
// elasticSearchUtil.updateData(ES_INDEX_NAME_JX, key, JSON.toJSONString(paramJson));
//
// if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
// fieldsMap.putAll(tagsMap);
// //kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
// }
// }
// } catch (Exception e) {
// log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
// }
//
// }
//
// private String valueTranslate(String value, String enumStr) {
// if (ObjectUtils.isEmpty(enumStr)) {
// return "";
// }
// try {
// JSONArray jsonArray = JSONArray.parseArray(enumStr);
// for (int i = 0; i < jsonArray.size(); i++) {
// JSONObject jsonObject = jsonArray.getJSONObject(i);
// if (jsonObject.get("key").equals(value)) {
// return jsonObject.getString("label");
// }
// }
// } catch (Exception e) {
// e.printStackTrace();
// }
// return "";
// }
//}
//
//
package com.yeejoin.equip.kafka;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
import com.yeejoin.equip.config.KafkaConsumerConfig;
import com.yeejoin.equip.entity.EquipmentIndexVO;
import com.yeejoin.equip.utils.ElasticSearchUtil;
import com.yeejoin.equip.utils.RedisKey;
import com.yeejoin.equip.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author LiuLin
* @date 2023年08月01日 17:27
*/
@Slf4j
@Component
public class KafkaConsumerWithThread implements CommandLineRunner {
final private static AtomicLong sendThreadPoolCounter = new AtomicLong(0);
final public static ExecutorService pooledExecutor =
Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors(),
createThreadFactory());
//iot转发实时消息存入influxdb前缀
private static final String MEASUREMENT = "iot_data_";
private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
//装备更新最新消息存入influxdb前缀
private static final String TRUE = "true";
private static final String FALSE = "false";
//装备更新最新消息存入influxdb固定时间
private static final Long TIME = 1688558007051L;
@Autowired
protected KafkaProducerService kafkaProducerService;
@Autowired
private KafkaConsumerConfig consumerConfig;
@Autowired
private InfluxDbConnection influxDbConnection;
@Autowired
private RedisUtils redisUtils;
@Value("${kafka.alarm.topic}")
private String alarmTopic;
@Value("${kafka.topic}")
private String topic;
@Autowired
private ElasticSearchUtil elasticSearchUtil;
private static ThreadFactory createThreadFactory() {
return runnable -> {
Thread thread = new Thread(runnable);
thread.setName(String.format("clojure-agent-send-pool-%d", KafkaConsumerWithThread.sendThreadPoolCounter.getAndIncrement()));
return thread;
};
}
@Override
public void run(String... args) {
Thread thread = new Thread(new KafkaConsumerThread(consumerConfig.consumerConfigs(), topic));
thread.start();
}
private void processRecord(ConsumerRecord<String, String> record, Map<Object, Object> equipmentIndexVOMap) {
// 处理消息记录
//log.info("监听Kafka集群message:{}",record.value());
JSONObject jsonObject = JSONObject.parseObject(record.value());
String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId");
String gatewayId = jsonObject.getString("gatewayId");
String value = jsonObject.getString("value");
String key = indexAddress + "_" + gatewayId;
try {
if (equipmentIndexVOMap.get(key) != null) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
tagsMap.put("equipmentsIdx", key);
tagsMap.put("address", indexAddress);
tagsMap.put("gatewayId", gatewayId);
tagsMap.put("dataType", dataType);
fieldsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
fieldsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
fieldsMap.put("traceId", traceId);
fieldsMap.put("value", value);
fieldsMap.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
//更新数据入ES库
Map<String, Object> paramJson = new HashMap<>();
if (Arrays.asList(TRUE, FALSE).contains(value)) {
paramJson.put("value", value);
}else{
paramJson.put("value", Float.parseFloat(value));
}
paramJson.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
paramJson.put("createdTime", simpleDateFormat.format(new Date()));
elasticSearchUtil.updateData(ES_INDEX_NAME_JX,key,JSON.toJSONString(paramJson));
//influxDbConnection.insert(INDICATORS + gatewayId, tagsMap, fieldsMap, TIME, TimeUnit.MILLISECONDS);
if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
fieldsMap.putAll(tagsMap);
kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
}
}
} catch (Exception e) {
log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
}
}
private String valueTranslate(String value, String enumStr) {
if (ObjectUtils.isEmpty(enumStr)) {
return "";
}
try {
JSONArray jsonArray = JSONArray.parseArray(enumStr);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
if (jsonObject.get("key").equals(value)) {
return jsonObject.getString("label");
}
}
} catch (Exception e) {
log.error("告警枚举转换异常" + e.getMessage(), e);
}
return "";
}
public class KafkaConsumerThread implements Runnable {
private final KafkaConsumer<String, String> kafkaConsumer;
Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
public KafkaConsumerThread(Properties props, String topic) {
this.kafkaConsumer = new KafkaConsumer<>(props);
this.kafkaConsumer.subscribe(Collections.singletonList(topic));
}
@Override
public void run() {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
pooledExecutor.submit(() -> {
processRecord(record, equipmentIndexVOMap);
});
}
}
}
//@Override
//public void run() {
// while (true) {
// ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
// for (TopicPartition topicPartition : records.partitions()) {
// List<ConsumerRecord<String, String>> recordList = new ArrayList<>(records.records(topicPartition));
// Iterator<ConsumerRecord<String, String>> it = recordList.iterator();
// while (it.hasNext()) {
// ConsumerRecord<String, String> record = it.next();
// long startTime = System.currentTimeMillis();
// long lastOffset = recordList.get(recordList.size() - 1).offset();
// try {
// kafkaConsumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(lastOffset + 1)));
// } catch (Exception e) {
// log.error("kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},commit time:{},value{},error:{}", topic, Thread.currentThread().getName(), (System.currentTimeMillis() - startTime), record.value(), e);
// break;
// }
// pooledExecutor.submit(() -> {
// processRecord(record, equipmentIndexVOMap);
// });
// it.remove();
// }
// }
// }
//}
}
}
......@@ -6,7 +6,6 @@ import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
......@@ -52,7 +51,7 @@ public class KafkaProducerService {
/**
* 发送消息(异步)
* @param topic 主题
* @param topic 主题
* @param message 消息内容
*/
public void sendMessageAsync(String topic, String message) {
......
package com.yeejoin.equip.utils;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author LiuLin
* @date 2023年08月08日 16:30
*/
@Slf4j
@Component
public class ElasticSearchUtil {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* ES修改数据
*
* @param indexName 索引名称
* @param id 主键
* @param paramJson 参数JSON
* @return
*/
public boolean updateData(String indexName, String id, String paramJson) {
UpdateRequest updateRequest = new UpdateRequest(indexName, id);
//如果修改索引中不存在则进行新增
updateRequest.docAsUpsert(true);
//立即刷新数据
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(paramJson, XContentType.JSON);
try {
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
log.info("索引[{}],主键:【{}】操作结果:[{}]", indexName, id, updateResponse.getResult());
if (DocWriteResponse.Result.CREATED.equals(updateResponse.getResult())) {
//新增
log.info("索引:【{}】,主键:【{}】新增成功", indexName, id);
return true;
} else if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
//修改
log.info("索引:【{}】,主键:【{}】修改成功", indexName, id);
return true;
} else if (DocWriteResponse.Result.NOOP.equals(updateResponse.getResult())) {
//无变化
log.info("索引:[{}],主键:[{}]无变化", indexName, id);
return true;
}
} catch (IOException e) {
log.error("索引:[{}],主键:【{}】,更新异常:[{}]", indexName, id, e);
return false;
}
return false;
}
}
......@@ -12,4 +12,6 @@ public class RedisKey {
* 装备指标Key值
*/
public static final String EQUIP_INDEX_ADDRESS = "equip_index_address";
public static final String EQUIP_INDEX_ADDRESS_KEY = "equip_index_address_key";
}
......@@ -78,4 +78,11 @@ spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.type=batch
kafka.topic=PERSPECTIVE
emq.topic=iot/data/perspective
kafka.alarm.topic=EQUIPMENT_ALARM
\ No newline at end of file
kafka.alarm.topic=EQUIPMENT_ALARM
spring.elasticsearch.rest.uris=http://39.98.224.23:9200
spring.elasticsearch.rest.connection-timeout=30000
spring.elasticsearch.rest.username=elastic
spring.elasticsearch.rest.password=123456
spring.elasticsearch.rest.read-timeout=30000
management.health.elasticsearch.enabled=false
\ No newline at end of file
......@@ -304,6 +304,11 @@
</repository> <repository> <id>Snapshots</id> <name>Snapshots</name> <url>http://4v059425e3.zicp.vip:13535/nexus/content/repositories/snapshots/</url>
</repository> -->
<repository>
<id>public</id>
<name>public</name>
<url>http://113.142.68.105:8081/nexus/content/repositories/public/</url>
</repository>
<repository>
<id>Releases</id>
<name>Releases</name>
<url>http://36.46.149.14:8081/nexus/content/repositories/releases/</url>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment