Skip to content

Commit

Permalink
Merge branch 'parallel-deadlock-fix' into pr-13518-and-13621
Browse files Browse the repository at this point in the history
  • Loading branch information
ssalonen committed Nov 5, 2022
2 parents 99e309e + 62236d9 commit eb0c5c5
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collector;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.generic.utils.FutureCollector;
Expand Down Expand Up @@ -105,8 +107,14 @@ public AbstractMQTTThingHandler(Thing thing, int subscribeTimeout) {
* @return A future that completes normal on success and exceptionally on any errors.
*/
protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
return availabilityStates.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, 0))
.collect(FutureCollector.allOf());
@NonNull
Collector<@NonNull CompletableFuture<@Nullable Void>, @NonNull Set<@NonNull CompletableFuture<@Nullable Void>>, @NonNull CompletableFuture<@Nullable Void>> allOfCollector = FutureCollector
.allOf();
return availabilityStates.values().stream().map(cChannel -> {
final @NonNull CompletableFuture<@Nullable Void> fut;
fut = cChannel == null ? CompletableFuture.completedFuture(null) : cChannel.start(connection, scheduler, 0);
return fut;
}).collect(allOfCollector);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.generic.AvailabilityTracker;
Expand Down Expand Up @@ -147,9 +149,12 @@ public void processMessage(String topic, byte[] payload) {
this.discoverTime = discoverTime;
this.discoveredListener = componentsDiscoveredListener;
this.connectionRef = new WeakReference<>(connection);
@NonNull
Collector<@NonNull CompletableFuture<@NonNull Boolean>, @NonNull Set<@NonNull CompletableFuture<@NonNull Boolean>>, @NonNull CompletableFuture<@Nullable Void>> allOfCollector = FutureCollector
.allOf();

// Subscribe to the wildcard topic and start receive MQTT retained topics
this.topics.parallelStream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf())
this.topics.stream().map(t -> connection.subscribe(t, this)).collect(allOfCollector)
.thenRun(this::subscribeSuccess).exceptionally(this::subscribeFail);

return discoverFinishedFuture;
Expand All @@ -161,7 +166,7 @@ private void subscribeSuccess() {
if (connection != null && discoverTime > 0) {
this.stopDiscoveryFuture = scheduler.schedule(() -> {
this.stopDiscoveryFuture = null;
this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this));
this.topics.stream().forEach(t -> connection.unsubscribe(t, this));
this.discoveredListener = null;
discoverFinishedFuture.complete(null);
}, discoverTime, TimeUnit.MILLISECONDS);
Expand All @@ -180,7 +185,7 @@ private void subscribeSuccess() {
this.discoveredListener = null;
final MqttBrokerConnection connection = connectionRef.get();
if (connection != null) {
this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this));
this.topics.stream().forEach(t -> connection.unsubscribe(t, this));
connectionRef.clear();
}
discoverFinishedFuture.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.generic.ChannelStateUpdateListener;
Expand Down Expand Up @@ -120,8 +123,11 @@ public void setConfigSeen() {
*/
public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
int timeout) {
return channels.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, timeout))
.collect(FutureCollector.allOf());
@NonNull
Collector<@NonNull CompletableFuture<@Nullable Void>, @NonNull Set<@NonNull CompletableFuture<@Nullable Void>>, @NonNull CompletableFuture<@Nullable Void>> allOfCollector = FutureCollector
.allOf();
return channels.values().stream().map(cChannel -> cChannel.start(connection, scheduler, timeout))
.collect(allOfCollector);
}

/**
Expand All @@ -131,7 +137,10 @@ public void setConfigSeen() {
* exceptionally on errors.
*/
public CompletableFuture<@Nullable Void> stop() {
return channels.values().parallelStream().map(ComponentChannel::stop).collect(FutureCollector.allOf());
@NonNull
Collector<@NonNull CompletableFuture<@Nullable Void>, @NonNull Set<@NonNull CompletableFuture<@Nullable Void>>, @NonNull CompletableFuture<@Nullable Void>> allOfCollector = FutureCollector
.allOf();
return channels.values().stream().map(ComponentChannel::stop).collect(allOfCollector);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
Expand Down Expand Up @@ -195,7 +197,7 @@ public CompletableFuture<Void> unsubscribeAll() {
// Start all known components and channels within the components and put the Thing offline
// if any subscribing failed ( == broker connection lost)
CompletableFuture<@Nullable Void> future = CompletableFuture.allOf(super.start(connection),
haComponents.values().parallelStream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
haComponents.values().stream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
.reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to
// one
.exceptionally(e -> {
Expand All @@ -212,10 +214,13 @@ protected void stop() {
if (started) {
discoverComponents.stopDiscovery();
delayedProcessing.join();
@NonNull
Collector<@NonNull CompletableFuture<@Nullable Void>, @NonNull Set<@NonNull CompletableFuture<@Nullable Void>>, @NonNull CompletableFuture<@Nullable Void>> allOfCollector = FutureCollector
.allOf();
// haComponents does not need to be synchronised -> the discovery thread is disabled
haComponents.values().parallelStream().map(AbstractComponent::stop) //
haComponents.values().stream().map(AbstractComponent::stop) //
// we need to join all the stops, otherwise they might not be done when start is called
.collect(FutureCollector.allOf()).join();
.collect(allOfCollector).join();

started = false;
}
Expand Down

0 comments on commit eb0c5c5

Please sign in to comment.