From db7ce1b6a539ec6fec3f543b4c7f674440ee49ad Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Tue, 17 Nov 2020 18:48:57 +0100 Subject: [PATCH] Adds mechanism for provider package discovery. (#12383) This is a simple mechanism that will allow us to dynamically discover and register all provider packages in the Airflow core. Closes: #11422 GitOrigin-RevId: 2c0920fba5d2f05d2e29cead91127686af277ec2 --- BREEZE.rst | 29 +++--- MANIFEST.in | 1 + STATIC_CODE_CHECKS.rst | 2 + {dev => airflow}/provider.yaml.schema.json | 0 airflow/providers/google/provider.yaml | 3 + airflow/providers_manager.py | 85 +++++++++++++++++ breeze-complete | 1 + .../MANIFEST_TEMPLATE.in.jinja2 | 2 + docs/exts/provider_yaml_utils.py | 2 +- .../pre_commit_check_provider_yaml_files.py | 32 +++++-- setup.py | 1 + tests/core/test_providers_manager.py | 91 +++++++++++++++++++ 12 files changed, 224 insertions(+), 25 deletions(-) rename {dev => airflow}/provider.yaml.schema.json (100%) create mode 100644 airflow/providers_manager.py create mode 100644 tests/core/test_providers_manager.py diff --git a/BREEZE.rst b/BREEZE.rst index 364dd7d8624..21582415369 100644 --- a/BREEZE.rst +++ b/BREEZE.rst @@ -2006,20 +2006,21 @@ This is the current syntax for `./breeze <./breeze>`_: Run selected static checks for currently changed files. You should specify static check that you would like to run or 'all' to run all checks. 
One of: - all all-but-pylint airflow-config-yaml airflow-providers-available base-operator - bats-tests bats-in-container-tests black build build-providers-dependencies - check-apache-license check-builtin-literals check-executables-have-shebangs - check-hooks-apply check-integrations check-merge-conflict check-xml - consistent-pylint daysago-import-check debug-statements detect-private-key doctoc - dont-use-safe-filter end-of-file-fixer fix-encoding-pragma flake8 forbid-tabs - helm-lint incorrect-use-of-LoggingMixin insert-license isort language-matters - lint-dockerfile lint-openapi markdownlint mermaid mixed-line-ending mypy mypy-helm - no-providers-in-core-examples no-relative-imports pre-commit-descriptions - provide-create-sessions providers-init-file provider-yamls pydevd pydocstyle pylint - pylint-tests python-no-log-warn pyupgrade restrict-start_date rst-backticks - setup-order setup-extra-packages shellcheck sort-in-the-wild stylelint - trailing-whitespace update-breeze-file update-extras update-local-yml-file - update-setup-cfg-file version-sync yamllint + all all-but-pylint airflow-config-yaml airflow-providers-available + airflow-provider-yaml-files-ok base-operator bats-tests bats-in-container-tests + black build build-providers-dependencies check-apache-license check-builtin-literals + check-executables-have-shebangs check-hooks-apply check-integrations + check-merge-conflict check-xml consistent-pylint daysago-import-check + debug-statements detect-private-key doctoc dont-use-safe-filter end-of-file-fixer + fix-encoding-pragma flake8 forbid-tabs helm-lint incorrect-use-of-LoggingMixin + insert-license isort language-matters lint-dockerfile lint-openapi markdownlint + mermaid mixed-line-ending mypy mypy-helm no-providers-in-core-examples + no-relative-imports pre-commit-descriptions provide-create-sessions + providers-init-file provider-yamls pydevd pydocstyle pylint pylint-tests + python-no-log-warn pyupgrade restrict-start_date rst-backticks 
setup-order + setup-extra-packages shellcheck sort-in-the-wild stylelint trailing-whitespace + update-breeze-file update-extras update-local-yml-file update-setup-cfg-file + version-sync yamllint You can pass extra arguments including options to to the pre-commit framework as passed after --. For example: diff --git a/MANIFEST.in b/MANIFEST.in index 1bbfc0a85f7..a0b613374c0 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -32,5 +32,6 @@ global-exclude __pycache__ *.pyc include airflow/alembic.ini include airflow/api_connexion/openapi/v1.yaml include airflow/git_version +include airflow/provider.yaml.schema.json include airflow/serialization/schema.json include airflow/utils/python_virtualenv_script.jinja2 diff --git a/STATIC_CODE_CHECKS.rst b/STATIC_CODE_CHECKS.rst index ad7b12df1ac..4a47ba096fd 100644 --- a/STATIC_CODE_CHECKS.rst +++ b/STATIC_CODE_CHECKS.rst @@ -50,6 +50,8 @@ require Breeze Docker images to be installed locally: ----------------------------------- ---------------------------------------------------------------- ------------ ``airflow-providers-available`` Checks that providers are properly declared by extras ----------------------------------- ---------------------------------------------------------------- ------------ +``airflow-provider-yaml-files-ok`` Checks that providers yaml files are valid +----------------------------------- ---------------------------------------------------------------- ------------ ``base-operator`` Checks that BaseOperator is imported properly ----------------------------------- ---------------------------------------------------------------- ------------ ``bats-tests`` Runs BATS bash unit tests diff --git a/dev/provider.yaml.schema.json b/airflow/provider.yaml.schema.json similarity index 100% rename from dev/provider.yaml.schema.json rename to airflow/provider.yaml.schema.json diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index 0c953871d14..2de985d66bc 100644 --- 
a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -382,6 +382,9 @@ sensors: - integration-name: Google Cloud Storage Transfer Service python-modules: - airflow.providers.google.cloud.sensors.cloud_storage_transfer_service + - integration-name: Google Dataflow + python-modules: + - airflow.providers.google.cloud.sensors.dataflow - integration-name: Google Dataproc python-modules: - airflow.providers.google.cloud.sensors.dataproc diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py new file mode 100644 index 00000000000..cf5b1d90120 --- /dev/null +++ b/airflow/providers_manager.py @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Manages all providers.""" +import importlib +import json +import logging +import pkgutil +import traceback +from typing import Dict + +import jsonschema +import yaml + +try: + import importlib.resources as importlib_resources +except ImportError: + # Try backported to PY<37 `importlib_resources`. 
+ import importlib_resources + + + log = logging.getLogger(__name__) + + + def _load_schema() -> Dict: + return json.loads(importlib_resources.read_text('airflow', 'provider.yaml.schema.json')) + + + class ProvidersManager: + """Manages all provider packages.""" + + def __init__(self): + self._provider_directory = {} + try: + from airflow import providers + except ImportError as e: + log.warning("No providers are present or error when importing them! :%s", e) + return + self._schema = _load_schema() + self.__find_all_providers(providers.__path__) + + def __find_all_providers(self, paths: str): + def onerror(_): + exception_string = traceback.format_exc() + log.warning(exception_string) + + for module_info in pkgutil.walk_packages(paths, prefix="airflow.providers.", onerror=onerror): + try: + imported_module = importlib.import_module(module_info.name) + except Exception as e: # noqa pylint: disable=broad-except + log.warning("Error when importing %s:%s", module_info.name, e) + continue + try: + provider = importlib_resources.read_text(imported_module, 'provider.yaml') + provider_info = yaml.safe_load(provider) + jsonschema.validate(provider_info, schema=self._schema) + self._provider_directory[provider_info['package-name']] = provider_info + except FileNotFoundError: + # This is OK - this is not a provider package + pass + except TypeError as e: + if "is not a package" not in str(e): + log.warning("Error when loading 'provider.yaml' file from %s:%s", module_info.name, e) + # Otherwise this is OK - this is likely a module + except Exception as e: # noqa pylint: disable=broad-except + log.warning("Error when loading 'provider.yaml' file from %s:%s", module_info.name, e) + + @property + def providers(self): + """Returns information about available providers.""" + return self._provider_directory diff --git a/breeze-complete b/breeze-complete index cc316675673..2e6b4d457d9 100644 --- a/breeze-complete +++ b/breeze-complete @@ -69,6 +69,7 @@ all all-but-pylint
airflow-config-yaml airflow-providers-available +airflow-provider-yaml-files-ok base-operator bats-tests bats-in-container-tests diff --git a/dev/provider_packages/MANIFEST_TEMPLATE.in.jinja2 b/dev/provider_packages/MANIFEST_TEMPLATE.in.jinja2 index 4fd2e90309a..9963ebd81ae 100644 --- a/dev/provider_packages/MANIFEST_TEMPLATE.in.jinja2 +++ b/dev/provider_packages/MANIFEST_TEMPLATE.in.jinja2 @@ -27,6 +27,8 @@ include airflow/providers/google/cloud/example_dags/*.sql include airflow/providers/papermill/example_dags/*.ipynb {% endif %} +include airflow/providers/{{ PROVIDER_PATH }}/provider.yaml + include NOTICE include LICENSE include CHANGELOG.txt diff --git a/docs/exts/provider_yaml_utils.py b/docs/exts/provider_yaml_utils.py index a06e9f4b905..5e9b1ae25d3 100644 --- a/docs/exts/provider_yaml_utils.py +++ b/docs/exts/provider_yaml_utils.py @@ -24,7 +24,7 @@ import yaml ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)) -PROVIDER_DATA_SCHEMA_PATH = os.path.join(ROOT_DIR, "dev", "provider.yaml.schema.json") +PROVIDER_DATA_SCHEMA_PATH = os.path.join(ROOT_DIR, "airflow", "provider.yaml.schema.json") def _load_schema() -> Dict[str, Any]: diff --git a/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py b/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py index eef82c5818e..fd748016119 100755 --- a/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py +++ b/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py @@ -23,7 +23,7 @@ from collections import Counter from glob import glob from itertools import chain, product -from typing import Any, Dict +from typing import Any, Dict, Iterable import jsonschema import yaml @@ -36,7 +36,7 @@ ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)) DOCS_DIR = os.path.join(ROOT_DIR, 'docs') -PROVIDER_DATA_SCHEMA_PATH = os.path.join(ROOT_DIR, "dev", "provider.yaml.schema.json") +PROVIDER_DATA_SCHEMA_PATH = 
os.path.join(ROOT_DIR, "airflow", "provider.yaml.schema.json") CORE_INTEGRATIONS = ["SQL", "Local"] @@ -53,10 +53,10 @@ def _load_schema() -> Dict[str, Any]: return content -def _load_package_data(): +def _load_package_data(package_paths: Iterable[str]): schema = _load_schema() result = {} - for provider_yaml_path in sorted(glob(f"{ROOT_DIR}/airflow/providers/**/provider.yaml", recursive=True)): + for provider_yaml_path in package_paths: with open(provider_yaml_path) as yaml_file: provider = yaml.safe_load(yaml_file) rel_path = os.path.relpath(provider_yaml_path, ROOT_DIR) @@ -259,10 +259,22 @@ def check_doc_files(yaml_files: Dict[str, Dict]): sys.exit(1) -all_yaml_files: Dict[str, Dict] = _load_package_data() +if __name__ == '__main__': + all_provider_files = sorted(glob(f"{ROOT_DIR}/airflow/providers/**/provider.yaml", recursive=True)) + if len(sys.argv) > 1: + paths = sorted(sys.argv[1:]) + else: + paths = all_provider_files -check_integration_duplicates(all_yaml_files) -check_completeness_of_list_of_hooks_sensors_hooks(all_yaml_files) -check_completeness_of_list_of_transfers(all_yaml_files) -check_invalid_integration(all_yaml_files) -check_doc_files(all_yaml_files) + all_parsed_yaml_files: Dict[str, Dict] = _load_package_data(paths) + + all_files_loaded = len(all_provider_files) == len(paths) + check_integration_duplicates(all_parsed_yaml_files) + + check_completeness_of_list_of_hooks_sensors_hooks(all_parsed_yaml_files) + check_completeness_of_list_of_transfers(all_parsed_yaml_files) + + if all_files_loaded: + # Only check those if all provider files are loaded + check_doc_files(all_parsed_yaml_files) + check_invalid_integration(all_parsed_yaml_files) diff --git a/setup.py b/setup.py index fe2522c543e..624e4206d7b 100644 --- a/setup.py +++ b/setup.py @@ -837,6 +837,7 @@ def is_package_excluded(package: str, exclusion_list: List[str]): 'graphviz>=0.12', 'gunicorn>=19.5.0, <20.0', 'iso8601>=0.1.12', + 'importlib_resources; python_version<"3.7"', 'jinja2>=2.10.1, 
<2.12.0', 'json-merge-patch==0.2', 'jsonschema~=3.0', diff --git a/tests/core/test_providers_manager.py b/tests/core/test_providers_manager.py new file mode 100644 index 00000000000..c18566e8dd6 --- /dev/null +++ b/tests/core/test_providers_manager.py @@ -0,0 +1,91 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import unittest + +from airflow.providers_manager import ProvidersManager + +ALL_PROVIDERS = [ + 'apache-airflow-providers-amazon', + 'apache-airflow-providers-apache-cassandra', + 'apache-airflow-providers-apache-druid', + 'apache-airflow-providers-apache-hdfs', + 'apache-airflow-providers-apache-hive', + 'apache-airflow-providers-apache-kylin', + 'apache-airflow-providers-apache-livy', + 'apache-airflow-providers-apache-pig', + 'apache-airflow-providers-apache-pinot', + 'apache-airflow-providers-apache-spark', + 'apache-airflow-providers-apache-sqoop', + 'apache-airflow-providers-celery', + 'apache-airflow-providers-cloudant', + 'apache-airflow-providers-cncf-kubernetes', + 'apache-airflow-providers-databricks', + 'apache-airflow-providers-datadog', + 'apache-airflow-providers-dingding', + 'apache-airflow-providers-discord', + 'apache-airflow-providers-docker', + 'apache-airflow-providers-elasticsearch', + 'apache-airflow-providers-exasol', + 'apache-airflow-providers-facebook', + 'apache-airflow-providers-ftp', + 'apache-airflow-providers-google', + 'apache-airflow-providers-grpc', + 'apache-airflow-providers-hashicorp', + 'apache-airflow-providers-http', + 'apache-airflow-providers-imap', + 'apache-airflow-providers-jdbc', + 'apache-airflow-providers-jenkins', + 'apache-airflow-providers-jira', + 'apache-airflow-providers-microsoft-azure', + 'apache-airflow-providers-microsoft-mssql', + 'apache-airflow-providers-microsoft-winrm', + 'apache-airflow-providers-mongo', + 'apache-airflow-providers-mysql', + 'apache-airflow-providers-odbc', + 'apache-airflow-providers-openfaas', + 'apache-airflow-providers-opsgenie', + 'apache-airflow-providers-oracle', + 'apache-airflow-providers-pagerduty', + 'apache-airflow-providers-papermill', + 'apache-airflow-providers-plexus', + 'apache-airflow-providers-postgres', + 'apache-airflow-providers-presto', + 'apache-airflow-providers-qubole', + 'apache-airflow-providers-redis', + 'apache-airflow-providers-salesforce', + 
'apache-airflow-providers-samba', + 'apache-airflow-providers-segment', + 'apache-airflow-providers-sendgrid', + 'apache-airflow-providers-sftp', + 'apache-airflow-providers-singularity', + 'apache-airflow-providers-slack', + 'apache-airflow-providers-snowflake', + 'apache-airflow-providers-sqlite', + 'apache-airflow-providers-ssh', + 'apache-airflow-providers-vertica', + 'apache-airflow-providers-yandex', + 'apache-airflow-providers-zendesk', +] + + +class TestProviderManager(unittest.TestCase): + def test_providers_are_loaded(self): + provider_manager = ProvidersManager() + provider_list = list(provider_manager.providers.keys()) + provider_list.sort() + self.assertEqual(ALL_PROVIDERS, provider_list)