Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] add provisional azure.core.rest #19502

Merged
merged 69 commits into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
6dff5d5
tests passing locally
iscai-msft Jun 8, 2021
58c184a
move pytests.init to root
iscai-msft Jun 8, 2021
c7663fe
remove set formatting from windows run
iscai-msft Jun 8, 2021
bec31ed
try to fix windows testserver start
iscai-msft Jun 8, 2021
405d5e8
uncomment testserver termination
iscai-msft Jun 8, 2021
fb692df
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-pytho…
iscai-msft Jun 17, 2021
f826377
add devops scripts to start testserver
iscai-msft Jun 17, 2021
6ceb795
scriptSource -> scriptPath
iscai-msft Jun 17, 2021
0f10e5e
set env vars
iscai-msft Jun 18, 2021
c35fdc2
add pwsh testserver
iscai-msft Jun 18, 2021
f8fd247
return result
iscai-msft Jun 21, 2021
c4fd6d2
try retuning exit code
iscai-msft Jun 21, 2021
b322e28
remove ci work, make pytest fixture module level
iscai-msft Jun 21, 2021
449f42a
tests working without pytest.ini
iscai-msft Jun 21, 2021
a021de2
only have testserver fixture in conftest
iscai-msft Jun 21, 2021
7a68dd3
switch to package scope
iscai-msft Jun 21, 2021
df42f15
unite testserver setting
iscai-msft Jun 21, 2021
da62f4b
switch to environment variables
iscai-msft Jun 22, 2021
e06ebb4
see what happens if we don't kill testserver
iscai-msft Jun 22, 2021
04ddbd0
cycle through ports
iscai-msft Jun 22, 2021
e860e8a
remove scripts
iscai-msft Jun 22, 2021
2f39ad2
allow 2.7 compatibility
iscai-msft Jun 22, 2021
f503b2c
wait longer for pypy
iscai-msft Jun 23, 2021
8ccce34
increase sleep to 2 for pypy
iscai-msft Jun 23, 2021
30c6e39
move core testserver into tests
iscai-msft Jun 24, 2021
20e593a
switch to urllib requesting to see if port open
iscai-msft Jun 24, 2021
1483f8c
ignore coretestserver readme
iscai-msft Jun 24, 2021
4cbf6a8
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
iscai-msft Jun 24, 2021
aabe409
initial commit for rest layer
iscai-msft Jun 24, 2021
cf637ef
add rest transport responses
iscai-msft Jun 24, 2021
0e596ce
add rest resposne mishandling exceptiosn to exceptions.py
iscai-msft Jun 24, 2021
c930e74
Merge branch 'get_testserver_working' of https://github.com/iscai-msf…
iscai-msft Jun 24, 2021
d20fa00
current tests passing
iscai-msft Jun 24, 2021
883c761
adding initial tests
iscai-msft Jun 24, 2021
e35d198
most sync tests passing, working on async
iscai-msft Jun 25, 2021
b26a9e7
most async tests passing
iscai-msft Jun 25, 2021
f6ba2a9
fix asynciteratortype typing
iscai-msft Jun 25, 2021
37ae392
fix aiohttp and trio typing
iscai-msft Jun 25, 2021
f23a9b9
fix kwargs popping of chunk_size
iscai-msft Jun 25, 2021
28adf60
fix deepcopy of requests
iscai-msft Jun 25, 2021
6ffe5db
say rest requests can only be used with send_Request in changelog, al…
iscai-msft Jun 25, 2021
10a2350
add tests for trio and asyncio transports
iscai-msft Jun 25, 2021
8deb6d7
lint and mypy
iscai-msft Jun 25, 2021
680f423
move tests, 2.7 passing
iscai-msft Jun 25, 2021
6b23aa2
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
iscai-msft Jun 25, 2021
7793a2a
add provisional docs
iscai-msft Jun 25, 2021
051743c
remove port from rest tests
iscai-msft Jun 25, 2021
911230c
fix tests and bump version
iscai-msft Jun 26, 2021
a43004c
mypy
iscai-msft Jun 27, 2021
b392f7c
split up async chunksize tests
iscai-msft Jun 28, 2021
61edc06
pylint
iscai-msft Jun 28, 2021
038f8da
make exit call close
iscai-msft Jun 28, 2021
49e3c33
remove chunk_size
iscai-msft Jun 28, 2021
c6860f2
move transport responses to rest module
iscai-msft Jun 28, 2021
831a439
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
iscai-msft Jun 28, 2021
8b3c812
move asyncio waiting for internal response's close onto transport res…
iscai-msft Jun 28, 2021
f9ecfdb
fix wording of StreamClosedError
iscai-msft Jun 28, 2021
16ed67b
fix return_pipeline_response's http_request property
iscai-msft Jun 28, 2021
934f53d
inclide zlib compressing of aiohttp body
iscai-msft Jun 28, 2021
fcbd91d
add error reading tests
iscai-msft Jun 28, 2021
2667239
clear up content setting
iscai-msft Jun 28, 2021
5be9b52
pass self.content in aiohttp to get the decompressor
iscai-msft Jun 28, 2021
930509a
johan's comments
iscai-msft Jun 28, 2021
7fd947f
stop importing resposne types once we get one
iscai-msft Jun 28, 2021
931aba3
set decompress property for aiohttp in to_rest_response
iscai-msft Jun 28, 2021
27f4af9
remove special content from aiohttp and add reuest info to new rest e…
iscai-msft Jun 28, 2021
9d4013b
default application/json to utf-8, and use chardet to detect endoing …
iscai-msft Jun 29, 2021
59f0015
read body in a try except so we can close if it fails
iscai-msft Jun 29, 2021
a575549
remove num_bytes_downloaded
iscai-msft Jun 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion sdk/core/azure-core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
# Release History

