Skip to content

Commit

Permalink
Merge PEM file download and SSH
Browse files Browse the repository at this point in the history
  • Loading branch information
rajaths010494 committed Sep 19, 2022
1 parent bf505c9 commit 8c845bc
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 36 deletions.
25 changes: 6 additions & 19 deletions astronomer/providers/apache/hive/example_dags/example_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,11 @@ def add_inbound_rule_for_security_group(task_instance: Any) -> None:
raise error


def create_key_pair() -> None:
def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
"""
Load the private_key from airflow variable and creates a pem_file
at /tmp/.
at /tmp/. SSH into the machine and execute the bash script from the list
of commands.
"""
# remove the file if it exists
if os.path.exists(f"/tmp/{PEM_FILENAME}.pem"):
Expand All @@ -221,17 +222,11 @@ def create_key_pair() -> None:

# write private key to file with 400 permissions
os.chmod(f"/tmp/{PEM_FILENAME}.pem", 0o400)
# check if the PEM file exists or not.
# Check if the PEM file exists or not.
if not os.path.exists(f"/tmp/{PEM_FILENAME}.pem"):
# if it doesn't exists raise an error.
# if it doesn't exists raise an error
raise AirflowException("PEM file wasn't copied properly.")


def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
"""
SSH into the machine and execute the bash script from the list
of commands.
"""
import paramiko

key = paramiko.RSAKey.from_private_key_file(kwargs["path_to_pem_file"])
Expand Down Expand Up @@ -296,13 +291,6 @@ def get_cluster_details(task_instance: Any) -> None:
default_args=default_args,
tags=["example", "async", "hive", "hive_partition"],
) as dag:
# [START howto_create_key_pair_file]
create_key_pair_file = PythonOperator(
task_id="create_key_pair_file",
python_callable=create_key_pair,
)
# [END howto_create_key_pair_file]

# [START howto_operator_emr_create_job_flow]
cluster_creator = EmrCreateJobFlowOperator(
task_id="cluster_creator",
Expand Down Expand Up @@ -388,8 +376,7 @@ def get_cluster_details(task_instance: Any) -> None:
)
# [END howto_operator_emr_terminate_job_flow]
(
create_key_pair_file
>> cluster_creator
cluster_creator
>> describe_created_cluster
>> get_and_add_ip_address_for_inbound_rules
>> ssh_and_copy_pifile_to_hdfs
Expand Down
21 changes: 4 additions & 17 deletions astronomer/providers/apache/livy/example_dags/example_livy.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,11 @@ def add_inbound_rule_for_security_group(task_instance: Any) -> None:
raise error


def create_key_pair() -> None:
def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
"""
Load the private_key from airflow variable and creates a pem_file
at /tmp/.
at /tmp/. SSH into the machine and execute the bash script from the list
of commands.
"""
# remove the file if it exists
if os.path.exists(f"/tmp/{PEM_FILENAME}.pem"):
Expand All @@ -189,12 +190,6 @@ def create_key_pair() -> None:
# if it doesn't exists raise an error.
raise AirflowException("PEM file wasn't copied properly.")


def ssh_and_run_command(task_instance: Any, **kwargs: Any) -> None:
"""
SSH into the machine and execute the bash script from the list
of commands.
"""
import paramiko

key = paramiko.RSAKey.from_private_key_file(kwargs["path_to_pem_file"])
Expand Down Expand Up @@ -262,13 +257,6 @@ def get_cluster_details(task_instance: Any) -> None:
default_args=default_args,
tags=["example", "async", "livy"],
) as dag:
# [START howto_create_key_pair_file]
create_key_pair_file = PythonOperator(
task_id="create_key_pair_file",
python_callable=create_key_pair,
)
# [END howto_create_key_pair_file]

# [START howto_operator_emr_create_job_flow]
cluster_creator = EmrCreateJobFlowOperator(
task_id="cluster_creator",
Expand Down Expand Up @@ -334,8 +322,7 @@ def get_cluster_details(task_instance: Any) -> None:
# [END howto_operator_emr_terminate_job_flow]

(
create_key_pair_file
>> cluster_creator
cluster_creator
>> describe_created_cluster
>> get_and_add_ip_address_for_inbound_rules
>> ssh_and_copy_pifile_to_hdfs
Expand Down

0 comments on commit 8c845bc

Please sign in to comment.