Skip to content

Commit

Permalink
Merge branch 'main' into ResamplerFix
Browse files Browse the repository at this point in the history
  • Loading branch information
Nico Stuurman committed Sep 5, 2023
2 parents 35e0d19 + 5e7eb9f commit 2acb43b
Show file tree
Hide file tree
Showing 12 changed files with 409 additions and 59 deletions.
4 changes: 2 additions & 2 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.micro-manager.pycro-manager</groupId>
<artifactId>PycroManagerJava</artifactId>
<version>0.41.7</version>
<version>0.43.0</version>
<packaging>jar</packaging>
<name>Pycro-Manager Java</name>
<description>The Java components of Pycro-Manager</description>
Expand Down Expand Up @@ -54,7 +54,7 @@
<dependency>
<groupId>org.micro-manager.acqengj</groupId>
<artifactId>AcqEngJ</artifactId>
<version>0.30.0</version>
<version>0.32.2</version>
</dependency>
<dependency>
<groupId>org.micro-manager.ndviewer</groupId>
Expand Down
16 changes: 11 additions & 5 deletions java/src/main/java/org/micromanager/remote/RemoteEventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class RemoteEventSource {
});

public RemoteEventSource() {
//constantly poll the socket for more event sequences to submit
executor_.submit(() -> {
pullSocket_ = new ZMQPullSocket<>(
t -> {
try {
Expand All @@ -48,18 +50,15 @@ public RemoteEventSource() {
throw new RuntimeException("Incorrect format for acquisitio event");
}
});
//constantly poll the socket for more event sequences to submit
executor_.submit(() -> {
try {
System.out.println("pull socket started");
while (true) {
List<AcquisitionEvent> eList = pullSocket_.next();
boolean finished = eList.get(eList.size() - 1).isAcquisitionFinishedEvent();
Future result = acq_.submitEventIterator(eList.iterator());
result.get(); //propogate any exceptions
if (finished || executor_.isShutdown()) {
executor_.shutdown();
return;
break;
}
}
} catch (InterruptedException e) {
Expand All @@ -81,6 +80,14 @@ void setAcquisition(Acquisition aThis) {
}

public int getPort() {
while (pullSocket_ == null) {
// wait for it to be created ona different thread
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return pullSocket_.getPort();
}

Expand Down Expand Up @@ -110,7 +117,6 @@ void abort() {
}
};

pullSocket_.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.micromanager.internal.zmq.ZMQPushSocket;
import org.micromanager.internal.zmq.ZMQUtil;

// TODO: this class now duplicates functionality of AsyncImageProcessor in AcqEngJ

/**
* Implements an ImageProcessor that sends/recieves images from a remote source
* using ZMQ push/pull sockets. This enables image processing in Python/NumPy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.micromanager.remote;

import mmcorej.org.json.JSONException;
import org.micromanager.acqj.api.AcqNotificationListener;
import org.micromanager.acqj.api.AcquisitionAPI;
import org.micromanager.acqj.main.AcqNotification;
import org.micromanager.acqj.main.Acquisition;
import org.micromanager.internal.zmq.ZMQPushSocket;
import org.micromanager.ndtiffstorage.IndexEntryData;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

/**
* A class that broadcasts information about images that have finsihed saving to disk
* @author henrypinkard
*/
public class RemoteNotificationHandler implements AcqNotificationListener {

private ZMQPushSocket<AcqNotification> pushSocket_;
private ExecutorService executor_ = Executors.newSingleThreadExecutor((Runnable r) -> {
return new Thread(r, "Remote notification thread");
});
private LinkedBlockingDeque<AcqNotification> notifications_ = new LinkedBlockingDeque<AcqNotification>();

/**
* Called by python side
*/
public RemoteNotificationHandler(AcquisitionAPI acq) {
acq.addAcqNotificationListener(this);
executor_.submit(new Runnable() {
@Override
public void run() {
pushSocket_ = new ZMQPushSocket<AcqNotification>(
t -> {
try {
return t.toJSON();
} catch (JSONException e) {
throw new RuntimeException("Problem with notification socket");
}
});
}
});
}

/**
* Start pushing out the indices to the other side
*/
public void start() {
//constantly poll the socket for more event sequences to submit
executor_.submit(() -> {
while (true) {
AcqNotification e = null;
try {
e = notifications_.takeFirst();
} catch (InterruptedException ex) {
// this should never happen
ex.printStackTrace();
throw new RuntimeException(ex);
}

pushSocket_.push(e);
if (e.isAcquisitionFinishedNotification()) {
return;
}
}
});
}

@Override
public void postNotification(AcqNotification n) {
notifications_.add(n);
}

/**
* Called by the python side to signal that the final shutdown signal has been received
* and that the push socket can be closed
*/
public void notificationHandlingComplete() {
executor_.submit(() -> {
pushSocket_.close();
executor_.shutdown();
});
}

public int getPort() {
while (pushSocket_ == null) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return pushSocket_.getPort();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,30 @@ public class RemoteStorageMonitor implements ImageWrittenListener {

private ZMQPushSocket<IndexEntryData> pushSocket_;
private ExecutorService executor_ = Executors.newSingleThreadExecutor((Runnable r) -> {
return new Thread(r, "Remote Event Source thread");
return new Thread(r, "Remote storage monitor thread");
});
private LinkedBlockingDeque<IndexEntryData> indexEntries_ = new LinkedBlockingDeque<IndexEntryData>();

public RemoteStorageMonitor() {
pushSocket_ = new ZMQPushSocket<IndexEntryData>(
t -> {
try {
JSONObject message = new JSONObject();
if (t.isDataSetFinishedEntry()) {
message.put("finished", true);
} else {
message.put("index_entry", ((ByteBuffer) t.asByteBuffer()).array());
}
return message;
} catch (JSONException e) {
throw new RuntimeException("Problem with data saved socket");
}
});
executor_.submit(new Runnable() {
@Override
public void run() {
pushSocket_ = new ZMQPushSocket<IndexEntryData>(
t -> {
try {
JSONObject message = new JSONObject();
if (t.isDataSetFinishedEntry()) {
message.put("finished", true);
} else {
message.put("index_entry", ((ByteBuffer) t.asByteBuffer()).array());
}
return message;
} catch (JSONException e) {
throw new RuntimeException("Problem with data saved socket");
}
});
}
});
}

/**
Expand Down Expand Up @@ -73,6 +78,13 @@ public void start() {


public int getPort() {
while (pushSocket_ == null) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return pushSocket_.getPort();
}

Expand All @@ -84,26 +96,20 @@ public void imageWritten(IndexEntryData ied) {
indexEntries_.addLast(ied);
}

@Override
public void awaitCompletion() {
//deprecated
}

/**
* Called by the python side to signal that the final shutdown signal has been received
* and that the push socket can be closed
*/
public void storageMonitoringComplete() {
executor_.shutdown();
pushSocket_.close();
executor_.submit(() -> {
pushSocket_.close();
executor_.shutdown();
});
}

@Override
public void awaitCompletion() {
// No need to do this, because the storage sould shutdown irrespective of this montior
// which exists on top of it

// while (!executor_.isTerminated()) {
// try {
// Thread.sleep(5);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
}
}
2 changes: 1 addition & 1 deletion pycromanager/_version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version_info = (0, 28, 1)
version_info = (0, 28, 2)
__version__ = ".".join(map(str, version_info))
5 changes: 3 additions & 2 deletions pycromanager/acq_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@

SUBPROCESSES = []

def cleanup():
def stop_headless():
for p in SUBPROCESSES:
p.terminate()
p.wait() # wait for process to terminate
SUBPROCESSES.clear()

# make sure any Java processes are cleaned up when Python exits
atexit.register(cleanup)
atexit.register(stop_headless)

def start_headless(
mm_app_path: str, config_file: str='', java_loc: str=None, core_log_path: str='',
Expand Down
Loading

0 comments on commit 2acb43b

Please sign in to comment.