Commit f2ec1c19 authored by LiuLin's avatar LiuLin

fix(equip):装备服务修改ES,java.net.SocketTimeoutException: 50,000 milliseconds timeout…

fix(equip):装备服务修改ES,java.net.SocketTimeoutException: 50,000 milliseconds timeout on connection http-outgoing-518 [ACTIVE]
parent 1337a189
......@@ -36,19 +36,19 @@ import java.util.stream.StreamSupport;
public class KafkaConsumerWorker implements CommandLineRunner {
final private static AtomicLong sendThreadPoolCounter = new AtomicLong(0);
final public static ExecutorService pooledExecutor =
Executors.newFixedThreadPool(100 + Runtime.getRuntime().availableProcessors(),
Executors.newFixedThreadPool(200 + Runtime.getRuntime().availableProcessors(),
createThreadFactory());
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = 6 * CPU_COUNT;
private static final int MAX_POOL_SIZE = 6 * CPU_COUNT + 2;
private static final ThreadPoolExecutor exec = new ThreadPoolExecutor(
/* private static final ThreadPoolExecutor exec = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000)
);
);*/
private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
private static final String TRANSFORMATION = "transformation";
......@@ -169,7 +169,7 @@ public class KafkaConsumerWorker implements CommandLineRunner {
public void run() {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
exec.submit(() -> {
pooledExecutor.submit(() -> {
processRecord(records);
});
kafkaConsumer.commitSync();
......
package com.yeejoin.equip.utils;
import org.apache.http.HttpResponse;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import java.util.Arrays;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
public class CustomConnectionKeepAliveStrategy implements ConnectionKeepAliveStrategy {
private final long DEFAULT_SECONDS = 30;
public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();
@Override
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
return Arrays.stream(response.getHeaders(HTTP.CONN_KEEP_ALIVE))
.filter(h -> StringUtils.equalsIgnoreCase(h.getName(), "timeout")
&& StringUtils.isNumeric(h.getValue()))
.findFirst()
.map(h -> NumberUtils.toLong(h.getValue(), DEFAULT_SECONDS))
.orElse(DEFAULT_SECONDS) * 1000;
}
}
\ No newline at end of file
......@@ -56,6 +56,8 @@ public class ElasticSearchClient {
httpClientBuilder.setMaxConnTotal(elasticSearchConfig.getMaxConnectNum());
httpClientBuilder.setMaxConnPerRoute(elasticSearchConfig.getMaxConnectPerRoute());
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//调整Keep-Alive策略
httpClientBuilder.setKeepAliveStrategy(CustomConnectionKeepAliveStrategy.INSTANCE);
return httpClientBuilder;
});
......
//package com.yeejoin.equip.utils;
//
//import com.yeejoin.equip.config.ElasticSearchConfig;
//import lombok.RequiredArgsConstructor;
//import org.apache.http.HttpHost;
//import org.apache.http.auth.AuthScope;
//import org.apache.http.auth.UsernamePasswordCredentials;
//import org.apache.http.client.CredentialsProvider;
//import org.apache.http.impl.client.BasicCredentialsProvider;
//import org.elasticsearch.client.RestClient;
//import org.elasticsearch.client.RestClientBuilder;
//import org.elasticsearch.client.RestHighLevelClient;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
//import org.springframework.boot.context.properties.EnableConfigurationProperties;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.util.Assert;
//import org.springframework.util.StringUtils;
//import java.util.ArrayList;
//import java.util.List;
//
///**
// * Elasticsearch自动配置
// *
// * @author LiuLin
// * @version v1.0
// * @since 2024-01-15
// */
//@Configuration
//@RequiredArgsConstructor(onConstructor_ = @Autowired)
//@EnableConfigurationProperties(ElasticSearchConfig.class)
//public class ElasticsearchAutoConfiguration {
//
// private final ElasticSearchConfig elasticSearchConfig;
// private final List<HttpHost> httpHosts = new ArrayList<>();
//
// @Bean
// @ConditionalOnMissingBean
// public RestHighLevelClient restHighLevelClient() {
//
// List<String> clusterNodes = elasticSearchConfig.getClusterNodes();
// clusterNodes.forEach(node -> {
// try {
// String[] parts = StringUtils.split(node, ":");
// Assert.notNull(parts, "Must defined");
// Assert.state(parts.length == 2, "Must be defined as 'host:port'");
// httpHosts.add(new HttpHost(parts[0], Integer.parseInt(parts[1]), elasticSearchConfig.getSchema()));
// } catch (Exception e) {
// throw new IllegalStateException("Invalid ES nodes " + "property '" + node + "'", e);
// }
// });
// RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));
//
// return getRestHighLevelClient(builder, elasticSearchConfig);
// }
//
//
// /**
// * get restHistLevelClient
// *
// * @param builder RestClientBuilder
// * @param elasticSearchConfig elasticsearch default properties
// * @return {@link RestHighLevelClient}
// */
// private static RestHighLevelClient getRestHighLevelClient(RestClientBuilder builder, ElasticSearchConfig elasticSearchConfig) {
//
// // Callback used the default {@link RequestConfig} being set to the {@link CloseableHttpClient}
// builder.setRequestConfigCallback(requestConfigBuilder -> {
// requestConfigBuilder.setConnectTimeout(elasticSearchConfig.getConnectTimeout());
// requestConfigBuilder.setSocketTimeout(elasticSearchConfig.getSocketTimeout());
// requestConfigBuilder.setConnectionRequestTimeout(elasticSearchConfig.getConnectionRequestTimeout());
// return requestConfigBuilder;
// });
//
// // Callback used to customize the {@link CloseableHttpClient} instance used by a {@link RestClient} instance.
// builder.setHttpClientConfigCallback(httpClientBuilder -> {
// httpClientBuilder.setMaxConnTotal(elasticSearchConfig.getMaxConnectTotal());
// httpClientBuilder.setMaxConnPerRoute(elasticSearchConfig.getMaxConnectPerRoute());
// return httpClientBuilder;
// });
//
// // Callback used the basic credential auth
// ElasticSearchConfig.Account account = elasticSearchConfig.getAccount();
// if (!StringUtils.isEmpty(account.getUsername()) && !StringUtils.isEmpty(account.getUsername())) {
// final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//
// credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(account.getUsername(), account.getPassword()));
// }
// return new RestHighLevelClient(builder);
// }
//}
\ No newline at end of file
package com.yeejoin.equip.utils;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpResponse;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.Args;
/**
* An implementation of a strategy deciding duration that a connection can remain idle, but setting
* the keep alive to a maximum of 10 minutes (600 seconds).
*/
public class OdpConnectionKeepAliveStrategy implements ConnectionKeepAliveStrategy {
/**
* Instance object.
*/
public static final OdpConnectionKeepAliveStrategy INSTANCE = new OdpConnectionKeepAliveStrategy();
@Override
public long getKeepAliveDuration(final HttpResponse response, final HttpContext context) {
Args.notNull(response, "HTTP response");
final HeaderElementIterator it = new BasicHeaderElementIterator(
response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
final HeaderElement he = it.nextElement();
final String param = he.getName();
final String value = he.getValue();
if (value != null && param.equalsIgnoreCase("timeout")) {
try {
return Long.parseLong(value) * 1000;
} catch (final NumberFormatException ignore) {
}
}
}
return 600 * 1000;
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment