Skip to content

Commit

Permalink
fixup! Composer logging/metrics patch
Browse files Browse the repository at this point in the history
Fix silenting warning message comming from Celery by setting broker_connection_retry_on_startup config instead of logging filter.

Internal bug

Change-Id: Iad03078a9874ad366be1a3d5bbdee1d800250f02
GitOrigin-RevId: 074380a681a19785860828641b5e92ab7a6ffed6
  • Loading branch information
Cloud Composer Team committed Sep 16, 2024
1 parent 0ca6cc0 commit 518fec3
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 39 deletions.
16 changes: 11 additions & 5 deletions airflow/composer/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,17 @@ def pod_mutation_hook(pod: k8s.V1Pod):


@celeryd_init.connect
def setup_log_format(**kwargs):
"""Apply same format for Celery logs as we have for all other logs.
From https://github.com/celery/celery/issues/3599.
"""
def setup_logging_on_celeryd_init(**kwargs):
"""Customize Celery logging configuration as per Composer needs."""
from airflow.configuration import conf

# Apply same format for Celery logs as we have for all other logs.
# From https://github.com/celery/celery/issues/3599.
kwargs["conf"].worker_log_format = conf.get("logging", "LOG_FORMAT")

# Set broker_connection_retry_on_startup as broker_connection_retry to suppress CPendingDeprecationWarning
# coming from Celery:
# "The broker_connection_retry configuration setting will no longer determine whether broker connection
# retries are made during startup in Celery 6.0 and above. If you wish to retain the existing behavior for
# retrying connections on startup, you should set broker_connection_retry_on_startup to True."
kwargs["conf"].broker_connection_retry_on_startup = kwargs["conf"].broker_connection_retry
20 changes: 0 additions & 20 deletions airflow/composer/custom_log_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@
r"The '<class 'airflow.providers[a-zA-Z\.]*Hook'>' is missing [a-z_]* attribute and cannot be registered"
)

CELERY_KNOWN_DEPRECATION_WARNING_RE = re2.compile(
r".*CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer "
r"determine whether broker connection retries are made during startup in Celery 6.0 and above.*"
)


def _is_redis_warning(record):
"""Method that detects using Redis as result backend warnings."""
Expand Down Expand Up @@ -79,18 +74,6 @@ def _is_flower_warning(record):
return "Inspect method" in record_message and "failed" in record_message


def _is_celery_known_deprecation_warning(record):
"""Method that detects warnings produced by celery, which are not actionable.
With Celery 5.3.1 we see the following warning: "The broker_connection_retry configuration
setting will no longer determine whether broker connection retries are made during startup
in Celery 6.0 and above. If you wish to retain the existing behavior for retrying connections
on startup, you should set broker_connection_retry_on_startup to True."
This is not useful for Composer users and is no-op for them, so we silence this message.
"""
return CELERY_KNOWN_DEPRECATION_WARNING_RE.match(record.getMessage())


def _is_scheduled_duration_metric_warning(record):
"""Method that detects warnings about scheduled_duration metric, which should not be emitted."""
return "cannot record scheduled_duration for task" in record.getMessage()
Expand Down Expand Up @@ -141,9 +124,6 @@ def filter(self, record):
if _is_flower_warning(record):
return False

if _is_celery_known_deprecation_warning(record):
return False

# This warning is produced in standard use as well, there is an issue for that:
# https://github.com/apache/airflow/issues/34493.
if _is_scheduled_duration_metric_warning(record):
Expand Down
5 changes: 4 additions & 1 deletion tests/composer/test_airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,14 @@ def test_pod_mutation_hook_k8s_executor_long_name(self):
assert pod.metadata.name[:-8] == f"airflow-k8s-worker-{'A' * 35}-"


def test_setup_log_format():
def test_setup_logging_on_celeryd_init():
conf_mock = mock.Mock()
conf_mock.broker_connection_retry = True

signals.celeryd_init.send(
sender=None,
conf=conf_mock,
)

assert conf_mock.worker_log_format == conf.get("logging", "LOG_FORMAT")
assert conf_mock.broker_connection_retry_on_startup is True
13 changes: 0 additions & 13 deletions tests/composer/test_custom_log_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,6 @@ def test_ignoring_flower_warnings(self, flower_warning):

assert flower_warning not in temp_stdout.getvalue()

def test_ignoring_celery_deprecation_warning(self):
celery_warning = (
"CPendingDeprecationWarning: The broker_connection_retry configuration setting "
"will no longer determine whether broker connection retries are made during startup in "
"Celery 6.0 and above. If you wish to retain the existing behavior for retrying "
"connections on startup, you should set broker_connection_retry_on_startup to True."
)
logger = logging.getLogger("celery.worker")
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
logger.warning(celery_warning)

assert celery_warning not in temp_stdout.getvalue()

def test_ignoring_scheduled_duration_metric_warning(self):
logger = logging.getLogger("airflow.tasks")
message = (
Expand Down

0 comments on commit 518fec3

Please sign in to comment.