From 89fdc0326a0cf71d8e95155047092102695d00b2 Mon Sep 17 00:00:00 2001 From: Niko Oliveira Date: Mon, 16 Dec 2024 22:37:47 -0800 Subject: [PATCH] Compare k8s executor against alias, not full ExecutorName repr (#44967) --- .../cncf/kubernetes/executors/kubernetes_executor.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index bc5699361bac0..cfd31cda894a2 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -239,12 +239,12 @@ def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> Non from airflow.models.taskinstance import TaskInstance hybrid_executor_enabled = hasattr(TaskInstance, "executor") - default_executor = None + default_executor_alias = None if hybrid_executor_enabled: from airflow.executors.executor_loader import ExecutorLoader - default_executor = str(ExecutorLoader.get_default_executor_name()) - default_executor = default_executor.strip(":") + default_executor_name = ExecutorLoader.get_default_executor_name() + default_executor_alias = default_executor_name.alias with Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"): self.log.debug("Clearing tasks that have not been launched") @@ -254,7 +254,10 @@ def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> Non ) if self.kubernetes_queue: query = query.where(TaskInstance.queue == self.kubernetes_queue) - elif hybrid_executor_enabled and default_executor == KUBERNETES_EXECUTOR: + # KUBERNETES_EXECUTOR is the string name/alias of the "core" executor represented by this + # module. The ExecutorName for "core" executors always contains an alias and cannot be modified + # to be different from the constant (in this case KUBERNETES_EXECUTOR). + elif hybrid_executor_enabled and default_executor_alias == KUBERNETES_EXECUTOR: query = query.where( or_( TaskInstance.executor == KUBERNETES_EXECUTOR,