Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds shrink action to ISM #326

Merged
merged 13 commits into from
Apr 16, 2022
1 change: 1 addition & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ configurations.all {

dependencies {
compileOnly "org.opensearch:opensearch:${opensearch_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,34 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentFragment
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.spi.indexstatemanagement.addObject

/** Properties that will persist across steps of a single Action. Will be stored in the [ActionMetaData]. */
// TODO: Create namespaces to group properties together
data class ActionProperties(
val maxNumSegments: Int? = null,
val snapshotName: String? = null,
val rollupId: String? = null,
val hasRollupFailed: Boolean? = null
val hasRollupFailed: Boolean? = null,
val shrinkActionProperties: ShrinkActionProperties? = null
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
out.writeOptionalInt(maxNumSegments)
out.writeOptionalString(snapshotName)
out.writeOptionalString(rollupId)
out.writeOptionalBoolean(hasRollupFailed)
out.writeOptionalWriteable(shrinkActionProperties)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
if (maxNumSegments != null) builder.field(Properties.MAX_NUM_SEGMENTS.key, maxNumSegments)
if (snapshotName != null) builder.field(Properties.SNAPSHOT_NAME.key, snapshotName)
if (rollupId != null) builder.field(Properties.ROLLUP_ID.key, rollupId)
if (hasRollupFailed != null) builder.field(Properties.HAS_ROLLUP_FAILED.key, hasRollupFailed)
if (shrinkActionProperties != null) builder.addObject(ShrinkActionProperties.SHRINK_ACTION_PROPERTIES, shrinkActionProperties, params)
return builder
}

Expand All @@ -46,18 +51,19 @@ data class ActionProperties(
val snapshotName: String? = si.readOptionalString()
val rollupId: String? = si.readOptionalString()
val hasRollupFailed: Boolean? = si.readOptionalBoolean()

return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed)
val shrinkActionProperties: ShrinkActionProperties? = si.readOptionalWriteable { ShrinkActionProperties.fromStreamInput(it) }
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
}

fun parse(xcp: XContentParser): ActionProperties {
var maxNumSegments: Int? = null
var snapshotName: String? = null
var rollupId: String? = null
var hasRollupFailed: Boolean? = null
var shrinkActionProperties: ShrinkActionProperties? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

Expand All @@ -66,10 +72,13 @@ data class ActionProperties(
Properties.SNAPSHOT_NAME.key -> snapshotName = xcp.text()
Properties.ROLLUP_ID.key -> rollupId = xcp.text()
Properties.HAS_ROLLUP_FAILED.key -> hasRollupFailed = xcp.booleanValue()
ShrinkActionProperties.SHRINK_ACTION_PROPERTIES -> {
shrinkActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else ShrinkActionProperties.parse(xcp)
}
}
}

return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed)
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.model

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentFragment
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils

data class ShrinkActionProperties(
val nodeName: String,
val targetIndexName: String,
val targetNumShards: Int,
val lockPrimaryTerm: Long,
val lockSeqNo: Long,
val lockEpochSecond: Long,
val lockDurationSecond: Long,
// Used to store the original index allocation and write block setting to reapply after shrink
val originalIndexSettings: Map<String, String>
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
out.writeString(nodeName)
out.writeString(targetIndexName)
out.writeInt(targetNumShards)
out.writeLong(lockPrimaryTerm)
out.writeLong(lockSeqNo)
out.writeLong(lockEpochSecond)
out.writeLong(lockDurationSecond)
out.writeMap(originalIndexSettings)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.field(ShrinkProperties.NODE_NAME.key, nodeName)
builder.field(ShrinkProperties.TARGET_INDEX_NAME.key, targetIndexName)
builder.field(ShrinkProperties.TARGET_NUM_SHARDS.key, targetNumShards)
builder.field(ShrinkProperties.LOCK_SEQ_NO.key, lockSeqNo)
builder.field(ShrinkProperties.LOCK_PRIMARY_TERM.key, lockPrimaryTerm)
builder.field(ShrinkProperties.LOCK_EPOCH_SECOND.key, lockEpochSecond)
builder.field(ShrinkProperties.LOCK_DURATION_SECOND.key, lockDurationSecond)
builder.field(ShrinkProperties.ORIGINAL_INDEX_SETTINGS.key, originalIndexSettings)
return builder
}

