Skip to content

Commit

Permalink
Bugfix to correct GCSHook being called even when not required with Be…
Browse files Browse the repository at this point in the history
…amRunPythonPipelineOperator
  • Loading branch information
zstrathe committed Apr 3, 2024
1 parent bfcb765 commit 3036c0e
Showing 1 changed file with 3 additions and 1 deletion.
4 changes: 3 additions & 1 deletion airflow/providers/apache/beam/operators/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,13 @@ def execute(self, context: Context):

def execute_sync(self, context: Context):
with ExitStack() as exit_stack:
gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
if self.py_file.lower().startswith("gs://"):
gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
self.py_file = tmp_gcs_file.name
if self.snake_case_pipeline_options.get("requirements_file", "").startswith("gs://"):
if 'gcs_hook' not in locals():
gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
tmp_req_file = exit_stack.enter_context(
gcs_hook.provide_file(object_url=self.snake_case_pipeline_options["requirements_file"])
)
Expand Down

0 comments on commit 3036c0e

Please sign in to comment.