Improved Exact Search to return only K results and added client side latency metric for query Benchmarks #933

Merged
11 changes: 9 additions & 2 deletions benchmarks/perf-tool/okpt/test/steps/steps.py
@@ -454,6 +454,9 @@ def _action(self):
         results['took'] = [
             float(query_response['took']) for query_response in query_responses
         ]
+        results['client_time'] = [
+            float(query_response['client_time']) for query_response in query_responses
+        ]
         results['memory_kb'] = get_cache_size_in_kb(self.endpoint, self.port)

         if self.calculate_recall:
@@ -472,7 +475,7 @@ def _action(self):
         return results

     def _get_measures(self) -> List[str]:
-        measures = ['took', 'memory_kb']
+        measures = ['took', 'memory_kb', 'client_time']

Collaborator: Could be nice to have a unit as well, e.g. client_time_millis or client_time_seconds.

Collaborator: Also, can we use a constant?

Collaborator Author: I don't think a constant makes much sense here; these are benchmark scripts, so there isn't a big advantage. As for the time unit, we don't include one for the other metrics, so for consistency I'd leave the name as is. Everything is already in milliseconds.

Member: As long as it's the same unit as took, that's fine.

Collaborator Author: It's the same unit.

         if self.calculate_recall:
             measures.extend(['recall@K', f'recall@{str(self.r)}'])
@@ -783,9 +786,13 @@ def get_cache_size_in_kb(endpoint, port):

 def query_index(opensearch: OpenSearch, index_name: str, body: dict,
                 excluded_fields: list):
-    return opensearch.search(index=index_name,
+    start_time = round(time.time() * 1000)
+    queryResponse = opensearch.search(index=index_name,
                              body=body,
                              _source_excludes=excluded_fields)
+    end_time = round(time.time() * 1000)
+    queryResponse['client_time'] = end_time - start_time
+    return queryResponse


def bulk_index(opensearch: OpenSearch, index_name: str, body: List):
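
For reference, took is the query latency reported by the OpenSearch server inside the response body, while the new client_time is the wall-clock round trip observed by the benchmark script, so it also includes client-side serialization and network overhead. A minimal, self-contained sketch of the same timing pattern (the fake_search stand-in below is hypothetical and only for illustration; it is not part of the perf-tool):

import time

def timed_query(search_fn, **kwargs):
    # Wrap any search callable and record wall-clock latency in milliseconds,
    # mirroring how query_index wraps opensearch.search in the diff above.
    start_time = round(time.time() * 1000)
    response = search_fn(**kwargs)
    end_time = round(time.time() * 1000)
    response['client_time'] = end_time - start_time
    return response

def fake_search(index, body):
    # Hypothetical stand-in for opensearch.search, used only for illustration.
    time.sleep(0.02)  # pretend the round trip takes ~20 ms
    return {'took': 15, 'hits': {'hits': []}}

result = timed_query(fake_search, index='target_index', body={'size': 10})
print(result['took'], result['client_time'])  # server-reported vs. client-observed time
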
38 changes: 31 additions & 7 deletions src/main/java/org/opensearch/knn/index/query/KNNWeight.java
@@ -9,6 +9,8 @@
 import org.apache.lucene.index.BinaryDocValues;
 import org.apache.lucene.index.DocValues;
 import org.apache.lucene.search.FilteredDocIdSetIterator;
+import org.apache.lucene.search.HitQueue;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.util.BitSet;
 import org.apache.lucene.util.BitSetIterator;
 import org.apache.lucene.util.Bits;
@@ -291,18 +293,40 @@ private Map<Integer, Float> doExactSearch(final LeafReaderContext leafReaderCont
         try {
             final BinaryDocValues values = DocValues.getBinary(leafReaderContext.reader(), fieldInfo.name);
             final SpaceType spaceType = SpaceType.getSpace(fieldInfo.getAttribute(SPACE_TYPE));
+
+            // Creating min heap and init with MAX DocID and Score as -INF.
+            final HitQueue queue = new HitQueue(this.knnQuery.getK(), true);
+            ScoreDoc topDoc = queue.top();
             final Map<Integer, Float> docToScore = new HashMap<>();
-            for (int j : filterIdsArray) {
-                int docId = values.advance(j);
-                BytesRef value = values.binaryValue();
-                ByteArrayInputStream byteStream = new ByteArrayInputStream(value.bytes, value.offset, value.length);
+            for (int filterId : filterIdsArray) {
+                int docId = values.advance(filterId);
+                final BytesRef value = values.binaryValue();
+                final ByteArrayInputStream byteStream = new ByteArrayInputStream(value.bytes, value.offset,
+                    value.length);
                 final KNNVectorSerializer vectorSerializer = KNNVectorSerializerFactory.getSerializerByStreamContent(byteStream);
                 final float[] vector = vectorSerializer.byteToFloatArray(byteStream);
-                // making min score as high score as this is closest to the vector
+                // Calculates a similarity score between the two vectors with a specified function. Higher similarity
+                // scores correspond to closer vectors.
                 float score = spaceType.getVectorSimilarityFunction().compare(queryVector, vector);
-                docToScore.put(docId, score);
+                if (score > topDoc.score) {
+                    topDoc.score = score;

Collaborator: I don't understand this logic. You are updating topDoc with a new score and a new docId, which means the queue will replace the current top document with the new one. Shouldn't we instead remove the bottom doc and add the new topDoc to the queue?

Collaborator Author: This is a min heap, not a max heap, and we pre-populate it: the init sets every slot to DocId MAX_DOC_ID and score -INF. So every time we update the top, the element that surfaces at the top is one with a -INF score, or the worst score we have seen so far. I will add comments around this.

Collaborator: Got it.

Member: Ah, I see. So we basically need to keep track of the worst result we have come across so far, so that we know which one to replace.

Collaborator Author: Yes, that is correct. This is what a min heap does for you.

+                    topDoc.doc = docId;
+                    // As the HitQueue is a min heap, updating the top will bring the doc with a -INF score, or the
+                    // worst score we have seen so far, to the top.
+                    topDoc = queue.updateTop();
+                }
             }
+            // If scores are negative we will remove them. There can be negative values in the heap because we
+            // init the heap with score -INF; if filterIds < k, some entries in the heap keep that negative score.
+            while (queue.size() > 0 && queue.top().score < 0) {

Collaborator: If queue.top()'s score is negative, doesn't that mean every score in the queue is negative? Then we could simply empty it instead of looping through it.

Collaborator Author: The heap is initialized with every entry scored -INF, so when filterIds < k only some entries in the heap still carry that -INF sentinel score while the rest are real results. We need to remove just those sentinel entries, not empty the whole queue.

+                queue.pop();
+            }
+
+            while (queue.size() > 0) {
+                final ScoreDoc doc = queue.pop();
+                docToScore.put(doc.doc, doc.score);
+            }
+
             return docToScore;
         } catch (Exception e) {
             log.error("Error while getting the doc values to do the k-NN Search for query : {}", this.knnQuery);
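
Putting the exact-search change together: instead of writing a score for every filtered document into the result map, the code now keeps only the k best candidates in a min-heap that is pre-populated with k sentinel entries, then discards whatever sentinels survive when there are fewer than k candidates. Below is a rough sketch of that selection pattern in Python using heapq; the function name and the (score, doc_id) tuple layout are my own, and this illustrates the idea rather than the Lucene HitQueue API:

import heapq

def exact_search_top_k(candidates, k):
    # candidates: iterable of (doc_id, score) pairs; returns {doc_id: score}
    # for the k highest-scoring documents.
    # Pre-populate a min-heap with k sentinels (score -inf, invalid doc id),
    # playing the same role as HitQueue(k, true) in the Java code above.
    heap = [(float('-inf'), -1)] * k
    heapq.heapify(heap)

    for doc_id, score in candidates:
        # heap[0] is the worst entry kept so far (or a sentinel); replace it
        # whenever a better candidate shows up.
        if score > heap[0][0]:
            heapq.heapreplace(heap, (score, doc_id))

    # Drop surviving sentinels (there are some whenever fewer than k
    # candidates were scored), then emit doc_id -> score for the real results.
    return {doc_id: score for score, doc_id in heap if score != float('-inf')}

# Example: three candidates with k = 5 -- only the real documents come back.
print(exact_search_top_k([(1, 0.9), (2, 0.3), (7, 0.7)], k=5))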