-
Notifications
You must be signed in to change notification settings - Fork 14.5k
/
test_plugins_manager.py
450 lines (343 loc) · 17.1 KB
/
test_plugins_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import importlib
import inspect
import logging
import os
import sys
from pathlib import Path
from unittest import mock
import pytest
from airflow.hooks.base import BaseHook
from airflow.listeners.listener import get_listener_manager
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.module_loading import qualname
from airflow.www import app as application
from setup import AIRFLOW_SOURCES_ROOT
from tests.test_utils.config import conf_vars
from tests.test_utils.mock_plugins import mock_plugin_manager
# Module-level pytest mark: tags every test collected from this file with ``db_test``.
pytestmark = pytest.mark.db_test
# Dotted module path used to build the ``mock.patch`` target for ``distributions`` in the
# tests below. It has to name the metadata implementation that is importable here.
importlib_metadata_string = "importlib_metadata"

try:
    import importlib_metadata

    # If importlib_metadata is installed, it takes precedence over built-in importlib.metadata in PY39
    # so we should use the default declared above
except ImportError:
    try:
        import importlib.metadata
    except ImportError:
        raise Exception(
            "Either importlib_metadata must be installed or importlib.metadata must be"
            " available in system libraries (Python 3.9+). We seem to have neither."
        )
    # Only when we do not have importlib_metadata is importlib.metadata actually used, so the
    # patch target must be switched to it. Rebinding ``importlib_metadata`` (as was done before)
    # left this string pointing at a module that is not installed.
    importlib_metadata_string = "importlib.metadata"
# Source text of a throwaway plugin module whose ``on_load`` hook always raises.
# The body must be valid, indented Python so that the failure comes from ``on_load``
# itself rather than from a syntax error while importing the file.
ON_LOAD_EXCEPTION_PLUGIN = """
from airflow.plugins_manager import AirflowPlugin

class AirflowTestOnLoadExceptionPlugin(AirflowPlugin):
    name = 'preload'

    def on_load(self, *args, **kwargs):
        raise Exception("oops")
"""
@pytest.fixture(autouse=True, scope="module")
def clean_plugins():
    """Clear the listener manager once before this module's tests and once after them."""
    # Setup: clear whatever is currently registered on the listener manager.
    get_listener_manager().clear()
    yield
    # Teardown: clear again so listeners registered by these tests are removed.
    get_listener_manager().clear()
