-
-
Notifications
You must be signed in to change notification settings - Fork 31.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Time-based Onetime Password Multi-factor Auth
Add TOTP setup flow, generate QR code
- Loading branch information
Showing
10 changed files
with
435 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,209 @@ | ||
"""Time-based One Time Password auth module.""" | ||
import logging | ||
from typing import Any, Dict, Optional, Tuple # noqa: F401 | ||
|
||
import voluptuous as vol | ||
|
||
from homeassistant.auth.models import User | ||
from homeassistant.core import HomeAssistant | ||
from homeassistant.loader import bind_hass | ||
|
||
from . import MultiFactorAuthModule, MULTI_FACTOR_AUTH_MODULES, \ | ||
MULTI_FACTOR_AUTH_MODULE_SCHEMA, SetupFlow | ||
|
||
REQUIREMENTS = ['pyotp==2.2.6', 'PyQRCode==1.2.1', 'pypng==0.0.18'] | ||
|
||
CONFIG_SCHEMA = MULTI_FACTOR_AUTH_MODULE_SCHEMA.extend({ | ||
}, extra=vol.PREVENT_EXTRA) | ||
|
||
STORAGE_VERSION = 1 | ||
STORAGE_KEY = 'auth_module.totp' | ||
STORAGE_USERS = 'users' | ||
STORAGE_USER_ID = 'user_id' | ||
STORAGE_OTA_SECRET = 'ota_secret' | ||
|
||
INPUT_FIELD_CODE = 'code' | ||
|
||
DUMMY_SECRET = 'FPPTH34D4E3MI2HG' | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
def _generate_qr_code(data: str) -> str: | ||
"""Generate a base64 PNG string represent QR Code image of data.""" | ||
import pyqrcode | ||
|
||
qr_code = pyqrcode.create(data) | ||
return str(qr_code.png_as_base64_str(scale=4)) | ||
|
||
|
||
@bind_hass | ||
async def _async_generate_secret(hass: HomeAssistant, username: str) \ | ||
-> Tuple[str, str, str]: | ||
"""Generate a secret, url, and QR code.""" | ||
def generate_secret_helper(username: str) -> Tuple[str, str, str]: | ||
"""Generate a secret, url, and QR code.""" | ||
import pyotp | ||
|
||
ota_secret = pyotp.random_base32() | ||
url = pyotp.totp.TOTP(ota_secret).provisioning_uri( | ||
username, issuer_name="Home Assistant") | ||
image = _generate_qr_code(url) | ||
return ota_secret, url, image | ||
|
||
return await hass.async_add_executor_job( | ||
generate_secret_helper, username) | ||
|
||
|
||
@MULTI_FACTOR_AUTH_MODULES.register('totp') | ||
class TotpAuthModule(MultiFactorAuthModule): | ||
"""Auth module validate time-based one time password.""" | ||
|
||
DEFAULT_TITLE = 'Time-based One Time Password' | ||
|
||
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None: | ||
"""Initialize the user data store.""" | ||
super().__init__(hass, config) | ||
self._users = None # type: Optional[Dict[str, str]] | ||
self._user_store = hass.helpers.storage.Store( | ||
STORAGE_VERSION, STORAGE_KEY) | ||
|
||
@property | ||
def input_schema(self) -> vol.Schema: | ||
"""Validate login flow input data.""" | ||
return vol.Schema({INPUT_FIELD_CODE: str}) | ||
|
||
async def _async_load(self) -> None: | ||
"""Load stored data.""" | ||
data = await self._user_store.async_load() | ||
|
||
if data is None: | ||
data = {STORAGE_USERS: {}} | ||
|
||
self._users = data.get(STORAGE_USERS, {}) | ||
|
||
async def _async_save(self) -> None: | ||
"""Save data.""" | ||
await self._user_store.async_save({STORAGE_USERS: self._users}) | ||
|
||
def _add_ota_secret(self, user_id: str, | ||
secret: Optional[str] = None) -> str: | ||
"""Create a ota_secret for user.""" | ||
import pyotp | ||
|
||
ota_secret = secret or pyotp.random_base32() # type: str | ||
|
||
self._users[user_id] = ota_secret # type: ignore | ||
return ota_secret | ||
|
||
async def async_setup_flow(self, user_id: str) -> SetupFlow: | ||
"""Return a data entry flow handler for setup module. | ||
Mfa module should extend SetupFlow | ||
""" | ||
user = await self.hass.auth.async_get_user(user_id) # type: ignore | ||
return TotpSetupFlow(self, user) | ||
|
||
async def async_setup_user(self, user_id: str, setup_data: Any) -> str: | ||
"""Set up auth module for user.""" | ||
if self._users is None: | ||
await self._async_load() | ||
|
||
result = await self.hass.async_add_executor_job( | ||
self._add_ota_secret, user_id, setup_data.get('secret')) | ||
|
||
await self._async_save() | ||
return result | ||
|
||
async def async_depose_user(self, user_id: str) -> None: | ||
"""Depose auth module for user.""" | ||
if self._users is None: | ||
await self._async_load() | ||
|
||
if self._users.pop(user_id, None): # type: ignore | ||
await self._async_save() | ||
|
||
async def async_is_user_setup(self, user_id: str) -> bool: | ||
"""Return whether user is setup.""" | ||
if self._users is None: | ||
await self._async_load() | ||
|
||
return user_id in self._users # type: ignore | ||
|
||
async def async_validation( | ||
self, user_id: str, user_input: Dict[str, Any]) -> bool: | ||
"""Return True if validation passed.""" | ||
if self._users is None: | ||
await self._async_load() | ||
|
||
# user_input has been validate in caller | ||
return await self.hass.async_add_executor_job( | ||
self._validate_2fa, user_id, user_input[INPUT_FIELD_CODE]) | ||
|
||
def _validate_2fa(self, user_id: str, code: str) -> bool: | ||
"""Validate two factor authentication code.""" | ||
import pyotp | ||
|
||
ota_secret = self._users.get(user_id) # type: ignore | ||
if ota_secret is None: | ||
# even we cannot find user, we still do verify | ||
# to make timing the same as if user was found. | ||
pyotp.TOTP(DUMMY_SECRET).verify(code) | ||
return False | ||
|
||
return bool(pyotp.TOTP(ota_secret).verify(code)) | ||
|
||
|
||
class TotpSetupFlow(SetupFlow): | ||
"""Handler for the setup flow.""" | ||
|
||
def __init__(self, auth_module: TotpAuthModule, | ||
user: User) -> None: | ||
"""Initialize the setup flow.""" | ||
super().__init__(auth_module, user.id) | ||
self._auth_module = auth_module # type: TotpAuthModule | ||
self._user = user | ||
self._ota_secret = None # type: Optional[str] | ||
self._url = None # type Optional[str] | ||
self._image = None # type Optional[str] | ||
|
||
async def async_step_init( | ||
self, user_input: Optional[Dict[str, str]] = None) \ | ||
-> Dict[str, Any]: | ||
"""Handle the first step of setup flow. | ||
Return self.async_show_form(step_id='init') if user_input == None. | ||
Return self.async_create_entry(data={'result': result}) if finish. | ||
""" | ||
import pyotp | ||
|
||
errors = {} # type: Dict[str, str] | ||
|
||
if user_input: | ||
verified = await self.hass.async_add_executor_job( # type: ignore | ||
pyotp.TOTP(self._ota_secret).verify, user_input['code']) | ||
if verified: | ||
result = await self._auth_module.async_setup_user( | ||
self._user_id, {'secret': self._ota_secret}) | ||
return self.async_create_entry( | ||
title=self._auth_module.name, | ||
data={'result': result} | ||
) | ||
|
||
errors['base'] = 'invalid_code' | ||
|
||
else: | ||
self._ota_secret, self._url, self._image = ( # type: ignore | ||
await _async_generate_secret( | ||
self._auth_module.hass, str(self._user.name))) | ||
|
||
return self.async_show_form( | ||
step_id='init', | ||
data_schema=self._auth_module.input_schema, | ||
description_placeholders={ | ||
'code': self._ota_secret, | ||
'url': self._url, | ||
'qr_code': self._image | ||
}, | ||
errors=errors | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
{ | ||
"mfa_setup":{ | ||
"totp": { | ||
"title": "TOTP", | ||
"step": { | ||
"init": { | ||
"title": "Scan this QR code with your app", | ||
"description": "Scan the QR code with your authentication app, such as **Google Authenticator** or **Authy**. \nOr enter {code} instead. \n\n[QR Code]{url}", | ||
}, | ||
}, | ||
"error": { | ||
"invalid_code": "Invalid code, please try again. If you get this error consistently, please make sure the clock on Home Assistant system is accurate." | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
"""Auth component utils.""" | ||
from functools import wraps | ||
|
||
from homeassistant.components import websocket_api | ||
from homeassistant.core import HomeAssistant | ||
|
||
|
||
def validate_current_user( | ||
only_owner=False, only_system_user=False, allow_system_user=True, | ||
only_active_user=True, only_inactive_user=False): | ||
"""Decorate function validating login user exist in current WS connection. | ||
Will write out error message if not authenticated. | ||
""" | ||
def validator(func): | ||
"""Decorate func.""" | ||
@wraps(func) | ||
def check_current_user(hass: HomeAssistant, | ||
connection: websocket_api.ActiveConnection, | ||
msg): | ||
"""Check current user.""" | ||
def output_error(message_id, message): | ||
"""Output error message.""" | ||
connection.send_message_outside(websocket_api.error_message( | ||
msg['id'], message_id, message)) | ||
|
||
if connection.user is None: | ||
output_error('no_user', 'Not authenticated as a user') | ||
return | ||
|
||
if only_owner and not connection.user.is_owner: | ||
output_error('only_owner', 'Only allowed as owner') | ||
return | ||
|
||
if (only_system_user and | ||
not connection.user.system_generated): | ||
output_error('only_system_user', | ||
'Only allowed as system user') | ||
return | ||
|
||
if (not allow_system_user | ||
and connection.user.system_generated): | ||
output_error('not_system_user', 'Not allowed as system user') | ||
return | ||
|
||
if (only_active_user and | ||
not connection.user.is_active): | ||
output_error('only_active_user', | ||
'Only allowed as active user') | ||
return | ||
|
||
if only_inactive_user and connection.user.is_active: | ||
output_error('only_inactive_user', | ||
'Not allowed as active user') | ||
return | ||
|
||
return func(hass, connection, msg) | ||
|
||
return check_current_user | ||
|
||
return validator |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.