From e60845d798a15a2df1fbf3d0c4bbbb8aab784816 Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Sun, 14 Jul 2024 01:24:26 -0700 Subject: [PATCH 1/7] Update metrics names to allow multiple executors to report metrics --- airflow/executors/base_executor.py | 62 ++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 098bb93215e3a..25ff602867cbb 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -30,6 +30,7 @@ from airflow.cli.cli_config import DefaultHelpParser from airflow.configuration import conf from airflow.exceptions import RemovedInAirflow3Warning +from airflow.executors.executor_loader import ExecutorLoader from airflow.models import Log from airflow.stats import Stats from airflow.traces import NO_TRACE_ID @@ -238,44 +239,65 @@ def heartbeat(self) -> None: num_running_tasks = len(self.running) num_queued_tasks = len(self.queued_tasks) - self.log.debug("%s running task instances", num_running_tasks) - self.log.debug("%s in queue", num_queued_tasks) - if open_slots == 0: - self.log.info("Executor parallelism limit reached. 0 open slots.") - else: - self.log.debug("%s open slots", open_slots) + self._emit_metrics(open_slots, num_running_tasks, num_queued_tasks) + self.trigger_tasks(open_slots) + + # Calling child class sync method + self.log.debug("Calling the %s sync method", self.__class__) + self.sync() + + def _emit_metrics(self, open_slots, num_running_tasks, num_queued_tasks): + """ + Emit metrics relevant to the Executor. + + In the case of multiple executors being configured, the metric names include the name of + executor to differentiate them from metrics from other executors. + If only one executor is configured, the metric names will not be changed. + """ + name = self.__class__.__name__ + multiple_executors_configured = len(ExecutorLoader.get_executor_names()) > 1 + if multiple_executors_configured: + metric_suffix = name + + open_slots_metric_name = f"executor.open_slots.{metric_suffix}" if multiple_executors_configured else "executor.open_slots" + queued_tasks_metric_name = f"executor.queued_tasks.{metric_suffix}" if multiple_executors_configured else "executor.queued_tasks" + running_tasks_metric_name = f"executor.running_tasks.{metric_suffix}" if multiple_executors_configured else "executor.running_tasks" + span = Trace.get_current_span() if span.is_recording(): span.add_event( name="executor", attributes={ - "executor.open_slots": open_slots, - "executor.queued_tasks": num_queued_tasks, - "executor.running_tasks": num_running_tasks, + open_slots_metric_name: open_slots, + queued_tasks_metric_name: num_queued_tasks, + running_tasks_metric_name: num_running_tasks, }, ) + self.log.debug("%s running task instances for executor %s", num_running_tasks, name) + self.log.debug("%s in queue for executor %s", num_queued_tasks, name) + if open_slots == 0: + self.log.info("Executor parallelism limit reached. 0 open slots.") + else: + self.log.debug("%s open slots for executor %s", open_slots, name) + Stats.gauge( - "executor.open_slots", value=open_slots, tags={"status": "open", "name": self.__class__.__name__} + open_slots_metric_name, + value=open_slots, + tags={"status": "open", "name": name}, ) Stats.gauge( - "executor.queued_tasks", + queued_tasks_metric_name, value=num_queued_tasks, - tags={"status": "queued", "name": self.__class__.__name__}, + tags={"status": "queued", "name": name}, ) Stats.gauge( - "executor.running_tasks", + running_tasks_metric_name, value=num_running_tasks, - tags={"status": "running", "name": self.__class__.__name__}, + tags={"status": "running", "name": name}, ) - self.trigger_tasks(open_slots) - - # Calling child class sync method - self.log.debug("Calling the %s sync method", self.__class__) - self.sync() - def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, QueuedTaskInstanceType]]: """ Orders the queued tasks by priority. From 1c44bcecc8086a56ce5ab48c7e296f525f01bd4e Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Wed, 17 Jul 2024 11:36:20 -0700 Subject: [PATCH 2/7] Update metrics.rst and helm chart to include new metrics format --- chart/files/statsd-mappings.yml | 15 +++++++++++++++ .../logging-monitoring/metrics.rst | 3 +++ 2 files changed, 18 insertions(+) diff --git a/chart/files/statsd-mappings.yml b/chart/files/statsd-mappings.yml index 14d95df7319c7..86d773fd20b7f 100644 --- a/chart/files/statsd-mappings.yml +++ b/chart/files/statsd-mappings.yml @@ -91,3 +91,18 @@ mappings: name: "airflow_pool_starving_tasks" labels: pool: "$1" + + - match: airflow.executor.open_slots.* + name: "airflow_executor_open_slots" + labels: + executor: "$1" + + - match: airflow.executor.queued_tasks.* + name: "airflow_executor_queued_tasks" + labels: + executor: "$1" + + - match: airflow.executor.running_tasks.* + name: "airflow_executor_running_tasks" + labels: + executor: "$1" diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst index 49ba7cf422fbd..82597712a8abc 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst @@ -228,8 +228,11 @@ Name Description ``scheduler.tasks.executable`` Number of tasks that are ready for execution (set to queued) with respect to pool limits, DAG concurrency, executor state, and priority. +``executor.open_slots.`` Number of open slots on a specific executor. Only emitted when multiple executors are configured. ``executor.open_slots`` Number of open slots on executor +``executor.queued_tasks.`` Number of queued tasks on on a specific executor. Only emitted when multiple executors are configured. ``executor.queued_tasks`` Number of queued tasks on executor +``executor.running_tasks.`` Number of running tasks on on a specific executor. Only emitted when multiple executors are configured. ``executor.running_tasks`` Number of running tasks on executor ``pool.open_slots.`` Number of open slots in the pool ``pool.open_slots`` Number of open slots in the pool. Metric with pool_name tagging. From c8a4365b743f5020738b2928b5f21b1065a02473 Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Wed, 17 Jul 2024 15:30:13 -0700 Subject: [PATCH 3/7] Add Unit test to ensure correct metrics names are being emitted depending on whether multiple executors are configured --- tests/executors/test_base_executor.py | 60 ++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py index e6dc057ca576c..6c07cc5f50b30 100644 --- a/tests/executors/test_base_executor.py +++ b/tests/executors/test_base_executor.py @@ -30,6 +30,8 @@ from airflow.cli.cli_config import DefaultHelpParser, GroupCommand from airflow.cli.cli_parser import AirflowHelpFormatter from airflow.executors.base_executor import BaseExecutor, RunningRetryAttemptType +from airflow.executors.local_executor import LocalExecutor +from airflow.executors.sequential_executor import SequentialExecutor from airflow.models.baseoperator import BaseOperator from airflow.models.taskinstance import TaskInstance, TaskInstanceKey from airflow.utils import timezone @@ -120,7 +122,7 @@ def test_fail_and_success(): @mock.patch("airflow.executors.base_executor.BaseExecutor.sync") @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") @mock.patch("airflow.executors.base_executor.Stats.gauge") -def test_gauge_executor_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync): +def test_gauge_executor_metrics_single_executor(mock_stats_gauge, mock_trigger_tasks, mock_sync): executor = BaseExecutor() executor.heartbeat() calls = [ @@ -133,6 +135,62 @@ def test_gauge_executor_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync) mock_stats_gauge.assert_has_calls(calls) +@mock.patch("airflow.executors.local_executor.LocalExecutor.sync") +@mock.patch("airflow.executors.sequential_executor.SequentialExecutor.sync") +@mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") +@mock.patch("airflow.executors.base_executor.Stats.gauge") +@mock.patch("airflow.executors.executor_loader.ExecutorLoader.get_executor_names") +def test_gauge_executor_metrics_with_multiple_executors( + mock_get_executor_names, + mock_stats_gauge, + mock_trigger_tasks, + mock_sequential_sync, + mock_local_sync, +): + mock_get_executor_names.return_value = ["LocalExecutor", "AwsEcsExecutor"] + local_executor = LocalExecutor() + local_executor.heartbeat() + calls = [ + mock.call( + "executor.open_slots.LocalExecutor", + value=mock.ANY, + tags={"status": "open", "name": "LocalExecutor"}, + ), + mock.call( + "executor.queued_tasks.LocalExecutor", + value=mock.ANY, + tags={"status": "queued", "name": "LocalExecutor"}, + ), + mock.call( + "executor.running_tasks.LocalExecutor", + value=mock.ANY, + tags={"status": "running", "name": "LocalExecutor"}, + ), + ] + mock_stats_gauge.assert_has_calls(calls) + + sequential_executor = SequentialExecutor() + sequential_executor.heartbeat() + calls = [ + mock.call( + "executor.open_slots.SequentialExecutor", + value=mock.ANY, + tags={"status": "open", "name": "SequentialExecutor"}, + ), + mock.call( + "executor.queued_tasks.SequentialExecutor", + value=mock.ANY, + tags={"status": "queued", "name": "SequentialExecutor"}, + ), + mock.call( + "executor.running_tasks.SequentialExecutor", + value=mock.ANY, + tags={"status": "running", "name": "SequentialExecutor"}, + ), + ] + mock_stats_gauge.assert_has_calls(calls) + + @mock.patch("airflow.executors.base_executor.BaseExecutor.sync") @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") @mock.patch("airflow.executors.base_executor.Stats.gauge") From 0b7c9f20d68eaacd4c3415bf47bfda2db2e51773 Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Wed, 17 Jul 2024 15:39:45 -0700 Subject: [PATCH 4/7] Convert unit test to parametrized format --- tests/executors/test_base_executor.py | 44 +++++++++------------------ 1 file changed, 15 insertions(+), 29 deletions(-) diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py index 6c07cc5f50b30..9a7636615fcd7 100644 --- a/tests/executors/test_base_executor.py +++ b/tests/executors/test_base_executor.py @@ -135,6 +135,10 @@ def test_gauge_executor_metrics_single_executor(mock_stats_gauge, mock_trigger_t mock_stats_gauge.assert_has_calls(calls) +@pytest.mark.parametrize( + "executor_class, executor_name", + [(LocalExecutor, "LocalExecutor"), (SequentialExecutor, "SequentialExecutor")], +) @mock.patch("airflow.executors.local_executor.LocalExecutor.sync") @mock.patch("airflow.executors.sequential_executor.SequentialExecutor.sync") @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") @@ -146,46 +150,28 @@ def test_gauge_executor_metrics_with_multiple_executors( mock_trigger_tasks, mock_sequential_sync, mock_local_sync, + executor_class, + executor_name, ): - mock_get_executor_names.return_value = ["LocalExecutor", "AwsEcsExecutor"] - local_executor = LocalExecutor() - local_executor.heartbeat() - calls = [ - mock.call( - "executor.open_slots.LocalExecutor", - value=mock.ANY, - tags={"status": "open", "name": "LocalExecutor"}, - ), - mock.call( - "executor.queued_tasks.LocalExecutor", - value=mock.ANY, - tags={"status": "queued", "name": "LocalExecutor"}, - ), - mock.call( - "executor.running_tasks.LocalExecutor", - value=mock.ANY, - tags={"status": "running", "name": "LocalExecutor"}, - ), - ] - mock_stats_gauge.assert_has_calls(calls) + mock_get_executor_names.return_value = ["Exec1", "Exec2"] + executor = executor_class() + executor.heartbeat() - sequential_executor = SequentialExecutor() - sequential_executor.heartbeat() calls = [ mock.call( - "executor.open_slots.SequentialExecutor", + f"executor.open_slots.{executor_name}", value=mock.ANY, - tags={"status": "open", "name": "SequentialExecutor"}, + tags={"status": "open", "name": executor_name}, ), mock.call( - "executor.queued_tasks.SequentialExecutor", + f"executor.queued_tasks.{executor_name}", value=mock.ANY, - tags={"status": "queued", "name": "SequentialExecutor"}, + tags={"status": "queued", "name": executor_name}, ), mock.call( - "executor.running_tasks.SequentialExecutor", + f"executor.running_tasks.{executor_name}", value=mock.ANY, - tags={"status": "running", "name": "SequentialExecutor"}, + tags={"status": "running", "name": executor_name}, ), ] mock_stats_gauge.assert_has_calls(calls) From b019f2892b81c34a570a71122f7fa41aeba993fb Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Wed, 17 Jul 2024 16:12:46 -0700 Subject: [PATCH 5/7] Fix unit test for statsd --- helm_tests/other/test_statsd.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/helm_tests/other/test_statsd.py b/helm_tests/other/test_statsd.py index 8f65a0f6525bf..a2f4b870bd966 100644 --- a/helm_tests/other/test_statsd.py +++ b/helm_tests/other/test_statsd.py @@ -243,8 +243,9 @@ def test_statsd_configmap_by_default(self): mappings_yml = jmespath.search('data."mappings.yml"', docs[0]) mappings_yml_obj = yaml.safe_load(mappings_yml) - assert "airflow_dagrun_dependency_check" == mappings_yml_obj["mappings"][0]["name"] - assert "airflow_pool_starving_tasks" == mappings_yml_obj["mappings"][-1]["name"] + names = [mapping["name"] for mapping in mappings_yml_obj["mappings"]] + assert "airflow_dagrun_dependency_check" in names + assert "airflow_pool_starving_tasks" in names def test_statsd_configmap_when_exist_extra_mappings(self): extra_mapping = { From bc7ea195f1ca2d41868855421b6e6265bea6477b Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Sat, 20 Jul 2024 01:40:44 -0700 Subject: [PATCH 6/7] Rebase with main --- airflow/executors/base_executor.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 25ff602867cbb..173b6da8b5cad 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -260,10 +260,20 @@ def _emit_metrics(self, open_slots, num_running_tasks, num_queued_tasks): if multiple_executors_configured: metric_suffix = name - open_slots_metric_name = f"executor.open_slots.{metric_suffix}" if multiple_executors_configured else "executor.open_slots" - queued_tasks_metric_name = f"executor.queued_tasks.{metric_suffix}" if multiple_executors_configured else "executor.queued_tasks" - running_tasks_metric_name = f"executor.running_tasks.{metric_suffix}" if multiple_executors_configured else "executor.running_tasks" - + open_slots_metric_name = ( + f"executor.open_slots.{metric_suffix}" if multiple_executors_configured else "executor.open_slots" + ) + queued_tasks_metric_name = ( + f"executor.queued_tasks.{metric_suffix}" + if multiple_executors_configured + else "executor.queued_tasks" + ) + running_tasks_metric_name = ( + f"executor.running_tasks.{metric_suffix}" + if multiple_executors_configured + else "executor.running_tasks" + ) + span = Trace.get_current_span() if span.is_recording(): span.add_event( From a723dca7f5b79dd8bd5471107c8acbd39ed38d6c Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Tue, 23 Jul 2024 09:05:45 -0700 Subject: [PATCH 7/7] Add comment about why get_executor_names is being mocked --- tests/executors/test_base_executor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py index 9a7636615fcd7..7c60cd42bc296 100644 --- a/tests/executors/test_base_executor.py +++ b/tests/executors/test_base_executor.py @@ -153,6 +153,8 @@ def test_gauge_executor_metrics_with_multiple_executors( executor_class, executor_name, ): + # The names of the executors aren't relevant for this test, so long as a list of length > 1 + # is returned. This forces the executor to use the multiple executors gauge logic. mock_get_executor_names.return_value = ["Exec1", "Exec2"] executor = executor_class() executor.heartbeat()