Skip to content

Commit

Permalink
Merge pull request #1526 from benjchristensen/ring-buffer-queue
Browse files Browse the repository at this point in the history
Restore use of SpmcArrayQueue in RxRingBuffer
  • Loading branch information
benjchristensen committed Jul 29, 2014
2 parents eedeeb9 + 58a9d34 commit 08bf50f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 16 deletions.
38 changes: 23 additions & 15 deletions rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import rx.Subscription;
import rx.exceptions.MissingBackpressureException;
import rx.internal.operators.NotificationLite;
import rx.internal.util.unsafe.SpmcArrayQueue;
import rx.internal.util.unsafe.SpscArrayQueue;
import rx.internal.util.unsafe.UnsafeAccess;

/**
Expand All @@ -31,28 +33,16 @@ public class RxRingBuffer implements Subscription {

public static RxRingBuffer getSpscInstance() {
if (UnsafeAccess.isUnsafeAvailable()) {
// using SynchronizedQueue until issues are solved with SpscArrayQueue offer rejection
// RxRingBufferSpmcTest.testConcurrency occasionally fails with a
// BackpressureException when using SpscArrayQueue
// return new RxRingBuffer(SPSC_POOL, SIZE); // this is the one we were trying to use
// return new RxRingBuffer(new SpscArrayQueue<Object>(SIZE), SIZE);
// the performance of this is sufficient (actually faster in some cases)
return new RxRingBuffer(new SynchronizedQueue<Object>(SIZE), SIZE);
// TODO the SpscArrayQueue isn't ready yet so using SpmcArrayQueue for now
return new RxRingBuffer(SPMC_POOL, SIZE);
} else {
return new RxRingBuffer();
}
}

public static RxRingBuffer getSpmcInstance() {
if (UnsafeAccess.isUnsafeAvailable()) {
// using SynchronizedQueue until issues are solved with SpmcArrayQueue offer rejection
// RxRingBufferSpmcTest.testConcurrency occasionally fails with a
// BackpressureException when using SpmcArrayQueue/MpmcArrayQueue
// return new RxRingBuffer(SPMC_POOL, SIZE); // this is the one we were trying to use
// return new RxRingBuffer(new SpmcArrayQueue<Object>(SIZE), SIZE);
// return new RxRingBuffer(new MpmcArrayQueue<Object>(SIZE), SIZE);
// the performance of this is sufficient (actually faster in some cases)
return new RxRingBuffer(new SynchronizedQueue<Object>(SIZE), SIZE);
return new RxRingBuffer(SPMC_POOL, SIZE);
} else {
return new RxRingBuffer();
}
Expand Down Expand Up @@ -170,6 +160,24 @@ public static RxRingBuffer getSpmcInstance() {

public static final int SIZE = 1024;

private static ObjectPool<Queue<Object>> SPSC_POOL = new ObjectPool<Queue<Object>>() {

@Override
protected SpscArrayQueue<Object> createObject() {
return new SpscArrayQueue<Object>(SIZE);
}

};

private static ObjectPool<Queue<Object>> SPMC_POOL = new ObjectPool<Queue<Object>>() {

@Override
protected SpmcArrayQueue<Object> createObject() {
return new SpmcArrayQueue<Object>(SIZE);
}

};

private RxRingBuffer(Queue<Object> queue, int size) {
this.queue = queue;
this.pool = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,15 @@ public boolean offer(final E e) {
final long currProducerIndex = lvProducerIndex();
final long offset = calcElementOffset(currProducerIndex);
if (null != lvElement(lb, offset)) {
return false;
// strict check as per https://github.com/JCTools/JCTools/issues/21#issuecomment-50204120
int size = (int) (currProducerIndex - lvConsumerIndex());
if (size == capacity) {
return false;
}
else {
// spin wait for slot to clear, buggers wait freedom
while (null != lvElement(lb, offset));
}
}
spElement(lb, offset, e);
// single producer, so store ordered is valid. It is also required to correctly publish the element
Expand Down

0 comments on commit 08bf50f

Please sign in to comment.