Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Concurrent Segment Search]: Design/Prototype changes for TaskResourceTrackingFramework and SearchBackpressure with concurrent search #7425

Closed
sohami opened this issue May 4, 2023 · 8 comments · Fixed by #7673 or #7502
Assignees
Labels
distributed framework enhancement Enhancement or improvement to existing feature or request v2.9.0 'Issues and PRs related to version v2.9.0' v3.0.0 Issues and PRs related to version 3.0.0

Comments

@sohami
Copy link
Collaborator

sohami commented May 4, 2023

There is TaskResourceTrackingService and SearchBackpressureService which works based on the resource consumption by each threads associated with a task. With concurrent search there will be multiple threads working on a ShardSearchRequest and from different threadpools. This issue is to figure out design and changes needed to:
i) Correctly track the resource consumption with concurrent search model
ii) Changes needed for SearchBackPressureService

@sohami sohami added enhancement Enhancement or improvement to existing feature or request untriaged labels May 4, 2023
@jed326
Copy link
Collaborator

jed326 commented May 4, 2023

I'm taking a look at this, will share a design for the changes needed soon.

@anasalkouz anasalkouz moved this to In Progress in Concurrent Search May 4, 2023
@jed326
Copy link
Collaborator

jed326 commented May 5, 2023

Background

With the current implementation of concurrent search and the task resource framework tracking components, only the main thread resources are tracked during concurrent search. In order to implement a search back-pressure implementation for concurrent search we need to make the task resource tracking framework properly track resource usage in the concurrent search threads as well.

Figure 1. Lifecycle of a search task with concurrent search

Concurrent Search Lifecycle drawio

TaskAwareRunnable

The existing resource tracking framework depends on the TaskAwareRunnable type of Runnable to start and stop tracking of tasks. This Runnable type first checks the thread context to see if a task id is present. If a task ID is present then it will register the thread ID with that task ID in TaskResourceTrackingService. This basically makes TaskResourceTrackingService aware of the threads associated with each task and can get the updated thread resource usage stats on demand later.
The task ID is set in the thread context via TaskManager, and a more detailed discussion of that can be found in #2819. From Figure 1, we see that the task ID is actually set in the transport layer before the task is forked to the search threadpool. This works because all OpenSearchThreadPoolExectors wrap Runnables as a ContextPreservingAbstractRunnable, which will save the current thread context as a field of the Runnable and is restored to the executing thread upon execution.

Proposed Solutions

There are 3 categories of changes we need to examine. [1] Modify the INDEX_SEARCHER threadpool to support the TaskAwareRunnable, [2] Update the Tasks API response with more relevant stats, [3] Validate that existing resource stats consumers still work correctly with changes to [1] and [2].

1. Modify the INDEX_SEARCHER threadpool executor to support the TaskAwareRunnable

Currently the TaskAwareRunnable is only used as a wrapper for the QueueResizableOpenSearchThreadPoolExecutor, which is effectively coupling the Executor queue type with the Runnable type. This is the executor type that the search threadpool currently use and is how the search threadpool tracks the task resource stats.
For the INDEX_SEARCHER threadpool we are using a fixed queue size threadpool which currently doesn't support the TaskAwareRunnable. Looking forward to #7439, #7356, and other changes that we may want to make to support furthering configuring concurrency, I propose we change the INDEX_SEARCHER threadpool to the QueueResizableOpenSearchThreadPoolExecutor type, which will enable task resource tracking for us and allow us to easier modify the queue size in the future as needed.
Because the ThreadContext is automatically propagated to forked threads via the ContextPreservingAbstractRunnable mentioned earlier, we will not need to set the task id in the search thread context again when we fork the concurrent search task from the search threadpool to the INDEX_SEARCHER threadpool.

2. Update Tasks API response with more relevant multi-threaded stats

From the sample Tasks API response below, we see that today the reported resource stats consists only of a sum total of cpu and memory usage:

