Skip to content

Commit

Permalink
Merge branch 'main' into naming_change
Browse files Browse the repository at this point in the history
  • Loading branch information
naveen pajjuri committed Aug 4, 2021
2 parents 87e1c90 + f5371a4 commit 219f61c
Show file tree
Hide file tree
Showing 13 changed files with 391 additions and 46 deletions.
185 changes: 160 additions & 25 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,22 @@
* permissions and limitations under the License.
*/


import javax.net.ssl.HostnameVerifier
import javax.net.ssl.HttpsURLConnection
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLSession
import javax.net.ssl.TrustManager
import javax.net.ssl.X509TrustManager
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.security.GeneralSecurityException
import java.security.cert.X509Certificate
import java.util.concurrent.Callable
import org.elasticsearch.gradle.testclusters.TestClusterConfiguration
import java.util.function.Predicate
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors
import org.elasticsearch.gradle.testclusters.ElasticsearchCluster

plugins {
id "elasticsearch.esplugin" version "7.10.2"
Expand Down Expand Up @@ -124,12 +133,112 @@ def securityPluginFile = new Callable<RegularFile>() {
}
}

// TODO: Remove this once the integration test framework supports configuring and installing other plugins
def isReleaseTask = "release" in gradle.startParameter.taskNames

// Clone of WaitForHttpResource with updated code to support Cross cluster usecase
class CrossClusterWaitForHttpResource {

private URL url;
private String username;
private String password;
Set<Integer> validResponseCodes = Collections.singleton(200);

CrossClusterWaitForHttpResource(String protocol, String host, int numberOfNodes) throws MalformedURLException {
this(new URL(protocol + "://" + host + "/_cluster/health?wait_for_nodes=>=" + numberOfNodes + "&wait_for_status=yellow"));
}

CrossClusterWaitForHttpResource(URL url) {
this.url = url;
}

boolean wait(int durationInMs) throws GeneralSecurityException, InterruptedException, IOException {
final long waitUntil = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(durationInMs);
final long sleep = 100;

IOException failure = null;
while (true) {
try {
checkResource();
return true;
} catch (IOException e) {
failure = e;
}
if (System.nanoTime() < waitUntil) {
Thread.sleep(sleep);
} else {
throw failure;
}
}
}

void setUsername(String username) {
this.username = username;
}

void setPassword(String password) {
this.password = password;
}

void checkResource() throws IOException {
final HttpURLConnection connection = buildConnection()
connection.connect();
final Integer response = connection.getResponseCode();
if (validResponseCodes.contains(response)) {
return;
} else {
throw new IOException(response + " " + connection.getResponseMessage());
}
}

HttpURLConnection buildConnection() throws IOException {
final HttpURLConnection connection = (HttpURLConnection) this.@url.openConnection();

if (connection instanceof HttpsURLConnection) {
TrustManager[] trustAllCerts = [ new X509TrustManager() {
X509Certificate[] getAcceptedIssuers() {
return null;
}

void checkClientTrusted(X509Certificate[] certs, String authType) {
}

void checkServerTrusted(X509Certificate[] certs, String authType) {
}
}
] as TrustManager[];
SSLContext sc = SSLContext.getInstance("SSL");
sc.init(null, trustAllCerts, new java.security.SecureRandom());
connection.setSSLSocketFactory(sc.getSocketFactory());
// Create all-trusting host name verifier
HostnameVerifier allHostsValid = new HostnameVerifier() {
boolean verify(String hostname, SSLSession session) {
return true;
}
};
// Install the all-trusting host verifier
connection.setHostnameVerifier(allHostsValid);
}

configureBasicAuth(connection);
connection.setRequestMethod("GET");
return connection;
}

void configureBasicAuth(HttpURLConnection connection) {
if (username != null) {
if (password == null) {
throw new IllegalStateException("Basic Auth user [" + username + "] has been set, but no password has been configured");
}
connection.setRequestProperty(
"Authorization",
"Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8))
);
}
}

}

/*
* TODO: Default to false as it needs extending RunTask for automation
* If enabled, make sure to run initializeSecurityIndex task
* To run security tests
*/
def securityEnabled = findProperty("security") == "true"