## 1.15.1 (Unreleased)
## 1.16.0 (Unreleased)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiangyan99 asked @johanste and he says it should be 1.16.0


### New Features

- Add new ***provisional*** methods `send_request` onto the `azure.core.PipelineClient` and `azure.core.AsyncPipelineClient`. This method takes in
requests and sends them through our pipelines.
- Add new ***provisional*** module `azure.core.rest`. `azure.core.rest` is our new public simple HTTP library in `azure.core` that users will use to create requests, and consume responses.
- Add new ***provisional*** errors `StreamConsumedError`, `StreamClosedError`, and `ResponseNotReadError` to `azure.core.exceptions`. These errors
are thrown if you mishandle streamed responses from the provisional `azure.core.rest` module

### Bug Fixes

Expand Down
15 changes: 15 additions & 0 deletions sdk/core/azure-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,21 @@ class TooManyRedirectsError(HttpResponseError):

*kwargs* are keyword arguments to include with the exception.

#### **Provisional** StreamConsumedError
A **provisional** error thrown if you try to access the stream of the **provisional**
responses `azure.core.rest.HttpResponse` or `azure.core.rest.AsyncHttpResponse` once
the response stream has been consumed.

#### **Provisional** StreamClosedError
A **provisional** error thrown if you try to access the stream of the **provisional**
responses `azure.core.rest.HttpResponse` or `azure.core.rest.AsyncHttpResponse` once
the response stream has been closed.

#### **Provisional** ResponseNotReadError
A **provisional** error thrown if you try to access the `content` of the **provisional**
responses `azure.core.rest.HttpResponse` or `azure.core.rest.AsyncHttpResponse` before
reading in the response's bytes first.

### Configurations

When calling the methods, some properties can be configured by passing in as kwargs arguments.
Expand Down
52 changes: 52 additions & 0 deletions sdk/core/azure-core/azure/core/_pipeline_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
RetryPolicy,
)
from .pipeline.transport import RequestsTransport
from .pipeline._tools import to_rest_response as _to_rest_response

try:
from typing import TYPE_CHECKING
Expand All @@ -58,10 +59,23 @@
Callable,
Iterator,
cast,
TypeVar
) # pylint: disable=unused-import
HTTPResponseType = TypeVar("HTTPResponseType")
HTTPRequestType = TypeVar("HTTPRequestType")

_LOGGER = logging.getLogger(__name__)

def _prepare_request(request):
# returns the request ready to run through pipelines
# and a bool telling whether we ended up converting it
rest_request = False
try:
request_to_run = request._to_pipeline_transport_request() # pylint: disable=protected-access
rest_request = True
except AttributeError:
request_to_run = request
return rest_request, request_to_run

