From a956bce44c7976a71a28033b7a2e4bbb6bf1d27a Mon Sep 17 00:00:00 2001 From: Florian Albrecht <11006895+albrechtf@users.noreply.github.com> Date: Sun, 17 Jan 2021 15:11:46 +0100 Subject: [PATCH] [mqtt] Fix avail topics subscription after Brige Restart Fixes #9850 Signed-off-by: Florian Albrecht --- .../generic/AbstractMQTTThingHandler.java | 17 +------------ .../handler/GenericMQTTThingHandler.java | 25 ++++++++++++------- .../handler/GenericThingHandlerTests.java | 12 +++++++++ 3 files changed, 29 insertions(+), 25 deletions(-) diff --git a/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java b/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java index 1c11e462857a2..14ecdc8510dd2 100644 --- a/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java +++ b/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java @@ -12,7 +12,6 @@ */ package org.openhab.binding.mqtt.generic; -import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Optional; @@ -23,12 +22,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; -import org.openhab.binding.mqtt.generic.utils.FutureCollector; import org.openhab.binding.mqtt.generic.values.OnOffValue; import org.openhab.binding.mqtt.generic.values.Value; import org.openhab.binding.mqtt.handler.AbstractBrokerHandler; @@ -195,19 +192,7 @@ public void bridgeStatusChanged(ThingStatusInfo bridgeStatusInfo) { // We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived // class. try { - Collection> futures = availabilityStates.values().stream().map(s -> { - if (s != null) { - return s.start(connection, scheduler, 0); - } - return CompletableFuture.allOf(); - }).collect(Collectors.toList()); - - futures.add(start(connection)); - - futures.stream().collect(FutureCollector.allOf()).exceptionally(e -> { - updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getLocalizedMessage()); - return null; - }).get(subscribeTimeout, TimeUnit.MILLISECONDS); + start(connection).get(subscribeTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException ignored) { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Did not receive all required topics"); diff --git a/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/internal/handler/GenericMQTTThingHandler.java b/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/internal/handler/GenericMQTTThingHandler.java index 3a991d9471f16..01a61be84e587 100644 --- a/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/internal/handler/GenericMQTTThingHandler.java +++ b/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/internal/handler/GenericMQTTThingHandler.java @@ -85,6 +85,9 @@ public GenericMQTTThingHandler(Thing thing, MqttChannelStateDescriptionProvider */ @Override protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) { + // availability topics are also started asynchronously, so no problem here + clearAllAvailabilityTopics(); + initializeAvailabilityTopicsFromConfig(); return channelStateByChannelUID.values().stream().map(c -> c.start(connection, scheduler, 0)) .collect(FutureCollector.allOf()).thenRun(this::calculateThingStatus); } @@ -142,15 +145,7 @@ protected ChannelState createChannelState(ChannelConfig channelConfig, ChannelUI @Override public void initialize() { - GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class); - - String availabilityTopic = config.availabilityTopic; - - if (availabilityTopic != null) { - addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable); - } else { - clearAllAvailabilityTopics(); - } + initializeAvailabilityTopicsFromConfig(); List configErrors = new ArrayList<>(); for (Channel channel : thing.getChannels()) { @@ -194,4 +189,16 @@ protected void updateThingStatus(boolean messageReceived, Optional avai updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE); } } + + private void initializeAvailabilityTopicsFromConfig() { + GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class); + + String availabilityTopic = config.availabilityTopic; + + if (availabilityTopic != null) { + addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable); + } else { + clearAllAvailabilityTopics(); + } + } } diff --git a/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java b/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java index 13cf9946e686c..d9f41464ae229 100644 --- a/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java +++ b/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java @@ -192,4 +192,16 @@ public void processMessage() { verify(callback).stateUpdated(eq(textChannelUID), argThat(arg -> "UPDATE".equals(arg.toString()))); assertThat(textValue.getChannelState().toString(), is("UPDATE")); } + + @Test + public void handleBridgeStatusChange() { + Configuration config = new Configuration(); + config.put("availabilityTopic", "test/LWT"); + when(thing.getConfiguration()).thenReturn(config); + thingHandler.initialize(); + thingHandler + .bridgeStatusChanged(new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null)); + thingHandler.bridgeStatusChanged(new ThingStatusInfo(ThingStatus.ONLINE, ThingStatusDetail.NONE, null)); + verify(connection, times(2)).subscribe(eq("test/LWT"), any()); + } }