Skip to content

Commit

Permalink
Testing job lock
Browse files Browse the repository at this point in the history
Signed-off-by: Clay Downs <[email protected]>
  • Loading branch information
downsrob committed Apr 1, 2022
1 parent 4a30fd8 commit 0373ba8
Show file tree
Hide file tree
Showing 10 changed files with 447 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ data class ShrinkActionProperties(
val targetNumShards: Int,
val lockPrimaryTerm: Long,
val lockSeqNo: Long,
val lockEpochSecond: Long
val lockEpochSecond: Long,
val lockDurationSecond: Long
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
Expand All @@ -30,6 +31,7 @@ data class ShrinkActionProperties(
out.writeLong(lockPrimaryTerm)
out.writeLong(lockSeqNo)
out.writeLong(lockEpochSecond)
out.writeLong(lockDurationSecond)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand All @@ -39,6 +41,7 @@ data class ShrinkActionProperties(
builder.field(ShrinkProperties.LOCK_SEQ_NO.key, lockSeqNo)
builder.field(ShrinkProperties.LOCK_PRIMARY_TERM.key, lockPrimaryTerm)
builder.field(ShrinkProperties.LOCK_EPOCH_SECOND.key, lockEpochSecond)
builder.field(ShrinkProperties.LOCK_DURATION_SECOND.key, lockDurationSecond)
return builder
}

Expand All @@ -52,8 +55,9 @@ data class ShrinkActionProperties(
val lockPrimaryTerm: Long = si.readLong()
val lockSeqNo: Long = si.readLong()
val lockEpochSecond: Long = si.readLong()
val lockDurationSecond: Long = si.readLong()

return ShrinkActionProperties(nodeName, targetIndexName, targetNumShards, lockPrimaryTerm, lockSeqNo, lockEpochSecond)
return ShrinkActionProperties(nodeName, targetIndexName, targetNumShards, lockPrimaryTerm, lockSeqNo, lockEpochSecond, lockDurationSecond)
}

fun parse(xcp: XContentParser): ShrinkActionProperties {
Expand All @@ -63,6 +67,7 @@ data class ShrinkActionProperties(
var lockPrimaryTerm: Long? = null
var lockSeqNo: Long? = null
var lockEpochSecond: Long? = null
var lockDurationSecond: Long? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -76,6 +81,7 @@ data class ShrinkActionProperties(
ShrinkProperties.LOCK_PRIMARY_TERM.key -> lockPrimaryTerm = xcp.longValue()
ShrinkProperties.LOCK_SEQ_NO.key -> lockSeqNo = xcp.longValue()
ShrinkProperties.LOCK_EPOCH_SECOND.key -> lockEpochSecond = xcp.longValue()
ShrinkProperties.LOCK_DURATION_SECOND.key -> lockDurationSecond = xcp.longValue()
}
}

Expand All @@ -85,7 +91,8 @@ data class ShrinkActionProperties(
requireNotNull(targetNumShards),
requireNotNull(lockPrimaryTerm),
requireNotNull(lockSeqNo),
requireNotNull(lockEpochSecond)
requireNotNull(lockEpochSecond),
requireNotNull(lockDurationSecond)
)
}
}
Expand All @@ -96,6 +103,7 @@ data class ShrinkActionProperties(
TARGET_NUM_SHARDS("target_num_shards"),
LOCK_SEQ_NO("lock_seq_no"),
LOCK_PRIMARY_TERM("lock_primary_term"),
LOCK_EPOCH_SECOND("lock_epoch_second")
LOCK_EPOCH_SECOND("lock_epoch_second"),
LOCK_DURATION_SECOND("lock_duration_second")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class ShrinkAction(
} else if (numNewShards != null) {
require(numNewShards > 0) { "Shrink action numNewShards must be greater than 0." }
}
if (targetIndexSuffix != null) {
require(!targetIndexSuffix.contains('*') && !targetIndexSuffix.contains('?')) { "Target index suffix must not contain wildcards." }
}
}

private val attemptMoveShardsStep = AttemptMoveShardsStep(this)
Expand Down Expand Up @@ -71,6 +74,9 @@ class ShrinkAction(
AttemptShrinkStep.name -> waitForShrinkStep
else -> stepNameToStep[currentStep]!!
}
} else if (currentStepStatus == Step.StepStatus.FAILED) {
// If we failed at any point, retries should start from the beginning
return attemptMoveShardsStep
}
// step not completed
return stepNameToStep[currentStep]!!
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@
package org.opensearch.indexmanagement.indexstatemanagement.step.shrink

