-
Notifications
You must be signed in to change notification settings - Fork 749
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #202 from stephenleo/master
Adds support for the Open Distro for Elasticsearch k-NN plugin. Closes #174.
- Loading branch information
Showing
5 changed files
with
223 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
import logging | ||
from time import sleep | ||
from urllib.error import URLError | ||
from urllib.request import Request, urlopen | ||
|
||
from elasticsearch import Elasticsearch | ||
from elasticsearch.helpers import bulk | ||
|
||
from ann_benchmarks.algorithms.base import BaseANN | ||
|
||
from .elasticsearch import es_wait | ||
|
||
# Only let WARNING-and-above records from the "elasticsearch" logger through.
logging.getLogger("elasticsearch").setLevel(logging.WARNING)
|
||
class OpenDistroKNN(BaseANN):
    """Benchmark adapter for the Open Distro for Elasticsearch k-NN plugin.

    All requests go to the node at ``http://localhost:9200``. Each instance
    works on a single index whose name is derived from ``method_param``.
    """

    def __init__(self, metric, dimension, method_param):
        """Create the client and wait for the server.

        Args:
            metric: ``"angular"`` or ``"euclidean"``; mapped to the plugin's
                ``"cosinesimil"`` / ``"l2"`` space types.
            dimension: Dimension declared for the ``knn_vector`` field.
            method_param: Mapping of index parameters; ``fit`` reads the
                ``"efConstruction"`` and ``"M"`` keys.

        Raises:
            KeyError: If ``metric`` is not one of the two supported names.
        """
        self.metric = {"angular": "cosinesimil", "euclidean": "l2"}[metric]
        self.dimension = dimension
        self.method_param = method_param
        # The index name is built from the parameters and lower-cased.
        self.param_string = "-".join(
            f"{k}-{v}" for k, v in self.method_param.items()
        ).lower()
        self.name = f"od-{self.param_string}"
        self.es = Elasticsearch(["http://localhost:9200"])
        es_wait()

    def fit(self, X):
        """Create the index, bulk-load ``X``, then merge, refresh and warm up.

        Args:
            X: Iterable of vectors; each element must provide ``.tolist()``
                (presumably numpy rows - confirm against the caller).

        Raises:
            AssertionError: If the bulk helper reports any per-document errors.
        """
        body = {
            "settings": {
                "index": {
                    "knn": True,
                    "knn.space_type": self.metric,
                    "knn.algo_param.ef_construction": self.method_param["efConstruction"],
                    "knn.algo_param.m": self.method_param["M"],
                },
                "number_of_shards": 1,
                "number_of_replicas": 0,
                # The index is refreshed explicitly once loading is done.
                "refresh_interval": -1,
            }
        }

        mapping = {
            "properties": {
                "id": {"type": "keyword", "store": True},
                "vec": {"type": "knn_vector", "dimension": self.dimension},
            }
        }

        self.es.indices.create(index=self.name, body=body)
        self.es.indices.put_mapping(body=mapping, index=self.name)

        print("Uploading data to the Index:", self.name)

        def gen():
            # Ids are stored 1-based as strings; `query` converts them back.
            for i, vec in enumerate(X):
                yield {
                    "_op_type": "index",
                    "_index": self.name,
                    "vec": vec.tolist(),
                    "id": str(i + 1),
                }

        (_, errors) = bulk(self.es, gen(), chunk_size=500, max_retries=9, request_timeout=10)
        if errors:
            # Raised explicitly instead of via `assert` so the check is not
            # stripped when Python runs with -O; the exception type is unchanged.
            raise AssertionError(errors)

        print("Force Merge...")
        self.es.indices.forcemerge(index=self.name, max_num_segments=1, request_timeout=1000)

        print("Refreshing the Index...")
        self.es.indices.refresh(index=self.name, request_timeout=1000)

        print("Running Warmup API...")
        warmup_url = f"http://localhost:9200/_opendistro/_knn/warmup/{self.name}?pretty"
        # Use the response as a context manager so the connection is closed.
        with urlopen(Request(warmup_url)) as res:
            print(res.read().decode("utf-8"))

    def set_query_arguments(self, ef):
        """Set ``knn.algo_param.ef_search`` to ``ef`` on this instance's index."""
        body = {
            "settings": {
                "index": {"knn.algo_param.ef_search": ef}
            }
        }
        # Scope the update to our own index; without `index` the request
        # targets every index on the node.
        self.es.indices.put_settings(body=body, index=self.name)

    def query(self, q, n):
        """Return the 0-based ids of up to ``n`` nearest neighbours of ``q``.

        Args:
            q: Query vector; must provide ``.tolist()``.
            n: Number of neighbours requested (used as both ``k`` and ``size``).

        Returns:
            A list of ints; empty when the search returns no hits.
        """
        body = {
            "query": {
                "knn": {
                    "vec": {"vector": q.tolist(), "k": n}
                }
            }
        }

        res = self.es.search(index=self.name, body=body, size=n, _source=False, docvalue_fields=['id'],
                             stored_fields="_none_", filter_path=["hits.hits.fields.id"], request_timeout=10)

        # `filter_path` can strip the response down to `{}` when nothing
        # matches, so do not assume the "hits" keys are present.
        hits = res.get("hits", {}).get("hits", [])
        return [int(h["fields"]["id"][0]) - 1 for h in hits]

    def batch_query(self, X, n):
        """Run ``query`` for every vector in ``X`` and keep the results."""
        self.batch_res = [self.query(q, n) for q in X]

    def get_batch_results(self):
        """Return the results stored by the last ``batch_query`` call."""
        return self.batch_res

    def freeIndex(self):
        """Delete this instance's index from the server."""
        self.es.indices.delete(index=self.name)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
