From 2cefc7c567b1f297519b28c62fa386e96a57ed8b Mon Sep 17 00:00:00 2001 From: Elon Azoulay Date: Sun, 10 Dec 2023 01:31:35 -0800 Subject: [PATCH] Configure retries for trino-filesystem-gcs --- lib/trino-filesystem-gcs/pom.xml | 11 +++ .../filesystem/gcs/GcsFileSystemConfig.java | 89 +++++++++++++++++++ .../filesystem/gcs/GcsStorageFactory.java | 29 +++++- .../gcs/TestGcsFileSystemConfig.java | 42 +++++++-- .../filesystem/gcs/TestGcsFileSystemGcs.java | 18 ++++ 5 files changed, 182 insertions(+), 7 deletions(-) diff --git a/lib/trino-filesystem-gcs/pom.xml b/lib/trino-filesystem-gcs/pom.xml index 33acd16414f3..a958870f3926 100644 --- a/lib/trino-filesystem-gcs/pom.xml +++ b/lib/trino-filesystem-gcs/pom.xml @@ -123,6 +123,11 @@ jakarta.validation-api + + org.threeten + threetenbp + + io.trino trino-spi @@ -141,6 +146,12 @@ test + + io.airlift + testing + test + + io.trino trino-filesystem diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConfig.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConfig.java index 766837c435b4..b5c8652ccdd3 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConfig.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConfig.java @@ -18,10 +18,15 @@ import io.airlift.configuration.ConfigSecuritySensitive; import io.airlift.configuration.validation.FileExists; import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import io.airlift.units.MinDuration; import jakarta.annotation.Nullable; +import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; +import java.util.concurrent.TimeUnit; + import static com.google.common.base.Preconditions.checkState; import static io.airlift.units.DataSize.Unit.MEGABYTE; @@ -37,6 +42,12 @@ public class GcsFileSystemConfig private boolean useGcsAccessToken; private String 
jsonKey; private String jsonKeyFilePath; + private int maxRetries = 20; + private double backoffScaleFactor = 2.0; + private Duration maxRetryTime = new Duration(20, TimeUnit.SECONDS); + private Duration minBackoffDelay = new Duration(10, TimeUnit.MILLISECONDS); + // Note: there is no benefit to setting this much higher as the rpc quota is 1x per second: https://cloud.google.com/storage/docs/retry-strategy#java + private Duration maxBackoffDelay = new Duration(1100, TimeUnit.MILLISECONDS); @NotNull public DataSize getReadBlockSize() @@ -148,6 +159,84 @@ public GcsFileSystemConfig setJsonKeyFilePath(String jsonKeyFilePath) return this; } + @Min(0) + public int getMaxRetries() + { + return maxRetries; + } + + @Config("gcs.client.max-retries") + @ConfigDescription("Maximum number of RPC retries") + public GcsFileSystemConfig setMaxRetries(int maxRetries) + { + this.maxRetries = maxRetries; + return this; + } + + @Min(1) + public double getBackoffScaleFactor() + { + return backoffScaleFactor; + } + + @Config("gcs.client.backoff-scale-factor") + @ConfigDescription("Scale factor for RPC retry delay") + public GcsFileSystemConfig setBackoffScaleFactor(double backoffScaleFactor) + { + this.backoffScaleFactor = backoffScaleFactor; + return this; + } + + @NotNull + public Duration getMaxRetryTime() + { + return maxRetryTime; + } + + @Config("gcs.client.max-retry-time") + @ConfigDescription("Total time limit for an RPC to be retried") + public GcsFileSystemConfig setMaxRetryTime(Duration maxRetryTime) + { + this.maxRetryTime = maxRetryTime; + return this; + } + + @NotNull + @MinDuration("0ms") + public Duration getMinBackoffDelay() + { + return minBackoffDelay; + } + + @Config("gcs.client.min-backoff-delay") + @ConfigDescription("Minimum delay between RPC retries") + public GcsFileSystemConfig setMinBackoffDelay(Duration minBackoffDelay) + { + this.minBackoffDelay = minBackoffDelay; + return this; + } + + @NotNull + @MinDuration("0ms") + public Duration getMaxBackoffDelay() 
+ { + return maxBackoffDelay; + } + + @Config("gcs.client.max-backoff-delay") + @ConfigDescription("Maximum delay between RPC retries") + public GcsFileSystemConfig setMaxBackoffDelay(Duration maxBackoffDelay) + { + this.maxBackoffDelay = maxBackoffDelay; + return this; + } + + @AssertTrue(message = "gcs.client.min-backoff-delay must be less than or equal to gcs.client.max-backoff-delay") + public boolean isRetryDelayValid() // cross-field constraint: the minimum backoff delay must not exceed the maximum backoff delay + { + return minBackoffDelay.compareTo(maxBackoffDelay) <= 0; + } + public void validate() { // This cannot be normal validation, as it would make it impossible to write TestGcsFileSystemConfig.testExplicitPropertyMappings diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java index 176d45061fda..bdb85f82d7e2 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java @@ -13,6 +13,7 @@ */ package io.trino.filesystem.gcs; +import com.google.api.gax.retrying.RetrySettings; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; @@ -20,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import io.trino.spi.security.ConnectorIdentity; +import org.threeten.bp.Duration; import java.io.ByteArrayInputStream; import java.io.FileInputStream; @@ -29,6 +31,7 @@ import java.util.List; import java.util.Optional; +import static com.google.cloud.storage.StorageRetryStrategy.getUniformStorageRetryStrategy; import static com.google.common.base.Strings.nullToEmpty; import static java.nio.charset.StandardCharsets.UTF_8; @@ -39,6 +42,11 @@ public class GcsStorageFactory private final String projectId; private final boolean useGcsAccessToken; private final Optional jsonGoogleCredential; 
+ private final int maxRetries; + private final double backoffScaleFactor; + private final Duration maxRetryTime; + private final Duration minBackoffDelay; + private final Duration maxBackoffDelay; @Inject public GcsStorageFactory(GcsFileSystemConfig config) @@ -62,6 +70,12 @@ else if (jsonKeyFilePath != null) { else { jsonGoogleCredential = Optional.empty(); } + this.maxRetries = config.getMaxRetries(); + this.backoffScaleFactor = config.getBackoffScaleFactor(); + // Convert the airlift durations to org.threeten.bp.Duration via milliseconds; io.airlift.units.Duration is not imported here to avoid a name collision + this.maxRetryTime = Duration.ofMillis(config.getMaxRetryTime().toMillis()); + this.minBackoffDelay = Duration.ofMillis(config.getMinBackoffDelay().toMillis()); + this.maxBackoffDelay = Duration.ofMillis(config.getMaxBackoffDelay().toMillis()); } public Storage create(ConnectorIdentity identity) @@ -81,7 +95,20 @@ public Storage create(ConnectorIdentity identity) if (projectId != null) { storageOptionsBuilder.setProjectId(projectId); } - return storageOptionsBuilder.setCredentials(credentials).build().getService(); + // Note: without the uniform strategy, operations that are not considered idempotent would not be retried. + // The trino-filesystem API does not violate the conditions for idempotency, see https://cloud.google.com/storage/docs/retry-strategy#java for details. 
+ return storageOptionsBuilder + .setCredentials(credentials) + .setStorageRetryStrategy(getUniformStorageRetryStrategy()) + .setRetrySettings(RetrySettings.newBuilder() + .setMaxAttempts(maxRetries + 1) // +1 presumably because the attempt limit counts the initial call as well as the retries - confirm + .setRetryDelayMultiplier(backoffScaleFactor) + .setTotalTimeout(maxRetryTime) + .setInitialRetryDelay(minBackoffDelay) + .setMaxRetryDelay(maxBackoffDelay) + .build()) + .build() + .getService(); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemConfig.java b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemConfig.java index 4a6c9cc61508..d5b640f8f738 100644 --- a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemConfig.java +++ b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemConfig.java @@ -15,6 +15,8 @@ import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import jakarta.validation.constraints.AssertTrue; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -25,6 +27,10 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static io.airlift.testing.ValidationAssertions.assertFailsValidation; +import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestGcsFileSystemConfig @@ -33,14 +39,19 @@ public class TestGcsFileSystemConfig void testDefaults() { assertRecordedDefaults(recordDefaults(GcsFileSystemConfig.class) - .setReadBlockSize(DataSize.of(2, DataSize.Unit.MEGABYTE)) - 
.setWriteBlockSize(DataSize.of(16, DataSize.Unit.MEGABYTE)) + .setReadBlockSize(DataSize.of(2, MEGABYTE)) + .setWriteBlockSize(DataSize.of(16, MEGABYTE)) .setPageSize(100) .setBatchSize(100) .setProjectId(null) .setUseGcsAccessToken(false) .setJsonKey(null) - .setJsonKeyFilePath(null)); + .setJsonKeyFilePath(null) + .setMaxRetries(20) + .setBackoffScaleFactor(2.0) + .setMaxRetryTime(new Duration(20, SECONDS)) + .setMinBackoffDelay(new Duration(10, MILLISECONDS)) + .setMaxBackoffDelay(new Duration(1100, MILLISECONDS))); } @Test @@ -58,17 +69,27 @@ void testExplicitPropertyMappings() .put("gcs.use-access-token", "true") .put("gcs.json-key", "{}") .put("gcs.json-key-file-path", jsonKeyFile.toString()) + .put("gcs.client.max-retries", "10") + .put("gcs.client.backoff-scale-factor", "3.0") + .put("gcs.client.max-retry-time", "10s") + .put("gcs.client.min-backoff-delay", "20ms") + .put("gcs.client.max-backoff-delay", "20ms") .buildOrThrow(); GcsFileSystemConfig expected = new GcsFileSystemConfig() - .setReadBlockSize(DataSize.of(51, DataSize.Unit.MEGABYTE)) - .setWriteBlockSize(DataSize.of(52, DataSize.Unit.MEGABYTE)) + .setReadBlockSize(DataSize.of(51, MEGABYTE)) + .setWriteBlockSize(DataSize.of(52, MEGABYTE)) .setPageSize(10) .setBatchSize(11) .setProjectId("project") .setUseGcsAccessToken(true) .setJsonKey("{}") - .setJsonKeyFilePath(jsonKeyFile.toString()); + .setJsonKeyFilePath(jsonKeyFile.toString()) + .setMaxRetries(10) + .setBackoffScaleFactor(3.0) + .setMaxRetryTime(new Duration(10, SECONDS)) + .setMinBackoffDelay(new Duration(20, MILLISECONDS)) + .setMaxBackoffDelay(new Duration(20, MILLISECONDS)); assertFullMapping(properties, expected); } @@ -95,5 +116,14 @@ public void testValidation() .setJsonKeyFilePath("/dev/null")::validate) .isInstanceOf(IllegalStateException.class) .hasMessage("'gcs.json-key' and 'gcs.json-key-file-path' cannot be both set"); + + assertFailsValidation( + new GcsFileSystemConfig() + .setJsonKey("{}") + .setMinBackoffDelay(new 
Duration(20, MILLISECONDS)) + .setMaxBackoffDelay(new Duration(19, MILLISECONDS)), + "retryDelayValid", + "gcs.client.min-backoff-delay must be less than or equal to gcs.client.max-backoff-delay", + AssertTrue.class); } } diff --git a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java index b9a056e487fc..852b30da274b 100644 --- a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java +++ b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java @@ -13,10 +13,16 @@ */ package io.trino.filesystem.gcs; +import io.trino.filesystem.TrinoOutputFile; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import java.io.IOException; +import java.io.OutputStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThatNoException; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class TestGcsFileSystemGcs @@ -28,4 +34,16 @@ void setup() { initialize(getRequiredEnvironmentVariable("GCP_CREDENTIALS_KEY")); } + + @Test + void testCreateFileRetry() + { + assertThatNoException().isThrownBy(() -> { + for (int i = 1; i <= 100; i++) { // overwrites the same location 100 times in a row; presumably meant to provoke retryable errors from the service - confirm + TrinoOutputFile outputFile = getFileSystem().newOutputFile(getRootLocation().appendPath("testFile")); + try (OutputStream out = outputFile.createOrOverwrite()) { + out.write("test".getBytes(UTF_8)); + } + }}); + } }