Skip to content

Commit

Permalink
Adds rollover conditions into info object (opendistro-for-elasticsear…
Browse files Browse the repository at this point in the history
  • Loading branch information
dbbaughe committed Aug 4, 2020
1 parent dd76cd6 commit 42130dc
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.unit.ByteSizeValue
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.rest.RestStatus
import java.time.Instant

Expand Down Expand Up @@ -67,24 +68,49 @@ class AttemptRolloverStep(
statsResponse ?: return

val indexCreationDate = clusterService.state().metaData().index(index).creationDate
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
if (indexCreationDate == -1L) {
val indexAgeTimeValue = if (indexCreationDate == -1L) {
logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison")
// since we cannot use for comparison, we can set it to 0 as minAge will never be <= 0
TimeValue.timeValueMillis(0)
} else {
TimeValue.timeValueMillis(Instant.now().toEpochMilli() - indexCreationDate)
}
val numDocs = statsResponse.primaries.docs?.count ?: 0
val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0)

if (config.evaluateConditions(indexCreationDateInstant, numDocs, indexSize)) {
val conditions = listOfNotNull(
config.minAge?.let {
RolloverActionConfig.MIN_INDEX_AGE_FIELD to mapOf(
"condition" to it.toString(),
"current" to indexAgeTimeValue.toString(),
"creationDate" to indexCreationDate
)
},
config.minDocs?.let {
RolloverActionConfig.MIN_DOC_COUNT_FIELD to mapOf(
"condition" to it,
"current" to numDocs
)
},
config.minSize?.let {
RolloverActionConfig.MIN_SIZE_FIELD to mapOf(
"condition" to it.toString(),
"current" to indexSize.toString()
)
}
).toMap()

if (config.evaluateConditions(indexAgeTimeValue, numDocs, indexSize)) {
logger.info("$index rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," +
" numDocs=$numDocs, indexSize=${indexSize.bytes}]")
executeRollover(alias)
executeRollover(alias, conditions)
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Attempting to rollover")
info = mapOf("message" to "Attempting to rollover", "conditions" to conditions)
}
}

private suspend fun executeRollover(alias: String) {
@Suppress("ComplexMethod")
private suspend fun executeRollover(alias: String, conditions: Map<String, Map<String, Any?>>) {
try {
val request = RolloverRequest(alias, null)
val response: RolloverResponse = client.admin().indices().suspendUntil { rolloverIndex(request, it) }
Expand All @@ -95,12 +121,18 @@ class AttemptRolloverStep(
// If response isAcknowledged it means the index was created and alias was added to new index
if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Rolled over index")
info = listOfNotNull(
"message" to "Rolled over index",
if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified
).toMap()
} else {
// If the alias update response is NOT acknowledged we will get back isAcknowledged=false
// This means the new index was created but we failed to swap the alias
stepStatus = StepStatus.FAILED
info = mapOf("message" to "New index created (${response.newIndex}), but failed to update alias")
info = listOfNotNull(
"message" to "New index created (${response.newIndex}), but failed to update alias",
if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified
).toMap()
}
} catch (e: Exception) {
logger.error("Failed to rollover index [index=${managedIndexMetaData.index}]", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.unit.ByteSizeValue
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.XContentFactory
import org.elasticsearch.index.query.BoolQueryBuilder
Expand Down Expand Up @@ -216,7 +217,7 @@ fun Transition.hasStatsConditions(): Boolean = this.conditions?.docCount != null

@Suppress("ReturnCount")
fun RolloverActionConfig.evaluateConditions(
indexCreationDate: Instant,
indexAgeTimeValue: TimeValue,
numDocs: Long,
indexSize: ByteSizeValue
): Boolean {
Expand All @@ -232,11 +233,7 @@ fun RolloverActionConfig.evaluateConditions(
}

if (this.minAge != null) {
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
if (indexCreationDateMilli != -1L) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
if (this.minAge.millis <= elapsedTime) return true
}
if (this.minAge.millis <= indexAgeTimeValue.millis) return true
}

if (this.minSize != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedI
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.ActionMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor
import org.hamcrest.collection.IsMapContaining
import java.time.Instant
import java.util.Locale

Expand Down Expand Up @@ -54,13 +55,11 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {

// the second execution we move into rollover action, we won't hit the timeout as this is the execution that sets the startTime
updateManagedIndexConfigStartTime(managedIndexConfig)

val expectedInfoString = mapOf("message" to "Attempting to rollover").toString()
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString())),
getExplainMap(indexName),
strict = false
assertThat(
"Should be attempting to rollover",
getExplainManagedIndexMetaData(indexName).info,
IsMapContaining.hasEntry("message", "Attempting to rollover" as Any?)
)
}

Expand Down Expand Up @@ -122,13 +121,11 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
// the third execution we move into rollover action, we should not hit the timeout yet because its the first execution of rollover
// but there was a bug before where it would use the startTime from the previous actions metadata and immediately fail
updateManagedIndexConfigStartTime(managedIndexConfig)

val expectedRolloverInfoString = mapOf("message" to "Attempting to rollover").toString()
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedRolloverInfoString == info.toString())),
getExplainMap(indexName),
strict = false
assertThat(
"Should be attempting to rollover",
getExplainManagedIndexMetaData(indexName).info,
IsMapContaining.hasEntry("message", "Attempting to rollover" as Any?)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@
package com.amazon.opendistroforelasticsearch.indexstatemanagement.action

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase
import com.amazon.opendistroforelasticsearch.indexstatemanagement.makeRequest
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification
import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor
import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.elasticsearch.common.unit.ByteSizeUnit
import org.elasticsearch.common.unit.ByteSizeValue
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.common.unit.TimeValue
import org.hamcrest.core.Is.isA
import org.junit.Assert
import java.time.Instant
import java.time.temporal.ChronoUnit
Expand All @@ -36,6 +34,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {

private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT)

@Suppress("UNCHECKED_CAST")
fun `test rollover no condition`() {
val aliasName = "${testIndexName}_alias"
val indexNameBase = "${testIndexName}_index"
Expand Down Expand Up @@ -65,10 +64,15 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) }
waitFor {
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
assertEquals("Index did not rollover.", "Rolled over index", info["message"])
assertNull("Should not have conditions if none specified", info["conditions"])
}
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
}

@Suppress("UNCHECKED_CAST")
fun `test rollover multi condition byte size`() {
val aliasName = "${testIndexName}_byte_alias"
val indexNameBase = "${testIndexName}_index_byte"
Expand Down Expand Up @@ -98,36 +102,47 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index rollover before it met the condition.", mapOf("message" to "Attempting to rollover"), getExplainManagedIndexMetaData(firstIndex).info) }

client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/1111",
StringEntity("{ \"testkey\": \"some valueaaaaaaa\" }", ContentType.APPLICATION_JSON)
)
client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/2222",
StringEntity("{ \"testkey1\": \"some value1\" }", ContentType.APPLICATION_JSON)
)
client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/3333",
StringEntity("{ \"testkey2\": \"some value2\" }", ContentType.APPLICATION_JSON)
)
waitFor {
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
assertEquals("Index rollover before it met the condition.", "Attempting to rollover", info["message"])
val conditions = info["conditions"] as Map<String, Any?>
assertEquals("Did not have exclusively min size and min doc count conditions",
setOf(RolloverActionConfig.MIN_SIZE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys)
val minSize = conditions[RolloverActionConfig.MIN_SIZE_FIELD] as Map<String, Any?>
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
assertEquals("Did not have min size condition", "10b", minSize["condition"])
assertThat("Did not have min size current", minSize["current"], isA(String::class.java))
assertEquals("Did not have min doc count condition", 1000000, minDocCount["condition"])
assertEquals("Did not have min doc count current", 0, minDocCount["current"])
}

insertSampleData(index = firstIndex, docCount = 5, delay = 0)

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) }
waitFor {
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
assertEquals("Index did not rollover", "Rolled over index", info["message"])
val conditions = info["conditions"] as Map<String, Any?>
assertEquals("Did not have exclusively min size and min doc count conditions",
setOf(RolloverActionConfig.MIN_SIZE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys)
val minSize = conditions[RolloverActionConfig.MIN_SIZE_FIELD] as Map<String, Any?>
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
assertEquals("Did not have min size condition", "10b", minSize["condition"])
assertThat("Did not have min size current", minSize["current"], isA(String::class.java))
assertEquals("Did not have min doc count condition", 1000000, minDocCount["condition"])
assertEquals("Did not have min doc count current", 5, minDocCount["current"])
}
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
}

