Add Brownfield Support of existing Elasticsearch indices (#2229)
* Add method to transform existing ES index

* Add possibility to regularly add new records

* Fix types

* Restructure import statement

* Add use_system_proxy param

* Update Documentation & Code Style

* Change location and name + add test

* Update Documentation & Code Style

* Add test cases for metadata fields

* Fix linter

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
bogdankostic and github-actions[bot] authored Feb 22, 2022
1 parent 965cc86 commit 4bad21e
Showing 6 changed files with 207 additions and 8 deletions.
7 changes: 6 additions & 1 deletion haystack/document_stores/__init__.py
@@ -24,4 +24,9 @@

from haystack.document_stores.memory import InMemoryDocumentStore
from haystack.document_stores.deepsetcloud import DeepsetCloudDocumentStore
from haystack.document_stores.utils import eval_data_from_json, eval_data_from_jsonl, squad_json_to_jsonl
from haystack.document_stores.utils import (
    eval_data_from_json,
    eval_data_from_jsonl,
    squad_json_to_jsonl,
    es_index_to_document_store,
)
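
Because the new helper is re-exported here, it can be imported either from the package namespace or from the `utils` module; a minimal sketch:

from haystack.document_stores import es_index_to_document_store
# equivalent: from haystack.document_stores.utils import es_index_to_document_store
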
1 change: 1 addition & 0 deletions haystack/document_stores/base.py
@@ -18,6 +18,7 @@
from haystack.nodes.preprocessor import PreProcessor
from haystack.document_stores.utils import eval_data_from_json, eval_data_from_jsonl, squad_json_to_jsonl


logger = logging.getLogger(__name__)

try:
8 changes: 5 additions & 3 deletions haystack/document_stores/elasticsearch.py
@@ -240,8 +240,9 @@ def __init__(
        self.duplicate_documents = duplicate_documents
        self.refresh_type = refresh_type

    @classmethod
    def _init_elastic_client(
        self,
        cls,
        host: Union[str, List[str]],
        port: Union[int, List[int]],
        username: str,
@@ -256,7 +257,7 @@ def _init_elastic_client(
        use_system_proxy: bool,
    ) -> Elasticsearch:

        hosts = self._prepare_hosts(host, port)
        hosts = cls._prepare_hosts(host, port)

        if (api_key or api_key_id) and not (api_key and api_key_id):
            raise ValueError("You must provide either both or none of `api_key_id` and `api_key`")
@@ -326,7 +327,8 @@ def _init_elastic_client(
        )
        return client

    def _prepare_hosts(self, host, port):
    @staticmethod
    def _prepare_hosts(host, port):
        # Create list of host(s) + port(s) to allow direct client connections to multiple elasticsearch nodes
        if isinstance(host, list):
            if isinstance(port, list):
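
Making `_init_elastic_client` a classmethod and `_prepare_hosts` a staticmethod lets the new utility in `utils.py` build an Elasticsearch client without instantiating an `ElasticsearchDocumentStore`. A minimal sketch of the assumed host/port pairing (the body of `_prepare_hosts` is truncated in this diff, so the exact return value is an assumption):

from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore

# Assumed behaviour: pair host(s) and port(s) into connection dicts for the Elasticsearch client.
hosts = ElasticsearchDocumentStore._prepare_hosts(["node1", "node2"], 9200)
# -> roughly [{"host": "node1", "port": 9200}, {"host": "node2", "port": 9200}]
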
4 changes: 2 additions & 2 deletions haystack/document_stores/filter_utils.py
@@ -6,7 +6,7 @@
from sqlalchemy.sql import select
from sqlalchemy import and_, or_

from haystack.document_stores.utils import convert_date_to_rfc3339
from haystack.document_stores import utils


def nested_defaultdict() -> defaultdict:
@@ -261,7 +261,7 @@ def _get_weaviate_datatype(
        if isinstance(value, str):
            # Check if comparison value is a date
            try:
                value = convert_date_to_rfc3339(value)
                value = utils.convert_date_to_rfc3339(value)
                data_type = "valueDate"
            # Comparison value is a plain string
            except ValueError:
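
Importing the `utils` module as a whole (and resolving `convert_date_to_rfc3339` as an attribute at call time) is presumably what avoids a circular import, since `utils.py` now imports `LogicalFilterClause` from this module. A minimal sketch of the pattern, with hypothetical module names:

# a.py -- binding the name at import time breaks if b is still being initialised:
#   from b import helper
# a.py -- binding the module and deferring the attribute lookup works:
import b

def use_it():
    return b.helper()  # b.helper only needs to exist by the time this is called
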
148 changes: 148 additions & 0 deletions haystack/document_stores/utils.py
@@ -1,12 +1,20 @@
import typing
from typing import Dict, List, Optional, Tuple, Union, Generator

import json
import logging
from datetime import datetime
from elasticsearch.helpers import scan
from tqdm.auto import tqdm

from haystack.document_stores.filter_utils import LogicalFilterClause
from haystack.schema import Document, Label, Answer, Span
from haystack.nodes.preprocessor import PreProcessor

if typing.TYPE_CHECKING:
    # This results in a circular import if we don't use typing.TYPE_CHECKING
    from haystack.document_stores.base import BaseDocumentStore


logger = logging.getLogger(__name__)

@@ -271,3 +279,143 @@ def convert_date_to_rfc3339(date: str) -> str:
    converted_date = parsed_datetime.isoformat()

    return converted_date


def es_index_to_document_store(
    document_store: "BaseDocumentStore",
    original_index_name: str,
    original_content_field: str,
    original_name_field: Optional[str] = None,
    included_metadata_fields: Optional[List[str]] = None,
    excluded_metadata_fields: Optional[List[str]] = None,
    store_original_ids: bool = True,
    index: Optional[str] = None,
    preprocessor: Optional[PreProcessor] = None,
    batch_size: int = 10_000,
    host: Union[str, List[str]] = "localhost",
    port: Union[int, List[int]] = 9200,
    username: str = "",
    password: str = "",
    api_key_id: Optional[str] = None,
    api_key: Optional[str] = None,
    aws4auth=None,
    scheme: str = "http",
    ca_certs: Optional[str] = None,
    verify_certs: bool = True,
    timeout: int = 30,
    use_system_proxy: bool = False,
) -> "BaseDocumentStore":
"""
This function provides brownfield support of existing Elasticsearch indexes by converting each of the records in
the provided index to haystack `Document` objects and writing them to the specified `DocumentStore`. It can be used
on a regular basis in order to add new records of the Elasticsearch index to the `DocumentStore`.
:param document_store: The haystack `DocumentStore` to write the converted `Document` objects to.
:param original_index_name: Elasticsearch index containing the records to be converted.
:param original_content_field: Elasticsearch field containing the text to be put in the `content` field of the
resulting haystack `Document` objects.
:param original_name_field: Optional Elasticsearch field containing the title title of the Document.
:param included_metadata_fields: List of Elasticsearch fields that shall be stored in the `meta` field of the
resulting haystack `Document` objects. If `included_metadata_fields` and `excluded_metadata_fields` are `None`,
all the fields found in the Elasticsearch records will be kept as metadata. You can specify only one of the
`included_metadata_fields` and `excluded_metadata_fields` parameters.
:param excluded_metadata_fields: List of Elasticsearch fields that shall be excluded from the `meta` field of the
resulting haystack `Document` objects. If `included_metadata_fields` and `excluded_metadata_fields` are `None`,
all the fields found in the Elasticsearch records will be kept as metadata. You can specify only one of the
`included_metadata_fields` and `excluded_metadata_fields` parameters.
:param store_original_ids: Whether to store the ID a record had in the original Elasticsearch index at the
`"_original_es_id"` metadata field of the resulting haystack `Document` objects. This should be set to `True`
if you want to continuously update the `DocumentStore` with new records inside your Elasticsearch index. If this
parameter was set to `False` on the first call of `es_index_to_document_store`,
all the indexed Documents in the `DocumentStore` will be overwritten in the second call.
:param index: Name of index in `document_store` to use to store the resulting haystack `Document` objects.
:param preprocessor: Optional PreProcessor that will be applied on the content field of the original Elasticsearch
record.
:param batch_size: Number of records to process at once.
:param host: URL(s) of Elasticsearch nodes.
:param port: Ports(s) of Elasticsearch nodes.
:param username: Username (standard authentication via http_auth).
:param password: Password (standard authentication via http_auth).
:param api_key_id: ID of the API key (altenative authentication mode to the above http_auth).
:param api_key: Secret value of the API key (altenative authentication mode to the above http_auth).
:param aws4auth: Authentication for usage with AWS Elasticsearch
(can be generated with the requests-aws4auth package).
:param scheme: `"https"` or `"http"`, protocol used to connect to your Elasticsearch instance.
:param ca_certs: Root certificates for SSL: it is a path to certificate authority (CA) certs on disk.
You can use certifi package with `certifi.where()` to find where the CA certs file is located in your machine.
:param verify_certs: Whether to be strict about ca certificates.
:param timeout: Number of seconds after which an Elasticsearch request times out.
:param use_system_proxy: Whether to use system proxy.
"""
    # This import cannot be at the beginning of the file, as this would result in a circular import
    from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore

    # Initialize Elasticsearch client
    es_client = ElasticsearchDocumentStore._init_elastic_client(
        host=host,
        port=port,
        username=username,
        password=password,
        api_key=api_key,
        api_key_id=api_key_id,
        aws4auth=aws4auth,
        scheme=scheme,
        ca_certs=ca_certs,
        verify_certs=verify_certs,
        timeout=timeout,
        use_system_proxy=use_system_proxy,
    )

    # Get existing original ES IDs inside DocumentStore in order to not reindex the corresponding records
    existing_ids = [
        doc.meta["_original_es_id"]
        for doc in document_store.get_all_documents_generator(index=index)
        if "_original_es_id" in doc.meta
    ]

    # Iterate over each individual record
    query: Dict[str, Dict] = {"query": {"bool": {"must": [{"match_all": {}}]}}}
    if existing_ids:
        filters = LogicalFilterClause.parse({"_id": {"$nin": existing_ids}}).convert_to_elasticsearch()
        query["query"]["bool"]["filter"] = filters
    records = scan(client=es_client, query=query, index=original_index_name)
    number_of_records = es_client.count(index=original_index_name, body=query)["count"]
    haystack_documents: List[Dict] = []
    for idx, record in enumerate(tqdm(records, total=number_of_records, desc="Converting ES Records")):
        # Write batch_size number of documents to haystack DocumentStore
        if (idx + 1) % batch_size == 0:
            document_store.write_documents(haystack_documents, index=index)
            haystack_documents = []

        # Get content and metadata of current record
        content = record["_source"].pop(original_content_field, "")
        if content:
            record_doc = {"content": content, "meta": {}}

            if original_name_field is not None:
                if original_name_field in record["_source"]:
                    record_doc["meta"]["name"] = record["_source"].pop(original_name_field)
            # Only add selected metadata fields
            if included_metadata_fields is not None:
                for metadata_field in included_metadata_fields:
                    if metadata_field in record["_source"]:
                        record_doc["meta"][metadata_field] = record["_source"][metadata_field]
            # Add all metadata fields except for those in excluded_metadata_fields
            else:
                if excluded_metadata_fields is not None:
                    for metadata_field in excluded_metadata_fields:
                        record["_source"].pop(metadata_field, None)
                record_doc["meta"].update(record["_source"])

            if store_original_ids:
                record_doc["meta"]["_original_es_id"] = record["_id"]

            # Apply preprocessor if provided
            preprocessed_docs = preprocessor.process(record_doc) if preprocessor is not None else [record_doc]

            haystack_documents.extend(preprocessed_docs)

    if haystack_documents:
        document_store.write_documents(haystack_documents, index=index)

    return document_store
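
A minimal usage sketch of the new utility, assuming a local Elasticsearch instance on localhost:9200 and a hypothetical existing index named "products" whose text lives in a "text" field:

from haystack.document_stores import InMemoryDocumentStore
from haystack.document_stores.utils import es_index_to_document_store

# Hypothetical index and field names; adjust them to the existing Elasticsearch deployment.
document_store = es_index_to_document_store(
    document_store=InMemoryDocumentStore(),
    original_index_name="products",
    original_content_field="text",
    original_name_field="title",
    excluded_metadata_fields=["internal_id"],
    index="products_haystack",
    host="localhost",
    port=9200,
)
# Calling the function again on the same DocumentStore later only converts records whose original
# Elasticsearch IDs are not yet stored in the "_original_es_id" metadata field, provided
# store_original_ids is left at its default of True.
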
47 changes: 45 additions & 2 deletions test/test_document_store.py
@@ -17,13 +17,14 @@
    DC_TEST_INDEX,
    SAMPLES_PATH,
)
from haystack.document_stores import WeaviateDocumentStore, DeepsetCloudDocumentStore
from haystack.document_stores import WeaviateDocumentStore, DeepsetCloudDocumentStore, InMemoryDocumentStore
from haystack.document_stores.base import BaseDocumentStore
from haystack.document_stores.utils import es_index_to_document_store
from haystack.errors import DuplicateDocumentError
from haystack.schema import Document, Label, Answer, Span
from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import EmbeddingRetriever
from haystack.nodes import EmbeddingRetriever, PreProcessor
from haystack.pipelines import DocumentSearchPipeline
from haystack.utils import DeepsetCloudError

@@ -1713,3 +1714,45 @@ def test_elasticsearch_search_field_mapping():

    assert indexed_settings["haystack_search_field_mapping"]["mappings"]["properties"]["content"]["type"] == "text"
    assert indexed_settings["haystack_search_field_mapping"]["mappings"]["properties"]["sub_content"]["type"] == "text"


@pytest.mark.parametrize("document_store_with_docs", ["elasticsearch"], indirect=True)
def test_elasticsearch_brownfield_support(document_store_with_docs):
    new_document_store = InMemoryDocumentStore()
    new_document_store = es_index_to_document_store(
        document_store=new_document_store,
        original_index_name="haystack_test",
        original_content_field="content",
        original_name_field="name",
        included_metadata_fields=["date_field"],
        index="test_brownfield_support",
    )

    original_documents = document_store_with_docs.get_all_documents(index="haystack_test")
    transferred_documents = new_document_store.get_all_documents(index="test_brownfield_support")
    assert len(original_documents) == len(transferred_documents)
    assert all("name" in doc.meta for doc in transferred_documents)
    assert all("date_field" in doc.meta for doc in transferred_documents)
    assert all("meta_field" not in doc.meta for doc in transferred_documents)
    assert all("numeric_field" not in doc.meta for doc in transferred_documents)

    original_content = set([doc.content for doc in original_documents])
    transferred_content = set([doc.content for doc in transferred_documents])
    assert original_content == transferred_content

    # Test transferring docs with PreProcessor
    new_document_store = es_index_to_document_store(
        document_store=new_document_store,
        original_index_name="haystack_test",
        original_content_field="content",
        excluded_metadata_fields=["date_field"],
        index="test_brownfield_support_2",
        preprocessor=PreProcessor(split_length=1, split_respect_sentence_boundary=False),
    )
    transferred_documents = new_document_store.get_all_documents(index="test_brownfield_support_2")
    assert all("date_field" not in doc.meta for doc in transferred_documents)
    assert all("name" in doc.meta for doc in transferred_documents)
    assert all("meta_field" in doc.meta for doc in transferred_documents)
    assert all("numeric_field" in doc.meta for doc in transferred_documents)
    # Check if number of transferred_documents is equal to number of unique words.
    assert len(transferred_documents) == len(set(" ".join(original_content).split()))
