Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved to atomic field updaters. #1246

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5425,7 +5425,7 @@ public final Observable<T> serialize() {
* there is more than 1 {@link Subscriber} this {@link Observable} will be subscribed and emitting data.
* When all subscribers have unsubscribed it will unsubscribe from the source {@link Observable}.
* <p>
* This is an alias for {@link #publish().refCount()}.
* This is an alias for {@link #publish()}.{@link ConnectableObservable#refCount()}.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/publishRefCount.png">
*
Expand Down
14 changes: 10 additions & 4 deletions rxjava-core/src/main/java/rx/operators/BlockingOperatorLatest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import rx.Notification;
import rx.Observable;
Expand Down Expand Up @@ -51,11 +51,15 @@ public Iterator<T> iterator() {
static final class LatestObserverIterator<T> extends Subscriber<Notification<? extends T>> implements Iterator<T> {
final Semaphore notify = new Semaphore(0);
// observer's notification
final AtomicReference<Notification<? extends T>> reference = new AtomicReference<Notification<? extends T>>();
volatile Notification<? extends T> value;
/** Updater for the value field. */
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<LatestObserverIterator, Notification> REFERENCE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(LatestObserverIterator.class, Notification.class, "value");

@Override
public void onNext(Notification<? extends T> args) {
boolean wasntAvailable = reference.getAndSet(args) == null;
boolean wasntAvailable = REFERENCE_UPDATER.getAndSet(this, args) == null;
if (wasntAvailable) {
notify.release();
}
Expand Down Expand Up @@ -89,7 +93,9 @@ public boolean hasNext() {
throw Exceptions.propagate(ex);
}

iNotif = reference.getAndSet(null);
@SuppressWarnings("unchecked")
Notification<? extends T> n = (Notification<? extends T>)REFERENCE_UPDATER.getAndSet(this, null);
iNotif = n;
if (iNotif.isOnError()) {
throw Exceptions.propagate(iNotif.getThrowable());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
package rx.operators;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import rx.Observable;
import rx.Subscriber;
Expand Down Expand Up @@ -79,39 +78,43 @@ public void remove() {
}

private static class MostRecentObserver<T> extends Subscriber<T> {
private final AtomicBoolean completed = new AtomicBoolean(false);
private final AtomicReference<T> value;
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();

static final NotificationLite<Object> nl = NotificationLite.instance();
volatile Object value;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<MostRecentObserver, Object> VALUE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(MostRecentObserver.class, Object.class, "value");

private MostRecentObserver(T value) {
this.value = new AtomicReference<T>(value);
VALUE_UPDATER.lazySet(this, nl.next(value));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can use lazySet here. Take a look at this definition: http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6275329

The semantics are that the write is guaranteed not to be re-ordered with any previous write, but may be reordered with subsequent operations (or equivalently, might not be visible to other threads) until some other volatile write or synchronizing action occurs).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should stay on the safe side with such constructors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor might work (need to understand the memory model better for that) since the JMM might flush everything after construction.

I'm wondering if the onNext/onCompleted/onError is okay though as getRecentValue() may get a stale result. That may or may not be okay in this scenario. The Iterator will keep checking. It seems like it is a race condition by definition so probably okay here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stuff I'm reading in the JMM about constructors is not answering my question. It's clear about final fields, but isn't clear on whether it causes the equivalent of a "volatile write" that would flush anything such as a lazySet. Thus, I think we need to assume it is not consistent unless we learn otherwise.

}

@Override
public void onCompleted() {
completed.set(true);
VALUE_UPDATER.lazySet(this, nl.completed());
}

@Override
public void onError(Throwable e) {
exception.set(e);
VALUE_UPDATER.lazySet(this, nl.error(e));
}

@Override
public void onNext(T args) {
value.set(args);
VALUE_UPDATER.lazySet(this, nl.next(args));
}

private boolean isCompleted() {
return completed.get();
return nl.isCompleted(value);
}

private Throwable getThrowable() {
return exception.get();
Object v = value;
return nl.isError(v) ? nl.getError(v) : null;
}

@SuppressWarnings("unchecked")
private T getRecentValue() {
return value.get();
return (T)value;
}

}
Expand Down
25 changes: 18 additions & 7 deletions rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import rx.Notification;
import rx.Observable;
Expand Down Expand Up @@ -61,10 +61,16 @@ private NextIterator(NextObserver<? extends T> observer) {
this.observer = observer;
}


// in tests, set the waiting flag without blocking for the next value to
// allow lockstepping instead of multi-threading
void setWaiting(boolean value) {
observer.waiting.set(value);
/**
* In tests, set the waiting flag without blocking for the next value to
* allow lockstepping instead of multi-threading
* @param value use 1 to enter into the waiting state
*/
void setWaiting(int value) {
observer.setWaiting(value);
}

@Override
Expand Down Expand Up @@ -135,7 +141,10 @@ public void remove() {

private static class NextObserver<T> extends Subscriber<Notification<? extends T>> {
private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
private final AtomicBoolean waiting = new AtomicBoolean(false);
volatile int waiting;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<NextObserver> WAITING_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(NextObserver.class, "waiting");

@Override
public void onCompleted() {
Expand All @@ -150,7 +159,7 @@ public void onError(Throwable e) {
@Override
public void onNext(Notification<? extends T> args) {

if (waiting.getAndSet(false) || !args.isOnNext()) {
if (WAITING_UPDATER.getAndSet(this, 0) == 1 || !args.isOnNext()) {
Notification<? extends T> toOffer = args;
while (!buf.offer(toOffer)) {
Notification<? extends T> concurrentItem = buf.poll();
Expand All @@ -165,9 +174,11 @@ public void onNext(Notification<? extends T> args) {
}

public Notification<? extends T> takeNext() throws InterruptedException {
waiting.set(true);
setWaiting(1);
return buf.take();
}

void setWaiting(int value) {
WAITING_UPDATER.lazySet(this, value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Help me understand why lazySet is safe to use here. It seems we want to have visibility of this value when we next read, but my understanding of lazySet is that it does not guarantee that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read up further on lazySet and added another comment to code above.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value set here is eventually visible, which may take a few nanoseconds. The benefit is that the lazySet doesn't flush the store buffer whereas a volatile write does, which incurs 10-40 nanosecond cache-coherency traffic. Generally, the lazySet only makes sure writes that happened before are not reordered after the lazy write; which is generally enough in a SPSC value transmission case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is eventually visible safe here? It doesn't seem to be.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main guide was a blog you recommended:
http://psy-lob-saw.blogspot.co.uk/2012/12/atomiclazyset-is-performance-win-for.html

If you wish, I can eliminate all lazySet calls so there won't be any visibility concerns.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to understand when it's safe. I get the use case for setting something null in cleanup, I'm more concerned in places where we set it and need to read it without any further writes guaranteed to happen before.

}
}
}
58 changes: 38 additions & 20 deletions rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package rx.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import rx.Observer;
import rx.Subscriber;
Expand Down Expand Up @@ -53,12 +53,28 @@ public static <T> BufferUntilSubscriber<T> create() {

/** The common state. */
static final class State<T> {
/** Lite notifications of type T. */
final NotificationLite<T> nl = NotificationLite.instance();
/** The first observer or the one which buffers until the first arrives. */
final AtomicReference<Observer<? super T>> observerRef = new AtomicReference<Observer<? super T>>(new BufferedObserver<T>());
volatile Observer<? super T> observerRef = new BufferedObserver<T>();
/** Allow a single subscriber only. */
final AtomicBoolean first = new AtomicBoolean();
volatile int first;
/** Field updater for observerRef. */
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<State, Observer> OBSERVER_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef");
/** Field updater for first. */
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<State> FIRST_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(State.class, "first");

boolean casFirst(int expected, int next) {
return FIRST_UPDATER.compareAndSet(this, expected, next);
}
void setObserverRef(Observer<? super T> o) {
OBSERVER_UPDATER.lazySet(this, o);
}
boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
return OBSERVER_UPDATER.compareAndSet(this, expected, next);
}
}

static final class OnSubscribeAction<T> implements OnSubscribe<T> {
Expand All @@ -70,20 +86,21 @@ public OnSubscribeAction(State<T> state) {

@Override
public void call(final Subscriber<? super T> s) {
if (state.first.compareAndSet(false, true)) {
if (state.casFirst(0, 1)) {
final NotificationLite<T> nl = NotificationLite.instance();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NotificationLite is completely stateless so we can move this to a static reference can't we?

https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/NotificationLite.java#L47

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This applies to many other places in this PR as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Static field loses the type variable T and I have to stick SuppressWarnings everywhere. Besides, it is typical one makes local copies of variables in methods. In many places, I considered the option of using static, local or instance NotificationLite based on use and memory cost.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copies in methods is no big deal as that is on the stack. At the object level it is different as that is on the heap.

We should make sure we aren't creating new references on anything allocated per onNext at least.

// drain queued notifications before subscription
// we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer
BufferedObserver<? super T> buffered = (BufferedObserver<? super T>)state.observerRef.get();
BufferedObserver<? super T> buffered = (BufferedObserver<? super T>)state.observerRef;
Object o;
while ((o = buffered.buffer.poll()) != null) {
state.nl.accept(s, o);
nl.accept(s, o);
}
// register real observer for pass-thru ... and drain any further events received on first notification
state.observerRef.set(new PassThruObserver<T>(s, buffered.buffer, state.observerRef));
state.setObserverRef(new PassThruObserver<T>(s, buffered.buffer, state));
s.add(Subscriptions.create(new Action0() {
@Override
public void call() {
state.observerRef.set(Subscribers.empty());
state.setObserverRef(Subscribers.empty());
}
}));
} else {
Expand All @@ -101,17 +118,17 @@ private BufferUntilSubscriber(State<T> state) {

@Override
public void onCompleted() {
state.observerRef.get().onCompleted();
state.observerRef.onCompleted();
}

@Override
public void onError(Throwable e) {
state.observerRef.get().onError(e);
state.observerRef.onError(e);
}

@Override
public void onNext(T t) {
state.observerRef.get().onNext(t);
state.observerRef.onNext(t);
}

/**
Expand All @@ -127,13 +144,13 @@ private static final class PassThruObserver<T> extends Subscriber<T> {
private final Observer<? super T> actual;
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
private final ConcurrentLinkedQueue<Object> buffer;
private final AtomicReference<Observer<? super T>> observerRef;
private final NotificationLite<T> nl = NotificationLite.instance();
private final State<T> state;

PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer, AtomicReference<Observer<? super T>> observerRef) {
PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer,
State<T> state) {
this.actual = actual;
this.buffer = buffer;
this.observerRef = observerRef;
this.state = state;
}

@Override
Expand All @@ -155,20 +172,21 @@ public void onNext(T t) {
}

private void drainIfNeededAndSwitchToActual() {
final NotificationLite<T> nl = NotificationLite.instance();
Object o;
while ((o = buffer.poll()) != null) {
nl.accept(this, o);
}
// now we can safely change over to the actual and get rid of the pass-thru
// but only if not unsubscribed
observerRef.compareAndSet(this, actual);
state.casObserverRef(this, actual);
}

}

private static final class BufferedObserver<T> extends Subscriber<T> {
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
private final NotificationLite<T> nl = NotificationLite.instance();
private static final NotificationLite<Object> nl = NotificationLite.instance();

@Override
public void onCompleted() {
Expand Down
10 changes: 6 additions & 4 deletions rxjava-core/src/main/java/rx/operators/OperatorCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx.operators;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import rx.Observable;
import rx.Observable.OnSubscribe;
Expand Down Expand Up @@ -43,7 +43,10 @@
public final class OperatorCache<T> implements OnSubscribe<T> {
protected final Observable<? extends T> source;
protected final Subject<? super T, ? extends T> cache;
protected final AtomicBoolean sourceSubscribed;
volatile int sourceSubscribed;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<OperatorCache> SRC_SUBSCRIBED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(OperatorCache.class, "sourceSubscribed");

public OperatorCache(Observable<? extends T> source) {
this(source, ReplaySubject.<T> create());
Expand All @@ -52,12 +55,11 @@ public OperatorCache(Observable<? extends T> source) {
/* accessible to tests */OperatorCache(Observable<? extends T> source, Subject<? super T, ? extends T> cache) {
this.source = source;
this.cache = cache;
this.sourceSubscribed = new AtomicBoolean();
}

@Override
public void call(Subscriber<? super T> s) {
if (sourceSubscribed.compareAndSet(false, true)) {
if (SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
source.unsafeSubscribe(Subscribers.from(cache));
/*
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
Expand Down
Loading