diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index a21782bdc603..b2544e773552 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -129,6 +129,8 @@
       'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text'
       'ReadFromPubSub': 'apache_beam.yaml.yaml_io.read_from_pubsub'
       'WriteToPubSub': 'apache_beam.yaml.yaml_io.write_to_pubsub'
+      'ReadFromIceberg': 'apache_beam.yaml.yaml_io.read_from_iceberg'
+      'WriteToIceberg': 'apache_beam.yaml.yaml_io.write_to_iceberg'
 
 # Declared as a renaming transform to avoid exposing all
 # (implementation-specific) pandas arguments and aligning with possible Java
diff --git a/sdks/python/apache_beam/yaml/tests/iceberg.yaml b/sdks/python/apache_beam/yaml/tests/iceberg.yaml
new file mode 100644
index 000000000000..aee9725e0c4d
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/iceberg.yaml
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+  - name: TEMP_DIR
+    type: "tempfile.TemporaryDirectory"
+
+pipelines:
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {label: "11a", rank: 0}
+              - {label: "37a", rank: 1}
+              - {label: "389a", rank: 2}
+        - type: WriteToIceberg
+          config:
+            table: "default.table"
+            catalog_name: "some_catalog"
+            catalog_properties:
+              type: "hadoop"
+              warehouse: "{TEMP_DIR}/dir"
+
+  - pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromIceberg
+          config:
+            table: "default.table"
+            catalog_name: "some_catalog"
+            catalog_properties:
+              type: "hadoop"
+              warehouse: "{TEMP_DIR}/dir"
+        - type: AssertEqual
+          config:
+            elements:
+              - {label: "11a", rank: 0}
+              - {label: "37a", rank: 1}
+              - {label: "389a", rank: 2}
diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py
index f8d66645d945..a675e70aa0b0 100644
--- a/sdks/python/apache_beam/yaml/yaml_io.py
+++ b/sdks/python/apache_beam/yaml/yaml_io.py
@@ -482,6 +482,96 @@ def attributes_extractor(row):
           timestamp_attribute=timestamp_attribute))
 
 
+def read_from_iceberg(
+    table: str,
+    catalog_name: Optional[str] = None,
+    catalog_properties: Optional[Mapping[str, str]] = None,
+    config_properties: Optional[Mapping[str, str]] = None,
+):
+  # TODO(robertwb): It'd be nice to derive this list of parameters, along with
+  # their types and docs, programmatically from the iceberg (or managed)
+  # schemas.
+
+  """Reads an Apache Iceberg table.
+
+  See also the [Apache Iceberg Beam documentation](
+  https://cloud.google.com/dataflow/docs/guides/managed-io#iceberg).
+
+  Args:
+    table: The identifier of the Apache Iceberg table. Example: "db.table1".
+    catalog_name: The name of the catalog. Example: "local".
+    catalog_properties: A map of configuration properties for the Apache
+      Iceberg catalog. The required properties depend on the catalog. For
+      more information, see CatalogUtil in the Apache Iceberg documentation.
+    config_properties: An optional set of Hadoop configuration properties.
+      For more information, see CatalogUtil in the Apache Iceberg
+      documentation.
+  """
+  return beam.managed.Read(
+      "iceberg",
+      config=dict(
+          table=table,
+          catalog_name=catalog_name,
+          catalog_properties=catalog_properties,
+          config_properties=config_properties))
+
+
+def write_to_iceberg(
+    table: str,
+    catalog_name: Optional[str] = None,
+    catalog_properties: Optional[Mapping[str, str]] = None,
+    config_properties: Optional[Mapping[str, str]] = None,
+    triggering_frequency_seconds: Optional[int] = None,
+    keep: Optional[Iterable[str]] = None,
+    drop: Optional[Iterable[str]] = None,
+    only: Optional[str] = None,
+):
+  # TODO(robertwb): It'd be nice to derive this list of parameters, along with
+  # their types and docs, programmatically from the iceberg (or managed)
+  # schemas.
+
+  """Writes to an Apache Iceberg table.
+
+  See also the [Apache Iceberg Beam documentation](
+  https://cloud.google.com/dataflow/docs/guides/managed-io#iceberg),
+  including the [dynamic destinations section](
+  https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations)
+  for use of the keep, drop, and only parameters.
+
+  Args:
+    table: The identifier of the Apache Iceberg table. Example: "db.table1".
+    catalog_name: The name of the catalog. Example: "local".
+    catalog_properties: A map of configuration properties for the Apache
+      Iceberg catalog. The required properties depend on the catalog. For
+      more information, see CatalogUtil in the Apache Iceberg documentation.
+    config_properties: An optional set of Hadoop configuration properties.
+      For more information, see CatalogUtil in the Apache Iceberg
+      documentation.
+    triggering_frequency_seconds: For streaming write pipelines, the frequency
+      at which the sink attempts to produce snapshots, in seconds.
+
+    keep: An optional list of field names to keep when writing to the
+      destination. Other fields are dropped. Mutually exclusive with drop
+      and only.
+    drop: An optional list of field names to drop before writing to the
+      destination. Mutually exclusive with keep and only.
+    only: The name of exactly one field to keep as the top-level record when
+      writing to the destination. All other fields are dropped. This field
+      must be of row type. Mutually exclusive with drop and keep.
+  """
+  return beam.managed.Write(
+      "iceberg",
+      config=dict(
+          table=table,
+          catalog_name=catalog_name,
+          catalog_properties=catalog_properties,
+          config_properties=config_properties,
+          triggering_frequency_seconds=triggering_frequency_seconds,
+          keep=keep,
+          drop=drop,
+          only=only))
+
+
 def io_providers():
   with open(os.path.join(os.path.dirname(__file__), 'standard_io.yaml')) as fin:
     return yaml_provider.parse_providers(yaml.load(fin, Loader=yaml.SafeLoader))
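
For reference, here is a minimal YAML sketch of the dynamic-destinations path that the write_to_iceberg docstring points at, assuming the templated-table syntax described in the linked managed I/O docs. The table template, catalog name, and warehouse path below are illustrative assumptions, not values from this change:

pipeline:
  type: chain
  transforms:
    - type: Create
      config:
        elements:
          - {shard: "a", label: "11a", rank: 0}
          - {shard: "b", label: "37b", rank: 1}
    - type: WriteToIceberg
      config:
        # Hypothetical templated destination: each record is routed to a
        # table chosen from its `shard` field.
        table: "default.table_{shard}"
        catalog_name: "some_catalog"
        catalog_properties:
          type: "hadoop"
          warehouse: "/tmp/warehouse"  # illustrative path
        # Drop the routing field before writing; `drop` is mutually
        # exclusive with `keep` and `only`.
        drop: ["shard"]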
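
Similarly, a sketch of a streaming write exercising triggering_frequency_seconds; the Pub/Sub topic, JSON schema, and warehouse bucket are hypothetical placeholders:

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: "projects/my-project/topics/events"  # hypothetical topic
        format: "JSON"
        schema:
          type: object
          properties:
            label: {type: string}
            rank: {type: integer}
    - type: WriteToIceberg
      config:
        table: "default.events"
        catalog_name: "some_catalog"
        catalog_properties:
          type: "hadoop"
          warehouse: "gs://my-bucket/warehouse"  # hypothetical bucket
        # In streaming mode, attempt to produce a new snapshot roughly
        # every 30 seconds.
        triggering_frequency_seconds: 30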