From fe9179c56ddc2fdf8e062a5344eb17932003fa13 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Mon, 16 Oct 2023 12:19:44 -0400 Subject: [PATCH 1/2] [yaml] support gcs location for main.py yaml input file Signed-off-by: Jeffrey Kinard --- sdks/python/apache_beam/yaml/main.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/main.py b/sdks/python/apache_beam/yaml/main.py index e2ec8df9cfc3..27b37ea81b89 100644 --- a/sdks/python/apache_beam/yaml/main.py +++ b/sdks/python/apache_beam/yaml/main.py @@ -20,6 +20,7 @@ import yaml import apache_beam as beam +from apache_beam.io.filesystems import FileSystems from apache_beam.typehints.schemas import LogicalType from apache_beam.typehints.schemas import MillisInstant from apache_beam.yaml import yaml_transform @@ -43,8 +44,8 @@ def _pipeline_spec_from_args(known_args): raise ValueError( "Exactly one of pipeline_spec or pipeline_spec_file must be set.") elif known_args.pipeline_spec_file: - with open(known_args.pipeline_spec_file) as fin: - pipeline_yaml = fin.read() + pipeline_yaml = FileSystems.open( + known_args.pipeline_spec_file).read().decode() elif known_args.pipeline_spec: pipeline_yaml = known_args.pipeline_spec else: From b69b834f91417edfd3b2223b2597753300d03196 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Fri, 20 Oct 2023 12:54:15 -0400 Subject: [PATCH 2/2] add with to file opening Signed-off-by: Jeffrey Kinard --- sdks/python/apache_beam/yaml/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/main.py b/sdks/python/apache_beam/yaml/main.py index 27b37ea81b89..331b9e7b3616 100644 --- a/sdks/python/apache_beam/yaml/main.py +++ b/sdks/python/apache_beam/yaml/main.py @@ -44,8 +44,8 @@ def _pipeline_spec_from_args(known_args): raise ValueError( "Exactly one of pipeline_spec or pipeline_spec_file must be set.") elif known_args.pipeline_spec_file: - pipeline_yaml = FileSystems.open( - known_args.pipeline_spec_file).read().decode() + with FileSystems.open(known_args.pipeline_spec_file) as fin: + pipeline_yaml = fin.read().decode() elif known_args.pipeline_spec: pipeline_yaml = known_args.pipeline_spec else: