Skip to content

Commit

Permalink
[Remote Store] Update Translog Metadata file name (#8350) (#8546)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna authored Jul 8, 2023
1 parent a59de5d commit 85bd301
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ public void trimUnreferencedReaders() throws IOException {
}
if (generationsToDelete.isEmpty() == false) {
deleteRemoteGeneration(generationsToDelete);
deleteStaleRemotePrimaryTermsAndMetadataFiles();
translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release);
deleteStaleRemotePrimaryTerms();
} else {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
}
Expand All @@ -409,7 +410,7 @@ private void deleteRemoteGeneration(Set<Long> generations) {
* <br>
* This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator.
*/
private void deleteStaleRemotePrimaryTermsAndMetadataFiles() {
private void deleteStaleRemotePrimaryTerms() {
// The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there
// are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part
// of older primary term.
Expand All @@ -418,8 +419,6 @@ private void deleteStaleRemotePrimaryTermsAndMetadataFiles() {
assert readers.isEmpty() == false : "Expected non-empty readers";
long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get();
translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
// Second we delete all stale metadata files from remote store
translogTransferManager.deleteStaleTranslogMetadataFilesAsync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
Expand All @@ -23,6 +24,8 @@
import java.util.List;
import java.util.Set;

import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC;

/**
* Service that handles remote transfer of translog and checkpoint files
*
Expand Down Expand Up @@ -114,17 +117,6 @@ public Set<String> listAll(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).listBlobs().keySet();
}

@Override
public void listAllAsync(String threadpoolName, Iterable<String> path, ActionListener<Set<String>> listener) {
threadPool.executor(threadpoolName).execute(() -> {
try {
listener.onResponse(listAll(path));
} catch (IOException e) {
listener.onFailure(e);
}
});
}

@Override
public Set<String> listFolders(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).children().keySet();
Expand All @@ -140,4 +132,18 @@ public void listFoldersAsync(String threadpoolName, Iterable<String> path, Actio
}
});
}

public void listAllInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) {
blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder("", limit, LEXICOGRAPHIC, listener);
}

public void listAllInSortedOrderAsync(
String threadpoolName,
Iterable<String> path,
int limit,
ActionListener<List<BlobMetadata>> listener
) {
threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, limit, listener); });
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog.transfer;

import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

import java.io.IOException;
Expand Down Expand Up @@ -80,14 +81,6 @@ void uploadBlobAsync(
*/
Set<String> listAll(Iterable<String> path) throws IOException;

/**
* Lists the files and invokes the listener on success or failure
* @param threadpoolName threadpool type which will be used to list all files asynchronously.
* @param path the path to list
* @param listener the callback to be invoked once list operation completes successfully/fails.
*/
void listAllAsync(String threadpoolName, Iterable<String> path, ActionListener<Set<String>> listener);

/**
* Lists the folders inside the path.
* @param path : the path
Expand All @@ -114,4 +107,8 @@ void uploadBlobAsync(
*/
InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException;

void listAllInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener);

void listAllInSortedOrderAsync(String threadpoolName, Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
Expand Down Expand Up @@ -42,8 +44,6 @@

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.getFileName;

/**
* The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
Expand Down Expand Up @@ -185,15 +185,39 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th
}

public TranslogTransferMetadata readMetadata() throws IOException {
return transferService.listAll(remoteMetadataTransferPath).stream().max(METADATA_FILENAME_COMPARATOR).map(filename -> {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
return metadataStreamWrapper.readStream(indexInput);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
return null;
}
}).orElse(null);
SetOnce<TranslogTransferMetadata> metadataSetOnce = new SetOnce<>();
SetOnce<IOException> exceptionSetOnce = new SetOnce<>();
final CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<List<BlobMetadata>> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(blobMetadataList -> {
if (blobMetadataList.isEmpty()) return;
String filename = blobMetadataList.get(0).name();
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput));
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
exceptionSetOnce.set(e);
}
}, e -> {
logger.error(() -> new ParameterizedMessage("Exception while listing metadata files "), e);
exceptionSetOnce.set((IOException) e);
}),
latch
);

try {
transferService.listAllInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener);
latch.await();
} catch (InterruptedException e) {
throw new IOException("Exception while reading/downloading metadafile", e);
}

if (exceptionSetOnce.get() != null) {
throw exceptionSetOnce.get();
}

return metadataSetOnce.get();
}

private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException {
Expand All @@ -211,7 +235,7 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap));

return new TransferFileSnapshot(
getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()),
translogTransferMetadata.getFileName(),
getMetadataBytes(translogTransferMetadata),
translogTransferMetadata.getPrimaryTerm()
);
Expand All @@ -230,7 +254,7 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
"translog transfer metadata " + metadata.getPrimaryTerm(),
getFileName(metadata.getPrimaryTerm(), metadata.getGeneration()),
metadata.getFileName(),
output,
TranslogTransferMetadata.BUFFER_SIZE
)
Expand All @@ -253,20 +277,14 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
*/
public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runnable onCompletion) {
List<String> translogFiles = new ArrayList<>();
List<String> metadataFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFileName = Translog.getFilename(generation);
translogFiles.addAll(List.of(ckpFileName, translogFileName));
// Add metadata file tio metadata file list which is located in basePath/metadata
String metadataFileName = TranslogTransferMetadata.getFileName(primaryTerm, generation);
metadataFiles.add(metadataFileName);
});
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
// Delete the metadata files asynchronously
deleteMetadataFilesAsync(metadataFiles, onCompletion);
}

/**
Expand Down Expand Up @@ -341,24 +359,37 @@ public void onFailure(Exception e) {
});
}

public void deleteStaleTranslogMetadataFilesAsync() {
transferService.listAllAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, new ActionListener<>() {
@Override
public void onResponse(Set<String> metadataFiles) {
List<String> sortedMetadataFiles = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList());
if (sortedMetadataFiles.size() <= 1) {
logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
return;
}
List<String> metadataFilesToDelete = sortedMetadataFiles.subList(0, sortedMetadataFiles.size() - 1);
deleteMetadataFilesAsync(metadataFilesToDelete);
}
public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
try {
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteMetadataTransferPath,
Integer.MAX_VALUE,
new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> sortedMetadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
if (sortedMetadataFiles.size() <= 1) {
logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
onCompletion.run();
return;
}
List<String> metadataFilesToDelete = sortedMetadataFiles.subList(1, sortedMetadataFiles.size());
logger.trace("Deleting remote translog metadata files {}", metadataFilesToDelete);
deleteMetadataFilesAsync(metadataFilesToDelete, onCompletion);
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
}
});
@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
onCompletion.run();
}
}
);
} catch (Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
onCompletion.run();
}
}

public void deleteTranslogFiles() throws IOException {
Expand Down Expand Up @@ -407,14 +438,6 @@ public void onFailure(Exception e) {
}
}

/**
* Deletes metadata files asynchronously using the {@code REMOTE_PURGE} threadpool.
* @param metadataFilesToDelete list of metadata files to be deleted.
*/
private void deleteMetadataFilesAsync(List<String> metadataFilesToDelete) {
deleteMetadataFilesAsync(metadataFilesToDelete, () -> {});
}

/**
* Deletes metadata files asynchronously using the {@code REMOTE_PURGE} threadpool. On success or failure, runs {@code onCompletion}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
package org.opensearch.index.translog.transfer;

import org.opensearch.common.SetOnce;
import org.opensearch.index.remote.RemoteStoreUtils;

import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -42,13 +42,14 @@ public class TranslogTransferMetadata {

static final String METADATA_CODEC = "md";

public static final Comparator<String> METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator();
private final long createdAt;

public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) {
this.primaryTerm = primaryTerm;
this.generation = generation;
this.minTranslogGeneration = minTranslogGeneration;
this.count = count;
this.createdAt = System.currentTimeMillis();
}

public long getPrimaryTerm() {
Expand All @@ -75,8 +76,19 @@ public Map<String, String> getGenerationToPrimaryTermMapper() {
return generationToPrimaryTermMapper.get();
}

public static String getFileName(long primaryTerm, long generation) {
return String.join(METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation)));
/*
This should be used only at the time of creation.
*/
public String getFileName() {
return String.join(
METADATA_SEPARATOR,
Arrays.asList(
RemoteStoreUtils.invertLong(primaryTerm),
RemoteStoreUtils.invertLong(generation),
RemoteStoreUtils.invertLong(createdAt),
String.valueOf(CURRENT_VERSION)
)
);
}

@Override
Expand All @@ -91,22 +103,4 @@ public boolean equals(Object o) {
TranslogTransferMetadata other = (TranslogTransferMetadata) o;
return Objects.equals(this.primaryTerm, other.primaryTerm) && Objects.equals(this.generation, other.generation);
}

private static class MetadataFilenameComparator implements Comparator<String> {
@Override
public int compare(String first, String second) {
// Format of metadata filename is <Primary Term>__<Generation>
String[] filenameTokens1 = first.split(METADATA_SEPARATOR);
String[] filenameTokens2 = second.split(METADATA_SEPARATOR);
// Here, we are comparing only primary term and generation.
for (int i = 0; i < filenameTokens1.length; i++) {
if (filenameTokens1[i].equals(filenameTokens2[i]) == false) {
return Long.compare(Long.parseLong(filenameTokens1[i]), Long.parseLong(filenameTokens2[i]));
}
}
throw new IllegalArgumentException(
"TranslogTransferMetadata files " + first + " and " + second + " have same primary term and generation"
);
}
}
}
Loading

0 comments on commit 85bd301

Please sign in to comment.