Batching in Azure Storage Blobs #5693
import java.util.function.Consumer;

/**
 * This class provides
...no javadoc
sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobBatch.java
 */
public Response<Void> delete(String containerName, String blobName) {
    return deleteHelper(buildClient(containerName, blobName), null, null);
}
If this adds an operation (but doesn't actually perform it), it seems weird that this returns Response<Void>?
The planned implementation is a little roundabout: the call returns a response that only becomes populated once the batch submission returns. We could change the return type to a Future to better indicate that this doesn't do anything yet, and document that the batch must be submitted via submitBatch before the response will have a value.
I think Mono instead of Future would be preferable so we don't introduce a separate paradigm just for this. It might be ok to have this as an "async only" feature where sync customers who want to use it just have to block because I agree that there should be some sort of type-enforced means of communicating that the return value isn't valid until the batch completes.
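The deferred-response idea in this thread can be pictured with the JDK alone: each queued operation hands back a handle that stays incomplete until the batch is submitted. This is a minimal sketch, not the SDK's implementation. `DeferredBatchSketch`, `addDelete`, and `submit` are hypothetical names, `CompletableFuture` stands in for the `Future`/`Mono` return types under discussion, and the 202 status is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a batch; not the azure-storage-blob API.
class DeferredBatchSketch {
    private final List<CompletableFuture<Integer>> pending = new ArrayList<>();

    // Queues a delete and returns a handle that has no value yet.
    // The blob name is ignored in this sketch.
    CompletableFuture<Integer> addDelete(String blobName) {
        CompletableFuture<Integer> statusCode = new CompletableFuture<>();
        pending.add(statusCode);
        return statusCode;
    }

    // Simulates submission: every queued handle is completed with a status code.
    // 202 is an assumed placeholder for a successful sub-response.
    void submit() {
        for (CompletableFuture<Integer> statusCode : pending) {
            statusCode.complete(202);
        }
        pending.clear();
    }

    public static void main(String[] args) {
        DeferredBatchSketch batch = new DeferredBatchSketch();
        CompletableFuture<Integer> result = batch.addDelete("blob-a");
        System.out.println("done before submit: " + result.isDone());
        batch.submit();
        System.out.println("status after submit: " + result.join());
    }
}
```

With a type like this, a caller cannot read the status without going through the handle, which is the type-enforced signal the comment above asks for.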
 *
 * <p>If any request in a batch fails this will throw a {@link StorageException}.</p>
 *
 * <p><strong>Code Samples</strong></p>
Need code samples
 * <p><strong>Code Samples</strong></p>
 *
 * @param batch Batch to submit.
 * @return An empty response indicating that the batch operation has completed.
Document how the user gets the response information for each operation.
 *
 * <p>If any request in a batch fails this will throw a {@link StorageException}.</p>
 *
 * <p><strong>Code Samples</strong></p>
TODO
        EXPECTED_SET_TIER_STATUS_CODES);
}

private <T> Response<T> createAndReturnBatchOperation(Mono<Response<T>> response, int... expectedStatusCodes) {
Why do we have to include expectedStatusCodes? Does that not get processed in RestProxy anymore in batch?
    .flatMap(batchOperation -> batchOperation)
    .subscribe();

while (!disposable.isDisposed()) {
Why not flux.blockLast?
}

Disposable disposable = Flux.fromStream(batchOperationQueue.stream())
    .flatMap(batchOperation -> batchOperation)
Can you leave an inline comment that mentions the flatMap is just there to force subscription on each of the inner requests? Otherwise, mapping something to itself comes across as a bit cryptic.
 * and it adds the header "Content-Id" that allows the request to be mapped to the response.
 */
private Mono<HttpResponse> cleanseHeaders(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
    // Remove the "x-ms-header" as it shouldn't be included in the batch operation request.
"x-ms-version"
}

String getContentType() {
    return String.format("multipart/mixed; boundary=%s", batchBoundary);
Maybe this should be gathered with the other constants at the top?
batchRequest.add(ByteBuffer.wrap(batchRequestBuilder.toString().getBytes(StandardCharsets.UTF_8)));
batchMapping.get(contentId).setRequest(request);

return Mono.empty();
Perhaps a Mono is more appropriate?
This looks great, Alan! I added a couple of comments and questions, but none of them absolutely has to be addressed before checking in.
Azure Blob storage is Microsoft's object storage solution for the cloud. Blob
storage is optimized for storing massive amounts of unstructured data.
Unstructured data is data that does not adhere to a particular data model or
definition, such as text or binary data.
Nit - add something about batching. I have:
Azure Blob storage is Microsoft's object storage solution for the cloud. Blob
storage is optimized for storing massive amounts of unstructured data. This
library allows you to batch multiple Azure Blob Storage operations in a single request.
I suppose this whole file could use some more Batch specifics. 😄
 *
 * @param client {@link BlobServiceClient} used to construct the batch.
 */
public BlobBatch(BlobServiceClient client) {
Nit - if you're adding BlobBatchClient, then maybe that should be taken here.
    batchPipelineBuilder.policies(this::cleanseHeaders);
}

batchPipelineBuilder.policies(pipeline.getPolicy(i));
Nit - just pass policy
    .pipeline(pipeline)
    .buildAsyncClient();

this.batchOperationQueue = new ConcurrentLinkedDeque<>();
Does this need to be Concurrent? In .NET our clients are thread safe but our models aren't. You need to synchronize access to the model if you want to share it across threads, but I'm not going to impose that cost on everyone else. BlobBatch looks like a client, but it's really a model type.
I wouldn't bother changing this before you ship v1.
 * @throws UnsupportedOperationException If this batch has already added an operation of another type.
 */
public Response<Void> delete(String blobUrl) {
    return deleteHelper(getUrlPath(blobUrl), null, null);
Is it just getting the path? Not the query as well? That might be okay because I guess your SAS is handled with a credential instead of being passed around on the URI.
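To make the path-versus-query distinction concrete, here is a small sketch using `java.net.URI`. The helper names and the sample URL (including its SAS-style query) are made up for illustration. How `getUrlPath` is actually implemented is not shown in this review.

```java
import java.net.URI;

// Hypothetical helper names; only java.net.URI is a real API here.
class UrlPathSketch {
    // Returns only the path component, dropping any query string.
    static String pathOnly(String blobUrl) {
        return URI.create(blobUrl).getPath();
    }

    // Returns only the query component, which is where a SAS token would travel.
    static String queryOnly(String blobUrl) {
        return URI.create(blobUrl).getQuery();
    }

    public static void main(String[] args) {
        // Made-up URL with a made-up SAS-style query.
        String url = "https://account.blob.core.windows.net/container/blob.txt?sv=2019-02-02&sig=abc";
        System.out.println(pathOnly(url));
        System.out.println(queryOnly(url));
    }
}
```

If only the path is forwarded, anything carried in the query is lost, so authorization would have to come from a credential on the pipeline, as the comment suggests.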
setStatusCodeAndHeaders(batchOperationResponse, subResponseSections[1]);

// The third section will contain the body.
if (subResponseSections.length > 2) {
>=?
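On the `>` versus `>=` question, the bound can be checked with a small sketch. It assumes, without confirmation from the diff, that a sub-response is split into sections on blank lines; the class and method names are hypothetical. Under that assumption, index 2 exists only when there are at least three sections, so `length > 2` is the safe test and `length >= 2` would read past the end of the array when there is no body.

```java
// Hypothetical sketch; the section layout and the split on blank lines are assumptions.
class SubResponseSectionsSketch {
    // Assumed layout: part headers, blank line, status line and headers, blank line, optional body.
    static String[] sections(String subResponse) {
        // String.split drops trailing empty strings, so a missing body yields two sections.
        return subResponse.split("\r\n\r\n");
    }

    static String bodyOrNull(String subResponse) {
        String[] parts = sections(subResponse);
        // Index 2 exists only when there are at least three sections, i.e. length > 2.
        return parts.length > 2 ? parts[2] : null;
    }

    public static void main(String[] args) {
        String noBody = "Content-ID: 0\r\n\r\nHTTP/1.1 202 Accepted\r\nx-ms-request-id: abc\r\n\r\n";
        String withBody = "Content-ID: 1\r\n\r\nHTTP/1.1 404 Not Found\r\nContent-Length: 8\r\n\r\n<Error/>";
        System.out.println(bodyOrNull(noBody));
        System.out.println(bodyOrNull(withBody));
    }
}
```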
    continue;
}

if (line.startsWith("HTTP")) {
Nit - should we just add a Boolean first = true outside the loop and then reset it here? What if you get a header starting with HTTP?
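The flag-based approach suggested here might look like the following sketch. It is an illustration under assumptions, not the parser from this PR: the class name, the field layout, and the CRLF-separated input are all hypothetical. The first non-empty line is treated as the status line, so a later header whose name happens to start with `HTTP` is still parsed as a header.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser sketch; names and input format are assumptions.
class StatusAndHeadersSketch {
    int statusCode;
    final Map<String, String> headers = new LinkedHashMap<>();

    static StatusAndHeadersSketch parse(String section) {
        StatusAndHeadersSketch result = new StatusAndHeadersSketch();
        boolean first = true; // true until the status line has been consumed
        for (String line : section.split("\r\n")) {
            if (line.isEmpty()) {
                continue;
            }
            if (first) {
                // Status line, e.g. "HTTP/1.1 202 Accepted": the second token is the code.
                result.statusCode = Integer.parseInt(line.split(" ")[1]);
                first = false;
                continue;
            }
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue; // not a "name: value" header line; skipped in this sketch
            }
            result.headers.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        StatusAndHeadersSketch parsed =
            parse("HTTP/1.1 202 Accepted\r\nHTTP-Custom: yes\r\nx-ms-request-id: abc");
        System.out.println(parsed.statusCode + " " + parsed.headers);
    }
}
```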
 *
 * @param <T> The deserialized type of the response content, available from {@link #getValue()}.
 */
public class BlobBatchOperationResponse<T> implements Response<T> {
Should this be public? In .NET we've made ours private and users only publicly see Response<T>.
@@ -3,48 +3,45 @@

package com.azure.storage.blob;
Is this autogenerated? Why the massive diff delta?
@@ -97,7 +98,7 @@ public ServicesImpl(AzureBlobStorageImpl client) {
 Mono<ServicesGetAccountInfoResponse> getAccountInfo(@HostParam("url") String url, @HeaderParam("x-ms-version") String version, @QueryParam("restype") String restype, @QueryParam("comp") String comp, Context context);

 @Post("")
-@ExpectedResponses({200})
+@ExpectedResponses({202})
Did you add a readme transform? Is it in your PR?
@@ -73,6 +73,7 @@
 com.azure.security.keyvault.secrets, // FIXME this should not be a long-term solution
 com.azure.storage.common, // FIXME this should not be a long-term solution
 com.azure.storage.blob, // FIXME this should not be a long-term solution
+com.azure.storage.blob.batch, // FIXME this should not be a long-term solution
Can you file a tracking issue for all of these FIXMEs?
Storage Blob to use Netty HTTP client.

### Alternate HTTP client
If, instead of Netty it is preferable to use OkHTTP, there is a HTTP client available for that too. Exclude the default
nit: remove comma after If
@@ -240,4 +246,48 @@ public void getAccountInfoWithResponse() {
    StorageAccountInfo accountInfo = client.getAccountInfoWithResponse(timeout, context).getValue();
    // END: com.azure.storage.blob.BlobServiceClient.getAccountInfoWithResponse#Duration-Context
}
It looks like all of our batching tests are batching just 2 operations. Can we test with an N greater than 2 as well?
Closed in favor of #5734
Implements #4509, #4510, and #5161
Adds batching into Azure Storage Blobs and implements the batching APIs for Blob delete and setTier.