Raise exception rather than swallowing it if there is something wrong in retry stream downloading #16783
Conversation
chunk = next(self.iter_content_func)
try:
    resp = self.pipeline.run(self.request, stream=True, headers=headers)
    if not resp.http_response:
I don't recall the HttpResponse object having a boolean value; or will this value be None under some circumstances?
Maybe we could add a small inline comment explaining what this if-clause is intended to catch.
        raise
    chunk = next(self.iter_content_func)
except Exception as err:  # pylint: disable=broad-except
    _LOGGER.warning("Unable to stream download: %s", err)
Optional nit:
From a customer's perspective, will this code always be in the context of a "download"? Do we stream in other customer use cases?
Maybe it could be more generic, something like "Unable to stream response content"... but maybe that sounds too HTTP-ish.
I tried to be consistent with https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py#L152, but I am open to changing it.
    if resp.http_response.status_code == 416:
        raise
    chunk = await self.response.internal_response.content.read(self.block_size)
except Exception as err:  # pylint: disable=broad-except
I'm curious: prior to your change, which except clause was catching and swallowing these errors?
I'm curious: prior to your change, which except clause was catching and swallowing these errors?
It fails in https://github.com/psf/requests/blob/master/requests/models.py#L920, where self.status_code is None.
        raise
    if resp.http_response.status_code == 416:
        raise
    self.iter_content_func = resp.http_response.iter_content(self.block_size)
This last change of re-setting self.iter_content_func seems to make sense superficially, because now next(self.iter_content_func) can indeed fetch data (rather than always raising StopIteration as it did before), but it really does not help, for the reasons I already explained in some depth, so I'm just citing myself here:
And as already commented, the code makes assumptions it cannot safely make: StreamDownloader is also used for requests that already have a (non-trivial) range header set. So what happens if the original range header is, say, "bytes=1000-1999", chunking is 200 and there is a connection error after the first chunk?
This code will then go on to make a request for "range: bytes=200-", which does not overlap with the original request.
So this can produce complete garbage data!
The only way out I can see here is to remove the retry entirely. This function just does not have enough context information to retry safely. So the best it can do is to raise and let the upper layers handle it.
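For illustration, here is a small worked example of that scenario (hypothetical numbers, plain Python, not taken from the PR), showing how a resume range computed purely from a byte counter fails to overlap the caller's original range:

# Hypothetical walk-through of the non-overlapping-range problem described above.
original_range = "bytes=1000-1999"    # the range the caller actually asked for
block_size = 200
downloaded = 200                      # one 200-byte chunk was received before the error

# The retry logic under review rebuilds the range purely from the byte counter,
# ignoring the original range header:
resume_range = "bytes=" + str(downloaded) + "-"
print(resume_range)                   # -> "bytes=200-"
# The caller wanted bytes 1000-1999, but the retry streams bytes 200 onwards,
# so the concatenated chunks are garbage relative to the original request.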
Thank you.
Yes, I was about to answer this. :)
As you know, the purpose of azure-core is to support the other SDK libraries. We have not heard of any real scenarios from libraries that need to read ranges like "bytes=1000-1999". In Storage, we always download the entire file. This made us lean towards retrying rather than failing directly.
But yes, you are right: it is not correct in the "bytes=1000-1999" case. I will open an issue to track it.
In the meantime, do you have real scenarios that need this? That would help us prioritize the issue.
Thanks.
Surely you are aware this is used in the azure-storage-blob library, which does set range headers by default, as it will use multi-threading to download large blobs in chunks, in parallel?
See e.g. azure-sdk-for-python/sdk/storage/azure-storage-blob/azure/storage/blob/_download.py, line 505 at commit b222aa7: def readinto(self, stream):
Yes, we have real scenarios where we use this. As you might be aware, we use Azure Blob Storage for big data analysis, processing dozens of TB daily. We had several customer-facing incidents that are very likely due to this bug. We spent person-weeks tracking this down and working around it.
@@ -130,17 +130,25 @@ def __next__(self):
             self.response.internal_response.close()
             raise StopIteration()
         except (requests.exceptions.ChunkedEncodingError,
-                requests.exceptions.ConnectionError):
+                requests.exceptions.ConnectionError) as ex:
             retry_total -= 1
             if retry_total <= 0:
                 retry_active = False
             else:
                 time.sleep(retry_interval)
                 headers = {'range': 'bytes=' + str(self.downloaded) + '-'}
I'm starting to repeat myself at least three times over, but again: the original request saved in self.request can already have a range set, e.g. range: bytes=1000-. So what does this new range request do, exactly?
Also note that there are REST APIs that have special range request headers which take precedence over range, e.g. the Azure blob store GET uses x-ms-range. For more information please refer to the documentation here: https://docs.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations
So it would not even help to try to parse this header of the original request here, because one can never be sure to catch all the special cases of the various APIs.
Again, please do not retry; this cannot be done safely.
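As a sketch of why even parsing the original header would not be enough (the helper below is hypothetical and not part of azure-core): a generic resume helper can only inspect the standard range header, so for a service that honours x-ms-range instead, the computed resume range bears no relation to what was actually served:

# Hypothetical helper illustrating the limits of header parsing on retry.
# It only understands the standard "range" header; it cannot know about
# service-specific headers such as "x-ms-range", which Azure Blob Storage
# honours in preference to "range".
import re

def naive_resume_range(original_headers, downloaded):
    rng = original_headers.get("range", "bytes=0-")
    match = re.match(r"bytes=(\d+)-(\d*)", rng)
    start = int(match.group(1)) + downloaded
    return "bytes=%d-%s" % (start, match.group(2))

headers = {"x-ms-range": "bytes=1000-1999"}    # what the service actually honoured
print(naive_resume_range(headers, 200))        # -> "bytes=200-", a non-overlapping request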
I should also add that, in general, self.downloaded is unreliable, because it is computed inconsistently: sometimes as
self.downloaded += len(chunk)
and sometimes as
self.downloaded += self._block_size
Both of them can be wrong in general: requests configures urllib3 to do decoding based on the content-encoding header of the response (e.g. gunzip the data stream):
https://github.com/psf/requests/blob/bdc00eb0978dd3cdd43f7cd1f95ced7aa75fab39/requests/models.py#L753-L754
So this will (usually) read block_size bytes from the underlying raw data stream, then decode these bytes to return chunk. So self.downloaded += len(chunk) will be wrong if such decoding is happening.
However, note that urllib3's stream function, documented here:
https://urllib3.readthedocs.io/en/1.26.3/reference/urllib3.response.html#urllib3.response.HTTPResponse.stream
actually does not promise to always read exactly block_size bytes from the underlying stream. And in general, it won't: in the current version, for chunked HTTP responses, block_size is not actually used; instead, the chunks are iterated over.
So in essence, there is no way to reliably know how many of the original bytes were downloaded, unless it's clear we are in the special case of no HTTP response chunking and no content decoding, and we rely on implementation details of urllib3.
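A small self-contained illustration of the decoding half of this argument (standard library only, not azure-core code): when the body is gzip-encoded, the decoded chunk handed to the caller has a different length than the raw bytes consumed from the wire, so neither len(chunk) nor block_size tracks how far the download has actually progressed on the wire:

# Demonstration that decoded chunk length != raw bytes read from the wire.
import gzip
import zlib

payload = b"The quick brown fox jumps over the lazy dog. " * 20000
raw_body = gzip.compress(payload)                  # what actually travels over HTTP

decoder = zlib.decompressobj(16 + zlib.MAX_WBITS)  # gzip-aware decoder, as urllib3 uses
block_size = 100
raw_chunk = raw_body[:block_size]                  # 100 raw (compressed) bytes read
chunk = decoder.decompress(raw_chunk)              # roughly what iter_content would hand back

print(len(raw_chunk), len(chunk))                  # typically very different lengths
# Counting either len(chunk) or block_size therefore misstates the wire-level offset
# needed to build a correct resume range.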
Hey @xiangyan99,
regarding the real-world scenarios: we are using Azure Blob Storage in our machine learning applications, which pull Parquet data from blob storage. Parquet files carry row group statistics, so a common optimisation is to read only the parts of the Parquet file that hold the row group statistics and then do pruning based on predicates.
See https://github.com/mbr/simplekv/blob/master/simplekv/net/_azurestore_new.py#L194 for a file-like interface to access Azure Blob Storage, and http://peter-hoffmann.com/2020/understand-predicate-pushdown-on-rowgroup-level-in-parquet-with-pyarrow-and-python.html for an explanation of row group pruning. A sketch of this access pattern follows below.
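A minimal sketch of that pattern (hypothetical code, assuming pyarrow and any seekable file-like object over blob storage; it is not taken from the linked projects): the Parquet footer metadata is read first, then only the row groups whose statistics survive the predicate are fetched, which is exactly where ranged reads of a blob come in:

# Hypothetical row-group pruning sketch using pyarrow; 'file_like' could be any
# seekable file-like object backed by blob storage (here just an open local file).
import pyarrow.parquet as pq

def read_pruned(file_like, keep):
    # keep(min, max) -> bool decides whether a row group may contain matching rows
    pf = pq.ParquetFile(file_like)               # only footer metadata is read here
    kept = []
    for i in range(pf.metadata.num_row_groups):
        stats = pf.metadata.row_group(i).column(0).statistics
        if stats is None or keep(stats.min, stats.max):
            kept.append(pf.read_row_group(i))    # ranged reads for just this row group
    return kept

with open("example.parquet", "rb") as f:         # stand-in for a blob-backed file object
    tables = read_pruned(f, keep=lambda lo, hi: hi >= 100)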
Thank you @jochen-ott-by @hoffmann, your information is really helpful.
Let me rethink this. :)
This looks like good progress, @xiangyan99! However, I'm still concerned about the complexity of this solution, which does not deal with the fact that (in general) self.downloaded cannot be computed reliably. Here is my view:
So again, please consider removing the retry code altogether to get a fix for the data corruption bug ASAP. Note that removing the code that was intended to retry does not really make anything worse for anyone, as the code never actually helped retrying anything (!). So removing it is a clear net win for everyone. After this immediate fix, one could evaluate again whether the complexity of a retry at this point in the code is worth the effort. (In my personal opinion, it's not worth it; instead, it's better to retry some levels up in the stack, where it's much easier, as much more context information about how best to retry is available.)
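To make the suggestion concrete, here is a minimal sketch (my own illustration of the proposed behaviour, not the actual azure-core implementation or the code merged in this PR) of a stream generator that simply surfaces connection errors instead of retrying:

# Minimal sketch of a stream-download generator that raises instead of retrying.
# Names mirror the azure-core code under discussion, but the class is illustrative only.
import requests

class StreamDownloadGenerator:
    def __init__(self, response, block_size):
        self.response = response
        self.block_size = block_size
        self.iter_content_func = response.internal_response.iter_content(block_size)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self.iter_content_func)
            if not chunk:
                raise StopIteration()
            return chunk
        except StopIteration:
            self.response.internal_response.close()
            raise
        except (requests.exceptions.ChunkedEncodingError,
                requests.exceptions.ConnectionError):
            # No retry here: re-raise so the upper layers, which know the original
            # range and how much decoded data the caller consumed, can decide what to do.
            self.response.internal_response.close()
            raise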
Thanks for your inputs. I am not quite clear on why self.downloaded cannot be computed reliably. Let's say the file has 1 MB and each time we read 1 KB. On the first read we try to get 0~1 KB. If it succeeds, we record that 1 KB has been downloaded (the return may be less than 1 KB because of decoding). If we need to re-connect, we want to tell the service to continue from 1 KB, because from the service's perspective it always deals with pre-decoded bytes, right?
We release our SDKs on a monthly cadence and the next release will be in the week of 3/8. (I thought you already talked to your customer support team and confirmed 3/8 worked for you?)
I already explained this in some depth here: #16783 (comment). I'm citing the relevant part here:
You wrote:
The point is that the
No. What can happen is that the first chunk from
Release-wise, this would work. I'm just not confident that the current approach of implementing re-tries is something that can be done properly by then.
Yes, len(chunk) is not reliable, and here we modified the code to only use block_size.
The documentation says: stream(amt=65536, decode_content=None). So it reads block_size bytes, or the connection is closed (connection error or done). If there is a connection error, we retry; if it is done, everything is fine.
To make sure we are on the same page: you are saying the return (after decoding) is 786 bytes, but the raw bytes (before decoding) it reads is 1000, right? So next time we should tell it to start reading from 1000.
It also says: "amt: How much of the content to read. The generator will return up to much data per iteration, but may return less." So the documentation is incomplete here. Also, if you look at how it's implemented, you'll see that for chunked encoding this will always iterate over the chunks returned by the server. This is what I specifically wrote above. I actually tested it: it really does not respect this parameter.
No, in general, it does not.
No. Assume there is no compression decoding (like gzip) at all. You pass amt=1000 to
BTW, there is another potential issue here that I have not yet tested: what happens if the content-encoding is gzip and you try to re-download partial content after a failure? The first request will (successfully) decompress the response. But for the partial data, the client cannot really do that ...
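That last concern can be illustrated with the standard library alone (hypothetical data, not a test of azure-core itself): a gzip stream can only be decoded from its beginning, so bytes fetched from a mid-stream offset on a retried range request cannot be decompressed by the client:

# Why resuming a gzip-encoded download mid-stream cannot be decoded client-side.
import gzip
import zlib

payload = b"The quick brown fox jumps over the lazy dog. " * 20000
body = gzip.compress(payload)          # the full gzip-encoded response body
resumed = body[500:]                   # roughly what a "range: bytes=500-" retry would return

decoder = zlib.decompressobj(16 + zlib.MAX_WBITS)   # gzip-aware decoder
try:
    decoder.decompress(resumed)
except zlib.error as exc:
    print("cannot decode mid-stream gzip data:", exc)   # the gzip header check fails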
@xiangyan99
We have merged a change to disable the retry. Could you help describe your scenario? That would help us prioritize the issue. Thank you.
@xiangyan99 We download the blob from Databricks, and sometimes there are network errors with no exception, and the downloaded file appears incomplete compared to the source one. Do you mean that you have already fixed that behavior (producing an incomplete file with no exception)? So if there is no exception, those gaps never occur?
We used to have the issue that we returned incomplete downloads with a "succeeded" tag. With the fix, the current behavior is to raise an exception when the connection drops mid-stream, rather than silently returning incomplete data. Please let me know if this does not meet your requirements.
It is already fixed.