import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse
import org.opensearch.action.admin.indices.shrink.ResizeRequest
import org.opensearch.action.admin.indices.shrink.ResizeResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
import org.opensearch.indexmanagement.indexstatemanagement.util.clearReadOnlyAndRouting
import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeMemoryAfterShrink
import org.opensearch.indexmanagement.indexstatemanagement.util.isIndexGreen
import org.opensearch.indexmanagement.indexstatemanagement.util.releaseShrinkLock
import org.opensearch.indexmanagement.indexstatemanagement.util.renewShrinkLock
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -34,8 +41,12 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
val actionMetadata = context.metadata.actionMetaData
val shrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
if (shrinkActionProperties == null) {
info = mapOf("message" to "Shrink action properties are null, metadata was not properly populated")
stepStatus = StepStatus.FAILED
cleanupAndFail(WaitForMoveShardsStep.METADATA_FAILURE_MESSAGE)
return this
}
val lock = renewShrinkLock(shrinkActionProperties, context.jobContext, logger)
if (lock == null) {
cleanupAndFail("Failed to renew lock on node [${shrinkActionProperties.nodeName}]")
return this
}
try {
Expand All @@ -44,24 +55,69 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
info = mapOf("message" to INDEX_HEALTH_NOT_GREEN_MESSAGE)
return this
}
if (!isNodeStillSuitable(shrinkActionProperties.nodeName, indexName, context)) return this

// If the resize index api fails, the step will be set to failed and resizeIndex will return false
if (!resizeIndex(indexName, shrinkActionProperties, context)) return this
info = mapOf("message" to getSuccessMessage(shrinkActionProperties.targetIndexName))
stepStatus = StepStatus.COMPLETED
return this
} catch (e: RemoteTransportException) {
info = mapOf("message" to FAILURE_MESSAGE)
releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
stepStatus = StepStatus.FAILED
cleanupAndFail(FAILURE_MESSAGE)
return this
} catch (e: Exception) {
releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
info = mapOf("message" to FAILURE_MESSAGE, "cause" to "{${e.message}}")
stepStatus = StepStatus.FAILED
cleanupAndFail(FAILURE_MESSAGE, e.message)
return this
}
}

// Sets the action to failed, clears the readonly and allocation settings on the source index, and releases the shrink lock
private suspend fun cleanupAndFail(message: String, cause: String? = null) {
info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
stepStatus = StepStatus.FAILED
val context = this.context ?: return
try {
clearReadOnlyAndRouting(context.metadata.index, context.client)
} catch (e: Exception) {
logger.error("Shrink action failed while trying to clean up routing and readonly setting after a failure: $e")
}
try {
val shrinkActionProperties = context.metadata.actionMetaData?.actionProperties?.shrinkActionProperties ?: return
releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
} catch (e: Exception) {
logger.error("Shrink action failed while trying to release the node lock after a failure: $e")
}
}

private suspend fun isNodeStillSuitable(nodeName: String, indexName: String, context: StepContext): Boolean {
// Get the size of the index
val statsRequest = IndicesStatsRequest().indices(indexName)
val statsResponse: IndicesStatsResponse = context.client.admin().indices().suspendUntil {
stats(statsRequest, it)
}
val statsStore = statsResponse.total.store
if (statsStore == null) {
cleanupAndFail(FAILURE_MESSAGE)
return false
}
val indexSizeInBytes = statsStore.sizeInBytes
// Get the remaining memory in the node
val nodesStatsReq = NodesStatsRequest().addMetric(AttemptMoveShardsStep.OS_METRIC)
val nodeStatsResponse: NodesStatsResponse = context.client.admin().cluster().suspendUntil { nodesStats(nodesStatsReq, it) }
// If the node has been replaced, this will fail
val node = nodeStatsResponse.nodes.firstOrNull { it.node.name == nodeName }
if (node == null) {
cleanupAndFail(FAILURE_MESSAGE)
return false
}
val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.settings, context.clusterService.clusterSettings)
if (remainingMem < 1L) {
cleanupAndFail(NOT_ENOUGH_SPACE_FAILURE_MESSAGE)
return false
}
return true
}

