Skip to content

Commit

Permalink
Merge pull request #1138 from akarnokd/OperatorWindow430
Browse files Browse the repository at this point in the history
Operator Window and other changes
  • Loading branch information
benjchristensen committed May 5, 2014
2 parents 1af3674 + b1effc7 commit b794705
Show file tree
Hide file tree
Showing 10 changed files with 1,256 additions and 1,305 deletions.
22 changes: 11 additions & 11 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7235,7 +7235,7 @@ public final Observable<T> unsubscribeOn(Scheduler scheduler) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector) {
return create(OperationWindow.window(this, closingSelector));
return lift(new OperatorWindowWithObservable<T, TClosing>(closingSelector));
}

/**
Expand All @@ -7252,7 +7252,7 @@ public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observa
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(int count) {
return create(OperationWindow.window(this, count));
return lift(new OperatorWindowWithSize<T>(count, count));
}

/**
Expand All @@ -7272,7 +7272,7 @@ public final Observable<Observable<T>> window(int count) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(int count, int skip) {
return create(OperationWindow.window(this, count, skip));
return lift(new OperatorWindowWithSize<T>(count, skip));
}

/**
Expand All @@ -7294,7 +7294,7 @@ public final Observable<Observable<T>> window(int count, int skip) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit unit) {
return create(OperationWindow.window(this, timespan, timeshift, unit));
return lift(new OperatorWindowWithTime<T>(timespan, timeshift, unit, Integer.MAX_VALUE, Schedulers.computation()));
}

/**
Expand All @@ -7318,7 +7318,7 @@ public final Observable<Observable<T>> window(long timespan, long timeshift, Tim
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) {
return create(OperationWindow.window(this, timespan, timeshift, unit, scheduler));
return lift(new OperatorWindowWithTime<T>(timespan, timeshift, unit, Integer.MAX_VALUE, scheduler));
}

/**
Expand All @@ -7339,7 +7339,7 @@ public final Observable<Observable<T>> window(long timespan, long timeshift, Tim
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(long timespan, TimeUnit unit) {
return create(OperationWindow.window(this, timespan, unit));
return lift(new OperatorWindowWithTime<T>(timespan, timespan, unit, Integer.MAX_VALUE, Schedulers.computation()));
}

/**
Expand All @@ -7364,7 +7364,7 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, int count) {
return create(OperationWindow.window(this, timespan, unit, count));
return lift(new OperatorWindowWithTime<T>(timespan, timespan, unit, count, Schedulers.computation()));
}

/**
Expand All @@ -7391,7 +7391,7 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit, int
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, int count, Scheduler scheduler) {
return create(OperationWindow.window(this, timespan, unit, count, scheduler));
return lift(new OperatorWindowWithTime<T>(timespan, timespan, unit, count, scheduler));
}

/**
Expand All @@ -7414,7 +7414,7 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit, int
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler) {
return create(OperationWindow.window(this, timespan, unit, scheduler));
return lift(new OperatorWindowWithTime<T>(timespan, timespan, unit, Integer.MAX_VALUE, scheduler));
}

/**
Expand All @@ -7434,7 +7434,7 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Sche
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final <TOpening, TClosing> Observable<Observable<T>> window(Observable<? extends TOpening> windowOpenings, Func1<? super TOpening, ? extends Observable<? extends TClosing>> closingSelector) {
return create(OperationWindow.window(this, windowOpenings, closingSelector));
return lift(new OperatorWindowWithStartEndObservable<T, TOpening, TClosing>(windowOpenings, closingSelector));
}

/**
Expand All @@ -7452,7 +7452,7 @@ public final <TOpening, TClosing> Observable<Observable<T>> window(Observable<?
* where the boundary of each window is determined by the items emitted from the {@code boundary} Observable
*/
public final <U> Observable<Observable<T>> window(Observable<U> boundary) {
return create(OperationWindow.window(this, boundary));
return lift(new OperatorWindowWithObservable<T, U>(boundary));
}

/**
Expand Down
113 changes: 57 additions & 56 deletions rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
package rx.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.observers.Subscribers;
import rx.subscriptions.Subscriptions;

/**
* A solution to the "time gap" problem that occurs with `groupBy` and `pivot` => https://github.com/Netflix/RxJava/issues/844
Expand All @@ -43,44 +47,72 @@
public class BufferUntilSubscriber<T> extends Observable<T> implements Observer<T> {

public static <T> BufferUntilSubscriber<T> create() {
return new BufferUntilSubscriber<T>(new AtomicReference<Observer<? super T>>(new BufferedObserver<T>()));
State<T> state = new State<T>();
return new BufferUntilSubscriber<T>(state);
}

private final AtomicReference<Observer<? super T>> observerRef;
/** The common state. */
static final class State<T> {
/** Lite notifications of type T. */
final NotificationLite<T> nl = NotificationLite.instance();
/** The first observer or the one which buffers until the first arrives. */
final AtomicReference<Observer<? super T>> observerRef = new AtomicReference<Observer<? super T>>(new BufferedObserver<T>());
/** Allow a single subscriber only. */
final AtomicBoolean first = new AtomicBoolean();
}

