From 604f1bf8ae8e2fe3267a1dcd1b65cee5cebb4063 Mon Sep 17 00:00:00 2001 From: Henry Pinkard <7969470+henrypinkard@users.noreply.github.com> Date: Fri, 18 Aug 2023 22:14:52 -0700 Subject: [PATCH 01/14] add test for timeout cycle consistency --- pycromanager/test/test_hook_functions.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pycromanager/test/test_hook_functions.py b/pycromanager/test/test_hook_functions.py index aabfbb9f..37142999 100644 --- a/pycromanager/test/test_hook_functions.py +++ b/pycromanager/test/test_hook_functions.py @@ -41,7 +41,7 @@ def hook_fn(image, metadata): def test_event_serialize_and_deserialize(launch_mm_headless): """ - Test that no information is lost when event is serialized and deserialized and passed through AcqEndJ + Test for cycle consistency of event serialization and deserialization. """ events = [ @@ -57,6 +57,8 @@ def test_event_serialize_and_deserialize(launch_mm_headless): 'properties': [['DeviceName', 'PropertyName', 'PropertyValue']]}, {'axes': {'z': 1}, 'stage_positions': [['ZDeviceName', 123.45]]}, + {'axes': {'time': 2}, + 'timeout': 1000}, ] def hook_fn(event): From 59c1e6a2a1e8f8a0f297c32b4f46e25b3152c79c Mon Sep 17 00:00:00 2001 From: AcqEngJ-Bot <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 19 Aug 2023 05:25:17 +0000 Subject: [PATCH 02/14] update AcqEngJ version and PycroManagerJava version (Created by Github action) --- java/pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index b4c7be07..c9a03a96 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.micro-manager.pycro-manager PycroManagerJava - 0.41.7 + 0.41.8 jar Pycro-Manager Java The Java components of Pycro-Manager @@ -54,7 +54,7 @@ org.micro-manager.acqengj AcqEngJ - 0.30.0 + 0.31.0 org.micro-manager.ndviewer @@ -144,4 +144,4 @@ - + \ No newline at end of file From 74e4b12ff89b1e2edcc3f1618ca140fb6b350c60 Mon Sep 17 00:00:00 2001 From: Henry Pinkard <7969470+henrypinkard@users.noreply.github.com> Date: Sat, 19 Aug 2023 17:10:07 -0700 Subject: [PATCH 03/14] change default queue size --- java/pom.xml | 4 +- .../remote/RemoteImageProcessor.java | 2 - .../remote/RemoteNotificationHandler.java | 89 +++++++++++++++++++ .../remote/RemoteStorageMonitor.java | 18 ++-- pycromanager/_version.py | 2 +- pycromanager/acquisitions.py | 58 ++++++++++++ 6 files changed, 155 insertions(+), 18 deletions(-) create mode 100644 java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java diff --git a/java/pom.xml b/java/pom.xml index c1d44b11..8245a342 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.micro-manager.pycro-manager PycroManagerJava - 0.41.4 + 0.42.0 jar Pycro-Manager Java The Java components of Pycro-Manager @@ -144,4 +144,4 @@ - \ No newline at end of file + diff --git a/java/src/main/java/org/micromanager/remote/RemoteImageProcessor.java b/java/src/main/java/org/micromanager/remote/RemoteImageProcessor.java index 3ad7ceca..141d3078 100644 --- a/java/src/main/java/org/micromanager/remote/RemoteImageProcessor.java +++ b/java/src/main/java/org/micromanager/remote/RemoteImageProcessor.java @@ -20,8 +20,6 @@ import org.micromanager.internal.zmq.ZMQPushSocket; import org.micromanager.internal.zmq.ZMQUtil; -// TODO: this class now duplicates functionality of AsyncImageProcessor in AcqEngJ - /** * Implements an ImageProcessor that sends/recieves images from a remote source * using ZMQ push/pull sockets. This enables image processing in Python/NumPy diff --git a/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java b/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java new file mode 100644 index 00000000..d0312197 --- /dev/null +++ b/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java @@ -0,0 +1,89 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.micromanager.remote; + +import mmcorej.org.json.JSONException; +import org.micromanager.acqj.api.AcqNotificationListener; +import org.micromanager.acqj.api.AcquisitionAPI; +import org.micromanager.acqj.main.AcqNotification; +import org.micromanager.acqj.main.Acquisition; +import org.micromanager.internal.zmq.ZMQPushSocket; +import org.micromanager.ndtiffstorage.IndexEntryData; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * A class that broadcasts information about images that have finsihed saving to disk + * @author henrypinkard + */ +public class RemoteNotificationHandler implements AcqNotificationListener { + + private ZMQPushSocket pushSocket_; + private ExecutorService executor_ = Executors.newSingleThreadExecutor((Runnable r) -> { + return new Thread(r, "Remote Event Source thread"); + }); + private LinkedBlockingDeque notifications_ = new LinkedBlockingDeque(); + + /** + * Called by python side + */ + public RemoteNotificationHandler(AcquisitionAPI acq) { + acq.addAcqNotificationListener(this); + pushSocket_ = new ZMQPushSocket( + t -> { + try { + return t.toJSON(); + } catch (JSONException e) { + throw new RuntimeException("Problem with notification socket"); + } + }); + } + + /** + * Start pushing out the indices to the other side + */ + public void start() { + //constantly poll the socket for more event sequences to submit + executor_.submit(() -> { + while (true) { + AcqNotification e = null; + try { + e = notifications_.takeFirst(); + } catch (InterruptedException ex) { + // this should never happen + ex.printStackTrace(); + throw new RuntimeException(ex); + } + + pushSocket_.push(e); + if (e.isAcquisitionFinishedNotification()) { + return ; + } + } + }); + } + + @Override + public void postNotification(AcqNotification n) { + notifications_.add(n); + } + + /** + * Called by the python side to signal that the final shutdown signal has been received + * and that the push socket can be closed + */ + public void notificationHandlingComplete() { + executor_.shutdown(); + pushSocket_.close(); + } + + public int getPort() { + return pushSocket_.getPort(); + } + +} diff --git a/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java b/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java index d4c69c74..106770c2 100644 --- a/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java +++ b/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java @@ -84,6 +84,11 @@ public void imageWritten(IndexEntryData ied) { indexEntries_.addLast(ied); } + @Override + public void awaitCompletion() { + //deprecated + } + /** * Called by the python side to signal that the final shutdown signal has been received * and that the push socket can be closed @@ -93,17 +98,4 @@ public void storageMonitoringComplete() { pushSocket_.close(); } - @Override - public void awaitCompletion() { - // No need to do this, because the storage sould shutdown irrespective of this montior - // which exists on top of it - -// while (!executor_.isTerminated()) { -// try { -// Thread.sleep(5); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } -// } - } } diff --git a/pycromanager/_version.py b/pycromanager/_version.py index bb947852..0a56e4ce 100644 --- a/pycromanager/_version.py +++ b/pycromanager/_version.py @@ -1,2 +1,2 @@ -version_info = (0, 27, 6) +version_info = (0, 28, 0) __version__ = ".".join(map(str, version_info)) diff --git a/pycromanager/acquisitions.py b/pycromanager/acquisitions.py index 1d43ec06..1dfc35c5 100644 --- a/pycromanager/acquisitions.py +++ b/pycromanager/acquisitions.py @@ -215,13 +215,29 @@ def _storage_monitor_fn(acquisition, dataset, storage_monitor_push_port, connect index_entry = message["index_entry"] axes = dataset._add_index_entry(index_entry) + acquisition._notification_queue.put({'type': 'image_saved', 'axes': axes}) dataset._new_image_arrived = True if callback is not None: callback(axes, dataset) except Exception as e: acquisition.abort(e) +def _notification_handler_fn(acquisition, notification_push_port, connected_event, debug=False): + monitor_socket = PullSocket(notification_push_port) + connected_event.set() + + while True: + try: + message = monitor_socket.receive() + acquisition._notification_queue.put(message) + if "acq_finished" in message["type"]: + # Poison, time to shut down + monitor_socket.close() + return + + except Exception as e: + acquisition.abort(e) class Acquisition(object, metaclass=NumpyDocstringInheritanceMeta): """ @@ -318,6 +334,7 @@ def __init__( self._timeout = timeout self._nd_viewer = None self._napari_viewer = None + self._notification_queue = queue.Queue(30) # Get a dict of all named argument values (or default values when nothing provided) arg_names = [k for k in signature(Acquisition.__init__).parameters.keys() if k != 'self'] @@ -337,6 +354,16 @@ def __init__( self._initialize_image_processor(**named_args) self._initialize_hooks(**named_args) + # Start remote acquisition + try: + self._remote_notification_handler = JavaObject('org.micromanager.remote.RemoteNotificationHandler', + args=(self._remote_acq,), + port=self._port, new_socket=True) + self._acq_notification_thread = self._add_notification_handler_fn() + except: + warnings.warn('Could not create acquisition notification handler. This should not affect performance,' + ' but indicates that Micro-Manager is out of date') + # Acquistiion.start is now deprecated, so this can be removed later # Acquisitions now get started automatically when the first events submitted # but Magellan acquisitons (and probably others that generate their own events) @@ -413,6 +440,11 @@ def await_completion(self): time.sleep(1 if self._debug else 0.05) self._check_for_exceptions() + if hasattr(self, '_acq_notification_thread'): + # for backwards compatiblitiy with older versions of Pycromanager java before this added + self._acq_notification_thread.join() + self._remote_notification_handler.notification_handling_complete() + # Wait on all the other threads to shut down properly if hasattr(self, '_storage_monitor_thread'): self._storage_monitor_thread.join() @@ -493,6 +525,32 @@ def __exit__(self, exc_type, exc_val, exc_tb): ######## Private methods ########### + def _add_notification_handler_fn(self): + """ + Add a callback function that gets called whenever a new notification is received + """ + connected_event = threading.Event() + + pull_port = self._remote_notification_handler.get_port() + notification_thread = threading.Thread( + target=_notification_handler_fn, + args=( + self, + pull_port, + connected_event, + self._debug, + ), + name="NotificationHandlerThread", + ) + + # Wait for pulling to start before you signal for pushing to start + notification_thread.start() + connected_event.wait() + + # start pushing out all the notifications + self._remote_notification_handler.start() + return notification_thread + def _add_storage_monitor_fn(self, callback_fn=None, debug=False): """ Add a callback function that gets called whenever a new image is writtern to disk (for acquisitions in From 5e4f2170e6483dffcb2fa9bd2b80615b6ea29b29 Mon Sep 17 00:00:00 2001 From: AcqEngJ-Bot <41898282+github-actions[bot]@users.noreply.github.com> Date: Sun, 20 Aug 2023 00:16:34 +0000 Subject: [PATCH 04/14] update AcqEngJ version and PycroManagerJava version (Created by Github action) --- java/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index c9a03a96..d9d4b426 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.micro-manager.pycro-manager PycroManagerJava - 0.41.8 + 0.41.9 jar Pycro-Manager Java The Java components of Pycro-Manager @@ -54,7 +54,7 @@ org.micro-manager.acqengj AcqEngJ - 0.31.0 + 0.32.0 org.micro-manager.ndviewer From 81e3076537df5983e4728d5980b73a709e244d0b Mon Sep 17 00:00:00 2001 From: Henry Pinkard <7969470+henrypinkard@users.noreply.github.com> Date: Thu, 24 Aug 2023 10:04:06 -0700 Subject: [PATCH 05/14] fix some potential threading issues and comment out notifications for now to ensure things still work --- java/pom.xml | 2 +- .../remote/RemoteNotificationHandler.java | 39 ++++++++++----- .../remote/RemoteStorageMonitor.java | 49 ++++++++++++------- pycromanager/acquisitions.py | 49 +++++++++++-------- pycromanager/zmq_bridge/_bridge.py | 7 ++- 5 files changed, 95 insertions(+), 51 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index bc8ec303..e4b83d1d 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.micro-manager.pycro-manager PycroManagerJava - 0.42.0 + 0.43.0 jar Pycro-Manager Java The Java components of Pycro-Manager diff --git a/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java b/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java index d0312197..90913987 100644 --- a/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java +++ b/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java @@ -25,7 +25,7 @@ public class RemoteNotificationHandler implements AcqNotificationListener { private ZMQPushSocket pushSocket_; private ExecutorService executor_ = Executors.newSingleThreadExecutor((Runnable r) -> { - return new Thread(r, "Remote Event Source thread"); + return new Thread(r, "Remote notification thread"); }); private LinkedBlockingDeque notifications_ = new LinkedBlockingDeque(); @@ -34,20 +34,26 @@ public class RemoteNotificationHandler implements AcqNotificationListener { */ public RemoteNotificationHandler(AcquisitionAPI acq) { acq.addAcqNotificationListener(this); - pushSocket_ = new ZMQPushSocket( - t -> { - try { - return t.toJSON(); - } catch (JSONException e) { - throw new RuntimeException("Problem with notification socket"); - } - }); + executor_.submit(new Runnable() { + @Override + public void run() { + pushSocket_ = new ZMQPushSocket( + t -> { + try { + return t.toJSON(); + } catch (JSONException e) { + throw new RuntimeException("Problem with notification socket"); + } + }); + } + }); } /** * Start pushing out the indices to the other side */ public void start() { + System.out.println("Starting remote notifivation handler on port " + pushSocket_.getPort()); //constantly poll the socket for more event sequences to submit executor_.submit(() -> { while (true) { @@ -62,7 +68,7 @@ public void start() { pushSocket_.push(e); if (e.isAcquisitionFinishedNotification()) { - return ; + return; } } }); @@ -78,11 +84,20 @@ public void postNotification(AcqNotification n) { * and that the push socket can be closed */ public void notificationHandlingComplete() { - executor_.shutdown(); - pushSocket_.close(); + executor_.submit(() -> { + pushSocket_.close(); + executor_.shutdown(); + }); } public int getPort() { + while (pushSocket_ == null) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } return pushSocket_.getPort(); } diff --git a/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java b/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java index 106770c2..42db9c9c 100644 --- a/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java +++ b/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java @@ -23,31 +23,37 @@ public class RemoteStorageMonitor implements ImageWrittenListener { private ZMQPushSocket pushSocket_; private ExecutorService executor_ = Executors.newSingleThreadExecutor((Runnable r) -> { - return new Thread(r, "Remote Event Source thread"); + return new Thread(r, "Remote storage monitor thread"); }); private LinkedBlockingDeque indexEntries_ = new LinkedBlockingDeque(); public RemoteStorageMonitor() { - pushSocket_ = new ZMQPushSocket( - t -> { - try { - JSONObject message = new JSONObject(); - if (t.isDataSetFinishedEntry()) { - message.put("finished", true); - } else { - message.put("index_entry", ((ByteBuffer) t.asByteBuffer()).array()); - } - return message; - } catch (JSONException e) { - throw new RuntimeException("Problem with data saved socket"); - } - }); + executor_.submit(new Runnable() { + @Override + public void run() { + pushSocket_ = new ZMQPushSocket( + t -> { + try { + JSONObject message = new JSONObject(); + if (t.isDataSetFinishedEntry()) { + message.put("finished", true); + } else { + message.put("index_entry", ((ByteBuffer) t.asByteBuffer()).array()); + } + return message; + } catch (JSONException e) { + throw new RuntimeException("Problem with data saved socket"); + } + }); + } + }); } /** * Start pushing out the indices to the other side */ public void start() { + System.out.println("Starting remote storage monitor on port " + pushSocket_.getPort()); //constantly poll the socket for more event sequences to submit executor_.submit(() -> { while (true) { @@ -73,6 +79,13 @@ public void start() { public int getPort() { + while (pushSocket_ == null) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } return pushSocket_.getPort(); } @@ -94,8 +107,10 @@ public void awaitCompletion() { * and that the push socket can be closed */ public void storageMonitoringComplete() { - executor_.shutdown(); - pushSocket_.close(); + executor_.submit(() -> { + pushSocket_.close(); + executor_.shutdown(); + }); } } diff --git a/pycromanager/acquisitions.py b/pycromanager/acquisitions.py index 1dfc35c5..2213ef42 100644 --- a/pycromanager/acquisitions.py +++ b/pycromanager/acquisitions.py @@ -193,6 +193,7 @@ def process_and_sendoff(image_tags_tuple, original_dtype): def _storage_monitor_fn(acquisition, dataset, storage_monitor_push_port, connected_event, image_saved_fn, event_queue, debug=False): + print('starting storage monitor on port {}'.format(storage_monitor_push_port)) monitor_socket = PullSocket(storage_monitor_push_port) connected_event.set() callback = None @@ -207,7 +208,9 @@ def _storage_monitor_fn(acquisition, dataset, storage_monitor_push_port, connect while True: try: + print('awaiting storage callback') message = monitor_socket.receive() + print(message) if "finished" in message: # Poison, time to shut down monitor_socket.close() @@ -223,18 +226,22 @@ def _storage_monitor_fn(acquisition, dataset, storage_monitor_push_port, connect acquisition.abort(e) def _notification_handler_fn(acquisition, notification_push_port, connected_event, debug=False): + print('starting notification handler on port {}'.format(notification_push_port)) monitor_socket = PullSocket(notification_push_port) connected_event.set() while True: try: + print('awaiting notification') message = monitor_socket.receive() + print(message) acquisition._notification_queue.put(message) + print('put message in queue') if "acq_finished" in message["type"]: # Poison, time to shut down monitor_socket.close() - return + break except Exception as e: acquisition.abort(e) @@ -355,16 +362,15 @@ def __init__( self._initialize_hooks(**named_args) # Start remote acquisition - try: - self._remote_notification_handler = JavaObject('org.micromanager.remote.RemoteNotificationHandler', - args=(self._remote_acq,), - port=self._port, new_socket=True) - self._acq_notification_thread = self._add_notification_handler_fn() - except: - warnings.warn('Could not create acquisition notification handler. This should not affect performance,' - ' but indicates that Micro-Manager is out of date') - - # Acquistiion.start is now deprecated, so this can be removed later + # try: + # self._remote_notification_handler = JavaObject('org.micromanager.remote.RemoteNotificationHandler', + # args=[self._remote_acq], port=self._port, new_socket=True) + # self._acq_notification_thread = self._add_notification_handler_fn() + # except: + # warnings.warn('Could not create acquisition notification handler. This should not affect performance,' + # ' but indicates that Micro-Manager is out of date') + + # Acquistition.start is now deprecated, so this can be removed later # Acquisitions now get started automatically when the first events submitted # but Magellan acquisitons (and probably others that generate their own events) # will need some new method to submit events only after image processors etc have been added @@ -440,10 +446,13 @@ def await_completion(self): time.sleep(1 if self._debug else 0.05) self._check_for_exceptions() - if hasattr(self, '_acq_notification_thread'): - # for backwards compatiblitiy with older versions of Pycromanager java before this added - self._acq_notification_thread.join() - self._remote_notification_handler.notification_handling_complete() + for hook_thread in self._hook_threads: + hook_thread.join() + + if hasattr(self, '_event_thread'): + self._event_thread.join() + + self._remote_acq = None # Wait on all the other threads to shut down properly if hasattr(self, '_storage_monitor_thread'): @@ -452,13 +461,13 @@ def await_completion(self): # tell it it is okay to shutdown its push socket self._remote_storage_monitor.storage_monitoring_complete() - for hook_thread in self._hook_threads: - hook_thread.join() + if hasattr(self, '_acq_notification_thread'): + # for backwards compatiblitiy with older versions of Pycromanager java before this added + self._acq_notification_thread.join() + self._remote_notification_handler.notification_handling_complete() + - if hasattr(self, '_event_thread'): - self._event_thread.join() - self._remote_acq = None self._finished = True def acquire(self, event_or_events: dict or list): diff --git a/pycromanager/zmq_bridge/_bridge.py b/pycromanager/zmq_bridge/_bridge.py index 4e949a40..c101d9f9 100644 --- a/pycromanager/zmq_bridge/_bridge.py +++ b/pycromanager/zmq_bridge/_bridge.py @@ -640,7 +640,12 @@ def __del__(self): print('DEBUG: destructor for {} on thread {}'.format( str(self), threading.current_thread().name)) print('DEBUG: thread name: {}'.format(threading.current_thread().name)) - self._close() + try: + self._close() + except Exception as e: + traceback.print_exc() + print('Exception in destructor for {} on thread {}'.format( + str(self), threading.current_thread().name)) def _access_field(self, name): """ From 38525edb86d3165e41dd3cca3f2e7caa6e83f17c Mon Sep 17 00:00:00 2001 From: Henry Pinkard <7969470+henrypinkard@users.noreply.github.com> Date: Thu, 24 Aug 2023 11:13:20 -0700 Subject: [PATCH 06/14] notifications seem to work, but no external API yet --- .../remote/RemoteEventSource.java | 15 ++- pycromanager/acquisitions.py | 100 ++++++++++++------ 2 files changed, 76 insertions(+), 39 deletions(-) diff --git a/java/src/main/java/org/micromanager/remote/RemoteEventSource.java b/java/src/main/java/org/micromanager/remote/RemoteEventSource.java index 82d0ce81..d0ce2357 100644 --- a/java/src/main/java/org/micromanager/remote/RemoteEventSource.java +++ b/java/src/main/java/org/micromanager/remote/RemoteEventSource.java @@ -34,6 +34,8 @@ public class RemoteEventSource { }); public RemoteEventSource() { + //constantly poll the socket for more event sequences to submit + executor_.submit(() -> { pullSocket_ = new ZMQPullSocket<>( t -> { try { @@ -48,8 +50,6 @@ public RemoteEventSource() { throw new RuntimeException("Incorrect format for acquisitio event"); } }); - //constantly poll the socket for more event sequences to submit - executor_.submit(() -> { try { System.out.println("pull socket started"); while (true) { @@ -59,7 +59,7 @@ public RemoteEventSource() { result.get(); //propogate any exceptions if (finished || executor_.isShutdown()) { executor_.shutdown(); - return; + break; } } } catch (InterruptedException e) { @@ -81,6 +81,14 @@ void setAcquisition(Acquisition aThis) { } public int getPort() { + while (pullSocket_ == null) { + // wait for it to be created ona different thread + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } return pullSocket_.getPort(); } @@ -110,7 +118,6 @@ void abort() { } }; - pullSocket_.close(); } } diff --git a/pycromanager/acquisitions.py b/pycromanager/acquisitions.py index 2213ef42..2fed2ea5 100644 --- a/pycromanager/acquisitions.py +++ b/pycromanager/acquisitions.py @@ -16,6 +16,7 @@ import os.path import queue from docstring_inheritance import NumpyDocstringInheritanceMeta +import traceback class AcqAlreadyCompleteException(Exception): def __init__(self, message): @@ -193,7 +194,6 @@ def process_and_sendoff(image_tags_tuple, original_dtype): def _storage_monitor_fn(acquisition, dataset, storage_monitor_push_port, connected_event, image_saved_fn, event_queue, debug=False): - print('starting storage monitor on port {}'.format(storage_monitor_push_port)) monitor_socket = PullSocket(storage_monitor_push_port) connected_event.set() callback = None @@ -206,15 +206,13 @@ def _storage_monitor_fn(acquisition, dataset, storage_monitor_push_port, connect else: raise Exception('Image saved callbacks must have either 2 or three parameters') - while True: - try: - print('awaiting storage callback') + + try: + while True: message = monitor_socket.receive() - print(message) if "finished" in message: - # Poison, time to shut down - monitor_socket.close() - return + # Time to shut down + break index_entry = message["index_entry"] axes = dataset._add_index_entry(index_entry) @@ -222,29 +220,27 @@ def _storage_monitor_fn(acquisition, dataset, storage_monitor_push_port, connect dataset._new_image_arrived = True if callback is not None: callback(axes, dataset) - except Exception as e: + except Exception as e: acquisition.abort(e) + finally: + monitor_socket.close() def _notification_handler_fn(acquisition, notification_push_port, connected_event, debug=False): - print('starting notification handler on port {}'.format(notification_push_port)) monitor_socket = PullSocket(notification_push_port) connected_event.set() - while True: - try: - print('awaiting notification') + try: + while True: message = monitor_socket.receive() - print(message) acquisition._notification_queue.put(message) - print('put message in queue') - if "acq_finished" in message["type"]: - # Poison, time to shut down - monitor_socket.close() break - except Exception as e: - acquisition.abort(e) + except Exception as e: + traceback.print_exc() + acquisition.abort(e) + finally: + monitor_socket.close() class Acquisition(object, metaclass=NumpyDocstringInheritanceMeta): """ @@ -341,7 +337,7 @@ def __init__( self._timeout = timeout self._nd_viewer = None self._napari_viewer = None - self._notification_queue = queue.Queue(30) + self._notification_queue = queue.Queue(100) # Get a dict of all named argument values (or default values when nothing provided) arg_names = [k for k in signature(Acquisition.__init__).parameters.keys() if k != 'self'] @@ -361,15 +357,16 @@ def __init__( self._initialize_image_processor(**named_args) self._initialize_hooks(**named_args) - # Start remote acquisition - # try: - # self._remote_notification_handler = JavaObject('org.micromanager.remote.RemoteNotificationHandler', - # args=[self._remote_acq], port=self._port, new_socket=True) - # self._acq_notification_thread = self._add_notification_handler_fn() - # except: - # warnings.warn('Could not create acquisition notification handler. This should not affect performance,' - # ' but indicates that Micro-Manager is out of date') + try: + self._remote_notification_handler = JavaObject('org.micromanager.remote.RemoteNotificationHandler', + args=[self._remote_acq], port=self._port, new_socket=True) + self._acq_notification_recieving_thread = self._start_receiving_notifications() + self._acq_notification_dispatcher_thread = self._start_notification_dispatcher() + except: + warnings.warn('Could not create acquisition notification handler. This should not affect performance,' + ' but indicates that Micro-Manager is out of date') + # Start remote acquisition # Acquistition.start is now deprecated, so this can be removed later # Acquisitions now get started automatically when the first events submitted # but Magellan acquisitons (and probably others that generate their own events) @@ -463,11 +460,10 @@ def await_completion(self): if hasattr(self, '_acq_notification_thread'): # for backwards compatiblitiy with older versions of Pycromanager java before this added - self._acq_notification_thread.join() + self._acq_notification_recieving_thread.join() + self._acq_notification_dispatcher_thread.join() self._remote_notification_handler.notification_handling_complete() - - self._finished = True def acquire(self, event_or_events: dict or list): @@ -534,9 +530,11 @@ def __exit__(self, exc_type, exc_val, exc_tb): ######## Private methods ########### - def _add_notification_handler_fn(self): + def _start_receiving_notifications(self): """ - Add a callback function that gets called whenever a new notification is received + Thread that runs a function that pulls notifications from the acquisition engine and puts them on a queue + This is not all notifications, just ones that are relevant to the acquisition. Specifically, it does not + include notifications the progress of data saving """ connected_event = threading.Event() @@ -551,7 +549,7 @@ def _add_notification_handler_fn(self): ), name="NotificationHandlerThread", ) - + # # Wait for pulling to start before you signal for pushing to start notification_thread.start() connected_event.wait() @@ -560,6 +558,38 @@ def _add_notification_handler_fn(self): self._remote_notification_handler.start() return notification_thread + def _start_notification_dispatcher(self): + """ + Thread that runs a function that pulls notifications from the queue on the python side and dispatches + them to the appropriate listener + """ + def dispatch_notifications(): + while True: + # dispatch notifications to all listeners + try: + notification = self._notification_queue.get(timeout=0.05) # 50 ms timeout + except queue.Empty: + storage_monitoring_ongoing = hasattr(self, '_storage_monitor_thread')\ + and self._storage_monitor_thread.is_alive() + acq_notifications_ongoing = hasattr(self, '_acq_notification_recieving_thread')\ + and self._acq_notification_recieving_thread.is_alive() + if not storage_monitoring_ongoing and not acq_notifications_ongoing and self._notification_queue.empty(): + # if all the threads have shut down and the queue is empty, then shut down + break + else: + # TODO dispatch them to all listeners + # print(notification) + pass + + + dispatcher_thread = threading.Thread( + target=dispatch_notifications, + name="NotificationDispatcherThread", + ) + dispatcher_thread.start() + return dispatcher_thread + + def _add_storage_monitor_fn(self, callback_fn=None, debug=False): """ Add a callback function that gets called whenever a new image is writtern to disk (for acquisitions in From da16dd856bff53d8bdc498512ca249cc1093e513 Mon Sep 17 00:00:00 2001 From: Henry Pinkard <7969470+henrypinkard@users.noreply.github.com> Date: Thu, 24 Aug 2023 12:32:05 -0700 Subject: [PATCH 07/14] fix tests --- .../main/java/org/micromanager/remote/RemoteEventSource.java | 1 - .../org/micromanager/remote/RemoteNotificationHandler.java | 1 - .../java/org/micromanager/remote/RemoteStorageMonitor.java | 1 - pycromanager/acquisitions.py | 5 +++-- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/java/src/main/java/org/micromanager/remote/RemoteEventSource.java b/java/src/main/java/org/micromanager/remote/RemoteEventSource.java index d0ce2357..461ce681 100644 --- a/java/src/main/java/org/micromanager/remote/RemoteEventSource.java +++ b/java/src/main/java/org/micromanager/remote/RemoteEventSource.java @@ -51,7 +51,6 @@ public RemoteEventSource() { } }); try { - System.out.println("pull socket started"); while (true) { List eList = pullSocket_.next(); boolean finished = eList.get(eList.size() - 1).isAcquisitionFinishedEvent(); diff --git a/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java b/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java index 90913987..f2caddbc 100644 --- a/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java +++ b/java/src/main/java/org/micromanager/remote/RemoteNotificationHandler.java @@ -53,7 +53,6 @@ public void run() { * Start pushing out the indices to the other side */ public void start() { - System.out.println("Starting remote notifivation handler on port " + pushSocket_.getPort()); //constantly poll the socket for more event sequences to submit executor_.submit(() -> { while (true) { diff --git a/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java b/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java index 42db9c9c..e6890299 100644 --- a/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java +++ b/java/src/main/java/org/micromanager/remote/RemoteStorageMonitor.java @@ -53,7 +53,6 @@ public void run() { * Start pushing out the indices to the other side */ public void start() { - System.out.println("Starting remote storage monitor on port " + pushSocket_.getPort()); //constantly poll the socket for more event sequences to submit executor_.submit(() -> { while (true) { diff --git a/pycromanager/acquisitions.py b/pycromanager/acquisitions.py index 2fed2ea5..a65941d8 100644 --- a/pycromanager/acquisitions.py +++ b/pycromanager/acquisitions.py @@ -458,11 +458,12 @@ def await_completion(self): # tell it it is okay to shutdown its push socket self._remote_storage_monitor.storage_monitoring_complete() - if hasattr(self, '_acq_notification_thread'): + if hasattr(self, '_acq_notification_recieving_thread'): # for backwards compatiblitiy with older versions of Pycromanager java before this added self._acq_notification_recieving_thread.join() - self._acq_notification_dispatcher_thread.join() self._remote_notification_handler.notification_handling_complete() + self._acq_notification_dispatcher_thread.join() + self._finished = True From b0cb55e19cc5438b93cd81ee7742e75f4ac3290c Mon Sep 17 00:00:00 2001 From: Henry Pinkard <7969470+henrypinkard@users.noreply.github.com> Date: Thu, 24 Aug 2023 12:52:14 -0700 Subject: [PATCH 08/14] fix function for shutting down headless process --- pycromanager/_version.py | 2 +- pycromanager/acq_util.py | 5 +++-- pycromanager/test/conftest.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pycromanager/_version.py b/pycromanager/_version.py index df280057..0a56e4ce 100644 --- a/pycromanager/_version.py +++ b/pycromanager/_version.py @@ -1,2 +1,2 @@ -version_info = (0, 27, 5) +version_info = (0, 28, 0) __version__ = ".".join(map(str, version_info)) diff --git a/pycromanager/acq_util.py b/pycromanager/acq_util.py index c0b3152e..fd2c01a8 100644 --- a/pycromanager/acq_util.py +++ b/pycromanager/acq_util.py @@ -11,13 +11,14 @@ SUBPROCESSES = [] -def cleanup(): +def stop_headless(): for p in SUBPROCESSES: p.terminate() p.wait() # wait for process to terminate + SUBPROCESSES.clear() # make sure any Java processes are cleaned up when Python exits -atexit.register(cleanup) +atexit.register(stop_headless) def start_headless( mm_app_path: str, config_file: str='', java_loc: str=None, core_log_path: str='', diff --git a/pycromanager/test/conftest.py b/pycromanager/test/conftest.py index ca6aef1c..5fe3cf84 100644 --- a/pycromanager/test/conftest.py +++ b/pycromanager/test/conftest.py @@ -10,7 +10,7 @@ import pycromanager from pycromanager import start_headless -from pycromanager.acq_util import cleanup +from pycromanager.acq_util import stop_headless import socket def is_port_in_use(port): From 75c5d6d17eb79ffcb45ba5423f84ea488301992d Mon Sep 17 00:00:00 2001 From: Henry Pinkard <7969470+henrypinkard@users.noreply.github.com> Date: Thu, 24 Aug 2023 13:13:52 -0700 Subject: [PATCH 09/14] bump required acqengj version --- java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/pom.xml b/java/pom.xml index e4b83d1d..fbeb67a1 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -54,7 +54,7 @@ org.micro-manager.acqengj AcqEngJ - 0.32.0 + 0.32.1 org.micro-manager.ndviewer From d152a994155ee04f48f5b602bf5bb89515b1d02b Mon Sep 17 00:00:00 2001 From: AcqEngJ-Bot <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 24 Aug 2023 20:27:54 +0000 Subject: [PATCH 10/14] update AcqEngJ version and PycroManagerJava version (Created by Github action) --- java/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index d9d4b426..e230a39c 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.micro-manager.pycro-manager PycroManagerJava - 0.41.9 + 0.41.10 jar Pycro-Manager Java The Java components of Pycro-Manager @@ -54,7 +54,7 @@ org.micro-manager.acqengj AcqEngJ - 0.32.0 + 0.32.1 org.micro-manager.ndviewer From e6a925d2794c6609499af1a2113a00e8b6d89ddb Mon Sep 17 00:00:00 2001 From: Henry Pinkard <7969470+henrypinkard@users.noreply.github.com> Date: Thu, 24 Aug 2023 13:52:37 -0700 Subject: [PATCH 11/14] bump acqengj version --- java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/pom.xml b/java/pom.xml index fbeb67a1..0ed8ce65 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -54,7 +54,7 @@ org.micro-manager.acqengj AcqEngJ - 0.32.1 + 0.32.2 org.micro-manager.ndviewer From c81eba135ea859caa5a4cfbc81824d3eee23eef0 Mon Sep 17 00:00:00 2001 From: AcqEngJ-Bot <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 24 Aug 2023 21:10:17 +0000 Subject: [PATCH 12/14] update AcqEngJ version and PycroManagerJava version (Created by Github action) --- java/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index e230a39c..ea0057dd 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.micro-manager.pycro-manager PycroManagerJava - 0.41.10 + 0.41.11 jar Pycro-Manager Java The Java components of Pycro-Manager @@ -54,7 +54,7 @@ org.micro-manager.acqengj AcqEngJ - 0.32.1 + 0.32.2 org.micro-manager.ndviewer From 9cd5ae55a7153d2ae9161e86454aff2f4ddf8739 Mon Sep 17 00:00:00 2001 From: Henry Pinkard <7969470+henrypinkard@users.noreply.github.com> Date: Thu, 24 Aug 2023 21:23:33 -0700 Subject: [PATCH 13/14] added python API for dealing with notifications --- pycromanager/acquisitions.py | 26 ++++++-- pycromanager/notifications.py | 117 ++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 6 deletions(-) create mode 100644 pycromanager/notifications.py diff --git a/pycromanager/acquisitions.py b/pycromanager/acquisitions.py index a65941d8..f35ae062 100644 --- a/pycromanager/acquisitions.py +++ b/pycromanager/acquisitions.py @@ -2,6 +2,7 @@ The Pycro-manager Acquisiton system """ import warnings +import weakref import numpy as np import multiprocessing @@ -17,6 +18,7 @@ import queue from docstring_inheritance import NumpyDocstringInheritanceMeta import traceback +from pycromanager.notifications import AcqNotification, AcquisitionFuture class AcqAlreadyCompleteException(Exception): def __init__(self, message): @@ -216,7 +218,7 @@ def _storage_monitor_fn(acquisition, dataset, storage_monitor_push_port, connect index_entry = message["index_entry"] axes = dataset._add_index_entry(index_entry) - acquisition._notification_queue.put({'type': 'image_saved', 'axes': axes}) + acquisition._notification_queue.put(AcqNotification.make_image_saved_notification(axes)) dataset._new_image_arrived = True if callback is not None: callback(axes, dataset) @@ -232,7 +234,7 @@ def _notification_handler_fn(acquisition, notification_push_port, connected_even try: while True: message = monitor_socket.receive() - acquisition._notification_queue.put(message) + acquisition._notification_queue.put(AcqNotification.from_json(message)) if "acq_finished" in message["type"]: break @@ -338,6 +340,7 @@ def __init__( self._nd_viewer = None self._napari_viewer = None self._notification_queue = queue.Queue(100) + self._acq_futures = [] # Get a dict of all named argument values (or default values when nothing provided) arg_names = [k for k in signature(Acquisition.__init__).parameters.keys() if k != 'self'] @@ -488,7 +491,16 @@ def acquire(self, event_or_events: dict or list): return _validate_acq_events(event_or_events) + + axes_or_axes_list = event_or_events['axes'] if type(event_or_events) == dict\ + else [e['axes'] for e in event_or_events] + acq_future = AcquisitionFuture(self, axes_or_axes_list) + self._acq_futures.append(weakref.ref(acq_future)) + # clear out old weakrefs + self._acq_futures = [f for f in self._acq_futures if f() is not None] + self._event_queue.put(event_or_events) + return acq_future def abort(self, exception=None): """ @@ -578,10 +590,12 @@ def dispatch_notifications(): # if all the threads have shut down and the queue is empty, then shut down break else: - # TODO dispatch them to all listeners - # print(notification) - pass - + # print(notification.to_json()) + for future in self._acq_futures: + strong_ref = future() + if strong_ref is not None: + strong_ref._notify(notification) + # TODO: can also add a user-specified notification callback dispatcher_thread = threading.Thread( target=dispatch_notifications, diff --git a/pycromanager/notifications.py b/pycromanager/notifications.py new file mode 100644 index 00000000..4f5841e5 --- /dev/null +++ b/pycromanager/notifications.py @@ -0,0 +1,117 @@ +import threading + +class AcqNotification: + + class Global: + ACQ_STARTED = "acq_started" + ACQ_FINISHED = "acq_finished" + + class Hardware: + PRE_HARDWARE = "pre_hardware" + POST_HARDWARE = "post_hardware" + + class Camera: + PRE_SEQUENCE_STARTED = "pre_sequence_started" + PRE_SNAP = "pre_snap" + POST_EXPOSURE = "post_exposure" + + class Image: + IMAGE_SAVED = "image_saved" + + def __init__(self, type, axes, phase=None): + if type is None: + # then figure it out based on the phase + if phase in [AcqNotification.Camera.PRE_SNAP, AcqNotification.Camera.POST_EXPOSURE, + AcqNotification.Camera.PRE_SEQUENCE_STARTED]: + type = AcqNotification.Camera + elif phase in [AcqNotification.Hardware.PRE_HARDWARE, AcqNotification.Hardware.POST_HARDWARE]: + type = AcqNotification.Hardware + elif phase == AcqNotification.Image.IMAGE_SAVED: + type = AcqNotification.Image + else: + raise ValueError("Unknown phase") + self.type = type + self.phase = phase + self.axes = axes + + @staticmethod + def make_image_saved_notification(axes): + return AcqNotification(AcqNotification.Image, axes, AcqNotification.Image.IMAGE_SAVED) + + def to_json(self): + return { + 'type': self.type, + 'phase': self.phase, + 'axes': self.axes, + } + + @staticmethod + def from_json(json): + return AcqNotification(json['type'], json['axes'] if 'axes' in json else None, + json['phase'] if 'phase' in json else None) + + +def _axes_to_key(axes_or_axes_list): + """ Turn axes into a hashable key """ + return frozenset(axes_or_axes_list.items()) + + +class AcquisitionFuture: + + def __init__(self, acq, axes_or_axes_list): + """ + :param event_or_events: a single event (dictionary) or a list of events + """ + self._acq = acq + self._condition = threading.Condition() + self._notification_recieved = {} + if isinstance(axes_or_axes_list, dict): + axes_or_axes_list = [axes_or_axes_list] + for axes in axes_or_axes_list: + # single event + # TODO maybe unify snap and sequence cause this is confusing + self._notification_recieved[_axes_to_key(axes)] = { + AcqNotification.Hardware.PRE_HARDWARE: False, + AcqNotification.Hardware.POST_HARDWARE: False, + + AcqNotification.Camera.PRE_SNAP: False, + AcqNotification.Camera.PRE_SEQUENCE_STARTED: False, + AcqNotification.Camera.POST_EXPOSURE: False, + + AcqNotification.Image.IMAGE_SAVED: False, + } + + + def _notify(self, notification): + """ + Called by the internal notification dispatcher in order so that it can check off that the notification was + received. Want to store this, rather than just waiting around for it, in case the await methods are called + after the notification has already been sent. + """ + if notification.type == AcqNotification.Global.ACQ_FINISHED: + return # ignore for now... + key = _axes_to_key(notification.axes) + if key not in self._notification_recieved.keys(): + return # ignore notifications that aren't relevant to this future + self._notification_recieved[key][notification.phase] = True + with self._condition: + self._condition.notify_all() + + def await_execution(self, axes, phase): + key = _axes_to_key(axes) + if key not in self._notification_recieved.keys() or phase not in self._notification_recieved[key].keys(): + notification = AcqNotification(None, axes, phase) + raise ValueError("this future is not expecting a notification for: " + str(notification.to_json())) + with self._condition: + while not self._notification_recieved[key][phase]: + self._condition.wait() + + def await_image_saved(self, axes, return_image=False): + key = _axes_to_key(axes) + with self._condition: + while not self._notification_recieved[key][AcqNotification.Image.IMAGE_SAVED]: + self._condition.wait() + if return_image: + return self._acq.get_dataset().read_image(**axes) + + From c3846352aefff8405b1f8fca48e890cd907c10c2 Mon Sep 17 00:00:00 2001 From: Henry Pinkard <7969470+henrypinkard@users.noreply.github.com> Date: Fri, 25 Aug 2023 07:36:19 -0700 Subject: [PATCH 14/14] fix socket timeout errors --- pycromanager/acquisitions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pycromanager/acquisitions.py b/pycromanager/acquisitions.py index f35ae062..39588560 100644 --- a/pycromanager/acquisitions.py +++ b/pycromanager/acquisitions.py @@ -263,7 +263,7 @@ def __init__( image_saved_fn: callable=None, process: bool=False, saving_queue_size: int=20, - timeout: int=1000, + timeout: int=2000, port: int=DEFAULT_PORT, debug: int=False, core_log_debug: int=False, @@ -362,7 +362,7 @@ def __init__( try: self._remote_notification_handler = JavaObject('org.micromanager.remote.RemoteNotificationHandler', - args=[self._remote_acq], port=self._port, new_socket=True) + args=[self._remote_acq], port=self._port, new_socket=False) self._acq_notification_recieving_thread = self._start_receiving_notifications() self._acq_notification_dispatcher_thread = self._start_notification_dispatcher() except: @@ -392,7 +392,7 @@ def __init__( ndtiff_storage = data_sink.get_storage() summary_metadata = ndtiff_storage.get_summary_metadata() self._remote_storage_monitor = JavaObject('org.micromanager.remote.RemoteStorageMonitor', port=self._port, - new_socket=True) + new_socket=False) ndtiff_storage.add_image_written_listener(self._remote_storage_monitor) self._dataset = Dataset(dataset_path=self._dataset_disk_location, _summary_metadata=summary_metadata) # Monitor image arrival so they can be loaded on python side, but with no callback function