Skip to content

Commit

Permalink
Updated the MetricEventsPublisher contract to use Subscription instea…
Browse files Browse the repository at this point in the history
…d of a removeListener() method.
  • Loading branch information
Nitesh Kant committed Jun 3, 2014
1 parent 10276d7 commit 3abdab8
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public C build() {
if (null != eventListenersFactory) {
MetricEventsListener<? extends ClientMetricsEvent> listener =
newMetricsListener(eventListenersFactory, client);
client.addListener(listener);
client.subscribe(listener);
}
return client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.Subscription;

import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -196,12 +197,8 @@ protected PipelineConfigurator<O, I> adaptPipelineConfigurator(PipelineConfigura
}

@Override
public void addListener(MetricEventsListener<? extends ClientMetricsEvent> listener) {
eventsSubject.addListener(listener);
public Subscription subscribe(MetricEventsListener<? extends ClientMetricsEvent> listener) {
return eventsSubject.subscribe(listener);
}

@Override
public boolean removeListener(MetricEventsListener<? extends ClientMetricsEvent> listener) {
return eventsSubject.removeListener(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
*/
package io.reactivex.netty.metrics;

import rx.Subscription;

/**
* @author Nitesh Kant
*/
public interface MetricEventsPublisher<E extends MetricsEvent> {

void addListener(MetricEventsListener<? extends E> listener);
Subscription subscribe(MetricEventsListener<? extends E> listener);

boolean removeListener(MetricEventsListener<? extends E> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package io.reactivex.netty.metrics;

import rx.Subscription;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.CopyOnWriteArrayList;

Expand Down Expand Up @@ -123,13 +126,14 @@ public void onEvent(E event, Object value, long duration) {
}

@Override
public void addListener(MetricEventsListener<? extends E> listener) {
public Subscription subscribe(final MetricEventsListener<? extends E> listener) {
listeners.add(listener);
}

@Override
public boolean removeListener(MetricEventsListener<? extends E> listener) {
return listeners.remove(listener);
return Subscriptions.create(new Action0() {
@Override
public void call() {
listeners.remove(listener);
}
});
}

protected ListenerInvocationException handleListenerError(ListenerInvocationException exception,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.reactivex.netty.metrics.MetricEventsListenerFactory;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.server.ConnectionBasedServerBuilder;
import io.reactivex.netty.server.RxServer;

/**
* A convenience builder to create instances of {@link HttpServer}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfiguratorComposite;
import rx.Subscription;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand Down Expand Up @@ -156,13 +157,8 @@ public int getServerPort() {
}

@Override
public void addListener(MetricEventsListener<? extends ServerMetricsEvent> listener) {
eventsSubject.addListener(listener);
}

@Override
public boolean removeListener(MetricEventsListener<? extends ServerMetricsEvent> listener) {
return eventsSubject.removeListener(listener);
public Subscription subscribe(MetricEventsListener<? extends ServerMetricsEvent> listener) {
return eventsSubject.subscribe(listener);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public S build() {
if (null != eventListenersFactory) {
MetricEventsListener<? extends ServerMetricsEvent> listener = newMetricsListener(eventListenersFactory,
server);
server.addListener(listener);
server.subscribe(listener);
}
return server;
}
Expand Down

0 comments on commit 3abdab8

Please sign in to comment.