From 7e16c39ab0197acd865e1b501955306cb32a0150 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Wed, 29 Sep 2021 19:23:54 +0530 Subject: [PATCH] Handle case when leader index mapping is not available In case of large documents, the leader mapping is not immediately available, which causes an NPE and fails the shard task. Checking whether the mapping is available and throwing an exception if it is not available so that the call can be retried. Signed-off-by: Sooraj Sinha --- .../MappingNotAvailableException.kt | 21 +++++++++++++++++++ .../replay/TransportReplayChangesAction.kt | 8 ++++++- .../task/shard/TranslogSequencer.kt | 13 +++++++++++- 3 files changed, 40 insertions(+), 2 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/replication/MappingNotAvailableException.kt diff --git a/src/main/kotlin/org/opensearch/replication/MappingNotAvailableException.kt b/src/main/kotlin/org/opensearch/replication/MappingNotAvailableException.kt new file mode 100644 index 00000000..d4817478 --- /dev/null +++ b/src/main/kotlin/org/opensearch/replication/MappingNotAvailableException.kt @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.replication; + +import org.opensearch.OpenSearchException + +public class MappingNotAvailableException: OpenSearchException { + + constructor(message: String, vararg args: Any) : super(message, *args) + + constructor(message: String, cause: Throwable, vararg args: Any) : super(message, cause, *args) +} diff --git a/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt b/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt index 0a64584f..170a46ec 100644 --- a/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt @@ -11,6 +11,7 @@ package org.opensearch.replication.action.replay +import org.opensearch.replication.MappingNotAvailableException import org.opensearch.replication.metadata.UpdateMetadataAction import org.opensearch.replication.metadata.UpdateMetadataRequest import org.opensearch.replication.metadata.checkIfIndexBlockedWithLevel @@ -174,7 +175,12 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed() val getMappingsRequest = GetMappingsRequest().indices(leaderIndex).indicesOptions(options) val getMappingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getMappings, injectSecurityContext = true)(getMappingsRequest) - val mappingSource = getMappingsResponse.mappings().get(leaderIndex).get(type).source().string() + val mappingSource = getMappingsResponse?.mappings()?.get(leaderIndex)?.get(type)?.source()?.string() + if (null == mappingSource) { + log.error("Mapping response: $getMappingsResponse") + throw MappingNotAvailableException("Mapping for the index $leaderIndex is not available") + } + // This should use MappingUpdateAction but that uses PutMappingRequest internally and // PutMappingRequest#setConcreteIndex 
has a bug where it throws an NPE.This is fixed upstream in diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt index cad3f0ef..63c192d8 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt @@ -11,6 +11,7 @@ package org.opensearch.replication.task.shard +import org.opensearch.replication.MappingNotAvailableException import org.opensearch.replication.ReplicationException import org.opensearch.replication.action.changes.GetChangesResponse import org.opensearch.replication.action.replay.ReplayChangesAction @@ -28,6 +29,7 @@ import org.opensearch.common.logging.Loggers import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog import org.opensearch.tasks.TaskId +import java.util.ArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit @@ -65,7 +67,16 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: replayRequest.parentTask = parentTaskId launch { var relativeStartNanos = System.nanoTime() - val replayResponse = client.suspendExecuteWithRetries(replicationMetadata, ReplayChangesAction.INSTANCE, replayRequest, log = log) + val retryOnExceptions = ArrayList<Class<*>>() + retryOnExceptions.add(MappingNotAvailableException::class.java) + + val replayResponse = client.suspendExecuteWithRetries( + replicationMetadata, + ReplayChangesAction.INSTANCE, + replayRequest, + log = log, + retryOn = retryOnExceptions + ) if (replayResponse.shardInfo.failed > 0) { replayResponse.shardInfo.failures.forEachIndexed { i, failure -> log.error("Failed replaying changes. Failure:$i:$failure")