@pytest.mark.db_test
class TestPluginsRBAC:
    """Checks that the test plugin's views, menu links and blueprint show up in the web app."""

    @pytest.fixture(autouse=True)
    def _set_attrs(self, app):
        """Store the ``app`` fixture and its appbuilder on the instance for each test."""
        self.appbuilder = app.appbuilder
        self.app = app

    def _menu_entries_named(self, wanted):
        """Return the top-level menu entries whose ``name`` equals ``wanted``."""
        return [entry for entry in self.appbuilder.menu.menu if entry.name == wanted]

    def test_flaskappbuilder_views(self):
        """The plugin view is registered once and listed under its menu category."""
        from tests.plugins.test_plugin import v_appbuilder_package

        expected_blueprint = str(v_appbuilder_package["view"].__class__.__name__)
        matching_views = [
            candidate
            for candidate in self.appbuilder.baseviews
            if candidate.blueprint.name == expected_blueprint
        ]
        assert len(matching_views) == 1

        # view should have a menu item matching category of v_appbuilder_package
        category_entries = self._menu_entries_named(v_appbuilder_package["category"])
        assert len(category_entries) == 1

        # menu link should also have a link matching the name of the package.
        category_entry = category_entries[0]
        assert category_entry.name == v_appbuilder_package["category"]
        assert category_entry.childs[0].name == v_appbuilder_package["name"]

    def test_flaskappbuilder_menu_links(self):
        """Plugin menu items appear both nested in a category and at the top level."""
        from tests.plugins.test_plugin import appbuilder_mitem, appbuilder_mitem_toplevel

        # menu item (category) should exist matching appbuilder_mitem.category
        category_entries = self._menu_entries_named(appbuilder_mitem["category"])
        assert len(category_entries) == 1

        # menu link should be a child in the category
        category_entry = category_entries[0]
        first_child = category_entry.childs[0]
        assert category_entry.name == appbuilder_mitem["category"]
        assert first_child.name == appbuilder_mitem["name"]
        assert first_child.href == appbuilder_mitem["href"]

        # a top level link isn't nested in a category
        top_level_entries = self._menu_entries_named(appbuilder_mitem_toplevel["name"])
        assert len(top_level_entries) == 1

        top_level_entry = top_level_entries[0]
        assert top_level_entry.href == appbuilder_mitem_toplevel["href"]
        assert top_level_entry.label == appbuilder_mitem_toplevel["label"]

    def test_app_blueprints(self):
        """The plugin blueprint is registered on the Flask app under ``test_plugin``."""
        from tests.plugins.test_plugin import bp

        # Blueprint should be present in the app
        registered = self.app.blueprints
        assert "test_plugin" in registered
        assert registered["test_plugin"].name == bp.name

    def test_app_static_folder(self):
        """The app's static folder resolves to ``airflow/www/static`` in the source tree."""
        # Static folder should be properly set
        expected_folder = AIRFLOW_SOURCES_ROOT / "airflow" / "www" / "static"
        assert expected_folder == Path(self.app.static_folder).resolve()
@pytest.mark.db_test
def test_flaskappbuilder_nomenu_views():
    """The view from ``v_nomenu_appbuilder_package`` is registered as a base view exactly once."""
    from tests.plugins.test_plugin import v_nomenu_appbuilder_package

    class AirflowNoMenuViewsPlugin(AirflowPlugin):
        appbuilder_views = [v_nomenu_appbuilder_package]

    expected_blueprint = str(v_nomenu_appbuilder_package["view"].__class__.__name__)

    with mock_plugin_manager(plugins=[AirflowNoMenuViewsPlugin()]):
        # Build a fresh app while only the no-menu plugin is visible to the plugin manager.
        webapp = application.create_app(testing=True)
        matching_views = [
            candidate
            for candidate in webapp.appbuilder.baseviews
            if candidate.blueprint.name == expected_blueprint
        ]

        assert len(matching_views) == 1