private suspend fun resizeIndex(sourceIndex: String, shrinkActionProperties: ShrinkActionProperties, context: StepContext): Boolean {
val targetIndex = shrinkActionProperties.targetIndexName
val req = ResizeRequest(targetIndex, sourceIndex)
Expand All @@ -74,9 +130,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
action.aliases?.forEach { req.targetIndexRequest.alias(it) }
val resizeResponse: ResizeResponse = context.client.admin().indices().suspendUntil { resizeIndex(req, it) }
if (!resizeResponse.isAcknowledged) {
info = mapOf("message" to FAILURE_MESSAGE)
releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
stepStatus = StepStatus.FAILED
cleanupAndFail(FAILURE_MESSAGE)
return false
}
return true
Expand All @@ -97,6 +151,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
companion object {
const val name = "attempt_shrink_step"
const val FAILURE_MESSAGE = "Shrink failed when sending shrink request."
const val NOT_ENOUGH_SPACE_FAILURE_MESSAGE = "Shrink failed as the selected node no longer had enough free space to shrink to."
const val INDEX_HEALTH_NOT_GREEN_MESSAGE = "Shrink delayed because index health is not green."
fun getSuccessMessage(newIndex: String) = "Shrink started. $newIndex currently being populated."
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.action.admin.indices.stats.ShardStats
import org.opensearch.common.collect.ImmutableOpenIntMap
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
import org.opensearch.indexmanagement.indexstatemanagement.util.clearReadOnlyAndRouting
import org.opensearch.indexmanagement.indexstatemanagement.util.getActionStartTime
import org.opensearch.indexmanagement.indexstatemanagement.util.getShrinkLockModel
import org.opensearch.indexmanagement.indexstatemanagement.util.releaseShrinkLock
import org.opensearch.indexmanagement.indexstatemanagement.util.renewShrinkLock
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ShrinkActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.transport.RemoteTransportException
Expand All @@ -36,10 +37,16 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
val actionMetadata = context.metadata.actionMetaData
val shrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
if (shrinkActionProperties == null) {
info = mapOf("message" to "Shrink action properties are null, metadata was not properly populated")
stepStatus = StepStatus.FAILED
cleanupAndFail(METADATA_FAILURE_MESSAGE)
return this
}
println(getShrinkLockModel(shrinkActionProperties, context.jobContext))
val lock = renewShrinkLock(shrinkActionProperties, context.jobContext, logger)
if (lock == null) {
cleanupAndFail("Failed to renew lock on node [${shrinkActionProperties.nodeName}]")
return this
}
println(lock)
try {
val indexStatsRequests: IndicesStatsRequest = IndicesStatsRequest().indices(indexName)
val response: IndicesStatsResponse = context.client.admin().indices().suspendUntil { stats(indexStatsRequests, it) }
Expand All @@ -57,7 +64,9 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
if (nodeNameShardIsOn.equals(nodeToMoveOnto) && routingInfo.started()) {
numShardsOnNode++
}
if (numReplicas == 0 || inSyncReplicaExists(routingInfo.id, inSyncAllocations)) {
// Either there must be no replicas (force unsafe must have been set) or all replicas must be in sync as
// it isn't known which shard (any replica or primary) will be moved to the target node and used in the shrink.
if (numReplicas == 0 || inSyncAllocations[routingInfo.id].size == (numReplicas + 1)) {
numShardsInSync++
}
}
Expand All @@ -68,23 +77,35 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
} else {
val numShardsNotOnNode = numPrimaryShards - numShardsOnNode
val numShardsNotInSync = numPrimaryShards - numShardsInSync
checkTimeOut(context, shrinkActionProperties, numShardsNotOnNode, numShardsNotInSync, nodeToMoveOnto)
checkTimeOut(context, numShardsNotOnNode, numShardsNotInSync, nodeToMoveOnto)
}
return this
} catch (e: RemoteTransportException) {
releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
info = mapOf("message" to FAILURE_MESSAGE)
stepStatus = StepStatus.FAILED
cleanupAndFail(FAILURE_MESSAGE)
return this
} catch (e: Exception) {
releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
info = mapOf("message" to FAILURE_MESSAGE, "cause" to "{${e.message}}")
stepStatus = StepStatus.FAILED
cleanupAndFail(FAILURE_MESSAGE, cause = e.message)
return this
}
}

