[META] Protobuf for Search API #10684

Open
4 tasks
VachaShah opened this issue Oct 18, 2023 · 22 comments

Labels
Build Libraries & Interfaces · enhancement · Meta · Roadmap:Cost/Performance/Scale

Comments

@VachaShah (Collaborator) commented Oct 18, 2023

Proposal

The experiment using protobuf in the _cat/nodes API (see #6844 (comment)) showed a 15-30% improvement depending on the size of the cluster. Building on that, we can assess the benefits of protobuf for the Search API in terms of serialization and de-serialization of requests and responses between nodes and at the REST level.

Next Steps

Run a similar experiment for the Search API to understand the performance improvements from using protobuf. (This is a work in progress.)

Details

In order to experiment and see incremental benefits, the classes I am targeting for conversion to protobuf include: SearchRequest, SearchResponse, SearchPhaseResult, FetchSearchResult, ShardFetchRequest, ShardSearchRequest, QueryFetchSearchResult, QuerySearchRequest and QuerySearchResult. Classes related to TransportSearchAction are also required to support protobuf requests and responses.

Code changes

The code changes for the experiment are being added in a branch on my fork: https://github.com/VachaShah/OpenSearch/compare/poc-cat-nodes-protobuf...VachaShah:OpenSearch:poc-search-protobuf?expand=1

Sub tasks/Milestones

Next steps as discussed with @msfroh and @getsaurabh02. Let me know if I missed something here.

  • Convert QuerySearchResult, FetchSearchResult and QueryFetchSearchResult into proto messages and add support for communicating these responses between nodes as protobuf bytes. This includes adding protobuf support in OutboundHandler and InboundHandler. Deliver this as an experimental feature behind a feature flag which, when enabled, allows node-to-node communication using protobuf (a rough sketch follows below this list).
  • Add support for aggregations (in QuerySearchResult), NestedIdentity (in SearchHit) and other optional fields in SearchHit as protobuf messages.
  • Repeat the same for other search results and requests.

Note: All of the above changes go behind an experimental feature flag. Once the incremental changes are in for the Search API:

  • Add backwards compatibility for the node-to-node communication protocols with version checks during node-to-node communication and remove the experimental feature flag.
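
As a rough illustration of the flag-gated serialization path described above (a sketch under assumptions, not the actual implementation; the class, method and flag names below are hypothetical, and the real integration would live in OutboundHandler/InboundHandler):

import java.io.IOException;

// Hypothetical sketch: choose between protobuf and the existing native (Writeable)
// serialization for a query-phase result, based on an experimental feature flag.
abstract class QueryPhaseResultSerializer<T> {

    // Hypothetical flag name; the actual experimental setting may differ.
    static final String SEARCH_PROTOBUF_FEATURE_FLAG = "opensearch.experimental.feature.search.protobuf.enabled";

    private final boolean protobufEnabled;

    QueryPhaseResultSerializer(boolean protobufEnabled) {
        this.protobufEnabled = protobufEnabled;
    }

    // Protobuf encoding, e.g. a generated QuerySearchResultProto's toByteArray().
    protected abstract byte[] writeProtobuf(T result) throws IOException;

    // Existing native encoding via Writeable/StreamOutput.
    protected abstract byte[] writeNative(T result) throws IOException;

    final byte[] serialize(T result) throws IOException {
        // When the flag is enabled, node-to-node responses are sent as protobuf
        // bytes; otherwise the current native protocol is used unchanged.
        return protobufEnabled ? writeProtobuf(result) : writeNative(result);
    }
}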

Related

@VachaShah added the enhancement and untriaged labels Oct 18, 2023
@VachaShah self-assigned this Oct 18, 2023
@navneet1v (Contributor) commented:

Would like to +1 this feature. This will be a great win, especially for vector search, where the search payload increases with the dimension of the vectors.

@saratvemulapalli (Member) commented:

> Would like to +1 this feature. This will be a great win, especially for vector search, where the search payload increases with the dimension of the vectors.

@navneet1v is there data we could look at to see where the bottleneck is for vector search?

@navneet1v (Contributor) commented:

@saratvemulapalli I don't have data available, because we remove the _source field while running the benchmarks. But what I can do is run a small benchmark and provide you that info. Will that work?

@saratvemulapalli (Member) commented:

> @saratvemulapalli I don't have data available, because we remove the _source field while running the benchmarks. But what I can do is run a small benchmark and provide you that info. Will that work?

Yeah, what we'd like to know is whether serialization/de-serialization is causing performance latency, and whether that happens during the query phase or the fetch phase, etc. This will help us narrow down which area to work on first.
The idea so far (theoretically) is that we will work on the fetch phase, where responses are serialized on the data nodes and de-serialized on the coordinator, which should see the best benefits from protobuf.

@navneet1v (Contributor) commented:

For vector search it will be the fetch phase only, because that is where a vector with, say, 100 dimensions gets serialized and de-serialized per document. Since every float is represented as 4 bytes, that is about 400 bytes per document for the vectors alone.

And the same payload gets transported over the wire to customers too. Hence, once we start to make the change it will help a lot.
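
To put rough numbers on that (an illustrative back-of-the-envelope calculation, not a measurement): with 100-dimensional float vectors and size: 100, each hit carries about 100 × 4 = 400 bytes of vector data, so a single shard's fetch response holds on the order of 100 × 400 ≈ 40 KB of raw vectors before any other fields, and the coordinator has to deserialize that from every shard it queries.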

> The idea so far (theoretically) is that we will work on the fetch phase, where responses are serialized on the data nodes and de-serialized on the coordinator, which should see the best benefits from protobuf.

+1 on this.

On the benchmarks, I will see what I can provide, but I can surely help set up the benchmark code so that you can run it against different custom OpenSearch builds to get more numbers if you want.

@VachaShah (Collaborator, Author) commented:

That would be super helpful @navneet1v!

@navneet1v (Contributor) commented:

@saratvemulapalli, @VachaShah here is a benchmarking notebook which you can use to test k-NN with any OpenSearch cluster with security enabled.

https://github.com/navneet1v/OpenSearchVectorDB/blob/main/benchmarking/sift-128/sift-128-benchmarking.ipynb

This is something I was working on. It replicates how we do perf testing in k-NN. The good thing about it is that it is easy to run.

import time                    # needed for the latency measurement below
import numpy as np
from tqdm.notebook import tqdm

# `X_TEST` (the query vectors), `client` (the OpenSearch client) and
# `vector_index_name` are defined in earlier cells of the notebook.

# search in the index
def searchQueryGen(input_array=X_TEST):
    for i, vec in enumerate(input_array):
        yield {
            "_source": False, # Don't get the source as this impacts latency
            "size": 100,
            "query": {
                "knn": {
                    "vec": {
                        "vector": vec.tolist(),
                        "k": 100
                    }
                }
            }
        }


neighbors_lists = []
search_latency = []
took_time = []
for query in tqdm(searchQueryGen(), total=len(X_TEST)):
    start = time.time()
    search_response = client.search(body=query, index=vector_index_name, _source=False, docvalue_fields=["_id"], stored_fields="_none_")
    end = time.time()
    search_latency.append(end - start)          # client-observed latency
    took_time.append(search_response["took"])   # server-reported took time
    search_hits = search_response['hits']['hits']
    search_neighbors = [int(hit["fields"]["_id"][0]) for hit in search_hits]
    neighbors_lists.append(search_neighbors)

You can remove all the optimizations that we have added:

  1. Not fetching _source for documents.
  2. Not fetching stored fields.
  3. Getting _id as a doc value rather than from a stored field.

Please let me know if you need any more details; happy to help.

@VachaShah moved this from Todo to In Progress in Performance Roadmap Oct 30, 2023
@getsaurabh02 added the v2.12.0 label Oct 30, 2023
@getsaurabh02 (Member) commented:

This is super exciting @VachaShah!!
Would you like to share insights on which transport classes are good candidates for trying out and experimenting with the improvements first? Do you think instrumenting the node-to-node interaction for the Query and Fetch phases, especially targeting implementations of SearchPhaseResult, could be a good idea?

This could include heavily exercised code paths during query execution such as QuerySearchResult, FetchSearchResult, ScrollQuerySearchResult and more.

@VachaShah (Collaborator, Author) commented Nov 1, 2023

Thank you @getsaurabh02! I have added the classes that I am targeting first in the issue description. They include the Query and Fetch phase implementations of SearchPhaseResult and other related request, response and transport action classes.

@VachaShah (Collaborator, Author) commented:

The code for an end-to-end working POC with a _search_protobuf API, a version of the _search API whose requests, responses and node-to-node communication use protobuf, is built on top of the _cat/nodes API POC. The code can be found in this diff: https://github.com/VachaShah/OpenSearch/compare/poc-cat-nodes-protobuf...VachaShah:OpenSearch:poc-search-protobuf?expand=1.

The current POC covers the QUERY_THEN_FETCH search type, with some embedded objects in the response carried as bytes. The next step is to convert those embedded objects into proto messages as well.
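
For illustration only (the generated classes SearchHitProto and NestedIdentityProto are hypothetical names, not the ones in the POC branch), the difference between the current embedded-bytes approach and a fully structured message might look like this:

import com.google.protobuf.ByteString;

// Hypothetical generated classes, shown only to illustrate the migration from an
// opaque bytes field to a structured nested proto message.
class EmbeddedObjectSketch {

    // Current POC: the embedded object is the native serialization wrapped in a
    // protobuf `bytes` field, so the receiver still needs the native deserializer.
    SearchHitProto asEmbeddedBytes(byte[] nativeNestedIdentityBytes) {
        return SearchHitProto.newBuilder()
            .setNestedIdentityBytes(ByteString.copyFrom(nativeNestedIdentityBytes))
            .build();
    }

    // Next step: the embedded object becomes its own proto message, so the whole
    // response can be decoded with protobuf alone.
    SearchHitProto asNestedMessage() {
        return SearchHitProto.newBuilder()
            .setNestedIdentity(NestedIdentityProto.newBuilder()
                .setField("comments")   // illustrative values
                .setOffset(3)
                .build())
            .build();
    }
}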

I am going to micro-benchmark the protobuf-integrated API to compare it with the original search API.

Next steps

  • Convert the embedded objects in responses to proto messages.
  • Micro benchmarking for numbers.

@hdhalter commented:

@VachaShah - Will this require documentation? If so, can you please create a doc issue, let me know who will be submitting the doc PR, and add it to the unified tracker project? Thank you!

@VachaShah (Collaborator, Author) commented:

> @VachaShah - Will this require documentation? If so, can you please create a doc issue, let me know who will be submitting the doc PR, and add it to the unified tracker project? Thank you!

@hdhalter This does not require documentation as of now since it is a performance improvement. I am going to divide this meta issue into sub tasks and if any of them have a need for documentation, I will make sure to create a doc issue for those.

@VachaShah (Collaborator, Author) commented:

In order to divide this issue into deliverables, I am converting it into a meta issue. Sub tasks will be listed in the issue description.

@VachaShah changed the title from [Proposal] Protobuf for Search API to [META] Protobuf for Search API Dec 14, 2023
@VachaShah added the Meta label Dec 14, 2023
@saratvemulapalli (Member) commented:

Thanks @VachaShah for breaking it down.
Tagging @dbwiddis who is interested in contributing.

@VachaShah (Collaborator, Author) commented Jan 3, 2024

Benchmarks

The benchmarks were taken using opensearch-benchmark (OSB) for both the original search API and the protobuf version of the API, using the default search operations in the workload. The workload used is nyc_taxis.

5-node cluster

Seeing a 19.1% decrease in latency for default search with protobuf integration.
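
(For reference, this figure appears to be derived from the 100th percentile latency of the default operation in the tables below: (17.6042 − 14.2405) / 17.6042 ≈ 19.1%.)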

Original API

Metric Task Value Unit
Cumulative indexing time of primary shards 0.00248333 min
Min cumulative indexing time across primary shards 0.00248333 min
Median cumulative indexing time across primary shards 0.00248333 min
Max cumulative indexing time across primary shards 0.00248333 min
Cumulative indexing throttle time of primary shards 0 min
Min cumulative indexing throttle time across primary shards 0 min
Median cumulative indexing throttle time across primary shards 0 min
Max cumulative indexing throttle time across primary shards 0 min
Cumulative merge time of primary shards 0 min
Cumulative merge count of primary shards 0
Min cumulative merge time across primary shards 0 min
Median cumulative merge time across primary shards 0 min
Max cumulative merge time across primary shards 0 min
Cumulative merge throttle time of primary shards 0 min
Min cumulative merge throttle time across primary shards 0 min
Median cumulative merge throttle time across primary shards 0 min
Max cumulative merge throttle time across primary shards 0 min
Cumulative refresh time of primary shards 0.00153333 min
Cumulative refresh count of primary shards 5
Min cumulative refresh time across primary shards 0.00153333 min
Median cumulative refresh time across primary shards 0.00153333 min
Max cumulative refresh time across primary shards 0.00153333 min
Cumulative flush time of primary shards 0 min
Cumulative flush count of primary shards 0
Min cumulative flush time across primary shards 0 min
Median cumulative flush time across primary shards 0 min
Max cumulative flush time across primary shards 0 min
Total Young Gen GC time 0.008 s
Total Young Gen GC count 1
Total Old Gen GC time 0 s
Total Old Gen GC count 0
Store size 0.000252848 GB
Translog size 5.12227e-08 GB
Heap used for segments 0 MB
Heap used for doc values 0 MB
Heap used for terms 0 MB
Heap used for norms 0 MB
Heap used for points 0 MB
Heap used for stored fields 0 MB
Segment count 8
Min Throughput index 26349.6 docs/s
Mean Throughput index 26349.6 docs/s
Median Throughput index 26349.6 docs/s
Max Throughput index 26349.6 docs/s
50th percentile latency index 31.8154 ms
100th percentile latency index 38.6998 ms
50th percentile service time index 31.8154 ms
100th percentile service time index 38.6998 ms
error rate index 0 %
Min Throughput wait-until-merges-finish 163.69 ops/s
Mean Throughput wait-until-merges-finish 163.69 ops/s
Median Throughput wait-until-merges-finish 163.69 ops/s
Max Throughput wait-until-merges-finish 163.69 ops/s
100th percentile latency wait-until-merges-finish 5.62406 ms
100th percentile service time wait-until-merges-finish 5.62406 ms
error rate wait-until-merges-finish 0 %
Min Throughput default 83.27 ops/s
Mean Throughput default 83.27 ops/s
Median Throughput default 83.27 ops/s
Max Throughput default 83.27 ops/s
100th percentile latency default 17.6042 ms
100th percentile service time default 5.37617 ms
error rate default 0 %
Min Throughput range 112.38 ops/s
Mean Throughput range 112.38 ops/s
Median Throughput range 112.38 ops/s
Max Throughput range 112.38 ops/s
100th percentile latency range 14.9054 ms
100th percentile service time range 5.7996 ms
error rate range 0 %

API with Protobuf

Metric Task Value Unit
Cumulative indexing time of primary shards 0.00171667 min
Min cumulative indexing time across primary shards 0.00171667 min
Median cumulative indexing time across primary shards 0.00171667 min
Max cumulative indexing time across primary shards 0.00171667 min
Cumulative indexing throttle time of primary shards 0 min
Min cumulative indexing throttle time across primary shards 0 min
Median cumulative indexing throttle time across primary shards 0 min
Max cumulative indexing throttle time across primary shards 0 min
Cumulative merge time of primary shards 0 min
Cumulative merge count of primary shards 0
Min cumulative merge time across primary shards 0 min
Median cumulative merge time across primary shards 0 min
Max cumulative merge time across primary shards 0 min
Cumulative merge throttle time of primary shards 0 min
Min cumulative merge throttle time across primary shards 0 min
Median cumulative merge throttle time across primary shards 0 min
Max cumulative merge throttle time across primary shards 0 min
Cumulative refresh time of primary shards 0.001 min
Cumulative refresh count of primary shards 5
Min cumulative refresh time across primary shards 0.001 min
Median cumulative refresh time across primary shards 0.001 min
Max cumulative refresh time across primary shards 0.001 min
Cumulative flush time of primary shards 0 min
Cumulative flush count of primary shards 0
Min cumulative flush time across primary shards 0 min
Median cumulative flush time across primary shards 0 min
Max cumulative flush time across primary shards 0 min
Total Young Gen GC time 0.004 s
Total Young Gen GC count 1
Total Old Gen GC time 0 s
Total Old Gen GC count 0
Store size 0.000253422 GB
Translog size 5.12227e-08 GB
Heap used for segments 0 MB
Heap used for doc values 0 MB
Heap used for terms 0 MB
Heap used for norms 0 MB
Heap used for points 0 MB
Heap used for stored fields 0 MB
Segment count 8
Min Throughput index 31452.4 docs/s
Mean Throughput index 31452.4 docs/s
Median Throughput index 31452.4 docs/s
Max Throughput index 31452.4 docs/s
50th percentile latency index 25.6299 ms
100th percentile latency index 26.2214 ms
50th percentile service time index 25.6299 ms
100th percentile service time index 26.2214 ms
error rate index 0 %
Min Throughput wait-until-merges-finish 292.51 ops/s
Mean Throughput wait-until-merges-finish 292.51 ops/s
Median Throughput wait-until-merges-finish 292.51 ops/s
Max Throughput wait-until-merges-finish 292.51 ops/s
100th percentile latency wait-until-merges-finish 3.01014 ms
100th percentile service time wait-until-merges-finish 3.01014 ms
error rate wait-until-merges-finish 0 %
Min Throughput default 110.58 ops/s
Mean Throughput default 110.58 ops/s
Median Throughput default 110.58 ops/s
Max Throughput default 110.58 ops/s
100th percentile latency default 14.2405 ms
100th percentile service time default 3.97145 ms
error rate default 0 %
Min Throughput range 142.97 ops/s
Mean Throughput range 142.97 ops/s
Median Throughput range 142.97 ops/s
Max Throughput range 142.97 ops/s
100th percentile latency range 11.715 ms
100th percentile service time range 4.50792 ms
error rate range 0 %

10-node cluster

Seeing a 23.03% decrease in latency for default search with protobuf integration.

Original API

Metric Task Value Unit
Cumulative indexing time of primary shards 0.00198333 min
Min cumulative indexing time across primary shards 0.00198333 min
Median cumulative indexing time across primary shards 0.00198333 min
Max cumulative indexing time across primary shards 0.00198333 min
Cumulative indexing throttle time of primary shards 0 min
Min cumulative indexing throttle time across primary shards 0 min
Median cumulative indexing throttle time across primary shards 0 min
Max cumulative indexing throttle time across primary shards 0 min
Cumulative merge time of primary shards 0 min
Cumulative merge count of primary shards 0
Min cumulative merge time across primary shards 0 min
Median cumulative merge time across primary shards 0 min
Max cumulative merge time across primary shards 0 min
Cumulative merge throttle time of primary shards 0 min
Min cumulative merge throttle time across primary shards 0 min
Median cumulative merge throttle time across primary shards 0 min
Max cumulative merge throttle time across primary shards 0 min
Cumulative refresh time of primary shards 0.00126667 min
Cumulative refresh count of primary shards 5
Min cumulative refresh time across primary shards 0.00126667 min
Median cumulative refresh time across primary shards 0.00126667 min
Max cumulative refresh time across primary shards 0.00126667 min
Cumulative flush time of primary shards 0 min
Cumulative flush count of primary shards 0
Min cumulative flush time across primary shards 0 min
Median cumulative flush time across primary shards 0 min
Max cumulative flush time across primary shards 0 min
Total Young Gen GC time 0.007 s
Total Young Gen GC count 1
Total Old Gen GC time 0 s
Total Old Gen GC count 0
Store size 0.000223138 GB
Translog size 5.12227e-08 GB
Heap used for segments 0 MB
Heap used for doc values 0 MB
Heap used for terms 0 MB
Heap used for norms 0 MB
Heap used for points 0 MB
Heap used for stored fields 0 MB
Segment count 5
Min Throughput index 24834.7 docs/s
Mean Throughput index 24834.7 docs/s
Median Throughput index 24834.7 docs/s
Max Throughput index 24834.7 docs/s
50th percentile latency index 41.3685 ms
100th percentile latency index 45.1308 ms
50th percentile service time index 41.3685 ms
100th percentile service time index 45.1308 ms
error rate index 0 %
Min Throughput wait-until-merges-finish 167.27 ops/s
Mean Throughput wait-until-merges-finish 167.27 ops/s
Median Throughput wait-until-merges-finish 167.27 ops/s
Max Throughput wait-until-merges-finish 167.27 ops/s
100th percentile latency wait-until-merges-finish 5.51836 ms
100th percentile service time wait-until-merges-finish 5.51836 ms
error rate wait-until-merges-finish 0 %
Min Throughput default 90.29 ops/s
Mean Throughput default 90.29 ops/s
Median Throughput default 90.29 ops/s
Max Throughput default 90.29 ops/s
100th percentile latency default 18.8456 ms
100th percentile service time default 6.55805 ms
error rate default 0 %
Min Throughput range 100.27 ops/s
Mean Throughput range 100.27 ops/s
Median Throughput range 100.27 ops/s
Max Throughput range 100.27 ops/s
100th percentile latency range 19.1147 ms
100th percentile service time range 8.94176 ms
error rate range 0 %

API with Protobuf

Metric Task Value Unit
Cumulative indexing time of primary shards 0.00223333 min
Min cumulative indexing time across primary shards 0.00223333 min
Median cumulative indexing time across primary shards 0.00223333 min
Max cumulative indexing time across primary shards 0.00223333 min
Cumulative indexing throttle time of primary shards 0 min
Min cumulative indexing throttle time across primary shards 0 min
Median cumulative indexing throttle time across primary shards 0 min
Max cumulative indexing throttle time across primary shards 0 min
Cumulative merge time of primary shards 0 min
Cumulative merge count of primary shards 0
Min cumulative merge time across primary shards 0 min
Median cumulative merge time across primary shards 0 min
Max cumulative merge time across primary shards 0 min
Cumulative merge throttle time of primary shards 0 min
Min cumulative merge throttle time across primary shards 0 min
Median cumulative merge throttle time across primary shards 0 min
Max cumulative merge throttle time across primary shards 0 min
Cumulative refresh time of primary shards 0.00111667 min
Cumulative refresh count of primary shards 5
Min cumulative refresh time across primary shards 0.00111667 min
Median cumulative refresh time across primary shards 0.00111667 min
Max cumulative refresh time across primary shards 0.00111667 min
Cumulative flush time of primary shards 0 min
Cumulative flush count of primary shards 0
Min cumulative flush time across primary shards 0 min
Median cumulative flush time across primary shards 0 min
Max cumulative flush time across primary shards 0 min
Total Young Gen GC time 0 s
Total Young Gen GC count 0
Total Old Gen GC time 0 s
Total Old Gen GC count 0
Store size 0.000251911 GB
Translog size 5.12227e-08 GB
Heap used for segments 0 MB
Heap used for doc values 0 MB
Heap used for terms 0 MB
Heap used for norms 0 MB
Heap used for points 0 MB
Heap used for stored fields 0 MB
Segment count 8
Min Throughput index 27541 docs/s
Mean Throughput index 27541 docs/s
Median Throughput index 27541 docs/s
Max Throughput index 27541 docs/s
50th percentile latency index 30.1443 ms
100th percentile latency index 31.1228 ms
50th percentile service time index 30.1443 ms
100th percentile service time index 31.1228 ms
error rate index 0 %
Min Throughput wait-until-merges-finish 221.57 ops/s
Mean Throughput wait-until-merges-finish 221.57 ops/s
Median Throughput wait-until-merges-finish 221.57 ops/s
Max Throughput wait-until-merges-finish 221.57 ops/s
100th percentile latency wait-until-merges-finish 3.96129 ms
100th percentile service time wait-until-merges-finish 3.96129 ms
error rate wait-until-merges-finish 0 %
Min Throughput default 138.62 ops/s
Mean Throughput default 138.62 ops/s
Median Throughput default 138.62 ops/s
Max Throughput default 138.62 ops/s
100th percentile latency default 15.7351 ms
100th percentile service time default 5.31692 ms
error rate default 0 %
Min Throughput range 117.22 ops/s
Mean Throughput range 117.22 ops/s
Median Throughput range 117.22 ops/s
Max Throughput range 117.22 ops/s
100th percentile latency range 15.5039 ms
100th percentile service time range 6.77485 ms
error rate range 0 %

@Bukhtawar (Collaborator) commented:

Just a quick note, please also benchmark/profile for CPU and JVM overhead reduction with the change during ser/de.

@backslasht (Contributor) commented:

@VachaShah - The 100th percentile is generally skewed; the 90th percentile is more reliable. Can you please share the numbers for the 90th and 99th percentiles? Also, are these single runs or an average of N runs?

@VachaShah (Collaborator, Author) commented:

> @VachaShah - The 100th percentile is generally skewed; the 90th percentile is more reliable. Can you please share the numbers for the 90th and 99th percentiles? Also, are these single runs or an average of N runs?

OSB runs 100 iterations for search, so this is the average over those 100 runs. OSB publishes the 100th percentile for the operations; I think the OSB code needs to be modified to get the other percentiles. @gkamat is there a way to customize this from the command line?

@VachaShah (Collaborator, Author) commented May 13, 2024

Update

With the discussion on using protobuf for API request/response and node-to-node communication in the transport layer, we have first taken up making the transport layer abstract enough to support multiple protocols for serialization and deserialization. This will decouple node-to-node communication in the transport layer from the current serialization mechanism (now referred to as the native protocol in the codebase).

After these changes, we will take up adding protobuf to the codebase for the search API in a way that the API request/response layer is not tightly coupled to the serialization mechanism (which is the case currently, for example in how Writeable is implemented by model classes and request/response classes).

  • Transport layer abstractions and decoupling
  • Introduction of protobuf for search API (WIP - might be divided into more PRs)
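
A minimal sketch of the kind of decoupling described above, under the assumption of a hypothetical codec interface (the actual abstraction introduced in the transport-layer PRs may be shaped differently):

import java.io.IOException;

// Hypothetical sketch: request/response classes no longer carry their own
// serialization (as they do today via Writeable); instead a codec per protocol is
// registered with the transport and selected per connection.
interface ProtocolCodec<T> {

    // A native implementation would delegate to the existing StreamOutput path;
    // a protobuf implementation would call the generated message's toByteArray().
    byte[] encode(T message) throws IOException;

    // A native implementation reads via StreamInput; a protobuf implementation
    // would use the generated message's parseFrom(bytes).
    T decode(byte[] bytes) throws IOException;
}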
