Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] implement throttling in indexer #55011

Merged
merged 16 commits into from
Apr 30, 2020

Conversation

hendrikmuhs
Copy link

@hendrikmuhs hendrikmuhs commented Apr 9, 2020

implement throttling in async-indexer used by rollup and transform. The added docs_per_second parameter is used to calculate a delay before the next
search request is send. With re-throttle its possible to change the parameter at
runtime, at stop its ensured that despite throttling the indexer stops in
reasonable time

relates #54862

This PR adds the basics to use throttling, usage/exposure of this feature is planned for separate PR's, that's why I label this as non-issue.

@hendrikmuhs hendrikmuhs added :StorageEngine/Rollup Turn fine-grained time-based data into coarser-grained data v8.0.0 :ml/Transform Transform v7.8.0 labels Apr 9, 2020
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-analytics-geo (:Analytics/Rollup)

@elasticmachine
Copy link
Collaborator

Pinging @elastic/ml-core (:ml/Transform)

Copy link
Member

@benwtrent benwtrent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the change. Test coverage is nice :D.

I am a bit concerned around synchronization with the scheduledNextSearch.

@hendrikmuhs hendrikmuhs force-pushed the transform-throttling-part1 branch from 5693925 to 7c55d35 Compare April 14, 2020 15:09
@hendrikmuhs hendrikmuhs requested a review from jimczi April 15, 2020 12:43
Copy link
Contributor

@jimczi jimczi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have mixed feelings about this change. On the one hand I understand why this could be useful but on the other hand I wonder if this is the right place/method to do it.
I wonder for instance if we could use the search option called max_concurrent_shard_requests to limit the impact on the cluster ? So instead of throttling the search requests entirely, this option could control the number of shard requests per node that can be executed concurrently.
That seems easier for users than setting a number of requests per second since this number will depend greatly on the number of unique docs per bucket.
If this is not enough I think we should at least ensure that the cancel of the scheduled search is handled properly. Without clear synchronization, I am not sure how we can ensure that another scheduled search is created while calling triggerThrottledSearchNow ?

@hendrikmuhs
Copy link
Author

I have mixed feelings about this change. On the one hand I understand why this could be useful but on the other hand I wonder if this is the right place/method to do it.
I wonder for instance if we could use the search option called max_concurrent_shard_requests to limit the impact on the cluster ? So instead of throttling the search requests entirely, this option could control the number of shard requests per node that can be executed concurrently.
That seems easier for users than setting a number of requests per second since this number will depend greatly on the number of unique docs per bucket.

I disagree with that, max_concurrent_shard_requests is a much more abstract concept and harder to understand for a user than requests_per_second. It requires deep knowledge of the system, including the number of configured shards, number of nodes etc. This information might not be available (due to enabled security). The default of 5 does not provide much room for tweaking.

I agree that requests_per_second is not ideal, it highly depends on the system, the shape of the data and the used aggregations. Note that requests_per_second is a concept from reindex, which suffers the same problem (but less problematic as its not using aggregations).

In transform you can retrieve the current requests per second using _stats, however it requires some calculation. It would be helpful to calculate it directly, so that a user can see it. A workflow could be:

  1. realize transform is causing to much load
  2. check _stats for the current requests_per_second
  3. Call _update on the transform and e.g. set 50% of the output of 2

There are some ideas for "smart throttling", 1 way could be to automate the 3 steps above. But this is not planned for the first version, a low hanging fruit we thought about: suggest a value for requests_per_second in the UI based on data of other transforms.

Speaking of alternatives, I think the dream solution is this: #37867 as it would implement load shaping instead of brute force throttling.

Another alternative that would be great from a usage perspective: budgeting the search call. Instead of using size, it would be nice to specify ops or time_spent, a search would than return whatever it got within this budget (somewhat similar to terminate_after). For indexing this must still return correct results(complete buckets), in _preview - where we face some performance issues, too - we only need approximate results.

There are definitely better solutions than requests per second, but for now we have to use what's available.

If this is not enough I think we should at least ensure that the cancel of the scheduled search is handled properly. Without clear synchronization, I am not sure how we can ensure that another scheduled search is created while calling triggerThrottledSearchNow ?

