
3.x and 2.x - Schedulers.io() fails to produce new thread when needed, leading to deadlock #7153

Closed
sgc-code opened this issue Jan 21, 2021 · 5 comments · Fixed by #7160

@sgc-code
Contributor

I've tested this with the latest versions of 3.x and 2.x (v3.0.9 and v2.2.20 at the time of writing).

If my understanding is correct, Schedulers.io() should create a new thread if all the other threads are busy. This is not happening here, and it leads to a deadlock.

Flowable.just("item")
        .observeOn(Schedulers.io())
        .firstOrError()
        .subscribe(__ -> {
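            // blockingAwait() below never returns: no new IO thread is created for the scheduled Completable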
            for (int i = 0; i < 200; i++) {
                Completable.complete()
                        .observeOn(Schedulers.io())
                        .blockingAwait();
            }
            System.out.println("Will never reach this");
        });

I managed to simplify our code to this minimal reproducible example. I'm aware that .blockingAwait() doesn't make sense here, but it does in the original code.

The issue is easier to reproduce if the IO thread pool contains just a few threads, but it can still happen with bigger pools.

I think it may be related to some operator releasing the worker too soon. It works fine if I replace the firstOrError operator like this:

Single.just("item")
        .observeOn(Schedulers.io())
        .subscribe(__ -> {
            for (int i = 0; i < 200; i++) {
                Completable.complete()
                        .observeOn(Schedulers.io())
                        .blockingAwait();
            }
            System.out.println("NO problem here");
        });

Any help with this is much appreciated.
Cheers

@sgc-code sgc-code changed the title 3.x and 2.x Schedulers.io() fails to produce new thread when needed 3.x and 2.x - Schedulers.io() fails to produce new thread when needed Jan 21, 2021
@sgc-code sgc-code changed the title 3.x and 2.x - Schedulers.io() fails to produce new thread when needed 3.x and 2.x - Schedulers.io() fails to produce new thread when needed, leading to deadlock Jan 21, 2021
@akarnokd
Member

Hi. Indeed the operator releases the worker, and the downstream then blocks on that worker's thread, preventing any progress when the worker is reused. The only workaround is to not block on that code path, and to avoid blocking inside flows in general.

The current behavior is a trade-off. In some cases, a late release would lead to excess thread creation.

@sgc-code
Contributor Author

Thanks for your feedback @akarnokd,

As a workaround, I did this modification in IoScheduler.EventLoopWorker.dispose() (in v2.x)

@Override
public void dispose() {
    if (once.compareAndSet(false, true)) {
        if (delayedDisposal) {
            // Do not return the thread to the pool until we are sure that it's ready to reuse
            threadWorker.scheduleActual(this::performDispose, 0, TimeUnit.MILLISECONDS, tasks);
        } else {
            performDispose();
        }
    }
}

private void performDispose() {
    tasks.dispose();
    pool.release(threadWorker);
}

I ran some tests in a complex app and the number of total threads created was similar.

I see your point about the trade-off, but in my opinion having no deadlocks is more important than a temporary thread excess. Or maybe it's something that could be made configurable externally.

Regarding my workaround, I'd really appreciate your opinion on the code as I'm not very familiar with the library internals.

Thanks!

@akarnokd
Member

having no deadlocks

There is no reason to use RxJava in such a blocking fashion; there are operators that can get you in-sequence execution.

important than a temporary thread excess

The problem was the sustained creation of excess threads as the scheduled release couldn't keep up.

At best, we might add a system parameter to enable delayed release, similar to your code. However, tasks.dispose() has to happen right there so that blocked tasks can get interrupted and let the release task run.
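
For illustration, the original example can be reshaped so the 200 steps run in sequence without blocking. This is a sketch only; the choice of flatMapCompletable and concatMapCompletable here is one option, not code from this issue:

Flowable.just("item")
        .observeOn(Schedulers.io())
        .firstOrError()
        .flatMapCompletable(item ->
                // run the 200 follow-up steps one after another, still on the IO scheduler,
                // without ever blocking the worker that delivered the item
                Flowable.range(0, 200)
                        .concatMapCompletable(i ->
                                Completable.complete()
                                        .observeOn(Schedulers.io())))
        .subscribe(() -> System.out.println("Done without blocking"));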

@sgc-code
Contributor Author

Thanks for the feedback.

Blocking operators are frequently abused, and they are usually a red flag when reviewing code.
But I think there are plenty of valid use cases for them, as they are the only way out of the reactive world.
My expectation was that, since those operators are part of the library, they wouldn't cause any issues.

Let me try to explain why I was blocking in the first place. Here is some pseudo-code to illustrate my point:

interface Service {
    val currentStatus: Status
    fun observeStatus(): Flowable<Status>
}

class ServiceImpl : Service {

    override val currentStatus: Status
        get() = observeStatus().blockingFirst()

    override fun observeStatus(): Flowable<Status> {
        return combineLatest(...)
            .map { ... }
            .flatMap { ... }
    }
}

  • I need to implement a service that conforms to some interface, and that interface exposes the current status in a non-reactive way.
    Some could argue that the interface shouldn't do that. But what if you can't change the interface, because it's used by some other team, a library, or just another part of your code that is not reactive yet? Changing the interface is not always possible.

  • The other option would be to change the implementation: instead of blocking, I could rewrite the method to do the same as observeStatus() but in a non-reactive way. But what if observeStatus() itself depends on some other reactive code? I'd need to rewrite that too, or use the blocking operators.
    Rewriting a lot of code like this would be error-prone and a maintenance nightmare, so changing the implementation is also not an option.

For those two reasons I don't see how blocking can be avoided here.


Regarding the workaround, I really appreciate your suggestion.
I think this could be the final version of it, if someone is interested:

@Override
public void dispose() {
    if (once.compareAndSet(false, true)) {
        tasks.dispose();
        // Schedule the actual release so we don't reuse this worker until its current processing is completed
        threadWorker.scheduleDirect(this::releaseToPool, 0, TimeUnit.MILLISECONDS);
    }
}

private void releaseToPool() {
    pool.release(threadWorker);
}

I ran many unit tests, and I've also tested with a real-world app, and I didn't see a noticeable increase in threads.

I'd like to do a PR with a flag to turn it on, but I'm not sure about the impact on other places. Maybe the same logic should be applied to other Schedulers as well; it doesn't seem specific to the IoScheduler.

Thanks for taking a look at this

@akarnokd
Member

This release only affects the IO scheduler.

Alright, you can post a PR along these lines:

  • You can add a property KEY_SCHEDULED_RELEASE with string rx3.io-scheduled-release here and a static final boolean USE_SCHEDULED_RELEASE.
  • Load it in the static block, defaulting to false (see the sketch after this list).
  • Have EventLoopWorker implement Runnable.
  • You can then change the release logic here:
@Override
public void dispose() {
    if (once.compareAndSet(false, true)) {
        tasks.dispose();

        if (USE_SCHEDULED_RELEASE) {
            threadWorker.scheduleActual(this, 0, TimeUnit.NANOSECONDS, null);
        } else {
            // releasing the pool should be the last action
            pool.release(threadWorker);
        }
    }
}

@Override
public void run() {
    pool.release(threadWorker);
}

  • Insert the new property description here and here
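
For the first two bullets, here is a minimal sketch of how the key and flag could be declared and loaded. The use of Boolean.getBoolean and a standalone static block are assumptions for illustration; in IoScheduler the flag would be loaded alongside the other rx3.* properties in its existing static initializer:

/** Enables the scheduled (delayed) release of the IO worker back to the pool. */
private static final String KEY_SCHEDULED_RELEASE = "rx3.io-scheduled-release";

static final boolean USE_SCHEDULED_RELEASE;

static {
    // defaults to false, keeping the current eager-release behavior unless explicitly enabled
    USE_SCHEDULED_RELEASE = Boolean.getBoolean(KEY_SCHEDULED_RELEASE);
}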
