Skip to content

Commit

Permalink
Load metadata at start time not construction time (#46532)
Browse files Browse the repository at this point in the history
Today we load the metadata from disk while constructing the node. However, there
is no real need to do so, and this commit moves that code to run later while
the node is starting instead.

Forward-port of #46326
  • Loading branch information
DaveCTurner authored Sep 11, 2019
1 parent c8cba51 commit 9e3822a
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -147,9 +146,9 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new Coordinator(NODE_NAME_SETTING.get(settings),
settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService, electionStrategy);
transportService, namedWriteableRegistry, allocationService, masterService, gatewayMetaState::getPersistedState,
seedHostsProvider, clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService,
electionStrategy);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.coordination.CoordinationState;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand All @@ -38,7 +38,6 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -71,41 +70,63 @@
* elected as master, it requests metaData from other master eligible nodes. After that, master node performs re-conciliation on the
* gathered results, re-creates {@link ClusterState} and broadcasts this state to other nodes in the cluster.
*/
public class GatewayMetaState implements ClusterStateApplier, CoordinationState.PersistedState {
public class GatewayMetaState implements PersistedState {
protected static final Logger logger = LogManager.getLogger(GatewayMetaState.class);

private final MetaStateService metaStateService;
private final Settings settings;
private final ClusterService clusterService;
private final TransportService transportService;

//there is a single thread executing updateClusterState calls, hence no volatile modifier
// On master-eligible Zen2 nodes, we use this very object for the PersistedState (so that the state is actually persisted); on other
// nodes we use an InMemoryPersistedState instead and persist using a cluster applier if needed. In all cases it's an error to try and
// use this object as a PersistedState before calling start(). TODO stop implementing PersistedState at the top level.
private final SetOnce<PersistedState> persistedState = new SetOnce<>();

// on master-eligible nodes we call updateClusterState under the Coordinator's mutex; on master-ineligible data nodes we call
// updateClusterState on the (unique) cluster applier thread; on other nodes we never call updateClusterState. In all cases there's no
// need to synchronize access to these variables.
protected Manifest previousManifest;
protected ClusterState previousClusterState;
protected boolean incrementalWrite;

public GatewayMetaState(Settings settings, MetaStateService metaStateService,
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader,
TransportService transportService, ClusterService clusterService) throws IOException {
public GatewayMetaState(Settings settings, MetaStateService metaStateService) {
this.settings = settings;
this.metaStateService = metaStateService;
this.transportService = transportService;
this.clusterService = clusterService;
}

upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader);
initializeClusterState(ClusterName.CLUSTER_NAME_SETTING.get(settings));
public void start(TransportService transportService, ClusterService clusterService,
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) {
assert previousClusterState == null : "should only start once, but already have " + previousClusterState;
try {
upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader);
initializeClusterState(ClusterName.CLUSTER_NAME_SETTING.get(settings));
} catch (IOException e) {
throw new ElasticsearchException("failed to load metadata", e);
}
incrementalWrite = false;
}

public PersistedState getPersistedState(Settings settings, ClusterApplierService clusterApplierService) {
applyClusterStateUpdaters();
applyClusterStateUpdaters(transportService, clusterService);
if (DiscoveryNode.isMasterNode(settings) == false) {
// use Zen1 way of writing cluster state for non-master-eligible nodes
// this avoids concurrent manipulating of IndexMetadata with IndicesStore
clusterApplierService.addLowPriorityApplier(this);
return new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState());
if (DiscoveryNode.isDataNode(settings)) {
// Master-eligible nodes persist index metadata for all indices regardless of whether they hold any shards or not. It's
// vitally important to the safety of the cluster coordination system that master-eligible nodes persist this metadata when
// _accepting_ the cluster state (i.e. before it is committed). This persistence happens on the generic threadpool.
//
// In contrast, master-ineligible data nodes only persist the index metadata for shards that they hold. When all shards of
// an index are moved off such a node the IndicesStore is responsible for removing the corresponding index directory,
// including the metadata, and does so on the cluster applier thread.
//
// This presents a problem: if a shard is unassigned from a node and then reassigned back to it again then there is a race
// between the IndicesStore deleting the index folder and the CoordinationState concurrently trying to write the updated
// metadata into it. We could probably solve this with careful synchronization, but in fact there is no need. The persisted
// state on master-ineligible data nodes is mostly ignored - it's only there to support dangling index imports, which is
// inherently unsafe anyway. Thus we can safely delay metadata writes on master-ineligible data nodes until applying the
// cluster state, which is what this does:
clusterService.addLowPriorityApplier(this::applyClusterState);
}
persistedState.set(new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState()));
} else {
persistedState.set(this);
}
return this;
}

