Skip to content

Commit

Permalink
[backport] avoid double subscriptions #612 (#624)
Browse files Browse the repository at this point in the history
* Fixed #610: Subscriptions not removed from session on unsubscribe (#612)

Co-authored-by: Hylke van der Schaaf <[email protected]>
  • Loading branch information
andsel and hylkevds authored Aug 8, 2021
1 parent b1c6682 commit 0d24147
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 4 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.15.1:
[fix] avoid double subscription (#612)
[leak] fixed buffer leak in processing of PUBREC for QoS2 flow (#609)

Version 0.12:
Expand Down
4 changes: 2 additions & 2 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ private MqttSubAckMessage doAckMessageFromValidateFilters(List<MqttTopicSubscrip

public void unsubscribe(List<String> topics, MQTTConnection mqttConnection, int messageId) {
final String clientID = mqttConnection.getClientId();
final Session session = sessionRegistry.retrieve(clientID);
for (String t : topics) {
Topic topic = new Topic(t);
boolean validTopic = topic.isValid();
Expand All @@ -147,8 +148,7 @@ public void unsubscribe(List<String> topics, MQTTConnection mqttConnection, int
LOG.trace("Removing subscription topic={}", topic);
subscriptions.removeSubscription(topic, clientID);

// TODO remove the subscriptions to Session
// clientSession.unsubscribeFrom(topic);
session.removeSubscription(topic);

String username = NettyUtils.userName(mqttConnection.channel);
interceptor.notifyTopicUnsubscribed(topic.toString(), clientID, username);
Expand Down
6 changes: 5 additions & 1 deletion broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ static final class Will {
private final Queue<SessionRegistry.EnqueuedMessage> sessionQueue;
private final AtomicReference<SessionStatus> status = new AtomicReference<>(SessionStatus.DISCONNECTED);
private MQTTConnection mqttConnection;
private List<Subscription> subscriptions = new ArrayList<>();
private final Set<Subscription> subscriptions = new HashSet<>();
private final Map<Integer, SessionRegistry.EnqueuedMessage> inflightWindow = new HashMap<>();
private final DelayQueue<InFlightPacket> inflightTimeouts = new DelayQueue<>();
private final Map<Integer, MqttPublishMessage> qos2Receiving = new HashMap<>();
Expand Down Expand Up @@ -150,6 +150,10 @@ public void addSubscriptions(List<Subscription> newSubscriptions) {
subscriptions.addAll(newSubscriptions);
}

public void removeSubscription(Topic topic) {
subscriptions.remove(new Subscription(clientId, topic, MqttQoS.EXACTLY_ONCE));
}

public boolean hasWill() {
return will != null;
}
Expand Down
17 changes: 16 additions & 1 deletion broker/src/test/java/io/moquette/broker/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@
import java.util.concurrent.ConcurrentLinkedQueue;

import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS;
import io.moquette.broker.subscriptions.Subscription;
import java.util.Arrays;
import org.assertj.core.api.Assertions;
import static org.junit.jupiter.api.Assertions.*;

public class SessionTest {

private static final String CLIENT_ID = "Subscriber";

private EmbeddedChannel testChannel;
private Session client;
private Queue<SessionRegistry.EnqueuedMessage> queuedMessages;
Expand All @@ -25,7 +30,7 @@ public class SessionTest {
public void setUp() {
testChannel = new EmbeddedChannel();
queuedMessages = new ConcurrentLinkedQueue<>();
client = new Session("Subscriber", true, null, queuedMessages);
client = new Session(CLIENT_ID, true, null, queuedMessages);
createConnection(client);
}

Expand Down Expand Up @@ -101,6 +106,16 @@ public void testSecondResendOfANotAckedMessage() throws InterruptedException {
ConnectionTestUtils.verifyReceivePublish(testChannel, destinationTopic.toString(), "Message not ACK-ed at first send!");
}

@Test
public void testRemoveSubscription() {
client.addSubscriptions(Arrays.asList(new Subscription(CLIENT_ID, new Topic("topic/one"), MqttQoS.AT_MOST_ONCE)));
Assertions.assertThat(client.getSubscriptions()).hasSize(1);
client.addSubscriptions(Arrays.asList(new Subscription(CLIENT_ID, new Topic("topic/one"), MqttQoS.EXACTLY_ONCE)));
Assertions.assertThat(client.getSubscriptions()).hasSize(1);
client.removeSubscription(new Topic("topic/one"));
Assertions.assertThat(client.getSubscriptions()).isEmpty();
}

private void createConnection(Session client) {
BrokerConfiguration brokerConfiguration = new BrokerConfiguration(true, false, false, false);
MQTTConnection mqttConnection = new MQTTConnection(testChannel, brokerConfiguration, null, null, null);
Expand Down

0 comments on commit 0d24147

Please sign in to comment.