Skip to content

Commit

Permalink
Allowed for manual flushing
Browse files Browse the repository at this point in the history
Added a new pv to change the flush mode, it can be
periodic, manual, or immediate.
Added a PVI button for it manual.
Adjusted tests to work for it.
  • Loading branch information
evalott100 committed Nov 13, 2023
1 parent 7e4ea96 commit f8b57de
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 31 deletions.
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
("py:class", "'id'"),
("py:class", "typing_extensions.Literal"),
("py:func", "int"),
("py:class", "asyncio.locks.Event"),
]

# Both the class’ and the __init__ method’s docstring are concatenated and
Expand Down
85 changes: 61 additions & 24 deletions src/pandablocks/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
import logging
from asyncio.streams import StreamReader, StreamWriter
from contextlib import suppress
from typing import AsyncGenerator, Dict, Iterable, Optional, Sequence
from enum import Enum
from typing import AsyncGenerator, Dict, Iterable, Optional

from .commands import Command, T
from .connections import ControlConnection, DataConnection
from .responses import Data

# Define the public API of this module
__all__ = ["AsyncioClient"]
__all__ = ["AsyncioClient", "FlushMode"]


class _StreamHelper:
Expand Down Expand Up @@ -49,6 +51,16 @@ async def close(self):
await writer.wait_closed()


class FlushMode(Enum):
"""
The mode which `AsyncioClient.data()` uses when flushing data frames.
"""

IMMEDIATE = 0 # Flush every frame
PERIODIC = 1 # Flush periodically
MANUAL = 2 # Flush only when the user presses the button to flush


