Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
amos-boot-biz
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
项目统一框架
amos-boot-biz
Commits
820d798e
Commit
820d798e
authored
Sep 20, 2023
by
刘林
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix(equip):集成TDengine数据库,重构对接iot代码
parent
190fcbc8
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
28 changed files
with
919 additions
and
321 deletions
+919
-321
pom.xml
amos-boot-data/amos-boot-data-equip/pom.xml
+5
-5
ElasticSearchConfig.java
...in/java/com/yeejoin/equip/config/ElasticSearchConfig.java
+6
-15
EquipmentIndexCacheRunner.java
...a/com/yeejoin/equip/config/EquipmentIndexCacheRunner.java
+29
-17
MqttPropertyConfig.java
...ain/java/com/yeejoin/equip/config/MqttPropertyConfig.java
+24
-0
ESEquipments.java
.../src/main/java/com/yeejoin/equip/entity/ESEquipments.java
+59
-0
IndicatorData.java
...src/main/java/com/yeejoin/equip/entity/IndicatorData.java
+4
-1
EmqMessageService.java
...c/main/java/com/yeejoin/equip/eqmx/EmqMessageService.java
+78
-78
KafkaConsumerWithThread.java
...java/com/yeejoin/equip/kafka/KafkaConsumerWithThread.java
+0
-0
ESEquipmentsMapper.java
...com/yeejoin/equip/mapper/tdengine/ESEquipmentsMapper.java
+22
-0
IndicatorDataMapper.java
...om/yeejoin/equip/mapper/tdengine/IndicatorDataMapper.java
+0
-2
MessageIntegration.java
.../main/java/com/yeejoin/equip/mqtt/MessageIntegration.java
+106
-0
MessageTransfer.java
...src/main/java/com/yeejoin/equip/mqtt/MessageTransfer.java
+78
-0
MqttConstant.java
...ain/java/com/yeejoin/equip/mqtt/message/MqttConstant.java
+18
-0
MqttTopicEnum.java
...in/java/com/yeejoin/equip/mqtt/message/MqttTopicEnum.java
+20
-0
HandleESMessage2TDService.java
.../com/yeejoin/equip/service/HandleESMessage2TDService.java
+45
-0
HandleMessageService.java
.../java/com/yeejoin/equip/service/HandleMessageService.java
+78
-0
InitTDEngineDbService.java
...java/com/yeejoin/equip/service/InitTDEngineDbService.java
+7
-5
KafkaMessageService.java
...n/java/com/yeejoin/equip/service/KafkaMessageService.java
+0
-9
ElasticSearchClient.java
...ain/java/com/yeejoin/equip/utils/ElasticSearchClient.java
+44
-0
ElasticSearchConfig.java
...ain/java/com/yeejoin/equip/utils/ElasticSearchConfig.java
+0
-96
ElasticSearchUtil.java
.../main/java/com/yeejoin/equip/utils/ElasticSearchUtil.java
+67
-0
ExecutorFactory.java
...rc/main/java/com/yeejoin/equip/utils/ExecutorFactory.java
+29
-0
RedisKey.java
...equip/src/main/java/com/yeejoin/equip/utils/RedisKey.java
+0
-17
RedisUtils.java
...uip/src/main/java/com/yeejoin/equip/utils/RedisUtils.java
+5
-38
SpringUtils.java
...ip/src/main/java/com/yeejoin/equip/utils/SpringUtils.java
+45
-0
application-dev.properties
...-data-equip/src/main/resources/application-dev.properties
+25
-17
ESEquipmentsMapper.xml
...src/main/resources/mapper/tdengine/ESEquipmentsMapper.xml
+92
-0
IndicatorDataMapper.xml
...rc/main/resources/mapper/tdengine/IndicatorDataMapper.xml
+33
-21
No files found.
amos-boot-data/amos-boot-data-equip/pom.xml
View file @
820d798e
...
...
@@ -28,11 +28,11 @@
<artifactId>
spring-kafka
</artifactId>
</dependency>
<dependency
>
<groupId>
org.typroject
</groupId
>
<artifactId>
tyboot-component-emq
</artifactId
>
<version>
1.1.20
</version
>
</dependency
>
<!-- <dependency>--
>
<!-- <groupId>org.typroject</groupId>--
>
<!-- <artifactId>tyboot-component-emq</artifactId>--
>
<!-- <version>1.1.20</version>--
>
<!-- </dependency>--
>
<dependency>
<groupId>
com.yeejoin
</groupId>
...
...
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/
utils/ElasticSearchRuntimeEnvironment
.java
→
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/
config/ElasticSearchConfig
.java
View file @
820d798e
package
com
.
yeejoin
.
equip
.
utils
;
package
com
.
yeejoin
.
equip
.
config
;
import
lombok.AllArgsConstructor
;
import
lombok.Getter
;
import
lombok.NoArgsConstructor
;
import
lombok.Setter
;
import
lombok.*
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
;
import
org.springframework.boot.context.properties.ConfigurationProperties
;
import
org.springframework.stereotype.Component
;
/**
* \* Created with IntelliJ IDEA.
* \* User: 煦仔
* \* Date: 2020-12-22
* \* Time: 11:01
* \* To change this template use File | Settings | File Templates.
* \* Description: ElasticSearch 配置
* \
* ElasticSearch 配置
*
*/
@Setter
@Getter
@Data
@AllArgsConstructor
@NoArgsConstructor
@Component
@ConfigurationProperties
(
prefix
=
"elasticsearch"
)
@ConditionalOnProperty
(
"elasticsearch.address"
)
public
class
ElasticSearch
RuntimeEnvironment
{
public
class
ElasticSearch
Config
{
/**
* es连接地址,如果有多个用,隔开
...
...
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/config/EquipmentIndexCacheRunner.java
View file @
820d798e
...
...
@@ -2,19 +2,18 @@ package com.yeejoin.equip.config;
import
com.yeejoin.equip.entity.EquipmentIndexVO
;
import
com.yeejoin.equip.mapper.mysql.EquipmentSpecificIndexMapper
;
import
com.yeejoin.equip.service.KafkaMessageService
;
import
com.yeejoin.equip.utils.RedisKey
;
import
com.yeejoin.equip.utils.RedisUtils
;
import
com.yeejoin.equip.service.HandleESMessage2TDService
;
import
com.yeejoin.equip.service.InitTDEngineDbService
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.boot.CommandLineRunner
;
import
org.springframework.stereotype.Component
;
import
org.springframework.transaction.annotation.Transactional
;
import
redis.clients.jedis.Jedis
;
import
redis.clients.jedis.Pipeline
;
import
javax.annotation.Resource
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.function.Function
;
import
java.util.stream.Collectors
;
/**
* @author LiuLin
...
...
@@ -25,24 +24,36 @@ import java.util.stream.Collectors;
@Component
@Transactional
(
transactionManager
=
"mysqlTransactionManager"
)
public
class
EquipmentIndexCacheRunner
implements
CommandLineRunner
{
@Resource
private
EquipmentSpecificIndexMapper
equipmentSpecificIndexMapper
;
@Resource
private
RedisUtils
redisUtils
;
@Autowired
private
KafkaMessageService
kafkaMessageService
;
private
InitTDEngineDbService
initTDEngineDbService
;
@Autowired
private
HandleESMessage2TDService
handleESMessage2TDService
;
@Value
(
"${spring.redis.host}"
)
private
String
redisHost
;
@Value
(
"${spring.redis.port}"
)
private
Integer
redisPort
;
@Value
(
"${spring.redis.password}"
)
private
String
redisPassword
;
@Override
public
void
run
(
String
...
args
)
throws
Exception
{
log
.
info
(
">>服务启动执行,执行预加载数据等操作"
);
redisUtils
.
del
(
RedisKey
.
EQUIP_INDEX_ADDRESS
);
Jedis
jedis
=
new
Jedis
(
redisHost
,
redisPort
);
jedis
.
auth
(
redisPassword
);
Pipeline
pipeline
=
jedis
.
pipelined
();
List
<
EquipmentIndexVO
>
equipSpecificIndexList
=
equipmentSpecificIndexMapper
.
getEquipSpecificIndexList
(
null
);
Map
<
String
,
Object
>
equipmentIndexVOMap
=
equipSpecificIndexList
.
stream
()
.
filter
(
v
->
v
.
getGatewayId
()
!=
null
)
.
collect
(
Collectors
.
toMap
(
vo
->
vo
.
getIndexAddress
()
+
"_"
+
vo
.
getGatewayId
(),
Function
.
identity
(),(
v1
,
v2
)
->
v1
));
redisUtils
.
hmset
(
RedisKey
.
EQUIP_INDEX_ADDRESS
,
equipmentIndexVOMap
);
kafkaMessageService
.
init
();
equipSpecificIndexList
.
forEach
(
vo
->{
String
key
=
vo
.
getIndexAddress
()
+
"_"
+
vo
.
getGatewayId
();
pipeline
.
del
(
key
);
pipeline
.
set
(
key
,
String
.
valueOf
(
vo
));
});
pipeline
.
syncAndReturnAll
();
log
.
info
(
">>>>>>>>>>>>>>>>服务启动执行Redis缓存预加载指标数据完成!>>>>>>>>>>>>>>>>"
);
//initTDEngineDbService.init();
}
}
\ No newline at end of file
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/config/MqttPropertyConfig.java
0 → 100644
View file @
820d798e
package
com
.
yeejoin
.
equip
.
config
;
import
lombok.Data
;
import
org.springframework.boot.context.properties.ConfigurationProperties
;
import
org.springframework.context.annotation.Configuration
;
/**
* @author LiuLin
* @date 2023年08月18日 11:08
*/
@Data
@ConfigurationProperties
(
prefix
=
"emqx"
)
@Configuration
public
class
MqttPropertyConfig
{
private
String
broker
;
private
String
clientUserName
;
private
String
clientPassword
;
private
String
clientId
;
private
Boolean
cleanSession
;
private
String
bizClientId
;
private
String
[]
bizTopic
;
private
int
maxInflight
;
private
int
keepAliveInterval
;
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/entity/ESEquipments.java
0 → 100644
View file @
820d798e
package
com
.
yeejoin
.
equip
.
entity
;
import
io.github.classgraph.json.Id
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
import
org.springframework.data.elasticsearch.annotations.DateFormat
;
import
org.springframework.data.elasticsearch.annotations.Document
;
import
org.springframework.data.elasticsearch.annotations.Field
;
import
org.springframework.data.elasticsearch.annotations.FieldType
;
import
java.util.Date
;
/**
* @description:
* @author: LiuLin
* @createDate: 2023/09/18
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document
(
indexName
=
"jxiop_equipments"
)
public
class
ESEquipments
{
@Id
private
String
id
;
@Field
(
type
=
FieldType
.
Text
,
index
=
false
)
private
String
address
;
@Field
(
type
=
FieldType
.
Text
)
private
String
dataType
;
@Field
(
type
=
FieldType
.
Text
)
private
String
equipmentSpecificName
;
@Field
(
type
=
FieldType
.
Keyword
)
private
String
gatewayId
;
@Field
(
type
=
FieldType
.
Text
)
private
String
isAlarm
;
@Field
(
type
=
FieldType
.
Date
,
format
=
DateFormat
.
basic_date_time
,
index
=
false
)
private
Date
createdTime
;
@Field
(
type
=
FieldType
.
Text
,
index
=
false
)
private
String
unit
;
@Field
(
type
=
FieldType
.
Text
)
private
String
value
;
@Field
(
type
=
FieldType
.
Float
,
index
=
false
)
private
Float
valueF
;
@Field
(
type
=
FieldType
.
Text
)
private
String
valueLabel
;
@Field
(
type
=
FieldType
.
Text
,
index
=
false
)
private
String
traceId
;
@Field
(
type
=
FieldType
.
Keyword
)
private
String
equipmentIndexName
;
@Field
(
type
=
FieldType
.
Keyword
)
private
String
equipmentNumber
;
@Field
(
type
=
FieldType
.
Text
)
private
String
frontModule
;
@Field
(
type
=
FieldType
.
Text
)
private
String
systemType
;
@Field
(
type
=
FieldType
.
Text
,
index
=
false
)
private
String
pictureName
;
@Field
(
type
=
FieldType
.
Text
)
private
String
displayName
;
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/entity/IndicatorData.java
View file @
820d798e
package
com
.
yeejoin
.
equip
.
entity
;
import
com.yeejoin.equip.mqtt.message.MqttTopicEnum
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
import
java.util.Date
;
/**
...
...
@@ -24,6 +24,9 @@ public class IndicatorData {
private
String
equipmentIndexName
;
private
String
valueLabel
;
private
String
value
;
private
float
valueF
;
private
String
unit
;
private
String
signalType
;
private
Date
createdTime
;
private
MqttTopicEnum
mqttTopicEnum
;
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/eqmx/EmqMessageService.java
View file @
820d798e
package
com
.
yeejoin
.
equip
.
eqmx
;
import
com.alibaba.fastjson.JSON
;
import
com.yeejoin.equip.kafka.KafkaProducerService
;
import
lombok.extern.slf4j.Slf4j
;
import
net.sf.json.JSONObject
;
import
org.eclipse.paho.client.mqttv3.MqttMessage
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.stereotype.Component
;
import
org.typroject.tyboot.component.emq.EmqKeeper
;
import
org.typroject.tyboot.component.emq.EmqxListener
;
import
javax.annotation.PostConstruct
;
import
java.util.Arrays
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.LinkedBlockingQueue
;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote Emq消息转发Kafka
*/
@Slf4j
@Component
public
class
EmqMessageService
extends
EmqxListener
{
@Autowired
protected
EmqKeeper
emqKeeper
;
@Autowired
protected
KafkaProducerService
kafkaProducerService
;
@Value
(
"${emq.topic}"
)
private
String
emqTopic
;
@Value
(
"${kafka.topic}"
)
private
String
kafkaTopic
;
private
static
final
BlockingQueue
<
JSONObject
>
blockingQueue
=
new
LinkedBlockingQueue
<>();
@PostConstruct
void
init
()
throws
Exception
{
emqKeeper
.
subscript
(
emqTopic
,
1
,
this
);
}
@Override
public
void
processMessage
(
String
topic
,
MqttMessage
message
)
throws
Exception
{
JSONObject
result
=
JSONObject
.
fromObject
(
new
String
(
message
.
getPayload
()));
//JSONObject messageResult = new JSONObject();
//messageResult.put("result", result);
//messageResult.put("topic", topic);
//blockingQueue.add(messageResult);
if
(
topic
.
equals
(
emqTopic
))
{
kafkaProducerService
.
sendMessageAsync
(
kafkaTopic
,
JSON
.
toJSONString
(
result
));
}
}
//Runnable task_runnable = new Runnable() {
// public void run() {
// int k = 0;
// boolean b = true;
// while (b) {
// k++;
// b = k < Integer.MAX_VALUE;
// try {
// JSONObject messageResult = blockingQueue.take();
// JSONObject result = messageResult.getJSONObject("result");
// if ((messageResult.getString("topic")).equals(emqTopic)) {
// kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result));
// }
// } catch (Exception e) {
// Thread.currentThread().interrupt();
// }
// }
// }
//};
}
//
package com.yeejoin.equip.eqmx;
//
//
import com.alibaba.fastjson.JSON;
//
import com.yeejoin.equip.kafka.KafkaProducerService;
//
import lombok.extern.slf4j.Slf4j;
//
import net.sf.json.JSONObject;
//
import org.eclipse.paho.client.mqttv3.MqttMessage;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.beans.factory.annotation.Value;
//
import org.springframework.stereotype.Component;
//
import org.typroject.tyboot.component.emq.EmqKeeper;
//
import org.typroject.tyboot.component.emq.EmqxListener;
//
import javax.annotation.PostConstruct;
//
import java.util.Arrays;
//
import java.util.concurrent.BlockingQueue;
//
import java.util.concurrent.LinkedBlockingQueue;
//
/
//
**
//
* @author LiuLin
//
* @date 2023/6/25
//
* @apiNote Emq消息转发Kafka
//
*/
//
@Slf4j
//
@Component
//
public class EmqMessageService extends EmqxListener {
//
//
@Autowired
//
protected EmqKeeper emqKeeper;
//
//
@Autowired
//
protected KafkaProducerService kafkaProducerService;
//
//
@Value("${emq.topic}")
//
private String emqTopic;
//
//
@Value("${kafka.topic}")
//
private String kafkaTopic;
//
//
private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>();
//
//
@PostConstruct
//
void init() throws Exception {
//
emqKeeper.subscript(emqTopic, 1, this);
//
}
//
//
@Override
//
public void processMessage(String topic, MqttMessage message) throws Exception {
//
JSONObject result = JSONObject.fromObject(new String(message.getPayload()));
//
//JSONObject messageResult = new JSONObject();
//
//messageResult.put("result", result);
//
//messageResult.put("topic", topic);
//
//blockingQueue.add(messageResult);
//
//
if (topic.equals(emqTopic)) {
//
kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result));
//
}
//
}
//
//
//Runnable task_runnable = new Runnable() {
//
// public void run() {
//
// int k = 0;
//
// boolean b = true;
//
// while (b) {
//
// k++;
//
// b = k < Integer.MAX_VALUE;
//
// try {
//
// JSONObject messageResult = blockingQueue.take();
//
// JSONObject result = messageResult.getJSONObject("result");
//
// if ((messageResult.getString("topic")).equals(emqTopic)) {
//
// kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result));
//
// }
//
// } catch (Exception e) {
//
// Thread.currentThread().interrupt();
//
// }
//
// }
//
// }
//
//};
//
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/kafka/KafkaConsumerWithThread.java
View file @
820d798e
This diff is collapsed.
Click to expand it.
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/mapper/tdengine/ESEquipmentsMapper.java
0 → 100644
View file @
820d798e
package
com
.
yeejoin
.
equip
.
mapper
.
tdengine
;
import
com.yeejoin.equip.entity.ESEquipments
;
import
com.yeejoin.equip.entity.IndicatorData
;
import
org.springframework.stereotype.Component
;
import
java.util.List
;
/**
* @author CuiXi
* @version 1.0
* @Description:
* @date 2021/3/11 14:30
*/
@Component
public
interface
ESEquipmentsMapper
{
int
batchInsert
(
List
<
ESEquipments
>
esEquipmentsList
);
int
insert
(
ESEquipments
esEquipments
);
void
createTable
();
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/mapper/tdengine/IndicatorDataMapper.java
View file @
820d798e
...
...
@@ -16,8 +16,6 @@ public interface IndicatorDataMapper {
int
insert
(
IndicatorData
indicatorData
);
int
batchInsert
(
List
<
IndicatorData
>
indicatorDataList
);
void
createDB
();
void
createTable
();
...
...
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/mqtt/MessageIntegration.java
0 → 100644
View file @
820d798e
package
com
.
yeejoin
.
equip
.
mqtt
;
import
com.yeejoin.equip.config.MqttPropertyConfig
;
import
com.yeejoin.equip.entity.IndicatorData
;
import
com.yeejoin.equip.mqtt.message.MqttTopicEnum
;
import
com.yeejoin.equip.utils.ExecutorFactory
;
import
org.eclipse.paho.client.mqttv3.MqttConnectOptions
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.integration.annotation.ServiceActivator
;
import
org.springframework.integration.channel.DirectChannel
;
import
org.springframework.integration.dsl.IntegrationFlow
;
import
org.springframework.integration.dsl.IntegrationFlows
;
import
org.springframework.integration.endpoint.MessageProducerSupport
;
import
org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory
;
import
org.springframework.integration.mqtt.core.MqttPahoClientFactory
;
import
org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter
;
import
org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler
;
import
org.springframework.integration.mqtt.support.DefaultPahoMessageConverter
;
import
org.springframework.messaging.MessageChannel
;
import
org.springframework.messaging.MessageHandler
;
import
javax.annotation.Resource
;
import
java.util.Objects
;
import
static
com
.
yeejoin
.
equip
.
mqtt
.
message
.
MqttConstant
.*;
/**
* 消息处理器
*
* @author LiuLin
* @date 2023年08月18日 10:56
*/
@Configuration
public
class
MessageIntegration
{
@Resource
private
MqttPropertyConfig
mqttPropertyConfig
;
@Bean
public
MqttConnectOptions
mqttConnectOptions
()
{
MqttConnectOptions
options
=
new
MqttConnectOptions
();
options
.
setServerURIs
(
new
String
[]{
mqttPropertyConfig
.
getBroker
()});
options
.
setUserName
(
mqttPropertyConfig
.
getClientUserName
());
options
.
setPassword
(
mqttPropertyConfig
.
getClientPassword
().
toCharArray
());
options
.
setConnectionTimeout
(
DEFAULT_CONNECTION_TIMEOUT
);
// 设置心跳:1.5*20秒
options
.
setKeepAliveInterval
(
mqttPropertyConfig
.
getKeepAliveInterval
());
// 设置最大并发数
options
.
setMaxInflight
(
mqttPropertyConfig
.
getMaxInflight
());
options
.
setAutomaticReconnect
(
true
);
//options.setCleanSession(false);
return
options
;
}
@Bean
public
MqttPahoClientFactory
mqttClientFactory
()
{
DefaultMqttPahoClientFactory
factory
=
new
DefaultMqttPahoClientFactory
();
factory
.
setConnectionOptions
(
mqttConnectOptions
());
return
factory
;
}
@Bean
public
MessageProducerSupport
bizInbound
()
{
MqttPahoMessageDrivenChannelAdapter
adapter
=
new
MqttPahoMessageDrivenChannelAdapter
(
mqttPropertyConfig
.
getBizClientId
(),
mqttClientFactory
(),
mqttPropertyConfig
.
getBizTopic
()
);
adapter
.
setCompletionTimeout
(
DEFAULT_COMPLETION_TIMEOUT
);
adapter
.
setConverter
(
new
DefaultPahoMessageConverter
());
adapter
.
setQos
(
QOS_DEFAULT
);
return
adapter
;
}
@Bean
public
MessageChannel
mqttOutboundChannel
()
{
return
new
DirectChannel
();
}
@Bean
@ServiceActivator
(
inputChannel
=
"mqttOutboundChannel"
)
public
MessageHandler
mqttOutbound
()
{
MqttPahoMessageHandler
messageHandler
=
new
MqttPahoMessageHandler
(
mqttPropertyConfig
.
getClientId
(),
mqttClientFactory
()
);
messageHandler
.
setAsync
(
true
);
messageHandler
.
setDefaultQos
(
QOS_DEFAULT
);
return
messageHandler
;
}
@Bean
public
IntegrationFlow
bizMsgFlow
()
{
return
IntegrationFlows
.
from
(
bizInbound
())
.
channel
(
channels
->
channels
.
executor
(
ExecutorFactory
.
buildBizExecutor
()))
.
handle
(
MessageTransfer:
:
mqttMessage2RawMessage
)
//根据Topic后缀进行分流
.<
IndicatorData
,
MqttTopicEnum
>
route
(
IndicatorData:
:
getMqttTopicEnum
,
mapping
->
mapping
.
subFlowMapping
(
MqttTopicEnum
.
perspective
,
flow
->
flow
.
handle
(
"handleMessageService"
,
"processMessage"
)
.
filter
(
Objects:
:
nonNull
)
.
handle
(
mqttOutbound
()))
.
defaultOutputToParentFlow
())
.
get
();
}
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/mqtt/MessageTransfer.java
0 → 100644
View file @
820d798e
package
com
.
yeejoin
.
equip
.
mqtt
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONArray
;
import
com.alibaba.fastjson.JSONObject
;
import
com.yeejoin.equip.entity.EquipmentIndexVO
;
import
com.yeejoin.equip.entity.IndicatorData
;
import
com.yeejoin.equip.mqtt.message.MqttTopicEnum
;
import
com.yeejoin.equip.utils.RedisUtils
;
import
com.yeejoin.equip.utils.SpringUtils
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.lang3.ObjectUtils
;
import
org.springframework.integration.mqtt.support.MqttHeaders
;
import
org.springframework.stereotype.Component
;
import
java.util.Arrays
;
import
java.util.Map
;
import
static
com
.
yeejoin
.
equip
.
mqtt
.
message
.
MqttConstant
.*;
/**
* @author LiuLin
* @date 2023年07月13日 09:58
*/
@Slf4j
@Component
public
class
MessageTransfer
{
/**
* 转为原生数据,payload为字节数组
**/
public
static
IndicatorData
mqttMessage2RawMessage
(
String
payload
,
Map
<
String
,
Object
>
headers
)
{
log
.
info
(
"received raw message, header >>> {}, payload >>> {}"
,
headers
,
JSONObject
.
toJSONString
(
payload
));
RedisUtils
redisUtils
=
(
RedisUtils
)
SpringUtils
.
getBean
(
"redisUtils"
);
IndicatorData
indicatorData
=
JSON
.
parseObject
(
payload
,
IndicatorData
.
class
);
try
{
String
topic
=
headers
.
get
(
MqttHeaders
.
RECEIVED_TOPIC
).
toString
();
String
[]
topicItems
=
topic
.
split
(
TOPIC_SPLITTER
);
indicatorData
.
setMqttTopicEnum
(
MqttTopicEnum
.
of
(
topicItems
[
topicItems
.
length
-
1
]));
String
key
=
indicatorData
.
getAddress
()
+
"_"
+
indicatorData
.
getGatewayId
();
if
(
redisUtils
.
get
(
key
)!=
null
)
{
EquipmentIndexVO
equipmentSpeIndex
=
(
EquipmentIndexVO
)
redisUtils
.
get
(
key
);
String
valueLabel
=
valueTranslate
(
indicatorData
.
getValue
(),
equipmentSpeIndex
.
getValueEnum
());
indicatorData
.
setIsAlarm
(
String
.
valueOf
(
equipmentSpeIndex
.
getIsAlarm
()));
indicatorData
.
setEquipmentIndexName
(
equipmentSpeIndex
.
getEquipmentIndexName
());
indicatorData
.
setEquipmentSpecificName
(
equipmentSpeIndex
.
getEquipmentSpecificName
());
indicatorData
.
setUnit
(
equipmentSpeIndex
.
getUnitName
());
indicatorData
.
setEquipmentsIdx
(
key
);
indicatorData
.
setValueLabel
(
valueLabel
.
isEmpty
()
?
indicatorData
.
getValue
()
:
valueLabel
);
if
(!
Arrays
.
asList
(
TRUE
,
FALSE
).
contains
(
indicatorData
.
getValue
()))
{
indicatorData
.
setValueF
(
Float
.
parseFloat
(
indicatorData
.
getValue
()));
}
}
else
{
return
null
;
}
}
catch
(
Exception
e
)
{
log
.
error
(
"mqttMessage2RawMessage解析消息数据异常"
,
e
);
}
return
indicatorData
;
}
private
static
String
valueTranslate
(
String
value
,
String
enumStr
)
{
if
(
ObjectUtils
.
isEmpty
(
enumStr
))
{
return
""
;
}
try
{
JSONArray
jsonArray
=
JSONArray
.
parseArray
(
enumStr
);
for
(
int
i
=
0
;
i
<
jsonArray
.
size
();
i
++)
{
JSONObject
jsonObject
=
jsonArray
.
getJSONObject
(
i
);
if
(
jsonObject
.
get
(
"key"
).
equals
(
value
))
{
return
jsonObject
.
getString
(
"label"
);
}
}
}
catch
(
Exception
e
)
{
log
.
error
(
"告警枚举转换异常"
+
e
.
getMessage
(),
e
);
}
return
""
;
}
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/mqtt/message/MqttConstant.java
0 → 100644
View file @
820d798e
package
com
.
yeejoin
.
equip
.
mqtt
.
message
;
import
com.google.common.collect.ImmutableMap
;
import
java.util.Map
;
/**
* @author LiuLin
* @date 2023年08月02日 11:02
*/
public
interface
MqttConstant
{
int
DEFAULT_CONNECTION_TIMEOUT
=
5000
;
long
DEFAULT_COMPLETION_TIMEOUT
=
5000
;
int
QOS_DEFAULT
=
1
;
String
TOPIC_SPLITTER
=
"/"
;
String
TRUE
=
"true"
;
String
FALSE
=
"false"
;
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/mqtt/message/MqttTopicEnum.java
0 → 100644
View file @
820d798e
package
com
.
yeejoin
.
equip
.
mqtt
.
message
;
/**
* @author LiuLin
* @date 2023年07月13日 09:54
*/
public
enum
MqttTopicEnum
{
perspective
,
//iot/data/perspective
;
public
static
MqttTopicEnum
of
(
String
name
)
{
for
(
MqttTopicEnum
topic
:
values
())
{
if
(
topic
.
name
().
equals
(
name
))
{
return
topic
;
}
}
return
null
;
}
}
\ No newline at end of file
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/service/HandleESMessage2TDService.java
0 → 100644
View file @
820d798e
package
com
.
yeejoin
.
equip
.
service
;
import
com.alibaba.fastjson.JSON
;
import
com.baomidou.mybatisplus.core.toolkit.CollectionUtils
;
import
com.google.common.collect.Lists
;
import
com.yeejoin.equip.entity.ESEquipments
;
import
com.yeejoin.equip.mapper.tdengine.ESEquipmentsMapper
;
import
com.yeejoin.equip.utils.ElasticSearchUtil
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.scheduling.annotation.Scheduled
;
import
org.springframework.stereotype.Component
;
import
org.springframework.transaction.annotation.Transactional
;
import
java.util.List
;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote ES数据十分钟存入TDEngine
*/
@Slf4j
@Component
public
class
HandleESMessage2TDService
{
public
static
final
String
ES_INDEX
=
"jxiop_equipments"
;
public
static
final
int
SIZE
=
1000
;
@Autowired
private
ESEquipmentsMapper
esEquipmentsMapper
;
@Autowired
private
ElasticSearchUtil
elasticSearchUtil
;
/**
* 十分钟拉取ES数据存入TdEngine
*/
@Scheduled
(
cron
=
"0 */10 * * * ?"
)
@Transactional
(
rollbackFor
=
Exception
.
class
)
public
void
syncEsData2TDEngine
()
throws
Exception
{
List
<
ESEquipments
>
result
=
elasticSearchUtil
.
searchResponse
(
ES_INDEX
,
null
,
hit
->
JSON
.
parseObject
(
hit
.
getSourceAsString
(),
ESEquipments
.
class
));
List
<
List
<
ESEquipments
>>
allDataList
=
Lists
.
partition
(
result
,
SIZE
);
for
(
List
<
ESEquipments
>
tempDataList
:
allDataList
)
{
if
(
CollectionUtils
.
isNotEmpty
(
tempDataList
))
{
esEquipmentsMapper
.
batchInsert
(
tempDataList
);
}
}
}
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/service/HandleMessageService.java
0 → 100644
View file @
820d798e
package
com
.
yeejoin
.
equip
.
service
;
import
com.alibaba.fastjson.JSON
;
import
com.yeejoin.amos.component.influxdb.InfluxDbConnection
;
import
com.yeejoin.equip.entity.IndicatorData
;
import
com.yeejoin.equip.kafka.KafkaProducerService
;
import
com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper
;
import
com.yeejoin.equip.utils.ElasticSearchUtil
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.stereotype.Component
;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* @author LiuLin
* @date 2023/6/25
* @apiNote Emq消息转发Kafka
*/
@Slf4j
@Component
(
"handleMessageService"
)
public
class
HandleMessageService
{
private
static
final
String
MEASUREMENT
=
"iot_data_"
;
private
static
final
String
ES_INDEX_NAME_JX
=
"jxiop_equipments"
;
@Autowired
protected
KafkaProducerService
kafkaProducerService
;
@Autowired
private
InfluxDbConnection
influxDbConnection
;
@Autowired
private
IndicatorDataMapper
indicatorDataMapper
;
@Value
(
"${kafka.alarm.topic}"
)
private
String
alarmTopic
;
@Autowired
private
ElasticSearchUtil
elasticSearchUtil
;
public
void
processMessage
(
IndicatorData
indicatorData
)
{
try
{
SimpleDateFormat
simpleDateFormat
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
);
Map
<
String
,
String
>
tagsMap
=
new
HashMap
<>();
Map
<
String
,
Object
>
fieldsMap
=
new
HashMap
<>();
tagsMap
.
put
(
"equipmentsIdx"
,
indicatorData
.
getEquipmentsIdx
());
fieldsMap
.
put
(
"address"
,
indicatorData
.
getAddress
());
fieldsMap
.
put
(
"gatewayId"
,
indicatorData
.
getGatewayId
());
fieldsMap
.
put
(
"dataType"
,
indicatorData
.
getDataType
());
fieldsMap
.
put
(
"isAlarm"
,
indicatorData
.
getIsAlarm
());
fieldsMap
.
put
(
"equipmentSpecificName"
,
indicatorData
.
getEquipmentSpecificName
());
fieldsMap
.
put
(
"value"
,
indicatorData
.
getValue
());
fieldsMap
.
put
(
"valueLabel"
,
indicatorData
.
getValueLabel
().
isEmpty
()
?
indicatorData
.
getValue
()
:
indicatorData
.
getValueLabel
());
fieldsMap
.
put
(
"equipmentIndexName"
,
indicatorData
.
getEquipmentIndexName
());
fieldsMap
.
put
(
"unit"
,
indicatorData
.
getUnit
());
fieldsMap
.
put
(
"createdTime"
,
simpleDateFormat
.
format
(
new
Date
()));
if
(
"transformation"
.
equals
(
indicatorData
.
getSignalType
()))
{
influxDbConnection
.
insert
(
MEASUREMENT
+
indicatorData
.
getGatewayId
(),
tagsMap
,
fieldsMap
);
indicatorDataMapper
.
insert
(
indicatorData
);
}
//更新数据入ES库
Map
<
String
,
Object
>
paramJson
=
new
HashMap
<>();
paramJson
.
put
(
"valueF"
,
indicatorData
.
getValueF
());
paramJson
.
put
(
"value"
,
indicatorData
.
getValue
());
paramJson
.
put
(
"valueLabel"
,
indicatorData
.
getValueLabel
().
isEmpty
()
?
indicatorData
.
getValue
()
:
indicatorData
.
getValueLabel
());
paramJson
.
put
(
"createdTime"
,
new
Date
());
paramJson
.
put
(
"unit"
,
indicatorData
.
getUnit
());
elasticSearchUtil
.
updateData
(
ES_INDEX_NAME_JX
,
indicatorData
.
getEquipmentsIdx
(),
JSON
.
toJSONString
(
paramJson
));
if
(
indicatorData
.
getIsAlarm
()
!=
null
&&
"1"
.
equals
(
indicatorData
.
getIsAlarm
()))
{
fieldsMap
.
putAll
(
tagsMap
);
kafkaProducerService
.
sendMessageAsync
(
alarmTopic
,
JSON
.
toJSONString
(
fieldsMap
));
}
}
catch
(
Exception
e
)
{
log
.
error
(
"Iot透传消息解析入库失败"
+
e
.
getMessage
(),
e
);
}
}
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/service/
impl/KafkaMessageServiceImpl
.java
→
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/service/
InitTDEngineDbService
.java
View file @
820d798e
package
com
.
yeejoin
.
equip
.
service
.
impl
;
package
com
.
yeejoin
.
equip
.
service
;
import
com.yeejoin.equip.mapper.tdengine.ESEquipmentsMapper
;
import
com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper
;
import
com.yeejoin.equip.service.KafkaMessageService
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Transactional
;
import
java.util.Map
;
/**
* @author LiuLin
* @date 2023年07月12日 10:44
...
...
@@ -16,13 +14,17 @@ import java.util.Map;
@Slf4j
@Service
@Transactional
(
transactionManager
=
"tdEngineTransactionManager"
)
public
class
KafkaMessageServiceImpl
implements
KafkaMessage
Service
{
public
class
InitTDEngineDb
Service
{
@Autowired
private
IndicatorDataMapper
indicatorDataMapper
;
@Autowired
private
ESEquipmentsMapper
esEquipmentsMapper
;
public
void
init
()
{
indicatorDataMapper
.
createDB
();
indicatorDataMapper
.
createTable
();
esEquipmentsMapper
.
createTable
();
}
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/service/KafkaMessageService.java
deleted
100644 → 0
View file @
190fcbc8
package
com
.
yeejoin
.
equip
.
service
;
/**
* @author LiuLin
* @date 2023年07月12日 10:44
*/
public
interface
KafkaMessageService
{
void
init
();
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/utils/ElasticSearchClient.java
0 → 100644
View file @
820d798e
package
com
.
yeejoin
.
equip
.
utils
;
import
com.yeejoin.equip.config.ElasticSearchConfig
;
import
lombok.extern.slf4j.Slf4j
;
import
org.elasticsearch.client.RestHighLevelClient
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.elasticsearch.client.ClientConfiguration
;
import
org.springframework.data.elasticsearch.client.RestClients
;
import
java.time.Duration
;
import
java.util.Objects
;
/**
* ElasticSearch 配置
* @author LiuLin
* @date 2023年08月08日 16:30
*/
@Slf4j
@Configuration
public
class
ElasticSearchClient
{
@Autowired
(
required
=
false
)
private
ElasticSearchConfig
elasticSearchConfig
;
@Bean
public
RestHighLevelClient
client
()
{
ClientConfiguration
clientConfiguration
=
null
;
try
{
clientConfiguration
=
ClientConfiguration
.
builder
()
.
connectedTo
(
elasticSearchConfig
.
getAddress
())
.
withConnectTimeout
(
Duration
.
ofSeconds
(
5
))
.
withSocketTimeout
(
Duration
.
ofSeconds
(
3
))
.
withSocketTimeout
(
elasticSearchConfig
.
getSocketTimeout
())
.
withConnectTimeout
(
elasticSearchConfig
.
getConnectTimeout
())
.
withBasicAuth
(
elasticSearchConfig
.
getUsername
(),
elasticSearchConfig
.
getPassword
())
.
build
();
}
catch
(
Exception
e
){
log
.
error
(
"连接ES异常"
+
e
.
getMessage
(),
e
);
}
return
RestClients
.
create
(
Objects
.
requireNonNull
(
clientConfiguration
)).
rest
();
}
}
\ No newline at end of file
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/utils/ElasticSearchConfig.java
deleted
100644 → 0
View file @
190fcbc8
package
com
.
yeejoin
.
equip
.
utils
;
import
org.elasticsearch.client.RestHighLevelClient
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.elasticsearch.client.ClientConfiguration
;
import
org.springframework.data.elasticsearch.client.RestClients
;
import
java.time.Duration
;
/**
* \* Created with IntelliJ IDEA.
* \* User: 煦仔
* \* Date: 2020-12-22
* \* Time: 9:40
* \* To change this template use File | Settings | File Templates.
* \* Description:
* \
*/
@Configuration
public
class
ElasticSearchConfig
{
@Autowired
(
required
=
false
)
private
ElasticSearchRuntimeEnvironment
esRuntimeEnvironment
;
//@Bean
////当前es相关的配置存在则实例化RestHighLevelClient,如果不存在则不实例化RestHighLevelClient
//@ConditionalOnBean(value = ElasticSearchRuntimeEnvironment.class)
//public RestHighLevelClient restHighLevelClient(){
//
// //es地址,以逗号分隔
// String nodes = esRuntimeEnvironment.getAddress();
// nodes = nodes.contains("http://") ? nodes.replace("http://","") : nodes;
// //es密码
// String password = esRuntimeEnvironment.getPassword();
// String scheme = esRuntimeEnvironment.getScheme();
// List<HttpHost> httpHostList = new ArrayList<>();
// //拆分es地址
// for(String address : nodes.split(",")){
// int index = address.lastIndexOf(":");
// httpHostList.add(new HttpHost(address.substring(0, index),Integer.parseInt(address.substring(index + 1)),scheme));
// }
// //转换成 HttpHost 数组
// HttpHost[] httpHosts = httpHostList.toArray(new HttpHost[0]);
//
// //构建连接对象
// RestClientBuilder builder = RestClient.builder(httpHosts);
//
// //使用账号密码连接
// if ( StringUtils.isNotEmpty(password)){
// String username = esRuntimeEnvironment.getUsername();
// CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(username,password));
// builder.setHttpClientConfigCallback(f->f.setDefaultCredentialsProvider(credentialsProvider));
// }
//
// // 异步连接延时配置
// builder.setRequestConfigCallback(requestConfigBuilder -> {
// requestConfigBuilder.setConnectTimeout(esRuntimeEnvironment.getConnectTimeout());
// requestConfigBuilder.setSocketTimeout(esRuntimeEnvironment.getSocketTimeout());
// requestConfigBuilder.setConnectionRequestTimeout(esRuntimeEnvironment.getConnectionRequestTimeout());
// return requestConfigBuilder;
// });
//
// // 异步连接数配置
// builder.setHttpClientConfigCallback(httpClientBuilder -> {
// httpClientBuilder.setMaxConnTotal(esRuntimeEnvironment.getMaxConnectNum());
// httpClientBuilder.setMaxConnPerRoute(esRuntimeEnvironment.getMaxConnectPerRoute());
// return httpClientBuilder;
// });
//
// return new RestHighLevelClient(builder);
//}
@Bean
public
RestHighLevelClient
client
()
{
ClientConfiguration
clientConfiguration
=
null
;
try
{
clientConfiguration
=
ClientConfiguration
.
builder
()
.
connectedTo
(
esRuntimeEnvironment
.
getAddress
())
.
withConnectTimeout
(
Duration
.
ofSeconds
(
5
))
.
withSocketTimeout
(
Duration
.
ofSeconds
(
3
))
.
withSocketTimeout
(
esRuntimeEnvironment
.
getSocketTimeout
())
.
withConnectTimeout
(
esRuntimeEnvironment
.
getConnectTimeout
())
.
withBasicAuth
(
esRuntimeEnvironment
.
getUsername
(),
esRuntimeEnvironment
.
getPassword
())
.
build
();
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
assert
clientConfiguration
!=
null
;
return
RestClients
.
create
(
clientConfiguration
).
rest
();
}
}
\ No newline at end of file
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/utils/ElasticSearchUtil.java
View file @
820d798e
package
com
.
yeejoin
.
equip
.
utils
;
import
com.alibaba.fastjson.JSON
;
import
com.yeejoin.equip.entity.ESEquipments
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.lang3.ArrayUtils
;
import
org.elasticsearch.action.search.ClearScrollRequest
;
import
org.elasticsearch.action.search.SearchRequest
;
import
org.elasticsearch.action.search.SearchResponse
;
import
org.elasticsearch.action.search.SearchScrollRequest
;
import
org.elasticsearch.action.support.WriteRequest
;
import
org.elasticsearch.action.update.UpdateRequest
;
import
org.elasticsearch.client.RequestOptions
;
import
org.elasticsearch.client.RestHighLevelClient
;
import
org.elasticsearch.common.xcontent.XContentType
;
import
org.elasticsearch.core.TimeValue
;
import
org.elasticsearch.index.query.QueryBuilder
;
import
org.elasticsearch.search.Scroll
;
import
org.elasticsearch.search.SearchHit
;
import
org.elasticsearch.search.builder.SearchSourceBuilder
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.function.Function
;
/**
* @author LiuLin
...
...
@@ -17,6 +32,8 @@ import java.io.IOException;
@Slf4j
@Component
public
class
ElasticSearchUtil
{
private
static
final
long
SCROLL_TIMEOUT
=
180000
;
private
static
final
int
SIZE
=
1000
;
@Autowired
private
RestHighLevelClient
restHighLevelClient
;
...
...
@@ -40,4 +57,54 @@ public class ElasticSearchUtil {
log
.
error
(
"索引:[{}],主键:【{}】,更新异常:[{}]"
,
indexName
,
id
,
e
.
getMessage
());
}
}
/**
* 构建SearchResponse
* @param indices 索引
* @param query queryBuilder
* @param fun 返回函数
* @param <T> 返回类型
* @return List, 可以使用fun转换为T结果
* @throws Exception e
*/
public
<
T
>
List
<
T
>
searchResponse
(
String
indices
,
QueryBuilder
query
,
Function
<
SearchHit
,
T
>
fun
)
throws
Exception
{
SearchRequest
request
=
new
SearchRequest
(
indices
);
Scroll
scroll
=
new
Scroll
(
TimeValue
.
timeValueMillis
(
SCROLL_TIMEOUT
));
SearchSourceBuilder
sourceBuilder
=
new
SearchSourceBuilder
();
sourceBuilder
.
query
(
query
);
sourceBuilder
.
size
(
SIZE
);
request
.
scroll
(
scroll
);
request
.
source
(
sourceBuilder
);
List
<
String
>
scrollIdList
=
new
ArrayList
<>();
List
<
T
>
result
=
new
ArrayList
<>();
SearchResponse
searchResponse
=
restHighLevelClient
.
search
(
request
,
RequestOptions
.
DEFAULT
);
String
scrollId
=
searchResponse
.
getScrollId
();
SearchHit
[]
hits
=
searchResponse
.
getHits
().
getHits
();
scrollIdList
.
add
(
scrollId
);
try
{
while
(
ArrayUtils
.
isNotEmpty
(
hits
))
{
for
(
SearchHit
hit
:
hits
)
{
result
.
add
(
fun
.
apply
(
hit
));
}
if
(
hits
.
length
<
SIZE
)
{
break
;
}
SearchScrollRequest
searchScrollRequest
=
new
SearchScrollRequest
(
scrollId
);
searchScrollRequest
.
scroll
(
scroll
);
SearchResponse
searchScrollResponse
=
restHighLevelClient
.
scroll
(
searchScrollRequest
,
RequestOptions
.
DEFAULT
);
scrollId
=
searchScrollResponse
.
getScrollId
();
hits
=
searchScrollResponse
.
getHits
().
getHits
();
scrollIdList
.
add
(
scrollId
);
}
}
finally
{
ClearScrollRequest
clearScrollRequest
=
new
ClearScrollRequest
();
clearScrollRequest
.
setScrollIds
(
scrollIdList
);
restHighLevelClient
.
clearScroll
(
clearScrollRequest
,
RequestOptions
.
DEFAULT
);
}
return
result
;
}
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/utils/ExecutorFactory.java
0 → 100644
View file @
820d798e
package
com
.
yeejoin
.
equip
.
utils
;
import
com.google.common.util.concurrent.ThreadFactoryBuilder
;
import
java.util.concurrent.Executor
;
import
java.util.concurrent.LinkedBlockingDeque
;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
/**
* @author LiuLin
* @date 2023年07月12日 18:11
*/
public
class
ExecutorFactory
{
private
static
final
Integer
DEFAULT_THREAD_NUM
=
Runtime
.
getRuntime
().
availableProcessors
();
private
static
final
Integer
THREAD_NUM_BIZ
=
DEFAULT_THREAD_NUM
*
2
;
public
static
Executor
buildBizExecutor
()
{
return
new
ThreadPoolExecutor
(
THREAD_NUM_BIZ
,
THREAD_NUM_BIZ
*
2
,
0L
,
TimeUnit
.
MILLISECONDS
,
new
LinkedBlockingDeque
<>(
1024
),
new
ThreadFactoryBuilder
().
setNameFormat
(
"mqtt-pool-%d"
).
build
()
);
}
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/utils/RedisKey.java
deleted
100644 → 0
View file @
190fcbc8
package
com
.
yeejoin
.
equip
.
utils
;
/**
* @description:
* @author: tw
* @createDate: 2021/6/22
* redis key
*/
public
class
RedisKey
{
/**
* 装备指标Key值
*/
public
static
final
String
EQUIP_INDEX_ADDRESS
=
"equip_index_address"
;
public
static
final
String
EQUIP_INDEX_ADDRESS_KEY
=
"equip_index_address_key"
;
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/utils/RedisUtils.java
View file @
820d798e
...
...
@@ -3,54 +3,21 @@ package com.yeejoin.equip.utils;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
org.springframework.stereotype.Component
;
import
org.springframework.util.CollectionUtils
;
import
java.util.Map
;
/**
* @author
DELL
* @author
LiuLin
*/
@Component
public
class
RedisUtils
{
@Autowired
private
RedisTemplate
redisTemplate
;
/**
*
获取hashKey对应的所有键值
*
普通缓存获取
*
* @param key 键
* @return 对应的多个键值
*/
public
Map
<
Object
,
Object
>
hmget
(
String
key
)
{
return
redisTemplate
.
opsForHash
().
entries
(
key
);
}
/**
* HashSet
*
* @param key 键
* @param map 对应多个键值
* @return true 成功 false 失败
*/
public
boolean
hmset
(
String
key
,
Map
<
String
,
Object
>
map
)
{
redisTemplate
.
opsForHash
().
putAll
(
key
,
map
);
return
true
;
}
/**
* 删除缓存
*
* @param key 可以传一个值 或多个
* @return 值
*/
@SuppressWarnings
(
"unchecked"
)
public
void
del
(
String
...
key
)
{
if
(
key
!=
null
&&
key
.
length
>
0
)
{
if
(
key
.
length
==
1
)
{
redisTemplate
.
delete
(
key
[
0
]);
}
else
{
redisTemplate
.
delete
(
CollectionUtils
.
arrayToList
(
key
));
}
}
public
Object
get
(
String
key
)
{
return
redisTemplate
.
opsForValue
().
get
(
key
);
}
}
amos-boot-data/amos-boot-data-equip/src/main/java/com/yeejoin/equip/utils/SpringUtils.java
0 → 100644
View file @
820d798e
package
com
.
yeejoin
.
equip
.
utils
;
import
org.springframework.beans.BeansException
;
import
org.springframework.context.ApplicationContext
;
import
org.springframework.context.ApplicationContextAware
;
import
org.springframework.stereotype.Component
;
import
java.lang.annotation.Annotation
;
import
java.util.Map
;
/**
* @description: spring容器操作工具类
* @author: duanwei
* @create: 2020-05-28 13:57
**/
@Component
public
class
SpringUtils
implements
ApplicationContextAware
{
private
static
ApplicationContext
applicationContext
;
/**
* 利用aware注入application
*
* @param applicationContext
* @throws BeansException
*/
@Override
public
void
setApplicationContext
(
ApplicationContext
applicationContext
)
throws
BeansException
{
SpringUtils
.
applicationContext
=
applicationContext
;
}
private
static
ApplicationContext
getApplicationContext
()
{
return
applicationContext
;
}
/**
* 通过name获取bean
*
* @param name
* @return
*/
public
static
Object
getBean
(
String
name
)
{
return
getApplicationContext
().
getBean
(
name
);
}
}
amos-boot-data/amos-boot-data-equip/src/main/resources/application-dev.properties
View file @
820d798e
#spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
#spring.datasource.url = jdbc:mysql://139.9.173.44:3306/equipment?useUnicode=true&allowMultiQueries=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
#spring.datasource.username=root
#spring.datasource.password=Yeejoin@2020
#spring.datasource.type=com.zaxxer.hikari.HikariDataSource
#spring.datasource.hikari.pool-name=DatebookHikariCP
#spring.datasource.hikari.minimum-idle= 3
#spring.datasource.hikari.maximum-pool-size= 30
#spring.datasource.hikari.auto-commit= true
#spring.datasource.hikari.idle-timeout= 500000
#spring.datasource.hikari.max-lifetime= 1800000
#spring.datasource.hikari.connection-timeout= 60000
#spring.datasource.hikari.connection-test-query= SELECT 1
#mysql ???
spring.datasource.mysql-server.driver-class-name
=
com.mysql.cj.jdbc.Driver
spring.datasource.mysql-server.jdbc-url
=
jdbc:mysql://139.9.173.44:3306/equipment?useUnicode=true&allowMultiQueries=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
spring.datasource.mysql-server.username
=
root
spring.datasource.mysql-server.password
=
Yeejoin@2020
spring.datasource.mysql-server.type
=
com.zaxxer.hikari.HikariDataSource
spring.datasource.mysql-server.hikari.pool-name
=
DatebookHikariCP
spring.datasource.mysql-server.hikari.minimum-idle
=
3
spring.datasource.mysql-server.hikari.maximum-pool-size
=
30
spring.datasource.mysql-server.hikari.auto-commit
=
true
spring.datasource.mysql-server.hikari.idle-timeout
=
500000
spring.datasource.mysql-server.hikari.max-lifetime
=
1800000
spring.datasource.mysql-server.hikari.connection-timeout
=
60000
spring.datasource.mysql-server.hikari.connection-test-query
=
SELECT 1
#TDengine ???
spring.datasource.tdengine-server.driver-class-name
=
com.taosdata.jdbc.rs.RestfulDriver
spring.datasource.tdengine-server.jdbc-url
=
jdbc:TAOS-RS://139.9.170.47:6041/iot_data?user=root&password=taosdata&timezone=GMT%2b8&allowMultiQueries=true
spring.datasource.tdengine-server.username
=
root
spring.datasource.tdengine-server.password
=
taosdata
spring.redis.database
=
1
spring.redis.host
=
172.16.11.201
...
...
@@ -39,9 +45,13 @@ spring.security.user.password=a1234560
## emqx
emqx.clean-session
=
true
emqx.client-id
=
${spring.application.name}-${random.int[1024,65536]}
emqx.biz-client-id
=
consumer-${random.int[1024,65536]}
emqx.broker
=
tcp://172.16.3.157:1883
emqx.user-name
=
admin
emqx.password
=
public
emqx.client-user-name
=
admin
emqx.client-password
=
public
emqx.max-inflight
=
100
emqx.keep-alive-interval
=
100
emqx.biz-topic[0]=
iot/data/perspective
# influxDB
spring.influx.url
=
http://172.16.11.201:8086
...
...
@@ -73,8 +83,6 @@ spring.kafka.consumer.max-poll-records=1000
spring.kafka.listener.ack-mode
=
manual_immediate
spring.kafka.listener.type
=
batch
kafka.topic
=
PERSPECTIVE
emq.topic
=
iot/data/perspective
kafka.alarm.topic
=
EQUIPMENT_ALARM
elasticsearch.address
=
139.9.173.44:9200
...
...
amos-boot-data/amos-boot-data-equip/src/main/resources/mapper/tdengine/ESEquipmentsMapper.xml
0 → 100644
View file @
820d798e
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper
namespace=
"com.yeejoin.equip.mapper.tdengine.ESEquipmentsMapper"
>
<insert
id=
"batchInsert"
parameterType=
"java.util.List"
>
insert into indicator_data
(createdTime,
id,
address,
gateway_id,
data_type,
is_alarm,
unit,
equipment_index_name,
equipment_specific_name,
`value`,
`value_f` ,
value_label,
equipment_number,
display_name)
values
<foreach
separator=
" "
collection=
"list"
item=
"equip"
index=
"index"
>
(now + #{index}a,
#{equip.id},
#{equip.address},
#{equip.gatewayId},
#{equip.dataType},
#{equip.isAlarm},
#{equip.unit},
#{equip.equipmentIndexName},
#{equip.equipmentSpecificName},
#{equip.value},
#{equip.valueF},
#{equip.valueLabel},
#{equip.equipmentNumber},
#{equip.displayName})
</foreach>
</insert>
<insert
id=
"insert"
parameterType=
"com.yeejoin.equip.entity.ESEquipments"
>
insert into indicator_data
(createdTime,
id,
address,
gateway_id,
data_type,
is_alarm,
unit,
equipment_index_name,
equipment_specific_name,
`value`,
`value_f` ,
value_label,
equipment_number,
display_name)
values
(NOW,
#{id},
#{address},
#{gatewayId},
#{dataType},
#{isAlarm},
#{unit},
#{equipmentIndexName},
#{equipmentSpecificName},
#{value},
#{valueF},
#{valueLabel},
#{equipmentNumber},
#{displayName})
</insert>
<!--创建表-->
<update
id=
"createTable"
>
create table if not exists indicator_data
(createdTime timestamp,
id binary(64),
address binary(64),
gateway_id binary(64),
data_type NCHAR(12),
is_alarm BIGINT,
unit NCHAR(24),
equipment_index_name VARCHAR(255) ,
equipment_specific_name VARCHAR(255),
`value` VARCHAR(12),
`value_f` float,
value_label VARCHAR(64),
equipment_number binary(64),
display_name VARCHAR(200));
</update>
</mapper>
\ No newline at end of file
amos-boot-data/amos-boot-data-equip/src/main/resources/mapper/tdengine/IndicatorDataMapper.xml
View file @
820d798e
...
...
@@ -2,30 +2,41 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper
namespace=
"com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper"
>
<insert
id=
"insert"
parameterType=
"com.yeejoin.equip.entity.IndicatorData"
>
insert into test.indicator_data_1
(address, gateway_Id, equipments_idx, data_type,is_alarm,equipment_specific_name,equipment_index_name,
value_label,value1,unit,created_time)
values
(#{address,jdbcType=VARCHAR},
#{gatewayId,jdbcType=VARCHAR},
#{equipmentsIdx,jdbcType=VARCHAR},
#{dataType,jdbcType=VARCHAR},
#{isAlarm,jdbcType=VARCHAR},
#{equipmentSpecificName,jdbcType=VARCHAR},
#{equipmentIndexName,jdbcType=VARCHAR},
#{valueLabel,jdbcType=VARCHAR},
#{value,jdbcType=VARCHAR},
#{unit,jdbcType=VARCHAR},
now)
</insert>
<!--创建数据库,指定压缩比-->
<update
id=
"createDB"
>
create database if not exists
test keep 730
;
create database if not exists
iot_data vgroups 10 buffer 10 COMP 2 PRECISION 'ns'
;
</update>
<!--创建超级表-->
<update
id=
"createTable"
>
create table if not exists test.indicator_data(created_time timestamp, address NCHAR(64),gateway_id NCHAR(64),equipments_idx NCHAR(64), data_type NCHAR(12),is_alarm BIGINT,
value_label VARCHAR(24), unit NCHAR(12),equipment_index_name VARCHAR(200) ,equipment_specific_name VARCHAR(200),valueE VARCHAR(12));
CREATE STABLE if not exists indicator
(created_time timestamp,
`value` VARCHAR(12),
`value_f` float,
value_label VARCHAR(24),
unit NCHAR(12))
TAGS (address binary(64),
gateway_id binary(64),
equipments_idx NCHAR(64),
data_type NCHAR(12),
is_alarm BIGINT,
equipment_index_name VARCHAR(200) ,
equipment_specific_name VARCHAR(200));
</update>
<insert
id=
"insert"
parameterType=
"com.yeejoin.equip.entity.IndicatorData"
>
insert into indicator_#{gatewayId,jdbcType=VARCHAR} USING indicator
TAGS (#{address,jdbcType=VARCHAR},
#{gatewayId,jdbcType=VARCHAR},
#{equipmentsIdx,jdbcType=VARCHAR},
#{dataType,jdbcType=VARCHAR},
#{isAlarm,jdbcType=VARCHAR},
#{equipmentSpecificName,jdbcType=VARCHAR},
#{equipmentIndexName,jdbcType=VARCHAR})
VALUES (NOW,
#{value,jdbcType=VARCHAR},
#{valueF,jdbcType=FLOAT},
#{valueLabel,jdbcType=VARCHAR},
#{unit,jdbcType=VARCHAR})
</insert>
</mapper>
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment