diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index bf5f5d8aba..ef8fb4e98e 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -1412,4 +1412,89 @@ public static Observable fromFuture(Future future, long time Observable o = fromFuture(future, timeout, unit); return o.subscribeOn(scheduler); } + + public final Observable skipLast(long time, TimeUnit unit) { + return skipLast(time, unit, Schedulers.trampoline(), false, bufferSize()); + } + + public final Observable skipLast(long time, TimeUnit unit, boolean delayError) { + return skipLast(time, unit, Schedulers.trampoline(), delayError, bufferSize()); + } + + public final Observable skipLast(long time, TimeUnit unit, Scheduler scheduler) { + return skipLast(time, unit, scheduler, false, bufferSize()); + } + + public final Observable skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError) { + return skipLast(time, unit, scheduler, delayError, bufferSize()); + } + + public final Observable skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) { + Objects.requireNonNull(unit); + Objects.requireNonNull(scheduler); + if (bufferSize <= 0) { + throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize); + } + return lift(new OperatorSkipLastTimed<>(time, unit, scheduler, bufferSize, delayError)); + } + + public final Observable takeLast(long time, TimeUnit unit) { + return takeLast(time, unit, Schedulers.trampoline(), false, bufferSize()); + } + + public final Observable takeLast(long count, long time, TimeUnit unit) { + return takeLast(count, time, unit, Schedulers.trampoline(), false, bufferSize()); + } + + public final Observable takeLast(long time, TimeUnit unit, boolean delayError) { + return takeLast(time, unit, Schedulers.trampoline(), delayError, bufferSize()); + } + + public final Observable takeLast(long time, TimeUnit unit, Scheduler scheduler) { + return takeLast(time, unit, scheduler, false, bufferSize()); + } + + public final Observable takeLast(long count, long time, TimeUnit unit, Scheduler scheduler) { + return takeLast(count, time, unit, scheduler, false, bufferSize()); + } + + public final Observable takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError) { + return takeLast(time, unit, scheduler, delayError, bufferSize()); + } + + public final Observable takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) { + return takeLast(Long.MAX_VALUE, time, unit, scheduler, delayError, bufferSize); + } + + public final Observable takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) { + Objects.requireNonNull(unit); + Objects.requireNonNull(scheduler); + if (bufferSize <= 0) { + throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize); + } + if (count < 0) { + throw new IllegalArgumentException("count >= 0 required but it was " + count); + } + return lift(new OperatorTakeLastTimed<>(count, time, unit, scheduler, bufferSize, delayError)); + } + + public final Observable> takeLastBuffer(int count) { + return takeLast(count).toList(); + } + + public final Observable> takeLastBuffer(int count, long time, TimeUnit unit) { + return takeLast(count, time, unit).toList(); + } + + public final Observable> takeLastBuffer(int count, long time, TimeUnit unit, Scheduler scheduler) { + return takeLast(count, time, unit, scheduler).toList(); + } + + public final Observable> takeLastBuffer(long time, TimeUnit unit) { + return takeLast(time, unit).toList(); + } + + public final Observable> takeLastBuffer(long time, TimeUnit unit, Scheduler scheduler) { + return takeLast(time, unit, scheduler).toList(); + } } diff --git a/src/main/java/io/reactivex/internal/operators/OperatorSkipLastTimed.java b/src/main/java/io/reactivex/internal/operators/OperatorSkipLastTimed.java new file mode 100644 index 0000000000..99102d2461 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/OperatorSkipLastTimed.java @@ -0,0 +1,243 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators; + +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.Observable.Operator; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.Scheduler; + +public final class OperatorSkipLastTimed implements Operator { + final long time; + final TimeUnit unit; + final Scheduler scheduler; + final int bufferSize; + final boolean delayError; + + public OperatorSkipLastTimed(long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) { + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + this.bufferSize = bufferSize; + this.delayError = delayError; + } + + @Override + public Subscriber apply(Subscriber t) { + return new SkipLastTimedSubscriber<>(t, time, unit, scheduler, bufferSize, delayError); + } + + static final class SkipLastTimedSubscriber extends AtomicInteger implements Subscriber, Subscription { + /** */ + private static final long serialVersionUID = -5677354903406201275L; + final Subscriber actual; + final long time; + final TimeUnit unit; + final Scheduler scheduler; + final SpscLinkedArrayQueue queue; + final boolean delayError; + + Subscription s; + + volatile long requested; + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(SkipLastTimedSubscriber.class, "requested"); + + volatile boolean cancelled; + + volatile boolean done; + Throwable error; + + public SkipLastTimedSubscriber(Subscriber actual, long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) { + this.actual = actual; + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + this.queue = new SpscLinkedArrayQueue<>(bufferSize); + this.delayError = delayError; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validateSubscription(this.s, s)) { + return; + } + this.s = s; + actual.onSubscribe(this); + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T t) { + final Queue q = queue; + + long now = scheduler.now(unit); + + q.offer(now); + q.offer(t); + + drain(); + } + + @Override + public void onError(Throwable t) { + error = t; + done = true; + drain(); + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validateRequest(n)) { + return; + } + BackpressureHelper.add(REQUESTED, this, n); + drain(); + } + + @Override + public void cancel() { + if (cancelled) { + cancelled = true; + + if (getAndIncrement() == 0) { + queue.clear(); + s.cancel(); + } + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + + final Subscriber a = actual; + final SpscLinkedArrayQueue q = queue; + final boolean delayError = this.delayError; + final TimeUnit unit = this.unit; + final Scheduler scheduler = this.scheduler; + final long time = this.time; + + for (;;) { + + if (checkTerminated(done, q.isEmpty(), a, delayError)) { + return; + } + + long r = requested; + boolean unbounded = r == Long.MAX_VALUE; + long e = 0L; + + while (r != 0) { + boolean d = done; + + Long ts = (Long)q.peek(); + + boolean empty = ts == null; + + if (checkTerminated(d, empty, a, delayError)) { + return; + } + + if (empty) { + break; + } + + long now = scheduler.now(unit); + + if (ts >= now - time) { + // not old enough + break; + } + + // wait unit the second value arrives + if (q.size() == 1L) { + continue; + } + + q.poll(); + + @SuppressWarnings("unchecked") + T v = (T)q.poll(); + + a.onNext(v); + + r--; + e--; + } + + if (e != 0L) { + if (!unbounded) { + REQUESTED.addAndGet(this, e); + } + } + + missed = getAndSet(-missed); + if (missed == 0) { + break; + } + } + } + + boolean checkTerminated(boolean d, boolean empty, Subscriber a, boolean delayError) { + if (cancelled) { + queue.clear(); + s.cancel(); + return true; + } + if (d) { + if (delayError) { + if (empty) { + Throwable e = error; + if (e != null) { + a.onError(e); + } else { + a.onComplete(); + } + return true; + } + } else { + Throwable e = error; + if (e != null) { + queue.clear(); + a.onError(e); + return true; + } else + if (empty) { + a.onComplete(); + return true; + } + } + } + return false; + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/OperatorTakeLastTimed.java b/src/main/java/io/reactivex/internal/operators/OperatorTakeLastTimed.java new file mode 100644 index 0000000000..f3665d6680 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/OperatorTakeLastTimed.java @@ -0,0 +1,245 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators; + +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.Observable.Operator; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.Scheduler; + +public final class OperatorTakeLastTimed implements Operator { + final long count; + final long time; + final TimeUnit unit; + final Scheduler scheduler; + final int bufferSize; + final boolean delayError; + + public OperatorTakeLastTimed(long count, long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) { + this.count = count; + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + this.bufferSize = bufferSize; + this.delayError = delayError; + } + + @Override + public Subscriber apply(Subscriber t) { + return new TakeLastTimedSubscriber<>(t, count, time, unit, scheduler, bufferSize, delayError); + } + + static final class TakeLastTimedSubscriber extends AtomicInteger implements Subscriber, Subscription { + /** */ + private static final long serialVersionUID = -5677354903406201275L; + final Subscriber actual; + final long count; + final long time; + final TimeUnit unit; + final Scheduler scheduler; + final Queue queue; + final boolean delayError; + + Subscription s; + + volatile long requested; + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(TakeLastTimedSubscriber.class, "requested"); + + volatile boolean cancelled; + + volatile boolean done; + Throwable error; + + public TakeLastTimedSubscriber(Subscriber actual, long count, long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) { + this.actual = actual; + this.count = count; + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + this.queue = new SpscLinkedArrayQueue<>(bufferSize); + this.delayError = delayError; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validateSubscription(this.s, s)) { + return; + } + this.s = s; + actual.onSubscribe(this); + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T t) { + final Queue q = queue; + + long now = scheduler.now(unit); + long time = this.time; + long c = count; + boolean unbounded = c == Long.MAX_VALUE; + + q.offer(now); + q.offer(t); + + while (!q.isEmpty()) { + long ts = (Long)q.peek(); + if (ts <= now - time || (!unbounded && (q.size() >> 1) > c)) { + q.poll(); + q.poll(); + } else { + break; + } + } + } + + @Override + public void onError(Throwable t) { + error = t; + done = true; + drain(); + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validateRequest(n)) { + return; + } + BackpressureHelper.add(REQUESTED, this, n); + drain(); + } + + @Override + public void cancel() { + if (cancelled) { + cancelled = true; + + if (getAndIncrement() == 0) { + queue.clear(); + s.cancel(); + } + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + + final Subscriber a = actual; + final Queue q = queue; + final boolean delayError = this.delayError; + + for (;;) { + + if (done) { + boolean empty = q.isEmpty(); + + if (checkTerminated(empty, a, delayError)) { + return; + } + + long r = requested; + boolean unbounded = r == Long.MAX_VALUE; + long e = 0L; + + while (r != 0) { + Object ts = q.poll(); // the timestamp long + empty = ts == null; + + if (checkTerminated(empty, a, delayError)) { + return; + } + + if (empty) { + break; + } + + @SuppressWarnings("unchecked") + T o = (T)q.poll(); + if (o == null) { + s.cancel(); + a.onError(new IllegalStateException("Queue empty?!")); + return; + } + + a.onNext(o); + + r--; + e--; + } + + if (e != 0L) { + if (!unbounded) { + REQUESTED.addAndGet(this, e); + } + } + } + + missed = getAndSet(-missed); + if (missed == 0) { + break; + } + } + } + + boolean checkTerminated(boolean empty, Subscriber a, boolean delayError) { + if (cancelled) { + queue.clear(); + s.cancel(); + return true; + } + if (delayError) { + if (empty) { + Throwable e = error; + if (e != null) { + a.onError(e); + } else { + a.onComplete(); + } + return true; + } + } else { + Throwable e = error; + if (e != null) { + queue.clear(); + a.onError(e); + return true; + } else + if (empty) { + a.onComplete(); + return true; + } + } + return false; + } + } +}