Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shrink action Fix #718

Merged
merged 11 commits into from
Apr 10, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ class ShrinkAction(
const val TARGET_INDEX_TEMPLATE_FIELD = "target_index_name_template"
const val ALIASES_FIELD = "aliases"
const val FORCE_UNSAFE_FIELD = "force_unsafe"
const val LOCK_RESOURCE_TYPE = "shrink"
const val LOCK_RESOURCE_NAME = "node_name"
const val LOCK_SOURCE_JOB_ID = "shrink-node_name"
/** Formats the message reported when the shrink action fails because of missing permissions, appending [failure]. */
fun getSecurityFailureMessage(failure: String): String {
    return "Shrink action failed because of missing permissions: $failure"
}
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ import org.opensearch.action.admin.indices.shrink.ResizeRequest
import org.opensearch.action.admin.indices.shrink.ResizeResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeMemoryAfterShrink
import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeDiskSpaceAfterShrink
import org.opensearch.indexmanagement.indexstatemanagement.util.isIndexGreen
import org.opensearch.indexmanagement.indexstatemanagement.util.issueUpdateSettingsRequest
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -29,15 +32,18 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
override suspend fun wrappedExecute(context: StepContext): AttemptShrinkStep {
val indexName = context.metadata.index
// If the returned shrinkActionProperties are null, then the status has been set to failed, just return
val localShrinkActionProperties = updateAndGetShrinkActionProperties(context) ?: return this
val localShrinkActionProperties = checkShrinkActionPropertiesAndRenewLock(context) ?: return this

if (!isIndexGreen(context.client, indexName)) {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to INDEX_HEALTH_NOT_GREEN_MESSAGE)
return this
}

if (!isNodeStillSuitable(localShrinkActionProperties.nodeName, indexName, context)) return this

if (!confirmIndexWriteBlock(context, indexName)) return this

// If the resize index api fails, the step will be set to failed and resizeIndex will return false
if (!resizeIndex(indexName, localShrinkActionProperties, context)) return this
info = mapOf("message" to getSuccessMessage(localShrinkActionProperties.targetIndexName))
Expand Down Expand Up @@ -69,14 +75,34 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
cleanupAndFail(FAILURE_MESSAGE, "Shrink action failed as node stats were missing the previously selected node.")
return false
}
val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.clusterService.clusterSettings)
val remainingMem = getNodeFreeDiskSpaceAfterShrink(node, indexSizeInBytes, context.clusterService.clusterSettings)
if (remainingMem < 1L) {
cleanupAndFail(NOT_ENOUGH_SPACE_FAILURE_MESSAGE, NOT_ENOUGH_SPACE_FAILURE_MESSAGE)
return false
}
return true
}

/**
 * Re-applies the write block on [indexName] right before the shrink request is sent, in case the block
 * was flipped by another process during the previous steps.
 *
 * Any exception thrown by the update-settings request propagates to the caller unchanged. When the
 * request returns without being acknowledged, resources are cleaned up and the step is marked failed.
 *
 * @param stepContext context supplying the client used to issue the settings update
 * @param indexName name of the index that must stay write-blocked
 * @return true if the settings update was acknowledged, false otherwise
 */
private suspend fun confirmIndexWriteBlock(stepContext: StepContext, indexName: String): Boolean {
    val writeBlockSettings = Settings.builder()
        .put(IndexMetadata.SETTING_BLOCKS_WRITE, true)
        .build()

    // Typed as nullable so that a missing response is treated the same as an unacknowledged one.
    val response: AcknowledgedResponse? = issueUpdateSettingsRequest(stepContext.client, indexName, writeBlockSettings)
    val isUpdateAcknowledged = response?.isAcknowledged == true

    if (!isUpdateAcknowledged) {
        cleanupAndFail(WRITE_BLOCK_FAILED_MESSAGE, "Failed to confirm write block for index: [$indexName] before sending shrink request.")
    }
    return isUpdateAcknowledged
}

