Skip to content

Commit

Permalink
Disable soft delete policy when creating a default bucket for a project.
Browse files Browse the repository at this point in the history
Also adds getBucket() and removeBucket() to GcsUtil.
  • Loading branch information
shunping committed May 17, 2024
1 parent 2196758 commit 8f2d2c6
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 2 deletions.
24 changes: 24 additions & 0 deletions sdks/java/extensions/google-cloud-platform-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,32 @@ task integrationTestKms(type: Test) {
}
}

// Note that no runner is specified here, so tests running under this task should not be running
// pipelines.
task integrationTestNoKms(type: Test) {
  group = "Verification"

  // Target project and temp root come from Gradle project properties when set,
  // otherwise fall back to the defaults below.
  def projectId = project.findProperty('gcpProject') ?: 'apache-beam-testing'
  def tempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests-cmek'
  def pipelineArgs = [
    "--project=${projectId}",
    "--tempRoot=${tempRoot}",
  ]
  systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineArgs)

  // Never treat this task as up to date: these ITs talk to a live service,
  // so a previous result must not be reused.
  outputs.upToDateWhen { false }

  // Pick up every *IT class from the test source set, skipping tests in the UsesKms category.
  classpath = sourceSets.test.runtimeClasspath
  testClassesDirs = sourceSets.test.output.classesDirs
  include('**/*IT.class')
  maxParallelForks = 4
  useJUnit {
    excludeCategories "org.apache.beam.sdk.testing.UsesKms"
  }
}

