Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StreamingRetry #5785

Merged
merged 10 commits into from
Jun 11, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions sdk/core/azure-core/azure/core/pipeline/transport/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
from collections.abc import AsyncIterator

import logging
import asyncio
import aiohttp

from azure.core.configuration import Configuration
from azure.core.exceptions import (
ServiceRequestError,
ServiceResponseError)
from azure.core.exceptions import ServiceRequestError

from requests.exceptions import (
ChunkedEncodingError,
StreamConsumedError)

from .base import HttpRequest
from .base_async import (
Expand Down Expand Up @@ -158,15 +161,20 @@ async def send(self, request: HttpRequest, **config: Any) -> Optional[AsyncHttpR
class AioHttpStreamDownloadGenerator(AsyncIterator):
"""Streams the response body data.

:param pipeline: The pipeline object
:param request: The request object
:param response: The client response object.
:type response: aiohttp.ClientResponse
:param block_size: block size of data sent over connection.
:type block_size: int
"""
def __init__(self, response: aiohttp.ClientResponse, block_size: int) -> None:
def __init__(self, pipeline, request, response: aiohttp.ClientResponse, block_size: int) -> None:
xiangyan99 marked this conversation as resolved.
Show resolved Hide resolved
self.pipeline = pipeline
self.request = request
self.response = response
self.block_size = block_size
self.content_length = int(response.headers.get('Content-Length', 0))
self.downloaded = 0

def __len__(self):
return self.content_length
Expand All @@ -179,15 +187,29 @@ async def __anext__(self):
chunk = await self.response.content.read(self.block_size)
if not chunk:
raise _ResponseStopIteration()
self.downloaded += chunk
return chunk
except _ResponseStopIteration:
self.response.close()
raise StopAsyncIteration()
except ServiceResponseError:
except (ChunkedEncodingError, ConnectionError):
retry_total -= 1
if retry_total <= 0:
retry_active = False
else:
await asyncio.sleep(1000)
xiangyan99 marked this conversation as resolved.
Show resolved Hide resolved
headers = {'range': 'bytes=' + self.downloaded + '-'}
resp = self.pipeline.run(self.request, stream=True, headers=headers)
if resp.status_code == 416:
raise
chunk = await self.response.content.read(self.block_size)
if not chunk:
raise StopIteration()
self.downloaded += chunk
return chunk
continue
except StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.close()
Expand Down Expand Up @@ -223,7 +245,7 @@ async def load_body(self) -> None:
"""Load in memory the body, so it could be accessible from sync methods."""
self._body = await self.internal_response.read()

def stream_download(self) -> AsyncIteratorType[bytes]:
def stream_download(self, pipeline) -> AsyncIteratorType[bytes]:
xiangyan99 marked this conversation as resolved.
Show resolved Hide resolved
"""Generator for streaming response body data.
"""
return AioHttpStreamDownloadGenerator(self.internal_response, self.block_size)
return AioHttpStreamDownloadGenerator(pipeline, self.request, self.internal_response, self.block_size)
5 changes: 3 additions & 2 deletions sdk/core/azure-core/azure/core/pipeline/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

HTTPResponseType = TypeVar("HTTPResponseType")
HTTPRequestType = TypeVar("HTTPRequestType")
PipelineType = TypeVar("PipelineType")

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -259,8 +260,8 @@ def text(self, encoding=None):


class HttpResponse(_HttpResponseBase):
def stream_download(self):
# type: () -> Iterator[bytes]
def stream_download(self, pipeline):
# type: (PipelineType) -> Iterator[bytes]
"""Generator for streaming request body data.

Should be implemented by sub-classes if streaming download
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class AsyncHttpResponse(_HttpResponseBase):

Allows for the asynchronous streaming of data from the response.
"""
def stream_download(self) -> AsyncIterator[bytes]:
def stream_download(self, pipeline) -> AsyncIterator[bytes]:
xiangyan99 marked this conversation as resolved.
Show resolved Hide resolved
"""Generator for streaming response body data.

Should be implemented by sub-classes if streaming download
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,21 @@ async def send(self, request: HttpRequest, **kwargs: Any) -> AsyncHttpResponse:
class AsyncioStreamDownloadGenerator(AsyncIterator):
"""Streams the response body data.

:param pipeline: The pipeline object
:param request: The request object
:param response: The response object.
:param int block_size: block size of data sent over connection.
:param generator iter_content_func: Iterator for response data.
:param int content_length: size of body in bytes.
"""
def __init__(self, response: requests.Response, block_size: int) -> None:
def __init__(self, pipeline, request, response: requests.Response, block_size: int) -> None:
xiangyan99 marked this conversation as resolved.
Show resolved Hide resolved
self.pipeline = pipeline
self.request = request
self.response = response
self.block_size = block_size
self.iter_content_func = self.response.iter_content(self.block_size)
self.content_length = int(response.headers.get('Content-Length', 0))
self.downloaded = 0

def __len__(self):
return self.content_length
Expand All @@ -148,15 +153,34 @@ async def __anext__(self):
)
if not chunk:
raise _ResponseStopIteration()
self.downloaded += chunk
return chunk
except _ResponseStopIteration:
self.response.close()
raise StopAsyncIteration()
except ServiceResponseError:
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError):
retry_total -= 1
if retry_total <= 0:
retry_active = False
else:
await asyncio.sleep(1000)
headers = {'range': 'bytes=' + self.downloaded + '-'}
resp = self.pipeline.run(self.request, stream=True, headers=headers)
if resp.status_code == 416:
raise
chunk = await loop.run_in_executor(
None,
_iterate_response_content,
self.iter_content_func,
)
if not chunk:
raise StopIteration()
self.downloaded += chunk
return chunk
continue
except requests.exceptions.StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.close()
Expand All @@ -166,6 +190,7 @@ async def __anext__(self):
class AsyncioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore
"""Asynchronous streaming of data from the response.
"""
def stream_download(self) -> AsyncIteratorType[bytes]: # type: ignore
def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: # type: ignore
"""Generator for streaming request body data."""
return AsyncioStreamDownloadGenerator(self.internal_response, self.block_size) # type: ignore
return AsyncioStreamDownloadGenerator(pipeline, self.request,
self.internal_response, self.block_size) # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
# --------------------------------------------------------------------------
from __future__ import absolute_import
import logging
from typing import Iterator, Optional, Any, Union
from typing import Iterator, Optional, Any, Union, TypeVar
import time
import urllib3 # type: ignore
from urllib3.util.retry import Retry # type: ignore
import requests


from azure.core.configuration import Configuration
from azure.core.exceptions import (
ServiceRequestError,
Expand All @@ -44,6 +44,7 @@
_HttpResponseBase
)

PipelineType = TypeVar("PipelineType")

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -80,16 +81,21 @@ def text(self, encoding=None):
class StreamDownloadGenerator(object):
"""Generator for streaming response data.

:param pipeline: The pipeline object
:param request: The request object
:param response: The response object.
:param int block_size: Number of bytes to read into memory.
:param generator iter_content_func: Iterator for response data.
:param int content_length: size of body in bytes.
"""
def __init__(self, response, block_size):
def __init__(self, pipeline, request, response, block_size):
self.pipeline = pipeline
self.request = request
self.response = response
self.block_size = block_size
self.iter_content_func = self.response.iter_content(self.block_size)
self.content_length = int(response.headers.get('Content-Length', 0))
self.downloaded = 0

def __len__(self):
return self.content_length
Expand All @@ -105,15 +111,30 @@ def __next__(self):
chunk = next(self.iter_content_func)
if not chunk:
raise StopIteration()
self.downloaded += chunk
return chunk
except StopIteration:
self.response.close()
raise StopIteration()
except ServiceResponseError:
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError):
retry_total -= 1
if retry_total <= 0:
retry_active = False
else:
time.sleep(1000)
headers = {'range': 'bytes=' + self.downloaded + '-'}
resp = self.pipeline.run(self.request, stream=True, headers=headers)
if resp.status_code == 416:
raise
chunk = next(self.iter_content_func)
if not chunk:
raise StopIteration()
self.downloaded += chunk
return chunk
continue
except requests.exceptions.StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.close()
Expand All @@ -124,10 +145,10 @@ def __next__(self):
class RequestsTransportResponse(HttpResponse, _RequestsTransportResponseBase):
"""Streaming of data from the response.
"""
def stream_download(self):
# type: () -> Iterator[bytes]
def stream_download(self, pipeline):
# type: (PipelineType) -> Iterator[bytes]
"""Generator for streaming request body data."""
return StreamDownloadGenerator(self.internal_response, self.block_size)
return StreamDownloadGenerator(pipeline, self.request, self.internal_response, self.block_size)


class RequestsTransport(HttpTransport):
Expand Down
32 changes: 28 additions & 4 deletions sdk/core/azure-core/azure/core/pipeline/transport/requests_trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,21 @@
class TrioStreamDownloadGenerator(AsyncIterator):
"""Generator for streaming response data.

:param pipeline: The pipeline object
:param request: The request object
:param response: The response object.
:param int block_size: Number of bytes to read into memory.
:param generator iter_content_func: Iterator for response data.
:param int content_length: size of body in bytes.
"""
def __init__(self, response: requests.Response, block_size: int) -> None:
def __init__(self, pipeline, request, response: requests.Response, block_size: int) -> None:
self.pipeline = pipeline
self.request = request
self.response = response
self.block_size = block_size
self.iter_content_func = self.response.iter_content(self.block_size)
self.content_length = int(response.headers.get('Content-Length', 0))
self.downloaded = 0

def __len__(self):
return self.content_length
Expand All @@ -76,15 +81,33 @@ async def __anext__(self):
)
if not chunk:
raise _ResponseStopIteration()
self.downloaded += chunk
return chunk
except _ResponseStopIteration:
self.response.close()
raise StopAsyncIteration()
except ServiceResponseError:
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError):
retry_total -= 1
if retry_total <= 0:
retry_active = False
else:
await trio.sleep(1000)
headers = {'range': 'bytes=' + self.downloaded + '-'}
resp = self.pipeline.run(self.request, stream=True, headers=headers)
if resp.status_code == 416:
raise
chunk = await trio.run_sync_in_worker_thread(
_iterate_response_content,
self.iter_content_func,
)
if not chunk:
raise StopIteration()
self.downloaded += chunk
return chunk
continue
except requests.exceptions.StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.close()
Expand All @@ -93,10 +116,11 @@ async def __anext__(self):
class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore
"""Asynchronous streaming of data from the response.
"""
def stream_download(self) -> AsyncIteratorType[bytes]: # type: ignore
def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: # type: ignore
"""Generator for streaming response data.
"""
return TrioStreamDownloadGenerator(self.internal_response, self.block_size) # type: ignore
return TrioStreamDownloadGenerator(pipeline, self.request,
self.internal_response, self.block_size) # type: ignore


class TrioRequestsTransport(RequestsTransport, AsyncHttpTransport): # type: ignore
Expand Down