Skip to content

Commit

Permalink
Feature/index operation notification (#712)
Browse files Browse the repository at this point in the history
* Feature/index operation notification (#707)

* task completion listener for long running task

Signed-off-by: Hailong Cui <[email protected]>

* task completion listener for long running task

Signed-off-by: Hailong Cui <[email protected]>

* remove unused import

Signed-off-by: Hailong Cui <[email protected]>

---------

Signed-off-by: Hailong Cui <[email protected]>

* index action, no validation&test

Signed-off-by: zhichao-aws <[email protected]>

get action. no validation&test

Signed-off-by: zhichao-aws <[email protected]>

index and get action (draft)

changes for rebase main

Signed-off-by: zhichao-aws <[email protected]>

index and get, no validation&test

Signed-off-by: zhichao-aws <[email protected]>

* refactor on package name

Signed-off-by: Hailong Cui <[email protected]>

* +delete, change index

Signed-off-by: zhichao-aws <[email protected]>

* Implement notification with ActionFilter

Signed-off-by: Hailong Cui <[email protected]>

* Query notification channel

Signed-off-by: Hailong Cui <[email protected]>

* Notification message update

Signed-off-by: Hailong Cui <[email protected]>

* new version of crud

Signed-off-by: zhichao-aws <[email protected]>

* validate task id and action name; unitfy taskId

Signed-off-by: zhichao-aws <[email protected]>

* Add unit test case

Signed-off-by: Hailong Cui <[email protected]>

* refactor on response parser

Signed-off-by: Hailong Cui <[email protected]>

* fix unit test case

Signed-off-by: Hailong Cui <[email protected]>

* modify code structure, change ID to Id

Signed-off-by: zhichao-aws <[email protected]>

* add some test, delete message template

Signed-off-by: zhichao-aws <[email protected]>

* Add integration test

Signed-off-by: Hailong Cui <[email protected]>

* Add integration test for action listener

Signed-off-by: Hailong Cui <[email protected]>

* remove refresh policy in request

Signed-off-by: zhichao-aws <[email protected]>

* add some tests, fix actionfilter refresh parameter

Signed-off-by: zhichao-aws <[email protected]>

* Remove duplicate errors for reindex

Signed-off-by: Hailong Cui <[email protected]>

* add UT and IT

Signed-off-by: zhichao-aws <[email protected]>

* fix some code style

Signed-off-by: zhichao-aws <[email protected]>

* enable com.sun.net.httpserver.* for test

Signed-off-by: Hailong Cui <[email protected]>

* change index name, fix bug

Signed-off-by: zhichao-aws <[email protected]>

* modifications after meeting

Signed-off-by: zhichao-aws <[email protected]>

* don't continue wait for shard to be started for async mode

Signed-off-by: Hailong Cui <[email protected]>

* handle IndexNotFoundException for sending notification

Signed-off-by: Hailong Cui <[email protected]>

* add security test case

Signed-off-by: zhichao-aws <[email protected]>

* add security test case

Signed-off-by: zhichao-aws <[email protected]>

* refactor on file names

Signed-off-by: Hailong Cui <[email protected]>

* add success/failure condition

Signed-off-by: zhichao-aws <[email protected]>

* filter channels by conditions

Signed-off-by: Hailong Cui <[email protected]>

* Remove unused field enable for index .opensearch-admin-panel

Signed-off-by: Hailong Cui <[email protected]>

* add more integration testcase

Signed-off-by: Hailong Cui <[email protected]>

* change admin panel to control center

Signed-off-by: zhichao-aws <[email protected]>

* Update mock server address to localhost for docker env

Signed-off-by: Hailong Cui <[email protected]>

* remove unused field in response

Signed-off-by: zhichao-aws <[email protected]>

* update integ test

Signed-off-by: Hailong Cui <[email protected]>

* remove default for all

Signed-off-by: zhichao-aws <[email protected]>

* Remove default for all

Signed-off-by: Hailong Cui <[email protected]>

* remove filter by backend roles

Signed-off-by: zhichao-aws <[email protected]>

* fix build

Signed-off-by: zhichao-aws <[email protected]>

* optimize constructor code

Signed-off-by: zhichao-aws <[email protected]>

* add request doc id validation

Signed-off-by: zhichao-aws <[email protected]>

* merge get and search

Signed-off-by: zhichao-aws <[email protected]>

* remove get before delete

Signed-off-by: zhichao-aws <[email protected]>

* add node id validation

Signed-off-by: zhichao-aws <[email protected]>

* auto create doc for update

Signed-off-by: zhichao-aws <[email protected]>

* fix test case

Signed-off-by: zhichao-aws <[email protected]>

* delete restUpdateAction, fix test

Signed-off-by: zhichao-aws <[email protected]>

* add dry run option

Signed-off-by: zhichao-aws <[email protected]>

* fix bug, add test case

Signed-off-by: zhichao-aws <[email protected]>

* add xcontent tests

Signed-off-by: zhichao-aws <[email protected]>

* notification crud api

Signed-off-by: zhichao-aws <[email protected]>

* changes based on comments; add tests

Signed-off-by: zhichao-aws <[email protected]>

* changes for comment

Signed-off-by: zhichao-aws <[email protected]>

* change behavior for get action index not exists, fix test cases

Signed-off-by: zhichao-aws <[email protected]>

* fix merge

Signed-off-by: zhichao-aws <[email protected]>

* delay 5s for sending runtime notification

Signed-off-by: Hailong Cui <[email protected]>

* update log message

Signed-off-by: Hailong Cui <[email protected]>

* delay 5s for runtime notification policy

Signed-off-by: Hailong Cui <[email protected]>

* Update notification message

Signed-off-by: Hailong Cui <[email protected]>

* filter duplicate channel

Signed-off-by: Hailong Cui <[email protected]>

* exception handling

Signed-off-by: Hailong Cui <[email protected]>

* Add cluster name into title

Signed-off-by: Hailong Cui <[email protected]>

* fix UT for failure messagee

Signed-off-by: Hailong Cui <[email protected]>

* include cluster name into notification messag

Signed-off-by: Hailong Cui <[email protected]>

* fix UT failures

Signed-off-by: Hailong Cui <[email protected]>

* fix detekt issue

Signed-off-by: Hailong Cui <[email protected]>

* fix NPE

Signed-off-by: Hailong Cui <[email protected]>

* force merge message

Signed-off-by: Hailong Cui <[email protected]>

* remove duplicate error of reindex

Signed-off-by: Hailong Cui <[email protected]>

* notification message rewording

Signed-off-by: Hailong Cui <[email protected]>

* notification message wording change and unit test case

Signed-off-by: Hailong Cui <[email protected]>

* update block write wording

Signed-off-by: Hailong Cui <[email protected]>

* Add more unit test case

Signed-off-by: Hailong Cui <[email protected]>

* Add more logs

Signed-off-by: Hailong Cui <[email protected]>

---------

Signed-off-by: Hailong Cui <[email protected]>
Signed-off-by: zhichao-aws <[email protected]>
Co-authored-by: zhichao-aws <[email protected]>
  • Loading branch information
Hailong-am and zhichao-aws authored May 26, 2023
1 parent 96f7380 commit 89bd6de
Show file tree
Hide file tree
Showing 20 changed files with 2,291 additions and 1 deletion.
11 changes: 11 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ configurations.testImplementation {
exclude module: "securemock"
}

tasks.named('forbiddenApisTest').configure {
//we are using jdk-internal instead of jdk-non-portable to allow for com.sun.net.httpserver.* usage
bundledSignatures -= 'jdk-non-portable'
bundledSignatures += 'jdk-internal'
}

ext {
projectSubstitutions = [:]
}
Expand Down Expand Up @@ -567,6 +573,11 @@ integTest {
if (System.getProperty("tests.clustername") != null) {
exclude 'org/opensearch/indexmanagement/indexstatemanagement/MetadataRegressionIT.class'
}

// remove from running with remote cluster
if (usingRemoteCluster) {
exclude 'org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListenerIT.class'
}
}

task integTestRemote(type: RestIntegTestTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.action.support.ActionFilter
import org.opensearch.action.support.ActiveShardsObserver
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.node.DiscoveryNodes
Expand Down Expand Up @@ -36,6 +37,7 @@ import org.opensearch.indexmanagement.controlcenter.notification.action.get.GetL
import org.opensearch.indexmanagement.controlcenter.notification.action.get.TransportGetLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.action.index.IndexLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.action.index.TransportIndexLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.filter.IndexOperationActionFilter
import org.opensearch.indexmanagement.controlcenter.notification.resthandler.RestDeleteLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.resthandler.RestGetLRONConfigAction
import org.opensearch.indexmanagement.controlcenter.notification.resthandler.RestIndexLRONConfigAction
Expand Down Expand Up @@ -207,6 +209,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
private var customIndexUUIDSetting: String? = null
private val extensions = mutableSetOf<String>()
private val extensionCheckerMap = mutableMapOf<String, StatusChecker>()
lateinit var indexOperationActionFilter: IndexOperationActionFilter

companion object {
const val PLUGINS_BASE_URI = "/_plugins"
Expand Down Expand Up @@ -456,6 +459,12 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

val pluginVersionSweepCoordinator = PluginVersionSweepCoordinator(skipFlag, settings, threadPool, clusterService)

indexOperationActionFilter = IndexOperationActionFilter(
client, clusterService,
ActiveShardsObserver(clusterService, client.threadPool()),
indexNameExpressionResolver,
)

return listOf(
managedIndexRunner,
rollupRunner,
Expand Down Expand Up @@ -597,7 +606,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
}

override fun getActionFilters(): List<ActionFilter> {
return listOf(fieldCapsFilter)
return listOf(fieldCapsFilter, indexOperationActionFilter)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import java.io.IOException
const val DEFAULT_PAGINATION_SIZE = 20
const val DEFAULT_PAGINATION_FROM = 0
const val DEFAULT_SORT_ORDER = "asc"
const val SORT_ORDER_DESC = "desc"
const val DEFAULT_QUERY_STRING = "*"

data class SearchParams(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.controlcenter.notification.filter

import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.action.support.ActionFilter
import org.opensearch.action.support.ActionFilterChain
import org.opensearch.action.support.ActiveShardsObserver
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.controlcenter.notification.util.supportedActions
import org.opensearch.tasks.Task
import org.opensearch.tasks.TaskId

class IndexOperationActionFilter(
val client: Client,
val clusterService: ClusterService,
val activeShardsObserver: ActiveShardsObserver,
val indexNameExpressionResolver: IndexNameExpressionResolver
) : ActionFilter {

private val logger = LogManager.getLogger(IndexOperationActionFilter::class.java)

override fun order() = Integer.MAX_VALUE
override fun <Request : ActionRequest, Response : ActionResponse> apply(
task: Task,
action: String,
request: Request,
listener: ActionListener<Response>,
chain: ActionFilterChain<Request, Response>
) {
chain.proceed(task, action, request, wrapActionListener(task, action, request, listener))
}

fun <Request : ActionRequest, Response : ActionResponse> wrapActionListener(
task: Task,
action: String,
request: Request,
listener: ActionListener<Response>,
): ActionListener<Response> {
var wrappedListener = listener
if (supportedActions.contains(action)) {
if (task.parentTaskId.isSet == false) {
val taskId = TaskId(clusterService.localNode().id, task.id)
logger.info("Add notification action listener for tasks: {} and action: {} ", taskId.toString(), action)
wrappedListener = NotificationActionListener(
delegate = listener,
client = client,
action = action,
clusterService = clusterService,
task = task,
request = request,
activeShardsObserver = activeShardsObserver,
indexNameExpressionResolver = indexNameExpressionResolver
)
}
}
return wrappedListener
}
}
Loading

0 comments on commit 89bd6de

Please sign in to comment.