Skip to content

Commit

Permalink
[mqtt] Discovery services shall not unsubscribe unless they have alre…
Browse files Browse the repository at this point in the history
…ady subscribed (openhab#10566)

* [mqqt] do not allow unsubscribe unless already subscribed

Signed-off-by: Andrew Fiddian-Green <[email protected]>
Signed-off-by: Dave J Schoepel <[email protected]>
  • Loading branch information
andrewfg authored and dschoepel committed Nov 9, 2021
1 parent dbb428b commit 7356585
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
Expand Down Expand Up @@ -48,11 +49,32 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp

private @Nullable ScheduledFuture<?> scheduledStop;

private AtomicBoolean isSubscribed;

public AbstractMQTTDiscovery(@Nullable Set<ThingTypeUID> supportedThingTypes, int timeout,
boolean backgroundDiscoveryEnabledByDefault, String baseTopic) {
super(supportedThingTypes, 0, backgroundDiscoveryEnabledByDefault);
this.subscribeTopic = baseTopic;
this.timeout = timeout;
isSubscribed = new AtomicBoolean(false);
}

/**
* Only subscribe if we were not already subscribed
*/
private void subscribe() {
if (!isSubscribed.getAndSet(true)) {
getDiscoveryService().subscribe(this, subscribeTopic);
}
}

/**
* Only unsubscribe if we were already subscribed
*/
private void unSubscribe() {
if (isSubscribed.getAndSet(false)) {
getDiscoveryService().unsubscribe(this);
}
}

/**
Expand Down Expand Up @@ -94,7 +116,7 @@ protected void startScan() {
return;
}
resetTimeout();
getDiscoveryService().subscribe(this, subscribeTopic);
subscribe();
}

@Override
Expand All @@ -104,7 +126,7 @@ protected synchronized void stopScan() {
return;
}
stopTimeout();
getDiscoveryService().unsubscribe(this);
unSubscribe();
super.stopScan();
}

Expand All @@ -118,11 +140,11 @@ public synchronized void abortScan() {
protected void startBackgroundDiscovery() {
// Remove results that are restored after a restart
removeOlderResults(new Date().getTime());
getDiscoveryService().subscribe(this, subscribeTopic);
subscribe();
}

@Override
protected void stopBackgroundDiscovery() {
getDiscoveryService().unsubscribe(this);
unSubscribe();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
*/
package org.openhab.binding.mqtt.internal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -54,11 +52,22 @@
@Component(service = { ThingHandlerFactory.class,
MQTTTopicDiscoveryService.class }, configurationPid = "MqttBrokerHandlerFactory")
public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements MQTTTopicDiscoveryService {

private static final Set<ThingTypeUID> SUPPORTED_THING_TYPES_UIDS = Stream
.of(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, MqttBindingConstants.BRIDGE_TYPE_BROKER)
.collect(Collectors.toSet());

private final Logger logger = LoggerFactory.getLogger(MqttBrokerHandlerFactory.class);
protected final Map<String, List<MQTTTopicDiscoveryParticipant>> discoveryTopics = new HashMap<>();

/**
* This Map provides a lookup between a Topic string (key) and a Set of MQTTTopicDiscoveryParticipants (value),
* where the Set itself is a list of participants which are subscribed to the respective Topic.
*/
protected final Map<String, Set<MQTTTopicDiscoveryParticipant>> discoveryTopics = new ConcurrentHashMap<>();

/**
* This Set contains a list of all the Broker handlers that have been created by this factory
*/
protected final Set<AbstractBrokerHandler> handlers = Collections
.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));

Expand All @@ -75,12 +84,13 @@ public boolean supportsThingType(ThingTypeUID thingTypeUID) {
}

/**
* Add the given broker connection to all listeners.
* Add the given broker handler to the list of known handlers. And then iterate over all topics and their respective
* list of listeners, and register the respective new listener and topic with the given new broker handler.
*/
protected void createdHandler(AbstractBrokerHandler handler) {
handlers.add(handler);
discoveryTopics.forEach((topic, listenerList) -> {
listenerList.forEach(listener -> {
discoveryTopics.forEach((topic, listeners) -> {
listeners.forEach(listener -> {
handler.registerDiscoveryListener(listener, topic);
});
});
Expand Down Expand Up @@ -111,24 +121,33 @@ protected void createdHandler(AbstractBrokerHandler handler) {
/**
* This factory also implements {@link MQTTTopicDiscoveryService} so consumers can subscribe to
* a MQTT topic that is registered on all available broker connections.
*
* Checks each topic, and if the listener is not already in the listener list for that topic, adds itself from that
* list, and registers itself and the respective topic with all the known brokers.
*/
@Override
@SuppressWarnings("null")
public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) {
List<MQTTTopicDiscoveryParticipant> listenerList = discoveryTopics.computeIfAbsent(topic,
t -> new ArrayList<>());
listenerList.add(listener);
handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
Set<MQTTTopicDiscoveryParticipant> listeners = discoveryTopics.computeIfAbsent(topic,
t -> ConcurrentHashMap.newKeySet());
if (listeners.add(listener)) {
handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
}
}

/**
* Unsubscribe a listener from all available broker connections.
* This factory also implements {@link MQTTTopicDiscoveryService} so consumers can unsubscribe from
* a MQTT topic that is registered on all available broker connections.
*
* Checks each topic, and if the listener is in the listener list for that topic, removes itself from that list, and
* unregisters itself and the respective topic from all the known brokers.
*/
@Override
@SuppressWarnings("null")
public void unsubscribe(MQTTTopicDiscoveryParticipant listener) {
discoveryTopics.forEach((topic, listenerList) -> {
listenerList.remove(listener);
handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
discoveryTopics.forEach((topic, listeners) -> {
if (listeners.remove(listener)) {
handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
}
});
}

Expand Down

0 comments on commit 7356585

Please sign in to comment.