From b75b6a34bc5ab6a1a844be1ca574a77998d1c3fb Mon Sep 17 00:00:00 2001 From: Sai Kumar Date: Sat, 30 Oct 2021 01:36:55 +0530 Subject: [PATCH] BugFix: Changes to ensure replication tasks are not failing prematurely Signed-off-by: Sai Kumar --- .../metadata/ReplicationMetadataManager.kt | 77 +++++++--- .../store/ReplicationMetadataStore.kt | 26 +++- .../task/CrossClusterReplicationTask.kt | 38 +++-- .../task/autofollow/AutoFollowTask.kt | 24 +++- .../task/index/IndexReplicationTask.kt | 136 ++++++++++-------- .../task/shard/ShardReplicationTask.kt | 4 + .../opensearch/replication/util/Coroutines.kt | 10 ++ .../opensearch/replication/util/Extensions.kt | 19 ++- .../replication/ReplicationHelpers.kt | 2 +- 9 files changed, 238 insertions(+), 98 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/metadata/ReplicationMetadataManager.kt b/src/main/kotlin/org/opensearch/replication/metadata/ReplicationMetadataManager.kt index 804951c6..0d68acec 100644 --- a/src/main/kotlin/org/opensearch/replication/metadata/ReplicationMetadataManager.kt +++ b/src/main/kotlin/org/opensearch/replication/metadata/ReplicationMetadataManager.kt @@ -21,6 +21,9 @@ import org.opensearch.replication.util.overrideFgacRole import org.opensearch.replication.util.suspendExecute import org.opensearch.commons.authuser.User import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.Logger +import org.opensearch.OpenSearchException +import org.opensearch.ResourceNotFoundException import org.opensearch.action.DocWriteResponse import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService @@ -67,12 +70,14 @@ open class ReplicationMetadataManager constructor(private val clusterService: Cl } private suspend fun addMetadata(metadataReq: AddReplicationMetadataRequest) { - val response = replicaionMetadataStore.addMetadata(metadataReq) - if(response.result != DocWriteResponse.Result.CREATED && - response.result != DocWriteResponse.Result.UPDATED) { - log.error("Encountered error with result - ${response.result}, while adding metadata") - throw org.opensearch.replication.ReplicationException("Error adding replication metadata") - } + executeAndWrapExceptionIfAny({ + val response = replicaionMetadataStore.addMetadata(metadataReq) + if(response.result != DocWriteResponse.Result.CREATED && + response.result != DocWriteResponse.Result.UPDATED) { + log.error("Encountered error with result - ${response.result}, while adding metadata") + throw ReplicationException("Error adding replication metadata") + } + }, log, "Error adding replication metadata") } suspend fun updateIndexReplicationState(followerIndex: String, @@ -100,12 +105,14 @@ open class ReplicationMetadataManager constructor(private val clusterService: Cl } private suspend fun updateMetadata(updateReq: UpdateReplicationMetadataRequest) { - val response = replicaionMetadataStore.updateMetadata(updateReq) - if(response.result != DocWriteResponse.Result.CREATED && - response.result != DocWriteResponse.Result.UPDATED) { - log.error("Encountered error with result - ${response.result}, while updating metadata") - throw org.opensearch.replication.ReplicationException("Error updating replication metadata") - } + executeAndWrapExceptionIfAny({ + val response = replicaionMetadataStore.updateMetadata(updateReq) + if(response.result != DocWriteResponse.Result.CREATED && + response.result != DocWriteResponse.Result.UPDATED) { + log.error("Encountered error with result - ${response.result}, while updating metadata") + throw org.opensearch.replication.ReplicationException("Error updating replication metadata") + } + }, log, "Error updating replication metadata") } suspend fun updateSettings(followerIndex: String, settings: Settings) { @@ -129,16 +136,20 @@ open class ReplicationMetadataManager constructor(private val clusterService: Cl } private suspend fun deleteMetadata(deleteReq: DeleteReplicationMetadataRequest) { - val delRes = replicaionMetadataStore.deleteMetadata(deleteReq) - if(delRes.result != DocWriteResponse.Result.DELETED && delRes.result != DocWriteResponse.Result.NOT_FOUND) { - log.error("Encountered error with result - ${delRes.result}, while deleting metadata") - throw org.opensearch.replication.ReplicationException("Error deleting replication metadata") - } + executeAndWrapExceptionIfAny({ + val delRes = replicaionMetadataStore.deleteMetadata(deleteReq) + if(delRes.result != DocWriteResponse.Result.DELETED && delRes.result != DocWriteResponse.Result.NOT_FOUND) { + log.error("Encountered error with result - ${delRes.result}, while deleting metadata") + throw ReplicationException("Error deleting replication metadata") + } + }, log, "Error deleting replication metadata") } suspend fun getIndexReplicationMetadata(followerIndex: String, fetch_from_primary: Boolean = false): ReplicationMetadata { val getReq = GetReplicationMetadataRequest(ReplicationStoreMetadataType.INDEX.name, null, followerIndex) - return replicaionMetadataStore.getMetadata(getReq, fetch_from_primary).replicationMetadata + return executeAndWrapExceptionIfAny({ + replicaionMetadataStore.getMetadata(getReq, fetch_from_primary).replicationMetadata + }, log, "Failed to fetch replication metadata") as ReplicationMetadata } fun getIndexReplicationMetadata(followerIndex: String, @@ -146,14 +157,24 @@ open class ReplicationMetadataManager constructor(private val clusterService: Cl fetch_from_primary: Boolean = false, timeout: Long = RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC): ReplicationMetadata { val getReq = GetReplicationMetadataRequest(ReplicationStoreMetadataType.INDEX.name, connectionName, followerIndex) - return replicaionMetadataStore.getMetadata(getReq, fetch_from_primary, timeout).replicationMetadata + try { + return replicaionMetadataStore.getMetadata(getReq, fetch_from_primary, timeout).replicationMetadata + } catch (e: ResourceNotFoundException) { + log.error("Encountered exception - ", e) + throw e + } catch (e: Exception) { + log.error("Failed to fetch replication metadata", e) + throw ReplicationException("Failed to fetch replication metadata") + } } suspend fun getAutofollowMetadata(patternName: String, connectionName: String, fetch_from_primary: Boolean = false): ReplicationMetadata { val getReq = GetReplicationMetadataRequest(ReplicationStoreMetadataType.AUTO_FOLLOW.name, connectionName, patternName) - return replicaionMetadataStore.getMetadata(getReq, fetch_from_primary).replicationMetadata + return executeAndWrapExceptionIfAny({ + replicaionMetadataStore.getMetadata(getReq, fetch_from_primary).replicationMetadata + }, log, "Failed to fetch replication metadata") as ReplicationMetadata } private suspend fun updateReplicationState(indexName: String, overallState: ReplicationOverallState) { @@ -165,6 +186,20 @@ open class ReplicationMetadataManager constructor(private val clusterService: Cl } val updateReplicationStateDetailsRequest = UpdateReplicationStateDetailsRequest(indexName, replicationStateParamMap, updateType) - client.suspendExecute(UpdateReplicationStateAction.INSTANCE, updateReplicationStateDetailsRequest, defaultContext = true) + executeAndWrapExceptionIfAny({ + client.suspendExecute(UpdateReplicationStateAction.INSTANCE, updateReplicationStateDetailsRequest, defaultContext = true) + }, log, "Error updating replicaiton metadata") + } + + private suspend fun executeAndWrapExceptionIfAny(tryBlock: suspend () -> Any?, log: Logger, errorMessage: String): Any? { + try { + return tryBlock() + } catch (e: OpenSearchException) { + log.error("Encountered exception - ", e) + throw e + } catch (e: Exception) { + log.error("Encountered exception - ", e) + throw ReplicationException(errorMessage) + } } } diff --git a/src/main/kotlin/org/opensearch/replication/metadata/store/ReplicationMetadataStore.kt b/src/main/kotlin/org/opensearch/replication/metadata/store/ReplicationMetadataStore.kt index 5ac1edda..458e90e1 100644 --- a/src/main/kotlin/org/opensearch/replication/metadata/store/ReplicationMetadataStore.kt +++ b/src/main/kotlin/org/opensearch/replication/metadata/store/ReplicationMetadataStore.kt @@ -17,6 +17,8 @@ import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.ResourceAlreadyExistsException import org.opensearch.ResourceNotFoundException +import org.opensearch.action.admin.cluster.health.ClusterHealthAction +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest @@ -25,12 +27,14 @@ import org.opensearch.action.delete.DeleteResponse import org.opensearch.action.get.GetRequest import org.opensearch.action.index.IndexResponse import org.opensearch.client.Client +import org.opensearch.cluster.health.ClusterHealthStatus import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.service.ClusterService import org.opensearch.common.component.AbstractLifecycleComponent import org.opensearch.common.settings.Settings import org.opensearch.common.util.concurrent.ThreadContext import org.opensearch.common.xcontent.* +import org.opensearch.replication.util.suspendExecuteWithRetries class ReplicationMetadataStore constructor(val client: Client, val clusterService: ClusterService, val namedXContentRegistry: NamedXContentRegistry): AbstractLifecycleComponent() { @@ -89,6 +93,7 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic } } + checkAndWaitForStoreHealth() checkAndUpdateMapping() val id = getId(addReq.replicationMetadata.metadataType, addReq.replicationMetadata.connectionName, @@ -125,11 +130,13 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic throw ResourceNotFoundException("Metadata for $id doesn't exist") } + checkAndWaitForStoreHealth() + val getReq = GetRequest(REPLICATION_CONFIG_SYSTEM_INDEX, id) getReq.realtime(true) getReq.refresh(true) if(fetch_from_primary) { - val preference = getPreferenceOnPrimaryNode() ?: throw throw IllegalStateException("Primary shard to fetch id[$id] in index[$REPLICATION_CONFIG_SYSTEM_INDEX] doesn't exist") + val preference = getPreferenceOnPrimaryNode() ?: throw IllegalStateException("Primary shard to fetch id[$id] in index[$REPLICATION_CONFIG_SYSTEM_INDEX] doesn't exist") getReq.preference(preference) } @@ -151,6 +158,10 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic throw ResourceNotFoundException("Metadata for $id doesn't exist") } + val clusterHealthReq = ClusterHealthRequest(REPLICATION_CONFIG_SYSTEM_INDEX).waitForYellowStatus() + val clusterHealthRes = client.admin().cluster().health(clusterHealthReq).actionGet(timeout) + assert(clusterHealthRes.status <= ClusterHealthStatus.YELLOW) { "Replication metadata store is unhealthy" } + val getReq = GetRequest(REPLICATION_CONFIG_SYSTEM_INDEX, id) getReq.realtime(true) getReq.refresh(true) @@ -200,6 +211,8 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic throw ResourceNotFoundException("Metadata for $id doesn't exist") } + checkAndWaitForStoreHealth() + val delReq = DeleteRequest(REPLICATION_CONFIG_SYSTEM_INDEX, id) return client.suspending(client::delete, defaultContext = true)(delReq) } @@ -210,6 +223,7 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic if(!configStoreExists()) { throw ResourceNotFoundException("Metadata for $id doesn't exist") } + checkAndWaitForStoreHealth() checkAndUpdateMapping() val indexReqBuilder = client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX, MAPPING_TYPE, id) @@ -256,4 +270,14 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic override fun doClose() { } + + private suspend fun checkAndWaitForStoreHealth() { + if(!configStoreExists()) { + return + } + val clusterHealthReq = ClusterHealthRequest(REPLICATION_CONFIG_SYSTEM_INDEX).waitForYellowStatus() + // This should ensure that security plugin and shards are active during boot-up before triggering the requests + val clusterHealthRes = client.suspendExecuteWithRetries(null, ClusterHealthAction.INSTANCE, + clusterHealthReq, log=log, defaultContext = true) + } } diff --git a/src/main/kotlin/org/opensearch/replication/task/CrossClusterReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/CrossClusterReplicationTask.kt index 45a2d589..7c5de728 100644 --- a/src/main/kotlin/org/opensearch/replication/task/CrossClusterReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/CrossClusterReplicationTask.kt @@ -22,11 +22,13 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeoutOrNull import org.apache.logging.log4j.Logger +import org.opensearch.OpenSearchException import org.opensearch.action.ActionListener import org.opensearch.action.ActionResponse import org.opensearch.client.Client @@ -44,6 +46,7 @@ import org.opensearch.persistent.AllocatedPersistentTask import org.opensearch.persistent.PersistentTaskState import org.opensearch.persistent.PersistentTasksService import org.opensearch.replication.util.stackTraceToString +import org.opensearch.rest.RestStatus import org.opensearch.tasks.TaskId import org.opensearch.tasks.TaskManager import org.opensearch.threadpool.ThreadPool @@ -65,6 +68,10 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin protected lateinit var replicationMetadata: ReplicationMetadata @Volatile private lateinit var taskManager: TaskManager + companion object { + const val DEFAULT_WAIT_ON_ERRORS = 60000L + } + override fun init(persistentTasksService: PersistentTasksService, taskManager: TaskManager, persistentTaskId: String, allocationId: Long) { super.init(persistentTasksService, taskManager, persistentTaskId, allocationId) @@ -85,7 +92,7 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin var exception : Throwable? = null try { registerCloseListeners() - setReplicationMetadata() + waitAndSetReplicationMetadata() execute(this, initialState) markAsCompleted() } catch (e: Exception) { @@ -190,22 +197,33 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin } } + private suspend fun waitAndSetReplicationMetadata() { + if (this::replicationMetadata.isInitialized) { + return + } else { + while(overallTaskScope.isActive) { + try { + setReplicationMetadata() + return + } catch (e: OpenSearchException) { + if(e.status().status < 500 && e.status() != RestStatus.TOO_MANY_REQUESTS) { + throw e + } + log.error("Failed to fetch replication metadata due to ", e) + delay(DEFAULT_WAIT_ON_ERRORS) + } + } + } + } + /** * Sets the security context */ - open suspend fun setReplicationMetadata() { - replicationMetadata = if(this is AutoFollowTask) { - replicationMetadataManager.getAutofollowMetadata(followerIndexName, leaderAlias, fetch_from_primary = true) - } - else { - replicationMetadataManager.getIndexReplicationMetadata(followerIndexName, fetch_from_primary = true) - } - } + protected abstract suspend fun setReplicationMetadata() //used only in testing open suspend fun setReplicationMetadata(rm :ReplicationMetadata) { replicationMetadata = rm - } open class CrossClusterReplicationTaskResponse(val status: String): ActionResponse(), ToXContentObject { diff --git a/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt b/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt index 834a19e6..f541594b 100644 --- a/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt @@ -23,6 +23,7 @@ import org.opensearch.replication.util.suspending import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.isActive +import org.opensearch.OpenSearchException import org.opensearch.OpenSearchSecurityException import org.opensearch.action.admin.indices.get.GetIndexRequest import org.opensearch.action.support.IndicesOptions @@ -35,6 +36,7 @@ import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.persistent.PersistentTaskState import org.opensearch.replication.ReplicationException +import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.tasks.TaskId import org.opensearch.threadpool.Scheduler @@ -65,9 +67,21 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String override suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) { stat = AutoFollowStat(params.patternName, replicationMetadata.leaderContext.resource) while (scope.isActive) { - addRetryScheduler() - autoFollow() - delay(replicationSettings.autofollowFetchPollDuration.millis) + try { + addRetryScheduler() + autoFollow() + delay(replicationSettings.autofollowFetchPollDuration.millis) + } + catch(e: OpenSearchException) { + // Any transient error encountered during auto follow execution should be re-tried + val status = e.status().status + if(status < 500 && status != RestStatus.TOO_MANY_REQUESTS.status) { + log.error("Exiting autofollow task", e) + throw e + } + log.debug("Encountered transient error while running autofollow task", e) + delay(replicationSettings.autofollowFetchPollDuration.millis) + } } } @@ -185,6 +199,10 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String override fun getStatus(): AutoFollowStat { return stat } + + override suspend fun setReplicationMetadata() { + this.replicationMetadata = replicationMetadataManager.getAutofollowMetadata(followerIndexName, leaderAlias, fetch_from_primary = true) + } } class AutoFollowStat: Task.Status { diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index bf0d3a0f..a534834b 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -88,6 +88,7 @@ import org.opensearch.persistent.PersistentTasksCustomMetadata.PersistentTask import org.opensearch.persistent.PersistentTasksNodeService import org.opensearch.persistent.PersistentTasksService import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING +import org.opensearch.rest.RestStatus import org.opensearch.tasks.TaskId import org.opensearch.tasks.TaskManager import org.opensearch.threadpool.ThreadPool @@ -163,76 +164,91 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript followingTaskState = FollowingState(emptyMap()) currentTaskState = initialState as IndexReplicationState while (scope.isActive) { - val newState = when (currentTaskState.state) { - ReplicationState.INIT -> { - addListenerToInterruptTask() - if (isResumed()) { - log.debug("Resuming tasks now.") - InitFollowState - } else { - setupAndStartRestore() + try { + val newState = when (currentTaskState.state) { + ReplicationState.INIT -> { + addListenerToInterruptTask() + if (isResumed()) { + log.debug("Resuming tasks now.") + InitFollowState + } else { + setupAndStartRestore() + } } - } - ReplicationState.RESTORING -> { - log.info("In restoring state") - waitForRestore() - } - ReplicationState.INIT_FOLLOW -> { - log.info("Starting shard tasks") - addIndexBlockForReplication() - startShardFollowTasks(emptyMap()) - } - ReplicationState.FOLLOWING -> { - if (currentTaskState is FollowingState) { - followingTaskState = (currentTaskState as FollowingState) - shouldCallEvalMonitoring = false - MonitoringState - } else { - throw org.opensearch.replication.ReplicationException("Wrong state type: ${currentTaskState::class}") + ReplicationState.RESTORING -> { + log.info("In restoring state") + waitForRestore() } - } - ReplicationState.MONITORING -> { - var state = evalMonitoringState() - if (metadataPoller == null) { - metadataPoller = scope.launch { - pollForMetadata(this) + ReplicationState.INIT_FOLLOW -> { + log.info("Starting shard tasks") + addIndexBlockForReplication() + startShardFollowTasks(emptyMap()) + } + ReplicationState.FOLLOWING -> { + if (currentTaskState is FollowingState) { + followingTaskState = (currentTaskState as FollowingState) + shouldCallEvalMonitoring = false + MonitoringState + } else { + throw org.opensearch.replication.ReplicationException("Wrong state type: ${currentTaskState::class}") } } - - if (state !is MonitoringState) { - // Tasks need to be started - state - } else { - state = pollShardTaskStatus((followingTaskState as FollowingState).shardReplicationTasks) - followingTaskState = startMissingShardTasks((followingTaskState as FollowingState).shardReplicationTasks) - when (state) { - is MonitoringState -> { - updateMetadata() - } - is FailedState -> { - // Try pausing first if we get Failed state. This returns failed state if pause failed - pauseReplication(state) + ReplicationState.MONITORING -> { + var state = evalMonitoringState() + if (metadataPoller == null) { + metadataPoller = scope.launch { + pollForMetadata(this) } - else -> { - state + } + + if (state !is MonitoringState) { + // Tasks need to be started + state + } else { + state = pollShardTaskStatus((followingTaskState as FollowingState).shardReplicationTasks) + followingTaskState = startMissingShardTasks((followingTaskState as FollowingState).shardReplicationTasks) + when (state) { + is MonitoringState -> { + updateMetadata() + } + is FailedState -> { + // Try pausing first if we get Failed state. This returns failed state if pause failed + pauseReplication(state) + } + else -> { + state + } } } } + ReplicationState.FAILED -> { + assert(currentTaskState is FailedState) + failReplication(currentTaskState as FailedState) + currentTaskState + } + ReplicationState.COMPLETED -> { + markAsCompleted() + CompletedState + } } - ReplicationState.FAILED -> { - assert(currentTaskState is FailedState) - failReplication(currentTaskState as FailedState) - currentTaskState - } - ReplicationState.COMPLETED -> { - markAsCompleted() - CompletedState + if (newState != currentTaskState) { + currentTaskState = updateState(newState) } + if (isCompleted) break } - if (newState != currentTaskState) { - currentTaskState = updateState(newState) + catch(e: OpenSearchException) { + val status = e.status().status + // Index replication task shouldn't exit before shard replication tasks + // As long as shard replication tasks doesn't encounter any errors, Index task + // should continue to poll and Any failure encoutered from shard task should + // invoke state transition and exit + if(status < 500 && status != RestStatus.TOO_MANY_REQUESTS.status) { + log.error("Exiting index replication task", e) + throw e + } + log.debug("Encountered transient error while running index replication task", e) + delay(SLEEP_TIME_BETWEEN_POLL_MS) } - if (isCompleted) break } } @@ -856,6 +872,10 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } } + override suspend fun setReplicationMetadata() { + this.replicationMetadata = replicationMetadataManager.getIndexReplicationMetadata(followerIndexName, fetch_from_primary = true) + } + override fun replicationTaskResponse(): CrossClusterReplicationTaskResponse { return IndexReplicationTaskResponse(currentTaskState) } diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt index 006c8eba..a7418917 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt @@ -314,6 +314,10 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: log.error("${Thread.currentThread().name}: $msg") } + override suspend fun setReplicationMetadata() { + this.replicationMetadata = replicationMetadataManager.getIndexReplicationMetadata(followerIndexName, fetch_from_primary = true) + } + override fun toString(): String { return "ShardReplicationTask(from=${leaderAlias}$leaderShardId to=$followerShardId)" } diff --git a/src/main/kotlin/org/opensearch/replication/util/Coroutines.kt b/src/main/kotlin/org/opensearch/replication/util/Coroutines.kt index 54776494..ddcfebd5 100644 --- a/src/main/kotlin/org/opensearch/replication/util/Coroutines.kt +++ b/src/main/kotlin/org/opensearch/replication/util/Coroutines.kt @@ -91,6 +91,16 @@ suspend fun } } +suspend fun + OpenSearchClient.suspendExecute(replicationMetadata: ReplicationMetadata?, + action: ActionType, req: Req, injectSecurityContext: Boolean = false, defaultContext: Boolean = false) : Resp { + return if(replicationMetadata != null) { + suspendExecute(replicationMetadata, action, req, defaultContext = defaultContext) + } else { + suspendExecute(action, req, injectSecurityContext = injectSecurityContext, defaultContext = defaultContext) + } +} + suspend fun IndexShard.waitForGlobalCheckpoint(waitingForGlobalCheckpoint: Long, timeout: TimeValue?) : Long { return suspendCancellableCoroutine { cont -> val listener = object : GlobalCheckpointListeners.GlobalCheckpointListener { diff --git a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt index 1b5ffbc6..38c40874 100644 --- a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt +++ b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt @@ -18,6 +18,8 @@ import org.opensearch.commons.authuser.User import kotlinx.coroutines.delay import org.apache.logging.log4j.Logger import org.opensearch.OpenSearchException +import org.opensearch.OpenSearchSecurityException +import org.opensearch.ResourceNotFoundException import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest import org.opensearch.action.ActionResponse @@ -31,8 +33,10 @@ import org.opensearch.index.IndexNotFoundException import org.opensearch.index.shard.ShardId import org.opensearch.index.store.Store import org.opensearch.indices.recovery.RecoveryState +import org.opensearch.replication.ReplicationException import org.opensearch.replication.util.stackTraceToString import org.opensearch.repositories.IndexId +import org.opensearch.rest.RestStatus import org.opensearch.snapshots.SnapshotId import org.opensearch.transport.ConnectTransportException import org.opensearch.transport.NodeDisconnectedException @@ -92,7 +96,7 @@ fun IndexRequestBuilder.execute(id: String, listener: ActionListener Client.suspendExecuteWithRetries( - replicationMetadata: ReplicationMetadata, + replicationMetadata: ReplicationMetadata?, action: ActionType, req: Req, numberOfRetries: Int = 5, @@ -101,16 +105,22 @@ suspend fun Client.suspendExecuteWith factor: Double = 2.0, log: Logger, retryOn: ArrayList> = ArrayList(), + injectSecurityContext: Boolean = false, defaultContext: Boolean = false): Resp { var currentBackoff = backoff retryOn.addAll(defaultRetryableExceptions()) repeat(numberOfRetries - 1) { try { - return suspendExecute(replicationMetadata, action, req, defaultContext = defaultContext) + return suspendExecute(replicationMetadata, action, req, + injectSecurityContext = injectSecurityContext, defaultContext = defaultContext) } catch (e: OpenSearchException) { // Not retrying for IndexNotFoundException as it is not a transient failure // TODO Remove this check for IndexNotFoundException: https://github.com/opensearch-project/cross-cluster-replication/issues/78 - if (e !is IndexNotFoundException && (retryOn.contains(e.javaClass) || TransportActions.isShardNotAvailableException(e))) { + if (e !is IndexNotFoundException && (retryOn.contains(e.javaClass) + || TransportActions.isShardNotAvailableException(e) + // This waits for the dependencies to load and retry. Helps during boot-up + || e.status().status >= 500 + || e.status() == RestStatus.TOO_MANY_REQUESTS)) { log.warn("Encountered a failure while executing in $req. Retrying in ${currentBackoff/1000} seconds" + ".", e) delay(currentBackoff) @@ -120,7 +130,8 @@ suspend fun Client.suspendExecuteWith } } } - return suspendExecute(replicationMetadata, action, req) // last attempt + return suspendExecute(replicationMetadata, action, req, + injectSecurityContext = injectSecurityContext, defaultContext = defaultContext) // last attempt } /** diff --git a/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt b/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt index 7b68168a..41e3c139 100644 --- a/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt +++ b/src/test/kotlin/org/opensearch/replication/ReplicationHelpers.kt @@ -301,7 +301,7 @@ fun RestHighLevelClient.waitForNoRelocatingShards() { this.cluster().health(request, RequestOptions.DEFAULT) } -fun RestHighLevelClient.waitForReplicationStop(index: String, waitFor : TimeValue = TimeValue.timeValueSeconds(10)) { +fun RestHighLevelClient.waitForReplicationStop(index: String, waitFor : TimeValue = TimeValue.timeValueSeconds(30)) { assertBusy( { // Persistent tasks service appends modifiers to task action hence the '*'