Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Purge remote translog basis the latest metadata for remote-backed indexes #6086

Merged
merged 16 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.index.translog;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
Expand All @@ -26,12 +29,15 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/**
* A Translog implementation which syncs local FS with a remote store
Expand All @@ -51,6 +57,12 @@ public class RemoteFsTranslog extends Translog {

private volatile long minSeqNoToKeep;

// min generation referred by last uploaded translog
private volatile long minRemoteGenReferenced;

// clean up translog folder uploaded by previous primaries once
private final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

public RemoteFsTranslog(
TranslogConfig config,
String translogUUID,
Expand Down Expand Up @@ -230,6 +242,7 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti
transferReleasable.close();
closeFilesIfNoPendingRetentionLocks();
maxRemoteTranslogGenerationUploaded = generation;
minRemoteGenReferenced = getMinFileGeneration();
logger.trace("uploaded translog for {} {} ", primaryTerm, generation);
}

Expand Down Expand Up @@ -327,13 +340,72 @@ protected void setMinSeqNoToKeep(long seqNo) {
this.minSeqNoToKeep = seqNo;
}

@Override
void deleteReaderFiles(TranslogReader reader) {
private void deleteRemoteGeneration(Set<Long> generations) {
try {
translogTransferManager.deleteTranslog(primaryTermSupplier.getAsLong(), reader.generation);
} catch (IOException ignored) {
logger.error("Exception {} while deleting generation {}", ignored, reader.generation);
translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generations);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception occurred while deleting generation {}", generations), e);
}
}

@Override
public void trimUnreferencedReaders() throws IOException {
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
// clean up local translog files and updates readers
super.trimUnreferencedReaders();

// cleans up remote translog files not referenced in latest uploaded metadata.
// This enables us to restore translog from the metadata in case of failover or relocation.
Set<Long> generationsToDelete = new HashSet<>();
for (long generation = minRemoteGenReferenced - 1; generation >= 0; generation--) {
if (fileTransferTracker.uploaded(Translog.getFilename(generation)) == false) {
break;
}
generationsToDelete.add(generation);
}
if (generationsToDelete.isEmpty() == false) {
deleteRemoteGeneration(generationsToDelete);
deleteOlderPrimaryTranslogFilesFromRemoteStore();
}
}

/**
* This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures
* implicitly that minimum primary term in latest translog metadata in remote store is the current primary term.
*/
private void deleteOlderPrimaryTranslogFilesFromRemoteStore() {
// The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there
// are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part
// of older primary term.
if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
logger.info("Cleaning up translog uploaded by previous primaries");
long minPrimaryTermInMetadata = current.getPrimaryTerm();
Set<Long> primaryTermsInRemote;
try {
primaryTermsInRemote = translogTransferManager.listPrimaryTerms();
} catch (IOException e) {
logger.error("Exception occurred while getting primary terms from remote store", e);
// If there are exceptions encountered, then we try to delete all older primary terms lesser than the
// minimum referenced primary term in remote translog metadata.
primaryTermsInRemote = LongStream.range(0, current.getPrimaryTerm()).boxed().collect(Collectors.toSet());
}
// Delete all primary terms that are no more referenced by the metadata file and exists in the
Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
.filter(term -> term < minPrimaryTermInMetadata)
.collect(Collectors.toSet());
primaryTermsToDelete.forEach(term -> translogTransferManager.deleteTranslogAsync(term, new ActionListener<>() {
@Override
public void onResponse(Void response) {
// NO-OP
}

@Override
public void onFailure(Exception e) {
logger.error(
() -> new ParameterizedMessage("Exception occurred while deleting older translog files for primary_term={}", term),
e
);
}
}));
}
super.deleteReaderFiles(reader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,37 @@ public void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IO
blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);
}

@Override
public void deleteBlobsAsync(Iterable<String> path, List<String> fileNames, ActionListener<Void> listener) {
executorService.execute(() -> {
try {
blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);
listener.onResponse(null);
} catch (IOException e) {
listener.onFailure(e);
}
});
}

