Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate modules for client credential types #11496

Merged
merged 1 commit into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
# ------------------------------------
from .authorization_code import AuthorizationCodeCredential
from .browser import InteractiveBrowserCredential
from .certificate import CertificateCredential
from .chained import ChainedTokenCredential
from .client_credential import CertificateCredential, ClientSecretCredential
from .client_secret import ClientSecretCredential
from .default import DefaultAzureCredential
from .environment import EnvironmentCredential
from .managed_identity import ManagedIdentityCredential
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from typing import TYPE_CHECKING

from .._authn_client import AuthnClient
from .._base import CertificateCredentialBase

if TYPE_CHECKING:
from azure.core.credentials import AccessToken
from typing import Any


class CertificateCredential(CertificateCredentialBase):
"""Authenticates as a service principal using a certificate.

:param str tenant_id: ID of the service principal's tenant. Also called its 'directory' ID.
:param str client_id: the service principal's client ID
:param str certificate_path: path to a PEM-encoded certificate file including the private key.

:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.KnownAuthorities`
defines authorities for other clouds.
:keyword password: The certificate's password. If a unicode string, it will be encoded as UTF-8. If the certificate
requires a different encoding, pass appropriately encoded bytes instead.
:paramtype password: str or bytes
"""

def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
# type: (*str, **Any) -> AccessToken
"""Request an access token for `scopes`.

.. note:: This method is called by Azure SDK clients. It isn't intended for use in application code.

:param str scopes: desired scopes for the access token. This method requires at least one scope.
:rtype: :class:`azure.core.credentials.AccessToken`
:raises ~azure.core.exceptions.ClientAuthenticationError: authentication failed. The error's ``message``
attribute gives a reason. Any error response from Azure Active Directory is available as the error's
``response`` attribute.
"""
if not scopes:
raise ValueError("'get_token' requires at least one scope")

token = self._client.get_cached_token(scopes)
if not token:
data = self._get_request_data(*scopes)
token = self._client.request_token(scopes, form_data=data)
return token

