Skip to content

Commit

Permalink
Smarten deletion logic of remote translog
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Jan 31, 2023
1 parent 1e448b3 commit 9d30ec8
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/**
* A Translog implementation which syncs local FS with a remote store
Expand Down Expand Up @@ -346,6 +349,7 @@ private void deleteRemoteGeneration(Set<Long> generations) {
}
}

@Override
public void trimUnreferencedReaders() throws IOException {
// clean up local translog files and updates readers
super.trimUnreferencedReaders();
Expand All @@ -369,26 +373,55 @@ private void deleteOlderPrimaryTranslogFilesFromRemoteStore() {
// of older primary term.
if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
logger.info("Cleaning up translog uploaded by previous primaries");
for (long oldPrimaryTerm = current.getPrimaryTerm() - 1; oldPrimaryTerm >= 0; oldPrimaryTerm--) {
long finalOldPrimaryTerm = oldPrimaryTerm;
translogTransferManager.deleteTranslogAsync(oldPrimaryTerm, new ActionListener<>() {
@Override
public void onResponse(Void response) {
// NO-OP
}
long minPrimaryTermInMetadata = getMinPrimaryTermInMetadata();
if (minPrimaryTermInMetadata == -1) {
// We should keep all primary terms since we can not determine the minimum primary term referenced by the metadata file
return;
}
Set<Long> primaryTermsInRemote = getPrimaryTermsInRemote();
// Delete all primary terms that are no more referenced by the metadata file and exists in the
Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
.filter(term -> term < minPrimaryTermInMetadata)
.collect(Collectors.toSet());
primaryTermsToDelete.forEach(term -> translogTransferManager.deleteTranslogAsync(term, new ActionListener<>() {
@Override
public void onResponse(Void response) {
// NO-OP
}

@Override
public void onFailure(Exception e) {
logger.error(
() -> new ParameterizedMessage(
"Exception occurred while deleting older translog files for primary_term={}",
finalOldPrimaryTerm
),
e
);
}
});
@Override
public void onFailure(Exception e) {
logger.error(
() -> new ParameterizedMessage("Exception occurred while deleting older translog files for primary_term={}", term),
e
);
}
}));
}
}

private Set<Long> getPrimaryTermsInRemote() {
try {
return translogTransferManager.listPrimaryTerms();
} catch (IOException e) {
logger.error("Exception occurred while getting oldest primary term in remote store", e);
}
return LongStream.range(0, current.getPrimaryTerm()).boxed().collect(Collectors.toSet());
}

private long getMinPrimaryTermInMetadata() {
long minPrimaryTerm = -1;
try {
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
if (translogMetadata != null && translogMetadata.getGenerationToPrimaryTermMapper().isEmpty() == false) {
OptionalLong min = translogMetadata.getGenerationToPrimaryTermMapper().values().stream().mapToLong(Long::valueOf).min();
if (min.isPresent()) {
minPrimaryTerm = min.getAsLong();
}
}
} catch (IOException e) {
logger.error("Exception occurred while getting max primary term in remote translog metadata", e);
}
return minPrimaryTerm;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,9 @@ public void deleteAsync(Iterable<String> path, ActionListener<Void> listener) {
public Set<String> listAll(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).listBlobs().keySet();
}

@Override
public Set<String> listFolders(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).children().keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ void uploadBlobAsync(
*/
Set<String> listAll(Iterable<String> path) throws IOException;

/**
* Lists the folders inside the path.
* @param path : the path
* @return list of folders inside the path
* @throws IOException the exception while listing folders inside the path
*/
Set<String> listFolders(Iterable<String> path) throws IOException;

/**
*
* @param path the remote path from where download should be made
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,30 @@ public void onFailure(Exception e) {
}

/**
* Handles deletion on translog files for a particular primary term.
* Handles deletion of translog files for a particular primary term.
*
* @param primaryTerm primary term
* @param listener listener for response and failure
*/
public void deleteTranslogAsync(long primaryTerm, ActionListener<Void> listener) {
transferService.deleteAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), listener);
}

/**
* Lists all primary terms existing on remote store.
*
* @return the list of primary terms.
* @throws IOException is thrown if it can read the data.
*/
public Set<Long> listPrimaryTerms() throws IOException {
return transferService.listFolders(remoteBaseTransferPath).stream().filter(s -> {
try {
Long.parseLong(s);
return true;
} catch (Exception ignored) {
// NO-OP
}
return false;
}).map(Long::parseLong).collect(Collectors.toSet());
}
}

0 comments on commit 9d30ec8

Please sign in to comment.