From 9e3822a8622cfb50494b92a5e5674624da412211 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 11 Sep 2019 12:15:56 +0100 Subject: [PATCH] Load metadata at start time not construction time (#46532) Today we load the metadata from disk while constructing the node. However there is no real need to do so, and this commit moves that code to run later while the node is starting instead. Forward-port of #46326 --- .../discovery/DiscoveryModule.java | 7 +- .../gateway/GatewayMetaState.java | 80 ++++++++++++------- .../java/org/elasticsearch/node/Node.java | 15 ++-- .../RecoveryWithUnsupportedIndicesIT.java | 9 ++- .../GatewayMetaStatePersistedStateTests.java | 6 +- .../AbstractCoordinatorTestCase.java | 12 ++- .../gateway/MockGatewayMetaState.java | 16 ++-- .../test/InternalTestCluster.java | 6 ++ 8 files changed, 92 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 892dfe9eaa16e..d1b008ec76eb7 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; -import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -147,9 +146,9 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) { discovery = new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings, - transportService, namedWriteableRegistry, allocationService, masterService, - () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider, - clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService, electionStrategy); + transportService, namedWriteableRegistry, allocationService, masterService, gatewayMetaState::getPersistedState, + seedHostsProvider, clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService, + electionStrategy); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); } diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index 91bcb68370ea1..9abba46ad17e4 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -23,12 +23,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.coordination.CoordinationState; import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; import org.elasticsearch.cluster.coordination.InMemoryPersistedState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -38,7 +38,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.Tuple; @@ -71,41 +70,63 @@ * elected as master, it requests metaData from other master eligible nodes. After that, master node performs re-conciliation on the * gathered results, re-creates {@link ClusterState} and broadcasts this state to other nodes in the cluster. */ -public class GatewayMetaState implements ClusterStateApplier, CoordinationState.PersistedState { +public class GatewayMetaState implements PersistedState { protected static final Logger logger = LogManager.getLogger(GatewayMetaState.class); private final MetaStateService metaStateService; private final Settings settings; - private final ClusterService clusterService; - private final TransportService transportService; - //there is a single thread executing updateClusterState calls, hence no volatile modifier + // On master-eligible Zen2 nodes, we use this very object for the PersistedState (so that the state is actually persisted); on other + // nodes we use an InMemoryPersistedState instead and persist using a cluster applier if needed. In all cases it's an error to try and + // use this object as a PersistedState before calling start(). TODO stop implementing PersistedState at the top level. + private final SetOnce persistedState = new SetOnce<>(); + + // on master-eligible nodes we call updateClusterState under the Coordinator's mutex; on master-ineligible data nodes we call + // updateClusterState on the (unique) cluster applier thread; on other nodes we never call updateClusterState. In all cases there's no + // need to synchronize access to these variables. protected Manifest previousManifest; protected ClusterState previousClusterState; protected boolean incrementalWrite; - public GatewayMetaState(Settings settings, MetaStateService metaStateService, - MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader, - TransportService transportService, ClusterService clusterService) throws IOException { + public GatewayMetaState(Settings settings, MetaStateService metaStateService) { this.settings = settings; this.metaStateService = metaStateService; - this.transportService = transportService; - this.clusterService = clusterService; + } - upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader); - initializeClusterState(ClusterName.CLUSTER_NAME_SETTING.get(settings)); + public void start(TransportService transportService, ClusterService clusterService, + MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) { + assert previousClusterState == null : "should only start once, but already have " + previousClusterState; + try { + upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader); + initializeClusterState(ClusterName.CLUSTER_NAME_SETTING.get(settings)); + } catch (IOException e) { + throw new ElasticsearchException("failed to load metadata", e); + } incrementalWrite = false; - } - public PersistedState getPersistedState(Settings settings, ClusterApplierService clusterApplierService) { - applyClusterStateUpdaters(); + applyClusterStateUpdaters(transportService, clusterService); if (DiscoveryNode.isMasterNode(settings) == false) { - // use Zen1 way of writing cluster state for non-master-eligible nodes - // this avoids concurrent manipulating of IndexMetadata with IndicesStore - clusterApplierService.addLowPriorityApplier(this); - return new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState()); + if (DiscoveryNode.isDataNode(settings)) { + // Master-eligible nodes persist index metadata for all indices regardless of whether they hold any shards or not. It's + // vitally important to the safety of the cluster coordination system that master-eligible nodes persist this metadata when + // _accepting_ the cluster state (i.e. before it is committed). This persistence happens on the generic threadpool. + // + // In contrast, master-ineligible data nodes only persist the index metadata for shards that they hold. When all shards of + // an index are moved off such a node the IndicesStore is responsible for removing the corresponding index directory, + // including the metadata, and does so on the cluster applier thread. + // + // This presents a problem: if a shard is unassigned from a node and then reassigned back to it again then there is a race + // between the IndicesStore deleting the index folder and the CoordinationState concurrently trying to write the updated + // metadata into it. We could probably solve this with careful synchronization, but in fact there is no need. The persisted + // state on master-ineligible data nodes is mostly ignored - it's only there to support dangling index imports, which is + // inherently unsafe anyway. Thus we can safely delay metadata writes on master-ineligible data nodes until applying the + // cluster state, which is what this does: + clusterService.addLowPriorityApplier(this::applyClusterState); + } + persistedState.set(new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState())); + } else { + persistedState.set(this); } - return this; } private void initializeClusterState(ClusterName clusterName) throws IOException { @@ -122,7 +143,7 @@ private void initializeClusterState(ClusterName clusterName) throws IOException logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS))); } - public void applyClusterStateUpdaters() { + protected void applyClusterStateUpdaters(TransportService transportService, ClusterService clusterService) { assert previousClusterState.nodes().getLocalNode() == null : "applyClusterStateUpdaters must only be called once"; assert transportService.getLocalNode() != null : "transport service is not yet started"; @@ -181,15 +202,18 @@ private boolean isMasterOrDataNode() { return DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings); } + public PersistedState getPersistedState() { + final PersistedState persistedState = this.persistedState.get(); + assert persistedState != null : "not started"; + return persistedState; + } + public MetaData getMetaData() { return previousClusterState.metaData(); } - @Override - public void applyClusterState(ClusterChangedEvent event) { - if (isMasterOrDataNode() == false) { - return; - } + private void applyClusterState(ClusterChangedEvent event) { + assert isMasterOrDataNode(); if (event.state().blocks().disableStatePersistence()) { incrementalWrite = false; diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 4f892948066db..50cb468b48744 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -479,8 +479,7 @@ protected Node( ).collect(Collectors.toSet()); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders); - final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, metaStateService, - metaDataIndexUpgradeService, metaDataUpgrader, transportService, clusterService); + final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, metaStateService); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService); final SearchTransportService searchTransportService = new SearchTransportService(transportService, SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); @@ -691,14 +690,14 @@ public Node start() throws NodeValidationException { assert transportService.getLocalNode().equals(localNodeFactory.getNode()) : "transportService has a different local node than the factory provided"; injector.getInstance(PeerRecoverySourceService.class).start(); - final MetaData onDiskMetadata; + + // Load (and maybe upgrade) the metadata stored on disk + final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class); + gatewayMetaState.start(transportService, clusterService, + injector.getInstance(MetaDataIndexUpgradeService.class), injector.getInstance(MetaDataUpgrader.class)); // we load the global state here (the persistent part of the cluster state stored on disk) to // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state. - if (DiscoveryNode.isMasterNode(settings()) || DiscoveryNode.isDataNode(settings())) { - onDiskMetadata = injector.getInstance(GatewayMetaState.class).getMetaData(); - } else { - onDiskMetadata = MetaData.EMPTY_META_DATA; - } + final MetaData onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metaData(); assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null validateNodeBeforeAcceptingRequests(new BootstrapContext(environment, onDiskMetadata), transportService.boundAddress(), pluginsService.filterPlugins(Plugin.class).stream() diff --git a/server/src/test/java/org/elasticsearch/bwcompat/RecoveryWithUnsupportedIndicesIT.java b/server/src/test/java/org/elasticsearch/bwcompat/RecoveryWithUnsupportedIndicesIT.java index 27cc04449217b..b4574dd9389f2 100644 --- a/server/src/test/java/org/elasticsearch/bwcompat/RecoveryWithUnsupportedIndicesIT.java +++ b/server/src/test/java/org/elasticsearch/bwcompat/RecoveryWithUnsupportedIndicesIT.java @@ -20,8 +20,10 @@ import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.TestUtil; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; +import org.elasticsearch.gateway.CorruptStateException; import org.elasticsearch.test.ESIntegTestCase; import java.io.IOException; @@ -41,7 +43,7 @@ public class RecoveryWithUnsupportedIndicesIT extends ESIntegTestCase { /** * Return settings that could be used to start a node that has the given zipped home directory. */ - protected Settings prepareBackwardsDataDir(Path backwardsIndex) throws IOException { + private Settings prepareBackwardsDataDir(Path backwardsIndex) throws IOException { Path indexDir = createTempDir(); Path dataDir = indexDir.resolve("data"); try (InputStream stream = Files.newInputStream(backwardsIndex)) { @@ -85,7 +87,8 @@ public void testUpgradeStartClusterOn_2_4_5() throws Exception { logger.info("Checking static index {}", indexName); Settings nodeSettings = prepareBackwardsDataDir(getDataPath("/indices/bwc").resolve(indexName + ".zip")); - assertThat(expectThrows(Exception.class, () -> internalCluster().startNode(nodeSettings)) - .getCause().getCause().getMessage(), containsString("Format version is not supported")); + assertThat(ExceptionsHelper.unwrap( + expectThrows(Exception.class, () -> internalCluster().startNode(nodeSettings)), CorruptStateException.class).getMessage(), + containsString("Format version is not supported")); } } diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index 13348cef75fd4..107cc7541fe08 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -63,9 +63,9 @@ public void tearDown() throws Exception { super.tearDown(); } - private MockGatewayMetaState newGateway() throws IOException { - MockGatewayMetaState gateway = new MockGatewayMetaState(settings, nodeEnvironment, xContentRegistry(), localNode); - gateway.applyClusterStateUpdaters(); + private MockGatewayMetaState newGateway() { + final MockGatewayMetaState gateway = new MockGatewayMetaState(settings, nodeEnvironment, xContentRegistry(), localNode); + gateway.start(); return gateway; } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index c9a1729ad116e..595328b309c03 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -703,8 +703,10 @@ class MockPersistedState implements CoordinationState.PersistedState { if (rarely()) { nodeEnvironment = newNodeEnvironment(); nodeEnvironments.add(nodeEnvironment); - delegate = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), localNode) - .getPersistedState(Settings.EMPTY, null); + final MockGatewayMetaState gatewayMetaState + = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), localNode); + gatewayMetaState.start(); + delegate = gatewayMetaState.getPersistedState(); } else { nodeEnvironment = null; delegate = new InMemoryPersistedState(0L, @@ -734,8 +736,10 @@ class MockPersistedState implements CoordinationState.PersistedState { new Manifest(updatedTerm, manifest.getClusterStateVersion(), manifest.getGlobalGeneration(), manifest.getIndexGenerations())); } - delegate = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), newLocalNode) - .getPersistedState(Settings.EMPTY, null); + final MockGatewayMetaState gatewayMetaState + = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), newLocalNode); + gatewayMetaState.start(); + delegate = gatewayMetaState.getPersistedState(); } else { nodeEnvironment = null; BytesStreamOutput outStream = new BytesStreamOutput(); diff --git a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java index 317a9d1b7ba0f..006f294883128 100644 --- a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java @@ -28,10 +28,6 @@ import org.elasticsearch.plugins.MetaDataUpgrader; import org.elasticsearch.transport.TransportService; -import java.io.IOException; - -import static org.mockito.Mockito.mock; - /** * {@link GatewayMetaState} constructor accepts a lot of arguments. * It's not always easy / convenient to construct these dependencies. @@ -42,10 +38,8 @@ public class MockGatewayMetaState extends GatewayMetaState { private final DiscoveryNode localNode; public MockGatewayMetaState(Settings settings, NodeEnvironment nodeEnvironment, - NamedXContentRegistry xContentRegistry, DiscoveryNode localNode) throws IOException { - super(settings, new MetaStateService(nodeEnvironment, xContentRegistry), - mock(MetaDataIndexUpgradeService.class), mock(MetaDataUpgrader.class), - mock(TransportService.class), mock(ClusterService.class)); + NamedXContentRegistry xContentRegistry, DiscoveryNode localNode) { + super(settings, new MetaStateService(nodeEnvironment, xContentRegistry)); this.localNode = localNode; } @@ -55,8 +49,12 @@ protected void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeS } @Override - public void applyClusterStateUpdaters() { + public void applyClusterStateUpdaters(TransportService transportService, ClusterService clusterService) { // Just set localNode here, not to mess with ClusterService and IndicesService mocking previousClusterState = ClusterStateUpdaters.setLocalNode(previousClusterState, localNode); } + + public void start() { + start(null, null, null, null); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index d07e129c7bdc6..47d6f83fc87e1 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -875,10 +875,16 @@ void resetClient() { } void startNode() { + boolean success = false; try { node.start(); + success = true; } catch (NodeValidationException e) { throw new RuntimeException(e); + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(node); + } } }