diff --git a/.github/workflows/linux_ci.yml b/.github/workflows/linux_ci.yml index 530626e8e5..3b09144185 100644 --- a/.github/workflows/linux_ci.yml +++ b/.github/workflows/linux_ci.yml @@ -298,6 +298,8 @@ jobs: pip install ui/ - name: Run tests + env: + PINECONE_API_KEY: ${{ secrets.PINECONE_API_KEY }} run: pytest -s ${{ matrix.test-path }} @@ -346,3 +348,36 @@ jobs: run: | export MILVUS1_ENABLED=1 pytest -s test/test_document_store.py test/test_eval.py test/test_faiss_and_milvus.py test/test_pipeline.py test/test_retriever.py test/test_standard_pipelines.py --document_store_type="milvus1" + + test-pinecone: + needs: build-cache + runs-on: ubuntu-20.04 + + steps: + - uses: actions/checkout@v2 + - run: echo "date=$(date +'%Y-%m-%d')" >> $GITHUB_ENV + + - name: Set up Python 3.7 + uses: actions/setup-python@v2 + with: + python-version: 3.7 + + - name: Cache Python + uses: actions/cache@v2 + with: + path: ${{ env.pythonLocation }} + key: linux-${{ env.date }}-${{ hashFiles('**/setup.py') }}-${{ hashFiles('**/setup.cfg') }}-${{ hashFiles('**/pyproject.toml') }} + + - name: Install pdftotext + run: wget --no-check-certificate https://dl.xpdfreader.com/xpdf-tools-linux-4.03.tar.gz && tar -xvf xpdf-tools-linux-4.03.tar.gz && sudo cp xpdf-tools-linux-4.03/bin64/pdftotext /usr/local/bin + + # Haystack needs to be reinstalled at this stage to make sure the current commit's version is the one getting tested. + # The cache can last way longer than a specific action's run, so older Haystack version could be carried over. + - name: Reinstall Haystack + run: | + pip install .[test] + + - name: Run tests + env: + PINECONE_API_KEY: ${{ secrets.PINECONE_API_KEY }} + run: pytest -s test/test_document_store.py test/test_pipeline.py test/test_standard_pipelines.py test/test_pipeline_extractive_qa.py --document_store_type="pinecone" diff --git a/docs/_src/api/api/document_store.md b/docs/_src/api/api/document_store.md index 5fec82f662..c5422c956c 100644 --- a/docs/_src/api/api/document_store.md +++ b/docs/_src/api/api/document_store.md @@ -4120,6 +4120,337 @@ exists. None + + +# Module pinecone + + + +## PineconeDocumentStore + +```python +class PineconeDocumentStore(SQLDocumentStore) +``` + +Document store for very large scale embedding based dense retrievers like the DPR. This is a hosted document store, +this means that your vectors will not be stored locally but in the cloud. This means that the similarity +search will be run on the cloud as well. + +It implements the Pinecone vector database ([https://www.pinecone.io](https://www.pinecone.io)) +to perform similarity search on vectors. In order to use this document store, you need an API key that you can +obtain by creating an account on the [Pinecone website](https://www.pinecone.io). + +The document text is stored using the SQLDocumentStore, while +the vector embeddings and metadata (for filtering) are indexed in a Pinecone Index. + + + +#### \_\_init\_\_ + +```python +def __init__(api_key: str, environment: str = "us-west1-gcp", sql_url: str = "sqlite:///pinecone_document_store.db", pinecone_index: Optional[pinecone.Index] = None, embedding_dim: int = 768, return_embedding: bool = False, index: str = "document", similarity: str = "cosine", replicas: int = 1, shards: int = 1, embedding_field: str = "embedding", progress_bar: bool = True, duplicate_documents: str = "overwrite") +``` + +**Arguments**: + +- `api_key`: Pinecone vector database API key ([https://app.pinecone.io](https://app.pinecone.io)). +- `environment`: Pinecone cloud environment uses `"us-west1-gcp"` by default. Other GCP and AWS regions are +supported, contact Pinecone [here](https://www.pinecone.io/contact/) if required. +- `sql_url`: SQL connection URL for database. It defaults to local file based SQLite DB. For large scale +deployment, Postgres is recommended. +- `pinecone_index`: pinecone-client Index object, an index will be initialized or loaded if not specified. +- `embedding_dim`: The embedding vector size. +- `return_embedding`: Whether to return document embeddings. +- `index`: Name of index in document store to use. +- `similarity`: The similarity function used to compare document vectors. `"dot_product"` is the default +since it is more performant with DPR embeddings. `"cosine"` is recommended if you are using a +Sentence-Transformer model. +In both cases, the returned values in Document.score are normalized to be in range [0,1]: + - For `"dot_product"`: `expit(np.asarray(raw_score / 100))` + - For `"cosine"`: `(raw_score + 1) / 2` +- `replicas`: The number of replicas. Replicas duplicate the index. They provide higher availability and +throughput. +- `shards`: The number of shards to be used in the index. We recommend to use 1 shard per 1GB of data. +- `embedding_field`: Name of field containing an embedding vector. +- `progress_bar`: Whether to show a tqdm progress bar or not. +Can be helpful to disable in production deployments to keep the logs clean. +- `duplicate_documents`: Handle duplicate documents based on parameter options.\ +Parameter options: + - `"skip"`: Ignore the duplicate documents. + - `"overwrite"`: Update any existing documents with the same ID when adding documents. + - `"fail"`: An error is raised if the document ID of the document being added already exists. + + + +#### write\_documents + +```python +def write_documents(documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 32, duplicate_documents: Optional[str] = None, headers: Optional[Dict[str, str]] = None) +``` + +Add new documents to the DocumentStore. + +**Arguments**: + +- `documents`: List of `Dicts` or list of `Documents`. If they already contain embeddings, we'll index them +right away in Pinecone. If not, you can later call `update_embeddings()` to create & index them. +- `index`: Index name for storing the docs and metadata. +- `batch_size`: Number of documents to process at a time. When working with large number of documents, +batching can help to reduce the memory footprint. +- `duplicate_documents`: handle duplicate documents based on parameter options. +Parameter options: + - `"skip"`: Ignore the duplicate documents. + - `"overwrite"`: Update any existing documents with the same ID when adding documents. + - `"fail"`: An error is raised if the document ID of the document being added already exists. +- `headers`: PineconeDocumentStore does not support headers. + +**Raises**: + +- `DuplicateDocumentError`: Exception trigger on duplicate document. + + + +#### update\_embeddings + +```python +def update_embeddings(retriever: "BaseRetriever", index: Optional[str] = None, update_existing_embeddings: bool = True, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, batch_size: int = 32) +``` + +Updates the embeddings in the document store using the encoding model specified in the retriever. + +This can be useful if you want to add or change the embeddings for your documents (e.g. after changing the +retriever config). + +**Arguments**: + +- `retriever`: Retriever to use to get embeddings for text. +- `index`: Index name for which embeddings are to be updated. If set to `None`, the default `self.index` is +used. +- `update_existing_embeddings`: Whether to update existing embeddings of the documents. If set to `False`, +only documents without embeddings are processed. This mode can be used for incremental updating of +embeddings, wherein, only newly indexed documents get processed. +- `filters`: Optional filters to narrow down the documents for which embeddings are to be updated. +Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical +operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, +`"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name. +Logical operator keys take a dictionary of metadata field names and/or logical operators as +value. Metadata field names take a dictionary of comparison operators as value. Comparison +operator keys take a single value or (in case of `"$in"`) a list of values as value. +If no logical operator is provided, `"$and"` is used as default operation. If no comparison +operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default +operation. + __Example__: + ```python + filters = { + "$and": { + "type": {"$eq": "article"}, + "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, + "rating": {"$gte": 3}, + "$or": { + "genre": {"$in": ["economy", "politics"]}, + "publisher": {"$eq": "nytimes"} + } + } + } + ``` +- `batch_size`: Number of documents to process at a time. When working with large number of documents, +batching can help reduce memory footprint. + + + +#### get\_all\_documents\_generator + +```python +def get_all_documents_generator(index: Optional[str] = None, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, return_embedding: Optional[bool] = None, batch_size: int = 32, headers: Optional[Dict[str, str]] = None) -> Generator[Document, None, None] +``` + +Get all documents from the document store. Under-the-hood, documents are fetched in batches from the + +document store and yielded as individual documents. This method can be used to iteratively process +a large number of documents without having to load all documents in memory. + +**Arguments**: + +- `index`: Name of the index to get the documents from. If None, the +DocumentStore's default index (self.index) will be used. +- `filters`: Optional filters to narrow down the documents for which embeddings are to be updated. +Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical +operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, +`"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name. +Logical operator keys take a dictionary of metadata field names and/or logical operators as +value. Metadata field names take a dictionary of comparison operators as value. Comparison +operator keys take a single value or (in case of `"$in"`) a list of values as value. +If no logical operator is provided, `"$and"` is used as default operation. If no comparison +operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default +operation. + __Example__: + ```python + filters = { + "$and": { + "type": {"$eq": "article"}, + "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, + "rating": {"$gte": 3}, + "$or": { + "genre": {"$in": ["economy", "politics"]}, + "publisher": {"$eq": "nytimes"} + } + } + } + ``` +- `return_embedding`: Whether to return the document embeddings. +- `batch_size`: When working with large number of documents, batching can help reduce memory footprint. +- `headers`: PineconeDocumentStore does not support headers. + + + +#### get\_embedding\_count + +```python +def get_embedding_count(index: Optional[str] = None, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None) -> int +``` + +Return the count of embeddings in the document store. + + + +#### update\_document\_meta + +```python +def update_document_meta(id: str, meta: Dict[str, str], index: str = None) +``` + +Update the metadata dictionary of a document by specifying its string id + + + +#### delete\_documents + +```python +def delete_documents(index: Optional[str] = None, ids: Optional[List[str]] = None, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, headers: Optional[Dict[str, str]] = None) +``` + +Delete documents from the document store. + +**Arguments**: + +- `index`: Index name to delete the documents from. If `None`, the DocumentStore's default index +(`self.index`) will be used. +- `ids`: Optional list of IDs to narrow down the documents to be deleted. +- `filters`: Optional filters to narrow down the documents for which embeddings are to be updated. +Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical +operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, +`"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name. +Logical operator keys take a dictionary of metadata field names and/or logical operators as +value. Metadata field names take a dictionary of comparison operators as value. Comparison +operator keys take a single value or (in case of `"$in"`) a list of values as value. +If no logical operator is provided, `"$and"` is used as default operation. If no comparison +operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default +operation. + __Example__: + ```python + filters = { + "$and": { + "type": {"$eq": "article"}, + "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, + "rating": {"$gte": 3}, + "$or": { + "genre": {"$in": ["economy", "politics"]}, + "publisher": {"$eq": "nytimes"} + } + } + } + ``` +- `headers`: PineconeDocumentStore does not support headers. + + + +#### query\_by\_embedding + +```python +def query_by_embedding(query_emb: np.ndarray, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, top_k: int = 10, index: Optional[str] = None, return_embedding: Optional[bool] = None, headers: Optional[Dict[str, str]] = None) -> List[Document] +``` + +Find the document that is most similar to the provided `query_emb` by using a vector similarity metric. + +**Arguments**: + +- `query_emb`: Embedding of the query (e.g. gathered from DPR). +- `filters`: Optional filters to narrow down the search space to documents whose metadata fulfill certain +conditions. +Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical +operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, +`"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name. +Logical operator keys take a dictionary of metadata field names and/or logical operators as +value. Metadata field names take a dictionary of comparison operators as value. Comparison +operator keys take a single value or (in case of `"$in"`) a list of values as value. +If no logical operator is provided, `"$and"` is used as default operation. If no comparison +operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default +operation. + __Example__: + ```python + filters = { + "$and": { + "type": {"$eq": "article"}, + "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, + "rating": {"$gte": 3}, + "$or": { + "genre": {"$in": ["economy", "politics"]}, + "publisher": {"$eq": "nytimes"} + } + } + } + # or simpler using default operators + filters = { + "type": "article", + "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, + "rating": {"$gte": 3}, + "$or": { + "genre": ["economy", "politics"], + "publisher": "nytimes" + } + } + ``` + To use the same logical operator multiple times on the same level, logical operators take + optionally a list of dictionaries as value. + __Example__: + ```python + filters = { + "$or": [ + { + "$and": { + "Type": "News Paper", + "Date": { + "$lt": "2019-01-01" + } + } + }, + { + "$and": { + "Type": "Blog Post", + "Date": { + "$gte": "2019-01-01" + } + } + } + ] + } + ``` +- `top_k`: How many documents to return. +- `index`: The name of the index from which to retrieve documents. +- `return_embedding`: Whether to return document embedding. +- `headers`: PineconeDocumentStore does not support headers. + + + +#### load + +```python +@classmethod +def load(cls) +``` + +Default class method used for loading indexes. Not applicable to the PineconeDocumentStore. + # Module utils diff --git a/docs/_src/api/api/evaluation.md b/docs/_src/api/api/evaluation.md index 2cbf0b4dd9..fed855e80b 100644 --- a/docs/_src/api/api/evaluation.md +++ b/docs/_src/api/api/evaluation.md @@ -123,7 +123,7 @@ Print the evaluation results #### semantic\_answer\_similarity ```python -def semantic_answer_similarity(predictions: List[List[str]], gold_labels: List[List[str]], sas_model_name_or_path: str = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2") -> Tuple[List[float], List[float]] +def semantic_answer_similarity(predictions: List[List[str]], gold_labels: List[List[str]], sas_model_name_or_path: str = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2", batch_size: int = 32, use_gpu: bool = True) -> Tuple[List[float], List[float]] ``` Computes Transformer-based similarity of predicted answer to gold labels to derive a more meaningful metric than EM or F1. @@ -137,6 +137,9 @@ Returns per QA pair a) the similarity of the most likely prediction (top 1) to a - `gold_labels`: Labels as list of multiple possible answers per question - `sas_model_name_or_path`: SentenceTransformers semantic textual similarity model, should be path or string pointing to downloadable models. +- `batch_size`: Number of prediction label pairs to encode at once. +- `use_gpu`: Whether to use a GPU or the CPU for calculating semantic answer similarity. +Falls back to CPU if no GPU is available. **Returns**: diff --git a/docs/_src/api/api/pipelines.md b/docs/_src/api/api/pipelines.md index 91f401853e..700eca06fd 100644 --- a/docs/_src/api/api/pipelines.md +++ b/docs/_src/api/api/pipelines.md @@ -440,7 +440,7 @@ then be found in the dict returned by this method under the key "_debug" ```python @send_event -def eval(labels: List[MultiLabel], documents: Optional[List[List[Document]]] = None, params: Optional[dict] = None, sas_model_name_or_path: str = None, add_isolated_node_eval: bool = False) -> EvaluationResult +def eval(labels: List[MultiLabel], documents: Optional[List[List[Document]]] = None, params: Optional[dict] = None, sas_model_name_or_path: str = None, sas_batch_size: int = 32, sas_use_gpu: bool = True, add_isolated_node_eval: bool = False) -> EvaluationResult ``` Evaluates the pipeline by running the pipeline once per query in debug mode @@ -466,6 +466,9 @@ If you use custom cross encoders please make sure they work with sentence_transf - Good default for multiple languages: "sentence-transformers/paraphrase-multilingual-mpnet-base-v2" - Large, powerful, but slow model for English only: "cross-encoder/stsb-roberta-large" - Large model for German only: "deepset/gbert-large-sts" +- `sas_batch_size`: Number of prediction label pairs to encode at once by CrossEncoder or SentenceTransformer while calculating SAS. +- `sas_use_gpu`: Whether to use a GPU or the CPU for calculating semantic answer similarity. +Falls back to CPU if no GPU is available. - `add_isolated_node_eval`: If set to True, in addition to the integrated evaluation of the pipeline, each node is evaluated in isolated evaluation mode. This mode helps to understand the bottlenecks of a pipeline in terms of output quality of each individual node. If a node performs much better in the isolated evaluation than in the integrated evaluation, the previous node needs to be optimized to improve the pipeline's performance. diff --git a/docs/_src/api/pydoc/document-store.yml b/docs/_src/api/pydoc/document-store.yml index e70bb65457..ae233e1567 100644 --- a/docs/_src/api/pydoc/document-store.yml +++ b/docs/_src/api/pydoc/document-store.yml @@ -1,7 +1,7 @@ loaders: - type: python search_path: [../../../../haystack/document_stores] - modules: ['base', 'elasticsearch', 'memory', 'sql', 'faiss', 'milvus1', 'milvus2', 'weaviate', 'graphdb', 'deepsetcloud', 'utils'] + modules: ['base', 'elasticsearch', 'memory', 'sql', 'faiss', 'milvus1', 'milvus2', 'weaviate', 'graphdb', 'deepsetcloud', 'pinecone', 'utils'] ignore_when_discovered: ['__init__'] processors: - type: filter diff --git a/haystack/document_stores/__init__.py b/haystack/document_stores/__init__.py index 5b179d26ec..aef5320932 100644 --- a/haystack/document_stores/__init__.py +++ b/haystack/document_stores/__init__.py @@ -15,6 +15,7 @@ SQLDocumentStore = safe_import("haystack.document_stores.sql", "SQLDocumentStore", "sql") FAISSDocumentStore = safe_import("haystack.document_stores.faiss", "FAISSDocumentStore", "faiss") +PineconeDocumentStore = safe_import("haystack.document_stores.pinecone", "PineconeDocumentStore", "pinecone") if os.getenv("MILVUS1_ENABLED"): MilvusDocumentStore = safe_import("haystack.document_stores.milvus1", "Milvus1DocumentStore", "milvus1") else: diff --git a/haystack/document_stores/filter_utils.py b/haystack/document_stores/filter_utils.py index d1d69196a7..9b7bc8c5f7 100644 --- a/haystack/document_stores/filter_utils.py +++ b/haystack/document_stores/filter_utils.py @@ -145,6 +145,12 @@ def convert_to_weaviate(self): """ pass + def convert_to_pinecone(self): + """ + Converts the LogicalFilterClause instance to a Pinecone filter. + """ + pass + def _merge_es_range_queries(self, conditions: List[Dict]) -> List[Dict[str, Dict]]: """ Merges Elasticsearch range queries that perform on the same metadata field. @@ -237,6 +243,12 @@ def convert_to_weaviate(self): """ pass + def convert_to_pinecone(self): + """ + Converts the ComparisonOperation instance to a Pinecone comparison operator. + """ + pass + @abstractmethod def invert(self) -> "ComparisonOperation": """ @@ -309,6 +321,14 @@ def convert_to_weaviate(self) -> Dict[str, Union[str, int, float, bool, List[Dic else: return conditions[0] + def convert_to_pinecone(self) -> Dict[str, Union[str, int, float, bool, List[Dict]]]: + conditions = [condition.invert().convert_to_pinecone() for condition in self.conditions] + if len(conditions) > 1: + # Conditions in self.conditions are by default combined with AND which becomes OR according to DeMorgan + return {"$or": conditions} + else: + return conditions[0] + def invert(self) -> Union[LogicalFilterClause, ComparisonOperation]: # This method is called when a "$not" operation is embedded in another "$not" operation. Therefore, we don't # invert the operations here, as two "$not" operation annihilate each other. @@ -344,6 +364,10 @@ def convert_to_weaviate(self) -> Dict[str, Union[str, List[Dict]]]: conditions = [condition.convert_to_weaviate() for condition in self.conditions] return {"operator": "And", "operands": conditions} + def convert_to_pinecone(self) -> Dict[str, Union[str, List[Dict]]]: + conditions = [condition.convert_to_pinecone() for condition in self.conditions] + return {"$and": conditions} + def invert(self) -> "OrOperation": return OrOperation([condition.invert() for condition in self.conditions]) @@ -372,6 +396,10 @@ def convert_to_weaviate(self) -> Dict[str, Union[str, List[Dict]]]: conditions = [condition.convert_to_weaviate() for condition in self.conditions] return {"operator": "Or", "operands": conditions} + def convert_to_pinecone(self) -> Dict[str, Union[str, List[Dict]]]: + conditions = [condition.convert_to_pinecone() for condition in self.conditions] + return {"$or": conditions} + def invert(self) -> AndOperation: return AndOperation([condition.invert() for condition in self.conditions]) @@ -399,6 +427,9 @@ def convert_to_weaviate(self) -> Dict[str, Union[List[str], str, int, float, boo comp_value_type, comp_value = self._get_weaviate_datatype() return {"path": [self.field_name], "operator": "Equal", comp_value_type: comp_value} + def convert_to_pinecone(self) -> Dict[str, Dict[str, Union[List[str], str, int, float, bool]]]: + return {self.field_name: {"$eq": self.comparison_value}} + def invert(self) -> "NeOperation": return NeOperation(self.field_name, self.comparison_value) @@ -435,6 +466,10 @@ def convert_to_weaviate(self) -> Dict[str, Union[str, List[Dict]]]: return filter_dict + def convert_to_pinecone(self) -> Dict[str, Dict[str, List]]: + assert isinstance(self.comparison_value, list), "'$in' operation requires comparison value to be a list." + return {self.field_name: {"$in": self.comparison_value}} + def invert(self) -> "NinOperation": return NinOperation(self.field_name, self.comparison_value) @@ -462,6 +497,9 @@ def convert_to_weaviate(self) -> Dict[str, Union[List[str], str, int, float, boo comp_value_type, comp_value = self._get_weaviate_datatype() return {"path": [self.field_name], "operator": "NotEqual", comp_value_type: comp_value} + def convert_to_pinecone(self) -> Dict[str, Dict[str, Union[List[str], str, int, float, bool]]]: + return {self.field_name: {"$ne": self.comparison_value}} + def invert(self) -> "EqOperation": return EqOperation(self.field_name, self.comparison_value) @@ -498,6 +536,10 @@ def convert_to_weaviate(self) -> Dict[str, Union[str, List[Dict]]]: return filter_dict + def convert_to_pinecone(self) -> Dict[str, Dict[str, List]]: + assert isinstance(self.comparison_value, list), "'$in' operation requires comparison value to be a list." + return {self.field_name: {"$nin": self.comparison_value}} + def invert(self) -> "InOperation": return InOperation(self.field_name, self.comparison_value) @@ -526,6 +568,12 @@ def convert_to_weaviate(self) -> Dict[str, Union[List[str], str, float, int]]: assert not isinstance(comp_value, list), "Comparison value for '$gt' operation must not be a list." return {"path": [self.field_name], "operator": "GreaterThan", comp_value_type: comp_value} + def convert_to_pinecone(self) -> Dict[str, Dict[str, Union[float, int]]]: + assert not isinstance( + self.comparison_value, (list, str) + ), "Comparison value for '$gt' operation must be a float or int." + return {self.field_name: {"$gt": self.comparison_value}} + def invert(self) -> "LteOperation": return LteOperation(self.field_name, self.comparison_value) @@ -554,6 +602,12 @@ def convert_to_weaviate(self) -> Dict[str, Union[List[str], str, float, int]]: assert not isinstance(comp_value, list), "Comparison value for '$gte' operation must not be a list." return {"path": [self.field_name], "operator": "GreaterThanEqual", comp_value_type: comp_value} + def convert_to_pinecone(self) -> Dict[str, Dict[str, Union[float, int]]]: + assert not isinstance( + self.comparison_value, (list, str) + ), "Comparison value for '$gte' operation must be a float or int." + return {self.field_name: {"$gte": self.comparison_value}} + def invert(self) -> "LtOperation": return LtOperation(self.field_name, self.comparison_value) @@ -582,6 +636,12 @@ def convert_to_weaviate(self) -> Dict[str, Union[List[str], str, float, int]]: assert not isinstance(comp_value, list), "Comparison value for '$lt' operation must not be a list." return {"path": [self.field_name], "operator": "LessThan", comp_value_type: comp_value} + def convert_to_pinecone(self) -> Dict[str, Dict[str, Union[float, int]]]: + assert not isinstance( + self.comparison_value, (list, str) + ), "Comparison value for '$lt' operation must be a float or int." + return {self.field_name: {"$lt": self.comparison_value}} + def invert(self) -> "GteOperation": return GteOperation(self.field_name, self.comparison_value) @@ -610,5 +670,11 @@ def convert_to_weaviate(self) -> Dict[str, Union[List[str], str, float, int]]: assert not isinstance(comp_value, list), "Comparison value for '$lte' operation must not be a list." return {"path": [self.field_name], "operator": "LessThanEqual", comp_value_type: comp_value} + def convert_to_pinecone(self) -> Dict[str, Dict[str, Union[float, int]]]: + assert not isinstance( + self.comparison_value, (list, str) + ), "Comparison value for '$lte' operation must be a float or int." + return {self.field_name: {"$lte": self.comparison_value}} + def invert(self) -> "GtOperation": return GtOperation(self.field_name, self.comparison_value) diff --git a/haystack/document_stores/pinecone.py b/haystack/document_stores/pinecone.py new file mode 100644 index 0000000000..a964d57fb8 --- /dev/null +++ b/haystack/document_stores/pinecone.py @@ -0,0 +1,698 @@ +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from haystack.nodes.retriever import BaseRetriever + +import logging +from typing import Union, List, Optional, Dict, Generator +from tqdm.auto import tqdm + +import pinecone +import numpy as np + +from haystack.schema import Document +from haystack.document_stores.sql import SQLDocumentStore +from haystack.document_stores.base import get_batches_from_generator +from haystack.document_stores.filter_utils import LogicalFilterClause +from haystack.errors import DocumentStoreError + + +logger = logging.getLogger(__name__) + + +class PineconeDocumentStore(SQLDocumentStore): + """ + Document store for very large scale embedding based dense retrievers like the DPR. This is a hosted document store, + this means that your vectors will not be stored locally but in the cloud. This means that the similarity + search will be run on the cloud as well. + + It implements the Pinecone vector database ([https://www.pinecone.io](https://www.pinecone.io)) + to perform similarity search on vectors. In order to use this document store, you need an API key that you can + obtain by creating an account on the [Pinecone website](https://www.pinecone.io). + + The document text is stored using the SQLDocumentStore, while + the vector embeddings and metadata (for filtering) are indexed in a Pinecone Index. + """ + + top_k_limit = 10_000 + top_k_limit_vectors = 1_000 + + def __init__( + self, + api_key: str, + environment: str = "us-west1-gcp", + sql_url: str = "sqlite:///pinecone_document_store.db", + pinecone_index: Optional[pinecone.Index] = None, + embedding_dim: int = 768, + return_embedding: bool = False, + index: str = "document", + similarity: str = "cosine", + replicas: int = 1, + shards: int = 1, + embedding_field: str = "embedding", + progress_bar: bool = True, + duplicate_documents: str = "overwrite", + ): + """ + :param api_key: Pinecone vector database API key ([https://app.pinecone.io](https://app.pinecone.io)). + :param environment: Pinecone cloud environment uses `"us-west1-gcp"` by default. Other GCP and AWS regions are + supported, contact Pinecone [here](https://www.pinecone.io/contact/) if required. + :param sql_url: SQL connection URL for database. It defaults to local file based SQLite DB. For large scale + deployment, Postgres is recommended. + :param pinecone_index: pinecone-client Index object, an index will be initialized or loaded if not specified. + :param embedding_dim: The embedding vector size. + :param return_embedding: Whether to return document embeddings. + :param index: Name of index in document store to use. + :param similarity: The similarity function used to compare document vectors. `"dot_product"` is the default + since it is more performant with DPR embeddings. `"cosine"` is recommended if you are using a + Sentence-Transformer model. + In both cases, the returned values in Document.score are normalized to be in range [0,1]: + - For `"dot_product"`: `expit(np.asarray(raw_score / 100))` + - For `"cosine"`: `(raw_score + 1) / 2` + :param replicas: The number of replicas. Replicas duplicate the index. They provide higher availability and + throughput. + :param shards: The number of shards to be used in the index. We recommend to use 1 shard per 1GB of data. + :param embedding_field: Name of field containing an embedding vector. + :param progress_bar: Whether to show a tqdm progress bar or not. + Can be helpful to disable in production deployments to keep the logs clean. + :param duplicate_documents: Handle duplicate documents based on parameter options.\ + + Parameter options: + - `"skip"`: Ignore the duplicate documents. + - `"overwrite"`: Update any existing documents with the same ID when adding documents. + - `"fail"`: An error is raised if the document ID of the document being added already exists. + """ + + # Connect to Pinecone server using python client binding + pinecone.init(api_key=api_key, environment=environment) + self._api_key = api_key + + # Formal similarity string + if similarity in ("dot_product", "cosine"): + self.metric_type = similarity + elif similarity in ("l2", "euclidean"): + self.metric_type = "euclidean" + else: + raise ValueError( + "The Pinecone document store can currently only support dot_product, cosine and euclidean metrics. " + "Please set similarity to one of the above." + ) + + self.index = index + self.embedding_dim = embedding_dim + self.return_embedding = return_embedding + self.embedding_field = embedding_field + self.progress_bar = progress_bar + self.duplicate_documents = duplicate_documents + + # Pinecone index params + self.replicas = replicas + self.shards = shards + + # Initialize dictionary of index connections + self.pinecone_indexes: Dict[str, pinecone.Index] = {} + clean_index = self._sanitize_index_name(index) + if pinecone_index: + self.pinecone_indexes[clean_index] = pinecone_index + else: + self.pinecone_indexes[clean_index] = self._create_index_if_not_exist( + embedding_dim=self.embedding_dim, + index=clean_index, + metric_type=self.metric_type, + replicas=self.replicas, + shards=self.shards, + ) + + self.return_embedding = return_embedding + self.embedding_field = embedding_field + + self.progress_bar = progress_bar + + super().__init__(url=sql_url, index=clean_index, duplicate_documents=duplicate_documents) + + # self._validate_index_sync() + + def _sanitize_index_name(self, index: str) -> str: + return index.replace("_", "-").lower() + + def _create_index_if_not_exist( + self, + embedding_dim: int, + index: Optional[str] = None, + metric_type: Optional[str] = "cosine", + replicas: Optional[int] = 1, + shards: Optional[int] = 1, + ): + """ + Create a new index for storing documents in case an + index with the name doesn't exist already. + """ + index = index or self.index + index = self._sanitize_index_name(index) + + # Skip if already exists + if index in self.pinecone_indexes.keys(): + index_connection = self.pinecone_indexes[index] + else: + # Search pinecone hosted indexes and create an index if it does not exist + if index not in pinecone.list_indexes(): + pinecone.create_index( + name=index, dimension=embedding_dim, metric=metric_type, replicas=replicas, shards=shards + ) + index_connection = pinecone.Index(index) + + # Get index statistics + stats = index_connection.describe_index_stats() + dims = stats["dimension"] + count = stats["namespaces"][""]["vector_count"] if stats["namespaces"].get("") else 0 + logger.info(f"Index statistics: name: {index}, embedding dimensions: {dims}, record count: {count}") + # return index connection + return index_connection + + def _validate_index_sync(self): + """ + This check ensures the correct document database was loaded. If it fails, make sure you provided the same path + to the SQL database as when you created the original Pinecone index. + """ + if not self.get_document_count() == self.get_embedding_count(): + raise DocumentStoreError( + "The number of documents present in the SQL database does not " + "match the number of embeddings in Pinecone. Make sure your Pinecone " + "index aligns to the same database that was used when creating the " + "original index." + ) + + def write_documents( + self, + documents: Union[List[dict], List[Document]], + index: Optional[str] = None, + batch_size: int = 32, + duplicate_documents: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + ): + """ + Add new documents to the DocumentStore. + + :param documents: List of `Dicts` or list of `Documents`. If they already contain embeddings, we'll index them + right away in Pinecone. If not, you can later call `update_embeddings()` to create & index them. + :param index: Index name for storing the docs and metadata. + :param batch_size: Number of documents to process at a time. When working with large number of documents, + batching can help to reduce the memory footprint. + :param duplicate_documents: handle duplicate documents based on parameter options. + + Parameter options: + - `"skip"`: Ignore the duplicate documents. + - `"overwrite"`: Update any existing documents with the same ID when adding documents. + - `"fail"`: An error is raised if the document ID of the document being added already exists. + :param headers: PineconeDocumentStore does not support headers. + :raises DuplicateDocumentError: Exception trigger on duplicate document. + """ + if headers: + raise NotImplementedError("PineconeDocumentStore does not support headers.") + + index = index or self.index + index = self._sanitize_index_name(index) + duplicate_documents = duplicate_documents or self.duplicate_documents + assert ( + duplicate_documents in self.duplicate_documents_options + ), f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}" + + if index not in self.pinecone_indexes: + self.pinecone_indexes[index] = self._create_index_if_not_exist( + embedding_dim=self.embedding_dim, + index=index, + metric_type=self.metric_type, + replicas=self.replicas, + shards=self.shards, + ) + + field_map = self._create_document_field_map() + document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] + document_objects = self._handle_duplicate_documents( + documents=document_objects, index=index, duplicate_documents=duplicate_documents + ) + if len(document_objects) > 0: + add_vectors = False if document_objects[0].embedding is None else True + with tqdm( + total=len(document_objects), disable=not self.progress_bar, position=0, desc="Writing Documents" + ) as progress_bar: + for i in range(0, len(document_objects), batch_size): + ids = [doc.id for doc in document_objects[i : i + batch_size]] + metadata = [doc.meta for doc in document_objects[i : i + batch_size]] + if add_vectors: + embeddings = [doc.embedding for doc in document_objects[i : i + batch_size]] + embeddings_to_index = np.array(embeddings, dtype="float32") + + if self.similarity == "cosine": + self.normalize_embedding(embeddings_to_index) + # Convert embeddings to list objects + embeddings = [embed.tolist() for embed in embeddings] + data_to_write_to_pinecone = zip(ids, embeddings, metadata) + # Metadata fields and embeddings are stored in Pinecone + self.pinecone_indexes[index].upsert(vectors=data_to_write_to_pinecone) + + docs_to_write_to_sql = document_objects[i : i + batch_size] + super(PineconeDocumentStore, self).write_documents( + docs_to_write_to_sql, index=index, duplicate_documents=duplicate_documents + ) + progress_bar.update(batch_size) + progress_bar.close() + + def _create_document_field_map(self) -> Dict: + return {self.embedding_field: "embedding"} + + def update_embeddings( + self, + retriever: "BaseRetriever", + index: Optional[str] = None, + update_existing_embeddings: bool = True, + filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, + batch_size: int = 32, + ): + """ + Updates the embeddings in the document store using the encoding model specified in the retriever. + This can be useful if you want to add or change the embeddings for your documents (e.g. after changing the + retriever config). + + :param retriever: Retriever to use to get embeddings for text. + :param index: Index name for which embeddings are to be updated. If set to `None`, the default `self.index` is + used. + :param update_existing_embeddings: Whether to update existing embeddings of the documents. If set to `False`, + only documents without embeddings are processed. This mode can be used for incremental updating of + embeddings, wherein, only newly indexed documents get processed. + :param filters: Optional filters to narrow down the documents for which embeddings are to be updated. + Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical + operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, + `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name. + Logical operator keys take a dictionary of metadata field names and/or logical operators as + value. Metadata field names take a dictionary of comparison operators as value. Comparison + operator keys take a single value or (in case of `"$in"`) a list of values as value. + If no logical operator is provided, `"$and"` is used as default operation. If no comparison + operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default + operation. + __Example__: + ```python + filters = { + "$and": { + "type": {"$eq": "article"}, + "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, + "rating": {"$gte": 3}, + "$or": { + "genre": {"$in": ["economy", "politics"]}, + "publisher": {"$eq": "nytimes"} + } + } + } + ``` + :param batch_size: Number of documents to process at a time. When working with large number of documents, + batching can help reduce memory footprint. + """ + index = index or self.index + index = self._sanitize_index_name(index) + + if index not in self.pinecone_indexes: + raise ValueError( + f"Couldn't find a the index '{index}' in Pinecone. Try to init the " + f"PineconeDocumentStore() again ..." + ) + + document_count = self.get_document_count(index=index, filters=filters) + if document_count == 0: + logger.warning("Calling DocumentStore.update_embeddings() on an empty index") + return + + logger.info(f"Updating embeddings for {document_count} docs...") + + result = self._query( + index=index, + vector_ids=None, + batch_size=batch_size, + filters=filters, + only_documents_without_embedding=not update_existing_embeddings, + ) + batched_documents = get_batches_from_generator(result, batch_size) + with tqdm( + total=document_count, disable=not self.progress_bar, position=0, unit=" docs", desc="Updating Embedding" + ) as progress_bar: + for document_batch in batched_documents: + embeddings = retriever.embed_documents(document_batch) # type: ignore + assert len(document_batch) == len(embeddings) + + embeddings_to_index = np.array(embeddings, dtype="float32") + if self.similarity == "cosine": + self.normalize_embedding(embeddings_to_index) + embeddings = embeddings_to_index.tolist() + + metadata = [] + ids = [] + for doc in document_batch: + metadata.append(doc.meta) + ids.append(doc.id) + # update existing vectors in pinecone index + self.pinecone_indexes[index].upsert(vectors=zip(ids, embeddings, metadata)) + + progress_bar.set_description_str("Documents Processed") + progress_bar.update(batch_size) + + def get_all_documents( + self, + index: Optional[str] = None, + filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, + return_embedding: Optional[bool] = None, + batch_size: int = 32, + headers: Optional[Dict[str, str]] = None, + ) -> List[Document]: + + if headers: + raise NotImplementedError("PineconeDocumentStore does not support headers.") + + result = self.get_all_documents_generator( + index=index, filters=filters, return_embedding=return_embedding, batch_size=batch_size + ) + documents = list(result) + return documents + + def get_all_documents_generator( + self, + index: Optional[str] = None, + filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, + return_embedding: Optional[bool] = None, + batch_size: int = 32, + headers: Optional[Dict[str, str]] = None, + ) -> Generator[Document, None, None]: + """ + Get all documents from the document store. Under-the-hood, documents are fetched in batches from the + document store and yielded as individual documents. This method can be used to iteratively process + a large number of documents without having to load all documents in memory. + + :param index: Name of the index to get the documents from. If None, the + DocumentStore's default index (self.index) will be used. + :param filters: Optional filters to narrow down the documents for which embeddings are to be updated. + Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical + operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, + `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name. + Logical operator keys take a dictionary of metadata field names and/or logical operators as + value. Metadata field names take a dictionary of comparison operators as value. Comparison + operator keys take a single value or (in case of `"$in"`) a list of values as value. + If no logical operator is provided, `"$and"` is used as default operation. If no comparison + operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default + operation. + __Example__: + ```python + filters = { + "$and": { + "type": {"$eq": "article"}, + "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, + "rating": {"$gte": 3}, + "$or": { + "genre": {"$in": ["economy", "politics"]}, + "publisher": {"$eq": "nytimes"} + } + } + } + ``` + :param return_embedding: Whether to return the document embeddings. + :param batch_size: When working with large number of documents, batching can help reduce memory footprint. + :param headers: PineconeDocumentStore does not support headers. + """ + if headers: + raise NotImplementedError("PineconeDocumentStore does not support headers.") + if return_embedding is None: + return_embedding = self.return_embedding + + index = index or self.index + index = self._sanitize_index_name(index) + documents = super(PineconeDocumentStore, self).get_all_documents_generator( + index=index, filters=filters, batch_size=batch_size, return_embedding=False + ) + + for doc in documents: + if return_embedding: + self._attach_embedding_to_document(document=doc, index=index) + yield doc + + def get_documents_by_id( + self, + ids: List[str], + index: Optional[str] = None, + batch_size: int = 32, + headers: Optional[Dict[str, str]] = None, + return_embedding: Optional[bool] = None, + ) -> List[Document]: + + if headers: + raise NotImplementedError("PineconeDocumentStore does not support headers.") + + if return_embedding is None: + return_embedding = self.return_embedding + + index = index or self.index + index = self._sanitize_index_name(index) + + documents = super().get_documents_by_id(ids=ids, index=index, batch_size=batch_size) + if return_embedding: + for doc in documents: + self._attach_embedding_to_document(document=doc, index=index) + + return documents + + def get_embedding_count( + self, index: Optional[str] = None, filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None + ) -> int: + """ + Return the count of embeddings in the document store. + """ + if filters: + raise NotImplementedError("Filters are not supported for get_embedding_count in PineconeDocumentStore") + + index = index or self.index + index = self._sanitize_index_name(index) + if not self.pinecone_indexes.get(index, False): + raise ValueError(f"No index named {index} found in Pinecone.") + + stats = self.pinecone_indexes[index].describe_index_stats() + # if no namespace return zero + count = stats["namespaces"][""]["vector_count"] if "" in stats["namespaces"] else 0 + return count + + def update_document_meta(self, id: str, meta: Dict[str, str], index: str = None): + """ + Update the metadata dictionary of a document by specifying its string id + """ + index = index or self.index + index = self._sanitize_index_name(index) + if index in self.pinecone_indexes: + doc = self.get_documents_by_id(ids=[id], index=index, return_embedding=True)[0] + if doc.embedding is not None: + self.pinecone_indexes[index].upsert(vectors=([id], [doc.embedding.tolist()], [meta])) + + super().update_document_meta(id=id, meta=meta, index=index) + + def delete_documents( + self, + index: Optional[str] = None, + ids: Optional[List[str]] = None, + filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, + headers: Optional[Dict[str, str]] = None, + ): + """ + Delete documents from the document store. + + :param index: Index name to delete the documents from. If `None`, the DocumentStore's default index + (`self.index`) will be used. + :param ids: Optional list of IDs to narrow down the documents to be deleted. + :param filters: Optional filters to narrow down the documents for which embeddings are to be updated. + Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical + operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, + `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name. + Logical operator keys take a dictionary of metadata field names and/or logical operators as + value. Metadata field names take a dictionary of comparison operators as value. Comparison + operator keys take a single value or (in case of `"$in"`) a list of values as value. + If no logical operator is provided, `"$and"` is used as default operation. If no comparison + operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default + operation. + __Example__: + ```python + filters = { + "$and": { + "type": {"$eq": "article"}, + "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, + "rating": {"$gte": 3}, + "$or": { + "genre": {"$in": ["economy", "politics"]}, + "publisher": {"$eq": "nytimes"} + } + } + } + ``` + :param headers: PineconeDocumentStore does not support headers. + """ + if headers: + raise NotImplementedError("PineconeDocumentStore does not support headers.") + + index = index or self.index + index = self._sanitize_index_name(index) + if index in self.pinecone_indexes: + if ids is None and filters is None: + self.pinecone_indexes[index].delete(delete_all=True) + else: + affected_docs = self.get_all_documents(filters=filters, return_embedding=False) + if ids: + affected_docs = [doc for doc in affected_docs if doc.id in ids] + + doc_ids = [doc.id for doc in affected_docs] + self.pinecone_indexes[index].delete(ids=doc_ids) + + super().delete_documents(index=index, ids=ids, filters=filters) + + def query_by_embedding( + self, + query_emb: np.ndarray, + filters: Optional[Dict[str, Union[Dict, List, str, int, float, bool]]] = None, + top_k: int = 10, + index: Optional[str] = None, + return_embedding: Optional[bool] = None, + headers: Optional[Dict[str, str]] = None, + ) -> List[Document]: + """ + Find the document that is most similar to the provided `query_emb` by using a vector similarity metric. + + :param query_emb: Embedding of the query (e.g. gathered from DPR). + :param filters: Optional filters to narrow down the search space to documents whose metadata fulfill certain + conditions. + Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical + operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, + `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name. + Logical operator keys take a dictionary of metadata field names and/or logical operators as + value. Metadata field names take a dictionary of comparison operators as value. Comparison + operator keys take a single value or (in case of `"$in"`) a list of values as value. + If no logical operator is provided, `"$and"` is used as default operation. If no comparison + operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default + operation. + __Example__: + ```python + filters = { + "$and": { + "type": {"$eq": "article"}, + "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, + "rating": {"$gte": 3}, + "$or": { + "genre": {"$in": ["economy", "politics"]}, + "publisher": {"$eq": "nytimes"} + } + } + } + # or simpler using default operators + filters = { + "type": "article", + "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, + "rating": {"$gte": 3}, + "$or": { + "genre": ["economy", "politics"], + "publisher": "nytimes" + } + } + ``` + To use the same logical operator multiple times on the same level, logical operators take + optionally a list of dictionaries as value. + __Example__: + ```python + filters = { + "$or": [ + { + "$and": { + "Type": "News Paper", + "Date": { + "$lt": "2019-01-01" + } + } + }, + { + "$and": { + "Type": "Blog Post", + "Date": { + "$gte": "2019-01-01" + } + } + } + ] + } + ``` + :param top_k: How many documents to return. + :param index: The name of the index from which to retrieve documents. + :param return_embedding: Whether to return document embedding. + :param headers: PineconeDocumentStore does not support headers. + """ + if headers: + raise NotImplementedError("PineconeDocumentStore does not support headers.") + + if return_embedding is None: + return_embedding = self.return_embedding + self._limit_check(top_k, include_values=return_embedding) + + if filters is not None: + filters = LogicalFilterClause.parse(filters).convert_to_pinecone() + + index = index or self.index + index = self._sanitize_index_name(index) + + if index not in self.pinecone_indexes: + raise DocumentStoreError( + f"Index named '{index}' does not exist. Try reinitializing PineconeDocumentStore() and running " + f"'update_embeddings()' to create and populate an index." + ) + + query_emb = query_emb.reshape(1, -1).astype(np.float32) + if self.similarity == "cosine": + self.normalize_embedding(query_emb) + + res = self.pinecone_indexes[index].query(query_emb.tolist(), top_k=top_k, include_values=False, filter=filters) + + score_matrix = [] + vector_id_matrix = [] + for match in res["results"][0]["matches"]: + score_matrix.append(match["score"]) + vector_id_matrix.append(match["id"]) + documents = self.get_documents_by_id(vector_id_matrix, index=index, return_embedding=return_embedding) + + # assign query score to each document + scores_for_vector_ids: Dict[str, float] = {str(v_id): s for v_id, s in zip(vector_id_matrix, score_matrix)} + for i, doc in enumerate(documents): + raw_score = scores_for_vector_ids[doc.id] + doc.score = self.finalize_raw_score(raw_score, self.similarity) + + return documents + + def _attach_embedding_to_document(self, document: Document, index: str): + """ + Fetches the Document's embedding from the specified Pinecone index and attaches it to the Document's + embedding field. + """ + result = self.pinecone_indexes[index].fetch(ids=[document.id]) + if result["vectors"].get(document.id, False): + embedding = result["vectors"][document.id].get("values", None) + document.embedding = np.asarray(embedding, dtype=np.float32) + + def _limit_check(self, top_k: int, include_values: Optional[bool] = None): + """ + Confirms the top_k value does not exceed Pinecone vector database limits. + """ + if include_values: + if top_k > self.top_k_limit_vectors: + raise DocumentStoreError( + f"PineconeDocumentStore allows requests of no more than {self.top_k_limit_vectors} records " + f"when returning embedding values. This request is attempting to return {top_k} records." + ) + else: + if top_k > self.top_k_limit: + raise DocumentStoreError( + f"PineconeDocumentStore allows requests of no more than {self.top_k_limit} records. " + f"This request is attempting to return {top_k} records." + ) + + @classmethod + def load(cls): + """ + Default class method used for loading indexes. Not applicable to the PineconeDocumentStore. + """ + raise NotImplementedError("load method not supported for PineconeDocumentStore") diff --git a/haystack/nodes/evaluator/evaluator.py b/haystack/nodes/evaluator/evaluator.py index 134c855ed1..6e4ac3184b 100644 --- a/haystack/nodes/evaluator/evaluator.py +++ b/haystack/nodes/evaluator/evaluator.py @@ -394,7 +394,7 @@ def semantic_answer_similarity( gold_labels: List[List[str]], sas_model_name_or_path: str = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2", batch_size: int = 32, - use_gpu: bool = True + use_gpu: bool = True, ) -> Tuple[List[float], List[float]]: """ Computes Transformer-based similarity of predicted answer to gold labels to derive a more meaningful metric than EM or F1. @@ -416,8 +416,8 @@ def semantic_answer_similarity( cross_encoder_used = False if config.architectures is not None: cross_encoder_used = any(arch.endswith("ForSequenceClassification") for arch in config.architectures) - - device = None if use_gpu else 'cpu' + + device = None if use_gpu else "cpu" # Compute similarities top_1_sas = [] diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index 29ef831086..c45655f452 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -768,8 +768,11 @@ def eval( gold_labels = df["gold_answers"].values predictions = [[a] for a in df["answer"].values] sas, _ = semantic_answer_similarity( - predictions=predictions, gold_labels=gold_labels, sas_model_name_or_path=sas_model_name_or_path, - batch_size=sas_batch_size, use_gpu=sas_use_gpu + predictions=predictions, + gold_labels=gold_labels, + sas_model_name_or_path=sas_model_name_or_path, + batch_size=sas_batch_size, + use_gpu=sas_use_gpu, ) df["sas"] = sas diff --git a/setup.cfg b/setup.cfg index 66649e1de8..3fcc41fdb7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -143,12 +143,16 @@ milvus = farm-haystack[sql,only-milvus] weaviate = weaviate-client==2.5.0 +only-pinecone = + pinecone-client +pinecone = + farm-haystack[sql,only-pinecone] graphdb = SPARQLWrapper docstores = - farm-haystack[faiss,milvus,weaviate,graphdb] + farm-haystack[faiss,milvus,weaviate,graphdb,pinecone] docstores-gpu = - farm-haystack[faiss-gpu,milvus,weaviate,graphdb] + farm-haystack[faiss-gpu,milvus,weaviate,graphdb,pinecone] crawler = selenium webdriver-manager diff --git a/test/conftest.py b/test/conftest.py index ee8c6b4a6c..24e3039e2c 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -8,6 +8,9 @@ import uuid import logging from pathlib import Path +import os + +import pinecone import responses from sqlalchemy import create_engine, text import posthog @@ -32,7 +35,7 @@ from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore import weaviate from haystack.document_stores.weaviate import WeaviateDocumentStore - from haystack.document_stores import MilvusDocumentStore + from haystack.document_stores import MilvusDocumentStore, PineconeDocumentStore from haystack.document_stores.graphdb import GraphDBKnowledgeGraph from haystack.document_stores.faiss import FAISSDocumentStore from haystack.document_stores.sql import SQLDocumentStore @@ -136,7 +139,7 @@ def pytest_collection_modifyitems(config, items): keywords.extend(i.split("-")) else: keywords.append(i) - for cur_doc_store in ["elasticsearch", "faiss", "sql", "memory", "milvus1", "milvus", "weaviate"]: + for cur_doc_store in ["elasticsearch", "faiss", "sql", "memory", "milvus1", "milvus", "weaviate", "pinecone"]: if cur_doc_store in keywords and cur_doc_store not in document_store_types_to_run: skip_docstore = pytest.mark.skip( reason=f'{cur_doc_store} is disabled. Enable via pytest --document_store_type="{cur_doc_store}"' @@ -150,6 +153,11 @@ def pytest_collection_modifyitems(config, items): skip_milvus = pytest.mark.skip(reason="Skipping Tests for 'milvus', as Milvus1 seems to be installed.") item.add_marker(skip_milvus) + # Skip PineconeDocumentStore if PINECONE_API_KEY not in environment variables + if not os.environ.get("PINECONE_API_KEY", False) and "pinecone" in keywords: + skip_pinecone = pytest.mark.skip(reason="PINECONE_API_KEY not in environment variables.") + item.add_marker(skip_pinecone) + # # Empty mocks, as a base for unit tests. @@ -629,7 +637,7 @@ def ensure_ids_are_correct_uuids(docs: list, document_store: object) -> None: d["id"] = str(uuid.uuid4()) -@pytest.fixture(params=["elasticsearch", "faiss", "memory", "milvus1", "milvus", "weaviate"]) +@pytest.fixture(params=["elasticsearch", "faiss", "memory", "milvus1", "milvus", "weaviate", "pinecone"]) def document_store_with_docs(request, test_docs_xs, tmp_path): embedding_dim = request.node.get_closest_marker("embedding_dim", pytest.mark.embedding_dim(768)) document_store = get_document_store( @@ -653,8 +661,13 @@ def document_store(request, tmp_path): if isinstance(document_store, MilvusDocumentStore) and not milvus1: document_store.collection.drop() + # Make sure to delete Pinecone indexes, required for tests using different embedding dimensions + if isinstance(document_store, PineconeDocumentStore): + for index in document_store.pinecone_indexes: + pinecone.delete_index(index) + -@pytest.fixture(params=["memory", "faiss", "milvus1", "milvus", "elasticsearch"]) +@pytest.fixture(params=["memory", "faiss", "milvus1", "milvus", "elasticsearch", "pinecone"]) def document_store_dot_product(request, tmp_path): embedding_dim = request.node.get_closest_marker("embedding_dim", pytest.mark.embedding_dim(768)) document_store = get_document_store( @@ -667,7 +680,7 @@ def document_store_dot_product(request, tmp_path): document_store.delete_documents() -@pytest.fixture(params=["memory", "faiss", "milvus1", "milvus", "elasticsearch"]) +@pytest.fixture(params=["memory", "faiss", "milvus1", "milvus", "elasticsearch", "pinecone"]) def document_store_dot_product_with_docs(request, test_docs_xs, tmp_path): embedding_dim = request.node.get_closest_marker("embedding_dim", pytest.mark.embedding_dim(768)) document_store = get_document_store( @@ -681,7 +694,7 @@ def document_store_dot_product_with_docs(request, test_docs_xs, tmp_path): document_store.delete_documents() -@pytest.fixture(params=["elasticsearch", "faiss", "memory", "milvus1"]) +@pytest.fixture(params=["elasticsearch", "faiss", "memory", "milvus1", "pinecone"]) def document_store_dot_product_small(request, tmp_path): embedding_dim = request.node.get_closest_marker("embedding_dim", pytest.mark.embedding_dim(3)) document_store = get_document_store( @@ -694,7 +707,7 @@ def document_store_dot_product_small(request, tmp_path): document_store.delete_documents() -@pytest.fixture(params=["elasticsearch", "faiss", "memory", "milvus1", "milvus", "weaviate"]) +@pytest.fixture(params=["elasticsearch", "faiss", "memory", "milvus1", "milvus", "weaviate", "pinecone"]) def document_store_small(request, tmp_path): embedding_dim = request.node.get_closest_marker("embedding_dim", pytest.mark.embedding_dim(3)) document_store = get_document_store( @@ -825,6 +838,16 @@ def get_document_store( ) document_store.weaviate_client.schema.delete_all() document_store._create_schema_and_index_if_not_exist() + + elif document_store_type == "pinecone": + document_store = PineconeDocumentStore( + api_key=os.environ["PINECONE_API_KEY"], + embedding_dim=embedding_dim, + embedding_field=embedding_field, + index=index, + similarity=similarity, + ) + else: raise Exception(f"No document store fixture for '{document_store_type}'") diff --git a/test/test_document_store.py b/test/test_document_store.py index 62102f6d5e..facb42996d 100644 --- a/test/test_document_store.py +++ b/test/test_document_store.py @@ -125,7 +125,9 @@ def test_write_with_duplicate_doc_ids(document_store): document_store.write_documents(duplicate_documents, duplicate_documents="fail") -@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory", "milvus1", "weaviate"], indirect=True) +@pytest.mark.parametrize( + "document_store", ["elasticsearch", "faiss", "memory", "milvus1", "weaviate", "pinecone"], indirect=True +) def test_write_with_duplicate_doc_ids_custom_index(document_store): duplicate_documents = [ Document(content="Doc1", id_hash_keys=["content"]), @@ -218,7 +220,9 @@ def test_get_all_documents_with_incorrect_filter_value(document_store_with_docs) assert len(documents) == 0 -@pytest.mark.parametrize("document_store_with_docs", ["elasticsearch", "sql", "weaviate", "memory"], indirect=True) +@pytest.mark.parametrize( + "document_store_with_docs", ["elasticsearch", "sql", "weaviate", "memory", "pinecone"], indirect=True +) def test_extended_filter(document_store_with_docs): # Test comparison operators individually documents = document_store_with_docs.get_all_documents(filters={"meta_field": {"$eq": "test1"}}) @@ -712,7 +716,7 @@ def test_delete_documents_by_id_with_filters(document_store_with_docs): # exclude weaviate because it does not support storing labels -@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory", "milvus1"], indirect=True) +@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory", "milvus1", "pinecone"], indirect=True) def test_labels(document_store): label = Label( query="question1", @@ -800,7 +804,7 @@ def test_labels(document_store): # exclude weaviate because it does not support storing labels -@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory", "milvus1"], indirect=True) +@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory", "milvus1", "pinecone"], indirect=True) def test_multilabel(document_store): labels = [ Label( @@ -916,7 +920,7 @@ def test_multilabel(document_store): # exclude weaviate because it does not support storing labels -@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory", "milvus1"], indirect=True) +@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory", "milvus1", "pinecone"], indirect=True) def test_multilabel_no_answer(document_store): labels = [ Label( @@ -1171,7 +1175,7 @@ def test_multilabel_meta_aggregations(document_store): assert multi_label.filters == l.filters -@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "milvus1", "weaviate"], indirect=True) +@pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "milvus1", "weaviate", "pinecone"], indirect=True) # Currently update_document_meta() is not implemented for Memory doc store def test_update_meta(document_store): documents = [ diff --git a/test/test_retriever.py b/test/test_retriever.py index 3b4a86e732..a581e87ec4 100644 --- a/test/test_retriever.py +++ b/test/test_retriever.py @@ -151,7 +151,7 @@ def test_elasticsearch_custom_query(): @pytest.mark.slow @pytest.mark.parametrize( - "document_store", ["elasticsearch", "faiss", "memory", "milvus1", "milvus", "weaviate"], indirect=True + "document_store", ["elasticsearch", "faiss", "memory", "milvus1", "milvus", "weaviate", "pinecone"], indirect=True ) @pytest.mark.parametrize("retriever", ["dpr"], indirect=True) def test_dpr_embedding(document_store, retriever, docs): @@ -182,7 +182,7 @@ def test_dpr_embedding(document_store, retriever, docs): @pytest.mark.slow @pytest.mark.parametrize( - "document_store", ["elasticsearch", "faiss", "memory", "milvus1", "milvus", "weaviate"], indirect=True + "document_store", ["elasticsearch", "faiss", "memory", "milvus1", "milvus", "weaviate", "pinecone"], indirect=True ) @pytest.mark.parametrize("retriever", ["retribert"], indirect=True) @pytest.mark.embedding_dim(128) diff --git a/test/test_standard_pipelines.py b/test/test_standard_pipelines.py index 00764d4b97..d5c5f9e5be 100644 --- a/test/test_standard_pipelines.py +++ b/test/test_standard_pipelines.py @@ -52,7 +52,7 @@ def test_faq_pipeline(retriever, document_store): @pytest.mark.parametrize("retriever", ["embedding"], indirect=True) @pytest.mark.parametrize( - "document_store", ["elasticsearch", "faiss", "memory", "milvus1", "milvus", "weaviate"], indirect=True + "document_store", ["elasticsearch", "faiss", "memory", "milvus1", "milvus", "weaviate", "pinecone"], indirect=True ) def test_document_search_pipeline(retriever, document_store): documents = [