diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index bc5699361bac0..cfd31cda894a2 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -239,12 +239,12 @@ def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> Non from airflow.models.taskinstance import TaskInstance hybrid_executor_enabled = hasattr(TaskInstance, "executor") - default_executor = None + default_executor_alias = None if hybrid_executor_enabled: from airflow.executors.executor_loader import ExecutorLoader - default_executor = str(ExecutorLoader.get_default_executor_name()) - default_executor = default_executor.strip(":") + default_executor_name = ExecutorLoader.get_default_executor_name() + default_executor_alias = default_executor_name.alias with Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"): self.log.debug("Clearing tasks that have not been launched") @@ -254,7 +254,10 @@ def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> Non ) if self.kubernetes_queue: query = query.where(TaskInstance.queue == self.kubernetes_queue) - elif hybrid_executor_enabled and default_executor == KUBERNETES_EXECUTOR: + # KUBERNETES_EXECUTOR is the string name/alias of the "core" executor represented by this + # module. The ExecutorName for "core" executors always contains an alias and cannot be modified + # to be different from the constant (in this case KUBERNETES_EXECUTOR). + elif hybrid_executor_enabled and default_executor_alias == KUBERNETES_EXECUTOR: query = query.where( or_( TaskInstance.executor == KUBERNETES_EXECUTOR,