private fun inSyncReplicaExists(shardId: Int, inSyncAllocations: ImmutableOpenIntMap<MutableSet<String>>): Boolean = inSyncAllocations[shardId].size > 1
// Sets the action to failed, clears the readonly and allocation settings on the source index, and releases the shrink lock
private suspend fun cleanupAndFail(message: String, cause: String? = null) {
info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
stepStatus = StepStatus.FAILED
val context = this.context ?: return
try {
clearReadOnlyAndRouting(context.metadata.index, context.client)
} catch (e: Exception) {
logger.error("Shrink action failed while trying to clean up routing and readonly setting after a failure: $e")
}
try {
val shrinkActionProperties = context.metadata.actionMetaData?.actionProperties?.shrinkActionProperties ?: return
releaseShrinkLock(shrinkActionProperties, context.jobContext, logger)
} catch (e: Exception) {
logger.error("Shrink action failed while trying to release the node lock after a failure: $e")
}
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
// Saving maxNumSegments in ActionProperties after the force merge operation has begun so that if a ChangePolicy occurred
Expand All @@ -100,7 +121,6 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {

private suspend fun checkTimeOut(
stepContext: StepContext,
shrinkActionProperties: ShrinkActionProperties,
numShardsNotOnNode: Int,
numShardsNotInSync: Int,
nodeToMoveOnto: String
Expand All @@ -110,23 +130,19 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
val timeSinceActionStarted: Duration = Duration.between(getActionStartTime(managedIndexMetadata), Instant.now())
val timeOutInSeconds = action.configTimeout?.timeout?.seconds ?: MOVE_SHARDS_TIMEOUT_IN_SECONDS
// Get ActionTimeout if given, otherwise use default timeout of 12 hours
stepStatus = if (timeSinceActionStarted.toSeconds() > timeOutInSeconds) {
if (timeSinceActionStarted.toSeconds() > timeOutInSeconds) {
logger.error(
"Shrink Action move shards failed on [$indexName], the action timed out with [$numShardsNotOnNode] shards not yet " +
"moved and [$numShardsNotInSync] shards without an in sync replica."
)
if (managedIndexMetadata.actionMetaData?.actionProperties?.shrinkActionProperties != null) {
releaseShrinkLock(shrinkActionProperties, stepContext.jobContext, logger)
}
info = mapOf("message" to getTimeoutFailure(nodeToMoveOnto))
StepStatus.FAILED
cleanupAndFail(getTimeoutFailure(nodeToMoveOnto))
} else {
logger.debug(
"Shrink action move shards step running on [$indexName], [$numShardsNotOnNode] shards need to be moved, " +
"[$numShardsNotInSync] shards need an in sync replica."
)
info = mapOf("message" to getTimeoutDelay(nodeToMoveOnto))
StepStatus.CONDITION_NOT_MET
stepStatus = StepStatus.CONDITION_NOT_MET
}
}

Expand All @@ -138,6 +154,7 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : Step(name) {
fun getTimeoutFailure(node: String) = "Shrink failed because it took to long to move shards to $node"
fun getTimeoutDelay(node: String) = "Shrink delayed because it took to long to move shards to $node"
const val FAILURE_MESSAGE = "Shrink failed when waiting for shards to move."
const val METADATA_FAILURE_MESSAGE = "Shrink action properties are null, metadata was not properly populated"
const val MOVE_SHARDS_TIMEOUT_IN_SECONDS = 43200L // 12hrs in seconds
const val RESOURCE_NAME = "node_name"
const val RESOURCE_TYPE = "shrink"
Expand Down
Loading

0 comments on commit 0373ba8

Please sign in to comment.