class AsyncioClient:
"""Asyncio implementation of a PandABlocks client.
For example::
Expand All @@ -71,7 +83,7 @@ def __init__(self, host: str):

async def connect(self):
"""Connect to the control port, and be ready to handle commands"""
await self._ctrl_stream.connect(self._host, 8888),
await self._ctrl_stream.connect(self._host, 8888)

self._ctrl_task = asyncio.create_task(
self._ctrl_read_forever(self._ctrl_stream.reader)
Expand Down Expand Up @@ -131,15 +143,24 @@ async def data(
scaled: bool = True,
flush_period: Optional[float] = None,
frame_timeout: Optional[float] = None,
flush_event: Optional[asyncio.Event] = None,
flush_mode: FlushMode = FlushMode.IMMEDIATE,
) -> AsyncGenerator[Data, None]:
"""Connect to data port and yield data frames
Args:
scaled: Whether to scale and average data frames, reduces throughput
flush_period: How often to flush partial data frames, None is on every
chunk of data from the server
flush_period: How often to flush partial data frames when flush_mode
is ``PERIODIC``.
frame_timeout: If no data is received for this amount of time, raise
`asyncio.TimeoutError`
``asyncio.TimeoutError``.
flush_event: An `asyncio.Event` to manually flush. When set while
``flush_mode`` is ``MANUAL`` a flush will be performed and the event
will be unset.
flush_mode: One of the ``FlushMode`` values:
``IMMEDIATE`` flush every frame
``PERIODIC`` flush every ``flush_period`` seconds
``MANUAL`` flush only when ``flush_event`` is set
"""

stream = _StreamHelper()
Expand All @@ -150,29 +171,54 @@ def raise_timeouterror():
raise asyncio.TimeoutError(f"No data received for {frame_timeout}s")
yield

async def periodic_flush():
if flush_period is not None:
while True:
# Every flush_period seconds flush and queue data
await asyncio.sleep(flush_period)
queue.put_nowait(connection.flush())
async def flush_manual():
while True:
await flush_event.wait()
queue.put_nowait(connection.flush())
flush_event.clear()

async def flush_periodic():
while True:
await asyncio.sleep(flush_period)
queue.put_nowait(connection.flush())

async def flush_immediate():
pass

if flush_mode == FlushMode.MANUAL:
if not flush_event:
raise ValueError("flush_event cannot be None if flush_mode is 'Manual'")
flush_loop = flush_manual
elif flush_mode == FlushMode.PERIODIC:
if not flush_period:
raise ValueError("flush_period cannot be 0 if flush_mode is 'Periodic'")
flush_loop = flush_periodic
elif flush_mode == FlushMode.IMMEDIATE:
flush_loop = flush_immediate
else:
raise ValueError(
"flush_mode must be one of 'IMMEDIATE', 'PERIODIC', 'MANUAL'"
)

async def read_from_stream():
reader = stream.reader
# Should we flush every FrameData?
flush_every_frame = flush_period is None
while True:
try:
recv = await asyncio.wait_for(reader.read(4096), frame_timeout)
except asyncio.TimeoutError:
queue.put_nowait(raise_timeouterror())
break
else:
queue.put_nowait(connection.receive_bytes(recv, flush_every_frame))
queue.put_nowait(
connection.receive_bytes(
recv, flush_every_frame=(flush_mode == FlushMode.IMMEDIATE)
)
)

await stream.connect(self._host, 8889)
await stream.write_and_drain(connection.connect(scaled))
fut = asyncio.gather(periodic_flush(), read_from_stream())
fut = asyncio.gather(read_from_stream(), flush_loop())
try:
while True:
for data in await queue.get():
Expand All @@ -182,12 +228,3 @@ async def read_from_stream():
await stream.close()
with suppress(asyncio.CancelledError):
await fut

async def flush_now(self) -> Sequence[Data]:
stream = _StreamHelper()
connection = DataConnection()
await stream.connect(self._host, 8889)
await stream.write_and_drain(connection.connect(False))
frame_data = list(connection.flush())
await stream.close()
return frame_data
106 changes: 99 additions & 7 deletions tests/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

import pytest

from pandablocks.asyncio import AsyncioClient
from pandablocks.asyncio import AsyncioClient, FlushMode
from pandablocks.commands import CommandException, Get, Put
from pandablocks.responses import EndData, FrameData, ReadyData, StartData

from .conftest import DummyServer

Expand All @@ -30,34 +31,125 @@ async def test_asyncio_bad_put_raises(dummy_server_async):
assert dummy_server_async.received == ["PCAP.thing=1"]


@pytest.mark.asyncio
@pytest.mark.parametrize("disarmed", [True, False])
@pytest.mark.parametrize("flush_period", [0.1, None])
async def test_asyncio_data(
dummy_server_async, fast_dump, fast_dump_expected, disarmed, flush_period
async def test_asyncio_data_periodic_flushing(
dummy_server_async, fast_dump, fast_dump_expected, disarmed
):
if not disarmed:
# simulate getting the data without the END marker as if arm was not pressed
fast_dump = (x.split(b"END")[0] for x in fast_dump)
fast_dump_expected = list(fast_dump_expected)[:-1]

dummy_server_async.data = fast_dump
events = []
async with AsyncioClient("localhost") as client:
async for data in client.data(
frame_timeout=1, flush_period=0.1, flush_mode=FlushMode.PERIODIC
):
events.append(data)

if len(events) == len(fast_dump_expected):
break

assert fast_dump_expected == events


async def test_asyncio_data_manual_flushing(
dummy_server_async, fast_dump, fast_dump_expected
):
dummy_server_async.data = fast_dump

# Button push event
flush_event = asyncio.Event()

async def wait_and_press_button(data_generator):
await asyncio.sleep(0.2)
flush_event.set()
return await data_generator.__anext__()

async with AsyncioClient("localhost") as client:
data_generator = client.data(
frame_timeout=5,
flush_event=flush_event,
flush_mode=FlushMode.MANUAL,
)

assert isinstance(await data_generator.__anext__(), ReadyData)
assert isinstance(await data_generator.__anext__(), StartData)
assert isinstance(await wait_and_press_button(data_generator), FrameData)
assert not flush_event.is_set()
assert isinstance(await wait_and_press_button(data_generator), FrameData)
assert not flush_event.is_set()
assert isinstance(await wait_and_press_button(data_generator), FrameData)
assert not flush_event.is_set()
assert isinstance(await wait_and_press_button(data_generator), FrameData)
assert isinstance(await data_generator.__anext__(), EndData)
await data_generator.aclose()


async def test_asyncio_data_flush_every_frame(
dummy_server_async, fast_dump, fast_dump_expected
):
dummy_server_async.data = fast_dump

events = []
async with AsyncioClient("localhost") as client:
async for data in client.data(frame_timeout=1, flush_period=flush_period):
async for data in client.data(
frame_timeout=5,
flush_mode=FlushMode.IMMEDIATE,
):
events.append(data)

if len(events) == len(fast_dump_expected):
break

assert fast_dump_expected == events


async def test_asyncio_data_timeout_on_no_manual_press(dummy_server_async, fast_dump):
flush_event = asyncio.Event()
async with AsyncioClient("localhost") as client:
with pytest.raises(asyncio.TimeoutError, match="No data received for 3s"):
async for data in client.data(
frame_timeout=3, flush_mode=FlushMode.MANUAL, flush_event=flush_event
):
pass


async def test_asyncio_data_timeout(dummy_server_async, fast_dump):
dummy_server_async.data = fast_dump
async with AsyncioClient("localhost") as client:
with pytest.raises(asyncio.TimeoutError, match="No data received for 0.1s"):
async for data in client.data(frame_timeout=0.1):
async for data in client.data(
frame_timeout=0.1, flush_mode=FlushMode.IMMEDIATE
):
"This goes forever, when it runs out of data we will get our timeout"


async def test_asyncio_data_nonexistent_flushmode(dummy_server_async, fast_dump):
dummy_server_async.data = fast_dump
async with AsyncioClient("localhost") as client:
with pytest.raises(ValueError, match="flush_mode must be one of"):
async for data in client.data(frame_timeout=0.1, flush_mode=None):
pass


@pytest.mark.parametrize(
"kwargs",
[
{"flush_mode": FlushMode.PERIODIC, "flush_period": 0},
{"flush_mode": FlushMode.MANUAL, "flush_event": None},
],
)
async def test_asyncio_data_missing_kwargs(dummy_server_async, fast_dump, kwargs):
print(kwargs)
dummy_server_async.data = fast_dump
async with AsyncioClient("localhost") as client:
with pytest.raises(ValueError):
async for data in client.data(**kwargs):
pass


@pytest.mark.asyncio
async def test_asyncio_connects(dummy_server_async: DummyServer):
async with AsyncioClient("localhost") as client:
Expand Down

0 comments on commit f8b57de

Please sign in to comment.