Skip to content

Commit

Permalink
Fixes force merge failing on long executions, changes some action mes… (
Browse files Browse the repository at this point in the history
opendistro-for-elasticsearch#267)

* Fixes force merge failing on long executions, changes some action messaging, adds better try/catch on actions to deal with remote transport exceptions

* Adds unit tests for failures in steps

* Adds more tests

* Updates jacoco source so the source files show up in the report

* Addressing comments on naming and spacing
  • Loading branch information
dbbaughe committed Aug 4, 2020
1 parent d309ae6 commit 2aabd51
Show file tree
Hide file tree
Showing 37 changed files with 1,157 additions and 280 deletions.
9 changes: 0 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,6 @@ The project in this package uses the [Gradle](https://docs.gradle.org/current/us

However, to build the `index management` plugin project, we also use the Elastic build tools for Gradle. These tools are idiosyncratic and don't always follow the conventions and instructions for building regular Java code using Gradle. Not everything in `index management` will work the way it's described in the Gradle documentation. If you encounter such a situation, the Elastic build tools [source code](https://github.com/elastic/elasticsearch/tree/master/buildSrc/src/main/groovy/org/elasticsearch/gradle) is your best bet for figuring out what's going on.

This project currently uses the Notification subproject from the [Alerting plugin](https://github.com/opendistro-for-elasticsearch/alerting). There is an [open PR](https://github.com/opendistro-for-elasticsearch/alerting/pull/97) that introduces the maven publish task in Alerting for publishing the Notification jars. Until this PR is fully merged and jars published you will need to pull down the PR yourself and publish the jars to your local maven repository in order to build Index Management.

1. Visit the PR [here](https://github.com/opendistro-for-elasticsearch/alerting/pull/97) and pull down the Alerting plugin along with the PR changes
2. You may need to cherry-pick the changes into a separate branch if you require a specific version to be published
3. Build the Alerting plugin (w/ the changes in PR) and publish the artifacts to your local maven repository
1. `./gradlew clean`
2. `./gradlew build` or `./gradlew assemble` build will run the tests and build artifacts, assemble will only build the artifacts
3. `./gradlew publishToMavenLocal` publishes artifacts to your local maven repository

### Building from the command line

1. `./gradlew build` builds and tests project.
Expand Down
4 changes: 2 additions & 2 deletions build-tools/esplugin-coverage.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ integTestCluster {
jacocoTestReport {
dependsOn integTest, test
executionData dummyTest.jacoco.destinationFile, dummyIntegTest.jacoco.destinationFile
sourceDirectories = sourceSets.main.allSource
classDirectories = sourceSets.main.output
sourceDirectories.from = "src/main/kotlin"
classDirectories.from = sourceSets.main.output
reports {
html.enabled = true // human readable
xml.enabled = true // for coverlay
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ object ManagedIndexRunner : ScheduledJobRunner,

if (updateResult && state != null && action != null && step != null && currentActionMetaData != null) {
// Step null check is done in getStartingManagedIndexMetaData
step.execute()
step.preExecute(logger).execute().postExecute(logger)
var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step)

if (executedManagedIndexMetaData.isFailed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService
import kotlinx.coroutines.delay
import org.apache.logging.log4j.Logger
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.bulk.BackoffPolicy
import org.elasticsearch.action.support.DefaultShardOperationFailedException
import org.elasticsearch.client.ElasticsearchClient
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.common.bytes.BytesReference
Expand All @@ -36,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.transport.RemoteTransportException
import java.time.Instant
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
Expand Down Expand Up @@ -202,9 +205,7 @@ fun IndexMetaData.getRolloverAlias(): String? {
fun IndexMetaData.getClusterStateManagedIndexConfig(): ClusterStateManagedIndexConfig? {
val index = this.index.name
val uuid = this.index.uuid
val policyID = this.getPolicyID()

if (policyID == null) return null
val policyID = this.getPolicyID() ?: return null

return ClusterStateManagedIndexConfig(index = index, uuid = uuid, policyID = policyID)
}
Expand All @@ -217,3 +218,13 @@ fun IndexMetaData.getManagedIndexMetaData(): ManagedIndexMetaData? {
}
return null
}

fun Throwable.findRemoteTransportException(): RemoteTransportException? {
if (this is RemoteTransportException) return this
return this.cause?.findRemoteTransportException()
}

fun DefaultShardOperationFailedException.getUsefulCauseString(): String {
val rte = this.cause?.findRemoteTransportException()
return if (rte == null) this.toString() else ExceptionsHelper.unwrapCause(rte).toString()
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ data class SweptManagedIndexConfig(
) {

companion object {
@Suppress("ComplexMethod")
@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser, seqNo: Long, primaryTerm: Long): SweptManagedIndexConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step

import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import org.apache.logging.log4j.Logger
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.io.stream.Writeable
Expand All @@ -25,7 +26,17 @@ import java.util.Locale

abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMetaData, val isSafeToDisableOn: Boolean = true) {

abstract suspend fun execute()
fun preExecute(logger: Logger): Step {
logger.info("Executing $name for ${managedIndexMetaData.index}")
return this
}

abstract suspend fun execute(): Step

fun postExecute(logger: Logger): Step {
logger.info("Finished executing $name for ${managedIndexMetaData.index}")
return this
}

abstract fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData

Expand All @@ -44,9 +55,7 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta
*/
abstract fun isIdempotent(): Boolean

fun getStartingStepMetaData(): StepMetaData {
return StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING)
}
fun getStartingStepMetaData(): StepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING)

fun getStepStartTime(): Instant {
if (managedIndexMetaData.stepMetaData == null || managedIndexMetaData.stepMetaData.name != this.name) {
Expand All @@ -55,6 +64,8 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta
return Instant.ofEpochMilli(managedIndexMetaData.stepMetaData.startTime)
}

protected val indexName: String = managedIndexMetaData.index

enum class StepStatus(val status: String) : Writeable {
STARTING("starting"),
CONDITION_NOT_MET("condition_not_met"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.C
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import org.elasticsearch.transport.RemoteTransportException

class AttemptCloseStep(
val clusterService: ClusterService,
Expand All @@ -41,34 +43,52 @@ class AttemptCloseStep(
override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
override suspend fun execute(): AttemptCloseStep {
try {
logger.info("Executing close on $index")
val closeIndexRequest = CloseIndexRequest()
.indices(index)
.indices(indexName)

val response: CloseIndexResponse = client.admin().indices().suspendUntil { close(closeIndexRequest, it) }
logger.info("Close index for $index was acknowledged=${response.isAcknowledged}")
if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Successfully closed index")
info = mapOf("message" to getSuccessMessage(indexName))
} else {
val message = getFailedMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to close index")
info = mapOf("message" to message)
}
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotInProgressException) {
handleSnapshotException(cause)
} else {
handleException(cause as Exception)
}
} catch (e: SnapshotInProgressException) {
logger.warn("Failed to close index [index=$index] with snapshot in progress")
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Index had snapshot in progress, retrying closing")
handleSnapshotException(e)
} catch (e: Exception) {
logger.error("Failed to set index to close [index=$index]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to set index to close")
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
handleException(e)
}

return this
}

private fun handleSnapshotException(e: SnapshotInProgressException) {
val message = getSnapshotMessage(indexName)
logger.warn(message, e)
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to message)
}

private fun handleException(e: Exception) {
val message = getFailedMessage(indexName)
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
Expand All @@ -78,4 +98,10 @@ class AttemptCloseStep(
info = info
)
}

companion object {
fun getFailedMessage(index: String) = "Failed to close index [index=$index]"
fun getSuccessMessage(index: String) = "Successfully closed index [index=$index]"
fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying closing [index=$index]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.D
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import org.elasticsearch.transport.RemoteTransportException
import java.lang.Exception

class AttemptDeleteStep(
Expand All @@ -42,30 +44,51 @@ class AttemptDeleteStep(
override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
override suspend fun execute(): AttemptDeleteStep {
try {
val response: AcknowledgedResponse = client.admin().indices()
.suspendUntil { delete(DeleteIndexRequest(managedIndexMetaData.index), it) }
.suspendUntil { delete(DeleteIndexRequest(indexName), it) }

if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Deleted index")
info = mapOf("message" to getSuccessMessage(indexName))
} else {
val message = getFailedMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to delete index")
info = mapOf("message" to message)
}
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotInProgressException) {
handleSnapshotException(cause)
} else {
handleException(cause as Exception)
}
} catch (e: SnapshotInProgressException) {
logger.warn("Failed to delete index [index=${managedIndexMetaData.index}] with snapshot in progress")
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Index had snapshot in progress, retrying deletion")
handleSnapshotException(e)
} catch (e: Exception) {
logger.error("Failed to delete index [index=${managedIndexMetaData.index}]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to delete index")
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
handleException(e)
}

return this
}

private fun handleSnapshotException(e: SnapshotInProgressException) {
val message = getSnapshotMessage(indexName)
logger.warn(message, e)
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to message)
}

private fun handleException(e: Exception) {
val message = getFailedMessage(indexName)
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
Expand All @@ -78,5 +101,8 @@ class AttemptDeleteStep(

companion object {
const val name = "attempt_delete"
fun getFailedMessage(index: String) = "Failed to delete index [index=$index]"
fun getSuccessMessage(index: String) = "Successfully deleted index [index=$index]"
fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying deletion [index=$index]"
}
}
Loading

0 comments on commit 2aabd51

Please sign in to comment.