Skip to content

Commit

Permalink
Merge pull request ReactiveX#425 from benjchristensen/pull-407-refCount
Browse files Browse the repository at this point in the history
Manual Merge of Pull Request ReactiveX#407
  • Loading branch information
benjchristensen committed Oct 9, 2013
2 parents b530357 + 34a6e25 commit f2ec21b
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 1 deletion.
11 changes: 10 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.subjects.AsyncSubject;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
Expand Down Expand Up @@ -3657,6 +3658,14 @@ public ConnectableObservable<T> publish() {
return OperationMulticast.multicast(this, PublishSubject.<T> create());
}

/**
* Returns a {@link ConnectableObservable} that shares a single subscription that contains the last notification only.
* @return a {@link ConnectableObservable}
*/
public ConnectableObservable<T> publishLast() {
return OperationMulticast.multicast(this, AsyncSubject.<T> create());
}

/**
* Synonymous with <code>reduce()</code>.
* <p>
Expand Down Expand Up @@ -4414,7 +4423,7 @@ public Boolean call(T t) {
* For why this is being used see https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*
* NOTE: If strong reasons for not depending on package names comes up then the implementation of this method can change to looking for a marker interface.
*
*
* @param o
* @return {@code true} if the given function is an internal implementation, and {@code false} otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.operators.OperationRefCount;
import rx.util.functions.Func1;

/**
* A ConnectableObservable resembles an ordinary {@link Observable}, except that it does not begin
Expand Down Expand Up @@ -46,4 +48,12 @@ protected ConnectableObservable(OnSubscribeFunc<T> onSubscribe) {
*/
public abstract Subscription connect();

/**
* Returns an observable sequence that stays connected to the source as long
* as there is at least one subscription to the observable sequence.
* @return a {@link Observable}
*/
public Observable<T> refCount() {
return Observable.create(OperationRefCount.refCount(this));
}
}
66 changes: 66 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationRefCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

/**
* Returns an observable sequence that stays connected to the source as long
* as there is at least one subscription to the observable sequence.
*/
public final class OperationRefCount<T> {
public static <T> Observable.OnSubscribeFunc<T> refCount(ConnectableObservable<T> connectableObservable) {
return new RefCount<T>(connectableObservable);
}

private static class RefCount<T> implements Observable.OnSubscribeFunc<T> {
private final ConnectableObservable<T> innerConnectableObservable;
private final Object gate = new Object();
private int count = 0;
private Subscription connection = null;

public RefCount(ConnectableObservable<T> innerConnectableObservable) {
this.innerConnectableObservable = innerConnectableObservable;
}

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
final Subscription subscription = innerConnectableObservable.subscribe(observer);
synchronized (gate) {
if (count++ == 0) {
connection = innerConnectableObservable.connect();
}
}
return Subscriptions.create(new Action0() {
@Override
public void call() {
synchronized (gate) {
if (--count == 0) {
connection.unsubscribe();
connection = null;
}
}
subscription.unsubscribe();
}
});
}
}
}
42 changes: 42 additions & 0 deletions rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,48 @@ public void call(String v) {
}
}

@Test
public void testPublishLast() throws InterruptedException {
final AtomicInteger count = new AtomicInteger();
ConnectableObservable<String> connectable = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(final Observer<? super String> observer) {
count.incrementAndGet();
final BooleanSubscription subscription = new BooleanSubscription();
new Thread(new Runnable() {
@Override
public void run() {
observer.onNext("first");
observer.onNext("last");
observer.onCompleted();
}
}).start();
return subscription;
}
}).publishLast();

// subscribe once
final CountDownLatch latch = new CountDownLatch(1);
connectable.subscribe(new Action1<String>() {
@Override
public void call(String value) {
assertEquals("last", value);
latch.countDown();
}
});

// subscribe twice
connectable.subscribe(new Action1<String>() {
@Override
public void call(String _) {}
});

Subscription subscription = connectable.connect();
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
assertEquals(1, count.get());
subscription.unsubscribe();
}

