Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run CheckIndex on metadata index before loading #73239

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.IndexWriter;
Expand Down Expand Up @@ -46,6 +47,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.Loggers;
Expand All @@ -70,6 +72,8 @@
import java.io.Closeable;
import java.io.IOError;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -290,17 +294,46 @@ public static void overrideVersion(Version newVersion, Path... dataPaths) throws
* Loads the available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
*/
public OnDiskState loadOnDiskState() throws IOException {
return loadOnDiskState(true);
}

/**
* Loads the available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
* @param checkClean whether to check the index for corruption before loading, only for tests
*/
OnDiskState loadOnDiskState(boolean checkClean) throws IOException {
OnDiskState onDiskState = OnDiskState.NO_ON_DISK_STATE;

final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME);
if (Files.exists(indexPath)) {
try (Directory directory = createDirectory(indexPath);
DirectoryReader directoryReader = DirectoryReader.open(directory)) {
onDiskState = loadOnDiskState(dataPath, directoryReader);
try (Directory directory = createDirectory(indexPath)) {
if (checkClean) {
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
final boolean isClean;
try (PrintStream printStream = new PrintStream(outputStream, true, StandardCharsets.UTF_8);
CheckIndex checkIndex = new CheckIndex(directory)) {
checkIndex.setInfoStream(printStream);
checkIndex.setChecksumsOnly(true);
isClean = checkIndex.checkIndex().clean;
}

if (nodeId.equals(onDiskState.nodeId) == false) {
throw new IllegalStateException("unexpected node ID in metadata, found [" + onDiskState.nodeId +
"] in [" + dataPath + "] but expected [" + nodeId + "]");
if (isClean == false) {
if (logger.isErrorEnabled()) {
outputStream.bytes().utf8ToString().lines().forEach(l -> logger.error("checkIndex: {}", l));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't great: we materialise all the bytes, then convert them to a string, and then split them into lines. A streaming implementation is definitely possible but doesn't seem worth the effort here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

}
throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath +
"] has been changed by an external force after it was last written by Elasticsearch and is now unreadable");
}
}
}

try (DirectoryReader directoryReader = DirectoryReader.open(directory)) {
onDiskState = loadOnDiskState(dataPath, directoryReader);

if (nodeId.equals(onDiskState.nodeId) == false) {
throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath +
"] belongs to a node with ID [" + onDiskState.nodeId + "] but this node's ID is [" + nodeId + "]");
}
}
} catch (IndexNotFoundException e) {
logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ public void testDataOnlyNodePersistence() throws Exception {
assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(),
not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration())));
CoordinationMetadata persistedCoordinationMetadata =
persistedClusterStateService.loadOnDiskState().metadata.coordinationMetadata();
persistedClusterStateService.loadOnDiskState(false).metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
Expand All @@ -386,12 +386,12 @@ public void testDataOnlyNodePersistence() throws Exception {
.clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build();

assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState());
persistedCoordinationMetadata = persistedClusterStateService.loadOnDiskState().metadata.coordinationMetadata();
persistedCoordinationMetadata = persistedClusterStateService.loadOnDiskState(false).metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
assertTrue(persistedClusterStateService.loadOnDiskState().metadata.clusterUUIDCommitted());
assertTrue(persistedClusterStateService.loadOnDiskState(false).metadata.clusterUUIDCommitted());

// generate a series of updates and check if batching works
final String indexName = randomAlphaOfLength(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.mockfile.ExtrasFS;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
Expand All @@ -38,24 +39,32 @@
import org.elasticsearch.gateway.PersistedClusterStateService.Writer;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;

import java.io.IOError;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.lucene.index.IndexWriter.WRITE_LOCK_NAME;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

public class PersistedClusterStateServiceTests extends ESTestCase {

Expand All @@ -73,7 +82,7 @@ public void testPersistsAndReloadsTerm() throws IOException {
assertThat(persistedClusterStateService.loadOnDiskState().currentTerm, equalTo(0L));
try (Writer writer = persistedClusterStateService.createWriter()) {
writer.writeFullStateAndCommit(newTerm, ClusterState.EMPTY_STATE);
assertThat(persistedClusterStateService.loadOnDiskState().currentTerm, equalTo(newTerm));
assertThat(persistedClusterStateService.loadOnDiskState(false).currentTerm, equalTo(newTerm));
}

assertThat(persistedClusterStateService.loadOnDiskState().currentTerm, equalTo(newTerm));
Expand Down Expand Up @@ -638,6 +647,30 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
}
}

public void testFailsIfCorrupt() throws IOException {
try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createTempDir())) {
final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);

try (Writer writer = persistedClusterStateService.createWriter()) {
writer.writeFullStateAndCommit(1, ClusterState.EMPTY_STATE);
}

try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(nodeEnvironment.nodeDataPath().resolve("_state"))) {
CorruptionUtils.corruptFile(random(), randomFrom(StreamSupport
.stream(directoryStream.spliterator(), false)
.filter(p -> {
final String filename = p.getFileName().toString();
return ExtrasFS.isExtra(filename) == false && filename.equals(WRITE_LOCK_NAME) == false;
})
.collect(Collectors.toList())));
}

assertThat(expectThrows(IllegalStateException.class, persistedClusterStateService::loadOnDiskState).getMessage(), allOf(
startsWith("the index containing the cluster metadata under the data path ["),
endsWith("] has been changed by an external force after it was last written by Elasticsearch and is now unreadable")));
}
}

private void assertExpectedLogs(long currentTerm, ClusterState previousState, ClusterState clusterState,
PersistedClusterStateService.Writer writer, MockLogAppender.LoggingExpectation expectation)
throws IllegalAccessException, IOException {
Expand Down Expand Up @@ -675,7 +708,7 @@ private NodeEnvironment newNodeEnvironment(Path dataPath) throws IOException {
}

private static ClusterState loadPersistedClusterState(PersistedClusterStateService persistedClusterStateService) throws IOException {
final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadOnDiskState();
final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadOnDiskState(false);
return clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata);
}

Expand Down