New ExecutorScheduler Implementation #1219

Closed
benjchristensen opened this issue May 20, 2014 · 16 comments

@benjchristensen
Member

It turns out a lot of people used ExecutorScheduler despite its problems (#711 & #713). We need to bring it back, but in a way that is compliant with the contract.

This will mean that each Worker from the ExecutorScheduler will need to maintain its own queue outside of the Executor (similar to observeOn) and then recurse on the Executor. Only a single task per Worker can be scheduled/enqueued on an Executor at a time, and when it completes it should recurse, pulling items from the Worker queue. When the queue is empty it can stop processing. When a new task is enqueued on the Worker, it can then schedule itself again for execution on the Executor.

We will be working against the default behavior of the Executor but need to do that to maintain the single-threaded contract of a Scheduler.Worker.
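
A minimal sketch of the queue-and-drain idea described above, in plain java.util.concurrent terms. The class name `SerialWorker` and its `schedule` method are hypothetical and are not the RxJava API; the sketch only assumes that a counter can guard the "one drain task per Worker on the Executor" rule.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: runs its tasks one at a time on a shared Executor. */
final class SerialWorker implements Runnable {
    private final Executor executor;
    // The Worker's own queue, kept outside of the Executor.
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    // Number of tasks enqueued but not yet finished ("work in progress").
    private final AtomicInteger wip = new AtomicInteger();

    SerialWorker(Executor executor) {
        this.executor = executor;
    }

    void schedule(Runnable task) {
        queue.offer(task);
        // Only the caller that moves the counter from 0 to 1 submits the
        // drain loop, so at most one drain per Worker sits on the Executor.
        if (wip.getAndIncrement() == 0) {
            executor.execute(this);
        }
    }

    @Override
    public void run() {
        // Drain until the queue is empty, then release the thread.
        // Assumption: tasks do not throw; a throwing task would leave the
        // counter above zero and stall this sketch.
        do {
            Runnable task = queue.poll();
            task.run();
        } while (wip.decrementAndGet() != 0);
    }
}
```

Tasks scheduled this way never overlap, but consecutive drains may land on different pool threads, so the sketch gives serial execution rather than thread affinity.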

@chrisjenx

@benjchristensen you've probably already investigated this, but does any of the Quasar work help, or the backported ForkJoin for JRE 6?

@benjchristensen
Member Author

My understanding of ForkJoin is that it would still have the same need for us to manage a queue externally, otherwise it will process the items on multiple threads concurrently if we put them into the ForkJoin queue.

@chrisjenx

@benjchristensen you're right....
So the idea I have at present is that you spawn a new SerialExecutor for each worker.

  • Pass an Executor into the Scheduler.
  • Creating a Worker() creates a SerialExecutor that sits on a Thread from the passed-in Executor.
  • Process through all the actions, blocking until the Worker() completes.
  • Complete and return the Thread back to the pool.

Not sure if that would actually solve the problem, but my understanding is that we always need to serially execute queued-up actions on any one worker.

@benjchristensen
Member Author

That all sounds right except the "blocking until the Worker() completes" part. That would be okay in Quasar, but not with native threads. It will need to behave like observeOn, where it releases the thread (think of it as an event loop) and then reschedules back on a thread the next time it receives an onNext.

See observeOn here: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java#L98

@chrisjenx

@benjchristensen yeah OK that makes sense.
I still think it should be noted that people should try to avoid Executors where possible. It should be a "when you really have to" option.

@akarnokd
Member

I guess we need to make sure that tasks executed by the same worker don't hop threads, right? I see a few problems:

  • Generally, threads can die from executing non RxJava tasks and get replaced by fresh threads so thread "hopping" is more likely to occur than with the standard schedulers.
  • Since most executors use a single work queue, work might be dequeued by any of the threads. Even if a work item can tell where it should run, one would need some mechanism to toss the work over to the target thread, at which point task reordering can happen.
    • Maybe write-cursor and read-cursor pairs could help; in addition, each worker thread needs a dedicated queue, with every instance of Scheduler.Worker assigned to it. These queues may then be processed round-robin.
    • Even if this worker thread affinity is established, one would need constantly running tasks that poll these queues or blocking ones that get unblocked by yet another queue. Since we don't know how many threads there are in the Executor, it becomes difficult to schedule such processing routines if necessary, and even so, they will block out other tasks submitted from elsewhere to the same Executor.

Bottom line is, in my opinion, that there aren't any good ways to ensure worker affinity on an Executor whose internal threads and queueing we can't control.

@samueltardieu
Contributor

As far as I am concerned, the needs I have are:

  • I have to serialize all my Bluetooth Low Energy communications on Android, and requests can come from multiple threads; I'd like my observables to be scheduled one after the other.
  • In cgeo, I do not want more than 5 (or 10) concurrent HTTP download connections from Android, as I download large images and the memory pressure will be too high if I download more of them at the same time.

Is there a way to achieve those two goals easily without an executor?
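
For reference, the two requirements above can be sketched with plain java.util.concurrent executors, independent of any Scheduler: a single-thread executor serializes the Bluetooth work, and a fixed pool of 5 caps the concurrent downloads. The class `BoundedPools` and its method names are hypothetical, and `peakConcurrency` exists only to observe the cap with dummy jobs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the two limits described in the comment above. */
final class BoundedPools {
    static final int MAX_DOWNLOADS = 5;

    /** One thread: submitted tasks run strictly one after the other. */
    static ExecutorService newBleExecutor() {
        return Executors.newSingleThreadExecutor();
    }

    /** At most MAX_DOWNLOADS tasks run at once; the rest wait in the queue. */
    static ExecutorService newDownloadExecutor() {
        return Executors.newFixedThreadPool(MAX_DOWNLOADS);
    }

    /** Runs dummy jobs on the pool and returns the highest observed overlap. */
    static int peakConcurrency(ExecutorService pool, int tasks) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(10); // stand-in for real I/O
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}
```

Whether such executors should then be wrapped in a Scheduler is exactly what this issue is about; the sketch only shows the limits themselves.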

@chrisjenx

@samueltardieu This hijacks the issue somewhat, but the Computation Scheduler only runs as many concurrent threads as there are cores on the device (2-4 for most Android devices).

The whole point of the new Schedulers is that they execute in order, which Executors are not guaranteed to do.

I would use either an IO or Trampoline Scheduler for your BTLE comms. (Also remember to bind the Observable if you're passing it off to the UI thread; @mattias provided that one.)
As for HTTP, either use the Computation Scheduler or your own Scheduler. We solved the problem in Retrofit by using the Executor in the onSubscribe() method. See RxSupport.

@akarnokd
Member

There have been a few PRs proposing a Scheduler with fixed thread count and another with thread-caching support. The latter can be extended to put an upper limit on the active thread count if necessary. But unfortunately, these efforts are blocked on the enhancement quest to add load-balancing to the base computation scheduler and the usual concerns about API size and features to expose. I've been fiddling with an idea for some time to ask for a contrib-experimental module where all these "outcasts" and other stuff may be put.

@chrisjenx

@akarnokd I quite like that idea, keep the main lib lean. I'm sure a contrib-concurrency would not be overkill.
I mean I always use Proguard so bloat isn't so much of an issue for me, but it's great you guys are thinking about it.

@benjchristensen
Member Author

All interested in this, please try out the new implementation via Schedulers.newExecutor as implemented by @akarnokd in #1272.

@benjchristensen
Member Author

Should we rename Schedulers.newExecutor to something else? The reason I'm asking is that it doesn't actually create a new Executor the way newThread creates a new thread. It is a new Scheduler around a given Executor. Perhaps it should instead be Schedulers.fromExecutor(Executor e) or just Schedulers.executor(Executor e)?

@headinthebox
Contributor

I like Schedulers.from(Executor e)

@daschl
Contributor

daschl commented May 28, 2014

@headinthebox +1 very consistent!

benjchristensen added a commit to benjchristensen/RxJava that referenced this issue May 30, 2014
@benjchristensen
Member Author

I believe this is done, so closing out.

@cq321

cq321 commented Oct 30, 2015

-keep class rx.internal.util.** { *; }
Add this to your ProGuard configuration to resolve a few problems.

7 participants