Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding backpressure to OnSubscribeRedo #1548

Merged
merged 1 commit into from
Aug 11, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@
import static rx.Observable.create;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
Expand Down Expand Up @@ -154,32 +158,40 @@ public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observa
}

private Observable<T> source;
private final Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f;
private final Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> controlHandlerFunction;
private boolean stopOnComplete;
private boolean stopOnError;
private final Scheduler scheduler;
private final AtomicBoolean isLocked = new AtomicBoolean(true);
private final AtomicBoolean isStarted = new AtomicBoolean(false);
// incremented when requests are made, decremented when requests are fulfilled
private final AtomicLong consumerCapacity = new AtomicLong(0l);
private final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();

private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,
Scheduler scheduler) {
this.source = source;
this.f = f;
this.controlHandlerFunction = f;
this.stopOnComplete = stopOnComplete;
this.stopOnError = stopOnError;
this.scheduler = scheduler;
}

@Override
public void call(final Subscriber<? super T> child) {
isStarted.set(false);
isLocked.set(true);
consumerCapacity.set(0l);
currentProducer.set(null);

final Scheduler.Worker inner = scheduler.createWorker();
child.add(inner);

final CompositeSubscription sourceSubscriptions = new CompositeSubscription();
child.add(sourceSubscriptions);

final PublishSubject<Notification<?>> terminals = PublishSubject.create();


final Action0 subscribeToSource = new Action0() {
@Override
public void call() {
Expand All @@ -198,8 +210,15 @@ public void onError(Throwable e) {

@Override
public void onNext(T v) {
consumerCapacity.decrementAndGet();
child.onNext(v);
}

@Override
public void setProducer(Producer producer) {
currentProducer.set(producer);
producer.request(consumerCapacity.get());
}
};
// new subscription each time so if it unsubscribes itself it does not prevent retries
// by unsubscribing the child subscription
Expand All @@ -208,8 +227,10 @@ public void onNext(T v) {
}
};

final Observable<?> restarts = f.call(
// lifting in a custom operator to kind of do a merge/map/filter thing.
// the observable received by the control handler function will receive notifications of onCompleted in the case of 'repeat'
// type operators or notifications of onError for 'retry' this is done by lifting in a custom operator to selectively divert
// the retry/repeat relevant values to the control handler
final Observable<?> restarts = controlHandlerFunction.call(
terminals.lift(new Operator<Notification<?>, Notification<?>>() {
@Override
public Subscriber<? super Notification<?>> call(final Subscriber<? super Notification<?>> filteredTerminals) {
Expand All @@ -233,6 +254,11 @@ public void onNext(Notification<?> t) {
filteredTerminals.onNext(t);
}
}

@Override
public void setProducer(Producer producer) {
producer.request(Long.MAX_VALUE);
}
};
}
}));
Expand All @@ -255,15 +281,31 @@ public void onError(Throwable e) {
@Override
public void onNext(Object t) {
if (!isLocked.get() && !child.isUnsubscribed()) {
// if (!child.isUnsubscribed()) {
child.add(inner.schedule(subscribeToSource));
}
}

@Override
public void setProducer(Producer producer) {
producer.request(Long.MAX_VALUE);
}
});
}
}));
if (!child.isUnsubscribed()) {
child.add(inner.schedule(subscribeToSource));
}

child.setProducer(new Producer() {

@Override
public void request(long n) {
if (isStarted.compareAndSet(false, true)) {
consumerCapacity.set(n);
if (!child.isUnsubscribed()) child.add(inner.schedule(subscribeToSource));
} else if (currentProducer.get() != null) {
consumerCapacity.getAndAdd(n);
currentProducer.get().request(n);
}
}
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import rx.Observable.OnSubscribe;
import rx.Notification;
import rx.Observer;
import rx.Scheduler.Worker;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
Expand Down