Added support for opensearch-py client connections urllib3httpconnection, requestshttpconnection #445

Closed
wants to merge 3 commits into from

saimedhi
Contributor

Description

Added support for opensearch-py client connections urllib3httpconnection, requestshttpconnection.

Issues Resolved

#437

Follow up Issues

Document how to use urllib3httpconnection, requestshttpconnection client connections.

Usage examples:

  • The connection_class value in --client-options is case-insensitive:
    opensearch-benchmark execute-test --distribution-version=2.11.1 --workload=geonames --test-mode --kill-running-processes --client-options "connection_class:urllib3"

  • For urllib3httpconnection, either urllib3 or urllib3httpconnection works.

  • For requestshttpconnection, either requests or requestshttpconnection works.


By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

    connection_class = osbenchmark.sync_connection.Urllib3HttpConnection
else:
    connection_class = osbenchmark.sync_connection.Urllib3HttpConnection

Member

This seems unnecessarily complicated, and should raise an error if the user specified an invalid class.

connection_class = self.client_options.get("connection_class", "urllib3")
if connection_class == "requests":
    connection_class = osbenchmark.sync_connection.RequestsHttpConnection
elif connection_class == "...":
    ...
else:
    raise ...

Contributor Author

  • Error handling is done here
  • I will simplify the code

Collaborator

@gkamat Feb 5, 2024

Agree with @dblock. Since Python has no switch or case statement, at least before 3.10, you can use a structured if-elif block or perhaps a map. Also, a default arm with error handling is good practice even if errors are handled elsewhere, since a reader of this section might not be aware of that.
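
The map-based lookup with a default error arm, as suggested in this thread, might look like the minimal sketch below. The alias table and the function name resolve_connection_class are hypothetical, and the two stand-in classes replace the ones in osbenchmark.sync_connection only so that the snippet runs on its own.

```python
class Urllib3HttpConnection:
    """Stand-in for osbenchmark.sync_connection.Urllib3HttpConnection (assumption)."""


class RequestsHttpConnection:
    """Stand-in for osbenchmark.sync_connection.RequestsHttpConnection (assumption)."""


# Hypothetical alias table; keys are lower-case so lookups are case-insensitive.
CONNECTION_CLASSES = {
    "urllib3": Urllib3HttpConnection,
    "urllib3httpconnection": Urllib3HttpConnection,
    "requests": RequestsHttpConnection,
    "requestshttpconnection": RequestsHttpConnection,
}


def resolve_connection_class(client_options):
    """Return the connection class named in client_options, defaulting to urllib3."""
    name = str(client_options.get("connection_class", "urllib3")).lower()
    try:
        return CONNECTION_CLASSES[name]
    except KeyError:
        # Default arm: reject anything that is not a known alias.
        valid = ", ".join(sorted(CONNECTION_CLASSES))
        raise ValueError(f"Unknown connection_class '{name}'; expected one of: {valid}") from None
```

Keeping the error arm next to the lookup means a reader does not need to know about validation done elsewhere.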

@@ -1449,7 +1449,17 @@ async def run(self):
def os_clients(all_hosts, all_client_options):
    opensearch = {}
    for cluster_name, cluster_hosts in all_hosts.items():
        opensearch[cluster_name] = client.OsClientFactory(cluster_hosts, all_client_options[cluster_name]).create_async()
        if "connection_class" in all_client_options["default"]:
Member

Same as above, collapse the nested if/else.

if asyncio.iscoroutinefunction(s.transport.close):
    await s.transport.close()
else:
    s.transport.close()
Member

Ideally there should be a sync and an async worker coordinator that have a close method, or a transport class that wraps the async version so that you can change this code to just s.transport.close().
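
One way to hide the sync/async distinction behind a single call, as a rough sketch only: the helper close_transport and the two stand-in transport classes below are hypothetical names, not part of OpenSearch Benchmark or opensearch-py.

```python
import asyncio
import inspect


async def close_transport(transport):
    """Close a transport whether its close() is a plain method or a coroutine."""
    result = transport.close()
    # An async close() returns an awaitable that still has to be awaited.
    if inspect.isawaitable(result):
        await result


class SyncTransport:
    """Hypothetical stand-in for a synchronous transport."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class AsyncTransport:
    """Hypothetical stand-in for an asynchronous transport."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True
```

With such a helper, the calling code reduces to a single `await close_transport(s.transport)` regardless of the client type.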

Collaborator

This could be refactored in a future check-in.

if "amazon_aws_log_in" not in self.client_options:
    return opensearchpy.OpenSearch(hosts=self.hosts, ssl_context=self.ssl_context, **self.client_options)
return self.BenchmarkOpenSearch(hosts=self.hosts, ssl_context=self.ssl_context,
Collaborator

Would suggest renaming this to BenchmarkSyncOpenSearch to match the other class. Likewise with create() vis-a-vis create_async(). Please add comments prior to each indicating the presence of the other.

Collaborator

Actually, you might consider renaming both classes to be more descriptive, something like PythonSyncClient and PythonAsyncClient, so future clients can be named accordingly.

Comment on lines +42 to +45
request_context_holder.on_request_start()
status, headers, raw_data = super().perform_request(method=method, url=url, params=params, body=body, timeout=timeout,
                                                    allow_redirects=allow_redirects, ignore=ignore, headers=headers)
request_context_holder.on_request_end()
Collaborator

There is no error handling here. If the request fails, the on_request_end callback will not be executed, leading to incorrect metrics in the command dispatcher, or possibly an error there.

Collaborator

Note: this may be the wrong location to insert these calls, if they are intended to measure service time. They should be at the lowest level possible just prior to the HTTP socket send call. The aiohttp library has callbacks, but the others might not. In that case, the calls will need to be somewhere within opensearchpy, not here.

Contributor Author

Hey @dblock, any ideas on how we can get the exact timing? Should we make changes in opensearch-py?

Member

For the issue that @gkamat is bringing up, the calling code and the implementation should become a with block (a context manager with __enter__ and __exit__) so that on_request_end is always called, even on error.

On the second issue, measuring at the lowest level, I think we are trying to measure "client overhead" that begins "around" the client. I also think it would be good to get lower-level HTTP library metrics for pure transport. Finally, the entire client will become swappable, so keeping the code here makes sense to me.

@gkamat WDYT?
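
A minimal sketch of the with-block idea, assuming a holder object that exposes on_request_start and on_request_end as in the diff. The RequestContextHolder class here is only a stand-in that records calls, and timed_request is a hypothetical name.

```python
from contextlib import contextmanager


class RequestContextHolder:
    """Stand-in for the request_context_holder used in the PR (assumption)."""

    def __init__(self):
        self.events = []

    def on_request_start(self):
        self.events.append("start")

    def on_request_end(self):
        self.events.append("end")


@contextmanager
def timed_request(holder):
    """Call on_request_start on entry and on_request_end on exit, even on error."""
    holder.on_request_start()
    try:
        yield holder
    finally:
        # Runs whether the wrapped request returned or raised.
        holder.on_request_end()
```

The perform_request override could then wrap its super().perform_request(...) call in `with timed_request(request_context_holder):`, so a failed request still records its end event while the exception propagates to the caller.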

Collaborator

@gkamat Feb 10, 2024
The reason will be displayed to describe this comment to others. Learn more.

@dblock, yes, either a context manager or exception handling will be an appropriate mechanism to handle the issue.

Regarding the second point, I believe those calls were added by @saimedhi to measure service time, which reflects the time between the request being placed on the wire and the response being received from the network (as accurately as is feasible). The aiohttp client provides trace event callbacks that make this possible. The requests library has a response hook, but I couldn't find a request callback. I have not looked at urllib3.

You are correct that the client processing time measures latency around the client, and the location above would be the correct entry point for that metric. But I believe it is the other metric, service time, that is being captured here, and the difference between the two would be pure client overhead.

Comment on lines +83 to +86
request_context_holder.on_request_start()
status, headers, raw_data = super().perform_request(method=method, url=url, params=params, body=body,
                                                    timeout=timeout, ignore=ignore, headers=headers)
request_context_holder.on_request_end()
Collaborator

Same comment as above.

Comment on lines +493 to +506
if "BenchmarkOpenSearch" in str(opensearch):
if with_action_metadata:
api_kwargs.pop("index", None)
# only half of the lines are documents
response = opensearch.bulk(params=bulk_params, **api_kwargs)
else:
response = opensearch.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs)
else:
response = await opensearch.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs)

if with_action_metadata:
api_kwargs.pop("index", None)
# only half of the lines are documents
response = await opensearch.bulk(params=bulk_params, **api_kwargs)
else:
response = await opensearch.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs)
Collaborator

@gkamat Feb 5, 2024
The reason will be displayed to describe this comment to others. Learn more.

There is a lot of code duplication in this file that is likely not necessary. It should be possible to indirect the call to the client API via a routine that calls the desired sync or async function by switching on the predicate. The rest of the code can stay the same. Alternatively, a ternary operator is perhaps a better option.

Contributor Author

@saimedhi Feb 5, 2024
The reason will be displayed to describe this comment to others. Learn more.

@gkamat, can we do it as below?

  • Add await conditionally on each line where an opensearch call is made. This prevents code duplication.
if with_action_metadata:
    api_kwargs.pop("index", None)
    response = opensearch.bulk(params=bulk_params, **api_kwargs) if "BenchmarkOpenSearch" in str(opensearch) else await opensearch.bulk(params=bulk_params, **api_kwargs)
else:
    response = opensearch.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs) if "BenchmarkOpenSearch" in str(opensearch) else await opensearch.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs)

Collaborator

Yes, that is the ternary operator referred to above. It will be more compact to encapsulate the predicate within a function. Also, it is probably cleaner to define the classes at the top level in client.py, import them and test directly, rather than using str():

opensearch.__class__ == PythonSyncClient

or something like that.
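
Encapsulating the predicate in a function and routing every client call through one helper could look roughly like this. It is a sketch under assumptions: PythonSyncClient and PythonAsyncClient are the names proposed in this review, defined here as toy stand-ins, and is_sync_client and call_client are hypothetical helpers.

```python
import asyncio


class PythonSyncClient:
    """Toy stand-in for a synchronous client class (assumption)."""

    def bulk(self, **kwargs):
        return {"mode": "sync", **kwargs}


class PythonAsyncClient:
    """Toy stand-in for an asynchronous client class (assumption)."""

    async def bulk(self, **kwargs):
        return {"mode": "async", **kwargs}


def is_sync_client(client):
    """Predicate tested on the class itself rather than on str(client)."""
    return isinstance(client, PythonSyncClient)


async def call_client(client, method_name, **kwargs):
    """Invoke client.<method_name>(**kwargs), awaiting only for async clients."""
    method = getattr(client, method_name)
    if is_sync_client(client):
        return method(**kwargs)
    return await method(**kwargs)
```

The runner body then needs only one line per call, for example `response = await call_client(opensearch, "bulk", params=bulk_params, **api_kwargs)`. Note that a synchronous call made inside a coroutine blocks the event loop for its duration.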

@@ -1449,7 +1449,17 @@ async def run(self):
def os_clients(all_hosts, all_client_options):
    opensearch = {}
    for cluster_name, cluster_hosts in all_hosts.items():
        opensearch[cluster_name] = client.OsClientFactory(cluster_hosts, all_client_options[cluster_name]).create_async()
        if "connection_class" in all_client_options["default"]:
Collaborator

Consider changing the user flag to backend-client or client, since this flag will continue to be used for future clients like a Rust client, C++ client, etc.

Contributor Author

I will rename the user flag to backend-client

Contributor Author

saimedhi commented Apr 5, 2024

Closing this PR. I will resubmit it once opensearch-py has service time metrics.

4 participants