This repository has been archived by the owner on Aug 2, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 45
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introducing new ISM Action to create rollups (#371)
- Loading branch information
Showing
23 changed files
with
1,500 additions
and
19 deletions.
There are no files selected for viewing
57 changes: 57 additions & 0 deletions
57
...on/opendistroforelasticsearch/indexmanagement/indexstatemanagement/action/RollupAction.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.action | ||
|
||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.ActionConfig | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.RollupActionConfig | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.rollup.AttemptCreateRollupJobStep | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.rollup.WaitForRollupCompletionStep | ||
import org.elasticsearch.client.Client | ||
import org.elasticsearch.cluster.service.ClusterService | ||
|
||
class RollupAction( | ||
clusterService: ClusterService, | ||
client: Client, | ||
managedIndexMetaData: ManagedIndexMetaData, | ||
config: RollupActionConfig | ||
) : Action(ActionConfig.ActionType.ROLLUP, config, managedIndexMetaData) { | ||
|
||
private val attemptCreateRollupJobStep = AttemptCreateRollupJobStep(clusterService, client, config.ismRollup, managedIndexMetaData) | ||
private val waitForRollupCompletionStep = WaitForRollupCompletionStep(clusterService, client, managedIndexMetaData) | ||
|
||
override fun getSteps(): List<Step> = listOf(attemptCreateRollupJobStep, waitForRollupCompletionStep) | ||
|
||
@Suppress("ReturnCount") | ||
override fun getStepToExecute(): Step { | ||
// If stepMetaData is null, return the first step | ||
val stepMetaData = managedIndexMetaData.stepMetaData ?: return attemptCreateRollupJobStep | ||
|
||
// If the current step has completed, return the next step | ||
if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) { | ||
return when (stepMetaData.name) { | ||
AttemptCreateRollupJobStep.name -> waitForRollupCompletionStep | ||
else -> attemptCreateRollupJobStep | ||
} | ||
} | ||
|
||
return when (stepMetaData.name) { | ||
AttemptCreateRollupJobStep.name -> attemptCreateRollupJobStep | ||
else -> waitForRollupCompletionStep | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
91 changes: 91 additions & 0 deletions
91
...oforelasticsearch/indexmanagement/indexstatemanagement/model/action/RollupActionConfig.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action | ||
|
||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.action.Action | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.action.RollupAction | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.ISMRollup | ||
import org.elasticsearch.client.Client | ||
import org.elasticsearch.cluster.service.ClusterService | ||
import org.elasticsearch.common.io.stream.StreamInput | ||
import org.elasticsearch.common.io.stream.StreamOutput | ||
import org.elasticsearch.common.xcontent.ToXContent | ||
import org.elasticsearch.common.xcontent.ToXContentObject | ||
import org.elasticsearch.common.xcontent.XContentBuilder | ||
import org.elasticsearch.common.xcontent.XContentParser | ||
import org.elasticsearch.common.xcontent.XContentParserUtils | ||
import org.elasticsearch.script.ScriptService | ||
import java.io.IOException | ||
|
||
class RollupActionConfig( | ||
val ismRollup: ISMRollup, | ||
val index: Int | ||
) : ToXContentObject, ActionConfig(ActionType.ROLLUP, index) { | ||
|
||
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { | ||
builder.startObject() | ||
super.toXContent(builder, params) | ||
.startObject(ActionType.ROLLUP.type) | ||
.field(ISM_ROLLUP_FIELD, ismRollup) | ||
.endObject() | ||
.endObject() | ||
return builder | ||
} | ||
|
||
override fun toAction( | ||
clusterService: ClusterService, | ||
scriptService: ScriptService, | ||
client: Client, | ||
managedIndexMetaData: ManagedIndexMetaData | ||
): Action = RollupAction(clusterService, client, managedIndexMetaData, this) | ||
|
||
override fun isFragment(): Boolean = super<ToXContentObject>.isFragment() | ||
|
||
@Throws(IOException::class) | ||
constructor(sin: StreamInput) : this(ismRollup = ISMRollup(sin), index = sin.readInt()) | ||
|
||
@Throws(IOException::class) | ||
override fun writeTo(out: StreamOutput) { | ||
super.writeTo(out) | ||
ismRollup.writeTo(out) | ||
out.writeInt(actionIndex) | ||
} | ||
|
||
companion object { | ||
const val ISM_ROLLUP_FIELD = "ism_rollup" | ||
var ismRollup: ISMRollup? = null | ||
|
||
@JvmStatic | ||
@Throws(IOException::class) | ||
fun parse(xcp: XContentParser, actionIndex: Int): RollupActionConfig { | ||
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) | ||
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { | ||
val fieldName = xcp.currentName() | ||
xcp.nextToken() | ||
|
||
when (fieldName) { | ||
ISM_ROLLUP_FIELD -> ismRollup = ISMRollup.parse(xcp) | ||
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in RollupActionConfig.") | ||
} | ||
} | ||
|
||
return RollupActionConfig( | ||
ismRollup = requireNotNull(ismRollup) { "RollupActionConfig rollup is null" }, | ||
index = actionIndex) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
130 changes: 130 additions & 0 deletions
130
...sticsearch/indexmanagement/indexstatemanagement/step/rollup/AttemptCreateRollupJobStep.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
/* | ||
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.rollup | ||
|
||
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionProperties | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.index.IndexRollupAction | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.index.IndexRollupRequest | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.index.IndexRollupResponse | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.start.StartRollupAction | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.start.StartRollupRequest | ||
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.ISMRollup | ||
import org.apache.logging.log4j.LogManager | ||
import org.elasticsearch.ExceptionsHelper | ||
import org.elasticsearch.action.support.WriteRequest | ||
import org.elasticsearch.action.support.master.AcknowledgedResponse | ||
import org.elasticsearch.client.Client | ||
import org.elasticsearch.cluster.service.ClusterService | ||
import org.elasticsearch.index.engine.VersionConflictEngineException | ||
import org.elasticsearch.transport.RemoteTransportException | ||
import java.lang.Exception | ||
|
||
class AttemptCreateRollupJobStep( | ||
val clusterService: ClusterService, | ||
val client: Client, | ||
val ismRollup: ISMRollup, | ||
managedIndexMetaData: ManagedIndexMetaData | ||
) : Step(name, managedIndexMetaData) { | ||
|
||
private val logger = LogManager.getLogger(javaClass) | ||
private var stepStatus = StepStatus.STARTING | ||
private var info: Map<String, Any>? = null | ||
private var rollupId: String? = null | ||
private var previousRunRollupId: String? = null | ||
private var hasPreviousRollupAttemptFailed: Boolean? = null | ||
|
||
override fun isIdempotent() = true | ||
|
||
override suspend fun execute(): Step { | ||
previousRunRollupId = managedIndexMetaData.actionMetaData?.actionProperties?.rollupId | ||
hasPreviousRollupAttemptFailed = managedIndexMetaData.actionMetaData?.actionProperties?.hasRollupFailed | ||
|
||
// Creating a rollup job | ||
val rollup = ismRollup.toRollup(indexName) | ||
rollupId = rollup.id | ||
logger.info("Attempting to create a rollup job $rollupId for index $indexName") | ||
|
||
val indexRollupRequest = IndexRollupRequest(rollup, WriteRequest.RefreshPolicy.IMMEDIATE) | ||
|
||
try { | ||
val response: IndexRollupResponse = client.suspendUntil { execute(IndexRollupAction.INSTANCE, indexRollupRequest, it) } | ||
logger.info("Received status ${response.status.status} on trying to create rollup job $rollupId") | ||
|
||
stepStatus = StepStatus.COMPLETED | ||
info = mapOf("message" to getSuccessMessage(rollup.id, indexName)) | ||
} catch (e: VersionConflictEngineException) { | ||
val message = getFailedJobExistsMessage(rollup.id, indexName) | ||
logger.info(message) | ||
if (rollupId == previousRunRollupId && hasPreviousRollupAttemptFailed == true) { | ||
startRollupJob(rollup.id) | ||
} else { | ||
stepStatus = StepStatus.COMPLETED | ||
info = mapOf("info" to message) | ||
} | ||
} catch (e: RemoteTransportException) { | ||
processFailure(rollup.id, ExceptionsHelper.unwrapCause(e) as Exception) | ||
} catch (e: RemoteTransportException) { | ||
processFailure(rollup.id, e) | ||
} | ||
|
||
return this | ||
} | ||
|
||
override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { | ||
val currentActionMetaData = currentMetaData.actionMetaData | ||
return currentMetaData.copy( | ||
actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(rollupId = rollupId)), | ||
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus), | ||
transitionTo = null, | ||
info = info | ||
) | ||
} | ||
|
||
fun processFailure(rollupId: String, e: Exception) { | ||
val message = getFailedMessage(rollupId, indexName) | ||
logger.error(message, e) | ||
stepStatus = StepStatus.FAILED | ||
info = mapOf("message" to message, "cause" to "${e.message}") | ||
} | ||
|
||
private suspend fun startRollupJob(rollupId: String) { | ||
logger.info("Attempting to re-start the job $rollupId") | ||
try { | ||
val startRollupRequest = StartRollupRequest(rollupId) | ||
val response: AcknowledgedResponse = client.suspendUntil { execute(StartRollupAction.INSTANCE, startRollupRequest, it) } | ||
stepStatus = StepStatus.COMPLETED | ||
info = mapOf("message" to getSuccessRestartMessage(rollupId, indexName)) | ||
} catch (e: Exception) { | ||
val message = getFailedToStartMessage(rollupId, indexName) | ||
logger.error(message, e) | ||
stepStatus = StepStatus.FAILED | ||
info = mapOf("message" to message) | ||
} | ||
} | ||
|
||
companion object { | ||
const val name = "attempt_create_rollup" | ||
fun getFailedMessage(rollupId: String, index: String) = "Failed to create the rollup job [$rollupId] [index=$index]" | ||
fun getFailedJobExistsMessage(rollupId: String, index: String) = "Rollup job [$rollupId] already exists, skipping creation [index=$index]" | ||
fun getFailedToStartMessage(rollupId: String, index: String) = "Failed to start the rollup job [$rollupId] [index=$index]" | ||
fun getSuccessMessage(rollupId: String, index: String) = "Successfully created the rollup job [$rollupId] [index=$index]" | ||
fun getSuccessRestartMessage(rollupId: String, index: String) = "Successfully restarted the rollup job [$rollupId] [index=$index]" | ||
} | ||
} |
Oops, something went wrong.