"MZtXw1AITRKbdaeaOd62uQ:3637426" : {
          "node" : "MZtXw1AITRKbdaeaOd62uQ",
          "id" : 3637426,
          "type" : "transport",
          "action" : "indices:data/read/search[phase/query]",
          "description" : "shardId[[logs-241998-med_restored_13][5]]",
          "start_time_in_millis" : 1682544578929,
          "running_time_in_nanos" : 1593585622,
          "cancellable" : true,
          "cancelled" : false,
          "parent_task_id" : "saZSqjexQgC-fT-tklLOcw:4469785",
          "headers" : { },
          "resource_stats" : {
            "total" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            }
          }
        }

This is fine for a single-threaded context, but in a multi-threaded context we will want more detailed stats beyond a simple sum. Today the resource_stats is represented by a TaskResourceStats object which is a map from stats type (ie "total") to a TaskResourceUsage object, which is consists of a cpu_time_in_nanos and memory_in_bytes field.
I propose adding the additional stats to support the multi-threaded use case: min, max, average, thread count. I have a few implementation methods in mind and am looking for some feedback on this:

1. Add thread count to resource_stats field

"MZtXw1AITRKbdaeaOd62uQ:3637426" : {

...
          "headers" : { },
          "resource_stats" : [
          {
            "thread_count": 1
          },
          {
            "total" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "min" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "max" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "average" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            }
          }]
        }

Pros:
This puts all of the resource stats related response into the same section of the response and we will not need to modify or add any exceptions for task-index-mapping.json.
Cons:
The way that resource_stats is implemented today expects only a TaskResourceStats object and will require some refactoring in order to accept the thread_count field.

2. Add thread count to Task Info

"MZtXw1AITRKbdaeaOd62uQ:3637426" : {

...
          "headers" : { },
          "thread_count": 1,
          "resource_stats" : {
            "total" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "min" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "max" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "average" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            }
          }
        }

Pros:
Will not need to refactor TaskResourceStats
Cons:
thread_count is not present in task-index-mapping.json so we would have breaking changes introducing that field unless we also exclude the thread count.

3. Provide per-thread stats in resource_stats

This way we would expose the thread id for each thread and the number of items in resource_stats would be the thread count.

"MZtXw1AITRKbdaeaOd62uQ:3637426" : {

...
          "headers" : { },
          "resource_stats" : [{
            "total" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "min" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "max" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "average" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "tid_1" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "tid_2" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "tid_3" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "tid_4" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            }
          }
        }

Pros:
Simple implementation and do not need to modify either the index mapping or the TaskResourceStats object
Cons:
Will make the Tasks API response very long and in cases where there is high concurrency can make the API response very long.

I propose implementing solution 1 as that keeps all of the relevant resource stats related information in the same field.

Validate other existing search stats consumers

The resourceStats of the Task object, which is what ultimately stores the resource stats for all the threads of a Task, is consumed in a few different places and we need to ensure that all the consumers still function correctly.

SearchShardTaskDetailsLogMessage

Log message class for search shard task information. No changes required here.

TopNSearchTasksLogger

The purpose of this class is to log high memory consuming search tasks, so it uses the sum total of memory consumption across all the threads to make that determination. This is fine as for concurrent segment search the total would be computed based on aggregate resource usage across all threads.

TransportListTasksAction

This gets the Task info in the Tasks API and what we’re primarily making the proposed changes for.

SearchBackpressureService

This service only calls TaskResourceTrackingService to refresh the stats. Task cancellation is handled by HeapUsageTracker and CpuUsageTracker. Both trackers are looking at task.getTotalResourceStats for the sum total of CPU and Memory usage of the task. This looks like it should function just fine in the multi-threaded search case without any changes, but at the very least none of the changes proposed above should affect the existing single-thread use cases so I will follow up on this consumer later as a part of the SearchBackPressure side of this issue.

