Skip to content

Commit

Permalink
Typings
Browse files Browse the repository at this point in the history
  • Loading branch information
awarecan committed Aug 22, 2018
1 parent 25b8ef3 commit c159810
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 99 deletions.
35 changes: 21 additions & 14 deletions homeassistant/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def auth_manager_from_config(
*[auth_mfa_module_from_config(hass, config)
for config in module_configs])
else:
modules = []
modules = ()
# So returned auth modules are in same order as config
module_hash = OrderedDict() # type: _MfaModuleDict
for module in modules:
Expand All @@ -78,7 +78,8 @@ class AuthManager:
"""Manage the authentication for Home Assistant."""

def __init__(self, hass: HomeAssistant, store: auth_store.AuthStore,
providers: _ProviderDict, mfa_modules) -> None:
providers: _ProviderDict, mfa_modules: _MfaModuleDict) \
-> None:
"""Initialize the auth manager."""
self.hass = hass
self._store = store
Expand Down Expand Up @@ -115,7 +116,8 @@ def auth_mfa_modules(self) -> List[MultiFactorAuthModule]:
"""Return a list of available auth modules."""
return list(self._mfa_modules.values())

def get_auth_mfa_module(self, module_id):
def get_auth_mfa_module(self, module_id: str) \
-> Optional[MultiFactorAuthModule]:
"""Return an multi-factor auth module, None if not found."""
return self._mfa_modules.get(module_id)

Expand All @@ -127,7 +129,8 @@ async def async_get_user(self, user_id: str) -> Optional[models.User]:
"""Retrieve a user."""
return await self._store.async_get_user(user_id)

