Skip to content

Commit

Permalink
Configure retries for trino-filesystem-gcs
Browse files Browse the repository at this point in the history
  • Loading branch information
elonazoulay committed Dec 10, 2023
1 parent da9a306 commit 2cefc7c
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 7 deletions.
11 changes: 11 additions & 0 deletions lib/trino-filesystem-gcs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@
<artifactId>jakarta.validation-api</artifactId>
</dependency>

<dependency>
<groupId>org.threeten</groupId>
<artifactId>threetenbp</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
Expand All @@ -141,6 +146,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
import io.airlift.configuration.ConfigSecuritySensitive;
import io.airlift.configuration.validation.FileExists;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkState;
import static io.airlift.units.DataSize.Unit.MEGABYTE;

Expand All @@ -37,6 +42,12 @@ public class GcsFileSystemConfig
private boolean useGcsAccessToken;
private String jsonKey;
private String jsonKeyFilePath;
private int maxRetries = 20;
private double backoffScaleFactor = 2.0;
private Duration maxRetryTime = new Duration(20, TimeUnit.SECONDS);
private Duration minBackoffDelay = new Duration(10, TimeUnit.MILLISECONDS);
// Note: there is no benefit to setting this much higher as the rpc quota is 1x per second: https://cloud.google.com/storage/docs/retry-strategy#java
private Duration maxBackoffDelay = new Duration(1100, TimeUnit.MILLISECONDS);

@NotNull
public DataSize getReadBlockSize()
Expand Down Expand Up @@ -148,6 +159,84 @@ public GcsFileSystemConfig setJsonKeyFilePath(String jsonKeyFilePath)
return this;
}

@Min(0)
public int getMaxRetries()
{
return maxRetries;
}

@Config("gcs.client.max-retries")
@ConfigDescription("Maximum number of RPC attempts")
public GcsFileSystemConfig setMaxRetries(int maxRetries)
{
this.maxRetries = maxRetries;
return this;
}

@Min(1)
public double getBackoffScaleFactor()
{
return backoffScaleFactor;
}

@Config("gcs.client.backoff-scale-factor")
@ConfigDescription("Scale factor for RPC retry delay")
public GcsFileSystemConfig setBackoffScaleFactor(double backoffScaleFactor)
{
this.backoffScaleFactor = backoffScaleFactor;
return this;
}

@NotNull
public Duration getMaxRetryTime()
{
return maxRetryTime;
}

@Config("gcs.client.max-retry-time")
@ConfigDescription("Total time limit for an RPC to be retried")
public GcsFileSystemConfig setMaxRetryTime(Duration maxRetryTime)
{
this.maxRetryTime = maxRetryTime;
return this;
}

@NotNull
@MinDuration("0ms")
public Duration getMinBackoffDelay()
{
return minBackoffDelay;
}

@Config("gcs.client.min-backoff-delay")
@ConfigDescription("Minimum delay between RPC retries")
public GcsFileSystemConfig setMinBackoffDelay(Duration minBackoffDelay)
{
this.minBackoffDelay = minBackoffDelay;
return this;
}

@NotNull
@MinDuration("0ms")
public Duration getMaxBackoffDelay()
{
return maxBackoffDelay;
}

@Config("gcs.client.max-backoff-delay")
@ConfigDescription("Maximum delay between RPC retries.")
public GcsFileSystemConfig setMaxBackoffDelay(Duration maxBackoffDelay)
{
this.maxBackoffDelay = maxBackoffDelay;
return this;
}

@AssertTrue(message = "gcs.client.min-backoff-delay must be less than or equal to gcs.client.max-backoff-delay")
public boolean isRetryDelayValid()
{
return minBackoffDelay.compareTo(maxBackoffDelay) <= 0;
}

