Skip to content

Commit

Permalink
Improve Elasticsearch cluster support (node sniffer).
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Hale committed Nov 16, 2023
1 parent 38ea84a commit 1228029
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 103 deletions.
5 changes: 5 additions & 0 deletions sail/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
<artifactId>elasticsearch-java</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client-sniffer</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
82 changes: 67 additions & 15 deletions sail/src/main/java/com/msd/gin/halyard/sail/ElasticSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,32 @@
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
import org.elasticsearch.client.sniff.NodesSniffer;
import org.elasticsearch.client.sniff.Sniffer;

import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;

public final class ElasticSettings {
private static final String USER = "es.net.http.auth.user";
private static final String PASS = "es.net.http.auth.pass";
private static final String IO_THREADS = "halyard.es.ioThreads";

String protocol;
String host;
int port;
String username;
String password;
String indexName;
SSLSettings sslSettings;
int ioThreads;

public String getProtocol() {
return protocol;
Expand Down Expand Up @@ -89,44 +99,52 @@ public static ElasticSettings from(String esIndexUrl, Configuration conf) {
throw new IllegalArgumentException(e);
}
if (merged.username == null) {
merged.username = conf.get("es.net.http.auth.user");
merged.username = conf.get(USER);
}
if (merged.password == null) {
merged.password = conf.get("es.net.http.auth.pass");
merged.password = conf.get(PASS);
}
if ("https".equals(merged.protocol)) {
merged.sslSettings = SSLSettings.from(conf);
}
merged.ioThreads = conf.getInt(IO_THREADS, 0);
return merged;
}

public static ElasticSettings merge(Configuration conf, ElasticSettings defaults) {
ElasticSettings merged;
ElasticSettings urlSettings;
String esIndexUrl = conf.get(HBaseSail.ELASTIC_INDEX_URL);
if (esIndexUrl != null) {
try {
merged = from(new URL(esIndexUrl));
urlSettings = from(new URL(esIndexUrl));
} catch (MalformedURLException e) {
throw new IllegalArgumentException(e);
}
} else if (defaults != null) {
merged = new ElasticSettings();
merged.protocol = defaults.protocol;
merged.host = defaults.host;
merged.port = defaults.port;
merged.indexName = defaults.indexName;
urlSettings = new ElasticSettings();
urlSettings.protocol = defaults.protocol;
urlSettings.host = defaults.host;
urlSettings.port = defaults.port;
urlSettings.indexName = defaults.indexName;
} else {
return null;
}
if (merged.username == null) {
merged.username = conf.get("es.net.http.auth.user", defaults != null ? defaults.username : null);
}
if (merged.password == null) {
merged.password = conf.get("es.net.http.auth.pass", defaults != null ? defaults.password : null);
if (defaults != null) {
if (urlSettings.username == null) {
urlSettings.username = defaults.username;
}
if (urlSettings.password == null) {
urlSettings.password = defaults.password;
}
}

ElasticSettings merged = urlSettings;
merged.username = conf.get(USER, urlSettings.username);
merged.password = conf.get(PASS, urlSettings.password);
if ("https".equals(merged.protocol)) {
merged.sslSettings = SSLSettings.merge(conf, defaults != null ? defaults.sslSettings : null);
}
merged.ioThreads = conf.getInt(IO_THREADS, defaults != null ? defaults.ioThreads : 0);
return merged;
}

Expand Down Expand Up @@ -154,10 +172,44 @@ public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpCli
if (sslContext != null) {
httpClientBuilder.setSSLContext(sslContext);
}
if (ioThreads > 0) {
httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(ioThreads).build());
}
return httpClientBuilder;
}
});
RestClient restClient = restClientBuilder.build();
return new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchNodesSniffer.Scheme snifferScheme;
switch (protocol) {
case "http":
snifferScheme = ElasticsearchNodesSniffer.Scheme.HTTP;
break;
case "https":
snifferScheme = ElasticsearchNodesSniffer.Scheme.HTTPS;
break;
default:
throw new AssertionError("Unsupported scheme: " + protocol);
}
NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(restClient, ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT, snifferScheme);
Sniffer sniffer = Sniffer.builder(restClient).setNodesSniffer(nodesSniffer).build();
return new RestClientTransportWithSniffer(restClient, new JacksonJsonpMapper(), sniffer);
}

private static final class RestClientTransportWithSniffer extends RestClientTransport {
private final Sniffer sniffer;

public RestClientTransportWithSniffer(RestClient restClient, JsonpMapper mapper, Sniffer sniffer) {
super(restClient, mapper);
this.sniffer = sniffer;
}

@Override
public void close() throws IOException {
try {
sniffer.close();
} finally {
super.close();
}
}
}
}
Loading

0 comments on commit 1228029

Please sign in to comment.