Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-submission of CachedThreadScheduler #1276

Merged
merged 6 commits into from
May 30, 2014

Conversation

jbripley
Copy link
Contributor

Merged in the latest from the master branch and refactored some internals in CachedThreadScheduler to better match the latest Scheduler changes. Aimed at solving #1140

@cloudbees-pull-request-builder

RxJava-pull-requests #1174 SUCCESS
This pull request looks good

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/* package */class CachedThreadScheduler extends Scheduler {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add final here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, wasn't sure how it should be, since there doesn't seem to be any standard to follow there. ImmediateScheduler is final, but not EventLoopsScheduler or NewThreadScheduler.

@akarnokd
Copy link
Member

Those should be final as well. I sometimes add them in one PR, forget to add them in another and at the end, a third PR gets merged without it. I plan to do another cleanup round before 1.0 so hopefully final will be applied uniformly.

@cloudbees-pull-request-builder

RxJava-pull-requests #1175 SUCCESS
This pull request looks good

@jbripley jbripley changed the title Re-submission of CachedThreadScheduler, implementing #1140 Re-submission of CachedThreadScheduler May 28, 2014
@benjchristensen
Copy link
Member

Testing with this code:

    public static void main(String[] args) {

        Observable.timer(0, 10, TimeUnit.MILLISECONDS).take(100).flatMap(i -> {
            return Observable.just(i).subscribeOn(Schedulers.io()).map(t -> {
                try {
                    Thread.sleep(10);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return t + "-" + Thread.currentThread();
            });
        }).toBlocking().forEach(System.out::println );

    }

Without this pull request it creates 100 threads:

0-Thread[RxNewThreadScheduler-1,5,main]
1-Thread[RxNewThreadScheduler-2,5,main]
2-Thread[RxNewThreadScheduler-3,5,main]
3-Thread[RxNewThreadScheduler-4,5,main]
4-Thread[RxNewThreadScheduler-5,5,main]
5-Thread[RxNewThreadScheduler-6,5,main]
6-Thread[RxNewThreadScheduler-7,5,main]
7-Thread[RxNewThreadScheduler-8,5,main]
8-Thread[RxNewThreadScheduler-9,5,main]
9-Thread[RxNewThreadScheduler-10,5,main]
10-Thread[RxNewThreadScheduler-11,5,main]
11-Thread[RxNewThreadScheduler-12,5,main]
12-Thread[RxNewThreadScheduler-13,5,main]
13-Thread[RxNewThreadScheduler-14,5,main]
14-Thread[RxNewThreadScheduler-15,5,main]
15-Thread[RxNewThreadScheduler-16,5,main]
16-Thread[RxNewThreadScheduler-17,5,main]
17-Thread[RxNewThreadScheduler-18,5,main]
18-Thread[RxNewThreadScheduler-19,5,main]
19-Thread[RxNewThreadScheduler-20,5,main]
20-Thread[RxNewThreadScheduler-21,5,main]
21-Thread[RxNewThreadScheduler-22,5,main]
22-Thread[RxNewThreadScheduler-23,5,main]
23-Thread[RxNewThreadScheduler-24,5,main]
24-Thread[RxNewThreadScheduler-25,5,main]
25-Thread[RxNewThreadScheduler-26,5,main]
26-Thread[RxNewThreadScheduler-27,5,main]
27-Thread[RxNewThreadScheduler-28,5,main]
28-Thread[RxNewThreadScheduler-29,5,main]
29-Thread[RxNewThreadScheduler-30,5,main]
30-Thread[RxNewThreadScheduler-31,5,main]
31-Thread[RxNewThreadScheduler-32,5,main]
32-Thread[RxNewThreadScheduler-33,5,main]
33-Thread[RxNewThreadScheduler-34,5,main]
34-Thread[RxNewThreadScheduler-35,5,main]
35-Thread[RxNewThreadScheduler-36,5,main]
36-Thread[RxNewThreadScheduler-37,5,main]
37-Thread[RxNewThreadScheduler-38,5,main]
38-Thread[RxNewThreadScheduler-39,5,main]
39-Thread[RxNewThreadScheduler-40,5,main]
40-Thread[RxNewThreadScheduler-41,5,main]
41-Thread[RxNewThreadScheduler-42,5,main]
42-Thread[RxNewThreadScheduler-43,5,main]
43-Thread[RxNewThreadScheduler-44,5,main]
44-Thread[RxNewThreadScheduler-45,5,main]
45-Thread[RxNewThreadScheduler-46,5,main]
46-Thread[RxNewThreadScheduler-47,5,main]
47-Thread[RxNewThreadScheduler-48,5,main]
48-Thread[RxNewThreadScheduler-49,5,main]
49-Thread[RxNewThreadScheduler-50,5,main]
50-Thread[RxNewThreadScheduler-51,5,main]
51-Thread[RxNewThreadScheduler-52,5,main]
52-Thread[RxNewThreadScheduler-53,5,main]
53-Thread[RxNewThreadScheduler-54,5,main]
54-Thread[RxNewThreadScheduler-55,5,main]
55-Thread[RxNewThreadScheduler-56,5,main]
56-Thread[RxNewThreadScheduler-57,5,main]
57-Thread[RxNewThreadScheduler-58,5,main]
58-Thread[RxNewThreadScheduler-59,5,main]
59-Thread[RxNewThreadScheduler-60,5,main]
60-Thread[RxNewThreadScheduler-61,5,main]
61-Thread[RxNewThreadScheduler-62,5,main]
62-Thread[RxNewThreadScheduler-63,5,main]
63-Thread[RxNewThreadScheduler-64,5,main]
64-Thread[RxNewThreadScheduler-65,5,main]
65-Thread[RxNewThreadScheduler-66,5,main]
66-Thread[RxNewThreadScheduler-67,5,main]
67-Thread[RxNewThreadScheduler-68,5,main]
68-Thread[RxNewThreadScheduler-69,5,main]
69-Thread[RxNewThreadScheduler-70,5,main]
70-Thread[RxNewThreadScheduler-71,5,main]
71-Thread[RxNewThreadScheduler-72,5,main]
72-Thread[RxNewThreadScheduler-73,5,main]
73-Thread[RxNewThreadScheduler-74,5,main]
74-Thread[RxNewThreadScheduler-75,5,main]
75-Thread[RxNewThreadScheduler-76,5,main]
76-Thread[RxNewThreadScheduler-77,5,main]
77-Thread[RxNewThreadScheduler-78,5,main]
78-Thread[RxNewThreadScheduler-79,5,main]
79-Thread[RxNewThreadScheduler-80,5,main]
80-Thread[RxNewThreadScheduler-81,5,main]
81-Thread[RxNewThreadScheduler-82,5,main]
82-Thread[RxNewThreadScheduler-83,5,main]
83-Thread[RxNewThreadScheduler-84,5,main]
84-Thread[RxNewThreadScheduler-85,5,main]
85-Thread[RxNewThreadScheduler-86,5,main]
86-Thread[RxNewThreadScheduler-87,5,main]
87-Thread[RxNewThreadScheduler-88,5,main]
88-Thread[RxNewThreadScheduler-89,5,main]
89-Thread[RxNewThreadScheduler-90,5,main]
90-Thread[RxNewThreadScheduler-91,5,main]
91-Thread[RxNewThreadScheduler-92,5,main]
92-Thread[RxNewThreadScheduler-93,5,main]
93-Thread[RxNewThreadScheduler-94,5,main]
94-Thread[RxNewThreadScheduler-95,5,main]
95-Thread[RxNewThreadScheduler-96,5,main]
96-Thread[RxNewThreadScheduler-97,5,main]
97-Thread[RxNewThreadScheduler-98,5,main]
98-Thread[RxNewThreadScheduler-99,5,main]
99-Thread[RxNewThreadScheduler-100,5,main]

With this pull request it uses only 2 threads:

0-Thread[RxCachedThreadScheduler-1,5,main]
1-Thread[RxCachedThreadScheduler-2,5,main]
2-Thread[RxCachedThreadScheduler-1,5,main]
3-Thread[RxCachedThreadScheduler-2,5,main]
4-Thread[RxCachedThreadScheduler-1,5,main]
5-Thread[RxCachedThreadScheduler-2,5,main]
6-Thread[RxCachedThreadScheduler-1,5,main]
7-Thread[RxCachedThreadScheduler-2,5,main]
8-Thread[RxCachedThreadScheduler-1,5,main]
9-Thread[RxCachedThreadScheduler-2,5,main]
10-Thread[RxCachedThreadScheduler-1,5,main]
11-Thread[RxCachedThreadScheduler-2,5,main]
12-Thread[RxCachedThreadScheduler-1,5,main]
13-Thread[RxCachedThreadScheduler-2,5,main]
14-Thread[RxCachedThreadScheduler-1,5,main]
15-Thread[RxCachedThreadScheduler-2,5,main]
16-Thread[RxCachedThreadScheduler-1,5,main]
17-Thread[RxCachedThreadScheduler-2,5,main]
18-Thread[RxCachedThreadScheduler-1,5,main]
19-Thread[RxCachedThreadScheduler-2,5,main]
20-Thread[RxCachedThreadScheduler-1,5,main]
21-Thread[RxCachedThreadScheduler-2,5,main]
22-Thread[RxCachedThreadScheduler-1,5,main]
23-Thread[RxCachedThreadScheduler-2,5,main]
24-Thread[RxCachedThreadScheduler-1,5,main]
25-Thread[RxCachedThreadScheduler-2,5,main]
26-Thread[RxCachedThreadScheduler-1,5,main]
27-Thread[RxCachedThreadScheduler-2,5,main]
28-Thread[RxCachedThreadScheduler-1,5,main]
29-Thread[RxCachedThreadScheduler-2,5,main]
30-Thread[RxCachedThreadScheduler-1,5,main]
31-Thread[RxCachedThreadScheduler-2,5,main]
32-Thread[RxCachedThreadScheduler-1,5,main]
33-Thread[RxCachedThreadScheduler-2,5,main]
34-Thread[RxCachedThreadScheduler-1,5,main]
35-Thread[RxCachedThreadScheduler-2,5,main]
36-Thread[RxCachedThreadScheduler-1,5,main]
37-Thread[RxCachedThreadScheduler-2,5,main]
38-Thread[RxCachedThreadScheduler-1,5,main]
39-Thread[RxCachedThreadScheduler-2,5,main]
40-Thread[RxCachedThreadScheduler-1,5,main]
41-Thread[RxCachedThreadScheduler-2,5,main]
42-Thread[RxCachedThreadScheduler-1,5,main]
43-Thread[RxCachedThreadScheduler-2,5,main]
44-Thread[RxCachedThreadScheduler-1,5,main]
45-Thread[RxCachedThreadScheduler-2,5,main]
46-Thread[RxCachedThreadScheduler-1,5,main]
47-Thread[RxCachedThreadScheduler-2,5,main]
48-Thread[RxCachedThreadScheduler-1,5,main]
49-Thread[RxCachedThreadScheduler-2,5,main]
50-Thread[RxCachedThreadScheduler-1,5,main]
51-Thread[RxCachedThreadScheduler-2,5,main]
52-Thread[RxCachedThreadScheduler-1,5,main]
53-Thread[RxCachedThreadScheduler-2,5,main]
54-Thread[RxCachedThreadScheduler-1,5,main]
55-Thread[RxCachedThreadScheduler-2,5,main]
56-Thread[RxCachedThreadScheduler-1,5,main]
57-Thread[RxCachedThreadScheduler-2,5,main]
58-Thread[RxCachedThreadScheduler-1,5,main]
59-Thread[RxCachedThreadScheduler-2,5,main]
60-Thread[RxCachedThreadScheduler-1,5,main]
61-Thread[RxCachedThreadScheduler-2,5,main]
62-Thread[RxCachedThreadScheduler-1,5,main]
63-Thread[RxCachedThreadScheduler-2,5,main]
64-Thread[RxCachedThreadScheduler-1,5,main]
65-Thread[RxCachedThreadScheduler-2,5,main]
66-Thread[RxCachedThreadScheduler-1,5,main]
67-Thread[RxCachedThreadScheduler-2,5,main]
68-Thread[RxCachedThreadScheduler-1,5,main]
69-Thread[RxCachedThreadScheduler-2,5,main]
70-Thread[RxCachedThreadScheduler-1,5,main]
71-Thread[RxCachedThreadScheduler-2,5,main]
72-Thread[RxCachedThreadScheduler-1,5,main]
73-Thread[RxCachedThreadScheduler-2,5,main]
74-Thread[RxCachedThreadScheduler-1,5,main]
75-Thread[RxCachedThreadScheduler-2,5,main]
76-Thread[RxCachedThreadScheduler-1,5,main]
77-Thread[RxCachedThreadScheduler-2,5,main]
78-Thread[RxCachedThreadScheduler-1,5,main]
79-Thread[RxCachedThreadScheduler-2,5,main]
80-Thread[RxCachedThreadScheduler-1,5,main]
81-Thread[RxCachedThreadScheduler-2,5,main]
82-Thread[RxCachedThreadScheduler-1,5,main]
83-Thread[RxCachedThreadScheduler-2,5,main]
84-Thread[RxCachedThreadScheduler-1,5,main]
85-Thread[RxCachedThreadScheduler-2,5,main]
86-Thread[RxCachedThreadScheduler-1,5,main]
87-Thread[RxCachedThreadScheduler-2,5,main]
88-Thread[RxCachedThreadScheduler-1,5,main]
89-Thread[RxCachedThreadScheduler-2,5,main]
90-Thread[RxCachedThreadScheduler-1,5,main]
91-Thread[RxCachedThreadScheduler-2,5,main]
92-Thread[RxCachedThreadScheduler-1,5,main]
93-Thread[RxCachedThreadScheduler-2,5,main]
94-Thread[RxCachedThreadScheduler-1,5,main]
95-Thread[RxCachedThreadScheduler-2,5,main]
96-Thread[RxCachedThreadScheduler-1,5,main]
97-Thread[RxCachedThreadScheduler-2,5,main]
98-Thread[RxCachedThreadScheduler-1,5,main]
99-Thread[RxCachedThreadScheduler-2,5,main]

@benjchristensen
Copy link
Member

Thanks @jbripley for this!

benjchristensen added a commit that referenced this pull request May 30, 2014
Re-submission of CachedThreadScheduler
@benjchristensen benjchristensen merged commit 47ea6c8 into ReactiveX:master May 30, 2014
@jbripley jbripley deleted the cachedthreadscheduler2 branch May 30, 2014 16:35
@jbripley
Copy link
Contributor Author

Thanks for all the pointers @akarnokd. I've learned a lot about both thread safe Java and how the Scheduler code in RxJava works.

@akarnokd
Copy link
Member

You are welcome.

@benjchristensen benjchristensen mentioned this pull request Jun 1, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants