diff --git a/core/src/main/java/org/apache/lucene/store/SimpleReadOnlyFSDirectory.java b/core/src/main/java/org/apache/lucene/store/SimpleReadOnlyFSDirectory.java new file mode 100644 index 0000000000000..28d82024b93b3 --- /dev/null +++ b/core/src/main/java/org/apache/lucene/store/SimpleReadOnlyFSDirectory.java @@ -0,0 +1,217 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lucene.store; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Collection; + +/** + * Similar to {@link SimpleFSDirectory} but only supports reading files from a folder and does not create the directory if it does not + * exist. This is useful if we don't want to resurrect a folder that was just deleted before creating the {@link Directory}. + * + * Only supports {@link #openInput(String,IOContext)} and {@link #close()}; other overridden methods throw UnsupportedOperationException. 
+ */ +public class SimpleReadOnlyFSDirectory extends BaseDirectory { + + protected final Path directory; // The underlying filesystem directory, resolved with Path#toRealPath() + + public SimpleReadOnlyFSDirectory(Path path) throws IOException { + super(FSLockFactory.getDefault()); + directory = path.toRealPath(); + } + + @Override + public String[] listAll() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteFile(String name) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long fileLength(String name) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void sync(Collection names) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void renameFile(String source, String dest) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + ensureOpen(); + Path path = directory.resolve(name); + SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ); + return new SimpleFSIndexInput("SimpleFSIndexInput(path=\"" + path + "\")", channel, context); + } + + @Override + public void close() throws IOException { + isOpen = false; + } + + /** + * Copy of SimpleFSDirectory.SimpleFSIndexInput, which is a package-private class. + * + * Reads bytes with {@link SeekableByteChannel#read(ByteBuffer)} + */ + static final class SimpleFSIndexInput extends BufferedIndexInput { + /** + * The maximum number of bytes (16384) requested from the channel per read call. 
+ */ + private static final int CHUNK_SIZE = 16384; + + /** the channel we will read from */ + protected final SeekableByteChannel channel; + /** true if this instance is a clone or slice; such instances do not own the channel and never close it */ + boolean isClone = false; + /** start offset: non-zero in the slice case */ + protected final long off; + /** end offset (start+length) */ + protected final long end; + + private ByteBuffer byteBuf; // wraps the buffer for NIO + + public SimpleFSIndexInput(String resourceDesc, SeekableByteChannel channel, IOContext context) throws IOException { + super(resourceDesc, context); + this.channel = channel; + this.off = 0L; + this.end = channel.size(); + } + + public SimpleFSIndexInput(String resourceDesc, SeekableByteChannel channel, long off, long length, int bufferSize) { + super(resourceDesc, bufferSize); + this.channel = channel; + this.off = off; + this.end = off + length; + this.isClone = true; + } + + @Override + public void close() throws IOException { + if (!isClone) { + channel.close(); + } + } + + @Override + public SimpleFSIndexInput clone() { + SimpleFSIndexInput clone = (SimpleFSIndexInput)super.clone(); + clone.isClone = true; + return clone; + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + if (offset < 0 || length < 0 || offset + length > this.length()) { + throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset + ",length=" + + length + ",fileLength=" + this.length() + ": " + this); + } + return new SimpleFSIndexInput(getFullSliceDescription(sliceDescription), channel, off + offset, length, getBufferSize()); + } + + @Override + public final long length() { + return end - off; + } + + @Override + protected void newBuffer(byte[] newBuffer) { + super.newBuffer(newBuffer); + byteBuf = ByteBuffer.wrap(newBuffer); + } + + @Override + protected void readInternal(byte[] b, int offset, int len) throws IOException { + final ByteBuffer 
bb; + + // Determine the ByteBuffer we should use + if (b == buffer) { + // Use our own pre-wrapped byteBuf: + assert byteBuf != null; + bb = byteBuf; + byteBuf.clear().position(offset); + } else { + bb = ByteBuffer.wrap(b, offset, len); + } + + synchronized(channel) { + long pos = getFilePointer() + off; + + if (pos + len > end) { + throw new EOFException("read past EOF: " + this); + } + + try { + channel.position(pos); + + int readLength = len; + while (readLength > 0) { + final int toRead = Math.min(CHUNK_SIZE, readLength); + bb.limit(bb.position() + toRead); + assert bb.remaining() == toRead; + final int i = channel.read(bb); + if (i < 0) { // be defensive here, even though we checked beforehand, something could have changed + throw new EOFException("read past EOF: " + this + " off: " + offset + " len: " + len + " pos: " + pos + + " chunkLen: " + toRead + " end: " + end); + } + assert i > 0 : "SeekableByteChannel.read with non zero-length bb.remaining() must always read at least one byte " + + "(Channel is in blocking mode, see spec of ReadableByteChannel)"; + pos += i; + readLength -= i; + } + assert readLength == 0; + } catch (IOException ioe) { + throw new IOException(ioe.getMessage() + ": " + this, ioe); + } + } + } + + @Override + protected void seekInternal(long pos) throws IOException { + if (pos > length()) { + throw new EOFException("read past EOF: pos=" + pos + " vs length=" + length() + ": " + this); + } + } + } +} diff --git a/core/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/core/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index cc6a48b855b3b..4960268614c66 100644 --- a/core/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ b/core/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -26,7 +26,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.OutputStreamIndexOutput; -import 
org.apache.lucene.store.SimpleFSDirectory; +import org.apache.lucene.store.SimpleReadOnlyFSDirectory; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.bytes.BytesArray; @@ -180,7 +180,7 @@ protected XContentBuilder newXContentBuilder(XContentType type, OutputStream str * the state. */ public final T read(Path file) throws IOException { - try (Directory dir = newDirectory(file.getParent())) { + try (Directory dir = newReadOnlyDirectory(file.getParent())) { try (final IndexInput indexInput = dir.openInput(file.getFileName().toString(), IOContext.DEFAULT)) { // We checksum the entire file before we even go and parse it. If it's corrupted we barf right here. CodecUtil.checksumEntireFile(indexInput); @@ -206,8 +206,8 @@ public final T read(Path file) throws IOException { } } - protected Directory newDirectory(Path dir) throws IOException { - return new SimpleFSDirectory(dir); + protected Directory newReadOnlyDirectory(Path dir) throws IOException { + return new SimpleReadOnlyFSDirectory(dir); } private void cleanupOldFiles(final String prefix, final String currentStateFile, Path[] locations) throws IOException { @@ -287,8 +287,9 @@ public T loadLatestState(ESLogger logger, Path... dataLocations) throws IOExcep } } } - final List exceptions = new ArrayList<>(); - T state = null; + if (files.isEmpty()) { + return null; + } // NOTE: we might have multiple version of the latest state if there are multiple data dirs.. for this case // we iterate only over the ones with the max version. If we have at least one state file that uses the // new format (ie. legacy == false) then we know that the latest version state ought to use this new format. @@ -299,10 +300,13 @@ public T loadLatestState(ESLogger logger, Path... 
dataLocations) throws IOExcep .filter(new StateIdAndLegacyPredicate(maxStateId, maxStateIdIsLegacy)) .collect(Collectors.toCollection(ArrayList::new)); + final List exceptions = new ArrayList<>(); + int numFilesDeleted = 0; // number of files that were deleted after being selected above but before reading file content below for (PathAndStateId pathAndStateId : pathAndStateIds) { try { final Path stateFile = pathAndStateId.file; final long id = pathAndStateId.id; + final T state; if (pathAndStateId.legacy) { // read the legacy format -- plain XContent final byte[] data = Files.readAllBytes(stateFile); if (data.length == 0) { @@ -320,6 +324,10 @@ public T loadLatestState(ESLogger logger, Path... dataLocations) throws IOExcep logger.trace("state id [{}] read from [{}]", id, stateFile.getFileName()); } return state; + } catch (NoSuchFileException | FileNotFoundException ex) { + // the file was deleted after being selected but before/while being read; count it and ignore + numFilesDeleted++; + logger.debug("{}: failed to read [{}], ignoring...", ex, pathAndStateId.file.toAbsolutePath(), prefix); } catch (Exception e) { exceptions.add(new IOException("failed to read " + pathAndStateId.toString(), e)); logger.debug("{}: failed to read [{}], ignoring...", e, pathAndStateId.file.toAbsolutePath(), prefix); @@ -327,11 +335,11 @@ public T loadLatestState(ESLogger logger, Path... 
dataLocations) throws IOExcep } // if we reach this something went wrong ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); - if (files.size() > 0) { + if (files.size() > numFilesDeleted) { // We have some state files but none of them gave us a usable state throw new IllegalStateException("Could not find a state file to recover from among " + files); } - return state; + return null; } /** diff --git a/core/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java b/core/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java index 4cf505d839a75..a4fd6ecd40af0 100644 --- a/core/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java @@ -25,6 +25,7 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.SimpleFSDirectory; +import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -59,6 +60,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.StreamSupport; import static org.hamcrest.Matchers.equalTo; @@ -136,6 +138,35 @@ public void testReadWriteState() throws IOException { } } + public void testLoadLatestStateDoesNotResurrectsDeletedFolder() throws URISyntaxException, IOException, InterruptedException { + MetaDataStateFormat format = metaDataFormat(randomFrom(XContentType.values())); + Path path = createTempDir(); + Files.createDirectories(path.resolve(MetaDataStateFormat.STATE_DIR_NAME)); + MetaData meta = randomMeta(); + format.write(meta, path); + assertEquals(meta.clusterUUID(), format.loadLatestState(logger, path).clusterUUID()); + AtomicBoolean successfulDeletion = new AtomicBoolean(); + Thread thread = new Thread() { + public void run() { + try 
{ + IOUtils.rm(path); + successfulDeletion.set(true); + } catch (IOException e) { + logger.debug("could not delete {}", e, path); + } + } + }; + thread.start(); + MetaData loadedMeta = format.loadLatestState(logger, path); + if (loadedMeta != null) { + assertEquals(loadedMeta.clusterUUID(), meta.clusterUUID()); + } + thread.join(); + if (successfulDeletion.get()) { + assertFalse(Files.isDirectory(path)); + } + } + public void testVersionMismatch() throws IOException { Path[] dirs = new Path[randomIntBetween(1, 5)]; for (int i = 0; i < dirs.length; i++) { @@ -360,8 +391,9 @@ public DummyState fromXContent(XContentParser parser) throws IOException { } @Override - protected Directory newDirectory(Path dir) throws IOException { - MockDirectoryWrapper mock = new MockDirectoryWrapper(random(), super.newDirectory(dir)); + protected Directory newReadOnlyDirectory(Path dir) throws IOException { + MockDirectoryWrapper mock = new MockDirectoryWrapper(random(), super.newReadOnlyDirectory(dir)); + mock.setCheckIndexOnClose(false); closeAfterSuite(mock); return mock; }