
Confusing log for long running tasks: "dependency 'Task Instance Not Running' FAILED: Task is in the running state" #16163

Closed
yuqian90 opened this issue May 30, 2021 · 46 comments · Fixed by #37066
Assignees
Labels
affected_version:2.1 (Issues Reported for 2.1) · area:Scheduler (including HA / high-availability scheduler) · kind:bug (This is clearly a bug)

Comments

@yuqian90
Contributor

yuqian90 commented May 30, 2021

Apache Airflow version: 1.10.* / 2.0.* / 2.1.*

Kubernetes version (if you are using kubernetes) (use kubectl version): Any

Environment:

  • Cloud provider or hardware configuration: Any
  • OS (e.g. from /etc/os-release): Any
  • Kernel (e.g. uname -a): Any
  • Install tools: Any
  • Others: N/A

What happened:

This line in the TaskInstance log is very misleading. It seems to happen for tasks that take longer than one hour. When users are waiting for tasks to finish and see this in the log, they often get confused. They may think something is wrong with their task or with Airflow. In fact, this line is harmless. It's simply saying "the TaskInstance is already running so it cannot be run again".

{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.

What you expected to happen:

The confusion is unnecessary. This line should either be silenced or replaced with a clearer message.

How to reproduce it:

Any task that takes more than an hour to run has this line in the log.

@yuqian90 yuqian90 added the kind:bug label May 30, 2021
@yuqian90 yuqian90 self-assigned this May 30, 2021
@yuqian90 yuqian90 added this to the Airflow 2.1.1 milestone May 30, 2021
@eladkal eladkal added the area:Scheduler and affected_version:2.1 labels May 30, 2021
@david30907d
Contributor

I bumped into this issue and misunderstood it until now 😅
Thanks for pointing out that it's okay to wait.

@yuqian90
Contributor Author

yuqian90 commented Jun 26, 2021

After some more investigation, it's very likely we see this log appear an hour after a long-running task starts because of the default visibility_timeout setting in Celery. This code in default_celery.py sets visibility_timeout to 21600 only if the broker_url starts with redis or sqs. In our case we are using Redis Sentinel, so the broker is still Redis even though the URL starts with sentinel. Therefore visibility_timeout is left at 3600, which is the default according to the Celery documentation. The odd thing is that after I manually changed visibility_timeout to a very large integer in airflow.cfg, the same log still showed up exactly an hour after a task started, so changing visibility_timeout in this case did not seem to make any difference. Not sure if anyone else has experienced the same.

@david30907d maybe try changing visibility_timeout to a large number in your setup and see if the log still appears after an hour. If it stops for you, it means visibility_timeout is probably the cause, and there may be something wrong in our own setup preventing the visibility_timeout change from taking effect.

import logging

from airflow.configuration import conf


def _broker_supports_visibility_timeout(url):
    return url.startswith("redis://") or url.startswith("sqs://")


log = logging.getLogger(__name__)

broker_url = conf.get('celery', 'BROKER_URL')

# The 21600-second override is only applied for redis:// and sqs:// broker URLs;
# any other scheme keeps Celery's own default visibility_timeout.
broker_transport_options = conf.getsection('celery_broker_transport_options') or {}
if 'visibility_timeout' not in broker_transport_options:
    if _broker_supports_visibility_timeout(broker_url):
        broker_transport_options['visibility_timeout'] = 21600
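
To illustrate the Sentinel point, here is a standalone toy snippet (not Airflow code, and the URLs are made-up examples): a sentinel:// broker URL fails the prefix check above, so the 21600-second override is never applied and Celery's documented default of 3600 seconds (1 hour) takes effect.

# Toy illustration only: which broker URL schemes get the Airflow override.
for url in ("redis://redis:6379/0", "sqs://", "sentinel://sentinel-host:26379"):
    overridden = url.startswith("redis://") or url.startswith("sqs://")
    print(url, "->", "21600 set by Airflow" if overridden else "Celery default (3600)")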

@david30907d
Contributor

@yuqian90 thanks, very useful information. I'll give celery_broker_transport_options a shot!

@potiuk potiuk modified the milestones: Airflow 2.1.1, Airflow 2.2 Jun 28, 2021
@potiuk
Member

potiuk commented Jun 28, 2021

Moved it to 2.2 now - I guess we have not addressed it yet in 2.1.1

@malthe
Contributor

malthe commented Jul 2, 2021

In the case where the visibility timeout is reached, it's confusing that there is no clear log line saying that the task has been killed for taking too long to complete.

(If that's indeed what is happening.)

@potiuk is it the case that the Celery task is killed, or is it simply no longer streaming logs into Airflow at that point?

@potiuk
Member

potiuk commented Jul 2, 2021

Not sure. Needs investigation

@kaxil kaxil modified the milestones: Airflow 2.2, 2.2.1 Sep 14, 2021
@ashb
Member

ashb commented Sep 14, 2021

@malthe That's not quite what is happening. My understanding is this:

  • The task message on celery is picked up and the TI starts running
  • After 1 hour (the celery visibility timeout) celery makes the message visible again
  • A second worker picks up the celery message
  • When the second one runs, it gets this message.

So the first attempt is still running, and this message is the second concurrent worker (but for the same try_number) saying "I can't run this task, it's already running."
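
For readers unfamiliar with the dependency check involved, here is a simplified, hypothetical sketch (not the actual Airflow source) of why the duplicate delivery only produces that log line: the "Task Instance Not Running" dependency passes only when the task instance is not already in the running state, so the second worker refuses to run the task and exits.

# Hypothetical stand-in for Airflow's "Task Instance Not Running" dependency check.
from dataclasses import dataclass
from typing import Tuple

RUNNING = "running"

@dataclass
class TaskInstanceStub:
    task_id: str
    state: str

def task_not_running_dep(ti: TaskInstanceStub) -> Tuple[bool, str]:
    # Fails (returns False) while another worker is still running the same try_number.
    if ti.state == RUNNING:
        return False, "Task is in the running state"
    return True, "Task is not running"

ti = TaskInstanceStub(task_id="my_task", state=RUNNING)  # first worker still has it
print(task_not_running_dep(ti))  # (False, 'Task is in the running state')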

@malthe
Contributor

malthe commented Sep 14, 2021

@ashb so regardless of whether the first worker is still running or defunct/dead, shouldn't the second worker be able to "take over" somehow?

Otherwise, what's the point in trying?

@ashb
Member

ashb commented Sep 14, 2021

This is largely airflow and celery "fighting" over behaviour

@ashb ashb modified the milestones: Airflow 2.2.1, Airflow 2.2.2 Oct 14, 2021
@kaxil kaxil modified the milestones: Airflow 2.2.2, Airflow 2.3.0 Nov 1, 2021
@eugene-nikolaev

Hi!
Just ran into this issue with a broker URL starting with "amqp://".
Will it be fixed in 2.3.0?

@yuqian90
Contributor Author

yuqian90 commented Apr 8, 2022

@ashb so regardless of whether the first worker is still running or defunct/dead, shouldn't the second worker be able to "take over" somehow?

Otherwise, what's the point in trying?

From what I've observed, there's no point in the second Airflow worker trying to run it. We should just silence the error.

{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.

@eugene-nikolaev

Hi @yuqian90!
Do I understand correctly that, without patching, I can't configure visibility_timeout with a broker URL starting with amqp?

@ashb
Member

ashb commented Apr 8, 2022

@yuqian90 How can the worker tell it is the "second" time it's running?

@ashb ashb modified the milestones: Airflow 2.3.0, Airflow 2.3.1 Apr 22, 2022
@ephraimbuddy ephraimbuddy removed this from the Airflow 2.3.1 milestone May 23, 2022
@ebguidion

I have been dealing with this issue since March 1, 2023. Airflow version: 2.2.2, and we are using MWAA. Do you have any solution for it? When I run the tasks I get this error and the subtask gets stuck in the queued or running state. Here are the logs:

INFO - Dependencies not met for <TaskInstancedependency 'Task Instance Not Running' FAILED: Task is in the running state
INFO - Dependencies not met for <TaskInstance: dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
INFO - Task is not able to be run

@sanagapalliavinash

As a first assumption, it looks like an environment issue; as a temporary fix, increase retries as much as possible.
For example, one of our jobs runs for more than 10 hours and, because of this environment issue, fails every hour. We added the maximum number of retries, so by the fourth retry the job completes automatically and the error message can be ignored.
For now, retries might be the best option as a temporary fix.

@ebguidion

As a first assumption, it looks like an environment issue; as a temporary fix, increase retries as much as possible. For example, one of our jobs runs for more than 10 hours and, because of this environment issue, fails every hour. We added the maximum number of retries, so by the fourth retry the job completes automatically and the error message can be ignored. For now, retries might be the best option as a temporary fix.

I also did the same and opened a ticket with AWS.

@sanagapalliavinash

sanagapalliavinash commented Mar 3, 2023

Please also let us know if you get any response from AWS with a solution; we have also opened a ticket with Google Cloud.

@marcus-moura

I've been facing this problem too; in my case the task doesn't even run. It's a task that runs a job in Databricks with DatabricksDeferrableOperator, and its upstream tasks take approximately 2 hours, but in the log of the failing tasks the execution date is earlier than the end date of the upstreams. So it looks like it ran before the upstreams had completed successfully.

When analyzing the scheduler log I found this:

[2023-03-08 06:06:06,617] {scheduler_job.py:687} ERROR - Executor reports task instance <TaskInstance: job_orchestration.sourcing.cubo_custos_job_new.cubo_custos_job_new scheduled__2023-03-07T05:00:00+00:00 [queued ]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?

@sanagapalliavinash

One more observation: previously, because of the error below, the job would either fail or ignore the warning. Now the job gets stuck; it neither fails nor moves forward.

Has anyone found a solution? Please help.

{taskinstance.py:999} INFO - Dependencies not met for <TaskInstance: file_data_transfer.taskid_data_transfer 2023-02-19T00:01:00+00:00 [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state

{taskinstance.py:999} INFO - Dependencies not met for <TaskInstance: file_data_transfer.taskid_data_transfer 2023-02-19T00:01:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.

{local_task_job.py:99} INFO - Task is not able to be run

@sanagapalliavinash

Has anyone found a solution for this?

@medina325

...

It seems likely that the logs are lost for really long-running Celery tasks (perhaps due to the visibility_timeout, I am not sure), so people only see the "Task is not able to be run" error, but something else (likely their code) is deadlocking; they just can't see it.

Has anyone tried setting execution_timeout on those tasks? I assume that Airflow will eventually mark the task as failed.

Yes I have, and that's not the solution; the task keeps being "externally set to failed".

@sanagapalliavinash

...

It seems likely that the logs are lost for really long-running Celery tasks (perhaps due to the visibility_timeout, I am not sure), so people only see the "Task is not able to be run" error, but something else (likely their code) is deadlocking; they just can't see it.
Has anyone tried setting execution_timeout on those tasks? I assume that Airflow will eventually mark the task as failed.

Yes I have, and that's not the solution; the task keeps being "externally set to failed".

What action have you applied? And will that at least allow the task to complete temporarily?

@medina325

medina325 commented Mar 13, 2023

I can give you the full logs right now. My task is a PythonOperator that inserts huge CSV files into Postgres; concretely, I pass a method argument to pandas' to_sql in order to apply an upsert operation.

My task is always externally failed after 6 hours :/

@polidore

Same issue. A task that used to take ~4 hours is now taking ~8 hours since I switched to Celery. It seems that maybe it finishes on the retry?

@AndreiArion

In our context (GCP Airflow 2.2.5 #16163 (comment)) the problem was correlated with Kubernetes pods being evicted due to a lack of resources. After scaling up our Kubernetes cluster and updating to Airflow 2.4.3 (plus google-providers 8.8.0, due to https://stackoverflow.com/questions/74730471/gcstobigqueryoperator-not-working-in-composer-2-1-0-airflow-2-3-4/75319966#75319966), we have not seen this problem in the month since upgrading.
As we no longer have pod evictions, I cannot conclude whether the issue was fixed in Airflow or the workaround (preventing Airflow workers from being killed) was sufficient.

@ashb ashb removed the needs-triage label Apr 8, 2023
@ashb ashb removed this from the Airflow 2.5.4 milestone Apr 8, 2023
@MattiaGallegati

Hello, is there any new evidence on this bug?

@thesuperzapper
Contributor

thesuperzapper commented May 26, 2023

@ashb @potiuk and others watching: if you experience this issue, it's just that your task is taking more than 6 hours. In most cases your task will continue running, but you can't see its logs until the task finishes or fails.

As a temporary fix, you can increase the time it takes for this to happen by increasing the value of AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT to something like 86400 (24 hours).
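
A hedged sketch of that workaround (the 86400 value and the verification step are illustrative assumptions, and the config-template import path may differ between Airflow versions); in a real deployment the variable would normally be set in the environment of the scheduler and every worker rather than in Python:

import os

# Raise the Celery visibility timeout above the longest expected task duration (24 h here).
os.environ["AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT"] = "86400"

# On Airflow 2.x the effective value can then be inspected via the bundled config template.
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

print(DEFAULT_CELERY_CONFIG["broker_transport_options"]["visibility_timeout"])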


I believe I have uncovered the cause of this issue and would appreciate feedback.

The question is, why are we losing the logs after 6 hours?
I believe this is because of a slightly complex combination of celery configs:

The visibility_timeout is the number of seconds celery will wait for an acknowledgment before it returns a task to the queue (for it to be picked up by another worker). But because task_acks_late is True, airflow's celery workers will not acknowledge tasks until AFTER the task has finished. Hence, if a task takes more than visibility_timeout seconds to finish, celery will tell another worker to start this task.

When this happens, the new instance of this task will realize the task is still running on the old worker (it's not failed, and is even heart-beating the TaskInstance), and will correctly wait (because a task can not be "running" in two places). But now the airflow UI only shows the logs for this new "phantom task", which will always be:

{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.

Effectively, this issue is the result of celery taking matters into its own hands (outside of the control of the airflow scheduler), and telling a new worker to start the task which is still running on the old worker.


Setting our task_acks_late to False will likely fix this issue. However, I am not sure what the other implications would be, as I can see that it has been set to True for most of Airflow's existence; it was set to True in this commit from 2015.

The purpose of Celery's task_acks_late is to guard against "zombie" tasks that take too long. But Airflow already has its own "heartbeat" checks for zombie tasks, so there is probably no need to have Celery re-issuing tasks.

Setting task_acks_late to False, and visibility_timeout to a much lower number (like 300 sec), will also have the added benefit of detecting Celery transport failures.
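
As a concrete but hypothetical illustration of how a deployment could experiment with this, assuming Airflow's standard [celery] celery_config_options hook for pointing at a custom Celery config dict (verify the option name and import path for your Airflow version), a module such as the made-up custom_celery_config.py below could be used:

# custom_celery_config.py -- hypothetical module name; a sketch, not a recommendation.
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG,
    # Acknowledge messages as soon as a worker picks them up, so Celery never re-queues
    # a still-running task; zombie handling is left to Airflow's own heartbeat checks.
    "task_acks_late": False,
}

The deployment would then point Airflow at it, for example with AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=custom_celery_config.CELERY_CONFIG.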


Finally, I want to highlight that Airflow's docs are confusing about what the default visibility_timeout is, because this piece of code will set it to 21600 when AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT is not set, which is different from the Celery default of 3600.

@potiuk
Member

potiuk commented Jun 8, 2023

Care to submit a PR, @thesuperzapper? That would be a small thing to give back for the years of business you have built on top of Airflow.

@malthe
Contributor

malthe commented Jun 9, 2023

According to the Celery docs:

Late ack means the task messages will be acknowledged after the task has been executed, not right before (the default behavior).

It seems that it's a way to ensure at-least-once processing rather than at-most-once. I would say that since Airflow does its retrying of its own accord, we want the latter, which is why task_acks_late should probably be disabled.

@thesuperzapper
Contributor

@potiuk @malthe here is the PR to change task_acks_late to False:

Let's discuss the implications and do some testing in that thread.

@sourabp

sourabp commented Sep 26, 2023

@thesuperzapper @potiuk, we are also facing the same issue; however, in our case the previous set of logs gets trimmed and the log starts with:

{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.

The logs of the previously running task are appended after these lines.

Airflow version: 2.6.0

@richard-iovanisci

I am seeing this as well with Airflow v2.5.1 (not using Kubernetes) with the Celery executor. We also use S3 remote logging, and we observe the regular log for the first hour; then the "Dependencies not met for <TaskInstance" log is written to S3, and only that log is visible until the task completes (since Airflow checks the remote log first).

Once the task completes, the "real" log for the original task is appended below the "Dependencies not met for <TaskInstance" log...
