From 624df6389280869244878888e449afbfb94ef754 Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware Date: Mon, 15 Jul 2024 15:58:40 +0530 Subject: [PATCH 1/2] fix(bigquery/ingestor): handle quota exceeded for project.list requests --- .../source/bigquery_v2/bigquery_schema.py | 58 ++++++++++++------- .../tests/unit/test_bigquery_source.py | 33 +++++++---- 2 files changed, 60 insertions(+), 31 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 7bb9becfc9a0d0..2e5722ad27e62d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -155,7 +155,7 @@ def get_query_result(self, query: str) -> RowIterator: resp = self.bq_client.query(query) return resp.result() - def get_projects(self) -> List[BigqueryProject]: + def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]: def _should_retry(exc: BaseException) -> bool: logger.debug( f"Exception occured for project.list api. Reason: {exc}. Retrying api request..." @@ -163,26 +163,44 @@ def _should_retry(exc: BaseException) -> bool: self.report.num_list_projects_retry_request += 1 return True + page_token = None + projects: List[BigqueryProject] = [] with self.report.list_projects: - self.report.num_list_projects_api_requests += 1 - # Bigquery API has limit in calling project.list request i.e. 2 request per second. - # https://cloud.google.com/bigquery/quotas#api_request_quotas - # Whenever this limit reached an exception occur with msg - # 'Quota exceeded: Your user exceeded quota for concurrent project.lists requests.' - # Hence, added the api request retry of 15 min. - # We already tried adding rate_limit externally, proving max_result and page_size - # to restrict the request calls inside list_project but issue still occured. - projects_iterator = self.bq_client.list_projects( - retry=retry.Retry( - predicate=_should_retry, initial=10, maximum=180, timeout=900 - ) - ) - projects: List[BigqueryProject] = [ - BigqueryProject(id=p.project_id, name=p.friendly_name) - for p in projects_iterator - ] - self.report.num_listed_projects = len(projects) - return projects + while True: + try: + self.report.num_list_projects_api_requests += 1 + # Bigquery API has limit in calling project.list request i.e. 2 request per second. + # https://cloud.google.com/bigquery/quotas#api_request_quotas + # Whenever this limit reached an exception occur with msg + # 'Quota exceeded: Your user exceeded quota for concurrent project.lists requests.' + # Hence, added the api request retry of 15 min. + # We already tried adding rate_limit externally, proving max_result and page_size + # to restrict the request calls inside list_project but issue still occured. + projects_iterator = self.bq_client.list_projects( + max_results=max_results_per_page, + page_token=page_token, + timeout=900, + retry=retry.Retry( + predicate=_should_retry, + initial=10, + maximum=180, + multiplier=4, + timeout=900, + ), + ) + _projects: List[BigqueryProject] = [ + BigqueryProject(id=p.project_id, name=p.friendly_name) + for p in projects_iterator + ] + projects.extend(_projects) + self.report.num_list_projects = len(projects) + page_token = projects_iterator.next_page_token + if not page_token: + break + except Exception as e: + logger.error(f"Error getting projects. {e}", exc_info=True) + return [] + return projects def get_datasets_for_project_id( self, project_id: str, maxResults: Optional[int] = None diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index ea32db0ef27574..cadd1204dfdd93 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -278,24 +278,35 @@ def test_get_projects_with_single_project_id(get_bq_client_mock): def test_get_projects_by_list(get_bq_client_mock): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock - client_mock.list_projects.return_value = [ - SimpleNamespace( - project_id="test-1", - friendly_name="one", - ), - SimpleNamespace( - project_id="test-2", - friendly_name="two", - ), - ] + + first_page = MagicMock() + first_page.__iter__.return_value = iter( + [ + SimpleNamespace(project_id="test-1", friendly_name="one"), + SimpleNamespace(project_id="test-2", friendly_name="two"), + ] + ) + first_page.next_page_token = "token1" + + second_page = MagicMock() + second_page.__iter__.return_value = iter( + [ + SimpleNamespace(project_id="test-3", friendly_name="three"), + SimpleNamespace(project_id="test-4", friendly_name="four"), + ] + ) + second_page.next_page_token = None + client_mock.list_projects.side_effect = [first_page, second_page] config = BigQueryV2Config.parse_obj({}) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1")) assert source._get_projects() == [ BigqueryProject("test-1", "one"), BigqueryProject("test-2", "two"), + BigqueryProject("test-3", "three"), + BigqueryProject("test-4", "four"), ] - assert client_mock.list_projects.call_count == 1 + assert client_mock.list_projects.call_count == 2 @patch.object(BigQuerySchemaApi, "get_projects") From 1ad90bbb35c0c84a7d429e9b1a185dcc1e5dad43 Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware Date: Wed, 17 Jul 2024 15:54:12 +0530 Subject: [PATCH 2/2] fix: Resolve Conflict --- .../src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py | 2 +- metadata-ingestion/tests/unit/test_bigquery_source.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 2e5722ad27e62d..d73ac46c862ea1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -193,7 +193,7 @@ def _should_retry(exc: BaseException) -> bool: for p in projects_iterator ] projects.extend(_projects) - self.report.num_list_projects = len(projects) + self.report.num_listed_projects = len(projects) page_token = projects_iterator.next_page_token if not page_token: break diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index cadd1204dfdd93..746cf9b0acfc3e 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -358,7 +358,7 @@ def test_get_projects_list_failure( caplog.clear() with caplog.at_level(logging.ERROR): projects = source._get_projects() - assert len(caplog.records) == 1 + assert len(caplog.records) == 2 assert error_str in caplog.records[0].msg assert len(source.report.failures) == 1 assert projects == []