diff --git a/haystack/__init__.py b/haystack/__init__.py index b04c0c254f..baa9b55804 100644 --- a/haystack/__init__.py +++ b/haystack/__init__.py @@ -1,6 +1,7 @@ import logging import pandas as pd +from haystack.finder import Finder pd.options.display.max_colwidth = 80 @@ -11,44 +12,4 @@ logging.getLogger('transformers').setLevel(logging.WARNING) logging.getLogger('farm.eval').setLevel(logging.INFO) -class Finder: - """ - Finder ties together instances of the Reader and Retriever class. - - It provides an interface to predict top n answers for a given question. - """ - - def __init__(self, reader, retriever): - self.retriever = retriever - self.reader = reader - - def get_answers(self, question, top_k_reader=1, top_k_retriever=10, filters=None): - """ - Get top k answers for a given question. - - :param question: the question string - :param top_k_reader: number of answers returned by the reader - :param top_k_retriever: number of text units to be retrieved - :param filters: limit scope to documents having the given tags and their corresponding values. 
- The format for the dict is {"tag-1": "value-1", "tag-2": "value-2" ...} - :return: - """ - - # 1) Optional: reduce the search space via document tags - if filters: - candidate_doc_ids = self.retriever.document_store.get_document_ids_by_tags(filters) - else: - candidate_doc_ids = None - - # 2) Apply retriever to get fast candidate paragraphs - paragraphs, meta_data = self.retriever.retrieve(question, top_k=top_k_retriever, candidate_doc_ids=candidate_doc_ids) - - # 3) Apply reader to get granular answer(s) - logger.info(f"Applying the reader now to look for the answer in detail ...") - results = self.reader.predict(question=question, - paragraphs=paragraphs, - meta_data_paragraphs=meta_data, - top_k=top_k_reader) - - return results diff --git a/haystack/database/elasticsearch.py b/haystack/database/elasticsearch.py index bb2a418236..b9884cc54b 100644 --- a/haystack/database/elasticsearch.py +++ b/haystack/database/elasticsearch.py @@ -1,85 +1,124 @@ from elasticsearch import Elasticsearch -from elasticsearch_dsl import Search, Document as ESDoc, Text, connections -from haystack.database.base import BaseDocumentStore - +from elasticsearch.helpers import scan -class Document(ESDoc): - name = Text() - text = Text() - tags = Text() - - class Index: - name = "document" +from haystack.database.base import BaseDocumentStore class ElasticsearchDocumentStore(BaseDocumentStore): - def __init__(self, host="localhost", username="", password="", index="document"): + def __init__( + self, + host="localhost", + username="", + password="", + index="document", + search_fields="text", + text_field="text", + name_field="name", + doc_id_field="document_id", + tag_fields=None, + custom_mapping=None, + ): self.client = Elasticsearch(hosts=[{"host": host}], http_auth=(username, password)) - self.connections = connections.create_connection(hosts=[{"host": host}], http_auth=(username, password)) - Document.init() # create mapping if not exists. 
+ # if no custom_mapping is supplied, use the default mapping + if not custom_mapping: + custom_mapping = { + "mappings": { + "properties": { + name_field: {"type": "text"}, + text_field: {"type": "text"}, + doc_id_field: {"type": "text"}, + } + } + } + # create an index if not exists + self.client.indices.create(index=index, ignore=400, body=custom_mapping) self.index = index + # configure mappings to ES fields that will be used for querying / displaying results + if type(search_fields) == str: + search_fields = [search_fields] + self.search_fields = search_fields + self.text_field = text_field + self.name_field = name_field + self.tag_fields = tag_fields + self.doc_id_field = doc_id_field + def get_document_by_id(self, id): query = {"filter": {"term": {"_id": id}}} result = self.client.search(index=self.index, body=query)["hits"]["hits"] if result: - document = {"id": result["_id"], "name": result["name"], "text": result["text"]} + document = { + "id": result[0]["_source"][self.doc_id_field], + "name": result[0]["_source"][self.name_field], + "text": result[0]["_source"][self.text_field], + } else: document = None return document - def get_document_ids_by_tags(self, tags): - query = { - "query": { - "bool": { - "should": [ - { - "terms": { - "tags": tags - } - } - ] - } - } - } + def get_document_by_name(self, name): query = {"filter": {"term": {self.name_field: name}}} result = self.client.search(index=self.index, body=query)["hits"]["hits"] - documents = [] + if result: + document = { + "id": result[0]["_source"][self.doc_id_field], + "name": result[0]["_source"][self.name_field], + "text": result[0]["_source"][self.text_field], + } else: + document = None + return document + + def get_document_ids_by_tags(self, tags): + term_queries = [{"terms": {key: value}} for key, value in tags.items()] + query = {"query": {"bool": {"must": term_queries}}} + result = self.client.search(index=self.index, body=query, size=10000)["hits"]["hits"] + doc_ids = [] for hit in result: - documents.append({"id": hit["_id"], "name": hit["name"], "text": 
hit["text"]}) - return documents + doc_ids.append(hit["_id"]) + return doc_ids def write_documents(self, documents): - for doc in documents: - d = Document( - name=doc["name"], - text=doc["text"], - document_id=doc.get("document_id", None), - tags=doc.get("tags", None), - ) - d.save() + for d in documents: + self.client.index(index=self.index, body=d) def get_document_count(self): - s = Search(using=self.client, index=self.index) - return s.count() + result = self.client.count(index=self.index) + count = result["count"] + return count def get_all_documents(self): - search = Search(using=self.client, index=self.index).scan() + result = scan(self.client, query={"query": {"match_all": {}}}, index=self.index) documents = [] - for hit in search: + for hit in result: documents.append( { - "id": hit.meta["id"], - "name": hit["name"], - "text": hit["text"], + "id": hit["_source"][self.doc_id_field], + "name": hit["_source"][self.name_field], + "text": hit["_source"][self.text_field], } ) return documents - def query(self, query, top_k=10): - search = Search(using=self.client, index=self.index).query("match", text=query)[:top_k].execute() + def query(self, query, top_k=10, candidate_doc_ids=None): + # TODO: + # for now: we keep the current structure of candidate_doc_ids for compatibility with SQL documentstores + # midterm: get rid of it and do filtering with tags directly in this query + + body = { + "size": top_k, + "query": { + "bool": { + "must": [{"multi_match": {"query": query, "type": "most_fields", "fields": self.search_fields}}] + } + }, + } + if candidate_doc_ids: + body["query"]["bool"]["filter"] = [{"terms": {"_id": candidate_doc_ids}}] + result = self.client.search(index=self.index, body=body)["hits"]["hits"] paragraphs = [] meta_data = [] - for hit in search: - paragraphs.append(hit["text"]) - meta_data.append({"paragraph_id": hit.meta["id"], "document_id": hit["document_id"]}) + for hit in result: + paragraphs.append(hit["_source"][self.text_field]) + 
meta_data.append({"paragraph_id": hit["_id"], "document_id": hit["_source"][self.doc_id_field]}) return paragraphs, meta_data diff --git a/haystack/finder.py b/haystack/finder.py new file mode 100644 index 0000000000..e32fa45921 --- /dev/null +++ b/haystack/finder.py @@ -0,0 +1,43 @@ +import logging +logger = logging.getLogger(__name__) + +class Finder: + """ + Finder ties together instances of the Reader and Retriever class. + + It provides an interface to predict top n answers for a given question. + """ + + def __init__(self, reader, retriever): + self.retriever = retriever + self.reader = reader + + def get_answers(self, question, top_k_reader=1, top_k_retriever=10, filters=None): + """ + Get top k answers for a given question. + + :param question: the question string + :param top_k_reader: number of answers returned by the reader + :param top_k_retriever: number of text units to be retrieved + :param filters: limit scope to documents having the given tags and their corresponding values. 
+ The format for the dict is {"tag-1": "value-1", "tag-2": "value-2" ...} + :return: + """ + + # 1) Optional: reduce the search space via document tags + if filters: + candidate_doc_ids = self.retriever.document_store.get_document_ids_by_tags(filters) + else: + candidate_doc_ids = None + + # 2) Apply retriever to get fast candidate paragraphs + paragraphs, meta_data = self.retriever.retrieve(question, top_k=top_k_retriever, candidate_doc_ids=candidate_doc_ids) + + # 3) Apply reader to get granular answer(s) + logger.info(f"Applying the reader now to look for the answer in detail ...") + results = self.reader.predict(question=question, + paragraphs=paragraphs, + meta_data_paragraphs=meta_data, + top_k=top_k_reader) + + return results \ No newline at end of file diff --git a/haystack/retriever/elasticsearch.py b/haystack/retriever/elasticsearch.py index d80ef6a72c..3cdb3f4e17 100644 --- a/haystack/retriever/elasticsearch.py +++ b/haystack/retriever/elasticsearch.py @@ -6,4 +6,5 @@ def __init__(self, document_store): self.document_store = document_store def retrieve(self, query, candidate_doc_ids=None, top_k=10): - return self.document_store.query(query, top_k) + + return self.document_store.query(query, top_k, candidate_doc_ids) diff --git a/requirements.txt b/requirements.txt index f2f77332a8..940065558c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,5 +6,4 @@ flask_sqlalchemy pandas psycopg2-binary sklearn -elasticsearch -elasticsearch_dsl \ No newline at end of file +elasticsearch \ No newline at end of file