diff --git a/build.gradle b/build.gradle index 80fef3d8..c6b444f6 100644 --- a/build.gradle +++ b/build.gradle @@ -13,13 +13,22 @@ * permissions and limitations under the License. */ + +import javax.net.ssl.HostnameVerifier +import javax.net.ssl.HttpsURLConnection +import javax.net.ssl.SSLContext +import javax.net.ssl.SSLSession +import javax.net.ssl.TrustManager +import javax.net.ssl.X509TrustManager import java.nio.charset.StandardCharsets import java.nio.file.Files +import java.security.GeneralSecurityException +import java.security.cert.X509Certificate import java.util.concurrent.Callable -import org.elasticsearch.gradle.testclusters.TestClusterConfiguration import java.util.function.Predicate import java.util.concurrent.TimeUnit import java.util.stream.Collectors +import org.elasticsearch.gradle.testclusters.ElasticsearchCluster plugins { id "elasticsearch.esplugin" version "7.10.2" @@ -124,12 +133,112 @@ def securityPluginFile = new Callable() { } } -// TODO: Remove this once the integration test framework supports configuring and installing other plugins -def isReleaseTask = "release" in gradle.startParameter.taskNames + +// Clone of WaitForHttpResource with updated code to support Cross cluster usecase +class CrossClusterWaitForHttpResource { + + private URL url; + private String username; + private String password; + Set validResponseCodes = Collections.singleton(200); + + CrossClusterWaitForHttpResource(String protocol, String host, int numberOfNodes) throws MalformedURLException { + this(new URL(protocol + "://" + host + "/_cluster/health?wait_for_nodes=>=" + numberOfNodes + "&wait_for_status=yellow")); + } + + CrossClusterWaitForHttpResource(URL url) { + this.url = url; + } + + boolean wait(int durationInMs) throws GeneralSecurityException, InterruptedException, IOException { + final long waitUntil = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(durationInMs); + final long sleep = 100; + + IOException failure = null; + while (true) { + try { + checkResource(); + return true; + } catch (IOException e) { + failure = e; + } + if (System.nanoTime() < waitUntil) { + Thread.sleep(sleep); + } else { + throw failure; + } + } + } + + void setUsername(String username) { + this.username = username; + } + + void setPassword(String password) { + this.password = password; + } + + void checkResource() throws IOException { + final HttpURLConnection connection = buildConnection() + connection.connect(); + final Integer response = connection.getResponseCode(); + if (validResponseCodes.contains(response)) { + return; + } else { + throw new IOException(response + " " + connection.getResponseMessage()); + } + } + + HttpURLConnection buildConnection() throws IOException { + final HttpURLConnection connection = (HttpURLConnection) this.@url.openConnection(); + + if (connection instanceof HttpsURLConnection) { + TrustManager[] trustAllCerts = [ new X509TrustManager() { + X509Certificate[] getAcceptedIssuers() { + return null; + } + + void checkClientTrusted(X509Certificate[] certs, String authType) { + } + + void checkServerTrusted(X509Certificate[] certs, String authType) { + } + } + ] as TrustManager[]; + SSLContext sc = SSLContext.getInstance("SSL"); + sc.init(null, trustAllCerts, new java.security.SecureRandom()); + connection.setSSLSocketFactory(sc.getSocketFactory()); + // Create all-trusting host name verifier + HostnameVerifier allHostsValid = new HostnameVerifier() { + boolean verify(String hostname, SSLSession session) { + return true; + } + }; + // Install the all-trusting host verifier + connection.setHostnameVerifier(allHostsValid); + } + + configureBasicAuth(connection); + connection.setRequestMethod("GET"); + return connection; + } + + void configureBasicAuth(HttpURLConnection connection) { + if (username != null) { + if (password == null) { + throw new IllegalStateException("Basic Auth user [" + username + "] has been set, but no password has been configured"); + } + connection.setRequestProperty( + "Authorization", + "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8)) + ); + } + } + +} /* -* TODO: Default to false as it needs extending RunTask for automation -* If enabled, make sure to run initializeSecurityIndex task +* To run security tests */ def securityEnabled = findProperty("security") == "true" @@ -138,7 +247,7 @@ def _numNodes = findProperty('numNodes') as Integer ?: 1 testClusters { leaderCluster { plugin(project.tasks.bundlePlugin.archiveFile) - if(!isReleaseTask && securityEnabled) { + if(securityEnabled) { plugin(provider(securityPluginFile)) cliSetup("opendistro_security/install_demo_configuration.sh", "-y") } @@ -152,7 +261,7 @@ testClusters { followCluster { testDistribution = "INTEG_TEST" plugin(project.tasks.bundlePlugin.archiveFile) - if(!isReleaseTask && securityEnabled) { + if(securityEnabled) { plugin(provider(securityPluginFile)) cliSetup("opendistro_security/install_demo_configuration.sh", "-y") } @@ -164,13 +273,54 @@ testClusters { } } +def configureCluster(ElasticsearchCluster cluster, Boolean securityEnabled) { + // clear existing health checks as we will need custom handling based on + // security plugin installation + cluster.@waitConditions.clear() + String unicastUris = cluster.nodes.stream().flatMap { node -> + node.getAllTransportPortURI().stream() + }.collect(Collectors.joining("\n")) + // Manually write the unicast hosts as we are not depending on the internal method + cluster.nodes.forEach {node -> + try { + Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8)); + } catch (IOException e) { + throw new java.io.UncheckedIOException("Failed to write unicast_hosts for " + this, e); + } + } + + // Health check based on security plugin installation + Predicate pred = { ElasticsearchCluster c -> + String protocol = "http" + if(securityEnabled && !c.name.equalsIgnoreCase("integTest")) { + protocol = "https" + } + CrossClusterWaitForHttpResource wait = new CrossClusterWaitForHttpResource(protocol, cluster.getFirstNode().getHttpSocketURI(), cluster.nodes.size()) + wait.setUsername("admin") + wait.setPassword("admin") + return wait.wait(500) + } + + cluster.@waitConditions.put("cluster health yellow", pred) + cluster.waitForAllConditions() +} + integTest { useCluster testClusters.leaderCluster useCluster testClusters.followCluster doFirst { getClusters().forEach { cluster -> - systemProperty "tests.cluster.${cluster.name}.http_hosts", "${-> cluster.allHttpSocketURI.join(',')}" - systemProperty "tests.cluster.${cluster.name}.transport_hosts", "${-> cluster.allTransportPortURI.join(',')}" + String alltransportSocketURI = cluster.nodes.stream().flatMap { node -> + node.getAllTransportPortURI().stream() + }.collect(Collectors.joining(",")) + String allHttpSocketURI = cluster.nodes.stream().flatMap { node -> + node.getAllHttpSocketURI().stream() + }.collect(Collectors.joining(",")) + + systemProperty "tests.cluster.${cluster.name}.http_hosts", "${-> allHttpSocketURI}" + systemProperty "tests.cluster.${cluster.name}.transport_hosts", "${-> alltransportSocketURI}" + systemProperty "tests.cluster.${cluster.name}.security_enabled", "${-> securityEnabled.toString()}" + configureCluster(cluster, securityEnabled) } } systemProperty "build.dir", "${buildDir}" @@ -182,22 +332,7 @@ run { useCluster testClusters.followCluster doFirst { getClusters().forEach { cluster -> - LinkedHashMap> waitConditions = new LinkedHashMap<>() - cluster.waitForConditions(waitConditions, System.currentTimeMillis(), 40, TimeUnit.SECONDS, cluster) - // Write unicast file manually - we could't wait on internal method(waitForAllConditions) as - // cluster health needs changes based on security plugin installation. - String unicastUris = cluster.nodes.stream().flatMap { node -> - node.getAllTransportPortURI().stream() - }.collect(Collectors.joining("\n")) - cluster.nodes.forEach{node -> - try { - Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8)); - } catch (IOException e) { - throw new java.io.UncheckedIOException("Failed to write unicast_hosts for " + this, e); - } - } - // TODO: Add health check and avoid wait for the cluster formation - cluster.waitForConditions(waitConditions, System.currentTimeMillis(), 40, TimeUnit.SECONDS, cluster) + configureCluster(cluster, securityEnabled) } } } diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/MultiClusterRestTestCase.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/MultiClusterRestTestCase.kt index 57cefbd5..64a13cd4 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/MultiClusterRestTestCase.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/MultiClusterRestTestCase.kt @@ -65,6 +65,10 @@ import java.security.cert.CertificateException import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import java.util.Collections +import javax.net.ssl.SSLContext +import javax.net.ssl.TrustManager +import javax.net.ssl.X509TrustManager +import javax.security.cert.X509Certificate /** * This class provides basic support of managing life-cyle of @@ -74,12 +78,31 @@ abstract class MultiClusterRestTestCase : ESTestCase() { class TestCluster(clusterName: String, val httpHosts: List, val transportPorts: List, val preserveSnapshots: Boolean, val preserveIndices: Boolean, - val preserveClusterSettings: Boolean) { + val preserveClusterSettings: Boolean, + val securityEnabled: Boolean) { val restClient : RestHighLevelClient init { - val builder = RestClient.builder(*httpHosts.toTypedArray()) - configureClient(builder, getClusterSettings(clusterName)) - builder.setStrictDeprecationMode(true) + val trustCerts = arrayOf(object: X509TrustManager { + override fun checkClientTrusted(chain: Array?, authType: String?) { + } + + override fun checkServerTrusted(chain: Array?, authType: String?) { + } + + override fun getAcceptedIssuers(): Array? { + return null + } + + }) + val sslContext = SSLContext.getInstance("SSL") + sslContext.init(null, trustCerts, java.security.SecureRandom()) + + val builder = RestClient.builder(*httpHosts.toTypedArray()).setHttpClientConfigCallback { httpAsyncClientBuilder -> + httpAsyncClientBuilder.setSSLHostnameVerifier { _, _ -> true } // Disable hostname verification for local cluster + httpAsyncClientBuilder.setSSLContext(sslContext) + } + configureClient(builder, getClusterSettings(clusterName), securityEnabled) + builder.setStrictDeprecationMode(false) restClient = RestHighLevelClient(builder) } val lowLevelClient = restClient.lowLevelClient!! @@ -93,14 +116,20 @@ abstract class MultiClusterRestTestCase : ESTestCase() { val systemProperties = BootstrapInfo.getSystemProperties() val httpHostsProp = systemProperties.get("tests.cluster.${cluster}.http_hosts") as String? val transportHostsProp = systemProperties.get("tests.cluster.${cluster}.transport_hosts") as String? + val securityEnabled = systemProperties.get("tests.cluster.${cluster}.security_enabled") as String? requireNotNull(httpHostsProp) { "Missing http hosts property for cluster: $cluster."} requireNotNull(transportHostsProp) { "Missing transport hosts property for cluster: $cluster."} + requireNotNull(securityEnabled) { "Missing security enabled property for cluster: $cluster."} - val httpHosts = httpHostsProp.split(',').map { HttpHost.create("http://$it") } + var protocol = "http" + if(securityEnabled.equals("true", true)) { + protocol = "https" + } + val httpHosts = httpHostsProp.split(',').map { HttpHost.create("$protocol://$it") } val transportPorts = transportHostsProp.split(',') return TestCluster(cluster, httpHosts, transportPorts, configuration.preserveSnapshots, - configuration.preserveIndices, configuration.preserveClusterSettings) + configuration.preserveIndices, configuration.preserveClusterSettings, securityEnabled.equals("true", true)) } private fun getClusterConfigurations(): List { @@ -143,7 +172,7 @@ abstract class MultiClusterRestTestCase : ESTestCase() { /* Copied this method from [ESRestCase] */ - protected fun configureClient(builder: RestClientBuilder, settings: Settings) { + protected fun configureClient(builder: RestClientBuilder, settings: Settings, securityEnabled: Boolean) { val keystorePath = settings[ESRestTestCase.TRUSTSTORE_PATH] if (keystorePath != null) { val keystorePass = settings[ESRestTestCase.TRUSTSTORE_PASSWORD] @@ -172,12 +201,21 @@ abstract class MultiClusterRestTestCase : ESTestCase() { } } val headers = ThreadContext.buildDefaultHeaders(settings) - val defaultHeaders = arrayOfNulls
(headers.size) + var headerSize = headers.size + if(securityEnabled) { + headerSize = headers.size + 1 + } + val defaultHeaders = arrayOfNulls
(headerSize) var i = 0 for ((key, value) in headers) { defaultHeaders[i++] = BasicHeader(key, value) } + if(securityEnabled) { + defaultHeaders[i++] = BasicHeader("Authorization", "Basic YWRtaW46YWRtaW4=") + } + builder.setDefaultHeaders(defaultHeaders) + builder.setStrictDeprecationMode(false) val socketTimeoutString = settings[ESRestTestCase.CLIENT_SOCKET_TIMEOUT] val socketTimeout = TimeValue.parseTimeValue(socketTimeoutString ?: "60s", ESRestTestCase.CLIENT_SOCKET_TIMEOUT) diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt index a2cf5fd0..12584c4a 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/ReplicationHelpers.kt @@ -17,6 +17,7 @@ package com.amazon.elasticsearch.replication import com.amazon.elasticsearch.replication.task.index.IndexReplicationExecutor import com.amazon.elasticsearch.replication.task.shard.ShardReplicationExecutor +import org.apache.http.message.BasicHeader import org.assertj.core.api.Assertions.assertThat import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest @@ -33,10 +34,14 @@ import org.elasticsearch.common.xcontent.XContentType import org.elasticsearch.test.ESTestCase.assertBusy import org.elasticsearch.test.rest.ESRestTestCase import org.junit.Assert +import java.nio.charset.StandardCharsets +import java.util.* import java.util.concurrent.TimeUnit +data class AssumeRoles(val remoteClusterRole: String = "all_access", val localClusterRole: String = "all_access") + data class StartReplicationRequest(val remoteClusterAlias: String, val remoteIndex: String, val toIndex: String, - val settings: Settings = Settings.EMPTY) + val settings: Settings = Settings.EMPTY, val assumeRoles: AssumeRoles = AssumeRoles()) const val REST_REPLICATION_PREFIX = "/_plugins/_replication/" const val REST_REPLICATION_START = "$REST_REPLICATION_PREFIX{index}/_start" @@ -59,13 +64,21 @@ fun RestHighLevelClient.startReplication(request: StartReplicationRequest, if (request.settings == Settings.EMPTY) { lowLevelRequest.setJsonEntity("""{ "remote_cluster" : "${request.remoteClusterAlias}", - "remote_index": "${request.remoteIndex}" + "remote_index": "${request.remoteIndex}", + "assume_roles": { + "remote_cluster_role": "${request.assumeRoles.remoteClusterRole}", + "local_cluster_role": "${request.assumeRoles.localClusterRole}" + } } """) } else { lowLevelRequest.setJsonEntity("""{ "remote_cluster" : "${request.remoteClusterAlias}", "remote_index": "${request.remoteIndex}", + "assume_roles": { + "remote_cluster_role": "${request.assumeRoles.remoteClusterRole}", + "local_cluster_role": "${request.assumeRoles.localClusterRole}" + }, "settings": ${request.settings} } """) @@ -212,19 +225,29 @@ fun RestHighLevelClient.waitForReplicationStop(index: String, waitFor : TimeValu }, waitFor.seconds, TimeUnit.SECONDS) } -fun RestHighLevelClient.updateAutoFollowPattern(connection: String, patternName: String, pattern: String, settings: Settings = Settings.EMPTY) { +fun RestHighLevelClient.updateAutoFollowPattern(connection: String, patternName: String, pattern: String, + settings: Settings = Settings.EMPTY, + assumeRoles: AssumeRoles = AssumeRoles()) { val lowLevelRequest = Request("POST", REST_AUTO_FOLLOW_PATTERN) if (settings == Settings.EMPTY) { lowLevelRequest.setJsonEntity("""{ "connection" : "${connection}", "name" : "${patternName}", - "pattern": "${pattern}" + "pattern": "${pattern}", + "assume_roles": { + "remote_cluster_role": "${assumeRoles.remoteClusterRole}", + "local_cluster_role": "${assumeRoles.localClusterRole}" + } }""") } else { lowLevelRequest.setJsonEntity("""{ "connection" : "${connection}", "name" : "${patternName}", "pattern": "${pattern}", + "assume_roles": { + "remote_cluster_role": "${assumeRoles.remoteClusterRole}", + "local_cluster_role": "${assumeRoles.localClusterRole}" + }, "settings": $settings }""") } @@ -243,4 +266,3 @@ fun RestHighLevelClient.deleteAutoFollowPattern(connection: String, patternName: val response = getAckResponse(lowLevelResponse) assertThat(response.isAcknowledged).isTrue() } - diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/util/SecurityHelpers.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/util/SecurityHelpers.kt new file mode 100644 index 00000000..30c681e9 --- /dev/null +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/util/SecurityHelpers.kt @@ -0,0 +1,14 @@ +package com.amazon.elasticsearch.replication.util + +import org.apache.http.message.BasicHeader +import org.elasticsearch.client.RequestOptions +import java.nio.charset.StandardCharsets +import java.util.Base64 + +// To set for individual requests +// RequestOptions.DEFAULT.addBasicAuthHeader("admin", "admin") +fun RequestOptions.addBasicAuthHeader(user: String, password: String) { + this.headers.add(BasicHeader("Authorization", + "Basic " + Base64.getEncoder().encodeToString("$user:$password".toByteArray(StandardCharsets.UTF_8)))) +} + diff --git a/src/test/resources/security/plugin/opendistro-security-1.13.0.0.zip b/src/test/resources/security/plugin/opendistro-security-1.13.0.0.zip new file mode 100644 index 00000000..0d2b31b4 Binary files /dev/null and b/src/test/resources/security/plugin/opendistro-security-1.13.0.0.zip differ