Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport main] Support for system index interface #799

Merged
merged 1 commit into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ import org.opensearch.indexmanagement.transform.resthandler.RestPreviewTransform
import org.opensearch.indexmanagement.transform.resthandler.RestStartTransformAction
import org.opensearch.indexmanagement.transform.resthandler.RestStopTransformAction
import org.opensearch.indexmanagement.transform.settings.TransformSettings
import org.opensearch.indices.SystemIndexDescriptor
import org.opensearch.jobscheduler.spi.JobSchedulerExtension
import org.opensearch.jobscheduler.spi.ScheduledJobParser
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
Expand All @@ -183,6 +184,7 @@ import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.ExtensiblePlugin
import org.opensearch.plugins.NetworkPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.plugins.SystemIndexPlugin
import org.opensearch.repositories.RepositoriesService
import org.opensearch.rest.RestController
import org.opensearch.rest.RestHandler
Expand All @@ -195,7 +197,7 @@ import org.opensearch.watcher.ResourceWatcherService
import java.util.function.Supplier

@Suppress("TooManyFunctions")
class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, Plugin() {
class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, Plugin() {

private val logger = LogManager.getLogger(javaClass)
lateinit var indexManagementIndices: IndexManagementIndices
Expand Down Expand Up @@ -608,6 +610,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
override fun getActionFilters(): List<ActionFilter> {
return listOf(fieldCapsFilter, indexOperationActionFilter)
}

override fun getSystemIndexDescriptors(settings: Settings): Collection<SystemIndexDescriptor> {
return listOf(SystemIndexDescriptor(INDEX_MANAGEMENT_INDEX, "Index for storing index management configuration and metadata."))
}
}

class GuiceHolder @Inject constructor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.InjectSecurity
import org.opensearch.commons.authuser.User
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException
import org.opensearch.core.xcontent.MediaType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
Expand Down Expand Up @@ -183,6 +184,12 @@ suspend fun <T> BackoffPolicy.retry(
} else {
throw e
}
} catch (rje: OpenSearchRejectedExecutionException) {
if (iter.hasNext()) {
backoff = iter.next()
logger.warn("Rejected execution. Retrying in $backoff.", rje)
delay((backoff.millis))
}
}
} while (true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
// preemptively seems to give the job scheduler time to listen to operations.
@Before
fun initializeManagedIndex() {
if (!indexExists(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)) {
if (!isIndexExists(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)) {
val request = Request("PUT", "/${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}")
var entity = "{\"settings\": " + Strings.toString(XContentType.JSON, Settings.builder().put(INDEX_HIDDEN, true).build())
entity += ",\"mappings\" : ${IndexManagementIndices.indexManagementMappings}}"
Expand All @@ -77,6 +77,11 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {

protected fun Response.restStatus(): RestStatus = RestStatus.fromCode(this.statusLine.statusCode)

protected fun isIndexExists(index: String): Boolean {
val response = client().makeRequest("HEAD", index)
return RestStatus.OK == response.restStatus()
}

protected fun assertIndexExists(index: String) {
val response = client().makeRequest("HEAD", index)
assertEquals("Index $index does not exist.", RestStatus.OK, response.restStatus())
Expand Down Expand Up @@ -165,7 +170,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
override fun preserveIndicesUponCompletion(): Boolean = true
companion object {
@JvmStatic
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
protected val defaultKeepIndexSet = setOf(".opendistro_security")
/**
* We override preserveIndicesUponCompletion to true and use this function to clean up indices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.client.Response
import org.opensearch.client.RestClient
import org.opensearch.client.WarningsHandler
import org.opensearch.commons.authuser.User
import org.opensearch.indexmanagement.IndexManagementRestTestCase.Companion.isMultiNode
import org.opensearch.jobscheduler.spi.schedule.CronSchedule
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.jobscheduler.spi.schedule.Schedule
Expand Down Expand Up @@ -113,6 +114,11 @@ fun <T> waitFor(
timeout: Instant = Instant.ofEpochSecond(20),
block: () -> T
): T {
if (isMultiNode) {
// job scheduling could be skipped in multi-node tests
// https://github.com/opensearch-project/job-scheduler/issues/173
timeout.plusSeconds(60)
}
val startTime = Instant.now().toEpochMilli()
do {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ class RestStartRollupActionIT : RollupRestTestCase() {
assertEquals("Rollup is not RETRY", RollupMetadata.Status.RETRY, rollupMetadata.status)

// clearing the config index to prevent other tests using this multi shard index
Thread.sleep(2000L)
deleteIndex(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)
Thread.sleep(2000L)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ class RestStopRollupActionIT : RollupRestTestCase() {
assertEquals("Rollup is not STOPPED", RollupMetadata.Status.STOPPED, rollupMetadata.status)

// clearing the config index to prevent other tests using this multi shard index
Thread.sleep(2000L)
deleteIndex(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)
Thread.sleep(2000L)
}
}