Commit

Merge branch 'main' into securityworkflow

bowenlan-amzn committed Nov 19, 2022
2 parents de1c773 + d91df69 commit 1e9d5e4

Showing 3 changed files with 115 additions and 140 deletions.
82 changes: 35 additions & 47 deletions build.gradle
@@ -13,6 +13,7 @@ import java.nio.file.Files
import java.util.concurrent.Callable
import java.util.concurrent.TimeUnit
import java.util.function.Predicate
import org.opensearch.gradle.http.WaitForHttpResource


buildscript {
@@ -25,6 +26,7 @@ buildscript {
opensearch_build = version_tokens[0] + '.0'
job_scheduler_no_snapshot = opensearch_build
notifications_no_snapshot = opensearch_build
security_no_snapshot = opensearch_build
if (buildVersionQualifier) {
opensearch_build += "-${buildVersionQualifier}"
job_scheduler_no_snapshot += "-${buildVersionQualifier}"
@@ -38,7 +40,6 @@ buildscript {

notifications_resource_folder = "src/test/resources/notifications"
notifications_core_resource_folder = "src/test/resources/notifications-core"
// notification_version = System.getProperty("notification.version", opensearch_build)
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler_version.version", opensearch_build)
job_scheduler_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +
@@ -51,7 +52,7 @@ buildscript {

kotlin_version = System.getProperty("kotlin.version", "1.6.10")

security_plugin_version = opensearch_build.replace("-SNAPSHOT","")
security_plugin_version = System.getProperty("security.version", security_no_snapshot)
}

repositories {
@@ -180,7 +181,6 @@ allprojects {
configurations {
opensearchPlugin
}

dependencies {
compileOnly "org.opensearch:opensearch:${opensearch_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}"
@@ -197,7 +197,6 @@ dependencies {
implementation "org.apache.httpcomponents:httpcore:4.4.15"

testImplementation "org.opensearch.test:framework:${opensearch_version}"
testImplementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearch_version}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"
testImplementation "org.mockito:mockito-core:4.7.0"
@@ -208,6 +207,7 @@
}
}

// https://aws.oss.sonatype.org/content/repositories/snapshots/org/opensearch/plugin/
opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip"
}

@@ -259,19 +259,23 @@ validateNebulaPom.enabled = false
def opensearch_tmp_dir = rootProject.file('build/private/opensearch_tmp').absoluteFile
opensearch_tmp_dir.mkdirs()

def securityPluginFile = new Callable<RegularFile>() {
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.opensearchPlugin.resolvedConfiguration.resolvedArtifacts
.find { ResolvedArtifact f -> f.name.contains('opensearch-security') }
.file
// === Setup security test ===
ext.resolvePluginFile = { pluginId ->
return new Callable<RegularFile>() {
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.opensearchPlugin.resolvedConfiguration.resolvedArtifacts
.find { ResolvedArtifact f -> f.name.contains(pluginId) }
.file
}
}
}
}
}
def securityPluginFile = resolvePluginFile("opensearch-security")
// This flag indicates the existence of the security plugin
def securityEnabled = System.getProperty("security", "false") == "true"
afterEvaluate {
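
Note: together these properties make a security-enabled run fully command-line driven, e.g. ./gradlew integTest -Dsecurity=true -Duser=admin -Dpassword=admin, with -Dsecurity.version=... to pin a particular plugin build. The property names come from the System.getProperty calls in this file; the exact task invocation is an assumption.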
@@ -297,8 +301,8 @@ afterEvaluate {
node.setting("plugins.security.ssl.http.pemtrustedcas_filepath", "root-ca.pem")
node.setting("plugins.security.allow_unsafe_democertificates", "true")
node.setting("plugins.security.allow_default_init_securityindex", "true")
node.setting("plugins.security.authcz.admin_dn", "CN=kirk,OU=client,O=client,L=test,C=de")
// node.setting("plugins.security.audit.type", "internal_elasticsearch")
node.setting("plugins.security.authcz.admin_dn", "\n - CN=kirk,OU=client,O=client,L=test, C=de")
node.setting("plugins.security.audit.type", "internal_elasticsearch")
node.setting("plugins.security.enable_snapshot_restore_privilege", "true")
node.setting("plugins.security.check_snapshot_restore_write_privileges", "true")
node.setting("plugins.security.restapi.roles_enabled", "[\"all_access\", \"security_rest_api_access\"]")
@@ -401,36 +405,29 @@ testClusters.integTest {
setting 'path.repo', repo.absolutePath
}

def configureCluster(OpenSearchCluster cluster, Boolean securityEnabled) {
// clear existing health checks as we will need custom handling based on
// security plugin installation
def waitForClusterSetup(OpenSearchCluster cluster, Boolean securityEnabled) {
cluster.@waitConditions.clear()
String unicastUris = cluster.nodes.stream().flatMap { node ->
node.getAllTransportPortURI().stream()
}.collect(Collectors.joining("\n"))
cluster.nodes.forEach {node ->
try {
// Manually write the unicast hosts as we are not depending on the internal method
Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new java.io.UncheckedIOException("Failed to write configuration files for " + this, e);
}
}

// Health check based on security plugin installation
// Predicate pred = { OpenSearchCluster c ->
// String protocol = "http"
// if(securityEnabled && !c.name.equalsIgnoreCase("integTest")) {
// protocol = "https"
// }
// CrossClusterWaitForHttpResource wait = new CrossClusterWaitForHttpResource(protocol, cluster.getFirstNode().getHttpSocketURI(), cluster.nodes.size())
// wait.setUsername("admin")
// wait.setPassword("admin")
// return wait.wait(500)
// }
//
// [email protected]("cluster health yellow", pred)
// cluster.waitForAllConditions()
Predicate pred = {
String protocol = securityEnabled ? "https" : "http"
WaitForHttpResource wait = new WaitForHttpResource(protocol, cluster.getFirstNode().getHttpSocketURI(), cluster.nodes.size())
wait.setUsername(System.getProperty("user", "admin"))
wait.setPassword(System.getProperty("password", "admin"))
return wait.wait(500)
}

cluster.@waitConditions.put("cluster health yellow", pred)
cluster.waitForAllConditions()
}
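
For readers unfamiliar with WaitForHttpResource: it repeatedly probes the cluster's HTTP endpoint with basic-auth credentials until the cluster answers. A minimal self-contained Kotlin sketch of the same polling idea follows; the endpoint, credentials, and timeout are illustrative assumptions, and unlike the real class it omits the TLS trust setup needed to reach the demo certificates over https.

import java.net.HttpURLConnection
import java.net.URL
import java.util.Base64

// Sketch only: poll /_cluster/health with basic auth until the cluster
// reports at least yellow status or the deadline passes.
fun waitForYellow(base: String, user: String, pass: String, timeoutMs: Long): Boolean {
    val deadline = System.currentTimeMillis() + timeoutMs
    val auth = Base64.getEncoder().encodeToString("$user:$pass".toByteArray())
    while (System.currentTimeMillis() < deadline) {
        try {
            val conn = URL("$base/_cluster/health?wait_for_status=yellow&timeout=1s")
                .openConnection() as HttpURLConnection
            conn.setRequestProperty("Authorization", "Basic $auth")
            conn.connectTimeout = 1_000
            conn.readTimeout = 2_000
            if (conn.responseCode == 200) return true // cluster is at least yellow
        } catch (e: Exception) {
            // node not accepting connections yet; fall through and retry
        }
        Thread.sleep(500)
    }
    return false
}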

integTest {
@@ -450,8 +447,7 @@ integTest {
// There seems to be an issue when running multi node run or integ tasks with unicast_hosts
// not being written, the waitForAllConditions ensures it's written
getClusters().forEach { cluster ->
// cluster.waitForAllConditions()
configureCluster(cluster, securityEnabled)
waitForClusterSetup(cluster, securityEnabled)
}
}

@@ -477,6 +473,7 @@ integTest {
filter {
excludeTestsMatching "org.opensearch.indexmanagement.indexstatemanagement.action.NotificationActionIT"
}
exclude 'org/opensearch/indexmanagement/indexstatemanagement/MetadataRegressionIT.class'
}

// TODO: raise issue in Core, this is because of the test framework
@@ -703,24 +700,18 @@ run {
// There seems to be an issue when running multi node run or integ tasks with unicast_hosts
// not being written, the waitForAllConditions ensures it's written
getClusters().forEach { cluster ->
if (securityEnabled) {
// TODO: This is a bit of a hack
LinkedHashMap<String, Predicate<TestClusterConfiguration>> waitConditions = new LinkedHashMap<>()
cluster.waitForConditions(waitConditions, System.currentTimeMillis(), 40, TimeUnit.SECONDS, cluster)
} else {
cluster.waitForAllConditions()
}
waitForClusterSetup(cluster, securityEnabled)
}
}
}

compileKotlin {
kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict']
// kotlinOptions.allWarningsAsErrors = true
kotlinOptions.allWarningsAsErrors = true
}

compileTestKotlin {
// kotlinOptions.allWarningsAsErrors = true
kotlinOptions.allWarningsAsErrors = true
}

apply from: 'build-tools/pkgbuild.gradle'
@@ -730,11 +721,8 @@
// and new version mixed in one cluster
import org.opensearch.gradle.test.RestIntegTestTask

import java.util.stream.Collectors

def mixedClusterTest = project.tasks.create('mixedCluster', RestIntegTestTask.class)
def mixedClusterFlag = findProperty('mixed') as Boolean ?: false
println("mixed cluster flag: $mixedClusterFlag")
mixedClusterTest.dependsOn(bundlePlugin)

testClusters.mixedCluster {
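
Note: because the flag is read with findProperty('mixed'), the mixed-cluster task is toggled with a Gradle project property, e.g. ./gradlew mixedCluster -Pmixed=true. The task name comes from project.tasks.create above; the exact invocation is an assumption.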
@@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity
import org.junit.AfterClass
import org.junit.Before
import org.junit.rules.DisableOnDebug
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction
import org.opensearch.client.Request
import org.opensearch.client.Response
import org.opensearch.client.RestClient
@@ -25,11 +26,15 @@ import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.rest.RestStatus
import java.io.IOException
import java.nio.file.Files
import java.util.*
import javax.management.MBeanServerInvocationHandler
import javax.management.ObjectName
import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL
import kotlin.collections.ArrayList
import kotlin.collections.HashSet

abstract class IndexManagementRestTestCase : ODFERestTestCase() {

@@ -63,7 +68,6 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {

protected val isDebuggingTest = DisableOnDebug(null).isDebugging
protected val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean()
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1

protected val isLocalTest = clusterName() == "integTest"
private fun clusterName(): String {
@@ -160,21 +164,15 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}

override fun preserveIndicesUponCompletion(): Boolean = true

companion object {

@JvmStatic
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
protected val defaultKeepIndexSet = setOf(".opendistro_security")
/**
* This clean up function can be used in @After or @AfterClass in the base test file
* of your feature test suite
* We override preserveIndicesUponCompletion to true and use this function to clean up indices
* Meant to be used in @After or @AfterClass of your feature test suite
*/
fun wipeAllIndices(client: RestClient = adminClient(), keepIndex: kotlin.collections.Set<String> = defaultKeepIndexSet) {
waitFor {
waitForRunningTasks(client)
waitForPendingTasks(client)
waitForThreadPools(client)
}
// Delete all data stream indices
try {
client.performRequest(Request("DELETE", "_data_stream/*"))
} catch (e: ResponseException) {
@@ -186,9 +184,7 @@
}
}

// Delete all indices
val response = client.performRequest(Request("GET", "/_cat/indices?format=json&expand_wildcards=all"))

val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
@@ -208,6 +204,72 @@
}
}
}

waitFor {
if (!isMultiNode) {
waitForRunningTasks(client)
waitForPendingTasks(client)
waitForThreadPools(client)
} else {
// Multi node tests are not suitable for waitFor
// We have seen a long-running write task fail the waitFor,
// probably because the cluster manager and data node task views are not in sync
// So instead we just sleep 1s after wiping indices
Thread.sleep(1_000)
}
}
}

@JvmStatic
@Throws(IOException::class)
protected fun waitForRunningTasks(client: RestClient) {
val runningTasks: MutableSet<String> = runningTasks(client.performRequest(Request("GET", "/_tasks?detailed")))
if (runningTasks.isEmpty()) {
return
}
val stillRunning = ArrayList<String>(runningTasks)
fail("${Date()}: There are still tasks running after this test that might break subsequent tests: \n${stillRunning.joinToString("\n")}.")
}

@Suppress("UNCHECKED_CAST")
@Throws(IOException::class)
private fun runningTasks(response: Response): MutableSet<String> {
val runningTasks: MutableSet<String> = HashSet()
val nodes = entityAsMap(response)["nodes"] as Map<String, Any>?
for ((_, value) in nodes!!) {
val nodeInfo = value as Map<String, Any>
val nodeTasks = nodeInfo["tasks"] as Map<String, Any>?
for ((_, value1) in nodeTasks!!) {
val task = value1 as Map<String, Any>
// Ignore the task list API - it doesn't count against us
if (task["action"] == ListTasksAction.NAME || task["action"] == ListTasksAction.NAME + "[n]") continue
// runningTasks.add(task["action"].toString() + " | " + task["description"].toString())
runningTasks.add(task.toString())
}
}
return runningTasks
}

@JvmStatic
protected fun waitForThreadPools(client: RestClient) {
val response = client.performRequest(Request("GET", "/_cat/thread_pool?format=json"))

val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val active = (jsonObject["active"] as String).toInt()
val queue = (jsonObject["queue"] as String).toInt()
val name = jsonObject["name"]
val trueActive = if (name == "management") active - 1 else active
if (trueActive > 0 || queue > 0) {
fail("Still active threadpools in cluster: $jsonObject")
}
}
}
}

internal interface IProxy {
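
To make the cleanup contract concrete, here is a hypothetical feature test suite wired up the way the wipeAllIndices doc comment describes. The class name and test body are invented for illustration:

import org.junit.After

// Hypothetical suite extending the base class above. Because
// preserveIndicesUponCompletion() returns true, the test framework leaves
// indices behind, so the suite cleans up explicitly after each test.
class ExampleFeatureIT : IndexManagementRestTestCase() {

    @After
    fun cleanUpIndices() {
        // Deletes everything except the default keep set (.opendistro_security),
        // then waits for running tasks and thread pools (or sleeps on multi node).
        wipeAllIndices()
    }
}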
[Diff for the third changed file was not loaded.]
