Commit 1aec373: rolling upgrade
Signed-off-by: Poojita Raj <[email protected]>
Poojita-Raj committed May 23, 2023
1 parent b4b3724 commit 1aec373
Showing 12 changed files with 241 additions and 23 deletions.
24 changes: 24 additions & 0 deletions server/src/main/java/org/opensearch/index/codec/CodecService.java
@@ -36,10 +36,12 @@
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.apache.lucene.codecs.lucene95.Lucene95Codec.Mode;
import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.mapper.MapperService;

import java.util.HashMap;
import java.util.Map;

/**
@@ -58,8 +60,10 @@ public class CodecService {
public static final String BEST_COMPRESSION_CODEC = "best_compression";
/** the raw unfiltered lucene default. useful for testing */
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
public static final Map<Version, String> opensearchVersionToLuceneCodec = new HashMap<>();

public CodecService(@Nullable MapperService mapperService, Logger logger) {
loadMap();
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
if (mapperService == null) {
codecs.put(DEFAULT_CODEC, new Lucene95Codec());
@@ -75,6 +79,26 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) {
this.codecs = codecs.immutableMap();
}

public void loadMap() {
opensearchVersionToLuceneCodec.put(Version.fromString("3.0.0"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.8.0"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.7.1"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.7.0"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.6.0"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.5.1"), "Lucene94");
opensearchVersionToLuceneCodec.put(Version.fromString("2.5.0"), "Lucene94");
opensearchVersionToLuceneCodec.put(Version.fromString("2.4.2"), "Lucene94");
opensearchVersionToLuceneCodec.put(Version.fromString("2.4.1"), "Lucene94");
opensearchVersionToLuceneCodec.put(Version.fromString("2.4.0"), "Lucene94");
opensearchVersionToLuceneCodec.put(Version.fromString("2.3.0"), "Lucene93");
opensearchVersionToLuceneCodec.put(Version.fromString("2.2.1"), "Lucene93");
opensearchVersionToLuceneCodec.put(Version.fromString("2.2.0"), "Lucene93");
opensearchVersionToLuceneCodec.put(Version.fromString("2.1.0"), "Lucene92");
opensearchVersionToLuceneCodec.put(Version.fromString("2.0.1"), "Lucene91");
opensearchVersionToLuceneCodec.put(Version.fromString("2.0.0"), "Lucene91");
}

public Codec codec(String name) {
Codec codec = codecs.get(name);
if (codec == null) {
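Once populated, the map translates the oldest node version in a mixed-version cluster into a codec name every node can read. A minimal lookup sketch; the helper class and its getOrDefault fallback are illustrative assumptions, not part of this commit:

import org.opensearch.Version;
import org.opensearch.index.codec.CodecService;

// Hypothetical helper: resolve the codec name for the oldest node in the
// cluster, falling back to the default codec when a version has no entry
// (patch releases are easy to miss in a hand-maintained map).
final class BWCCodecLookup {
    private BWCCodecLookup() {}

    static String codecNameFor(Version minNodeVersion) {
        return CodecService.opensearchVersionToLuceneCodec.getOrDefault(minNodeVersion, CodecService.DEFAULT_CODEC);
    }
}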
45 changes: 43 additions & 2 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
@@ -40,6 +40,7 @@
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.similarities.Similarity;
import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
@@ -81,7 +82,7 @@ public final class EngineConfig {
private final TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
private volatile boolean enableGcDeletes = true;
private final TimeValue flushMergesAfter;
private final String codecName;
private String codecName;
private final ThreadPool threadPool;
private final Engine.Warmer warmer;
private final Store store;
@@ -105,6 +106,7 @@ public final class EngineConfig {
private final boolean isReadOnlyReplica;
private final BooleanSupplier primaryModeSupplier;
private final Comparator<LeafReader> leafSorter;
private Version clusterMinVersion;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -176,7 +178,8 @@ private EngineConfig(Builder builder) {
this.similarity = builder.similarity;
this.codecService = builder.codecService;
this.eventListener = builder.eventListener;
codecName = builder.indexSettings.getValue(INDEX_CODEC_SETTING);
this.codecName = builder.indexSettings.getValue(INDEX_CODEC_SETTING);
this.clusterMinVersion = Version.CURRENT;
// We need to make the indexing buffer for this shard at least as large
// as the amount of memory that is available for all engines on the
// local node so that decisions to flush segments to disk are made by
@@ -251,6 +254,44 @@ public Codec getCodec() {
return codecService.codec(codecName);
}

/**
* Returns the name of the Lucene codec used for writing new segments
*/
public String getCodecName() {
return getCodec().getName();
}

/**
* Sets the name of the Lucene codec used for writing new segments
*/
public void setCodecName(String name) {
this.codecName = name;
}

/**
* Returns the minimum OpenSearch version among all nodes of the cluster when an upgrade is in progress and
* segment replication is enabled; defaults to {@link Version#CURRENT} when no upgrade is in progress.
*/
public Version getClusterMinVersion() {
return clusterMinVersion;
}

/**
* Sets the minimum OpenSearch version among all nodes of the cluster when an upgrade is in progress and
* segment replication is enabled.
*/
public void setClusterMinVersion(Version clusterMinVersion) {
this.clusterMinVersion = clusterMinVersion;
}

/**
* Returns the backwards-compatible (BWC) {@link Codec} to be used by the engine's
* {@link org.apache.lucene.index.IndexWriter} during a rolling upgrade, when the cluster
* is in a mixed-version state and segment replication is enabled.
*/
public Codec getBWCCodec(String codecName) {
return codecService.codec(codecName);
}

/**
* Returns a thread-pool mainly used to get estimated time stamps from
* {@link org.opensearch.threadpool.ThreadPool#relativeTimeInMillis()} and to schedule
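Taken together, the new accessors support this sequence during a rolling upgrade: read the minimum node version from cluster state, stash it on the EngineConfig, and resolve the matching codec when the IndexWriter is built. A minimal sketch of that call order, assuming an EngineConfig config and a ClusterService clusterService in scope (the real wiring is in NRTReplicationEngineFactory and InternalEngine below):

// During a rolling upgrade, write segments with the codec of the oldest node
// so that older replicas can still read them.
Version minVersion = clusterService.state().nodes().getMinNodeVersion();
config.setClusterMinVersion(minVersion);
String bwcName = CodecService.opensearchVersionToLuceneCodec.get(minVersion);
Codec bwcCodec = config.getBWCCodec(bwcName); // resolved via CodecService.codec(name)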
server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -33,6 +33,7 @@
package org.opensearch.index.engine;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
@@ -89,6 +90,7 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.fieldvisitor.IdOnlyFieldVisitor;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.ParseContext;
@@ -2317,7 +2319,9 @@ private IndexWriterConfig getIndexWriterConfig() {
iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
iwc.setCodec(engineConfig.getCodec());
Codec bwcCodec = engineConfig.getBWCCodec(CodecService.opensearchVersionToLuceneCodec.get(engineConfig.getClusterMinVersion()));
iwc.setCodec(bwcCodec);
engineConfig.setCodecName(bwcCodec.getName());
iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh
if (config().getIndexSort() != null) {
iwc.setIndexSort(config().getIndexSort());
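One caveat: opensearchVersionToLuceneCodec.get(...) returns null for any version missing from the map, and CodecService would then reject the null codec name. A defensive variant of the selection above might fall back to the engine's configured codec; a sketch, not what this commit does:

// Defensive variant of the codec selection in getIndexWriterConfig(): fall
// back to the configured codec when the cluster minimum version is unmapped.
String bwcName = CodecService.opensearchVersionToLuceneCodec.get(engineConfig.getClusterMinVersion());
Codec bwcCodec = bwcName != null ? engineConfig.getBWCCodec(bwcName) : engineConfig.getCodec();
iwc.setCodec(bwcCodec);
engineConfig.setCodecName(bwcCodec.getName());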
server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java
@@ -8,18 +8,38 @@

package org.opensearch.index.engine;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;

/**
* Engine Factory implementation used with Segment Replication that wires up replica shards with an {@link NRTReplicationEngine}
* and primary shards with an {@link InternalEngine}.
*
* @opensearch.internal
*/
public class NRTReplicationEngineFactory implements EngineFactory {

private final ClusterService clusterService;

public NRTReplicationEngineFactory(ClusterService clusterService) {
this.clusterService = clusterService;
}

public NRTReplicationEngineFactory() {
this.clusterService = null;
}

@Override
public Engine newReadWriteEngine(EngineConfig config) {
if (config.isReadOnlyReplica()) {
return new NRTReplicationEngine(config);
}
if (clusterService != null) {
DiscoveryNodes nodes = this.clusterService.state().nodes();
config.setClusterMinVersion(nodes.getMinNodeVersion());
}
return new InternalEngine(config);
}
}
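Usage sketch, assuming a ClusterService clusterService and an EngineConfig config in scope: with cluster state available, the factory stamps the cluster's minimum node version onto the config before building the primary's InternalEngine, while the no-arg constructor preserves the old behavior. The IndicesService change further below wires this up for real.

// Primaries get clusterMinVersion set before the InternalEngine is
// constructed; read-only replicas still get an NRTReplicationEngine.
EngineFactory factory = new NRTReplicationEngineFactory(clusterService);
Engine engine = factory.newReadWriteEngine(config);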
14 changes: 9 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -56,6 +56,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.core.Assertions;
import org.opensearch.ExceptionsHelper;
@@ -1553,7 +1554,8 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
shardRouting.primary()
? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum()
: store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(),
getEngine().config().getCodec().getName()
getEngine().config().getCodecName(),
getEngine().config().getClusterMinVersion()
)
);
} catch (IOException e) {
@@ -1606,9 +1608,10 @@ public boolean isSegmentReplicationAllowed() {
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @param localNodeVersion the OpenSearch version of the local replica node
* @return true if checkpoint should be processed
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, Version localNodeVersion) {
if (isSegmentReplicationAllowed() == false) {
return false;
}
@@ -1629,7 +1632,8 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint, Version localNodeVersion) {
);
return false;
}
if (localCheckpoint.getCodec().equals(requestCheckpoint.getCodec()) == false) {
// The replica can process the checkpoint only if its own OpenSearch version is on or after the checkpoint's minimum version
if (localNodeVersion.onOrAfter(requestCheckpoint.getMinVersion()) == false) {
logger.trace(
() -> new ParameterizedMessage("Shard on version {} cannot process checkpoint with minimum version {}", localNodeVersion, requestCheckpoint.getMinVersion())
);
@@ -3283,7 +3287,7 @@ private void doCheckIndex() throws IOException {
recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS)));
}

Engine getEngine() {
public Engine getEngine() {
Engine engine = getEngineOrNull();
if (engine == null) {
throw new AlreadyClosedException("engine is closed");
@@ -4350,7 +4354,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
/**
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
*/
void resetEngineToGlobalCheckpoint() throws IOException {
public void resetEngineToGlobalCheckpoint() throws IOException {
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "resetting engine without blocking operations; active operations are ["
+ getActiveOperations()
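The codec-string comparison is replaced by a version gate: a replica copies segments only when its own version is on or after the checkpoint's minimum version. A self-contained illustration, with version numbers chosen for the example:

import org.opensearch.Version;

// A replica still on 2.7.0 receives a checkpoint whose minimum version is
// 2.8.0: onOrAfter(...) is false, so shouldProcessCheckpoint(...) returns
// false and the replica skips the copy until it has been upgraded.
public class VersionGateExample {
    public static void main(String[] args) {
        Version localNodeVersion = Version.fromString("2.7.0");
        Version checkpointMinVersion = Version.fromString("2.8.0");
        System.out.println(localNodeVersion.onOrAfter(checkpointMinVersion)); // false
    }
}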
server/src/main/java/org/opensearch/indices/IndicesService.java
@@ -304,7 +304,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final ScriptService scriptService;
private final ClusterService clusterService;
private final Client client;
private volatile Map<String, IndexService> indices = emptyMap();
public volatile Map<String, IndexService> indices = emptyMap();
private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>();
private final AtomicInteger numUncompletedDeletes = new AtomicInteger();
private final OldShardsStats oldShardsStats = new OldShardsStats();
@@ -478,6 +478,10 @@ public ClusterService clusterService() {
return clusterService;
}

public Map<String, IndexService> indices() {
return indices;
}

@Override
protected void doStop() {
ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS);
@@ -856,7 +860,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
return config -> new ReadOnlyEngine(config, new SeqNoStats(0, 0, 0), new TranslogStats(), true, Function.identity(), false);
}
if (idxSettings.isSegRepEnabled()) {
return new NRTReplicationEngineFactory();
return new NRTReplicationEngineFactory(clusterService);
}
return new InternalEngineFactory();
} else if (engineFactories.size() == 1) {
server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java
@@ -14,7 +14,6 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
@@ -148,12 +147,6 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
*/
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
if (copyState.getCheckpoint().getCodec().equals(request.getCheckpoint().getCodec()) == false) {
logger.trace("Requested unsupported codec version {}", request.getCheckpoint().getCodec());
throw new CancellableThreads.ExecutionCancelledException(
new ParameterizedMessage("Requested unsupported codec version {}", request.getCheckpoint().getCodec()).toString()
);
}
allocationIdToHandlers.compute(request.getTargetAllocationId(), (allocationId, segrepHandler) -> {
if (segrepHandler != null) {
logger.warn("Override handler for allocation id {}", request.getTargetAllocationId());
server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
@@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.BaseExceptionsHelper;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
@@ -227,7 +228,8 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {
}
}
final Thread thread = Thread.currentThread();
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
Version localNodeVersion = indicesService.clusterService().state().nodes().getLocalNode().getVersion();
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint, localNodeVersion)) {
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
63 changes: 63 additions & 0 deletions server/src/main/java/org/opensearch/indices/replication/SegmentReplicationUpgradeListener.java
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;

import java.util.ArrayList;
import java.util.List;

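/**
* Cluster state listener that detects the completion of a rolling upgrade (all nodes back on the
* same version) and resets engines on segment-replication primaries so that new segments stop
* being written with the backwards-compatible codec.
*
* @opensearch.internal
*/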
public class SegmentReplicationUpgradeListener implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(SegmentReplicationUpgradeListener.class);

private final IndicesService indicesService;

public SegmentReplicationUpgradeListener(IndicesService indicesService) {
this.indicesService = indicesService;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
List<IndexShard> indexShardList = new ArrayList<>();
DiscoveryNodes nodes = event.state().nodes();
if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) {
for (IndexService indexService : indicesService.indices().values()) {
for (IndexShard indexShard : indexService) {
try {
if ((indexShard.getEngine().config().isReadOnlyReplica() == false)
&& (indexShard.getEngine().config().getClusterMinVersion().equals(nodes.getMaxNodeVersion()) == false)) {
indexShardList.add(indexShard);
}
} catch (AlreadyClosedException e) {
logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
}
}
}
}
try {
if (indexShardList.isEmpty() == false) {
for (IndexShard is : indexShardList) {
is.resetEngineToGlobalCheckpoint();
}
}
} catch (Exception e) {
logger.error("Received unexpected exception while resetting engines to global checkpoint", e);
}

}

}
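A wiring sketch for completeness: the listener presumably gets registered with the ClusterService during node startup. The registration site is in one of the changed files not expanded on this page, so the line below is an assumption, not a quote from the commit:

// Hypothetical registration during node construction, assuming clusterService
// and indicesService are in scope.
clusterService.addListener(new SegmentReplicationUpgradeListener(indicesService));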
