Skip to content

Commit

Permalink
Merge pull request #677 from henrypinkard/notifications
Browse files Browse the repository at this point in the history
Added python API for notifications
  • Loading branch information
henrypinkard authored Aug 25, 2023
2 parents ef3b288 + d1f4bdd commit 5e7eb9f
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 9 deletions.
32 changes: 23 additions & 9 deletions pycromanager/acquisitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
The Pycro-manager Acquisiton system
"""
import warnings
import weakref

import numpy as np
import multiprocessing
Expand All @@ -17,6 +18,7 @@
import queue
from docstring_inheritance import NumpyDocstringInheritanceMeta
import traceback
from pycromanager.notifications import AcqNotification, AcquisitionFuture

class AcqAlreadyCompleteException(Exception):
def __init__(self, message):
Expand Down Expand Up @@ -216,7 +218,7 @@ def _storage_monitor_fn(acquisition, dataset, storage_monitor_push_port, connect

index_entry = message["index_entry"]
axes = dataset._add_index_entry(index_entry)
acquisition._notification_queue.put({'type': 'image_saved', 'axes': axes})
acquisition._notification_queue.put(AcqNotification.make_image_saved_notification(axes))
dataset._new_image_arrived = True
if callback is not None:
callback(axes, dataset)
Expand All @@ -232,7 +234,7 @@ def _notification_handler_fn(acquisition, notification_push_port, connected_even
try:
while True:
message = monitor_socket.receive()
acquisition._notification_queue.put(message)
acquisition._notification_queue.put(AcqNotification.from_json(message))
if "acq_finished" in message["type"]:
break

Expand Down Expand Up @@ -261,7 +263,7 @@ def __init__(
image_saved_fn: callable=None,
process: bool=False,
saving_queue_size: int=20,
timeout: int=1000,
timeout: int=2000,
port: int=DEFAULT_PORT,
debug: int=False,
core_log_debug: int=False,
Expand Down Expand Up @@ -338,6 +340,7 @@ def __init__(
self._nd_viewer = None
self._napari_viewer = None
self._notification_queue = queue.Queue(100)
self._acq_futures = []

# Get a dict of all named argument values (or default values when nothing provided)
arg_names = [k for k in signature(Acquisition.__init__).parameters.keys() if k != 'self']
Expand All @@ -359,7 +362,7 @@ def __init__(

try:
self._remote_notification_handler = JavaObject('org.micromanager.remote.RemoteNotificationHandler',
args=[self._remote_acq], port=self._port, new_socket=True)
args=[self._remote_acq], port=self._port, new_socket=False)
self._acq_notification_recieving_thread = self._start_receiving_notifications()
self._acq_notification_dispatcher_thread = self._start_notification_dispatcher()
except:
Expand Down Expand Up @@ -389,7 +392,7 @@ def __init__(
ndtiff_storage = data_sink.get_storage()
summary_metadata = ndtiff_storage.get_summary_metadata()
self._remote_storage_monitor = JavaObject('org.micromanager.remote.RemoteStorageMonitor', port=self._port,
new_socket=True)
new_socket=False)
ndtiff_storage.add_image_written_listener(self._remote_storage_monitor)
self._dataset = Dataset(dataset_path=self._dataset_disk_location, _summary_metadata=summary_metadata)
# Monitor image arrival so they can be loaded on python side, but with no callback function
Expand Down Expand Up @@ -488,7 +491,16 @@ def acquire(self, event_or_events: dict or list):
return

_validate_acq_events(event_or_events)

axes_or_axes_list = event_or_events['axes'] if type(event_or_events) == dict\
else [e['axes'] for e in event_or_events]
acq_future = AcquisitionFuture(self, axes_or_axes_list)
self._acq_futures.append(weakref.ref(acq_future))
# clear out old weakrefs
self._acq_futures = [f for f in self._acq_futures if f() is not None]

self._event_queue.put(event_or_events)
return acq_future

def abort(self, exception=None):
"""
Expand Down Expand Up @@ -578,10 +590,12 @@ def dispatch_notifications():
# if all the threads have shut down and the queue is empty, then shut down
break
else:
# TODO dispatch them to all listeners
# print(notification)
pass

