diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 62533cbe..5f2987ab 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -66,3 +66,4 @@ of those changes to CLEARTYPE SRL. | [@5tefan](https://github.com/5tefan/) | Stefan Codrescu | | [@kuba-lilz](https://github.com/kuba-lilz/) | Jakub Kolodziejczyk | | [@dbowring](https://github.com/dbowring/) | Daniel Bowring | +| [@07pepa](https://github.com/07pepa) | Pepa | \ No newline at end of file diff --git a/dramatiq/results/backend.py b/dramatiq/results/backend.py index 35fcbc50..9b318090 100644 --- a/dramatiq/results/backend.py +++ b/dramatiq/results/backend.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . - +import asyncio import hashlib import time import typing @@ -110,6 +110,53 @@ def get_result(self, message, *, block: bool = False, timeout: typing.Optional[i else: return self.unwrap_result(result) + async def get_result_async(self, message, *, timeout: typing.Optional[int] = None) -> Result: + """Get a result from the backend asynchronously. + This code is non-blocking and will return the result when it is available. + + Parameters: + message(Message) + timeout(int): The maximum amount of time, in ms, to wait for + a result when block is True. Defaults to 10 seconds. + + Raises: + ResultTimeout: When waiting for a result times out. + + Returns: + object: The result. + """ + message_key = self.build_message_key(message) + + result = self._get(message_key) + if result is not Missing: + return self.unwrap_result(result) + + attempts = 1 + timeout_triggered = asyncio.Event() + + async def timeout_setter(sleep_time): + if sleep_time is None: + sleep_time = DEFAULT_TIMEOUT + await asyncio.sleep(sleep_time / 1000) + timeout_triggered.set() + + to_setter = asyncio.create_task(timeout_setter(timeout)) + while not timeout_triggered.is_set(): + try: + attempts, delay = compute_backoff(attempts, factor=BACKOFF_FACTOR) + delay /= 1000 + await asyncio.wait_for(timeout_triggered.wait(), delay) + except asyncio.TimeoutError: + result = self._get(message_key) + if result is not Missing: + return self.unwrap_result(result) + continue + finally: + if not to_setter.done(): + to_setter.cancel() + + raise ResultTimeout(message) + def store_result(self, message, result: Result, ttl: int) -> None: """Store a result in the backend. diff --git a/tests/test_results.py b/tests/test_results.py index 7dc6d9a4..9ce984a0 100644 --- a/tests/test_results.py +++ b/tests/test_results.py @@ -1,3 +1,4 @@ +import asyncio import time from unittest.mock import patch @@ -374,3 +375,28 @@ def do_work(): # Then the result should still be None. assert result_backend.get_result(sent_message) is None + + +async def test_async_get_result_works(result_backend): + @dramatiq.actor(store_results=True) + async def do_work(): + await asyncio.sleep(0.1) + return 42 + + sent_message = do_work.send() + + # Await a result + assert (await result_backend.get_result_async(sent_message)) == 42 + + +async def test_async_get_result_timeouts(result_backend): + @dramatiq.actor(store_results=True) + async def do_work(): + await asyncio.sleep(0.2) + return 42 + + sent_message = do_work.send() + + # Await a result + with pytest.raises(ResultTimeout): + await result_backend.get_result_async(sent_message, timeout=100)