Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #760 to 2.x #1002

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ However, to build the `index management` plugin project, we also use the OpenSea

### Building from the command line

1. `./gradlew build` builds and tests project.
1. `./gradlew build` builds and tests project.
2. `./gradlew run` launches a single node cluster with the index management (and job-scheduler) plugin installed.
3. `./gradlew run -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed.
4. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ data class ActionProperties(
val snapshotName: String? = null,
val rollupId: String? = null,
val hasRollupFailed: Boolean? = null,
val shrinkActionProperties: ShrinkActionProperties? = null
val shrinkActionProperties: ShrinkActionProperties? = null,
val transformActionProperties: TransformActionProperties? = null
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
Expand All @@ -32,6 +33,7 @@ data class ActionProperties(
out.writeOptionalString(rollupId)
out.writeOptionalBoolean(hasRollupFailed)
out.writeOptionalWriteable(shrinkActionProperties)
out.writeOptionalWriteable(transformActionProperties)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand All @@ -40,6 +42,7 @@ data class ActionProperties(
if (rollupId != null) builder.field(Properties.ROLLUP_ID.key, rollupId)
if (hasRollupFailed != null) builder.field(Properties.HAS_ROLLUP_FAILED.key, hasRollupFailed)
if (shrinkActionProperties != null) builder.addObject(ShrinkActionProperties.SHRINK_ACTION_PROPERTIES, shrinkActionProperties, params)
if (transformActionProperties != null) builder.addObject(TransformActionProperties.TRANSFORM_ACTION_PROPERTIES, transformActionProperties, params)
return builder
}

Expand All @@ -52,7 +55,8 @@ data class ActionProperties(
val rollupId: String? = si.readOptionalString()
val hasRollupFailed: Boolean? = si.readOptionalBoolean()
val shrinkActionProperties: ShrinkActionProperties? = si.readOptionalWriteable { ShrinkActionProperties.fromStreamInput(it) }
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
val transformActionProperties: TransformActionProperties? = si.readOptionalWriteable { TransformActionProperties.fromStreamInput(it) }
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties)
}

fun parse(xcp: XContentParser): ActionProperties {
Expand All @@ -61,6 +65,7 @@ data class ActionProperties(
var rollupId: String? = null
var hasRollupFailed: Boolean? = null
var shrinkActionProperties: ShrinkActionProperties? = null
var transformActionProperties: TransformActionProperties? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -75,10 +80,13 @@ data class ActionProperties(
ShrinkActionProperties.SHRINK_ACTION_PROPERTIES -> {
shrinkActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else ShrinkActionProperties.parse(xcp)
}
TransformActionProperties.TRANSFORM_ACTION_PROPERTIES -> {
transformActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else TransformActionProperties.parse(xcp)
}
}
}

return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.model

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentFragment
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser

data class TransformActionProperties(
val transformId: String?
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
out.writeOptionalString(transformId)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
if (transformId != null) builder.field(Properties.TRANSFORM_ID.key, transformId)
return builder
}

companion object {
const val TRANSFORM_ACTION_PROPERTIES = "transform_action_properties"

fun fromStreamInput(sin: StreamInput): TransformActionProperties {
val transformId: String? = sin.readOptionalString()
return TransformActionProperties(transformId)
}

fun parse(xcp: XContentParser): TransformActionProperties {
var transformId: String? = null

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
Properties.TRANSFORM_ID.key -> transformId = xcp.text()
}
}

return TransformActionProperties(transformId)
}
}

enum class Properties(val key: String) {
TRANSFORM_ID("transform_id")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

val managedIndexCoordinator = ManagedIndexCoordinator(
environment.settings(),
client, clusterService, threadPool, indexManagementIndices, metadataService, templateService, indexMetadataProvider
client, clusterService, threadPool, indexManagementIndices, metadataService, templateService, indexMetadataProvider, xContentRegistry
)

val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
Expand All @@ -49,7 +50,8 @@ class ISMActionsParser private constructor() {
RollupActionParser(),
RolloverActionParser(),
ShrinkActionParser(),
SnapshotActionParser()
SnapshotActionParser(),
TransformActionParser(),
)

val customActionExtensionMap = mutableMapOf<String, String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.opensearch.common.regex.Regex
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.authuser.User
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.index.Index
import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementIndices
Expand Down Expand Up @@ -117,7 +118,8 @@ class ManagedIndexCoordinator(
indexManagementIndices: IndexManagementIndices,
private val metadataService: MetadataService,
private val templateService: ISMTemplateService,
private val indexMetadataProvider: IndexMetadataProvider
private val indexMetadataProvider: IndexMetadataProvider,
private val xContentRegistry: NamedXContentRegistry
) : ClusterStateListener,
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")),
LifecycleListener() {
Expand Down Expand Up @@ -462,7 +464,7 @@ class ManagedIndexCoordinator(

return try {
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
parseFromSearchResponse(response = response, parse = Policy.Companion::parse)
parseFromSearchResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (ex: IndexNotFoundException) {
emptyList()
} catch (ex: ClusterBlockException) {
Expand Down Expand Up @@ -713,7 +715,7 @@ class ManagedIndexCoordinator(
}
mRes.forEach {
if (it.response.isExists) {
result[it.id] = contentParser(it.response.sourceAsBytesRef).parseWithType(
result[it.id] = contentParser(it.response.sourceAsBytesRef, xContentRegistry).parseWithType(
it.response.id, it.response.seqNo, it.response.primaryTerm, ManagedIndexConfig.Companion::parse
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.step.transform.AttemptCreateTransformJobStep
import org.opensearch.indexmanagement.indexstatemanagement.step.transform.WaitForTransformCompletionStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.transform.model.ISMTransform

class TransformAction(
val ismTransform: ISMTransform,
index: Int
) : Action(name, index) {

companion object {
const val name = "transform"
const val ISM_TRANSFORM_FIELD = "ism_transform"
}

private val attemptCreateTransformJobStep = AttemptCreateTransformJobStep(this)
private val waitForTransformCompletionStep = WaitForTransformCompletionStep()
private val steps = listOf(attemptCreateTransformJobStep, waitForTransformCompletionStep)

@Suppress("ReturnCount")
override fun getStepToExecute(context: StepContext): Step {
// if stepMetaData is null, return first step
val stepMetaData = context.metadata.stepMetaData ?: return attemptCreateTransformJobStep

// if the current step has completed, return the next step
if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) {
return when (stepMetaData.name) {
AttemptCreateTransformJobStep.name -> waitForTransformCompletionStep
else -> attemptCreateTransformJobStep
}
}

return when (stepMetaData.name) {

Check warning on line 45 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt#L45

Added line #L45 was not covered by tests
AttemptCreateTransformJobStep.name -> attemptCreateTransformJobStep
else -> waitForTransformCompletionStep

Check warning on line 47 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt#L47

Added line #L47 was not covered by tests
}
}

override fun getSteps(): List<Step> = steps

override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type)
builder.field(ISM_TRANSFORM_FIELD, ismTransform)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
ismTransform.writeTo(out)
out.writeInt(actionIndex)

Check warning on line 61 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt#L60-L61

Added lines #L60 - L61 were not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.transform.model.ISMTransform

class TransformActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
val ismTransform = ISMTransform(sin)
val index = sin.readInt()
return TransformAction(ismTransform, index)

Check warning on line 19 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionParser.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionParser.kt#L17-L19

Added lines #L17 - L19 were not covered by tests
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var ismTransform: ISMTransform? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
TransformAction.ISM_TRANSFORM_FIELD -> ismTransform = ISMTransform.parse(xcp)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in TransformAction.")

Check warning on line 32 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionParser.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionParser.kt#L32

Added line #L32 was not covered by tests
}
}

return TransformAction(ismTransform = requireNotNull(ismTransform) { "TransformAction transform is null." }, index)
}

override fun getActionType(): String {
return TransformAction.name
}
}
Loading
Loading