diff --git a/.github/workflows/neurips23.yml b/.github/workflows/neurips23.yml
index 6b03422b..212653e6 100644
--- a/.github/workflows/neurips23.yml
+++ b/.github/workflows/neurips23.yml
@@ -63,6 +63,9 @@ jobs:
        - algorithm: mysteryann
          dataset: random-xs
          track: ood
+        - algorithm: cufe
+          dataset: random-xs
+          track: streaming
        - algorithm: sustech-whu
          dataset: sparse-small
          track: sparse
diff --git a/neurips23/streaming/cufe/Dockerfile b/neurips23/streaming/cufe/Dockerfile
new file mode 100644
index 00000000..245635fa
--- /dev/null
+++ b/neurips23/streaming/cufe/Dockerfile
@@ -0,0 +1,17 @@
+FROM neurips23
+
+RUN apt update
+RUN apt install -y software-properties-common
+RUN add-apt-repository -y ppa:git-core/ppa
+RUN apt update
+RUN DEBIAN_FRONTEND=noninteractive apt install -y git make cmake g++ libaio-dev libgoogle-perftools-dev libunwind-dev clang-format libboost-dev libboost-program-options-dev libmkl-full-dev libcpprest-dev python3.10
+
+# ADD the branch URL first so the layer cache is invalidated whenever the branch advances
+ADD "https://github.com/AbdelrahmanMohamed129/DiskANN/tree/farah" latest_commit
+RUN git clone https://github.com/AbdelrahmanMohamed129/DiskANN --branch farah
+WORKDIR /home/app/DiskANN
+RUN git pull
+RUN pip3 install virtualenv build
+RUN python3 -m build
+RUN pip install dist/diskannpy-0.5.0rc3.post1-cp310-cp310-linux_x86_64.whl
+WORKDIR /home/app
diff --git a/neurips23/streaming/cufe/config.yaml b/neurips23/streaming/cufe/config.yaml
new file mode 100644
index 00000000..41fcf0ae
--- /dev/null
+++ b/neurips23/streaming/cufe/config.yaml
@@ -0,0 +1,100 @@
+random-xs:
+  cufe:
+    docker-tag: neurips23-streaming-cufe
+    module: neurips23.streaming.cufe.diskann-str
+    constructor: diskannCUFE
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [{"R":32, "L":50, "insert_threads":16, "consolidate_threads":16}]
+        query-args: |
+          [{"Ls":50, "T":8}]
+random-xs-clustered:
+  cufe:
+    docker-tag: neurips23-streaming-cufe
+    module: neurips23.streaming.cufe.diskann-str
+    constructor: diskannCUFE
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [{"R":32, "L":50, "insert_threads":16, "consolidate_threads":16}]
+        query-args: |
+          [{"Ls":50, "T":8}]
+msspacev-1M:
+  cufe:
+    docker-tag: neurips23-streaming-cufe
+    module: neurips23.streaming.cufe.diskann-str
+    constructor: diskannCUFE
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [{"R":50, "L":50, "insert_threads":16, "consolidate_threads":16}]
+        query-args: |
+          [{"Ls":300, "T":16},
+           {"Ls":100, "T":16}]
+msturing-1M:
+  cufe:
+    docker-tag: neurips23-streaming-cufe
+    module: neurips23.streaming.cufe.diskann-str
+    constructor: diskannCUFE
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [{"R":50, "L":50, "insert_threads":16, "consolidate_threads":16}]
+        query-args: |
+          [{"Ls":300, "T":16},
+           {"Ls":100, "T":16}]
+msspacev-10M:
+  cufe:
+    docker-tag: neurips23-streaming-cufe
+    module: neurips23.streaming.cufe.diskann-str
+    constructor: diskannCUFE
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [{"R":64, "L":50, "insert_threads":16, "consolidate_threads":16}]
+        query-args: |
+          [{"Ls":100, "T":16}]
+msturing-10M:
+  cufe:
+    docker-tag: neurips23-streaming-cufe
+    module: neurips23.streaming.cufe.diskann-str
+    constructor: diskannCUFE
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [{"R":64, "L":50, "insert_threads":16, "consolidate_threads":16}]
+        query-args: |
+          [{"Ls":100, "T":16}]
+msturing-10M-clustered:
+  cufe:
+    docker-tag: neurips23-streaming-cufe
+    module: neurips23.streaming.cufe.diskann-str
+    constructor: diskannCUFE
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [{"R":64, "L":50, "insert_threads":16, "consolidate_threads":16}]
+        query-args: |
+          [{"Ls":100, "T":16}]
+msturing-30M-clustered:
+  cufe:
+    docker-tag: neurips23-streaming-cufe
+    module: neurips23.streaming.cufe.diskann-str
+    constructor: diskannCUFE
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [{"R":32, "L":50, "insert_threads":16, "consolidate_threads":16},
+           {"R":32, "L":70, "insert_threads":16, "consolidate_threads":16},
+           {"R":50, "L":50, "insert_threads":16, "consolidate_threads":16}]
+        query-args: |
+          [{"Ls":70, "T":16}]
diff --git a/neurips23/streaming/cufe/diskann-str.py b/neurips23/streaming/cufe/diskann-str.py
new file mode 100644
index 00000000..68d37ba7
--- /dev/null
+++ b/neurips23/streaming/cufe/diskann-str.py
@@ -0,0 +1,105 @@
+from __future__ import absolute_import
+import os
+import numpy as np
+
+import diskannpy
+
+from neurips23.streaming.base import BaseStreamingANN
+
+class diskannCUFE(BaseStreamingANN):
+    def __init__(self, metric, index_params):
+        self.name = "diskannCUFE"
+        if index_params.get("R") is None:
+            raise ValueError("missing parameter R")
+        if index_params.get("L") is None:
+            raise ValueError("missing parameter L")
+        self._index_params = index_params
+        self._metric = metric
+
+        self.R = index_params.get("R")
+        self.L = index_params.get("L")
+        self.insert_threads = index_params.get("insert_threads")
+        self.consolidate_threads = index_params.get("consolidate_threads")
+
+    def index_name(self):
+        return f"R{self.R}_L{self.L}"
+
+    def create_index_dir(self, dataset):
+        index_dir = os.path.join(os.getcwd(), "data", "indices", "streaming")
+        os.makedirs(index_dir, mode=0o777, exist_ok=True)
+        index_dir = os.path.join(index_dir, 'diskann')
+        os.makedirs(index_dir, mode=0o777, exist_ok=True)
+        index_dir = os.path.join(index_dir, dataset.short_name())
+        os.makedirs(index_dir, mode=0o777, exist_ok=True)
+        index_dir = os.path.join(index_dir, self.index_name())
+        os.makedirs(index_dir, mode=0o777, exist_ok=True)
+        return index_dir
+
+    def translate_dist_fn(self, metric):
+        if metric == 'euclidean':
+            return 'l2'
+        elif metric == 'ip':
+            return 'mips'
+        else:
+            raise Exception('Invalid metric')
+
+    def translate_dtype(self, dtype: str):
+        if dtype == 'uint8':
+            return np.uint8
+        elif dtype == 'int8':
+            return np.int8
+        elif dtype == 'float32':
+            return np.float32
+        else:
+            raise Exception('Invalid data type')
+
+    def setup(self, dtype, max_pts, ndim):
+        self.index = diskannpy.DynamicMemoryIndex(
+            distance_metric=self.translate_dist_fn(self._metric),
+            vector_dtype=self.translate_dtype(dtype),
+            max_vectors=max_pts,
+            dimensions=ndim,
+            graph_degree=self.R,
+            complexity=self.L,
+            num_threads=self.insert_threads,  # thread count for inserts; also used to allocate scratch space
+            initial_search_complexity=100
+        )
+        self.max_pts = max_pts
+        print('Index class constructed and ready for update/search')
+        self.active_indices = set()
+        self.num_unprocessed_deletes = 0
+
+    def insert(self, X, ids):
+        # ids are shifted by +1 on insert (and back by -1 after search) so that
+        # the reserved tag 0 is never handed to the index.
+        self.active_indices.update(ids + 1)
+        print('#active pts', len(self.active_indices), '#unprocessed deletes', self.num_unprocessed_deletes)
+        if len(self.active_indices) + self.num_unprocessed_deletes >= self.max_pts:
+            self.index.consolidate_delete()
+            self.num_unprocessed_deletes = 0
+
+        retvals = self.index.batch_insert(X, ids + 1)
+        if -1 in retvals:
+            print('insertion failed')
+            print('insertion return values', retvals)
+
+    def delete(self, ids):
+        # deletions are only marked here; consolidation happens lazily in insert()
+        for id in ids:
+            self.index.mark_deleted(id + 1)
+        self.active_indices.difference_update(ids + 1)
+        self.num_unprocessed_deletes += len(ids)
+
+    def query(self, X, k):
+        """Carry out a batch query for k-NN of query set X."""
+        self.res, self.query_dists = self.index.batch_search(
+            X, k, self.Ls, self.search_threads)
+        self.res = self.res - 1  # undo the +1 tag shift applied on insert
+
+    def set_query_arguments(self, query_args):
+        self._query_args = query_args
+        self.Ls = query_args.get("Ls", 0)
+        self.search_threads = query_args.get("T")
+
+    def __str__(self):
+        return f'diskann({self.index_name()}, {self._query_args})'
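
For reference, the sketch below drives the wrapper the same way the streaming harness does (setup, insert, delete, then query). It is an illustration only, not part of the patch: the sizes and parameter values are arbitrary, it assumes the diskannpy wheel built in the Dockerfile is installed, and it loads the hyphenated module name via importlib, mirroring how the harness resolves the "module" key in config.yaml.

```python
import importlib

import numpy as np

# "diskann-str" contains a hyphen, so a plain import statement cannot load it;
# importlib accepts the dotted name from config.yaml directly.
mod = importlib.import_module("neurips23.streaming.cufe.diskann-str")

algo = mod.diskannCUFE("euclidean", {"R": 32, "L": 50,
                                     "insert_threads": 16,
                                     "consolidate_threads": 16})
algo.setup(dtype="float32", max_pts=10000, ndim=64)

rng = np.random.default_rng(0)
X = rng.random((1000, 64), dtype=np.float32)   # arbitrary toy data
ids = np.arange(1000, dtype=np.uint32)

algo.insert(X, ids)                 # stored internally under tags ids + 1
algo.delete(ids[:100])              # marked now, consolidated on a later insert
algo.set_query_arguments({"Ls": 50, "T": 8})
algo.query(X[:10], k=10)            # 0-based neighbor ids land in algo.res
print("neighbor ids shape:", algo.res.shape)   # (10, 10)
```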