Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix to remove drift from schedulePeriodic #1162

Merged
merged 1 commit into from
May 6, 2014

Conversation

kirkshoop
Copy link
Member

When implementing the 'worker' scheduler pattern in rxcpp I implemented schedulePeriodic differently because I predicted that the RxJava version would drift. Once the changes were working, I verified my prediction in rxcpp.

I decided to test the same in RxJava and contribute my approach.

This was my test:

import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.schedulers.Schedulers;
import rx.functions.Action0;

public class periodic {

    public static void main(String[] args) {

        final Scheduler scheduler = Schedulers.immediate();
        final Scheduler.Worker w = scheduler.createWorker();

        final long initial = TimeUnit.SECONDS.toMillis(2);
        final long period = TimeUnit.SECONDS.toMillis(1);
        final long start = scheduler.now() + initial;

        w.schedulePeriodically(new Action0() {
            long count = 0;
            @Override
            public void call() {
                long tick = scheduler.now();
                System.out.println(String.format("expected -> %dms, actual -> %dms, drift -> %dms", count*period, tick - start, tick - (start + (count*period))));
                ++count;
            }
        }, initial, period, TimeUnit.MILLISECONDS);
    }

}

The existing impl causes this output:

$ java -cp rxjava-core/build/libs/rxjava-core-0.18.3-SNAPSHOT.jar:./ periodic
expected -> 0ms, actual -> 1ms, drift -> 1ms
expected -> 1000ms, actual -> 1002ms, drift -> 2ms
expected -> 2000ms, actual -> 2003ms, drift -> 3ms
expected -> 3000ms, actual -> 3004ms, drift -> 4ms
expected -> 4000ms, actual -> 4005ms, drift -> 5ms
expected -> 5000ms, actual -> 5006ms, drift -> 6ms
expected -> 6000ms, actual -> 6007ms, drift -> 7ms
expected -> 7000ms, actual -> 7008ms, drift -> 8ms
expected -> 8000ms, actual -> 8009ms, drift -> 9ms
expected -> 9000ms, actual -> 9010ms, drift -> 10ms
expected -> 10000ms, actual -> 10011ms, drift -> 11ms
expected -> 11000ms, actual -> 11012ms, drift -> 12ms
expected -> 12000ms, actual -> 12013ms, drift -> 13ms
expected -> 13000ms, actual -> 13014ms, drift -> 14ms
expected -> 14000ms, actual -> 14016ms, drift -> 16ms
expected -> 15000ms, actual -> 15017ms, drift -> 17ms
expected -> 16000ms, actual -> 16018ms, drift -> 18ms
expected -> 17000ms, actual -> 17019ms, drift -> 19ms
expected -> 18000ms, actual -> 18020ms, drift -> 20ms
expected -> 19000ms, actual -> 19021ms, drift -> 21ms
expected -> 20000ms, actual -> 20022ms, drift -> 22ms
expected -> 21000ms, actual -> 21023ms, drift -> 23ms
expected -> 22000ms, actual -> 22023ms, drift -> 23ms
expected -> 23000ms, actual -> 23024ms, drift -> 24ms

With this commit the test outputs:

$ java -cp ./:./rxjava-core/build/libs/rxjava-core-0.18.3-SNAPSHOT.jar periodic
expected -> 0ms, actual -> 3ms, drift -> 3ms
expected -> 1000ms, actual -> 1001ms, drift -> 1ms
expected -> 2000ms, actual -> 2001ms, drift -> 1ms
expected -> 3000ms, actual -> 3000ms, drift -> 0ms
expected -> 4000ms, actual -> 4001ms, drift -> 1ms
expected -> 5000ms, actual -> 5000ms, drift -> 0ms
expected -> 6000ms, actual -> 6000ms, drift -> 0ms
expected -> 7000ms, actual -> 7000ms, drift -> 0ms
expected -> 8000ms, actual -> 8001ms, drift -> 1ms
expected -> 9000ms, actual -> 9001ms, drift -> 1ms
expected -> 10000ms, actual -> 10001ms, drift -> 1ms
expected -> 11000ms, actual -> 11001ms, drift -> 1ms
expected -> 12000ms, actual -> 12001ms, drift -> 1ms
expected -> 13000ms, actual -> 13001ms, drift -> 1ms
expected -> 14000ms, actual -> 14001ms, drift -> 1ms
expected -> 15000ms, actual -> 15001ms, drift -> 1ms
expected -> 16000ms, actual -> 16001ms, drift -> 1ms
expected -> 17000ms, actual -> 17000ms, drift -> 0ms
expected -> 18000ms, actual -> 18000ms, drift -> 0ms
expected -> 19000ms, actual -> 19000ms, drift -> 0ms
expected -> 20000ms, actual -> 20000ms, drift -> 0ms
expected -> 21000ms, actual -> 21002ms, drift -> 2ms
expected -> 22000ms, actual -> 22000ms, drift -> 0ms
expected -> 23000ms, actual -> 23001ms, drift -> 1ms
expected -> 24000ms, actual -> 24001ms, drift -> 1ms

