From c1aef2fb564bd6d86b3c698a0e4eec6e393cc430 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sat, 16 Sep 2023 00:17:52 +0530 Subject: [PATCH 1/3] Optimize read write lock constructs during translog upload to remote store Signed-off-by: Ashish Singh --- .../index/translog/RemoteFsTranslog.java | 74 ++++++++++++------- .../TranslogCheckpointTransferSnapshot.java | 1 - .../transfer/TranslogTransferManager.java | 2 + .../listener/TranslogTransferListener.java | 4 + .../index/translog/RemoteFsTranslogTests.java | 1 - 5 files changed, 54 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index c488127857ed5..6b631b68e592d 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -60,6 +60,7 @@ public class RemoteFsTranslog extends Translog { private final BooleanSupplier primaryModeSupplier; private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; private volatile long maxRemoteTranslogGenerationUploaded; + private final Object uploadMutex = new Object(); private volatile long minSeqNoToKeep; @@ -237,11 +238,20 @@ public static TranslogTransferManager buildTranslogTransferManager( @Override public boolean ensureSynced(Location location) throws IOException { - try (ReleasableLock ignored = writeLock.acquire()) { - assert location.generation <= current.getGeneration(); - if (location.generation == current.getGeneration()) { - ensureOpen(); - return prepareAndUpload(primaryTermSupplier.getAsLong(), location.generation); + try { + boolean shouldUpload = false; + try (ReleasableLock ignored = writeLock.acquire()) { + assert location.generation <= current.getGeneration(); + if (location.generation == current.getGeneration()) { + ensureOpen(); + if (prepareForUpload(location.generation) == false) { + return false; + } + shouldUpload = true; + } + } + if (shouldUpload) { + return performUpload(primaryTermSupplier.getAsLong(), location.generation); } } catch (final Exception ex) { closeOnTragicEvent(ex); @@ -256,10 +266,12 @@ public void rollGeneration() throws IOException { if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) { return; } - prepareAndUpload(primaryTermSupplier.getAsLong(), null); + if (prepareForUpload(null)) { + performUpload(primaryTermSupplier.getAsLong(), null); + } } - private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException { + private boolean prepareForUpload(Long generation) throws IOException { try (Releasable ignored = writeLock.acquire()) { if (generation == null || generation == current.getGeneration()) { try { @@ -275,23 +287,30 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc closeOnTragicEvent(e); throw e; } - } else if (generation < current.getGeneration()) { - return false; - } + return true; + } else return generation >= current.getGeneration(); + } + } - // Do we need remote writes in sync fashion ? - // If we don't , we should swallow FileAlreadyExistsException while writing to remote store - // and also verify for same during primary-primary relocation - // Writing remote in sync fashion doesn't hurt as global ckp update - // is not updated in remote translog except in primary to primary recovery. - if (generation == null) { - if (closed.get() == false) { - return upload(primaryTerm, current.getGeneration() - 1); + private boolean performUpload(Long primaryTerm, Long generation) throws IOException { + synchronized (uploadMutex) { + try (Releasable ignored = readLock.acquire()) { + // Do we need remote writes in sync fashion ? + // If we don't , we should swallow FileAlreadyExistsException while writing to remote store + // and also verify for same during primary-primary relocation + // Writing remote in sync fashion doesn't hurt as global ckp update + // is not updated in remote translog except in primary to primary recovery. + long generationToUpload; + if (generation == null) { + if (closed.get() == false) { + generationToUpload = current.getGeneration() - 1; + } else { + generationToUpload = current.getGeneration(); + } } else { - return upload(primaryTerm, current.getGeneration()); + generationToUpload = generation; } - } else { - return upload(primaryTerm, generation); + return upload(primaryTerm, generationToUpload); } } } @@ -344,7 +363,9 @@ private boolean syncToDisk() throws IOException { public void sync() throws IOException { try { if (syncToDisk() || syncNeeded()) { - prepareAndUpload(primaryTermSupplier.getAsLong(), null); + if (prepareForUpload(null)) { + performUpload(primaryTermSupplier.getAsLong(), null); + } } } catch (final Exception e) { tragedy.setTragicException(e); @@ -528,8 +549,6 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen @Override public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException { - transferReleasable.close(); - closeFilesIfNoPendingRetentionLocks(); maxRemoteTranslogGenerationUploaded = generation; minRemoteGenReferenced = getMinFileGeneration(); logger.trace("uploaded translog for {} {} ", primaryTerm, generation); @@ -537,13 +556,16 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti @Override public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException { - transferReleasable.close(); - closeFilesIfNoPendingRetentionLocks(); if (ex instanceof IOException) { throw (IOException) ex; } else { throw (RuntimeException) ex; } } + + @Override + public void close() { + transferReleasable.close(); + } } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index 10dec13c81e1a..eb0eebb564b63 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -164,7 +164,6 @@ public TranslogCheckpointTransferSnapshot build() throws IOException { translogTransferSnapshot.setMinTranslogGeneration(highestGenMinTranslogGeneration); assert this.primaryTerm == highestGenPrimaryTerm : "inconsistent primary term"; - assert this.generation == highestGeneration : " inconsistent generation "; final long finalHighestGeneration = highestGeneration; assert LongStream.iterate(lowestGeneration, i -> i + 1) .limit(highestGeneration) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index fd4936603671c..9764d9fcd68b5 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -191,6 +191,8 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans captureStatsOnUploadFailure(); translogTransferListener.onUploadFailed(transferSnapshot, ex); return false; + } finally { + translogTransferListener.close(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java index 132d1adf916da..3080fd05ab93c 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java @@ -32,4 +32,8 @@ public interface TranslogTransferListener { * @throws IOException the exception during the transfer of data */ void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException; + + default void close() { + + } } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index de1b2990f0a50..3c654818ffc6c 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -108,7 +108,6 @@ import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems("ExtrasFS") - public class RemoteFsTranslogTests extends OpenSearchTestCase { protected final ShardId shardId = new ShardId("index", "_na_", 1); From e74b1c1596db0554992fa2fff09f3915c569b870 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 18 Sep 2023 10:53:45 +0530 Subject: [PATCH 2/3] Incorporate PR review comments Signed-off-by: Ashish Singh --- .../index/translog/RemoteFsTranslog.java | 17 +++++++++++++---- .../listener/TranslogTransferListener.java | 7 ++----- .../transfer/TranslogTransferManagerTests.java | 3 +++ 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 6b631b68e592d..7d0b670862a8d 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -292,6 +292,17 @@ private boolean prepareForUpload(Long generation) throws IOException { } } + /** + * This method does the remote store upload by first acquiring the lock on the uploadMutex monitor. The synchronized + * is required to restrict multiple uploads happening concurrently. The read lock is required to ensure that the + * underlying translog readers are not deleted and the current writer is not converted to a reader at the time of + * upload. + * + * @param primaryTerm current primary term + * @param generation current generation + * @return true if upload is successful + * @throws IOException if the upload fails due to any underlying exceptions. + */ private boolean performUpload(Long primaryTerm, Long generation) throws IOException { synchronized (uploadMutex) { try (Releasable ignored = readLock.acquire()) { @@ -362,10 +373,8 @@ private boolean syncToDisk() throws IOException { @Override public void sync() throws IOException { try { - if (syncToDisk() || syncNeeded()) { - if (prepareForUpload(null)) { - performUpload(primaryTermSupplier.getAsLong(), null); - } + if (syncToDisk() || syncNeeded() || prepareForUpload(null)) { + performUpload(primaryTermSupplier.getAsLong(), null); } } catch (final Exception e) { tragedy.setTragicException(e); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java index 3080fd05ab93c..d63440bfac363 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java @@ -10,6 +10,7 @@ import org.opensearch.index.translog.transfer.TransferSnapshot; +import java.io.Closeable; import java.io.IOException; /** @@ -17,7 +18,7 @@ * * @opensearch.internal */ -public interface TranslogTransferListener { +public interface TranslogTransferListener extends Closeable { /** * Invoked when the transfer of {@link TransferSnapshot} succeeds * @param transferSnapshot the transfer snapshot @@ -32,8 +33,4 @@ public interface TranslogTransferListener { * @throws IOException the exception during the transfer of data */ void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException; - - default void close() { - - } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 6fc4557a75675..0987201eb8602 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -168,6 +168,9 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) { public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { translogTransferFailed.incrementAndGet(); } + + @Override + public void close() {} })); assertEquals(4, fileTransferSucceeded.get()); assertEquals(0, fileTransferFailed.get()); From 5595fc2ae2ee90065b2b2cff4a7c7142b604b671 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 18 Sep 2023 13:30:04 +0530 Subject: [PATCH 3/3] Incorporate PR review comments Signed-off-by: Ashish Singh --- .../java/org/opensearch/index/translog/RemoteFsTranslog.java | 2 +- .../index/translog/transfer/TranslogTransferManager.java | 4 +--- .../translog/transfer/listener/TranslogTransferListener.java | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 7d0b670862a8d..a633cb0787f27 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -373,7 +373,7 @@ private boolean syncToDisk() throws IOException { @Override public void sync() throws IOException { try { - if (syncToDisk() || syncNeeded() || prepareForUpload(null)) { + if ((syncToDisk() || syncNeeded()) && prepareForUpload(null)) { performUpload(primaryTermSupplier.getAsLong(), null); } } catch (final Exception e) { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 9764d9fcd68b5..fe6b5dab9937b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -109,7 +109,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans long prevUploadBytesSucceeded = remoteTranslogTransferTracker.getUploadBytesSucceeded(); long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis(); - try { + try (translogTransferListener) { toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); if (toUpload.isEmpty()) { @@ -191,8 +191,6 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans captureStatsOnUploadFailure(); translogTransferListener.onUploadFailed(transferSnapshot, ex); return false; - } finally { - translogTransferListener.close(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java index d63440bfac363..8805c16298d96 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java @@ -10,7 +10,6 @@ import org.opensearch.index.translog.transfer.TransferSnapshot; -import java.io.Closeable; import java.io.IOException; /** @@ -18,7 +17,7 @@ * * @opensearch.internal */ -public interface TranslogTransferListener extends Closeable { +public interface TranslogTransferListener extends AutoCloseable { /** * Invoked when the transfer of {@link TransferSnapshot} succeeds * @param transferSnapshot the transfer snapshot