Skip to content

Commit

Permalink
Upgrade FAB to 4.3.0 (#29766)
Browse files Browse the repository at this point in the history
FAB 4.3.0 added rate limiting and we would like to upgrade to this
version.

This requires to bring some of the changes from the PRs merged in
Flask App Builder:

* dpgaspar/Flask-AppBuilder#1976
* dpgaspar/Flask-AppBuilder#1997
* dpgaspar/Flask-AppBuilder#1999

While Flask App Builder disabled rate limitig by default, Airlfow
is "end product" using it and we decided to enable it.

GitOrigin-RevId: b7f05008ae45fc208456f7f64c19d08ad1cf7313
  • Loading branch information
potiuk authored and ahidalgob committed Nov 7, 2023
1 parent aed7788 commit 673e264
Show file tree
Hide file tree
Showing 13 changed files with 210 additions and 9 deletions.
14 changes: 14 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,20 @@ webserver:
type: boolean
example: ~
default: "False"
auth_rate_limited:
description: |
Boolean for enabling rate limiting on authentication endpoints.
version_added: 2.6.0
type: boolean
example: ~
default: "True"
auth_rate_limit:
description: |
Rate limit for authentication endpoints.
version_added: 2.6.0
type: string
example: ~
default: "5 per 40 second"

email:
description: |
Expand Down
6 changes: 6 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,12 @@ enable_swagger_ui = True
# Boolean for running Internal API in the webserver.
run_internal_api = False

# Boolean for enabling rate limiting on authentication endpoints.
auth_rate_limited = True

# Rate limit for authentication endpoints.
auth_rate_limit = 5 per 40 second

[email]

# Configuration email backend and whether to
Expand Down
23 changes: 23 additions & 0 deletions airflow/www/extensions/init_appbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ def __init__(
static_url_path="/appbuilder",
security_manager_class=None,
update_perms=conf.getboolean("webserver", "UPDATE_FAB_PERMS"),
auth_rate_limited=conf.getboolean("webserver", "AUTH_RATE_LIMITED", fallback=True),
auth_rate_limit=conf.get("webserver", "AUTH_RATE_LIMIT", fallback="5 per 40 second"),
):
"""
App-builder constructor.
Expand All @@ -146,6 +148,10 @@ def __init__(
:param update_perms:
optional, update permissions flag (Boolean) you can use
FAB_UPDATE_PERMS config key also
:param auth_rate_limited:
optional, rate limit authentication attempts if set to True (defaults to True)
:param auth_rate_limit:
optional, rate limit authentication attempts configuration (defaults "to 5 per 40 second")
"""
self.baseviews = []
self._addon_managers = []
Expand All @@ -158,6 +164,8 @@ def __init__(
self.static_url_path = static_url_path
self.app = app
self.update_perms = update_perms
self.auth_rate_limited = auth_rate_limited
self.auth_rate_limit = auth_rate_limit
if app is not None:
self.init_app(app, session)

Expand All @@ -172,10 +180,13 @@ def init_app(self, app, session):
app.config.setdefault("APP_ICON", "")
app.config.setdefault("LANGUAGES", {"en": {"flag": "gb", "name": "English"}})
app.config.setdefault("ADDON_MANAGERS", [])
app.config.setdefault("RATELIMIT_ENABLED", self.auth_rate_limited)
app.config.setdefault("FAB_API_MAX_PAGE_SIZE", 100)
app.config.setdefault("FAB_BASE_TEMPLATE", self.base_template)
app.config.setdefault("FAB_STATIC_FOLDER", self.static_folder)
app.config.setdefault("FAB_STATIC_URL_PATH", self.static_url_path)
app.config.setdefault("AUTH_RATE_LIMITED", self.auth_rate_limited)
app.config.setdefault("AUTH_RATE_LIMIT", self.auth_rate_limit)

self.app = app

Expand Down Expand Up @@ -433,6 +444,7 @@ def add_view(
if self.app:
self.register_blueprint(baseview)
self._add_permission(baseview)
self.add_limits(baseview)
self.add_link(
name=name,
href=href,
Expand Down Expand Up @@ -563,6 +575,11 @@ def security_converge(self, dry=False) -> dict:
"""
return self.sm.security_converge(self.baseviews, self.menu, dry)

def get_url_for_login_with(self, next_url: str | None = None) -> str:
if self.sm.auth_view is None:
return ""
return url_for(f"{self.sm.auth_view.endpoint}.{'login'}", next=next_url)

@property
def get_url_for_login(self):
return url_for(f"{self.sm.auth_view.endpoint}.login")
Expand All @@ -585,6 +602,10 @@ def get_url_for_locale(self, lang):
locale=lang,
)

def add_limits(self, baseview) -> None:
if hasattr(baseview, "limits"):
self.sm.add_limit_view(baseview)

def add_permissions(self, update_perms=False):
if self.update_perms or update_perms:
for baseview in self.baseviews:
Expand Down Expand Up @@ -653,4 +674,6 @@ def init_appbuilder(app) -> AirflowAppBuilder:
security_manager_class=security_manager_class,
base_template="airflow/main.html",
update_perms=conf.getboolean("webserver", "UPDATE_FAB_PERMS"),
auth_rate_limited=conf.getboolean("webserver", "AUTH_RATE_LIMITED", fallback=True),
auth_rate_limit=conf.get("webserver", "AUTH_RATE_LIMIT", fallback="5 per 40 second"),
)
47 changes: 46 additions & 1 deletion airflow/www/fab_security/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from typing import Any
from uuid import uuid4

from flask import current_app, g, session, url_for
from flask import Flask, current_app, g, session, url_for
from flask_appbuilder import AppBuilder
from flask_appbuilder.const import (
AUTH_DB,
Expand Down Expand Up @@ -67,6 +67,8 @@
)
from flask_babel import lazy_gettext as _
from flask_jwt_extended import JWTManager, current_user as current_user_jwt
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_login import AnonymousUserMixin, LoginManager, current_user
from werkzeug.security import check_password_hash, generate_password_hash

Expand Down Expand Up @@ -252,6 +254,10 @@ def __init__(self, appbuilder):
app.config.setdefault("AUTH_LDAP_LASTNAME_FIELD", "sn")
app.config.setdefault("AUTH_LDAP_EMAIL_FIELD", "mail")

# Rate limiting
app.config.setdefault("AUTH_RATE_LIMITED", True)
app.config.setdefault("AUTH_RATE_LIMIT", "5 per 40 second")

if self.auth_type == AUTH_OID:
from flask_openid import OpenID

Expand Down Expand Up @@ -280,6 +286,14 @@ def __init__(self, appbuilder):
# Setup Flask-Jwt-Extended
self.jwt_manager = self.create_jwt_manager(app)

# Setup Flask-Limiter
self.limiter = self.create_limiter(app)

def create_limiter(self, app: Flask) -> Limiter:
limiter = Limiter(key_func=get_remote_address)
limiter.init_app(app)
return limiter

def create_login_manager(self, app) -> LoginManager:
"""
Override to implement your custom login manager instance
Expand Down Expand Up @@ -514,6 +528,14 @@ def oauth_providers(self):
"""Oauth providers"""
return self.appbuilder.get_app.config["OAUTH_PROVIDERS"]

@property
def is_auth_limited(self) -> bool:
return self.appbuilder.get_app.config["AUTH_RATE_LIMITED"]

@property
def auth_rate_limit(self) -> str:
return self.appbuilder.get_app.config["AUTH_RATE_LIMIT"]

@property
def current_user(self):
"""Current user object"""
Expand Down Expand Up @@ -750,6 +772,11 @@ def register_views(self):

self.appbuilder.add_view_no_menu(self.auth_view)

# this needs to be done after the view is added, otherwise the blueprint
# is not initialized
if self.is_auth_limited:
self.limiter.limit(self.auth_rate_limit, methods=["POST"])(self.auth_view.blueprint)

self.user_view = self.appbuilder.add_view(
self.user_view,
"List Users",
Expand Down Expand Up @@ -1397,6 +1424,24 @@ def get_user_menu_access(self, menu_names: list[str] | None = None) -> set[str]:
else:
return self._get_user_permission_resources(None, "menu_access", resource_names=menu_names)

def add_limit_view(self, baseview):
if not baseview.limits:
return

for limit in baseview.limits:
self.limiter.limit(
limit_value=limit.limit_value,
key_func=limit.key_func,
per_method=limit.per_method,
methods=limit.methods,
error_message=limit.error_message,
exempt_when=limit.exempt_when,
override_defaults=limit.override_defaults,
deduct_when=limit.deduct_when,
on_breach=limit.on_breach,
cost=limit.cost,
)(baseview.blueprint)

def add_permissions_view(self, base_action_names, resource_name): # Keep name for compatibility with FAB.
"""
Adds an action on a resource to the backend
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,24 @@ certs and keys.
ssl_key = <path to key>
ssl_cert = <path to cert>
ssl_cacert = <path to cacert>
Rate limiting
-------------

Airflow can be configured to limit the number of authentication requests in a given time window. We are using
`Flask-Limiter <https://flask-limiter.readthedocs.io/en/stable/>`_ to achieve that and by default Airflow
uses per-webserver default limit of 5 requests per 40 second fixed window. By default no common storage for
rate limits is used between the gunicorn processes you run so rate-limit is applied separately for each process,
so assuming random distribution of the requests by gunicorn with single webserver instance and default 4
gunicorn workers, the effective rate limit is 5 x 4 = 20 requests per 40 second window (more or less).
However you can configure the rate limit to be shared between the processes by using rate limit storage via
setting the ``RATELIMIT_*`` configuration settings in ``webserver_config.py``.
For example, to use Redis as a rate limit storage you can use the following configuration (you need
to set ``redis_host`` to your Redis instance)

```
RATELIMIT_STORAGE_URI = 'redis://redis_host:6379/0
```

You can also configure other rate limit settings in ``webserver_config.py`` - for more details, see the
`Flask Limiter rate limit configuration <https://flask-limiter.readthedocs.io/en/stable/configuration.html>`_.
12 changes: 12 additions & 0 deletions docs/apache-airflow/howto/set-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,15 @@ the example below.
generated using the secret key has a short expiry time though - make sure that time on ALL the machines
that you run airflow components on is synchronized (for example using ntpd) otherwise you might get
"forbidden" errors when the logs are accessed.


Configuring Flask Application for Airflow Webserver
===================================================

Airflow uses Flask to render the web UI. When you initialize the Airflow webserver, predefined configuration
is used, based on the ``webserver`` section of the ``airflow.cfg`` file. You can override these settings
and add any extra settings however by adding flask configuration to ``webserver_config.py`` file in your
``$AIRFLOW_HOME`` directory. This file is automatically loaded by the webserver.

For example if you would like to change rate limit strategy to "moving window", you can set the
``RATELIMIT_STRATEGY`` to ``moving-window``.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ install_requires =
# `airflow/www/fab_security` with their upstream counterparts. In particular, make sure any breaking changes,
# for example any new methods, are accounted for.
# NOTE! When you change the value here, you also have to update flask-appbuilder[oauth] in setup.py
flask-appbuilder==4.1.4
flask-appbuilder==4.3.0
flask-caching>=1.5.0
flask-login>=0.6.2
flask-session>=0.4.0
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def write_version(filename: str = str(AIRFLOW_SOURCES_ROOT / "airflow" / "git_ve
flask_appbuilder_oauth = [
"authlib>=1.0.0",
# The version here should be upgraded at the same time as flask-appbuilder in setup.cfg
"flask-appbuilder[oauth]==4.1.4",
"flask-appbuilder[oauth]==4.3.0",
]
kerberos = [
"pykerberos>=1.1.13",
Expand Down
7 changes: 5 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,12 @@ def fake_sleep(seconds):

@pytest.fixture(scope="session")
def app():
from airflow.www import app
from tests.test_utils.config import conf_vars

return app.create_app(testing=True)
with conf_vars({("webserver", "auth_rate_limited"): "False"}):
from airflow.www import app

yield app.create_app(testing=True)


@pytest.fixture
Expand Down
4 changes: 2 additions & 2 deletions tests/test_utils/www.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
from airflow.models import Log


def client_with_login(app, **kwargs):
def client_with_login(app, expected_response_code=302, **kwargs):
patch_path = "airflow.www.fab_security.manager.check_password_hash"
with mock.patch(patch_path) as check_password_hash:
check_password_hash.return_value = True
client = app.test_client()
resp = client.post("/login/", data=kwargs)
assert resp.status_code == 302
assert resp.status_code == expected_response_code
return client


Expand Down
5 changes: 4 additions & 1 deletion tests/www/views/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from airflow.models import DagBag
from airflow.www.app import create_app
from tests.test_utils.api_connexion_utils import delete_user
from tests.test_utils.config import conf_vars
from tests.test_utils.decorators import dont_initialize_flask_app_submodules
from tests.test_utils.www import client_with_login, client_without_login

Expand Down Expand Up @@ -62,7 +63,8 @@ def app(examples_dag_bag):
]
)
def factory():
return create_app(testing=True)
with conf_vars({("webserver", "auth_rate_limited"): "False"}):
return create_app(testing=True)

app = factory()
app.config["WTF_CSRF_ENABLED"] = False
Expand Down Expand Up @@ -110,6 +112,7 @@ def factory():

@pytest.fixture()
def admin_client(app):

return client_with_login(app, username="test_admin", password="test_admin")


Expand Down
7 changes: 6 additions & 1 deletion tests/www/views/test_views_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ def log_app(backup_modules, log_path):
"init_api_connexion",
]
)
@conf_vars({("logging", "logging_config_class"): "airflow_local_settings.LOGGING_CONFIG"})
@conf_vars(
{
("logging", "logging_config_class"): "airflow_local_settings.LOGGING_CONFIG",
("webserver", "auth_rate_limited"): "False",
}
)
def factory():
app = create_app(testing=True)
app.config["WTF_CSRF_ENABLED"] = False
Expand Down
Loading

0 comments on commit 673e264

Please sign in to comment.