From 556bfc74b9590e0e7746ffbdae7846033085285f Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Fri, 21 Oct 2022 20:47:37 +0200 Subject: [PATCH 01/23] remove generate_single_summary --- haystack/nodes/summarizer/base.py | 27 ++-- haystack/nodes/summarizer/transformers.py | 143 +++++++--------------- haystack/pipelines/standard_pipelines.py | 8 +- test/nodes/test_summarizer.py | 126 +++++++++---------- test/nodes/test_summarizer_translation.py | 6 +- 5 files changed, 119 insertions(+), 191 deletions(-) diff --git a/haystack/nodes/summarizer/base.py b/haystack/nodes/summarizer/base.py index d2402c4d13..917f1723ea 100644 --- a/haystack/nodes/summarizer/base.py +++ b/haystack/nodes/summarizer/base.py @@ -14,47 +14,34 @@ class BaseSummarizer(BaseComponent): outgoing_edges = 1 @abstractmethod - def predict(self, documents: List[Document], generate_single_summary: Optional[bool] = None) -> List[Document]: + def predict(self, documents: List[Document]) -> List[Document]: """ Abstract method for creating a summary. :param documents: Related documents (e.g. coming from a retriever) that the answer shall be conditioned on. - :param generate_single_summary: Whether to generate a single summary for all documents or one summary per document. - If set to "True", all docs will be joined to a single string that will then - be summarized. - Important: The summary will depend on the order of the supplied documents! - :return: List of Documents, where Document.content contains the summarization and Document.meta["context"] - the original, not summarized text + :return: List of Documents, where Document.meta["summary"] contains the summarization """ pass @abstractmethod def predict_batch( - self, - documents: Union[List[Document], List[List[Document]]], - generate_single_summary: Optional[bool] = None, - batch_size: Optional[int] = None, + self, documents: Union[List[Document], List[List[Document]]], batch_size: Optional[int] = None ) -> Union[List[Document], List[List[Document]]]: pass - def run(self, documents: List[Document], generate_single_summary: Optional[bool] = None): # type: ignore + def run(self, documents: List[Document]): # type: ignore results: Dict = {"documents": []} if documents: - results["documents"] = self.predict(documents=documents, generate_single_summary=generate_single_summary) + results["documents"] = self.predict(documents=documents) return results, "output_1" def run_batch( # type: ignore - self, - documents: Union[List[Document], List[List[Document]]], - generate_single_summary: Optional[bool] = None, - batch_size: Optional[int] = None, + self, documents: Union[List[Document], List[List[Document]]], batch_size: Optional[int] = None ): - results = self.predict_batch( - documents=documents, generate_single_summary=generate_single_summary, batch_size=batch_size - ) + results = self.predict_batch(documents=documents, batch_size=batch_size) return {"documents": results}, "output_1" diff --git a/haystack/nodes/summarizer/transformers.py b/haystack/nodes/summarizer/transformers.py index 531c246b26..ab5a5e8602 100644 --- a/haystack/nodes/summarizer/transformers.py +++ b/haystack/nodes/summarizer/transformers.py @@ -34,19 +34,18 @@ class TransformersSummarizer(BaseSummarizer): | | # Summarize | summary = summarizer.predict( - | documents=docs, - | generate_single_summary=True - | ) + | documents=docs) | - | # Show results (List of Documents, containing summary and original text) + | # Show results (List of Documents, containing summary and original content) | print(summary) | | [ | { - | "text": "California's largest electricity provider has turned off power to hundreds of thousands of customers.", + | "content": "PGE stated it scheduled the blackouts in response to forecasts for high winds amid dry conditions. ...", | ... | "meta": { - | "context": "PGE stated it scheduled the blackouts in response to forecasts for high winds amid dry conditions. ..." + | "summary": "California's largest electricity provider has turned off power to hundreds of thousands of customers.", + | ... | }, | ... | }, @@ -62,8 +61,6 @@ def __init__( min_length: int = 5, use_gpu: bool = True, clean_up_tokenization_spaces: bool = True, - separator_for_single_summary: str = " ", - generate_single_summary: bool = False, batch_size: int = 16, progress_bar: bool = True, use_auth_token: Optional[Union[str, bool]] = None, @@ -83,12 +80,6 @@ def __init__( :param min_length: Minimum length of summarized text :param use_gpu: Whether to use GPU (if available). :param clean_up_tokenization_spaces: Whether or not to clean up the potential extra spaces in the text output - :param separator_for_single_summary: If `generate_single_summary=True` in `predict()`, we need to join all docs - into a single text. This separator appears between those subsequent docs. - :param generate_single_summary: Whether to generate a single summary for all documents or one summary per document. - If set to "True", all docs will be joined to a single string that will then - be summarized. - Important: The summary will depend on the order of the supplied documents! :param batch_size: Number of documents to process at a time. :param progress_bar: Whether to show a progress bar. :param use_auth_token: The API token used to download private models from Huggingface. @@ -110,36 +101,31 @@ def __init__( f"using the first device {self.devices[0]}." ) - # TODO AutoModelForSeq2SeqLM is only necessary with transformers==4.1.1, with newer versions use the pipeline directly if tokenizer is None: tokenizer = model_name_or_path - model = AutoModelForSeq2SeqLM.from_pretrained( - pretrained_model_name_or_path=model_name_or_path, revision=model_version, use_auth_token=use_auth_token - ) + self.summarizer = pipeline( - "summarization", model=model, tokenizer=tokenizer, device=self.devices[0], use_auth_token=use_auth_token + task="summarization", + model=model_name_or_path, + tokenizer=tokenizer, + revision=model_version, + device=self.devices[0], + use_auth_token=use_auth_token, ) self.max_length = max_length self.min_length = min_length self.clean_up_tokenization_spaces = clean_up_tokenization_spaces - self.separator_for_single_summary = separator_for_single_summary - self.generate_single_summary = generate_single_summary self.print_log: Set[str] = set() self.batch_size = batch_size self.progress_bar = progress_bar - def predict(self, documents: List[Document], generate_single_summary: Optional[bool] = None) -> List[Document]: + def predict(self, documents: List[Document]) -> List[Document]: """ Produce the summarization from the supplied documents. These document can for example be retrieved via the Retriever. :param documents: Related documents (e.g. coming from a retriever) that the answer shall be conditioned on. - :param generate_single_summary: Whether to generate a single summary for all documents or one summary per document. - If set to "True", all docs will be joined to a single string that will then - be summarized. - Important: The summary will depend on the order of the supplied documents! - :return: List of Documents, where Document.text contains the summarization and Document.meta["context"] - the original, not summarized text + :return: List of Documents, where Document.meta["summary"] contains the summarization """ if self.min_length > self.max_length: raise AttributeError("min_length cannot be greater than max_length") @@ -147,16 +133,8 @@ def predict(self, documents: List[Document], generate_single_summary: Optional[b if len(documents) == 0: raise AttributeError("Summarizer needs at least one document to produce a summary.") - if generate_single_summary is None: - generate_single_summary = self.generate_single_summary - contexts: List[str] = [doc.content for doc in documents] - if generate_single_summary: - # Documents order is very important to produce summary. - # Different order of same documents produce different summary. - contexts = [self.separator_for_single_summary.join(contexts)] - encoded_input = self.summarizer.tokenizer(contexts, verbose=False) for input_id in encoded_input["input_ids"]: tokens_count: int = len(input_id) @@ -182,23 +160,14 @@ def predict(self, documents: List[Document], generate_single_summary: Optional[b result: List[Document] = [] - if generate_single_summary: - for context, summarized_answer in zip(contexts, summaries): - cur_doc = Document(content=summarized_answer["summary_text"], meta={"context": context}) - result.append(cur_doc) - else: - for context, summarized_answer, document in zip(contexts, summaries, documents): - cur_doc = Document(content=summarized_answer["summary_text"], meta=document.meta) - cur_doc.meta.update({"context": context}) - result.append(cur_doc) + for summary, document in zip(summaries, documents): + document.meta.update({"summary": summary["summary_text"]}) + result.append(document) return result def predict_batch( - self, - documents: Union[List[Document], List[List[Document]]], - generate_single_summary: Optional[bool] = None, - batch_size: Optional[int] = None, + self, documents: Union[List[Document], List[List[Document]]], batch_size: Optional[int] = None ) -> Union[List[Document], List[List[Document]]]: """ Produce the summarization from the supplied documents. @@ -206,11 +175,6 @@ def predict_batch( :param documents: Single list of related documents or list of lists of related documents (e.g. coming from a retriever) that the answer shall be conditioned on. - :param generate_single_summary: Whether to generate a single summary for each provided document list or - one summary per document. - If set to "True", all docs of a document list will be joined to a single string - that will then be summarized. - Important: The summary will depend on the order of the supplied documents! :param batch_size: Number of Documents to process at a time. """ @@ -225,34 +189,13 @@ def predict_batch( if batch_size is None: batch_size = self.batch_size - if generate_single_summary is None: - generate_single_summary = self.generate_single_summary - - single_doc_list = False - if isinstance(documents[0], Document): - single_doc_list = True - + single_doc_list = isinstance(documents[0], Document) if single_doc_list: - contexts = [doc.content for doc in documents if isinstance(doc, Document)] - else: - contexts = [ - [doc.content for doc in docs if isinstance(doc, Document)] - for docs in documents - if isinstance(docs, list) - ] - - if generate_single_summary: - if single_doc_list: - contexts = [self.separator_for_single_summary.join(contexts)] - else: - contexts = [self.separator_for_single_summary.join(context_group) for context_group in contexts] - number_of_docs = [1 for _ in contexts] + contexts = [doc.content for doc in documents] else: - if single_doc_list: - number_of_docs = [1 for _ in contexts] - else: - number_of_docs = [len(context_group) for context_group in contexts] - contexts = list(itertools.chain.from_iterable(contexts)) + contexts = [[doc.content for doc in docs] for docs in documents if isinstance(docs, list)] + number_of_docs = [len(context_group) for context_group in contexts] + contexts = list(itertools.chain.from_iterable(contexts)) encoded_input = self.summarizer.tokenizer(contexts, verbose=False) for input_id in encoded_input["input_ids"]: @@ -286,26 +229,26 @@ def predict_batch( ): summaries.extend(summary_batch) - # Group summaries together - grouped_summaries = [] - grouped_contexts = [] - left_idx = 0 - right_idx = 0 - for number in number_of_docs: - right_idx = left_idx + number - grouped_summaries.append(summaries[left_idx:right_idx]) - grouped_contexts.append(contexts[left_idx:right_idx]) - left_idx = right_idx - result = [] - for summary_group, context_group in zip(grouped_summaries, grouped_contexts): - cur_summaries = [ - Document(content=summary["summary_text"], meta={"context": context}) - for summary, context in zip(summary_group, context_group) - ] - if single_doc_list: - result.append(cur_summaries[0]) - else: - result.append(cur_summaries) # type: ignore + if single_doc_list: + for summary, document in zip(summaries, documents): + document.meta.update({"summary": summary["summary_text"]}) + result.append(document) + else: + # Group summaries together + grouped_summaries = [] + left_idx = 0 + right_idx = 0 + for number in number_of_docs: + right_idx = left_idx + number + grouped_summaries.append(summaries[left_idx:right_idx]) + left_idx = right_idx + + for summary_group, docs_group in zip(grouped_summaries, documents): + cur_summaries = [] + for summary, document in zip(summary_group, docs_group): + document.meta.update({"summary": summary["summary_text"]}) + cur_summaries.append(document) + result.append(cur_summaries) return result diff --git a/haystack/pipelines/standard_pipelines.py b/haystack/pipelines/standard_pipelines.py index de18bd8ac5..08c4a8705d 100644 --- a/haystack/pipelines/standard_pipelines.py +++ b/haystack/pipelines/standard_pipelines.py @@ -489,9 +489,9 @@ def run(self, query: str, params: Optional[dict] = None, debug: Optional[bool] = for doc in docs: cur_answer = { "query": query, - "answer": doc.content, + "answer": doc.meta.pop("summary"), "document_id": doc.id, - "context": doc.meta.pop("context"), + "context": doc.content, "score": None, "offset_start": None, "offset_end": None, @@ -527,9 +527,9 @@ def run_batch(self, queries: List[str], params: Optional[dict] = None, debug: Op for doc in cur_docs: cur_answer = { "query": query, - "answer": doc.content, + "answer": doc.meta.pop("summary"), "document_id": doc.id, - "context": doc.meta.pop("context"), + "context": doc.content, "score": None, "offset_start": None, "offset_end": None, diff --git a/test/nodes/test_summarizer.py b/test/nodes/test_summarizer.py index 239bb71074..20f0f90ac7 100644 --- a/test/nodes/test_summarizer.py +++ b/test/nodes/test_summarizer.py @@ -43,15 +43,15 @@ def test_summarization(summarizer): summarized_docs = summarizer.predict(documents=DOCS) assert len(summarized_docs) == len(DOCS) for expected_summary, summary in zip(EXPECTED_SUMMARIES, summarized_docs): - assert expected_summary == summary.content + assert expected_summary == summary.meta["summary"] -@pytest.mark.integration -@pytest.mark.summarizer -def test_summarization_one_summary(summarizer): - summarized_docs = summarizer.predict(documents=SPLIT_DOCS, generate_single_summary=True) - assert len(summarized_docs) == 1 - assert EXPECTED_ONE_SUMMARIES[0] == summarized_docs[0].content +# @pytest.mark.integration +# @pytest.mark.summarizer +# def test_summarization_one_summary(summarizer): +# summarized_docs = summarizer.predict(documents=SPLIT_DOCS, generate_single_summary=True) +# assert len(summarized_docs) == 1 +# assert EXPECTED_ONE_SUMMARIES[0] == summarized_docs[0].content @pytest.mark.integration @@ -60,7 +60,7 @@ def test_summarization_batch_single_doc_list(summarizer): summarized_docs = summarizer.predict_batch(documents=DOCS) assert len(summarized_docs) == len(DOCS) for expected_summary, summary in zip(EXPECTED_SUMMARIES, summarized_docs): - assert expected_summary == summary.content + assert expected_summary == summary.meta["summary"] @pytest.mark.integration @@ -70,7 +70,7 @@ def test_summarization_batch_multiple_doc_lists(summarizer): assert len(summarized_docs) == 2 # Number of document lists assert len(summarized_docs[0]) == len(DOCS) for expected_summary, summary in zip(EXPECTED_SUMMARIES, summarized_docs[0]): - assert expected_summary == summary.content + assert expected_summary == summary.meta["summary"] @pytest.mark.integration @@ -92,57 +92,57 @@ def test_summarization_pipeline(document_store, retriever, summarizer): assert " The Eiffel Tower in Paris has officially opened its doors to the public." == answers[0]["answer"] -@pytest.mark.integration -@pytest.mark.summarizer -@pytest.mark.parametrize( - "retriever,document_store", [("embedding", "memory"), ("elasticsearch", "elasticsearch")], indirect=True -) -def test_summarization_pipeline_one_summary(document_store, retriever, summarizer): - document_store.write_documents(SPLIT_DOCS) - - if isinstance(retriever, EmbeddingRetriever) or isinstance(retriever, DensePassageRetriever): - document_store.update_embeddings(retriever=retriever) - - query = "Where is Eiffel Tower?" - pipeline = SearchSummarizationPipeline(retriever=retriever, summarizer=summarizer, return_in_answer_format=True) - output = pipeline.run( - query=query, params={"Retriever": {"top_k": 2}, "Summarizer": {"generate_single_summary": True}} - ) - answers = output["answers"] - assert len(answers) == 1 - assert answers[0]["answer"] in EXPECTED_ONE_SUMMARIES - - -@pytest.mark.summarizer -def test_metadata_summarizer(summarizer): - docs = [ - Document( - content="""PG&E stated it scheduled the blackouts in response to forecasts for high winds amid dry conditions. The aim is to reduce the risk of wildfires. Nearly 800 thousand customers were scheduled to be affected by the shutoffs which were expected to last through at least midday tomorrow.""", - meta={ - "sub_content": "Pegasus Example", - "topic": "California's Electricity", - "context": "Dummy - PG&E stated it scheduled the blackouts in response to forecasts for high winds amid dry conditions. The aim is to reduce the risk of wildfires.", - }, - ), - Document( - content="""The tower is 324 metres (1,063 ft) tall, about the same height as an 81-storey building, and the tallest structure in Paris. Its base is square, measuring 125 metres (410 ft) on each side. During its construction, the Eiffel Tower surpassed the Washington Monument to become the tallest man-made structure in the world, a title it held for 41 years until the Chrysler Building in New York City was finished in 1930. It was the first structure to reach a height of 300 metres. Due to the addition of a broadcasting aerial at the top of the tower in 1957, it is now taller than the Chrysler Building by 5.2 metres (17 ft). Excluding transmitters, the Eiffel Tower is the second tallest free-standing structure in France after the Millau Viaduct.""", - meta={"sub_content": "Paris best tour best tour", "topic": "Eiffel tower"}, - ), - ] - # Original input is overwritten after the "predict". So adding the same input as check_output to assess the output - check_output = deepcopy(docs) - - summary = summarizer.predict(documents=docs) - - assert len(summary[0].meta) == len(check_output[0].meta) - assert len(summary[1].meta) - 1 == len(check_output[1].meta) - assert ( - summary[0].meta["context"] - == """PG&E stated it scheduled the blackouts in response to forecasts for high winds amid dry conditions. The aim is to reduce the risk of wildfires. Nearly 800 thousand customers were scheduled to be affected by the shutoffs which were expected to last through at least midday tomorrow.""" - ) - - summary = summarizer.predict(documents=docs, generate_single_summary=True) - - assert len(summary) == 1 - summary[0].meta.pop("context") - assert not summary[0].meta # Remaining metadata is not returned in case of a single summary +# @pytest.mark.integration +# @pytest.mark.summarizer +# @pytest.mark.parametrize( +# "retriever,document_store", [("embedding", "memory"), ("elasticsearch", "elasticsearch")], indirect=True +# ) +# def test_summarization_pipeline_one_summary(document_store, retriever, summarizer): +# document_store.write_documents(SPLIT_DOCS) + +# if isinstance(retriever, EmbeddingRetriever) or isinstance(retriever, DensePassageRetriever): +# document_store.update_embeddings(retriever=retriever) + +# query = "Where is Eiffel Tower?" +# pipeline = SearchSummarizationPipeline(retriever=retriever, summarizer=summarizer, return_in_answer_format=True) +# output = pipeline.run( +# query=query, params={"Retriever": {"top_k": 2}, "Summarizer": {"generate_single_summary": True}} +# ) +# answers = output["answers"] +# assert len(answers) == 1 +# assert answers[0]["answer"] in EXPECTED_ONE_SUMMARIES + + +# @pytest.mark.summarizer +# def test_metadata_summarizer(summarizer): +# docs = [ +# Document( +# content="""PG&E stated it scheduled the blackouts in response to forecasts for high winds amid dry conditions. The aim is to reduce the risk of wildfires. Nearly 800 thousand customers were scheduled to be affected by the shutoffs which were expected to last through at least midday tomorrow.""", +# meta={ +# "sub_content": "Pegasus Example", +# "topic": "California's Electricity", +# "context": "Dummy - PG&E stated it scheduled the blackouts in response to forecasts for high winds amid dry conditions. The aim is to reduce the risk of wildfires.", +# }, +# ), +# Document( +# content="""The tower is 324 metres (1,063 ft) tall, about the same height as an 81-storey building, and the tallest structure in Paris. Its base is square, measuring 125 metres (410 ft) on each side. During its construction, the Eiffel Tower surpassed the Washington Monument to become the tallest man-made structure in the world, a title it held for 41 years until the Chrysler Building in New York City was finished in 1930. It was the first structure to reach a height of 300 metres. Due to the addition of a broadcasting aerial at the top of the tower in 1957, it is now taller than the Chrysler Building by 5.2 metres (17 ft). Excluding transmitters, the Eiffel Tower is the second tallest free-standing structure in France after the Millau Viaduct.""", +# meta={"sub_content": "Paris best tour best tour", "topic": "Eiffel tower"}, +# ), +# ] +# # Original input is overwritten after the "predict". So adding the same input as check_output to assess the output +# check_output = deepcopy(docs) + +# summary = summarizer.predict(documents=docs) + +# assert len(summary[0].meta) == len(check_output[0].meta) +# assert len(summary[1].meta) - 1 == len(check_output[1].meta) +# assert ( +# summary[0].meta["context"] +# == """PG&E stated it scheduled the blackouts in response to forecasts for high winds amid dry conditions. The aim is to reduce the risk of wildfires. Nearly 800 thousand customers were scheduled to be affected by the shutoffs which were expected to last through at least midday tomorrow.""" +# ) + +# summary = summarizer.predict(documents=docs, generate_single_summary=True) + +# assert len(summary) == 1 +# summary[0].meta.pop("context") +# assert not summary[0].meta # Remaining metadata is not returned in case of a single summary diff --git a/test/nodes/test_summarizer_translation.py b/test/nodes/test_summarizer_translation.py index bc5d8402f0..fe62dabf96 100644 --- a/test/nodes/test_summarizer_translation.py +++ b/test/nodes/test_summarizer_translation.py @@ -24,13 +24,11 @@ def test_summarization_pipeline_with_translator( pipeline = TranslationWrapperPipeline( input_translator=de_to_en_translator, output_translator=en_to_de_translator, pipeline=base_pipeline ) - output = pipeline.run( - query=query, params={"Retriever": {"top_k": 2}, "Summarizer": {"generate_single_summary": True}} - ) + output = pipeline.run(query=query, params={"Retriever": {"top_k": 2}}) # SearchSummarizationPipeline return answers but Summarizer return documents documents = output["documents"] assert len(documents) == 1 - assert documents[0].content in [ + assert documents[0].meta["summary"] in [ "Der Eiffelturm in Paris ist die höchste von Menschen geschaffene Struktur der Welt geworden.", "Der Eiffelturm in Paris hat offiziell seine Türen für die Öffentlichkeit geöffnet.", ] From 8197f4862de8b1063e04d4feed5b4a790ff4417f Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Fri, 21 Oct 2022 20:59:09 +0200 Subject: [PATCH 02/23] update schemas --- .../haystack-pipeline-1.11.0rc0.schema.json | 10 ---------- .../json-schemas/haystack-pipeline-main.schema.json | 10 ---------- 2 files changed, 20 deletions(-) diff --git a/haystack/json-schemas/haystack-pipeline-1.11.0rc0.schema.json b/haystack/json-schemas/haystack-pipeline-1.11.0rc0.schema.json index 5411f0e63b..cd3f016836 100644 --- a/haystack/json-schemas/haystack-pipeline-1.11.0rc0.schema.json +++ b/haystack/json-schemas/haystack-pipeline-1.11.0rc0.schema.json @@ -5969,11 +5969,6 @@ ], "title": "Devices" }, - "generate_single_summary": { - "default": false, - "title": "Generate Single Summary", - "type": "boolean" - }, "max_length": { "default": 200, "title": "Max Length", @@ -6005,11 +6000,6 @@ "title": "Progress Bar", "type": "boolean" }, - "separator_for_single_summary": { - "default": " ", - "title": "Separator For Single Summary", - "type": "string" - }, "tokenizer": { "anyOf": [ { diff --git a/haystack/json-schemas/haystack-pipeline-main.schema.json b/haystack/json-schemas/haystack-pipeline-main.schema.json index a286d64d16..27f932748d 100644 --- a/haystack/json-schemas/haystack-pipeline-main.schema.json +++ b/haystack/json-schemas/haystack-pipeline-main.schema.json @@ -5969,11 +5969,6 @@ ], "title": "Devices" }, - "generate_single_summary": { - "default": false, - "title": "Generate Single Summary", - "type": "boolean" - }, "max_length": { "default": 200, "title": "Max Length", @@ -6005,11 +6000,6 @@ "title": "Progress Bar", "type": "boolean" }, - "separator_for_single_summary": { - "default": " ", - "title": "Separator For Single Summary", - "type": "string" - }, "tokenizer": { "anyOf": [ { From 9e850c2e4e4c7ced15e9af7db4923c5bd95b062d Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Fri, 21 Oct 2022 21:31:09 +0200 Subject: [PATCH 03/23] remove unused import --- haystack/nodes/summarizer/transformers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/haystack/nodes/summarizer/transformers.py b/haystack/nodes/summarizer/transformers.py index ab5a5e8602..d4b583a615 100644 --- a/haystack/nodes/summarizer/transformers.py +++ b/haystack/nodes/summarizer/transformers.py @@ -6,7 +6,6 @@ import torch from tqdm.auto import tqdm from transformers import pipeline -from transformers.models.auto.modeling_auto import AutoModelForSeq2SeqLM from haystack.schema import Document from haystack.nodes.summarizer.base import BaseSummarizer From b69966c65e20407dc3edd26b1ef3cd60895bc33b Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Fri, 21 Oct 2022 23:12:39 +0200 Subject: [PATCH 04/23] fix mypy --- haystack/nodes/summarizer/transformers.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/haystack/nodes/summarizer/transformers.py b/haystack/nodes/summarizer/transformers.py index d4b583a615..2f6f7851ad 100644 --- a/haystack/nodes/summarizer/transformers.py +++ b/haystack/nodes/summarizer/transformers.py @@ -190,9 +190,13 @@ def predict_batch( single_doc_list = isinstance(documents[0], Document) if single_doc_list: - contexts = [doc.content for doc in documents] + contexts = [doc.content for doc in documents if isinstance(doc, Document)] else: - contexts = [[doc.content for doc in docs] for docs in documents if isinstance(docs, list)] + contexts = [ + [doc.content for doc in docs if isinstance(doc, Document)] + for docs in documents + if isinstance(docs, list) + ] number_of_docs = [len(context_group) for context_group in contexts] contexts = list(itertools.chain.from_iterable(contexts)) From 2fcf55d40e0b1147f3222ddf07d8607e2dbaab2c Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Sat, 22 Oct 2022 10:00:29 +0200 Subject: [PATCH 05/23] fix mypy --- haystack/nodes/summarizer/transformers.py | 24 +++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/haystack/nodes/summarizer/transformers.py b/haystack/nodes/summarizer/transformers.py index 2f6f7851ad..1837958393 100644 --- a/haystack/nodes/summarizer/transformers.py +++ b/haystack/nodes/summarizer/transformers.py @@ -188,8 +188,8 @@ def predict_batch( if batch_size is None: batch_size = self.batch_size - single_doc_list = isinstance(documents[0], Document) - if single_doc_list: + is_doclist_flat = isinstance(documents[0], Document) + if is_doclist_flat: contexts = [doc.content for doc in documents if isinstance(doc, Document)] else: contexts = [ @@ -232,12 +232,17 @@ def predict_batch( ): summaries.extend(summary_batch) - result = [] - if single_doc_list: - for summary, document in zip(summaries, documents): + if is_doclist_flat: + flat_result: List[Document] = [] + flat_doc_list: List[Document] = [doc for doc in documents if isinstance(doc, Document)] + for summary, document in zip(summaries, flat_doc_list): document.meta.update({"summary": summary["summary_text"]}) - result.append(document) + flat_result.append(document) + return flat_result else: + nested_result: List[List[Document]] = [] + nested_doc_list: List[List[Document]] = [lst for lst in documents if isinstance(lst, list)] + # Group summaries together grouped_summaries = [] left_idx = 0 @@ -247,11 +252,10 @@ def predict_batch( grouped_summaries.append(summaries[left_idx:right_idx]) left_idx = right_idx - for summary_group, docs_group in zip(grouped_summaries, documents): + for summary_group, docs_group in zip(grouped_summaries, nested_doc_list): cur_summaries = [] for summary, document in zip(summary_group, docs_group): document.meta.update({"summary": summary["summary_text"]}) cur_summaries.append(document) - result.append(cur_summaries) - - return result + nested_result.append(cur_summaries) + return nested_result From 39c7a84f3c07d668ad8d5f85a950c8bf423ac6b0 Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Sat, 22 Oct 2022 11:47:57 +0200 Subject: [PATCH 06/23] test: summarizer doesnt change content --- test/pipelines/test_eval_batch.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/test/pipelines/test_eval_batch.py b/test/pipelines/test_eval_batch.py index a21dfff2f5..7962fe8902 100644 --- a/test/pipelines/test_eval_batch.py +++ b/test/pipelines/test_eval_batch.py @@ -54,6 +54,7 @@ def test_generativeqa_calculate_metrics( @pytest.mark.parametrize("retriever_with_docs", ["embedding"], indirect=True) def test_summarizer_calculate_metrics(document_store_with_docs: ElasticsearchDocumentStore, retriever_with_docs): document_store_with_docs.update_embeddings(retriever=retriever_with_docs) + print(document_store_with_docs.get_all_documents()) summarizer = TransformersSummarizer(model_name_or_path="sshleifer/distill-pegasus-xsum-16-4", use_gpu=False) pipeline = SearchSummarizationPipeline( retriever=retriever_with_docs, summarizer=summarizer, return_in_answer_format=True @@ -62,6 +63,8 @@ def test_summarizer_calculate_metrics(document_store_with_docs: ElasticsearchDoc labels=EVAL_LABELS, params={"Retriever": {"top_k": 5}}, context_matching_min_length=10 ) + print(eval_result) + metrics = eval_result.calculate_metrics(document_scope="context") assert "Retriever" in eval_result @@ -75,11 +78,11 @@ def test_summarizer_calculate_metrics(document_store_with_docs: ElasticsearchDoc assert metrics["Retriever"]["precision"] == 1.0 assert metrics["Retriever"]["ndcg"] == pytest.approx(0.9461, 1e-4) assert metrics["Summarizer"]["mrr"] == 1.0 - assert metrics["Summarizer"]["map"] == 0.735 - assert metrics["Summarizer"]["recall_multi_hit"] == 0.8 + assert metrics["Summarizer"]["map"] == pytest.approx(0.9167, 1e-4) + assert metrics["Summarizer"]["recall_multi_hit"] == pytest.approx(0.9167, 1e-4) assert metrics["Summarizer"]["recall_single_hit"] == 1.0 - assert metrics["Summarizer"]["precision"] == 0.8 - assert metrics["Summarizer"]["ndcg"] == pytest.approx(0.8422, 1e-4) + assert metrics["Summarizer"]["precision"] == 1.0 + assert metrics["Summarizer"]["ndcg"] == pytest.approx(0.9461, 1e-4) EVAL_LABELS = [ From 56141613edf48447454689eabc708b3cf8c64b49 Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Sat, 22 Oct 2022 12:03:57 +0200 Subject: [PATCH 07/23] other test correction --- test/pipelines/test_eval.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/pipelines/test_eval.py b/test/pipelines/test_eval.py index d9a1ec34b5..b6579a4a31 100644 --- a/test/pipelines/test_eval.py +++ b/test/pipelines/test_eval.py @@ -75,11 +75,11 @@ def test_summarizer_calculate_metrics(document_store_with_docs: ElasticsearchDoc assert metrics["Retriever"]["precision"] == 1.0 assert metrics["Retriever"]["ndcg"] == pytest.approx(0.9461, 1e-4) assert metrics["Summarizer"]["mrr"] == 1.0 - assert metrics["Summarizer"]["map"] == 0.735 - assert metrics["Summarizer"]["recall_multi_hit"] == 0.8 + assert metrics["Summarizer"]["map"] == pytest.approx(0.9167, 1e-4) + assert metrics["Summarizer"]["recall_multi_hit"] == pytest.approx(0.9167, 1e-4) assert metrics["Summarizer"]["recall_single_hit"] == 1.0 - assert metrics["Summarizer"]["precision"] == 0.8 - assert metrics["Summarizer"]["ndcg"] == pytest.approx(0.8422, 1e-4) + assert metrics["Summarizer"]["precision"] == 1.0 + assert metrics["Summarizer"]["ndcg"] == pytest.approx(0.9461, 1e-4) @pytest.mark.parametrize("document_store", ["elasticsearch", "faiss", "memory", "milvus1"], indirect=True) From 5301120113314efe26054d3e748a604b67846012 Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Sun, 23 Oct 2022 11:26:27 +0200 Subject: [PATCH 08/23] move test_summarizer_translation to test_extractor_translation --- test/nodes/test_extractor_translation.py | 33 ++++++++++++++++++++++ test/nodes/test_summarizer_translation.py | 34 ----------------------- 2 files changed, 33 insertions(+), 34 deletions(-) create mode 100644 test/nodes/test_extractor_translation.py delete mode 100644 test/nodes/test_summarizer_translation.py diff --git a/test/nodes/test_extractor_translation.py b/test/nodes/test_extractor_translation.py new file mode 100644 index 0000000000..350587b5b5 --- /dev/null +++ b/test/nodes/test_extractor_translation.py @@ -0,0 +1,33 @@ +import pytest + +from haystack.pipelines import TranslationWrapperPipeline, ExtractiveQAPipeline +from haystack.nodes import DensePassageRetriever, EmbeddingRetriever +from .test_summarizer import SPLIT_DOCS + +# Keeping few (retriever,document_store) combination to reduce test time +@pytest.mark.integration +@pytest.mark.elasticsearch +@pytest.mark.summarizer +@pytest.mark.parametrize( + "retriever,document_store,reader", + [("embedding", "memory", "farm"), ("elasticsearch", "elasticsearch", "farm")], + indirect=True, +) +def test_extractive_qa_pipeline_with_translator( + document_store, retriever, reader, en_to_de_translator, de_to_en_translator +): + document_store.write_documents(SPLIT_DOCS) + + if isinstance(retriever, EmbeddingRetriever) or isinstance(retriever, DensePassageRetriever): + document_store.update_embeddings(retriever=retriever) + + query = "Wo steht der Eiffelturm?" + base_pipeline = ExtractiveQAPipeline(retriever=retriever, reader=reader) + pipeline = TranslationWrapperPipeline( + input_translator=de_to_en_translator, output_translator=en_to_de_translator, pipeline=base_pipeline + ) + output = pipeline.run(query=query, params={"Retriever": {"top_k": 2}}) + assert len(output["documents"]) == 2 + answers_texts = [el["answer"] for el in output["answers"]] + + assert "Frankreich" in answers_texts diff --git a/test/nodes/test_summarizer_translation.py b/test/nodes/test_summarizer_translation.py deleted file mode 100644 index fe62dabf96..0000000000 --- a/test/nodes/test_summarizer_translation.py +++ /dev/null @@ -1,34 +0,0 @@ -import pytest - -from haystack.pipelines import TranslationWrapperPipeline, SearchSummarizationPipeline -from haystack.nodes import DensePassageRetriever, EmbeddingRetriever -from .test_summarizer import SPLIT_DOCS - -# Keeping few (retriever,document_store) combination to reduce test time -@pytest.mark.integration -@pytest.mark.elasticsearch -@pytest.mark.summarizer -@pytest.mark.parametrize( - "retriever,document_store", [("embedding", "memory"), ("elasticsearch", "elasticsearch")], indirect=True -) -def test_summarization_pipeline_with_translator( - document_store, retriever, summarizer, en_to_de_translator, de_to_en_translator -): - document_store.write_documents(SPLIT_DOCS) - - if isinstance(retriever, EmbeddingRetriever) or isinstance(retriever, DensePassageRetriever): - document_store.update_embeddings(retriever=retriever) - - query = "Wo steht der Eiffelturm?" - base_pipeline = SearchSummarizationPipeline(retriever=retriever, summarizer=summarizer) - pipeline = TranslationWrapperPipeline( - input_translator=de_to_en_translator, output_translator=en_to_de_translator, pipeline=base_pipeline - ) - output = pipeline.run(query=query, params={"Retriever": {"top_k": 2}}) - # SearchSummarizationPipeline return answers but Summarizer return documents - documents = output["documents"] - assert len(documents) == 1 - assert documents[0].meta["summary"] in [ - "Der Eiffelturm in Paris ist die höchste von Menschen geschaffene Struktur der Welt geworden.", - "Der Eiffelturm in Paris hat offiziell seine Türen für die Öffentlichkeit geöffnet.", - ] From 4e0f142d10759512b379df6d8b4356a05562dc62 Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Sun, 23 Oct 2022 11:32:41 +0200 Subject: [PATCH 09/23] fix test --- test/nodes/test_extractor_translation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/nodes/test_extractor_translation.py b/test/nodes/test_extractor_translation.py index 350587b5b5..5d028f5e41 100644 --- a/test/nodes/test_extractor_translation.py +++ b/test/nodes/test_extractor_translation.py @@ -4,7 +4,7 @@ from haystack.nodes import DensePassageRetriever, EmbeddingRetriever from .test_summarizer import SPLIT_DOCS -# Keeping few (retriever,document_store) combination to reduce test time +# Keeping few (retriever,document_store,reader) combination to reduce test time @pytest.mark.integration @pytest.mark.elasticsearch @pytest.mark.summarizer @@ -28,6 +28,6 @@ def test_extractive_qa_pipeline_with_translator( ) output = pipeline.run(query=query, params={"Retriever": {"top_k": 2}}) assert len(output["documents"]) == 2 - answers_texts = [el["answer"] for el in output["answers"]] + answers_texts = [el.answer for el in output["answers"]] assert "Frankreich" in answers_texts From a23385ace4009f1735ff771097b0c8721f6f78a5 Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Thu, 27 Oct 2022 16:32:53 +0200 Subject: [PATCH 10/23] first try for doc merger --- haystack/nodes/other/document_merger.py | 83 +++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 haystack/nodes/other/document_merger.py diff --git a/haystack/nodes/other/document_merger.py b/haystack/nodes/other/document_merger.py new file mode 100644 index 0000000000..65a4ebc53c --- /dev/null +++ b/haystack/nodes/other/document_merger.py @@ -0,0 +1,83 @@ +from collections import defaultdict +import logging +from math import inf + +from typing import Optional, List + +from haystack.schema import Document +from haystack.nodes import BaseNode + +logger = logging.getLogger(__name__) + + +class DocumentMerger(BaseNode): + """ + A node to merge the texts of the documents. + """ + + outgoing_edges = 1 + + def __init__(self, separator: str = " "): + """ + :param join_mode: `concatenate` to combine documents from multiple retrievers `merge` to aggregate scores of + individual documents, `reciprocal_rank_fusion` to apply rank based scoring. + :param weights: A node-wise list(length of list must be equal to the number of input nodes) of weights for + adjusting document scores when using the `merge` join_mode. By default, equal weight is given + to each retriever score. This param is not compatible with the `concatenate` join_mode. + :param top_k_join: Limit documents to top_k based on the resulting scores of the join. + :param sort_by_score: Whether to sort the incoming documents by their score. Set this to True if all your + Documents are coming with `score` values. Set to False if any of the Documents come + from sources where the `score` is set to `None`, like `TfidfRetriever` on Elasticsearch. + """ + super().__init__() + + self.separator = separator + + def merge(self, documents: List[Document], separator: Optional[str]) -> List[Document]: + + if len(documents) == 0: + raise AttributeError("Document Merger needs at least one document to merge.") + + if not all([doc.content_type == "text" for doc in documents]): + raise AttributeError( + "Not all the provided documents are textual. Document Merger only works on textual documents." + ) + + if separator is None: + separator = self.separator + + merged_content = separator.join([doc.content for doc in documents]) + + # better with other solutions? + # how to approach nested? + common_meta_dict = deepcopy(documents[0].meta) + for doc in documents[1:]: + if len(common_meta_dict) == 0: + break + for k, v in doc.meta.items(): + if k in common_meta_dict: + if common_meta_dict[k] != v: + del common_meta_dict[k] + + # common_meta_keys = set(documents[0].meta.keys()) + # for doc in documents[1:]: + # common_meta_keys.intersection_update(set(doc.meta.keys())) + + # for doc in documents: + # common_meta_keys=set.intersection(*map(set, my_dict)) + # for k,v doc.meta.items() + + return None + + def run(self, documents: List[Document], separator: Optional[str]): # type: ignore + """Method that gets executed when this class is used as a Node in a Haystack Pipeline""" + return None + + def run_batch( # type: ignore + self, + documents: Union[List[Document], List[List[Document]]], + separator: Optional[str], + batch_size: Optional[int] = None, + ): + + return None, "output_1" From 0401461616074a4c6dba0910135491737ff4b3ca Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Fri, 28 Oct 2022 17:47:32 +0200 Subject: [PATCH 11/23] reintroduce and deprecate generate_single_summary --- haystack/nodes/summarizer/base.py | 21 +++++++++++----- haystack/nodes/summarizer/transformers.py | 29 +++++++++++++++++++++-- 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/haystack/nodes/summarizer/base.py b/haystack/nodes/summarizer/base.py index 917f1723ea..17198cdadd 100644 --- a/haystack/nodes/summarizer/base.py +++ b/haystack/nodes/summarizer/base.py @@ -14,34 +14,43 @@ class BaseSummarizer(BaseComponent): outgoing_edges = 1 @abstractmethod - def predict(self, documents: List[Document]) -> List[Document]: + def predict(self, documents: List[Document], generate_single_summary: Optional[bool] = None) -> List[Document]: """ Abstract method for creating a summary. :param documents: Related documents (e.g. coming from a retriever) that the answer shall be conditioned on. + :param generate_single_summary: This parameter is deprecated and will be removed in Haystack 1.12 :return: List of Documents, where Document.meta["summary"] contains the summarization """ pass @abstractmethod def predict_batch( - self, documents: Union[List[Document], List[List[Document]]], batch_size: Optional[int] = None + self, + documents: Union[List[Document], List[List[Document]]], + generate_single_summary: Optional[bool] = None, + batch_size: Optional[int] = None, ) -> Union[List[Document], List[List[Document]]]: pass - def run(self, documents: List[Document]): # type: ignore + def run(self, documents: List[Document], generate_single_summary: Optional[bool] = None): # type: ignore results: Dict = {"documents": []} if documents: - results["documents"] = self.predict(documents=documents) + results["documents"] = self.predict(documents=documents, generate_single_summary=generate_single_summary) return results, "output_1" def run_batch( # type: ignore - self, documents: Union[List[Document], List[List[Document]]], batch_size: Optional[int] = None + self, + documents: Union[List[Document], List[List[Document]]], + generate_single_summary: Optional[bool] = None, + batch_size: Optional[int] = None, ): - results = self.predict_batch(documents=documents, batch_size=batch_size) + results = self.predict_batch( + documents=documents, batch_size=batch_size, generate_single_summary=generate_single_summary + ) return {"documents": results}, "output_1" diff --git a/haystack/nodes/summarizer/transformers.py b/haystack/nodes/summarizer/transformers.py index 1837958393..322a5f3b3b 100644 --- a/haystack/nodes/summarizer/transformers.py +++ b/haystack/nodes/summarizer/transformers.py @@ -60,6 +60,8 @@ def __init__( min_length: int = 5, use_gpu: bool = True, clean_up_tokenization_spaces: bool = True, + separator_for_single_summary: str = " ", + generate_single_summary: bool = False, batch_size: int = 16, progress_bar: bool = True, use_auth_token: Optional[Union[str, bool]] = None, @@ -79,6 +81,8 @@ def __init__( :param min_length: Minimum length of summarized text :param use_gpu: Whether to use GPU (if available). :param clean_up_tokenization_spaces: Whether or not to clean up the potential extra spaces in the text output + :param separator_for_single_summary: This parameter is deprecated and will be removed in Haystack 1.12 + :param generate_single_summary: This parameter is deprecated and will be removed in Haystack 1.12 :param batch_size: Number of documents to process at a time. :param progress_bar: Whether to show a progress bar. :param use_auth_token: The API token used to download private models from Huggingface. @@ -93,6 +97,13 @@ def __init__( """ super().__init__() + if generate_single_summary is True: + raise ValueError( + "'generate_single_summary' has been removed. Instead, you can use the Document Merger to merge documents before applying the Summarizer." + ) + self.separator_for_single_summary = separator_for_single_summary + self.generate_single_summary = generate_single_summary + self.devices, _ = initialize_device_settings(devices=devices, use_cuda=use_gpu, multi_gpu=False) if len(self.devices) > 1: logger.warning( @@ -118,14 +129,20 @@ def __init__( self.batch_size = batch_size self.progress_bar = progress_bar - def predict(self, documents: List[Document]) -> List[Document]: + def predict(self, documents: List[Document], generate_single_summary: Optional[bool] = None) -> List[Document]: """ Produce the summarization from the supplied documents. These document can for example be retrieved via the Retriever. :param documents: Related documents (e.g. coming from a retriever) that the answer shall be conditioned on. + :param generate_single_summary: This parameter is deprecated and will be removed in Haystack 1.12 :return: List of Documents, where Document.meta["summary"] contains the summarization """ + if generate_single_summary is True: + raise ValueError( + "'generate_single_summary' has been removed. Instead, you can use the Document Merger to merge documents before applying the Summarizer." + ) + if self.min_length > self.max_length: raise AttributeError("min_length cannot be greater than max_length") @@ -166,7 +183,10 @@ def predict(self, documents: List[Document]) -> List[Document]: return result def predict_batch( - self, documents: Union[List[Document], List[List[Document]]], batch_size: Optional[int] = None + self, + documents: Union[List[Document], List[List[Document]]], + generate_single_summary: Optional[bool] = None, + batch_size: Optional[int] = None, ) -> Union[List[Document], List[List[Document]]]: """ Produce the summarization from the supplied documents. @@ -174,8 +194,13 @@ def predict_batch( :param documents: Single list of related documents or list of lists of related documents (e.g. coming from a retriever) that the answer shall be conditioned on. + :param generate_single_summary: This parameter is deprecated and will be removed in Haystack 1.12 :param batch_size: Number of Documents to process at a time. """ + if generate_single_summary is True: + raise ValueError( + "'generate_single_summary' has been removed. Instead, you can use the Document Merger to merge documents before applying the Summarizer." + ) if self.min_length > self.max_length: raise AttributeError("min_length cannot be greater than max_length") From 5a6a537700cd5eff108bca2f75d61925fc830c95 Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Sun, 30 Oct 2022 12:58:06 +0100 Subject: [PATCH 12/23] progress in document merger --- haystack/nodes/other/document_merger.py | 50 +++++++++++++++++++------ 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/haystack/nodes/other/document_merger.py b/haystack/nodes/other/document_merger.py index 65a4ebc53c..2a0551c46e 100644 --- a/haystack/nodes/other/document_merger.py +++ b/haystack/nodes/other/document_merger.py @@ -1,8 +1,9 @@ -from collections import defaultdict +from collections import MutableMapping import logging from math import inf +from copy import deepcopy -from typing import Optional, List +from typing import Optional, List, Dict, Any from haystack.schema import Document from haystack.nodes import BaseNode @@ -40,7 +41,7 @@ def merge(self, documents: List[Document], separator: Optional[str]) -> List[Doc if not all([doc.content_type == "text" for doc in documents]): raise AttributeError( - "Not all the provided documents are textual. Document Merger only works on textual documents." + "Some of the documents provided are non-textual. Document Merger only works on textual documents." ) if separator is None: @@ -48,16 +49,17 @@ def merge(self, documents: List[Document], separator: Optional[str]) -> List[Doc merged_content = separator.join([doc.content for doc in documents]) - # better with other solutions? - # how to approach nested? - common_meta_dict = deepcopy(documents[0].meta) - for doc in documents[1:]: - if len(common_meta_dict) == 0: + flattened_meta = [self._flatten_dict(d["meta"]) for d in documents] + + common_meta_flat_dict = deepcopy(flattened_meta[0]) + for doc in flattened_meta[1:]: + if len(common_meta_flat_dict) == 0: break - for k, v in doc.meta.items(): - if k in common_meta_dict: - if common_meta_dict[k] != v: - del common_meta_dict[k] + for k, v in doc.items(): + if k in common_meta_flat_dict: + if common_meta_flat_dict[k] != v: + del common_meta_flat_dict[k] + common_meta_nest_dict = self._nest_dict(common_meta_flat_dict) # common_meta_keys = set(documents[0].meta.keys()) # for doc in documents[1:]: @@ -81,3 +83,27 @@ def run_batch( # type: ignore ): return None, "output_1" + + def _flatten_dict(self, d: dict, parent_key="") -> dict: + items: List = [] + for k, v in d.items(): + new_key = (parent_key, k) if parent_key else k + if isinstance(v, MutableMapping): + items.extend(self._flatten_dict(v, new_key).items()) + else: + items.append((new_key, v)) + return dict(items) + + def _nest_dict(self, d: dict) -> dict: + nested_dict = {} + for key, value in d.items(): + target = nested_dict + if isinstance(key, tuple): + for k in key[:-1]: # traverse all keys but the last + target = target.setdefault(k, {}) + target[key[-1]] = value + else: + target[key] = value + while any(isinstance(k, tuple) for k in nested_dict.keys()): + nested_dict = nest(nested_dict) + return common_meta_nested_dict From 42ecf96d24b5c8e441e9029c3745873af2f79b60 Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Mon, 31 Oct 2022 12:35:46 +0100 Subject: [PATCH 13/23] document merger! --- haystack/nodes/other/document_merger.py | 89 ++++++++++---------- haystack/pipelines/standard_pipelines.py | 18 +++- test/nodes/test_document_merger.py | 97 +++++++++++++++++++++ test/nodes/test_summarizer.py | 103 +++++++++-------------- 4 files changed, 195 insertions(+), 112 deletions(-) create mode 100644 test/nodes/test_document_merger.py diff --git a/haystack/nodes/other/document_merger.py b/haystack/nodes/other/document_merger.py index 2a0551c46e..f27ec9325f 100644 --- a/haystack/nodes/other/document_merger.py +++ b/haystack/nodes/other/document_merger.py @@ -1,17 +1,15 @@ from collections import MutableMapping import logging -from math import inf from copy import deepcopy - -from typing import Optional, List, Dict, Any +from typing import Optional, List, Dict, Union from haystack.schema import Document -from haystack.nodes import BaseNode +from haystack.nodes.base import BaseComponent logger = logging.getLogger(__name__) -class DocumentMerger(BaseNode): +class DocumentMerger(BaseComponent): """ A node to merge the texts of the documents. """ @@ -20,25 +18,20 @@ class DocumentMerger(BaseNode): def __init__(self, separator: str = " "): """ - :param join_mode: `concatenate` to combine documents from multiple retrievers `merge` to aggregate scores of - individual documents, `reciprocal_rank_fusion` to apply rank based scoring. - :param weights: A node-wise list(length of list must be equal to the number of input nodes) of weights for - adjusting document scores when using the `merge` join_mode. By default, equal weight is given - to each retriever score. This param is not compatible with the `concatenate` join_mode. - :param top_k_join: Limit documents to top_k based on the resulting scores of the join. - :param sort_by_score: Whether to sort the incoming documents by their score. Set this to True if all your - Documents are coming with `score` values. Set to False if any of the Documents come - from sources where the `score` is set to `None`, like `TfidfRetriever` on Elasticsearch. + :param separator: The separator that appears between subsequent merged documents. """ super().__init__() - self.separator = separator - def merge(self, documents: List[Document], separator: Optional[str]) -> List[Document]: + def merge(self, documents: List[Document], separator: Optional[str] = None) -> List[Document]: + """ + Produce a list made up of a single document, which contains all the texts of the documents provided. + :param separator: The separator that appears between subsequent merged documents. + :return: List of Documents + """ if len(documents) == 0: raise AttributeError("Document Merger needs at least one document to merge.") - if not all([doc.content_type == "text" for doc in documents]): raise AttributeError( "Some of the documents provided are non-textual. Document Merger only works on textual documents." @@ -48,9 +41,37 @@ def merge(self, documents: List[Document], separator: Optional[str]) -> List[Doc separator = self.separator merged_content = separator.join([doc.content for doc in documents]) + common_meta = self._extract_common_meta_dict(documents) - flattened_meta = [self._flatten_dict(d["meta"]) for d in documents] + merged_document = Document(content=merged_content, meta=common_meta) + return [merged_document] + def run(self, documents: List[Document], separator: Optional[str] = None): # type: ignore + results: Dict = {"documents": []} + if documents: + results["documents"] = self.merge(documents=documents, separator=separator) + return results, "output_1" + + def run_batch( # type: ignore + self, documents: Union[List[Document], List[List[Document]]], separator: Optional[str] = None + ): + is_doclist_flat = isinstance(documents[0], Document) + if is_doclist_flat: + flat_result: List[Document] = [] + flat_result = self.merge(documents=documents, separator=separator) + return {"documents": flat_result}, "output_1" + else: + nested_result: List[List[Document]] = [] + for docs_group in documents: + nested_result.append(self.merge(documents=docs_group, separator=separator)) + return {"documents": nested_result}, "output_1" + + def _extract_common_meta_dict(self, documents: List[Document]) -> dict: + """ + Given a list of documents, extract a dictionary containing the meta fields + that are common to all the documents + """ + flattened_meta = [self._flatten_dict(d.meta) for d in documents] common_meta_flat_dict = deepcopy(flattened_meta[0]) for doc in flattened_meta[1:]: if len(common_meta_flat_dict) == 0: @@ -59,30 +80,8 @@ def merge(self, documents: List[Document], separator: Optional[str]) -> List[Doc if k in common_meta_flat_dict: if common_meta_flat_dict[k] != v: del common_meta_flat_dict[k] - common_meta_nest_dict = self._nest_dict(common_meta_flat_dict) - - # common_meta_keys = set(documents[0].meta.keys()) - # for doc in documents[1:]: - # common_meta_keys.intersection_update(set(doc.meta.keys())) - - # for doc in documents: - # common_meta_keys=set.intersection(*map(set, my_dict)) - # for k,v doc.meta.items() - - return None - - def run(self, documents: List[Document], separator: Optional[str]): # type: ignore - """Method that gets executed when this class is used as a Node in a Haystack Pipeline""" - return None - - def run_batch( # type: ignore - self, - documents: Union[List[Document], List[List[Document]]], - separator: Optional[str], - batch_size: Optional[int] = None, - ): - - return None, "output_1" + common_meta_nested_dict = self._nest_dict(common_meta_flat_dict) + return common_meta_nested_dict def _flatten_dict(self, d: dict, parent_key="") -> dict: items: List = [] @@ -95,7 +94,7 @@ def _flatten_dict(self, d: dict, parent_key="") -> dict: return dict(items) def _nest_dict(self, d: dict) -> dict: - nested_dict = {} + nested_dict: dict = {} for key, value in d.items(): target = nested_dict if isinstance(key, tuple): @@ -105,5 +104,5 @@ def _nest_dict(self, d: dict) -> dict: else: target[key] = value while any(isinstance(k, tuple) for k in nested_dict.keys()): - nested_dict = nest(nested_dict) - return common_meta_nested_dict + nested_dict = self._nest_dict(nested_dict) + return nested_dict diff --git a/haystack/pipelines/standard_pipelines.py b/haystack/pipelines/standard_pipelines.py index 08c4a8705d..8a2d8e1e86 100644 --- a/haystack/pipelines/standard_pipelines.py +++ b/haystack/pipelines/standard_pipelines.py @@ -4,6 +4,7 @@ from pathlib import Path from functools import wraps from typing import List, Optional, Dict, Any, Union +from haystack.nodes.base import BaseComponent try: from typing import Literal @@ -18,6 +19,7 @@ from haystack.nodes.summarizer.base import BaseSummarizer from haystack.nodes.translator.base import BaseTranslator from haystack.nodes.question_generator.question_generator import QuestionGenerator +from haystack.nodes.other.document_merger import DocumentMerger from haystack.document_stores.base import BaseDocumentStore from haystack.pipelines.base import Pipeline @@ -456,17 +458,29 @@ class SearchSummarizationPipeline(BaseStandardPipeline): Pipeline that retrieves documents for a query and then summarizes those documents. """ - def __init__(self, summarizer: BaseSummarizer, retriever: BaseRetriever, return_in_answer_format: bool = False): + def __init__( + self, + summarizer: BaseSummarizer, + retriever: BaseRetriever, + generate_single_summary: bool = False, + return_in_answer_format: bool = False, + ): """ :param summarizer: Summarizer instance :param retriever: Retriever instance + :param generate_single_summary: Whether to generate a single summary for all documents or one summary per document. :param return_in_answer_format: Whether the results should be returned as documents (False) or in the answer format used in other QA pipelines (True). With the latter, you can use this pipeline as a "drop-in replacement" for other QA pipelines. """ self.pipeline = Pipeline() self.pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"]) - self.pipeline.add_node(component=summarizer, name="Summarizer", inputs=["Retriever"]) + if generate_single_summary is True: + document_merger = DocumentMerger() + self.pipeline.add_node(component=document_merger, name="Document Merger", inputs=["Retriever"]) + self.pipeline.add_node(component=summarizer, name="Summarizer", inputs=["Document Merger"]) + else: + self.pipeline.add_node(component=summarizer, name="Summarizer", inputs=["Retriever"]) self.return_in_answer_format = return_in_answer_format def run(self, query: str, params: Optional[dict] = None, debug: Optional[bool] = None): diff --git a/test/nodes/test_document_merger.py b/test/nodes/test_document_merger.py new file mode 100644 index 0000000000..9a8ad78988 --- /dev/null +++ b/test/nodes/test_document_merger.py @@ -0,0 +1,97 @@ +from haystack import Document +from haystack.nodes.other.document_merger import DocumentMerger + +doc_dicts = [ + { + "meta": { + "name": "name_1", + "year": "2020", + "month": "01", + "flat_field": 1, + "nested_field": {1: 2, "a": 5, "c": {"3": 3}}, + }, + "content": "text_1", + }, + { + "meta": { + "name": "name_2", + "year": "2020", + "month": "02", + "flat_field": 1, + "nested_field": {1: 2, "a": 5, "c": {"3": 3}}, + }, + "content": "text_2", + }, + { + "meta": { + "name": "name_3", + "year": "2020", + "month": "03", + "flat_field": 1, + "nested_field": {1: 2, "a": 7, "c": {"3": 3}}, + }, + "content": "text_3", + }, + { + "meta": { + "name": "name_4", + "year": "2021", + "month": "01", + "flat_field": 1, + "nested_field": {1: 2, "a": 5, "c": {"3": 3}}, + }, + "content": "text_4", + }, + { + "meta": { + "name": "name_5", + "year": "2021", + "month": "02", + "flat_field": 1, + "nested_field": {1: 2, "a": 5, "c": {"3": 3}}, + }, + "content": "text_5", + }, + { + "meta": { + "name": "name_6", + "year": "2021", + "month": "03", + "flat_field": 1, + "nested_field": {1: 2, "a": 5, "c": {"3": 3}}, + }, + "content": "text_6", + }, +] + +documents = [Document.from_dict(doc) for doc in doc_dicts] + + +def test_document_merger_merge(): + separator = "|" + dm = DocumentMerger(separator=separator) + merged_list = dm.merge(documents) + + assert len(merged_list) == 1 + assert merged_list[0].content == separator.join([doc["content"] for doc in doc_dicts]) + assert merged_list[0].meta == {"flat_field": 1, "nested_field": {1: 2, "c": {"3": 3}}} + + +def test_document_merger_run(): + separator = "|" + dm = DocumentMerger(separator=separator) + result = dm.run(documents) + + assert len(result[0]["documents"]) == 1 + assert result[0]["documents"][0].content == separator.join([doc["content"] for doc in doc_dicts]) + assert result[0]["documents"][0].meta == {"flat_field": 1, "nested_field": {1: 2, "c": {"3": 3}}} + + +def test_document_merger_run_batch(): + separator = "|" + dm = DocumentMerger(separator=separator) + batch_result = dm.run_batch([documents, documents]) + + assert len(batch_result[0]["documents"]) == 2 + assert batch_result[0]["documents"][0][0].content == separator.join([doc["content"] for doc in doc_dicts]) + assert batch_result[0]["documents"][0][0].meta == {"flat_field": 1, "nested_field": {1: 2, "c": {"3": 3}}} diff --git a/test/nodes/test_summarizer.py b/test/nodes/test_summarizer.py index 20f0f90ac7..70476f68ad 100644 --- a/test/nodes/test_summarizer.py +++ b/test/nodes/test_summarizer.py @@ -1,10 +1,10 @@ -from copy import deepcopy - import pytest from haystack.schema import Document from haystack.pipelines import SearchSummarizationPipeline -from haystack.nodes import DensePassageRetriever, EmbeddingRetriever, TransformersSummarizer +from haystack.nodes import DensePassageRetriever, EmbeddingRetriever +from haystack.nodes.other.document_merger import DocumentMerger + DOCS = [ Document( @@ -46,14 +46,6 @@ def test_summarization(summarizer): assert expected_summary == summary.meta["summary"] -# @pytest.mark.integration -# @pytest.mark.summarizer -# def test_summarization_one_summary(summarizer): -# summarized_docs = summarizer.predict(documents=SPLIT_DOCS, generate_single_summary=True) -# assert len(summarized_docs) == 1 -# assert EXPECTED_ONE_SUMMARIES[0] == summarized_docs[0].content - - @pytest.mark.integration @pytest.mark.summarizer def test_summarization_batch_single_doc_list(summarizer): @@ -92,57 +84,38 @@ def test_summarization_pipeline(document_store, retriever, summarizer): assert " The Eiffel Tower in Paris has officially opened its doors to the public." == answers[0]["answer"] -# @pytest.mark.integration -# @pytest.mark.summarizer -# @pytest.mark.parametrize( -# "retriever,document_store", [("embedding", "memory"), ("elasticsearch", "elasticsearch")], indirect=True -# ) -# def test_summarization_pipeline_one_summary(document_store, retriever, summarizer): -# document_store.write_documents(SPLIT_DOCS) - -# if isinstance(retriever, EmbeddingRetriever) or isinstance(retriever, DensePassageRetriever): -# document_store.update_embeddings(retriever=retriever) - -# query = "Where is Eiffel Tower?" -# pipeline = SearchSummarizationPipeline(retriever=retriever, summarizer=summarizer, return_in_answer_format=True) -# output = pipeline.run( -# query=query, params={"Retriever": {"top_k": 2}, "Summarizer": {"generate_single_summary": True}} -# ) -# answers = output["answers"] -# assert len(answers) == 1 -# assert answers[0]["answer"] in EXPECTED_ONE_SUMMARIES - - -# @pytest.mark.summarizer -# def test_metadata_summarizer(summarizer): -# docs = [ -# Document( -# content="""PG&E stated it scheduled the blackouts in response to forecasts for high winds amid dry conditions. The aim is to reduce the risk of wildfires. Nearly 800 thousand customers were scheduled to be affected by the shutoffs which were expected to last through at least midday tomorrow.""", -# meta={ -# "sub_content": "Pegasus Example", -# "topic": "California's Electricity", -# "context": "Dummy - PG&E stated it scheduled the blackouts in response to forecasts for high winds amid dry conditions. The aim is to reduce the risk of wildfires.", -# }, -# ), -# Document( -# content="""The tower is 324 metres (1,063 ft) tall, about the same height as an 81-storey building, and the tallest structure in Paris. Its base is square, measuring 125 metres (410 ft) on each side. During its construction, the Eiffel Tower surpassed the Washington Monument to become the tallest man-made structure in the world, a title it held for 41 years until the Chrysler Building in New York City was finished in 1930. It was the first structure to reach a height of 300 metres. Due to the addition of a broadcasting aerial at the top of the tower in 1957, it is now taller than the Chrysler Building by 5.2 metres (17 ft). Excluding transmitters, the Eiffel Tower is the second tallest free-standing structure in France after the Millau Viaduct.""", -# meta={"sub_content": "Paris best tour best tour", "topic": "Eiffel tower"}, -# ), -# ] -# # Original input is overwritten after the "predict". So adding the same input as check_output to assess the output -# check_output = deepcopy(docs) - -# summary = summarizer.predict(documents=docs) - -# assert len(summary[0].meta) == len(check_output[0].meta) -# assert len(summary[1].meta) - 1 == len(check_output[1].meta) -# assert ( -# summary[0].meta["context"] -# == """PG&E stated it scheduled the blackouts in response to forecasts for high winds amid dry conditions. The aim is to reduce the risk of wildfires. Nearly 800 thousand customers were scheduled to be affected by the shutoffs which were expected to last through at least midday tomorrow.""" -# ) - -# summary = summarizer.predict(documents=docs, generate_single_summary=True) - -# assert len(summary) == 1 -# summary[0].meta.pop("context") -# assert not summary[0].meta # Remaining metadata is not returned in case of a single summary +# +# Document Merger + Summarizer tests +# + + +@pytest.mark.integration +@pytest.mark.summarizer +def test_summarization_one_summary(summarizer): + dm = DocumentMerger() + merged_document = dm.merge(documents=SPLIT_DOCS) + print(merged_document) + summarized_docs = summarizer.predict(documents=merged_document) + assert len(summarized_docs) == 1 + assert EXPECTED_ONE_SUMMARIES[0] == summarized_docs[0].meta["summary"] + + +@pytest.mark.integration +@pytest.mark.summarizer +@pytest.mark.parametrize( + "retriever,document_store", [("embedding", "memory"), ("elasticsearch", "elasticsearch")], indirect=True +) +def test_summarization_pipeline_one_summary(document_store, retriever, summarizer): + document_store.write_documents(SPLIT_DOCS) + + if isinstance(retriever, EmbeddingRetriever) or isinstance(retriever, DensePassageRetriever): + document_store.update_embeddings(retriever=retriever) + + query = "Where is Eiffel Tower?" + pipeline = SearchSummarizationPipeline( + retriever=retriever, summarizer=summarizer, generate_single_summary=True, return_in_answer_format=True + ) + output = pipeline.run(query=query, params={"Retriever": {"top_k": 2}}) + answers = output["answers"] + assert len(answers) == 1 + assert answers[0]["answer"] in EXPECTED_ONE_SUMMARIES From 037cfb749d80b958cf42b432c489138caacf920e Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Mon, 31 Oct 2022 12:53:10 +0100 Subject: [PATCH 14/23] mypy, pylint fixes --- haystack/nodes/other/document_merger.py | 16 +++++++++------- haystack/pipelines/standard_pipelines.py | 14 ++++++-------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/haystack/nodes/other/document_merger.py b/haystack/nodes/other/document_merger.py index f27ec9325f..e568251279 100644 --- a/haystack/nodes/other/document_merger.py +++ b/haystack/nodes/other/document_merger.py @@ -1,4 +1,3 @@ -from collections import MutableMapping import logging from copy import deepcopy from typing import Optional, List, Dict, Union @@ -57,13 +56,16 @@ def run_batch( # type: ignore ): is_doclist_flat = isinstance(documents[0], Document) if is_doclist_flat: - flat_result: List[Document] = [] - flat_result = self.merge(documents=documents, separator=separator) + flat_result: List[Document] = self.merge( + documents=[doc for doc in documents if isinstance(doc, Document)], separator=separator + ) return {"documents": flat_result}, "output_1" else: - nested_result: List[List[Document]] = [] - for docs_group in documents: - nested_result.append(self.merge(documents=docs_group, separator=separator)) + nested_result: List[List[Document]] = [ + self.merge(documents=docs_lst, separator=separator) + for docs_lst in documents + if isinstance(docs_lst, list) + ] return {"documents": nested_result}, "output_1" def _extract_common_meta_dict(self, documents: List[Document]) -> dict: @@ -87,7 +89,7 @@ def _flatten_dict(self, d: dict, parent_key="") -> dict: items: List = [] for k, v in d.items(): new_key = (parent_key, k) if parent_key else k - if isinstance(v, MutableMapping): + if isinstance(v, dict): items.extend(self._flatten_dict(v, new_key).items()) else: items.append((new_key, v)) diff --git a/haystack/pipelines/standard_pipelines.py b/haystack/pipelines/standard_pipelines.py index 8a2d8e1e86..85b9520084 100644 --- a/haystack/pipelines/standard_pipelines.py +++ b/haystack/pipelines/standard_pipelines.py @@ -1,28 +1,26 @@ import logging from abc import ABC from copy import deepcopy -from pathlib import Path from functools import wraps -from typing import List, Optional, Dict, Any, Union -from haystack.nodes.base import BaseComponent +from pathlib import Path +from typing import Any, Dict, List, Optional, Union try: from typing import Literal except ImportError: from typing_extensions import Literal # type: ignore -from haystack.schema import Document, EvaluationResult, MultiLabel +from haystack.document_stores.base import BaseDocumentStore from haystack.nodes.answer_generator.base import BaseGenerator from haystack.nodes.other.docs2answers import Docs2Answers +from haystack.nodes.other.document_merger import DocumentMerger +from haystack.nodes.question_generator.question_generator import QuestionGenerator from haystack.nodes.reader.base import BaseReader from haystack.nodes.retriever.base import BaseRetriever from haystack.nodes.summarizer.base import BaseSummarizer from haystack.nodes.translator.base import BaseTranslator -from haystack.nodes.question_generator.question_generator import QuestionGenerator -from haystack.nodes.other.document_merger import DocumentMerger -from haystack.document_stores.base import BaseDocumentStore from haystack.pipelines.base import Pipeline - +from haystack.schema import Document, EvaluationResult, MultiLabel logger = logging.getLogger(__name__) From f45d4830200241eb41d22aa2ab46ce8bbf50d163 Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Mon, 31 Oct 2022 13:02:15 +0100 Subject: [PATCH 15/23] use generator --- haystack/nodes/other/document_merger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/haystack/nodes/other/document_merger.py b/haystack/nodes/other/document_merger.py index e568251279..87d37d2009 100644 --- a/haystack/nodes/other/document_merger.py +++ b/haystack/nodes/other/document_merger.py @@ -31,7 +31,7 @@ def merge(self, documents: List[Document], separator: Optional[str] = None) -> L """ if len(documents) == 0: raise AttributeError("Document Merger needs at least one document to merge.") - if not all([doc.content_type == "text" for doc in documents]): + if not all(doc.content_type == "text" for doc in documents): raise AttributeError( "Some of the documents provided are non-textual. Document Merger only works on textual documents." ) From e25fb408b59504130d68da04b44276d39b99a5e2 Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Mon, 31 Oct 2022 15:04:29 +0100 Subject: [PATCH 16/23] added test that will fail in 1.12 --- test/nodes/test_summarizer.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/test/nodes/test_summarizer.py b/test/nodes/test_summarizer.py index 70476f68ad..ad7cb5a0da 100644 --- a/test/nodes/test_summarizer.py +++ b/test/nodes/test_summarizer.py @@ -1,5 +1,7 @@ +from ensurepip import version import pytest +import haystack from haystack.schema import Document from haystack.pipelines import SearchSummarizationPipeline from haystack.nodes import DensePassageRetriever, EmbeddingRetriever @@ -84,6 +86,19 @@ def test_summarization_pipeline(document_store, retriever, summarizer): assert " The Eiffel Tower in Paris has officially opened its doors to the public." == answers[0]["answer"] +haystack_version = tuple(int(num) for num in haystack.__version__.split(".")[:2]) +fail_in_v1_12 = pytest.mark.xfail( + haystack_version >= (1, 12), + reason="'generate_single_summary' should be removed in v1.12, as it was deprecated in v1.10", +) + + +@fail_in_v1_12 +def test_generate_single_summary_deprecated(summarizer, documents=DOCS): + with pytest.raises(ValueError): + summarizer.predict(documents, generate_single_summary=True) + + # # Document Merger + Summarizer tests # From 9558c9e5db3cbf2781c00d333a4c87f956e73484 Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Tue, 1 Nov 2022 11:01:20 +0100 Subject: [PATCH 17/23] adapt to review --- docs/_src/api/pydoc/other.yml | 2 +- haystack/nodes/other/document_merger.py | 70 ++++++++--------------- haystack/nodes/summarizer/transformers.py | 8 +-- test/nodes/test_document_merger.py | 2 +- test/nodes/test_summarizer.py | 1 - test/pipelines/test_eval_batch.py | 3 - 6 files changed, 30 insertions(+), 56 deletions(-) diff --git a/docs/_src/api/pydoc/other.yml b/docs/_src/api/pydoc/other.yml index d94eb69743..5bc079a3e6 100644 --- a/docs/_src/api/pydoc/other.yml +++ b/docs/_src/api/pydoc/other.yml @@ -1,7 +1,7 @@ loaders: - type: python search_path: [../../../../haystack/nodes/other] - modules: ['docs2answers', 'join_docs', 'join_answers', 'route_documents'] + modules: ['docs2answers', 'join_docs', 'join_answers', 'route_documents', 'document_merger'] ignore_when_discovered: ['__init__'] processors: - type: filter diff --git a/haystack/nodes/other/document_merger.py b/haystack/nodes/other/document_merger.py index 87d37d2009..4355680007 100644 --- a/haystack/nodes/other/document_merger.py +++ b/haystack/nodes/other/document_merger.py @@ -1,6 +1,6 @@ import logging from copy import deepcopy -from typing import Optional, List, Dict, Union +from typing import Optional, List, Dict, Union, Any from haystack.schema import Document from haystack.nodes.base import BaseComponent @@ -30,17 +30,16 @@ def merge(self, documents: List[Document], separator: Optional[str] = None) -> L :return: List of Documents """ if len(documents) == 0: - raise AttributeError("Document Merger needs at least one document to merge.") + raise ValueError("Document Merger needs at least one document to merge.") if not all(doc.content_type == "text" for doc in documents): - raise AttributeError( + raise ValueError( "Some of the documents provided are non-textual. Document Merger only works on textual documents." ) - if separator is None: - separator = self.separator + separator = separator if separator is not None else self.separator merged_content = separator.join([doc.content for doc in documents]) - common_meta = self._extract_common_meta_dict(documents) + common_meta = self._keep_common_keys([doc.meta for doc in documents]) merged_document = Document(content=merged_content, meta=common_meta) return [merged_document] @@ -68,43 +67,22 @@ def run_batch( # type: ignore ] return {"documents": nested_result}, "output_1" - def _extract_common_meta_dict(self, documents: List[Document]) -> dict: - """ - Given a list of documents, extract a dictionary containing the meta fields - that are common to all the documents - """ - flattened_meta = [self._flatten_dict(d.meta) for d in documents] - common_meta_flat_dict = deepcopy(flattened_meta[0]) - for doc in flattened_meta[1:]: - if len(common_meta_flat_dict) == 0: - break - for k, v in doc.items(): - if k in common_meta_flat_dict: - if common_meta_flat_dict[k] != v: - del common_meta_flat_dict[k] - common_meta_nested_dict = self._nest_dict(common_meta_flat_dict) - return common_meta_nested_dict - - def _flatten_dict(self, d: dict, parent_key="") -> dict: - items: List = [] - for k, v in d.items(): - new_key = (parent_key, k) if parent_key else k - if isinstance(v, dict): - items.extend(self._flatten_dict(v, new_key).items()) - else: - items.append((new_key, v)) - return dict(items) - - def _nest_dict(self, d: dict) -> dict: - nested_dict: dict = {} - for key, value in d.items(): - target = nested_dict - if isinstance(key, tuple): - for k in key[:-1]: # traverse all keys but the last - target = target.setdefault(k, {}) - target[key[-1]] = value - else: - target[key] = value - while any(isinstance(k, tuple) for k in nested_dict.keys()): - nested_dict = self._nest_dict(nested_dict) - return nested_dict + def _keep_common_keys(self, list_of_dicts: List[Dict[str, Any]]) -> dict: + merge_dictionary = deepcopy(list_of_dicts[0]) + for key, value in list_of_dicts[0].items(): + + # if not all other dicts have this key, delete directly + if not all(key in dict.keys() for dict in list_of_dicts): + del merge_dictionary[key] + + # if they all have it and it's a dictionary, merge recursively + elif isinstance(value, dict): + # Get all the subkeys to merge in a new list + list_of_subdicts = [dictionary[key] for dictionary in list_of_dicts] + merge_dictionary[key] = self._keep_common_keys(list_of_subdicts) + + # If all dicts have this key and it's not a dictionary, delete only if the values differ + elif not all(value == dict[key] for dict in list_of_dicts): + del merge_dictionary[key] + + return merge_dictionary diff --git a/haystack/nodes/summarizer/transformers.py b/haystack/nodes/summarizer/transformers.py index 322a5f3b3b..577800725e 100644 --- a/haystack/nodes/summarizer/transformers.py +++ b/haystack/nodes/summarizer/transformers.py @@ -101,8 +101,6 @@ def __init__( raise ValueError( "'generate_single_summary' has been removed. Instead, you can use the Document Merger to merge documents before applying the Summarizer." ) - self.separator_for_single_summary = separator_for_single_summary - self.generate_single_summary = generate_single_summary self.devices, _ = initialize_device_settings(devices=devices, use_cuda=use_gpu, multi_gpu=False) if len(self.devices) > 1: @@ -135,7 +133,8 @@ def predict(self, documents: List[Document], generate_single_summary: Optional[b These document can for example be retrieved via the Retriever. :param documents: Related documents (e.g. coming from a retriever) that the answer shall be conditioned on. - :param generate_single_summary: This parameter is deprecated and will be removed in Haystack 1.12 + :param generate_single_summary: This parameter is deprecated and will be removed in Haystack 1.12. + To obtain single summaries from multiple documents, consider using the [DocumentMerger](https://docs.haystack.deepset.ai/docs/document_merger). :return: List of Documents, where Document.meta["summary"] contains the summarization """ if generate_single_summary is True: @@ -194,7 +193,8 @@ def predict_batch( :param documents: Single list of related documents or list of lists of related documents (e.g. coming from a retriever) that the answer shall be conditioned on. - :param generate_single_summary: This parameter is deprecated and will be removed in Haystack 1.12 + :param generate_single_summary: This parameter is deprecated and will be removed in Haystack 1.12. + To obtain single summaries from multiple documents, consider using the [DocumentMerger](https://docs.haystack.deepset.ai/docs/document_merger). :param batch_size: Number of Documents to process at a time. """ if generate_single_summary is True: diff --git a/test/nodes/test_document_merger.py b/test/nodes/test_document_merger.py index 9a8ad78988..f9bac328f1 100644 --- a/test/nodes/test_document_merger.py +++ b/test/nodes/test_document_merger.py @@ -8,7 +8,7 @@ "year": "2020", "month": "01", "flat_field": 1, - "nested_field": {1: 2, "a": 5, "c": {"3": 3}}, + "nested_field": {1: 2, "a": 5, "c": {"3": 3}, "d": "I will be dropped by the meta merge algorithm"}, }, "content": "text_1", }, diff --git a/test/nodes/test_summarizer.py b/test/nodes/test_summarizer.py index ad7cb5a0da..14bcee9d92 100644 --- a/test/nodes/test_summarizer.py +++ b/test/nodes/test_summarizer.py @@ -109,7 +109,6 @@ def test_generate_single_summary_deprecated(summarizer, documents=DOCS): def test_summarization_one_summary(summarizer): dm = DocumentMerger() merged_document = dm.merge(documents=SPLIT_DOCS) - print(merged_document) summarized_docs = summarizer.predict(documents=merged_document) assert len(summarized_docs) == 1 assert EXPECTED_ONE_SUMMARIES[0] == summarized_docs[0].meta["summary"] diff --git a/test/pipelines/test_eval_batch.py b/test/pipelines/test_eval_batch.py index 66927a67dc..3b48402d2d 100644 --- a/test/pipelines/test_eval_batch.py +++ b/test/pipelines/test_eval_batch.py @@ -54,7 +54,6 @@ def test_generativeqa_calculate_metrics( @pytest.mark.parametrize("retriever_with_docs", ["embedding"], indirect=True) def test_summarizer_calculate_metrics(document_store_with_docs: ElasticsearchDocumentStore, retriever_with_docs): document_store_with_docs.update_embeddings(retriever=retriever_with_docs) - print(document_store_with_docs.get_all_documents()) summarizer = TransformersSummarizer(model_name_or_path="sshleifer/distill-pegasus-xsum-16-4", use_gpu=False) pipeline = SearchSummarizationPipeline( retriever=retriever_with_docs, summarizer=summarizer, return_in_answer_format=True @@ -63,8 +62,6 @@ def test_summarizer_calculate_metrics(document_store_with_docs: ElasticsearchDoc labels=EVAL_LABELS, params={"Retriever": {"top_k": 5}}, context_matching_min_length=10 ) - print(eval_result) - metrics = eval_result.calculate_metrics(document_scope="context") assert "Retriever" in eval_result From 0cc9c5a727d0ebc1410381f5f86d20c8ef106358 Mon Sep 17 00:00:00 2001 From: anakin87 <44616784+anakin87@users.noreply.github.com> Date: Tue, 1 Nov 2022 11:11:51 +0100 Subject: [PATCH 18/23] extended deprecation docstring --- haystack/nodes/summarizer/transformers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/haystack/nodes/summarizer/transformers.py b/haystack/nodes/summarizer/transformers.py index 577800725e..b6305cadaa 100644 --- a/haystack/nodes/summarizer/transformers.py +++ b/haystack/nodes/summarizer/transformers.py @@ -82,7 +82,8 @@ def __init__( :param use_gpu: Whether to use GPU (if available). :param clean_up_tokenization_spaces: Whether or not to clean up the potential extra spaces in the text output :param separator_for_single_summary: This parameter is deprecated and will be removed in Haystack 1.12 - :param generate_single_summary: This parameter is deprecated and will be removed in Haystack 1.12 + :param generate_single_summary: This parameter is deprecated and will be removed in Haystack 1.12. + To obtain single summaries from multiple documents, consider using the [DocumentMerger](https://docs.haystack.deepset.ai/docs/document_merger). :param batch_size: Number of documents to process at a time. :param progress_bar: Whether to show a progress bar. :param use_auth_token: The API token used to download private models from Huggingface. From c9fa9884fee08a282bf8c093b8dacf79cc15950c Mon Sep 17 00:00:00 2001 From: Sara Zan Date: Thu, 3 Nov 2022 11:01:14 +0100 Subject: [PATCH 19/23] Update test/nodes/test_extractor_translation.py --- test/nodes/test_extractor_translation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/nodes/test_extractor_translation.py b/test/nodes/test_extractor_translation.py index 5d028f5e41..ea2733d02e 100644 --- a/test/nodes/test_extractor_translation.py +++ b/test/nodes/test_extractor_translation.py @@ -10,7 +10,7 @@ @pytest.mark.summarizer @pytest.mark.parametrize( "retriever,document_store,reader", - [("embedding", "memory", "farm"), ("elasticsearch", "elasticsearch", "farm")], + [("embedding", "memory", "farm")], indirect=True, ) def test_extractive_qa_pipeline_with_translator( From 435c81e15e0f4eed4fb66c1abf60a724762f88e0 Mon Sep 17 00:00:00 2001 From: Sara Zan Date: Thu, 3 Nov 2022 11:11:45 +0100 Subject: [PATCH 20/23] Update test/nodes/test_summarizer.py --- test/nodes/test_summarizer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/nodes/test_summarizer.py b/test/nodes/test_summarizer.py index 14bcee9d92..ceb1381905 100644 --- a/test/nodes/test_summarizer.py +++ b/test/nodes/test_summarizer.py @@ -94,7 +94,8 @@ def test_summarization_pipeline(document_store, retriever, summarizer): @fail_in_v1_12 -def test_generate_single_summary_deprecated(summarizer, documents=DOCS): +def test_generate_single_summary_deprecated(): + summarizer = TransformersSummarizer(model_name_or_path="hf-internal-testing/tiny-random-bart", use_gpu=False) with pytest.raises(ValueError): summarizer.predict(documents, generate_single_summary=True) From 78f213792a9e258ebd8ec945c1acbb3a7cc6299b Mon Sep 17 00:00:00 2001 From: Sara Zan Date: Thu, 3 Nov 2022 14:28:36 +0100 Subject: [PATCH 21/23] Update test/nodes/test_summarizer.py --- test/nodes/test_summarizer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/nodes/test_summarizer.py b/test/nodes/test_summarizer.py index ceb1381905..c3f4107b29 100644 --- a/test/nodes/test_summarizer.py +++ b/test/nodes/test_summarizer.py @@ -4,7 +4,7 @@ import haystack from haystack.schema import Document from haystack.pipelines import SearchSummarizationPipeline -from haystack.nodes import DensePassageRetriever, EmbeddingRetriever +from haystack.nodes import DensePassageRetriever, EmbeddingRetriever, TransformersSummarizer from haystack.nodes.other.document_merger import DocumentMerger From 4ccf07d7231a419ca3badb5b244eff1630fbb85a Mon Sep 17 00:00:00 2001 From: ZanSara Date: Thu, 3 Nov 2022 14:39:37 +0100 Subject: [PATCH 22/23] black --- test/nodes/test_extractor_translation.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/test/nodes/test_extractor_translation.py b/test/nodes/test_extractor_translation.py index ea2733d02e..b1ad8a56e3 100644 --- a/test/nodes/test_extractor_translation.py +++ b/test/nodes/test_extractor_translation.py @@ -8,11 +8,7 @@ @pytest.mark.integration @pytest.mark.elasticsearch @pytest.mark.summarizer -@pytest.mark.parametrize( - "retriever,document_store,reader", - [("embedding", "memory", "farm")], - indirect=True, -) +@pytest.mark.parametrize("retriever,document_store,reader", [("embedding", "memory", "farm")], indirect=True) def test_extractive_qa_pipeline_with_translator( document_store, retriever, reader, en_to_de_translator, de_to_en_translator ): From ab8e6ba33bdaace7300a4d020a940460fd3e3711 Mon Sep 17 00:00:00 2001 From: Sara Zan Date: Thu, 3 Nov 2022 14:58:14 +0100 Subject: [PATCH 23/23] documents fixture --- test/nodes/test_summarizer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/nodes/test_summarizer.py b/test/nodes/test_summarizer.py index c3f4107b29..ae27ed52ca 100644 --- a/test/nodes/test_summarizer.py +++ b/test/nodes/test_summarizer.py @@ -97,7 +97,7 @@ def test_summarization_pipeline(document_store, retriever, summarizer): def test_generate_single_summary_deprecated(): summarizer = TransformersSummarizer(model_name_or_path="hf-internal-testing/tiny-random-bart", use_gpu=False) with pytest.raises(ValueError): - summarizer.predict(documents, generate_single_summary=True) + summarizer.predict([Document(content="irrelevant")], generate_single_summary=True) #