Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Pre-GA Bug Bash fixes #15489

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* An improper `receive_mode` value will now raise `ValueError` instead of `TypeError` in line with supporting extensible enums.
* Rename enum values `DeadLetter` to `DEAD_LETTER`, `TransferDeadLetter` to `TRANSFER_DEAD_LETTER`, `PeekLock` to `PEEK_LOCK` and `ReceiveAndDelete` to `RECEIVE_AND_DELETE` to conform to sdk guidelines going forward.
* `send_messages`, `schedule_messages`, `cancel_scheduled_messages` and `receive_deferred_messages` now performs a no-op rather than raising a `ValueError` if provided an empty list of messages or an empty batch.
* `amqp_annotated_message` has been renamed to `raw_amqp_message` to normalize with other SDKs.
* `ServiceBusMessage.amqp_annotated_message` has been renamed to `ServiceBusMessage.raw_amqp_message` to normalize with other SDKs.
* Redesigned error hierarchy based on the service-defined error condition:
- `MessageAlreadySettled` now inherits from `ValueError` instead of `ServiceBusMessageError` as it's a client-side validation.
- Removed `NoActiveSession` which is now replaced by `OperationTimeoutError` as the client times out when trying to connect to any available session.
Expand All @@ -41,6 +41,7 @@ raise `ValueError` if the `queue_name` or `topic_name` does not match the `Entit
- Settling a message that has been peeked will raise `ValueError`.
- Settling a message or renewing a lock on a message received in `RECEIVE_AND_DELETE` receive mode will raise `ValueError`.
- Setting `session_id`, `reply_to_session_id`, `message_id` and `partition_key` on `ServiceBusMessage` longer than 128 characters will raise `ValueError`.
* `ServiceBusReceiver.get_streaming_message_iter` has been made internal for the time being to assess use patterns before comitting to back-compatibility; messages may still be iterated over in equivelent fashion by iterating on the receiver itself.

**BugFixes**

Expand Down
6 changes: 3 additions & 3 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
# max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
# Default is None; to receive forever.
with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
for msg in receiver: # ServiceBusReceiver instance is a generator. This is equivilent to get_streaming_message_iter().
for msg in receiver: # ServiceBusReceiver instance is a generator.
print(str(msg))
# If it is desired to halt receiving early, one can break out of the loop here safely.
```
Expand Down Expand Up @@ -383,7 +383,7 @@ There are various timeouts a user should be aware of within the library.
- 10 minute service side link closure: A link, once opened, will be closed after 10 minutes idle to protect the service against resource leakage. This should largely
be transparent to a user, but if you notice a reconnect occurring after such a duration, this is why. Performing any operations, including management operations, on the
link will extend this timeout.
- max_wait_time: Provided on creation of a receiver or when calling `receive_messages()` or `get_streaming_message_iter()`, the time after which receiving messages will halt after no traffic. This applies both to the imperative `receive_messages()` function as well as the length
- max_wait_time: Provided on creation of a receiver or when calling `receive_messages()`, the time after which receiving messages will halt after no traffic. This applies both to the imperative `receive_messages()` function as well as the length
a generator-style receive will run for before exiting if there are no messages. Passing None (default) will wait forever, up until the 10 minute threshold if no other action is taken.

