diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 3d7876d9e5aab..dbd93715788a5 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -759,14 +759,15 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None: """ if not self.scheduler_job_id: raise AirflowException(NOT_STARTED_MESSAGE) + new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id) kwargs = { 'field_selector': "status.phase=Succeeded", - 'label_selector': 'kubernetes_executor=True', + 'label_selector': f'kubernetes_executor=True,airflow-worker!={new_worker_id_label}', } pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs) for pod in pod_list.items: self.log.info("Attempting to adopt pod %s", pod.metadata.name) - pod.metadata.labels['airflow-worker'] = pod_generator.make_safe_label_value(self.scheduler_job_id) + pod.metadata.labels['airflow-worker'] = new_worker_id_label try: kube_client.patch_namespaced_pod( name=pod.metadata.name,