From 99495122bd41a093ab31f0c16f04eba635e27397 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Mon, 16 Jan 2023 10:34:00 +0530 Subject: [PATCH 01/15] Deleting remote translog considering latest remote metadata Signed-off-by: Gaurav Bafna --- .../index/translog/RemoteFsTranslog.java | 28 +++++++-- .../transfer/FileTransferTracker.java | 12 +++- .../transfer/TranslogTransferManager.java | 3 +- .../listener/FileTransferListener.java | 1 - .../index/translog/RemoteFSTranslogTests.java | 35 +++++++---- .../transfer/FileTransferTrackerTests.java | 22 +++++++ .../TranslogTransferManagerTests.java | 60 ++++++++++++------- 7 files changed, 117 insertions(+), 44 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 0cfaa5234c1fe..be2cc13a16948 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -51,6 +51,8 @@ public class RemoteFsTranslog extends Translog { private volatile long minSeqNoToKeep; + private volatile long minRemoteGenReferenced; + public RemoteFsTranslog( TranslogConfig config, String translogUUID, @@ -230,6 +232,7 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti transferReleasable.close(); closeFilesIfNoPendingRetentionLocks(); maxRemoteTranslogGenerationUploaded = generation; + minRemoteGenReferenced = getMinFileGeneration(); logger.trace("uploaded translog for {} {} ", primaryTerm, generation); } @@ -327,13 +330,28 @@ protected void setMinSeqNoToKeep(long seqNo) { this.minSeqNoToKeep = seqNo; } - @Override - void deleteReaderFiles(TranslogReader reader) { + void deleteRemoteGeneration(long generation) { try { - translogTransferManager.deleteTranslog(primaryTermSupplier.getAsLong(), reader.generation); + translogTransferManager.deleteTranslog(primaryTermSupplier.getAsLong(), generation); 
} catch (IOException ignored) { - logger.error("Exception {} while deleting generation {}", ignored, reader.generation); + logger.error("Exception {} while deleting generation {}", ignored, generation); + } + } + + public void trimUnreferencedReaders() throws IOException { + // clean up local translog files and updates readers + super.trimUnreferencedReaders(); + + // cleans up remote translog files not referenced in latest uploaded metadata + // This enables us to restore translog from that metadata + for (long generation = minRemoteGenReferenced - 1; generation > 0; generation--) { + String translogFilename = Translog.getFilename(generation); + if (fileTransferTracker.uploaded(translogFilename)) { + logger.trace("delete remote translog generation file [{}], not referenced by metadata anymore", generation); + deleteRemoteGeneration(generation); + } else { + break; + } } - super.deleteReaderFiles(reader); } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index 5338142afed33..fdec36e67b50c 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -13,6 +13,7 @@ import org.opensearch.index.translog.transfer.listener.FileTransferListener; import java.util.HashSet; +import java.util.List; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -55,9 +56,14 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { add(fileSnapshot.getName(), TransferState.FAILED); } - @Override - public void onDelete(String name) { - fileTransferTracker.remove(name); + public void delete(List names) { + for (String name : names) { + fileTransferTracker.remove(name); + } + } + + public boolean uploaded(String file) { + return (fileTransferTracker.get(file) == 
TransferState.SUCCESS); } public Set exclusionFilter(Set original) { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 35ccb4ccf17db..efb61d805d87c 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -200,9 +200,8 @@ public void deleteTranslog(long primaryTerm, long generation) throws IOException String translogFilename = Translog.getFilename(generation); // ToDo - Take care of metadata file cleanup // https://github.com/opensearch-project/OpenSearch/issues/5677 - fileTransferTracker.onDelete(ckpFileName); - fileTransferTracker.onDelete(translogFilename); List files = List.of(ckpFileName, translogFilename); + fileTransferTracker.delete(files); transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files); } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java index c489e4b9a5809..af78cb50b63c6 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java @@ -30,5 +30,4 @@ public interface FileTransferListener { */ void onFailure(TransferFileSnapshot fileSnapshot, Exception e); - void onDelete(String name); } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 0e728ca3f1d4e..7bd054233d7ba 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ 
b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -477,18 +477,18 @@ public void testSimpleOperationsUpload() throws IOException { assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - assertEquals(translog.allUploaded().size(), 2); + assertEquals(4, translog.allUploaded().size()); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); - assertEquals(translog.allUploaded().size(), 4); + assertEquals(6, translog.allUploaded().size()); translog.rollGeneration(); - assertEquals(translog.allUploaded().size(), 4); + assertEquals(6, translog.allUploaded().size()); Set mdFiles = blobStoreTransferService.listAll( repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata") ); - assertEquals(mdFiles.size(), 2); + assertEquals(2, mdFiles.size()); logger.info("All md files {}", mdFiles); Set tlogFiles = blobStoreTransferService.listAll( @@ -529,33 +529,44 @@ public void testSimpleOperationsUpload() throws IOException { translog.deletionPolicy.setLocalCheckpointOfSafeCommit(0); // simulating the remote segment upload . 
translog.setMinSeqNoToKeep(0); - // This should not trim anything + // This should not trim anything from local translog.trimUnreferencedReaders(); - assertEquals(translog.allUploaded().size(), 4); + assertEquals(2, translog.readers.size()); + assertEquals(4, translog.allUploaded().size()); assertEquals( + 4, blobStoreTransferService.listAll( repository.basePath() .add(shardId.getIndex().getUUID()) .add(String.valueOf(shardId.id())) .add(String.valueOf(primaryTerm.get())) - ).size(), - 4 + ).size() ); - // This should trim tlog-2.* files as it contains seq no 0 + // This should trim tlog-2 from local + // This should not trim tlog-2.* files from remote as we not uploading any more translog to remote translog.setMinSeqNoToKeep(1); + translog.deletionPolicy.setLocalCheckpointOfSafeCommit(1); translog.trimUnreferencedReaders(); - assertEquals(translog.allUploaded().size(), 2); + assertEquals(1, translog.readers.size()); + assertEquals(4, translog.allUploaded().size()); assertEquals( + 4, blobStoreTransferService.listAll( repository.basePath() .add(shardId.getIndex().getUUID()) .add(String.valueOf(shardId.id())) .add(String.valueOf(primaryTerm.get())) - ).size(), - 2 + ).size() ); + // this should now trim as tlog-2 files from remote, but not tlog-3 and tlog-4 + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); + translog.deletionPolicy.setLocalCheckpointOfSafeCommit(1); + translog.setMinSeqNoToKeep(2); + translog.trimUnreferencedReaders(); + assertEquals(1, translog.readers.size()); + assertEquals(4, translog.allUploaded().size()); } private Long populateTranslogOps(boolean withMissingOps) throws IOException { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java index c6b4579f5ddd1..be14e4a7bd380 100644 --- 
a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java @@ -15,6 +15,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.List; public class FileTransferTrackerTests extends OpenSearchTestCase { @@ -74,4 +75,25 @@ public void testOnFailure() throws IOException { } } + public void testUploaded() throws IOException { + fileTransferTracker = new FileTransferTracker(shardId); + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + try ( + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + testFile, + randomNonNegativeLong() + ); + + ) { + fileTransferTracker.onSuccess(transferFileSnapshot); + String fileName = String.valueOf(testFile.getFileName()); + assertTrue(fileTransferTracker.uploaded(fileName)); + assertFalse(fileTransferTracker.uploaded("random-name")); + + fileTransferTracker.delete(List.of(fileName)); + assertFalse(fileTransferTracker.uploaded(fileName)); + } + } + } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 0677879549905..c213c3fd4592f 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -16,27 +16,28 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; -import org.opensearch.test.OpenSearchTestCase; import 
org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; -import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; +import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems("*") public class TranslogTransferManagerTests extends OpenSearchTestCase { @@ -74,23 +75,25 @@ public void testTransferSnapshot() throws IOException { return null; }).when(transferService).uploadBlobAsync(any(TransferFileSnapshot.class), any(BlobPath.class), any(ActionListener.class)); + FileTransferTracker fileTransferTracker = new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { + @Override + public void onSuccess(TransferFileSnapshot fileSnapshot) { + fileTransferSucceeded.incrementAndGet(); + super.onSuccess(fileSnapshot); + } + + @Override + public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { + fileTransferFailed.incrementAndGet(); + super.onFailure(fileSnapshot, e); + } + + }; + TranslogTransferManager translogTransferManager = new 
TranslogTransferManager( transferService, remoteBaseTransferPath, - new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { - @Override - public void onSuccess(TransferFileSnapshot fileSnapshot) { - fileTransferSucceeded.incrementAndGet(); - } - - @Override - public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { - fileTransferFailed.incrementAndGet(); - } - - @Override - public void onDelete(String name) {} - } + fileTransferTracker ); assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @@ -108,6 +111,7 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { assertEquals(0, fileTransferFailed.get()); assertEquals(1, translogTransferSucceeded.get()); assertEquals(0, translogTransferFailed.get()); + assertEquals(4, fileTransferTracker.allUploaded().size()); } private TransferSnapshot createTransferSnapshot() { @@ -297,4 +301,18 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException { tracker.add(checkpointFile, true); } + + public void testDeleteTranslog() throws IOException { + FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); + TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, tracker); + String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; + tracker.add(translogFile, true); + tracker.add(checkpointFile, true); + assertEquals(2, tracker.allUploaded().size()); + + List files = List.of(checkpointFile, translogFile); + translogTransferManager.deleteTranslog(primaryTerm, 19); + assertEquals(0, tracker.allUploaded().size()); + verify(transferService).deleteBlobs(any(BlobPath.class), eq(files)); + } } From 2eed7bb3a4f9a7bf126bff67b66990ea734e39af Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Tue, 17 Jan 2023 09:40:57 +0530 Subject: [PATCH 02/15] PR comments Signed-off-by: Gaurav Bafna --- 
.../java/org/opensearch/index/translog/RemoteFsTranslog.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index be2cc13a16948..12aec4935b465 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -51,6 +51,7 @@ public class RemoteFsTranslog extends Translog { private volatile long minSeqNoToKeep; + // min generation referred by last uploaded translog private volatile long minRemoteGenReferenced; public RemoteFsTranslog( @@ -344,7 +345,7 @@ public void trimUnreferencedReaders() throws IOException { // cleans up remote translog files not referenced in latest uploaded metadata // This enables us to restore translog from that metadata - for (long generation = minRemoteGenReferenced - 1; generation > 0; generation--) { + for (long generation = minRemoteGenReferenced - 1; generation >= 0; generation--) { String translogFilename = Translog.getFilename(generation); if (fileTransferTracker.uploaded(translogFilename)) { logger.trace("delete remote translog generation file [{}], not referenced by metadata anymore", generation); From b2785d6a54e8a62e79ae2579cae7f9da0dbfd835 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Wed, 18 Jan 2023 10:56:49 +0530 Subject: [PATCH 03/15] Cleaning up translogs from older primaries Signed-off-by: Gaurav Bafna --- .../index/translog/RemoteFsTranslog.java | 21 +++++++++++++++++++ .../translog/transfer/TransferService.java | 2 ++ .../transfer/TranslogTransferManager.java | 4 ++++ 3 files changed, 27 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 12aec4935b465..a4da858fba07b 100644 --- 
a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog; +import org.apache.lucene.util.SetOnce; import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; @@ -54,6 +55,9 @@ public class RemoteFsTranslog extends Translog { // min generation referred by last uploaded translog private volatile long minRemoteGenReferenced; + // clean up translog folder uploaded by previous primaries once + private final SetOnce olderPrimaryCleaned = new SetOnce<>();; + public RemoteFsTranslog( TranslogConfig config, String translogUUID, @@ -350,9 +354,26 @@ public void trimUnreferencedReaders() throws IOException { if (fileTransferTracker.uploaded(translogFilename)) { logger.trace("delete remote translog generation file [{}], not referenced by metadata anymore", generation); deleteRemoteGeneration(generation); + // Safe to delete + if (olderPrimaryCleaned.get() == null) { + // clean up translog uploaded by previous primaries if any + cleanupOlderPrimary(); + olderPrimaryCleaned.set(true); + } } else { break; } } } + + public void cleanupOlderPrimary() { + logger.info("Cleaning up translog uploaded by previous primaries"); + for (long oldPrimaryTerm = current.getPrimaryTerm() - 1; oldPrimaryTerm >= 0; oldPrimaryTerm--) { + try { + translogTransferManager.cleanTranslog(oldPrimaryTerm); + } catch (IOException e) { + logger.error("Exception {} while deleting older primary translog files {}", e, oldPrimaryTerm); + } + } + } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 5745d0838efb3..2e5ee4ed45cb0 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ 
b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -45,6 +45,8 @@ void uploadBlobAsync( void deleteBlobs(Iterable path, List fileNames) throws IOException; + void delete(Iterable path) throws IOException; + /** * Lists the files * @param path : the path to list diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index efb61d805d87c..d9ceb9f9897e6 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -204,4 +204,8 @@ public void deleteTranslog(long primaryTerm, long generation) throws IOException fileTransferTracker.delete(files); transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files); } + + public void cleanTranslog(long primaryTerm) throws IOException { + transferService.delete(remoteBaseTransferPath.add(String.valueOf(primaryTerm))); + } } From 33355d88589c157ef530b1e469ca565ca44355e4 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Thu, 19 Jan 2023 14:14:40 +0530 Subject: [PATCH 04/15] Cleaning remote translog of older primary term in async manner Signed-off-by: Gaurav Bafna --- .../index/translog/RemoteFsTranslog.java | 27 ++++++++++++++----- .../transfer/BlobStoreTransferService.java | 13 +++++++++ .../translog/transfer/TransferService.java | 2 +- .../transfer/TranslogTransferManager.java | 4 +-- 4 files changed, 36 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index a4da858fba07b..568ed3e435983 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ 
-9,6 +9,7 @@ package org.opensearch.index.translog; import org.apache.lucene.util.SetOnce; +import org.opensearch.action.ActionListener; import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; @@ -353,11 +354,12 @@ public void trimUnreferencedReaders() throws IOException { String translogFilename = Translog.getFilename(generation); if (fileTransferTracker.uploaded(translogFilename)) { logger.trace("delete remote translog generation file [{}], not referenced by metadata anymore", generation); + // ToDo : Make it async. deleteRemoteGeneration(generation); // Safe to delete if (olderPrimaryCleaned.get() == null) { // clean up translog uploaded by previous primaries if any - cleanupOlderPrimary(); + cleanupOlderPrimariesTranslog(); olderPrimaryCleaned.set(true); } } else { @@ -366,14 +368,25 @@ public void trimUnreferencedReaders() throws IOException { } } - public void cleanupOlderPrimary() { + /** + * Cleans up remote translog uploaded by previous primaries + * Safe to do when latest metadata doesn't refer them + */ + public void cleanupOlderPrimariesTranslog() { logger.info("Cleaning up translog uploaded by previous primaries"); + // ToDo : Move deletions to another threadpool so as not to block indexing for (long oldPrimaryTerm = current.getPrimaryTerm() - 1; oldPrimaryTerm >= 0; oldPrimaryTerm--) { - try { - translogTransferManager.cleanTranslog(oldPrimaryTerm); - } catch (IOException e) { - logger.error("Exception {} while deleting older primary translog files {}", e, oldPrimaryTerm); - } + long finalOldPrimaryTerm = oldPrimaryTerm; + translogTransferManager.cleanTranslogAsync(oldPrimaryTerm, new ActionListener<>() { + @Override + public void onResponse(Void response) { + } + + @Override + public void onFailure(Exception e) { + logger.info("Exception {} while deleting older primary translog files", e, finalOldPrimaryTerm); + } + }); } } } diff --git 
a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 78a26baa052ef..059e20f95bb7d 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -81,6 +81,19 @@ public void deleteBlobs(Iterable path, List fileNames) throws IO blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames); } + @Override + public void deleteAsync(Iterable path, ActionListener listener) { + executorService.execute( () -> { + try { + blobStore.blobContainer((BlobPath) path).delete(); + } catch (IOException e) { + listener.onFailure(e); + } + listener.onResponse(null); + }); + } + + @Override public Set listAll(Iterable path) throws IOException { return blobStore.blobContainer((BlobPath) path).listBlobs().keySet(); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 2e5ee4ed45cb0..9b8ffbaabe676 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -45,7 +45,7 @@ void uploadBlobAsync( void deleteBlobs(Iterable path, List fileNames) throws IOException; - void delete(Iterable path) throws IOException; + void deleteAsync(Iterable path, ActionListener listener); /** * Lists the files diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index d9ceb9f9897e6..468644329bf28 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ 
b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -205,7 +205,7 @@ public void deleteTranslog(long primaryTerm, long generation) throws IOException transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files); } - public void cleanTranslog(long primaryTerm) throws IOException { - transferService.delete(remoteBaseTransferPath.add(String.valueOf(primaryTerm))); + public void cleanTranslogAsync(long primaryTerm, ActionListener listener) { + transferService.deleteAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), listener); } } From c504767d894df82b585abd8945f1249e5a610899 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 30 Jan 2023 23:02:34 +0530 Subject: [PATCH 05/15] Refactor code Signed-off-by: Ashish Singh --- .../index/translog/RemoteFsTranslog.java | 47 ++++++++++--------- .../transfer/BlobStoreTransferService.java | 17 +++++-- .../transfer/FileTransferTracker.java | 2 +- .../translog/transfer/TransferService.java | 2 + .../transfer/TranslogTransferManager.java | 32 ++++++++++++- .../TranslogTransferManagerTests.java | 2 +- 6 files changed, 75 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 568ed3e435983..e163b40326e40 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -336,11 +337,11 @@ protected void setMinSeqNoToKeep(long seqNo) { this.minSeqNoToKeep = seqNo; } - void deleteRemoteGeneration(long generation) { + private void deleteRemoteGeneration(Set generationsToDelete) { try { - 
translogTransferManager.deleteTranslog(primaryTermSupplier.getAsLong(), generation); - } catch (IOException ignored) { - logger.error("Exception {} while deleting generation {}", ignored, generation); + translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generationsToDelete); + } catch (IOException e) { + logger.error("Exception occurred while deleting generation {}", generationsToDelete, e); } } @@ -348,23 +349,26 @@ public void trimUnreferencedReaders() throws IOException { // clean up local translog files and updates readers super.trimUnreferencedReaders(); - // cleans up remote translog files not referenced in latest uploaded metadata - // This enables us to restore translog from that metadata + // cleans up remote translog files not referenced in latest uploaded metadata. + // This enables us to restore translog from the metadata in case of failover or relocation. + Set generationsToDelete = new HashSet<>(); for (long generation = minRemoteGenReferenced - 1; generation >= 0; generation--) { - String translogFilename = Translog.getFilename(generation); - if (fileTransferTracker.uploaded(translogFilename)) { - logger.trace("delete remote translog generation file [{}], not referenced by metadata anymore", generation); - // ToDo : Make it async. 
- deleteRemoteGeneration(generation); - // Safe to delete - if (olderPrimaryCleaned.get() == null) { - // clean up translog uploaded by previous primaries if any - cleanupOlderPrimariesTranslog(); - olderPrimaryCleaned.set(true); - } - } else { + if (fileTransferTracker.uploaded(Translog.getFilename(generation)) == false) { break; } + generationsToDelete.add(generation); + } + deleteRemoteGeneration(generationsToDelete); + deleteOlderPrimaryTranslogFilesFromRemoteStore(); + } + + private void deleteOlderPrimaryTranslogFilesFromRemoteStore() { + // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there + // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part + // of older primary term. + if (olderPrimaryCleaned.get() == null) { + // clean up translog uploaded by previous primaries if any + cleanupOlderPrimariesTranslog(); } } @@ -374,17 +378,18 @@ public void trimUnreferencedReaders() throws IOException { */ public void cleanupOlderPrimariesTranslog() { logger.info("Cleaning up translog uploaded by previous primaries"); - // ToDo : Move deletions to another threadpool so as not to block indexing + // TODO: Move deletions to another threadpool so as not to block indexing for (long oldPrimaryTerm = current.getPrimaryTerm() - 1; oldPrimaryTerm >= 0; oldPrimaryTerm--) { long finalOldPrimaryTerm = oldPrimaryTerm; - translogTransferManager.cleanTranslogAsync(oldPrimaryTerm, new ActionListener<>() { + translogTransferManager.cleanTranslogAsync(oldPrimaryTerm, new ActionListener<>() { @Override public void onResponse(Void response) { + olderPrimaryCleaned.set(true); } @Override public void onFailure(Exception e) { - logger.info("Exception {} while deleting older primary translog files", e, finalOldPrimaryTerm); + logger.info("Exception occurred while deleting older translog files for primary_term={}", finalOldPrimaryTerm, e); } }); } diff --git 
a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 059e20f95bb7d..353a6ecd4f96d 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -81,19 +81,30 @@ public void deleteBlobs(Iterable path, List fileNames) throws IO blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames); } + @Override + public void deleteBlobsAsync(Iterable path, List fileNames, ActionListener listener) { + executorService.execute(() -> { + try { + blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames); + listener.onResponse(null); + } catch (IOException e) { + listener.onFailure(e); + } + }); + } + @Override public void deleteAsync(Iterable path, ActionListener listener) { - executorService.execute( () -> { + executorService.execute(() -> { try { blobStore.blobContainer((BlobPath) path).delete(); + listener.onResponse(null); } catch (IOException e) { listener.onFailure(e); } - listener.onResponse(null); }); } - @Override public Set listAll(Iterable path) throws IOException { return blobStore.blobContainer((BlobPath) path).listBlobs().keySet(); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index fdec36e67b50c..1909164bd821a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -63,7 +63,7 @@ public void delete(List names) { } public boolean uploaded(String file) { - return (fileTransferTracker.get(file) == TransferState.SUCCESS); + return fileTransferTracker.get(file) == 
TransferState.SUCCESS; } public Set exclusionFilter(Set original) { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 9b8ffbaabe676..9d160bb1a39e1 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -45,6 +45,8 @@ void uploadBlobAsync( void deleteBlobs(Iterable path, List fileNames) throws IOException; + void deleteBlobsAsync(Iterable path, List fileNames, ActionListener listener); + void deleteAsync(Iterable path, ActionListener listener); /** diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 468644329bf28..ff5f0b73482cc 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -195,7 +195,7 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) ); } - public void deleteTranslog(long primaryTerm, long generation) throws IOException { + public void deleteTranslogAsync(long primaryTerm, long generation) throws IOException { String ckpFileName = Translog.getCommitCheckpointFileName(generation); String translogFilename = Translog.getFilename(generation); // ToDo - Take care of metadata file cleanup @@ -205,6 +205,36 @@ public void deleteTranslog(long primaryTerm, long generation) throws IOException transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files); } + /** + * This method handles deletion of multiple generations for a single primary term. + * TODO: Take care of metadata file cleanup. 
Github Issue #5677 + * + * @param primaryTerm primary term + * @param generations set of generation + */ + public void deleteTranslogAsync(long primaryTerm, Set generations) throws IOException { + if (generations.isEmpty()) { + return; + } + List files = new ArrayList<>(); + generations.forEach(generation -> { + String ckpFileName = Translog.getCommitCheckpointFileName(generation); + String translogFilename = Translog.getFilename(generation); + files.addAll(List.of(ckpFileName, translogFilename)); + }); + transferService.deleteBlobsAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files, new ActionListener<>() { + @Override + public void onResponse(Void unused) { + fileTransferTracker.delete(files); + } + + @Override + public void onFailure(Exception e) { + logger.info("Exception occurred while deleting translog for primary_term={} generations={}", primaryTerm, generations, e); + } + }); + } + public void cleanTranslogAsync(long primaryTerm, ActionListener listener) { transferService.deleteAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), listener); } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index c213c3fd4592f..48d24db798b4d 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -311,7 +311,7 @@ public void testDeleteTranslog() throws IOException { assertEquals(2, tracker.allUploaded().size()); List files = List.of(checkpointFile, translogFile); - translogTransferManager.deleteTranslog(primaryTerm, 19); + translogTransferManager.deleteTranslogAsync(primaryTerm, 19); assertEquals(0, tracker.allUploaded().size()); verify(transferService).deleteBlobs(any(BlobPath.class), eq(files)); } From 14bfc06268c95dc1d3b889f3336937e7c47fcd85 
Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 31 Jan 2023 13:54:54 +0530 Subject: [PATCH 06/15] Fix failing UTs Signed-off-by: Ashish Singh --- .../index/translog/RemoteFsTranslog.java | 53 +++++++++---------- .../transfer/TranslogTransferManager.java | 27 +++++----- .../index/translog/RemoteFSTranslogTests.java | 50 +++++++++-------- .../TranslogTransferManagerTests.java | 15 ------ 4 files changed, 67 insertions(+), 78 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index e163b40326e40..ab18c168271bb 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -8,8 +8,9 @@ package org.opensearch.index.translog; -import org.apache.lucene.util.SetOnce; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; +import org.opensearch.common.SetOnce; import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; @@ -58,7 +59,7 @@ public class RemoteFsTranslog extends Translog { private volatile long minRemoteGenReferenced; // clean up translog folder uploaded by previous primaries once - private final SetOnce olderPrimaryCleaned = new SetOnce<>();; + private final SetOnce olderPrimaryCleaned = new SetOnce<>(); public RemoteFsTranslog( TranslogConfig config, @@ -341,7 +342,7 @@ private void deleteRemoteGeneration(Set generationsToDelete) { try { translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generationsToDelete); } catch (IOException e) { - logger.error("Exception occurred while deleting generation {}", generationsToDelete, e); + logger.error(() -> new ParameterizedMessage("Exception occurred while deleting generation {}", generationsToDelete), e); } } @@ -366,32 +367,28 @@ 
private void deleteOlderPrimaryTranslogFilesFromRemoteStore() { // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part // of older primary term. - if (olderPrimaryCleaned.get() == null) { - // clean up translog uploaded by previous primaries if any - cleanupOlderPrimariesTranslog(); - } - } - - /** - * Cleans up remote translog uploaded by previous primaries - * Safe to do when latest metadata doesn't refer them - */ - public void cleanupOlderPrimariesTranslog() { - logger.info("Cleaning up translog uploaded by previous primaries"); - // TODO: Move deletions to another threadpool so as not to block indexing - for (long oldPrimaryTerm = current.getPrimaryTerm() - 1; oldPrimaryTerm >= 0; oldPrimaryTerm--) { - long finalOldPrimaryTerm = oldPrimaryTerm; - translogTransferManager.cleanTranslogAsync(oldPrimaryTerm, new ActionListener<>() { - @Override - public void onResponse(Void response) { - olderPrimaryCleaned.set(true); - } + if (olderPrimaryCleaned.trySet(Boolean.TRUE)) { + logger.info("Cleaning up translog uploaded by previous primaries"); + for (long oldPrimaryTerm = current.getPrimaryTerm() - 1; oldPrimaryTerm >= 0; oldPrimaryTerm--) { + long finalOldPrimaryTerm = oldPrimaryTerm; + translogTransferManager.deleteTranslogAsync(oldPrimaryTerm, new ActionListener<>() { + @Override + public void onResponse(Void response) { + // NO-OP + } - @Override - public void onFailure(Exception e) { - logger.info("Exception occurred while deleting older translog files for primary_term={}", finalOldPrimaryTerm, e); - } - }); + @Override + public void onFailure(Exception e) { + logger.error( + () -> new ParameterizedMessage( + "Exception occurred while deleting older translog files for primary_term={}", + finalOldPrimaryTerm + ), + e + ); + } + }); + } } } } diff --git 
a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index ff5f0b73482cc..34194fb6cf64b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -195,16 +195,6 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) ); } - public void deleteTranslogAsync(long primaryTerm, long generation) throws IOException { - String ckpFileName = Translog.getCommitCheckpointFileName(generation); - String translogFilename = Translog.getFilename(generation); - // ToDo - Take care of metadata file cleanup - // https://github.com/opensearch-project/OpenSearch/issues/5677 - List files = List.of(ckpFileName, translogFilename); - fileTransferTracker.delete(files); - transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files); - } - /** * This method handles deletion of multiple generations for a single primary term. * TODO: Take care of metadata file cleanup. Github Issue #5677 @@ -230,12 +220,25 @@ public void onResponse(Void unused) { @Override public void onFailure(Exception e) { - logger.info("Exception occurred while deleting translog for primary_term={} generations={}", primaryTerm, generations, e); + logger.error( + () -> new ParameterizedMessage( + "Exception occurred while deleting translog for primary_term={} generations={}", + primaryTerm, + generations + ), + e + ); } }); } - public void cleanTranslogAsync(long primaryTerm, ActionListener listener) { + /** + * Handles deletion on translog files for a particular primary term. 
+ * + * @param primaryTerm primary term + * @param listener listener for response and failure + */ + public void deleteTranslogAsync(long primaryTerm, ActionListener listener) { transferService.deleteAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), listener); } } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 7bd054233d7ba..70752b7ae31c8 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -130,7 +130,7 @@ private LongConsumer getPersistedSeqNoConsumer() { @Before public void setUp() throws Exception { super.setUp(); - primaryTerm.set(randomLongBetween(1, Integer.MAX_VALUE)); + primaryTerm.set(randomLongBetween(1, 8)); // if a previous test failed we clean up things here translogDir = createTempDir(); translog = create(translogDir); @@ -465,7 +465,7 @@ public void testRangeSnapshot() throws Exception { } } - public void testSimpleOperationsUpload() throws IOException { + public void testSimpleOperationsUpload() throws Exception { ArrayList ops = new ArrayList<>(); try (Translog.Snapshot snapshot = translog.newSnapshot()) { assertThat(snapshot, SnapshotMatchers.size(0)); @@ -532,16 +532,18 @@ public void testSimpleOperationsUpload() throws IOException { // This should not trim anything from local translog.trimUnreferencedReaders(); assertEquals(2, translog.readers.size()); - assertEquals(4, translog.allUploaded().size()); - assertEquals( - 4, - blobStoreTransferService.listAll( - repository.basePath() - .add(shardId.getIndex().getUUID()) - .add(String.valueOf(shardId.id())) - .add(String.valueOf(primaryTerm.get())) - ).size() - ); + assertBusy(() -> { + assertEquals(4, translog.allUploaded().size()); + assertEquals( + 4, + blobStoreTransferService.listAll( + repository.basePath() + 
.add(shardId.getIndex().getUUID()) + .add(String.valueOf(shardId.id())) + .add(String.valueOf(primaryTerm.get())) + ).size() + ); + }); // This should trim tlog-2 from local // This should not trim tlog-2.* files from remote as we not uploading any more translog to remote @@ -549,16 +551,18 @@ public void testSimpleOperationsUpload() throws IOException { translog.deletionPolicy.setLocalCheckpointOfSafeCommit(1); translog.trimUnreferencedReaders(); assertEquals(1, translog.readers.size()); - assertEquals(4, translog.allUploaded().size()); - assertEquals( - 4, - blobStoreTransferService.listAll( - repository.basePath() - .add(shardId.getIndex().getUUID()) - .add(String.valueOf(shardId.id())) - .add(String.valueOf(primaryTerm.get())) - ).size() - ); + assertBusy(() -> { + assertEquals(4, translog.allUploaded().size()); + assertEquals( + 4, + blobStoreTransferService.listAll( + repository.basePath() + .add(shardId.getIndex().getUUID()) + .add(String.valueOf(shardId.id())) + .add(String.valueOf(primaryTerm.get())) + ).size() + ); + }); // this should now trim as tlog-2 files from remote, but not tlog-3 and tlog-4 addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); @@ -566,7 +570,7 @@ public void testSimpleOperationsUpload() throws IOException { translog.setMinSeqNoToKeep(2); translog.trimUnreferencedReaders(); assertEquals(1, translog.readers.size()); - assertEquals(4, translog.allUploaded().size()); + assertBusy(() -> assertEquals(4, translog.allUploaded().size())); } private Long populateTranslogOps(boolean withMissingOps) throws IOException { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 48d24db798b4d..42bed8f7e4466 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ 
b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -27,7 +27,6 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -301,18 +300,4 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException { tracker.add(checkpointFile, true); } - - public void testDeleteTranslog() throws IOException { - FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); - TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, tracker); - String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; - tracker.add(translogFile, true); - tracker.add(checkpointFile, true); - assertEquals(2, tracker.allUploaded().size()); - - List files = List.of(checkpointFile, translogFile); - translogTransferManager.deleteTranslogAsync(primaryTerm, 19); - assertEquals(0, tracker.allUploaded().size()); - verify(transferService).deleteBlobs(any(BlobPath.class), eq(files)); - } } From 2d00f97f3f158e3a88d52ac2299e06c974b628e4 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 31 Jan 2023 14:36:11 +0530 Subject: [PATCH 07/15] Add UTs for purging remote translog in async Signed-off-by: Ashish Singh --- .../TranslogTransferManagerTests.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 42bed8f7e4466..0abbfcd3eb69c 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -12,7 +12,10 @@ import org.apache.lucene.util.Constants; 
import org.mockito.Mockito; import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; @@ -27,6 +30,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -298,6 +302,54 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException { // Since the tracker already holds the files with success state, adding them with success state is allowed tracker.add(translogFile, true); tracker.add(checkpointFile, true); + } + + public void testDeleteTranslogSuccess() throws Exception { + FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); + BlobStore blobStore = mock(BlobStore.class); + BlobContainer blobContainer = mock(BlobContainer.class); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService( + blobStore, + OpenSearchExecutors.newDirectExecutorService() + ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + blobStoreTransferService, + remoteBaseTransferPath, + tracker + ); + String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; + tracker.add(translogFile, true); + tracker.add(checkpointFile, true); + assertEquals(2, tracker.allUploaded().size()); + + List files = List.of(checkpointFile, translogFile); + translogTransferManager.deleteTranslogAsync(primaryTerm, Set.of(19L)); + assertBusy(() -> assertEquals(0, tracker.allUploaded().size())); + 
verify(blobContainer).deleteBlobsIgnoringIfNotExists(eq(files)); + } + + public void testDeleteTranslogFailure() throws Exception { + FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); + BlobStore blobStore = mock(BlobStore.class); + BlobContainer blobContainer = mock(BlobContainer.class); + doAnswer(invocation -> { throw new IOException("test exception"); }).when(blobStore).blobContainer(any(BlobPath.class)); + // when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService( + blobStore, + OpenSearchExecutors.newDirectExecutorService() + ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + blobStoreTransferService, + remoteBaseTransferPath, + tracker + ); + String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; + tracker.add(translogFile, true); + tracker.add(checkpointFile, true); + assertEquals(2, tracker.allUploaded().size()); + translogTransferManager.deleteTranslogAsync(primaryTerm, Set.of(19L)); + assertEquals(2, tracker.allUploaded().size()); } } From e618ba63ebf561cac849e1d768f94a03653d7457 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 31 Jan 2023 17:20:23 +0530 Subject: [PATCH 08/15] Incorporate PR review feedback Signed-off-by: Ashish Singh --- .../org/opensearch/index/translog/RemoteFsTranslog.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index ab18c168271bb..55ab2ad944f3b 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -338,11 +338,11 @@ protected void setMinSeqNoToKeep(long seqNo) { this.minSeqNoToKeep = seqNo; } - private void deleteRemoteGeneration(Set 
generationsToDelete) { + private void deleteRemoteGeneration(Set generations) { try { - translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generationsToDelete); + translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generations); } catch (IOException e) { - logger.error(() -> new ParameterizedMessage("Exception occurred while deleting generation {}", generationsToDelete), e); + logger.error(() -> new ParameterizedMessage("Exception occurred while deleting generation {}", generations), e); } } From c3b90f545e5113bdf87a099d4f6dad3e90b763be Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 31 Jan 2023 18:41:36 +0530 Subject: [PATCH 09/15] Empty-Commit Signed-off-by: Ashish Singh From b5c0f7f88d69b769302c9d426a10ef0c01618673 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 1 Feb 2023 01:16:19 +0530 Subject: [PATCH 10/15] Smarten deletion logic of remote translog Signed-off-by: Ashish Singh --- .../index/translog/RemoteFsTranslog.java | 69 ++++++++++++++----- .../transfer/BlobStoreTransferService.java | 5 ++ .../translog/transfer/TransferService.java | 8 +++ .../transfer/TranslogTransferManager.java | 20 +++++- 4 files changed, 83 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 55ab2ad944f3b..526c6e2607d3b 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -31,11 +31,14 @@ import java.nio.file.Path; import java.util.HashSet; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; +import java.util.stream.Collectors; +import java.util.stream.LongStream; /** * A 
Translog implementation which syncs local FS with a remote store @@ -346,6 +349,7 @@ private void deleteRemoteGeneration(Set generations) { } } + @Override public void trimUnreferencedReaders() throws IOException { // clean up local translog files and updates readers super.trimUnreferencedReaders(); @@ -369,26 +373,55 @@ private void deleteOlderPrimaryTranslogFilesFromRemoteStore() { // of older primary term. if (olderPrimaryCleaned.trySet(Boolean.TRUE)) { logger.info("Cleaning up translog uploaded by previous primaries"); - for (long oldPrimaryTerm = current.getPrimaryTerm() - 1; oldPrimaryTerm >= 0; oldPrimaryTerm--) { - long finalOldPrimaryTerm = oldPrimaryTerm; - translogTransferManager.deleteTranslogAsync(oldPrimaryTerm, new ActionListener<>() { - @Override - public void onResponse(Void response) { - // NO-OP - } + long minPrimaryTermInMetadata = getMinPrimaryTermInMetadata(); + if (minPrimaryTermInMetadata == -1) { + // We should keep all primary terms since we can not determine the minimum primary term referenced by the metadata file + return; + } + Set primaryTermsInRemote = getPrimaryTermsInRemote(); + // Delete all primary terms that are no more referenced by the metadata file and exists in the + Set primaryTermsToDelete = primaryTermsInRemote.stream() + .filter(term -> term < minPrimaryTermInMetadata) + .collect(Collectors.toSet()); + primaryTermsToDelete.forEach(term -> translogTransferManager.deleteTranslogAsync(term, new ActionListener<>() { + @Override + public void onResponse(Void response) { + // NO-OP + } - @Override - public void onFailure(Exception e) { - logger.error( - () -> new ParameterizedMessage( - "Exception occurred while deleting older translog files for primary_term={}", - finalOldPrimaryTerm - ), - e - ); - } - }); + @Override + public void onFailure(Exception e) { + logger.error( + () -> new ParameterizedMessage("Exception occurred while deleting older translog files for primary_term={}", term), + e + ); + } + })); + } + } + + 
private Set getPrimaryTermsInRemote() { + try { + return translogTransferManager.listPrimaryTerms(); + } catch (IOException e) { + logger.error("Exception occurred while getting oldest primary term in remote store", e); + } + return LongStream.range(0, current.getPrimaryTerm()).boxed().collect(Collectors.toSet()); + } + + private long getMinPrimaryTermInMetadata() { + long minPrimaryTerm = -1; + try { + TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata(); + if (translogMetadata != null && translogMetadata.getGenerationToPrimaryTermMapper().isEmpty() == false) { + OptionalLong min = translogMetadata.getGenerationToPrimaryTermMapper().values().stream().mapToLong(Long::valueOf).min(); + if (min.isPresent()) { + minPrimaryTerm = min.getAsLong(); + } } + } catch (IOException e) { + logger.error("Exception occurred while getting max primary term in remote translog metadata", e); } + return minPrimaryTerm; } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 353a6ecd4f96d..08a98a491a035 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -109,4 +109,9 @@ public void deleteAsync(Iterable path, ActionListener listener) { public Set listAll(Iterable path) throws IOException { return blobStore.blobContainer((BlobPath) path).listBlobs().keySet(); } + + @Override + public Set listFolders(Iterable path) throws IOException { + return blobStore.blobContainer((BlobPath) path).children().keySet(); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 9d160bb1a39e1..5ba15ad01d44e 100644 --- 
a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -57,6 +57,14 @@ void uploadBlobAsync( */ Set listAll(Iterable path) throws IOException; + /** + * Lists the folders inside the path. + * @param path : the path + * @return list of folders inside the path + * @throws IOException the exception while listing folders inside the path + */ + Set listFolders(Iterable path) throws IOException; + /** * * @param path the remote path from where download should be made diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 34194fb6cf64b..48331f6528606 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -233,7 +233,7 @@ public void onFailure(Exception e) { } /** - * Handles deletion on translog files for a particular primary term. + * Handles deletion of translog files for a particular primary term. * * @param primaryTerm primary term * @param listener listener for response and failure @@ -241,4 +241,22 @@ public void onFailure(Exception e) { public void deleteTranslogAsync(long primaryTerm, ActionListener listener) { transferService.deleteAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), listener); } + + /** + * Lists all primary terms existing on remote store. + * + * @return the list of primary terms. + * @throws IOException is thrown if it cannot read the data.
+ */ + public Set listPrimaryTerms() throws IOException { + return transferService.listFolders(remoteBaseTransferPath).stream().filter(s -> { + try { + Long.parseLong(s); + return true; + } catch (Exception ignored) { + // NO-OP + } + return false; + }).map(Long::parseLong).collect(Collectors.toSet()); + } } From 0942b2d0309784768f0385e599ead5b61b9eb4bf Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 1 Feb 2023 17:21:44 +0530 Subject: [PATCH 11/15] Empty-Commit Signed-off-by: Ashish Singh From 6ab321a88de7b37b6509f4fa3cc6369caee61b2a Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 2 Feb 2023 10:58:14 +0530 Subject: [PATCH 12/15] Delete older primary terms if valid generations to delete Signed-off-by: Ashish Singh --- .../index/translog/RemoteFsTranslog.java | 33 +++++-------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 526c6e2607d3b..c2640a0e7e0bc 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -31,7 +31,6 @@ import java.nio.file.Path; import java.util.HashSet; import java.util.Map; -import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.BooleanSupplier; @@ -363,21 +362,23 @@ public void trimUnreferencedReaders() throws IOException { } generationsToDelete.add(generation); } - deleteRemoteGeneration(generationsToDelete); - deleteOlderPrimaryTranslogFilesFromRemoteStore(); + if (generationsToDelete.isEmpty() == false) { + deleteRemoteGeneration(generationsToDelete); + deleteOlderPrimaryTranslogFilesFromRemoteStore(); + } } + /** + * This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures + * implicitly that minimum primary term in 
latest translog metadata in remote store is the current primary term. + */ private void deleteOlderPrimaryTranslogFilesFromRemoteStore() { // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part // of older primary term. if (olderPrimaryCleaned.trySet(Boolean.TRUE)) { logger.info("Cleaning up translog uploaded by previous primaries"); - long minPrimaryTermInMetadata = getMinPrimaryTermInMetadata(); - if (minPrimaryTermInMetadata == -1) { - // We should keep all primary terms since we can not determine the minimum primary term referenced by the metadata file - return; - } + long minPrimaryTermInMetadata = current.getPrimaryTerm(); Set primaryTermsInRemote = getPrimaryTermsInRemote(); // Delete all primary terms that are no more referenced by the metadata file and exists in the Set primaryTermsToDelete = primaryTermsInRemote.stream() @@ -408,20 +409,4 @@ private Set getPrimaryTermsInRemote() { } return LongStream.range(0, current.getPrimaryTerm()).boxed().collect(Collectors.toSet()); } - - private long getMinPrimaryTermInMetadata() { - long minPrimaryTerm = -1; - try { - TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata(); - if (translogMetadata != null && translogMetadata.getGenerationToPrimaryTermMapper().isEmpty() == false) { - OptionalLong min = translogMetadata.getGenerationToPrimaryTermMapper().values().stream().mapToLong(Long::valueOf).min(); - if (min.isPresent()) { - minPrimaryTerm = min.getAsLong(); - } - } - } catch (IOException e) { - logger.error("Exception occurred while getting max primary term in remote translog metadata", e); - } - return minPrimaryTerm; - } } From ff5de9f159805364674543b998d4f17653032407 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 2 Feb 2023 11:05:42 +0530 Subject: [PATCH 13/15] Incorporated PR review feedback - Reword 
log message Signed-off-by: Ashish Singh --- .../java/org/opensearch/index/translog/RemoteFsTranslog.java | 2 +- .../org/opensearch/index/translog/RemoteFSTranslogTests.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index c2640a0e7e0bc..53534af115dab 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -405,7 +405,7 @@ private Set getPrimaryTermsInRemote() { try { return translogTransferManager.listPrimaryTerms(); } catch (IOException e) { - logger.error("Exception occurred while getting oldest primary term in remote store", e); + logger.error("Exception occurred while getting primary terms from remote store", e); } return LongStream.range(0, current.getPrimaryTerm()).boxed().collect(Collectors.toSet()); } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 70752b7ae31c8..cb3affb71b3dc 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -130,7 +130,7 @@ private LongConsumer getPersistedSeqNoConsumer() { @Before public void setUp() throws Exception { super.setUp(); - primaryTerm.set(randomLongBetween(1, 8)); + primaryTerm.set(randomLongBetween(1, Integer.MAX_VALUE)); // if a previous test failed we clean up things here translogDir = createTempDir(); translog = create(translogDir); From 7dd90b948f3d24920d22d21362b66d26e352b9c4 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 2 Feb 2023 19:15:39 +0530 Subject: [PATCH 14/15] Incorporate PR feedback - Make list primary terms inline Signed-off-by: Ashish Singh --- 
.../index/translog/RemoteFsTranslog.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 53534af115dab..a3a6eba39e126 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -379,7 +379,15 @@ private void deleteOlderPrimaryTranslogFilesFromRemoteStore() { if (olderPrimaryCleaned.trySet(Boolean.TRUE)) { logger.info("Cleaning up translog uploaded by previous primaries"); long minPrimaryTermInMetadata = current.getPrimaryTerm(); - Set primaryTermsInRemote = getPrimaryTermsInRemote(); + Set primaryTermsInRemote; + try { + primaryTermsInRemote = translogTransferManager.listPrimaryTerms(); + } catch (IOException e) { + logger.error("Exception occurred while getting primary terms from remote store", e); + // If there are exceptions encountered, then we try to delete all older primary terms lesser than the + // minimum referenced primary term in remote translog metadata. 
+ primaryTermsInRemote = LongStream.range(0, current.getPrimaryTerm()).boxed().collect(Collectors.toSet()); + } // Delete all primary terms that are no more referenced by the metadata file and exists in the Set primaryTermsToDelete = primaryTermsInRemote.stream() .filter(term -> term < minPrimaryTermInMetadata) @@ -400,13 +408,4 @@ public void onFailure(Exception e) { })); } } - - private Set getPrimaryTermsInRemote() { - try { - return translogTransferManager.listPrimaryTerms(); - } catch (IOException e) { - logger.error("Exception occurred while getting primary terms from remote store", e); - } - return LongStream.range(0, current.getPrimaryTerm()).boxed().collect(Collectors.toSet()); - } } From df2b6dd142d40d5c801c00ed3e9ad868ba27a9d8 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 2 Feb 2023 21:08:56 +0530 Subject: [PATCH 15/15] Empty-Commit Signed-off-by: Ashish Singh