Fixes selecting node and adds additional error logging
Signed-off-by: Clay Downs <[email protected]>
downsrob committed Apr 7, 2022
1 parent f866e9a commit 0aa86ae
Showing 4 changed files with 40 additions and 15 deletions.
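
The common thread across the four step classes below is that the failure helpers (fail / cleanupAndFail) now accept the caught exception as an optional third argument and log it before marking the step failed. The following is a minimal, self-contained Kotlin sketch of that pattern, assuming the log4j API on the classpath as in the real project; ShrinkStepSketch and its info/failed fields are simplified stand-ins, not the actual OpenSearch step types.

import org.apache.logging.log4j.LogManager

// Simplified stand-in for the shrink step classes; only the failure-handling
// pattern introduced by this commit is reproduced here.
object ShrinkStepSketch {
    private val logger = LogManager.getLogger(ShrinkStepSketch::class.java)

    var info: Map<String, Any>? = null
    var failed: Boolean = false

    // Mirrors the updated fail()/cleanupAndFail() signature: an optional exception
    // is logged together with the failure message before the step is marked failed.
    fun fail(message: String, cause: String? = null, e: Exception? = null) {
        e?.let { logger.error(message, it) }
        info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
        failed = true
    }
}

fun main() {
    try {
        error("simulated shrink failure")
    } catch (e: Exception) {
        // The exception now reaches the log with a stack trace instead of being dropped.
        ShrinkStepSketch.fail("Shrink failed.", e.message, e)
    }
    println(ShrinkStepSketch.info)
}
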
AttemptMoveShardsStep.kt
@@ -17,6 +17,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand
import org.opensearch.cluster.routing.allocation.decider.Decision
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.collect.Tuple
import org.opensearch.common.settings.Settings
@@ -86,6 +87,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
val statsStore = statsResponse.total.store
val statsDocs = statsResponse.total.docs
if (statsStore == null || statsDocs == null) {
logger.error("Failed to move shards in shrink action as IndicesStatsResponse was missing store or doc stats.")
fail(FAILURE_MESSAGE)
return this
}
@@ -123,15 +125,16 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
stepStatus = StepStatus.COMPLETED
return this
} catch (e: OpenSearchSecurityException) {
fail(getSecurityFailureMessage(e.localizedMessage), e.message)
fail(getSecurityFailureMessage(e.localizedMessage), e.message, e)
return this
} catch (e: Exception) {
fail(FAILURE_MESSAGE, e.message)
fail(FAILURE_MESSAGE, e.message, e)
return this
}
}

private fun fail(message: String, cause: String? = null) {
private fun fail(message: String, cause: String? = null, e: Exception? = null) {
e?.let { logger.error(message, e) }
info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
stepStatus = StepStatus.FAILED
shrinkActionProperties = null
@@ -153,6 +156,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
val totalDocs: Long = docsStats.count
val docsPerTargetShard: Long = totalDocs / numTargetShards
if (docsPerTargetShard > MAXIMUM_DOCS_PER_SHARD) {
logger.error(TOO_MANY_DOCS_FAILURE_MESSAGE)
fail(TOO_MANY_DOCS_FAILURE_MESSAGE)
return true
}
@@ -170,6 +174,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
val numReplicas = clusterService.state().metadata.indices[indexName].numberOfReplicas
val shouldFailForceUnsafeCheck = numReplicas == 0
if (shouldFailForceUnsafeCheck) {
logger.error(UNSAFE_FAILURE_MESSAGE)
fail(UNSAFE_FAILURE_MESSAGE)
return true
}
@@ -179,7 +184,9 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
private fun targetIndexNameExists(clusterService: ClusterService, shrinkTargetIndexName: String): Boolean {
val indexExists = clusterService.state().metadata.indices.containsKey(shrinkTargetIndexName)
if (indexExists) {
fail(getIndexExistsMessage(shrinkTargetIndexName))
val indexExistsMessage = getIndexExistsMessage(shrinkTargetIndexName)
logger.error(indexExistsMessage)
fail(indexExistsMessage)
return true
}
return false
@@ -269,8 +276,9 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
}
val clusterRerouteResponse: ClusterRerouteResponse =
stepContext.client.admin().cluster().suspendUntil { reroute(clusterRerouteRequest, it) }
val numYesDecisions = clusterRerouteResponse.explanations.explanations().count { it.decisions().type().equals((Decision.Type.YES)) }
// Should be the same number of yes decisions as the number of primary shards
if (clusterRerouteResponse.explanations.yesDecisionMessages.size == numberOfRerouteRequests) {
if (numYesDecisions == numberOfRerouteRequests) {
suitableNodes.add(sizeNodeTuple.v2())
}
}
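
The node-selection fix attributed to this commit is in the hunk above: instead of comparing the size of clusterRerouteResponse.explanations.yesDecisionMessages to the number of reroute requests, the code now counts the explanations whose decision type is Decision.Type.YES, so a candidate node is accepted only when every requested primary-shard move in the dry-run reroute was approved. Below is a self-contained sketch of that check, using simplified stand-ins for the reroute explanation and decision types rather than the real OpenSearch classes.

// Simplified stand-ins for Decision.Type and the per-command reroute explanation;
// only the counting logic from the fix is reproduced.
enum class DecisionTypeSketch { YES, NO, THROTTLE }

data class RerouteExplanationSketch(
    val decisionType: DecisionTypeSketch,
    val messages: List<String> = emptyList(),
)

// A node is suitable only if the number of YES decisions equals the number of
// MoveAllocationCommand requests issued in the dry run (one per primary shard).
fun isNodeSuitable(explanations: List<RerouteExplanationSketch>, numberOfRerouteRequests: Int): Boolean {
    val numYesDecisions = explanations.count { it.decisionType == DecisionTypeSketch.YES }
    return numYesDecisions == numberOfRerouteRequests
}

fun main() {
    // Both requested moves were approved, but no human-readable messages were
    // attached; counting attached messages need not match counting YES decisions.
    val explanations = listOf(
        RerouteExplanationSketch(DecisionTypeSketch.YES),
        RerouteExplanationSketch(DecisionTypeSketch.YES),
    )
    println(isNodeSuitable(explanations, numberOfRerouteRequests = 2)) // true
}
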
AttemptShrinkStep.kt
@@ -45,11 +45,13 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
val localShrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
shrinkActionProperties = localShrinkActionProperties
if (localShrinkActionProperties == null) {
logger.error(WaitForMoveShardsStep.METADATA_FAILURE_MESSAGE)
cleanupAndFail(WaitForMoveShardsStep.METADATA_FAILURE_MESSAGE)
return this
}
val lock = renewShrinkLock(localShrinkActionProperties, context.jobContext, logger)
if (lock == null) {
logger.error("Shrink action failed to renew lock on node [${localShrinkActionProperties.nodeName}]")
cleanupAndFail("Failed to renew lock on node [${localShrinkActionProperties.nodeName}]")
return this
}
@@ -68,16 +70,17 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
stepStatus = StepStatus.COMPLETED
return this
} catch (e: RemoteTransportException) {
cleanupAndFail(FAILURE_MESSAGE)
cleanupAndFail(FAILURE_MESSAGE, e = e)
return this
} catch (e: Exception) {
cleanupAndFail(FAILURE_MESSAGE, e.message)
cleanupAndFail(FAILURE_MESSAGE, e.message, e)
return this
}
}

// Sets the action to failed, clears the readonly and allocation settings on the source index, and releases the shrink lock
private suspend fun cleanupAndFail(message: String, cause: String? = null) {
private suspend fun cleanupAndFail(message: String, cause: String? = null, e: Exception? = null) {
e?.let { logger.error(message, e) }
info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
stepStatus = StepStatus.FAILED
// Non-null assertion !! is used to throw an exception on null which would just be caught and logged
@@ -103,6 +106,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
}
val statsStore = statsResponse.total.store
if (statsStore == null) {
logger.error("Shrink action failed as indices stats request was missing store stats.")
cleanupAndFail(FAILURE_MESSAGE)
return false
}
@@ -113,11 +117,13 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
// If the node has been replaced, this will fail
val node = nodeStatsResponse.nodes.firstOrNull { it.node.name == nodeName }
if (node == null) {
logger.error("Shrink action failed as node stats were missing the previously selected node.")
cleanupAndFail(FAILURE_MESSAGE)
return false
}
val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.settings, context.clusterService.clusterSettings)
if (remainingMem < 1L) {
logger.error("Shrink action failed as the previously selected node no longer has enough free space.")
cleanupAndFail(NOT_ENOUGH_SPACE_FAILURE_MESSAGE)
return false
}
@@ -136,6 +142,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
action.aliases?.forEach { req.targetIndexRequest.alias(it) }
val resizeResponse: ResizeResponse = context.client.admin().indices().suspendUntil { resizeIndex(req, it) }
if (!resizeResponse.isAcknowledged) {
logger.error("Shrink action failed as the resize index request was not acknowledged.")
cleanupAndFail(FAILURE_MESSAGE)
return false
}
WaitForMoveShardsStep.kt
@@ -41,11 +41,13 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
val localShrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
shrinkActionProperties = localShrinkActionProperties
if (localShrinkActionProperties == null) {
logger.error(METADATA_FAILURE_MESSAGE)
cleanupAndFail(METADATA_FAILURE_MESSAGE)
return this
}
val lock = renewShrinkLock(localShrinkActionProperties, context.jobContext, logger)
if (lock == null) {
logger.error("Shrink action failed to renew lock on node [${localShrinkActionProperties.nodeName}]")
cleanupAndFail("Failed to renew lock on node [${localShrinkActionProperties.nodeName}]")
return this
}
@@ -85,16 +87,17 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
}
return this
} catch (e: RemoteTransportException) {
cleanupAndFail(FAILURE_MESSAGE)
cleanupAndFail(FAILURE_MESSAGE, e = e)
return this
} catch (e: Exception) {
cleanupAndFail(FAILURE_MESSAGE, cause = e.message)
cleanupAndFail(FAILURE_MESSAGE, cause = e.message, e)
return this
}
}

// Sets the action to failed, clears the readonly and allocation settings on the source index, and releases the shrink lock
private suspend fun cleanupAndFail(message: String, cause: String? = null) {
private suspend fun cleanupAndFail(message: String, cause: String? = null, e: Exception? = null) {
e?.let { logger.error(message, e) }
info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
stepStatus = StepStatus.FAILED
// Non-null assertion !! is used to throw an exception on null which would just be caught and logged
WaitForShrinkStep.kt
@@ -13,6 +13,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.WaitForMoveShardsStep.Companion.getTimeoutFailure
import org.opensearch.indexmanagement.indexstatemanagement.util.clearReadOnlyAndRouting
import org.opensearch.indexmanagement.indexstatemanagement.util.deleteShrinkLock
import org.opensearch.indexmanagement.indexstatemanagement.util.getActionStartTime
@@ -44,11 +45,13 @@ class WaitForShrinkStep(private val action: ShrinkAction) : Step(name) {
val localShrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
shrinkActionProperties = localShrinkActionProperties
if (localShrinkActionProperties == null) {
logger.error(WaitForMoveShardsStep.METADATA_FAILURE_MESSAGE)
cleanupAndFail(WaitForMoveShardsStep.METADATA_FAILURE_MESSAGE)
return this
}
val lock = renewShrinkLock(localShrinkActionProperties, context.jobContext, logger)
if (lock == null) {
logger.error("Shrink action failed to renew lock on node [${localShrinkActionProperties.nodeName}]")
cleanupAndFail("Failed to renew lock on node [${localShrinkActionProperties.nodeName}]")
return this
}
@@ -72,17 +75,18 @@ class WaitForShrinkStep(private val action: ShrinkAction) : Step(name) {
info = mapOf("message" to SUCCESS_MESSAGE)
return this
} catch (e: RemoteTransportException) {
cleanupAndFail(getFailureMessage(localShrinkActionProperties.targetIndexName))
cleanupAndFail(getFailureMessage(localShrinkActionProperties.targetIndexName), e = e)
return this
} catch (e: Exception) {
cleanupAndFail(GENERIC_FAILURE_MESSAGE, e.message)
cleanupAndFail(GENERIC_FAILURE_MESSAGE, e.message, e)
return this
}
}

// Sets the action to failed, clears the readonly and allocation settings on the source index, deletes the target index,
// and releases the shrink lock
private suspend fun cleanupAndFail(message: String, cause: String? = null) {
private suspend fun cleanupAndFail(message: String, cause: String? = null, e: Exception? = null) {
e?.let { logger.error(message, e) }
info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
stepStatus = StepStatus.FAILED
// Using a try/catch for each cleanup action as we should clean up as much as possible despite any failures
@@ -117,6 +121,7 @@ class WaitForShrinkStep(private val action: ShrinkAction) : Step(name) {
val allocationSettings = Settings.builder().putNull(AttemptMoveShardsStep.ROUTING_SETTING).build()
val response: AcknowledgedResponse = issueUpdateSettingsRequest(context.client, index, allocationSettings)
if (!response.isAcknowledged) {
logger.error("Shrink action to clear the allocation settings on index [$index] following shrinking.")
cleanupAndFail(getFailureMessage(index))
return false
}
@@ -135,7 +140,8 @@ class WaitForShrinkStep(private val action: ShrinkAction) : Step(name) {
val timeOutInSeconds = action.configTimeout?.timeout?.seconds ?: WaitForMoveShardsStep.MOVE_SHARDS_TIMEOUT_IN_SECONDS
// Get ActionTimeout if given, otherwise use default timeout of 12 hours
if (timeFromActionStarted.toSeconds() > timeOutInSeconds) {
cleanupAndFail(getFailureMessage(targetIndex))
logger.error(getTimeoutFailure(targetIndex))
cleanupAndFail(getTimeoutFailure(targetIndex))
} else {
info = mapOf("message" to getDelayedMessage(targetIndex))
stepStatus = StepStatus.CONDITION_NOT_MET
@@ -163,5 +169,6 @@ class WaitForShrinkStep(private val action: ShrinkAction) : Step(name) {
const val GENERIC_FAILURE_MESSAGE = "Shrink failed while waiting for shards to start."
fun getDelayedMessage(newIndex: String) = "Shrink delayed because $newIndex shards not in started state."
fun getFailureMessage(newIndex: String) = "Shrink failed while waiting for $newIndex shards to start."
fun getTimeoutFailure(newIndex: String) = "Shrink failed because it timed out while waiting for $newIndex shrink to finish."
}
}
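
The last hunks route the timeout path in WaitForShrinkStep through the new getTimeoutFailure message instead of the generic getFailureMessage, so a shrink that exceeds the configured (or default 12-hour) timeout while waiting for the target index shards reports the timeout explicitly. A self-contained sketch of that branch follows; the step-status handling is reduced to a simple return value, and the example index name is hypothetical.

import java.time.Duration

// Message builders copied from WaitForShrinkStep's companion object.
fun getDelayedMessage(newIndex: String) = "Shrink delayed because $newIndex shards not in started state."
fun getTimeoutFailure(newIndex: String) = "Shrink failed because it timed out while waiting for $newIndex shrink to finish."

// Returns whether the step should fail, together with the message to surface:
// a hard failure with the timeout message once the timeout is exceeded,
// otherwise a retryable "condition not met" delay message.
fun checkTimeout(timeFromActionStarted: Duration, timeoutInSeconds: Long, targetIndex: String): Pair<Boolean, String> =
    if (timeFromActionStarted.toSeconds() > timeoutInSeconds) {
        true to getTimeoutFailure(targetIndex)
    } else {
        false to getDelayedMessage(targetIndex)
    }

fun main() {
    val defaultTimeoutSeconds = 12L * 60 * 60 // default of 12 hours, as in the surrounding code
    println(checkTimeout(Duration.ofHours(13), defaultTimeoutSeconds, "example-index_shrunken"))
    println(checkTimeout(Duration.ofMinutes(5), defaultTimeoutSeconds, "example-index_shrunken"))
}
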
