Skip to content

Commit

Permalink
Merge pull request #2996 from davidmoten/switch-if-empty-request-bug
Browse files Browse the repository at this point in the history
switchIfEmpty - fix backpressure bug and lost requests
  • Loading branch information
akarnokd committed Jun 1, 2015
2 parents 4b58a87 + c4fcd91 commit 53a0204
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 53 deletions.
101 changes: 49 additions & 52 deletions src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicLong;

import rx.*;
import rx.internal.producers.ProducerArbiter;
import rx.subscriptions.SerialSubscription;

/**
Expand All @@ -35,36 +35,32 @@ public OperatorSwitchIfEmpty(Observable<? extends T> alternate) {
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
final SerialSubscription ssub = new SerialSubscription();
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child, ssub);
ProducerArbiter arbiter = new ProducerArbiter();
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child, ssub, arbiter, alternate);
ssub.set(parent);
child.add(ssub);
child.setProducer(arbiter);
return parent;
}

private class SwitchIfEmptySubscriber extends Subscriber<T> {

boolean empty = true;
final AtomicLong consumerCapacity = new AtomicLong(0l);
private static final class ParentSubscriber<T> extends Subscriber<T> {

private boolean empty = true;
private final Subscriber<? super T> child;
final SerialSubscription ssub;
private final SerialSubscription ssub;
private final ProducerArbiter arbiter;
private final Observable<? extends T> alternate;

public SwitchIfEmptySubscriber(Subscriber<? super T> child, final SerialSubscription ssub) {
ParentSubscriber(Subscriber<? super T> child, final SerialSubscription ssub, ProducerArbiter arbiter, Observable<? extends T> alternate) {
this.child = child;
this.ssub = ssub;
this.arbiter = arbiter;
this.alternate = alternate;
}

@Override
public void setProducer(final Producer producer) {
super.setProducer(new Producer() {
@Override
public void request(long n) {
if (empty) {
consumerCapacity.set(n);
}
producer.request(n);
}
});
arbiter.setProducer(producer);
}

@Override
Expand All @@ -77,41 +73,9 @@ public void onCompleted() {
}

private void subscribeToAlternate() {
ssub.set(alternate.unsafeSubscribe(new Subscriber<T>() {

@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {
@Override
public void request(long n) {
producer.request(n);
}
});
}

@Override
public void onStart() {
final long capacity = consumerCapacity.get();
if (capacity > 0) {
request(capacity);
}
}

@Override
public void onCompleted() {
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(T t) {
child.onNext(t);
}
}));
AlternateSubscriber<T> as = new AlternateSubscriber<T>(child, arbiter);
ssub.set(as);
alternate.unsafeSubscribe(as);
}

@Override
Expand All @@ -123,6 +87,39 @@ public void onError(Throwable e) {
public void onNext(T t) {
empty = false;
child.onNext(t);
arbiter.produced(1);
}
}

private static final class AlternateSubscriber<T> extends Subscriber<T> {

private final ProducerArbiter arbiter;
private final Subscriber<? super T> child;

AlternateSubscriber(Subscriber<? super T> child, ProducerArbiter arbiter) {
this.child = child;
this.arbiter = arbiter;
}

@Override
public void setProducer(final Producer producer) {
arbiter.setProducer(producer);
}

@Override
public void onCompleted() {
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(T t) {
child.onNext(t);
arbiter.produced(1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
import static org.junit.Assert.*;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

import rx.*;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;

public class OperatorSwitchIfEmptyTest {
Expand Down Expand Up @@ -142,6 +146,10 @@ public void onStart() {

assertEquals(Arrays.asList(1), ts.getOnNextEvents());
ts.assertNoErrors();
ts.requestMore(1);
ts.assertValueCount(2);
ts.requestMore(1);
ts.assertValueCount(3);
}
@Test
public void testBackpressureNoRequest() {
Expand All @@ -153,8 +161,51 @@ public void onStart() {
}
};
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(ts);

assertTrue(ts.getOnNextEvents().isEmpty());
ts.assertNoErrors();
}

@Test
public void testBackpressureOnFirstObservable() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0);
Observable.just(1,2,3).switchIfEmpty(Observable.just(4, 5, 6)).subscribe(ts);
ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertNoValues();
}

@Test(timeout = 10000)
public void testRequestsNotLost() throws InterruptedException {
final TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
Observable.create(new OnSubscribe<Long>() {

@Override
public void call(final Subscriber<? super Long> subscriber) {
subscriber.setProducer(new Producer() {
final AtomicBoolean completed = new AtomicBoolean(false);
@Override
public void request(long n) {
if (n > 0 && completed.compareAndSet(false, true)) {
Schedulers.io().createWorker().schedule(new Action0() {
@Override
public void call() {
subscriber.onCompleted();
}}, 100, TimeUnit.MILLISECONDS);
}
}});
}})
.switchIfEmpty(Observable.from(Arrays.asList(1L, 2L, 3L)))
.subscribeOn(Schedulers.computation())
.subscribe(ts);
ts.requestMore(0);
Thread.sleep(50);
//request while first observable is still finishing (as empty)
ts.requestMore(1);
ts.requestMore(1);
Thread.sleep(500);
ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertValueCount(2);
ts.unsubscribe();
}
}

0 comments on commit 53a0204

Please sign in to comment.