> **NOTE:** If processing of a message or session is sufficiently long as to cause timeouts, as an alternative to calling `receiver.renew_message_lock`/`receiver.session.renew_lock` manually, one can
Expand Down Expand Up @@ -501,7 +501,7 @@ contact [[email protected]](mailto:[email protected]) with any additio
[client_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html#azure.servicebus.ServiceBusClient
[send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=send_messages#azure.servicebus.ServiceBusSender.send_messages
[receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusReceiver.receive_messages
[streaming_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=get_streaming_message_iter#azure.servicebus.ServiceBusReceiver.get_streaming_message_iter
[streaming_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=next#azure.servicebus.ServiceBusReceiver.next
[session_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusSessionReceiver.receive_messages
[session_send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=session_id#azure.servicebus.ServiceBusMessage.session_id
[complete_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=complete_message#azure.servicebus.ServiceBusReceiver.complete_message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,11 @@ def _iter_contextual_wrapper(self, max_wait_time=None):
break
finally:
if original_timeout:
self._handler._timeout = original_timeout
try:
self._handler._timeout = original_timeout
except AttributeError: # Handler may be disposed already.
pass


def _inner_next(self):
# We do this weird wrapping such that an imperitive next() call, and a generator-based iter both trace sanely.
Expand Down Expand Up @@ -484,7 +488,7 @@ def close(self):
super(ServiceBusReceiver, self).close()
self._message_iter = None # pylint: disable=attribute-defined-outside-init

def get_streaming_message_iter(self, max_wait_time=None):
def _get_streaming_message_iter(self, max_wait_time=None):
# type: (Optional[float]) -> Iterator[ServiceBusReceivedMessage]
"""Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until
such a timeout occurs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,11 @@ async def __anext__(self):
return message
finally:
if original_timeout:
self.receiver._handler._timeout = original_timeout
try:
self.receiver._handler._timeout = original_timeout
except AttributeError: # Handler may be disposed already.
pass


def __aiter__(self):
return self._IterContextualWrapper(self)
Expand Down Expand Up @@ -478,7 +482,7 @@ async def close(self) -> None:
await super(ServiceBusReceiver, self).close()
self._message_iter = None

def get_streaming_message_iter(
def _get_streaming_message_iter(
self,
max_wait_time: Optional[float] = None
) -> AsyncIterator[ServiceBusReceivedMessage]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ async def example_send_and_receive_async():
servicebus_receiver = await example_create_servicebus_receiver_async()
# [START receive_forever_async]
async with servicebus_receiver:
async for message in servicebus_receiver.get_streaming_message_iter():
async for message in servicebus_receiver:
print(str(message))
await servicebus_receiver.complete_message(message)
# [END receive_forever_async]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def example_send_and_receive_sync():
servicebus_receiver = example_create_servicebus_receiver_sync()
# [START receive_forever]
with servicebus_receiver:
for message in servicebus_receiver.get_streaming_message_iter():
for message in servicebus_receiver:
print(str(message))
servicebus_receiver.complete_message(message)
# [END receive_forever]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel
await receiver.receive_messages(max_wait_time=0)

with pytest.raises(ValueError):
await receiver.get_streaming_message_iter(max_wait_time=0)
await receiver._get_streaming_message_iter(max_wait_time=0)

count = 0
async for message in receiver:
Expand Down Expand Up @@ -1142,7 +1142,7 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace
await sender.send_messages([message_a, message_b])

received_messages = []
async for message in receiver.get_streaming_message_iter(max_wait_time=5):
async for message in receiver._get_streaming_message_iter(max_wait_time=5):
received_messages.append(message)
await receiver.complete_message(message)

Expand Down Expand Up @@ -1451,11 +1451,11 @@ async def test_async_queue_receiver_alive_after_timeout(self, servicebus_namespa
messages = []
async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=10) as receiver:

async for message in receiver.get_streaming_message_iter():
async for message in receiver._get_streaming_message_iter():
messages.append(message)
break

async for message in receiver.get_streaming_message_iter():
async for message in receiver._get_streaming_message_iter():
messages.append(message)

for message in messages:
Expand All @@ -1469,9 +1469,9 @@ async def test_async_queue_receiver_alive_after_timeout(self, servicebus_namespa
message_3 = ServiceBusMessage("3")
await sender.send_messages([message_2, message_3])

async for message in receiver.get_streaming_message_iter():
async for message in receiver._get_streaming_message_iter():
messages.append(message)
async for message in receiver.get_streaming_message_iter():
async for message in receiver._get_streaming_message_iter():
messages.append(message)

assert len(messages) == 4
Expand Down Expand Up @@ -1536,19 +1536,19 @@ async def test_async_queue_receiver_respects_max_wait_time_overrides(self, servi
async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:

time_1 = receiver._handler._counter.get_current_ms()
async for message in receiver.get_streaming_message_iter(max_wait_time=10):
async for message in receiver._get_streaming_message_iter(max_wait_time=10):
messages.append(message)
await receiver.complete_message(message)

time_2 = receiver._handler._counter.get_current_ms()
async for message in receiver.get_streaming_message_iter(max_wait_time=1):
async for message in receiver._get_streaming_message_iter(max_wait_time=1):
messages.append(message)
time_3 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) <= timedelta(seconds=2)
time_4 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) <= timedelta(seconds=11)

async for message in receiver.get_streaming_message_iter(max_wait_time=3):
async for message in receiver._get_streaming_message_iter(max_wait_time=3):
messages.append(message)
time_5 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) <= timedelta(seconds=4)
Expand All @@ -1558,7 +1558,7 @@ async def test_async_queue_receiver_respects_max_wait_time_overrides(self, servi
time_6 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) <= timedelta(seconds=6)

async for message in receiver.get_streaming_message_iter():
async for message in receiver._get_streaming_message_iter():
messages.append(message)
time_7 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) <= timedelta(seconds=6)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def test_subscription_by_subscription_client_conn_str_receive_basic(self,
await receiver.receive_messages(max_wait_time=-1)

with pytest.raises(ValueError):
await receiver.get_streaming_message_iter(max_wait_time=0)
await receiver._get_streaming_message_iter(max_wait_time=0)

count = 0
async for message in receiver:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ def _receive(self, receiver, end_time):
if self.receive_type == ReceiveType.pull:
batch = receiver.receive_messages(max_message_count=self.max_message_count, max_wait_time=self.max_wait_time)
elif self.receive_type == ReceiveType.push:
batch = receiver.get_streaming_message_iter(max_wait_time=self.max_wait_time)
batch = receiver._get_streaming_message_iter(max_wait_time=self.max_wait_time)
else:
batch = []

for message in batch:
self.on_receive(self._state, message, receiver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def test_stress_queue_receive_and_delete(self, servicebus_namespace_connection_s

stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name, receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE)],
should_complete_messages = False,
duration=timedelta(seconds=60))

result = stress_test.run()
Expand Down Expand Up @@ -186,8 +187,8 @@ def test_stress_queue_pull_receive_timeout(self, servicebus_namespace_connection
class LongRenewStressTestRunner(StressTestRunner):
def on_receive(self, state, received_message, receiver):
'''Called on every successful receive'''
renewer = AutoLockRenew()
renewer.register(received_message, timeout=300)
renewer = AutoLockRenewer()
renewer.register(receiver, received_message, max_lock_renewal_duration=300)
time.sleep(300)

@pytest.mark.liveTest
Expand Down Expand Up @@ -216,7 +217,7 @@ def on_receive(self, state, received_message, receiver):
renewer = AutoLockRenewer()
def on_fail(renewable, error):
print("FAILED AUTOLOCKRENEW: " + str(error))
renewer.register(receiver.session, timeout=600, on_lock_renew_failure=on_fail)
renewer.register(receiver, receiver.session, max_lock_renewal_duration=600, on_lock_renew_failure=on_fail)

@pytest.mark.liveTest
@pytest.mark.live_test_only
Expand Down Expand Up @@ -282,6 +283,7 @@ def on_send(self, state, sent_message, sender):

@pytest.mark.liveTest
@pytest.mark.live_test_only
@pytest.skip(reason='This test is disabled unless re-openability of handlers is desired and re-enabled')
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
Expand All @@ -299,3 +301,52 @@ def test_stress_queue_close_and_reopen(self, servicebus_namespace_connection_str
result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)

# This test validates that all individual messages are received contiguously over a long time period.
# (e.g. not dropped for whatever reason, not sent, or not received)
class DroppedMessageCheckerStressTestRunner(StressTestRunner):
KieranBrantnerMagee marked this conversation as resolved.
Show resolved Hide resolved
def on_receive(self, state, received_message, receiver):
'''Called on every successful receive'''
last_seen = getattr(state, 'last_seen', -1)
noncontiguous = getattr(state, 'noncontiguous', set())
body = int(str(received_message))
if body == last_seen+1:
last_seen += 1
if noncontiguous:
while (last_seen+1) in noncontiguous:
last_seen += 1
noncontiguous.remove(last_seen)
else:
noncontiguous.add(body)
state.noncontiguous = noncontiguous
state.last_seen = last_seen

def pre_process_message_body(self, payload):
'''Called when constructing message body'''
try:
body = self._message_id
except:
self._message_id = 0
body = 0
self._message_id += 1

return str(body)

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
def test_stress_queue_check_for_dropped_messages(self, servicebus_namespace_connection_string, servicebus_queue):
sb_client = ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, debug=False)

stress_test = ServiceBusQueueStressTests.DroppedMessageCheckerStressTestRunner(
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
receive_type=ReceiveType.pull,
duration=timedelta(seconds=3000))

result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)
Loading