-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AIRFLOW-2855] Check Cron Expression Validity in DagBag.process_file() #3698
Conversation
0fa05cf
to
68f5f3d
Compare
Codecov Report
@@ Coverage Diff @@
## master #3698 +/- ##
==========================================
+ Coverage 77.57% 77.57% +<.01%
==========================================
Files 204 204
Lines 15770 15776 +6
==========================================
+ Hits 12233 12239 +6
Misses 3537 3537
Continue to review full report at Codecov.
|
self.import_errors[dag.full_filepath] = \ | ||
"Invalid Cron expression: " + str(cron_e) | ||
self.file_last_changed[dag.full_filepath] = \ | ||
file_last_changed_on_disk |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may be missing something but do we need to perform the check twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One is for .py
DAG file, and the another is for package DAG file (.zip
file).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we append checked m
to mods
here, https://github.com/apache/incubator-airflow/pull/3698/files#diff-a32a363fa616685db3bfefba947535b2R370, and the if statement evaluating zip file or not ends here https://github.com/apache/incubator-airflow/pull/3698/files#diff-a32a363fa616685db3bfefba947535b2R416. So I think the mods
we are iterating here, https://github.com/apache/incubator-airflow/pull/3698/files#diff-a32a363fa616685db3bfefba947535b2R418, is a combination of modules from .py
file and modules from zip files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @yrqls21 , you're right. Thanks for pointing this out.
I'll modify accordingly.
@@ -1038,6 +1038,21 @@ def test_zip(self): | |||
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")) | |||
self.assertTrue(dagbag.get_dag("test_zip_dag")) | |||
|
|||
def test_process_file_cron_validity_check(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we do still want to keep the file in 644 instead of 755 mode.
tests/models.py
Outdated
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d)) | ||
|
||
for d in invalid_dag_files: | ||
self.assertTrue(any([d in k and "Invalid Cron expression" in v |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: I think we don't need any
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The statement inside any()
is returning a List of Boolean values.
Any()
helps check if there is at least one True
element.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we expecting more than one import error from importing the dedicated test DAG file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dagbag.import_errors
is a Dict whose Key are file path and Value are exception details.
Here what I did is to check "File is the one I'm intending to check" AND "exception is the exception that I'm looking into". The Length of this Boolean value List is the same as the number of entries in metadata import_error
, in which there may be other errors other than those I'm checking here (here I only try to identify Cron invalidity errors.).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sry I missed the part that you are parsing the default DAG_HOME-indeed there might be other import errors. Would you double check what would be processed by calling models.DagBag() please? If "test_invalid_cron.py", "test_zip_invalid_cron.zip"
are included I guess we don't need to reprocess them and if not I guess we don't need to parse the default DAG_HOME at all.
Back to the original point, what originally made me feel a bit strange is that we can actually check the errors in O(n)
but now we do that in O(n^m), m=# of import errors
. But it practically makes no difference so that's why it is a NIT, it's your freedom to give or take.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @yrqls21 , great points!
- For your 1st point, actually you're right. It's not really necessary to re-parse these two files, i.e. the code below can be moved.
for d in invalid_dag_files:
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d))
But I purposely keep them here to make it clear to readers what I'm testing here.
- Your 2nd point is great. I agree it makes no practical difference here, given
n
andm
would never be too big here. But I think I will change this part as well. Always good to keep performance in mind.
Thanks for your reviewing!
airflow/models.py
Outdated
@@ -414,6 +416,16 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): | |||
self.bag_dag(dag, parent_dag=dag, root_dag=dag) | |||
found_dags.append(dag) | |||
found_dags += dag.subdags | |||
if isinstance(dag._schedule_interval, six.string_types): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe put the check before we append the dag
to found_dags
?( if that's what we do for other dags with import errors)
Thanks @yrqls21 again for the review inputs. Very helpful for refining this PR! Have updated accordingly and ensured all tests passed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tests/models.py
Outdated
as schedule interval can be identified | ||
""" | ||
invalid_dag_files = ["test_invalid_cron.py", "test_zip_invalid_cron.zip"] | ||
dagbag = models.DagBag() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: I understand that you want your test to show clearly that you are processing those two test files so it make sense to keep the dagbag.process_file
line. Then I guess we can here process something like an empty/made up directory--the test DAG files in the default DAG_HOME may grow and then we have 2x parsing time growth for the additional file, and our CI is already painfully lengthy ;)
tests/models.py
Outdated
for k, v in dagbag.import_errors.items() | ||
if "Invalid Cron expression" in v] | ||
for d in invalid_dag_files: | ||
self.assertTrue(d in files_with_cron_error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sry for being picky here, just NIT, it is still O(n^2)
here since files_with_cron_error
is a list :P
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yrqls21 come on, n
is 2 here LOL
But I do agree with your another point on the DagBag
's dag_folder
. What I have done in my latest commit is to pass a temporary directory to dag_folder
of DagBag
, so that it will not go scan all the test DAGs.
In addition, changing dag_folder
of DagBag
to a temp directory also makes the testing much easier. We simply need to check whether the number of errors grows from 0
to # of invalid Cron tests
. No more big O
LOL!
I do love your reviews! All valid and helpful points. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOL I was just messing around for the holy perfectionism spirit. We can even be more assertive if we know those two are import error from cron exception. Good job for making the change to improve Airflow! Thank you!
(it's still O(n)
tho ;))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:-| Crazy now LOL
Thanks again! And look forward to your reviewing in any of my future commits!
Refined how we test this new feature, as suggested by @yrqls21 . |
Definitely, looking forward for more of your good work too!
Xiaodong <[email protected]>于2018年8月6日 周一下午7:15写道:
… ***@***.**** commented on this pull request.
------------------------------
In tests/models.py
<#3698 (comment)>
:
> + def test_process_file_cron_validity_check(self):
+ """
+ test if an invalid cron expression
+ as schedule interval can be identified
+ """
+ invalid_dag_files = ["test_invalid_cron.py", "test_zip_invalid_cron.zip"]
+ dagbag = models.DagBag()
+
+ for d in invalid_dag_files:
+ dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d))
+
+ files_with_cron_error = [os.path.split(k)[1]
+ for k, v in dagbag.import_errors.items()
+ if "Invalid Cron expression" in v]
+ for d in invalid_dag_files:
+ self.assertTrue(d in files_with_cron_error)
:-| Crazy now LOL
Thanks again! And look forward to your reviewing in any of my future
commits!
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#3698 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AHdN1qtmNAbLB2MnwwW7xP9l9YM3MWt8ks5uOPhDgaJpZM4VvZ7k>
.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall lgtm
tests/models.py
Outdated
@@ -56,7 +56,7 @@ | |||
from airflow.utils.trigger_rule import TriggerRule | |||
from mock import patch, ANY | |||
from parameterized import parameterized | |||
from tempfile import NamedTemporaryFile | |||
from tempfile import NamedTemporaryFile, mkdtemp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small nit: could you move mkdtemp before NamedTemporaryFile given we put lower case import before upper case(L57)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated as suggested. Thanks.
A DAG can be imported as a .py script properly, but the Cron expression inside as "schedule_interval" may be invalid, like "0 100 * * *". This commit helps check the validity of Cron expression in DAG files (.py) and packaged DAG files (.zip), and help show exception messages in web UI by add these exceptions into metadata "import_error".
thanks @XD-DENG @yrqls21 |
TY @feng-tao! |
apache#3698) A DAG can be imported as a .py script properly, but the Cron expression inside as "schedule_interval" may be invalid, like "0 100 * * *". This commit helps check the validity of Cron expression in DAG files (.py) and packaged DAG files (.zip), and help show exception messages in web UI by add these exceptions into metadata "import_error".
apache#3698) A DAG can be imported as a .py script properly, but the Cron expression inside as "schedule_interval" may be invalid, like "0 100 * * *". This commit helps check the validity of Cron expression in DAG files (.py) and packaged DAG files (.zip), and help show exception messages in web UI by add these exceptions into metadata "import_error".
apache#3698) A DAG can be imported as a .py script properly, but the Cron expression inside as "schedule_interval" may be invalid, like "0 100 * * *". This commit helps check the validity of Cron expression in DAG files (.py) and packaged DAG files (.zip), and help show exception messages in web UI by add these exceptions into metadata "import_error".
apache#3698) A DAG can be imported as a .py script properly, but the Cron expression inside as "schedule_interval" may be invalid, like "0 100 * * *". This commit helps check the validity of Cron expression in DAG files (.py) and packaged DAG files (.zip), and help show exception messages in web UI by add these exceptions into metadata "import_error".
apache#3698) A DAG can be imported as a .py script properly, but the Cron expression inside as "schedule_interval" may be invalid, like "0 100 * * *". This commit helps check the validity of Cron expression in DAG files (.py) and packaged DAG files (.zip), and help show exception messages in web UI by add these exceptions into metadata "import_error".
A DAG can be imported as a .py script properly, but the Cron expression inside as "schedule_interval" may be invalid, like "0 100 * * *". This commit helps check the validity of Cron expression in DAG files (.py) and packaged DAG files (.zip), and help show exception messages in web UI by add these exceptions into metadata "import_error".
Jira
Description
schedule_interval
of DAGs can either be timedelta or a Cron expression.When it's a Cron expression, there is no mechanism to check its validity at this moment. If there is anything wrong with the Cron expression itself, it will cause issues when methods
following_schedule()
andprevious_schedule()
are invoked (will affect scheduling).However, exceptions will only be written into logs. From Web UI, it’s hard for users to identify this issue & the source while no new task can be initiated (especially for users who’re not very familiar with Cron).
It may be good to show error messages in web UI when a DAG's Cron expression (as schedule_interval) can not be parsed by
croniter
properly (this is implemented by adding these exceptions into metadataimport_error
, whose entries will be shown as error messages in web UI).Related tests are added as well.
Screenshot
Tests
Commits
Documentation
Code Quality
git diff upstream/master -u -- "*.py" | flake8 --diff