class PipelineClient(PipelineClientBase):
"""Service client core methods.
Expand Down Expand Up @@ -170,3 +184,41 @@ def _build_pipeline(self, config, **kwargs): # pylint: disable=no-self-use
transport = RequestsTransport(**kwargs)

return Pipeline(transport, policies)


def send_request(self, request, **kwargs):
# type: (HTTPRequestType, Any) -> HTTPResponseType
"""**Provisional** method that runs the network request through the client's chained policies.

This method is marked as **provisional**, meaning it may be changed in a future release.

>>> from azure.core.rest import HttpRequest
>>> request = HttpRequest('GET', 'http://www.example.com')
<HttpRequest [GET], url: 'http://www.example.com'>
>>> response = client.send_request(request)
<HttpResponse: 200 OK>

:param request: The network request you want to make. Required.
:type request: ~azure.core.rest.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to False.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.rest.HttpResponse
# """
rest_request, request_to_run = _prepare_request(request)
return_pipeline_response = kwargs.pop("_return_pipeline_response", False)
pipeline_response = self._pipeline.run(request_to_run, **kwargs) # pylint: disable=protected-access
response = pipeline_response.http_response
if rest_request:
response = _to_rest_response(response)
try:
if not kwargs.get("stream", False):
response.read()
response.close()
except Exception as exc:
response.close()
raise exc
if return_pipeline_response:
pipeline_response.http_response = response
iscai-msft marked this conversation as resolved.
Show resolved Hide resolved
pipeline_response.http_request = request
return pipeline_response
return response
62 changes: 60 additions & 2 deletions sdk/core/azure-core/azure/core/_pipeline_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import logging
from collections.abc import Iterable
from typing import Any, Awaitable
from .configuration import Configuration
from .pipeline import AsyncPipeline
from .pipeline.transport._base import PipelineClientBase
Expand All @@ -36,16 +37,20 @@
RequestIdPolicy,
AsyncRetryPolicy,
)
from ._pipeline_client import _prepare_request
from .pipeline._tools_async import to_rest_response as _to_rest_response

try:
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, TypeVar
except ImportError:
TYPE_CHECKING = False

HTTPRequestType = TypeVar("HTTPRequestType")
AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType")

