From 94a09a5e286e3e70be2519e578650a665ee08170 Mon Sep 17 00:00:00 2001
From: Jake Biesinger
Date: Thu, 30 Aug 2018 13:12:46 -0700
Subject: [PATCH] [AIRFLOW-2900] Show code for packaged DAGs (#3749)

---
 airflow/models.py            |  5 +++--
 airflow/www/utils.py         | 25 ++++++++++++++++++++++---
 airflow/www/views.py         |  2 +-
 airflow/www_rbac/utils.py    | 20 ++++++++++++++++++++
 airflow/www_rbac/views.py    |  2 +-
 tests/www/test_utils.py      | 34 ++++++++++++++++++++++++++++++++++
 tests/www_rbac/test_utils.py | 35 +++++++++++++++++++++++++++++++++++
 7 files changed, 116 insertions(+), 7 deletions(-)

diff --git a/airflow/models.py b/airflow/models.py
index 7437a19521b5f..f586139bb1145 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -335,7 +335,8 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
             return found_dags
 
         mods = []
-        if not zipfile.is_zipfile(filepath):
+        is_zipfile = zipfile.is_zipfile(filepath)
+        if not is_zipfile:
             if safe_mode and os.path.isfile(filepath):
                 with open(filepath, 'rb') as f:
                     content = f.read()
@@ -407,7 +408,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
                 if isinstance(dag, DAG):
                     if not dag.full_filepath:
                         dag.full_filepath = filepath
-                        if dag.fileloc != filepath:
+                        if dag.fileloc != filepath and not is_zipfile:
                             dag.fileloc = filepath
                     try:
                         dag.is_subdag = False
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 44fa5c4dcd6fb..09ac465c52d1b 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -18,16 +18,19 @@
 # under the License.
 #
 from future import standard_library
-standard_library.install_aliases()
-from builtins import str
-from builtins import object
+standard_library.install_aliases()  # noqa: E402
+from builtins import str, object
 
 from cgi import escape
 from io import BytesIO as IO
 import functools
 import gzip
+import io
 import json
+import os
+import re
 import time
+import zipfile
 
 from flask import after_this_request, request, Response
 from flask_admin.contrib.sqla.filters import FilterConverter
@@ -366,6 +369,22 @@ def zipper(response):
     return view_func
 
 
+def open_maybe_zipped(f, mode='r'):
+    """
+    Opens the given file. If the path contains a folder with a .zip suffix, then
+    the folder is treated as a zip archive, opening the file inside the archive.
+
+    :return: a file object, as in `open`, or as in `ZipFile.open`.
+    """
+
+    _, archive, filename = re.search(
+        r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)), f).groups()
+    if archive and zipfile.is_zipfile(archive):
+        return zipfile.ZipFile(archive, mode=mode).open(filename)
+    else:
+        return io.open(f, mode=mode)
+
+
 def make_cache_key(*args, **kwargs):
     """
     Used by cache to get a unique key per URL
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 8f6725ef59b44..851ad90cb2770 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -670,7 +670,7 @@ def code(self):
         dag = dagbag.get_dag(dag_id)
         title = dag_id
         try:
-            with open(dag.fileloc, 'r') as f:
+            with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
                 code = f.read()
             html_code = highlight(
                 code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
diff --git a/airflow/www_rbac/utils.py b/airflow/www_rbac/utils.py
index 7bbdada5556f2..14e18f8bc30e7 100644
--- a/airflow/www_rbac/utils.py
+++ b/airflow/www_rbac/utils.py
@@ -26,6 +26,10 @@
 import wtforms
 import bleach
 import markdown
+import re
+import zipfile
+import os
+import io
 
 from builtins import str
 from past.builtins import basestring
@@ -197,6 +201,22 @@ def json_response(obj):
         mimetype="application/json")
 
 
+def open_maybe_zipped(f, mode='r'):
+    """
+    Opens the given file. If the path contains a folder with a .zip suffix, then
+    the folder is treated as a zip archive, opening the file inside the archive.
+
+    :return: a file object, as in `open`, or as in `ZipFile.open`.
+    """
+
+    _, archive, filename = re.search(
+        r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)), f).groups()
+    if archive and zipfile.is_zipfile(archive):
+        return zipfile.ZipFile(archive, mode=mode).open(filename)
+    else:
+        return io.open(f, mode=mode)
+
+
 def make_cache_key(*args, **kwargs):
     """
     Used by cache to get a unique key per URL
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 43e481ed0278c..a65e1d26e3995 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -379,7 +379,7 @@ def code(self):
         dag = dagbag.get_dag(dag_id)
         title = dag_id
         try:
