Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.0] Fixes flaky continuous transforms and shrink tests #341

Merged
merged 1 commit into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) {
for (node in nodesList) {
// Gets the amount of memory in the node which will be free below the high watermark level after adding 2*indexSizeInBytes,
// as the source index is duplicated during the shrink
val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, stepContext.settings, stepContext.clusterService.clusterSettings)
val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, stepContext.clusterService.clusterSettings)
if (remainingMem > 0L) {
nodesWithSpace.add(Tuple(remainingMem, node.node.name))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
cleanupAndFail(FAILURE_MESSAGE)
return false
}
val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.settings, context.clusterService.clusterSettings)
val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.clusterService.clusterSettings)
if (remainingMem < 1L) {
logger.error("Shrink action failed as the previously selected node no longer has enough free space.")
cleanupAndFail(NOT_ENOUGH_SPACE_FAILURE_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING
import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
Expand Down Expand Up @@ -137,8 +140,8 @@ fun getActionStartTime(managedIndexMetaData: ManagedIndexMetaData): Instant {
* parse either and get the byte value back.
*/
@Suppress("MagicNumber")
fun getFreeBytesThresholdHigh(settings: Settings, clusterSettings: ClusterSettings?, totalNodeBytes: Long): Long {
val diskThresholdSettings = DiskThresholdSettings(settings, clusterSettings)
fun getFreeBytesThresholdHigh(clusterSettings: ClusterSettings, totalNodeBytes: Long): Long {
val diskThresholdSettings = DiskThresholdSettings(getDiskSettings(clusterSettings), clusterSettings)
// Depending on how a user provided input, this setting may be a percentage or byte value
val diskThresholdPercent = diskThresholdSettings.freeDiskThresholdHigh
val diskThresholdBytes = diskThresholdSettings.freeBytesThresholdHigh
Expand All @@ -149,16 +152,29 @@ fun getFreeBytesThresholdHigh(settings: Settings, clusterSettings: ClusterSettin
} else diskThresholdBytes.bytes
}

fun getDiskSettings(clusterSettings: ClusterSettings): Settings {
return Settings.builder().put(
CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.key,
clusterSettings.get(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING)
).put(
CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.key,
clusterSettings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING)
).put(
CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key,
clusterSettings.get(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING)
).build()
}

/*
* Returns the amount of memory in the node which will be free below the high watermark level after adding 2*indexSizeInBytes, or -1
* if adding 2*indexSizeInBytes goes over the high watermark threshold, or if nodeStats does not contain OsStats.
*/
fun getNodeFreeMemoryAfterShrink(node: NodeStats, indexSizeInBytes: Long, settings: Settings, clusterSettings: ClusterSettings?): Long {
fun getNodeFreeMemoryAfterShrink(node: NodeStats, indexSizeInBytes: Long, clusterSettings: ClusterSettings): Long {
val osStats = node.os
if (osStats != null) {
val memLeftInNode = osStats.mem.free.bytes
val totalNodeMem = osStats.mem.total.bytes
val freeBytesThresholdHigh = getFreeBytesThresholdHigh(settings, clusterSettings, totalNodeMem)
val freeBytesThresholdHigh = getFreeBytesThresholdHigh(clusterSettings, totalNodeMem)
// We require that a node has enough space to be below the high watermark disk level with an additional 2 * the index size free
val requiredBytes = (2 * indexSizeInBytes) + freeBytesThresholdHigh
if (memLeftInNode > requiredBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.apache.logging.log4j.LogManager
import org.junit.Before
import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.ByteSizeValue
import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.State
Expand All @@ -19,14 +26,41 @@ import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.AttemptMo
import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.AttemptShrinkStep
import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.WaitForMoveShardsStep
import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.WaitForShrinkStep
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.waitFor
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
import java.time.Instant
import java.time.temporal.ChronoUnit

class ShrinkActionIT : IndexStateManagementRestTestCase() {
@Suppress("UnusedPrivateMember")
@Before
private fun disableJobIndexShardRelocation() {
initializeManagedIndex()
// Shrink ITs would sometimes fail on multi node setups because of the job scheduler index being moved between nodes,
// descheduling the job
updateIndexSetting(INDEX_MANAGEMENT_INDEX, "routing.allocation.enable", "none")
// When doing remote testing, the docker image seems to keep the disk free space very low, causing the shrink action
// to not be able to find a node to shrink onto. Lowering these watermarks avoids that.
val request = """
{
"persistent": {
"${CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.key}": "5b",
"${CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.key}": "10b",
"${CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key}": "15b"
}
}
""".trimIndent()
val res = client().makeRequest(
"PUT", "_cluster/settings", emptyMap(),
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

private val testIndexName = javaClass.simpleName.lowercase()
private val testIndexSuffix = "_shrink_test"
fun `test basic workflow number of shards`() {
Expand Down Expand Up @@ -126,8 +160,8 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() {
@Suppress("UNCHECKED_CAST")
fun `test basic workflow max shard size`() {
val logger = LogManager.getLogger(::ShrinkActionIT)
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val indexName = "${testIndexName}_index_2"
val policyID = "${testIndexName}_testPolicyName_2"
val testMaxShardSize: ByteSizeValue = ByteSizeValue.parseBytesSizeValue("1GB", "test")
val shrinkAction = ShrinkAction(
numNewShards = null,
Expand Down Expand Up @@ -217,8 +251,8 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() {

@Suppress("UNCHECKED_CAST")
fun `test basic workflow percentage to decrease to`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val indexName = "${testIndexName}_index_3"
val policyID = "${testIndexName}_testPolicyName_3"
val shrinkAction = ShrinkAction(
numNewShards = null,
maxShardSize = null,
Expand Down Expand Up @@ -309,8 +343,8 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() {
val logger = LogManager.getLogger(::ShrinkActionIT)
val nodes = getNodes()
if (nodes.size > 1) {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val indexName = "${testIndexName}_index_4"
val policyID = "${testIndexName}_testPolicyName_4"
val shrinkAction = ShrinkAction(
numNewShards = null,
maxShardSize = null,
Expand Down Expand Up @@ -410,8 +444,8 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() {

fun `test no-op with single source index primary shard`() {
val logger = LogManager.getLogger(::ShrinkActionIT)
val indexName = "${testIndexName}_index_1_shard_noop"
val policyID = "${testIndexName}_testPolicyName_1_shard_noop"
val indexName = "${testIndexName}_index_shard_noop"
val policyID = "${testIndexName}_testPolicyName_shard_noop"

// Create a Policy with one State that only preforms a force_merge Action
val shrinkAction = ShrinkAction(
Expand Down Expand Up @@ -561,8 +595,7 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() {
}
}

// TODO This test is excessively flaky, disabling for now but it needs to be fixed
private fun `test retries from first step`() {
fun `test retries from first step`() {
val testPolicy = """
{"policy":{"description":"Default policy","default_state":"Shrink","states":[
{"name":"Shrink","actions":[{"retry":{"count":2,"backoff":"constant","delay":"1s"},"shrink":
Expand Down Expand Up @@ -599,7 +632,7 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() {
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}
val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName
var nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName
// starts WaitForMoveShardsStep
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor(Instant.ofEpochSecond(60)) {
Expand Down Expand Up @@ -645,6 +678,7 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() {
assertEquals(targetIndexName, getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName)
assertNotNull("Couldn't find node to shrink onto.", getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName)
val settings = getFlatSettings(indexName)
nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName
assertTrue("Did not set allocation setting", settings.containsKey("index.routing.allocation.require._name"))
assertTrue(settings.containsKey("index.routing.allocation.require._name"))
assertEquals(nodeToShrink, settings["index.routing.allocation.require._name"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class ActionTests : OpenSearchTestCase() {
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key, percentage).build()
val clusterSettings = ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.map { it }.toSet())
val totalNodeBytes = randomByteSizeValue().bytes
val thresholdBytes = getFreeBytesThresholdHigh(settings, clusterSettings, totalNodeBytes)
val thresholdBytes = getFreeBytesThresholdHigh(clusterSettings, totalNodeBytes)
val expectedThreshold: Long = ((1 - (rawPercentage.toDouble() / 100.0)) * totalNodeBytes).toLong()
assertEquals("Free bytes threshold not being calculated correctly for percentage setting.", thresholdBytes, expectedThreshold)
}
Expand All @@ -200,7 +200,7 @@ class ActionTests : OpenSearchTestCase() {
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.key, byteValue)
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key, byteValue).build()
val clusterSettings = ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.map { it }.toSet())
val thresholdBytes = getFreeBytesThresholdHigh(settings, clusterSettings, randomByteSizeValue().bytes)
val thresholdBytes = getFreeBytesThresholdHigh(clusterSettings, randomByteSizeValue().bytes)
assertEquals("Free bytes threshold not being calculated correctly for byte setting.", thresholdBytes, byteValue.bytes)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.apache.http.HttpHeaders
import org.apache.http.entity.ContentType.APPLICATION_JSON
import org.apache.http.entity.StringEntity
import org.apache.http.message.BasicHeader
import org.junit.AfterClass
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -40,14 +41,18 @@ import java.time.Instant

abstract class RollupRestTestCase : IndexManagementRestTestCase() {

override fun preserveIndicesUponCompletion(): Boolean = true
companion object {
@AfterClass @JvmStatic fun clearIndicesAfterClassCompletion() {
wipeAllIndices()
}
}

override fun preserveDataStreamsUponCompletion(): Boolean = true
override fun preserveIndicesUponCompletion(): Boolean = true

protected fun createRollup(
rollup: Rollup,
rollupId: String = OpenSearchTestCase.randomAlphaOfLength(10),
refresh: Boolean = true
refresh: Boolean = true,
): Rollup {
val response = createRollupJson(rollup.toJsonString(), rollupId, refresh)

Expand All @@ -66,7 +71,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
protected fun createRollupJson(
rollupString: String,
rollupId: String,
refresh: Boolean = true
refresh: Boolean = true,
): Response {
val response = client()
.makeRequest(
Expand Down Expand Up @@ -128,7 +133,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {

protected fun getRollup(
rollupId: String,
header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json")
header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"),
): Rollup {
val response = client().makeRequest("GET", "$ROLLUP_JOBS_BASE_URI/$rollupId", null, header)
assertEquals("Unable to get rollup $rollupId", RestStatus.OK, response.restStatus())
Expand Down Expand Up @@ -156,7 +161,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {

protected fun getRollupMetadata(
metadataId: String,
header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json")
header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"),
): RollupMetadata {
val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId", null, header)
assertEquals("Unable to get rollup metadata $metadataId", RestStatus.OK, response.restStatus())
Expand All @@ -166,7 +171,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
protected fun getRollupMetadataWithRoutingId(
routingId: String,
metadataId: String,
header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json")
header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"),
): RollupMetadata {
val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId?routing=$routingId", null, header)
assertEquals("Unable to get rollup metadata $metadataId", RestStatus.OK, response.restStatus())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class RollupRunnerIT : RollupRestTestCase() {
// Non-continuous jobs will finish in a single execution
assertEquals("Unexpected metadata state", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}
// Delete the data stream
client().makeRequest("DELETE", "/_data_stream/$dataStreamName")
}

fun `test metadata set to failed when rollup job has a metadata id but metadata doc doesn't exist`() {
Expand Down
Loading