Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the issue that Sample doesn't call 'unsubscribe' #1965

Merged
merged 1 commit into from
Dec 15, 2014

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented Dec 12, 2014

Fix #1958

@akarnokd
Copy link
Member

Unfortunately, this makes the following program hang:

Observable.range(0, 100).subscribeOn(Schedulers.io())
        .concatMap(e -> Observable.timer(100, TimeUnit.MILLISECONDS).map(f -> e))
        .sample(1000, TimeUnit.MILLISECONDS)
        .delay(1000, TimeUnit.MILLISECONDS)
        .toBlocking().forEach(System.out::println);
        System.out.println("Done");

Having:

        SamplerSubscriber<T> sampler = new SamplerSubscriber<T>(s);
        child.add(sampler); // unsubscribe goes only upstream this way.
        worker.schedulePeriodically(sampler, time, time, unit);

Makes the example work.

@zsxwing zsxwing force-pushed the fix-sample-unsubscribe branch from 6c958e3 to 522ce79 Compare December 12, 2014 15:52
@zsxwing
Copy link
Member Author

zsxwing commented Dec 12, 2014

I see, it doesn't work with delay. So if an operator calls unsubscribe explicitly, it should not use new Subscriber(Subscriber). Right?

@benjchristensen
Copy link
Member

Correct, it should not chain them together as it will unsubscribe up and down the stream. If it wants to unsubscribe up then it must decouple the chain.

@benjchristensen
Copy link
Member

I have tested this code locally and it is working. The code was force-pushed to match the feedback from @akarnokd.

Thanks @zsxwing

benjchristensen added a commit that referenced this pull request Dec 15, 2014
Fix the issue that Sample doesn't call 'unsubscribe'
@benjchristensen benjchristensen merged commit 1d487a3 into ReactiveX:1.x Dec 15, 2014
@zsxwing zsxwing deleted the fix-sample-unsubscribe branch December 15, 2014 04:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants