[enrich/demographics] Fix fetching authors with min and max dates
This commit fixes fetching all authors with their min and max dates.
The query now uses the `composite` aggregation, which ElasticSearch
and OpenSearch provide to paginate over all buckets.

The old query returned only 10000 items and also produced the
following deprecation warning:

```
Deprecation: This aggregation creates too many buckets (10001) and
will throw an error in future versions. You should update the
[search.max_buckets] cluster setting or use the [composite]
aggregation to paginate all buckets in multiple requests.
```
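
For reference, composite pagination works by feeding each response's
`after_key` back into the next request until an empty page of buckets
comes back. A minimal standalone sketch (the `fetch_all_buckets` helper
and its URL/field arguments are illustrative, not the exact code in
this commit):

```
# Hedged sketch of composite-aggregation pagination; names are
# illustrative, not taken from grimoire_elk.
import json

import requests


def fetch_all_buckets(es_url, index, field):
    """Yield every bucket, following the composite cursor page by page."""
    after = None
    while True:
        composite = {
            "sources": [{field: {"terms": {"field": field}}}],
            "size": 10000
        }
        if after:
            composite["after"] = after
        query = {"size": 0, "aggs": {"author": {"composite": composite}}}
        r = requests.post("{}/{}/_search".format(es_url, index),
                          data=json.dumps(query),
                          headers={"Content-Type": "application/json"})
        r.raise_for_status()
        agg = r.json()["aggregations"]["author"]
        # An empty page means every bucket has been consumed
        if not agg["buckets"]:
            return
        # Cursor for the next page
        after = agg["after_key"]
        for bucket in agg["buckets"]:
            yield bucket
```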

Tests added accordingly.

Signed-off-by: Quan Zhou <[email protected]>
zhquan committed Jun 3, 2022
1 parent a7743b1 commit d6db745
Showing 6 changed files with 391 additions and 28 deletions.
107 changes: 79 additions & 28 deletions grimoire_elk/enriched/enrich.py
@@ -1879,29 +1879,13 @@ def run_demography(self, date_field, author_field, log_prefix, contribution_type
In case there is no specific contribution type, by default all contributions will be considered.
"""
# The first step is to find the current min and max date for all the authors
authors_min_max_data = {}

es_query = Enrich.authors_min_max_dates(date_field,
author_field=author_field,
contribution_type=contribution_type)
r = self.requests.post(self.elastic.index_url + "/_search",
data=es_query, headers=HEADER_JSON,
verify=False)
try:
r.raise_for_status()
except requests.exceptions.HTTPError as ex:
logger.error("{} error getting authors mix and max date. Aborted.".format(log_prefix))
logger.error(ex)
return

for author in r.json()['aggregations']['author']['buckets']:
authors_min_max_data[author['key']] = author
authors_min_max_data = self.fetch_authors_min_max_dates(log_prefix, author_field, contribution_type, date_field)

# Then we update the min max dates of all authors
for author_key in authors_min_max_data:
author_min_date = authors_min_max_data[author_key]['min']['value_as_string']
author_max_date = authors_min_max_data[author_key]['max']['value_as_string']

for author in authors_min_max_data:
author_min_date = author['min']['value_as_string']
author_max_date = author['max']['value_as_string']
author_key = author['key']['author_uuid']
field_name = contribution_type if contribution_type else 'demography'
es_update = Enrich.update_author_min_max_date(author_min_date, author_max_date,
author_key, field_name, author_field=author_field)
@@ -1927,6 +1911,45 @@ def run_demography(date_field, author_field, log_prefix, contribution_type
logger.error(ex)
return

def fetch_authors_min_max_dates(self, log_prefix, author_field, contribution_type, date_field):
""" Fetch all authors with their first and last date of activity.
:param log_prefix: log prefix used on logger.
:param author_field: field of the author.
:param contribution_type: name of the contribution type (if any) which the dates are computed for.
In case there is no specific contribution type, by default all contributions will be considered.
:param date_field: field used to find the min and max dates for the author's activity.
:return: generator of authors with their min and max dates.
"""
after = None

while True:
es_query = Enrich.authors_min_max_dates(date_field,
author_field=author_field,
contribution_type=contribution_type,
after=after)
r = self.requests.post(self.elastic.index_url + "/_search",
data=es_query, headers=HEADER_JSON,
verify=False)
try:
r.raise_for_status()
except requests.exceptions.HTTPError as ex:
logger.error("{} error getting authors mix and max date. Aborted.".format(log_prefix))
logger.error(ex)
return

aggregations_author = r.json()['aggregations']['author']

# When there are no more elements, it will return an empty list of buckets
if not aggregations_author['buckets']:
return

after = aggregations_author['after_key'][author_field]

for author in aggregations_author['buckets']:
yield author

def check_version_conflicts(self, es_update, version_conflicts, log_prefix, max_retries=5):
"""
Check if there are version conflicts within a query response and retry the request.
@@ -1956,20 +1979,32 @@ def check_version_conflicts(self, es_update, version_conflicts, log_prefix, max_
self.check_version_conflicts(es_update, r.json()['version_conflicts'], log_prefix, max_retries=retries)

@staticmethod
def authors_min_max_dates(date_field, author_field="author_uuid", contribution_type=None):
def authors_min_max_dates(date_field, author_field="author_uuid", contribution_type=None, after=None):
"""
Get the aggregation of authors with their min and max activity dates
:param date_field: field used to find the min and max dates for the author's activity
:param author_field: field of the author
:param contribution_type: name of the contribution type (if any) which the dates are computed for.
In case there is no specific contribution type, by default all contributions will be considered.
:param after: value used for pagination
:return: the query to be executed to get the authors min and max aggregation data
"""

# Limit aggregations: https://github.com/elastic/elasticsearch/issues/18838
# 30000 seems to be a sensible estimate of the number of people in git
# Limit aggregations:
# - OpenSearch: 10000
# - https://opensearch.org/docs/latest/opensearch/bucket-agg/
# - ElasticSearch: 10000
# - https://discuss.elastic.co/t/increasing-max-buckets-for-specific-visualizations/187390/4
# - When you try to fetch more than 10000 buckets, it returns this error message:
# {
# "type": "too_many_buckets_exception",
# "reason": "Trying to create too many buckets. Must be less than or equal to: [10000] but was [20000].
# This limit can be set by changing the [search.max_buckets] cluster level setting.",
# "max_buckets": 10000
# }

query_type = ""
if contribution_type:
query_type = """"query": {
@@ -1981,15 +2016,31 @@ def authors_min_max_dates(date_field, author_field="author_uuid", contribution_t
}
}
},""" % contribution_type

query_after = ""
if after:
query_after = """"after": {
"%s": "%s"
},""" % (author_field, after)

es_query = """
{
"size": 0,
%s
"aggs": {
"author": {
"terms": {
"field": "%s",
"size": 30000
"composite": {
"sources": [
{
"%s": {
"terms": {
"field": "%s"
}
}
}
],
%s
"size": 10000
},
"aggs": {
"min": {
Expand All @@ -2006,7 +2057,7 @@ def authors_min_max_dates(date_field, author_field="author_uuid", contribution_t
}
}
}
""" % (query_type, author_field, date_field, date_field)
""" % (query_type, author_field, author_field, query_after, date_field, date_field)

return es_query
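
Seen side by side, the aggregation body changes roughly as follows (a
condensed sketch of the two queries above, written as Python dicts for
brevity):

```
# Old shape: one `terms` aggregation with a hard-coded size, capped by
# the search.max_buckets cluster setting (10000)
old_aggs = {"author": {"terms": {"field": "author_uuid", "size": 30000}}}

# New shape: a `composite` aggregation fetched 10000 buckets at a time;
# later pages pass the previous response's `after_key` as "after"
new_aggs = {"author": {"composite": {
    "sources": [{"author_uuid": {"terms": {"field": "author_uuid"}}}],
    "size": 10000
}}}
```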

@@ -0,0 +1,8 @@
---
title: Fix fetching authors with min and max dates
category: fixed
author: Quan Zhou <[email protected]>
issue: null
notes: >
The old query only returned 10000 items. The query now uses the
ElasticSearch and OpenSearch `composite` aggregation to paginate
over all buckets.
55 changes: 55 additions & 0 deletions tests/data/author_min_max_dates_1.json
@@ -0,0 +1,55 @@
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 2,
"successful" : 2,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 10000,
"relation" : "gte"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"author" : {
"after_key" : {
"author_uuid" : "007a56d0322c518859dde2a0c6ed9143fa141c61"
},
"buckets" : [
{
"key" : {
"author_uuid" : "00032fabbbf033467d7bd307df81b654c0fa53d8"
},
"doc_count" : 1,
"min" : {
"value" : 1.623225379E12,
"value_as_string" : "2021-06-09T07:56:19.000Z"
},
"max" : {
"value" : 1.623225379E12,
"value_as_string" : "2021-06-09T07:56:19.000Z"
}
},
{
"key" : {
"author_uuid" : "007a56d0322c518859dde2a0c6ed9143fa141c61"
},
"doc_count" : 1,
"min" : {
"value" : 1.626183289E12,
"value_as_string" : "2021-07-13T13:34:49.000Z"
},
"max" : {
"value" : 1.626183289E12,
"value_as_string" : "2021-07-13T13:34:49.000Z"
}
}
]
}
}
}
55 changes: 55 additions & 0 deletions tests/data/author_min_max_dates_2.json
@@ -0,0 +1,55 @@
{
"took" : 4,
"timed_out" : false,
"_shards" : {
"total" : 4,
"successful" : 4,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 10000,
"relation" : "gte"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"author" : {
"after_key" : {
"author_uuid" : "00d36515f739794b941586e5d0a102b5ff3a0cc2"
},
"buckets" : [
{
"key" : {
"author_uuid" : "00cc95a5950523a42c969f15c7c36c4530417f13"
},
"doc_count" : 1,
"min" : {
"value" : 1.474160034E12,
"value_as_string" : "2016-09-18T00:53:54.000Z"
},
"max" : {
"value" : 1.474160034E12,
"value_as_string" : "2016-09-18T00:53:54.000Z"
}
},
{
"key" : {
"author_uuid" : "00d36515f739794b941586e5d0a102b5ff3a0cc2"
},
"doc_count" : 1,
"min" : {
"value" : 1.526521972E12,
"value_as_string" : "2018-05-17T01:52:52.000Z"
},
"max" : {
"value" : 1.526521972E12,
"value_as_string" : "2018-05-17T01:52:52.000Z"
}
}
]
}
}
}
20 changes: 20 additions & 0 deletions tests/data/author_min_max_dates_empty.json
@@ -0,0 +1,20 @@
{
"took" : 4,
"timed_out" : false,
"_shards" : {
"total" : 4,
"successful" : 4,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 4,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"author" : {
"buckets" : [ ]
}
}
}