diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 380d968afa1..8de1d048af2 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1600,6 +1600,20 @@ webserver: type: boolean example: ~ default: "False" + auth_rate_limited: + description: | + Boolean for enabling rate limiting on authentication endpoints. + version_added: 2.6.0 + type: boolean + example: ~ + default: "True" + auth_rate_limit: + description: | + Rate limit for authentication endpoints. + version_added: 2.6.0 + type: string + example: ~ + default: "5 per 40 second" email: description: | diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index bac6870f726..771f2f5faf1 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -824,6 +824,12 @@ enable_swagger_ui = True # Boolean for running Internal API in the webserver. run_internal_api = False +# Boolean for enabling rate limiting on authentication endpoints. +auth_rate_limited = True + +# Rate limit for authentication endpoints. +auth_rate_limit = 5 per 40 second + [email] # Configuration email backend and whether to diff --git a/airflow/www/extensions/init_appbuilder.py b/airflow/www/extensions/init_appbuilder.py index fe1ae0938d4..dbd2d939382 100644 --- a/airflow/www/extensions/init_appbuilder.py +++ b/airflow/www/extensions/init_appbuilder.py @@ -125,6 +125,8 @@ def __init__( static_url_path="/appbuilder", security_manager_class=None, update_perms=conf.getboolean("webserver", "UPDATE_FAB_PERMS"), + auth_rate_limited=conf.getboolean("webserver", "AUTH_RATE_LIMITED", fallback=True), + auth_rate_limit=conf.get("webserver", "AUTH_RATE_LIMIT", fallback="5 per 40 second"), ): """ App-builder constructor. @@ -146,6 +148,10 @@ def __init__( :param update_perms: optional, update permissions flag (Boolean) you can use FAB_UPDATE_PERMS config key also + :param auth_rate_limited: + optional, rate limit authentication attempts if set to True (defaults to True) + :param auth_rate_limit: + optional, rate limit authentication attempts configuration (defaults "to 5 per 40 second") """ self.baseviews = [] self._addon_managers = [] @@ -158,6 +164,8 @@ def __init__( self.static_url_path = static_url_path self.app = app self.update_perms = update_perms + self.auth_rate_limited = auth_rate_limited + self.auth_rate_limit = auth_rate_limit if app is not None: self.init_app(app, session) @@ -172,10 +180,13 @@ def init_app(self, app, session): app.config.setdefault("APP_ICON", "") app.config.setdefault("LANGUAGES", {"en": {"flag": "gb", "name": "English"}}) app.config.setdefault("ADDON_MANAGERS", []) + app.config.setdefault("RATELIMIT_ENABLED", self.auth_rate_limited) app.config.setdefault("FAB_API_MAX_PAGE_SIZE", 100) app.config.setdefault("FAB_BASE_TEMPLATE", self.base_template) app.config.setdefault("FAB_STATIC_FOLDER", self.static_folder) app.config.setdefault("FAB_STATIC_URL_PATH", self.static_url_path) + app.config.setdefault("AUTH_RATE_LIMITED", self.auth_rate_limited) + app.config.setdefault("AUTH_RATE_LIMIT", self.auth_rate_limit) self.app = app @@ -433,6 +444,7 @@ def add_view( if self.app: self.register_blueprint(baseview) self._add_permission(baseview) + self.add_limits(baseview) self.add_link( name=name, href=href, @@ -563,6 +575,11 @@ def security_converge(self, dry=False) -> dict: """ return self.sm.security_converge(self.baseviews, self.menu, dry) + def get_url_for_login_with(self, next_url: str | None = None) -> str: + if self.sm.auth_view is None: + return "" + return url_for(f"{self.sm.auth_view.endpoint}.{'login'}", next=next_url) + @property def get_url_for_login(self): return url_for(f"{self.sm.auth_view.endpoint}.login") @@ -585,6 +602,10 @@ def get_url_for_locale(self, lang): locale=lang, ) + def add_limits(self, baseview) -> None: + if hasattr(baseview, "limits"): + self.sm.add_limit_view(baseview) + def add_permissions(self, update_perms=False): if self.update_perms or update_perms: for baseview in self.baseviews: @@ -653,4 +674,6 @@ def init_appbuilder(app) -> AirflowAppBuilder: security_manager_class=security_manager_class, base_template="airflow/main.html", update_perms=conf.getboolean("webserver", "UPDATE_FAB_PERMS"), + auth_rate_limited=conf.getboolean("webserver", "AUTH_RATE_LIMITED", fallback=True), + auth_rate_limit=conf.get("webserver", "AUTH_RATE_LIMIT", fallback="5 per 40 second"), ) diff --git a/airflow/www/fab_security/manager.py b/airflow/www/fab_security/manager.py index a23315b8141..8f0363186bf 100644 --- a/airflow/www/fab_security/manager.py +++ b/airflow/www/fab_security/manager.py @@ -26,7 +26,7 @@ from typing import Any from uuid import uuid4 -from flask import current_app, g, session, url_for +from flask import Flask, current_app, g, session, url_for from flask_appbuilder import AppBuilder from flask_appbuilder.const import ( AUTH_DB, @@ -67,6 +67,8 @@ ) from flask_babel import lazy_gettext as _ from flask_jwt_extended import JWTManager, current_user as current_user_jwt +from flask_limiter import Limiter +from flask_limiter.util import get_remote_address from flask_login import AnonymousUserMixin, LoginManager, current_user from werkzeug.security import check_password_hash, generate_password_hash @@ -252,6 +254,10 @@ def __init__(self, appbuilder): app.config.setdefault("AUTH_LDAP_LASTNAME_FIELD", "sn") app.config.setdefault("AUTH_LDAP_EMAIL_FIELD", "mail") + # Rate limiting + app.config.setdefault("AUTH_RATE_LIMITED", True) + app.config.setdefault("AUTH_RATE_LIMIT", "5 per 40 second") + if self.auth_type == AUTH_OID: from flask_openid import OpenID @@ -280,6 +286,14 @@ def __init__(self, appbuilder): # Setup Flask-Jwt-Extended self.jwt_manager = self.create_jwt_manager(app) + # Setup Flask-Limiter + self.limiter = self.create_limiter(app) + + def create_limiter(self, app: Flask) -> Limiter: + limiter = Limiter(key_func=get_remote_address) + limiter.init_app(app) + return limiter + def create_login_manager(self, app) -> LoginManager: """ Override to implement your custom login manager instance @@ -514,6 +528,14 @@ def oauth_providers(self): """Oauth providers""" return self.appbuilder.get_app.config["OAUTH_PROVIDERS"] + @property + def is_auth_limited(self) -> bool: + return self.appbuilder.get_app.config["AUTH_RATE_LIMITED"] + + @property + def auth_rate_limit(self) -> str: + return self.appbuilder.get_app.config["AUTH_RATE_LIMIT"] + @property def current_user(self): """Current user object""" @@ -750,6 +772,11 @@ def register_views(self): self.appbuilder.add_view_no_menu(self.auth_view) + # this needs to be done after the view is added, otherwise the blueprint + # is not initialized + if self.is_auth_limited: + self.limiter.limit(self.auth_rate_limit, methods=["POST"])(self.auth_view.blueprint) + self.user_view = self.appbuilder.add_view( self.user_view, "List Users", @@ -1397,6 +1424,24 @@ def get_user_menu_access(self, menu_names: list[str] | None = None) -> set[str]: else: return self._get_user_permission_resources(None, "menu_access", resource_names=menu_names) + def add_limit_view(self, baseview): + if not baseview.limits: + return + + for limit in baseview.limits: + self.limiter.limit( + limit_value=limit.limit_value, + key_func=limit.key_func, + per_method=limit.per_method, + methods=limit.methods, + error_message=limit.error_message, + exempt_when=limit.exempt_when, + override_defaults=limit.override_defaults, + deduct_when=limit.deduct_when, + on_breach=limit.on_breach, + cost=limit.cost, + )(baseview.blueprint) + def add_permissions_view(self, base_action_names, resource_name): # Keep name for compatibility with FAB. """ Adds an action on a resource to the backend diff --git a/docs/apache-airflow/administration-and-deployment/security/webserver.rst b/docs/apache-airflow/administration-and-deployment/security/webserver.rst index 7e3cf3b6dfb..d5e551f8b15 100644 --- a/docs/apache-airflow/administration-and-deployment/security/webserver.rst +++ b/docs/apache-airflow/administration-and-deployment/security/webserver.rst @@ -263,3 +263,24 @@ certs and keys. ssl_key = ssl_cert = ssl_cacert = + +Rate limiting +------------- + +Airflow can be configured to limit the number of authentication requests in a given time window. We are using +`Flask-Limiter `_ to achieve that and by default Airflow +uses per-webserver default limit of 5 requests per 40 second fixed window. By default no common storage for +rate limits is used between the gunicorn processes you run so rate-limit is applied separately for each process, +so assuming random distribution of the requests by gunicorn with single webserver instance and default 4 +gunicorn workers, the effective rate limit is 5 x 4 = 20 requests per 40 second window (more or less). +However you can configure the rate limit to be shared between the processes by using rate limit storage via +setting the ``RATELIMIT_*`` configuration settings in ``webserver_config.py``. +For example, to use Redis as a rate limit storage you can use the following configuration (you need +to set ``redis_host`` to your Redis instance) + +``` +RATELIMIT_STORAGE_URI = 'redis://redis_host:6379/0 +``` + +You can also configure other rate limit settings in ``webserver_config.py`` - for more details, see the +`Flask Limiter rate limit configuration `_. diff --git a/docs/apache-airflow/howto/set-config.rst b/docs/apache-airflow/howto/set-config.rst index 0f7cf585040..02fa9bac2dc 100644 --- a/docs/apache-airflow/howto/set-config.rst +++ b/docs/apache-airflow/howto/set-config.rst @@ -130,3 +130,15 @@ the example below. generated using the secret key has a short expiry time though - make sure that time on ALL the machines that you run airflow components on is synchronized (for example using ntpd) otherwise you might get "forbidden" errors when the logs are accessed. + + +Configuring Flask Application for Airflow Webserver +=================================================== + +Airflow uses Flask to render the web UI. When you initialize the Airflow webserver, predefined configuration +is used, based on the ``webserver`` section of the ``airflow.cfg`` file. You can override these settings +and add any extra settings however by adding flask configuration to ``webserver_config.py`` file in your +``$AIRFLOW_HOME`` directory. This file is automatically loaded by the webserver. + +For example if you would like to change rate limit strategy to "moving window", you can set the +``RATELIMIT_STRATEGY`` to ``moving-window``. diff --git a/setup.cfg b/setup.cfg index c4cb3f08092..8c52f566cf5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -93,7 +93,7 @@ install_requires = # `airflow/www/fab_security` with their upstream counterparts. In particular, make sure any breaking changes, # for example any new methods, are accounted for. # NOTE! When you change the value here, you also have to update flask-appbuilder[oauth] in setup.py - flask-appbuilder==4.1.4 + flask-appbuilder==4.3.0 flask-caching>=1.5.0 flask-login>=0.6.2 flask-session>=0.4.0 diff --git a/setup.py b/setup.py index 24859ea2ceb..f29a923c769 100644 --- a/setup.py +++ b/setup.py @@ -278,7 +278,7 @@ def write_version(filename: str = str(AIRFLOW_SOURCES_ROOT / "airflow" / "git_ve flask_appbuilder_oauth = [ "authlib>=1.0.0", # The version here should be upgraded at the same time as flask-appbuilder in setup.cfg - "flask-appbuilder[oauth]==4.1.4", + "flask-appbuilder[oauth]==4.3.0", ] kerberos = [ "pykerberos>=1.1.13", diff --git a/tests/conftest.py b/tests/conftest.py index 680bee6f6c1..0b4c2931fc6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -447,9 +447,12 @@ def fake_sleep(seconds): @pytest.fixture(scope="session") def app(): - from airflow.www import app + from tests.test_utils.config import conf_vars - return app.create_app(testing=True) + with conf_vars({("webserver", "auth_rate_limited"): "False"}): + from airflow.www import app + + yield app.create_app(testing=True) @pytest.fixture diff --git a/tests/test_utils/www.py b/tests/test_utils/www.py index d46e3f69ee5..ea0295af2c7 100644 --- a/tests/test_utils/www.py +++ b/tests/test_utils/www.py @@ -22,13 +22,13 @@ from airflow.models import Log -def client_with_login(app, **kwargs): +def client_with_login(app, expected_response_code=302, **kwargs): patch_path = "airflow.www.fab_security.manager.check_password_hash" with mock.patch(patch_path) as check_password_hash: check_password_hash.return_value = True client = app.test_client() resp = client.post("/login/", data=kwargs) - assert resp.status_code == 302 + assert resp.status_code == expected_response_code return client diff --git a/tests/www/views/conftest.py b/tests/www/views/conftest.py index e04b5ec96ee..209a106df6a 100644 --- a/tests/www/views/conftest.py +++ b/tests/www/views/conftest.py @@ -28,6 +28,7 @@ from airflow.models import DagBag from airflow.www.app import create_app from tests.test_utils.api_connexion_utils import delete_user +from tests.test_utils.config import conf_vars from tests.test_utils.decorators import dont_initialize_flask_app_submodules from tests.test_utils.www import client_with_login, client_without_login @@ -62,7 +63,8 @@ def app(examples_dag_bag): ] ) def factory(): - return create_app(testing=True) + with conf_vars({("webserver", "auth_rate_limited"): "False"}): + return create_app(testing=True) app = factory() app.config["WTF_CSRF_ENABLED"] = False @@ -110,6 +112,7 @@ def factory(): @pytest.fixture() def admin_client(app): + return client_with_login(app, username="test_admin", password="test_admin") diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py index 6939ed9dca3..207e9a3d89f 100644 --- a/tests/www/views/test_views_log.py +++ b/tests/www/views/test_views_log.py @@ -73,7 +73,12 @@ def log_app(backup_modules, log_path): "init_api_connexion", ] ) - @conf_vars({("logging", "logging_config_class"): "airflow_local_settings.LOGGING_CONFIG"}) + @conf_vars( + { + ("logging", "logging_config_class"): "airflow_local_settings.LOGGING_CONFIG", + ("webserver", "auth_rate_limited"): "False", + } + ) def factory(): app = create_app(testing=True) app.config["WTF_CSRF_ENABLED"] = False diff --git a/tests/www/views/test_views_rate_limit.py b/tests/www/views/test_views_rate_limit.py new file mode 100644 index 00000000000..150d38e59e0 --- /dev/null +++ b/tests/www/views/test_views_rate_limit.py @@ -0,0 +1,69 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.www.app import create_app +from tests.test_utils.config import conf_vars +from tests.test_utils.decorators import dont_initialize_flask_app_submodules +from tests.test_utils.www import client_with_login + + +@pytest.fixture() +def app_with_rate_limit_one(examples_dag_bag): + @dont_initialize_flask_app_submodules( + skip_all_except=[ + "init_api_connexion", + "init_appbuilder", + "init_appbuilder_links", + "init_appbuilder_views", + "init_flash_views", + "init_jinja_globals", + "init_plugins", + "init_airflow_session_interface", + "init_check_user_active", + ] + ) + def factory(): + with conf_vars( + {("webserver", "auth_rate_limited"): "True", ("webserver", "auth_rate_limit"): "1 per 20 second"} + ): + return create_app(testing=True) + + app = factory() + app.config["WTF_CSRF_ENABLED"] = False + return app + + +def test_rate_limit_one(app_with_rate_limit_one): + client_with_login( + app_with_rate_limit_one, expected_response_code=302, username="test_admin", password="test_admin" + ) + client_with_login( + app_with_rate_limit_one, expected_response_code=429, username="test_admin", password="test_admin" + ) + client_with_login( + app_with_rate_limit_one, expected_response_code=429, username="test_admin", password="test_admin" + ) + + +def test_rate_limit_disabled(app): + client_with_login(app, expected_response_code=302, username="test_admin", password="test_admin") + client_with_login(app, expected_response_code=302, username="test_admin", password="test_admin") + client_with_login(app, expected_response_code=302, username="test_admin", password="test_admin")