if TYPE_CHECKING:
from typing import (
List,
Any,
Dict,
Union,
IO,
Expand Down Expand Up @@ -168,3 +173,56 @@ def _build_pipeline(self, config, **kwargs): # pylint: disable=no-self-use
transport = AioHttpTransport(**kwargs)

return AsyncPipeline(transport, policies)

async def _make_pipeline_call(self, request, **kwargs):
rest_request, request_to_run = _prepare_request(request)
return_pipeline_response = kwargs.pop("_return_pipeline_response", False)
pipeline_response = await self._pipeline.run(
request_to_run, **kwargs # pylint: disable=protected-access
)
response = pipeline_response.http_response
if rest_request:
rest_response = _to_rest_response(response)
if not kwargs.get("stream"):
try:
# in this case, the pipeline transport response already called .load_body(), so
# the body is loaded. instead of doing response.read(), going to set the body
# to the internal content
rest_response._content = response.body() # pylint: disable=protected-access
await rest_response.close()
except Exception as exc:
await rest_response.close()
raise exc
response = rest_response
if return_pipeline_response:
pipeline_response.http_response = response
pipeline_response.http_request = request
return pipeline_response
return response

def send_request(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not async?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's a little confusing, but it returns an awaitable object. This has to do with the fact that we're returning a context manager that both has an __await__ and a __aenter__ etc. So this way, you can call it like this:

response = await client.send_request(request)

and like this (mainly in the case of streaming)

async with client.send_request(request) as response:
    print(response)

talked through this with @johanste and he's ok with not making it async def. Reason is once we mark it as async def, the compiler thinks we're returning a coroutine, which don't have attributes __aenter__, so we get an attribute error when trying to use it as a context managed response

self,
request: HTTPRequestType,
*,
stream: bool = False,
**kwargs: Any
) -> Awaitable[AsyncHTTPResponseType]:
"""**Provisional** method that runs the network request through the client's chained policies.

This method is marked as **provisional**, meaning it may be changed in a future release.

>>> from azure.core.rest import HttpRequest
>>> request = HttpRequest('GET', 'http://www.example.com')
<HttpRequest [GET], url: 'http://www.example.com'>
>>> response = await client.send_request(request)
<AsyncHttpResponse: 200 OK>

:param request: The network request you want to make. Required.
:type request: ~azure.core.rest.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to False.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.rest.AsyncHttpResponse
"""
from .rest._rest_py3 import _AsyncContextManager
wrapped = self._make_pipeline_call(request, stream=stream, **kwargs)
return _AsyncContextManager(wrapped=wrapped)
2 changes: 1 addition & 1 deletion sdk/core/azure-core/azure/core/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# regenerated.
# --------------------------------------------------------------------------

VERSION = "1.15.1"
VERSION = "1.16.0"
47 changes: 47 additions & 0 deletions sdk/core/azure-core/azure/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,3 +433,50 @@ def __str__(self):
if self._error_format:
return str(self._error_format)
return super(ODataV4Error, self).__str__()

class StreamConsumedError(AzureError):
"""**Provisional** error thrown if you try to access the stream of a response once consumed.

This error is marked as **provisional**, meaning it may be changed in a future release. It is
thrown if you try to read / stream an ~azure.core.rest.HttpResponse or
~azure.core.rest.AsyncHttpResponse once the response's stream has been consumed.
"""
def __init__(self, response):
message = (
"You are attempting to read or stream the content from request {}. "\
"You have likely already consumed this stream, so it can not be accessed anymore.".format(
response.request
)
)
super(StreamConsumedError, self).__init__(message)

class StreamClosedError(AzureError):
"""**Provisional** error thrown if you try to access the stream of a response once closed.

This error is marked as **provisional**, meaning it may be changed in a future release. It is
thrown if you try to read / stream an ~azure.core.rest.HttpResponse or
~azure.core.rest.AsyncHttpResponse once the response's stream has been closed.
"""
def __init__(self, response):
message = (
"The content for response from request {} can no longer be read or streamed, since the "\
"response has already been closed.".format(response.request)
)
super(StreamClosedError, self).__init__(message)

class ResponseNotReadError(AzureError):
"""**Provisional** error thrown if you try to access a response's content without reading first.

This error is marked as **provisional**, meaning it may be changed in a future release. It is
thrown if you try to access an ~azure.core.rest.HttpResponse or
~azure.core.rest.AsyncHttpResponse's content without first reading the response's bytes in first.
"""

def __init__(self, response):
message = (
iscai-msft marked this conversation as resolved.
Show resolved Hide resolved
"You have not read in the bytes for the response from request {}. "\
"Call .read() on the response first.".format(
response.request
)
)
super(ResponseNotReadError, self).__init__(message)
37 changes: 37 additions & 0 deletions sdk/core/azure-core/azure/core/pipeline/_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,40 @@ def await_result(func, *args, **kwargs):
"Policy {} returned awaitable object in non-async pipeline.".format(func)
)
return result

def to_rest_request(pipeline_transport_request):
from ..rest import HttpRequest as RestHttpRequest
return RestHttpRequest(
method=pipeline_transport_request.method,
url=pipeline_transport_request.url,
headers=pipeline_transport_request.headers,
files=pipeline_transport_request.files,
data=pipeline_transport_request.data
)

def to_rest_response(pipeline_transport_response):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will this method do if users have created their client with a custom transport?
Does it matter here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if they create their client with a custom transport, since i don't have a corresponding rest transport response to change it to, I change it to the default azure.core.rest.HttpResponse or azure.core.rest.AsyncHttpResponse

from .transport._requests_basic import RequestsTransportResponse
from ..rest._requests_basic import RestRequestsTransportResponse
from ..rest import HttpResponse
if isinstance(pipeline_transport_response, RequestsTransportResponse):
response_type = RestRequestsTransportResponse
else:
response_type = HttpResponse
response = response_type(
request=to_rest_request(pipeline_transport_response.request),
internal_response=pipeline_transport_response.internal_response,
)
response._connection_data_block_size = pipeline_transport_response.block_size # pylint: disable=protected-access
return response

def get_block_size(response):
try:
return response._connection_data_block_size # pylint: disable=protected-access
except AttributeError:
return response.block_size

def get_internal_response(response):
try:
return response._internal_response # pylint: disable=protected-access
except AttributeError:
return response.internal_response
35 changes: 35 additions & 0 deletions sdk/core/azure-core/azure/core/pipeline/_tools_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
from ._tools import to_rest_request

async def await_result(func, *args, **kwargs):
"""If func returns an awaitable, await it."""
Expand All @@ -31,3 +32,37 @@ async def await_result(func, *args, **kwargs):
# type ignore on await: https://github.com/python/mypy/issues/7587
return await result # type: ignore
return result

def _get_response_type(pipeline_transport_response):
try:
from .transport import AioHttpTransportResponse
from ..rest._aiohttp import RestAioHttpTransportResponse
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support (or plan to support) custom transport?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so what I'm doing here is "if . However, I can't do a specific corresponding rest transport response for a custom transport since idk what that transport is, so I fall back to just creating the default AsyncHttpResponse out of it

Our goal in a later PR (hopefully in the next release cycle) is to fix the need to transform rest requests and responses, and somehow have our pipelines support both the old and new requests and responses. So this code is also temporary.

if isinstance(pipeline_transport_response, AioHttpTransportResponse):
return RestAioHttpTransportResponse
except ImportError:
pass
try:
from .transport import AsyncioRequestsTransportResponse
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to import all of them?

Not only needed in except ImportError?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can add a check for each of these to only try to import if response_type is still None. But basically I put them each into their own try except import error block bc, for example, if you have a trio transport and no aiohttp isntalled, importing aiohttp will cause an error for you, so I wanted to accomodate all of the different combos

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I now stop importing once I get a reseponse type. Thanks for the suggestion @xiangyan99 !

from ..rest._requests_asyncio import RestAsyncioRequestsTransportResponse
if isinstance(pipeline_transport_response, AsyncioRequestsTransportResponse):
return RestAsyncioRequestsTransportResponse
except ImportError:
pass
try:
from .transport import TrioRequestsTransportResponse
from ..rest._requests_trio import RestTrioRequestsTransportResponse
if isinstance(pipeline_transport_response, TrioRequestsTransportResponse):
return RestTrioRequestsTransportResponse
except ImportError:
pass
from ..rest import AsyncHttpResponse
return AsyncHttpResponse

def to_rest_response(pipeline_transport_response):
response_type = _get_response_type(pipeline_transport_response)
response = response_type(
request=to_rest_request(pipeline_transport_response.request),
internal_response=pipeline_transport_response.internal_response,
)
response._connection_data_block_size = pipeline_transport_response.block_size # pylint: disable=protected-access
return response
15 changes: 9 additions & 6 deletions sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
AsyncHttpTransport,
AsyncHttpResponse,
_ResponseStopIteration)
from .._tools import get_block_size as _get_block_size, get_internal_response as _get_internal_response

# Matching requests, because why not?
CONTENT_CHUNK_SIZE = 10 * 1024
Expand Down Expand Up @@ -215,22 +216,24 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse, *, decompres
self.pipeline = pipeline
self.request = response.request
self.response = response
self.block_size = response.block_size
self.block_size = _get_block_size(response)
self._decompress = decompress
self.content_length = int(response.internal_response.headers.get('Content-Length', 0))
internal_response = _get_internal_response(response)
self.content_length = int(internal_response.headers.get('Content-Length', 0))
self._decompressor = None

def __len__(self):
return self.content_length

async def __anext__(self):
internal_response = _get_internal_response(self.response)
try:
chunk = await self.response.internal_response.content.read(self.block_size)
chunk = await internal_response.content.read(self.block_size)
if not chunk:
raise _ResponseStopIteration()
if not self._decompress:
return chunk
enc = self.response.internal_response.headers.get('Content-Encoding')
enc = internal_response.headers.get('Content-Encoding')
if not enc:
return chunk
enc = enc.lower()
Expand All @@ -242,13 +245,13 @@ async def __anext__(self):
chunk = self._decompressor.decompress(chunk)
return chunk
except _ResponseStopIteration:
self.response.internal_response.close()
internal_response.close()
raise StopAsyncIteration()
except StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.internal_response.close()
internal_response.close()
raise

class AioHttpTransportResponse(AsyncHttpResponse):
Expand Down
Loading