Skip to content

Commit

Permalink
Merge branch 'main' into feature/ism_transform
Browse files Browse the repository at this point in the history
  • Loading branch information
tanqiuliu authored Jul 11, 2023
2 parents 10ba7c5 + 99df725 commit e6699d3
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 69 deletions.
8 changes: 2 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ configurations.all {
force 'com.google.guava:guava:30.0-jre'
force 'com.puppycrawl.tools:checkstyle:8.29'
force 'commons-codec:commons-codec:1.13'
force 'org.apache.httpcomponents:httpclient:4.5.13'
force 'org.apache.httpcomponents:httpclient-osgi:4.5.13'
force 'org.apache.httpcomponents.client5:httpclient5:5.0.3'
force 'org.apache.httpcomponents.client5:httpclient5-osgi:5.0.3'
force "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
force "org.yaml:snakeyaml:${versions.snakeyaml}"
force 'org.codehaus.plexus:plexus-utils:3.0.24'
Expand Down Expand Up @@ -212,8 +208,8 @@ dependencies {
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "com.github.seancfoley:ipaddress:5.3.3"
implementation "commons-codec:commons-codec:${versions.commonscodec}"
implementation "org.apache.httpcomponents:httpclient:4.5.13"
implementation "org.apache.httpcomponents:httpcore:4.4.15"
implementation "org.apache.httpcomponents:httpclient:${versions.httpclient}"
implementation "org.apache.httpcomponents:httpcore:${versions.httpcore}"

testImplementation "org.opensearch.test:framework:${opensearch_version}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.indexmanagement.controlcenter.notification

import org.opensearch.ExceptionsHelper
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.create.CreateIndexRequest
Expand All @@ -31,7 +32,7 @@ class ControlCenterIndices(
indexRequest,
object : ActionListener<CreateIndexResponse> {
override fun onFailure(e: Exception) {
if (e is ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(e) is ResourceAlreadyExistsException) {
/* if two request create the control center index at the same time, may raise this exception */
/* but we don't take it as error */
actionListener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,12 @@ class NotificationActionListener<Request : ActionRequest, Response : ActionRespo
}
}
}
}

// remove one time configuration no matter it is enabled or not
removeOneTimePolicy(config)
// remove one time configuration
val runtimeConfig = lronConfigResponse.lronConfigResponses.firstOrNull() { it.lronConfig.taskId != null }
runtimeConfig?.let {
removeOneTimePolicy(it.lronConfig)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ class ManagedIndexCoordinator(
*/
@OpenForTesting
suspend fun sweepClusterChangedEvent(event: ClusterChangedEvent) {
// If IM config doesn't exist skip
if (!ismIndices.indexManagementIndexExists()) {
logger.debug("[.opendistro-ism-config] config index does not exist")
return
}
// indices delete event
var removeManagedIndexReq = emptyList<DocWriteRequest<*>>()
var indicesToClean = emptyList<Index>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.opensearch.action.admin.indices.open.OpenIndexAction
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
import org.opensearch.client.RestClient
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -198,25 +199,7 @@ class NotificationActionListenerIT : IndexManagementRestTestCase() {

@Suppress("UNCHECKED_CAST")
fun `test notify for reindex`() {
insertSampleData("source-index", 10)
createIndex("reindex-dest", Settings.EMPTY)

val response = client.makeRequest(
"POST", "_reindex?wait_for_completion=false",
StringEntity(
"""
{
"source": {
"index": "source-index"
},
"dest": {
"index": "reindex-dest"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
val response = performReindex()

Assert.assertTrue(response.restStatus() == RestStatus.OK)

Expand Down Expand Up @@ -323,25 +306,7 @@ class NotificationActionListenerIT : IndexManagementRestTestCase() {

@Suppress("UNCHECKED_CAST")
fun `test notify for reindex with duplicate channel`() {
insertSampleData("source-index", 10)
createIndex("reindex-dest", Settings.EMPTY)

val response = client.makeRequest(
"POST", "_reindex?wait_for_completion=false",
StringEntity(
"""
{
"source": {
"index": "source-index"
},
"dest": {
"index": "reindex-dest"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
val response = performReindex()

Assert.assertTrue(response.restStatus() == RestStatus.OK)
val taskId = response.asMap()["task"] as String
Expand Down Expand Up @@ -448,4 +413,78 @@ class NotificationActionListenerIT : IndexManagementRestTestCase() {
)
}
}

@Suppress("UNCHECKED_CAST")
fun `test runtime policy been removed when index operation finished`() {
val response = performReindex()

Assert.assertTrue(response.restStatus() == RestStatus.OK)
val taskId = response.asMap()["task"]
Assert.assertNotNull(taskId)

// put runtime policy only for failure
client.makeRequest(
"POST", "_plugins/_im/lron",
StringEntity(
"""
{
"lron_config": {
"task_id": "$taskId",
"lron_condition": {
"failure": true,
"success": false
},
"channels": [
{
"id": "$notificationConfId"
}
]
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)

waitFor(Instant.ofEpochSecond(60)) {
assertEquals(
"Notification index does not have a doc",
1,
(
client.makeRequest("GET", "$notificationIndex/_search?q=msg:reindex")
.asMap() as Map<String, Map<String, Map<String, Any>>>
)["hits"]!!["total"]!!["value"]
)

try {
client.makeRequest("GET", "_plugins/_im/lron/LRON:$taskId")
} catch (e: ResponseException) {
// runtime policy been removed
Assert.assertTrue(e.response.restStatus() == RestStatus.NOT_FOUND)
}
}
}

private fun performReindex(): Response {
insertSampleData("source-index", 10)
createIndex("reindex-dest", Settings.EMPTY)

val response = client.makeRequest(
"POST", "_reindex?wait_for_completion=false",
StringEntity(
"""
{
"source": {
"index": "source-index"
},
"dest": {
"index": "reindex-dest"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
return response
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.IndexManagementRestTestCase
import org.opensearch.indexmanagement.controlcenter.notification.initNodeIdsInRestIT
import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig
import org.opensearch.indexmanagement.controlcenter.notification.nodeIdsInRestIT
import org.opensearch.indexmanagement.controlcenter.notification.randomLRONConfig
import org.opensearch.indexmanagement.controlcenter.notification.randomTaskId
import org.opensearch.indexmanagement.controlcenter.notification.toJsonString
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.rest.RestStatus
Expand All @@ -30,8 +27,6 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() {
setDebugLogLevel()
/* init cluster node ids in integ test */
initNodeIdsInRestIT(client())
/* index a random doc to create .opensearch-control-center index */
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())))
}

@After
Expand Down Expand Up @@ -74,8 +69,13 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() {

companion object {
@AfterClass
@JvmStatic fun clearIndicesAfterClass() {
wipeAllIndices()
@JvmStatic fun removeControlCenterIndex() {
try {
adminClient().makeRequest("DELETE", IndexManagementPlugin.CONTROL_CENTER_INDEX, emptyMap())
} catch (e: ResponseException) {
/* ignore if the index has not been created */
assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class RestDeleteLRONConfigActionIT : LRONConfigRestTestCase() {
}

fun `test delete nonexist LRONConfig response`() {
/* index a random doc to create .opensearch-control-center index */
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())))
val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))
val response = client().makeRequest("DELETE", getResourceURI(lronConfig.taskId, lronConfig.actionName))
assertEquals("delete LRONConfig failed", RestStatus.OK, response.restStatus())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() {
}

fun `test get nonexist LRONConfig fails`() {
/* index a random doc to create .opensearch-control-center index */
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())))
try {
val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))
client().makeRequest("GET", getResourceURI(lronConfig.taskId, lronConfig.actionName))
Expand All @@ -51,7 +53,7 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() {

fun `test get all LRONConfigs`() {
/* LRONConfigRestTestCase index a doc to auto create the index, here we wipe the index before count doc number */
wipeAllIndices()
removeControlCenterIndex()
val lronConfigResponses = randomList(1, 15) {
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))).asMap()
}
Expand Down Expand Up @@ -90,16 +92,11 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() {
}

fun `test get all LRONConfig if index not exists`() {
try {
wipeAllIndices()
val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI)
assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus())
val responseBody = response.asMap()
val totalNumber = responseBody["total_number"]
OpenSearchTestCase.assertEquals("wrong LRONConfigs number", 0, totalNumber)
} finally {
/* index a random doc to create .opensearch-control-center index */
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())))
}
removeControlCenterIndex()
val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI)
assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus())
val responseBody = response.asMap()
val totalNumber = responseBody["total_number"]
OpenSearchTestCase.assertEquals("wrong LRONConfigs number", 0, totalNumber)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package org.opensearch.indexmanagement.controlcenter.notification.resthandler

import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import org.junit.Assert
import org.opensearch.client.ResponseException
import org.opensearch.common.xcontent.XContentType
Expand All @@ -21,6 +25,7 @@ import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.util.DRY_RUN
import org.opensearch.rest.RestStatus
import java.util.concurrent.Executors

@Suppress("UNCHECKED_CAST")
class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() {
Expand Down Expand Up @@ -155,21 +160,21 @@ class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() {
}

fun `test autocreate index for indexLRONConfig action`() {
wipeAllIndices()
removeControlCenterIndex()
val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))
var response = createLRONConfig(lronConfig)
assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus())
wipeAllIndices()
removeControlCenterIndex()
response = client().makeRequest(
"PUT",
getResourceURI(lronConfig.taskId, lronConfig.actionName),
lronConfig.toHttpEntity()
)
assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus())
wipeAllIndices()
}

fun `test mappings after LRONConfig creation`() {
removeControlCenterIndex()
val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))
createLRONConfig(lronConfig)

Expand All @@ -185,4 +190,29 @@ class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() {

assertEquals("Mappings are different", expectedMap, mappingsMap)
}

fun `test concurrent indexing requests auto create index and not throw exception`() {
removeControlCenterIndex()
val threadSize = 5
val lronConfigs = List(threadSize) { randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) }
val threadPool = Executors.newFixedThreadPool(threadSize)
try {
runBlocking {
val dispatcher = threadPool.asCoroutineDispatcher()
val responses = lronConfigs.map {
async(dispatcher) {
createLRONConfig(it)
}
}.awaitAll()
responses.forEach { assertEquals("Create LRONConfig failed", RestStatus.OK, it.restStatus()) }
}
} finally {
threadPool.shutdown()
}
val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI)
assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus())
val responseBody = response.asMap()
val totalNumber = responseBody["total_number"]
assertEquals("wrong LRONConfigs number", threadSize, totalNumber)
}
}

0 comments on commit e6699d3

Please sign in to comment.