private suspend fun resizeIndex(sourceIndex: String, shrinkActionProperties: ShrinkActionProperties, context: StepContext): Boolean {
val targetIndex = shrinkActionProperties.targetIndexName
val req = ResizeRequest(targetIndex, sourceIndex)
Expand Down Expand Up @@ -113,6 +139,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
companion object {
const val name = "attempt_shrink_step"
const val FAILURE_MESSAGE = "Shrink failed when sending shrink request."
const val WRITE_BLOCK_FAILED_MESSAGE = "Failed to set write block before sending shrink request."
const val NOT_ENOUGH_SPACE_FAILURE_MESSAGE = "Shrink failed as the selected node no longer had enough free space to shrink to."
const val INDEX_HEALTH_NOT_GREEN_MESSAGE = "Shrink delayed because index health is not green."
/** Formats the message reported after the shrink request has been sent, naming the target index [newIndex]. */
fun getSuccessMessage(newIndex: String): String {
    return "Shrink started. $newIndex currently being populated."
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,19 @@ abstract class ShrinkStep(

protected suspend fun cleanupAndFail(infoMessage: String, logMessage: String? = null, cause: String? = null, e: Exception? = null) {
cleanupResources(cleanupSettings, cleanupLock, cleanupTargetIndex)
fail(infoMessage, logMessage, cause, e)
setStepFailed(infoMessage, logMessage, cause, e)
}

abstract fun getGenericFailureMessage(): String

abstract suspend fun wrappedExecute(context: StepContext): Step

@Suppress("ReturnCount")
protected suspend fun updateAndGetShrinkActionProperties(context: StepContext): ShrinkActionProperties? {
protected suspend fun checkShrinkActionPropertiesAndRenewLock(context: StepContext): ShrinkActionProperties? {
val actionMetadata = context.metadata.actionMetaData
var localShrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
shrinkActionProperties = localShrinkActionProperties
if (localShrinkActionProperties == null) {
cleanupAndFail(METADATA_FAILURE_MESSAGE, METADATA_FAILURE_MESSAGE)
setStepFailed(METADATA_FAILURE_MESSAGE, METADATA_FAILURE_MESSAGE)
return null
}
val lock = renewShrinkLock(localShrinkActionProperties, context.lockService, logger)
Expand All @@ -85,7 +84,7 @@ abstract class ShrinkStep(
return localShrinkActionProperties
}

protected fun fail(infoMessage: String, logMessage: String? = null, cause: String? = null, e: Exception? = null) {
protected fun setStepFailed(infoMessage: String, logMessage: String? = null, cause: String? = null, e: Exception? = null) {
if (logMessage != null) {
if (e != null) {
logger.error(logMessage, e)
Expand All @@ -95,7 +94,6 @@ abstract class ShrinkStep(
}
info = if (cause == null) mapOf("message" to infoMessage) else mapOf("message" to infoMessage, "cause" to cause)
stepStatus = StepStatus.FAILED
shrinkActionProperties = null
}

protected suspend fun cleanupResources(resetSettings: Boolean, releaseLock: Boolean, deleteTargetIndex: Boolean) {
Expand All @@ -104,6 +102,7 @@ abstract class ShrinkStep(
if (resetSettings) resetIndexSettings(localShrinkActionProperties)
if (deleteTargetIndex) deleteTargetIndex(localShrinkActionProperties)
if (releaseLock) releaseLock(localShrinkActionProperties)
shrinkActionProperties = null
} else {
logger.error("Shrink action failed to clean up resources due to null shrink action properties.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
override suspend fun wrappedExecute(context: StepContext): WaitForMoveShardsStep {
val indexName = context.metadata.index
// If the returned shrinkActionProperties are null, then the status has been set to failed, just return
val localShrinkActionProperties = updateAndGetShrinkActionProperties(context) ?: return this
val localShrinkActionProperties = checkShrinkActionPropertiesAndRenewLock(context) ?: return this

val shardStats = getShardStats(indexName, context.client) ?: return this

Expand Down Expand Up @@ -87,7 +87,7 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
val response: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(indexStatsRequests, it) }
val shardStats = response.shards
if (shardStats == null) {
fail(AttemptMoveShardsStep.FAILURE_MESSAGE, "Failed to move shards in shrink action as shard stats were null.")
cleanupAndFail(AttemptMoveShardsStep.FAILURE_MESSAGE, "Failed to move shards in shrink action as shard stats were null.")
return null
}
return shardStats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
override suspend fun wrappedExecute(context: StepContext): WaitForShrinkStep {
val indexName = context.metadata.index
// If the returned shrinkActionProperties are null, then the status has been set to failed, just return
val localShrinkActionProperties = updateAndGetShrinkActionProperties(context) ?: return this
val localShrinkActionProperties = checkShrinkActionPropertiesAndRenewLock(context) ?: return this

val targetIndex = localShrinkActionProperties.targetIndexName
if (shrinkNotDone(targetIndex, localShrinkActionProperties.targetNumShards, context.client, context.clusterService)) {
Expand All @@ -42,7 +42,7 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
if (!clearAllocationSettings(context, targetIndex)) return this
if (!resetReadOnlyAndRouting(indexName, context.client, localShrinkActionProperties.originalIndexSettings)) return this

if (!deleteShrinkLock(localShrinkActionProperties, context.lockService)) {
if (!deleteShrinkLock(localShrinkActionProperties, context.lockService, logger)) {
logger.error("Failed to delete Shrink action lock on node [${localShrinkActionProperties.nodeName}]")
}
stepStatus = StepStatus.COMPLETED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.LOCK_RESOURCE_NAME
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.LOCK_RESOURCE_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.LOCK_SOURCE_JOB_ID
import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.AttemptMoveShardsStep
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -52,9 +51,11 @@ suspend fun releaseShrinkLock(

suspend fun deleteShrinkLock(
shrinkActionProperties: ShrinkActionProperties,
lockService: LockService
lockService: LockService,
logger: Logger
): Boolean {
val lockID = getShrinkLockID(shrinkActionProperties.nodeName)
logger.info("Deleting lock: $lockID")
return lockService.suspendUntil { deleteLock(lockID, it) }
}

Expand Down Expand Up @@ -94,11 +95,11 @@ fun getShrinkLockModel(
lockSeqNo: Long,
lockDurationSecond: Long
): LockModel {
val lockID = getShrinkLockID(nodeName)
val jobID = getShrinkJobID(nodeName)
val lockCreationInstant: Instant = Instant.ofEpochSecond(lockEpochSecond)
return LockModel(
jobIndexName,
lockID,
jobID,
lockCreationInstant,
lockDurationSecond,
false,
Expand Down Expand Up @@ -163,7 +164,7 @@ fun getDiskSettings(clusterSettings: ClusterSettings): Settings {
* Returns the amount of disk space in the node which will be free below the high watermark level after adding 2*indexSizeInBytes, or -1
* if adding 2*indexSizeInBytes goes over the high watermark threshold, or if nodeStats does not contain file system stats.
*/
fun getNodeFreeMemoryAfterShrink(node: NodeStats, indexSizeInBytes: Long, clusterSettings: ClusterSettings): Long {
fun getNodeFreeDiskSpaceAfterShrink(node: NodeStats, indexSizeInBytes: Long, clusterSettings: ClusterSettings): Long {
val fsStats = node.fs
if (fsStats != null) {
val diskSpaceLeftInNode = fsStats.total.free.bytes
Expand Down Expand Up @@ -202,7 +203,14 @@ suspend fun resetReadOnlyAndRouting(index: String, client: Client, originalSetti
}

fun getShrinkLockID(nodeName: String): String {
return "$LOCK_RESOURCE_TYPE-$LOCK_RESOURCE_NAME-$nodeName"
return LockModel.generateLockId(
Angie-Zhang marked this conversation as resolved.
Show resolved Hide resolved
INDEX_MANAGEMENT_INDEX,
getShrinkJobID(nodeName)
)
}

/** Builds the shrink lock job id for [nodeName] by joining [LOCK_SOURCE_JOB_ID] and the node name with a hyphen. */
fun getShrinkJobID(nodeName: String): String = "$LOCK_SOURCE_JOB_ID-$nodeName"

// Creates a map of shardIds to the set of node names which the shard copies reside on. For example, with 2 replicas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class StepUtilsTests : OpenSearchTestCase() {
)
val lockModel = getShrinkLockModel(shrinkActionProperties)
assertEquals("Incorrect lock model job index name", INDEX_MANAGEMENT_INDEX, lockModel.jobIndexName)
assertEquals("Incorrect lock model jobID", getShrinkLockID(shrinkActionProperties.nodeName), lockModel.jobId)
assertEquals("Incorrect lock model jobID", getShrinkJobID(shrinkActionProperties.nodeName), lockModel.jobId)
assertEquals("Incorrect lock model duration", shrinkActionProperties.lockDurationSecond, lockModel.lockDurationSeconds)
assertEquals("Incorrect lock model lockID", "${lockModel.jobIndexName}-${lockModel.jobId}", lockModel.lockId)
assertEquals("Incorrect lock model sequence number", shrinkActionProperties.lockSeqNo, lockModel.seqNo)
Expand Down Expand Up @@ -129,9 +129,9 @@ class StepUtilsTests : OpenSearchTestCase() {
val clusterSettings = ClusterSettings(settings.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
val remainingSpace = freeBytes - ((2 * indexSize) + threshold)
if (remainingSpace > 0) {
assertEquals(remainingSpace, getNodeFreeMemoryAfterShrink(nodeStats, indexSize, clusterSettings))
assertEquals(remainingSpace, getNodeFreeDiskSpaceAfterShrink(nodeStats, indexSize, clusterSettings))
} else {
assertEquals(-1L, getNodeFreeMemoryAfterShrink(nodeStats, indexSize, clusterSettings))
assertEquals(-1L, getNodeFreeDiskSpaceAfterShrink(nodeStats, indexSize, clusterSettings))
}
}
}