diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6e1cfdc8d4268..d851312dbff93 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added @dbwiddis as on OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665))
- [Extensions] Add ExtensionAwarePlugin extension point to add custom settings for extensions ([#7526](https://github.com/opensearch-project/OpenSearch/pull/7526))
- Add new cluster setting to set default index replication type ([#7420](https://github.com/opensearch-project/OpenSearch/pull/7420))
+- [Segment Replication] Rolling upgrade support for default codecs ([#7698](https://github.com/opensearch-project/OpenSearch/pull/7698))
### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564)
@@ -155,4 +156,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security
[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
-[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
\ No newline at end of file
+[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
diff --git a/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java b/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java
index 28085dd6e3860..e25b6d4637e09 100644
--- a/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java
+++ b/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java
@@ -207,6 +207,18 @@ public boolean blocksChanged() {
return state.blocks() != previousState.blocks();
}
+ /**
+ * Returns true
if a version upgrade has taken place in the cluster
+ */
+ public boolean clusterUpgraded() {
+ // previous state was mixed version cluster and current state is not - which indicates upgrade is completed
+ if ((previousState.nodes().getMinNodeVersion() != previousState.nodes().getMaxNodeVersion())
+ && (state.nodes().getMinNodeVersion() == state.nodes().getMaxNodeVersion())) {
+ return true;
+ }
+ return false;
+ }
+
/**
* Returns true
iff the local node is the mater node of the cluster.
*/
diff --git a/server/src/main/java/org/opensearch/index/codec/CodecService.java b/server/src/main/java/org/opensearch/index/codec/CodecService.java
index e4899c02d37e8..e4f176c33414a 100644
--- a/server/src/main/java/org/opensearch/index/codec/CodecService.java
+++ b/server/src/main/java/org/opensearch/index/codec/CodecService.java
@@ -36,10 +36,13 @@
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.apache.lucene.codecs.lucene95.Lucene95Codec.Mode;
+import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.mapper.MapperService;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -58,8 +61,11 @@ public class CodecService {
public static final String BEST_COMPRESSION_CODEC = "best_compression";
/** the raw unfiltered lucene default. useful for testing */
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
+ static Map versionStringMap = new HashMap<>();
+ public static Map opensearchVersionToLuceneCodec;
public CodecService(@Nullable MapperService mapperService, Logger logger) {
+ loadMap();
final MapBuilder codecs = MapBuilder.newMapBuilder();
if (mapperService == null) {
codecs.put(DEFAULT_CODEC, new Lucene95Codec());
@@ -75,6 +81,14 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) {
this.codecs = codecs.immutableMap();
}
+ public void loadMap() {
+ versionStringMap.put(Version.fromString("3.0.0"), "Lucene95");
+ versionStringMap.put(Version.fromString("2.8.0"), "Lucene95");
+ versionStringMap.put(Version.fromString("2.7.1"), "Lucene95");
+ versionStringMap.put(Version.fromString("2.7.0"), "Lucene95");
+ opensearchVersionToLuceneCodec = Collections.unmodifiableMap(new HashMap<>(versionStringMap));
+ }
+
public Codec codec(String name) {
Codec codec = codecs.get(name);
if (codec == null) {
diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
index 338a541af387a..8249f100f5b66 100644
--- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
+++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
@@ -40,6 +40,7 @@
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.similarities.Similarity;
+import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
@@ -81,7 +82,7 @@ public final class EngineConfig {
private final TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
private volatile boolean enableGcDeletes = true;
private final TimeValue flushMergesAfter;
- private final String codecName;
+ private String codecName;
private final ThreadPool threadPool;
private final Engine.Warmer warmer;
private final Store store;
@@ -105,6 +106,7 @@ public final class EngineConfig {
private final boolean isReadOnlyReplica;
private final BooleanSupplier primaryModeSupplier;
private final Comparator leafSorter;
+ private Version clusterMinVersion;
/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -176,7 +178,8 @@ private EngineConfig(Builder builder) {
this.similarity = builder.similarity;
this.codecService = builder.codecService;
this.eventListener = builder.eventListener;
- codecName = builder.indexSettings.getValue(INDEX_CODEC_SETTING);
+ this.codecName = builder.indexSettings.getValue(INDEX_CODEC_SETTING);
+ this.clusterMinVersion = Version.CURRENT;
// We need to make the indexing buffer for this shard at least as large
// as the amount of memory that is available for all engines on the
// local node so that decisions to flush segments to disk are made by
@@ -251,6 +254,44 @@ public Codec getCodec() {
return codecService.codec(codecName);
}
+ /**
+ * Returns the codec name of the lucene codec used for writing new segments
+ */
+ public String getCodecName() {
+ return getCodec().getName();
+ }
+
+ /**
+ * Sets the codec name of the lucene codec used for writing new segments
+ */
+ public void setCodecName(String name) {
+ this.codecName = name;
+ }
+
+ /**
+ * Returns the minimum opensearch version among all nodes of a cluster when upgrade is in progress and
+ * segment replication is enabled; null when upgrade not in progress.
+ */
+ public Version getClusterMinVersion() {
+ return clusterMinVersion;
+ }
+
+ /**
+ * Sets the minimum opensearch version among all nodes of a cluster when upgrade is in progress and
+ * segment replication is enabled.
+ */
+ public void setClusterMinVersion(Version clusterMinVersion) {
+ this.clusterMinVersion = clusterMinVersion;
+ }
+
+ /**
+ * Returns the BWC Codec{@link Codec} to be used in the engine during a rolling upgrade when
+ * cluster is in a mixed version state and segment replication is enabled {@link org.apache.lucene.index.IndexWriter}
+ */
+ public Codec getBWCCodec(String codecName) {
+ return codecService.codec(codecName);
+ }
+
/**
* Returns a thread-pool mainly used to get estimated time stamps from
* {@link org.opensearch.threadpool.ThreadPool#relativeTimeInMillis()} and to schedule
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java
index 45fe3086ac3f6..f2bd788d60036 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java
@@ -8,6 +8,10 @@
package org.opensearch.index.engine;
+import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.index.codec.CodecService;
+
/**
* Engine Factory implementation used with Segment Replication that wires up replica shards with an ${@link NRTReplicationEngine}
* and primary with an ${@link InternalEngine}
@@ -15,11 +19,23 @@
* @opensearch.internal
*/
public class NRTReplicationEngineFactory implements EngineFactory {
+
+ private final ClusterService clusterService;
+
+ public NRTReplicationEngineFactory(ClusterService clusterService) {
+ this.clusterService = clusterService;
+ }
+
@Override
public Engine newReadWriteEngine(EngineConfig config) {
if (config.isReadOnlyReplica()) {
return new NRTReplicationEngine(config);
}
+ if (clusterService != null) {
+ DiscoveryNodes nodes = this.clusterService.state().nodes();
+ config.setClusterMinVersion(nodes.getMinNodeVersion());
+ config.setCodecName(config.getBWCCodec(CodecService.opensearchVersionToLuceneCodec.get(nodes.getMinNodeVersion())).getName());
+ }
return new InternalEngine(config);
}
}
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index ce5d05065860f..43c2fb5555184 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1553,7 +1553,8 @@ public Tuple, ReplicationCheckpoint> getLatestSegme
shardRouting.primary()
? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum()
: store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(),
- getEngine().config().getCodec().getName()
+ getEngine().config().getCodecName(),
+ getEngine().config().getClusterMinVersion()
)
);
} catch (IOException e) {
@@ -1787,7 +1788,7 @@ static Engine.Searcher wrapSearcher(
}
/**
- * Used with segment replication during relocation handoff, this method updates current read only engine to global
+ * Used with segment replication during relocation handoff and rolling upgrades, this method updates current read only engine to global
* checkpoint followed by changing to writeable engine
*
* @throws IOException if communication failed
@@ -1796,7 +1797,7 @@ static Engine.Searcher wrapSearcher(
*
* @opensearch.internal
*/
- public void resetToWriteableEngine() throws IOException, InterruptedException, TimeoutException {
+ public void resetEngine() throws IOException, InterruptedException, TimeoutException {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); });
}
@@ -3277,7 +3278,7 @@ private void doCheckIndex() throws IOException {
recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS)));
}
- Engine getEngine() {
+ public Engine getEngine() {
Engine engine = getEngineOrNull();
if (engine == null) {
throw new AlreadyClosedException("engine is closed");
diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java
index 58a26f813d88d..70c517ac553cd 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesService.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesService.java
@@ -851,7 +851,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
return config -> new ReadOnlyEngine(config, new SeqNoStats(0, 0, 0), new TranslogStats(), true, Function.identity(), false);
}
if (idxSettings.isSegRepEnabled()) {
- return new NRTReplicationEngineFactory();
+ return new NRTReplicationEngineFactory(clusterService);
}
return new InternalEngineFactory();
} else if (engineFactories.size() == 1) {
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java
index 0e62a4320e3f3..0b1c09d2bdfc1 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java
@@ -11,11 +11,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
@@ -37,7 +39,9 @@
import org.opensearch.transport.TransportService;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -170,6 +174,33 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}
}
+ if (event.clusterUpgraded()) {
+ List indexShardList = new ArrayList<>();
+ DiscoveryNodes nodes = event.state().nodes();
+ for (IndexService indexService : indicesService) {
+ if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) {
+ for (IndexShard indexShard : indexService) {
+ try {
+ if (indexShard.routingEntry().primary()
+ && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
+ indexShardList.add(indexShard);
+ }
+ } catch (AlreadyClosedException e) {
+ logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
+ }
+ }
+ }
+ }
+ try {
+ if (indexShardList.isEmpty() == false) {
+ for (IndexShard indexShard : indexShardList) {
+ indexShard.resetEngine();
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Received unexpected exception: [{}]", e.getMessage());
+ }
+ }
}
@Override
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
index 1ce208a9a8234..1bb13bd5fec4d 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
@@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.BaseExceptionsHelper;
+import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
@@ -227,6 +228,19 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
}
}
final Thread thread = Thread.currentThread();
+ Version localNodeVersion = Version.CURRENT;
+ // if replica's OS version is not on or after primary version, then can ignore checkpoint
+ if (localNodeVersion.onOrAfter(receivedCheckpoint.getMinVersion()) == false) {
+ logger.trace(
+ () -> new ParameterizedMessage(
+ "Ignoring checkpoint, shard not started {} {}\n Shard does not support the received lucene codec version {}",
+ receivedCheckpoint,
+ replicaShard.state(),
+ receivedCheckpoint.getCodec()
+ )
+ );
+ return;
+ }
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(replicaShard, new SegmentReplicationListener() {
@Override
@@ -435,7 +449,7 @@ public void onReplicationDone(SegmentReplicationState state) {
try {
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
- indexShard.resetToWriteableEngine();
+ indexShard.resetEngine();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (InterruptedException | TimeoutException | IOException e) {
diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
index 32521fb0cd944..49ea0e583d669 100644
--- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
+++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
@@ -32,6 +32,7 @@ public class ReplicationCheckpoint implements Writeable, Comparable, ReplicationCheckpoint> replicaTuple = replica.getLatestSegmentInfosAndCheckpoint();
@@ -160,6 +174,8 @@ private void assertReplicationCheckpoint(IndexShard shard, SegmentInfos segmentI
assertNotNull(segmentInfos);
assertEquals(checkpoint.getSegmentInfosVersion(), segmentInfos.getVersion());
assertEquals(checkpoint.getSegmentsGen(), segmentInfos.getGeneration());
+ assertEquals(checkpoint.getCodec(), shard.getEngine().config().getBWCCodec(CodecService.DEFAULT_CODEC).getName());
+ assertEquals(checkpoint.getMinVersion(), Version.CURRENT);
}
public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException {
@@ -170,7 +186,7 @@ public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException
public void testSegmentReplication_Index_Update_Delete() throws Exception {
String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}";
- try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) {
+ try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
@@ -219,7 +235,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
}
public void testIgnoreShardIdle() throws Exception {
- try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
+ try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);
@@ -309,8 +325,8 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
primaryShard.getReplicationTracker().completeRelocationHandoff();
// Assert that primary shard is no longer in Primary Mode and shard routing is still Primary
- assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode());
- assertEquals(true, primaryShard.routingEntry().primary());
+ assertFalse(primaryShard.getReplicationTracker().isPrimaryMode());
+ assertTrue(primaryShard.routingEntry().primary());
spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard);
@@ -319,8 +335,81 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
closeShards(primaryShard);
}
+ /**
+ * Cluster version check in onNewCheckpoint method should pass when replica version is the same as the received checkpoint version.
+ */
+ public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception {
+ try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
+ shards.startAll();
+ final IndexShard primary = shards.getPrimary();
+ IndexShard replica = shards.getReplicas().get(0);
+ SegmentReplicationTargetService sut;
+ sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class));
+ SegmentReplicationTargetService spy = spy(sut);
+ IndexShard spyShard = spy(replica);
+ ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(replica.shardId(), 0L, 0L, 0L, replica.getDefaultCodecName());
+ spy.onNewCheckpoint(checkpoint, spyShard);
+ // passed the cluster version check and moved on to shouldProcessCheckpoint
+ verify(spyShard, times(1)).shouldProcessCheckpoint(checkpoint);
+ }
+ }
+
+ /**
+ * Cluster version check in onNewCheckpoint method should pass when replica version is ahead of the received checkpoint version.
+ */
+ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exception {
+ try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
+ shards.startAll();
+ final IndexShard primary = shards.getPrimary();
+ IndexShard replica = shards.getReplicas().get(0);
+ SegmentReplicationTargetService sut;
+ sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class));
+ SegmentReplicationTargetService spy = spy(sut);
+ IndexShard spyShard = spy(replica);
+ ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
+ replica.shardId(),
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ replica.getDefaultCodecName(),
+ Version.fromId(Version.CURRENT.id - 1)
+ );
+ spy.onNewCheckpoint(checkpoint, spyShard);
+ // passed the cluster version check and moved on to shouldProcessCheckpoint
+ verify(spyShard, times(1)).shouldProcessCheckpoint(checkpoint);
+ }
+ }
+
+ /**
+ * Cluster version check in onNewCheckpoint method should fail when replica version is behind the received checkpoint version.
+ */
+ public void testClusterVersionCheckFailOnNewCheckpointBehindVersion() throws Exception {
+ try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
+ shards.startAll();
+ final IndexShard primary = shards.getPrimary();
+ IndexShard replica = shards.getReplicas().get(0);
+ SegmentReplicationTargetService sut;
+ sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class));
+ SegmentReplicationTargetService spy = spy(sut);
+ IndexShard spyShard = spy(replica);
+ ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
+ replica.shardId(),
+ 0L,
+ 0L,
+ 0L,
+ 0L,
+ replica.getDefaultCodecName(),
+ Version.fromId(Version.CURRENT.id + 1)
+ );
+ spy.onNewCheckpoint(checkpoint, spyShard);
+ // did not pass the version check and returned before shouldProcessCheckpoint method
+ verify(spyShard, times(0)).shouldProcessCheckpoint(checkpoint);
+ }
+ }
+
public void testReplicaReceivesGenIncrease() throws Exception {
- try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
+ try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);
@@ -369,7 +458,7 @@ public void testPrimaryRelocation() throws Exception {
final IndexShard primaryTarget = newShard(
primarySource.routingEntry().getTargetRelocatingShard(),
settings,
- new NRTReplicationEngineFactory()
+ new NRTReplicationEngineFactory(null)
);
updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata());
@@ -406,7 +495,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
final IndexShard primaryTarget = newShard(
primarySource.routingEntry().getTargetRelocatingShard(),
settings,
- new NRTReplicationEngineFactory()
+ new NRTReplicationEngineFactory(null)
);
updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata());
@@ -562,7 +651,7 @@ public void testReplicaReceivesLowerGeneration() throws Exception {
// index docs on new primary and flush
// replicate to all.
// Expected result: State Gens: P[4], R-1 [4], R-2 [4]
- try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) {
+ try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica_1 = shards.getReplicas().get(0);
@@ -593,7 +682,7 @@ public void testReplicaReceivesLowerGeneration() throws Exception {
}
public void testReplicaRestarts() throws Exception {
- try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory())) {
+ try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
IndexShard primary = shards.getPrimary();
// 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point.
@@ -672,7 +761,15 @@ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlush
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.build();
- try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) {
+ try (
+ ReplicationGroup shards = createGroup(
+ 1,
+ settings,
+ indexMapping,
+ new NRTReplicationEngineFactory(clusterService),
+ createTempDir()
+ )
+ ) {
shards.startAll();
IndexShard oldPrimary = shards.getPrimary();
final IndexShard nextPrimary = shards.getReplicas().get(0);
@@ -740,7 +837,7 @@ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlush
}
public void testNRTReplicaPromotedAsPrimary() throws Exception {
- try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) {
+ try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
IndexShard oldPrimary = shards.getPrimary();
final IndexShard nextPrimary = shards.getReplicas().get(0);
@@ -808,7 +905,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception {
}
public void testReplicaPromotedWhileReplicating() throws Exception {
- try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
+ try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
final IndexShard oldPrimary = shards.getPrimary();
final IndexShard nextPrimary = shards.getReplicas().get(0);
@@ -884,7 +981,7 @@ public void onFailure(Exception e) {
}
public void testReplicaClosesWhileReplicating_AfterGetCheckpoint() throws Exception {
- try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
+ try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);
@@ -926,7 +1023,7 @@ public void getSegmentFiles(
}
public void testReplicaClosesWhileReplicating_AfterGetSegmentFiles() throws Exception {
- try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
+ try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);
@@ -968,7 +1065,7 @@ public void getSegmentFiles(
}
public void testPrimaryCancelsExecution() throws Exception {
- try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
+ try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);
diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java
index eae070b98c4a1..5f1217ef4cffb 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java
@@ -108,7 +108,7 @@ public void testTranslogHistoryTransferred() throws Exception {
public void testWithSegmentReplication_ReplicaUsesPrimaryTranslogUUID() throws Exception {
Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
- try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) {
+ try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(null))) {
shards.startAll();
final String expectedUUID = getTranslog(shards.getPrimary()).getTranslogUUID();
assertTrue(
diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java
index ff251f42ab21b..719241452d65a 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java
@@ -27,7 +27,7 @@ public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLe
.build();
public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception {
- try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) {
+ try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory(null))) {
// Step1 - Start primary, index docs and flush
shards.startPrimary();
diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
index 677352cdd5120..42e3e2c1f6b93 100644
--- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
@@ -67,7 +67,7 @@ public class OngoingSegmentReplicationsTests extends IndexShardTestCase {
public void setUp() throws Exception {
super.setUp();
primary = newStartedShard(true, settings);
- replica = newShard(false, settings, new NRTReplicationEngineFactory());
+ replica = newShard(false, settings, new NRTReplicationEngineFactory(null));
recoverReplica(replica, primary, true);
replicaDiscoveryNode = replica.recoveryState().getTargetNode();
primaryDiscoveryNode = replica.recoveryState().getSourceNode();
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java
index 607f9dd91e35e..c6dc8b797a6cb 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java
@@ -52,7 +52,7 @@ public void setUp() throws Exception {
super.setUp();
final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT").put(Settings.EMPTY).build();
primary = newStartedShard(true, settings);
- replica = newShard(false, settings, new NRTReplicationEngineFactory());
+ replica = newShard(false, settings, new NRTReplicationEngineFactory(null));
recoverReplica(replica, primary, true);
replicaDiscoveryNode = replica.recoveryState().getTargetNode();
}
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
index 1d1777758972c..911b7d6e3844f 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
@@ -88,7 +88,7 @@ public void setUp() throws Exception {
.build();
primaryShard = newStartedShard(true, settings);
String primaryCodec = primaryShard.getLatestReplicationCheckpoint().getCodec();
- replicaShard = newShard(false, settings, new NRTReplicationEngineFactory());
+ replicaShard = newShard(false, settings, new NRTReplicationEngineFactory(null));
recoverReplica(replicaShard, primaryShard, true, getReplicationFunc(replicaShard));
checkpoint = new ReplicationCheckpoint(
replicaShard.shardId(),
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
index 0e711af1afa62..cc51656cb2be2 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -93,7 +93,7 @@ public void setUp() throws Exception {
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
- indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory());
+ indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory(null));
spyIndexShard = spy(indexShard);
Mockito.doNothing().when(spyIndexShard).finalizeReplication(any(SegmentInfos.class));
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index b1dd4fb1dcc1e..6c05687b3ce3f 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -1354,6 +1354,7 @@ public void getSegmentFiles(
};
when(sourceFactory.get(any())).thenReturn(replicationSource);
when(indicesService.getShardOrNull(any())).thenReturn(target);
+ when(indicesService.clusterService()).thenReturn(clusterService);
return targetService;
}