Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Updates to multipart support #10487

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions sdk/core/azure-core/azure/core/pipeline/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ def __exit__(self, *exc_details): # pylint: disable=arguments-differ
self._transport.__exit__(*exc_details)

@staticmethod
def _prepare_multipart_mixed_request(request):
# type: (HTTPRequestType) -> None
def _prepare_multipart_mixed_request(request, **kwargs):
# type: (HTTPRequestType, Any) -> None
"""Will execute the multipart policies.

Does nothing if "set_multipart_mixed" was never called.
Expand All @@ -174,7 +174,7 @@ def _prepare_multipart_mixed_request(request):
import concurrent.futures

def prepare_requests(req):
context = PipelineContext(None)
context = PipelineContext(None, **kwargs)
pipeline_request = PipelineRequest(req, context)
for policy in policies:
_await_result(policy.on_request, pipeline_request)
Expand All @@ -194,7 +194,8 @@ def run(self, request, **kwargs):
:return: The PipelineResponse object
:rtype: ~azure.core.pipeline.PipelineResponse
"""
self._prepare_multipart_mixed_request(request)
multipart_options = kwargs.pop("multipart_options", None) or {}
Copy link
Member

@lmazuel lmazuel Apr 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want this parameter to be hidden?

self._prepare_multipart_mixed_request(request, **multipart_options)
request.prepare_multipart_body() # type: ignore
context = PipelineContext(self._transport, **kwargs)
pipeline_request = PipelineRequest(
Expand Down
9 changes: 5 additions & 4 deletions sdk/core/azure-core/azure/core/pipeline/_base_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ async def __aenter__(self) -> "AsyncPipeline":
async def __aexit__(self, *exc_details): # pylint: disable=arguments-differ
await self._transport.__aexit__(*exc_details)

async def _prepare_multipart_mixed_request(self, request):
# type: (HTTPRequestType) -> None
async def _prepare_multipart_mixed_request(self, request, **kwargs):
# type: (HTTPRequestType, Any) -> None
"""Will execute the multipart policies.

Does nothing if "set_multipart_mixed" was never called.
Expand All @@ -182,7 +182,7 @@ async def _prepare_multipart_mixed_request(self, request):
policies = multipart_mixed_info[1] # type: List[SansIOHTTPPolicy]

async def prepare_requests(req):
context = PipelineContext(None)
context = PipelineContext(None, **kwargs)
pipeline_request = PipelineRequest(req, context)
for policy in policies:
await _await_result(policy.on_request, pipeline_request)
Expand All @@ -201,7 +201,8 @@ async def run(self, request: HTTPRequestType, **kwargs: Any):
:return: The PipelineResponse object.
:rtype: ~azure.core.pipeline.PipelineResponse
"""
await self._prepare_multipart_mixed_request(request)
multipart_options = kwargs.pop("multipart_options", None) or {}
await self._prepare_multipart_mixed_request(request, **multipart_options)
request.prepare_multipart_body() # type: ignore
context = PipelineContext(self._transport, **kwargs)
pipeline_request = PipelineRequest(request, context)
Expand Down
110 changes: 82 additions & 28 deletions sdk/core/azure-core/azure/core/pipeline/transport/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
Optional,
Tuple,
Iterator,
Type
)

from six.moves.http_client import HTTPConnection, HTTPResponse as _HTTPResponse
Expand Down Expand Up @@ -138,6 +139,37 @@ def _urljoin(base_url, stub_url):
return parsed.geturl()


def _calculate_changesets(changesets):
# type: (List[Union[Tuple[int, int, str], Tuple[int, int]]]) -> Tuple[Dict[int, Optional[str]], List[int]]
"""Convert the changeset ranges into start/end indexes."""
start = {}
end = []
validate_indexes = []
boundary = None # type: Optional[str]
for changeset in changesets:
try:
start_index, end_index, boundary = cast('Tuple[int, int, str]', changeset)
except ValueError:
start_index, end_index = cast('Tuple[int, int]', changeset)
validate_indexes.extend(list(range(start_index, end_index + 1)))
start[start_index] = boundary
end.append(end_index)
if len(validate_indexes) != len(set(validate_indexes)):
raise ValueError("Changesets must not overlap.")
return start, end


def _add_message_part(index, message, request):
# type: (int, Message, HttpRequest) -> None
"""Add each request to the message batch."""
part_message = Message()
part_message.add_header("Content-Type", "application/http")
part_message.add_header("Content-Transfer-Encoding", "binary")
part_message.add_header("Content-ID", str(index))
part_message.set_payload(request.serialize())
message.attach(part_message)