You mean https://github.com/elastic/elasticsearch/pull/55011/files#diff-01a4ee37232ad8becca8990702ebb4f0R489?

The answer is: we do not ensure. This works optimistic (to avoid overhead) and to the best of my knowledge should work:

Assume thread A runs the indexer and a request for stop comes in from thread B, its possible that although thread B sets the state to STOPPING, thread A schedules a search. That's why I added 7c55d35, which looks a bit strange but handles the described problem. The underlying java futures are thread-safe afaik, so concurrent access to it, shouldn't be a problem.

Note that there is always only 1 thread A, but there can be multiple thread B's.

@hendrikmuhs hendrikmuhs force-pushed the transform-throttling-part1 branch from 1a4c37f to 35a9b74 Compare April 21, 2020 09:47
@hendrikmuhs
Copy link
Author

I implemented some improvements after offline discussion:

  • state is checked more often, that way a query is not send if the user stopped the jobs
  • the waiting time is exposed to the transform/rollup
    • this allows further optimization like running with a lower page size, batched_reduce_size, max_concurrent_shard_requests and other options, this will be done in separate/enabling PR's (rollup/transform specific)

Copy link
Member

@benwtrent benwtrent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some concurrency concerns. I THINK cancel will simply return true if the thread is already executing, which might cause issues.

@@ -178,6 +208,35 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
}
}

