From cdfce6acc4e8364f90360591e710e82b1d7b1111 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Fri, 9 Sep 2016 10:06:16 +0200 Subject: [PATCH] Retry calls that open resumable upload in WriteChannel (#1233) --- .../cloud/bigquery/TableDataWriteChannel.java | 18 ++++++++++- .../bigquery/TableDataWriteChannelTest.java | 27 +++++++++++++++-- .../cloud/storage/BlobWriteChannel.java | 17 ++++++++++- .../cloud/storage/BlobWriteChannelTest.java | 30 ++++++++++++++++--- 4 files changed, 84 insertions(+), 8 deletions(-) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java index d56358f86e98..5c3d9c1fb0be 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/TableDataWriteChannel.java @@ -24,6 +24,8 @@ import com.google.cloud.RetryHelper; import com.google.cloud.WriteChannel; +import java.util.concurrent.Callable; + /** * WriteChannel implementation to stream data into a BigQuery table. */ @@ -31,7 +33,7 @@ class TableDataWriteChannel extends BaseWriteChannel() { + @Override + public String call() { + return options.rpc().open(writeChannelConfiguration.toPb()); + } + }, options.retryParams(), BigQueryImpl.EXCEPTION_HANDLER, options.clock()); + } catch (RetryHelper.RetryHelperException e) { + throw BigQueryException.translateAndThrow(e); + } + } + static class StateImpl extends BaseWriteChannel.BaseState { diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/TableDataWriteChannelTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/TableDataWriteChannelTest.java index 646d82603950..95c1c90a3b2e 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/TableDataWriteChannelTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/TableDataWriteChannelTest.java @@ -39,9 +39,12 @@ import org.easymock.CaptureType; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; +import java.net.SocketException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Random; @@ -63,6 +66,9 @@ public class TableDataWriteChannelTest { private static final int CUSTOM_CHUNK_SIZE = 4 * MIN_CHUNK_SIZE; private static final Random RANDOM = new Random(); + @Rule + public ExpectedException thrown = ExpectedException.none(); + private BigQueryOptions options; private BigQueryRpcFactory rpcFactoryMock; private BigQueryRpc bigqueryRpcMock; @@ -72,8 +78,7 @@ public class TableDataWriteChannelTest { public void setUp() { rpcFactoryMock = createMock(BigQueryRpcFactory.class); bigqueryRpcMock = createMock(BigQueryRpc.class); - expect(rpcFactoryMock.create(anyObject(BigQueryOptions.class))) - .andReturn(bigqueryRpcMock); + expect(rpcFactoryMock.create(anyObject(BigQueryOptions.class))).andReturn(bigqueryRpcMock); replay(rpcFactoryMock); options = BigQueryOptions.builder() .projectId("projectid") @@ -94,6 +99,24 @@ public void testCreate() { assertTrue(writer.isOpen()); } + @Test + public void testCreateRetryableError() { + BigQueryException exception = new BigQueryException(new SocketException("Socket closed")); + expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andThrow(exception); + expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID); + replay(bigqueryRpcMock); + writer = new TableDataWriteChannel(options, LOAD_CONFIGURATION); + assertTrue(writer.isOpen()); + } + + @Test + public void testCreateNonRetryableError() { + expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andThrow(new RuntimeException()); + replay(bigqueryRpcMock); + thrown.expect(RuntimeException.class); + new TableDataWriteChannel(options, LOAD_CONFIGURATION); + } + @Test public void testWriteWithoutFlush() throws IOException { expect(bigqueryRpcMock.open(LOAD_CONFIGURATION.toPb())).andReturn(UPLOAD_ID); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java index 9765b8fa1bb7..b6687037f0b9 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java @@ -26,6 +26,7 @@ import com.google.cloud.storage.spi.StorageRpc; import java.util.Map; +import java.util.concurrent.Callable; /** * Write channel implementation to upload Google Cloud Storage blobs. @@ -33,7 +34,7 @@ class BlobWriteChannel extends BaseWriteChannel { BlobWriteChannel(StorageOptions options, BlobInfo blob, Map optionsMap) { - this(options, blob, options.rpc().open(blob.toPb(), optionsMap)); + this(options, blob, open(options, blob, optionsMap)); } BlobWriteChannel(StorageOptions options, BlobInfo blobInfo, String uploadId) { @@ -58,6 +59,20 @@ protected StateImpl.Builder stateBuilder() { return StateImpl.builder(options(), entity(), uploadId()); } + private static String open(final StorageOptions options, final BlobInfo blob, + final Map optionsMap) { + try { + return runWithRetries(new Callable() { + @Override + public String call() { + return options.rpc().open(blob.toPb(), optionsMap); + } + }, options.retryParams(), StorageImpl.EXCEPTION_HANDLER, options.clock()); + } catch (RetryHelper.RetryHelperException e) { + throw StorageException.translateAndThrow(e); + } + } + static class StateImpl extends BaseWriteChannel.BaseState { private static final long serialVersionUID = -9028324143780151286L; diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java index 27d55d8b686c..e341a3e4f067 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java @@ -31,7 +31,6 @@ import static org.junit.Assert.fail; import com.google.cloud.RestorableState; -import com.google.cloud.RetryParams; import com.google.cloud.WriteChannel; import com.google.cloud.storage.spi.StorageRpc; import com.google.cloud.storage.spi.StorageRpcFactory; @@ -41,9 +40,12 @@ import org.easymock.CaptureType; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; +import java.net.SocketException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Map; @@ -61,6 +63,9 @@ public class BlobWriteChannelTest { private static final int CUSTOM_CHUNK_SIZE = 4 * MIN_CHUNK_SIZE; private static final Random RANDOM = new Random(); + @Rule + public ExpectedException thrown = ExpectedException.none(); + private StorageOptions options; private StorageRpcFactory rpcFactoryMock; private StorageRpc storageRpcMock; @@ -70,13 +75,11 @@ public class BlobWriteChannelTest { public void setUp() { rpcFactoryMock = createMock(StorageRpcFactory.class); storageRpcMock = createMock(StorageRpc.class); - expect(rpcFactoryMock.create(anyObject(StorageOptions.class))) - .andReturn(storageRpcMock); + expect(rpcFactoryMock.create(anyObject(StorageOptions.class))).andReturn(storageRpcMock); replay(rpcFactoryMock); options = StorageOptions.builder() .projectId("projectid") .serviceRpcFactory(rpcFactoryMock) - .retryParams(RetryParams.noRetries()) .build(); } @@ -93,6 +96,25 @@ public void testCreate() { assertTrue(writer.isOpen()); } + @Test + public void testCreateRetryableError() { + StorageException exception = new StorageException(new SocketException("Socket closed")); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andThrow(exception); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + replay(storageRpcMock); + writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + assertTrue(writer.isOpen()); + } + + @Test + public void testCreateNonRetryableError() { + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)) + .andThrow(new RuntimeException()); + replay(storageRpcMock); + thrown.expect(RuntimeException.class); + new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + } + @Test public void testWriteWithoutFlush() throws IOException { expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);