Skip to content

Commit

Permalink
Unify test clean logic (#609)
Browse files Browse the repository at this point in the history
* Unify wipe indices logic after tests

Signed-off-by: bowenlan-amzn <[email protected]>

* Enhance wipeAllIndices function

Signed-off-by: bowenlan-amzn <[email protected]>

* Customize cleanup for multi node test

Signed-off-by: bowenlan-amzn <[email protected]>

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn authored Nov 17, 2022
1 parent a21e4a6 commit d91df69
Show file tree
Hide file tree
Showing 22 changed files with 186 additions and 142 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/bwc-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ jobs:
runs-on: ubuntu-latest
steps:
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK 11
- name: Set Up JDK
uses: actions/setup-java@v1
with:
java-version: 11
java-version: 17
# index-management
- name: Checkout Branch
uses: actions/checkout@v2
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ jobs:
runs-on: ubuntu-latest
steps:
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK 11
- name: Set Up JDK
uses: actions/setup-java@v1
with:
java-version: 11
java-version: 17
# index-management
- name: Checkout Branch
uses: actions/checkout@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,13 @@ object ManagedIndexRunner :
@Suppress("ComplexCondition", "MaxLineLength")
if (updateResult.metadataSaved && state != null && action != null && step != null && currentActionMetaData != null) {
if (validationServiceEnabled) {
val validationResult = actionValidation.validate(action.type, stepContext.metadata.index)
val validationResult = withClosableContext(
IndexManagementSecurityContext(
managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user
)
) {
actionValidation.validate(action.type, stepContext.metadata.index)
}
if (validationResult.validationStatus == Validate.ValidationStatus.RE_VALIDATING) {
logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.\", action.type, state.name, step.name")
publishErrorNotification(policy, managedIndexMetaData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ class TransportExplainAction @Inject constructor(
filteredIndices.add(indexNames[i])
filteredMetadata.add(indexMetadatas[i])
filteredPolicies.add(indexPolicyIDs[i])
validationResults[i]?.let { filteredValidationResult.add(it) }
validationResults[i].let { filteredValidationResult.add(it) }
enabledState[indexNames[i]]?.let { enabledStatus[indexNames[i]] = it }
appliedPolicies[indexNames[i]]?.let { filteredAppliedPolicies[indexNames[i]] = it }
} catch (e: OpenSearchSecurityException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.LockModel
import org.opensearch.jobscheduler.spi.ScheduledJobParameter

private val logger = LogManager.getLogger("JobSchedulerUtils")
private val logger = LogManager.getLogger("o.o.i.u.JobSchedulerUtils")

/**
* Util method to attempt to get the lock on the requested scheduled job using the backoff policy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() {
}

fun `test update management index mapping with new schema version`() {
wipeAllODFEIndices()
waitForPendingTasks(adminClient())
wipeAllIndices()
assertIndexDoesNotExist(INDEX_MANAGEMENT_INDEX)

val mapping = indexManagementMappings.trim().trimStart('{').trimEnd('}')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,31 @@ import org.apache.http.entity.StringEntity
import org.junit.AfterClass
import org.junit.Before
import org.junit.rules.DisableOnDebug
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction
import org.opensearch.client.Request
import org.opensearch.client.RequestOptions
import org.opensearch.client.Response
import org.opensearch.client.RestClient
import org.opensearch.client.RequestOptions
import org.opensearch.client.WarningsHandler
import org.opensearch.client.ResponseException
import org.opensearch.common.Strings
import org.opensearch.common.collect.Set
import org.opensearch.common.io.PathUtils
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.DeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.rest.RestStatus
import java.io.IOException
import java.nio.file.Files
import java.util.*
import javax.management.MBeanServerInvocationHandler
import javax.management.ObjectName
import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL
import kotlin.collections.ArrayList
import kotlin.collections.HashSet

abstract class IndexManagementRestTestCase : ODFERestTestCase() {

Expand Down Expand Up @@ -57,7 +68,6 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {

protected val isDebuggingTest = DisableOnDebug(null).isDebugging
protected val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean()
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1

protected val isLocalTest = clusterName() == "integTest"
private fun clusterName(): String {
Expand Down Expand Up @@ -153,7 +163,115 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}
}

override fun preserveIndicesUponCompletion(): Boolean = true
companion object {
@JvmStatic
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
protected val defaultKeepIndexSet = setOf(".opendistro_security")
/**
* We override preserveIndicesUponCompletion to true and use this function to clean up indices
* Meant to be used in @After or @AfterClass of your feature test suite
*/
fun wipeAllIndices(client: RestClient = adminClient(), keepIndex: kotlin.collections.Set<String> = defaultKeepIndexSet) {
try {
client.performRequest(Request("DELETE", "_data_stream/*"))
} catch (e: ResponseException) {
// We hit a version of ES that doesn't serialize DeleteDataStreamAction.Request#wildcardExpressionsOriginallySpecified field or
// that doesn't support data streams so it's safe to ignore
val statusCode = e.response.statusLine.statusCode
if (!Set.of(404, 405, 500).contains(statusCode)) {
throw e
}
}

val response = client.performRequest(Request("GET", "/_cat/indices?format=json&expand_wildcards=all"))
val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val indexName: String = jsonObject["index"] as String
// .opendistro_security isn't allowed to delete from cluster
if (!keepIndex.contains(indexName)) {
val request = Request("DELETE", "/$indexName")
// TODO: remove PERMISSIVE option after moving system index access to REST API call
val options = RequestOptions.DEFAULT.toBuilder()
options.setWarningsHandler(WarningsHandler.PERMISSIVE)
request.options = options.build()
client.performRequest(request)
}
}
}

waitFor {
if (!isMultiNode) {
waitForRunningTasks(client)
waitForPendingTasks(client)
waitForThreadPools(client)
} else {
// Multi node test is not suitable to waitFor
// We have seen long-running write task that fails the waitFor
// probably because of cluster manager - data node task not in sync
// So instead we just sleep 1s after wiping indices
Thread.sleep(1_000)
}
}
}

@JvmStatic
@Throws(IOException::class)
protected fun waitForRunningTasks(client: RestClient) {
val runningTasks: MutableSet<String> = runningTasks(client.performRequest(Request("GET", "/_tasks?detailed")))
if (runningTasks.isEmpty()) {
return
}
val stillRunning = ArrayList<String>(runningTasks)
fail("${Date()}: There are still tasks running after this test that might break subsequent tests: \n${stillRunning.joinToString("\n")}.")
}

@Suppress("UNCHECKED_CAST")
@Throws(IOException::class)
private fun runningTasks(response: Response): MutableSet<String> {
val runningTasks: MutableSet<String> = HashSet()
val nodes = entityAsMap(response)["nodes"] as Map<String, Any>?
for ((_, value) in nodes!!) {
val nodeInfo = value as Map<String, Any>
val nodeTasks = nodeInfo["tasks"] as Map<String, Any>?
for ((_, value1) in nodeTasks!!) {
val task = value1 as Map<String, Any>
// Ignore the task list API - it doesn't count against us
if (task["action"] == ListTasksAction.NAME || task["action"] == ListTasksAction.NAME + "[n]") continue
// runningTasks.add(task["action"].toString() + " | " + task["description"].toString())
runningTasks.add(task.toString())
}
}
return runningTasks
}

@JvmStatic
protected fun waitForThreadPools(client: RestClient) {
val response = client.performRequest(Request("GET", "/_cat/thread_pool?format=json"))

val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val active = (jsonObject["active"] as String).toInt()
val queue = (jsonObject["queue"] as String).toInt()
val name = jsonObject["name"]
val trueActive = if (name == "management") active - 1 else active
if (trueActive > 0 || queue > 0) {
fail("Still active threadpools in cluster: $jsonObject")
}
}
}
}

internal interface IProxy {
val version: String?
var sessionId: String?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() {
private val testRole = "test_role"
var testClient: RestClient? = null

override fun preserveIndicesUponCompletion(): Boolean {
return true
}

@Before
fun setupUsersAndRoles() {
updateClusterSetting(ManagedIndexSettings.JITTER.key, "0.0", false)
Expand Down
105 changes: 0 additions & 105 deletions src/test/kotlin/org/opensearch/indexmanagement/ODFERestTestCase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,9 @@
package org.opensearch.indexmanagement

import org.apache.http.HttpHost
import org.junit.After
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction
import org.opensearch.client.Request
import org.opensearch.client.RequestOptions
import org.opensearch.client.Response
import org.opensearch.client.RestClient
import org.opensearch.client.WarningsHandler
import org.opensearch.common.io.PathUtils
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.DeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_ENABLED
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_FILEPATH
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_KEYPASSWORD
Expand All @@ -26,7 +17,6 @@ import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_PEMCE
import org.opensearch.commons.rest.SecureRestClientBuilder
import org.opensearch.test.rest.OpenSearchRestTestCase
import java.io.IOException
import java.time.Instant

abstract class ODFERestTestCase : OpenSearchRestTestCase() {

Expand All @@ -36,101 +26,6 @@ abstract class ODFERestTestCase : OpenSearchRestTestCase() {

override fun getProtocol(): String = if (isHttps()) "https" else "http"

@Suppress("UNCHECKED_CAST")
@Throws(IOException::class)
private fun runningTasks(response: Response): MutableSet<String> {
val runningTasks: MutableSet<String> = HashSet()
val nodes = entityAsMap(response)["nodes"] as Map<String, Any>?
for ((_, value) in nodes!!) {
val nodeInfo = value as Map<String, Any>
val nodeTasks = nodeInfo["tasks"] as Map<String, Any>?
for ((_, value1) in nodeTasks!!) {
val task = value1 as Map<String, Any>
runningTasks.add(task["action"].toString())
}
}
return runningTasks
}

@After
fun waitForCleanup() {
waitFor {
waitForRunningTasks()
waitForThreadPools()
waitForPendingTasks(adminClient())
}
}

@Throws(IOException::class)
private fun waitForRunningTasks() {
waitFor(timeout = Instant.ofEpochSecond(5)) {
val runningTasks: MutableSet<String> = runningTasks(adminClient().performRequest(Request("GET", "/_tasks")))
// Ignore the task list API - it doesn't count against us
runningTasks.remove(ListTasksAction.NAME)
runningTasks.remove(ListTasksAction.NAME + "[n]")
if (runningTasks.isEmpty()) {
return@waitFor
}
val stillRunning = ArrayList<String>(runningTasks)
fail("There are still tasks running after this test that might break subsequent tests $stillRunning.")
}
}

private fun waitForThreadPools() {
waitFor {
val response = client().performRequest(Request("GET", "/_cat/thread_pool?format=json"))

val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val active = (jsonObject["active"] as String).toInt()
val queue = (jsonObject["queue"] as String).toInt()
val name = jsonObject["name"]
val trueActive = if (name == "management") active - 1 else active
if (trueActive > 0 || queue > 0) {
fail("Still active threadpools in cluster: $jsonObject")
}
}
}
}
}

open fun preserveODFEIndicesAfterTest(): Boolean = false

@Throws(IOException::class)
open fun wipeAllODFEIndices() {
if (preserveODFEIndicesAfterTest()) return

// Delete all data stream indices
client().performRequest(Request("DELETE", "/_data_stream/*"))

// Delete all indices
val response = client().performRequest(Request("GET", "/_cat/indices?format=json&expand_wildcards=all"))

val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val indexName: String = jsonObject["index"] as String
// .opendistro_security isn't allowed to delete from cluster
if (".opendistro_security" != indexName) {
val request = Request("DELETE", "/$indexName")
// TODO: remove PERMISSIVE option after moving system index access to REST API call
val options = RequestOptions.DEFAULT.toBuilder()
options.setWarningsHandler(WarningsHandler.PERMISSIVE)
request.options = options.build()
adminClient().performRequest(request)
}
}
}
}
/**
* Returns the REST client settings used for super-admin actions like cleaning up after the test has completed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ class RollupSecurityBehaviorIT : SecurityRestTestCase() {
private val testRole = "test_role"
var testUserClient: RestClient? = null

override fun preserveIndicesUponCompletion(): Boolean {
return true
}

@Before
fun setupUsersAndRoles() {
updateClusterSetting(ManagedIndexSettings.JITTER.key, "0.0", false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ class SecurityBehaviorIT : SecurityRestTestCase() {
private val john = "john"
private var johnClient: RestClient? = null

override fun preserveIndicesUponCompletion(): Boolean {
return true
}

@Before
fun setupUsersAndRoles() {
updateClusterSetting(ManagedIndexSettings.JITTER.key, "0.0", false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ class TransformSecurityBehaviorIT : SecurityRestTestCase() {
private val testRole = "test_role"
var testUserClient: RestClient? = null

override fun preserveIndicesUponCompletion(): Boolean {
return true
}

@Before
fun setupUsersAndRoles() {
updateClusterSetting(ManagedIndexSettings.JITTER.key, "0.0", false)
Expand Down
Loading

0 comments on commit d91df69

Please sign in to comment.