-
-
Notifications
You must be signed in to change notification settings - Fork 3.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(account): Encapsulated email verification logic
- Loading branch information
Showing
13 changed files
with
204 additions
and
176 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,105 +1,66 @@ | ||
import time | ||
from typing import Any, Dict, Optional, Tuple | ||
|
||
from django.contrib.auth import get_user_model | ||
from django.contrib import messages | ||
from django.http import HttpRequest | ||
from django.urls import reverse | ||
|
||
from allauth.account import app_settings | ||
from allauth.account import app_settings, signals | ||
from allauth.account.adapter import get_adapter | ||
from allauth.account.internal.flows.login_by_code import compare_code | ||
from allauth.account.models import EmailAddress, EmailConfirmationMixin | ||
from allauth.core import context | ||
|
||
|
||
EMAIL_VERIFICATION_CODE_SESSION_KEY = "account_email_verification_code" | ||
|
||
|
||
class EmailVerificationModel(EmailConfirmationMixin): | ||
def __init__(self, email_address: EmailAddress, key: Optional[str] = None): | ||
self.email_address = email_address | ||
if not key: | ||
key = request_email_verification_code( | ||
context.request, user=email_address.user, email=email_address.email | ||
) | ||
self.key = key | ||
from allauth.account.models import EmailAddress | ||
from allauth.core.internal.httpkit import get_frontend_url | ||
from allauth.utils import build_absolute_uri | ||
|
||
@classmethod | ||
def create(cls, email_address: EmailAddress): | ||
return EmailVerificationModel(email_address) | ||
|
||
@classmethod | ||
def from_key(cls, key): | ||
verification, _ = get_pending_verification(context.request, peek=True) | ||
if not verification or not compare_code(actual=key, expected=verification.key): | ||
return None | ||
return verification | ||
def verify_email(request: HttpRequest, email_address: EmailAddress) -> bool: | ||
""" | ||
Marks the email address as confirmed on the db | ||
""" | ||
from allauth.account.models import EmailAddress | ||
from allauth.account.utils import emit_email_changed | ||
|
||
def key_expired(self): | ||
return False | ||
|
||
|
||
def request_email_verification_code( | ||
request: HttpRequest, | ||
user, | ||
email: str, | ||
) -> str: | ||
code = "" | ||
pending_verification = { | ||
"at": time.time(), | ||
"failed_attempts": 0, | ||
"email": email, | ||
} | ||
pretend = user is None | ||
if not pretend: | ||
adapter = get_adapter() | ||
code = adapter.generate_email_verification_code() | ||
assert user._meta.pk | ||
pending_verification.update( | ||
{ | ||
"user_id": user._meta.pk.value_to_string(user), | ||
"email": email, | ||
"code": code, | ||
} | ||
from_email_address = ( | ||
EmailAddress.objects.filter(user_id=email_address.user_id) | ||
.exclude(pk=email_address.pk) | ||
.first() | ||
) | ||
if not email_address.set_verified(commit=False): | ||
get_adapter(request).add_message( | ||
request, | ||
messages.ERROR, | ||
"account/messages/email_confirmation_failed.txt", | ||
{"email": email_address.email}, | ||
) | ||
request.session[EMAIL_VERIFICATION_CODE_SESSION_KEY] = pending_verification | ||
return code | ||
return False | ||
email_address.set_as_primary(conditional=(not app_settings.CHANGE_EMAIL)) | ||
email_address.save() | ||
if app_settings.CHANGE_EMAIL: | ||
for instance in EmailAddress.objects.filter( | ||
user_id=email_address.user_id | ||
).exclude(pk=email_address.pk): | ||
instance.remove() | ||
emit_email_changed(request, from_email_address, email_address) | ||
|
||
signals.email_confirmed.send( | ||
sender=EmailAddress, | ||
request=request, | ||
email_address=email_address, | ||
) | ||
get_adapter(request).add_message( | ||
request, | ||
messages.SUCCESS, | ||
"account/messages/email_confirmed.txt", | ||
{"email": email_address.email}, | ||
) | ||
return True | ||
|
||
def get_pending_verification( | ||
request: HttpRequest, peek: bool = False | ||
) -> Tuple[Optional[EmailVerificationModel], Optional[Dict[str, Any]]]: | ||
if peek: | ||
data = request.session.get(EMAIL_VERIFICATION_CODE_SESSION_KEY) | ||
else: | ||
data = request.session.pop(EMAIL_VERIFICATION_CODE_SESSION_KEY, None) | ||
if not data: | ||
return None, None | ||
if time.time() - data["at"] >= app_settings.EMAIL_VERIFICATION_BY_CODE_TIMEOUT: | ||
request.session.pop(EMAIL_VERIFICATION_CODE_SESSION_KEY, None) | ||
return None, None | ||
if user_id_str := data.get("user_id"): | ||
user_id = get_user_model()._meta.pk.to_python(user_id_str) # type: ignore[union-attr] | ||
user = get_user_model().objects.get(pk=user_id) | ||
email = data["email"] | ||
try: | ||
email_address = EmailAddress.objects.get_for_user(user, email) | ||
except EmailAddress.DoesNotExist: | ||
email_address = EmailAddress(user=user, email=email) | ||
verification = EmailVerificationModel(email_address, key=data["code"]) | ||
else: | ||
verification = None | ||
return verification, data | ||
|
||
def get_email_verification_url(request: HttpRequest, emailconfirmation) -> str: | ||
"""Constructs the email confirmation (activation) url. | ||
def record_invalid_attempt( | ||
request: HttpRequest, pending_verification: Dict[str, Any] | ||
) -> bool: | ||
n = pending_verification["failed_attempts"] | ||
n += 1 | ||
pending_verification["failed_attempts"] = n | ||
if n >= app_settings.EMAIL_VERIFICATION_BY_CODE_MAX_ATTEMPTS: | ||
request.session.pop(EMAIL_VERIFICATION_CODE_SESSION_KEY, None) | ||
return False | ||
else: | ||
request.session[EMAIL_VERIFICATION_CODE_SESSION_KEY] = pending_verification | ||
return True | ||
Note that if you have architected your system such that email | ||
confirmations are sent outside of the request context `request` | ||
can be `None` here. | ||
""" | ||
url = get_frontend_url(request, "account_confirm_email", key=emailconfirmation.key) | ||
if not url: | ||
url = reverse("account_confirm_email", args=[emailconfirmation.key]) | ||
url = build_absolute_uri(request, url) | ||
return url |
105 changes: 105 additions & 0 deletions
105
allauth/account/internal/flows/email_verification_by_code.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
import time | ||
from typing import Any, Dict, Optional, Tuple | ||
|
||
from django.contrib.auth import get_user_model | ||
from django.http import HttpRequest | ||
|
||
from allauth.account import app_settings | ||
from allauth.account.adapter import get_adapter | ||
from allauth.account.internal.flows.login_by_code import compare_code | ||
from allauth.account.models import EmailAddress, EmailConfirmationMixin | ||
from allauth.core import context | ||
|
||
|
||
EMAIL_VERIFICATION_CODE_SESSION_KEY = "account_email_verification_code" | ||
|
||
|
||
class EmailVerificationModel(EmailConfirmationMixin): | ||
def __init__(self, email_address: EmailAddress, key: Optional[str] = None): | ||
self.email_address = email_address | ||
if not key: | ||
key = request_email_verification_code( | ||
context.request, user=email_address.user, email=email_address.email | ||
) | ||
self.key = key | ||
|
||
@classmethod | ||
def create(cls, email_address: EmailAddress): | ||
return EmailVerificationModel(email_address) | ||
|
||
@classmethod | ||
def from_key(cls, key): | ||
verification, _ = get_pending_verification(context.request, peek=True) | ||
if not verification or not compare_code(actual=key, expected=verification.key): | ||
return None | ||
return verification | ||
|
||
def key_expired(self): | ||
return False | ||
|
||
|
||
def request_email_verification_code( | ||
request: HttpRequest, | ||
user, | ||
email: str, | ||
) -> str: | ||
code = "" | ||
pending_verification = { | ||
"at": time.time(), | ||
"failed_attempts": 0, | ||
"email": email, | ||
} | ||
pretend = user is None | ||
if not pretend: | ||
adapter = get_adapter() | ||
code = adapter.generate_email_verification_code() | ||
assert user._meta.pk | ||
pending_verification.update( | ||
{ | ||
"user_id": user._meta.pk.value_to_string(user), | ||
"email": email, | ||
"code": code, | ||
} | ||
) | ||
request.session[EMAIL_VERIFICATION_CODE_SESSION_KEY] = pending_verification | ||
return code | ||
|
||
|
||
def get_pending_verification( | ||
request: HttpRequest, peek: bool = False | ||
) -> Tuple[Optional[EmailVerificationModel], Optional[Dict[str, Any]]]: | ||
if peek: | ||
data = request.session.get(EMAIL_VERIFICATION_CODE_SESSION_KEY) | ||
else: | ||
data = request.session.pop(EMAIL_VERIFICATION_CODE_SESSION_KEY, None) | ||
if not data: | ||
return None, None | ||
if time.time() - data["at"] >= app_settings.EMAIL_VERIFICATION_BY_CODE_TIMEOUT: | ||
request.session.pop(EMAIL_VERIFICATION_CODE_SESSION_KEY, None) | ||
return None, None | ||
if user_id_str := data.get("user_id"): | ||
user_id = get_user_model()._meta.pk.to_python(user_id_str) # type: ignore[union-attr] | ||
user = get_user_model().objects.get(pk=user_id) | ||
email = data["email"] | ||
try: | ||
email_address = EmailAddress.objects.get_for_user(user, email) | ||
except EmailAddress.DoesNotExist: | ||
email_address = EmailAddress(user=user, email=email) | ||
verification = EmailVerificationModel(email_address, key=data["code"]) | ||
else: | ||
verification = None | ||
return verification, data | ||
|
||
|
||
def record_invalid_attempt( | ||
request: HttpRequest, pending_verification: Dict[str, Any] | ||
) -> bool: | ||
n = pending_verification["failed_attempts"] | ||
n += 1 | ||
pending_verification["failed_attempts"] = n | ||
if n >= app_settings.EMAIL_VERIFICATION_BY_CODE_MAX_ATTEMPTS: | ||
request.session.pop(EMAIL_VERIFICATION_CODE_SESSION_KEY, None) | ||
return False | ||
else: | ||
request.session[EMAIL_VERIFICATION_CODE_SESSION_KEY] = pending_verification | ||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.