Skip to content

Commit

Permalink
Shrink action Fix (#718)
Browse files Browse the repository at this point in the history
* Fixed issues of shrink action

Signed-off-by: Angie Zhang <[email protected]>

* Fixed node lock release error: request should use value of "_id" instead of "job_id"

Signed-off-by: Angie Zhang <[email protected]>

* Fix review comments

Signed-off-by: Angie Zhang <[email protected]>

* Fix build error

Signed-off-by: Angie Zhang <[email protected]>

* Update getShrinkLockID

Signed-off-by: Angie Zhang <[email protected]>

* Added debug logs for Shard Moving Decisions

Signed-off-by: Angie Zhang <[email protected]>

* Fix detekt error

Signed-off-by: Angie Zhang <[email protected]>

* Fix failed test cases due to renew lock error

Signed-off-by: Angie Zhang <[email protected]>

* Fix failed test cases due to lock job id error

Signed-off-by: Angie Zhang <[email protected]>

* Fix review comments

Signed-off-by: Angie Zhang <[email protected]>

---------

Signed-off-by: Angie Zhang <[email protected]>
(cherry picked from commit 2a63f94)
  • Loading branch information
Angie Zhang authored and github-actions[bot] committed Apr 10, 2023
1 parent 04eaf1e commit cfffb6d
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ class ShrinkAction(
const val TARGET_INDEX_TEMPLATE_FIELD = "target_index_name_template"
const val ALIASES_FIELD = "aliases"
const val FORCE_UNSAFE_FIELD = "force_unsafe"
const val LOCK_RESOURCE_TYPE = "shrink"
const val LOCK_RESOURCE_NAME = "node_name"
const val LOCK_SOURCE_JOB_ID = "shrink-node_name"
fun getSecurityFailureMessage(failure: String) = "Shrink action failed because of missing permissions: $failure"
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ import org.opensearch.action.admin.indices.shrink.ResizeRequest
import org.opensearch.action.admin.indices.shrink.ResizeResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeMemoryAfterShrink
import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeDiskSpaceAfterShrink
import org.opensearch.indexmanagement.indexstatemanagement.util.isIndexGreen
import org.opensearch.indexmanagement.indexstatemanagement.util.issueUpdateSettingsRequest
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -29,15 +32,18 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
override suspend fun wrappedExecute(context: StepContext): AttemptShrinkStep {
val indexName = context.metadata.index
// If the returned shrinkActionProperties are null, then the status has been set to failed, just return
val localShrinkActionProperties = updateAndGetShrinkActionProperties(context) ?: return this
val localShrinkActionProperties = checkShrinkActionPropertiesAndRenewLock(context) ?: return this

if (!isIndexGreen(context.client, indexName)) {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to INDEX_HEALTH_NOT_GREEN_MESSAGE)
return this
}

if (!isNodeStillSuitable(localShrinkActionProperties.nodeName, indexName, context)) return this

if (!confirmIndexWriteBlock(context, indexName)) return this

// If the resize index api fails, the step will be set to failed and resizeIndex will return false
if (!resizeIndex(indexName, localShrinkActionProperties, context)) return this
info = mapOf("message" to getSuccessMessage(localShrinkActionProperties.targetIndexName))
Expand Down Expand Up @@ -69,14 +75,34 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
cleanupAndFail(FAILURE_MESSAGE, "Shrink action failed as node stats were missing the previously selected node.")
return false
}
val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.clusterService.clusterSettings)
val remainingMem = getNodeFreeDiskSpaceAfterShrink(node, indexSizeInBytes, context.clusterService.clusterSettings)
if (remainingMem < 1L) {
cleanupAndFail(NOT_ENOUGH_SPACE_FAILURE_MESSAGE, NOT_ENOUGH_SPACE_FAILURE_MESSAGE)
return false
}
return true
}

// Set index write block again before sending shrink request, in case of write block flipped by other processes in previous steps.
private suspend fun confirmIndexWriteBlock(stepContext: StepContext, indexName: String): Boolean {
val updateSettings = Settings.builder()
.put(IndexMetadata.SETTING_BLOCKS_WRITE, true)
.build()
var response: AcknowledgedResponse? = null
val isUpdateAcknowledged: Boolean

try {
response = issueUpdateSettingsRequest(stepContext.client, stepContext.metadata.index, updateSettings)
} finally {
isUpdateAcknowledged = response != null && response.isAcknowledged
}

if (!isUpdateAcknowledged) {
cleanupAndFail(WRITE_BLOCK_FAILED_MESSAGE, "Failed to confirm write block for index: [$indexName] before sending shrink request.")
}
return isUpdateAcknowledged
}

