diff --git a/pycromanager/acquisitions.py b/pycromanager/acquisitions.py index a65941d8..39588560 100644 --- a/pycromanager/acquisitions.py +++ b/pycromanager/acquisitions.py @@ -2,6 +2,7 @@ The Pycro-manager Acquisiton system """ import warnings +import weakref import numpy as np import multiprocessing @@ -17,6 +18,7 @@ import queue from docstring_inheritance import NumpyDocstringInheritanceMeta import traceback +from pycromanager.notifications import AcqNotification, AcquisitionFuture class AcqAlreadyCompleteException(Exception): def __init__(self, message): @@ -216,7 +218,7 @@ def _storage_monitor_fn(acquisition, dataset, storage_monitor_push_port, connect index_entry = message["index_entry"] axes = dataset._add_index_entry(index_entry) - acquisition._notification_queue.put({'type': 'image_saved', 'axes': axes}) + acquisition._notification_queue.put(AcqNotification.make_image_saved_notification(axes)) dataset._new_image_arrived = True if callback is not None: callback(axes, dataset) @@ -232,7 +234,7 @@ def _notification_handler_fn(acquisition, notification_push_port, connected_even try: while True: message = monitor_socket.receive() - acquisition._notification_queue.put(message) + acquisition._notification_queue.put(AcqNotification.from_json(message)) if "acq_finished" in message["type"]: break @@ -261,7 +263,7 @@ def __init__( image_saved_fn: callable=None, process: bool=False, saving_queue_size: int=20, - timeout: int=1000, + timeout: int=2000, port: int=DEFAULT_PORT, debug: int=False, core_log_debug: int=False, @@ -338,6 +340,7 @@ def __init__( self._nd_viewer = None self._napari_viewer = None self._notification_queue = queue.Queue(100) + self._acq_futures = [] # Get a dict of all named argument values (or default values when nothing provided) arg_names = [k for k in signature(Acquisition.__init__).parameters.keys() if k != 'self'] @@ -359,7 +362,7 @@ def __init__( try: self._remote_notification_handler = JavaObject('org.micromanager.remote.RemoteNotificationHandler', - args=[self._remote_acq], port=self._port, new_socket=True) + args=[self._remote_acq], port=self._port, new_socket=False) self._acq_notification_recieving_thread = self._start_receiving_notifications() self._acq_notification_dispatcher_thread = self._start_notification_dispatcher() except: @@ -389,7 +392,7 @@ def __init__( ndtiff_storage = data_sink.get_storage() summary_metadata = ndtiff_storage.get_summary_metadata() self._remote_storage_monitor = JavaObject('org.micromanager.remote.RemoteStorageMonitor', port=self._port, - new_socket=True) + new_socket=False) ndtiff_storage.add_image_written_listener(self._remote_storage_monitor) self._dataset = Dataset(dataset_path=self._dataset_disk_location, _summary_metadata=summary_metadata) # Monitor image arrival so they can be loaded on python side, but with no callback function @@ -488,7 +491,16 @@ def acquire(self, event_or_events: dict or list): return _validate_acq_events(event_or_events) + + axes_or_axes_list = event_or_events['axes'] if type(event_or_events) == dict\ + else [e['axes'] for e in event_or_events] + acq_future = AcquisitionFuture(self, axes_or_axes_list) + self._acq_futures.append(weakref.ref(acq_future)) + # clear out old weakrefs + self._acq_futures = [f for f in self._acq_futures if f() is not None] + self._event_queue.put(event_or_events) + return acq_future def abort(self, exception=None): """ @@ -578,10 +590,12 @@ def dispatch_notifications(): # if all the threads have shut down and the queue is empty, then shut down break else: - # TODO dispatch them to all listeners - # print(notification) - pass - + # print(notification.to_json()) + for future in self._acq_futures: + strong_ref = future() + if strong_ref is not None: + strong_ref._notify(notification) + # TODO: can also add a user-specified notification callback dispatcher_thread = threading.Thread( target=dispatch_notifications, diff --git a/pycromanager/notifications.py b/pycromanager/notifications.py new file mode 100644 index 00000000..4f5841e5 --- /dev/null +++ b/pycromanager/notifications.py @@ -0,0 +1,117 @@ +import threading + +class AcqNotification: + + class Global: + ACQ_STARTED = "acq_started" + ACQ_FINISHED = "acq_finished" + + class Hardware: + PRE_HARDWARE = "pre_hardware" + POST_HARDWARE = "post_hardware" + + class Camera: + PRE_SEQUENCE_STARTED = "pre_sequence_started" + PRE_SNAP = "pre_snap" + POST_EXPOSURE = "post_exposure" + + class Image: + IMAGE_SAVED = "image_saved" + + def __init__(self, type, axes, phase=None): + if type is None: + # then figure it out based on the phase + if phase in [AcqNotification.Camera.PRE_SNAP, AcqNotification.Camera.POST_EXPOSURE, + AcqNotification.Camera.PRE_SEQUENCE_STARTED]: + type = AcqNotification.Camera + elif phase in [AcqNotification.Hardware.PRE_HARDWARE, AcqNotification.Hardware.POST_HARDWARE]: + type = AcqNotification.Hardware + elif phase == AcqNotification.Image.IMAGE_SAVED: + type = AcqNotification.Image + else: + raise ValueError("Unknown phase") + self.type = type + self.phase = phase + self.axes = axes + + @staticmethod + def make_image_saved_notification(axes): + return AcqNotification(AcqNotification.Image, axes, AcqNotification.Image.IMAGE_SAVED) + + def to_json(self): + return { + 'type': self.type, + 'phase': self.phase, + 'axes': self.axes, + } + + @staticmethod + def from_json(json): + return AcqNotification(json['type'], json['axes'] if 'axes' in json else None, + json['phase'] if 'phase' in json else None) + + +def _axes_to_key(axes_or_axes_list): + """ Turn axes into a hashable key """ + return frozenset(axes_or_axes_list.items()) + + +class AcquisitionFuture: + + def __init__(self, acq, axes_or_axes_list): + """ + :param event_or_events: a single event (dictionary) or a list of events + """ + self._acq = acq + self._condition = threading.Condition() + self._notification_recieved = {} + if isinstance(axes_or_axes_list, dict): + axes_or_axes_list = [axes_or_axes_list] + for axes in axes_or_axes_list: + # single event + # TODO maybe unify snap and sequence cause this is confusing + self._notification_recieved[_axes_to_key(axes)] = { + AcqNotification.Hardware.PRE_HARDWARE: False, + AcqNotification.Hardware.POST_HARDWARE: False, + + AcqNotification.Camera.PRE_SNAP: False, + AcqNotification.Camera.PRE_SEQUENCE_STARTED: False, + AcqNotification.Camera.POST_EXPOSURE: False, + + AcqNotification.Image.IMAGE_SAVED: False, + } + + + def _notify(self, notification): + """ + Called by the internal notification dispatcher in order so that it can check off that the notification was + received. Want to store this, rather than just waiting around for it, in case the await methods are called + after the notification has already been sent. + """ + if notification.type == AcqNotification.Global.ACQ_FINISHED: + return # ignore for now... + key = _axes_to_key(notification.axes) + if key not in self._notification_recieved.keys(): + return # ignore notifications that aren't relevant to this future + self._notification_recieved[key][notification.phase] = True + with self._condition: + self._condition.notify_all() + + def await_execution(self, axes, phase): + key = _axes_to_key(axes) + if key not in self._notification_recieved.keys() or phase not in self._notification_recieved[key].keys(): + notification = AcqNotification(None, axes, phase) + raise ValueError("this future is not expecting a notification for: " + str(notification.to_json())) + with self._condition: + while not self._notification_recieved[key][phase]: + self._condition.wait() + + def await_image_saved(self, axes, return_image=False): + key = _axes_to_key(axes) + with self._condition: + while not self._notification_recieved[key][AcqNotification.Image.IMAGE_SAVED]: + self._condition.wait() + if return_image: + return self._acq.get_dataset().read_image(**axes) + +