Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mqtt] Fix availability topics subscription after Brige Restart #9851

Merged
merged 1 commit into from
Dec 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package org.openhab.binding.mqtt.generic;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
Expand All @@ -23,12 +22,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.generic.utils.FutureCollector;
import org.openhab.binding.mqtt.generic.values.OnOffValue;
import org.openhab.binding.mqtt.generic.values.Value;
import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
Expand Down Expand Up @@ -195,19 +192,7 @@ public void bridgeStatusChanged(ThingStatusInfo bridgeStatusInfo) {
// We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived
// class.
try {
Collection<CompletableFuture<@Nullable Void>> futures = availabilityStates.values().stream().map(s -> {
if (s != null) {
return s.start(connection, scheduler, 0);
}
return CompletableFuture.allOf();
}).collect(Collectors.toList());

futures.add(start(connection));

futures.stream().collect(FutureCollector.allOf()).exceptionally(e -> {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getLocalizedMessage());
return null;
}).get(subscribeTimeout, TimeUnit.MILLISECONDS);
start(connection).get(subscribeTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ignored) {
Comment on lines +195 to 196
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer if you made sure that the offline status would include the exception message, just like it did before.

updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
"Did not receive all required topics");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public GenericMQTTThingHandler(Thing thing, MqttChannelStateDescriptionProvider
*/
@Override
protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
// availability topics are also started asynchronously, so no problem here
clearAllAvailabilityTopics();
initializeAvailabilityTopicsFromConfig();
return channelStateByChannelUID.values().stream().map(c -> c.start(connection, scheduler, 0))
.collect(FutureCollector.allOf()).thenRun(this::calculateThingStatus);
}
Expand Down Expand Up @@ -142,15 +145,7 @@ protected ChannelState createChannelState(ChannelConfig channelConfig, ChannelUI

@Override
public void initialize() {
GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class);

String availabilityTopic = config.availabilityTopic;

if (availabilityTopic != null) {
addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable);
} else {
clearAllAvailabilityTopics();
}
initializeAvailabilityTopicsFromConfig();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can tell, due to your changes in start above, topic subscription will occur automatically as part of the call to super.initialize(). So you don't need to call initializeAvailabilityTopicsFromConfig() here at all. In fact it might be better if you don't since that way config errors will prevent further initialization, as they should.

You should also make changes in AbstractMQTTThingHandler.bridgeStatusChanged to make sure that the handler isn't offline due to config errors before it tries initializing a connection.


List<ChannelUID> configErrors = new ArrayList<>();
for (Channel channel : thing.getChannels()) {
Expand Down Expand Up @@ -194,4 +189,16 @@ protected void updateThingStatus(boolean messageReceived, Optional<Boolean> avai
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
}
}

private void initializeAvailabilityTopicsFromConfig() {
GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class);

String availabilityTopic = config.availabilityTopic;

if (availabilityTopic != null) {
addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable);
} else {
clearAllAvailabilityTopics();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,16 @@ public void processMessage() {
verify(callback).stateUpdated(eq(textChannelUID), argThat(arg -> "UPDATE".equals(arg.toString())));
assertThat(textValue.getChannelState().toString(), is("UPDATE"));
}

@Test
public void handleBridgeStatusChange() {
Configuration config = new Configuration();
config.put("availabilityTopic", "test/LWT");
when(thing.getConfiguration()).thenReturn(config);
thingHandler.initialize();
thingHandler
.bridgeStatusChanged(new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null));
thingHandler.bridgeStatusChanged(new ThingStatusInfo(ThingStatus.ONLINE, ThingStatusDetail.NONE, null));
verify(connection, times(2)).subscribe(eq("test/LWT"), any());
}
}