Streaming/cufe #211

Merged: merged 9 commits on Nov 6, 2023
3 changes: 3 additions & 0 deletions .github/workflows/neurips23.yml
@@ -60,6 +60,9 @@ jobs:
        - algorithm: mysteryann
          dataset: random-xs
          track: ood
        - algorithm: cufe
          dataset: random-xs
          track: streaming
        - algorithm: sustech-whu
          dataset: sparse-small
          track: sparse
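For reference, this new matrix entry presumably drives a CI smoke test equivalent to the invocation sketched below. The flag names come from the big-ann-benchmarks README, not from this diff, so treat them as assumptions.

# Hypothetical reproduction of the CI smoke test added above.
# Assumes big-ann-benchmarks' run.py accepts --neurips23track/--algorithm/--dataset;
# verify against the repository README before relying on this.
import subprocess

subprocess.run(
    [
        "python", "run.py",
        "--neurips23track", "streaming",
        "--algorithm", "cufe",
        "--dataset", "random-xs",
    ],
    check=True,
)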
16 changes: 16 additions & 0 deletions neurips23/streaming/cufe/Dockerfile
@@ -0,0 +1,16 @@
FROM neurips23

RUN apt update
RUN apt install -y software-properties-common
RUN add-apt-repository -y ppa:git-core/ppa
RUN apt update
RUN DEBIAN_FRONTEND=noninteractive apt install -y git make cmake g++ libaio-dev libgoogle-perftools-dev libunwind-dev clang-format libboost-dev libboost-program-options-dev libmkl-full-dev libcpprest-dev python3.10

ADD "https://github.com/AbdelrahmanMohamed129/DiskANN/tree/farah" latest_commit
RUN git clone https://github.com/AbdelrahmanMohamed129/DiskANN --branch farah
WORKDIR /home/app/DiskANN
RUN git pull
RUN pip3 install virtualenv build
RUN python3 -m build
RUN pip install dist/diskannpy-0.5.0rc3.post1-cp310-cp310-linux_x86_64.whl
WORKDIR /home/app
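As a quick way to validate the image, one could check that the locally built wheel is importable and exposes the DynamicMemoryIndex class the wrapper below relies on. This is a minimal sketch, not part of the PR:

# Minimal sanity check for the built image (hypothetical, not part of this PR).
# DynamicMemoryIndex is the diskannpy class used by the streaming wrapper below.
from importlib.metadata import version

import diskannpy

print("diskannpy", version("diskannpy"))  # expect 0.5.0rc3.post1 per the wheel name
assert hasattr(diskannpy, "DynamicMemoryIndex")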
100 changes: 100 additions & 0 deletions neurips23/streaming/cufe/config.yaml
@@ -0,0 +1,100 @@
random-xs:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":32, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":50, "T":8}]
random-xs-clustered:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":32, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":50, "T":8}]
msspacev-1M:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":50, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":300, "T":16},
           {"Ls":100, "T":16}]
msturing-1M:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":50, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":300, "T":16},
           {"Ls":100, "T":16}]
msspacev-10M:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":64, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":100, "T":16}]
msturing-10M:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":64, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":100, "T":16}]
msturing-10M-clustered:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":64, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":100, "T":16}]
msturing-30M-clustered:
  cufe:
    docker-tag: neurips23-streaming-cufe
    module: neurips23.streaming.cufe.diskann-str
    constructor: diskannCUFE
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":32, "L":50, "insert_threads":16, "consolidate_threads":16},
           {"R":32, "L":70, "insert_threads":16, "consolidate_threads":16},
           {"R":50, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":70, "T":16}]
107 changes: 107 additions & 0 deletions neurips23/streaming/cufe/diskann-str.py
@@ -0,0 +1,107 @@
from __future__ import absolute_import
import psutil
import os
import time
import numpy as np

import diskannpy

from neurips23.streaming.base import BaseStreamingANN

class diskannCUFE(BaseStreamingANN):
    def __init__(self, metric, index_params):
        self.name = "diskannCUFE"
        if index_params.get("R") is None:
            print("Error: missing parameter R")
            return
        if index_params.get("L") is None:
            print("Error: missing parameter L")
            return
        self._index_params = index_params
        self._metric = metric

        self.R = index_params.get("R")
        self.L = index_params.get("L")
        self.insert_threads = index_params.get("insert_threads")
        self.consolidate_threads = index_params.get("consolidate_threads")

    def index_name(self):
        return f"R{self.R}_L{self.L}"

    def create_index_dir(self, dataset):
        index_dir = os.path.join(os.getcwd(), "data", "indices", "streaming")
        os.makedirs(index_dir, mode=0o777, exist_ok=True)
        index_dir = os.path.join(index_dir, 'diskann')
        os.makedirs(index_dir, mode=0o777, exist_ok=True)
        index_dir = os.path.join(index_dir, dataset.short_name())
        os.makedirs(index_dir, mode=0o777, exist_ok=True)
        index_dir = os.path.join(index_dir, self.index_name())
        os.makedirs(index_dir, mode=0o777, exist_ok=True)
        return index_dir

    def translate_dist_fn(self, metric):
        if metric == 'euclidean':
            return 'l2'
        elif metric == 'ip':
            return 'mips'
        else:
            raise Exception('Invalid metric')

    def translate_dtype(self, dtype: str):
        if dtype == 'uint8':
            return np.uint8
        elif dtype == 'int8':
            return np.int8
        elif dtype == 'float32':
            return np.float32
        else:
            raise Exception('Invalid data type')

    def setup(self, dtype, max_pts, ndim):
        self.index = diskannpy.DynamicMemoryIndex(
            distance_metric=self.translate_dist_fn(self._metric),
            vector_dtype=self.translate_dtype(dtype),
            max_vectors=max_pts,
            dimensions=ndim,
            graph_degree=self.R,
            complexity=self.L,
            num_threads=self.insert_threads,  # scratch space is allocated for this many threads
            initial_search_complexity=100
        )
        self.max_pts = max_pts
        print('Index class constructed and ready for update/search')
        self.active_indices = set()
        self.num_unprocessed_deletes = 0

    def insert(self, X, ids):
        # ids are shifted by +1 before insertion; query() shifts results back
        self.active_indices.update(ids + 1)
        print('#active pts', len(self.active_indices), '#unprocessed deletes', self.num_unprocessed_deletes)
        if len(self.active_indices) + self.num_unprocessed_deletes >= self.max_pts:
            self.index.consolidate_delete()
            self.num_unprocessed_deletes = 0

        retvals = self.index.batch_insert(X, ids + 1)
        if -1 in retvals:
            print('insertion failed')
            print('insertion return values', retvals)

    def delete(self, ids):
        for id in ids:
            self.index.mark_deleted(id + 1)
        self.active_indices.difference_update(ids + 1)
        self.num_unprocessed_deletes += len(ids)

    def query(self, X, k):
        """Carry out a batch query for k-NN of query set X."""
        nq, dim = np.shape(X)
        self.res, self.query_dists = self.index.batch_search(
            X, k, self.Ls, self.search_threads)
        self.res = self.res - 1  # undo the +1 id shift applied at insert time

    def set_query_arguments(self, query_args):
        self._query_args = query_args
        self.Ls = 0 if query_args.get("Ls") is None else query_args.get("Ls")
        self.search_threads = self._query_args.get("T")

    def __str__(self):
        return f'diskann({self.index_name(), self._query_args})'
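Putting it together, a streaming run exercises the wrapper roughly as follows. This is a minimal sketch with toy data; in practice the benchmark harness drives these calls from a streaming runbook.

# Minimal lifecycle sketch for diskannCUFE with toy data (illustration only).
import numpy as np

algo = diskannCUFE("euclidean", {"R": 32, "L": 50,
                                 "insert_threads": 16, "consolidate_threads": 16})
algo.set_query_arguments({"Ls": 50, "T": 8})

rng = np.random.default_rng(0)
X = rng.random((1000, 32), dtype=np.float32)

algo.setup("float32", max_pts=10_000, ndim=32)
algo.insert(X, np.arange(1000))   # ids are shifted +1 internally
algo.delete(np.arange(100))       # lazy delete; consolidated when nearing max_pts
algo.query(X[:5], k=10)
print(algo.res.shape)             # (5, 10) neighbor ids, shifted back to input ids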