diff --git a/dev/perf/scheduler_dag_execution_timing.py b/dev/perf/scheduler_dag_execution_timing.py index d150eed41df03..cbc4ca6e8fc67 100755 --- a/dev/perf/scheduler_dag_execution_timing.py +++ b/dev/perf/scheduler_dag_execution_timing.py @@ -278,7 +278,7 @@ def main(num_runs, repeat, pre_create_dag_runs, executor_class, dag_ids): executor = ShortCircuitExecutor(dag_ids_to_watch=dag_ids, num_runs=num_runs) scheduler_job = Job(executor=executor) - job_runner = SchedulerJobRunner(job=scheduler_job, dag_ids=dag_ids, do_pickle=False) + job_runner = SchedulerJobRunner(job=scheduler_job, dag_ids=dag_ids) executor.job_runner = job_runner total_tasks = sum(len(dag.tasks) for dag in dags) @@ -301,7 +301,7 @@ def main(num_runs, repeat, pre_create_dag_runs, executor_class, dag_ids): reset_dag(dag, session) executor.reset(dag_ids) scheduler_job = Job(executor=executor) - job_runner = SchedulerJobRunner(job=scheduler_job, dag_ids=dag_ids, do_pickle=False) + job_runner = SchedulerJobRunner(job=scheduler_job, dag_ids=dag_ids) executor.scheduler_job = scheduler_job gc.disable() diff --git a/dev/perf/sql_queries.py b/dev/perf/sql_queries.py index 6303d5b6fcd36..60ca8f33f7103 100644 --- a/dev/perf/sql_queries.py +++ b/dev/perf/sql_queries.py @@ -123,7 +123,7 @@ def run_scheduler_job(with_db_reset=False) -> None: if with_db_reset: reset_db() - job_runner = SchedulerJobRunner(job=Job(), subdir=DAG_FOLDER, do_pickle=False, num_runs=3) + job_runner = SchedulerJobRunner(job=Job(), subdir=DAG_FOLDER, num_runs=3) run_job(job=job_runner.job, execute_callable=job_runner._execute) diff --git a/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py b/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py index acd1afcba995a..a8c69871ab9c3 100644 --- a/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -56,6 +56,7 @@ class 
CeleryKubernetesExecutor(BaseExecutor): """ supports_ad_hoc_ti_run: bool = True + # TODO: Remove this flag once providers depend on Airflow 3.0 supports_pickling: bool = True supports_sentry: bool = False @@ -159,7 +160,6 @@ def queue_task_instance( self, task_instance: TaskInstance, mark_success: bool = False, - pickle_id: int | None = None, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, wait_for_past_depends_before_skipping: bool = False, @@ -167,6 +167,7 @@ def queue_task_instance( ignore_ti_state: bool = False, pool: str | None = None, cfg_path: str | None = None, + **kwargs, ) -> None: """Queues task instance via celery or kubernetes executor.""" from airflow.models.taskinstance import SimpleTaskInstance @@ -175,10 +176,14 @@ def queue_task_instance( self.log.debug( "Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key ) + + # TODO: Remove this once providers depend on Airflow 3.0 + if not hasattr(task_instance, "pickle_id"): + kwargs.pop("pickle_id", None) + executor.queue_task_instance( task_instance=task_instance, mark_success=mark_success, - pickle_id=pickle_id, ignore_all_deps=ignore_all_deps, ignore_depends_on_past=ignore_depends_on_past, wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, @@ -186,6 +191,7 @@ def queue_task_instance( ignore_ti_state=ignore_ti_state, pool=pool, cfg_path=cfg_path, + **kwargs, ) def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]: diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index 63755d3d11a1c..d24a59a95d102 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -45,6 +45,7 @@ class LocalKubernetesExecutor(BaseExecutor): 
""" supports_ad_hoc_ti_run: bool = True + # TODO: Remove this attribute once providers rely on Airflow >=3.0.0 supports_pickling: bool = False supports_sentry: bool = False @@ -146,7 +147,6 @@ def queue_task_instance( self, task_instance: TaskInstance, mark_success: bool = False, - pickle_id: int | None = None, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, wait_for_past_depends_before_skipping: bool = False, @@ -154,6 +154,7 @@ def queue_task_instance( ignore_ti_state: bool = False, pool: str | None = None, cfg_path: str | None = None, + **kwargs, ) -> None: """Queues task instance via local or kubernetes executor.""" from airflow.models.taskinstance import SimpleTaskInstance @@ -162,10 +163,13 @@ def queue_task_instance( self.log.debug( "Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key ) + + if not hasattr(task_instance, "pickle_id"): + kwargs.pop("pickle_id", None) + executor.queue_task_instance( task_instance=task_instance, mark_success=mark_success, - pickle_id=pickle_id, ignore_all_deps=ignore_all_deps, ignore_depends_on_past=ignore_depends_on_past, wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, @@ -173,6 +177,7 @@ def queue_task_instance( ignore_ti_state=ignore_ti_state, pool=pool, cfg_path=cfg_path, + **kwargs, ) def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]: diff --git a/providers/tests/celery/executors/test_celery_executor.py b/providers/tests/celery/executors/test_celery_executor.py index 71fae6691c6fe..2fa72deab0aab 100644 --- a/providers/tests/celery/executors/test_celery_executor.py +++ b/providers/tests/celery/executors/test_celery_executor.py @@ -110,9 +110,6 @@ def teardown_method(self) -> None: db.clear_db_runs() db.clear_db_jobs() - def test_supports_pickling(self): - assert CeleryExecutor.supports_pickling - def test_supports_sentry(self): assert CeleryExecutor.supports_sentry diff --git 
a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py index 13ca0ed828c65..ea143edd82987 100644 --- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1750,9 +1750,6 @@ def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operat "Reading from k8s pod logs failed: error_fetching_pod_log", ] - def test_supports_pickling(self): - assert KubernetesExecutor.supports_pickling - def test_supports_sentry(self): assert not KubernetesExecutor.supports_sentry