2.x: add Flowable.parallel() and parallel operators #4974
Conversation
Current coverage is 94.94% (diff: 74.69%)

@@              2.x    #4974    diff @@
=======================================
  Files         592      609     +17
  Lines       37969    39186   +1217
  Methods         0        0
  Messages        0        0
  Branches     5752     5968    +216
=======================================
+ Hits        36273    37204    +931
- Misses        741      955    +214
- Partials      955     1027     +72
I'll restore the +95% coverage in a separate PR.
I have a use case that could benefit from this depending on how it is implemented. I don't see the API for [...]. Let me describe the type of parallel processing and see if it matches your goal: 100s of network connections, each spread across n event loops (say 16). The semantic behavior is to merge the 100s of connections into a single stream, then do [...]. In theory, a [...]. Is this the type of thing you want [...]?
@benjchristensen No. ParallelFlowable optimizes for a fixed parallelism level with round-robin dispatch and round-robin join. The closest thing is the [...].
Too bad. Maybe someday I'll get around to making a "ConcurrentFlowable" happen ... but it's been on my todo list for 3 years, so not counting on it :-)
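For context, a small sketch of what this fixed-parallelism, round-robin model looks like in use (the values, parallelism and scheduler below are illustrative only):

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class RoundRobinExample {
    public static void main(String[] args) {
        // parallel(3) creates 3 rails; upstream items are handed to the rails
        // in a round-robin fashion (subject to each rail's outstanding requests),
        // and sequential() joins the rails back into a single Flowable.
        Flowable.range(1, 9)
                .parallel(3)
                .runOn(Schedulers.computation())
                .map(v -> Thread.currentThread().getName() + " -> " + v)
                .sequential()
                .blockingSubscribe(System.out::println);
    }
}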
@JakeWharton Do you want to review this, or if not, are you at least willing to accept it into RxJava 2?
Few nits.
    return;
}

int n = subscribers.length;
final
The same in other similar places would be good.
I don't do those unless the variable has to be accessed from an inner class.
In long methods the reader has to spend extra time to check that it's not modified anywhere, but ok.
/**
 * Flattens the generated Publishers on each rail.
 *
 * @param <T> the input value type
nit: naming generic type parameters Input and Output would remove such comments and make code slightly more readable. Or I and O as a reference to common I/O abbr.
This is an established naming pattern with other generic types of RxJava.
Sure, "just saying"
@@ -0,0 +1,51 @@
/**
 * Copyright 2016 Netflix, Inc.
(c) 2016-present, RxJava Contributors
here and in all other files.
I'll update the PR.
} else {
    SimpleQueue<T> q = inner.getQueue();

    // FIXME overflow handling
Signal MBE? When do you plan to resolve FIXME? It'll lead to silently dropped values…
Adding it right now.
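For illustration, a minimal sketch of what signaling MBE on overflow amounts to, using a plain bounded queue instead of the operator's actual SimpleQueue (names and capacity are made up):

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

import org.reactivestreams.Subscriber;

import io.reactivex.exceptions.MissingBackpressureException;

// Sketch only: when the bounded queue rejects a value, signal
// MissingBackpressureException downstream instead of dropping the value silently.
final class OverflowSketch<T> {
    final Queue<T> queue = new ArrayBlockingQueue<>(128);

    void onNext(T value, Subscriber<? super T> downstream) {
        if (!queue.offer(value)) {
            downstream.onError(new MissingBackpressureException("Queue is full?!"));
            return;
        }
        // a real operator would enter its drain loop here
    }
}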
} catch (Throwable ex) {
    Exceptions.throwIfFatal(ex);
    RxJavaPlugins.onError(ex);
}
add return to avoid request in case of error?
FlowableDoOnLifecycle doesn't return either. There is no good way to report an error and not inject a lot of overhead. See strict().
 * times as this ParallelFlowable's parallelism level is.
 * <p>
 * No assumptions are made about the Scheduler's parallelism level,
 * if the Scheduler's parallelism level is lwer than the ParallelFlowable's,
lower
 * times as this ParallelFlowable's parallelism level is.
 * <p>
 * No assumptions are made about the Scheduler's parallelism level,
 * if the Scheduler's parallelism level is lwer than the ParallelFlowable's,
lower
Fixing.
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
public final Flowable<T> sequential() {
I think this should be a verb: sequentize()/etc to be consistent with other operators (which are verbs mostly).
Btw, reading chains like:

Flowable.range(1, 100)
    .parallel(10)
    .runOn(Schedulers.io())
    .map(v -> httpClient.blockingGet("http://server/item/" + v))
    .sequential()

feels strange because sequential after parallel looks like an operator that disables parallelization of the chain (of course it can't, but I dunno, it just reads strange to me).
This naming matches Java 8 Stream's parallel() and sequential() operators.
Makes sense, though the JDK is not the best example of good naming.
 * @return the new Px instance
 */
@CheckReturnValue
public final Flowable<T> sorted(Comparator<? super T> comparator) {
sort?
Matches the naming of Flowable.sorted().
What about tests and benchmark comparisons with the parallelization that you can achieve at the moment, using existing RxJava APIs?
 * and dispatches the upstream items to them in a round-robin fashion.
 * <p>
 * Note that the rails don't execute in parallel on their own and one needs to
 * apply {@link ParallelFlowable#runOn(Scheduler)} to specify the Scheduler where
What about removing runOn and adding Scheduler as a parameter to parallel()?
It is the same logic as with regular factory methods: just, range and fromIterable don't take a Scheduler either. Plus you can apply multiple runOn's on a sequence at different stages, for example to create a pipeline with parallelism=2 and 3 stages in total.
> plus you can apply multiple runOn's on a sequence at different stages

Ah, that's nice, got it.
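A rough sketch of the multi-stage idea mentioned above, with two runOn stages applied to the same set of rails (the schedulers and stage bodies are purely illustrative):

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class MultiStageParallel {
    public static void main(String[] args) {
        Flowable.range(1, 100)
                .parallel(2)                      // 2 rails for the whole pipeline
                .runOn(Schedulers.computation())  // stage 1 runs on computation()
                .map(v -> v * v)
                .runOn(Schedulers.io())           // stage 2 hops the rails over to io()
                .map(v -> "value " + v)
                .sequential()
                .blockingSubscribe(System.out::println);
    }
}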
I've added a benchmark and here are the results (i7 4770K, Windows 7 x64, Java 8u112): [...] Clearly, parallel has lower overhead than [...]. Comparing against [...]
LGTM, I like it.
    requested.addAndGet(-e);
}

int w = get();
I haven't seen this optimization before with missed (calling get before addAndGet). Is this particular to the ParallelJoin use case or do you expect to start applying it elsewhere too?
There are a couple of places which use this pattern: range, observeOn, fromArray.
I'm not going to apply them eagerly because it is another local variable/register to worry about when there are lots of other locals in deeper/user code.
ok thanks
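For context, a stripped-down sketch of the wip/missed drain loop being discussed, with the extra get() read before addAndGet() (illustrative, not the PR's exact code):

import java.util.concurrent.atomic.AtomicInteger;

// The serialization counter ("wip") is the AtomicInteger itself. Reading get()
// first lets the loop skip the more expensive addAndGet() when new work was
// signalled while draining.
final class DrainSketch extends AtomicInteger {
    void tryDrain() {
        if (getAndIncrement() != 0) {
            return; // another thread is already draining
        }
        int missed = 1;
        for (;;) {
            // ... drain queues, honor requests, emit to the Subscriber ...

            int w = get();             // cheap volatile read first
            if (w == missed) {
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;             // nothing arrived while draining
                }
            } else {
                missed = w;            // more work arrived; loop again without the CAS
            }
        }
    }
}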
Updated the [...]. For smaller computations, parallel has less overhead. For longer computations, they are roughly on par with each other. Parallel uses round-robin collection whereas flatMap collects from a source as long as it can.
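The comparison roughly corresponds to these two pipeline shapes; the flatMap variant below is an approximation of the flatMap-based parallelism being compared, not the actual benchmark code:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class ParallelVsFlatMap {
    static int compute(int v) {
        return v * v; // stand-in for the measured work
    }

    public static void main(String[] args) {
        // ParallelFlowable: a fixed number of rails, round-robin dispatch,
        // round-robin collection in sequential()
        Flowable.range(1, 1_000)
                .parallel(4)
                .runOn(Schedulers.computation())
                .map(ParallelVsFlatMap::compute)
                .sequential()
                .blockingSubscribe();

        // flatMap-based parallelism: one inner Flowable per value, subscribed on
        // the computation scheduler; flatMap drains whichever inner has items
        Flowable.range(1, 1_000)
                .flatMap(v -> Flowable.just(v)
                        .subscribeOn(Schedulers.computation())
                        .map(ParallelVsFlatMap::compute))
                .blockingSubscribe();
    }
}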
This PR adds the parallel() method to Flowable, which opens up a sub-DSL with parallel operations. (Note that only a few operators make sense in a parallel setting.) This parallel sub-DSL is not limited to computation tasks, as it allows specifying the parallelism and the Scheduler to run the parallel 'rails'. For example, you can have parallel downloads that block:
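A sketch of what such a chain can look like, reusing the httpClient.blockingGet call from the review discussion above as a stand-in for any blocking HTTP client (the client itself is hypothetical):

import java.util.List;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class ParallelDownloads {
    // hypothetical blocking HTTP client used for illustration only
    static final FakeHttpClient httpClient = new FakeHttpClient();

    public static void main(String[] args) {
        // 10 rails on the io() Scheduler; each rail performs blocking calls
        // independently and sequential() merges the results back into one Flowable
        List<String> items = Flowable.range(1, 100)
                .parallel(10)
                .runOn(Schedulers.io())
                .map(v -> httpClient.blockingGet("http://server/item/" + v))
                .sequential()
                .toList()
                .blockingGet();

        System.out.println("Downloaded " + items.size() + " items");
    }

    static final class FakeHttpClient {
        String blockingGet(String url) {
            return "response from " + url; // pretend network call
        }
    }
}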