Add OpenSearch Benchmark index workload for k-NN #364

Merged · 23 commits · Apr 25, 2022
399 changes: 399 additions & 0 deletions benchmarks/osb/README.md

Large diffs are not rendered by default.

Empty file added benchmarks/osb/__init__.py
Empty file.
176 changes: 176 additions & 0 deletions benchmarks/osb/extensions/data_set.py
@@ -0,0 +1,176 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

import os
import numpy as np
from abc import ABC, ABCMeta, abstractmethod
from enum import Enum
from typing import cast
import h5py
import struct


class Context(Enum):
"""DataSet context enum. Can be used to add additional context for how a
data-set should be interpreted.
"""
INDEX = 1
QUERY = 2
NEIGHBORS = 3


class DataSet(ABC):
"""DataSet interface. Used for reading data-sets from files.

Methods:
read: Read a chunk of data from the data-set
seek: Get to position in the data-set
size: Gets the number of items in the data-set
reset: Resets internal state of data-set to beginning
"""
__metaclass__ = ABCMeta

@abstractmethod
def read(self, chunk_size: int):
pass

@abstractmethod
def seek(self, offset: int):
pass

@abstractmethod
def size(self):
pass

@abstractmethod
def reset(self):
pass


class HDF5DataSet(DataSet):
""" Data-set format corresponding to `ANN Benchmarks
    <https://github.com/erikbern/ann-benchmarks#data-sets>`_
"""

def __init__(self, dataset_path: str, context: Context):
file = h5py.File(dataset_path)
self.data = cast(h5py.Dataset, file[self._parse_context(context)])
self.current = 0

def read(self, chunk_size: int):
if self.current >= self.size():
return None

end_i = self.current + chunk_size
if end_i > self.size():
end_i = self.size()

v = cast(np.ndarray, self.data[self.current:end_i])
self.current = end_i
return v

def seek(self, offset: int):
if offset >= self.size():
raise Exception("Offset is greater than the size")

self.current = offset

def size(self):
return self.data.len()

def reset(self):
self.current = 0

@staticmethod
def _parse_context(context: Context) -> str:
if context == Context.NEIGHBORS:
return "neighbors"

if context == Context.INDEX:
return "train"

if context == Context.QUERY:
return "test"

raise Exception("Unsupported context")


class BigANNVectorDataSet(DataSet):
""" Data-set format for vector data-sets for `Big ANN Benchmarks
<https://big-ann-benchmarks.com/index.html#bench-datasets>`_
"""

def __init__(self, dataset_path: str):
self.file = open(dataset_path, 'rb')
self.file.seek(0, os.SEEK_END)
num_bytes = self.file.tell()
self.file.seek(0)

if num_bytes < 8:
raise Exception("File is invalid")

self.num_points = int.from_bytes(self.file.read(4), "little")
self.dimension = int.from_bytes(self.file.read(4), "little")
        # Store the element size so seek() can compute byte offsets
        self.bytes_per_num = self._get_data_size(dataset_path)

        if (num_bytes - 8) != self.num_points * self.dimension * self.bytes_per_num:
            raise Exception("File is invalid")

self.reader = self._value_reader(dataset_path)
self.current = 0

def read(self, chunk_size: int):
if self.current >= self.size():
return None

end_i = self.current + chunk_size
if end_i > self.size():
end_i = self.size()

v = np.asarray([self._read_vector() for _ in
range(end_i - self.current)])
self.current = end_i
return v

    def seek(self, offset: int):
        if offset >= self.size():
            raise Exception("Offset is greater than the size")

        # Convert the vector offset into a byte offset: skip the 8-byte
        # header plus offset vectors of dimension values each
        self.file.seek(8 + offset * self.dimension * self.bytes_per_num)
        self.current = offset

def _read_vector(self):
return np.asarray([self.reader(self.file) for _ in
range(self.dimension)])

def size(self):
return self.num_points

def reset(self):
self.file.seek(8) # Seek to 8 bytes to skip re-reading metadata
self.current = 0

@staticmethod
def _get_data_size(file_name):
ext = file_name.split('.')[-1]
if ext == "u8bin":
return 1

if ext == "fbin":
return 4

raise Exception("Unknown extension")

@staticmethod
def _value_reader(file_name):
ext = file_name.split('.')[-1]
if ext == "u8bin":
return lambda file: float(int.from_bytes(file.read(1), "little"))

if ext == "fbin":
        return lambda file: struct.unpack('<f', file.read(4))[0]  # unpack returns a tuple; take the float

raise Exception("Unknown extension")
79 changes: 79 additions & 0 deletions benchmarks/osb/extensions/param_sources.py
@@ -0,0 +1,79 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
import copy

from .data_set import Context, HDF5DataSet, DataSet, BigANNVectorDataSet
from .util import bulk_transform, parse_string_parameter, parse_int_parameter, \
ConfigurationError


def register(registry):
registry.register_param_source(
"bulk-from-data-set", BulkVectorsFromDataSetParamSource
)


class BulkVectorsFromDataSetParamSource:
def __init__(self, workload, params, **kwargs):
self.data_set_format = parse_string_parameter("data_set_format", params)
self.data_set_path = parse_string_parameter("data_set_path", params)
self.data_set: DataSet = self._read_data_set()

self.field_name: str = parse_string_parameter("field", params)
self.index_name: str = parse_string_parameter("index", params)
self.bulk_size: int = parse_int_parameter("bulk_size", params)
self.retries: int = parse_int_parameter("retries", params, 10)
self.num_vectors: int = parse_int_parameter(
"num_vectors", params, self.data_set.size()
)
self.total = self.num_vectors
self.current = 0
self.infinite = False
self.percent_completed = 0
self.offset = 0

def _read_data_set(self):
if self.data_set_format == "hdf5":
data_set = HDF5DataSet(self.data_set_path, Context.INDEX)
elif self.data_set_format == "bigann":
data_set = BigANNVectorDataSet(self.data_set_path)
else:
raise ConfigurationError("Invalid data set format")
return data_set

def partition(self, partition_index, total_partitions):
if self.data_set.size() % total_partitions != 0:
raise Exception("Data set must be divisible by number of clients")

partition_x = copy.copy(self)
partition_x.num_vectors = int(self.num_vectors / total_partitions)
partition_x.offset = int(partition_index * partition_x.num_vectors)

# We need to create a new instance of the data set for each client
partition_x.data_set = partition_x._read_data_set()
partition_x.data_set.seek(partition_x.offset)
partition_x.current = partition_x.offset
return partition_x
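    # Worked example of the partitioning above (illustrative numbers): with
    # num_vectors = 1_000_000 and total_partitions = 4, each partition holds
    # 250_000 vectors; the client with partition_index = 2 seeks to offset
    # 500_000 and stops once current reaches 750_000.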

def params(self):

if self.current >= self.num_vectors + self.offset:
raise StopIteration

def action(doc_id):
return {'index': {'_index': self.index_name, '_id': doc_id}}

partition = self.data_set.read(self.bulk_size)
body = bulk_transform(partition, self.field_name, action, self.current)
size = int(len(body) / 2)
self.current += size
self.percent_completed = float(self.current)/self.total

return {
"body": body,
"retries": self.retries,
"size": size
}
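
For orientation, the parameter names below are the ones this param source parses above; the values are illustrative only. A bulk-vector operation backed by this source would receive something equivalent to:

    params = {
        "data_set_format": "hdf5",                        # or "bigann"
        "data_set_path": "/tmp/sift-128-euclidean.hdf5",  # hypothetical path
        "field": "target_field",
        "index": "target_index",
        "bulk_size": 500,
        "retries": 10,            # optional, defaults to 10
        # "num_vectors": 100000,  # optional, defaults to the data set size
    }

Each call to params() then yields a bulk body of bulk_size action/document pairs together with the retry count and size, which the custom-vector-bulk runner below consumes.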
13 changes: 13 additions & 0 deletions benchmarks/osb/extensions/registry.py
@@ -0,0 +1,13 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

from .param_sources import register as param_sources_register
from .runners import register as runners_register


def register(registry):
param_sources_register(registry)
runners_register(registry)
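
OpenSearch Benchmark discovers these extensions through a register(registry) hook in the workload module; a workload.py that wires this registry in might look roughly like the sketch below (the actual file is not shown in this diff, and the exact import form depends on how OSB loads the workload):

    # benchmarks/osb/workload.py -- illustrative sketch only
    from extensions.registry import register as custom_register

    def register(registry):
        custom_register(registry)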
118 changes: 118 additions & 0 deletions benchmarks/osb/extensions/runners.py
@@ -0,0 +1,118 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

from .util import parse_int_parameter, parse_string_parameter
import time


def register(registry):
registry.register_runner(
"custom-vector-bulk", BulkVectorsFromDataSetRunner(), async_runner=True
)
registry.register_runner(
"custom-refresh", CustomRefreshRunner(), async_runner=True
)
registry.register_runner(
"train-model", TrainModelRunner(), async_runner=True
)
registry.register_runner(
"delete-model", DeleteModelRunner(), async_runner=True
)


class BulkVectorsFromDataSetRunner:

async def __call__(self, opensearch, params):
size = parse_int_parameter("size", params)
retries = parse_int_parameter("retries", params, 0) + 1

for _ in range(retries):
try:
await opensearch.bulk(
body=params["body"],
timeout='5m'
)

return size, "docs"
except:
pass

raise TimeoutError('Failed to submit bulk request in specified number '
'of retries')

def __repr__(self, *args, **kwargs):
return "custom-vector-bulk"


class CustomRefreshRunner:

async def __call__(self, opensearch, params):
retries = parse_int_parameter("retries", params, 0) + 1

for _ in range(retries):
try:
await opensearch.indices.refresh(
index=parse_string_parameter("index", params)
)

return
except:
pass
Member:
Shall we capture the error and log it? Also, we should know whether it is a transient error or an actual error; we should only retry transient errors.

Member Author:
Sure, I will catch the connection timeout error and log a message with it.
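
A minimal sketch of what that could look like inside the retry loop above, assuming opensearchpy exposes a ConnectionTimeout exception and using the standard logging module (illustrative only, not the code in this PR):

    import logging
    from opensearchpy.exceptions import ConnectionTimeout

    logger = logging.getLogger(__name__)

    for _ in range(retries):
        try:
            await opensearch.indices.refresh(
                index=parse_string_parameter("index", params)
            )
            return
        except ConnectionTimeout as e:
            # Transient failure: log it and retry
            logger.warning("Refresh timed out, retrying: %s", e)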


raise TimeoutError('Failed to refresh the index in specified number '
'of retries')

def __repr__(self, *args, **kwargs):
return "custom-refresh"


class TrainModelRunner:

async def __call__(self, opensearch, params):
        # Train a model and wait for its training to complete
body = params["body"]
timeout = parse_int_parameter("timeout", params)
Member:
This looks more like a retry than a timeout. If we want a timeout, we should start a timer and check that its value is less than the timeout before iterating.

Member Author:
Not sure I understand the proposal. I set it as a timeout because the call returns before training finishes, so once the call is submitted, I check every second whether the model state is "created".

Member (@VijayanB, Apr 21, 2022):
At line 73, 1 second has elapsed, but you are not accounting for the number of seconds it takes to execute lines 74 through 83; I believe more than a second is spent inside the loop. First, I would check whether it is possible to pass a timeout to the request itself and wait for the response (or a timeout exception from the API). If that is not possible, we need a stop watch at line 71 and a check that the stop watch has not reached the expected number of seconds as the loop condition.

Member Author:
I think the request itself isn't timing out; we are basically waiting on training to finish. I can set up the stop watch in the loop.
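
One way to implement the stop watch mentioned in the last reply is to track a deadline with time.monotonic(), so that the time spent on the GET request and response handling counts toward the timeout, and to yield to the event loop with asyncio.sleep between polls. A rough sketch of how the polling loop below could be restructured (timeout, opensearch, model_uri, and model_id are the values defined in this runner; illustrative only):

    import asyncio
    import time

    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        model_response = await opensearch.transport.perform_request("GET", model_uri)
        state = model_response.get('state')
        if state == 'created':
            return 1, "models_trained"
        if state == 'failed':
            raise Exception("Failed to create model: {}".format(model_response))
        await asyncio.sleep(1)  # poll roughly once per second
    raise TimeoutError('Failed to create model: {}'.format(model_id))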

model_id = parse_string_parameter("model_id", params)

method = "POST"
model_uri = "/_plugins/_knn/models/{}".format(model_id)
await opensearch.transport.perform_request(method, "{}/_train".format(model_uri), body=body)
i = 0
while i < timeout:
time.sleep(1)
model_response = await opensearch.transport.perform_request("GET", model_uri)

if 'state' not in model_response.keys():
continue

if model_response['state'] == 'created':
#TODO: Return model size as well
return 1, "models_trained"

if model_response['state'] == 'failed':
                raise Exception("Failed to create model: {}".format(model_response))

i += 1

raise TimeoutError('Failed to create model: {}'.format(model_id))

def __repr__(self, *args, **kwargs):
return "train-model"


class DeleteModelRunner:

async def __call__(self, opensearch, params):
# Delete model provided by model id
method = "DELETE"
model_id = parse_string_parameter("model_id", params)
uri = "/_plugins/_knn/models/{}".format(model_id)

        # Ignore if the model doesn't exist
await opensearch.transport.perform_request(method, uri, params={"ignore": [400, 404]})

def __repr__(self, *args, **kwargs):
return "delete-model"