Skip to content

Commit

Permalink
Cut PersistedState interface from GatewayMetaState (elastic#47001)
Browse files Browse the repository at this point in the history
Today `GatewayMetaState` implements `PersistedState` but it's an error to use
it as a `PersistedState` before it's been started, or if the node is
master-ineligible. It also holds some fields that are meaningless on nodes that
do not persist their states. Finally, it takes responsibility for both loading
the original cluster state and some of the high-level logic for writing the
cluster state back to disk.

This commit addresses these concerns by introducing a more specific
`PersistedState` implementation for use on master-eligible nodes which is only
instantiated if and when it's appropriate. It also moves the fields and
high-level persistence logic into a new `IncrementalClusterStateWriter` with a
more appropriate lifecycle.

Follow-up to elastic#46326 and elastic#46532
Forward-port of elastic#46655
  • Loading branch information
DaveCTurner authored Sep 24, 2019
1 parent 3fa722e commit 215be64
Show file tree
Hide file tree
Showing 8 changed files with 967 additions and 849 deletions.
528 changes: 112 additions & 416 deletions server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java

Large diffs are not rendered by default.

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ protected Node(
).collect(Collectors.toSet());
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, metaStateService);
final GatewayMetaState gatewayMetaState = new GatewayMetaState();
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
Expand Down Expand Up @@ -693,7 +693,7 @@ public Node start() throws NodeValidationException {

// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(transportService, clusterService,
gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),
injector.getInstance(MetaDataIndexUpgradeService.class), injector.getInstance(MetaDataUpgrader.class));
// we load the global state here (the persistent part of the cluster state stored on disk) to
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion;
import org.elasticsearch.cluster.coordination.CoordinationState;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
Expand All @@ -35,10 +37,10 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.Collections;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;

public class GatewayMetaStatePersistedStateTests extends ESTestCase {
Expand All @@ -63,21 +65,23 @@ public void tearDown() throws Exception {
super.tearDown();
}

private MockGatewayMetaState newGateway() {
final MockGatewayMetaState gateway = new MockGatewayMetaState(settings, nodeEnvironment, xContentRegistry(), localNode);
gateway.start();
return gateway;
private CoordinationState.PersistedState newGatewayPersistedState() {
final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
gateway.start(settings, nodeEnvironment, xContentRegistry());
final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
assertThat(persistedState, not(instanceOf(InMemoryPersistedState.class)));
return persistedState;
}

private MockGatewayMetaState maybeNew(MockGatewayMetaState gateway) throws IOException {
private CoordinationState.PersistedState maybeNew(CoordinationState.PersistedState persistedState) {
if (randomBoolean()) {
return newGateway();
return newGatewayPersistedState();
}
return gateway;
return persistedState;
}

public void testInitialState() throws IOException {
MockGatewayMetaState gateway = newGateway();
public void testInitialState() {
CoordinationState.PersistedState gateway = newGatewayPersistedState();
ClusterState state = gateway.getLastAcceptedState();
assertThat(state.getClusterName(), equalTo(clusterName));
assertTrue(MetaData.isGlobalStateEquals(state.metaData(), MetaData.EMPTY_META_DATA));
Expand All @@ -88,8 +92,8 @@ public void testInitialState() throws IOException {
assertThat(currentTerm, equalTo(Manifest.empty().getCurrentTerm()));
}

public void testSetCurrentTerm() throws IOException {
MockGatewayMetaState gateway = newGateway();
public void testSetCurrentTerm() {
CoordinationState.PersistedState gateway = newGatewayPersistedState();

for (int i = 0; i < randomIntBetween(1, 5); i++) {
final long currentTerm = randomNonNegativeLong();
Expand Down Expand Up @@ -142,8 +146,8 @@ private void assertClusterStateEqual(ClusterState expected, ClusterState actual)
}
}

public void testSetLastAcceptedState() throws IOException {
MockGatewayMetaState gateway = newGateway();
public void testSetLastAcceptedState() {
CoordinationState.PersistedState gateway = newGatewayPersistedState();
final long term = randomNonNegativeLong();

for (int i = 0; i < randomIntBetween(1, 5); i++) {
Expand All @@ -165,8 +169,8 @@ public void testSetLastAcceptedState() throws IOException {
}
}

public void testSetLastAcceptedStateTermChanged() throws IOException {
MockGatewayMetaState gateway = newGateway();
public void testSetLastAcceptedStateTermChanged() {
CoordinationState.PersistedState gateway = newGatewayPersistedState();

final String indexName = randomAlphaOfLength(10);
final int numberOfShards = randomIntBetween(1, 5);
Expand All @@ -178,7 +182,7 @@ public void testSetLastAcceptedStateTermChanged() throws IOException {
gateway.setLastAcceptedState(state);

gateway = maybeNew(gateway);
final long newTerm = randomValueOtherThan(term, () -> randomNonNegativeLong());
final long newTerm = randomValueOtherThan(term, ESTestCase::randomNonNegativeLong);
final int newNumberOfShards = randomValueOtherThan(numberOfShards, () -> randomIntBetween(1,5));
final IndexMetaData newIndexMetaData = createIndexMetaData(indexName, newNumberOfShards, version);
final ClusterState newClusterState = createClusterState(randomNonNegativeLong(),
Expand All @@ -189,11 +193,11 @@ public void testSetLastAcceptedStateTermChanged() throws IOException {
assertThat(gateway.getLastAcceptedState().metaData().index(indexName), equalTo(newIndexMetaData));
}

public void testCurrentTermAndTermAreDifferent() throws IOException {
MockGatewayMetaState gateway = newGateway();
public void testCurrentTermAndTermAreDifferent() {
CoordinationState.PersistedState gateway = newGatewayPersistedState();

long currentTerm = randomNonNegativeLong();
long term = randomValueOtherThan(currentTerm, () -> randomNonNegativeLong());
long term = randomValueOtherThan(currentTerm, ESTestCase::randomNonNegativeLong);

gateway.setCurrentTerm(currentTerm);
gateway.setLastAcceptedState(createClusterState(randomNonNegativeLong(),
Expand All @@ -204,8 +208,8 @@ public void testCurrentTermAndTermAreDifferent() throws IOException {
assertThat(gateway.getLastAcceptedState().coordinationMetaData().term(), equalTo(term));
}

public void testMarkAcceptedConfigAsCommitted() throws IOException {
MockGatewayMetaState gateway = newGateway();
public void testMarkAcceptedConfigAsCommitted() {
CoordinationState.PersistedState gateway = newGatewayPersistedState();

//generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration
CoordinationMetaData coordinationMetaData;
Expand Down
Loading

0 comments on commit 215be64

Please sign in to comment.