From 4abb5fa297d0598198da67eae7de304e232fc49c Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 9 Nov 2017 08:09:14 +0000 Subject: [PATCH] Remove optimisations to reuse objects when applying a new `ClusterState` (#27317) In order to avoid churn when applying a new `ClusterState`, there are some checks that compare parts of the old and new states and, if equal, the new object is discarded and the old one reused. Since `ClusterState` updates are now largely diff-based, this code is unnecessary: applying a diff also reuses any old objects if unchanged. Moreover, the code compares the parts of the `ClusterState` using their `version()` values which is not guaranteed to be correct, because of a lack of consensus. This change removes this optimisation, and tests that objects are still reused as expected via the diff mechanism. --- .../discovery/zen/ZenDiscovery.java | 44 ++------------ .../ClusterSerializationTests.java | 57 +++++++++++++++++++ 2 files changed, 63 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index d688a5d5cdbae..f9af8aa5c5666 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -735,7 +735,6 @@ boolean processNextCommittedClusterState(String reason) { final ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess(); final ClusterState currentState = committedState.get(); - final ClusterState adaptedNewClusterState; // all pending states have been processed if (newClusterState == null) { return false; @@ -773,54 +772,23 @@ boolean processNextCommittedClusterState(String reason) { if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) { // its a fresh update from the master as we transition from a start of not having a master to having one logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId()); - adaptedNewClusterState = newClusterState; - } else if (newClusterState.nodes().isLocalNodeElectedMaster() == false) { - // some optimizations to make sure we keep old objects where possible - ClusterState.Builder builder = ClusterState.builder(newClusterState); - - // if the routing table did not change, use the original one - if (newClusterState.routingTable().version() == currentState.routingTable().version()) { - builder.routingTable(currentState.routingTable()); - } - // same for metadata - if (newClusterState.metaData().version() == currentState.metaData().version()) { - builder.metaData(currentState.metaData()); - } else { - // if its not the same version, only copy over new indices or ones that changed the version - MetaData.Builder metaDataBuilder = MetaData.builder(newClusterState.metaData()).removeAllIndices(); - for (IndexMetaData indexMetaData : newClusterState.metaData()) { - IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.getIndex()); - if (currentIndexMetaData != null && currentIndexMetaData.isSameUUID(indexMetaData.getIndexUUID()) && - currentIndexMetaData.getVersion() == indexMetaData.getVersion()) { - // safe to reuse - metaDataBuilder.put(currentIndexMetaData, false); - } else { - metaDataBuilder.put(indexMetaData, false); - } - } - builder.metaData(metaDataBuilder); - } - - adaptedNewClusterState = builder.build(); - } else { - adaptedNewClusterState = newClusterState; } - if (currentState == adaptedNewClusterState) { + if (currentState == newClusterState) { return false; } - committedState.set(adaptedNewClusterState); + committedState.set(newClusterState); // update failure detection only after the state has been updated to prevent race condition with handleLeaveRequest // and handleNodeFailure as those check the current state to determine whether the failure is to be handled by this node - if (adaptedNewClusterState.nodes().isLocalNodeElectedMaster()) { + if (newClusterState.nodes().isLocalNodeElectedMaster()) { // update the set of nodes to ping - nodesFD.updateNodesAndPing(adaptedNewClusterState); + nodesFD.updateNodesAndPing(newClusterState); } else { // check to see that we monitor the correct master of the cluster - if (masterFD.masterNode() == null || !masterFD.masterNode().equals(adaptedNewClusterState.nodes().getMasterNode())) { - masterFD.restart(adaptedNewClusterState.nodes().getMasterNode(), + if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) { + masterFD.restart(newClusterState.nodes().getMasterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]"); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java b/core/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java index 0d363cd3fcf0f..a892b2a293445 100644 --- a/core/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; @@ -43,6 +44,8 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.VersionUtils; +import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import static org.hamcrest.Matchers.equalTo; @@ -154,4 +157,58 @@ public void testSnapshotDeletionsInProgressSerialization() throws Exception { assertThat(stateAfterDiffs.custom(SnapshotDeletionsInProgress.TYPE), notNullValue()); } + private ClusterState updateUsingSerialisedDiff(ClusterState original, Diff diff) throws IOException { + BytesStreamOutput outStream = new BytesStreamOutput(); + outStream.setVersion(Version.CURRENT); + diff.writeTo(outStream); + StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(), + new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); + diff = ClusterState.readDiffFrom(inStream, newNode("node-name")); + return diff.apply(original); + } + + public void testObjectReuseWhenApplyingClusterStateDiff() throws Exception { + IndexMetaData indexMetaData + = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(10).numberOfReplicas(1).build(); + IndexTemplateMetaData indexTemplateMetaData + = IndexTemplateMetaData.builder("test-template").patterns(new ArrayList<>()).build(); + MetaData metaData = MetaData.builder().put(indexMetaData, true).put(indexTemplateMetaData).build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build(); + + ClusterState clusterState1 = ClusterState.builder(new ClusterName("clusterName1")) + .metaData(metaData).routingTable(routingTable).build(); + BytesStreamOutput outStream = new BytesStreamOutput(); + outStream.setVersion(Version.CURRENT); + clusterState1.writeTo(outStream); + StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(), + new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); + ClusterState serializedClusterState1 = ClusterState.readFrom(inStream, newNode("node4")); + + // Create a new, albeit equal, IndexMetadata object + ClusterState clusterState2 = ClusterState.builder(clusterState1).incrementVersion() + .metaData(MetaData.builder().put(IndexMetaData.builder(indexMetaData).numberOfReplicas(1).build(), true)).build(); + assertNotSame("Should have created a new, equivalent, IndexMetaData object in clusterState2", + clusterState1.metaData().index("test"), clusterState2.metaData().index("test")); + + ClusterState serializedClusterState2 = updateUsingSerialisedDiff(serializedClusterState1, clusterState2.diff(clusterState1)); + assertSame("Unchanged metadata should not create new IndexMetaData objects", + serializedClusterState1.metaData().index("test"), serializedClusterState2.metaData().index("test")); + assertSame("Unchanged routing table should not create new IndexRoutingTable objects", + serializedClusterState1.routingTable().index("test"), serializedClusterState2.routingTable().index("test")); + + // Create a new and different IndexMetadata object + ClusterState clusterState3 = ClusterState.builder(clusterState1).incrementVersion() + .metaData(MetaData.builder().put(IndexMetaData.builder(indexMetaData).numberOfReplicas(2).build(), true)).build(); + ClusterState serializedClusterState3 = updateUsingSerialisedDiff(serializedClusterState2, clusterState3.diff(clusterState2)); + assertNotEquals("Should have a new IndexMetaData object", + serializedClusterState2.metaData().index("test"), serializedClusterState3.metaData().index("test")); + assertSame("Unchanged routing table should not create new IndexRoutingTable objects", + serializedClusterState2.routingTable().index("test"), serializedClusterState3.routingTable().index("test")); + + assertSame("nodes", serializedClusterState2.nodes(), serializedClusterState3.nodes()); + assertSame("blocks", serializedClusterState2.blocks(), serializedClusterState3.blocks()); + assertSame("template", serializedClusterState2.metaData().templates().get("test-template"), + serializedClusterState3.metaData().templates().get("test-template")); + } }