[Segment Replication] Rolling upgrade support for default codecs #7698

Closed · wants to merge 4 commits
CHANGELOG.md (3 changes: 2 additions & 1 deletion)
@@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added @dbwiddis as an OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665))
- [Extensions] Add ExtensionAwarePlugin extension point to add custom settings for extensions ([#7526](https://github.com/opensearch-project/OpenSearch/pull/7526))
- Add new cluster setting to set default index replication type ([#7420](https://github.com/opensearch-project/OpenSearch/pull/7420))
- [Segment Replication] Rolling upgrade support for default codecs ([#7698](https://github.com/opensearch-project/OpenSearch/pull/7698))

### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564)
@@ -155,4 +156,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
ClusterChangedEvent.java
@@ -207,6 +207,18 @@ public boolean blocksChanged() {
return state.blocks() != previousState.blocks();
}

/**
 * Returns <code>true</code> if a version upgrade has taken place in the cluster
 */
public boolean clusterUpgraded() {
    // previous state was a mixed-version cluster and the current state is not,
    // which indicates the upgrade has completed
    if ((previousState.nodes().getMinNodeVersion() != previousState.nodes().getMaxNodeVersion())
        && (state.nodes().getMinNodeVersion() == state.nodes().getMaxNodeVersion())) {
        return true;
    }
    return false;
}

Review thread on clusterUpgraded():

Collaborator: Rename to something better, maybe hasMixedVersionNodes.

Contributor (author): We're using this method to check that a cluster upgrade has been completed: it checks that the cluster used to have mixed-version nodes and the current state does not. hasMixedVersionNodes might be misleading in this case.

Collaborator: clusterUpgraded is equivalent to NOT hasMixedVersionNodes.
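A minimal sketch of the helper suggested in the thread, assuming the same state/previousState fields used above; hypothetical, not part of this PR:

    // Hypothetical inverse helper: the current state is "mixed" when its min
    // and max node versions differ. clusterUpgraded() then reads as:
    // previousState was mixed and the current state is not.
    public boolean hasMixedVersionNodes() {
        return state.nodes().getMinNodeVersion() != state.nodes().getMaxNodeVersion();
    }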

/**
 * Returns <code>true</code> iff the local node is the master node of the cluster.
*/
CodecService.java
@@ -36,10 +36,13 @@
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.apache.lucene.codecs.lucene95.Lucene95Codec.Mode;
import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.mapper.MapperService;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
@@ -58,8 +61,11 @@ public class CodecService {
public static final String BEST_COMPRESSION_CODEC = "best_compression";
/** the raw unfiltered lucene default. useful for testing */
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
static Map<Version, String> versionStringMap = new HashMap<>();

Review thread on versionStringMap:

Member: nit: This variable declaration can go inside loadMap() as it is only used to init opensearchVersionToLuceneCodec. It doesn't need to be static.


public static Map<Version, String> opensearchVersionToLuceneCodec;

Review thread on opensearchVersionToLuceneCodec:

Member: nit: opensearchVersionToLuceneCodec -> versionToCodecMap. There are integrations which override Lucene codecs.

Member: This variable can be scoped protected, which still allows integrations overriding CodecService to provide their own mapping.



public CodecService(@Nullable MapperService mapperService, Logger logger) {
loadMap();
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
if (mapperService == null) {
codecs.put(DEFAULT_CODEC, new Lucene95Codec());
@@ -75,6 +81,14 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) {
this.codecs = codecs.immutableMap();
}

public void loadMap() {
    versionStringMap.put(Version.fromString("3.0.0"), "Lucene95");
    versionStringMap.put(Version.fromString("2.8.0"), "Lucene95");
    versionStringMap.put(Version.fromString("2.7.1"), "Lucene95");
    versionStringMap.put(Version.fromString("2.7.0"), "Lucene95");
    opensearchVersionToLuceneCodec = Collections.unmodifiableMap(new HashMap<>(versionStringMap));
}

Review thread on lines +85 to +88:

Member:
1. nit: Rather than having a specific call, we can statically initialize this map, since it is only invoked from the class ctor; I don't see an advantage to lazy loading:

       public static final Map<Version, String> opensearchVersionToLuceneCodec;
       static {
           Map<Version, String> versionStringMap = new HashMap<>();
           versionStringMap.put(Version.fromString("3.0.0"), "Lucene95");
           ...
           opensearchVersionToLuceneCodec = Collections.unmodifiableMap(new HashMap<>(versionStringMap));
       }

2. Can we build this map by reading Version.java, as this info is present there? That would prevent future maintenance of the version <-> lucene codec map. I know this is not straightforward, as Lucene version bumps don't necessarily mean codec bumps. We can take this in a follow-up PR.
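A quick illustration of the lookup this map supports; the entries are exactly those seeded in loadMap() above:

    // Illustrative only: resolve the Lucene codec name written by a given
    // OpenSearch version (every entry seeded above maps to "Lucene95").
    String codec = CodecService.opensearchVersionToLuceneCodec.get(Version.fromString("2.7.0"));
    assert "Lucene95".equals(codec);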

public Codec codec(String name) {
Codec codec = codecs.get(name);
if (codec == null) {
EngineConfig.java
@@ -40,6 +40,7 @@
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.similarities.Similarity;
import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
@@ -81,7 +82,7 @@ public final class EngineConfig {
private final TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
private volatile boolean enableGcDeletes = true;
private final TimeValue flushMergesAfter;
private final String codecName;
private String codecName;
private final ThreadPool threadPool;
private final Engine.Warmer warmer;
private final Store store;
@@ -105,6 +106,7 @@ public final class EngineConfig {
private final boolean isReadOnlyReplica;
private final BooleanSupplier primaryModeSupplier;
private final Comparator<LeafReader> leafSorter;
private Version clusterMinVersion;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -176,7 +178,8 @@ private EngineConfig(Builder builder) {
this.similarity = builder.similarity;
this.codecService = builder.codecService;
this.eventListener = builder.eventListener;
codecName = builder.indexSettings.getValue(INDEX_CODEC_SETTING);
this.codecName = builder.indexSettings.getValue(INDEX_CODEC_SETTING);
this.clusterMinVersion = Version.CURRENT;
// We need to make the indexing buffer for this shard at least as large
// as the amount of memory that is available for all engines on the
// local node so that decisions to flush segments to disk are made by
@@ -251,6 +254,44 @@ public Codec getCodec() {
return codecService.codec(codecName);
}

/**
* Returns the codec name of the lucene codec used for writing new segments
*/
public String getCodecName() {
return getCodec().getName();
}

/**
* Sets the codec name of the lucene codec used for writing new segments
*/
public void setCodecName(String name) {
this.codecName = name;
}

/**
 * Returns the minimum opensearch version among all nodes of the cluster when an upgrade is in progress
 * and segment replication is enabled; null when no upgrade is in progress.
*/
public Version getClusterMinVersion() {
return clusterMinVersion;
}

/**
 * Sets the minimum opensearch version among all nodes of the cluster when an upgrade is in progress
 * and segment replication is enabled.
*/
public void setClusterMinVersion(Version clusterMinVersion) {
this.clusterMinVersion = clusterMinVersion;
}

/**
 * Returns the BWC {@link Codec} to be used by the engine's {@link org.apache.lucene.index.IndexWriter}
 * during a rolling upgrade, when the cluster is in a mixed-version state and segment replication is enabled
 */
public Codec getBWCCodec(String codecName) {
    return codecService.codec(codecName);
}
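For orientation, a hedged sketch of how these new hooks combine during a rolling upgrade; it mirrors the NRTReplicationEngineFactory change below rather than defining new behavior:

    // Sketch: pin the engine to the codec of the oldest node in the cluster so
    // replicas still on the old version can read the segments it replicates.
    DiscoveryNodes nodes = clusterService.state().nodes();
    config.setClusterMinVersion(nodes.getMinNodeVersion());
    String bwcName = CodecService.opensearchVersionToLuceneCodec.get(nodes.getMinNodeVersion());
    config.setCodecName(config.getBWCCodec(bwcName).getName());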

/**
* Returns a thread-pool mainly used to get estimated time stamps from
* {@link org.opensearch.threadpool.ThreadPool#relativeTimeInMillis()} and to schedule
NRTReplicationEngineFactory.java
@@ -8,18 +8,34 @@

package org.opensearch.index.engine;

import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.index.codec.CodecService;

/**
* Engine Factory implementation used with Segment Replication that wires up replica shards with an ${@link NRTReplicationEngine}
* and primary with an ${@link InternalEngine}
*
* @opensearch.internal
*/
public class NRTReplicationEngineFactory implements EngineFactory {

private final ClusterService clusterService;

public NRTReplicationEngineFactory(ClusterService clusterService) {
this.clusterService = clusterService;
}

@Override
public Engine newReadWriteEngine(EngineConfig config) {
if (config.isReadOnlyReplica()) {
return new NRTReplicationEngine(config);
}
if (clusterService != null) {
DiscoveryNodes nodes = this.clusterService.state().nodes();
config.setClusterMinVersion(nodes.getMinNodeVersion());
config.setCodecName(config.getBWCCodec(CodecService.opensearchVersionToLuceneCodec.get(nodes.getMinNodeVersion())).getName());
}
return new InternalEngine(config);
}
}
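A brief usage sketch; construction matches the IndicesService change below, and engineConfig stands in for an existing EngineConfig:

    // Sketch: segrep-enabled indices now construct the factory with the
    // ClusterService so node versions can be consulted at engine creation.
    EngineFactory factory = new NRTReplicationEngineFactory(clusterService);
    Engine engine = factory.newReadWriteEngine(engineConfig);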
IndexShard.java
@@ -1553,7 +1553,8 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
shardRouting.primary()
? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum()
: store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(),
getEngine().config().getCodec().getName()
getEngine().config().getCodecName(),
getEngine().config().getClusterMinVersion()
)
);
} catch (IOException e) {
@@ -1787,7 +1788,7 @@ static Engine.Searcher wrapSearcher(
}

/**
* Used with segment replication during relocation handoff, this method updates current read only engine to global
* Used with segment replication during relocation handoff and rolling upgrades, this method updates current read only engine to global
* checkpoint followed by changing to writeable engine
*
* @throws IOException if communication failed
Expand All @@ -1796,7 +1797,7 @@ static Engine.Searcher wrapSearcher(
*
* @opensearch.internal
*/
public void resetToWriteableEngine() throws IOException, InterruptedException, TimeoutException {
public void resetEngine() throws IOException, InterruptedException, TimeoutException {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); });
}

@@ -3277,7 +3278,7 @@ private void doCheckIndex() throws IOException {
recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS)));
}

Engine getEngine() {
public Engine getEngine() {
Engine engine = getEngineOrNull();
if (engine == null) {
throw new AlreadyClosedException("engine is closed");
IndicesService.java
@@ -851,7 +851,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
return config -> new ReadOnlyEngine(config, new SeqNoStats(0, 0, 0), new TranslogStats(), true, Function.identity(), false);
}
if (idxSettings.isSegRepEnabled()) {
return new NRTReplicationEngineFactory();
return new NRTReplicationEngineFactory(clusterService);
}
return new InternalEngineFactory();
} else if (engineFactories.size() == 1) {
SegmentReplicationSourceService.java
@@ -11,11 +11,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
Expand All @@ -37,7 +39,9 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

@@ -170,6 +174,33 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}
}
    if (event.clusterUpgraded()) {
        List<IndexShard> indexShardList = new ArrayList<>();
        DiscoveryNodes nodes = event.state().nodes();
        for (IndexService indexService : indicesService) {
            if (indexService.getIndexSettings().isSegRepEnabled() && (indexService.getIndexSettings().getNumberOfReplicas() > 0)) {
                for (IndexShard indexShard : indexService) {
                    try {
                        if (indexShard.routingEntry().primary()
                            && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
                            indexShardList.add(indexShard);
                        }
                    } catch (AlreadyClosedException e) {
                        logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
                    }
                }
            }
        }
        try {
            if (indexShardList.isEmpty() == false) {
                for (IndexShard indexShard : indexShardList) {
                    indexShard.resetEngine();
                }
            }
        } catch (Exception e) {
            logger.error("Received unexpected exception: [{}]", e.getMessage());
        }
    }
}

Review thread on "List<IndexShard> indexShardList = new ArrayList<>();":

Member: nit: final?

Review thread on the getClusterMinVersion() check:

Member (@dreamer-89, Jun 6, 2023):
1. For large clusters (100s of nodes), it is not uncommon to have a few nodes running an older OS version, which means running the primary shard in bwc mode for an extended period, in the worst case forever. I am not sure about the end result of that state. As an improvement, can this switch be performed once the nodes containing all shard copies are upgraded?
2. Performing this engine switch gradually also makes more sense versus doing it all at once. The user may see indexing requests getting piled up when the upgrade completes.
3. Need tests.

Review thread on "indexShard.resetEngine();":

Member (@dreamer-89, Jun 7, 2023): Engine reset is not required when there is no codec change. This change will unnecessarily impact end users post upgrade (delayed operations) when it is not really needed.
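A hypothetical guard reflecting that last comment; getCodecName() and opensearchVersionToLuceneCodec come from this PR, but the guard itself is only a sketch:

    // Hypothetical: skip the engine reset when the codec the engine currently
    // writes with already matches the upgraded cluster's codec.
    String currentCodec = indexShard.getEngine().config().getCodecName();
    String targetCodec = CodecService.opensearchVersionToLuceneCodec.get(nodes.getMaxNodeVersion());
    if (currentCodec.equals(targetCodec) == false) {
        indexShard.resetEngine();
    }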

@Override
SegmentReplicationTargetService.java
@@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.BaseExceptionsHelper;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
@@ -227,6 +228,19 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
}
}
final Thread thread = Thread.currentThread();
Version localNodeVersion = Version.CURRENT;
// if replica's OS version is not on or after primary version, then can ignore checkpoint
if (localNodeVersion.onOrAfter(receivedCheckpoint.getMinVersion()) == false) {
    logger.trace(
        () -> new ParameterizedMessage(
            "Ignoring checkpoint, shard not started {} {}\n Shard does not support the received lucene codec version {}",
            receivedCheckpoint,
            replicaShard.state(),
            receivedCheckpoint.getCodec()
        )
    );
    return;
}

Review thread on lines +235 to +239:

Member: Suggested change:

    () -> new ParameterizedMessage(
        "Ignoring checkpoint {} as shard does not support the received lucene codec version {}",
        receivedCheckpoint,
        receivedCheckpoint.getCodec()
    )

Review thread on lines +231 to +243:

Member: This check should go inside shouldProcessCheckpoint, which contains the other validations around processing checkpoints.
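A hypothetical relocation of the check into IndexShard#shouldProcessCheckpoint per that comment; the parameter name requestCheckpoint is assumed:

    // Hypothetical placement inside IndexShard#shouldProcessCheckpoint,
    // alongside the existing shard-state and checkpoint-comparison checks.
    if (Version.CURRENT.onOrAfter(requestCheckpoint.getMinVersion()) == false) {
        logger.trace(() -> new ParameterizedMessage(
            "Ignoring checkpoint {} as shard does not support the received lucene codec version {}",
            requestCheckpoint, requestCheckpoint.getCodec()));
        return false;
    }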

if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(replicaShard, new SegmentReplicationListener() {
@Override
@@ -435,7 +449,7 @@ public void onReplicationDone(SegmentReplicationState state) {
try {
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
indexShard.resetToWriteableEngine();
indexShard.resetEngine();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (InterruptedException | TimeoutException | IOException e) {