private void initializeClusterState(ClusterName clusterName) throws IOException {
Expand All @@ -122,7 +143,7 @@ private void initializeClusterState(ClusterName clusterName) throws IOException
logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
}

public void applyClusterStateUpdaters() {
protected void applyClusterStateUpdaters(TransportService transportService, ClusterService clusterService) {
assert previousClusterState.nodes().getLocalNode() == null : "applyClusterStateUpdaters must only be called once";
assert transportService.getLocalNode() != null : "transport service is not yet started";

Expand Down Expand Up @@ -181,15 +202,18 @@ private boolean isMasterOrDataNode() {
return DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings);
}

/**
 * Returns the {@link PersistedState} currently held by the {@code persistedState} holder.
 * The holder is expected to be populated before this is called; the assertion (message
 * "not started") flags a call that was made too early.
 *
 * @return the value read from the holder
 */
public PersistedState getPersistedState() {
    final PersistedState current = persistedState.get();
    assert current != null : "not started";
    return current;
}

/**
 * Returns the {@link MetaData} of {@code previousClusterState}.
 * NOTE(review): the field is dereferenced directly, so this presumably must not be called
 * before {@code previousClusterState} has been initialised -- confirm against callers.
 */
public MetaData getMetaData() {
return previousClusterState.metaData();
}

@Override
public void applyClusterState(ClusterChangedEvent event) {
if (isMasterOrDataNode() == false) {
return;
}
private void applyClusterState(ClusterChangedEvent event) {
assert isMasterOrDataNode();

if (event.state().blocks().disableStatePersistence()) {
incrementalWrite = false;
Expand Down
15 changes: 7 additions & 8 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,7 @@ protected Node(
).collect(Collectors.toSet());
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, metaStateService,
metaDataIndexUpgradeService, metaDataUpgrader, transportService, clusterService);
final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, metaStateService);
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
Expand Down Expand Up @@ -691,14 +690,14 @@ public Node start() throws NodeValidationException {
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
injector.getInstance(PeerRecoverySourceService.class).start();
final MetaData onDiskMetadata;

// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(transportService, clusterService,
injector.getInstance(MetaDataIndexUpgradeService.class), injector.getInstance(MetaDataUpgrader.class));
// we load the global state here (the persistent part of the cluster state stored on disk) to
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
if (DiscoveryNode.isMasterNode(settings()) || DiscoveryNode.isDataNode(settings())) {
onDiskMetadata = injector.getInstance(GatewayMetaState.class).getMetaData();
} else {
onDiskMetadata = MetaData.EMPTY_META_DATA;
}
final MetaData onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metaData();
assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
validateNodeBeforeAcceptingRequests(new BootstrapContext(environment, onDiskMetadata), transportService.boundAddress(),
pluginsService.filterPlugins(Plugin.class).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.CorruptStateException;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;
Expand All @@ -41,7 +43,7 @@ public class RecoveryWithUnsupportedIndicesIT extends ESIntegTestCase {
/**
* Return settings that could be used to start a node that has the given zipped home directory.
*/
protected Settings prepareBackwardsDataDir(Path backwardsIndex) throws IOException {
private Settings prepareBackwardsDataDir(Path backwardsIndex) throws IOException {
Path indexDir = createTempDir();
Path dataDir = indexDir.resolve("data");
try (InputStream stream = Files.newInputStream(backwardsIndex)) {
Expand Down Expand Up @@ -85,7 +87,8 @@ public void testUpgradeStartClusterOn_2_4_5() throws Exception {

logger.info("Checking static index {}", indexName);
Settings nodeSettings = prepareBackwardsDataDir(getDataPath("/indices/bwc").resolve(indexName + ".zip"));
assertThat(expectThrows(Exception.class, () -> internalCluster().startNode(nodeSettings))
.getCause().getCause().getMessage(), containsString("Format version is not supported"));
assertThat(ExceptionsHelper.unwrap(
expectThrows(Exception.class, () -> internalCluster().startNode(nodeSettings)), CorruptStateException.class).getMessage(),
containsString("Format version is not supported"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ public void tearDown() throws Exception {
super.tearDown();
}

private MockGatewayMetaState newGateway() throws IOException {
MockGatewayMetaState gateway = new MockGatewayMetaState(settings, nodeEnvironment, xContentRegistry(), localNode);
gateway.applyClusterStateUpdaters();
private MockGatewayMetaState newGateway() {
final MockGatewayMetaState gateway = new MockGatewayMetaState(settings, nodeEnvironment, xContentRegistry(), localNode);
gateway.start();
return gateway;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,10 @@ class MockPersistedState implements CoordinationState.PersistedState {
if (rarely()) {
nodeEnvironment = newNodeEnvironment();
nodeEnvironments.add(nodeEnvironment);
delegate = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), localNode)
.getPersistedState(Settings.EMPTY, null);
final MockGatewayMetaState gatewayMetaState
= new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), localNode);
gatewayMetaState.start();
delegate = gatewayMetaState.getPersistedState();
} else {
nodeEnvironment = null;
delegate = new InMemoryPersistedState(0L,
Expand Down Expand Up @@ -734,8 +736,10 @@ class MockPersistedState implements CoordinationState.PersistedState {
new Manifest(updatedTerm, manifest.getClusterStateVersion(), manifest.getGlobalGeneration(),
manifest.getIndexGenerations()));
}
delegate = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), newLocalNode)
.getPersistedState(Settings.EMPTY, null);
final MockGatewayMetaState gatewayMetaState
= new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), newLocalNode);
gatewayMetaState.start();
delegate = gatewayMetaState.getPersistedState();
} else {
nodeEnvironment = null;
BytesStreamOutput outStream = new BytesStreamOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

import static org.mockito.Mockito.mock;

/**
* {@link GatewayMetaState} constructor accepts a lot of arguments.
* It's not always easy / convenient to construct these dependencies.
Expand All @@ -42,10 +38,8 @@ public class MockGatewayMetaState extends GatewayMetaState {
private final DiscoveryNode localNode;

public MockGatewayMetaState(Settings settings, NodeEnvironment nodeEnvironment,
NamedXContentRegistry xContentRegistry, DiscoveryNode localNode) throws IOException {
super(settings, new MetaStateService(nodeEnvironment, xContentRegistry),
mock(MetaDataIndexUpgradeService.class), mock(MetaDataUpgrader.class),
mock(TransportService.class), mock(ClusterService.class));
NamedXContentRegistry xContentRegistry, DiscoveryNode localNode) {
super(settings, new MetaStateService(nodeEnvironment, xContentRegistry));
this.localNode = localNode;
}

Expand All @@ -55,8 +49,12 @@ protected void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeS
}

@Override
public void applyClusterStateUpdaters() {
public void applyClusterStateUpdaters(TransportService transportService, ClusterService clusterService) {
// Just set localNode here, not to mess with ClusterService and IndicesService mocking
previousClusterState = ClusterStateUpdaters.setLocalNode(previousClusterState, localNode);
}

/**
 * Test convenience: invokes the four-argument {@code start} with {@code null} for every argument.
 * NOTE(review): presumably safe because this mock's overrides do not use those arguments -- confirm.
 */
public void start() {
start(null, null, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -875,10 +875,16 @@ void resetClient() {
}

/**
 * Starts the wrapped node. A {@link NodeValidationException} from {@code node.start()} is
 * rethrown wrapped in a {@link RuntimeException}. Whenever {@code node.start()} does not
 * complete normally, the node is closed via {@code IOUtils.closeWhileHandlingException}
 * before the failure propagates to the caller.
 */
void startNode() {
    boolean started = false;
    try {
        node.start();
        started = true;
    } catch (NodeValidationException validationFailure) {
        throw new RuntimeException(validationFailure);
    } finally {
        if (started == false) {
            // start() did not finish: close the node rather than leave it half-started
            IOUtils.closeWhileHandlingException(node);
        }
    }
}

Expand Down

0 comments on commit 9e3822a

Please sign in to comment.