Refactor code for query flow
Signed-off-by: Martin Gaievski <[email protected]>
martin-gaievski committed Nov 4, 2022
1 parent bc09d85 commit 1c66410
Showing 2 changed files with 92 additions and 144 deletions.
benchmarks/perf-tool/okpt/io/dataset.py (2 changes: 1 addition & 1 deletion)
@@ -94,7 +94,7 @@ def _parse_context(context: Context, custom_context=None) -> str:
         return "neighbors"
 
     if context == Context.INDEX:
-        return "neighbors"
+        return "train"
 
     if context == Context.QUERY:
         return "test"
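Context for the one-line fix above: the perf tool reads datasets in the ann-benchmarks-style HDF5 layout, where vectors to be indexed live in the "train" group, query vectors in "test", and ground-truth neighbor ids in "neighbors". Returning "neighbors" for Context.INDEX meant the indexing path read the ground-truth ids instead of the train vectors. A minimal sketch of the mapping in use (the h5py calls and file name are illustrative assumptions, not code from this repo):

    import h5py

    # Hypothetical illustration of the group names _parse_context resolves to.
    with h5py.File('sift-128-euclidean.hdf5', 'r') as f:
        train_vectors = f['train']      # Context.INDEX -> "train" (after this fix)
        query_vectors = f['test']       # Context.QUERY -> "test"
        ground_truth = f['neighbors']   # Context.NEIGHBORS -> "neighbors"
        print(train_vectors.shape, query_vectors.shape, ground_truth.shape)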
benchmarks/perf-tool/okpt/test/steps/steps.py (234 changes: 91 additions & 143 deletions)
@@ -9,6 +9,7 @@
 so the profiling decorators aren't needed for some functions.
 """
 import json
+from abc import abstractmethod
 from typing import Any, Dict, List
 
 import numpy as np
Expand All @@ -18,7 +19,8 @@
from opensearchpy import OpenSearch, RequestsHttpConnection

from okpt.io.config.parsers.base import ConfigurationError
from okpt.io.config.parsers.util import parse_string_param, parse_int_param, parse_dataset, parse_bool_param, parse_list_param
from okpt.io.config.parsers.util import parse_string_param, parse_int_param, parse_dataset, parse_bool_param, \
parse_list_param
from okpt.io.dataset import Context
from okpt.io.utils.reader import parse_json_from_path
from okpt.test.steps import base
Expand Down Expand Up @@ -372,7 +374,8 @@ def action(doc_id):
partition = self.dataset.read(self.bulk_size)
if partition is None:
break
body = bulk_transform_with_attributes(partition, partition_attr, self.field_name, self.attributes_dataset_name, action, i, self.attribute_spec)
body = bulk_transform_with_attributes(partition, partition_attr, self.field_name,
self.attributes_dataset_name, action, i, self.attribute_spec)
bulk_index(self.opensearch, self.index_name, body)

self.dataset.reset()
@@ -383,11 +386,9 @@ def _get_measures(self) -> List[str]:
         return ['took']
 
 
-class QueryStep(OpenSearchStep):
+class BaseQueryStep(OpenSearchStep):
     """See base class."""
 
-    label = 'query'
-
     def __init__(self, step_config: StepConfig):
         super().__init__(step_config)
         self.k = parse_int_param('k', step_config.config, {}, 100)
@@ -410,29 +411,13 @@ def __init__(self, step_config: StepConfig):
                                             self.dataset.size())
         self.query_count = min(input_query_count, self.dataset.size())
 
-        neighbors_format = parse_string_param('neighbors_format',
-                                              step_config.config, {}, 'hdf5')
-        neighbors_path = parse_string_param('neighbors_path',
-                                            step_config.config, {}, None)
-        self.neighbors = parse_dataset(neighbors_format, neighbors_path,
-                                       Context.NEIGHBORS)
-        self.implicit_config = step_config.implicit_config
+        self.neighbors_format = parse_string_param('neighbors_format',
+                                                   step_config.config, {}, 'hdf5')
+        self.neighbors_path = parse_string_param('neighbors_path',
+                                                 step_config.config, {}, None)
 
     def _action(self):
 
-        def get_body(vec):
-            return {
-                'size': self.k,
-                'query': {
-                    'knn': {
-                        self.field_name: {
-                            'vector': vec,
-                            'k': self.k
-                        }
-                    }
-                }
-            }
-
         results = {}
         query_responses = []
         for _ in range(self.query_count):
@@ -441,7 +426,7 @@ def get_body(vec):
                 break
             query_responses.append(
                 query_index(self.opensearch, self.index_name,
-                            get_body(query[0]), [self.field_name]))
+                            self.get_body(query[0]) , [self.field_name]))
 
         results['took'] = [
             float(query_response['took']) for query_response in query_responses
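The hunks above are the core of the refactor: QueryStep becomes BaseQueryStep, which keeps the shared flow (parsing the query dataset, then an _action loop that reads one query vector at a time and sends it through query_index) and delegates only request construction to self.get_body(). This is the template method pattern. A standalone sketch of the same shape, with illustrative names (BaseStep, PlainKnnStep, and the field name are not from this commit):

    from abc import ABC, abstractmethod

    class BaseStep(ABC):
        # Owns the shared loop, analogous to BaseQueryStep._action.
        def run(self, vectors):
            return [self.get_body(vec) for vec in vectors]

        @abstractmethod
        def get_body(self, vec):
            ...

    class PlainKnnStep(BaseStep):
        # Analogous to QueryStep.get_body: a plain k-NN request body.
        def get_body(self, vec):
            return {'size': 10, 'query': {'knn': {'my_vec': {'vector': vec, 'k': 10}}}}

    print(PlainKnnStep().run([[0.1, 0.2], [0.3, 0.4]]))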
@@ -471,153 +456,115 @@ def _get_measures(self) -> List[str]:
 
         return measures
 
-class QueryWithFilterStep(OpenSearchStep):
+    @abstractmethod
+    def get_body(self, vec):
+        pass
+
+
+class QueryStep(BaseQueryStep):
     """See base class."""
 
-    label = 'query_with_filter'
+    label = 'query'
 
     def __init__(self, step_config: StepConfig):
         super().__init__(step_config)
-        self.k = parse_int_param('k', step_config.config, {}, 100)
-        self.r = parse_int_param('r', step_config.config, {}, 1)
-        self.index_name = parse_string_param('index_name', step_config.config,
-                                             {}, None)
-        self.field_name = parse_string_param('field_name', step_config.config,
-                                             {}, None)
-        self.calculate_recall = parse_bool_param('calculate_recall',
-                                                 step_config.config, {}, False)
+        self.neighbors = parse_dataset(self.neighbors_format, self.neighbors_path,
+                                       Context.NEIGHBORS)
+        self.implicit_config = step_config.implicit_config
 
-        self.filter_spec = parse_string_param('filter_spec', step_config.config, {}, None)
+    def get_body(self, vec):
+        return {
+            'size': self.k,
+            'query': {
+                'knn': {
+                    self.field_name: {
+                        'vector': vec,
+                        'k': self.k
+                    }
+                }
+            }
+        }
 
-        dataset_format = parse_string_param('dataset_format',
-                                            step_config.config, {}, 'hdf5')
-        dataset_path = parse_string_param('dataset_path',
-                                          step_config.config, {}, None)
-        self.dataset = parse_dataset(dataset_format, dataset_path,
-                                     Context.QUERY)
 
-        input_query_count = parse_int_param('query_count',
-                                            step_config.config, {},
-                                            self.dataset.size())
-        self.query_count = min(input_query_count, self.dataset.size())
+class QueryWithFilterStep(BaseQueryStep):
+    """See base class."""
 
-        neighbors_format = parse_string_param('neighbors_format',
-                                              step_config.config, {}, 'hdf5')
-        neighbors_path = parse_string_param('neighbors_path',
-                                            step_config.config, {}, None)
+    label = 'query_with_filter'
+
+    def __init__(self, step_config: StepConfig):
+        super().__init__(step_config)
 
         neighbors_dataset = parse_string_param('neighbors_dataset',
-                                                step_config.config, {}, None)
+                                               step_config.config, {}, None)
 
-        self.neighbors = parse_dataset(neighbors_format, neighbors_path,
+        self.neighbors = parse_dataset(self.neighbors_format, self.neighbors_path,
                                        Context.CUSTOM, neighbors_dataset)
 
         self.filter_type = parse_string_param('filter_type', step_config.config, {}, 'SCRIPT')
+        self.filter_spec = parse_string_param('filter_spec', step_config.config, {}, None)
         self.score_script_similarity = parse_string_param('score_script_similarity', step_config.config, {}, 'l2')
 
         self.implicit_config = step_config.implicit_config
 
-    def _action(self):
-        def get_body_filter(vec):
-            filter_json = json.load(open(self.filter_spec))
-            if self.filter_type == 'FILTER':
-                return {
-                    'size': self.k,
-                    'query': {
-                        'knn': {
-                            self.field_name: {
-                                'vector': vec,
-                                'k': self.k,
-                                'filter': filter_json
-                            }
-                        }
-                    }
-                }
-            elif self.filter_type == 'SCRIPT':
-                return {
-                    'size': self.k,
-                    'query': {
-                        'script_score': {
-                            'query': {
-                                'bool': {
-                                    'filter': filter_json
-                                }
-                            },
-                            'script': {
-                                'source': 'knn_score',
-                                'lang': 'knn',
-                                'params': {
-                                    'field': self.field_name,
-                                    'query_value': vec,
-                                    'space_type': self.score_script_similarity
-                                }
-                            }
-                        }
-                    }
-                }
-            elif self.filter_type == 'BOOL_POST_FILTER':
-                return {
-                    'size': self.k,
-                    'query': {
-                        'bool': {
-                            'filter': filter_json,
-                            'must': [
-                                {
-                                    'knn': {
-                                        self.field_name: {
-                                            'vector': vec,
-                                            'k': self.k
-                                        }
-                                    }
-                                }
-                            ]
-                        }
-                    }
-                }
-            else:
-                raise ConfigurationError('Not supported filter type {}'.format(self.filter_type))
-
-        results = {}
-        query_responses = []
-
-        for i in range(self.query_count):
-            query = self.dataset.read(1)
-            if query is None:
-                break
-            body = get_body_filter(query[0])
-            query_responses.append(
-                query_index(self.opensearch, self.index_name,
-                            body, [self.field_name]))
-
-        results['took'] = [
-            float(query_response['took']) for query_response in query_responses
-        ]
-        results['memory_kb'] = get_cache_size_in_kb(self.endpoint, 80)
-
-        if self.calculate_recall:
-            ids = [[int(hit['_id'])
-                    for hit in query_response['hits']['hits']]
-                   for query_response in query_responses]
-            r_at_k = recall_at_r(ids, self.neighbors,
-                                 self.k, self.k, self.query_count)
-            results['recall@K'] = r_at_k
-            self.neighbors.reset()
-            r_at_r = recall_at_r(
-                ids, self.neighbors, self.r, self.k, self.query_count)
-            results[f'recall@{str(self.r)}'] = r_at_r
-            self.neighbors.reset()
-
-        self.dataset.reset()
-
-        return results
-
-    def _get_measures(self) -> List[str]:
-        measures = ['took', 'memory_kb']
-
-        if self.calculate_recall:
-            measures.extend(['recall@K', f'recall@{str(self.r)}'])
-
-        return measures
+    def get_body(self, vec):
+        filter_json = json.load(open(self.filter_spec))
+        if self.filter_type == 'FILTER':
+            return {
+                'size': self.k,
+                'query': {
+                    'knn': {
+                        self.field_name: {
+                            'vector': vec,
+                            'k': self.k,
+                            'filter': filter_json
+                        }
+                    }
+                }
+            }
+        elif self.filter_type == 'SCRIPT':
+            return {
+                'size': self.k,
+                'query': {
+                    'script_score': {
+                        'query': {
+                            'bool': {
+                                'filter': filter_json
+                            }
+                        },
+                        'script': {
+                            'source': 'knn_score',
+                            'lang': 'knn',
+                            'params': {
+                                'field': self.field_name,
+                                'query_value': vec,
+                                'space_type': self.score_script_similarity
+                            }
+                        }
+                    }
+                }
+            }
+        elif self.filter_type == 'BOOL_POST_FILTER':
+            return {
+                'size': self.k,
+                'query': {
+                    'bool': {
+                        'filter': filter_json,
+                        'must': [
+                            {
+                                'knn': {
+                                    self.field_name: {
+                                        'vector': vec,
+                                        'k': self.k
+                                    }
+                                }
+                            }
+                        ]
+                    }
+                }
+            }
+        else:
+            raise ConfigurationError('Not supported filter type {}'.format(self.filter_type))
 
 # Helper functions - (AKA not steps)
 def bulk_transform(partition: np.ndarray, field_name: str, action,
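To make the three filter_type branches above concrete: filter_spec points at a JSON file whose contents are spliced into the request. A hypothetical spec and the body the FILTER branch would build from it (file contents, field name, and vector are invented for illustration):

    # Contents of a hypothetical filter spec file, e.g. color_filter.json:
    #   {"term": {"color": "red"}}
    #
    # With filter_type='FILTER', k=10 and field_name='target_field',
    # get_body(vec) returns an OpenSearch request body like:
    body = {
        'size': 10,
        'query': {
            'knn': {
                'target_field': {
                    'vector': [0.1, 0.2, 0.3],  # the query vector
                    'k': 10,
                    'filter': {'term': {'color': 'red'}}
                }
            }
        }
    }

The SCRIPT branch instead wraps the filter in a script_score query scored by the plugin's knn_score script (lang 'knn'), and BOOL_POST_FILTER combines the filter with the knn clause inside a bool query.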
@@ -640,6 +587,7 @@ def bulk_transform(partition: np.ndarray, field_name: str, action,
     actions[1::2] = [{field_name: vec} for vec in partition.tolist()]
     return actions
 
+
 def bulk_transform_with_attributes(partition: np.ndarray, partition_attr, field_name: str, attributes_dataset_name: str,
                                    action, offset: int, attributes_def) -> List[Dict[str, Any]]:
     """Partitions and transforms a list of vectors into OpenSearch's bulk
@@ -675,7 +623,7 @@ def bulk_transform_with_attributes(partition: np.ndarray, partition_attr, field_
         elif attr_def_type == 'int':
             val = int(partition_attr[attr_idx][attr_def_idx].decode())
             actions[idx][attr_def_name] = val
-        idx+=2
+        idx += 2
 
     return actions
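Background on the helper touched here: OpenSearch's _bulk API expects action metadata and document source as alternating entries, which is why actions[0::2] holds the action dicts, actions[1::2] holds the documents, and idx advances by 2 per vector while attribute values are attached to every second entry. A sketch of the output shape (index name, field, and attribute values are illustrative):

    # Alternating action/document pairs, as consumed by bulk_index:
    actions = [
        {'index': {'_index': 'target_index', '_id': 0}},  # action line
        {'target_field': [0.1, 0.2], 'color': 'red'},     # document with attribute
        {'index': {'_index': 'target_index', '_id': 1}},
        {'target_field': [0.3, 0.4], 'color': 'blue'},
    ]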
