Skip to content

Commit

Permalink
Merge pull request #1802 from simonbasle/hasObservers
Browse files Browse the repository at this point in the history
add hasObservers method to Subjects (#1772)
  • Loading branch information
benjchristensen committed Oct 28, 2014
2 parents 7924a3e + 8690b18 commit e7fa614
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,14 @@ public void onNext(T t) {
emit(state.nl.next(t));
}
}


@Override
public boolean hasObservers() {
synchronized (state.guard) {
return state.observerRef != null;
}
}

@SuppressWarnings("rawtypes")
private final static Observer EMPTY_OBSERVER = new Observer() {

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/rx/internal/operators/OperatorReplay.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,9 @@ public void onCompleted() {
subject.onCompleted();
}

@Override
public boolean hasObservers() {
return this.subject.hasObservers();
}
}
}
4 changes: 4 additions & 0 deletions src/main/java/rx/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,8 @@ public void onNext(T v) {
lastValue = nl.next(v);
}

@Override
public boolean hasObservers() {
return state.observers().length > 0;
}
}
7 changes: 6 additions & 1 deletion src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,13 @@ public void onNext(T v) {
}
}
}

/* test support */ int subscriberCount() {
return state.observers().length;
}

@Override
public boolean hasObservers() {
return state.observers().length > 0;
}
}
5 changes: 5 additions & 0 deletions src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,9 @@ public void onNext(T v) {
bo.onNext(v);
}
}

@Override
public boolean hasObservers() {
return state.observers().length > 0;
}
}
7 changes: 6 additions & 1 deletion src/main/java/rx/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,12 @@ public void onCompleted() {
/* Support test. */int subscriberCount() {
return ssm.state.observers.length;
}


@Override
public boolean hasObservers() {
return ssm.observers().length > 0;
}

private boolean caughtUp(SubjectObserver<? super T> o) {
if (!o.caughtUp) {
o.caughtUp = true;
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/rx/subjects/SerializedSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
*/
public class SerializedSubject<T, R> extends Subject<T, R> {
private final SerializedObserver<T> observer;
private final Subject<T, R> actual;

public SerializedSubject(final Subject<T, R> actual) {
super(new OnSubscribe<R>() {
Expand All @@ -44,6 +45,7 @@ public void call(Subscriber<? super R> child) {
}

});
this.actual = actual;
this.observer = new SerializedObserver<T>(actual);
}

Expand All @@ -62,4 +64,8 @@ public void onNext(T t) {
observer.onNext(t);
}

@Override
public boolean hasObservers() {
return actual.hasObservers();
}
}
6 changes: 6 additions & 0 deletions src/main/java/rx/subjects/Subject.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,10 @@ public abstract class Subject<T, R> extends Observable<R> implements Observer<T>
protected Subject(OnSubscribe<R> onSubscribe) {
super(onSubscribe);
}

/**
* Indicates whether the {@link Subject} has {@link Observer Observers} subscribed to it.
* @return true if there is at least one Observer subscribed to this Subject, false otherwise
*/
public abstract boolean hasObservers();
}
11 changes: 8 additions & 3 deletions src/main/java/rx/subjects/TestSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private void _onCompleted() {

/**
* Schedule a call to {@code onCompleted} relative to "now()" +n milliseconds in the future.
*
*
* @param timeInMilliseconds
* the number of milliseconds in the future relative to "now()" at which to call {@code onCompleted}
*/
Expand Down Expand Up @@ -119,7 +119,7 @@ private void _onError(final Throwable e) {

/**
* Schedule a call to {@code onError} relative to "now()" +n milliseconds in the future.
*
*
* @param e
* the {@code Throwable} to pass to the {@code onError} method
* @param timeInMilliseconds
Expand Down Expand Up @@ -152,7 +152,7 @@ private void _onNext(T v) {

/**
* Schedule a call to {@code onNext} relative to "now()" +n milliseconds in the future.
*
*
* @param v
* the item to emit
* @param timeInMilliseconds
Expand All @@ -168,4 +168,9 @@ public void call() {

}, timeInMilliseconds, TimeUnit.MILLISECONDS);
}

@Override
public boolean hasObservers() {
return state.observers().length > 0;
}
}
2 changes: 2 additions & 0 deletions src/test/java/rx/subjects/BehaviorSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.subjects;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
Expand Down Expand Up @@ -370,6 +371,7 @@ public void testTakeOneSubscriber() {
verify(o, never()).onError(any(Throwable.class));

assertEquals(0, source.subscriberCount());
assertFalse(source.hasObservers());
}

@Test
Expand Down

0 comments on commit e7fa614

Please sign in to comment.