diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationException.kt b/src/main/kotlin/org/opensearch/replication/ReplicationException.kt index 89d2456c..891be0a3 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationException.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationException.kt @@ -12,22 +12,28 @@ package org.opensearch.replication import org.opensearch.OpenSearchException +import org.opensearch.OpenSearchStatusException import org.opensearch.action.ShardOperationFailedException import org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE import org.opensearch.index.shard.ShardId +import org.opensearch.rest.RestStatus /** * Base class replication exceptions. Note: Replication process may throw exceptions that do not derive from this such as * [org.opensearch.ResourceAlreadyExistsException], [org.opensearch.index.IndexNotFoundException] or * [org.opensearch.index.shard.ShardNotFoundException]. */ -class ReplicationException: OpenSearchException { +class ReplicationException: OpenSearchStatusException { - constructor(message: String, vararg args: Any) : super(message, *args) + constructor(message: String, status : RestStatus, cause: Throwable, vararg args: Any) : super(message, status, cause, *args) - constructor(message: String, cause: Throwable, vararg args: Any) : super(message, cause, *args) + constructor(message: String, vararg args: Any) : super(message, RestStatus.INTERNAL_SERVER_ERROR, *args) - constructor(message: String, shardFailures: Array) : super(message) { + constructor(message: String, status: RestStatus, vararg args: Any) : super(message, status, *args) + + constructor(cause: Throwable, status: RestStatus, vararg args: Any) : super(cause.message, status, *args) + + constructor(message: String, shardFailures: Array): super(message, shardFailures.firstOrNull()?.status()?:RestStatus.INTERNAL_SERVER_ERROR) { shardFailures.firstOrNull()?.let { setShard(ShardId(it.index(), INDEX_UUID_NA_VALUE, it.shardId())) // Add first failure as cause and rest as suppressed... diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt index 952c462a..a61bb2bc 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt @@ -171,6 +171,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, Setting.Property.Dynamic, Setting.Property.NodeScope) val REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD = Setting.intSetting("plugins.replication.follower.concurrent_readers_per_shard", 2, 1, Setting.Property.Dynamic, Setting.Property.NodeScope) + val REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD = Setting.intSetting("plugins.replication.follower.concurrent_writers_per_shard", 2, 1, + Setting.Property.Dynamic, Setting.Property.NodeScope) val REPLICATION_PARALLEL_READ_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.follower.poll_interval", TimeValue.timeValueMillis(50), TimeValue.timeValueMillis(1), TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope) val REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.autofollow.fetch_poll_interval", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30), @@ -342,14 +344,14 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, override fun getSettings(): List> { return listOf(REPLICATED_INDEX_SETTING, REPLICATION_FOLLOWER_OPS_BATCH_SIZE, REPLICATION_LEADER_THREADPOOL_SIZE, - REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD, - REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS, - REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL, - REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL, - REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING, - REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE) + REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD, + REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS, + REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL, + REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL, + REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING, + REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE, + REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD) } - override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry, clusterService: ClusterService, recoverySettings: RecoverySettings): Map { val repoFactory = Repository.Factory { repoMetadata: RepositoryMetadata -> diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt b/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt index a6d1bbd3..2b516f8e 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt @@ -24,6 +24,7 @@ open class ReplicationSettings(clusterService: ClusterService) { @Volatile var chunkSize = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE.get(clusterService.settings) @Volatile var concurrentFileChunks = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS.get(clusterService.settings) @Volatile var readersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD) + @Volatile var writersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD) @Volatile var batchSize = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE) @Volatile var pollDuration: TimeValue = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_INTERVAL) @Volatile var autofollowFetchPollDuration = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL) @@ -41,6 +42,7 @@ open class ReplicationSettings(clusterService: ClusterService) { clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE) { value: ByteSizeValue -> this.chunkSize = value} clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS) { value: Int -> this.concurrentFileChunks = value} clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD) { value: Int -> this.readersPerShard = value} + clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD) { value: Int -> this.writersPerShard = value} clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE) { batchSize = it } clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_INTERVAL) { pollDuration = it } clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION) { leaseRenewalMaxFailureDuration = it } diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt index 7b58d419..f0a75480 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt @@ -214,7 +214,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: // Since this setting is not dynamic, setting update would only reflect after pause-resume or on a new replication job. val rateLimiter = Semaphore(replicationSettings.readersPerShard) val sequencer = TranslogSequencer(scope, replicationMetadata, followerShardId, leaderAlias, leaderShardId.indexName, - TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats) + TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats, replicationSettings.writersPerShard) val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings) followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint @@ -251,7 +251,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: followerClusterStats.stats[followerShardId]!!.opsReadFailures.addAndGet(1) logInfo("Unable to get changes from seqNo: $fromSeqNo. ${e.stackTraceToString()}") changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1,-1) - // Propagate 4xx exceptions up the chain and halt replication as they are irrecoverable val range4xx = 400.rangeTo(499) if (e is OpenSearchException && diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt index 38b625bf..97127291 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt @@ -21,11 +21,14 @@ import org.opensearch.replication.util.suspendExecuteWithRetries import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ObsoleteCoroutinesApi -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.actor import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Semaphore import org.opensearch.client.Client +import org.opensearch.OpenSearchException +import org.opensearch.action.support.TransportActions import org.opensearch.common.logging.Loggers +import org.opensearch.index.IndexNotFoundException import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog import org.opensearch.replication.util.indicesService @@ -33,6 +36,8 @@ import org.opensearch.tasks.TaskId import java.util.ArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit +import org.opensearch.rest.RestStatus + /** * A TranslogSequencer allows multiple producers of [Translog.Operation]s to write them in sequence number order to an @@ -50,7 +55,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: private val followerShardId: ShardId, private val leaderAlias: String, private val leaderIndexName: String, private val parentTaskId: TaskId, private val client: Client, initialSeqNo: Long, - private val followerClusterStats: FollowerClusterStats) { + private val followerClusterStats: FollowerClusterStats, writersPerShard : Int) { private val unAppliedChanges = ConcurrentHashMap() private val log = Loggers.getLogger(javaClass, followerShardId)!! @@ -59,40 +64,73 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: val followerIndexService = indicesService.indexServiceSafe(followerShardId.index) val indexShard = followerIndexService.getShard(followerShardId.id) - private val sequencer = scope.actor(capacity = Channel.UNLIMITED) { + private val sequencer = scope.actor(capacity = 0) { + // Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will // raise the same exception. See [SendChannel.close] method for details. + val rateLimiter = Semaphore(writersPerShard) var highWatermark = initialSeqNo for (m in channel) { while (unAppliedChanges.containsKey(highWatermark + 1)) { val next = unAppliedChanges.remove(highWatermark + 1)!! val replayRequest = ReplayChangesRequest(followerShardId, next.changes, next.maxSeqNoOfUpdatesOrDeletes, - leaderAlias, leaderIndexName) + leaderAlias, leaderIndexName) replayRequest.parentTask = parentTaskId + rateLimiter.acquire() launch { var relativeStartNanos = System.nanoTime() val retryOnExceptions = ArrayList>() retryOnExceptions.add(MappingNotAvailableException::class.java) + var tryReplay = true + try { + while (tryReplay) { + tryReplay = false + try { + val replayResponse = client.suspendExecuteWithRetries( + replicationMetadata, + ReplayChangesAction.INSTANCE, + replayRequest, + log = log, + retryOn = retryOnExceptions + ) + if (replayResponse.shardInfo.failed > 0) { + replayResponse.shardInfo.failures.forEachIndexed { i, failure -> + log.error("Failed replaying changes. Failure:$i:$failure}") + } + followerClusterStats.stats[followerShardId]!!.opsWriteFailures.addAndGet( + replayResponse.shardInfo.failed.toLong() + ) + throw ReplicationException( + "failed to replay changes", + replayResponse.shardInfo.failures + ) + } - val replayResponse = client.suspendExecuteWithRetries( - replicationMetadata, - ReplayChangesAction.INSTANCE, - replayRequest, - log = log, - retryOn = retryOnExceptions - ) - if (replayResponse.shardInfo.failed > 0) { - replayResponse.shardInfo.failures.forEachIndexed { i, failure -> - log.error("Failed replaying changes. Failure:$i:$failure}") + val tookInNanos = System.nanoTime() - relativeStartNanos + followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet( + TimeUnit.NANOSECONDS.toMillis(tookInNanos) + ) + followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet( + replayRequest.changes.size.toLong() + ) + followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint + } catch (e: OpenSearchException) { + if (e !is IndexNotFoundException && (retryOnExceptions.contains(e.javaClass) + || TransportActions.isShardNotAvailableException(e) + // This waits for the dependencies to load and retry. Helps during boot-up + || e.status().status >= 500 + || e.status() == RestStatus.TOO_MANY_REQUESTS)) { + tryReplay = true + } + else { + log.error("Got non-retriable Exception:${e.message} with status:${e.status()}") + throw e + } + } } - followerClusterStats.stats[followerShardId]!!.opsWriteFailures.addAndGet(replayResponse.shardInfo.failed.toLong()) - throw ReplicationException("failed to replay changes", replayResponse.shardInfo.failures) + } finally { + rateLimiter.release() } - - val tookInNanos = System.nanoTime() - relativeStartNanos - followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos)) - followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong()) - followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint } highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark } @@ -105,8 +143,9 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: completed.await() } + suspend fun send(changes : GetChangesResponse) { unAppliedChanges[changes.fromSeqNo] = changes sequencer.send(Unit) } -} +} \ No newline at end of file diff --git a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt index 96749f7b..56f35fb9 100644 --- a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt +++ b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt @@ -28,6 +28,7 @@ import org.opensearch.action.index.IndexRequestBuilder import org.opensearch.action.index.IndexResponse import org.opensearch.action.support.TransportActions import org.opensearch.client.Client +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException import org.opensearch.common.util.concurrent.ThreadContext import org.opensearch.index.IndexNotFoundException import org.opensearch.index.shard.ShardId @@ -43,6 +44,7 @@ import org.opensearch.transport.NodeDisconnectedException import org.opensearch.transport.NodeNotConnectedException import java.io.PrintWriter import java.io.StringWriter +import java.lang.Exception /* * Extension function to use the store object @@ -110,7 +112,8 @@ suspend fun Client.suspendExecuteWith defaultContext: Boolean = false): Resp { var currentBackoff = backoff retryOn.addAll(defaultRetryableExceptions()) - repeat(numberOfRetries - 1) { + var retryException: Exception + repeat(numberOfRetries - 1) { index -> try { return suspendExecute(replicationMetadata, action, req, injectSecurityContext = injectSecurityContext, defaultContext = defaultContext) @@ -122,19 +125,29 @@ suspend fun Client.suspendExecuteWith // This waits for the dependencies to load and retry. Helps during boot-up || e.status().status >= 500 || e.status() == RestStatus.TOO_MANY_REQUESTS)) { - log.warn("Encountered a failure while executing in $req. Retrying in ${currentBackoff/1000} seconds" + - ".", e) - delay(currentBackoff) - currentBackoff = (currentBackoff * factor).toLong().coerceAtMost(maxTimeOut) + retryException = e; } else { throw e } + } catch (e: OpenSearchRejectedExecutionException) { + if(index < numberOfRetries-2) { + retryException = e; + } + else { + throw ReplicationException(e, RestStatus.TOO_MANY_REQUESTS) + } } + log.warn( + "Encountered a failure while executing in $req. Retrying in ${currentBackoff / 1000} seconds" + + ".", retryException + ) + delay(currentBackoff) + currentBackoff = (currentBackoff * factor).toLong().coerceAtMost(maxTimeOut) + } return suspendExecute(replicationMetadata, action, req, injectSecurityContext = injectSecurityContext, defaultContext = defaultContext) // last attempt } - /** * Restore shard from leader cluster with retries. * Only specified error are retried diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index 8f56fbb2..32019cc5 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -865,36 +865,31 @@ class StartReplicationIT: MultiClusterRestTestCase() { }, 60L, TimeUnit.SECONDS) } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/cross-cluster-replication/issues/176") fun `test follower stats`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - val leaderIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader" val followerIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower" - val leaderIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader" val followerIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower" -// val followerIndex2 = "follower_index_2" -// val followerIndex3 = "follower_index_3" createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create( - CreateIndexRequest(leaderIndexName), - RequestOptions.DEFAULT + CreateIndexRequest(leaderIndexName), + RequestOptions.DEFAULT ) assertThat(createIndexResponse.isAcknowledged).isTrue() followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName, followerIndexName), - TimeValue.timeValueSeconds(10), - true + StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + true ) followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName2, followerIndexName2), - TimeValue.timeValueSeconds(10), - true + StartReplicationRequest("source", leaderIndexName, followerIndexName2), + TimeValue.timeValueSeconds(10), + true ) followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName3, followerIndexName3), - TimeValue.timeValueSeconds(10), - true + StartReplicationRequest("source", leaderIndexName, followerIndexName3), + TimeValue.timeValueSeconds(10), + true ) val docCount = 50 for (i in 1..docCount) { @@ -902,12 +897,16 @@ class StartReplicationIT: MultiClusterRestTestCase() { leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) } followerClient.pauseReplication(followerIndexName2) - val stats = followerClient.followerStats() + followerClient.stopReplication(followerIndexName3) + var stats = followerClient.followerStats() assertThat(stats.getValue("num_syncing_indices").toString()).isEqualTo("1") assertThat(stats.getValue("num_paused_indices").toString()).isEqualTo("1") assertThat(stats.getValue("num_failed_indices").toString()).isEqualTo("0") assertThat(stats.getValue("num_shard_tasks").toString()).isEqualTo("1") - assertThat(stats.getValue("operations_written").toString()).isEqualTo("50") + assertBusy({ + stats = followerClient.followerStats() + assertThat(stats.getValue("operations_written").toString()).isEqualTo("50") + }, 60, TimeUnit.SECONDS) assertThat(stats.getValue("operations_read").toString()).isEqualTo("50") assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0") assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0") diff --git a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt index ac377687..623a6396 100644 --- a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt +++ b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt @@ -11,6 +11,14 @@ package org.opensearch.replication.task.shard +import org.opensearch.replication.action.changes.GetChangesResponse +import org.opensearch.replication.action.replay.ReplayChangesAction +import org.opensearch.replication.action.replay.ReplayChangesRequest +import org.opensearch.replication.action.replay.ReplayChangesResponse +import org.opensearch.replication.metadata.ReplicationOverallState +import org.opensearch.replication.metadata.store.ReplicationContext +import org.opensearch.replication.metadata.store.ReplicationMetadata +import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ObsoleteCoroutinesApi import kotlinx.coroutines.test.runBlockingTest @@ -27,21 +35,13 @@ import org.opensearch.index.shard.IndexShard import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog import org.opensearch.indices.IndicesService -import org.opensearch.replication.action.changes.GetChangesResponse -import org.opensearch.replication.action.replay.ReplayChangesAction -import org.opensearch.replication.action.replay.ReplayChangesRequest -import org.opensearch.replication.action.replay.ReplayChangesResponse -import org.opensearch.replication.metadata.ReplicationOverallState -import org.opensearch.replication.metadata.store.ReplicationContext -import org.opensearch.replication.metadata.store.ReplicationMetadata -import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType import org.opensearch.replication.util.indicesService import org.opensearch.tasks.TaskId.EMPTY_TASK_ID import org.opensearch.test.OpenSearchTestCase +import org.opensearch.test.OpenSearchTestCase.randomList import org.opensearch.test.client.NoOpClient import java.util.Locale - @ObsoleteCoroutinesApi class TranslogSequencerTests : OpenSearchTestCase() { @@ -72,7 +72,7 @@ class TranslogSequencerTests : OpenSearchTestCase() { val leaderIndex = "leaderIndex" val followerShardId = ShardId("follower", "follower_uuid", 0) val replicationMetadata = ReplicationMetadata(leaderAlias, ReplicationStoreMetadataType.INDEX.name, ReplicationOverallState.RUNNING.name, "test user", - ReplicationContext(followerShardId.indexName, null), ReplicationContext(leaderIndex, null), Settings.EMPTY) + ReplicationContext(followerShardId.indexName, null), ReplicationContext(leaderIndex, null), Settings.EMPTY) val client = RequestCapturingClient() init { closeAfterSuite(client) @@ -94,7 +94,7 @@ class TranslogSequencerTests : OpenSearchTestCase() { Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService) Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard) val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID, - client, startSeqNo, stats) + client, startSeqNo, stats, 2) // Send requests out of order (shuffled seqNo) and await for them to be processed. var batchSeqNo = startSeqNo @@ -120,7 +120,7 @@ class TranslogSequencerTests : OpenSearchTestCase() { val changes = randomList(1, randomIntBetween(1, 512)) { seqNo = seqNo.inc() Translog.Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), seqNo, - 1L, "{}".toByteArray(Charsets.UTF_8)) + 1L, "{}".toByteArray(Charsets.UTF_8)) } return Pair(GetChangesResponse(changes, startSeqNo.inc(), startSeqNo, -1), seqNo) }