Expand All @@ -138,7 +247,7 @@ def _numNodes = findProperty('numNodes') as Integer ?: 1
testClusters {
leaderCluster {
plugin(project.tasks.bundlePlugin.archiveFile)
if(!isReleaseTask && securityEnabled) {
if(securityEnabled) {
plugin(provider(securityPluginFile))
cliSetup("opendistro_security/install_demo_configuration.sh", "-y")
}
Expand All @@ -152,7 +261,7 @@ testClusters {
followCluster {
testDistribution = "INTEG_TEST"
plugin(project.tasks.bundlePlugin.archiveFile)
if(!isReleaseTask && securityEnabled) {
if(securityEnabled) {
plugin(provider(securityPluginFile))
cliSetup("opendistro_security/install_demo_configuration.sh", "-y")
}
Expand All @@ -164,13 +273,54 @@ testClusters {
}
}

def configureCluster(ElasticsearchCluster cluster, Boolean securityEnabled) {
// clear existing health checks as we will need custom handling based on
// security plugin installation
cluster.@waitConditions.clear()
String unicastUris = cluster.nodes.stream().flatMap { node ->
node.getAllTransportPortURI().stream()
}.collect(Collectors.joining("\n"))
// Manually write the unicast hosts as we are not depending on the internal method
cluster.nodes.forEach {node ->
try {
Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new java.io.UncheckedIOException("Failed to write unicast_hosts for " + this, e);
}
}

// Health check based on security plugin installation
Predicate pred = { ElasticsearchCluster c ->
String protocol = "http"
if(securityEnabled && !c.name.equalsIgnoreCase("integTest")) {
protocol = "https"
}
CrossClusterWaitForHttpResource wait = new CrossClusterWaitForHttpResource(protocol, cluster.getFirstNode().getHttpSocketURI(), cluster.nodes.size())
wait.setUsername("admin")
wait.setPassword("admin")
return wait.wait(500)
}

cluster.@waitConditions.put("cluster health yellow", pred)
cluster.waitForAllConditions()
}

integTest {
useCluster testClusters.leaderCluster
useCluster testClusters.followCluster
doFirst {
getClusters().forEach { cluster ->
systemProperty "tests.cluster.${cluster.name}.http_hosts", "${-> cluster.allHttpSocketURI.join(',')}"
systemProperty "tests.cluster.${cluster.name}.transport_hosts", "${-> cluster.allTransportPortURI.join(',')}"
String alltransportSocketURI = cluster.nodes.stream().flatMap { node ->
node.getAllTransportPortURI().stream()
}.collect(Collectors.joining(","))
String allHttpSocketURI = cluster.nodes.stream().flatMap { node ->
node.getAllHttpSocketURI().stream()
}.collect(Collectors.joining(","))

systemProperty "tests.cluster.${cluster.name}.http_hosts", "${-> allHttpSocketURI}"
systemProperty "tests.cluster.${cluster.name}.transport_hosts", "${-> alltransportSocketURI}"
systemProperty "tests.cluster.${cluster.name}.security_enabled", "${-> securityEnabled.toString()}"
configureCluster(cluster, securityEnabled)
}
}
systemProperty "build.dir", "${buildDir}"
Expand All @@ -182,22 +332,7 @@ run {
useCluster testClusters.followCluster
doFirst {
getClusters().forEach { cluster ->
LinkedHashMap<String, Predicate<TestClusterConfiguration>> waitConditions = new LinkedHashMap<>()
cluster.waitForConditions(waitConditions, System.currentTimeMillis(), 40, TimeUnit.SECONDS, cluster)
// Write unicast file manually - we could't wait on internal method(waitForAllConditions) as
// cluster health needs changes based on security plugin installation.
String unicastUris = cluster.nodes.stream().flatMap { node ->
node.getAllTransportPortURI().stream()
}.collect(Collectors.joining("\n"))
cluster.nodes.forEach{node ->
try {
Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new java.io.UncheckedIOException("Failed to write unicast_hosts for " + this, e);
}
}
// TODO: Add health check and avoid wait for the cluster formation
cluster.waitForConditions(waitConditions, System.currentTimeMillis(), 40, TimeUnit.SECONDS, cluster)
configureCluster(cluster, securityEnabled)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class TransportResumeIndexReplicationAction @Inject constructor(transportService
shards.forEach {
val followerShardId = it.value.shardId
log.debug("Removing lease for $followerShardId.id ")
retentionLeaseHelper.removeRetentionLease(ShardId(params.leaderIndex, followerShardId.id), followerShardId)
retentionLeaseHelper.attemptRetentionLeaseRemoval(ShardId(params.leaderIndex, followerShardId.id), followerShardId)
}

return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
shards.forEach {
val followerShardId = it.value.shardId
log.debug("Removing lease for $followerShardId.id ")
retentionLeaseHelper.removeRetentionLease(ShardId(params.leaderIndex, followerShardId.id), followerShardId)
retentionLeaseHelper.attemptRetentionLeaseRemoval(ShardId(params.leaderIndex, followerShardId.id), followerShardId)
}
} catch (e: Exception) {
log.error("Exception while trying to remove Retention Lease ", e )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class LeaderClusterRetentionLeaseHelper constructor(val followerClusterName: Str
client.suspendExecute(RetentionLeaseActions.Renew.INSTANCE, request)
}

public suspend fun removeRetentionLease(remoteShardId: ShardId, followerShardId: ShardId) {
public suspend fun attemptRetentionLeaseRemoval(remoteShardId: ShardId, followerShardId: ShardId) {
val retentionLeaseId = retentionLeaseIdForShard(followerClusterName, followerShardId)
val request = RetentionLeaseActions.RemoveRequest(remoteShardId, retentionLeaseId)
try {
Expand All @@ -88,6 +88,10 @@ class LeaderClusterRetentionLeaseHelper constructor(val followerClusterName: Str
} catch(e: RetentionLeaseNotFoundException) {
// log error and bail
log.error("${e.message}")
} catch (e: Exception) {
// We are not bubbling up the exception as the stop action/ task cleanup should succeed
// even if we fail to remove the retention lease from leader cluster
log.error("Exception in removing retention lease", e)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ class IndexReplicationTask(id: Long, type: String, action: String, description:
val shards = clusterService.state().routingTable.indicesRouting().get(followerIndexName)?.shards()
shards?.forEach {
val followerShardId = it.value.shardId
retentionLeaseHelper.removeRetentionLease(ShardId(leaderIndex, followerShardId.id), followerShardId)
retentionLeaseHelper.attemptRetentionLeaseRemoval(ShardId(leaderIndex, followerShardId.id), followerShardId)
}

/* As given here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
logDebug("Pausing and not removing lease for index $followerIndexName and shard $followerShardId task")
return
}
retentionLeaseHelper.removeRetentionLease(remoteShardId, followerShardId)
retentionLeaseHelper.attemptRetentionLeaseRemoval(remoteShardId, followerShardId)
}

private fun addListenerToInterruptTask() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.ActionResponse
import org.elasticsearch.action.ActionType
import org.elasticsearch.action.support.TransportActions
import org.elasticsearch.action.index.IndexRequestBuilder
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.action.support.TransportActions
import org.elasticsearch.client.Client
import org.elasticsearch.index.shard.ShardId
import org.elasticsearch.common.util.concurrent.ThreadContext
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.index.store.Store
import org.elasticsearch.indices.recovery.RecoveryState
import org.elasticsearch.repositories.IndexId
Expand Down Expand Up @@ -110,7 +111,9 @@ suspend fun <Req: ActionRequest, Resp: ActionResponse> Client.suspendExecuteWith
try {
return suspendExecute(replicationMetadata, action, req, defaultContext = defaultContext)
} catch (e: ElasticsearchException) {
if (retryOn.contains(e.javaClass) || TransportActions.isShardNotAvailableException(e)) {
// Not retrying for IndexNotFoundException as it is not a transient failure
// TODO Remove this check for IndexNotFoundException: https://github.com/opensearch-project/cross-cluster-replication/issues/78
if (e !is IndexNotFoundException && (retryOn.contains(e.javaClass) || TransportActions.isShardNotAvailableException(e))) {
log.warn("Encountered a failure while executing in $req. Retrying in ${currentBackoff/1000} seconds" +
".", e)
delay(currentBackoff)
Expand Down
Loading

0 comments on commit 219f61c

Please sign in to comment.