Skip to content

Commit

Permalink
Unify test clean logic (#609)
Browse files Browse the repository at this point in the history
* Unify wipe indices logic after tests

Signed-off-by: bowenlan-amzn <[email protected]>

* Enhance wipeAllIndices function

Signed-off-by: bowenlan-amzn <[email protected]>

* Customize cleanup for multi node test

Signed-off-by: bowenlan-amzn <[email protected]>

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Nov 21, 2022
1 parent f4741c2 commit 5198922
Show file tree
Hide file tree
Showing 25 changed files with 194 additions and 151 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/bwc-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ jobs:
runs-on: ubuntu-latest
steps:
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK 11
- name: Set Up JDK
uses: actions/setup-java@v1
with:
java-version: 11
java-version: 17
# index-management
- name: Checkout Branch
uses: actions/checkout@v2
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ jobs:
runs-on: ubuntu-latest
steps:
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK 11
- name: Set Up JDK
uses: actions/setup-java@v1
with:
java-version: 11
java-version: 17
# index-management
- name: Checkout Branch
uses: actions/checkout@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.LocalNodeMasterListener
import org.opensearch.cluster.LocalNodeClusterManagerListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.ToXContent
Expand Down Expand Up @@ -47,7 +47,7 @@ class IndexStateManagementHistory(
private val threadPool: ThreadPool,
private val clusterService: ClusterService,
private val indexManagementIndices: IndexManagementIndices
) : LocalNodeMasterListener {
) : LocalNodeClusterManagerListener {

private val logger = LogManager.getLogger(javaClass)
private var scheduledRollover: Scheduler.Cancellable? = null
Expand All @@ -61,7 +61,7 @@ class IndexStateManagementHistory(
@Volatile private var historyNumberOfReplicas = ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS.get(settings)

init {
clusterService.addLocalNodeMasterListener(this)
clusterService.addLocalNodeClusterManagerListener(this)
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_ENABLED) {
historyEnabled = it
}
Expand All @@ -82,7 +82,7 @@ class IndexStateManagementHistory(
}
}

override fun onMaster() {
override fun onClusterManager() {
try {
// try to rollover immediately as we might be restarting the cluster
if (historyEnabled) rolloverHistoryIndex()
Expand All @@ -97,12 +97,12 @@ class IndexStateManagementHistory(
}
}

override fun offMaster() {
override fun offClusterManager() {
scheduledRollover?.cancel()
}

private fun rescheduleRollover() {
if (clusterService.state().nodes.isLocalNodeElectedMaster) {
if (clusterService.state().nodes.isLocalNodeElectedClusterManager) {
scheduledRollover?.cancel()
scheduledRollover = threadPool.scheduleWithFixedDelay(
{ rolloverAndDeleteHistoryIndex() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ object ManagedIndexRunner :
// the cluster state index uuid differs from the one in the managed index config then the config is referring
// to a different index which does not exist in the cluster. We need to check all of the extensions to confirm an index exists
if (clusterStateIndexMetadata == null || clusterStateIndexUUID != managedIndexConfig.indexUuid) {
clusterStateIndexMetadata = null
// If the cluster state/default index type didn't have an index with a matching name and uuid combination, try all other index types
val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE }
val multiTypeIndexNameToMetaData =
Expand Down Expand Up @@ -404,7 +403,13 @@ object ManagedIndexRunner :
@Suppress("ComplexCondition", "MaxLineLength")
if (updateResult.metadataSaved && state != null && action != null && step != null && currentActionMetaData != null) {
if (validationServiceEnabled) {
val validationResult = actionValidation.validate(action.type, stepContext.metadata.index)
val validationResult = withClosableContext(
IndexManagementSecurityContext(
managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user
)
) {
actionValidation.validate(action.type, stepContext.metadata.index)
}
if (validationResult.validationStatus == Validate.ValidationStatus.RE_VALIDATING) {
logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.\", action.type, state.name, step.name")
publishErrorNotification(policy, managedIndexMetaData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ data class ManagedIndexConfig(
policySeqNo = policySeqNo,
policyPrimaryTerm = policyPrimaryTerm,
policy = policy?.copy(
id = policyID ?: NO_ID,
id = policyID,
seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class AttemptCloseStep : Step(name) {
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotInProgressException) {
handleSnapshotException(indexName, cause as SnapshotInProgressException)
handleSnapshotException(indexName, cause)
} else {
handleException(indexName, cause as Exception)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ class TransportExplainAction @Inject constructor(
filteredIndices.add(indexNames[i])
filteredMetadata.add(indexMetadatas[i])
filteredPolicies.add(indexPolicyIDs[i])
validationResults[i]?.let { filteredValidationResult.add(it) }
validationResults[i].let { filteredValidationResult.add(it) }
enabledState[indexNames[i]]?.let { enabledStatus[indexNames[i]] = it }
appliedPolicies[indexNames[i]]?.let { filteredAppliedPolicies[indexNames[i]] = it }
} catch (e: OpenSearchSecurityException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.LockModel
import org.opensearch.jobscheduler.spi.ScheduledJobParameter

private val logger = LogManager.getLogger("JobSchedulerUtils")
private val logger = LogManager.getLogger("o.o.i.u.JobSchedulerUtils")

/**
* Util method to attempt to get the lock on the requested scheduled job using the backoff policy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() {
}

fun `test update management index mapping with new schema version`() {
wipeAllODFEIndices()
waitForPendingTasks(adminClient())
wipeAllIndices()
assertIndexDoesNotExist(INDEX_MANAGEMENT_INDEX)

val mapping = indexManagementMappings.trim().trimStart('{').trimEnd('}')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,31 @@ import org.apache.http.entity.StringEntity
import org.junit.AfterClass
import org.junit.Before
import org.junit.rules.DisableOnDebug
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction
import org.opensearch.client.Request
import org.opensearch.client.RequestOptions
import org.opensearch.client.Response
import org.opensearch.client.RestClient
import org.opensearch.client.RequestOptions
import org.opensearch.client.WarningsHandler
import org.opensearch.client.ResponseException
import org.opensearch.common.Strings
import org.opensearch.common.collect.Set
import org.opensearch.common.io.PathUtils
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.DeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.rest.RestStatus
import java.io.IOException
import java.nio.file.Files
import java.util.*
import javax.management.MBeanServerInvocationHandler
import javax.management.ObjectName
import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL
import kotlin.collections.ArrayList
import kotlin.collections.HashSet

abstract class IndexManagementRestTestCase : ODFERestTestCase() {

Expand Down Expand Up @@ -57,7 +68,6 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {

protected val isDebuggingTest = DisableOnDebug(null).isDebugging
protected val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean()
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1

protected val isLocalTest = clusterName() == "integTest"
private fun clusterName(): String {
Expand Down Expand Up @@ -153,7 +163,115 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}
}

override fun preserveIndicesUponCompletion(): Boolean = true
companion object {
@JvmStatic
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
protected val defaultKeepIndexSet = setOf(".opendistro_security")
/**
* We override preserveIndicesUponCompletion to true and use this function to clean up indices
* Meant to be used in @After or @AfterClass of your feature test suite
*/
fun wipeAllIndices(client: RestClient = adminClient(), keepIndex: kotlin.collections.Set<String> = defaultKeepIndexSet) {
try {
client.performRequest(Request("DELETE", "_data_stream/*"))
} catch (e: ResponseException) {
// We hit a version of ES that doesn't serialize DeleteDataStreamAction.Request#wildcardExpressionsOriginallySpecified field or
// that doesn't support data streams so it's safe to ignore
val statusCode = e.response.statusLine.statusCode
if (!Set.of(404, 405, 500).contains(statusCode)) {
throw e
}
}

val response = client.performRequest(Request("GET", "/_cat/indices?format=json&expand_wildcards=all"))
val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val indexName: String = jsonObject["index"] as String
// .opendistro_security isn't allowed to delete from cluster
if (!keepIndex.contains(indexName)) {
val request = Request("DELETE", "/$indexName")
// TODO: remove PERMISSIVE option after moving system index access to REST API call
val options = RequestOptions.DEFAULT.toBuilder()
options.setWarningsHandler(WarningsHandler.PERMISSIVE)
request.options = options.build()
client.performRequest(request)
}
}
}

waitFor {
if (!isMultiNode) {
waitForRunningTasks(client)
waitForPendingTasks(client)
waitForThreadPools(client)
} else {
// Multi node test is not suitable to waitFor
// We have seen long-running write task that fails the waitFor
// probably because of cluster manager - data node task not in sync
// So instead we just sleep 1s after wiping indices
Thread.sleep(1_000)
}
}
}

@JvmStatic
@Throws(IOException::class)
protected fun waitForRunningTasks(client: RestClient) {
val runningTasks: MutableSet<String> = runningTasks(client.performRequest(Request("GET", "/_tasks?detailed")))
if (runningTasks.isEmpty()) {
return
}
val stillRunning = ArrayList<String>(runningTasks)
fail("${Date()}: There are still tasks running after this test that might break subsequent tests: \n${stillRunning.joinToString("\n")}.")
}

@Suppress("UNCHECKED_CAST")
@Throws(IOException::class)
private fun runningTasks(response: Response): MutableSet<String> {
val runningTasks: MutableSet<String> = HashSet()
val nodes = entityAsMap(response)["nodes"] as Map<String, Any>?
for ((_, value) in nodes!!) {
val nodeInfo = value as Map<String, Any>
val nodeTasks = nodeInfo["tasks"] as Map<String, Any>?
for ((_, value1) in nodeTasks!!) {
val task = value1 as Map<String, Any>
// Ignore the task list API - it doesn't count against us
if (task["action"] == ListTasksAction.NAME || task["action"] == ListTasksAction.NAME + "[n]") continue
// runningTasks.add(task["action"].toString() + " | " + task["description"].toString())
runningTasks.add(task.toString())
}
}
return runningTasks
}

@JvmStatic
protected fun waitForThreadPools(client: RestClient) {
val response = client.performRequest(Request("GET", "/_cat/thread_pool?format=json"))

val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val active = (jsonObject["active"] as String).toInt()
val queue = (jsonObject["queue"] as String).toInt()
val name = jsonObject["name"]
val trueActive = if (name == "management") active - 1 else active
if (trueActive > 0 || queue > 0) {
fail("Still active threadpools in cluster: $jsonObject")
}
}
}
}

internal interface IProxy {
val version: String?
var sessionId: String?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() {
private val testRole = "test_role"
var testClient: RestClient? = null

override fun preserveIndicesUponCompletion(): Boolean {
return true
}

@Before
fun setupUsersAndRoles() {
updateClusterSetting(ManagedIndexSettings.JITTER.key, "0.0", false)
Expand Down
Loading

0 comments on commit 5198922

Please sign in to comment.