From e7e6a2f0479316d36a9d395519a8bbdcfbafd872 Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Tue, 7 Nov 2023 10:21:20 +0000 Subject: [PATCH] Changed to use asyncio.Event ... for the manual flush flag. It is passed into the client.data() function to avoid blocking. --- src/pandablocks/asyncio.py | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/src/pandablocks/asyncio.py b/src/pandablocks/asyncio.py index b9872e675..e27733eb9 100644 --- a/src/pandablocks/asyncio.py +++ b/src/pandablocks/asyncio.py @@ -2,7 +2,7 @@ import logging from asyncio.streams import StreamReader, StreamWriter from contextlib import suppress -from typing import AsyncGenerator, Callable, Dict, Iterable, Optional +from typing import AsyncGenerator, Dict, Iterable, Optional from .commands import Command, T from .connections import ControlConnection, DataConnection @@ -63,8 +63,6 @@ class AsyncioClient: # Control and data ports are now disconnected """ - _flush_now = False - def __init__(self, host: str): self._host = host self._ctrl_connection = ControlConnection() @@ -134,6 +132,7 @@ async def data( scaled: bool = True, flush_period: Optional[float] = None, frame_timeout: Optional[float] = None, + flush_event: Optional[asyncio.Event] = None, ) -> AsyncGenerator[Data, None]: """Connect to data port and yield data frames @@ -149,28 +148,23 @@ async def data( stream = _StreamHelper() connection = DataConnection() queue: asyncio.Queue[Iterable[Data]] = asyncio.Queue() + event = flush_event or asyncio.Event() def raise_timeouterror(): raise asyncio.TimeoutError(f"No data received for {frame_timeout}s") yield - async def flush(): - queue.put_nowait(connection.flush()) - self._flush_now = False - - async def periodic_flush(): - if flush_period: - while True: - # Every flush_period seconds flush and queue data - await asyncio.sleep(flush_period) - await flush() + async def flush_loop(): + timeout = flush_period if flush_period > 0 else None + while True: + await asyncio.wait_for(event.wait(), timeout) + queue.put_nowait(connection.flush()) + event.clear() async def read_from_stream(): reader = stream.reader # Should we flush every FrameData? while True: - if self._flush_now: - await flush() try: recv = await asyncio.wait_for(reader.read(4096), frame_timeout) except asyncio.TimeoutError: @@ -185,7 +179,7 @@ async def read_from_stream(): await stream.connect(self._host, 8889) await stream.write_and_drain(connection.connect(scaled)) - fut = asyncio.gather(periodic_flush(), read_from_stream()) + fut = asyncio.gather(read_from_stream(), flush_loop()) try: while True: for data in await queue.get():