diff --git a/airflow/models.py b/airflow/models.py index 428923ff9e73f..0207ac25252dc 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -61,7 +61,9 @@ from sqlalchemy.ext.declarative import declarative_base, declared_attr from sqlalchemy.orm import reconstructor, relationship, synonym -from croniter import croniter +from croniter import ( + croniter, CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError +) import six from airflow import settings, utils @@ -413,8 +415,18 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): try: dag.is_subdag = False self.bag_dag(dag, parent_dag=dag, root_dag=dag) + if isinstance(dag._schedule_interval, six.string_types): + croniter(dag._schedule_interval) found_dags.append(dag) found_dags += dag.subdags + except (CroniterBadCronError, + CroniterBadDateError, + CroniterNotAlphaError) as cron_e: + self.log.exception("Failed to bag_dag: %s", dag.full_filepath) + self.import_errors[dag.full_filepath] = \ + "Invalid Cron expression: " + str(cron_e) + self.file_last_changed[dag.full_filepath] = \ + file_last_changed_on_disk except AirflowDagCycleException as cycle_exception: self.log.exception("Failed to bag_dag: %s", dag.full_filepath) self.import_errors[dag.full_filepath] = str(cycle_exception) diff --git a/tests/dags/test_invalid_cron.py b/tests/dags/test_invalid_cron.py new file mode 100755 index 0000000000000..51a0e43cb500a --- /dev/null +++ b/tests/dags/test_invalid_cron.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.utils.timezone import datetime + +# The schedule_interval specified here is an INVALID +# Cron expression. This invalid DAG will be used to +# test whether dagbag.process_file() can identify +# invalid Cron expression. +dag1 = DAG( + dag_id='test_invalid_cron', + start_date=datetime(2015, 1, 1), + schedule_interval="0 100 * * *") +dag1_task1 = DummyOperator( + task_id='task1', + dag=dag1, + owner='airflow') diff --git a/tests/dags/test_zip_invalid_cron.zip b/tests/dags/test_zip_invalid_cron.zip new file mode 100644 index 0000000000000..fe45153abe85a Binary files /dev/null and b/tests/dags/test_zip_invalid_cron.zip differ diff --git a/tests/models.py b/tests/models.py index 55fa41bd90bab..5b31a634f4a7f 100644 --- a/tests/models.py +++ b/tests/models.py @@ -56,7 +56,7 @@ from airflow.utils.trigger_rule import TriggerRule from mock import patch, ANY from parameterized import parameterized -from tempfile import NamedTemporaryFile +from tempfile import mkdtemp, NamedTemporaryFile DEFAULT_DATE = timezone.datetime(2016, 1, 1) TEST_DAGS_FOLDER = os.path.join( @@ -1175,6 +1175,19 @@ def test_zip(self): dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")) self.assertTrue(dagbag.get_dag("test_zip_dag")) + def test_process_file_cron_validity_check(self): + """ + test if an invalid cron expression + as schedule interval can be identified + """ + invalid_dag_files = ["test_invalid_cron.py", "test_zip_invalid_cron.zip"] + dagbag = models.DagBag(dag_folder=mkdtemp()) + + self.assertEqual(len(dagbag.import_errors), 0) + for d in invalid_dag_files: + dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d)) + self.assertEqual(len(dagbag.import_errors), len(invalid_dag_files)) + @patch.object(DagModel,'get_current') def test_get_dag_without_refresh(self, mock_dagmodel): """