Skip to content

Commit

Permalink
Fixes force merge failing on long executions, changes some action messaging, adds better try/catch on actions to deal with remote transport exceptions (
Browse files Browse the repository at this point in the history
opendistro-for-elasticsearch#267)

* Fixes force merge failing on long executions, changes some action messaging, adds better try/catch on actions to deal with remote transport exceptions

* Adds unit tests for failures in steps

* Adds more tests

* Updates jacoco source so the source files show up in the report

* Addressing comments on naming and spacing
  • Loading branch information
dbbaughe committed Aug 4, 2020
1 parent d5762b4 commit 7c373ad
Show file tree
Hide file tree
Showing 36 changed files with 1,161 additions and 275 deletions.
4 changes: 2 additions & 2 deletions build-tools/esplugin-coverage.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ integTestCluster {
jacocoTestReport {
dependsOn integTest, test
executionData dummyTest.jacoco.destinationFile, dummyIntegTest.jacoco.destinationFile
sourceDirectories = sourceSets.main.allSource
classDirectories = sourceSets.main.output
sourceDirectories.from = "src/main/kotlin"
classDirectories.from = sourceSets.main.output
reports {
html.enabled = true // human readable
xml.enabled = true // for coverlay
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ object ManagedIndexRunner : ScheduledJobRunner,

if (updateResult && state != null && action != null && step != null && currentActionMetaData != null) {
// Step null check is done in getStartingManagedIndexMetaData
step.execute()
step.preExecute(logger).execute().postExecute(logger)
var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step)

if (executedManagedIndexMetaData.isFailed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService
import kotlinx.coroutines.delay
import org.apache.logging.log4j.Logger
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.bulk.BackoffPolicy
import org.elasticsearch.action.support.DefaultShardOperationFailedException
import org.elasticsearch.client.ElasticsearchClient
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.common.bytes.BytesReference
Expand All @@ -36,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.transport.RemoteTransportException
import java.time.Instant
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
Expand Down Expand Up @@ -202,9 +205,7 @@ fun IndexMetaData.getRolloverAlias(): String? {
fun IndexMetaData.getClusterStateManagedIndexConfig(): ClusterStateManagedIndexConfig? {
val index = this.index.name
val uuid = this.index.uuid
val policyID = this.getPolicyID()

if (policyID == null) return null
val policyID = this.getPolicyID() ?: return null

return ClusterStateManagedIndexConfig(index = index, uuid = uuid, policyID = policyID)
}
Expand All @@ -217,3 +218,13 @@ fun IndexMetaData.getManagedIndexMetaData(): ManagedIndexMetaData? {
}
return null
}

/**
 * Walks this throwable and its chain of causes looking for a [RemoteTransportException].
 *
 * @return this throwable if it is itself a [RemoteTransportException], otherwise the first one found
 * further down the cause chain, or `null` when the chain ends without a match.
 */
fun Throwable.findRemoteTransportException(): RemoteTransportException? =
    (this as? RemoteTransportException) ?: cause?.findRemoteTransportException()

/**
 * Builds a string describing why this shard operation failed.
 *
 * @return the string form of the unwrapped [RemoteTransportException] when one is present in the
 * cause chain, otherwise this failure's own `toString()`.
 */
fun DefaultShardOperationFailedException.getUsefulCauseString(): String {
    // No remote transport exception anywhere in the cause chain: fall back to the failure itself.
    val remoteFailure = cause?.findRemoteTransportException() ?: return toString()
    return ExceptionsHelper.unwrapCause(remoteFailure).toString()
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ data class SweptManagedIndexConfig(
) {

companion object {
@Suppress("ComplexMethod")
@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser, seqNo: Long, primaryTerm: Long): SweptManagedIndexConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step

import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import org.apache.logging.log4j.Logger
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.io.stream.Writeable
Expand All @@ -25,7 +26,17 @@ import java.util.Locale

abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMetaData, val isSafeToDisableOn: Boolean = true) {

abstract suspend fun execute()
/**
 * Logs at info level that this step is about to run for the managed index.
 *
 * @param logger the logger that receives the message.
 * @return this step, so that further calls can be chained onto it.
 */
fun preExecute(logger: Logger): Step = also {
    logger.info("Executing $name for ${managedIndexMetaData.index}")
}

abstract suspend fun execute(): Step

/**
 * Logs at info level that this step has finished running for the managed index.
 *
 * @param logger the logger that receives the message.
 * @return this step, so that further calls can be chained onto it.
 */
fun postExecute(logger: Logger): Step = also {
    logger.info("Finished executing $name for ${managedIndexMetaData.index}")
}

abstract fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData

Expand All @@ -44,9 +55,7 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta
*/
abstract fun isIdempotent(): Boolean

fun getStartingStepMetaData(): StepMetaData {
return StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING)
}
fun getStartingStepMetaData(): StepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING)

fun getStepStartTime(): Instant {
if (managedIndexMetaData.stepMetaData == null || managedIndexMetaData.stepMetaData.name != this.name) {
Expand All @@ -55,6 +64,8 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta
return Instant.ofEpochMilli(managedIndexMetaData.stepMetaData.startTime)
}

protected val indexName: String = managedIndexMetaData.index

enum class StepStatus(val status: String) : Writeable {
STARTING("starting"),
CONDITION_NOT_MET("condition_not_met"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.C
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import org.elasticsearch.transport.RemoteTransportException

class AttemptCloseStep(
val clusterService: ClusterService,
Expand All @@ -41,34 +43,52 @@ class AttemptCloseStep(
override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
override suspend fun execute(): AttemptCloseStep {
try {
logger.info("Executing close on $index")
val closeIndexRequest = CloseIndexRequest()
.indices(index)
.indices(indexName)

val response: AcknowledgedResponse = client.admin().indices().suspendUntil { close(closeIndexRequest, it) }
logger.info("Close index for $index was acknowledged=${response.isAcknowledged}")
if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Successfully closed index")
info = mapOf("message" to getSuccessMessage(indexName))
} else {
val message = getFailedMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to close index")
info = mapOf("message" to message)
}
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotInProgressException) {
handleSnapshotException(cause)
} else {
handleException(cause as Exception)
}
} catch (e: SnapshotInProgressException) {
logger.warn("Failed to close index [index=$index] with snapshot in progress")
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Index had snapshot in progress, retrying closing")
handleSnapshotException(e)
} catch (e: Exception) {
logger.error("Failed to set index to close [index=$index]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to set index to close")
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
handleException(e)
}

return this
}

/**
 * Records the outcome of a close attempt that hit a [SnapshotInProgressException]: logs a warning,
 * sets the step status to [StepStatus.CONDITION_NOT_MET] and stores the snapshot message in [info].
 *
 * @param e the snapshot exception, attached to the warning log entry.
 */
private fun handleSnapshotException(e: SnapshotInProgressException) {
    val snapshotNotice = getSnapshotMessage(indexName).also { logger.warn(it, e) }
    stepStatus = StepStatus.CONDITION_NOT_MET
    info = mapOf("message" to snapshotNotice)
}

/**
 * Records a failed close attempt: logs the error, sets the step status to [StepStatus.FAILED] and
 * stores the failure message in [info], plus a "cause" entry when the exception carries a message.
 *
 * @param e the exception that made the step fail.
 */
private fun handleException(e: Exception) {
    val failureNotice = getFailedMessage(indexName).also { logger.error(it, e) }
    stepStatus = StepStatus.FAILED
    // The "cause" pair is dropped entirely when the exception has no message.
    info = listOfNotNull(
        "message" to failureNotice,
        e.message?.let { "cause" to it }
    ).toMap()
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
Expand All @@ -78,4 +98,10 @@ class AttemptCloseStep(
info = info
)
}

companion object {
    /** Message used when closing [index] failed. */
    fun getFailedMessage(index: String): String = "Failed to close index [index=$index]"

    /** Message used when [index] was closed successfully. */
    fun getSuccessMessage(index: String): String = "Successfully closed index [index=$index]"

    /** Message used when closing [index] hit a snapshot in progress. */
    fun getSnapshotMessage(index: String): String = "Index had snapshot in progress, retrying closing [index=$index]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.D
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import org.elasticsearch.transport.RemoteTransportException
import java.lang.Exception

class AttemptDeleteStep(
Expand All @@ -42,30 +44,51 @@ class AttemptDeleteStep(
override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
override suspend fun execute(): AttemptDeleteStep {
try {
val response: AcknowledgedResponse = client.admin().indices()
.suspendUntil { delete(DeleteIndexRequest(managedIndexMetaData.index), it) }
.suspendUntil { delete(DeleteIndexRequest(indexName), it) }

if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Deleted index")
info = mapOf("message" to getSuccessMessage(indexName))
} else {
val message = getFailedMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to delete index")
info = mapOf("message" to message)
}
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotInProgressException) {
handleSnapshotException(cause)
} else {
handleException(cause as Exception)
}
} catch (e: SnapshotInProgressException) {
logger.warn("Failed to delete index [index=${managedIndexMetaData.index}] with snapshot in progress")
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Index had snapshot in progress, retrying deletion")
handleSnapshotException(e)
} catch (e: Exception) {
logger.error("Failed to delete index [index=${managedIndexMetaData.index}]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to delete index")
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
handleException(e)
}

return this
}

/**
 * Records the outcome of a delete attempt that hit a [SnapshotInProgressException]: logs a warning,
 * sets the step status to [StepStatus.CONDITION_NOT_MET] and stores the snapshot message in [info].
 *
 * @param e the snapshot exception, attached to the warning log entry.
 */
private fun handleSnapshotException(e: SnapshotInProgressException) {
    val snapshotNotice = getSnapshotMessage(indexName).also { logger.warn(it, e) }
    stepStatus = StepStatus.CONDITION_NOT_MET
    info = mapOf("message" to snapshotNotice)
}

/**
 * Records a failed delete attempt: logs the error, sets the step status to [StepStatus.FAILED] and
 * stores the failure message in [info], plus a "cause" entry when the exception carries a message.
 *
 * @param e the exception that made the step fail.
 */
private fun handleException(e: Exception) {
    val failureNotice = getFailedMessage(indexName).also { logger.error(it, e) }
    stepStatus = StepStatus.FAILED
    // The "cause" pair is dropped entirely when the exception has no message.
    info = listOfNotNull(
        "message" to failureNotice,
        e.message?.let { "cause" to it }
    ).toMap()
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
Expand All @@ -78,5 +101,8 @@ class AttemptDeleteStep(

companion object {
    const val name = "attempt_delete"

    /** Message used when deleting [index] failed. */
    fun getFailedMessage(index: String): String = "Failed to delete index [index=$index]"

    /** Message used when [index] was deleted successfully. */
    fun getSuccessMessage(index: String): String = "Successfully deleted index [index=$index]"

    /** Message used when deleting [index] hit a snapshot in progress. */
    fun getSnapshotMessage(index: String): String = "Index had snapshot in progress, retrying deletion [index=$index]"
}
}
Loading

0 comments on commit 7c373ad

Please sign in to comment.