Skip to content

Commit

Permalink
Merge pull request #1056 from zsxwing/scala-drop
Browse files Browse the repository at this point in the history
Add drop(skip) and dropRight(skipLast) to rxscala
  • Loading branch information
benjchristensen committed Apr 21, 2014
2 parents 315a994 + 6b4d8f5 commit a2274d3
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,32 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(1, List(1).toObservable.toBlockingObservable.single)
}

@Test def dropExample() {
val o = List(1, 2, 3, 4).toObservable
assertEquals(List(3, 4), o.drop(2).toBlockingObservable.toList)
}

@Test def dropWithTimeExample() {
val o = List(1, 2, 3, 4).toObservable.zip(
Observable.interval(500 millis, IOScheduler())).map(_._1) // emit every 500 millis
println(
o.drop(1250 millis, IOScheduler()).toBlockingObservable.toList // output List(3, 4)
)
}

@Test def dropRightExample() {
val o = List(1, 2, 3, 4).toObservable
assertEquals(List(1, 2), o.dropRight(2).toBlockingObservable.toList)
}

@Test def dropRightWithTimeExample() {
val o = List(1, 2, 3, 4).toObservable.zip(
Observable.interval(500 millis, IOScheduler())).map(_._1) // emit every 500 millis
println(
o.dropRight(750 millis, IOScheduler()).toBlockingObservable.toList // output List(1, 2)
)
}

def square(x: Int): Int = {
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}")
Thread.sleep(100) // calculating a square is heavy work :)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,35 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.skip(n))
}

/**
* Returns an Observable that drops values emitted by the source Observable before a specified time window
* elapses.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skip.t.png">
*
* @param time the length of the time window to drop
* @return an Observable that drops values emitted by the source Observable before the time window defined
* by `time` elapses and emits the remainder
*/
def drop(time: Duration): Observable[T] = {
toScalaObservable(asJavaObservable.skip(time.length, time.unit))
}

/**
* Returns an Observable that drops values emitted by the source Observable before a specified time window
* elapses.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skip.t.png">
*
* @param time the length of the time window to drop
* @param scheduler the `Scheduler` on which the timed wait happens
* @return an Observable that drops values emitted by the source Observable before the time window defined
* by `time` elapses and emits the remainder
*/
def drop(time: Duration, scheduler: Scheduler): Observable[T] = {
toScalaObservable(asJavaObservable.skip(time.length, time.unit, scheduler))
}

/**
* Returns an Observable that bypasses all items from the source Observable as long as the specified
* condition holds true. Emits all further source items as soon as the condition becomes false.
Expand All @@ -1246,6 +1275,58 @@ trait Observable[+T]
toScalaObservable(asJavaObservable.skipWhile(predicate))
}

/**
* Returns an Observable that drops a specified number of items from the end of the sequence emitted by the
* source Observable.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.png">
* <p>
* This Observer accumulates a queue long enough to store the first `n` items. As more items are
* received, items are taken from the front of the queue and emitted by the returned Observable. This causes
* such items to be delayed.
*
* @param n number of items to drop from the end of the source sequence
* @return an Observable that emits the items emitted by the source Observable except for the dropped ones
* at the end
* @throws IndexOutOfBoundsException if `n` is less than zero
*/
def dropRight(n: Int): Observable[T] = {
toScalaObservable(asJavaObservable.skipLast(n))
}

/**
* Returns an Observable that drops items emitted by the source Observable during a specified time window
* before the source completes.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.t.png">
*
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
*
* @param time the length of the time window
* @return an Observable that drops those items emitted by the source Observable in a time window before the
* source completes defined by `time`
*/
def dropRight(time: Duration): Observable[T] = {
toScalaObservable(asJavaObservable.skipLast(time.length, time.unit))
}

/**
* Returns an Observable that drops items emitted by the source Observable during a specified time window
* (defined on a specified scheduler) before the source completes.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.ts.png">
*
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
*
* @param time the length of the time window
* @param scheduler the scheduler used as the time source
* @return an Observable that drops those items emitted by the source Observable in a time window before the
* source completes defined by `time` and `scheduler`
*/
def dropRight(time: Duration, scheduler: Scheduler): Observable[T] = {
toScalaObservable(asJavaObservable.skipLast(time.length, time.unit, scheduler))
}

/**
* Returns an Observable that emits only the first `num` items emitted by the source
* Observable.
Expand Down
6 changes: 5 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5566,6 +5566,8 @@ public final Observable<T> skipLast(int count) {
* before the source completes.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.t.png">
*
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
*
* @param time
* the length of the time window
Expand All @@ -5585,7 +5587,9 @@ public final Observable<T> skipLast(long time, TimeUnit unit) {
* (defined on a specified scheduler) before the source completes.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.ts.png">
*
*
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
*
* @param time
* the length of the time window
* @param unit
Expand Down

0 comments on commit a2274d3

Please sign in to comment.