diff --git a/.gitignore b/.gitignore index 9f3b2235e..088495b67 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ output-artifacts/ *.egg-info/ /.eggs/ /pip-wheel-metadata +/o2a/o2a_libs/dist diff --git a/.kokoro/kokoro_build.sh b/.kokoro/kokoro_build.sh index 824543543..c9633110b 100755 --- a/.kokoro/kokoro_build.sh +++ b/.kokoro/kokoro_build.sh @@ -25,9 +25,9 @@ # parameters, will print the full command, with credentials, in the build logs. # set -x -# Code under repo is checked out to ${KOKORO_ARTIFACTS_DIR}/github. +# Code under repo is checked out to ${KOKORO_ARTIFACTS_DIR}/git. # The final directory name in this path is determined by the scm name specified # in the job configuration. cd "${KOKORO_ARTIFACTS_DIR}/git/oozie-to-airflow" -.kokoro/tests/run_tests.sh +.kokoro/run_tests.sh diff --git a/.kokoro/run_tests.sh b/.kokoro/run_tests.sh index 0d31f1fae..ce673d25a 100755 --- a/.kokoro/run_tests.sh +++ b/.kokoro/run_tests.sh @@ -20,6 +20,12 @@ O2A_DIR="$( cd "${MY_DIR}/.." && pwd )" export PATH="${HOME}/.local/bin:${PATH}" if [[ ! -z "${KOKORO_BUILD_ID}" ]]; then # export vars only for Kokoro job + # add USER env variable for the unit tests + export USER="kokoro" + + # add TERM env variable for the conversion tests + export TERM="xterm-256color" + # Setup service account credentials export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/kokoro/service-account-key.json @@ -28,14 +34,14 @@ if [[ ! -z "${KOKORO_BUILD_ID}" ]]; then # export vars only for Kokoro job export COMPOSER_TESTS_PROJECT_ID=PROJECT_ID fi +# install xmllint +sudo apt-get install -y libxml2-utils + # prepare python environment -pyenv install --skip-existing 3.6.15 -pyenv install --skip-existing 3.7.10 pyenv install --skip-existing 3.8.10 pyenv install --skip-existing 3.9.5 -pyenv install --skip-existing 3.10.9 -pyenv global 3.6.15 3.7.10 3.8.10 3.9.5 3.10.9 -python -m pip install -r ${O2A_DIR}/requirements.txt +pyenv global 3.8.10 3.9.5 +python -m pip install --user -r ${O2A_DIR}/requirements.txt echo -e "******************** Running unit tests... ********************\n" "${O2A_DIR}/bin/o2a-run-all-unit-tests" diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9392f646c..168620f78 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,13 +14,13 @@ --- repos: - repo: https://github.com/ambv/black - rev: 19.3b0 + rev: 23.1a1 hooks: - id: black name: Formats python files using black - language_version: python3.6 + language_version: python3.8 - repo: https://github.com/Lucas-C/pre-commit-hooks - rev: v1.1.6 + rev: v1.5.2 hooks: - id: insert-license name: Add licence for all XML, md files diff --git a/.travis.yml b/.travis.yml index 809862b01..7eebdbd74 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,7 +16,7 @@ language: python env: - SLUGIFY_USES_TEXT_UNIDECODE=yes WITH_COVERAGE=true python: - - "3.6.7" + - "3.8.10" cache: pip addons: apt: diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index de96ce608..778e1a8ec 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -85,7 +85,7 @@ using [virtualenvwrapper](https://virtualenvwrapper.readthedocs.io/en/latest/). An example of such local environment setup (with virtualenvwrapper): ```bash -mkvirtualenv -p python3.6 oozie-to-airflow +mkvirtualenv -p python3.8 oozie-to-airflow pip install -e . 
``` diff --git a/README.md b/README.md index 6d338f116..0c2b0352b 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ A tool to easily convert between [Apache Oozie](http://oozie.apache.org/) workflows and [Apache Airflow](https://airflow.apache.org) workflows. -The program targets Apache Airflow >= 1.10 and Apache Oozie 1.0 XML schema. +The program targets Apache Airflow >= 2.0 and Apache Oozie 1.0 XML schema. If you want to contribute to the project, please take a look at [CONTRIBUTING.md](CONTRIBUTING.md) @@ -105,7 +105,7 @@ There are a few differences noted below: # Running the Program -Note that you need Python >= 3.6 to run the converter. +Note that you need Python >= 3.8 to run the converter. ## Installing from PyPi @@ -211,10 +211,10 @@ executed in composer environment where the libs are copied to Composer's DAG fol ## Control nodes ### Fork and Join -A [fork node](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a3.1.5_Fork_and_Join_Control_Nodes) +A [fork node](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a3.1.5_Fork_and_Join_Control_Nodes) splits the path of execution into multiple concurrent paths of execution. -A [join node](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a3.1.5_Fork_and_Join_Control_Nodes) +A [join node](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a3.1.5_Fork_and_Join_Control_Nodes) waits until every concurrent execution of the previous fork node arrives to it. The fork and join nodes must be used in pairs. The join node assumes concurrent execution paths are children of the same fork node. ~~~~ @@ -233,7 +233,7 @@ assumes concurrent execution paths are children of the same fork node. ### Decision -A [decision node](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a3.1.4_Decision_Control_Node) +A [decision node](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a3.1.4_Decision_Control_Node) enables a workflow to make a selection on the execution path to follow. The behavior of a decision node can be seen as a switch-case statement. @@ -259,7 +259,7 @@ Predicates are JSP Expression Language (EL) expressions (refer to section 4.2 of ~~~~ ### Start -The [start node](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a3.1.1_Start_Control_Node) +The [start node](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a3.1.1_Start_Control_Node) is the entry point for a workflow job, it indicates the first workflow node the workflow job must transition to. When a workflow is started, it automatically transitions to the node specified in the `start` element. @@ -275,7 +275,7 @@ A workflow definition must have one start node. ~~~~ ### End -The [end node](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a3.1.2_End_Control_Node) +The [end node](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a3.1.2_End_Control_Node) is the end for a workflow job, it indicates that the workflow job has completed successfully. When a workflow job reaches the end it finishes successfully (SUCCEEDED). @@ -294,7 +294,7 @@ A workflow definition must have one end node. ### Kill -The [kill node](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a3.1.3_Kill_Control_Node) +The [kill node](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a3.1.3_Kill_Control_Node) allows a workflow job to exit with an error. When a workflow job reaches the kill it finishes in error (KILLED).
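For orientation, the patched templates further down in this diff (see kill.tpl) render the kill node with the new Airflow 2 module names. Below is a minimal sketch of roughly what a converted kill node looks like under those imports — the DAG ID, task ID, and trigger rule are hypothetical placeholders, not actual converter output:

```python
import datetime

from airflow import models
from airflow.operators import bash
from airflow.utils.trigger_rule import TriggerRule

with models.DAG(
    dag_id="kill_node_sketch",  # hypothetical DAG name
    schedule_interval=None,
    start_date=datetime.datetime(2023, 1, 1),
):
    # As in the patched kill.tpl: the Oozie kill node becomes a BashOperator
    # that exits non-zero, so reaching it fails the run (KILLED -> failed).
    fail = bash.BashOperator(
        task_id="fail",  # hypothetical node name
        trigger_rule=TriggerRule.ONE_FAILED,  # hypothetical; set by the converter
        bash_command="exit 1",
    )
```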
@@ -315,7 +315,7 @@ A workflow definition may have zero or more kill nodes. ## EL Functions -As of now, a very minimal set of [Oozie EL](https://oozie.apache.org/docs/4.0.1/WorkflowFunctionalSpec.html#a4.2_Expression_Language_Functions) +As of now, a very minimal set of [Oozie EL](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a4.2_Expression_Language_Functions) functions are supported. The way they work is that an EL expression is being translated to a jinja template. The translation is performed using [Lark](https://lark-parser.readthedocs.io/en/latest/). All required variables should be passed in `job.properties`. Equivalents of EL functions can be found in @@ -336,7 +336,7 @@ parsed by the Airflow workers and then available to all DAGs. ## Workflow and node notifications Workflow jobs can be configured to make an HTTP GET notification upon start and end of a workflow action node -and upon the start and completion of a workflow job. More information in [Oozie docs](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a5_Workflow_Notifications). +and upon the start and completion of a workflow job. More information in [Oozie docs](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a5_Workflow_Notifications). Oozie-to-Airflow supports this feature. The `job.properties` file has to contain URLs for workflow and action node notifications - example below: @@ -412,7 +412,7 @@ treats file/archive somewhat erratically. This is not a blocker to run most of the workflows, but some particular complex workflows might be problematic. Further testing with real, production Oozie workflows is needed to verify our implementation. -[Example Oozie docs](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a3.2.2.1_Adding_Files_and_Archives_for_the_Job) +[Example Oozie docs](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a3.2.2.1_Adding_Files_and_Archives_for_the_Job) * [File/Archive in Pig doesn't work](https://github.com/GoogleCloudPlatform/oozie-to-airflow/issues/243) @@ -427,7 +427,7 @@ configuration options the following ones are not supported (but can be easily ad ## Support for uber.jar feature -The uber.jar feature is not supported. [Oozie docs](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#AppDeployment) +The uber.jar feature is not supported. [Oozie docs](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#AppDeployment) * [Support uber.jar feature](https://github.com/GoogleCloudPlatform/oozie-to-airflow/issues/140) @@ -442,14 +442,14 @@ LD_LIBRARY_PATH/CLASSPATH. Currently only Java Mapper supports it. ## Custom messages missing for Kill Node The Kill Node might have a custom log message specified. This is not implemented. -[Oozie docs](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a3.1.3_Kill_Control_Node) +[Oozie docs](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a3.1.3_Kill_Control_Node) * [Add handling of custom Kill Node message](https://github.com/GoogleCloudPlatform/oozie-to-airflow/issues/97) ## Capturing output is not supported In several actions you can capture output from tasks. This is not yet implemented.
-[Example Oozie docs](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a3.2.6_Java_Action) +[Example Oozie docs](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a3.2.6_Java_Action) * [Add support for capture-ouput](https://github.com/GoogleCloudPlatform/oozie-to-airflow/issues/155) @@ -505,8 +505,8 @@ on-premise setup. Here are some details about the environment that is supported: ### Cloud Composer -* composer-1.5.0-airflow-1.10.1 -* python version 3 (3.6.6) +* composer-2.2.0-airflow-2.5.1 +* python version 3 (3.8.10) * machine n1-standard-1 * node count: 3 * Additional PyPi packages: @@ -518,24 +518,22 @@ on-premise setup. Here are some details about the environment that is supported: * primary disk size, 50 GB * Image 1.3.29-debian9 * Hadoop version -* Init action: [oozie-5.1.sh](dataproc/oozie-5.1.sh) +* Init action: [oozie-5.2.sh](dataproc/oozie-5.2.sh) Those are the steps you should follow to set it up: 1. Create a Dataproc cluster see [Creating Dataproc Cluster](#creating-dataproc-cluster) below 1. Create a [Cloud Composer Environment](https://cloud.google.com/composer/docs/how-to/managing/creating#creating_a_new_environment) - with at least Airflow version 1.10 to test the Apache Airflow workflows. - Since Airflow 1.10 is in Beta for Cloud Composer, you must - [enable beta features in Cloud Console](https://cloud.google.com/composer/docs/concepts/beta-support#enable-beta)) + with at least Airflow version 2.0 to test the Apache Airflow workflows. 1. Set up all required [Airflow Connections](https://airflow.apache.org/howto/connection/index.html) in Composer. This is required for things like `SSHOperator`. ### Creating Dataproc cluster We prepared Dataproc [initialization action](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/init-actions) -that allows to run Oozie 5.1.0 on Dataproc. +that allows you to run Oozie 5.2.1 on Dataproc. -Please upload [oozie-5.1.sh](dataproc/oozie-5.1.sh) to your GCS bucket and create cluster using following command: +Please upload [oozie-5.2.sh](dataproc/oozie-5.2.sh) to your GCS bucket and create a cluster using the following command: Note that you need at least 20GB RAM to run Oozie jobs on the cluster. The custom machine type below has enough RAM to handle oozie. @@ -767,7 +765,7 @@ The converted DAG uses the `DataProcHadoopOperator` in Airflow. **1. Exit status not available** -From the [Oozie documentation](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a3.2.2_Map-Reduce_Action): +From the [Oozie documentation](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a3.2.2_Map-Reduce_Action): > The counters of the Hadoop job and job exit status (FAILED, KILLED or SUCCEEDED) must be available to the workflow job after the Hadoop jobs ends. This information can be used from within decision nodes and other actions configurations. @@ -778,7 +776,7 @@ Issue in Github: [Implement exit status and counters in MapReduce Action](https: **2. Configuration options** -From the [Oozie documentation](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a3.2.2_Map-Reduce_Action) +From the [Oozie documentation](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a3.2.2_Map-Reduce_Action) (the strikethrough is from us): > Hadoop JobConf properties can be specified as part of > - ~~the config-default.xml or~~ @@ -876,7 +874,7 @@ The converted DAG uses the `DataProcPigOperator` in Airflow. ### Known limitations **1.
Configuration options** -From the [Oozie documentation](https://oozie.apache.org/docs/5.1.0/WorkflowFunctionalSpec.html#a3.2.3_Pig_Action) +From the [Oozie documentation](https://oozie.apache.org/docs/5.2.0/WorkflowFunctionalSpec.html#a3.2.3_Pig_Action) (the strikethrough is from us): > Hadoop JobConf properties can be specified as part of > - ~~the config-default.xml or~~ @@ -917,7 +915,7 @@ action with Pig by invoking `gcloud dataproc jobs submit pig --cluster= **1. Exit status not available** -From the [Oozie documentation](https://oozie.apache.org/docs/5.1.0/DG_ShellActionExtension.html): +From the [Oozie documentation](https://oozie.apache.org/docs/5.2.0/DG_ShellActionExtension.html): > The output (STDOUT) of the Shell job can be made available to the workflow job after the Shell job ends. This information could be used from within decision nodes. @@ -929,7 +927,7 @@ Issue in Github: [Finalize shell mapper](https://github.com/GoogleCloudPlatform/ **2. No Shell launcher configuration** -From the [Oozie documentation](https://oozie.apache.org/docs/5.1.0/DG_ShellActionExtension.html): +From the [Oozie documentation](https://oozie.apache.org/docs/5.2.0/DG_ShellActionExtension.html): > Shell launcher configuration can be specified with a file, using the job-xml element, and inline, using the configuration elements. @@ -959,14 +957,14 @@ The converted DAG uses the `DataProcSparkOperator` in Airflow. **1. Only tasks written in Java are supported** -From the [Oozie documentation](https://oozie.apache.org/docs/5.1.0/DG_ShellActionExtension.html): +From the [Oozie documentation](https://oozie.apache.org/docs/5.2.0/DG_SparkActionExtension.html): > The jar element indicates a comma separated list of jars or python files. The solution was tested with only a single Jar file. **2. No Spark launcher configuration** -From the [Oozie documentation](https://oozie.apache.org/docs/5.1.0/DG_SparkActionExtension.html): +From the [Oozie documentation](https://oozie.apache.org/docs/5.2.0/DG_SparkActionExtension.html): > Shell launcher configuration can be specified with a file, using the job-xml element, and inline, using the configuration elements. diff --git a/bin/o2a b/bin/o2a index 349a39651..83167883a 100755 --- a/bin/o2a +++ b/bin/o2a @@ -23,7 +23,7 @@ sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), path.pardir))) -if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < 6): +if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < 8): print("") print( - "ERROR! You need to run this script in python version >= 3.6 (and you have {}.{})".format( + "ERROR! You need to run this script in python version >= 3.8 (and you have {}.{})".format( sys.version_info.major, sys.version_info.minor ) ) diff --git a/bin/o2a_lib-package-upload b/bin/o2a_lib-package-upload new file mode 100755 index 000000000..4be1c1c8a --- /dev/null +++ b/bin/o2a_lib-package-upload @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+set -euo pipefail +MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +O2A_DIR="$( cd "${MY_DIR}/.." && pwd )" + +pushd "${O2A_DIR}" +echo +"${MY_DIR}/o2a-confirm" "Preparing production package" +echo +rm -rvf o2a/o2a_libs/dist/* +rm -rvf o2a/o2a_libs/src/o2a_lib.egg-info +rm -rvf .eggs +python3 -m pip install --upgrade build +python3 -m build o2a/o2a_libs +echo +"${MY_DIR}/o2a-confirm" "Uploading to production PyPi" +echo +python3 -m twine upload --repository-url https://upload.pypi.org/legacy/ o2a/o2a_libs/dist/* +popd diff --git a/bin/o2a_lib-package-upload-test b/bin/o2a_lib-package-upload-test new file mode 100755 index 000000000..9f85361d7 --- /dev/null +++ b/bin/o2a_lib-package-upload-test @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +set -euo pipefail +MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +O2A_DIR="$( cd "${MY_DIR}/.." && pwd )" + +pushd "${O2A_DIR}" +echo +"${MY_DIR}/o2a-confirm" "Preparing test package" +echo +rm -rvf o2a/o2a_libs/dist/* +rm -rvf o2a/o2a_libs/src/o2a_lib.egg-info +rm -rvf .eggs +python3 -m pip install --upgrade build +python3 -m build o2a/o2a_libs +echo +"${MY_DIR}/o2a-confirm" "Uploading to test PyPi" +echo +python3 -m twine upload --repository-url https://test.pypi.org/legacy/ o2a/o2a_libs/dist/* +popd diff --git a/dataproc/oozie-5.1.sh b/dataproc/oozie-5.2.sh similarity index 84% rename from dataproc/oozie-5.1.sh rename to dataproc/oozie-5.2.sh index 74602c57c..c33d2f1f2 100644 --- a/dataproc/oozie-5.1.sh +++ b/dataproc/oozie-5.2.sh @@ -30,17 +30,26 @@ function retry_command() { function install_maven() { retry_command "apt-get update" retry_command "apt-get install -y maven" + retry_command "apt-get install -y xmlstarlet" } function get_oozie(){ - wget https://www.apache.org/dist/oozie/5.1.0/oozie-5.1.0.tar.gz -P /tmp - tar -xzvf /tmp/oozie-5.1.0.tar.gz --directory /tmp/ + wget https://www.apache.org/dist/oozie/5.2.1/oozie-5.2.1.tar.gz -P /tmp + tar -xzvf /tmp/oozie-5.2.1.tar.gz --directory /tmp/ } function build_oozie(){ - /tmp/oozie-5.1.0/bin/mkdistro.sh -DskipTests -Puber - tar -zxvf /tmp/oozie-5.1.0/distro/target/oozie-5.1.0-distro.tar.gz -C /usr/local/lib - mv /usr/local/lib/oozie-5.1.0/ /usr/local/lib/oozie + xmlstarlet ed -L -N x="http://maven.apache.org/POM/4.0.0" --subnode "/x:project/x:repositories" -t elem -n 'repository' \ + --var newnd '$prev' \ + --subnode '$prev' -t elem -n "id" -v "conjars" \ + --subnode '$newnd' -t elem -n "url" -v "https://conjars.org/repo/" \ + --subnode '$newnd' -t elem -n "releases" -v "" \ + --subnode '$prev' -t elem -n "enabled" -v "false" \ + --subnode '$newnd' -t elem -n "snapshots" -v "" \ + --subnode '$prev' -t elem -n "enabled" -v "false" /tmp/oozie-5.2.1/pom.xml + /tmp/oozie-5.2.1/bin/mkdistro.sh -DskipTests -Puber + tar -zxvf /tmp/oozie-5.2.1/distro/target/oozie-5.2.1-distro.tar.gz -C /usr/local/lib + mv /usr/local/lib/oozie-5.2.1/ /usr/local/lib/oozie } function configure_oozie() { @@ -53,7 +62,7 @@ function 
configure_oozie() { rm /usr/local/lib/oozie/conf/hadoop-conf/* ln -s /etc/hadoop/conf/*-site.xml /usr/local/lib/oozie/conf/hadoop-conf/ - tar -zxvf /usr/local/lib/oozie/oozie-sharelib-5.1.0.tar.gz -C /usr/local/lib/oozie/ + tar -zxvf /usr/local/lib/oozie/oozie-sharelib-5.2.1.tar.gz -C /usr/local/lib/oozie/ chown oozie:oozie -R /usr/local/lib/oozie } diff --git a/mypy.ini b/mypy.ini index f6fa97b6e..4dffc39a4 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,7 +1,7 @@ # Global options: [mypy] -python_version = 3.6 +python_version = 3.8 warn_return_any = True warn_unused_configs = True warn_unused_ignores = False diff --git a/o2a/converter/oozie_converter.py b/o2a/converter/oozie_converter.py index ed5ed9d38..643b6efbd 100644 --- a/o2a/converter/oozie_converter.py +++ b/o2a/converter/oozie_converter.py @@ -34,7 +34,7 @@ from o2a.utils.file_utils import get_lib_files from o2a.mappers.action_mapper import ActionMapper from o2a.transformers.base_transformer import BaseWorkflowTransformer -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet # pylint: disable=too-many-instance-attributes diff --git a/o2a/converter/property_parser.py b/o2a/converter/property_parser.py index cc894b4b6..3ae544e2a 100644 --- a/o2a/converter/property_parser.py +++ b/o2a/converter/property_parser.py @@ -16,7 +16,7 @@ import os from o2a.converter.workflow import Workflow -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils import el_utils from o2a.utils.constants import CONFIG, JOB_PROPS diff --git a/o2a/converter/renderers.py b/o2a/converter/renderers.py index 5306e6522..fef9c9366 100644 --- a/o2a/converter/renderers.py +++ b/o2a/converter/renderers.py @@ -37,7 +37,7 @@ from autoflake import fix_file from o2a.converter.workflow import Workflow -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils.el_utils import comma_separated_string_to_list from o2a.utils.template_utils import render_template diff --git a/o2a/converter/workflow.py b/o2a/converter/workflow.py index dc77a5eeb..ca7c620fc 100644 --- a/o2a/converter/workflow.py +++ b/o2a/converter/workflow.py @@ -50,12 +50,12 @@ def __init__( self.dependencies = dependencies or { "import shlex", "import datetime", - "from o2a.o2a_libs.property_utils import PropertySet", - "from o2a.o2a_libs import functions", + "from o2a_lib.property_utils import PropertySet", + "from o2a_lib import functions", "from airflow import models", "from airflow.utils.trigger_rule import TriggerRule", "from airflow.utils import dates", - "from airflow.operators import bash_operator, dummy_operator", + "from airflow.operators import bash, empty", } self.library_folder = os.path.join(self.input_directory_path, HDFS_FOLDER, LIB_FOLDER) self.jar_files = get_lib_files(self.library_folder, extension=".jar") diff --git a/o2a/converter/workflow_xml_parser.py b/o2a/converter/workflow_xml_parser.py index 103c2f019..ad74a0f20 100644 --- a/o2a/converter/workflow_xml_parser.py +++ b/o2a/converter/workflow_xml_parser.py @@ -38,7 +38,7 @@ from o2a.mappers.join_mapper import JoinMapper from o2a.mappers.kill_mapper import KillMapper from o2a.mappers.start_mapper import StartMapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.transformers.base_transformer import BaseWorkflowTransformer from o2a.utils import xml_utils @@ 
-260,7 +260,7 @@ def parse_start_node(self, start_node): name=start_name, dag_name=self.workflow.dag_name, props=self.props, - trigger_rule=TriggerRule.DUMMY, + trigger_rule=TriggerRule.ALWAYS, ) oozie_control_node = OozieControlNode(mapper) diff --git a/o2a/mappers/action_mapper.py b/o2a/mappers/action_mapper.py index bd099a04b..aeaa7fc21 100644 --- a/o2a/mappers/action_mapper.py +++ b/o2a/mappers/action_mapper.py @@ -22,7 +22,7 @@ from o2a.converter.task import Task from o2a.mappers.base_mapper import BaseMapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils.config_extractors import ( TAG_JOB_XML, diff --git a/o2a/mappers/base_mapper.py b/o2a/mappers/base_mapper.py index ab6294b25..47e295cbf 100644 --- a/o2a/mappers/base_mapper.py +++ b/o2a/mappers/base_mapper.py @@ -21,7 +21,7 @@ from o2a.converter.relation import Relation from o2a.converter.task import Task -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class BaseMapper(ABC): @@ -45,7 +45,7 @@ def required_imports(self) -> Set[str]: Returns a set of strings that are the import statement that python will write to use. - Ex: returns {'from airflow.operators import bash_operator']} + Ex: returns {'from airflow.operators import bash'} """ raise NotImplementedError("Not Implemented") diff --git a/o2a/mappers/decision_mapper.py b/o2a/mappers/decision_mapper.py index 53d3aad27..e9ab20330 100644 --- a/o2a/mappers/decision_mapper.py +++ b/o2a/mappers/decision_mapper.py @@ -22,8 +22,8 @@ from o2a.converter.task import Task from o2a.converter.relation import Relation from o2a.mappers.base_mapper import BaseMapper -from o2a.o2a_libs.property_utils import PropertySet -from o2a.o2a_libs import el_parser +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib import el_parser # noinspection PyAbstractClass @@ -89,4 +89,4 @@ def to_tasks_and_relations(self): return tasks, relations def required_imports(self) -> Set[str]: - return {"from airflow.operators import python_operator", "from airflow.utils import dates"} + return {"from airflow.operators import python", "from airflow.utils import dates"} diff --git a/o2a/mappers/distcp_mapper.py b/o2a/mappers/distcp_mapper.py index a13646f58..233e91e27 100644 --- a/o2a/mappers/distcp_mapper.py +++ b/o2a/mappers/distcp_mapper.py @@ -22,7 +22,7 @@ from o2a.converter.relation import Relation from o2a.converter.task import Task from o2a.mappers.action_mapper import ActionMapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils import xml_utils, el_utils from o2a.utils.file_archive_extractors import ArchiveExtractor, FileExtractor @@ -80,4 +80,4 @@ def to_tasks_and_relations(self): return tasks, relations def required_imports(self) -> Set[str]: - return {"import shlex", "from airflow.operators import bash_operator"} + return {"import shlex", "from airflow.operators import bash"} diff --git a/o2a/mappers/dummy_mapper.py b/o2a/mappers/dummy_mapper.py index 3944bf55f..ee947210d 100644 --- a/o2a/mappers/dummy_mapper.py +++ b/o2a/mappers/dummy_mapper.py @@ -20,7 +20,7 @@ from o2a.converter.relation import Relation from o2a.converter.task import Task from o2a.mappers.base_mapper import BaseMapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class DummyMapper(BaseMapper):
@@ -46,4 +46,4 @@ def to_tasks_and_relations(self): return tasks, relations def required_imports(self) -> Set[str]: - return {"from airflow.operators import dummy_operator"} + return {"from airflow.operators import empty"} diff --git a/o2a/mappers/email_mapper.py b/o2a/mappers/email_mapper.py index 0b528e09e..0687bf580 100644 --- a/o2a/mappers/email_mapper.py +++ b/o2a/mappers/email_mapper.py @@ -20,7 +20,7 @@ from o2a.converter.relation import Relation from o2a.converter.task import Task from o2a.mappers.action_mapper import ActionMapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils import xml_utils @@ -62,7 +62,7 @@ def to_tasks_and_relations(self) -> Tuple[List[Task], List[Relation]]: return tasks, relations def required_imports(self) -> Set[str]: - return {"from airflow.operators import email_operator"} + return {"from airflow.operators import email"} def __extract_email_data(self): root = self.oozie_node diff --git a/o2a/mappers/fs_mapper.py b/o2a/mappers/fs_mapper.py index fee03ee9c..df012f40c 100644 --- a/o2a/mappers/fs_mapper.py +++ b/o2a/mappers/fs_mapper.py @@ -21,7 +21,7 @@ from o2a.converter.task import Task from o2a.mappers.action_mapper import ActionMapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils.relation_utils import chain from o2a.utils.el_utils import normalize_path @@ -145,7 +145,7 @@ def to_tasks_and_relations(self): return self.tasks, chain(self.tasks) def required_imports(self) -> Set[str]: - return {"from airflow.operators import dummy_operator", "from airflow.operators import bash_operator"} + return {"from airflow.operators import empty", "from airflow.operators import bash"} def parse_fs_operation(self, index: int, node: Element, operation_nodes_count: int) -> Task: tag_name = node.tag diff --git a/o2a/mappers/git_mapper.py b/o2a/mappers/git_mapper.py index 2cd2c0b2b..a776d9ab4 100644 --- a/o2a/mappers/git_mapper.py +++ b/o2a/mappers/git_mapper.py @@ -23,7 +23,7 @@ from o2a.mappers.action_mapper import ActionMapper from o2a.mappers.extensions.prepare_mapper_extension import PrepareMapperExtension -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils.xml_utils import get_tag_el_text from o2a.utils.el_utils import normalize_path @@ -100,4 +100,4 @@ def to_tasks_and_relations(self): return tasks, relations def required_imports(self) -> Set[str]: - return {"from airflow.operators import bash_operator"} + return {"from airflow.operators import bash"} diff --git a/o2a/mappers/hive_mapper.py b/o2a/mappers/hive_mapper.py index e3d94268a..8de7eea54 100644 --- a/o2a/mappers/hive_mapper.py +++ b/o2a/mappers/hive_mapper.py @@ -24,7 +24,7 @@ from o2a.converter.task import Task from o2a.mappers.action_mapper import ActionMapper from o2a.mappers.extensions.prepare_mapper_extension import PrepareMapperExtension -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils.file_archive_extractors import ArchiveExtractor, FileExtractor @@ -76,7 +76,7 @@ def to_tasks_and_relations(self): task_id=self.name, template_name="hive.tpl", template_params=dict( - query=self.query, + query_obj=dict(queries=[self.query]) if self.query else None, script=self.script, props=self.props, archives=self.hdfs_archives, @@ -101,4 +101,7 @@ def copy_extra_assets(self, 
input_directory_path: str, output_directory_path: st shutil.copy(source_script_file_path, destination_script_file_path) def required_imports(self) -> Set[str]: - return {"from airflow.utils import dates", "from airflow.contrib.operators import dataproc_operator"} + return { + "from airflow.utils import dates", + "from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator" + } diff --git a/o2a/mappers/java_mapper.py b/o2a/mappers/java_mapper.py index f3ecc882b..9093c9e30 100644 --- a/o2a/mappers/java_mapper.py +++ b/o2a/mappers/java_mapper.py @@ -22,7 +22,7 @@ from o2a.converter.task import Task from o2a.mappers.action_mapper import ActionMapper from o2a.mappers.extensions.prepare_mapper_extension import PrepareMapperExtension -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils import xml_utils from o2a.utils.file_archive_extractors import FileExtractor, ArchiveExtractor from o2a.utils.xml_utils import get_tags_el_array_from_text @@ -74,11 +74,13 @@ def to_tasks_and_relations(self): template_name="java.tpl", template_params=dict( props=self.props, - hdfs_files=self.hdfs_files, - hdfs_archives=self.hdfs_archives, - main_class=self.main_class, - jar_files_in_hdfs=self.jar_files_in_hdfs, - args=self.args, + hadoop_job=dict( + args=self.args, + jar_file_uris=self.jar_files_in_hdfs, + file_uris=self.hdfs_files, + archive_uris=self.hdfs_archives, + main_class=self.main_class, + ), ), ) tasks = [action_task] @@ -89,7 +91,10 @@ def to_tasks_and_relations(self): return tasks, relations def required_imports(self) -> Set[str]: - return {"from airflow.utils import dates", "from airflow.contrib.operators import dataproc_operator"} + return { + "from airflow.utils import dates", + "from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator" + } def _get_jar_files_in_hdfs_full_paths(self): hdfs_app_prefix = self.props.job_properties["oozie.wf.application.path"] diff --git a/o2a/mappers/mapreduce_mapper.py b/o2a/mappers/mapreduce_mapper.py index bd4064c2d..4b36c1e35 100644 --- a/o2a/mappers/mapreduce_mapper.py +++ b/o2a/mappers/mapreduce_mapper.py @@ -21,7 +21,7 @@ from o2a.converter.relation import Relation from o2a.mappers.action_mapper import ActionMapper from o2a.mappers.extensions.prepare_mapper_extension import PrepareMapperExtension -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils.file_archive_extractors import ArchiveExtractor, FileExtractor @@ -81,4 +81,7 @@ def _validate_paths(input_directory_path, output_directory_path): raise Exception(f"The output_directory_path should be set and is {output_directory_path}") def required_imports(self) -> Set[str]: - return {"from airflow.utils import dates", "from airflow.contrib.operators import dataproc_operator"} + return { + "from airflow.utils import dates", + "from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator" + } diff --git a/o2a/mappers/pig_mapper.py b/o2a/mappers/pig_mapper.py index 8270febc9..a0b1a91b3 100644 --- a/o2a/mappers/pig_mapper.py +++ b/o2a/mappers/pig_mapper.py @@ -23,7 +23,7 @@ from o2a.converter.relation import Relation from o2a.mappers.action_mapper import ActionMapper from o2a.mappers.extensions.prepare_mapper_extension import PrepareMapperExtension -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from 
o2a.utils.file_archive_extractors import ArchiveExtractor, FileExtractor from o2a.utils.param_extractor import extract_param_values_from_action_node from o2a.utils.xml_utils import get_tag_el_text @@ -105,4 +105,7 @@ def _validate_paths(input_directory_path, output_directory_path): raise Exception(f"The output_directory_path should be set and is {output_directory_path}") def required_imports(self) -> Set[str]: - return {"from airflow.utils import dates", "from airflow.contrib.operators import dataproc_operator"} + return { + "from airflow.utils import dates", + "from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator" + } diff --git a/o2a/mappers/shell_mapper.py b/o2a/mappers/shell_mapper.py index 25b876cdf..8f5172233 100644 --- a/o2a/mappers/shell_mapper.py +++ b/o2a/mappers/shell_mapper.py @@ -21,9 +21,9 @@ from o2a.converter.relation import Relation from o2a.mappers.action_mapper import ActionMapper from o2a.mappers.extensions.prepare_mapper_extension import PrepareMapperExtension -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils.xml_utils import get_tag_el_text, get_tags_el_array_from_text -from o2a.o2a_libs import el_parser +from o2a.o2a_libs.src.o2a_lib import el_parser TAG_RESOURCE = "resource-manager" @@ -69,4 +69,7 @@ def to_tasks_and_relations(self): return tasks, relations def required_imports(self) -> Set[str]: - return {"from airflow.utils import dates", "from airflow.contrib.operators import dataproc_operator"} + return { + "from airflow.utils import dates", + "from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator" + } diff --git a/o2a/mappers/spark_mapper.py b/o2a/mappers/spark_mapper.py index 51e0ddd6c..d55a65ac2 100644 --- a/o2a/mappers/spark_mapper.py +++ b/o2a/mappers/spark_mapper.py @@ -23,7 +23,7 @@ from o2a.converter.relation import Relation from o2a.mappers.action_mapper import ActionMapper from o2a.mappers.extensions.prepare_mapper_extension import PrepareMapperExtension -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils import xml_utils from o2a.utils.file_archive_extractors import FileExtractor, ArchiveExtractor @@ -114,14 +114,16 @@ def to_tasks_and_relations(self): task_id=self.name, template_name="spark.tpl", template_params=dict( - main_jar=self.java_jar, - main_class=self.java_class, - arguments=self.application_args, - hdfs_archives=self.hdfs_archives, - hdfs_files=self.hdfs_files, job_name=self.job_name, - dataproc_spark_jars=self.dataproc_jars, - spark_opts=self.spark_opts, + spark_job=dict( + args=self.application_args, + jar_file_uris=self.dataproc_jars, + file_uris=self.hdfs_files, + archive_uris=self.hdfs_archives, + properties=self.spark_opts, + main_jar_file_uri=self.java_jar, + main_class=self.java_class + ) ), ) tasks = [action_task] @@ -134,7 +136,7 @@ def to_tasks_and_relations(self): def required_imports(self) -> Set[str]: # Bash are for the potential prepare statement return { - "from airflow.contrib.operators import dataproc_operator", - "from airflow.operators import bash_operator", - "from airflow.operators import dummy_operator", + "from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator", + "from airflow.operators import bash", + "from airflow.operators import empty", } diff --git a/o2a/mappers/ssh_mapper.py b/o2a/mappers/ssh_mapper.py index b6543c448..f90e53279 100644 --- 
a/o2a/mappers/ssh_mapper.py +++ b/o2a/mappers/ssh_mapper.py @@ -21,8 +21,8 @@ from o2a.converter.task import Task from o2a.converter.relation import Relation from o2a.mappers.action_mapper import ActionMapper -from o2a.o2a_libs.property_utils import PropertySet -from o2a.o2a_libs import el_parser +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib import el_parser from o2a.utils import el_utils, xml_utils @@ -96,6 +96,6 @@ def to_tasks_and_relations(self): def required_imports(self) -> Set[str]: return { "from airflow.utils import dates", - "from airflow.contrib.operators import ssh_operator", - "from airflow.contrib.hooks import ssh_hook", + "from airflow.providers.ssh.operators.ssh import SSHOperator", + "from airflow.providers.ssh.hooks.ssh import SSHHook", } diff --git a/o2a/mappers/subworkflow_mapper.py b/o2a/mappers/subworkflow_mapper.py index 2757afad5..efd041399 100644 --- a/o2a/mappers/subworkflow_mapper.py +++ b/o2a/mappers/subworkflow_mapper.py @@ -25,7 +25,7 @@ from o2a.converter.task import Task from o2a.definitions import EXAMPLES_PATH from o2a.mappers.action_mapper import ActionMapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.transformers.base_transformer import BaseWorkflowTransformer from o2a.utils import xml_utils @@ -108,7 +108,7 @@ def to_tasks_and_relations(self): def required_imports(self) -> Set[str]: return { "from airflow.utils import dates", - "from airflow.contrib.operators import dataproc_operator", - "from airflow.operators.subdag_operator import SubDagOperator", + "from airflow.providers.google.cloud.operators import dataproc", + "from airflow.operators.subdag import SubDagOperator", f"import subdag_{self.app_name}", } diff --git a/o2a/o2a_libs/LICENSE b/o2a/o2a_libs/LICENSE new file mode 100644 index 000000000..d1aa8775e --- /dev/null +++ b/o2a/o2a_libs/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright 2019 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/o2a/o2a_libs/README.md b/o2a/o2a_libs/README.md new file mode 100644 index 000000000..4e3e310e6 --- /dev/null +++ b/o2a/o2a_libs/README.md @@ -0,0 +1,4 @@ +# o2a Library + +This is an additional library for the Oozie To Airflow migration tool.
+Importing this package is required by the Airflow DAGs generated by the Oozie To Airflow migration tool. \ No newline at end of file diff --git a/o2a/o2a_libs/pyproject.toml b/o2a/o2a_libs/pyproject.toml new file mode 100644 index 000000000..710888c7f --- /dev/null +++ b/o2a/o2a_libs/pyproject.toml @@ -0,0 +1,28 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "o2a_lib" +version = "2.0.0" +authors = [ + { name="Jarek Potiuk", email="jarek.potiuk@polidea.com" }, + { name="Szymon Przedwojski", email="szymon.przedwojski@polidea.com" }, + { name="Kamil Breguła", email="kamil.bregula@polidea.com" }, + { name="Feng Lu", email="fenglu@google.com" }, + { name="Cameron Moberg", email="cjmoberg@google.com" }, +] +description = "Additional library for Oozie To Airflow migration tool" +readme = "README.md" +requires-python = ">=3.8" +classifiers = [ + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", +] + +[project.urls] +"Homepage" = "https://github.com/GoogleCloudPlatform/oozie-to-airflow/tree/master/o2a/o2a_libs" +"Bug Tracker" = "https://github.com/GoogleCloudPlatform/oozie-to-airflow/issues" diff --git a/o2a/o2a_libs/src/__init__.py b/o2a/o2a_libs/src/__init__.py new file mode 100644 index 000000000..558e1bff2 --- /dev/null +++ b/o2a/o2a_libs/src/__init__.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# airflow DAG - required for some versions of airflow to correctly import diff --git a/o2a/o2a_libs/src/o2a_lib/__init__.py b/o2a/o2a_libs/src/o2a_lib/__init__.py new file mode 100644 index 000000000..558e1bff2 --- /dev/null +++ b/o2a/o2a_libs/src/o2a_lib/__init__.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +# airflow DAG - required for some versions of airflow to correctly import diff --git a/o2a/o2a_libs/el_fs_functions.py b/o2a/o2a_libs/src/o2a_lib/el_fs_functions.py similarity index 100% rename from o2a/o2a_libs/el_fs_functions.py rename to o2a/o2a_libs/src/o2a_lib/el_fs_functions.py diff --git a/o2a/o2a_libs/el_parser.py b/o2a/o2a_libs/src/o2a_lib/el_parser.py similarity index 100% rename from o2a/o2a_libs/el_parser.py rename to o2a/o2a_libs/src/o2a_lib/el_parser.py diff --git a/o2a/o2a_libs/el_wf_functions.py b/o2a/o2a_libs/src/o2a_lib/el_wf_functions.py similarity index 98% rename from o2a/o2a_libs/el_wf_functions.py rename to o2a/o2a_libs/src/o2a_lib/el_wf_functions.py index 8800b34aa..c0cb202d7 100644 --- a/o2a/o2a_libs/el_wf_functions.py +++ b/o2a/o2a_libs/src/o2a_lib/el_wf_functions.py @@ -16,7 +16,7 @@ from typing import Optional, Set -from jinja2 import contextfunction +from jinja2 import pass_context from airflow.models import TaskInstance, DagRun, DAG from airflow.utils.db import provide_session @@ -36,7 +36,7 @@ def _reverse_task_map(task_map: dict) -> dict: return new_map -@contextfunction +@pass_context def conf(context=None, key: str = None): """ It returns the value of the workflow job configuration property for the @@ -50,7 +50,7 @@ def conf(context=None, key: str = None): raise AirflowException(f"Property {key} not found in workflow configuration.") -@contextfunction +@pass_context def user(context=None): """ Returns global user name, DAG owner or raises error @@ -75,7 +75,7 @@ def user(context=None): return owner -@contextfunction +@pass_context @provide_session def last_error_node(context=None, session=None) -> str: """ diff --git a/o2a/o2a_libs/functions.py b/o2a/o2a_libs/src/o2a_lib/functions.py similarity index 97% rename from o2a/o2a_libs/functions.py rename to o2a/o2a_libs/src/o2a_lib/functions.py index cccfd7b5d..42efb89c0 100644 --- a/o2a/o2a_libs/functions.py +++ b/o2a/o2a_libs/src/o2a_lib/functions.py @@ -18,8 +18,8 @@ import re import json -import o2a.o2a_libs.el_wf_functions as wf_functions -import o2a.o2a_libs.el_fs_functions as fs_functions +from . import el_wf_functions as wf_functions +from .
import el_fs_functions as fs_functions # Used for functions.wf.f_name pattern in templates diff --git a/o2a/o2a_libs/property_utils.py b/o2a/o2a_libs/src/o2a_lib/property_utils.py similarity index 100% rename from o2a/o2a_libs/property_utils.py rename to o2a/o2a_libs/src/o2a_lib/property_utils.py diff --git a/o2a/templates/decision.tpl b/o2a/templates/decision.tpl index 8b197cbb8..4a4c2addf 100644 --- a/o2a/templates/decision.tpl +++ b/o2a/templates/decision.tpl @@ -19,7 +19,7 @@ task = kwargs.get('task') decisions = {{ case_dict | to_python }} for (predicate, decision) in decisions.items(): {% filter indent(4, True) %} -value = task.render_template(content=predicate, context=TEMPLATE_ENV, attr=None) +value = task.render_template(content=predicate, context=TEMPLATE_ENV) if value in ("true", "True", 1): return decision {% endfilter %} @@ -27,9 +27,8 @@ return {{default_case | to_python}} {% endfilter %} -{{ task_id | to_var }} = python_operator.BranchPythonOperator( +{{ task_id | to_var }} = python.BranchPythonOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, python_callable={{ task_id | to_var }}_decision, - provide_context=True ) diff --git a/o2a/templates/distcp.tpl b/o2a/templates/distcp.tpl index 0ee836c98..81c93968c 100644 --- a/o2a/templates/distcp.tpl +++ b/o2a/templates/distcp.tpl @@ -15,7 +15,7 @@ #} {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = bash_operator.BashOperator( +{{ task_id | to_var }} = bash.BashOperator( task_id={{ task_id | tojson }}, trigger_rule={{ trigger_rule | tojson }}, bash_command={% include "hadoop_command.tpl" %} % (CONFIG['dataproc_cluster'], CONFIG['gcp_region'], diff --git a/o2a/templates/dummy.tpl b/o2a/templates/dummy.tpl index f140da7ec..1d3da7a9b 100644 --- a/o2a/templates/dummy.tpl +++ b/o2a/templates/dummy.tpl @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. #} -{{ task_id | to_var }} = dummy_operator.DummyOperator( +{{ task_id | to_var }} = empty.EmptyOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }} ) diff --git a/o2a/templates/email.tpl b/o2a/templates/email.tpl index 7a3df792d..3656b1605 100644 --- a/o2a/templates/email.tpl +++ b/o2a/templates/email.tpl @@ -14,7 +14,7 @@ limitations under the License. #} {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = email_operator.EmailOperator( +{{ task_id | to_var }} = email.EmailOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, to={{ to_addr | to_python }}, diff --git a/o2a/templates/fs_op.tpl b/o2a/templates/fs_op.tpl index 678f350ea..afa76d2df 100644 --- a/o2a/templates/fs_op.tpl +++ b/o2a/templates/fs_op.tpl @@ -14,7 +14,7 @@ limitations under the License. #} {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = bash_operator.BashOperator( +{{ task_id | to_var }} = bash.BashOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, bash_command={% include "pig_command.tpl" %} % (CONFIG['dataproc_cluster'], CONFIG['gcp_region'], diff --git a/o2a/templates/git.tpl b/o2a/templates/git.tpl index 30f10f6a9..34ac5d1f7 100644 --- a/o2a/templates/git.tpl +++ b/o2a/templates/git.tpl @@ -14,7 +14,7 @@ limitations under the License. 
#} {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = bash_operator.BashOperator( +{{ task_id | to_var }} = bash.BashOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, bash_command={% include "git_command.tpl" %}, diff --git a/o2a/templates/hive.tpl b/o2a/templates/hive.tpl index a6a6a136c..6cd29810d 100644 --- a/o2a/templates/hive.tpl +++ b/o2a/templates/hive.tpl @@ -15,15 +15,20 @@ #} {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = dataproc_operator.DataProcHiveOperator( +{{ task_id | to_var }} = DataprocSubmitJobOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, - {% if script %}query_uri='{}/{}'.format(CONFIG['gcp_uri_prefix'], {{ script | to_python }}),{% endif %} - {% if query %}query={{ query | to_python }},{% endif %} - {% if variables %}variables={{ variables | to_python }},{% endif %} - dataproc_hive_properties={{ props_macro.props(action_node_properties=action_node_properties, xml_escaped=True) }}, - cluster_name=CONFIG['dataproc_cluster'], + job=dict( + placement=dict( + cluster_name=CONFIG['dataproc_cluster'], + ), + hive_job=dict( + {% if variables %}script_variables={{ variables | to_python }},{% endif %} + properties={{ props_macro.props(action_node_properties=action_node_properties, xml_escaped=True) }}, + {% if script %}query_file_uri='{}/{}'.format(CONFIG['gcp_uri_prefix'], {{ script | to_python }}),{% endif %} + {% if query_obj %}query_list={{ query_obj | to_python }},{% endif %} + ), + ), gcp_conn_id=CONFIG['gcp_conn_id'], region=CONFIG['gcp_region'], - job_name={{ task_id | to_python }}, ) diff --git a/o2a/templates/http.tpl b/o2a/templates/http.tpl index 86b80649a..f85aad823 100644 --- a/o2a/templates/http.tpl +++ b/o2a/templates/http.tpl @@ -15,7 +15,7 @@ #} {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = bash_operator.BashOperator( +{{ task_id | to_var }} = bash.BashOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, bash_command={% include "http_command.tpl" %}, diff --git a/o2a/templates/java.tpl b/o2a/templates/java.tpl index 8dda7f3c2..483ffd005 100644 --- a/o2a/templates/java.tpl +++ b/o2a/templates/java.tpl @@ -14,22 +14,16 @@ limitations under the License. 
#} {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = dataproc_operator.DataProcHadoopOperator( +{{ task_id | to_var }} = DataprocSubmitJobOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, - main_class={{ main_class | to_python }}, - arguments={{ args }}, - {% if hdfs_files %} - files={{ hdfs_files | to_python }}, - {% endif %} - {% if hdfs_archives %} - archives={{ hdfs_archives | to_python }}, - {% endif %} - cluster_name=CONFIG['dataproc_cluster'], - dataproc_hadoop_properties={{ props_macro.props(action_node_properties=action_node_properties, xml_escaped=True) }}, - dataproc_hadoop_jars={{ jar_files_in_hdfs | to_python}}, + job=dict( + placement=dict( + cluster_name=CONFIG['dataproc_cluster'], + ), + hadoop_job={{ hadoop_job | to_python }}, + ), gcp_conn_id=CONFIG['gcp_conn_id'], region=CONFIG['gcp_region'], - dataproc_job_id={{ task_id | to_python }}, params={{ props_macro.props(action_node_properties=action_node_properties) }}, ) diff --git a/o2a/templates/kill.tpl b/o2a/templates/kill.tpl index 3836f112a..acd2f4ead 100644 --- a/o2a/templates/kill.tpl +++ b/o2a/templates/kill.tpl @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. #} -{{ task_id | to_var }} = bash_operator.BashOperator( +{{ task_id | to_var }} = bash.BashOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, bash_command='exit 1', diff --git a/o2a/templates/mapreduce.tpl b/o2a/templates/mapreduce.tpl index 947515e39..e0229146b 100644 --- a/o2a/templates/mapreduce.tpl +++ b/o2a/templates/mapreduce.tpl @@ -14,25 +14,29 @@ limitations under the License. #} {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = dataproc_operator.DataProcHadoopOperator( +{{ task_id | to_var }} = DataprocSubmitJobOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, - main_class=CONFIG['hadoop_main_class'], - arguments=[ - "{{ '{{' }} params['mapreduce.input.fileinputformat.inputdir'] {{ '}}' }}", - "{{ '{{' }} params['mapreduce.output.fileoutputformat.outputdir'] {{ '}}' }}" - ], - {% if hdfs_files %} - files={{ hdfs_files | to_python }}, - {% endif %} - {% if hdfs_archives %} - archives={{ hdfs_archives | to_python }}, - {% endif %} - cluster_name=CONFIG['dataproc_cluster'], - dataproc_hadoop_properties={{ props_macro.props(action_node_properties=action_node_properties, xml_escaped=True) }}, - dataproc_hadoop_jars=CONFIG['hadoop_jars'].split(','), + job=dict( + placement=dict( + cluster_name=CONFIG['dataproc_cluster'], + ), + hadoop_job=dict( + args=[ + "{{ '{{' }} params['mapreduce.input.fileinputformat.inputdir'] {{ '}}' }}", + "{{ '{{' }} params['mapreduce.output.fileoutputformat.outputdir'] {{ '}}' }}" + ], + jar_file_uris=CONFIG['hadoop_jars'].split(','), + {% if hdfs_files %} + file_uris={{ hdfs_files | to_python }}, + {% endif %} + {% if hdfs_archives %} + archive_uris={{ hdfs_archives | to_python }}, + {% endif %} + main_class=CONFIG['hadoop_main_class'], + ), + ), gcp_conn_id=CONFIG['gcp_conn_id'], region=CONFIG['gcp_region'], - dataproc_job_id={{ task_id | to_python }}, params={{ props_macro.props(action_node_properties=action_node_properties) }}, ) diff --git a/o2a/templates/pig.tpl b/o2a/templates/pig.tpl index 6cfa1f397..ce98dc776 100644 --- a/o2a/templates/pig.tpl +++ b/o2a/templates/pig.tpl @@ -14,15 +14,19 @@ limitations under the License. 
#} {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = dataproc_operator.DataProcPigOperator( +{{ task_id | to_var }} = DataprocSubmitJobOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, - query_uri='%s/%s' % (CONFIG['gcp_uri_prefix'], {{ script_file_name | to_python }}), - variables={{ params_dict | to_python }}, - dataproc_pig_properties={{ props_macro.props(action_node_properties=action_node_properties, xml_escaped=True) }}, - cluster_name=CONFIG['dataproc_cluster'], + job=dict( + placement=dict( + cluster_name=CONFIG['dataproc_cluster'], + ), + pig_job=dict( + script_variables={{ params_dict | to_python }}, + query_file_uri='%s/%s' % (CONFIG['gcp_uri_prefix'], {{ script_file_name | to_python }}), + ), + ), gcp_conn_id=CONFIG['gcp_conn_id'], region=CONFIG['gcp_region'], - dataproc_job_id={{ task_id | to_python }}, params={{ props_macro.props(action_node_properties=action_node_properties) }}, ) diff --git a/o2a/templates/prepare.tpl b/o2a/templates/prepare.tpl index b7c3653df..3f9b4d979 100644 --- a/o2a/templates/prepare.tpl +++ b/o2a/templates/prepare.tpl @@ -15,7 +15,7 @@ #} {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = bash_operator.BashOperator( +{{ task_id | to_var }} = bash.BashOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, bash_command={% include "prepare_command.tpl" %}, diff --git a/o2a/templates/shell.tpl b/o2a/templates/shell.tpl index 678f350ea..afa76d2df 100644 --- a/o2a/templates/shell.tpl +++ b/o2a/templates/shell.tpl @@ -14,7 +14,7 @@ limitations under the License. #} {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = bash_operator.BashOperator( +{{ task_id | to_var }} = bash.BashOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, bash_command={% include "pig_command.tpl" %} % (CONFIG['dataproc_cluster'], CONFIG['gcp_region'], diff --git a/o2a/templates/spark.tpl b/o2a/templates/spark.tpl index 3a8d95e8c..041e86487 100644 --- a/o2a/templates/spark.tpl +++ b/o2a/templates/spark.tpl @@ -14,27 +14,16 @@ limitations under the License. #} {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = dataproc_operator.DataProcSparkOperator( +{{ task_id | to_var }} = DataprocSubmitJobOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, - {% if main_jar %}main_jar={{ main_jar | to_python }},{% endif %} - {% if main_class %}main_class={{ main_class | to_python }},{% endif %} - arguments={{ arguments | to_python }}, - {% if hdfs_files %} - files={{ hdfs_files | to_python }}, - {% endif %} - {% if hdfs_archives %} - archives={{ hdfs_archives | to_python }}. 
- {% endif %} - job_name={{ job_name | to_python }}, - cluster_name=CONFIG['dataproc_cluster'], - {% if dataproc_spark_jars %} - dataproc_spark_jars={{ dataproc_spark_jars | to_python }}, - {% endif %} - {% if spark_opts %} - dataproc_spark_properties={{ spark_opts | to_python }}, - {% endif %} + job=dict( + placement=dict( + cluster_name=CONFIG['dataproc_cluster'], + ), + spark_job={{ spark_job | to_python }}, + ), gcp_conn_id=CONFIG['gcp_conn_id'], region=CONFIG['gcp_region'], params={{ props_macro.props(action_node_properties=action_node_properties) }}, -) +) \ No newline at end of file diff --git a/o2a/templates/ssh.tpl b/o2a/templates/ssh.tpl index 6cb5d20a7..44a7abddb 100644 --- a/o2a/templates/ssh.tpl +++ b/o2a/templates/ssh.tpl @@ -13,14 +13,14 @@ See the License for the specific language governing permissions and limitations under the License. #} -{{ task_id | to_var }}_hook = ssh_hook.SSHHook( +{{ task_id | to_var }}_hook = SSHHook( ssh_conn_id='ssh_default', username={{ user | to_python }}, remote_host={{ host | to_python }}, ) {% import "macros/props.tpl" as props_macro %} -{{ task_id | to_var }} = ssh_operator.SSHOperator( +{{ task_id | to_var }} = SSHOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, ssh_hook={{ task_id | to_var }}_hook, diff --git a/o2a/templates/subwf.tpl b/o2a/templates/subwf.tpl index 27291c54b..91611243f 100644 --- a/o2a/templates/subwf.tpl +++ b/o2a/templates/subwf.tpl @@ -17,5 +17,5 @@ {{ task_id | to_var }} = SubDagOperator( task_id={{ task_id | to_python }}, trigger_rule={{ trigger_rule | to_python }}, - subdag=subdag_{{ app_name | to_var }}.sub_dag(dag.dag_id, {{ task_id | to_python }}, dag.start_date, dag.schedule_interval), + subdag=subdag_{{ app_name | to_var }}.sub_dag(dag.dag_id, {{ task_id | to_python }}, dag.start_date, dag.schedule_interval, dag.user_defined_macros), ) diff --git a/o2a/templates/subworkflow.tpl b/o2a/templates/subworkflow.tpl index 8d3a312cb..f9cc5f698 100644 --- a/o2a/templates/subworkflow.tpl +++ b/o2a/templates/subworkflow.tpl @@ -21,11 +21,12 @@ CONFIG={{ config | to_python }} JOB_PROPS={{ job_properties | to_python }} -def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval): +def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval, user_defined_macros): with models.DAG( '{0}.{1}'.format(parent_dag_name, child_dag_name), schedule_interval=schedule_interval, # Change to suit your needs - start_date=start_date # Change to suit your needs + start_date=start_date, # Change to suit your needs + user_defined_macros=user_defined_macros # Change to suit your needs ) as dag: {% filter indent(8, True) %} diff --git a/o2a/templates/workflow_dot.tpl b/o2a/templates/workflow_dot.tpl index b2da8c38f..ca40174fd 100644 --- a/o2a/templates/workflow_dot.tpl +++ b/o2a/templates/workflow_dot.tpl @@ -48,7 +48,7 @@ green {% elif task.trigger_rule == 'one_failed' %} red - {% elif task.trigger_rule == 'dummy' %} + {% elif task.trigger_rule == 'always' %} orange {% else %} black diff --git a/o2a/transformers/add_node_notificaton_transformer.py b/o2a/transformers/add_node_notificaton_transformer.py index 57854ad85..5ff1441ee 100644 --- a/o2a/transformers/add_node_notificaton_transformer.py +++ b/o2a/transformers/add_node_notificaton_transformer.py @@ -26,7 +26,7 @@ TransitionNotificationTaskGroup, ) from o2a.converter.workflow import Workflow -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from 
o2a.transformers.add_workflow_notificaton_transformer import NOTIFICATION_TASK_GROUP_NAMES from o2a.transformers.base_transformer import BaseWorkflowTransformer diff --git a/o2a/transformers/add_workflow_notificaton_transformer.py b/o2a/transformers/add_workflow_notificaton_transformer.py index 9663032cd..3055ebd4f 100644 --- a/o2a/transformers/add_workflow_notificaton_transformer.py +++ b/o2a/transformers/add_workflow_notificaton_transformer.py @@ -18,7 +18,7 @@ from o2a.converter.task import Task from o2a.converter.task_group import TaskGroup from o2a.converter.workflow import Workflow -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.transformers.base_transformer import BaseWorkflowTransformer PROP_WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url" diff --git a/o2a/transformers/base_transformer.py b/o2a/transformers/base_transformer.py index 248d9a303..568f68996 100644 --- a/o2a/transformers/base_transformer.py +++ b/o2a/transformers/base_transformer.py @@ -17,7 +17,7 @@ from o2a.converter.workflow import Workflow -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class BaseWorkflowTransformer: diff --git a/o2a/utils/config_extractors.py b/o2a/utils/config_extractors.py index 755975902..ab905f214 100644 --- a/o2a/utils/config_extractors.py +++ b/o2a/utils/config_extractors.py @@ -20,7 +20,7 @@ from o2a.converter.constants import HDFS_FOLDER from o2a.converter.exceptions import ParseException -from o2a.o2a_libs import el_parser +from o2a.o2a_libs.src.o2a_lib import el_parser TAG_CONFIGURATION = "configuration" TAG_PROPERTY = "property" diff --git a/o2a/utils/el_utils.py b/o2a/utils/el_utils.py index 567b99cbe..cd50c6cf4 100644 --- a/o2a/utils/el_utils.py +++ b/o2a/utils/el_utils.py @@ -25,8 +25,8 @@ from jinja2.exceptions import UndefinedError from o2a.converter.exceptions import ParseException -from o2a.o2a_libs import el_parser -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib import el_parser +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet def strip_el(el_function: str) -> str: diff --git a/o2a/utils/file_archive_extractors.py b/o2a/utils/file_archive_extractors.py index 1ec57c5b5..d612ea1c4 100644 --- a/o2a/utils/file_archive_extractors.py +++ b/o2a/utils/file_archive_extractors.py @@ -16,8 +16,8 @@ from typing import List from xml.etree.ElementTree import Element -from o2a.o2a_libs import el_parser -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib import el_parser +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class HdfsPathProcessor: diff --git a/o2a/utils/param_extractor.py b/o2a/utils/param_extractor.py index 8d76ff7f0..0bfb7743b 100644 --- a/o2a/utils/param_extractor.py +++ b/o2a/utils/param_extractor.py @@ -15,7 +15,7 @@ """Extract params from oozie's action node""" from _elementtree import Element -from o2a.o2a_libs import el_parser +from o2a.o2a_libs.src.o2a_lib import el_parser from o2a.utils import xml_utils TAG_PARAM = "param" diff --git a/o2a/utils/xml_utils.py b/o2a/utils/xml_utils.py index 5046b0e5d..fe643cd44 100644 --- a/o2a/utils/xml_utils.py +++ b/o2a/utils/xml_utils.py @@ -15,7 +15,7 @@ """XML parsing utilities""" from typing import List, Optional, cast from xml.etree import ElementTree as ET -from o2a.o2a_libs import el_parser +from o2a.o2a_libs.src.o2a_lib import el_parser class 
NoNodeFoundException(Exception): diff --git a/requirements.txt b/requirements.txt index 506129558..1ee945c9f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,23 +1,23 @@ -apache-airflow==1.10.14 # pyup: ignore +apache-airflow>=2.0.0 # pyup: ignore autoflake==1.4 -black==20.8b1 +black==23.1a1 flake8==3.8.4 -google-api-python-client==1.12.8 +google-api-python-client==1.12.11 isort==5.7.0 j2cli==0.3.10 -Jinja2==2.11.2 # pyup: ignore +Jinja2==3.1.2 # pyup: ignore lark-parser==0.11.1 -mypy==0.790 +mypy==1.0.0 parameterized==0.7.5 -paramiko==2.7.2 -pre-commit==2.10.1 +paramiko==3.1.0 +pre-commit==3.3.1 pydeps==1.9.13 pylint==2.6.0 -pytest==6.2.1 -pytest-cov==2.11.1 +pytest==7.3.1 +pytest-cov==4.0.0 safety==1.10.3 sshtunnel==0.4.0 -twine==3.3.0 -tzlocal==2.1 # pyup: ignore -Werkzeug==1.0.1 # pyup: ignore -yamllint==1.25.0 +twine==4.0.2 +tzlocal==5.0 # pyup: ignore +Werkzeug==2.2.3 # pyup: ignore +yamllint==1.31.0 diff --git a/setup.py b/setup.py index eb508579d..af881a82b 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ setup( name=NAME, - version="1.0.1", + version="2.0.0", author="Jarek Potiuk, Szymon Przedwojski, Kamil Breguła, Feng Lu, Cameron Moberg", author_email="jarek.potiuk@polidea.com, szymon.przedwojski@polidea.com, " "kamil.bregula@polidea.com, fenglu@google.com, cjmoberg@google.com", @@ -40,9 +40,9 @@ scripts=["bin/o2a", "bin/o2a-validate-workflows"], packages=["o2a"], classifiers=[ - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", ], diff --git a/tests/converter/test_oozie_converter.py b/tests/converter/test_oozie_converter.py index dfc50a707..58f3ae598 100644 --- a/tests/converter/test_oozie_converter.py +++ b/tests/converter/test_oozie_converter.py @@ -311,16 +311,17 @@ def test_should_convert_demo_workflow(self): self.assertEqual( { "from airflow import models", - "from airflow.contrib.operators import dataproc_operator", - "from airflow.operators import bash_operator", - "from airflow.operators import dummy_operator", - "from airflow.operators import python_operator", - "from airflow.operators.subdag_operator import SubDagOperator", - "from airflow.operators import bash_operator, dummy_operator", + "from airflow.providers.google.cloud.operators import dataproc", + "from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator", + "from airflow.operators import bash", + "from airflow.operators import empty", + "from airflow.operators import python", + "from airflow.operators.subdag import SubDagOperator", + "from airflow.operators import bash, empty", "from airflow.utils import dates", "from airflow.utils.trigger_rule import TriggerRule", - "from o2a.o2a_libs import functions", - "from o2a.o2a_libs.property_utils import PropertySet", + "from o2a_lib import functions", + "from o2a_lib.property_utils import PropertySet", "import datetime", "import shlex", "import subdag_childwf", diff --git a/tests/converter/test_renderers.py b/tests/converter/test_renderers.py index 7af1dd5a9..ceadd78a1 100644 --- a/tests/converter/test_renderers.py +++ b/tests/converter/test_renderers.py @@ -27,7 +27,7 @@ from o2a.converter.task_group import TaskGroup from o2a.converter.workflow import Workflow from o2a.mappers.dummy_mapper import DummyMapper -from o2a.o2a_libs.property_utils import PropertySet +from 
o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet def _create_workflow(): diff --git a/tests/converter/test_workflow_xml_parser.py b/tests/converter/test_workflow_xml_parser.py index 3da07d976..5d6bd8c0d 100644 --- a/tests/converter/test_workflow_xml_parser.py +++ b/tests/converter/test_workflow_xml_parser.py @@ -30,7 +30,7 @@ from o2a.mappers import dummy_mapper, pig_mapper from o2a.mappers import ssh_mapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class TestWorkflowXmlParser(unittest.TestCase): diff --git a/tests/mappers/extensions/test_prepare_mapper_extension.py b/tests/mappers/extensions/test_prepare_mapper_extension.py index 660e058d9..183b0188e 100644 --- a/tests/mappers/extensions/test_prepare_mapper_extension.py +++ b/tests/mappers/extensions/test_prepare_mapper_extension.py @@ -20,7 +20,7 @@ from o2a.converter.task import Task from o2a.mappers.base_mapper import BaseMapper from o2a.mappers.extensions.prepare_mapper_extension import PrepareMapperExtension -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet TEST_MAPPER_NAME = "mapper" diff --git a/tests/mappers/test_action_mapper.py b/tests/mappers/test_action_mapper.py index 4ee9f9aae..4ac776181 100644 --- a/tests/mappers/test_action_mapper.py +++ b/tests/mappers/test_action_mapper.py @@ -21,7 +21,7 @@ from o2a.converter.relation import Relation from o2a.converter.task import Task from o2a.mappers.action_mapper import ActionMapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.utils.config_extractors import TAG_CONFIGURATION, TAG_JOB_XML from o2a.utils.xml_utils import find_node_by_tag, find_nodes_by_tag diff --git a/tests/mappers/test_archive_extractor.py b/tests/mappers/test_archive_extractor.py index dd8d974c0..638b0837e 100644 --- a/tests/mappers/test_archive_extractor.py +++ b/tests/mappers/test_archive_extractor.py @@ -18,7 +18,7 @@ from xml.etree import ElementTree as ET from o2a.utils.file_archive_extractors import ArchiveExtractor -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class TestArchiveExtractor(unittest.TestCase): diff --git a/tests/mappers/test_base_mapper.py b/tests/mappers/test_base_mapper.py index 7bfc81b38..2b4e3a773 100644 --- a/tests/mappers/test_base_mapper.py +++ b/tests/mappers/test_base_mapper.py @@ -34,7 +34,7 @@ from o2a.mappers import base_mapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class TestBaseMapper(unittest.TestCase): diff --git a/tests/mappers/test_distcp_mapper.py b/tests/mappers/test_distcp_mapper.py index bd8da8d97..b62966c78 100644 --- a/tests/mappers/test_distcp_mapper.py +++ b/tests/mappers/test_distcp_mapper.py @@ -24,7 +24,7 @@ from o2a.mappers.distcp_mapper import DistCpMapper # language=XML -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet EXAMPLE_XML = """ diff --git a/tests/mappers/test_email_mapper.py b/tests/mappers/test_email_mapper.py index 05b579a1b..9357594f3 100644 --- a/tests/mappers/test_email_mapper.py +++ b/tests/mappers/test_email_mapper.py @@ -20,7 +20,7 @@ from o2a.converter.task import Task from o2a.mappers import email_mapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils 
import PropertySet class TestEmailMapper(unittest.TestCase): diff --git a/tests/mappers/test_file_extractor.py b/tests/mappers/test_file_extractor.py index 096d2f587..f230c3d4f 100644 --- a/tests/mappers/test_file_extractor.py +++ b/tests/mappers/test_file_extractor.py @@ -18,7 +18,7 @@ from xml.etree import ElementTree as ET from o2a.utils.file_archive_extractors import FileExtractor -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class TestFileExtractor(unittest.TestCase): diff --git a/tests/mappers/test_fs_mapper.py b/tests/mappers/test_fs_mapper.py index f6a8048ec..2c03f40c2 100644 --- a/tests/mappers/test_fs_mapper.py +++ b/tests/mappers/test_fs_mapper.py @@ -24,7 +24,7 @@ from o2a.converter.task import Task from o2a.converter.relation import Relation from o2a.mappers import fs_mapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet TEST_JOB_PROPS: Dict[str, str] = {"user.name": "pig", "nameNode": "hdfs://localhost:8020"} TEST_CONFIG: Dict[str, str] = {} diff --git a/tests/mappers/test_git_mapper.py b/tests/mappers/test_git_mapper.py index 89851124a..bb3e7fc13 100644 --- a/tests/mappers/test_git_mapper.py +++ b/tests/mappers/test_git_mapper.py @@ -24,7 +24,7 @@ from o2a.mappers.git_mapper import prepare_git_command # language=XML -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet EXAMPLE_XML = """ diff --git a/tests/mappers/test_hive_mapper.py b/tests/mappers/test_hive_mapper.py index 7d5332065..bcfcb8da8 100644 --- a/tests/mappers/test_hive_mapper.py +++ b/tests/mappers/test_hive_mapper.py @@ -23,7 +23,7 @@ from o2a.converter.task import Task from o2a.converter.relation import Relation from o2a.mappers import hive_mapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet # language=XML TEST_BASE_HIVE = """ @@ -112,10 +112,12 @@ def test_to_tasks_and_relations_should_parse_query_element(self): template_name="hive.tpl", trigger_rule="one_success", template_params={ - "query": "DROP TABLE IF EXISTS test_query;\nCREATE EXTERNAL TABLE test_query (a INT) " - "STORED AS TEXTFILE\nLOCATION '/user/{{userName}}/{{examplesRoot}}/input/';" - "\nINSERT OVERWRITE DIRECTORY '/user/{{userName}}/{{examplesRoot}}/output/' " - "SELECT * FROM test_query;", + "query_obj": { + "queries": ["DROP TABLE IF EXISTS test_query;\nCREATE EXTERNAL TABLE test_query (a INT) " + "STORED AS TEXTFILE\nLOCATION '/user/{{userName}}/{{examplesRoot}}/input/';" + "\nINSERT OVERWRITE DIRECTORY '/user/{{userName}}/{{examplesRoot}}/output/' " + "SELECT * FROM test_query;"] + }, "script": None, "props": PropertySet( config={}, @@ -154,7 +156,7 @@ def test_to_tasks_and_relations_should_parse_script_element(self): template_name="hive.tpl", trigger_rule="one_success", template_params={ - "query": None, + "query_obj": None, "script": "script.q", "props": PropertySet( config={}, @@ -223,11 +225,13 @@ def test_to_tasks_and_relations_should_parse_file_elements(self): template_name="hive.tpl", trigger_rule="one_success", template_params={ - "query": "DROP TABLE IF EXISTS test_query;" - "\nCREATE EXTERNAL TABLE test_query (a INT) STORED " - "AS TEXTFILE\nLOCATION '/user/{{userName}}/{{examplesRoot}}/input/';\nINSERT " - "OVERWRITE DIRECTORY '/user/{{userName}}/{{examplesRoot}}/output/' " - "SELECT * FROM test_query;", + "query_obj": { + "queries": ["DROP TABLE IF 
EXISTS test_query;" + "\nCREATE EXTERNAL TABLE test_query (a INT) STORED " + "AS TEXTFILE\nLOCATION '/user/{{userName}}/{{examplesRoot}}/input/';\nINSERT " + "OVERWRITE DIRECTORY '/user/{{userName}}/{{examplesRoot}}/output/' " + "SELECT * FROM test_query;"] + }, "script": None, "props": PropertySet( config={}, @@ -272,10 +276,12 @@ def test_to_tasks_and_relations_should_parse_archive_element(self): template_name="hive.tpl", trigger_rule="one_success", template_params={ - "query": "DROP TABLE IF EXISTS test_query;\nCREATE EXTERNAL TABLE test_query (a INT) " - "STORED AS TEXTFILE\nLOCATION '/user/{{userName}}/{{examplesRoot}}/input/';" - "\nINSERT OVERWRITE DIRECTORY '/user/{{userName}}/{{examplesRoot}}/output/' " - "SELECT * FROM test_query;", + "query_obj": { + "queries": [ "DROP TABLE IF EXISTS test_query;\nCREATE EXTERNAL TABLE test_query (a INT) " + "STORED AS TEXTFILE\nLOCATION '/user/{{userName}}/{{examplesRoot}}/input/';" + "\nINSERT OVERWRITE DIRECTORY '/user/{{userName}}/{{examplesRoot}}/output/' " + "SELECT * FROM test_query;"] + }, "script": None, "props": PropertySet( config={}, diff --git a/tests/mappers/test_java_mapper.py b/tests/mappers/test_java_mapper.py index fb7bc1672..1cb2368a6 100644 --- a/tests/mappers/test_java_mapper.py +++ b/tests/mappers/test_java_mapper.py @@ -20,7 +20,7 @@ from o2a.converter.task import Task from o2a.mappers import java_mapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class TestJavaMapper(unittest.TestCase): @@ -248,11 +248,13 @@ def test_to_tasks_and_relations(self): }, action_node_properties={"mapred.job.queue.name": "{{queueName}}"}, ), - "hdfs_files": [], - "hdfs_archives": [], - "main_class": "org.apache.oozie.example.DemoJavaMain", - "jar_files_in_hdfs": [], - "args": ["Hello", "Oozie!"], + "hadoop_job": dict( + args=["Hello", "Oozie!"], + jar_file_uris=[], + file_uris=[], + archive_uris=[], + main_class="org.apache.oozie.example.DemoJavaMain", + ), }, ) ], diff --git a/tests/mappers/test_mapreduce_mapper.py b/tests/mappers/test_mapreduce_mapper.py index db2035193..d64061080 100644 --- a/tests/mappers/test_mapreduce_mapper.py +++ b/tests/mappers/test_mapreduce_mapper.py @@ -23,7 +23,7 @@ from o2a.mappers import mapreduce_mapper # language=XML -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet EXAMPLE_XML = """ diff --git a/tests/mappers/test_pig_mapper.py b/tests/mappers/test_pig_mapper.py index cc51c50f8..cf6ee7543 100644 --- a/tests/mappers/test_pig_mapper.py +++ b/tests/mappers/test_pig_mapper.py @@ -21,7 +21,7 @@ from o2a.converter.task import Task from o2a.converter.relation import Relation from o2a.mappers import pig_mapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class TestPigMapper(unittest.TestCase): diff --git a/tests/mappers/test_shell_mapper.py b/tests/mappers/test_shell_mapper.py index 3e487c829..ce2cbadd1 100644 --- a/tests/mappers/test_shell_mapper.py +++ b/tests/mappers/test_shell_mapper.py @@ -21,7 +21,7 @@ from o2a.converter.task import Task from o2a.converter.relation import Relation from o2a.mappers import shell_mapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class TestShellMapper(unittest.TestCase): diff --git a/tests/mappers/test_spark_mapper.py b/tests/mappers/test_spark_mapper.py index 9d400837d..2190c55d1 100644 --- 
a/tests/mappers/test_spark_mapper.py +++ b/tests/mappers/test_spark_mapper.py @@ -21,7 +21,7 @@ from o2a.converter.task import Task from o2a.converter.relation import Relation from o2a.mappers import spark_mapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet EXAMPLE_JOB_PROPS = {"nameNode": "hdfs://", "userName": "test_user", "examplesRoot": "examples"} @@ -106,17 +106,19 @@ def test_to_tasks_and_relations_with_prepare_node(self): task_id="test_id", template_name="spark.tpl", template_params={ - "main_jar": None, - "main_class": "org.apache.spark.examples.mllib.JavaALS", - "arguments": ["inputpath=hdfs:///input/file.txt", "value=2"], - "hdfs_archives": [], - "hdfs_files": [], "job_name": "Spark Examples", - "spark_opts": { - "spark.executor.extraJavaOptions": "-XX:+HeapDumpOnOutOfMemoryError " - "-XX:HeapDumpPath=/tmp" + "spark_job": { + "args": ["inputpath=hdfs:///input/file.txt", "value=2"], + "jar_file_uris": ["/lib/spark-examples_2.10-1.1.0.jar"], + "file_uris": [], + "archive_uris": [], + "properties": { + "spark.executor.extraJavaOptions": "-XX:+HeapDumpOnOutOfMemoryError " + "-XX:HeapDumpPath=/tmp" + }, + "main_jar_file_uri": None, + "main_class": "org.apache.spark.examples.mllib.JavaALS", }, - "dataproc_spark_jars": ["/lib/spark-examples_2.10-1.1.0.jar"], }, ), ], @@ -139,22 +141,24 @@ def test_to_tasks_and_relations_without_prepare_node(self): template_name="spark.tpl", trigger_rule="one_success", template_params={ - "main_jar": None, - "main_class": "org.apache.spark.examples.mllib.JavaALS", - "arguments": [ - "inputpath=hdfs:///input/file.txt", - "value=2", - "/user/{{userName}}/{{examplesRoot}}/apps/spark/lib/oozie-examples-4.3.0.jar", - ], - "hdfs_archives": [], - "hdfs_files": [], "job_name": "Spark Examples", - "dataproc_spark_jars": [ - "/user/{{userName}}/{{examplesRoot}}/apps/spark/lib/oozie-examples-4.3.0.jar" - ], - "spark_opts": { - "spark.executor.extraJavaOptions": "-XX:+HeapDumpOnOutOfMemoryError " - "-XX:HeapDumpPath=/tmp" + "spark_job": { + "args": [ + "inputpath=hdfs:///input/file.txt", + "value=2", + "/user/{{userName}}/{{examplesRoot}}/apps/spark/lib/oozie-examples-4.3.0.jar", + ], + "jar_file_uris": [ + "/user/{{userName}}/{{examplesRoot}}/apps/spark/lib/oozie-examples-4.3.0.jar" + ], + "file_uris": [], + "archive_uris": [], + "properties": { + "spark.executor.extraJavaOptions": "-XX:+HeapDumpOnOutOfMemoryError " + "-XX:HeapDumpPath=/tmp" + }, + "main_jar_file_uri": None, + "main_class": "org.apache.spark.examples.mllib.JavaALS", }, }, ) diff --git a/tests/mappers/test_ssh_mapper.py b/tests/mappers/test_ssh_mapper.py index da6e72b4c..ecfc6109c 100644 --- a/tests/mappers/test_ssh_mapper.py +++ b/tests/mappers/test_ssh_mapper.py @@ -20,7 +20,7 @@ from o2a.converter.task import Task from o2a.mappers import ssh_mapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class TestSSHMapper(unittest.TestCase): diff --git a/tests/mappers/test_subworkflow_mapper.py b/tests/mappers/test_subworkflow_mapper.py index 35c4fb228..cb289032a 100644 --- a/tests/mappers/test_subworkflow_mapper.py +++ b/tests/mappers/test_subworkflow_mapper.py @@ -24,7 +24,7 @@ from o2a.converter.task import Task from o2a.definitions import EXAMPLE_SUBWORKFLOW_PATH from o2a.mappers import subworkflow_mapper -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class 
TestSubworkflowMapper(TestCase): diff --git a/tests/o2a_libs/test_el_fs_functions.py b/tests/o2a_libs/test_el_fs_functions.py index fbe359333..a86db6834 100644 --- a/tests/o2a_libs/test_el_fs_functions.py +++ b/tests/o2a_libs/test_el_fs_functions.py @@ -19,13 +19,13 @@ from airflow import AirflowException -from o2a.o2a_libs import el_fs_functions as fs +from o2a.o2a_libs.src.o2a_lib import el_fs_functions as fs DELIMITER = "c822c1b63853ed273b89687ac505f9fa" class TestFsFunctions(unittest.TestCase): - @patch("o2a.o2a_libs.el_fs_functions.subprocess.Popen") + @patch("o2a.o2a_libs.src.o2a_lib.el_fs_functions.subprocess.Popen") def test_pig_job_executor_success(self, mock_pipe): output = DELIMITER + "output" + DELIMITER mock_pipe.return_value.communicate.return_value = (b"", bytes(output, encoding="utf-8")) @@ -33,7 +33,7 @@ def test_pig_job_executor_success(self, mock_pipe): self.assertEqual("output", fs._pig_job_executor("success")) # pylint:disable=protected-access - @patch("o2a.o2a_libs.el_fs_functions.subprocess.Popen") + @patch("o2a.o2a_libs.src.o2a_lib.el_fs_functions.subprocess.Popen") def test_pig_job_executor_fail(self, mock_pipe): output = DELIMITER + "output" + DELIMITER mock_pipe.return_value.communicate.return_value = (b"", bytes(output, encoding="utf-8")) @@ -42,7 +42,7 @@ def test_pig_job_executor_fail(self, mock_pipe): with self.assertRaises(AirflowException): fs._pig_job_executor("fail") # pylint:disable=protected-access - @patch("o2a.o2a_libs.el_fs_functions.subprocess.Popen") + @patch("o2a.o2a_libs.src.o2a_lib.el_fs_functions.subprocess.Popen") def test_exists(self, mock_pipe): output = DELIMITER + "output" + DELIMITER mock_pipe.return_value.communicate.return_value = (b"", bytes(output, encoding="utf-8")) @@ -53,7 +53,7 @@ def test_exists(self, mock_pipe): mock_pipe.return_value.poll.return_value = 1 self.assertFalse(fs.exists("path/to/file")) - @patch("o2a.o2a_libs.el_fs_functions.subprocess.Popen") + @patch("o2a.o2a_libs.src.o2a_lib.el_fs_functions.subprocess.Popen") def test_is_dir(self, mock_pipe): output = DELIMITER + "output" + DELIMITER mock_pipe.return_value.communicate.return_value = (b"", bytes(output, encoding="utf-8")) @@ -64,7 +64,7 @@ def test_is_dir(self, mock_pipe): mock_pipe.return_value.poll.return_value = 1 self.assertFalse(fs.is_dir("path/to/file")) - @patch("o2a.o2a_libs.el_fs_functions.subprocess.Popen") + @patch("o2a.o2a_libs.src.o2a_lib.el_fs_functions.subprocess.Popen") def test_dir_size(self, mock_pipe): output = DELIMITER + "12" + DELIMITER mock_pipe.return_value.communicate.return_value = (b"", bytes(output, encoding="utf-8")) @@ -75,7 +75,7 @@ def test_dir_size(self, mock_pipe): mock_pipe.return_value.poll.return_value = 1 self.assertEqual(-1, fs.dir_size("path/to/file")) - @patch("o2a.o2a_libs.el_fs_functions.subprocess.Popen") + @patch("o2a.o2a_libs.src.o2a_lib.el_fs_functions.subprocess.Popen") def test_file_size(self, mock_pipe): output = DELIMITER + "12" + DELIMITER mock_pipe.return_value.communicate.return_value = (b"", bytes(output, encoding="utf-8")) @@ -86,7 +86,7 @@ def test_file_size(self, mock_pipe): mock_pipe.return_value.poll.return_value = 1 self.assertEqual(-1, fs.file_size("path/to/file")) - @patch("o2a.o2a_libs.el_fs_functions.subprocess.Popen") + @patch("o2a.o2a_libs.src.o2a_lib.el_fs_functions.subprocess.Popen") def test_block_size(self, mock_pipe): output = DELIMITER + "12" + DELIMITER mock_pipe.return_value.communicate.return_value = (b"", bytes(output, encoding="utf-8")) diff --git a/tests/o2a_libs/test_el_parser.py 
b/tests/o2a_libs/test_el_parser.py index 86362a3f7..c7e357026 100644 --- a/tests/o2a_libs/test_el_parser.py +++ b/tests/o2a_libs/test_el_parser.py @@ -22,8 +22,8 @@ from airflow import AirflowException -import o2a.o2a_libs.functions as functions -from o2a.o2a_libs.el_parser import translate +import o2a.o2a_libs.src.o2a_lib.functions as functions +from o2a.o2a_libs.src.o2a_lib.el_parser import translate class TestElParser(unittest.TestCase): diff --git a/tests/o2a_libs/test_functions.py b/tests/o2a_libs/test_functions.py index 8857c847d..36d0aa495 100644 --- a/tests/o2a_libs/test_functions.py +++ b/tests/o2a_libs/test_functions.py @@ -18,8 +18,8 @@ from parameterized import parameterized -import o2a.o2a_libs.functions as functions -from o2a.o2a_libs.el_wf_functions import _reverse_task_map +import o2a.o2a_libs.src.o2a_lib.functions as functions +from o2a.o2a_libs.src.o2a_lib.el_wf_functions import _reverse_task_map class TestFunctions(unittest.TestCase): diff --git a/tests/o2a_libs/test_property_utils.py b/tests/o2a_libs/test_property_utils.py index 1f76f32e7..cc6f4623d 100644 --- a/tests/o2a_libs/test_property_utils.py +++ b/tests/o2a_libs/test_property_utils.py @@ -16,7 +16,7 @@ from unittest import TestCase -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class TestPropertySet(TestCase): diff --git a/tests/test_templates.py b/tests/test_templates.py index e42c971ce..485178a8e 100644 --- a/tests/test_templates.py +++ b/tests/test_templates.py @@ -181,7 +181,7 @@ class DecisionTemplateTestCase(BaseTestCases.BaseTemplateTestCase): DEFAULT_TEMPLATE_PARAMS = dict( task_id="DAG_NAME_A", - trigger_rule=TriggerRule.DUMMY, + trigger_rule=TriggerRule.ALWAYS, case_dict={ '{{functions.first_not_null("first","second") == "second"}}': "first", '{{functions.first_not_null("first","second") == "first"}}': "end", @@ -205,7 +205,7 @@ def test_escape_character(self, mutation): class DummyTemplateTestCase(BaseTestCases.BaseTemplateTestCase): TEMPLATE_NAME = "dummy.tpl" - DEFAULT_TEMPLATE_PARAMS = dict(task_id="DAG_NAME_A", trigger_rule=TriggerRule.DUMMY) + DEFAULT_TEMPLATE_PARAMS = dict(task_id="DAG_NAME_A", trigger_rule=TriggerRule.ALWAYS) def test_minimal_green_path(self): res = render_template(self.TEMPLATE_NAME, **self.DEFAULT_TEMPLATE_PARAMS) @@ -224,7 +224,7 @@ class FsOpTempalteTestCase(BaseTestCases.BaseTemplateTestCase): DEFAULT_TEMPLATE_PARAMS = { "task_id": "DAG_NAME_A", "pig_command": "DAG_NAME_A", - "trigger_rule": TriggerRule.DUMMY, + "trigger_rule": TriggerRule.ALWAYS, "action_node_properties": {"key": "value"}, } @@ -251,7 +251,7 @@ class GitTemplateTestCase(BaseTestCases.BaseTemplateTestCase): DEFAULT_TEMPLATE_PARAMS = { "task_id": "TASK_ID", - "trigger_rule": "dummy", + "trigger_rule": "always", "git_uri": "https://github.com/apache/oozie", "git_branch": "my-awesome-branch", "destination_path": "/my_git_repo_directory", @@ -289,9 +289,11 @@ class HiveTemplateTestCase(BaseTestCases.BaseTemplateTestCase): DEFAULT_TEMPLATE_PARAMS = { "task_id": "AA", - "trigger_rule": "dummy", + "trigger_rule": "always", "script": "id.q", - "query": "SELECT 1", + "query_obj": { + "queries": ["SELECT 1"] + }, "variables": { "INPUT": "/user/${wf:user()}/${examplesRoot}/input-data/text", "OUTPUT": "/user/${wf:user()}/${examplesRoot}/output-data/demo/hive-node", @@ -330,7 +332,7 @@ def test_escape_character(self, mutation): class KillTemplateTestCase(BaseTestCases.BaseTemplateTestCase): TEMPLATE_NAME = "kill.tpl" - 
DEFAULT_TEMPLATE_PARAMS = dict(task_id="DAG_NAME_A", trigger_rule=TriggerRule.DUMMY) + DEFAULT_TEMPLATE_PARAMS = dict(task_id="DAG_NAME_A", trigger_rule=TriggerRule.ALWAYS) def test_minimal_green_path(self): res = render_template(self.TEMPLATE_NAME, **self.DEFAULT_TEMPLATE_PARAMS) @@ -348,7 +350,7 @@ class MapReduceTemplateTestCase(BaseTestCases.BaseTemplateTestCase): DEFAULT_TEMPLATE_PARAMS = { "task_id": "AA", - "trigger_rule": "dummy", + "trigger_rule": "always", "action_node_properties": { "mapred.mapper.new-api": "true", "mapred.reducer.new-api": "true", @@ -394,7 +396,7 @@ class PigTemplateTestCase(BaseTestCases.BaseTemplateTestCase): DEFAULT_TEMPLATE_PARAMS = { "task_id": "AA", - "trigger_rule": "dummy", + "trigger_rule": "always", "params_dict": { "INPUT": "/user/${wf:user()}/${examplesRoot}/input-data/text", "OUTPUT": "/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node", @@ -435,7 +437,7 @@ class PrepareTemplateTestCase(BaseTestCases.BaseTemplateTestCase): DEFAULT_TEMPLATE_PARAMS = { "task_id": "DAG_NAME_A", - "trigger_rule": "dummy", + "trigger_rule": "always", "delete": "file1 file2", "mkdir": "file3 file4", "action_node_properties": {"key": "value"}, @@ -460,7 +462,7 @@ class ShellTemplateTestCase(BaseTestCases.BaseTemplateTestCase): DEFAULT_TEMPLATE_PARAMS = { "task_id": "DAG_NAME_A", "pig_command": "PIG_CMD", - "trigger_rule": "dummy", + "trigger_rule": "always", "action_node_properties": {"key": "value"}, } @@ -487,18 +489,19 @@ class SparkTemplateTestCase(BaseTestCases.BaseTemplateTestCase): DEFAULT_TEMPLATE_PARAMS = { "task_id": "AA", - "hdfs_archives": [], - "arguments": ["inputpath=hdfs:///input/file.txt", "value=2"], - "dataproc_spark_jars": ["/lib/spark-examples_2.10-1.1.0.jar"], - "spark_opts": { - "mapred.compress.map.output": "true", - "spark.executor.extraJavaOptions": "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp", + "spark_job": { + "args": ["inputpath=hdfs:///input/file.txt", "value=2"], + "jar_file_uris": ["/lib/spark-examples_2.10-1.1.0.jar"], + "file_uris": [], + "archive_uris": [], + "properties": { + "mapred.compress.map.output": "true", + "spark.executor.extraJavaOptions": "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp", + }, + "main_jar_file_uri": None, + "main_class": "org.apache.spark.examples.mllib.JavaALS" }, - "hdfs_files": [], - "job_name": "Spark Examples", - "main_class": "org.apache.spark.examples.mllib.JavaALS", - "main_jar": None, - "trigger_rule": "dummy", + "trigger_rule": "always", "action_node_properties": {"key": "value"}, } @@ -543,7 +546,7 @@ class SshTemplateTestCase(BaseTestCases.BaseTemplateTestCase): DEFAULT_TEMPLATE_PARAMS = { "task_id": "AA", "job_properties": {}, - "trigger_rule": "dummy", + "trigger_rule": "always", "command": "ls -l -a", "user": "user", "host": "apache.org", @@ -580,7 +583,7 @@ def test_escape_character(self, mutation): class SubwfTemplateTestCase(BaseTestCases.BaseTemplateTestCase): TEMPLATE_NAME = "subwf.tpl" - DEFAULT_TEMPLATE_PARAMS = {"task_id": "test_id", "trigger_rule": "dummy", "app_name": "DAG_NAME_A"} + DEFAULT_TEMPLATE_PARAMS = {"task_id": "test_id", "trigger_rule": "always", "app_name": "DAG_NAME_A"} def test_green_path(self): res = render_template(self.TEMPLATE_NAME, **self.DEFAULT_TEMPLATE_PARAMS) diff --git a/tests/transformers/test_add_node_notification_transformer.py b/tests/transformers/test_add_node_notification_transformer.py index 1bc00f765..9dcd87d2a 100644 --- a/tests/transformers/test_add_node_notification_transformer.py +++ 
b/tests/transformers/test_add_node_notification_transformer.py @@ -18,7 +18,7 @@ from o2a.converter.task import Task from o2a.converter.task_group import ActionTaskGroup, ControlTaskGroup from o2a.converter.workflow import Workflow -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.transformers.add_node_notificaton_transformer import ( AddNodeNotificationTransformer, NODE_STATUS_SUFFIX, diff --git a/tests/transformers/test_add_workflow_notification_transformer.py b/tests/transformers/test_add_workflow_notification_transformer.py index ae1dc87ce..03b8d5e36 100644 --- a/tests/transformers/test_add_workflow_notification_transformer.py +++ b/tests/transformers/test_add_workflow_notification_transformer.py @@ -18,7 +18,7 @@ from o2a.converter.task import Task from o2a.converter.task_group import TaskGroup from o2a.converter.workflow import Workflow -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet from o2a.transformers.add_workflow_notificaton_transformer import ( AddWorkflowNotificationTransformer, PROP_WORKFLOW_NOTIFICATION_URL, diff --git a/tests/utils/test_el_utils.py b/tests/utils/test_el_utils.py index 52eddd747..d54aee646 100644 --- a/tests/utils/test_el_utils.py +++ b/tests/utils/test_el_utils.py @@ -24,7 +24,7 @@ from o2a.utils.el_utils import normalize_path, escape_string_with_python_escapes, replace_url_el # pylint: disable=too-many-public-methods -from o2a.o2a_libs.property_utils import PropertySet +from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet class TestELUtils(unittest.TestCase):
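
For readers tracking the Dataproc changes in the templates above: in Airflow 2 the per-engine `DataProc*Operator` classes are gone, and Hadoop, Hive, Pig, and Spark workloads are all submitted through `DataprocSubmitJobOperator` with a single nested `job` payload. Below is a minimal sketch of the task shape the rewritten `pig.tpl` now generates, assuming an Airflow 2 environment with the Google provider installed; the cluster, bucket, connection, and script values are illustrative placeholders, not values from this patch:

```python
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
from airflow.utils.trigger_rule import TriggerRule

# Illustrative stand-in for the CONFIG dict emitted into generated DAGs.
CONFIG = {
    "dataproc_cluster": "example-cluster",
    "gcp_conn_id": "google_cloud_default",
    "gcp_region": "europe-west1",
    "gcp_uri_prefix": "gs://example-bucket/workflows",
}

with models.DAG(
    "pig_node_example",
    schedule_interval=None,
    start_date=datetime.datetime(2023, 1, 1),
) as dag:
    # The old query_uri/variables/cluster_name keyword arguments are now
    # nested inside the job dict, under placement and pig_job.
    pig_node = DataprocSubmitJobOperator(
        task_id="pig-node",
        trigger_rule=TriggerRule.ONE_SUCCESS,
        job=dict(
            placement=dict(cluster_name=CONFIG["dataproc_cluster"]),
            pig_job=dict(
                script_variables={"INPUT": "/input/text", "OUTPUT": "/output/pig-node"},
                query_file_uri="%s/%s" % (CONFIG["gcp_uri_prefix"], "id.pig"),
            ),
        ),
        gcp_conn_id=CONFIG["gcp_conn_id"],
        region=CONFIG["gcp_region"],
    )
```

The `o2a_lib` changes follow the same pattern for Jinja2: version 3 removed `jinja2.contextfunction` in favour of the `pass_context` decorator, which injects the active render context as the first positional argument. A self-contained sketch of that mechanism, independent of the o2a codebase (the `job_props` mapping and nested `functions.wf` namespace mimic how the converted DAGs expose EL functions to templates):

```python
from jinja2 import Environment, pass_context


@pass_context
def conf(context, key=None):
    # Jinja2 passes the render context first; look the property up in the
    # job_props mapping supplied at render time, analogous to how
    # el_wf_functions.conf reads workflow configuration properties.
    return context.get("job_props", {}).get(key)


env = Environment()
# Nested dict so templates can use the functions.wf.f_name call pattern.
env.globals["functions"] = {"wf": {"conf": conf}}
rendered = env.from_string("{{ functions.wf.conf('user.name') }}").render(
    job_props={"user.name": "pig"}
)
print(rendered)  # -> pig
```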