Skip to content

Commit

Permalink
Changed to use asyncio.Event
Browse files Browse the repository at this point in the history
... for the manual flush flag.
It is passed into the client.data() function
to avoid blocking.
  • Loading branch information
evalott100 committed Nov 7, 2023
1 parent b53af4f commit e7e6a2f
Showing 1 changed file with 10 additions and 16 deletions.
26 changes: 10 additions & 16 deletions src/pandablocks/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from asyncio.streams import StreamReader, StreamWriter
from contextlib import suppress
from typing import AsyncGenerator, Callable, Dict, Iterable, Optional
from typing import AsyncGenerator, Dict, Iterable, Optional

from .commands import Command, T
from .connections import ControlConnection, DataConnection
Expand Down Expand Up @@ -63,8 +63,6 @@ class AsyncioClient:
# Control and data ports are now disconnected
"""

_flush_now = False

def __init__(self, host: str):
self._host = host
self._ctrl_connection = ControlConnection()
Expand Down Expand Up @@ -134,6 +132,7 @@ async def data(
scaled: bool = True,
flush_period: Optional[float] = None,
frame_timeout: Optional[float] = None,
flush_event: Optional[asyncio.Event] = None,
) -> AsyncGenerator[Data, None]:
"""Connect to data port and yield data frames
Expand All @@ -149,28 +148,23 @@ async def data(
stream = _StreamHelper()
connection = DataConnection()
queue: asyncio.Queue[Iterable[Data]] = asyncio.Queue()
event = flush_event or asyncio.Event()

def raise_timeouterror():
raise asyncio.TimeoutError(f"No data received for {frame_timeout}s")
yield

async def flush():
queue.put_nowait(connection.flush())
self._flush_now = False

async def periodic_flush():
if flush_period:
while True:
# Every flush_period seconds flush and queue data
await asyncio.sleep(flush_period)
await flush()
async def flush_loop():
timeout = flush_period if flush_period > 0 else None
while True:
await asyncio.wait_for(event.wait(), timeout)
queue.put_nowait(connection.flush())
event.clear()

async def read_from_stream():
reader = stream.reader
# Should we flush every FrameData?
while True:
if self._flush_now:
await flush()
try:
recv = await asyncio.wait_for(reader.read(4096), frame_timeout)
except asyncio.TimeoutError:
Expand All @@ -185,7 +179,7 @@ async def read_from_stream():

await stream.connect(self._host, 8889)
await stream.write_and_drain(connection.connect(scaled))
fut = asyncio.gather(periodic_flush(), read_from_stream())
fut = asyncio.gather(read_from_stream(), flush_loop())
try:
while True:
for data in await queue.get():
Expand Down

0 comments on commit e7e6a2f

Please sign in to comment.