From c9b95dca3dfcfd51fbd4176c86ff78e3a11f3c07 Mon Sep 17 00:00:00 2001 From: hterik Date: Thu, 29 Sep 2022 16:42:53 +0200 Subject: [PATCH] Don't re-patch pods that are already controlled by current worker After the scheduler has launched many pods, it keeps trying to re-adopt them by patching every pod. Each patch-operation involves a remote API-call which can be very slow. In the meantime the scheduler can not do anything else. By ignoring the pods that already have the expected label, the list query-result will be shorter and the number of patch-queries much less. We had an unlucky moment in our environment, where each patch-operation started taking 100ms each, with 200 pods in flight it accumulates into 20 seconds of blocked scheduler. --- airflow/executors/kubernetes_executor.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 3d7876d9e5aab..dbd93715788a5 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -759,14 +759,15 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None: """ if not self.scheduler_job_id: raise AirflowException(NOT_STARTED_MESSAGE) + new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id) kwargs = { 'field_selector': "status.phase=Succeeded", - 'label_selector': 'kubernetes_executor=True', + 'label_selector': f'kubernetes_executor=True,airflow-worker!={new_worker_id_label}', } pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs) for pod in pod_list.items: self.log.info("Attempting to adopt pod %s", pod.metadata.name) - pod.metadata.labels['airflow-worker'] = pod_generator.make_safe_label_value(self.scheduler_job_id) + pod.metadata.labels['airflow-worker'] = new_worker_id_label try: kube_client.patch_namespaced_pod( name=pod.metadata.name,