public void validate()
{
// This cannot be normal validation, as it would make it impossible to write TestGcsFileSystemConfig.testExplicitPropertyMappings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
*/
package io.trino.filesystem.gcs;

import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.trino.spi.security.ConnectorIdentity;
import org.threeten.bp.Duration;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
Expand All @@ -29,6 +31,7 @@
import java.util.List;
import java.util.Optional;

import static com.google.cloud.storage.StorageRetryStrategy.getUniformStorageRetryStrategy;
import static com.google.common.base.Strings.nullToEmpty;
import static java.nio.charset.StandardCharsets.UTF_8;

Expand All @@ -39,6 +42,11 @@ public class GcsStorageFactory
private final String projectId;
private final boolean useGcsAccessToken;
private final Optional<GoogleCredentials> jsonGoogleCredential;
private final int maxRetries;
private final double backoffScaleFactor;
private final Duration maxRetryTime;
private final Duration minBackoffDelay;
private final Duration maxBackoffDelay;

@Inject
public GcsStorageFactory(GcsFileSystemConfig config)
Expand All @@ -62,6 +70,12 @@ else if (jsonKeyFilePath != null) {
else {
jsonGoogleCredential = Optional.empty();
}
this.maxRetries = config.getMaxRetries();
this.backoffScaleFactor = config.getBackoffScaleFactor();
// To avoid name collision by importing io.airlift.Duration
this.maxRetryTime = Duration.ofMillis(config.getMaxRetryTime().toMillis());
this.minBackoffDelay = Duration.ofMillis(config.getMinBackoffDelay().toMillis());
this.maxBackoffDelay = Duration.ofMillis(config.getMaxBackoffDelay().toMillis());
}

public Storage create(ConnectorIdentity identity)
Expand All @@ -81,7 +95,20 @@ public Storage create(ConnectorIdentity identity)
if (projectId != null) {
storageOptionsBuilder.setProjectId(projectId);
}
return storageOptionsBuilder.setCredentials(credentials).build().getService();
// Note: without uniform strategy we cannot retry idempotent operations.
// The trino-filesystem api does not violate the conditions for idempotency, see https://cloud.google.com/storage/docs/retry-strategy#java for details.
return storageOptionsBuilder
.setCredentials(credentials)
.setStorageRetryStrategy(getUniformStorageRetryStrategy())
.setRetrySettings(RetrySettings.newBuilder()
.setMaxAttempts(maxRetries + 1)
.setRetryDelayMultiplier(backoffScaleFactor)
.setTotalTimeout(maxRetryTime)
.setInitialRetryDelay(minBackoffDelay)
.setMaxRetryDelay(maxBackoffDelay)
.build())
.build()
.getService();
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import jakarta.validation.constraints.AssertTrue;
import org.junit.jupiter.api.Test;

import java.io.IOException;
Expand All @@ -25,6 +27,10 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.testing.ValidationAssertions.assertFailsValidation;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestGcsFileSystemConfig
Expand All @@ -33,14 +39,19 @@ public class TestGcsFileSystemConfig
void testDefaults()
{
assertRecordedDefaults(recordDefaults(GcsFileSystemConfig.class)
.setReadBlockSize(DataSize.of(2, DataSize.Unit.MEGABYTE))
.setWriteBlockSize(DataSize.of(16, DataSize.Unit.MEGABYTE))
.setReadBlockSize(DataSize.of(2, MEGABYTE))
.setWriteBlockSize(DataSize.of(16, MEGABYTE))
.setPageSize(100)
.setBatchSize(100)
.setProjectId(null)
.setUseGcsAccessToken(false)
.setJsonKey(null)
.setJsonKeyFilePath(null));
.setJsonKeyFilePath(null)
.setMaxRetries(20)
.setBackoffScaleFactor(2.0)
.setMaxRetryTime(new Duration(20, SECONDS))
.setMinBackoffDelay(new Duration(10, MILLISECONDS))
.setMaxBackoffDelay(new Duration(1100, MILLISECONDS)));
}

@Test
Expand All @@ -58,17 +69,27 @@ void testExplicitPropertyMappings()
.put("gcs.use-access-token", "true")
.put("gcs.json-key", "{}")
.put("gcs.json-key-file-path", jsonKeyFile.toString())
.put("gcs.client.max-retries", "10")
.put("gcs.client.backoff-scale-factor", "3.0")
.put("gcs.client.max-retry-time", "10s")
.put("gcs.client.min-backoff-delay", "20ms")
.put("gcs.client.max-backoff-delay", "20ms")
.buildOrThrow();

GcsFileSystemConfig expected = new GcsFileSystemConfig()
.setReadBlockSize(DataSize.of(51, DataSize.Unit.MEGABYTE))
.setWriteBlockSize(DataSize.of(52, DataSize.Unit.MEGABYTE))
.setReadBlockSize(DataSize.of(51, MEGABYTE))
.setWriteBlockSize(DataSize.of(52, MEGABYTE))
.setPageSize(10)
.setBatchSize(11)
.setProjectId("project")
.setUseGcsAccessToken(true)
.setJsonKey("{}")
.setJsonKeyFilePath(jsonKeyFile.toString());
.setJsonKeyFilePath(jsonKeyFile.toString())
.setMaxRetries(10)
.setBackoffScaleFactor(3.0)
.setMaxRetryTime(new Duration(10, SECONDS))
.setMinBackoffDelay(new Duration(20, MILLISECONDS))
.setMaxBackoffDelay(new Duration(20, MILLISECONDS));
assertFullMapping(properties, expected);
}

Expand All @@ -95,5 +116,14 @@ public void testValidation()
.setJsonKeyFilePath("/dev/null")::validate)
.isInstanceOf(IllegalStateException.class)
.hasMessage("'gcs.json-key' and 'gcs.json-key-file-path' cannot be both set");

assertFailsValidation(
new GcsFileSystemConfig()
.setJsonKey("{}")
.setMinBackoffDelay(new Duration(20, MILLISECONDS))
.setMaxBackoffDelay(new Duration(19, MILLISECONDS)),
"retryDelayValid",
"gcs.client.min-backoff-delay must be less than or equal to gcs.client.max-backoff-delay",
AssertTrue.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@
*/
package io.trino.filesystem.gcs;

import io.trino.filesystem.TrinoOutputFile;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.io.IOException;
import java.io.OutputStream;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThatNoException;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestGcsFileSystemGcs
Expand All @@ -28,4 +34,16 @@ void setup()
{
initialize(getRequiredEnvironmentVariable("GCP_CREDENTIALS_KEY"));
}

@Test
void testCreateFileRetry()
{
assertThatNoException().isThrownBy(() -> {
for (int i = 1; i <= 100; i++) {
TrinoOutputFile outputFile = getFileSystem().newOutputFile(getRootLocation().appendPath("testFile"));
try (OutputStream out = outputFile.createOrOverwrite()) {
out.write("test".getBytes(UTF_8));
}
}});
}
}

0 comments on commit 2cefc7c

Please sign in to comment.