From 2afd2a7d56c64375a8d3786b0f55149a57045ced Mon Sep 17 00:00:00 2001 From: ninsbl Date: Mon, 29 Jan 2024 12:37:23 +0100 Subject: [PATCH 01/15] get task_acks_late from config --- airflow/providers/celery/executors/default_celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/celery/executors/default_celery.py b/airflow/providers/celery/executors/default_celery.py index eaaf0fd0b4f31..3a571dc4ef1c3 100644 --- a/airflow/providers/celery/executors/default_celery.py +++ b/airflow/providers/celery/executors/default_celery.py @@ -72,7 +72,7 @@ def _broker_supports_visibility_timeout(url): "accept_content": ["json"], "event_serializer": "json", "worker_prefetch_multiplier": conf.getint("celery", "worker_prefetch_multiplier", fallback=1), - "task_acks_late": True, + "task_acks_late": conf.getboolean("celery", "task_acks_late", fallback=False), "task_default_queue": conf.get("operators", "DEFAULT_QUEUE"), "task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"), "task_track_started": conf.getboolean("celery", "task_track_started", fallback=True), From b91bc435dc0271e3a24951b3f6d70382c36e3afd Mon Sep 17 00:00:00 2001 From: ninsbl Date: Mon, 29 Jan 2024 12:37:49 +0100 Subject: [PATCH 02/15] add task_acks_late to config --- airflow/providers/celery/provider.yaml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/airflow/providers/celery/provider.yaml b/airflow/providers/celery/provider.yaml index 947dfcb6b1a01..f019a6b27fac4 100644 --- a/airflow/providers/celery/provider.yaml +++ b/airflow/providers/celery/provider.yaml @@ -276,6 +276,20 @@ config: type: float example: ~ default: "1.0" + task_acks_late: + description: | + If task runs exceed the visibility_timeout, Celery will re-assign new tasks, even if the + originally started task is still running successfully. The newly created task instance runs + then concurrently and the Airflow UI and logs only show an error message: + 'Task Instance Not Running' FAILED: Task is in the running state + Setting task_acks_late to True will force Celery to wait until a task is finished before a + new task unstance is assigned. This effectively overrides the visibility timeout. + See also: + https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late + version_added: ~ + type: boolean + example: ~ + default: "False" task_track_started: description: | Celery task will report its status as 'started' when the task is executed by a worker. From 00228e3c2dfab6d781ca1c9ac80a5b3f371d79e2 Mon Sep 17 00:00:00 2001 From: Stefan Blumentrath Date: Tue, 30 Jan 2024 19:49:29 +0100 Subject: [PATCH 03/15] Update airflow/providers/celery/provider.yaml Co-authored-by: Hussein Awala --- airflow/providers/celery/provider.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/celery/provider.yaml b/airflow/providers/celery/provider.yaml index f019a6b27fac4..1ff71f0331ba4 100644 --- a/airflow/providers/celery/provider.yaml +++ b/airflow/providers/celery/provider.yaml @@ -289,7 +289,7 @@ config: version_added: ~ type: boolean example: ~ - default: "False" + default: "True" task_track_started: description: | Celery task will report its status as 'started' when the task is executed by a worker. From ead016d968b03426d73f5a509e005d044dd20640 Mon Sep 17 00:00:00 2001 From: Stefan Blumentrath Date: Tue, 30 Jan 2024 19:49:58 +0100 Subject: [PATCH 04/15] Update airflow/providers/celery/executors/default_celery.py Co-authored-by: Hussein Awala --- airflow/providers/celery/executors/default_celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/celery/executors/default_celery.py b/airflow/providers/celery/executors/default_celery.py index 3a571dc4ef1c3..54810f8d9512a 100644 --- a/airflow/providers/celery/executors/default_celery.py +++ b/airflow/providers/celery/executors/default_celery.py @@ -72,7 +72,7 @@ def _broker_supports_visibility_timeout(url): "accept_content": ["json"], "event_serializer": "json", "worker_prefetch_multiplier": conf.getint("celery", "worker_prefetch_multiplier", fallback=1), - "task_acks_late": conf.getboolean("celery", "task_acks_late", fallback=False), + "task_acks_late": conf.getboolean("celery", "task_acks_late", fallback=True), "task_default_queue": conf.get("operators", "DEFAULT_QUEUE"), "task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"), "task_track_started": conf.getboolean("celery", "task_track_started", fallback=True), From a7ee86f2401afe5df80bbaba91bcd060de0d0ffd Mon Sep 17 00:00:00 2001 From: ninsbl Date: Tue, 30 Jan 2024 23:53:10 +0100 Subject: [PATCH 05/15] add basic test --- tests/providers/celery/executors/test_celery_executor.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/providers/celery/executors/test_celery_executor.py b/tests/providers/celery/executors/test_celery_executor.py index e6446c0f6fa6b..6e6861daec5c4 100644 --- a/tests/providers/celery/executors/test_celery_executor.py +++ b/tests/providers/celery/executors/test_celery_executor.py @@ -358,3 +358,12 @@ def test_sentinel_kwargs_loaded_from_string(): assert default_celery.DEFAULT_CELERY_CONFIG["broker_transport_options"]["sentinel_kwargs"] == { "service_name": "mymaster" } + + +@conf_vars({("celery", "task_acks_late"): "True"}) +def test_celery_task_acks_late_loaded_from_string(): + import importlib + + # reload celery conf to apply the new config + importlib.reload(default_celery) + assert default_celery.DEFAULT_CELERY_CONFIG["task_acks_late"] is True From a7592eb3da981db4979ed0af712d20c8e9e21a84 Mon Sep 17 00:00:00 2001 From: ninsbl Date: Wed, 31 Jan 2024 00:07:33 +0100 Subject: [PATCH 06/15] test acks_late False --- tests/providers/celery/executors/test_celery_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/providers/celery/executors/test_celery_executor.py b/tests/providers/celery/executors/test_celery_executor.py index 6e6861daec5c4..450312bd63198 100644 --- a/tests/providers/celery/executors/test_celery_executor.py +++ b/tests/providers/celery/executors/test_celery_executor.py @@ -360,10 +360,10 @@ def test_sentinel_kwargs_loaded_from_string(): } -@conf_vars({("celery", "task_acks_late"): "True"}) +@conf_vars({("celery", "task_acks_late"): "False"}) def test_celery_task_acks_late_loaded_from_string(): import importlib # reload celery conf to apply the new config importlib.reload(default_celery) - assert default_celery.DEFAULT_CELERY_CONFIG["task_acks_late"] is True + assert default_celery.DEFAULT_CELERY_CONFIG["task_acks_late"] is False From bd57b969ed7b2a9b8d6467ebffd2dce0fc0eff1e Mon Sep 17 00:00:00 2001 From: Stefan Blumentrath Date: Wed, 31 Jan 2024 00:38:14 +0100 Subject: [PATCH 07/15] linting --- airflow/providers/celery/executors/default_celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/celery/executors/default_celery.py b/airflow/providers/celery/executors/default_celery.py index 54810f8d9512a..01b13b1c5a8af 100644 --- a/airflow/providers/celery/executors/default_celery.py +++ b/airflow/providers/celery/executors/default_celery.py @@ -72,7 +72,7 @@ def _broker_supports_visibility_timeout(url): "accept_content": ["json"], "event_serializer": "json", "worker_prefetch_multiplier": conf.getint("celery", "worker_prefetch_multiplier", fallback=1), - "task_acks_late": conf.getboolean("celery", "task_acks_late", fallback=True), + "task_acks_late": conf.getboolean("celery", "task_acks_late", fallback=True), "task_default_queue": conf.get("operators", "DEFAULT_QUEUE"), "task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"), "task_track_started": conf.getboolean("celery", "task_track_started", fallback=True), From e07955038d534c8a5bdd506651361341b68c06de Mon Sep 17 00:00:00 2001 From: Stefan Blumentrath Date: Wed, 31 Jan 2024 07:03:23 +0100 Subject: [PATCH 08/15] Update airflow/providers/celery/provider.yaml Co-authored-by: Niko Oliveira --- airflow/providers/celery/provider.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/providers/celery/provider.yaml b/airflow/providers/celery/provider.yaml index 1ff71f0331ba4..b6ed26f0dd403 100644 --- a/airflow/providers/celery/provider.yaml +++ b/airflow/providers/celery/provider.yaml @@ -278,12 +278,12 @@ config: default: "1.0" task_acks_late: description: | - If task runs exceed the visibility_timeout, Celery will re-assign new tasks, even if the - originally started task is still running successfully. The newly created task instance runs - then concurrently and the Airflow UI and logs only show an error message: - 'Task Instance Not Running' FAILED: Task is in the running state + If an Airflow task's execution time exceeds the visibility_timeout, Celery will re-assign the task to a Celery worker, even if the + original task is still running successfully. The new task instance then runs + concurrently with the original task and the Airflow UI and logs only show an error message: + 'Task Instance Not Running' FAILED: Task is in the running state' Setting task_acks_late to True will force Celery to wait until a task is finished before a - new task unstance is assigned. This effectively overrides the visibility timeout. + new task instance is assigned. This effectively overrides the visibility timeout. See also: https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late version_added: ~ From 1d9aa5eaf495472b113b66cb96a19d0663b3cd88 Mon Sep 17 00:00:00 2001 From: Stefan Blumentrath Date: Wed, 31 Jan 2024 07:03:37 +0100 Subject: [PATCH 09/15] Update airflow/providers/celery/provider.yaml Co-authored-by: Niko Oliveira --- airflow/providers/celery/provider.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/celery/provider.yaml b/airflow/providers/celery/provider.yaml index b6ed26f0dd403..2ccf259c20385 100644 --- a/airflow/providers/celery/provider.yaml +++ b/airflow/providers/celery/provider.yaml @@ -288,7 +288,7 @@ config: https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late version_added: ~ type: boolean - example: ~ + example: "True" default: "True" task_track_started: description: | From 395f6672980e68d45e6fd8db4df032a2bc10d585 Mon Sep 17 00:00:00 2001 From: ninsbl Date: Wed, 31 Jan 2024 11:49:52 +0100 Subject: [PATCH 10/15] linting --- airflow/providers/celery/provider.yaml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/airflow/providers/celery/provider.yaml b/airflow/providers/celery/provider.yaml index 2ccf259c20385..80b7583a5ea62 100644 --- a/airflow/providers/celery/provider.yaml +++ b/airflow/providers/celery/provider.yaml @@ -278,9 +278,10 @@ config: default: "1.0" task_acks_late: description: | - If an Airflow task's execution time exceeds the visibility_timeout, Celery will re-assign the task to a Celery worker, even if the - original task is still running successfully. The new task instance then runs - concurrently with the original task and the Airflow UI and logs only show an error message: + If an Airflow task's execution time exceeds the visibility_timeout, Celery will re-assign the + task to a Celery worker, even if the original task is still running successfully. The new task + instance then runs concurrently with the original task and the Airflow UI and logs only show an + error message: 'Task Instance Not Running' FAILED: Task is in the running state' Setting task_acks_late to True will force Celery to wait until a task is finished before a new task instance is assigned. This effectively overrides the visibility timeout. From 841952715dcbe36fb21e93fac1d0e5d588eccbae Mon Sep 17 00:00:00 2001 From: Stefan Blumentrath Date: Sat, 3 Feb 2024 11:34:44 +0100 Subject: [PATCH 11/15] Update spelling_wordlist.txt --- docs/spelling_wordlist.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 1dc8d5693b398..b32231363419b 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -5,6 +5,7 @@ accountmaking aci Ack ack +acks ackIds acknowledgement actionCard From 15fe25987ecc9c6c38dda02cd853e8e87205e296 Mon Sep 17 00:00:00 2001 From: Stefan Blumentrath Date: Sat, 3 Feb 2024 12:26:13 +0100 Subject: [PATCH 12/15] alphabetic order --- docs/spelling_wordlist.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index b32231363419b..1df817304e861 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -5,9 +5,9 @@ accountmaking aci Ack ack -acks ackIds acknowledgement +acks actionCard Acyclic acyclic From b3bf903fa7b8558c9dd3f96a6c69fde9a3631eb9 Mon Sep 17 00:00:00 2001 From: Stefan Blumentrath Date: Mon, 5 Feb 2024 22:26:55 +0100 Subject: [PATCH 13/15] define version added --- airflow/providers/celery/provider.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/celery/provider.yaml b/airflow/providers/celery/provider.yaml index 80b7583a5ea62..9c94ee3280273 100644 --- a/airflow/providers/celery/provider.yaml +++ b/airflow/providers/celery/provider.yaml @@ -287,7 +287,7 @@ config: new task instance is assigned. This effectively overrides the visibility timeout. See also: https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late - version_added: ~ + version_added: 2.9.0 type: boolean example: "True" default: "True" From 6cb1a13bdd779dc05f74fff461812663aa5dface Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Mon, 5 Feb 2024 23:42:02 +0100 Subject: [PATCH 14/15] Update airflow/providers/celery/provider.yaml --- airflow/providers/celery/provider.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/celery/provider.yaml b/airflow/providers/celery/provider.yaml index 9c94ee3280273..80b7583a5ea62 100644 --- a/airflow/providers/celery/provider.yaml +++ b/airflow/providers/celery/provider.yaml @@ -287,7 +287,7 @@ config: new task instance is assigned. This effectively overrides the visibility timeout. See also: https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late - version_added: 2.9.0 + version_added: ~ type: boolean example: "True" default: "True" From 9be5a5e6d628d6de919502f607e2295308dd7aa1 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Mon, 5 Feb 2024 23:46:43 +0100 Subject: [PATCH 15/15] Update airflow/providers/celery/provider.yaml Co-authored-by: Jarek Potiuk --- airflow/providers/celery/provider.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/celery/provider.yaml b/airflow/providers/celery/provider.yaml index 80b7583a5ea62..185b688ce9020 100644 --- a/airflow/providers/celery/provider.yaml +++ b/airflow/providers/celery/provider.yaml @@ -287,7 +287,7 @@ config: new task instance is assigned. This effectively overrides the visibility timeout. See also: https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late - version_added: ~ + version_added: 3.6.0 type: boolean example: "True" default: "True"