Skip to content

Commit

Permalink
Handle case when leader index mapping is not available
Browse files Browse the repository at this point in the history
In case of large documents, the leader index mapping is not immediately available, which causes
an NPE and fails the shard task. Check whether the mapping is available and throw an
exception if it is not available so that the call can be retried.

Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed Sep 29, 2021
1 parent e234c09 commit 7e16c39
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.replication;

import org.opensearch.OpenSearchException

/**
 * Exception signalling that the mapping for an index is not available.
 *
 * Each constructor forwards its arguments unchanged to the matching
 * [OpenSearchException] constructor.
 */
class MappingNotAvailableException : OpenSearchException {

    /**
     * Creates the exception without an underlying cause.
     *
     * @param message description of the failure, passed to the superclass
     * @param args additional arguments, spread into the superclass constructor
     */
    constructor(message: String, vararg args: Any) :
        super(message, *args)

    /**
     * Creates the exception with an underlying cause.
     *
     * @param message description of the failure, passed to the superclass
     * @param cause the throwable that led to this exception
     * @param args additional arguments, spread into the superclass constructor
     */
    constructor(message: String, cause: Throwable, vararg args: Any) :
        super(message, cause, *args)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.opensearch.replication.action.replay

import org.opensearch.replication.MappingNotAvailableException
import org.opensearch.replication.metadata.UpdateMetadataAction
import org.opensearch.replication.metadata.UpdateMetadataRequest
import org.opensearch.replication.metadata.checkIfIndexBlockedWithLevel
Expand Down Expand Up @@ -174,7 +175,12 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans
val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed()
val getMappingsRequest = GetMappingsRequest().indices(leaderIndex).indicesOptions(options)
val getMappingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getMappings, injectSecurityContext = true)(getMappingsRequest)
val mappingSource = getMappingsResponse.mappings().get(leaderIndex).get(type).source().string()
val mappingSource = getMappingsResponse?.mappings()?.get(leaderIndex)?.get(type)?.source()?.string()
if (null == mappingSource) {
log.error("Mapping response: $getMappingsResponse")
throw MappingNotAvailableException("Mapping for the index $leaderIndex is not available")
}


// This should use MappingUpdateAction but that uses PutMappingRequest internally and
// PutMappingRequest#setConcreteIndex has a bug where it throws an NPE. This is fixed upstream in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.opensearch.replication.task.shard

import org.opensearch.replication.MappingNotAvailableException
import org.opensearch.replication.ReplicationException
import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.action.replay.ReplayChangesAction
Expand All @@ -28,6 +29,7 @@ import org.opensearch.common.logging.Loggers
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.tasks.TaskId
import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -65,7 +67,16 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
replayRequest.parentTask = parentTaskId
launch {
var relativeStartNanos = System.nanoTime()
val replayResponse = client.suspendExecuteWithRetries(replicationMetadata, ReplayChangesAction.INSTANCE, replayRequest, log = log)
val retryOnExceptions = ArrayList<Class<*>>()
retryOnExceptions.add(MappingNotAvailableException::class.java)

val replayResponse = client.suspendExecuteWithRetries(
replicationMetadata,
ReplayChangesAction.INSTANCE,
replayRequest,
log = log,
retryOn = retryOnExceptions
)
if (replayResponse.shardInfo.failed > 0) {
replayResponse.shardInfo.failures.forEachIndexed { i, failure ->
log.error("Failed replaying changes. Failure:$i:$failure")
Expand Down

0 comments on commit 7e16c39

Please sign in to comment.