diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py index 00da81f96e882..e0add3503c281 100644 --- a/airflow/providers/amazon/aws/operators/glue.py +++ b/airflow/providers/amazon/aws/operators/glue.py @@ -64,6 +64,7 @@ class GlueJobOperator(BaseOperator): (default: False) :param verbose: If True, Glue Job Run logs show in the Airflow Task Logs. (default: False) :param update_config: If True, Operator will update job configuration. (default: False) + :param replace_script_file: If True, the script file will be replaced in S3. (default: False) :param stop_job_run_on_kill: If True, Operator will stop the job run when task is killed. """ @@ -105,6 +106,7 @@ def __init__( wait_for_completion: bool = True, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), verbose: bool = False, + replace_script_file: bool = False, update_config: bool = False, job_poll_interval: int | float = 6, stop_job_run_on_kill: bool = False, @@ -130,6 +132,7 @@ def __init__( self.wait_for_completion = wait_for_completion self.verbose = verbose self.update_config = update_config + self.replace_script_file = replace_script_file self.deferrable = deferrable self.job_poll_interval = job_poll_interval self.stop_job_run_on_kill = stop_job_run_on_kill @@ -143,7 +146,10 @@ def glue_job_hook(self) -> GlueJobHook: s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) script_name = os.path.basename(self.script_location) s3_hook.load_file( - self.script_location, self.s3_artifacts_prefix + script_name, bucket_name=self.s3_bucket + self.script_location, + self.s3_artifacts_prefix + script_name, + bucket_name=self.s3_bucket, + replace=self.replace_script_file, ) s3_script_location = f"s3://{self.s3_bucket}/{self.s3_artifacts_prefix}{script_name}" else: diff --git a/tests/providers/amazon/aws/operators/test_glue.py b/tests/providers/amazon/aws/operators/test_glue.py index 272c42e242780..e2fc7baf50d55 100644 --- a/tests/providers/amazon/aws/operators/test_glue.py +++ b/tests/providers/amazon/aws/operators/test_glue.py @@ -272,3 +272,26 @@ def test_killed_with_stop_job_run_on_kill( JobName=JOB_NAME, JobRunIds=[JOB_RUN_ID], ) + + @mock.patch.object(GlueJobHook, "get_job_state") + @mock.patch.object(GlueJobHook, "initialize_job") + @mock.patch.object(GlueJobHook, "get_conn") + @mock.patch.object(GlueJobHook, "conn") + @mock.patch.object(S3Hook, "load_file") + def test_replace_script_file( + self, mock_load_file, mock_conn, mock_get_connection, mock_initialize_job, mock_get_job_state + ): + glue = GlueJobOperator( + task_id=TASK_ID, + job_name=JOB_NAME, + script_location="folder/file", + s3_bucket="bucket_name", + iam_role_name="role_arn", + replace_script_file=True, + ) + mock_initialize_job.return_value = {"JobRunState": "RUNNING", "JobRunId": JOB_RUN_ID} + mock_get_job_state.return_value = "SUCCEEDED" + glue.execute(mock.MagicMock()) + mock_load_file.assert_called_once_with( + "folder/file", "artifacts/glue-scripts/file", bucket_name="bucket_name", replace=True + )