From 137177c6c887054270f98e3dc1e0f61886553fb6 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Mon, 22 May 2023 09:35:48 -0700 Subject: [PATCH 1/4] rolling upgrade Signed-off-by: Poojita Raj --- .../opensearch/index/codec/CodecService.java | 24 +++++++ .../opensearch/index/engine/EngineConfig.java | 45 ++++++++++++- .../index/engine/InternalEngine.java | 6 +- .../engine/NRTReplicationEngineFactory.java | 20 ++++++ .../opensearch/index/shard/IndexShard.java | 8 ++- .../opensearch/indices/IndicesService.java | 8 ++- .../SegmentReplicationTargetService.java | 14 +++++ .../SegmentReplicationUpgradeListener.java | 63 +++++++++++++++++++ .../SegmentReplicationUpgradeService.java | 31 +++++++++ .../checkpoint/ReplicationCheckpoint.java | 39 ++++++++++-- .../main/java/org/opensearch/node/Node.java | 3 + 11 files changed, 248 insertions(+), 13 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java diff --git a/server/src/main/java/org/opensearch/index/codec/CodecService.java b/server/src/main/java/org/opensearch/index/codec/CodecService.java index e4899c02d37e8..203c8a3c87111 100644 --- a/server/src/main/java/org/opensearch/index/codec/CodecService.java +++ b/server/src/main/java/org/opensearch/index/codec/CodecService.java @@ -36,10 +36,12 @@ import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.lucene95.Lucene95Codec; import org.apache.lucene.codecs.lucene95.Lucene95Codec.Mode; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.collect.MapBuilder; import org.opensearch.index.mapper.MapperService; +import java.util.HashMap; import java.util.Map; /** @@ -58,8 +60,10 @@ public class CodecService { public static final String BEST_COMPRESSION_CODEC = "best_compression"; /** the raw unfiltered lucene default. useful for testing */ public static final String LUCENE_DEFAULT_CODEC = "lucene_default"; + public static Map opensearchVersionToLuceneCodec = new HashMap<>(); public CodecService(@Nullable MapperService mapperService, Logger logger) { + loadMap(); final MapBuilder codecs = MapBuilder.newMapBuilder(); if (mapperService == null) { codecs.put(DEFAULT_CODEC, new Lucene95Codec()); @@ -75,6 +79,26 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) { this.codecs = codecs.immutableMap(); } + public void loadMap() { + opensearchVersionToLuceneCodec.put(Version.fromString("3.0.0"), "Lucene95"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.8.0"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.7.1"), "Lucene95"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.7.0"), "Lucene95"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.6.0"), "Lucene95"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.4.2"), "Lucene95"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.5.1"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.5.0"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.4.1"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.4.1"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.4.0"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.3.0"), "Lucene93"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.2.1"), "Lucene93"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.2.0"), "Lucene93"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.1.0"), "Lucene92"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.0.1"), "Lucene91"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.0.0"), "Lucene91"); + } + public Codec codec(String name) { Codec codec = codecs.get(name); if (codec == null) { diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index 338a541af387a..8249f100f5b66 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -40,6 +40,7 @@ import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.Sort; import org.apache.lucene.search.similarities.Similarity; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -81,7 +82,7 @@ public final class EngineConfig { private final TranslogDeletionPolicyFactory translogDeletionPolicyFactory; private volatile boolean enableGcDeletes = true; private final TimeValue flushMergesAfter; - private final String codecName; + private String codecName; private final ThreadPool threadPool; private final Engine.Warmer warmer; private final Store store; @@ -105,6 +106,7 @@ public final class EngineConfig { private final boolean isReadOnlyReplica; private final BooleanSupplier primaryModeSupplier; private final Comparator leafSorter; + private Version clusterMinVersion; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -176,7 +178,8 @@ private EngineConfig(Builder builder) { this.similarity = builder.similarity; this.codecService = builder.codecService; this.eventListener = builder.eventListener; - codecName = builder.indexSettings.getValue(INDEX_CODEC_SETTING); + this.codecName = builder.indexSettings.getValue(INDEX_CODEC_SETTING); + this.clusterMinVersion = Version.CURRENT; // We need to make the indexing buffer for this shard at least as large // as the amount of memory that is available for all engines on the // local node so that decisions to flush segments to disk are made by @@ -251,6 +254,44 @@ public Codec getCodec() { return codecService.codec(codecName); } + /** + * Returns the codec name of the lucene codec used for writing new segments + */ + public String getCodecName() { + return getCodec().getName(); + } + + /** + * Sets the codec name of the lucene codec used for writing new segments + */ + public void setCodecName(String name) { + this.codecName = name; + } + + /** + * Returns the minimum opensearch version among all nodes of a cluster when upgrade is in progress and + * segment replication is enabled; null when upgrade not in progress. + */ + public Version getClusterMinVersion() { + return clusterMinVersion; + } + + /** + * Sets the minimum opensearch version among all nodes of a cluster when upgrade is in progress and + * segment replication is enabled. + */ + public void setClusterMinVersion(Version clusterMinVersion) { + this.clusterMinVersion = clusterMinVersion; + } + + /** + * Returns the BWC Codec{@link Codec} to be used in the engine during a rolling upgrade when + * cluster is in a mixed version state and segment replication is enabled {@link org.apache.lucene.index.IndexWriter} + */ + public Codec getBWCCodec(String codecName) { + return codecService.codec(codecName); + } + /** * Returns a thread-pool mainly used to get estimated time stamps from * {@link org.opensearch.threadpool.ThreadPool#relativeTimeInMillis()} and to schedule diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 22eeca64328e0..f971e3b7e8a6e 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -33,6 +33,7 @@ package org.opensearch.index.engine; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; @@ -89,6 +90,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.IndexSettings; import org.opensearch.index.VersionType; +import org.opensearch.index.codec.CodecService; import org.opensearch.index.fieldvisitor.IdOnlyFieldVisitor; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.ParseContext; @@ -2317,7 +2319,9 @@ private IndexWriterConfig getIndexWriterConfig() { iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy)); iwc.setSimilarity(engineConfig.getSimilarity()); iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac()); - iwc.setCodec(engineConfig.getCodec()); + Codec bwcCodec = engineConfig.getBWCCodec(CodecService.opensearchVersionToLuceneCodec.get(engineConfig.getClusterMinVersion())); + iwc.setCodec(bwcCodec); + engineConfig.setCodecName(bwcCodec.getName()); iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh if (config().getIndexSort() != null) { iwc.setIndexSort(config().getIndexSort()); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java index 45fe3086ac3f6..e39a44e7d300b 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java @@ -8,6 +8,11 @@ package org.opensearch.index.engine; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; + /** * Engine Factory implementation used with Segment Replication that wires up replica shards with an ${@link NRTReplicationEngine} * and primary with an ${@link InternalEngine} @@ -15,11 +20,26 @@ * @opensearch.internal */ public class NRTReplicationEngineFactory implements EngineFactory { + + private final ClusterService clusterService; + + public NRTReplicationEngineFactory(ClusterService clusterService) { + this.clusterService = clusterService; + } + + public NRTReplicationEngineFactory() { + this.clusterService = null; + } + @Override public Engine newReadWriteEngine(EngineConfig config) { if (config.isReadOnlyReplica()) { return new NRTReplicationEngine(config); } + if (clusterService != null) { + DiscoveryNodes nodes = this.clusterService.state().nodes(); + config.setClusterMinVersion(nodes.getMinNodeVersion()); + } return new InternalEngine(config); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index ce5d05065860f..d22b2c2022273 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -56,6 +56,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.ThreadInterruptedException; +import org.opensearch.Version; import org.opensearch.cluster.metadata.DataStream; import org.opensearch.core.Assertions; import org.opensearch.ExceptionsHelper; @@ -1553,7 +1554,8 @@ public Tuple, ReplicationCheckpoint> getLatestSegme shardRouting.primary() ? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum() : store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(), - getEngine().config().getCodec().getName() + getEngine().config().getCodecName(), + getEngine().config().getClusterMinVersion() ) ); } catch (IOException e) { @@ -3277,7 +3279,7 @@ private void doCheckIndex() throws IOException { recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS))); } - Engine getEngine() { + public Engine getEngine() { Engine engine = getEngineOrNull(); if (engine == null) { throw new AlreadyClosedException("engine is closed"); @@ -4344,7 +4346,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { /** * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. */ - void resetEngineToGlobalCheckpoint() throws IOException { + public void resetEngineToGlobalCheckpoint() throws IOException { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "resetting engine without blocking operations; active operations are [" + getActiveOperations() diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 58a26f813d88d..535b1130245d1 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -302,7 +302,7 @@ public class IndicesService extends AbstractLifecycleComponent private final ScriptService scriptService; private final ClusterService clusterService; private final Client client; - private volatile Map indices = emptyMap(); + public volatile Map indices = emptyMap(); private final Map> pendingDeletes = new HashMap<>(); private final AtomicInteger numUncompletedDeletes = new AtomicInteger(); private final OldShardsStats oldShardsStats = new OldShardsStats(); @@ -474,6 +474,10 @@ public ClusterService clusterService() { return clusterService; } + public Map indices() { + return indices; + } + @Override protected void doStop() { ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS); @@ -851,7 +855,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) { return config -> new ReadOnlyEngine(config, new SeqNoStats(0, 0, 0), new TranslogStats(), true, Function.identity(), false); } if (idxSettings.isSegRepEnabled()) { - return new NRTReplicationEngineFactory(); + return new NRTReplicationEngineFactory(clusterService); } return new InternalEngineFactory(); } else if (engineFactories.size() == 1) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 1ce208a9a8234..70af7673bf0e1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.BaseExceptionsHelper; +import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.Nullable; @@ -227,6 +228,19 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe } } final Thread thread = Thread.currentThread(); + Version localNodeVersion = indicesService.clusterService().state().nodes().getLocalNode().getVersion(); + // if replica's OS version is not on or after primary version, then can ignore checkpoint + if (localNodeVersion.onOrAfter(receivedCheckpoint.getMinVersion()) == false) { + logger.trace( + () -> new ParameterizedMessage( + "Ignoring checkpoint, shard not started {} {}\n Shard does not support the received lucene codec version {}", + receivedCheckpoint, + replicaShard.state(), + receivedCheckpoint.getCodec() + ) + ); + return; + } if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { startReplication(replicaShard, new SegmentReplicationListener() { @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java new file mode 100644 index 0000000000000..dbbb0c08b6aa4 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; + +import java.util.ArrayList; +import java.util.List; + +public class SegmentReplicationUpgradeListener implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(SegmentReplicationUpgradeListener.class); + + private final IndicesService indicesService; + + public SegmentReplicationUpgradeListener(IndicesService indicesService) { + this.indicesService = indicesService; + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + List indexShardList = new ArrayList<>(); + DiscoveryNodes nodes = event.state().nodes(); + if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { + for (IndexService indexService : indicesService.indices().values()) { + for (IndexShard indexShard : indexService) { + try { + if ((indexShard.getEngine().config().isReadOnlyReplica() == false) && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { + indexShardList.add(indexShard); + } + } catch (AlreadyClosedException e) { + logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); + } + } + } + } + try { + if (indexShardList.isEmpty() == false) { + for (IndexShard is : indexShardList) { + is.resetEngineToGlobalCheckpoint(); + } + } + } catch (Exception e) { + logger.error("Received unexpected exception: [{}]", e.getMessage()); + } + + } + +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java new file mode 100644 index 0000000000000..f62a33e8835b1 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.cluster.service.ClusterApplierService; +import org.opensearch.common.lease.Releasable; +import org.opensearch.indices.IndicesService; + +public class SegmentReplicationUpgradeService implements Releasable { + + private final ClusterApplierService clusterApplierService; + private final SegmentReplicationUpgradeListener clusterStateListener; + + public SegmentReplicationUpgradeService(IndicesService indicesService, ClusterApplierService clusterApplierService) { + SegmentReplicationUpgradeListener clusterStateListener = new SegmentReplicationUpgradeListener(indicesService); + this.clusterApplierService = clusterApplierService; + this.clusterStateListener = clusterStateListener; + this.clusterApplierService.addListener(this.clusterStateListener); + } + + @Override + public void close() { + this.clusterApplierService.removeListener(this.clusterStateListener); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 32521fb0cd944..49ea0e583d669 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -32,6 +32,7 @@ public class ReplicationCheckpoint implements Writeable, Comparable { resetEngineToGlobalCheckpoint(); }); } @@ -4346,7 +4345,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { /** * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. */ - public void resetEngineToGlobalCheckpoint() throws IOException { + void resetEngineToGlobalCheckpoint() throws IOException { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "resetting engine without blocking operations; active operations are [" + getActiveOperations() diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 535b1130245d1..70c517ac553cd 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -302,7 +302,7 @@ public class IndicesService extends AbstractLifecycleComponent private final ScriptService scriptService; private final ClusterService clusterService; private final Client client; - public volatile Map indices = emptyMap(); + private volatile Map indices = emptyMap(); private final Map> pendingDeletes = new HashMap<>(); private final AtomicInteger numUncompletedDeletes = new AtomicInteger(); private final OldShardsStats oldShardsStats = new OldShardsStats(); @@ -474,10 +474,6 @@ public ClusterService clusterService() { return clusterService; } - public Map indices() { - return indices; - } - @Override protected void doStop() { ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 70af7673bf0e1..110d1c6e4436a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -449,7 +449,7 @@ public void onReplicationDone(SegmentReplicationState state) { try { // Promote engine type for primary target if (indexShard.recoveryState().getPrimary() == true) { - indexShard.resetToWriteableEngine(); + indexShard.resetEngine(); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } catch (InterruptedException | TimeoutException | IOException e) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java index dbbb0c08b6aa4..fd0bfd52f5c81 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java @@ -21,6 +21,11 @@ import java.util.ArrayList; import java.util.List; +/** + * SegmentReplicationUpgradeListener is used to upgrade the opensearch version used by all primaries of a cluster when + * segment replication is enabled and a rolling upgrade is completed (while in mixed cluster state, the primaries use lower codec + * version on their primaries and this needs to be reset once upgrade is complete). + */ public class SegmentReplicationUpgradeListener implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(SegmentReplicationUpgradeListener.class); @@ -33,29 +38,34 @@ public SegmentReplicationUpgradeListener(IndicesService indicesService) { @Override public void clusterChanged(ClusterChangedEvent event) { - List indexShardList = new ArrayList<>(); - DiscoveryNodes nodes = event.state().nodes(); - if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { - for (IndexService indexService : indicesService.indices().values()) { - for (IndexShard indexShard : indexService) { - try { - if ((indexShard.getEngine().config().isReadOnlyReplica() == false) && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { - indexShardList.add(indexShard); + if (event.nodesChanged()) { + List indexShardList = new ArrayList<>(); + DiscoveryNodes nodes = event.state().nodes(); + if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + try { + if (indexShard.indexSettings().isSegRepEnabled() + && indexShard.indexSettings().getNumberOfReplicas() > 0 + && indexShard.routingEntry().primary() + && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { + indexShardList.add(indexShard); + } + } catch (AlreadyClosedException e) { + logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); } - } catch (AlreadyClosedException e) { - logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); } } } - } - try { - if (indexShardList.isEmpty() == false) { - for (IndexShard is : indexShardList) { - is.resetEngineToGlobalCheckpoint(); + try { + if (indexShardList.isEmpty() == false) { + for (IndexShard indexShard : indexShardList) { + indexShard.resetEngine(); + } } + } catch (Exception e) { + logger.error("Received unexpected exception: [{}]", e.getMessage()); } - } catch (Exception e) { - logger.error("Received unexpected exception: [{}]", e.getMessage()); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java index f62a33e8835b1..8b4dc783be146 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java @@ -12,6 +12,9 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.indices.IndicesService; +/** + * SegmentReplicationUpgradeService is used to manage SegmentReplicationUpgradeListener's lifecycle (creation/deletion). + */ public class SegmentReplicationUpgradeService implements Releasable { private final ClusterApplierService clusterApplierService; diff --git a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java index 1ebdd111bfed3..31762244c38ff 100644 --- a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java @@ -53,7 +53,7 @@ public class SegmentReplicationPressureServiceTests extends OpenSearchIndexLevel .build(); public void testIsSegrepLimitBreached() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); SegmentReplicationPressureService service = buildPressureService(settings, primaryShard); @@ -104,7 +104,7 @@ public void testIsSegrepLimitBreached_onlyCheckpointLimitBreached() throws Excep .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) .build(); - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); SegmentReplicationPressureService service = buildPressureService(settings, primaryShard); @@ -131,7 +131,7 @@ public void testIsSegrepLimitBreached_onlyTimeLimitBreached() throws Exception { .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) .build(); - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); final SegmentReplicationPressureService service = buildPressureService(settings, primaryShard); @@ -154,7 +154,7 @@ public void testIsSegrepLimitBreached_onlyTimeLimitBreached() throws Exception { } public void testIsSegrepLimitBreached_underStaleNodeLimit() throws Exception { - try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); indexInBatches(5, shards, primaryShard); @@ -198,7 +198,7 @@ public void testFailStaleReplicaTask() throws Exception { .put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(10)) .build(); - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); SegmentReplicationPressureService service = buildPressureService(settings, primaryShard); diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java index 7cfc95d7f5cff..cd9e19c444498 100644 --- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java @@ -33,6 +33,7 @@ package org.opensearch.index.seqno; import org.apache.lucene.codecs.Codec; +import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.cluster.routing.AllocationId; @@ -1807,7 +1808,8 @@ public void testSegmentReplicationCheckpointTracking() { 1, 1, 1L, - Codec.getDefault().getName() + Codec.getDefault().getName(), + Version.CURRENT ); final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint( tracker.shardId(), @@ -1815,7 +1817,8 @@ public void testSegmentReplicationCheckpointTracking() { 2, 2, 50L, - Codec.getDefault().getName() + Codec.getDefault().getName(), + Version.CURRENT ); final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint( tracker.shardId(), @@ -1823,7 +1826,8 @@ public void testSegmentReplicationCheckpointTracking() { 2, 3, 100L, - Codec.getDefault().getName() + Codec.getDefault().getName(), + Version.CURRENT ); tracker.setLatestReplicationCheckpoint(initialCheckpoint); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 5253a017f8e0e..757ca14a462c9 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2697,7 +2697,7 @@ public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throw .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test") .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC) .build(); - final IndexShard indexShard = newStartedShard(true, settings, new NRTReplicationEngineFactory()); + final IndexShard indexShard = newStartedShard(true, settings, new NRTReplicationEngineFactory(null)); ShardRouting routing = indexShard.routingEntry(); routing = newShardRouting( routing.shardId(), @@ -4693,7 +4693,7 @@ public void testReadOnlyReplicaEngineConfig() throws IOException { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .build(); - final IndexShard primaryShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory()); + final IndexShard primaryShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory(null)); assertFalse(primaryShard.getEngine().config().isReadOnlyReplica()); assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class); @@ -4701,7 +4701,7 @@ public void testReadOnlyReplicaEngineConfig() throws IOException { .put(primarySettings) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - final IndexShard replicaShard = newStartedShard(false, replicaSettings, new NRTReplicationEngineFactory()); + final IndexShard replicaShard = newStartedShard(false, replicaSettings, new NRTReplicationEngineFactory(null)); assertTrue(replicaShard.getEngine().config().isReadOnlyReplica()); assertEquals(replicaShard.getEngine().getClass(), NRTReplicationEngine.class); @@ -4714,11 +4714,11 @@ public void testTranslogFactoryWithoutRemoteStore() throws IOException { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .build(); - final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory()); + final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory(null)); assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class); assertEquals(primaryShard.getEngine().config().getTranslogFactory().getClass(), InternalTranslogFactory.class); - final IndexShard replicaShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory()); + final IndexShard replicaShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory(null)); assertEquals(replicaShard.getEngine().getClass(), InternalEngine.class); assertEquals(replicaShard.getEngine().config().getTranslogFactory().getClass(), InternalTranslogFactory.class); @@ -4731,7 +4731,7 @@ public void testTranslogFactoryForReplicaShardWithoutRemoteStore() throws IOExce .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .build(); - final IndexShard primaryShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory()); + final IndexShard primaryShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory(null)); assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class); assertEquals(primaryShard.getEngine().config().getTranslogFactory().getClass(), InternalTranslogFactory.class); closeShards(primaryShard); @@ -4747,7 +4747,7 @@ public void testTranslogFactoryForRemoteTranslogBackedPrimaryShard() throws IOEx .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test") .build(); - final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory()); + final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory(null)); assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class); assertEquals(primaryShard.getEngine().config().getTranslogFactory().getClass(), RemoteBlobStoreInternalTranslogFactory.class); closeShards(primaryShard); @@ -4763,7 +4763,7 @@ public void testTranslogFactoryForRemoteTranslogBackedReplicaShard() throws IOEx .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test") .build(); - final IndexShard replicaShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory()); + final IndexShard replicaShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory(null)); assertEquals(replicaShard.getEngine().getClass(), InternalEngine.class); assertEquals(replicaShard.getEngine().config().getTranslogFactory().getClass(), InternalTranslogFactory.class); closeShards(replicaShard); diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index d76afca51e354..45b9bb0b0ee6f 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -40,7 +40,7 @@ public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchI .build(); public void testStartSequenceForReplicaRecovery() throws Exception { - try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory(null))) { shards.startPrimary(); final IndexShard primary = shards.getPrimary(); @@ -111,7 +111,7 @@ public IndexShard indexShard() { } public void testNoTranslogHistoryTransferred() throws Exception { - try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory(null))) { // Step1 - Start primary, index docs, flush, index more docs, check translog in primary as expected shards.startPrimary(); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 764d60dbd5b82..157fc9755a10b 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -103,14 +103,14 @@ public void testReplicationCheckpointNullForDocRep() throws IOException { * Test that latestReplicationCheckpoint returns ReplicationCheckpoint for segrep enabled indices */ public void testReplicationCheckpointNotNullForSegRep() throws IOException { - final IndexShard indexShard = newStartedShard(randomBoolean(), settings, new NRTReplicationEngineFactory()); + final IndexShard indexShard = newStartedShard(randomBoolean(), settings, new NRTReplicationEngineFactory(null)); final ReplicationCheckpoint replicationCheckpoint = indexShard.getLatestReplicationCheckpoint(); assertNotNull(replicationCheckpoint); closeShards(indexShard); } public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -170,7 +170,7 @@ public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException public void testSegmentReplication_Index_Update_Delete() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; - try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); @@ -219,7 +219,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception { } public void testIgnoreShardIdle() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -320,7 +320,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { } public void testReplicaReceivesGenIncrease() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -369,7 +369,7 @@ public void testPrimaryRelocation() throws Exception { final IndexShard primaryTarget = newShard( primarySource.routingEntry().getTargetRelocatingShard(), settings, - new NRTReplicationEngineFactory() + new NRTReplicationEngineFactory(null) ); updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); @@ -406,7 +406,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { final IndexShard primaryTarget = newShard( primarySource.routingEntry().getTargetRelocatingShard(), settings, - new NRTReplicationEngineFactory() + new NRTReplicationEngineFactory(null) ); updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); @@ -562,7 +562,7 @@ public void testReplicaReceivesLowerGeneration() throws Exception { // index docs on new primary and flush // replicate to all. // Expected result: State Gens: P[4], R-1 [4], R-2 [4] - try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica_1 = shards.getReplicas().get(0); @@ -593,7 +593,7 @@ public void testReplicaReceivesLowerGeneration() throws Exception { } public void testReplicaRestarts() throws Exception { - try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); IndexShard primary = shards.getPrimary(); // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. @@ -672,7 +672,7 @@ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlush .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) .build(); - try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) { + try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(null), createTempDir())) { shards.startAll(); IndexShard oldPrimary = shards.getPrimary(); final IndexShard nextPrimary = shards.getReplicas().get(0); @@ -740,7 +740,7 @@ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlush } public void testNRTReplicaPromotedAsPrimary() throws Exception { - try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); IndexShard oldPrimary = shards.getPrimary(); final IndexShard nextPrimary = shards.getReplicas().get(0); @@ -808,7 +808,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception { } public void testReplicaPromotedWhileReplicating() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final IndexShard oldPrimary = shards.getPrimary(); final IndexShard nextPrimary = shards.getReplicas().get(0); @@ -884,7 +884,7 @@ public void onFailure(Exception e) { } public void testReplicaClosesWhileReplicating_AfterGetCheckpoint() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -926,7 +926,7 @@ public void getSegmentFiles( } public void testReplicaClosesWhileReplicating_AfterGetSegmentFiles() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -968,7 +968,7 @@ public void getSegmentFiles( } public void testPrimaryCancelsExecution() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); From 172763776fd088a868e5d8c2a5fa515f3657a11b Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Thu, 25 May 2023 16:04:12 -0700 Subject: [PATCH 3/4] refactoring Signed-off-by: Poojita Raj --- .../SegmentReplicationSourceService.java | 33 +++++++ .../SegmentReplicationUpgradeListener.java | 73 -------------- .../SegmentReplicationUpgradeService.java | 34 ------- .../main/java/org/opensearch/node/Node.java | 3 - .../SegmentReplicationIndexShardTests.java | 94 ++++++++++++++++--- .../indices/recovery/RecoveryTests.java | 2 +- ...teStorePeerRecoverySourceHandlerTests.java | 2 +- .../OngoingSegmentReplicationsTests.java | 2 +- .../SegmentReplicationSourceHandlerTests.java | 2 +- .../SegmentReplicationTargetServiceTests.java | 2 +- .../SegmentReplicationTargetTests.java | 2 +- .../index/shard/IndexShardTestCase.java | 1 + 12 files changed, 119 insertions(+), 131 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java delete mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 0e62a4320e3f3..3ea45bcbf50d7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -11,11 +11,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.action.support.ChannelActionListener; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -37,7 +39,9 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -170,6 +174,35 @@ public void clusterChanged(ClusterChangedEvent event) { } } } + if (event.nodesChanged()) { + List indexShardList = new ArrayList<>(); + DiscoveryNodes nodes = event.state().nodes(); + if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { + for (IndexService indexService : indicesService) { + if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) { + for (IndexShard indexShard : indexService) { + try { + if (indexShard.routingEntry().primary() + && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { + indexShardList.add(indexShard); + } + } catch (AlreadyClosedException e) { + logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); + } + } + } + } + } + try { + if (indexShardList.isEmpty() == false) { + for (IndexShard indexShard : indexShardList) { + indexShard.resetEngine(); + } + } + } catch (Exception e) { + logger.error("Received unexpected exception: [{}]", e.getMessage()); + } + } } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java deleted file mode 100644 index fd0bfd52f5c81..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.AlreadyClosedException; -import org.opensearch.cluster.ClusterChangedEvent; -import org.opensearch.cluster.ClusterStateListener; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.index.IndexService; -import org.opensearch.index.shard.IndexShard; -import org.opensearch.indices.IndicesService; - -import java.util.ArrayList; -import java.util.List; - -/** - * SegmentReplicationUpgradeListener is used to upgrade the opensearch version used by all primaries of a cluster when - * segment replication is enabled and a rolling upgrade is completed (while in mixed cluster state, the primaries use lower codec - * version on their primaries and this needs to be reset once upgrade is complete). - */ -public class SegmentReplicationUpgradeListener implements ClusterStateListener { - - private static final Logger logger = LogManager.getLogger(SegmentReplicationUpgradeListener.class); - - private final IndicesService indicesService; - - public SegmentReplicationUpgradeListener(IndicesService indicesService) { - this.indicesService = indicesService; - } - - @Override - public void clusterChanged(ClusterChangedEvent event) { - if (event.nodesChanged()) { - List indexShardList = new ArrayList<>(); - DiscoveryNodes nodes = event.state().nodes(); - if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - try { - if (indexShard.indexSettings().isSegRepEnabled() - && indexShard.indexSettings().getNumberOfReplicas() > 0 - && indexShard.routingEntry().primary() - && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { - indexShardList.add(indexShard); - } - } catch (AlreadyClosedException e) { - logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); - } - } - } - } - try { - if (indexShardList.isEmpty() == false) { - for (IndexShard indexShard : indexShardList) { - indexShard.resetEngine(); - } - } - } catch (Exception e) { - logger.error("Received unexpected exception: [{}]", e.getMessage()); - } - } - - } - -} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java deleted file mode 100644 index 8b4dc783be146..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication; - -import org.opensearch.cluster.service.ClusterApplierService; -import org.opensearch.common.lease.Releasable; -import org.opensearch.indices.IndicesService; - -/** - * SegmentReplicationUpgradeService is used to manage SegmentReplicationUpgradeListener's lifecycle (creation/deletion). - */ -public class SegmentReplicationUpgradeService implements Releasable { - - private final ClusterApplierService clusterApplierService; - private final SegmentReplicationUpgradeListener clusterStateListener; - - public SegmentReplicationUpgradeService(IndicesService indicesService, ClusterApplierService clusterApplierService) { - SegmentReplicationUpgradeListener clusterStateListener = new SegmentReplicationUpgradeListener(indicesService); - this.clusterApplierService = clusterApplierService; - this.clusterStateListener = clusterStateListener; - this.clusterApplierService.addListener(this.clusterStateListener); - } - - @Override - public void close() { - this.clusterApplierService.removeListener(this.clusterStateListener); - } -} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 524b646813722..a25bac60f49b6 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -52,7 +52,6 @@ import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.extensions.ExtensionsManager; import org.opensearch.extensions.NoopExtensionsManager; -import org.opensearch.indices.replication.SegmentReplicationUpgradeService; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.monitor.fs.FsProbe; import org.opensearch.plugins.ExtensionAwarePlugin; @@ -1098,8 +1097,6 @@ protected Node( b.bind(SegmentReplicationSourceService.class) .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); } - b.bind(SegmentReplicationUpgradeService.class) - .toInstance(new SegmentReplicationUpgradeService(indicesService, clusterService.getClusterApplierService())); b.bind(HttpServerTransport.class).toInstance(httpServerTransport); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); b.bind(PersistentTasksService.class).toInstance(persistentTasksService); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 157fc9755a10b..a887082fce879 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.SegmentInfos; import org.junit.Assert; import org.opensearch.ExceptionsHelper; +import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; @@ -28,6 +29,7 @@ import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexSettings; +import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.InternalEngineFactory; @@ -103,18 +105,30 @@ public void testReplicationCheckpointNullForDocRep() throws IOException { * Test that latestReplicationCheckpoint returns ReplicationCheckpoint for segrep enabled indices */ public void testReplicationCheckpointNotNullForSegRep() throws IOException { - final IndexShard indexShard = newStartedShard(randomBoolean(), settings, new NRTReplicationEngineFactory(null)); + final IndexShard indexShard = newStartedShard(randomBoolean(), settings, new NRTReplicationEngineFactory(clusterService)); final ReplicationCheckpoint replicationCheckpoint = indexShard.getLatestReplicationCheckpoint(); assertNotNull(replicationCheckpoint); closeShards(indexShard); } public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); + assertEquals( + primary.getEngine().config().getCodecName(), + primary.getEngine().config().getBWCCodec(CodecService.DEFAULT_CODEC).getName() + ); + assertEquals(primary.getEngine().config().getClusterMinVersion(), Version.CURRENT); + + assertEquals( + replica.getEngine().config().getCodecName(), + replica.getEngine().config().getBWCCodec(CodecService.DEFAULT_CODEC).getName() + ); + assertEquals(replica.getEngine().config().getClusterMinVersion(), Version.CURRENT); + // assert before any indexing: // replica: Tuple, ReplicationCheckpoint> replicaTuple = replica.getLatestSegmentInfosAndCheckpoint(); @@ -160,6 +174,8 @@ private void assertReplicationCheckpoint(IndexShard shard, SegmentInfos segmentI assertNotNull(segmentInfos); assertEquals(checkpoint.getSegmentInfosVersion(), segmentInfos.getVersion()); assertEquals(checkpoint.getSegmentsGen(), segmentInfos.getGeneration()); + assertEquals(checkpoint.getCodec(), shard.getEngine().config().getBWCCodec(CodecService.DEFAULT_CODEC).getName()); + assertEquals(checkpoint.getMinVersion(), Version.CURRENT); } public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException { @@ -170,7 +186,7 @@ public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException public void testSegmentReplication_Index_Update_Delete() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; - try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); @@ -219,7 +235,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception { } public void testIgnoreShardIdle() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -309,8 +325,8 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { primaryShard.getReplicationTracker().completeRelocationHandoff(); // Assert that primary shard is no longer in Primary Mode and shard routing is still Primary - assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode()); - assertEquals(true, primaryShard.routingEntry().primary()); + assertFalse(primaryShard.getReplicationTracker().isPrimaryMode()); + assertTrue(primaryShard.routingEntry().primary()); spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard); @@ -319,8 +335,48 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { closeShards(primaryShard); } + public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + IndexShard replica = shards.getReplicas().get(0); + SegmentReplicationTargetService sut; + sut = prepareForReplication(primary, replica); + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(replica); + ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(replica.shardId(), 0L, 0L, 0L, replica.getDefaultCodecName()); + spy.onNewCheckpoint(checkpoint, spyShard); + // passed the cluster version check and moved on to shouldProcessCheckpoint + verify(spyShard, times(1)).shouldProcessCheckpoint(checkpoint); + } + } + + public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + IndexShard replica = shards.getReplicas().get(0); + SegmentReplicationTargetService sut; + sut = prepareForReplication(primary, replica); + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(replica); + ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + replica.shardId(), + 0L, + 0L, + 0L, + 0L, + replica.getDefaultCodecName(), + Version.V_2_7_0 + ); + spy.onNewCheckpoint(checkpoint, spyShard); + // passed the cluster version check and moved on to shouldProcessCheckpoint + verify(spyShard, times(1)).shouldProcessCheckpoint(checkpoint); + } + } + public void testReplicaReceivesGenIncrease() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -562,7 +618,7 @@ public void testReplicaReceivesLowerGeneration() throws Exception { // index docs on new primary and flush // replicate to all. // Expected result: State Gens: P[4], R-1 [4], R-2 [4] - try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); final IndexShard replica_1 = shards.getReplicas().get(0); @@ -593,7 +649,7 @@ public void testReplicaReceivesLowerGeneration() throws Exception { } public void testReplicaRestarts() throws Exception { - try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); IndexShard primary = shards.getPrimary(); // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. @@ -672,7 +728,15 @@ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlush .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) .build(); - try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(null), createTempDir())) { + try ( + ReplicationGroup shards = createGroup( + 1, + settings, + indexMapping, + new NRTReplicationEngineFactory(clusterService), + createTempDir() + ) + ) { shards.startAll(); IndexShard oldPrimary = shards.getPrimary(); final IndexShard nextPrimary = shards.getReplicas().get(0); @@ -740,7 +804,7 @@ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlush } public void testNRTReplicaPromotedAsPrimary() throws Exception { - try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); IndexShard oldPrimary = shards.getPrimary(); final IndexShard nextPrimary = shards.getReplicas().get(0); @@ -808,7 +872,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception { } public void testReplicaPromotedWhileReplicating() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard oldPrimary = shards.getPrimary(); final IndexShard nextPrimary = shards.getReplicas().get(0); @@ -884,7 +948,7 @@ public void onFailure(Exception e) { } public void testReplicaClosesWhileReplicating_AfterGetCheckpoint() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -926,7 +990,7 @@ public void getSegmentFiles( } public void testReplicaClosesWhileReplicating_AfterGetSegmentFiles() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); @@ -968,7 +1032,7 @@ public void getSegmentFiles( } public void testPrimaryCancelsExecution() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index eae070b98c4a1..5f1217ef4cffb 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -108,7 +108,7 @@ public void testTranslogHistoryTransferred() throws Exception { public void testWithSegmentReplication_ReplicaUsesPrimaryTranslogUUID() throws Exception { Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); - try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory(null))) { shards.startAll(); final String expectedUUID = getTranslog(shards.getPrimary()).getTranslogUUID(); assertTrue( diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index ff251f42ab21b..719241452d65a 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -27,7 +27,7 @@ public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLe .build(); public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { - try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory(null))) { // Step1 - Start primary, index docs and flush shards.startPrimary(); diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index 677352cdd5120..42e3e2c1f6b93 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -67,7 +67,7 @@ public class OngoingSegmentReplicationsTests extends IndexShardTestCase { public void setUp() throws Exception { super.setUp(); primary = newStartedShard(true, settings); - replica = newShard(false, settings, new NRTReplicationEngineFactory()); + replica = newShard(false, settings, new NRTReplicationEngineFactory(null)); recoverReplica(replica, primary, true); replicaDiscoveryNode = replica.recoveryState().getTargetNode(); primaryDiscoveryNode = replica.recoveryState().getSourceNode(); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index 607f9dd91e35e..c6dc8b797a6cb 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -52,7 +52,7 @@ public void setUp() throws Exception { super.setUp(); final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT").put(Settings.EMPTY).build(); primary = newStartedShard(true, settings); - replica = newShard(false, settings, new NRTReplicationEngineFactory()); + replica = newShard(false, settings, new NRTReplicationEngineFactory(null)); recoverReplica(replica, primary, true); replicaDiscoveryNode = replica.recoveryState().getTargetNode(); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 1d1777758972c..911b7d6e3844f 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -88,7 +88,7 @@ public void setUp() throws Exception { .build(); primaryShard = newStartedShard(true, settings); String primaryCodec = primaryShard.getLatestReplicationCheckpoint().getCodec(); - replicaShard = newShard(false, settings, new NRTReplicationEngineFactory()); + replicaShard = newShard(false, settings, new NRTReplicationEngineFactory(null)); recoverReplica(replicaShard, primaryShard, true, getReplicationFunc(replicaShard)); checkpoint = new ReplicationCheckpoint( replicaShard.shardId(), diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 0e711af1afa62..cc51656cb2be2 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -93,7 +93,7 @@ public void setUp() throws Exception { .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); + indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory(null)); spyIndexShard = spy(indexShard); Mockito.doNothing().when(spyIndexShard).finalizeReplication(any(SegmentInfos.class)); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index b1dd4fb1dcc1e..6c05687b3ce3f 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1354,6 +1354,7 @@ public void getSegmentFiles( }; when(sourceFactory.get(any())).thenReturn(replicationSource); when(indicesService.getShardOrNull(any())).thenReturn(target); + when(indicesService.clusterService()).thenReturn(clusterService); return targetService; } From 8745d50cee32e03f0a4ff401c55a57938b0aab88 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Mon, 29 May 2023 23:39:40 -0700 Subject: [PATCH 4/4] add unit tests Signed-off-by: Poojita Raj --- .../cluster/ClusterChangedEvent.java | 12 ++++++ .../opensearch/index/codec/CodecService.java | 13 ++++--- .../SegmentReplicationSourceService.java | 22 +++++------ .../SegmentReplicationTargetService.java | 2 +- .../SegmentReplicationIndexShardTests.java | 39 +++++++++++++++++-- 5 files changed, 67 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java b/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java index 28085dd6e3860..e25b6d4637e09 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterChangedEvent.java @@ -207,6 +207,18 @@ public boolean blocksChanged() { return state.blocks() != previousState.blocks(); } + /** + * Returns true if a version upgrade has taken place in the cluster + */ + public boolean clusterUpgraded() { + // previous state was mixed version cluster and current state is not - which indicates upgrade is completed + if ((previousState.nodes().getMinNodeVersion() != previousState.nodes().getMaxNodeVersion()) + && (state.nodes().getMinNodeVersion() == state.nodes().getMaxNodeVersion())) { + return true; + } + return false; + } + /** * Returns true iff the local node is the mater node of the cluster. */ diff --git a/server/src/main/java/org/opensearch/index/codec/CodecService.java b/server/src/main/java/org/opensearch/index/codec/CodecService.java index 83d01f7558110..e4f176c33414a 100644 --- a/server/src/main/java/org/opensearch/index/codec/CodecService.java +++ b/server/src/main/java/org/opensearch/index/codec/CodecService.java @@ -41,6 +41,7 @@ import org.opensearch.common.collect.MapBuilder; import org.opensearch.index.mapper.MapperService; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -60,7 +61,8 @@ public class CodecService { public static final String BEST_COMPRESSION_CODEC = "best_compression"; /** the raw unfiltered lucene default. useful for testing */ public static final String LUCENE_DEFAULT_CODEC = "lucene_default"; - public static Map opensearchVersionToLuceneCodec = new HashMap<>(); + static Map versionStringMap = new HashMap<>(); + public static Map opensearchVersionToLuceneCodec; public CodecService(@Nullable MapperService mapperService, Logger logger) { loadMap(); @@ -80,10 +82,11 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) { } public void loadMap() { - opensearchVersionToLuceneCodec.put(Version.fromString("3.0.0"), "Lucene95"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.8.0"), "Lucene95"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.7.1"), "Lucene95"); - opensearchVersionToLuceneCodec.put(Version.fromString("2.7.0"), "Lucene95"); + versionStringMap.put(Version.fromString("3.0.0"), "Lucene95"); + versionStringMap.put(Version.fromString("2.8.0"), "Lucene95"); + versionStringMap.put(Version.fromString("2.7.1"), "Lucene95"); + versionStringMap.put(Version.fromString("2.7.0"), "Lucene95"); + opensearchVersionToLuceneCodec = Collections.unmodifiableMap(new HashMap<>(versionStringMap)); } public Codec codec(String name) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 3ea45bcbf50d7..0b1c09d2bdfc1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -174,21 +174,19 @@ public void clusterChanged(ClusterChangedEvent event) { } } } - if (event.nodesChanged()) { + if (event.clusterUpgraded()) { List indexShardList = new ArrayList<>(); DiscoveryNodes nodes = event.state().nodes(); - if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { - for (IndexService indexService : indicesService) { - if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) { - for (IndexShard indexShard : indexService) { - try { - if (indexShard.routingEntry().primary() - && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { - indexShardList.add(indexShard); - } - } catch (AlreadyClosedException e) { - logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); + for (IndexService indexService : indicesService) { + if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) { + for (IndexShard indexShard : indexService) { + try { + if (indexShard.routingEntry().primary() + && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { + indexShardList.add(indexShard); } + } catch (AlreadyClosedException e) { + logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 110d1c6e4436a..1bb13bd5fec4d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe } } final Thread thread = Thread.currentThread(); - Version localNodeVersion = indicesService.clusterService().state().nodes().getLocalNode().getVersion(); + Version localNodeVersion = Version.CURRENT; // if replica's OS version is not on or after primary version, then can ignore checkpoint if (localNodeVersion.onOrAfter(receivedCheckpoint.getMinVersion()) == false) { logger.trace( diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index a887082fce879..5e4b45568e01d 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -335,13 +335,16 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { closeShards(primaryShard); } + /** + * Cluster version check in onNewCheckpoint method should pass when replica version is the same as the received checkpoint version. + */ public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); IndexShard replica = shards.getReplicas().get(0); SegmentReplicationTargetService sut; - sut = prepareForReplication(primary, replica); + sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class)); SegmentReplicationTargetService spy = spy(sut); IndexShard spyShard = spy(replica); ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(replica.shardId(), 0L, 0L, 0L, replica.getDefaultCodecName()); @@ -351,13 +354,16 @@ public void testClusterVersionCheckOnNewCheckpointSameVersion() throws Exception } } + /** + * Cluster version check in onNewCheckpoint method should pass when replica version is ahead of the received checkpoint version. + */ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll(); final IndexShard primary = shards.getPrimary(); IndexShard replica = shards.getReplicas().get(0); SegmentReplicationTargetService sut; - sut = prepareForReplication(primary, replica); + sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class)); SegmentReplicationTargetService spy = spy(sut); IndexShard spyShard = spy(replica); ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( @@ -367,7 +373,7 @@ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exceptio 0L, 0L, replica.getDefaultCodecName(), - Version.V_2_7_0 + Version.fromId(Version.CURRENT.id - 1) ); spy.onNewCheckpoint(checkpoint, spyShard); // passed the cluster version check and moved on to shouldProcessCheckpoint @@ -375,6 +381,33 @@ public void testClusterVersionCheckOnNewCheckpointAheadVersion() throws Exceptio } } + /** + * Cluster version check in onNewCheckpoint method should fail when replica version is behind the received checkpoint version. + */ + public void testClusterVersionCheckFailOnNewCheckpointBehindVersion() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + IndexShard replica = shards.getReplicas().get(0); + SegmentReplicationTargetService sut; + sut = prepareForReplication(primary, replica, mock(TransportService.class), mock(IndicesService.class)); + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(replica); + ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + replica.shardId(), + 0L, + 0L, + 0L, + 0L, + replica.getDefaultCodecName(), + Version.fromId(Version.CURRENT.id + 1) + ); + spy.onNewCheckpoint(checkpoint, spyShard); + // did not pass the version check and returned before shouldProcessCheckpoint method + verify(spyShard, times(0)).shouldProcessCheckpoint(checkpoint); + } + } + public void testReplicaReceivesGenIncrease() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(clusterService))) { shards.startAll();