-
-
Notifications
You must be signed in to change notification settings - Fork 31.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add multi-factor authentication modules (#15489)
* Get user after login flow finished * Add multi factor authentication support * Typings
- Loading branch information
Showing
18 changed files
with
925 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
"""Plugable auth modules for Home Assistant.""" | ||
from datetime import timedelta | ||
import importlib | ||
import logging | ||
import types | ||
from typing import Any, Dict, Optional | ||
|
||
import voluptuous as vol | ||
from voluptuous.humanize import humanize_error | ||
|
||
from homeassistant import requirements | ||
from homeassistant.const import CONF_ID, CONF_NAME, CONF_TYPE | ||
from homeassistant.core import HomeAssistant | ||
from homeassistant.util.decorator import Registry | ||
|
||
MULTI_FACTOR_AUTH_MODULES = Registry() | ||
|
||
MULTI_FACTOR_AUTH_MODULE_SCHEMA = vol.Schema({ | ||
vol.Required(CONF_TYPE): str, | ||
vol.Optional(CONF_NAME): str, | ||
# Specify ID if you have two mfa auth module for same type. | ||
vol.Optional(CONF_ID): str, | ||
}, extra=vol.ALLOW_EXTRA) | ||
|
||
SESSION_EXPIRATION = timedelta(minutes=5) | ||
|
||
DATA_REQS = 'mfa_auth_module_reqs_processed' | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class MultiFactorAuthModule: | ||
"""Multi-factor Auth Module of validation function.""" | ||
|
||
DEFAULT_TITLE = 'Unnamed auth module' | ||
|
||
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None: | ||
"""Initialize an auth module.""" | ||
self.hass = hass | ||
self.config = config | ||
|
||
@property | ||
def id(self) -> str: # pylint: disable=invalid-name | ||
"""Return id of the auth module. | ||
Default is same as type | ||
""" | ||
return self.config.get(CONF_ID, self.type) | ||
|
||
@property | ||
def type(self) -> str: | ||
"""Return type of the module.""" | ||
return self.config[CONF_TYPE] # type: ignore | ||
|
||
@property | ||
def name(self) -> str: | ||
"""Return the name of the auth module.""" | ||
return self.config.get(CONF_NAME, self.DEFAULT_TITLE) | ||
|
||
# Implement by extending class | ||
|
||
@property | ||
def input_schema(self) -> vol.Schema: | ||
"""Return a voluptuous schema to define mfa auth module's input.""" | ||
raise NotImplementedError | ||
|
||
@property | ||
def setup_schema(self) -> Optional[vol.Schema]: | ||
"""Return a vol schema to validate mfa auth module's setup input. | ||
Optional | ||
""" | ||
return None | ||
|
||
async def async_setup_user(self, user_id: str, setup_data: Any) -> None: | ||
"""Set up user for mfa auth module.""" | ||
raise NotImplementedError | ||
|
||
async def async_depose_user(self, user_id: str) -> None: | ||
"""Remove user from mfa module.""" | ||
raise NotImplementedError | ||
|
||
async def async_is_user_setup(self, user_id: str) -> bool: | ||
"""Return whether user is setup.""" | ||
raise NotImplementedError | ||
|
||
async def async_validation( | ||
self, user_id: str, user_input: Dict[str, Any]) -> bool: | ||
"""Return True if validation passed.""" | ||
raise NotImplementedError | ||
|
||
|
||
async def auth_mfa_module_from_config( | ||
hass: HomeAssistant, config: Dict[str, Any]) \ | ||
-> Optional[MultiFactorAuthModule]: | ||
"""Initialize an auth module from a config.""" | ||
module_name = config[CONF_TYPE] | ||
module = await _load_mfa_module(hass, module_name) | ||
|
||
if module is None: | ||
return None | ||
|
||
try: | ||
config = module.CONFIG_SCHEMA(config) # type: ignore | ||
except vol.Invalid as err: | ||
_LOGGER.error('Invalid configuration for multi-factor module %s: %s', | ||
module_name, humanize_error(config, err)) | ||
return None | ||
|
||
return MULTI_FACTOR_AUTH_MODULES[module_name](hass, config) # type: ignore | ||
|
||
|
||
async def _load_mfa_module(hass: HomeAssistant, module_name: str) \ | ||
-> Optional[types.ModuleType]: | ||
"""Load an mfa auth module.""" | ||
module_path = 'homeassistant.auth.mfa_modules.{}'.format(module_name) | ||
|
||
try: | ||
module = importlib.import_module(module_path) | ||
except ImportError: | ||
_LOGGER.warning('Unable to find %s', module_path) | ||
return None | ||
|
||
if hass.config.skip_pip or not hasattr(module, 'REQUIREMENTS'): | ||
return module | ||
|
||
processed = hass.data.get(DATA_REQS) | ||
if processed and module_name in processed: | ||
return module | ||
|
||
processed = hass.data[DATA_REQS] = set() | ||
|
||
# https://github.com/python/mypy/issues/1424 | ||
req_success = await requirements.async_process_requirements( | ||
hass, module_path, module.REQUIREMENTS) # type: ignore | ||
|
||
if not req_success: | ||
return None | ||
|
||
processed.add(module_name) | ||
return module |
Oops, something went wrong.