From 802b9169a62959835f76b65a7ced4426ee113575 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 28 Aug 2015 22:37:56 +0200 Subject: [PATCH] Operators first, last, multi-offer for SpscLinkedArrayQueue --- src/main/java/io/reactivex/Observable.java | 16 +++++++++ .../operators/OperatorSkipLastTimed.java | 8 ++--- .../internal/queue/SpscLinkedArrayQueue.java | 35 +++++++++++++++++++ 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index ef8fb4e98e..ed3bafe518 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -1497,4 +1497,20 @@ public final Observable> takeLastBuffer(long time, TimeUnit unit) { public final Observable> takeLastBuffer(long time, TimeUnit unit, Scheduler scheduler) { return takeLast(time, unit, scheduler).toList(); } + + public final Observable first() { + return take(1).single(); + } + + public final Observable first(T defaultValue) { + return take(1).single(defaultValue); + } + + public final Observable last() { + return takeLast(1).single(); + } + + public final Observable last(T defaultValue) { + return takeLast(1).single(defaultValue); + } } diff --git a/src/main/java/io/reactivex/internal/operators/OperatorSkipLastTimed.java b/src/main/java/io/reactivex/internal/operators/OperatorSkipLastTimed.java index 99102d2461..f41467c6a6 100644 --- a/src/main/java/io/reactivex/internal/operators/OperatorSkipLastTimed.java +++ b/src/main/java/io/reactivex/internal/operators/OperatorSkipLastTimed.java @@ -13,17 +13,16 @@ package io.reactivex.internal.operators; -import java.util.Queue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.*; import org.reactivestreams.*; import io.reactivex.Observable.Operator; +import io.reactivex.Scheduler; import io.reactivex.internal.queue.SpscLinkedArrayQueue; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; -import io.reactivex.Scheduler; public final class OperatorSkipLastTimed implements Operator { final long time; @@ -88,12 +87,11 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - final Queue q = queue; + final SpscLinkedArrayQueue q = queue; long now = scheduler.now(unit); - q.offer(now); - q.offer(t); + q.offer(now, t); drain(); } diff --git a/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java b/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java index a862a3d162..970faaefda 100644 --- a/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java +++ b/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java @@ -311,4 +311,39 @@ public T remove() { public T element() { throw new UnsupportedOperationException(); } + + /** + * Offer two elements at the same time. + *

Don't use the regular offer() with this at all! + * @param first + * @param second + * @return + */ + public boolean offer(T first, T second) { + final AtomicReferenceArray buffer = producerBuffer; + final long p = producerIndex; + final int m = producerMask; + + int pi = calcWrappedOffset(p + 1, m); + + if (null == lvElement(buffer, pi)) { + soElement(buffer, pi, second); + soProducerIndex(p + 2); + soElement(buffer, pi - 1, first); + } else { + final int capacity = buffer.length(); + final AtomicReferenceArray newBuffer = new AtomicReferenceArray<>(capacity); + producerBuffer = newBuffer; + + soElement(newBuffer, pi, second);// StoreStore + soElement(newBuffer, pi - 1, first); + soNext(buffer, newBuffer); + + soProducerIndex(p + 2);// this ensures correctness on 32bit platforms + + soElement(buffer, pi - 1, HAS_NEXT); // new buffer is visible after element is + } + + return true; + } }