Skip to content

Commit

Permalink
fix(ingest/bigquery): handle quota exceeded for project.list requests (
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware authored Jul 17, 2024
1 parent 11c5c3e commit ec788df
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,34 +155,52 @@ def get_query_result(self, query: str) -> RowIterator:
resp = self.bq_client.query(query)
return resp.result()

def get_projects(self) -> List[BigqueryProject]:
def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]:
def _should_retry(exc: BaseException) -> bool:
logger.debug(
f"Exception occured for project.list api. Reason: {exc}. Retrying api request..."
)
self.report.num_list_projects_retry_request += 1
return True

page_token = None
projects: List[BigqueryProject] = []
with self.report.list_projects:
self.report.num_list_projects_api_requests += 1
# The BigQuery API limits project.list calls to 2 requests per second.
# https://cloud.google.com/bigquery/quotas#api_request_quotas
# When this limit is reached, an exception is raised with the message
# 'Quota exceeded: Your user exceeded quota for concurrent project.lists requests.'
# Hence, the API request is retried for up to 15 minutes (timeout=900).
# We already tried rate limiting externally and providing max_results and page_size
# to restrict the request calls inside list_projects, but the issue still occurred.
projects_iterator = self.bq_client.list_projects(
retry=retry.Retry(
predicate=_should_retry, initial=10, maximum=180, timeout=900
)
)
projects: List[BigqueryProject] = [
BigqueryProject(id=p.project_id, name=p.friendly_name)
for p in projects_iterator
]
self.report.num_listed_projects = len(projects)
return projects
while True:
try:
self.report.num_list_projects_api_requests += 1
# The BigQuery API limits project.list calls to 2 requests per second.
# https://cloud.google.com/bigquery/quotas#api_request_quotas
# When this limit is reached, an exception is raised with the message
# 'Quota exceeded: Your user exceeded quota for concurrent project.lists requests.'
# Hence, the API request is retried for up to 15 minutes (timeout=900).
# We already tried rate limiting externally and providing max_results and page_size
# to restrict the request calls inside list_projects, but the issue still occurred.
projects_iterator = self.bq_client.list_projects(
max_results=max_results_per_page,
page_token=page_token,
timeout=900,
retry=retry.Retry(
predicate=_should_retry,
initial=10,
maximum=180,
multiplier=4,
timeout=900,
),
)
_projects: List[BigqueryProject] = [
BigqueryProject(id=p.project_id, name=p.friendly_name)
for p in projects_iterator
]
projects.extend(_projects)
self.report.num_listed_projects = len(projects)
page_token = projects_iterator.next_page_token
if not page_token:
break
except Exception as e:
logger.error(f"Error getting projects. {e}", exc_info=True)
return []
return projects

def get_datasets_for_project_id(
self, project_id: str, maxResults: Optional[int] = None
Expand Down
35 changes: 23 additions & 12 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,24 +278,35 @@ def test_get_projects_with_single_project_id(get_bq_client_mock):
def test_get_projects_by_list(get_bq_client_mock):
client_mock = MagicMock()
get_bq_client_mock.return_value = client_mock
client_mock.list_projects.return_value = [
SimpleNamespace(
project_id="test-1",
friendly_name="one",
),
SimpleNamespace(
project_id="test-2",
friendly_name="two",
),
]

first_page = MagicMock()
first_page.__iter__.return_value = iter(
[
SimpleNamespace(project_id="test-1", friendly_name="one"),
SimpleNamespace(project_id="test-2", friendly_name="two"),
]
)
first_page.next_page_token = "token1"

second_page = MagicMock()
second_page.__iter__.return_value = iter(
[
SimpleNamespace(project_id="test-3", friendly_name="three"),
SimpleNamespace(project_id="test-4", friendly_name="four"),
]
)
second_page.next_page_token = None
client_mock.list_projects.side_effect = [first_page, second_page]

config = BigQueryV2Config.parse_obj({})
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1"))
assert source._get_projects() == [
BigqueryProject("test-1", "one"),
BigqueryProject("test-2", "two"),
BigqueryProject("test-3", "three"),
BigqueryProject("test-4", "four"),
]
assert client_mock.list_projects.call_count == 1
assert client_mock.list_projects.call_count == 2


@patch.object(BigQuerySchemaApi, "get_projects")
Expand Down Expand Up @@ -347,7 +358,7 @@ def test_get_projects_list_failure(
caplog.clear()
with caplog.at_level(logging.ERROR):
projects = source._get_projects()
assert len(caplog.records) == 1
assert len(caplog.records) == 2
assert error_str in caplog.records[0].msg
assert len(source.report.failures) == 1
assert projects == []
Expand Down

0 comments on commit ec788df

Please sign in to comment.