Skip to content

Commit

Permalink
Support for system index interface (opensearch-project#789)
Browse files Browse the repository at this point in the history
* Support for system index interface

Signed-off-by: bowenlan-amzn <[email protected]>

* Handle rejected execution exception as retryable in retry block

Signed-off-by: bowenlan-amzn <[email protected]>

* Fix tests

Signed-off-by: bowenlan-amzn <[email protected]>

* Fix tests

Signed-off-by: bowenlan-amzn <[email protected]>

---------

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn authored May 30, 2023
1 parent 114910a commit fa7e46f
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ import org.opensearch.indexmanagement.transform.resthandler.RestPreviewTransform
import org.opensearch.indexmanagement.transform.resthandler.RestStartTransformAction
import org.opensearch.indexmanagement.transform.resthandler.RestStopTransformAction
import org.opensearch.indexmanagement.transform.settings.TransformSettings
import org.opensearch.indices.SystemIndexDescriptor
import org.opensearch.jobscheduler.spi.JobSchedulerExtension
import org.opensearch.jobscheduler.spi.ScheduledJobParser
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
Expand All @@ -185,6 +186,7 @@ import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.ExtensiblePlugin
import org.opensearch.plugins.NetworkPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.plugins.SystemIndexPlugin
import org.opensearch.repositories.RepositoriesService
import org.opensearch.rest.RestController
import org.opensearch.rest.RestHandler
Expand All @@ -197,7 +199,7 @@ import org.opensearch.watcher.ResourceWatcherService
import java.util.function.Supplier

@Suppress("TooManyFunctions")
class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, Plugin() {
class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, Plugin() {

private val logger = LogManager.getLogger(javaClass)
lateinit var indexManagementIndices: IndexManagementIndices
Expand Down Expand Up @@ -613,6 +615,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
override fun getActionFilters(): List<ActionFilter> {
return listOf(fieldCapsFilter, indexOperationActionFilter)
}

override fun getSystemIndexDescriptors(settings: Settings): Collection<SystemIndexDescriptor> {
return listOf(SystemIndexDescriptor(INDEX_MANAGEMENT_INDEX, "Index for storing index management configuration and metadata."))
}
}

class GuiceHolder @Inject constructor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.InjectSecurity
import org.opensearch.commons.authuser.User
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException
import org.opensearch.core.xcontent.MediaType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
Expand Down Expand Up @@ -181,6 +182,12 @@ suspend fun <T> BackoffPolicy.retry(
} else {
throw e
}
} catch (rje: OpenSearchRejectedExecutionException) {
if (iter.hasNext()) {
backoff = iter.next()
logger.warn("Rejected execution. Retrying in $backoff.", rje)
delay((backoff.millis))
}
}
} while (true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
// preemptively seems to give the job scheduler time to listen to operations.
@Before
fun initializeManagedIndex() {
if (!indexExists(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)) {
if (!isIndexExists(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)) {
val request = Request("PUT", "/${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}")
var entity = "{\"settings\": " + Strings.toString(XContentType.JSON, Settings.builder().put(INDEX_HIDDEN, true).build())
entity += ",\"mappings\" : ${IndexManagementIndices.indexManagementMappings}}"
Expand All @@ -77,6 +77,11 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {

protected fun Response.restStatus(): RestStatus = RestStatus.fromCode(this.statusLine.statusCode)

protected fun isIndexExists(index: String): Boolean {
val response = client().makeRequest("HEAD", index)
return RestStatus.OK == response.restStatus()
}

protected fun assertIndexExists(index: String) {
val response = client().makeRequest("HEAD", index)
assertEquals("Index $index does not exist.", RestStatus.OK, response.restStatus())
Expand Down Expand Up @@ -165,7 +170,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
override fun preserveIndicesUponCompletion(): Boolean = true
companion object {
@JvmStatic
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
protected val defaultKeepIndexSet = setOf(".opendistro_security")
/**
* We override preserveIndicesUponCompletion to true and use this function to clean up indices
Expand Down
6 changes: 6 additions & 0 deletions src/test/kotlin/org/opensearch/indexmanagement/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.client.Response
import org.opensearch.client.RestClient
import org.opensearch.client.WarningsHandler
import org.opensearch.commons.authuser.User
import org.opensearch.indexmanagement.IndexManagementRestTestCase.Companion.isMultiNode
import org.opensearch.jobscheduler.spi.schedule.CronSchedule
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.jobscheduler.spi.schedule.Schedule
Expand Down Expand Up @@ -113,6 +114,11 @@ fun <T> waitFor(
timeout: Instant = Instant.ofEpochSecond(20),
block: () -> T
): T {
if (isMultiNode) {
// job scheduling could be skipped in multi-node tests
// https://github.com/opensearch-project/job-scheduler/issues/173
timeout.plusSeconds(60)
}
val startTime = Instant.now().toEpochMilli()
do {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ class RestStartRollupActionIT : RollupRestTestCase() {
assertEquals("Rollup is not RETRY", RollupMetadata.Status.RETRY, rollupMetadata.status)

// clearing the config index to prevent other tests using this multi shard index
Thread.sleep(2000L)
deleteIndex(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)
Thread.sleep(2000L)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ class RestStopRollupActionIT : RollupRestTestCase() {
assertEquals("Rollup is not STOPPED", RollupMetadata.Status.STOPPED, rollupMetadata.status)

// clearing the config index to prevent other tests using this multi shard index
Thread.sleep(2000L)
deleteIndex(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)
Thread.sleep(2000L)
}
}

0 comments on commit fa7e46f

Please sign in to comment.