Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Object mapping support #17080

Merged
14 commits merged into from
Mar 6, 2021
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@
import uuid
import logging
import copy
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Any
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Any, Mapping, cast

import six

@@ -537,15 +537,8 @@ def __len__(self):

def _from_list(self, messages, parent_span=None):
# type: (Iterable[ServiceBusMessage], AbstractSpan) -> None
for each in messages:
if not isinstance(each, (ServiceBusMessage, dict)):
raise TypeError(
"Only ServiceBusMessage or an iterable object containing ServiceBusMessage "
"objects are accepted. Received instead: {}".format(
each.__class__.__name__
)
)
self._add(each, parent_span)
for message in messages:
self._add(message, parent_span)

@property
def max_size_in_bytes(self):
@@ -566,7 +559,7 @@ def size_in_bytes(self):
return self._size

def add_message(self, message):
# type: (ServiceBusMessage) -> None
# type: (Union[ServiceBusMessage, Mapping[str, Any]]) -> None
"""Try to add a single Message to the batch.
The total size of an added message is the sum of its body, properties, etc.
@@ -581,12 +574,12 @@ def add_message(self, message):

return self._add(message)

def _add(self, message, parent_span=None):
# type: (ServiceBusMessage, AbstractSpan) -> None
def _add(self, add_message, parent_span=None):
# type: (Union[ServiceBusMessage, Mapping[str, Any]], AbstractSpan) -> None
"""Actual add implementation. The shim exists to hide the internal parameters such as parent_span."""

message = create_messages_from_dicts_if_needed(message, ServiceBusMessage) # type: ignore
message = create_messages_from_dicts_if_needed(add_message, ServiceBusMessage)
message = transform_messages_to_sendable_if_needed(message)
message = cast(ServiceBusMessage, message)
trace_message(
message, parent_span
) # parent_span is e.g. if built as part of a send operation.
47 changes: 28 additions & 19 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py
Original file line number Diff line number Diff line change
@@ -19,7 +19,8 @@
Optional,
Type,
TYPE_CHECKING,
Union
Union,
cast
)
from contextlib import contextmanager
from msrest.serialization import UTC
@@ -59,19 +60,10 @@
from .receiver_mixins import ReceiverMixin
from .._servicebus_session import BaseSession

# pylint: disable=unused-import, ungrouped-imports
DictMessageType = Union[
Mapping,
MessagesType = Union[
Mapping[str, Any],
ServiceBusMessage,
List[Mapping[str, Any]],
List[ServiceBusMessage],
ServiceBusMessageBatch
]

DictMessageReturnType = Union[
ServiceBusMessage,
List[ServiceBusMessage],
ServiceBusMessageBatch
List[Union[Mapping[str, Any], ServiceBusMessage]]
]

_log = logging.getLogger(__name__)
@@ -222,20 +214,37 @@ def transform_messages_to_sendable_if_needed(messages):
except AttributeError:
return messages


def _single_message_from_dict(message, message_type):
# type: (Union[ServiceBusMessage, Mapping[str, Any]], Type[ServiceBusMessage]) -> ServiceBusMessage
if isinstance(message, message_type):
return message
try:
return message_type(**cast(Mapping[str, Any], message))
except TypeError:
raise TypeError(
"Only ServiceBusMessage instances or Mappings are supported. "
"Received instead: {}".format(
message.__class__.__name__
)
)


def create_messages_from_dicts_if_needed(messages, message_type):
# type: (DictMessageType, type) -> DictMessageReturnType
# type: (MessagesType, Type[ServiceBusMessage]) -> Union[ServiceBusMessage, List[ServiceBusMessage]]
"""
This method is used to convert dict representations
of messages to a list of ServiceBusMessage objects or ServiceBusBatchMessage.
annatisch marked this conversation as resolved.
Show resolved Hide resolved
:param DictMessageType messages: A list or single instance of messages of type ServiceBusMessages or
:param Messages messages: A list or single instance of messages of type ServiceBusMessages or
annatisch marked this conversation as resolved.
Show resolved Hide resolved
dict representations of type ServiceBusMessage. Also accepts ServiceBusBatchMessage.
annatisch marked this conversation as resolved.
Show resolved Hide resolved
:rtype: DictMessageReturnType
:param Type[ServiceBusMessage] message_type: The class type to return the messages as.
:rtype: Union[ServiceBusMessage, List[ServiceBusMessage]]
"""
if isinstance(messages, list):
return [(message_type(**message) if isinstance(message, dict) else message) for message in messages]
return [_single_message_from_dict(m, message_type) for m in messages]
return _single_message_from_dict(messages, message_type)

return_messages = message_type(**messages) if isinstance(messages, dict) else messages
return return_messages

def strip_protocol_from_uri(uri):
# type: (str) -> str
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
import logging
import time
import uuid
from typing import Any, TYPE_CHECKING, Union, List, Optional
from typing import Any, TYPE_CHECKING, Union, List, Optional, Mapping, cast

import uamqp
from uamqp import SendClient, types
@@ -42,6 +42,16 @@
import datetime
from azure.core.credentials import TokenCredential

MessageTypes = Union[
Mapping[str, Any],
ServiceBusMessage,
List[Union[Mapping[str, Any], ServiceBusMessage]]
]
MessageObjTypes = Union[
ServiceBusMessage,
ServiceBusMessageBatch,
List[ServiceBusMessage]]

_LOGGER = logging.getLogger(__name__)


@@ -248,7 +258,7 @@ def _send(self, message, timeout=None, last_exception=None):
self._set_msg_timeout(default_timeout, None)

def schedule_messages(self, messages, schedule_time_utc, **kwargs):
# type: (Union[ServiceBusMessage, List[ServiceBusMessage]], datetime.datetime, Any) -> List[int]
# type: (MessageTypes, datetime.datetime, Any) -> List[int]
"""Send Message or multiple Messages to be enqueued at a specific time.
Returns a list of the sequence numbers of the enqueued messages.
@@ -272,21 +282,21 @@ def schedule_messages(self, messages, schedule_time_utc, **kwargs):
# pylint: disable=protected-access

