Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Add additional stress test coverage to ensure parity with cross-language priorities #14437

Original file line number Diff line number Diff line change
Expand Up @@ -84,77 +84,77 @@ def __init__(self,


# Plugin functions the caller can override to further tailor the test.
def OnSend(self, state, sent_message, sender):
def on_send(self, state, sent_message, sender):
'''Called on every successful send, per message'''
pass

def OnReceive(self, state, received_message, receiver):
def on_receive(self, state, received_message, receiver):
'''Called on every successful receive, per message'''
pass

def OnReceiveBatch(self, state, batch, receiver):
def on_receive_batch(self, state, batch, receiver):
'''Called on every successful receive, at the batch or iterator level rather than per-message'''
pass

def PostReceive(self, state, receiver):
def post_receive(self, state, receiver):
'''Called after completion of every successful receive'''
pass

def OnComplete(self, send_results=[], receive_results=[]):
def on_complete(self, send_results=[], receive_results=[]):
'''Called on stress test run completion'''
pass


def PreProcessMessage(self, message):
def pre_process_message(self, message):
'''Allows user to transform the message before batching or sending it.'''
pass


def PreProcessMessageBatch(self, message):
def pre_process_message_batch(self, message):
'''Allows user to transform the batch before sending it.'''
pass


def PreProcessMessageBody(self, payload):
def pre_process_message_body(self, payload):
'''Allows user to transform message payload before sending it.'''
return payload


def _ConstructMessage(self):
def _construct_message(self):
if self.send_batch_size != None:
batch = BatchMessage()
for _ in range(self.send_batch_size):
message = Message(self.PreProcessMessageBody("a" * self.message_size))
self.PreProcessMessage(message)
message = Message(self.pre_process_message_body("a" * self.message_size))
self.pre_process_message(message)
batch.add(message)
self.PreProcessMessageBatch(batch)
self.pre_process_message_batch(batch)
return batch
else:
message = Message(self.PreProcessMessageBody("a" * self.message_size))
self.PreProcessMessage(message)
message = Message(self.pre_process_message_body("a" * self.message_size))
self.pre_process_message(message)
return message


def _Send(self, sender, end_time):
def _send(self, sender, end_time):
try:
print("STARTING SENDER")
with sender:
while end_time > datetime.utcnow():
print("SENDING")
message = self._ConstructMessage()
message = self._construct_message()
if self.send_session_id != None:
message.session_id = self.send_session_id
else:
sender.send_messages(message)
self.OnSend(self._state, message, sender)
sender.send_messages(message)
self.on_send(self._state, message, sender)
self._state.total_sent += 1
time.sleep(self.send_delay)
return self._state
except Exception as e:
print("Exception in sender", e)
return self._state


def _Receive(self, receiver, end_time):
def _receive(self, receiver, end_time):
try:
with receiver:
while end_time > datetime.utcnow():
Expand All @@ -166,9 +166,9 @@ def _Receive(self, receiver, end_time):
elif self.receive_type == ReceiveType.none:
batch = []

self.OnReceiveBatch(self._state, batch, receiver)
self.on_receive_batch(self._state, batch, receiver)
for message in batch:
self.OnReceive(self._state, message, receiver)
self.on_receive(self._state, message, receiver)
try:
if self.should_complete_messages:
message.complete()
Expand All @@ -179,21 +179,22 @@ def _Receive(self, receiver, end_time):
if end_time <= datetime.utcnow():
break
time.sleep(self.receive_delay)
self.PostReceive(self._state, receiver)
self.post_receive(self._state, receiver)
return self._state
except Exception as e:
print("Exception in receiver", e)
return self._state


def Run(self):
def run(self):
start_time = datetime.utcnow()
end_time = start_time + (self._duration_override or self.duration)
sent_messages = 0
received_messages = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as proc_pool:
print("STARTING PROC POOL")
senders = [proc_pool.submit(self._Send, sender, end_time) for sender in self.senders]
receivers = [proc_pool.submit(self._Receive, receiver, end_time) for receiver in self.receivers]
senders = [proc_pool.submit(self._send, sender, end_time) for sender in self.senders]
receivers = [proc_pool.submit(self._receive, receiver, end_time) for receiver in self.receivers]

result = StressTestResults()
for each in concurrent.futures.as_completed(senders + receivers):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_stress_queue_send_and_receive(self, servicebus_namespace_connection_str
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
duration=timedelta(seconds=60))

result = stress_test.Run()
result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)

Expand All @@ -55,7 +55,7 @@ def test_stress_queue_send_and_pull_receive(self, servicebus_namespace_connectio
receive_type=ReceiveType.pull,
duration=timedelta(seconds=60))

result = stress_test.Run()
result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)

Expand All @@ -74,7 +74,7 @@ def test_stress_queue_batch_send_and_receive(self, servicebus_namespace_connecti
duration=timedelta(seconds=60),
send_batch_size=5)

result = stress_test.Run()
result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)

Expand All @@ -93,7 +93,7 @@ def test_stress_queue_slow_send_and_receive(self, servicebus_namespace_connectio
duration=timedelta(seconds=3501*3),
send_delay=3500)

