Skip to content

Commit

Permalink
Changes to support security plugin integ tests
Browse files Browse the repository at this point in the history
  • Loading branch information
saikaranam-amazon committed Aug 2, 2021
1 parent f8dbc76 commit 419d789
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 38 deletions.
185 changes: 160 additions & 25 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,22 @@
* permissions and limitations under the License.
*/


import javax.net.ssl.HostnameVerifier
import javax.net.ssl.HttpsURLConnection
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLSession
import javax.net.ssl.TrustManager
import javax.net.ssl.X509TrustManager
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.security.GeneralSecurityException
import java.security.cert.X509Certificate
import java.util.concurrent.Callable
import org.elasticsearch.gradle.testclusters.TestClusterConfiguration
import java.util.function.Predicate
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors
import org.elasticsearch.gradle.testclusters.ElasticsearchCluster

plugins {
id "elasticsearch.esplugin" version "7.10.2"
Expand Down Expand Up @@ -124,12 +133,112 @@ def securityPluginFile = new Callable<RegularFile>() {
}
}

// TODO: Remove this once the integration test framework supports configuring and installing other plugins
def isReleaseTask = "release" in gradle.startParameter.taskNames

// Clone of WaitForHttpResource with updated code to support Cross cluster usecase
class CrossClusterWaitForHttpResource {

private URL url;
private String username;
private String password;
Set<Integer> validResponseCodes = Collections.singleton(200);

CrossClusterWaitForHttpResource(String protocol, String host, int numberOfNodes) throws MalformedURLException {
this(new URL(protocol + "://" + host + "/_cluster/health?wait_for_nodes=>=" + numberOfNodes + "&wait_for_status=yellow"));
}

CrossClusterWaitForHttpResource(URL url) {
this.url = url;
}

boolean wait(int durationInMs) throws GeneralSecurityException, InterruptedException, IOException {
final long waitUntil = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(durationInMs);
final long sleep = 100;

IOException failure = null;
while (true) {
try {
checkResource();
return true;
} catch (IOException e) {
failure = e;
}
if (System.nanoTime() < waitUntil) {
Thread.sleep(sleep);
} else {
throw failure;
}
}
}

void setUsername(String username) {
this.username = username;
}

void setPassword(String password) {
this.password = password;
}

void checkResource() throws IOException {
final HttpURLConnection connection = buildConnection()
connection.connect();
final Integer response = connection.getResponseCode();
if (validResponseCodes.contains(response)) {
return;
} else {
throw new IOException(response + " " + connection.getResponseMessage());
}
}

HttpURLConnection buildConnection() throws IOException {
final HttpURLConnection connection = (HttpURLConnection) this.@url.openConnection();

if (connection instanceof HttpsURLConnection) {
TrustManager[] trustAllCerts = [ new X509TrustManager() {
X509Certificate[] getAcceptedIssuers() {
return null;
}

void checkClientTrusted(X509Certificate[] certs, String authType) {
}

void checkServerTrusted(X509Certificate[] certs, String authType) {
}
}
] as TrustManager[];
SSLContext sc = SSLContext.getInstance("SSL");
sc.init(null, trustAllCerts, new java.security.SecureRandom());
connection.setSSLSocketFactory(sc.getSocketFactory());
// Create all-trusting host name verifier
HostnameVerifier allHostsValid = new HostnameVerifier() {
boolean verify(String hostname, SSLSession session) {
return true;
}
};
// Install the all-trusting host verifier
connection.setHostnameVerifier(allHostsValid);
}

configureBasicAuth(connection);
connection.setRequestMethod("GET");
return connection;
}

void configureBasicAuth(HttpURLConnection connection) {
if (username != null) {
if (password == null) {
throw new IllegalStateException("Basic Auth user [" + username + "] has been set, but no password has been configured");
}
connection.setRequestProperty(
"Authorization",
"Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8))
);
}
}

}

/*
* TODO: Default to false as it needs extending RunTask for automation
* If enabled, make sure to run initializeSecurityIndex task
* To run security tests
*/
def securityEnabled = findProperty("security") == "true"