task postCommit {
  // Aggregate task: depends on both integration test tasks (with and without KMS).
  group = "Verification"
  description = "Integration tests of GCP connectors using the DirectRunner."
  dependsOn integrationTestKms, integrationTestNoKms
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.services.cloudresourcemanager.CloudResourceManager;
import com.google.api.services.cloudresourcemanager.model.Project;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Bucket.SoftDeletePolicy;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
Expand Down Expand Up @@ -398,6 +399,12 @@ class GcpTempLocationFactory implements DefaultValueFactory<String> {
*/
@VisibleForTesting
static String tryCreateDefaultBucket(PipelineOptions options, CloudResourceManager crmClient) {
  // Delegate to the prefix-aware variant using the standard staging bucket prefix.
  final String defaultBucketPrefix = "dataflow-staging-";
  return tryCreateDefaultBucketWithPrefix(options, crmClient, defaultBucketPrefix);
}

@VisibleForTesting
static String tryCreateDefaultBucketWithPrefix(
PipelineOptions options, CloudResourceManager crmClient, String bucketNamePrefix) {
GcsOptions gcsOptions = options.as(GcsOptions.class);

checkArgument(
Expand All @@ -419,9 +426,18 @@ static String tryCreateDefaultBucket(PipelineOptions options, CloudResourceManag
if (!isNullOrEmpty(gcsOptions.getZone())) {
region = getRegionFromZone(gcsOptions.getZone());
}
final String bucketName = "dataflow-staging-" + region + "-" + projectNumber;
final String bucketName = bucketNamePrefix + region + "-" + projectNumber;
LOG.info("No tempLocation specified, attempting to use default bucket: {}", bucketName);
Bucket bucket = new Bucket().setName(bucketName).setLocation(region);

// Disable soft delete policy for a bucket.
// Reference: https://cloud.google.com/storage/docs/soft-delete
SoftDeletePolicy softDeletePolicy = new SoftDeletePolicy().setRetentionDurationSeconds(0L);

Bucket bucket =
new Bucket()
.setName(bucketName)
.setLocation(region)
.setSoftDeletePolicy(softDeletePolicy);
// Always try to create the bucket before checking access, so that we do not
// race with other pipelines that may be attempting to do the same thing.
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,17 @@ public void createBucket(String projectId, Bucket bucket) throws IOException {
createBucket(projectId, bucket, createBackOff(), Sleeper.DEFAULT);
}

/**
 * Gets the {@link Bucket} for the given Cloud Storage path, or propagates an exception.
 *
 * <p>Convenience overload: delegates to the three-argument {@code getBucket}, passing a backoff
 * obtained from {@code createBackOff()} and {@link Sleeper#DEFAULT}.
 *
 * @param path the Cloud Storage path identifying the bucket to look up
 * @return the bucket returned by the delegate; annotated {@code @Nullable}, so callers should be
 *     prepared for {@code null}
 * @throws IOException if the delegate lookup fails
 */
@Nullable
public Bucket getBucket(GcsPath path) throws IOException {
return getBucket(path, createBackOff(), Sleeper.DEFAULT);
}

/**
 * Removes an empty {@link Bucket} from Cloud Storage, or propagates an exception.
 *
 * <p>Convenience overload that supplies a backoff from {@code createBackOff()} and {@link
 * Sleeper#DEFAULT} to the three-argument {@code removeBucket}.
 *
 * @param bucket the bucket to remove
 * @throws IOException if the removal fails
 */
public void removeBucket(Bucket bucket) throws IOException {
  final BackOff retryBackOff = createBackOff();
  final Sleeper defaultSleeper = Sleeper.DEFAULT;
  removeBucket(bucket, retryBackOff, defaultSleeper);
}

/**
* Returns whether the GCS bucket exists. This will return false if the bucket is inaccessible due
* to permissions.
Expand Down Expand Up @@ -753,6 +764,40 @@ public boolean shouldRetry(IOException e) {
}
}

/**
 * Deletes the bucket named by {@code bucket.getName()}, retrying socket errors according to
 * {@code backoff}.
 *
 * <p>"Not found" and "access denied" responses are never retried. They are translated into
 * {@link FileNotFoundException} and {@link AccessDeniedException} respectively, with the original
 * service exception attached as the cause so the underlying response stays available for
 * diagnosis.
 *
 * @param bucket the bucket to delete; only its name is used
 * @param backoff retry policy applied between attempts
 * @param sleeper used to wait between attempts
 * @throws AccessDeniedException if the service reports that access to the bucket is denied
 * @throws FileNotFoundException if the service reports that the bucket does not exist
 * @throws IOException for any other failure, including interruption while retrying
 */
@VisibleForTesting
void removeBucket(Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException {
  final String bucketName = bucket.getName();
  Storage.Buckets.Delete deleteBucket = storageClient.buckets().delete(bucketName);

  try {
    ResilientOperation.retry(
        deleteBucket::execute,
        backoff,
        new RetryDeterminer<IOException>() {
          @Override
          public boolean shouldRetry(IOException e) {
            // A missing bucket or denied access will not change on retry.
            if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
              return false;
            }
            return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
          }
        },
        IOException.class,
        sleeper);
  } catch (GoogleJsonResponseException e) {
    if (errorExtractor.accessDenied(e)) {
      AccessDeniedException denied = new AccessDeniedException(bucketName, null, e.getMessage());
      // The constructor takes no cause, so attach the service exception explicitly.
      denied.initCause(e);
      throw denied;
    }
    if (errorExtractor.itemNotFound(e)) {
      FileNotFoundException notFound = new FileNotFoundException(e.getMessage());
      // The constructor takes no cause, so attach the service exception explicitly.
      notFound.initCause(e);
      throw notFound;
    }
    throw e;
  } catch (InterruptedException e) {
    // Restore the interrupt flag before surfacing the failure as an IOException.
    Thread.currentThread().interrupt();
    throw new IOException(
        String.format("Error while attempting to remove bucket gs://%s", bucketName), e);
  }
}

private static void executeBatches(List<BatchInterface> batches) throws IOException {
ExecutorService executor =
MoreExecutors.listeningDecorator(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.gcp.options;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import com.google.api.services.cloudresourcemanager.CloudResourceManager;
import com.google.api.services.storage.model.Bucket;
import java.io.IOException;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Integration tests for {@link GcpOptions}. These tests are designed to run against production
 * Google Cloud Storage.
 *
 * <p>This is a runnerless integration test, even though the Beam IT framework assumes one. Thus,
 * this test should only be run against a single runner (such as DirectRunner).
 */
@RunWith(JUnit4.class)
public class GcpOptionsIT {
  /**
   * Tests the creation of a default bucket in a project and verifies that its soft delete policy
   * is disabled. The bucket is removed afterwards even when a verification fails, so that failed
   * runs do not leave test buckets behind.
   */
  @Test
  public void testCreateDefaultBucket() throws IOException {
    TestPipelineOptions options =
        TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);

    CloudResourceManager crmClient =
        GcpOptions.GcpTempLocationFactory.newCloudResourceManagerClient(
                options.as(CloudResourceManagerOptions.class))
            .build();

    GcsOptions gcsOptions = options.as(GcsOptions.class);
    GcsUtil gcsUtil = gcsOptions.getGcsUtil();

    // Use a dedicated prefix so the test never touches the real default staging bucket.
    String tempLocation =
        GcpOptions.GcpTempLocationFactory.tryCreateDefaultBucketWithPrefix(
            options, crmClient, "gcp-options-it-");

    GcsPath gcsPath = GcsPath.fromUri(tempLocation);
    Bucket bucket = gcsUtil.getBucket(gcsPath);
    try {
      assertNotNull(bucket);
      // Fail with an assertion, not a NullPointerException, if no policy is reported.
      assertNotNull(bucket.getSoftDeletePolicy());
      // verify the soft delete policy is disabled
      assertEquals(Long.valueOf(0L), bucket.getSoftDeletePolicy().getRetentionDurationSeconds());
    } finally {
      // Clean up regardless of the assertion outcome.
      if (bucket != null) {
        gcsUtil.removeBucket(bucket);
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.services.cloudresourcemanager.CloudResourceManager;
Expand Down Expand Up @@ -56,6 +59,7 @@
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

Expand Down Expand Up @@ -230,6 +234,14 @@ public void testCreateBucket() throws Exception {

String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
assertEquals("gs://dataflow-staging-us-north1-1/temp/", bucket);

ArgumentCaptor<Bucket> bucketArg = ArgumentCaptor.forClass(Bucket.class);
verify(mockGcsUtil, times(1)).createBucket(anyString(), bucketArg.capture());

// verify that the soft delete policy is disabled in the default bucket
assertEquals(
bucketArg.getValue().getSoftDeletePolicy().getRetentionDurationSeconds(),
Long.valueOf(0L));
}

@Test
Expand Down

0 comments on commit 8f2d2c6

Please sign in to comment.