[Backport 1.3] Handling OpenSearchRejectExecuteException Exception (#1004) (#1026)

* Handling OpenSearchRejectExecuteException Exception (#1004)

* Handling OpenSearchRejectExecuteException Exception
* introduced writersPerShard setting.

Signed-off-by: sricharanvuppu <[email protected]>
(cherry picked from commit 448e7a7)
Signed-off-by: sricharanvuppu <[email protected]>

* correcting OpenSearchRejectedExecutionException import

Signed-off-by: sricharanvuppu <[email protected]>

* acquire the rate limiter after getting unAppliedChanges in TranslogSequencer

Signed-off-by: sricharanvuppu <[email protected]>

---------

Signed-off-by: sricharanvuppu <[email protected]>
Signed-off-by: sricharanvuppu <[email protected]>
sricharanvuppu authored Jul 10, 2023
1 parent ee5a9fa commit 6a6f3ca
Showing 7 changed files with 100 additions and 40 deletions.
14 changes: 10 additions & 4 deletions src/main/kotlin/org/opensearch/replication/ReplicationException.kt
@@ -12,22 +12,28 @@
package org.opensearch.replication

import org.opensearch.OpenSearchException
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ShardOperationFailedException
import org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE
import org.opensearch.index.shard.ShardId
import org.opensearch.rest.RestStatus

/**
* Base class for replication exceptions. Note: Replication process may throw exceptions that do not derive from this such as
* [org.opensearch.ResourceAlreadyExistsException], [org.opensearch.index.IndexNotFoundException] or
* [org.opensearch.index.shard.ShardNotFoundException].
*/
class ReplicationException: OpenSearchException {
class ReplicationException: OpenSearchStatusException {

constructor(message: String, vararg args: Any) : super(message, *args)
constructor(message: String, status : RestStatus, cause: Throwable, vararg args: Any) : super(message, status, cause, *args)

constructor(message: String, cause: Throwable, vararg args: Any) : super(message, cause, *args)
constructor(message: String, vararg args: Any) : super(message, RestStatus.INTERNAL_SERVER_ERROR, *args)

constructor(message: String, shardFailures: Array<ShardOperationFailedException>) : super(message) {
constructor(message: String, status: RestStatus, vararg args: Any) : super(message, status, *args)

constructor(cause: Throwable, status: RestStatus, vararg args: Any) : super(cause.message, status, *args)

constructor(message: String, shardFailures: Array<ShardOperationFailedException>): super(message, shardFailures.firstOrNull()?.status()?:RestStatus.INTERNAL_SERVER_ERROR) {
shardFailures.firstOrNull()?.let {
setShard(ShardId(it.index(), INDEX_UUID_NA_VALUE, it.shardId()))
// Add first failure as cause and rest as suppressed...
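The class now extends OpenSearchStatusException, so every ReplicationException carries an explicit RestStatus instead of defaulting to a generic 500 deep in the response path. A minimal sketch of how the new constructors might be used at a call site (the wrapping functions are hypothetical, not part of this commit):

import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException
import org.opensearch.replication.ReplicationException
import org.opensearch.rest.RestStatus

// Hypothetical call site: surface a thread-pool rejection as 429 rather
// than 500, using the (cause, status) constructor added in this commit.
fun wrapRejection(e: OpenSearchRejectedExecutionException) =
    ReplicationException(e, RestStatus.TOO_MANY_REQUESTS)

// The message-only constructor now pins the status to INTERNAL_SERVER_ERROR explicitly.
fun genericFailure(message: String) = ReplicationException(message)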
16 changes: 9 additions & 7 deletions src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
@@ -174,6 +174,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD = Setting.intSetting("plugins.replication.follower.concurrent_readers_per_shard", 2, 1,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD = Setting.intSetting("plugins.replication.follower.concurrent_writers_per_shard", 2, 1,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_PARALLEL_READ_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.follower.poll_interval", TimeValue.timeValueMillis(50), TimeValue.timeValueMillis(1),
TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.autofollow.fetch_poll_interval", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30),
@@ -345,14 +347,14 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,

override fun getSettings(): List<Setting<*>> {
return listOf(REPLICATED_INDEX_SETTING, REPLICATION_FOLLOWER_OPS_BATCH_SIZE, REPLICATION_LEADER_THREADPOOL_SIZE,
REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD,
REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS,
REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL,
REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL,
REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING,
REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE)
REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD,
REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS,
REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL,
REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL,
REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING,
REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE,
REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD)
}

override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry,
clusterService: ClusterService, recoverySettings: RecoverySettings): Map<String, Repository.Factory> {
val repoFactory = Repository.Factory { repoMetadata: RepositoryMetadata ->
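The new plugins.replication.follower.concurrent_writers_per_shard setting (default 2, minimum 1) is Dynamic and NodeScope, so it can be changed without a node restart, though running shard tasks appear to read it once at startup (see the pause-resume note in ShardReplicationTask below). A sketch, not part of this commit, of raising it at runtime from a transport-level Client; the equivalent REST call is PUT _cluster/settings with {"persistent": {"plugins.replication.follower.concurrent_writers_per_shard": 4}}:

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.opensearch.client.Client
import org.opensearch.common.settings.Settings

// Sketch: bump follower writer concurrency to 4 cluster-wide.
fun raiseWriterConcurrency(client: Client) {
    val request = ClusterUpdateSettingsRequest().persistentSettings(
        Settings.builder()
            .put("plugins.replication.follower.concurrent_writers_per_shard", 4))
    client.admin().cluster().updateSettings(request).actionGet()
}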
src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt
@@ -24,6 +24,7 @@ open class ReplicationSettings(clusterService: ClusterService) {
@Volatile var chunkSize = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE.get(clusterService.settings)
@Volatile var concurrentFileChunks = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS.get(clusterService.settings)
@Volatile var readersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD)
@Volatile var writersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD)
@Volatile var batchSize = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE)
@Volatile var pollDuration: TimeValue = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_INTERVAL)
@Volatile var autofollowFetchPollDuration = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL)
@@ -41,6 +42,7 @@ open class ReplicationSettings(clusterService: ClusterService) {
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE) { value: ByteSizeValue -> this.chunkSize = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS) { value: Int -> this.concurrentFileChunks = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD) { value: Int -> this.readersPerShard = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD) { value: Int -> this.writersPerShard = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE) { batchSize = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_INTERVAL) { pollDuration = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION) { leaseRenewalMaxFailureDuration = it }
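Each dynamic setting here follows the same OpenSearch idiom: a @Volatile field initialized from cluster settings plus an addSettingsUpdateConsumer callback, so updates applied on the cluster-settings thread become visible to replication coroutines without explicit locking. A stripped-down sketch of the idiom (class and names hypothetical):

import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Setting

// Hypothetical holder mirroring the pattern above for a single Int setting.
class DynamicIntSetting(clusterService: ClusterService, setting: Setting<Int>) {
    @Volatile var value: Int = clusterService.clusterSettings.get(setting)
    init {
        // Invoked on every dynamic update; @Volatile publishes the write to reader threads.
        clusterService.clusterSettings.addSettingsUpdateConsumer(setting) { v -> value = v }
    }
}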
src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt
@@ -214,7 +214,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
// Since this setting is not dynamic, setting update would only reflect after pause-resume or on a new replication job.
val rateLimiter = Semaphore(replicationSettings.readersPerShard)
val sequencer = TranslogSequencer(scope, replicationMetadata, followerShardId, leaderAlias, leaderShardId.indexName,
TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats)
TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats, replicationSettings.writersPerShard)

val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
@@ -255,7 +255,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
followerClusterStats.stats[followerShardId]!!.opsReadFailures.addAndGet(1)
logInfo("Unable to get changes from seqNo: $fromSeqNo. ${e.stackTraceToString()}")
changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1,-1)

// Propagate 4xx exceptions up the chain and halt replication as they are irrecoverable
val range4xx = 400.rangeTo(499)
if (e is OpenSearchException &&
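The truncated tail of this hunk classifies fetch failures: client-side (4xx) errors are treated as irrecoverable and rethrown to halt replication, while everything else is retried on the next poll. A sketch of that classification, assuming (consistent with the retry logic elsewhere in this commit) that 429 TOO_MANY_REQUESTS is excluded from the halt:

import org.opensearch.OpenSearchException
import org.opensearch.rest.RestStatus

// Sketch: 4xx responses other than 429 indicate a permanent problem
// (bad request, missing index, auth failure) and should stop the task.
fun shouldHaltReplication(e: Exception): Boolean {
    val range4xx = 400.rangeTo(499)
    return e is OpenSearchException &&
        range4xx.contains(e.status().status) &&
        e.status() != RestStatus.TOO_MANY_REQUESTS
}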
src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt
@@ -21,18 +21,23 @@ import org.opensearch.replication.util.suspendExecuteWithRetries
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import org.opensearch.client.Client
import org.opensearch.OpenSearchException
import org.opensearch.action.support.TransportActions
import org.opensearch.common.logging.Loggers
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId
import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import org.opensearch.rest.RestStatus


/**
* A TranslogSequencer allows multiple producers of [Translog.Operation]s to write them in sequence number order to an
@@ -50,7 +55,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
private val followerShardId: ShardId,
private val leaderAlias: String, private val leaderIndexName: String,
private val parentTaskId: TaskId, private val client: Client, initialSeqNo: Long,
private val followerClusterStats: FollowerClusterStats) {
private val followerClusterStats: FollowerClusterStats, writersPerShard : Int) {

private val unAppliedChanges = ConcurrentHashMap<Long, GetChangesResponse>()
private val log = Loggers.getLogger(javaClass, followerShardId)!!
@@ -59,40 +64,72 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
val followerIndexService = indicesService.indexServiceSafe(followerShardId.index)
val indexShard = followerIndexService.getShard(followerShardId.id)

private val sequencer = scope.actor<Unit>(capacity = Channel.UNLIMITED) {
private val sequencer = scope.actor<Unit>(capacity = 0) {

// Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will
// raise the same exception. See [SendChannel.close] method for details.
val rateLimiter = Semaphore(writersPerShard)
var highWatermark = initialSeqNo
for (m in channel) {
while (unAppliedChanges.containsKey(highWatermark + 1)) {
val next = unAppliedChanges.remove(highWatermark + 1)!!
val replayRequest = ReplayChangesRequest(followerShardId, next.changes, next.maxSeqNoOfUpdatesOrDeletes,
leaderAlias, leaderIndexName)
replayRequest.parentTask = parentTaskId
rateLimiter.acquire()
launch {
var relativeStartNanos = System.nanoTime()
val retryOnExceptions = ArrayList<Class<*>>()
retryOnExceptions.add(MappingNotAvailableException::class.java)
var tryReplay = true
try {
while (tryReplay) {
tryReplay = false
try {
val replayResponse = client.suspendExecuteWithRetries(
replicationMetadata,
ReplayChangesAction.INSTANCE,
replayRequest,
log = log,
retryOn = retryOnExceptions
)
if (replayResponse.shardInfo.failed > 0) {
replayResponse.shardInfo.failures.forEachIndexed { i, failure ->
log.error("Failed replaying changes. Failure:$i:$failure}")
}
followerClusterStats.stats[followerShardId]!!.opsWriteFailures.addAndGet(
replayResponse.shardInfo.failed.toLong()
)
throw ReplicationException(
"failed to replay changes",
replayResponse.shardInfo.failures
)
}

val replayResponse = client.suspendExecuteWithRetries(
replicationMetadata,
ReplayChangesAction.INSTANCE,
replayRequest,
log = log,
retryOn = retryOnExceptions
)
if (replayResponse.shardInfo.failed > 0) {
replayResponse.shardInfo.failures.forEachIndexed { i, failure ->
log.error("Failed replaying changes. Failure:$i:$failure}")
val tookInNanos = System.nanoTime() - relativeStartNanos
followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(
TimeUnit.NANOSECONDS.toMillis(tookInNanos)
)
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(
replayRequest.changes.size.toLong()
)
} catch (e: OpenSearchException) {
if (e !is IndexNotFoundException && (retryOnExceptions.contains(e.javaClass)
|| TransportActions.isShardNotAvailableException(e)
// This waits for the dependencies to load and retry. Helps during boot-up
|| e.status().status >= 500
|| e.status() == RestStatus.TOO_MANY_REQUESTS)) {
tryReplay = true
}
else {
log.error("Got non-retriable Exception:${e.message} with status:${e.status()}")
throw e
}
}
}
followerClusterStats.stats[followerShardId]!!.opsWriteFailures.addAndGet(replayResponse.shardInfo.failed.toLong())
throw ReplicationException("failed to replay changes", replayResponse.shardInfo.failures)
} finally {
rateLimiter.release()
}

val tookInNanos = System.nanoTime() - relativeStartNanos
followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos))
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong())
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
}
highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark
}
@@ -105,6 +142,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
completed.await()
}


suspend fun send(changes : GetChangesResponse) {
unAppliedChanges[changes.fromSeqNo] = changes
sequencer.send(Unit)
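The core change in this file: the actor's mailbox shrinks from Channel.UNLIMITED to capacity = 0, and a Semaphore(writersPerShard) permit is acquired after the next contiguous batch is pulled from unAppliedChanges but before the replay coroutine is launched, with the release in a finally block. At most writersPerShard replay batches are therefore in flight per shard, and once all permits are taken the acquire suspends the actor loop itself, back-pressuring send(). A self-contained toy sketch of the gating pattern (kotlinx.coroutines only; the workload is simulated):

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore

// Toy model of the sequencer's writer gating: acquire() caps in-flight
// replays at writersPerShard; release() in finally returns the permit
// even when a replay fails.
fun main() = runBlocking {
    val writersPerShard = 2
    val rateLimiter = Semaphore(writersPerShard)
    repeat(6) { batch ->
        rateLimiter.acquire() // suspends once 2 replays are already running
        launch {
            try {
                delay(100) // stand-in for replaying one translog batch
                println("replayed batch $batch")
            } finally {
                rateLimiter.release()
            }
        }
    }
}

Acquiring only after a contiguous batch is available (the commit's third bullet) avoids holding a permit while the sequencer waits for out-of-order changes to arrive.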
25 changes: 19 additions & 6 deletions src/main/kotlin/org/opensearch/replication/util/Extensions.kt
@@ -28,6 +28,7 @@ import org.opensearch.action.index.IndexRequestBuilder
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.support.TransportActions
import org.opensearch.client.Client
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.shard.ShardId
@@ -43,6 +44,7 @@ import org.opensearch.transport.NodeDisconnectedException
import org.opensearch.transport.NodeNotConnectedException
import java.io.PrintWriter
import java.io.StringWriter
import java.lang.Exception

/*
* Extension function to use the store object
@@ -110,7 +112,8 @@ suspend fun <Req: ActionRequest, Resp: ActionResponse> Client.suspendExecuteWith
defaultContext: Boolean = false): Resp {
var currentBackoff = backoff
retryOn.addAll(defaultRetryableExceptions())
repeat(numberOfRetries - 1) {
var retryException: Exception
repeat(numberOfRetries - 1) { index ->
try {
return suspendExecute(replicationMetadata, action, req,
injectSecurityContext = injectSecurityContext, defaultContext = defaultContext)
@@ -122,19 +125,29 @@ suspend fun <Req: ActionRequest, Resp: ActionResponse> Client.suspendExecuteWith
// This waits for the dependencies to load and retry. Helps during boot-up
|| e.status().status >= 500
|| e.status() == RestStatus.TOO_MANY_REQUESTS)) {
log.warn("Encountered a failure while executing in $req. Retrying in ${currentBackoff/1000} seconds" +
".", e)
delay(currentBackoff)
currentBackoff = (currentBackoff * factor).toLong().coerceAtMost(maxTimeOut)
retryException = e;
} else {
throw e
}
} catch (e: OpenSearchRejectedExecutionException) {
if(index < numberOfRetries-2) {
retryException = e;
}
else {
throw ReplicationException(e, RestStatus.TOO_MANY_REQUESTS)
}
}
log.warn(
"Encountered a failure while executing in $req. Retrying in ${currentBackoff / 1000} seconds" +
".", retryException
)
delay(currentBackoff)
currentBackoff = (currentBackoff * factor).toLong().coerceAtMost(maxTimeOut)

}
return suspendExecute(replicationMetadata, action, req,
injectSecurityContext = injectSecurityContext, defaultContext = defaultContext) // last attempt
}

/**
* Restore shard from leader cluster with retries.
* Only specified error are retried
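suspendExecuteWithRetries now routes both retriable OpenSearchExceptions and OpenSearchRejectedExecutionException through a shared backoff tail: the delay grows by the configured factor each round and is clamped to maxTimeOut, and a rejection on the final retry is converted into a 429 ReplicationException rather than leaking the raw rejection. A standalone sketch of that backoff arithmetic; the parameter defaults are assumptions, and the real helper also re-injects the security context:

import kotlinx.coroutines.delay

// Sketch of capped exponential backoff in the style of suspendExecuteWithRetries.
suspend fun <T> retryWithBackoff(
    numberOfRetries: Int = 5,
    initialBackoffMillis: Long = 2_000, // assumed default
    factor: Double = 2.0,               // assumed multiplier
    maxTimeOutMillis: Long = 60_000,    // assumed cap
    isRetriable: (Exception) -> Boolean,
    op: suspend () -> T,
): T {
    var currentBackoff = initialBackoffMillis
    repeat(numberOfRetries - 1) {
        try {
            return op()
        } catch (e: Exception) {
            if (!isRetriable(e)) throw e
            delay(currentBackoff)
            currentBackoff = (currentBackoff * factor).toLong().coerceAtMost(maxTimeOutMillis)
        }
    }
    return op() // last attempt: failures propagate to the caller
}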
src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt
@@ -94,7 +94,7 @@ class TranslogSequencerTests : OpenSearchTestCase() {
Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService)
Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard)
val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID,
client, startSeqNo, stats)
client, startSeqNo, stats, 2)

// Send requests out of order (shuffled seqNo) and await for them to be processed.
var batchSeqNo = startSeqNo
