GroupBy "time gap" Issue #844
How about 1) and present a new operator to cover the parallel processing case directly.
That doesn't prevent people from using
It's a tough trade-off ... non-determinism when using things that should not inject non-determinism ... or risk of blocking. Another possible solution is we could special case
Actually ... modifying
See ReactiveX#844 Not completely thrilled with or 100% confident in this solution, but it does make the groupBy unit tests pass.
I've submitted an attempt at a solution. The issue appears to be resolved, but I don't completely like the solution, nor do I trust it 100% yet. I need to sleep and think about it again tomorrow, but I'd appreciate a review and feedback, or a better solution from someone :-)
There are trade-offs to solving this that probably are not worth it. I'd like people's opinion on whether we should figure out how to make these unit tests pass, or whether it's acceptable for them not to:

```java
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;

import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

@Test
public void testGroupsWithNestedSubscribeOn() throws InterruptedException {
    final ArrayList<String> results = new ArrayList<String>();
    // synchronous source: two keys, each emitted twice
    Observable.create(new OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> sub) {
            sub.onNext(1);
            sub.onNext(2);
            sub.onNext(1);
            sub.onNext(2);
            sub.onCompleted();
        }
    }).groupBy(new Func1<Integer, Integer>() {
        @Override
        public Integer call(Integer t) {
            return t; // group by the value itself
        }
    }).flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
        @Override
        public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
            // subscribing to each group asynchronously is what opens the "time gap"
            return group.subscribeOn(Schedulers.newThread()).map(new Func1<Integer, String>() {
                @Override
                public String call(Integer t1) {
                    System.out.println("Received: " + t1 + " on group : " + group.getKey());
                    return "first groups: " + t1;
                }
            });
        }
    }).toBlockingObservable().forEach(new Action1<String>() {
        @Override
        public void call(String s) {
            results.add(s);
        }
    });
    System.out.println("Results: " + results);
    assertEquals(4, results.size()); // all four values must arrive
}
```
```java
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

@Test
public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException {
    final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete
    final ArrayList<String> results = new ArrayList<String>();
    Observable.create(new OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> sub) {
            sub.onNext(1);
            sub.onNext(2);
            sub.onNext(1);
            sub.onNext(2);
            try {
                // wait until groups 1 and 2 have completed before emitting group 3
                first.await();
            } catch (InterruptedException e) {
                sub.onError(e);
                return;
            }
            sub.onNext(3);
            sub.onNext(3);
            sub.onCompleted();
        }
    }).groupBy(new Func1<Integer, Integer>() {
        @Override
        public Integer call(Integer t) {
            return t;
        }
    }).flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
        @Override
        public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
            if (group.getKey() < 3) {
                return group.map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer t1) {
                        return "first groups: " + t1;
                    }
                })
                // must take(2) so an onCompleted + unsubscribe happens on these first 2 groups
                .take(2).doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        first.countDown();
                    }
                });
            } else {
                // the last group is subscribed to asynchronously and delayed
                return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer t1) {
                        return "last group: " + t1;
                    }
                });
            }
        }
    }).toBlockingObservable().forEach(new Action1<String>() {
        @Override
        public void call(String s) {
            results.add(s);
        }
    });
    System.out.println("Results: " + results);
    assertEquals(6, results.size());
}
```
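Here is a

```java
import java.util.Arrays;

import org.junit.Test;

import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

@Test
public void testSubscribeOnPublishSubjectWithSlowScheduler() {
    PublishSubject<Integer> ps = PublishSubject.create();
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    // SlowScheduler is a test-local Scheduler whose definition is not shown here
    ps.subscribeOn(new SlowScheduler()).subscribe(ts);
    ps.onNext(1);
    ps.onNext(2);
    ps.onCompleted();
    ts.awaitTerminalEvent();
    ts.assertReceivedOnNext(Arrays.asList(1, 2));
}
```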
Maybe subscribeOn has to be smarter about its source (and not everyone else).
That's an interesting idea, especially since What do you think about the lack of back pressure during that buffering period? The reason is the same as
Bounded buffering like in observeOn seems to be a workable solution, since the source needs to be blocked until the replay catches up anyway. I'm not sure, however, whether my idea is deadlock-free. I'll do some experiments tomorrow.
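The bounded-buffering idea can be illustrated outside RxJava. The plain-Java sketch below is hypothetical (the class name is made up, ArrayBlockingQueue stands in for the buffer, and the capacity of 2 is arbitrary); it is not how observeOn is implemented. A producer standing in for the groupBy source runs ahead of a consumer that "subscribes" late, and the producer blocks once the buffer fills, so nothing is dropped and memory stays bounded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a bounded buffer between a producer (standing in for the
// groupBy source) and a consumer that arrives late (standing in for a group
// subscribed to asynchronously).
class BoundedGroupBufferSketch {
    private static final int END = -1; // sentinel standing in for onCompleted

    static List<Integer> run() {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2); // capacity 2 is arbitrary
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put(i); // blocks while the buffer is full, so nothing is dropped
                }
                buffer.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            Thread.sleep(100); // the "time gap" before the consumer arrives
            // the late consumer drains the backlog, then keeps pace with the producer
            for (int v = buffer.take(); v != END; v = buffer.take()) {
                received.add(v);
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, 4, 5]
    }
}
```

The catch is the deadlock concern raised above: if the consumer never arrives, put blocks the producer forever, which is the same "block if a group is skipped" trade-off.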
Thanks, I look forward to seeing your idea. I appreciate your back-and-forth on this with me.
I ran head-on into the "time gap" (ReactiveX#844) issue while working on a stream processing use case (and a new 'pivot' operator I'm writing). This is a solution. It's still not ideal, as the Javadocs of BufferUntilSubscriber mention, but it works better than nothing and does not require blocking threads. A better solution will come as part of the back pressure work, where BufferUntilSubscriber will evolve to have a bounded buffer.
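A minimal, hypothetical sketch of the "buffer until subscriber" idea in plain Java follows. It is not RxJava's BufferUntilSubscriber; the class and method names are assumptions. Values emitted before anyone subscribes go into an unbounded queue, the first subscriber receives that backlog in order, and later values pass straight through.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch, not RxJava's BufferUntilSubscriber.
class BufferUntilSubscriberSketch<T> {
    private final Queue<T> buffer = new ArrayDeque<>(); // unbounded
    private Consumer<? super T> subscriber;             // null until subscribe() is called

    // synchronized so a replay and a concurrent onNext cannot interleave
    synchronized void onNext(T value) {
        if (subscriber == null) {
            buffer.add(value); // nobody is listening yet: keep the value instead of dropping it
        } else {
            subscriber.accept(value);
        }
    }

    synchronized void subscribe(Consumer<? super T> s) {
        // replay everything emitted during the "time gap", in order
        while (!buffer.isEmpty()) {
            s.accept(buffer.poll());
        }
        subscriber = s;
    }

    static List<Integer> demo() {
        BufferUntilSubscriberSketch<Integer> group = new BufferUntilSubscriberSketch<>();
        List<Integer> received = new ArrayList<>();
        group.onNext(1); // emitted before the group is subscribed to
        group.onNext(2);
        group.subscribe(received::add); // late subscriber
        group.onNext(3);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2, 3]
    }
}
```

Nothing is lost and no thread blocks, but nothing bounds the queue either: a group that is never subscribed to holds its items indefinitely.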
Is this settled?
I believe so.
Hi Ben, I found your post after worrying that I could run into this situation in my own code. It is implied in your post, but I wanted to be entirely sure: can this problem arise if you call subscribe on the same scheduler? i.e. if I do
is it guaranteed that the notification that evokes the first (outer) subscribe will also be pushed to
As of 0.18 this problem shouldn’t exist any longer as it will buffer until the subscriber arrives. If the subscribe is occurring synchronously, then it is never a problem, only when the subscribe happens asynchronously (such as using Now the choice of same or different scheduler is nuanced if using Your example code would be fine, because the
The reason it becomes vulnerable is that the function returns and groupBy continues forward while the We put a solution in place for this in #975. The downside is that it makes a memory leak possible if groups are skipped (purposefully not subscribed to), and we don’t have a solution for that yet. We decided that we'd rather avoid the non-deterministic data loss and risk a memory leak, as that's easier to explain to people. At some point we'll need to solve the memory leak issue as well (such as emitting an error, auto-unsubscribing, or something similar if a group is emitted and not subscribed to).
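To make the synchronous-versus-asynchronous distinction concrete, here is a deterministic plain-Java sketch; all names are hypothetical. HotGroup drops values while it has no subscriber, which is the pre-fix behavior being discussed. A simple task queue stands in for an asynchronous scheduler such as Schedulers.newThread() and forces the worst-case ordering, where the source keeps emitting before the queued subscription runs. With a real scheduler the loss is non-deterministic, which is what makes it hard to track down.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch of the "time gap", not RxJava code.
class TimeGapSketch {
    // A naive hot group: values emitted while nobody is subscribed are lost.
    static final class HotGroup {
        private Consumer<Integer> subscriber;

        void subscribe(Consumer<Integer> s) { subscriber = s; }

        void onNext(int v) {
            if (subscriber != null) {
                subscriber.accept(v);
            } // otherwise the value is silently dropped
        }
    }

    // asyncSubscribe == false: subscribe inside the callback, before the source continues.
    // asyncSubscribe == true: the subscription is queued and only runs after the source has emitted.
    static List<Integer> run(boolean asyncSubscribe) {
        Queue<Runnable> scheduler = new ArrayDeque<>(); // stands in for an async scheduler
        List<Integer> received = new ArrayList<>();
        HotGroup group = new HotGroup();

        // what the function passed to flatMap does when it receives the group
        Runnable subscribeTask = () -> group.subscribe(received::add);
        if (asyncSubscribe) {
            scheduler.add(subscribeTask);
        } else {
            subscribeTask.run();
        }

        // the source does not wait for the subscription
        group.onNext(1);
        group.onNext(2);

        // the "scheduler" finally runs the subscription: too late for 1 and 2
        while (!scheduler.isEmpty()) {
            scheduler.poll().run();
        }
        group.onNext(3);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [1, 2, 3]
        System.out.println(run(true));  // [3]
    }
}
```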
I opened #1280 to cover the side-effect of this solution.
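One shape the "emit an error if a group is emitted and not subscribed to" idea could take is sketched below. It is purely a hypothetical illustration in plain Java, not RxJava's behavior, and the cap of 3 is arbitrary. The group buffers until subscribed, but once the cap is exceeded it frees the buffer and reports an error to any late subscriber, instead of holding memory for a group that appears to have been skipped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical mitigation sketch for skipped groups, not RxJava code.
class CappedGroupBufferSketch {
    private final int cap;
    private final Queue<Integer> buffer = new ArrayDeque<>();
    private Consumer<Integer> subscriber;
    private RuntimeException error; // set once the cap is exceeded

    CappedGroupBufferSketch(int cap) { this.cap = cap; }

    void onNext(int v) {
        if (subscriber != null) {
            subscriber.accept(v);
        } else if (error == null) {
            if (buffer.size() < cap) {
                buffer.add(v);
            } else {
                buffer.clear(); // release the memory held for an apparently skipped group
                error = new IllegalStateException("group not subscribed to before " + cap + " items were buffered");
            }
        }
        // once in the error state, further values for the unsubscribed group are dropped
    }

    // A late subscriber gets either the replayed buffer or the error.
    void subscribe(Consumer<Integer> onNext, Consumer<RuntimeException> onError) {
        if (error != null) {
            onError.accept(error);
            return;
        }
        while (!buffer.isEmpty()) {
            onNext.accept(buffer.poll());
        }
        subscriber = onNext;
    }

    static String demo(int itemsBeforeSubscribe) {
        CappedGroupBufferSketch group = new CappedGroupBufferSketch(3);
        List<Integer> received = new ArrayList<>();
        StringBuilder outcome = new StringBuilder();
        for (int i = 1; i <= itemsBeforeSubscribe; i++) {
            group.onNext(i);
        }
        group.subscribe(received::add, e -> outcome.append("error"));
        return outcome.length() > 0 ? outcome.toString() : received.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo(3)); // [1, 2, 3]
        System.out.println(demo(5)); // error
    }
}
```

A real policy would still have to decide what counts as "skipped" (an item count, as here, or something else such as a timeout).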
The groupBy operator has a "time gap" issue when used with subscribeOn and observeOn. This exists in Rx.Net as well and was written about at http://blogs.msdn.com/b/rxteam/archive/2012/06/14/testing-rx-queries-using-virtual-time-scheduling.aspx

In discussion with @headinthebox I have decided to alter the behavior to remove this "time gap" issue, so that non-deterministic data loss does not happen for the common use cases of using observeOn and subscribeOn with GroupedObservables from groupBy.

Why? It is common to want to use observeOn or subscribeOn with GroupedObservable to process different groups in parallel.

It comes with a trade-off though: all GroupedObservable instances emitted by groupBy must be subscribed to, otherwise it will block. The reason for this is that to solve the "time gap", one of two things must be done:

a) use unbounded buffering (such as ReplaySubject)
b) block the onNext calls until the GroupedObservable is subscribed to and receiving the data

We cannot choose (a), for the reasons given in the Rx.Net blog post: it breaks backpressure and could bloat the buffer until the system fails.

In general it is appropriate to expect people to subscribe to all groups, except in one case where skipping groups will be expected to work: using filter. In this case we can solve the common case by special-casing filter to be aware of GroupedObservable. It's not decoupled or elegant, but it solves the common problem.

Thus, the trade-offs are:

1) Allow for non-deterministic data loss if observeOn/subscribeOn are used, and expect people to learn about this by reading docs.
2) Behave deterministically when observeOn/subscribeOn are used, but block if groups are manually skipped.

Option 2 seems easier for developers to run into during development, and to solve, than option 1, which could often show up randomly (in prod) and be difficult to figure out and solve.