private suspend fun resizeIndex(sourceIndex: String, shrinkActionProperties: ShrinkActionProperties, context: StepContext): Boolean {
val targetIndex = shrinkActionProperties.targetIndexName
val req = ResizeRequest(targetIndex, sourceIndex)
Expand Down Expand Up @@ -113,6 +139,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
companion object {
const val name = "attempt_shrink_step"
const val FAILURE_MESSAGE = "Shrink failed when sending shrink request."
const val WRITE_BLOCK_FAILED_MESSAGE = "Failed to set write block before sending shrink request."
const val NOT_ENOUGH_SPACE_FAILURE_MESSAGE = "Shrink failed as the selected node no longer had enough free space to shrink to."
const val INDEX_HEALTH_NOT_GREEN_MESSAGE = "Shrink delayed because index health is not green."
fun getSuccessMessage(newIndex: String) = "Shrink started. $newIndex currently being populated."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,19 @@ abstract class ShrinkStep(

protected suspend fun cleanupAndFail(infoMessage: String, logMessage: String? = null, cause: String? = null, e: Exception? = null) {
cleanupResources(cleanupSettings, cleanupLock, cleanupTargetIndex)
fail(infoMessage, logMessage, cause, e)
setStepFailed(infoMessage, logMessage, cause, e)
}

abstract fun getGenericFailureMessage(): String

abstract suspend fun wrappedExecute(context: StepContext): Step

@Suppress("ReturnCount")
protected suspend fun updateAndGetShrinkActionProperties(context: StepContext): ShrinkActionProperties? {
protected suspend fun checkShrinkActionPropertiesAndRenewLock(context: StepContext): ShrinkActionProperties? {
val actionMetadata = context.metadata.actionMetaData
var localShrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
shrinkActionProperties = localShrinkActionProperties
if (localShrinkActionProperties == null) {
cleanupAndFail(METADATA_FAILURE_MESSAGE, METADATA_FAILURE_MESSAGE)
setStepFailed(METADATA_FAILURE_MESSAGE, METADATA_FAILURE_MESSAGE)
return null
}
val lock = renewShrinkLock(localShrinkActionProperties, context.lockService, logger)
Expand All @@ -85,7 +84,7 @@ abstract class ShrinkStep(
return localShrinkActionProperties
}

protected fun fail(infoMessage: String, logMessage: String? = null, cause: String? = null, e: Exception? = null) {
protected fun setStepFailed(infoMessage: String, logMessage: String? = null, cause: String? = null, e: Exception? = null) {
if (logMessage != null) {
if (e != null) {
logger.error(logMessage, e)
Expand All @@ -95,7 +94,6 @@ abstract class ShrinkStep(
}
info = if (cause == null) mapOf("message" to infoMessage) else mapOf("message" to infoMessage, "cause" to cause)
stepStatus = StepStatus.FAILED
shrinkActionProperties = null
}

protected suspend fun cleanupResources(resetSettings: Boolean, releaseLock: Boolean, deleteTargetIndex: Boolean) {
Expand All @@ -104,6 +102,7 @@ abstract class ShrinkStep(
if (resetSettings) resetIndexSettings(localShrinkActionProperties)
if (deleteTargetIndex) deleteTargetIndex(localShrinkActionProperties)
if (releaseLock) releaseLock(localShrinkActionProperties)
shrinkActionProperties = null
} else {
logger.error("Shrink action failed to clean up resources due to null shrink action properties.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
override suspend fun wrappedExecute(context: StepContext): WaitForMoveShardsStep {
val indexName = context.metadata.index
// If the returned shrinkActionProperties are null, then the status has been set to failed, just return
val localShrinkActionProperties = updateAndGetShrinkActionProperties(context) ?: return this
val localShrinkActionProperties = checkShrinkActionPropertiesAndRenewLock(context) ?: return this

val shardStats = getShardStats(indexName, context.client) ?: return this

Expand Down Expand Up @@ -87,7 +87,7 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
val response: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(indexStatsRequests, it) }
val shardStats = response.shards
if (shardStats == null) {
fail(AttemptMoveShardsStep.FAILURE_MESSAGE, "Failed to move shards in shrink action as shard stats were null.")
cleanupAndFail(AttemptMoveShardsStep.FAILURE_MESSAGE, "Failed to move shards in shrink action as shard stats were null.")
return null
}
return shardStats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
override suspend fun wrappedExecute(context: StepContext): WaitForShrinkStep {
val indexName = context.metadata.index
// If the returned shrinkActionProperties are null, then the status has been set to failed, just return
val localShrinkActionProperties = updateAndGetShrinkActionProperties(context) ?: return this
val localShrinkActionProperties = checkShrinkActionPropertiesAndRenewLock(context) ?: return this

val targetIndex = localShrinkActionProperties.targetIndexName
if (shrinkNotDone(targetIndex, localShrinkActionProperties.targetNumShards, context.client, context.clusterService)) {
Expand All @@ -42,7 +42,7 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
if (!clearAllocationSettings(context, targetIndex)) return this
if (!resetReadOnlyAndRouting(indexName, context.client, localShrinkActionProperties.originalIndexSettings)) return this

if (!deleteShrinkLock(localShrinkActionProperties, context.lockService)) {
if (!deleteShrinkLock(localShrinkActionProperties, context.lockService, logger)) {
logger.error("Failed to delete Shrink action lock on node [${localShrinkActionProperties.nodeName}]")
}
stepStatus = StepStatus.COMPLETED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.LOCK_RESOURCE_NAME
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.LOCK_RESOURCE_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.LOCK_SOURCE_JOB_ID
import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.AttemptMoveShardsStep
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -52,9 +51,11 @@ suspend fun releaseShrinkLock(

suspend fun deleteShrinkLock(
shrinkActionProperties: ShrinkActionProperties,
lockService: LockService
lockService: LockService,
logger: Logger
): Boolean {
val lockID = getShrinkLockID(shrinkActionProperties.nodeName)
logger.info("Deleting lock: $lockID")
return lockService.suspendUntil { deleteLock(lockID, it) }
}

Expand Down Expand Up @@ -94,11 +95,11 @@ fun getShrinkLockModel(
lockSeqNo: Long,
lockDurationSecond: Long
): LockModel {
val lockID = getShrinkLockID(nodeName)
val jobID = getShrinkJobID(nodeName)
val lockCreationInstant: Instant = Instant.ofEpochSecond(lockEpochSecond)
return LockModel(
jobIndexName,
lockID,
jobID,
lockCreationInstant,
lockDurationSecond,
false,
Expand Down Expand Up @@ -163,7 +164,7 @@ fun getDiskSettings(clusterSettings: ClusterSettings): Settings {
* Returns the amount of memory in the node which will be free below the high watermark level after adding 2*indexSizeInBytes, or -1
* if adding 2*indexSizeInBytes goes over the high watermark threshold, or if nodeStats does not contain OsStats.
*/
fun getNodeFreeMemoryAfterShrink(node: NodeStats, indexSizeInBytes: Long, clusterSettings: ClusterSettings): Long {
fun getNodeFreeDiskSpaceAfterShrink(node: NodeStats, indexSizeInBytes: Long, clusterSettings: ClusterSettings): Long {
val fsStats = node.fs
if (fsStats != null) {
val diskSpaceLeftInNode = fsStats.total.free.bytes
Expand Down Expand Up @@ -202,7 +203,14 @@ suspend fun resetReadOnlyAndRouting(index: String, client: Client, originalSetti
}

fun getShrinkLockID(nodeName: String): String {
return "$LOCK_RESOURCE_TYPE-$LOCK_RESOURCE_NAME-$nodeName"
return LockModel.generateLockId(
INDEX_MANAGEMENT_INDEX,
getShrinkJobID(nodeName)
)
}

fun getShrinkJobID(nodeName: String): String {
return "$LOCK_SOURCE_JOB_ID-$nodeName"
}

// Creates a map of shardIds to the set of node names which the shard copies reside on. For example, with 2 replicas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class StepUtilsTests : OpenSearchTestCase() {
)
val lockModel = getShrinkLockModel(shrinkActionProperties)
assertEquals("Incorrect lock model job index name", INDEX_MANAGEMENT_INDEX, lockModel.jobIndexName)
assertEquals("Incorrect lock model jobID", getShrinkLockID(shrinkActionProperties.nodeName), lockModel.jobId)
assertEquals("Incorrect lock model jobID", getShrinkJobID(shrinkActionProperties.nodeName), lockModel.jobId)
assertEquals("Incorrect lock model duration", shrinkActionProperties.lockDurationSecond, lockModel.lockDurationSeconds)
assertEquals("Incorrect lock model lockID", "${lockModel.jobIndexName}-${lockModel.jobId}", lockModel.lockId)
assertEquals("Incorrect lock model sequence number", shrinkActionProperties.lockSeqNo, lockModel.seqNo)
Expand Down Expand Up @@ -129,9 +129,9 @@ class StepUtilsTests : OpenSearchTestCase() {
val clusterSettings = ClusterSettings(settings.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
val remainingSpace = freeBytes - ((2 * indexSize) + threshold)
if (remainingSpace > 0) {
assertEquals(remainingSpace, getNodeFreeMemoryAfterShrink(nodeStats, indexSize, clusterSettings))
assertEquals(remainingSpace, getNodeFreeDiskSpaceAfterShrink(nodeStats, indexSize, clusterSettings))
} else {
assertEquals(-1L, getNodeFreeMemoryAfterShrink(nodeStats, indexSize, clusterSettings))
assertEquals(-1L, getNodeFreeDiskSpaceAfterShrink(nodeStats, indexSize, clusterSettings))
}
}
}

0 comments on commit cfffb6d

Please sign in to comment.