diff --git a/grimoire_elk/enriched/enrich.py b/grimoire_elk/enriched/enrich.py index 7c816fd4b..5bf2bedaa 100644 --- a/grimoire_elk/enriched/enrich.py +++ b/grimoire_elk/enriched/enrich.py @@ -1879,29 +1879,13 @@ def run_demography(self, date_field, author_field, log_prefix, contribution_type In case there is no specific contribution type, by default all contributions will be considered. """ # The first step is to find the current min and max date for all the authors - authors_min_max_data = {} - - es_query = Enrich.authors_min_max_dates(date_field, - author_field=author_field, - contribution_type=contribution_type) - r = self.requests.post(self.elastic.index_url + "/_search", - data=es_query, headers=HEADER_JSON, - verify=False) - try: - r.raise_for_status() - except requests.exceptions.HTTPError as ex: - logger.error("{} error getting authors mix and max date. Aborted.".format(log_prefix)) - logger.error(ex) - return - - for author in r.json()['aggregations']['author']['buckets']: - authors_min_max_data[author['key']] = author + authors_min_max_data = self.fetch_authors_min_max_dates(log_prefix, author_field, contribution_type, date_field) # Then we update the min max dates of all authors - for author_key in authors_min_max_data: - author_min_date = authors_min_max_data[author_key]['min']['value_as_string'] - author_max_date = authors_min_max_data[author_key]['max']['value_as_string'] - + for author in authors_min_max_data: + author_min_date = author['min']['value_as_string'] + author_max_date = author['max']['value_as_string'] + author_key = author['key']['author_uuid'] field_name = contribution_type if contribution_type else 'demography' es_update = Enrich.update_author_min_max_date(author_min_date, author_max_date, author_key, field_name, author_field=author_field) @@ -1927,6 +1911,45 @@ def run_demography(self, date_field, author_field, log_prefix, contribution_type logger.error(ex) return + def fetch_authors_min_max_dates(self, log_prefix, author_field, 
contribution_type, date_field): + """ Fetch all authors with their first and last date of activity. + + :param log_prefix: log prefix used on logger. + :param author_field: field of the author. + :param contribution_type: name of the contribution type (if any) which the dates are computed for. + In case there is no specific contribution type, by default all contributions will be considered. + :param date_field: field used to find the min and max dates for the author's activity. + + :return: generator of authors with their min and max dates. + """ + after = None + + while True: + es_query = Enrich.authors_min_max_dates(date_field, + author_field=author_field, + contribution_type=contribution_type, + after=after) + r = self.requests.post(self.elastic.index_url + "/_search", + data=es_query, headers=HEADER_JSON, + verify=False) + try: + r.raise_for_status() + except requests.exceptions.HTTPError as ex: + logger.error("{} error getting authors min and max date. Aborted.".format(log_prefix)) + logger.error(ex) + return + + aggregations_author = r.json()['aggregations']['author'] + + # When there are no more elements, it will return an empty list of buckets + if not aggregations_author['buckets']: + return + + after = aggregations_author['after_key'][author_field] + + for author in aggregations_author['buckets']: + yield author + def check_version_conflicts(self, es_update, version_conflicts, log_prefix, max_retries=5): + """ Check if there are version conflicts within a query response and retries the request. 
@@ -1956,7 +1979,7 @@ def check_version_conflicts(self, es_update, version_conflicts, log_prefix, max_ self.check_version_conflicts(es_update, r.json()['version_conflicts'], log_prefix, max_retries=retries) @staticmethod - def authors_min_max_dates(date_field, author_field="author_uuid", contribution_type=None): + def authors_min_max_dates(date_field, author_field="author_uuid", contribution_type=None, after=None): """ Get the aggregation of author with their min and max activity dates @@ -1964,12 +1987,24 @@ def authors_min_max_dates(date_field, author_field="author_uuid", contribution_t :param author_field: field of the author :param contribution_type: name of the contribution type (if any) which the dates are computed for. In case there is no specific contribution type, by default all contributions will be considered. + :param after: value used for pagination :return: the query to be executed to get the authors min and max aggregation data """ - # Limit aggregations: https://github.com/elastic/elasticsearch/issues/18838 - # 30000 seems to be a sensible number of the number of people in git + # Limit aggregations: + # - OpenSearch: 10000 + # - https://opensearch.org/docs/latest/opensearch/bucket-agg/ + # - ElasticSearch: 10000 + # - https://discuss.elastic.co/t/increasing-max-buckets-for-specific-visualizations/187390/4 + # - When you try to fetch more than 10000 it will return this error message: + # { + # "type": "too_many_buckets_exception", + # "reason": "Trying to create too many buckets. Must be less than or equal to: [10000] but was [20000]. 
+ # This limit can be set by changing the [search.max_buckets] cluster level setting.", + # "max_buckets": 10000 + # } + query_type = "" if contribution_type: query_type = """"query": { @@ -1981,15 +2016,31 @@ def authors_min_max_dates(date_field, author_field="author_uuid", contribution_t } } },""" % contribution_type + + query_after = "" + if after: + query_after = """"after": { + "%s": "%s" + },""" % (author_field, after) + es_query = """ { "size": 0, %s "aggs": { "author": { - "terms": { - "field": "%s", - "size": 30000 + "composite": { + "sources": [ + { + "%s": { + "terms": { + "field": "%s" + } + } + } + ], + %s + "size": 10000 }, "aggs": { "min": { @@ -2006,7 +2057,7 @@ def authors_min_max_dates(date_field, author_field="author_uuid", contribution_t } } } - """ % (query_type, author_field, date_field, date_field) + """ % (query_type, author_field, author_field, query_after, date_field, date_field) return es_query diff --git a/releases/unreleased/fix-fetching-authors-with-min-and-max-dates.yml b/releases/unreleased/fix-fetching-authors-with-min-and-max-dates.yml new file mode 100644 index 000000000..1402cb42f --- /dev/null +++ b/releases/unreleased/fix-fetching-authors-with-min-and-max-dates.yml @@ -0,0 +1,8 @@ +--- +title: Fix fetching authors with min and max dates +category: fixed +author: Quan Zhou +issue: null +notes: > + The old query only returns 10000 items due to ElasticSearch and + OpenSearch now use `composite` aggregation to paginate all buckets. 
diff --git a/tests/data/author_min_max_dates_1.json b/tests/data/author_min_max_dates_1.json new file mode 100644 index 000000000..ebf29ac47 --- /dev/null +++ b/tests/data/author_min_max_dates_1.json @@ -0,0 +1,55 @@ +{ + "took" : 2, + "timed_out" : false, + "_shards" : { + "total" : 2, + "successful" : 2, + "skipped" : 0, + "failed" : 0 + }, + "hits" : { + "total" : { + "value" : 10000, + "relation" : "gte" + }, + "max_score" : null, + "hits" : [ ] + }, + "aggregations" : { + "author" : { + "after_key" : { + "author_uuid" : "007a56d0322c518859dde2a0c6ed9143fa141c61" + }, + "buckets" : [ + { + "key" : { + "author_uuid" : "00032fabbbf033467d7bd307df81b654c0fa53d8" + }, + "doc_count" : 1, + "min" : { + "value" : 1.623225379E12, + "value_as_string" : "2021-06-09T07:56:19.000Z" + }, + "max" : { + "value" : 1.623225379E12, + "value_as_string" : "2021-06-09T07:56:19.000Z" + } + }, + { + "key" : { + "author_uuid" : "007a56d0322c518859dde2a0c6ed9143fa141c61" + }, + "doc_count" : 1, + "min" : { + "value" : 1.626183289E12, + "value_as_string" : "2021-07-13T13:34:49.000Z" + }, + "max" : { + "value" : 1.626183289E12, + "value_as_string" : "2021-07-13T13:34:49.000Z" + } + } + ] + } + } +} diff --git a/tests/data/author_min_max_dates_2.json b/tests/data/author_min_max_dates_2.json new file mode 100644 index 000000000..a372b8630 --- /dev/null +++ b/tests/data/author_min_max_dates_2.json @@ -0,0 +1,55 @@ +{ + "took" : 4, + "timed_out" : false, + "_shards" : { + "total" : 4, + "successful" : 4, + "skipped" : 0, + "failed" : 0 + }, + "hits" : { + "total" : { + "value" : 10000, + "relation" : "gte" + }, + "max_score" : null, + "hits" : [ ] + }, + "aggregations" : { + "author" : { + "after_key" : { + "author_uuid" : "00d36515f739794b941586e5d0a102b5ff3a0cc2" + }, + "buckets" : [ + { + "key" : { + "author_uuid" : "00cc95a5950523a42c969f15c7c36c4530417f13" + }, + "doc_count" : 1, + "min" : { + "value" : 1.474160034E12, + "value_as_string" : "2016-09-18T00:53:54.000Z" + }, + "max" : { + 
"value" : 1.474160034E12, + "value_as_string" : "2016-09-18T00:53:54.000Z" + } + }, + { + "key" : { + "author_uuid" : "00d36515f739794b941586e5d0a102b5ff3a0cc2" + }, + "doc_count" : 1, + "min" : { + "value" : 1.526521972E12, + "value_as_string" : "2018-05-17T01:52:52.000Z" + }, + "max" : { + "value" : 1.526521972E12, + "value_as_string" : "2018-05-17T01:52:52.000Z" + } + } + ] + } + } +} diff --git a/tests/data/author_min_max_dates_empty.json b/tests/data/author_min_max_dates_empty.json new file mode 100644 index 000000000..a62a29702 --- /dev/null +++ b/tests/data/author_min_max_dates_empty.json @@ -0,0 +1,20 @@ +{ + "took" : 4, + "timed_out" : false, + "_shards" : { + "total" : 4, + "successful" : 4, + "skipped" : 0, + "failed" : 0 + }, + "hits" : { + "total" : 4, + "max_score" : 0.0, + "hits" : [ ] + }, + "aggregations" : { + "author" : { + "buckets" : [ ] + } + } +} diff --git a/tests/test_enrich.py b/tests/test_enrich.py index b9ff85a5d..65e001f8a 100644 --- a/tests/test_enrich.py +++ b/tests/test_enrich.py @@ -20,6 +20,9 @@ # import configparser +import json + +import httpretty import requests import sys import unittest @@ -39,10 +42,56 @@ CONFIG_FILE = 'tests.conf' +def setup_http_server(url, not_handle_status_code=False): + """Setup a mock HTTP server""" + + http_requests = [] + + body_content_1 = read_file("data/author_min_max_dates_1.json") + body_content_2 = read_file("data/author_min_max_dates_2.json") + body_content_empty = read_file("data/author_min_max_dates_empty.json") + + def request_callback(method, uri, headers): + + status_code = 200 + composite = method.parsed_body['aggs']['author']['composite'] + if "after" in composite: + if composite["after"]["author_uuid"] == "007a56d0322c518859dde2a0c6ed9143fa141c61": + body = body_content_2 + else: + body = body_content_empty + else: + body = body_content_1 + http_requests.append(httpretty.last_request()) + + return status_code, headers, body + + httpretty.register_uri(httpretty.POST, + url, + 
match_querystring=True, + responses=[ + httpretty.Response(body=request_callback) + ]) + + return http_requests + + +def read_file(filename): + with open(filename) as f: + return f.read() + + class TestEnrich(unittest.TestCase): def setUp(self): + config = configparser.ConfigParser() + config.read(CONFIG_FILE) + es_con = dict(config.items('ElasticSearch'))['url'] + git_enriched = "git_enriched" + enrich_backend = get_connectors()["git"][2]() + elastic_enrich = get_elastic(es_con, git_enriched, True, enrich_backend) self._enrich = Enrich() + self._enrich.elastic = elastic_enrich self.empty_item = { "author_id": "", @@ -704,6 +753,131 @@ def test_copy_raw_fields(self): self.assertEqual(eitem['uuid'], expected['uuid']) self.assertEqual(eitem['extra'], expected['extra']) + def test_authors_min_max_dates(self): + expected_es_query = """ + { + "size": 0, + "aggs": { + "author": { + "composite": { + "sources": [ + { + "author_uuid": { + "terms": { + "field": "author_uuid" + } + } + } + ], + "size": 10000 + }, + "aggs": { + "min": { + "min": { + "field": "grimoire_creation_date" + } + }, + "max": { + "max": { + "field": "grimoire_creation_date" + } + } + } + } + } + } + """ + es_query = self._enrich.authors_min_max_dates("grimoire_creation_date", + author_field="author_uuid", + contribution_type=None, + after=None) + self.assertDictEqual(json.loads(es_query), json.loads(expected_es_query)) + + expected_es_query = """ + { + "size": 0, + "aggs": { + "author": { + "composite": { + "sources": [ + { + "author_uuid": { + "terms": { + "field": "author_uuid" + } + } + } + ], + "after": { + "author_uuid": "uuid" + }, + "size": 10000 + }, + "aggs": { + "min": { + "min": { + "field": "grimoire_creation_date" + } + }, + "max": { + "max": { + "field": "grimoire_creation_date" + } + } + } + } + } + } + """ + es_query = self._enrich.authors_min_max_dates("grimoire_creation_date", + author_field="author_uuid", + contribution_type=None, + after="uuid") + 
self.assertDictEqual(json.loads(es_query), json.loads(expected_es_query)) + + @httpretty.activate + def test_fetch_authors_min_max_dates(self): + + es_search_url = "{}/_search".format(self._enrich.elastic.index_url) + _ = setup_http_server(es_search_url) + + log_prefix = "[git] Demography" + author_field = "author_uuid" + date_field = "grimoire_creation_date" + + expected = [ + { + 'key': {'author_uuid': '00032fabbbf033467d7bd307df81b654c0fa53d8'}, + 'doc_count': 1, + 'min': {'value': 1623225379000.0, 'value_as_string': '2021-06-09T07:56:19.000Z'}, + 'max': {'value': 1623225379000.0, 'value_as_string': '2021-06-09T07:56:19.000Z'} + }, + { + 'key': {'author_uuid': '007a56d0322c518859dde2a0c6ed9143fa141c61'}, + 'doc_count': 1, + 'min': {'value': 1626183289000.0, 'value_as_string': '2021-07-13T13:34:49.000Z'}, + 'max': {'value': 1626183289000.0, 'value_as_string': '2021-07-13T13:34:49.000Z'} + }, + { + 'key': {'author_uuid': '00cc95a5950523a42c969f15c7c36c4530417f13'}, + 'doc_count': 1, + 'min': {'value': 1474160034000.0, 'value_as_string': '2016-09-18T00:53:54.000Z'}, + 'max': {'value': 1474160034000.0, 'value_as_string': '2016-09-18T00:53:54.000Z'} + }, + { + 'key': {'author_uuid': '00d36515f739794b941586e5d0a102b5ff3a0cc2'}, + 'doc_count': 1, + 'min': {'value': 1526521972000.0, 'value_as_string': '2018-05-17T01:52:52.000Z'}, + 'max': {'value': 1526521972000.0, 'value_as_string': '2018-05-17T01:52:52.000Z'} + } + ] + authors_min_max_data = self._enrich.fetch_authors_min_max_dates(log_prefix, author_field, + None, date_field) + all_authors = [] + for author_key in authors_min_max_data: + all_authors.append(author_key) + self.assertListEqual(all_authors, expected) + if __name__ == '__main__': unittest.main()