diff --git a/server/src/main/java/org/opensearch/index/codec/CodecService.java b/server/src/main/java/org/opensearch/index/codec/CodecService.java index e4899c02d37e8..203c8a3c87111 100644 --- a/server/src/main/java/org/opensearch/index/codec/CodecService.java +++ b/server/src/main/java/org/opensearch/index/codec/CodecService.java @@ -36,10 +36,12 @@ import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.lucene95.Lucene95Codec; import org.apache.lucene.codecs.lucene95.Lucene95Codec.Mode; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.collect.MapBuilder; import org.opensearch.index.mapper.MapperService; +import java.util.HashMap; import java.util.Map; /** @@ -58,8 +60,10 @@ public class CodecService { public static final String BEST_COMPRESSION_CODEC = "best_compression"; /** the raw unfiltered lucene default. useful for testing */ public static final String LUCENE_DEFAULT_CODEC = "lucene_default"; + public static Map opensearchVersionToLuceneCodec = new HashMap<>(); public CodecService(@Nullable MapperService mapperService, Logger logger) { + loadMap(); final MapBuilder codecs = MapBuilder.newMapBuilder(); if (mapperService == null) { codecs.put(DEFAULT_CODEC, new Lucene95Codec()); @@ -75,6 +79,26 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) { this.codecs = codecs.immutableMap(); } + public void loadMap() { + opensearchVersionToLuceneCodec.put(Version.fromString("3.0.0"), "Lucene95"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.8.0"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.7.1"), "Lucene95"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.7.0"), "Lucene95"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.6.0"), "Lucene95"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.4.2"), "Lucene95"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.5.1"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.5.0"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.4.1"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.4.1"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.4.0"), "Lucene94"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.3.0"), "Lucene93"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.2.1"), "Lucene93"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.2.0"), "Lucene93"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.1.0"), "Lucene92"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.0.1"), "Lucene91"); + opensearchVersionToLuceneCodec.put(Version.fromString("2.0.0"), "Lucene91"); + } + public Codec codec(String name) { Codec codec = codecs.get(name); if (codec == null) { diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index 338a541af387a..8249f100f5b66 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -40,6 +40,7 @@ import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.Sort; import org.apache.lucene.search.similarities.Similarity; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -81,7 +82,7 @@ public final class EngineConfig { private final TranslogDeletionPolicyFactory translogDeletionPolicyFactory; private volatile boolean enableGcDeletes = true; private final TimeValue flushMergesAfter; - private final String codecName; + private String codecName; private final ThreadPool threadPool; private final Engine.Warmer warmer; private final Store store; @@ -105,6 +106,7 @@ public final class EngineConfig { private final boolean isReadOnlyReplica; private final BooleanSupplier primaryModeSupplier; private final Comparator leafSorter; + private Version clusterMinVersion; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -176,7 +178,8 @@ private EngineConfig(Builder builder) { this.similarity = builder.similarity; this.codecService = builder.codecService; this.eventListener = builder.eventListener; - codecName = builder.indexSettings.getValue(INDEX_CODEC_SETTING); + this.codecName = builder.indexSettings.getValue(INDEX_CODEC_SETTING); + this.clusterMinVersion = Version.CURRENT; // We need to make the indexing buffer for this shard at least as large // as the amount of memory that is available for all engines on the // local node so that decisions to flush segments to disk are made by @@ -251,6 +254,44 @@ public Codec getCodec() { return codecService.codec(codecName); } + /** + * Returns the codec name of the lucene codec used for writing new segments + */ + public String getCodecName() { + return getCodec().getName(); + } + + /** + * Sets the codec name of the lucene codec used for writing new segments + */ + public void setCodecName(String name) { + this.codecName = name; + } + + /** + * Returns the minimum opensearch version among all nodes of a cluster when upgrade is in progress and + * segment replication is enabled; null when upgrade not in progress. + */ + public Version getClusterMinVersion() { + return clusterMinVersion; + } + + /** + * Sets the minimum opensearch version among all nodes of a cluster when upgrade is in progress and + * segment replication is enabled. + */ + public void setClusterMinVersion(Version clusterMinVersion) { + this.clusterMinVersion = clusterMinVersion; + } + + /** + * Returns the BWC Codec{@link Codec} to be used in the engine during a rolling upgrade when + * cluster is in a mixed version state and segment replication is enabled {@link org.apache.lucene.index.IndexWriter} + */ + public Codec getBWCCodec(String codecName) { + return codecService.codec(codecName); + } + /** * Returns a thread-pool mainly used to get estimated time stamps from * {@link org.opensearch.threadpool.ThreadPool#relativeTimeInMillis()} and to schedule diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 22eeca64328e0..f971e3b7e8a6e 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -33,6 +33,7 @@ package org.opensearch.index.engine; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; @@ -89,6 +90,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.IndexSettings; import org.opensearch.index.VersionType; +import org.opensearch.index.codec.CodecService; import org.opensearch.index.fieldvisitor.IdOnlyFieldVisitor; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.ParseContext; @@ -2317,7 +2319,9 @@ private IndexWriterConfig getIndexWriterConfig() { iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy)); iwc.setSimilarity(engineConfig.getSimilarity()); iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac()); - iwc.setCodec(engineConfig.getCodec()); + Codec bwcCodec = engineConfig.getBWCCodec(CodecService.opensearchVersionToLuceneCodec.get(engineConfig.getClusterMinVersion())); + iwc.setCodec(bwcCodec); + engineConfig.setCodecName(bwcCodec.getName()); iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh if (config().getIndexSort() != null) { iwc.setIndexSort(config().getIndexSort()); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java index 45fe3086ac3f6..e39a44e7d300b 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java @@ -8,6 +8,11 @@ package org.opensearch.index.engine; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; + /** * Engine Factory implementation used with Segment Replication that wires up replica shards with an ${@link NRTReplicationEngine} * and primary with an ${@link InternalEngine} @@ -15,11 +20,26 @@ * @opensearch.internal */ public class NRTReplicationEngineFactory implements EngineFactory { + + private final ClusterService clusterService; + + public NRTReplicationEngineFactory(ClusterService clusterService) { + this.clusterService = clusterService; + } + + public NRTReplicationEngineFactory() { + this.clusterService = null; + } + @Override public Engine newReadWriteEngine(EngineConfig config) { if (config.isReadOnlyReplica()) { return new NRTReplicationEngine(config); } + if (clusterService != null) { + DiscoveryNodes nodes = this.clusterService.state().nodes(); + config.setClusterMinVersion(nodes.getMinNodeVersion()); + } return new InternalEngine(config); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index b417133e4a89d..707cb3f1f6c34 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -56,6 +56,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.ThreadInterruptedException; +import org.opensearch.Version; import org.opensearch.cluster.metadata.DataStream; import org.opensearch.core.Assertions; import org.opensearch.ExceptionsHelper; @@ -1553,7 +1554,8 @@ public Tuple, ReplicationCheckpoint> getLatestSegme shardRouting.primary() ? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum() : store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(), - getEngine().config().getCodec().getName() + getEngine().config().getCodecName(), + getEngine().config().getClusterMinVersion() ) ); } catch (IOException e) { @@ -1606,9 +1608,10 @@ public boolean isSegmentReplicationAllowed() { * Checks if checkpoint should be processed * * @param requestCheckpoint received checkpoint that is checked for processing + * @param localNodeVersion opensearch version of local replica node * @return true if checkpoint should be processed */ - public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, Version localNodeVersion) { if (isSegmentReplicationAllowed() == false) { return false; } @@ -1629,7 +1632,8 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp ); return false; } - if (localCheckpoint.getCodec().equals(requestCheckpoint.getCodec()) == false) { + // if replica's OS version is on or after primary version, then can process checkpoint + if (localNodeVersion.onOrAfter(requestCheckpoint.getMinVersion()) == false) { logger.trace( () -> new ParameterizedMessage("Shard does not support the received lucene codec version {}", requestCheckpoint.getCodec()) ); @@ -3283,7 +3287,7 @@ private void doCheckIndex() throws IOException { recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS))); } - Engine getEngine() { + public Engine getEngine() { Engine engine = getEngineOrNull(); if (engine == null) { throw new AlreadyClosedException("engine is closed"); @@ -4350,7 +4354,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { /** * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. */ - void resetEngineToGlobalCheckpoint() throws IOException { + public void resetEngineToGlobalCheckpoint() throws IOException { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "resetting engine without blocking operations; active operations are [" + getActiveOperations() diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index cdad2c45638e5..ce392d20613f7 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -304,7 +304,7 @@ public class IndicesService extends AbstractLifecycleComponent private final ScriptService scriptService; private final ClusterService clusterService; private final Client client; - private volatile Map indices = emptyMap(); + public volatile Map indices = emptyMap(); private final Map> pendingDeletes = new HashMap<>(); private final AtomicInteger numUncompletedDeletes = new AtomicInteger(); private final OldShardsStats oldShardsStats = new OldShardsStats(); @@ -478,6 +478,10 @@ public ClusterService clusterService() { return clusterService; } + public Map indices() { + return indices; + } + @Override protected void doStop() { ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS); @@ -856,7 +860,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) { return config -> new ReadOnlyEngine(config, new SeqNoStats(0, 0, 0), new TranslogStats(), true, Function.identity(), false); } if (idxSettings.isSegRepEnabled()) { - return new NRTReplicationEngineFactory(); + return new NRTReplicationEngineFactory(clusterService); } return new InternalEngineFactory(); } else if (engineFactories.size() == 1) { diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 3ab0a7539fb06..6f04c6cf6f665 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -14,7 +14,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; @@ -148,12 +147,6 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener { if (segrepHandler != null) { logger.warn("Override handler for allocation id {}", request.getTargetAllocationId()); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 6c1547fbee82b..2d554f1cfa6df 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.BaseExceptionsHelper; +import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.Nullable; @@ -227,7 +228,8 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe } } final Thread thread = Thread.currentThread(); - if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { + Version localNodeVersion = indicesService.clusterService().state().nodes().getLocalNode().getVersion(); + if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint, localNodeVersion)) { startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java new file mode 100644 index 0000000000000..dbbb0c08b6aa4 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; + +import java.util.ArrayList; +import java.util.List; + +public class SegmentReplicationUpgradeListener implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(SegmentReplicationUpgradeListener.class); + + private final IndicesService indicesService; + + public SegmentReplicationUpgradeListener(IndicesService indicesService) { + this.indicesService = indicesService; + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + List indexShardList = new ArrayList<>(); + DiscoveryNodes nodes = event.state().nodes(); + if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) { + for (IndexService indexService : indicesService.indices().values()) { + for (IndexShard indexShard : indexService) { + try { + if ((indexShard.getEngine().config().isReadOnlyReplica() == false) && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) { + indexShardList.add(indexShard); + } + } catch (AlreadyClosedException e) { + logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId()); + } + } + } + } + try { + if (indexShardList.isEmpty() == false) { + for (IndexShard is : indexShardList) { + is.resetEngineToGlobalCheckpoint(); + } + } + } catch (Exception e) { + logger.error("Received unexpected exception: [{}]", e.getMessage()); + } + + } + +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java new file mode 100644 index 0000000000000..f62a33e8835b1 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeService.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.cluster.service.ClusterApplierService; +import org.opensearch.common.lease.Releasable; +import org.opensearch.indices.IndicesService; + +public class SegmentReplicationUpgradeService implements Releasable { + + private final ClusterApplierService clusterApplierService; + private final SegmentReplicationUpgradeListener clusterStateListener; + + public SegmentReplicationUpgradeService(IndicesService indicesService, ClusterApplierService clusterApplierService) { + SegmentReplicationUpgradeListener clusterStateListener = new SegmentReplicationUpgradeListener(indicesService); + this.clusterApplierService = clusterApplierService; + this.clusterStateListener = clusterStateListener; + this.clusterApplierService.addListener(this.clusterStateListener); + } + + @Override + public void close() { + this.clusterApplierService.removeListener(this.clusterStateListener); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 32521fb0cd944..49ea0e583d669 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -32,6 +32,7 @@ public class ReplicationCheckpoint implements Writeable, Comparable