diff --git a/java/pom.xml b/java/pom.xml index b4c7be07..0ed8ce65 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.micro-manager.pycro-manager PycroManagerJava - 0.41.7 + 0.43.0 jar Pycro-Manager Java The Java components of Pycro-Manager @@ -54,7 +54,7 @@ org.micro-manager.acqengj AcqEngJ - 0.30.0 + 0.32.2 org.micro-manager.ndviewer diff --git a/java/src/main/java/org/micromanager/remote/RemoteEventSource.java b/java/src/main/java/org/micromanager/remote/RemoteEventSource.java index 82d0ce81..461ce681 100644 --- a/java/src/main/java/org/micromanager/remote/RemoteEventSource.java +++ b/java/src/main/java/org/micromanager/remote/RemoteEventSource.java @@ -34,6 +34,8 @@ public class RemoteEventSource { }); public RemoteEventSource() { + //constantly poll the socket for more event sequences to submit + executor_.submit(() -> { pullSocket_ = new ZMQPullSocket<>( t -> { try { @@ -48,10 +50,7 @@ public RemoteEventSource() { throw new RuntimeException("Incorrect format for acquisitio event"); } }); - //constantly poll the socket for more event sequences to submit - executor_.submit(() -> { try { - System.out.println("pull socket started"); while (true) { List eList = pullSocket_.next(); boolean finished = eList.get(eList.size() - 1).isAcquisitionFinishedEvent(); @@ -59,7 +58,7 @@ public RemoteEventSource() { result.get(); //propogate any exceptions if (finished || executor_.isShutdown()) { executor_.shutdown(); - return; + break; } } } catch (InterruptedException e) { @@ -81,6 +80,14 @@ void setAcquisition(Acquisition aThis) { } public int getPort() { + while (pullSocket_ == null) { + // wait for it to be created ona different thread + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } return pullSocket_.getPort(); } @@ -110,7 +117,6 @@ void abort() { } }; - pullSocket_.close(); } } diff --git a/java/src/main/java/org/micromanager/remote/RemoteImageProcessor.java b/java/src/main/java/org/micromanager/remote/RemoteImageProcessor.java index 3ad7ceca..141d3078 100644 --- a/java/src/main/java/org/micromanager/remote/RemoteImageProcessor.java +++ b/java/src/main/java/org/micromanager/remote/RemoteImageProcessor.java @@ -20,8 +20,6 @@ import org.micromanager.internal.zmq.ZMQPushSocket; import org.micromanager.internal.zmq.ZMQUtil; -// TODO: this class now duplicates functionality of AsyncImageProcessor in AcqEngJ - /** * Implements an ImageProcessor that sends/recieves images from a remote source * using ZMQ push/pull sockets. This enables image processing in Python/NumPy diff --git a/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java b/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java new file mode 100644 index 00000000..f2caddbc --- /dev/null +++ b/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java @@ -0,0 +1,103 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.micromanager.remote; + +import mmcorej.org.json.JSONException; +import org.micromanager.acqj.api.AcqNotificationListener; +import org.micromanager.acqj.api.AcquisitionAPI; +import org.micromanager.acqj.main.AcqNotification; +import org.micromanager.acqj.main.Acquisition; +import org.micromanager.internal.zmq.ZMQPushSocket; +import org.micromanager.ndtiffstorage.IndexEntryData; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * A class that broadcasts information about images that have finsihed saving to disk + * @author henrypinkard + */ +public class RemoteNotificationHandler implements AcqNotificationListener { + + private ZMQPushSocket pushSocket_; + private ExecutorService executor_ = Executors.newSingleThreadExecutor((Runnable r) -> { + return new Thread(r, "Remote notification thread"); + }); + private LinkedBlockingDeque notifications_ = new LinkedBlockingDeque(); + + /** + * Called by python side + */ + public RemoteNotificationHandler(AcquisitionAPI acq) { + acq.addAcqNotificationListener(this); + executor_.submit(new Runnable() { + @Override + public void run() { + pushSocket_ = new ZMQPushSocket( + t -> { + try { + return t.toJSON(); + } catch (JSONException e) { + throw new RuntimeException("Problem with notification socket"); + } + }); + } + }); + } + + /** + * Start pushing out the indices to the other side + */ + public void start() { + //constantly poll the socket for more event sequences to submit + executor_.submit(() -> { + while (true) { + AcqNotification e = null; + try { + e = notifications_.takeFirst(); + } catch (InterruptedException ex) { + // this should never happen + ex.printStackTrace(); + throw new RuntimeException(ex); + } + + pushSocket_.push(e); + if (e.isAcquisitionFinishedNotification()) { + return; + } + } + }); + } + + @Override + public void postNotification(AcqNotification n) { + notifications_.add(n); + } + + /** + * Called by the python side to signal that the final shutdown signal has been received + * and that the push socket can be closed + */ + public void notificationHandlingComplete() { + executor_.submit(() -> { + pushSocket_.close(); + executor_.shutdown(); + }); + } + + public int getPort() { + while (pushSocket_ == null) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return pushSocket_.getPort(); + } + +} diff --git a/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java b/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java index d4c69c74..e6890299 100644 --- a/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java +++ b/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java @@ -23,25 +23,30 @@ public class RemoteStorageMonitor implements ImageWrittenListener { private ZMQPushSocket pushSocket_; private ExecutorService executor_ = Executors.newSingleThreadExecutor((Runnable r) -> { - return new Thread(r, "Remote Event Source thread"); + return new Thread(r, "Remote storage monitor thread"); }); private LinkedBlockingDeque indexEntries_ = new LinkedBlockingDeque(); public RemoteStorageMonitor() { - pushSocket_ = new ZMQPushSocket( - t -> { - try { - JSONObject message = new JSONObject(); - if (t.isDataSetFinishedEntry()) { - message.put("finished", true); - } else { - message.put("index_entry", ((ByteBuffer) t.asByteBuffer()).array()); - } - return message; - } catch (JSONException e) { - throw new RuntimeException("Problem with data saved socket"); - } - }); + executor_.submit(new Runnable() { + @Override + public void run() { + pushSocket_ = new ZMQPushSocket( + t -> { + try { + JSONObject message = new JSONObject(); + if (t.isDataSetFinishedEntry()) { + message.put("finished", true); + } else { + message.put("index_entry", ((ByteBuffer) t.asByteBuffer()).array()); + } + return message; + } catch (JSONException e) { + throw new RuntimeException("Problem with data saved socket"); + } + }); + } + }); } /** @@ -73,6 +78,13 @@ public void start() { public int getPort() { + while (pushSocket_ == null) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } return pushSocket_.getPort(); } @@ -84,26 +96,20 @@ public void imageWritten(IndexEntryData ied) { indexEntries_.addLast(ied); } + @Override + public void awaitCompletion() { + //deprecated + } + /** * Called by the python side to signal that the final shutdown signal has been received * and that the push socket can be closed */ public void storageMonitoringComplete() { - executor_.shutdown(); - pushSocket_.close(); + executor_.submit(() -> { + pushSocket_.close(); + executor_.shutdown(); + }); } - @Override - public void awaitCompletion() { - // No need to do this, because the storage sould shutdown irrespective of this montior - // which exists on top of it - -// while (!executor_.isTerminated()) { -// try { -// Thread.sleep(5); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } -// } - } } diff --git a/pycromanager/_version.py b/pycromanager/_version.py index 5829d673..03d6b15b 100644 --- a/pycromanager/_version.py +++ b/pycromanager/_version.py @@ -1,2 +1,2 @@ -version_info = (0, 28, 1) +version_info = (0, 28, 2) __version__ = ".".join(map(str, version_info)) diff --git a/pycromanager/acq_util.py b/pycromanager/acq_util.py index fb07fbd2..3a352c3f 100644 --- a/pycromanager/acq_util.py +++ b/pycromanager/acq_util.py @@ -11,13 +11,14 @@ SUBPROCESSES = [] -def cleanup(): +def stop_headless(): for p in SUBPROCESSES: p.terminate() p.wait() # wait for process to terminate + SUBPROCESSES.clear() # make sure any Java processes are cleaned up when Python exits -atexit.register(cleanup) +atexit.register(stop_headless) def start_headless( mm_app_path: str, config_file: str='', java_loc: str=None, core_log_path: str='', diff --git a/pycromanager/acquisitions.py b/pycromanager/acquisitions.py index 1d43ec06..39588560 100644 --- a/pycromanager/acquisitions.py +++ b/pycromanager/acquisitions.py @@ -2,6 +2,7 @@ The Pycro-manager Acquisiton system """ import warnings +import weakref import numpy as np import multiprocessing @@ -16,6 +17,8 @@ import os.path import queue from docstring_inheritance import NumpyDocstringInheritanceMeta +import traceback +from pycromanager.notifications import AcqNotification, AcquisitionFuture class AcqAlreadyCompleteException(Exception): def __init__(self, message): @@ -205,23 +208,41 @@ def _storage_monitor_fn(acquisition, dataset, storage_monitor_push_port, connect else: raise Exception('Image saved callbacks must have either 2 or three parameters') - while True: - try: + + try: + while True: message = monitor_socket.receive() if "finished" in message: - # Poison, time to shut down - monitor_socket.close() - return + # Time to shut down + break index_entry = message["index_entry"] axes = dataset._add_index_entry(index_entry) + acquisition._notification_queue.put(AcqNotification.make_image_saved_notification(axes)) dataset._new_image_arrived = True if callback is not None: callback(axes, dataset) - except Exception as e: + except Exception as e: acquisition.abort(e) + finally: + monitor_socket.close() +def _notification_handler_fn(acquisition, notification_push_port, connected_event, debug=False): + monitor_socket = PullSocket(notification_push_port) + connected_event.set() + try: + while True: + message = monitor_socket.receive() + acquisition._notification_queue.put(AcqNotification.from_json(message)) + if "acq_finished" in message["type"]: + break + + except Exception as e: + traceback.print_exc() + acquisition.abort(e) + finally: + monitor_socket.close() class Acquisition(object, metaclass=NumpyDocstringInheritanceMeta): """ @@ -242,7 +263,7 @@ def __init__( image_saved_fn: callable=None, process: bool=False, saving_queue_size: int=20, - timeout: int=1000, + timeout: int=2000, port: int=DEFAULT_PORT, debug: int=False, core_log_debug: int=False, @@ -318,6 +339,8 @@ def __init__( self._timeout = timeout self._nd_viewer = None self._napari_viewer = None + self._notification_queue = queue.Queue(100) + self._acq_futures = [] # Get a dict of all named argument values (or default values when nothing provided) arg_names = [k for k in signature(Acquisition.__init__).parameters.keys() if k != 'self'] @@ -337,7 +360,17 @@ def __init__( self._initialize_image_processor(**named_args) self._initialize_hooks(**named_args) - # Acquistiion.start is now deprecated, so this can be removed later + try: + self._remote_notification_handler = JavaObject('org.micromanager.remote.RemoteNotificationHandler', + args=[self._remote_acq], port=self._port, new_socket=False) + self._acq_notification_recieving_thread = self._start_receiving_notifications() + self._acq_notification_dispatcher_thread = self._start_notification_dispatcher() + except: + warnings.warn('Could not create acquisition notification handler. This should not affect performance,' + ' but indicates that Micro-Manager is out of date') + + # Start remote acquisition + # Acquistition.start is now deprecated, so this can be removed later # Acquisitions now get started automatically when the first events submitted # but Magellan acquisitons (and probably others that generate their own events) # will need some new method to submit events only after image processors etc have been added @@ -359,7 +392,7 @@ def __init__( ndtiff_storage = data_sink.get_storage() summary_metadata = ndtiff_storage.get_summary_metadata() self._remote_storage_monitor = JavaObject('org.micromanager.remote.RemoteStorageMonitor', port=self._port, - new_socket=True) + new_socket=False) ndtiff_storage.add_image_written_listener(self._remote_storage_monitor) self._dataset = Dataset(dataset_path=self._dataset_disk_location, _summary_metadata=summary_metadata) # Monitor image arrival so they can be loaded on python side, but with no callback function @@ -413,6 +446,14 @@ def await_completion(self): time.sleep(1 if self._debug else 0.05) self._check_for_exceptions() + for hook_thread in self._hook_threads: + hook_thread.join() + + if hasattr(self, '_event_thread'): + self._event_thread.join() + + self._remote_acq = None + # Wait on all the other threads to shut down properly if hasattr(self, '_storage_monitor_thread'): self._storage_monitor_thread.join() @@ -420,13 +461,13 @@ def await_completion(self): # tell it it is okay to shutdown its push socket self._remote_storage_monitor.storage_monitoring_complete() - for hook_thread in self._hook_threads: - hook_thread.join() + if hasattr(self, '_acq_notification_recieving_thread'): + # for backwards compatiblitiy with older versions of Pycromanager java before this added + self._acq_notification_recieving_thread.join() + self._remote_notification_handler.notification_handling_complete() + self._acq_notification_dispatcher_thread.join() - if hasattr(self, '_event_thread'): - self._event_thread.join() - self._remote_acq = None self._finished = True def acquire(self, event_or_events: dict or list): @@ -450,7 +491,16 @@ def acquire(self, event_or_events: dict or list): return _validate_acq_events(event_or_events) + + axes_or_axes_list = event_or_events['axes'] if type(event_or_events) == dict\ + else [e['axes'] for e in event_or_events] + acq_future = AcquisitionFuture(self, axes_or_axes_list) + self._acq_futures.append(weakref.ref(acq_future)) + # clear out old weakrefs + self._acq_futures = [f for f in self._acq_futures if f() is not None] + self._event_queue.put(event_or_events) + return acq_future def abort(self, exception=None): """ @@ -493,6 +543,68 @@ def __exit__(self, exc_type, exc_val, exc_tb): ######## Private methods ########### + def _start_receiving_notifications(self): + """ + Thread that runs a function that pulls notifications from the acquisition engine and puts them on a queue + This is not all notifications, just ones that are relevant to the acquisition. Specifically, it does not + include notifications the progress of data saving + """ + connected_event = threading.Event() + + pull_port = self._remote_notification_handler.get_port() + notification_thread = threading.Thread( + target=_notification_handler_fn, + args=( + self, + pull_port, + connected_event, + self._debug, + ), + name="NotificationHandlerThread", + ) + # + # Wait for pulling to start before you signal for pushing to start + notification_thread.start() + connected_event.wait() + + # start pushing out all the notifications + self._remote_notification_handler.start() + return notification_thread + + def _start_notification_dispatcher(self): + """ + Thread that runs a function that pulls notifications from the queue on the python side and dispatches + them to the appropriate listener + """ + def dispatch_notifications(): + while True: + # dispatch notifications to all listeners + try: + notification = self._notification_queue.get(timeout=0.05) # 50 ms timeout + except queue.Empty: + storage_monitoring_ongoing = hasattr(self, '_storage_monitor_thread')\ + and self._storage_monitor_thread.is_alive() + acq_notifications_ongoing = hasattr(self, '_acq_notification_recieving_thread')\ + and self._acq_notification_recieving_thread.is_alive() + if not storage_monitoring_ongoing and not acq_notifications_ongoing and self._notification_queue.empty(): + # if all the threads have shut down and the queue is empty, then shut down + break + else: + # print(notification.to_json()) + for future in self._acq_futures: + strong_ref = future() + if strong_ref is not None: + strong_ref._notify(notification) + # TODO: can also add a user-specified notification callback + + dispatcher_thread = threading.Thread( + target=dispatch_notifications, + name="NotificationDispatcherThread", + ) + dispatcher_thread.start() + return dispatcher_thread + + def _add_storage_monitor_fn(self, callback_fn=None, debug=False): """ Add a callback function that gets called whenever a new image is writtern to disk (for acquisitions in diff --git a/pycromanager/notifications.py b/pycromanager/notifications.py new file mode 100644 index 00000000..4f5841e5 --- /dev/null +++ b/pycromanager/notifications.py @@ -0,0 +1,117 @@ +import threading + +class AcqNotification: + + class Global: + ACQ_STARTED = "acq_started" + ACQ_FINISHED = "acq_finished" + + class Hardware: + PRE_HARDWARE = "pre_hardware" + POST_HARDWARE = "post_hardware" + + class Camera: + PRE_SEQUENCE_STARTED = "pre_sequence_started" + PRE_SNAP = "pre_snap" + POST_EXPOSURE = "post_exposure" + + class Image: + IMAGE_SAVED = "image_saved" + + def __init__(self, type, axes, phase=None): + if type is None: + # then figure it out based on the phase + if phase in [AcqNotification.Camera.PRE_SNAP, AcqNotification.Camera.POST_EXPOSURE, + AcqNotification.Camera.PRE_SEQUENCE_STARTED]: + type = AcqNotification.Camera + elif phase in [AcqNotification.Hardware.PRE_HARDWARE, AcqNotification.Hardware.POST_HARDWARE]: + type = AcqNotification.Hardware + elif phase == AcqNotification.Image.IMAGE_SAVED: + type = AcqNotification.Image + else: + raise ValueError("Unknown phase") + self.type = type + self.phase = phase + self.axes = axes + + @staticmethod + def make_image_saved_notification(axes): + return AcqNotification(AcqNotification.Image, axes, AcqNotification.Image.IMAGE_SAVED) + + def to_json(self): + return { + 'type': self.type, + 'phase': self.phase, + 'axes': self.axes, + } + + @staticmethod + def from_json(json): + return AcqNotification(json['type'], json['axes'] if 'axes' in json else None, + json['phase'] if 'phase' in json else None) + + +def _axes_to_key(axes_or_axes_list): + """ Turn axes into a hashable key """ + return frozenset(axes_or_axes_list.items()) + + +class AcquisitionFuture: + + def __init__(self, acq, axes_or_axes_list): + """ + :param event_or_events: a single event (dictionary) or a list of events + """ + self._acq = acq + self._condition = threading.Condition() + self._notification_recieved = {} + if isinstance(axes_or_axes_list, dict): + axes_or_axes_list = [axes_or_axes_list] + for axes in axes_or_axes_list: + # single event + # TODO maybe unify snap and sequence cause this is confusing + self._notification_recieved[_axes_to_key(axes)] = { + AcqNotification.Hardware.PRE_HARDWARE: False, + AcqNotification.Hardware.POST_HARDWARE: False, + + AcqNotification.Camera.PRE_SNAP: False, + AcqNotification.Camera.PRE_SEQUENCE_STARTED: False, + AcqNotification.Camera.POST_EXPOSURE: False, + + AcqNotification.Image.IMAGE_SAVED: False, + } + + + def _notify(self, notification): + """ + Called by the internal notification dispatcher in order so that it can check off that the notification was + received. Want to store this, rather than just waiting around for it, in case the await methods are called + after the notification has already been sent. + """ + if notification.type == AcqNotification.Global.ACQ_FINISHED: + return # ignore for now... + key = _axes_to_key(notification.axes) + if key not in self._notification_recieved.keys(): + return # ignore notifications that aren't relevant to this future + self._notification_recieved[key][notification.phase] = True + with self._condition: + self._condition.notify_all() + + def await_execution(self, axes, phase): + key = _axes_to_key(axes) + if key not in self._notification_recieved.keys() or phase not in self._notification_recieved[key].keys(): + notification = AcqNotification(None, axes, phase) + raise ValueError("this future is not expecting a notification for: " + str(notification.to_json())) + with self._condition: + while not self._notification_recieved[key][phase]: + self._condition.wait() + + def await_image_saved(self, axes, return_image=False): + key = _axes_to_key(axes) + with self._condition: + while not self._notification_recieved[key][AcqNotification.Image.IMAGE_SAVED]: + self._condition.wait() + if return_image: + return self._acq.get_dataset().read_image(**axes) + + diff --git a/pycromanager/test/conftest.py b/pycromanager/test/conftest.py index 8962f966..4a37a908 100644 --- a/pycromanager/test/conftest.py +++ b/pycromanager/test/conftest.py @@ -10,7 +10,7 @@ import pycromanager from pycromanager import start_headless -from pycromanager.acq_util import cleanup +from pycromanager.acq_util import stop_headless import socket def is_port_in_use(port): diff --git a/pycromanager/test/test_hook_functions.py b/pycromanager/test/test_hook_functions.py index b435217f..8ca875dc 100644 --- a/pycromanager/test/test_hook_functions.py +++ b/pycromanager/test/test_hook_functions.py @@ -43,7 +43,7 @@ def hook_fn(image, metadata): def test_event_serialize_and_deserialize(launch_mm_headless): """ - Test that no information is lost when event is serialized and deserialized and passed through AcqEndJ + Test for cycle consistency of event serialization and deserialization. """ events = [ @@ -59,6 +59,8 @@ def test_event_serialize_and_deserialize(launch_mm_headless): 'properties': [['DeviceName', 'PropertyName', 'PropertyValue']]}, {'axes': {'z': 1}, 'stage_positions': [['ZDeviceName', 123.45]]}, + {'axes': {'time': 2}, + 'timeout': 1000}, ] def hook_fn(event): diff --git a/pycromanager/zmq_bridge/_bridge.py b/pycromanager/zmq_bridge/_bridge.py index 4e949a40..c101d9f9 100644 --- a/pycromanager/zmq_bridge/_bridge.py +++ b/pycromanager/zmq_bridge/_bridge.py @@ -640,7 +640,12 @@ def __del__(self): print('DEBUG: destructor for {} on thread {}'.format( str(self), threading.current_thread().name)) print('DEBUG: thread name: {}'.format(threading.current_thread().name)) - self._close() + try: + self._close() + except Exception as e: + traceback.print_exc() + print('Exception in destructor for {} on thread {}'.format( + str(self), threading.current_thread().name)) def _access_field(self, name): """