Commit e49200d3 authored by 刘林

fix(equip): forward alarm messages to Kafka

parent fc6ad856
@@ -66,6 +66,9 @@ public class EquipmentIndexVO {
     @ApiModelProperty(value = "是否支持趋势查看")
     private Integer isTrend;
+    @ApiModelProperty(value = "是否告警")
+    private Integer isAlarm;
     @ApiModelProperty(value = "指标枚举")
     private String valueEnum;
......
@@ -10,6 +10,7 @@ import com.yeejoin.equipmanage.common.entity.vo.EquipmentIndexVO;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -35,11 +36,19 @@ public class KafkaConsumerService {
     @Autowired
     private InfluxDbConnection influxDbConnection;
+    @Autowired
+    protected KafkaProducerService kafkaProducerService;
     private Executor dataExecutor = new ThreadPoolTaskExecutor();
     @Autowired
     private RedisUtils redisUtils;
+    @Value("${kafka.alarm.topic}")
+    private String alarmTopic;
+    private static final String MEASUREMENT = "iot_data";
     @KafkaListener(topics = "#{'${kafka.topic}'.split(',')}", groupId = "messageConsumerGroup")
     public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
         try {
@@ -92,17 +101,18 @@ public class KafkaConsumerService {
                 fieldsMap.put("valueLabel", valueLabel.equals("") ? value : valueLabel);
                 fieldsMap.put("gatewayId", gatewayId);
                 fieldsMap.put("dataType", dataType);
-                fieldsMap.put("equipmentId", equipmentSpeIndex.getEquipmentId());
+                fieldsMap.put("isAlarm", equipmentSpeIndex.getIsAlarm());
                 fieldsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
                 fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
                 fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
                 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                 fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
                 fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
-                //保存influxDB库
-                influxDbConnection.insert("iot_data", tagsMap, fieldsMap);
-                log.info("influxdb入库时间:{}",simpleDateFormat.format(new Date()));
+                influxDbConnection.insert(MEASUREMENT, tagsMap, fieldsMap);
+                if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
+                    kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
+                }
             }
         } catch (Exception e) {
             log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
......
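Note that KafkaProducerService itself is not part of this diff; only its sendMessageAsync(topic, message) call site is visible above. Below is a minimal sketch of what such a bean could look like, assuming it simply wraps Spring Kafka's KafkaTemplate (Spring Kafka 2.x, where send() returns a ListenableFuture). The class and method names come from the call site; everything else is an assumption, not the project's actual implementation.

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical sketch only; the real KafkaProducerService is not shown in this commit.
@Slf4j
@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Fire-and-forget send: the consumer thread does not block waiting for the broker ack.
    public void sendMessageAsync(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(
                result -> log.debug("alarm message sent to topic {}", topic),
                ex -> log.error("failed to send alarm message to topic " + topic, ex));
    }
}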
@@ -29,6 +29,9 @@ public class KafkaConsumerConfig {
     @Value("${spring.kafka.consumer.group-id}")
     private String kafkaGroupId;
+    @Value("${spring.kafka.consumer.enable-auto-commit}")
+    private boolean enableAutoCommit;
     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(consumerFactory());
@@ -49,7 +52,7 @@ public class KafkaConsumerConfig {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
         // 自动提交
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
         //两次Poll之间的最大允许间隔。
         //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
......
@@ -170,4 +170,5 @@ spring.kafka.listener.ack-mode=manual_immediate
 spring.kafka.listener.type=batch
 kafka.topic=PERSPECTIVE
 emq.topic=iot/data/perspective
+kafka.alarm.topic=EQUIPMENT_ALARM
 iot.async.flag = false
\ No newline at end of file
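Because enable-auto-commit is now taken from configuration and the listener container runs with spring.kafka.listener.ack-mode=manual_immediate and type=batch, offsets are committed only when the listener acknowledges explicitly. The following is a minimal sketch of that pattern, reusing the listen(...) signature shown above; where exactly the real code calls ack.acknowledge() is not visible in this diff, so the placement here is an assumption.

    @KafkaListener(topics = "#{'${kafka.topic}'.split(',')}", groupId = "messageConsumerGroup")
    public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
        try {
            for (ConsumerRecord<String, String> record : consumerRecords) {
                // parse the payload, write it to InfluxDB, forward alarm records, ...
            }
        } finally {
            // With manual_immediate ack mode, the batch offset is committed only by this call.
            ack.acknowledge();
        }
    }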
@@ -332,7 +332,8 @@
     si.gateway_id,
     si.data_type,
     si.equipment_specific_name,
-    si.equipment_index_name
+    si.equipment_index_name,
+    si.is_alarm
 FROM
     wl_equipment_specific_index si
     LEFT JOIN wl_equipment_index ei ON si.equipment_index_id = ei.id
......