
[Segment Replication] Rolling upgrade support for default codecs #7698

Closed
wants to merge 4 commits

Conversation

Poojita-Raj
Contributor

Description

Supports rolling upgrades for the default codecs:

  • While the cluster is in a mixed-version state, the primary downgrades the codec it uses to one matching the minimum node version in the cluster.
  • Once the full cluster is upgraded, the primary resets the Lucene codec it uses to write segments back to the latest one.
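The two bullets above can be modeled in a few lines. This is an illustrative sketch only (class, method, and table names are hypothetical; the real change lives in OpenSearch's CodecService and engine code):

```java
import java.util.Map;

// Illustrative sketch: while the cluster is mixed, the primary writes with
// the codec mapped to the minimum node version; once the upgrade completes,
// it goes back to the latest codec. All names here are hypothetical.
public class RollingUpgradeCodecSketch {
    static final String LATEST_CODEC = "Lucene95";

    // Hypothetical version -> codec table (the PR keeps a real one in CodecService).
    static final Map<String, String> VERSION_TO_CODEC = Map.of(
        "2.7.0", "Lucene95",
        "2.8.0", "Lucene95",
        "3.0.0", "Lucene95"
    );

    /** Codec the primary should write with, given whether the cluster is mixed. */
    static String writeCodec(boolean clusterIsMixed, String minNodeVersion) {
        return clusterIsMixed
            ? VERSION_TO_CODEC.getOrDefault(minNodeVersion, LATEST_CODEC)
            : LATEST_CODEC;
    }
}
```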

Related Issues

Resolves #7349

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed per the DCO using --signoff
  • Commit changes are listed out in CHANGELOG.md file (See: Changelog)

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Contributor

Gradle Check (Jenkins) Run Completed with:


Comment on lines 52 to 69
try {
    if (indexShardList.isEmpty() == false) {
        for (IndexShard is : indexShardList) {
            is.resetEngineToGlobalCheckpoint();
        }
    }
} catch (Exception e) {
    logger.error("Received unexpected exception: [{}]", e.getMessage());
}
Collaborator

Will this cause disruptions during upgrades?

Contributor Author

Throughput will be impacted, but incoming requests that arrive while the index writer is being switched are queued and processed once it is back up.

Collaborator

Is there a test around this that can confirm the same? Can we run some benchmarks/tests to see the impact on performance?

Contributor Author

Ran 2 benchmarks on the nyc_taxis dataset to confirm: the runs saw a 0% and a 0.01% indexing error rate, respectively.

@Poojita-Raj Poojita-Raj requested a review from dbwiddis as a code owner May 24, 2023 23:46

/**
 * Returns <code>true</code> if a version upgrade has taken place in the cluster
 */
public boolean clusterUpgraded() {
Collaborator

Rename to something better, maybe hasMixedVersionNodes?

Contributor Author

We're using this method to check that the cluster upgrade has completed: it checks that the previous state had mixed-version nodes and the current state does not. hasMixedVersionNodes might be misleading in this case.

Collaborator

clusterUpgraded is equivalent to NOT hasMixedVersionNodes.
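For context, the two predicates differ once the previous state is taken into account, which is the author's point above. A simplified, self-contained model (a real ClusterChangedEvent compares DiscoveryNodes of the previous and current cluster states; here a "state" is just a set of version strings):

```java
import java.util.Set;

// Simplified model of the two predicates under discussion.
public class UpgradeCheckSketch {
    static boolean hasMixedVersionNodes(Set<String> nodeVersions) {
        return nodeVersions.size() > 1;
    }

    // True only on the transition: the previous state was mixed, the current is not.
    // This is NOT simply the negation of hasMixedVersionNodes on the current state.
    static boolean clusterUpgraded(Set<String> previous, Set<String> current) {
        return hasMixedVersionNodes(previous) && !hasMixedVersionNodes(current);
    }
}
```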


Signed-off-by: Poojita Raj <[email protected]>

Comment on lines +85 to +88
versionStringMap.put(Version.fromString("3.0.0"), "Lucene95");
versionStringMap.put(Version.fromString("2.8.0"), "Lucene95");
versionStringMap.put(Version.fromString("2.7.1"), "Lucene95");
versionStringMap.put(Version.fromString("2.7.0"), "Lucene95");
Member

  1. nit: Rather than having a specific call, we can statically initialize this map. We are currently calling this inside the class ctor; I don't see an advantage to lazy loading.

public static final Map<Version, String> opensearchVersionToLuceneCodec;
static {
    Map<Version, String> versionStringMap = new HashMap<>();
    versionStringMap.put(Version.fromString("3.0.0"), "Lucene95");
    ...
    opensearchVersionToLuceneCodec = Collections.unmodifiableMap(new HashMap<>(versionStringMap));
}

  2. Can we build this map by reading Version.java, since this info is present there? This would prevent future maintenance of the version <-> lucene codec map. I know this is not straightforward, as Lucene version bumps don't necessarily mean codec bumps. We can take this in a follow-up PR.
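The static-initialization suggestion above, made self-contained for illustration (String keys stand in for OpenSearch's Version type; the codec names are the ones from the PR's map):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Self-contained version of the suggested static initializer. String keys
// replace OpenSearch's Version type so the sketch compiles on its own.
public class CodecServiceSketch {
    public static final Map<String, String> opensearchVersionToLuceneCodec;
    static {
        Map<String, String> versionStringMap = new HashMap<>();
        versionStringMap.put("3.0.0", "Lucene95");
        versionStringMap.put("2.8.0", "Lucene95");
        versionStringMap.put("2.7.1", "Lucene95");
        versionStringMap.put("2.7.0", "Lucene95");
        // Wrap once, at class-load time; no lazy loading needed.
        opensearchVersionToLuceneCodec = Collections.unmodifiableMap(versionStringMap);
    }
}
```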

@@ -71,6 +82,11 @@ public ReplicationCheckpoint(StreamInput in) throws IOException {
length = 0L;
codec = null;
}
if (in.getVersion().onOrAfter(Version.V_2_8_0)) {
Member

  1. For the main branch (this PR): this needs to be changed to 3.0.0, or else it will break bwc tests (if any exercise this code) because this field is not yet present in the 2.x (2.9.0) branch. Reading this field from, or sending it to, a 2.9.0 node will fail.
  2. On the 2.x backport, change this back to 2.9.0.
  3. Additional step/PR: change main to use 2.9.0 after the PR in step 2 is merged.
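The pattern being reviewed here is version-gated wire serialization: a field is only written (and read) when the peer's wire version is new enough. A standalone sketch using plain Java streams (the real code uses OpenSearch's StreamInput/StreamOutput and Version constants; the integer version and field names below are illustrative):

```java
import java.io.*;

// Standalone model of version-gated serialization. Versions are plain ints;
// OpenSearch uses Version.onOrAfter(...) against the stream's version.
public class VersionGatedWireSketch {
    static final int V_WITH_MIN_VERSION_FIELD = 3000000; // illustrative cutoff

    static byte[] write(int peerVersion, long length, int minVersion) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(length);                        // always-present field
            if (peerVersion >= V_WITH_MIN_VERSION_FIELD) {
                out.writeInt(minVersion);                 // gated field
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static int readMinVersion(int peerVersion, byte[] payload, int fallback) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
            in.readLong();                                // skip the always-present field
            // Read the gated field only if the peer also wrote it.
            return (peerVersion >= V_WITH_MIN_VERSION_FIELD) ? in.readInt() : fallback;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both sides must agree on the cutoff, which is why the reviewer insists the gate match the branch the code actually ships in.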

@@ -58,8 +61,11 @@ public class CodecService {
public static final String BEST_COMPRESSION_CODEC = "best_compression";
/** the raw unfiltered lucene default. useful for testing */
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
static Map<Version, String> versionStringMap = new HashMap<>();
Member

nit: This variable declaration can go inside loadMap(), as it is only used to init opensearchVersionToLuceneCodec. It doesn't need to be static.

@@ -58,8 +61,11 @@ public class CodecService {
public static final String BEST_COMPRESSION_CODEC = "best_compression";
/** the raw unfiltered lucene default. useful for testing */
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
static Map<Version, String> versionStringMap = new HashMap<>();
public static Map<Version, String> opensearchVersionToLuceneCodec;
Member

nit: opensearchVersionToLuceneCodec -> versionToCodecMap. There are integrations which override Lucene codecs.

Member

This variable can be scoped protected; that still allows integrations overriding CodecService to provide their own mapping.

@@ -170,6 +174,33 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}
}
if (event.clusterUpgraded()) {
List<IndexShard> indexShardList = new ArrayList<>();
Member

nit: final ?

for (IndexShard indexShard : indexService) {
try {
if (indexShard.routingEntry().primary()
&& (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
Member

@dreamer-89 dreamer-89 Jun 6, 2023

  1. For large clusters (100s of nodes), it is not uncommon to have a few nodes running an older OS version, which means running the primary shard in bwc mode for an extended period, in the worst case forever. I am not sure about the end result of that state. As an improvement, can this switch be performed once the nodes containing all shard copies are upgraded?
  2. Performing this engine switch gradually also makes more sense versus doing it all at once. Users may see indexing requests piling up when the upgrade completes.
  3. Needs tests.

@@ -131,6 +154,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(length);
out.writeString(codec);
}
if (out.getVersion().onOrAfter(Version.V_2_8_0)) {
Member

Same as above.

@dreamer-89
Member

dreamer-89 commented Jun 6, 2023

Thanks @Poojita-Raj for working on this. A few top-level comments.

Lucene major version upgrades

I think Lucene does not allow wiring in previous-major-version codecs with IndexWriter. For example, I see that using Lucene87 during index creation results in failures during indexing operations (test code link) when running on OS version 3.0.0 using Lucene95. This can be a problem during Lucene major version upgrades, i.e. 8.x -> 9.x. Tests are the best way to verify, but at this point I don't see a way.

Older codecs provided during index creation

Today, we allow users to provide older codec names as-is during index creation, e.g.

{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "replication.type": "SEGMENT",
      "codec": "Lucene90"
    }
  }
}
  1. For existing indices, it is possible that a node on a specific OS version has a lucene codec which does not conform to the mappings we have defined inside CodecService.java. I don't see this as a problem, because we always load the latest codec, which should still be able to read/write segments of the older codec. We can add a test to verify this.
  2. New index creation in a mixed-version cluster: with replica assignment not allowed on older OS version nodes, there is no need to run in bwc mode.

try {
    if (indexShardList.isEmpty() == false) {
        for (IndexShard indexShard : indexShardList) {
            indexShard.resetEngine();
Member

@dreamer-89 dreamer-89 Jun 7, 2023

An engine reset is not required when there is no codec change. This change will unnecessarily impact end users post-upgrade (delayed operations) when it is not really needed.

Comment on lines +231 to +243
Version localNodeVersion = Version.CURRENT;
// if replica's OS version is not on or after primary version, then we can ignore the checkpoint
if (localNodeVersion.onOrAfter(receivedCheckpoint.getMinVersion()) == false) {
    logger.trace(
        () -> new ParameterizedMessage(
            "Ignoring checkpoint, shard not started {} {}\n Shard does not support the received lucene codec version {}",
            receivedCheckpoint,
            replicaShard.state(),
            receivedCheckpoint.getCodec()
        )
    );
    return;
}
Member

This check should go inside shouldProcessCheckpoint containing other validations around processing checkpoint.

Comment on lines +235 to +239
() -> new ParameterizedMessage(
    "Ignoring checkpoint, shard not started {} {}\n Shard does not support the received lucene codec version {}",
    receivedCheckpoint,
    replicaShard.state(),
    receivedCheckpoint.getCodec()
Member

Suggested change:

- () -> new ParameterizedMessage(
-     "Ignoring checkpoint, shard not started {} {}\n Shard does not support the received lucene codec version {}",
-     receivedCheckpoint,
-     replicaShard.state(),
-     receivedCheckpoint.getCodec()
+ () -> new ParameterizedMessage(
+     "Ignoring checkpoint {} as shard does not support the received lucene codec version {}",
+     receivedCheckpoint,
+     receivedCheckpoint.getCodec()

@dreamer-89
Member

dreamer-89 commented Jun 7, 2023

Lucene major version upgrades

I think Lucene does not allow wiring in previous-major-version codecs with IndexWriter. For example, I see that using Lucene87 during index creation results in failures during indexing operations (test code link) when running on OS version 3.0.0 using Lucene95. This can be a problem during Lucene major version upgrades, i.e. 8.x -> 9.x. Tests are the best way to verify, but at this point I don't see a way.

Verified that using the previous major's latest lucene codec is not allowed: any indexing operation fails with UnsupportedOperationException. Verified this on different 9.x versions of lucene, using the latest 8.x codec, i.e. Lucene87, as mentioned below.

  1. Lucene95 - latest main
  2. Lucene92 5358502
  3. Lucene90 006c832

Step 1. Create an index with an older lucene codec while running a current 9.x lucene version (any of the above 3):

{
    "settings": {
        "index": {
            "number_of_shards": 1,
            "number_of_replicas": 1,
            "codec": "Lucene87"
        }
    }
}

Step 2. An indexing operation fails:

{
    "error": {
        "root_cause": [
            {
                "type": "unsupported_operation_exception",
                "reason": "Old codecs may only be used for reading"
            }
        ],
        "type": "unsupported_operation_exception",
        "reason": "Old codecs may only be used for reading"
    },
    "status": 500
}

It appears lucene only allows codecs that are part of the core lucene library; older/bwc codecs are only meant for reading older segments.


@Poojita-Raj
Contributor Author

Closing until a decision is made on what approach to take for rolling upgrades with segment replication enabled.

Successfully merging this pull request may close these issues.

[Segment Replication] Mixed cluster version support for default codecs