From 81b80053a949c0191928baaa5f40f650410002f5 Mon Sep 17 00:00:00 2001 From: Clay Downs Date: Thu, 21 Apr 2022 09:45:35 -0700 Subject: [PATCH] Fixes flaky continuous transforms and shrink tests (#340) * fixes flaky tests Signed-off-by: Clay Downs * Deletes data stream manually Signed-off-by: Clay Downs --- .../step/shrink/AttemptMoveShardsStep.kt | 2 +- .../step/shrink/AttemptShrinkStep.kt | 2 +- .../indexstatemanagement/util/StepUtils.kt | 24 ++++++-- .../action/ShrinkActionIT.kt | 56 +++++++++++++++---- .../indexstatemanagement/model/ActionTests.kt | 4 +- .../rollup/RollupRestTestCase.kt | 19 ++++--- .../rollup/runner/RollupRunnerIT.kt | 2 + .../transform/TransformRestTestCase.kt | 17 ++++-- .../transform/TransformRunnerIT.kt | 2 + 9 files changed, 97 insertions(+), 31 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt index cf4dafe29..cdd617653 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt @@ -305,7 +305,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : Step(name) { for (node in nodesList) { // Gets the amount of memory in the node which will be free below the high watermark level after adding 2*indexSizeInBytes, // as the source index is duplicated during the shrink - val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, stepContext.settings, stepContext.clusterService.clusterSettings) + val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, stepContext.clusterService.clusterSettings) if (remainingMem > 0L) { nodesWithSpace.add(Tuple(remainingMem, node.node.name)) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt index 6ba6984bb..93d18465f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt @@ -128,7 +128,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) { cleanupAndFail(FAILURE_MESSAGE) return false } - val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.settings, context.clusterService.clusterSettings) + val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.clusterService.clusterSettings) if (remainingMem < 1L) { logger.error("Shrink action failed as the previously selected node no longer has enough free space.") cleanupAndFail(NOT_ENOUGH_SPACE_FAILURE_MESSAGE) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt index a25cd47e3..0621caaa8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt @@ -16,6 +16,9 @@ import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.node.DiscoveryNodes import org.opensearch.cluster.routing.allocation.DiskThresholdSettings +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING import org.opensearch.common.settings.ClusterSettings import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue @@ -137,8 +140,8 @@ fun getActionStartTime(managedIndexMetaData: ManagedIndexMetaData): Instant { * parse either and get the byte value back. */ @Suppress("MagicNumber") -fun getFreeBytesThresholdHigh(settings: Settings, clusterSettings: ClusterSettings?, totalNodeBytes: Long): Long { - val diskThresholdSettings = DiskThresholdSettings(settings, clusterSettings) +fun getFreeBytesThresholdHigh(clusterSettings: ClusterSettings, totalNodeBytes: Long): Long { + val diskThresholdSettings = DiskThresholdSettings(getDiskSettings(clusterSettings), clusterSettings) // Depending on how a user provided input, this setting may be a percentage or byte value val diskThresholdPercent = diskThresholdSettings.freeDiskThresholdHigh val diskThresholdBytes = diskThresholdSettings.freeBytesThresholdHigh @@ -149,16 +152,29 @@ fun getFreeBytesThresholdHigh(settings: Settings, clusterSettings: ClusterSettin } else diskThresholdBytes.bytes } +fun getDiskSettings(clusterSettings: ClusterSettings): Settings { + return Settings.builder().put( + CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.key, + clusterSettings.get(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING) + ).put( + CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.key, + clusterSettings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING) + ).put( + CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key, + clusterSettings.get(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING) + ).build() +} + /* * Returns the amount of memory in the node which will be free below the high watermark level after adding 2*indexSizeInBytes, or -1 * if adding 2*indexSizeInBytes goes over the high watermark threshold, or if nodeStats does not contain OsStats. */ -fun getNodeFreeMemoryAfterShrink(node: NodeStats, indexSizeInBytes: Long, settings: Settings, clusterSettings: ClusterSettings?): Long { +fun getNodeFreeMemoryAfterShrink(node: NodeStats, indexSizeInBytes: Long, clusterSettings: ClusterSettings): Long { val osStats = node.os if (osStats != null) { val memLeftInNode = osStats.mem.free.bytes val totalNodeMem = osStats.mem.total.bytes - val freeBytesThresholdHigh = getFreeBytesThresholdHigh(settings, clusterSettings, totalNodeMem) + val freeBytesThresholdHigh = getFreeBytesThresholdHigh(clusterSettings, totalNodeMem) // We require that a node has enough space to be below the high watermark disk level with an additional 2 * the index size free val requiredBytes = (2 * indexSizeInBytes) + freeBytesThresholdHigh if (memLeftInNode > requiredBytes) { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt index 77994efa1..0bdf32ab0 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt @@ -5,12 +5,19 @@ package org.opensearch.indexmanagement.indexstatemanagement.action +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity import org.apache.logging.log4j.LogManager +import org.junit.Before import org.opensearch.action.admin.indices.alias.Alias import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING import org.opensearch.common.settings.Settings import org.opensearch.common.unit.ByteSizeValue import org.opensearch.index.query.QueryBuilders +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.model.State @@ -19,14 +26,41 @@ import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.AttemptMo import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.AttemptShrinkStep import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.WaitForMoveShardsStep import org.opensearch.indexmanagement.indexstatemanagement.step.shrink.WaitForShrinkStep +import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.waitFor +import org.opensearch.rest.RestStatus import org.opensearch.script.Script import org.opensearch.script.ScriptType import java.time.Instant import java.time.temporal.ChronoUnit class ShrinkActionIT : IndexStateManagementRestTestCase() { + @Suppress("UnusedPrivateMember") + @Before + private fun disableJobIndexShardRelocation() { + initializeManagedIndex() + // Shrink ITs would sometimes fail on multi node setups because of the job scheduler index being moved between nodes, + // descheduling the job + updateIndexSetting(INDEX_MANAGEMENT_INDEX, "routing.allocation.enable", "none") + // When doing remote testing, the docker image seems to keep the disk free space very low, causing the shrink action + // to not be able to find a node to shrink onto. Lowering these watermarks avoids that. + val request = """ + { + "persistent": { + "${CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.key}": "5b", + "${CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.key}": "10b", + "${CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key}": "15b" + } + } + """.trimIndent() + val res = client().makeRequest( + "PUT", "_cluster/settings", emptyMap(), + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Request failed", RestStatus.OK, res.restStatus()) + } + private val testIndexName = javaClass.simpleName.lowercase() private val testIndexSuffix = "_shrink_test" fun `test basic workflow number of shards`() { @@ -126,8 +160,8 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { @Suppress("UNCHECKED_CAST") fun `test basic workflow max shard size`() { val logger = LogManager.getLogger(::ShrinkActionIT) - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" + val indexName = "${testIndexName}_index_2" + val policyID = "${testIndexName}_testPolicyName_2" val testMaxShardSize: ByteSizeValue = ByteSizeValue.parseBytesSizeValue("1GB", "test") val shrinkAction = ShrinkAction( numNewShards = null, @@ -217,8 +251,8 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { @Suppress("UNCHECKED_CAST") fun `test basic workflow percentage to decrease to`() { - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" + val indexName = "${testIndexName}_index_3" + val policyID = "${testIndexName}_testPolicyName_3" val shrinkAction = ShrinkAction( numNewShards = null, maxShardSize = null, @@ -309,8 +343,8 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { val logger = LogManager.getLogger(::ShrinkActionIT) val nodes = getNodes() if (nodes.size > 1) { - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" + val indexName = "${testIndexName}_index_4" + val policyID = "${testIndexName}_testPolicyName_4" val shrinkAction = ShrinkAction( numNewShards = null, maxShardSize = null, @@ -410,8 +444,8 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { fun `test no-op with single source index primary shard`() { val logger = LogManager.getLogger(::ShrinkActionIT) - val indexName = "${testIndexName}_index_1_shard_noop" - val policyID = "${testIndexName}_testPolicyName_1_shard_noop" + val indexName = "${testIndexName}_index_shard_noop" + val policyID = "${testIndexName}_testPolicyName_shard_noop" // Create a Policy with one State that only preforms a force_merge Action val shrinkAction = ShrinkAction( @@ -561,8 +595,7 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { } } - // TODO This test is excessively flaky, disabling for now but it needs to be fixed - private fun `test retries from first step`() { + fun `test retries from first step`() { val testPolicy = """ {"policy":{"description":"Default policy","default_state":"Shrink","states":[ {"name":"Shrink","actions":[{"retry":{"count":2,"backoff":"constant","delay":"1s"},"shrink": @@ -599,7 +632,7 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { getExplainManagedIndexMetaData(indexName).info?.get("message") ) } - val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName + var nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName // starts WaitForMoveShardsStep updateManagedIndexConfigStartTime(managedIndexConfig) waitFor(Instant.ofEpochSecond(60)) { @@ -645,6 +678,7 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { assertEquals(targetIndexName, getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName) assertNotNull("Couldn't find node to shrink onto.", getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName) val settings = getFlatSettings(indexName) + nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName assertTrue("Did not set allocation setting", settings.containsKey("index.routing.allocation.require._name")) assertTrue(settings.containsKey("index.routing.allocation.require._name")) assertEquals(nodeToShrink, settings["index.routing.allocation.require._name"]) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt index 0119a24ba..1dd3ca7c9 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt @@ -189,7 +189,7 @@ class ActionTests : OpenSearchTestCase() { .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key, percentage).build() val clusterSettings = ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.map { it }.toSet()) val totalNodeBytes = randomByteSizeValue().bytes - val thresholdBytes = getFreeBytesThresholdHigh(settings, clusterSettings, totalNodeBytes) + val thresholdBytes = getFreeBytesThresholdHigh(clusterSettings, totalNodeBytes) val expectedThreshold: Long = ((1 - (rawPercentage.toDouble() / 100.0)) * totalNodeBytes).toLong() assertEquals("Free bytes threshold not being calculated correctly for percentage setting.", thresholdBytes, expectedThreshold) } @@ -200,7 +200,7 @@ class ActionTests : OpenSearchTestCase() { .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.key, byteValue) .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key, byteValue).build() val clusterSettings = ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.map { it }.toSet()) - val thresholdBytes = getFreeBytesThresholdHigh(settings, clusterSettings, randomByteSizeValue().bytes) + val thresholdBytes = getFreeBytesThresholdHigh(clusterSettings, randomByteSizeValue().bytes) assertEquals("Free bytes threshold not being calculated correctly for byte setting.", thresholdBytes, byteValue.bytes) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 53b224221..79849c9d0 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -10,6 +10,7 @@ import org.apache.http.HttpHeaders import org.apache.http.entity.ContentType.APPLICATION_JSON import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader +import org.junit.AfterClass import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.common.settings.Settings @@ -40,14 +41,18 @@ import java.time.Instant abstract class RollupRestTestCase : IndexManagementRestTestCase() { - override fun preserveIndicesUponCompletion(): Boolean = true + companion object { + @AfterClass @JvmStatic fun clearIndicesAfterClassCompletion() { + wipeAllIndices() + } + } - override fun preserveDataStreamsUponCompletion(): Boolean = true + override fun preserveIndicesUponCompletion(): Boolean = true protected fun createRollup( rollup: Rollup, rollupId: String = OpenSearchTestCase.randomAlphaOfLength(10), - refresh: Boolean = true + refresh: Boolean = true, ): Rollup { val response = createRollupJson(rollup.toJsonString(), rollupId, refresh) @@ -66,7 +71,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { protected fun createRollupJson( rollupString: String, rollupId: String, - refresh: Boolean = true + refresh: Boolean = true, ): Response { val response = client() .makeRequest( @@ -128,7 +133,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { protected fun getRollup( rollupId: String, - header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"), ): Rollup { val response = client().makeRequest("GET", "$ROLLUP_JOBS_BASE_URI/$rollupId", null, header) assertEquals("Unable to get rollup $rollupId", RestStatus.OK, response.restStatus()) @@ -156,7 +161,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { protected fun getRollupMetadata( metadataId: String, - header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"), ): RollupMetadata { val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId", null, header) assertEquals("Unable to get rollup metadata $metadataId", RestStatus.OK, response.restStatus()) @@ -166,7 +171,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { protected fun getRollupMetadataWithRoutingId( routingId: String, metadataId: String, - header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"), ): RollupMetadata { val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId?routing=$routingId", null, header) assertEquals("Unable to get rollup metadata $metadataId", RestStatus.OK, response.restStatus()) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index e895feb38..87997e3a0 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -113,6 +113,8 @@ class RollupRunnerIT : RollupRestTestCase() { // Non-continuous jobs will finish in a single execution assertEquals("Unexpected metadata state", RollupMetadata.Status.FINISHED, rollupMetadata.status) } + // Delete the data stream + client().makeRequest("DELETE", "/_data_stream/$dataStreamName") } fun `test metadata set to failed when rollup job has a metadata id but metadata doc doesn't exist`() { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt index 3941c9bf0..af9f2274b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt @@ -10,6 +10,7 @@ import org.apache.http.HttpHeaders import org.apache.http.entity.ContentType.APPLICATION_JSON import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader +import org.junit.AfterClass import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.common.settings.Settings @@ -37,12 +38,18 @@ import java.time.Instant abstract class TransformRestTestCase : IndexManagementRestTestCase() { + companion object { + @AfterClass @JvmStatic fun clearIndicesAfterClassCompletion() { + wipeAllIndices() + } + } + override fun preserveIndicesUponCompletion(): Boolean = true protected fun createTransform( transform: Transform, transformId: String = randomAlphaOfLength(10), - refresh: Boolean = true + refresh: Boolean = true, ): Transform { if (!indexExists(transform.sourceIndex)) { createTransformSourceIndex(transform) @@ -63,7 +70,7 @@ abstract class TransformRestTestCase : IndexManagementRestTestCase() { private fun createTransformJson( transformString: String, transformId: String, - refresh: Boolean = true + refresh: Boolean = true, ): Response { val response = client() .makeRequest( @@ -139,7 +146,7 @@ abstract class TransformRestTestCase : IndexManagementRestTestCase() { protected fun getTransform( transformId: String, - header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"), ): Transform { val response = client().makeRequest("GET", "$TRANSFORM_BASE_URI/$transformId", null, header) assertEquals("Unable to get transform $transformId", RestStatus.OK, response.restStatus()) @@ -167,7 +174,7 @@ abstract class TransformRestTestCase : IndexManagementRestTestCase() { protected fun getTransformMetadata( metadataId: String, - header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"), ): TransformMetadata { val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId", null, header) assertEquals("Unable to get transform metadata $metadataId", RestStatus.OK, response.restStatus()) @@ -197,7 +204,7 @@ abstract class TransformRestTestCase : IndexManagementRestTestCase() { @Suppress("UNCHECKED_CAST") protected fun getTransformDocumentsBehind( transformId: String, - header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"), ): Map { val explainResponse = client().makeRequest("GET", "$TRANSFORM_BASE_URI/$transformId/_explain", null, header) assertEquals(RestStatus.OK, explainResponse.restStatus()) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 2834da49c..f1fafb996 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -409,6 +409,7 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("Transform did not complete iteration or had incorrect number of documents processed", 5000, transformMetadata.stats.documentsProcessed) assertEquals("Transform did not complete iteration", null, transformMetadata.afterKey) assertNotNull("Continuous stats were not updated", transformMetadata.continuousStats) + assertNotNull("Continuous stats were set, but lastTimestamp was not", transformMetadata.continuousStats!!.lastTimestamp) transformMetadata } @@ -734,6 +735,7 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("Transform did not complete iteration or had incorrect number of documents processed", 15000, transformMetadata.stats.documentsProcessed) assertEquals("Transform did not complete iteration", null, transformMetadata.afterKey) assertNotNull("Continuous stats were not updated", transformMetadata.continuousStats) + assertNotNull("Continuous stats were set, but lastTimestamp was not", transformMetadata.continuousStats!!.lastTimestamp) transformMetadata }