@cloudbees-pull-request-builder

RxJava-pull-requests #1077 FAILURE
Looks like there's a problem with this pull request

@benjchristensen
Copy link
Member

Very nice, thanks @kirkshoop.

While considering this I though of something but I haven't played with this enough to know exactly how it will behave, but if someone chooses an interval of 1ms, and the work being done each time takes longer than 1ms, what is the expected behavior?

We should never have concurrent execution. If I understand correctly it will result in a negative delay and result in scheduling it immediately, but obviously have drift.

@benjchristensen
Copy link
Member

From what I can tell it does the right thing and just delays rather than allow concurrent execution.

public class periodic {

    public static void main(String[] args) {

        final Scheduler scheduler = Schedulers.immediate();
        final Scheduler.Worker w = scheduler.createWorker();

        final long initial = 100;
        final long period = 100;
        final long start = scheduler.now() + initial;

        w.schedulePeriodically(new Action0() {
            long count = 0;
            @Override
            public void call() {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                long tick = scheduler.now();
                System.out.println(String.format("expected -> %dms, actual -> %dms, drift -> %dms", count*period, tick - start, tick - (start + (count*period))));
                ++count;
            }
        }, initial, period, TimeUnit.MILLISECONDS);
    }

}
expected -> 0ms, actual -> 1503ms, drift -> 1503ms
expected -> 100ms, actual -> 3009ms, drift -> 2909ms
expected -> 200ms, actual -> 4511ms, drift -> 4311ms
expected -> 300ms, actual -> 6012ms, drift -> 5712ms
expected -> 400ms, actual -> 7513ms, drift -> 7113ms
expected -> 500ms, actual -> 9015ms, drift -> 8515ms

Based on this I'm merging this as it is safe in the degraded case while being better in the normal case (work being less time than the interval).

benjchristensen added a commit that referenced this pull request May 6, 2014
fix to remove drift from schedulePeriodic
@benjchristensen benjchristensen merged commit dded83c into ReactiveX:master May 6, 2014
@kirkshoop
Copy link
Member Author

Yes, that was the behavior before and I kept it. There were only two options and skipping a period seemed like the wrong policy to me. The user supplies both the function and the period and so they own the behavior (they can return immediately to skip and get back on track, or just ensure that the period was sufficient to begin with.

On May 6, 2014, at 9:11 AM, "Ben Christensen" <[email protected]mailto:[email protected]> wrote:

Very nice, thanks @kirkshoophttps://github.com/kirkshoop.

While considering this I though of something but I haven't played with this enough to know exactly how it will behave, but if someone chooses an interval of 1ms, and the work being done each time takes longer than 1ms, what is the expected behavior?

We should never have concurrent execution. If I understand correctly it will result in a negative delay and result in scheduling it immediately, but obviously have drift.

Reply to this email directly or view it on GitHubhttps://github.com//pull/1162#issuecomment-42323326.

@kirkshoop kirkshoop deleted the stableperiodic branch May 6, 2014 18:06
@benjchristensen
Copy link
Member

This is an excellent improvement. I'm working on something that does microsecond level scheduling, and this change makes it work very accurately:

########################################################################################
Step: 10  Interval: 111micros  Rate: 9000/s
########################################################################################

Total => Success: 315237 Error: 0   Last 10s => Success: 9001/s Error: 0/s    Latency => 50th: 0  90th: 0  99th: 0  100th: 0
Total => Success: 360280 Error: 0   Last 10s => Success: 9000/s Error: 0/s    Latency => 50th: 0  90th: 0  99th: 0  100th: 0
Total => Success: 405334 Error: 0   Last 10s => Success: 9001/s Error: 0/s    Latency => 50th: 0  90th: 0  99th: 0  100th: 0
Total => Success: 450373 Error: 0   Last 10s => Success: 9001/s Error: 0/s    Latency => 50th: 0  90th: 0  99th: 0  100th: 0

Without this change, it isn't even close:

########################################################################################
Step: 10  Interval: 111micros  Rate: 9000/s
########################################################################################

Total => Success: 239363 Error: 0   Last 10s => Success: 6835/s Error: 0/s    Latency => 50th: 0  90th: 0  99th: 0  100th: 0
Total => Success: 273742 Error: 0   Last 10s => Success: 6869/s Error: 0/s    Latency => 50th: 0  90th: 0  99th: 0  100th: 0
Total => Success: 308053 Error: 0   Last 10s => Success: 6863/s Error: 0/s    Latency => 50th: 0  90th: 0  99th: 0  100th: 0
Total => Success: 342820 Error: 0   Last 10s => Success: 6901/s Error: 0/s    Latency => 50th: 0  90th: 0  99th: 0  100th: 0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants