From 1c66410daa94ce715cd540bc788edc56691457c5 Mon Sep 17 00:00:00 2001
From: Martin Gaievski
Date: Thu, 3 Nov 2022 21:00:04 -0700
Subject: [PATCH] Refactor code for query flow

Signed-off-by: Martin Gaievski
---
 benchmarks/perf-tool/okpt/io/dataset.py       |   2 +-
 benchmarks/perf-tool/okpt/test/steps/steps.py | 234 +++++++-----------
 2 files changed, 92 insertions(+), 144 deletions(-)

diff --git a/benchmarks/perf-tool/okpt/io/dataset.py b/benchmarks/perf-tool/okpt/io/dataset.py
index 046b6eded..001563bab 100644
--- a/benchmarks/perf-tool/okpt/io/dataset.py
+++ b/benchmarks/perf-tool/okpt/io/dataset.py
@@ -94,7 +94,7 @@ def _parse_context(context: Context, custom_context=None) -> str:
         return "neighbors"
 
     if context == Context.INDEX:
-        return "neighbors"
+        return "train"
 
     if context == Context.QUERY:
         return "test"
diff --git a/benchmarks/perf-tool/okpt/test/steps/steps.py b/benchmarks/perf-tool/okpt/test/steps/steps.py
index d62f2858a..14c345a9d 100644
--- a/benchmarks/perf-tool/okpt/test/steps/steps.py
+++ b/benchmarks/perf-tool/okpt/test/steps/steps.py
@@ -9,6 +9,7 @@
 so the profiling decorators aren't needed for some functions.
 """
 import json
+from abc import abstractmethod
 from typing import Any, Dict, List
 
 import numpy as np
@@ -18,7 +19,8 @@ from opensearchpy import OpenSearch, RequestsHttpConnection
 
 from okpt.io.config.parsers.base import ConfigurationError
-from okpt.io.config.parsers.util import parse_string_param, parse_int_param, parse_dataset, parse_bool_param, parse_list_param
+from okpt.io.config.parsers.util import parse_string_param, parse_int_param, parse_dataset, parse_bool_param, \
+    parse_list_param
 from okpt.io.dataset import Context
 from okpt.io.utils.reader import parse_json_from_path
 from okpt.test.steps import base
@@ -372,7 +374,8 @@ def action(doc_id):
             partition = self.dataset.read(self.bulk_size)
             if partition is None:
                 break
-            body = bulk_transform_with_attributes(partition, partition_attr, self.field_name, self.attributes_dataset_name, action, i, self.attribute_spec)
+            body = bulk_transform_with_attributes(partition, partition_attr, self.field_name,
+                                                  self.attributes_dataset_name, action, i, self.attribute_spec)
             bulk_index(self.opensearch, self.index_name, body)
 
         self.dataset.reset()
@@ -383,11 +386,9 @@ def _get_measures(self) -> List[str]:
         return ['took']
 
 
-class QueryStep(OpenSearchStep):
+class BaseQueryStep(OpenSearchStep):
     """See base class."""
 
-    label = 'query'
-
     def __init__(self, step_config: StepConfig):
         super().__init__(step_config)
         self.k = parse_int_param('k', step_config.config, {}, 100)
@@ -410,29 +411,13 @@ def __init__(self, step_config: StepConfig):
                                             self.dataset.size())
         self.query_count = min(input_query_count, self.dataset.size())
 
-        neighbors_format = parse_string_param('neighbors_format',
-                                              step_config.config, {}, 'hdf5')
-        neighbors_path = parse_string_param('neighbors_path',
-                                            step_config.config, {}, None)
-        self.neighbors = parse_dataset(neighbors_format, neighbors_path,
-                                       Context.NEIGHBORS)
-        self.implicit_config = step_config.implicit_config
+        self.neighbors_format = parse_string_param('neighbors_format',
+                                                   step_config.config, {}, 'hdf5')
+        self.neighbors_path = parse_string_param('neighbors_path',
+                                                 step_config.config, {}, None)
 
     def _action(self):
-        def get_body(vec):
-            return {
-                'size': self.k,
-                'query': {
-                    'knn': {
-                        self.field_name: {
-                            'vector': vec,
-                            'k': self.k
-                        }
-                    }
-                }
-            }
-
         results = {}
         query_responses = []
         for _ in range(self.query_count):
             query = self.dataset.read(1)
             if query is None:
                 break
             query_responses.append(
                query_index(self.opensearch, self.index_name,
-                            get_body(query[0]), [self.field_name]))
+                            self.get_body(query[0]), [self.field_name]))
 
         results['took'] = [
             float(query_response['took']) for query_response in query_responses
@@ -471,153 +456,115 @@ def _get_measures(self) -> List[str]:
         return measures
 
 
-class QueryWithFilterStep(OpenSearchStep):
+    @abstractmethod
+    def get_body(self, vec):
+        pass
+
+
+class QueryStep(BaseQueryStep):
     """See base class."""
 
-    label = 'query_with_filter'
+    label = 'query'
 
     def __init__(self, step_config: StepConfig):
         super().__init__(step_config)
-        self.k = parse_int_param('k', step_config.config, {}, 100)
-        self.r = parse_int_param('r', step_config.config, {}, 1)
-        self.index_name = parse_string_param('index_name', step_config.config,
-                                             {}, None)
-        self.field_name = parse_string_param('field_name', step_config.config,
-                                             {}, None)
-        self.calculate_recall = parse_bool_param('calculate_recall',
-                                                 step_config.config, {}, False)
+        self.neighbors = parse_dataset(self.neighbors_format, self.neighbors_path,
+                                       Context.NEIGHBORS)
+        self.implicit_config = step_config.implicit_config
 
-        self.filter_spec = parse_string_param('filter_spec', step_config.config, {}, None)
+    def get_body(self, vec):
+        return {
+            'size': self.k,
+            'query': {
+                'knn': {
+                    self.field_name: {
+                        'vector': vec,
+                        'k': self.k
+                    }
+                }
+            }
+        }
 
-        dataset_format = parse_string_param('dataset_format',
-                                            step_config.config, {}, 'hdf5')
-        dataset_path = parse_string_param('dataset_path',
-                                          step_config.config, {}, None)
-        self.dataset = parse_dataset(dataset_format, dataset_path,
-                                     Context.QUERY)
-
-        input_query_count = parse_int_param('query_count',
-                                            step_config.config, {},
-                                            self.dataset.size())
-        self.query_count = min(input_query_count, self.dataset.size())
-
-        neighbors_format = parse_string_param('neighbors_format',
-                                              step_config.config, {}, 'hdf5')
-        neighbors_path = parse_string_param('neighbors_path',
-                                            step_config.config, {}, None)
+
+class QueryWithFilterStep(BaseQueryStep):
+    """See base class."""
+
+    label = 'query_with_filter'
+
+    def __init__(self, step_config: StepConfig):
+        super().__init__(step_config)
         neighbors_dataset = parse_string_param('neighbors_dataset',
-                                               step_config.config, {}, None)
+                                               step_config.config, {}, None)
 
-        self.neighbors = parse_dataset(neighbors_format, neighbors_path,
+        self.neighbors = parse_dataset(self.neighbors_format, self.neighbors_path,
                                        Context.CUSTOM, neighbors_dataset)
         self.filter_type = parse_string_param('filter_type', step_config.config, {}, 'SCRIPT')
+        self.filter_spec = parse_string_param('filter_spec', step_config.config, {}, None)
         self.score_script_similarity = parse_string_param('score_script_similarity', step_config.config, {}, 'l2')
         self.implicit_config = step_config.implicit_config
 
-    def _action(self):
-        def get_body_filter(vec):
-            filter_json = json.load(open(self.filter_spec))
-            if self.filter_type == 'FILTER':
-                return {
-                    'size': self.k,
-                    'query': {
-                        'knn': {
-                            self.field_name: {
-                                'vector': vec,
-                                'k': self.k,
-                                'filter': filter_json
-                            }
-                        }
-                    }
-                }
-            elif self.filter_type == 'SCRIPT':
-                return {
-                    'size': self.k,
-                    'query': {
-                        'script_score': {
-                            'query': {
-                                'bool': {
-                                    'filter': filter_json
-                                }
-                            },
-                            'script': {
-                                'source': 'knn_score',
-                                'lang': 'knn',
-                                'params': {
-                                    'field': self.field_name,
-                                    'query_value': vec,
-                                    'space_type': self.score_script_similarity
-                                }
-                            }
-                        }
-                    }
-                }
-            elif self.filter_type == 'BOOL_POST_FILTER':
-                return {
-                    'size': self.k,
-                    'query': {
-                        'bool': {
-                            'filter': filter_json,
-                            'must': [
-                                {
-                                    'knn': {
-                                        self.field_name: {
-                                            'vector': vec,
-                                            'k': self.k
-                                        }
-                                    }
-                                }
-                            ]
-                        }
-                    }
-                }
-            else:
-                raise ConfigurationError('Not supported filter type {}'.format(self.filter_type))
-
-        results = {}
-        query_responses = []
-
-        for i in range(self.query_count):
-            query = self.dataset.read(1)
-            if query is None:
-                break
-            body = get_body_filter(query[0])
-            query_responses.append(
-                query_index(self.opensearch, self.index_name,
-                            body, [self.field_name]))
-
-        results['took'] = [
-            float(query_response['took']) for query_response in query_responses
-        ]
-        results['memory_kb'] = get_cache_size_in_kb(self.endpoint, 80)
-
-        if self.calculate_recall:
-            ids = [[int(hit['_id'])
-                    for hit in query_response['hits']['hits']]
-                   for query_response in query_responses]
-            r_at_k = recall_at_r(ids, self.neighbors,
-                                 self.k, self.k, self.query_count)
-            results['recall@K'] = r_at_k
-            self.neighbors.reset()
-            r_at_r = recall_at_r(
-                ids, self.neighbors, self.r, self.k, self.query_count)
-            results[f'recall@{str(self.r)}'] = r_at_r
-            self.neighbors.reset()
-
-        self.dataset.reset()
-
-        return results
-
-    def _get_measures(self) -> List[str]:
-        measures = ['took', 'memory_kb']
-
-        if self.calculate_recall:
-            measures.extend(['recall@K', f'recall@{str(self.r)}'])
-
-        return measures
+    def get_body(self, vec):
+        # Use a context manager so the filter spec file handle is closed
+        # after each read instead of leaking one handle per query.
+        with open(self.filter_spec) as filter_spec_file:
+            filter_json = json.load(filter_spec_file)
+        if self.filter_type == 'FILTER':
+            return {
+                'size': self.k,
+                'query': {
+                    'knn': {
+                        self.field_name: {
+                            'vector': vec,
+                            'k': self.k,
+                            'filter': filter_json
+                        }
+                    }
+                }
+            }
+        elif self.filter_type == 'SCRIPT':
+            return {
+                'size': self.k,
+                'query': {
+                    'script_score': {
+                        'query': {
+                            'bool': {
+                                'filter': filter_json
+                            }
+                        },
+                        'script': {
+                            'source': 'knn_score',
+                            'lang': 'knn',
+                            'params': {
+                                'field': self.field_name,
+                                'query_value': vec,
+                                'space_type': self.score_script_similarity
+                            }
+                        }
+                    }
+                }
+            }
+        elif self.filter_type == 'BOOL_POST_FILTER':
+            return {
+                'size': self.k,
+                'query': {
+                    'bool': {
+                        'filter': filter_json,
+                        'must': [
+                            {
+                                'knn': {
+                                    self.field_name: {
+                                        'vector': vec,
+                                        'k': self.k
+                                    }
+                                }
+                            }
+                        ]
+                    }
+                }
+            }
+        else:
+            raise ConfigurationError('Unsupported filter type {}'.format(self.filter_type))
 
 
 # Helper functions - (AKA not steps)
 def bulk_transform(partition: np.ndarray, field_name: str, action,
@@ -640,6 +587,7 @@ def bulk_transform(partition: np.ndarray, field_name: str, action,
     actions[1::2] = [{field_name: vec} for vec in partition.tolist()]
     return actions
 
+
 def bulk_transform_with_attributes(partition: np.ndarray, partition_attr, field_name: str,
                                    attributes_dataset_name: str, action, offset: int, attributes_def) -> List[Dict[str, Any]]:
     """Partitions and transforms a list of vectors into OpenSearch's bulk
@@ -675,7 +623,7 @@ def bulk_transform_with_attributes(partition: np.ndarray, partition_attr, field_
             elif attr_def_type == 'int':
                 val = int(partition_attr[attr_idx][attr_def_idx].decode())
             actions[idx][attr_def_name] = val
-        idx+=2
+        idx += 2
 
     return actions
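
Reviewer note, not part of the commit: the net effect of this refactor is a
template-method split. BaseQueryStep now owns the query loop, timing, and
recall bookkeeping in _action() and _get_measures(), while each concrete step
only decides how the request body is built by overriding the abstract
get_body(). The sketch below distills that shape into a self-contained,
runnable toy; SketchBaseQueryStep, _fake_search(), and the sample data are
illustrative stand-ins for the OpenSearchStep machinery (query_index() and
friends), not perf-tool APIs.

    from abc import ABC, abstractmethod
    from typing import Any, Dict, List


    class SketchBaseQueryStep(ABC):
        """Owns the query loop, like BaseQueryStep._action() in the patch."""

        def __init__(self, k: int, field_name: str, queries: List[List[float]]):
            self.k = k
            self.field_name = field_name
            self.queries = queries

        def run(self) -> List[Dict[str, Any]]:
            # The loop never builds a request body itself; subclasses do.
            return [self._fake_search(self.get_body(q)) for q in self.queries]

        @staticmethod
        def _fake_search(body: Dict[str, Any]) -> Dict[str, Any]:
            # Stand-in for query_index(): echo the body instead of searching.
            return {'took': 1, 'body': body}

        @abstractmethod
        def get_body(self, vec: List[float]) -> Dict[str, Any]:
            """Build the OpenSearch request body for one query vector."""


    class SketchQueryStep(SketchBaseQueryStep):
        """Plain k-NN body, mirroring QueryStep.get_body()."""

        def get_body(self, vec: List[float]) -> Dict[str, Any]:
            return {'size': self.k,
                    'query': {'knn': {self.field_name: {'vector': vec,
                                                        'k': self.k}}}}


    class SketchQueryWithFilterStep(SketchBaseQueryStep):
        """Filtered k-NN body, mirroring QueryWithFilterStep.get_body()."""

        def __init__(self, k: int, field_name: str,
                     queries: List[List[float]], filter_json: Dict[str, Any]):
            super().__init__(k, field_name, queries)
            self.filter_json = filter_json

        def get_body(self, vec: List[float]) -> Dict[str, Any]:
            return {'size': self.k,
                    'query': {'knn': {self.field_name: {
                        'vector': vec, 'k': self.k,
                        'filter': self.filter_json}}}}


    if __name__ == '__main__':
        plain = SketchQueryStep(k=10, field_name='target_field',
                                queries=[[0.1, 0.2]])
        filtered = SketchQueryWithFilterStep(
            k=10, field_name='target_field', queries=[[0.1, 0.2]],
            filter_json={'term': {'color': 'red'}})
        print(plain.run())
        print(filtered.run())

Under this split, adding another query variant means one more subclass that
overrides get_body(), rather than re-copying the loop and recall logic the
way the old QueryWithFilterStep did.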