Expand All @@ -138,7 +247,7 @@ def _numNodes = findProperty('numNodes') as Integer ?: 1
testClusters {
leaderCluster {
plugin(project.tasks.bundlePlugin.archiveFile)
if(!isReleaseTask && securityEnabled) {
if(securityEnabled) {
plugin(provider(securityPluginFile))
cliSetup("opendistro_security/install_demo_configuration.sh", "-y")
}
Expand All @@ -152,7 +261,7 @@ testClusters {
followCluster {
testDistribution = "INTEG_TEST"
plugin(project.tasks.bundlePlugin.archiveFile)
if(!isReleaseTask && securityEnabled) {
if(securityEnabled) {
plugin(provider(securityPluginFile))
cliSetup("opendistro_security/install_demo_configuration.sh", "-y")
}
Expand All @@ -164,13 +273,54 @@ testClusters {
}
}

def configureCluster(ElasticsearchCluster cluster, Boolean securityEnabled) {
// clear existing health checks as we will need custom handling based on
// security plugin installation
cluster.@waitConditions.clear()
String unicastUris = cluster.nodes.stream().flatMap { node ->
node.getAllTransportPortURI().stream()
}.collect(Collectors.joining("\n"))
// Manually write the unicast hosts as we are not depending on the internal method
cluster.nodes.forEach {node ->
try {
Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new java.io.UncheckedIOException("Failed to write unicast_hosts for " + this, e);
}
}

// Health check based on security plugin installation
Predicate pred = { ElasticsearchCluster c ->
String protocol = "http"
if(securityEnabled && !c.name.equalsIgnoreCase("integTest")) {
protocol = "https"
}
CrossClusterWaitForHttpResource wait = new CrossClusterWaitForHttpResource(protocol, cluster.getFirstNode().getHttpSocketURI(), cluster.nodes.size())
wait.setUsername("admin")
wait.setPassword("admin")
return wait.wait(500)
}

cluster.@waitConditions.put("cluster health yellow", pred)
cluster.waitForAllConditions()
}

integTest {
useCluster testClusters.leaderCluster
useCluster testClusters.followCluster
doFirst {
getClusters().forEach { cluster ->
systemProperty "tests.cluster.${cluster.name}.http_hosts", "${-> cluster.allHttpSocketURI.join(',')}"
systemProperty "tests.cluster.${cluster.name}.transport_hosts", "${-> cluster.allTransportPortURI.join(',')}"
String alltransportSocketURI = cluster.nodes.stream().flatMap { node ->
node.getAllTransportPortURI().stream()
}.collect(Collectors.joining(","))
String allHttpSocketURI = cluster.nodes.stream().flatMap { node ->
node.getAllHttpSocketURI().stream()
}.collect(Collectors.joining(","))

systemProperty "tests.cluster.${cluster.name}.http_hosts", "${-> allHttpSocketURI}"
systemProperty "tests.cluster.${cluster.name}.transport_hosts", "${-> alltransportSocketURI}"
systemProperty "tests.cluster.${cluster.name}.security_enabled", "${-> securityEnabled.toString()}"
configureCluster(cluster, securityEnabled)
}
}
systemProperty "build.dir", "${buildDir}"
Expand All @@ -182,22 +332,7 @@ run {
useCluster testClusters.followCluster
doFirst {
getClusters().forEach { cluster ->
LinkedHashMap<String, Predicate<TestClusterConfiguration>> waitConditions = new LinkedHashMap<>()
cluster.waitForConditions(waitConditions, System.currentTimeMillis(), 40, TimeUnit.SECONDS, cluster)
// Write unicast file manually - we could't wait on internal method(waitForAllConditions) as
// cluster health needs changes based on security plugin installation.
String unicastUris = cluster.nodes.stream().flatMap { node ->
node.getAllTransportPortURI().stream()
}.collect(Collectors.joining("\n"))
cluster.nodes.forEach{node ->
try {
Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new java.io.UncheckedIOException("Failed to write unicast_hosts for " + this, e);
}
}
// TODO: Add health check and avoid wait for the cluster formation
cluster.waitForConditions(waitConditions, System.currentTimeMillis(), 40, TimeUnit.SECONDS, cluster)
configureCluster(cluster, securityEnabled)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ import java.security.cert.CertificateException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import java.util.Collections
import javax.net.ssl.SSLContext
import javax.net.ssl.TrustManager
import javax.net.ssl.X509TrustManager
import javax.security.cert.X509Certificate

/**
* This class provides basic support of managing life-cyle of
Expand All @@ -74,12 +78,31 @@ abstract class MultiClusterRestTestCase : ESTestCase() {

class TestCluster(clusterName: String, val httpHosts: List<HttpHost>, val transportPorts: List<String>,
val preserveSnapshots: Boolean, val preserveIndices: Boolean,
val preserveClusterSettings: Boolean) {
val preserveClusterSettings: Boolean,
val securityEnabled: Boolean) {
val restClient : RestHighLevelClient
init {
val builder = RestClient.builder(*httpHosts.toTypedArray())
configureClient(builder, getClusterSettings(clusterName))
builder.setStrictDeprecationMode(true)
val trustCerts = arrayOf<TrustManager>(object: X509TrustManager {
override fun checkClientTrusted(chain: Array<out java.security.cert.X509Certificate>?, authType: String?) {
}

override fun checkServerTrusted(chain: Array<out java.security.cert.X509Certificate>?, authType: String?) {
}

override fun getAcceptedIssuers(): Array<out java.security.cert.X509Certificate>? {
return null
}

})
val sslContext = SSLContext.getInstance("SSL")
sslContext.init(null, trustCerts, java.security.SecureRandom())

val builder = RestClient.builder(*httpHosts.toTypedArray()).setHttpClientConfigCallback { httpAsyncClientBuilder ->
httpAsyncClientBuilder.setSSLHostnameVerifier { _, _ -> true } // Disable hostname verification for local cluster
httpAsyncClientBuilder.setSSLContext(sslContext)
}
configureClient(builder, getClusterSettings(clusterName), securityEnabled)
builder.setStrictDeprecationMode(false)
restClient = RestHighLevelClient(builder)
}
val lowLevelClient = restClient.lowLevelClient!!
Expand All @@ -93,14 +116,20 @@ abstract class MultiClusterRestTestCase : ESTestCase() {
val systemProperties = BootstrapInfo.getSystemProperties()
val httpHostsProp = systemProperties.get("tests.cluster.${cluster}.http_hosts") as String?
val transportHostsProp = systemProperties.get("tests.cluster.${cluster}.transport_hosts") as String?
val securityEnabled = systemProperties.get("tests.cluster.${cluster}.security_enabled") as String?

requireNotNull(httpHostsProp) { "Missing http hosts property for cluster: $cluster."}
requireNotNull(transportHostsProp) { "Missing transport hosts property for cluster: $cluster."}
requireNotNull(securityEnabled) { "Missing security enabled property for cluster: $cluster."}

val httpHosts = httpHostsProp.split(',').map { HttpHost.create("http://$it") }
var protocol = "http"
if(securityEnabled.equals("true", true)) {
protocol = "https"
}
val httpHosts = httpHostsProp.split(',').map { HttpHost.create("$protocol://$it") }
val transportPorts = transportHostsProp.split(',')
return TestCluster(cluster, httpHosts, transportPorts, configuration.preserveSnapshots,
configuration.preserveIndices, configuration.preserveClusterSettings)
configuration.preserveIndices, configuration.preserveClusterSettings, securityEnabled.equals("true", true))
}

private fun getClusterConfigurations(): List<ClusterConfiguration> {
Expand Down Expand Up @@ -143,7 +172,7 @@ abstract class MultiClusterRestTestCase : ESTestCase() {


/* Copied this method from [ESRestCase] */
protected fun configureClient(builder: RestClientBuilder, settings: Settings) {
protected fun configureClient(builder: RestClientBuilder, settings: Settings, securityEnabled: Boolean) {
val keystorePath = settings[ESRestTestCase.TRUSTSTORE_PATH]
if (keystorePath != null) {
val keystorePass = settings[ESRestTestCase.TRUSTSTORE_PASSWORD]
Expand Down Expand Up @@ -172,12 +201,21 @@ abstract class MultiClusterRestTestCase : ESTestCase() {
}
}
val headers = ThreadContext.buildDefaultHeaders(settings)
val defaultHeaders = arrayOfNulls<Header>(headers.size)
var headerSize = headers.size
if(securityEnabled) {
headerSize = headers.size + 1
}
val defaultHeaders = arrayOfNulls<Header>(headerSize)
var i = 0
for ((key, value) in headers) {
defaultHeaders[i++] = BasicHeader(key, value)
}
if(securityEnabled) {
defaultHeaders[i++] = BasicHeader("Authorization", "Basic YWRtaW46YWRtaW4=")
}

builder.setDefaultHeaders(defaultHeaders)
builder.setStrictDeprecationMode(false)
val socketTimeoutString = settings[ESRestTestCase.CLIENT_SOCKET_TIMEOUT]
val socketTimeout = TimeValue.parseTimeValue(socketTimeoutString ?: "60s",
ESRestTestCase.CLIENT_SOCKET_TIMEOUT)
Expand Down
Loading

0 comments on commit 419d789

Please sign in to comment.