From 8b8c72113f277c21d413fc52868a1c6a6c4dd0ab Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 22 May 2014 13:24:19 +0200 Subject: [PATCH 1/3] Moved to atomic field updaters. --- rxjava-core/src/main/java/rx/Observable.java | 2 +- .../rx/operators/BlockingOperatorLatest.java | 14 +- .../operators/BlockingOperatorMostRecent.java | 29 +- .../rx/operators/BlockingOperatorNext.java | 25 +- .../rx/operators/BufferUntilSubscriber.java | 58 ++-- .../main/java/rx/operators/OperatorCache.java | 10 +- .../java/rx/operators/OperatorConcat.java | 30 +- .../java/rx/operators/OperatorGroupBy.java | 258 ++++++++++-------- .../main/java/rx/operators/OperatorMerge.java | 141 +++++----- .../rx/operators/OperatorMergeDelayError.java | 182 ++++++------ .../rx/operators/OperatorMergeMapPair.java | 166 ++++++----- .../operators/OperatorMergeMapTransform.java | 206 +++++++------- .../operators/OperatorMergeMaxConcurrent.java | 191 +++++++------ .../java/rx/operators/OperatorObserveOn.java | 24 +- .../rx/operators/OperatorParallelMerge.java | 31 ++- .../main/java/rx/operators/OperatorPivot.java | 35 ++- .../main/java/rx/operators/OperatorRetry.java | 39 ++- .../rx/operators/OperatorSampleWithTime.java | 14 +- .../rx/operators/OperatorTimeoutBase.java | 28 +- .../main/java/rx/operators/OperatorZip.java | 23 +- .../main/java/rx/plugins/RxJavaPlugins.java | 57 ++-- .../rx/schedulers/EventLoopsScheduler.java | 17 +- .../GenericScheduledExecutorService.java | 20 +- .../rx/schedulers/NewThreadScheduler.java | 57 ++-- .../rx/schedulers/TrampolineScheduler.java | 10 +- .../main/java/rx/subjects/ReplaySubject.java | 36 ++- .../subjects/SubjectSubscriptionManager.java | 41 +-- .../rx/subscriptions/BooleanSubscription.java | 11 +- .../MultipleAssignmentSubscription.java | 25 +- .../subscriptions/RefCountSubscription.java | 60 ++-- .../rx/subscriptions/SerialSubscription.java | 21 +- .../java/rx/subscriptions/Subscriptions.java | 55 ++-- .../operators/BlockingOperatorNextTest.java | 4 +- .../ReplaySubjectBoundedConcurrencyTest.java | 6 +- 34 files changed, 1082 insertions(+), 844 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 9c6f419537..467c0e115b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -5425,7 +5425,7 @@ public final Observable serialize() { * there is more than 1 {@link Subscriber} this {@link Observable} will be subscribed and emitting data. * When all subscribers have unsubscribed it will unsubscribe from the source {@link Observable}. *

- * This is an alias for {@link #publish().refCount()}. + * This is an alias for {@link #publish()}.{@link ConnectableObservable#refCount()}. *

* * diff --git a/rxjava-core/src/main/java/rx/operators/BlockingOperatorLatest.java b/rxjava-core/src/main/java/rx/operators/BlockingOperatorLatest.java index 0c16bb399e..682b36d168 100644 --- a/rxjava-core/src/main/java/rx/operators/BlockingOperatorLatest.java +++ b/rxjava-core/src/main/java/rx/operators/BlockingOperatorLatest.java @@ -18,7 +18,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Notification; import rx.Observable; @@ -51,11 +51,15 @@ public Iterator iterator() { static final class LatestObserverIterator extends Subscriber> implements Iterator { final Semaphore notify = new Semaphore(0); // observer's notification - final AtomicReference> reference = new AtomicReference>(); + volatile Notification value; + /** Updater for the value field. */ + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater REFERENCE_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(LatestObserverIterator.class, Notification.class, "value"); @Override public void onNext(Notification args) { - boolean wasntAvailable = reference.getAndSet(args) == null; + boolean wasntAvailable = REFERENCE_UPDATER.getAndSet(this, args) == null; if (wasntAvailable) { notify.release(); } @@ -89,7 +93,9 @@ public boolean hasNext() { throw Exceptions.propagate(ex); } - iNotif = reference.getAndSet(null); + @SuppressWarnings("unchecked") + Notification n = (Notification)REFERENCE_UPDATER.getAndSet(this, null); + iNotif = n; if (iNotif.isOnError()) { throw Exceptions.propagate(iNotif.getThrowable()); } diff --git a/rxjava-core/src/main/java/rx/operators/BlockingOperatorMostRecent.java b/rxjava-core/src/main/java/rx/operators/BlockingOperatorMostRecent.java index ef974fb1b5..e2b6fb22cb 100644 --- a/rxjava-core/src/main/java/rx/operators/BlockingOperatorMostRecent.java +++ b/rxjava-core/src/main/java/rx/operators/BlockingOperatorMostRecent.java @@ -16,8 +16,7 @@ package rx.operators; import java.util.Iterator; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Observable; import rx.Subscriber; @@ -79,39 +78,43 @@ public void remove() { } private static class MostRecentObserver extends Subscriber { - private final AtomicBoolean completed = new AtomicBoolean(false); - private final AtomicReference value; - private final AtomicReference exception = new AtomicReference(); - + static final NotificationLite nl = NotificationLite.instance(); + volatile Object value; + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater VALUE_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(MostRecentObserver.class, Object.class, "value"); + private MostRecentObserver(T value) { - this.value = new AtomicReference(value); + VALUE_UPDATER.lazySet(this, nl.next(value)); } @Override public void onCompleted() { - completed.set(true); + VALUE_UPDATER.lazySet(this, nl.completed()); } @Override public void onError(Throwable e) { - exception.set(e); + VALUE_UPDATER.lazySet(this, nl.error(e)); } @Override public void onNext(T args) { - value.set(args); + VALUE_UPDATER.lazySet(this, nl.next(args)); } private boolean isCompleted() { - return completed.get(); + return nl.isCompleted(value); } private Throwable getThrowable() { - return exception.get(); + Object v = value; + return nl.isError(v) ? nl.getError(v) : null; } + @SuppressWarnings("unchecked") private T getRecentValue() { - return value.get(); + return (T)value; } } diff --git a/rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java b/rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java index 9c52880ef0..5d570f955e 100644 --- a/rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java +++ b/rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java @@ -19,7 +19,7 @@ import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Notification; import rx.Observable; @@ -61,10 +61,16 @@ private NextIterator(NextObserver observer) { this.observer = observer; } + // in tests, set the waiting flag without blocking for the next value to // allow lockstepping instead of multi-threading - void setWaiting(boolean value) { - observer.waiting.set(value); + /** + * In tests, set the waiting flag without blocking for the next value to + * allow lockstepping instead of multi-threading + * @param value use 1 to enter into the waiting state + */ + void setWaiting(int value) { + observer.setWaiting(value); } @Override @@ -135,7 +141,10 @@ public void remove() { private static class NextObserver extends Subscriber> { private final BlockingQueue> buf = new ArrayBlockingQueue>(1); - private final AtomicBoolean waiting = new AtomicBoolean(false); + volatile int waiting; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WAITING_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(NextObserver.class, "waiting"); @Override public void onCompleted() { @@ -150,7 +159,7 @@ public void onError(Throwable e) { @Override public void onNext(Notification args) { - if (waiting.getAndSet(false) || !args.isOnNext()) { + if (WAITING_UPDATER.getAndSet(this, 0) == 1 || !args.isOnNext()) { Notification toOffer = args; while (!buf.offer(toOffer)) { Notification concurrentItem = buf.poll(); @@ -165,9 +174,11 @@ public void onNext(Notification args) { } public Notification takeNext() throws InterruptedException { - waiting.set(true); + setWaiting(1); return buf.take(); } - + void setWaiting(int value) { + WAITING_UPDATER.lazySet(this, value); + } } } diff --git a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java index f4db4baa5f..7bbbb4d3f9 100644 --- a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java +++ b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java @@ -16,8 +16,8 @@ package rx.operators; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Observer; import rx.Subscriber; @@ -53,12 +53,28 @@ public static BufferUntilSubscriber create() { /** The common state. */ static final class State { - /** Lite notifications of type T. */ - final NotificationLite nl = NotificationLite.instance(); /** The first observer or the one which buffers until the first arrives. */ - final AtomicReference> observerRef = new AtomicReference>(new BufferedObserver()); + volatile Observer observerRef = new BufferedObserver(); /** Allow a single subscriber only. */ - final AtomicBoolean first = new AtomicBoolean(); + volatile int first; + /** Field updater for observerRef. */ + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater OBSERVER_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef"); + /** Field updater for first. */ + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater FIRST_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(State.class, "first"); + + boolean casFirst(int expected, int next) { + return FIRST_UPDATER.compareAndSet(this, expected, next); + } + void setObserverRef(Observer o) { + OBSERVER_UPDATER.lazySet(this, o); + } + boolean casObserverRef(Observer expected, Observer next) { + return OBSERVER_UPDATER.compareAndSet(this, expected, next); + } } static final class OnSubscribeAction implements OnSubscribe { @@ -70,20 +86,21 @@ public OnSubscribeAction(State state) { @Override public void call(final Subscriber s) { - if (state.first.compareAndSet(false, true)) { + if (state.casFirst(0, 1)) { + final NotificationLite nl = NotificationLite.instance(); // drain queued notifications before subscription // we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer - BufferedObserver buffered = (BufferedObserver)state.observerRef.get(); + BufferedObserver buffered = (BufferedObserver)state.observerRef; Object o; while ((o = buffered.buffer.poll()) != null) { - state.nl.accept(s, o); + nl.accept(s, o); } // register real observer for pass-thru ... and drain any further events received on first notification - state.observerRef.set(new PassThruObserver(s, buffered.buffer, state.observerRef)); + state.setObserverRef(new PassThruObserver(s, buffered.buffer, state)); s.add(Subscriptions.create(new Action0() { @Override public void call() { - state.observerRef.set(Subscribers.empty()); + state.setObserverRef(Subscribers.empty()); } })); } else { @@ -101,17 +118,17 @@ private BufferUntilSubscriber(State state) { @Override public void onCompleted() { - state.observerRef.get().onCompleted(); + state.observerRef.onCompleted(); } @Override public void onError(Throwable e) { - state.observerRef.get().onError(e); + state.observerRef.onError(e); } @Override public void onNext(T t) { - state.observerRef.get().onNext(t); + state.observerRef.onNext(t); } /** @@ -127,13 +144,13 @@ private static final class PassThruObserver extends Subscriber { private final Observer actual; // this assumes single threaded synchronous notifications (the Rx contract for a single Observer) private final ConcurrentLinkedQueue buffer; - private final AtomicReference> observerRef; - private final NotificationLite nl = NotificationLite.instance(); + private final State state; - PassThruObserver(Observer actual, ConcurrentLinkedQueue buffer, AtomicReference> observerRef) { + PassThruObserver(Observer actual, ConcurrentLinkedQueue buffer, + State state) { this.actual = actual; this.buffer = buffer; - this.observerRef = observerRef; + this.state = state; } @Override @@ -155,20 +172,21 @@ public void onNext(T t) { } private void drainIfNeededAndSwitchToActual() { + final NotificationLite nl = NotificationLite.instance(); Object o; while ((o = buffer.poll()) != null) { nl.accept(this, o); } // now we can safely change over to the actual and get rid of the pass-thru // but only if not unsubscribed - observerRef.compareAndSet(this, actual); + state.casObserverRef(this, actual); } } private static final class BufferedObserver extends Subscriber { private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue(); - private final NotificationLite nl = NotificationLite.instance(); + private static final NotificationLite nl = NotificationLite.instance(); @Override public void onCompleted() { diff --git a/rxjava-core/src/main/java/rx/operators/OperatorCache.java b/rxjava-core/src/main/java/rx/operators/OperatorCache.java index 8a7578322c..3c0cb78483 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorCache.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorCache.java @@ -15,7 +15,7 @@ */ package rx.operators; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observable; import rx.Observable.OnSubscribe; @@ -43,7 +43,10 @@ public final class OperatorCache implements OnSubscribe { protected final Observable source; protected final Subject cache; - protected final AtomicBoolean sourceSubscribed; + volatile int sourceSubscribed; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater SRC_SUBSCRIBED_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(OperatorCache.class, "sourceSubscribed"); public OperatorCache(Observable source) { this(source, ReplaySubject. create()); @@ -52,12 +55,11 @@ public OperatorCache(Observable source) { /* accessible to tests */OperatorCache(Observable source, Subject cache) { this.source = source; this.cache = cache; - this.sourceSubscribed = new AtomicBoolean(); } @Override public void call(Subscriber s) { - if (sourceSubscribed.compareAndSet(false, true)) { + if (SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) { source.unsafeSubscribe(Subscribers.from(cache)); /* * Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values. diff --git a/rxjava-core/src/main/java/rx/operators/OperatorConcat.java b/rxjava-core/src/main/java/rx/operators/OperatorConcat.java index 7b6a21eafe..26ffbe04f5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorConcat.java @@ -16,7 +16,7 @@ package rx.operators; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; @@ -30,30 +30,32 @@ * other. *

* + * @param the source and result value type */ public final class OperatorConcat implements Operator> { - final NotificationLite> nl = NotificationLite.instance(); @Override public Subscriber> call(final Subscriber child) { final SerializedSubscriber s = new SerializedSubscriber(child); final SerialSubscription current = new SerialSubscription(); child.add(current); - return new ConcatSubscriber(s, current); + return new ConcatSubscriber(s, current); } - final class ConcatSubscriber extends Subscriber> { - + static final class ConcatSubscriber extends Subscriber> { + final NotificationLite> nl = NotificationLite.instance(); private final Subscriber s; private final SerialSubscription current; final ConcurrentLinkedQueue queue; - final AtomicInteger wip; + volatile int wip; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip"); public ConcatSubscriber(Subscriber s, SerialSubscription current) { super(s); this.s = s; this.current = current; this.queue = new ConcurrentLinkedQueue(); - this.wip = new AtomicInteger(); add(Subscriptions.create(new Action0() { @Override public void call() { @@ -65,7 +67,7 @@ public void call() { @Override public void onNext(Observable t) { queue.add(nl.next(t)); - if (wip.getAndIncrement() == 0) { + if (WIP_UPDATER.getAndIncrement(this) == 0) { subscribeNext(); } } @@ -79,11 +81,15 @@ public void onError(Throwable e) { @Override public void onCompleted() { queue.add(nl.completed()); - if (wip.getAndIncrement() == 0) { + if (WIP_UPDATER.getAndIncrement(this) == 0) { + subscribeNext(); + } + } + void completeInner() { + if (WIP_UPDATER.decrementAndGet(this) > 0) { subscribeNext(); } } - void subscribeNext() { Object o = queue.poll(); if (nl.isCompleted(o)) { @@ -105,9 +111,7 @@ public void onError(Throwable e) { @Override public void onCompleted() { - if (wip.decrementAndGet() > 0) { - subscribeNext(); - } + completeInner(); } }; diff --git a/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java index b530bd9d86..8bdb0709e5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java @@ -1,24 +1,23 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ + /** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observable.OnSubscribe; import rx.Observable.Operator; @@ -35,126 +34,145 @@ * grouped items as Observables, one Observable per group. *

* + * @param the key type + * @param the source and group value type */ public final class OperatorGroupBy implements Operator, T> { - + final Func1 keySelector; - + public OperatorGroupBy(final Func1 keySelector) { this.keySelector = keySelector; } - + @Override public Subscriber call(final Subscriber> childObserver) { - // a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle - // and will unsubscribe on this parent if they are all unsubscribed - return new Subscriber(new CompositeSubscription()) { - private final Map> groups = new HashMap>(); - private final AtomicInteger completionCounter = new AtomicInteger(0); - private final AtomicBoolean completionEmitted = new AtomicBoolean(false); - private final AtomicBoolean terminated = new AtomicBoolean(false); - - @Override - public void onCompleted() { - if (terminated.compareAndSet(false, true)) { - // if we receive onCompleted from our parent we onComplete children - for (BufferUntilSubscriber ps : groups.values()) { - ps.onCompleted(); - } - - // special case for empty (no groups emitted) - if (completionCounter.get() == 0) { - // we must track 'completionEmitted' seperately from 'completed' since `completeInner` can result in childObserver.onCompleted() being emitted - if (completionEmitted.compareAndSet(false, true)) { - childObserver.onCompleted(); - } + return new GroupBySubscriber(keySelector, childObserver); + } + static final class GroupBySubscriber extends Subscriber { + final Func1 keySelector; + final Subscriber> childObserver; + public GroupBySubscriber(Func1 keySelector, Subscriber> childObserver) { + // a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle + // and will unsubscribe on this parent if they are all unsubscribed + super(new CompositeSubscription()); + this.keySelector = keySelector; + this.childObserver = childObserver; + } + private final Map> groups = new HashMap>(); + volatile int completionCounter; + volatile int completionEmitted; + volatile int terminated; + + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater COUNTER_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "completionCounter"); + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater EMITTED_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "completionEmitted"); + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater TERMINATED_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "terminated"); + + @Override + public void onCompleted() { + if (TERMINATED_UPDATER.compareAndSet(this, 0, 1)) { + // if we receive onCompleted from our parent we onComplete children + for (BufferUntilSubscriber ps : groups.values()) { + ps.onCompleted(); + } + + // special case for empty (no groups emitted) + if (completionCounter == 0) { + // we must track 'completionEmitted' seperately from 'completed' since `completeInner` can result in childObserver.onCompleted() being emitted + if (EMITTED_UPDATER.compareAndSet(this, 0, 1)) { + childObserver.onCompleted(); } } } - - @Override - public void onError(Throwable e) { - if (terminated.compareAndSet(false, true)) { - // we immediately tear everything down if we receive an error - childObserver.onError(e); - } + } + + @Override + public void onError(Throwable e) { + if (TERMINATED_UPDATER.compareAndSet(this, 0, 1)) { + // we immediately tear everything down if we receive an error + childObserver.onError(e); } - - @Override - public void onNext(T t) { - try { - final K key = keySelector.call(t); - BufferUntilSubscriber gps = groups.get(key); - if (gps == null) { - // this group doesn't exist - if (childObserver.isUnsubscribed()) { - // we have been unsubscribed on the outer so won't send any more groups - return; - } - gps = BufferUntilSubscriber.create(); - final BufferUntilSubscriber _gps = gps; - - GroupedObservable go = new GroupedObservable(key, new OnSubscribe() { - - @Override - public void call(final Subscriber o) { - // number of children we have running - completionCounter.incrementAndGet(); - o.add(Subscriptions.create(new Action0() { - - @Override - public void call() { - completeInner(); - } - - })); - _gps.unsafeSubscribe(new Subscriber(o) { - - @Override - public void onCompleted() { - o.onCompleted(); - completeInner(); - } - - @Override - public void onError(Throwable e) { - o.onError(e); - } - - @Override - public void onNext(T t) { - o.onNext(t); - } - - }); - } - - }); - groups.put(key, gps); - childObserver.onNext(go); + } + + @Override + public void onNext(T t) { + try { + final K key = keySelector.call(t); + BufferUntilSubscriber gps = groups.get(key); + if (gps == null) { + // this group doesn't exist + if (childObserver.isUnsubscribed()) { + // we have been unsubscribed on the outer so won't send any more groups + return; } - // we have the correct group so send value to it - gps.onNext(t); - } catch (Throwable e) { - onError(OnErrorThrowable.addValueAsLastCause(e, t)); + gps = BufferUntilSubscriber.create(); + final BufferUntilSubscriber _gps = gps; + + GroupedObservable go = new GroupedObservable(key, new OnSubscribe() { + + @Override + public void call(final Subscriber o) { + // number of children we have running + COUNTER_UPDATER.incrementAndGet(GroupBySubscriber.this); + o.add(Subscriptions.create(new Action0() { + + @Override + public void call() { + completeInner(); + } + + })); + _gps.unsafeSubscribe(new Subscriber(o) { + + @Override + public void onCompleted() { + o.onCompleted(); + completeInner(); + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onNext(T t) { + o.onNext(t); + } + + }); + } + + }); + groups.put(key, gps); + childObserver.onNext(go); } + // we have the correct group so send value to it + gps.onNext(t); + } catch (Throwable e) { + onError(OnErrorThrowable.addValueAsLastCause(e, t)); } - - private void completeInner() { - // count can be < 0 because unsubscribe also calls this - if (completionCounter.decrementAndGet() <= 0 && (terminated.get() || childObserver.isUnsubscribed())) { - // completionEmitted ensures we only emit onCompleted once - if (completionEmitted.compareAndSet(false, true)) { - if (childObserver.isUnsubscribed()) { - // if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up. - unsubscribe(); - } - childObserver.onCompleted(); + } + + private void completeInner() { + // count can be < 0 because unsubscribe also calls this + if (COUNTER_UPDATER.decrementAndGet(this) <= 0 && (terminated == 1 || childObserver.isUnsubscribed())) { + // completionEmitted ensures we only emit onCompleted once + if (EMITTED_UPDATER.compareAndSet(this, 0, 1)) { + if (childObserver.isUnsubscribed()) { + // if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up. + unsubscribe(); } + childObserver.onCompleted(); } } - - }; + } + } - } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMerge.java b/rxjava-core/src/main/java/rx/operators/OperatorMerge.java index 5b4091310a..03a6afdeab 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMerge.java @@ -15,7 +15,7 @@ */ package rx.operators; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observable; import rx.Observable.Operator; @@ -30,6 +30,8 @@ *

* You can combine the items emitted by multiple Observables so that they act like a single * Observable, by using the merge operation. + * + * @param the source and merged value type */ public final class OperatorMerge implements Operator> { @@ -39,73 +41,82 @@ public Subscriber> call(final Subscriber oute final Subscriber o = new SerializedSubscriber(outerOperation); final CompositeSubscription childrenSubscriptions = new CompositeSubscription(); outerOperation.add(childrenSubscriptions); + + return new MergeSubscriber(o, childrenSubscriptions); - return new Subscriber>(outerOperation) { - - private volatile boolean completed = false; - private final AtomicInteger runningCount = new AtomicInteger(); - - @Override - public void onCompleted() { - completed = true; - if (runningCount.get() == 0) { - o.onCompleted(); - } - } - - @Override - public void onError(Throwable e) { - o.onError(e); + } + static final class MergeSubscriber extends Subscriber> { + final Subscriber actual; + final CompositeSubscription childrenSubscriptions; + volatile int wip; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(MergeSubscriber.class, "wip"); + + public MergeSubscriber(Subscriber actual, CompositeSubscription childrenSubscriptions) { + super(actual); + this.actual = actual; + this.childrenSubscriptions = childrenSubscriptions; + WIP_UPDATER.lazySet(this, 1); + } + + @Override + public void onNext(Observable t) { + WIP_UPDATER.incrementAndGet(this); + Subscriber i = new InnerSubscriber(this); + childrenSubscriptions.add(i); + t.unsafeSubscribe(i); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + unsubscribe(); + } + + @Override + public void onCompleted() { + if (WIP_UPDATER.decrementAndGet(this) == 0) { + actual.onCompleted(); } - - @Override - public void onNext(Observable innerObservable) { - runningCount.incrementAndGet(); - Subscriber i = new InnerObserver(); - childrenSubscriptions.add(i); - innerObservable.unsafeSubscribe(i); + } + + } + static final class InnerSubscriber extends Subscriber { + final Subscriber actual; + final MergeSubscriber parent; + /** Make sure the inner termination events are delivered only once. */ + volatile int once; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater ONCE_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "once"); + + public InnerSubscriber(MergeSubscriber parent) { + this.parent = parent; + this.actual = parent.actual; + } + @Override + public void onNext(T t) { + actual.onNext(t); + } + + @Override + public void onError(Throwable e) { + if (ONCE_UPDATER.compareAndSet(this, 0, 1)) { + parent.onError(e); } - - final class InnerObserver extends Subscriber { - - private boolean innerCompleted = false; - - public InnerObserver() { + } + + @Override + public void onCompleted() { + if (ONCE_UPDATER.compareAndSet(this, 0, 1)) { + try { + parent.onCompleted(); + } finally { + parent.childrenSubscriptions.remove(this); } - - @Override - public void onCompleted() { - if (!innerCompleted) { - // we check if already completed otherwise a misbehaving Observable that emits onComplete more than once - // will cause the runningCount to decrement multiple times. - innerCompleted = true; - if (runningCount.decrementAndGet() == 0 && completed) { - o.onCompleted(); - } - cleanup(); - } - } - - @Override - public void onError(Throwable e) { - o.onError(e); - cleanup(); - } - - @Override - public void onNext(T a) { - o.onNext(a); - } - - private void cleanup() { - // remove subscription onCompletion so it cleans up immediately and doesn't memory leak - // see https://github.com/Netflix/RxJava/issues/897 - childrenSubscriptions.remove(this); - } - - }; - - }; - + } + } + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java b/rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java index fd25aa544f..b5b8b72fc9 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java @@ -1,22 +1,22 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ + /** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; @@ -39,95 +39,109 @@ *

* NOTE: If this is used on an Observable that never completes, it will never call * onError and will effectively swallow errors. - * + * * @param the source and result value type */ public final class OperatorMergeDelayError implements Operator> { - + @Override public Subscriber> call(Subscriber child) { final SerializedSubscriber s = new SerializedSubscriber(child); final CompositeSubscription csub = new CompositeSubscription(); child.add(csub); - final AtomicInteger wip = new AtomicInteger(1); + + return new MergeDelayErrorSubscriber(s, csub); + } + + static final class MergeDelayErrorSubscriber extends Subscriber> { + final Subscriber s; + final CompositeSubscription csub; final ConcurrentLinkedQueue exceptions = new ConcurrentLinkedQueue(); - return new Subscriber>() { - - @Override - public void onNext(Observable t) { - wip.incrementAndGet(); - - Subscriber itemSub = new Subscriber() { - /** Make sure terminal events are handled once to avoid wip problems. */ - boolean once = true; - @Override - public void onNext(T t) { - // prevent misbehaving source to emit past the error - if (once) { - try { - s.onNext(t); - } catch (Throwable e) { - // in case the source doesn't properly handle exceptions - onError(e); - } + volatile int wip; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(MergeDelayErrorSubscriber.class, "wip"); + + public MergeDelayErrorSubscriber(Subscriber s, CompositeSubscription csub) { + super(s); + this.s = s; + this.csub = csub; + WIP_UPDATER.lazySet(this, 1); + } + + @Override + public void onNext(Observable t) { + WIP_UPDATER.incrementAndGet(this); + + Subscriber itemSub = new Subscriber() { + /** Make sure terminal events are handled once to avoid wip problems. */ + boolean once = true; + @Override + public void onNext(T t) { + // prevent misbehaving source to emit past the error + if (once) { + try { + s.onNext(t); + } catch (Throwable e) { + // in case the source doesn't properly handle exceptions + onError(e); } } - - @Override - public void onError(Throwable e) { - if (once) { - once = false; - error(e); - } + } + + @Override + public void onError(Throwable e) { + if (once) { + once = false; + error(e); } - - @Override - public void onCompleted() { - if (once) { - once = false; - try { - complete(); - } finally { - csub.remove(this); - } + } + + @Override + public void onCompleted() { + if (once) { + once = false; + try { + complete(); + } finally { + csub.remove(this); } } - - }; - csub.add(itemSub); + } - t.unsafeSubscribe(itemSub); - } - - @Override - public void onError(Throwable e) { - error(e); - } - - @Override - public void onCompleted() { - complete(); - } - void error(Throwable e) { - exceptions.add(e); - complete(); - } - void complete() { - if (wip.decrementAndGet() == 0) { - if (exceptions.isEmpty()) { - s.onCompleted(); - } else + }; + csub.add(itemSub); + + t.unsafeSubscribe(itemSub); + } + + @Override + public void onError(Throwable e) { + error(e); + } + + @Override + public void onCompleted() { + complete(); + } + void error(Throwable e) { + exceptions.add(e); + complete(); + } + void complete() { + if (WIP_UPDATER.decrementAndGet(this) == 0) { + if (exceptions.isEmpty()) { + s.onCompleted(); + } else if (exceptions.size() > 1) { s.onError(new CompositeException(exceptions)); } else { s.onError(exceptions.peek()); } - exceptions.clear(); - unsubscribe(); - } + exceptions.clear(); + unsubscribe(); } - - }; + } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMergeMapPair.java b/rxjava-core/src/main/java/rx/operators/OperatorMergeMapPair.java index 947347e882..9a1f437f22 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMergeMapPair.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMergeMapPair.java @@ -1,21 +1,21 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ + /** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ package rx.operators; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; @@ -27,7 +27,7 @@ /** * Observable that pairs up the source values and all the derived collection * values and projects them via the selector. - * + * * @param the input value type * @param the derived collection value type * @param the result type @@ -45,76 +45,96 @@ public Observable call(T t1) { final Func1> collectionSelector; final Func2 resultSelector; - - public OperatorMergeMapPair(Func1> collectionSelector, + + public OperatorMergeMapPair(Func1> collectionSelector, Func2 resultSelector) { this.collectionSelector = collectionSelector; this.resultSelector = resultSelector; } - + @Override public Subscriber call(Subscriber child) { final SerializedSubscriber s = new SerializedSubscriber(child); final CompositeSubscription csub = new CompositeSubscription(); child.add(csub); - - return new Subscriber(child) { - final AtomicInteger wip = new AtomicInteger(1); - final Subscriber self = this; - @Override - public void onNext(final T t) { - Observable collection; - try { - collection = collectionSelector.call(t); - } catch (Throwable e) { - onError(e); - return; - } - - Subscriber collectionSub = new Subscriber() { - - @Override - public void onNext(U u) { - try { - s.onNext(resultSelector.call(t, u)); - } catch (Throwable e) { - onError(e); - } - } + + return new SourceSubscriber(s, csub, collectionSelector, resultSelector); + } + static final class SourceSubscriber extends Subscriber { + final Subscriber s; + final CompositeSubscription csub; + final Func1> collectionSelector; + final Func2 resultSelector; - @Override - public void onError(Throwable e) { - self.onError(e); - } + volatile int wip; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "wip"); - @Override - public void onCompleted() { - try { - self.onCompleted(); - } finally { - csub.remove(this); - } + public SourceSubscriber(Subscriber s, CompositeSubscription csub, + Func1> collectionSelector, + Func2 resultSelector) { + super(s); + this.s = s; + this.csub = csub; + this.collectionSelector = collectionSelector; + this.resultSelector = resultSelector; + WIP_UPDATER.lazySet(this, 1); + } + + @Override + public void onNext(final T t) { + Observable collection; + try { + collection = collectionSelector.call(t); + } catch (Throwable e) { + onError(e); + return; + } + + Subscriber collectionSub = new Subscriber() { + + @Override + public void onNext(U u) { + try { + s.onNext(resultSelector.call(t, u)); + } catch (Throwable e) { + onError(e); } - }; - csub.add(collectionSub); - wip.incrementAndGet(); + } - collection.unsafeSubscribe(collectionSub); - } - - @Override - public void onError(Throwable e) { - s.onError(e); - unsubscribe(); - } - - @Override - public void onCompleted() { - if (wip.decrementAndGet() == 0) { - s.onCompleted(); + @Override + public void onError(Throwable e) { + SourceSubscriber.this.onError(e); } - } + + @Override + public void onCompleted() { + try { + SourceSubscriber.this.onCompleted(); + } finally { + csub.remove(this); + } + } + }; + csub.add(collectionSub); + WIP_UPDATER.incrementAndGet(this); - }; + collection.unsafeSubscribe(collectionSub); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + unsubscribe(); + } + + @Override + public void onCompleted() { + if (WIP_UPDATER.decrementAndGet(this) == 0) { + s.onCompleted(); + } + } + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMergeMapTransform.java b/rxjava-core/src/main/java/rx/operators/OperatorMergeMapTransform.java index 88d49cfb57..1799e071eb 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMergeMapTransform.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMergeMapTransform.java @@ -1,21 +1,21 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ + /** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ package rx.operators; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; @@ -27,7 +27,7 @@ /** * Projects the notification of an observable sequence to an observable * sequence and merges the results into one. - * + * * @param the input value type * @param the output value type */ @@ -35,97 +35,117 @@ public final class OperatorMergeMapTransform implements Operator { final Func1> onNext; final Func1> onError; final Func0> onCompleted; - - public OperatorMergeMapTransform(Func1> onNext, - Func1> onError, + + public OperatorMergeMapTransform(Func1> onNext, + Func1> onError, Func0> onCompleted) { this.onNext = onNext; this.onError = onError; this.onCompleted = onCompleted; } - + @Override public Subscriber call(Subscriber child) { final SerializedSubscriber s = new SerializedSubscriber(child); final CompositeSubscription csub = new CompositeSubscription(); child.add(csub); - return new Subscriber(child) { - final AtomicInteger wip = new AtomicInteger(1); - @Override - public void onNext(T t) { - Observable o; - try { - o = onNext.call(t); - } catch (Throwable e) { - error(e); - return; - } - subscribeTo(o); - } - - @Override - public void onError(Throwable e) { - Observable o; - try { - o = onError.call(e); - } catch (Throwable t) { - error(t); - return; - } - subscribeTo(o); - finish(); + return new SourceSubscriber(s, csub, onNext, onError, onCompleted); + } + static final class SourceSubscriber extends Subscriber { + final Subscriber s; + final CompositeSubscription csub; + final Func1> onNext; + final Func1> onError; + final Func0> onCompleted; + + volatile int wip; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "wip"); + + public SourceSubscriber(Subscriber s, CompositeSubscription csub, Func1> onNext, Func1> onError, Func0> onCompleted) { + super(s); + this.s = s; + this.csub = csub; + this.onNext = onNext; + this.onError = onError; + this.onCompleted = onCompleted; + WIP_UPDATER.lazySet(this, 1); + } + + @Override + public void onNext(T t) { + Observable o; + try { + o = onNext.call(t); + } catch (Throwable e) { + error(e); + return; } - - @Override - public void onCompleted() { - Observable o; - try { - o = onCompleted.call(); - } catch (Throwable e) { - error(e); - return; - } - subscribeTo(o); - finish(); + subscribeTo(o); + } + + @Override + public void onError(Throwable e) { + Observable o; + try { + o = onError.call(e); + } catch (Throwable t) { + error(t); + return; } - void finish() { - if (wip.decrementAndGet() == 0) { - s.onCompleted(); - } + subscribeTo(o); + finish(); + } + + @Override + public void onCompleted() { + Observable o; + try { + o = onCompleted.call(); + } catch (Throwable e) { + error(e); + return; } - void error(Throwable t) { - s.onError(t); - unsubscribe(); + subscribeTo(o); + finish(); + } + void finish() { + if (WIP_UPDATER.decrementAndGet(this) == 0) { + s.onCompleted(); } - void subscribeTo(Observable o) { - Subscriber oSub = new Subscriber() { - - @Override - public void onNext(R t) { - s.onNext(t); - } - - @Override - public void onError(Throwable e) { - error(e); - } - - @Override - public void onCompleted() { - try { - finish(); - } finally { - csub.remove(this); - } + } + void error(Throwable t) { + s.onError(t); + unsubscribe(); + } + void subscribeTo(Observable o) { + Subscriber oSub = new Subscriber() { + + @Override + public void onNext(R t) { + s.onNext(t); + } + + @Override + public void onError(Throwable e) { + error(e); + } + + @Override + public void onCompleted() { + try { + finish(); + } finally { + csub.remove(this); } - }; - csub.add(oSub); - wip.incrementAndGet(); - - o.unsafeSubscribe(oSub); - } - }; + } + }; + csub.add(oSub); + WIP_UPDATER.incrementAndGet(this); + + o.unsafeSubscribe(oSub); + } } - } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMergeMaxConcurrent.java b/rxjava-core/src/main/java/rx/operators/OperatorMergeMaxConcurrent.java index 2880125fa5..67b01ebb35 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMergeMaxConcurrent.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMergeMaxConcurrent.java @@ -1,23 +1,23 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ + /** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ package rx.operators; import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; @@ -31,12 +31,12 @@ *

* You can combine the items emitted by multiple Observables so that they act like a single * Observable, by using the merge operation. - * + * * @param the emitted value type */ public final class OperatorMergeMaxConcurrent implements Operator> { final int maxConcurrency; - + public OperatorMergeMaxConcurrent(int maxConcurrency) { this.maxConcurrency = maxConcurrency; } @@ -46,82 +46,99 @@ public Subscriber> call(Subscriber ch final SerializedSubscriber s = new SerializedSubscriber(child); final CompositeSubscription csub = new CompositeSubscription(); child.add(csub); - return new Subscriber>(child) { - final Subscriber self = this; - final AtomicInteger wip = new AtomicInteger(1); - final Object guard = new Object(); - /** Guarded by guard. */ - int active; - /** Guarded by guard. */ - Queue> queue = new LinkedList>(); + + return new SourceSubscriber(maxConcurrency, s, csub); + } + static final class SourceSubscriber extends Subscriber> { + final int maxConcurrency; + final Subscriber s; + final CompositeSubscription csub; + final Object guard; - @Override - public void onNext(Observable t) { - synchronized (guard) { - queue.add(t); + volatile int wip; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "wip"); + + /** Guarded by guard. */ + int active; + /** Guarded by guard. */ + final Queue> queue; + + public SourceSubscriber(int maxConcurrency, Subscriber s, CompositeSubscription csub) { + super(s); + this.maxConcurrency = maxConcurrency; + this.s = s; + this.csub = csub; + this.guard = new Object(); + this.queue = new LinkedList>(); + WIP_UPDATER.lazySet(this, 1); + } + + @Override + public void onNext(Observable t) { + synchronized (guard) { + queue.add(t); + } + subscribeNext(); + } + + void subscribeNext() { + Observable t; + synchronized (guard) { + t = queue.peek(); + if (t == null || active >= maxConcurrency) { + return; } - subscribeNext(); + active++; + queue.poll(); } - - void subscribeNext() { - Observable t; - synchronized (guard) { - t = queue.peek(); - if (t == null || active >= maxConcurrency) { - return; - } - active++; - queue.poll(); + + Subscriber itemSub = new Subscriber() { + boolean once = true; + @Override + public void onNext(T t) { + s.onNext(t); } - wip.incrementAndGet(); - - Subscriber itemSub = new Subscriber() { - boolean once = true; - @Override - public void onNext(T t) { - s.onNext(t); - } - - @Override - public void onError(Throwable e) { - self.onError(e); - } - - @Override - public void onCompleted() { - if (once) { - once = false; - synchronized (guard) { - active--; - } - csub.remove(this); - - subscribeNext(); - - self.onCompleted(); + + @Override + public void onError(Throwable e) { + SourceSubscriber.this.onError(e); + } + + @Override + public void onCompleted() { + if (once) { + once = false; + synchronized (guard) { + active--; } + csub.remove(this); + + subscribeNext(); + + SourceSubscriber.this.onCompleted(); } - - }; - csub.add(itemSub); - - t.unsafeSubscribe(itemSub); - } - - @Override - public void onError(Throwable e) { - s.onError(e); - unsubscribe(); - } - - @Override - public void onCompleted() { - if (wip.decrementAndGet() == 0) { - s.onCompleted(); } - } + + }; + csub.add(itemSub); + WIP_UPDATER.incrementAndGet(this); - }; + t.unsafeSubscribe(itemSub); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + unsubscribe(); + } + + @Override + public void onCompleted() { + if (WIP_UPDATER.decrementAndGet(this) == 0) { + s.onCompleted(); + } + } } - } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java index 972abf58f6..a55b0b4797 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java @@ -15,9 +15,7 @@ */ package rx.operators; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import rx.Observable.Operator; import rx.Scheduler; @@ -30,8 +28,10 @@ * Delivers events on the specified Scheduler asynchronously via an unbounded buffer. * * + * + * @param the transmitted value type */ -public class OperatorObserveOn implements Operator { +public final class OperatorObserveOn implements Operator { private final Scheduler scheduler; @@ -56,13 +56,17 @@ public Subscriber call(Subscriber child) { } /** Observe through individual queue per observer. */ - private static class ObserveOnSubscriber extends Subscriber { - private final NotificationLite on = NotificationLite.instance(); + private static final class ObserveOnSubscriber extends Subscriber { final Subscriber observer; private final Scheduler.Worker recursiveScheduler; - + final NotificationLite on = NotificationLite.instance(); + /** Guarded by this. */ private FastList queue = new FastList(); - final AtomicLong counter = new AtomicLong(0); + + volatile long counter; + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater COUNTER_UPDATER + = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter"); public ObserveOnSubscriber(Scheduler scheduler, Subscriber subscriber) { super(subscriber); @@ -96,7 +100,7 @@ public void onError(final Throwable e) { } protected void schedule() { - if (counter.getAndIncrement() == 0) { + if (COUNTER_UPDATER.getAndIncrement(this) == 0) { recursiveScheduler.schedule(new Action0() { @Override @@ -121,7 +125,7 @@ private void pollQueue() { } on.accept(observer, v); } - if (counter.addAndGet(-vs.size) == 0) { + if (COUNTER_UPDATER.addAndGet(this, -vs.size) == 0) { break; } } while (true); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorParallelMerge.java b/rxjava-core/src/main/java/rx/operators/OperatorParallelMerge.java index 1e5e4fc2d4..5d74a41982 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorParallelMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorParallelMerge.java @@ -15,8 +15,8 @@ */ package rx.operators; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import rx.Observable; import rx.Scheduler; import rx.functions.Func1; @@ -32,14 +32,8 @@ public static Observable> parallelMerge(final Observable Observable> parallelMerge(final Observable> source, final int parallelObservables, final Scheduler scheduler) { - return source.groupBy(new Func1, Integer>() { - final AtomicLong rollingCount = new AtomicLong(); - - @Override - public Integer call(Observable o) { - return (int) rollingCount.incrementAndGet() % parallelObservables; - } - }).map(new Func1>, Observable>() { + return source.groupBy(new StrideMapper(parallelObservables)) + .map(new Func1>, Observable>() { @Override public Observable call(GroupedObservable> o) { @@ -47,7 +41,24 @@ public Observable call(GroupedObservable> o) { } }); - } + /** Maps source observables in a round-robin fashion to streaming groups. */ + static final class StrideMapper implements Func1, Integer> { + final int parallelObservables; + + volatile long rollingCount; + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater ROLLING_COUNT_UPDATER + = AtomicLongFieldUpdater.newUpdater(StrideMapper.class, "rollingCount"); + public StrideMapper(int parallelObservables) { + this.parallelObservables = parallelObservables; + } + + @Override + public Integer call(Observable t1) { + return (int)ROLLING_COUNT_UPDATER.incrementAndGet(this) % parallelObservables; + } + + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorPivot.java b/rxjava-core/src/main/java/rx/operators/OperatorPivot.java index 5b9c6742fd..c803bb12b1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorPivot.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorPivot.java @@ -20,7 +20,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import rx.Observable.OnSubscribe; @@ -31,12 +31,12 @@ import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; -public class OperatorPivot implements Operator>, GroupedObservable>> { +public final class OperatorPivot implements Operator>, GroupedObservable>> { @Override public Subscriber>> call(final Subscriber>> child) { final AtomicReference state = new AtomicReference(State.create()); - final OperatorPivot.PivotSubscriber pivotSubscriber = new PivotSubscriber(new CompositeSubscription(), child, state); + final PivotSubscriber pivotSubscriber = new PivotSubscriber(new CompositeSubscription(), child, state); child.add(Subscriptions.create(new Action0() { @Override @@ -60,7 +60,7 @@ public void call() { return pivotSubscriber; } - private final class PivotSubscriber extends Subscriber>> { + private static final class PivotSubscriber extends Subscriber>> { /* * needs to decouple the subscription as the inner subscriptions need a separate lifecycle * and will unsubscribe on this parent if they are all unsubscribed @@ -155,12 +155,17 @@ public void onNext(T t) { } - private static class GroupState { + private static final class GroupState { private final ConcurrentHashMap, Inner> innerSubjects = new ConcurrentHashMap, Inner>(); private final ConcurrentHashMap> outerSubjects = new ConcurrentHashMap>(); - private final AtomicBoolean completeEmitted = new AtomicBoolean(); private final CompositeSubscription parentSubscription; private final Subscriber>> child; + /** Indicates a terminal state. */ + volatile int completed; + /** Field updater for completed. */ + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater COMPLETED_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed"); public GroupState(CompositeSubscription parentSubscription, Subscriber>> child) { this.parentSubscription = parentSubscription; @@ -169,7 +174,7 @@ public GroupState(CompositeSubscription parentSubscription, Subscriber state, K1 key) { State current; - State newState = null; + State newState; do { current = state.get(); newState = current.addK1(key); @@ -191,7 +196,7 @@ public void completeK1Group(AtomicReference state, K1 key) { public void startK1K2Group(AtomicReference state, KeyPair keyPair) { State current; - State newState = null; + State newState; do { current = state.get(); newState = current.addK1k2(keyPair); @@ -212,7 +217,7 @@ public void completeK1K2Group(AtomicReference state, KeyPair keyP } public void completeAll(State state) { - if (completeEmitted.compareAndSet(false, true)) { + if (COMPLETED_UPDATER.compareAndSet(this, 0, 1)) { /* * after we are completely done emitting we can now shut down the groups */ @@ -284,7 +289,7 @@ private Outer getOrCreateOuter(final AtomicReference state, fi } } - private static class Inner { + private static final class Inner { private final BufferUntilSubscriber subscriber; private final GroupedObservable group; @@ -336,7 +341,7 @@ public void onNext(T t) { } } - private static class Outer { + private static final class Outer { private final BufferUntilSubscriber> subscriber; private final GroupedObservable> group; @@ -380,7 +385,7 @@ public void onNext(GroupedObservable t) { } } - private static class State { + private static final class State { private final boolean unsubscribed; private final boolean completed; private final Set k1Keys; @@ -430,11 +435,11 @@ public State unsubscribe() { } public boolean shouldComplete() { - if (k1Keys.size() == 0 && completed) { + if (k1Keys.isEmpty() && completed) { return true; } else if (unsubscribed) { // if unsubscribed and all groups are completed/unsubscribed we can complete - return k1k2Keys.size() == 0; + return k1k2Keys.isEmpty(); } else { return false; } @@ -446,7 +451,7 @@ public String toString() { } } - private static class KeyPair { + private static final class KeyPair { private final K1 k1; private final K2 k2; diff --git a/rxjava-core/src/main/java/rx/operators/OperatorRetry.java b/rxjava-core/src/main/java/rx/operators/OperatorRetry.java index 0422688b32..1c2d431ecb 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorRetry.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorRetry.java @@ -31,7 +31,7 @@ * limitations under the License. */ -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observable; import rx.Observable.Operator; @@ -41,7 +41,7 @@ import rx.schedulers.Schedulers; import rx.subscriptions.SerialSubscription; -public class OperatorRetry implements Operator> { +public final class OperatorRetry implements Operator> { private static final int INFINITE_RETRY = -1; @@ -65,10 +65,31 @@ public Subscriber> call(final Subscriber child) final SerialSubscription serialSubscription = new SerialSubscription(); // add serialSubscription so it gets unsubscribed if child is unsubscribed child.add(serialSubscription); - return new Subscriber>(child) { - final AtomicInteger attempts = new AtomicInteger(0); - - @Override + + return new SourceSubscriber(child, retryCount, inner, serialSubscription); + } + + static final class SourceSubscriber extends Subscriber> { + final Subscriber child; + final int retryCount; + final Scheduler.Worker inner; + final SerialSubscription serialSubscription; + + volatile int attempts; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater ATTEMPTS_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts"); + + public SourceSubscriber(Subscriber child, int retryCount, Scheduler.Worker inner, + SerialSubscription serialSubscription) { + this.child = child; + this.retryCount = retryCount; + this.inner = inner; + this.serialSubscription = serialSubscription; + } + + + @Override public void onCompleted() { // ignore as we expect a single nested Observable } @@ -85,7 +106,7 @@ public void onNext(final Observable o) { @Override public void call() { final Action0 _self = this; - attempts.incrementAndGet(); + ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this); // new subscription each time so if it unsubscribes itself it does not prevent retries // by unsubscribing the child subscription @@ -98,7 +119,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - if ((retryCount == INFINITE_RETRY || attempts.get() <= retryCount) && !inner.isUnsubscribed()) { + if ((retryCount == INFINITE_RETRY || attempts <= retryCount) && !inner.isUnsubscribed()) { // retry again inner.schedule(_self); } else { @@ -119,7 +140,5 @@ public void onNext(T v) { } }); } - - }; } } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSampleWithTime.java b/rxjava-core/src/main/java/rx/operators/OperatorSampleWithTime.java index 31642ea439..1bc48b8033 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorSampleWithTime.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorSampleWithTime.java @@ -16,7 +16,7 @@ package rx.operators; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Observable.Operator; import rx.Scheduler; import rx.Scheduler.Worker; @@ -61,14 +61,18 @@ static final class SamplerSubscriber extends Subscriber implements Action0 private final Subscriber subscriber; /** Indicates that no value is available. */ private static final Object EMPTY_TOKEN = new Object(); - final AtomicReference value; + /** The shared value between the observer and the timed action. */ + volatile Object value = EMPTY_TOKEN; + /** Updater for the value field. */ + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater VALUE_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(SamplerSubscriber.class, Object.class, "value"); public SamplerSubscriber(Subscriber subscriber) { this.subscriber = subscriber; - this.value = new AtomicReference(EMPTY_TOKEN); } @Override public void onNext(T t) { - value.set(t); + VALUE_UPDATER.lazySet(this, t); } @Override @@ -85,7 +89,7 @@ public void onCompleted() { @Override public void call() { - Object localValue = value.getAndSet(EMPTY_TOKEN); + Object localValue = VALUE_UPDATER.getAndSet(this, EMPTY_TOKEN); if (localValue != EMPTY_TOKEN) { try { @SuppressWarnings("unchecked") diff --git a/rxjava-core/src/main/java/rx/operators/OperatorTimeoutBase.java b/rxjava-core/src/main/java/rx/operators/OperatorTimeoutBase.java index 10aa4ffd67..808e650835 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorTimeoutBase.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorTimeoutBase.java @@ -16,8 +16,8 @@ package rx.operators; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import rx.Observable; import rx.Observable.Operator; @@ -80,8 +80,6 @@ public Subscriber call(Subscriber subscriber) { /* package-private */static class TimeoutSubscriber extends Subscriber { - private final AtomicBoolean terminated = new AtomicBoolean(false); - private final AtomicLong actual = new AtomicLong(0L); private final SerialSubscription serial; private final Object gate = new Object(); @@ -91,6 +89,16 @@ public Subscriber call(Subscriber subscriber) { private final Observable other; private final Scheduler.Worker inner; + + volatile int terminated; + volatile long actual; + + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater TERMINATED_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(TimeoutSubscriber.class, "terminated"); + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater ACTUAL_UPDATER + = AtomicLongFieldUpdater.newUpdater(TimeoutSubscriber.class, "actual"); private TimeoutSubscriber( SerializedSubscriber serializedSubscriber, @@ -109,14 +117,14 @@ private TimeoutSubscriber( public void onNext(T value) { boolean onNextWins = false; synchronized (gate) { - if (!terminated.get()) { - actual.incrementAndGet(); + if (terminated == 0) { + ACTUAL_UPDATER.incrementAndGet(this); onNextWins = true; } } if (onNextWins) { serializedSubscriber.onNext(value); - serial.set(timeoutStub.call(this, actual.get(), value, inner)); + serial.set(timeoutStub.call(this, actual, value, inner)); } } @@ -124,7 +132,7 @@ public void onNext(T value) { public void onError(Throwable error) { boolean onErrorWins = false; synchronized (gate) { - if (!terminated.getAndSet(true)) { + if (TERMINATED_UPDATER.getAndSet(this, 1) == 0) { onErrorWins = true; } } @@ -138,7 +146,7 @@ public void onError(Throwable error) { public void onCompleted() { boolean onCompletedWins = false; synchronized (gate) { - if (!terminated.getAndSet(true)) { + if (TERMINATED_UPDATER.getAndSet(this, 1) == 0) { onCompletedWins = true; } } @@ -152,7 +160,7 @@ public void onTimeout(long seqId) { long expected = seqId; boolean timeoutWins = false; synchronized (gate) { - if (expected == actual.get() && !terminated.getAndSet(true)) { + if (expected == actual && TERMINATED_UPDATER.getAndSet(this, 1) == 0) { timeoutWins = true; } } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorZip.java b/rxjava-core/src/main/java/rx/operators/OperatorZip.java index 8a2b7d8e9e..ec6034553d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorZip.java @@ -16,7 +16,7 @@ package rx.operators; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import rx.Observable; import rx.Observable.Operator; @@ -48,6 +48,7 @@ *

* The resulting Observable returned from zip will invoke onNext as many times as the * number of onNext invocations of the source Observable that emits the fewest items. + * @param the result type */ public final class OperatorZip implements Operator[]> { /* @@ -135,14 +136,17 @@ public void onNext(Observable[] observables) { } static final NotificationLite on = NotificationLite.instance(); - private static class Zip { + private static final class Zip { @SuppressWarnings("rawtypes") final Observable[] os; final Object[] observers; final Observer observer; final FuncN zipFunction; final CompositeSubscription childSubscription = new CompositeSubscription(); - + volatile long counter; + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater COUNTER_UPDATER + = AtomicLongFieldUpdater.newUpdater(Zip.class, "counter"); @SuppressWarnings("rawtypes") public Zip(Observable[] os, final Subscriber observer, FuncN zipFunction) { @@ -166,8 +170,6 @@ public void zip() { } } - final AtomicLong counter = new AtomicLong(0); - /** * check if we have values for each and emit if we do * @@ -177,7 +179,7 @@ public void zip() { */ @SuppressWarnings("unchecked") void tick() { - if (counter.getAndIncrement() == 0) { + if (COUNTER_UPDATER.getAndIncrement(this) == 0) { do { final Object[] vs = new Object[observers.length]; boolean allHaveValues = true; @@ -212,10 +214,11 @@ void tick() { return; } // now remove them - for (int i = 0; i < observers.length; i++) { - ((InnerObserver) observers[i]).items.poll(); + for (Object obj : observers) { + InnerObserver io = (InnerObserver)obj; + io.items.poll(); // eagerly check if the next item on this queue is an onComplete - if (on.isCompleted(((InnerObserver) observers[i]).items.peek())) { + if (on.isCompleted(io.items.peek())) { // it is an onComplete so shut down observer.onCompleted(); // we need to unsubscribe from all children since children are independently subscribed @@ -224,7 +227,7 @@ void tick() { } } } - } while (counter.decrementAndGet() > 0); + } while (COUNTER_UPDATER.decrementAndGet(this) > 0); } } diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java b/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java index 408802acb3..a3eb2f193c 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java @@ -15,7 +15,7 @@ */ package rx.plugins; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * Registry for plugin implementations that allows global override and handles the retrieval of correct implementation based on order of precedence: @@ -29,10 +29,17 @@ public class RxJavaPlugins { private final static RxJavaPlugins INSTANCE = new RxJavaPlugins(); - private final AtomicReference errorHandler = new AtomicReference(); - private final AtomicReference observableExecutionHook = new AtomicReference(); - private final AtomicReference schedulerOverrides = new AtomicReference(); + volatile RxJavaErrorHandler errorHandler; + volatile RxJavaObservableExecutionHook observableExecutionHook; + volatile RxJavaDefaultSchedulers schedulerOverrides; + static final AtomicReferenceFieldUpdater ERROR_HANDLER_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(RxJavaPlugins.class, RxJavaErrorHandler.class, "errorHandler"); + static final AtomicReferenceFieldUpdater EXECUTION_HOOK_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(RxJavaPlugins.class, RxJavaObservableExecutionHook.class, "observableExecutionHook"); + static final AtomicReferenceFieldUpdater SCHEDULER_OVERRIDE_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(RxJavaPlugins.class, RxJavaDefaultSchedulers.class, "schedulerOverrides"); + public static RxJavaPlugins getInstance() { return INSTANCE; } @@ -41,9 +48,9 @@ public static RxJavaPlugins getInstance() { } - /* package accessible for ujnit tests */void reset() { - INSTANCE.errorHandler.set(null); - INSTANCE.observableExecutionHook.set(null); + /* package accessible for unit tests */void reset() { + ERROR_HANDLER_UPDATER.lazySet(this, null); + EXECUTION_HOOK_UPDATER.lazySet(this, null); } /** @@ -55,19 +62,19 @@ public static RxJavaPlugins getInstance() { * @return {@link RxJavaErrorHandler} implementation to use */ public RxJavaErrorHandler getErrorHandler() { - if (errorHandler.get() == null) { + if (errorHandler == null) { // check for an implementation from System.getProperty first Object impl = getPluginImplementationViaProperty(RxJavaErrorHandler.class); if (impl == null) { // nothing set via properties so initialize with default - errorHandler.compareAndSet(null, RxJavaErrorHandlerDefault.getInstance()); + ERROR_HANDLER_UPDATER.compareAndSet(this, null, RxJavaErrorHandlerDefault.getInstance()); // we don't return from here but call get() again in case of thread-race so the winner will always get returned } else { // we received an implementation from the system property so use it - errorHandler.compareAndSet(null, (RxJavaErrorHandler) impl); + ERROR_HANDLER_UPDATER.compareAndSet(this, null, (RxJavaErrorHandler) impl); } } - return errorHandler.get(); + return errorHandler; } /** @@ -79,8 +86,8 @@ public RxJavaErrorHandler getErrorHandler() { * if called more than once or after the default was initialized (if usage occurs before trying to register) */ public void registerErrorHandler(RxJavaErrorHandler impl) { - if (!errorHandler.compareAndSet(null, impl)) { - throw new IllegalStateException("Another strategy was already registered: " + errorHandler.get()); + if (!ERROR_HANDLER_UPDATER.compareAndSet(this, null, impl)) { + throw new IllegalStateException("Another strategy was already registered: " + errorHandler); } } @@ -93,19 +100,19 @@ public void registerErrorHandler(RxJavaErrorHandler impl) { * @return {@link RxJavaObservableExecutionHook} implementation to use */ public RxJavaObservableExecutionHook getObservableExecutionHook() { - if (observableExecutionHook.get() == null) { + if (observableExecutionHook == null) { // check for an implementation from System.getProperty first Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class); if (impl == null) { // nothing set via properties so initialize with default - observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance()); + EXECUTION_HOOK_UPDATER.compareAndSet(this, null, RxJavaObservableExecutionHookDefault.getInstance()); // we don't return from here but call get() again in case of thread-race so the winner will always get returned } else { // we received an implementation from the system property so use it - observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl); + EXECUTION_HOOK_UPDATER.compareAndSet(this, null, (RxJavaObservableExecutionHook) impl); } } - return observableExecutionHook.get(); + return observableExecutionHook; } /** @@ -117,8 +124,8 @@ public RxJavaObservableExecutionHook getObservableExecutionHook() { * if called more than once or after the default was initialized (if usage occurs before trying to register) */ public void registerObservableExecutionHook(RxJavaObservableExecutionHook impl) { - if (!observableExecutionHook.compareAndSet(null, impl)) { - throw new IllegalStateException("Another strategy was already registered: " + observableExecutionHook.get()); + if (!EXECUTION_HOOK_UPDATER.compareAndSet(this, null, impl)) { + throw new IllegalStateException("Another strategy was already registered: " + observableExecutionHook); } } @@ -161,19 +168,19 @@ private static Object getPluginImplementationViaProperty(Class pluginClass) { * @return {@link RxJavaErrorHandler} implementation to use */ public RxJavaDefaultSchedulers getDefaultSchedulers() { - if (schedulerOverrides.get() == null) { + if (schedulerOverrides == null) { // check for an implementation from System.getProperty first Object impl = getPluginImplementationViaProperty(RxJavaDefaultSchedulers.class); if (impl == null) { // nothing set via properties so initialize with default - schedulerOverrides.compareAndSet(null, RxJavaDefaultSchedulersDefault.getInstance()); + SCHEDULER_OVERRIDE_UPDATER.compareAndSet(this, null, RxJavaDefaultSchedulersDefault.getInstance()); // we don't return from here but call get() again in case of thread-race so the winner will always get returned } else { // we received an implementation from the system property so use it - schedulerOverrides.compareAndSet(null, (RxJavaDefaultSchedulers) impl); + SCHEDULER_OVERRIDE_UPDATER.compareAndSet(this, null, (RxJavaDefaultSchedulers) impl); } } - return schedulerOverrides.get(); + return schedulerOverrides; } /** @@ -185,8 +192,8 @@ public RxJavaDefaultSchedulers getDefaultSchedulers() { * if called more than once or after the default was initialized (if usage occurs before trying to register) */ public void registerDefaultSchedulers(RxJavaDefaultSchedulers impl) { - if (!schedulerOverrides.compareAndSet(null, impl)) { - throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides.get()); + if (!SCHEDULER_OVERRIDE_UPDATER.compareAndSet(this, null, impl)) { + throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides); } } } diff --git a/rxjava-core/src/main/java/rx/schedulers/EventLoopsScheduler.java b/rxjava-core/src/main/java/rx/schedulers/EventLoopsScheduler.java index 36cd9dd36c..912a4ac7f2 100644 --- a/rxjava-core/src/main/java/rx/schedulers/EventLoopsScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/EventLoopsScheduler.java @@ -17,29 +17,22 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import rx.Scheduler; import rx.Subscription; import rx.functions.Action0; import rx.schedulers.NewThreadScheduler.NewThreadWorker.ScheduledAction; +import rx.schedulers.NewThreadScheduler.RxThreadFactory; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; /* package */class EventLoopsScheduler extends Scheduler { /** Manages a fixed number of workers. */ + private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-"; + private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX); + static final class FixedSchedulerPool { final int cores; - final ThreadFactory factory = new ThreadFactory() { - final AtomicInteger counter = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet()); - t.setDaemon(true); - return t; - } - }; final PoolWorker[] eventLoops; long n; @@ -49,7 +42,7 @@ public Thread newThread(Runnable r) { this.cores = Runtime.getRuntime().availableProcessors(); this.eventLoops = new PoolWorker[cores]; for (int i = 0; i < cores; i++) { - this.eventLoops[i] = new PoolWorker(factory); + this.eventLoops[i] = new PoolWorker(THREAD_FACTORY); } } diff --git a/rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java b/rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java index 5b19b40ba5..30fd7e96b6 100644 --- a/rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java +++ b/rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java @@ -19,8 +19,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; import rx.Scheduler; @@ -33,8 +31,11 @@ * the work asynchronously on the appropriate {@link Scheduler} implementation. This means for example that you would not use this approach * along with {@link TrampolineScheduler} or {@link ImmediateScheduler}. */ -/* package */class GenericScheduledExecutorService { +/* package */final class GenericScheduledExecutorService { + private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-"; + private static final NewThreadScheduler.RxThreadFactory THREAD_FACTORY = new NewThreadScheduler.RxThreadFactory(THREAD_NAME_PREFIX); + private final static GenericScheduledExecutorService INSTANCE = new GenericScheduledExecutorService(); private final ScheduledExecutorService executor; @@ -47,18 +48,7 @@ private GenericScheduledExecutorService() { if (count > 8) { count = 8; } - executor = Executors.newScheduledThreadPool(count, new ThreadFactory() { - - final AtomicInteger counter = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "RxScheduledExecutorPool-" + counter.incrementAndGet()); - t.setDaemon(true); - return t; - } - - }); + executor = Executors.newScheduledThreadPool(count, THREAD_FACTORY); } /** diff --git a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java index 841e6ba936..938c14df2a 100644 --- a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -20,8 +20,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import rx.Scheduler; import rx.Subscription; @@ -34,18 +34,28 @@ */ public class NewThreadScheduler extends Scheduler { - private final static NewThreadScheduler INSTANCE = new NewThreadScheduler(); - private final static AtomicLong count = new AtomicLong(); - private final static ThreadFactory THREAD_FACTORY = new ThreadFactory() { + private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler-"; + private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX); + private static final NewThreadScheduler INSTANCE = new NewThreadScheduler(); + static final class RxThreadFactory implements ThreadFactory { + final String prefix; + volatile long counter; + static final AtomicLongFieldUpdater COUNTER_UPDATER + = AtomicLongFieldUpdater.newUpdater(RxThreadFactory.class, "counter"); + + public RxThreadFactory(String prefix) { + this.prefix = prefix; + } + @Override public Thread newThread(Runnable r) { - Thread t = new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet()); + Thread t = new Thread(r, prefix + COUNTER_UPDATER.incrementAndGet(this)); t.setDaemon(true); return t; } - }; - + } + /* package */static NewThreadScheduler instance() { return INSTANCE; } @@ -60,8 +70,8 @@ public Worker createWorker() { } /* package */static class NewThreadWorker extends Scheduler.Worker implements Subscription { - private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final ScheduledExecutorService executor; + volatile boolean isUnsubscribed; /* package */NewThreadWorker(ThreadFactory threadFactory) { executor = Executors.newScheduledThreadPool(1, threadFactory); @@ -74,14 +84,14 @@ public Subscription schedule(final Action0 action) { @Override public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { - if (innerSubscription.isUnsubscribed()) { + if (isUnsubscribed) { return Subscriptions.empty(); } return scheduleActual(action, delayTime, unit); } /* package */ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { - ScheduledAction run = new ScheduledAction(action, innerSubscription); + ScheduledAction run = new ScheduledAction(action); Future f; if (delayTime <= 0) { f = executor.submit(run); @@ -97,12 +107,13 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit private static final class Remover implements Subscription { final Subscription s; final CompositeSubscription parent; - final AtomicBoolean once; + volatile int once; + static final AtomicIntegerFieldUpdater ONCE_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(Remover.class, "once"); public Remover(Subscription s, CompositeSubscription parent) { this.s = s; this.parent = parent; - this.once = new AtomicBoolean(); } @Override @@ -112,7 +123,7 @@ public boolean isUnsubscribed() { @Override public void unsubscribe() { - if (once.compareAndSet(false, true)) { + if (ONCE_UPDATER.compareAndSet(this, 0, 1)) { parent.remove(s); } } @@ -125,14 +136,13 @@ public void unsubscribe() { public static final class ScheduledAction implements Runnable, Subscription { final CompositeSubscription cancel; final Action0 action; - final CompositeSubscription parent; - final AtomicBoolean once; + volatile int once; + static final AtomicIntegerFieldUpdater ONCE_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "once"); - public ScheduledAction(Action0 action, CompositeSubscription parent) { + public ScheduledAction(Action0 action) { this.action = action; - this.parent = parent; this.cancel = new CompositeSubscription(); - this.once = new AtomicBoolean(); } @Override @@ -151,9 +161,8 @@ public boolean isUnsubscribed() { @Override public void unsubscribe() { - if (once.compareAndSet(false, true)) { + if (ONCE_UPDATER.compareAndSet(this, 0, 1)) { cancel.unsubscribe(); - parent.remove(this); } } public void add(Subscription s) { @@ -171,13 +180,13 @@ public void addParent(CompositeSubscription parent) { @Override public void unsubscribe() { - executor.shutdown(); - innerSubscription.unsubscribe(); + isUnsubscribed = true; + executor.shutdownNow(); } @Override public boolean isUnsubscribed() { - return innerSubscription.isUnsubscribed(); + return isUnsubscribed; } } diff --git a/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java b/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java index 736dc3e14f..b6055d5b79 100644 --- a/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java @@ -17,7 +17,7 @@ import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Scheduler; import rx.Subscription; @@ -28,7 +28,7 @@ /** * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed. */ -public class TrampolineScheduler extends Scheduler { +public final class TrampolineScheduler extends Scheduler { private static final TrampolineScheduler INSTANCE = new TrampolineScheduler(); /* package */static TrampolineScheduler instance() { @@ -45,7 +45,9 @@ public Worker createWorker() { private static final ThreadLocal> QUEUE = new ThreadLocal>(); - private final AtomicInteger counter = new AtomicInteger(0); + volatile int counter; + static final AtomicIntegerFieldUpdater COUNTER_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(TrampolineScheduler.class, "counter"); private class InnerCurrentThreadScheduler extends Scheduler.Worker implements Subscription { @@ -75,7 +77,7 @@ private Subscription enqueue(Action0 action, long execTime) { QUEUE.set(queue); } - final TimedAction timedAction = new TimedAction(action, execTime, counter.incrementAndGet()); + final TimedAction timedAction = new TimedAction(action, execTime, COUNTER_UPDATER.incrementAndGet(TrampolineScheduler.this)); queue.add(timedAction); if (exec) { diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index f1de19d760..9ca0d7407a 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -18,8 +18,8 @@ import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Observer; import rx.Scheduler; @@ -331,14 +331,16 @@ static final class UnboundedReplayState implements ReplayState { /** Each Observer is tracked here for what events they have received. */ final ConcurrentHashMap, Integer> replayState; private final NotificationLite nl = NotificationLite.instance(); - /** The size of the buffer. */ - private final AtomicInteger index; /** The buffer. */ private final ArrayList list; /** The termination flag. */ private volatile boolean terminated; + /** The size of the buffer. */ + volatile int index; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater INDEX_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(UnboundedReplayState.class, "index"); public UnboundedReplayState(int initialCapacity) { - index = new AtomicInteger(0); list = new ArrayList(initialCapacity); replayState = new ConcurrentHashMap, Integer>(); } @@ -347,7 +349,7 @@ public UnboundedReplayState(int initialCapacity) { public void next(T n) { if (!terminated) { list.add(nl.next(n)); - index.getAndIncrement(); + INDEX_UPDATER.getAndIncrement(this); } } @@ -360,7 +362,7 @@ public void complete() { if (!terminated) { terminated = true; list.add(nl.completed()); - index.getAndIncrement(); + INDEX_UPDATER.getAndIncrement(this); } } @Override @@ -368,7 +370,7 @@ public void error(Throwable e) { if (!terminated) { terminated = true; list.add(nl.error(e)); - index.getAndIncrement(); + INDEX_UPDATER.getAndIncrement(this); } } @@ -391,7 +393,7 @@ public void replayObserver(SubjectObserver observer) { @Override public Integer replayObserverFromIndex(Integer idx, SubjectObserver observer) { int i = idx; - while (i < index.get()) { + while (i < index) { accept(observer, i); i++; } @@ -418,17 +420,21 @@ public int replayStateSize() { */ static final class BoundedState implements ReplayState> { final NodeList list; - final AtomicReference> tail; final ConcurrentHashMap, NodeList.Node> replayState; final EvictionPolicy evictionPolicy; final Func1 enterTransform; final Func1 leaveTransform; final NotificationLite nl = NotificationLite.instance(); volatile boolean terminated; + volatile NodeList.Node tail; + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater TAIL_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(BoundedState.class, NodeList.Node.class, "tail"); + public BoundedState(EvictionPolicy evictionPolicy, Func1 enterTransform, Func1 leaveTransform) { this.list = new NodeList(); - this.tail = new AtomicReference>(list.tail); + TAIL_UPDATER.lazySet(this, list.tail); this.replayState = new ConcurrentHashMap, NodeList.Node>(); this.evictionPolicy = evictionPolicy; this.enterTransform = enterTransform; @@ -439,7 +445,7 @@ public void next(T value) { if (!terminated) { list.addLast(enterTransform.call(nl.next(value))); evictionPolicy.evict(list); - tail.set(list.tail); + TAIL_UPDATER.lazySet(this, list.tail); } } @Override @@ -450,7 +456,7 @@ public void complete() { evictionPolicy.evict(list); // so add it later list.addLast(enterTransform.call(nl.completed())); - tail.set(list.tail); + TAIL_UPDATER.lazySet(this, list.tail); } } @@ -462,7 +468,7 @@ public void error(Throwable e) { evictionPolicy.evict(list); // so add it later list.addLast(enterTransform.call(nl.error(e))); - tail.set(list.tail); + TAIL_UPDATER.lazySet(this, list.tail); } } public void accept(Observer o, NodeList.Node node) { @@ -484,7 +490,7 @@ public Node head() { return list.head; } public Node tail() { - return tail.get(); + return tail; } public Node removeState(SubjectObserver o) { return replayState.remove(o); diff --git a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java index 6ed6a5e942..df782a0b90 100644 --- a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java +++ b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java @@ -17,7 +17,7 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Observable.OnSubscribe; import rx.Observer; @@ -35,9 +35,13 @@ @SuppressWarnings({"unchecked", "rawtypes"}) /* package */final class SubjectSubscriptionManager implements OnSubscribe { /** Contains the unsubscription flag and the array of active subscribers. */ - private final AtomicReference> state = new AtomicReference>(State.EMPTY); + volatile State state = State.EMPTY; + static final AtomicReferenceFieldUpdater STATE_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(SubjectSubscriptionManager.class, State.class, "state"); /** Stores the latest value or the terminal value for some Subjects. */ - final AtomicReference latest = new AtomicReference(); + volatile Object latest; + static final AtomicReferenceFieldUpdater LATEST_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(SubjectSubscriptionManager.class, Object.class, "latest"); /** Indicates that the subject is active (cheaper than checking the state).*/ boolean active = true; /** Action called when a new subscriber subscribes but before it is added to the state. */ @@ -70,15 +74,15 @@ public void call() { } /** Set the latest NotificationLite value. */ void set(Object value) { - this.latest.set(value); + LATEST_UPDATER.lazySet(this, value); } /** @return Retrieve the latest NotificationLite value */ Object get() { - return latest.get(); + return latest; } /** @return the array of active subscribers, don't write into the array! */ SubjectObserver[] observers() { - return state.get().observers; + return state.observers; } /** * Try to atomically add a SubjectObserver to the active state. @@ -87,13 +91,13 @@ SubjectObserver[] observers() { */ boolean add(SubjectObserver o) { do { - State oldState = state.get(); + State oldState = state; if (oldState.terminated) { onTerminated.call(o); return false; } State newState = oldState.add(o); - if (state.compareAndSet(oldState, newState)) { + if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { onAdded.call(o); return true; } @@ -105,12 +109,12 @@ boolean add(SubjectObserver o) { */ void remove(SubjectObserver o) { do { - State oldState = state.get(); + State oldState = state; if (oldState.terminated) { return; } State newState = oldState.remove(o); - if (newState == oldState || state.compareAndSet(oldState, newState)) { + if (newState == oldState || STATE_UPDATER.compareAndSet(this, oldState, newState)) { return; } } while (true); @@ -122,7 +126,7 @@ void remove(SubjectObserver o) { */ SubjectObserver[] next(Object n) { set(n); - return state.get().observers; + return state.observers; } /** * Atomically set the terminal NotificationLite value (which could be any of the 3), @@ -133,15 +137,12 @@ SubjectObserver[] next(Object n) { SubjectObserver[] terminate(Object n) { set(n); active = false; - do { - State oldState = state.get(); - if (oldState.terminated) { - return State.NO_OBSERVERS; - } - if (state.compareAndSet(oldState, State.TERMINATED)) { - return oldState.observers; - } - } while (true); + + State oldState = state; + if (oldState.terminated) { + return State.NO_OBSERVERS; + } + return STATE_UPDATER.getAndSet(this, State.TERMINATED).observers; } /** State-machine representing the termination state and active SubjectObservers. */ diff --git a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java index 8e305fd1a4..929d104baa 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java @@ -15,7 +15,7 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Observable; import rx.Subscription; @@ -28,8 +28,10 @@ */ public final class BooleanSubscription implements Subscription { - private final AtomicBoolean unsubscribed = new AtomicBoolean(false); private final Action0 action; + volatile int unsubscribed; + static final AtomicIntegerFieldUpdater UNSUBSCRIBED_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(BooleanSubscription.class, "unsubscribed"); public BooleanSubscription() { action = null; @@ -47,13 +49,14 @@ public static BooleanSubscription create(Action0 onUnsubscribe) { return new BooleanSubscription(onUnsubscribe); } + @Override public boolean isUnsubscribed() { - return unsubscribed.get(); + return unsubscribed != 0; } @Override public final void unsubscribe() { - if (unsubscribed.compareAndSet(false, true)) { + if (UNSUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) { if (action != null) { action.call(); } diff --git a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java index 296cd231cd..60b075e02f 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java @@ -15,7 +15,7 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Observable; import rx.Subscription; @@ -26,9 +26,12 @@ * @see Rx.Net equivalent MultipleAssignmentDisposable */ public final class MultipleAssignmentSubscription implements Subscription { - - private final AtomicReference state = new AtomicReference(new State(false, Subscriptions.empty())); - + /** The shared empty state. */ + static final State EMPTY_STATE = new State(false, Subscriptions.empty()); + volatile State state = EMPTY_STATE; + static final AtomicReferenceFieldUpdater STATE_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(MultipleAssignmentSubscription.class, State.class, "state"); + private static final class State { final boolean isUnsubscribed; final Subscription subscription; @@ -47,9 +50,9 @@ State set(Subscription s) { } } - + @Override public boolean isUnsubscribed() { - return state.get().isUnsubscribed; + return state.isUnsubscribed; } @Override @@ -57,13 +60,13 @@ public void unsubscribe() { State oldState; State newState; do { - oldState = state.get(); + oldState = state; if (oldState.isUnsubscribed) { return; } else { newState = oldState.unsubscribe(); } - } while (!state.compareAndSet(oldState, newState)); + } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); oldState.subscription.unsubscribe(); } @@ -74,18 +77,18 @@ public void set(Subscription s) { State oldState; State newState; do { - oldState = state.get(); + oldState = state; if (oldState.isUnsubscribed) { s.unsubscribe(); return; } else { newState = oldState.set(s); } - } while (!state.compareAndSet(oldState, newState)); + } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); } public Subscription get() { - return state.get().subscription; + return state.subscription; } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java index fdc9641dd8..35fc538f00 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java @@ -15,8 +15,8 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Subscription; @@ -28,7 +28,10 @@ */ public final class RefCountSubscription implements Subscription { private final Subscription actual; - private final AtomicReference state = new AtomicReference(new State(false, 0)); + static final State EMPTY_STATE = new State(false, 0); + volatile State state = EMPTY_STATE; + static final AtomicReferenceFieldUpdater STATE_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(RefCountSubscription.class, State.class, "state"); private static final class State { final boolean isUnsubscribed; @@ -66,28 +69,27 @@ public RefCountSubscription(Subscription s) { } /** - * Returns a new sub-subscription. + * Returns a new sub-subscription + * @return a new sub-subscription. */ public Subscription get() { State oldState; State newState; do { - oldState = state.get(); + oldState = state; if (oldState.isUnsubscribed) { return Subscriptions.empty(); } else { newState = oldState.addChild(); } - } while (!state.compareAndSet(oldState, newState)); + } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); - return new InnerSubscription(); + return new InnerSubscription(this); } - /** - * Check if this subscription is already unsubscribed. - */ + @Override public boolean isUnsubscribed() { - return state.get().isUnsubscribed; + return state.isUnsubscribed; } @Override @@ -95,12 +97,12 @@ public void unsubscribe() { State oldState; State newState; do { - oldState = state.get(); + oldState = state; if (oldState.isUnsubscribed) { return; } newState = oldState.unsubscribe(); - } while (!state.compareAndSet(oldState, newState)); + } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); unsubscribeActualIfApplicable(newState); } @@ -109,27 +111,35 @@ private void unsubscribeActualIfApplicable(State state) { actual.unsubscribe(); } } + void unsubscribeAChild() { + State oldState; + State newState; + do { + oldState = state; + newState = oldState.removeChild(); + } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); + unsubscribeActualIfApplicable(newState); + } /** The individual sub-subscriptions. */ - private final class InnerSubscription implements Subscription { - final AtomicBoolean innerDone = new AtomicBoolean(); - + private static final class InnerSubscription implements Subscription { + final RefCountSubscription parent; + volatile int innerDone; + static final AtomicIntegerFieldUpdater INNER_DONE_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(InnerSubscription.class, "innerDone"); + public InnerSubscription(RefCountSubscription parent) { + this.parent = parent; + } @Override public void unsubscribe() { - if (innerDone.compareAndSet(false, true)) { - State oldState; - State newState; - do { - oldState = state.get(); - newState = oldState.removeChild(); - } while (!state.compareAndSet(oldState, newState)); - unsubscribeActualIfApplicable(newState); + if (INNER_DONE_UPDATER.compareAndSet(this, 0, 1)) { + parent.unsubscribeAChild(); } } @Override public boolean isUnsubscribed() { - return innerDone.get(); + return innerDone != 0; } }; } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java index 77b6226db6..eb4b410d72 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java @@ -15,7 +15,7 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Subscription; @@ -26,8 +26,10 @@ * @see Rx.Net equivalent SerialDisposable */ public final class SerialSubscription implements Subscription { - - private final AtomicReference state = new AtomicReference(new State(false, Subscriptions.empty())); + static final State EMPTY_STATE = new State(false, Subscriptions.empty()); + volatile State state = EMPTY_STATE; + static final AtomicReferenceFieldUpdater STATE_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(SerialSubscription.class, State.class, "state"); private static final class State { final boolean isUnsubscribed; @@ -48,8 +50,9 @@ State set(Subscription s) { } + @Override public boolean isUnsubscribed() { - return state.get().isUnsubscribed; + return state.isUnsubscribed; } @Override @@ -57,13 +60,13 @@ public void unsubscribe() { State oldState; State newState; do { - oldState = state.get(); + oldState = state; if (oldState.isUnsubscribed) { return; } else { newState = oldState.unsubscribe(); } - } while (!state.compareAndSet(oldState, newState)); + } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); oldState.subscription.unsubscribe(); } @@ -74,19 +77,19 @@ public void set(Subscription s) { State oldState; State newState; do { - oldState = state.get(); + oldState = state; if (oldState.isUnsubscribed) { s.unsubscribe(); return; } else { newState = oldState.set(s); } - } while (!state.compareAndSet(oldState, newState)); + } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); oldState.subscription.unsubscribe(); } public Subscription get() { - return state.get().subscription; + return state.subscription; } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java index e489faf99d..9709cf676e 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java +++ b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java @@ -16,7 +16,7 @@ package rx.subscriptions; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Subscription; import rx.functions.Action0; @@ -49,27 +49,32 @@ public static Subscription create(final Action0 unsubscribe) { * Subscription that delegates the unsubscription action to an Action0 instance */ private static final class ActionSubscription implements Subscription { - private final AtomicReference actual; + volatile Action0 actual; + static final AtomicReferenceFieldUpdater ACTUAL_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(ActionSubscription.class, Action0.class, "actual"); public ActionSubscription(Action0 action) { - this.actual = new AtomicReference(action != null ? action : Actions.empty()); + ACTUAL_UPDATER.lazySet(this, action != null ? action : Actions.empty()); } @Override public boolean isUnsubscribed() { - return actual.get() == UNSUBSCRIBED_ACTION; + return actual == UNSUBSCRIBED_ACTION; } @Override public void unsubscribe() { - Action0 a = actual.getAndSet(UNSUBSCRIBED_ACTION); + Action0 a = ACTUAL_UPDATER.getAndSet(this, UNSUBSCRIBED_ACTION); a.call(); } /** The no-op unique action indicating an unsubscribed state. */ - private static final Action0 UNSUBSCRIBED_ACTION = new Action0() { + private static final Unsubscribed UNSUBSCRIBED_ACTION = new Unsubscribed(); + /** Naming classes helps with debugging. */ + private static final class Unsubscribed implements Action0 { @Override public void call() { - + } - }; + } } + /** * A {@link Subscription} that wraps a {@link Future} and cancels it when unsubscribed. @@ -80,19 +85,24 @@ public void call() { * @return {@link Subscription} */ public static Subscription from(final Future f) { - return new Subscription() { - - @Override - public void unsubscribe() { - f.cancel(true); - } + return new FutureSubscription(f); + } + /** Naming classes helps with debugging. */ + private static final class FutureSubscription implements Subscription { + final Future f; - @Override - public boolean isUnsubscribed() { - return f.isCancelled(); - } + public FutureSubscription(Future f) { + this.f = f; + } + @Override + public void unsubscribe() { + f.cancel(true); + } - }; + @Override + public boolean isUnsubscribed() { + return f.isCancelled(); + } } /** @@ -110,7 +120,10 @@ public static CompositeSubscription from(Subscription... subscriptions) { /** * A {@link Subscription} that does nothing when its unsubscribe method is called. */ - private static Subscription EMPTY = new Subscription() { + private static final Empty EMPTY = new Empty(); + /** Naming classes helps with debugging. */ + private static final class Empty implements Subscription { + @Override public void unsubscribe() { } @@ -118,5 +131,5 @@ public void unsubscribe() { public boolean isUnsubscribed() { return false; } - }; + } } diff --git a/rxjava-core/src/test/java/rx/operators/BlockingOperatorNextTest.java b/rxjava-core/src/test/java/rx/operators/BlockingOperatorNextTest.java index 953db5c6fd..a4e8139232 100644 --- a/rxjava-core/src/test/java/rx/operators/BlockingOperatorNextTest.java +++ b/rxjava-core/src/test/java/rx/operators/BlockingOperatorNextTest.java @@ -304,12 +304,12 @@ public void testSingleSourceManyIterators() throws InterruptedException { for (long i = 0; i < 9; i++) { // hasNext has to set the waiting to true, otherwise, all onNext will be skipped - it.setWaiting(true); + it.setWaiting(1); ps.onNext(i); Assert.assertEquals(true, it.hasNext()); Assert.assertEquals(j + "th iteration", Long.valueOf(i), it.next()); } - it.setWaiting(true); + it.setWaiting(1); ps.onNext(9L); Assert.assertEquals(j + "th iteration", false, it.hasNext()); diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectBoundedConcurrencyTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectBoundedConcurrencyTest.java index 6e32b8f6cf..e12cab276a 100644 --- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectBoundedConcurrencyTest.java +++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectBoundedConcurrencyTest.java @@ -71,7 +71,7 @@ public void call(Subscriber o) { }); source.start(); - long v = replay.toBlockingObservable().last(); + long v = replay.toBlocking().last(); assertEquals(10000, v); // it's been played through once so now it will all be replays @@ -198,7 +198,7 @@ public void call(Subscriber o) { @Override public void run() { - List values = replay.toList().toBlockingObservable().last(); + List values = replay.toList().toBlocking().last(); listOfListsOfValues.add(values); System.out.println("Finished thread: " + count); } @@ -330,7 +330,7 @@ public SubjectObserverThread(ReplaySubject subject) { public void run() { try { // a timeout exception will happen if we don't get a terminal state - String v = subject.timeout(2000, TimeUnit.MILLISECONDS).toBlockingObservable().single(); + String v = subject.timeout(2000, TimeUnit.MILLISECONDS).toBlocking().single(); value.set(v); } catch (Exception e) { e.printStackTrace(); From ccbd4832f3b3c5616d0fcf83be605c10f6dac6d9 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 23 May 2014 23:25:56 +0200 Subject: [PATCH 2/3] Changed lazySet to regular volatile write to avoid potential visibility issues. --- .../operators/BlockingOperatorMostRecent.java | 12 ++-- .../rx/operators/BlockingOperatorNext.java | 2 +- .../rx/operators/BufferUntilSubscriber.java | 2 +- .../main/java/rx/operators/OperatorMerge.java | 2 +- .../rx/operators/OperatorMergeDelayError.java | 2 +- .../rx/operators/OperatorMergeMapPair.java | 2 +- .../operators/OperatorMergeMapTransform.java | 2 +- .../operators/OperatorMergeMaxConcurrent.java | 2 +- .../rx/operators/OperatorSampleWithTime.java | 2 +- .../main/java/rx/plugins/RxJavaPlugins.java | 57 ++++++++----------- .../main/java/rx/subjects/ReplaySubject.java | 11 ++-- .../subjects/SubjectSubscriptionManager.java | 2 +- .../subscriptions/CompositeSubscription.java | 5 +- .../java/rx/subscriptions/Subscriptions.java | 2 +- 14 files changed, 45 insertions(+), 60 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/BlockingOperatorMostRecent.java b/rxjava-core/src/main/java/rx/operators/BlockingOperatorMostRecent.java index e2b6fb22cb..81f846f392 100644 --- a/rxjava-core/src/main/java/rx/operators/BlockingOperatorMostRecent.java +++ b/rxjava-core/src/main/java/rx/operators/BlockingOperatorMostRecent.java @@ -16,7 +16,6 @@ package rx.operators; import java.util.Iterator; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Observable; import rx.Subscriber; @@ -80,27 +79,24 @@ public void remove() { private static class MostRecentObserver extends Subscriber { static final NotificationLite nl = NotificationLite.instance(); volatile Object value; - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater VALUE_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(MostRecentObserver.class, Object.class, "value"); private MostRecentObserver(T value) { - VALUE_UPDATER.lazySet(this, nl.next(value)); + this.value = nl.next(value); } @Override public void onCompleted() { - VALUE_UPDATER.lazySet(this, nl.completed()); + value = nl.completed(); } @Override public void onError(Throwable e) { - VALUE_UPDATER.lazySet(this, nl.error(e)); + value = nl.error(e); } @Override public void onNext(T args) { - VALUE_UPDATER.lazySet(this, nl.next(args)); + value = nl.next(args); } private boolean isCompleted() { diff --git a/rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java b/rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java index 5d570f955e..fdb688156a 100644 --- a/rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java +++ b/rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java @@ -178,7 +178,7 @@ public Notification takeNext() throws InterruptedException { return buf.take(); } void setWaiting(int value) { - WAITING_UPDATER.lazySet(this, value); + waiting = value; } } } diff --git a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java index 7bbbb4d3f9..a2046aa56b 100644 --- a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java +++ b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java @@ -70,7 +70,7 @@ boolean casFirst(int expected, int next) { return FIRST_UPDATER.compareAndSet(this, expected, next); } void setObserverRef(Observer o) { - OBSERVER_UPDATER.lazySet(this, o); + observerRef = o; } boolean casObserverRef(Observer expected, Observer next) { return OBSERVER_UPDATER.compareAndSet(this, expected, next); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMerge.java b/rxjava-core/src/main/java/rx/operators/OperatorMerge.java index 03a6afdeab..49072c2c47 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMerge.java @@ -57,7 +57,7 @@ public MergeSubscriber(Subscriber actual, CompositeSubscription childrenSubsc super(actual); this.actual = actual; this.childrenSubscriptions = childrenSubscriptions; - WIP_UPDATER.lazySet(this, 1); + this.wip = 1; } @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java b/rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java index b5b8b72fc9..a62aaa9db2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMergeDelayError.java @@ -67,7 +67,7 @@ public MergeDelayErrorSubscriber(Subscriber s, CompositeSubscription super(s); this.s = s; this.csub = csub; - WIP_UPDATER.lazySet(this, 1); + this.wip = 1; } @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMergeMapPair.java b/rxjava-core/src/main/java/rx/operators/OperatorMergeMapPair.java index 9a1f437f22..7db1c21daf 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMergeMapPair.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMergeMapPair.java @@ -79,7 +79,7 @@ public SourceSubscriber(Subscriber s, CompositeSubscription csub, this.csub = csub; this.collectionSelector = collectionSelector; this.resultSelector = resultSelector; - WIP_UPDATER.lazySet(this, 1); + this.wip = 1; } @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMergeMapTransform.java b/rxjava-core/src/main/java/rx/operators/OperatorMergeMapTransform.java index 1799e071eb..f4c5a6c761 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMergeMapTransform.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMergeMapTransform.java @@ -71,7 +71,7 @@ public SourceSubscriber(Subscriber s, CompositeSubscription csub, Func1 s, CompositeSubscripti this.csub = csub; this.guard = new Object(); this.queue = new LinkedList>(); - WIP_UPDATER.lazySet(this, 1); + this.wip = 1; } @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSampleWithTime.java b/rxjava-core/src/main/java/rx/operators/OperatorSampleWithTime.java index 1bc48b8033..7474c9ec04 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorSampleWithTime.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorSampleWithTime.java @@ -72,7 +72,7 @@ public SamplerSubscriber(Subscriber subscriber) { } @Override public void onNext(T t) { - VALUE_UPDATER.lazySet(this, t); + value = t; } @Override diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java b/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java index a3eb2f193c..408802acb3 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java @@ -15,7 +15,7 @@ */ package rx.plugins; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; /** * Registry for plugin implementations that allows global override and handles the retrieval of correct implementation based on order of precedence: @@ -29,17 +29,10 @@ public class RxJavaPlugins { private final static RxJavaPlugins INSTANCE = new RxJavaPlugins(); - volatile RxJavaErrorHandler errorHandler; - volatile RxJavaObservableExecutionHook observableExecutionHook; - volatile RxJavaDefaultSchedulers schedulerOverrides; + private final AtomicReference errorHandler = new AtomicReference(); + private final AtomicReference observableExecutionHook = new AtomicReference(); + private final AtomicReference schedulerOverrides = new AtomicReference(); - static final AtomicReferenceFieldUpdater ERROR_HANDLER_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(RxJavaPlugins.class, RxJavaErrorHandler.class, "errorHandler"); - static final AtomicReferenceFieldUpdater EXECUTION_HOOK_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(RxJavaPlugins.class, RxJavaObservableExecutionHook.class, "observableExecutionHook"); - static final AtomicReferenceFieldUpdater SCHEDULER_OVERRIDE_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(RxJavaPlugins.class, RxJavaDefaultSchedulers.class, "schedulerOverrides"); - public static RxJavaPlugins getInstance() { return INSTANCE; } @@ -48,9 +41,9 @@ public static RxJavaPlugins getInstance() { } - /* package accessible for unit tests */void reset() { - ERROR_HANDLER_UPDATER.lazySet(this, null); - EXECUTION_HOOK_UPDATER.lazySet(this, null); + /* package accessible for ujnit tests */void reset() { + INSTANCE.errorHandler.set(null); + INSTANCE.observableExecutionHook.set(null); } /** @@ -62,19 +55,19 @@ public static RxJavaPlugins getInstance() { * @return {@link RxJavaErrorHandler} implementation to use */ public RxJavaErrorHandler getErrorHandler() { - if (errorHandler == null) { + if (errorHandler.get() == null) { // check for an implementation from System.getProperty first Object impl = getPluginImplementationViaProperty(RxJavaErrorHandler.class); if (impl == null) { // nothing set via properties so initialize with default - ERROR_HANDLER_UPDATER.compareAndSet(this, null, RxJavaErrorHandlerDefault.getInstance()); + errorHandler.compareAndSet(null, RxJavaErrorHandlerDefault.getInstance()); // we don't return from here but call get() again in case of thread-race so the winner will always get returned } else { // we received an implementation from the system property so use it - ERROR_HANDLER_UPDATER.compareAndSet(this, null, (RxJavaErrorHandler) impl); + errorHandler.compareAndSet(null, (RxJavaErrorHandler) impl); } } - return errorHandler; + return errorHandler.get(); } /** @@ -86,8 +79,8 @@ public RxJavaErrorHandler getErrorHandler() { * if called more than once or after the default was initialized (if usage occurs before trying to register) */ public void registerErrorHandler(RxJavaErrorHandler impl) { - if (!ERROR_HANDLER_UPDATER.compareAndSet(this, null, impl)) { - throw new IllegalStateException("Another strategy was already registered: " + errorHandler); + if (!errorHandler.compareAndSet(null, impl)) { + throw new IllegalStateException("Another strategy was already registered: " + errorHandler.get()); } } @@ -100,19 +93,19 @@ public void registerErrorHandler(RxJavaErrorHandler impl) { * @return {@link RxJavaObservableExecutionHook} implementation to use */ public RxJavaObservableExecutionHook getObservableExecutionHook() { - if (observableExecutionHook == null) { + if (observableExecutionHook.get() == null) { // check for an implementation from System.getProperty first Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class); if (impl == null) { // nothing set via properties so initialize with default - EXECUTION_HOOK_UPDATER.compareAndSet(this, null, RxJavaObservableExecutionHookDefault.getInstance()); + observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance()); // we don't return from here but call get() again in case of thread-race so the winner will always get returned } else { // we received an implementation from the system property so use it - EXECUTION_HOOK_UPDATER.compareAndSet(this, null, (RxJavaObservableExecutionHook) impl); + observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl); } } - return observableExecutionHook; + return observableExecutionHook.get(); } /** @@ -124,8 +117,8 @@ public RxJavaObservableExecutionHook getObservableExecutionHook() { * if called more than once or after the default was initialized (if usage occurs before trying to register) */ public void registerObservableExecutionHook(RxJavaObservableExecutionHook impl) { - if (!EXECUTION_HOOK_UPDATER.compareAndSet(this, null, impl)) { - throw new IllegalStateException("Another strategy was already registered: " + observableExecutionHook); + if (!observableExecutionHook.compareAndSet(null, impl)) { + throw new IllegalStateException("Another strategy was already registered: " + observableExecutionHook.get()); } } @@ -168,19 +161,19 @@ private static Object getPluginImplementationViaProperty(Class pluginClass) { * @return {@link RxJavaErrorHandler} implementation to use */ public RxJavaDefaultSchedulers getDefaultSchedulers() { - if (schedulerOverrides == null) { + if (schedulerOverrides.get() == null) { // check for an implementation from System.getProperty first Object impl = getPluginImplementationViaProperty(RxJavaDefaultSchedulers.class); if (impl == null) { // nothing set via properties so initialize with default - SCHEDULER_OVERRIDE_UPDATER.compareAndSet(this, null, RxJavaDefaultSchedulersDefault.getInstance()); + schedulerOverrides.compareAndSet(null, RxJavaDefaultSchedulersDefault.getInstance()); // we don't return from here but call get() again in case of thread-race so the winner will always get returned } else { // we received an implementation from the system property so use it - SCHEDULER_OVERRIDE_UPDATER.compareAndSet(this, null, (RxJavaDefaultSchedulers) impl); + schedulerOverrides.compareAndSet(null, (RxJavaDefaultSchedulers) impl); } } - return schedulerOverrides; + return schedulerOverrides.get(); } /** @@ -192,8 +185,8 @@ public RxJavaDefaultSchedulers getDefaultSchedulers() { * if called more than once or after the default was initialized (if usage occurs before trying to register) */ public void registerDefaultSchedulers(RxJavaDefaultSchedulers impl) { - if (!SCHEDULER_OVERRIDE_UPDATER.compareAndSet(this, null, impl)) { - throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides); + if (!schedulerOverrides.compareAndSet(null, impl)) { + throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides.get()); } } } diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index 9ca0d7407a..f212319abb 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -427,14 +427,11 @@ static final class BoundedState implements ReplayState nl = NotificationLite.instance(); volatile boolean terminated; volatile NodeList.Node tail; - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater TAIL_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(BoundedState.class, NodeList.Node.class, "tail"); public BoundedState(EvictionPolicy evictionPolicy, Func1 enterTransform, Func1 leaveTransform) { this.list = new NodeList(); - TAIL_UPDATER.lazySet(this, list.tail); + this.tail = list.tail; this.replayState = new ConcurrentHashMap, NodeList.Node>(); this.evictionPolicy = evictionPolicy; this.enterTransform = enterTransform; @@ -445,7 +442,7 @@ public void next(T value) { if (!terminated) { list.addLast(enterTransform.call(nl.next(value))); evictionPolicy.evict(list); - TAIL_UPDATER.lazySet(this, list.tail); + tail = list.tail; } } @Override @@ -456,7 +453,7 @@ public void complete() { evictionPolicy.evict(list); // so add it later list.addLast(enterTransform.call(nl.completed())); - TAIL_UPDATER.lazySet(this, list.tail); + tail = list.tail; } } @@ -468,7 +465,7 @@ public void error(Throwable e) { evictionPolicy.evict(list); // so add it later list.addLast(enterTransform.call(nl.error(e))); - TAIL_UPDATER.lazySet(this, list.tail); + tail = list.tail; } } public void accept(Observer o, NodeList.Node node) { diff --git a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java index df782a0b90..6c6d598e85 100644 --- a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java +++ b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java @@ -74,7 +74,7 @@ public void call() { } /** Set the latest NotificationLite value. */ void set(Object value) { - LATEST_UPDATER.lazySet(this, value); + latest = value; } /** @return Retrieve the latest NotificationLite value */ Object get() { diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index 79fd979f55..10c59a0ba9 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -17,7 +17,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Subscription; @@ -103,11 +102,11 @@ State clear() { public CompositeSubscription() { // this creates only a store-store barrier which is generally faster when // CompositeSubscriptions are created in a tight loop. - STATE_UPDATER.lazySet(this, CLEAR_STATE); + state = CLEAR_STATE; } public CompositeSubscription(final Subscription... subscriptions) { - STATE_UPDATER.lazySet(this, new State(false, subscriptions)); + state = new State(false, subscriptions); } @Override diff --git a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java index 9709cf676e..283a4a5587 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java +++ b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java @@ -53,7 +53,7 @@ private static final class ActionSubscription implements Subscription { static final AtomicReferenceFieldUpdater ACTUAL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ActionSubscription.class, Action0.class, "actual"); public ActionSubscription(Action0 action) { - ACTUAL_UPDATER.lazySet(this, action != null ? action : Actions.empty()); + this.actual = action != null ? action : Actions.empty(); } @Override public boolean isUnsubscribed() { From 2a08d800b11e0856a8cec7980660bb54ca5c408f Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 24 May 2014 00:03:42 +0200 Subject: [PATCH 3/3] Make the chunk_test.clj work again for now --- .../main/java/rx/operators/OperatorMerge.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMerge.java b/rxjava-core/src/main/java/rx/operators/OperatorMerge.java index 49072c2c47..20f2a31071 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMerge.java @@ -49,6 +49,7 @@ static final class MergeSubscriber extends Subscriber final Subscriber actual; final CompositeSubscription childrenSubscriptions; volatile int wip; + volatile boolean completed; @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater WIP_UPDATER = AtomicIntegerFieldUpdater.newUpdater(MergeSubscriber.class, "wip"); @@ -57,7 +58,6 @@ public MergeSubscriber(Subscriber actual, CompositeSubscription childrenSubsc super(actual); this.actual = actual; this.childrenSubscriptions = childrenSubscriptions; - this.wip = 1; } @Override @@ -76,11 +76,20 @@ public void onError(Throwable e) { @Override public void onCompleted() { - if (WIP_UPDATER.decrementAndGet(this) == 0) { + completed = true; + if (wip == 0) { actual.onCompleted(); } } - + void completeInner(InnerSubscriber s) { + try { + if (WIP_UPDATER.decrementAndGet(this) == 0 && completed) { + actual.onCompleted(); + } + } finally { + childrenSubscriptions.remove(s); + } + } } static final class InnerSubscriber extends Subscriber { final Subscriber actual; @@ -110,11 +119,7 @@ public void onError(Throwable e) { @Override public void onCompleted() { if (ONCE_UPDATER.compareAndSet(this, 0, 1)) { - try { - parent.onCompleted(); - } finally { - parent.childrenSubscriptions.remove(this); - } + parent.completeInner(this); } }