diff --git a/ckan/lib/authenticator.py b/ckan/lib/authenticator.py index 8ef5941aa54..fe25fb64e5d 100644 --- a/ckan/lib/authenticator.py +++ b/ckan/lib/authenticator.py @@ -1,8 +1,8 @@ -# encoding: utf-8 +from __future__ import annotations import logging -import ckan.plugins as plugins -from typing import Any, Mapping, Optional +from ckan import model, plugins +from typing import Any from ckan.common import g, request from ckan.lib import captcha @@ -12,7 +12,9 @@ log = logging.getLogger(__name__) -def default_authenticate(identity: 'Mapping[str, Any]') -> Optional["User"]: +def default_authenticate( + identity: dict[str, Any] +) -> model.User | model.AnonymousUser | None: if not ('login' in identity and 'password' in identity): return None @@ -47,7 +49,9 @@ def default_authenticate(identity: 'Mapping[str, Any]') -> Optional["User"]: return None -def ckan_authenticator(identity: 'Mapping[str, Any]') -> Optional["User"]: +def ckan_authenticator( + identity: dict[str, Any] +) -> model.User | model.AnonymousUser | None: """Allows extensions that have implemented `IAuthenticator.authenticate()` to hook into the CKAN authentication process with a custom implementation. diff --git a/ckan/plugins/interfaces.py b/ckan/plugins/interfaces.py index de0bea22bff..fbc1063d347 100644 --- a/ckan/plugins/interfaces.py +++ b/ckan/plugins/interfaces.py @@ -1750,13 +1750,16 @@ def abort( return (status_code, detail, headers, comment) def authenticate( - self, identity: 'Mapping[str, Any]' - ) -> model.User | None: + self, identity: dict[str, Any] + ) -> model.User | model.AnonymousUser | None: """Called before the authentication starts (that is after clicking the login button) - Plugins should return a user object if the authentication was - successful, or ``None``` otherwise. + Plugins should return: + + * `model.User` object if the authentication was successful + * `model.AnonymousUser` object if the authentication failed + * `None` to try authentication with different implementations. """ diff --git a/ckanext/example_iauthenticator/plugin.py b/ckanext/example_iauthenticator/plugin.py index 1ba9edab093..5afcc961a58 100644 --- a/ckanext/example_iauthenticator/plugin.py +++ b/ckanext/example_iauthenticator/plugin.py @@ -1,8 +1,9 @@ -# encoding: utf-8 +from __future__ import annotations +from typing import Any from flask import Blueprint, make_response -from ckan import plugins as p +from ckan import model, plugins as p toolkit = p.toolkit @@ -52,6 +53,14 @@ def logout(self): return toolkit.redirect_to(u'example_iauthenticator.custom_logout') + def authenticate( + self, identity: dict[str, Any] + ) -> model.User | model.AnonymousUser | None: + if identity.get("use_fallback"): + return None + + return model.AnonymousUser() + # IBlueprint def get_blueprint(self): diff --git a/ckanext/example_iauthenticator/tests/test_example_iauthenticator.py b/ckanext/example_iauthenticator/tests/test_example_iauthenticator.py index f5e5e4eb071..af3b387bc28 100644 --- a/ckanext/example_iauthenticator/tests/test_example_iauthenticator.py +++ b/ckanext/example_iauthenticator/tests/test_example_iauthenticator.py @@ -1,8 +1,12 @@ # encoding: utf-8 +from typing import Any import pytest +from faker import Faker import ckan.plugins as p +from ckan import model +from ckan.lib.authenticator import ckan_authenticator toolkit = p.toolkit @@ -36,3 +40,35 @@ def test_logout_redirects(self, app): assert resp.status_code == 302 assert resp.headers[u'Location'] == toolkit.url_for(u'example_iauthenticator.custom_logout', _external=True) + + @pytest.mark.usefixtures("with_request_context") + def test_fallback_authentication(self, user_factory: Any, faker: Faker): + """If IAuthenticator.authenticate returns `None`, application tries + other authenticators. + + """ + + password = faker.password() + user = user_factory(password=password) + result = ckan_authenticator({ + "login": user["name"], + "password": password, + "use_fallback": True + }) + assert result is not None + assert result.name == user["name"] + + @pytest.mark.usefixtures("with_request_context") + def test_rejected_authentication(self, user_factory: Any, faker: Faker): + """If IAuthenticator.authenticate returns `model.AnonymousUser`, + application ignores other implementations. + + """ + + password = faker.password() + user = user_factory(password=password) + result = ckan_authenticator({ + "login": user["name"], + "password": password + }) + assert isinstance(result, model.AnonymousUser)