From 8e3399d4804fc53b80e95e3ffb5714512901e771 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 16 Jun 2021 11:42:29 +0100 Subject: [PATCH] Run CheckIndex on metadata index before loading (#73239) (#74173) The metadata index is small and important and only read at startup. Today we rely on Lucene to spot if any of its components is corrupt, but Lucene does not necesssarily verify all checksums in order to catch a corruption. With this commit we run `CheckIndex` on the metadata index first, and fail on startup if a corruption is detected. Closes #29358 --- .../gateway/PersistedClusterStateService.java | 88 +++++++++++++------ .../GatewayMetaStatePersistedStateTests.java | 6 +- .../PersistedClusterStateServiceTests.java | 46 ++++++++-- 3 files changed, 106 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index 23d09f3af54a8..69468ebb7e11c 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -16,6 +16,7 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.StoredField; import org.apache.lucene.document.StringField; +import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexNotFoundException; import org.apache.lucene.index.IndexWriter; @@ -46,6 +47,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.common.logging.Loggers; @@ -70,6 +72,8 @@ import java.io.Closeable; import java.io.IOError; import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -306,6 +310,14 @@ public static void overrideVersion(Version newVersion, Path... dataPaths) throws * Loads the best available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found. */ public OnDiskState loadBestOnDiskState() throws IOException { + return loadBestOnDiskState(true); + } + + /** + * Loads the best available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found. + * @param checkClean whether to check the index for corruption before loading, only for tests + */ + OnDiskState loadBestOnDiskState(boolean checkClean) throws IOException { String committedClusterUuid = null; Path committedClusterUuidPath = null; OnDiskState bestOnDiskState = OnDiskState.NO_ON_DISK_STATE; @@ -317,39 +329,63 @@ public OnDiskState loadBestOnDiskState() throws IOException { for (final Path dataPath : dataPaths) { final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME); if (Files.exists(indexPath)) { - try (Directory directory = createDirectory(indexPath); - DirectoryReader directoryReader = DirectoryReader.open(directory)) { - final OnDiskState onDiskState = loadOnDiskState(dataPath, directoryReader); + try (Directory directory = createDirectory(indexPath)) { + if (checkClean) { + try (BytesStreamOutput outputStream = new BytesStreamOutput()) { + final boolean isClean; + try (PrintStream printStream = new PrintStream(outputStream, true, StandardCharsets.UTF_8.name()); + CheckIndex checkIndex = new CheckIndex(directory)) { + checkIndex.setInfoStream(printStream); + checkIndex.setChecksumsOnly(true); + isClean = checkIndex.checkIndex().clean; + } - if (nodeId.equals(onDiskState.nodeId) == false) { - throw new IllegalStateException("unexpected node ID in metadata, found [" + onDiskState.nodeId + - "] in [" + dataPath + "] but expected [" + nodeId + "]"); + if (isClean == false) { + if (logger.isErrorEnabled()) { + for (final String line : outputStream.bytes().utf8ToString().split("\\r?\\n")) { + logger.error("checkIndex: {}", line); + } + } + throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath + + "] has been changed by an external force after it was last written by Elasticsearch and is now unreadable"); + } } + } - if (onDiskState.metadata.clusterUUIDCommitted()) { - if (committedClusterUuid == null) { - committedClusterUuid = onDiskState.metadata.clusterUUID(); - committedClusterUuidPath = dataPath; - } else if (committedClusterUuid.equals(onDiskState.metadata.clusterUUID()) == false) { - throw new IllegalStateException("mismatched cluster UUIDs in metadata, found [" + committedClusterUuid + - "] in [" + committedClusterUuidPath + "] and [" + onDiskState.metadata.clusterUUID() + "] in [" - + dataPath + "]"); + + try (DirectoryReader directoryReader = DirectoryReader.open(directory)) { + final OnDiskState onDiskState = loadOnDiskState(dataPath, directoryReader); + + if (nodeId.equals(onDiskState.nodeId) == false) { + throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath + + "] belongs to a node with ID [" + onDiskState.nodeId + "] but this node's ID is [" + nodeId + "]"); } - } - if (maxCurrentTermOnDiskState.empty() || maxCurrentTermOnDiskState.currentTerm < onDiskState.currentTerm) { - maxCurrentTermOnDiskState = onDiskState; - } + if (onDiskState.metadata.clusterUUIDCommitted()) { + if (committedClusterUuid == null) { + committedClusterUuid = onDiskState.metadata.clusterUUID(); + committedClusterUuidPath = dataPath; + } else if (committedClusterUuid.equals(onDiskState.metadata.clusterUUID()) == false) { + throw new IllegalStateException("mismatched cluster UUIDs in metadata, found [" + committedClusterUuid + + "] in [" + committedClusterUuidPath + "] and [" + onDiskState.metadata.clusterUUID() + "] in [" + + dataPath + "]"); + } + } + + if (maxCurrentTermOnDiskState.empty() || maxCurrentTermOnDiskState.currentTerm < onDiskState.currentTerm) { + maxCurrentTermOnDiskState = onDiskState; + } - long acceptedTerm = onDiskState.metadata.coordinationMetadata().term(); - long maxAcceptedTerm = bestOnDiskState.metadata.coordinationMetadata().term(); - if (bestOnDiskState.empty() - || acceptedTerm > maxAcceptedTerm - || (acceptedTerm == maxAcceptedTerm + long acceptedTerm = onDiskState.metadata.coordinationMetadata().term(); + long maxAcceptedTerm = bestOnDiskState.metadata.coordinationMetadata().term(); + if (bestOnDiskState.empty() + || acceptedTerm > maxAcceptedTerm + || (acceptedTerm == maxAcceptedTerm && (onDiskState.lastAcceptedVersion > bestOnDiskState.lastAcceptedVersion - || (onDiskState.lastAcceptedVersion == bestOnDiskState.lastAcceptedVersion) - && onDiskState.currentTerm > bestOnDiskState.currentTerm))) { - bestOnDiskState = onDiskState; + || (onDiskState.lastAcceptedVersion == bestOnDiskState.lastAcceptedVersion) + && onDiskState.currentTerm > bestOnDiskState.currentTerm))) { + bestOnDiskState = onDiskState; + } } } catch (IndexNotFoundException e) { logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index 50c1bd89c2e79..e75a67c0ee278 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -371,7 +371,7 @@ public void testDataOnlyNodePersistence() throws Exception { assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(), not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration()))); CoordinationMetadata persistedCoordinationMetadata = - persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata(); + persistedClusterStateService.loadBestOnDiskState(false).metadata.coordinationMetadata(); assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(), equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration)); assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(), @@ -387,12 +387,12 @@ public void testDataOnlyNodePersistence() throws Exception { .clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build(); assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState()); - persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata(); + persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState(false).metadata.coordinationMetadata(); assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(), equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration)); assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(), equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration)); - assertTrue(persistedClusterStateService.loadBestOnDiskState().metadata.clusterUUIDCommitted()); + assertTrue(persistedClusterStateService.loadBestOnDiskState(false).metadata.clusterUUIDCommitted()); // generate a series of updates and check if batching works final String indexName = randomAlphaOfLength(10); diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index b37f606b41fbf..553d88b5f99a6 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; +import org.apache.lucene.mockfile.ExtrasFS; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; @@ -40,12 +41,15 @@ import org.elasticsearch.gateway.PersistedClusterStateService.Writer; import org.elasticsearch.index.Index; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOError; import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -56,12 +60,16 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import static org.apache.lucene.index.IndexWriter.WRITE_LOCK_NAME; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; public class PersistedClusterStateServiceTests extends ESTestCase { @@ -79,7 +87,7 @@ public void testPersistsAndReloadsTerm() throws IOException { assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(0L)); try (Writer writer = persistedClusterStateService.createWriter()) { writer.writeFullStateAndCommit(newTerm, ClusterState.EMPTY_STATE); - assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(newTerm)); + assertThat(persistedClusterStateService.loadBestOnDiskState(false).currentTerm, equalTo(newTerm)); } assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(newTerm)); @@ -218,8 +226,12 @@ public void testFailsOnMismatchedNodeIds() throws IOException { .toArray(Path[]::new), nodeIds[0], xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L ).loadBestOnDiskState()).getMessage(); - assertThat(message, - allOf(containsString("unexpected node ID in metadata"), containsString(nodeIds[0]), containsString(nodeIds[1]))); + assertThat(message, allOf( + containsString("the index containing the cluster metadata under the data path"), + containsString("belongs to a node with ID"), + containsString("but this node's ID is"), + containsString(nodeIds[0]), + containsString(nodeIds[1]))); assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths2), Arrays.stream(dataPaths2).anyMatch(p -> message.contains(p.toString()))); } @@ -315,7 +327,7 @@ public void testFailsIfFreshestStateIsInStaleTerm() throws IOException { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths2)) { try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) { final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService(nodeEnvironment) - .loadBestOnDiskState(); + .loadBestOnDiskState(false); final ClusterState clusterState = clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata); writeState(writer, onDiskState.currentTerm, ClusterState.builder(clusterState) .metadata(Metadata.builder(clusterState.metadata()).version(2) @@ -851,6 +863,30 @@ public void testSlowLogging() throws IOException, IllegalAccessException { } } + public void testFailsIfCorrupt() throws IOException { + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { + final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment); + + try (Writer writer = persistedClusterStateService.createWriter()) { + writer.writeFullStateAndCommit(1, ClusterState.EMPTY_STATE); + } + + try (DirectoryStream directoryStream = Files.newDirectoryStream(nodeEnvironment.nodeDataPaths()[0].resolve("_state"))) { + CorruptionUtils.corruptFile(random(), randomFrom(StreamSupport + .stream(directoryStream.spliterator(), false) + .filter(p -> { + final String filename = p.getFileName().toString(); + return ExtrasFS.isExtra(filename) == false && filename.equals(WRITE_LOCK_NAME) == false; + }) + .collect(Collectors.toList()))); + } + + assertThat(expectThrows(IllegalStateException.class, persistedClusterStateService::loadBestOnDiskState).getMessage(), allOf( + startsWith("the index containing the cluster metadata under the data path ["), + endsWith("] has been changed by an external force after it was last written by Elasticsearch and is now unreadable"))); + } + } + private void assertExpectedLogs(long currentTerm, ClusterState previousState, ClusterState clusterState, PersistedClusterStateService.Writer writer, MockLogAppender.LoggingExpectation expectation) throws IllegalAccessException, IOException { @@ -896,7 +932,7 @@ private NodeEnvironment newNodeEnvironment(Path[] dataPaths) throws IOException } private static ClusterState loadPersistedClusterState(PersistedClusterStateService persistedClusterStateService) throws IOException { - final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState(); + final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState(false); return clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata); }