Skip to content

Commit

Permalink
fix: prune subscriptions trie when topics are unsubscribed
Browse files Browse the repository at this point in the history
  • Loading branch information
saranyailla committed Nov 20, 2024
1 parent 2554a42 commit aaf4e42
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,57 @@ public boolean remove(String topic, K cb) {
* @return if changed after removal
*/
public boolean remove(String topic, Set<K> cbs) {
SubscriptionTrie<K> sub = lookup(topic);
if (sub == null) {
return false;
SubscriptionTrie<K> current = this;
SubscriptionTrie<K> prevNodeWithCallbacks = current;
String[] topicLevels = topic.split(TOPIC_LEVEL_SEPARATOR);
String prevTopicLevel = topicLevels[0];
for (String topicLevel : topicLevels) {
if (!current.get(topicLevel).isEmpty()) {
prevNodeWithCallbacks = current;
prevTopicLevel = topicLevel;
}
current = current.children.get(topicLevel);
}

if (!canRemoveNode(current)) {
boolean removedCallbacks = current.subscriptionCallbacks.removeAll(cbs);
// If topic is still a prefix of another registered topic in the trie, do not remove and return
if (!canRemoveNode(current)) {
return removedCallbacks;
}
}

// If the current topic is neither a prefix of another topic nor has callbacks, prune the trie such
// that all the topic levels of this topic that don't have children or callbacks are removed.
prevNodeWithCallbacks.children.remove(prevTopicLevel);
// since lastNodeWithCallbacks becomes a leaf, prune it and possibly the whole trie if it has no children or
// callbacks registered
if (canRemoveNode(prevNodeWithCallbacks)) {
pruneRecursively(topicLevels);
}
return true;
}

private boolean canRemoveNode(SubscriptionTrie<K> node) {
// returns false if the topic level is not prefix of another topic or has callbacks registered
return node.children.isEmpty() && node.subscriptionCallbacks.isEmpty();

}

void pruneRecursively(String... topicLevels) {
SubscriptionTrie<K> current = this;
SubscriptionTrie<K> prev = current;
for (int i = 0; i < topicLevels.length; i++) {
if (current == null) {
return; // nothing to prune
}
if (i > 0 && canRemoveNode(current)) {
prev.children.remove(topicLevels[i - 1]);
pruneRecursively(topicLevels);
}
prev = current;
current = current.children.get(topicLevels[i]);
}
return sub.subscriptionCallbacks.removeAll(cbs);
}

/**
Expand Down Expand Up @@ -183,3 +229,4 @@ public static boolean isWildcard(String topic) {
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,51 @@ public void GIVEN_subscription_wildcard_WHEN_remove_topic_THEN_no_matches() {
assertEquals(0, trie.size());
}

@Test
public void GIVEN_subscriptions_with_wildcards_WHEN_remove_topics_THEN_clean_up_trie() {
assertEquals(0, trie.size());
SubscriptionCallback cb1 = generateSubscriptionCallback();
SubscriptionCallback cb2 = generateSubscriptionCallback();
SubscriptionCallback cb3 = generateSubscriptionCallback();
String topic = "foo/+/bar/#";
trie.add("bar", cb1);
trie.add(topic, cb1);
trie.add(topic, cb2);
// Topic is not registered with the callback, mark it as removed when requested to remove
assertThat("remove topic", trie.remove(topic, cb3), is(false));
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2));

trie.add("foo/#", cb3);
trie.add("foo/+", cb2);

assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2, cb3));
assertEquals(5, trie.size());

assertThat("remove topic", trie.remove("foo/+", cb2), is(true));
assertEquals(4, trie.size());
assertThat(trie.get("foo/+"), containsInAnyOrder(cb3)); // foo/+ still matches with foo/#
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2, cb3)); // foo/+/bar/# still exists

assertThat("remove topic", trie.remove("foo/#", cb3), is(true));
assertFalse(trie.containsKey("foo/#"));
assertThat(trie.get("foo/#"), is(empty()));
assertThat(trie.get("foo/+"), is(empty())); // foo/+ doesn't match with any existing topic
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2)); // foo/+/bar/# still exists
assertEquals(3, trie.size());

assertThat("remove topic", trie.remove(topic, cb1), is(true));
assertThat(trie.get(topic), contains(cb2));
assertEquals(2, trie.size());
assertTrue(trie.containsKey("foo/+"));
assertTrue(trie.containsKey("foo/+/bar/#"));

assertThat("remove topic", trie.remove(topic, cb2), is(true));
assertThat(trie.get(topic), is(empty()));
assertEquals(1, trie.size());
assertFalse(trie.containsKey("foo/+"));
assertFalse(trie.containsKey("foo/+/bar/#"));
}

@Test
void GIVEN_topics_WHEN_isWildcard_THEN_returns_whether_it_uses_wildcard() {
assertTrue(SubscriptionTrie.isWildcard("+"));
Expand Down

0 comments on commit aaf4e42

Please sign in to comment.