Support for Iceberg from YAML via ManagedIO. (apache#33579)
robertwb authored Jan 17, 2025
1 parent 46699a0 commit 4057463
Showing 3 changed files with 147 additions and 0 deletions.
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
@@ -129,6 +129,8 @@
'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text'
'ReadFromPubSub': 'apache_beam.yaml.yaml_io.read_from_pubsub'
'WriteToPubSub': 'apache_beam.yaml.yaml_io.write_to_pubsub'
'ReadFromIceberg': 'apache_beam.yaml.yaml_io.read_from_iceberg'
'WriteToIceberg': 'apache_beam.yaml.yaml_io.write_to_iceberg'

# Declared as a renaming transform to avoid exposing all
# (implementation-specific) pandas arguments and aligning with possible Java
55 changes: 55 additions & 0 deletions sdks/python/apache_beam/yaml/tests/iceberg.yaml
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

fixtures:
  - name: TEMP_DIR
    type: "tempfile.TemporaryDirectory"

pipelines:
  - pipeline:
      type: chain
      transforms:
        - type: Create
          config:
            elements:
              - {label: "11a", rank: 0}
              - {label: "37a", rank: 1}
              - {label: "389a", rank: 2}
        - type: WriteToIceberg
          config:
            table: "default.table"
            catalog_name: "some_catalog"
            catalog_properties:
              type: "hadoop"
              warehouse: "{TEMP_DIR}/dir"

  - pipeline:
      type: chain
      transforms:
        - type: ReadFromIceberg
          config:
            table: "default.table"
            catalog_name: "some_catalog"
            catalog_properties:
              type: "hadoop"
              warehouse: "{TEMP_DIR}/dir"
        - type: AssertEqual
          config:
            elements:
              - {label: "11a", rank: 0}
              - {label: "37a", rank: 1}
              - {label: "389a", rank: 2}
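For reference, a pipeline like the one in this test can also be run standalone with the Beam YAML runner. A minimal write-pipeline sketch follows; the file name, element data, and local warehouse path are illustrative, not part of this commit:

# write_pipeline.yaml -- illustrative standalone example, not part of this commit.
pipeline:
  type: chain
  transforms:
    - type: Create
      config:
        elements:
          - {label: "11a", rank: 0}
    - type: WriteToIceberg
      config:
        table: "default.table"
        catalog_name: "some_catalog"
        catalog_properties:
          type: "hadoop"                        # local Hadoop catalog, as in the test above
          warehouse: "/tmp/iceberg_warehouse"   # illustrative local path

A file like this is typically executed with the Beam YAML entry point, e.g. python -m apache_beam.yaml.main --yaml_pipeline_file=write_pipeline.yaml.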
90 changes: 90 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_io.py
@@ -482,6 +482,96 @@ def attributes_extractor(row):
          timestamp_attribute=timestamp_attribute))


def read_from_iceberg(
    table: str,
    catalog_name: Optional[str] = None,
    catalog_properties: Optional[Mapping[str, str]] = None,
    config_properties: Optional[Mapping[str, str]] = None,
):
  # TODO(robertwb): It'd be nice to derive this list of parameters, along with
  # their types and docs, programmatically from the iceberg (or managed)
  # schemas.

  """Reads an Apache Iceberg table.

  See also the [Apache Iceberg Beam documentation](
  https://cloud.google.com/dataflow/docs/guides/managed-io#iceberg).

  Args:
    table: The identifier of the Apache Iceberg table. Example: "db.table1".
    catalog_name: The name of the catalog. Example: "local".
    catalog_properties: A map of configuration properties for the Apache
      Iceberg catalog. The required properties depend on the catalog. For more
      information, see CatalogUtil in the Apache Iceberg documentation.
    config_properties: An optional set of Hadoop configuration properties. For
      more information, see CatalogUtil in the Apache Iceberg documentation.
  """
  return beam.managed.Read(
      "iceberg",
      config=dict(
          table=table,
          catalog_name=catalog_name,
          catalog_properties=catalog_properties,
          config_properties=config_properties))


def write_to_iceberg(
    table: str,
    catalog_name: Optional[str] = None,
    catalog_properties: Optional[Mapping[str, str]] = None,
    config_properties: Optional[Mapping[str, str]] = None,
    triggering_frequency_seconds: Optional[int] = None,
    keep: Optional[Iterable[str]] = None,
    drop: Optional[Iterable[str]] = None,
    only: Optional[str] = None,
):
  # TODO(robertwb): It'd be nice to derive this list of parameters, along with
  # their types and docs, programmatically from the iceberg (or managed)
  # schemas.

  """Writes to an Apache Iceberg table.

  See also the [Apache Iceberg Beam documentation](
  https://cloud.google.com/dataflow/docs/guides/managed-io#iceberg),
  including the [dynamic destinations section](
  https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations)
  for use of the keep, drop, and only parameters.

  Args:
    table: The identifier of the Apache Iceberg table. Example: "db.table1".
    catalog_name: The name of the catalog. Example: "local".
    catalog_properties: A map of configuration properties for the Apache
      Iceberg catalog. The required properties depend on the catalog. For more
      information, see CatalogUtil in the Apache Iceberg documentation.
    config_properties: An optional set of Hadoop configuration properties. For
      more information, see CatalogUtil in the Apache Iceberg documentation.
    triggering_frequency_seconds: For streaming write pipelines, the frequency
      at which the sink attempts to produce snapshots, in seconds.
    keep: An optional list of field names to keep when writing to the
      destination. Other fields are dropped. Mutually exclusive with drop and
      only.
    drop: An optional list of field names to drop before writing to the
      destination. Mutually exclusive with keep and only.
    only: The name of exactly one field to keep as the top-level record when
      writing to the destination. All other fields are dropped. This field
      must be of row type. Mutually exclusive with drop and keep.
  """
  return beam.managed.Write(
      "iceberg",
      config=dict(
          table=table,
          catalog_name=catalog_name,
          catalog_properties=catalog_properties,
          config_properties=config_properties,
          triggering_frequency_seconds=triggering_frequency_seconds,
          keep=keep,
          drop=drop,
          only=only))


def io_providers():
  with open(os.path.join(os.path.dirname(__file__), 'standard_io.yaml')) as fin:
    return yaml_provider.parse_providers(yaml.load(fin, Loader=yaml.SafeLoader))
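As a usage note for the keep, drop, and only parameters above: combined with field-value templating in the table name (see the dynamic destinations documentation linked in the docstring), the sink can route each record to a per-key table. A minimal sketch follows; the airport field, the table template, and the warehouse path are hypothetical, not part of this commit:

- type: WriteToIceberg
  config:
    # Route each record to a table named after its "airport" field
    # (field-value templating per the dynamic destinations docs; hypothetical schema).
    table: "flights.{airport}"
    catalog_name: "some_catalog"
    catalog_properties:
      type: "hadoop"
      warehouse: "/tmp/iceberg_warehouse"   # hypothetical local path
    # Drop the routing field so it is not stored redundantly in every table.
    drop: ["airport"]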
