Fixes flaky continuous transforms and shrink tests (#340) (#341)
* fixes flaky tests

Signed-off-by: Clay Downs <[email protected]>

* Deletes data stream manually

Signed-off-by: Clay Downs <[email protected]>
(cherry picked from commit 74dac63)

Co-authored-by: Clay Downs <[email protected]>
opensearch-trigger-bot[bot] and downsrob authored Apr 21, 2022
1 parent 3af62f5 commit 3dc2894
Showing 9 changed files with 97 additions and 31 deletions.
@@ -305,7 +305,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
for (node in nodesList) {
// Gets the amount of memory in the node which will be free below the high watermark level after adding 2*indexSizeInBytes,
// as the source index is duplicated during the shrink
- val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, stepContext.settings, stepContext.clusterService.clusterSettings)
+ val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, stepContext.clusterService.clusterSettings)
if (remainingMem > 0L) {
nodesWithSpace.add(Tuple(remainingMem, node.node.name))
}
@@ -128,7 +128,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
cleanupAndFail(FAILURE_MESSAGE)
return false
}
- val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.settings, context.clusterService.clusterSettings)
+ val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.clusterService.clusterSettings)
if (remainingMem < 1L) {
logger.error("Shrink action failed as the previously selected node no longer has enough free space.")
cleanupAndFail(NOT_ENOUGH_SPACE_FAILURE_MESSAGE)
@@ -16,6 +16,9 @@ import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings
+ import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING
+ import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING
+ import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING
import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
@@ -137,8 +140,8 @@ fun getActionStartTime(managedIndexMetaData: ManagedIndexMetaData): Instant {
* parse either and get the byte value back.
*/
@Suppress("MagicNumber")
- fun getFreeBytesThresholdHigh(settings: Settings, clusterSettings: ClusterSettings?, totalNodeBytes: Long): Long {
-     val diskThresholdSettings = DiskThresholdSettings(settings, clusterSettings)
+ fun getFreeBytesThresholdHigh(clusterSettings: ClusterSettings, totalNodeBytes: Long): Long {
+     val diskThresholdSettings = DiskThresholdSettings(getDiskSettings(clusterSettings), clusterSettings)
// Depending on how a user provided input, this setting may be a percentage or byte value
val diskThresholdPercent = diskThresholdSettings.freeDiskThresholdHigh
val diskThresholdBytes = diskThresholdSettings.freeBytesThresholdHigh
@@ -149,16 +152,29 @@ fun getFreeBytesThresholdHigh(settings: Settings, clusterSettings: ClusterSettings?, totalNodeBytes: Long): Long {
} else diskThresholdBytes.bytes
}

+ fun getDiskSettings(clusterSettings: ClusterSettings): Settings {
+     return Settings.builder().put(
+         CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.key,
+         clusterSettings.get(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING)
+     ).put(
+         CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.key,
+         clusterSettings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING)
+     ).put(
+         CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key,
+         clusterSettings.get(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING)
+     ).build()
+ }
+
/*
* Returns the amount of memory in the node which will be free below the high watermark level after adding 2*indexSizeInBytes, or -1
* if adding 2*indexSizeInBytes goes over the high watermark threshold, or if nodeStats does not contain OsStats.
*/
- fun getNodeFreeMemoryAfterShrink(node: NodeStats, indexSizeInBytes: Long, settings: Settings, clusterSettings: ClusterSettings?): Long {
+ fun getNodeFreeMemoryAfterShrink(node: NodeStats, indexSizeInBytes: Long, clusterSettings: ClusterSettings): Long {
val osStats = node.os
if (osStats != null) {
val memLeftInNode = osStats.mem.free.bytes
val totalNodeMem = osStats.mem.total.bytes
- val freeBytesThresholdHigh = getFreeBytesThresholdHigh(settings, clusterSettings, totalNodeMem)
+ val freeBytesThresholdHigh = getFreeBytesThresholdHigh(clusterSettings, totalNodeMem)
// We require that a node has enough space to be below the high watermark disk level with an additional 2 * the index size free
val requiredBytes = (2 * indexSizeInBytes) + freeBytesThresholdHigh
if (memLeftInNode > requiredBytes) {
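For orientation, here is a minimal, self-contained sketch of the space check these helpers implement, using hypothetical numbers (a 100 GB node, a 90% high watermark, a 5 GB source index). The percentage-to-bytes conversion mirrors the expectation asserted in ActionTests further down; the real code reads its inputs from NodeStats and DiskThresholdSettings rather than hard-coding them:

// Standalone sketch with assumed values; not the production implementation.
fun main() {
    val totalNodeBytes = 100L * 1024 * 1024 * 1024 // assumed node capacity: 100 GB
    val memLeftInNode = 40L * 1024 * 1024 * 1024   // assumed free space: 40 GB
    val indexSizeInBytes = 5L * 1024 * 1024 * 1024 // assumed source index size: 5 GB
    // A "90%" high watermark means 10% of capacity must stay free: 10 GB here.
    val freeBytesThresholdHigh = ((1 - (90.0 / 100.0)) * totalNodeBytes).toLong()
    // The source index is duplicated during a shrink, so the node must fit
    // 2 * indexSizeInBytes while still staying below the high watermark.
    val requiredBytes = (2 * indexSizeInBytes) + freeBytesThresholdHigh
    // Positive remaining space marks the node as a shrink candidate; -1 rejects it.
    val remainingMem = if (memLeftInNode > requiredBytes) memLeftInNode - requiredBytes else -1L
    println(remainingMem) // 20 GB remaining in this example
}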
@@ -5,12 +5,19 @@

package org.opensearch.indexmanagement.indexstatemanagement.action

+ import org.apache.http.entity.ContentType
+ import org.apache.http.entity.StringEntity
import org.apache.logging.log4j.LogManager
+ import org.junit.Before
import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.cluster.metadata.IndexMetadata
+ import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING
+ import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING
+ import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.ByteSizeValue
import org.opensearch.index.query.QueryBuilders
+ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.State
@@ -19,14 +26,41 @@ import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.AttemptMoveShardsStep
import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.AttemptShrinkStep
import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.WaitForMoveShardsStep
import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.WaitForShrinkStep
+ import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.waitFor
+ import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
import java.time.Instant
import java.time.temporal.ChronoUnit

class ShrinkActionIT : IndexStateManagementRestTestCase() {
@Suppress("UnusedPrivateMember")
@Before
private fun disableJobIndexShardRelocation() {
initializeManagedIndex()
// Shrink ITs would sometimes fail on multi node setups because of the job scheduler index being moved between nodes,
// descheduling the job
updateIndexSetting(INDEX_MANAGEMENT_INDEX, "routing.allocation.enable", "none")
// When doing remote testing, the docker image seems to keep the disk free space very low, causing the shrink action
// to not be able to find a node to shrink onto. Lowering these watermarks avoids that.
val request = """
{
"persistent": {
"${CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.key}": "5b",
"${CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.key}": "10b",
"${CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key}": "15b"
}
}
""".trimIndent()
val res = client().makeRequest(
"PUT", "_cluster/settings", emptyMap(),
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}
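With byte-valued watermarks like these, getFreeBytesThresholdHigh returns the configured byte value directly instead of a percentage of the disk, so the shrink space check collapses to roughly twice the index size. A tiny sketch with an assumed index size:

// Assumed 5 MB test index; with the high watermark at "10b" the threshold is a flat 10 bytes.
val indexSizeInBytes = 5L * 1024 * 1024
val requiredBytes = (2 * indexSizeInBytes) + 10L // ~2x the index size is enough to qualify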

private val testIndexName = javaClass.simpleName.lowercase()
private val testIndexSuffix = "_shrink_test"
fun `test basic workflow number of shards`() {
@@ -126,8 +160,8 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() {
@Suppress("UNCHECKED_CAST")
fun `test basic workflow max shard size`() {
val logger = LogManager.getLogger(::ShrinkActionIT)
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val indexName = "${testIndexName}_index_2"
val policyID = "${testIndexName}_testPolicyName_2"
val testMaxShardSize: ByteSizeValue = ByteSizeValue.parseBytesSizeValue("1GB", "test")
val shrinkAction = ShrinkAction(
numNewShards = null,
@@ -217,8 +251,8 @@

@Suppress("UNCHECKED_CAST")
fun `test basic workflow percentage to decrease to`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val indexName = "${testIndexName}_index_3"
val policyID = "${testIndexName}_testPolicyName_3"
val shrinkAction = ShrinkAction(
numNewShards = null,
maxShardSize = null,
@@ -309,8 +343,8 @@
val logger = LogManager.getLogger(::ShrinkActionIT)
val nodes = getNodes()
if (nodes.size > 1) {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val indexName = "${testIndexName}_index_4"
val policyID = "${testIndexName}_testPolicyName_4"
val shrinkAction = ShrinkAction(
numNewShards = null,
maxShardSize = null,
@@ -410,8 +444,8 @@

fun `test no-op with single source index primary shard`() {
val logger = LogManager.getLogger(::ShrinkActionIT)
val indexName = "${testIndexName}_index_1_shard_noop"
val policyID = "${testIndexName}_testPolicyName_1_shard_noop"
val indexName = "${testIndexName}_index_shard_noop"
val policyID = "${testIndexName}_testPolicyName_shard_noop"

// Create a Policy with one State that only performs a shrink Action
val shrinkAction = ShrinkAction(
@@ -561,8 +595,7 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() {
}
}

- // TODO This test is excessively flaky, disabling for now but it needs to be fixed
- private fun `test retries from first step`() {
+ fun `test retries from first step`() {
val testPolicy = """
{"policy":{"description":"Default policy","default_state":"Shrink","states":[
{"name":"Shrink","actions":[{"retry":{"count":2,"backoff":"constant","delay":"1s"},"shrink":
@@ -599,7 +632,7 @@
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}
- val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName
+ var nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName
// starts WaitForMoveShardsStep
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor(Instant.ofEpochSecond(60)) {
@@ -645,6 +678,7 @@
assertEquals(targetIndexName, getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName)
assertNotNull("Couldn't find node to shrink onto.", getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName)
val settings = getFlatSettings(indexName)
+ nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName
assertTrue("Did not set allocation setting", settings.containsKey("index.routing.allocation.require._name"))
assertTrue(settings.containsKey("index.routing.allocation.require._name"))
assertEquals(nodeToShrink, settings["index.routing.allocation.require._name"])
@@ -189,7 +189,7 @@ class ActionTests : OpenSearchTestCase() {
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key, percentage).build()
val clusterSettings = ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.map { it }.toSet())
val totalNodeBytes = randomByteSizeValue().bytes
- val thresholdBytes = getFreeBytesThresholdHigh(settings, clusterSettings, totalNodeBytes)
+ val thresholdBytes = getFreeBytesThresholdHigh(clusterSettings, totalNodeBytes)
val expectedThreshold: Long = ((1 - (rawPercentage.toDouble() / 100.0)) * totalNodeBytes).toLong()
assertEquals("Free bytes threshold not being calculated correctly for percentage setting.", thresholdBytes, expectedThreshold)
}
@@ -200,7 +200,7 @@
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.key, byteValue)
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key, byteValue).build()
val clusterSettings = ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.map { it }.toSet())
- val thresholdBytes = getFreeBytesThresholdHigh(settings, clusterSettings, randomByteSizeValue().bytes)
+ val thresholdBytes = getFreeBytesThresholdHigh(clusterSettings, randomByteSizeValue().bytes)
assertEquals("Free bytes threshold not being calculated correctly for byte setting.", thresholdBytes, byteValue.bytes)
}

@@ -10,6 +10,7 @@ import org.apache.http.HttpHeaders
import org.apache.http.entity.ContentType.APPLICATION_JSON
import org.apache.http.entity.StringEntity
import org.apache.http.message.BasicHeader
+ import org.junit.AfterClass
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
import org.opensearch.common.settings.Settings
@@ -40,14 +41,18 @@ import java.time.Instant

abstract class RollupRestTestCase : IndexManagementRestTestCase() {

- override fun preserveIndicesUponCompletion(): Boolean = true
+ companion object {
+     @AfterClass @JvmStatic fun clearIndicesAfterClassCompletion() {
+         wipeAllIndices()
+     }
+ }

- override fun preserveDataStreamsUponCompletion(): Boolean = true
+ override fun preserveIndicesUponCompletion(): Boolean = true
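A note on the cleanup change: instead of asking the framework to preserve data streams after every test, the suite now wipes all indices once per class, and RollupRunnerIT below deletes its test data stream explicitly; presumably leftover data streams leaking between runs were part of what made these tests flaky.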

protected fun createRollup(
rollup: Rollup,
rollupId: String = OpenSearchTestCase.randomAlphaOfLength(10),
- refresh: Boolean = true
+ refresh: Boolean = true,
): Rollup {
val response = createRollupJson(rollup.toJsonString(), rollupId, refresh)

@@ -66,7 +71,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
protected fun createRollupJson(
rollupString: String,
rollupId: String,
- refresh: Boolean = true
+ refresh: Boolean = true,
): Response {
val response = client()
.makeRequest(
@@ -128,7 +133,7 @@

protected fun getRollup(
rollupId: String,
- header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json")
+ header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"),
): Rollup {
val response = client().makeRequest("GET", "$ROLLUP_JOBS_BASE_URI/$rollupId", null, header)
assertEquals("Unable to get rollup $rollupId", RestStatus.OK, response.restStatus())
@@ -156,7 +161,7 @@

protected fun getRollupMetadata(
metadataId: String,
- header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json")
+ header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"),
): RollupMetadata {
val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId", null, header)
assertEquals("Unable to get rollup metadata $metadataId", RestStatus.OK, response.restStatus())
@@ -166,7 +171,7 @@
protected fun getRollupMetadataWithRoutingId(
routingId: String,
metadataId: String,
- header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json")
+ header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"),
): RollupMetadata {
val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId?routing=$routingId", null, header)
assertEquals("Unable to get rollup metadata $metadataId", RestStatus.OK, response.restStatus())
@@ -113,6 +113,8 @@ class RollupRunnerIT : RollupRestTestCase() {
// Non-continuous jobs will finish in a single execution
assertEquals("Unexpected metadata state", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}
+ // Delete the data stream
+ client().makeRequest("DELETE", "/_data_stream/$dataStreamName")
}

fun `test metadata set to failed when rollup job has a metadata id but metadata doc doesn't exist`() {