Commit 1de73fd0 authored by 刘林's avatar 刘林

fix(equip):优化对接IOT代码,优化esclient

parent f0a9df5e
......@@ -2,14 +2,17 @@ package com.yeejoin.equip.utils;
import com.yeejoin.equip.config.ElasticSearchConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import java.time.Duration;
import java.util.Objects;
/**
* ElasticSearch 配置
......@@ -23,21 +26,39 @@ public class ElasticSearchClient {
@Autowired(required = false)
private ElasticSearchConfig elasticSearchConfig;
@Bean
public RestHighLevelClient client() {
ClientConfiguration clientConfiguration = null;
try {
clientConfiguration = ClientConfiguration.builder()
.connectedTo(elasticSearchConfig.getAddress())
.withConnectTimeout(Duration.ofSeconds(5))
.withSocketTimeout(Duration.ofSeconds(3))
.withSocketTimeout(elasticSearchConfig.getSocketTimeout())
.withConnectTimeout(elasticSearchConfig.getConnectTimeout())
.withBasicAuth(elasticSearchConfig.getUsername(), elasticSearchConfig.getPassword())
.build();
} catch (Exception e) {
log.error("连接ES异常" + e.getMessage(), e);
}
return RestClients.create(Objects.requireNonNull(clientConfiguration)).rest();
/**
* 如果@Bean没有指定bean的名称,那么这个bean的名称就是方法名
*/
@Bean(name = "restHighLevelClient")
public RestHighLevelClient restHighLevelClient() {
// 此处为单节点es
String host = elasticSearchConfig.getAddress().split(":")[0];
String port = elasticSearchConfig.getAddress().split(":")[1];
HttpHost httpHost = new HttpHost(host, Integer.parseInt(port));
// 构建连接对象
RestClientBuilder builder = RestClient.builder(httpHost);
// 设置用户名、密码
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticSearchConfig.getUsername(), elasticSearchConfig.getPassword()));
// 连接延时配置
builder.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(elasticSearchConfig.getConnectTimeout());
requestConfigBuilder.setSocketTimeout(elasticSearchConfig.getSocketTimeout());
requestConfigBuilder.setConnectionRequestTimeout(elasticSearchConfig.getConnectionRequestTimeout());
return requestConfigBuilder;
});
// 连接数配置
builder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setMaxConnTotal(elasticSearchConfig.getMaxConnectNum());
httpClientBuilder.setMaxConnPerRoute(elasticSearchConfig.getMaxConnectPerRoute());
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpClientBuilder;
});
return new RestHighLevelClient(builder);
}
}
\ No newline at end of file
......@@ -2,6 +2,7 @@ package com.yeejoin.equip.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
......@@ -70,13 +71,39 @@ public class ElasticSearchUtil {
return true;
}
} catch (IOException e) {
log.error("索引:[{}],主键:【{}】,更新异常:[{}]", indexName, id, e);
log.error("索引:[{}],主键:【{}】", indexName, id, e);
return false;
}
return false;
}
/**
* ES异步修改数据
*
* @param indexName 索引名称
* @param id 主键
* @param paramJson 参数JSON
*/
public void updateDataAsync(String indexName, String id, String paramJson) throws IOException {
UpdateRequest updateRequest = new UpdateRequest(indexName, id);
updateRequest.docAsUpsert(true);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(paramJson, XContentType.JSON);
restHighLevelClient.updateAsync(updateRequest, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
log.info("索引:【{}】,主键:【{}】修改成功", indexName, id);
}
}
@Override
public void onFailure(Exception e) {
log.error("索引:[{}],主键:【{}】", indexName, id, e);
}
});
}
/**
* 构建SearchResponse
* @param indices 索引
* @param query queryBuilder
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment