Skip to content

Commit

Permalink
Merge pull request #3112 from akarnokd/TestCoverageObservers
Browse files Browse the repository at this point in the history
Observers package test coverage and fixes.
  • Loading branch information
akarnokd committed Aug 12, 2015
2 parents 98530ed + 2423a17 commit cb1712d
Show file tree
Hide file tree
Showing 10 changed files with 1,475 additions and 169 deletions.
188 changes: 80 additions & 108 deletions src/main/java/rx/observers/SerializedObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
package rx.observers;

import rx.Observer;
import rx.exceptions.Exceptions;
import rx.exceptions.*;
import rx.internal.operators.NotificationLite;

/**
* Enforces single-threaded, serialized, ordered execution of {@link #onNext}, {@link #onCompleted}, and
Expand All @@ -35,13 +36,15 @@
public class SerializedObserver<T> implements Observer<T> {
private final Observer<? super T> actual;

private boolean emitting = false;
private boolean terminated = false;
private boolean emitting;
/** Set to true if a terminal event was received. */
private volatile boolean terminated;
/** If not null, it indicates more work. */
private FastList queue;
private final NotificationLite<T> nl = NotificationLite.instance();

private static final int MAX_DRAIN_ITERATION = Integer.MAX_VALUE;
private static final Object NULL_SENTINEL = new Object();
private static final Object COMPLETE_SENTINEL = new Object();
/** Number of iterations without additional safepoint poll in the drain loop. */
private static final int MAX_DRAIN_ITERATION = 1024;

static final class FastList {
Object[] array;
Expand All @@ -64,150 +67,119 @@ public void add(Object o) {
}
}

private static final class ErrorSentinel {
final Throwable e;

ErrorSentinel(Throwable e) {
this.e = e;
}
}

public SerializedObserver(Observer<? super T> s) {
this.actual = s;
}

