Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved to atomic field updaters. #1246

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5425,7 +5425,7 @@ public final Observable<T> serialize() {
* there is more than 1 {@link Subscriber} this {@link Observable} will be subscribed and emitting data.
* When all subscribers have unsubscribed it will unsubscribe from the source {@link Observable}.
* <p>
* This is an alias for {@link #publish().refCount()}.
* This is an alias for {@link #publish()}.{@link ConnectableObservable#refCount()}.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/publishRefCount.png">
*
Expand Down
14 changes: 10 additions & 4 deletions rxjava-core/src/main/java/rx/operators/BlockingOperatorLatest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import rx.Notification;
import rx.Observable;
Expand Down Expand Up @@ -51,11 +51,15 @@ public Iterator<T> iterator() {
static final class LatestObserverIterator<T> extends Subscriber<Notification<? extends T>> implements Iterator<T> {
final Semaphore notify = new Semaphore(0);
// observer's notification
final AtomicReference<Notification<? extends T>> reference = new AtomicReference<Notification<? extends T>>();
volatile Notification<? extends T> value;
/** Updater for the value field. */
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<LatestObserverIterator, Notification> REFERENCE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(LatestObserverIterator.class, Notification.class, "value");

@Override
public void onNext(Notification<? extends T> args) {
boolean wasntAvailable = reference.getAndSet(args) == null;
boolean wasntAvailable = REFERENCE_UPDATER.getAndSet(this, args) == null;
if (wasntAvailable) {
notify.release();
}
Expand Down Expand Up @@ -89,7 +93,9 @@ public boolean hasNext() {
throw Exceptions.propagate(ex);
}

iNotif = reference.getAndSet(null);
@SuppressWarnings("unchecked")
Notification<? extends T> n = (Notification<? extends T>)REFERENCE_UPDATER.getAndSet(this, null);
iNotif = n;
if (iNotif.isOnError()) {
throw Exceptions.propagate(iNotif.getThrowable());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package rx.operators;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Subscriber;
Expand Down Expand Up @@ -79,39 +77,40 @@ public void remove() {
}

private static class MostRecentObserver<T> extends Subscriber<T> {
private final AtomicBoolean completed = new AtomicBoolean(false);
private final AtomicReference<T> value;
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();

static final NotificationLite<Object> nl = NotificationLite.instance();
volatile Object value;

private MostRecentObserver(T value) {
this.value = new AtomicReference<T>(value);
this.value = nl.next(value);
}

@Override
public void onCompleted() {
completed.set(true);
value = nl.completed();
}

@Override
public void onError(Throwable e) {
exception.set(e);
value = nl.error(e);
}

@Override
public void onNext(T args) {
value.set(args);
value = nl.next(args);
}

private boolean isCompleted() {
return completed.get();
return nl.isCompleted(value);
}

private Throwable getThrowable() {
return exception.get();
Object v = value;
return nl.isError(v) ? nl.getError(v) : null;
}

@SuppressWarnings("unchecked")
private T getRecentValue() {
return value.get();
return (T)value;
}

}
Expand Down
25 changes: 18 additions & 7 deletions rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import rx.Notification;
import rx.Observable;
Expand Down Expand Up @@ -61,10 +61,16 @@ private NextIterator(NextObserver<? extends T> observer) {
this.observer = observer;
}


// in tests, set the waiting flag without blocking for the next value to
// allow lockstepping instead of multi-threading
void setWaiting(boolean value) {
observer.waiting.set(value);
/**
* In tests, set the waiting flag without blocking for the next value to
* allow lockstepping instead of multi-threading
* @param value use 1 to enter into the waiting state
*/
void setWaiting(int value) {
observer.setWaiting(value);
}

@Override
Expand Down Expand Up @@ -135,7 +141,10 @@ public void remove() {

private static class NextObserver<T> extends Subscriber<Notification<? extends T>> {
private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
private final AtomicBoolean waiting = new AtomicBoolean(false);
volatile int waiting;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<NextObserver> WAITING_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(NextObserver.class, "waiting");

@Override
public void onCompleted() {
Expand All @@ -150,7 +159,7 @@ public void onError(Throwable e) {
@Override
public void onNext(Notification<? extends T> args) {

if (waiting.getAndSet(false) || !args.isOnNext()) {
if (WAITING_UPDATER.getAndSet(this, 0) == 1 || !args.isOnNext()) {
Notification<? extends T> toOffer = args;
while (!buf.offer(toOffer)) {
Notification<? extends T> concurrentItem = buf.poll();
Expand All @@ -165,9 +174,11 @@ public void onNext(Notification<? extends T> args) {
}

public Notification<? extends T> takeNext() throws InterruptedException {
waiting.set(true);
setWaiting(1);
return buf.take();
}

void setWaiting(int value) {
waiting = value;
}
}
}
58 changes: 38 additions & 20 deletions rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package rx.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import rx.Observer;
import rx.Subscriber;
Expand Down Expand Up @@ -53,12 +53,28 @@ public static <T> BufferUntilSubscriber<T> create() {

/** The common state. */
static final class State<T> {
/** Lite notifications of type T. */
final NotificationLite<T> nl = NotificationLite.instance();
/** The first observer or the one which buffers until the first arrives. */
final AtomicReference<Observer<? super T>> observerRef = new AtomicReference<Observer<? super T>>(new BufferedObserver<T>());
volatile Observer<? super T> observerRef = new BufferedObserver<T>();
/** Allow a single subscriber only. */
final AtomicBoolean first = new AtomicBoolean();
volatile int first;
/** Field updater for observerRef. */
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<State, Observer> OBSERVER_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef");
/** Field updater for first. */
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<State> FIRST_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(State.class, "first");

boolean casFirst(int expected, int next) {
return FIRST_UPDATER.compareAndSet(this, expected, next);
}
void setObserverRef(Observer<? super T> o) {
observerRef = o;
}
boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
return OBSERVER_UPDATER.compareAndSet(this, expected, next);
}
}

static final class OnSubscribeAction<T> implements OnSubscribe<T> {
Expand All @@ -70,20 +86,21 @@ public OnSubscribeAction(State<T> state) {

@Override
public void call(final Subscriber<? super T> s) {
if (state.first.compareAndSet(false, true)) {
if (state.casFirst(0, 1)) {
final NotificationLite<T> nl = NotificationLite.instance();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NotificationLite is completely stateless so we can move this to a static reference can't we?

https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/NotificationLite.java#L47

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This applies to many other places in this PR as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Static field loses the type variable T and I have to stick SuppressWarnings everywhere. Besides, it is typical one makes local copies of variables in methods. In many places, I considered the option of using static, local or instance NotificationLite based on use and memory cost.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copies in methods is no big deal as that is on the stack. At the object level it is different as that is on the heap.

We should make sure we aren't creating new references on anything allocated per onNext at least.

// drain queued notifications before subscription
// we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer
BufferedObserver<? super T> buffered = (BufferedObserver<? super T>)state.observerRef.get();
BufferedObserver<? super T> buffered = (BufferedObserver<? super T>)state.observerRef;
Object o;
while ((o = buffered.buffer.poll()) != null) {
state.nl.accept(s, o);
nl.accept(s, o);
}
// register real observer for pass-thru ... and drain any further events received on first notification
state.observerRef.set(new PassThruObserver<T>(s, buffered.buffer, state.observerRef));
state.setObserverRef(new PassThruObserver<T>(s, buffered.buffer, state));
s.add(Subscriptions.create(new Action0() {
@Override
public void call() {
state.observerRef.set(Subscribers.empty());
state.setObserverRef(Subscribers.empty());
}
}));
} else {
Expand All @@ -101,17 +118,17 @@ private BufferUntilSubscriber(State<T> state) {

@Override
public void onCompleted() {
state.observerRef.get().onCompleted();
state.observerRef.onCompleted();
}

@Override
public void onError(Throwable e) {
state.observerRef.get().onError(e);
state.observerRef.onError(e);
}

@Override
public void onNext(T t) {
state.observerRef.get().onNext(t);
state.observerRef.onNext(t);
}

/**
Expand All @@ -127,13 +144,13 @@ private static final class PassThruObserver<T> extends Subscriber<T> {
private final Observer<? super T> actual;
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
private final ConcurrentLinkedQueue<Object> buffer;
private final AtomicReference<Observer<? super T>> observerRef;
private final NotificationLite<T> nl = NotificationLite.instance();
private final State<T> state;

PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer, AtomicReference<Observer<? super T>> observerRef) {
PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer,
State<T> state) {
this.actual = actual;
this.buffer = buffer;
this.observerRef = observerRef;
this.state = state;
}

@Override
Expand All @@ -155,20 +172,21 @@ public void onNext(T t) {
}

private void drainIfNeededAndSwitchToActual() {
final NotificationLite<T> nl = NotificationLite.instance();
Object o;
while ((o = buffer.poll()) != null) {
nl.accept(this, o);
}
// now we can safely change over to the actual and get rid of the pass-thru
// but only if not unsubscribed
observerRef.compareAndSet(this, actual);
state.casObserverRef(this, actual);
}

}

private static final class BufferedObserver<T> extends Subscriber<T> {
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
private final NotificationLite<T> nl = NotificationLite.instance();
private static final NotificationLite<Object> nl = NotificationLite.instance();

@Override
public void onCompleted() {
Expand Down
10 changes: 6 additions & 4 deletions rxjava-core/src/main/java/rx/operators/OperatorCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx.operators;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import rx.Observable;
import rx.Observable.OnSubscribe;
Expand Down Expand Up @@ -43,7 +43,10 @@
public final class OperatorCache<T> implements OnSubscribe<T> {
protected final Observable<? extends T> source;
protected final Subject<? super T, ? extends T> cache;
protected final AtomicBoolean sourceSubscribed;
volatile int sourceSubscribed;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<OperatorCache> SRC_SUBSCRIBED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(OperatorCache.class, "sourceSubscribed");

public OperatorCache(Observable<? extends T> source) {
this(source, ReplaySubject.<T> create());
Expand All @@ -52,12 +55,11 @@ public OperatorCache(Observable<? extends T> source) {
/* accessible to tests */OperatorCache(Observable<? extends T> source, Subject<? super T, ? extends T> cache) {
this.source = source;
this.cache = cache;
this.sourceSubscribed = new AtomicBoolean();
}

@Override
public void call(Subscriber<? super T> s) {
if (sourceSubscribed.compareAndSet(false, true)) {
if (SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
source.unsafeSubscribe(Subscribers.from(cache));
/*
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
Expand Down
Loading