companion object {
const val SHRINK_ACTION_PROPERTIES = "shrink_action_properties"

fun fromStreamInput(si: StreamInput): ShrinkActionProperties {
val nodeName: String = si.readString()
val targetIndexName: String = si.readString()
val targetNumShards: Int = si.readInt()
val lockPrimaryTerm: Long = si.readLong()
val lockSeqNo: Long = si.readLong()
val lockEpochSecond: Long = si.readLong()
val lockDurationSecond: Long = si.readLong()
val originalIndexSettings: Map<String, String> = si.readMap({ it.readString() }, { it.readString() })

return ShrinkActionProperties(
nodeName, targetIndexName, targetNumShards, lockPrimaryTerm, lockSeqNo, lockEpochSecond, lockDurationSecond, originalIndexSettings
)
}

fun parse(xcp: XContentParser): ShrinkActionProperties {
var nodeName: String? = null
var targetIndexName: String? = null
var targetNumShards: Int? = null
var lockPrimaryTerm: Long? = null
var lockSeqNo: Long? = null
var lockEpochSecond: Long? = null
var lockDurationSecond: Long? = null
var originalIndexSettings: Map<String, String>? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
ShrinkProperties.NODE_NAME.key -> nodeName = xcp.text()
ShrinkProperties.TARGET_INDEX_NAME.key -> targetIndexName = xcp.text()
ShrinkProperties.TARGET_NUM_SHARDS.key -> targetNumShards = xcp.intValue()
ShrinkProperties.LOCK_PRIMARY_TERM.key -> lockPrimaryTerm = xcp.longValue()
ShrinkProperties.LOCK_SEQ_NO.key -> lockSeqNo = xcp.longValue()
ShrinkProperties.LOCK_EPOCH_SECOND.key -> lockEpochSecond = xcp.longValue()
ShrinkProperties.LOCK_DURATION_SECOND.key -> lockDurationSecond = xcp.longValue()
ShrinkProperties.ORIGINAL_INDEX_SETTINGS.key -> originalIndexSettings = xcp.mapStrings()
}
}

return ShrinkActionProperties(
requireNotNull(nodeName),
requireNotNull(targetIndexName),
requireNotNull(targetNumShards),
requireNotNull(lockPrimaryTerm),
requireNotNull(lockSeqNo),
requireNotNull(lockEpochSecond),
requireNotNull(lockDurationSecond),
requireNotNull(originalIndexSettings)
)
}
}

enum class ShrinkProperties(val key: String) {
NODE_NAME("node_name"),
TARGET_INDEX_NAME("target_index_name"),
TARGET_NUM_SHARDS("target_num_shards"),
LOCK_SEQ_NO("lock_seq_no"),
LOCK_PRIMARY_TERM("lock_primary_term"),
LOCK_EPOCH_SECOND("lock_epoch_second"),
LOCK_DURATION_SECOND("lock_duration_second"),
ORIGINAL_INDEX_SETTINGS("original_index_settings")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.commons.authuser.User
import org.opensearch.jobscheduler.spi.utils.LockService
import org.opensearch.script.ScriptService

class StepContext(
Expand All @@ -20,8 +21,9 @@ class StepContext(
val user: User?,
val scriptService: ScriptService,
val settings: Settings,
val lockService: LockService
) {
fun getUpdatedContext(metadata: ManagedIndexMetaData): StepContext {
return StepContext(metadata, this.clusterService, this.client, this.threadContext, this.user, this.scriptService, this.settings)
return StepContext(metadata, this.clusterService, this.client, this.threadContext, this.user, this.scriptService, this.settings, this.lockService)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.ReadWriteActio
import org.opensearch.indexmanagement.indexstatemanagement.action.ReplicaCountActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
Expand All @@ -45,6 +46,7 @@ class ISMActionsParser private constructor() {
ReplicaCountActionParser(),
RollupActionParser(),
RolloverActionParser(),
ShrinkActionParser(),
SnapshotActionParser()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ object ManagedIndexRunner :
if (lock == null) {
logger.debug("Could not acquire lock [${lock?.lockId}] for ${job.index}")
} else {
runManagedIndexConfig(job)
runManagedIndexConfig(job, context)
// Release lock
val released: Boolean = context.lockService.suspendUntil { release(lock, it) }
if (!released) {
Expand All @@ -231,7 +231,7 @@ object ManagedIndexRunner :
}

@Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth")
private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig) {
private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext) {
logger.debug("Run job for index ${managedIndexConfig.index}")
// doing a check of local cluster health as we do not want to overload master node with potentially a lot of calls
if (clusterIsRed()) {
Expand Down Expand Up @@ -304,7 +304,9 @@ object ManagedIndexRunner :

val state = policy.getStateToExecute(managedIndexMetaData)
val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider)
val stepContext = StepContext(managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings)
val stepContext = StepContext(
managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings, jobContext.lockService
)
val step: Step? = action?.getStepToExecute(stepContext)
val currentActionMetaData = action?.getUpdatedActionMetadata(managedIndexMetaData, state.name)

Expand Down
Loading