Skip to content

Commit

Permalink
[mqtt] Fix avail topics subscription after Brige Restart
Browse files Browse the repository at this point in the history
Fixes #9850

Signed-off-by: Florian Albrecht <[email protected]>
  • Loading branch information
albrechtf committed Jan 17, 2021
1 parent de3cacc commit a956bce
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package org.openhab.binding.mqtt.generic;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
Expand All @@ -23,12 +22,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.generic.utils.FutureCollector;
import org.openhab.binding.mqtt.generic.values.OnOffValue;
import org.openhab.binding.mqtt.generic.values.Value;
import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
Expand Down Expand Up @@ -195,19 +192,7 @@ public void bridgeStatusChanged(ThingStatusInfo bridgeStatusInfo) {
// We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived
// class.
try {
Collection<CompletableFuture<@Nullable Void>> futures = availabilityStates.values().stream().map(s -> {
if (s != null) {
return s.start(connection, scheduler, 0);
}
return CompletableFuture.allOf();
}).collect(Collectors.toList());

futures.add(start(connection));

futures.stream().collect(FutureCollector.allOf()).exceptionally(e -> {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getLocalizedMessage());
return null;
}).get(subscribeTimeout, TimeUnit.MILLISECONDS);
start(connection).get(subscribeTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ignored) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
"Did not receive all required topics");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public GenericMQTTThingHandler(Thing thing, MqttChannelStateDescriptionProvider
*/
@Override
protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
// availability topics are also started asynchronously, so no problem here
clearAllAvailabilityTopics();
initializeAvailabilityTopicsFromConfig();
return channelStateByChannelUID.values().stream().map(c -> c.start(connection, scheduler, 0))
.collect(FutureCollector.allOf()).thenRun(this::calculateThingStatus);
}
Expand Down Expand Up @@ -142,15 +145,7 @@ protected ChannelState createChannelState(ChannelConfig channelConfig, ChannelUI

@Override
public void initialize() {
GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class);

String availabilityTopic = config.availabilityTopic;

if (availabilityTopic != null) {
addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable);
} else {
clearAllAvailabilityTopics();
}
initializeAvailabilityTopicsFromConfig();

List<ChannelUID> configErrors = new ArrayList<>();
for (Channel channel : thing.getChannels()) {
Expand Down Expand Up @@ -194,4 +189,16 @@ protected void updateThingStatus(boolean messageReceived, Optional<Boolean> avai
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
}
}

private void initializeAvailabilityTopicsFromConfig() {
GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class);

String availabilityTopic = config.availabilityTopic;

if (availabilityTopic != null) {
addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable);
} else {
clearAllAvailabilityTopics();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,16 @@ public void processMessage() {
verify(callback).stateUpdated(eq(textChannelUID), argThat(arg -> "UPDATE".equals(arg.toString())));
assertThat(textValue.getChannelState().toString(), is("UPDATE"));
}

@Test
public void handleBridgeStatusChange() {
Configuration config = new Configuration();
config.put("availabilityTopic", "test/LWT");
when(thing.getConfiguration()).thenReturn(config);
thingHandler.initialize();
thingHandler
.bridgeStatusChanged(new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null));
thingHandler.bridgeStatusChanged(new ThingStatusInfo(ThingStatus.ONLINE, ThingStatusDetail.NONE, null));
verify(connection, times(2)).subscribe(eq("test/LWT"), any());
}
}

0 comments on commit a956bce

Please sign in to comment.