Feature/add retry to gcp auth #28475

Merged
merged 51 commits into from
Sep 27, 2023
51 commits
c22b8c1
Update 2.50 release notes to include new Kafka topicPattern feature
johnjcasey Jul 5, 2023
f7cf5de
Merge remote-tracking branch 'origin/master'
johnjcasey Jul 24, 2023
ab68ecb
Merge remote-tracking branch 'origin/master'
johnjcasey Aug 9, 2023
40ad1d5
Merge remote-tracking branch 'origin/master'
johnjcasey Aug 31, 2023
6c9c28d
Create groovy class for io performance tests
johnjcasey Aug 31, 2023
520c9d1
delete unnecessary class
johnjcasey Aug 31, 2023
062de23
fix env call
johnjcasey Aug 31, 2023
01fc25b
Merge pull request #181 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
9c9f86b
fix call to gradle
johnjcasey Aug 31, 2023
92306fa
Merge pull request #182 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
925ce55
run on hosted runner for testing
johnjcasey Aug 31, 2023
2dcfb70
Merge pull request #183 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
117ef8b
add additional checkout
johnjcasey Aug 31, 2023
1f73cda
Merge pull request #184 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
cb6e01b
add destination for triggered tests
johnjcasey Aug 31, 2023
a9e86aa
Merge pull request #185 from johnjcasey/feature/automate-performance-…
johnjcasey Aug 31, 2023
8ea6c51
move env variables to correct location
johnjcasey Sep 1, 2023
d8822d7
Merge pull request #186 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 1, 2023
320a4cc
try uploading against separate dataset
johnjcasey Sep 1, 2023
e89b59e
Merge pull request #187 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 1, 2023
1cd4e55
try without a user
johnjcasey Sep 1, 2023
4473f17
Merge pull request #188 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 1, 2023
4fc5b8e
update branch checkout, try to view the failure log
johnjcasey Sep 5, 2023
706650d
Merge pull request #189 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
59069f2
run on failure
johnjcasey Sep 5, 2023
7f79b62
Merge pull request #190 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
6f51976
update to use correct BigQuery instance
johnjcasey Sep 5, 2023
e95d920
Merge pull request #191 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
df716cb
convert to matrix
johnjcasey Sep 5, 2023
4d8eded
Merge pull request #192 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
4bf0826
add result reporting
johnjcasey Sep 5, 2023
403f054
Merge pull request #193 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
d40d04b
add failure clause
johnjcasey Sep 5, 2023
aca4b2e
Merge pull request #194 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 5, 2023
2739e92
remove failure clause, update to run on self-hosted
johnjcasey Sep 5, 2023
bd6efeb
address comments, clean up build
johnjcasey Sep 6, 2023
226a655
clarify branching
johnjcasey Sep 6, 2023
9c7286b
Merge pull request #195 from johnjcasey/feature/automate-performance-…
johnjcasey Sep 6, 2023
c63f112
Update auth to retry getting credentials from GCE
johnjcasey Sep 7, 2023
5b1b2c2
Merge branch 'apache:master' into master
johnjcasey Sep 15, 2023
864ce98
Merge remote-tracking branch 'origin/master' into feature/add-retry-t…
johnjcasey Sep 15, 2023
1c51395
Re-order imports
johnjcasey Sep 15, 2023
1bacada
Add test case
johnjcasey Sep 15, 2023
8eca7d0
Update exception log
johnjcasey Sep 19, 2023
59310a2
Add failure test
johnjcasey Sep 19, 2023
0da1faa
Update removal of retrying method
johnjcasey Sep 20, 2023
e7e66f2
rework via mock
johnjcasey Sep 20, 2023
a54cd14
Clear credentials cache for idempotent tests
johnjcasey Sep 25, 2023
aceae65
Remove handler after test
johnjcasey Sep 26, 2023
9bd54a7
Change retry timeout to facilitate shorter retrys for anonymous acces…
johnjcasey Sep 26, 2023
e4a9746
reset credentials before and after test
johnjcasey Sep 26, 2023
14 changes: 11 additions & 3 deletions sdks/python/apache_beam/internal/gcp/auth.py
@@ -26,6 +26,7 @@

 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.utils import retry

 # google.auth is only available when Beam is installed with the gcp extra.
 try:
@@ -149,8 +150,7 @@ def _get_service_credentials(pipeline_options):

     try:
       # pylint: disable=c-extension-no-member
-      credentials, _ = google.auth.default(
-          scopes=pipeline_options.view_as(GoogleCloudOptions).gcp_oauth_scopes)
+      credentials = _Credentials._get_credentials_with_retrys(pipeline_options)
       credentials = _Credentials._add_impersonation_credentials(
           credentials, pipeline_options)
       credentials = _ApitoolsCredentialsAdapter(credentials)
@@ -161,10 +161,18 @@ def _get_service_credentials(pipeline_options):
     except Exception as e:
       _LOGGER.warning(
           'Unable to find default credentials to use: %s\n'
-          'Connecting anonymously.',
+          'Connecting anonymously. This is expected if no '
+          'credentials are needed to access GCP resources.',
           e)
       return None

+  @staticmethod
+  @retry.with_exponential_backoff(num_retries=4, initial_delay_secs=2)
+  def _get_credentials_with_retrys(pipeline_options):
+    credentials, _ = google.auth.default(
+        scopes=pipeline_options.view_as(GoogleCloudOptions).gcp_oauth_scopes)
+    return credentials
+
   @staticmethod
   def _add_impersonation_credentials(credentials, pipeline_options):
     gcs_options = pipeline_options.view_as(GoogleCloudOptions)
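
For readers unfamiliar with the retry pattern used in the auth.py change, here is a minimal sketch of an exponential-backoff decorator. It is a hypothetical stand-in, not Beam's `retry.with_exponential_backoff`: the `factor` and `sleep` parameters are assumptions made for illustration, and jitter is left out. With `num_retries=4` and `initial_delay_secs=2`, and assuming the delay doubles each time, the nominal waits are 2, 4, 8 and 16 seconds, so a call that never succeeds gives up after roughly 30 seconds of sleeping.

```python
import functools
import time


def with_exponential_backoff(
    num_retries=4, initial_delay_secs=2.0, factor=2.0, sleep=time.sleep):
  """Hypothetical stand-in decorator, not Beam's retry implementation.

  Retries the wrapped callable on any Exception. The delay starts at
  initial_delay_secs and is multiplied by factor after each failed
  attempt. No jitter is applied.
  """
  def decorator(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
      delay = initial_delay_secs
      for attempt in range(num_retries + 1):
        try:
          return fn(*args, **kwargs)
        except Exception:
          if attempt == num_retries:
            raise  # Retries exhausted; let the caller handle the error.
          sleep(delay)
          delay *= factor

    return wrapper

  return decorator


# Record the requested delays instead of actually sleeping.
delays = []
calls = []


@with_exponential_backoff(sleep=delays.append)
def always_fails():
  calls.append(1)
  raise IOError('Failed')


try:
  always_fails()
except IOError:
  pass  # The final failure propagates once all retries are used up.

print(len(calls), delays, sum(delays))
```

Injecting `sleep` keeps the example fast. In the PR itself, the final exception is caught by the surrounding `except Exception` in `_get_service_credentials`, which logs the warning and returns `None`.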
135 changes: 135 additions & 0 deletions sdks/python/apache_beam/internal/gcp/auth_test.py
@@ -0,0 +1,135 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import unittest

import mock

from apache_beam.internal.gcp import auth
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions

try:
  import google.auth as gauth
except ImportError:
  gauth = None


class MockLoggingHandler(logging.Handler):
  """Mock logging handler to check for expected logs."""
  def __init__(self, *args, **kwargs):
    self.reset()
    logging.Handler.__init__(self, *args, **kwargs)

  def emit(self, record):
    self.messages[record.levelname.lower()].append(record.getMessage())

  def reset(self):
    self.messages = {
        'debug': [],
        'info': [],
        'warning': [],
        'error': [],
        'critical': [],
    }


@unittest.skipIf(gauth is None, 'Google Auth dependencies are not installed')
class AuthTest(unittest.TestCase):
  @mock.patch('google.auth.default')
  def test_auth_with_retrys(self, unused_mock_arg):
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(
        GoogleCloudOptions).impersonate_service_account = False

    credentials = ('creds', 1)

    self.is_called = False

    def side_effect(scopes=None):
      if self.is_called:
        return credentials
      else:
        self.is_called = True
        raise IOError('Failed')

    google_auth_mock = mock.MagicMock()
    gauth.default = google_auth_mock
    google_auth_mock.side_effect = side_effect

    # _Credentials caches the actual credentials.
    # This resets it for idempotent tests.
    if auth._Credentials._credentials_init:
      auth._Credentials._credentials_init = False
      auth._Credentials._credentials = None

    returned_credentials = auth.get_service_credentials(pipeline_options)

    # _Credentials caches the actual credentials.
    # This resets it for idempotent tests.
    if auth._Credentials._credentials_init:
      auth._Credentials._credentials_init = False
      auth._Credentials._credentials = None

    self.assertEqual('creds', returned_credentials._google_auth_credentials)

  @mock.patch(
      'apache_beam.internal.gcp.auth._Credentials._get_credentials_with_retrys')
  def test_auth_with_retrys_always_fail(self, unused_mock_arg):
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(
        GoogleCloudOptions).impersonate_service_account = False

    loggerHandler = MockLoggingHandler()

    auth._LOGGER.addHandler(loggerHandler)

    #Remove call to retrying method, as otherwise test takes ~10 minutes to run
    def raise_(scopes=None):
      raise IOError('Failed')

    retry_auth_mock = mock.MagicMock()
    auth._Credentials._get_credentials_with_retrys = retry_auth_mock
    retry_auth_mock.side_effect = raise_

    # _Credentials caches the actual credentials.
    # This resets it for idempotent tests.
    if auth._Credentials._credentials_init:
      auth._Credentials._credentials_init = False
      auth._Credentials._credentials = None

    returned_credentials = auth.get_service_credentials(pipeline_options)

    self.assertEqual(None, returned_credentials)
    self.assertEqual([
        'Unable to find default credentials to use: Failed\n'
        'Connecting anonymously. This is expected if no credentials are '
        'needed to access GCP resources.'
    ],
                     loggerHandler.messages.get('warning'))

    # _Credentials caches the actual credentials.
    # This resets it for idempotent tests.
    if auth._Credentials._credentials_init:
      auth._Credentials._credentials_init = False
      auth._Credentials._credentials = None

    auth._LOGGER.removeHandler(loggerHandler)


if __name__ == '__main__':
  unittest.main()
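
The two tests above exercise a fail-then-succeed path and an always-fail path. As a possible alternative shape, the sketch below shows the same two scenarios using only the standard library: a list-valued `side_effect` replaces the hand-written `is_called` flag, and `assertLogs` replaces the custom logging handler. `CredentialSource`, `get_credentials` and the `auth_sketch` logger are hypothetical names invented for this example; none of them exist in Beam, and the retry loop is deliberately simplified.

```python
import logging
import unittest
from unittest import mock

_LOGGER = logging.getLogger('auth_sketch')


class CredentialSource:
  """Hypothetical stand-in for the code under test (not a Beam class)."""
  @staticmethod
  def load():
    raise NotImplementedError  # Replaced by a mock in the tests below.


def get_credentials(attempts=2):
  """Tries load() up to `attempts` times, then falls back to anonymous."""
  last_error = None
  for _ in range(attempts):
    try:
      credentials, _project = CredentialSource.load()
      return credentials
    except IOError as e:
      last_error = e
  _LOGGER.warning('Unable to find default credentials to use: %s', last_error)
  return None


class SketchTest(unittest.TestCase):
  def test_fail_then_succeed(self):
    # A list side_effect yields one item per call; exceptions are raised.
    with mock.patch.object(
        CredentialSource,
        'load',
        side_effect=[IOError('Failed'), ('creds', 1)]) as load:
      self.assertEqual('creds', get_credentials())
      self.assertEqual(2, load.call_count)

  def test_always_fail_logs_warning(self):
    with mock.patch.object(
        CredentialSource, 'load', side_effect=IOError('Failed')):
      with self.assertLogs(_LOGGER, level='WARNING') as captured:
        self.assertIsNone(get_credentials())
    self.assertEqual(1, len(captured.records))
    self.assertIn('Failed', captured.records[0].getMessage())
```

Because `mock.patch.object` is used as a context manager, the original attribute is restored on exit, and `assertLogs` removes its handler automatically, so no manual cleanup is needed in this sketch.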