/**
* Cancels a scheduled search request and issues the search request immediately
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Cancels a scheduled search request and issues the search request immediately
* Cancels a scheduled search request

* Cancels a scheduled search request and issues the search request immediately
*/
private synchronized void stopThrottledSearch() {
if (scheduledNextSearch != null && scheduledNextSearch.cancel()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand around cancel, the only times it will return false are:

  • If the action has already been completed
  • If the action has already been cancelled.

This means it will return true if the thread is executing.

threadPool.executor(executorName).execute(() -> checkState(getState()));

Could happen in the middle of triggerNextSearch. This MIGHT be ok, but the call to checkState(getState()) might transition from STOPPING -> STOPPED while a search is still in flight. I am not sure this is intended behavior.

@@ -461,4 +562,37 @@ private boolean checkState(IndexerState currentState) {
}
}

private synchronized void reQueueThrottledSearch() {
if (scheduledNextSearch != null && scheduledNextSearch.cancel()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment to above, a current search could be inflight. This means that if the next delay is 0L, we could have two searches occurring in parallel.

As long as this method is NEVER called out of band, I think this might be ok.

@hendrikmuhs
Copy link
Author

I have some concurrency concerns. I THINK cancel will simply return true if the thread is already executing, which might cause issues.

Do you have a link? My source (code) says:

This attempt will fail if the task has already completed, has already been cancelled or could not be cancelled for some other reason.
...
@return {@code false} if the task could not be cancelled,
typically because it has already completed normally;
{@code true} otherwise

(same as https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html#cancel(boolean), we call it with false)

So my understanding is that cancel will return false if it could not stop it, e.g. if its running or has run.

@benwtrent
Copy link
Member

benwtrent commented Apr 21, 2020

@hendrikmuhs here is nothing in the JavaDOC that gives guarantees around if the thread is already executing and we are not interrupting.

@return {@code false} if the task could not be cancelled,
typically because it has already completed normally;
{@code true} otherwise

This seems to indicate to me that IF the action is already running, it will NOT be interrupted and cancel will return true.

This seems to be validated here (unless my code is wrong): https://repl.it/@ben_w_trent/WhenIsCancelTrue

@hendrikmuhs
Copy link
Author

Your code obviously proves it. However I think I am not the only one, thinking that the documentation sucks, e.g. https://bugs.openjdk.java.net/browse/JDK-8022624. SO has some posts, too.

@hendrikmuhs
Copy link
Author

I addressed the test problems in the separate PR #55666, to avoid making this one more complicated.

Copy link
Member

@benwtrent benwtrent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems OK, now that rescheduling won't kick off another search request if one is in flight.

Would be good to get another approval before merge.

hendrikmuhs pushed a commit that referenced this pull request Apr 24, 2020
improve tests related to stopping using a client that answers and can be
synchronized with the test thread in order to test special situations

relates #55011
hendrikmuhs pushed a commit that referenced this pull request Apr 24, 2020
improve tests related to stopping using a client that answers and can be
synchronized with the test thread in order to test special situations

relates #55011
@hendrikmuhs hendrikmuhs force-pushed the transform-throttling-part1 branch from 9a247d1 to df0b527 Compare April 24, 2020 06:59
Copy link
Contributor

@droberts195 droberts195 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw a couple of nits but I stopped reviewing when I saw the throttling formula because it seems to highlight a fundamental issue: are we intending to throttle based on search requests per second or documents retrieved per second? What changes are required will depend on the answer to that.

if (requestsPerSecond <= 0) {
return TimeValue.ZERO;
}
float timeToWaitNanos = (docCount / requestsPerSecond) * TimeUnit.SECONDS.toNanos(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This formula implies that requestsPerSecond is really desiredDocsPerSecond. If that's correct then requestsPerSecond seems like it will cause confusion in the future because I would assume requestsPerSecond referred to the number of searches, each of which could return many documents.

For example, if I saw a configuration parameter requests_per_second I might decide to set it to 2 so that I'd get a maximum of 2 search requests per second from this functionality. But then if one of my searches returns 1000 documents then I get a 500 second wait until the next search.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw a couple of nits but I stopped reviewing when I saw the throttling formula because it seems to highlight a fundamental issue: are we intending to throttle based on search requests per second or documents retrieved per second? What changes are required will depend on the answer to that.

This design and the reasons for that are discussed in the tracking issue: #54862. In a nutshell: You are right that requests_per_second is misleading. The name has still been chosen, because its a existing concept from reindex. It's wrong there, too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I missed that requests_per_second is used in this way for reindex already. In that case consistency with reindex is more important.

@hendrikmuhs
Copy link
Author

run elasticsearch-ci/2

@hendrikmuhs
Copy link
Author

status update: We are evaluating/discussing about renaming requests_per_second, possible candidate: documents_per_second

@hendrikmuhs hendrikmuhs force-pushed the transform-throttling-part1 branch from b712882 to 20125a3 Compare April 28, 2020 19:19
@hendrikmuhs
Copy link
Author

The setting got renamed to docs_per_second. This PR is ready from my side.

@droberts195 @jimczi would be nice to get your input.

Copy link
Contributor

@droberts195 droberts195 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The setting got renamed to docs_per_second.

I think that is much better as it now reflects how the throttling is done. It would be nice if reindex v2 used the same setting for consistency between similar features.

LGTM

Copy link
Contributor

@jimczi jimczi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM2

@hendrikmuhs hendrikmuhs changed the title [Transform] implement throttling in indexer, to be used in transform [Transform] implement throttling in indexer Apr 30, 2020
@hendrikmuhs hendrikmuhs merged commit 72a43dd into elastic:master Apr 30, 2020
@hendrikmuhs hendrikmuhs deleted the transform-throttling-part1 branch April 30, 2020 05:07
hendrikmuhs pushed a commit to hendrikmuhs/elasticsearch that referenced this pull request Apr 30, 2020
implement throttling in async-indexer used by rollup and transform. The added
docs_per_second parameter is used to calculate a delay before the next
search request is send. With re-throttle its possible to change the parameter
at runtime. When stopping a running job, its ensured that despite throttling
the indexer stops in reasonable time. This change contains the groundwork, but
does not expose the new functionality.

relates elastic#54862
# Conflicts:
#	x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java
#	x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java
#	x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java
#	x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java
hendrikmuhs pushed a commit that referenced this pull request Apr 30, 2020
implement throttling in async-indexer used by rollup and transform. The added
docs_per_second parameter is used to calculate a delay before the next
search request is send. With re-throttle its possible to change the parameter
at runtime. When stopping a running job, its ensured that despite throttling
the indexer stops in reasonable time. This change contains the groundwork, but
does not expose the new functionality.

relates #54862
backport: #55011
@hendrikmuhs hendrikmuhs mentioned this pull request Apr 30, 2020
5 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:ml/Transform Transform >non-issue :StorageEngine/Rollup Turn fine-grained time-based data into coarser-grained data v7.8.0 v8.0.0-alpha1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants