From c1daf84a751d751a8e825acbb5bde009fb427638 Mon Sep 17 00:00:00 2001 From: HTErik <89977373+hterik@users.noreply.github.com> Date: Tue, 18 Oct 2022 14:54:42 +0200 Subject: [PATCH] Don't re-patch pods that are already controlled by current worker (#26778) After the scheduler has launched many pods, it keeps trying to re-adopt them by patching every pod. Each patch-operation involves a remote API-call which can be very slow. In the meantime the scheduler cannot do anything else. By ignoring the pods that already have the expected label, the list query-result will be shorter and the number of patch-queries much lower. We had an unlucky moment in our environment, where patch-operations started taking 100ms each; with 200 pods in flight, that accumulates into 20 seconds of blocked scheduler. (cherry picked from commit 27ec5620b3d1b93ae68a98e9253e243c6cf70d71) --- airflow/executors/kubernetes_executor.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index a718aa1907209..b1822872ed9c8 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -754,14 +754,15 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None: """ if not self.scheduler_job_id: raise AirflowException(NOT_STARTED_MESSAGE) + new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id) kwargs = { 'field_selector': "status.phase=Succeeded", - 'label_selector': 'kubernetes_executor=True', + 'label_selector': f'kubernetes_executor=True,airflow-worker!={new_worker_id_label}', } pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs) for pod in pod_list.items: self.log.info("Attempting to adopt pod %s", pod.metadata.name) - pod.metadata.labels['airflow-worker'] = pod_generator.make_safe_label_value(self.scheduler_job_id) + pod.metadata.labels['airflow-worker'] = 
new_worker_id_label try: kube_client.patch_namespaced_pod( name=pod.metadata.name,