Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Add additional stress test coverage to ensure parity with cross-language priorities #14437

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
class ReceiveType:
push="push"
pull="pull"
none=None


class StressTestResults(object):
Expand Down Expand Up @@ -57,7 +58,8 @@ def __init__(self,
send_delay = .01,
receive_delay = 0,
should_complete_messages = True,
max_message_count = 1):
max_message_count = 1,
send_session_id = None):
self.senders = senders
self.receivers = receivers
self.duration=duration
Expand All @@ -69,6 +71,7 @@ def __init__(self,
self.receive_delay = receive_delay
self.should_complete_messages = should_complete_messages
self.max_message_count = max_message_count
self.send_session_id = send_session_id

# Because of pickle we need to create a state object and not just pass around ourselves.
# If we ever require multiple runs of this one after another, just make Run() reset this.
Expand All @@ -81,14 +84,21 @@ def __init__(self,


# Plugin functions the caller can override to further tailor the test.
def OnSend(self, state, sent_message):
'''Called on every successful send'''
def OnSend(self, state, sent_message, sender):
'''Called on every successful send, per message'''
pass

def OnReceive(self, state, received_message):
'''Called on every successful receive'''
def OnReceive(self, state, received_message, receiver):
'''Called on every successful receive, per message'''
pass

def OnReceiveBatch(self, state, batch, receiver):
KieranBrantnerMagee marked this conversation as resolved.
Show resolved Hide resolved
'''Called on every successful receive, at the batch or iterator level rather than per-message'''
pass

def PostReceive(self, state, receiver):
'''Called after completion of every successful receive'''
pass

def OnComplete(self, send_results=[], receive_results=[]):
'''Called on stress test run completion'''
Expand Down Expand Up @@ -132,8 +142,11 @@ def _Send(self, sender, end_time):
while end_time > datetime.utcnow():
print("SENDING")
message = self._ConstructMessage()
sender.send_messages(message)
self.OnSend(self._state, message)
if self.send_session_id != None:
message.session_id = self.send_session_id
else:
sender.send_messages(message)
self.OnSend(self._state, message, sender)
self._state.total_sent += 1
time.sleep(self.send_delay)
return self._state
Expand All @@ -150,9 +163,12 @@ def _Receive(self, receiver, end_time):
batch = receiver.receive_messages(max_message_count=self.max_message_count, max_wait_time=self.max_wait_time)
elif self.receive_type == ReceiveType.push:
batch = receiver.get_streaming_message_iter(max_wait_time=self.max_wait_time)
elif self.receive_type == ReceiveType.none:
batch = []

self.OnReceiveBatch(self._state, batch, receiver)
for message in batch:
self.OnReceive(self._state, message)
self.OnReceive(self._state, message, receiver)
try:
if self.should_complete_messages:
message.complete()
Expand All @@ -163,6 +179,7 @@ def _Receive(self, receiver, end_time):
if end_time <= datetime.utcnow():
break
time.sleep(self.receive_delay)
self.PostReceive(self._state, receiver)
return self._state
except Exception as e:
print("Exception in receiver", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import sys
import time

from azure.servicebus import ServiceBusClient
from azure.servicebus import ServiceBusClient, AutoLockRenewer
from azure.servicebus._common.constants import ReceiveMode

from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer
Expand Down Expand Up @@ -157,9 +157,10 @@ def test_stress_queue_receive_large_batch_size(self, servicebus_namespace_connec

# Cannot be defined at local scope due to pickling into multiproc runner.
class ReceiverTimeoutStressTestRunner(StressTestRunner):
def OnSend(self, state, sent_message):
def OnSend(self, state, sent_message, sender):
'''Called on every successful send'''
if state.total_sent % 10 == 0:
# To make receive time out, in push mode this delay would trigger receiver reconnection
time.sleep(self.max_wait_time + 5)

@pytest.mark.liveTest
Expand All @@ -179,4 +180,121 @@ def test_stress_queue_pull_receive_timeout(self, servicebus_namespace_connection

result = stress_test.Run()
assert(result.total_sent > 0)
assert(result.total_received > 0)
assert(result.total_received > 0)


class LongRenewStressTestRunner(StressTestRunner):
def OnReceive(self, state, received_message, receiver):
'''Called on every successful receive'''
renewer = AutoLockRenew()
renewer.register(received_message, timeout=300)
time.sleep(300)

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
def test_stress_queue_long_renew_send_and_receive(self, servicebus_namespace_connection_string, servicebus_queue):
sb_client = ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, debug=False)

stress_test = ServiceBusQueueStressTests.LongRenewStressTestRunner(
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
duration=timedelta(seconds=3000),
send_delay=300)

result = stress_test.Run()
assert(result.total_sent > 0)
assert(result.total_received > 0)


class LongSessionRenewStressTestRunner(StressTestRunner):
def OnReceive(self, state, received_message, receiver):
'''Called on every successful receive'''
renewer = AutoLockRenew()
renewer.register(receiver.Session, timeout=300)
time.sleep(300)

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
def test_stress_queue_long_renew_session_send_and_receive(self, servicebus_namespace_connection_string, servicebus_queue):
sb_client = ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, debug=False)

session_id = 'test_stress_queue_long_renew_send_and_receive'

stress_test = ServiceBusQueueStressTests.LongSessionRenewStressTestRunner(
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name, session_id=session_id)],
duration=timedelta(seconds=3000),
send_delay=300,
send_session_id=session_id)

result = stress_test.Run()
assert(result.total_sent > 0)
assert(result.total_received > 0)


class PeekOnReceiveStressTestRunner(StressTestRunner):
def OnReceiveBatch(self, state, received_message, receiver):
'''Called on every successful receive'''
assert receiver.peek_messages()[0]

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
def test_stress_queue_peek_messages(self, servicebus_namespace_connection_string, servicebus_queue):
sb_client = ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, debug=False)

stress_test = ServiceBusQueueStressTests.PeekOnReceiveStressTestRunner(
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
duration = timedelta(seconds=300),
receive_delay = 30,
receive_type = ReceiveType.none)

result = stress_test.Run()
assert(result.total_sent > 0)
# TODO: This merits better validation, to be implemented alongside full metric spread.


class RestartHandlerStressTestRunner(StressTestRunner):
def PostReceive(self, state, receiver):
'''Called after completion of every successful receive'''
if state.total_received % 3 == 0:
receiver.__exit__()
receiver.__enter__()

def OnSend(self, state, sent_message, sender):
'''Called after completion of every successful receive'''
if state.total_sent % 3 == 0:
sender.__exit__()
sender.__enter__()

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
def test_stress_queue_close_and_reopen(self, servicebus_namespace_connection_string, servicebus_queue):
sb_client = ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, debug=False)

stress_test = ServiceBusQueueStressTests.RestartHandlerStressTestRunner(
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
duration = timedelta(seconds=300),
receive_delay = 30,
send_delay = 10)

result = stress_test.Run()
assert(result.total_sent > 0)
assert(result.total_received > 0)