@Suppress("UNCHECKED_CAST")
fun `test rollover multi condition doc size`() {
val aliasName = "${testIndexName}_doc_alias"
val indexNameBase = "${testIndexName}_index_doc"
val firstIndex = "$indexNameBase-1"
val policyID = "${testIndexName}_testPolicyName_doc_1"
val actionConfig = RolloverActionConfig(ByteSizeValue(10, ByteSizeUnit.TB), 3, null, 0)
val actionConfig = RolloverActionConfig(null, 3, TimeValue.timeValueDays(2), 0)
val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
Expand All @@ -151,27 +166,37 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index rollover before it met the condition.", mapOf("message" to "Attempting to rollover"), getExplainManagedIndexMetaData(firstIndex).info) }

client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/1111",
StringEntity("{ \"testkey\": \"some value\" }", ContentType.APPLICATION_JSON)
)
client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/2222",
StringEntity("{ \"testkey1\": \"some value1\" }", ContentType.APPLICATION_JSON)
)
client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/3333",
StringEntity("{ \"testkey2\": \"some value2\" }", ContentType.APPLICATION_JSON)
)
waitFor {
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
assertEquals("Index rollover before it met the condition.", "Attempting to rollover", info["message"])
val conditions = info["conditions"] as Map<String, Any?>
assertEquals("Did not have exclusively min age and min doc count conditions",
setOf(RolloverActionConfig.MIN_INDEX_AGE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys)
val minAge = conditions[RolloverActionConfig.MIN_INDEX_AGE_FIELD] as Map<String, Any?>
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
assertEquals("Did not have min age condition", "2d", minAge["condition"])
assertThat("Did not have min age current", minAge["current"], isA(String::class.java))
assertEquals("Did not have min doc count condition", 3, minDocCount["condition"])
assertEquals("Did not have min doc count current", 0, minDocCount["current"])
}

insertSampleData(index = firstIndex, docCount = 5, delay = 0)

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) }
waitFor {
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
assertEquals("Index did not rollover", "Rolled over index", info["message"])
val conditions = info["conditions"] as Map<String, Any?>
assertEquals("Did not have exclusively min age and min doc count conditions",
setOf(RolloverActionConfig.MIN_INDEX_AGE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys)
val minAge = conditions[RolloverActionConfig.MIN_INDEX_AGE_FIELD] as Map<String, Any?>
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
assertEquals("Did not have min age condition", "2d", minAge["condition"])
assertThat("Did not have min age current", minAge["current"], isA(String::class.java))
assertEquals("Did not have min doc count condition", 3, minDocCount["condition"])
assertEquals("Did not have min doc count current", 5, minDocCount["current"])
}
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
}
}
Loading

0 comments on commit 42130dc

Please sign in to comment.