@Override
public void onCompleted() {
FastList list;
public void onNext(T t) {
if (terminated) {
return;
}
synchronized (this) {
if (terminated) {
return;
}
terminated = true;
if (emitting) {
if (queue == null) {
queue = new FastList();
FastList list = queue;
if (list == null) {
list = new FastList();
queue = list;
}
queue.add(COMPLETE_SENTINEL);
list.add(nl.next(t));
return;
}
emitting = true;
list = queue;
queue = null;
}
drainQueue(list);
actual.onCompleted();
try {
actual.onNext(t);
} catch (Throwable e) {
terminated = true;
Exceptions.throwIfFatal(e);
actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
return;
}
for (;;) {
for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
FastList list;
synchronized (this) {
list = queue;
if (list == null) {
emitting = false;
return;
}
queue = null;
}
for (Object o : list.array) {
if (o == null) {
break;
}
try {
if (nl.accept(actual, o)) {
terminated = true;
return;
}
} catch (Throwable e) {
terminated = true;
Exceptions.throwIfFatal(e);
actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
return;
}
}
}
}
}

@Override
public void onError(final Throwable e) {
Exceptions.throwIfFatal(e);
FastList list;
if (terminated) {
return;
}
synchronized (this) {
if (terminated) {
return;
}
terminated = true;
if (emitting) {
if (queue == null) {
queue = new FastList();
/*
* FIXME: generally, errors jump the queue but this wasn't true
* for SerializedObserver and may break existing expectations.
*/
FastList list = queue;
if (list == null) {
list = new FastList();
queue = list;
}
queue.add(new ErrorSentinel(e));
list.add(nl.error(e));
return;
}
emitting = true;
list = queue;
queue = null;
}
drainQueue(list);
actual.onError(e);
synchronized(this) {
emitting = false;
}
}

@Override
public void onNext(T t) {
FastList list;

public void onCompleted() {
if (terminated) {
return;
}
synchronized (this) {
if (terminated) {
return;
}
terminated = true;
if (emitting) {
if (queue == null) {
queue = new FastList();
FastList list = queue;
if (list == null) {
list = new FastList();
queue = list;
}
queue.add(t != null ? t : NULL_SENTINEL);
// another thread is emitting so we add to the queue and return
list.add(nl.completed());
return;
}
// we can emit
emitting = true;
// reference to the list to drain before emitting our value
list = queue;
queue = null;
}

// we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above
boolean skipFinal = false;
try {
int iter = MAX_DRAIN_ITERATION;
do {
drainQueue(list);
if (iter == MAX_DRAIN_ITERATION) {
// after the first draining we emit our own value
actual.onNext(t);
}
--iter;
if (iter > 0) {
synchronized (this) {
list = queue;
queue = null;
if (list == null) {
emitting = false;
skipFinal = true;
return;
}
}
}
} while (iter > 0);
} finally {
if (!skipFinal) {
synchronized (this) {
if (terminated) {
list = queue;
queue = null;
} else {
emitting = false;
list = null;
}
}
}
}

// this will only drain if terminated (done here outside of synchronized block)
drainQueue(list);
}

void drainQueue(FastList list) {
if (list == null || list.size == 0) {
return;
}
for (Object v : list.array) {
if (v == null) {
break;
}
if (v == NULL_SENTINEL) {
actual.onNext(null);
} else if (v == COMPLETE_SENTINEL) {
actual.onCompleted();
} else if (v.getClass() == ErrorSentinel.class) {
actual.onError(((ErrorSentinel) v).e);
} else {
@SuppressWarnings("unchecked")
T t = (T)v;
actual.onNext(t);
}
}
actual.onCompleted();
}
}
14 changes: 9 additions & 5 deletions src/main/java/rx/observers/TestObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,17 @@ public void assertReceivedOnNext(List<T> items) {
}

for (int i = 0; i < items.size(); i++) {
if (items.get(i) == null) {
T expected = items.get(i);
T actual = onNextEvents.get(i);
if (expected == null) {
// check for null equality
if (onNextEvents.get(i) != null) {
throw new AssertionError("Value at index: " + i + " expected to be [null] but was: [" + onNextEvents.get(i) + "]");
if (actual != null) {
throw new AssertionError("Value at index: " + i + " expected to be [null] but was: [" + actual + "]");
}
} else if (!items.get(i).equals(onNextEvents.get(i))) {
throw new AssertionError("Value at index: " + i + " expected to be [" + items.get(i) + "] (" + items.get(i).getClass().getSimpleName() + ") but was: [" + onNextEvents.get(i) + "] (" + onNextEvents.get(i).getClass().getSimpleName() + ")");
} else if (!expected.equals(actual)) {
throw new AssertionError("Value at index: " + i
+ " expected to be [" + expected + "] (" + expected.getClass().getSimpleName()
+ ") but was: [" + actual + "] (" + (actual != null ? actual.getClass().getSimpleName() : "null") + ")");

}
}
Expand Down
13 changes: 9 additions & 4 deletions src/main/java/rx/observers/TestSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,15 @@ public void assertUnsubscribed() {
* if this {@code Subscriber} has received one or more {@code onError} notifications
*/
public void assertNoErrors() {
if (getOnErrorEvents().size() > 0) {
// can't use AssertionError because (message, cause) doesn't exist until Java 7
throw new RuntimeException("Unexpected onError events: " + getOnErrorEvents().size(), getOnErrorEvents().get(0));
// TODO possibly check for Java7+ and then use AssertionError at runtime (since we always compile with 7)
List<Throwable> onErrorEvents = getOnErrorEvents();
if (onErrorEvents.size() > 0) {
AssertionError ae = new AssertionError("Unexpected onError events: " + getOnErrorEvents().size());
if (onErrorEvents.size() == 1) {
ae.initCause(getOnErrorEvents().get(0));
} else {
ae.initCause(new CompositeException(onErrorEvents));
}
throw ae;
}
}

Expand Down
Loading

0 comments on commit cb1712d

Please sign in to comment.