From 21f7a75d4c211c2aa71424b0b01d61df917e667e Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Tue, 23 May 2023 10:02:21 -0700 Subject: [PATCH] spotless check Signed-off-by: Poojita Raj --- CHANGELOG.md | 3 +- .../opensearch/index/codec/CodecService.java | 15 +------ .../index/engine/InternalEngine.java | 6 +-- .../engine/NRTReplicationEngineFactory.java | 8 +--- .../opensearch/index/shard/IndexShard.java | 7 ++- .../opensearch/indices/IndicesService.java | 6 +-- .../SegmentReplicationTargetService.java | 2 +- .../SegmentReplicationUpgradeListener.java | 44 ++++++++++++------- .../SegmentReplicationUpgradeService.java | 3 ++ ...egmentReplicationPressureServiceTests.java | 10 ++--- .../index/seqno/ReplicationTrackerTests.java | 10 +++-- .../index/shard/IndexShardTests.java | 16 +++---- ...overyWithRemoteTranslogOnPrimaryTests.java | 4 +- .../SegmentReplicationIndexShardTests.java | 30 ++++++------- 14 files changed, 78 insertions(+), 86 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e1cfdc8d4268..d851312dbff93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added @dbwiddis as on OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665)) - [Extensions] Add ExtensionAwarePlugin extension point to add custom settings for extensions ([#7526](https://github.com/opensearch-project/OpenSearch/pull/7526)) - Add new cluster setting to set default index replication type ([#7420](https://github.com/opensearch-project/OpenSearch/pull/7420)) +- [Segment Replication] Rolling upgrade support for default codecs ([#7698](https://github.com/opensearch-project/OpenSearch/pull/7698)) ### Dependencies - Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564) @@ -155,4 +156,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x diff --git a/server/src/main/java/org/opensearch/index/codec/CodecService.java b/server/src/main/java/org/opensearch/index/codec/CodecService.java index 203c8a3c87111..83d01f7558110 100644 --- a/server/src/main/java/org/opensearch/index/codec/CodecService.java +++ b/server/src/main/java/org/opensearch/index/codec/CodecService.java @@ -81,22 +81,9 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) { public void loadMap() { opensearchVersionToLuceneCodec.put(Version.fromString("3.0.0"), "Lucene95"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.8.0"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.8.0"), "Lucene95"); opensearchVersionToLuceneCodec.put(Version.fromString("2.7.1"), "Lucene95"); opensearchVersionToLuceneCodec.put(Version.fromString("2.7.0"), "Lucene95"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.6.0"), "Lucene95"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.4.2"), "Lucene95"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.5.1"), "Lucene94"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.5.0"), "Lucene94"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.4.1"), "Lucene94"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.4.1"), "Lucene94"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.4.0"), "Lucene94"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.3.0"), "Lucene93"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.2.1"), "Lucene93"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.2.0"), "Lucene93"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.1.0"), "Lucene92"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.0.1"), "Lucene91"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.0.0"), "Lucene91"); } public Codec codec(String name) { diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index f971e3b7e8a6e..22eeca64328e0 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -33,7 +33,6 @@ package org.opensearch.index.engine; import org.apache.logging.log4j.Logger; -import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; @@ -90,7 +89,6 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.IndexSettings; import org.opensearch.index.VersionType; -import org.opensearch.index.codec.CodecService; import org.opensearch.index.fieldvisitor.IdOnlyFieldVisitor; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.ParseContext; @@ -2319,9 +2317,7 @@ private IndexWriterConfig getIndexWriterConfig() { iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy)); iwc.setSimilarity(engineConfig.getSimilarity()); iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac()); - Codec bwcCodec = engineConfig.getBWCCodec(CodecService.opensearchVersionToLuceneCodec.get(engineConfig.getClusterMinVersion())); - iwc.setCodec(bwcCodec); - engineConfig.setCodecName(bwcCodec.getName()); + iwc.setCodec(engineConfig.getCodec()); iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh if (config().getIndexSort() != null) { iwc.setIndexSort(config().getIndexSort()); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java index e39a44e7d300b..f2bd788d60036 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java @@ -8,10 +8,9 @@ package org.opensearch.index.engine; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.index.codec.CodecService; /** * Engine Factory implementation used with Segment Replication that wires up replica shards with an ${@link NRTReplicationEngine} @@ -27,10 +26,6 @@ public NRTReplicationEngineFactory(ClusterService clusterService) { this.clusterService = clusterService; } - public NRTReplicationEngineFactory() { - this.clusterService = null; - } - @Override public Engine newReadWriteEngine(EngineConfig config) { if (config.isReadOnlyReplica()) { @@ -39,6 +34,7 @@ public Engine newReadWriteEngine(EngineConfig config) { if (clusterService != null) { DiscoveryNodes nodes = this.clusterService.state().nodes(); config.setClusterMinVersion(nodes.getMinNodeVersion()); + config.setCodecName(config.getBWCCodec(CodecService.opensearchVersionToLuceneCodec.get(nodes.getMinNodeVersion())).getName()); } return new InternalEngine(config); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d22b2c2022273..43c2fb5555184 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -56,7 +56,6 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.ThreadInterruptedException; -import org.opensearch.Version; import org.opensearch.cluster.metadata.DataStream; import org.opensearch.core.Assertions; import org.opensearch.ExceptionsHelper; @@ -1789,7 +1788,7 @@ static Engine.Searcher wrapSearcher( } /** - * Used with segment replication during relocation handoff, this method updates current read only engine to global + * Used with segment replication during relocation handoff and rolling upgrades, this method updates current read only engine to global * checkpoint followed by changing to writeable engine * * @throws IOException if communication failed @@ -1798,7 +1797,7 @@ static Engine.Searcher wrapSearcher( * * @opensearch.internal */ - public void resetToWriteableEngine() throws IOException, InterruptedException, TimeoutException { + public void resetEngine() throws IOException, InterruptedException, TimeoutException { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); }); } @@ -4346,7 +4345,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { /** * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. */ - public void resetEngineToGlobalCheckpoint() throws IOException { + void resetEngineToGlobalCheckpoint() throws IOException { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "resetting engine without blocking operations; active operations are [" + getActiveOperations() diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 535b1130245d1..70c517ac553cd 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -302,7 +302,7 @@ public class IndicesService extends AbstractLifecycleComponent private final ScriptService scriptService; private final ClusterService clusterService; private final Client client; - public volatile Map indices = emptyMap(); + private volatile Map indices = emptyMap(); private final Map> pendingDeletes = new HashMap<>(); private final AtomicInteger numUncompletedDeletes = new AtomicInteger(); private final OldShardsStats oldShardsStats = new OldShardsStats(); @@ -474,10 +474,6 @@ public ClusterService clusterService() { return clusterService; } - public Map indices() { - return indices; - } - @Override protected void doStop() { ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 70af7673bf0e1..110d1c6e4436a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -449,7 +449,7 @@ public void onReplicationDone(SegmentReplicationState state) { try { // Promote engine type for primary target if (indexShard.recoveryState().getPrimary() == true) { - indexShard.resetToWriteableEngine(); + indexShard.resetEngine(); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } catch (InterruptedException | TimeoutException | IOException e) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java index dbbb0c08b6aa4..fd0bfd52f5c81 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java @@ -21,6 +21,11 @@ import java.util.ArrayList; import java.util.List; +/** + * SegmentReplicationUpgradeListener is used to upgrade the opensearch version used by all primaries of a cluster when + * segment replication is enabled and a rolling upgrade is completed (while in mixed cluster state, the primaries use lower codec + * version on their primaries and this needs to be reset once upgrade is complete). + */ public class SegmentReplicationUpgradeListener implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(SegmentReplicationUpgradeListener.class); @@ -33,29 +38,34 @@ public SegmentReplicationUpgradeListener(IndicesService indicesService) { @Override public void clusterChanged(ClusterChangedEvent event) { - List indexShardList = new ArrayList<>(); - DiscoveryNodes nodes = event.state().nodes(); - if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { - for (IndexService indexService : indicesService.indices().values()) { - for (IndexShard indexShard : indexService) { - try { - if ((indexShard.getEngine().config().isReadOnlyReplica() == false) && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { - indexShardList.add(indexShard); + if (event.nodesChanged()) { + List indexShardList = new ArrayList<>(); + DiscoveryNodes nodes = event.state().nodes(); + if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + try { + if (indexShard.indexSettings().isSegRepEnabled() + && indexShard.indexSettings().getNumberOfReplicas() > 0 + && indexShard.routingEntry().primary() + && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { + indexShardList.add(indexShard); + } + } catch (AlreadyClosedException e) { + logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); } - } catch (AlreadyClosedException e) { - logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); } } } - } - try { - if (indexShardList.isEmpty() == false) { - for (IndexShard is : indexShardList) { - is.resetEngineToGlobalCheckpoint(); + try { + if (indexShardList.isEmpty() == false) { + for (IndexShard indexShard : indexShardList) { + indexShard.resetEngine(); + } } + } catch (Exception e) { + logger.error("Received unexpected exception: [{}]", e.getMessage()); } - } catch (Exception e) { - logger.error("Received unexpected exception: [{}]", e.getMessage()); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java index f62a33e8835b1..8b4dc783be146 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java @@ -12,6 +12,9 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.indices.IndicesService; +/** + * SegmentReplicationUpgradeService is used to manage SegmentReplicationUpgradeListener's lifecycle (creation/deletion). + */ public class SegmentReplicationUpgradeService implements Releasable { private final ClusterApplierService clusterApplierService; diff --git a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java index 1ebdd111bfed3..31762244c38ff 100644 --- a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java @@ -53,7 +53,7 @@ public class SegmentReplicationPressureServiceTests extends OpenSearchIndexLevel .build(); public void testIsSegrepLimitBreached() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); SegmentReplicationPressureService service = buildPressureService(settings, primaryShard); @@ -104,7 +104,7 @@ public void testIsSegrepLimitBreached_onlyCheckpointLimitBreached() throws Excep .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) .build(); - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); SegmentReplicationPressureService service = buildPressureService(settings, primaryShard); @@ -131,7 +131,7 @@ public void testIsSegrepLimitBreached_onlyTimeLimitBreached() throws Exception { .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) .build(); - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); final SegmentReplicationPressureService service = buildPressureService(settings, primaryShard); @@ -154,7 +154,7 @@ public void testIsSegrepLimitBreached_onlyTimeLimitBreached() throws Exception { } public void testIsSegrepLimitBreached_underStaleNodeLimit() throws Exception { - try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); indexInBatches(5, shards, primaryShard); @@ -198,7 +198,7 @@ public void testFailStaleReplicaTask() throws Exception { .put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(10)) .build(); - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); SegmentReplicationPressureService service = buildPressureService(settings, primaryShard); diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java index 7cfc95d7f5cff..cd9e19c444498 100644 --- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java @@ -33,6 +33,7 @@ package org.opensearch.index.seqno; import org.apache.lucene.codecs.Codec; +import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.cluster.routing.AllocationId; @@ -1807,7 +1808,8 @@ public void testSegmentReplicationCheckpointTracking() { 1, 1, 1L, - Codec.getDefault().getName() + Codec.getDefault().getName(), + Version.CURRENT ); final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint( tracker.shardId(), @@ -1815,7 +1817,8 @@ public void testSegmentReplicationCheckpointTracking() { 2, 2, 50L, - Codec.getDefault().getName() + Codec.getDefault().getName(), + Version.CURRENT ); final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint( tracker.shardId(), @@ -1823,7 +1826,8 @@ public void testSegmentReplicationCheckpointTracking() { 2, 3, 100L, - Codec.getDefault().getName() + Codec.getDefault().getName(), + Version.CURRENT ); tracker.setLatestReplicationCheckpoint(initialCheckpoint); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 5253a017f8e0e..757ca14a462c9 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2697,7 +2697,7 @@ public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throw .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test") .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC) .build(); - final IndexShard indexShard = newStartedShard(true, settings, new NRTReplicationEngineFactory()); + final IndexShard indexShard = newStartedShard(true, settings, new NRTReplicationEngineFactory(null)); ShardRouting routing = indexShard.routingEntry(); routing = newShardRouting( routing.shardId(), @@ -4693,7 +4693,7 @@ public void testReadOnlyReplicaEngineConfig() throws IOException { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .build(); - final IndexShard primaryShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory()); + final IndexShard primaryShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory(null)); assertFalse(primaryShard.getEngine().config().isReadOnlyReplica()); assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class); @@ -4701,7 +4701,7 @@ public void testReadOnlyReplicaEngineConfig() throws IOException { .put(primarySettings) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - final IndexShard replicaShard = newStartedShard(false, replicaSettings, new NRTReplicationEngineFactory()); + final IndexShard replicaShard = newStartedShard(false, replicaSettings, new NRTReplicationEngineFactory(null)); assertTrue(replicaShard.getEngine().config().isReadOnlyReplica()); assertEquals(replicaShard.getEngine().getClass(), NRTReplicationEngine.class); @@ -4714,11 +4714,11 @@ public void testTranslogFactoryWithoutRemoteStore() throws IOException { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .build(); - final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory()); + final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory(null)); assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class); assertEquals(primaryShard.getEngine().config().getTranslogFactory().getClass(), InternalTranslogFactory.class); - final IndexShard replicaShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory()); + final IndexShard replicaShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory(null)); assertEquals(replicaShard.getEngine().getClass(), InternalEngine.class); assertEquals(replicaShard.getEngine().config().getTranslogFactory().getClass(), InternalTranslogFactory.class); @@ -4731,7 +4731,7 @@ public void testTranslogFactoryForReplicaShardWithoutRemoteStore() throws IOExce .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .build(); - final IndexShard primaryShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory()); + final IndexShard primaryShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory(null)); assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class); assertEquals(primaryShard.getEngine().config().getTranslogFactory().getClass(), InternalTranslogFactory.class); closeShards(primaryShard); @@ -4747,7 +4747,7 @@ public void testTranslogFactoryForRemoteTranslogBackedPrimaryShard() throws IOEx .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test") .build(); - final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory()); + final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory(null)); assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class); assertEquals(primaryShard.getEngine().config().getTranslogFactory().getClass(), RemoteBlobStoreInternalTranslogFactory.class); closeShards(primaryShard); @@ -4763,7 +4763,7 @@ public void testTranslogFactoryForRemoteTranslogBackedReplicaShard() throws IOEx .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test") .build(); - final IndexShard replicaShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory()); + final IndexShard replicaShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory(null)); assertEquals(replicaShard.getEngine().getClass(), InternalEngine.class); assertEquals(replicaShard.getEngine().config().getTranslogFactory().getClass(), InternalTranslogFactory.class); closeShards(replicaShard); diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index d76afca51e354..45b9bb0b0ee6f 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -40,7 +40,7 @@ public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchI .build(); public void testStartSequenceForReplicaRecovery() throws Exception { - try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory(null))) { shards.startPrimary(); final IndexShard primary = shards.getPrimary(); @@ -111,7 +111,7 @@ public IndexShard indexShard() { } public void testNoTranslogHistoryTransferred() throws Exception { - try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory(null))) { // Step1 - Start primary, index docs, flush, index more docs, check translog in primary as expected shards.startPrimary(); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 764d60dbd5b82..157fc9755a10b 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -103,14 +103,14 @@ public void testReplicationCheckpointNullForDocRep() throws IOException { * Test that latestReplicationCheckpoint returns ReplicationCheckpoint for segrep enabled indices */ public void testReplicationCheckpointNotNullForSegRep() throws IOException { - final IndexShard indexShard = newStartedShard(randomBoolean(), settings, new NRTReplicationEngineFactory()); + final IndexShard indexShard = newStartedShard(randomBoolean(), settings, new NRTReplicationEngineFactory(null)); final ReplicationCheckpoint replicationCheckpoint = indexShard.getLatestReplicationCheckpoint(); assertNotNull(replicationCheckpoint); closeShards(indexShard); } public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -170,7 +170,7 @@ public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException public void testSegmentReplication_Index_Update_Delete() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; - try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); @@ -219,7 +219,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception { } public void testIgnoreShardIdle() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -320,7 +320,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { } public void testReplicaReceivesGenIncrease() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -369,7 +369,7 @@ public void testPrimaryRelocation() throws Exception { final IndexShard primaryTarget = newShard( primarySource.routingEntry().getTargetRelocatingShard(), settings, - new NRTReplicationEngineFactory() + new NRTReplicationEngineFactory(null) ); updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); @@ -406,7 +406,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { final IndexShard primaryTarget = newShard( primarySource.routingEntry().getTargetRelocatingShard(), settings, - new NRTReplicationEngineFactory() + new NRTReplicationEngineFactory(null) ); updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); @@ -562,7 +562,7 @@ public void testReplicaReceivesLowerGeneration() throws Exception { // index docs on new primary and flush // replicate to all. // Expected result: State Gens: P[4], R-1 [4], R-2 [4] - try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica_1 = shards.getReplicas().get(0); @@ -593,7 +593,7 @@ public void testReplicaReceivesLowerGeneration() throws Exception { } public void testReplicaRestarts() throws Exception { - try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); IndexShard primary = shards.getPrimary(); // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. @@ -672,7 +672,7 @@ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlush .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) .build(); - try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) { + try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(null), createTempDir())) { shards.startAll(); IndexShard oldPrimary = shards.getPrimary(); final IndexShard nextPrimary = shards.getReplicas().get(0); @@ -740,7 +740,7 @@ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlush } public void testNRTReplicaPromotedAsPrimary() throws Exception { - try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); IndexShard oldPrimary = shards.getPrimary(); final IndexShard nextPrimary = shards.getReplicas().get(0); @@ -808,7 +808,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception { } public void testReplicaPromotedWhileReplicating() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard oldPrimary = shards.getPrimary(); final IndexShard nextPrimary = shards.getReplicas().get(0); @@ -884,7 +884,7 @@ public void onFailure(Exception e) { } public void testReplicaClosesWhileReplicating_AfterGetCheckpoint() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -926,7 +926,7 @@ public void getSegmentFiles( } public void testReplicaClosesWhileReplicating_AfterGetSegmentFiles() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -968,7 +968,7 @@ public void getSegmentFiles( } public void testPrimaryCancelsExecution() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0);