Commit fc6ad856 authored by chenzhao

Merge branch 'develop_dl' of http://39.98.45.134:8090/moa/amos-boot-biz into develop_dl

parents 728d3d63 7b3c85d9
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>amos-boot-data</artifactId>
<groupId>com.amosframework.boot</groupId>
<version>1.0.0</version>
</parent>
<artifactId>amos-boot-data-alarm</artifactId>
<name>amos-boot-data-alarm</name>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-core-foundation</artifactId>
<version>${tyboot-version}</version>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-core-restful</artifactId>
<version>${tyboot-version}</version>
<exclusions>
<exclusion>
<groupId>org.typroject</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-core-auth</artifactId>
<version>${tyboot-version}</version>
<exclusions>
<exclusion>
<groupId>org.typroject</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-component-emq</artifactId>
<version>1.1.20</version>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-component-event</artifactId>
<version>${tyboot-version}</version>
<exclusions>
<exclusion>
<groupId>org.typroject</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-component-opendata</artifactId>
<version>${tyboot-version}</version>
<exclusions>
<exclusion>
<groupId>org.typroject</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.yeejoin</groupId>
<artifactId>amos-feign-systemctl</artifactId>
<version>${amos.version}</version>
</dependency>
<dependency>
<groupId>com.yeejoin</groupId>
<artifactId>amos-component-config</artifactId>
<version>${amos.version}</version>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-core-rdbms</artifactId>
<version>${tyboot-version}</version>
<exclusions>
<exclusion>
<groupId>org.typroject</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.typroject</groupId>
<artifactId>tyboot-component-cache</artifactId>
<version>${tyboot-version}</version>
<exclusions>
<exclusion>
<groupId>org.typroject</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>19.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<version>1.3.7</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.yeejoin.amos;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.typroject.tyboot.core.restful.exception.GlobalExceptionHandler;
import java.net.InetAddress;
/**
* Entry point for the amos-boot-data-alarm service.
*
* @author gwb
* @version $Id: AlarmApplication.java, v 0.1 Sep 27, 2021 3:29:30 PM gwb Exp $
*/
@SpringBootApplication
@EnableTransactionManagement
@EnableConfigurationProperties
@ServletComponentScan
@EnableDiscoveryClient
@EnableFeignClients
@EnableAsync
@EnableEurekaClient
@EnableScheduling
@MapperScan(value = { "org.typroject.tyboot.*.*.face.orm.dao", "com.yeejoin.amos.api.*.face.orm.dao", "org.typroject.tyboot.face.*.orm.dao*",
"com.yeejoin.amos.boot.biz.common.dao.mapper" })
@ComponentScan({ "org.typroject", "com.yeejoin.amos" })
public class AlarmApplication {
private static final Logger logger = LogManager.getLogger(AlarmApplication.class);
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(AlarmApplication.class, args);
GlobalExceptionHandler.setAlwaysOk(true);
Environment env = context.getEnvironment();
String ip = InetAddress.getLocalHost().getHostAddress();
String port = env.getProperty("server.port");
String path = env.getProperty("server.servlet.context-path");
logger.info("\n----------------------------------------------------------\n\t"
+ "Application Amos-Biz-Boot is running! Access URLs:\n\t" + "Swagger文档: \thttp://" + ip + ":" + port
+ path + "/doc.html\n" + "----------------------------------------------------------");
}
}
spring.application.name=AMOS-ALARM
server.servlet.context-path=/alarm
server.port=11007
# jdbc_config
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://172.16.10.220:3306/equipment?useUnicode=true&allowMultiQueries=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=Yeejoin@2020
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.pool-name=DatebookHikariCP
spring.datasource.hikari.minimum-idle=3
spring.datasource.hikari.maximum-pool-size=30
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=500000
spring.datasource.hikari.max-lifetime=1800000
spring.datasource.hikari.connection-timeout=60000
spring.datasource.hikari.connection-test-query=SELECT 1
# REDIS (RedisProperties)
spring.redis.database=1
spring.redis.host=172.16.10.220
spring.redis.port=6379
spring.redis.password=yeejoin@2020
spring.redis.lettuce.pool.max-active=200
spring.redis.lettuce.pool.max-wait=-1
spring.redis.lettuce.pool.max-idle=10
spring.redis.lettuce.pool.min-idle=0
spring.redis.expire.time=30000
# Registry (Eureka) settings
eureka.client.registry-fetch-interval-seconds=5
management.endpoint.health.show-details=always
management.endpoints.web.exposure.include=*
eureka.instance.health-check-url-path=/actuator/health
eureka.instance.lease-expiration-duration-in-seconds=10
eureka.instance.lease-renewal-interval-in-seconds=5
eureka.instance.metadata-map.management.context-path=${server.servlet.context-path}/actuator
eureka.instance.status-page-url-path=/actuator/info
eureka.instance.metadata-map.management.api-docs=http://localhost:${server.port}${server.servlet.context-path}/doc.html
eureka.instance.hostname=172.16.10.220
eureka.instance.prefer-ip-address=true
eureka.client.serviceUrl.defaultZone=http://${spring.security.user.name}:${spring.security.user.password}@172.16.10.220:10001/eureka/
spring.security.user.name=admin
spring.security.user.password=a1234560
## emqx
emqx.clean-session=true
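# The random suffix keeps each instance's MQTT client id unique; brokers typically disconnect clients that reuse an id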
emqx.client-id=${spring.application.name}-${random.int[1024,65536]}
emqx.broker=tcp://172.16.10.220:1883
emqx.user-name=admin
emqx.password=public
mqtt.scene.host=mqtt://172.16.10.220:8083/mqtt
mqtt.client.product.id=mqtt
mqtt.topic=topic_mqtt
spring.mqtt.completionTimeout=3000
spring.profiles.active=dev
server.compression.enabled=true
spring.jackson.dateFormat=yyyy-MM-dd HH:mm:ss
logging.config=classpath:logback-${spring.profiles.active}.xml
# File upload size limits
spring.servlet.multipart.maxFileSize=3MB
spring.servlet.multipart.maxRequestSize=3MB
## Redis cache expiry time
redis.cache.failure.time=10800
# mybatis-plus
mybatis-plus.mapper-locations=classpath:mapper/*Mapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<!-- Console output -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<!-- Output format: %d = date, %thread = thread name, %-5level = level padded to 5 chars, %msg = log message, %n = newline -->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<!-- Show parameters for Hibernate SQL (Hibernate-specific)
<logger name="org.hibernate.type.descriptor.sql.BasicBinder" level="TRACE" />
<logger name="org.hibernate.type.descriptor.sql.BasicExtractor" level="DEBUG" />
<logger name="org.hibernate.SQL" level="DEBUG" />
<logger name="org.hibernate.engine.QueryParameters" level="DEBUG" />
<logger name="org.hibernate.engine.query.HQLQueryPlan" level="DEBUG" />
-->
<!-- mybatis log configuration -->
<logger name="org.apache.ibatis" level="ERROR"/>
<logger name="java.sql.Connection" level="ERROR"/>
<logger name="java.sql.Statement" level="ERROR"/>
<logger name="java.sql.PreparedStatement" level="ERROR"/>
<logger name="com.baomidou" level="ERROR"/>
<logger name="org.springframework" level="INFO"/>
<logger name="org.apache.activemq" level="INFO"/>
<!-- Root log level -->
<root level="ERROR">
<appender-ref ref="STDOUT" />
</root>
<!-- Asynchronous logging to a database (disabled)
<appender name="DB" class="ch.qos.logback.classic.db.DBAppender">
<connectionSource class="ch.qos.logback.core.db.DriverManagerConnectionSource">
<dataSource class="com.mchange.v2.c3p0.ComboPooledDataSource">
<driverClass>com.mysql.jdbc.Driver</driverClass>
<url>jdbc:mysql://127.0.0.1:3306/databaseName</url>
<user>root</user>
<password>root</password>
</dataSource>
</connectionSource>
</appender>
-->
</configuration>
......@@ -17,8 +17,9 @@
<modules>
<module>amos-boot-data-common</module>
<module>amos-boot-data-openapi</module>
<module>amos-boot-data-accessapi</module>
<module>amos-boot-data-alarm</module>
</modules>
</project>
......@@ -62,6 +62,19 @@
<version>1.8.5-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<!-- Kafka dependencies -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
</dependencies>
</project>
......@@ -35,7 +35,7 @@ public class EquipmentIndexCacheRunner implements CommandLineRunner {
List<EquipmentIndexVO> equipSpecificIndexList = equipmentSpecificIndexMapper.getEquipSpecificIndexList(null);
Map<String, Object> equipmentIndexVOMap = equipSpecificIndexList.stream()
.filter(v -> v.getGatewayId() != null && v.getIndexAddress() !=null)
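// The merge function (v1, v2) -> v1 keeps the first entry when two indexes share an address_gateway key; without it Collectors.toMap throws IllegalStateException on duplicates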
.collect(Collectors.toMap(vo -> vo.getIndexAddress() + "_" + vo.getGatewayId(), Function.identity(), (v1, v2) -> v1));
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS, equipmentIndexVOMap);
}
......
......@@ -160,8 +160,6 @@ public class EquipmentIotMqttReceiveConfig {
mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg);
} else if (dataType.equals("transmit") && StringUtil.isNotEmpty(msg)) {
mqttReceiveService.handlerMqttRomaMessage(topic, msg);
} else if (dataType.equals("perspective") && StringUtil.isNotEmpty(msg)) {
mqttReceiveService.handlerIotMessage(topic, msg);
} else if (dataType.equals("trigger") && StringUtil.isNotEmpty(msg)) {
mqttReceiveService.handleDataToRiskModel(topic, msg);
}
......
package com.yeejoin.equipmanage.eqmx;
import com.alibaba.fastjson.JSON;
import com.yeejoin.equipmanage.kafka.KafkaProducerService;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote Forwards EMQ messages to Kafka
*/
@Slf4j
@Component
public class EmqMessageService extends EmqxListener {
@Autowired
protected EmqKeeper emqKeeper;
@Autowired
protected KafkaProducerService kafkaProducerService;
@Value("${emq.topic}")
private String emqTopic;
@Value("${kafka.topic}")
private String kafkaTopic;
private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>();
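// processMessage() only enqueues; the thread started in init() drains the queue and forwards to Kafka, keeping the MQTT callback fast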
@PostConstruct
void init() {
new Thread(task_runnable).start();
String[] split = emqTopic.split(",");
Arrays.stream(split).forEach(e-> {
try {
emqKeeper.subscript(e, 1, this);
} catch (Exception exception) {
log.info("订阅emq消息失败 ====> message: {}", exception.getMessage());
}
});
}
@Override
public void processMessage(String topic, MqttMessage message) throws Exception {
JSONObject result = JSONObject.fromObject(new String(message.getPayload()));
JSONObject messageResult = new JSONObject();
messageResult.put("result", result);
messageResult.put("topic", topic);
blockingQueue.add(messageResult);
}
Runnable task_runnable = new Runnable() {
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
JSONObject messageResult = blockingQueue.take();
JSONObject result = messageResult.getJSONObject("result");
// emq.topic may hold several comma-separated topics, so match against the split list rather than the raw property string
if (Arrays.asList(emqTopic.split(",")).contains(messageResult.getString("topic"))) {
kafkaProducerService.sendMessageAsync(kafkaTopic, JSON.toJSONString(result));
}
} catch (InterruptedException e) {
// Restore the interrupt flag and stop the forwarding loop
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Failed to forward EMQ message to Kafka ====> message: {}", e.getMessage());
}
}
}
};
}
package com.yeejoin.equipmanage.kafka;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.biz.common.utils.RedisKey;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
import com.yeejoin.equipmanage.common.entity.vo.EquipmentIndexVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote Kafka consumer service
*/
@Slf4j
@Service
public class KafkaConsumerService {
@Autowired
private InfluxDbConnection influxDbConnection;
private Executor dataExecutor = new ThreadPoolTaskExecutor();
@Autowired
private RedisUtils redisUtils;
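// Batch listener with manual acks; pairs with spring.kafka.listener.type=batch and spring.kafka.listener.ack-mode=manual_immediate in the application properties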
@KafkaListener(topics = "#{'${kafka.topic}'.split(',')}", groupId = "messageConsumerGroup")
public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
try {
if (CollectionUtils.isEmpty(consumerRecords)) {
return;
}
Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
String message = (String) kafkaMessage.get();
this.handleMessage(message,equipmentIndexVOMap);
}
}
} catch (Exception e) {
log.error("kafka失败,当前失败的批次。data:{}", consumerRecords);
e.printStackTrace();
} finally {
ack.acknowledge();
}
}
private void handleMessage(String message, Map<Object, Object> equipmentIndexVOMap) {
dataExecutor.execute(new Runnable() {
@Override
public void run() {
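// Expected payload shape (inferred from the fields read below):
// {"dataType":"...","address":"...","traceId":"...","gatewayId":"...","value":"..."}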
JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId");
String gatewayId = jsonObject.getString("gatewayId");
String value = jsonObject.getString("value");
String key = indexAddress + "_" + gatewayId;
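// Key format must match the Redis cache built by EquipmentIndexCacheRunner: indexAddress + "_" + gatewayId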
try {
if (equipmentIndexVOMap.get(key) != null) {
EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
log.info("接收到iot消息: 指标名称:{},地址:{},值:{},网关{}",
equipmentSpeIndex.getEquipmentIndexName(),indexAddress,value,gatewayId);
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
tagsMap.put("equipmentsIdx", key);
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
fieldsMap.put("traceId", traceId);
fieldsMap.put("address", indexAddress);
fieldsMap.put("value", value);
fieldsMap.put("valueLabel", valueLabel.equals("") ? value : valueLabel);
fieldsMap.put("gatewayId", gatewayId);
fieldsMap.put("dataType", dataType);
fieldsMap.put("equipmentId", equipmentSpeIndex.getEquipmentId());
fieldsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
// Persist to InfluxDB
influxDbConnection.insert("iot_data", tagsMap, fieldsMap);
log.info("InfluxDB write time: {}", simpleDateFormat.format(new Date()));
}
} catch (Exception e) {
log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
}
}
});
}
private String valueTranslate(String value, String enumStr) {
if (ObjectUtils.isEmpty(enumStr)) {
return "";
}
try {
JSONArray jsonArray = JSONArray.parseArray(enumStr);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
if (jsonObject.get("key").equals(value)) {
return jsonObject.getString("label");
}
}
} catch (Exception e) {
e.printStackTrace();
}
return "";
}
@PostConstruct
public void iotAsyncExecutor() {
ThreadPoolTaskExecutor workExecutor = new ThreadPoolTaskExecutor();
// Core pool size: at least 80 worker threads, or the processor count if higher
int length = Runtime.getRuntime().availableProcessors();
int size = Math.max(length, 80);
workExecutor.setCorePoolSize(size * 2);
log.info("Equipment service initializing, system threads: {}, worker threads: {}", length, size);
// Max pool size
workExecutor.setMaxPoolSize(workExecutor.getCorePoolSize());
// Queue capacity
workExecutor.setQueueCapacity(Integer.MAX_VALUE);
// Idle thread keep-alive (seconds)
workExecutor.setKeepAliveSeconds(60);
// Thread name prefix
workExecutor.setThreadNamePrefix("equip-iot-passthrough-consumer-");
// Wait for in-flight tasks to complete when the scheduler shuts down
workExecutor.setWaitForTasksToCompleteOnShutdown(true);
// Rejection policy must be set before initialize() to take effect; CALLER_RUNS executes the task on the submitting thread instead of a pool thread
workExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
workExecutor.initialize();
this.dataExecutor = workExecutor;
}
}
package com.yeejoin.equipmanage.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote Kafka producer service
*/
@Slf4j
@Service
public class KafkaProducerService {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Resource
private KafkaTemplate<String, String> kafkaTemplateWithTransaction;
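// Injected by field name; unused by the methods below and presumably reserved for transactional sends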
/**
* Send a message synchronously.
* @param topic topic
* @param key message key
* @param message payload
*/
public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
// Bounded wait for the broker ack; pass the key through so partition assignment honors it
kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
}
/**
* Send a message and log the send result.
* @param topic topic
* @param key message key
* @param message payload
* @throws ExecutionException
* @throws InterruptedException
*/
public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException {
SendResult<String, String> result = kafkaTemplate.send(topic, key, message).get();
log.info("The partition the message was sent to: " + result.getRecordMetadata().partition());
}
/**
* Send a message asynchronously.
* @param topic topic
* @param message payload
*/
public void sendMessageAsync(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("发送消息(异步) failure! topic : {}, message: {}", topic, message);
}
@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
log.info("发送消息(异步) success! topic: {}, message: {}", topic, message);
}
});
}
}
package com.yeejoin.equipmanage.kafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote Kafka consumer configuration
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String kafkaBootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String kafkaGroupId;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Concurrency: keep at or below the topic's partition count
factory.setConcurrency(5);
// Enable batch listening
factory.setBatchListener(Boolean.TRUE);
factory.getContainerProperties().setPollTimeout(30000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
// Disable auto-commit; offsets are acknowledged manually
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
// Session timeout: if no heartbeat arrives within this window, the broker marks the consumer dead,
// removes it from the group and triggers a rebalance (default 30s)
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// Per-fetch volume; these limits matter when consuming over a public network
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
// Max records per poll. Keep it small: if a batch cannot be consumed before the next poll,
// the group rebalances and processing stalls
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
// Deserializers
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// Consumer group: instances in the same group share the partition load
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
return props;
}
}
......@@ -10,6 +10,6 @@ import com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex;
public interface IPointSystemService {
// Trigger risk warning
public void sendWarning(String address, String value, String valueLabel);
}
......@@ -42,12 +42,4 @@ public interface MqttReceiveService {
* @param message
*/
void handleDataToRiskModel(String topic, String message);
/**
* Handle IoT message data
*
* @param topic topic
* @param message message payload
*/
void handlerIotMessage(String topic, String message);
}
......@@ -35,7 +35,6 @@ import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
......@@ -48,14 +47,11 @@ import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import org.typroject.tyboot.core.restful.exception.instance.BadRequest;
import org.typroject.tyboot.core.restful.utils.ResponseModel;
import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
......@@ -125,8 +121,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@Autowired
private InfluxDbConnection influxDbConnection;
private Executor dataExecutor = new ThreadPoolTaskExecutor();
/**
* Foam tank KEY
*/
......@@ -634,59 +628,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerIotMessage(String topic, String message) {
Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
dataExecutor.execute(new Runnable() {
@Override
public void run() {
JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId");
String deviceCode = jsonObject.getString("deviceCode");
String gatewayId = jsonObject.getString("gatewayId");
String value = jsonObject.getString("value");
String key = indexAddress + "_" + gatewayId;
try {
if (equipmentIndexVOMap.get(key) != null) {
EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
log.info("接收到iot消息: 指标名称:{},地址:{},值:{},网关{}",
equipmentSpeIndex.getEquipmentIndexName(),indexAddress,value,gatewayId);
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
tagsMap.put("equipmentsIdx", key);
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
fieldsMap.put("traceId", traceId);
fieldsMap.put("address", indexAddress);
fieldsMap.put("value", value);
fieldsMap.put("valueLabel", valueLabel.equals("") ? value : valueLabel);
fieldsMap.put("gatewayId", gatewayId);
fieldsMap.put("dataType", dataType);
fieldsMap.put("equipmentId", equipmentSpeIndex.getEquipmentId());
fieldsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
// Persist to InfluxDB
influxDbConnection.insert("iot_data", tagsMap, fieldsMap);
log.info("InfluxDB write time: {}", simpleDateFormat.format(new Date()));
}
} catch (Exception e) {
log.error("Failed to parse and persist IoT passthrough message: " + e.getMessage(), e);
}
}
});
}
/**
* Deprecated code, currently unused; delete later
* @param topic topic
......@@ -2545,35 +2486,4 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
}
}
@PostConstruct
public void iotAsyncExecutor() {
if(iotAsyncExecutorFlag) {
System.out.println("-----------------iotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlagiotAsyncExecutorFlag");
ThreadPoolTaskExecutor workExecutor = new ThreadPoolTaskExecutor();
// Core pool size
int length = Runtime.getRuntime().availableProcessors();
int size = Math.max(length, 80);
workExecutor.setCorePoolSize(size * 2);
log.info("Equipment service initializing, system threads: {}, worker threads: {}", length, size);
// Max pool size
workExecutor.setMaxPoolSize(workExecutor.getCorePoolSize());
// Queue capacity
workExecutor.setQueueCapacity(Integer.MAX_VALUE);
// Idle thread keep-alive (seconds)
workExecutor.setKeepAliveSeconds(60);
// Thread name prefix
workExecutor.setThreadNamePrefix("equip-iot-passthrough-consumer-");
// Wait for queued tasks to finish when the scheduler shuts down
workExecutor.setWaitForTasksToCompleteOnShutdown(true);
// Initialize
workExecutor.initialize();
// Rejection policy: CALLER_RUNS runs the task on the caller's thread when the pool is saturated
workExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
this.dataExecutor = workExecutor;
}
}
}
......@@ -45,7 +45,7 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
protected EmqKeeper emqKeeper;
@Override
public void sendWarning(String address, String value, String valueLabel) {
try {
// Look up the KKS by measuring-point address and corresponding value
QueryWrapper<PointSystem> pointSystemWrapper = new QueryWrapper<>();
......@@ -74,8 +74,7 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
eqdata = (JSONObject) list.get(0);
// Assemble the data and publish the warning
WarningDto warningDto = setWarningDto(pointSystem, eqdata, valueLabel);
emqKeeper.getMqttClient().publish(STATIONWARNING, JSON.toJSONString(warningDto).getBytes(), 0, false);
} catch (Exception e) {
......@@ -85,28 +84,27 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
}
public WarningDto setWarningDto(PointSystem pointSystem, JSONObject eqdata, String valueLabel) {
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String time= sdf.format(new Date());
String warningObjectCode=pointSystem.getKks();
List<TabContent> tabContent=new ArrayList<>();
tabContent.add(new TabContent( "KKS编码", TEXT, warningObjectCode, "key1"));
tabContent.add(new TabContent( "设备名称", TEXT, eqdata.get("kksms"), "key2"));
tabContent.add(new TabContent( "发生时间", TEXT, time, "key3"));
tabContent.add(new TabContent( "告警原因", TEXT, valueLabe, "key3"));
tabContent.add(new TabContent( "发生时间", TEXT, time, "key4"));
DynamicDetails dynamicDetails=new DynamicDetails( TABNAME, tabContent);
List<DynamicDetails> dynamicDetailsList=new ArrayList<>();
dynamicDetailsList.add(dynamicDetails);
StringBuilder indexKey = new StringBuilder(pointSystem.getStation())
.append("#")
.append(pointSystem.getNumber())
.append("#")
.append(pointSystem.getFunctionNum());
String indexValue = valueLabel;
WarningDto WarningDto=new WarningDto(
indexKey.toString(),
indexValue,
null,
(String)eqdata.get("sourceAttributionDesc"),
(String)eqdata.get("sourceAttribution"),
......
......@@ -150,4 +150,24 @@ spring.influx.retention_policy=default
spring.influx.retention_policy_time=30d
spring.influx.actions=10000
spring.influx.bufferLimit=20000
#kafka
spring.kafka.bootstrap-servers=121.199.39.218:9092
spring.kafka.producer.retries=1
spring.kafka.producer.bootstrap-servers=121.199.39.218:9092
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=messageConsumerGroup
spring.kafka.consumer.bootstrap-servers=121.199.39.218:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.type=batch
kafka.topic=PERSPECTIVE
emq.topic=iot/data/perspective
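# Presumably bound to iotAsyncExecutorFlag in MqttReceiveServiceImpl; false disables the legacy in-service IoT executor now that the Kafka consumer handles these messages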
iot.async.flag=false