static final class OnSubscribeAction<T> implements OnSubscribe<T> {
final State<T> state;

private BufferUntilSubscriber(final AtomicReference<Observer<? super T>> observerRef) {
super(new OnSubscribe<T>() {
public OnSubscribeAction(State<T> state) {
this.state = state;
}

@Override
public void call(Subscriber<? super T> s) {
@Override
public void call(final Subscriber<? super T> s) {
if (state.first.compareAndSet(false, true)) {
// drain queued notifications before subscription
// we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer
BufferedObserver<T> buffered = (BufferedObserver<T>) observerRef.get();
Object o = null;
BufferedObserver<? super T> buffered = (BufferedObserver<? super T>)state.observerRef.get();
Object o;
while ((o = buffered.buffer.poll()) != null) {
emit(s, o);
state.nl.accept(s, o);
}
// register real observer for pass-thru ... and drain any further events received on first notification
observerRef.set(new PassThruObserver<T>(s, buffered.buffer, observerRef));
state.observerRef.set(new PassThruObserver<T>(s, buffered.buffer, state.observerRef));
s.add(Subscriptions.create(new Action0() {
@Override
public void call() {
state.observerRef.set(Subscribers.empty());
}
}));
} else {
s.onError(new IllegalStateException("Only one subscriber allowed!"));
}

});
this.observerRef = observerRef;
}

}
final State<T> state;

private BufferUntilSubscriber(State<T> state) {
super(new OnSubscribeAction<T>(state));
this.state = state;
}

@Override
public void onCompleted() {
observerRef.get().onCompleted();
state.observerRef.get().onCompleted();
}

@Override
public void onError(Throwable e) {
observerRef.get().onError(e);
state.observerRef.get().onError(e);
}

@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public void onNext(T t) {
observerRef.get().onNext(t);
state.observerRef.get().onNext(t);
}

/**
Expand All @@ -97,6 +129,7 @@ private static class PassThruObserver<T> extends Subscriber<T> {
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
private final ConcurrentLinkedQueue<Object> buffer;
private final AtomicReference<Observer<? super T>> observerRef;
private final NotificationLite<T> nl = NotificationLite.instance();

PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer, AtomicReference<Observer<? super T>> observerRef) {
this.actual = actual;
Expand All @@ -123,67 +156,35 @@ public void onNext(T t) {
}

private void drainIfNeededAndSwitchToActual() {
Object o = null;
Object o;
while ((o = buffer.poll()) != null) {
emit(this, o);
nl.accept(this, o);
}
// now we can safely change over to the actual and get rid of the pass-thru
observerRef.set(actual);
// but only if not unsubscribed
observerRef.compareAndSet(this, actual);
}

}

private static class BufferedObserver<T> extends Subscriber<T> {
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
private final NotificationLite<T> nl = NotificationLite.instance();

@Override
public void onCompleted() {
buffer.add(COMPLETE_SENTINEL);
buffer.add(nl.completed());
}

@Override
public void onError(Throwable e) {
buffer.add(new ErrorSentinel(e));
buffer.add(nl.error(e));
}

@Override
public void onNext(T t) {
if (t == null) {
buffer.add(NULL_SENTINEL);
} else {
buffer.add(t);
}
}

}

private final static <T> void emit(Observer<T> s, Object v) {
if (v instanceof Sentinel) {
if (v == NULL_SENTINEL) {
s.onNext(null);
} else if (v == COMPLETE_SENTINEL) {
s.onCompleted();
} else if (v instanceof ErrorSentinel) {
s.onError(((ErrorSentinel) v).e);
}
} else {
s.onNext((T) v);
buffer.add(nl.next(t));
}
}

private static class Sentinel {

}

private static Sentinel NULL_SENTINEL = new Sentinel();
private static Sentinel COMPLETE_SENTINEL = new Sentinel();

private static class ErrorSentinel extends Sentinel {
final Throwable e;

ErrorSentinel(Throwable e) {
this.e = e;
}
}

}
Loading

0 comments on commit b794705

Please sign in to comment.