Enhance Worker Coordinator Async Profiler #710

Draft
wants to merge 40 commits into
base: main
7708d5b
Add debuggers
IanHoang Nov 15, 2024
b173589
Add more debug statements
IanHoang Nov 19, 2024
e047385
Add logger statements to track path of vectorsearch workload
IanHoang Nov 19, 2024
8642079
Add extra debugging statement
IanHoang Nov 19, 2024
c21b420
Add request Cache
IanHoang Nov 19, 2024
63de8c2
Fix cache
IanHoang Nov 19, 2024
f47d91e
Add info to see if we can see request ids
IanHoang Nov 20, 2024
234b514
Block out request cache
IanHoang Nov 20, 2024
334b6d2
Move logging initilialization up
IanHoang Nov 20, 2024
495a3fc
Add headers logging statement
IanHoang Nov 20, 2024
756916e
Add headers
IanHoang Nov 20, 2024
89fb8d9
Add fix
IanHoang Nov 20, 2024
3e184a2
Add request id before and after sending request
IanHoang Nov 20, 2024
345e30d
Change to uuid4
IanHoang Nov 20, 2024
4125111
fix
IanHoang Nov 20, 2024
c29ff67
fix
IanHoang Nov 20, 2024
9e0aaa5
Add more logging statements for request id
IanHoang Nov 20, 2024
512d052
Add more logging statements for request id
IanHoang Nov 20, 2024
e097528
Add more statements
IanHoang Nov 20, 2024
7b0bfee
Add repsonse request id
IanHoang Nov 20, 2024
80d2adb
Add client id and task to profiler
IanHoang Nov 21, 2024
839ac6a
Refine the profiler
IanHoang Nov 21, 2024
621ecd0
Add type check
IanHoang Nov 21, 2024
6451068
Add extra lines
IanHoang Nov 21, 2024
0c6b568
Add parametrs in CLI for updating AsyncProfiler
IanHoang Nov 22, 2024
a297e83
Add params comments to AsyncProfiler
IanHoang Nov 22, 2024
5c276fb
Improve error handling
IanHoang Nov 22, 2024
1d61375
Update error handling
IanHoang Nov 22, 2024
6ba7113
Add logging to find when workload plugins are loaded
IanHoang Nov 22, 2024
943e8aa
Update logging statement
IanHoang Nov 22, 2024
1f07e7f
Remove request_id param and also all unnecessary logging statements
IanHoang Dec 10, 2024
06f1c77
Remove request_id that was added for debugging purposes
IanHoang Dec 10, 2024
5521e4a
Resolve conflict
IanHoang Dec 10, 2024
51daee8
Merge branch 'main' into add-worker-coordinator-profiler
IanHoang Dec 10, 2024
b7511c3
Address pylint errors
IanHoang Dec 10, 2024
3a8d1ae
Revise help text for --profiling-sort-type
IanHoang Dec 10, 2024
e668c98
Remove extra new lines and revise help text
IanHoang Dec 11, 2024
7268568
Add back spaces
IanHoang Dec 11, 2024
823f14d
Removed redundant code
IanHoang Dec 11, 2024
d92842d
Add all sort types available in library and remove additional white s…
IanHoang Dec 11, 2024
10 changes: 9 additions & 1 deletion osbenchmark/benchmark.py
@@ -38,6 +38,7 @@
test_execution_orchestrator, results_publisher, \
metrics, workload, exceptions, log
from osbenchmark.builder import provision_config, builder
from osbenchmark.worker_coordinator import worker_coordinator
from osbenchmark.workload_generator import workload_generator
from osbenchmark.utils import io, convert, process, console, net, opts, versions
from osbenchmark import aggregator
@@ -581,9 +582,15 @@ def add_workload_source(subparser):
action="store_true")
test_execution_parser.add_argument(
"--enable-worker-coordinator-profiling",
help="Enables a profiler for analyzing the performance of calls in Benchmark's worker coordinator (default: false).",
help="Enables a profiler for analyzing the performance of calls in OSB's worker coordinator (default: false). "
"Outputs to ~/.benchmark/logs/ as profile.log",
default=False,
action="store_true")
test_execution_parser.add_argument(
"--profiling-sort-type",
help=f"Sort profile.log by sort types (column names). Only applies if --enable-worker-coordinator-profiling is provided. "
f"Available sort types: {worker_coordinator.AsyncProfiler.SORT_TYPES}. Default is None.",
default=None)
Comment on lines +592 to +593
Collaborator

May help to add a check to warn if this argument is used but profiling is not enabled.
Also, does it make sense to provide an option for sort order (asc/desc)?
Finally, it may be useful to be able to specify only specific tasks for profiling.

Collaborator Author

@IanHoang IanHoang Jan 7, 2025

> May help to add a check to warn if this argument is used but profiling is not enabled.

Rishabh made a callout earlier in this PR that suggests moving the responsibility for checking whether the sort type exists to argparse. This is doable, but it requires adding more classes and leveraging the action field in the arguments, which adds more complexity to benchmark.py.

> does it make sense to provide an option for sort order (asc/desc)?

I initially wanted to minimize the number of arguments for AsyncProfiler since it exists in the execute-test parser, which already has a lot of arguments. I could still look to incorporate it.

> Finally, it may be useful to be able to specify only specific tasks for profiling.

This is a fair callout but, similar to the question above, I'm concerned about the number of arguments in execute-test and wonder whether users will be able to spot that these are for the AsyncProfiler. Still, both of these are arguments that users would actually want. I can keep this as a draft for now and see if there are other ways we can achieve this.
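
The first suggestion in this thread (warning when the sort flag is given without profiling enabled) could be handled with a plain check after parsing, without custom argparse actions. The following is a minimal sketch under stated assumptions: `build_parser` and `warn_if_sort_without_profiling` are hypothetical names that do not exist in OSB, and the parser is a standalone stand-in for the execute-test subparser. Only the two flag names are taken from this PR.

```python
import argparse
import logging

logger = logging.getLogger(__name__)


def build_parser():
    # Hypothetical stand-in for the execute-test subparser; only the flag names match the PR.
    parser = argparse.ArgumentParser(prog="execute-test-sketch")
    parser.add_argument("--enable-worker-coordinator-profiling", default=False, action="store_true")
    parser.add_argument("--profiling-sort-type", default=None)
    return parser


def warn_if_sort_without_profiling(args):
    """Log a warning and return True if a sort type was given while profiling is disabled."""
    if args.profiling_sort_type is not None and not args.enable_worker_coordinator_profiling:
        logger.warning(
            "--profiling-sort-type=%s is ignored because "
            "--enable-worker-coordinator-profiling was not provided.",
            args.profiling_sort_type)
        return True
    return False
```

A check like this could run once after argument parsing, so it would not add another command-line argument to execute-test.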

test_execution_parser.add_argument(
"--enable-assertions",
help="Enables assertion checks for tasks (default: false).",
@@ -912,6 +919,7 @@ def configure_test(arg_parser, args, cfg):
cfg.add(config.Scope.applicationOverride, "test_execution", "pipeline", args.pipeline)
cfg.add(config.Scope.applicationOverride, "test_execution", "user.tag", args.user_tag)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling_sort_type", args.profiling_sort_type)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error)
cfg.add(
1 change: 1 addition & 0 deletions osbenchmark/worker_coordinator/runner.py
@@ -1064,6 +1064,7 @@ class Query(Runner):
def __init__(self):
super().__init__()
self._extractor = SearchAfterExtractor()
self.logger = logging.getLogger(__name__)

async def __call__(self, opensearch, params):
request_params, headers = self._transport_request_params(params)
40 changes: 35 additions & 5 deletions osbenchmark/worker_coordinator/worker_coordinator.py
@@ -465,6 +465,7 @@ def receiveMsg_PrepareWorkload(self, msg, sender):
# the workload might have been loaded on a different machine (the coordinator machine) so we force a workload
# update to ensure we use the latest version of plugins.
load_workload(self.cfg)
self.logger.info("Preparing plugins, param sources, runners, and components for workload now")
load_workload_plugins(self.cfg, self.workload.name, register_workload_processor=tpr.register_workload_processor,
force_update=True)
# we expect on_prepare_workload can take a long time. seed a queue of tasks and delegate to child workers
@@ -1019,6 +1020,7 @@ def calculate_worker_assignments(host_configs, client_count):
class ClientAllocations:
def __init__(self):
self.allocations = []
self.logger = logging.getLogger(__name__)

def add(self, client_id, tasks):
self.allocations.append({
@@ -1467,6 +1469,7 @@ def __init__(self, cfg, workload, task_allocations, sampler, cancel, complete, a
self.complete = complete
self.abort_on_error = abort_on_error
self.profiling_enabled = self.cfg.opts("worker_coordinator", "profiling")
self.profiling_sort_type = self.cfg.opts("worker_coordinator", "profiling_sort_type")
self.assertions_enabled = self.cfg.opts("worker_coordinator", "assertions")
self.debug_event_loop = self.cfg.opts("system", "async.debug", mandatory=False, default_value=False)
self.logger = logging.getLogger(__name__)
@@ -1526,7 +1529,9 @@ def os_clients(all_hosts, all_client_options):
async_executor = AsyncExecutor(
client_id, task, schedule, opensearch, self.sampler, self.cancel, self.complete,
task.error_behavior(self.abort_on_error), self.cfg)
final_executor = AsyncProfiler(async_executor) if self.profiling_enabled else async_executor
final_executor = AsyncProfiler(
async_executor, client_id,
task, self.profiling_sort_type) if self.profiling_enabled else async_executor
aws.append(final_executor())
run_start = time.perf_counter()
try:
@@ -1544,12 +1549,21 @@ def os_clients(all_hosts, all_client_options):


class AsyncProfiler:
def __init__(self, target):
SORT_TYPES = ["ncall", "tsub", "ttot", "tavg"]

def __init__(self, target, client_id, task, sort_type):
"""
:param target: The actual executor which should be profiled.
:param client_id: The client that is being profiled.
:param task: The task in the workload that is being profiled.
:param sort_type: If not None, the column to sort profiled results on. If None, output is not sorted
"""
self.target = target
self.client_id = client_id
self.task = task
self.sort_type = sort_type
self.profile_logger = logging.getLogger("benchmark.profile")
self.logger = logging.getLogger(__name__)

async def __call__(self, *args, **kwargs):
# initialize lazily, we don't need it in the majority of cases
@@ -1562,17 +1576,32 @@ async def __call__(self, *args, **kwargs):
finally:
yappi.stop()
s = python_io.StringIO()
yappi.get_func_stats().print_all(out=s, columns={

stats = yappi.get_func_stats()

if self.sort_type:
Collaborator

Enforce this check while accepting the params.
e.g.

parser.add_argument(
    '--mode',
    type=str,
    choices=['normal', 'advanced', 'expert'],  # List of valid options
    required=True,  # Make this argument mandatory
    help="Choose a mode: 'normal', 'advanced', or 'expert'"
)

This way argparse will throw an exception.

Collaborator Author

@IanHoang IanHoang Jan 7, 2025

This would work if --enable-worker-coordinator-profiling were required in execute-test, but it's currently optional. --profiling-sort-type is not needed unless the user wants to specify a column for the Yappi library to sort on.

Looking at the argparse documentation, we'll need to create additional classes that inherit from argparse.Action and add some if statements in benchmark.py to make argparse throw the exception. That requires more new code than this if statement within the AsyncProfiler. @rishabh6788 Let me know if you have other thoughts on this.

If we find an approach with argparse that we like and that doesn't add too much to the existing benchmark.py, I can open a separate PR to address this so that there are fewer changes per PR.

Collaborator

Even when you set a default value for the case where the flag is not provided?
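
For reference on this exchange: in standard argparse, `choices` is validated only for values actually passed on the command line, and a default is not checked against it. The sketch below illustrates that behavior with a standalone parser. The parser and the helper `parse_rejects` are illustrative assumptions, not OSB code; the list of sort types mirrors `AsyncProfiler.SORT_TYPES` from this PR.

```python
import argparse

# Mirrors AsyncProfiler.SORT_TYPES from this PR.
SORT_TYPES = ["ncall", "tsub", "ttot", "tavg"]

# Standalone parser for illustration only; both flags stay optional.
parser = argparse.ArgumentParser(prog="choices-sketch")
parser.add_argument("--enable-worker-coordinator-profiling", default=False, action="store_true")
parser.add_argument("--profiling-sort-type", choices=SORT_TYPES, default=None)

# Flag omitted: the default (None) is kept and is not validated against choices.
args_default = parser.parse_args([])

# Flag supplied with a valid value: accepted as-is.
args_sorted = parser.parse_args(["--profiling-sort-type", "ttot"])


def parse_rejects(argv):
    """Return True if argparse exits with an error for the given argument list."""
    try:
        parser.parse_args(argv)
    except SystemExit:
        return True
    return False
```

With this setup, an unknown value such as `--profiling-sort-type bogus` is rejected at parse time, while omitting the flag leaves the value as `None`.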

if self.sort_type not in self.SORT_TYPES:
raise exceptions.SystemSetupError(
f"{self.sort_type} is an invalid sort type. "
f"Available sort types in Async Profiler are: {self.SORT_TYPES}"
)

self.logger.info("Using Async Profiler with sort type: %s", self.sort_type)
stats.sort(sort_type=self.sort_type, sort_order='desc')
else:
self.logger.info("Using Async Profiler without sort type")

stats.print_all(out=s, columns={
0: ("name", 140),
1: ("ncall", 8),
2: ("tsub", 8),
3: ("ttot", 8),
4: ("tavg", 8)
})

profile = "\n=== Profile START ===\n"
profile = f"\n=== Profile start for client id [{self.client_id}] and task [{self.task}] ===\n"
profile += s.getvalue()
profile += "=== Profile END ==="
profile += "\n=== Profile END ===\n"
self.profile_logger.info(profile)


@@ -1827,6 +1856,7 @@ class Allocator:

def __init__(self, schedule):
self.schedule = schedule
self.logger = logging.getLogger(__name__)

@property
def allocations(self):
3 changes: 2 additions & 1 deletion osbenchmark/workload/params.py
@@ -542,6 +542,7 @@ def __init__(self, workload, params, **kwargs):
class SearchParamSource(ParamSource):
def __init__(self, workload, params, **kwargs):
super().__init__(workload, params, **kwargs)
self.logger = logging.getLogger(__name__)
target_name = get_target(workload, params)
type_name = params.get("type")
if params.get("data-stream") and type_name:
@@ -1085,7 +1086,7 @@ def __init__(self, workloads, params, query_params, **kwargs):

self.filter_type = self.query_params.get(self.PARAMS_NAME_FILTER_TYPE)
self.filter_body = self.query_params.get(self.PARAMS_NAME_FILTER_BODY)

self.logger = logging.getLogger(__name__)

if self.PARAMS_NAME_FILTER in params:
self.query_params.update({
5 changes: 4 additions & 1 deletion tests/worker_coordinator/worker_coordinator_test.py
@@ -1680,7 +1680,10 @@ async def f(x):
await asyncio.sleep(x)
return x * 2

profiler = worker_coordinator.AsyncProfiler(f)
client_id = 2
task = "queries"
sort_type = None
Collaborator

One more test to verify any other sort type option?

Collaborator Author

I can, but this test is mainly for checking that the wrapper is called.

profiler = worker_coordinator.AsyncProfiler(f, client_id, task, sort_type)
start = time.perf_counter()
# this should take roughly 1 second and should return something
return_value = await profiler(1)