Skip to content

Commit

Permalink
Operators first, last, multi-offer for SpscLinkedArrayQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Aug 28, 2015
1 parent e7ce462 commit 802b916
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 5 deletions.
16 changes: 16 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1497,4 +1497,20 @@ public final Observable<List<T>> takeLastBuffer(long time, TimeUnit unit) {
public final Observable<List<T>> takeLastBuffer(long time, TimeUnit unit, Scheduler scheduler) {
return takeLast(time, unit, scheduler).toList();
}

public final Observable<T> first() {
return take(1).single();
}

public final Observable<T> first(T defaultValue) {
return take(1).single(defaultValue);
}

public final Observable<T> last() {
return takeLast(1).single();
}

public final Observable<T> last(T defaultValue) {
return takeLast(1).single(defaultValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@

package io.reactivex.internal.operators;

import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;

import org.reactivestreams.*;

import io.reactivex.Observable.Operator;
import io.reactivex.Scheduler;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.Scheduler;

public final class OperatorSkipLastTimed<T> implements Operator<T, T> {
final long time;
Expand Down Expand Up @@ -88,12 +87,11 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
final Queue<Object> q = queue;
final SpscLinkedArrayQueue<Object> q = queue;

long now = scheduler.now(unit);

q.offer(now);
q.offer(t);
q.offer(now, t);

drain();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,39 @@ public T remove() {
public T element() {
throw new UnsupportedOperationException();
}

/**
* Offer two elements at the same time.
* <p>Don't use the regular offer() with this at all!
* @param first
* @param second
* @return
*/
public boolean offer(T first, T second) {
final AtomicReferenceArray<Object> buffer = producerBuffer;
final long p = producerIndex;
final int m = producerMask;

int pi = calcWrappedOffset(p + 1, m);

if (null == lvElement(buffer, pi)) {
soElement(buffer, pi, second);
soProducerIndex(p + 2);
soElement(buffer, pi - 1, first);
} else {
final int capacity = buffer.length();
final AtomicReferenceArray<Object> newBuffer = new AtomicReferenceArray<>(capacity);
producerBuffer = newBuffer;

soElement(newBuffer, pi, second);// StoreStore
soElement(newBuffer, pi - 1, first);
soNext(buffer, newBuffer);

soProducerIndex(p + 2);// this ensures correctness on 32bit platforms

soElement(buffer, pi - 1, HAS_NEXT); // new buffer is visible after element is
}

return true;
}
}

0 comments on commit 802b916

Please sign in to comment.