diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index f3e03226d..cdb0e7a1a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -177,6 +177,7 @@ import org.opensearch.indexmanagement.transform.resthandler.RestPreviewTransform import org.opensearch.indexmanagement.transform.resthandler.RestStartTransformAction import org.opensearch.indexmanagement.transform.resthandler.RestStopTransformAction import org.opensearch.indexmanagement.transform.settings.TransformSettings +import org.opensearch.indices.SystemIndexDescriptor import org.opensearch.jobscheduler.spi.JobSchedulerExtension import org.opensearch.jobscheduler.spi.ScheduledJobParser import org.opensearch.jobscheduler.spi.ScheduledJobRunner @@ -185,6 +186,7 @@ import org.opensearch.plugins.ActionPlugin import org.opensearch.plugins.ExtensiblePlugin import org.opensearch.plugins.NetworkPlugin import org.opensearch.plugins.Plugin +import org.opensearch.plugins.SystemIndexPlugin import org.opensearch.repositories.RepositoriesService import org.opensearch.rest.RestController import org.opensearch.rest.RestHandler @@ -197,7 +199,7 @@ import org.opensearch.watcher.ResourceWatcherService import java.util.function.Supplier @Suppress("TooManyFunctions") -class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, Plugin() { +class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, Plugin() { private val logger = LogManager.getLogger(javaClass) lateinit var indexManagementIndices: IndexManagementIndices @@ -613,6 +615,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin override fun getActionFilters(): List { return listOf(fieldCapsFilter, indexOperationActionFilter) } + + override fun getSystemIndexDescriptors(settings: Settings): Collection { + return listOf(SystemIndexDescriptor(INDEX_MANAGEMENT_INDEX, "Index for storing index management configuration and metadata.")) + } } class GuiceHolder @Inject constructor( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt index 57ac86df8..0ac8b94f9 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt @@ -38,6 +38,7 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.InjectSecurity import org.opensearch.commons.authuser.User import org.opensearch.commons.notifications.NotificationsPluginInterface +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException import org.opensearch.core.xcontent.MediaType import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.ToXContent @@ -181,6 +182,12 @@ suspend fun BackoffPolicy.retry( } else { throw e } + } catch (rje: OpenSearchRejectedExecutionException) { + if (iter.hasNext()) { + backoff = iter.next() + logger.warn("Rejected execution. Retrying in $backoff.", rje) + delay((backoff.millis)) + } } } while (true) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 4162f9964..673590718 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -56,7 +56,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { // preemptively seems to give the job scheduler time to listen to operations. @Before fun initializeManagedIndex() { - if (!indexExists(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)) { + if (!isIndexExists(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)) { val request = Request("PUT", "/${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}") var entity = "{\"settings\": " + Strings.toString(XContentType.JSON, Settings.builder().put(INDEX_HIDDEN, true).build()) entity += ",\"mappings\" : ${IndexManagementIndices.indexManagementMappings}}" @@ -77,6 +77,11 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { protected fun Response.restStatus(): RestStatus = RestStatus.fromCode(this.statusLine.statusCode) + protected fun isIndexExists(index: String): Boolean { + val response = client().makeRequest("HEAD", index) + return RestStatus.OK == response.restStatus() + } + protected fun assertIndexExists(index: String) { val response = client().makeRequest("HEAD", index) assertEquals("Index $index does not exist.", RestStatus.OK, response.restStatus()) @@ -165,7 +170,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { override fun preserveIndicesUponCompletion(): Boolean = true companion object { @JvmStatic - protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 + val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 protected val defaultKeepIndexSet = setOf(".opendistro_security") /** * We override preserveIndicesUponCompletion to true and use this function to clean up indices diff --git a/src/test/kotlin/org/opensearch/indexmanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/TestHelpers.kt index e3ea00f44..ad4bd014c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/TestHelpers.kt @@ -13,6 +13,7 @@ import org.opensearch.client.Response import org.opensearch.client.RestClient import org.opensearch.client.WarningsHandler import org.opensearch.commons.authuser.User +import org.opensearch.indexmanagement.IndexManagementRestTestCase.Companion.isMultiNode import org.opensearch.jobscheduler.spi.schedule.CronSchedule import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.jobscheduler.spi.schedule.Schedule @@ -113,6 +114,11 @@ fun waitFor( timeout: Instant = Instant.ofEpochSecond(20), block: () -> T ): T { + if (isMultiNode) { + // job scheduling could be skipped in multi-node tests + // https://github.com/opensearch-project/job-scheduler/issues/173 + timeout.plusSeconds(60) + } val startTime = Instant.now().toEpochMilli() do { try { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt index ddb008dc7..1f9a32f3c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt @@ -247,6 +247,8 @@ class RestStartRollupActionIT : RollupRestTestCase() { assertEquals("Rollup is not RETRY", RollupMetadata.Status.RETRY, rollupMetadata.status) // clearing the config index to prevent other tests using this multi shard index + Thread.sleep(2000L) deleteIndex(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) + Thread.sleep(2000L) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt index e29b55a15..95d28756a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt @@ -310,6 +310,8 @@ class RestStopRollupActionIT : RollupRestTestCase() { assertEquals("Rollup is not STOPPED", RollupMetadata.Status.STOPPED, rollupMetadata.status) // clearing the config index to prevent other tests using this multi shard index + Thread.sleep(2000L) deleteIndex(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) + Thread.sleep(2000L) } }