Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: coverage, fixes, enhancements, cleanup 10/18-1 #4723

Merged
merged 1 commit into from
Oct 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7300,32 +7300,6 @@ public final <U, V> Observable<V> flatMapIterable(final Function<? super T, ? ex
return flatMap(ObservableInternalHelper.flatMapIntoIterable(mapper), resultSelector, false, bufferSize(), bufferSize());
}

/**
* Returns an Observable that merges each item emitted by the source ObservableSource with the values in an
* Iterable corresponding to that item that is generated by a selector.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMapIterable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U>
* the type of item emitted by the resulting ObservableSource
* @param mapper
* a function that returns an Iterable sequence of values for when given an item emitted by the
* source ObservableSource
* @param bufferSize
* the number of elements to prefetch from the current Observable
* @return an Observable that emits the results of merging the items emitted by the source ObservableSource with
* the values in the Iterables corresponding to those items, as generated by {@code collectionSelector}
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<U> flatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper, int bufferSize) {
return flatMap(ObservableInternalHelper.flatMapIntoIterable(mapper), false, bufferSize);
}

/**
* Maps each element of the upstream Observable into MaybeSources, subscribes to them and
* waits until the upstream and all MaybeSources complete.
Expand Down
12 changes: 4 additions & 8 deletions src/main/java/io/reactivex/flowables/ConnectableFlowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.reactivex.functions.Consumer;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.util.ConnectConsumer;
import io.reactivex.plugins.RxJavaPlugins;

/**
Expand Down Expand Up @@ -58,14 +59,9 @@ public abstract class ConnectableFlowable<T> extends Flowable<T> {
* @see <a href="http://reactivex.io/documentation/operators/connect.html">ReactiveX documentation: Connect</a>
*/
public final Disposable connect() {
final Disposable[] connection = new Disposable[1];
connect(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
connection[0] = d;
}
});
return connection[0];
ConnectConsumer cc = new ConnectConsumer();
connect(cc);
return cc.disposable;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,12 @@ public void onNext(T t) {
}

DebounceEmitter<T> de = new DebounceEmitter<T>(t, idx, this);
if (!timer.compareAndSet(d, de)) {
return;
}
if (timer.compareAndSet(d, de)) {
d = worker.schedule(de, timeout, unit);

d = worker.schedule(de, timeout, unit);
de.setResource(d);
}

de.setResource(d);
}

@Override
Expand All @@ -117,7 +116,9 @@ public void onComplete() {
if (d != DisposableHelper.DISPOSED) {
@SuppressWarnings("unchecked")
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
de.emit();
if (de != null) {
de.run();
}
DisposableHelper.dispose(timer);
worker.dispose();
actual.onComplete();
Expand Down Expand Up @@ -162,10 +163,6 @@ static final class DebounceEmitter<T> extends AtomicReference<Disposable> implem

@Override
public void run() {
emit();
}

void emit() {
if (once.compareAndSet(false, true)) {
parent.emit(idx, value, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,12 @@ static final class GroupJoinDisposable<TLeft, TRight, TLeftEnd, TRightEnd, R>

@Override
public void dispose() {
if (cancelled) {
return;
}
cancelled = true;
cancelAll();
if (getAndIncrement() == 0) {
queue.clear();
if (!cancelled) {
cancelled = true;
cancelAll();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}

Expand Down Expand Up @@ -303,8 +302,7 @@ else if (mode == LEFT_CLOSE) {

lefts.remove(end.index);
disposables.remove(end);
}
else if (mode == RIGHT_CLOSE) {
} else {
LeftRightEndObserver end = (LeftRightEndObserver)val;

rights.remove(end.index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@

package io.reactivex.internal.operators.observable;

import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.observers.BasicIntQueueDisposable;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.schedulers.TrampolineScheduler;
import io.reactivex.plugins.RxJavaPlugins;
Expand All @@ -45,24 +45,16 @@ protected void subscribeActual(Observer<? super T> observer) {
}
}

/**
* Pads the base atomic integer used for wip counting.
*/
static class Padding0 extends AtomicInteger {

private static final long serialVersionUID = 3172843496016154809L;

volatile long p01, p02, p03, p04, p05, p06, p07;
volatile long p08, p09, p0A, p0B, p0C, p0D, p0E, p0F;
}

static final class ObserveOnObserver<T> extends Padding0 implements Observer<T>, Disposable, Runnable {
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {

private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final SpscLinkedArrayQueue<T> queue;
final int bufferSize;

SimpleQueue<T> queue;

Disposable s;

Expand All @@ -71,17 +63,45 @@ static final class ObserveOnObserver<T> extends Padding0 implements Observer<T>,

volatile boolean cancelled;

int sourceMode;

boolean outputFused;

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.queue = new SpscLinkedArrayQueue<T>(bufferSize);
this.bufferSize = bufferSize;
}

@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;

int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}

queue = new SpscLinkedArrayQueue<T>(bufferSize);

actual.onSubscribe(this);
}
}
Expand All @@ -92,10 +112,8 @@ public void onNext(T t) {
return;
}

if (!queue.offer(t)) {
s.dispose();
onError(new MissingBackpressureException("Queue full?!"));
return;
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
Expand Down Expand Up @@ -126,6 +144,9 @@ public void dispose() {
cancelled = true;
s.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}

Expand All @@ -140,11 +161,10 @@ void schedule() {
}
}

@Override
public void run() {
void drainNormal() {
int missed = 1;

final SpscLinkedArrayQueue<T> q = queue;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;

for (;;) {
Expand All @@ -154,7 +174,17 @@ public void run() {

for (;;) {
boolean d = done;
T v = q.poll();
T v;

try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;

if (checkTerminated(d, empty, a)) {
Expand All @@ -175,10 +205,55 @@ public void run() {
}
}

void drainFused() {
int missed = 1;

for (;;) {
if (cancelled) {
return;
}

boolean d = done;
Throwable ex = error;

if (!delayError && d && ex != null) {
actual.onError(error);
worker.dispose();
return;
}

actual.onNext(null);

if (d) {
ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
worker.dispose();
return;
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {
s.dispose();
worker.dispose();
queue.clear();
return true;
}
if (d) {
Expand All @@ -195,6 +270,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
}
} else {
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
Expand All @@ -208,5 +284,29 @@ boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
}
return false;
}

@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}

@Override
public T poll() throws Exception {
return queue.poll();
}

@Override
public void clear() {
queue.clear();
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
}
}
}
Loading