Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove optimisations to reuse objects when applying a new ClusterState #27317

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,6 @@ boolean processNextCommittedClusterState(String reason) {

final ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess();
final ClusterState currentState = committedState.get();
final ClusterState adaptedNewClusterState;
// all pending states have been processed
if (newClusterState == null) {
return false;
Expand Down Expand Up @@ -773,54 +772,23 @@ boolean processNextCommittedClusterState(String reason) {
if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
// its a fresh update from the master as we transition from a start of not having a master to having one
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId());
adaptedNewClusterState = newClusterState;
} else if (newClusterState.nodes().isLocalNodeElectedMaster() == false) {
// some optimizations to make sure we keep old objects where possible
ClusterState.Builder builder = ClusterState.builder(newClusterState);

// if the routing table did not change, use the original one
if (newClusterState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable());
}
// same for metadata
if (newClusterState.metaData().version() == currentState.metaData().version()) {
builder.metaData(currentState.metaData());
} else {
// if its not the same version, only copy over new indices or ones that changed the version
MetaData.Builder metaDataBuilder = MetaData.builder(newClusterState.metaData()).removeAllIndices();
for (IndexMetaData indexMetaData : newClusterState.metaData()) {
IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.getIndex());
if (currentIndexMetaData != null && currentIndexMetaData.isSameUUID(indexMetaData.getIndexUUID()) &&
currentIndexMetaData.getVersion() == indexMetaData.getVersion()) {
// safe to reuse
metaDataBuilder.put(currentIndexMetaData, false);
} else {
metaDataBuilder.put(indexMetaData, false);
}
}
builder.metaData(metaDataBuilder);
}

adaptedNewClusterState = builder.build();
} else {
adaptedNewClusterState = newClusterState;
}

if (currentState == adaptedNewClusterState) {
if (currentState == newClusterState) {
return false;
}

committedState.set(adaptedNewClusterState);
committedState.set(newClusterState);

// update failure detection only after the state has been updated to prevent race condition with handleLeaveRequest
// and handleNodeFailure as those check the current state to determine whether the failure is to be handled by this node
if (adaptedNewClusterState.nodes().isLocalNodeElectedMaster()) {
if (newClusterState.nodes().isLocalNodeElectedMaster()) {
// update the set of nodes to ping
nodesFD.updateNodesAndPing(adaptedNewClusterState);
nodesFD.updateNodesAndPing(newClusterState);
} else {
// check to see that we monitor the correct master of the cluster
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(adaptedNewClusterState.nodes().getMasterNode())) {
masterFD.restart(adaptedNewClusterState.nodes().getMasterNode(),
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) {
masterFD.restart(newClusterState.nodes().getMasterNode(),
"new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,58 @@ public void testSnapshotDeletionsInProgressSerialization() throws Exception {
assertThat(stateAfterDiffs.custom(SnapshotDeletionsInProgress.TYPE), notNullValue());
}

public void testObjectReuseWhenApplyingClusterStateDiff() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while we're at it , can you check more than just the routing table and the index meta data? we have blocks, nodes and customs (out of the top of my head).

IndexMetaData indexMetaData
= IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(10).numberOfReplicas(1).build();
MetaData metaData = MetaData.builder().put(indexMetaData, true).build();

RoutingTable routingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build();

ClusterState clusterState1 = ClusterState.builder(new ClusterName("clusterName1"))
.metaData(metaData).routingTable(routingTable).build();
ClusterState clusterState2 = ClusterState.builder(clusterState1).incrementVersion()
.metaData(MetaData.builder().put(IndexMetaData.builder(indexMetaData).numberOfReplicas(1).build(), true)).build();
ClusterState clusterState3 = ClusterState.builder(clusterState1).incrementVersion()
.metaData(MetaData.builder().put(IndexMetaData.builder(indexMetaData).numberOfReplicas(2).build(), true)).build();

assertNotSame("Should have created a new, equivalent, IndexMetaData object in clusterState2",
clusterState1.metaData().index("test"), clusterState2.metaData().index("test"));
assertNotEquals("Should have created a new, different, IndexMetaData object in clusterState3",
clusterState2.metaData().index("test"), clusterState3.metaData().index("test"));

BytesStreamOutput outStream = new BytesStreamOutput();
outStream.setVersion(Version.CURRENT);
clusterState1.writeTo(outStream);
StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
ClusterState serializedClusterState1 = ClusterState.readFrom(inStream, newNode("node4"));

outStream = new BytesStreamOutput();
outStream.setVersion(Version.CURRENT);
clusterState2.diff(clusterState1).writeTo(outStream);
inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
Diff<ClusterState> diff = ClusterState.readDiffFrom(inStream, newNode("node4"));

ClusterState serializedClusterState2 = diff.apply(serializedClusterState1);

assertSame("Unchanged metadata should not create new IndexMetaData objects",
serializedClusterState1.metaData().index("test"), serializedClusterState2.metaData().index("test"));
assertSame("Unchanged routing table should not create new IndexRoutingTable objects",
serializedClusterState1.routingTable().index("test"), serializedClusterState2.routingTable().index("test"));

outStream = new BytesStreamOutput();
outStream.setVersion(Version.CURRENT);
clusterState3.diff(clusterState2).writeTo(outStream);
inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
diff = ClusterState.readDiffFrom(inStream, newNode("node4"));

ClusterState serializedClusterState3 = diff.apply(serializedClusterState2);

assertNotEquals("Should have a new IndexMetaData object",
serializedClusterState2.metaData().index("test"), serializedClusterState3.metaData().index("test"));
assertSame("Unchanged routing table should not create new IndexRoutingTable objects",
serializedClusterState2.routingTable().index("test"), serializedClusterState3.routingTable().index("test"));
}
}