From 145ec1c28343b68d5401a724d6709ce374355d43 Mon Sep 17 00:00:00 2001 From: Sai Date: Mon, 28 Jun 2021 10:02:51 +0530 Subject: [PATCH] Support for translog fetch at the leader(remote) cluster and enable translog pruning during replication setup --- .../replication/ReplicationPlugin.kt | 4 +- .../changes/TransportGetChangesAction.kt | 48 ++++++++-- .../replay/TransportReplayChangesAction.kt | 11 ++- .../repository/RemoteClusterRepository.kt | 5 + .../seqno/RemoteClusterTranslogService.kt | 65 +++++++++++++ .../task/index/IndexReplicationTask.kt | 15 ++- .../integ/rest/StartReplicationIT.kt | 93 ++++++++++++++++--- 7 files changed, 211 insertions(+), 30 deletions(-) create mode 100644 src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterTranslogService.kt diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/ReplicationPlugin.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/ReplicationPlugin.kt index f7aa97e8..4d8e8f22 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/ReplicationPlugin.kt @@ -122,7 +122,7 @@ import com.amazon.elasticsearch.replication.action.status.TransportReplicationSt import com.amazon.elasticsearch.replication.metadata.ReplicationMetadataManager import com.amazon.elasticsearch.replication.metadata.store.ReplicationMetadataStore import com.amazon.elasticsearch.replication.rest.* - +import com.amazon.elasticsearch.replication.seqno.RemoteClusterTranslogService internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, RepositoryPlugin, EnginePlugin { @@ -159,7 +159,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, override fun getGuiceServiceClasses(): Collection> { return listOf(Injectables::class.java, - RemoteClusterRestoreLeaderService::class.java) + RemoteClusterRestoreLeaderService::class.java, RemoteClusterTranslogService::class.java) } override fun getActions(): List> { diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/changes/TransportGetChangesAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/changes/TransportGetChangesAction.kt index efcbc981..df4c9e4c 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/changes/TransportGetChangesAction.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/changes/TransportGetChangesAction.kt @@ -15,14 +15,16 @@ package com.amazon.elasticsearch.replication.action.changes -import com.amazon.elasticsearch.replication.action.repository.GetFileChunkAction import com.amazon.elasticsearch.replication.util.completeWith import com.amazon.elasticsearch.replication.util.coroutineContext import com.amazon.elasticsearch.replication.util.waitForGlobalCheckpoint import com.amazon.elasticsearch.replication.ReplicationPlugin.Companion.REPLICATION_EXECUTOR_NAME_LEADER +import com.amazon.elasticsearch.replication.seqno.RemoteClusterTranslogService import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager import org.elasticsearch.ElasticsearchTimeoutException +import org.elasticsearch.ResourceNotFoundException import org.elasticsearch.action.ActionListener import org.elasticsearch.action.support.ActionFilters import org.elasticsearch.action.support.single.shard.TransportSingleShardAction @@ -34,6 +36,7 @@ import org.elasticsearch.common.inject.Inject import org.elasticsearch.common.io.stream.StreamInput import org.elasticsearch.common.io.stream.Writeable import org.elasticsearch.common.unit.TimeValue +import org.elasticsearch.index.IndexSettings import org.elasticsearch.index.shard.ShardId import org.elasticsearch.index.translog.Translog import org.elasticsearch.indices.IndicesService @@ -45,7 +48,8 @@ import kotlin.math.min class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clusterService: ClusterService, transportService: TransportService, actionFilters: ActionFilters, indexNameExpressionResolver: IndexNameExpressionResolver, - private val indicesService: IndicesService) : + private val indicesService: IndicesService, + private val translogService: RemoteClusterTranslogService) : TransportSingleShardAction( GetChangesAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, ::GetChangesRequest, REPLICATION_EXECUTOR_NAME_LEADER) { @@ -56,6 +60,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus companion object { val WAIT_FOR_NEW_OPS_TIMEOUT = TimeValue.timeValueMinutes(1)!! + private val log = LogManager.getLogger(TransportGetChangesAction::class.java) } override fun shardOperation(request: GetChangesRequest, shardId: ShardId): GetChangesResponse { @@ -85,19 +90,44 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus // At this point lastSyncedGlobalCheckpoint is at least fromSeqNo val toSeqNo = min(indexShard.lastSyncedGlobalCheckpoint, request.toSeqNo) - indexShard.newChangesSnapshot("odr", request.fromSeqNo, toSeqNo, true).use { snapshot -> - val ops = ArrayList(snapshot.totalOperations()) - var op = snapshot.next() - while (op != null) { - ops.add(op) - op = snapshot.next() + + var ops: List = listOf() + var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) + if(fetchFromTranslog) { + try { + ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo) + } catch (e: ResourceNotFoundException) { + fetchFromTranslog = false + } + } + + // Translog fetch is disabled or not found + if(!fetchFromTranslog) { + log.info("Fetching changes from lucene for ${request.shardId} - from:${request.fromSeqNo}, to:$toSeqNo") + indexShard.newChangesSnapshot("odr", request.fromSeqNo, toSeqNo, true).use { snapshot -> + ops = ArrayList(snapshot.totalOperations()) + var op = snapshot.next() + while (op != null) { + (ops as ArrayList).add(op) + op = snapshot.next() + } } - GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes) } + GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes) } } } + + private fun isTranslogPruningByRetentionLeaseEnabled(shardId: ShardId): Boolean { + val enabled = clusterService.state().metadata.indices.get(shardId.indexName) + ?.settings?.getAsBoolean(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key, false) + if(enabled != null) { + return enabled + } + return false + } + override fun resolveIndex(request: GetChangesRequest): Boolean { return true } diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/replay/TransportReplayChangesAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/replay/TransportReplayChangesAction.kt index 5f88da95..0cbd2c1a 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/replay/TransportReplayChangesAction.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/replay/TransportReplayChangesAction.kt @@ -125,12 +125,19 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans if(primaryShard.maxSeqNoOfUpdatesOrDeletes < request.maxSeqNoOfUpdatesOrDeletes) { primaryShard.advanceMaxSeqNoOfUpdatesOrDeletes(request.maxSeqNoOfUpdatesOrDeletes) } - var result = primaryShard.applyTranslogOperation(op, Engine.Operation.Origin.PRIMARY) + var eachOp = op + if(op.opType() == Translog.Operation.Type.INDEX) { + eachOp = op as Translog.Index + // Unset autogeneretedIdTimeStamp as we are using externel ID from the leader index + eachOp = Translog.Index(eachOp.docType(), eachOp.id(), eachOp.seqNo(), + eachOp.primaryTerm(), eachOp.version(), eachOp.source().toBytesRef().bytes, eachOp.routing(), -1) + } + var result = primaryShard.applyTranslogOperation(eachOp, Engine.Operation.Origin.PRIMARY) if (result.resultType == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { waitForMappingUpdate { // fetch mappings from the remote cluster when applying on PRIMARY... syncRemoteMapping(request.remoteCluster, request.remoteIndex, request.shardId()!!.indexName, - op.docType()) + eachOp.docType()) } result = primaryShard.applyTranslogOperation(op, Engine.Operation.Origin.PRIMARY) } diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/repository/RemoteClusterRepository.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/repository/RemoteClusterRepository.kt index db33a2e2..61699610 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/repository/RemoteClusterRepository.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/repository/RemoteClusterRepository.kt @@ -53,6 +53,7 @@ import org.elasticsearch.common.UUIDs import org.elasticsearch.common.component.AbstractLifecycleComponent import org.elasticsearch.common.metrics.CounterMetric import org.elasticsearch.common.settings.Settings +import org.elasticsearch.index.IndexSettings import org.elasticsearch.index.mapper.MapperService import org.elasticsearch.index.shard.ShardId import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus @@ -242,6 +243,10 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata val builder = Settings.builder().put(indexMetadata.settings) val replicatedIndex = "${repositoryMetadata.remoteClusterName()}:${index.name}" builder.put(ReplicationPlugin.REPLICATED_INDEX_SETTING.key, replicatedIndex) + + // Remove translog pruning for the follower index + builder.remove(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key) + val indexMdBuilder = IndexMetadata.builder(indexMetadata).settings(builder) indexMetadata.aliases.valuesIt().forEach { indexMdBuilder.putAlias(it) diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterTranslogService.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterTranslogService.kt new file mode 100644 index 00000000..8896a9e5 --- /dev/null +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterTranslogService.kt @@ -0,0 +1,65 @@ +package com.amazon.elasticsearch.replication.seqno + +import org.apache.logging.log4j.LogManager +import org.elasticsearch.ResourceNotFoundException +import org.elasticsearch.common.component.AbstractLifecycleComponent +import org.elasticsearch.common.inject.Singleton +import org.elasticsearch.core.internal.io.IOUtils +import org.elasticsearch.index.engine.Engine +import org.elasticsearch.index.shard.IndexShard +import org.elasticsearch.index.translog.Translog +import java.io.Closeable + +@Singleton +class RemoteClusterTranslogService : AbstractLifecycleComponent(){ + companion object { + private val log = LogManager.getLogger(RemoteClusterTranslogService::class.java) + private const val SOURCE_NAME = "os_plugin_replication" + } + + override fun doStart() { + } + + override fun doStop() { + } + + override fun doClose() { + } + + public fun getHistoryOfOperations(indexShard: IndexShard, startSeqNo: Long, toSeqNo: Long): List { + if(!indexShard.hasCompleteHistoryOperations(SOURCE_NAME, Engine.HistorySource.TRANSLOG, startSeqNo)) { + log.debug("Doesn't have history of operations starting from $startSeqNo") + throw ResourceNotFoundException("$indexShard doesn't contain ops starting from $startSeqNo " + + "with source ${Engine.HistorySource.TRANSLOG.name}") + } + log.trace("Fetching translog snapshot for $indexShard - from $startSeqNo to $toSeqNo") + val snapshot = indexShard.getHistoryOperations(SOURCE_NAME, Engine.HistorySource.TRANSLOG, startSeqNo, toSeqNo) + + // Total ops to be fetched (both toSeqNo and startSeqNo are inclusive) + val opsSize = toSeqNo - startSeqNo + 1 + val ops = ArrayList(opsSize.toInt()) + + // Filter and sort specific ops from the obtained history + var filteredOpsFromTranslog = 0 + snapshot.use { + var op = snapshot.next() + while(op != null) { + if(op.seqNo() in startSeqNo..toSeqNo) { + ops.add(op) + filteredOpsFromTranslog++ + } + op = snapshot.next() + } + } + assert(filteredOpsFromTranslog == opsSize.toInt()) {"Missing operations while fetching from translog"} + + val sortedOps = ArrayList(opsSize.toInt()) + sortedOps.addAll(ops) + for(ele in ops) { + sortedOps[(ele.seqNo() - startSeqNo).toInt()] = ele + } + + log.debug("Starting seqno after sorting ${sortedOps[0].seqNo()} and ending seqno ${sortedOps[ops.size-1].seqNo()}") + return sortedOps.subList(0, ops.size.coerceAtMost((opsSize).toInt())) + } +} diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/task/index/IndexReplicationTask.kt index 8bdaffc7..529566da 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/task/index/IndexReplicationTask.kt @@ -74,6 +74,8 @@ import com.amazon.elasticsearch.replication.util.suspendExecute import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequestBuilder import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest +import org.elasticsearch.common.settings.Settings +import org.elasticsearch.index.IndexSettings import org.elasticsearch.indices.recovery.RecoveryState import kotlin.streams.toList @@ -117,7 +119,7 @@ class IndexReplicationTask(id: Long, type: String, action: String, description: log.debug("Resuming tasks now.") InitFollowState } else { - startRestore() + setupAndStartRestore() } } ReplicationState.RESTORING -> { @@ -253,7 +255,16 @@ class IndexReplicationTask(id: Long, type: String, action: String, description: client.suspending(client.admin().indices()::delete, defaultContext = true)(DeleteIndexRequest(followerIndexName)) } - private suspend fun startRestore(): IndexReplicationState { + private suspend fun setupAndStartRestore(): IndexReplicationState { + // Enable translog based fetch on the leader(remote) cluster + val remoteClient = client.getRemoteClusterClient(remoteCluster) + val settingsBuilder = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key, true) + val updateSettingsRequest = remoteClient.admin().indices().prepareUpdateSettings().setSettings(settingsBuilder).setIndices(remoteIndex.name).request() + val updateResponse = remoteClient.suspending(remoteClient.admin().indices()::updateSettings, injectSecurityContext = true)(updateSettingsRequest) + if(!updateResponse.isAcknowledged) { + log.error("Unable to update setting for translog pruning based on retention lease") + } + val restoreRequest = client.admin().cluster() .prepareRestoreSnapshot(RemoteClusterRepository.repoForCluster(remoteCluster), REMOTE_SNAPSHOT_NAME) .setIndices(remoteIndex.name) diff --git a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt index 5d6cd0bb..4d82ddcd 100644 --- a/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt @@ -29,6 +29,9 @@ import org.assertj.core.api.Assertions.assertThatThrownBy import org.elasticsearch.action.admin.indices.alias.Alias import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest +import org.elasticsearch.action.get.GetRequest +import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.client.Request import org.elasticsearch.client.RequestOptions import org.elasticsearch.client.ResponseException @@ -37,6 +40,7 @@ import org.elasticsearch.client.indices.GetIndexRequest import org.elasticsearch.client.indices.GetMappingsRequest import org.elasticsearch.cluster.metadata.IndexMetadata import org.elasticsearch.common.settings.Settings +import org.elasticsearch.index.IndexSettings import org.elasticsearch.test.ESTestCase.assertBusy import org.junit.Assert @@ -137,21 +141,20 @@ class StartReplicationIT: MultiClusterRestTestCase() { assertThat(createIndexResponse.isAcknowledged).isTrue() try { followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) - assertBusy { - assertThat(followerClient.indices() - .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) - .isEqualTo(true) - } - Assert.assertEquals( - leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) - .mappings()[leaderIndexName], - followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) - .mappings()[followerIndexName] - ) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + Assert.assertEquals( + leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName], + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) } finally { followerClient.stopReplication(followerIndexName) } - } fun `test that index settings are getting replicated`() { @@ -179,8 +182,8 @@ class StartReplicationIT: MultiClusterRestTestCase() { Assert.assertEquals( "0", followerClient.indices() - .getSettings(getSettingsRequest, RequestOptions.DEFAULT) - .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] + .getSettings(getSettingsRequest, RequestOptions.DEFAULT) + .indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS] ) } finally { followerClient.stopReplication(followerIndexName) @@ -213,6 +216,66 @@ class StartReplicationIT: MultiClusterRestTestCase() { } } + fun `test that translog settings are set on leader and not on follower`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + try { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName)) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + assertThat(followerClient.indices() + .getSettings(GetSettingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .getSetting(followerIndexName, IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key) + .isNullOrEmpty()) + } + + assertThat(leaderClient.indices() + .getSettings(GetSettingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .getSetting(leaderIndexName, IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key) == "true") + + } finally { + followerClient.stopReplication(followerIndexName) + } + } + + fun `test that replication continues after removing translog settings based on retention lease`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + + createConnectionBetweenClusters(FOLLOWER, LEADER) + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + try { + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + // Turn-off the settings and index doc + val settingsBuilder = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key, false) + val settingsUpdateResponse = leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName) + .settings(settingsBuilder.build()), RequestOptions.DEFAULT) + Assert.assertEquals(settingsUpdateResponse.isAcknowledged, true) + val sourceMap = mapOf("name" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id("2").source(sourceMap), RequestOptions.DEFAULT) + assertBusy { + followerClient.get(GetRequest(followerIndexName).id("2"), RequestOptions.DEFAULT).isExists + } + } finally { + followerClient.stopReplication(followerIndexName) + } + } + private fun addClusterMetadataBlock(clusterName: String, blockValue: String) { val cluster = getNamedCluster(clusterName) val persistentConnectionRequest = Request("PUT", "_cluster/settings") @@ -231,4 +294,4 @@ class StartReplicationIT: MultiClusterRestTestCase() { val persistentConnectionResponse = cluster.lowLevelClient.performRequest(persistentConnectionRequest) assertEquals(HttpStatus.SC_OK.toLong(), persistentConnectionResponse.statusLine.statusCode.toLong()) } -} \ No newline at end of file +}