@Override
public void deleteAsync(Iterable<String> path, ActionListener<Void> listener) {
executorService.execute(() -> {
try {
blobStore.blobContainer((BlobPath) path).delete();
listener.onResponse(null);
} catch (IOException e) {
listener.onFailure(e);
}
});
}

@Override
public Set<String> listAll(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).listBlobs().keySet();
}

@Override
public Set<String> listFolders(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).children().keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.index.translog.transfer.listener.FileTransferListener;

import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -55,9 +56,14 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
add(fileSnapshot.getName(), TransferState.FAILED);
}

@Override
public void onDelete(String name) {
fileTransferTracker.remove(name);
public void delete(List<String> names) {
for (String name : names) {
fileTransferTracker.remove(name);
}
}

public boolean uploaded(String file) {
return fileTransferTracker.get(file) == TransferState.SUCCESS;
}

public Set<TransferFileSnapshot> exclusionFilter(Set<TransferFileSnapshot> original) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ void uploadBlobAsync(

void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException;

void deleteBlobsAsync(Iterable<String> path, List<String> fileNames, ActionListener<Void> listener);

void deleteAsync(Iterable<String> path, ActionListener<Void> listener);

/**
* Lists the files
* @param path : the path to list
Expand All @@ -53,6 +57,14 @@ void uploadBlobAsync(
*/
Set<String> listAll(Iterable<String> path) throws IOException;

/**
* Lists the folders inside the path.
* @param path : the path
* @return list of folders inside the path
* @throws IOException the exception while listing folders inside the path
*/
Set<String> listFolders(Iterable<String> path) throws IOException;

/**
*
* @param path the remote path from where download should be made
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,68 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
);
}

public void deleteTranslog(long primaryTerm, long generation) throws IOException {
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFilename = Translog.getFilename(generation);
// ToDo - Take care of metadata file cleanup
// https://github.com/opensearch-project/OpenSearch/issues/5677
fileTransferTracker.onDelete(ckpFileName);
fileTransferTracker.onDelete(translogFilename);
List<String> files = List.of(ckpFileName, translogFilename);
transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files);
/**
* This method handles deletion of multiple generations for a single primary term.
* TODO: Take care of metadata file cleanup. <a href="https://github.com/opensearch-project/OpenSearch/issues/5677">Github Issue #5677</a>
*
* @param primaryTerm primary term
* @param generations set of generation
*/
public void deleteTranslogAsync(long primaryTerm, Set<Long> generations) throws IOException {
if (generations.isEmpty()) {
return;
}
List<String> files = new ArrayList<>();
generations.forEach(generation -> {
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFilename = Translog.getFilename(generation);
files.addAll(List.of(ckpFileName, translogFilename));
});
transferService.deleteBlobsAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files, new ActionListener<>() {
@Override
public void onResponse(Void unused) {
fileTransferTracker.delete(files);
}

@Override
public void onFailure(Exception e) {
logger.error(
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
() -> new ParameterizedMessage(
"Exception occurred while deleting translog for primary_term={} generations={}",
primaryTerm,
generations
),
e
);
}
});
}

/**
* Handles deletion of translog files for a particular primary term.
*
* @param primaryTerm primary term
* @param listener listener for response and failure
*/
public void deleteTranslogAsync(long primaryTerm, ActionListener<Void> listener) {
transferService.deleteAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), listener);
}

/**
* Lists all primary terms existing on remote store.
*
* @return the list of primary terms.
* @throws IOException is thrown if it can read the data.
*/
public Set<Long> listPrimaryTerms() throws IOException {
return transferService.listFolders(remoteBaseTransferPath).stream().filter(s -> {
try {
Long.parseLong(s);
return true;
} catch (Exception ignored) {
// NO-OP
}
return false;
}).map(Long::parseLong).collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,4 @@ public interface FileTransferListener {
*/
void onFailure(TransferFileSnapshot fileSnapshot, Exception e);

void onDelete(String name);
}
Loading