
Allow scaling executors to reject tasks after shutdown #81856

Conversation

tlrx
Member

@tlrx tlrx commented Dec 17, 2021

Today scaling thread pools never reject tasks but always add them to the queue of tasks to execute, even when the thread pool executor is shutting down or terminated. This behaviour does not work well when a task is blocked waiting for a task on another scaling thread pool to complete an I/O operation: that other task will never be executed if it was enqueued just before its scaling thread pool started shutting down.

This situation is more likely to happen with searchable snapshots, where multiple threads can be blocked waiting for parts of Lucene files to be fetched and made available in the cache. We saw test failures in CI where the concurrent threads Lucene 9 uses (to asynchronously check indices) were blocked waiting for cache files to become available and failed because of leaked file handles (see #77017, #77178).

This pull request changes the `ForceQueuePolicy` used by scaling thread pools so that it now accepts a `rejectAfterShutdown` flag, which can be set on a per-thread-pool basis to indicate that tasks should be rejected once the thread pool is shut down. Because we rely on many scaling thread pools to be black holes that never reject tasks, this flag is set to `false` on most of them to keep the current behavior. In some cases where the rejection logic was already implemented correctly, this flag has been set to `true`.

This pull request also reimplements the `XRejectedExecutionHandler` interface as an abstract class, `EsRejectedExecutionHandler`, which allows some rejection logic to be shared.
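For readers skimming the conversation, here is a minimal sketch of the behaviour described above. The class and flag names follow the PR description (ForceQueuePolicy, rejectAfterShutdown), but the body is an illustration rather than the exact Elasticsearch implementation, and it throws the JDK RejectedExecutionException instead of EsRejectedExecutionException to stay self-contained.

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.LongAdder;

// Sketch only: illustrates the rejectAfterShutdown behaviour described above.
class ForceQueuePolicy implements RejectedExecutionHandler {
    private final LongAdder rejected = new LongAdder();   // stands in for the shared rejection counter
    private final boolean rejectAfterShutdown;            // configured per thread pool

    ForceQueuePolicy(boolean rejectAfterShutdown) {
        this.rejectAfterShutdown = rejectAfterShutdown;
    }

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        if (rejectAfterShutdown && executor.isShutdown()) {
            rejected.increment();
            throw new RejectedExecutionException("rejected execution of " + task + " on " + executor + " (shutdown)");
        }
        // Otherwise keep the historical "black hole" behaviour: force the task onto the queue.
        executor.getQueue().add(task);
    }
}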

@elasticmachine elasticmachine added the Team:Core/Infra (Meta label for core/infra team) label Dec 17, 2021
@elasticmachine
Collaborator

Pinging @elastic/es-core-infra (Team:Core/Infra)

@elasticsearchmachine
Collaborator

Hi @tlrx, I've created a changelog YAML for you.

@tlrx tlrx requested a review from henningandersen December 17, 2021 20:01

@MaratCrash MaratCrash left a comment

@tlrx great PR, thank you!

@@ -210,7 +210,7 @@ private static void logAndFailTest(Exception e) {
private final ThreadPool threadPool = new TestThreadPool(
"TrackedCluster",
// a single thread for "client" activities, to limit the number of activities all starting at once
new ScalingExecutorBuilder(CLIENT, 1, 1, TimeValue.ZERO, CLIENT)
new ScalingExecutorBuilder(CLIENT, 1, 1, TimeValue.ZERO, true, CLIENT)


Please, can we statically import the constant here and use ZERO instead of TimeValue.ZERO? The code would be cleaner, but it's just my opinion.

Member Author

I prefer to keep it the way it is :)

}

protected final EsRejectedExecutionException newRejectedException(Runnable r, ThreadPoolExecutor executor, boolean isExecutorShutdown) {
rejected.inc();


For what purpose is the inc() method called here? I mean, the method name is newRejectedException, but the method implementation also has an increment. I think it's not so clear.

Member Author

I agree, I pushed 6e2d87a
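Judging by the final shape of newRejectedException shown later in this review, the refactor roughly separates the counter update from the exception creation. A sketch of that separation (a fragment of the handler class; names are illustrative and the actual commit 6e2d87a may differ):

// Sketch: keep newRejectedException() side-effect free and record the rejection separately.
protected final void incrementRejections() {
    rejected.inc();
}

protected final EsRejectedExecutionException newRejectedException(Runnable r, ThreadPoolExecutor executor, boolean isExecutorShutdown) {
    return new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, isExecutorShutdown);
}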

builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(
Names.GENERIC,
new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30), false)


Could we move these values (4 and 30) to constants please?

Contributor

This PR does not change that, so I would prefer to leave this as is. We would have to make all the numbers here constants if we did this, and I think that would make the code here harder to read.

Member Author

I agree with Henning here.

Contributor

@henningandersen henningandersen left a comment

This looks good. I left a few comments.


@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (rejectAfterShutdown && executor.isShutdown()) {
throw newRejectedException(r, executor, true);
}
Contributor

Is there a potential race condition here, though a very unlikely one? What if we get into rejectedExecution because all threads are active, the pool is shut down at this point, and then all threads also go inactive before we put the runnable on the queue?

Member Author

After reading the code again I think you are right. I pushed a939ea6 to add another test that executes tasks while concurrently shutting down the executor, making it more likely to hit the race condition on slow machines (it only fails about 1 run in 10,000 on my workstation though).

I reworked the logic in 0d89228 to avoid the race condition, let me know what you think.
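For context, a common way to close this window with a ThreadPoolExecutor is to enqueue first and then re-check the executor state, removing and rejecting the task if the pool was shut down in the meantime. The fragment below sketches that pattern (and matches the shape of the diff discussed later in this review), but it is not the exact content of 0d89228:

// Fragment of the rejection handler: enqueue, then re-check for shutdown.
queue.put(task);
if (rejectAfterShutdown && executor.isShutdown() && executor.remove(task)) {
    // The pool shut down after the task was enqueued and before a worker picked it up,
    // so the task would never run; reject it explicitly instead.
    incrementRejections();
    throw newRejectedException(task, executor, true);
}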


@@ -561,19 +561,25 @@ protected XPackLicenseState getLicenseState() {

public static ScalingExecutorBuilder[] executorBuilders(Settings settings) {
final int processors = EsExecutors.allocatedProcessors(settings);
// searchable snapshots cache thread pools should always reject tasks once they are shutting down, otherwise some threads might be
Contributor

Did you look into adding a test provoking this specific issue consistently?

Member Author

I reproduced the issue in a test, but it did not fail consistently and was much too complex to maintain, while the current black hole behavior is easy to reproduce consistently in a thread pool test, so I went this way.

@@ -190,6 +191,7 @@ public void testExecutionExceptionOnScalingESThreadPoolExecutor() throws Interru
1,
10,
TimeUnit.SECONDS,
false,
Contributor

I think we can check both true and false here?

Suggested change
false,
randomBoolean(),

Member Author

Sure, I changed this in ad729c9

for (int i = 0; i < queuedAfterShutdown; i++) {
execute(scalingExecutor, () -> {}, executed, rejected, failed);
}
assertThat(scalingExecutor.getQueue().size(), rejectAfterShutdown ? equalTo(queued) : equalTo(queued + queuedAfterShutdown));
Contributor

Can we also validate that rejected is 0 or queuedAfterShutdown, depending on rejectAfterShutdown, here?

Member Author

Sure, I changed this in ad729c9
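A sketch of the extra assertion being asked for, assuming rejected is an AtomicLong-style counter updated by the execute(...) helper (names are taken from the surrounding test, exact types assumed):

// Sketch only; the actual assertion added in ad729c9 may be written differently.
assertThat(rejected.get(), equalTo(rejectAfterShutdown ? (long) queuedAfterShutdown : 0L));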


block.countDown();

assertBusy(() -> assertTrue(scalingExecutor.isTerminated()));
Contributor

Should we also verify that new tasks added after termination are rejected?

Member Author

++, changed in ad729c9

@tlrx tlrx requested a review from henningandersen January 11, 2022 07:57
@tlrx
Member Author

tlrx commented Jan 11, 2022

Sorry for the delay @henningandersen, the race condition took me some time to fix. Can you please have another look? Thanks

Contributor

@henningandersen henningandersen left a comment

LGTM. I left a few smaller comments, but no need for another round.

assert executor.getQueue() instanceof ExecutorScalingQueue;
executor.getQueue().put(r);
assert queue instanceof ExecutorScalingQueue;
queue.put(task);
Contributor

I wonder if we should check if it is shutdown prior to adding to the queue (in addition to the check after adding it)? That would avoid the risk of the task being picked up during shutdown and only leave this "risk" for concurrent races, where it would be perfectly OK to run the task.

Member Author

Makes sense, I changed that in 3a4e5f4
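A sketch of the pre-check being agreed on here, combined with the post-put re-check shown earlier (fragment only; helper names follow the earlier snippets and the actual commit 3a4e5f4 may differ):

// Reject up front when the pool is already shut down, so the task never becomes
// visible to workers that are still draining the queue.
if (rejectAfterShutdown && executor.isShutdown()) {
    incrementRejections();
    throw newRejectedException(task, executor, true);
}
queue.put(task);
// The re-check after put() still handles the rare concurrent shutdown race.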

Comment on lines 343 to 344
if (rejectAfterShutdown) {
if (executor.isShutdown() && executor.remove(task)) {
Contributor

nit: I would find this slightly more logical as:

Suggested change
if (rejectAfterShutdown) {
if (executor.isShutdown() && executor.remove(task)) {
if (rejectAfterShutdown && executor.isShutdown()) {
if (executor.remove(task)) {

since that seems to be the special case. With remove being mutating, I like having it in its own condition so I do not have to think about and/or and order of evaluation, but it could also be collapsed into one if statement.

Member Author

This is more logical I agree. (I collapsed the statements)

}

protected final EsRejectedExecutionException newRejectedException(Runnable r, ThreadPoolExecutor executor, boolean isExecutorShutdown) {
return new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, isExecutorShutdown);
Contributor

I wonder if we should add info to the exception message about the executor being shut down when isExecutorShutdown=true? Otherwise, I think logging the exception will not show the flag.

Member Author

I added (shutdown) in the message for that.
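Based on the two comments above, the resulting message construction looks roughly like this (a sketch, not the exact diff):

protected final EsRejectedExecutionException newRejectedException(Runnable r, ThreadPoolExecutor executor, boolean isExecutorShutdown) {
    // Append "(shutdown)" so a logged rejection shows why it happened.
    final String message = "rejected execution of " + r + " on " + executor + (isExecutorShutdown ? " (shutdown)" : "");
    return new EsRejectedExecutionException(message, isExecutorShutdown);
}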


final Matcher<Long> executionsMatcher = rejectAfterShutdown
? equalTo((long) max + queued)
: greaterThanOrEqualTo((long) max + queued);
Contributor

Can we also check that it is lessThanOrEqualTo(max + queued + queuedAfterShutdown)?

Member Author

Sure

}

assertBusy(() -> assertTrue(scalingExecutor.isTerminated()));
assertThat(scalingExecutor.getCompletedTaskCount(), greaterThanOrEqualTo((long) max));
Contributor

Also here, can we check lessThanOrEqualTo(max + barrier.getParties() - 1)?

Member Author

Sure

assertThat(scalingExecutor.getCompletedTaskCount(), greaterThanOrEqualTo((long) max));
assertThat(scalingExecutor.getQueue().size(), equalTo(0));
assertThat(scalingExecutor.getActiveCount(), equalTo(0));

Contributor

Can we check that scalingExecutor.getCompletedTaskCount() + rejected.get() == max + barrier.getParties() - 1? To ensure every request is accounted for exactly once.

Member Author

Sure
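A sketch of the accounting check requested above (same assumptions about rejected as earlier; barrier is the CyclicBarrier-style coordinator used by the test):

// Every submitted task should be accounted for exactly once: either completed or rejected.
assertThat(
    scalingExecutor.getCompletedTaskCount() + rejected.get(),
    equalTo((long) max + barrier.getParties() - 1)
);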

@tlrx tlrx added the auto-backport-and-merge and auto-merge-without-approval (Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!)) labels Jan 24, 2022
@elasticsearchmachine elasticsearchmachine merged commit 24e1888 into elastic:master Jan 24, 2022
tlrx added a commit to tlrx/elasticsearch that referenced this pull request Jan 24, 2022
@elasticsearchmachine
Collaborator

💔 Backport failed

Status Branch Result
8.0
7.17 Commit could not be cherrypicked due to conflicts

You can use sqren/backport to manually backport by running `backport --upstream elastic/elasticsearch --pr 81856`

tlrx added a commit to tlrx/elasticsearch that referenced this pull request Jan 24, 2022
elasticsearchmachine pushed a commit that referenced this pull request Jan 24, 2022
tlrx added a commit that referenced this pull request Jan 24, 2022
Backport of #81856
Labels
auto-merge-without-approval (Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!))
:Core/Infra/Core (Core issues without another label)
>enhancement
Team:Core/Infra (Meta label for core/infra team)
v7.17.0
v8.0.0
v8.1.0