result = stress_test.Run()
result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)

Expand All @@ -111,7 +111,7 @@ def test_stress_queue_receive_and_delete(self, servicebus_namespace_connection_s
receivers = [sb_client.get_queue_receiver(servicebus_queue.name, receive_mode=ReceiveMode.ReceiveAndDelete)],
duration=timedelta(seconds=60))

result = stress_test.Run()
result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)

Expand All @@ -130,7 +130,7 @@ def test_stress_queue_unsettled_messages(self, servicebus_namespace_connection_s
duration = timedelta(seconds=350),
should_complete_messages = False)

result = stress_test.Run()
result = stress_test.run()
# This test is prompted by reports of an issue where enough unsettled messages saturate a service-side cache
# and prevent further receipt.
assert(result.total_sent > 2500)
Expand All @@ -151,13 +151,13 @@ def test_stress_queue_receive_large_batch_size(self, servicebus_namespace_connec
duration = timedelta(seconds=60),
max_message_count = 50)

result = stress_test.Run()
result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)

# Cannot be defined at local scope due to pickling into multiproc runner.
class ReceiverTimeoutStressTestRunner(StressTestRunner):
def OnSend(self, state, sent_message, sender):
def on_send(self, state, sent_message, sender):
'''Called on every successful send'''
if state.total_sent % 10 == 0:
# To make receive time out, in push mode this delay would trigger receiver reconnection
Expand All @@ -179,13 +179,13 @@ def test_stress_queue_pull_receive_timeout(self, servicebus_namespace_connection
receive_type=ReceiveType.pull,
duration=timedelta(seconds=600))

result = stress_test.Run()
result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)


class LongRenewStressTestRunner(StressTestRunner):
def OnReceive(self, state, received_message, receiver):
def on_receive(self, state, received_message, receiver):
'''Called on every successful receive'''
renewer = AutoLockRenew()
renewer.register(received_message, timeout=300)
Expand All @@ -206,23 +206,24 @@ def test_stress_queue_long_renew_send_and_receive(self, servicebus_namespace_con
duration=timedelta(seconds=3000),
send_delay=300)

result = stress_test.Run()
result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)


class LongSessionRenewStressTestRunner(StressTestRunner):
def OnReceive(self, state, received_message, receiver):
def on_receive(self, state, received_message, receiver):
'''Called on every successful receive'''
renewer = AutoLockRenew()
renewer.register(receiver.Session, timeout=300)
time.sleep(300)
renewer = AutoLockRenewer()
def on_fail(renewable, error):
print("FAILED AUTOLOCKRENEW: " + str(error))
renewer.register(receiver.session, timeout=600, on_lock_renew_failure=on_fail)

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True)
def test_stress_queue_long_renew_session_send_and_receive(self, servicebus_namespace_connection_string, servicebus_queue):
sb_client = ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, debug=False)
Expand All @@ -231,18 +232,18 @@ def test_stress_queue_long_renew_session_send_and_receive(self, servicebus_names

stress_test = ServiceBusQueueStressTests.LongSessionRenewStressTestRunner(
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name, session_id=session_id)],
receivers = [sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id)],
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
duration=timedelta(seconds=3000),
send_delay=300,
send_session_id=session_id)

result = stress_test.Run()
result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)


class PeekOnReceiveStressTestRunner(StressTestRunner):
def OnReceiveBatch(self, state, received_message, receiver):
class Peekon_receiveStressTestRunner(StressTestRunner):
def on_receive_batch(self, state, received_message, receiver):
'''Called on every successful receive'''
assert receiver.peek_messages()[0]

Expand All @@ -255,26 +256,26 @@ def test_stress_queue_peek_messages(self, servicebus_namespace_connection_string
sb_client = ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, debug=False)

stress_test = ServiceBusQueueStressTests.PeekOnReceiveStressTestRunner(
stress_test = ServiceBusQueueStressTests.Peekon_receiveStressTestRunner(
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
duration = timedelta(seconds=300),
receive_delay = 30,
receive_type = ReceiveType.none)

result = stress_test.Run()
result = stress_test.run()
assert(result.total_sent > 0)
# TODO: This merits better validation, to be implemented alongside full metric spread.


class RestartHandlerStressTestRunner(StressTestRunner):
def PostReceive(self, state, receiver):
def post_receive(self, state, receiver):
'''Called after completion of every successful receive'''
if state.total_received % 3 == 0:
receiver.__exit__()
receiver.__enter__()

def OnSend(self, state, sent_message, sender):
def on_send(self, state, sent_message, sender):
'''Called after completion of every successful receive'''
if state.total_sent % 3 == 0:
sender.__exit__()
Expand All @@ -296,6 +297,6 @@ def test_stress_queue_close_and_reopen(self, servicebus_namespace_connection_str
receive_delay = 30,
send_delay = 10)

result = stress_test.Run()
result = stress_test.run()
assert(result.total_sent > 0)
assert(result.total_received > 0)