Support BigQuery OAuth using a refresh token and client secrets #2805

Merged
merged 4 commits into from
Oct 9, 2020
Changes from 1 commit
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
 - Added schema and dbt versions to JSON artifacts ([#2670](https://github.com/fishtown-analytics/dbt/issues/2670), [#2767](https://github.com/fishtown-analytics/dbt/pull/2767))
 - Added ability to snapshot hard-deleted records (opt-in with `invalidate_hard_deletes` config option). ([#249](https://github.com/fishtown-analytics/dbt/issues/249), [#2749](https://github.com/fishtown-analytics/dbt/pull/2749))
 - Improved error messages for YAML selectors ([#2700](https://github.com/fishtown-analytics/dbt/issues/2700), [#2781](https://github.com/fishtown-analytics/dbt/pull/2781))
+- Added support for BigQuery connections using refresh tokens ([#2344](https://github.com/fishtown-analytics/dbt/issues/2344), [#2805](https://github.com/fishtown-analytics/dbt/pull/2805))

 ### Under the hood
 - Added strategy-specific validation to improve the relevancy of compilation errors for the `timestamp` and `check` snapshot strategies. (([#2787](https://github.com/fishtown-analytics/dbt/issues/2787), [#2791](https://github.com/fishtown-analytics/dbt/pull/2791))
36 changes: 30 additions & 6 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
@@ -9,7 +9,10 @@
 import google.cloud.exceptions
 from google.api_core import retry, client_info
 from google.auth import impersonated_credentials
-from google.oauth2 import service_account
+from google.oauth2 import (
+    credentials as GoogleCredentials,
+    service_account as GoogleServiceAccountCredentials
+)

 from dbt.utils import format_bytes, format_rows_number
 from dbt.clients import agate_helper, gcloud
@@ -50,19 +53,29 @@ class BigQueryConnectionMethod(StrEnum):
     OAUTH = 'oauth'
     SERVICE_ACCOUNT = 'service-account'
     SERVICE_ACCOUNT_JSON = 'service-account-json'
+    BEARER = 'bearer'


 @dataclass
 class BigQueryCredentials(Credentials):
     method: BigQueryConnectionMethod
-    keyfile: Optional[str] = None
-    keyfile_json: Optional[Dict[str, Any]] = None
     timeout_seconds: Optional[int] = 300
     location: Optional[str] = None
     priority: Optional[Priority] = None
     retries: Optional[int] = 1
     maximum_bytes_billed: Optional[int] = None
     impersonate_service_account: Optional[str] = None
+
+    # Keyfile json creds
+    keyfile: Optional[str] = None
+    keyfile_json: Optional[Dict[str, Any]] = None
+
+    # Bearer token creds
+    refresh_token: Optional[str] = None
+    client_id: Optional[str] = None
+    client_secret: Optional[str] = None
+    token_uri: Optional[str] = None
+
     _ALIASES = {
         'project': 'database',
         'dataset': 'schema',
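
Because the four new fields are declared `Optional[...] = None`, nothing in this hunk itself requires them to be set when `method` is `bearer`. A minimal sketch of a presence check, assuming a plain dict profile; `missing_refresh_fields` is a hypothetical helper and not part of dbt:

```python
from typing import Any, Dict, List

# Field names taken from the BigQueryCredentials additions in this PR.
REFRESH_FIELDS = ("refresh_token", "client_id", "client_secret", "token_uri")


def missing_refresh_fields(profile: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list refresh-token fields that are absent or empty."""
    return [name for name in REFRESH_FIELDS if not profile.get(name)]


# Placeholder values mirroring the unit-test profile added in this PR.
complete = {
    "method": "bearer",
    "client_id": "abc",
    "client_secret": "def",
    "refresh_token": "ghi",
    "token_uri": "jkl",
}
incomplete = {k: v for k, v in complete.items() if k != "token_uri"}

print(missing_refresh_fields(complete))
print(missing_refresh_fields(incomplete))
```

A check like this could run before the credentials object is built, so that a missing field is reported by name rather than surfacing later as a token refresh failure.
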
@@ -110,12 +123,13 @@ def exception_handler(self, sql):
             message = "Access denied while running query"
             self.handle_error(e, message)

-        except google.auth.exceptions.RefreshError:
+        except google.auth.exceptions.RefreshError as e:
             message = "Unable to generate access token, if you're using " \
                       "impersonate_service_account, make sure your " \
                       'initial account has the "roles/' \
                       'iam.serviceAccountTokenCreator" role on the ' \
-                      'account you are trying to impersonate.'
+                      'account you are trying to impersonate.\n\n' \
+                      f'{str(e)}'
             raise RuntimeException(message)

         except Exception as e:
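
The effect of this hunk is that the underlying refresh error text is appended to the fixed hint instead of being discarded. A minimal sketch of that pattern, using stand-in exception classes so it runs without dbt or google-auth installed; the class names, `run_with_handler`, and the error text are all assumptions made for illustration:

```python
class RefreshError(Exception):
    """Stand-in for google.auth.exceptions.RefreshError (assumption)."""


class RuntimeException(Exception):
    """Stand-in for dbt's RuntimeException (assumption)."""


def run_with_handler(fn):
    """Call fn and re-raise refresh failures with the original detail appended."""
    try:
        return fn()
    except RefreshError as e:
        message = (
            "Unable to generate access token.\n\n"
            f"{e}"
        )
        raise RuntimeException(message)


def failing():
    # Made-up error text, used only to show that it survives the re-raise.
    raise RefreshError("example refresh failure detail")


try:
    run_with_handler(failing)
except RuntimeException as exc:
    surfaced = str(exc)
    print(surfaced)
```

This matters for the new auth method because the fixed hint only mentions `impersonate_service_account`, while a refresh-token failure has a different cause that only the appended detail would reveal.
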
@@ -151,7 +165,7 @@ def commit(self):
     @classmethod
     def get_bigquery_credentials(cls, profile_credentials):
         method = profile_credentials.method
-        creds = service_account.Credentials
+        creds = GoogleServiceAccountCredentials.Credentials

         if method == BigQueryConnectionMethod.OAUTH:
             credentials, project_id = google.auth.default(scopes=cls.SCOPE)
@@ -165,6 +179,16 @@ def get_bigquery_credentials(cls, profile_credentials):
             details = profile_credentials.keyfile_json
             return creds.from_service_account_info(details, scopes=cls.SCOPE)

+        elif method == BigQueryConnectionMethod.BEARER:
Contributor Author

@jtcohen6 do you have any thoughts / opinions on what we call this auth method?

Contributor

I don't mind `bearer` as long as it reflects what BQ calls this (right?). I prefer it to `oauth-secrets`.

Contributor Author

@drewbanin, Oct 1, 2020

^ It's actually different from the method described in that link, which is kind of why I do not want to call this `bearer`! Bearer auth actually feels more to me like the change described in #2802 :D

After some more thought, I think we should roll with refresh-token on this one

Contributor

@jtcohen6, Oct 1, 2020

QED. Let's call this something else. oauth-refresh-token?

Edit: refresh-token also works, we both got there

+            return GoogleCredentials.Credentials(
+                token=None,
+                refresh_token=profile_credentials.refresh_token,
+                client_id=profile_credentials.client_id,
+                client_secret=profile_credentials.client_secret,
+                token_uri=profile_credentials.token_uri,
+                scopes=cls.SCOPE
+            )
+
         error = ('Invalid `method` in profile: "{}"'.format(method))
         raise FailedToConnectException(error)
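
For context on what the four new profile fields are for: with `token=None`, the credentials object starts without an access token and has to obtain one by exchanging the refresh token at `token_uri`. In standard OAuth 2.0 (RFC 6749, section 6) that exchange is a form-encoded POST with `grant_type=refresh_token`. The sketch below only builds such a request body with the standard library and sends nothing; `build_refresh_request_body` is a hypothetical helper, not part of dbt or google-auth, and the real library may include additional fields.

```python
from urllib.parse import parse_qs, urlencode


def build_refresh_request_body(refresh_token: str, client_id: str,
                               client_secret: str) -> str:
    """Hypothetical helper: form-encode an OAuth 2.0 refresh-token grant."""
    return urlencode({
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": client_id,
        "client_secret": client_secret,
    })


# Placeholder values mirroring the unit-test profile added in this PR.
body = build_refresh_request_body("ghi", "abc", "def")

# Decode the body again to inspect the individual form fields.
fields = parse_qs(body)
print(fields)
```
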

31 changes: 31 additions & 0 deletions test/unit/test_bigquery_adapter.py
@@ -71,6 +71,20 @@ def setUp(self):
                     'threads': 1,
                     'impersonate_service_account': '[email protected]'
                 },
+                'oauth-credentials': {
+                    'type': 'bigquery',
+                    'method': 'bearer',
+                    'client_id': 'abc',
+                    'client_secret': 'def',
+                    'refresh_token': 'ghi',
+                    'token_uri': 'jkl',
+                    'project': 'dbt-unit-000000',
+                    'schema': 'dummy_schema',
+                    'threads': 1,
+                    'location': 'Luna Station',
+                    'priority': 'batch',
+                    'maximum_bytes_billed': 0,
+                },
             },
             'target': 'oauth',
         }
@@ -145,6 +159,23 @@ def test_acquire_connection_service_account_validations(self, mock_open_connecti
         connection.handle
         mock_open_connection.assert_called_once()

+    @patch('dbt.adapters.bigquery.BigQueryConnectionManager.open', return_value=_bq_conn())
+    def test_acquire_connection_service_account_validations(self, mock_open_connection):
+        adapter = self.get_adapter('oauth-credentials')
+        try:
+            connection = adapter.acquire_connection('dummy')
+            self.assertEqual(connection.type, 'bigquery')
+
+        except dbt.exceptions.ValidationException as e:
+            self.fail('got ValidationException: {}'.format(str(e)))
+
+        except BaseException as e:
+            raise
+
+        mock_open_connection.assert_not_called()
+        connection.handle
+        mock_open_connection.assert_called_once()
+
     @patch('dbt.adapters.bigquery.BigQueryConnectionManager.open', return_value=_bq_conn())
     def test_acquire_connection_impersonated_service_account_validations(self, mock_open_connection):
         adapter = self.get_adapter('impersonate')
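
One thing worth checking in the hunk above, offered as a sketch rather than a statement about the final merged code: in this commit the added test reuses the name `test_acquire_connection_service_account_validations`, which also appears in the hunk header as an existing method. Assuming both live in the same test class, the later definition replaces the earlier one in the class body, so only one of the two would be collected. The class and method names below are hypothetical.

```python
import unittest


class ShadowingExample(unittest.TestCase):
    def test_connection(self):
        self.assertTrue(True)

    def test_connection(self):  # same name: replaces the definition above
        self.assertTrue(True)


# Ask the loader which test methods it can see on the class.
names = unittest.TestLoader().getTestCaseNames(ShadowingExample)
print(list(names))
```

Giving the new test a distinct name would let both tests run.
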