-
Notifications
You must be signed in to change notification settings - Fork 4.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[yaml] error_handling normalization follow-up
Signed-off-by: Jeffrey Kinard <[email protected]>
- Loading branch information
Showing
10 changed files
with
225 additions
and
111 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,8 +40,8 @@ | |
|
||
|
||
def check_output(expected: List[str]): | ||
def _check_inner(actual: PCollection[str]): | ||
formatted_actual = actual | beam.Map( | ||
def _check_inner(actual: List[PCollection[str]]): | ||
formatted_actual = actual | beam.Flatten() | beam.Map( | ||
lambda row: str(beam.Row(**row._asdict()))) | ||
assert_matches_stdout(formatted_actual, expected) | ||
|
||
|
@@ -59,6 +59,57 @@ def products_csv(): | |
]) | ||
|
||
|
||
def spanner_data(): | ||
return [{ | ||
'shipment_id': 'S1', | ||
'customer_id': 'C1', | ||
'shipment_date': '2023-05-01', | ||
'shipment_cost': 150.0, | ||
'customer_name': 'Alice', | ||
'customer_email': '[email protected]' | ||
}, | ||
{ | ||
'shipment_id': 'S2', | ||
'customer_id': 'C2', | ||
'shipment_date': '2023-06-12', | ||
'shipment_cost': 300.0, | ||
'customer_name': 'Bob', | ||
'customer_email': '[email protected]' | ||
}, | ||
{ | ||
'shipment_id': 'S3', | ||
'customer_id': 'C1', | ||
'shipment_date': '2023-05-10', | ||
'shipment_cost': 20.0, | ||
'customer_name': 'Alice', | ||
'customer_email': '[email protected]' | ||
}, | ||
{ | ||
'shipment_id': 'S4', | ||
'customer_id': 'C4', | ||
'shipment_date': '2024-07-01', | ||
'shipment_cost': 150.0, | ||
'customer_name': 'Derek', | ||
'customer_email': '[email protected]' | ||
}, | ||
{ | ||
'shipment_id': 'S5', | ||
'customer_id': 'C5', | ||
'shipment_date': '2023-05-09', | ||
'shipment_cost': 300.0, | ||
'customer_name': 'Erin', | ||
'customer_email': '[email protected]' | ||
}, | ||
{ | ||
'shipment_id': 'S6', | ||
'customer_id': 'C4', | ||
'shipment_date': '2024-07-02', | ||
'shipment_cost': 150.0, | ||
'customer_name': 'Derek', | ||
'customer_email': '[email protected]' | ||
}] | ||
|
||
|
||
def create_test_method( | ||
pipeline_spec_file: str, | ||
custom_preprocessors: List[Callable[..., Union[Dict, List]]]): | ||
|
@@ -84,9 +135,12 @@ def test_yaml_example(self): | |
pickle_library='cloudpickle', | ||
**yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get( | ||
'options', {})))) as p: | ||
actual = yaml_transform.expand_pipeline(p, pipeline_spec) | ||
if not actual: | ||
actual = p.transforms_stack[0].parts[-1].outputs[None] | ||
actual = [yaml_transform.expand_pipeline(p, pipeline_spec)] | ||
if not actual[0]: | ||
actual = list(p.transforms_stack[0].parts[-1].outputs.values()) | ||
for transform in p.transforms_stack[0].parts[:-1]: | ||
if transform.transform.label == 'log_for_testing': | ||
actual += list(transform.outputs.values()) | ||
check_output(expected)(actual) | ||
|
||
return test_yaml_example | ||
|
@@ -155,9 +209,13 @@ def _wordcount_test_preprocessor( | |
env.input_file('kinglear.txt', '\n'.join(lines))) | ||
|
||
|
||
@YamlExamplesTestSuite.register_test_preprocessor( | ||
['test_simple_filter_yaml', 'test_simple_filter_and_combine_yaml']) | ||
def _file_io_write_test_preprocessor( | ||
@YamlExamplesTestSuite.register_test_preprocessor([ | ||
'test_simple_filter_yaml', | ||
'test_simple_filter_and_combine_yaml', | ||
'test_spanner_read_yaml', | ||
'test_spanner_write_yaml' | ||
]) | ||
def _io_write_test_preprocessor( | ||
test_spec: dict, expected: List[str], env: TestEnvironment): | ||
|
||
if pipeline := test_spec.get('pipeline', None): | ||
|
@@ -166,8 +224,8 @@ def _file_io_write_test_preprocessor( | |
transform['type'] = 'LogForTesting' | ||
transform['config'] = { | ||
k: v | ||
for k, | ||
v in transform.get('config', {}).items() if k.startswith('__') | ||
for (k, v) in transform.get('config', {}).items() | ||
if (k.startswith('__') or k == 'error_handling') | ||
} | ||
|
||
return test_spec | ||
|
@@ -191,7 +249,30 @@ def _file_io_read_test_preprocessor( | |
return test_spec | ||
|
||
|
||
@YamlExamplesTestSuite.register_test_preprocessor(['test_spanner_read_yaml']) | ||
def _spanner_io_read_test_preprocessor( | ||
test_spec: dict, expected: List[str], env: TestEnvironment): | ||
|
||
if pipeline := test_spec.get('pipeline', None): | ||
for transform in pipeline.get('transforms', []): | ||
if transform.get('type', '').startswith('ReadFromSpanner'): | ||
config = transform['config'] | ||
instance, database = config['instance_id'], config['database_id'] | ||
if table := config.get('table', None) is None: | ||
table = config.get('query', '').split('FROM')[-1].strip() | ||
transform['type'] = 'Create' | ||
transform['config'] = { | ||
k: v | ||
for k, v in config.items() if k.startswith('__') | ||
} | ||
transform['config']['elements'] = INPUT_TABLES[( | ||
str(instance), str(database), str(table))] | ||
|
||
return test_spec | ||
|
||
|
||
INPUT_FILES = {'products.csv': products_csv()} | ||
INPUT_TABLES = {('shipment-test', 'shipment', 'shipments'): spanner_data()} | ||
|
||
YAML_DOCS_DIR = os.path.join(os.path.dirname(__file__)) | ||
ExamplesTest = YamlExamplesTestSuite( | ||
|
@@ -205,6 +286,10 @@ def _file_io_read_test_preprocessor( | |
'AggregationExamplesTest', | ||
os.path.join(YAML_DOCS_DIR, '../transforms/aggregation/*.yaml')).run() | ||
|
||
IOTest = YamlExamplesTestSuite( | ||
'IOExamplesTest', os.path.join(YAML_DOCS_DIR, | ||
'../transforms/io/*.yaml')).run() | ||
|
||
if __name__ == '__main__': | ||
logging.getLogger().setLevel(logging.INFO) | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,10 +18,10 @@ | |
pipeline: | ||
transforms: | ||
|
||
# Reading data from a Spanner database. The table used here has the following columns: | ||
# shipment_id (String), customer_id (String), shipment_date (String), shipment_cost (Float64), customer_name (String), customer_email (String) | ||
# ReadFromSpanner transform is called using project_id, instance_id, database_id and a query | ||
# A table with a list of columns can also be specified instead of a query | ||
# Reading data from a Spanner database. The table used here has the following columns: | ||
# shipment_id (String), customer_id (String), shipment_date (String), shipment_cost (Float64), customer_name (String), customer_email (String) | ||
# ReadFromSpanner transform is called using project_id, instance_id, database_id and a query | ||
# A table with a list of columns can also be specified instead of a query | ||
- type: ReadFromSpanner | ||
name: ReadShipments | ||
config: | ||
|
@@ -30,18 +30,18 @@ pipeline: | |
database_id: 'shipment' | ||
query: 'SELECT * FROM shipments' | ||
|
||
# Filtering the data based on a specific condition | ||
# Here, the condition is used to keep only the rows where the customer_id is 'C1' | ||
# Filtering the data based on a specific condition | ||
# Here, the condition is used to keep only the rows where the customer_id is 'C1' | ||
- type: Filter | ||
name: FilterShipments | ||
input: ReadShipments | ||
config: | ||
language: python | ||
keep: "customer_id == 'C1'" | ||
|
||
# Mapping the data fields and applying transformations | ||
# A new field 'shipment_cost_category' is added with a custom transformation | ||
# A callable is defined to categorize shipment cost | ||
# Mapping the data fields and applying transformations | ||
# A new field 'shipment_cost_category' is added with a custom transformation | ||
# A callable is defined to categorize shipment cost | ||
- type: MapToFields | ||
name: MapFieldsForSpanner | ||
input: FilterShipments | ||
|
@@ -65,16 +65,15 @@ pipeline: | |
else: | ||
return 'High Cost' | ||
# Writing the transformed data to a CSV file | ||
# Writing the transformed data to a CSV file | ||
- type: WriteToCsv | ||
name: WriteBig | ||
input: MapFieldsForSpanner | ||
config: | ||
path: shipments.csv | ||
|
||
|
||
# On executing the above pipeline, a new CSV file is created with the following records | ||
|
||
# On executing the above pipeline, a new CSV file is created with the following records | ||
# Expected: | ||
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='[email protected]', shipment_cost_category='Medium Cost') | ||
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='[email protected]', shipment_cost_category='Low Cost') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,8 @@ | |
pipeline: | ||
transforms: | ||
|
||
# Step 1: Creating rows to be written to Spanner | ||
# The element names correspond to the column names in the Spanner table | ||
# Step 1: Creating rows to be written to Spanner | ||
# The element names correspond to the column names in the Spanner table | ||
- type: Create | ||
name: CreateRows | ||
config: | ||
|
@@ -31,10 +31,10 @@ pipeline: | |
customer_name: "Erin" | ||
customer_email: "[email protected]" | ||
|
||
# Step 2: Writing the created rows to a Spanner database | ||
# We require the project ID, instance ID, database ID and table ID to connect to Spanner | ||
# Error handling can be specified optionally to ensure any failed operations aren't lost | ||
# The failed data is passed on in the pipeline and can be handled | ||
# Step 2: Writing the created rows to a Spanner database | ||
# We require the project ID, instance ID, database ID and table ID to connect to Spanner | ||
# Error handling can be specified optionally to ensure any failed operations aren't lost | ||
# The failed data is passed on in the pipeline and can be handled | ||
- type: WriteToSpanner | ||
name: WriteSpanner | ||
input: CreateRows | ||
|
@@ -46,8 +46,11 @@ pipeline: | |
error_handling: | ||
output: my_error_output | ||
|
||
# Step 3: Writing the failed records to a JSON file | ||
# Step 3: Writing the failed records to a JSON file | ||
- type: WriteToJson | ||
input: WriteSpanner.my_error_output | ||
config: | ||
path: errors.json | ||
|
||
# Expected: | ||
# Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='[email protected]') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
import functools | ||
import inspect | ||
from typing import NamedTuple | ||
|
||
import apache_beam as beam | ||
from apache_beam.typehints.row_type import RowTypeConstraint | ||
|
||
|
||
class ErrorHandlingConfig(NamedTuple): | ||
"""This option specifies whether and where to output error rows. | ||
Args: | ||
output (str): Name to use for the output error collection | ||
""" | ||
output: str | ||
# TODO: Other parameters are valid here too, but not common to Java. | ||
|
||
|
||
def exception_handling_args(error_handling_spec): | ||
if error_handling_spec: | ||
return { | ||
'dead_letter_tag' if k == 'output' else k: v | ||
for (k, v) in error_handling_spec.items() | ||
} | ||
else: | ||
return None | ||
|
||
|
||
def map_errors_to_standard_format(input_type): | ||
# TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple. | ||
|
||
return beam.Map( | ||
lambda x: beam.Row( | ||
element=x[0], msg=str(x[1][1]), stack=''.join(x[1][2])) | ||
).with_output_types( | ||
RowTypeConstraint.from_fields([("element", input_type), ("msg", str), | ||
("stack", str)])) | ||
|
||
|
||
def maybe_with_exception_handling(inner_expand): | ||
def expand(self, pcoll): | ||
wrapped_pcoll = beam.core._MaybePValueWithErrors( | ||
pcoll, self._exception_handling_args) | ||
return inner_expand(self, wrapped_pcoll).as_result( | ||
map_errors_to_standard_format(pcoll.element_type)) | ||
|
||
return expand | ||
|
||
|
||
def maybe_with_exception_handling_transform_fn(transform_fn): | ||
@functools.wraps(transform_fn) | ||
def expand(pcoll, error_handling=None, **kwargs): | ||
wrapped_pcoll = beam.core._MaybePValueWithErrors( | ||
pcoll, exception_handling_args(error_handling)) | ||
return transform_fn(wrapped_pcoll, **kwargs).as_result( | ||
map_errors_to_standard_format(pcoll.element_type)) | ||
|
||
original_signature = inspect.signature(transform_fn) | ||
new_parameters = list(original_signature.parameters.values()) | ||
error_handling_param = inspect.Parameter( | ||
'error_handling', | ||
inspect.Parameter.KEYWORD_ONLY, | ||
default=None, | ||
annotation=ErrorHandlingConfig) | ||
if new_parameters[-1].kind == inspect.Parameter.VAR_KEYWORD: | ||
new_parameters.insert(-1, error_handling_param) | ||
else: | ||
new_parameters.append(error_handling_param) | ||
expand.__signature__ = original_signature.replace(parameters=new_parameters) | ||
|
||
return expand |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.