def _get_auth_client(self, tenant_id, **kwargs):
return AuthnClient(tenant=tenant_id, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License.
# ------------------------------------
from .._authn_client import AuthnClient
from .._base import ClientSecretCredentialBase, CertificateCredentialBase
from .._base import ClientSecretCredentialBase

try:
from typing import TYPE_CHECKING
Expand All @@ -12,7 +12,7 @@

if TYPE_CHECKING:
# pylint:disable=unused-import,ungrouped-imports
from typing import Any, Mapping
from typing import Any
from azure.core.credentials import AccessToken


Expand Down Expand Up @@ -53,43 +53,3 @@ def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
data = dict(self._form_data, scope=" ".join(scopes))
token = self._client.request_token(scopes, form_data=data)
return token


class CertificateCredential(CertificateCredentialBase):
"""Authenticates as a service principal using a certificate.

:param str tenant_id: ID of the service principal's tenant. Also called its 'directory' ID.
:param str client_id: the service principal's client ID
:param str certificate_path: path to a PEM-encoded certificate file including the private key.

:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.KnownAuthorities`
defines authorities for other clouds.
:keyword password: The certificate's password. If a unicode string, it will be encoded as UTF-8. If the certificate
requires a different encoding, pass appropriately encoded bytes instead.
:paramtype password: str or bytes
"""

def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
# type: (*str, **Any) -> AccessToken
"""Request an access token for `scopes`.

.. note:: This method is called by Azure SDK clients. It isn't intended for use in application code.

:param str scopes: desired scopes for the access token. This method requires at least one scope.
:rtype: :class:`azure.core.credentials.AccessToken`
:raises ~azure.core.exceptions.ClientAuthenticationError: authentication failed. The error's ``message``
attribute gives a reason. Any error response from Azure Active Directory is available as the error's
``response`` attribute.
"""
if not scopes:
raise ValueError("'get_token' requires at least one scope")

token = self._client.get_cached_token(scopes)
if not token:
data = self._get_request_data(*scopes)
token = self._client.request_token(scopes, form_data=data)
return token

def _get_auth_client(self, tenant_id, **kwargs):
return AuthnClient(tenant=tenant_id, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

from .. import CredentialUnavailableError
from .._constants import EnvironmentVariables
from .client_credential import CertificateCredential, ClientSecretCredential
from .certificate import CertificateCredential
from .client_secret import ClientSecretCredential
from .user_password import UsernamePasswordCredential


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from .default import DefaultAzureCredential
from .environment import EnvironmentCredential
from .managed_identity import ManagedIdentityCredential
from .client_credential import CertificateCredential, ClientSecretCredential
from .certificate import CertificateCredential
from .client_secret import ClientSecretCredential
from .shared_cache import SharedTokenCacheCredential
from .azure_cli import AzureCliCredential

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,13 @@

from .base import AsyncCredentialBase
from .._authn_client import AsyncAuthnClient
from ..._base import ClientSecretCredentialBase, CertificateCredentialBase
from ..._base import CertificateCredentialBase

if TYPE_CHECKING:
from typing import Any, Mapping
from typing import Any
from azure.core.credentials import AccessToken


class ClientSecretCredential(ClientSecretCredentialBase, AsyncCredentialBase):
"""Authenticates as a service principal using a client ID and client secret.

:param str tenant_id: ID of the service principal's tenant. Also called its 'directory' ID.
:param str client_id: the service principal's client ID
:param str client_secret: one of the service principal's client secrets

:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.KnownAuthorities`
defines authorities for other clouds.
"""

def __init__(self, tenant_id: str, client_id: str, client_secret: str, **kwargs: "Any") -> None:
super(ClientSecretCredential, self).__init__(tenant_id, client_id, client_secret, **kwargs)
self._client = AsyncAuthnClient(tenant=tenant_id, **kwargs)

async def __aenter__(self):
await self._client.__aenter__()
return self

async def close(self):
"""Close the credential's transport session."""

await self._client.__aexit__()

async def get_token(self, *scopes: str, **kwargs: "Any") -> "AccessToken": # pylint:disable=unused-argument
"""Asynchronously request an access token for `scopes`.

.. note:: This method is called by Azure SDK clients. It isn't intended for use in application code.

:param str scopes: desired scopes for the access token. This method requires at least one scope.
:rtype: :class:`azure.core.credentials.AccessToken`
:raises ~azure.core.exceptions.ClientAuthenticationError: authentication failed. The error's ``message``
attribute gives a reason. Any error response from Azure Active Directory is available as the error's
``response`` attribute.
"""
if not scopes:
raise ValueError("'get_token' requires at least one scope")

token = self._client.get_cached_token(scopes)
if not token:
data = dict(self._form_data, scope=" ".join(scopes))
token = await self._client.request_token(scopes, form_data=data)
return token # type: ignore


class CertificateCredential(CertificateCredentialBase, AsyncCredentialBase):
"""Authenticates as a service principal using a certificate.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from typing import TYPE_CHECKING

from .base import AsyncCredentialBase
from .._authn_client import AsyncAuthnClient
from ..._base import ClientSecretCredentialBase

if TYPE_CHECKING:
from typing import Any
from azure.core.credentials import AccessToken


class ClientSecretCredential(ClientSecretCredentialBase, AsyncCredentialBase):
"""Authenticates as a service principal using a client ID and client secret.

:param str tenant_id: ID of the service principal's tenant. Also called its 'directory' ID.
:param str client_id: the service principal's client ID
:param str client_secret: one of the service principal's client secrets

:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.KnownAuthorities`
defines authorities for other clouds.
"""

def __init__(self, tenant_id: str, client_id: str, client_secret: str, **kwargs: "Any") -> None:
super(ClientSecretCredential, self).__init__(tenant_id, client_id, client_secret, **kwargs)
self._client = AsyncAuthnClient(tenant=tenant_id, **kwargs)

async def __aenter__(self):
await self._client.__aenter__()
return self

async def close(self):
"""Close the credential's transport session."""

await self._client.__aexit__()

async def get_token(self, *scopes: str, **kwargs: "Any") -> "AccessToken": # pylint:disable=unused-argument
"""Asynchronously request an access token for `scopes`.

.. note:: This method is called by Azure SDK clients. It isn't intended for use in application code.

:param str scopes: desired scopes for the access token. This method requires at least one scope.
:rtype: :class:`azure.core.credentials.AccessToken`
:raises ~azure.core.exceptions.ClientAuthenticationError: authentication failed. The error's ``message``
attribute gives a reason. Any error response from Azure Active Directory is available as the error's
``response`` attribute.
"""
if not scopes:
raise ValueError("'get_token' requires at least one scope")

token = self._client.get_cached_token(scopes)
if not token:
data = dict(self._form_data, scope=" ".join(scopes))
token = await self._client.request_token(scopes, form_data=data)
return token # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

from ... import CredentialUnavailableError
from ..._constants import EnvironmentVariables
from .client_credential import CertificateCredential, ClientSecretCredential
from .certificate import CertificateCredential
from .client_secret import ClientSecretCredential
from .base import AsyncCredentialBase

if TYPE_CHECKING:
Expand Down