class TestPluginsManager:
    """Tests for loading and integrating plugins through ``airflow.plugins_manager``."""

    @pytest.fixture(autouse=True, scope="function")
    def clean_plugins(self):
        """Reset the module-level ``plugins`` and ``loaded_plugins`` registries before each test."""
        from airflow import plugins_manager

        plugins_manager.plugins = []
        plugins_manager.loaded_plugins = set()

    def test_no_log_when_no_plugins(self, caplog):
        """Loading with an empty plugin list must leave the captured log empty."""
        with mock_plugin_manager(plugins=[]):
            from airflow import plugins_manager

            plugins_manager.ensure_plugins_loaded()

        assert not caplog.record_tuples

    def test_should_load_plugins_from_property(self, caplog):
        """Hooks exposed through a ``@property`` on the plugin are registered on load."""

        class AirflowTestPropertyPlugin(AirflowPlugin):
            name = "test_property_plugin"

            @property
            def hooks(self):
                class TestPropertyHook(BaseHook):
                    pass

                return [TestPropertyHook]

        with mock_plugin_manager(plugins=[AirflowTestPropertyPlugin()]):
            from airflow import plugins_manager

            caplog.set_level(logging.DEBUG, "airflow.plugins_manager")
            plugins_manager.ensure_plugins_loaded()

            assert "AirflowTestPropertyPlugin" in str(plugins_manager.plugins)
            assert "TestPropertyHook" in str(plugins_manager.registered_hooks)

        # The last captured record is expected to be the DEBUG timing message.
        final_record = caplog.records[-1]
        assert final_record.levelname == "DEBUG"
        assert final_record.msg == "Loading %d plugin(s) took %.2f seconds"

    def test_loads_filesystem_plugins(self, caplog):
        """Plugins from the plugins directory are loaded, without any log output."""
        from airflow import plugins_manager

        with mock.patch("airflow.plugins_manager.plugins", []):
            plugins_manager.load_plugins_from_plugin_directory()

            assert len(plugins_manager.plugins) == 6

            on_load_plugin = next(
                (
                    candidate
                    for candidate in plugins_manager.plugins
                    if "AirflowTestOnLoadPlugin" in str(candidate)
                ),
                None,
            )
            if on_load_plugin is None:
                pytest.fail("Wasn't able to find a registered `AirflowTestOnLoadPlugin`")
            assert on_load_plugin.name == "postload"

            assert not caplog.record_tuples

    def test_loads_filesystem_plugins_exception(self, caplog, tmp_path):
        """A plugin file that fails to load is logged and not added to ``plugins``."""
        from airflow import plugins_manager

        with mock.patch("airflow.plugins_manager.plugins", []):
            plugin_file = tmp_path / "testplugin.py"
            plugin_file.write_text(ON_LOAD_EXCEPTION_PLUGIN)

            # Point the plugins folder at the temp dir only while loading.
            with conf_vars({("core", "plugins_folder"): os.fspath(tmp_path)}):
                plugins_manager.load_plugins_from_plugin_directory()

            assert plugins_manager.plugins == []

            captured = caplog.text
            assert "Failed to import plugin" in captured
            assert "testplugin.py" in captured

    def test_should_warning_about_incompatible_plugins(self, caplog):
        """Plugins declaring only ``admin_views`` / ``menu_links`` each trigger a warning."""

        class AirflowAdminViewsPlugin(AirflowPlugin):
            name = "test_admin_views_plugin"
            admin_views = [mock.MagicMock()]

        class AirflowAdminMenuLinksPlugin(AirflowPlugin):
            name = "test_menu_links_plugin"
            menu_links = [mock.MagicMock()]

        with mock_plugin_manager(plugins=[AirflowAdminViewsPlugin(), AirflowAdminMenuLinksPlugin()]):
            with caplog.at_level(logging.WARNING, logger="airflow.plugins_manager"):
                from airflow import plugins_manager

                plugins_manager.initialize_web_ui_plugins()

        # One warning per plugin, in the order the plugins were supplied.
        expected_records = [
            (
                "airflow.plugins_manager",
                logging.WARNING,
                f"Plugin '{plugin_name}' may not be compatible with the current Airflow version. "
                "Please contact the author of the plugin.",
            )
            for plugin_name in ("test_admin_views_plugin", "test_menu_links_plugin")
        ]
        assert caplog.record_tuples == expected_records

    def test_should_not_warning_about_fab_plugins(self, caplog):
        """Plugins declaring only appbuilder views / menu items produce no warning."""

        class AirflowAdminViewsPlugin(AirflowPlugin):
            name = "test_admin_views_plugin"
            appbuilder_views = [mock.MagicMock()]

        class AirflowAdminMenuLinksPlugin(AirflowPlugin):
            name = "test_menu_links_plugin"
            appbuilder_menu_items = [mock.MagicMock()]

        with mock_plugin_manager(plugins=[AirflowAdminViewsPlugin(), AirflowAdminMenuLinksPlugin()]):
            with caplog.at_level(logging.WARNING, logger="airflow.plugins_manager"):
                from airflow import plugins_manager

                plugins_manager.initialize_web_ui_plugins()

        assert not caplog.record_tuples

    def test_should_not_warning_about_fab_and_flask_admin_plugins(self, caplog):
        """Plugins declaring both the admin and the appbuilder attributes produce no warning."""

        class AirflowAdminViewsPlugin(AirflowPlugin):
            name = "test_admin_views_plugin"
            admin_views = [mock.MagicMock()]
            appbuilder_views = [mock.MagicMock()]

        class AirflowAdminMenuLinksPlugin(AirflowPlugin):
            name = "test_menu_links_plugin"
            menu_links = [mock.MagicMock()]
            appbuilder_menu_items = [mock.MagicMock()]

        with mock_plugin_manager(plugins=[AirflowAdminViewsPlugin(), AirflowAdminMenuLinksPlugin()]):
            with caplog.at_level(logging.WARNING, logger="airflow.plugins_manager"):
                from airflow import plugins_manager

                plugins_manager.initialize_web_ui_plugins()

        assert not caplog.record_tuples

    def test_entrypoint_plugin_errors_dont_raise_exceptions(self, caplog):
        """
        Test that Airflow does not raise an error if there is any Exception because of a plugin.
        """
        from airflow.plugins_manager import import_errors, load_entrypoint_plugins

        failing_entrypoint = mock.Mock(
            group="airflow.plugins",
            module="test.plugins.test_plugins_manager",
        )
        # ``name`` is a reserved Mock constructor argument, so it is assigned afterwards.
        failing_entrypoint.name = "test-entrypoint"
        failing_entrypoint.load.side_effect = ImportError("my_fake_module not found")

        fake_dist = mock.Mock(metadata={"Name": "test-dist"}, entry_points=[failing_entrypoint])

        patch_target = f"{importlib_metadata_string}.distributions"
        with mock.patch(patch_target, return_value=[fake_dist]):
            with caplog.at_level(logging.ERROR, logger="airflow.plugins_manager"):
                load_entrypoint_plugins()

                captured = caplog.text
                # Assert Traceback is shown too
                assert "Traceback (most recent call last):" in captured
                assert "my_fake_module not found" in captured
                assert "Failed to import plugin test-entrypoint" in captured

                expected_error = ("test.plugins.test_plugins_manager", "my_fake_module not found")
                assert expected_error in import_errors.items()

    def test_registering_plugin_macros(self, request):
        """
        Tests whether macros that originate from plugins are being registered correctly.
        """
        from airflow import macros
        from airflow.plugins_manager import integrate_macros_plugins

        def _reset_macros_module():
            """Reloads the airflow.macros module such that the symbol table is reset after the test."""
            # We're explicitly deleting the module from sys.modules and importing it again
            # using import_module() as opposed to using importlib.reload() because the latter
            # does not undo the changes to the airflow.macros module that are being caused by
            # invoking integrate_macros_plugins()
            del sys.modules["airflow.macros"]
            importlib.import_module("airflow.macros")

        request.addfinalizer(_reset_macros_module)

        def custom_macro():
            return "foo"

        class MacroPlugin(AirflowPlugin):
            name = "macro_plugin"
            macros = [custom_macro]

        with mock_plugin_manager(plugins=[MacroPlugin()]):
            # Ensure the macros for the plugin have been integrated.
            integrate_macros_plugins()

            # Test whether the modules have been created as expected.
            plugin_module = importlib.import_module(f"airflow.macros.{MacroPlugin.name}")
            for registered_macro in MacroPlugin.macros:
                # Verify that the macros added by the plugin are being set correctly
                # on the plugin's macro module.
                assert hasattr(plugin_module, registered_macro.__name__)

            # Verify that the symbol table in airflow.macros has been updated with an entry for
            # this plugin, this is necessary in order to allow the plugin's macros to be used when
            # rendering templates.
            assert hasattr(macros, MacroPlugin.name)

    def test_registering_plugin_listeners(self):
        """Listeners declared by directory plugins are registered on the listener manager."""
        from airflow import plugins_manager

        def _listener_label(listener):
            # Modules are identified by ``__name__``; anything else by its qualified name.
            if inspect.ismodule(listener):
                return listener.__name__
            return qualname(listener)

        with mock.patch("airflow.plugins_manager.plugins", []):
            plugins_manager.load_plugins_from_plugin_directory()
            plugins_manager.integrate_listener_plugins(get_listener_manager())

            assert get_listener_manager().has_listeners

            registered = get_listener_manager().pm.get_plugins()
            # sort names as order of listeners is not guaranteed
            labels = sorted(_listener_label(listener) for listener in registered)
            assert labels == [
                "tests.listeners.class_listener.ClassBasedListener",
                "tests.listeners.empty_listener",
            ]

    def test_should_import_plugin_from_providers(self):
        """Loading provider plugins adds at least two entries to ``plugins``."""
        from airflow import plugins_manager

        with mock.patch("airflow.plugins_manager.plugins", []):
            assert not plugins_manager.plugins
            plugins_manager.load_providers_plugins()
            assert len(plugins_manager.plugins) >= 2

    def test_does_not_double_import_entrypoint_provider_plugins(self):
        """Loading entrypoint plugins and then provider plugins yields exactly two plugins."""
        from airflow import plugins_manager

        # NOTE(review): these mocks are built but never patched into the metadata lookup or
        # otherwise used below, so they do not influence the assertions - confirm whether a
        # ``distributions`` patch was intended here.
        mock_entrypoint = mock.Mock(module="module_name_plugin")
        mock_entrypoint.name = "test-entrypoint-plugin"

        mock_dist = mock.Mock(
            metadata={"Name": "test-entrypoint-plugin"},
            version="1.0.0",
            entry_points=[mock_entrypoint],
        )

        with mock.patch("airflow.plugins_manager.plugins", []):
            assert not plugins_manager.plugins
            plugins_manager.load_entrypoint_plugins()
            plugins_manager.load_providers_plugins()
            assert len(plugins_manager.plugins) == 2
