From 2cfa5ad7661b404c79b62358c5a08d96ffdd1a29 Mon Sep 17 00:00:00 2001 From: Drew Baugher <46505179+dbbaughe@users.noreply.github.com> Date: Fri, 13 Nov 2020 10:46:09 -0800 Subject: [PATCH] Backports replicas change and rollover RTE cause handling (#330) * Explicitly sets replicas and shards to 1 for ISM history indices (#318) * Handles remote transport exceptions in rollover (#325) --- .../IndexStateManagementHistory.kt | 1 + .../IndexStateManagementIndices.kt | 2 ++ .../step/rollover/AttemptRolloverStep.kt | 17 ++++++++--------- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementHistory.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementHistory.kt index 9e1be6aaa..ca137f9e7 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementHistory.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementHistory.kt @@ -119,6 +119,7 @@ class IndexStateManagementHistory( val request = RolloverRequest(IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS, null) request.createIndexRequest.index(IndexStateManagementIndices.HISTORY_INDEX_PATTERN) .mapping(_DOC, IndexStateManagementIndices.indexStateManagementHistoryMappings, XContentType.JSON) + .settings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1)) request.addMaxIndexDocsCondition(historyMaxDocs) request.addMaxIndexAgeCondition(historyMaxAge) val response = client.admin().indices().rolloversIndex(request).actionGet() diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementIndices.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementIndices.kt index cbf7a6209..5defa507d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementIndices.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementIndices.kt @@ -32,6 +32,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client import org.elasticsearch.client.IndicesAdminClient import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.xcontent.XContentType @OpenForTesting @@ -104,6 +105,7 @@ class IndexStateManagementIndices( if (existsResponse.isExists) return true val request = CreateIndexRequest(index).mapping(_DOC, indexStateManagementHistoryMappings, XContentType.JSON) + .settings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()) if (alias != null) request.alias(Alias(alias)) return try { val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.create(request, it) } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index 13b5ed023..6f0a57f0c 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -24,6 +24,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedi import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.evaluateConditions import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.rollover.RolloverRequest import org.elasticsearch.action.admin.indices.rollover.RolloverResponse import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest @@ -33,6 +34,7 @@ import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.unit.ByteSizeValue import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.rest.RestStatus +import org.elasticsearch.transport.RemoteTransportException import java.time.Instant @Suppress("ReturnCount", "TooGenericExceptionCaught") @@ -138,6 +140,8 @@ class AttemptRolloverStep( if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified ).toMap() } + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { handleException(e) } @@ -173,21 +177,16 @@ class AttemptRolloverStep( "message" to message, "shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() } ) + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - val message = getFailedEvaluateMessage(indexName) - logger.error(message, e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to message) - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e, getFailedEvaluateMessage(indexName)) } return null } - private fun handleException(e: Exception) { - val message = getFailedMessage(indexName) + private fun handleException(e: Exception, message: String = getFailedMessage(indexName)) { logger.error(message, e) stepStatus = StepStatus.FAILED val mutableInfo = mutableMapOf("message" to message)