Skip to content

Commit

Permalink
Adds shrink action to ISM (opensearch-project#326)
Browse files Browse the repository at this point in the history
* Updates shrink action to new interface

Signed-off-by: Clay Downs <[email protected]>
  • Loading branch information
downsrob authored Apr 16, 2022
1 parent 95b900f commit 3a08383
Show file tree
Hide file tree
Showing 35 changed files with 2,371 additions and 64 deletions.
1 change: 1 addition & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ configurations.all {

dependencies {
compileOnly "org.opensearch:opensearch:${opensearch_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,34 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentFragment
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.spi.indexstatemanagement.addObject

/** Properties that will persist across steps of a single Action. Will be stored in the [ActionMetaData]. */
// TODO: Create namespaces to group properties together
data class ActionProperties(
val maxNumSegments: Int? = null,
val snapshotName: String? = null,
val rollupId: String? = null,
val hasRollupFailed: Boolean? = null
val hasRollupFailed: Boolean? = null,
val shrinkActionProperties: ShrinkActionProperties? = null
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
out.writeOptionalInt(maxNumSegments)
out.writeOptionalString(snapshotName)
out.writeOptionalString(rollupId)
out.writeOptionalBoolean(hasRollupFailed)
out.writeOptionalWriteable(shrinkActionProperties)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
if (maxNumSegments != null) builder.field(Properties.MAX_NUM_SEGMENTS.key, maxNumSegments)
if (snapshotName != null) builder.field(Properties.SNAPSHOT_NAME.key, snapshotName)
if (rollupId != null) builder.field(Properties.ROLLUP_ID.key, rollupId)
if (hasRollupFailed != null) builder.field(Properties.HAS_ROLLUP_FAILED.key, hasRollupFailed)
if (shrinkActionProperties != null) builder.addObject(ShrinkActionProperties.SHRINK_ACTION_PROPERTIES, shrinkActionProperties, params)
return builder
}

Expand All @@ -46,18 +51,19 @@ data class ActionProperties(
val snapshotName: String? = si.readOptionalString()
val rollupId: String? = si.readOptionalString()
val hasRollupFailed: Boolean? = si.readOptionalBoolean()

return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed)
val shrinkActionProperties: ShrinkActionProperties? = si.readOptionalWriteable { ShrinkActionProperties.fromStreamInput(it) }
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
}

fun parse(xcp: XContentParser): ActionProperties {
var maxNumSegments: Int? = null
var snapshotName: String? = null
var rollupId: String? = null
var hasRollupFailed: Boolean? = null
var shrinkActionProperties: ShrinkActionProperties? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

Expand All @@ -66,10 +72,13 @@ data class ActionProperties(
Properties.SNAPSHOT_NAME.key -> snapshotName = xcp.text()
Properties.ROLLUP_ID.key -> rollupId = xcp.text()
Properties.HAS_ROLLUP_FAILED.key -> hasRollupFailed = xcp.booleanValue()
ShrinkActionProperties.SHRINK_ACTION_PROPERTIES -> {
shrinkActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else ShrinkActionProperties.parse(xcp)
}
}
}

return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed)
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.model

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentFragment
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils

data class ShrinkActionProperties(
val nodeName: String,
val targetIndexName: String,
val targetNumShards: Int,
val lockPrimaryTerm: Long,
val lockSeqNo: Long,
val lockEpochSecond: Long,
val lockDurationSecond: Long,
// Used to store the original index allocation and write block setting to reapply after shrink
val originalIndexSettings: Map<String, String>
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
out.writeString(nodeName)
out.writeString(targetIndexName)
out.writeInt(targetNumShards)
out.writeLong(lockPrimaryTerm)
out.writeLong(lockSeqNo)
out.writeLong(lockEpochSecond)
out.writeLong(lockDurationSecond)
out.writeMap(originalIndexSettings)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.field(ShrinkProperties.NODE_NAME.key, nodeName)
builder.field(ShrinkProperties.TARGET_INDEX_NAME.key, targetIndexName)
builder.field(ShrinkProperties.TARGET_NUM_SHARDS.key, targetNumShards)
builder.field(ShrinkProperties.LOCK_SEQ_NO.key, lockSeqNo)
builder.field(ShrinkProperties.LOCK_PRIMARY_TERM.key, lockPrimaryTerm)
builder.field(ShrinkProperties.LOCK_EPOCH_SECOND.key, lockEpochSecond)
builder.field(ShrinkProperties.LOCK_DURATION_SECOND.key, lockDurationSecond)
builder.field(ShrinkProperties.ORIGINAL_INDEX_SETTINGS.key, originalIndexSettings)
return builder
}

companion object {
const val SHRINK_ACTION_PROPERTIES = "shrink_action_properties"

fun fromStreamInput(si: StreamInput): ShrinkActionProperties {
val nodeName: String = si.readString()
val targetIndexName: String = si.readString()
val targetNumShards: Int = si.readInt()
val lockPrimaryTerm: Long = si.readLong()
val lockSeqNo: Long = si.readLong()
val lockEpochSecond: Long = si.readLong()
val lockDurationSecond: Long = si.readLong()
val originalIndexSettings: Map<String, String> = si.readMap({ it.readString() }, { it.readString() })

return ShrinkActionProperties(
nodeName, targetIndexName, targetNumShards, lockPrimaryTerm, lockSeqNo, lockEpochSecond, lockDurationSecond, originalIndexSettings
)
}

fun parse(xcp: XContentParser): ShrinkActionProperties {
var nodeName: String? = null
var targetIndexName: String? = null
var targetNumShards: Int? = null
var lockPrimaryTerm: Long? = null
var lockSeqNo: Long? = null
var lockEpochSecond: Long? = null
var lockDurationSecond: Long? = null
var originalIndexSettings: Map<String, String>? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
ShrinkProperties.NODE_NAME.key -> nodeName = xcp.text()
ShrinkProperties.TARGET_INDEX_NAME.key -> targetIndexName = xcp.text()
ShrinkProperties.TARGET_NUM_SHARDS.key -> targetNumShards = xcp.intValue()
ShrinkProperties.LOCK_PRIMARY_TERM.key -> lockPrimaryTerm = xcp.longValue()
ShrinkProperties.LOCK_SEQ_NO.key -> lockSeqNo = xcp.longValue()
ShrinkProperties.LOCK_EPOCH_SECOND.key -> lockEpochSecond = xcp.longValue()
ShrinkProperties.LOCK_DURATION_SECOND.key -> lockDurationSecond = xcp.longValue()
ShrinkProperties.ORIGINAL_INDEX_SETTINGS.key -> originalIndexSettings = xcp.mapStrings()
}
}

return ShrinkActionProperties(
requireNotNull(nodeName),
requireNotNull(targetIndexName),
requireNotNull(targetNumShards),
requireNotNull(lockPrimaryTerm),
requireNotNull(lockSeqNo),
requireNotNull(lockEpochSecond),
requireNotNull(lockDurationSecond),
requireNotNull(originalIndexSettings)
)
}
}