class TestPluginsDirectorySource:
    """Tests for the path, string and HTML forms of ``PluginsDirectorySource``."""

    def test_should_return_correct_path_name(self):
        """A source built from this file reports its file name under ``$PLUGINS_FOLDER``."""
        from airflow import plugins_manager

        directory_source = plugins_manager.PluginsDirectorySource(__file__)

        assert directory_source.path == "test_plugins_manager.py"
        assert str(directory_source) == "$PLUGINS_FOLDER/test_plugins_manager.py"
        assert directory_source.__html__() == "<em>$PLUGINS_FOLDER/</em>test_plugins_manager.py"
class TestEntryPointSource:
    """Tests for the entrypoint, string and HTML forms of ``EntryPointSource``."""

    def test_should_return_correct_source_details(self):
        """The source renders as ``<dist name>==<version>: <entrypoint>``."""
        from airflow import plugins_manager

        fake_entrypoint = mock.Mock(module="module_name_plugin")
        # ``name`` is a reserved Mock constructor argument, so it is assigned afterwards.
        fake_entrypoint.name = "test-entrypoint-plugin"

        fake_dist = mock.Mock(
            metadata={"Name": "test-entrypoint-plugin"},
            version="1.0.0",
            entry_points=[fake_entrypoint],
        )

        patch_target = f"{importlib_metadata_string}.distributions"
        with mock.patch(patch_target, return_value=[fake_dist]):
            plugins_manager.load_entrypoint_plugins()

        entrypoint_source = plugins_manager.EntryPointSource(fake_entrypoint, fake_dist)
        entrypoint_text = str(fake_entrypoint)

        assert entrypoint_source.entrypoint == entrypoint_text
        assert str(entrypoint_source) == "test-entrypoint-plugin==1.0.0: " + entrypoint_text
        assert entrypoint_source.__html__() == "<em>test-entrypoint-plugin==1.0.0:</em> " + entrypoint_text