Skip to content

Commit

Permalink
Merge pull request apache#29130 [YAML] Guard javascript UDFs with exp…
Browse files Browse the repository at this point in the history
…erimental feature enablement.
  • Loading branch information
robertwb authored Oct 25, 2023
2 parents d6b3467 + 972a0fb commit 40e5c38
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 4 deletions.
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,10 @@ def __init__(self, pcoll, exception_handling_args, upstream_errors=()):
self._exception_handling_args = exception_handling_args
self._upstream_errors = upstream_errors

@property
def pipeline(self):
return self._pvalue.pipeline

@property
def element_type(self):
return self._pcoll.element_type
Expand Down Expand Up @@ -2324,6 +2328,10 @@ def __init__(self, pvalue, exception_handling_args=None):
else:
self._pvalue = _PValueWithErrors(pvalue, exception_handling_args)

@property
def pipeline(self):
return self._pvalue.pipeline

@property
def element_type(self):
return self._pvalue.element_type
Expand Down
36 changes: 36 additions & 0 deletions sdks/python/apache_beam/yaml/options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from apache_beam.options import pipeline_options


class YamlOptions(pipeline_options.PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--yaml_experimental_features',
dest='yaml_experimental_features',
action='append',
default=[],
help=('Enable yaml features ahead of them being declared stable.'))

@classmethod
def check_enabled(cls, pipeline, feature, description=None):
if feature not in pipeline._options.view_as(cls).yaml_experimental_features:
raise ValueError(
f'{description or feature} unsupported because '
f'{feature} is not set in --yaml_experimental_features option.')
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.utils import python_callable
from apache_beam.yaml import options
from apache_beam.yaml import yaml_provider


Expand Down Expand Up @@ -257,6 +258,9 @@ def with_exception_handling(self, **kwargs):
@maybe_with_exception_handling_transform_fn
def _PyJsFilter(
pcoll, keep: Union[str, Dict[str, str]], language: Optional[str] = None):
if language == 'javascript':
options.YamlOptions.check_enabled(pcoll.pipeline, 'javascript')

try:
input_schema = dict(named_fields_from_element_type(pcoll.element_type))
except (TypeError, ValueError) as exn:
Expand Down Expand Up @@ -327,6 +331,9 @@ def normalize_fields(pcoll, fields, drop=(), append=False, language='generic'):
def _PyJsMapToFields(pcoll, language='generic', **mapping_args):
input_schema, fields = normalize_fields(
pcoll, language=language, **mapping_args)
if language == 'javascript':
options.YamlOptions.check_enabled(pcoll.pipeline, 'javascript')

original_fields = list(input_schema.keys())

return pcoll | beam.Select(
Expand Down
12 changes: 8 additions & 4 deletions sdks/python/apache_beam/yaml/yaml_udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def tearDown(self):

def test_map_to_fields_filter_inline_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
])) as p:
elements = p | beam.Create(self.data)
result = elements | YamlTransform(
'''
Expand Down Expand Up @@ -96,7 +97,8 @@ def test_map_to_fields_filter_inline_py(self):

def test_filter_inline_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
])) as p:
elements = p | beam.Create(self.data)
result = elements | YamlTransform(
'''
Expand Down Expand Up @@ -134,7 +136,8 @@ def test_filter_inline_py(self):

def test_filter_expression_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
])) as p:
elements = p | beam.Create(self.data)
result = elements | YamlTransform(
'''
Expand Down Expand Up @@ -183,7 +186,8 @@ def test_filter_inline_js_file(self):
self.fs.create(path).write(data.encode('utf8'))

with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
])) as p:
elements = p | beam.Create(self.data)
result = elements | YamlTransform(
f'''
Expand Down

0 comments on commit 40e5c38

Please sign in to comment.