Skip to content

Commit

Permalink
Fix type annotations and lint
Browse files Browse the repository at this point in the history
  • Loading branch information
jgreben committed Nov 27, 2024
1 parent d45a74e commit f0a618a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 19 deletions.
18 changes: 11 additions & 7 deletions libsys_airflow/plugins/digital_bookplates/dag_979_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from airflow.decorators import task
from airflow.models import DagRun
from airflow.models import Variable
from airflow.utils.state import State
from airflow.utils.state import DagRunState

from libsys_airflow.plugins.digital_bookplates.bookplates import (
launch_digital_bookplate_979_dag,
Expand All @@ -19,13 +19,17 @@


@task
def failed_979_dags() -> list:
def failed_979_dags() -> dict:
"""
Find all of the failed digital_bookplate_979 DAG runs
"""
start_date = datetime.now(timezone.utc) - timedelta(30)
dag_runs = DagRun.find(state="failed", dag_id="digital_bookplate_979", execution_start_date=start_date)
db_979_dags = {"digital_bookplate_979s": []}
dag_runs = DagRun.find(
state=DagRunState.FAILED,
dag_id="digital_bookplate_979",
execution_start_date=start_date,
)
db_979_dags: dict = {"digital_bookplate_979s": []}
for dag_run in dag_runs:
logger.info(f"Found: {dag_run.run_id}")
db_979_dags["digital_bookplate_979s"].append(dag_run.run_id)
Expand All @@ -41,13 +45,13 @@ def run_failed_979_dags(**kwargs):
params = kwargs.get("dags", {})
dag_runs = params.get("digital_bookplate_979s", {})
devs_email_addr = Variable.get("EMAIL_DEVS")

for dag in dag_runs:
logger.info(f"Re-running dag with id: {dag}")
dag_run = DagRun.find(run_id=dag)
ti = dag_run[0].get_task_instance("retrieve_druids_for_instance_task")
prev_val = ti.xcom_pull("retrieve_druids_for_instance_task")

for key, value in prev_val.items():
instance_id = key
fund = value
Expand All @@ -56,7 +60,7 @@ def run_failed_979_dags(**kwargs):
instance_uuid=instance_id, funds=fund
)
logger.info(f"Launching new dag: {new_dag_run_id}")

launch_poll_for_979_dags_email(dag_runs=dag_runs, email=devs_email_addr)

return None
26 changes: 14 additions & 12 deletions tests/digital_bookplates/test_retry_failed_979s.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@

from libsys_airflow.plugins.digital_bookplates.dag_979_retries import (
failed_979_dags,
run_failed_979_dags
run_failed_979_dags,
)


def month():
return datetime.now().month


def uuids():
return [
"47bf71da-5ca4-496d-a34e-fbc121cd3f1b",
Expand All @@ -23,38 +24,38 @@ def uuids():
"d062b41a-6809-4f1b-830f-48a3f95df98e",
]


def mock_dag_runs():
mock_dag_runs = []

def mock_get_state():
return 'failed'

def mock_get_task_instance(*args):
task_instance = MagicMock()
task_instance.xcom_pull = mock_xcom_pull
return task_instance

for id, idx in enumerate(uuids()):
mock_dag_run = MagicMock()
mock_dag_run.get_state = mock_get_state
mock_dag_run.run_id = f"scheduled__2024-{month()}-{idx}"
mock_dag_run.dag.dag_id = "digital_bookplate_979"
mock_dag_run.conf = {
"druids_for_instance_id": {id: {}}
}
mock_dag_run.conf = {"druids_for_instance_id": {id: {}}}
mock_dag_run.get_task_instance = mock_get_task_instance
mock_dag_runs.append(mock_dag_run)

return mock_dag_runs


def mock_xcom_pull(*args, **kwargs):
return {
uuids()[-1]: [
{
'fund_name': 'KLEINH',
'druid': 'vy482pt7540',
'image_filename': 'vy482pt7540_00_0001.jp2',
'title': 'The Herbert A. Klein Book Fund'
'fund_name': 'KLEINH',
'druid': 'vy482pt7540',
'image_filename': 'vy482pt7540_00_0001.jp2',
'title': 'The Herbert A. Klein Book Fund',
}
]
}
Expand All @@ -64,6 +65,7 @@ def mock_xcom_pull(*args, **kwargs):
def mock_variable(monkeypatch):
def mock_get(key, *args):
return "[email protected]"

monkeypatch.setattr(Variable, "get", mock_get)


Expand All @@ -82,7 +84,7 @@ def test_find_failed_979_dags(mocker, caplog):
"libsys_airflow.plugins.digital_bookplates.dag_979_retries.DagRun.find",
return_value=mock_dag_runs(),
)

failed_dags = failed_979_dags.function()
assert len(failed_dags["digital_bookplate_979s"]) == 5
assert f"Found: scheduled__2024-{month()}-4" in caplog.text
Expand All @@ -97,7 +99,7 @@ def test_run_failed_979_dags(mocker, mock_variable, mock_dag_bag, caplog):
"libsys_airflow.plugins.digital_bookplates.bookplates.DagBag",
return_value=mock_dag_bag,
)

dag_runs = {"digital_bookplate_979s": mock_dag_runs()}
run_failed_979_dags.function(dags=dag_runs)

Expand Down

0 comments on commit f0a618a

Please sign in to comment.