Skip to content

Commit

Permalink
Use async write for manifest file and use latch for timeout (#10968) (#…
Browse files Browse the repository at this point in the history
…10984)

* Use async write for manifest file and use latch for timeout

(cherry picked from commit 84be8c9)

Signed-off-by: Dhwanil Patel <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 2571eb7 commit 52ec075
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class RemoteClusterStateService implements Closeable {

public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

public static final TimeValue METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

public static final Setting<TimeValue> INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.index_metadata.upload_timeout",
INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT,
Expand All @@ -101,6 +103,13 @@ public class RemoteClusterStateService implements Closeable {
Setting.Property.NodeScope
);

public static final Setting<TimeValue> METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.metadata_manifest.upload_timeout",
METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
METADATA_NAME_FORMAT,
Expand Down Expand Up @@ -157,6 +166,7 @@ public class RemoteClusterStateService implements Closeable {

private volatile TimeValue indexMetadataUploadTimeout;
private volatile TimeValue globalMetadataUploadTimeout;
private volatile TimeValue metadataManifestUploadTimeout;

private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
private final RemotePersistenceStats remoteStateStats;
Expand Down Expand Up @@ -190,9 +200,11 @@ public RemoteClusterStateService(
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
this.metadataManifestUploadTimeout = clusterSettings.get(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
}

Expand Down Expand Up @@ -401,21 +413,21 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException
try {
if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
// TODO: We should add metrics where transfer is timing out. [Issue: #10687]
GlobalMetadataTransferException ex = new GlobalMetadataTransferException(
RemoteStateTransferException ex = new RemoteStateTransferException(
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete")
);
throw ex;
}
} catch (InterruptedException ex) {
GlobalMetadataTransferException exception = new GlobalMetadataTransferException(
RemoteStateTransferException exception = new RemoteStateTransferException(
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete - %s"),
ex
);
Thread.currentThread().interrupt();
throw exception;
}
if (exceptionReference.get() != null) {
throw new GlobalMetadataTransferException(exceptionReference.get().getMessage(), exceptionReference.get());
throw new RemoteStateTransferException(exceptionReference.get().getMessage(), exceptionReference.get());
}
return result.get();
}
Expand All @@ -440,7 +452,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
);
result.add(uploadedIndexMetadata);
}, ex -> {
assert ex instanceof IndexMetadataTransferException;
assert ex instanceof RemoteStateTransferException;
logger.error(
() -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()),
ex
Expand All @@ -457,7 +469,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus

try {
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
IndexMetadataTransferException ex = new IndexMetadataTransferException(
RemoteStateTransferException ex = new RemoteStateTransferException(
String.format(
Locale.ROOT,
"Timed out waiting for transfer of index metadata to complete - %s",
Expand All @@ -469,7 +481,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
}
} catch (InterruptedException ex) {
exceptionList.forEach(ex::addSuppressed);
IndexMetadataTransferException exception = new IndexMetadataTransferException(
RemoteStateTransferException exception = new RemoteStateTransferException(
String.format(
Locale.ROOT,
"Timed out waiting for transfer of index metadata to complete - %s",
Expand All @@ -481,7 +493,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
throw exception;
}
if (exceptionList.size() > 0) {
IndexMetadataTransferException exception = new IndexMetadataTransferException(
RemoteStateTransferException exception = new RemoteStateTransferException(
String.format(
Locale.ROOT,
"Exception during transfer of IndexMetadata to Remote %s",
Expand Down Expand Up @@ -520,7 +532,7 @@ private void writeIndexMetadataAsync(
indexMetadataContainer.path().buildAsString() + indexMetadataFilename
)
),
ex -> latchedActionListener.onFailure(new IndexMetadataTransferException(indexMetadata.getIndex().toString(), ex))
ex -> latchedActionListener.onFailure(new RemoteStateTransferException(indexMetadata.getIndex().toString(), ex))
);

INDEX_METADATA_FORMAT.writeAsyncWithUrgentPriority(
Expand Down Expand Up @@ -601,14 +613,45 @@ private ClusterMetadataManifest uploadManifest(

private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName)
throws IOException {
AtomicReference<String> result = new AtomicReference<String>();
AtomicReference<Exception> exceptionReference = new AtomicReference<Exception>();

final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID);
CLUSTER_METADATA_MANIFEST_FORMAT.write(

// latch to wait until upload is not finished
CountDownLatch latch = new CountDownLatch(1);

LatchedActionListener completionListener = new LatchedActionListener<>(ActionListener.wrap(resp -> {
logger.trace(String.format(Locale.ROOT, "Manifest file uploaded successfully."));
}, ex -> { exceptionReference.set(ex); }), latch);

CLUSTER_METADATA_MANIFEST_FORMAT.writeAsyncWithUrgentPriority(
uploadManifest,
metadataManifestContainer,
fileName,
blobStoreRepository.getCompressor(),
completionListener,
FORMAT_PARAMS
);

try {
if (latch.await(getMetadataManifestUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
RemoteStateTransferException ex = new RemoteStateTransferException(
String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete")
);
throw ex;
}
} catch (InterruptedException ex) {
RemoteStateTransferException exception = new RemoteStateTransferException(
String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete - %s"),
ex
);
Thread.currentThread().interrupt();
throw exception;
}
if (exceptionReference.get() != null) {
throw new RemoteStateTransferException(exceptionReference.get().getMessage(), exceptionReference.get());
}
logger.debug(
"Metadata manifest file [{}] written during [{}] phase. ",
fileName,
Expand Down Expand Up @@ -668,6 +711,10 @@ private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTim
this.globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout;
}

private void setMetadataManifestUploadTimeout(TimeValue newMetadataManifestUploadTimeout) {
this.metadataManifestUploadTimeout = newMetadataManifestUploadTimeout;
}

public TimeValue getIndexMetadataUploadTimeout() {
return this.indexMetadataUploadTimeout;
}
Expand All @@ -676,6 +723,10 @@ public TimeValue getGlobalMetadataUploadTimeout() {
return this.globalMetadataUploadTimeout;
}

public TimeValue getMetadataManifestUploadTimeout() {
return this.metadataManifestUploadTimeout;
}

static String getManifestFileName(long term, long version, boolean committed) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
return String.join(
Expand Down Expand Up @@ -1088,29 +1139,15 @@ public void writeMetadataFailed() {
}

/**
* Exception for IndexMetadata transfer failures to remote
*/
static class IndexMetadataTransferException extends RuntimeException {

public IndexMetadataTransferException(String errorDesc) {
super(errorDesc);
}

public IndexMetadataTransferException(String errorDesc, Throwable cause) {
super(errorDesc, cause);
}
}

/**
* Exception for GlobalMetadata transfer failures to remote
* Exception for Remote state transfer.
*/
static class GlobalMetadataTransferException extends RuntimeException {
static class RemoteStateTransferException extends RuntimeException {

public GlobalMetadataTransferException(String errorDesc) {
public RemoteStateTransferException(String errorDesc) {
super(errorDesc);
}

public GlobalMetadataTransferException(String errorDesc, Throwable cause) {
public RemoteStateTransferException(String errorDesc, Throwable cause) {
super(errorDesc, cause);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
Expand Down Expand Up @@ -230,10 +231,17 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException {

ArgumentCaptor<ActionListener<Void>> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class);
ArgumentCaptor<WriteContext> writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class);

AtomicReference<WriteContext> capturedWriteContext = new AtomicReference<>();
doAnswer((i) -> {
actionListenerArgumentCaptor.getValue().onResponse(null);
return null;
}).doAnswer((i) -> {
actionListenerArgumentCaptor.getValue().onResponse(null);
capturedWriteContext.set(writeContextArgumentCaptor.getValue());
return null;
}).doAnswer((i) -> {
actionListenerArgumentCaptor.getValue().onResponse(null);
return null;
}).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture());

remoteClusterStateService.start();
Expand Down Expand Up @@ -262,27 +270,30 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException {
assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID()));

assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 2);
assertEquals(writeContextArgumentCaptor.getAllValues().size(), 2);
assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 3);
assertEquals(writeContextArgumentCaptor.getAllValues().size(), 3);

WriteContext capturedWriteContext = writeContextArgumentCaptor.getValue();
byte[] writtenBytes = capturedWriteContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream().readAllBytes();
byte[] writtenBytes = capturedWriteContext.get()
.getStreamProvider(Integer.MAX_VALUE)
.provideStream(0)
.getInputStream()
.readAllBytes();
IndexMetadata writtenIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.deserialize(
capturedWriteContext.getFileName(),
capturedWriteContext.get().getFileName(),
blobStoreRepository.getNamedXContentRegistry(),
new BytesArray(writtenBytes)
);

assertEquals(capturedWriteContext.getWritePriority(), WritePriority.URGENT);
assertEquals(capturedWriteContext.get().getWritePriority(), WritePriority.URGENT);
assertEquals(writtenIndexMetadata.getNumberOfShards(), 1);
assertEquals(writtenIndexMetadata.getNumberOfReplicas(), 0);
assertEquals(writtenIndexMetadata.getIndex().getName(), "test-index");
assertEquals(writtenIndexMetadata.getIndex().getUUID(), "index-uuid");
long expectedChecksum = RemoteTransferContainer.checksumOfChecksum(new ByteArrayIndexInput("metadata-filename", writtenBytes), 8);
if (capturedWriteContext.doRemoteDataIntegrityCheck()) {
assertEquals(capturedWriteContext.getExpectedChecksum().longValue(), expectedChecksum);
if (capturedWriteContext.get().doRemoteDataIntegrityCheck()) {
assertEquals(capturedWriteContext.get().getExpectedChecksum().longValue(), expectedChecksum);
} else {
assertEquals(capturedWriteContext.getExpectedChecksum(), null);
assertEquals(capturedWriteContext.get().getExpectedChecksum(), null);
}

}
Expand All @@ -306,11 +317,44 @@ public void run() {

remoteClusterStateService.start();
assertThrows(
RemoteClusterStateService.GlobalMetadataTransferException.class,
RemoteClusterStateService.RemoteStateTransferException.class,
() -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10))
);
}

public void testTimeoutWhileWritingManifestFile() throws IOException {
// verify update metadata manifest upload timeout
int metadataManifestUploadTimeout = 2;
Settings newSettings = Settings.builder()
.put("cluster.remote_store.state.metadata_manifest.upload_timeout", metadataManifestUploadTimeout + "s")
.build();
clusterSettings.applySettings(newSettings);

final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class);

ArgumentCaptor<ActionListener<Void>> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class);

doAnswer((i) -> { // For Global Metadata
actionListenerArgumentCaptor.getValue().onResponse(null);
return null;
}).doAnswer((i) -> { // For Index Metadata
actionListenerArgumentCaptor.getValue().onResponse(null);
return null;
}).doAnswer((i) -> {
// For Manifest file perform No Op, so latch in code will timeout
return null;
}).when(container).asyncBlobUpload(any(WriteContext.class), actionListenerArgumentCaptor.capture());

remoteClusterStateService.start();
try {
remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10));
} catch (Exception e) {
assertTrue(e instanceof RemoteClusterStateService.RemoteStateTransferException);
assertTrue(e.getMessage().contains("Timed out waiting for transfer of manifest file to complete"));
}
}

public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class);
Expand All @@ -327,7 +371,7 @@ public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOEx

remoteClusterStateService.start();
assertThrows(
RemoteClusterStateService.IndexMetadataTransferException.class,
RemoteClusterStateService.RemoteStateTransferException.class,
() -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10))
);
assertEquals(0, remoteClusterStateService.getStats().getSuccessCount());
Expand Down Expand Up @@ -1142,6 +1186,22 @@ public void testIndexMetadataUploadWaitTimeSetting() {
assertEquals(indexMetadataUploadTimeout, remoteClusterStateService.getIndexMetadataUploadTimeout().seconds());
}

public void testMetadataManifestUploadWaitTimeSetting() {
// verify default value
assertEquals(
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT,
remoteClusterStateService.getMetadataManifestUploadTimeout()
);

// verify update metadata manifest upload timeout
int metadataManifestUploadTimeout = randomIntBetween(1, 10);
Settings newSettings = Settings.builder()
.put("cluster.remote_store.state.metadata_manifest.upload_timeout", metadataManifestUploadTimeout + "s")
.build();
clusterSettings.applySettings(newSettings);
assertEquals(metadataManifestUploadTimeout, remoteClusterStateService.getMetadataManifestUploadTimeout().seconds());
}

public void testGlobalMetadataUploadWaitTimeSetting() {
// verify default value
assertEquals(
Expand Down

0 comments on commit 52ec075

Please sign in to comment.