Skip to content

Commit

Permalink
Wrapped continuation token with Exception when recursive acl call is … (
Browse files Browse the repository at this point in the history
  • Loading branch information
gapra-msft authored Oct 1, 2020
1 parent 44743b1 commit e266b8c
Show file tree
Hide file tree
Showing 12 changed files with 1,658 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@
import com.azure.storage.file.datalake.implementation.models.SourceModifiedAccessConditions;
import com.azure.storage.file.datalake.implementation.util.DataLakeImplUtils;
import com.azure.storage.file.datalake.implementation.util.DataLakeSasImplUtil;
import com.azure.storage.file.datalake.implementation.util.ModelHelper;
import com.azure.storage.file.datalake.implementation.util.TransformUtils;
import com.azure.storage.file.datalake.models.AccessControlChangeCounters;
import com.azure.storage.file.datalake.models.AccessControlChangeFailure;
import com.azure.storage.file.datalake.models.AccessControlChangeResult;
import com.azure.storage.file.datalake.models.AccessControlChanges;
import com.azure.storage.file.datalake.models.DataLakeAclChangeFailedException;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.PathAccessControl;
import com.azure.storage.file.datalake.models.PathAccessControlEntry;
import com.azure.storage.file.datalake.models.PathHttpHeaders;
Expand Down Expand Up @@ -666,6 +669,9 @@ Mono<Response<PathInfo>> setAccessControlWithResponse(List<PathAccessControlEntr
*
* @param accessControlList The POSIX access control list for the file or directory.
* @return A reactive response containing the result of the operation.
*
* @throws DataLakeAclChangeFailedException if a request to storage throws a
* {@link DataLakeStorageException} or a {@link Exception} to wrap the exception with the continuation token.
*/
public Mono<AccessControlChangeResult> setAccessControlRecursive(List<PathAccessControlEntry> accessControlList) {
try {
Expand All @@ -688,6 +694,9 @@ public Mono<AccessControlChangeResult> setAccessControlRecursive(List<PathAccess
*
* @param options {@link PathSetAccessControlRecursiveOptions}
* @return A reactive response containing the result of the operation.
*
* @throws DataLakeAclChangeFailedException if a request to storage throws a
* {@link DataLakeStorageException} or a {@link Exception} to wrap the exception with the continuation token.
*/
public Mono<Response<AccessControlChangeResult>> setAccessControlRecursiveWithResponse(
PathSetAccessControlRecursiveOptions options) {
Expand All @@ -714,9 +723,12 @@ public Mono<Response<AccessControlChangeResult>> setAccessControlRecursiveWithRe
*
* @param accessControlList The POSIX access control list for the file or directory.
* @return A reactive response containing the result of the operation.
*
* @throws DataLakeAclChangeFailedException if a request to storage throws a
* {@link DataLakeStorageException} or a {@link Exception} to wrap the exception with the continuation token.
*/
public Mono<AccessControlChangeResult> updateAccessControlRecursive(List<PathAccessControlEntry> accessControlList)
{
public Mono<AccessControlChangeResult> updateAccessControlRecursive(
List<PathAccessControlEntry> accessControlList) {
try {
return updateAccessControlRecursiveWithResponse(
new PathUpdateAccessControlRecursiveOptions(accessControlList))
Expand All @@ -738,6 +750,9 @@ public Mono<AccessControlChangeResult> updateAccessControlRecursive(List<PathAcc
*
* @param options {@link PathUpdateAccessControlRecursiveOptions}
* @return A reactive response containing the result of the operation.
*
* @throws DataLakeAclChangeFailedException if a request to storage throws a
* {@link DataLakeStorageException} or a {@link Exception} to wrap the exception with the continuation token.
*/
public Mono<Response<AccessControlChangeResult>> updateAccessControlRecursiveWithResponse(
PathUpdateAccessControlRecursiveOptions options) {
Expand All @@ -764,6 +779,9 @@ public Mono<Response<AccessControlChangeResult>> updateAccessControlRecursiveWit
*
* @param accessControlList The POSIX access control list for the file or directory.
* @return A reactive response containing the result of the operation.
*
* @throws DataLakeAclChangeFailedException if a request to storage throws a
* {@link DataLakeStorageException} or a {@link Exception} to wrap the exception with the continuation token.
*/
public Mono<AccessControlChangeResult> removeAccessControlRecursive(
List<PathRemoveAccessControlEntry> accessControlList) {
Expand All @@ -788,6 +806,9 @@ public Mono<AccessControlChangeResult> removeAccessControlRecursive(
*
* @param options {@link PathRemoveAccessControlRecursiveOptions}
* @return A reactive response containing the result of the operation.
*
* @throws DataLakeAclChangeFailedException if a request to storage throws a
* {@link DataLakeStorageException} or a {@link Exception} to wrap the exception with the continuation token.
*/
public Mono<Response<AccessControlChangeResult>> removeAccessControlRecursiveWithResponse(
PathRemoveAccessControlRecursiveOptions options) {
Expand Down Expand Up @@ -818,6 +839,15 @@ Mono<Response<AccessControlChangeResult>> setAccessControlRecursiveWithResponse(

return this.dataLakeStorage.paths().setAccessControlRecursiveWithRestResponseAsync(mode, null,
continuationToken, continueOnFailure, batchSize, accessControlList, null, contextFinal)
.onErrorMap(e -> {
if (e instanceof DataLakeStorageException) {
return logger.logExceptionAsError(ModelHelper.changeAclRequestFailed((DataLakeStorageException) e,
continuationToken));
} else if (e instanceof Exception) {
return logger.logExceptionAsError(ModelHelper.changeAclFailed((Exception) e, continuationToken));
}
return e;
})
.flatMap(response -> setAccessControlRecursiveWithResponseHelper(response, maxBatches,
directoriesSuccessfulCount, filesSuccessfulCount, failureCount, batchesCount, progressHandler,
accessControlList, mode, batchSize, continueOnFailure, continuationToken, null, contextFinal));
Expand Down Expand Up @@ -921,6 +951,15 @@ Determine if we are finished either because there is no new continuation (failur
// If we're not finished, issue another request
return this.dataLakeStorage.paths().setAccessControlRecursiveWithRestResponseAsync(mode, null,
effectiveNextToken, continueOnFailure, batchSize, accessControlStr, null, context)
.onErrorMap(e -> {
if (e instanceof DataLakeStorageException) {
return logger.logExceptionAsError(ModelHelper.changeAclRequestFailed((DataLakeStorageException) e,
effectiveNextToken));
} else if (e instanceof Exception) {
return logger.logExceptionAsError(ModelHelper.changeAclFailed((Exception) e, effectiveNextToken));
}
return e;
})
.flatMap(response2 -> setAccessControlRecursiveWithResponseHelper(response2, maxBatches,
directoriesSuccessfulCount, filesSuccessfulCount, failureCount, batchesCount, progressHandler,
accessControlStr, mode, batchSize, continueOnFailure, effectiveNextToken, finalBatchFailures, context));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.azure.storage.file.datalake.implementation.models.SourceModifiedAccessConditions;
import com.azure.storage.file.datalake.implementation.util.DataLakeImplUtils;
import com.azure.storage.file.datalake.models.AccessControlChangeResult;
import com.azure.storage.file.datalake.models.DataLakeAclChangeFailedException;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.PathAccessControl;
import com.azure.storage.file.datalake.models.PathAccessControlEntry;
import com.azure.storage.file.datalake.models.PathHttpHeaders;
Expand Down Expand Up @@ -372,6 +374,9 @@ public Response<PathInfo> setPermissionsWithResponse(PathPermissions permissions
*
* @param accessControlList The POSIX access control list for the file or directory.
* @return The result of the operation.
*
* @throws DataLakeAclChangeFailedException if a request to storage throws a
* {@link DataLakeStorageException} or a {@link Exception} to wrap the exception with the continuation token.
*/
public AccessControlChangeResult setAccessControlRecursive(List<PathAccessControlEntry> accessControlList) {
return setAccessControlRecursiveWithResponse(new PathSetAccessControlRecursiveOptions(accessControlList), null,
Expand All @@ -392,6 +397,9 @@ public AccessControlChangeResult setAccessControlRecursive(List<PathAccessContro
* @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A response containing the result of the operation.
*
* @throws DataLakeAclChangeFailedException if a request to storage throws a
* {@link DataLakeStorageException} or a {@link Exception} to wrap the exception with the continuation token.
*/
public Response<AccessControlChangeResult> setAccessControlRecursiveWithResponse(
PathSetAccessControlRecursiveOptions options, Duration timeout, Context context) {
Expand All @@ -416,6 +424,9 @@ public Response<AccessControlChangeResult> setAccessControlRecursiveWithResponse
*
* @param accessControlList The POSIX access control list for the file or directory.
* @return The result of the operation.
*
* @throws DataLakeAclChangeFailedException if a request to storage throws a
* {@link DataLakeStorageException} or a {@link Exception} to wrap the exception with the continuation token.
*/
public AccessControlChangeResult updateAccessControlRecursive(List<PathAccessControlEntry> accessControlList) {
return updateAccessControlRecursiveWithResponse(new PathUpdateAccessControlRecursiveOptions(accessControlList),
Expand All @@ -436,6 +447,9 @@ public AccessControlChangeResult updateAccessControlRecursive(List<PathAccessCon
* @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A response containing the result of the operation.
*
* @throws DataLakeAclChangeFailedException if a request to storage throws a
* {@link DataLakeStorageException} or a {@link Exception} to wrap the exception with the continuation token.
*/
public Response<AccessControlChangeResult> updateAccessControlRecursiveWithResponse(
PathUpdateAccessControlRecursiveOptions options, Duration timeout, Context context) {
Expand All @@ -460,6 +474,9 @@ public Response<AccessControlChangeResult> updateAccessControlRecursiveWithRespo
*
* @param accessControlList The POSIX access control list for the file or directory.
* @return The result of the operation.
*
* @throws DataLakeAclChangeFailedException if a request to storage throws a
* {@link DataLakeStorageException} or a {@link Exception} to wrap the exception with the continuation token.
*/
public AccessControlChangeResult removeAccessControlRecursive(
List<PathRemoveAccessControlEntry> accessControlList) {
Expand All @@ -481,6 +498,9 @@ public AccessControlChangeResult removeAccessControlRecursive(
* @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A response containing the result of the operation.
*
* @throws DataLakeAclChangeFailedException if a request to storage throws a
* {@link DataLakeStorageException} or a {@link Exception} to wrap the exception with the continuation token.
*/
public Response<AccessControlChangeResult> removeAccessControlRecursiveWithResponse(
PathRemoveAccessControlRecursiveOptions options, Duration timeout, Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.azure.storage.common.ParallelTransferOptions;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.StorageImplUtils;
import com.azure.storage.file.datalake.models.DataLakeAclChangeFailedException;
import com.azure.storage.file.datalake.models.DataLakeStorageException;

/**
* This class provides helper methods for common model patterns.
Expand Down Expand Up @@ -76,4 +78,20 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO
.setProgressReceiver(other.getProgressReceiver())
.setMaxSingleUploadSizeLong(maxSingleUploadSize);
}

public static DataLakeAclChangeFailedException changeAclRequestFailed(DataLakeStorageException e,
String continuationToken) {
String message = String.format("An error occurred while recursively changing the access control list. See the "
+ "exception of type %s with status=%s and error code=%s for more information. You can resume changing "
+ "the access control list using continuationToken=%s after addressing the error.", e.getClass(),
e.getStatusCode(), e.getErrorCode(), continuationToken);
return new DataLakeAclChangeFailedException(message, e, continuationToken);
}

public static DataLakeAclChangeFailedException changeAclFailed(Exception e, String continuationToken) {
String message = String.format("An error occurred while recursively changing the access control list. See the "
+ "exception of type %s for more information. You can resume changing the access control list using "
+ "continuationToken=%s after addressing the error.", e.getClass(), continuationToken);
return new DataLakeAclChangeFailedException(message, e, continuationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake.models;

/**
* An exception thrown when an operation is interrupted and can be continued later on.
*/
public class DataLakeAclChangeFailedException extends RuntimeException {

private final String continuationToken;

/**
* Initializes a new instance of DataLakeAclChangeFailedException with a specified error message, and a reference
* to the inner exception that is the cause of this exception.
*
* @param message The message that describes the error.
* @param e The exception thrown.
* @param continuationToken The continuation token returned from the previous successful response.
*/
public DataLakeAclChangeFailedException(String message, Exception e, String continuationToken) {
super(message, e);
this.continuationToken = continuationToken;
}

/**
* Initializes a new instance of DataLakeAclChangeFailedException with a specified error message, HTTP status code,
* error code, and a reference to the inner exception that is the cause of this exception.
*
* @param message The message that describes the error.
* @param e The exception thrown from the failed request.
* @param continuationToken The continuation token returned from the previous successful response.
*/
public DataLakeAclChangeFailedException(String message, DataLakeStorageException e, String continuationToken) {
super(message, e);
this.continuationToken = continuationToken;
}

/**
* @return the continuation token to resume a datalake recursive acl function.
*/
public String getContinuationToken() {
return continuationToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.azure.storage.file.datalake

import com.azure.core.http.HttpClient
import com.azure.core.http.HttpHeaders
import com.azure.core.http.HttpMethod
import com.azure.core.http.HttpPipelineCallContext
import com.azure.core.http.HttpPipelineNextPolicy
import com.azure.core.http.HttpRequest
Expand Down Expand Up @@ -465,6 +466,24 @@ class APISpec extends Specification {
return builder.credential(credential).buildDirectoryClient()
}

DataLakeDirectoryClient getDirectoryClient(StorageSharedKeyCredential credential, String endpoint, String pathName, HttpPipelinePolicy... policies) {
DataLakePathClientBuilder builder = new DataLakePathClientBuilder()
.endpoint(endpoint)
.pathName(pathName)
.httpClient(getHttpClient())
.httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS))

for (HttpPipelinePolicy policy : policies) {
builder.addPolicy(policy)
}

if (testMode == TestMode.RECORD) {
builder.addPolicy(interceptorManager.getRecordPolicy())
}

return builder.credential(credential).buildDirectoryClient()
}

DataLakeDirectoryClient getDirectoryClient(String sasToken, String endpoint, String pathName) {
DataLakePathClientBuilder builder = new DataLakePathClientBuilder()
.endpoint(endpoint)
Expand Down Expand Up @@ -734,6 +753,14 @@ class APISpec extends Specification {
}
}

def getMockRequest() {
HttpHeaders headers = new HttpHeaders()
headers.put(Constants.HeaderConstants.CONTENT_ENCODING, "en-US")
URL url = new URL("http://devtest.blob.core.windows.net/test-container/test-blob")
HttpRequest request = new HttpRequest(HttpMethod.POST, url, headers, null)
return request
}

// Only sleep if test is running in live mode
def sleepIfRecord(long milliseconds) {
if (testMode != TestMode.PLAYBACK) {
Expand Down
Loading

0 comments on commit e266b8c

Please sign in to comment.