@jed326
Copy link
Collaborator

jed326 commented May 5, 2023

tagging @sohami @yigithub @ketanv3 @dblock @reta for feedback. Thanks!

@sohami
Copy link
Collaborator Author

sohami commented May 8, 2023

@jed326 Thanks for the detailed proposal.

For 1, making INDEX_SEARCHER of type QueueResizableOpenSearchThreadPoolExecutor sounds reasonable to me. I will expect that once a search request is accepted for execution then it should not get rejected from the Index_Searcher threadpool. Also the thread count of search pool will limit number of tasks enqueued to the index_searcher threadpool. However, the concurrency per request can still drive the index_searcher enqueued tasks count. With different concurrency per search request, it will be good to dynamically resize this threadpool as well to meet the needs when system start seeing rejection on the index_searcher threadpool

For 2, related to Update Tasks API response, I like Option 1 which keeps all the information related to resource usage of a task in a single block. Option 3 seems verbose as you called out. If needed we can always add those information later as well. One thing to keep in mind is currently it gets serialized as an object instead of list. We should be able to preserve that to avoid clients from breaking. Seems like we can add a field in the TaskResourceStats class to achieve that which will keep track of thread count along with resourceUsage map.

"resource_stats" : {
            "total" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "min" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "max" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "average" : {
              "cpu_time_in_nanos" : 24721920,
              "memory_in_bytes" : 4441736
            },
            "thread_count": 1
          }

@jed326
Copy link
Collaborator

jed326 commented May 8, 2023

tagging @andrross for feedback too. Thanks!

@jed326
Copy link
Collaborator

jed326 commented May 15, 2023

Published first PR #7502 for part 1 of the proposed solutions which includes the threadpool change and related tests. Tagging @ketanv3 @andrross @sohami @ketanv3 @reta for feedback on both the design and the PR. Thanks!

@jed326
Copy link
Collaborator

jed326 commented May 17, 2023

I'm working on the API response PR still but in the meantime wanted to start some discussion on the search backpressure component.

Today in search backpressure we look at the total resource usage and we don't consider active vs inactive threads because with the single threaded model we don't expect any inactive threads. In this context "active thread" is from the perspective of the task, ie whether or not the threads that picked up the Runnable for the task are currently working on the Runnable or not.

In the concurrent search world this distinction may matter a bit more. For example, the same thread may work on multiple Runnables for the same task id. Bringing this back to search backpressure, this means that with the current model we are also considering the CPU and memory usage of threads that have already finished executing the Runnable they picked up.

From both the search backpressure and the stats API perspective, it seems like the question we are asking is "How much resources is this task consuming at the current point in time?". From the CPU time perspective it seems like we should not include non-active threads in our consideration because the thread is not consuming any more CPU once it is not active. Search backpressure does not currently consider the CPU usage anyways. From the memory perspective it seems difficult to determine if the heap usage for inactive threads has been released at that point in time so I think we could go either way here.

@ketanv3 since you worked on the search backpressure implementation do you have any thoughts on this?

@jed326
Copy link
Collaborator

jed326 commented May 24, 2023

We will need to backport #7502 to 2.x so that we aren't missing resource tracking whenever concurrent search is GA in 2.x. Tracking in #7743

@anasalkouz anasalkouz linked a pull request May 25, 2023 that will close this issue
6 tasks
@reta reta added v3.0.0 Issues and PRs related to version 3.0.0 v2.9.0 'Issues and PRs related to version v2.9.0' labels May 30, 2023
@reta reta closed this as completed in #7673 Jun 1, 2023
@github-project-automation github-project-automation bot moved this from In Progress to Done in Concurrent Search Jun 1, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
distributed framework enhancement Enhancement or improvement to existing feature or request v2.9.0 'Issues and PRs related to version v2.9.0' v3.0.0 Issues and PRs related to version 3.0.0
Projects
Status: Done
4 participants