Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Multicast poc #32

Open
wants to merge 10 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integration/int_3Ri_2BhaRi2_3Re_2BhaRe3/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class Receiver(MessagingHandler, threading.Thread):
Receiver implementation of a Proton client that run as a thread.
"""
def __init__(self, url, message_count, timeout=0, container_id=None, durable=False, save_messages=False,
ignore_dups=False):
super(Receiver, self).__init__()
ignore_dups=False, auto_accept=True, auto_settle=True):
super(Receiver, self).__init__(auto_accept=auto_accept, auto_settle=auto_settle)
threading.Thread.__init__(self)
self.url = url
self.receiver = None
Expand Down
21 changes: 16 additions & 5 deletions integration/int_3Ri_2BhaRi2_3Re_2BhaRe3/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,21 @@ class Sender(MessagingHandler, threading.Thread):
lock = threading.Lock()

def __init__(self, url, message_count, sender_id, message_size=1024, timeout=0,
user_id=None, proton_option=AtLeastOnce(), use_unique_body=False):
super(Sender, self).__init__()
user_id=None, proton_option=AtLeastOnce(), use_unique_body=False,
auto_accept=True, auto_settle=True):
super(Sender, self).__init__(auto_accept=auto_accept, auto_settle=auto_settle)
threading.Thread.__init__(self)
self.url = url
self.total = message_count
self.sender_id = sender_id
self.sender = None
self.connection = None
self.sent = 0
self.confirmed = 0
self.accepted = 0
self.released = 0
self.rejected = 0
self.modified = 0
self.settled = 0
self.container = None
self.message_size = message_size

Expand Down Expand Up @@ -91,6 +94,7 @@ def is_done_sending(self):
Returns True if all expected messages have been sent or if sender has timed out.
:return:
"""
#????
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a doubt or do you have concerns with the logic?

return self.stopped or (self.total > 0 and (self.sent - self.released - self.rejected == self.total))

def _generate_message_id_and_body(self) -> list:
Expand Down Expand Up @@ -134,15 +138,22 @@ def on_sendable(self, event):

def on_accepted(self, event):
"""
Increases the confirmed count (if delivery not yet in tracker list).
Increases the accepted count (if delivery not yet in tracker list).
:param event:
:return:
"""
if event.delivery not in self.tracker:
logging.debug('Ignoring confirmation for other deliveries - %s' % event.delivery.tag)
self.confirmed += 1
self.accepted += 1
self.verify_sender_done(event)

def on_modified(self, event):
# XXX verify if this has sense, it seems to be never called.
self.modified += 1

def on_settled(self, event):
self.settled += 1

