Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RxRingBuffer with synchronization #2553

Merged
merged 4 commits into from
Jan 28, 2015
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou
private RxRingBuffer getOrCreateScalarValueQueue() {
RxRingBuffer svq = scalarValueQueue;
if (svq == null) {
svq = RxRingBuffer.getSpmcInstance();
svq = RxRingBuffer.getSpscInstance();
scalarValueQueue = svq;
}
return svq;
Expand Down Expand Up @@ -581,7 +581,7 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");

private final RxRingBuffer q = RxRingBuffer.getSpmcInstance();
private final RxRingBuffer q = RxRingBuffer.getSpscInstance();

public InnerSubscriber(MergeSubscriber<T> parent, MergeProducer<T> producer) {
this.parentSubscriber = parent;
Expand Down
90 changes: 50 additions & 40 deletions src/main/java/rx/internal/util/RxRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ public class RxRingBuffer implements Subscription {

public static RxRingBuffer getSpscInstance() {
if (UnsafeAccess.isUnsafeAvailable()) {
// TODO the SpscArrayQueue isn't ready yet so using SpmcArrayQueue for now
return new RxRingBuffer(SPMC_POOL, SIZE);
return new RxRingBuffer(SPSC_POOL, SIZE);
} else {
return new RxRingBuffer();
}
Expand Down Expand Up @@ -306,12 +305,13 @@ private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size) {
this.size = size;
}

public void release() {
if (pool != null) {
Queue<Object> q = queue;
public synchronized void release() {
Queue<Object> q = queue;
ObjectPool<Queue<Object>> p = pool;
if (p != null && q != null) {
q.clear();
queue = null;
pool.returnObject(q);
p.returnObject(q);
}
}

Expand All @@ -331,10 +331,21 @@ public void unsubscribe() {
* if more onNext are sent than have been requested
*/
public void onNext(Object o) throws MissingBackpressureException {
if (queue == null) {
boolean iae = false;
boolean mbe = false;
synchronized (this) {
Queue<Object> q = queue;
if (q != null) {
mbe = !q.offer(on.next(o));
} else {
iae = true;
}
}

if (iae) {
throw new IllegalStateException("This instance has been unsubscribed and the queue is no longer usable.");
}
if (!queue.offer(on.next(o))) {
if (mbe) {
throw new MissingBackpressureException();
}
}
Expand Down Expand Up @@ -362,55 +373,54 @@ public int capacity() {
}

public int count() {
if (queue == null) {
Queue<Object> q = queue;
if (q == null) {
return 0;
}
return queue.size();
return q.size();
}

public boolean isEmpty() {
if (queue == null) {
Queue<Object> q = queue;
if (q == null) {
return true;
}
return queue.isEmpty();
return q.isEmpty();
}

public Object poll() {
if (queue == null) {
// we are unsubscribed and have released the undelrying queue
return null;
}
Object o;
o = queue.poll();
/*
* benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll()
* is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case,
* "o == null" and there is a terminal state, but now "queue.isEmpty()" and we should NOT return the terminalState.
*
* The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on*
* or needing to enqueue terminalState.
*
* This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires
* a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
* is currently the way it is.
*/
if (o == null && terminalState != null && queue.isEmpty()) {
o = terminalState;
// once emitted we clear so a poll loop will finish
terminalState = null;
synchronized (this) {
Queue<Object> q = queue;
if (q == null) {
// we are unsubscribed and have released the underlying queue
return null;
}
o = q.poll();

Object ts = terminalState;
if (o == null && ts != null && q.peek() == null) {
o = ts;
// once emitted we clear so a poll loop will finish
terminalState = null;
}
}
return o;
}

public Object peek() {
if (queue == null) {
// we are unsubscribed and have released the undelrying queue
return null;
}
Object o;
o = queue.peek();
if (o == null && terminalState != null && queue.isEmpty()) {
o = terminalState;
synchronized (this) {
Queue<Object> q = queue;
if (q == null) {
// we are unsubscribed and have released the underlying queue
return null;
}
o = q.peek();
Object ts = terminalState;
if (o == null && ts != null && q.peek() == null) {
o = ts;
}
}
return o;
}
Expand Down
23 changes: 13 additions & 10 deletions src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,22 @@ public SpscArrayQueue(final int capacity) {
*/
@Override
public boolean offer(final E e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the fixes for SpscArrayQueue tied to this PR or can they be done separately?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change relaxing Spsc improves the performance by 1-2% but conflicts with #2541.

if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
// if (null == e) {
// throw new NullPointerException("Null is not a valid element");
// }
// local load of field to avoid repeated loads after volatile reads
final E[] lElementBuffer = buffer;
final long offset = calcElementOffset(producerIndex);
if (producerIndex >= producerLookAhead) {
if (null == lvElement(lElementBuffer, calcElementOffset(producerIndex + lookAheadStep))) {// LoadLoad
producerLookAhead = producerIndex + lookAheadStep;
}
else if (null != lvElement(lElementBuffer, offset)){
return false;
}
// if (producerIndex >= producerLookAhead) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason to keep this commented code?

// if (null == lvElement(lElementBuffer, calcElementOffset(producerIndex + lookAheadStep))) {// LoadLoad
// producerLookAhead = producerIndex + lookAheadStep;
// }
// else if (null != lvElement(lElementBuffer, offset)){
// return false;
// }
// }
if (null != lvElement(lElementBuffer, offset)){
return false;
}
producerIndex++; // do increment here so the ordered store give both a barrier
soElement(lElementBuffer, offset, e);// StoreStore
Expand Down