Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Removed multiple paths from MetadataStateFormat (#72821)" #78475

Merged
merged 1 commit into from
Sep 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.NIOFSDirectory;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -39,10 +39,12 @@
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* MetadataStateFormat is a base class to write checksummed
Expand Down Expand Up @@ -169,10 +171,10 @@ private static void performStateDirectoriesFsync(List<Tuple<Path, Directory>> st
/**
* Writes the given state to the given directories and performs cleanup of old state files if the write succeeds or
* newly created state file if write fails.
* See also {@link #write(Object, Path)} and {@link #cleanupOldFiles(long, Path)}.
* See also {@link #write(Object, Path...)} and {@link #cleanupOldFiles(long, Path[])}.
*/
public final long writeAndCleanup(final T state, final Path location) throws WriteStateException {
return write(state, true, location);
public final long writeAndCleanup(final T state, final Path... locations) throws WriteStateException {
return write(state, true, locations);
}

/**
Expand All @@ -187,26 +189,29 @@ public final long writeAndCleanup(final T state, final Path location) throws Wri
* If this method fails with an exception, it performs cleanup of newly created state file.
* But if this method succeeds, it does not perform cleanup of old state files.
* If this write succeeds, but some further write fails, you may want to rollback the transaction and keep old file around.
* After transaction is finished use {@link #cleanupOldFiles(long, Path)} for the clean-up.
* If this write is not a part of bigger transaction, consider using {@link #writeAndCleanup(Object, Path)} method instead.
 * After the transaction is finished use {@link #cleanupOldFiles(long, Path[])} for the clean-up.
 * If this write is not a part of a bigger transaction, consider using {@link #writeAndCleanup(Object, Path...)} method instead.
*
* @param state the state object to write
* @param location the data dir the state should be written into
* @param locations the locations where the state should be written to.
* @throws WriteStateException if some exception during writing state occurs. See also {@link WriteStateException#isDirty()}.
* @return generation of newly written state.
*/
public final long write(final T state, final Path location) throws WriteStateException {
return write(state, false, location);
public final long write(final T state, final Path... locations) throws WriteStateException {
return write(state, false, locations);
}

private long write(final T state, boolean cleanup, final Path location) throws WriteStateException {
if (location == null) {
private long write(final T state, boolean cleanup, final Path... locations) throws WriteStateException {
if (locations == null) {
throw new IllegalArgumentException("Locations must not be null");
}
if (locations.length <= 0) {
throw new IllegalArgumentException("One or more locations required");
}

final long oldGenerationId, newGenerationId;
try {
oldGenerationId = findMaxGenerationId(prefix, location);
oldGenerationId = findMaxGenerationId(prefix, locations);
newGenerationId = oldGenerationId + 1;
} catch (Exception e) {
throw new WriteStateException(false, "exception during looking up new generation id", e);
Expand All @@ -218,11 +223,13 @@ private long write(final T state, boolean cleanup, final Path location) throws W
List<Tuple<Path, Directory>> directories = new ArrayList<>();

try {
Path stateLocation = location.resolve(STATE_DIR_NAME);
try {
directories.add(new Tuple<>(location, newDirectory(stateLocation)));
} catch (IOException e) {
throw new WriteStateException(false, "failed to open state directory " + stateLocation, e);
for (Path location : locations) {
Path stateLocation = location.resolve(STATE_DIR_NAME);
try {
directories.add(new Tuple<>(location, newDirectory(stateLocation)));
} catch (IOException e) {
throw new WriteStateException(false, "failed to open state directory " + stateLocation, e);
}
}

writeStateToFirstLocation(state, directories.get(0).v1(), directories.get(0).v2(), tmpFileName);
Expand All @@ -231,7 +238,7 @@ private long write(final T state, boolean cleanup, final Path location) throws W
performStateDirectoriesFsync(directories);
} catch (WriteStateException e) {
if (cleanup) {
cleanupOldFiles(oldGenerationId, location);
cleanupOldFiles(oldGenerationId, locations);
}
throw e;
} finally {
Expand All @@ -242,7 +249,7 @@ private long write(final T state, boolean cleanup, final Path location) throws W
}

if (cleanup) {
cleanupOldFiles(newGenerationId, location);
cleanupOldFiles(newGenerationId, locations);
}

return newGenerationId;
Expand Down Expand Up @@ -309,61 +316,68 @@ protected Directory newDirectory(Path dir) throws IOException {
 * Cleans up all state files not matching the passed generation.
*
* @param currentGeneration state generation to keep.
* @param location data dir.
* @param locations state paths.
*/
public void cleanupOldFiles(final long currentGeneration, Path location) {
public void cleanupOldFiles(final long currentGeneration, Path... locations) {
final String fileNameToKeep = getStateFileName(currentGeneration);
logger.trace("cleanupOldFiles: cleaning up {}", location);
Path stateLocation = location.resolve(STATE_DIR_NAME);
try (Directory stateDir = newDirectory(stateLocation)) {
for (String file : stateDir.listAll()) {
if (file.startsWith(prefix) && file.equals(fileNameToKeep) == false) {
deleteFileIgnoreExceptions(stateLocation, stateDir, file);
for (Path location : locations) {
logger.trace("cleanupOldFiles: cleaning up {}", location);
Path stateLocation = location.resolve(STATE_DIR_NAME);
try (Directory stateDir = newDirectory(stateLocation)) {
for (String file : stateDir.listAll()) {
if (file.startsWith(prefix) && file.equals(fileNameToKeep) == false) {
deleteFileIgnoreExceptions(stateLocation, stateDir, file);
}
}
} catch (Exception e) {
logger.trace("clean up failed for state location {}", stateLocation);
}
} catch (Exception e) {
logger.trace("clean up failed for state location {}", stateLocation);
}
}

/**
* Finds state file with maximum id.
*
* @param prefix - filename prefix
* @param dataLocation - path to directory with state folder
* @param locations - paths to directories with state folder
* @return maximum id of state file or -1 if no such files are found
* @throws IOException if IOException occurs
*/
private long findMaxGenerationId(final String prefix, Path dataLocation) throws IOException {
private long findMaxGenerationId(final String prefix, Path... locations) throws IOException {
long maxId = -1;
final Path resolve = dataLocation.resolve(STATE_DIR_NAME);
if (Files.exists(resolve)) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(resolve, prefix + "*")) {
for (Path stateFile : stream) {
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString());
if (matcher.matches()) {
final long id = Long.parseLong(matcher.group(1));
maxId = Math.max(maxId, id);
for (Path dataLocation : locations) {
final Path resolve = dataLocation.resolve(STATE_DIR_NAME);
if (Files.exists(resolve)) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(resolve, prefix + "*")) {
for (Path stateFile : stream) {
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString());
if (matcher.matches()) {
final long id = Long.parseLong(matcher.group(1));
maxId = Math.max(maxId, id);
}
}
}
}
}
return maxId;
}

private Path findStateFilesByGeneration(final long generation, Path dataLocation) {
private List<Path> findStateFilesByGeneration(final long generation, Path... locations) {
List<Path> files = new ArrayList<>();
if (generation == -1) {
return null;
return files;
}

final String fileName = getStateFileName(generation);
final Path stateFilePath = dataLocation.resolve(STATE_DIR_NAME).resolve(fileName);
if (Files.exists(stateFilePath)) {
logger.trace("found state file: {}", stateFilePath);
return stateFilePath;
for (Path dataLocation : locations) {
final Path stateFilePath = dataLocation.resolve(STATE_DIR_NAME).resolve(fileName);
if (Files.exists(stateFilePath)) {
logger.trace("found state file: {}", stateFilePath);
files.add(stateFilePath);
}
}

return null;
return files;
}

public String getStateFileName(long generation) {
Expand All @@ -376,40 +390,52 @@ public String getStateFileName(long generation) {
*
* @param logger a logger instance.
* @param generation the generation to be loaded.
* @param dataLocation the data dir to read from
* @param dataLocations the data-locations to try.
* @return the state of asked generation or <code>null</code> if no state was found.
*/
public T loadGeneration(Logger logger, NamedXContentRegistry namedXContentRegistry, long generation, Path dataLocation) {
Path stateFile = findStateFilesByGeneration(generation, dataLocation);
public T loadGeneration(Logger logger, NamedXContentRegistry namedXContentRegistry, long generation, Path... dataLocations) {
List<Path> stateFiles = findStateFilesByGeneration(generation, dataLocations);

if (stateFile != null) {
final List<Throwable> exceptions = new ArrayList<>();
for (Path stateFile : stateFiles) {
try {
T state = read(namedXContentRegistry, stateFile);
logger.trace("generation id [{}] read from [{}]", generation, stateFile.getFileName());
return state;
} catch (Exception e) {
logger.debug(() -> new ParameterizedMessage("{}: failed to read [{}], ignoring...", stateFile, prefix), e);
throw new ElasticsearchException("failed to read " + stateFile, e);
exceptions.add(new IOException("failed to read " + stateFile, e));
logger.debug(() -> new ParameterizedMessage(
"{}: failed to read [{}], ignoring...", stateFile, prefix), e);
}
}
        // if we reach this point, something went wrong
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
if (stateFiles.size() > 0) {
// We have some state files but none of them gave us a usable state
throw new IllegalStateException("Could not find a state file to recover from among " +
stateFiles.stream().map(Object::toString).collect(Collectors.joining(", ")));
}
return null;
}

/**
* Tries to load the latest state from the given data-locations.
*
* @param logger a logger instance.
* @param dataLocation the data dir to read from
* @param dataLocations the data-locations to try.
* @return tuple of the latest state and generation. (null, -1) if no state is found.
*/
public Tuple<T, Long> loadLatestStateWithGeneration(Logger logger, NamedXContentRegistry namedXContentRegistry, Path dataLocation)
public Tuple<T, Long> loadLatestStateWithGeneration(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations)
throws IOException {
long generation = findMaxGenerationId(prefix, dataLocation);
T state = loadGeneration(logger, namedXContentRegistry, generation, dataLocation);
long generation = findMaxGenerationId(prefix, dataLocations);
T state = loadGeneration(logger, namedXContentRegistry, generation, dataLocations);

if (generation > -1 && state == null) {
throw new IllegalStateException("unable to find state files with generation id " + generation +
" returned by findMaxGenerationId function, in data folder [" + dataLocation + "], concurrent writes?");
" returned by findMaxGenerationId function, in data folders [" +
Arrays.stream(dataLocations).
map(Object::toString).collect(Collectors.joining(", ")) +
"], concurrent writes?");
}
return Tuple.tuple(state, generation);
}
Expand All @@ -418,19 +444,24 @@ public Tuple<T, Long> loadLatestStateWithGeneration(Logger logger, NamedXContent
* Tries to load the latest state from the given data-locations.
*
* @param logger a logger instance.
* @param dataLocation the data dir to read from
* @param dataLocations the data-locations to try.
* @return the latest state or <code>null</code> if no state was found.
*/
public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path dataLocation) throws IOException {
return loadLatestStateWithGeneration(logger, namedXContentRegistry, dataLocation).v1();
public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) throws
IOException {
return loadLatestStateWithGeneration(logger, namedXContentRegistry, dataLocations).v1();
}

/**
* Deletes all meta state directories recursively for the given data locations
* @param dataLocation the data dir to delete state from
* @param dataLocations the data location to delete
*/
public static void deleteMetaState(Path dataLocation) throws IOException {
IOUtils.rm(dataLocation.resolve(STATE_DIR_NAME));
public static void deleteMetaState(Path... dataLocations) throws IOException {
Path[] stateDirectories = new Path[dataLocations.length];
for (int i = 0; i < dataLocations.length; i++) {
stateDirectories[i] = dataLocations[i].resolve(STATE_DIR_NAME);
}
IOUtils.rm(stateDirectories);
}

public String getPrefix() {
Expand Down
Loading