Skip to content

Commit

Permalink
Support for translog fetch at the leader(remote) cluster
Browse files Browse the repository at this point in the history
and enable translog pruning during replication setup
  • Loading branch information
saikaranam-amazon committed Jul 1, 2021
1 parent bcd2ae1 commit 145ec1c
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ import com.amazon.elasticsearch.replication.action.status.TransportReplicationSt
import com.amazon.elasticsearch.replication.metadata.ReplicationMetadataManager
import com.amazon.elasticsearch.replication.metadata.store.ReplicationMetadataStore
import com.amazon.elasticsearch.replication.rest.*

import com.amazon.elasticsearch.replication.seqno.RemoteClusterTranslogService

internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, RepositoryPlugin, EnginePlugin {

Expand Down Expand Up @@ -159,7 +159,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,

/**
 * Returns the service classes this plugin registers: [Injectables],
 * [RemoteClusterRestoreLeaderService] and [RemoteClusterTranslogService].
 */
override fun getGuiceServiceClasses(): Collection<Class<out LifecycleComponent>> {
    return listOf(
        Injectables::class.java,
        RemoteClusterRestoreLeaderService::class.java,
        RemoteClusterTranslogService::class.java
    )
}

override fun getActions(): List<ActionHandler<out ActionRequest, out ActionResponse>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@

package com.amazon.elasticsearch.replication.action.changes

import com.amazon.elasticsearch.replication.action.repository.GetFileChunkAction
import com.amazon.elasticsearch.replication.util.completeWith
import com.amazon.elasticsearch.replication.util.coroutineContext
import com.amazon.elasticsearch.replication.util.waitForGlobalCheckpoint
import com.amazon.elasticsearch.replication.ReplicationPlugin.Companion.REPLICATION_EXECUTOR_NAME_LEADER
import com.amazon.elasticsearch.replication.seqno.RemoteClusterTranslogService
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchTimeoutException
import org.elasticsearch.ResourceNotFoundException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction
Expand All @@ -34,6 +36,7 @@ import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.Writeable
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.IndexSettings
import org.elasticsearch.index.shard.ShardId
import org.elasticsearch.index.translog.Translog
import org.elasticsearch.indices.IndicesService
Expand All @@ -45,7 +48,8 @@ import kotlin.math.min
class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clusterService: ClusterService,
transportService: TransportService, actionFilters: ActionFilters,
indexNameExpressionResolver: IndexNameExpressionResolver,
private val indicesService: IndicesService) :
private val indicesService: IndicesService,
private val translogService: RemoteClusterTranslogService) :
TransportSingleShardAction<GetChangesRequest, GetChangesResponse>(
GetChangesAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ::GetChangesRequest, REPLICATION_EXECUTOR_NAME_LEADER) {
Expand All @@ -56,6 +60,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus

companion object {
val WAIT_FOR_NEW_OPS_TIMEOUT = TimeValue.timeValueMinutes(1)!!
private val log = LogManager.getLogger(TransportGetChangesAction::class.java)
}

override fun shardOperation(request: GetChangesRequest, shardId: ShardId): GetChangesResponse {
Expand Down Expand Up @@ -85,19 +90,44 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus

// At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
val toSeqNo = min(indexShard.lastSyncedGlobalCheckpoint, request.toSeqNo)
indexShard.newChangesSnapshot("odr", request.fromSeqNo, toSeqNo, true).use { snapshot ->
val ops = ArrayList<Translog.Operation>(snapshot.totalOperations())
var op = snapshot.next()
while (op != null) {
ops.add(op)
op = snapshot.next()

var ops: List<Translog.Operation> = listOf()
var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId)
if(fetchFromTranslog) {
try {
ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
} catch (e: ResourceNotFoundException) {
fetchFromTranslog = false
}
}

// Translog fetch is disabled or not found
if(!fetchFromTranslog) {
log.info("Fetching changes from lucene for ${request.shardId} - from:${request.fromSeqNo}, to:$toSeqNo")
indexShard.newChangesSnapshot("odr", request.fromSeqNo, toSeqNo, true).use { snapshot ->
ops = ArrayList(snapshot.totalOperations())
var op = snapshot.next()
while (op != null) {
(ops as ArrayList<Translog.Operation>).add(op)
op = snapshot.next()
}
}
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes)
}
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes)
}
}
}


