Skip to content

Commit

Permalink
Observable.from(iterable) should emit onCompleted even if none reques…
Browse files Browse the repository at this point in the history
…ted when iterable is empty
  • Loading branch information
davidmoten committed Apr 21, 2015
1 parent aefdebb commit 1f5ed40
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
12 changes: 7 additions & 5 deletions src/main/java/rx/internal/operators/OnSubscribeFromIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
@Override
public void call(final Subscriber<? super T> o) {
final Iterator<? extends T> it = is.iterator();
o.setProducer(new IterableProducer<T>(o, it));
if (!it.hasNext() && !o.isUnsubscribed())
o.onCompleted();
else
o.setProducer(new IterableProducer<T>(o, it));
}

private static final class IterableProducer<T> implements Producer {
Expand All @@ -62,12 +65,11 @@ private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {

@Override
public void request(long n) {
if (REQUESTED_UPDATER.get(this) == Long.MAX_VALUE) {
if (requested == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (n == Long.MAX_VALUE) {
REQUESTED_UPDATER.set(this, n);
if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
// fast-path without backpressure
while (it.hasNext()) {
if (o.isUnsubscribed()) {
Expand All @@ -78,7 +80,7 @@ public void request(long n) {
if (!o.isUnsubscribed()) {
o.onCompleted();
}
} else if(n > 0) {
} else if (n > 0) {
// backpressure is requested
long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n);
if (_c == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;
import org.mockito.Mockito;
Expand Down Expand Up @@ -74,12 +75,12 @@ public Iterator<String> iterator() {

@Override
public boolean hasNext() {
return i++ < 3;
return i < 3;
}

@Override
public String next() {
return String.valueOf(i);
return String.valueOf(++i);
}

@Override
Expand Down Expand Up @@ -193,5 +194,31 @@ public void onNext(Integer t) {
assertTrue(latch.await(10, TimeUnit.SECONDS));
}

@Test
public void testFromEmptyIterableWhenZeroRequestedShouldStillEmitOnCompletedEagerly() {
final AtomicBoolean completed = new AtomicBoolean(false);
Observable.from(Collections.emptyList()).subscribe(new Subscriber<Object>() {

@Override
public void onStart() {
request(0);
}

@Override
public void onCompleted() {
completed.set(true);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Object t) {

}});
assertTrue(completed.get());
}

}

0 comments on commit 1f5ed40

Please sign in to comment.