Skip to content

Commit

Permalink
Check Cron Expression Validity in DagBag.process_file() (apache#3698)
Browse files Browse the repository at this point in the history
A DAG can be imported as a .py script properly,
but the Cron expression inside as "schedule_interval" may be
invalid, like "0 100 * * *".

This commit helps check the validity of Cron expression in DAG
files (.py) and packaged DAG files (.zip), and help show
exception messages in web UI by add these exceptions into
metadata "import_error".
  • Loading branch information
XD-DENG authored and Chris Fei committed Jan 23, 2019
1 parent 4738690 commit 97358e8
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 2 deletions.
14 changes: 13 additions & 1 deletion airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.orm import reconstructor, relationship, synonym

from croniter import croniter
from croniter import (
croniter, CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError
)
import six

from airflow import settings, utils
Expand Down Expand Up @@ -443,8 +445,18 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True, include_d
if not include_dag_ids or any(item.startswith(dag.dag_id) for item in include_dag_ids):
dag.is_subdag = False
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
if isinstance(dag._schedule_interval, six.string_types):
croniter(dag._schedule_interval)
found_dags.append(dag)
found_dags += dag.subdags
except (CroniterBadCronError,
CroniterBadDateError,
CroniterNotAlphaError) as cron_e:
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = \
"Invalid Cron expression: " + str(cron_e)
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
except AirflowDagCycleException as cycle_exception:
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = str(cycle_exception)
Expand Down
35 changes: 35 additions & 0 deletions tests/dags/test_invalid_cron.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.timezone import datetime

# The schedule_interval specified here is an INVALID
# Cron expression. This invalid DAG will be used to
# test whether dagbag.process_file() can identify
# invalid Cron expression.
dag1 = DAG(
dag_id='test_invalid_cron',
start_date=datetime(2015, 1, 1),
schedule_interval="0 100 * * *")
dag1_task1 = DummyOperator(
task_id='task1',
dag=dag1,
owner='airflow')
Binary file added tests/dags/test_zip_invalid_cron.zip
Binary file not shown.
15 changes: 14 additions & 1 deletion tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
from airflow.utils.trigger_rule import TriggerRule
from mock import patch, ANY
from parameterized import parameterized
from tempfile import NamedTemporaryFile
from tempfile import mkdtemp, NamedTemporaryFile

DEFAULT_DATE = timezone.datetime(2016, 1, 1)
TEST_DAGS_FOLDER = os.path.join(
Expand Down Expand Up @@ -1175,6 +1175,19 @@ def test_zip(self):
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_zip.zip"))
self.assertTrue(dagbag.get_dag("test_zip_dag"))

def test_process_file_cron_validity_check(self):
"""
test if an invalid cron expression
as schedule interval can be identified
"""
invalid_dag_files = ["test_invalid_cron.py", "test_zip_invalid_cron.zip"]
dagbag = models.DagBag(dag_folder=mkdtemp())

self.assertEqual(len(dagbag.import_errors), 0)
for d in invalid_dag_files:
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d))
self.assertEqual(len(dagbag.import_errors), len(invalid_dag_files))

@patch.object(DagModel,'get_current')
def test_get_dag_without_refresh(self, mock_dagmodel):
"""
Expand Down

0 comments on commit 97358e8

Please sign in to comment.