Streaming/cufe (#211)
* init

* config yaml update

* workflow

* CI tests

* Removing our CI

---------

Co-authored-by: Abdelrahman Ezzat <[email protected]>
Co-authored-by: Ubuntu <[email protected]>
3 people authored Nov 6, 2023
1 parent 9b14ca8 commit f799bc8
Showing 4 changed files with 226 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/neurips23.yml
@@ -63,6 +63,9 @@ jobs:
          - algorithm: mysteryann
            dataset: random-xs
            track: ood
          - algorithm: cufe
            dataset: random-xs
            track: streaming
          - algorithm: sustech-whu
            dataset: sparse-small
            track: sparse
16 changes: 16 additions & 0 deletions neurips23/streaming/cufe/Dockerfile
@@ -0,0 +1,16 @@
FROM neurips23

RUN apt update
RUN apt install -y software-properties-common
RUN add-apt-repository -y ppa:git-core/ppa
RUN apt update
RUN DEBIAN_FRONTEND=noninteractive apt install -y git make cmake g++ libaio-dev libgoogle-perftools-dev libunwind-dev clang-format libboost-dev libboost-program-options-dev libmkl-full-dev libcpprest-dev python3.10

ADD "https://github.com/AbdelrahmanMohamed129/DiskANN/tree/farah" latest_commit
RUN git clone https://github.com/AbdelrahmanMohamed129/DiskANN --branch farah
WORKDIR /home/app/DiskANN
RUN git pull
RUN pip3 install virtualenv build
RUN python3 -m build
RUN pip install dist/diskannpy-0.5.0rc3.post1-cp310-cp310-linux_x86_64.whl
WORKDIR /home/app
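
As a quick post-build sanity check, one can confirm that the wheel built above actually installed; a minimal sketch, not part of the commit, assuming only a python3 interpreter inside the image (the expected version string comes from the wheel filename):

from importlib.metadata import version

import diskannpy

# Hypothetical check: the wheel filename implies this version, and
# diskann-str.py relies on the DynamicMemoryIndex class being present.
print(version("diskannpy"))  # expected: 0.5.0rc3.post1
assert hasattr(diskannpy, "DynamicMemoryIndex")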
100 changes: 100 additions & 0 deletions neurips23/streaming/cufe/config.yaml
@@ -0,0 +1,100 @@
random-xs:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":32, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":50, "T":8}]
random-xs-clustered:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":32, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":50, "T":8}]
msspacev-1M:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":50, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":300, "T":16},
           {"Ls":100, "T":16}]
msturing-1M:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":50, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":300, "T":16},
           {"Ls":100, "T":16}]
msspacev-10M:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":64, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":100, "T":16}]
msturing-10M:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":64, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":100, "T":16}]
msturing-10M-clustered:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":64, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":100, "T":16}]
msturing-30M-clustered:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":32, "L":50, "insert_threads":16, "consolidate_threads":16},
           {"R":32, "L":70, "insert_threads":16, "consolidate_threads":16},
           {"R":50, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":70, "T":16}]
107 changes: 107 additions & 0 deletions neurips23/streaming/cufe/diskann-str.py
@@ -0,0 +1,107 @@
from __future__ import absolute_import
import os

import numpy as np

import diskannpy

from neurips23.streaming.base import BaseStreamingANN

class diskannCUFE(BaseStreamingANN):
    def __init__(self, metric, index_params):
        self.name = "diskannCUFE"
        if index_params.get("R") is None:
            raise ValueError("missing parameter R")
        if index_params.get("L") is None:
            raise ValueError("missing parameter L")
        self._index_params = index_params
        self._metric = metric

        self.R = index_params.get("R")
        self.L = index_params.get("L")
        self.insert_threads = index_params.get("insert_threads")
        self.consolidate_threads = index_params.get("consolidate_threads")

    def index_name(self):
        return f"R{self.R}_L{self.L}"

    def create_index_dir(self, dataset):
        index_dir = os.path.join(os.getcwd(), "data", "indices", "streaming")
        os.makedirs(index_dir, mode=0o777, exist_ok=True)
        index_dir = os.path.join(index_dir, 'diskann')
        os.makedirs(index_dir, mode=0o777, exist_ok=True)
        index_dir = os.path.join(index_dir, dataset.short_name())
        os.makedirs(index_dir, mode=0o777, exist_ok=True)
        index_dir = os.path.join(index_dir, self.index_name())
        os.makedirs(index_dir, mode=0o777, exist_ok=True)
        return index_dir

    def translate_dist_fn(self, metric):
        if metric == 'euclidean':
            return 'l2'
        elif metric == 'ip':
            return 'mips'
        else:
            raise Exception('Invalid metric')

    def translate_dtype(self, dtype: str):
        if dtype == 'uint8':
            return np.uint8
        elif dtype == 'int8':
            return np.int8
        elif dtype == 'float32':
            return np.float32
        else:
            raise Exception('Invalid data type')

    def setup(self, dtype, max_pts, ndim):
        self.index = diskannpy.DynamicMemoryIndex(
            distance_metric=self.translate_dist_fn(self._metric),
            vector_dtype=self.translate_dtype(dtype),
            max_vectors=max_pts,
            dimensions=ndim,
            graph_degree=self.R,
            complexity=self.L,
            num_threads=self.insert_threads,  # allocates scratch space for up to this many insert threads
            initial_search_complexity=100
        )
        self.max_pts = max_pts
        print('Index class constructed and ready for update/search')
        self.active_indices = set()
        self.num_unprocessed_deletes = 0

    def insert(self, X, ids):
        # Tags are shifted by +1 throughout because tag 0 is not a valid id
        # for the dynamic index.
        self.active_indices.update(ids + 1)
        print('#active pts', len(self.active_indices), '#unprocessed deletes', self.num_unprocessed_deletes)
        # Consolidate lazily: only when active points plus pending deletes
        # would exceed the index capacity.
        if len(self.active_indices) + self.num_unprocessed_deletes >= self.max_pts:
            self.index.consolidate_delete()
            self.num_unprocessed_deletes = 0

        retvals = self.index.batch_insert(X, ids + 1)
        if -1 in retvals:
            print('insertion failed')
            print('insertion return values', retvals)

    def delete(self, ids):
        # Deletions are only marked here; the actual cleanup happens in
        # consolidate_delete(), triggered from insert() when capacity runs low.
        for id in ids:
            self.index.mark_deleted(id + 1)
        self.active_indices.difference_update(ids + 1)
        self.num_unprocessed_deletes += len(ids)

    def query(self, X, k):
        """Carry out a batch query for k-NN of query set X."""
        self.res, self.query_dists = self.index.batch_search(
            X, k, self.Ls, self.search_threads)
        # Undo the +1 tag shift applied at insertion time.
        self.res = self.res - 1

    def set_query_arguments(self, query_args):
        self._query_args = query_args
        self.Ls = query_args.get("Ls", 0)
        self.search_threads = self._query_args.get("T")

    def __str__(self):
        return f'diskann({self.index_name()}, {self._query_args})'
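
For reference, a minimal end-to-end exercise of the class above, with made-up data; a sketch that assumes a working diskannpy install and is not part of the harness:

import numpy as np

# Hypothetical lifecycle: setup -> insert -> delete -> query.
algo = diskannCUFE("euclidean", {"R": 32, "L": 50,
                                 "insert_threads": 16, "consolidate_threads": 16})
algo.setup("float32", max_pts=10_000, ndim=16)

rng = np.random.default_rng(0)
X = rng.random((1_000, 16), dtype=np.float32)
algo.insert(X, np.arange(1_000))   # stored under tags 1..1000 inside the index
algo.delete(np.arange(100))        # marked now, consolidated later

algo.set_query_arguments({"Ls": 50, "T": 8})
algo.query(X[:5], k=10)
print(algo.res.shape)              # (5, 10): 0-based neighbor ids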
