From 5aacd81a435745d639cb84ad4aa1db957dccd7b9 Mon Sep 17 00:00:00 2001 From: saimedhi Date: Mon, 29 Jan 2024 12:46:03 -0800 Subject: [PATCH 1/6] Added client processing time metric Signed-off-by: saimedhi --- osbenchmark/client.py | 33 ++++++ osbenchmark/metrics.py | 6 +- osbenchmark/worker_coordinator/runner.py | 106 +++++++++++++++++- .../worker_coordinator/worker_coordinator.py | 22 +++- 4 files changed, 155 insertions(+), 12 deletions(-) diff --git a/osbenchmark/client.py b/osbenchmark/client.py index 7a93944cb..a35743d5e 100644 --- a/osbenchmark/client.py +++ b/osbenchmark/client.py @@ -56,16 +56,28 @@ def request_start(self): @property def request_end(self): return self.ctx["request_end"] + + @property + def client_request_start(self): + return self.ctx["client_request_start"] + + @property + def client_request_end(self): + return self.ctx["client_request_end"] async def __aexit__(self, exc_type, exc_val, exc_tb): # propagate earliest request start and most recent request end to parent request_start = self.request_start request_end = self.request_end + client_request_start = self.client_request_start + client_request_end = self.client_request_end self.ctx_holder.restore_context(self.token) # don't attempt to restore these values on the top-level context as they don't exist if self.token.old_value != contextvars.Token.MISSING: self.ctx_holder.update_request_start(request_start) self.ctx_holder.update_request_end(request_end) + self.ctx_holder.update_client_request_start(client_request_start) + self.ctx_holder.update_client_request_end(client_request_end) self.token = None return False @@ -101,6 +113,27 @@ def update_request_end(cls, new_request_end): meta = cls.request_context.get() meta["request_end"] = new_request_end + @classmethod + def update_client_request_start(cls, new_client_request_start): + meta = cls.request_context.get() + if "client_request_start" not in meta: + print("updated client_request_start", new_client_request_start) + 
meta["client_request_start"] = new_client_request_start + + @classmethod + def update_client_request_end(cls, new_client_request_end): + meta = cls.request_context.get() + print("updated client_request_end", new_client_request_end) + meta["client_request_end"] = new_client_request_end + + @classmethod + def on_client_request_start(cls): + cls.update_client_request_start(time.perf_counter()) + + @classmethod + def on_client_request_end(cls): + cls.update_client_request_end(time.perf_counter()) + @classmethod def on_request_start(cls): cls.update_request_start(time.perf_counter()) diff --git a/osbenchmark/metrics.py b/osbenchmark/metrics.py index e9a480288..70bb91a0d 100644 --- a/osbenchmark/metrics.py +++ b/osbenchmark/metrics.py @@ -1742,6 +1742,7 @@ def __call__(self): self.summary_stats("throughput", t, op_type), self.single_latency(t, op_type), self.single_latency(t, op_type, metric_name="service_time"), + self.single_latency(t, op_type, metric_name="client_processing_time"), self.single_latency(t, op_type, metric_name="processing_time"), error_rate, duration, @@ -1991,6 +1992,8 @@ def op_metrics(op_item, key, single_value=False): all_results.append(op_metrics(item, "latency")) if "service_time" in item: all_results.append(op_metrics(item, "service_time")) + if "client_processing_time" in item: + all_results.append(op_metrics(item, "client_processing_time")) if "processing_time" in item: all_results.append(op_metrics(item, "processing_time")) if "error_rate" in item: @@ -2035,13 +2038,14 @@ def op_metrics(op_item, key, single_value=False): def v(self, d, k, default=None): return d.get(k, default) if d else default - def add_op_metrics(self, task, operation, throughput, latency, service_time, processing_time, error_rate, duration, meta): + def add_op_metrics(self, task, operation, throughput, latency, service_time, client_processing_time, processing_time, error_rate, duration, meta): doc = { "task": task, "operation": operation, "throughput": throughput, 
"latency": latency, "service_time": service_time, + "client_processing_time": client_processing_time, "processing_time": processing_time, "error_rate": error_rate, "duration": duration diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 51ad3f735..34ea0053e 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -44,7 +44,7 @@ from osbenchmark import exceptions, workload from osbenchmark.utils import convert - +from osbenchmark.client import RequestContextHolder # Mapping from operation type to specific runner from osbenchmark.utils.parse import parse_int_parameter, parse_string_parameter @@ -211,6 +211,20 @@ def _transport_request_params(self, params): headers.update({"x-opaque-id": opaque_id}) return request_params, headers +request_context_holder = RequestContextHolder() + +def time_func(func): + async def advised(*args, **kwargs): + st = time.perf_counter() + print("started", st) + request_context_holder.on_client_request_start() + rsl = await func(*args, **kwargs) + request_context_holder.on_client_request_end() + en = time.perf_counter() + print("ended", en) + return rsl + return advised + class Delegator: """ @@ -489,14 +503,16 @@ async def __call__(self, opensearch, params): # errors have occurred we only need a small amount of information from the potentially large response. 
if not detailed_results: opensearch.return_raw_response() - + request_context_holder.on_client_request_start() + if with_action_metadata: api_kwargs.pop("index", None) # only half of the lines are documents response = await opensearch.bulk(params=bulk_params, **api_kwargs) else: response = await opensearch.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs) - + + request_context_holder.on_client_request_end() stats = self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response) meta_data = { @@ -642,9 +658,11 @@ async def __call__(self, opensearch, params): for attempt in range(retries): try: + request_context_holder.on_client_request_start() await opensearch.bulk( body=params["body"] ) + request_context_holder.on_client_request_end() return size, "docs" except ConnectionTimeout: @@ -673,7 +691,9 @@ async def __call__(self, opensearch, params): if mode == "polling": complete = False try: + request_context_holder.on_client_request_start() await opensearch.indices.forcemerge(**merge_params) + request_context_holder.on_client_request_end() complete = True except opensearchpy.ConnectionTimeout: pass @@ -684,7 +704,9 @@ async def __call__(self, opensearch, params): # empty nodes response indicates no tasks complete = True else: - await opensearch.indices.forcemerge(**merge_params) + request_context_holder.on_client_request_start() + await opensearch.indices.forcemerge(**merge_params) + request_context_holder.on_client_request_end() def __repr__(self, *args, **kwargs): return "force-merge" @@ -710,7 +732,9 @@ async def __call__(self, opensearch, params): api_kwargs = self._default_kw_params(params) index = api_kwargs.pop("index", "_all") condition = params.get("condition") + request_context_holder.on_client_request_start() response = await opensearch.indices.stats(index=index, metric="_all", **api_kwargs) + request_context_holder.on_client_request_end() if condition: path = mandatory(condition, "path", 
repr(self)) expected_value = mandatory(condition, "expected-value", repr(self)) @@ -743,6 +767,7 @@ class NodeStats(Runner): Gather node stats for all nodes. """ + @time_func async def __call__(self, opensearch, params): request_timeout = params.get("request-timeout") await opensearch.nodes.stats(metric="_all", request_timeout=request_timeout) @@ -970,10 +995,12 @@ async def _scroll_query(opensearch, params): took = props.get("took", 0) all_results_collected = (size is not None and hits < size) or hits == 0 else: + request_context_holder.on_client_request_start() r = await opensearch.transport.perform_request("GET", "/_search/scroll", body={"scroll_id": scroll_id, "scroll": "10s"}, params=request_params, headers=headers) + request_context_holder.on_client_request_end() props = parse(r, ["timed_out", "took"], ["hits.hits"]) timed_out = timed_out or props.get("timed_out", False) took += props.get("took", 0) @@ -1106,16 +1133,21 @@ def calculate_recall(predictions, neighbors, top_k): search_method = params.get("operation-type") if search_method == "paginated-search": + print("11") return await _search_after_query(opensearch, params) elif search_method == "scroll-search": + print("22") return await _scroll_query(opensearch, params) elif "pages" in params: + print("33") logging.getLogger(__name__).warning("Invoking a scroll search with the 'search' operation is deprecated " "and will be removed in a future release. 
Use 'scroll-search' instead.") return await _scroll_query(opensearch, params) elif search_method == "vector-search": + print("44") return await _vector_search_query_with_recall(opensearch, params) else: + print("55") return await _request_body_query(opensearch, params) async def _raw_search(self, opensearch, doc_type, index, body, params, headers=None): @@ -1126,7 +1158,10 @@ async def _raw_search(self, opensearch, doc_type, index, body, params, headers=N components.append(doc_type) components.append("_search") path = "/".join(components) - return await opensearch.transport.perform_request("GET", "/" + path, params=params, body=body, headers=headers) + request_context_holder.on_client_request_start() + response = await opensearch.transport.perform_request("GET", "/" + path, params=params, body=body, headers=headers) + request_context_holder.on_client_request_end() + return response def _query_headers(self, params): # reduces overhead due to decompression of very large responses @@ -1211,8 +1246,10 @@ def status(v): else: # we're good with any count of relocating shards. 
expected_relocating_shards = sys.maxsize - + + request_context_holder.on_client_request_start() result = await opensearch.cluster.health(**api_kw_params) + request_context_holder.on_client_request_end() cluster_status = result["status"] relocating_shards = result["relocating_shards"] @@ -1232,6 +1269,7 @@ def __repr__(self, *args, **kwargs): class PutPipeline(Runner): + @time_func async def __call__(self, opensearch, params): await opensearch.ingest.put_pipeline(id=mandatory(params, "id", self), body=mandatory(params, "body", self), @@ -1244,6 +1282,7 @@ def __repr__(self, *args, **kwargs): # TODO: refactor it after python client support search pipeline https://github.com/opensearch-project/opensearch-py/issues/474 class CreateSearchPipeline(Runner): + @time_func async def __call__(self, opensearch, params): endpoint = "/_search/pipeline/" + mandatory(params, "id", self) await opensearch.transport.perform_request(method="PUT", url=endpoint, body=mandatory(params, "body", self)) @@ -1252,6 +1291,7 @@ def __repr__(self, *args, **kwargs): return "create-search-pipeline" class Refresh(Runner): + @time_func async def __call__(self, opensearch, params): await opensearch.indices.refresh(index=params.get("index", "_all")) @@ -1261,13 +1301,16 @@ def __repr__(self, *args, **kwargs): class CreateIndex(Runner): async def __call__(self, opensearch, params): + print("CreateIndex __call__ printed") indices = mandatory(params, "indices", self) api_params = self._default_kw_params(params) ## ignore invalid entries rather than erroring for term in ["index", "body"]: api_params.pop(term, None) for index, body in indices: + request_context_holder.on_client_request_start() await opensearch.indices.create(index=index, body=body, **api_params) + request_context_holder.on_client_request_end() return { "weight": len(indices), "unit": "ops", @@ -1283,7 +1326,9 @@ async def __call__(self, opensearch, params): data_streams = mandatory(params, "data-streams", self) request_params = 
mandatory(params, "request-params", self) for data_stream in data_streams: + request_context_holder.on_client_request_start() await opensearch.indices.create_data_stream(data_stream, params=request_params) + request_context_holder.on_client_request_end() return { "weight": len(data_streams), "unit": "ops", @@ -1295,6 +1340,7 @@ def __repr__(self, *args, **kwargs): class DeleteIndex(Runner): + @time_func async def __call__(self, opensearch, params): ops = 0 @@ -1304,11 +1350,15 @@ async def __call__(self, opensearch, params): for index_name in indices: if not only_if_exists: + request_context_holder.on_client_request_start() await opensearch.indices.delete(index=index_name, params=request_params) + request_context_holder.on_client_request_end() ops += 1 elif only_if_exists and await opensearch.indices.exists(index=index_name): self.logger.info("Index [%s] already exists. Deleting it.", index_name) + request_context_holder.on_client_request_start() await opensearch.indices.delete(index=index_name, params=request_params) + request_context_holder.on_client_request_end() ops += 1 return { @@ -1331,11 +1381,15 @@ async def __call__(self, opensearch, params): for data_stream in data_streams: if not only_if_exists: + request_context_holder.on_client_request_start() await opensearch.indices.delete_data_stream(data_stream, ignore=[404], params=request_params) + request_context_holder.on_client_request_end() ops += 1 elif only_if_exists and await opensearch.indices.exists(index=data_stream): self.logger.info("Data stream [%s] already exists. 
Deleting it.", data_stream) + request_context_holder.on_client_request_start() await opensearch.indices.delete_data_stream(data_stream, params=request_params) + request_context_holder.on_client_request_end() ops += 1 return { @@ -1353,8 +1407,10 @@ async def __call__(self, opensearch, params): templates = mandatory(params, "templates", self) request_params = mandatory(params, "request-params", self) for template, body in templates: + request_context_holder.on_client_request_start() await opensearch.cluster.put_component_template(name=template, body=body, params=request_params) + request_context_holder.on_client_request_end() return { "weight": len(templates), "unit": "ops", @@ -1382,11 +1438,15 @@ async def _exists(name): ops_count = 0 for template_name in template_names: if not only_if_exists: + request_context_holder.on_client_request_start() await opensearch.cluster.delete_component_template(name=template_name, params=request_params, ignore=[404]) + request_context_holder.on_client_request_end() ops_count += 1 elif only_if_exists and await _exists(template_name): self.logger.info("Component Index template [%s] already exists. 
Deleting it.", template_name) + request_context_holder.on_client_request_start() await opensearch.cluster.delete_component_template(name=template_name, params=request_params) + request_context_holder.on_client_request_end() ops_count += 1 return { "weight": ops_count, @@ -1404,7 +1464,9 @@ async def __call__(self, opensearch, params): templates = mandatory(params, "templates", self) request_params = mandatory(params, "request-params", self) for template, body in templates: + request_context_holder.on_client_request_start() await opensearch.cluster.put_index_template(name=template, body=body, params=request_params) + request_context_holder.on_client_request_end() return { "weight": len(templates), @@ -1425,11 +1487,15 @@ async def __call__(self, opensearch, params): for template_name, delete_matching_indices, index_pattern in templates: if not only_if_exists: + request_context_holder.on_client_request_start() await opensearch.indices.delete_index_template(name=template_name, params=request_params, ignore=[404]) + request_context_holder.on_client_request_end() ops_count += 1 elif only_if_exists and await opensearch.indices.exists_template(template_name): self.logger.info("Composable Index template [%s] already exists. 
Deleting it.", template_name) + request_context_holder.on_client_request_start() await opensearch.indices.delete_index_template(name=template_name, params=request_params) + request_context_holder.on_client_request_end() ops_count += 1 # ensure that we do not provide an empty index pattern by accident if delete_matching_indices and index_pattern: @@ -1451,9 +1517,11 @@ async def __call__(self, opensearch, params): templates = mandatory(params, "templates", self) request_params = params.get("request-params", {}) for template, body in templates: + request_context_holder.on_client_request_start() await opensearch.indices.put_template(name=template, body=body, params=request_params) + request_context_holder.on_client_request_end() return { "weight": len(templates), "unit": "ops", @@ -1473,11 +1541,15 @@ async def __call__(self, opensearch, params): for template_name, delete_matching_indices, index_pattern in template_names: if not only_if_exists: + request_context_holder.on_client_request_start() await opensearch.indices.delete_template(name=template_name, params=request_params) + request_context_holder.on_client_request_end() ops_count += 1 elif only_if_exists and await opensearch.indices.exists_template(template_name): self.logger.info("Index template [%s] already exists. 
Deleting it.", template_name) + request_context_holder.on_client_request_start() await opensearch.indices.delete_template(name=template_name, params=request_params) + request_context_holder.on_client_request_end() ops_count += 1 # ensure that we do not provide an empty index pattern by accident if delete_matching_indices and index_pattern: @@ -1561,7 +1633,9 @@ async def __call__(self, opensearch, params): # kick off the shrink operation index_suffix = remove_prefix(source_index, source_indices_stem) final_target_index = target_index if len(index_suffix) == 0 else target_index+index_suffix + request_context_holder.on_client_request_start() await opensearch.indices.shrink(index=source_index, target=final_target_index, body=target_body) + request_context_holder.on_client_request_end() self.logger.info("Waiting for shrink to finish for index [%s] ...", source_index) await self._wait_for(opensearch, final_target_index, f"shrink for index [{final_target_index}]") @@ -1590,11 +1664,13 @@ async def __call__(self, opensearch, params): #counter-intuitive, but preserves prior behavior headers = None + request_context_holder.on_client_request_start() await opensearch.transport.perform_request(method=params.get("method", "GET"), url=path, headers=headers, body=params.get("body"), params=request_params) + request_context_holder.on_client_request_end() def __repr__(self, *args, **kwargs): return "raw-request" @@ -1620,6 +1696,7 @@ class DeleteSnapshotRepository(Runner): """ Deletes a snapshot repository """ + @time_func async def __call__(self, opensearch, params): await opensearch.snapshot.delete_repository(repository=mandatory(params, "repository", repr(self))) @@ -1631,6 +1708,7 @@ class CreateSnapshotRepository(Runner): """ Creates a new snapshot repository """ + @time_func async def __call__(self, opensearch, params): request_params = params.get("request-params", {}) await opensearch.snapshot.create_repository(repository=mandatory(params, "repository", repr(self)), @@ 
-1645,6 +1723,7 @@ class CreateSnapshot(Runner): """ Creates a new snapshot repository """ + @time_func async def __call__(self, opensearch, params): wait_for_completion = params.get("wait-for-completion", False) repository = mandatory(params, "repository", repr(self)) @@ -1711,6 +1790,7 @@ class RestoreSnapshot(Runner): """ Restores a snapshot from an already registered repository """ + @time_func async def __call__(self, opensearch, params): api_kwargs = self._default_kw_params(params) await opensearch.snapshot.restore(repository=mandatory(params, "repository", repr(self)), @@ -1736,7 +1816,9 @@ async def __call__(self, opensearch, params): # The nesting level is ok here given the structure of the API response # pylint: disable=too-many-nested-blocks while not all_shards_done: + request_context_holder.on_client_request_start() response = await opensearch.indices.recovery(index=index) + request_context_holder.on_client_request_end() # This might happen if we happen to call the API before the next recovery is scheduled. 
if not response: self.logger.debug("Empty index recovery response for [%s].", index) @@ -1776,6 +1858,7 @@ def __repr__(self, *args, **kwargs): class PutSettings(Runner): + @time_func async def __call__(self, opensearch, params): await opensearch.cluster.put_settings(body=mandatory(params, "body", repr(self))) @@ -1784,6 +1867,7 @@ def __repr__(self, *args, **kwargs): class CreateTransform(Runner): + @time_func async def __call__(self, opensearch, params): transform_id = mandatory(params, "transform-id", self) body = mandatory(params, "body", self) @@ -1795,6 +1879,7 @@ def __repr__(self, *args, **kwargs): class StartTransform(Runner): + @time_func async def __call__(self, opensearch, params): transform_id = mandatory(params, "transform-id", self) timeout = params.get("timeout") @@ -1920,6 +2005,7 @@ def __repr__(self, *args, **kwargs): class DeleteTransform(Runner): + @time_func async def __call__(self, opensearch, params): transform_id = mandatory(params, "transform-id", self) force = params.get("force", False) @@ -1931,6 +2017,7 @@ def __repr__(self, *args, **kwargs): class SubmitAsyncSearch(Runner): + @time_func async def __call__(self, opensearch, params): request_params = params.get("request-params", {}) response = await opensearch.async_search.submit(body=mandatory(params, "body", self), @@ -1962,8 +2049,10 @@ async def __call__(self, opensearch, params): request_params = params.get("request-params", {}) stats = {} for search_id, search in async_search_ids(searches): + request_context_holder.on_client_request_start() response = await opensearch.async_search.get(id=search_id, params=request_params) + request_context_holder.on_client_request_end() is_running = response["is_running"] success = success and not is_running if not is_running: @@ -1990,7 +2079,9 @@ class DeleteAsyncSearch(Runner): async def __call__(self, opensearch, params): searches = mandatory(params, "delete-results-for", self) for search_id, search in async_search_ids(searches): + 
request_context_holder.on_client_request_start() await opensearch.async_search.delete(id=search_id) + request_context_holder.on_client_request_end() CompositeContext.remove(search) def __repr__(self, *args, **kwargs): @@ -1998,6 +2089,7 @@ def __repr__(self, *args, **kwargs): class CreatePointInTime(Runner): + @time_func async def __call__(self, opensearch, params): op_name = mandatory(params, "name", self) index = mandatory(params, "index", self) @@ -2013,6 +2105,7 @@ def __repr__(self, *args, **kwargs): class DeletePointInTime(Runner): + @time_func async def __call__(self, opensearch, params): pit_op = params.get("with-point-in-time-from", None) request_params = params.get("request-params", {}) @@ -2031,6 +2124,7 @@ def __repr__(self, *args, **kwargs): class ListAllPointInTime(Runner): + @time_func async def __call__(self, opensearch, params): request_params = params.get("request-params", {}) await opensearch.list_all_point_in_time(params=request_params, headers=None) diff --git a/osbenchmark/worker_coordinator/worker_coordinator.py b/osbenchmark/worker_coordinator/worker_coordinator.py index bcfa96d31..b6f1d8a85 100644 --- a/osbenchmark/worker_coordinator/worker_coordinator.py +++ b/osbenchmark/worker_coordinator/worker_coordinator.py @@ -868,6 +868,12 @@ def __call__(self, raw_samples): sample_type=sample.sample_type, absolute_time=sample.absolute_time, relative_time=sample.relative_time, meta_data=meta_data) + self.metrics_store.put_value_cluster_level(name="client_processing_time", value=convert.seconds_to_ms(sample.client_processing_time), + unit="ms", task=sample.task.name, + operation=sample.operation_name, operation_type=sample.operation_type, + sample_type=sample.sample_type, absolute_time=sample.absolute_time, + relative_time=sample.relative_time, meta_data=meta_data) + self.metrics_store.put_value_cluster_level(name="processing_time", value=convert.seconds_to_ms(sample.processing_time), unit="ms", task=sample.task.name, operation=sample.operation_name, 
operation_type=sample.operation_type, @@ -1181,11 +1187,12 @@ def __init__(self, start_timestamp, buffer_size=16384): self.logger = logging.getLogger(__name__) def add(self, task, client_id, sample_type, meta_data, absolute_time, request_start, latency, service_time, - processing_time, throughput, ops, ops_unit, time_period, percent_completed, dependent_timing=None): + client_processing_time, processing_time, throughput, ops, ops_unit, time_period, percent_completed, + dependent_timing=None): try: self.q.put_nowait( Sample(client_id, absolute_time, request_start, self.start_timestamp, task, sample_type, meta_data, - latency, service_time, processing_time, throughput, ops, ops_unit, time_period, + latency, service_time, client_processing_time, processing_time, throughput, ops, ops_unit, time_period, percent_completed, dependent_timing)) except queue.Full: self.logger.warning("Dropping sample for [%s] due to a full sampling queue.", task.operation.name) @@ -1203,7 +1210,7 @@ def samples(self): class Sample: def __init__(self, client_id, absolute_time, request_start, task_start, task, sample_type, request_meta_data, latency, - service_time, processing_time, throughput, total_ops, total_ops_unit, time_period, + service_time, client_processing_time, processing_time, throughput, total_ops, total_ops_unit, time_period, percent_completed, dependent_timing=None, operation_name=None, operation_type=None): self.client_id = client_id self.absolute_time = absolute_time @@ -1214,6 +1221,7 @@ def __init__(self, client_id, absolute_time, request_start, task_start, task, sa self.request_meta_data = request_meta_data self.latency = latency self.service_time = service_time + self.client_processing_time = client_processing_time self.processing_time = processing_time self.throughput = throughput self.total_ops = total_ops @@ -1246,7 +1254,7 @@ def dependent_timings(self): if self._dependent_timing: for t in self._dependent_timing: yield Sample(self.client_id, t["absolute_time"], 
t["request_start"], self.task_start, self.task, - self.sample_type, self.request_meta_data, 0, t["service_time"], 0, 0, self.total_ops, + self.sample_type, self.request_meta_data, 0, t["service_time"], 0, 0, 0, self.total_ops, self.total_ops_unit, self.time_period, self.percent_completed, None, t["operation"], t["operation-type"]) @@ -1579,12 +1587,16 @@ async def __call__(self, *args, **kwargs): processing_start = time.perf_counter() self.schedule_handle.before_request(processing_start) async with self.opensearch["default"].new_request_context() as request_context: + print("runner", runner) total_ops, total_ops_unit, request_meta_data = await execute_single(runner, self.opensearch, params, self.on_error) request_start = request_context.request_start request_end = request_context.request_end + client_request_start = request_context.client_request_start + client_request_end = request_context.client_request_end processing_end = time.perf_counter() service_time = request_end - request_start + client_processing_time = (client_request_end - client_request_start) - service_time processing_time = processing_end - processing_start time_period = request_end - total_start self.schedule_handle.after_request(processing_end, total_ops, total_ops_unit, request_meta_data) @@ -1620,7 +1632,7 @@ async def __call__(self, *args, **kwargs): self.sampler.add(self.task, self.client_id, sample_type, request_meta_data, absolute_processing_start, request_start, - latency, service_time, processing_time, throughput, total_ops, total_ops_unit, + latency, service_time, client_processing_time, processing_time, throughput, total_ops, total_ops_unit, time_period, progress, request_meta_data.pop("dependent_timing", None)) if completed: From a5688a2c354556cb47eb1eedbeb1a87ec61891f4 Mon Sep 17 00:00:00 2001 From: saimedhi Date: Mon, 29 Jan 2024 14:40:56 -0800 Subject: [PATCH 2/6] Added client processing time metric Signed-off-by: saimedhi --- tests/worker_coordinator/runner_test.py | 501 
+++++++++++++----- .../worker_coordinator_test.py | 77 ++- 2 files changed, 424 insertions(+), 154 deletions(-) diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 01a2a3c3f..4e4f2dd3c 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -349,9 +349,11 @@ def test_list_length(self): class BulkIndexRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_bulk_index_missing_params(self, opensearch): + async def test_bulk_index_missing_params(self, opensearch, on_client_request_start, on_client_request_end): bulk_response = { "errors": False, "took": 8 @@ -375,9 +377,11 @@ async def test_bulk_index_missing_params(self, opensearch): "Parameter source for operation 'bulk-index' did not provide the mandatory parameter 'action-metadata-present'. 
" "Add it to your parameter source and try again.", ctx.exception.args[0]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_bulk_index_success_with_metadata(self, opensearch): + async def test_bulk_index_success_with_metadata(self, opensearch, on_client_request_start, on_client_request_end): bulk_response = { "errors": False, "took": 8 @@ -410,9 +414,11 @@ async def test_bulk_index_success_with_metadata(self, opensearch): opensearch.bulk.assert_called_with(body=bulk_params["body"], params={}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_simple_bulk_with_timeout_and_headers(self, opensearch): + async def test_simple_bulk_with_timeout_and_headers(self, opensearch, on_client_request_start, on_client_request_end): bulk_response = { "errors": False, "took": 8 @@ -452,9 +458,11 @@ async def test_simple_bulk_with_timeout_and_headers(self, opensearch): opaque_id="DESIRED-OPAQUE-ID", request_timeout=3.0) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_bulk_index_success_without_metadata_with_doc_type(self, opensearch): + async def test_bulk_index_success_without_metadata_with_doc_type(self, opensearch, on_client_request_start, on_client_request_end): bulk_response = { "errors": False, "took": 8 @@ -485,9 +493,11 @@ async def test_bulk_index_success_without_metadata_with_doc_type(self, opensearc opensearch.bulk.assert_called_with(body=bulk_params["body"], index="test-index", doc_type="_doc", params={}) + 
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_bulk_index_success_without_metadata_and_without_doc_type(self, opensearch): + async def test_bulk_index_success_without_metadata_and_without_doc_type(self, opensearch, on_client_request_start, on_client_request_end): bulk_response = { "errors": False, "took": 8 @@ -517,9 +527,11 @@ async def test_bulk_index_success_without_metadata_and_without_doc_type(self, op opensearch.bulk.assert_called_with(body=bulk_params["body"], index="test-index", doc_type=None, params={}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_bulk_index_error(self, opensearch): + async def test_bulk_index_error(self, opensearch, on_client_request_start, on_client_request_end): bulk_response = { "took": 5, "errors": True, @@ -586,9 +598,11 @@ async def test_bulk_index_error(self, opensearch): opensearch.bulk.assert_called_with(body=bulk_params["body"], params={}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_bulk_index_error_no_shards(self, opensearch): + async def test_bulk_index_error_no_shards(self, opensearch, on_client_request_start, on_client_request_end): bulk_response = { "took": 20, "errors": True, @@ -653,9 +667,11 @@ async def test_bulk_index_error_no_shards(self, opensearch): opensearch.bulk.assert_called_with(body=bulk_params["body"], params={}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') 
@mock.patch("opensearchpy.OpenSearch") @run_async - async def test_mixed_bulk_with_simple_stats(self, opensearch): + async def test_mixed_bulk_with_simple_stats(self, opensearch, on_client_request_start, on_client_request_end): bulk_response = { "took": 30, "ingest_took": 20, @@ -761,9 +777,11 @@ async def test_mixed_bulk_with_simple_stats(self, opensearch): opensearch.bulk.assert_called_with(body=bulk_params["body"], params={}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_mixed_bulk_with_detailed_stats_body_as_string(self, opensearch): + async def test_mixed_bulk_with_detailed_stats_body_as_string(self, opensearch, on_client_request_start, on_client_request_end): opensearch.bulk.return_value = as_future({ "took": 30, "ingest_took": 20, @@ -953,9 +971,11 @@ async def test_mixed_bulk_with_detailed_stats_body_as_string(self, opensearch): result = await bulk(opensearch, bulk_params) self.assertNotIn("ingest_took", result) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_simple_bulk_with_detailed_stats_body_as_list(self, opensearch): + async def test_simple_bulk_with_detailed_stats_body_as_list(self, opensearch, on_client_request_start, on_client_request_end): opensearch.bulk.return_value = as_future({ "took": 30, "ingest_took": 20, @@ -1028,9 +1048,11 @@ async def test_simple_bulk_with_detailed_stats_body_as_list(self, opensearch): result = await bulk(opensearch, bulk_params) self.assertNotIn("ingest_took", result) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") 
@run_async - async def test_simple_bulk_with_detailed_stats_body_as_unrecognized_type(self, opensearch): + async def test_simple_bulk_with_detailed_stats_body_as_unrecognized_type(self, opensearch, on_client_request_start, on_client_request_end): opensearch.bulk.return_value = as_future({ "took": 30, "ingest_took": 20, @@ -1076,18 +1098,22 @@ async def test_simple_bulk_with_detailed_stats_body_as_unrecognized_type(self, o class ForceMergeRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_force_merge_with_defaults(self, opensearch): + async def test_force_merge_with_defaults(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.forcemerge.return_value = as_future() force_merge = runner.ForceMerge() await force_merge(opensearch, params={"index": "_all"}) opensearch.indices.forcemerge.assert_called_once_with(index="_all") + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_force_merge_with_timeout_and_headers(self, opensearch): + async def test_force_merge_with_timeout_and_headers(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.forcemerge.return_value = as_future() force_merge = runner.ForceMerge() await force_merge(opensearch, params={"index": "_all", @@ -1100,9 +1126,11 @@ async def test_force_merge_with_timeout_and_headers(self, opensearch): opaque_id="test-id", request_timeout=3.0) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def 
test_force_merge_override_request_timeout(self, opensearch): + async def test_force_merge_override_request_timeout(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.forcemerge.return_value = as_future() force_merge = runner.ForceMerge() @@ -1110,9 +1138,11 @@ async def test_force_merge_override_request_timeout(self, opensearch): opensearch.indices.forcemerge.assert_called_once_with(index="_all", request_timeout=50000) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_force_merge_with_params(self, opensearch): + async def test_force_merge_with_params(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.forcemerge.return_value = as_future() force_merge = runner.ForceMerge() @@ -1120,18 +1150,22 @@ async def test_force_merge_with_params(self, opensearch): opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_force_merge_with_polling_no_timeout(self, opensearch): + async def test_force_merge_with_polling_no_timeout(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.forcemerge.return_value = as_future() force_merge = runner.ForceMerge() await force_merge(opensearch, params={"index" : "_all", "mode": "polling", 'poll-period': 0}) opensearch.indices.forcemerge.assert_called_once_with(index="_all") + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def 
test_force_merge_with_polling(self, opensearch): + async def test_force_merge_with_polling(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout()) opensearch.tasks.list.side_effect = [ as_future({ @@ -1179,9 +1213,11 @@ async def test_force_merge_with_polling(self, opensearch): await force_merge(opensearch, params={"index": "_all", "mode": "polling", "poll-period": 0}) opensearch.indices.forcemerge.assert_called_once_with(index="_all") + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_force_merge_with_polling_and_params(self, opensearch): + async def test_force_merge_with_polling_and_params(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout()) opensearch.tasks.list.side_effect = [ as_future({ @@ -1233,9 +1269,11 @@ async def test_force_merge_with_polling_and_params(self, opensearch): class IndicesStatsRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_indices_stats_without_parameters(self, opensearch): + async def test_indices_stats_without_parameters(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.stats.return_value = as_future({}) indices_stats = runner.IndicesStats() result = await indices_stats(opensearch, params={}) @@ -1245,9 +1283,11 @@ async def test_indices_stats_without_parameters(self, opensearch): opensearch.indices.stats.assert_called_once_with(index="_all", metric="_all") + 
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_indices_stats_with_timeout_and_headers(self, opensearch): + async def test_indices_stats_with_timeout_and_headers(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.stats.return_value = as_future({}) indices_stats = runner.IndicesStats() result = await indices_stats(opensearch, params={"request-timeout": 3.0, @@ -1263,9 +1303,11 @@ async def test_indices_stats_with_timeout_and_headers(self, opensearch): opaque_id="test-id1", request_timeout=3.0) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_indices_stats_with_failed_condition(self, opensearch): + async def test_indices_stats_with_failed_condition(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.stats.return_value = as_future({ "_all": { "total": { @@ -1297,9 +1339,11 @@ async def test_indices_stats_with_failed_condition(self, opensearch): opensearch.indices.stats.assert_called_once_with(index="logs-*", metric="_all") + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_indices_stats_with_successful_condition(self, opensearch): + async def test_indices_stats_with_successful_condition(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.stats.return_value = as_future({ "_all": { "total": { @@ -1331,9 +1375,11 @@ async def test_indices_stats_with_successful_condition(self, opensearch): 
opensearch.indices.stats.assert_called_once_with(index="logs-*", metric="_all") + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_indices_stats_with_non_existing_path(self, opensearch): + async def test_indices_stats_with_non_existing_path(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.stats.return_value = as_future({ "indices": { "total": { @@ -1367,9 +1413,11 @@ async def test_indices_stats_with_non_existing_path(self, opensearch): class QueryRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_match_only_request_body_defined(self, opensearch): + async def test_query_match_only_request_body_defined(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 5, @@ -1423,9 +1471,11 @@ async def test_query_match_only_request_body_defined(self, opensearch): ) opensearch.clear_scroll.assert_not_called() + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_with_timeout_and_headers(self, opensearch): + async def test_query_with_timeout_and_headers(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 5, @@ -1482,9 +1532,11 @@ async def test_query_with_timeout_and_headers(self, opensearch): ) opensearch.clear_scroll.assert_not_called() + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + 
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_match_using_request_params(self, opensearch): + async def test_query_match_using_request_params(self, opensearch, on_client_request_start, on_client_request_end): response = { "timed_out": False, "took": 62, @@ -1540,9 +1592,11 @@ async def test_query_match_using_request_params(self, opensearch): ) opensearch.clear_scroll.assert_not_called() + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_no_detailed_results(self, opensearch): + async def test_query_no_detailed_results(self, opensearch, on_client_request_start, on_client_request_end): response = { "timed_out": False, "took": 62, @@ -1594,9 +1648,11 @@ async def test_query_no_detailed_results(self, opensearch): ) opensearch.clear_scroll.assert_not_called() + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_hits_total_as_number(self, opensearch): + async def test_query_hits_total_as_number(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 5, @@ -1649,9 +1705,11 @@ async def test_query_hits_total_as_number(self, opensearch): ) opensearch.clear_scroll.assert_not_called() + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_match_all(self, opensearch): + async def test_query_match_all(self, opensearch, on_client_request_start, on_client_request_end): search_response = { 
"timed_out": False, "took": 5, @@ -1705,9 +1763,11 @@ async def test_query_match_all(self, opensearch): ) opensearch.clear_scroll.assert_not_called() + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_match_all_doc_type_fallback(self, opensearch): + async def test_query_match_all_doc_type_fallback(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 5, @@ -1762,9 +1822,11 @@ async def test_query_match_all_doc_type_fallback(self, opensearch): ) opensearch.clear_scroll.assert_not_called() + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_scroll_query_only_one_page(self, opensearch): + async def test_scroll_query_only_one_page(self, opensearch, on_client_request_start, on_client_request_end): # page 1 search_response = { "_scroll_id": "some-scroll-id", @@ -1824,9 +1886,11 @@ async def test_scroll_query_only_one_page(self, opensearch): ) opensearch.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_scroll_query_no_request_cache(self, opensearch): + async def test_scroll_query_no_request_cache(self, opensearch, on_client_request_start, on_client_request_end): # page 1 search_response = { "_scroll_id": "some-scroll-id", @@ -1886,9 +1950,11 @@ async def test_scroll_query_no_request_cache(self, opensearch): ) opensearch.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + 
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_scroll_query_only_one_page_only_request_body_defined(self, opensearch): + async def test_scroll_query_only_one_page_only_request_body_defined(self, opensearch, on_client_request_start, on_client_request_end): # page 1 search_response = { "_scroll_id": "some-scroll-id", @@ -1948,9 +2014,11 @@ async def test_scroll_query_only_one_page_only_request_body_defined(self, opense opensearch.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_scroll_query_with_explicit_number_of_pages(self, opensearch): + async def test_scroll_query_with_explicit_number_of_pages(self, opensearch, on_client_request_start, on_client_request_end): # page 1 search_response = { "_scroll_id": "some-scroll-id", @@ -2022,9 +2090,11 @@ async def test_scroll_query_with_explicit_number_of_pages(self, opensearch): opensearch.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_scroll_query_cannot_clear_scroll(self, opensearch): + async def test_scroll_query_cannot_clear_scroll(self, opensearch, on_client_request_start, on_client_request_end): # page 1 search_response = { "_scroll_id": "some-scroll-id", @@ -2073,9 +2143,11 @@ async def test_scroll_query_cannot_clear_scroll(self, opensearch): opensearch.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + 
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_scroll_query_request_all_pages(self, opensearch): + async def test_scroll_query_request_all_pages(self, opensearch, on_client_request_start, on_client_request_end): # page 1 search_response = { "_scroll_id": "some-scroll-id", @@ -2147,9 +2219,11 @@ async def test_scroll_query_request_all_pages(self, opensearch): opensearch.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_search_pipeline_using_request_params(self, opensearch): + async def test_search_pipeline_using_request_params(self, opensearch, on_client_request_start, on_client_request_end): response = { "timed_out": False, "took": 62, @@ -2209,9 +2283,11 @@ async def test_search_pipeline_using_request_params(self, opensearch): class VectorSearchQueryRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_vector_search_with_perfect_recall(self, opensearch): + async def test_query_vector_search_with_perfect_recall(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 5, @@ -2285,9 +2361,11 @@ async def test_query_vector_search_with_perfect_recall(self, opensearch): headers={"Accept-Encoding": "identity"} ) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') 
@mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_vector_search_with_no_results(self, opensearch): + async def test_query_vector_search_with_no_results(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 1 @@ -2340,9 +2418,11 @@ async def test_query_vector_search_with_no_results(self, opensearch): headers={"Accept-Encoding": "identity"} ) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_vector_search_with_imperfect_recall(self, opensearch): + async def test_query_vector_search_with_imperfect_recall(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 5, @@ -2416,9 +2496,11 @@ async def test_query_vector_search_with_imperfect_recall(self, opensearch): headers={"Accept-Encoding": "identity"} ) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_vector_search_with_few_results_than_ground_truth(self, opensearch): + async def test_query_vector_search_with_few_results_than_ground_truth(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 5, @@ -2488,9 +2570,11 @@ async def test_query_vector_search_with_few_results_than_ground_truth(self, open headers={"Accept-Encoding": "identity"} ) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_vector_search_with_zero_recall_1(self, opensearch): + async def 
test_query_vector_search_with_zero_recall_1(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 5, @@ -2560,9 +2644,11 @@ async def test_query_vector_search_with_zero_recall_1(self, opensearch): headers={"Accept-Encoding": "identity"} ) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_vector_search_with_custom_id_field(self, opensearch): + async def test_query_vector_search_with_custom_id_field(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 5, @@ -2650,9 +2736,11 @@ async def test_query_vector_search_with_custom_id_field(self, opensearch): headers={"Accept-Encoding": "identity"} ) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_vector_search_with_custom_id_field_inside_source(self, opensearch): + async def test_query_vector_search_with_custom_id_field_inside_source(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 5, @@ -2738,9 +2826,11 @@ async def test_query_vector_search_with_custom_id_field_inside_source(self, open class PutPipelineRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_create_pipeline(self, opensearch): + async def test_create_pipeline(self, opensearch, on_client_request_start, on_client_request_end): opensearch.ingest.put_pipeline.return_value = as_future() r = runner.PutPipeline() @@ -2764,9 
+2854,11 @@ async def test_create_pipeline(self, opensearch): opensearch.ingest.put_pipeline.assert_called_once_with(id="rename", body=params["body"], master_timeout=None, timeout=None) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_param_body_mandatory(self, opensearch): + async def test_param_body_mandatory(self, opensearch, on_client_request_start, on_client_request_end): opensearch.ingest.put_pipeline.return_value = as_future() r = runner.PutPipeline() @@ -2781,9 +2873,11 @@ async def test_param_body_mandatory(self, opensearch): self.assertEqual(0, opensearch.ingest.put_pipeline.call_count) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_param_id_mandatory(self, opensearch): + async def test_param_id_mandatory(self, opensearch, on_client_request_start, on_client_request_end): opensearch.ingest.put_pipeline.return_value = as_future() r = runner.PutPipeline() @@ -2800,9 +2894,11 @@ async def test_param_id_mandatory(self, opensearch): class ClusterHealthRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_waits_for_expected_cluster_status(self, opensearch): + async def test_waits_for_expected_cluster_status(self, opensearch, on_client_request_start, on_client_request_end): opensearch.cluster.health.return_value = as_future({ "status": "green", "relocating_shards": 0 @@ -2827,9 +2923,11 @@ async def test_waits_for_expected_cluster_status(self, opensearch): 
opensearch.cluster.health.assert_called_once_with(params={"wait_for_status": "green"}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_accepts_better_cluster_status(self, opensearch): + async def test_accepts_better_cluster_status(self, opensearch, on_client_request_start, on_client_request_end): opensearch.cluster.health.return_value = as_future({ "status": "green", "relocating_shards": 0 @@ -2854,9 +2952,11 @@ async def test_accepts_better_cluster_status(self, opensearch): opensearch.cluster.health.assert_called_once_with(params={"wait_for_status": "yellow"}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_cluster_health_with_timeout_and_headers(self, opensearch): + async def test_cluster_health_with_timeout_and_headers(self, opensearch, on_client_request_start, on_client_request_end): opensearch.cluster.health.return_value = as_future({ "status": "green", "relocating_shards": 0 @@ -2887,9 +2987,11 @@ async def test_cluster_health_with_timeout_and_headers(self, opensearch): params={"wait_for_status": "yellow"}, request_timeout=3.0) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_rejects_relocating_shards(self, opensearch): + async def test_rejects_relocating_shards(self, opensearch, on_client_request_start, on_client_request_end): opensearch.cluster.health.return_value = as_future({ "status": "yellow", "relocating_shards": 3 @@ -2917,9 +3019,11 @@ async def test_rejects_relocating_shards(self, opensearch): 
opensearch.cluster.health.assert_called_once_with(index="logs-*", params={"wait_for_status": "red", "wait_for_no_relocating_shards": True}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_rejects_unknown_cluster_status(self, opensearch): + async def test_rejects_unknown_cluster_status(self, opensearch, on_client_request_start, on_client_request_end): opensearch.cluster.health.return_value = as_future({ "status": None, "relocating_shards": 0 @@ -2946,9 +3050,11 @@ async def test_rejects_unknown_cluster_status(self, opensearch): class CreateIndexRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_creates_multiple_indices(self, opensearch): + async def test_creates_multiple_indices(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.create.return_value = as_future() r = runner.CreateIndex() @@ -2978,9 +3084,11 @@ async def test_creates_multiple_indices(self, opensearch): mock.call(index="indexB", body={"settings": {}}, params=request_params) ]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_create_with_timeout_and_headers(self, opensearch): + async def test_create_with_timeout_and_headers(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.create.return_value = as_future() create_index_runner = runner.CreateIndex() @@ -3015,9 +3123,11 @@ async def test_create_with_timeout_and_headers(self, opensearch): params={"wait_for_active_shards": "true"}, 
request_timeout=3.0) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_ignore_invalid_params(self, opensearch): + async def test_ignore_invalid_params(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.create.return_value = as_future() r = runner.CreateIndex() @@ -3047,9 +3157,11 @@ async def test_ignore_invalid_params(self, opensearch): body={"settings": {}}, params={"wait_for_active_shards": "true"}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_param_indices_mandatory(self, opensearch): + async def test_param_indices_mandatory(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.create.return_value = as_future() r = runner.CreateIndex() @@ -3064,9 +3176,11 @@ async def test_param_indices_mandatory(self, opensearch): class CreateDataStreamRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_creates_multiple_data_streams(self, opensearch): + async def test_creates_multiple_data_streams(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.create_data_stream.return_value = as_future() r = runner.CreateDataStream() @@ -3096,9 +3210,11 @@ async def test_creates_multiple_data_streams(self, opensearch): mock.call("data-stream-B", params=request_params) ]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') 
@mock.patch("opensearchpy.OpenSearch") @run_async - async def test_param_data_streams_mandatory(self, opensearch): + async def test_param_data_streams_mandatory(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.create_data_stream.return_value = as_future() r = runner.CreateDataStream() @@ -3113,9 +3229,11 @@ async def test_param_data_streams_mandatory(self, opensearch): class DeleteIndexRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_deletes_existing_indices(self, opensearch): + async def test_deletes_existing_indices(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.exists.side_effect = [as_future(False), as_future(True)] opensearch.indices.delete.return_value = as_future() r = runner.DeleteIndex() @@ -3135,9 +3253,11 @@ async def test_deletes_existing_indices(self, opensearch): opensearch.indices.delete.assert_called_once_with(index="indexB", params={}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_deletes_all_indices(self, opensearch): + async def test_deletes_all_indices(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.delete.return_value = as_future() r = runner.DeleteIndex() @@ -3166,9 +3286,11 @@ async def test_deletes_all_indices(self, opensearch): class DeleteDataStreamRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_deletes_existing_data_streams(self, opensearch): + async def 
test_deletes_existing_data_streams(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.exists.side_effect = [as_future(False), as_future(True)] opensearch.indices.delete_data_stream.return_value = as_future() @@ -3190,9 +3312,11 @@ async def test_deletes_existing_data_streams(self, opensearch): opensearch.indices.delete_data_stream.assert_called_once_with("data-stream-B", params={}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_deletes_all_data_streams(self, opensearch): + async def test_deletes_all_data_streams(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.delete_data_stream.return_value = as_future() r = runner.DeleteDataStream() @@ -3222,9 +3346,11 @@ async def test_deletes_all_data_streams(self, opensearch): class CreateIndexTemplateRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_create_index_templates(self, opensearch): + async def test_create_index_templates(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.put_template.return_value = as_future() r = runner.CreateIndexTemplate() @@ -3253,9 +3379,11 @@ async def test_create_index_templates(self, opensearch): mock.call(name="templateB", body={"settings": {}}, params=params["request-params"]) ]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_param_templates_mandatory(self, opensearch): + async def test_param_templates_mandatory(self, opensearch, 
on_client_request_start, on_client_request_end): opensearch.indices.put_template.return_value = as_future() r = runner.CreateIndexTemplate() @@ -3270,9 +3398,11 @@ async def test_param_templates_mandatory(self, opensearch): class DeleteIndexTemplateRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_deletes_all_index_templates(self, opensearch): + async def test_deletes_all_index_templates(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.delete_template.return_value = as_future() opensearch.indices.delete.return_value = as_future() @@ -3302,9 +3432,11 @@ async def test_deletes_all_index_templates(self, opensearch): ]) opensearch.indices.delete.assert_called_once_with(index="logs-*") + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_deletes_only_existing_index_templates(self, opensearch): + async def test_deletes_only_existing_index_templates(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.exists_template.side_effect = [as_future(False), as_future(True)] opensearch.indices.delete_template.return_value = as_future() @@ -3334,9 +3466,11 @@ async def test_deletes_only_existing_index_templates(self, opensearch): # not called because the matching index is empty. 
self.assertEqual(0, opensearch.indices.delete.call_count) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_param_templates_mandatory(self, opensearch): + async def test_param_templates_mandatory(self, opensearch, on_client_request_start, on_client_request_end): r = runner.DeleteIndexTemplate() params = {} @@ -3349,9 +3483,11 @@ async def test_param_templates_mandatory(self, opensearch): class CreateComponentTemplateRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_create_index_templates(self, opensearch): + async def test_create_index_templates(self, opensearch, on_client_request_start, on_client_request_end): opensearch.cluster.put_component_template.return_value = as_future() r = runner.CreateComponentTemplate() params = { @@ -3378,9 +3514,11 @@ async def test_create_index_templates(self, opensearch): params=params["request-params"]) ]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_param_templates_mandatory(self, opensearch): + async def test_param_templates_mandatory(self, opensearch, on_client_request_start, on_client_request_end): opensearch.cluster.put_component_template.return_value = as_future() r = runner.CreateComponentTemplate() @@ -3395,9 +3533,11 @@ async def test_param_templates_mandatory(self, opensearch): class DeleteComponentTemplateRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + 
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_deletes_all_index_templates(self, opensearch): + async def test_deletes_all_index_templates(self, opensearch, on_client_request_start, on_client_request_end): opensearch.cluster.delete_component_template.return_value = as_future() opensearch.cluster.delete_component_template.return_value = as_future() @@ -3425,9 +3565,11 @@ async def test_deletes_all_index_templates(self, opensearch): mock.call(name="templateB", params=params["request-params"], ignore=[404]) ]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_deletes_only_existing_index_templates(self, opensearch): + async def test_deletes_only_existing_index_templates(self, opensearch, on_client_request_start, on_client_request_end): def _side_effect(http_method, path): if http_method == "HEAD": @@ -3459,9 +3601,11 @@ def _side_effect(http_method, path): opensearch.cluster.delete_component_template.assert_called_once_with(name="templateB", params=params["request-params"]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_param_templates_mandatory(self, opensearch): + async def test_param_templates_mandatory(self, opensearch, on_client_request_start, on_client_request_end): r = runner.DeleteComponentTemplate() params = {} @@ -3474,9 +3618,11 @@ async def test_param_templates_mandatory(self, opensearch): class CreateComposableTemplateRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') 
@mock.patch("opensearchpy.OpenSearch") @run_async - async def test_create_index_templates(self, opensearch): + async def test_create_index_templates(self, opensearch, on_client_request_start, on_client_request_end): opensearch.cluster.put_index_template.return_value = as_future() r = runner.CreateComposableTemplate() params = { @@ -3504,9 +3650,11 @@ async def test_create_index_templates(self, opensearch): "composed_of":["ct3","ct4"]}, params=params["request-params"]) ]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_param_templates_mandatory(self, opensearch): + async def test_param_templates_mandatory(self, opensearch, on_client_request_start, on_client_request_end): opensearch.cluster.put_index_template.return_value = as_future() r = runner.CreateComposableTemplate() @@ -3521,9 +3669,11 @@ async def test_param_templates_mandatory(self, opensearch): class DeleteComposableTemplateRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_deletes_all_index_templates(self, opensearch): + async def test_deletes_all_index_templates(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.delete_index_template.return_value = as_future() opensearch.indices.delete.return_value = as_future() @@ -3554,9 +3704,11 @@ async def test_deletes_all_index_templates(self, opensearch): ]) opensearch.indices.delete.assert_called_once_with(index="logs-*") + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def 
test_deletes_only_existing_index_templates(self, opensearch): + async def test_deletes_only_existing_index_templates(self, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.exists_template.side_effect = [as_future(False), as_future(True)] opensearch.indices.delete_index_template.return_value = as_future() @@ -3586,9 +3738,11 @@ async def test_deletes_only_existing_index_templates(self, opensearch): # not called because the matching index is empty. self.assertEqual(0, opensearch.indices.delete.call_count) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_param_templates_mandatory(self, opensearch): + async def test_param_templates_mandatory(self, opensearch, on_client_request_start, on_client_request_end): r = runner.DeleteComposableTemplate() params = {} @@ -3601,9 +3755,11 @@ async def test_param_templates_mandatory(self, opensearch): class RawRequestRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_raises_missing_slash(self, opensearch): + async def test_raises_missing_slash(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transport.perform_request.return_value = as_future() r = runner.RawRequest() @@ -3619,9 +3775,11 @@ async def test_raises_missing_slash(self, opensearch): mock.call("RawRequest failed. 
Path parameter: [%s] must begin with a '/'.", params["path"]) ]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_issue_request_with_defaults(self, opensearch): + async def test_issue_request_with_defaults(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transport.perform_request.return_value = as_future() r = runner.RawRequest() @@ -3636,9 +3794,11 @@ async def test_issue_request_with_defaults(self, opensearch): body=None, params={}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_issue_delete_index(self, opensearch): + async def test_issue_delete_index(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transport.perform_request.return_value = as_future() r = runner.RawRequest() @@ -3658,9 +3818,11 @@ async def test_issue_delete_index(self, opensearch): body=None, params={"ignore": [400, 404], "pretty": "true"}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_issue_create_index(self, opensearch): + async def test_issue_create_index(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transport.perform_request.return_value = as_future() r = runner.RawRequest() @@ -3689,9 +3851,11 @@ async def test_issue_create_index(self, opensearch): }, params={}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - 
async def test_issue_msearch(self, opensearch): + async def test_issue_msearch(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transport.perform_request.return_value = as_future() r = runner.RawRequest() @@ -3720,9 +3884,11 @@ async def test_issue_msearch(self, opensearch): ], params={}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_raw_with_timeout_and_opaqueid(self, opensearch): + async def test_raw_with_timeout_and_opaqueid(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transport.perform_request.return_value = as_future() r = runner.RawRequest() @@ -3787,9 +3953,11 @@ async def test_sleep(self, sleep, opensearch): class DeleteSnapshotRepositoryTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_delete_snapshot_repository(self, opensearch): + async def test_delete_snapshot_repository(self, opensearch, on_client_request_start, on_client_request_end): opensearch.snapshot.delete_repository.return_value = as_future() params = { "repository": "backups" @@ -3802,9 +3970,11 @@ async def test_delete_snapshot_repository(self, opensearch): class CreateSnapshotRepositoryTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_create_snapshot_repository(self, opensearch): + async def test_create_snapshot_repository(self, opensearch, on_client_request_start, on_client_request_end): opensearch.snapshot.create_repository.return_value = as_future() params = { 
"repository": "backups", @@ -3830,9 +4000,11 @@ async def test_create_snapshot_repository(self, opensearch): class CreateSnapshotTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_create_snapshot_no_wait(self, opensearch): + async def test_create_snapshot_no_wait(self, opensearch, on_client_request_start, on_client_request_end): opensearch.snapshot.create.return_value = as_future({}) params = { @@ -3858,9 +4030,11 @@ async def test_create_snapshot_no_wait(self, opensearch): params={"request_timeout": 7200}, wait_for_completion=False) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_create_snapshot_wait_for_completion(self, opensearch): + async def test_create_snapshot_wait_for_completion(self, opensearch, on_client_request_start, on_client_request_end): opensearch.snapshot.create.return_value = as_future({ "snapshot": { "snapshot": "snapshot-001", @@ -3911,9 +4085,11 @@ async def test_create_snapshot_wait_for_completion(self, opensearch): class WaitForSnapshotCreateTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_wait_for_snapshot_create_entire_lifecycle(self, opensearch): + async def test_wait_for_snapshot_create_entire_lifecycle(self, opensearch, on_client_request_start, on_client_request_end): opensearch.snapshot.status.side_effect = [ # empty response as_future({}), @@ -4017,9 +4193,11 @@ async def test_wait_for_snapshot_create_entire_lifecycle(self, opensearch): self.assertEqual(3, 
opensearch.snapshot.status.call_count) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_wait_for_snapshot_create_immediate_success(self, opensearch): + async def test_wait_for_snapshot_create_immediate_success(self, opensearch, on_client_request_start, on_client_request_end): opensearch.snapshot.status.return_value = as_future({ "snapshots": [ { @@ -4063,9 +4241,11 @@ async def test_wait_for_snapshot_create_immediate_success(self, opensearch): snapshot="snapshot-001", ignore_unavailable=True) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_wait_for_snapshot_create_failure(self, opensearch): + async def test_wait_for_snapshot_create_failure(self, opensearch, on_client_request_start, on_client_request_end): snapshot_status = { "snapshots": [ { @@ -4095,9 +4275,11 @@ async def test_wait_for_snapshot_create_failure(self, opensearch): class RestoreSnapshotTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_restore_snapshot(self, opensearch): + async def test_restore_snapshot(self, opensearch, on_client_request_start, on_client_request_end): opensearch.snapshot.restore.return_value = as_future() params = { @@ -4117,9 +4299,11 @@ async def test_restore_snapshot(self, opensearch): wait_for_completion=True, params={"request_timeout": 7200}) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") 
@run_async - async def test_restore_snapshot_with_body(self, opensearch): + async def test_restore_snapshot_with_body(self, opensearch, on_client_request_start, on_client_request_end): opensearch.snapshot.restore.return_value = as_future() params = { "repository": "backups", @@ -4154,9 +4338,11 @@ async def test_restore_snapshot_with_body(self, opensearch): class IndicesRecoveryTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_waits_for_ongoing_indices_recovery(self, opensearch): + async def test_waits_for_ongoing_indices_recovery(self, opensearch, on_client_request_start, on_client_request_end): # empty response opensearch.indices.recovery.side_effect = [ # recovery did not yet start @@ -4302,11 +4488,13 @@ async def test_waits_for_ongoing_indices_recovery(self, opensearch): class ShrinkIndexTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") # To avoid real sleeps in unit tests @mock.patch("asyncio.sleep", return_value=as_future()) @run_async - async def test_shrink_index_with_shrink_node(self, sleep, opensearch): + async def test_shrink_index_with_shrink_node(self, sleep, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.get.return_value = as_future({ "src": {} }) @@ -4356,11 +4544,13 @@ async def test_shrink_index_with_shrink_node(self, sleep, opensearch): } }) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") # To avoid real sleeps in unit tests @mock.patch("asyncio.sleep", return_value=as_future()) @run_async - async def 
test_shrink_index_derives_shrink_node(self, sleep, opensearch): + async def test_shrink_index_derives_shrink_node(self, sleep, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.get.return_value = as_future({ "src": {} }) @@ -4440,11 +4630,13 @@ async def test_shrink_index_derives_shrink_node(self, sleep, opensearch): } }) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") # To avoid real sleeps in unit tests @mock.patch("asyncio.sleep", return_value=as_future()) @run_async - async def test_shrink_index_pattern_with_shrink_node(self, sleep, opensearch): + async def test_shrink_index_pattern_with_shrink_node(self, sleep, opensearch, on_client_request_start, on_client_request_end): opensearch.indices.get.return_value = as_future({ "src1": {}, "src2": {}, "src-2020": {} }) @@ -4537,9 +4729,11 @@ async def test_shrink_index_pattern_with_shrink_node(self, sleep, opensearch): class PutSettingsTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_put_settings(self, opensearch): + async def test_put_settings(self, opensearch, on_client_request_start, on_client_request_end): opensearch.cluster.put_settings.return_value = as_future() params = { "body": { @@ -4560,9 +4754,11 @@ async def test_put_settings(self, opensearch): class CreateTransformTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_create_transform(self, opensearch): + async def test_create_transform(self, opensearch, on_client_request_start, 
on_client_request_end): opensearch.transform.put_transform.return_value = as_future() params = { @@ -4603,9 +4799,11 @@ async def test_create_transform(self, opensearch): class StartTransformTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_start_transform(self, opensearch): + async def test_start_transform(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transform.start_transform.return_value = as_future() transform_id = "a-transform" @@ -4621,9 +4819,11 @@ async def test_start_transform(self, opensearch): class WaitForTransformTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_wait_for_transform(self, opensearch): + async def test_wait_for_transform(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transform.stop_transform.return_value = as_future() transform_id = "a-transform" params = { @@ -4685,9 +4885,11 @@ async def test_wait_for_transform(self, opensearch): wait_for_checkpoint=params["wait-for-checkpoint"] ) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_wait_for_transform_progress(self, opensearch): + async def test_wait_for_transform_progress(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transform.stop_transform.return_value = as_future() transform_id = "a-transform" params = { @@ -4869,9 +5071,11 @@ async def test_wait_for_transform_progress(self, opensearch): class DeleteTransformTests(TestCase): + 
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_delete_transform(self, opensearch): + async def test_delete_transform(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transform.delete_transform.return_value = as_future() transform_id = "a-transform" @@ -4888,9 +5092,11 @@ async def test_delete_transform(self, opensearch): class SubmitAsyncSearchTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_submit_async_search(self, opensearch): + async def test_submit_async_search(self, opensearch, on_client_request_start, on_client_request_end): opensearch.async_search.submit.return_value = as_future({"id": "12345"}) r = runner.SubmitAsyncSearch() params = { @@ -4916,9 +5122,11 @@ async def test_submit_async_search(self, opensearch): class GetAsyncSearchTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_get_async_search(self, opensearch): + async def test_get_async_search(self, opensearch, on_client_request_start, on_client_request_end): opensearch.async_search.get.return_value = as_future({ "is_running": False, "response": { @@ -4958,9 +5166,11 @@ async def test_get_async_search(self, opensearch): class DeleteAsyncSearchTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_delete_async_search(self, 
opensearch): + async def test_delete_async_search(self, opensearch, on_client_request_start, on_client_request_end): opensearch.async_search.delete.side_effect = [ as_future({}), as_future({}) @@ -4983,9 +5193,11 @@ async def test_delete_async_search(self, opensearch): class CreatePointInTimeTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_creates_point_in_time(self, opensearch): + async def test_creates_point_in_time(self, opensearch, on_client_request_start, on_client_request_end): pit_id = "0123456789abcdef" params = { "name": "open-pit-test", @@ -4999,9 +5211,11 @@ async def test_creates_point_in_time(self, opensearch): await r(opensearch, params) self.assertEqual(pit_id, runner.CompositeContext.get("open-pit-test")) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_can_only_be_run_in_composite(self, opensearch): + async def test_can_only_be_run_in_composite(self, opensearch, on_client_request_start, on_client_request_end): pit_id = "0123456789abcdef" params = { "name": "open-pit-test", @@ -5017,9 +5231,11 @@ async def test_can_only_be_run_in_composite(self, opensearch): self.assertEqual("This operation is only allowed inside a composite operation.", ctx.exception.args[0]) class DeletePointInTimeTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_delete_point_in_time(self, opensearch): + async def test_delete_point_in_time(self, opensearch, on_client_request_start, on_client_request_end): pit_id = 
"0123456789abcdef" params = { "name": "close-pit-test", @@ -5033,9 +5249,11 @@ async def test_delete_point_in_time(self, opensearch): opensearch.delete_point_in_time.assert_called_once_with(body={"pit_id": ["0123456789abcdef"]}, params={}, headers=None) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_delete_point_in_time_without_context(self, opensearch): + async def test_delete_point_in_time_without_context(self, opensearch, on_client_request_start, on_client_request_end): params = { "name": "close-pit-test", } @@ -5045,9 +5263,11 @@ async def test_delete_point_in_time_without_context(self, opensearch): opensearch.delete_point_in_time.assert_called_once_with(body=None, all=True, params={}, headers=None) class ListAllPointInTimeTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_get_all_point_in_time(self, opensearch): + async def test_get_all_point_in_time(self, opensearch, on_client_request_start, on_client_request_end): pit_id = "0123456789abcdef" params = {} opensearch.list_all_point_in_time.return_value = as_future({ @@ -5063,9 +5283,11 @@ async def test_get_all_point_in_time(self, opensearch): await r(opensearch, params) opensearch.list_all_point_in_time.assert_called_once() + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_get_all_point_in_time_in_composite(self, opensearch): + async def test_get_all_point_in_time_in_composite(self, opensearch, on_client_request_start, on_client_request_end): pit_id = "0123456789abcdef" 
params = {} opensearch.list_all_point_in_time.return_value = as_future({ @@ -5085,9 +5307,11 @@ async def test_get_all_point_in_time_in_composite(self, opensearch): class QueryWithSearchAfterScrollTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_search_after_with_pit(self, opensearch): + async def test_search_after_with_pit(self, opensearch, on_client_request_start, on_client_request_end): pit_op = "open-point-in-time1" pit_id = "0123456789abcdef" params = { @@ -5188,9 +5412,11 @@ async def test_search_after_with_pit(self, opensearch): }, headers=None)]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_search_after_without_pit(self, opensearch): + async def test_search_after_without_pit(self, opensearch, on_client_request_start, on_client_request_end): params = { "name": "search-with-pit", "operation-type": "paginated-search", @@ -5431,9 +5657,12 @@ def tearDown(self): runner.remove_runner("counter") runner.remove_runner("call-recorder") + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") + @mock.patch('osbenchmark.client.RequestContextHolder.new_request_context') @run_async - async def test_execute_multiple_streams(self, opensearch): + async def test_execute_multiple_streams(self, new_request_context, opensearch, on_client_request_start, on_client_request_end): opensearch.transport.perform_request.side_effect = [ # raw-request as_future(), @@ -5508,9 +5737,12 @@ async def test_execute_multiple_streams(self, opensearch): headers=None) ]) + 
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") + @mock.patch('osbenchmark.client.RequestContextHolder.new_request_context') @run_async - async def test_propagates_violated_assertions(self, opensearch): + async def test_propagates_violated_assertions(self, opensearch, on_client_request_start, on_client_request_end, new_request_context): opensearch.transport.perform_request.side_effect = [ # search as_future(io.StringIO(json.dumps({ @@ -5567,9 +5799,12 @@ async def test_propagates_violated_assertions(self, opensearch): headers=None) ]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") + @mock.patch('osbenchmark.client.RequestContextHolder.new_request_context') @run_async - async def test_executes_tasks_in_specified_order(self, opensearch): + async def test_executes_tasks_in_specified_order(self, opensearch, on_client_request_start, on_client_request_end, new_request_context): opensearch.transport.perform_request.return_value = as_future() params = { @@ -5696,9 +5931,11 @@ async def test_adds_request_timings(self): self.assertIn("request_end", timing) self.assertGreater(timing["request_end"], timing["request_start"]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_limits_connections(self, opensearch): + async def test_limits_connections(self, opensearch, on_client_request_start, on_client_request_end): params = { "max-connections": 2, "requests": [ @@ -5734,9 +5971,11 @@ async def test_limits_connections(self, opensearch): # composite runner should limit to two concurrent connections 
self.assertEqual(2, self.counter_runner.max_value) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_rejects_invalid_stream(self, opensearch): + async def test_rejects_invalid_stream(self, opensearch, on_client_request_start, on_client_request_end): # params contains a "streams" property (plural) but it should be "stream" (singular) params = { "max-connections": 2, @@ -5765,9 +6004,11 @@ async def test_rejects_invalid_stream(self, opensearch): self.assertEqual("Requests structure must contain [stream] or [operation-type].", ctx.exception.args[0]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_rejects_unsupported_operations(self, opensearch): + async def test_rejects_unsupported_operations(self, opensearch, on_client_request_start, on_client_request_end): params = { "requests": [ { @@ -5812,9 +6053,11 @@ def request_end(self): async def __aexit__(self, exc_type, exc_val, exc_tb): return False + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_merges_timing_info(self, opensearch): + async def test_merges_timing_info(self, opensearch, on_client_request_start, on_client_request_end): multi_cluster_client = {"default": opensearch} opensearch.new_request_context.return_value = RequestTimingTests.StaticRequestTiming(task_start=2) @@ -5845,9 +6088,11 @@ async def test_merges_timing_info(self, opensearch): delegate.assert_called_once_with(multi_cluster_client, params) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + 
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_creates_new_timing_info(self, opensearch): + async def test_creates_new_timing_info(self, opensearch, on_client_request_start, on_client_request_end): multi_cluster_client = {"default": opensearch} opensearch.new_request_context.return_value = RequestTimingTests.StaticRequestTiming(task_start=2) @@ -6156,9 +6401,11 @@ def test_prefix_doesnt_exit(self): class CreateSearchPipelineRunnerTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_create_search_pipeline(self, opensearch): + async def test_create_search_pipeline(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transport.perform_request.return_value = as_future() r = runner.CreateSearchPipeline() @@ -6194,9 +6441,11 @@ async def test_create_search_pipeline(self, opensearch): url='/_search/pipeline/test_pipeline', body=params["body"]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_param_body_mandatory(self, opensearch): + async def test_param_body_mandatory(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transport.perform_request.return_value = as_future() r = runner.CreateSearchPipeline() @@ -6212,9 +6461,11 @@ async def test_param_body_mandatory(self, opensearch): self.assertEqual(0, opensearch.transport.perform_request.call_count) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async 
def test_param_id_mandatory(self, opensearch): + async def test_param_id_mandatory(self, opensearch, on_client_request_start, on_client_request_end): opensearch.transport.perform_request.return_value = as_future() r = runner.CreateSearchPipeline() @@ -6228,4 +6479,4 @@ async def test_param_id_mandatory(self, opensearch): "Add it to your parameter source and try again."): await r(opensearch, params) - self.assertEqual(0, opensearch.transport.perform_request.call_count) + self.assertEqual(0, opensearch.transport.perform_request.call_count) \ No newline at end of file diff --git a/tests/worker_coordinator/worker_coordinator_test.py b/tests/worker_coordinator/worker_coordinator_test.py index bf7865288..86daf00cd 100644 --- a/tests/worker_coordinator/worker_coordinator_test.py +++ b/tests/worker_coordinator/worker_coordinator_test.py @@ -270,6 +270,9 @@ def throughput(self, absolute_time, relative_time, value): def service_time(self, absolute_time, relative_time, value): return self.request_metric(absolute_time, relative_time, "service_time", value) + + def client_processing_time(self, absolute_time, relative_time, value): + return self.request_metric(absolute_time, relative_time, "client_processing_time", value) def processing_time(self, absolute_time, relative_time, value): return self.request_metric(absolute_time, relative_time, "processing_time", value) @@ -300,17 +303,17 @@ def test_all_samples(self, metrics_store): samples = [ worker_coordinator.Sample( 0, 38598, 24, 0, task, metrics.SampleType.Normal, - None, 0.01, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2), + None, 0.01, 0.007, 0.0007, 0.009, None, 5000, "docs", 1, 1 / 2), worker_coordinator.Sample( 0, 38599, 25, 0, task, metrics.SampleType.Normal, - None, 0.01, 0.007, 0.009, None, 5000, "docs", 2, 2 / 2), + None, 0.01, 0.007, 0.0007, 0.009, None, 5000, "docs", 2, 2 / 2), ] post_process(samples) calls = [ - self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0), - 
self.latency(38599, 25, 10.0), self.service_time(38599, 25, 7.0), self.processing_time(38599, 25, 9.0), + self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.client_processing_time(38598, 24, 0.7), self.processing_time(38598, 24, 9.0), + self.latency(38599, 25, 10.0), self.service_time(38599, 25, 7.0), self.client_processing_time(38599, 25, 0.7), self.processing_time(38599, 25, 9.0), self.throughput(38598, 24, 5000), self.throughput(38599, 25, 5000), ] @@ -328,17 +331,17 @@ def test_downsamples(self, metrics_store): samples = [ worker_coordinator.Sample( 0, 38598, 24, 0, task, metrics.SampleType.Normal, - None, 0.01, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2), + None, 0.01, 0.007, 0.0007, 0.009, None, 5000, "docs", 1, 1 / 2), worker_coordinator.Sample( 0, 38599, 25, 0, task, metrics.SampleType.Normal, - None, 0.01, 0.007, 0.009, None, 5000, "docs", 2, 2 / 2), + None, 0.01, 0.007, 0.0007, 0.009, None, 5000, "docs", 2, 2 / 2), ] post_process(samples) calls = [ # only the first out of two request samples is included, throughput metrics are still complete - self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0), + self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.client_processing_time(38598, 24, 0.7), self.processing_time(38598, 24, 9.0), self.throughput(38598, 24, 5000), self.throughput(38599, 25, 5000), ] @@ -355,7 +358,7 @@ def test_dependent_samples(self, metrics_store): samples = [ worker_coordinator.Sample( 0, 38598, 24, 0, task, metrics.SampleType.Normal, - None, 0.01, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2, + None, 0.01, 0.007, 0.0007, 0.009, None, 5000, "docs", 1, 1 / 2, dependent_timing=[ { "absolute_time": 38601, @@ -377,7 +380,7 @@ def test_dependent_samples(self, metrics_store): post_process(samples) calls = [ - self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0), + self.latency(38598, 24, 10.0), 
self.service_time(38598, 24, 7.0), self.client_processing_time(38598, 24, 0.7), self.processing_time(38598, 24, 9.0), # dependent timings self.service_time(38601, 25, 50.0), self.service_time(38602, 26, 80.0), @@ -725,8 +728,8 @@ def test_different_sample_types(self): op = workload.Operation("index", workload.OperationType.Bulk, param_source="worker-coordinator-test-param-source") samples = [ - worker_coordinator.Sample(0, 1470838595, 21, 0, op, metrics.SampleType.Warmup, None, -1, -1, -1, None, 3000, "docs", 1, 1), - worker_coordinator.Sample(0, 1470838595.5, 21.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 2500, "docs", 1, 1), + worker_coordinator.Sample(0, 1470838595, 21, 0, op, metrics.SampleType.Warmup, None, -1, -1, -1, -1, None, 3000, "docs", 1, 1), + worker_coordinator.Sample(0, 1470838595.5, 21.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 2500, "docs", 1, 1), ] aggregated = self.calculate_global_throughput(samples) @@ -743,15 +746,15 @@ def test_single_metrics_aggregation(self): op = workload.Operation("index", workload.OperationType.Bulk, param_source="worker-coordinator-test-param-source") samples = [ - worker_coordinator.Sample(0, 38595, 21, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 1, 1 / 9), - worker_coordinator.Sample(0, 38596, 22, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 2, 2 / 9), - worker_coordinator.Sample(0, 38597, 23, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 3, 3 / 9), - worker_coordinator.Sample(0, 38598, 24, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 4, 4 / 9), - worker_coordinator.Sample(0, 38599, 25, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 5, 5 / 9), - worker_coordinator.Sample(0, 38600, 26, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 6, 6 / 9), - worker_coordinator.Sample(1, 38598.5, 24.5, 0, op, metrics.SampleType.Normal, 
None, -1, -1, -1, None, 5000, "docs", 4.5, 7 / 9), - worker_coordinator.Sample(1, 38599.5, 25.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 5.5, 8 / 9), - worker_coordinator.Sample(1, 38600.5, 26.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 6.5, 9 / 9) + worker_coordinator.Sample(0, 38595, 21, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 1, 1 / 9), + worker_coordinator.Sample(0, 38596, 22, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 2, 2 / 9), + worker_coordinator.Sample(0, 38597, 23, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 3, 3 / 9), + worker_coordinator.Sample(0, 38598, 24, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 4, 4 / 9), + worker_coordinator.Sample(0, 38599, 25, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 5, 5 / 9), + worker_coordinator.Sample(0, 38600, 26, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 6, 6 / 9), + worker_coordinator.Sample(1, 38598.5, 24.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 4.5, 7 / 9), + worker_coordinator.Sample(1, 38599.5, 25.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 5.5, 8 / 9), + worker_coordinator.Sample(1, 38600.5, 26.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 6.5, 9 / 9) ] aggregated = self.calculate_global_throughput(samples) @@ -774,9 +777,9 @@ def test_use_provided_throughput(self): param_source="worker-coordinator-test-param-source") samples = [ - worker_coordinator.Sample(0, 38595, 21, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, 8000, 5000, "byte", 1, 1 / 3), - worker_coordinator.Sample(0, 38596, 22, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, 8000, 5000, "byte", 2, 2 / 3), - worker_coordinator.Sample(0, 38597, 23, 0, op, metrics.SampleType.Normal, 
None, -1, -1, -1, 8000, 5000, "byte", 3, 3 / 3), + worker_coordinator.Sample(0, 38595, 21, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, 8000, 5000, "byte", 1, 1 / 3), + worker_coordinator.Sample(0, 38596, 22, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, 8000, 5000, "byte", 2, 2 / 3), + worker_coordinator.Sample(0, 38597, 23, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, 8000, 5000, "byte", 3, 3 / 3), ] aggregated = self.calculate_global_throughput(samples) @@ -1103,6 +1106,14 @@ async def __aenter__(self): def request_start(self): return self.current_request_start + @property + def client_request_start(self): + return self.current_request_start - 0.0025 + + @property + def client_request_end(self): + return self.current_request_start + 0.0525 + @property def request_end(self): return self.current_request_start + 0.05 @@ -1149,9 +1160,11 @@ def setUp(self): runner.register_runner("unit-test-recovery", self.runner_with_progress, async_runner=True) runner.register_runner("override-throughput", self.runner_overriding_throughput, async_runner=True) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_execute_schedule_in_throughput_mode(self, opensearch): + async def test_execute_schedule_in_throughput_mode(self, opensearch, on_client_request_start, on_client_request_end): task_start = time.perf_counter() opensearch.new_request_context.return_value = AsyncExecutorTests.StaticRequestTiming(task_start=task_start) @@ -1327,15 +1340,19 @@ async def test_execute_schedule_runner_overrides_times(self, opensearch): self.assertIsNotNone(sample.service_time) self.assertIsNotNone(sample.time_period) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') 
@mock.patch("opensearchpy.OpenSearch") @run_async - async def test_execute_schedule_throughput_throttled(self, opensearch): + async def test_execute_schedule_throughput_throttled(self, opensearch, on_client_request_start, on_client_request_end): def perform_request(*args, **kwargs): return as_future() opensearch.init_request_context.return_value = { - "request_start": 0, - "request_end": 10 + "client_request_start": 0, + "request_start": 1, + "request_end": 11, + "client_request_end": 12 } # as this method is called several times we need to return a fresh instance every time as the previous # one has been "consumed". @@ -1394,8 +1411,10 @@ def perform_request(*args, **kwargs): @run_async async def test_cancel_execute_schedule(self, opensearch): opensearch.init_request_context.return_value = { - "request_start": 0, - "request_end": 10 + "client_request_start": 0, + "request_start": 1, + "request_end": 11, + "client_request_end": 12 } opensearch.bulk.return_value = as_future(io.StringIO('{"errors": false, "took": 8}')) From f912304bfe088e8936566798f81391d93f36c38a Mon Sep 17 00:00:00 2001 From: saimedhi Date: Mon, 29 Jan 2024 14:57:01 -0800 Subject: [PATCH 3/6] Added client processing time metric Signed-off-by: saimedhi --- osbenchmark/client.py | 4 +-- osbenchmark/metrics.py | 3 +- osbenchmark/worker_coordinator/runner.py | 20 ++++--------- .../worker_coordinator/worker_coordinator.py | 6 ++-- tests/worker_coordinator/runner_test.py | 10 +++++-- .../worker_coordinator_test.py | 29 ++++++++++++------- 6 files changed, 37 insertions(+), 35 deletions(-) diff --git a/osbenchmark/client.py b/osbenchmark/client.py index a35743d5e..1eb325765 100644 --- a/osbenchmark/client.py +++ b/osbenchmark/client.py @@ -56,7 +56,7 @@ def request_start(self): @property def request_end(self): return self.ctx["request_end"] - + @property def client_request_start(self): return self.ctx["client_request_start"] @@ -117,13 +117,11 @@ def update_request_end(cls, new_request_end): def 
update_client_request_start(cls, new_client_request_start): meta = cls.request_context.get() if "client_request_start" not in meta: - print("updated client_request_start", new_client_request_start) meta["client_request_start"] = new_client_request_start @classmethod def update_client_request_end(cls, new_client_request_end): meta = cls.request_context.get() - print("updated client_request_end", new_client_request_end) meta["client_request_end"] = new_client_request_end @classmethod diff --git a/osbenchmark/metrics.py b/osbenchmark/metrics.py index 70bb91a0d..fdca7d444 100644 --- a/osbenchmark/metrics.py +++ b/osbenchmark/metrics.py @@ -2038,7 +2038,8 @@ def op_metrics(op_item, key, single_value=False): def v(self, d, k, default=None): return d.get(k, default) if d else default - def add_op_metrics(self, task, operation, throughput, latency, service_time, client_processing_time, processing_time, error_rate, duration, meta): + def add_op_metrics(self, task, operation, throughput, latency, service_time, client_processing_time, + processing_time, error_rate, duration, meta): doc = { "task": task, "operation": operation, diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 34ea0053e..ea0ab2c8a 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -215,13 +215,9 @@ def _transport_request_params(self, params): def time_func(func): async def advised(*args, **kwargs): - st = time.perf_counter() - print("started", st) request_context_holder.on_client_request_start() rsl = await func(*args, **kwargs) request_context_holder.on_client_request_end() - en = time.perf_counter() - print("ended", en) return rsl return advised @@ -504,14 +500,14 @@ async def __call__(self, opensearch, params): if not detailed_results: opensearch.return_raw_response() request_context_holder.on_client_request_start() - + if with_action_metadata: api_kwargs.pop("index", None) # only half of the lines are 
documents response = await opensearch.bulk(params=bulk_params, **api_kwargs) else: response = await opensearch.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs) - + request_context_holder.on_client_request_end() stats = self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response) @@ -704,8 +700,8 @@ async def __call__(self, opensearch, params): # empty nodes response indicates no tasks complete = True else: - request_context_holder.on_client_request_start() - await opensearch.indices.forcemerge(**merge_params) + request_context_holder.on_client_request_start() + await opensearch.indices.forcemerge(**merge_params) request_context_holder.on_client_request_end() def __repr__(self, *args, **kwargs): @@ -1133,21 +1129,16 @@ def calculate_recall(predictions, neighbors, top_k): search_method = params.get("operation-type") if search_method == "paginated-search": - print("11") return await _search_after_query(opensearch, params) elif search_method == "scroll-search": - print("22") return await _scroll_query(opensearch, params) elif "pages" in params: - print("33") logging.getLogger(__name__).warning("Invoking a scroll search with the 'search' operation is deprecated " "and will be removed in a future release. Use 'scroll-search' instead.") return await _scroll_query(opensearch, params) elif search_method == "vector-search": - print("44") return await _vector_search_query_with_recall(opensearch, params) else: - print("55") return await _request_body_query(opensearch, params) async def _raw_search(self, opensearch, doc_type, index, body, params, headers=None): @@ -1246,7 +1237,7 @@ def status(v): else: # we're good with any count of relocating shards. 
expected_relocating_shards = sys.maxsize - + request_context_holder.on_client_request_start() result = await opensearch.cluster.health(**api_kw_params) request_context_holder.on_client_request_end() @@ -1301,7 +1292,6 @@ def __repr__(self, *args, **kwargs): class CreateIndex(Runner): async def __call__(self, opensearch, params): - print("CreateIndex __call__ printed") indices = mandatory(params, "indices", self) api_params = self._default_kw_params(params) ## ignore invalid entries rather than erroring diff --git a/osbenchmark/worker_coordinator/worker_coordinator.py b/osbenchmark/worker_coordinator/worker_coordinator.py index b6f1d8a85..8029a14fd 100644 --- a/osbenchmark/worker_coordinator/worker_coordinator.py +++ b/osbenchmark/worker_coordinator/worker_coordinator.py @@ -868,12 +868,13 @@ def __call__(self, raw_samples): sample_type=sample.sample_type, absolute_time=sample.absolute_time, relative_time=sample.relative_time, meta_data=meta_data) - self.metrics_store.put_value_cluster_level(name="client_processing_time", value=convert.seconds_to_ms(sample.client_processing_time), + self.metrics_store.put_value_cluster_level(name="client_processing_time", + value=convert.seconds_to_ms(sample.client_processing_time), unit="ms", task=sample.task.name, operation=sample.operation_name, operation_type=sample.operation_type, sample_type=sample.sample_type, absolute_time=sample.absolute_time, relative_time=sample.relative_time, meta_data=meta_data) - + self.metrics_store.put_value_cluster_level(name="processing_time", value=convert.seconds_to_ms(sample.processing_time), unit="ms", task=sample.task.name, operation=sample.operation_name, operation_type=sample.operation_type, @@ -1587,7 +1588,6 @@ async def __call__(self, *args, **kwargs): processing_start = time.perf_counter() self.schedule_handle.before_request(processing_start) async with self.opensearch["default"].new_request_context() as request_context: - print("runner", runner) total_ops, total_ops_unit, 
request_meta_data = await execute_single(runner, self.opensearch, params, self.on_error) request_start = request_context.request_start request_end = request_context.request_end diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 4e4f2dd3c..16fdd2730 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -497,7 +497,9 @@ async def test_bulk_index_success_without_metadata_with_doc_type(self, opensearc @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_bulk_index_success_without_metadata_and_without_doc_type(self, opensearch, on_client_request_start, on_client_request_end): + async def test_bulk_index_success_without_metadata_and_without_doc_type(self, opensearch, + on_client_request_start, + on_client_request_end): bulk_response = { "errors": False, "took": 8 @@ -1052,7 +1054,9 @@ async def test_simple_bulk_with_detailed_stats_body_as_list(self, opensearch, on @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_simple_bulk_with_detailed_stats_body_as_unrecognized_type(self, opensearch, on_client_request_start, on_client_request_end): + async def test_simple_bulk_with_detailed_stats_body_as_unrecognized_type(self, opensearch, + on_client_request_start, + on_client_request_end): opensearch.bulk.return_value = as_future({ "took": 30, "ingest_took": 20, @@ -6479,4 +6483,4 @@ async def test_param_id_mandatory(self, opensearch, on_client_request_start, on_ "Add it to your parameter source and try again."): await r(opensearch, params) - self.assertEqual(0, opensearch.transport.perform_request.call_count) \ No newline at end of file + self.assertEqual(0, opensearch.transport.perform_request.call_count) diff --git a/tests/worker_coordinator/worker_coordinator_test.py 
b/tests/worker_coordinator/worker_coordinator_test.py index 86daf00cd..ef89e4ee1 100644 --- a/tests/worker_coordinator/worker_coordinator_test.py +++ b/tests/worker_coordinator/worker_coordinator_test.py @@ -270,7 +270,7 @@ def throughput(self, absolute_time, relative_time, value): def service_time(self, absolute_time, relative_time, value): return self.request_metric(absolute_time, relative_time, "service_time", value) - + def client_processing_time(self, absolute_time, relative_time, value): return self.request_metric(absolute_time, relative_time, "client_processing_time", value) @@ -312,8 +312,10 @@ def test_all_samples(self, metrics_store): post_process(samples) calls = [ - self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.client_processing_time(38598, 24, 0.7), self.processing_time(38598, 24, 9.0), - self.latency(38599, 25, 10.0), self.service_time(38599, 25, 7.0), self.client_processing_time(38599, 25, 0.7), self.processing_time(38599, 25, 9.0), + self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), + self.client_processing_time(38598, 24, 0.7), self.processing_time(38598, 24, 9.0), + self.latency(38599, 25, 10.0), self.service_time(38599, 25, 7.0), + self.client_processing_time(38599, 25, 0.7), self.processing_time(38599, 25, 9.0), self.throughput(38598, 24, 5000), self.throughput(38599, 25, 5000), ] @@ -341,7 +343,8 @@ def test_downsamples(self, metrics_store): calls = [ # only the first out of two request samples is included, throughput metrics are still complete - self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.client_processing_time(38598, 24, 0.7), self.processing_time(38598, 24, 9.0), + self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), + self.client_processing_time(38598, 24, 0.7), self.processing_time(38598, 24, 9.0), self.throughput(38598, 24, 5000), self.throughput(38599, 25, 5000), ] @@ -380,7 +383,8 @@ def test_dependent_samples(self, metrics_store): post_process(samples) 
calls = [ - self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.client_processing_time(38598, 24, 0.7), self.processing_time(38598, 24, 9.0), + self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), + self.client_processing_time(38598, 24, 0.7), self.processing_time(38598, 24, 9.0), # dependent timings self.service_time(38601, 25, 50.0), self.service_time(38602, 26, 80.0), @@ -728,8 +732,10 @@ def test_different_sample_types(self): op = workload.Operation("index", workload.OperationType.Bulk, param_source="worker-coordinator-test-param-source") samples = [ - worker_coordinator.Sample(0, 1470838595, 21, 0, op, metrics.SampleType.Warmup, None, -1, -1, -1, -1, None, 3000, "docs", 1, 1), - worker_coordinator.Sample(0, 1470838595.5, 21.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 2500, "docs", 1, 1), + worker_coordinator.Sample(0, 1470838595, 21, 0, op, metrics.SampleType.Warmup, + None, -1, -1, -1, -1, None, 3000, "docs", 1, 1), + worker_coordinator.Sample(0, 1470838595.5, 21.5, 0, op, metrics.SampleType.Normal, + None, -1, -1, -1, -1, None, 2500, "docs", 1, 1), ] aggregated = self.calculate_global_throughput(samples) @@ -752,9 +758,12 @@ def test_single_metrics_aggregation(self): worker_coordinator.Sample(0, 38598, 24, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 4, 4 / 9), worker_coordinator.Sample(0, 38599, 25, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 5, 5 / 9), worker_coordinator.Sample(0, 38600, 26, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 6, 6 / 9), - worker_coordinator.Sample(1, 38598.5, 24.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 4.5, 7 / 9), - worker_coordinator.Sample(1, 38599.5, 25.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 5.5, 8 / 9), - worker_coordinator.Sample(1, 38600.5, 26.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, 
None, 5000, "docs", 6.5, 9 / 9) + worker_coordinator.Sample(1, 38598.5, 24.5, 0, op, metrics.SampleType.Normal, + None, -1, -1, -1, -1, None, 5000, "docs", 4.5, 7 / 9), + worker_coordinator.Sample(1, 38599.5, 25.5, 0, op, metrics.SampleType.Normal, + None, -1, -1, -1, -1, None, 5000, "docs", 5.5, 8 / 9), + worker_coordinator.Sample(1, 38600.5, 26.5, 0, op, metrics.SampleType.Normal, + None, -1, -1, -1, -1, None, 5000, "docs", 6.5, 9 / 9) ] aggregated = self.calculate_global_throughput(samples) From 24c66d8098cfb7cc4b3e639121908f4a106b33b2 Mon Sep 17 00:00:00 2001 From: saimedhi Date: Wed, 31 Jan 2024 09:23:42 -0800 Subject: [PATCH 4/6] Added client processing time metric Signed-off-by: saimedhi --- osbenchmark/worker_coordinator/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index ea0ab2c8a..618038b75 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -216,9 +216,9 @@ def _transport_request_params(self, params): def time_func(func): async def advised(*args, **kwargs): request_context_holder.on_client_request_start() - rsl = await func(*args, **kwargs) + response = await func(*args, **kwargs) request_context_holder.on_client_request_end() - return rsl + return response return advised From 777d2b25efb3151a2f332333c0b4dc025d88ef1c Mon Sep 17 00:00:00 2001 From: saimedhi Date: Wed, 31 Jan 2024 13:40:23 -0800 Subject: [PATCH 5/6] Added client processing time metric Signed-off-by: saimedhi --- osbenchmark/client.py | 12 +++++++----- osbenchmark/worker_coordinator/runner.py | 1 - 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/osbenchmark/client.py b/osbenchmark/client.py index 1eb325765..448ba360a 100644 --- a/osbenchmark/client.py +++ b/osbenchmark/client.py @@ -55,7 +55,7 @@ def request_start(self): @property def request_end(self): - return self.ctx["request_end"] + return max((value for 
value in self.ctx["request_end_list"] if value < self.client_request_end)) @property def client_request_start(self): @@ -67,10 +67,10 @@ def client_request_end(self): async def __aexit__(self, exc_type, exc_val, exc_tb): # propagate earliest request start and most recent request end to parent - request_start = self.request_start - request_end = self.request_end client_request_start = self.client_request_start client_request_end = self.client_request_end + request_start = self.request_start + request_end = self.request_end self.ctx_holder.restore_context(self.token) # don't attempt to restore these values on the top-level context as they don't exist if self.token.old_value != contextvars.Token.MISSING: @@ -105,13 +105,15 @@ def restore_context(cls, token): def update_request_start(cls, new_request_start): meta = cls.request_context.get() # this can happen if multiple requests are sent on the wire for one logical request (e.g. scrolls) - if "request_start" not in meta: + if "request_start" not in meta and "client_request_start" in meta: meta["request_start"] = new_request_start @classmethod def update_request_end(cls, new_request_end): meta = cls.request_context.get() - meta["request_end"] = new_request_end + if "request_end_list" not in meta: + meta["request_end_list"] = [] + meta["request_end_list"].append(new_request_end) @classmethod def update_client_request_start(cls, new_client_request_start): diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 618038b75..4d77f4898 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -1330,7 +1330,6 @@ def __repr__(self, *args, **kwargs): class DeleteIndex(Runner): - @time_func async def __call__(self, opensearch, params): ops = 0 From 9c71618630e117c062d9035ceabaf01692f37191 Mon Sep 17 00:00:00 2001 From: saimedhi Date: Wed, 31 Jan 2024 14:51:37 -0800 Subject: [PATCH 6/6] Added client processing time metric Signed-off-by: 
saimedhi --- osbenchmark/worker_coordinator/runner.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 4d77f4898..33501c99a 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -1330,6 +1330,7 @@ def __repr__(self, *args, **kwargs): class DeleteIndex(Runner): + @time_func async def __call__(self, opensearch, params): ops = 0 @@ -1339,15 +1340,11 @@ async def __call__(self, opensearch, params): for index_name in indices: if not only_if_exists: - request_context_holder.on_client_request_start() await opensearch.indices.delete(index=index_name, params=request_params) - request_context_holder.on_client_request_end() ops += 1 elif only_if_exists and await opensearch.indices.exists(index=index_name): self.logger.info("Index [%s] already exists. Deleting it.", index_name) - request_context_holder.on_client_request_start() await opensearch.indices.delete(index=index_name, params=request_params) - request_context_holder.on_client_request_end() ops += 1 return {