Skip to content

Commit

Permalink
Typings
Browse files Browse the repository at this point in the history
  • Loading branch information
awarecan committed Aug 22, 2018
1 parent 25b8ef3 commit a1cf221
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 87 deletions.
18 changes: 11 additions & 7 deletions homeassistant/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class AuthManager:
"""Manage the authentication for Home Assistant."""

def __init__(self, hass: HomeAssistant, store: auth_store.AuthStore,
providers: _ProviderDict, mfa_modules) -> None:
providers: _ProviderDict, mfa_modules: _MfaModuleDict) \
-> None:
"""Initialize the auth manager."""
self.hass = hass
self._store = store
Expand Down Expand Up @@ -115,7 +116,7 @@ def auth_mfa_modules(self) -> List[MultiFactorAuthModule]:
"""Return a list of available auth modules."""
return list(self._mfa_modules.values())

def get_auth_mfa_module(self, module_id):
def get_auth_mfa_module(self, module_id: str) -> MultiFactorAuthModule:
"""Return an multi-factor auth module, None if not found."""
return self._mfa_modules.get(module_id)

Expand All @@ -127,7 +128,8 @@ async def async_get_user(self, user_id: str) -> Optional[models.User]:
"""Retrieve a user."""
return await self._store.async_get_user(user_id)

