diff --git a/astronomer/providers/apache/hive/example_dags/example_hive.py b/astronomer/providers/apache/hive/example_dags/example_hive.py
index 7f537d062..504073c8a 100644
--- a/astronomer/providers/apache/hive/example_dags/example_hive.py
+++ b/astronomer/providers/apache/hive/example_dags/example_hive.py
@@ -206,10 +206,11 @@ def add_inbound_rule_for_security_group(task_instance: Any) -> None:
         raise error


-def create_key_pair() -> None:
+def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
     """
     Load the private_key from airflow variable and creates a pem_file
-    at /tmp/.
+    at /tmp/. SSH into the machine and execute the bash script from the list
+    of commands.
     """
     # remove the file if it exists
     if os.path.exists(f"/tmp/{PEM_FILENAME}.pem"):
@@ -221,17 +222,11 @@ def create_key_pair() -> None:
     # write private key to file with 400 permissions
     os.chmod(f"/tmp/{PEM_FILENAME}.pem", 0o400)

-    # check if the PEM file exists or not.
+    # Check if the PEM file exists or not.
     if not os.path.exists(f"/tmp/{PEM_FILENAME}.pem"):
-        # if it doesn't exists raise an error.
+        # if it doesn't exist, raise an error
         raise AirflowException("PEM file wasn't copied properly.")

-
-def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
-    """
-    SSH into the machine and execute the bash script from the list
-    of commands.
-    """
     import paramiko

     key = paramiko.RSAKey.from_private_key_file(kwargs["path_to_pem_file"])
@@ -296,13 +291,6 @@ def get_cluster_details(task_instance: Any) -> None:
     default_args=default_args,
     tags=["example", "async", "hive", "hive_partition"],
 ) as dag:
-    # [START howto_create_key_pair_file]
-    create_key_pair_file = PythonOperator(
-        task_id="create_key_pair_file",
-        python_callable=create_key_pair,
-    )
-    # [END howto_create_key_pair_file]
-
     # [START howto_operator_emr_create_job_flow]
     cluster_creator = EmrCreateJobFlowOperator(
         task_id="cluster_creator",
@@ -388,8 +376,7 @@ def get_cluster_details(task_instance: Any) -> None:
     )
     # [END howto_operator_emr_terminate_job_flow]
     (
-        create_key_pair_file
-        >> cluster_creator
+        cluster_creator
         >> describe_created_cluster
         >> get_and_add_ip_address_for_inbound_rules
         >> ssh_and_copy_pifile_to_hdfs
diff --git a/astronomer/providers/apache/livy/example_dags/example_livy.py b/astronomer/providers/apache/livy/example_dags/example_livy.py
index 175af1d23..3f892dc23 100644
--- a/astronomer/providers/apache/livy/example_dags/example_livy.py
+++ b/astronomer/providers/apache/livy/example_dags/example_livy.py
@@ -169,10 +169,11 @@ def add_inbound_rule_for_security_group(task_instance: Any) -> None:
         raise error


-def create_key_pair() -> None:
+def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
     """
     Load the private_key from airflow variable and creates a pem_file
-    at /tmp/.
+    at /tmp/. SSH into the machine and execute the bash script from the list
+    of commands.
     """
     # remove the file if it exists
     if os.path.exists(f"/tmp/{PEM_FILENAME}.pem"):
@@ -189,12 +190,6 @@ def create_key_pair() -> None:
     # if it doesn't exists raise an error.
     raise AirflowException("PEM file wasn't copied properly.")

-
-def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
-    """
-    SSH into the machine and execute the bash script from the list
-    of commands.
-    """
     import paramiko

     key = paramiko.RSAKey.from_private_key_file(kwargs["path_to_pem_file"])
@@ -262,13 +257,6 @@ def get_cluster_details(task_instance: Any) -> None:
     default_args=default_args,
     tags=["example", "async", "livy"],
 ) as dag:
-    # [START howto_create_key_pair_file]
-    create_key_pair_file = PythonOperator(
-        task_id="create_key_pair_file",
-        python_callable=create_key_pair,
-    )
-    # [END howto_create_key_pair_file]
-
     # [START howto_operator_emr_create_job_flow]
     cluster_creator = EmrCreateJobFlowOperator(
         task_id="cluster_creator",
@@ -334,8 +322,7 @@ def get_cluster_details(task_instance: Any) -> None:
     )
     # [END howto_operator_emr_terminate_job_flow]
     (
-        create_key_pair_file
-        >> cluster_creator
+        cluster_creator
         >> describe_created_cluster
         >> get_and_add_ip_address_for_inbound_rules
         >> ssh_and_copy_pifile_to_hdfs