diff --git a/homeassistant/auth/__init__.py b/homeassistant/auth/__init__.py index 504df9920b465..c0beba1a22760 100644 --- a/homeassistant/auth/__init__.py +++ b/homeassistant/auth/__init__.py @@ -55,7 +55,7 @@ async def auth_manager_from_config( *[auth_mfa_module_from_config(hass, config) for config in module_configs]) else: - modules = [] + modules = () # So returned auth modules are in same order as config module_hash = OrderedDict() # type: _MfaModuleDict for module in modules: @@ -78,7 +78,8 @@ class AuthManager: """Manage the authentication for Home Assistant.""" def __init__(self, hass: HomeAssistant, store: auth_store.AuthStore, - providers: _ProviderDict, mfa_modules) -> None: + providers: _ProviderDict, mfa_modules: _MfaModuleDict) \ + -> None: """Initialize the auth manager.""" self.hass = hass self._store = store @@ -115,7 +116,8 @@ def auth_mfa_modules(self) -> List[MultiFactorAuthModule]: """Return a list of available auth modules.""" return list(self._mfa_modules.values()) - def get_auth_mfa_module(self, module_id): + def get_auth_mfa_module(self, module_id: str) \ + -> Optional[MultiFactorAuthModule]: """Return an multi-factor auth module, None if not found.""" return self._mfa_modules.get(module_id) @@ -127,7 +129,8 @@ async def async_get_user(self, user_id: str) -> Optional[models.User]: """Retrieve a user.""" return await self._store.async_get_user(user_id) - async def async_get_user_by_credentials(self, credentials): + async def async_get_user_by_credentials( + self, credentials: models.Credentials) -> Optional[models.User]: """Get a user by credential, return None if not found.""" for user in await self.async_get_users(): for creds in user.credentials: @@ -220,16 +223,18 @@ async def async_remove_credentials( await self._store.async_remove_credentials(credentials) - async def async_enable_user_mfa(self, user, mfa_module_id, data): + async def async_enable_user_mfa(self, user: models.User, + mfa_module_id: str, data: Any) -> None: """Enable a multi-factor auth module for user.""" - if mfa_module_id not in self._mfa_modules: - raise ValueError('Unable find multi-factor auth module: {}' - .format(mfa_module_id)) if user.system_generated: raise ValueError('System generated users cannot enable ' 'multi-factor auth module.') module = self.get_auth_mfa_module(mfa_module_id) + if module is None: + raise ValueError('Unable find multi-factor auth module: {}' + .format(mfa_module_id)) + if module.setup_schema is not None: try: # pylint: disable=not-callable @@ -237,21 +242,23 @@ async def async_enable_user_mfa(self, user, mfa_module_id, data): except vol.Invalid as err: raise ValueError('Data does not match schema: {}'.format(err)) - return await module.async_setup_user(user.id, data) + await module.async_setup_user(user.id, data) - async def async_disable_user_mfa(self, user, mfa_module_id): + async def async_disable_user_mfa(self, user: models.User, + mfa_module_id: str) -> None: """Disable a multi-factor auth module for user.""" - if mfa_module_id not in self._mfa_modules: - raise ValueError('Unable find multi-factor auth module: {}' - .format(mfa_module_id)) if user.system_generated: raise ValueError('System generated users cannot disable ' 'multi-factor auth module.') module = self.get_auth_mfa_module(mfa_module_id) + if module is None: + raise ValueError('Unable find multi-factor auth module: {}' + .format(mfa_module_id)) + await module.async_depose_user(user.id) - async def async_get_enabled_mfa(self, user): + async def async_get_enabled_mfa(self, user: models.User) -> List[str]: """List enabled mfa modules for user.""" module_ids = [] for module_id, module in self._mfa_modules.items(): diff --git a/homeassistant/auth/mfa_modules/__init__.py b/homeassistant/auth/mfa_modules/__init__.py index 40f0177eb18fd..d0707c4a7452f 100644 --- a/homeassistant/auth/mfa_modules/__init__.py +++ b/homeassistant/auth/mfa_modules/__init__.py @@ -1,13 +1,16 @@ """Plugable auth modules for Home Assistant.""" +from datetime import timedelta import importlib import logging -from datetime import timedelta +import types +from typing import Any, Dict, Optional import voluptuous as vol from voluptuous.humanize import humanize_error from homeassistant import requirements from homeassistant.const import CONF_ID, CONF_NAME, CONF_TYPE +from homeassistant.core import HomeAssistant from homeassistant.util.decorator import Registry MULTI_FACTOR_AUTH_MODULES = Registry() @@ -26,69 +29,18 @@ _LOGGER = logging.getLogger(__name__) -async def auth_mfa_module_from_config(hass, config): - """Initialize an auth module from a config.""" - module_name = config[CONF_TYPE] - module = await _load_mfa_module(hass, module_name) - - if module is None: - return None - - try: - config = module.CONFIG_SCHEMA(config) - except vol.Invalid as err: - _LOGGER.error('Invalid configuration for multi-factor module %s: %s', - module_name, humanize_error(config, err)) - return None - - return MULTI_FACTOR_AUTH_MODULES[module_name](hass, config) - - -async def _load_mfa_module(hass, module_name): - """Load an mfa auth module.""" - module_path = 'homeassistant.auth.mfa_modules.{}'.format(module_name) - - try: - module = importlib.import_module(module_path) - except ImportError: - _LOGGER.warning('Unable to find %s', module_path) - return None - - if hass.config.skip_pip or not hasattr(module, 'REQUIREMENTS'): - return module - - processed = hass.data.get(DATA_REQS) - if processed and module_name in processed: - return module - - hass.data[DATA_REQS] = set() - - req_success = await requirements.async_process_requirements( - hass, module_path, module.REQUIREMENTS) - - if not req_success: - return None - - processed.add(module_name) - return module - - class MultiFactorAuthModule: """Multi-factor Auth Module of validation function.""" DEFAULT_TITLE = 'Unnamed auth module' - def __init__(self, hass, config): + def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None: """Initialize an auth module.""" self.hass = hass self.config = config - _LOGGER.debug('auth mfa module %s loaded.', - self.type if self.id is None else "{}[{}]".format( - self.type, self.id - )) @property - def id(self): # pylint: disable=invalid-name + def id(self) -> str: # pylint: disable=invalid-name """Return id of the auth module. Default is same as type @@ -96,42 +48,94 @@ def id(self): # pylint: disable=invalid-name return self.config.get(CONF_ID, self.type) @property - def type(self): + def type(self) -> str: """Return type of the module.""" - return self.config[CONF_TYPE] + return self.config[CONF_TYPE] # type: ignore @property - def name(self): + def name(self) -> str: """Return the name of the auth module.""" return self.config.get(CONF_NAME, self.DEFAULT_TITLE) # Implement by extending class @property - def input_schema(self): + def input_schema(self) -> vol.Schema: """Return a voluptuous schema to define mfa auth module's input.""" raise NotImplementedError @property - def setup_schema(self): + def setup_schema(self) -> Optional[vol.Schema]: """Return a vol schema to validate mfa auth module's setup input. Optional """ return None - async def async_setup_user(self, user_id, setup_data): + async def async_setup_user(self, user_id: str, setup_data: Any) -> None: """Set up user for mfa auth module.""" raise NotImplementedError - async def async_depose_user(self, user_id): + async def async_depose_user(self, user_id: str) -> None: """Remove user from mfa module.""" raise NotImplementedError - async def async_is_user_setup(self, user_id): + async def async_is_user_setup(self, user_id: str) -> bool: """Return whether user is setup.""" raise NotImplementedError - async def async_validation(self, user_id, user_input): + async def async_validation( + self, user_id: str, user_input: Dict[str, Any]) -> bool: """Return True if validation passed.""" raise NotImplementedError + + +async def auth_mfa_module_from_config( + hass: HomeAssistant, config: Dict[str, Any]) \ + -> Optional[MultiFactorAuthModule]: + """Initialize an auth module from a config.""" + module_name = config[CONF_TYPE] + module = await _load_mfa_module(hass, module_name) + + if module is None: + return None + + try: + config = module.CONFIG_SCHEMA(config) # type: ignore + except vol.Invalid as err: + _LOGGER.error('Invalid configuration for multi-factor module %s: %s', + module_name, humanize_error(config, err)) + return None + + return MULTI_FACTOR_AUTH_MODULES[module_name](hass, config) # type: ignore + + +async def _load_mfa_module(hass: HomeAssistant, module_name: str) \ + -> Optional[types.ModuleType]: + """Load an mfa auth module.""" + module_path = 'homeassistant.auth.mfa_modules.{}'.format(module_name) + + try: + module = importlib.import_module(module_path) + except ImportError: + _LOGGER.warning('Unable to find %s', module_path) + return None + + if hass.config.skip_pip or not hasattr(module, 'REQUIREMENTS'): + return module + + processed = hass.data.get(DATA_REQS) + if processed and module_name in processed: + return module + + processed = hass.data[DATA_REQS] = set() + + # https://github.com/python/mypy/issues/1424 + req_success = await requirements.async_process_requirements( + hass, module_path, module.REQUIREMENTS) # type: ignore + + if not req_success: + return None + + processed.add(module_name) + return module diff --git a/homeassistant/auth/mfa_modules/insecure_example.py b/homeassistant/auth/mfa_modules/insecure_example.py index 863d2ffe8f284..59b3f64d2e052 100644 --- a/homeassistant/auth/mfa_modules/insecure_example.py +++ b/homeassistant/auth/mfa_modules/insecure_example.py @@ -1,8 +1,11 @@ """Example auth module.""" import logging +from typing import Any, Dict, Optional import voluptuous as vol +from homeassistant.core import HomeAssistant + from . import MultiFactorAuthModule, MULTI_FACTOR_AUTH_MODULES, \ MULTI_FACTOR_AUTH_MODULE_SCHEMA @@ -22,22 +25,22 @@ class InsecureExampleModule(MultiFactorAuthModule): DEFAULT_TITLE = 'Insecure Personal Identify Number' - def __init__(self, hass, config): + def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None: """Initialize the user data store.""" super().__init__(hass, config) self._data = config['data'] @property - def input_schema(self): + def input_schema(self) -> vol.Schema: """Validate login flow input data.""" return vol.Schema({'pin': str}) @property - def setup_schema(self): + def setup_schema(self) -> Optional[vol.Schema]: """Validate async_setup_user input data.""" return vol.Schema({'pin': str}) - async def async_setup_user(self, user_id, setup_data): + async def async_setup_user(self, user_id: str, setup_data: Any) -> None: """Set up user to use mfa module.""" # data shall has been validate in caller pin = setup_data['pin'] @@ -50,7 +53,7 @@ async def async_setup_user(self, user_id, setup_data): self._data.append({'user_id': user_id, 'pin': pin}) - async def async_depose_user(self, user_id): + async def async_depose_user(self, user_id: str) -> None: """Remove user from mfa module.""" found = None for data in self._data: @@ -60,14 +63,15 @@ async def async_depose_user(self, user_id): if found: self._data.remove(found) - async def async_is_user_setup(self, user_id): + async def async_is_user_setup(self, user_id: str) -> bool: """Return whether user is setup.""" for data in self._data: if data['user_id'] == user_id: return True return False - async def async_validation(self, user_id, user_input): + async def async_validation( + self, user_id: str, user_input: Dict[str, Any]) -> bool: """Return True if validation passed.""" for data in self._data: if data['user_id'] == user_id: diff --git a/homeassistant/auth/providers/__init__.py b/homeassistant/auth/providers/__init__.py index 503d79fe458f1..e8ef7cbf3d4f8 100644 --- a/homeassistant/auth/providers/__init__.py +++ b/homeassistant/auth/providers/__init__.py @@ -1,5 +1,4 @@ """Auth providers for Home Assistant.""" -from collections import OrderedDict import importlib import logging import types @@ -15,7 +14,7 @@ from homeassistant.util.decorator import Registry from ..auth_store import AuthStore -from ..models import Credentials, UserMeta +from ..models import Credentials, User, UserMeta # noqa: F401 from ..mfa_modules import SESSION_EXPIRATION _LOGGER = logging.getLogger(__name__) @@ -167,11 +166,11 @@ class LoginFlow(data_entry_flow.FlowHandler): def __init__(self, auth_provider: AuthProvider) -> None: """Initialize the login flow.""" self._auth_provider = auth_provider - self._auth_module_id = None - self._auth_manager = auth_provider.hass.auth - self.available_mfa_modules = [] + self._auth_module_id = None # type: Optional[str] + self._auth_manager = auth_provider.hass.auth # type: ignore + self.available_mfa_modules = [] # type: List self.created_at = dt_util.utcnow() - self.user = None + self.user = None # type: Optional[User] async def async_step_init( self, user_input: Optional[Dict[str, str]] = None) \ @@ -183,7 +182,8 @@ async def async_step_init( """ raise NotImplementedError - async def async_step_select_mfa_module(self, user_input=None) \ + async def async_step_select_mfa_module( + self, user_input: Optional[Dict[str, str]] = None) \ -> Dict[str, Any]: """Handle the step of select mfa module.""" errors = {} @@ -199,16 +199,17 @@ async def async_step_select_mfa_module(self, user_input=None) \ self._auth_module_id = self.available_mfa_modules[0] return await self.async_step_mfa() - schema = OrderedDict() - schema['multi_factor_auth_module'] = vol.In(self.available_mfa_modules) - return self.async_show_form( step_id='select_mfa_module', - data_schema=vol.Schema(schema), + data_schema=vol.Schema({ + 'multi_factor_auth_module': vol.In(self.available_mfa_modules) + }), errors=errors, ) - async def async_step_mfa(self, user_input=None) -> Dict[str, Any]: + async def async_step_mfa( + self, user_input: Optional[Dict[str, str]] = None) \ + -> Dict[str, Any]: """Handle the step of mfa validation.""" errors = {} @@ -217,8 +218,7 @@ async def async_step_mfa(self, user_input=None) -> Dict[str, Any]: if auth_module is None: # Given an invalid input to async_step_select_mfa_module # will show invalid_auth_module error - return await self.async_step_select_mfa_module( - user_input={'multi_factor_auth_module': None}) + return await self.async_step_select_mfa_module(user_input={}) if user_input is not None: expires = self.created_at + SESSION_EXPIRATION @@ -226,7 +226,7 @@ async def async_step_mfa(self, user_input=None) -> Dict[str, Any]: errors['base'] = 'login_expired' else: result = await auth_module.async_validation( - self.user.id, user_input) + self.user.id, user_input) # type: ignore if not result: errors['base'] = 'invalid_auth'