async def async_get_user_by_credentials(self, credentials):
async def async_get_user_by_credentials(
self, credentials: models.Credentials) -> Optional[models.User]:
"""Get a user by credential, return None if not found."""
for user in await self.async_get_users():
for creds in user.credentials:
Expand Down Expand Up @@ -220,7 +222,8 @@ async def async_remove_credentials(

await self._store.async_remove_credentials(credentials)

async def async_enable_user_mfa(self, user, mfa_module_id, data):
async def async_enable_user_mfa(self, user: models.User,
mfa_module_id: str, data: Any) -> None:
"""Enable a multi-factor auth module for user."""
if mfa_module_id not in self._mfa_modules:
raise ValueError('Unable find multi-factor auth module: {}'
Expand All @@ -237,9 +240,10 @@ async def async_enable_user_mfa(self, user, mfa_module_id, data):
except vol.Invalid as err:
raise ValueError('Data does not match schema: {}'.format(err))

return await module.async_setup_user(user.id, data)
await module.async_setup_user(user.id, data)

async def async_disable_user_mfa(self, user, mfa_module_id):
async def async_disable_user_mfa(self, user: models.User,
mfa_module_id: str) -> None:
"""Disable a multi-factor auth module for user."""
if mfa_module_id not in self._mfa_modules:
raise ValueError('Unable find multi-factor auth module: {}'
Expand All @@ -251,7 +255,7 @@ async def async_disable_user_mfa(self, user, mfa_module_id):
module = self.get_auth_mfa_module(mfa_module_id)
await module.async_depose_user(user.id)

async def async_get_enabled_mfa(self, user):
async def async_get_enabled_mfa(self, user: models.User) -> List[str]:
"""List enabled mfa modules for user."""
module_ids = []
for module_id, module in self._mfa_modules.items():
Expand Down
126 changes: 64 additions & 62 deletions homeassistant/auth/mfa_modules/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""Plugable auth modules for Home Assistant."""
from datetime import timedelta
import importlib
import logging
from datetime import timedelta
import types
from typing import Any, Dict, Optional

import voluptuous as vol
from voluptuous.humanize import humanize_error

from homeassistant import requirements
from homeassistant.const import CONF_ID, CONF_NAME, CONF_TYPE
from homeassistant.core import HomeAssistant
from homeassistant.util.decorator import Registry

MULTI_FACTOR_AUTH_MODULES = Registry()
Expand All @@ -26,112 +29,111 @@
_LOGGER = logging.getLogger(__name__)


async def auth_mfa_module_from_config(hass, config):
"""Initialize an auth module from a config."""
module_name = config[CONF_TYPE]
module = await _load_mfa_module(hass, module_name)

if module is None:
return None

try:
config = module.CONFIG_SCHEMA(config)
except vol.Invalid as err:
_LOGGER.error('Invalid configuration for multi-factor module %s: %s',
module_name, humanize_error(config, err))
return None

return MULTI_FACTOR_AUTH_MODULES[module_name](hass, config)


async def _load_mfa_module(hass, module_name):
"""Load an mfa auth module."""
module_path = 'homeassistant.auth.mfa_modules.{}'.format(module_name)

try:
module = importlib.import_module(module_path)
except ImportError:
_LOGGER.warning('Unable to find %s', module_path)
return None

if hass.config.skip_pip or not hasattr(module, 'REQUIREMENTS'):
return module

processed = hass.data.get(DATA_REQS)
if processed and module_name in processed:
return module

hass.data[DATA_REQS] = set()

req_success = await requirements.async_process_requirements(
hass, module_path, module.REQUIREMENTS)

if not req_success:
return None

processed.add(module_name)
return module


class MultiFactorAuthModule:
"""Multi-factor Auth Module of validation function."""

DEFAULT_TITLE = 'Unnamed auth module'

def __init__(self, hass, config):
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
"""Initialize an auth module."""
self.hass = hass
self.config = config
_LOGGER.debug('auth mfa module %s loaded.',
self.type if self.id is None else "{}[{}]".format(
self.type, self.id
))

@property
def id(self): # pylint: disable=invalid-name
def id(self) -> str: # pylint: disable=invalid-name
"""Return id of the auth module.
Default is same as type
"""
return self.config.get(CONF_ID, self.type)

@property
def type(self):
def type(self) -> str:
"""Return type of the module."""
return self.config[CONF_TYPE]

@property
def name(self):
def name(self) -> str:
"""Return the name of the auth module."""
return self.config.get(CONF_NAME, self.DEFAULT_TITLE)

# Implement by extending class

@property
def input_schema(self):
def input_schema(self) -> vol.Schema:
"""Return a voluptuous schema to define mfa auth module's input."""
raise NotImplementedError

@property
def setup_schema(self):
def setup_schema(self) -> Optional[vol.Schema]:
"""Return a vol schema to validate mfa auth module's setup input.
Optional
"""
return None

async def async_setup_user(self, user_id, setup_data):
async def async_setup_user(self, user_id: str, setup_data: Any) -> None:
"""Set up user for mfa auth module."""
raise NotImplementedError

async def async_depose_user(self, user_id):
async def async_depose_user(self, user_id: str) -> None:
"""Remove user from mfa module."""
raise NotImplementedError

async def async_is_user_setup(self, user_id):
async def async_is_user_setup(self, user_id: str) -> bool:
"""Return whether user is setup."""
raise NotImplementedError

async def async_validation(self, user_id, user_input):
async def async_validation(self, user_id: str, user_input) -> bool:
"""Return True if validation passed."""
raise NotImplementedError


async def auth_mfa_module_from_config(
hass: HomeAssistant, config: Dict[str, Any]) \
-> Optional[MultiFactorAuthModule]:
"""Initialize an auth module from a config."""
module_name = config[CONF_TYPE]
module = await _load_mfa_module(hass, module_name)

if module is None:
return None

try:
config = module.CONFIG_SCHEMA(config) # type: ignore
except vol.Invalid as err:
_LOGGER.error('Invalid configuration for multi-factor module %s: %s',
module_name, humanize_error(config, err))
return None

return MULTI_FACTOR_AUTH_MODULES[module_name](hass, config) # type: ignore


async def _load_mfa_module(hass: HomeAssistant, module_name: str) \
-> Optional[types.ModuleType]:
"""Load an mfa auth module."""
module_path = 'homeassistant.auth.mfa_modules.{}'.format(module_name)

try:
module = importlib.import_module(module_path)
except ImportError:
_LOGGER.warning('Unable to find %s', module_path)
return None

if hass.config.skip_pip or not hasattr(module, 'REQUIREMENTS'):
return module

processed = hass.data.get(DATA_REQS)
if processed and module_name in processed:
return module

hass.data[DATA_REQS] = set()

req_success = await requirements.async_process_requirements(
hass, module_path, module.REQUIREMENTS)

if not req_success:
return None

processed.add(module_name)
return module
17 changes: 10 additions & 7 deletions homeassistant/auth/mfa_modules/insecure_example.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""Example auth module."""
import logging
from typing import Any, Dict, Optional

import voluptuous as vol

from homeassistant.core import HomeAssistant

from . import MultiFactorAuthModule, MULTI_FACTOR_AUTH_MODULES, \
MULTI_FACTOR_AUTH_MODULE_SCHEMA

Expand All @@ -22,22 +25,22 @@ class InsecureExampleModule(MultiFactorAuthModule):

DEFAULT_TITLE = 'Insecure Personal Identify Number'

def __init__(self, hass, config):
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
"""Initialize the user data store."""
super().__init__(hass, config)
self._data = config['data']

@property
def input_schema(self):
def input_schema(self) -> vol.Schema:
"""Validate login flow input data."""
return vol.Schema({'pin': str})

@property
def setup_schema(self):
def setup_schema(self) -> Optional[vol.Schema]:
"""Validate async_setup_user input data."""
return vol.Schema({'pin': str})

async def async_setup_user(self, user_id, setup_data):
async def async_setup_user(self, user_id: str, setup_data: Any) -> None:
"""Set up user to use mfa module."""
# data shall has been validate in caller
pin = setup_data['pin']
Expand All @@ -50,7 +53,7 @@ async def async_setup_user(self, user_id, setup_data):

self._data.append({'user_id': user_id, 'pin': pin})

async def async_depose_user(self, user_id):
async def async_depose_user(self, user_id: str) -> None:
"""Remove user from mfa module."""
found = None
for data in self._data:
Expand All @@ -60,14 +63,14 @@ async def async_depose_user(self, user_id):
if found:
self._data.remove(found)

async def async_is_user_setup(self, user_id):
async def async_is_user_setup(self, user_id: str) -> bool:
"""Return whether user is setup."""
for data in self._data:
if data['user_id'] == user_id:
return True
return False

async def async_validation(self, user_id, user_input):
async def async_validation(self, user_id: str, user_input) -> bool:
"""Return True if validation passed."""
for data in self._data:
if data['user_id'] == user_id:
Expand Down
24 changes: 13 additions & 11 deletions homeassistant/auth/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from homeassistant.util.decorator import Registry

from ..auth_store import AuthStore
from ..models import Credentials, UserMeta
from ..models import Credentials, User, UserMeta # noqa: F401
from ..mfa_modules import SESSION_EXPIRATION

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -168,10 +168,10 @@ def __init__(self, auth_provider: AuthProvider) -> None:
"""Initialize the login flow."""
self._auth_provider = auth_provider
self._auth_module_id = None
self._auth_manager = auth_provider.hass.auth
self.available_mfa_modules = []
self._auth_manager = auth_provider.hass.auth # type: ignore
self.available_mfa_modules = [] # type: List
self.created_at = dt_util.utcnow()
self.user = None
self.user = None # type: Optional[User]

async def async_step_init(
self, user_input: Optional[Dict[str, str]] = None) \
Expand All @@ -183,7 +183,8 @@ async def async_step_init(
"""
raise NotImplementedError

async def async_step_select_mfa_module(self, user_input=None) \
async def async_step_select_mfa_module(
self, user_input: Optional[Dict[str, str]] = None) \
-> Dict[str, Any]:
"""Handle the step of select mfa module."""
errors = {}
Expand All @@ -208,24 +209,25 @@ async def async_step_select_mfa_module(self, user_input=None) \
errors=errors,
)

async def async_step_mfa(self, user_input=None) -> Dict[str, Any]:
async def async_step_mfa(
self, user_input: Optional[Dict[str, str]] = None) \
-> Dict[str, Any]:
"""Handle the step of mfa validation."""
errors = {}

auth_module = self._auth_manager.get_auth_mfa_module(
auth_module = self._auth_manager.get_auth_mfa_module( # type: ignore
self._auth_module_id)
if auth_module is None:
# Given an invalid input to async_step_select_mfa_module
# will show invalid_auth_module error
return await self.async_step_select_mfa_module(
user_input={'multi_factor_auth_module': None})
return await self.async_step_select_mfa_module(user_input={})

if user_input is not None:
expires = self.created_at + SESSION_EXPIRATION
if dt_util.utcnow() > expires:
errors['base'] = 'login_expired'
else:
result = await auth_module.async_validation(
result = await auth_module.async_validation( # type: ignore
self.user.id, user_input)
if not result:
errors['base'] = 'invalid_auth'
Expand All @@ -235,7 +237,7 @@ async def async_step_mfa(self, user_input=None) -> Dict[str, Any]:

return self.async_show_form(
step_id='mfa',
data_schema=auth_module.input_schema,
data_schema=auth_module.input_schema, # type: ignore
errors=errors,
)

Expand Down

0 comments on commit a1cf221

Please sign in to comment.