diff --git a/build.gradle b/build.gradle index 4cd9297e5..a589189e1 100644 --- a/build.gradle +++ b/build.gradle @@ -110,10 +110,6 @@ configurations.all { force 'com.google.guava:guava:30.0-jre' force 'com.puppycrawl.tools:checkstyle:8.29' force 'commons-codec:commons-codec:1.13' - force 'org.apache.httpcomponents:httpclient:4.5.13' - force 'org.apache.httpcomponents:httpclient-osgi:4.5.13' - force 'org.apache.httpcomponents.client5:httpclient5:5.0.3' - force 'org.apache.httpcomponents.client5:httpclient5-osgi:5.0.3' force "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" force "org.yaml:snakeyaml:${versions.snakeyaml}" force 'org.codehaus.plexus:plexus-utils:3.0.24' @@ -212,8 +208,8 @@ dependencies { implementation "org.opensearch:common-utils:${common_utils_version}" implementation "com.github.seancfoley:ipaddress:5.3.3" implementation "commons-codec:commons-codec:${versions.commonscodec}" - implementation "org.apache.httpcomponents:httpclient:4.5.13" - implementation "org.apache.httpcomponents:httpcore:4.4.15" + implementation "org.apache.httpcomponents:httpclient:${versions.httpclient}" + implementation "org.apache.httpcomponents:httpcore:${versions.httpcore}" testImplementation "org.opensearch.test:framework:${opensearch_version}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/ControlCenterIndices.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/ControlCenterIndices.kt index dbb68a4b2..29b2f65c1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/ControlCenterIndices.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/ControlCenterIndices.kt @@ -5,6 +5,7 @@ package org.opensearch.indexmanagement.controlcenter.notification +import org.opensearch.ExceptionsHelper import org.opensearch.ResourceAlreadyExistsException import org.opensearch.action.ActionListener import org.opensearch.action.admin.indices.create.CreateIndexRequest @@ -31,7 +32,7 @@ class ControlCenterIndices( indexRequest, object : ActionListener { override fun onFailure(e: Exception) { - if (e is ResourceAlreadyExistsException) { + if (ExceptionsHelper.unwrapCause(e) is ResourceAlreadyExistsException) { /* if two request create the control center index at the same time, may raise this exception */ /* but we don't take it as error */ actionListener.onResponse( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListener.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListener.kt index cf44441fe..81411165c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListener.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListener.kt @@ -235,9 +235,12 @@ class NotificationActionListener>() var indicesToClean = emptyList() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListenerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListenerIT.kt index 838bb1d79..0f4b9f20c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListenerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListenerIT.kt @@ -13,6 +13,7 @@ import org.junit.After import org.junit.Assert import org.junit.Before import org.opensearch.action.admin.indices.open.OpenIndexAction +import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.client.RestClient import org.opensearch.common.settings.Settings @@ -198,25 +199,7 @@ class NotificationActionListenerIT : IndexManagementRestTestCase() { @Suppress("UNCHECKED_CAST") fun `test notify for reindex`() { - insertSampleData("source-index", 10) - createIndex("reindex-dest", Settings.EMPTY) - - val response = client.makeRequest( - "POST", "_reindex?wait_for_completion=false", - StringEntity( - """ - { - "source": { - "index": "source-index" - }, - "dest": { - "index": "reindex-dest" - } - } - """.trimIndent(), - ContentType.APPLICATION_JSON - ) - ) + val response = performReindex() Assert.assertTrue(response.restStatus() == RestStatus.OK) @@ -323,25 +306,7 @@ class NotificationActionListenerIT : IndexManagementRestTestCase() { @Suppress("UNCHECKED_CAST") fun `test notify for reindex with duplicate channel`() { - insertSampleData("source-index", 10) - createIndex("reindex-dest", Settings.EMPTY) - - val response = client.makeRequest( - "POST", "_reindex?wait_for_completion=false", - StringEntity( - """ - { - "source": { - "index": "source-index" - }, - "dest": { - "index": "reindex-dest" - } - } - """.trimIndent(), - ContentType.APPLICATION_JSON - ) - ) + val response = performReindex() Assert.assertTrue(response.restStatus() == RestStatus.OK) val taskId = response.asMap()["task"] as String @@ -448,4 +413,78 @@ class NotificationActionListenerIT : IndexManagementRestTestCase() { ) } } + + @Suppress("UNCHECKED_CAST") + fun `test runtime policy been removed when index operation finished`() { + val response = performReindex() + + Assert.assertTrue(response.restStatus() == RestStatus.OK) + val taskId = response.asMap()["task"] + Assert.assertNotNull(taskId) + + // put runtime policy only for failure + client.makeRequest( + "POST", "_plugins/_im/lron", + StringEntity( + """ + { + "lron_config": { + "task_id": "$taskId", + "lron_condition": { + "failure": true, + "success": false + }, + "channels": [ + { + "id": "$notificationConfId" + } + ] + } + } + """.trimIndent(), + ContentType.APPLICATION_JSON + ) + ) + + waitFor(Instant.ofEpochSecond(60)) { + assertEquals( + "Notification index does not have a doc", + 1, + ( + client.makeRequest("GET", "$notificationIndex/_search?q=msg:reindex") + .asMap() as Map>> + )["hits"]!!["total"]!!["value"] + ) + + try { + client.makeRequest("GET", "_plugins/_im/lron/LRON:$taskId") + } catch (e: ResponseException) { + // runtime policy been removed + Assert.assertTrue(e.response.restStatus() == RestStatus.NOT_FOUND) + } + } + } + + private fun performReindex(): Response { + insertSampleData("source-index", 10) + createIndex("reindex-dest", Settings.EMPTY) + + val response = client.makeRequest( + "POST", "_reindex?wait_for_completion=false", + StringEntity( + """ + { + "source": { + "index": "source-index" + }, + "dest": { + "index": "reindex-dest" + } + } + """.trimIndent(), + ContentType.APPLICATION_JSON + ) + ) + return response + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt index d3f6258b4..8c5294051 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt @@ -17,9 +17,6 @@ import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.IndexManagementRestTestCase import org.opensearch.indexmanagement.controlcenter.notification.initNodeIdsInRestIT import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig -import org.opensearch.indexmanagement.controlcenter.notification.nodeIdsInRestIT -import org.opensearch.indexmanagement.controlcenter.notification.randomLRONConfig -import org.opensearch.indexmanagement.controlcenter.notification.randomTaskId import org.opensearch.indexmanagement.controlcenter.notification.toJsonString import org.opensearch.indexmanagement.makeRequest import org.opensearch.rest.RestStatus @@ -30,8 +27,6 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() { setDebugLogLevel() /* init cluster node ids in integ test */ initNodeIdsInRestIT(client()) - /* index a random doc to create .opensearch-control-center index */ - createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))) } @After @@ -74,8 +69,13 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() { companion object { @AfterClass - @JvmStatic fun clearIndicesAfterClass() { - wipeAllIndices() + @JvmStatic fun removeControlCenterIndex() { + try { + adminClient().makeRequest("DELETE", IndexManagementPlugin.CONTROL_CENTER_INDEX, emptyMap()) + } catch (e: ResponseException) { + /* ignore if the index has not been created */ + assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode)) + } } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigActionIT.kt index 35ac2ccfc..82e6d9ebd 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigActionIT.kt @@ -39,6 +39,8 @@ class RestDeleteLRONConfigActionIT : LRONConfigRestTestCase() { } fun `test delete nonexist LRONConfig response`() { + /* index a random doc to create .opensearch-control-center index */ + createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))) val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) val response = client().makeRequest("DELETE", getResourceURI(lronConfig.taskId, lronConfig.actionName)) assertEquals("delete LRONConfig failed", RestStatus.OK, response.restStatus()) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigActionIT.kt index 92f50cb62..f9dbef7d9 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigActionIT.kt @@ -39,6 +39,8 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() { } fun `test get nonexist LRONConfig fails`() { + /* index a random doc to create .opensearch-control-center index */ + createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))) try { val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) client().makeRequest("GET", getResourceURI(lronConfig.taskId, lronConfig.actionName)) @@ -51,7 +53,7 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() { fun `test get all LRONConfigs`() { /* LRONConfigRestTestCase index a doc to auto create the index, here we wipe the index before count doc number */ - wipeAllIndices() + removeControlCenterIndex() val lronConfigResponses = randomList(1, 15) { createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))).asMap() } @@ -90,16 +92,11 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() { } fun `test get all LRONConfig if index not exists`() { - try { - wipeAllIndices() - val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI) - assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus()) - val responseBody = response.asMap() - val totalNumber = responseBody["total_number"] - OpenSearchTestCase.assertEquals("wrong LRONConfigs number", 0, totalNumber) - } finally { - /* index a random doc to create .opensearch-control-center index */ - createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))) - } + removeControlCenterIndex() + val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI) + assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus()) + val responseBody = response.asMap() + val totalNumber = responseBody["total_number"] + OpenSearchTestCase.assertEquals("wrong LRONConfigs number", 0, totalNumber) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigActionIT.kt index a83ff482f..e2d1f0b24 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigActionIT.kt @@ -5,6 +5,10 @@ package org.opensearch.indexmanagement.controlcenter.notification.resthandler +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.runBlocking import org.junit.Assert import org.opensearch.client.ResponseException import org.opensearch.common.xcontent.XContentType @@ -21,6 +25,7 @@ import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.opensearchapi.convertToMap import org.opensearch.indexmanagement.util.DRY_RUN import org.opensearch.rest.RestStatus +import java.util.concurrent.Executors @Suppress("UNCHECKED_CAST") class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() { @@ -155,21 +160,21 @@ class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() { } fun `test autocreate index for indexLRONConfig action`() { - wipeAllIndices() + removeControlCenterIndex() val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) var response = createLRONConfig(lronConfig) assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus()) - wipeAllIndices() + removeControlCenterIndex() response = client().makeRequest( "PUT", getResourceURI(lronConfig.taskId, lronConfig.actionName), lronConfig.toHttpEntity() ) assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus()) - wipeAllIndices() } fun `test mappings after LRONConfig creation`() { + removeControlCenterIndex() val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) createLRONConfig(lronConfig) @@ -185,4 +190,29 @@ class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() { assertEquals("Mappings are different", expectedMap, mappingsMap) } + + fun `test concurrent indexing requests auto create index and not throw exception`() { + removeControlCenterIndex() + val threadSize = 5 + val lronConfigs = List(threadSize) { randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) } + val threadPool = Executors.newFixedThreadPool(threadSize) + try { + runBlocking { + val dispatcher = threadPool.asCoroutineDispatcher() + val responses = lronConfigs.map { + async(dispatcher) { + createLRONConfig(it) + } + }.awaitAll() + responses.forEach { assertEquals("Create LRONConfig failed", RestStatus.OK, it.restStatus()) } + } + } finally { + threadPool.shutdown() + } + val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI) + assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus()) + val responseBody = response.asMap() + val totalNumber = responseBody["total_number"] + assertEquals("wrong LRONConfigs number", threadSize, totalNumber) + } }