Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Backports replicas change and rollover RTE cause handling (#332)
Browse files Browse the repository at this point in the history
* Explicitly sets replicas and shards to 1 for ISM history indices (#318)

* Handles remote transport exceptions in rollover (#325)
  • Loading branch information
dbbaughe authored Nov 13, 2020
1 parent cb0f341 commit dd601c8
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class IndexStateManagementHistory(
val request = RolloverRequest(IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS, null)
request.createIndexRequest.index(IndexStateManagementIndices.HISTORY_INDEX_PATTERN)
.mapping(_DOC, IndexStateManagementIndices.indexStateManagementHistoryMappings, XContentType.JSON)
.settings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1))
request.addMaxIndexDocsCondition(historyMaxDocs)
request.addMaxIndexAgeCondition(historyMaxAge)
val response = client.admin().indices().rolloversIndex(request).actionGet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.client.IndicesAdminClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.XContentType

@OpenForTesting
Expand Down Expand Up @@ -104,6 +105,7 @@ class IndexStateManagementIndices(
if (existsResponse.isExists) return true

val request = CreateIndexRequest(index).mapping(_DOC, indexStateManagementHistoryMappings, XContentType.JSON)
.settings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build())
if (alias != null) request.alias(Alias(alias))
return try {
val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.create(request, it) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedi
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.evaluateConditions
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest
Expand All @@ -33,6 +34,7 @@ import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.unit.ByteSizeValue
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.transport.RemoteTransportException
import java.time.Instant

@Suppress("ReturnCount", "TooGenericExceptionCaught")
Expand Down Expand Up @@ -138,6 +140,8 @@ class AttemptRolloverStep(
if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified
).toMap()
}
} catch (e: RemoteTransportException) {
handleException(ExceptionsHelper.unwrapCause(e) as Exception)
} catch (e: Exception) {
handleException(e)
}
Expand Down Expand Up @@ -173,21 +177,16 @@ class AttemptRolloverStep(
"message" to message,
"shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() }
)
} catch (e: RemoteTransportException) {
handleException(ExceptionsHelper.unwrapCause(e) as Exception)
} catch (e: Exception) {
val message = getFailedEvaluateMessage(indexName)
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
handleException(e, getFailedEvaluateMessage(indexName))
}

return null
}

private fun handleException(e: Exception) {
val message = getFailedMessage(indexName)
private fun handleException(e: Exception, message: String = getFailedMessage(indexName)) {
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
Expand Down

0 comments on commit dd601c8

Please sign in to comment.