def on_released(self, event):
"""
Increases the released count
Expand Down
202 changes: 202 additions & 0 deletions integration/int_3Ri_2BhaRi2_3Re_2BhaRe3/test_multicast_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import math
import hashlib
import logging
import ast
import time
import os
import random

import pytest
from messaging_abstract.message import Message
from messaging_components.brokers import Artemis
from pytest_iqa.instance import IQAInstance

from integration.int_3Ri_2BhaRi2_3Re_2BhaRe3.receiver import Receiver
from integration.int_3Ri_2BhaRi2_3Re_2BhaRe3.sender import Sender

class Outcome:
accept = "accept"
release = "release"
modify = "modify"
reject = "reject"

class Expected:
accepted = "accepted"
released = "released"
modified = "modified"
rejected = "rejected"

def idfn(outcome):
return outcome["test_id"]

outcomes_config_list = [
{
"recv_outcomes": 4*[Outcome.accept],
"expected": Expected.accepted,
"test_id": "Expect accepted if all accept.",
},
{
## expect REJECTED if any reject:
"recv_outcomes": [Outcome.reject, Outcome.modify, Outcome.release],
"expected": Expected.rejected,
"test_id": "Expect rejected if any reject.",
},
{
"recv_outcomes": [Outcome.reject, Outcome.release],
"expected": Expected.rejected,
"test_id": "Expect rejected if any reject.",
},
#two issues here:
#1) proton sender counts modified as released
#2) the "Expect accept if no rejects" (taken from system tests) fails but according to documentation...
#probably it should fail, from documentation:
#accepted means:
# - All consumers received the message,
# - Or, at least one consumer received the message, but no consumers rejected it.
#in this case no consumer is accepting ...
#{
#"recv_outcomes": [Outcome.modify, Outcome.release],
#"expected": Expected.accepted,
#"test_id": "Expect accept if no rejects=====",
#},
#{
#"recv_outcomes": 3*[Outcome.release] + [Outcome.modify],
#"expected": Expected.modified,
#"test_id": "Expect modified over released=====",
#},
#{
#"recv_outcomes": 2*[Outcome.modify],
#"expected": Expected.modified,
#"test_id": "Expected modify if all modify",
#},
{
"recv_outcomes": 6*[Outcome.release],
"expected": Expected.released,
"test_id": "Release only if all released",
},
]

@pytest.fixture(params=outcomes_config_list, ids=idfn)
def outcomes(request):
return request.param

class _Receiver(Receiver):
def __init__(self, *args, settle=Outcome.accept, **kwargs):
super(_Receiver, self).__init__(*args, auto_accept=False, **kwargs)
self._settle = getattr(self, settle)

def modify(self, delivery):
super(_Receiver, self).release(delivery, delivered=True)

def release(self, delivery):
super(_Receiver, self).release(delivery, delivered=False)

def on_message(self, event):
self.last_received_id[event.message.user_id] = event.message.id
self.received += 1
self.messages.append(event.message.body)

logging.info("settle = %s" % self._settle.__name__)
self._settle(event.delivery)

if self.is_done_receiving():
self.stop_receiver(event.receiver, event.connection)

class _Sender(Sender):
def is_done_sending(self):
return (self.stopped or (self.total > 0 and self.sent == self.total))

class TestMulticast:
MESSAGES_COUNT = 5
MESSAGE_SIZE = 128

TIMEOUT = 4 #why?
address = "multicast/bla"

@staticmethod
def _get_router_url(router, topic):
return "amqp://%s:%s/%s" % (router.node.get_ip(), router.port, topic)

def _sender(self, router, topic):
s = _Sender(url=self._get_router_url(router, topic),
message_count=self.MESSAGES_COUNT,
sender_id='sender-%s' % router.node.hostname,
timeout=self.TIMEOUT,
message_size=self.MESSAGE_SIZE,
use_unique_body=True,
auto_settle=False,
)

s.start()
return s

def _receiver(self, router, topic, settle=Outcome.modify):
r = _Receiver(url=self._get_router_url(router, topic),
message_count=self.MESSAGES_COUNT,
settle=settle,
timeout=self.TIMEOUT,
)
r.start()
return r

def launch_receivers(self, outcomes, iqa):
def _wait(receivers):
for r in receivers:
while not r.receiver:
time.sleep(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if a time.sleep here might cause issues to proton.
Maybe we can use some of the proton events to consider the receivers in the list as ready.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will change this as you suggest


all_routers = iqa.get_routers()
routers = random.sample(all_routers, len(outcomes))
receivers = []
for idx, router in enumerate(routers):
receivers.append(self._receiver(router, self.address,
settle=outcomes[idx]))
_wait(receivers)
return receivers

def test_base_multicast(self, iqa: IQAInstance, router, outcomes):

def _wait_for_all_process_to_terminate(threads):
for t in threads:
t.join()

def _assert_all_receivers_messages(receivers, expected):
for r in receivers:
assert r.messages == expected

def _assert_sender_expected_settlement(sender, expected):
assert sender.settled == self.MESSAGES_COUNT
for e in [Expected.accepted, Expected.released,
Expected.rejected, Expected.modified]:

outcome_count = getattr(sender, e)
if e == expected:
assert outcome_count == self.MESSAGES_COUNT
else:
assert outcome_count == 0

router_send = router

receivers = self.launch_receivers(outcomes["recv_outcomes"], iqa)
sender = self._sender(router_send, self.address)

_wait_for_all_process_to_terminate(receivers + [sender])

logging.info("sender_id: {}".format(sender.sender_id))
assert sender.sent == self.MESSAGES_COUNT

logging.info("""sent: accepted: {}
rejected: {}
released: {}
modified: {}
settled {}
""".format(
sender.accepted,
sender.rejected,
sender.released,
sender.modified,
sender.settled))

_assert_all_receivers_messages(receivers,
expected = [sender.message_body] * self.MESSAGES_COUNT)
_assert_sender_expected_settlement(sender, outcomes["expected"])