Skip to content
This repository has been archived by the owner on Sep 8, 2024. It is now read-only.

Commit

Permalink
Use event to capture messages in then_wait()
Browse files Browse the repository at this point in the history
Instead of busily polling the bus for new messages an event handler is
registered (and teared down after check is complete)
  • Loading branch information
forslund committed Jul 16, 2021
1 parent 31e1e2d commit 15c5e20
Showing 1 changed file with 127 additions and 16 deletions.
143 changes: 127 additions & 16 deletions test/integrationtests/voight_kampff/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

"""Common tools to use when creating step files for behave tests."""

from threading import Event
import time

from mycroft.messagebus import Message
Expand All @@ -23,8 +24,129 @@
TIMEOUT = 10


class CriteriaWaiter:
"""Wait for a message to meet a certain criteria.
Args:
msg_type: message type to watch
criteria_func: Function to determine if a message fulfilling the
test case has been found.
context: behave context
"""
def __init__(self, msg_type, criteria_func, context):
self.msg_type = msg_type
self.criteria_func = criteria_func
self.context = context
self.result = Event()

def reset(self):
"""Reset the wait state."""
self.result.clear()

# TODO: Remove in 21.08
def wait_unspecific(self, timeout):
"""
Wait for a specified time for criteria to be fulfilled by any message.
This use case is deprecated and only for backward compatibility
Args:
timeout: Time allowance for a message fulfilling the criteria, if
provided will override the normal normal step timeout.
Returns:
tuple (bool, str) test status and debug output
"""
timeout = timeout or self.context.step_timeout
start_time = time.monotonic()
debug = ''
while time.monotonic() < start_time + timeout:
for message in self.context.bus.get_messages(None):
status, test_dbg = self.criteria_func(message)
debug += test_dbg
if status:
self.context.matched_message = message
self.context.bus.remove_message(message)
return True, debug
self.context.bus.new_message_available.wait(0.5)
# Timed out return debug from test
return False, debug

def _check_historical_messages(self):
"""Search through the already received messages for a match.
Returns:
tuple (bool, str) test status and debug output
"""
debug = ''
for message in self.context.bus.get_messages(self.msg_type):
status, test_dbg = self.criteria_func(message)
debug += test_dbg
if status:
self.context.matched_message = message
self.context.bus.remove_message(message)
self.result.set()
break
return debug

def wait_specific(self, timeout=None):
"""Wait for a specific message type to fullfil a criteria.
Uses an event-handler to not repeatedly loop.
Args:
timeout: Time allowance for a message fulfilling the criteria, if
provided will override the normal normal step timeout.
Returns:
tuple (bool, str) test status and debug output
"""
timeout = timeout or self.context.step_timeout

debug = ''

def on_message(message):
nonlocal debug
status, test_dbg = self.criteria_func(message)
debug += test_dbg
if status:
self.context.matched_message = message
self.result.set()

self.context.bus.on(self.msg_type, on_message)
# Check historical messages
historical_debug = self._check_historical_messages()

# If no matching message was already caught, wait for it
if not self.result.is_set():
self.result.wait(timeout=timeout)
self.context.bus.remove(self.msg_type, on_message)
return self.result.is_set(), historical_debug + debug

def wait(self, timeout=None):
"""Wait for a specific message type to fullfil a criteria.
Uses an event-handler to not repeatedly loop.
Args:
timeout: Time allowance for a message fulfilling the criteria, if
provided will override the normal normal step timeout.
Returns:
(result (bool), debug (str)) Result containing status and debug
message.
"""
if self.msg_type is None:
return self.wait_unspecific(timeout)
else:
return self.wait_specific(timeout)


def then_wait(msg_type, criteria_func, context, timeout=None):
"""Wait for a specified time for criteria to be fulfilled.
"""Wait for a specific message type to fullfil a criteria.
Uses an event-handler to not repeatedly loop.
Args:
msg_type: message type to watch
Expand All @@ -35,22 +157,11 @@ def then_wait(msg_type, criteria_func, context, timeout=None):
provided will override the normal normal step timeout.
Returns:
tuple (bool, str) test status and debug output
(result (bool), debug (str)) Result containing status and debug
message.
"""
timeout = timeout or context.step_timeout
start_time = time.monotonic()
debug = ''
while time.monotonic() < start_time + timeout:
for message in context.bus.get_messages(msg_type):
status, test_dbg = criteria_func(message)
debug += test_dbg
if status:
context.matched_message = message
context.bus.remove_message(message)
return True, debug
context.bus.new_message_available.wait(0.5)
# Timed out return debug from test
return False, debug
waiter = CriteriaWaiter(msg_type, criteria_func, context)
return waiter.wait(timeout)


def then_wait_fail(msg_type, criteria_func, context, timeout=None):
Expand Down

0 comments on commit 15c5e20

Please sign in to comment.