Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manual merge of Lock-free, MPSC-queue based #1284

Merged
merged 4 commits into from
May 29, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions rxjava-core/src/main/java/rx/internal/util/MpscPaddedQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.util;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* A multiple-producer single consumer queue implementation with padded reference
* to tail to avoid cache-line thrashing.
* Based on Netty's <a href='https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java'>MpscQueue implementation</a> but using AtomicReferenceFieldUpdater
* instead of Unsafe.
* @param <E> the element type
*/
public final class MpscPaddedQueue<E> extends AtomicReference<MpscPaddedQueue.Node<E>> {
@SuppressWarnings(value = "rawtypes")
static final AtomicReferenceFieldUpdater<PaddedNode, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PaddedNode.class, Node.class, "tail");
/** */
private static final long serialVersionUID = 1L;
/** The padded tail reference. */
final PaddedNode<E> tail;
/**
* Initializes the empty queue.
*/
public MpscPaddedQueue() {
Node<E> first = new Node<E>(null);
tail = new PaddedNode<E>();
tail.tail = first;
set(first);
}
/**
* Offer a new value.
* @param v the value to offer
*/
public void offer(E v) {
Node<E> n = new Node<E>(v);
getAndSet(n).set(n);
}

/**
* @return Poll a value from the head of the queue or return null if the queue is empty.
*/
public E poll() {
Node<E> n = peekNode();
if (n == null) {
return null;
}
E v = n.value;
n.value = null; // do not retain this value as the node still stays in the queue
TAIL_UPDATER.lazySet(tail, n);
return v;
}
/**
* Check if there is a node available without changing anything.
*/
private Node<E> peekNode() {
for (;;) {
@SuppressWarnings(value = "unchecked")
Node<E> t = TAIL_UPDATER.get(tail);
Node<E> n = t.get();
if (n != null || get() == t) {
return n;
}
}
}
/**
* Clears the queue.
*/
public void clear() {
for (;;) {
if (poll() == null) {
break;
}
}
}
/** Class that contains a Node reference padded around to fit a typical cache line. */
static final class PaddedNode<E> {
/** Padding, public to prevent optimizing it away. */
public int p1;
volatile Node<E> tail;
/** Padding, public to prevent optimizing it away. */
public long p2;
/** Padding, public to prevent optimizing it away. */
public long p3;
/** Padding, public to prevent optimizing it away. */
public long p4;
/** Padding, public to prevent optimizing it away. */
public long p5;
/** Padding, public to prevent optimizing it away. */
public long p6;
}
/**
* Regular node with value and reference to the next node.
*/
static final class Node<E> {

E value;
@SuppressWarnings(value = "rawtypes")
static final AtomicReferenceFieldUpdater<Node, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail");
volatile Node<E> tail;

public Node(E value) {
this.value = value;
}

public void set(Node<E> newTail) {
TAIL_UPDATER.lazySet(this, newTail);
}

@SuppressWarnings(value = "unchecked")
public Node<E> get() {
return TAIL_UPDATER.get(this);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.util;

import java.util.concurrent.atomic.AtomicInteger;

/**
* An AtomicInteger with extra fields to pad it out to fit a typical cache line.
*/
public final class PaddedAtomicInteger extends AtomicInteger {
private static final long serialVersionUID = 1L;
/** Padding, public to prevent optimizing it away. */
public int p1;
/** Padding, public to prevent optimizing it away. */
public int p2;
/** Padding, public to prevent optimizing it away. */
public int p3;
/** Padding, public to prevent optimizing it away. */
public int p4;
/** Padding, public to prevent optimizing it away. */
public int p5;
/** Padding, public to prevent optimizing it away. */
public int p6;
/** Padding, public to prevent optimizing it away. */
public int p7;
/** Padding, public to prevent optimizing it away. */
public int p8;
/** Padding, public to prevent optimizing it away. */
public int p9;
/** Padding, public to prevent optimizing it away. */
public int p10;
/** Padding, public to prevent optimizing it away. */
public int p11;
/** Padding, public to prevent optimizing it away. */
public int p12;
/** Padding, public to prevent optimizing it away. */
public int p13;
/** @return prevents optimizing away the fields, most likely. */
public int noopt() {
return p1 + p2 + p3 + p4 + p5 + p6 + p7 + p8 + p9 + p10 + p11 + p12 + p13;
}

}
3 changes: 3 additions & 0 deletions rxjava-core/src/main/java/rx/internal/util/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This `rx.internal.*` package is for internal use only. Any code here can change at any time and is not considered part of the public API, even if the classes are `public` so as to be used from other packages within `rx.*`.

If you depend on these classes, your code may break in any future RxJava release, even if it's just a patch release (major.minor.patch).