From a9ac157b5d6993d451eaa1c4432faa074b847666 Mon Sep 17 00:00:00 2001 From: vlieven Date: Fri, 11 Oct 2024 17:12:35 +0200 Subject: [PATCH] Split providers out of the main "airflow/" tree into a UV workspace project (#42505) (#42624) This is only a partial split so far. It moves all the code and tests, but leaves the creation of `core/` to a separate PR as this is already large enough. In addition to the straight file rename the other changes I had to make here are: - Some mypy/typing fixes. Mypy can be fragile about what it picks up when, so maybe some of those changes were caused by that. But the typing changes aren't large. - Improve typing in common.sql type stub Again, likely a mypy file oddity, but the types should be safe - Removed the `check-providers-init-file-missing` check This isn't needed now that airflow/providers shouldn't exist at all in the main tree. - Create a "dev.tests_common" package that contains helper files and common pytest fixtures Since the provider tests are no longer under tests/ they don't automatically share the fixtures from the parent `tests/conftest.py` so they needed to be extracted. Ditto for `tests.test_utils` -- they can't be easily imported in provider tests anymore, so they are moved to a more explicit shared location. In future we should switch how the CI image is built to make better use of UV caching than our own approach as that would remove a lot of custom code. 
Co-authored-by: Ash Berlin-Taylor Co-authored-by: Ryan Hatter <25823361+RNHTTR@users.noreply.github.com> --- .../executors/kubernetes_executor.py | 1 + .../executors/kubernetes_executor_utils.py | 10 -------- .../executors/test_kubernetes_executor.py | 23 ++++++++++++------- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 7c6e0d8852e5f..4301a54f02922 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -482,6 +482,7 @@ def _change_state( self.running.remove(key) except KeyError: self.log.debug("TI key not in running, not adding to event_buffer: %s", key) + return # If we don't have a TI state, look it up from the db. event_buffer expects the TI state if state is None: diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 495c09ea23e7c..42d2a97dffed1 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -273,16 +273,6 @@ def process_status( (pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version) ) elif status == "Succeeded": - # We get multiple events once the pod hits a terminal state, and we only want to - # send it along to the scheduler once. - # If our event type is DELETED, or the pod has a deletion timestamp, we've already - # seen the initial Succeeded event and sent it along to the scheduler. 
- if event["type"] == "DELETED" or pod.metadata.deletion_timestamp: - self.log.info( - "Skipping event for Succeeded pod %s - event for this pod already sent to executor", - pod_name, - ) - return self.log.info("Event: %s Succeeded, annotations: %s", pod_name, annotations_string) self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version)) elif status == "Running": diff --git a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py index 5240bf0faecb2..07b42ee3fc22a 100644 --- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -784,6 +784,21 @@ def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_ finally: executor.end() + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_change_state_key_not_in_running(self, mock_get_kube_client, mock_kubernetes_job_watcher): + executor = self.kubernetes_executor + executor.start() + try: + key = ("dag_id", "task_id", "run_id", "try_number1") + executor.running = set() + executor._change_state(key, State.SUCCESS, "pod_name", "default") + assert executor.event_buffer.get(key) is None + assert executor.running == set() + finally: + executor.end() + @pytest.mark.db_test @pytest.mark.parametrize( "multi_namespace_mode_namespace_list, watchers_keys", @@ -1858,14 +1873,6 @@ def test_process_status_succeeded(self): # We don't know the TI state, so we send in None self.assert_watcher_queue_called_once_with_state(None) - def test_process_status_succeeded_dedup_timestamp(self): - self.pod.status.phase = "Succeeded" - self.pod.metadata.deletion_timestamp = timezone.utcnow() - self.events.append({"type": "MODIFIED", "object": self.pod}) - - 
self._run() - self.watcher.watcher_queue.put.assert_not_called() - @pytest.mark.parametrize( "ti_state", [