Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

NC-1564: Only subscription owner can unsubscribe #36

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,26 @@ private void mapSubscriptionToConnection(final String connectionId, final Long s
}

public boolean unsubscribe(final UnsubscribeRequest request) {
LOG.debug("Unsubscribe request subscriptionId = {}", request.getSubscriptionId());
final Long subscriptionId = request.getSubscriptionId();
final String connectionId = request.getConnectionId();

if (!subscriptions.containsKey(request.getSubscriptionId())) {
throw new SubscriptionNotFoundException(request.getSubscriptionId());
LOG.debug("Unsubscribe request subscriptionId = {}", subscriptionId);

if (!subscriptions.containsKey(subscriptionId)
|| !connectionOwnsSubscription(subscriptionId, connectionId)) {
throw new SubscriptionNotFoundException(subscriptionId);
}

destroySubscription(request.getSubscriptionId(), request.getConnectionId());
destroySubscription(subscriptionId, connectionId);

return true;
}

private boolean connectionOwnsSubscription(Long subscriptionId, String connectionId) {
return connectionSubscriptionsMap.get(connectionId) != null
&& connectionSubscriptionsMap.get(connectionId).contains(subscriptionId);
}

private void destroySubscription(final long subscriptionId, final String connectionId) {
subscriptions.remove(subscriptionId);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.consensys.pantheon.ethereum.jsonrpc.websocket.subscription;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.hamcrest.CoreMatchers.both;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
Expand All @@ -13,6 +14,7 @@
import net.consensys.pantheon.ethereum.jsonrpc.websocket.subscription.syncing.SyncingSubscription;

import java.util.List;
import java.util.UUID;

import org.junit.Before;
import org.junit.Rule;
Expand All @@ -33,8 +35,7 @@ public void before() {

@Test
public void subscribeShouldCreateSubscription() {
final SubscribeRequest subscribeRequest =
new SubscribeRequest(SubscriptionType.SYNCING, null, null, CONNECTION_ID);
final SubscribeRequest subscribeRequest = subscribeRequest(CONNECTION_ID);

final Long subscriptionId = subscriptionManager.subscribe(subscribeRequest);

Expand All @@ -49,8 +50,7 @@ public void subscribeShouldCreateSubscription() {

@Test
public void unsubscribeExistingSubscriptionShouldDestroySubscription() {
final SubscribeRequest subscribeRequest =
new SubscribeRequest(SubscriptionType.SYNCING, null, null, CONNECTION_ID);
final SubscribeRequest subscribeRequest = subscribeRequest(CONNECTION_ID);
final Long subscriptionId = subscriptionManager.subscribe(subscribeRequest);

assertThat(subscriptionManager.subscriptions().get(subscriptionId)).isNotNull();
Expand All @@ -76,8 +76,7 @@ public void unsubscribeAbsentSubscriptionShouldThrowSubscriptionNotFoundExceptio

@Test
public void shouldAddSubscriptionToNewConnection() {
final SubscribeRequest subscribeRequest =
new SubscribeRequest(SubscriptionType.SYNCING, null, null, CONNECTION_ID);
final SubscribeRequest subscribeRequest = subscribeRequest(CONNECTION_ID);

subscriptionManager.subscribe(subscribeRequest);

Expand All @@ -90,8 +89,7 @@ public void shouldAddSubscriptionToNewConnection() {

@Test
public void shouldAddSubscriptionToExistingConnection() {
final SubscribeRequest subscribeRequest =
new SubscribeRequest(SubscriptionType.SYNCING, null, null, CONNECTION_ID);
final SubscribeRequest subscribeRequest = subscribeRequest(CONNECTION_ID);

subscriptionManager.subscribe(subscribeRequest);

Expand All @@ -110,8 +108,7 @@ public void shouldAddSubscriptionToExistingConnection() {

@Test
public void shouldRemoveSubscriptionFromExistingConnection() {
final SubscribeRequest subscribeRequest =
new SubscribeRequest(SubscriptionType.SYNCING, null, null, CONNECTION_ID);
final SubscribeRequest subscribeRequest = subscribeRequest(CONNECTION_ID);

final Long subscriptionId1 = subscriptionManager.subscribe(subscribeRequest);

Expand Down Expand Up @@ -140,8 +137,7 @@ public void shouldRemoveSubscriptionFromExistingConnection() {

@Test
public void shouldRemoveConnectionWithSingleSubscriptions() {
final SubscribeRequest subscribeRequest =
new SubscribeRequest(SubscriptionType.SYNCING, null, null, CONNECTION_ID);
final SubscribeRequest subscribeRequest = subscribeRequest(CONNECTION_ID);

final Long subscriptionId1 = subscriptionManager.subscribe(subscribeRequest);

Expand Down Expand Up @@ -188,17 +184,37 @@ public void getSubscriptionsOfWrongTypeReturnEmptyList() {
}

@Test
public void unsubscribeWithUnknownConnectionId() {
final SubscribeRequest subscribeRequestOne =
new SubscribeRequest(SubscriptionType.NEW_BLOCK_HEADERS, null, true, CONNECTION_ID);
final long subscriptionId = subscriptionManager.subscribe(subscribeRequestOne);
public void unsubscribeOthersSubscriptionsNotHavingOwnSubscriptionShouldReturnNotFound() {
SubscribeRequest subscribeRequest = subscribeRequest(CONNECTION_ID);
Long subscriptionId = subscriptionManager.subscribe(subscribeRequest);

final boolean success =
subscriptionManager.unsubscribe(
new UnsubscribeRequest(subscriptionId, "unknown-connection-id"));
UnsubscribeRequest unsubscribeRequest =
new UnsubscribeRequest(subscriptionId, UUID.randomUUID().toString());

assertThat(success).isTrue();
final Throwable thrown =
catchThrowable(() -> subscriptionManager.unsubscribe(unsubscribeRequest));
assertThat(thrown).isInstanceOf(SubscriptionNotFoundException.class);
}

// TODO vertx event bus testing for response
@Test
public void unsubscribeOthersSubscriptionsHavingOwnSubscriptionShouldReturnNotFound() {
String ownConnectionId = UUID.randomUUID().toString();
SubscribeRequest ownSubscribeRequest = subscribeRequest(ownConnectionId);
subscriptionManager.subscribe(ownSubscribeRequest);

String otherConnectionId = UUID.randomUUID().toString();
SubscribeRequest otherSubscribeRequest = subscribeRequest(otherConnectionId);
Long otherSubscriptionId = subscriptionManager.subscribe(otherSubscribeRequest);

UnsubscribeRequest unsubscribeRequest =
new UnsubscribeRequest(otherSubscriptionId, ownConnectionId);

final Throwable thrown =
catchThrowable(() -> subscriptionManager.unsubscribe(unsubscribeRequest));
assertThat(thrown).isInstanceOf(SubscriptionNotFoundException.class);
}

private SubscribeRequest subscribeRequest(String connectionId) {
return new SubscribeRequest(SubscriptionType.SYNCING, null, null, connectionId);
}
}