Skip to content

Commit

Permalink
Feature/add retry to gcp auth (#28475)
Browse files Browse the repository at this point in the history
* Update 2.50 release notes to include new Kafka topicPattern feature

* Create groovy class for io performance tests
Create gradle task and github actions config for GCS using this.

* delete unnecessary class

* fix env call

* fix call to gradle

* run on hosted runner for testing

* add additional checkout

* add destination for triggered tests

* move env variables to correct location

* try uploading against separate dataset

* try without a user

* update branch checkout, try to view the failure log

* run on failure

* update to use correct BigQuery instance

* convert to matrix

* add result reporting

* add failure clause

* remove failure clause, update to run on self-hosted

* address comments, clean up build

* clarify branching

* Update auth to retry getting credentials from GCE

* Re-order imports

* Add test case

* Update exception log

* Add failure test

* Update removal of retrying method

* rework via mock

* Clear credentials cache for idempotent tests

* Remove handler after test
Change retry timeout to facilitate shorter retries for anonymous access cases

* Change retry timeout to facilitate shorter retries for anonymous access cases

* reset credentials before and after test
  • Loading branch information
johnjcasey authored Sep 27, 2023
1 parent 5c4bea2 commit 725a2d6
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 3 deletions.
14 changes: 11 additions & 3 deletions sdks/python/apache_beam/internal/gcp/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils import retry

# google.auth is only available when Beam is installed with the gcp extra.
try:
Expand Down Expand Up @@ -152,8 +153,7 @@ def _get_service_credentials(pipeline_options):

try:
# pylint: disable=c-extension-no-member
credentials, _ = google.auth.default(
scopes=pipeline_options.view_as(GoogleCloudOptions).gcp_oauth_scopes)
credentials = _Credentials._get_credentials_with_retrys(pipeline_options)
credentials = _Credentials._add_impersonation_credentials(
credentials, pipeline_options)
credentials = _ApitoolsCredentialsAdapter(credentials)
Expand All @@ -164,10 +164,18 @@ def _get_service_credentials(pipeline_options):
except Exception as e:
_LOGGER.warning(
'Unable to find default credentials to use: %s\n'
'Connecting anonymously.',
'Connecting anonymously. This is expected if no '
'credentials are needed to access GCP resources.',
e)
return None

@staticmethod
@retry.with_exponential_backoff(num_retries=4, initial_delay_secs=2)
def _get_credentials_with_retrys(pipeline_options):
  """Fetch the default Google credentials, retrying when the lookup fails.

  The lookup is wrapped in retry.with_exponential_backoff, configured with
  num_retries=4 and initial_delay_secs=2.

  Args:
    pipeline_options: Pipeline options whose GoogleCloudOptions view supplies
      the OAuth scopes (gcp_oauth_scopes) to request.

  Returns:
    The credentials object, i.e. the first element of the pair returned by
    google.auth.default. The second element is discarded.
  """
  oauth_scopes = pipeline_options.view_as(GoogleCloudOptions).gcp_oauth_scopes
  found_credentials, _ = google.auth.default(scopes=oauth_scopes)
  return found_credentials

@staticmethod
def _add_impersonation_credentials(credentials, pipeline_options):
gcs_options = pipeline_options.view_as(GoogleCloudOptions)
Expand Down
135 changes: 135 additions & 0 deletions sdks/python/apache_beam/internal/gcp/auth_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import unittest

import mock

from apache_beam.internal.gcp import auth
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions

try:
import google.auth as gauth
except ImportError:
gauth = None


class MockLoggingHandler(logging.Handler):
  """Mock logging handler to check for expected logs.

  Every emitted record is stored in ``self.messages``, a dict that maps a
  lower-cased level name to the list of formatted messages logged at that
  level, in emission order. The five standard levels ('debug', 'info',
  'warning', 'error', 'critical') are always present as keys.
  """

  _STANDARD_LEVELS = ('debug', 'info', 'warning', 'error', 'critical')

  def __init__(self, *args, **kwargs):
    # Create the message store first so it exists before the base class
    # initializer runs.
    self.reset()
    logging.Handler.__init__(self, *args, **kwargs)

  def emit(self, record):
    """Record the formatted message of ``record`` under its level name."""
    # setdefault keeps a record with a non-standard level name (for example a
    # custom or purely numeric level) from raising KeyError in the middle of
    # the logging call being tested.
    level_name = record.levelname.lower()
    self.messages.setdefault(level_name, []).append(record.getMessage())

  def reset(self):
    """Drop all recorded messages, leaving an empty list per standard level."""
    self.messages = {level: [] for level in self._STANDARD_LEVELS}


@unittest.skipIf(gauth is None, 'Google Auth dependencies are not installed')
class AuthTest(unittest.TestCase):
  """Tests for credential lookup with retries in auth.get_service_credentials."""

  @staticmethod
  def _reset_credentials_cache():
    """Clear the credentials cached on auth._Credentials, if any are set."""
    if auth._Credentials._credentials_init:
      auth._Credentials._credentials_init = False
      auth._Credentials._credentials = None

  @staticmethod
  def _make_pipeline_options():
    """Return pipeline options with service account impersonation disabled."""
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(
        GoogleCloudOptions).impersonate_service_account = False
    return pipeline_options

  def setUp(self):
    # _Credentials caches the actual credentials. Reset the cache before each
    # test, and register the same reset as a cleanup so that it also runs
    # when a test fails part-way through. This keeps the tests idempotent.
    self._reset_credentials_cache()
    self.addCleanup(self._reset_credentials_cache)

  @mock.patch('google.auth.default')
  def test_auth_with_retrys(self, mock_auth_default):
    """A lookup that fails once and then succeeds returns the credentials."""
    pipeline_options = self._make_pipeline_options()

    credentials = ('creds', 1)
    already_failed = []

    def side_effect(scopes=None):
      # Fail the first lookup only; every later call returns the credentials.
      if already_failed:
        return credentials
      already_failed.append(True)
      raise IOError('Failed')

    # mock.patch has already replaced google.auth.default with this mock and
    # restores the original when the test ends.
    mock_auth_default.side_effect = side_effect

    returned_credentials = auth.get_service_credentials(pipeline_options)

    self.assertEqual('creds', returned_credentials._google_auth_credentials)

  @mock.patch(
      'apache_beam.internal.gcp.auth._Credentials._get_credentials_with_retrys')
  def test_auth_with_retrys_always_fail(self, mock_get_credentials):
    """A lookup that always fails returns None and logs a warning."""
    pipeline_options = self._make_pipeline_options()

    logger_handler = MockLoggingHandler()
    auth._LOGGER.addHandler(logger_handler)
    # Registered as a cleanup so the handler is detached even when one of the
    # assertions below fails.
    self.addCleanup(auth._LOGGER.removeHandler, logger_handler)

    # The retrying method itself is mocked out, so the failure surfaces
    # immediately instead of after the retry backoff has been exhausted.
    mock_get_credentials.side_effect = IOError('Failed')

    returned_credentials = auth.get_service_credentials(pipeline_options)

    self.assertIsNone(returned_credentials)
    self.assertEqual([
        'Unable to find default credentials to use: Failed\n'
        'Connecting anonymously. This is expected if no credentials are '
        'needed to access GCP resources.'
    ],
                     logger_handler.messages.get('warning'))


# Run the tests in this module when the file is executed directly.
if __name__ == '__main__':
  unittest.main()

0 comments on commit 725a2d6

Please sign in to comment.