Observable#repeatWhen #2889

Closed
NachoSoto opened this issue Apr 18, 2015 · 10 comments
@NachoSoto

I'm trying to leverage the repeatWhen operator, but its behavior doesn't seem to be following the documentation.
I'm probably not using it correctly, but I can't see exactly what I'm doing wrong, and I also noticed that this operator is lacking test coverage.

final AtomicInteger i = new AtomicInteger(0);
final Observable<?> timer = Observable.timer(100, TimeUnit.MILLISECONDS)
        .take(6)
        .cache();

final Observable<Integer> result = Observable.defer(() -> Observable.just(i.getAndIncrement()))
        .repeatWhen(observable -> timer)
        .cache();

result
        .subscribe(System.out::println);

// wait for result to complete.
result
        .toList()
        .toBlocking()
        .first();

I would expect this to print 0, 1, 2, 3, 4, 5 and then complete, but instead result is only emitting 0 and completing.

Note that what I'm trying to accomplish is more complex than this (this example in particular could be implemented with timer() + map()).
In my example the observable returned from repeatWhen is a subject to which I send values to make the resulting Observable repeat itself, but I simplified this for illustration purposes.

Could somebody point to what I'm doing wrong, or whether there's a better way to implement what I described?

Thank you.

@wendigo

wendigo commented Apr 18, 2015

Your first observable does not emit an onError notification, so basically repeatWhen will never kick in ;) (it will resubscribe only on an onError notification)

@NachoSoto
Author

Isn't that only relevant for retryWhen?

In any case, I realized that a much simpler solution for my problem is using flatMap, though I'd still like to understand the semantics of this operator.

Thanks for your reply!

@akarnokd akarnokd added the Bug label Apr 20, 2015
@akarnokd
Member

This appears to be a bug with repeatWhen (I can't find any unit test for this operator).

Edit: quick clarification: Observable.timer(100, TimeUnit.MILLISECONDS) fires only once so you should see only 1 repetition.

@akarnokd
Member

I've looked into repeatWhen and it is unclear to me how the returned observable from the user function should affect the resubscription. It seems you can't dismiss the incoming observable and return something independent because the returned observable timer will be subscribed to immediately. My guess is that you need to map/flatMap over the observable to return a possible delay for the resubscription:

AtomicInteger c = new AtomicInteger();
Observable.just(1)
.repeatWhen(o -> o.flatMap(v -> {
    if (c.getAndIncrement() == 0) {
        return Observable.just(1);
    }
    return Observable.empty();
}))
.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));

But this never prints "Done".

/cc @benjchristensen @stealthcode

@alcarvalho

@akarnokd, for me your example is working as expected.

If you flatMap the observable o given by repeatWhen, it will emit everything from the inner observables but will only complete when o completes. If you keep feeding it empty observables, it won't complete. But it won't repeat either, because it only repeats when you emit something.

This will print "Done", but only one "1", of course:

AtomicInteger c = new AtomicInteger();
Observable.just(1)
.repeatWhen(o -> o.flatMap(v -> {
    if (c.getAndIncrement() == 0) {
        return Observable.just(1);
    }
    return Observable.empty();
}).take(1))
.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));

Unless I'm missing something, that's the intended behavior, right?

@stealthcode

@NachoSoto Your usage of repeatWhen should map the given observable and return it. The reason you are not seeing any repeats is because you are throwing away the feedback loop and we are unable to subscribe to it. Instead we are subscribing to your timer observable. Here is a functioning example with the desired delay.

CountDownLatch l = new CountDownLatch(50);
Observable.defer(() -> Observable.just(i.getAndIncrement()))
        .repeatWhen(observable -> observable.delay(100, TimeUnit.MILLISECONDS))
        .subscribe((t) -> {
            System.out.println(t);
            l.countDown();
        });
l.await();

@stealthcode

Oh and one more thing. You can get the desired effect of final Observable<?> timer = Observable.timer(100, TimeUnit.MILLISECONDS).take(6) with the following.

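final AtomicInteger i = new AtomicInteger(0); // counter from the original post
// Only 7 values are emitted, so release the latch on completion
// instead of counting down from 50 (which would block forever).
CountDownLatch l = new CountDownLatch(1);
Observable.defer(() -> Observable.just(i.getAndIncrement()))
        .repeatWhen(observable -> observable.delay(100, TimeUnit.MILLISECONDS).take(6))
        .subscribe(
                System.out::println,
                Throwable::printStackTrace,
                l::countDown);
l.await();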

output

0
1
2
3
4
5
6

@stealthcode

@akarnokd The case you point out seems like a bug to me. The final subscriber after the repeatWhen should always receive the onCompleted. Emitting an empty observable from the notificationHandler in a repeatWhen should propagate the onCompleted event to the child subscriber, effectively terminating the observable chain. It's interesting that when I change the flatMap to a takeWhile, the subscriber does execute its onCompleted.

AtomicInteger c = new AtomicInteger();
Observable.just(1)
    .repeatWhen(o -> o.takeWhile((v -> c.getAndIncrement() == 0)))
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));

output

1
1
Done

@stealthcode

/cc @davidmoten is there a unit test covering this use case in #2997?

@benjchristensen benjchristensen added this to the 1.0.x milestone Aug 28, 2015
@akarnokd akarnokd added Question and removed Bug labels Mar 13, 2016
@akarnokd
Member

I now understand the intent behind this operator, and @stealthcode's example with delay works as expected.
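
For readers skimming the thread, the semantics it converges on can be sketched with a toy model in plain Java. This is not RxJava code and not how the operator is implemented. RepeatWhenModel, Signal, Result and run are hypothetical names invented for illustration, and the model assumes a synchronous source that emits one value per subscription and ignores schedulers and timing. It only captures the three reactions discussed above: an emission from the handler's returned stream triggers a resubscription, a completion reaches the final subscriber, and silence does neither.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;
import java.util.function.Supplier;

// Toy, synchronous model of the repeatWhen feedback loop. This is NOT RxJava code;
// every name here is hypothetical and exists only for illustration.
class RepeatWhenModel {

    // How the handler's returned stream reacts to one completion of the source.
    public enum Signal { EMIT, SILENT, COMPLETE }

    // What the final subscriber saw: the values, and whether onCompleted arrived.
    public static final class Result {
        public final List<Integer> values = new ArrayList<>();
        public boolean completed = false;
    }

    // source.get() stands for one subscription that emits a single value and completes.
    // handler.apply(n) is the reaction to the n-th completion (1-based).
    // Note: a handler that always answers EMIT makes this loop run forever.
    public static Result run(Supplier<Integer> source, IntFunction<Signal> handler) {
        Result result = new Result();
        int completions = 0;
        while (true) {
            result.values.add(source.get());
            Signal signal = handler.apply(++completions);
            if (signal == Signal.EMIT) {
                continue; // an emission means "resubscribe to the source"
            }
            // COMPLETE reaches the subscriber; SILENT means no repeat and no completion
            // (a real chain would sit idle here, the model simply stops).
            result.completed = (signal == Signal.COMPLETE);
            return result;
        }
    }

    public static void main(String[] args) {
        // Shape of the flatMap handler above: emit once, then stay silent.
        Result flatMapLike = run(() -> 1, n -> n == 1 ? Signal.EMIT : Signal.SILENT);
        System.out.println(flatMapLike.values + " completed=" + flatMapLike.completed);

        // Shape of the takeWhile handler above: emit once, then complete.
        Result takeWhileLike = run(() -> 1, n -> n == 1 ? Signal.EMIT : Signal.COMPLETE);
        System.out.println(takeWhileLike.values + " completed=" + takeWhileLike.completed);
    }
}
```

Under these assumptions the first run yields [1, 1] without completing, which mirrors the flatMap example that never prints "Done", and the second yields [1, 1] followed by completion, which mirrors the takeWhile output. The case where the handler emits and completes in the same step (for example take(n)) is deliberately left out, since the thread shows its outcome depends on scheduling.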