# print(notification.to_json())
for future in self._acq_futures:
strong_ref = future()
if strong_ref is not None:
strong_ref._notify(notification)
# TODO: can also add a user-specified notification callback

dispatcher_thread = threading.Thread(
target=dispatch_notifications,
Expand Down
117 changes: 117 additions & 0 deletions pycromanager/notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import threading

class AcqNotification:

class Global:
ACQ_STARTED = "acq_started"
ACQ_FINISHED = "acq_finished"

class Hardware:
PRE_HARDWARE = "pre_hardware"
POST_HARDWARE = "post_hardware"

class Camera:
PRE_SEQUENCE_STARTED = "pre_sequence_started"
PRE_SNAP = "pre_snap"
POST_EXPOSURE = "post_exposure"

class Image:
IMAGE_SAVED = "image_saved"

def __init__(self, type, axes, phase=None):
if type is None:
# then figure it out based on the phase
if phase in [AcqNotification.Camera.PRE_SNAP, AcqNotification.Camera.POST_EXPOSURE,
AcqNotification.Camera.PRE_SEQUENCE_STARTED]:
type = AcqNotification.Camera
elif phase in [AcqNotification.Hardware.PRE_HARDWARE, AcqNotification.Hardware.POST_HARDWARE]:
type = AcqNotification.Hardware
elif phase == AcqNotification.Image.IMAGE_SAVED:
type = AcqNotification.Image
else:
raise ValueError("Unknown phase")
self.type = type
self.phase = phase
self.axes = axes

@staticmethod
def make_image_saved_notification(axes):
return AcqNotification(AcqNotification.Image, axes, AcqNotification.Image.IMAGE_SAVED)

def to_json(self):
return {
'type': self.type,
'phase': self.phase,
'axes': self.axes,
}

@staticmethod
def from_json(json):
return AcqNotification(json['type'], json['axes'] if 'axes' in json else None,
json['phase'] if 'phase' in json else None)


def _axes_to_key(axes_or_axes_list):
""" Turn axes into a hashable key """
return frozenset(axes_or_axes_list.items())


class AcquisitionFuture:

def __init__(self, acq, axes_or_axes_list):
"""
:param event_or_events: a single event (dictionary) or a list of events
"""
self._acq = acq
self._condition = threading.Condition()
self._notification_recieved = {}
if isinstance(axes_or_axes_list, dict):
axes_or_axes_list = [axes_or_axes_list]
for axes in axes_or_axes_list:
# single event
# TODO maybe unify snap and sequence cause this is confusing
self._notification_recieved[_axes_to_key(axes)] = {
AcqNotification.Hardware.PRE_HARDWARE: False,
AcqNotification.Hardware.POST_HARDWARE: False,

AcqNotification.Camera.PRE_SNAP: False,
AcqNotification.Camera.PRE_SEQUENCE_STARTED: False,
AcqNotification.Camera.POST_EXPOSURE: False,

AcqNotification.Image.IMAGE_SAVED: False,
}


def _notify(self, notification):
"""
Called by the internal notification dispatcher in order so that it can check off that the notification was
received. Want to store this, rather than just waiting around for it, in case the await methods are called
after the notification has already been sent.
"""
if notification.type == AcqNotification.Global.ACQ_FINISHED:
return # ignore for now...
key = _axes_to_key(notification.axes)
if key not in self._notification_recieved.keys():
return # ignore notifications that aren't relevant to this future
self._notification_recieved[key][notification.phase] = True
with self._condition:
self._condition.notify_all()

def await_execution(self, axes, phase):
key = _axes_to_key(axes)
if key not in self._notification_recieved.keys() or phase not in self._notification_recieved[key].keys():
notification = AcqNotification(None, axes, phase)
raise ValueError("this future is not expecting a notification for: " + str(notification.to_json()))
with self._condition:
while not self._notification_recieved[key][phase]:
self._condition.wait()

def await_image_saved(self, axes, return_image=False):
key = _axes_to_key(axes)
with self._condition:
while not self._notification_recieved[key][AcqNotification.Image.IMAGE_SAVED]:
self._condition.wait()
if return_image:
return self._acq.get_dataset().read_image(**axes)


0 comments on commit 5e7eb9f

Please sign in to comment.