
Rerunning a dag in Airflow 2.7.3 causes a missing table issue #41894

Closed
jeremiahishere opened this issue Aug 30, 2024 · 6 comments
Labels
area:core, kind:bug, needs-triage, pending-response

Comments


jeremiahishere commented Aug 30, 2024

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.3

What happened?

My team is working on upgrading an old system to the latest stable version of Airflow. We are currently stuck on the 2.7.3 upgrade. The first time a test runs in 2.7.3 and calls _get_or_create_dagrun in airflow/models/dag.py, it passes. All subsequent test runs fail with the following error:

sqlalchemy.exc.NoReferencedTableError: Foreign key associated with column 'dag_run_note.user_id' could not find table 'ab_user' with which to generate a foreign key to target column 'id'
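
For context, this is SQLAlchemy's lazy foreign-key resolution failing. Below is a standalone sketch (not Airflow code; the table names are invented for illustration) of how the same error arises when the referenced table was never registered on the MetaData:

from sqlalchemy import Column, ForeignKey, Integer, MetaData, Table

metadata = MetaData()

# This table declares a foreign key to "ab_user_demo", but that table is
# never defined on the same MetaData object -- mirroring a process where
# the model that creates ab_user was never imported.
note = Table(
    "dag_run_note_demo",
    metadata,
    Column("user_id", Integer, ForeignKey("ab_user_demo.id")),
)

fk = next(iter(note.foreign_keys))
try:
    fk.column  # resolving the target column forces the table lookup
except Exception as exc:
    print(type(exc).__name__)  # NoReferencedTableError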

What do you think should happen instead?

The error occurs when running a dag with the same dag id and execution date more than once, which happens in our integration tests. The if dr: block below runs, and the error occurs when session.commit() is called.

def _get_or_create_dagrun(...) -> DagRun:
    dr: DagRun = session.scalar(
        select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date == execution_date)
    )
    if dr:
        session.delete(dr)
        session.commit()  # <- NoReferencedTableError is raised here
    dr = ...

I believe that rerunning a dag id on an execution date should either work or raise an explicit error about overwriting a previous dag run.

How to reproduce

In our system this is reproducible in tests on any dag with a hard-coded execution date. Here is a sample setup; I can fill out the classes if it is helpful.

dags/my_dag.py

def my_dag(dag_config: MyConfig) -> DAG:
    with DAG(...) as dag:
        my_operator = MyOperator(...)

        other_operator = ...

        return dag

test/integration/test_my_dag.py

# test setup
# ...

from datetime import datetime

import pendulum

DAG_RUN_DATE = datetime(
    year=2024,
    month=4,
    day=26,
    hour=3,
    minute=0,
    tzinfo=pendulum.timezone("America/New_York"),
)

dag_config = MyConfig(
    dag_id="my_test_dag",
    ...
)

dag = my_dag(dag_config)
run_dag(dag, DAG_RUN_DATE)

# test assertions
# ...

test/helpers.py

from airflow.utils.session import provide_session

@provide_session
def run_dag(dag, date, session=None):
    # session is injected by provide_session; dag.test() manages its own
    dag.test(execution_date=date)

Operating System

CentOS 7

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.10.0
apache-airflow-providers-celery==3.4.1
apache-airflow-providers-cncf-kubernetes==7.8.0
apache-airflow-providers-common-sql==1.8.0
apache-airflow-providers-datadog==3.4.0
apache-airflow-providers-docker==3.8.0
apache-airflow-providers-ftp==3.6.0
apache-airflow-providers-hashicorp==3.5.0
apache-airflow-providers-http==4.6.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-jenkins==3.4.0
apache-airflow-providers-microsoft-azure==8.1.0
apache-airflow-providers-opsgenie==5.2.0
apache-airflow-providers-postgres==5.7.1
apache-airflow-providers-redis==3.4.0
apache-airflow-providers-slack==8.3.0
apache-airflow-providers-sqlite==3.5.0
apache-airflow-providers-ssh==3.8.1

Deployment

Docker-Compose

Deployment details

No response

Anything else?

I have found a number of issues in this GitHub repo that describe similar loading problems with the ab_user table. None of them mention running a dag twice as part of the reproduction steps.

We wrote a patch to fix the issue. This patch will get us through the Airflow 2.7.3 upgrade so we can continue upgrading. We don't understand why there aren't other people with the same problem.

def patched_get_or_create_dagrun(...) -> DagRun:
    # CHANGES: importing the FAB User model as a side effect registers the
    # ab_user table with SQLAlchemy, so the foreign key on dag_run_note can
    # be resolved when the delete below is flushed
    from airflow.auth.managers.fab.models import User  # noqa: F401
    from sqlalchemy import select
    # END OF CHANGES

    dr: DagRun = session.scalar(
        select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date == execution_date)
    )
    if dr:
        session.delete(dr)
        session.commit()  # no longer raises once ab_user is registered
    ...

import airflow.models.dag as af_dag
af_dag._get_or_create_dagrun = patched_get_or_create_dagrun
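
If you keep this workaround, it may be cleaner to apply the monkeypatch once in a pytest conftest.py rather than in individual tests. A minimal sketch, assuming the patched function above lives in test/helpers.py (the module layout is hypothetical):

test/conftest.py

# Apply the monkeypatch once, before any test calls run_dag / dag.test()
import airflow.models.dag as af_dag

from test.helpers import patched_get_or_create_dagrun  # hypothetical location

af_dag._get_or_create_dagrun = patched_get_or_create_dagrun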

Our integration tests generally upload data to an internal S3 analog and use the file path to partition the data by date. Making this system dynamic would be a pretty big rewrite, so we are looking for options. Is there a standard practice for Airflow integration testing with hard-coded dates that we have missed? Are we doing something out of the ordinary for Airflow?

Are you willing to submit PR?

  • Yes I am willing to submit a PR!


jeremiahishere added the area:core, kind:bug, and needs-triage labels on Aug 30, 2024

boring-cyborg bot commented Aug 30, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.

jscheffl (Contributor) commented:

Thanks for reporting the bug. That said, version 2.7 is pretty old and we will very probably not release patches for it.

Do I understand it right that you mainly have problems in your integration tests, but your production system is running fine? Or is it a regular (production) use case to run a DAG several times with the same execution date?

We wrote a patch to fix the issue. This patch will get us through the Airflow 2.7.3 upgrade so we can continue upgrading. We don't understand why there aren't other people with the same problem.

I assume most users do not re-use the execution date. The execution date is defined as unique because in the past it was part of the primary key. This changed within the Airflow 2 version stream, and run_id became the primary key. In Airflow 3 we will remove the execution date field, as it was replaced with the logical date. We have also seen that the uniqueness of this field blocks some specific use cases around data-driven event processing.

Anyway... the question is: shall we keep this bug report as a kind of "known problems" documentation? Or do you expect something to be patched on the very old 2.7 version?

My direct proposal would be to move away from re-using the execution date in your tests. In the unit and integration tests we run for Airflow itself, we always clean the DB before adding new test DAG records in order to prevent a clash. As re-using the execution date is not a supported use case, I believe a patch would at the earliest be accepted in the 2.10 line, but I somewhat doubt this is a real bug/problem.
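
A minimal sketch of the kind of per-test cleanup described above, assuming the suite runs against a dedicated, disposable metadata database (DagRun, TaskInstance, and create_session are real Airflow imports; the fixture itself is invented, and you may need to extend it to other tables such as dag_run_note):

import pytest
from airflow.models import DagRun, TaskInstance
from airflow.utils.session import create_session

@pytest.fixture(autouse=True)
def clean_dag_runs():
    # Wipe prior runs so a repeated dag_id + execution_date never collides.
    # TaskInstance rows go first because they reference dag_run rows.
    with create_session() as session:
        session.query(TaskInstance).delete()
        session.query(DagRun).delete()
    yield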

jeremiahishere (Author) commented:

It is failing in tests for us. Production is not failing because the dag execution dates are based on the current time. It started failing when we upgraded from Airflow 2.6 to 2.7. I would love to be given the time to rewrite four years of test fixtures, but I don't think that is going to happen.

Should I expect this behavior in main or the v2-10-stable branch? I see roughly the same code here: https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L3786-L3791

What is the use case where this code works as intended, deleting the previous dagrun and creating a new one? Is there something special about the test environment that skips loading the ab_user table metadata? Is there a different place to patch that will be easier to maintain as we upgrade across versions?

jeremiahishere (Author) commented:

Is there an official way to clean the database between tests? I found this cleanup module: https://github.com/apache/airflow/blob/v2-10-stable/airflow/utils/db_cleanup.py. I am also comfortable playing around with copies of the .sqlite file backing the database.

I would like to get closer to a standard use case for our testing environment. If you could point me to documentation, I would appreciate it.
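
For reference, the db_cleanup module linked above is the implementation behind the airflow db clean CLI command, which deletes archivable records (including dag runs) older than a given timestamp. A sketch of calling it programmatically between tests; the keyword arguments reflect the v2-10-stable source and may differ in other versions:

from datetime import datetime, timezone

from airflow.utils.db_cleanup import run_cleanup

# Delete all cleanable records older than "now", i.e. everything left over
# from previous test runs; confirm=False skips the interactive prompt.
run_cleanup(
    clean_before_timestamp=datetime.now(tz=timezone.utc),
    confirm=False,
)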

potiuk (Member) commented Sep 3, 2024

I do not know too much about the root cause, but let me post some things that might help you to straighten things out:

  • `airflow db reset` will reset the database. I believe if you create the DB from scratch it should work (see the sketch after this list).
  • It's also likely that the ab_user table in your original database was historically created with a different schema.
  • There were some issues related to the index in 2.7.2: Fix unfound ab_user table in the CLI session #34120, Fix foreign key warning re ab_user.id #34656 - you can take a look there and see if you can figure out what's wrong in your database.
  • This index is going to be dropped in Airflow 3. You can likely drop it now as well.
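
A minimal sketch of wiring the first suggestion into a test session, assuming the suite runs against a throwaway database (resetdb is the function behind airflow db reset; the fixture name and conftest placement are invented):

test/conftest.py

import pytest
from airflow.utils.db import resetdb

# Rebuild the metadata DB once per test session, so every table (including
# ab_user) is recreated from the current models.
@pytest.fixture(scope="session", autouse=True)
def fresh_airflow_db():
    resetdb()
    yield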

potiuk (Member) commented Sep 3, 2024

Also, I am converting this into a discussion. This is not an "Airflow issue" per se - I doubt we will be able to make a PR from it that fixes something - it's more a matter of troubleshooting what's wrong.

apache locked and limited conversation to collaborators on Sep 3, 2024
potiuk converted this issue into discussion #41980 on Sep 3, 2024
