diff --git a/build.gradle b/build.gradle index 65686a1b..ad95ed20 100644 --- a/build.gradle +++ b/build.gradle @@ -43,7 +43,7 @@ buildscript { classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlin_version}" classpath "org.jetbrains.kotlin:kotlin-allopen:${kotlin_version}" classpath "io.gitlab.arturbosch.detekt:detekt-gradle-plugin:1.0.0-RC15" - classpath "org.jacoco:org.jacoco.agent:0.8.5" + //classpath "org.jacoco:org.jacoco.agent:0.8.5" } } @@ -54,7 +54,7 @@ plugins { } apply plugin: 'java' -apply plugin: 'jacoco' +//apply plugin: 'jacoco' apply plugin: 'idea' apply plugin: 'opensearch.opensearchplugin' apply plugin: 'opensearch.testclusters' @@ -98,6 +98,7 @@ dependencies { testImplementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearch_version}" testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.5" testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" + testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0" } repositories { diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt index 6b0600c5..cd05c836 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt @@ -102,6 +102,7 @@ import org.opensearch.common.unit.TimeValue import org.opensearch.common.util.concurrent.OpenSearchExecutors import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentParser +import org.opensearch.commons.utils.OpenForTesting import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment import org.opensearch.index.IndexModule @@ -130,6 +131,7 @@ import org.opensearch.watcher.ResourceWatcherService import java.util.Optional import java.util.function.Supplier +@OpenForTesting internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, RepositoryPlugin, EnginePlugin { private lateinit var client: Client diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt b/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt index b9c1208a..61c99821 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt @@ -15,8 +15,11 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.ClusterSettings import org.opensearch.common.unit.ByteSizeValue import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.utils.OpenForTesting -class ReplicationSettings(clusterService: ClusterService) { +//ToDo : Make OpenForTesting work +@OpenForTesting +open class ReplicationSettings(clusterService: ClusterService) { @Volatile var chunkSize = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE.get(clusterService.settings) @Volatile var concurrentFileChunks = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS.get(clusterService.settings) diff --git a/src/main/kotlin/org/opensearch/replication/metadata/ReplicationMetadataManager.kt b/src/main/kotlin/org/opensearch/replication/metadata/ReplicationMetadataManager.kt index a22a7191..804951c6 100644 --- a/src/main/kotlin/org/opensearch/replication/metadata/ReplicationMetadataManager.kt +++ b/src/main/kotlin/org/opensearch/replication/metadata/ReplicationMetadataManager.kt @@ -26,9 +26,12 @@ import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Singleton import org.opensearch.common.settings.Settings +import org.opensearch.commons.utils.OpenForTesting @Singleton -class ReplicationMetadataManager constructor(private val clusterService: ClusterService, +@OpenForTesting +//ToDo : Debug why OpenForTesting is not working properly +open class ReplicationMetadataManager constructor(private val clusterService: ClusterService, private val client: Client, private val replicaionMetadataStore: ReplicationMetadataStore) { diff --git a/src/main/kotlin/org/opensearch/replication/task/CrossClusterReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/CrossClusterReplicationTask.kt index 8aca9291..28b208a5 100644 --- a/src/main/kotlin/org/opensearch/replication/task/CrossClusterReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/CrossClusterReplicationTask.kt @@ -176,7 +176,7 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin /** * Sets the security context */ - protected open suspend fun setReplicationMetadata() { + open suspend fun setReplicationMetadata() { replicationMetadata = if(this is AutoFollowTask) { replicationMetadataManager.getAutofollowMetadata(followerIndexName, leaderAlias, fetch_from_primary = true) } @@ -185,6 +185,12 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin } } + //used only in testing + open suspend fun setReplicationMetadata(rm :ReplicationMetadata) { + replicationMetadata = rm + + } + open class CrossClusterReplicationTaskResponse(val status: String): ActionResponse(), ToXContentObject { override fun writeTo(out: StreamOutput) { out.writeString(status) diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationExecutor.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationExecutor.kt index e6b5d3bc..8c2e88ca 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationExecutor.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationExecutor.kt @@ -20,6 +20,7 @@ import org.opensearch.replication.util.persistentTasksService import org.apache.logging.log4j.LogManager import org.opensearch.client.Client import org.opensearch.cluster.ClusterState +import org.opensearch.cluster.ClusterStateObserver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.SettingsModule import org.opensearch.persistent.AllocatedPersistentTask @@ -64,9 +65,10 @@ class IndexReplicationExecutor(executor: String, private val clusterService: Clu override fun createTask(id: Long, type: String, action: String, parentTaskId: TaskId, taskInProgress: PersistentTask, headers: MutableMap?): AllocatedPersistentTask { + val cso = ClusterStateObserver(clusterService, log, threadPool.threadContext) return IndexReplicationTask(id, type, action, getDescription(taskInProgress), parentTaskId, executor, clusterService, threadPool, client, requireNotNull(taskInProgress.params), - persistentTasksService, replicationMetadataManager, replicationSettings, settingsModule) + persistentTasksService, replicationMetadataManager, replicationSettings, settingsModule, cso) } override fun getDescription(taskInProgress: PersistentTask): String { diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index f6109cbb..5cf7d7c6 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -86,6 +86,7 @@ import org.opensearch.persistent.PersistentTasksCustomMetadata.PersistentTask import org.opensearch.persistent.PersistentTasksNodeService import org.opensearch.persistent.PersistentTasksService import org.opensearch.tasks.TaskId +import org.opensearch.tasks.TaskManager import org.opensearch.threadpool.ThreadPool import java.util.function.Predicate import java.util.stream.Collectors @@ -94,7 +95,7 @@ import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine import kotlin.streams.toList -class IndexReplicationTask(id: Long, type: String, action: String, description: String, +open class IndexReplicationTask(id: Long, type: String, action: String, description: String, parentTask: TaskId, executor: String, clusterService: ClusterService, @@ -104,7 +105,8 @@ class IndexReplicationTask(id: Long, type: String, action: String, description: private val persistentTasksService: PersistentTasksService, replicationMetadataManager: ReplicationMetadataManager, replicationSettings: ReplicationSettings, - val settingsModule: SettingsModule) + val settingsModule: SettingsModule, + val cso: ClusterStateObserver) : CrossClusterReplicationTask(id, type, action, description, parentTask, emptyMap(), executor, clusterService, threadPool, client, replicationMetadataManager, replicationSettings), ClusterStateListener { @@ -118,7 +120,6 @@ class IndexReplicationTask(id: Long, type: String, action: String, description: override val followerIndexName = params.followerIndexName override val log = Loggers.getLogger(javaClass, Index(params.followerIndexName, ClusterState.UNKNOWN_UUID)) - private val cso = ClusterStateObserver(clusterService, log, threadPool.threadContext) private val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), remoteClient) private var shouldCallEvalMonitoring = true @@ -144,12 +145,17 @@ class IndexReplicationTask(id: Long, type: String, action: String, description: const val SLEEP_TIME_BETWEEN_POLL_MS = 5000L const val TASK_CANCELLATION_REASON = "Index replication task was cancelled by user" + } + //only for testing + fun setPersistent(taskManager: TaskManager) { + super.init(persistentTasksService, taskManager, "persistentTaskId", allocationId) + } override fun indicesOrShards(): List = listOf(followerIndexName) - override suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) { + public override suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) { checkNotNull(initialState) { "Missing initial state" } followingTaskState = FollowingState(emptyMap()) currentTaskState = initialState as IndexReplicationState diff --git a/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt b/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt new file mode 100644 index 00000000..94b71999 --- /dev/null +++ b/src/test/kotlin/org/opensearch/replication/task/index/IndexReplicationTaskTests.kt @@ -0,0 +1,204 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.replication.task.index + +import org.opensearch.replication.ReplicationPlugin +import org.opensearch.replication.ReplicationSettings +import org.opensearch.replication.action.index.block.UpdateIndexBlockAction +import org.opensearch.replication.metadata.ReplicationMetadataManager +import org.opensearch.replication.metadata.store.ReplicationContext +import org.opensearch.replication.metadata.store.ReplicationMetadata +import org.opensearch.replication.repository.REMOTE_REPOSITORY_PREFIX +import org.opensearch.replication.task.shard.ShardReplicationExecutor +import org.opensearch.replication.task.shard.ShardReplicationParams +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.spy +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.opensearch.Version +import org.opensearch.action.ActionListener +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionResponse +import org.opensearch.action.ActionType +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse +import org.opensearch.action.admin.indices.recovery.RecoveryAction +import org.opensearch.action.admin.indices.recovery.RecoveryResponse +import org.opensearch.action.admin.indices.settings.get.GetSettingsAction +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsAction +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.cluster.ClusterState +import org.opensearch.cluster.ClusterStateObserver +import org.opensearch.cluster.RestoreInProgress +import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.metadata.Metadata +import org.opensearch.cluster.routing.RoutingTable +import org.opensearch.common.settings.Settings +import org.opensearch.common.settings.SettingsModule +import org.opensearch.common.unit.TimeValue +import org.opensearch.index.Index +import org.opensearch.index.shard.ShardId +import org.opensearch.persistent.PersistentTaskParams +import org.opensearch.persistent.PersistentTasksCustomMetadata +import org.opensearch.persistent.PersistentTasksService +import org.opensearch.snapshots.Snapshot +import org.opensearch.snapshots.SnapshotId +import org.opensearch.tasks.TaskId.EMPTY_TASK_ID +import org.opensearch.tasks.TaskManager +import org.opensearch.test.ClusterServiceUtils +import org.opensearch.test.ClusterServiceUtils.setState +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.test.OpenSearchTestCase.assertBusy +import org.opensearch.threadpool.TestThreadPool +import org.junit.Test +import org.mockito.Mockito +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.replication.metadata.ReplicationOverallState +import org.opensearch.replication.metadata.store.ReplicationMetadataStore +import org.opensearch.replication.metadata.store.ReplicationMetadataStore.Companion.REPLICATION_CONFIG_SYSTEM_INDEX +import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType +import java.util.* +import java.util.concurrent.TimeUnit + +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +class IndexReplicationTaskTests : OpenSearchTestCase() { + + companion object { + var currentTaskState :IndexReplicationState = InitialState + var stateChanges :Int = 0 + var restoreNotNull = false + + var followerIndex = "follower-index" + var connectionName = "leader-cluster" + var remoteCluster = "remote-cluster" + } + + var threadPool = TestThreadPool("ReplicationPluginTest") + var clusterService = ClusterServiceUtils.createClusterService(threadPool) + + @Test + fun testExecute() = runBlocking { + val replicationTask: IndexReplicationTask = spy(createIndexReplicationTask()) + var taskManager = Mockito.mock(TaskManager::class.java) + replicationTask.setPersistent(taskManager) + var rc = ReplicationContext(followerIndex) + var rm = ReplicationMetadata(connectionName, ReplicationStoreMetadataType.INDEX.name, ReplicationOverallState.RUNNING.name, "reason", rc, rc, Settings.EMPTY) + replicationTask.setReplicationMetadata(rm) + + //Update ClusterState to say restore started + val state: ClusterState = clusterService.state() + + var newClusterState: ClusterState + + // Updating cluster state + var builder: ClusterState.Builder + val indices: MutableList = ArrayList() + indices.add(followerIndex) + val snapshot = Snapshot("$REMOTE_REPOSITORY_PREFIX$connectionName", SnapshotId("randomAlphaOfLength", "randomAlphaOfLength")) + val restoreEntry = RestoreInProgress.Entry("restoreUUID", snapshot, RestoreInProgress.State.INIT, Collections.unmodifiableList(ArrayList(indices)), + null) + + // Update metadata store index as well + var metaBuilder = Metadata.builder() + metaBuilder.put(IndexMetadata.builder(REPLICATION_CONFIG_SYSTEM_INDEX).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + var metadata = metaBuilder.build() + var routingTableBuilder = RoutingTable.builder() + routingTableBuilder.addAsNew(metadata.index(REPLICATION_CONFIG_SYSTEM_INDEX)) + var routingTable = routingTableBuilder.build() + + builder = ClusterState.builder(state).routingTable(routingTable) + builder.putCustom(RestoreInProgress.TYPE, RestoreInProgress.Builder( + state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).add(restoreEntry).build()) + + newClusterState = builder.build() + setState(clusterService, newClusterState) + + val job = this.launch{ + replicationTask.execute(this, InitialState) + } + + // Delay to let task execute + delay(1000) + + // Assert we move to RESTORING .. This is blocking and won't let the test run + assertBusy({ + assertThat(currentTaskState == RestoreState).isTrue() + }, 1, TimeUnit.SECONDS) + + + //Complete the Restore + metaBuilder = Metadata.builder() + metaBuilder.put(IndexMetadata.builder(followerIndex).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + metadata = metaBuilder.build() + routingTableBuilder = RoutingTable.builder() + routingTableBuilder.addAsNew(metadata.index(followerIndex)) + routingTable = routingTableBuilder.build() + + builder = ClusterState.builder(state).routingTable(routingTable) + builder.putCustom(RestoreInProgress.TYPE, RestoreInProgress.Builder( + state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).build()) + + newClusterState = builder.build() + setState(clusterService, newClusterState) + + delay(1000) + + assertBusy { + assertThat(currentTaskState == MonitoringState).isTrue() + } + + job.cancel() + + } + + private fun createIndexReplicationTask() : IndexReplicationTask { + var threadPool = TestThreadPool("IndexReplicationTask") + //Hack Alert : Though it is meant to force rejection , this is to make overallTaskScope not null + threadPool.startForcingRejections() + clusterService = ClusterServiceUtils.createClusterService(threadPool) + val settingsModule = Mockito.mock(SettingsModule::class.java) + val spyClient = Mockito.spy(NoOpClient("testName")) + + val replicationMetadataManager = ReplicationMetadataManager(clusterService, spyClient, + ReplicationMetadataStore(spyClient, clusterService, NamedXContentRegistry.EMPTY)) + var persist = PersistentTasksService(clusterService, threadPool, spyClient) + val state: ClusterState = clusterService.state() + val tasks = PersistentTasksCustomMetadata.builder() + var sId = ShardId(Index(followerIndex, "_na_"), 0) + tasks.addTask( "replication:0", ShardReplicationExecutor.TASK_NAME, ShardReplicationParams("remoteCluster", sId, sId), + PersistentTasksCustomMetadata.Assignment("other_node_", "test assignment on other node")) + + val metadata = Metadata.builder(state.metadata()) + metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build()) + val newClusterState: ClusterState = ClusterState.builder(state).metadata(metadata).build() + + setState(clusterService, newClusterState) + + doAnswer{ invocation -> spyClient }.`when`(spyClient).getRemoteClusterClient(any()) + assert(spyClient.getRemoteClusterClient(remoteCluster) == spyClient) + + val replicationSettings = Mockito.mock(ReplicationSettings::class.java) + replicationSettings.metadataSyncInterval = TimeValue(100, TimeUnit.MILLISECONDS) + val cso = ClusterStateObserver(clusterService, logger, threadPool.threadContext) + val indexReplicationTask = IndexReplicationTask(1, "type", "action", "description" , EMPTY_TASK_ID, + ReplicationPlugin.REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService , threadPool, spyClient, IndexReplicationParams(connectionName, Index(followerIndex, "0"), followerIndex), + persist, replicationMetadataManager, replicationSettings, settingsModule,cso) + + return indexReplicationTask + } +} \ No newline at end of file diff --git a/src/test/kotlin/org/opensearch/replication/task/index/NoOpClient.kt b/src/test/kotlin/org/opensearch/replication/task/index/NoOpClient.kt new file mode 100644 index 00000000..fd9c67ed --- /dev/null +++ b/src/test/kotlin/org/opensearch/replication/task/index/NoOpClient.kt @@ -0,0 +1,135 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ +package org.opensearch.replication.task.index + +import com.nhaarman.mockitokotlin2.doReturn +import org.mockito.Mockito +import org.opensearch.action.ActionListener +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionResponse +import org.opensearch.action.ActionType +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse +import org.opensearch.action.admin.indices.recovery.RecoveryAction +import org.opensearch.action.admin.indices.recovery.RecoveryResponse +import org.opensearch.action.admin.indices.settings.get.GetSettingsAction +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsAction +import org.opensearch.action.get.GetAction +import org.opensearch.action.get.GetResponse +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.common.UUIDs +import org.opensearch.common.bytes.BytesReference +import org.opensearch.common.collect.ImmutableOpenMap +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.index.Index +import org.opensearch.index.get.GetResult +import org.opensearch.index.shard.ShardId +import org.opensearch.indices.recovery.RecoveryState +import org.opensearch.persistent.PersistentTaskResponse +import org.opensearch.persistent.PersistentTasksCustomMetadata +import org.opensearch.persistent.StartPersistentTaskAction +import org.opensearch.persistent.UpdatePersistentTaskStatusAction +import org.opensearch.replication.ReplicationPlugin +import org.opensearch.replication.action.index.block.UpdateIndexBlockAction +import org.opensearch.replication.metadata.ReplicationMetadataManager +import org.opensearch.replication.metadata.store.ReplicationContext +import org.opensearch.replication.metadata.store.ReplicationMetadata +import org.opensearch.replication.metadata.store.ReplicationMetadataStore +import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType +import org.opensearch.replication.task.shard.ShardReplicationExecutor +import org.opensearch.replication.task.shard.ShardReplicationParams +import org.opensearch.snapshots.RestoreInfo +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.test.client.NoOpNodeClient +import java.lang.reflect.Field +import java.util.ArrayList +import java.util.HashMap + +open class NoOpClient(testName :String) : NoOpNodeClient(testName) { + @Override + override fun doExecute(action: ActionType?, request: Request?, listener: ActionListener) { + if (action == UpdateSettingsAction.INSTANCE) { + //Update setting to prevent pruning on leader + var settingResponse = AcknowledgedResponse(true) + listener.onResponse(settingResponse as Response) + } else if (action == RestoreSnapshotAction.INSTANCE) { + //begin snapshot operation + var snapResponse = RestoreSnapshotResponse(null as RestoreInfo?) + if (IndexReplicationTaskTests.restoreNotNull) { + snapResponse = RestoreSnapshotResponse(RestoreInfo("name", emptyList(), 1, 1)) + } + listener.onResponse(snapResponse as Response) + } else if (action == UpdatePersistentTaskStatusAction.INSTANCE) { + // update status of index replication task + var r = request as UpdatePersistentTaskStatusAction.Request + val obj: Class<*> = r.javaClass + // access the private variable "state" + val field: Field = obj.getDeclaredField("state") + field.setAccessible(true) + val taskState = field.get(r) as IndexReplicationState + + IndexReplicationTaskTests.currentTaskState = taskState + IndexReplicationTaskTests.stateChanges++ + + var t = Mockito.mock(PersistentTasksCustomMetadata.PersistentTask::class.java) + var t1 = Mockito.mock(PersistentTaskResponse::class.java) + doReturn(t).`when`(t1).task + doReturn(taskState).`when`(t).getState() + //var settingResponse = PersistentTaskResponse(true) + listener.onResponse(t1 as Response) + } else if (action == UpdateIndexBlockAction.INSTANCE) { + // applies index block + var settingResponse = AcknowledgedResponse(true) + listener.onResponse(settingResponse as Response) + } else if (action == StartPersistentTaskAction.INSTANCE) { + var sId = ShardId(Index(IndexReplicationTaskTests.followerIndex, "_na_"), 0) + var t1 = PersistentTaskResponse( + PersistentTasksCustomMetadata.PersistentTask(UUIDs.base64UUID(), ShardReplicationExecutor.TASK_NAME, + ShardReplicationParams(IndexReplicationTaskTests.remoteCluster, sId, sId), + OpenSearchTestCase.randomLong(), PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT)) + + listener.onResponse(t1 as Response) + } else if (action == GetSettingsAction.INSTANCE) { + //called in doesValidIndexExists after restore is complete + val desiredSettingsBuilder = Settings.builder() + desiredSettingsBuilder.put(ReplicationPlugin.REPLICATED_INDEX_SETTING.key, "true") + + val indexToSettings = HashMap() + indexToSettings[IndexReplicationTaskTests.followerIndex] = desiredSettingsBuilder.build() + + val settingsMap = ImmutableOpenMap.builder().putAll(indexToSettings).build() + var settingResponse = GetSettingsResponse(settingsMap, settingsMap) + listener.onResponse(settingResponse as Response) + } else if (action == RecoveryAction.INSTANCE) { + val shardRecoveryStates: MutableMap> = HashMap() + val recoveryStates: MutableList = ArrayList() + recoveryStates.add(Mockito.mock(RecoveryState::class.java)) + shardRecoveryStates.put("follower-index", recoveryStates) + var recoveryResponse = RecoveryResponse(1,1, 1, shardRecoveryStates, listOf()) + listener.onResponse(recoveryResponse as Response) + } else if (action == GetAction.INSTANCE) { + // Replication Metadata store + val replicationMetadata = ReplicationMetadata(IndexReplicationTaskTests.connectionName, + ReplicationStoreMetadataType.INDEX.name, "overallState.name", ReplicationMetadataManager.CUSTOMER_INITIATED_ACTION, + ReplicationContext(IndexReplicationTaskTests.followerIndex, null), + ReplicationContext(IndexReplicationTaskTests.followerIndex, null), Settings.EMPTY) + + var bytesReference = replicationMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS) + var by = BytesReference.bytes(bytesReference) + var result = GetResult(ReplicationMetadataStore.REPLICATION_CONFIG_SYSTEM_INDEX, "_doc", IndexReplicationTaskTests.followerIndex, 1, 1, 1, true, by, null, null) + var getResponse = GetResponse(result) + listener.onResponse(getResponse as Response) + } + } +} \ No newline at end of file