[yaml] add FileIO docs
Signed-off-by: Jeffrey Kinard <[email protected]>
Polber committed Nov 21, 2024
1 parent a06454a commit 7ae8638
Showing 3 changed files with 213 additions and 73 deletions.
35 changes: 24 additions & 11 deletions sdks/python/apache_beam/yaml/generate_yaml_docs.py
@@ -86,10 +86,16 @@ def _fake_value(name, beam_type):
raise ValueError(f"Unrecognized type_info: {type_info!r}")


EXCLUDE_ARGS = ['args', 'kwargs']


def _fake_row(schema):
if schema is None:
return '...'
return {f.name: _fake_value(f.name, f.type) for f in schema.fields}
return {
f.name: _fake_value(f.name, f.type)
for f in schema.fields if f.name not in EXCLUDE_ARGS
}
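# Illustrative sketch (not part of this change): for a schema with fields
# ('path', 'args', 'kwargs'), the generated example row now contains only a
# type-appropriate placeholder for 'path'; the catch-all 'args'/'kwargs'
# entries are dropped so they never leak into the generated docs:
#   _fake_row(schema)  # -> {'path': <fake value>}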


def pretty_example(provider, t, base_t=None):
@@ -160,13 +166,14 @@ def normalize_error_handling(f):

def lines():
for f in schema.fields:
f = normalize_error_handling(f)
yield ''.join([
f'**{f.name}** `{pretty_type(f.type)}`',
maybe_optional(f.type),
indent(': ' + f.description if f.description else '', 2),
maybe_row_parameters(f.type),
])
if f.name not in EXCLUDE_ARGS:
f = normalize_error_handling(f)
yield ''.join([
f'**{f.name}** `{pretty_type(f.type)}`',
maybe_optional(f.type),
indent(': ' + f.description if f.description else '', 2),
maybe_row_parameters(f.type),
])

return '\n\n'.join('*' + indent(line, 2) for line in lines()).strip()
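# Illustrative rendering (description text assumed) of one parameter bullet
# emitted by lines() after the EXCLUDE_ARGS filter:
#   * **path** `string`: The file path to read from.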

@@ -189,6 +196,11 @@ def io_grouping_key(transform_name):
return 0, transform_name


def normalize_beam_version(desc):
return desc.replace(
'BEAM_VERSION', 'current' if '.dev' in beam_version else beam_version)
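# Illustrative example: with beam_version == '2.61.0',
#   normalize_beam_version('See BEAM_VERSION docs')  # -> 'See 2.61.0 docs'
# while any '.dev' version substitutes the literal 'current', keeping
# generated doc links valid for unreleased builds.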


SKIP = [
'Combine',
'Filter',
@@ -200,9 +212,10 @@ def transform_docs(transform_base, transforms, providers, extra_docs=''):
return '\n'.join([
f'## {transform_base}',
'',
longest(
lambda t: longest(lambda p: p.description(t), providers[t]),
transforms).replace('::\n', '\n\n :::yaml\n'),
normalize_beam_version(
longest(
lambda t: longest(lambda p: p.description(t), providers[t]),
transforms).replace('::\n', '\n\n :::yaml\n')),
'',
extra_docs,
'',
72 changes: 19 additions & 53 deletions sdks/python/apache_beam/yaml/standard_io.yaml
@@ -22,6 +22,7 @@
# should be kept in sync.
# TODO(yaml): See if this can be enforced programmatically.

# BigQueryIO Java
- type: renaming
transforms:
'ReadFromBigQuery': 'ReadFromBigQuery'
@@ -48,6 +49,7 @@
config:
gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'

# KafkaIO Java
- type: renaming
transforms:
'ReadFromKafka': 'ReadFromKafka'
@@ -83,6 +85,7 @@
config:
gradle_target: 'sdks:java:io:expansion-service:shadowJar'

# PubSubLite Java
- type: renaming
transforms:
'ReadFromPubSubLite': 'ReadFromPubSubLite'
@@ -120,76 +123,38 @@
config:
gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'

# BigQueryIO Python
- type: python
transforms:
'ReadFromBigQuery': 'apache_beam.yaml.yaml_io.read_from_bigquery'
# Disabled until https://github.com/apache/beam/issues/28162 is resolved.
# 'WriteToBigQuery': 'apache_beam.yaml.yaml_io.write_to_bigquery'

# FileIO Python
- type: python
transforms:
'ReadFromText': 'apache_beam.yaml.yaml_io.read_from_text'
'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text'
'ReadFromPubSub': 'apache_beam.yaml.yaml_io.read_from_pubsub'
'WriteToPubSub': 'apache_beam.yaml.yaml_io.write_to_pubsub'
'ReadFromCsv': 'apache_beam.yaml.yaml_io.read_from_csv'
'WriteToCsv': 'apache_beam.yaml.yaml_io.write_to_csv'
'ReadFromJson': 'apache_beam.yaml.yaml_io.read_from_json'
'WriteToJson': 'apache_beam.yaml.yaml_io.write_to_json'
'ReadFromParquet': 'apache_beam.yaml.yaml_io.read_from_parquet'
'WriteToParquet': 'apache_beam.yaml.yaml_io.write_to_parquet'
'ReadFromAvro': 'apache_beam.yaml.yaml_io.read_from_avro'
'WriteToAvro': 'apache_beam.yaml.yaml_io.write_to_avro'
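# Illustrative pipeline using the transforms above (bucket and file names
# are hypothetical):
#
#   pipeline:
#     transforms:
#       - type: ReadFromCsv
#         config:
#           path: gs://my-bucket/input-*.csv
#       - type: WriteToJson
#         input: ReadFromCsv
#         config:
#           path: gs://my-bucket/output.json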

# Declared as a renaming transform to avoid exposing all of the
# (implementation-specific) pandas arguments and to align with a possible Java
# implementation.
# Invoking these directly as a PyTransform remains an option for anyone who
# wants to use these power features in a language-dependent manner.
- type: renaming
transforms:
'ReadFromCsv': 'ReadFromCsv'
'WriteToCsv': 'WriteToCsv'
'ReadFromJson': 'ReadFromJson'
'WriteToJson': 'WriteToJson'
'ReadFromParquet': 'ReadFromParquet'
'WriteToParquet': 'WriteToParquet'
'ReadFromAvro': 'ReadFromAvro'
'WriteToAvro': 'WriteToAvro'
config:
mappings:
'ReadFromCsv':
path: 'path'
delimiter: 'sep'
comment: 'comment'
'WriteToCsv':
path: 'path'
delimiter: 'sep'
'ReadFromJson':
path: 'path'
'WriteToJson':
path: 'path'
'ReadFromParquet':
path: 'file_pattern'
'WriteToParquet':
path: 'file_path_prefix'
'ReadFromAvro':
path: 'file_pattern'
'WriteToAvro':
path: 'file_path_prefix'
defaults:
'ReadFromParquet':
as_rows: True
'ReadFromAvro':
as_rows: True
underlying_provider:
type: python
transforms:
'ReadFromCsv': 'apache_beam.io.ReadFromCsv'
'WriteToCsv': 'apache_beam.io.WriteToCsv'
'ReadFromJson': 'apache_beam.io.ReadFromJson'
'WriteToJson': 'apache_beam.io.WriteToJson'
'ReadFromParquet': 'apache_beam.io.ReadFromParquet'
'WriteToParquet': 'apache_beam.io.WriteToParquet'
'ReadFromAvro': 'apache_beam.io.ReadFromAvro'
'WriteToAvro': 'apache_beam.io.WriteToAvro'
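# Illustrative effect of the mapping above: a YAML config such as
#
#   - type: ReadFromCsv
#     config:
#       path: gs://my-bucket/in.csv
#       delimiter: ';'
#
# is forwarded to apache_beam.io.ReadFromCsv(path=..., sep=';'), keeping the
# pandas-specific argument names out of the YAML surface.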

# FileIO Java
- type: beamJar
transforms:
'WriteToCsv': 'beam:schematransform:org.apache.beam:csv_write:v1'
'WriteToJson': 'beam:schematransform:org.apache.beam:json_write:v1'
config:
gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'

# JdbcIO Java
- type: renaming
transforms:
'ReadFromJdbc': 'ReadFromJdbc'
Expand Down Expand Up @@ -258,6 +223,7 @@
config:
gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'

# SpannerIO Java
- type: renaming
transforms:
'ReadFromSpanner': 'ReadFromSpanner'