From 308255868f25e8f805d4e8b3f9a9d3571636b817 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 28 Jan 2015 16:48:40 +0100 Subject: [PATCH 1/4] RxRingBuffer with synchronization --- .../rx/internal/operators/OperatorMerge.java | 4 +- .../java/rx/internal/util/RxRingBuffer.java | 101 +++++++++++------- .../internal/util/unsafe/SpscArrayQueue.java | 23 ++-- 3 files changed, 77 insertions(+), 51 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index b86db26c47..384a7bea42 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -308,7 +308,7 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou private RxRingBuffer getOrCreateScalarValueQueue() { RxRingBuffer svq = scalarValueQueue; if (svq == null) { - svq = RxRingBuffer.getSpmcInstance(); + svq = RxRingBuffer.getSpscInstance(); scalarValueQueue = svq; } return svq; @@ -581,7 +581,7 @@ private static final class InnerSubscriber extends Subscriber { @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated"); - private final RxRingBuffer q = RxRingBuffer.getSpmcInstance(); + private final RxRingBuffer q = RxRingBuffer.getSpscInstance(); public InnerSubscriber(MergeSubscriber parent, MergeProducer producer) { this.parentSubscriber = parent; diff --git a/src/main/java/rx/internal/util/RxRingBuffer.java b/src/main/java/rx/internal/util/RxRingBuffer.java index 389793c1c3..649f8e8a6e 100644 --- a/src/main/java/rx/internal/util/RxRingBuffer.java +++ b/src/main/java/rx/internal/util/RxRingBuffer.java @@ -34,7 +34,7 @@ public class RxRingBuffer implements Subscription { public static RxRingBuffer getSpscInstance() { if (UnsafeAccess.isUnsafeAvailable()) { // TODO the SpscArrayQueue isn't ready yet so using SpmcArrayQueue for now - return new RxRingBuffer(SPMC_POOL, SIZE); + return new RxRingBuffer(SPSC_POOL, SIZE); } else { return new RxRingBuffer(); } @@ -306,12 +306,13 @@ private RxRingBuffer(ObjectPool> pool, int size) { this.size = size; } - public void release() { - if (pool != null) { - Queue q = queue; + public synchronized void release() { + Queue q = queue; + ObjectPool> p = pool; + if (p != null && q != null) { q.clear(); queue = null; - pool.returnObject(q); + p.returnObject(q); } } @@ -331,10 +332,21 @@ public void unsubscribe() { * if more onNext are sent than have been requested */ public void onNext(Object o) throws MissingBackpressureException { - if (queue == null) { + boolean iae = false; + boolean mbe = false; + synchronized (this) { + Queue q = queue; + if (q != null) { + mbe = !q.offer(on.next(o)); + } else { + iae = true; + } + } + + if (iae) { throw new IllegalStateException("This instance has been unsubscribed and the queue is no longer usable."); } - if (!queue.offer(on.next(o))) { + if (mbe) { throw new MissingBackpressureException(); } } @@ -362,55 +374,66 @@ public int capacity() { } public int count() { - if (queue == null) { + Queue q = queue; + if (q == null) { return 0; } - return queue.size(); + return q.size(); } public boolean isEmpty() { - if (queue == null) { + Queue q = queue; + if (q == null) { return true; } - return queue.isEmpty(); + return q.isEmpty(); } public Object poll() { - if (queue == null) { - // we are unsubscribed and have released the undelrying queue - return null; - } Object o; - o = queue.poll(); - /* - * benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll() - * is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case, - * "o == null" and there is a terminal state, but now "queue.isEmpty()" and we should NOT return the terminalState. - * - * The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on* - * or needing to enqueue terminalState. - * - * This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires - * a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it - * is currently the way it is. - */ - if (o == null && terminalState != null && queue.isEmpty()) { - o = terminalState; - // once emitted we clear so a poll loop will finish - terminalState = null; + synchronized (this) { + Queue q = queue; + if (q == null) { + // we are unsubscribed and have released the undelrying queue + return null; + } + o = q.poll(); + + /* + * benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll() + * is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case, + * "o == null" and there is a terminal state, but now "queue.isEmpty()" and we should NOT return the terminalState. + * + * The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on* + * or needing to enqueue terminalState. + * + * This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires + * a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it + * is currently the way it is. + */ + Object ts = terminalState; + if (o == null && ts != null && q.peek() == null) { + o = ts; + // once emitted we clear so a poll loop will finish + terminalState = null; + } } return o; } public Object peek() { - if (queue == null) { - // we are unsubscribed and have released the undelrying queue - return null; - } Object o; - o = queue.peek(); - if (o == null && terminalState != null && queue.isEmpty()) { - o = terminalState; + synchronized (this) { + Queue q = queue; + if (q == null) { + // we are unsubscribed and have released the undelrying queue + return null; + } + o = q.peek(); + Object ts = terminalState; + if (o == null && ts != null && q.peek() == null) { + o = ts; + } } return o; } diff --git a/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java b/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java index 16d1e81fae..6696cbce1b 100644 --- a/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java +++ b/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java @@ -118,19 +118,22 @@ public SpscArrayQueue(final int capacity) { */ @Override public boolean offer(final E e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } +// if (null == e) { +// throw new NullPointerException("Null is not a valid element"); +// } // local load of field to avoid repeated loads after volatile reads final E[] lElementBuffer = buffer; final long offset = calcElementOffset(producerIndex); - if (producerIndex >= producerLookAhead) { - if (null == lvElement(lElementBuffer, calcElementOffset(producerIndex + lookAheadStep))) {// LoadLoad - producerLookAhead = producerIndex + lookAheadStep; - } - else if (null != lvElement(lElementBuffer, offset)){ - return false; - } +// if (producerIndex >= producerLookAhead) { +// if (null == lvElement(lElementBuffer, calcElementOffset(producerIndex + lookAheadStep))) {// LoadLoad +// producerLookAhead = producerIndex + lookAheadStep; +// } +// else if (null != lvElement(lElementBuffer, offset)){ +// return false; +// } +// } + if (null != lvElement(lElementBuffer, offset)){ + return false; } producerIndex++; // do increment here so the ordered store give both a barrier soElement(lElementBuffer, offset, e);// StoreStore From 72c56f37c58ea1cfab292d654ef363d3e8719679 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Wed, 28 Jan 2015 18:42:44 +0100 Subject: [PATCH 2/4] Update RxRingBuffer.java Fixes in comments. --- src/main/java/rx/internal/util/RxRingBuffer.java | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/src/main/java/rx/internal/util/RxRingBuffer.java b/src/main/java/rx/internal/util/RxRingBuffer.java index 649f8e8a6e..30d55a279a 100644 --- a/src/main/java/rx/internal/util/RxRingBuffer.java +++ b/src/main/java/rx/internal/util/RxRingBuffer.java @@ -33,7 +33,6 @@ public class RxRingBuffer implements Subscription { public static RxRingBuffer getSpscInstance() { if (UnsafeAccess.isUnsafeAvailable()) { - // TODO the SpscArrayQueue isn't ready yet so using SpmcArrayQueue for now return new RxRingBuffer(SPSC_POOL, SIZE); } else { return new RxRingBuffer(); @@ -394,23 +393,11 @@ public Object poll() { synchronized (this) { Queue q = queue; if (q == null) { - // we are unsubscribed and have released the undelrying queue + // we are unsubscribed and have released the underlying queue return null; } o = q.poll(); - /* - * benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll() - * is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case, - * "o == null" and there is a terminal state, but now "queue.isEmpty()" and we should NOT return the terminalState. - * - * The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on* - * or needing to enqueue terminalState. - * - * This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires - * a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it - * is currently the way it is. - */ Object ts = terminalState; if (o == null && ts != null && q.peek() == null) { o = ts; From dc5bf0d9d8864ba49a14dbdf33758bad9b422025 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Wed, 28 Jan 2015 18:43:57 +0100 Subject: [PATCH 3/4] Update RxRingBuffer.java Yet another undelrying --- src/main/java/rx/internal/util/RxRingBuffer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/util/RxRingBuffer.java b/src/main/java/rx/internal/util/RxRingBuffer.java index 30d55a279a..7498b445be 100644 --- a/src/main/java/rx/internal/util/RxRingBuffer.java +++ b/src/main/java/rx/internal/util/RxRingBuffer.java @@ -413,7 +413,7 @@ public Object peek() { synchronized (this) { Queue q = queue; if (q == null) { - // we are unsubscribed and have released the undelrying queue + // we are unsubscribed and have released the underlying queue return null; } o = q.peek(); From 84d38bf0485ca07f01f2871882baba8d291bbd9c Mon Sep 17 00:00:00 2001 From: David Karnok Date: Wed, 28 Jan 2015 18:59:24 +0100 Subject: [PATCH 4/4] Removed commented-out code. --- .../java/rx/internal/util/unsafe/SpscArrayQueue.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java b/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java index 6696cbce1b..3410a625f6 100644 --- a/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java +++ b/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java @@ -118,20 +118,9 @@ public SpscArrayQueue(final int capacity) { */ @Override public boolean offer(final E e) { -// if (null == e) { -// throw new NullPointerException("Null is not a valid element"); -// } // local load of field to avoid repeated loads after volatile reads final E[] lElementBuffer = buffer; final long offset = calcElementOffset(producerIndex); -// if (producerIndex >= producerLookAhead) { -// if (null == lvElement(lElementBuffer, calcElementOffset(producerIndex + lookAheadStep))) {// LoadLoad -// producerLookAhead = producerIndex + lookAheadStep; -// } -// else if (null != lvElement(lElementBuffer, offset)){ -// return false; -// } -// } if (null != lvElement(lElementBuffer, offset)){ return false; }