Support BigQuery OAuth using a refresh token and client secrets #2805

Merged 4 commits on Oct 9, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
 - Save manifest at the same time we save the run_results at the end of a run ([#2765](https://github.com/fishtown-analytics/dbt/issues/2765), [#2799](https://github.com/fishtown-analytics/dbt/pull/2799))
 - Added dbt_invocation_id for each BigQuery job to enable performance analysis ([#2808](https://github.com/fishtown-analytics/dbt/issues/2808), [#2809](https://github.com/fishtown-analytics/dbt/pull/2809))
 - Save cli and rpc arguments in run_results.json ([#2510](https://github.com/fishtown-analytics/dbt/issues/2510), [#2813](https://github.com/fishtown-analytics/dbt/pull/2813))
+- Added support for BigQuery connections using refresh tokens ([#2344](https://github.com/fishtown-analytics/dbt/issues/2344), [#2805](https://github.com/fishtown-analytics/dbt/pull/2805))

 ### Under the hood
 - Added strategy-specific validation to improve the relevancy of compilation errors for the `timestamp` and `check` snapshot strategies. (([#2787](https://github.com/fishtown-analytics/dbt/issues/2787), [#2791](https://github.com/fishtown-analytics/dbt/pull/2791))
37 changes: 31 additions & 6 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
@@ -9,7 +9,10 @@
 import google.cloud.exceptions
 from google.api_core import retry, client_info
 from google.auth import impersonated_credentials
-from google.oauth2 import service_account
+from google.oauth2 import (
+    credentials as GoogleCredentials,
+    service_account as GoogleServiceAccountCredentials
+)

 from dbt.utils import format_bytes, format_rows_number
 from dbt.clients import agate_helper, gcloud
@@ -51,19 +54,30 @@ class BigQueryConnectionMethod(StrEnum):
     OAUTH = 'oauth'
     SERVICE_ACCOUNT = 'service-account'
     SERVICE_ACCOUNT_JSON = 'service-account-json'
+    OAUTH_SECRETS = 'oauth-secrets'
Contributor

Were you going to rename this to refresh-token?

Contributor Author

I decided not to! I actually updated this PR to also accept an access token directly via the `token` configuration, so it's really a grab bag of OAuth methods. Google lets you supply:

  • a client id / client secret / refresh token / token uri OR
  • a standalone access token

The access token is going to expire after ~60 mins (not well defined), but it was a one-line change and feels like a reasonable enough use case to support. You buy it?

Contributor

Gotcha. Does that mean that this resolves #2802 as well?



 @dataclass
 class BigQueryCredentials(Credentials):
     method: BigQueryConnectionMethod
-    keyfile: Optional[str] = None
-    keyfile_json: Optional[Dict[str, Any]] = None
     timeout_seconds: Optional[int] = 300
     location: Optional[str] = None
     priority: Optional[Priority] = None
     retries: Optional[int] = 1
     maximum_bytes_billed: Optional[int] = None
     impersonate_service_account: Optional[str] = None
+
+    # Keyfile json creds
+    keyfile: Optional[str] = None
+    keyfile_json: Optional[Dict[str, Any]] = None
+
+    # oauth-secrets
+    token: Optional[str] = None
+    refresh_token: Optional[str] = None
+    client_id: Optional[str] = None
+    client_secret: Optional[str] = None
+    token_uri: Optional[str] = None
+
     _ALIASES = {
         'project': 'database',
         'dataset': 'schema',
@@ -111,12 +125,13 @@ def exception_handler(self, sql):
             message = "Access denied while running query"
             self.handle_error(e, message)

-        except google.auth.exceptions.RefreshError:
+        except google.auth.exceptions.RefreshError as e:
             message = "Unable to generate access token, if you're using " \
                       "impersonate_service_account, make sure your " \
                       'initial account has the "roles/' \
                       'iam.serviceAccountTokenCreator" role on the ' \
-                      'account you are trying to impersonate.'
+                      'account you are trying to impersonate.\n\n' \
+                      f'{str(e)}'
             raise RuntimeException(message)

         except Exception as e:
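
The change to the `RefreshError` handler above appends the underlying error text to the fixed impersonation hint. A minimal sketch of how the resulting message is assembled follows. The `RefreshError` class here is a hypothetical stand-in for `google.auth.exceptions.RefreshError` so the snippet runs without the Google libraries, and `build_refresh_error_message` is an illustrative helper name that does not appear in the PR.

```python
class RefreshError(Exception):
    """Hypothetical stand-in for google.auth.exceptions.RefreshError."""


def build_refresh_error_message(exc: Exception) -> str:
    # Same literal pieces as the handler in the diff: adjacent string
    # literals (including the trailing f-string) are concatenated into
    # a single message, with the original error text after a blank line.
    message = "Unable to generate access token, if you're using " \
              "impersonate_service_account, make sure your " \
              'initial account has the "roles/' \
              'iam.serviceAccountTokenCreator" role on the ' \
              'account you are trying to impersonate.\n\n' \
              f'{str(exc)}'
    return message


if __name__ == '__main__':
    # The error text below is made up for illustration.
    print(build_refresh_error_message(RefreshError('example refresh failure')))
```

With the new `oauth-secrets` method a refresh failure may have nothing to do with impersonation, so keeping the original error text in the message is presumably what makes such failures diagnosable.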
@@ -152,7 +167,7 @@ def commit(self):
     @classmethod
     def get_bigquery_credentials(cls, profile_credentials):
         method = profile_credentials.method
-        creds = service_account.Credentials
+        creds = GoogleServiceAccountCredentials.Credentials

         if method == BigQueryConnectionMethod.OAUTH:
             credentials, project_id = google.auth.default(scopes=cls.SCOPE)
@@ -166,6 +181,16 @@ def get_bigquery_credentials(cls, profile_credentials):
             details = profile_credentials.keyfile_json
             return creds.from_service_account_info(details, scopes=cls.SCOPE)

+        elif method == BigQueryConnectionMethod.OAUTH_SECRETS:
+            return GoogleCredentials.Credentials(
+                token=profile_credentials.token,
+                refresh_token=profile_credentials.refresh_token,
+                client_id=profile_credentials.client_id,
+                client_secret=profile_credentials.client_secret,
+                token_uri=profile_credentials.token_uri,
+                scopes=cls.SCOPE
+            )
+
         error = ('Invalid `method` in profile: "{}"'.format(method))
         raise FailedToConnectException(error)

59 changes: 59 additions & 0 deletions test/unit/test_bigquery_adapter.py
@@ -71,6 +71,31 @@ def setUp(self):
                     'threads': 1,
                     'impersonate_service_account': '[email protected]'
                 },
+                'oauth-credentials-token': {
+                    'type': 'bigquery',
+                    'method': 'oauth-secrets',
+                    'token': 'abc',
+                    'project': 'dbt-unit-000000',
+                    'schema': 'dummy_schema',
+                    'threads': 1,
+                    'location': 'Luna Station',
+                    'priority': 'batch',
+                    'maximum_bytes_billed': 0,
+                },
+                'oauth-credentials': {
+                    'type': 'bigquery',
+                    'method': 'oauth-secrets',
+                    'client_id': 'abc',
+                    'client_secret': 'def',
+                    'refresh_token': 'ghi',
+                    'token_uri': 'jkl',
+                    'project': 'dbt-unit-000000',
+                    'schema': 'dummy_schema',
+                    'threads': 1,
+                    'location': 'Luna Station',
+                    'priority': 'batch',
+                    'maximum_bytes_billed': 0,
+                },
             },
             'target': 'oauth',
         }
@@ -145,6 +170,40 @@ def test_acquire_connection_service_account_validations(self, mock_open_connecti
         connection.handle
         mock_open_connection.assert_called_once()

+    @patch('dbt.adapters.bigquery.BigQueryConnectionManager.open', return_value=_bq_conn())
+    def test_acquire_connection_oauth_token_validations(self, mock_open_connection):
+        adapter = self.get_adapter('oauth-credentials-token')
+        try:
+            connection = adapter.acquire_connection('dummy')
+            self.assertEqual(connection.type, 'bigquery')
+
+        except dbt.exceptions.ValidationException as e:
+            self.fail('got ValidationException: {}'.format(str(e)))
+
+        except BaseException as e:
+            raise
+
+        mock_open_connection.assert_not_called()
+        connection.handle
+        mock_open_connection.assert_called_once()
+
+    @patch('dbt.adapters.bigquery.BigQueryConnectionManager.open', return_value=_bq_conn())
+    def test_acquire_connection_oauth_credentials_validations(self, mock_open_connection):
+        adapter = self.get_adapter('oauth-credentials')
+        try:
+            connection = adapter.acquire_connection('dummy')
+            self.assertEqual(connection.type, 'bigquery')
+
+        except dbt.exceptions.ValidationException as e:
+            self.fail('got ValidationException: {}'.format(str(e)))
+
+        except BaseException as e:
+            raise
+
+        mock_open_connection.assert_not_called()
+        connection.handle
+        mock_open_connection.assert_called_once()
+
     @patch('dbt.adapters.bigquery.BigQueryConnectionManager.open', return_value=_bq_conn())
     def test_acquire_connection_impersonated_service_account_validations(self, mock_open_connection):
         adapter = self.get_adapter('impersonate')