From c281852a58ffb666fdc14badac93deed60682f48 Mon Sep 17 00:00:00 2001
From: Robert Bradshaw
Date: Tue, 30 Jan 2024 16:17:01 -0800
Subject: [PATCH 1/2] [YAML] Add documentation to Beam site.

---
 .../en/documentation/sdks/yaml-combine.md     | 166 +++++
 .../en/documentation/sdks/yaml-errors.md      | 200 ++++++
 .../documentation/sdks/yaml-inline-python.md  | 202 ++++++
 .../content/en/documentation/sdks/yaml-udf.md | 248 ++++++++
 .../content/en/documentation/sdks/yaml.md     | 587 ++++++++++++++++++
 website/www/site/data/pipelines.yaml          |   3 +
 .../partials/section-menu/en/sdks.html        |  11 +
 .../site/static/images/logos/sdks/yaml.png    | Bin 0 -> 6197 bytes
 8 files changed, 1417 insertions(+)
 create mode 100644 website/www/site/content/en/documentation/sdks/yaml-combine.md
 create mode 100644 website/www/site/content/en/documentation/sdks/yaml-errors.md
 create mode 100644 website/www/site/content/en/documentation/sdks/yaml-inline-python.md
 create mode 100644 website/www/site/content/en/documentation/sdks/yaml-udf.md
 create mode 100644 website/www/site/content/en/documentation/sdks/yaml.md
 create mode 100644 website/www/site/static/images/logos/sdks/yaml.png

diff --git a/website/www/site/content/en/documentation/sdks/yaml-combine.md b/website/www/site/content/en/documentation/sdks/yaml-combine.md
new file mode 100644
index 000000000000..e2fef304fb0a
--- /dev/null
+++ b/website/www/site/content/en/documentation/sdks/yaml-combine.md
@@ -0,0 +1,166 @@
+
+
+# Beam YAML Aggregations
+
+Beam YAML has an EXPERIMENTAL ability to do aggregations to group and combine
+values across records. This is accomplished via the `Combine` transform type.
+Currently, `Combine` needs to be listed in the `yaml_experimental_features`
+pipeline option to use this transform.
+
+For example, one can write
+
+```
+- type: Combine
+  config:
+    group_by: col1
+    combine:
+      total:
+        value: col2
+        fn:
+          type: sum
+```
+
+If the function has no configuration requirements, it can be provided directly
+as a string
+
+```
+- type: Combine
+  config:
+    group_by: col1
+    combine:
+      total:
+        value: col2
+        fn: sum
+```
+
+This can be simplified further if the output field name is the same as the input
+field name
+
+```
+- type: Combine
+  config:
+    group_by: col1
+    combine:
+      col2: sum
+```
+
+One can aggregate over many fields at once
+
+```
+- type: Combine
+  config:
+    group_by: col1
+    combine:
+      col2: sum
+      col3: max
+```
+
+and/or group by more than one field
+
+```
+- type: Combine
+  config:
+    group_by: [col1, col2]
+    combine:
+      col3: sum
+```
+
+or none at all (which will result in a global combine with a single output)
+
+```
+- type: Combine
+  config:
+    group_by: []
+    combine:
+      col2: sum
+      col3: max
+```
+
+## Windowed aggregation
+
+As with all transforms, `Combine` can take a windowing parameter
+
+```
+- type: Combine
+  windowing:
+    type: fixed
+    size: 60
+  config:
+    group_by: col1
+    combine:
+      col2: sum
+      col3: max
+```
+
+If no windowing specification is provided, it inherits the windowing
+parameters from upstream, e.g.
+
+```
+- type: WindowInto
+  windowing:
+    type: fixed
+    size: 60
+- type: Combine
+  config:
+    group_by: col1
+    combine:
+      col2: sum
+      col3: max
+```
+
+is equivalent to the previous example.
+
+
+## Custom aggregation functions
+
+One can use aggregation functions defined in Python by setting the language
+parameter.
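+
+A combiner implemented in Python is referenced by its fully qualified name, so
+a custom `CombineFn` can be used as well, provided it is importable where the
+pipeline runs. As a rough sketch of what such a function can look like on the
+Python side (the `my_package.combiners` module and `WeightedSum` class below
+are illustrative, not part of Beam):
+
+```
+# my_package/combiners.py  (illustrative module path)
+import apache_beam as beam
+
+class WeightedSum(beam.CombineFn):
+  """Sums the incoming values, scaled by a fixed weight."""
+  def __init__(self, weight=1.0):
+    self._weight = weight
+
+  def create_accumulator(self):
+    return 0.0
+
+  def add_input(self, accumulator, value):
+    return accumulator + self._weight * value
+
+  def merge_accumulators(self, accumulators):
+    return sum(accumulators)
+
+  def extract_output(self, accumulator):
+    return accumulator
+```
+
+Such a combiner would be referenced just like the built-in `TopCombineFn` in
+the example below: by fully qualified name under `type`, with any constructor
+arguments passed under `config`.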
+ +``` +- type: Combine + config: + language: python + group_by: col1 + combine: + biggest: + value: "col2 + col2" + fn: + type: 'apache_beam.transforms.combiners.TopCombineFn' + config: + n: 10 +``` + +## SQL-style aggregations + +By setting the language to SQL, one can provide full SQL snippets as the +combine fn. + +``` +- type: Combine + config: + language: sql + group_by: col1 + combine: + num_values: "count(*)" + total: "sum(col2)" +``` + +One can of course also use the `Sql` transform type and provide a query +directly. diff --git a/website/www/site/content/en/documentation/sdks/yaml-errors.md b/website/www/site/content/en/documentation/sdks/yaml-errors.md new file mode 100644 index 000000000000..aec602393674 --- /dev/null +++ b/website/www/site/content/en/documentation/sdks/yaml-errors.md @@ -0,0 +1,200 @@ + + +# Beam YAML Error Handling + +The larger one's pipeline gets, the more common it is to encounter "exceptional" +data that is malformatted, doesn't handle the proper preconditions, or otherwise +breaks during processing. Generally any such record will cause the pipeline to +permanently fail, but often it is desirable to allow the pipeline to continue, +re-directing bad records to another path for special handling or simply +recording them for later off-line analysis. This is often called the +"dead letter queue" pattern. + +Beam YAML has special support for this pattern if the transform supports a +`error_handling` config parameter with an `output` field. For example, +the following code will write all "good" processed records to one file and +any "bad" records to a separate file. + +``` +pipeline: + transforms: + - type: ReadFromCsv + config: + path: /path/to/input*.csv + + - type: MapToFields + input: ReadFromCsv + config: + language: python + fields: + col1: col1 + # This could raise a divide-by-zero error. + ratio: col2 / col3 + error_handling: + output: my_error_output + + - type: WriteToJson + input: MapToFields + config: + path: /path/to/output.json + + - type: WriteToJson + name: WriteErrorsToJson + input: MapToFields.my_error_output + config: + path: /path/to/errors.json +``` + +Note that with `error_handling` declared, `MapToFields.my_error_output` +**must** be consumed; to ignore it will be an error. Any use is fine, e.g. +logging the bad records to stdout would be sufficient (though not recommended +for a robust pipeline). + +Note also that the exact format of the error outputs is still being finalized. +They can be safely printed and written to outputs, but their precise schema +may change in a future version of Beam and should not yet be depended on. + +Some transforms allow for extra arguments in their error_handling config, e.g. +for Python functions one can give a `threshold` which limits the relative number +of records that can be bad before considering the entire pipeline a failure + +``` +pipeline: + transforms: + - type: ReadFromCsv + config: + path: /path/to/input*.csv + + - type: MapToFields + input: ReadFromCsv + config: + language: python + fields: + col1: col1 + # This could raise a divide-by-zero error. + ratio: col2 / col3 + error_handling: + output: my_error_output + # If more than 10% of records throw an error, stop the pipeline. + threshold: 0.1 + + - type: WriteToJson + input: MapToFields + config: + path: /path/to/output.json + + - type: WriteToJson + name: WriteErrorsToJson + input: MapToFields.my_error_output + config: + path: /path/to/errors.json +``` + +One can do arbitrary further processing on these failed records if desired, +e.g. 
+ +``` +pipeline: + transforms: + - type: ReadFromCsv + config: + path: /path/to/input*.csv + + - type: MapToFields + name: ComputeRatio + input: ReadFromCsv + config: + language: python + fields: + col1: col1 + # This could raise a divide-by-zero error. + ratio: col2 / col3 + error_handling: + output: my_error_output + + - type: MapToFields + name: ComputeRatioForBadRecords + input: ComputeRatio.my_error_output + config: + language: python + fields: + col1: col1 + ratio: col2 / (col3 + 1) + error_handling: + output: still_bad + + - type: WriteToJson + # Takes as input everything from the "success" path of both transforms. + input: [ComputeRatio, ComputeRatioForBadRecords] + config: + path: /path/to/output.json + + - type: WriteToJson + name: WriteErrorsToJson + # These failed the first and the second transform. + input: ComputeRatioForBadRecords.still_bad + config: + path: /path/to/errors.json +``` + +When using the `chain` syntax, the required error consumption can happen +in an `extra_transforms` block. + +``` +pipeline: + type: chain + transforms: + - type: ReadFromCsv + config: + path: /path/to/input*.csv + + - type: MapToFields + name: SomeStep + config: + language: python + fields: + col1: col1 + # This could raise a divide-by-zero error. + ratio: col2 / col3 + error_handling: + output: errors + + - type: MapToFields + name: AnotherStep + config: + language: python + fields: + col1: col1 + # This could raise a divide-by-zero error. + inverse_ratio: 1 / ratio + error_handling: + output: errors + + - type: WriteToJson + config: + path: /path/to/output.json + + extra_transforms: + - type: WriteToJson + name: WriteErrors + input: [SomeStep.errors, AnotherStep.errors] + config: + path: /path/to/errors.json +``` diff --git a/website/www/site/content/en/documentation/sdks/yaml-inline-python.md b/website/www/site/content/en/documentation/sdks/yaml-inline-python.md new file mode 100644 index 000000000000..72b8b76c58a2 --- /dev/null +++ b/website/www/site/content/en/documentation/sdks/yaml-inline-python.md @@ -0,0 +1,202 @@ + + +# Using PyTransform form YAML + +Beam YAML provides the ability to easily invoke Python transforms via the +`PyTransform` type, simply referencing them by fully qualified name. +For example, + +``` +- type: PyTransform + config: + constructor: apache_beam.pkg.module.SomeTransform + args: [1, 'foo'] + kwargs: + baz: 3 +``` + +will invoke the transform `apache_beam.pkg.mod.SomeTransform(1, 'foo', baz=3)`. +This fully qualified name can be any PTransform class or other callable that +returns a PTransform. Note, however, that PTransforms that do not accept or +return schema'd data may not be as useable to use from YAML. +Restoring the schema-ness after a non-schema returning transform can be done +by using the `callable` option on `MapToFields` which takes the entire element +as an input, e.g. + +``` +- type: PyTransform + config: + constructor: apache_beam.pkg.module.SomeTransform + args: [1, 'foo'] + kwargs: + baz: 3 +- type: MapToFields + config: + language: python + fields: + col1: + callable: 'lambda element: element.col1' + output_type: string + col2: + callable: 'lambda element: element.col2' + output_type: integer +``` + +This can be used to call arbitrary transforms in the Beam SDK, e.g. 
+
+```
+pipeline:
+  transforms:
+    - type: PyTransform
+      name: ReadFromTsv
+      input: {}
+      config:
+        constructor: apache_beam.io.ReadFromCsv
+        kwargs:
+           path: '/path/to/*.tsv'
+           sep: '\t'
+           skip_blank_lines: True
+           true_values: ['yes']
+           false_values: ['no']
+           comment: '#'
+           on_bad_lines: 'skip'
+           binary: False
+           splittable: False
+```
+
+
+## Defining a transform inline using `__constructor__`
+
+If the desired transform does not exist, one can define it inline as well.
+This is done with the special `__constructor__` keyword,
+similar to how cross-language transforms are done.
+
+With the `__constructor__` keyword, one defines a Python callable that, on
+invocation, *returns* the desired transform. The first argument (or `source`
+keyword argument, if there are no positional arguments)
+is interpreted as the Python code. For example
+
+```
+- type: PyTransform
+  config:
+    constructor: __constructor__
+    kwargs:
+      source: |
+        import apache_beam as beam
+
+        def create_my_transform(inc):
+          return beam.Map(lambda x: beam.Row(a=x.col2 + inc))
+
+      inc: 10
+```
+
+will apply `beam.Map(lambda x: beam.Row(a=x.col2 + 10))` to the incoming
+PCollection.
+
+As a class object can be invoked as its own constructor, this allows one to
+define a `beam.PTransform` inline, e.g.
+
+```
+- type: PyTransform
+  config:
+    constructor: __constructor__
+    kwargs:
+      source: |
+        class MyPTransform(beam.PTransform):
+          def __init__(self, inc):
+            self._inc = inc
+          def expand(self, pcoll):
+            return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + self._inc))
+
+      inc: 10
+```
+
+which works exactly as one would expect.
+
+
+## Defining a transform inline using `__callable__`
+
+The `__callable__` keyword works similarly, but instead of defining a
+callable that returns an applicable `PTransform`, one simply defines the
+expansion to be performed as a callable. This is analogous to the Beam Python
+SDK's `ptransform.ptransform_fn` decorator.
+
+In this case one can simply write
+
+```
+- type: PyTransform
+  config:
+    constructor: __callable__
+    kwargs:
+      source: |
+        def my_ptransform(pcoll, inc):
+          return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + inc))
+
+      inc: 10
+```
+
+
+# External transforms
+
+One can also invoke PTransforms defined elsewhere via a `python` provider,
+for example
+
+```
+pipeline:
+  transforms:
+    - ...
+    - type: MyTransform
+      config:
+        kwarg: whatever
+
+providers:
+  - ...
+  - type: python
+    input: ...
+    config:
+      packages:
+        - 'some_pypi_package>=version'
+    transforms:
+      MyTransform: 'pkg.module.MyTransform'
+```
+
+These can be defined inline as well, with or without dependencies, e.g.
+
+```
+pipeline:
+  transforms:
+    - ...
+    - type: ToCase
+      input: ...
+      config:
+        upper: True
+
+providers:
+  - type: python
+    config: {}
+    transforms:
+      'ToCase': |
+        @beam.ptransform_fn
+        def ToCase(pcoll, upper):
+          if upper:
+            return pcoll | beam.Map(lambda x: str(x).upper())
+          else:
+            return pcoll | beam.Map(lambda x: str(x).lower())
+```
diff --git a/website/www/site/content/en/documentation/sdks/yaml-udf.md b/website/www/site/content/en/documentation/sdks/yaml-udf.md
new file mode 100644
index 000000000000..74b95d3cab2a
--- /dev/null
+++ b/website/www/site/content/en/documentation/sdks/yaml-udf.md
@@ -0,0 +1,248 @@
+
+
+# Beam YAML mappings
+
+Beam YAML has the ability to do simple transformations which can be used to
+get data into the correct shape. The simplest of these is `MapToFields`,
+which creates records with new fields defined in terms of the input fields.
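+
+To ground the examples that follow, here is a minimal, illustrative pipeline
+showing where such a mapping typically sits (the paths and column names are
+placeholders); the snippets below can be dropped into the `transforms` block
+of a pipeline like this one:
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromCsv
+      config:
+        path: /path/to/input*.csv
+    - type: MapToFields
+      config:
+        fields:
+          new_col1: col1
+          new_col2: col2
+    - type: WriteToJson
+      config:
+        path: /path/to/output.json
+```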
+
+## Field renames
+
+To rename fields one can write
+
+```
+- type: MapToFields
+  config:
+    fields:
+      new_col1: col1
+      new_col2: col2
+```
+
+This will result in an output where each record has two fields,
+`new_col1` and `new_col2`, whose values are those of `col1` and `col2`
+respectively (which are the names of two fields from the input schema).
+
+One can specify the `append` parameter, which indicates that the original fields
+should be retained, similar to the use of `*` in a SQL select statement. For example
+
+```
+- type: MapToFields
+  config:
+    append: true
+    fields:
+      new_col1: col1
+      new_col2: col2
+```
+
+will output records that have `new_col1` and `new_col2` as *additional*
+fields. When the `append` parameter is specified, one can drop fields as well, e.g.
+
+```
+- type: MapToFields
+  config:
+    append: true
+    drop:
+      - col3
+    fields:
+      new_col1: col1
+      new_col2: col2
+```
+
+which includes all original fields *except* col3 in addition to outputting the
+two new ones.
+
+
+## Mapping functions
+
+Of course one may want to do transformations beyond just dropping and renaming
+fields. Beam YAML has the ability to inline simple UDFs.
+This requires a language specification. For example, we can provide a
+Python expression referencing the input fields
+
+```
+- type: MapToFields
+  config:
+    language: python
+    fields:
+      new_col: "col1.upper()"
+      another_col: "col2 + col3"
+```
+
+In addition, one can provide a full Python callable that takes the row as an
+argument to do more complex mappings
+(see [PythonCallableSource](https://beam.apache.org/releases/pydoc/current/apache_beam.utils.python_callable.html#apache_beam.utils.python_callable.PythonCallableWithSource)
+for acceptable formats). Thus one can write
+
+```
+- type: MapToFields
+  config:
+    language: python
+    fields:
+      new_col:
+        callable: |
+          import re
+          def my_mapping(row):
+            if re.match("[0-9]+", row.col1) and row.col2 > 0:
+              return "good"
+            else:
+              return "bad"
+```
+
+Once one reaches a certain level of complexity, it may be preferable to package
+this up as a dependency and simply refer to it by fully qualified name, e.g.
+
+```
+- type: MapToFields
+  config:
+    language: python
+    fields:
+      new_col:
+        callable: pkg.module.fn
+```
+
+In addition to Python, Java, SQL, and JavaScript (experimental) expressions are
+currently supported as well
+
+```
+- type: MapToFields
+  config:
+    language: sql
+    fields:
+      new_col: "UPPER(col1)"
+      another_col: "col2 + col3"
+```
+
+## FlatMap
+
+Sometimes it may be desirable to emit more (or fewer) than one record for each
+input record. This can be accomplished by mapping to an iterable type and
+following the mapping with an Explode operation, e.g.
+
+```
+- type: MapToFields
+  config:
+    language: python
+    fields:
+      new_col: "[col1.upper(), col1.lower(), col1.title()]"
+      another_col: "col2 + col3"
+- type: Explode
+  config:
+    fields: new_col
+```
+
+will result in three output records for every input record.
+
+If more than one field is to be exploded, one must specify whether the cross
+product over all fields should be taken.
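+
+Conceptually (as a rough Python analogy rather than a description of the actual
+implementation), `cross_product: true` behaves like `itertools.product` over
+the exploded fields, while `cross_product: false` pairs their values up
+positionally like `zip`:
+
+```
+import itertools
+
+new_col = ['A', 'a', 'A']   # three values produced for one input record
+another_col = [1, 2, 3]     # three values produced for the same record
+
+# cross_product: true  -> 3 x 3 = 9 output records
+print(list(itertools.product(new_col, another_col)))
+
+# cross_product: false -> 3 output records, pairing values by position
+print(list(zip(new_col, another_col)))
+```
+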
For example + +``` +- type: MapToFields + config: + language: python + fields: + new_col: "[col1.upper(), col1.lower(), col1.title()]" + another_col: "[col2 - 1, col2, col2 + 1]" +- type: Explode + config: + fields: [new_col, another_col] + cross_product: true +``` + +will emit nine records whereas + +``` +- type: MapToFields + config: + language: python + fields: + new_col: "[col1.upper(), col1.lower(), col1.title()]" + another_col: "[col2 - 1, col2, col2 + 1]" +- type: Explode + config: + fields: [new_col, another_col] + cross_product: false +``` + +will only emit three. + +The `Explode` operation can be used on its own if the field in question is +already an iterable type. + +``` +- type: Explode + config: + fields: [col1] +``` + +## Filtering + +Sometimes it can be desirable to only keep records that satisfy a certain +criteria. This can be accomplished with a `Filter` transform, e.g. + +``` +- type: Filter + config: + language: sql + keep: "col2 > 0" +``` + +## Types + +Beam will try to infer the types involved in the mappings, but sometimes this +is not possible. In these cases one can explicitly denote the expected output +type, e.g. + +``` +- type: MapToFields + config: + language: python + fields: + new_col: + expression: "col1.upper()" + output_type: string +``` + +The expected type is given in json schema notation, with the addition that +a top-level basic types may be given as a literal string rather than requiring +a `{type: 'basic_type_name'}` nesting. + +``` +- type: MapToFields + config: + language: python + fields: + new_col: + expression: "col1.upper()" + output_type: string + another_col: + expression: "beam.Row(a=col1, b=[col2])" + output_type: + type: 'object' + properties: + a: + type: 'string' + b: + type: 'array' + items: + type: 'number' +``` + +This can be especially useful to resolve errors involving the inability to +handle the `beam:logical:pythonsdk_any:v1` type. diff --git a/website/www/site/content/en/documentation/sdks/yaml.md b/website/www/site/content/en/documentation/sdks/yaml.md new file mode 100644 index 000000000000..c278c7755ba1 --- /dev/null +++ b/website/www/site/content/en/documentation/sdks/yaml.md @@ -0,0 +1,587 @@ + + +# Beam YAML API + +While Beam provides powerful APIs for authoring sophisticated data +processing pipelines, it often still has too high a barrier for +getting started and authoring simple pipelines. Even setting up the +environment, installing the dependencies, and setting up the project +can be an overwhelming amount of boilerplate for some (though +https://beam.apache.org/blog/beam-starter-projects/ has gone a long +way in making this easier). + +Here we provide a simple declarative syntax for describing pipelines +that does not require coding experience or learning how to use an +SDK—any text editor will do. +Some installation may be required to actually *execute* a pipeline, but +we envision various services (such as Dataflow) to accept yaml pipelines +directly obviating the need for even that in the future. +We also anticipate the ability to generate code directly from these +higher-level yaml descriptions, should one want to graduate to a full +Beam SDK (and possibly the other direction as well as far as possible). + +Though we intend this syntax to be easily authored (and read) directly by +humans, this may also prove a useful intermediate representation for +tools to use as well, either as output (e.g. a pipeline authoring GUI) +or consumption (e.g. 
a lineage analysis tool) and expect it to be more +easily manipulated and semantically meaningful than the Beam protos +themselves (which concern themselves more with execution). + +It should be noted that everything here is still under development, but any +features already included are considered stable. Feedback is welcome at +dev@apache.beam.org. + +## Running pipelines + +The Beam yaml parser is currently included as part of the Apache Beam Python SDK. +This can be installed (e.g. within a virtual environment) as + +``` +pip install apache_beam[yaml,gcp] +``` + +In addition, several of the provided transforms (such as SQL) are implemented +in Java and their expansion will require a working Java interpeter. (The +requisite artifacts will be automatically downloaded from the apache maven +repositories, so no further installs will be required.) +Docker is also currently required for local execution of these +cross-language-requiring transforms, but not for submission to a non-local +runner such as Flink or Dataflow. + +Once the prerequisites are installed, you can execute a pipeline defined +in a yaml file as + +``` +python -m apache_beam.yaml.main --yaml_pipeline_file=/path/to/pipeline.yaml [other pipeline options such as the runner] +``` + +You can do a dry-run of your pipeline using the render runner to see what the +execution graph is, e.g. + +``` +python -m apache_beam.yaml.main --yaml_pipeline_file=/path/to/pipeline.yaml --runner=apache_beam.runners.render.RenderRunner --render_output=out.png [--render_port=0] +``` + +(This requires [Graphviz](https://graphviz.org/download/) to be installed to render the pipeline.) + +We intend to support running a pipeline on Dataflow by directly passing the +yaml specification to a template, no local installation of the Beam SDKs required. + +## Example pipelines + +Here is a simple pipeline that reads some data from csv files and +writes it out in json format. + +``` +pipeline: + transforms: + - type: ReadFromCsv + config: + path: /path/to/input*.csv + - type: WriteToJson + config: + path: /path/to/output.json + input: ReadFromCsv +``` + +We can also add a transformation + +``` +pipeline: + transforms: + - type: ReadFromCsv + config: + path: /path/to/input*.csv + - type: Filter + config: + language: python + keep: "col3 > 100" + input: ReadFromCsv + - type: WriteToJson + config: + path: /path/to/output.json + input: Filter +``` + +or two. + +``` +pipeline: + transforms: + - type: ReadFromCsv + config: + path: /path/to/input*.csv + - type: Filter + config: + language: python + keep: "col3 > 100" + input: ReadFromCsv + - type: Sql + config: + query: "select col1, count(*) as cnt from PCOLLECTION group by col1" + input: Filter + - type: WriteToJson + config: + path: /path/to/output.json + input: Sql +``` + +Transforms can be named to help with monitoring and debugging. + +``` +pipeline: + transforms: + - type: ReadFromCsv + name: ReadMyData + config: + path: /path/to/input*.csv + - type: Filter + name: KeepBigRecords + config: + language: python + keep: "col3 > 100" + input: ReadMyData + - type: Sql + name: MySqlTransform + config: + query: "select col1, count(*) as cnt from PCOLLECTION group by col1" + input: KeepBigRecords + - type: WriteToJson + name: WriteTheOutput + config: + path: /path/to/output.json + input: MySqlTransform +``` + +(This is also needed to disambiguate if more than one transform of the same +type is used.) + +If the pipeline is linear, we can let the inputs be implicit by designating +the pipeline as a `chain` type. 
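+
+(For comparison, here is a rough, hand-written sketch, not generated output, of
+the same linear pipeline written directly against the Beam Python SDK; the
+`SqlTransform` step additionally requires the cross-language setup described
+above.)
+
+```
+import apache_beam as beam
+from apache_beam.transforms.sql import SqlTransform
+
+with beam.Pipeline() as p:
+  _ = (
+      p
+      | beam.io.ReadFromCsv('/path/to/input*.csv')
+      | beam.Filter(lambda row: row.col3 > 100)
+      | SqlTransform(
+          'select col1, count(*) as cnt from PCOLLECTION group by col1')
+      | beam.io.WriteToJson('/path/to/output.json'))
+```
+
+The same pipeline written as a YAML `chain` looks like this: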
+ +``` +pipeline: + type: chain + + transforms: + - type: ReadFromCsv + config: + path: /path/to/input*.csv + - type: Filter + config: + language: python + keep: "col3 > 100" + - type: Sql + name: MySqlTransform + config: + query: "select col1, count(*) as cnt from PCOLLECTION group by col1" + - type: WriteToJson + config: + path: /path/to/output.json +``` + +As syntactic sugar, we can name the first and last transforms in our pipeline +as `source` and `sink`. + +``` +pipeline: + type: chain + + source: + type: ReadFromCsv + config: + path: /path/to/input*.csv + + transforms: + - type: Filter + config: + language: python + keep: "col3 > 100" + + - type: Sql + name: MySqlTransform + config: + query: "select col1, count(*) as cnt from PCOLLECTION group by col1" + + sink: + type: WriteToJson + config: + path: /path/to/output.json +``` + +Arbitrary non-linear pipelines are supported as well, though in this case +inputs must be explicitly named. +Here we read two sources, join them, and write two outputs. + +``` +pipeline: + transforms: + - type: ReadFromCsv + name: ReadLeft + config: + path: /path/to/left*.csv + + - type: ReadFromCsv + name: ReadRight + config: + path: /path/to/right*.csv + + - type: Sql + config: + query: select A.col1, B.col2 from A join B using (col3) + input: + A: ReadLeft + B: ReadRight + + - type: WriteToJson + name: WriteAll + input: Sql + config: + path: /path/to/all.json + + - type: Filter + name: FilterToBig + input: Sql + config: + language: python + keep: "col2 > 100" + + - type: WriteToCsv + name: WriteBig + input: FilterToBig + config: + path: /path/to/big.csv +``` + +One can, however, nest `chains` within a non-linear pipeline. +For example, here `ExtraProcessingForBigRows` is itself a "chain" transform +that has a single input and contains its own sink. + +``` +pipeline: + transforms: + - type: ReadFromCsv + name: ReadLeft + config: + path: /path/to/left*.csv + + - type: ReadFromCsv + name: ReadRight + config: + path: /path/to/right*.csv + + - type: Sql + config: + query: select A.col1, B.col2 from A join B using (col3) + input: + A: ReadLeft + B: ReadRight + + - type: WriteToJson + name: WriteAll + input: Sql + config: + path: /path/to/all.json + + - type: chain + name: ExtraProcessingForBigRows + input: Sql + transforms: + - type: Filter + config: + language: python + keep: "col2 > 100" + - type: Filter + config: + language: python + keep: "len(col1) > 10" + - type: Filter + config: + language: python + keep: "col1 > 'z'" + sink: + type: WriteToCsv + config: + path: /path/to/big.csv +``` + +## Windowing + +This API can be used to define both streaming and batch pipelines. +In order to meaningfully aggregate elements in a streaming pipeline, +some kind of windowing is typically required. Beam's +[windowing](https://beam.apache.org/documentation/programming-guide/#windowing) +and [triggering](https://beam.apache.org/documentation/programming-guide/#triggers) +can be declared using the same WindowInto transform available in all other +SDKs. + +``` +pipeline: + type: chain + transforms: + - type: ReadFromPubSub + config: + topic: myPubSubTopic + format: json + schema: + type: object + properties: + col1: {type: string} + col2: {type: integer} + col3: {type: number} + - type: WindowInto + windowing: + type: fixed + size: 60s + - type: SomeGroupingTransform + config: + arg: ... 
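+      # SomeGroupingTransform is a placeholder for any aggregating transform
+      # (for instance the Combine transform described in the aggregation docs).
+      # Because of the fixed 60-second windowing declared above, aggregates are
+      # emitted once per window rather than waiting for the unbounded PubSub
+      # stream to end.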
+ - type: WriteToPubSub + config: + topic: anotherPubSubTopic + format: json +``` + +Rather than using an explicit `WindowInto` operation, one may instead tag a +transform itself with a specified windowing which will cause its inputs +(and hence the transform itself) to be applied with that windowing. + +``` +pipeline: + type: chain + transforms: + - type: ReadFromPubSub + config: + topic: myPubSubTopic + format: ... + schema: ... + - type: SomeGroupingTransform + config: + arg: ... + windowing: + type: sliding + size: 60s + period: 10s + - type: WriteToPubSub + config: + topic: anotherPubSubTopic + format: json +``` + +Note that the `Sql` operation itself is often a from of aggregation, and +applying a windowing (or consuming an already windowed input) will cause all +grouping to be done per window. + +``` +pipeline: + type: chain + transforms: + - type: ReadFromPubSub + config: + topic: myPubSubTopic + format: ... + schema: ... + - type: Sql + config: + query: "select col1, count(*) as c from PCOLLECTION" + windowing: + type: sessions + gap: 60s + - type: WriteToPubSub + config: + topic: anotherPubSubTopic + format: json +``` + +The specified windowing is applied to all inputs, in this case resulting in +a join per window. + +``` +pipeline: + transforms: + - type: ReadFromPubSub + name: ReadLeft + config: + topic: leftTopic + format: ... + schema: ... + + - type: ReadFromPubSub + name: ReadRight + config: + topic: rightTopic + format: ... + schema: ... + + - type: Sql + config: + query: select A.col1, B.col2 from A join B using (col3) + input: + A: ReadLeft + B: ReadRight + windowing: + type: fixed + size: 60s +``` + +For a transform with no inputs, the specified windowing is instead applied to +its output(s). As per the Beam model, the windowing is then inherited by all +consuming operations. This is especially useful for root operations like Read. + +``` +pipeline: + type: chain + transforms: + - type: ReadFromPubSub + config: + topic: myPubSubTopic + format: ... + schema: ... + windowing: + type: fixed + size: 60s + - type: Sql + config: + query: "select col1, count(*) as c from PCOLLECTION" + - type: WriteToPubSub + config: + topic: anotherPubSubTopic + format: json +``` + +One can also specify windowing at the top level of a pipeline (or composite), +which is a shorthand to simply applying this same windowing to all root +operations (that don't otherwise specify their own windowing), +and can be an effective way to apply it everywhere. + +``` +pipeline: + type: chain + transforms: + - type: ReadFromPubSub + config: + topic: myPubSubTopic + format: ... + schema: ... + - type: Sql + config: + query: "select col1, count(*) as c from PCOLLECTION" + - type: WriteToPubSub + config: + topic: anotherPubSubTopic + format: json + windowing: + type: fixed + size: 60 +``` + +Note that all these windowing specifications are compatible with the `source` +and `sink` syntax as well + +``` +pipeline: + type: chain + + source: + type: ReadFromPubSub + config: + topic: myPubSubTopic + format: ... + schema: ... + windowing: + type: fixed + size: 10s + + transforms: + - type: Sql + config: + query: "select col1, count(*) as c from PCOLLECTION" + + sink: + type: WriteToCsv + config: + path: /path/to/output.json + windowing: + type: fixed + size: 5m +``` + + +## Providers + +Though we aim to offer a large suite of built-in transforms, it is inevitable +that people will want to be able to author their own. 
This is made possible +through the notion of Providers which leverage expansion services and +schema transforms. + +For example, one could build a jar that vends a +[cross language transform](https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/) +or [schema transform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.html) +and then use it in a transform as follows + +``` +pipeline: + type: chain + source: + type: ReadFromCsv + config: + path: /path/to/input*.csv + + transforms: + - type: MyCustomTransform + config: + arg: whatever + + sink: + type: WriteToJson + config: + path: /path/to/output.json + +providers: + - type: javaJar + jar: /path/or/url/to/myExpansionService.jar + transforms: + MyCustomTransform: "urn:registered:in:expansion:service" +``` + +Arbitrary Python transforms can be provided as well, using the syntax + +``` +providers: + - type: pythonPackage + packages: + - my_pypi_package>=version + - /path/to/local/package.zip + transforms: + MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable" +``` + +## Other Resources + +* [Example pipelines](https://gist.github.com/robertwb/2cb26973f1b1203e8f5f8f88c5764da0) +* [More examples](https://github.com/Polber/beam/tree/jkinard/bug-bash/sdks/python/apache_beam/yaml/examples) +* [Transform glossary](https://gist.github.com/robertwb/64e2f51ff88320eeb6ffd96634202df7) + +Additional documentation in this directory + +* [Mapping](yaml_mapping.md) +* [Aggregation](yaml_combine.md) +* [Error handling](yaml_errors.md) +* [Inlining Python](inline_python.md) diff --git a/website/www/site/data/pipelines.yaml b/website/www/site/data/pipelines.yaml index 42fcd2955648..e1dbb67c6a24 100644 --- a/website/www/site/data/pipelines.yaml +++ b/website/www/site/data/pipelines.yaml @@ -25,6 +25,9 @@ - title: Scala image_url: /images/logos/sdks/scala_pipeline.png url: /documentation/sdks/scala/ +- title: YAML + image_url: /images/logos/sdks/yaml.png + url: /documentation/sdks/yaml/ - title: SQL image_url: /images/logos/sdks/sql_pipelines.png url: /documentation/dsls/sql/overview/ diff --git a/website/www/site/layouts/partials/section-menu/en/sdks.html b/website/www/site/layouts/partials/section-menu/en/sdks.html index 73bea15a28d1..4405c9e4d3ee 100644 --- a/website/www/site/layouts/partials/section-menu/en/sdks.html +++ b/website/www/site/layouts/partials/section-menu/en/sdks.html @@ -84,6 +84,17 @@ +
+[HTML nav-list residue: the added lines give the SDKs section menu a collapsible "Yaml" entry linking to the new YAML documentation pages, placed immediately before the existing "SQL" entry.]