replay(bufferSize, time, TimeUnit) not working - truncate should be called with replay? #3917

Closed
jam01 opened this issue May 6, 2016 · 7 comments

Comments

@jam01

jam01 commented May 6, 2016

The buffer only gets truncated when a new item is added, so a late subscriber will always receive the last emission, even if it's older than the time limit.

    // RxJava 1.x; needs rx.Observable, rx.observables.ConnectableObservable,
    // rx.functions.Action1 and java.util.concurrent.TimeUnit.
    // Emits 0..9, one value every 2 seconds (the last one about 20 s after connect).
    Observable<Long> retrofit = Observable.interval(2L, 2L, TimeUnit.SECONDS).take(10);

    // Replay at most 1 item, and only if it is no older than 5 seconds.
    ConnectableObservable<Long> connectableRetrofit;
    connectableRetrofit = retrofit.replay(1, 5, TimeUnit.SECONDS);

    System.out.println("Subscribing A");
    connectableRetrofit.subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            System.out.println("subscriber! A ========== " + aLong);
        }
    });

    System.out.println("Connecting!");
    connectableRetrofit.connect();

    Thread.sleep(10000L);

    // B subscribes about 10 s after connect, while values are still being emitted.
    System.out.println("Subscribing B");
    connectableRetrofit.subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            System.out.println("subscriber! B ========== " + aLong);
        }
    });

    Thread.sleep(30000L);

    // C subscribes about 40 s after connect, roughly 20 s after the last emission.
    System.out.println("Subscribing C");
    connectableRetrofit.subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            System.out.println("subscriber! C ========== " + aLong);
        }
    });

Here subscriber C subscribes 30 seconds after B, well after the last emission, so you'd expect the buffer to have dropped the last value, since the time limit is 5 seconds.

This is the output:

Subscribing A
Connecting!
subscriber! A ========== 0
subscriber! A ========== 1
subscriber! A ========== 2
subscriber! A ========== 3
subscriber! A ========== 4
Subscribing B
subscriber! B ========== 4
subscriber! A ========== 5
subscriber! B ========== 5
subscriber! A ========== 6
subscriber! B ========== 6
subscriber! A ========== 7
subscriber! B ========== 7
subscriber! A ========== 8
subscriber! B ========== 8
subscriber! A ========== 9
subscriber! B ========== 9
Subscribing C
subscriber! C ========== 9

@akarnokd
Member

akarnokd commented May 7, 2016

True, replay starts from the current head which is only moved forward by onNext. I've been thinking about rewriting ReplaySubject to support backpressure; in that structure, when the child Subscriber picks up the head, we can skip old entries right there.

The question is whether we want this behavior change, and whether the current behavior is expected by unit tests.
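To make the difference concrete, here is a minimal plain-Java sketch of a size- and time-bounded buffer. It is a hypothetical model, not RxJava's actual implementation: the class `TimedReplayBufferModel` and its method names are invented for illustration, and time is passed in explicitly as milliseconds. `replayFromHead` mirrors the behavior reported above (truncation happens only on add), while `replaySkippingStale` mirrors the idea of skipping old entries when a subscriber picks up the head.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a size- and time-bounded replay buffer; not RxJava code.
final class TimedReplayBufferModel<T> {
    private static final class Entry<T> {
        final T value;
        final long timestampMillis;

        Entry(T value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    private final int maxSize;
    private final long maxAgeMillis;
    private final Deque<Entry<T>> entries = new ArrayDeque<>();

    TimedReplayBufferModel(int maxSize, long maxAgeMillis) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Truncation happens only here, i.e. only when a new item arrives.
    void add(T value, long nowMillis) {
        entries.addLast(new Entry<>(value, nowMillis));
        while (entries.size() > maxSize) {
            entries.removeFirst();
        }
        while (!entries.isEmpty()
                && entries.peekFirst().timestampMillis < nowMillis - maxAgeMillis) {
            entries.removeFirst();
        }
    }

    // Reported behavior: a late subscriber replays whatever is still buffered.
    List<T> replayFromHead() {
        List<T> out = new ArrayList<>();
        for (Entry<T> e : entries) {
            out.add(e.value);
        }
        return out;
    }

    // Proposed behavior: entries older than maxAgeMillis are skipped at read time.
    List<T> replaySkippingStale(long nowMillis) {
        List<T> out = new ArrayList<>();
        for (Entry<T> e : entries) {
            if (e.timestampMillis >= nowMillis - maxAgeMillis) {
                out.add(e.value);
            }
        }
        return out;
    }
}
```

With `maxSize = 1` and `maxAgeMillis = 5000`, adding value 9 at 20000 ms and reading at 40000 ms still yields 9 from `replayFromHead`, whereas `replaySkippingStale(40000)` yields nothing. That matches subscriber C in the report, which subscribes about 20 seconds after the last emission.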

@jam01
Author

jam01 commented May 7, 2016

Good point. I think the current behavior conflicts with the method's description and purpose. So I'd say change it, or at least update the documentation. Maybe deprecate the current one? Being new to Rx, I spent quite a while trying to figure out why this wasn't working.

@jam01
Author

jam01 commented May 7, 2016

Also, could you recommend a workaround to achieve that behavior? Basically I wanted to do something like replay a network call within a specific time, and if it came out empty I'd repeat the call.

@akarnokd
Member

akarnokd commented May 8, 2016

You can timestamp values before they reach the ReplaySubject, then use skipWhile(n -> n.timestamp < now - maxAge).
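As a rough illustration of that predicate, the sketch below models it in plain Java rather than with RxJava operators. `TimestampedValue` and `StaleSkipSketch.skipStale` are hypothetical names invented here, and `nowMillis` is assumed to be the time at which the late subscriber starts consuming the replayed items.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a timestamped emission; not an RxJava type.
final class TimestampedValue<T> {
    final T value;
    final long timestampMillis;

    TimestampedValue(T value, long timestampMillis) {
        this.value = value;
        this.timestampMillis = timestampMillis;
    }
}

final class StaleSkipSketch {
    // Skips the leading run of items older than maxAgeMillis, then passes
    // everything else through, which is what a skipWhile-style operator does.
    static <T> List<T> skipStale(List<TimestampedValue<T>> replayed,
                                 long nowMillis, long maxAgeMillis) {
        List<T> out = new ArrayList<>();
        boolean skipping = true;
        for (TimestampedValue<T> item : replayed) {
            if (skipping && item.timestampMillis < nowMillis - maxAgeMillis) {
                continue; // still inside the stale prefix
            }
            skipping = false;
            out.add(item.value);
        }
        return out;
    }
}
```

Because replayed items arrive in emission order, dropping only the stale prefix is enough: once one item is fresh, everything after it is at least as fresh.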

Otherwise, see #3918 for an updated ReplaySubject where stale data is skipped on subscription.

@akarnokd
Member

Sorry, I completely forgot about this. I'll post a PR to replay() and see what it takes to skip old entries.

@akarnokd
Member

See #4023 for a proposed fix for replay() itself.

@akarnokd
Member

Closing via #4023.