class _HTTPSerializer(HTTPConnection, object):
"""Hacking the stdlib HTTPConnection to serialize HTTP request as strings.
"""
Expand Down Expand Up @@ -377,13 +409,19 @@ def set_multipart_mixed(self, *requests, **kwargs):

:keyword list[SansIOHTTPPolicy] policies: SansIOPolicy to apply at preparation time
:keyword str boundary: Optional boundary

:keyword changesets: Optional list of tuples marking the change sets within the request list.
Each tuple should contain two integers: the start and inclusive-end indexes of each change set
within the request list. Optionally, the tuple can also contain a changeset boundary string.
Any request that fall outside the changeset ranges will be added to the batch normally.
Change set ranges must not overlap.
:paramtype changesets: List[Union[Tuple[int, int, str], Tuple[int, int]]]
:param requests: HttpRequests object
"""
self.multipart_mixed_info = (
requests,
kwargs.pop("policies", []),
kwargs.pop("boundary", []),
kwargs.pop("boundary", None),
kwargs.pop("changesets", [])
)

def prepare_multipart_body(self):
Expand All @@ -398,21 +436,28 @@ def prepare_multipart_body(self):
if not self.multipart_mixed_info:
return

requests = self.multipart_mixed_info[0] # type: List[HttpRequest]
boundary = self.multipart_mixed_info[2] # type: Optional[str]
requests = self.multipart_mixed_info[0] # type: List[HttpRequest]
start_changeset, end_changeset = _calculate_changesets(self.multipart_mixed_info[3])

# Update the main request with the body
main_message = Message()
main_message.add_header("Content-Type", "multipart/mixed")
if boundary:
main_message.set_boundary(boundary)
for i, req in enumerate(requests):
part_message = Message()
part_message.add_header("Content-Type", "application/http")
part_message.add_header("Content-Transfer-Encoding", "binary")
part_message.add_header("Content-ID", str(i))
part_message.set_payload(req.serialize())
main_message.attach(part_message)

working_message = main_message
for index, request in enumerate(requests):
if index in start_changeset:
working_message = Message()
working_message.add_header("Content-Type", "multipart/mixed")
changeset_boundary = start_changeset[index]
if changeset_boundary:
working_message.set_boundary(changeset_boundary)
_add_message_part(index, working_message, request)
if index in end_changeset:
main_message.attach(working_message)
working_message = main_message

try:
from email.policy import HTTP
Expand Down Expand Up @@ -482,6 +527,32 @@ def text(self, encoding=None):
encoding = "utf-8-sig"
return self.body().decode(encoding)

def _decode_parts(self, message, http_response_type, requests):
# type: (Message, Type[_HttpResponseBase], List[HttpRequest]) -> List[HttpResponse]
"""Rebuild an HTTP response from pure string."""
responses = []
offset = 0
for index, raw_reponse in enumerate(message.get_payload()):
content_type = raw_reponse.get_content_type()
if content_type == "application/http":
responses.append(
_deserialize_response(
raw_reponse.get_payload(decode=True),
requests[index],
http_response_type=http_response_type,
)
)
elif content_type == "multipart/mixed":
# The message batch contains one or more change sets
changeset_responses = self._decode_parts(raw_reponse, http_response_type, requests[offset:])
offset += len(changeset_responses)
responses.extend(changeset_responses)
else:
raise ValueError(
"Multipart doesn't support part other than application/http for now"
)
return responses

def _get_raw_parts(self, http_response_type=None):
# type (Optional[Type[_HttpResponseBase]]) -> Iterator[HttpResponse]
"""Assuming this body is multipart, return the iterator or parts.
Expand All @@ -500,26 +571,9 @@ def _get_raw_parts(self, http_response_type=None):
+ b"\r\n\r\n"
+ body_as_bytes
)

message = message_parser(http_body) # type: Message

# Rebuild an HTTP response from pure string
requests = self.request.multipart_mixed_info[0] # type: List[HttpRequest]
responses = []
for request, raw_reponse in zip(requests, message.get_payload()):
if raw_reponse.get_content_type() == "application/http":
responses.append(
_deserialize_response(
raw_reponse.get_payload(decode=True),
request,
http_response_type=http_response_type,
)
)
else:
raise ValueError(
"Multipart doesn't support part other than application/http for now"
)
return responses
return self._decode_parts(message, http_response_type, requests)


class HttpResponse(_HttpResponseBase): # pylint: disable=abstract-method
Expand Down
Loading