# Warning! Do not use this config in production! This is only for testing and security has been turned off.

FROM ann-benchmarks

WORKDIR /home/app

# Install Open Distro following instructions from https://opendistro.github.io/for-elasticsearch-docs/docs/install/deb/
# The package index is refreshed in the same layer as the first install so the
# build does not depend on whatever index the base image happened to ship.
# apt-get is used throughout: it is the front end intended for scripted use.
RUN apt-get update \
 && apt-get install -y software-properties-common
RUN add-apt-repository ppa:openjdk-r/ppa \
 && apt-get update \
 && apt-get install -y openjdk-11-jdk
RUN apt-get install -y unzip wget
RUN wget -qO - https://d3g5vo6xdbdb9a.cloudfront.net/GPG-KEY-opendistroforelasticsearch | apt-key add -
RUN echo "deb https://d3g5vo6xdbdb9a.cloudfront.net/apt stable main" | tee -a /etc/apt/sources.list.d/opendistroforelasticsearch.list
RUN wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.9.1-amd64.deb \
 && dpkg -i elasticsearch-oss-7.9.1-amd64.deb
# Refresh again so the repository added above is visible to the install.
RUN apt-get update \
 && apt-get install -y opendistroforelasticsearch

# Install python client.
RUN python3 -m pip install --upgrade elasticsearch==7.9.1

# Configure elasticsearch and JVM for single-node, single-core.
RUN echo '\
opendistro_security.disabled: true\n\
discovery.type: single-node\n\
network.host: 0.0.0.0\n\
node.master: true\n\
node.data: true\n\
node.processors: 1\n\
thread_pool.write.size: 1\n\
thread_pool.search.size: 1\n\
thread_pool.search.queue_size: 1\n\
path.data: /var/lib/elasticsearch\n\
path.logs: /var/log/elasticsearch\n\
' > /etc/elasticsearch/elasticsearch.yml

RUN echo '\
-Xms3G\n\
-Xmx3G\n\
-XX:+UseG1GC\n\
-XX:G1ReservePercent=25\n\
-XX:InitiatingHeapOccupancyPercent=30\n\
-XX:+HeapDumpOnOutOfMemoryError\n\
-XX:HeapDumpPath=/var/lib/elasticsearch\n\
-XX:ErrorFile=/var/log/elasticsearch/hs_err_pid%p.log\n\
-Xlog:gc*,gc+age=trace,safepoint:file=/var/log/elasticsearch/gc.log:utctime,pid,tags:filecount=32,filesize=64m' > /etc/elasticsearch/jvm.options

# Make sure you can start the service.
RUN service elasticsearch start && service elasticsearch stop

# Custom entrypoint that also starts the Elasticsearch server.
RUN echo 'service elasticsearch start && python3 -u run_algorithm.py "$@"' > entrypoint.sh
ENTRYPOINT ["/bin/bash", "/home/app/entrypoint.sh"]