From 84be8c9207cf1153b2eb8dfaf77cf737959781cc Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Mon, 30 Oct 2023 12:55:01 +0530 Subject: [PATCH] Use async write for manifest file and use latch for timeout (#10968) * Use async write for manifest file and use latch for timeout Signed-off-by: Dhwanil Patel --- .../common/settings/ClusterSettings.java | 1 + .../remote/RemoteClusterStateService.java | 91 +++++++++++++------ .../RemoteClusterStateServiceTests.java | 84 ++++++++++++++--- 3 files changed, 137 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index c2c6effc3336f..3a1fff21db366 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -684,6 +684,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, + RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 205ae12cf6214..c892b475d71da 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -87,6 +87,8 @@ public class RemoteClusterStateService implements Closeable { public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); + public static final TimeValue METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); + public static final Setting INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting( "cluster.remote_store.state.index_metadata.upload_timeout", INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT, @@ -101,6 +103,13 @@ public class RemoteClusterStateService implements Closeable { Setting.Property.NodeScope ); + public static final Setting METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.state.metadata_manifest.upload_timeout", + METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "index-metadata", METADATA_NAME_FORMAT, @@ -157,6 +166,7 @@ public class RemoteClusterStateService implements Closeable { private volatile TimeValue indexMetadataUploadTimeout; private volatile TimeValue globalMetadataUploadTimeout; + private volatile TimeValue metadataManifestUploadTimeout; private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); private final RemotePersistenceStats remoteStateStats; @@ -190,9 +200,11 @@ public RemoteClusterStateService( this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING); + this.metadataManifestUploadTimeout = clusterSettings.get(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); + clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); } @@ -401,13 +413,13 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException try { if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { // TODO: We should add metrics where transfer is timing out. [Issue: #10687] - GlobalMetadataTransferException ex = new GlobalMetadataTransferException( + RemoteStateTransferException ex = new RemoteStateTransferException( String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete") ); throw ex; } } catch (InterruptedException ex) { - GlobalMetadataTransferException exception = new GlobalMetadataTransferException( + RemoteStateTransferException exception = new RemoteStateTransferException( String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete - %s"), ex ); @@ -415,7 +427,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException throw exception; } if (exceptionReference.get() != null) { - throw new GlobalMetadataTransferException(exceptionReference.get().getMessage(), exceptionReference.get()); + throw new RemoteStateTransferException(exceptionReference.get().getMessage(), exceptionReference.get()); } return result.get(); } @@ -440,7 +452,7 @@ private List writeIndexMetadataParallel(ClusterState clus ); result.add(uploadedIndexMetadata); }, ex -> { - assert ex instanceof IndexMetadataTransferException; + assert ex instanceof RemoteStateTransferException; logger.error( () -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()), ex @@ -457,7 +469,7 @@ private List writeIndexMetadataParallel(ClusterState clus try { if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { - IndexMetadataTransferException ex = new IndexMetadataTransferException( + RemoteStateTransferException ex = new RemoteStateTransferException( String.format( Locale.ROOT, "Timed out waiting for transfer of index metadata to complete - %s", @@ -469,7 +481,7 @@ private List writeIndexMetadataParallel(ClusterState clus } } catch (InterruptedException ex) { exceptionList.forEach(ex::addSuppressed); - IndexMetadataTransferException exception = new IndexMetadataTransferException( + RemoteStateTransferException exception = new RemoteStateTransferException( String.format( Locale.ROOT, "Timed out waiting for transfer of index metadata to complete - %s", @@ -481,7 +493,7 @@ private List writeIndexMetadataParallel(ClusterState clus throw exception; } if (exceptionList.size() > 0) { - IndexMetadataTransferException exception = new IndexMetadataTransferException( + RemoteStateTransferException exception = new RemoteStateTransferException( String.format( Locale.ROOT, "Exception during transfer of IndexMetadata to Remote %s", @@ -520,7 +532,7 @@ private void writeIndexMetadataAsync( indexMetadataContainer.path().buildAsString() + indexMetadataFilename ) ), - ex -> latchedActionListener.onFailure(new IndexMetadataTransferException(indexMetadata.getIndex().toString(), ex)) + ex -> latchedActionListener.onFailure(new RemoteStateTransferException(indexMetadata.getIndex().toString(), ex)) ); INDEX_METADATA_FORMAT.writeAsyncWithUrgentPriority( @@ -601,14 +613,45 @@ private ClusterMetadataManifest uploadManifest( private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName) throws IOException { + AtomicReference result = new AtomicReference(); + AtomicReference exceptionReference = new AtomicReference(); + final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID); - CLUSTER_METADATA_MANIFEST_FORMAT.write( + + // latch to wait until upload is not finished + CountDownLatch latch = new CountDownLatch(1); + + LatchedActionListener completionListener = new LatchedActionListener<>(ActionListener.wrap(resp -> { + logger.trace(String.format(Locale.ROOT, "Manifest file uploaded successfully.")); + }, ex -> { exceptionReference.set(ex); }), latch); + + CLUSTER_METADATA_MANIFEST_FORMAT.writeAsyncWithUrgentPriority( uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor(), + completionListener, FORMAT_PARAMS ); + + try { + if (latch.await(getMetadataManifestUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { + RemoteStateTransferException ex = new RemoteStateTransferException( + String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete") + ); + throw ex; + } + } catch (InterruptedException ex) { + RemoteStateTransferException exception = new RemoteStateTransferException( + String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete - %s"), + ex + ); + Thread.currentThread().interrupt(); + throw exception; + } + if (exceptionReference.get() != null) { + throw new RemoteStateTransferException(exceptionReference.get().getMessage(), exceptionReference.get()); + } logger.debug( "Metadata manifest file [{}] written during [{}] phase. ", fileName, @@ -668,6 +711,10 @@ private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTim this.globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout; } + private void setMetadataManifestUploadTimeout(TimeValue newMetadataManifestUploadTimeout) { + this.metadataManifestUploadTimeout = newMetadataManifestUploadTimeout; + } + public TimeValue getIndexMetadataUploadTimeout() { return this.indexMetadataUploadTimeout; } @@ -676,6 +723,10 @@ public TimeValue getGlobalMetadataUploadTimeout() { return this.globalMetadataUploadTimeout; } + public TimeValue getMetadataManifestUploadTimeout() { + return this.metadataManifestUploadTimeout; + } + static String getManifestFileName(long term, long version, boolean committed) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest______C/P____ return String.join( @@ -1088,29 +1139,15 @@ public void writeMetadataFailed() { } /** - * Exception for IndexMetadata transfer failures to remote - */ - static class IndexMetadataTransferException extends RuntimeException { - - public IndexMetadataTransferException(String errorDesc) { - super(errorDesc); - } - - public IndexMetadataTransferException(String errorDesc, Throwable cause) { - super(errorDesc, cause); - } - } - - /** - * Exception for GlobalMetadata transfer failures to remote + * Exception for Remote state transfer. */ - static class GlobalMetadataTransferException extends RuntimeException { + static class RemoteStateTransferException extends RuntimeException { - public GlobalMetadataTransferException(String errorDesc) { + public RemoteStateTransferException(String errorDesc) { super(errorDesc); } - public GlobalMetadataTransferException(String errorDesc, Throwable cause) { + public RemoteStateTransferException(String errorDesc, Throwable cause) { super(errorDesc, cause); } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 4efd1b8a62970..65477051cdb30 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -68,6 +68,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Stream; @@ -230,10 +231,17 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); ArgumentCaptor writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class); - + AtomicReference capturedWriteContext = new AtomicReference<>(); doAnswer((i) -> { actionListenerArgumentCaptor.getValue().onResponse(null); return null; + }).doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onResponse(null); + capturedWriteContext.set(writeContextArgumentCaptor.getValue()); + return null; + }).doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onResponse(null); + return null; }).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); remoteClusterStateService.start(); @@ -262,27 +270,30 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); - assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 2); - assertEquals(writeContextArgumentCaptor.getAllValues().size(), 2); + assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 3); + assertEquals(writeContextArgumentCaptor.getAllValues().size(), 3); - WriteContext capturedWriteContext = writeContextArgumentCaptor.getValue(); - byte[] writtenBytes = capturedWriteContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream().readAllBytes(); + byte[] writtenBytes = capturedWriteContext.get() + .getStreamProvider(Integer.MAX_VALUE) + .provideStream(0) + .getInputStream() + .readAllBytes(); IndexMetadata writtenIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.deserialize( - capturedWriteContext.getFileName(), + capturedWriteContext.get().getFileName(), blobStoreRepository.getNamedXContentRegistry(), new BytesArray(writtenBytes) ); - assertEquals(capturedWriteContext.getWritePriority(), WritePriority.URGENT); + assertEquals(capturedWriteContext.get().getWritePriority(), WritePriority.URGENT); assertEquals(writtenIndexMetadata.getNumberOfShards(), 1); assertEquals(writtenIndexMetadata.getNumberOfReplicas(), 0); assertEquals(writtenIndexMetadata.getIndex().getName(), "test-index"); assertEquals(writtenIndexMetadata.getIndex().getUUID(), "index-uuid"); long expectedChecksum = RemoteTransferContainer.checksumOfChecksum(new ByteArrayIndexInput("metadata-filename", writtenBytes), 8); - if (capturedWriteContext.doRemoteDataIntegrityCheck()) { - assertEquals(capturedWriteContext.getExpectedChecksum().longValue(), expectedChecksum); + if (capturedWriteContext.get().doRemoteDataIntegrityCheck()) { + assertEquals(capturedWriteContext.get().getExpectedChecksum().longValue(), expectedChecksum); } else { - assertEquals(capturedWriteContext.getExpectedChecksum(), null); + assertEquals(capturedWriteContext.get().getExpectedChecksum(), null); } } @@ -306,11 +317,44 @@ public void run() { remoteClusterStateService.start(); assertThrows( - RemoteClusterStateService.GlobalMetadataTransferException.class, + RemoteClusterStateService.RemoteStateTransferException.class, () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) ); } + public void testTimeoutWhileWritingManifestFile() throws IOException { + // verify update metadata manifest upload timeout + int metadataManifestUploadTimeout = 2; + Settings newSettings = Settings.builder() + .put("cluster.remote_store.state.metadata_manifest.upload_timeout", metadataManifestUploadTimeout + "s") + .build(); + clusterSettings.applySettings(newSettings); + + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); + + ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); + + doAnswer((i) -> { // For Global Metadata + actionListenerArgumentCaptor.getValue().onResponse(null); + return null; + }).doAnswer((i) -> { // For Index Metadata + actionListenerArgumentCaptor.getValue().onResponse(null); + return null; + }).doAnswer((i) -> { + // For Manifest file perform No Op, so latch in code will timeout + return null; + }).when(container).asyncBlobUpload(any(WriteContext.class), actionListenerArgumentCaptor.capture()); + + remoteClusterStateService.start(); + try { + remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)); + } catch (Exception e) { + assertTrue(e instanceof RemoteClusterStateService.RemoteStateTransferException); + assertTrue(e.getMessage().contains("Timed out waiting for transfer of manifest file to complete")); + } + } + public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); @@ -327,7 +371,7 @@ public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOEx remoteClusterStateService.start(); assertThrows( - RemoteClusterStateService.IndexMetadataTransferException.class, + RemoteClusterStateService.RemoteStateTransferException.class, () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) ); assertEquals(0, remoteClusterStateService.getStats().getSuccessCount()); @@ -1142,6 +1186,22 @@ public void testIndexMetadataUploadWaitTimeSetting() { assertEquals(indexMetadataUploadTimeout, remoteClusterStateService.getIndexMetadataUploadTimeout().seconds()); } + public void testMetadataManifestUploadWaitTimeSetting() { + // verify default value + assertEquals( + RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT, + remoteClusterStateService.getMetadataManifestUploadTimeout() + ); + + // verify update metadata manifest upload timeout + int metadataManifestUploadTimeout = randomIntBetween(1, 10); + Settings newSettings = Settings.builder() + .put("cluster.remote_store.state.metadata_manifest.upload_timeout", metadataManifestUploadTimeout + "s") + .build(); + clusterSettings.applySettings(newSettings); + assertEquals(metadataManifestUploadTimeout, remoteClusterStateService.getMetadataManifestUploadTimeout().seconds()); + } + public void testGlobalMetadataUploadWaitTimeSetting() { // verify default value assertEquals(