From 071aa4c38c4cd035642610757400b37ed26f025a Mon Sep 17 00:00:00 2001 From: Aaron Tull Date: Mon, 4 Aug 2014 11:23:26 -0700 Subject: [PATCH] Adding backpressure to OnSubscribeRedo --- .../internal/operators/OnSubscribeRedo.java | 62 ++++++++++++++++--- .../internal/operators/OperatorRetryTest.java | 1 - 2 files changed, 52 insertions(+), 11 deletions(-) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRedo.java b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRedo.java index 9947dd1341..5b1099693e 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRedo.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRedo.java @@ -34,11 +34,15 @@ import static rx.Observable.create; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import rx.Notification; import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observable.Operator; +import rx.Producer; import rx.Scheduler; import rx.Subscriber; import rx.functions.Action0; @@ -154,16 +158,20 @@ public static Observable redo(Observable source, Func1 source; - private final Func1>, ? extends Observable> f; + private final Func1>, ? extends Observable> controlHandlerFunction; private boolean stopOnComplete; private boolean stopOnError; private final Scheduler scheduler; private final AtomicBoolean isLocked = new AtomicBoolean(true); + private final AtomicBoolean isStarted = new AtomicBoolean(false); + // incremented when requests are made, decremented when requests are fulfilled + private final AtomicLong consumerCapacity = new AtomicLong(0l); + private final AtomicReference currentProducer = new AtomicReference(); private OnSubscribeRedo(Observable source, Func1>, ? extends Observable> f, boolean stopOnComplete, boolean stopOnError, Scheduler scheduler) { this.source = source; - this.f = f; + this.controlHandlerFunction = f; this.stopOnComplete = stopOnComplete; this.stopOnError = stopOnError; this.scheduler = scheduler; @@ -171,15 +179,19 @@ private OnSubscribeRedo(Observable source, Func1 child) { + isStarted.set(false); + isLocked.set(true); + consumerCapacity.set(0l); + currentProducer.set(null); + final Scheduler.Worker inner = scheduler.createWorker(); child.add(inner); final CompositeSubscription sourceSubscriptions = new CompositeSubscription(); child.add(sourceSubscriptions); - + final PublishSubject> terminals = PublishSubject.create(); - final Action0 subscribeToSource = new Action0() { @Override public void call() { @@ -198,8 +210,15 @@ public void onError(Throwable e) { @Override public void onNext(T v) { + consumerCapacity.decrementAndGet(); child.onNext(v); } + + @Override + public void setProducer(Producer producer) { + currentProducer.set(producer); + producer.request(consumerCapacity.get()); + } }; // new subscription each time so if it unsubscribes itself it does not prevent retries // by unsubscribing the child subscription @@ -208,8 +227,10 @@ public void onNext(T v) { } }; - final Observable restarts = f.call( - // lifting in a custom operator to kind of do a merge/map/filter thing. + // the observable received by the control handler function will receive notifications of onCompleted in the case of 'repeat' + // type operators or notifications of onError for 'retry' this is done by lifting in a custom operator to selectively divert + // the retry/repeat relevant values to the control handler + final Observable restarts = controlHandlerFunction.call( terminals.lift(new Operator, Notification>() { @Override public Subscriber> call(final Subscriber> filteredTerminals) { @@ -233,6 +254,11 @@ public void onNext(Notification t) { filteredTerminals.onNext(t); } } + + @Override + public void setProducer(Producer producer) { + producer.request(Long.MAX_VALUE); + } }; } })); @@ -255,15 +281,31 @@ public void onError(Throwable e) { @Override public void onNext(Object t) { if (!isLocked.get() && !child.isUnsubscribed()) { - // if (!child.isUnsubscribed()) { child.add(inner.schedule(subscribeToSource)); } } + + @Override + public void setProducer(Producer producer) { + producer.request(Long.MAX_VALUE); + } }); } })); - if (!child.isUnsubscribed()) { - child.add(inner.schedule(subscribeToSource)); - } + + child.setProducer(new Producer() { + + @Override + public void request(long n) { + if (isStarted.compareAndSet(false, true)) { + consumerCapacity.set(n); + if (!child.isUnsubscribed()) child.add(inner.schedule(subscribeToSource)); + } else if (currentProducer.get() != null) { + consumerCapacity.getAndAdd(n); + currentProducer.get().request(n); + } + } + }); + } } diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorRetryTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorRetryTest.java index 5cae6f9055..4bb789a2fc 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorRetryTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorRetryTest.java @@ -37,7 +37,6 @@ import rx.Observable.OnSubscribe; import rx.Notification; import rx.Observer; -import rx.Scheduler.Worker; import rx.Subscriber; import rx.Subscription; import rx.functions.Action0;