Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read tenant and authority from VS Code user settings #18846

Merged
merged 11 commits into from
Jun 4, 2021
20 changes: 14 additions & 6 deletions sdk/identity/azure-identity/azure/identity/_credentials/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,24 @@ class DefaultAzureCredential(ChainedTokenCredential):
:keyword str shared_cache_tenant_id: Preferred tenant for :class:`~azure.identity.SharedTokenCacheCredential`.
Defaults to the value of environment variable AZURE_TENANT_ID, if any.
:keyword str visual_studio_code_tenant_id: Tenant ID to use when authenticating with
:class:`~azure.identity.VisualStudioCodeCredential`.
:class:`~azure.identity.VisualStudioCodeCredential`. Defaults to the "Azure: Tenant" setting in VS Code's user
settings or, when that setting has no value, the "organizations" tenant, which supports only Azure Active
Directory work or school accounts.
"""

def __init__(self, **kwargs):
# type: (**Any) -> None
authority = kwargs.pop("authority", None)

vscode_tenant_id = kwargs.pop(
"visual_studio_code_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID)
)
vscode_args = {}
if authority:
vscode_args["authority"] = authority
if vscode_tenant_id:
vscode_args["tenant_id"] = vscode_tenant_id

authority = normalize_authority(authority) if authority else get_default_authority()

interactive_browser_tenant_id = kwargs.pop(
Expand All @@ -93,10 +105,6 @@ def __init__(self, **kwargs):
"shared_cache_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID)
)

vscode_tenant_id = kwargs.pop(
"visual_studio_code_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID)
)

exclude_environment_credential = kwargs.pop("exclude_environment_credential", False)
exclude_managed_identity_credential = kwargs.pop("exclude_managed_identity_credential", False)
exclude_shared_token_cache_credential = kwargs.pop("exclude_shared_token_cache_credential", False)
Expand All @@ -120,7 +128,7 @@ def __init__(self, **kwargs):
except Exception as ex: # pylint:disable=broad-except
_LOGGER.info("Shared token cache is unavailable: '%s'", ex)
if not exclude_visual_studio_code_credential:
credentials.append(VisualStudioCodeCredential(tenant_id=vscode_tenant_id))
credentials.append(VisualStudioCodeCredential(**vscode_args))
if not exclude_cli_credential:
credentials.append(AzureCliCredential())
if not exclude_powershell_credential:
Expand Down
138 changes: 108 additions & 30 deletions sdk/identity/azure-identity/azure/identity/_credentials/vscode.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,125 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import abc
import os
import sys
from typing import TYPE_CHECKING
from typing import cast, TYPE_CHECKING

from .._exceptions import CredentialUnavailableError
from .._constants import AZURE_VSCODE_CLIENT_ID
from .._internal import validate_tenant_id
from .._constants import AzureAuthorityHosts, AZURE_VSCODE_CLIENT_ID, EnvironmentVariables
from .._internal import normalize_authority, validate_tenant_id
from .._internal.aad_client import AadClient
from .._internal.get_token_mixin import GetTokenMixin

if sys.platform.startswith("win"):
from .._internal.win_vscode_adapter import get_credentials
from .._internal.win_vscode_adapter import get_refresh_token, get_user_settings
elif sys.platform.startswith("darwin"):
from .._internal.macos_vscode_adapter import get_credentials
from .._internal.macos_vscode_adapter import get_refresh_token, get_user_settings
else:
from .._internal.linux_vscode_adapter import get_credentials
from .._internal.linux_vscode_adapter import get_refresh_token, get_user_settings

if TYPE_CHECKING:
# pylint:disable=unused-import,ungrouped-imports
from typing import Any, Optional
from typing import Any, Dict, Optional
from azure.core.credentials import AccessToken
from .._internal.aad_client import AadClientBase

try:
ABC = abc.ABC
except AttributeError: # Python 2.7, abc exists, but not ABC
ABC = abc.ABCMeta("ABC", (object,), {"__slots__": ()}) # type: ignore

class VisualStudioCodeCredential(GetTokenMixin):
"""Authenticates as the Azure user signed in to Visual Studio Code.

:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts`
defines authorities for other clouds.
:keyword str tenant_id: ID of the tenant the credential should authenticate in. Defaults to the "organizations"
tenant, which supports only Azure Active Directory work or school accounts.
"""

class _VSCodeCredentialBase(ABC):
def __init__(self, **kwargs):
# type: (**Any) -> None
super(VisualStudioCodeCredential, self).__init__()
super(_VSCodeCredentialBase, self).__init__()

user_settings = get_user_settings()
self._cloud = user_settings.get("azure.cloud", "AzureCloud")
self._refresh_token = None
self._client = kwargs.pop("_client", None)
self._tenant_id = kwargs.pop("tenant_id", None) or "organizations"
validate_tenant_id(self._tenant_id)
self._unavailable_reason = ""

self._client = kwargs.get("_client")
if not self._client:
self._client = AadClient(self._tenant_id, AZURE_VSCODE_CLIENT_ID, **kwargs)
self._initialize(user_settings, **kwargs)
if not (self._client or self._unavailable_reason):
self._unavailable_reason = "Initialization failed"

@abc.abstractmethod
def _get_client(self, **kwargs):
# type: (**Any) -> AadClientBase
pass

def _get_refresh_token(self):
# type: () -> str
if not self._refresh_token:
self._refresh_token = get_refresh_token(self._cloud)
if not self._refresh_token:
raise CredentialUnavailableError(message="Failed to get Azure user details from Visual Studio Code.")
return self._refresh_token

def _initialize(self, vscode_user_settings, **kwargs):
# type: (Dict, **Any) -> None
"""Build a client from kwargs merged with VS Code user settings.

The first stable version of this credential defaulted to Public Cloud and the "organizations"
tenant when it failed to read VS Code user settings. That behavior is preserved here.
"""

# Precedence for authority:
# 1) VisualStudioCodeCredential(authority=...)
# 2) $AZURE_AUTHORITY_HOST
# 3) authority matching VS Code's "azure.cloud" setting
# 4) default: Public Cloud
authority = kwargs.pop("authority", None) or os.environ.get(EnvironmentVariables.AZURE_AUTHORITY_HOST)
if not authority:
# the application didn't specify an authority, so we figure it out from VS Code settings
if self._cloud == "AzureCloud":
authority = AzureAuthorityHosts.AZURE_PUBLIC_CLOUD
elif self._cloud == "AzureChinaCloud":
authority = AzureAuthorityHosts.AZURE_CHINA
elif self._cloud == "AzureGermanCloud":
authority = AzureAuthorityHosts.AZURE_GERMANY
elif self._cloud == "AzureUSGovernment":
authority = AzureAuthorityHosts.AZURE_GOVERNMENT
else:
# If the value is anything else ("AzureCustomCloud" is the only other known value),
# we need the user to provide the authority because VS Code has no setting for it and
# we can't guess confidently.
self._unavailable_reason = (
'VS Code is configured to use a custom cloud. Set keyword argument "authority"'
+ ' with the Azure Active Directory endpoint for cloud "{}"'.format(self._cloud)
)
return

# Precedence for tenant ID:
# 1) VisualStudioCodeCredential(tenant_id=...)
# 2) "azure.tenant" in VS Code user settings
# 3) default: organizations
tenant_id = kwargs.pop("tenant_id", None) or vscode_user_settings.get("azure.tenant", "organizations")
validate_tenant_id(tenant_id)
if tenant_id.lower() == "adfs":
self._unavailable_reason = "VisualStudioCodeCredential authentication unavailable. ADFS is not supported."
return

self._client = self._get_client(
authority=normalize_authority(authority), client_id=AZURE_VSCODE_CLIENT_ID, tenant_id=tenant_id, **kwargs
)


class VisualStudioCodeCredential(_VSCodeCredentialBase, GetTokenMixin):
"""Authenticates as the Azure user signed in to Visual Studio Code.

:keyword str authority: authority of an Azure Active Directory endpoint, for example "login.microsoftonline.com".
This argument is required for a custom cloud and usually unnecessary otherwise. Defaults to the authority
matching the "Azure: Cloud" setting in VS Code's user settings or, when that setting has no value, the
authority for Azure Public Cloud.
:keyword str tenant_id: ID of the tenant the credential should authenticate in. Defaults to the "Azure: Tenant"
setting in VS Code's user settings or, when that setting has no value, the "organizations" tenant, which
supports only Azure Active Directory work or school accounts.
"""

def get_token(self, *scopes, **kwargs):
# type: (*str, **Any) -> AccessToken
Expand All @@ -55,21 +133,21 @@ def get_token(self, *scopes, **kwargs):
:raises ~azure.identity.CredentialUnavailableError: the credential cannot retrieve user details from Visual
Studio Code
"""
if self._tenant_id.lower() == "adfs":
raise CredentialUnavailableError(
message="VisualStudioCodeCredential authentication unavailable. ADFS is not supported."
)
if self._unavailable_reason:
raise CredentialUnavailableError(message=self._unavailable_reason)
return super(VisualStudioCodeCredential, self).get_token(*scopes, **kwargs)

def _acquire_token_silently(self, *scopes):
# type: (*str) -> Optional[AccessToken]
self._client = cast(AadClient, self._client)
return self._client.get_cached_access_token(scopes)

def _request_token(self, *scopes, **kwargs):
# type: (*str, **Any) -> AccessToken
if not self._refresh_token:
self._refresh_token = get_credentials()
if not self._refresh_token:
raise CredentialUnavailableError(message="Failed to get Azure user details from Visual Studio Code.")
refresh_token = self._get_refresh_token()
self._client = cast(AadClient, self._client)
return self._client.obtain_token_by_refresh_token(scopes, refresh_token, **kwargs)

return self._client.obtain_token_by_refresh_token(scopes, self._refresh_token, **kwargs)
def _get_client(self, **kwargs):
# type: (**Any) -> AadClient
return AadClient(**kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,6 @@ class _SECRET_SCHEMA(ct.Structure):
_libsecret = None # type: ignore


def _get_user_settings_path():
app_data_folder = os.environ["HOME"]
return os.path.join(app_data_folder, ".config", "Code", "User", "settings.json")


def _get_user_settings():
path = _get_user_settings_path()
try:
with open(path) as file:
data = json.load(file)
environment_name = data.get("azure.cloud", "AzureCloud")
return environment_name
except IOError:
return "AzureCloud"


def _get_refresh_token(service_name, account_name):
if not _libsecret:
return None
Expand All @@ -88,18 +72,26 @@ def _get_refresh_token(service_name, account_name):
_c_str(account_name),
None,
)
if err.value == 0:
if err.value == 0 and p_str:
return p_str.decode("utf-8")

return None


def get_credentials():
def get_user_settings():
try:
path = os.path.join(os.environ["HOME"], ".config", "Code", "User", "settings.json")
with open(path) as file:
return json.load(file)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.debug('Exception reading VS Code user settings: "%s"', ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
return {}


def get_refresh_token(cloud_name):
try:
environment_name = _get_user_settings()
credentials = _get_refresh_token(VSCODE_CREDENTIALS_SECTION, environment_name)
return credentials
except Exception as ex: # pylint: disable=broad-except
return _get_refresh_token(VSCODE_CREDENTIALS_SECTION, cloud_name)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.debug(
'Exception retrieving VS Code credentials: "%s"', ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,23 @@
_LOGGER = logging.getLogger(__name__)


def _get_user_settings_path():
app_data_folder = os.environ["USER"]
return os.path.join(app_data_folder, "Library", "Application Support", "Code", "User", "settings.json")


def _get_user_settings():
path = _get_user_settings_path()
def get_user_settings():
try:
path = os.path.join(os.environ["HOME"], "Library", "Application Support", "Code", "User", "settings.json")
with open(path) as file:
data = json.load(file)
environment_name = data.get("azure.cloud", "AzureCloud")
return environment_name
except IOError:
return "AzureCloud"
return json.load(file)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.debug('Exception reading VS Code user settings: "%s"', ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
return {}


def _get_refresh_token(service_name, account_name):
key_chain = Keychain()
def get_refresh_token(cloud_name):
try:
return key_chain.get_generic_password(service_name, account_name)
key_chain = Keychain()
return key_chain.get_generic_password(VSCODE_CREDENTIALS_SECTION, cloud_name)
except KeychainError:
return None


def get_credentials():
try:
environment_name = _get_user_settings()
credentials = _get_refresh_token(VSCODE_CREDENTIALS_SECTION, environment_name)
return credentials
except Exception as ex: # pylint: disable=broad-except
except Exception as ex: # pylint:disable=broad-except
_LOGGER.debug(
'Exception retrieving VS Code credentials: "%s"', ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,31 +57,19 @@ def _read_credential(service_name, account_name):
return None


def _get_user_settings_path():
app_data_folder = os.environ["APPDATA"]
return os.path.join(app_data_folder, "Code", "User", "settings.json")


def _get_user_settings():
path = _get_user_settings_path()
def get_user_settings():
try:
path = os.path.join(os.environ["APPDATA"], "Code", "User", "settings.json")
with open(path) as file:
data = json.load(file)
environment_name = data.get("azure.cloud", "AzureCloud")
return environment_name
except IOError:
return "AzureCloud"


def _get_refresh_token(service_name, account_name):
return _read_credential(service_name, account_name)
return json.load(file)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.debug('Exception reading VS Code user settings: "%s"', ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
return {}


def get_credentials():
def get_refresh_token(cloud_name):
try:
environment_name = _get_user_settings()
credentials = _get_refresh_token(VSCODE_CREDENTIALS_SECTION, environment_name)
return credentials
return _read_credential(VSCODE_CREDENTIALS_SECTION, cloud_name)
except Exception as ex: # pylint: disable=broad-except
_LOGGER.debug(
'Exception retrieving VS Code credentials: "%s"', ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG)
Expand Down
Loading