Add a cache to Variable and Connection when called at dag parsing time #30259

Merged on Aug 10, 2023 (29 commits; changes shown from 28 commits)

Commits
c09b83d
global secret cache
vandonr-amz Mar 21, 2023
215ce99
add invalidation mechanism
vandonr-amz Mar 22, 2023
a9fdb7a
fix tests
vandonr-amz Mar 22, 2023
0e99bb0
write new tests
vandonr-amz Mar 23, 2023
5f69849
have cache active only at dag parsing time
vandonr-amz Mar 29, 2023
2b5a178
make cache disabled by default
vandonr-amz Apr 5, 2023
416cda9
Merge remote-tracking branch 'origin/main' into vandonr/wip
vandonr-amz Apr 5, 2023
643d1d9
add option to doc
vandonr-amz Apr 5, 2023
37c59e8
target airflow v2.6
vandonr-amz Apr 5, 2023
a66142b
fix static check
vandonr-amz Apr 5, 2023
e3a0eb9
edit best practices doc to mention the Variables cache
vandonr-amz Apr 5, 2023
cae3818
fix doc ref
vandonr-amz Apr 6, 2023
717567c
add experimental tag to config options
vandonr-amz Apr 6, 2023
041bd80
Merge remote-tracking branch 'origin/main' into vandonr/wip
vandonr-amz Apr 6, 2023
2807825
Merge branch 'main' into vandonr/wip
vandonr-amz Apr 11, 2023
8b6ed39
Merge remote-tracking branch 'origin/main' into vandonr/wip
vandonr-amz Apr 24, 2023
65e9aab
capitalize docstring (new static check?)
vandonr-amz Apr 24, 2023
3340e32
Merge remote-tracking branch 'origin/main' into vandonr/wip
vandonr-amz May 9, 2023
2db7bd1
Merge remote-tracking branch 'origin/main' into vandonr/wip
vandonr-amz May 18, 2023
b0d9d34
Merge remote-tracking branch 'origin/main' into vandonr/wip
vandonr-amz Jul 20, 2023
27ca59c
extend cache to cover connections
vandonr-amz Jul 20, 2023
8dc1812
fix test
vandonr-amz Jul 20, 2023
83eea27
Merge remote-tracking branch 'origin/main' into vandonr/wip
vandonr-amz Jul 31, 2023
1f1e586
rephrase doc to mention that the feature is experimental
vandonr-amz Jul 31, 2023
13b4041
Merge branch 'main' into vandonr/wip
vandonr-amz Jul 31, 2023
ff89eb2
Merge branch 'main' into vandonr/wip
vandonr-amz Jul 31, 2023
3b8d178
Merge branch 'main' into vandonr/wip
vandonr-amz Aug 1, 2023
cba8b8d
rename NotPresent to NotPresentException
vandonr-amz Aug 2, 2023
c7ee505
remove ellipsis
vandonr-amz Aug 2, 2023
Files changed
25 changes: 25 additions & 0 deletions airflow/config_templates/config.yml
@@ -1090,6 +1090,31 @@ secrets:
sensitive: true
example: ~
default: ""
use_cache:
description: |
.. note:: |experimental|

Enables local caching of Variables, used only during DAG parsing.
Using this option can make DAG parsing faster if Variables are used in top-level code, at the expense
of longer propagation time for changes.
Please note that this cache concerns only the DAG parsing step. There is no caching in place when DAG
tasks are run.
version_added: 2.6.0
[review suggestion] version_added: 2.6.0 → 2.8.0
[review suggestion] version_added: 2.6.0 → 2.7.0
type: boolean
example: ~
default: "False"
cache_ttl_seconds:
description: |
.. note:: |experimental|

When the cache is enabled, this is the duration for which an entry in the cache is considered valid.
Entries are refreshed if they are older than this many seconds.
In other words, this is the maximum amount of time you may need to wait for a Variable change to take effect.
version_added: 2.6.0
[review suggestion] version_added: 2.6.0 → 2.8.0
[review suggestion] version_added: 2.6.0 → 2.7.0
type: integer
example: ~
default: "900"
cli:
description: ~
options:
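For reference, a minimal sketch of how the two options above could be turned on, using Airflow's standard AIRFLOW__<SECTION>__<KEY> environment-variable override; the values are illustrative, not recommendations, and this snippet is not part of the diff:

    import os

    # Equivalent to setting use_cache / cache_ttl_seconds under [secrets] in airflow.cfg.
    # Must be set in the scheduler / DAG-processor environment before it starts.
    os.environ["AIRFLOW__SECRETS__USE_CACHE"] = "True"
    os.environ["AIRFLOW__SECRETS__CACHE_TTL_SECONDS"] = "600"  # 10 minutes instead of the 900 s default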
5 changes: 5 additions & 0 deletions airflow/dag_processing/manager.py
@@ -51,6 +51,7 @@
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.serialized_dag import SerializedDagModel
from airflow.secrets.cache import SecretCache
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.file import list_py_file_paths, might_contain_dag
@@ -1073,6 +1074,10 @@ def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_req

def start_new_processes(self):
"""Start more processors if we have enough slots and files to process."""
# Initialize the cache so that calls to Variable.get in DAGs are shared between parsing processes.
# This needs to be done before this process is forked to create the DAG parsing processes.
SecretCache.init()

while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
file_path = self._file_path_queue.popleft()
# Stop creating duplicate processor i.e. processor with the same filepath
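The call is placed before the parser processes are forked because the cache is backed by a multiprocessing.Manager dict: children forked after the manager exists inherit the proxy object and therefore share a single cache. A standalone sketch of that mechanism, independent of Airflow:

    import multiprocessing


    def child(shared):
        # The child sees what the parent wrote, and its writes are visible back in the parent.
        shared["child"] = shared["parent"] + ", world"


    if __name__ == "__main__":
        manager = multiprocessing.Manager()  # starts a separate manager process
        shared = manager.dict()              # proxy dict, safe to share across processes
        shared["parent"] = "hello"
        p = multiprocessing.Process(target=child, args=(shared,))
        p.start()
        p.join()
        print(dict(shared))  # {'parent': 'hello', 'child': 'hello, world'}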
11 changes: 11 additions & 0 deletions airflow/models/connection.py
@@ -30,6 +30,7 @@
from airflow.exceptions import AirflowException, AirflowNotFoundException, RemovedInAirflow3Warning
from airflow.models.base import ID_LEN, Base
from airflow.models.crypto import get_fernet
from airflow.secrets.cache import SecretCache
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.secrets_masker import mask_secret
from airflow.utils.module_loading import import_string
@@ -451,10 +452,20 @@ def get_connection_from_secrets(cls, conn_id: str) -> Connection:
:param conn_id: connection id
:return: connection
"""
# check cache first
# enabled only if SecretCache.init() has been called first
try:
uri = SecretCache.get_connection_uri(conn_id)
return Connection(conn_id=conn_id, uri=uri)
except SecretCache.NotPresentException:
pass  # not in the cache (or cache disabled), fall back to the secrets backends

# iterate over backends if not in cache (or expired)
for secrets_backend in ensure_secrets_loaded():
try:
conn = secrets_backend.get_connection(conn_id=conn_id)
if conn:
SecretCache.save_connection_uri(conn_id, conn.get_uri())
return conn
except Exception:
log.exception(
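The lookup is cache-first, with an exception signalling a miss, so callers are unchanged. A hedged usage sketch (it assumes the cache is enabled, SecretCache.init() has been called in this process, and that a connection with the made-up id "my_conn" exists in one of the backends):

    from airflow.models import Connection
    from airflow.secrets.cache import SecretCache

    SecretCache.init()  # no-op unless [secrets] use_cache is enabled

    # First call: cache miss, the backends are queried and the connection URI is cached.
    conn = Connection.get_connection_from_secrets("my_conn")

    # Second call within the TTL: rebuilt from the cached URI, no backend round-trip.
    conn_again = Connection.get_connection_from_secrets("my_conn")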
24 changes: 21 additions & 3 deletions airflow/models/variable.py
@@ -29,6 +29,7 @@
from airflow.configuration import ensure_secrets_loaded
from airflow.models.base import ID_LEN, Base
from airflow.models.crypto import get_fernet
from airflow.secrets.cache import SecretCache
from airflow.secrets.metastore import MetastoreBackend
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.secrets_masker import mask_secret
@@ -175,6 +176,10 @@ def set(
Variable.delete(key, session=session)
session.add(Variable(key=key, val=stored_value, description=description))
session.flush()
# invalidate key in cache for faster propagation
# we cannot save the value set because it's possible that it's shadowed by a custom backend
# (see call to check_for_write_conflict above)
SecretCache.invalidate_variable(key)

@staticmethod
@provide_session
@@ -210,7 +215,9 @@ def delete(key: str, session: Session = None) -> int:

:param key: Variable Keys
"""
return session.execute(delete(Variable).where(Variable.key == key)).rowcount
rows = session.execute(delete(Variable).where(Variable.key == key)).rowcount
SecretCache.invalidate_variable(key)
return rows

def rotate_fernet_key(self):
"""Rotate Fernet Key."""
@@ -256,15 +263,26 @@ def get_variable_from_secrets(key: str) -> str | None:
:param key: Variable Key
:return: Variable Value
"""
# check cache first
# enabled only if SecretCache.init() has been called first
try:
return SecretCache.get_variable(key)
except SecretCache.NotPresentException:
pass  # not in the cache (or cache disabled), fall back to the secrets backends

var_val = None
# iterate over backends if not in cache (or expired)
for secrets_backend in ensure_secrets_loaded():
try:
var_val = secrets_backend.get_variable(key=key)
if var_val is not None:
return var_val
break
except Exception:
log.exception(
"Unable to retrieve variable from secrets backend (%s). "
"Checking subsequent secrets backend.",
type(secrets_backend).__name__,
)
return None

SecretCache.save_variable(key, var_val) # we save None as well
return var_val
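Two details in the Variable path are easy to miss: a None result (key found in no backend) is cached as well, so repeated lookups of a missing key stay cheap, and set()/delete() only invalidate the cached entry rather than writing through it, because a custom backend may shadow the metastore value. A hedged illustration, assuming the cache is enabled and initialized:

    from airflow.models import Variable
    from airflow.secrets.cache import SecretCache

    SecretCache.init()

    Variable.set("feature_flag", "on")       # drops any stale cache entry for the key
    Variable.get("feature_flag")             # backend hit, value cached -> "on"
    Variable.get("feature_flag")             # served from the cache while younger than the TTL

    Variable.delete("feature_flag")          # deletes the row and invalidates the cache entry
    Variable.get("feature_flag", default_var=None)  # miss; the None result is cached too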
127 changes: 127 additions & 0 deletions airflow/secrets/cache.py
@@ -0,0 +1,127 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import datetime
import multiprocessing

from airflow.configuration import conf


class SecretCache:
"""A static class to manage the global secret cache."""

__manager: multiprocessing.managers.SyncManager | None = None
_cache: dict[str, _CacheValue] | None = None
_ttl: datetime.timedelta

class NotPresentException(Exception):
"""Raised when a key is not present in the cache."""

...

class _CacheValue:
def __init__(self, value: str | None) -> None:
self.value = value
self.date = datetime.datetime.utcnow()

def is_expired(self, ttl: datetime.timedelta) -> bool:
return datetime.datetime.utcnow() - self.date > ttl

_VARIABLE_PREFIX = "__v_"
_CONNECTION_PREFIX = "__c_"

@classmethod
def init(cls):
"""Initializes the cache, provided the configuration allows it. Safe to call several times."""
if cls._cache is not None:
return
use_cache = conf.getboolean(section="secrets", key="use_cache", fallback=False)
if not use_cache:
return
if cls.__manager is None:
# it is not really necessary to save the manager, but doing so allows us to reuse it between tests,
# making them run a lot faster because this operation takes ~300ms each time
cls.__manager = multiprocessing.Manager()
cls._cache = cls.__manager.dict()
ttl_seconds = conf.getint(section="secrets", key="cache_ttl_seconds", fallback=15 * 60)
cls._ttl = datetime.timedelta(seconds=ttl_seconds)

@classmethod
def reset(cls):
"""For test purposes only."""
cls._cache = None

@classmethod
def get_variable(cls, key: str) -> str | None:
"""
Tries to get the value associated with the key from the cache.
[inline review] Contributor: nit: some parameters are missing from the docstring.
[inline review] Author: I can add it, but it'd look like ":param key: the key", only adding clutter imho. Also, it's not enforced by static checks, so if we feel that adding docstrings for params is useless, we shouldn't do it.

:return: The saved value (which can be None) if present in the cache and not expired;
raises NotPresentException otherwise.
"""
return cls._get(key, cls._VARIABLE_PREFIX)

@classmethod
def get_connection_uri(cls, conn_id: str) -> str:
"""
Tries to get the uri associated with the conn_id from the cache.

:return: The saved URI if present in the cache and not expired;
raises NotPresentException otherwise.
"""
val = cls._get(conn_id, cls._CONNECTION_PREFIX)
if val: # there shouldn't be any empty entries in the connections cache, but we enforce it here.
return val
raise cls.NotPresentException

@classmethod
def _get(cls, key: str, prefix: str) -> str | None:
if cls._cache is None:
# using an exception for misses allows us to meaningfully cache None values
raise cls.NotPresentException

val = cls._cache.get(f"{prefix}{key}")
if val and not val.is_expired(cls._ttl):
return val.value
raise cls.NotPresentException

@classmethod
def save_variable(cls, key: str, value: str | None):
"""Saves the value for that key in the cache, if initialized."""
cls._save(key, value, cls._VARIABLE_PREFIX)

@classmethod
def save_connection_uri(cls, conn_id: str, uri: str):
"""Saves the uri representation for that connection in the cache, if initialized."""
if uri is None:
# connections raise exceptions if not present, so we shouldn't have any None value to save.
return
cls._save(conn_id, uri, cls._CONNECTION_PREFIX)

@classmethod
def _save(cls, key: str, value: str | None, prefix: str):
if cls._cache is not None:
cls._cache[f"{prefix}{key}"] = cls._CacheValue(value)

@classmethod
def invalidate_variable(cls, key: str):
"""Invalidates (actually removes) the value stored in the cache for that Variable."""
if cls._cache is not None:
# second arg ensures no exception if key is absent
cls._cache.pop(f"{cls._VARIABLE_PREFIX}{key}", None)
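Taken in isolation, the new cache API can be exercised as below; this is a sketch, and it assumes use_cache is enabled in the [secrets] section (otherwise init() is a no-op and every lookup raises NotPresentException):

    from airflow.secrets.cache import SecretCache

    SecretCache.init()  # reads [secrets] use_cache and cache_ttl_seconds

    SecretCache.save_variable("greeting", "hello")
    try:
        print(SecretCache.get_variable("greeting"))  # "hello" while the entry is younger than the TTL
    except SecretCache.NotPresentException:
        print("cache disabled, key never saved, or entry expired")

    SecretCache.invalidate_variable("greeting")  # removes the entry; the next get raises NotPresentException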
23 changes: 15 additions & 8 deletions docs/apache-airflow/best-practices.rst
@@ -200,10 +200,13 @@ Some cases of dynamic DAG generation are described in the :doc:`howto/dynamic-da
Airflow Variables
-----------------

As mentioned in the previous chapter, :ref:`best_practices/top_level_code`. you should avoid
using Airflow Variables at top level Python code of DAGs. You can use the Airflow Variables freely inside the
``execute()`` methods of the operators, but you can also pass the Airflow Variables to the existing operators
via Jinja template, which will delay reading the value until the task execution.
Using Airflow Variables yields network calls and database access, so their usage in top-level Python code for DAGs
should be avoided as much as possible, as mentioned in the previous chapter, :ref:`best_practices/top_level_code`.
If Airflow Variables must be used in top-level DAG code, then their impact on DAG parsing can be mitigated by
:ref:`enabling the experimental cache<config:secrets__use_cache>`, configured with a sensible :ref:`ttl<config:secrets__cache_ttl_seconds>`.

You can use the Airflow Variables freely inside the ``execute()`` methods of the operators, but you can also pass the
Airflow Variables to the existing operators via Jinja template, which will delay reading the value until the task execution.

The template syntax to do this is:

@@ -217,28 +220,32 @@ or if you need to deserialize a json object from the variable :

{{ var.json.<variable_name> }}

In top-level code, variables using jinja templates do not produce a request until a task is running, whereas, ``Variable.get()`` produces a request every time the dag file is parsed by the scheduler. Using ``Variable.get()`` will lead to suboptimal performance in the dag file processing. In some cases this can cause the dag file to timeout before it is fully parsed.
In top-level code, variables using jinja templates do not produce a request until a task is running, whereas,
``Variable.get()`` produces a request every time the dag file is parsed by the scheduler if caching is not enabled.
Using ``Variable.get()`` without :ref:`enabling caching<config:secrets__use_cache>` will lead to suboptimal
performance in the dag file processing.
In some cases this can cause the dag file to timeout before it is fully parsed.

Bad example:

.. code-block:: python

from airflow.models import Variable

foo_var = Variable.get("foo") # DON'T DO THAT
foo_var = Variable.get("foo") # AVOID THAT
bash_use_variable_bad_1 = BashOperator(
task_id="bash_use_variable_bad_1", bash_command="echo variable foo=${foo_env}", env={"foo_env": foo_var}
)

bash_use_variable_bad_2 = BashOperator(
task_id="bash_use_variable_bad_2",
bash_command=f"echo variable foo=${Variable.get('foo')}", # DON'T DO THAT
bash_command=f"echo variable foo=${Variable.get('foo')}", # AVOID THAT
)

bash_use_variable_bad_3 = BashOperator(
task_id="bash_use_variable_bad_3",
bash_command="echo variable foo=${foo_env}",
env={"foo_env": Variable.get("foo")}, # DON'T DO THAT
env={"foo_env": Variable.get("foo")}, # AVOID THAT
)


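For contrast, the recommended counterpart (not part of this diff, shown here as an illustrative sketch) defers the lookup to task execution through the Jinja template syntax described above:

    from airflow.operators.bash import BashOperator

    bash_use_variable_good = BashOperator(
        task_id="bash_use_variable_good",
        bash_command="echo variable foo=${foo_env}",
        env={"foo_env": "{{ var.value.foo }}"},  # rendered at task run time, no request during DAG parsing
    )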
8 changes: 8 additions & 0 deletions tests/always/test_secrets.py
@@ -21,11 +21,15 @@

from airflow.configuration import ensure_secrets_loaded, initialize_secrets_backends
from airflow.models import Connection, Variable
from airflow.secrets.cache import SecretCache
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_variables


class TestConnectionsFromSecrets:
def setup_method(self) -> None:
SecretCache.reset()

@mock.patch("airflow.secrets.metastore.MetastoreBackend.get_connection")
@mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_connection")
def test_get_connection_second_try(self, mock_env_get, mock_meta_get):
@@ -114,6 +118,7 @@ def test_backend_fallback_to_env_var(self, mock_get_connection):
class TestVariableFromSecrets:
def setup_method(self) -> None:
clear_db_variables()
SecretCache.reset()

def teardown_method(self) -> None:
clear_db_variables()
@@ -126,7 +131,10 @@ def test_get_variable_second_try(self, mock_env_get, mock_meta_get):
Metastore DB
"""
mock_env_get.return_value = None
mock_meta_get.return_value = "val"

Variable.get_variable_from_secrets("fake_var_key")

mock_meta_get.assert_called_once_with(key="fake_var_key")
mock_env_get.assert_called_once_with(key="fake_var_key")

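A natural companion test for the cache itself (a sketch in the same style as the file above; this exact test is not part of the shown diff) enables the cache via conf_vars, looks a variable up twice, and asserts the backend was only queried once:

    from unittest import mock

    from airflow.models import Variable
    from airflow.secrets.cache import SecretCache
    from tests.test_utils.config import conf_vars


    @conf_vars({("secrets", "use_cache"): "true"})
    @mock.patch("airflow.secrets.metastore.MetastoreBackend.get_variable")
    def test_variable_is_served_from_cache(mock_meta_get):
        SecretCache.reset()   # make sure no cache lingers from a previous test
        SecretCache.init()    # picks up use_cache=true set by conf_vars
        mock_meta_get.return_value = "val"

        assert Variable.get_variable_from_secrets("some_key") == "val"
        assert Variable.get_variable_from_secrets("some_key") == "val"

        mock_meta_get.assert_called_once_with(key="some_key")  # second lookup hit the cache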