Skip to content

Commit

Permalink
Improve naming, update TODOs & FIXMEs
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanjli committed Mar 19, 2024
1 parent e00f722 commit 371c0ea
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 132 deletions.
26 changes: 8 additions & 18 deletions control/adafruithat/planktoscope/camera/hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,14 @@ def __init__(
preview_bitrate: the bitrate (in bits/sec) of the preview stream; defaults to a bitrate
for a high-quality stream.
"""
self.controls: typing.Optional[Controls] = None
# Settings
self._preview_size: tuple[int, int] = preview_size
self._preview_bitrate: typing.Optional[int] = preview_bitrate

# I/O
self._preview_output = preview_output
# Note(ethanjli): `__init__()` may be called from a different process than the one which
# will call the `start()` method, so we must initialize `self._camera` to None here, and
# we'll properly initialize it in the `start()` method:
self._camera: typing.Optional[picamera2.Picamera2] = None

# TODO(ethanjli): Delete these if we actually don't need them:
# self._main_format: typing.Optional[str] = None
# self._main_size: typing.Optional[tuple[int, int]] = None
self._preview_size: tuple[int, int] = preview_size
self._preview_bitrate: typing.Optional[int] = preview_bitrate
self.controls: typing.Optional[Controls] = None

def open(self) -> None:
"""Start the camera in the background, including output to the preview stream."""
Expand All @@ -65,10 +60,6 @@ def open(self) -> None:
buffer_count=3,
)
loguru.logger.debug(f"Camera configuration: {config}")
# TODO(ethanjli): Delete these if we actually don't need them:
# self._main_format = config["main"]["format"]
# self._main_size = config["main"]["size"]
# self._preview_size = config["preview"]["size"]
self._camera.configure(config)

self.controls = Controls(self._camera, config["controls"]["FrameDurationLimits"])
Expand Down Expand Up @@ -135,6 +126,7 @@ def close(self) -> None:
]


# FIXME(ethanjli): simplify this class!
class Controls:
"""A wrapper to simplify setting and querying of camera controls & properties."""

Expand Down Expand Up @@ -309,12 +301,12 @@ def image_gain(self, gain: float) -> None:
self._image_gain = gain
self._camera.controls.AnalogueGain = self._image_gain # DigitalGain

# TODO(ethanjli): Delete this if we actually don't need it:
# FIXME(ethanjli): Delete this if we actually don't need it:
# @property
# def image_quality(self):
# return self.__image_quality

# TODO(ethanjli): Delete this if we actually don't need it:
# FIXME(ethanjli): Delete this if we actually don't need it:
# @image_quality.setter
# def image_quality(self, image_quality):
# """Change the output image quality
Expand All @@ -332,8 +324,6 @@ def image_gain(self, gain: float) -> None:
# )
# raise ValueError

# TODO complete (if needed) the setters and getters of resolution & iso


class PreviewStream(io.BufferedIOBase):
"""A thread-safe stream of discrete byte buffers for use in live previews.
Expand Down
3 changes: 3 additions & 0 deletions control/adafruithat/planktoscope/camera/mjpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def do_GET(self):
return

if self.path == "/stream.mjpg":
# TODO(ethanjli): allow specifying a max framerate via HTTP GET query param? Currently
# we have no way to reduce bandwidth usage below the maximum supported by the network
# connection to the client.
self._send_mjpeg_header()
try:
while True:
Expand Down
30 changes: 23 additions & 7 deletions control/adafruithat/planktoscope/camera/mqtt.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""mqtt provides an MJPEG+MQTT API for camera interaction."""

import json
import os
import threading
import time
import typing
Expand All @@ -13,6 +14,7 @@
loguru.logger.info("planktoscope.imager is loaded")


# FIXME(ethanjli): simplify this class
class Worker:
"""Runs a camera with live MJPEG preview and an MQTT API for adjusting camera settings.
Expand All @@ -22,7 +24,6 @@ class Worker:

def __init__(
self,
hardware_config: dict[str, typing.Any],
# FIXME(ethanjli): handle exposure time and ISO in hardware config instead of keyword args!
exposure_time: int = 125,
# exposure_time: int = 15000,
Expand All @@ -31,14 +32,25 @@ def __init__(
"""Initialize the backend.
Args:
hardware_config: a dict of camera control settings.
mqtt_client: an MQTT client.
exposure_time: the default value for initializing the camera's exposure time.
"""
# Settings
# FIXME(ethanjli): decompose config-loading to a separate module. That module should also be
# where the file schema is defined!
if os.path.exists("/home/pi/PlanktoScope/hardware.json"):
# load hardware.json
with open("/home/pi/PlanktoScope/hardware.json", "r", encoding="utf-8") as config_file:
hardware_config = json.load(config_file)
loguru.logger.debug(
f"Loaded hardware configuration loaded: {hardware_config}",
)
else:
loguru.logger.info("The hardware configuration file doesn't exist, using defaults!")
hardware_config = {}
# self.__camera_type = hardware_config.get("camera_type", "v2.1")
# self.__resolution = None # this is set by the start method
# FIXME: consolidate all camera settings into a dataclass or namedtuple!
# FIXME(ethanjli): consolidate all camera settings into a dataclass or namedtuple!
# self.__iso = iso
self._exposure_time = exposure_time # FIXME(ethanjli): load from hardware config?
self.__exposure_mode: hardware.ExposureModes = (
Expand Down Expand Up @@ -93,13 +105,13 @@ def open(self) -> None:
loguru.logger.info("Starting the MJPEG streaming server...")
address = ("", 8000) # FIXME(ethanjli): parameterize this
self._streaming_server = mjpeg.StreamingServer(self._preview_stream, address)
# FIXME(ethanjli): make this not be a daemon thread, by recovering resourcse
# appropriately at shutdown!
self._streaming_thread = threading.Thread(target=self._streaming_server.serve_forever)
self._streaming_thread.start()

loguru.logger.info("Starting the MQTT backend...")
# FIXME(ethanjli): expose the camera settings over "camera/settings" instead!
# TODO(ethanjli): expose the camera settings over "camera/settings" instead! This requires
# removing the "settings" action from the "imager/image" route which is a breaking change
# to the MQTT API, so we'll do this later.
self._mqtt = mqtt.MQTT_Client(topic="imager/image", name="imager_camera_client")
self._mqtt_receiver_close.clear()
self._mqtt_receiver_thread = threading.Thread(target=self._receive_messages)
Expand All @@ -122,6 +134,8 @@ def _receive_messages(self) -> None:

def _receive_message(self, message: dict[str, typing.Any]) -> None:
"""Handle a single MQTT message."""
assert self._mqtt is not None

if message["topic"] != "imager/image" or message["payload"].get("action", "") != "settings":
return
if "settings" not in message["payload"]:
Expand Down Expand Up @@ -227,7 +241,9 @@ def __message_settings_image_gain(self, settings):
self._mqtt.client.publish("status/imager", '{"status":"Error: Image gain not valid"}')
raise e

# TODO(ethanjli): allow an MQTT client to trigger this method with an MQTT command
# TODO(ethanjli): allow an MQTT client to trigger this method with an MQTT command. This
# requires modifying the MQTT API (by adding a new route), and we'll want to make the Node-RED
# dashboard query that route at startup, so we'll do this later.
def _announce_camera_name(self) -> None:
"""Announce the camera's sensor name as a status update."""
assert self._mqtt is not None
Expand Down
66 changes: 30 additions & 36 deletions control/adafruithat/planktoscope/imagernew/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
class Worker(multiprocessing.Process):
"""An MQTT+MJPEG API for the PlanktoScope's camera and image acquisition modules."""

# FIXME(ethanjli): instead of passing in a stop_event, just expose a `close()` method! This
# TODO(ethanjli): instead of passing in a stop_event, just expose a `close()` method! This
# way, we don't give any process the ability to stop all other processes watching the same
# stop_event!
def __init__(self, stop_event):
Expand All @@ -36,29 +36,15 @@ def __init__(self, stop_event):

loguru.logger.info("planktoscope.imager is initializing")

# FIXME(ethanjli): move this to self._camera if only self._camera needs these settings;
# ideally decompose config-loading to a separate module. That module should also be where
# the file schema is defined!
if os.path.exists("/home/pi/PlanktoScope/hardware.json"):
# load hardware.json
with open("/home/pi/PlanktoScope/hardware.json", "r", encoding="utf-8") as config_file:
self._hardware_config = json.load(config_file)
loguru.logger.debug(
f"Loaded hardware configuration loaded: {self._hardware_config}",
)
else:
loguru.logger.info("The hardware configuration file doesn't exist, using defaults!")
self._hardware_config = {}

# Internal state
self._stop_event_loop = stop_event
self._metadata: dict[str, typing.Any] = {}
self._routine: typing.Optional[ImageAcquisitionRoutine] = None
self._active_routine: typing.Optional[ImageAcquisitionRoutine] = None

# I/O
self._mqtt: typing.Optional[mqtt.MQTT_Client] = None
self._pump: typing.Optional[_MQTTPump] = None
# FIXME(ethanjli): instead of having the ImagerWorker start the camera worker, this should
self._pump: typing.Optional[_PumpClient] = None
# TODO(ethanjli): instead of having the ImagerWorker start the camera worker, this should
# be started from the main script; and then the camera object should be passed into the
# constructor.
self._camera: typing.Optional[camera.Worker] = None
Expand All @@ -73,23 +59,23 @@ def run(self) -> None:
self._mqtt.client.publish("status/imager", '{"status":"Starting up"}')

loguru.logger.info("Starting the pump RPC client...")
self._pump = _MQTTPump()
self._pump = _PumpClient()
self._pump.open()
loguru.logger.success("Pump RPC client is ready!")

loguru.logger.info("Starting the camera...")
self._camera = camera.Worker(self._hardware_config)
self._camera = camera.Worker()
self._camera.open()
loguru.logger.success("Camera is ready!")
self._mqtt.client.publish("status/imager", '{"status":"Ready"}')

try:
while not self._stop_event_loop.is_set():
if self._routine is not None and not self._routine.is_alive():
if self._active_routine is not None and not self._active_routine.is_alive():
# Garbage-collect any finished image-acquisition routine threads so that we're
# ready for the next configuration update command which arrives:
self._routine.stop()
self._routine = None
self._active_routine.stop()
self._active_routine = None

if not self._mqtt.new_message_received():
time.sleep(0.1)
Expand Down Expand Up @@ -127,18 +113,20 @@ def _handle_new_message(self) -> None:
self._update_metadata(latest_message)
elif action == "image":
self._start_acquisition(latest_message)
elif action == "stop" and self._routine is not None:
self._routine.stop()
self._routine = None
elif action == "stop" and self._active_routine is not None:
self._active_routine.stop()
self._active_routine = None
self._mqtt.read_message()

def _update_metadata(self, latest_message: dict[str, typing.Any]) -> None:
"""Handle a new imager command to update the configuration (i.e. the metadata)."""
assert self._mqtt is not None

# FIXME(ethanjli): it'll be simpler if we just take the configuration as part of the command
# to start image acquisition!
if self._routine is not None and self._routine.is_alive():
# TODO(ethanjli): it'll be simpler if we just take the configuration as part of the command
# to start image acquisition! This requires modifying the MQTT API (to remove the
# "update_config" action and require the client to pass the metadata with the "image"
# action), so we'll do it later.
if self._active_routine is not None and self._active_routine.is_alive():
loguru.logger.error("Can't update configuration during image acquisition!")
self._mqtt.client.publish("status/imager", '{"status":"Busy"}')
return
Expand All @@ -153,7 +141,6 @@ def _update_metadata(self, latest_message: dict[str, typing.Any]) -> None:
self._mqtt.client.publish("status/imager", '{"status":"Config updated"}')
loguru.logger.info("Updated configuration!")

# FIXME(ethanjli): reorder the methods!
def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None:
"""Handle a new imager command to start image acquisition."""
assert self._mqtt is not None
Expand All @@ -170,6 +157,7 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None:
{
**self._metadata,
"acq_local_datetime": datetime.datetime.now().isoformat().split(".")[0],
# FIXME(ethanjli): query the exposure time using a publicly-exposed property
"acq_camera_shutter_speed": self._camera._exposure_time,
"acq_uuid": identity.load_machine_name(),
"sample_uuid": identity.load_machine_name(),
Expand All @@ -184,11 +172,11 @@ def _start_acquisition(self, latest_message: dict[str, typing.Any]) -> None:
# An error status was already reported, so we don't need to do anything else
return

self._routine = ImageAcquisitionRoutine(
stopflow.Routine(self._pump, self._camera.camera, output_path, settings),
self._active_routine = ImageAcquisitionRoutine(
stopflow.Routine(output_path, settings, self._pump, self._camera.camera),
self._mqtt,
)
self._routine.start()
self._active_routine.start()


def _parse_acquisition_settings(
Expand Down Expand Up @@ -284,9 +272,15 @@ class ImageAcquisitionRoutine(threading.Thread):
"""A thread to run a single image acquisition routine to completion, with MQTT updates."""

# TODO(ethanjli): instead of taking an arg of type mqtt.MQTT_CLIENT, just take an arg of
# whatever `mqtt_client.client`'s type is supposed to be
# whatever `mqtt_client.client`'s type is supposed to be. Or maybe we should just initialize
# our own MQTT client in here?
def __init__(self, routine: stopflow.Routine, mqtt_client: mqtt.MQTT_Client) -> None:
"""Initialize the thread."""
"""Initialize the thread.
Args:
routine: the image-acquisition routine to run.
mqtt_client: an MQTT client which will be used to broadcast updates.
"""
super().__init__()
self._routine = routine
self._mqtt_client = mqtt_client.client
Expand Down Expand Up @@ -337,7 +331,7 @@ def stop(self) -> None:
# TODO(ethanjli): rearchitect the hardware controller so that the imager can directly call pump
# methods (by running all modules in the same process), so that we can just delete this entire class
# and simplify function calls between the imager and the pump!
class _MQTTPump:
class _PumpClient:
"""Thread-safe RPC stub for remotely controlling the pump over MQTT."""

def __init__(self) -> None:
Expand Down
9 changes: 4 additions & 5 deletions control/adafruithat/planktoscope/imagernew/stopflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ class Settings(typing.NamedTuple):
pump: DiscretePumpSettings


# FIXME(ethanjli): write unit tests for this class!
class Routine:
"""A thread-safe stop-flow image acquisition routine.
Expand All @@ -74,19 +73,19 @@ class Routine:

def __init__(
self,
pump: PumpRunner,
camera: FileCapturer,
output_path: str,
settings: Settings,
pump: PumpRunner,
camera: FileCapturer,
) -> None:
"""Initialize the image-acquisition routine.
Args:
pump: the sample pump.
camera: the camera.
output_path: the directory to save acquired images and metadata files. This is assumed
not to exist.
settings: stop-flow routine settings.
pump: the sample pump.
camera: the camera.
"""
# Parameters
self.output_path: typing.Final[str] = output_path
Expand Down
Loading

0 comments on commit 371c0ea

Please sign in to comment.