async def async_get_user_by_credentials(self, credentials):
async def async_get_user_by_credentials(
self, credentials: models.Credentials) -> Optional[models.User]:
"""Get a user by credential, return None if not found."""
for user in await self.async_get_users():
for creds in user.credentials:
Expand Down Expand Up @@ -220,38 +223,42 @@ async def async_remove_credentials(

await self._store.async_remove_credentials(credentials)

async def async_enable_user_mfa(self, user, mfa_module_id, data):
async def async_enable_user_mfa(self, user: models.User,
mfa_module_id: str, data: Any) -> None:
"""Enable a multi-factor auth module for user."""
if mfa_module_id not in self._mfa_modules:
raise ValueError('Unable find multi-factor auth module: {}'
.format(mfa_module_id))
if user.system_generated:
raise ValueError('System generated users cannot enable '
'multi-factor auth module.')

module = self.get_auth_mfa_module(mfa_module_id)
if module is None:
raise ValueError('Unable find multi-factor auth module: {}'
.format(mfa_module_id))

if module.setup_schema is not None:
try:
# pylint: disable=not-callable
data = module.setup_schema(data)
except vol.Invalid as err:
raise ValueError('Data does not match schema: {}'.format(err))

return await module.async_setup_user(user.id, data)
await module.async_setup_user(user.id, data)

async def async_disable_user_mfa(self, user, mfa_module_id):
async def async_disable_user_mfa(self, user: models.User,
mfa_module_id: str) -> None:
"""Disable a multi-factor auth module for user."""
if mfa_module_id not in self._mfa_modules:
raise ValueError('Unable find multi-factor auth module: {}'
.format(mfa_module_id))
if user.system_generated:
raise ValueError('System generated users cannot disable '
'multi-factor auth module.')

module = self.get_auth_mfa_module(mfa_module_id)
if module is None:
raise ValueError('Unable find multi-factor auth module: {}'
.format(mfa_module_id))

await module.async_depose_user(user.id)

async def async_get_enabled_mfa(self, user):
async def async_get_enabled_mfa(self, user: models.User) -> List[str]:
"""List enabled mfa modules for user."""
module_ids = []
for module_id, module in self._mfa_modules.items():
Expand Down
130 changes: 67 additions & 63 deletions homeassistant/auth/mfa_modules/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""Plugable auth modules for Home Assistant."""
from datetime import timedelta
import importlib
import logging
from datetime import timedelta
import types
from typing import Any, Dict, Optional

import voluptuous as vol
from voluptuous.humanize import humanize_error

from homeassistant import requirements
from homeassistant.const import CONF_ID, CONF_NAME, CONF_TYPE
from homeassistant.core import HomeAssistant
from homeassistant.util.decorator import Registry

MULTI_FACTOR_AUTH_MODULES = Registry()
Expand All @@ -26,112 +29,113 @@
_LOGGER = logging.getLogger(__name__)


async def auth_mfa_module_from_config(hass, config):
"""Initialize an auth module from a config."""
module_name = config[CONF_TYPE]
module = await _load_mfa_module(hass, module_name)

if module is None:
return None

try:
config = module.CONFIG_SCHEMA(config)
except vol.Invalid as err:
_LOGGER.error('Invalid configuration for multi-factor module %s: %s',
module_name, humanize_error(config, err))
return None

return MULTI_FACTOR_AUTH_MODULES[module_name](hass, config)


async def _load_mfa_module(hass, module_name):
"""Load an mfa auth module."""
module_path = 'homeassistant.auth.mfa_modules.{}'.format(module_name)

try:
module = importlib.import_module(module_path)
except ImportError:
_LOGGER.warning('Unable to find %s', module_path)
return None

if hass.config.skip_pip or not hasattr(module, 'REQUIREMENTS'):
return module

processed = hass.data.get(DATA_REQS)
if processed and module_name in processed:
return module

hass.data[DATA_REQS] = set()

req_success = await requirements.async_process_requirements(
hass, module_path, module.REQUIREMENTS)

if not req_success:
return None

processed.add(module_name)
return module


class MultiFactorAuthModule:
"""Multi-factor Auth Module of validation function."""

DEFAULT_TITLE = 'Unnamed auth module'

def __init__(self, hass, config):
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
"""Initialize an auth module."""
self.hass = hass
self.config = config
_LOGGER.debug('auth mfa module %s loaded.',
self.type if self.id is None else "{}[{}]".format(
self.type, self.id
))

@property
def id(self): # pylint: disable=invalid-name
def id(self) -> str: # pylint: disable=invalid-name
"""Return id of the auth module.
Default is same as type
"""
return self.config.get(CONF_ID, self.type)

@property
def type(self):
def type(self) -> str:
"""Return type of the module."""
return self.config[CONF_TYPE]
return self.config[CONF_TYPE] # type: ignore

@property
def name(self):
def name(self) -> str:
"""Return the name of the auth module."""
return self.config.get(CONF_NAME, self.DEFAULT_TITLE)

# Implement by extending class

@property
def input_schema(self):
def input_schema(self) -> vol.Schema:
"""Return a voluptuous schema to define mfa auth module's input."""
raise NotImplementedError

@property
def setup_schema(self):
def setup_schema(self) -> Optional[vol.Schema]:
"""Return a vol schema to validate mfa auth module's setup input.
Optional
"""
return None

async def async_setup_user(self, user_id, setup_data):
async def async_setup_user(self, user_id: str, setup_data: Any) -> None:
"""Set up user for mfa auth module."""
raise NotImplementedError

async def async_depose_user(self, user_id):
async def async_depose_user(self, user_id: str) -> None:
"""Remove user from mfa module."""
raise NotImplementedError

async def async_is_user_setup(self, user_id):
async def async_is_user_setup(self, user_id: str) -> bool:
"""Return whether user is setup."""
raise NotImplementedError

async def async_validation(self, user_id, user_input):
async def async_validation(
self, user_id: str, user_input: Dict[str, Any]) -> bool:
"""Return True if validation passed."""
raise NotImplementedError


async def auth_mfa_module_from_config(
hass: HomeAssistant, config: Dict[str, Any]) \
-> Optional[MultiFactorAuthModule]:
"""Initialize an auth module from a config."""
module_name = config[CONF_TYPE]
module = await _load_mfa_module(hass, module_name)

if module is None:
return None

try:
config = module.CONFIG_SCHEMA(config) # type: ignore
except vol.Invalid as err:
_LOGGER.error('Invalid configuration for multi-factor module %s: %s',
module_name, humanize_error(config, err))
return None

return MULTI_FACTOR_AUTH_MODULES[module_name](hass, config) # type: ignore


async def _load_mfa_module(hass: HomeAssistant, module_name: str) \
-> Optional[types.ModuleType]:
"""Load an mfa auth module."""
module_path = 'homeassistant.auth.mfa_modules.{}'.format(module_name)

try:
module = importlib.import_module(module_path)
except ImportError:
_LOGGER.warning('Unable to find %s', module_path)
return None

if hass.config.skip_pip or not hasattr(module, 'REQUIREMENTS'):
return module

processed = hass.data.get(DATA_REQS)
if processed and module_name in processed:
return module

processed = hass.data[DATA_REQS] = set()

# https://github.com/python/mypy/issues/1424
req_success = await requirements.async_process_requirements(
hass, module_path, module.REQUIREMENTS) # type: ignore

if not req_success:
return None

processed.add(module_name)
return module
18 changes: 11 additions & 7 deletions homeassistant/auth/mfa_modules/insecure_example.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""Example auth module."""
import logging
from typing import Any, Dict, Optional

import voluptuous as vol

from homeassistant.core import HomeAssistant

from . import MultiFactorAuthModule, MULTI_FACTOR_AUTH_MODULES, \
MULTI_FACTOR_AUTH_MODULE_SCHEMA

Expand All @@ -22,22 +25,22 @@ class InsecureExampleModule(MultiFactorAuthModule):

DEFAULT_TITLE = 'Insecure Personal Identify Number'

def __init__(self, hass, config):
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
"""Initialize the user data store."""
super().__init__(hass, config)
self._data = config['data']

@property
def input_schema(self):
def input_schema(self) -> vol.Schema:
"""Validate login flow input data."""
return vol.Schema({'pin': str})

@property
def setup_schema(self):
def setup_schema(self) -> Optional[vol.Schema]:
"""Validate async_setup_user input data."""
return vol.Schema({'pin': str})

async def async_setup_user(self, user_id, setup_data):
async def async_setup_user(self, user_id: str, setup_data: Any) -> None:
"""Set up user to use mfa module."""
# data shall has been validate in caller
pin = setup_data['pin']
Expand All @@ -50,7 +53,7 @@ async def async_setup_user(self, user_id, setup_data):

self._data.append({'user_id': user_id, 'pin': pin})

async def async_depose_user(self, user_id):
async def async_depose_user(self, user_id: str) -> None:
"""Remove user from mfa module."""
found = None
for data in self._data:
Expand All @@ -60,14 +63,15 @@ async def async_depose_user(self, user_id):
if found:
self._data.remove(found)

async def async_is_user_setup(self, user_id):
async def async_is_user_setup(self, user_id: str) -> bool:
"""Return whether user is setup."""
for data in self._data:
if data['user_id'] == user_id:
return True
return False

async def async_validation(self, user_id, user_input):
async def async_validation(
self, user_id: str, user_input: Dict[str, Any]) -> bool:
"""Return True if validation passed."""
for data in self._data:
if data['user_id'] == user_id:
Expand Down
Loading

0 comments on commit c159810

Please sign in to comment.