/**
 * Reports whether translog pruning by retention lease is switched on for the index that owns [shardId].
 *
 * The flag is read from the index settings found in the current cluster state. When the index
 * has no metadata entry, or no value can be read for the setting, the answer is `false`.
 */
private fun isTranslogPruningByRetentionLeaseEnabled(shardId: ShardId): Boolean {
    val indexMetadata = clusterService.state().metadata.indices.get(shardId.indexName) ?: return false
    val pruningKey = IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key
    return indexMetadata.settings?.getAsBoolean(pruningKey, false) ?: false
}

/** Always answers `true`, whatever the [request] contains. */
override fun resolveIndex(request: GetChangesRequest): Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,19 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans
if(primaryShard.maxSeqNoOfUpdatesOrDeletes < request.maxSeqNoOfUpdatesOrDeletes) {
primaryShard.advanceMaxSeqNoOfUpdatesOrDeletes(request.maxSeqNoOfUpdatesOrDeletes)
}
var result = primaryShard.applyTranslogOperation(op, Engine.Operation.Origin.PRIMARY)
var eachOp = op
if(op.opType() == Translog.Operation.Type.INDEX) {
eachOp = op as Translog.Index
// Unset autoGeneratedIdTimestamp as we are using the external ID from the leader index
eachOp = Translog.Index(eachOp.docType(), eachOp.id(), eachOp.seqNo(),
eachOp.primaryTerm(), eachOp.version(), eachOp.source().toBytesRef().bytes, eachOp.routing(), -1)
}
var result = primaryShard.applyTranslogOperation(eachOp, Engine.Operation.Origin.PRIMARY)
if (result.resultType == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
waitForMappingUpdate {
// fetch mappings from the remote cluster when applying on PRIMARY...
syncRemoteMapping(request.remoteCluster, request.remoteIndex, request.shardId()!!.indexName,
op.docType())
eachOp.docType())
}
result = primaryShard.applyTranslogOperation(op, Engine.Operation.Origin.PRIMARY)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import org.elasticsearch.common.UUIDs
import org.elasticsearch.common.component.AbstractLifecycleComponent
import org.elasticsearch.common.metrics.CounterMetric
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.index.IndexSettings
import org.elasticsearch.index.mapper.MapperService
import org.elasticsearch.index.shard.ShardId
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus
Expand Down Expand Up @@ -242,6 +243,10 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata
val builder = Settings.builder().put(indexMetadata.settings)
val replicatedIndex = "${repositoryMetadata.remoteClusterName()}:${index.name}"
builder.put(ReplicationPlugin.REPLICATED_INDEX_SETTING.key, replicatedIndex)

// Remove translog pruning for the follower index
builder.remove(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key)

