Skip to content

Commit

Permalink
Operator refCount
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Aug 29, 2015
1 parent 3e7bb6e commit cd81369
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1814,4 +1814,5 @@ public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler,
public final BlockingObservable<T> toBlocking() {
return BlockingObservable.from(this);
}

}
197 changes: 197 additions & 0 deletions src/main/java/io/reactivex/internal/operators/PublisherRefCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators;

import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import org.reactivestreams.*;

import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.SetCompositeResource;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.observables.ConnectableObservable;

/**
* Returns an observable sequence that stays connected to the source as long as
* there is at least one subscription to the observable sequence.
*
* @param <T>
* the value type
*/
public final class PublisherRefCount<T> implements Publisher<T> {

final class ConnectionSubscriber implements Subscriber<T>, Subscription {
final Subscriber<? super T> subscriber;
final SetCompositeResource<Disposable> currentBase;
final Disposable resource;

Subscription s;

private ConnectionSubscriber(Subscriber<? super T> subscriber,
SetCompositeResource<Disposable> currentBase, Disposable resource) {
this.subscriber = subscriber;
this.currentBase = currentBase;
this.resource = resource;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validateSubscription(this.s, s)) {
return;
}
this.s = s;
subscriber.onSubscribe(this);
}

@Override
public void onError(Throwable e) {
cleanup();
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void onComplete() {
cleanup();
subscriber.onComplete();
}

@Override
public void request(long n) {
s.request(n);
}

@Override
public void cancel() {
s.cancel();
resource.dispose();
}

void cleanup() {
// on error or completion we need to unsubscribe the base subscription
// and set the subscriptionCount to 0
lock.lock();
try {
if (baseSubscription == currentBase) {
baseSubscription.dispose();
baseSubscription = new SetCompositeResource<>(Disposable::dispose);
subscriptionCount.set(0);
}
} finally {
lock.unlock();
}
}
}

final ConnectableObservable<? extends T> source;
volatile SetCompositeResource<Disposable> baseSubscription = new SetCompositeResource<>(Disposable::dispose);
final AtomicInteger subscriptionCount = new AtomicInteger(0);

/**
* Use this lock for every subscription and disconnect action.
*/
final ReentrantLock lock = new ReentrantLock();

/**
* Constructor.
*
* @param source
* observable to apply ref count to
*/
public PublisherRefCount(ConnectableObservable<? extends T> source) {
this.source = source;
}

@Override
public void subscribe(final Subscriber<? super T> subscriber) {

lock.lock();
if (subscriptionCount.incrementAndGet() == 1) {

final AtomicBoolean writeLocked = new AtomicBoolean(true);

try {
// need to use this overload of connect to ensure that
// baseSubscription is set in the case that source is a
// synchronous Observable
source.connect(onSubscribe(subscriber, writeLocked));
} finally {
// need to cover the case where the source is subscribed to
// outside of this class thus preventing the Action1 passed
// to source.connect above being called
if (writeLocked.get()) {
// Action1 passed to source.connect was not called
lock.unlock();
}
}
} else {
try {
// ready to subscribe to source so do it
doSubscribe(subscriber, baseSubscription);
} finally {
// release the read lock
lock.unlock();
}
}

}

private Consumer<Disposable> onSubscribe(final Subscriber<? super T> subscriber,
final AtomicBoolean writeLocked) {
return subscription -> {
try {
baseSubscription.add(subscription);
// ready to subscribe to source so do it
doSubscribe(subscriber, baseSubscription);
} finally {
// release the write lock
lock.unlock();
writeLocked.set(false);
}
};
}

void doSubscribe(final Subscriber<? super T> subscriber, final SetCompositeResource<Disposable> currentBase) {
// handle unsubscribing from the base subscription
Disposable d = disconnect(currentBase);

ConnectionSubscriber s = new ConnectionSubscriber(subscriber, currentBase, d);

source.unsafeSubscribe(s);
}

private Disposable disconnect(final SetCompositeResource<Disposable> current) {
return () -> {
lock.lock();
try {
if (baseSubscription == current) {
if (subscriptionCount.decrementAndGet() == 0) {
baseSubscription.dispose();
// need a new baseSubscription because once
// unsubscribed stays that way
baseSubscription = new SetCompositeResource<>(Disposable::dispose);
}
}
} finally {
lock.unlock();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.operators.PublisherAutoConnect;
import io.reactivex.internal.operators.*;

/**
* A {@code ConnectableObservable} resembles an ordinary {@link Observable}, except that it does not begin
Expand Down Expand Up @@ -74,9 +74,7 @@ public final Disposable connect() {
* @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX documentation: RefCount</a>
*/
public Observable<T> refCount() {
// TODO implement RefCount
// return create(new PublisherRefCount<T>(this));
throw new UnsupportedOperationException();
return create(new PublisherRefCount<>(this));
}

/**
Expand Down

0 comments on commit cd81369

Please sign in to comment.