Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Observable.window static/instance bug #351

Merged
merged 1 commit into from
Sep 6, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 20 additions & 40 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1509,8 +1509,6 @@ public Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit,
* Observable produced by the specified {@link Func0} produces a {@link rx.util.Closing} object. The {@link Func0} will then be used to create a new Observable to listen for the end of the next
* window.
*
* @param source
* The source {@link Observable} which produces values.
* @param closingSelector
* The {@link Func0} which is used to produce an {@link Observable} for every window created.
* When this {@link Observable} produces a {@link rx.util.Closing} object, the associated window
Expand All @@ -1519,8 +1517,8 @@ public Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit,
* An {@link Observable} which produces connected non-overlapping windows, which are emitted
* when the current {@link Observable} created with the {@link Func0} argument produces a {@link rx.util.Closing} object.
*/
public Observable<Observable<T>> window(Observable<? extends T> source, Func0<? extends Observable<? extends Closing>> closingSelector) {
return create(OperationWindow.window(source, closingSelector));
public Observable<Observable<T>> window(Func0<? extends Observable<? extends Closing>> closingSelector) {
return create(OperationWindow.window(this, closingSelector));
}

/**
Expand All @@ -1529,8 +1527,6 @@ public Observable<Observable<T>> window(Observable<? extends T> source, Func0<?
* Additionally the {@link Func0} argument is used to create an Observable which produces {@link rx.util.Closing} objects. When this Observable produces such an object, the associated window is
* emitted.
*
* @param source
* The source {@link Observable} which produces values.
* @param windowOpenings
* The {@link Observable} which when it produces a {@link rx.util.Opening} object, will cause
* another window to be created.
Expand All @@ -1541,34 +1537,30 @@ public Observable<Observable<T>> window(Observable<? extends T> source, Func0<?
* @return
* An {@link Observable} which produces windows which are created and emitted when the specified {@link Observable}s publish certain objects.
*/
public Observable<Observable<T>> window(Observable<? extends T> source, Observable<? extends Opening> windowOpenings, Func1<Opening, ? extends Observable<? extends Closing>> closingSelector) {
return create(OperationWindow.window(source, windowOpenings, closingSelector));
public Observable<Observable<T>> window(Observable<? extends Opening> windowOpenings, Func1<Opening, ? extends Observable<? extends Closing>> closingSelector) {
return create(OperationWindow.window(this, windowOpenings, closingSelector));
}

/**
* Creates an Observable which produces windows of collected values. This Observable produces connected
* non-overlapping windows, each containing "count" elements. When the source Observable completes or
* encounters an error, the current window is emitted, and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param count
* The maximum size of each window before it should be emitted.
* @return
* An {@link Observable} which produces connected non-overlapping windows containing at most
* "count" produced values.
*/
public Observable<Observable<T>> window(Observable<? extends T> source, int count) {
return create(OperationWindow.window(source, count));
public Observable<Observable<T>> window(int count) {
return create(OperationWindow.window(this, count));
}

/**
* Creates an Observable which produces windows of collected values. This Observable produces windows every
* "skip" values, each containing "count" elements. When the source Observable completes or encounters an error,
* the current window is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param count
* The maximum size of each window before it should be emitted.
* @param skip
Expand All @@ -1578,17 +1570,15 @@ public Observable<Observable<T>> window(Observable<? extends T> source, int coun
* An {@link Observable} which produces windows every "skipped" values containing at most
* "count" produced values.
*/
public Observable<Observable<T>> window(Observable<? extends T> source, int count, int skip) {
return create(OperationWindow.window(source, count, skip));
public Observable<Observable<T>> window(int count, int skip) {
return create(OperationWindow.window(this, count, skip));
}

/**
* Creates an Observable which produces windows of collected values. This Observable produces connected
* non-overlapping windows, each of a fixed duration specified by the "timespan" argument. When the source
* Observable completes or encounters an error, the current window is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param timespan
* The period of time each window is collecting values before it should be emitted, and
* replaced with a new window.
Expand All @@ -1597,17 +1587,15 @@ public Observable<Observable<T>> window(Observable<? extends T> source, int coun
* @return
* An {@link Observable} which produces connected non-overlapping windows with a fixed duration.
*/
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit) {
return create(OperationWindow.window(source, timespan, unit));
public Observable<Observable<T>> window(long timespan, TimeUnit unit) {
return create(OperationWindow.window(this, timespan, unit));
}

/**
* Creates an Observable which produces windows of collected values. This Observable produces connected
* non-overlapping windows, each of a fixed duration specified by the "timespan" argument. When the source
* Observable completes or encounters an error, the current window is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param timespan
* The period of time each window is collecting values before it should be emitted, and
* replaced with a new window.
Expand All @@ -1618,8 +1606,8 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
* @return
* An {@link Observable} which produces connected non-overlapping windows with a fixed duration.
*/
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, Scheduler scheduler) {
return create(OperationWindow.window(source, timespan, unit, scheduler));
public Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler) {
return create(OperationWindow.window(this, timespan, unit, scheduler));
}

/**
Expand All @@ -1628,8 +1616,6 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
* specified by the "count" argument (which ever is reached first). When the source Observable completes
* or encounters an error, the current window is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param timespan
* The period of time each window is collecting values before it should be emitted, and
* replaced with a new window.
Expand All @@ -1641,8 +1627,8 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
* An {@link Observable} which produces connected non-overlapping windows which are emitted after
* a fixed duration or when the window has reached maximum capacity (which ever occurs first).
*/
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, int count) {
return create(OperationWindow.window(source, timespan, unit, count));
public Observable<Observable<T>> window(long timespan, TimeUnit unit, int count) {
return create(OperationWindow.window(this, timespan, unit, count));
}

/**
Expand All @@ -1651,8 +1637,6 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
* specified by the "count" argument (which ever is reached first). When the source Observable completes
* or encounters an error, the current window is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param timespan
* The period of time each window is collecting values before it should be emitted, and
* replaced with a new window.
Expand All @@ -1666,8 +1650,8 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
* An {@link Observable} which produces connected non-overlapping windows which are emitted after
* a fixed duration or when the window has reached maximum capacity (which ever occurs first).
*/
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, int count, Scheduler scheduler) {
return create(OperationWindow.window(source, timespan, unit, count, scheduler));
public Observable<Observable<T>> window(long timespan, TimeUnit unit, int count, Scheduler scheduler) {
return create(OperationWindow.window(this, timespan, unit, count, scheduler));
}

/**
Expand All @@ -1676,8 +1660,6 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
* specified by the "timespan" argument. When the source Observable completes or encounters an error, the
* current window is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param timespan
* The period of time each window is collecting values before it should be emitted.
* @param timeshift
Expand All @@ -1688,8 +1670,8 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
* An {@link Observable} which produces new windows periodically, and these are emitted after
* a fixed timespan has elapsed.
*/
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit) {
return create(OperationWindow.window(source, timespan, timeshift, unit));
public Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit unit) {
return create(OperationWindow.window(this, timespan, timeshift, unit));
}

/**
Expand All @@ -1698,8 +1680,6 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
* specified by the "timespan" argument. When the source Observable completes or encounters an error, the
* current window is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param timespan
* The period of time each window is collecting values before it should be emitted.
* @param timeshift
Expand All @@ -1712,8 +1692,8 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
* An {@link Observable} which produces new windows periodically, and these are emitted after
* a fixed timespan has elapsed.
*/
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) {
return create(OperationWindow.window(source, timespan, timeshift, unit, scheduler));
public Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) {
return create(OperationWindow.window(this, timespan, timeshift, unit, scheduler));
}

/**
Expand Down
39 changes: 39 additions & 0 deletions rxjava-core/src/test/java/rx/ObservableWindowTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package rx;

import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class ObservableWindowTests {

@Test
public void testWindow() {
final ArrayList<List<Integer>> lists = new ArrayList<List<Integer>>();
Observable.from(1, 2, 3, 4, 5, 6)
.window(3).map(new Func1<Observable<Integer>, List<Integer>>() {

@Override
public List<Integer> call(Observable<Integer> o) {
return o.toList().toBlockingObservable().single();
}

}).toBlockingObservable().forEach(new Action1<List<Integer>>() {

@Override
public void call(List<Integer> t) {
lists.add(t);
}
});

assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[] { 1, 2, 3 });
assertArrayEquals(lists.get(1).toArray(new Integer[3]), new Integer[] { 4, 5, 6 });
assertEquals(2, lists.size());

}
}