val indexMdBuilder = IndexMetadata.builder(indexMetadata).settings(builder)
indexMetadata.aliases.valuesIt().forEach {
indexMdBuilder.putAlias(it)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.amazon.elasticsearch.replication.seqno

import org.apache.logging.log4j.LogManager
import org.elasticsearch.ResourceNotFoundException
import org.elasticsearch.common.component.AbstractLifecycleComponent
import org.elasticsearch.common.inject.Singleton
import org.elasticsearch.core.internal.io.IOUtils
import org.elasticsearch.index.engine.Engine
import org.elasticsearch.index.shard.IndexShard
import org.elasticsearch.index.translog.Translog
import java.io.Closeable

/**
 * Lifecycle component that reads a contiguous range of operations from a shard's translog history.
 *
 * The lifecycle hooks are intentionally empty: the service holds no state of its own.
 */
@Singleton
class RemoteClusterTranslogService : AbstractLifecycleComponent() {

    override fun doStart() {
    }

    override fun doStop() {
    }

    override fun doClose() {
    }

    /**
     * Fetches the operations with sequence numbers in `[startSeqNo, toSeqNo]` (both inclusive)
     * from the translog history of [indexShard], ordered by ascending sequence number.
     *
     * @param indexShard shard whose translog history is read
     * @param startSeqNo first sequence number to return (inclusive)
     * @param toSeqNo last sequence number to return (inclusive)
     * @return exactly one operation per sequence number in the range, in ascending order;
     *         an empty list when `toSeqNo < startSeqNo`
     * @throws ResourceNotFoundException when the shard reports incomplete translog history from
     *         [startSeqNo], or when the snapshot turns out to lack an operation inside the range
     * @throws ArithmeticException when the requested range does not fit in an `Int`
     */
    fun getHistoryOfOperations(indexShard: IndexShard, startSeqNo: Long, toSeqNo: Long): List<Translog.Operation> {
        if (!indexShard.hasCompleteHistoryOperations(SOURCE_NAME, Engine.HistorySource.TRANSLOG, startSeqNo)) {
            log.debug("Doesn't have history of operations starting from $startSeqNo")
            throw ResourceNotFoundException("$indexShard doesn't contain ops starting from $startSeqNo " +
                    "with source ${Engine.HistorySource.TRANSLOG.name}")
        }

        // Nothing to fetch for an empty range; avoids sizing a collection with a non-positive count.
        if (toSeqNo < startSeqNo) {
            return emptyList()
        }

        // Total ops to be fetched (both toSeqNo and startSeqNo are inclusive).
        // toIntExact fails loudly instead of silently truncating an oversized range.
        val opsSize = Math.toIntExact(toSeqNo - startSeqNo + 1)

        log.trace("Fetching translog snapshot for $indexShard - from $startSeqNo to $toSeqNo")

        // One slot per expected sequence number. Writing each op straight into its slot sorts the
        // result and collapses duplicate sequence numbers (the last one read wins).
        val slots = arrayOfNulls<Translog.Operation>(opsSize)
        indexShard.getHistoryOperations(SOURCE_NAME, Engine.HistorySource.TRANSLOG, startSeqNo, toSeqNo)
            .use { snapshot ->
                var op = snapshot.next()
                while (op != null) {
                    if (op.seqNo() in startSeqNo..toSeqNo) {
                        slots[(op.seqNo() - startSeqNo).toInt()] = op
                    }
                    op = snapshot.next()
                }
            }

        // A per-slot check catches gaps that a plain count would miss when duplicates are present,
        // and it still runs when JVM assertions are disabled.
        val firstGap = slots.indexOfFirst { it == null }
        if (firstGap >= 0) {
            throw ResourceNotFoundException("Missing operations while fetching from translog: $indexShard " +
                    "has no op with seqNo ${startSeqNo + firstGap} in range $startSeqNo to $toSeqNo")
        }

        val sortedOps = slots.filterNotNull()
        log.debug("Starting seqno after sorting ${sortedOps.first().seqNo()} and ending seqno ${sortedOps.last().seqNo()}")
        return sortedOps
    }

    companion object {
        private val log = LogManager.getLogger(RemoteClusterTranslogService::class.java)
        private const val SOURCE_NAME = "os_plugin_replication"
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ import com.amazon.elasticsearch.replication.util.suspendExecute
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequestBuilder
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.index.IndexSettings
import org.elasticsearch.indices.recovery.RecoveryState
import kotlin.streams.toList

Expand Down Expand Up @@ -117,7 +119,7 @@ class IndexReplicationTask(id: Long, type: String, action: String, description:
log.debug("Resuming tasks now.")
InitFollowState
} else {
startRestore()
setupAndStartRestore()
}
}
ReplicationState.RESTORING -> {
Expand Down Expand Up @@ -253,7 +255,16 @@ class IndexReplicationTask(id: Long, type: String, action: String, description:
client.suspending(client.admin().indices()::delete, defaultContext = true)(DeleteIndexRequest(followerIndexName))
}

private suspend fun startRestore(): IndexReplicationState {
private suspend fun setupAndStartRestore(): IndexReplicationState {
// Enable translog based fetch on the leader(remote) cluster
val remoteClient = client.getRemoteClusterClient(remoteCluster)
val settingsBuilder = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key, true)
val updateSettingsRequest = remoteClient.admin().indices().prepareUpdateSettings().setSettings(settingsBuilder).setIndices(remoteIndex.name).request()
val updateResponse = remoteClient.suspending(remoteClient.admin().indices()::updateSettings, injectSecurityContext = true)(updateSettingsRequest)
if(!updateResponse.isAcknowledged) {
log.error("Unable to update setting for translog pruning based on retention lease")
}

val restoreRequest = client.admin().cluster()
.prepareRestoreSnapshot(RemoteClusterRepository.repoForCluster(remoteCluster), REMOTE_SNAPSHOT_NAME)
.setIndices(remoteIndex.name)
Expand Down
Loading

0 comments on commit 145ec1c

Please sign in to comment.