From 16908ccdc6221444e84f8ac6e1bfff0d0162e8ad Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Fri, 14 Apr 2023 16:04:28 +0530 Subject: [PATCH] Handled batch requests for replication metadata update under cluster state (#772) (#778) Signed-off-by: Sai Kumar (cherry picked from commit c5d4cdc5332c0a0839f29ffdea4f2d52f2307f0d) Co-authored-by: Sai Kumar --- .../metadata/UpdateReplicationMetadata.kt | 23 +++--- .../state/UpdateReplicationMetadataTests.kt | 75 +++++++++++++++++++ 2 files changed, 89 insertions(+), 9 deletions(-) create mode 100644 src/test/kotlin/org/opensearch/replication/metadata/state/UpdateReplicationMetadataTests.kt diff --git a/src/main/kotlin/org/opensearch/replication/metadata/UpdateReplicationMetadata.kt b/src/main/kotlin/org/opensearch/replication/metadata/UpdateReplicationMetadata.kt index 856adbb3..54d4663e 100644 --- a/src/main/kotlin/org/opensearch/replication/metadata/UpdateReplicationMetadata.kt +++ b/src/main/kotlin/org/opensearch/replication/metadata/UpdateReplicationMetadata.kt @@ -49,29 +49,34 @@ class UpdateReplicationStateDetailsTaskExecutor private constructor() override fun execute(currentState: ClusterState, tasks: List) : ClusterStateTaskExecutor.ClusterTasksResult { - return getClusterStateUpdateTaskResult(tasks[0], currentState) + log.debug("Executing replication state update for $tasks") + return getClusterStateUpdateTaskResult(tasks, currentState) } - private fun getClusterStateUpdateTaskResult(request: UpdateReplicationStateDetailsRequest, + private fun getClusterStateUpdateTaskResult(requests: List, currentState: ClusterState) : ClusterStateTaskExecutor.ClusterTasksResult { val currentMetadata = currentState.metadata().custom(ReplicationStateMetadata.NAME) ?: ReplicationStateMetadata.EMPTY - val newMetadata = getUpdatedReplicationMetadata(request, currentMetadata) - if (currentMetadata == newMetadata) { - return getStateUpdateTaskResultForClusterState(request, currentState) // no change + var updatedMetadata = currentMetadata + // compute metadata update for the batched requests + for(request in requests) { + updatedMetadata = getUpdatedReplicationMetadata(request, updatedMetadata) + } + if (currentMetadata == updatedMetadata) { + return getStateUpdateTaskResultForClusterState(requests, currentState) // no change } else { val mdBuilder = Metadata.builder(currentState.metadata) - .putCustom(ReplicationStateMetadata.NAME, newMetadata) + .putCustom(ReplicationStateMetadata.NAME, updatedMetadata) val newClusterState = ClusterState.Builder(currentState).metadata(mdBuilder).build() - return getStateUpdateTaskResultForClusterState(request, newClusterState) + return getStateUpdateTaskResultForClusterState(requests, newClusterState) } } - private fun getStateUpdateTaskResultForClusterState(request: UpdateReplicationStateDetailsRequest, + private fun getStateUpdateTaskResultForClusterState(requests: List, clusterState: ClusterState) : ClusterStateTaskExecutor.ClusterTasksResult { return ClusterStateTaskExecutor.ClusterTasksResult.builder() - .success(request).build(clusterState) + .successes(requests).build(clusterState) } private fun getUpdatedReplicationMetadata(request: UpdateReplicationStateDetailsRequest, diff --git a/src/test/kotlin/org/opensearch/replication/metadata/state/UpdateReplicationMetadataTests.kt b/src/test/kotlin/org/opensearch/replication/metadata/state/UpdateReplicationMetadataTests.kt new file mode 100644 index 00000000..a3a68430 --- /dev/null +++ b/src/test/kotlin/org/opensearch/replication/metadata/state/UpdateReplicationMetadataTests.kt @@ -0,0 +1,75 @@ +package org.opensearch.replication.metadata.state + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope +import org.junit.Assert +import org.opensearch.cluster.ClusterState +import org.opensearch.replication.action.replicationstatedetails.UpdateReplicationStateDetailsRequest +import org.opensearch.replication.metadata.ReplicationOverallState +import org.opensearch.replication.metadata.UpdateReplicationStateDetailsTaskExecutor +import org.opensearch.test.ClusterServiceUtils +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.threadpool.TestThreadPool + +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +class UpdateReplicationMetadataTests : OpenSearchTestCase() { + + var threadPool = TestThreadPool("ReplicationPluginTest") + var clusterService = ClusterServiceUtils.createClusterService(threadPool) + + fun `test single task update`() { + val currentState: ClusterState = clusterService.state() + // single task + val tasks = arrayListOf(UpdateReplicationStateDetailsRequest("test-index", + hashMapOf("REPLICATION_LAST_KNOWN_OVERALL_STATE" to "RUNNING"), UpdateReplicationStateDetailsRequest.UpdateType.ADD)) + val tasksResult = UpdateReplicationStateDetailsTaskExecutor.INSTANCE.execute(currentState, tasks) + + val updatedReplicationDetails = tasksResult.resultingState?.metadata + ?.custom(ReplicationStateMetadata.NAME)?.replicationDetails + + Assert.assertNotNull(updatedReplicationDetails) + Assert.assertNotNull(updatedReplicationDetails?.get("test-index")) + val replicationStateParams = updatedReplicationDetails?.get("test-index") + + Assert.assertEquals(ReplicationOverallState.RUNNING.name, replicationStateParams?.get(REPLICATION_LAST_KNOWN_OVERALL_STATE)) + } + + fun `test multiple tasks to add replication metadata`() { + val currentState: ClusterState = clusterService.state() + // multiple tasks + val tasks = arrayListOf(UpdateReplicationStateDetailsRequest("test-index-1", + hashMapOf("REPLICATION_LAST_KNOWN_OVERALL_STATE" to "RUNNING"), UpdateReplicationStateDetailsRequest.UpdateType.ADD), + UpdateReplicationStateDetailsRequest("test-index-2", + hashMapOf("REPLICATION_LAST_KNOWN_OVERALL_STATE" to "RUNNING"), UpdateReplicationStateDetailsRequest.UpdateType.ADD)) + val tasksResult = UpdateReplicationStateDetailsTaskExecutor.INSTANCE.execute(currentState, tasks) + + val updatedReplicationDetails = tasksResult.resultingState?.metadata + ?.custom(ReplicationStateMetadata.NAME)?.replicationDetails + + Assert.assertNotNull(updatedReplicationDetails) + Assert.assertNotNull(updatedReplicationDetails?.get("test-index-1")) + var replicationStateParams = updatedReplicationDetails?.get("test-index-1") + Assert.assertEquals(ReplicationOverallState.RUNNING.name, replicationStateParams?.get(REPLICATION_LAST_KNOWN_OVERALL_STATE)) + Assert.assertNotNull(updatedReplicationDetails?.get("test-index-2")) + replicationStateParams = updatedReplicationDetails?.get("test-index-2") + Assert.assertEquals(ReplicationOverallState.RUNNING.name, replicationStateParams?.get(REPLICATION_LAST_KNOWN_OVERALL_STATE)) + } + + fun `test multiple tasks to add and delete replication metadata`() { + val currentState: ClusterState = clusterService.state() + // multiple tasks + val tasks = arrayListOf(UpdateReplicationStateDetailsRequest("test-index-1", + hashMapOf("REPLICATION_LAST_KNOWN_OVERALL_STATE" to "RUNNING"), UpdateReplicationStateDetailsRequest.UpdateType.ADD), + UpdateReplicationStateDetailsRequest("test-index-2", + hashMapOf("REPLICATION_LAST_KNOWN_OVERALL_STATE" to "RUNNING"), UpdateReplicationStateDetailsRequest.UpdateType.REMOVE)) + val tasksResult = UpdateReplicationStateDetailsTaskExecutor.INSTANCE.execute(currentState, tasks) + + val updatedReplicationDetails = tasksResult.resultingState?.metadata + ?.custom(ReplicationStateMetadata.NAME)?.replicationDetails + + Assert.assertNotNull(updatedReplicationDetails) + Assert.assertNotNull(updatedReplicationDetails?.get("test-index-1")) + var replicationStateParams = updatedReplicationDetails?.get("test-index-1") + Assert.assertEquals(ReplicationOverallState.RUNNING.name, replicationStateParams?.get(REPLICATION_LAST_KNOWN_OVERALL_STATE)) + Assert.assertNull(updatedReplicationDetails?.get("test-index-2")) + } +} \ No newline at end of file