Skip to content

Commit

Permalink
Merge pull request ReactiveX#231 from mairbek/multicast
Browse files Browse the repository at this point in the history
Multicast
  • Loading branch information
benjchristensen committed Apr 18, 2013
2 parents bd0c601 + 1cf92e3 commit 3eb5602
Show file tree
Hide file tree
Showing 6 changed files with 462 additions and 119 deletions.
29 changes: 28 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.operators.OperationAll;
import rx.operators.OperationConcat;
Expand All @@ -49,6 +50,7 @@
import rx.operators.OperationMerge;
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMostRecent;
import rx.operators.OperatorMulticast;
import rx.operators.OperationNext;
import rx.operators.OperationObserveOn;
import rx.operators.OperationOnErrorResumeNextViaFunction;
Expand All @@ -72,6 +74,7 @@
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.subjects.Subject;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
Expand Down Expand Up @@ -585,6 +588,17 @@ public void call(Object args) {
});
}

/**
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*
* @param subject the subject to push source elements into.
* @param <R> result type
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*/
public <R> ConnectableObservable<R> multicast(Subject<T, R> subject) {
return multicast(this, subject);
}

/**
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
*
Expand Down Expand Up @@ -2072,9 +2086,22 @@ public static <T> Iterable<T> mostRecent(Observable<T> source, T initialValue) {
return OperationMostRecent.mostRecent(source, initialValue);
}

/**
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*
* @param source the source sequence whose elements will be pushed into the specified subject.
* @param subject the subject to push source elements into.
* @param <T> source type
* @param <R> result type
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*/
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
return OperatorMulticast.multicast(source, subject);
}

/**
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
*
*
* @param that
* the source Observable
* @return The single element in the observable sequence.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observables;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;

public abstract class ConnectableObservable<T> extends Observable<T> {

protected ConnectableObservable(Func1<Observer<T>, Subscription> onSubscribe) {
super(onSubscribe);
}

public abstract Subscription connect();

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subjects.Subject;
import rx.subjects.DefaultSubject;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.AtomicObserver;
Expand Down Expand Up @@ -174,7 +174,7 @@ public Boolean call(Integer input)

@Test
public void testTakeWhileOnSubject1() {
Subject<Integer> s = Subject.create();
DefaultSubject<Integer> s = DefaultSubject.create();
Observable<Integer> w = (Observable<Integer>) s;
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>()
{
Expand Down
243 changes: 243 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorMulticast.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.subjects.DefaultSubject;
import rx.subjects.Subject;
import rx.util.functions.Func1;

import static org.mockito.Mockito.*;

public class OperatorMulticast {
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
return new MulticastConnectableObservable<T, R>(source, subject);
}

private static class MulticastConnectableObservable<T, R> extends ConnectableObservable<R> {
private final Object lock = new Object();

private final Observable<T> source;
private final Subject<T, R> subject;

private Subscription subscription;

public MulticastConnectableObservable(Observable<T> source, final Subject<T, R> subject) {
super(new Func1<Observer<R>, Subscription>() {
@Override
public Subscription call(Observer<R> observer) {
return subject.subscribe(observer);
}
});
this.source = source;
this.subject = subject;
}

public Subscription connect() {
synchronized (lock) {
if (subscription == null) {
subscription = source.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
subject.onCompleted();
}

@Override
public void onError(Exception e) {
subject.onError(e);
}

@Override
public void onNext(T args) {
subject.onNext(args);
}
});
}
}


return new Subscription() {
@Override
public void unsubscribe() {
synchronized (lock) {
if (subscription != null) {
subscription.unsubscribe();
subscription = null;
}
}
}
};
}


}

public static class UnitTest {

@Test
public void testMulticast() {
TestObservable source = new TestObservable();

ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
DefaultSubject.<String>create());

Observer<String> observer = mock(Observer.class);
multicasted.subscribe(observer);

source.sendOnNext("one");
source.sendOnNext("two");

multicasted.connect();

source.sendOnNext("three");
source.sendOnNext("four");
source.sendOnCompleted();

verify(observer, never()).onNext("one");
verify(observer, never()).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, times(1)).onNext("four");
verify(observer, times(1)).onCompleted();

}

@Test
public void testMulticastConnectTwice() {
TestObservable source = new TestObservable();

ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
DefaultSubject.<String>create());

Observer<String> observer = mock(Observer.class);
multicasted.subscribe(observer);

source.sendOnNext("one");

multicasted.connect();
multicasted.connect();

source.sendOnNext("two");
source.sendOnCompleted();

verify(observer, never()).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onCompleted();

}

@Test
public void testMulticastDisconnect() {
TestObservable source = new TestObservable();

ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
DefaultSubject.<String>create());

Observer<String> observer = mock(Observer.class);
multicasted.subscribe(observer);

source.sendOnNext("one");

Subscription connection = multicasted.connect();
source.sendOnNext("two");

connection.unsubscribe();
source.sendOnNext("three");

multicasted.connect();
source.sendOnNext("four");
source.sendOnCompleted();

verify(observer, never()).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, never()).onNext("three");
verify(observer, times(1)).onNext("four");
verify(observer, times(1)).onCompleted();

}


private static class TestObservable extends Observable<String> {

Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
// Do nothing
}

@Override
public void onError(Exception e) {
// Do nothing
}

@Override
public void onNext(String args) {
// Do nothing
}
};
Subscription s = new Subscription() {
@Override
public void unsubscribe() {
observer = new Observer<String>() {
@Override
public void onCompleted() {
// Do nothing
}

@Override
public void onError(Exception e) {
// Do nothing
}

@Override
public void onNext(String args) {
// Do nothing
}
};
}
};

public TestObservable() {
}

/* used to simulate subscription */
public void sendOnCompleted() {
observer.onCompleted();
}

/* used to simulate subscription */
public void sendOnNext(String value) {
observer.onNext(value);
}

/* used to simulate subscription */
public void sendOnError(Exception e) {
observer.onError(e);
}

@Override
public Subscription subscribe(final Observer<String> observer) {
this.observer = observer;
return s;
}

}

}
}
Loading

0 comments on commit 3eb5602

Please sign in to comment.