From 42601d41459c0919a03baeca2b7544a143bc872c Mon Sep 17 00:00:00 2001 From: Sai Date: Tue, 9 Nov 2021 18:21:23 +0530 Subject: [PATCH] Filtered out replication exceptions from retrying (#238) Signed-off-by: Sai Kumar --- .../replication/task/index/IndexReplicationTask.kt | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index a534834b..9691c8ba 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -87,6 +87,7 @@ import org.opensearch.persistent.PersistentTasksCustomMetadata import org.opensearch.persistent.PersistentTasksCustomMetadata.PersistentTask import org.opensearch.persistent.PersistentTasksNodeService import org.opensearch.persistent.PersistentTasksService +import org.opensearch.replication.ReplicationException import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING import org.opensearch.rest.RestStatus import org.opensearch.tasks.TaskId @@ -190,7 +191,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript shouldCallEvalMonitoring = false MonitoringState } else { - throw org.opensearch.replication.ReplicationException("Wrong state type: ${currentTaskState::class}") + throw ReplicationException("Wrong state type: ${currentTaskState::class}") } } ReplicationState.MONITORING -> { @@ -235,8 +236,10 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript currentTaskState = updateState(newState) } if (isCompleted) break - } - catch(e: OpenSearchException) { + } catch(e: ReplicationException) { + log.error("Exiting index replication task", e) + throw e + } catch(e: OpenSearchException) { val status = e.status().status // Index replication task shouldn't exit before shard replication tasks // As long as shard replication tasks doesn't encounter any errors, Index task @@ -627,7 +630,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript defaultContext = true ) if (!pauseReplicationResponse.isAcknowledged) { - throw org.opensearch.replication.ReplicationException( + throw ReplicationException( "Failed to gracefully pause replication after one or more shard tasks failed. " + "Replication tasks may need to be paused manually." ) @@ -755,7 +758,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript val response = client.suspending(client.admin().cluster()::restoreSnapshot, defaultContext = true)(restoreRequest) if (response.restoreInfo != null) { if (response.restoreInfo.failedShards() != 0) { - throw org.opensearch.replication.ReplicationException("Restore failed: $response") + throw ReplicationException("Restore failed: $response") } return FollowingState(emptyMap()) }