Skip to content

Commit

Permalink
[AIRFLOW-2900] Show code for packaged DAGs (apache#3749)
Browse files Browse the repository at this point in the history
  • Loading branch information
jakebiesinger authored and galak75 committed Nov 23, 2018
1 parent f1804be commit 94a09a5
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 7 deletions.
5 changes: 3 additions & 2 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
return found_dags

mods = []
if not zipfile.is_zipfile(filepath):
is_zipfile = zipfile.is_zipfile(filepath)
if not is_zipfile:
if safe_mode and os.path.isfile(filepath):
with open(filepath, 'rb') as f:
content = f.read()
Expand Down Expand Up @@ -407,7 +408,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
if isinstance(dag, DAG):
if not dag.full_filepath:
dag.full_filepath = filepath
if dag.fileloc != filepath:
if dag.fileloc != filepath and not is_zipfile:
dag.fileloc = filepath
try:
dag.is_subdag = False
Expand Down
25 changes: 22 additions & 3 deletions airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@
# under the License.
#
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import object
standard_library.install_aliases() # noqa: E402
from builtins import str, object

from cgi import escape
from io import BytesIO as IO
import functools
import gzip
import io
import json
import os
import re
import time
import zipfile

from flask import after_this_request, request, Response
from flask_admin.contrib.sqla.filters import FilterConverter
Expand Down Expand Up @@ -366,6 +369,22 @@ def zipper(response):
return view_func


def open_maybe_zipped(f, mode='r'):
    """
    Open ``f``, looking inside a zip archive when the path points into one.

    The path is split at the last ``.zip`` component that is followed by the
    OS path separator. When that leading part really is a zip archive, the
    remainder of the path is opened as a member of the archive; in every
    other case the whole path is opened as a regular file.

    :param f: path of the file to open, optionally passing through an archive.
    :param mode: mode forwarded to ``zipfile.ZipFile`` or to ``io.open``.
    :return: a file object, as in `open`, or as in `ZipFile.open`.
    """
    split_pattern = r'((.*\.zip){})?(.*)'.format(re.escape(os.sep))
    match = re.search(split_pattern, f)
    # Group 2 is the archive path without its trailing separator; group 3 is
    # whatever follows it (or the entire path when no archive part matched).
    archive_path = match.group(2)
    member_name = match.group(3)

    if not archive_path or not zipfile.is_zipfile(archive_path):
        return io.open(f, mode=mode)
    return zipfile.ZipFile(archive_path, mode=mode).open(member_name)


def make_cache_key(*args, **kwargs):
"""
Used by cache to get a unique key per URL
Expand Down
2 changes: 1 addition & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ def code(self):
dag = dagbag.get_dag(dag_id)
title = dag_id
try:
with open(dag.fileloc, 'r') as f:
with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
code = f.read()
html_code = highlight(
code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
Expand Down
20 changes: 20 additions & 0 deletions airflow/www_rbac/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import wtforms
import bleach
import markdown
import re
import zipfile
import os
import io

from builtins import str
from past.builtins import basestring
Expand Down Expand Up @@ -197,6 +201,22 @@ def json_response(obj):
mimetype="application/json")


def open_maybe_zipped(f, mode='r'):
    """
    Open ``f``, transparently reaching into a zip archive if the path has one.

    A path component ending in ``.zip`` and followed by the OS separator is
    treated as a candidate archive. If it is a real zip file, the rest of the
    path is opened as an archive member; otherwise the full path is opened
    from the filesystem.

    :param f: path of the file to open, optionally passing through an archive.
    :param mode: mode forwarded to ``zipfile.ZipFile`` or to ``io.open``.
    :return: a file object, as in `open`, or as in `ZipFile.open`.
    """
    separator = re.escape(os.sep)
    parts = re.search(r'((.*\.zip){})?(.*)'.format(separator), f).groups()
    # parts[0] is the archive including the trailing separator (unused),
    # parts[1] the archive path itself, parts[2] the path after it.
    archive, filename = parts[1], parts[2]

    looks_zipped = archive and zipfile.is_zipfile(archive)
    if looks_zipped:
        return zipfile.ZipFile(archive, mode=mode).open(filename)
    return io.open(f, mode=mode)


def make_cache_key(*args, **kwargs):
"""
Used by cache to get a unique key per URL
Expand Down
2 changes: 1 addition & 1 deletion airflow/www_rbac/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ def code(self):
dag = dagbag.get_dag(dag_id)
title = dag_id
try:
with open(dag.fileloc, 'r') as f:
with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
code = f.read()
html_code = highlight(
code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
Expand Down
34 changes: 34 additions & 0 deletions tests/www/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,40 @@ def some_func():
self.assertEqual(anonymous_username, kwargs['owner'])
mocked_session_instance.add.assert_called_once()

def test_open_maybe_zipped_normal_file(self):
    # A path with no ".zip" component must be passed unchanged to io.open.
    plain_path = '/path/to/some/file.txt'
    fake_open = mock.mock_open(read_data="data")
    with mock.patch('io.open', fake_open) as mock_file:
        utils.open_maybe_zipped(plain_path)
        mock_file.assert_called_with(plain_path, mode='r')

def test_open_maybe_zipped_normal_file_with_zip_in_name(self):
    # ".zip" appears mid-component here (".zip.other"), so it is not directly
    # followed by the separator and the path must go to io.open untouched.
    path = '/path/to/fakearchive.zip.other/file.txt'
    fake_open = mock.mock_open(read_data="data")
    with mock.patch('io.open', fake_open) as mock_file:
        utils.open_maybe_zipped(path)
        mock_file.assert_called_with(path, mode='r')

@mock.patch("zipfile.is_zipfile")
@mock.patch("zipfile.ZipFile")
def test_open_maybe_zipped_archive(self, mocked_ZipFile, mocked_is_zipfile):
    # Pretend the archive exists and is a valid zip file.
    mocked_is_zipfile.return_value = True
    instance = mocked_ZipFile.return_value
    instance.open.return_value = mock.mock_open(read_data="data")

    utils.open_maybe_zipped('/path/to/archive.zip/deep/path/to/file.txt')

    # Each collaborator must be called exactly once, with the archive path
    # (or the member path, for ZipFile.open) as its first positional argument.
    expectations = (
        (mocked_is_zipfile, '/path/to/archive.zip'),
        (mocked_ZipFile, '/path/to/archive.zip'),
        (instance.open, 'deep/path/to/file.txt'),
    )
    for called_mock, expected_first_arg in expectations:
        called_mock.assert_called_once()
        positional, _ = called_mock.call_args_list[0]
        self.assertEqual(expected_first_arg, positional[0])


if __name__ == '__main__':
unittest.main()
35 changes: 35 additions & 0 deletions tests/www_rbac/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# under the License.

import unittest
import mock
from xml.dom import minidom

from airflow.www_rbac import utils
Expand Down Expand Up @@ -109,6 +110,40 @@ def test_params_all(self):
self.assertEqual('page=3&search=bash_&showPaused=False',
utils.get_params(showPaused=False, page=3, search='bash_'))

def test_open_maybe_zipped_normal_file(self):
    # Without an archive component, the helper should defer to io.open.
    target = '/path/to/some/file.txt'
    opener = mock.mock_open(read_data="data")
    with mock.patch('io.open', opener) as mock_file:
        utils.open_maybe_zipped(target)
        mock_file.assert_called_with(target, mode='r')

def test_open_maybe_zipped_normal_file_with_zip_in_name(self):
    # The directory name only contains ".zip" (it ends in ".zip.other"), so
    # no archive should be detected and io.open receives the full path.
    path = '/path/to/fakearchive.zip.other/file.txt'
    opener = mock.mock_open(read_data="data")
    with mock.patch('io.open', opener) as mock_file:
        utils.open_maybe_zipped(path)
        mock_file.assert_called_with(path, mode='r')

@mock.patch("zipfile.is_zipfile")
@mock.patch("zipfile.ZipFile")
def test_open_maybe_zipped_archive(self, mocked_ZipFile, mocked_is_zipfile):
    # Make the mocked zipfile module report the archive as valid.
    mocked_is_zipfile.return_value = True
    instance = mocked_ZipFile.return_value
    instance.open.return_value = mock.mock_open(read_data="data")

    utils.open_maybe_zipped('/path/to/archive.zip/deep/path/to/file.txt')

    def first_positional_arg(called_mock):
        # Verify the single call, then return its first positional argument.
        called_mock.assert_called_once()
        positional, _ = called_mock.call_args_list[0]
        return positional[0]

    self.assertEqual('/path/to/archive.zip',
                     first_positional_arg(mocked_is_zipfile))
    self.assertEqual('/path/to/archive.zip',
                     first_positional_arg(mocked_ZipFile))
    self.assertEqual('deep/path/to/file.txt',
                     first_positional_arg(instance.open))


if __name__ == '__main__':
unittest.main()

0 comments on commit 94a09a5

Please sign in to comment.