Skip to content

Commit

Permalink
feat: Add Pluggable auth support (#988)
Browse files Browse the repository at this point in the history
* Port identity pool credentials

* access_token retrieved

* -> pluggable

* Update pluggable.py

* Create test_pluggable.py

* Unit tests

* Address pr issues
  • Loading branch information
Chuan Ren authored Mar 1, 2022
1 parent 818e6d2 commit 63ad5e9
Show file tree
Hide file tree
Showing 3 changed files with 790 additions and 0 deletions.
6 changes: 6 additions & 0 deletions google/auth/_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ def _get_external_account_credentials(
credentials = aws.Credentials.from_info(
info, scopes=scopes, default_scopes=default_scopes
)
elif info.get("credential_source").get("executable") is not None:
from google.auth import pluggable

credentials = pluggable.Credentials.from_info(
info, scopes=scopes, default_scopes=default_scopes
)
else:
try:
# Check if configuration corresponds to an Identity Pool credentials.
Expand Down
273 changes: 273 additions & 0 deletions google/auth/pluggable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Pluggable Credentials.
This module provides credentials to access Google Cloud resources from on-prem
or non-Google Cloud platforms which support external credentials (e.g. OIDC ID
tokens) retrieved from local file locations or local servers. This includes
Microsoft Azure and OIDC identity providers (e.g. K8s workloads registered with
Hub with Hub workload identity enabled).
These credentials are recommended over the use of service account credentials
in on-prem/non-Google Cloud platforms as they do not involve the management of
long-live service account private keys.
Pluggable Credentials are initialized using external_account arguments which
are typically loaded from third-party executables. Unlike other
credentials that can be initialized with a list of explicit arguments, secrets
or credentials, external account clients use the environment and hints/guidelines
provided by the external_account JSON file to retrieve credentials and exchange
them for Google access tokens.
"""

try:
from collections.abc import Mapping
# Python 2.7 compatibility
except ImportError: # pragma: NO COVER
from collections import Mapping
import io
import json
import os
import subprocess

from google.auth import _helpers
from google.auth import exceptions
from google.auth import external_account

# External account JSON type identifier.
EXECUTABLE_SUPPORTED_MAX_VERSION = 1

class Credentials(external_account.Credentials):
"""External account credentials sourced from executables."""

def __init__(
self,
audience,
subject_token_type,
token_url,
credential_source,
service_account_impersonation_url=None,
client_id=None,
client_secret=None,
quota_project_id=None,
scopes=None,
default_scopes=None,
workforce_pool_user_project=None,
):
"""Instantiates an external account credentials object from a executables.
Args:
audience (str): The STS audience field.
subject_token_type (str): The subject token type.
token_url (str): The STS endpoint URL.
credential_source (Mapping): The credential source dictionary used to
provide instructions on how to retrieve external credential to be
exchanged for Google access tokens.
Example credential_source for pluggable credential::
{
"executable": {
"command": "/path/to/get/credentials.sh --arg1=value1 --arg2=value2",
"timeout_millis": 5000,
"output_file": "/path/to/generated/cached/credentials"
}
}
service_account_impersonation_url (Optional[str]): The optional service account
impersonation getAccessToken URL.
client_id (Optional[str]): The optional client ID.
client_secret (Optional[str]): The optional client secret.
quota_project_id (Optional[str]): The optional quota project ID.
scopes (Optional[Sequence[str]]): Optional scopes to request during the
authorization grant.
default_scopes (Optional[Sequence[str]]): Default scopes passed by a
Google client library. Use 'scopes' for user-defined scopes.
workforce_pool_user_project (Optona[str]): The optional workforce pool user
project number when the credential corresponds to a workforce pool and not
a workload Pluggable. The underlying principal must still have
serviceusage.services.use IAM permission to use the project for
billing/quota.
Raises:
google.auth.exceptions.RefreshError: If an error is encountered during
access token retrieval logic.
ValueError: For invalid parameters.
.. note:: Typically one of the helper constructors
:meth:`from_file` or
:meth:`from_info` are used instead of calling the constructor directly.
"""

super(Credentials, self).__init__(
audience=audience,
subject_token_type=subject_token_type,
token_url=token_url,
credential_source=credential_source,
service_account_impersonation_url=service_account_impersonation_url,
client_id=client_id,
client_secret=client_secret,
quota_project_id=quota_project_id,
scopes=scopes,
default_scopes=default_scopes,
workforce_pool_user_project=workforce_pool_user_project,
)
if not isinstance(credential_source, Mapping):
self._credential_source_executable = None
raise ValueError(
"Missing credential_source. The credential_source is not a dict."
)
else:
self._credential_source_executable = credential_source.get("executable")
if not self._credential_source_executable:
raise ValueError(
"Missing credential_source. An 'executable' must be provided."
)
self._credential_source_executable_command = self._credential_source_executable.get("command")
self._credential_source_executable_timeout_millis = self._credential_source_executable.get("timeout_millis")
self._credential_source_executable_output_file = self._credential_source_executable.get("output_file")

# environment_id is only supported in AWS or dedicated future external
# account credentials.
if "environment_id" in credential_source:
raise ValueError(
"Invalid Pluggable credential_source field 'environment_id'"
)

if not self._credential_source_executable_command:
raise ValueError(
"Missing command. Executable command must be provided."
)
if not self._credential_source_executable_timeout_millis:
raise ValueError(
"Missing timeout_millis. Executable timeout millis must be provided."
)

@_helpers.copy_docstring(external_account.Credentials)
def retrieve_subject_token(self, request):
env_allow_executables = os.environ.get('GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES')
if env_allow_executables != '1':
raise ValueError(
"Executables need to be explicitly allowed (set GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES to '1') to run."
)

# Inject env vars
original_audience = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE")
os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = self._audience
original_subject_token_type = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE")
os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] = self._subject_token_type
original_interactive = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE")
os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] = "0" # Always set to 0 until interactive mode is implemented.
original_service_account_impersonation_url = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL")
if self._service_account_impersonation_url is not None:
os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"] = self._service_account_impersonation_url
original_credential_source_executable_output_file = os.getenv("GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE")
if self._credential_source_executable_output_file is not None:
os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] = self._credential_source_executable_output_file

result = subprocess.run(self._credential_source_executable_command.split(), timeout=self._credential_source_executable_timeout_millis/1000, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

# Reset env vars
if original_audience is not None:
os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = original_audience
else:
del os.environ["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"]
if original_subject_token_type is not None:
os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] = self.original_subject_token_type
else:
del os.environ["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"]
if original_interactive is not None:
os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] = original_interactive
else:
del os.environ["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"]
if original_service_account_impersonation_url is not None:
os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"] = original_service_account_impersonation_url
elif os.getenv("GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL") is not None:
del os.environ["GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL"]
if original_credential_source_executable_output_file is not None:
os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"] = original_credential_source_executable_output_file
elif os.getenv("GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE") is not None:
del os.environ["GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE"]

if result.returncode != 0:
raise exceptions.RefreshError(
"Executable exited with non-zero return code {}. Error: {}".format(result.returncode, result.stdout)
)
else:
data = result.stdout.decode('utf-8')
response = json.loads(data)
if not response['success']:
raise exceptions.RefreshError(
"Executable returned unsuccessful response: {}.".format(response)
)
elif response['version'] > EXECUTABLE_SUPPORTED_MAX_VERSION:
raise exceptions.RefreshError(
"Executable returned unsupported version {}.".format(response['version'])
)
elif response["token_type"] == "urn:ietf:params:oauth:token-type:jwt" or response["token_type"] == "urn:ietf:params:oauth:token-type:id_token": # OIDC
return response["id_token"]
elif response["token_type"] == "urn:ietf:params:oauth:token-type:saml2": # SAML
return response["saml_response"]
else:
raise exceptions.RefreshError(
"Executable returned unsupported token type."
)

@classmethod
def from_info(cls, info, **kwargs):
"""Creates a Pluggable Credentials instance from parsed external account info.
Args:
info (Mapping[str, str]): The Pluggable external account info in Google
format.
kwargs: Additional arguments to pass to the constructor.
Returns:
google.auth.pluggable.Credentials: The constructed
credentials.
Raises:
ValueError: For invalid parameters.
"""
return cls(
audience=info.get("audience"),
subject_token_type=info.get("subject_token_type"),
token_url=info.get("token_url"),
service_account_impersonation_url=info.get(
"service_account_impersonation_url"
),
client_id=info.get("client_id"),
client_secret=info.get("client_secret"),
credential_source=info.get("credential_source"),
quota_project_id=info.get("quota_project_id"),
workforce_pool_user_project=info.get("workforce_pool_user_project"),
**kwargs
)

@classmethod
def from_file(cls, filename, **kwargs):
"""Creates an Pluggable Credentials instance from an external account json file.
Args:
filename (str): The path to the Pluggable external account json file.
kwargs: Additional arguments to pass to the constructor.
Returns:
google.auth.pluggable.Credentials: The constructed
credentials.
"""
with io.open(filename, "r", encoding="utf-8") as json_file:
data = json.load(json_file)
return cls.from_info(data, **kwargs)
Loading

0 comments on commit 63ad5e9

Please sign in to comment.