Skip to content

Commit

Permalink
Handled batch requests for replication metadata update under cluster …
Browse files Browse the repository at this point in the history
…state (#772) (#778)

Signed-off-by: Sai Kumar <[email protected]>
(cherry picked from commit c5d4cdc)

Co-authored-by: Sai Kumar <[email protected]>
  • Loading branch information
1 parent f265458 commit 16908cc
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,34 @@ class UpdateReplicationStateDetailsTaskExecutor private constructor()

override fun execute(currentState: ClusterState, tasks: List<UpdateReplicationStateDetailsRequest>)
: ClusterStateTaskExecutor.ClusterTasksResult<UpdateReplicationStateDetailsRequest> {
return getClusterStateUpdateTaskResult(tasks[0], currentState)
log.debug("Executing replication state update for $tasks")
return getClusterStateUpdateTaskResult(tasks, currentState)
}

private fun getClusterStateUpdateTaskResult(request: UpdateReplicationStateDetailsRequest,
private fun getClusterStateUpdateTaskResult(requests: List<UpdateReplicationStateDetailsRequest>,
currentState: ClusterState)
: ClusterStateTaskExecutor.ClusterTasksResult<UpdateReplicationStateDetailsRequest> {
val currentMetadata = currentState.metadata().custom(ReplicationStateMetadata.NAME) ?: ReplicationStateMetadata.EMPTY
val newMetadata = getUpdatedReplicationMetadata(request, currentMetadata)
if (currentMetadata == newMetadata) {
return getStateUpdateTaskResultForClusterState(request, currentState) // no change
var updatedMetadata = currentMetadata
// compute metadata update for the batched requests
for(request in requests) {
updatedMetadata = getUpdatedReplicationMetadata(request, updatedMetadata)
}
if (currentMetadata == updatedMetadata) {
return getStateUpdateTaskResultForClusterState(requests, currentState) // no change
} else {
val mdBuilder = Metadata.builder(currentState.metadata)
.putCustom(ReplicationStateMetadata.NAME, newMetadata)
.putCustom(ReplicationStateMetadata.NAME, updatedMetadata)
val newClusterState = ClusterState.Builder(currentState).metadata(mdBuilder).build()
return getStateUpdateTaskResultForClusterState(request, newClusterState)
return getStateUpdateTaskResultForClusterState(requests, newClusterState)
}
}

private fun getStateUpdateTaskResultForClusterState(request: UpdateReplicationStateDetailsRequest,
private fun getStateUpdateTaskResultForClusterState(requests: List<UpdateReplicationStateDetailsRequest>,
clusterState: ClusterState)
: ClusterStateTaskExecutor.ClusterTasksResult<UpdateReplicationStateDetailsRequest> {
return ClusterStateTaskExecutor.ClusterTasksResult.builder<UpdateReplicationStateDetailsRequest>()
.success(request).build(clusterState)
.successes(requests).build(clusterState)
}

private fun getUpdatedReplicationMetadata(request: UpdateReplicationStateDetailsRequest,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.opensearch.replication.metadata.state

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope
import org.junit.Assert
import org.opensearch.cluster.ClusterState
import org.opensearch.replication.action.replicationstatedetails.UpdateReplicationStateDetailsRequest
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.UpdateReplicationStateDetailsTaskExecutor
import org.opensearch.test.ClusterServiceUtils
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.threadpool.TestThreadPool

@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
class UpdateReplicationMetadataTests : OpenSearchTestCase() {

var threadPool = TestThreadPool("ReplicationPluginTest")
var clusterService = ClusterServiceUtils.createClusterService(threadPool)

fun `test single task update`() {
val currentState: ClusterState = clusterService.state()
// single task
val tasks = arrayListOf(UpdateReplicationStateDetailsRequest("test-index",
hashMapOf("REPLICATION_LAST_KNOWN_OVERALL_STATE" to "RUNNING"), UpdateReplicationStateDetailsRequest.UpdateType.ADD))
val tasksResult = UpdateReplicationStateDetailsTaskExecutor.INSTANCE.execute(currentState, tasks)

val updatedReplicationDetails = tasksResult.resultingState?.metadata
?.custom<ReplicationStateMetadata>(ReplicationStateMetadata.NAME)?.replicationDetails

Assert.assertNotNull(updatedReplicationDetails)
Assert.assertNotNull(updatedReplicationDetails?.get("test-index"))
val replicationStateParams = updatedReplicationDetails?.get("test-index")

Assert.assertEquals(ReplicationOverallState.RUNNING.name, replicationStateParams?.get(REPLICATION_LAST_KNOWN_OVERALL_STATE))
}

fun `test multiple tasks to add replication metadata`() {
val currentState: ClusterState = clusterService.state()
// multiple tasks
val tasks = arrayListOf(UpdateReplicationStateDetailsRequest("test-index-1",
hashMapOf("REPLICATION_LAST_KNOWN_OVERALL_STATE" to "RUNNING"), UpdateReplicationStateDetailsRequest.UpdateType.ADD),
UpdateReplicationStateDetailsRequest("test-index-2",
hashMapOf("REPLICATION_LAST_KNOWN_OVERALL_STATE" to "RUNNING"), UpdateReplicationStateDetailsRequest.UpdateType.ADD))
val tasksResult = UpdateReplicationStateDetailsTaskExecutor.INSTANCE.execute(currentState, tasks)

val updatedReplicationDetails = tasksResult.resultingState?.metadata
?.custom<ReplicationStateMetadata>(ReplicationStateMetadata.NAME)?.replicationDetails

Assert.assertNotNull(updatedReplicationDetails)
Assert.assertNotNull(updatedReplicationDetails?.get("test-index-1"))
var replicationStateParams = updatedReplicationDetails?.get("test-index-1")
Assert.assertEquals(ReplicationOverallState.RUNNING.name, replicationStateParams?.get(REPLICATION_LAST_KNOWN_OVERALL_STATE))
Assert.assertNotNull(updatedReplicationDetails?.get("test-index-2"))
replicationStateParams = updatedReplicationDetails?.get("test-index-2")
Assert.assertEquals(ReplicationOverallState.RUNNING.name, replicationStateParams?.get(REPLICATION_LAST_KNOWN_OVERALL_STATE))
}

fun `test multiple tasks to add and delete replication metadata`() {
val currentState: ClusterState = clusterService.state()
// multiple tasks
val tasks = arrayListOf(UpdateReplicationStateDetailsRequest("test-index-1",
hashMapOf("REPLICATION_LAST_KNOWN_OVERALL_STATE" to "RUNNING"), UpdateReplicationStateDetailsRequest.UpdateType.ADD),
UpdateReplicationStateDetailsRequest("test-index-2",
hashMapOf("REPLICATION_LAST_KNOWN_OVERALL_STATE" to "RUNNING"), UpdateReplicationStateDetailsRequest.UpdateType.REMOVE))
val tasksResult = UpdateReplicationStateDetailsTaskExecutor.INSTANCE.execute(currentState, tasks)

val updatedReplicationDetails = tasksResult.resultingState?.metadata
?.custom<ReplicationStateMetadata>(ReplicationStateMetadata.NAME)?.replicationDetails

Assert.assertNotNull(updatedReplicationDetails)
Assert.assertNotNull(updatedReplicationDetails?.get("test-index-1"))
var replicationStateParams = updatedReplicationDetails?.get("test-index-1")
Assert.assertEquals(ReplicationOverallState.RUNNING.name, replicationStateParams?.get(REPLICATION_LAST_KNOWN_OVERALL_STATE))
Assert.assertNull(updatedReplicationDetails?.get("test-index-2"))
}
}

0 comments on commit 16908cc

Please sign in to comment.