-            with open(dag.fileloc, 'r') as f:
+            with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
                 code = f.read()
             html_code = highlight(
                 code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py
index 9d788e88f1ed9..f5c3f01fb916d 100644
--- a/tests/www/test_utils.py
+++ b/tests/www/test_utils.py
@@ -190,6 +190,40 @@ def some_func():
         self.assertEqual(anonymous_username, kwargs['owner'])
         mocked_session_instance.add.assert_called_once()
 
+    def test_open_maybe_zipped_normal_file(self):
+        with mock.patch(
+                'io.open', mock.mock_open(read_data="data")) as mock_file:
+            utils.open_maybe_zipped('/path/to/some/file.txt')
+            mock_file.assert_called_with('/path/to/some/file.txt', mode='r')
+
+    def test_open_maybe_zipped_normal_file_with_zip_in_name(self):
+        path = '/path/to/fakearchive.zip.other/file.txt'
+        with mock.patch(
+                'io.open', mock.mock_open(read_data="data")) as mock_file:
+            utils.open_maybe_zipped(path)
+            mock_file.assert_called_with(path, mode='r')
+
+    @mock.patch("zipfile.is_zipfile")
+    @mock.patch("zipfile.ZipFile")
+    def test_open_maybe_zipped_archive(self, mocked_ZipFile, mocked_is_zipfile):
+        mocked_is_zipfile.return_value = True
+        instance = mocked_ZipFile.return_value
+        instance.open.return_value = mock.mock_open(read_data="data")
+
+        utils.open_maybe_zipped('/path/to/archive.zip/deep/path/to/file.txt')
+
+        mocked_is_zipfile.assert_called_once()
+        (args, kwargs) = mocked_is_zipfile.call_args_list[0]
+        self.assertEqual('/path/to/archive.zip', args[0])
+
+        mocked_ZipFile.assert_called_once()
+        (args, kwargs) = mocked_ZipFile.call_args_list[0]
+        self.assertEqual('/path/to/archive.zip', args[0])
+
+        instance.open.assert_called_once()
+        (args, kwargs) = instance.open.call_args_list[0]
+        self.assertEqual('deep/path/to/file.txt', args[0])
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/tests/www_rbac/test_utils.py b/tests/www_rbac/test_utils.py
index 05114881ddecf..c7179fa3a421d 100644
--- a/tests/www_rbac/test_utils.py
+++ b/tests/www_rbac/test_utils.py
@@ -18,6 +18,7 @@
 # under the License.
 
 import unittest
+import mock
 from xml.dom import minidom
 
 from airflow.www_rbac import utils
@@ -109,6 +110,40 @@ def test_params_all(self):
         self.assertEqual('page=3&search=bash_&showPaused=False',
                          utils.get_params(showPaused=False, page=3, search='bash_'))
 
+    def test_open_maybe_zipped_normal_file(self):
+        with mock.patch(
+                'io.open', mock.mock_open(read_data="data")) as mock_file:
+            utils.open_maybe_zipped('/path/to/some/file.txt')
+            mock_file.assert_called_with('/path/to/some/file.txt', mode='r')
+
+    def test_open_maybe_zipped_normal_file_with_zip_in_name(self):
+        path = '/path/to/fakearchive.zip.other/file.txt'
+        with mock.patch(
+                'io.open', mock.mock_open(read_data="data")) as mock_file:
+            utils.open_maybe_zipped(path)
+            mock_file.assert_called_with(path, mode='r')
+
+    @mock.patch("zipfile.is_zipfile")
+    @mock.patch("zipfile.ZipFile")
+    def test_open_maybe_zipped_archive(self, mocked_ZipFile, mocked_is_zipfile):
+        mocked_is_zipfile.return_value = True
+        instance = mocked_ZipFile.return_value
+        instance.open.return_value = mock.mock_open(read_data="data")
+
+        utils.open_maybe_zipped('/path/to/archive.zip/deep/path/to/file.txt')
+
+        mocked_is_zipfile.assert_called_once()
+        (args, kwargs) = mocked_is_zipfile.call_args_list[0]
+        self.assertEqual('/path/to/archive.zip', args[0])
+
+        mocked_ZipFile.assert_called_once()
+        (args, kwargs) = mocked_ZipFile.call_args_list[0]
+        self.assertEqual('/path/to/archive.zip', args[0])
+
+        instance.open.assert_called_once()
+        (args, kwargs) = instance.open.call_args_list[0]
+        self.assertEqual('deep/path/to/file.txt', args[0])
+
 
 if __name__ == '__main__':
     unittest.main()