Commit 95fe4813 authored by 刘林's avatar 刘林

fix(equip):添加influxdb入库更新最新指标,调整influxdb tags字段

parent 10fb8b44
......@@ -17,12 +17,12 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author LiuLin
......@@ -47,7 +47,12 @@ public class KafkaConsumerService {
@Value("${kafka.alarm.topic}")
private String alarmTopic;
private static final String MEASUREMENT= "iot_data";
//iot转发实时消息存入influxdb前缀
private static final String MEASUREMENT = "iot_data_";
//装备更新最新消息存入influxdb前缀
private static final String INDICATORS = "indicators_";
//装备更新最新消息存入influxdb固定时间
private static final Long TIME = 1688558007051L;
@KafkaListener(topics = "#{'${kafka.topic}'.split(',')}", groupId = "messageConsumerGroup")
public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
......@@ -86,6 +91,7 @@ public class KafkaConsumerService {
try {
if (equipmentIndexVOMap.get(key) != null) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
log.info("接收到iot消息: 指标名称:{},地址:{},值:{},网关{}",
equipmentSpeIndex.getEquipmentIndexName(),indexAddress,value,gatewayId);
......@@ -93,24 +99,25 @@ public class KafkaConsumerService {
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
tagsMap.put("equipmentsIdx", key);
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
fieldsMap.put("traceId", traceId);
tagsMap.put("address", indexAddress);
fieldsMap.put("value", value);
fieldsMap.put("valueLabel", valueLabel.equals("") ? value : valueLabel);
tagsMap.put("gatewayId", gatewayId);
tagsMap.put("dataType", dataType);
tagsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
tagsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
fieldsMap.put("traceId", traceId);
fieldsMap.put("value", value);
fieldsMap.put("valueLabel", valueLabel.equals("") ? value : valueLabel);
fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
influxDbConnection.insert(MEASUREMENT, tagsMap, fieldsMap);
influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
influxDbConnection.insert(INDICATORS + gatewayId, tagsMap, fieldsMap, TIME, TimeUnit.MILLISECONDS);
if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
fieldsMap.putAll(tagsMap);
kafkaProducerService.sendMessageAsync(alarmTopic,JSON.toJSONString(fieldsMap));
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment