Skip to content

Commit

Permalink
Run CheckIndex on metadata index before loading (elastic#73239)
Browse files Browse the repository at this point in the history
The metadata index is small and important and only read at startup.
Today we rely on Lucene to spot if any of its components is corrupt, but
Lucene does not necessarily verify all checksums in order to catch a
corruption. With this commit we run `CheckIndex` on the metadata index
first, and fail on startup if a corruption is detected.

Closes elastic#29358
  • Loading branch information
DaveCTurner committed Jun 16, 2021
1 parent 84956bf commit 191c42a
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.IndexWriter;
Expand Down Expand Up @@ -46,6 +47,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.common.logging.Loggers;
Expand All @@ -70,6 +72,8 @@
import java.io.Closeable;
import java.io.IOError;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -306,6 +310,14 @@ public static void overrideVersion(Version newVersion, Path... dataPaths) throws
* Loads the best available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
*/
public OnDiskState loadBestOnDiskState() throws IOException {
return loadBestOnDiskState(true);
}

/**
* Loads the best available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
* @param checkClean whether to check the index for corruption before loading, only for tests
*/
OnDiskState loadBestOnDiskState(boolean checkClean) throws IOException {
String committedClusterUuid = null;
Path committedClusterUuidPath = null;
OnDiskState bestOnDiskState = OnDiskState.NO_ON_DISK_STATE;
Expand All @@ -317,39 +329,63 @@ public OnDiskState loadBestOnDiskState() throws IOException {
for (final Path dataPath : dataPaths) {
final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME);
if (Files.exists(indexPath)) {
try (Directory directory = createDirectory(indexPath);
DirectoryReader directoryReader = DirectoryReader.open(directory)) {
final OnDiskState onDiskState = loadOnDiskState(dataPath, directoryReader);
try (Directory directory = createDirectory(indexPath)) {
if (checkClean) {
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
final boolean isClean;
try (PrintStream printStream = new PrintStream(outputStream, true, StandardCharsets.UTF_8.name());
CheckIndex checkIndex = new CheckIndex(directory)) {
checkIndex.setInfoStream(printStream);
checkIndex.setChecksumsOnly(true);
isClean = checkIndex.checkIndex().clean;
}

if (nodeId.equals(onDiskState.nodeId) == false) {
throw new IllegalStateException("unexpected node ID in metadata, found [" + onDiskState.nodeId +
"] in [" + dataPath + "] but expected [" + nodeId + "]");
if (isClean == false) {
if (logger.isErrorEnabled()) {
for (final String line : outputStream.bytes().utf8ToString().split("\\r?\\n")) {
logger.error("checkIndex: {}", line);
}
}
throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath +
"] has been changed by an external force after it was last written by Elasticsearch and is now unreadable");
}
}
}

if (onDiskState.metadata.clusterUUIDCommitted()) {
if (committedClusterUuid == null) {
committedClusterUuid = onDiskState.metadata.clusterUUID();
committedClusterUuidPath = dataPath;
} else if (committedClusterUuid.equals(onDiskState.metadata.clusterUUID()) == false) {
throw new IllegalStateException("mismatched cluster UUIDs in metadata, found [" + committedClusterUuid +
"] in [" + committedClusterUuidPath + "] and [" + onDiskState.metadata.clusterUUID() + "] in ["
+ dataPath + "]");

try (DirectoryReader directoryReader = DirectoryReader.open(directory)) {
final OnDiskState onDiskState = loadOnDiskState(dataPath, directoryReader);

if (nodeId.equals(onDiskState.nodeId) == false) {
throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath +
"] belongs to a node with ID [" + onDiskState.nodeId + "] but this node's ID is [" + nodeId + "]");
}
}

if (maxCurrentTermOnDiskState.empty() || maxCurrentTermOnDiskState.currentTerm < onDiskState.currentTerm) {
maxCurrentTermOnDiskState = onDiskState;
}
if (onDiskState.metadata.clusterUUIDCommitted()) {
if (committedClusterUuid == null) {
committedClusterUuid = onDiskState.metadata.clusterUUID();
committedClusterUuidPath = dataPath;
} else if (committedClusterUuid.equals(onDiskState.metadata.clusterUUID()) == false) {
throw new IllegalStateException("mismatched cluster UUIDs in metadata, found [" + committedClusterUuid +
"] in [" + committedClusterUuidPath + "] and [" + onDiskState.metadata.clusterUUID() + "] in ["
+ dataPath + "]");
}
}

if (maxCurrentTermOnDiskState.empty() || maxCurrentTermOnDiskState.currentTerm < onDiskState.currentTerm) {
maxCurrentTermOnDiskState = onDiskState;
}

long acceptedTerm = onDiskState.metadata.coordinationMetadata().term();
long maxAcceptedTerm = bestOnDiskState.metadata.coordinationMetadata().term();
if (bestOnDiskState.empty()
|| acceptedTerm > maxAcceptedTerm
|| (acceptedTerm == maxAcceptedTerm
long acceptedTerm = onDiskState.metadata.coordinationMetadata().term();
long maxAcceptedTerm = bestOnDiskState.metadata.coordinationMetadata().term();
if (bestOnDiskState.empty()
|| acceptedTerm > maxAcceptedTerm
|| (acceptedTerm == maxAcceptedTerm
&& (onDiskState.lastAcceptedVersion > bestOnDiskState.lastAcceptedVersion
|| (onDiskState.lastAcceptedVersion == bestOnDiskState.lastAcceptedVersion)
&& onDiskState.currentTerm > bestOnDiskState.currentTerm))) {
bestOnDiskState = onDiskState;
|| (onDiskState.lastAcceptedVersion == bestOnDiskState.lastAcceptedVersion)
&& onDiskState.currentTerm > bestOnDiskState.currentTerm))) {
bestOnDiskState = onDiskState;
}
}
} catch (IndexNotFoundException e) {
logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public void testDataOnlyNodePersistence() throws Exception {
assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(),
not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration())));
CoordinationMetadata persistedCoordinationMetadata =
persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
persistedClusterStateService.loadBestOnDiskState(false).metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
Expand All @@ -387,12 +387,12 @@ public void testDataOnlyNodePersistence() throws Exception {
.clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build();

assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState());
persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState(false).metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
assertTrue(persistedClusterStateService.loadBestOnDiskState().metadata.clusterUUIDCommitted());
assertTrue(persistedClusterStateService.loadBestOnDiskState(false).metadata.clusterUUIDCommitted());

// generate a series of updates and check if batching works
final String indexName = randomAlphaOfLength(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.mockfile.ExtrasFS;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
Expand Down Expand Up @@ -40,12 +41,15 @@
import org.elasticsearch.gateway.PersistedClusterStateService.Writer;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;

import java.io.IOError;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -56,12 +60,16 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.apache.lucene.index.IndexWriter.WRITE_LOCK_NAME;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

public class PersistedClusterStateServiceTests extends ESTestCase {

Expand All @@ -79,7 +87,7 @@ public void testPersistsAndReloadsTerm() throws IOException {
assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(0L));
try (Writer writer = persistedClusterStateService.createWriter()) {
writer.writeFullStateAndCommit(newTerm, ClusterState.EMPTY_STATE);
assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(newTerm));
assertThat(persistedClusterStateService.loadBestOnDiskState(false).currentTerm, equalTo(newTerm));
}

assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(newTerm));
Expand Down Expand Up @@ -218,8 +226,12 @@ public void testFailsOnMismatchedNodeIds() throws IOException {
.toArray(Path[]::new), nodeIds[0], xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L
).loadBestOnDiskState()).getMessage();
assertThat(message,
allOf(containsString("unexpected node ID in metadata"), containsString(nodeIds[0]), containsString(nodeIds[1])));
assertThat(message, allOf(
containsString("the index containing the cluster metadata under the data path"),
containsString("belongs to a node with ID"),
containsString("but this node's ID is"),
containsString(nodeIds[0]),
containsString(nodeIds[1])));
assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths2),
Arrays.stream(dataPaths2).anyMatch(p -> message.contains(p.toString())));
}
Expand Down Expand Up @@ -315,7 +327,7 @@ public void testFailsIfFreshestStateIsInStaleTerm() throws IOException {
try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths2)) {
try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) {
final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService(nodeEnvironment)
.loadBestOnDiskState();
.loadBestOnDiskState(false);
final ClusterState clusterState = clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata);
writeState(writer, onDiskState.currentTerm, ClusterState.builder(clusterState)
.metadata(Metadata.builder(clusterState.metadata()).version(2)
Expand Down Expand Up @@ -851,6 +863,30 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
}
}

public void testFailsIfCorrupt() throws IOException {
try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);

try (Writer writer = persistedClusterStateService.createWriter()) {
writer.writeFullStateAndCommit(1, ClusterState.EMPTY_STATE);
}

try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(nodeEnvironment.nodeDataPaths()[0].resolve("_state"))) {
CorruptionUtils.corruptFile(random(), randomFrom(StreamSupport
.stream(directoryStream.spliterator(), false)
.filter(p -> {
final String filename = p.getFileName().toString();
return ExtrasFS.isExtra(filename) == false && filename.equals(WRITE_LOCK_NAME) == false;
})
.collect(Collectors.toList())));
}

assertThat(expectThrows(IllegalStateException.class, persistedClusterStateService::loadBestOnDiskState).getMessage(), allOf(
startsWith("the index containing the cluster metadata under the data path ["),
endsWith("] has been changed by an external force after it was last written by Elasticsearch and is now unreadable")));
}
}

private void assertExpectedLogs(long currentTerm, ClusterState previousState, ClusterState clusterState,
PersistedClusterStateService.Writer writer, MockLogAppender.LoggingExpectation expectation)
throws IllegalAccessException, IOException {
Expand Down Expand Up @@ -896,7 +932,7 @@ private NodeEnvironment newNodeEnvironment(Path[] dataPaths) throws IOException
}

private static ClusterState loadPersistedClusterState(PersistedClusterStateService persistedClusterStateService) throws IOException {
final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState();
final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState(false);
return clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata);
}

Expand Down

0 comments on commit 191c42a

Please sign in to comment.