Commit 52113eb5 authored by 刘林's avatar 刘林

fix(equip):调整电建对接iot消息代码结构

parent 08ecc4a5
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>amos-boot-data</artifactId>
<groupId>com.amosframework.boot</groupId>
<version>1.0.0</version>
</parent>
<artifactId>amos-boot-data-equip</artifactId>
<name>amos-boot-data-equip</name>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-component-emq</artifactId>
<version>1.1.20</version>
</dependency>
<dependency>
<groupId>com.yeejoin</groupId>
<artifactId>amos-component-influxdb</artifactId>
<version>1.8.5-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-core-restful</artifactId>
<version>${tyboot-version}</version>
<exclusions>
<exclusion>
<groupId>org.typroject</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-core-auth</artifactId>
<version>${tyboot-version}</version>
<exclusions>
<exclusion>
<groupId>org.typroject</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-core-rdbms</artifactId>
<version>${tyboot-version}</version>
<exclusions>
<exclusion>
<groupId>org.typroject</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.yeejoin.equip;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.typroject.tyboot.core.restful.exception.GlobalExceptionHandler;
import java.net.InetAddress;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote 启动类
*/
@SpringBootApplication
@EnableTransactionManagement
@EnableConfigurationProperties
@ServletComponentScan
@EnableDiscoveryClient
@EnableFeignClients
@EnableAsync
@EnableEurekaClient
@EnableScheduling
@MapperScan(value = { "org.typroject.tyboot.*.*.face.orm.dao", "com.yeejoin.amos.api.*.face.orm.dao", "org.typroject.tyboot.face.*.orm.dao*",
"com.yeejoin.equip.mapper","com.yeejoin.amos.boot.biz.common.dao.mapper" })
@ComponentScan({ "org.typroject", "com.yeejoin.equip","com.yeejoin.amos" })
public class EquipDataApplication {
private static final Logger logger = LogManager.getLogger(EquipDataApplication.class);
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(EquipDataApplication.class, args);
GlobalExceptionHandler.setAlwaysOk(true);
Environment env = context.getEnvironment();
String ip = InetAddress.getLocalHost().getHostAddress();
String port = env.getProperty("server.port");
String path = env.getProperty("server.servlet.context-path");
logger.info("\n----------------------------------------------------------\n\t"
+ "Application Amos-Biz-Boot is running! Access URLs:\n\t" + "Swagger文档: \thttp://" + ip + ":" + port
+ path + "/doc.html\n" + "----------------------------------------------------------");
}
}
package com.yeejoin.equip.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
@Configuration
@EnableAsync
public class EquipExecutorConfig {
@Bean(name = "equipAsyncExecutor")
public Executor asyncServiceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(10);
//配置最大线程数
executor.setMaxPoolSize(500);
//配置队列大小
executor.setQueueCapacity(2000);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("namePrefix");
//线程池维护线程所允许的空闲时间
executor.setKeepAliveSeconds(30);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行--拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
//等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
package com.yeejoin.equip.config;
import com.yeejoin.equip.entity.EquipmentIndexVO;
import com.yeejoin.equip.mapper.EquipmentSpecificIndexMapper;
import com.yeejoin.equip.utils.RedisKey;
import com.yeejoin.equip.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author LiuLin
* @date 2023/6/15
* @apiNote
*/
@Component
@Slf4j
public class EquipmentIndexCacheRunner implements CommandLineRunner {
@Autowired
private EquipmentSpecificIndexMapper equipmentSpecificIndexMapper;
@Autowired
private RedisUtils redisUtils;
@Override
public void run(String... args) throws Exception {
log.info(">>服务启动执行,执行预加载数据等操作");
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS);
List<EquipmentIndexVO> equipSpecificIndexList = equipmentSpecificIndexMapper.getEquipSpecificIndexList(null);
Map<String, Object> equipmentIndexVOMap = equipSpecificIndexList.stream()
.filter(v -> v.getGatewayId() != null && v.getIndexAddress() !=null)
.collect(Collectors.toMap(vo -> vo.getIndexAddress() + "_" + vo.getGatewayId(), Function.identity(),(v1, v2) -> v1));
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS, equipmentIndexVOMap);
}
}
package com.yeejoin.equip.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote kafka 消费者配置类
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String kafkaBootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String kafkaGroupId;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 设置并发量,小于或者等于 Topic 的分区数
factory.setConcurrency(5);
// 设置为批量监听
factory.setBatchListener(Boolean.TRUE);
factory.getContainerProperties().setPollTimeout(30000);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
// 自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
//两次Poll之间的最大允许间隔。
//消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//设置单次拉取的量,走公网访问时,该参数会有较大影响。
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
//每次Poll的最大数量。
//注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//消息的反序列化方式。
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//当前消费实例所属的消费组,请在控制台申请之后填写。
//属于同一个组的消费实例,会负载消费消息。
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
return props;
}
}
package com.yeejoin.equip.entity;
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.Date;
/**
* @description: 公共实体
* @author: duanwei
**/
@Data
@Accessors(chain = true)
public class BaseEntity implements Serializable {
private static final long serialVersionUID = -5464322936854328207L;
@TableId(type = IdType.ID_WORKER)
@JsonSerialize(using = ToStringSerializer.class)
private Long id;
/**
* 新增和更新执行
*/
@TableField(value = "create_date", fill = FieldFill.INSERT)
private Date createDate;
}
package com.yeejoin.equip.entity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
@Data
@ApiModel(value = "性能指标详情返回vo实体", description = "性能指标详情返回vo实体")
public class EquipmentIndexVO {
@ApiModelProperty(value = "id")
private Long id;
@ApiModelProperty(value = "值")
private String value;
@ApiModelProperty(value = "equipment_id")
private String equipmentId;
@ApiModelProperty(value = "性能指标名称")
private String perfQuotaName;
@ApiModelProperty(value = "性能指标id")
private String perfQuotaDefinitionId;
@ApiModelProperty(value = "数量单位名称")
private String unitName;
@ApiModelProperty(value = "否物联指标")
private Integer isIot;
@ApiModelProperty(value = "物联指标")
private String typeName;
@ApiModelProperty(value = "物联指标ID")
private String typeCode;
@ApiModelProperty(value = "分类名称")
private String groupName;
@ApiModelProperty(value = "指标原始id,从iot平台接口获取")
private String indexId;
@ApiModelProperty(value = "性能指标")
private String perfQuotaStr;
@ApiModelProperty(value = "是否是核心参数")
private Boolean isImportentParameter;
@ApiModelProperty(value = "排序")
private Integer sortNum;
@ApiModelProperty(value = "類型")
private Integer type;
@ApiModelProperty(value = "物联nameKey")
private String nameKey;
@ApiModelProperty(value = "创建日期")
private Date createDate;
@ApiModelProperty(value = "更新日期")
private Date updateDate;
@ApiModelProperty(value = "是否支持趋势查看")
private Integer isTrend;
@ApiModelProperty(value = "是否告警")
private Integer isAlarm;
@ApiModelProperty(value = "指标枚举")
private String valueEnum;
@ApiModelProperty(value = "信号的索引键key,用于唯一索引信号")
private String indexAddress;
@ApiModelProperty(value = "测点类型,analog/state")
private String dataType;
@ApiModelProperty(value = "网关标识")
private String gatewayId;
@ApiModelProperty(value = "装备名称")
private String equipmentSpecificName;
@ApiModelProperty(value = "指标名称")
private String equipmentIndexName;
}
package com.yeejoin.equip.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Date;
/**
* @author ZeHua Li
* @date 2020/10/30 11:12
* @since v2.0
*/
@Data
@EqualsAndHashCode(callSuper = true)
@TableName("wl_equipment_specific_index")
@ApiModel(value = "EquipmentSpecificIndex对象", description = "性能指标参数")
public class EquipmentSpecificIndex extends BaseEntity {
@ApiModelProperty(value = "单个设备id")
@TableField("equipment_specific_id")
private Long equipmentSpecificId;
@ApiModelProperty(value = "值")
@TableField("value")
private String value;
@ApiModelProperty(value = "值说明")
@TableField("value_label")
private String valueLabel;
@ApiModelProperty(value = "性能指标对应id")
@TableField("equipment_index_id")
private Long equipmentIndexId;
@ApiModelProperty(value = "更新时间")
@TableField("update_date")
private Date updateDate;
@ApiModelProperty(value = "装备名称(冗余字段)")
@TableField("equipment_specific_name")
private String equipmentSpecificName;
// equipmentSpecialName
@ApiModelProperty(value = "指标名称(冗余字段)")
@TableField("equipment_index_name")
private String equipmentIndexName;
@ApiModelProperty(value = "指标key(冗余字段)")
@TableField("equipment_index_key")
private String equipmentIndexKey;
/**
* 颜色
*/
@TableField(value = "emergency_level_color")
private String emergencyLevelColor;
/**
* 是否告警:0-否;1-是
*/
@TableField(value = "is_alarm")
private Integer isAlarm;
/**
* 紧急程度枚举(1:紧急,2:严重,3:轻微,4:正常,5:无效,6:备用,7:其他)
*/
@TableField(value = "emergency_level")
private String emergencyLevel;
/**
* 紧急程度描述
*/
@TableField(value = "emergency_level_describe")
private String emergencyLevelDescribe;
@ApiModelProperty(value = "iot数据上报唯一id")
private String traceId;
@TableField(exist = false)
private String nameKey;
@TableField(exist = false)
private String code;
@TableField(exist = false)
private String iotCode;
@TableField(exist = false)
private String type;
@TableField(exist = false)
private String orgCode;
@TableField(exist = false)
private String typeCode;
@TableField(exist = false)
private String typeName;
@TableField(exist = false)
private String indexName;
@TableField(exist = false)
private String equipmentSpecificIndexName;
@TableField(exist = false)
private String indexUnitName;
@TableField(exist = false)
private String qrCode;
@TableField(exist = false)
private String equipmentCode;
@TableField(exist = false)
private Long equipmentId;
@TableField(exist = false)
private Long equipmentDetailId;
@TableField(exist = false)
private String alamReason;
@ApiModelProperty(value = "设备CODE")
@TableField(exist = false)
private String equipmentSpecificCode;
@ApiModelProperty(value = "设备所属系统ids")
@TableField(exist = false)
private String systemId;
@ApiModelProperty(value = "详细位置")
@TableField(exist = false)
private String location;
@ApiModelProperty(value = "所属建筑id")
@TableField(exist = false)
private String buildId;
@ApiModelProperty(value = "是否遥测")
@TableField(exist = false)
private Integer isTrend;
@ApiModelProperty(value = "绑定视屏数量")
@TableField(exist = false)
private int num;
/**
* 机构/部门名称
*/
@TableField(exist = false)
private String bizOrgName;
/**
* 机构编码
*/
@TableField(exist = false)
private String bizOrgCode;
@ApiModelProperty(value = "装备系统code")
@TableField(exist = false)
private String specificCode;
@ApiModelProperty(value = "装备定义名称")
@TableField(exist = false)
private String equipmentName;
@TableField(exist = false)
private String equipmentType;
// 直流中心方便前端刷新展示
@TableField(exist = false)
private String UUID;
@TableField(value = "unit")
private String unit;
@ApiModelProperty(value = "信号的索引键key,用于唯一索引信号")
@TableField(value = "index_address")
private String indexAddress;
@ApiModelProperty(value = "品质,0为有效,1为无效")
@TableField(value = "quality")
private String quality;
@ApiModelProperty(value = "测点类型,analog/state")
@TableField(value = "data_type")
private String dataType;
@ApiModelProperty(value = "时间")
@TableField(value = "time_stamp")
private String timeStamp;
/**
* 指标值枚举
*/
@ApiModelProperty(value = "指标值枚举")
@TableField("value_enum")
private String valueEnum;
@ApiModelProperty(value = "网关标识")
@TableField(value = "gateway_id")
private String gatewayId;
}
package com.yeejoin.equip.eqmx;
import com.alibaba.fastjson.JSON;
import com.yeejoin.equip.kafka.KafkaProducerService;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote Emq消息转发Kafka
*/
@Slf4j
@Component
public class EmqMessageService extends EmqxListener {
@Autowired
protected EmqKeeper emqKeeper;
@Autowired
protected KafkaProducerService kafkaProducerService;
@Value("${emq.topic}")
private String emqTopic;
@Value("${kafka.topic}")
private String kafkaTopic;
private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>();
@PostConstruct
void init() {
new Thread(task_runnable).start();
String[] split = emqTopic.split(",");
Arrays.stream(split).forEach(e-> {
try {
emqKeeper.subscript(e, 1, this);
} catch (Exception exception) {
log.info("订阅emq消息失败 ====> message: {}", exception.getMessage());
}
});
}
@Override
public void processMessage(String topic, MqttMessage message) throws Exception {
JSONObject result = JSONObject.fromObject(new String(message.getPayload()));
JSONObject messageResult = new JSONObject();
messageResult.put("result", result);
messageResult.put("topic", topic);
blockingQueue.add(messageResult);
}
Runnable task_runnable = new Runnable() {
public void run() {
int k = 0;
boolean b = true;
while (b) {
k++;
b = k < Integer.MAX_VALUE;
try {
JSONObject messageResult = blockingQueue.take();
JSONObject result = messageResult.getJSONObject("result");
if ((messageResult.getString("topic")).equals(emqTopic)) {
kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result));
}
} catch (Exception e) {
Thread.currentThread().interrupt();
}
}
}
};
}
package com.yeejoin.equip.kafka;
import com.yeejoin.equip.service.KafkaMessageService;
import com.yeejoin.equip.utils.RedisKey;
import com.yeejoin.equip.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.*;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote kafka 消费服务类
*/
@Slf4j
@Service
public class KafkaConsumerService {
@Autowired
protected KafkaProducerService kafkaProducerService;
@Autowired
private KafkaMessageService kafkaMessageService;
@Autowired
private RedisUtils redisUtils;
@KafkaListener(topics = "#{'${kafka.topic}'.split(',')}", groupId = "messageConsumerGroup")
public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
try {
if (CollectionUtils.isEmpty(consumerRecords)) {
return;
}
Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
String message = (String) kafkaMessage.get();
kafkaMessageService.handlerMessage(message,equipmentIndexVOMap);
}
}
} catch (Exception e) {
log.error("kafka失败,当前失败的批次。data:{}", consumerRecords);
e.printStackTrace();
} finally {
ack.acknowledge();
}
}
}
\ No newline at end of file
package com.yeejoin.equip.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote kafka 生产服务类
*/
@Slf4j
@Service
public class KafkaProducerService {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Resource
private KafkaTemplate<String, String> kafkaTemplateWithTransaction;
/**
* 发送消息(同步)
* @param topic 主题
* @param key 键
* @param message 值
*/
public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
//可以指定最长等待时间,也可以不指定
kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
}
/**
* 发送消息并获取结果
* @param topic
* @param message
* @throws ExecutionException
* @throws InterruptedException
*/
public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException {
SendResult<String, String> result = kafkaTemplate.send(topic, message).get();
log.info("The partition the message was sent to: " + result.getRecordMetadata().partition());
}
/**
* 发送消息(异步)
* @param topic 主题
* @param message 消息内容
*/
public void sendMessageAsync(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("发送消息(异步) failure! topic : {}, message: {}", topic, message);
}
@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
log.info("发送消息(异步) success! topic: {}, message: {}", topic, message);
}
});
}
}
package com.yeejoin.equip.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yeejoin.equip.entity.EquipmentIndexVO;
import com.yeejoin.equip.entity.EquipmentSpecificIndex;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
* @author ZeHua Li
* @date 2020/10/30 11:16
* @since v2.0
*/
@Mapper
public interface EquipmentSpecificIndexMapper extends BaseMapper<EquipmentSpecificIndex> {
List<EquipmentIndexVO> getEquipSpecificIndexList(EquipmentIndexVO equipmentIndexVo);
}
package com.yeejoin.equip.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
import com.yeejoin.equip.entity.EquipmentIndexVO;
import com.yeejoin.equip.service.KafkaMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author LiuLin
* @date 2023年07月12日 10:44
*/
@Slf4j
@Service
public class KafkaMessageServiceImpl implements KafkaMessageService {
private Executor dataExecutor = new ThreadPoolTaskExecutor();
@Autowired
private InfluxDbConnection influxDbConnection;
@Value("${kafka.alarm.topic}")
private String alarmTopic;
//iot转发实时消息存入influxdb前缀
private static final String MEASUREMENT = "iot_data_";
//装备更新最新消息存入influxdb前缀
private static final String INDICATORS = "indicators_";
//装备更新最新消息存入influxdb固定时间
private static final Long TIME = 1688558007051L;
@Override
public void handlerMessage(String message, Map<Object, Object> equipmentIndexVOMap) {
dataExecutor.execute(new Runnable() {
@Override
public void run() {
JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId");
String gatewayId = jsonObject.getString("gatewayId");
String value = jsonObject.getString("value");
String key = indexAddress + "_" + gatewayId;
try {
if (equipmentIndexVOMap.get(key) != null) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
log.info("接收到iot消息: 指标名称:{},地址:{},值:{},网关{}",
equipmentSpeIndex.getEquipmentIndexName(),indexAddress,value,gatewayId);
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
tagsMap.put("equipmentsIdx", key);
tagsMap.put("address", indexAddress);
tagsMap.put("gatewayId", gatewayId);
tagsMap.put("dataType", dataType);
tagsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
tagsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
fieldsMap.put("traceId", traceId);
fieldsMap.put("value", value);
fieldsMap.put("valueLabel", valueLabel.equals("") ? value : valueLabel);
fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
influxDbConnection.insert(INDICATORS + gatewayId, tagsMap, fieldsMap, TIME, TimeUnit.MILLISECONDS);
if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
fieldsMap.putAll(tagsMap);
//kafkaProducerService.sendMessageAsync(alarmTopic,JSON.toJSONString(fieldsMap));
}
}
} catch (Exception e) {
log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
}
}
});
}
private String valueTranslate(String value, String enumStr) {
if (ObjectUtils.isEmpty(enumStr)) {
return "";
}
try {
JSONArray jsonArray = JSONArray.parseArray(enumStr);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
if (jsonObject.get("key").equals(value)) {
return jsonObject.getString("label");
}
}
} catch (Exception e) {
e.printStackTrace();
}
return "";
}
@PostConstruct
public void iotAsyncExecutor() {
ThreadPoolTaskExecutor workExecutor = new ThreadPoolTaskExecutor();
// 设置核心线程数
int length = Runtime.getRuntime().availableProcessors();
int size = Math.max(length, 80);
workExecutor.setCorePoolSize(size * 2);
log.info("装备服务初始化,系统线程数:{},运行线程数:{}", length, size);
// 设置最大线程数
workExecutor.setMaxPoolSize(workExecutor.getCorePoolSize());
//配置队列大小
workExecutor.setQueueCapacity(Integer.MAX_VALUE);
// 设置线程活跃时间(秒)
workExecutor.setKeepAliveSeconds(60);
// 设置默认线程名称
workExecutor.setThreadNamePrefix("装备服务-Iot透传消息消费线程池" + "-");
// 等待所有任务结束后再关闭线程池
//当调度器shutdown被调用时,等待当前被调度的任务完成
workExecutor.setWaitForTasksToCompleteOnShutdown(true);
//执行初始化
workExecutor.initialize();
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
workExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
this.dataExecutor = workExecutor;
}
}
package com.yeejoin.equip.utils;
/**
* @description:
* @author: tw
* @createDate: 2021/6/22
* redis key
*/
public class RedisKey {
/**
* 装备指标Key值
*/
public static final String EQUIP_INDEX_ADDRESS = "equip_index_address";
}
package com.yeejoin.equip.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.Map;
/**
* @author DELL
*/
@Component
public class RedisUtils {
@Autowired
private RedisTemplate redisTemplate;
/**
* 获取hashKey对应的所有键值
*
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
*
* @param key 键
* @param map 对应多个键值
* @return true 成功 false 失败
*/
public boolean hmset(String key, Map<String, Object> map) {
redisTemplate.opsForHash().putAll(key, map);
return true;
}
/**
* 删除缓存
*
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
}
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url = jdbc:mysql://172.16.11.201:3306/equipment?useUnicode=true&allowMultiQueries=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=Yeejoin@2020
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.pool-name=DatebookHikariCP
spring.datasource.hikari.minimum-idle= 3
spring.datasource.hikari.maximum-pool-size= 30
spring.datasource.hikari.auto-commit= true
spring.datasource.hikari.idle-timeout= 500000
spring.datasource.hikari.max-lifetime= 1800000
spring.datasource.hikari.connection-timeout= 60000
spring.datasource.hikari.connection-test-query= SELECT 1
spring.redis.database=1
spring.redis.host=172.16.11.201
spring.redis.port=6379
spring.redis.password=yeejoin@2020
spring.redis.lettuce.pool.max-active=200
spring.redis.lettuce.pool.max-wait=-1
spring.redis.lettuce.pool.max-idle=10
spring.redis.lettuce.pool.min-idle=0
eureka.client.registry-fetch-interval-seconds=5
management.endpoint.health.show-details=always
management.endpoints.web.exposure.include=*
eureka.instance.health-check-url-path=/actuator/health
eureka.instance.lease-expiration-duration-in-seconds=10
eureka.instance.lease-renewal-interval-in-seconds=5
eureka.instance.metadata-map.management.context-path=${server.servlet.context-path}/actuator
eureka.instance.status-page-url-path=/actuator/info
eureka.instance.metadata-map.management.api-docs=http://localhost:${server.port}${server.servlet.context-path}/doc.html
eureka.instance.hostname= 172.16.11.201
eureka.instance.prefer-ip-address = true
eureka.client.serviceUrl.defaultZone=http://${spring.security.user.name}:${spring.security.user.password}@172.16.10.220:10001/eureka/
spring.security.user.name=admin
spring.security.user.password=a1234560
## emqx
emqx.clean-session=true
emqx.client-id=${spring.application.name}-${random.int[1024,65536]}
emqx.broker=tcp://172.16.11.201:1883
emqx.user-name=admin
emqx.password=public
mqtt.scene.host=mqtt://172.16.11.201:8083/mqtt
mqtt.client.product.id=mqtt
mqtt.topic=topic_mqtt
spring.mqtt.completionTimeout=3000
# influxDB
spring.influx.url=http://172.16.11.201:8086
spring.influx.password=Yeejoin@2020
spring.influx.user=root
spring.influx.database=iot_platform
spring.influx.retention_policy=default
spring.influx.retention_policy_time=30d
spring.influx.actions=10000
spring.influx.bufferLimit=20000
#kafka
spring.kafka.bootstrap-servers=172.16.10.215:9092
spring.kafka.producer.retries=1
spring.kafka.producer.bootstrap-servers=172.16.10.215:9092
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=messageConsumerGroup
spring.kafka.consumer.bootstrap-servers=172.16.10.215:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.type=batch
kafka.topic=PERSPECTIVE
emq.topic=iot/data/perspective
kafka.alarm.topic=EQUIPMENT_ALARM
\ No newline at end of file
spring.application.name=AMOS-DATA-EQUIP
server.servlet.context-path=/data-equip
server.port=8200
spring.profiles.active=dev
server.compression.enabled=true
spring.jackson.dateFormat=yyyy-MM-dd HH:mm:ss
spring.servlet.multipart.maxFileSize=3MB
spring.servlet.multipart.maxRequestSize=3MB
mybatis-plus.mapper-locations=classpath:mapper/*Mapper.xml
#\u65E5\u5FD7\u7EA7\u522BALL < TRACE < DEBUG < INFO < WARN < ERROR < OFF
logging.level.root=INFO
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<springProperty scope="context" name="LOG_LEVEL" source="logging.level.root" defaultValue="info"/>
<springProperty scope="context" name="MAX_FILE_SIZE" source="logging.max_file_size" defaultValue="128MB"/>
<springProperty scope="context" name="MAX_HISTORY" source="log.max_history" defaultValue="7"/>
<springProperty scope="context" name="PATTERN" source="log.pattern"
defaultValue="-|%d{yyyy-MM-dd HH:mm:ss.SSS}|%-5level|%X{tid}|%thread|%logger{36}.%M:%L-%msg%n"/>
<property name="LOG_NAME" value="message"/>
<property name="LOG_PATH" value="./logs"/>
<property name="LOG_DIR" value="${LOG_PATH}/${LOG_NAME}/%d{yyyyMMdd}"/>
<property name="CHARSET" value="UTF-8"/>
<!-- 控制台输出 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${PATTERN}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>DEBUG</level>
</filter>
</appender>
<appender name="ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level> <!-- 只记录error级别的日志 -->
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<!-- 定义文件的名称 -->
<file>${LOG_PATH}/${LOG_NAME}/error.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${LOG_DIR}/err_${LOG_NAME}%i.log</FileNamePattern>
<MaxHistory>${MAX_HISTORY}</MaxHistory>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>${PATTERN}</Pattern>
</layout>
</appender>
<appender name="FILE_ALL" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 定义文件的名称 -->
<file>${LOG_PATH}/${LOG_NAME}/total.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${LOG_DIR}/all_${LOG_NAME}%i.log</FileNamePattern>
<MaxHistory>${MAX_HISTORY}</MaxHistory>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>${PATTERN}</pattern>
</layout>
</appender>
<appender name="ASYNC_STDOUT" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="STDOUT"/>
<queueSize>256</queueSize>
<neverBlock>true</neverBlock>
<includeCallerData>true</includeCallerData>
</appender>
<appender name="ASYNC_FILE_ALL" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="FILE_ALL"/>
<queueSize>1024</queueSize>
<neverBlock>true</neverBlock>
<includeCallerData>true</includeCallerData>
</appender>
<appender name="ASYNC_ERROR" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="ERROR"/>
<queueSize>256</queueSize>
<neverBlock>true</neverBlock>
<includeCallerData>true</includeCallerData>
</appender>
<!--统一日志输出级别,其他appender中如果有高于此处等级设置的也会被输出 -->
<root level="${LOG_LEVEL}">
<springProfile name="test,dev">
<appender-ref ref="ASYNC_STDOUT"/>
</springProfile>
<appender-ref ref="ASYNC_FILE_ALL"/>
<appender-ref ref="ASYNC_ERROR"/>
</root>
</configuration>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yeejoin.equip.mapper.EquipmentSpecificIndexMapper">
<select id="getEquipSpecificIndexList" resultType="com.yeejoin.equip.entity.EquipmentIndexVO">
SELECT
si.equipment_specific_id AS equipmentId,
ei.id,
ei.name_key,
ei.name AS perfQuotaName,
si.value,
ei.is_iot,
si.unit AS unitName,
ei.sort_num,
si.create_date,
si.update_date,
si.index_address,
si.gateway_id,
si.data_type,
si.equipment_specific_name,
si.equipment_index_name,
si.is_alarm,
si.value_enum AS valueEnum
FROM
wl_equipment_specific_index si
LEFT JOIN wl_equipment_index ei ON si.equipment_index_id = ei.id
<where>
<if test="equipmentId != null and equipmentId !=''">
si.equipment_specific_id = #{equipmentId}
</if>
<if test="isIot != null and isIot !=''">
AND ei.is_iot = #{isIot}
</if>
</where>
</select>
</mapper>
\ No newline at end of file
...@@ -21,5 +21,6 @@ ...@@ -21,5 +21,6 @@
<module>amos-boot-data-openapi</module> <module>amos-boot-data-openapi</module>
<module>amos-boot-data-accessapi</module> <module>amos-boot-data-accessapi</module>
<module>amos-boot-data-alarm</module> <module>amos-boot-data-alarm</module>
<module>amos-boot-data-equip</module>
</modules> </modules>
</project> </project>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment