Skip to content

Commit

Permalink
Merge branch 'main' into securityworkflow
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenlan-amzn committed Nov 18, 2022
2 parents de1c773 + d91df69 commit cdedcf0
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 96 deletions.
10 changes: 7 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ afterEvaluate {
node.setting("plugins.security.check_snapshot_restore_write_privileges", "true")
node.setting("plugins.security.restapi.roles_enabled", "[\"all_access\", \"security_rest_api_access\"]")
node.setting("plugins.security.system_indices.enabled", "true")
// node.setting("plugins.security.system_indices.indices", "[\".opendistro-ism-config\"]")
node.setting("plugins.security.system_indices.indices", "[\".opendistro-alerting-config\"]")
}
}
}
Expand Down Expand Up @@ -431,6 +431,7 @@ def configureCluster(OpenSearchCluster cluster, Boolean securityEnabled) {
//
// [email protected]("cluster health yellow", pred)
// cluster.waitForAllConditions()
Thread.sleep(5_000)
}

integTest {
Expand Down Expand Up @@ -477,6 +478,8 @@ integTest {
filter {
excludeTestsMatching "org.opensearch.indexmanagement.indexstatemanagement.action.NotificationActionIT"
}
// Don't execute IntegTest under https
exclude 'org/opensearch/indexmanagement/indexstatemanagement/MetadataRegressionIT.class'
}

// TODO: raise issue in Core, this is because of the test framework
Expand Down Expand Up @@ -705,8 +708,9 @@ run {
getClusters().forEach { cluster ->
if (securityEnabled) {
// TODO: This is a bit of a hack
LinkedHashMap<String, Predicate<TestClusterConfiguration>> waitConditions = new LinkedHashMap<>()
cluster.waitForConditions(waitConditions, System.currentTimeMillis(), 40, TimeUnit.SECONDS, cluster)
// LinkedHashMap<String, Predicate<TestClusterConfiguration>> waitConditions = new LinkedHashMap<>()
// cluster.waitForConditions(waitConditions, System.currentTimeMillis(), 40, TimeUnit.SECONDS, cluster)
configureCluster(cluster, securityEnabled)
} else {
cluster.waitForAllConditions()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity
import org.junit.AfterClass
import org.junit.Before
import org.junit.rules.DisableOnDebug
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction
import org.opensearch.client.Request
import org.opensearch.client.Response
import org.opensearch.client.RestClient
Expand All @@ -25,11 +26,15 @@ import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.rest.RestStatus
import java.io.IOException
import java.nio.file.Files
import java.util.*
import javax.management.MBeanServerInvocationHandler
import javax.management.ObjectName
import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL
import kotlin.collections.ArrayList
import kotlin.collections.HashSet

abstract class IndexManagementRestTestCase : ODFERestTestCase() {

Expand Down Expand Up @@ -63,7 +68,6 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {

protected val isDebuggingTest = DisableOnDebug(null).isDebugging
protected val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean()
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1

protected val isLocalTest = clusterName() == "integTest"
private fun clusterName(): String {
Expand Down Expand Up @@ -160,21 +164,15 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}

override fun preserveIndicesUponCompletion(): Boolean = true

companion object {

@JvmStatic
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
protected val defaultKeepIndexSet = setOf(".opendistro_security")
/**
* This clean up function can be use in @After or @AfterClass in the base test file
* of your feature test suite
* We override preserveIndicesUponCompletion to true and use this function to clean up indices
* Meant to be used in @After or @AfterClass of your feature test suite
*/
fun wipeAllIndices(client: RestClient = adminClient(), keepIndex: kotlin.collections.Set<String> = defaultKeepIndexSet) {
waitFor {
waitForRunningTasks(client)
waitForPendingTasks(client)
waitForThreadPools(client)
}
// Delete all data stream indices
try {
client.performRequest(Request("DELETE", "_data_stream/*"))
} catch (e: ResponseException) {
Expand All @@ -186,9 +184,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}
}

// Delete all indices
val response = client.performRequest(Request("GET", "/_cat/indices?format=json&expand_wildcards=all"))

val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
Expand All @@ -208,6 +204,72 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}
}
}

waitFor {
if (!isMultiNode) {
waitForRunningTasks(client)
waitForPendingTasks(client)
waitForThreadPools(client)
} else {
// Multi node test is not suitable to waitFor
// We have seen long-running write task that fails the waitFor
// probably because of cluster manager - data node task not in sync
// So instead we just sleep 1s after wiping indices
Thread.sleep(1_000)
}
}
}

@JvmStatic
@Throws(IOException::class)
protected fun waitForRunningTasks(client: RestClient) {
val runningTasks: MutableSet<String> = runningTasks(client.performRequest(Request("GET", "/_tasks?detailed")))
if (runningTasks.isEmpty()) {
return
}
val stillRunning = ArrayList<String>(runningTasks)
fail("${Date()}: There are still tasks running after this test that might break subsequent tests: \n${stillRunning.joinToString("\n")}.")
}

@Suppress("UNCHECKED_CAST")
@Throws(IOException::class)
private fun runningTasks(response: Response): MutableSet<String> {
val runningTasks: MutableSet<String> = HashSet()
val nodes = entityAsMap(response)["nodes"] as Map<String, Any>?
for ((_, value) in nodes!!) {
val nodeInfo = value as Map<String, Any>
val nodeTasks = nodeInfo["tasks"] as Map<String, Any>?
for ((_, value1) in nodeTasks!!) {
val task = value1 as Map<String, Any>
// Ignore the task list API - it doesn't count against us
if (task["action"] == ListTasksAction.NAME || task["action"] == ListTasksAction.NAME + "[n]") continue
// runningTasks.add(task["action"].toString() + " | " + task["description"].toString())
runningTasks.add(task.toString())
}
}
return runningTasks
}

@JvmStatic
protected fun waitForThreadPools(client: RestClient) {
val response = client.performRequest(Request("GET", "/_cat/thread_pool?format=json"))

val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val active = (jsonObject["active"] as String).toInt()
val queue = (jsonObject["queue"] as String).toInt()
val name = jsonObject["name"]
val trueActive = if (name == "management") active - 1 else active
if (trueActive > 0 || queue > 0) {
fail("Still active threadpools in cluster: $jsonObject")
}
}
}
}

internal interface IProxy {
Expand Down
87 changes: 7 additions & 80 deletions src/test/kotlin/org/opensearch/indexmanagement/ODFERestTestCase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,9 @@
package org.opensearch.indexmanagement

import org.apache.http.HttpHost
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction
import org.opensearch.client.Request
import org.opensearch.client.Response
import org.opensearch.client.RestClient
import org.opensearch.common.io.PathUtils
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.DeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_ENABLED
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_FILEPATH
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_KEYPASSWORD
Expand All @@ -22,7 +17,6 @@ import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_PEMCE
import org.opensearch.commons.rest.SecureRestClientBuilder
import org.opensearch.test.rest.OpenSearchRestTestCase
import java.io.IOException
import java.util.*

abstract class ODFERestTestCase : OpenSearchRestTestCase() {

Expand All @@ -32,60 +26,6 @@ abstract class ODFERestTestCase : OpenSearchRestTestCase() {

override fun getProtocol(): String = if (isHttps()) "https" else "http"

companion object {
@JvmStatic
@Throws(IOException::class)
protected fun waitForRunningTasks(client: RestClient) {
val runningTasks: MutableSet<String> = runningTasks(client.performRequest(Request("GET", "/_tasks?detailed")))
if (runningTasks.isEmpty()) {
return
}
val stillRunning = ArrayList<String>(runningTasks)
fail("${Date()}: There are still tasks running after this test that might break subsequent tests: \n${stillRunning.joinToString("\n")}.")
}

@Suppress("UNCHECKED_CAST")
@Throws(IOException::class)
private fun runningTasks(response: Response): MutableSet<String> {
val runningTasks: MutableSet<String> = HashSet()
val nodes = entityAsMap(response)["nodes"] as Map<String, Any>?
for ((_, value) in nodes!!) {
val nodeInfo = value as Map<String, Any>
val nodeTasks = nodeInfo["tasks"] as Map<String, Any>?
for ((_, value1) in nodeTasks!!) {
val task = value1 as Map<String, Any>
// Ignore the task list API - it doesn't count against us
if (task["action"] == ListTasksAction.NAME || task["action"] == ListTasksAction.NAME + "[n]") continue
// runningTasks.add(task["action"].toString() + " | " + task["description"].toString())
runningTasks.add(task.toString())
}
}
return runningTasks
}

@JvmStatic
protected fun waitForThreadPools(client: RestClient) {
val response = client.performRequest(Request("GET", "/_cat/thread_pool?format=json"))

val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val active = (jsonObject["active"] as String).toInt()
val queue = (jsonObject["queue"] as String).toInt()
val name = jsonObject["name"]
val trueActive = if (name == "management") active - 1 else active
if (trueActive > 0 || queue > 0) {
fail("Still active threadpools in cluster: $jsonObject")
}
}
}
}
}

/**
* Returns the REST client settings used for super-admin actions like cleaning up after the test has completed.
*/
Expand All @@ -107,31 +47,18 @@ abstract class ODFERestTestCase : OpenSearchRestTestCase() {
val keystore = settings.get(OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_FILEPATH)
return when (keystore != null) {
true -> {
println("Build super admin client")
logger.info("set up super admin")
// create adminDN (super-admin) client
// val uri = javaClass.classLoader.getResource("security/sample.pem")?.toURI()
// val configPath = PathUtils.get(uri).parent.toAbsolutePath()
// SecureRestClientBuilder(settings, configPath).setSocketTimeout(60000).build()

val userName = System.getProperty("user")
val password = System.getProperty("password")
println("Build client with user:password $userName:$password")
hosts.map {
println("Host uri ${it.toURI()}")
}
SecureRestClientBuilder(hosts, isHttps(), userName, password)
.setSocketTimeout(60000).build()
val uri = javaClass.classLoader.getResource("security/sample.pem")?.toURI()
val configPath = PathUtils.get(uri).parent.toAbsolutePath()
SecureRestClientBuilder(settings, configPath, hosts).setSocketTimeout(5000).build()
}
false -> {
logger.info("set up admin")
// create client with passed user
println("Build admin client")
val userName = System.getProperty("user")
val password = System.getProperty("password")
println("Build client with user:password $userName:$password")
hosts.map {
println("Host uri ${it.toURI()}")
}
SecureRestClientBuilder(hosts, isHttps(), userName, password).setSocketTimeout(60000).build()
SecureRestClientBuilder(hosts, isHttps(), userName, password).setSocketTimeout(5000).build()
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.junit.Before
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -20,6 +24,14 @@ import java.util.Locale
class ActionRetryIT : IndexStateManagementRestTestCase() {
private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT)

@Before
fun setDebugLogLevel() {
client().makeRequest(
"PUT", "_cluster/settings",
StringEntity("""{"transient":{"logger.org.opensearch.security":"DEBUG"}}""", ContentType.APPLICATION_JSON)
)
}

/**
* We are forcing RollOver to fail in this Integ test.
*/
Expand Down

0 comments on commit cdedcf0

Please sign in to comment.