From 1e2f0ab90fbac4f645a8ea80cb4b43f00944b307 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Tue, 11 Jun 2019 12:51:18 -0700 Subject: [PATCH] StreamingRetry (#5785) * StreamingRetry * add sleep * update per the review feedback --- .../azure/core/pipeline/transport/aiohttp.py | 42 +++++++++++++++---- .../azure/core/pipeline/transport/base.py | 5 ++- .../core/pipeline/transport/base_async.py | 5 ++- .../pipeline/transport/requests_asyncio.py | 35 ++++++++++++++-- .../core/pipeline/transport/requests_basic.py | 37 ++++++++++++---- .../core/pipeline/transport/requests_trio.py | 33 +++++++++++++-- 6 files changed, 132 insertions(+), 25 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/aiohttp.py index 74fadeec74e5..08d6e4526f1c 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/aiohttp.py @@ -27,12 +27,16 @@ from collections.abc import AsyncIterator import logging +import asyncio import aiohttp from azure.core.configuration import Configuration -from azure.core.exceptions import ( - ServiceRequestError, - ServiceResponseError) +from azure.core.exceptions import ServiceRequestError +from azure.core.pipeline import Pipeline + +from requests.exceptions import ( + ChunkedEncodingError, + StreamConsumedError) from .base import HttpRequest from .base_async import ( @@ -158,15 +162,21 @@ async def send(self, request: HttpRequest, **config: Any) -> Optional[AsyncHttpR class AioHttpStreamDownloadGenerator(AsyncIterator): """Streams the response body data. + :param pipeline: The pipeline object + :param request: The request object :param response: The client response object. :type response: aiohttp.ClientResponse :param block_size: block size of data sent over connection. :type block_size: int """ - def __init__(self, response: aiohttp.ClientResponse, block_size: int) -> None: + def __init__(self, pipeline: Pipeline, request: HttpRequest, + response: aiohttp.ClientResponse, block_size: int) -> None: + self.pipeline = pipeline + self.request = request self.response = response self.block_size = block_size self.content_length = int(response.headers.get('Content-Length', 0)) + self.downloaded = 0 def __len__(self): return self.content_length @@ -174,20 +184,35 @@ def __len__(self): async def __anext__(self): retry_active = True retry_total = 3 + retry_interval = 1000 while retry_active: try: chunk = await self.response.content.read(self.block_size) if not chunk: raise _ResponseStopIteration() + self.downloaded += self.block_size return chunk except _ResponseStopIteration: self.response.close() raise StopAsyncIteration() - except ServiceResponseError: + except (ChunkedEncodingError, ConnectionError): retry_total -= 1 if retry_total <= 0: retry_active = False + else: + await asyncio.sleep(retry_interval) + headers = {'range': 'bytes=' + self.downloaded + '-'} + resp = self.pipeline.run(self.request, stream=True, headers=headers) + if resp.status_code == 416: + raise + chunk = await self.response.content.read(self.block_size) + if not chunk: + raise StopIteration() + self.downloaded += chunk + return chunk continue + except StreamConsumedError: + raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) self.response.close() @@ -223,7 +248,10 @@ async def load_body(self) -> None: """Load in memory the body, so it could be accessible from sync methods.""" self._body = await self.internal_response.read() - def stream_download(self) -> AsyncIteratorType[bytes]: + def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: """Generator for streaming response body data. + + :param pipeline: The pipeline object + :type pipeline: azure.core.pipeline """ - return AioHttpStreamDownloadGenerator(self.internal_response, self.block_size) + return AioHttpStreamDownloadGenerator(pipeline, self.request, self.internal_response, self.block_size) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/base.py b/sdk/core/azure-core/azure/core/pipeline/transport/base.py index 027560c490ac..0fb2b49ceb5f 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/base.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/base.py @@ -48,6 +48,7 @@ HTTPResponseType = TypeVar("HTTPResponseType") HTTPRequestType = TypeVar("HTTPRequestType") +PipelineType = TypeVar("PipelineType") _LOGGER = logging.getLogger(__name__) @@ -259,8 +260,8 @@ def text(self, encoding=None): class HttpResponse(_HttpResponseBase): - def stream_download(self): - # type: () -> Iterator[bytes] + def stream_download(self, pipeline): + # type: (PipelineType) -> Iterator[bytes] """Generator for streaming request body data. Should be implemented by sub-classes if streaming download diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/base_async.py b/sdk/core/azure-core/azure/core/pipeline/transport/base_async.py index c9133177fd2f..8a3ed3a522f6 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/base_async.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/base_async.py @@ -70,11 +70,14 @@ class AsyncHttpResponse(_HttpResponseBase): Allows for the asynchronous streaming of data from the response. """ - def stream_download(self) -> AsyncIterator[bytes]: + def stream_download(self, pipeline) -> AsyncIterator[bytes]: """Generator for streaming response body data. Should be implemented by sub-classes if streaming download is supported. Will return an asynchronous generator. + + :param pipeline: The pipeline object + :type pipeline: azure.core.pipeline """ diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/requests_asyncio.py index 996b3c98e3d2..287851cac4e8 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/requests_asyncio.py @@ -36,6 +36,7 @@ ServiceRequestError, ServiceResponseError ) +from azure.core.pipeline import Pipeline from .base import HttpRequest from .base_async import ( AsyncHttpTransport, @@ -121,16 +122,21 @@ async def send(self, request: HttpRequest, **kwargs: Any) -> AsyncHttpResponse: class AsyncioStreamDownloadGenerator(AsyncIterator): """Streams the response body data. + :param pipeline: The pipeline object + :param request: The request object :param response: The response object. :param int block_size: block size of data sent over connection. :param generator iter_content_func: Iterator for response data. :param int content_length: size of body in bytes. """ - def __init__(self, response: requests.Response, block_size: int) -> None: + def __init__(self, pipeline: Pipeline, request: HttpRequest, response: requests.Response, block_size: int) -> None: + self.pipeline = pipeline + self.request = request self.response = response self.block_size = block_size self.iter_content_func = self.response.iter_content(self.block_size) self.content_length = int(response.headers.get('Content-Length', 0)) + self.downloaded = 0 def __len__(self): return self.content_length @@ -139,6 +145,7 @@ async def __anext__(self): loop = _get_running_loop() retry_active = True retry_total = 3 + retry_interval = 1000 while retry_active: try: chunk = await loop.run_in_executor( @@ -148,15 +155,34 @@ async def __anext__(self): ) if not chunk: raise _ResponseStopIteration() + self.downloaded += self.block_size return chunk except _ResponseStopIteration: self.response.close() raise StopAsyncIteration() - except ServiceResponseError: + except (requests.exceptions.ChunkedEncodingError, + requests.exceptions.ConnectionError): retry_total -= 1 if retry_total <= 0: retry_active = False + else: + await asyncio.sleep(retry_interval) + headers = {'range': 'bytes=' + self.downloaded + '-'} + resp = self.pipeline.run(self.request, stream=True, headers=headers) + if resp.status_code == 416: + raise + chunk = await loop.run_in_executor( + None, + _iterate_response_content, + self.iter_content_func, + ) + if not chunk: + raise StopIteration() + self.downloaded += chunk + return chunk continue + except requests.exceptions.StreamConsumedError: + raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) self.response.close() @@ -166,6 +192,7 @@ async def __anext__(self): class AsyncioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore """Asynchronous streaming of data from the response. """ - def stream_download(self) -> AsyncIteratorType[bytes]: # type: ignore + def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: # type: ignore """Generator for streaming request body data.""" - return AsyncioStreamDownloadGenerator(self.internal_response, self.block_size) # type: ignore + return AsyncioStreamDownloadGenerator(pipeline, self.request, + self.internal_response, self.block_size) # type: ignore diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/requests_basic.py index 4e93a9048578..8b820dee21bb 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/requests_basic.py @@ -25,17 +25,18 @@ # -------------------------------------------------------------------------- from __future__ import absolute_import import logging -from typing import Iterator, Optional, Any, Union +from typing import Iterator, Optional, Any, Union, TypeVar +import time import urllib3 # type: ignore from urllib3.util.retry import Retry # type: ignore import requests - from azure.core.configuration import Configuration from azure.core.exceptions import ( ServiceRequestError, ServiceResponseError ) +from azure.core.pipeline import Pipeline from . import HttpRequest # pylint: disable=unused-import from .base import ( @@ -44,6 +45,7 @@ _HttpResponseBase ) +PipelineType = TypeVar("PipelineType") _LOGGER = logging.getLogger(__name__) @@ -80,16 +82,21 @@ def text(self, encoding=None): class StreamDownloadGenerator(object): """Generator for streaming response data. + :param pipeline: The pipeline object + :param request: The request object :param response: The response object. :param int block_size: Number of bytes to read into memory. :param generator iter_content_func: Iterator for response data. :param int content_length: size of body in bytes. """ - def __init__(self, response, block_size): + def __init__(self, pipeline, request, response, block_size): + self.pipeline = pipeline + self.request = request self.response = response self.block_size = block_size self.iter_content_func = self.response.iter_content(self.block_size) self.content_length = int(response.headers.get('Content-Length', 0)) + self.downloaded = 0 def __len__(self): return self.content_length @@ -100,20 +107,36 @@ def __iter__(self): def __next__(self): retry_active = True retry_total = 3 + retry_interval = 1000 while retry_active: try: chunk = next(self.iter_content_func) if not chunk: raise StopIteration() + self.downloaded += self.block_size return chunk except StopIteration: self.response.close() raise StopIteration() - except ServiceResponseError: + except (requests.exceptions.ChunkedEncodingError, + requests.exceptions.ConnectionError): retry_total -= 1 if retry_total <= 0: retry_active = False + else: + time.sleep(retry_interval) + headers = {'range': 'bytes=' + self.downloaded + '-'} + resp = self.pipeline.run(self.request, stream=True, headers=headers) + if resp.status_code == 416: + raise + chunk = next(self.iter_content_func) + if not chunk: + raise StopIteration() + self.downloaded += chunk + return chunk continue + except requests.exceptions.StreamConsumedError: + raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) self.response.close() @@ -124,10 +147,10 @@ def __next__(self): class RequestsTransportResponse(HttpResponse, _RequestsTransportResponseBase): """Streaming of data from the response. """ - def stream_download(self): - # type: () -> Iterator[bytes] + def stream_download(self, pipeline): + # type: (PipelineType) -> Iterator[bytes] """Generator for streaming request body data.""" - return StreamDownloadGenerator(self.internal_response, self.block_size) + return StreamDownloadGenerator(pipeline, self.request, self.internal_response, self.block_size) class RequestsTransport(HttpTransport): diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/requests_trio.py index 91f28eff7e48..b72887502220 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/requests_trio.py @@ -36,6 +36,7 @@ ServiceRequestError, ServiceResponseError ) +from azure.core.pipeline import Pipeline from .base import HttpRequest from .base_async import ( AsyncHttpTransport, @@ -51,16 +52,21 @@ class TrioStreamDownloadGenerator(AsyncIterator): """Generator for streaming response data. + :param pipeline: The pipeline object + :param request: The request object :param response: The response object. :param int block_size: Number of bytes to read into memory. :param generator iter_content_func: Iterator for response data. :param int content_length: size of body in bytes. """ - def __init__(self, response: requests.Response, block_size: int) -> None: + def __init__(self, pipeline: Pipeline, request: HttpRequest, response: requests.Response, block_size: int) -> None: + self.pipeline = pipeline + self.request = request self.response = response self.block_size = block_size self.iter_content_func = self.response.iter_content(self.block_size) self.content_length = int(response.headers.get('Content-Length', 0)) + self.downloaded = 0 def __len__(self): return self.content_length @@ -76,15 +82,33 @@ async def __anext__(self): ) if not chunk: raise _ResponseStopIteration() + self.downloaded += self.block_size return chunk except _ResponseStopIteration: self.response.close() raise StopAsyncIteration() - except ServiceResponseError: + except (requests.exceptions.ChunkedEncodingError, + requests.exceptions.ConnectionError): retry_total -= 1 if retry_total <= 0: retry_active = False + else: + await trio.sleep(1000) + headers = {'range': 'bytes=' + self.downloaded + '-'} + resp = self.pipeline.run(self.request, stream=True, headers=headers) + if resp.status_code == 416: + raise + chunk = await trio.run_sync_in_worker_thread( + _iterate_response_content, + self.iter_content_func, + ) + if not chunk: + raise StopIteration() + self.downloaded += chunk + return chunk continue + except requests.exceptions.StreamConsumedError: + raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) self.response.close() @@ -93,10 +117,11 @@ async def __anext__(self): class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore """Asynchronous streaming of data from the response. """ - def stream_download(self) -> AsyncIteratorType[bytes]: # type: ignore + def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: # type: ignore """Generator for streaming response data. """ - return TrioStreamDownloadGenerator(self.internal_response, self.block_size) # type: ignore + return TrioStreamDownloadGenerator(pipeline, self.request, + self.internal_response, self.block_size) # type: ignore class TrioRequestsTransport(RequestsTransport, AsyncHttpTransport): # type: ignore