@Test
public void testReplay() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
Expand Down
103 changes: 103 additions & 0 deletions rxjava-core/src/test/java/rx/RefCountTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package rx;

import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import rx.concurrency.TestScheduler;
import rx.util.functions.Action1;

public class RefCountTest {

@Test
public void testRefCount() {
TestScheduler s = new TestScheduler();
Observable<Long> interval = Observable.interval(100, TimeUnit.MILLISECONDS, s).publish().refCount();

// subscribe list1
final List<Long> list1 = new ArrayList<Long>();
Subscription s1 = interval.subscribe(new Action1<Long>() {

@Override
public void call(Long t1) {
list1.add(t1);
}

});
s.advanceTimeBy(200, TimeUnit.MILLISECONDS);

assertEquals(2, list1.size());
assertEquals(0L, list1.get(0).longValue());
assertEquals(1L, list1.get(1).longValue());

// subscribe list2
final List<Long> list2 = new ArrayList<Long>();
Subscription s2 = interval.subscribe(new Action1<Long>() {

@Override
public void call(Long t1) {
list2.add(t1);
}

});
s.advanceTimeBy(300, TimeUnit.MILLISECONDS);

// list 1 should have 5 items
assertEquals(5, list1.size());
assertEquals(2L, list1.get(2).longValue());
assertEquals(3L, list1.get(3).longValue());
assertEquals(4L, list1.get(4).longValue());

// list 2 should only have 3 items
assertEquals(3, list2.size());
assertEquals(2L, list2.get(0).longValue());
assertEquals(3L, list2.get(1).longValue());
assertEquals(4L, list2.get(2).longValue());

// unsubscribe list1
s1.unsubscribe();

// advance further
s.advanceTimeBy(300, TimeUnit.MILLISECONDS);

// list 1 should still have 5 items
assertEquals(5, list1.size());

// list 2 should have 6 items
assertEquals(6, list2.size());
assertEquals(5L, list2.get(3).longValue());
assertEquals(6L, list2.get(4).longValue());
assertEquals(7L, list2.get(5).longValue());

// unsubscribe list2
s2.unsubscribe();

// advance further
s.advanceTimeBy(1000, TimeUnit.MILLISECONDS);

// the following is not working as it seems the PublishSubject does not allow re-subscribing. TODO fix that in subsequent pull request


// // subscribing a new one should start over because the source should have been unsubscribed
// // subscribe list1
// final List<Long> list3 = new ArrayList<Long>();
// Subscription s3 = interval.subscribe(new Action1<Long>() {
//
// @Override
// public void call(Long t1) {
// list3.add(t1);
// }
//
// });
// s.advanceTimeBy(200, TimeUnit.MILLISECONDS);
//
// assertEquals(2, list3.size());
// assertEquals(0L, list3.get(0).longValue());
// assertEquals(1L, list3.get(1).longValue());

}
}
48 changes: 48 additions & 0 deletions rxjava-core/src/test/java/rx/RefCountTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package rx;

import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

public class RefCountTests {

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}

@Test
public void onlyFirstShouldSubscribeAndLastUnsubscribe() {
final AtomicInteger subscriptionCount = new AtomicInteger();
final AtomicInteger unsubscriptionCount = new AtomicInteger();
Observable<Integer> observable = Observable.create(new Observable.OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(Observer<? super Integer> observer) {
subscriptionCount.incrementAndGet();
return Subscriptions.create(new Action0() {
@Override
public void call() {
unsubscriptionCount.incrementAndGet();
}
});
}
});
Observable<Integer> refCounted = observable.publish().refCount();
Observer<Integer> observer = mock(Observer.class);
Subscription first = refCounted.subscribe(observer);
assertEquals(1, subscriptionCount.get());
Subscription second = refCounted.subscribe(observer);
assertEquals(1, subscriptionCount.get());
first.unsubscribe();
assertEquals(0, unsubscriptionCount.get());
second.unsubscribe();
assertEquals(1, unsubscriptionCount.get());
}
}

0 comments on commit f2ec21b

Please sign in to comment.