Commit

Fixes locking
Signed-off-by: Clay Downs <[email protected]>
downsrob committed Apr 7, 2022
1 parent 0373ba8 commit f866e9a
Showing 10 changed files with 194 additions and 166 deletions.
@@ -36,7 +36,9 @@ class ShrinkAction(
if (maxShardSize != null) {
require(maxShardSize.bytes > 0) { "Shrink action maxShardSize must be greater than 0." }
} else if (percentageOfSourceShards != null) {
require(percentageOfSourceShards > 0.0 && percentageOfSourceShards < 1.0) { "Percentage of source shards must be between 0.0 and 1.0 exclusively" }
require(percentageOfSourceShards > 0.0 && percentageOfSourceShards < 1.0) {
"Percentage of source shards must be between 0.0 and 1.0 exclusively"
}
} else if (numNewShards != null) {
require(numNewShards > 0) { "Shrink action numNewShards must be greater than 0." }
}
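For context, the three sizing options above are alternative ways to choose the target shard count. A self-contained sketch of the same validation (the require messages are copied from the diff; the standalone function shape and the exactly-one-option check are assumptions):

// Standalone sketch of the sizing validation (assumed shape; the real class
// validates these fields in its init block alongside other checks).
fun validateShrinkSizing(
    numNewShards: Int?,
    maxShardSizeBytes: Long?,
    percentageOfSourceShards: Double?
) {
    // Assumption: exactly one sizing option may be supplied.
    val provided = listOf(numNewShards, maxShardSizeBytes, percentageOfSourceShards).count { it != null }
    require(provided == 1) { "Exactly one shrink sizing option must be set." }
    if (maxShardSizeBytes != null) {
        require(maxShardSizeBytes > 0) { "Shrink action maxShardSize must be greater than 0." }
    } else if (percentageOfSourceShards != null) {
        require(percentageOfSourceShards > 0.0 && percentageOfSourceShards < 1.0) {
            "Percentage of source shards must be between 0.0 and 1.0 exclusively"
        }
    } else if (numNewShards != null) {
        require(numNewShards > 0) { "Shrink action numNewShards must be greater than 0." }
    }
}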
@@ -116,5 +118,7 @@ class ShrinkAction(
const val TARGET_INDEX_SUFFIX_FIELD = "target_index_suffix"
const val ALIASES_FIELD = "aliases"
const val FORCE_UNSAFE_FIELD = "force_unsafe"
const val LOCK_RESOURCE_TYPE = "shrink"
const val LOCK_RESOURCE_NAME = "node_name"
}
}
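These two constants back the deterministic lock ID this commit introduces. The getShrinkLockID helper itself is not shown in the diff; a plausible sketch, assuming the ID is a plain concatenation of the resource type, the resource-name key, and the locked node:

const val LOCK_RESOURCE_TYPE = "shrink"
const val LOCK_RESOURCE_NAME = "node_name"

// Hypothetical sketch of getShrinkLockID (the real helper lives in
// indexstatemanagement.util and is not part of this diff). The property that
// matters is determinism: the same node name always yields the same lock ID,
// so a later step can rebuild the ID to renew or release the very same lock.
fun getShrinkLockID(nodeName: String): String =
    "$LOCK_RESOURCE_TYPE-$LOCK_RESOURCE_NAME-$nodeName"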
@@ -19,16 +19,14 @@ import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.collect.Tuple
import org.opensearch.common.hash.MurmurHash3
import org.opensearch.common.hash.MurmurHash3.Hash128
import org.opensearch.common.settings.Settings
import org.opensearch.index.shard.DocsStats
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.util.getIntervalFromManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.util.getManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeMemoryAfterShrink
import org.opensearch.indexmanagement.indexstatemanagement.util.getShrinkLockModel
import org.opensearch.indexmanagement.indexstatemanagement.util.getShrinkLockID
import org.opensearch.indexmanagement.indexstatemanagement.util.isIndexGreen
import org.opensearch.indexmanagement.indexstatemanagement.util.issueUpdateSettingsRequest
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
@@ -41,11 +39,6 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.jobscheduler.repackage.com.cronutils.utils.VisibleForTesting
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.LockModel
import java.io.ByteArrayOutputStream
import java.io.ObjectOutputStream
import java.io.Serializable
import java.nio.ByteBuffer
import java.util.Base64
import java.util.PriorityQueue
import kotlin.math.ceil
import kotlin.math.floor
@@ -106,16 +99,15 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {

// Get the job interval to use in determining the lock length
val interval = getJobIntervalSeconds(context.metadata.indexUuid, client)

// iterate through the nodes and try to acquire a lock on one
val lock = acquireLockFromNodeList(context.jobContext, suitableNodes, interval)
if (lock == null) {
val lockToNodeName: Pair<LockModel, String>? = acquireLockFromNodeList(context.jobContext, suitableNodes, interval)
if (lockToNodeName == null) {
logger.info("$indexName could not find available node to shrink onto.")
info = mapOf("message" to NO_AVAILABLE_NODES_MESSAGE)
stepStatus = StepStatus.CONDITION_NOT_MET
return this
}
val nodeName = lock.resource[RESOURCE_NAME] as String
val (lock, nodeName) = lockToNodeName
shrinkActionProperties = ShrinkActionProperties(
nodeName,
shrinkTargetIndexName,
Expand All @@ -125,44 +117,6 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
lock.lockTime.epochSecond,
lock.lockDurationSeconds
)
println(lock)
var lockToReacquire = getShrinkLockModel(shrinkActionProperties!!, context.jobContext)
println(lockToReacquire)
// lock ids are not the same!
println(lock.lockId == lockToReacquire.lockId)
println(lock.lockId)

val out = ByteArrayOutputStream()
val os = ObjectOutputStream(out)
os.writeObject(lock.resource as Map<String, Serializable>)
val resourceAsBytes = out.toByteArray()
val hash = MurmurHash3.hash128(
resourceAsBytes, 0, resourceAsBytes.size, 0,
Hash128()
)
val resourceHashBytes = ByteBuffer.allocate(16).putLong(hash.h1).putLong(hash.h2).array()
val resourceAsHashString = Base64.getUrlEncoder().withoutPadding().encodeToString(resourceHashBytes)
println(resourceAsHashString)

val out2 = ByteArrayOutputStream()
val os2 = ObjectOutputStream(out2)
os2.writeObject(lockToReacquire.resource as Map<String, Serializable>)
val resourceAsBytes2 = out2.toByteArray()
val hash2 = MurmurHash3.hash128(
resourceAsBytes2, 0, resourceAsBytes2.size, 0,
Hash128()
)
val resourceHashBytes2 = ByteBuffer.allocate(16).putLong(hash2.h1).putLong(hash2.h2).array()
val resourceAsHashString2 = Base64.getUrlEncoder().withoutPadding().encodeToString(resourceHashBytes2)
println(resourceAsHashString2)

println(lockToReacquire.lockId)
try {
lockToReacquire = context.jobContext.lockService.suspendUntil { renewLock(lockToReacquire, it) }
} catch (e: Exception) {
println(e)
}
println(lockToReacquire)

setToReadOnlyAndMoveIndexToNode(context, nodeName, lock)
info = mapOf("message" to getSuccessMessage(nodeName))
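The println block removed above is leftover debugging for the bug this commit fixes: a lock acquired through the resource-based API derived its lockId from a MurmurHash3 of the Java-serialized resource map, while the LockModel rebuilt later by getShrinkLockModel hashed to a different ID, so renewLock could not find the original lock. A likely culprit is that Java serialization records the concrete map class, so two equal maps can serialize to different bytes and therefore hash to different IDs; acquiring by an explicit ID via acquireLockWithId sidesteps the hash entirely. A runnable sketch of the effect:

import java.io.ByteArrayOutputStream
import java.io.ObjectOutputStream
import java.io.Serializable
import java.util.Collections

// Demonstrates the likely root cause: equal maps with different concrete
// classes serialize to different bytes, so a lock ID derived by hashing the
// serialized resource map is not stable across code paths.
fun serialize(obj: Serializable): ByteArray {
    val out = ByteArrayOutputStream()
    ObjectOutputStream(out).use { it.writeObject(obj) }
    return out.toByteArray()
}

fun main() {
    val a = HashMap(mapOf("node_name" to "node-1")) as Serializable
    val b = Collections.singletonMap("node_name", "node-1") as Serializable
    println(a == b)                                   // true: the maps are equal
    println(serialize(a).contentEquals(serialize(b))) // false: the bytes differ
}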
@@ -180,6 +134,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
private fun fail(message: String, cause: String? = null) {
info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
stepStatus = StepStatus.FAILED
shrinkActionProperties = null
}

private suspend fun getJobIntervalSeconds(indexUuid: String, client: Client): Long? {
@@ -191,7 +146,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
return null
}
// Divide the interval by 1000 to convert from ms to seconds
return managedIndexConfig?.let { getIntervalFromManagedIndexConfig(it) / 1000L }
return managedIndexConfig?.let { getIntervalFromManagedIndexConfig(it) / MILLISECONDS_IN_SECOND }
}

private fun shouldFailTooManyDocuments(docsStats: DocsStats, numTargetShards: Int): Boolean {
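The body of shouldFailTooManyDocuments is collapsed in this view. A sketch of the documented constraint (see MAXIMUM_DOCS_PER_SHARD in the companion object below), assuming documents spread evenly across the target shards:

// Sketch (assumed; the real body is collapsed in the diff). After a shrink,
// each target shard holds roughly totalDocs / numTargetShards documents, and
// a Lucene shard cannot exceed 2^31 documents.
const val MAXIMUM_DOCS_PER_SHARD = 0x80000000 // 2^31

fun shouldFailTooManyDocuments(totalDocs: Long, numTargetShards: Int): Boolean {
    val docsPerTargetShard = totalDocs / numTargetShards
    return docsPerTargetShard > MAXIMUM_DOCS_PER_SHARD
}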
@@ -208,6 +163,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
* Returns whether the action should fail due to being unsafe. The action is unsafe if there are no replicas. If forceUnsafe
* is set, then this always returns false.
*/
@Suppress("ReturnCount")
private fun shouldFailUnsafe(clusterService: ClusterService, indexName: String): Boolean {
// If forceUnsafe is set and is true, then we don't even need to check the number of replicas
if (action.forceUnsafe == true) return false
@@ -229,41 +185,45 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
return false
}
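The middle of this check is collapsed in the diff. A minimal sketch of the stated rule, with the replica lookup reduced to a parameter for illustration:

// Sketch of the unsafe check (assumed shape). Moving all primaries onto one
// node with no replicas means a node loss could lose data, so that case is
// refused unless the user explicitly opts in with force_unsafe.
fun shouldFailUnsafe(numberOfReplicas: Int, forceUnsafe: Boolean?): Boolean {
    if (forceUnsafe == true) return false
    return numberOfReplicas == 0
}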

private suspend fun setToReadOnlyAndMoveIndexToNode(stepContext: StepContext, node: String, lock: LockModel) {
private suspend fun setToReadOnlyAndMoveIndexToNode(stepContext: StepContext, node: String, lock: LockModel): Boolean {
val updateSettings = Settings.builder()
.put(IndexMetadata.SETTING_BLOCKS_WRITE, true)
.put(ROUTING_SETTING, node)
.build()
val jobContext = stepContext.jobContext
var response: AcknowledgedResponse? = null
val isUpdateAcknowledged: Boolean
try {
val response: AcknowledgedResponse = issueUpdateSettingsRequest(stepContext.client, stepContext.metadata.index, updateSettings)
if (!response.isAcknowledged) {
response = issueUpdateSettingsRequest(stepContext.client, stepContext.metadata.index, updateSettings)
} finally {
isUpdateAcknowledged = response != null && response.isAcknowledged
if (!isUpdateAcknowledged) {
fail(UPDATE_FAILED_MESSAGE)
jobContext.lockService.suspendUntil<Boolean> { release(lock, it) }
val released: Boolean = jobContext.lockService.suspendUntil { release(lock, it) }
if (!released) {
logger.error("Failed to release Shrink action lock on node [$node]")
}
}
} catch (e: Exception) {
stepStatus = StepStatus.FAILED
handleException(e, UPDATE_FAILED_MESSAGE)
jobContext.lockService.suspendUntil<Boolean> { release(lock, it) }
}
return isUpdateAcknowledged
}
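setToReadOnlyAndMoveIndexToNode now reports whether the settings update was acknowledged. The updated call site is not visible in this diff; the expected pattern would be along these lines (assumed):

// Assumed call-site pattern: stop the step if the index could not be made
// read-only and pinned to the target node. fail() has already recorded the
// error and the lock has already been released inside the helper.
if (!setToReadOnlyAndMoveIndexToNode(context, nodeName, lock)) return this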

/*
* Iterates through each suitable node in order, attempting to acquire a resource lock. Returns the first lock which
* is successfully acquired.
* is successfully acquired, paired with the name of the node it was acquired on.
*/
private suspend fun acquireLockFromNodeList(jobContext: JobExecutionContext, suitableNodes: List<String>, jobIntervalSeconds: Long?): LockModel? {
for (node in suitableNodes) {
val nodeResourceObject = mapOf(RESOURCE_NAME to node)
// If we couldn't get the job interval for the lock, use the default of 12 hours.
// Lock is 3x + 30 minutes the job interval to allow the next step's execution to extend the lock without losing it.
// If user sets maximum jitter, it could be 2x the job interval before the next step is executed.
val lockTime = jobIntervalSeconds?.let { (it * 3) + (30 * 60) } ?: DEFAULT_LOCK_INTERVAL
private suspend fun acquireLockFromNodeList(
jobContext: JobExecutionContext,
suitableNodes: List<String>,
jobIntervalSeconds: Long?
): Pair<LockModel, String>? {
for (nodeName in suitableNodes) {
val lockID = getShrinkLockID(nodeName)
val lock: LockModel? = jobContext.lockService.suspendUntil {
acquireLockOnResource(jobContext, lockTime, RESOURCE_TYPE, nodeResourceObject, it)
acquireLockWithId(jobContext.jobIndexName, getShrinkLockDuration(jobIntervalSeconds), lockID, it)
}
if (lock != null) {
return lock
return lock to nodeName
}
}
return null
@@ -374,15 +334,6 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
return n
}

private fun handleException(e: Exception, message: String) {
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetaData = currentMetadata.actionMetaData
// If we succeeded because there was only one source primary shard, we no-op by skipping to the last step
@@ -408,22 +359,28 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
companion object {
const val OS_METRIC = "os"
const val ROUTING_SETTING = "index.routing.allocation.require._name"
const val RESOURCE_NAME = "node_name"
const val DEFAULT_TARGET_SUFFIX = "_shrunken"
const val name = "attempt_move_shards_step"
const val RESOURCE_TYPE = "shrink"
const val UPDATE_FAILED_MESSAGE = "Shrink failed because settings could not be updated.."
const val UPDATE_FAILED_MESSAGE = "Shrink failed because shard settings could not be updated."
const val NO_AVAILABLE_NODES_MESSAGE =
"There are no available nodes for to move to to execute a shrink. Delaying until node becomes available."
const val DEFAULT_LOCK_INTERVAL = 3L * 60L * 60L // Default lock interval is 3 hours in seconds
const val UNSAFE_FAILURE_MESSAGE = "Shrink failed because index has no replicas and force_unsafe is not set to true."
const val ONE_PRIMARY_SHARD_MESSAGE = "Shrink action did not do anything because source index only has one primary shard."
const val TOO_MANY_DOCS_FAILURE_MESSAGE = "Shrink failed because there would be too many documents on each target shard following the shrink."
const val INDEX_NOT_GREEN_MESSAGE = "Shrink action cannot start moving shards as the index is not green."
const val FAILURE_MESSAGE = "Shrink failed to start moving shards."
private const val DEFAULT_LOCK_INTERVAL = 3L * 60L * 60L // Default lock interval is 3 hours in seconds
private const val MILLISECONDS_IN_SECOND = 1000
private const val JOB_INTERVAL_LOCK_MULTIPLIER = 3
private const val LOCK_BUFFER_SECONDS = 1800
private const val MAXIMUM_DOCS_PER_SHARD = 0x80000000 // The maximum number of documents per shard is 2^31
fun getSuccessMessage(node: String) = "Successfully started moving the shards to $node."
fun getIndexExistsMessage(newIndex: String) = "Shrink failed because $newIndex already exists."
fun getSecurityFailureMessage(failure: String) = "Shrink action failed because of missing permissions: $failure"
// If we couldn't get the job interval for the lock, use the default of 3 hours.
// The lock duration is 3x the job interval plus 30 minutes so that the next step's execution can extend the lock before it expires.
// If the user sets maximum jitter, it could be 2x the job interval before the next step is executed.
private fun getShrinkLockDuration(jobInterval: Long?) = jobInterval?.let { (it * JOB_INTERVAL_LOCK_MULTIPLIER) + LOCK_BUFFER_SECONDS }
?: DEFAULT_LOCK_INTERVAL
}
}
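As a worked example of the lock-duration rule: a 30-minute (1800 s) job interval yields 3 × 1800 + 1800 = 7200 seconds of lock time, and a missing interval falls back to the 3-hour default.

// Worked examples of the lock-duration rule above (logic copied from the diff).
fun shrinkLockDuration(jobIntervalSeconds: Long?): Long =
    jobIntervalSeconds?.let { (it * 3) + 1800 } ?: (3L * 60L * 60L)

fun main() {
    println(shrinkLockDuration(1800L)) // 30 min interval -> 7200 (2 hours)
    println(shrinkLockDuration(null))  // no interval     -> 10800 (3 hour default)
}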