Skip to content

Commit

Permalink
Rewrite async request tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ocaballeror committed Apr 5, 2018
1 parent 7377fa6 commit d18e221
Showing 1 changed file with 84 additions and 98 deletions.
182 changes: 84 additions & 98 deletions osbrain/tests/test_agent_async_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
"""
import time

import pytest

from osbrain import run_agent
from osbrain import Agent
from osbrain import run_logger
from osbrain.helper import sync_agent_logger
from osbrain.helper import logger_received
from osbrain.helper import wait_agent_attr

from common import nsproxy # noqa: F401
from common import append_received
Expand All @@ -22,180 +21,167 @@ def on_error(agent):
class Server(Agent):
def on_init(self):
self.received = []
self.blocked = False


class Client(Agent):
def on_init(self):
self.received = []


@pytest.fixture(scope='function')
def server_client_late_reply_return():
def late_reply(agent, request):
agent.received.append(request)
time.sleep(1)
return 'x' + request
def blocked_reply(agent, request):
agent.blocked = True
agent.received.append(request)
while agent.blocked:
time.sleep(.01)

return 'x' + request


def async_client_server(server_handler=blocked_reply):
server = run_agent('server', base=Server)
client = run_agent('client', base=Client)

addr = server.bind('ASYNC_REP', alias='replier', handler=late_reply)
addr = server.bind('ASYNC_REP', alias='replier', handler=server_handler)
client.connect(addr, alias='async', handler=append_received)

return (server, client)
return (client, server)


@pytest.fixture(scope='function')
def server_client_late_reply_delay():
def late_reply(agent, delay):
agent.received.append(delay)
time.sleep(delay)
return 'x' + str(delay)
def async_client_server_pair(server_handler=blocked_reply):
client, server1 = async_client_server(server_handler)

server = run_agent('server', base=Server)
client = run_agent('client', base=Client)
server2 = run_agent('server2', base=Server)
addr2 = server2.bind('ASYNC_REP', alias='replier2', handler=server_handler)
client.connect(addr2, alias='async2', handler=append_received)

addr = server.bind('ASYNC_REP', alias='replier', handler=late_reply)
client.connect(addr, alias='async', handler=append_received)

return (server, client)
return (client, server1, server2)


def test_return(nsproxy, server_client_late_reply_return):
def test_return(nsproxy):
"""
Asynchronous request-reply pattern with a reply handler that returns.
"""

server, client = server_client_late_reply_return

# Client requests should be non-blocking
t0 = time.time()
client, server1, server2 = async_client_server_pair()
client.send('async', 'foo')
client.send('async', 'bar')
assert time.time() - t0 < 0.1
client.send('async2', 'qux')

# Server should receive first request "soon"
time.sleep(0.1)
assert server.get_attr('received') == ['foo']
assert client.get_attr('received') == []
assert wait_agent_attr(server1, value=['foo'], timeout=.1)
assert wait_agent_attr(server2, value=['qux'], timeout=.1)
assert not wait_agent_attr(client, length=1, timeout=.1)

server2.unsafe.set_attr(blocked=False)
assert wait_agent_attr(client, value=['xqux'], timeout=.1)

# Wait for client to receive reply asynchronously
time.sleep(1)
assert server.get_attr('received') == ['foo', 'bar']
assert client.get_attr('received') == ['xfoo']
time.sleep(1)
assert server.get_attr('received') == ['foo', 'bar']
assert client.get_attr('received') == ['xfoo', 'xbar']
server1.unsafe.set_attr(blocked=False)
assert wait_agent_attr(client, value=['xqux', 'xfoo'], timeout=.1)
server1.unsafe.set_attr(blocked=False)
assert wait_agent_attr(client, value=['xqux', 'xfoo', 'xbar'], timeout=.1)


def test_yield(nsproxy):
"""
Asynchronous request-reply pattern with a reply handler that yields.
"""
def late_reply(agent, request):
def blocked_reply_yield(agent, request):
agent.received.append(request)
time.sleep(1)
agent.blocked = True
while agent.blocked:
time.sleep(.01)

yield 'x' + request
agent.received.append('y' + request)

server = run_agent('server', base=Server)
client = run_agent('client', base=Client)

addr = server.bind('ASYNC_REP', alias='replier', handler=late_reply)
client.connect(addr, alias='async', handler=append_received)

# Client requests should be non-blocking
t0 = time.time()
client, server1, server2 = async_client_server_pair(blocked_reply_yield)
client.send('async', 'foo')
client.send('async', 'bar')
assert time.time() - t0 < 0.1
client.send('async2', 'bar')

# Server should receive first request "soon"
time.sleep(0.1)
assert server.get_attr('received') == ['foo']
assert client.get_attr('received') == []
assert wait_agent_attr(server1, value=['foo'], timeout=.1)
assert wait_agent_attr(server2, value=['bar'], timeout=.1)
assert not wait_agent_attr(client, length=1, timeout=.1)

# Wait for client to receive reply asynchronously
time.sleep(1)
assert server.get_attr('received') == ['foo', 'yfoo', 'bar']
assert client.get_attr('received') == ['xfoo']
time.sleep(1)
assert server.get_attr('received') == ['foo', 'yfoo', 'bar', 'ybar']
assert client.get_attr('received') == ['xfoo', 'xbar']
server2.unsafe.set_attr(blocked=False)
assert wait_agent_attr(client, value=['xbar'], timeout=.1)
assert wait_agent_attr(server2, value=['bar', 'ybar'], timeout=.1)

server1.unsafe.set_attr(blocked=False)
assert wait_agent_attr(client, value=['xbar', 'xfoo'], timeout=.1)
assert wait_agent_attr(server1, value=['foo', 'yfoo'], timeout=.1)

def test_unknown(nsproxy, server_client_late_reply_return):

def test_unknown(nsproxy):
"""
When receiving a response for an unknown request (or an already processed
request), a warning should be logged and handler should not be executed.
"""
server, client = server_client_late_reply_return
client, server = async_client_server()

logger = run_logger('logger')
client.set_logger(logger)
sync_agent_logger(client, logger)

client.send('async', 'foo')
assert wait_agent_attr(server, value=['foo'], timeout=.1)

# Manually remove the pending request before it is received
client.set_attr(_pending_requests={})
time.sleep(1.1)
assert server.get_attr('received') == ['foo']
assert client.get_attr('received') == []
server.unsafe.set_attr(blocked=False)
assert logger_received(logger,
log_name='log_history_warning',
message='Received response for an unknown request!')
message='Received response for an unknown request!',
timeout=2)
assert len(client.get_attr('received')) == 0


def test_wait(nsproxy, server_client_late_reply_delay):
def test_wait_in_time(nsproxy):
"""
Asynchronous request-reply pattern maximum wait.
"""
server, client = server_client_late_reply_delay
client, server = async_client_server()

logger = run_logger('logger')
client.set_logger(logger)
sync_agent_logger(client, logger)

# Response received in time
client.send('async', 1, wait=2)
time.sleep(1.1)
assert server.get_attr('received') == [1]
assert client.get_attr('received') == ['x1']
client.send('async', 'foo', wait=1)
assert wait_agent_attr(server, value=['foo'])
server.unsafe.set_attr(blocked=False)
assert wait_agent_attr(client, value=['xfoo'])

# Response not received in time
client.send('async', 2, wait=1)

def test_wait_timeout(nsproxy):
"""
Asynchronous request-reply pattern when exceeding the maximum wait.
"""
client, server = async_client_server()
logger = run_logger('logger')
client.set_logger(logger)
sync_agent_logger(client, logger)

client.send('async', 'foo', wait=.5)
assert len(client.get_attr('_pending_requests')) == 1
assert logger_received(logger,
log_name='log_history_warning',
message='not receive req',
timeout=1.1)
timeout=.6)
assert len(client.get_attr('_pending_requests')) == 0

server.unsafe.set_attr(blocked=False)
assert logger_received(logger,
log_name='log_history_warning',
message='Received response for an unknown request!',
timeout=2)
assert len(client.get_attr('_pending_requests')) == 0
assert server.get_attr('received') == [1, 2]
assert client.get_attr('received') == ['x1']
message='Received response for an unknown request!')
assert server.get_attr('received') == ['foo']
assert client.get_attr('received') == []


def test_wait_on_error(nsproxy, server_client_late_reply_delay):
def test_wait_on_error(nsproxy):
"""
Asynchronous request-reply pattern maximum wait with an error handler.
"""
server, client = server_client_late_reply_delay
client, server = async_client_server()

client.set_attr(error_count=0)

# Response received in time
client.send('async', 1, wait=2)
time.sleep(1.1)
assert server.get_attr('received') == [1]
assert client.get_attr('received') == ['x1']

# Response not received in time
client.send('async', 2, wait=1, on_error=on_error)
time.sleep(1.1)
assert client.get_attr('error_count') == 1
assert client.get_attr('received') == ['x1']
client.send('async', 'foo', wait=.5, on_error=on_error)
assert wait_agent_attr(client, 'error_count', value=1, timeout=.6)
server.unsafe.set_attr(blocked=False)

0 comments on commit d18e221

Please sign in to comment.