enum class ShrinkProperties(val key: String) {
NODE_NAME("node_name"),
TARGET_INDEX_NAME("target_index_name"),
TARGET_NUM_SHARDS("target_num_shards"),
LOCK_SEQ_NO("lock_seq_no"),
LOCK_PRIMARY_TERM("lock_primary_term"),
LOCK_EPOCH_SECOND("lock_epoch_second"),
LOCK_DURATION_SECOND("lock_duration_second"),
ORIGINAL_INDEX_SETTINGS("original_index_settings")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.commons.authuser.User
import org.opensearch.jobscheduler.spi.utils.LockService
import org.opensearch.script.ScriptService

class StepContext(
Expand All @@ -20,8 +21,9 @@ class StepContext(
val user: User?,
val scriptService: ScriptService,
val settings: Settings,
val lockService: LockService
) {
fun getUpdatedContext(metadata: ManagedIndexMetaData): StepContext {
return StepContext(metadata, this.clusterService, this.client, this.threadContext, this.user, this.scriptService, this.settings)
return StepContext(metadata, this.clusterService, this.client, this.threadContext, this.user, this.scriptService, this.settings, this.lockService)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.ReadWriteActio
import org.opensearch.indexmanagement.indexstatemanagement.action.ReplicaCountActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
Expand All @@ -45,6 +46,7 @@ class ISMActionsParser private constructor() {
ReplicaCountActionParser(),
RollupActionParser(),
RolloverActionParser(),
ShrinkActionParser(),
SnapshotActionParser()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ object ManagedIndexRunner :
if (lock == null) {
logger.debug("Could not acquire lock [${lock?.lockId}] for ${job.index}")
} else {
runManagedIndexConfig(job)
runManagedIndexConfig(job, context)
// Release lock
val released: Boolean = context.lockService.suspendUntil { release(lock, it) }
if (!released) {
Expand All @@ -231,7 +231,7 @@ object ManagedIndexRunner :
}

@Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth")
private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig) {
private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext) {
logger.debug("Run job for index ${managedIndexConfig.index}")
// doing a check of local cluster health as we do not want to overload master node with potentially a lot of calls
if (clusterIsRed()) {
Expand Down Expand Up @@ -304,7 +304,9 @@ object ManagedIndexRunner :

val state = policy.getStateToExecute(managedIndexMetaData)
val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider)
val stepContext = StepContext(managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings)
val stepContext = StepContext(
managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings, jobContext.lockService
)
val step: Step? = action?.getStepToExecute(stepContext)
val currentActionMetaData = action?.getUpdatedActionMetadata(managedIndexMetaData, state.name)

Expand Down
Loading

0 comments on commit 3a08383

Please sign in to comment.