diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle b/sdks/java/extensions/google-cloud-platform-core/build.gradle index d4dfd46f7451..4af856cc9ff3 100644 --- a/sdks/java/extensions/google-cloud-platform-core/build.gradle +++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle @@ -87,8 +87,32 @@ task integrationTestKms(type: Test) { } } +// Note: no runner is configured for this task, so the tests it runs must not execute +// pipelines. +task integrationTestNoKms(type: Test) { + group = "Verification" + def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' + def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests-cmek' + systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ + "--project=${gcpProject}", + "--tempRoot=${gcpTempRoot}", + ]) + + // Disable the Gradle cache: these ITs interact with a live service, so results must always be considered "out of date" + outputs.upToDateWhen { false } + + include '**/*IT.class' + maxParallelForks 4 + classpath = sourceSets.test.runtimeClasspath + testClassesDirs = sourceSets.test.output.classesDirs + useJUnit { + excludeCategories "org.apache.beam.sdk.testing.UsesKms" + } +} + task postCommit { group = "Verification" description = "Integration tests of GCP connectors using the DirectRunner." 
dependsOn integrationTestKms + dependsOn integrationTestNoKms } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index 3c65f0fa748c..8cec680b98af 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -27,6 +27,7 @@ import com.google.api.services.cloudresourcemanager.CloudResourceManager; import com.google.api.services.cloudresourcemanager.model.Project; import com.google.api.services.storage.model.Bucket; +import com.google.api.services.storage.model.Bucket.SoftDeletePolicy; import com.google.auth.Credentials; import com.google.auth.http.HttpCredentialsAdapter; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; @@ -398,6 +399,12 @@ class GcpTempLocationFactory implements DefaultValueFactory { */ @VisibleForTesting static String tryCreateDefaultBucket(PipelineOptions options, CloudResourceManager crmClient) { + return tryCreateDefaultBucketWithPrefix(options, crmClient, "dataflow-staging-"); + } + + @VisibleForTesting + static String tryCreateDefaultBucketWithPrefix( + PipelineOptions options, CloudResourceManager crmClient, String bucketNamePrefix) { GcsOptions gcsOptions = options.as(GcsOptions.class); checkArgument( @@ -419,9 +426,18 @@ static String tryCreateDefaultBucket(PipelineOptions options, CloudResourceManag if (!isNullOrEmpty(gcsOptions.getZone())) { region = getRegionFromZone(gcsOptions.getZone()); } - final String bucketName = "dataflow-staging-" + region + "-" + projectNumber; + final String bucketName = bucketNamePrefix + region + "-" + projectNumber; LOG.info("No tempLocation specified, attempting to use default bucket: 
{}", bucketName); - Bucket bucket = new Bucket().setName(bucketName).setLocation(region); + + // Disable the soft delete policy on the new bucket by setting a zero retention duration. + // Reference: https://cloud.google.com/storage/docs/soft-delete + SoftDeletePolicy softDeletePolicy = new SoftDeletePolicy().setRetentionDurationSeconds(0L); + + Bucket bucket = + new Bucket() + .setName(bucketName) + .setLocation(region) + .setSoftDeletePolicy(softDeletePolicy); // Always try to create the bucket before checking access, so that we do not // race with other pipelines that may be attempting to do the same thing. try { diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 0338323bb0aa..60e8443d2640 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -652,6 +652,17 @@ public void createBucket(String projectId, Bucket bucket) throws IOException { createBucket(projectId, bucket, createBackOff(), Sleeper.DEFAULT); } + /** Gets the {@link Bucket} at the given Cloud Storage path, or propagates an exception. */ + @Nullable + public Bucket getBucket(GcsPath path) throws IOException { + return getBucket(path, createBackOff(), Sleeper.DEFAULT); + } + + /** Removes an empty {@link Bucket} from Cloud Storage, or propagates an exception. */ + public void removeBucket(Bucket bucket) throws IOException { + removeBucket(bucket, createBackOff(), Sleeper.DEFAULT); + } + /** * Returns whether the GCS bucket exists. This will return false if the bucket is inaccessible due * to permissions. 
@@ -753,6 +764,48 @@ public boolean shouldRetry(IOException e) { } } + /** + * Deletes the given bucket, retrying socket errors according to {@code backoff}. + * + * @throws AccessDeniedException if the delete request is denied + * @throws FileNotFoundException if the bucket is not found + * @throws IOException on any other failure, or if interrupted while retrying + */ + @VisibleForTesting + void removeBucket(Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException { + Storage.Buckets.Delete deleteBucket = storageClient.buckets().delete(bucket.getName()); + + try { + ResilientOperation.retry( + deleteBucket::execute, + backoff, + new RetryDeterminer<IOException>() { + @Override + public boolean shouldRetry(IOException e) { + // Not-found and access-denied will not succeed on retry, so fail fast. + if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) { + return false; + } + return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e); + } + }, + IOException.class, + sleeper); + } catch (GoogleJsonResponseException e) { + if (errorExtractor.accessDenied(e)) { + throw new AccessDeniedException(bucket.getName(), null, e.getMessage()); + } + if (errorExtractor.itemNotFound(e)) { + throw new FileNotFoundException(e.getMessage()); + } + throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + String.format("Error while attempting to remove bucket gs://%s", bucket.getName()), e); + } + } + private static void executeBatches(List batches) throws IOException { ExecutorService executor = MoreExecutors.listeningDecorator( diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsIT.java new file mode 100644 index 000000000000..023c6a9788c6 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsIT.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.options; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import com.google.api.services.cloudresourcemanager.CloudResourceManager; +import com.google.api.services.storage.model.Bucket; +import java.io.IOException; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtil; +import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Integration tests for {@link GcpOptions}. These tests are designed to run against production + * Google Cloud Storage. + * + *

<p>This is a runnerless integration test, even though the Beam IT framework assumes one. Thus, + * this test should only be run against a single runner (such as DirectRunner). + */ +@RunWith(JUnit4.class) +public class GcpOptionsIT { + /** Tests the creation of a default bucket in a project. */ + @Test + public void testCreateDefaultBucket() throws IOException { + TestPipelineOptions options = + TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + + CloudResourceManager crmClient = + GcpOptions.GcpTempLocationFactory.newCloudResourceManagerClient( + options.as(CloudResourceManagerOptions.class)) + .build(); + + GcsOptions gcsOptions = options.as(GcsOptions.class); + GcsUtil gcsUtil = gcsOptions.getGcsUtil(); + + String tempLocation = + GcpOptions.GcpTempLocationFactory.tryCreateDefaultBucketWithPrefix( + options, crmClient, "gcp-options-it-"); + + GcsPath gcsPath = GcsPath.fromUri(tempLocation); + Bucket bucket = gcsUtil.getBucket(gcsPath); + assertNotNull(bucket); + // Verify the soft delete policy is disabled; always remove the test bucket afterwards. + try { + assertEquals(Long.valueOf(0L), bucket.getSoftDeletePolicy().getRetentionDurationSeconds()); + } finally { + gcsUtil.removeBucket(bucket); + } + } +} diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java index bf30b4c030e2..a182f0ab82cf 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java @@ -21,10 +21,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; +import static 
org.mockito.ArgumentMatchers.anyString; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.api.services.cloudresourcemanager.CloudResourceManager; @@ -56,6 +59,7 @@ import org.junit.rules.TestRule; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -230,6 +234,14 @@ public void testCreateBucket() throws Exception { String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); assertEquals("gs://dataflow-staging-us-north1-1/temp/", bucket); + + ArgumentCaptor<Bucket> bucketArg = ArgumentCaptor.forClass(Bucket.class); + verify(mockGcsUtil, times(1)).createBucket(anyString(), bucketArg.capture()); + + // verify that the soft delete policy is disabled in the default bucket + assertEquals( + Long.valueOf(0L), + bucketArg.getValue().getSoftDeletePolicy().getRetentionDurationSeconds()); } @Test