From b08c8623bb48571409bc58d4ae5e6dbdde25c64d Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Thu, 22 Aug 2024 14:30:09 +0530 Subject: [PATCH] Sync v2-10-stable with v2-10-test to release python client v2.10.0 (#41610) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Enable pull requests to be run from v*test branches (#41474) (#41476) Since we switch from direct push of cherry-picking to open PRs against v*test branch, we should enable PRs to run for the target branch. (cherry picked from commit a9363e6a30d73a647ed7d45c92d46d1f6f98513f) * Prevent provider lowest-dependency tests to run in non-main branch (#41478) (#41481) When running tests in v2-10-test branch, lowest dependency tests are run for providers - because when calculating separate tests, the "skip_provider_tests" has not been used to filter them out. This PR fixes it. (cherry picked from commit 75da5074969ec874040ea094d5afe00b7f02be76) * Make PROD image building works in non-main PRs (#41480) (#41484) The PROD image building fails currently in non-main because it attempts to build source provider packages rather than use them from PyPI when PR is run against "v-test" branch. This PR fixes it: * PROD images in non-main-targeted build will pull providers from PyPI rather than build them * they use PyPI constraints to install the providers * they use UV - which should speed up building of the images (cherry picked from commit 4d5f1c42a7873329b1b6b8b9b39db2c3033b46df) * Add WebEncoder for trigger page rendering to avoid render failure (#41350) (#41485) Co-authored-by: M. 
Olcay Tercanlı * Incorrect try number subtraction producing invalid span id for OTEL airflow (issue #41501) (#41502) (#41535) * Fix for issue #39336 * removed unnecessary import (cherry picked from commit dd3c3a7a43102c967d76cdcfe1f2f8ebeef4e212) Co-authored-by: Howard Yoo <32691630+howardyoo@users.noreply.github.com> * Fix failing pydantic v1 tests (#41534) (#41541) We need to exclude some versions of Pydantic v1 because it conflicts with aws provider. (cherry picked from commit a033c5f15a033c751419506ea77ffdbacdd37705) * Fix Non-DB test calculation for main builds (#41499) (#41543) Pytest has a weird behaviour that it will not collect tests from parent folder when subfolder of it is specified after the parent folder. This caused some non-db tests from providers folder to be skipped during main build. The issue in Pytest 8.2 (used to work before) is tracked at https://github.com/pytest-dev/pytest/issues/12605 (cherry picked from commit d48982692c54d024d7c05e1efb7cd2adeb7d896c) * Add changelog for airflow python client 2.10.0 (#41583) (#41584) * Add changelog for airflow python client 2.10.0 * Update client version (cherry picked from commit 317a28ed435960e7184e357a2f128806c34612fa) * Make all test pass in Database Isolation mode (#41567) This adds dedicated "DatabaseIsolation" test to airflow v2-10-test branch. The DatabaseIsolation test will run all "db-tests" with enabled DB isolation mode and running `internal-api` component - groups of tests marked with "skip-if-database-isolation" will be skipped. * Upgrade build and chart dependencies (#41570) (#41588) (cherry picked from commit c88192c466cb91842310f82a61eaa48b39439bef) Co-authored-by: Jarek Potiuk * Limit watchtower as depenendcy as 3.3.0 breaks moin. 
(#41612) (cherry picked from commit 1b602d50266184d118db52a674baeab29b1f5688) * Enable running Pull Requests against v2-10-stable branch (#41624) (cherry picked from commit e306e7f7bc1ef12aeab0fc09e018accda3684a2f) * Fix tests/models/test_variable.py for database isolation mode (#41414) * Fix tests/models/test_variable.py for database isolation mode * Review feedback (cherry picked from commit 736ebfe3fe2bd67406d5a50dacbfa1e43767d4ce) * Make latest botocore tests green (#41626) The latest botocore tests are conflicting with a few requirements and until apache-beam upcoming version is released we need to do some manual exclusions. Those exclusions should make latest botocore test green again. (cherry picked from commit a13ccbbdec8e59f30218f604fca8cbb999fcb757) * Simpler task retrieval for taskinstance test (#41389) The test has been updated for DB isolation but the retrieval of task was not intuitive and it could lead to flaky tests possibly (cherry picked from commit f25adf14ad486bac818fe3fdcd61eb3355e8ec9b) * Skip database isolation case for task mapping taskinstance tests (#41471) Related: #41067 (cherry picked from commit 7718bd7a6ed7fb476e4920315b6d11f1ac465f44) * Skipping tests for db isolation because similar tests were skipped (#41450) (cherry picked from commit e94b508b946471420488cc466d92301b54b4c5ae) --------- Co-authored-by: Jarek Potiuk Co-authored-by: Brent Bovenzi Co-authored-by: M. 
Olcay Tercanlı Co-authored-by: Howard Yoo <32691630+howardyoo@users.noreply.github.com> Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com> Co-authored-by: Bugra Ozturk --- .../endpoints/rpc_api_endpoint.py | 9 +-- airflow/api_internal/internal_api_call.py | 2 +- airflow/models/variable.py | 66 ++++++++++++++++++- airflow/serialization/enums.py | 1 + airflow/serialization/serialized_objects.py | 16 ++++- tests/models/test_taskinstance.py | 9 ++- tests/models/test_variable.py | 8 ++- tests/sensors/test_external_task_sensor.py | 2 + 8 files changed, 99 insertions(+), 14 deletions(-) diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index a85964af4f64a..c3d8b671fbb08 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -126,9 +126,9 @@ def initialize_method_map() -> dict[str, Callable]: # XCom.get_many, # Not supported because it returns query XCom.clear, XCom.set, - Variable.set, - Variable.update, - Variable.delete, + Variable._set, + Variable._update, + Variable._delete, DAG.fetch_callback, DAG.fetch_dagrun, DagRun.fetch_task_instances, @@ -237,7 +237,8 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse: response = json.dumps(output_json) if output_json is not None else None log.debug("Sending response: %s", response) return Response(response=response, headers={"Content-Type": "application/json"}) - except AirflowException as e: # In case of AirflowException transport the exception class back to caller + # In case of AirflowException or other selective known types, transport the exception class back to caller + except (KeyError, AttributeError, AirflowException) as e: exception_json = BaseSerialization.serialize(e, use_pydantic_models=True) response = json.dumps(exception_json) log.debug("Sending exception response: %s", response) diff --git a/airflow/api_internal/internal_api_call.py 
b/airflow/api_internal/internal_api_call.py index fc0945b3c0fe0..8838377877bec 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -159,7 +159,7 @@ def wrapper(*args, **kwargs): if result is None or result == b"": return None result = BaseSerialization.deserialize(json.loads(result), use_pydantic_models=True) - if isinstance(result, AirflowException): + if isinstance(result, (KeyError, AttributeError, AirflowException)): raise result return result diff --git a/airflow/models/variable.py b/airflow/models/variable.py index 63b71303bc803..563cac46e8c84 100644 --- a/airflow/models/variable.py +++ b/airflow/models/variable.py @@ -154,7 +154,6 @@ def get( @staticmethod @provide_session - @internal_api_call def set( key: str, value: Any, @@ -167,6 +166,35 @@ def set( This operation overwrites an existing variable. + :param key: Variable Key + :param value: Value to set for the Variable + :param description: Description of the Variable + :param serialize_json: Serialize the value to a JSON string + :param session: Session + """ + Variable._set( + key=key, value=value, description=description, serialize_json=serialize_json, session=session + ) + # invalidate key in cache for faster propagation + # we cannot save the value set because it's possible that it's shadowed by a custom backend + # (see call to check_for_write_conflict above) + SecretCache.invalidate_variable(key) + + @staticmethod + @provide_session + @internal_api_call + def _set( + key: str, + value: Any, + description: str | None = None, + serialize_json: bool = False, + session: Session = None, + ) -> None: + """ + Set a value for an Airflow Variable with a given Key. + + This operation overwrites an existing variable. 
+ :param key: Variable Key :param value: Value to set for the Variable :param description: Description of the Variable @@ -190,7 +218,6 @@ def set( @staticmethod @provide_session - @internal_api_call def update( key: str, value: Any, @@ -200,6 +227,27 @@ def update( """ Update a given Airflow Variable with the Provided value. + :param key: Variable Key + :param value: Value to set for the Variable + :param serialize_json: Serialize the value to a JSON string + :param session: Session + """ + Variable._update(key=key, value=value, serialize_json=serialize_json, session=session) + # We need to invalidate the cache for internal API cases on the client side + SecretCache.invalidate_variable(key) + + @staticmethod + @provide_session + @internal_api_call + def _update( + key: str, + value: Any, + serialize_json: bool = False, + session: Session = None, + ) -> None: + """ + Update a given Airflow Variable with the Provided value. + :param key: Variable Key :param value: Value to set for the Variable :param serialize_json: Serialize the value to a JSON string @@ -219,11 +267,23 @@ def update( @staticmethod @provide_session - @internal_api_call def delete(key: str, session: Session = None) -> int: """ Delete an Airflow Variable for a given key. + :param key: Variable Keys + """ + rows = Variable._delete(key=key, session=session) + SecretCache.invalidate_variable(key) + return rows + + @staticmethod + @provide_session + @internal_api_call + def _delete(key: str, session: Session = None) -> int: + """ + Delete an Airflow Variable for a given key. 
+ :param key: Variable Keys """ rows = session.execute(delete(Variable).where(Variable.key == key)).rowcount diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py index a5bd5e3646e83..f216ce7316103 100644 --- a/airflow/serialization/enums.py +++ b/airflow/serialization/enums.py @@ -46,6 +46,7 @@ class DagAttributeTypes(str, Enum): RELATIVEDELTA = "relativedelta" BASE_TRIGGER = "base_trigger" AIRFLOW_EXC_SER = "airflow_exc_ser" + BASE_EXC_SER = "base_exc_ser" DICT = "dict" SET = "set" TUPLE = "tuple" diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 6d0bbd4e23fd8..84ad5679182bb 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -692,6 +692,15 @@ def serialize( ), type_=DAT.AIRFLOW_EXC_SER, ) + elif isinstance(var, (KeyError, AttributeError)): + return cls._encode( + cls.serialize( + {"exc_cls_name": var.__class__.__name__, "args": [var.args], "kwargs": {}}, + use_pydantic_models=use_pydantic_models, + strict=strict, + ), + type_=DAT.BASE_EXC_SER, + ) elif isinstance(var, BaseTrigger): return cls._encode( cls.serialize(var.serialize(), use_pydantic_models=use_pydantic_models, strict=strict), @@ -834,13 +843,16 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any: return decode_timezone(var) elif type_ == DAT.RELATIVEDELTA: return decode_relativedelta(var) - elif type_ == DAT.AIRFLOW_EXC_SER: + elif type_ == DAT.AIRFLOW_EXC_SER or type_ == DAT.BASE_EXC_SER: deser = cls.deserialize(var, use_pydantic_models=use_pydantic_models) exc_cls_name = deser["exc_cls_name"] args = deser["args"] kwargs = deser["kwargs"] del deser - exc_cls = import_string(exc_cls_name) + if type_ == DAT.AIRFLOW_EXC_SER: + exc_cls = import_string(exc_cls_name) + else: + exc_cls = import_string(f"builtins.{exc_cls_name}") return exc_cls(*args, **kwargs) elif type_ == DAT.BASE_TRIGGER: tr_cls_name, kwargs = cls.deserialize(var, 
use_pydantic_models=use_pydantic_models) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index ba5ac7c7b54ad..cbd38e9b390c8 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -1444,7 +1444,10 @@ def test_check_task_dependencies( # Parameterized tests to check for the correct firing # of the trigger_rule under various circumstances of mapped task # Numeric fields are in order: - # successes, skipped, failed, upstream_failed, done,removed + # successes, skipped, failed, upstream_failed, done,remove + # Does not work for database isolation mode because there is local test monkeypatching of upstream_failed + # That never gets propagated to internal_api + @pytest.mark.skip_if_database_isolation_mode @pytest.mark.parametrize( "trigger_rule, upstream_states, flag_upstream_failed, expect_state, expect_completed", [ @@ -1540,8 +1543,10 @@ def do_something_else(i): monkeypatch.setattr(_UpstreamTIStates, "calculate", lambda *_: upstream_states) ti = dr.get_task_instance("do_something_else", session=session) ti.map_index = 0 + base_task = ti.task + for map_index in range(1, 5): - ti = TaskInstance(dr.task_instances[-1].task, run_id=dr.run_id, map_index=map_index) + ti = TaskInstance(base_task, run_id=dr.run_id, map_index=map_index) session.add(ti) ti.dag_run = dr session.flush() diff --git a/tests/models/test_variable.py b/tests/models/test_variable.py index e3d5c023a24ab..6fb6fa15f214c 100644 --- a/tests/models/test_variable.py +++ b/tests/models/test_variable.py @@ -47,6 +47,7 @@ def setup_test_cases(self): db.clear_db_variables() crypto._fernet = None + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, internal API has other fernet @conf_vars({("core", "fernet_key"): "", ("core", "unit_test_mode"): "True"}) def test_variable_no_encryption(self, session): """ @@ -60,6 +61,7 @@ def test_variable_no_encryption(self, session): # should mask anything. 
That logic is tested in test_secrets_masker.py self.mask_secret.assert_called_once_with("value", "key") + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, internal API has other fernet @conf_vars({("core", "fernet_key"): Fernet.generate_key().decode()}) def test_variable_with_encryption(self, session): """ @@ -70,6 +72,7 @@ def test_variable_with_encryption(self, session): assert test_var.is_encrypted assert test_var.val == "value" + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, internal API has other fernet @pytest.mark.parametrize("test_value", ["value", ""]) def test_var_with_encryption_rotate_fernet_key(self, test_value, session): """ @@ -152,6 +155,7 @@ def test_variable_update(self, session): Variable.update(key="test_key", value="value2", session=session) assert "value2" == Variable.get("test_key") + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, API server has other ENV def test_variable_update_fails_on_non_metastore_variable(self, session): with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"): with pytest.raises(AttributeError): @@ -281,6 +285,7 @@ def test_caching_caches(self, mock_ensure_secrets: mock.Mock): mock_backend.get_variable.assert_called_once() # second call was not made because of cache assert first == second + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, internal API has other env def test_cache_invalidation_on_set(self, session): with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="from_env"): a = Variable.get("key") # value is saved in cache @@ -316,7 +321,7 @@ def test_masking_only_secret_values(variable_value, deserialize_json, expected_m val=variable_value, ) session.add(var) - session.flush() + session.commit() # Make sure we re-load it, not just get the cached object back session.expunge(var) _secrets_masker().patterns = set() @@ -326,5 +331,4 @@ def 
test_masking_only_secret_values(variable_value, deserialize_json, expected_m for expected_masked_value in expected_masked_values: assert expected_masked_value in _secrets_masker().patterns finally: - session.rollback() db.clear_db_variables() diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index 07ed3ab321d3a..0ca13b343f230 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -809,6 +809,7 @@ def test_catch_invalid_allowed_states(self): dag=self.dag, ) + @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode def test_external_task_sensor_waits_for_task_check_existence(self): op = ExternalTaskSensor( task_id="test_external_task_sensor_check", @@ -821,6 +822,7 @@ def test_external_task_sensor_waits_for_task_check_existence(self): with pytest.raises(AirflowException): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode def test_external_task_sensor_waits_for_dag_check_existence(self): op = ExternalTaskSensor( task_id="test_external_task_sensor_check",