Skip to content

Commit

Permalink
Prevent starting replication directly on the leader index alias. (#66)
Browse files Browse the repository at this point in the history
(This was previously resulting in NPE)

The rationale is to avoid complexity for now since an alias can be
associated with multiple indices, although an extension could be to
allow replication if there is a single match.

Added an integration test to validate the case.

While at it, also expanded BasicReplicationIT to incorporate force merge
and ensure that doesn't impact replication.

Signed-off-by: Gopala Krishna Ambareesh <[email protected]>
  • Loading branch information
krishna-ggk authored Jul 28, 2021
1 parent 90c0ef1 commit f44abda
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.elasticsearch.action.support.HandledTransportAction
import org.elasticsearch.client.Client
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.env.Environment
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.indices.InvalidIndexNameException
import org.elasticsearch.tasks.Task
import org.elasticsearch.threadpool.ThreadPool
Expand Down Expand Up @@ -67,7 +68,8 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
val remoteClient = client.getRemoteClusterClient(request.remoteCluster)
val getSettingsRequest = GetSettingsRequest().includeDefaults(false).indices(request.remoteIndex)
val settingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getSettings, injectSecurityContext = true)(getSettingsRequest)
val leaderSettings = settingsResponse.indexToSettings.get(request.remoteIndex)
val leaderSettings = settingsResponse.indexToSettings.get(request.remoteIndex) ?: throw IndexNotFoundException(request.remoteIndex)

if (leaderSettings.keySet().contains(REPLICATED_INDEX_SETTING.key) and !leaderSettings.get(REPLICATED_INDEX_SETTING.key).isNullOrBlank()) {
throw IllegalArgumentException("Cannot Replicate a Replicated Index ${request.remoteIndex}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
// After marking FailedState, IndexReplicationTask will action on it by pausing or stopping all shard
// replication tasks. This ShardReplicationTask should also thus receive the pause/stop via
// cancellation. We thus wait for waitMillis duration.
val waitMillis = Duration.ofMinutes(1).toMillis()
val waitMillis = Duration.ofMinutes(10).toMillis()
logInfo("Waiting $waitMillis millis for IndexReplicationTask to respond to failure of shard task")
delay(waitMillis)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ package com.amazon.elasticsearch.replication

import com.amazon.elasticsearch.replication.MultiClusterAnnotations.ClusterConfiguration
import com.amazon.elasticsearch.replication.MultiClusterAnnotations.ClusterConfigurations
import org.assertj.core.api.Assertions
import org.assertj.core.api.Assertions.assertThat
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.DocWriteResponse.Result
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.indices.CreateIndexRequest
import org.elasticsearch.common.CheckedRunnable
import org.elasticsearch.test.ESTestCase.assertBusy
import org.junit.Assert
import java.util.Locale
Expand Down Expand Up @@ -51,14 +54,40 @@ class BasicReplicationIT : MultiClusterRestTestCase() {
follower.startReplication(StartReplicationRequest("source", leaderIndex, followerIndex), waitForRestore=true)

val source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
val response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT)
var response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)

assertBusy {
val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}

// Ensure force merge on leader doesn't impact replication
for (i in 2..5) {
response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)
}
leader.indices().forcemerge(ForceMergeRequest(leaderIndex), RequestOptions.DEFAULT)
for (i in 6..10) {
response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)
}
assertBusy {
for (i in 2..10) {
val getResponse = follower.get(GetRequest(followerIndex, "$i"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
}

// Force merge on follower however isn't allowed due to WRITE block
Assertions.assertThatThrownBy {
follower.indices().forcemerge(ForceMergeRequest(followerIndex), RequestOptions.DEFAULT)
}.isInstanceOf(ElasticsearchStatusException::class.java)
.hasMessage("Elasticsearch exception [type=cluster_block_exception, reason=index [$followerIndex] " +
"blocked by: [FORBIDDEN/1000/index read-only(cross-cluster-replication)];]")

} finally {
follower.stopReplication(followerIndex)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,25 @@ class StartReplicationIT: MultiClusterRestTestCase() {
}
}

fun `test that replication cannot be started on leader alias directly`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)

createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).alias(Alias("leaderAlias")), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()

try {
followerClient.startReplication(StartReplicationRequest("source", "leaderAlias", followerIndexName))
fail("Expected startReplication to fail")
} catch (e: ResponseException) {
assertThat(e.response.statusLine.statusCode).isEqualTo(404)
assertThat(e.message).contains("index_not_found_exception")
assertThat(e.message).contains("no such index [leaderAlias]")
}
}

fun `test that translog settings are set on leader and not on follower`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
Expand Down

0 comments on commit f44abda

Please sign in to comment.