Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding UTs for IndexReplicationTask #109

Merged
merged 1 commit into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ buildscript {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlin_version}"
classpath "org.jetbrains.kotlin:kotlin-allopen:${kotlin_version}"
classpath "io.gitlab.arturbosch.detekt:detekt-gradle-plugin:1.0.0-RC15"
classpath "org.jacoco:org.jacoco.agent:0.8.5"
//classpath "org.jacoco:org.jacoco.agent:0.8.5"
}
}

Expand All @@ -54,7 +54,7 @@ plugins {
}

apply plugin: 'java'
apply plugin: 'jacoco'
//apply plugin: 'jacoco'
apply plugin: 'idea'
apply plugin: 'opensearch.opensearchplugin'
apply plugin: 'opensearch.testclusters'
Expand Down Expand Up @@ -98,6 +98,7 @@ dependencies {
testImplementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearch_version}"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.5"
testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"
}

repositories {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ import org.opensearch.common.unit.TimeValue
import org.opensearch.common.util.concurrent.OpenSearchExecutors
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.commons.utils.OpenForTesting
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.index.IndexModule
Expand Down Expand Up @@ -130,6 +131,7 @@ import org.opensearch.watcher.ResourceWatcherService
import java.util.Optional
import java.util.function.Supplier

@OpenForTesting
internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, RepositoryPlugin, EnginePlugin {

private lateinit var client: Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.unit.ByteSizeValue
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.utils.OpenForTesting

class ReplicationSettings(clusterService: ClusterService) {
//ToDo : Make OpenForTesting work
@OpenForTesting
open class ReplicationSettings(clusterService: ClusterService) {

@Volatile var chunkSize = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE.get(clusterService.settings)
@Volatile var concurrentFileChunks = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS.get(clusterService.settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Singleton
import org.opensearch.common.settings.Settings
import org.opensearch.commons.utils.OpenForTesting

@Singleton
class ReplicationMetadataManager constructor(private val clusterService: ClusterService,
@OpenForTesting
//ToDo : Debug why OpenForTesting is not working properly
open class ReplicationMetadataManager constructor(private val clusterService: ClusterService,
private val client: Client,
private val replicaionMetadataStore: ReplicationMetadataStore) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin
/**
* Sets the security context
*/
protected open suspend fun setReplicationMetadata() {
open suspend fun setReplicationMetadata() {
replicationMetadata = if(this is AutoFollowTask) {
replicationMetadataManager.getAutofollowMetadata(followerIndexName, leaderAlias, fetch_from_primary = true)
}
Expand All @@ -185,6 +185,12 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin
}
}

//used only in testing
open suspend fun setReplicationMetadata(rm :ReplicationMetadata) {
replicationMetadata = rm

}

open class CrossClusterReplicationTaskResponse(val status: String): ActionResponse(), ToXContentObject {
override fun writeTo(out: StreamOutput) {
out.writeString(status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.replication.util.persistentTasksService
import org.apache.logging.log4j.LogManager
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.ClusterStateObserver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.SettingsModule
import org.opensearch.persistent.AllocatedPersistentTask
Expand Down Expand Up @@ -64,9 +65,10 @@ class IndexReplicationExecutor(executor: String, private val clusterService: Clu
override fun createTask(id: Long, type: String, action: String, parentTaskId: TaskId,
taskInProgress: PersistentTask<IndexReplicationParams>,
headers: MutableMap<String, String>?): AllocatedPersistentTask {
val cso = ClusterStateObserver(clusterService, log, threadPool.threadContext)
return IndexReplicationTask(id, type, action, getDescription(taskInProgress), parentTaskId,
executor, clusterService, threadPool, client, requireNotNull(taskInProgress.params),
persistentTasksService, replicationMetadataManager, replicationSettings, settingsModule)
persistentTasksService, replicationMetadataManager, replicationSettings, settingsModule, cso)
}

override fun getDescription(taskInProgress: PersistentTask<IndexReplicationParams>): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import org.opensearch.persistent.PersistentTasksCustomMetadata.PersistentTask
import org.opensearch.persistent.PersistentTasksNodeService
import org.opensearch.persistent.PersistentTasksService
import org.opensearch.tasks.TaskId
import org.opensearch.tasks.TaskManager
import org.opensearch.threadpool.ThreadPool
import java.util.function.Predicate
import java.util.stream.Collectors
Expand All @@ -94,7 +95,7 @@ import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.streams.toList

class IndexReplicationTask(id: Long, type: String, action: String, description: String,
open class IndexReplicationTask(id: Long, type: String, action: String, description: String,
parentTask: TaskId,
executor: String,
clusterService: ClusterService,
Expand All @@ -104,7 +105,8 @@ class IndexReplicationTask(id: Long, type: String, action: String, description:
private val persistentTasksService: PersistentTasksService,
replicationMetadataManager: ReplicationMetadataManager,
replicationSettings: ReplicationSettings,
val settingsModule: SettingsModule)
val settingsModule: SettingsModule,
val cso: ClusterStateObserver)
: CrossClusterReplicationTask(id, type, action, description, parentTask, emptyMap(), executor,
clusterService, threadPool, client, replicationMetadataManager, replicationSettings), ClusterStateListener
{
Expand All @@ -118,7 +120,6 @@ class IndexReplicationTask(id: Long, type: String, action: String, description:
override val followerIndexName = params.followerIndexName

override val log = Loggers.getLogger(javaClass, Index(params.followerIndexName, ClusterState.UNKNOWN_UUID))
private val cso = ClusterStateObserver(clusterService, log, threadPool.threadContext)
private val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), remoteClient)

private var shouldCallEvalMonitoring = true
Expand All @@ -144,12 +145,17 @@ class IndexReplicationTask(id: Long, type: String, action: String, description:

const val SLEEP_TIME_BETWEEN_POLL_MS = 5000L
const val TASK_CANCELLATION_REASON = "Index replication task was cancelled by user"

}

//only for testing
fun setPersistent(taskManager: TaskManager) {
super.init(persistentTasksService, taskManager, "persistentTaskId", allocationId)
}

override fun indicesOrShards(): List<Any> = listOf(followerIndexName)

override suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) {
public override suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) {
checkNotNull(initialState) { "Missing initial state" }
followingTaskState = FollowingState(emptyMap())
currentTaskState = initialState as IndexReplicationState
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.replication.task.index

import org.opensearch.replication.ReplicationPlugin
import org.opensearch.replication.ReplicationSettings
import org.opensearch.replication.action.index.block.UpdateIndexBlockAction
import org.opensearch.replication.metadata.ReplicationMetadataManager
import org.opensearch.replication.metadata.store.ReplicationContext
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.repository.REMOTE_REPOSITORY_PREFIX
import org.opensearch.replication.task.shard.ShardReplicationExecutor
import org.opensearch.replication.task.shard.ShardReplicationParams
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.spy
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.opensearch.Version
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.action.ActionType
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse
import org.opensearch.action.admin.indices.recovery.RecoveryAction
import org.opensearch.action.admin.indices.recovery.RecoveryResponse
import org.opensearch.action.admin.indices.settings.get.GetSettingsAction
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsAction
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.ClusterStateObserver
import org.opensearch.cluster.RestoreInProgress
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.cluster.routing.RoutingTable
import org.opensearch.common.settings.Settings
import org.opensearch.common.settings.SettingsModule
import org.opensearch.common.unit.TimeValue
import org.opensearch.index.Index
import org.opensearch.index.shard.ShardId
import org.opensearch.persistent.PersistentTaskParams
import org.opensearch.persistent.PersistentTasksCustomMetadata
import org.opensearch.persistent.PersistentTasksService
import org.opensearch.snapshots.Snapshot
import org.opensearch.snapshots.SnapshotId
import org.opensearch.tasks.TaskId.EMPTY_TASK_ID
import org.opensearch.tasks.TaskManager
import org.opensearch.test.ClusterServiceUtils
import org.opensearch.test.ClusterServiceUtils.setState
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.OpenSearchTestCase.assertBusy
import org.opensearch.threadpool.TestThreadPool
import org.junit.Test
import org.mockito.Mockito
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.store.ReplicationMetadataStore
import org.opensearch.replication.metadata.store.ReplicationMetadataStore.Companion.REPLICATION_CONFIG_SYSTEM_INDEX
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import java.util.*
import java.util.concurrent.TimeUnit

@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
class IndexReplicationTaskTests : OpenSearchTestCase() {

companion object {
var currentTaskState :IndexReplicationState = InitialState
var stateChanges :Int = 0
var restoreNotNull = false

var followerIndex = "follower-index"
var connectionName = "leader-cluster"
var remoteCluster = "remote-cluster"
}

var threadPool = TestThreadPool("ReplicationPluginTest")
var clusterService = ClusterServiceUtils.createClusterService(threadPool)

@Test
fun testExecute() = runBlocking {
val replicationTask: IndexReplicationTask = spy(createIndexReplicationTask())
var taskManager = Mockito.mock(TaskManager::class.java)
replicationTask.setPersistent(taskManager)
var rc = ReplicationContext(followerIndex)
var rm = ReplicationMetadata(connectionName, ReplicationStoreMetadataType.INDEX.name, ReplicationOverallState.RUNNING.name, "reason", rc, rc, Settings.EMPTY)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: We can probably move the logic to create ReplicationMetadata, ClusterState, etc objects to a helper utility

replicationTask.setReplicationMetadata(rm)

//Update ClusterState to say restore started
val state: ClusterState = clusterService.state()

var newClusterState: ClusterState

// Updating cluster state
var builder: ClusterState.Builder
val indices: MutableList<String> = ArrayList()
indices.add(followerIndex)
val snapshot = Snapshot("$REMOTE_REPOSITORY_PREFIX$connectionName", SnapshotId("randomAlphaOfLength", "randomAlphaOfLength"))
val restoreEntry = RestoreInProgress.Entry("restoreUUID", snapshot, RestoreInProgress.State.INIT, Collections.unmodifiableList(ArrayList<String>(indices)),
null)

// Update metadata store index as well
var metaBuilder = Metadata.builder()
metaBuilder.put(IndexMetadata.builder(REPLICATION_CONFIG_SYSTEM_INDEX).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
var metadata = metaBuilder.build()
var routingTableBuilder = RoutingTable.builder()
routingTableBuilder.addAsNew(metadata.index(REPLICATION_CONFIG_SYSTEM_INDEX))
var routingTable = routingTableBuilder.build()

builder = ClusterState.builder(state).routingTable(routingTable)
builder.putCustom(RestoreInProgress.TYPE, RestoreInProgress.Builder(
state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).add(restoreEntry).build())

newClusterState = builder.build()
setState(clusterService, newClusterState)

val job = this.launch{
replicationTask.execute(this, InitialState)
}

// Delay to let task execute
delay(1000)
Comment on lines +135 to +136
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit - yield is a better way to yield back instead of delay.


// Assert we move to RESTORING .. This is blocking and won't let the test run
assertBusy({
assertThat(currentTaskState == RestoreState).isTrue()
}, 1, TimeUnit.SECONDS)


//Complete the Restore
metaBuilder = Metadata.builder()
metaBuilder.put(IndexMetadata.builder(followerIndex).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
metadata = metaBuilder.build()
routingTableBuilder = RoutingTable.builder()
routingTableBuilder.addAsNew(metadata.index(followerIndex))
routingTable = routingTableBuilder.build()

builder = ClusterState.builder(state).routingTable(routingTable)
builder.putCustom(RestoreInProgress.TYPE, RestoreInProgress.Builder(
state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).build())

newClusterState = builder.build()
setState(clusterService, newClusterState)

delay(1000)

assertBusy {
assertThat(currentTaskState == MonitoringState).isTrue()
}

job.cancel()

}

private fun createIndexReplicationTask() : IndexReplicationTask {
var threadPool = TestThreadPool("IndexReplicationTask")
//Hack Alert : Though it is meant to force rejection , this is to make overallTaskScope not null
threadPool.startForcingRejections()
clusterService = ClusterServiceUtils.createClusterService(threadPool)
val settingsModule = Mockito.mock(SettingsModule::class.java)
val spyClient = Mockito.spy<NoOpClient>(NoOpClient("testName"))

val replicationMetadataManager = ReplicationMetadataManager(clusterService, spyClient,
ReplicationMetadataStore(spyClient, clusterService, NamedXContentRegistry.EMPTY))
var persist = PersistentTasksService(clusterService, threadPool, spyClient)
val state: ClusterState = clusterService.state()
val tasks = PersistentTasksCustomMetadata.builder()
var sId = ShardId(Index(followerIndex, "_na_"), 0)
tasks.addTask<PersistentTaskParams>( "replication:0", ShardReplicationExecutor.TASK_NAME, ShardReplicationParams("remoteCluster", sId, sId),
PersistentTasksCustomMetadata.Assignment("other_node_", "test assignment on other node"))

val metadata = Metadata.builder(state.metadata())
metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build())
val newClusterState: ClusterState = ClusterState.builder(state).metadata(metadata).build()

setState(clusterService, newClusterState)

doAnswer{ invocation -> spyClient }.`when`(spyClient).getRemoteClusterClient(any())
assert(spyClient.getRemoteClusterClient(remoteCluster) == spyClient)

val replicationSettings = Mockito.mock(ReplicationSettings::class.java)
replicationSettings.metadataSyncInterval = TimeValue(100, TimeUnit.MILLISECONDS)
val cso = ClusterStateObserver(clusterService, logger, threadPool.threadContext)
val indexReplicationTask = IndexReplicationTask(1, "type", "action", "description" , EMPTY_TASK_ID,
ReplicationPlugin.REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService , threadPool, spyClient, IndexReplicationParams(connectionName, Index(followerIndex, "0"), followerIndex),
persist, replicationMetadataManager, replicationSettings, settingsModule,cso)

return indexReplicationTask
}
}
Loading