From 8aeed4283a570ad8958b39805e6a0813a8c4c2c1 Mon Sep 17 00:00:00 2001 From: Xiaodong Date: Wed, 8 Aug 2018 09:07:43 +0800 Subject: [PATCH] [AIRFLOW-2855] Check Cron Expression Validity in DagBag.process_file() (#3698) A DAG can be imported as a .py script properly, but the Cron expression inside as "schedule_interval" may be invalid, like "0 100 * * *". This commit helps check the validity of Cron expression in DAG files (.py) and packaged DAG files (.zip), and help show exception messages in web UI by add these exceptions into metadata "import_error". --- airflow/models.py | 14 ++++++++++- tests/dags/test_invalid_cron.py | 35 +++++++++++++++++++++++++++ tests/dags/test_zip_invalid_cron.zip | Bin 0 -> 1389 bytes tests/models.py | 15 +++++++++++- 4 files changed, 62 insertions(+), 2 deletions(-) create mode 100755 tests/dags/test_invalid_cron.py create mode 100644 tests/dags/test_zip_invalid_cron.zip diff --git a/airflow/models.py b/airflow/models.py index 428923ff9e73f..0207ac25252dc 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -61,7 +61,9 @@ from sqlalchemy.ext.declarative import declarative_base, declared_attr from sqlalchemy.orm import reconstructor, relationship, synonym -from croniter import croniter +from croniter import ( + croniter, CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError +) import six from airflow import settings, utils @@ -413,8 +415,18 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): try: dag.is_subdag = False self.bag_dag(dag, parent_dag=dag, root_dag=dag) + if isinstance(dag._schedule_interval, six.string_types): + croniter(dag._schedule_interval) found_dags.append(dag) found_dags += dag.subdags + except (CroniterBadCronError, + CroniterBadDateError, + CroniterNotAlphaError) as cron_e: + self.log.exception("Failed to bag_dag: %s", dag.full_filepath) + self.import_errors[dag.full_filepath] = \ + "Invalid Cron expression: " + str(cron_e) + self.file_last_changed[dag.full_filepath] = \ + file_last_changed_on_disk except AirflowDagCycleException as cycle_exception: self.log.exception("Failed to bag_dag: %s", dag.full_filepath) self.import_errors[dag.full_filepath] = str(cycle_exception) diff --git a/tests/dags/test_invalid_cron.py b/tests/dags/test_invalid_cron.py new file mode 100755 index 0000000000000..51a0e43cb500a --- /dev/null +++ b/tests/dags/test_invalid_cron.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.utils.timezone import datetime + +# The schedule_interval specified here is an INVALID +# Cron expression. This invalid DAG will be used to +# test whether dagbag.process_file() can identify +# invalid Cron expression. +dag1 = DAG( + dag_id='test_invalid_cron', + start_date=datetime(2015, 1, 1), + schedule_interval="0 100 * * *") +dag1_task1 = DummyOperator( + task_id='task1', + dag=dag1, + owner='airflow') diff --git a/tests/dags/test_zip_invalid_cron.zip b/tests/dags/test_zip_invalid_cron.zip new file mode 100644 index 0000000000000000000000000000000000000000..fe45153abe85a1b22026bc3f3c36b09d6b06d585 GIT binary patch literal 1389 zcmWIWW@Zs#-~hr?ovgkLNI--^fT1L{xFkL^uPiYqGbKK`C_hiHpfWUqhk@llTJ-Dv zY0));&!?w`#OB{}5UBaSK4K!f0u%E@tv}}5CSEy1-neSBJRwQlaBuA z49Ut~X1O+0RXDA6k_p$MuQLjs%$o9qLEE?0DQL}-tB(RASw3x3*yvLnSCHB3FH(3f z1!?U_emC8T3%n)n)wsnL15)S9Bm-MGB67m*$D7AS`i_3zH42I>E$F>MJAA9QA z{U*cn>ZI&q5!GF9=k#UV4K~cZyH2xYUdYvx7QuHU4Hvp!DJ)w5$Fg?$JWU;kG92zVYxJmn5qVdaCtvnO zCnjS++=ETcir}llomX=e@UTb#Tso zw(1-A)DL$o#Jr-TSnciNw5O!ssCuj(8aU_M-=FdaT(`%5P~Ngh$$XzE>;LBCSC~Is z+0k7ocVnlmnM8Te(;EFHPk)K#FAV0soEU1%p{9AqhOPV|*8=I@$PzZErTtnSf?y|v#md<BdUw+e@^-z`HP-iiUwk=Q?A@xR4~uqPW6+h|U~5{ha5~X^-oqQexdXh}Ij+|} zRJ+c^z+lG8zz_h;C|uy|!P|wJQ8Dos`-UeEi*H>bM|EwW8-4w(<<4|dlvn<6OygYY%Cn^?(7`P zt!|tgD_oo|9?Pw4>>eBLZqCgeY_80mt}e_53^R6)=^4?5kRO