Extend metadata filtering support in ElasticsearchDocumentStore #2108

Merged · 31 commits · Feb 4, 2022

Commits
afa5876
Add extended filtering to ESDocumentStore
bogdankostic Feb 1, 2022
1df8b40
Add Docstrings
bogdankostic Feb 1, 2022
73094cb
Fix definition of filter queries
bogdankostic Feb 2, 2022
fe9239b
Fix mypy
bogdankostic Feb 2, 2022
dc89bde
Add tests
bogdankostic Feb 2, 2022
8d1737a
Add latest docstring and tutorial changes
github-actions[bot] Feb 2, 2022
ff0a176
Adapt Docstrings
bogdankostic Feb 2, 2022
06796bc
Merge remote-tracking branch 'origin/master' into extend_es_filter
bogdankostic Feb 2, 2022
5443e4a
Merge remote-tracking branch 'origin/extend_es_filter' into extend_es…
bogdankostic Feb 2, 2022
00333c4
Adapt tests to added test_docs
bogdankostic Feb 2, 2022
1ea45dd
Adapt tests to added test_docs
bogdankostic Feb 3, 2022
e503a7e
Adapt tests to added test_docs
bogdankostic Feb 2, 2022
4d0f198
Merge remote-tracking branch 'origin/extend_es_filter' into extend_es…
bogdankostic Feb 3, 2022
fea6c87
Adapt tests to added test_docs
bogdankostic Feb 3, 2022
e8affd4
Add filtering utils for same representation in all doc stores
bogdankostic Feb 3, 2022
8f2332b
Apply balck formatting
bogdankostic Feb 3, 2022
1735c4e
Merge remote-tracking branch 'origin/master' into extend_es_filter
bogdankostic Feb 3, 2022
b4477fc
Update documentation
github-actions[bot] Feb 3, 2022
93ef74f
Fix mypy
bogdankostic Feb 3, 2022
5754574
Apply Black
github-actions[bot] Feb 3, 2022
9181fdd
Fix mypy
bogdankostic Feb 3, 2022
3da8eb9
Merge remote-tracking branch 'origin/extend_es_filter' into extend_es…
bogdankostic Feb 3, 2022
6a0c55e
Adopt Doc Strings
bogdankostic Feb 3, 2022
df91a6f
Add more tests
bogdankostic Feb 3, 2022
5c0723e
Apply Black
github-actions[bot] Feb 3, 2022
7f002b5
Allow filtering in OpenSearchDocStore
bogdankostic Feb 3, 2022
d89d5ca
Merge remote-tracking branch 'origin/extend_es_filter' into extend_es…
bogdankostic Feb 3, 2022
1d714b9
Update documentation
github-actions[bot] Feb 3, 2022
4da5ab4
Adapt Docstrings
bogdankostic Feb 4, 2022
5e329ec
Merge remote-tracking branch 'origin/extend_es_filter' into extend_es…
bogdankostic Feb 4, 2022
40c1993
Update documentation
github-actions[bot] Feb 4, 2022
402 changes: 375 additions & 27 deletions docs/_src/api/api/document_store.md

Large diffs are not rendered by default.

493 changes: 398 additions & 95 deletions haystack/document_stores/elasticsearch.py

Large diffs are not rendered by default.
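Since the `elasticsearch.py` diff is collapsed here, the following is a minimal sketch of what the extended filter syntax looks like from the DocumentStore side. It is illustrative only: it assumes a local Elasticsearch instance, and the host and index values are placeholders, not taken from this PR.

```python
# Illustrative sketch (not part of the diff): querying with the extended filter syntax.
# Assumes Elasticsearch is reachable on localhost; the index name is a placeholder.
from haystack.document_stores import ElasticsearchDocumentStore

document_store = ElasticsearchDocumentStore(host="localhost", index="documents")

# Nested logical and comparison operators, as described in filter_utils.py below
filters = {
    "$and": {
        "type": {"$eq": "article"},
        "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
        "$or": {"genre": {"$in": ["economy", "politics"]}, "publisher": {"$eq": "nytimes"}},
    }
}
docs = document_store.get_all_documents(filters=filters)
```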

293 changes: 293 additions & 0 deletions haystack/document_stores/filter_utils.py
@@ -0,0 +1,293 @@
from typing import Union, List, Dict
from abc import ABC, abstractmethod
from collections import defaultdict


def nested_defaultdict():
"""
Data structure that recursively adds an empty dictionary as value if a key does not exist. In nested dictionary
structures, this means we don't need to check whether a key already exists before assigning to it (which can become
hard to maintain in dictionaries with many levels): accessing an existing key returns its value, and accessing a
missing key creates an empty dictionary on the fly.
"""
return defaultdict(nested_defaultdict)
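# Illustrative aside (not part of the diff): with nested_defaultdict, arbitrarily deep keys
# can be assigned without creating the intermediate dictionaries by hand, e.g.
#   ranges = nested_defaultdict()
#   ranges["date"]["gte"] = "2015-01-01"
#   ranges["date"]["lt"] = "2021-01-01"   # the "date" dict is created automatically on first access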


class LogicalFilterClause(ABC):
"""
Class that is able to parse a filter and convert it to the format that the underlying databases of our
DocumentStores require.

Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical
operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$ne"`, `"$in"`, `"$nin"`, `"$gt"`,
`"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name.
Logical operator keys take a dictionary of metadata field names and/or logical operators as
value. Metadata field names take a dictionary of comparison operators as value. Comparison
operator keys take a single value or (in case of `"$in"` and `"$nin"`) a list of values as value.
If no logical operator is provided, `"$and"` is used as the default operation. If no comparison
operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as the default
operation.
Example:
```python
filters = {
"$and": {
"type": {"$eq": "article"},
"date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
"rating": {"$gte": 3},
"$or": {
"genre": {"$in": ["economy", "politics"]},
"publisher": {"$eq": "nytimes"}
}
}
}
# or simpler using default operators
filters = {
"type": "article",
"date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
"rating": {"$gte": 3},
"$or": {
"genre": ["economy", "politics"],
"publisher": "nytimes"
}
}
```

To use the same logical operator multiple times on the same level, logical operators can optionally take a list of
dictionaries as value.

Example:
```python
filters = {
"$or": [
{
"$and": {
"Type": "News Paper",
"Date": {
"$lt": "2019-01-01"
}
}
},
{
"$and": {
"Type": "Blog Post",
"Date": {
"$gte": "2019-01-01"
}
}
}
]
}
```

"""

def __init__(self, conditions: List["LogicalFilterClause"]):
self.conditions = conditions

@classmethod
def parse(cls, filter_term: Union[dict, List[dict]]):
"""
Parses a filter dictionary/list and returns a LogicalFilterClause instance.

:param filter_term: Dictionary or list that contains the filter definition.
"""
conditions = []

if isinstance(filter_term, dict):
filter_term = [filter_term]
for item in filter_term:
for key, value in item.items():
if key == "$not":
conditions.append(NotOperation.parse(value))
elif key == "$and":
conditions.append(AndOperation.parse(value))
elif key == "$or":
conditions.append(OrOperation.parse(value))
# Key needs to be a metadata field
else:
conditions.extend(ComparisonOperation.parse(key, value))

if cls == LogicalFilterClause:
if len(conditions) == 1:
return conditions[0]
else:
return AndOperation(conditions)
else:
return cls(conditions)

@abstractmethod
def convert_to_elasticsearch(self):
"""
Converts the LogicalFilterClause instance to an Elasticsearch filter.
"""
pass

Review comment (Member): probably I missed this one, but where does this method get implemented? I saw it being called in the ElasticsearchDocumentStore...

Author reply (Contributor): This method gets implemented in each of the LogicalOperation and FilterOperation classes, such that each operation is responsible for converting their part of the whole filter clause.

def _merge_es_range_queries(self, conditions: List[Dict]) -> List[Dict]:
"""
Merges Elasticsearch range queries that perform on the same metadata field.
"""

range_conditions = [cond["range"] for cond in filter(lambda condition: "range" in condition, conditions)]
if range_conditions:
conditions = [condition for condition in conditions if "range" not in condition]
range_conditions_dict = nested_defaultdict()
for condition in range_conditions:
field_name = list(condition.keys())[0]
operation = list(condition[field_name].keys())[0]
comparison_value = condition[field_name][operation]
range_conditions_dict[field_name][operation] = comparison_value

for field_name, comparison_operations in range_conditions_dict.items():
conditions.append({"range": {field_name: comparison_operations}})

return conditions
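# Illustrative aside (not part of the diff): two separate range clauses on the same field
# are merged into a single clause, e.g.
#   [{"range": {"date": {"gte": "2015-01-01"}}}, {"range": {"date": {"lt": "2021-01-01"}}}]
# becomes
#   [{"range": {"date": {"gte": "2015-01-01", "lt": "2021-01-01"}}}]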


class ComparisonOperation(ABC):
def __init__(self, field_name: str, comparison_value: Union[str, float, List]):
self.field_name = field_name
self.comparison_value = comparison_value

@classmethod
def parse(cls, field_name, comparison_clause: Union[Dict, List, str, float]):
comparison_operations: List[ComparisonOperation] = []

if isinstance(comparison_clause, dict):
for comparison_operation, comparison_value in comparison_clause.items():
if comparison_operation == "$eq":
comparison_operations.append(EqOperation(field_name, comparison_value))
elif comparison_operation == "$in":
comparison_operations.append(InOperation(field_name, comparison_value))
elif comparison_operation == "$ne":
comparison_operations.append(NeOperation(field_name, comparison_value))
elif comparison_operation == "$nin":
comparison_operations.append(NinOperation(field_name, comparison_value))
elif comparison_operation == "$gt":
comparison_operations.append(GtOperation(field_name, comparison_value))
elif comparison_operation == "$gte":
comparison_operations.append(GteOperation(field_name, comparison_value))
elif comparison_operation == "$lt":
comparison_operations.append(LtOperation(field_name, comparison_value))
elif comparison_operation == "$lte":
comparison_operations.append(LteOperation(field_name, comparison_value))

# No comparison operator is given, so we use the default operators "$in" if the comparison value is a list and
# "$eq" in every other case
elif isinstance(comparison_clause, list):
comparison_operations.append(InOperation(field_name, comparison_clause))
else:
comparison_operations.append(EqOperation(field_name, comparison_clause))

return comparison_operations
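# Illustrative aside (not part of the diff): the default operators in action, e.g.
#   ComparisonOperation.parse("genre", ["economy", "politics"])  -> [InOperation(...)]
#   ComparisonOperation.parse("publisher", "nytimes")            -> [EqOperation(...)]
#   ComparisonOperation.parse("date", {"$gte": "2015-01-01"})    -> [GteOperation(...)]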

@abstractmethod
def convert_to_elasticsearch(self):
"""
Converts the ComparisonOperation instance to an Elasticsearch query.
"""
pass


class NotOperation(LogicalFilterClause):
"""
Handles conversion of logical 'NOT' operations.
"""

def convert_to_elasticsearch(self):
conditions = [condition.convert_to_elasticsearch() for condition in self.conditions]
conditions = self._merge_es_range_queries(conditions)
return {"bool": {"must_not": conditions}}


class AndOperation(LogicalFilterClause):
"""
Handles conversion of logical 'AND' operations.
"""

def convert_to_elasticsearch(self):
conditions = [condition.convert_to_elasticsearch() for condition in self.conditions]
conditions = self._merge_es_range_queries(conditions)
return {"bool": {"must": conditions}}


class OrOperation(LogicalFilterClause):
"""
Handles conversion of logical 'OR' operations.
"""

def convert_to_elasticsearch(self):
conditions = [condition.convert_to_elasticsearch() for condition in self.conditions]
conditions = self._merge_es_range_queries(conditions)
return {"bool": {"should": conditions}}


class EqOperation(ComparisonOperation):
"""
Handles conversion of the '$eq' comparison operation.
"""

def convert_to_elasticsearch(self):
return {"term": {self.field_name: self.comparison_value}}


class InOperation(ComparisonOperation):
"""
Handles conversion of the '$in' comparison operation.
"""

def convert_to_elasticsearch(self):
return {"terms": {self.field_name: self.comparison_value}}


class NeOperation(ComparisonOperation):
"""
Handles conversion of the '$ne' comparison operation.
"""

def convert_to_elasticsearch(self):
return {"bool": {"must_not": {"term": {self.field_name: self.comparison_value}}}}


class NinOperation(ComparisonOperation):
"""
Handles conversion of the '$nin' comparison operation.
"""

def convert_to_elasticsearch(self):
return {"bool": {"must_not": {"terms": {self.field_name: self.comparison_value}}}}


class GtOperation(ComparisonOperation):
"""
Handles conversion of the '$gt' comparison operation.
"""

def convert_to_elasticsearch(self):
return {"range": {self.field_name: {"gt": self.comparison_value}}}


class GteOperation(ComparisonOperation):
"""
Handles conversion of the '$gte' comparison operation.
"""

def convert_to_elasticsearch(self):
return {"range": {self.field_name: {"gte": self.comparison_value}}}


class LtOperation(ComparisonOperation):
"""
Handles conversion of the '$lt' comparison operation.
"""

def convert_to_elasticsearch(self):
return {"range": {self.field_name: {"lt": self.comparison_value}}}


class LteOperation(ComparisonOperation):
"""
Handles conversion of the '$lte' comparison operation.
"""

def convert_to_elasticsearch(self):
return {"range": {self.field_name: {"lte": self.comparison_value}}}
42 changes: 31 additions & 11 deletions test/conftest.py
@@ -144,14 +144,6 @@ def pytest_collection_modifyitems(config, items):
item.add_marker(skip_docstore)


@pytest.fixture
def tmpdir(tmpdir):
"""
Makes pytest's tmpdir fixture fully compatible with pathlib
"""
return Path(tmpdir)


@pytest.fixture(scope="function", autouse=True)
def gc_cleanup(request):
"""
@@ -344,12 +336,30 @@ def de_to_en_translator():
def test_docs_xs():
return [
# current "dict" format for a document
{"content": "My name is Carla and I live in Berlin", "meta": {"meta_field": "test1", "name": "filename1"}},
{
"content": "My name is Carla and I live in Berlin",
"meta": {"meta_field": "test1", "name": "filename1", "date_field": "2020-03-01", "numeric_field": 5.5},
},
# metafield at the top level for backward compatibility
{"content": "My name is Paul and I live in New York", "meta_field": "test2", "name": "filename2"},
{
"content": "My name is Paul and I live in New York",
"meta_field": "test2",
"name": "filename2",
"date_field": "2019-10-01",
"numeric_field": 5,
},
# Document object for a doc
Document(
content="My name is Christelle and I live in Paris", meta={"meta_field": "test3", "name": "filename3"}
content="My name is Christelle and I live in Paris",
meta={"meta_field": "test3", "name": "filename3", "date_field": "2018-10-01", "numeric_field": 4.5},
),
Document(
content="My name is Camila and I live in Madrid",
meta={"meta_field": "test4", "name": "filename4", "date_field": "2021-02-01", "numeric_field": 3},
),
Document(
content="My name is Matteo and I live in Rome",
meta={"meta_field": "test5", "name": "filename5", "date_field": "2019-01-01", "numeric_field": 0},
),
]

@@ -543,6 +553,16 @@ def document_store_with_docs(request, test_docs_xs, tmp_path):
document_store = get_document_store(
document_store_type=request.param, embedding_dim=embedding_dim.args[0], tmp_path=tmp_path
)
# TODO: remove the following part once we allow numbers as metadata field values in WeaviateDocumentStore
if request.param == "weaviate":
for doc in test_docs_xs:
if isinstance(doc, Document):
doc.meta["numeric_field"] = str(doc.meta["numeric_field"])
else:
if "meta" in doc:
doc["meta"]["numeric_field"] = str(doc["meta"]["numeric_field"])
else:
doc["numeric_field"] = str(doc["numeric_field"])
document_store.write_documents(test_docs_xs)
yield document_store
document_store.delete_documents()
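
The added `date_field` and `numeric_field` metadata make the extended filters testable against realistic values. Below is a rough sketch of the kind of check the new tests can perform; it is illustrative only, and `document_store` is assumed to be an `ElasticsearchDocumentStore` already populated with the five `test_docs_xs` documents, as in the `document_store_with_docs` fixture.

```python
filters = {"numeric_field": {"$gte": 3}, "date_field": {"$lt": "2021-01-01"}}
docs = document_store.get_all_documents(filters=filters)

# With the metadata defined in test_docs_xs, this should match Carla (5.5, 2020-03-01),
# Paul (5, 2019-10-01) and Christelle (4.5, 2018-10-01), but not Camila
# (date 2021-02-01 is too recent) or Matteo (numeric_field 0 is too small).
assert {doc.meta["name"] for doc in docs} == {"filename1", "filename2", "filename3"}
```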