self._check_live()
messages = create_messages_from_dicts_if_needed(messages, ServiceBusMessage) # type: ignore
obj_messages = create_messages_from_dicts_if_needed(messages, ServiceBusMessage)
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")

with send_trace_context_manager(span_name=SPAN_NAME_SCHEDULE) as send_span:
if isinstance(messages, ServiceBusMessage):
if isinstance(obj_messages, ServiceBusMessage):
request_body = self._build_schedule_request(
schedule_time_utc, send_span, messages
schedule_time_utc, send_span, obj_messages
)
else:
if len(messages) == 0:
if len(obj_messages) == 0:
return [] # No-op on empty list.
request_body = self._build_schedule_request(
schedule_time_utc, send_span, *messages
schedule_time_utc, send_span, *obj_messages
)
if send_span:
self._add_span_request_attributes(send_span)
@@ -338,7 +348,7 @@ def cancel_scheduled_messages(self, sequence_numbers, **kwargs):
)

def send_messages(self, message, **kwargs):
# type: (Union[ServiceBusMessage, ServiceBusMessageBatch, List[ServiceBusMessage]], Any) -> None
# type: (Union[MessageTypes, ServiceBusMessageBatch], Any) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.
If a list of messages was provided, attempts to send them as a single batch, throwing a
@@ -368,48 +378,44 @@ def send_messages(self, message, **kwargs):
:caption: Send message.
"""

self._check_live()
message = create_messages_from_dicts_if_needed(message, ServiceBusMessage)
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")

with send_trace_context_manager() as send_span:
# Ensure message is sendable (not a ReceivedMessage), and if needed (a list) is batched. Adds tracing.
message = transform_messages_to_sendable_if_needed(message)
try:
for each_message in iter(message): # type: ignore # Ignore type (and below) as it will except if wrong.
add_link_to_send(each_message, send_span)
batch = self.create_message_batch()
batch._from_list(message, send_span) # type: ignore # pylint: disable=protected-access
message = batch
except TypeError: # Message was not a list or generator. Do needed tracing.
if isinstance(message, ServiceBusMessageBatch):
for (
batch_message
) in message.message._body_gen: # pylint: disable=protected-access
add_link_to_send(batch_message, send_span)
elif isinstance(message, ServiceBusMessage):
trace_message(message, send_span)
add_link_to_send(message, send_span)
if isinstance(message, ServiceBusMessageBatch):
for (
batch_message
) in message.message._body_gen: # pylint: disable=protected-access
add_link_to_send(batch_message, send_span)
obj_message = message # type: MessageObjTypes
else:
obj_message = create_messages_from_dicts_if_needed(message, ServiceBusMessage)
# Ensure message is sendable (not a ReceivedMessage), and if needed (a list) is batched. Adds tracing.
obj_message = transform_messages_to_sendable_if_needed(obj_message)
try:
# Ignore type (and below) as it will except if wrong.
for each_message in iter(obj_message): # type: ignore
add_link_to_send(each_message, send_span)
batch = self.create_message_batch()
batch._from_list(obj_message, send_span) # type: ignore # pylint: disable=protected-access
obj_message = batch
except TypeError: # Message was not a list or generator. Do needed tracing.
trace_message(cast(ServiceBusMessage, obj_message), send_span)
add_link_to_send(obj_message, send_span)

if (
isinstance(message, ServiceBusMessageBatch) and len(message) == 0
isinstance(obj_message, ServiceBusMessageBatch) and len(obj_message) == 0
): # pylint: disable=len-as-condition
return # Short circuit noop if an empty list or batch is provided.
if not isinstance(message, (ServiceBusMessageBatch, ServiceBusMessage)):
raise TypeError(
"Can only send azure.servicebus.<ServiceBusMessageBatch, ServiceBusMessage> "
"or lists of ServiceBusMessage."
)
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved

if send_span:
self._add_span_request_attributes(send_span)

self._do_retryable_operation(
self._send,
message=message,
message=obj_message,
timeout=timeout,
operation_requires_timeout=True,
require_last_exception=True,
Loading