Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azure-core] Small fixes for aiohttp #6490

Merged
merged 12 commits into from
Jul 26, 2019
1 change: 1 addition & 0 deletions sdk/core/azure-core/azure/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def __init__(self, message=None, response=None, **kwargs):
if response:
self.reason = response.reason
self.status_code = response.status_code

message = message or "Operation returned an invalid status '{}'".format(self.reason)
try:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ def deserialize_from_http_generics(cls, response):
# Try to use content-type from headers if available
content_type = None
if response.content_type: # type: ignore
content_type = response.content_type[0].strip().lower() # type: ignore
content_type = response.content_type.split(";")[0].strip().lower() # type: ignore

# Ouch, this server did not declare what it sent...
# Let's guess it's JSON...
Expand Down
26 changes: 12 additions & 14 deletions sdk/core/azure-core/azure/core/pipeline/transport/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import aiohttp

from azure.core.configuration import ConnectionConfiguration
from azure.core.exceptions import ServiceRequestError
from azure.core.exceptions import ServiceRequestError, ServiceResponseError
from azure.core.pipeline import Pipeline

from requests.exceptions import (
Expand Down Expand Up @@ -181,7 +181,8 @@ async def send(self, request: HttpRequest, **config: Any) -> Optional[AsyncHttpR
await response.load_body()
except aiohttp.client_exceptions.ClientConnectorError as err:
error = ServiceRequestError(err, error=err)

except asyncio.TimeoutError as err:
error = ServiceResponseError(err, error=err)
if error:
raise error
return response
Expand All @@ -191,19 +192,16 @@ class AioHttpStreamDownloadGenerator(AsyncIterator):
"""Streams the response body data.

:param pipeline: The pipeline object
:param request: The request object
:param response: The client response object.
:type response: aiohttp.ClientResponse
:param block_size: block size of data sent over connection.
:type block_size: int
"""
def __init__(self, pipeline: Pipeline, request: HttpRequest,
response: aiohttp.ClientResponse, block_size: int) -> None:
def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None:
self.pipeline = pipeline
self.request = request
self.request = response.request
self.response = response
self.block_size = block_size
self.content_length = int(response.headers.get('Content-Length', 0))
self.block_size = response.block_size
self.content_length = int(response.internal_response.headers.get('Content-Length', 0))
self.downloaded = 0

def __len__(self):
Expand All @@ -215,13 +213,13 @@ async def __anext__(self):
retry_interval = 1000
while retry_active:
try:
chunk = await self.response.content.read(self.block_size)
chunk = await self.response.internal_response.content.read(self.block_size)
if not chunk:
raise _ResponseStopIteration()
self.downloaded += self.block_size
return chunk
except _ResponseStopIteration:
self.response.close()
self.response.internal_response.close()
raise StopAsyncIteration()
except (ChunkedEncodingError, ConnectionError):
retry_total -= 1
Expand All @@ -233,7 +231,7 @@ async def __anext__(self):
resp = self.pipeline.run(self.request, stream=True, headers=headers)
if resp.status_code == 416:
raise
chunk = await self.response.content.read(self.block_size)
chunk = await self.response.internal_response.content.read(self.block_size)
if not chunk:
raise StopIteration()
self.downloaded += chunk
Expand All @@ -243,7 +241,7 @@ async def __anext__(self):
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.close()
self.response.internal_response.close()
raise

class AioHttpTransportResponse(AsyncHttpResponse):
Expand Down Expand Up @@ -282,4 +280,4 @@ def stream_download(self, pipeline) -> AsyncIteratorType[bytes]:
:param pipeline: The pipeline object
:type pipeline: azure.core.pipeline
"""
return AioHttpStreamDownloadGenerator(pipeline, self.request, self.internal_response, self.block_size)
return AioHttpStreamDownloadGenerator(pipeline, self)
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ async def __aenter__(self):
async def __aexit__(self, *exc_details): # pylint: disable=arguments-differ
return super(AsyncioRequestsTransport, self).__exit__()

async def sleep(self, duration):
await asyncio.sleep(duration)

async def send(self, request: HttpRequest, **kwargs: Any) -> AsyncHttpResponse: # type: ignore
"""Send the request using this HTTP sender.

Expand Down Expand Up @@ -135,18 +138,16 @@ class AsyncioStreamDownloadGenerator(AsyncIterator):
"""Streams the response body data.

:param pipeline: The pipeline object
:param request: The request object
:param response: The response object.
:param int block_size: block size of data sent over connection.
:param generator iter_content_func: Iterator for response data.
:param int content_length: size of body in bytes.
"""
def __init__(self, pipeline: Pipeline, request: HttpRequest, response: requests.Response, block_size: int) -> None:
def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None:
self.pipeline = pipeline
self.request = request
self.request = response.request
self.response = response
self.block_size = block_size
self.iter_content_func = self.response.iter_content(self.block_size)
self.block_size = response.block_size
self.iter_content_func = self.response.internal_response.iter_content(self.block_size)
self.content_length = int(response.headers.get('Content-Length', 0))
self.downloaded = 0

Expand All @@ -170,7 +171,7 @@ async def __anext__(self):
self.downloaded += self.block_size
return chunk
except _ResponseStopIteration:
self.response.close()
self.response.internal_response.close()
raise StopAsyncIteration()
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError):
Expand All @@ -197,7 +198,7 @@ async def __anext__(self):
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.close()
self.response.internal_response.close()
raise


Expand All @@ -206,5 +207,4 @@ class AsyncioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportRespo
"""
def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: # type: ignore
"""Generator for streaming request body data."""
return AsyncioStreamDownloadGenerator(pipeline, self.request,
self.internal_response, self.block_size) # type: ignore
return AsyncioStreamDownloadGenerator(pipeline, self) # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ def __init__(self, request, requests_response, block_size=None):
self.status_code = requests_response.status_code
self.headers = requests_response.headers
self.reason = requests_response.reason
content_type = requests_response.headers.get('content-type')
if content_type:
self.content_type = content_type.split(";")
self.content_type = requests_response.headers.get('content-type')

def body(self):
return self.internal_response.content
Expand All @@ -82,18 +80,16 @@ class StreamDownloadGenerator(object):
"""Generator for streaming response data.

:param pipeline: The pipeline object
:param request: The request object
:param response: The response object.
:param int block_size: Number of bytes to read into memory.
:param generator iter_content_func: Iterator for response data.
:param int content_length: size of body in bytes.
"""
def __init__(self, pipeline, request, response, block_size):
def __init__(self, pipeline, response):
self.pipeline = pipeline
self.request = request
self.request = response.request
self.response = response
self.block_size = block_size
self.iter_content_func = self.response.iter_content(self.block_size)
self.block_size = response.block_size
self.iter_content_func = self.response.internal_response.iter_content(self.block_size)
self.content_length = int(response.headers.get('Content-Length', 0))
self.downloaded = 0

Expand All @@ -115,7 +111,7 @@ def __next__(self):
self.downloaded += self.block_size
return chunk
except StopIteration:
self.response.close()
self.response.internal_response.close()
raise StopIteration()
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError):
Expand All @@ -138,7 +134,7 @@ def __next__(self):
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.close()
self.response.internal_response.close()
raise
next = __next__ # Python 2 compatibility.

Expand All @@ -149,7 +145,7 @@ class RequestsTransportResponse(HttpResponse, _RequestsTransportResponseBase):
def stream_download(self, pipeline):
# type: (PipelineType) -> Iterator[bytes]
"""Generator for streaming request body data."""
return StreamDownloadGenerator(pipeline, self.request, self.internal_response, self.block_size)
return StreamDownloadGenerator(pipeline, self)


class RequestsTransport(HttpTransport):
Expand Down
17 changes: 7 additions & 10 deletions sdk/core/azure-core/azure/core/pipeline/transport/requests_trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,16 @@ class TrioStreamDownloadGenerator(AsyncIterator):
"""Generator for streaming response data.

:param pipeline: The pipeline object
:param request: The request object
:param response: The response object.
:param int block_size: Number of bytes to read into memory.
:param generator iter_content_func: Iterator for response data.
:param int content_length: size of body in bytes.
"""
def __init__(self, pipeline: Pipeline, request: HttpRequest, response: requests.Response, block_size: int) -> None:
def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None:
self.pipeline = pipeline
self.request = request
self.request = response.request
self.response = response
self.block_size = block_size
self.iter_content_func = self.response.iter_content(self.block_size)
self.block_size = response.block_size
self.iter_content_func = self.response.internal_response.iter_content(self.block_size)
self.content_length = int(response.headers.get('Content-Length', 0))
self.downloaded = 0

Expand All @@ -85,7 +83,7 @@ async def __anext__(self):
self.downloaded += self.block_size
return chunk
except _ResponseStopIteration:
self.response.close()
self.response.internal_response.close()
raise StopAsyncIteration()
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError):
Expand All @@ -111,7 +109,7 @@ async def __anext__(self):
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.close()
self.response.internal_response.close()
raise

class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore
Expand All @@ -120,8 +118,7 @@ class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse
def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: # type: ignore
"""Generator for streaming response data.
"""
return TrioStreamDownloadGenerator(pipeline, self.request,
self.internal_response, self.block_size) # type: ignore
return TrioStreamDownloadGenerator(pipeline, self) # type: ignore


class TrioRequestsTransport(RequestsTransport, AsyncHttpTransport): # type: ignore
Expand Down
17 changes: 17 additions & 0 deletions sdk/core/azure-core/tests/azure_core_asynctests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,23 @@ async def test_basic_async_requests():

assert response.http_response.status_code == 200

@pytest.mark.asyncio
async def test_async_transport_sleep():

async with AsyncioRequestsTransport() as transport:
await transport.sleep(1)

async with AioHttpTransport() as transport:
await transport.sleep(1)

def test_async_trio_transport_sleep():

async def do():
async with TrioRequestsTransport() as transport:
await transport.sleep(1)

response = trio.run(do)

@pytest.mark.asyncio
async def test_conf_async_requests():

Expand Down