-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement unexpectlengthexception in azure core #5108
Changes from 36 commits
8a3e7dc
1d9f1cf
fffb585
89039fc
ee165be
d3d8c04
96f3b79
623d6e7
43ab40e
d162cd1
c71ceb4
0666ca4
823999d
b4b0282
a80346c
d91091b
25a1f7f
4ba94ba
05f3bb9
aae6689
f45166a
59211eb
98c7202
7ef2653
d8a819f
72d7075
9395346
8937136
e09b7ed
af3ef5a
72eff56
ec9f417
609e0b3
088eaeb
e0b1d6e
55a1b53
7a802f5
165a0b2
c63f931
f599930
3c34beb
dd1dbab
b88014e
942d7bc
9a9ed7c
9ee4bb7
78fff12
5e16f5a
645da4a
43b5581
8a38331
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
package com.azure.storage.blob; | ||
|
||
import com.azure.core.http.rest.Response; | ||
import com.azure.core.implementation.exception.UnexpectedLengthException; | ||
import com.azure.core.util.Context; | ||
import com.azure.storage.blob.models.AppendBlobAccessConditions; | ||
import com.azure.storage.blob.models.AppendBlobItem; | ||
|
@@ -14,6 +15,7 @@ | |
import com.azure.storage.blob.models.SourceModifiedAccessConditions; | ||
import com.azure.storage.blob.models.StorageException; | ||
import com.azure.storage.common.Utility; | ||
import java.util.Objects; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
import reactor.core.scheduler.Schedulers; | ||
|
@@ -159,24 +161,14 @@ public AppendBlobItem appendBlock(InputStream data, long length) { | |
* @param context Additional context that is passed through the Http pipeline during the service call. | ||
* | ||
* @return A {@link Response} whose {@link Response#value() value} contains the append blob operation. | ||
* @throws UnexpectedLengthException when the length of data does not match the input {@code length}. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You cannot throw UnexpectedLengthException here as it is not public API. You either need to make it public API, or not throw it from public API. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Put the exception in public exception folder |
||
* @throws NullPointerException if the input data is null. | ||
*/ | ||
public Response<AppendBlobItem> appendBlockWithResponse(InputStream data, long length, | ||
AppendBlobAccessConditions appendBlobAccessConditions, Duration timeout, Context context) { | ||
Flux<ByteBuffer> fbb = Flux.range(0, (int) Math.ceil((double) length / (double) MAX_APPEND_BLOCK_BYTES)) | ||
.map(i -> i * MAX_APPEND_BLOCK_BYTES) | ||
.concatMap(pos -> Mono.fromCallable(() -> { | ||
long count = pos + MAX_APPEND_BLOCK_BYTES > length ? length - pos : MAX_APPEND_BLOCK_BYTES; | ||
byte[] cache = new byte[(int) count]; | ||
int read = 0; | ||
while (read < count) { | ||
read += data.read(cache, read, (int) count - read); | ||
} | ||
|
||
return ByteBuffer.wrap(cache); | ||
})); | ||
|
||
Mono<Response<AppendBlobItem>> response = appendBlobAsyncClient.appendBlockWithResponse( | ||
fbb.subscribeOn(Schedulers.elastic()), length, appendBlobAccessConditions, context); | ||
Objects.requireNonNull(data); | ||
Flux<ByteBuffer> fbb = Utility.convertStreamToByteBuffer(data, length, MAX_APPEND_BLOCK_BYTES); | ||
Mono<Response<AppendBlobItem>> response = appendBlobAsyncClient.appendBlockWithResponse(fbb.subscribeOn(Schedulers.elastic()), length, appendBlobAccessConditions, context); | ||
return Utility.blockWithOptionalTimeout(response, timeout); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess what we're doing here is correct. I asked the question in SOF and got a response from David (the main contributor of rx) - https://stackoverflow.com/questions/57776012/mutating-array-elements-in-reactive-publisher-onnext-doonnext-methods/
The last thing to rule out is whether to use volatile keyword to ensure immediate update visibility across thread, waiting for his response.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're good, David confirmed that framework takes care of memory visibility, no need for volatile. https://stackoverflow.com/questions/57776012/mutating-array-elements-in-reactive-publisher-onnext-doonnext-methods/57786588?noredirect=1#comment102032427_57786588
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Anu for taking care of this! Learned a lot from the thread.