diff --git a/sail/pom.xml b/sail/pom.xml
index 295210c52..c1d7d811f 100644
--- a/sail/pom.xml
+++ b/sail/pom.xml
@@ -44,6 +44,11 @@
elasticsearch-java
${elasticsearch.version}
+
+ org.elasticsearch.client
+ elasticsearch-rest-client-sniffer
+ ${elasticsearch.version}
+
com.fasterxml.jackson.core
jackson-databind
diff --git a/sail/src/main/java/com/msd/gin/halyard/sail/ElasticSettings.java b/sail/src/main/java/com/msd/gin/halyard/sail/ElasticSettings.java
index 341eec536..17bf2bca0 100644
--- a/sail/src/main/java/com/msd/gin/halyard/sail/ElasticSettings.java
+++ b/sail/src/main/java/com/msd/gin/halyard/sail/ElasticSettings.java
@@ -23,15 +23,24 @@
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
+import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
+import org.elasticsearch.client.sniff.NodesSniffer;
+import org.elasticsearch.client.sniff.Sniffer;
+import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
public final class ElasticSettings {
+ private static final String USER = "es.net.http.auth.user";
+ private static final String PASS = "es.net.http.auth.pass";
+ private static final String IO_THREADS = "halyard.es.ioThreads";
+
String protocol;
String host;
int port;
@@ -39,6 +48,7 @@ public final class ElasticSettings {
String password;
String indexName;
SSLSettings sslSettings;
+ int ioThreads;
public String getProtocol() {
return protocol;
@@ -89,44 +99,50 @@ public static ElasticSettings from(String esIndexUrl, Configuration conf) {
throw new IllegalArgumentException(e);
}
if (merged.username == null) {
- merged.username = conf.get("es.net.http.auth.user");
+ merged.username = conf.get(USER);
}
if (merged.password == null) {
- merged.password = conf.get("es.net.http.auth.pass");
+ merged.password = conf.get(PASS);
}
if ("https".equals(merged.protocol)) {
merged.sslSettings = SSLSettings.from(conf);
}
+ merged.ioThreads = conf.getInt(IO_THREADS, 0);
return merged;
}
public static ElasticSettings merge(Configuration conf, ElasticSettings defaults) {
- ElasticSettings merged;
+ ElasticSettings urlSettings;
String esIndexUrl = conf.get(HBaseSail.ELASTIC_INDEX_URL);
if (esIndexUrl != null) {
try {
- merged = from(new URL(esIndexUrl));
+ urlSettings = from(new URL(esIndexUrl));
} catch (MalformedURLException e) {
throw new IllegalArgumentException(e);
}
} else if (defaults != null) {
- merged = new ElasticSettings();
- merged.protocol = defaults.protocol;
- merged.host = defaults.host;
- merged.port = defaults.port;
- merged.indexName = defaults.indexName;
+ urlSettings = new ElasticSettings();
+ urlSettings.protocol = defaults.protocol;
+ urlSettings.host = defaults.host;
+ urlSettings.port = defaults.port;
+ urlSettings.indexName = defaults.indexName;
} else {
return null;
}
- if (merged.username == null) {
- merged.username = conf.get("es.net.http.auth.user", defaults != null ? defaults.username : null);
+ if (urlSettings.username == null) {
+ urlSettings.username = defaults.username;
}
- if (merged.password == null) {
- merged.password = conf.get("es.net.http.auth.pass", defaults != null ? defaults.password : null);
+ if (urlSettings.password == null) {
+ urlSettings.password = defaults.password;
}
+
+ ElasticSettings merged = urlSettings;
+ merged.username = conf.get(USER, urlSettings.username);
+ merged.password = conf.get(PASS, urlSettings.password);
if ("https".equals(merged.protocol)) {
merged.sslSettings = SSLSettings.merge(conf, defaults != null ? defaults.sslSettings : null);
}
+ merged.ioThreads = conf.getInt(IO_THREADS, defaults != null ? defaults.ioThreads : 0);
return merged;
}
@@ -154,10 +170,44 @@ public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpCli
if (sslContext != null) {
httpClientBuilder.setSSLContext(sslContext);
}
+ if (ioThreads > 0) {
+ httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(ioThreads).build());
+ }
return httpClientBuilder;
}
});
RestClient restClient = restClientBuilder.build();
- return new RestClientTransport(restClient, new JacksonJsonpMapper());
+ ElasticsearchNodesSniffer.Scheme snifferScheme;
+ switch (protocol) {
+ case "http":
+ snifferScheme = ElasticsearchNodesSniffer.Scheme.HTTP;
+ break;
+ case "https":
+ snifferScheme = ElasticsearchNodesSniffer.Scheme.HTTPS;
+ break;
+ default:
+ throw new AssertionError("Unsupported scheme: " + protocol);
+ }
+ NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(restClient, ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT, snifferScheme);
+ Sniffer sniffer = Sniffer.builder(restClient).setNodesSniffer(nodesSniffer).build();
+ return new RestClientTransportWithSniffer(restClient, new JacksonJsonpMapper(), sniffer);
+ }
+
+ private static final class RestClientTransportWithSniffer extends RestClientTransport {
+ private final Sniffer sniffer;
+
+ public RestClientTransportWithSniffer(RestClient restClient, JsonpMapper mapper, Sniffer sniffer) {
+ super(restClient, mapper);
+ this.sniffer = sniffer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ sniffer.close();
+ } finally {
+ super.close();
+ }
+ }
}
}
\ No newline at end of file
diff --git a/webapps/pom.xml b/webapps/pom.xml
index 10f94dddf..e54e19064 100644
--- a/webapps/pom.xml
+++ b/webapps/pom.xml
@@ -158,6 +158,11 @@
elasticsearch-rest-client
${elasticsearch.version}
+
+ org.elasticsearch.client
+ elasticsearch-rest-client-sniffer
+ ${elasticsearch.version}
+
org.apache.httpcomponents
httpasyncclient